jsondocstore

JsonDocStore package.

Store JSON documents as individual files and optionally query them through exact-match indexes on top-level fields.

1"""JsonDocStore package.
2
3Store JSON documents as individual files and optionally query them through
4exact-match indexes on top-level fields.
5"""
6
7from .core import JsonDocStore
8
9__all__ = ["JsonDocStore"]
class JsonDocStore:
 20class JsonDocStore:
 21    """Store JSON documents as individual files in a directory.
 22
 23    Document identity is the filename stem. Optional indexes are stored in
 24    ``index.json`` and support exact-match queries on top-level fields.
 25    """
 26
 27    def __init__(self, root: str | Path, create: bool = False):
 28        """Open a document store rooted at ``root``.
 29
 30        If ``create`` is true, the directory is created when missing.
 31        ``index.json`` is optional and is only needed for indexed queries.
 32        """
 33        self.root = Path(root)
 34        if create:
 35            self.root.mkdir(parents=True, exist_ok=True)
 36        elif not self.root.exists():
 37            raise ValueError(f"Directory does not exist: {self.root}")
 38        if not self.root.is_dir():
 39            raise ValueError(f"Path is not a directory: {self.root}")
 40
 41        self.schema_path = self.root / _SCHEMA_FILENAME
 42        self.schema = self._load_schema() if self.schema_path.exists() else None
 43        self.index_fields = list(self.schema.get("index_fields", [])) if self.schema else []
 44        self.indexes: dict[str, dict[Any, set[str]]] = {field: defaultdict(set) for field in self.index_fields}
 45        if self.schema is not None:
 46            self._rebuild_index()
 47
 48    def _load_schema(self) -> dict[str, Any]:
 49        schema = json.loads(self.schema_path.read_text(encoding="utf-8"))
 50        if "index_fields" not in schema:
 51            raise ValueError("index.json must contain 'index_fields'")
 52        if not isinstance(schema["index_fields"], list):
 53            raise ValueError("'index_fields' must be a list")
 54        if not all(isinstance(field, str) for field in schema["index_fields"]):
 55            raise ValueError("'index_fields' entries must be strings")
 56        if not all(field for field in schema["index_fields"]):
 57            raise ValueError("'index_fields' entries must not be empty")
 58        return schema
 59
 60    def _doc_path(self, pk: str) -> Path:
 61        return self.root / f"{pk}.json"
 62
 63    def _validate_key(self, pk: str) -> None:
 64        if not pk:
 65            raise ValueError("Document key must not be empty")
 66        if pk in {".", ".."}:
 67            raise ValueError(f"Invalid document key: {pk}")
 68        if pk == Path(_SCHEMA_FILENAME).stem:
 69            raise ValueError(f"Document key '{Path(_SCHEMA_FILENAME).stem}' is reserved")
 70        if not _VALID_KEY_RE.fullmatch(pk):
 71            raise ValueError(
 72                "Invalid document key. Use only letters, digits, dot, underscore, or hyphen"
 73            )
 74
 75    def _validate_field_name(self, field: str) -> None:
 76        if not isinstance(field, str):
 77            raise ValueError("Indexed field name must be a string")
 78        if not field:
 79            raise ValueError("Indexed field name must not be empty")
 80
 81    def _rebuild_index(self) -> None:
 82        for field in self.index_fields:
 83            self.indexes[field].clear()
 84        for path in sorted(self.root.glob("*.json")):
 85            if path.name == _SCHEMA_FILENAME:
 86                continue
 87            doc = json.loads(path.read_text(encoding="utf-8"))
 88            self._add_indexes(path.stem, doc)
 89
 90    def _list_doc_names(self) -> list[str]:
 91        return [
 92            path.name
 93            for path in sorted(self.root.glob("*.json"))
 94            if path.name != _SCHEMA_FILENAME
 95        ]
 96
 97    def _add_indexes(self, pk: str, doc: dict[str, Any]) -> None:
 98        for field in self.index_fields:
 99            if field in doc:
100                self.indexes[field][doc[field]].add(pk)
101
102    def _remove_indexes(self, pk: str, doc: dict[str, Any]) -> None:
103        for field in self.index_fields:
104            if field in doc:
105                value = doc[field]
106                bucket = self.indexes[field].get(value)
107                if bucket is not None:
108                    bucket.discard(pk)
109                    if not bucket:
110                        del self.indexes[field][value]
111
112    def _write_json_atomic(self, path: Path, obj: dict[str, Any]) -> None:
113        with tempfile.NamedTemporaryFile(
114            "w", encoding="utf-8", dir=str(self.root), delete=False
115        ) as tmp:
116            tmp.write(_json_dump(obj))
117            tmp_path = Path(tmp.name)
118        tmp_path.replace(path)
119
120    def list_all(self) -> list[str]:
121        """Return all document filenames in the store."""
122        return self._list_doc_names()
123
124    def get(self, pk: str) -> dict[str, Any]:
125        """Return the document stored under ``pk``.
126
127        Raises ``KeyError`` if the document does not exist.
128        """
129        self._validate_key(pk)
130        path = self._doc_path(pk)
131        if not path.exists():
132            raise KeyError(f"Document not found: {pk}")
133        return json.loads(path.read_text(encoding="utf-8"))
134
135    def query_by(self, field: str, value: Any) -> dict[str, dict[str, Any]]:
136        """Return documents whose indexed ``field`` exactly matches ``value``.
137
138        The result is a mapping of ``key -> document``. Raises ``ValueError`` if
139        no index exists or if ``field`` is not indexed.
140        """
141        self._validate_field_name(field)
142        if self.schema is None:
143            raise ValueError("Cannot query without an index. Create an index first")
144        if field not in self.indexes:
145            raise ValueError(f"Field is not indexed: {field}")
146        return {pk: self.get(pk) for pk in sorted(self.indexes[field].get(value, ()))}
147
148    def create_index(self, field: str) -> None:
149        """Create an exact-match index for a top-level field.
150
151        Creates ``index.json`` when needed. Raises ``ValueError`` if the index
152        already exists.
153        """
154        self._validate_field_name(field)
155        if self.schema is None:
156            self.schema = {"index_fields": []}
157            self.index_fields = []
158            self.indexes = {}
159        if field in self.indexes:
160            raise ValueError(f"Index already exists: {field}. Delete it before recreating")
161        self.index_fields.append(field)
162        self.indexes[field] = defaultdict(set)
163        self.schema["index_fields"] = list(self.index_fields)
164        self._write_json_atomic(self.schema_path, self.schema)
165        self._rebuild_index()
166
167    def list_indexes(self) -> list[str]:
168        """Return the sorted list of indexed fields."""
169        return sorted(self.index_fields)
170
171    def delete_index(self, field: str) -> bool:
172        """Delete an index by field name.
173
174        Returns ``True`` if an index was deleted, ``False`` if it did not
175        exist. Raises ``ValueError`` if no ``index.json`` exists.
176        """
177        self._validate_field_name(field)
178        if self.schema is None:
179            raise ValueError("Cannot delete an index without an index.json")
180        if field not in self.indexes:
181            return False
182        self.index_fields = [name for name in self.index_fields if name != field]
183        del self.indexes[field]
184        self.schema["index_fields"] = list(self.index_fields)
185        self._write_json_atomic(self.schema_path, self.schema)
186        return True
187
188    def insert(self, pk: str, doc: dict[str, Any]) -> dict[str, Any]:
189        """Insert a new document under ``pk`` and return it.
190
191        Raises ``ValueError`` if the key is invalid or already exists.
192        """
193        self._validate_key(pk)
194        path = self._doc_path(pk)
195        if path.exists():
196            raise ValueError(f"Primary key already exists: {pk}")
197        if self.schema is not None:
198            self._add_indexes(pk, doc)
199        self._write_json_atomic(path, doc)
200        return doc
201
202    def update(self, pk: str, doc: dict[str, Any]) -> dict[str, Any]:
203        """Replace the existing document stored under ``pk`` and return it.
204
205        Raises ``KeyError`` if the document does not exist.
206        """
207        self._validate_key(pk)
208        path = self._doc_path(pk)
209        if not path.exists():
210            raise KeyError(f"Document not found: {pk}")
211        if self.schema is not None:
212            old_doc = self.get(pk)
213            self._remove_indexes(pk, old_doc)
214            self._add_indexes(pk, doc)
215        self._write_json_atomic(path, doc)
216        return doc
217
218    def delete(self, pk: str) -> None:
219        """Delete the document stored under ``pk``.
220
221        Raises ``KeyError`` if the document does not exist.
222        """
223        self._validate_key(pk)
224        path = self._doc_path(pk)
225        if not path.exists():
226            raise KeyError(f"Document not found: {pk}")
227        if self.schema is not None:
228            doc = self.get(pk)
229            self._remove_indexes(pk, doc)
230        path.unlink()

Store JSON documents as individual files in a directory.

Document identity is the filename stem. Optional indexes are stored in index.json and support exact-match queries on top-level fields.

JsonDocStore(root: str | pathlib.Path, create: bool = False)
27    def __init__(self, root: str | Path, create: bool = False):
28        """Open a document store rooted at ``root``.
29
30        If ``create`` is true, the directory is created when missing.
31        ``index.json`` is optional and is only needed for indexed queries.
32        """
33        self.root = Path(root)
34        if create:
35            self.root.mkdir(parents=True, exist_ok=True)
36        elif not self.root.exists():
37            raise ValueError(f"Directory does not exist: {self.root}")
38        if not self.root.is_dir():
39            raise ValueError(f"Path is not a directory: {self.root}")
40
41        self.schema_path = self.root / _SCHEMA_FILENAME
42        self.schema = self._load_schema() if self.schema_path.exists() else None
43        self.index_fields = list(self.schema.get("index_fields", [])) if self.schema else []
44        self.indexes: dict[str, dict[Any, set[str]]] = {field: defaultdict(set) for field in self.index_fields}
45        if self.schema is not None:
46            self._rebuild_index()

Open a document store rooted at root.

If create is true, the directory is created when missing. index.json is optional and is only needed for indexed queries.

root
schema_path
schema
index_fields
indexes: dict[str, dict[typing.Any, set[str]]]
def list_all(self) -> list[str]:
120    def list_all(self) -> list[str]:
121        """Return all document filenames in the store."""
122        return self._list_doc_names()

Return all document filenames in the store.

def get(self, pk: str) -> dict[str, typing.Any]:
124    def get(self, pk: str) -> dict[str, Any]:
125        """Return the document stored under ``pk``.
126
127        Raises ``KeyError`` if the document does not exist.
128        """
129        self._validate_key(pk)
130        path = self._doc_path(pk)
131        if not path.exists():
132            raise KeyError(f"Document not found: {pk}")
133        return json.loads(path.read_text(encoding="utf-8"))

Return the document stored under pk.

Raises KeyError if the document does not exist.

def query_by(self, field: str, value: Any) -> dict[str, dict[str, typing.Any]]:
135    def query_by(self, field: str, value: Any) -> dict[str, dict[str, Any]]:
136        """Return documents whose indexed ``field`` exactly matches ``value``.
137
138        The result is a mapping of ``key -> document``. Raises ``ValueError`` if
139        no index exists or if ``field`` is not indexed.
140        """
141        self._validate_field_name(field)
142        if self.schema is None:
143            raise ValueError("Cannot query without an index. Create an index first")
144        if field not in self.indexes:
145            raise ValueError(f"Field is not indexed: {field}")
146        return {pk: self.get(pk) for pk in sorted(self.indexes[field].get(value, ()))}

Return documents whose indexed field exactly matches value.

The result is a mapping of key -> document. Raises ValueError if no index exists or if field is not indexed.

def create_index(self, field: str) -> None:
148    def create_index(self, field: str) -> None:
149        """Create an exact-match index for a top-level field.
150
151        Creates ``index.json`` when needed. Raises ``ValueError`` if the index
152        already exists.
153        """
154        self._validate_field_name(field)
155        if self.schema is None:
156            self.schema = {"index_fields": []}
157            self.index_fields = []
158            self.indexes = {}
159        if field in self.indexes:
160            raise ValueError(f"Index already exists: {field}. Delete it before recreating")
161        self.index_fields.append(field)
162        self.indexes[field] = defaultdict(set)
163        self.schema["index_fields"] = list(self.index_fields)
164        self._write_json_atomic(self.schema_path, self.schema)
165        self._rebuild_index()

Create an exact-match index for a top-level field.

Creates index.json when needed. Raises ValueError if the index already exists.

def list_indexes(self) -> list[str]:
167    def list_indexes(self) -> list[str]:
168        """Return the sorted list of indexed fields."""
169        return sorted(self.index_fields)

Return the sorted list of indexed fields.

def delete_index(self, field: str) -> bool:
171    def delete_index(self, field: str) -> bool:
172        """Delete an index by field name.
173
174        Returns ``True`` if an index was deleted, ``False`` if it did not
175        exist. Raises ``ValueError`` if no ``index.json`` exists.
176        """
177        self._validate_field_name(field)
178        if self.schema is None:
179            raise ValueError("Cannot delete an index without an index.json")
180        if field not in self.indexes:
181            return False
182        self.index_fields = [name for name in self.index_fields if name != field]
183        del self.indexes[field]
184        self.schema["index_fields"] = list(self.index_fields)
185        self._write_json_atomic(self.schema_path, self.schema)
186        return True

Delete an index by field name.

Returns True if an index was deleted, False if it did not exist. Raises ValueError if no index.json exists.

def insert(self, pk: str, doc: dict[str, typing.Any]) -> dict[str, typing.Any]:
188    def insert(self, pk: str, doc: dict[str, Any]) -> dict[str, Any]:
189        """Insert a new document under ``pk`` and return it.
190
191        Raises ``ValueError`` if the key is invalid or already exists.
192        """
193        self._validate_key(pk)
194        path = self._doc_path(pk)
195        if path.exists():
196            raise ValueError(f"Primary key already exists: {pk}")
197        if self.schema is not None:
198            self._add_indexes(pk, doc)
199        self._write_json_atomic(path, doc)
200        return doc

Insert a new document under pk and return it.

Raises ValueError if the key is invalid or already exists.

def update(self, pk: str, doc: dict[str, typing.Any]) -> dict[str, typing.Any]:
202    def update(self, pk: str, doc: dict[str, Any]) -> dict[str, Any]:
203        """Replace the existing document stored under ``pk`` and return it.
204
205        Raises ``KeyError`` if the document does not exist.
206        """
207        self._validate_key(pk)
208        path = self._doc_path(pk)
209        if not path.exists():
210            raise KeyError(f"Document not found: {pk}")
211        if self.schema is not None:
212            old_doc = self.get(pk)
213            self._remove_indexes(pk, old_doc)
214            self._add_indexes(pk, doc)
215        self._write_json_atomic(path, doc)
216        return doc

Replace the existing document stored under pk and return it.

Raises KeyError if the document does not exist.

def delete(self, pk: str) -> None:
218    def delete(self, pk: str) -> None:
219        """Delete the document stored under ``pk``.
220
221        Raises ``KeyError`` if the document does not exist.
222        """
223        self._validate_key(pk)
224        path = self._doc_path(pk)
225        if not path.exists():
226            raise KeyError(f"Document not found: {pk}")
227        if self.schema is not None:
228            doc = self.get(pk)
229            self._remove_indexes(pk, doc)
230        path.unlink()

Delete the document stored under pk.

Raises KeyError if the document does not exist.