# rdflib.plugins.stores.memory — source listing from the rdflib 7.1.4 documentation.
#
# In-memory triple stores: ``SimpleMemory`` (no context support) and
# ``Memory`` (context-, formula- and graph-aware).

from __future__ import annotations

from typing import (
    TYPE_CHECKING,
    Any,
    Collection,
    Dict,
    Generator,
    Iterator,
    Mapping,
    Optional,
    Set,
    Tuple,
    Union,
    overload,
)

from rdflib.store import Store
from rdflib.util import _coalesce

# These names are only needed for annotations; importing them under
# TYPE_CHECKING keeps them out of the runtime import graph.
if TYPE_CHECKING:
    from rdflib.graph import (
        Graph,
        _ContextType,
        _ObjectType,
        _PredicateType,
        _SubjectType,
        _TriplePatternType,
        _TripleType,
    )
    from rdflib.plugins.sparql.sparql import Query, Update
    from rdflib.query import Result
    from rdflib.term import Identifier, URIRef

__all__ = ["SimpleMemory", "Memory"]

# Alias for None: a None component in a triple pattern is unbound, i.e. it
# matches any value in that position.
ANY: None = None

class SimpleMemory(Store):
    """
    A fast naive in memory implementation of a triple store.

    This triple store uses nested dictionaries to store triples. Each
    triple is stored in three such indices as follows: spo[s][p][o] = 1,
    pos[p][o][s] = 1 and osp[o][s][p] = 1.

    This store keeps no context information: the ``context`` arguments of
    ``add``, ``remove``, ``triples`` and ``__len__`` are accepted but not
    used, and the contexts generator yielded alongside each triple is
    always empty.

    Authors: Michel Pelletier, Daniel Krech, Stefan Niederhauser
    """

    def __init__(
        self,
        configuration: Optional[str] = None,
        identifier: Optional[Identifier] = None,
    ):
        """Create an empty store.

        :param configuration: passed through unchanged to ``Store.__init__``.
        :param identifier: stored as ``self.identifier``; not otherwise used
            by this class.
        """
        super().__init__(configuration)
        self.identifier = identifier

        # indexed by [subject][predicate][object]
        self.__spo: Dict[_SubjectType, Dict[_PredicateType, Dict[_ObjectType, int]]] = (
            {}
        )

        # indexed by [predicate][object][subject]
        self.__pos: Dict[_PredicateType, Dict[_ObjectType, Dict[_SubjectType, int]]] = (
            {}
        )

        # indexed by [object][subject][predicate]
        self.__osp: Dict[_ObjectType, Dict[_SubjectType, Dict[_PredicateType, int]]] = (
            {}
        )

        # prefix -> namespace and the reverse mapping; kept in sync by bind()
        self.__namespace: Dict[str, URIRef] = {}
        self.__prefix: Dict[URIRef, str] = {}

    def add(
        self,
        triple: _TripleType,
        context: _ContextType,
        quoted: bool = False,
    ) -> None:
        """Add a triple to the store of triples.

        The triple is recorded in all three indices, creating the nested
        dictionaries where they do not yet exist. Adding a triple that is
        already present simply rewrites the same entries.

        :param triple: ``(subject, predicate, object)`` to add.
        :param context: ignored by this store.
        :param quoted: ignored by this store.
        """
        subject, predicate, object_ = triple
        self.__spo.setdefault(subject, {}).setdefault(predicate, {})[object_] = 1
        self.__pos.setdefault(predicate, {}).setdefault(object_, {})[subject] = 1
        self.__osp.setdefault(object_, {}).setdefault(subject, {})[predicate] = 1

    def remove(
        self,
        triple_pattern: _TriplePatternType,
        context: Optional[_ContextType] = None,
    ) -> None:
        """Remove every triple matching ``triple_pattern``.

        :param triple_pattern: ``(subject, predicate, object)`` where a
            ``None`` component matches anything.
        :param context: ignored by this store.
        """
        # triples() iterates the live index dictionaries, so take a snapshot
        # of the matches before deleting anything.
        for (subject, predicate, object_), _ in list(self.triples(triple_pattern)):
            del self.__spo[subject][predicate][object_]
            del self.__pos[predicate][object_][subject]
            del self.__osp[object_][subject][predicate]

    def triples(
        self,
        triple_pattern: _TriplePatternType,
        context: Optional[_ContextType] = None,
    ) -> Iterator[Tuple[_TripleType, Iterator[Optional[_ContextType]]]]:
        """A generator over all the triples matching the pattern.

        Yields ``(triple, contexts)`` pairs, where ``contexts`` is always an
        empty generator for this store. The index consulted depends on which
        pattern components are bound: ``spo`` when the subject is given,
        ``pos`` when only predicate (and maybe object) is given, ``osp`` when
        only the object is given.

        The indices are iterated lazily and without copying, so the store
        must not be modified while the generator is being consumed.

        :param triple_pattern: ``(subject, predicate, object)`` where a
            ``None`` component is unbound.
        :param context: ignored by this store.
        """
        subject, predicate, object_ = triple_pattern
        if subject is not None:  # subject is given
            spo = self.__spo
            if subject in spo:
                by_predicate = spo[subject]
                if predicate is not None:  # subject+predicate is given
                    if predicate in by_predicate:
                        if object_ is not None:  # subject+predicate+object is given
                            if object_ in by_predicate[predicate]:
                                yield (subject, predicate, object_), self.__contexts()
                        else:  # subject+predicate is given, object unbound
                            for o in by_predicate[predicate].keys():
                                yield (subject, predicate, o), self.__contexts()
                else:  # subject given, predicate unbound
                    for p in by_predicate.keys():
                        if object_ is not None:  # object is given
                            if object_ in by_predicate[p]:
                                yield (subject, p, object_), self.__contexts()
                        else:  # object unbound
                            for o in by_predicate[p].keys():
                                yield (subject, p, o), self.__contexts()
        elif predicate is not None:  # predicate is given, subject unbound
            pos = self.__pos
            if predicate in pos:
                by_object = pos[predicate]
                if object_ is not None:  # predicate+object is given, subject unbound
                    if object_ in by_object:
                        for s in by_object[object_].keys():
                            yield (s, predicate, object_), self.__contexts()
                else:  # predicate is given, object+subject unbound
                    for o in by_object.keys():
                        for s in by_object[o].keys():
                            yield (s, predicate, o), self.__contexts()
        elif object_ is not None:  # object is given, subject+predicate unbound
            osp = self.__osp
            if object_ in osp:
                by_subject = osp[object_]
                for s in by_subject.keys():
                    for p in by_subject[s].keys():
                        yield (s, p, object_), self.__contexts()
        else:  # subject+predicate+object unbound
            spo = self.__spo
            for s in spo.keys():
                by_predicate = spo[s]
                for p in by_predicate.keys():
                    for o in by_predicate[p].keys():
                        yield (s, p, o), self.__contexts()

    def __len__(self, context: Optional[_ContextType] = None) -> int:
        """Return the number of triples in the store.

        The count is obtained by walking every triple, so it is linear in
        the size of the store. ``context`` is ignored.
        """
        # @@ optimize
        return sum(1 for _ in self.triples((None, None, None)))

    def bind(self, prefix: str, namespace: URIRef, override: bool = True) -> None:
        """Associate ``prefix`` with ``namespace``.

        With ``override`` true, any existing binding of either the prefix or
        the namespace is dropped and replaced by the new pair. With
        ``override`` false, existing bindings take precedence and the new
        values are only used where nothing was bound before.
        """
        # should be identical to Memory.bind
        # NOTE(review): relies on rdflib.util._coalesce returning its first
        # non-None argument (or ``default``) — confirm against rdflib.util.
        bound_namespace = self.__namespace.get(prefix)
        bound_prefix = _coalesce(
            self.__prefix.get(namespace),
            # type error: error: Argument 1 to "get" of "Mapping" has incompatible type "Optional[URIRef]"; expected "URIRef"
            self.__prefix.get(bound_namespace),  # type: ignore[arg-type]
        )
        if override:
            if bound_prefix is not None:
                del self.__namespace[bound_prefix]
            if bound_namespace is not None:
                del self.__prefix[bound_namespace]
            self.__prefix[namespace] = prefix
            self.__namespace[prefix] = namespace
        else:
            # type error: Invalid index type "Optional[URIRef]" for "Dict[URIRef, str]"; expected type "URIRef"
            self.__prefix[_coalesce(bound_namespace, namespace)] = _coalesce(  # type: ignore[index]
                bound_prefix, default=prefix
            )
            # type error: Invalid index type "Optional[str]" for "Dict[str, URIRef]"; expected type "str"
            self.__namespace[_coalesce(bound_prefix, prefix)] = _coalesce(  # type: ignore[index]
                bound_namespace, default=namespace
            )

    def namespace(self, prefix: str) -> Optional[URIRef]:
        """Return the namespace bound to ``prefix``, or None if unbound."""
        return self.__namespace.get(prefix, None)

    def prefix(self, namespace: URIRef) -> Optional[str]:
        """Return the prefix bound to ``namespace``, or None if unbound."""
        return self.__prefix.get(namespace, None)

    def namespaces(self) -> Iterator[Tuple[str, URIRef]]:
        """Yield every ``(prefix, namespace)`` binding."""
        yield from self.__namespace.items()

    def __contexts(self) -> Generator[_ContextType, None, None]:
        """Return an empty generator: this store tracks no contexts."""
        yield from ()

    def query(
        self,
        query: Union[Query, str],
        initNs: Mapping[str, Any],  # noqa: N803
        initBindings: Mapping[str, Identifier],  # noqa: N803
        queryGraph: str,  # noqa: N803
        **kwargs: Any,
    ) -> Result:
        """Delegate to ``Store.query`` and return whatever it returns.

        The original discarded the base-class result, so the declared
        ``Result`` was never actually returned.
        """
        return super().query(query, initNs, initBindings, queryGraph, **kwargs)

    def update(
        self,
        update: Union[Update, str],
        initNs: Mapping[str, Any],  # noqa: N803
        initBindings: Mapping[str, Identifier],  # noqa: N803
        queryGraph: str,  # noqa: N803
        **kwargs: Any,
    ) -> None:
        """Delegate to ``Store.update`` with the arguments unchanged."""
        super().update(update, initNs, initBindings, queryGraph, **kwargs)

class Memory(Store):
    """
    An in memory implementation of a triple store.

    Same as SimpleMemory above, but is Context-aware, Graph-aware, and Formula-aware
    Authors: Ashley Sommer
    """

    context_aware = True
    formula_aware = True
    graph_aware = True

    def __init__(
        self,
        configuration: Optional[str] = None,
        identifier: Optional[Identifier] = None,
    ):
        """Create an empty store.

        :param configuration: passed through unchanged to ``Store.__init__``.
        :param identifier: stored as ``self.identifier``; not otherwise used
            by this class.
        """
        super().__init__(configuration)
        self.identifier = identifier

        # indexed by [subject][predicate][object]
        self.__spo: Dict[_SubjectType, Dict[_PredicateType, Dict[_ObjectType, int]]] = (
            {}
        )

        # indexed by [predicate][object][subject]
        self.__pos: Dict[_PredicateType, Dict[_ObjectType, Dict[_SubjectType, int]]] = (
            {}
        )

        # indexed by [object][subject][predicate]
        self.__osp: Dict[_ObjectType, Dict[_SubjectType, Dict[_PredicateType, int]]] = (
            {}
        )

        # prefix -> namespace and the reverse mapping; kept in sync by bind()
        self.__namespace: Dict[str, URIRef] = {}
        self.__prefix: Dict[URIRef, str] = {}
        # encoded context string (see __ctx_to_str) -> the context object
        self.__context_obj_map: Dict[str, Graph] = {}
        # triple -> {encoded context: quoted flag}; triples whose mapping
        # equals __defaultContexts are deliberately not stored here
        self.__tripleContexts: Dict[_TripleType, Dict[Optional[str], bool]] = {}
        # encoded context -> triples in it; the None key is the default context
        self.__contextTriples: Dict[Optional[str], Set[_TripleType]] = {None: set()}
        # all contexts used in store (unencoded)
        self.__all_contexts: Set[Graph] = set()
        # default context information for triples
        self.__defaultContexts: Optional[Dict[Optional[str], bool]] = None

    def add(
        self,
        triple: _TripleType,
        context: _ContextType,
        quoted: bool = False,
    ) -> None:
        """Add a triple to the store of triples.

        The triple is recorded in the spo/pos/osp indices (creating nested
        dictionaries where they do not yet exist) and associated with
        ``context``. If the triple is already indexed, only its context
        information is updated.

        :param triple: ``(subject, predicate, object)`` to add.
        :param context: context to add the triple to; remembered in the set
            of known contexts when not None.
        :param quoted: stored as the quoted flag for this triple/context
            pair; unquoted triples are also added to the default context.
        """
        Store.add(self, triple, context, quoted=quoted)
        if context is not None:
            self.__all_contexts.add(context)
        subject, predicate, object_ = triple

        spo = self.__spo
        try:
            po = spo[subject]
        except LookupError:
            po = spo[subject] = {}
        try:
            o = po[predicate]
        except LookupError:
            o = po[predicate] = {}

        try:
            _ = o[object_]
            # This cannot be reached if (s, p, o) was not inserted before.
            triple_exists = True
        except KeyError:
            o[object_] = 1
            triple_exists = False
        self.__add_triple_context(triple, triple_exists, context, quoted)

        if triple_exists:
            # No need to insert twice this triple.
            return

        pos = self.__pos
        try:
            os = pos[predicate]
        except LookupError:
            os = pos[predicate] = {}
        try:
            s = os[object_]
        except LookupError:
            s = os[object_] = {}
        s[subject] = 1

        osp = self.__osp
        try:
            sp = osp[object_]
        except LookupError:
            sp = osp[object_] = {}
        try:
            p = sp[subject]
        except LookupError:
            p = sp[subject] = {}
        p[predicate] = 1

    def remove(
        self,
        triple_pattern: _TriplePatternType,
        context: Optional[_ContextType] = None,
    ) -> None:
        """Remove the triples matching ``triple_pattern``.

        :param triple_pattern: ``(subject, predicate, object)`` where a
            ``None`` component matches anything.
        :param context: when given, matching triples are removed from this
            context only; when None they are removed from every context.
            A triple is dropped from the indices once no context holds it.
        """
        req_ctx = self.__ctx_to_str(context)
        for triple, _ in self.triples(triple_pattern, context=context):
            subject, predicate, object_ = triple
            # __remove_triple_context replaces the per-triple mapping with a
            # modified copy, so the collection iterated here is not mutated.
            for ctx in self.__get_context_for_triple(triple):
                if context is not None and req_ctx != ctx:
                    continue
                self.__remove_triple_context(triple, ctx)
            ctxs = self.__get_context_for_triple(triple, skipQuoted=True)
            if None in ctxs and (context is None or len(ctxs) == 1):
                # remove from default graph too
                self.__remove_triple_context(triple, None)
            if len(self.__get_context_for_triple(triple)) == 0:
                # no context references the triple any more: unindex it
                del self.__spo[subject][predicate][object_]
                del self.__pos[predicate][object_][subject]
                del self.__osp[object_][subject][predicate]
                del self.__tripleContexts[triple]
        if (
            req_ctx is not None
            and req_ctx in self.__contextTriples
            and len(self.__contextTriples[req_ctx]) == 0
        ):
            # all triples are removed out of this context
            # and it's not the default context so delete it
            del self.__contextTriples[req_ctx]

        if (
            triple_pattern == (None, None, None)
            and context in self.__all_contexts
            and not self.graph_aware
        ):
            # remove the whole context
            self.__all_contexts.remove(context)

    def triples(
        self,
        triple_pattern: _TriplePatternType,
        context: Optional[_ContextType] = None,
    ) -> Generator[
        Tuple[_TripleType, Generator[Optional[_ContextType], None, None]],
        None,
        None,
    ]:
        """A generator over all the triples matching the pattern.

        Yields ``(triple, contexts)`` pairs, where ``contexts`` is a
        generator over the non-quoted contexts the triple appears in. Only
        triples present in ``context`` are yielded (the default context when
        ``context`` is None).

        Every index level is copied before it is iterated, so the store may
        be modified (e.g. by ``remove``) while the generator is consumed.

        :param triple_pattern: ``(subject, predicate, object)`` where a
            ``None`` component is unbound.
        :param context: context to restrict the search to.
        """
        req_ctx = self.__ctx_to_str(context)
        subject, predicate, object_ = triple_pattern

        # all triples case (no triple parts given as pattern)
        if subject is None and predicate is None and object_ is None:
            # Just dump all known triples from the given graph
            if req_ctx not in self.__contextTriples:
                return
            for triple in self.__contextTriples[req_ctx].copy():
                yield triple, self.__contexts(triple)

        # optimize "triple in graph" case (all parts given)
        elif subject is not None and predicate is not None and object_ is not None:
            # type error: Incompatible types in assignment (expression has type "Tuple[Optional[IdentifiedNode], Optional[IdentifiedNode], Optional[Identifier]]", variable has type "Tuple[IdentifiedNode, IdentifiedNode, Identifier]")
            # NOTE on type error: at this point, all elements of triple_pattern
            # is not None, so it has the same type as triple
            triple = triple_pattern  # type: ignore[assignment]
            # Only the index lookup is guarded, so the KeyError handler
            # cannot mask errors raised around the yield.
            try:
                known = object_ in self.__spo[subject][predicate]
            except KeyError:
                known = False
            if known and self.__triple_has_context(triple, req_ctx):
                yield triple, self.__contexts(triple)

        # From here on at least one component is bound and at least one is
        # unbound; the two cases above cover everything else.
        elif subject is not None:  # subject is given
            spo = self.__spo
            if subject in spo:
                by_predicate = spo[subject]
                if predicate is not None:
                    # subject+predicate is given, so object must be unbound
                    if predicate in by_predicate:
                        for o in list(by_predicate[predicate].keys()):
                            triple = (subject, predicate, o)
                            if self.__triple_has_context(triple, req_ctx):
                                yield triple, self.__contexts(triple)
                else:  # subject given, predicate unbound
                    for p in list(by_predicate.keys()):
                        if object_ is not None:  # object is given
                            if object_ in by_predicate[p]:
                                triple = (subject, p, object_)
                                if self.__triple_has_context(triple, req_ctx):
                                    yield triple, self.__contexts(triple)
                        else:  # object unbound
                            for o in list(by_predicate[p].keys()):
                                triple = (subject, p, o)
                                if self.__triple_has_context(triple, req_ctx):
                                    yield triple, self.__contexts(triple)
        elif predicate is not None:  # predicate is given, subject unbound
            pos = self.__pos
            if predicate in pos:
                by_object = pos[predicate]
                if object_ is not None:  # predicate+object is given, subject unbound
                    if object_ in by_object:
                        for s in list(by_object[object_].keys()):
                            triple = (s, predicate, object_)
                            if self.__triple_has_context(triple, req_ctx):
                                yield triple, self.__contexts(triple)
                else:  # predicate is given, object+subject unbound
                    for o in list(by_object.keys()):
                        for s in list(by_object[o].keys()):
                            triple = (s, predicate, o)
                            if self.__triple_has_context(triple, req_ctx):
                                yield triple, self.__contexts(triple)
        else:  # only the object is given, subject+predicate unbound
            osp = self.__osp
            if object_ in osp:
                by_subject = osp[object_]
                for s in list(by_subject.keys()):
                    for p in list(by_subject[s].keys()):
                        triple = (s, p, object_)
                        if self.__triple_has_context(triple, req_ctx):
                            yield triple, self.__contexts(triple)

    def bind(self, prefix: str, namespace: URIRef, override: bool = True) -> None:
        """Associate ``prefix`` with ``namespace``.

        With ``override`` true, any existing binding of either the prefix or
        the namespace is dropped and replaced by the new pair. With
        ``override`` false, existing bindings take precedence and the new
        values are only used where nothing was bound before.
        """
        # should be identical to SimpleMemory.bind
        # NOTE(review): relies on rdflib.util._coalesce returning its first
        # non-None argument (or ``default``) — confirm against rdflib.util.
        bound_namespace = self.__namespace.get(prefix)
        bound_prefix = _coalesce(
            self.__prefix.get(namespace),
            # type error: error: Argument 1 to "get" of "Mapping" has incompatible type "Optional[URIRef]"; expected "URIRef"
            self.__prefix.get(bound_namespace),  # type: ignore[arg-type]
        )
        if override:
            if bound_prefix is not None:
                del self.__namespace[bound_prefix]
            if bound_namespace is not None:
                del self.__prefix[bound_namespace]
            self.__prefix[namespace] = prefix
            self.__namespace[prefix] = namespace
        else:
            # type error: Invalid index type "Optional[URIRef]" for "Dict[URIRef, str]"; expected type "URIRef"
            self.__prefix[_coalesce(bound_namespace, namespace)] = _coalesce(  # type: ignore[index]
                bound_prefix, default=prefix
            )
            # type error: Invalid index type "Optional[str]" for "Dict[str, URIRef]"; expected type "str"
            # type error: Incompatible types in assignment (expression has type "Optional[URIRef]", target has type "URIRef")
            self.__namespace[_coalesce(bound_prefix, prefix)] = _coalesce(  # type: ignore[index]
                bound_namespace, default=namespace
            )

    def namespace(self, prefix: str) -> Optional[URIRef]:
        """Return the namespace bound to ``prefix``, or None if unbound."""
        return self.__namespace.get(prefix, None)

    def prefix(self, namespace: URIRef) -> Optional[str]:
        """Return the prefix bound to ``namespace``, or None if unbound."""
        return self.__prefix.get(namespace, None)

    def namespaces(self) -> Iterator[Tuple[str, URIRef]]:
        """Yield every ``(prefix, namespace)`` binding."""
        yield from self.__namespace.items()

    def contexts(
        self, triple: Optional[_TripleType] = None
    ) -> Generator[_ContextType, None, None]:
        """Return a generator over contexts.

        Without a triple (or with the all-None pattern) every known context
        is produced. With a concrete triple, only the non-quoted contexts
        containing it are produced; an unknown triple gives an empty
        generator.
        """
        if triple is None or triple == (None, None, None):
            return (context for context in self.__all_contexts)

        subj, pred, obj = triple
        try:
            _ = self.__spo[subj][pred][obj]
        except KeyError:
            # the triple is not in the store at all
            return (ctx for ctx in ())
        return self.__contexts(triple)

    def __len__(self, context: Optional[_ContextType] = None) -> int:
        """Return the number of triples in ``context``.

        With ``context`` None the default context is counted; an unknown
        context counts as empty.
        """
        ctx = self.__ctx_to_str(context)
        if ctx not in self.__contextTriples:
            return 0
        return len(self.__contextTriples[ctx])

    def add_graph(self, graph: Graph) -> None:
        """Register ``graph`` as a known context.

        Falls back to ``Store.add_graph`` when the store is not graph-aware.
        """
        if not self.graph_aware:
            Store.add_graph(self, graph)
        else:
            self.__all_contexts.add(graph)

    def remove_graph(self, graph: Graph) -> None:
        """Remove all triples of ``graph`` and forget the graph itself.

        Falls back to ``Store.remove_graph`` when the store is not
        graph-aware.
        """
        if not self.graph_aware:
            Store.remove_graph(self, graph)
        else:
            self.remove((None, None, None), graph)
            # we may not know this graph at all; that is not an error
            self.__all_contexts.discard(graph)

    # internal utility methods below
    def __add_triple_context(
        self,
        triple: _TripleType,
        triple_exists: bool,
        context: Optional[_ContextType],
        quoted: bool,
    ) -> None:
        """add the given context to the set of contexts for the triple"""
        ctx = self.__ctx_to_str(context)
        quoted = bool(quoted)
        if triple_exists:
            # we know the triple exists somewhere in the store
            try:
                triple_context = self.__tripleContexts[triple]
            except KeyError:
                # triple exists with default ctx info
                # start with a copy of the default ctx info
                # type error: Item "None" of "Optional[Dict[Optional[str], bool]]" has no attribute "copy"
                triple_context = self.__tripleContexts[triple] = (
                    self.__defaultContexts.copy()  # type: ignore[union-attr]
                )

            triple_context[ctx] = quoted

            if not quoted:
                triple_context[None] = quoted

        else:
            # the triple didn't exist before in the store
            if quoted:  # this context only
                triple_context = self.__tripleContexts[triple] = {ctx: quoted}
            else:  # default context as well
                triple_context = self.__tripleContexts[triple] = {
                    ctx: quoted,
                    None: quoted,
                }

        # if the triple is not quoted add it to the default context
        if not quoted:
            self.__contextTriples[None].add(triple)

        # always add the triple to given context, making sure it's initialized
        if ctx not in self.__contextTriples:
            self.__contextTriples[ctx] = set()
        self.__contextTriples[ctx].add(triple)

        # if this is the first ever triple in the store, set default ctx info
        if self.__defaultContexts is None:
            self.__defaultContexts = triple_context
        # if the context info is the same as default, no need to store it
        if triple_context == self.__defaultContexts:
            del self.__tripleContexts[triple]

    def __get_context_for_triple(
        self, triple: _TripleType, skipQuoted: bool = False  # noqa: N803
    ) -> Collection[Optional[str]]:
        """return a list of contexts (str) for the triple, skipping
        quoted contexts if skipQuoted==True"""

        # triples without their own entry share the default context info
        ctxs = self.__tripleContexts.get(triple, self.__defaultContexts)

        if not skipQuoted:
            # type error: Item "None" of "Optional[Dict[Optional[str], bool]]" has no attribute "keys"
            return ctxs.keys()  # type: ignore[union-attr]

        # type error: Item "None" of "Optional[Dict[Optional[str], bool]]" has no attribute "items"
        return [ctx for ctx, quoted in ctxs.items() if not quoted]  # type: ignore[union-attr]

    def __triple_has_context(self, triple: _TripleType, ctx: Optional[str]) -> bool:
        """return True if the triple exists in the given context"""
        # type error: Unsupported right operand type for in ("Optional[Dict[Optional[str], bool]]")
        return ctx in self.__tripleContexts.get(triple, self.__defaultContexts)  # type: ignore[operator]

    def __remove_triple_context(self, triple: _TripleType, ctx):
        """remove the context from the triple"""
        # Work on a copy so that the shared default mapping, and any view of
        # the old mapping a caller is iterating, are left untouched.
        # type error: Item "None" of "Optional[Dict[Optional[str], bool]]" has no attribute "copy"
        ctxs = self.__tripleContexts.get(triple, self.__defaultContexts).copy()  # type: ignore[union-attr]
        del ctxs[ctx]
        if ctxs == self.__defaultContexts:
            del self.__tripleContexts[triple]
        else:
            self.__tripleContexts[triple] = ctxs
        self.__contextTriples[ctx].remove(triple)

    @overload
    def __ctx_to_str(self, ctx: _ContextType) -> str: ...

    @overload
    def __ctx_to_str(self, ctx: None) -> None: ...

    def __ctx_to_str(self, ctx: Optional[_ContextType]) -> Optional[str]:
        """Encode a context as ``"<ClassName>:<identifier>"``.

        None is passed through unchanged. The encoded string is also
        recorded in ``__context_obj_map`` so it can be mapped back to the
        context object later.

        :raises RuntimeError: if ``ctx`` has no ``identifier`` attribute and
            is not a ``str``.
        """
        if ctx is None:
            return None
        try:
            # ctx could be a graph. In that case, use its identifier
            ctx_str = "{}:{}".format(ctx.identifier.__class__.__name__, ctx.identifier)
            self.__context_obj_map[ctx_str] = ctx
            return ctx_str
        except AttributeError:
            # otherwise, ctx should be a URIRef or BNode or str
            # NOTE on type errors: This is actually never called with ctx value as str in all unit tests, so this seems like it should just not be here.
            # type error: Subclass of "Graph" and "str" cannot exist: would have incompatible method signatures
            if isinstance(ctx, str):  # type: ignore[unreachable]
                # type error: Statement is unreachable
                ctx_str = "{}:{}".format(ctx.__class__.__name__, ctx)  # type: ignore[unreachable]
                if ctx_str in self.__context_obj_map:
                    return ctx_str
                self.__context_obj_map[ctx_str] = ctx
                return ctx_str
            raise RuntimeError("Cannot use that type of object as a Graph context")

    def __contexts(self, triple: _TripleType) -> Generator[_ContextType, None, None]:
        """return a generator for all the non-quoted contexts
        (dereferenced) the encoded triple appears in"""
        # type error: Argument 2 to "get" of "Mapping" has incompatible type "str"; expected "Optional[Graph]"
        return (
            self.__context_obj_map.get(ctx_str, ctx_str)  # type: ignore[arg-type]
            for ctx_str in self.__get_context_for_triple(triple, skipQuoted=True)
            if ctx_str is not None
        )

    def query(
        self,
        query: Union[Query, str],
        initNs: Mapping[str, Any],  # noqa: N803
        initBindings: Mapping[str, Identifier],  # noqa: N803
        queryGraph: str,  # noqa: N803
        **kwargs,
    ) -> Result:
        """Delegate to ``Store.query`` and return whatever it returns.

        The original discarded the base-class result, so the declared
        ``Result`` was never actually returned.
        """
        return super().query(query, initNs, initBindings, queryGraph, **kwargs)

    def update(
        self,
        update: Union[Update, Any],
        initNs: Mapping[str, Any],  # noqa: N803
        initBindings: Mapping[str, Identifier],  # noqa: N803
        queryGraph: str,  # noqa: N803
        **kwargs,
    ) -> None:
        """Delegate to ``Store.update`` with the arguments unchanged."""
        super().update(update, initNs, initBindings, queryGraph, **kwargs)