Skip to content

API Reference

Main functions

hyperbase

hedge(source)

Create a hyperedge.

Source code in src/hyperbase/builders.py
def hedge(
    source: str | Hyperedge | list | tuple | ParseResult,
) -> Hyperedge:
    """Build a hyperedge from *source*.

    Accepted inputs:
    - a ``ParseResult``: its edge is rebuilt with ``_rebuild_with_text`` from
      the result's token positions and tokens, and the result's text is
      attached to the rebuilt edge;
    - a ``list`` or ``tuple``: every item is converted recursively and the
      results are wrapped in a new ``Hyperedge``;
    - a ``str``: parsed with ``_hedge_from_str``;
    - a ``Hyperedge``, ``Atom`` or ``UniqueAtom``: returned unchanged.

    Apart from ``ParseResult``, the checks compare the exact type, so
    subclasses of the types above are not accepted.

    Raises ``TypeError`` for any other input.
    """
    if isinstance(source, ParseResult):
        parsed = source
        rebuilt = _rebuild_with_text(parsed.edge, parsed.tok_pos, parsed.tokens)
        # Edges are frozen, so the text has to be set through object.__setattr__.
        object.__setattr__(rebuilt, "text", parsed.text)
        return rebuilt

    if type(source) in {tuple, list}:
        items = cast(Iterable, source)
        return Hyperedge(tuple(hedge(child) for child in items))

    if type(source) is str:
        return _hedge_from_str(source)

    if type(source) in {Hyperedge, Atom, UniqueAtom}:
        return source  # type: ignore

    raise TypeError(
        f"Cannot create hyperedge from {type(source).__name__}: {source!r}"
    )

load_edges(source, lazy=False)

Load a sequence of hyperedges from various sources.

source can be:

- A path to a .jsonl file (one JSON object per line, each treated as a ParseResult).
- A path to a .json file (must contain a JSON array; its items are handled as in the iterable case below).
- A path to any other text file (one edge string per line, fed to hedge).
- Any iterable of items accepted by hedge. dict items are first converted to ParseResult via ParseResult.from_dict.

If lazy is True, return a generator (lazy evaluation). If lazy is False (default), return a list.

Source code in src/hyperbase/loaders.py
def load_edges(
    source: str | PathLike | Iterable,
    lazy: bool = False,
) -> Iterator[Hyperedge] | list[Hyperedge]:
    """Load a sequence of hyperedges from a file path or an iterable.

    *source* can be:
    - A path to a ``.jsonl`` file (one JSON object per line, each treated as
      a ``ParseResult``).
    - A path to a ``.json`` file (must contain a JSON array; its items are
      handled as in the iterable case below).
    - A path to any other text file (one edge string per line, fed to
      ``hedge``).
    - Any iterable of items accepted by ``hedge``.  ``dict`` items are first
      converted to ``ParseResult`` via ``ParseResult.from_dict``.

    The actual reading and conversion is done by ``_generate_edges``.

    If *lazy* is ``True``, the generator is returned as is (lazy
    evaluation).  If *lazy* is ``False`` (default), it is consumed into a
    list before returning.
    """
    edge_stream = _generate_edges(source)
    # Only materialise the stream when the caller did not ask for laziness.
    return edge_stream if lazy else list(edge_stream)

get_parser(name, params=None, **kwargs)

Instantiate a parser plugin by name.

Looks up name in the hyperbase.parsers entry-point group and returns an instance of the registered Parser subclass.

params is a dictionary of parser parameters. For backwards compatibility, keyword arguments are merged into params (explicit params entries take precedence).

Raises ValueError if the parser is not installed.

Source code in src/hyperbase/parsers/__init__.py
def get_parser(
    name: str, params: dict[str, Any] | None = None, **kwargs: object
) -> Parser:
    """Instantiate a parser plugin by name.

    The available parsers are obtained from ``list_parsers()``; the entry
    registered under *name* is loaded and the resulting class is called with
    a single dictionary of parameters.

    *params* is a dictionary of parser parameters.  For backwards
    compatibility, keyword arguments are merged into it; when a key is given
    both ways, the value from *params* wins.

    Raises :class:`ValueError` if no parser is registered under *name*; the
    message lists the parsers that are available.
    """
    registry = list_parsers()
    if name not in registry:
        known = ", ".join(sorted(registry)) or "(none)"
        raise ValueError(
            f"Parser {name!r} is not installed. Available parsers: {known}"
        )

    # Start from the keyword arguments so that explicit *params* override them.
    options: dict[str, Any] = dict(kwargs)
    if params:
        options.update(params)

    parser_cls = registry[name].load()
    return parser_cls(options)

Hyperedge module

hyperbase.hyperedge

Hyperedge dataclass

Non-atomic hyperedge.

Source code in src/hyperbase/hyperedge.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
@dataclass(frozen=True, init=False, eq=False, repr=False)
class Hyperedge:
    """Non-atomic hyperedge.

    An immutable, ordered sequence of child hyperedges.  The first child is
    used as the connector by the type-inference and argument-role methods.
    Equality and hashing depend only on the children, never on ``text``.
    """

    # Child edges, in order.
    _edges: tuple[Hyperedge, ...]
    # Optional text attached to the edge (ignored by __eq__ and __hash__).
    text: str | None
    # Per-instance memo for derived values (size, depth, type, ...).
    _cache: dict[str, Any] = field(
        default_factory=dict, repr=False, compare=False, hash=False
    )

    def __init__(self, edges: Iterable[Hyperedge], text: str | None = None) -> None:
        """Create an edge from an iterable of child edges and optional text."""
        # The dataclass is frozen, so attributes must be set through
        # object.__setattr__ rather than plain assignment.
        object.__setattr__(self, "_edges", tuple(edges))
        object.__setattr__(self, "text", text)
        object.__setattr__(self, "_cache", {})

    def __iter__(self) -> Iterator[Hyperedge]:
        """Iterate over the direct children of the edge."""
        return iter(self._edges)

    @overload
    def __getitem__(self, key: int) -> Hyperedge: ...
    @overload
    def __getitem__(self, key: slice) -> tuple[Hyperedge, ...]: ...

    def __getitem__(self, key):
        """Return the child at an index, or a tuple of children for a slice."""
        return self._edges[key]

    def __len__(self) -> int:
        """Number of direct children."""
        return len(self._edges)

    def __hash__(self) -> int:
        """Hash of the tuple of children (consistent with ``__eq__``)."""
        return hash(self._edges)

    def __eq__(self, other: object) -> bool:
        """Compare children with another edge, or directly with a tuple."""
        if isinstance(other, Hyperedge):
            return self._edges == other._edges
        if isinstance(other, tuple):
            return self._edges == other
        return NotImplemented

    def __bool__(self) -> bool:
        """An edge is always truthy, even when it has no children."""
        # Without this override, __len__ would make an empty edge falsy.
        return True

    @property
    def atom(self) -> bool:
        """True if edge is an atom."""
        return False

    @property
    def not_atom(self) -> bool:
        """True if edge is not an atom."""
        return True

    @property
    def t(self) -> str:
        """Edge type.
        (this property is a shortcut for Hyperedge.type())
        """
        return self.type()

    @property
    def mt(self) -> str:
        """Edge main type.
        (this property is a shortcut for Hyperedge.mtype())
        """
        return self.mtype()

    @property
    def ct(self) -> str | None:
        """Edge connector type.
        (this property is a shortcut for Hyperedge.connector_type())
        """
        return self.connector_type()

    @property
    def cmt(self) -> str | None:
        """Edge connector main type.
        (this property is a shortcut for Hyperedge.connector_mtype())
        """
        return self.connector_mtype()

    def match(
        self, pattern: Hyperedge | str | list[object] | tuple[object, ...]
    ) -> list[dict[str, Hyperedge]]:
        """Match this edge against a pattern. See ``match_pattern`` for details."""
        from hyperbase.patterns import match_pattern

        return match_pattern(self, pattern)

    def label(self) -> str:
        """Generate human-readable label for edge.

        The labels of the children are joined with spaces.  With exactly two
        children the order is kept.  Otherwise, if the last part of the
        connector atom is ".", the connector is dropped; in all other cases
        the connector is placed after the first argument.
        """
        conn_atom = self.connector_atom()
        if len(self) == 2:
            edge = self
        elif conn_atom is not None and conn_atom.parts()[-1] == ".":
            edge = self[1:]
        else:
            edge = (self[1], self[0], *self[2:])
        return " ".join([item.label() for item in edge])

    def inner_atom(self) -> Atom:
        """The inner atom inside of a modifier structure.

        For example, consider:
        (red/M shoes/C)
        The inner atom is:
        shoes/C
        Or, the more complex case:
        ((and/J slow/M steady/M) go/P)
        Yields:
        go/P

        This method should not be used on structures that contain more than
        one inner atom, for example concepts constructed with builders or
        relations.

        The inner atom of an atom is itself.
        """
        return self[1].inner_atom()  # type: ignore[no-any-return]

    def connector_atom(self) -> Atom | None:
        """The inner atom of the connector.

        For example, consider:
        (does/M (not/M like/P.so) john/C chess/C)
        The connector atom is:
        like/P.so

        The connector atom of an atom is None.
        """
        return self[0].inner_atom()  # type: ignore[no-any-return]

    def atoms(self) -> set[Atom]:
        """Returns the set of atoms contained in the edge.

        For example, consider the edge:
        (the/md (of/br mayor/cc (the/md city/cs)))
        in this case, edge.atoms() returns:
        {the/md, of/br, mayor/cc, city/cs}
        """
        atom_set: set[Atom] = set()
        for item in self:
            atom_set.update(item.atoms())
        return atom_set

    def all_atoms(self) -> list[Atom]:
        """Returns a list of all the atoms contained in the edge. Unlike
        atoms(), which does not return repeated atoms, all_atoms() does
        return repeated atoms if they are different objects.

        For example, consider the edge:
        (the/md (of/br mayor/cc (the/md city/cs)))
        in this case, edge.all_atoms() returns:
        [the/md, of/br, mayor/cc, the/md, city/cs]
        """
        atoms: list[Atom] = []
        for item in self:
            atoms += item.all_atoms()
        return atoms

    def size(self) -> int:
        """The size of an edge is its total number of atoms, at all depths.

        The value is computed once and then served from the cache.
        """
        if "size" not in self._cache:
            self._cache["size"] = sum(edge.size() for edge in self)
        return self._cache["size"]

    def depth(self) -> int:
        """Returns maximal depth of edge, an atom has depth 0.

        The value is computed once and then served from the cache.
        """
        if "depth" not in self._cache:
            # Seed with 0 so that an edge without children has depth 1.
            deepest_child = max([0, *(item.depth() for item in self)])
            self._cache["depth"] = deepest_child + 1
        return self._cache["depth"]

    def contains(self, needle: Hyperedge) -> bool:
        """Checks recursively if 'needle' is contained in edge.

        The edge itself is not considered; only its descendants are.
        """
        return any(item == needle or item.contains(needle) for item in self)

    def subedges(self) -> set[Hyperedge]:
        """Returns all the subedges contained in the edge, including atoms
        and itself.
        """
        edges: set[Hyperedge] = {self}
        for item in self:
            # Grow the set in place instead of copying it for every child.
            edges.update(item.subedges())
        return edges

    def replace_atom(
        self, old: Atom, new: Hyperedge, unique: bool = False
    ) -> Hyperedge:
        """Returns edge built by replacing every instance of 'old' in
        this edge with 'new'.

        Keyword argument:
        unique -- match only the exact same instance of the atom, i.e.
        UniqueAtom(self) == UniqueAtom(old) (default: False)
        """
        from hyperbase.transforms import replace_atom

        return replace_atom(self, old, new, unique=unique)

    def simplify(self, subtypes: bool = False, namespaces: bool = False) -> Hyperedge:
        """Returns a version of the edge with simplified atoms.

        Keyword arguments:
        subtypes -- include subtypes (default: False).
        namespaces -- include namespaces (default: False).
        """
        from hyperbase.transforms import simplify

        return simplify(self, subtypes=subtypes, namespaces=namespaces)

    def type(self) -> str:
        """Returns the type of this edge as a string.
        Type inference is performed, based on the main type of the
        connector (the first child):

        - predicate: relation, keeping the connector's subtype suffix;
        - trigger: specifier, keeping the connector's subtype suffix;
        - builder: concept, keeping the connector's subtype suffix;
        - modifier: the full type of the second child;
        - conjunction: the main type of the second child.

        The result is cached.  Raises RuntimeError if the connector has any
        other type, or if a modifier/conjunction edge has fewer than two
        children.
        """
        if "type" in self._cache:
            return self._cache["type"]
        ptype = self[0].type()
        if ptype[0] == EdgeType.PREDICATE:
            outter_type = EdgeType.RELATION
        elif ptype[0] == EdgeType.MODIFIER:
            if len(self) < 2:
                raise RuntimeError(
                    f"Edge is malformed, type cannot be determined: {self!s}"
                )
            result = self[1].type()
            self._cache["type"] = result
            return result
        elif ptype[0] == EdgeType.TRIGGER:
            outter_type = EdgeType.SPECIFIER
        elif ptype[0] == EdgeType.BUILDER:
            outter_type = EdgeType.CONCEPT
        elif ptype[0] == EdgeType.CONJUNCTION:
            if len(self) < 2:
                raise RuntimeError(
                    f"Edge is malformed, type cannot be determined: {self!s}"
                )
            result = self[1].mtype()
            self._cache["type"] = result
            return result
        else:
            raise RuntimeError(
                f"Edge is malformed, type cannot be determined: {self!s}"
            )

        # Swap the connector's main type for the inferred one, keep the rest.
        result = outter_type + ptype[1:]
        self._cache["type"] = result
        return result

    def connector_type(self) -> str | None:
        """Returns the type of the edge's connector.
        If the edge has no connector (i.e. it's an atom), then None is
        returned.
        """
        if "connector_type" not in self._cache:
            self._cache["connector_type"] = self[0].type()
        return self._cache["connector_type"]

    def mtype(self) -> str:
        """Returns the main type of this edge as a string of one character.
        Type inference is performed.
        """
        return self.type()[0]

    def connector_mtype(self) -> str | None:
        """Returns the main type of the edge's connector.
        If the edge has no connector (i.e. it's an atom), then None is
        returned.
        """
        ct = self.connector_type()
        if ct:
            return ct[0]
        else:
            return None

    def atom_with_type(self, atom_type: str) -> Atom | None:
        """Returns the first atom found in the edge that has the given
        'atom_type', or whose type starts with 'atom_type'.
        If no such atom is found, returns None.

        For example, given the edge (+/B a/Cn b/Cp) and the 'atom_type'
        'C', this function returns:
        a/Cn
        If the 'atom_type' is 'Cp', then it will return:
        b/Cp
        """
        for item in self:
            atom: Atom | None = item.atom_with_type(atom_type)
            if atom:
                return atom
        return None

    def argroles(self) -> str:
        """Returns the argument roles string of the edge, if it exists.
        Otherwise returns empty string.

        Argument roles can be returned for the entire edge that they apply
        to, which can be a relation (R) or a concept (C). For example:

        ((not/M is/P.sc) bob/C sad/C) has argument roles "sc",
        (of/B.ma city/C berlin/C) has argument roles "ma".

        Argument roles can also be returned for the connectors that define
        the outer edge, which can be of type predicate (P) or builder (B). For
        example:

        (not/M is/P.sc) has argument roles "sc",
        of/B.ma has argument roles "ma".

        The result is cached.
        """
        if "argroles" in self._cache:
            return self._cache["argroles"]
        et = self.mtype()
        if et in {EdgeType.RELATION, EdgeType.CONCEPT} and self[0].mtype() in {
            EdgeType.BUILDER,
            EdgeType.PREDICATE,
        }:
            result = self[0].argroles()
        elif et not in {EdgeType.BUILDER, EdgeType.PREDICATE}:
            result = ""
        else:
            # The edge is itself a predicate/builder connector: the roles
            # are taken from its second child.
            result = self[1].argroles()
        self._cache["argroles"] = result
        return result

    def replace_argroles(self, argroles: str | None) -> Hyperedge:
        """Returns an edge with the argroles of the connector atom replaced
        with the provided string.
        Returns same edge if the atom does not contain a role part."""
        from hyperbase.transforms import replace_argroles

        return replace_argroles(self, argroles)

    def _insert_argrole(self, argrole: str, pos: int) -> Hyperedge:
        """Returns an edge with the given argrole inserted at the specified
        position in the argroles of the connector atom.
        Same restrictions as in replace_argroles() apply."""
        from hyperbase.transforms import insert_argrole

        return insert_argrole(self, argrole, pos)

    def add_argument(
        self, edge: Hyperedge, argrole: str, pos: int | None = None
    ) -> Hyperedge:
        """Returns a new edge with the provided edge and its argroles inserted
        at the specified position. If pos is not provided, the argument is
        appended at the end."""
        from hyperbase.transforms import add_argument

        return add_argument(self, edge, argrole, pos)

    def arguments_with_role(self, argrole: str) -> list[Hyperedge]:
        """Returns the list of edges with the given argument role.

        The role string of the connector is read after stripping an outer
        pair of braces and removing ",", "[" and "]".  The role at position
        i then refers to the child at index i + 1.  Roles that have no
        corresponding child are ignored.
        """
        edges: list[Hyperedge] = []
        connector = self[0]

        argroles = connector.argroles()
        if len(argroles) > 0 and argroles[0] == "{":
            argroles = argroles[1:-1]
        argroles = argroles.replace(",", "").replace("[", "").replace("]", "")
        for pos, role in enumerate(argroles):
            if role == argrole and pos < len(self) - 1:
                edges.append(self[pos + 1])
        return edges

    def check_correctness(self) -> dict[Hyperedge, list[tuple[str, str]]]:
        """Delegate to ``hyperbase.correctness.check_correctness``."""
        from hyperbase.correctness import check_correctness

        return check_correctness(self)

    def normalise(self) -> Hyperedge:
        """Delegate to ``hyperbase.transforms.normalise``."""
        from hyperbase.transforms import normalise

        return normalise(self)

    ############
    # patterns #
    ############
    def is_wildcard(self) -> bool:
        """Delegate to ``hyperbase.patterns.checks.is_wildcard``."""
        from hyperbase.patterns.checks import is_wildcard

        return is_wildcard(self)

    def is_pattern(self) -> bool:
        """Delegate to ``hyperbase.patterns.checks.is_pattern``."""
        from hyperbase.patterns.checks import is_pattern

        return is_pattern(self)

    def is_fun_pattern(self) -> bool:
        """Delegate to ``hyperbase.patterns.checks.is_fun_pattern``."""
        from hyperbase.patterns.checks import is_fun_pattern

        return is_fun_pattern(self)

    #############
    # variables #
    #############
    def is_variable(self) -> bool:
        """Delegate to ``hyperbase.patterns.checks.is_variable``."""
        from hyperbase.patterns.checks import is_variable

        return is_variable(self)

    def contains_variable(self) -> bool:
        """Delegate to ``hyperbase.patterns.checks.contains_variable``."""
        from hyperbase.patterns.checks import contains_variable

        return contains_variable(self)

    def variable_name(self) -> str:
        """Delegate to ``hyperbase.patterns.checks.variable_name``."""
        from hyperbase.patterns.checks import variable_name

        return variable_name(self)

    def __str__(self) -> str:
        """Space-separated string forms of the children, in parentheses."""
        # Children that evaluate as false are left out of the output.
        s = " ".join([str(edge) for edge in self._edges if edge])
        return f"({s})"

    def __repr__(self) -> str:
        """Same as ``str(self)``."""
        return str(self)

atom property

True if edge is an atom.

not_atom property

True if edge is not an atom.

t property

Edge type. (this property is a shortcut for Hyperedge.type())

mt property

Edge main type. (this property is a shortcut for Hyperedge.mtype())

ct property

Edge connector type. (this property is a shortcut for Hyperedge.connector_type())

cmt property

Edge connector main type. (this property is a shortcut for Hyperedge.connector_mtype())

match(pattern)

Match this edge against a pattern. See match_pattern for details.

Source code in src/hyperbase/hyperedge.py
def match(
    self, pattern: Hyperedge | str | list[object] | tuple[object, ...]
) -> list[dict[str, Hyperedge]]:
    """Match this edge against a pattern.

    Thin wrapper that calls ``hyperbase.patterns.match_pattern`` with this
    edge and *pattern* and returns its result unchanged.  See
    ``match_pattern`` for the matching rules and the meaning of the returned
    dictionaries.
    """
    # Imported at call time rather than at module level
    # (presumably to avoid a circular import -- TODO confirm).
    from hyperbase.patterns import match_pattern

    return match_pattern(self, pattern)

label()

Generate human-readable label for edge.

Source code in src/hyperbase/hyperedge.py
def label(self) -> str:
    """Generate human-readable label for edge.

    The labels of the children are joined with single spaces.  An edge with
    exactly two children keeps its order.  Otherwise the connector is dropped
    when the last part of the connector atom is ".", and is moved behind the
    first argument in every other case.
    """
    connector = self.connector_atom()
    if len(self) == 2:
        ordered = self
    else:
        drop_connector = connector is not None and connector.parts()[-1] == "."
        ordered = self[1:] if drop_connector else (self[1], self[0], *self[2:])
    return " ".join(child.label() for child in ordered)

inner_atom()

The inner atom inside of a modifier structure.

For example, consider: (red/M shoes/C). The inner atom is: shoes/C. Or, the more complex case: ((and/J slow/M steady/M) go/P), which yields: go/P

This method should not be used on structures that contain more than one inner atom, for example concepts constructed with builders or relations.

The inner atom of an atom is itself.

Source code in src/hyperbase/hyperedge.py
def inner_atom(self) -> Atom:
    """The inner atom inside of a modifier structure.

    For example, consider:
    (red/M shoes/C)
    The inner atom is:
    shoes/C
    Or, the more complex case:
    ((and/J slow/M steady/M) go/P)
    Yields:
    go/P

    This method should not be used on structures that contain more than
    one inner atom, for example concepts constructed with builders or
    relations.

    The inner atom of an atom is itself.
    """
    # Follow the second element (the thing being modified) recursively.
    return self[1].inner_atom()  # type: ignore[no-any-return]

connector_atom()

The inner atom of the connector.

For example, consider: (does/M (not/M like/P.so) john/C chess/C). The connector atom is: like/P.so

The connector atom of an atom is None.

Source code in src/hyperbase/hyperedge.py
def connector_atom(self) -> Atom | None:
    """The inner atom of the connector.

    For example, consider:
    (does/M (not/M like/P.so) john/C chess/C)
    The connector atom is:
    like/P.so

    The connector atom of an atom is None.
    """
    # The connector is the first element; resolve it down to its inner atom.
    return self[0].inner_atom()  # type: ignore[no-any-return]

atoms()

Returns the set of atoms contained in the edge.

For example, consider the edge: (the/md (of/br mayor/cc (the/md city/cs))). In this case, edge.atoms() returns the set: {the/md, of/br, mayor/cc, city/cs}

Source code in src/hyperbase/hyperedge.py
def atoms(self) -> set[Atom]:
    """Collect the distinct atoms found anywhere inside the edge.

    For example, for the edge:
    (the/md (of/br mayor/cc (the/md city/cs)))
    edge.atoms() returns the set:
    {the/md, of/br, mayor/cc, city/cs}
    """
    return {atom for child in self for atom in child.atoms()}

all_atoms()

Returns a list of all the atoms contained in the edge. Unlike atoms(), which does not return repeated atoms, all_atoms() does return repeated atoms if they are different objects.

For example, consider the edge: (the/md (of/br mayor/cc (the/md city/cs))) in this case, edge.all_atoms() returns: [the/md, of/br, mayor/cc, the/md, city/cs]

Source code in src/hyperbase/hyperedge.py
def all_atoms(self) -> list[Atom]:
    """List every atom occurrence inside the edge, in order.

    In contrast to atoms(), repeated atoms are kept: an atom that occurs
    several times as different objects appears several times in the result.

    For example, for the edge:
    (the/md (of/br mayor/cc (the/md city/cs)))
    edge.all_atoms() returns:
    [the/md, of/br, mayor/cc, the/md, city/cs]
    """
    return [atom for child in self for atom in child.all_atoms()]

size()

The size of an edge is its total number of atoms, at all depths.

Source code in src/hyperbase/hyperedge.py
def size(self) -> int:
    """Total number of atoms in the edge, at all depths.

    The sum of the children's sizes is stored in the edge's cache on the
    first call and read back from there afterwards.
    """
    cache = self._cache
    if "size" in cache:
        return cache["size"]
    cache["size"] = sum(child.size() for child in self)
    return cache["size"]

depth()

Returns maximal depth of edge, an atom has depth 0.

Source code in src/hyperbase/hyperedge.py
def depth(self) -> int:
    """Maximal nesting depth of the edge; an atom has depth 0.

    The result is one more than the deepest child (or 1 when there are no
    children) and is stored in the edge's cache after the first call.
    """
    cache = self._cache
    if "depth" in cache:
        return cache["depth"]
    # The leading 0 mirrors the starting value used when there are no children.
    deepest_child = max([0, *(child.depth() for child in self)])
    cache["depth"] = deepest_child + 1
    return cache["depth"]

contains(needle)

Checks recursively if 'needle' is contained in edge.

Source code in src/hyperbase/hyperedge.py
def contains(self, needle: Hyperedge) -> bool:
    """Check recursively whether 'needle' occurs somewhere inside the edge.

    Each child is compared with 'needle' and, failing that, searched in
    turn.  The edge itself is not compared with 'needle'.
    """
    return any(child == needle or child.contains(needle) for child in self)

subedges()

Returns all the subedges contained in the edge, including atoms and itself.

Source code in src/hyperbase/hyperedge.py
def subedges(self) -> set[Hyperedge]:
    """Return the edge itself together with every subedge it contains,
    atoms included.
    """
    return {self}.union(*(child.subedges() for child in self))

replace_atom(old, new, unique=False)

Returns edge built by replacing every instance of 'old' in this edge with 'new'.

Keyword argument: unique -- match only the exact same instance of the atom, i.e. UniqueAtom(self) == UniqueAtom(old) (default: False)

Source code in src/hyperbase/hyperedge.py
def replace_atom(
    self, old: Atom, new: Hyperedge, unique: bool = False
) -> Hyperedge:
    """Returns edge built by replacing every instance of 'old' in
    this edge with 'new'.

    Thin wrapper around ``hyperbase.transforms.replace_atom``.

    Keyword argument:
    unique -- match only the exact same instance of the atom, i.e.
    UniqueAtom(self) == UniqueAtom(old) (default: False)
    """
    # Imported at call time rather than at module level
    # (presumably to avoid a circular import -- TODO confirm).
    from hyperbase.transforms import replace_atom

    return replace_atom(self, old, new, unique=unique)

simplify(subtypes=False, namespaces=False)

Returns a version of the edge with simplified atoms.

Keyword arguments: subtypes -- include subtypes (default: False). namespaces -- include namespaces (default: False).

Source code in src/hyperbase/hyperedge.py
def simplify(self, subtypes: bool = False, namespaces: bool = False) -> Hyperedge:
    """Returns a version of the edge with simplified atoms.

    Thin wrapper around ``hyperbase.transforms.simplify``.

    Keyword arguments:
    subtypes -- include subtypes (default: False).
    namespaces -- include namespaces (default: False).
    """
    # Imported at call time rather than at module level
    # (presumably to avoid a circular import -- TODO confirm).
    from hyperbase.transforms import simplify

    return simplify(self, subtypes=subtypes, namespaces=namespaces)

type()

Returns the type of this edge as a string. Type inference is performed.

Source code in src/hyperbase/hyperedge.py
def type(self) -> str:
    """Return the type of this edge as a string, using type inference.

    The type follows from the main type of the connector (the first child):

    - predicate: relation, keeping the connector's subtype suffix;
    - trigger: specifier, keeping the connector's subtype suffix;
    - builder: concept, keeping the connector's subtype suffix;
    - modifier: the full type of the second child;
    - conjunction: the main type of the second child.

    The result is stored in the edge's cache.  Raises RuntimeError when the
    connector has any other type, or when a modifier/conjunction edge has
    fewer than two children.
    """
    cache = self._cache
    if "type" in cache:
        return cache["type"]

    conn_type = self[0].type()
    conn_main = conn_type[0]

    if conn_main == EdgeType.PREDICATE:
        inferred = EdgeType.RELATION + conn_type[1:]
    elif conn_main == EdgeType.MODIFIER:
        if len(self) < 2:
            raise RuntimeError(
                f"Edge is malformed, type cannot be determined: {self!s}"
            )
        # A modifier does not change the type of what it modifies.
        inferred = self[1].type()
    elif conn_main == EdgeType.TRIGGER:
        inferred = EdgeType.SPECIFIER + conn_type[1:]
    elif conn_main == EdgeType.BUILDER:
        inferred = EdgeType.CONCEPT + conn_type[1:]
    elif conn_main == EdgeType.CONJUNCTION:
        if len(self) < 2:
            raise RuntimeError(
                f"Edge is malformed, type cannot be determined: {self!s}"
            )
        inferred = self[1].mtype()
    else:
        raise RuntimeError(
            f"Edge is malformed, type cannot be determined: {self!s}"
        )

    cache["type"] = inferred
    return inferred

connector_type()

Returns the type of the edge's connector. If the edge has no connector (i.e. it's an atom), then None is returned.

Source code in src/hyperbase/hyperedge.py
def connector_type(self) -> str | None:
    """Return the type of the edge's connector (its first child).

    If the edge has no connector (i.e. it's an atom), then None is
    returned.  The value is stored in the edge's cache after the first call.
    """
    cache = self._cache
    if "connector_type" in cache:
        return cache["connector_type"]
    cache["connector_type"] = self[0].type()
    return cache["connector_type"]

mtype()

Returns the main type of this edge as a string of one character. Type inference is performed.

Source code in src/hyperbase/hyperedge.py
def mtype(self) -> str:
    """Returns the main type of this edge as a string of one character.
    Type inference is performed.

    The main type is the first character of the string returned by type().
    """
    return self.type()[0]

connector_mtype()

Returns the main type of the edge's connector. If the edge has no connector (i.e. it's an atom), then None is returned.

Source code in src/hyperbase/hyperedge.py
def connector_mtype(self) -> str | None:
    """Return the main type of the edge's connector.

    This is the first character of connector_type().  If the edge has no
    connector (i.e. it's an atom), or the connector type is empty, None is
    returned.
    """
    conn_type = self.connector_type()
    return conn_type[0] if conn_type else None

atom_with_type(atom_type)

Returns the first atom found in the edge that has the given 'atom_type', or whose type starts with 'atom_type'. If no such atom is found, returns None.

For example, given the edge (+/B a/Cn b/Cp) and the 'atom_type' 'C', this function returns: a/Cn. If the 'atom_type' is 'Cp', then it will return: b/Cp

Source code in src/hyperbase/hyperedge.py
def atom_with_type(self, atom_type: str) -> Atom | None:
    """Return the first atom in the edge that has the given 'atom_type',
    or whose type starts with 'atom_type'.  Return None if there is none.

    The children are searched in order, each one recursively.

    For example, given the edge (+/B a/Cn b/Cp) and the 'atom_type'
    'C', this function returns:
    a/Cn
    If the 'atom_type' is 'Cp', then it will return:
    b/Cp
    """
    for child in self:
        if found := child.atom_with_type(atom_type):
            return found
    return None

argroles()

Returns the argument roles string of the edge, if it exists. Otherwise returns empty string.

Argument roles can be returned for the entire edge that they apply to, which can be a relation (R) or a concept (C). For example:

((not/M is/P.sc) bob/C sad/C) has argument roles "sc", (of/B.ma city/C berlin/C) has argument roles "ma".

Argument roles can also be returned for the connectors that define the outer edge, which can be of type predicate (P) or builder (B). For example:

(not/M is/P.sc) has argument roles "sc", of/B.ma has argument roles "ma".

Source code in src/hyperbase/hyperedge.py
def argroles(self) -> str:
    """Return the argument roles string of the edge, or an empty string if
    it has none.

    Argument roles can be returned for the entire edge that they apply to,
    which can be a relation (R) or a concept (C). For example:

    ((not/M is/P.sc) bob/C sad/C) has argument roles "sc",
    (of/B.ma city/C berlin/C) has argument roles "ma".

    Argument roles can also be returned for the connectors that define
    the outer edge, which can be of type predicate (P) or builder (B). For
    example:

    (not/M is/P.sc) has argument roles "sc",
    of/B.ma has argument roles "ma".

    The result is stored in the edge's cache after the first call.
    """
    cache = self._cache
    if "argroles" in cache:
        return cache["argroles"]

    edge_mtype = self.mtype()
    applies_to_whole_edge = edge_mtype in {
        EdgeType.RELATION,
        EdgeType.CONCEPT,
    } and self[0].mtype() in {EdgeType.BUILDER, EdgeType.PREDICATE}

    if applies_to_whole_edge:
        # Relation/concept: the roles live on the connector.
        roles = self[0].argroles()
    elif edge_mtype in {EdgeType.BUILDER, EdgeType.PREDICATE}:
        # The edge is itself a connector: the roles come from its second child.
        roles = self[1].argroles()
    else:
        roles = ""

    cache["argroles"] = roles
    return roles

replace_argroles(argroles)

Returns an edge with the argroles of the connector atom replaced with the provided string. Returns same edge if the atom does not contain a role part.

Source code in src/hyperbase/hyperedge.py
def replace_argroles(self, argroles: str | None) -> Hyperedge:
    """Return an edge in which the argroles of the connector atom are
    replaced by *argroles*.

    The same edge is returned if the atom does not contain a role part.
    The work is delegated to ``hyperbase.transforms.replace_argroles``.
    """
    # Function-scope import, kept as in the original (presumably to avoid
    # a circular import between the two modules -- TODO confirm).
    from hyperbase.transforms import replace_argroles as _replace_argroles

    return _replace_argroles(self, argroles)

add_argument(edge, argrole, pos=None)

Returns a new edge with the provided edge and its argroles inserted at the specified position. If pos is not provided, the argument is appended at the end.

Source code in src/hyperbase/hyperedge.py
def add_argument(
    self, edge: Hyperedge, argrole: str, pos: int | None = None
) -> Hyperedge:
    """Return a new edge with *edge* and its *argrole* inserted at *pos*.

    When *pos* is not provided, the argument is appended at the end.
    The work is delegated to ``hyperbase.transforms.add_argument``.
    """
    # Function-scope import, kept as in the original (presumably to avoid
    # a circular import between the two modules -- TODO confirm).
    from hyperbase.transforms import add_argument as _add_argument

    return _add_argument(self, edge, argrole, pos)

arguments_with_role(argrole)

Returns the list of edges with the given argument role.

Source code in src/hyperbase/hyperedge.py
def arguments_with_role(self, argrole: str) -> list[Hyperedge]:
    """Return the arguments of this edge whose argument role is *argrole*.

    The role string is read from the connector (``self[0]``). Surrounding
    braces and the characters ",", "[" and "]" are discarded, after which
    the i-th remaining character is the role of the i-th argument
    (``self[i + 1]``). Roles without a matching argument are ignored.
    """
    roles = self[0].argroles()
    if roles.startswith("{"):
        roles = roles[1:-1]
    roles = "".join(ch for ch in roles if ch not in ",[]")

    last_index = len(self) - 1
    return [
        self[index + 1]
        for index, current in enumerate(roles)
        if current == argrole and index < last_index
    ]

Atom dataclass

Bases: Hyperedge

Atomic hyperedge.

Source code in src/hyperbase/hyperedge.py
@dataclass(frozen=True, init=False, eq=False, repr=False)
class Atom(Hyperedge):
    """Atomic hyperedge.

    An atom is described by a single string made of "/"-separated parts
    (see :meth:`parts`): the root first, then the role. Equality and
    hashing depend on that string only.
    """

    # Full atom string, e.g. "hyperbase/Cp.s/1".
    atom_str: str
    # When True, ``str()`` renders the atom wrapped in parentheses.
    parens: bool

    def __init__(
        self,
        atom_str: str,
        parens: bool = False,
        text: str | None = None,
    ) -> None:
        """Initialise the atom from its string form.

        The dataclass is frozen, so attributes are written through
        ``object.__setattr__``. ``_edges`` is set to an empty tuple and
        ``_cache`` to a fresh dict used for memoizing derived values.
        """
        object.__setattr__(self, "atom_str", atom_str)
        object.__setattr__(self, "parens", parens)
        object.__setattr__(self, "text", text)
        object.__setattr__(self, "_edges", ())
        object.__setattr__(self, "_cache", {})

    def __hash__(self) -> int:
        """Hash of the atom string, consistent with :meth:`__eq__`."""
        return hash(self.atom_str)

    def __eq__(self, other: object) -> bool:
        """Two atoms are equal when their atom strings are equal.

        ``parens`` and ``text`` do not take part in the comparison;
        anything that is not an ``Atom`` compares unequal.
        """
        if isinstance(other, Atom):
            return self.atom_str == other.atom_str
        return False

    @property
    def atom(self) -> bool:
        """True if edge is an atom."""
        return True

    @property
    def not_atom(self) -> bool:
        """True if edge is not an atom."""
        return False

    def parts(self) -> list[str]:
        """Splits atom into its parts."""
        return self.atom_str.split("/")

    def root(self) -> str:
        """Extracts the root of an atom
        (e.g. the root of hyperbase/C/1 is hyperbase)."""
        return self.parts()[0]

    def replace_atom_part(self, part_pos: int, part: str) -> Atom:
        """Build a new atom by replacing an atom part in a given atom."""
        parts = self.parts()
        parts[part_pos] = part
        # Empty parts are dropped when the string is rebuilt.
        atom_str = "/".join([part for part in parts if part])
        return Atom(atom_str)

    def label(self) -> str:
        """Generate human-readable label from entity."""
        from hyperbase.constants import atom_decode

        return atom_decode(self.root())

    def inner_atom(self) -> Atom:
        """Return the atom itself."""
        return self

    def connector_atom(self) -> Atom | None:
        """Always ``None`` for an atom."""
        return None

    def atoms(self) -> set[Atom]:
        """Return a set containing only this atom."""
        return {self}

    def all_atoms(self) -> list[Atom]:
        """Return a list containing only this atom."""
        return [self]

    def size(self) -> int:
        """The size of an atom is always 1."""
        return 1

    def depth(self) -> int:
        """The depth of an atom is always 0."""
        return 0

    def roots(self) -> Atom:
        """Returns edge with root-only atoms."""
        return Atom(self.root())

    def contains(self, needle: Hyperedge) -> bool:
        """True if *needle* is equal to this atom."""
        return self == needle

    def subedges(self) -> set[Hyperedge]:
        """Return a set containing only this atom."""
        return {self}

    def role(self) -> list[str]:
        """Returns the role of this atom as a list of the subrole strings.

        The role of an atom is its second part, right after the root.
        A dot notation is used to separate the subroles. For example,
        the role of hyperbase/Cp.s/1 is:

            Cp.s

        For this case, this function returns:

            ['Cp', 's']

        If the atom only has a root, it is assumed to be a conjunction.
        In this case, this function returns the role with just the
        generic conjunction type:

            ['J'].
        """
        # The list is memoized; callers receive the cached object itself.
        if "role" in self._cache:
            return self._cache["role"]
        parts: list[str] = self.atom_str.split("/")
        result = list("J") if len(parts) < 2 else parts[1].split(".")
        self._cache["role"] = result
        return result

    def type(self) -> str:
        """Returns the type of the atom (first subrole, default ``'J'``)."""
        if "type" in self._cache:
            return self._cache["type"]
        result = self.role()[0]
        self._cache["type"] = result
        return result

    def connector_type(self) -> str | None:
        """Always ``None`` for an atom."""
        return None

    def atom_with_type(self, atom_type: str) -> Atom | None:
        """Return this atom if its type starts with *atom_type*, else
        ``None``."""
        et = self.type()
        n = len(atom_type)
        if len(et) >= n and et[:n] == atom_type:
            return self
        else:
            return None

    def argroles(self) -> str:
        """Return the argument roles of this atom, or "" if it has none.

        Only atoms whose ``mtype()`` is builder or predicate carry
        argument roles; for those, the roles are the second subrole of
        :meth:`role` (e.g. "sc" for is/P.sc). The result is memoized.
        """
        if "argroles" in self._cache:
            return self._cache["argroles"]
        et = self.mtype()
        if et not in {EdgeType.BUILDER, EdgeType.PREDICATE}:
            result = ""
        else:
            role = self.role()
            result = role[1] if len(role) >= 2 else ""
        self._cache["argroles"] = result
        return result

    def remove_argroles(self) -> Atom:
        """Delegate to ``hyperbase.transforms.replace_argroles`` with
        ``None`` as the new argroles."""
        from hyperbase.transforms import replace_argroles

        return replace_argroles(self, None)  # type: ignore[return-value]

    def arguments_with_role(self, argrole: str) -> list[Hyperedge]:
        """Always an empty list for an atom."""
        return []

    def __repr__(self) -> str:
        """Same text as ``str(self)``."""
        return str(self)

    def __str__(self) -> str:
        """The atom string, wrapped in parentheses when ``parens`` is set."""
        if self.parens:
            return f"({self.atom_str})"
        else:
            return self.atom_str

atom property

True if edge is an atom.

not_atom property

True if edge is not an atom.

parts()

Splits atom into its parts.

Source code in src/hyperbase/hyperedge.py
def parts(self) -> list[str]:
    """Return the "/"-separated parts of the atom string."""
    separator = "/"
    return self.atom_str.split(separator)

root()

Extracts the root of an atom (e.g. the root of hyperbase/C/1 is hyperbase).

Source code in src/hyperbase/hyperedge.py
def root(self) -> str:
    """Return the root of the atom, i.e. its first part
    (e.g. the root of hyperbase/C/1 is hyperbase)."""
    components = self.parts()
    return components[0]

replace_atom_part(part_pos, part)

Build a new atom by replacing an atom part in a given atom.

Source code in src/hyperbase/hyperedge.py
def replace_atom_part(self, part_pos: int, part: str) -> Atom:
    """Return a new atom in which the part at *part_pos* is *part*.

    Empty parts are left out when the atom string is rebuilt.
    """
    pieces = self.parts()
    pieces[part_pos] = part
    kept = [piece for piece in pieces if piece]
    return Atom("/".join(kept))

label()

Generate human-readable label from entity.

Source code in src/hyperbase/hyperedge.py
def label(self) -> str:
    """Return a human-readable label, obtained by passing the atom's root
    through ``hyperbase.constants.atom_decode``."""
    from hyperbase.constants import atom_decode

    root_part = self.root()
    return atom_decode(root_part)

roots()

Returns edge with root-only atoms.

Source code in src/hyperbase/hyperedge.py
def roots(self) -> Atom:
    """Return a new atom built from this atom's root only."""
    root_only = self.root()
    return Atom(root_only)

role()

Returns the role of this atom as a list of the subrole strings.

The role of an atom is its second part, right after the root. A dot notation is used to separate the subroles. For example, the role of hyperbase/Cp.s/1 is:

Cp.s

For this case, this function returns:

['Cp', 's']

If the atom only has a root, it is assumed to be a conjunction. In this case, this function returns the role with just the generic conjunction type:

['J'].
Source code in src/hyperbase/hyperedge.py
def role(self) -> list[str]:
    """Return the role of this atom as a list of subrole strings.

    The role is the second "/"-separated part of the atom, right after
    the root, and its subroles are separated by dots. For example, the
    role of hyperbase/Cp.s/1 is:

        Cp.s

    and this function returns:

        ['Cp', 's']

    An atom that only has a root is assumed to be a conjunction, in
    which case the role is just the generic conjunction type:

        ['J'].

    The list is memoized in ``self._cache`` under the key "role".
    """
    cache = self._cache
    if "role" not in cache:
        pieces = self.atom_str.split("/")
        cache["role"] = pieces[1].split(".") if len(pieces) >= 2 else ["J"]
    return cache["role"]

type()

Returns the type of the atom (first subrole, default 'J').

Source code in src/hyperbase/hyperedge.py
def type(self) -> str:
    """Return the type of the atom: the first subrole of its role
    (``'J'`` when the atom has no role part).

    The value is memoized in ``self._cache`` under the key "type".
    """
    cache = self._cache
    if "type" not in cache:
        cache["type"] = self.role()[0]
    return cache["type"]

Parsers module

hyperbase.parsers

Parser

Source code in src/hyperbase/parsers/parser.py
class Parser:
    """Base class for Hyperbase parser plugins.

    Subclasses are expected to override :meth:`get_sentences` and
    :meth:`parse_sentence` (both raise ``NotImplementedError`` here) and
    may override :meth:`parse_batch` with a true batched implementation.
    """

    def __init__(self, params: dict[str, Any] | None = None) -> None:
        """Store *params* and read ``max_depth`` from them.

        ``max_depth`` falls back to ``DEFAULT_MAX_DEPTH`` when absent and
        is coerced with ``int()``.
        """
        self.params: dict[str, Any] = params or {}
        self.max_depth: int = int(self.params.get("max_depth", DEFAULT_MAX_DEPTH))

    @classmethod
    def accepted_params(cls) -> dict[str, dict[str, Any]]:
        """Return the set of parameters this parser accepts.

        Each key is a parameter name. The value is a dict with:
        - ``"type"``: the expected Python type (e.g. ``str``, ``int``).
        - ``"default"``: the default value (or ``None`` if required).
        - ``"description"``: a short human-readable description.
        - ``"required"``: whether the parameter must be provided.

        Subclasses should merge their own parameters with the result of
        ``super().accepted_params()`` so that common parameters like
        ``max_depth`` remain discoverable.
        """
        return {
            "max_depth": {
                "type": int,
                "default": DEFAULT_MAX_DEPTH,
                "description": (
                    "Maximum allowed nesting depth for produced edges. "
                    "Sentences whose parse exceeds this depth are rejected "
                    "rather than processed, to avoid pathological inputs "
                    "blowing the Python stack."
                ),
                "required": False,
            },
        }

    @classmethod
    def cache_key_from_settings(cls, settings: dict[str, Any]) -> tuple:
        """Build a cache key tuple from a settings dict.

        The default implementation produces one ``(name, value)`` pair
        per entry in :meth:`accepted_params`, sorted by name. Two
        settings dicts that yield the same key are guaranteed to
        produce equivalent parser instances.
        """
        names = sorted(cls.accepted_params())
        return tuple((name, settings.get(name)) for name in names)

    @classmethod
    def format_cache_key(cls, cache_key: tuple) -> str:
        """Render a cache key produced by :meth:`cache_key_from_settings`
        as a human-readable string for the REPL ``/parsers`` command."""
        return ", ".join(f"{name}={value}" for name, value in cache_key)

    def install_repl(self, session: Any) -> None:  # noqa: ANN401
        """Hook for parser plugins to extend the Hyperbase REPL.

        Override this to register parser-specific REPL behavior on
        *session* (a :class:`hyperbase.cli.repl.ReplSession`). The
        session exposes the following registration methods:

        - ``register_command(name, help, handler)`` -- add a slash
          command callable as ``/name``.
        - ``register_setting(name, default, type_, description="")``
          -- expose an extra REPL-only setting (e.g. a display
          toggle) that can be changed via ``/set``.
        - ``register_pre_result_hook(hook)`` -- run *hook* after
          parsing but before the parse result panel is rendered.
        - ``register_post_result_hook(hook)`` -- run *hook* after the
          parse result panel is rendered.
        - ``register_stats_provider(provider)`` -- supply extra
          ``(label, value)`` rows for the statistics table.

        Hooks receive a :class:`~hyperbase.parsers.repl_api.ReplContext`
        object. The default implementation is a no-op.
        """

    def get_sentences(self, text: str) -> list[str]:
        """Split *text* into sentences. Must be implemented by subclasses."""
        raise NotImplementedError

    def parse_sentence(self, sentence: str) -> list[ParseResult]:
        """Parse one sentence. Must be implemented by subclasses."""
        raise NotImplementedError

    def parse_batch(self, sentences: list[str]) -> list[list[ParseResult]]:
        """Parse multiple sentences. Subclasses may override with a
        true batched implementation (e.g. a single CT2 call)."""
        return [self.parse_sentence(sentence) for sentence in sentences]

    def parse(
        self, text: str, batch_size: int = 8, progress: bool = False
    ) -> list[ParseResult]:
        """Split *text* into sentences, then parse them in batches.

        Sentences with fewer than two whitespace-separated tokens are
        skipped. When *progress* is true, a ``tqdm`` progress bar is shown
        over the batches.

        Returns a flat list of parse results across all sentences.

        Raises ``ValueError`` if *batch_size* is smaller than 1.
        """
        # A negative step would yield an empty range and silently drop every
        # sentence; zero already raised ValueError inside range().
        if batch_size < 1:
            raise ValueError(
                f"batch_size must be a positive integer, got {batch_size!r}"
            )
        sentences = [s for s in self.get_sentences(text) if len(s.split()) > 1]
        batch_range = range(0, len(sentences), batch_size)
        if progress:
            from tqdm import tqdm  # type: ignore[import-untyped]

            batch_range = tqdm(batch_range, desc="Parsing batches", leave=False)
        results: list[ParseResult] = []
        for i in batch_range:
            batch = sentences[i : i + batch_size]
            for sentence_results in self.parse_batch(batch):
                results.extend(sentence_results)
        return results

    def parse_to_jsonl(
        self,
        text: str,
        output: str,
        batch_size: int = 8,
        progress: bool = False,
    ) -> None:
        """Parse *text* and write results to a JSONL file.

        Each ParseResult is serialized as one JSON line. The file is
        written as UTF-8 regardless of the platform's default encoding.
        """
        with open(output, "w", encoding="utf-8") as f:
            for result in self.parse(text, batch_size=batch_size, progress=progress):
                f.write(result.to_json() + "\n")

    def parse_source(
        self,
        source: str,
        reader: str = "auto",
        batch_size: int = 8,
        progress: bool = False,
    ) -> Iterator[list[ParseResult]]:
        """Read text blocks from *source* and parse each one.

        Automatically selects (or explicitly uses) a reader, then
        yields one list of parse results per text block.
        """
        from hyperbase.readers.reader import get_reader

        rdr = get_reader(source, reader=reader)
        yield from rdr.read_and_parse(
            source,
            self,
            batch_size=batch_size,
            progress=progress,
        )

    def parse_source_to_jsonl(
        self,
        source: str,
        output: str,
        reader: str = "auto",
        batch_size: int = 8,
        progress: bool = False,
    ) -> None:
        """Read *source*, parse every block, and write results to a JSONL file.

        Each ParseResult is serialized as one JSON line. The file is
        written as UTF-8 regardless of the platform's default encoding.
        """
        with open(output, "w", encoding="utf-8") as f:
            for results in self.parse_source(
                source,
                reader=reader,
                batch_size=batch_size,
                progress=progress,
            ):
                for result in results:
                    f.write(result.to_json() + "\n")

accepted_params() classmethod

Return the set of parameters this parser accepts.

Each key is a parameter name. The value is a dict with:

  • "type": the expected Python type (e.g. str, int).
  • "default": the default value (or None if required).
  • "description": a short human-readable description.
  • "required": whether the parameter must be provided.

Subclasses should merge their own parameters with the result of super().accepted_params() so that common parameters like max_depth remain discoverable.

Source code in src/hyperbase/parsers/parser.py
@classmethod
def accepted_params(cls) -> dict[str, dict[str, Any]]:
    """Describe the parameters this parser accepts.

    The result maps each parameter name to a dict with these keys:
    - ``"type"``: the expected Python type (e.g. ``str``, ``int``).
    - ``"default"``: the default value (or ``None`` if required).
    - ``"description"``: a short human-readable description.
    - ``"required"``: whether the parameter must be provided.

    Subclasses should merge their own parameters with the result of
    ``super().accepted_params()`` so that common parameters like
    ``max_depth`` remain discoverable.
    """
    max_depth_description = (
        "Maximum allowed nesting depth for produced edges. "
        "Sentences whose parse exceeds this depth are rejected "
        "rather than processed, to avoid pathological inputs "
        "blowing the Python stack."
    )
    max_depth_spec = {
        "type": int,
        "default": DEFAULT_MAX_DEPTH,
        "description": max_depth_description,
        "required": False,
    }
    return {"max_depth": max_depth_spec}

cache_key_from_settings(settings) classmethod

Build a cache key tuple from a settings dict.

The default implementation produces one (name, value) pair per entry in :meth:accepted_params, sorted by name. Two settings dicts that yield the same key are guaranteed to produce equivalent parser instances.

Source code in src/hyperbase/parsers/parser.py
@classmethod
def cache_key_from_settings(cls, settings: dict[str, Any]) -> tuple:
    """Build a cache key tuple from a settings dict.

    The key holds one ``(name, value)`` pair for every name in
    :meth:`accepted_params`, ordered by name; a name missing from
    *settings* contributes ``None`` as its value. Two settings dicts
    that yield the same key are guaranteed to produce equivalent
    parser instances.
    """
    pairs = []
    for name in sorted(cls.accepted_params()):
        pairs.append((name, settings.get(name)))
    return tuple(pairs)

format_cache_key(cache_key) classmethod

Render a cache key produced by :meth:cache_key_from_settings as a human-readable string for the REPL /parsers command.

Source code in src/hyperbase/parsers/parser.py
@classmethod
def format_cache_key(cls, cache_key: tuple) -> str:
    """Render a cache key produced by :meth:`cache_key_from_settings`
    as a human-readable ``name=value, ...`` string for the REPL
    ``/parsers`` command."""
    rendered = []
    for name, value in cache_key:
        rendered.append(f"{name}={value}")
    return ", ".join(rendered)

install_repl(session)

Hook for parser plugins to extend the Hyperbase REPL.

Override this to register parser-specific REPL behavior on session (a :class:hyperbase.cli.repl.ReplSession). The session exposes the following registration methods:

  • register_command(name, help, handler) -- add a slash command callable as /name.
  • register_setting(name, default, type_, description="") -- expose an extra REPL-only setting (e.g. a display toggle) that can be changed via /set.
  • register_pre_result_hook(hook) -- run hook after parsing but before the parse result panel is rendered.
  • register_post_result_hook(hook) -- run hook after the parse result panel is rendered.
  • register_stats_provider(provider) -- supply extra (label, value) rows for the statistics table.

Hooks receive a :class:~hyperbase.parsers.repl_api.ReplContext object. The default implementation is a no-op.

Source code in src/hyperbase/parsers/parser.py
def install_repl(self, session: Any) -> None:  # noqa: ANN401
    """Extension point that lets a parser plugin customise the Hyperbase REPL.

    Subclasses override this method and register their REPL behavior on
    *session* (a :class:`hyperbase.cli.repl.ReplSession`) through its
    registration methods:

    - ``register_command(name, help, handler)`` -- add a slash
      command callable as ``/name``.
    - ``register_setting(name, default, type_, description="")``
      -- expose an extra REPL-only setting (e.g. a display
      toggle) that can be changed via ``/set``.
    - ``register_pre_result_hook(hook)`` -- run *hook* after
      parsing but before the parse result panel is rendered.
    - ``register_post_result_hook(hook)`` -- run *hook* after the
      parse result panel is rendered.
    - ``register_stats_provider(provider)`` -- supply extra
      ``(label, value)`` rows for the statistics table.

    Hooks are called with a
    :class:`~hyperbase.parsers.repl_api.ReplContext` object. This base
    implementation does nothing.
    """
    return None

parse_batch(sentences)

Parse multiple sentences. Subclasses may override with a true batched implementation (e.g. a single CT2 call).

Source code in src/hyperbase/parsers/parser.py
def parse_batch(self, sentences: list[str]) -> list[list[ParseResult]]:
    """Parse several sentences, one :meth:`parse_sentence` call each.

    The result has one entry per input sentence, in the same order.
    Subclasses may override this with a true batched implementation
    (e.g. a single CT2 call).
    """
    batch_results: list[list[ParseResult]] = []
    for sentence in sentences:
        batch_results.append(self.parse_sentence(sentence))
    return batch_results

parse(text, batch_size=8, progress=False)

Split text into sentences, then parse all sentences in batches.

Returns a flat list of parse results across all sentences.

Source code in src/hyperbase/parsers/parser.py
def parse(
    self, text: str, batch_size: int = 8, progress: bool = False
) -> list[ParseResult]:
    """Split *text* into sentences, then parse them in batches.

    Sentences with fewer than two whitespace-separated tokens are
    skipped. When *progress* is true, a ``tqdm`` progress bar is shown
    over the batches.

    Returns a flat list of parse results across all sentences.

    Raises ``ValueError`` if *batch_size* is smaller than 1.
    """
    # A negative step would yield an empty range and silently drop every
    # sentence; zero already raised ValueError inside range().
    if batch_size < 1:
        raise ValueError(
            f"batch_size must be a positive integer, got {batch_size!r}"
        )
    sentences = [s for s in self.get_sentences(text) if len(s.split()) > 1]
    batch_range = range(0, len(sentences), batch_size)
    if progress:
        from tqdm import tqdm  # type: ignore[import-untyped]

        batch_range = tqdm(batch_range, desc="Parsing batches", leave=False)
    results: list[ParseResult] = []
    for start in batch_range:
        batch = sentences[start : start + batch_size]
        for sentence_results in self.parse_batch(batch):
            results.extend(sentence_results)
    return results

parse_to_jsonl(text, output, batch_size=8, progress=False)

Parse text and write results to a JSONL file.

Each ParseResult is serialized as one JSON line.

Source code in src/hyperbase/parsers/parser.py
def parse_to_jsonl(
    self,
    text: str,
    output: str,
    batch_size: int = 8,
    progress: bool = False,
) -> None:
    """Parse *text* and write results to a JSONL file.

    Each ParseResult is serialized as one JSON line (the string returned
    by its ``to_json()`` followed by a newline). *output* is opened for
    writing, so an existing file is overwritten.
    """
    # Explicit UTF-8: the default encoding of open() is platform-dependent,
    # which can corrupt or reject non-ASCII text in the serialized results.
    with open(output, "w", encoding="utf-8") as f:
        for result in self.parse(text, batch_size=batch_size, progress=progress):
            f.write(result.to_json() + "\n")

parse_source(source, reader='auto', batch_size=8, progress=False)

Read text blocks from source and parse each one.

Automatically selects (or explicitly uses) a reader, then yields one list of parse results per text block.

Source code in src/hyperbase/parsers/parser.py
def parse_source(
    self,
    source: str,
    reader: str = "auto",
    batch_size: int = 8,
    progress: bool = False,
) -> Iterator[list[ParseResult]]:
    """Read text blocks from *source* and parse each of them.

    A reader is obtained from ``hyperbase.readers.reader.get_reader``
    (selected automatically, or explicitly through *reader*), and one
    list of parse results is yielded per text block.
    """
    from hyperbase.readers.reader import get_reader

    block_reader = get_reader(source, reader=reader)
    parse_options = {"batch_size": batch_size, "progress": progress}
    yield from block_reader.read_and_parse(source, self, **parse_options)

parse_source_to_jsonl(source, output, reader='auto', batch_size=8, progress=False)

Read source, parse every block, and write results to a JSONL file.

Each ParseResult is serialized as one JSON line.

Source code in src/hyperbase/parsers/parser.py
def parse_source_to_jsonl(
    self,
    source: str,
    output: str,
    reader: str = "auto",
    batch_size: int = 8,
    progress: bool = False,
) -> None:
    """Read *source*, parse every block, and write results to a JSONL file.

    Each ParseResult is serialized as one JSON line (the string returned
    by its ``to_json()`` followed by a newline). *output* is opened for
    writing, so an existing file is overwritten.
    """
    blocks = self.parse_source(
        source,
        reader=reader,
        batch_size=batch_size,
        progress=progress,
    )
    # Explicit UTF-8: the default encoding of open() is platform-dependent,
    # which can corrupt or reject non-ASCII text in the serialized results.
    with open(output, "w", encoding="utf-8") as f:
        for results in blocks:
            f.writelines(result.to_json() + "\n" for result in results)

ReplContext dataclass

Context passed to REPL hooks during parse_text.

Hooks may inspect the parse output and use session to access the console, formatter, settings, and the parser itself.

Source code in src/hyperbase/parsers/repl_api.py
@dataclass
class ReplContext:
    """Context passed to REPL hooks during ``parse_text``.

    Hooks may inspect the parse output and use ``session`` to access
    the console, formatter, settings, and the parser itself.

    This is a plain (non-frozen) dataclass. None of the fields has a
    default value, so all of them must be supplied on construction. The
    string literal that follows each field documents that field.
    """

    session: Any
    """The :class:`ReplSession` (duck-typed). Exposes ``parser``,
    ``console``, ``settings``, ``formatter``, plus the ``register_*``
    methods documented below."""

    text: str
    """The raw input text the user typed."""

    parse_result: list[ParseResult]
    """The full list returned by ``parser.parse(text)``."""

    edge: Hyperedge | None
    """The primary parsed edge (``parse_result[0].edge``), or ``None``
    if parsing produced no result."""

    tokens: list[str] | None
    """The tokens for the primary parse, or ``None`` if absent."""

    elapsed_time: float
    """Wall-clock parse time in seconds."""

session instance-attribute

The :class:ReplSession (duck-typed). Exposes parser, console, settings, formatter, plus the register_* methods documented below.

text instance-attribute

The raw input text the user typed.

parse_result instance-attribute

The full list returned by parser.parse(text).

edge instance-attribute

The primary parsed edge (parse_result[0].edge), or None if parsing produced no result.

tokens instance-attribute

The tokens for the primary parse, or None if absent.

elapsed_time instance-attribute

Wall-clock parse time in seconds.

list_parsers()

Return all installed parser plugins.

Each plugin registers via the hyperbase.parsers entry-point group in its pyproject.toml:

[project.entry-points."hyperbase.parsers"]
myparser = "my_package:MyParser"
Source code in src/hyperbase/parsers/__init__.py
def list_parsers() -> dict[str, EntryPoint]:
    """Return all installed parser plugins.

    Each plugin registers via the ``hyperbase.parsers`` entry-point group
    in its ``pyproject.toml``::

        [project.entry-points."hyperbase.parsers"]
        myparser = "my_package:MyParser"
    """
    eps = entry_points(group="hyperbase.parsers")
    return {ep.name: ep for ep in eps}

get_parser(name, params=None, **kwargs)

Instantiate a parser plugin by name.

Looks up name in the hyperbase.parsers entry-point group and returns an instance of the registered :class:Parser subclass.

params is a dictionary of parser parameters. For backwards compatibility, keyword arguments are merged into params (explicit params entries take precedence).

Raises :class:ValueError if the parser is not installed.

Source code in src/hyperbase/parsers/__init__.py
def get_parser(
    name: str, params: dict[str, Any] | None = None, **kwargs: object
) -> Parser:
    """Instantiate a parser plugin by name.

    *name* is looked up in the ``hyperbase.parsers`` entry-point group;
    the registered :class:`Parser` subclass is loaded and instantiated
    with the merged parameters.

    *params* is a dictionary of parser parameters.  For backwards
    compatibility, keyword arguments are merged into *params* (explicit
    *params* entries take precedence).

    Raises :class:`ValueError` if the parser is not installed.
    """
    registry = list_parsers()
    entry = registry.get(name)
    if entry is None:
        available = ", ".join(sorted(registry)) or "(none)"
        raise ValueError(
            f"Parser {name!r} is not installed. Available parsers: {available}"
        )

    # Start from the keyword arguments so that explicit params override them.
    merged: dict[str, Any] = dict(kwargs)
    merged.update(params or {})
    parser_cls = entry.load()
    return parser_cls(merged)