jsondocstore
JsonDocStore package.
Store JSON documents as individual files and optionally query them through exact-match indexes on top-level fields.
20class JsonDocStore: 21 """Store JSON documents as individual files in a directory. 22 23 Document identity is the filename stem. Optional indexes are stored in 24 ``index.json`` and support exact-match queries on top-level fields. 25 """ 26 27 def __init__(self, root: str | Path, create: bool = False): 28 """Open a document store rooted at ``root``. 29 30 If ``create`` is true, the directory is created when missing. 31 ``index.json`` is optional and is only needed for indexed queries. 32 """ 33 self.root = Path(root) 34 if create: 35 self.root.mkdir(parents=True, exist_ok=True) 36 elif not self.root.exists(): 37 raise ValueError(f"Directory does not exist: {self.root}") 38 if not self.root.is_dir(): 39 raise ValueError(f"Path is not a directory: {self.root}") 40 41 self.schema_path = self.root / _SCHEMA_FILENAME 42 self.schema = self._load_schema() if self.schema_path.exists() else None 43 self.index_fields = list(self.schema.get("index_fields", [])) if self.schema else [] 44 self.indexes: dict[str, dict[Any, set[str]]] = {field: defaultdict(set) for field in self.index_fields} 45 if self.schema is not None: 46 self._rebuild_index() 47 48 def _load_schema(self) -> dict[str, Any]: 49 schema = json.loads(self.schema_path.read_text(encoding="utf-8")) 50 if "index_fields" not in schema: 51 raise ValueError("index.json must contain 'index_fields'") 52 if not isinstance(schema["index_fields"], list): 53 raise ValueError("'index_fields' must be a list") 54 if not all(isinstance(field, str) for field in schema["index_fields"]): 55 raise ValueError("'index_fields' entries must be strings") 56 if not all(field for field in schema["index_fields"]): 57 raise ValueError("'index_fields' entries must not be empty") 58 return schema 59 60 def _doc_path(self, pk: str) -> Path: 61 return self.root / f"{pk}.json" 62 63 def _validate_key(self, pk: str) -> None: 64 if not pk: 65 raise ValueError("Document key must not be empty") 66 if pk in {".", ".."}: 67 raise ValueError(f"Invalid 
document key: {pk}") 68 if pk == Path(_SCHEMA_FILENAME).stem: 69 raise ValueError(f"Document key '{Path(_SCHEMA_FILENAME).stem}' is reserved") 70 if not _VALID_KEY_RE.fullmatch(pk): 71 raise ValueError( 72 "Invalid document key. Use only letters, digits, dot, underscore, or hyphen" 73 ) 74 75 def _validate_field_name(self, field: str) -> None: 76 if not isinstance(field, str): 77 raise ValueError("Indexed field name must be a string") 78 if not field: 79 raise ValueError("Indexed field name must not be empty") 80 81 def _rebuild_index(self) -> None: 82 for field in self.index_fields: 83 self.indexes[field].clear() 84 for path in sorted(self.root.glob("*.json")): 85 if path.name == _SCHEMA_FILENAME: 86 continue 87 doc = json.loads(path.read_text(encoding="utf-8")) 88 self._add_indexes(path.stem, doc) 89 90 def _list_doc_names(self) -> list[str]: 91 return [ 92 path.name 93 for path in sorted(self.root.glob("*.json")) 94 if path.name != _SCHEMA_FILENAME 95 ] 96 97 def _add_indexes(self, pk: str, doc: dict[str, Any]) -> None: 98 for field in self.index_fields: 99 if field in doc: 100 self.indexes[field][doc[field]].add(pk) 101 102 def _remove_indexes(self, pk: str, doc: dict[str, Any]) -> None: 103 for field in self.index_fields: 104 if field in doc: 105 value = doc[field] 106 bucket = self.indexes[field].get(value) 107 if bucket is not None: 108 bucket.discard(pk) 109 if not bucket: 110 del self.indexes[field][value] 111 112 def _write_json_atomic(self, path: Path, obj: dict[str, Any]) -> None: 113 with tempfile.NamedTemporaryFile( 114 "w", encoding="utf-8", dir=str(self.root), delete=False 115 ) as tmp: 116 tmp.write(_json_dump(obj)) 117 tmp_path = Path(tmp.name) 118 tmp_path.replace(path) 119 120 def list_all(self) -> list[str]: 121 """Return all document filenames in the store.""" 122 return self._list_doc_names() 123 124 def get(self, pk: str) -> dict[str, Any]: 125 """Return the document stored under ``pk``. 
126 127 Raises ``KeyError`` if the document does not exist. 128 """ 129 self._validate_key(pk) 130 path = self._doc_path(pk) 131 if not path.exists(): 132 raise KeyError(f"Document not found: {pk}") 133 return json.loads(path.read_text(encoding="utf-8")) 134 135 def query_by(self, field: str, value: Any) -> dict[str, dict[str, Any]]: 136 """Return documents whose indexed ``field`` exactly matches ``value``. 137 138 The result is a mapping of ``key -> document``. Raises ``ValueError`` if 139 no index exists or if ``field`` is not indexed. 140 """ 141 self._validate_field_name(field) 142 if self.schema is None: 143 raise ValueError("Cannot query without an index. Create an index first") 144 if field not in self.indexes: 145 raise ValueError(f"Field is not indexed: {field}") 146 return {pk: self.get(pk) for pk in sorted(self.indexes[field].get(value, ()))} 147 148 def create_index(self, field: str) -> None: 149 """Create an exact-match index for a top-level field. 150 151 Creates ``index.json`` when needed. Raises ``ValueError`` if the index 152 already exists. 153 """ 154 self._validate_field_name(field) 155 if self.schema is None: 156 self.schema = {"index_fields": []} 157 self.index_fields = [] 158 self.indexes = {} 159 if field in self.indexes: 160 raise ValueError(f"Index already exists: {field}. Delete it before recreating") 161 self.index_fields.append(field) 162 self.indexes[field] = defaultdict(set) 163 self.schema["index_fields"] = list(self.index_fields) 164 self._write_json_atomic(self.schema_path, self.schema) 165 self._rebuild_index() 166 167 def list_indexes(self) -> list[str]: 168 """Return the sorted list of indexed fields.""" 169 return sorted(self.index_fields) 170 171 def delete_index(self, field: str) -> bool: 172 """Delete an index by field name. 173 174 Returns ``True`` if an index was deleted, ``False`` if it did not 175 exist. Raises ``ValueError`` if no ``index.json`` exists. 
176 """ 177 self._validate_field_name(field) 178 if self.schema is None: 179 raise ValueError("Cannot delete an index without an index.json") 180 if field not in self.indexes: 181 return False 182 self.index_fields = [name for name in self.index_fields if name != field] 183 del self.indexes[field] 184 self.schema["index_fields"] = list(self.index_fields) 185 self._write_json_atomic(self.schema_path, self.schema) 186 return True 187 188 def insert(self, pk: str, doc: dict[str, Any]) -> dict[str, Any]: 189 """Insert a new document under ``pk`` and return it. 190 191 Raises ``ValueError`` if the key is invalid or already exists. 192 """ 193 self._validate_key(pk) 194 path = self._doc_path(pk) 195 if path.exists(): 196 raise ValueError(f"Primary key already exists: {pk}") 197 if self.schema is not None: 198 self._add_indexes(pk, doc) 199 self._write_json_atomic(path, doc) 200 return doc 201 202 def update(self, pk: str, doc: dict[str, Any]) -> dict[str, Any]: 203 """Replace the existing document stored under ``pk`` and return it. 204 205 Raises ``KeyError`` if the document does not exist. 206 """ 207 self._validate_key(pk) 208 path = self._doc_path(pk) 209 if not path.exists(): 210 raise KeyError(f"Document not found: {pk}") 211 if self.schema is not None: 212 old_doc = self.get(pk) 213 self._remove_indexes(pk, old_doc) 214 self._add_indexes(pk, doc) 215 self._write_json_atomic(path, doc) 216 return doc 217 218 def delete(self, pk: str) -> None: 219 """Delete the document stored under ``pk``. 220 221 Raises ``KeyError`` if the document does not exist. 222 """ 223 self._validate_key(pk) 224 path = self._doc_path(pk) 225 if not path.exists(): 226 raise KeyError(f"Document not found: {pk}") 227 if self.schema is not None: 228 doc = self.get(pk) 229 self._remove_indexes(pk, doc) 230 path.unlink()
Store JSON documents as individual files in a directory.
Document identity is the filename stem. Optional indexes are stored in
index.json and support exact-match queries on top-level fields.
27 def __init__(self, root: str | Path, create: bool = False): 28 """Open a document store rooted at ``root``. 29 30 If ``create`` is true, the directory is created when missing. 31 ``index.json`` is optional and is only needed for indexed queries. 32 """ 33 self.root = Path(root) 34 if create: 35 self.root.mkdir(parents=True, exist_ok=True) 36 elif not self.root.exists(): 37 raise ValueError(f"Directory does not exist: {self.root}") 38 if not self.root.is_dir(): 39 raise ValueError(f"Path is not a directory: {self.root}") 40 41 self.schema_path = self.root / _SCHEMA_FILENAME 42 self.schema = self._load_schema() if self.schema_path.exists() else None 43 self.index_fields = list(self.schema.get("index_fields", [])) if self.schema else [] 44 self.indexes: dict[str, dict[Any, set[str]]] = {field: defaultdict(set) for field in self.index_fields} 45 if self.schema is not None: 46 self._rebuild_index()
Open a document store rooted at root.
If create is true, the directory is created when missing.
index.json is optional and is only needed for indexed queries.
120 def list_all(self) -> list[str]: 121 """Return all document filenames in the store.""" 122 return self._list_doc_names()
Return the sorted filenames (including the .json suffix) of all documents in the store; the index file is not listed.
def get(self, pk: str) -> dict[str, Any]:
    """Return the document stored under ``pk``.

    Raises ``ValueError`` if ``pk`` is not a valid key and ``KeyError`` if
    the document does not exist.
    """
    self._validate_key(pk)
    path = self._doc_path(pk)
    # Read directly instead of checking ``exists()`` first, so a file
    # removed between the check and the read still surfaces as
    # ``KeyError`` rather than ``FileNotFoundError``.
    try:
        text = path.read_text(encoding="utf-8")
    except FileNotFoundError:
        raise KeyError(f"Document not found: {pk}") from None
    return json.loads(text)
Return the document stored under pk.
Raises KeyError if the document does not exist.
135 def query_by(self, field: str, value: Any) -> dict[str, dict[str, Any]]: 136 """Return documents whose indexed ``field`` exactly matches ``value``. 137 138 The result is a mapping of ``key -> document``. Raises ``ValueError`` if 139 no index exists or if ``field`` is not indexed. 140 """ 141 self._validate_field_name(field) 142 if self.schema is None: 143 raise ValueError("Cannot query without an index. Create an index first") 144 if field not in self.indexes: 145 raise ValueError(f"Field is not indexed: {field}") 146 return {pk: self.get(pk) for pk in sorted(self.indexes[field].get(value, ()))}
Return documents whose indexed field exactly matches value.
The result is a mapping of key -> document. Raises ValueError if
no index exists or if field is not indexed.
148 def create_index(self, field: str) -> None: 149 """Create an exact-match index for a top-level field. 150 151 Creates ``index.json`` when needed. Raises ``ValueError`` if the index 152 already exists. 153 """ 154 self._validate_field_name(field) 155 if self.schema is None: 156 self.schema = {"index_fields": []} 157 self.index_fields = [] 158 self.indexes = {} 159 if field in self.indexes: 160 raise ValueError(f"Index already exists: {field}. Delete it before recreating") 161 self.index_fields.append(field) 162 self.indexes[field] = defaultdict(set) 163 self.schema["index_fields"] = list(self.index_fields) 164 self._write_json_atomic(self.schema_path, self.schema) 165 self._rebuild_index()
Create an exact-match index for a top-level field.
Creates index.json when needed. Raises ValueError if the index
already exists.
167 def list_indexes(self) -> list[str]: 168 """Return the sorted list of indexed fields.""" 169 return sorted(self.index_fields)
Return the sorted list of indexed fields.
171 def delete_index(self, field: str) -> bool: 172 """Delete an index by field name. 173 174 Returns ``True`` if an index was deleted, ``False`` if it did not 175 exist. Raises ``ValueError`` if no ``index.json`` exists. 176 """ 177 self._validate_field_name(field) 178 if self.schema is None: 179 raise ValueError("Cannot delete an index without an index.json") 180 if field not in self.indexes: 181 return False 182 self.index_fields = [name for name in self.index_fields if name != field] 183 del self.indexes[field] 184 self.schema["index_fields"] = list(self.index_fields) 185 self._write_json_atomic(self.schema_path, self.schema) 186 return True
Delete an index by field name.
Returns True if an index was deleted, False if it did not
exist. Raises ValueError if no index.json exists.
def insert(self, pk: str, doc: dict[str, Any]) -> dict[str, Any]:
    """Insert a new document under ``pk`` and return it.

    Raises ``ValueError`` if the key is invalid or already exists. If
    writing the document fails, the index entries added for it are removed
    again before the error propagates.
    """
    self._validate_key(pk)
    path = self._doc_path(pk)
    if path.exists():
        raise ValueError(f"Primary key already exists: {pk}")
    indexed = self.schema is not None
    if indexed:
        self._add_indexes(pk, doc)
    try:
        self._write_json_atomic(path, doc)
    except Exception:
        # Roll back, otherwise the index would point at a document that
        # was never stored and queries would fail looking it up.
        if indexed:
            self._remove_indexes(pk, doc)
        raise
    return doc
Insert a new document under pk and return it.
Raises ValueError if the key is invalid or already exists.
def update(self, pk: str, doc: dict[str, Any]) -> dict[str, Any]:
    """Replace the existing document stored under ``pk`` and return it.

    Raises ``KeyError`` if the document does not exist. If writing the new
    document fails, the index is restored to describe the old document
    before the error propagates.
    """
    self._validate_key(pk)
    path = self._doc_path(pk)
    if not path.exists():
        raise KeyError(f"Document not found: {pk}")
    indexed = self.schema is not None
    old_doc = None
    if indexed:
        # The old document is needed to locate its index entries.
        old_doc = self.get(pk)
        self._remove_indexes(pk, old_doc)
        self._add_indexes(pk, doc)
    try:
        self._write_json_atomic(path, doc)
    except Exception:
        # The file still holds the old document; make the index agree.
        if indexed:
            self._remove_indexes(pk, doc)
            self._add_indexes(pk, old_doc)
        raise
    return doc
Replace the existing document stored under pk and return it.
Raises KeyError if the document does not exist.
def delete(self, pk: str) -> None:
    """Delete the document stored under ``pk``.

    Raises ``KeyError`` if the document does not exist. Index entries are
    removed only after the file itself has been deleted.
    """
    self._validate_key(pk)
    path = self._doc_path(pk)
    if not path.exists():
        raise KeyError(f"Document not found: {pk}")
    indexed = self.schema is not None
    # Read the document before unlinking: its field values are needed to
    # find the index entries to drop.
    doc = self.get(pk) if indexed else None
    # Unlink first so a failed delete (e.g. a permission error) does not
    # leave an existing document missing from the index.
    path.unlink()
    if indexed:
        self._remove_indexes(pk, doc)
Delete the document stored under pk.
Raises KeyError if the document does not exist.