# rdflib.plugins.stores.berkeleydb — rdflib 7.1.4 documentation (original) (raw)
from __future__ import annotations

import logging
from os import mkdir
from os.path import abspath, exists
from threading import Thread
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Generator,
    List,
    Optional,
    Tuple,
)
from urllib.request import pathname2url

from rdflib.store import NO_STORE, VALID_STORE, Store
from rdflib.term import Identifier, Node, URIRef

if TYPE_CHECKING:
    from rdflib.graph import Graph, _ContextType, _TriplePatternType, _TripleType
def bb(u: str) -> bytes:
    """Return ``u`` encoded as UTF-8 bytes (used to build database keys)."""
    encoded = u.encode("utf-8")
    return encoded
# The BerkeleyDB wrapper is an optional dependency: record whether it could
# be imported so the store can refuse to operate without it.
try:
    from berkeleydb import db
except ImportError:
    has_bsddb = False
else:
    has_bsddb = True

if has_bsddb:
    # These are passed to bsddb when creating DBs

    # passed to db.DBEnv.set_flags
    ENVSETFLAGS = db.DB_CDB_ALLDB
    # passed to db.DBEnv.open
    ENVFLAGS = db.DB_INIT_MPOOL | db.DB_INIT_CDB | db.DB_THREAD
    # cache size in bytes (50 MiB), handed to db.DBEnv.set_cachesize
    CACHESIZE = 1024 * 1024 * 50

    # passed to db.DB.Open()
    DBOPENFLAGS = db.DB_THREAD
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Public names exported by ``from ... import *``.
__all__ = [
    "BerkeleyDB",
    "_ToKeyFunc",
    "_FromKeyFunc",
    "_GetPrefixFunc",
    "_ResultsFromKeyFunc",
]

# Builds an index key from an (s, p, o) triple of term ids and a context id.
_ToKeyFunc = Callable[[Tuple[bytes, bytes, bytes], bytes], bytes]
# Splits an index key back into (context, s, p, o) term ids.
_FromKeyFunc = Callable[[bytes], Tuple[bytes, bytes, bytes, bytes]]
# Yields the key fields that form the search prefix for a triple pattern.
_GetPrefixFunc = Callable[
    [Tuple[str, str, str], Optional[str]], Generator[str, None, None]
]
# Turns an index key plus the queried pattern and stored contexts value into
# ((s, p, o), contexts) for yielding from ``triples``.
_ResultsFromKeyFunc = Callable[
    [bytes, Optional[Node], Optional[Node], Optional[Node], bytes],
    Tuple[Tuple[Node, Node, Node], Generator[Node, None, None]],
]
class BerkeleyDB(Store):
    """
    A store that allows for on-disk persistence using BerkeleyDB, a fast
    key/value DB.

    This store implementation used to be known, previous to rdflib 6.0.0
    as 'Sleepycat' due to that being the then name of the Python wrapper
    for BerkeleyDB.

    This store allows for quads as well as triples. See examples of use
    in both the `examples.berkeleydb_example` and ``test/test_store/test_store_berkeleydb.py``
    files.

    **NOTE on installation**:

    To use this store, you must have BerkeleyDB installed on your system
    separately to Python (``brew install berkeley-db`` on a Mac) and also have
    the BerkeleyDB Python wrapper installed (``pip install berkeleydb``).
    You may need to install BerkeleyDB Python wrapper like this:
    ``YES_I_HAVE_THE_RIGHT_TO_USE_THIS_BERKELEY_DB_VERSION=1 pip install berkeleydb``

    **Storage layout** (as implemented below):

    * Terms are pickled and mapped to integer ids through the ``k2i``
      (pickle -> id) and ``i2k`` (id -> pickle) databases.
    * Three index databases hold keys of the form ``c^a^b^d^`` where ``c`` is
      the context id and ``a, b, d`` are the term ids in the rotations
      (s, p, o), (p, o, s) and (o, s, p).
    * Entries whose context field is empty form the conjunctive index; their
      value is the ``^``-joined set of context ids containing the triple.
    * ``contexts``, ``namespace`` and ``prefix`` hold the known contexts and
      the two directions of the prefix/namespace bindings.
    """

    context_aware = True
    formula_aware = True
    transaction_aware = False
    graph_aware = True
    # Set by ``open``; ``None`` until the store has been opened.
    db_env: db.DBEnv = None

    def __init__(
        self,
        configuration: Optional[str] = None,
        identifier: Optional[Identifier] = None,
    ):
        """
        Create the store object.

        :param configuration: forwarded unchanged to ``Store.__init__``.
        :param identifier: identifier of the store; when ``None`` one is
            derived from the database path in ``open``.
        :raises ImportError: if the ``berkeleydb`` package is unavailable.
        """
        if not has_bsddb:
            raise ImportError("Unable to import berkeleydb, store is unusable.")
        # NOTE(review): these are set before the base initializer runs,
        # presumably because it may call ``open`` when a configuration is
        # given -- confirm against ``Store.__init__``.
        self.__open = False
        self.__identifier = identifier
        super(BerkeleyDB, self).__init__(configuration)
        self._loads = self.node_pickler.loads
        self._dumps = self.node_pickler.dumps
        # Declared here, populated in ``open``.
        self.__indicies_info: List[Tuple[Any, _ToKeyFunc, _FromKeyFunc]]

    def __get_identifier(self) -> Optional[Identifier]:
        return self.__identifier

    identifier = property(__get_identifier)

    def _init_db_environment(
        self, homeDir: str, create: bool = True  # noqa: N803
    ) -> db.DBEnv:
        """
        Open the BerkeleyDB environment rooted at ``homeDir``.

        If the directory does not exist it is created when ``create`` is
        true; otherwise ``NO_STORE`` is returned instead of an environment.
        """
        if not exists(homeDir):
            if create is True:
                mkdir(homeDir)
                # TODO: implement create method and refactor this to it
                self.create(homeDir)
            else:
                return NO_STORE
        db_env = db.DBEnv()
        db_env.set_cachesize(0, CACHESIZE)  # TODO
        # db_env.set_lg_max(1024*1024)
        db_env.set_flags(ENVSETFLAGS, 1)
        db_env.open(homeDir, ENVFLAGS | db.DB_CREATE)
        return db_env

    def is_open(self) -> bool:
        """Return whether the store is currently open."""
        return self.__open

    def open(self, path: str, create: bool = True) -> Optional[int]:
        """
        Open (and optionally create) the databases under ``path``.

        Opens the environment, the three triple indices, the contexts,
        namespace, prefix and term-id databases, builds the pattern lookup
        table and starts the background sync thread.

        :returns: ``VALID_STORE`` on success, ``NO_STORE`` if berkeleydb is
            unavailable or the environment could not be opened.
        """
        if not has_bsddb:
            return NO_STORE
        homeDir = path  # noqa: N806

        if self.__identifier is None:
            self.__identifier = URIRef(pathname2url(abspath(homeDir)))

        db_env = self._init_db_environment(homeDir, create)
        if db_env == NO_STORE:
            return NO_STORE
        self.db_env = db_env
        self.__open = True

        dbname = None
        dbtype = db.DB_BTREE
        # auto-commit ensures that the open-call commits when transactions
        # are enabled
        dbopenflags = DBOPENFLAGS
        if self.transaction_aware is True:
            dbopenflags |= db.DB_AUTO_COMMIT

        if create:
            dbopenflags |= db.DB_CREATE

        dbmode = 0o660
        dbsetflags = 0

        # create and open the DBs
        self.__indicies: List[db.DB] = [
            None,
        ] * 3
        # NOTE on type ignore: the placeholders are replaced in the loop below.
        self.__indicies_info = [
            None,  # type: ignore[list-item]
        ] * 3
        for i in range(0, 3):
            # The database name spells out the key layout of the rotation,
            # e.g. "c^s^p^o^" for i == 0.
            index_name = to_key_func(i)((b"s", b"p", b"o"), b"c").decode()
            index = db.DB(db_env)
            index.set_flags(dbsetflags)
            index.open(index_name, dbname, dbtype, dbopenflags, dbmode)
            self.__indicies[i] = index
            self.__indicies_info[i] = (index, to_key_func(i), from_key_func(i))

        def get_prefix_func(start: int, end: int) -> _GetPrefixFunc:
            """Build a generator of key fields: context, terms start..end-1, ""."""

            def get_prefix(
                triple: Tuple[str, str, str], context: Optional[str]
            ) -> Generator[str, None, None]:
                if context is None:
                    yield ""
                else:
                    yield context
                i = start
                while i < end:
                    yield triple[i % 3]
                    i += 1
                # trailing empty field so the joined prefix ends with "^"
                yield ""

            return get_prefix

        # For every combination of bound terms (bit 0 = subject, bit 1 =
        # predicate, bit 2 = object) pick the index rotation whose key starts
        # with the longest run of bound terms.
        lookup: Dict[
            int, Tuple[db.DB, _GetPrefixFunc, _FromKeyFunc, _ResultsFromKeyFunc]
        ] = {}
        for i in range(0, 8):
            results: List[Tuple[Tuple[int, int], int, int]] = []
            for start in range(0, 3):
                score = 1
                matched = 0
                for j in range(start, start + 3):
                    if i & (1 << (j % 3)):
                        score = score << 1
                        matched += 1
                    else:
                        break
                # equal scores are resolved in favour of the lower rotation
                tie_break = 2 - start
                results.append(((score, tie_break), start, matched))

            results.sort()
            _, start, matched = results[-1]

            lookup[i] = (
                self.__indicies[start],
                get_prefix_func(start, start + matched),
                from_key_func(start),
                results_from_key_func(start, self._from_string),
            )

        self.__lookup_dict = lookup

        self.__contexts = db.DB(db_env)
        self.__contexts.set_flags(dbsetflags)
        self.__contexts.open("contexts", dbname, dbtype, dbopenflags, dbmode)

        self.__namespace = db.DB(db_env)
        self.__namespace.set_flags(dbsetflags)
        self.__namespace.open("namespace", dbname, dbtype, dbopenflags, dbmode)

        self.__prefix = db.DB(db_env)
        self.__prefix.set_flags(dbsetflags)
        self.__prefix.open("prefix", dbname, dbtype, dbopenflags, dbmode)

        self.__k2i = db.DB(db_env)
        self.__k2i.set_flags(dbsetflags)
        self.__k2i.open("k2i", dbname, db.DB_HASH, dbopenflags, dbmode)

        self.__i2k = db.DB(db_env)
        self.__i2k.set_flags(dbsetflags)
        self.__i2k.open("i2k", dbname, db.DB_RECNO, dbopenflags, dbmode)

        self.__needs_sync = False
        # ``daemon=True`` replaces the deprecated ``Thread.setDaemon(True)``.
        t = Thread(target=self.__sync_run, daemon=True)
        t.start()
        self.__sync_thread = t
        return VALID_STORE

    def __sync_run(self) -> None:
        """
        Body of the background sync thread.

        While the store is open, waits for ``__needs_sync`` to be raised and
        then calls ``sync`` once no further write has been flagged for
        ``min_seconds``, or ``max_seconds`` after the first flagged write.
        """
        from time import sleep, time

        try:
            min_seconds, max_seconds = 10, 300
            while self.__open:
                if self.__needs_sync:
                    # t0: first pending write, t1: most recent pending write
                    t0 = t1 = time()
                    self.__needs_sync = False
                    while self.__open:
                        sleep(0.1)
                        if self.__needs_sync:
                            t1 = time()
                            self.__needs_sync = False
                        if time() - t1 > min_seconds or time() - t0 > max_seconds:
                            self.__needs_sync = False
                            logger.debug("sync")
                            self.sync()
                            break
                else:
                    sleep(1)
        except Exception as e:
            logger.exception(e)

    def sync(self) -> None:
        """Flush every open database to disk; does nothing if closed."""
        if self.__open:
            for i in self.__indicies:
                i.sync()
            self.__contexts.sync()
            self.__namespace.sync()
            self.__prefix.sync()
            self.__i2k.sync()
            self.__k2i.sync()

    def close(self, commit_pending_transaction: bool = False) -> None:
        """
        Stop the sync thread and close all databases and the environment.

        Calling ``close`` on a store that is not open is a no-op, so a
        repeated or premature call neither raises nor touches closed handles.
        ``commit_pending_transaction`` is accepted but not used here.
        """
        if not self.__open:
            return
        # Clearing the flag lets the sync thread's loops terminate.
        self.__open = False
        self.__sync_thread.join()
        for i in self.__indicies:
            i.close()
        self.__contexts.close()
        self.__namespace.close()
        self.__prefix.close()
        self.__i2k.close()
        self.__k2i.close()
        self.db_env.close()

    def add(
        self,
        triple: _TripleType,
        context: _ContextType,
        quoted: bool = False,
        txn: Optional[Any] = None,
    ) -> None:
        """
        Add a triple to the store of triples.

        The triple is written to all three indices under ``context``; unless
        ``quoted`` is true, the conjunctive (empty-context) entries are also
        updated with the enlarged set of contexts. Nothing is written when
        the quad is already present.
        """
        (subject, predicate, object) = triple
        assert self.__open, "The Store must be open."
        assert context != self, "Can not add triple directly to store"
        Store.add(self, (subject, predicate, object), context, quoted)

        _to_string = self._to_string

        s = _to_string(subject, txn=txn)
        p = _to_string(predicate, txn=txn)
        o = _to_string(object, txn=txn)
        c = _to_string(context, txn=txn)

        cspo, cpos, cosp = self.__indicies

        value = cspo.get(bb("%s^%s^%s^%s^" % (c, s, p, o)), txn=txn)
        if value is None:
            self.__contexts.put(bb(c), b"", txn=txn)

            contexts_value = (
                cspo.get(bb("%s^%s^%s^%s^" % ("", s, p, o)), txn=txn) or b""
            )
            contexts = set(contexts_value.split(b"^"))
            contexts.add(bb(c))
            contexts_value = b"^".join(contexts)

            cspo.put(bb("%s^%s^%s^%s^" % (c, s, p, o)), b"", txn=txn)
            cpos.put(bb("%s^%s^%s^%s^" % (c, p, o, s)), b"", txn=txn)
            cosp.put(bb("%s^%s^%s^%s^" % (c, o, s, p)), b"", txn=txn)
            if not quoted:
                cspo.put(bb("%s^%s^%s^%s^" % ("", s, p, o)), contexts_value, txn=txn)
                cpos.put(bb("%s^%s^%s^%s^" % ("", p, o, s)), contexts_value, txn=txn)
                cosp.put(bb("%s^%s^%s^%s^" % ("", o, s, p)), contexts_value, txn=txn)

            self.__needs_sync = True

    def __remove(
        self,
        spo: Tuple[bytes, bytes, bytes],
        c: bytes,
        quoted: bool = False,
        txn: Optional[Any] = None,
    ) -> None:
        """
        Remove one quad, given as encoded term ids and an encoded context id.

        Deletes the quad from all three indices and, unless ``quoted``,
        rewrites the conjunctive entries with the remaining contexts or
        deletes them when no context is left.
        """
        s, p, o = spo
        cspo, cpos, cosp = self.__indicies
        contexts_value = cspo.get(b"^".join([b"", s, p, o, b""]), txn=txn) or b""
        contexts = set(contexts_value.split(b"^"))
        contexts.discard(c)
        contexts_value = b"^".join(contexts)
        for i, _to_key, _from_key in self.__indicies_info:
            i.delete(_to_key((s, p, o), c), txn=txn)
        if not quoted:
            if contexts_value:
                for i, _to_key, _from_key in self.__indicies_info:
                    i.put(
                        _to_key((s, p, o), b""),
                        contexts_value,
                        txn=txn,
                    )
            else:
                for i, _to_key, _from_key in self.__indicies_info:
                    try:
                        i.delete(_to_key((s, p, o), b""), txn=txn)
                    except db.DBNotFoundError:
                        pass  # TODO: is it okay to ignore these?

    # type error: Signature of "remove" incompatible with supertype "Store"
    def remove(  # type: ignore[override]
        self,
        spo: _TriplePatternType,
        context: Optional[_ContextType],
        txn: Optional[Any] = None,
    ) -> None:
        """
        Remove every triple matching the pattern ``spo`` from ``context``.

        ``None`` in the pattern is a wildcard. A ``context`` of ``None`` (or
        the store itself) removes matches from all contexts. Removing
        ``(None, None, None)`` from a specific context also drops that
        context from the contexts database.
        """
        subject, predicate, object = spo
        assert self.__open, "The Store must be open."
        Store.remove(self, (subject, predicate, object), context)
        _to_string = self._to_string

        if context is not None:
            if context == self:
                context = None

        if (
            subject is not None
            and predicate is not None
            and object is not None
            and context is not None
        ):
            # Fully specified quad: direct key lookup, no scan needed.
            s = _to_string(subject, txn=txn)
            p = _to_string(predicate, txn=txn)
            o = _to_string(object, txn=txn)
            c = _to_string(context, txn=txn)
            value = self.__indicies[0].get(bb("%s^%s^%s^%s^" % (c, s, p, o)), txn=txn)
            if value is not None:
                self.__remove((bb(s), bb(p), bb(o)), bb(c), txn=txn)
                self.__needs_sync = True
        else:
            index, prefix, from_key, _ = self.__lookup(
                (subject, predicate, object), context, txn=txn
            )

            cursor = index.cursor(txn=txn)
            try:
                current = cursor.set_range(prefix)
                needs_sync = True
            except db.DBNotFoundError:
                current = None
                needs_sync = False
            finally:
                cursor.close()
            while current:
                key, value = current
                # Fetch the successor with a fresh cursor *before* deleting
                # the current key, so the scan position is not lost.
                cursor = index.cursor(txn=txn)
                try:
                    cursor.set_range(key)
                    current = cursor.next()
                except db.DBNotFoundError:
                    current = None
                finally:
                    cursor.close()
                if key.startswith(prefix):
                    # NOTE on type error: variables are being reused with a
                    # different type
                    c, s, p, o = from_key(key)  # type: ignore[assignment]
                    if context is None:
                        contexts_value = index.get(key, txn=txn) or b""
                        # remove triple from all non quoted contexts
                        contexts = set(contexts_value.split(b"^"))
                        # and from the conjunctive index
                        contexts.add(b"")
                        for c in contexts:
                            for i, _to_key, _ in self.__indicies_info:
                                i.delete(_to_key((s, p, o), c), txn=txn)  # type: ignore[arg-type]
                    else:
                        self.__remove((s, p, o), c, txn=txn)  # type: ignore[arg-type]
                else:
                    break

            if context is not None:
                if subject is None and predicate is None and object is None:
                    # TODO: also if context becomes empty and not just on
                    # remove((None, None, None), c)
                    try:
                        self.__contexts.delete(
                            bb(_to_string(context, txn=txn)), txn=txn
                        )
                    except db.DBNotFoundError:
                        pass

            # Only ever raise the flag: assigning False here could cancel a
            # sync requested by an earlier write that has not happened yet.
            if needs_sync:
                self.__needs_sync = True

    def triples(
        self,
        spo: _TriplePatternType,
        context: Optional[_ContextType] = None,
        txn: Optional[Any] = None,
    ) -> Generator[
        Tuple[_TripleType, Generator[Optional[_ContextType], None, None]],
        None,
        None,
    ]:
        """A generator over all the triples matching"""
        assert self.__open, "The Store must be open."

        subject, predicate, object = spo

        if context is not None:
            if context == self:
                context = None

        index, prefix, from_key, results_from_key = self.__lookup(
            (subject, predicate, object), context, txn=txn
        )

        cursor = index.cursor(txn=txn)
        try:
            current = cursor.set_range(prefix)
        except db.DBNotFoundError:
            current = None
        finally:
            cursor.close()
        while current:
            key, value = current
            # A new cursor is opened and closed for every step, so no cursor
            # is held open while the consumer handles the yielded triple.
            cursor = index.cursor(txn=txn)
            try:
                cursor.set_range(key)
                current = cursor.next()
            except db.DBNotFoundError:
                current = None
            finally:
                cursor.close()
            if key and key.startswith(prefix):
                contexts_value = index.get(key, txn=txn)
                # NOTE on type ignore: this is needed because some context is
                # lost in the process of extracting triples from the database.
                yield results_from_key(key, subject, predicate, object, contexts_value)  # type: ignore[misc]
            else:
                break

    def __len__(self, context: Optional[_ContextType] = None) -> int:
        """
        Count the triples in ``context``.

        With ``context`` of ``None`` (or the store itself) the entries of the
        conjunctive index are counted instead.
        """
        assert self.__open, "The Store must be open."
        if context is not None:
            if context == self:
                context = None

        if context is None:
            prefix = b"^"
        else:
            prefix = bb("%s^" % self._to_string(context))

        index = self.__indicies[0]
        cursor = index.cursor()
        count = 0
        try:
            try:
                current = cursor.set_range(prefix)
            except db.DBNotFoundError:
                current = None
            while current:
                key, value = current
                if not key.startswith(prefix):
                    break
                count += 1
                current = cursor.next()
        finally:
            cursor.close()
        return count

    def bind(self, prefix: str, namespace: URIRef, override: bool = True) -> None:
        """
        Bind ``prefix`` to ``namespace`` in both lookup directions.

        With ``override`` any existing binding of either value is replaced;
        without it, existing bindings take precedence over the new values.
        """
        prefix_b = prefix.encode("utf-8")
        namespace_b = namespace.encode("utf-8")
        bound_prefix = self.__prefix.get(namespace_b)
        bound_namespace = self.__namespace.get(prefix_b)
        if override:
            if bound_prefix:
                self.__namespace.delete(bound_prefix)
            if bound_namespace:
                self.__prefix.delete(bound_namespace)
            self.__prefix[namespace_b] = prefix_b
            self.__namespace[prefix_b] = namespace_b
        else:
            self.__prefix[bound_namespace or namespace_b] = bound_prefix or prefix_b
            self.__namespace[bound_prefix or prefix_b] = bound_namespace or namespace_b

    def namespace(self, prefix: str) -> Optional[URIRef]:
        """Return the namespace bound to ``prefix``, or ``None``."""
        ns = self.__namespace.get(prefix.encode("utf-8"), None)
        if ns is not None:
            return URIRef(ns.decode("utf-8"))
        return None

    def prefix(self, namespace: URIRef) -> Optional[str]:
        """Return the prefix bound to ``namespace``, or ``None``."""
        prefix = self.__prefix.get(namespace.encode("utf-8"), None)
        if prefix is not None:
            return prefix.decode("utf-8")
        return None

    def namespaces(self) -> Generator[Tuple[str, URIRef], None, None]:
        """Yield every ``(prefix, namespace)`` binding in the store."""
        cursor = self.__namespace.cursor()
        results = []
        try:
            current = cursor.first()
            while current:
                prefix, namespace = current
                results.append((prefix.decode("utf-8"), namespace.decode("utf-8")))
                current = cursor.next()
        finally:
            cursor.close()
        # Yield only after the cursor is closed.
        for prefix, namespace in results:
            yield prefix, URIRef(namespace)

    def contexts(
        self, triple: Optional[_TripleType] = None
    ) -> Generator[_ContextType, None, None]:
        """
        Yield the contexts in the store.

        With ``triple`` given, only the contexts recorded for that triple in
        the conjunctive index are yielded; otherwise every entry of the
        contexts database is.
        """
        _from_string = self._from_string
        _to_string = self._to_string
        # NOTE on type errors: context is lost because of how data is loaded
        # from the DB.
        if triple:
            subject, predicate, object = triple
            s = _to_string(subject)
            p = _to_string(predicate)
            o = _to_string(object)
            contexts = self.__indicies[0].get(bb("%s^%s^%s^%s^" % ("", s, p, o)))
            if contexts:
                for c in contexts.split(b"^"):
                    if c:
                        yield _from_string(c)  # type: ignore[misc]
        else:
            index = self.__contexts
            cursor = index.cursor()
            current = cursor.first()
            cursor.close()
            while current:
                key, value = current
                context = _from_string(key)
                yield context  # type: ignore[misc]
                # Re-position a fresh cursor after the yield and step to the
                # next entry.
                cursor = index.cursor()
                try:
                    cursor.set_range(key)
                    current = cursor.next()
                except db.DBNotFoundError:
                    current = None
                finally:
                    cursor.close()

    def add_graph(self, graph: Graph) -> None:
        """Register ``graph`` in the contexts database."""
        self.__contexts.put(bb(self._to_string(graph)), b"")

    def remove_graph(self, graph: Graph):
        """Remove all triples of ``graph`` and its contexts entry."""
        self.remove((None, None, None), graph)

    def _from_string(self, i: bytes) -> Node:
        """Load the term stored under the id ``i`` (decimal digits as bytes)."""
        k = self.__i2k.get(int(i))
        return self._loads(k)

    def _to_string(self, term: Node, txn: Optional[Any] = None) -> str:
        """
        Return the id of ``term`` as a decimal string, allocating one if new.

        A new term is appended to ``i2k`` and the resulting record number is
        stored in ``k2i`` under the pickled term.
        """
        k = self._dumps(term)
        i = self.__k2i.get(k, txn=txn)
        if i is None:
            # weird behaviour from bsddb not taking a txn as a keyword argument
            # for append
            if self.transaction_aware:
                i = "%s" % self.__i2k.append(k, txn)
            else:
                i = "%s" % self.__i2k.append(k)

            self.__k2i.put(k, i.encode(), txn=txn)
        else:
            i = i.decode()
        return i

    def __lookup(
        self,
        spo: _TriplePatternType,
        context: Optional[_ContextType],
        txn: Optional[Any] = None,
    ) -> Tuple[db.DB, bytes, _FromKeyFunc, _ResultsFromKeyFunc]:
        """
        Select the index and key prefix to scan for a triple pattern.

        Bound terms are converted to ids and combined into a bitmask
        (subject = 1, predicate = 2, object = 4) that selects the entry of
        the lookup table built in ``open``.

        :returns: ``(index, prefix, from_key, results_from_key)``.
        """
        subject, predicate, object = spo
        _to_string = self._to_string
        # NOTE on type errors: this is because the same variable is used with
        # different types.
        if context is not None:
            context = _to_string(context, txn=txn)  # type: ignore[assignment]
        i = 0
        if subject is not None:
            i += 1
            subject = _to_string(subject, txn=txn)  # type: ignore[assignment]
        if predicate is not None:
            i += 2
            predicate = _to_string(predicate, txn=txn)  # type: ignore[assignment]
        if object is not None:
            i += 4
            object = _to_string(object, txn=txn)  # type: ignore[assignment]
        index, prefix_func, from_key, results_from_key = self.__lookup_dict[i]
        prefix = bb("^".join(prefix_func((subject, predicate, object), context)))  # type: ignore[arg-type]
        return index, prefix, from_key, results_from_key
def to_key_func(i: int) -> _ToKeyFunc:
    """
    Return a key builder for the index rotated by ``i`` positions.

    The builder joins the context and the three triple fields, starting at
    position ``i % 3`` and wrapping around, with ``^`` and a trailing ``^``.
    """
    order = (i % 3, (i + 1) % 3, (i + 2) % 3)
    separator = "^".encode("latin-1")

    def to_key(triple: Tuple[bytes, bytes, bytes], context: bytes) -> bytes:
        "Takes a string; returns key"
        fields = [context]
        fields.extend(triple[position] for position in order)
        # the empty last field tacks on the trailing ^
        fields.append("".encode("latin-1"))
        return separator.join(fields)

    return to_key
def from_key_func(i: int) -> _FromKeyFunc:
    """
    Return a key splitter for the index rotated by ``i`` positions.

    The splitter undoes the rotation applied when the key was built and
    returns the fields as ``(context, subject, predicate, object)``.
    """
    subject_at = (3 - i + 0) % 3 + 1
    predicate_at = (3 - i + 1) % 3 + 1
    object_at = (3 - i + 2) % 3 + 1

    def from_key(key: bytes) -> Tuple[bytes, bytes, bytes, bytes]:
        "Takes a key; returns string"
        fields = key.split("^".encode("latin-1"))
        context = fields[0]
        return context, fields[subject_at], fields[predicate_at], fields[object_at]

    return from_key
def results_from_key_func(
    i: int, from_string: Callable[[bytes], Node]
) -> _ResultsFromKeyFunc:
    """
    Return a function turning an index key into a ``triples`` result.

    ``i`` is the rotation of the index the key comes from and ``from_string``
    converts a stored term id back into a term. Terms already bound in the
    queried pattern are reused; only the unbound ones are decoded.
    """
    subject_at = (3 - i + 0) % 3 + 1
    predicate_at = (3 - i + 1) % 3 + 1
    object_at = (3 - i + 2) % 3 + 1

    def from_key(
        key: bytes,
        subject: Optional[Node],
        predicate: Optional[Node],
        object: Optional[Node],
        contexts_value: bytes,
    ) -> Tuple[Tuple[Node, Node, Node], Generator[Node, None, None]]:
        "Takes a key and subject, predicate, object; returns tuple for yield"
        fields = key.split("^".encode("latin-1"))
        # TODO: i & 1: # dis assemble and/or measure to see which is faster
        # subject is None or i & 1
        s = subject if subject is not None else from_string(fields[subject_at])
        p = predicate if predicate is not None else from_string(fields[predicate_at])
        o = object if object is not None else from_string(fields[object_at])
        # Context ids are decoded lazily; empty fields are skipped.
        context_ids = contexts_value.split("^".encode("latin-1"))
        decoded_contexts = (from_string(c) for c in context_ids if c)
        return (s, p, o), decoded_contexts

    return from_key
# TODO: Remove unused
def readable_index(i: int) -> str:
    """
    Render the bound-term bitmask ``i`` as ``"s,p,o"`` with ``?`` for unset bits.

    Bit 1 stands for the subject, bit 2 for the predicate and bit 4 for the
    object.
    """
    letters_by_bit = ((1, "s"), (2, "p"), (4, "o"))
    shown = [letter if i & bit else "?" for bit, letter in letters_by_bit]
    return "%s,%s,%s" % tuple(shown)