initial commit

This commit is contained in:
bit 2022-05-29 14:16:42 +02:00
commit a0a1098fad
8 changed files with 791 additions and 0 deletions

3
README.md Normal file
View File

@ -0,0 +1,3 @@
# pydatastorage
A Python data storage backend library.

3
pyproject.toml Normal file
View File

@ -0,0 +1,3 @@
[build-system]
requires = ["setuptools>=40.8.0", "wheel"]
build-backend = "setuptools.build_meta"

26
setup.cfg Normal file
View File

@ -0,0 +1,26 @@
[metadata]
name = pydatastorage
version = attr: pydatastorage.__version__
description = A Python data storage backend library.
long_description = file: README.md
long_description_content_type = text/markdown
classifiers =
Development Status :: 2 - Pre-Alpha
Intended Audience :: Science/Research
Programming Language :: Python :: 3
Programming Language :: Python :: 3.9
Programming Language :: Python :: 3.10
[options]
install_requires =
psutil
pydantic
pydantic_uuid_model@git+https://git.chaosbit.de/bit/PydanticUUIDModel.git
python-magic
requests
package_dir=
=src
packages=find:
[options.packages.find]
where=src

View File

@ -0,0 +1,20 @@
# Package version; setup.cfg reads it through ``attr: pydatastorage.__version__``.
__version__ = "0.1.0"

# Names that make up the public API of the package (re-exported below).
__all__ = [
    "DataID",
    "Data",
    "DataStorage",
    "DirectoryDataStorage",
    "Hash",
    "NoDataWithIDError",
    "RemoteDataStorage",
    "RemoteDataStorageAccount",
    "RemoteDataStorageServer",
    "TempDataStorage",
]
from .base import Data, DataID, DataStorage, Hash, NoDataWithIDError
from .path import DirectoryDataStorage
from .temp import TempDataStorage
from .rds import RemoteDataStorage, RemoteDataStorageAccount, RemoteDataStorageServer

173
src/pydatastorage/base.py Normal file
View File

@ -0,0 +1,173 @@
import hashlib
import mimetypes
from typing import Iterator, List, Optional, Union
from io import BytesIO
import magic
from pydantic import BaseModel, PrivateAttr
from pydantic_uuid_model import UUIDBaseModel # type: ignore[import]
class Hash(str):
    """Hex digest of some data; a ``str`` subtype used to tag hash values."""


# Number of bytes read per iteration when data is streamed (1 MiB).
DEFAULT_CHUNK_SIZE = 2**20


def sha256(data: Union[bytes, BytesIO], close: bool = True) -> Hash:
    """Return the SHA-256 hex digest of *data*.

    Args:
        data: Either an in-memory bytes-like object (``bytes``, ``bytearray``
            or ``memoryview``) or a binary stream offering ``read`` and
            ``close``. Streams are consumed from their current position in
            chunks of ``DEFAULT_CHUNK_SIZE`` bytes.
        close: Close the stream once hashing is done (also when reading
            fails). Pass ``False`` to keep using the stream afterwards.

    Returns:
        The hexadecimal digest wrapped in :class:`Hash`.
    """
    digest = hashlib.sha256()
    # In-memory input is wrapped so that both input kinds share one code path.
    if isinstance(data, (bytes, bytearray, memoryview)):
        data = BytesIO(data)
    try:
        while chunk := data.read(DEFAULT_CHUNK_SIZE):
            digest.update(chunk)
    finally:
        if close:
            data.close()
    return Hash(digest.hexdigest())
class DataID(str):
    """Identifier of a stored data item; a ``str`` subtype used as a tag."""


class StorageException(Exception):
    """Common base class of the storage exceptions defined in this module."""


class NoDataWithIDError(StorageException):
    """Signals that no data is stored under the given ID.

    Attributes are set from the constructor arguments; ``id`` keeps the ID
    that could not be resolved.
    """

    id: DataID

    def __init__(self, id: DataID, msg: Optional[str] = None) -> None:
        # Fall back to a generic message when the caller supplies none.
        message = f"no data with ID '{id}'" if msg is None else msg
        super().__init__(message)
        self.id = id
class DataStorage(UUIDBaseModel, base=True):  # type: ignore[call-arg]
    """Base class of all data storages.

    Subclasses must implement :meth:`is_valid`, :meth:`get_io`,
    :meth:`put_io`, :meth:`available`, :meth:`remove` and :meth:`__iter__`;
    every other method has a generic default built on top of those, which
    subclasses may override with something cheaper.
    """

    def is_valid(self) -> bool:
        """Tell whether the storage is usable (implemented by subclasses)."""
        raise NotImplementedError

    def exists(self, did: DataID) -> bool:
        """Return ``True`` if data is stored under *did*."""
        try:
            # Opening the stream is the generic existence probe; the ``with``
            # block closes it again right away.
            with self.get_io(did):
                return True
        except NoDataWithIDError:
            return False

    def has(self, identifier: Optional[Union[bytes, BytesIO, Hash]]) -> List[DataID]:
        """Return the IDs of all stored items matching *identifier*.

        Args:
            identifier: Raw bytes or a binary stream (hashed with
                :func:`sha256`, which closes the stream), or an already
                computed hash string. ``None`` and the empty string match
                nothing.
        """
        if identifier is None:
            return []
        if not isinstance(identifier, str):
            identifier = sha256(identifier)
        return self.has_hash(identifier) if identifier else []

    def has_hash(self, h: Hash) -> List[DataID]:
        """Return the IDs of all stored items whose hash equals *h*."""
        return [data.did for data in self if data.hash == h]

    def get(self, did: DataID) -> "Data":
        """Return a :class:`Data` handle for *did* bound to this storage."""
        return Data(did=did, storage=self)

    def get_bytes(self, did: DataID) -> bytes:
        """Read the whole content stored under *did* into memory."""
        with self.get_io(did) as data_stream:
            return data_stream.read()

    def get_io(self, did: DataID) -> BytesIO:
        """Open a binary stream on the content of *did* (subclass hook)."""
        raise NotImplementedError

    def get_hash(self, did: DataID) -> Hash:
        """Compute the SHA-256 hash of the content stored under *did*."""
        # sha256() closes the stream it is given.
        return sha256(self.get_io(did))

    def get_mimetype(self, did: DataID) -> str:
        """Detect the MIME type from the first 4096 bytes of the content."""
        with self.get_io(did) as stream:
            return magic.from_buffer(stream.read(4096), mime=True)

    def get_size(self, did: DataID) -> int:
        """Return the content size in bytes by reading the whole stream."""
        size = 0
        with self.get_io(did) as stream:
            while chunk := stream.read(DEFAULT_CHUNK_SIZE):
                size += len(chunk)
        return size

    def put(self, data: Union[bytes, BytesIO], close: bool = True) -> "Data":
        """Store *data* and return a handle to the new item.

        Args:
            data: Raw bytes or a binary stream.
            close: Close the stream after storing it, also on failure.
        """
        if isinstance(data, bytes):
            data = BytesIO(data)
        try:
            return Data(did=self.put_io(data), storage=self)
        finally:
            if close:
                data.close()

    def put_io(self, data: BytesIO) -> DataID:
        """Store the content of the stream and return its new ID (hook)."""
        raise NotImplementedError

    def size(self) -> int:
        """Return the summed size in bytes of all stored items."""
        return sum(d.size for d in self)

    def available(self) -> int:
        """Return the number of bytes still available (subclass hook)."""
        raise NotImplementedError

    def total(self) -> int:
        """Return the total capacity in bytes: used plus available space."""
        return self.size() + self.available()

    def remove(self, did: DataID) -> bool:
        """Remove the item stored under *did* (subclass hook)."""
        raise NotImplementedError

    def __len__(self) -> int:
        """Return the number of stored items by counting the iteration."""
        return sum(1 for _ in self)

    def __iter__(self) -> Iterator["Data"]:
        """Iterate over handles of all stored items (subclass hook)."""
        raise NotImplementedError

    def destroy(self) -> bool:
        """Dispose of the storage; the base implementation does nothing."""
        return True
class Data(BaseModel):
    """Handle to one item of a :class:`DataStorage`.

    The handle only carries the ID and the storage. Hash, MIME type, file
    suffix and size are fetched from the storage on first access and then
    cached in private attributes.
    """

    did: DataID
    storage: DataStorage
    # Lazily filled caches; ``None`` means "not looked up yet".
    _hash: Optional[Hash] = PrivateAttr(default=None)
    _mimetype: Optional[str] = PrivateAttr(default=None)
    _suffix: Optional[str] = PrivateAttr(default=None)
    _size: Optional[int] = PrivateAttr(default=None)

    def exists(self) -> bool:
        """Ask the storage whether this item is present."""
        return self.storage.exists(self.did)

    def bytes(self) -> bytes:
        """Return the complete content as bytes."""
        return self.storage.get_bytes(self.did)

    def stream(self) -> BytesIO:
        """Return a binary stream on the content, as opened by the storage."""
        return self.storage.get_io(self.did)

    @property
    def hash(self) -> Hash:
        """Hash of the content (cached after the first lookup)."""
        if self._hash is None:
            self._hash = self.storage.get_hash(self.did)
        return self._hash

    @property
    def mimetype(self) -> str:
        """MIME type of the content (cached after the first lookup)."""
        if self._mimetype is None:
            self._mimetype = self.storage.get_mimetype(self.did)
        return self._mimetype

    @property
    def suffix(self) -> str:
        """File extension guessed from the MIME type, or ``""`` if unknown."""
        if self._suffix is None:
            self._suffix = mimetypes.guess_extension(self.mimetype) or ""
        return self._suffix

    @property
    def size(self) -> int:
        """Size of the content as reported by the storage (cached)."""
        if self._size is None:
            self._size = self.storage.get_size(self.did)
        return self._size

    def delete(self) -> None:
        """Remove this item from its storage; the storage's result is dropped."""
        self.storage.remove(self.did)

    def remove(self) -> None:
        """Alias of :meth:`delete`."""
        return self.delete()

166
src/pydatastorage/path.py Normal file
View File

@ -0,0 +1,166 @@
import hashlib
import os
import shutil
from io import BytesIO
from pathlib import Path
from tempfile import TemporaryFile
from typing import Iterator, List, Optional, Union
from uuid import uuid4
import magic
import psutil
from . import Data, DataStorage, DataID, Hash, NoDataWithIDError
from .base import sha256, DEFAULT_CHUNK_SIZE
def hash_path(hx: str) -> Path:
    """Map a hex string to a nested relative path.

    The result is ``<chars 0-1>/<chars 2-3>/<chars 4-7>/<full string>``.
    """
    segments = (hx[:2], hx[2:4], hx[4:8], hx)
    return Path(".").joinpath(*segments)
def append_id(file: Path, id: Optional[str] = None, suffix: str = "ids") -> str:
    """Append an ID to the id list kept next to *file* and return it.

    Args:
        file: Path the id list belongs to; the list lives in
            ``<file name>.<suffix>`` in the same directory.
        id: ID to append. A fresh UUID4 string is generated when omitted or
            empty.
        suffix: Extension of the id list file.
    """
    ids = load_ids(file, suffix)
    ids.append(id or str(uuid4()))
    store_ids(file, ids, suffix)
    return ids[-1]


def rm_id(file: Path, id: str, suffix: str = "ids") -> bool:
    """Drop *id* from the id list of *file*.

    Only the first occurrence is removed; an unknown ID is not an error.

    Returns:
        ``True`` if the list is empty afterwards (its file is then gone).
    """
    ids = load_ids(file, suffix)
    if id in ids:
        ids.remove(id)
    store_ids(file, ids, suffix)
    return len(ids) == 0


def store_ids(file: Path, ids: List[str], suffix: str = "ids") -> None:
    """Write *ids*, one per line, to the id list file of *file*.

    An empty list deletes the id list file; it is fine if none exists.
    """
    id_file = file.parent / f"{file.name}.{suffix}"
    if ids:
        # Join with "\n": text mode already translates it to the platform's
        # line ending, so os.linesep would be translated a second time.
        id_file.write_text("\n".join(ids), encoding="utf-8")
    else:
        id_file.unlink(missing_ok=True)


def load_ids(file: Path, suffix: str = "ids") -> List[str]:
    """Read the id list of *file*; a missing list file yields ``[]``."""
    id_file = file.parent / f"{file.name}.{suffix}"
    if not id_file.is_file():
        return []
    return id_file.read_text(encoding="utf-8").splitlines()
def _remove_and_prune(file: Path, root: Path) -> None:
    """Delete *file* if present, then remove emptied parent directories.

    Walks upwards from the file's directory and stops at *root* (which is
    never removed) or at the first directory that is not empty.
    """
    if file.is_file():
        os.remove(file)
    parent = file.parent
    while parent != root and parent.is_dir() and not any(parent.iterdir()):
        parent.rmdir()
        parent = parent.parent


class DirectoryDataStorage(DataStorage):
    """Data storage backed by a directory on the local file system.

    Layout below ``path``:

    * ``data/<hash path>`` holds the content, stored once per SHA-256 hash,
      with a sibling ``<hash>.ids`` file listing the IDs that refer to it.
    * ``uuid/<id path>`` holds one small text file per ID containing the
      hash of its content.
    """

    muuid = "11ffc9f4-1a7a-4e13-8a28-ab78d8364f27"
    path: Path

    @property
    def data_path(self) -> Path:
        """Directory holding the content files."""
        return self.path / "data"

    @property
    def uuid_path(self) -> Path:
        """Directory holding the ID -> hash files."""
        return self.path / "uuid"

    def __init__(self, path: Path):
        super().__init__(path=path)
        # Called for its side effect of creating missing directories; the
        # result is not checked here.
        self.is_valid()

    def is_valid(self) -> bool:
        """Check the directory layout, creating missing directories.

        Returns:
            ``False`` if one of the paths exists but is no directory, or if
            it cannot be created.
        """
        for p in (self.path, self.data_path, self.uuid_path):
            if p.exists():
                if not p.is_dir():
                    return False
            else:
                try:
                    p.mkdir(parents=True, exist_ok=True)
                except IOError:
                    return False
        return True

    def exists(self, did: DataID) -> bool:
        """Return ``True`` if an ID file exists for *did*."""
        return (self.uuid_path / hash_path(did)).is_file()

    def has_hash(self, h: Hash) -> List[DataID]:
        """Return the IDs recorded in the id list of the content with hash *h*."""
        return [DataID(did) for did in load_ids(self.data_path / hash_path(h))]

    def get_file(self, did: DataID) -> Path:
        """Return the path of the content file *did* refers to.

        Raises:
            NoDataWithIDError: If *did* is unknown.
        """
        return self.data_path / hash_path(self.get_hash(did))

    def get_io(self, did: DataID) -> BytesIO:
        """Open the content file of *did* for binary reading."""
        return self.get_file(did).open("rb")

    def get_hash(self, did: DataID) -> Hash:
        """Read the hash recorded for *did*.

        Raises:
            NoDataWithIDError: If there is no ID file for *did*.
        """
        file = self.uuid_path / hash_path(did)
        if not file.is_file():
            raise NoDataWithIDError(did)
        return Hash(file.read_text())

    def get_mimetype(self, did: DataID) -> str:
        """Detect the MIME type directly from the content file."""
        # str(): older python-magic releases do not accept path objects.
        return magic.from_file(str(self.get_file(did)), mime=True)

    def get_size(self, did: DataID) -> int:
        """Return the size of the content file in bytes."""
        return self.get_file(did).stat().st_size

    def available(self) -> int:
        """Return the free bytes reported for the disk holding ``path``."""
        return psutil.disk_usage(str(self.path)).free

    def put_io(self, data: BytesIO) -> DataID:
        """Store the stream's content under a fresh UUID4 ID.

        The stream is first spooled to a temporary file so it can be hashed
        and then copied. Content already stored under the same hash is not
        written again; only the new ID is registered. On any failure the
        files and directories created for the new ID are removed again and
        the exception is re-raised.
        """
        did = str(uuid4())
        did_file = self.uuid_path / hash_path(did)
        file: Optional[Path] = None
        try:
            with TemporaryFile() as f:
                shutil.copyfileobj(data, f, DEFAULT_CHUNK_SIZE)
                f.seek(0)
                h = sha256(f, close=False)
                file = self.data_path / hash_path(h)
                file.parent.mkdir(parents=True, exist_ok=True)
                append_id(file, did)
                did_file.parent.mkdir(parents=True, exist_ok=True)
                did_file.write_text(h)
                if not file.is_file():
                    f.seek(0)
                    with file.open("wb") as fobj:
                        shutil.copyfileobj(f, fobj, DEFAULT_CHUNK_SIZE)
            return DataID(did)
        except Exception:
            # Roll back whatever was created for this ID, then re-raise.
            _remove_and_prune(did_file, self.uuid_path)
            # The content is deleted only if no other ID still refers to it.
            if file is not None and rm_id(file, did):
                _remove_and_prune(file, self.data_path)
            raise

    def remove(self, did: DataID) -> bool:
        """Remove *did*; its content goes too once no ID refers to it.

        Returns:
            ``False`` if *did* is unknown, ``True`` otherwise.
        """
        did_file = self.uuid_path / hash_path(did)
        if not did_file.exists():
            return False
        file = self.data_path / hash_path(did_file.read_text())
        _remove_and_prune(did_file, self.uuid_path)
        if rm_id(file, did):
            _remove_and_prune(file, self.data_path)
        return True

    def destroy(self) -> bool:
        """Delete the whole storage directory tree.

        Returns:
            ``False`` if removing any entry failed, ``True`` otherwise
            (including when the storage is not valid and nothing is done).
        """
        if not self.is_valid():
            return True
        failed = False

        def onerror(*args, **kwargs):
            # nonlocal is required; a plain assignment would only create a
            # local variable and the failure would go unnoticed.
            nonlocal failed
            failed = True

        shutil.rmtree(self.path, onerror=onerror)
        return not failed

    def __iter__(self) -> Iterator[Data]:
        """Yield a handle per ID file, with the hash cache pre-filled."""
        for f in self.uuid_path.glob("*/*/*/*"):
            item = Data(did=DataID(f.name), storage=self)
            # Private attributes cannot be passed to the model constructor,
            # so the cache is filled afterwards.
            item._hash = Hash(f.read_text())
            yield item

    def __len__(self) -> int:
        """Return the number of ID files."""
        return sum(1 for _ in self.uuid_path.glob("*/*/*/*"))

329
src/pydatastorage/rds.py Normal file
View File

@ -0,0 +1,329 @@
import hashlib
import os
import shutil
from dataclasses import dataclass
from enum import Enum
from io import BytesIO
from pathlib import Path
from tempfile import TemporaryFile
from typing import Any, Iterator, List, Optional, Union
from uuid import uuid4
import requests
from pydantic import BaseModel, PrivateAttr
from . import Data, DataStorage, DataID, Hash, NoDataWithIDError
from .base import sha256, DEFAULT_CHUNK_SIZE
# JSON body the server's root URL must answer with to be considered valid.
REMOTE_DATA_STORAGE_SERVER_HEALTH_CHECK_DATA = dict(
    data="RemoteDataStorage",
    error=None,
    success=True,
)


class Token(str):
    """Access token string; a ``str`` subtype used as a tag."""
class User(BaseModel):
    """User credentials: a name and a password, both plain strings."""

    name: str
    password: str
class AccessType(Enum):
    """Access levels, including the ``user`` level of account credentials."""

    READ = "read"
    WRITE = "write"
    POOL = "pool"
    USER = "user"


class TokenAccessType(Enum):
    """Access levels a token can have (no ``user`` level)."""

    READ = "read"
    WRITE = "write"
    POOL = "pool"


# Lookup table from the string value to the TokenAccessType member.
TOKEN_ACCESS_TYPES = dict((member.value, member) for member in TokenAccessType)
class RemoteDataStorageException(Exception):
    """Base class of the exceptions raised for remote data storages."""


class InvalidCredentialsException(RemoteDataStorageException):
    """The server rejected the supplied credentials."""


class InvalidResponseException(RemoteDataStorageException):
    """The server answered with a status code that is not handled."""
class RemoteDataStorageServer(BaseModel):
    """A remote data storage server, identified by its base URL."""

    base_url: str

    def __init__(self, base_url: str) -> None:
        super().__init__(base_url=base_url)

    def is_valid(self) -> bool:
        """Return ``True`` if the root URL answers with the health-check body."""
        res = requests.get(f"{self.base_url}/")
        return res.ok and res.json() == REMOTE_DATA_STORAGE_SERVER_HEALTH_CHECK_DATA

    def get_account(self, username: str, password: str) -> "RemoteDataStorageAccount":
        """Return an account object for the given user credentials."""
        return RemoteDataStorageAccount(server=self, user=User(name=username, password=password))

    def get_pool(self, token: Union[Token, str], pid: Optional[str] = None) -> "RemoteDataStorage":
        """Return the storage of a pool, authenticated with *token*.

        Args:
            token: Bearer token used for the request and for the returned
                storage.
            pid: UUID of the pool. When omitted, the first pool the server
                lists for the token is used.

        Raises:
            InvalidCredentialsException: The server answered with 403.
            InvalidResponseException: Any other unsuccessful status, or no
                pool is listed for the token.
        """
        # With no pid this URL is the pool listing, otherwise it addresses
        # the requested pool, so a single request covers both cases.
        res = requests.get(
            f"{self.base_url}/pools/{pid or ''}",
            headers={"Authorization": f"Bearer {token}"},
        )
        if res.status_code == 403:
            raise InvalidCredentialsException
        if not res.ok:
            raise InvalidResponseException(f"Status Code: {res.status_code}")
        if not pid:
            pools = res.json()["data"]
            if not pools:
                raise InvalidResponseException("no pool is accessible with the given token")
            pid = pools[0]["uuid"]
        return RemoteDataStorage(base_url=self.base_url, pid=pid, creds=Token(token))
class RemoteDataStorageAccount(BaseModel):
    """A user account on a remote data storage server.

    Requests are sent through a lazily created ``requests`` session that
    carries the user's name and password as its ``auth`` tuple.
    """

    server: RemoteDataStorageServer
    user: User
    _session: Optional[requests.Session] = PrivateAttr(None)

    @property
    def session(self) -> requests.Session:
        """Session authenticated as the user; created on first access."""
        if self._session is not None:
            return self._session
        self._session = requests.Session()
        self._session.auth = (self.user.name, self.user.password)
        return self._session

    def is_valid(self) -> bool:
        """Check the server's health endpoint, then the user's pool listing."""
        health = requests.get(f"{self.server.base_url}/")
        if not (health.ok and health.json() == REMOTE_DATA_STORAGE_SERVER_HEALTH_CHECK_DATA):
            return False
        listing = self.session.get(f"{self.server.base_url}/pools/")
        return True if listing.ok else False

    def pools(self) -> List["RemoteDataStorage"]:
        """Return a storage object for every pool the server lists.

        Raises:
            InvalidCredentialsException: The server answered with 403.
            InvalidResponseException: Any other unsuccessful status.
        """
        res = self.session.get(f"{self.server.base_url}/pools/")
        if not res.ok:
            if res.status_code == 403:
                raise InvalidCredentialsException
            raise InvalidResponseException(f"Status Code: {res.status_code}")
        storages = []
        for pool in res.json()["data"]:
            storages.append(RemoteDataStorage(base_url=self.server.base_url, pid=pool["uuid"], creds=self.user))
        return storages

    def get_pool(self, pid: str) -> "RemoteDataStorage":
        """Return the storage object of pool *pid* using the user credentials."""
        return RemoteDataStorage(base_url=self.server.base_url, pid=pid, creds=self.user)

    def create_pool(self) -> "RemoteDataStorage":
        """Create a pool on the server and return its storage object.

        Raises:
            InvalidCredentialsException: The server answered with 403.
            InvalidResponseException: Any other unsuccessful status.
        """
        res = self.session.post(f"{self.server.base_url}/pools/")
        if not res.ok:
            if res.status_code == 403:
                raise InvalidCredentialsException
            raise InvalidResponseException(f"Status Code: {res.status_code}")
        new_pid = res.json()["data"]["uuid"]
        return RemoteDataStorage(base_url=self.server.base_url, pid=new_pid, creds=self.user)
def _failure(res: requests.Response, did: Optional[DataID] = None) -> Exception:
    """Map an unsuccessful response to the exception to raise for it.

    403 maps to InvalidCredentialsException; 404 maps to NoDataWithIDError
    when the request concerned a data ID; everything else maps to
    InvalidResponseException.
    """
    if res.status_code == 403:
        return InvalidCredentialsException()
    if did is not None and res.status_code == 404:
        return NoDataWithIDError(did)
    return InvalidResponseException(f"Status Code: {res.status_code}")


def _token_level(type_name: str) -> AccessType:
    """Translate a token type string into the matching AccessType member.

    Unknown type strings raise ``KeyError`` from the lookup table.
    """
    return AccessType(TOKEN_ACCESS_TYPES[type_name].value)


def _with_metadata(data: Data, meta: Any) -> Data:
    """Fill the caches of *data* from a server entry and return it.

    Private attributes cannot be passed to the model constructor, so they
    are assigned after construction.
    """
    data._hash = Hash(meta["sha256"])
    data._mimetype = meta["mimetype"]
    data._size = meta["size"]
    return data


class RemoteDataStorage(DataStorage):
    """Data storage backed by one pool of a remote data storage server.

    Requests go through a lazily created session that authenticates either
    with user credentials (``auth`` tuple) or with a bearer token. Unless
    stated otherwise, methods raise InvalidCredentialsException on a 403
    answer and InvalidResponseException on any other unsuccessful status.
    """

    muuid = "e9be7b8b-5cf4-4931-a9e8-966138822e05"
    base_url: str
    pid: str
    creds: Union[Token, User]
    # Both are caches that __setattr__ resets when the connection changes.
    _session: Optional[requests.Session] = PrivateAttr(None)
    _level: Optional[AccessType] = PrivateAttr(None)

    def __init__(self, base_url: str, pid: str, creds: Union[Token, User]) -> None:
        super().__init__(base_url=base_url, pid=pid, creds=creds)
        if isinstance(creds, User):
            self._level = AccessType.USER
        # Contacts the server; the result is not checked here.
        self.is_valid()

    def server(self) -> RemoteDataStorageServer:
        """Return a server object for this storage's base URL."""
        return RemoteDataStorageServer(base_url=self.base_url)

    def account(self) -> RemoteDataStorageAccount:
        """Return the account of the user this storage authenticates as.

        Raises:
            InvalidCredentialsException: The storage uses a token.
        """
        if isinstance(self.creds, User):
            # pydantic models only accept keyword arguments.
            return RemoteDataStorageAccount(server=self.server(), user=self.creds)
        raise InvalidCredentialsException("credentials type 'user' required but 'token' given")

    def __setattr__(self, name: str, value: Any) -> None:
        # A changed connection setting invalidates the cached session and
        # access level.
        if name in ("base_url", "pid", "creds"):
            self._session = None
            self._level = None
        return super().__setattr__(name, value)

    @property
    def level(self) -> AccessType:
        """Access level of the credentials (cached).

        User credentials always have level ``USER``; for a token the server
        is asked.

        Raises:
            InvalidCredentialsException: The server answered 403 or 404.
            InvalidResponseException: Any other unsuccessful status.
        """
        if self._level is None:
            if isinstance(self.creds, User):
                self._level = AccessType.USER
            else:
                res = self.session.get(f"{self.base_url}/token/{self.creds}")
                if res.status_code in (403, 404):
                    raise InvalidCredentialsException
                if not res.ok:
                    raise InvalidResponseException(f"Status Code: {res.status_code}")
                # NOTE(review): assumes this endpoint returns the same
                # "type" field as the token listing - confirm against the
                # server API.
                self._level = _token_level(res.json()["data"]["type"])
        return self._level

    @property
    def session(self) -> requests.Session:
        """Session carrying the credentials; created on first access."""
        if self._session is None:
            _session = requests.Session()
            if isinstance(self.creds, User):
                _session.auth = (self.creds.name, self.creds.password)
            else:
                _session.headers["Authorization"] = f"Bearer {self.creds}"
            self._session = _session
        return self._session

    def is_valid(self) -> bool:
        """Check the server's health endpoint and access to the pool."""
        res = requests.get(f"{self.base_url}/")
        if res.ok and res.json() == REMOTE_DATA_STORAGE_SERVER_HEALTH_CHECK_DATA:
            res = self.session.get(f"{self.base_url}/pools/{self.pid}")
            if res.ok:
                return True
        return False

    def token(self) -> List[Token]:
        """Return the tokens the server lists for this pool."""
        return [rds.creds for rds in self.by_token()]

    def by_token(self) -> List["RemoteDataStorage"]:
        """Return one storage object per token listed for this pool."""
        res = self.session.get(f"{self.base_url}/token/", params={"pool": self.pid})
        if not res.ok:
            raise _failure(res)
        storages = []
        for token in res.json()["data"]:
            rds = RemoteDataStorage(base_url=self.base_url, pid=self.pid, creds=Token(token["value"]))
            # __init__ takes no level argument; set the cache afterwards.
            rds._level = _token_level(token["type"])
            storages.append(rds)
        return storages

    def create(self, typ: TokenAccessType = TokenAccessType.READ) -> "RemoteDataStorage":
        """Create a token of type *typ* for this pool.

        Returns:
            A storage object for the same pool that uses the new token.
        """
        res = self.session.post(f"{self.base_url}/token/", data={"pool": self.pid, "type": typ.value})
        if not res.ok:
            raise _failure(res)
        token = res.json()["data"]
        rds = RemoteDataStorage(base_url=self.base_url, pid=self.pid, creds=Token(token["value"]))
        rds._level = _token_level(token["type"])
        return rds

    def exists(self, did: DataID) -> bool:
        """Return ``True`` if the server knows *did*."""
        try:
            self.get_hash(did)
        except NoDataWithIDError:
            return False
        return True

    def has_hash(self, h: Hash) -> List[DataID]:
        """Return the IDs the server reports for content with hash *h*."""
        res = self.session.get(f"{self.base_url}/data/", stream=True, params={"pool": self.pid, "sha256": h})
        if not res.ok:
            raise _failure(res)
        return [DataID(entry["uuid"]) for entry in res.json()["data"]]

    def get_io(self, did: DataID) -> BytesIO:
        """Return the raw response stream of the download of *did*.

        Raises:
            NoDataWithIDError: The server answered with 404.
        """
        res = self.session.get(f"{self.base_url}/data/{did}", stream=True, params={"pool": self.pid, "download": True})
        if not res.ok:
            raise _failure(res, did)
        return res.raw

    def get(self, did: DataID) -> Data:
        """Return a handle for *did* with hash, MIME type and size pre-filled.

        Raises:
            NoDataWithIDError: The server answered with 404.
        """
        res = self.session.get(f"{self.base_url}/data/{did}", params={"pool": self.pid})
        if not res.ok:
            raise _failure(res, did)
        return _with_metadata(super().get(did), res.json()["data"])

    def get_hash(self, did: DataID) -> Hash:
        """Return the hash the server reports for *did*."""
        return self.get(did).hash

    def get_mimetype(self, did: DataID) -> str:
        """Return the MIME type the server reports for *did*."""
        return self.get(did).mimetype

    def get_size(self, did: DataID) -> int:
        """Return the size the server reports for *did*."""
        return self.get(did).size

    def put_io(self, data: BytesIO) -> DataID:
        """Upload the stream to the pool and return the ID of the new item."""
        res = self.session.post(f"{self.base_url}/data/", stream=True, data=data, params={"pool": self.pid})
        if not res.ok:
            raise _failure(res)
        return DataID(res.json()["data"]["uuid"])

    def remove(self, did: DataID) -> bool:
        """Delete *did* on the server.

        Returns:
            ``True`` on success, ``False`` if the server answered with 404.
        """
        # NOTE(review): unlike the other data requests, no "pool" parameter
        # is sent here - confirm this matches the server API.
        res = self.session.delete(f"{self.base_url}/data/{did}")
        if res.ok:
            return True
        if res.status_code == 404:
            return False
        raise _failure(res)

    def _pool_info(self) -> Any:
        """Fetch the ``data`` object the server returns for this pool."""
        res = self.session.get(f"{self.base_url}/pools/{self.pid}")
        if not res.ok:
            raise _failure(res)
        return res.json()["data"]

    def available(self) -> int:
        """Return the pool's ``available`` value."""
        return self._pool_info()["available"]

    def size(self) -> int:
        """Return the pool's ``used`` value."""
        return self._pool_info()["used"]

    def total(self) -> int:
        """Return the pool's ``size`` value."""
        return self._pool_info()["size"]

    def destroy(self) -> bool:
        """Delete the pool on the server.

        Returns:
            ``True`` if the pool was deleted or the storage is not valid;
            ``False`` if the access level is neither ``USER`` nor ``POOL``
            or the server answered with 403 or 404.

        Raises:
            InvalidResponseException: Any other unsuccessful status.
        """
        if not self.is_valid():
            return True
        if self.level not in (AccessType.USER, AccessType.POOL):
            return False
        res = self.session.delete(f"{self.base_url}/pools/{self.pid}")
        if res.ok:
            return True
        if res.status_code in (403, 404):
            return False
        raise InvalidResponseException(f"Status Code: {res.status_code}")

    def __iter__(self) -> Iterator[Data]:
        """Iterate over handles of all items the server lists for the pool.

        The listing is requested when iteration is set up; handles are
        built lazily with hash, MIME type and size pre-filled.
        """
        res = self.session.get(f"{self.base_url}/data/", params={"pool": self.pid})
        if not res.ok:
            raise _failure(res)
        entries = res.json()["data"]
        return (
            _with_metadata(Data(did=DataID(entry["uuid"]), storage=self), entry)
            for entry in entries
        )

    def __len__(self) -> int:
        """Return the pool's ``count`` value."""
        return self._pool_info()["count"]

71
src/pydatastorage/temp.py Normal file
View File

@ -0,0 +1,71 @@
import os
from hashlib import md5, sha256
from io import BytesIO
from pathlib import Path
from typing import Iterator, List, Optional, Type, Union
from types import TracebackType
from tempfile import TemporaryDirectory
from pathlib import Path
from contextlib import AbstractContextManager
from pydantic import Field, PrivateAttr
from . import Data, DataStorage, DataID, DirectoryDataStorage, Hash
class TempDataStorage(DataStorage, AbstractContextManager):
    """Directory data storage living in a temporary directory.

    All storage operations are delegated to a :class:`DirectoryDataStorage`
    created inside a ``TemporaryDirectory``. Used as a context manager, the
    storage is destroyed when the ``with`` block is left.
    """

    muuid = "2ffb39fb-2fea-4dae-b3e3-1bb610904b29"
    _tmp_dir: TemporaryDirectory = PrivateAttr()
    _ds: DirectoryDataStorage = PrivateAttr()
    prefix: Optional[str]

    def __init__(self, prefix: Optional[str] = None):
        super().__init__(prefix=prefix)
        self._tmp_dir = TemporaryDirectory(prefix=prefix)
        self._ds = DirectoryDataStorage(path=self._tmp_dir.name)

    def is_valid(self) -> bool:
        """Return ``True`` while the temporary directory exists and is valid."""
        return Path(self._tmp_dir.name).is_dir() and self._ds.is_valid()

    def exists(self, did: DataID) -> bool:
        return self._ds.exists(did)

    def has_hash(self, h: Hash) -> List[DataID]:
        return self._ds.has_hash(h)

    def get_file(self, did: DataID) -> Path:
        """Return the path of the content file *did* refers to."""
        return self._ds.get_file(did)

    def get_io(self, did: DataID) -> BytesIO:
        return self._ds.get_io(did)

    def get_hash(self, did: DataID) -> Hash:
        return self._ds.get_hash(did)

    def get_mimetype(self, did: DataID) -> str:
        return self._ds.get_mimetype(did)

    def get_size(self, did: DataID) -> int:
        return self._ds.get_size(did)

    def put_io(self, data: BytesIO) -> DataID:
        return self._ds.put_io(data)

    def available(self) -> int:
        """Return the available bytes reported by the wrapped storage."""
        return self._ds.available()

    def remove(self, did: DataID) -> bool:
        return self._ds.remove(did)

    def destroy(self) -> bool:
        """Clean up the temporary directory if it is still valid."""
        if self.is_valid():
            self._tmp_dir.cleanup()
        return True

    def __iter__(self) -> Iterator[Data]:
        return iter(self._ds)

    def __len__(self) -> int:
        return len(self._ds)

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        # Returns None, so exceptions raised in the with block propagate.
        self.destroy()