"""
Our Asynchronously Interchangeable Objects Client
=================================================
this portion is providing a client interface to manage asynchronously interchangeable oai objectz.
each oai object has a user-definable values dictionary and can optionally have a file or folder attached to it.
an instance of the here implemented class :class:`~ae.oaio_client.OaioClient`, represents a client interface
to the oaio server, a django server `available as git repository <https://gitlab.com/ae-group/oaio_server>`_.
"""
import os
import shutil
from ast import literal_eval
from copy import deepcopy
from typing import Optional, Sequence, cast
import requests
from ae.base import ( # type: ignore
defuse, mask_secrets, os_path_isdir, os_path_isfile, os_path_join, os_path_relpath,
read_file, write_bin_file, write_file)
from ae.app_log import ErrorMsgMixin # type: ignore
from ae.system import app_name_guess, os_device_id, os_user_name # type: ignore
from ae.paths import normalize, placeholder_path, Collector # type: ignore
from ae.cloud_storage import CshApiBase, csh_api_class # type: ignore
from ae.oaio_model import ( # type: ignore
CREATE_ACCESS_RIGHT, DELETE_ACTION, FILES_DIR, FILES_VALUES_KEY, MAX_STAMP_DIFF, NO_ACCESS_RIGHT, OBJECTS_DIR,
OLDEST_SYNC_STAMP, READ_ACCESS_RIGHT, REGISTER_ACTION, ROOT_VALUES_KEY, UPLOAD_ACTION,
context_encode, now_stamp, object_dict, object_id, stamp_diff,
OaioAccessRightType, OaioCshIdType, OaioIdType, OaioMapType, OaiObject, OaioStampType, OaioUserIdType,
OaioValuesType)
__version__ = '0.3.21'
LAST_SYNC_STAMP_FILE = 'last_sync_stamp.txt' #: file name to store the last synchronization timestamp
UserzAccessType = list[dict[OaioUserIdType, OaioAccessRightType]] #: :meth:`OaioClient.userz_access` return value
[docs]
class OaioClient(ErrorMsgMixin): # pylint: disable=too-many-instance-attributes
""" interface to manage creations, updates and deletions of oaio objects of a user.
.. note:: after creating an instance, the :meth:`.synchronize` method has to be called at least once.
"""
[docs]
def __init__(self, host: str, credentials: dict[str, str],
app_id: str = "",
device_id: str = "",
csh_default_id: OaioCshIdType = 'Digi',
client_root: str = "{usr}/oaio_root/",
): # pylint: disable=too-many-arguments, too-many-positional-arguments
""" initialize a new client instance to connect to the oaio server.
:param host: oaio server host name/address and optional port number.
:param credentials: oaio server user identification credentials kwargs (dict with the keys
'username' and 'password').
:param app_id: optional id/name of the app from where the client is connecting from.
defaults to :attr:`~ae.core.AppBase.app_name` attribute value (if the :mod:`ae.core`
portion/module got included in the project) or (if not set) to the return value of the
function :func:`~ae.system.app_name_guess`.
:param device_id: id of the client device.
defaults to :data:`~ae.system.os_device_id`.
:param csh_default_id: id for the default cloud storage server to use.
defaults to 'Digi'.
:param client_root: path to the folder on the local device where the oaio info and files get cached.
defaults to the placeholder path "{usr}/oaio_root/".
"""
assert defuse(app_id) == app_id, f"{app_id=} contains invalid characters {set(defuse(app_id)) - set(app_id)=}"
assert defuse(device_id) == device_id, f"invalid chr {set(defuse(device_id)) - set(device_id)=} in {device_id=}"
super().__init__()
self.base_url = 'http' + ('' if host[:9] in ('localhost', '127.0.0.1') else 's') + f'://{host}/api/'
self.credentials = credentials
self.user_name = credentials.get('username') or os_user_name()
self.app_id = app_id or self.main_app and self.main_app.app_name or app_name_guess()
self.device_id = device_id or os_device_id
self.csh_default_id = csh_default_id
self.client_root = normalize(client_root)
self.session = requests.Session()
self.session.headers.update(context_encode(self.user_name, self.device_id, self.app_id))
self.connected = False
self._init_client_folders()
self._last_sync_stamp = ""
self.client_objectz: OaioMapType = {}
[docs]
def __del__(self):
""" automatic logout and close of the http session. """
if self.connected:
self._request('post', 'logout/')
if self.session:
self.session.close()
[docs]
def _client_file_root(self, oaio_id: OaioIdType, values: OaioValuesType) -> str:
""" determine the path on the client/device where the file(s) attached to an oaio get stored.
:param oaio_id: id of the oaio.
:param values: oaio values (client_values if upload else server_values).
:return: path string.
"""
if ROOT_VALUES_KEY in values:
root_folder = normalize(values[ROOT_VALUES_KEY])
else:
root_folder = os_path_join(self.client_root, FILES_DIR, oaio_id)
return root_folder
[docs]
def _client_server_file_api_paths(self, oai_obj: OaiObject) -> tuple[list[str], Optional[CshApiBase], str, str]:
""" get attached files, the cloud storage api id, and the file root paths on client and cloud storage host. """
values = oai_obj.client_values
files = values.get(FILES_VALUES_KEY, [])
csh_id = oai_obj.csh_id
if not files or not csh_id or (csh_api := self._csh_api(csh_id)) is None:
if files:
self.error_message = f"cloud storage {csh_id=} empty/invalid; {oai_obj=}" # pragma: no cover
return [], None, "", ""
oaio_id = oai_obj.oaio_id
client_path = self._client_file_root(oaio_id, values)
server_path = os_path_join(oaio_id, oai_obj.client_stamp)
return files, csh_api, client_path, server_path
[docs]
def _csh_api(self, csh_id: OaioCshIdType) -> Optional[CshApiBase]:
""" determine api and credentials of a cloud storage host specified by its id from the web server db/config. """
res = self._request('get', f'csh_args/{csh_id}')
if self._res_err(res, f"oaio server error in {csh_id=}-args fetch"): # pragma: no cover
return None
csh_kwargs = res.json() # type: ignore # self._res_err ensures that res is not None
api_class = csh_api_class(csh_id)
return api_class(**csh_kwargs)
[docs]
def _delete_client_object(self, oaio_id: OaioIdType) -> bool:
""" delete an object locally on the client from the file system and client_objectz dict.
:param oaio_id: id of the oaio.
:return: True if oaio got deleted successfully locally on the client, else False.
"""
path = os_path_join(self.client_root, OBJECTS_DIR, oaio_id)
oai_obj = self.client_objectz.pop(oaio_id, None)
if oai_obj and os_path_isfile(path):
os.remove(path)
return True
self.error_message = f"delete client object integrity error: {path=} {oaio_id=} {oai_obj=}"
return False
[docs]
def _delete_server_object(self, oai_obj: OaiObject) -> bool:
""" delete oaio object from the oaio server.
:param oai_obj: :class:`~ae.oaio_model.OaiObject` dataclass instance of the object to delete.
(the oaio server view ObjectzDataView only uses the oaio_id field).
:return: True if the specified object got deleted from the server,
else False (check self.error_message for details).
"""
# noinspection PyTypeChecker
res = self._request('post', f'{DELETE_ACTION}/', json=object_dict(oai_obj)) # PyCharm inspection Bug
return self._res_err(res, f"object delete/unregister to server failed; {oai_obj=}") == ""
[docs]
def _download_object(self, oai_obj: OaiObject) -> bool:
""" save oaio info and download optional attached files to the local device/cache.
:param oai_obj: :class:`~ae.oaio_model.OaiObject` dataclass instance.
:return: True if object info and files got downloaded without errors, else False.
"""
files, csh_api, client_path, server_path = self._client_server_file_api_paths(oai_obj)
if files and csh_api is None: # pragma: no cover
return False
for file_path in files:
assert csh_api is not None # for mypy
content = csh_api.deployed_file_content(os_path_join(server_path, file_path))
if content is None: # pragma: no cover
self.error_message = f"cloud storage download error: {csh_api.error_message=} {oai_obj=} {file_path=}"
return False
write_bin_file(os_path_join(client_path, file_path), content, make_dirs=True)
oai_obj.server_stamp = oai_obj.client_stamp
oai_obj.server_values = deepcopy(oai_obj.client_values)
self._save_client_object_info(oai_obj)
return True
[docs]
def _folder_files(self, folder_path: str) -> Sequence[str]:
""" collect all files under the specified root folder.
:param folder_path: root folder to collect files from (can contain path placeholders).
:return: list of file names (relative to the root/folder_path and without any path placeholders).
"""
coll = Collector(main_app_name=self.app_id)
coll.collect(folder_path, append=("**/*", "**/.*"))
return [os_path_relpath(_, folder_path) for _ in coll.files]
[docs]
def _init_client_folders(self):
""" called on the first start of the client to create the folders and files under :attr:`.client_root`. """
file_path = os_path_join(self.client_root, OBJECTS_DIR)
if not os_path_isdir(file_path):
os.makedirs(file_path)
file_path = os_path_join(self.client_root, FILES_DIR)
if not os_path_isdir(file_path):
os.makedirs(file_path)
file_path = os_path_join(self.client_root, LAST_SYNC_STAMP_FILE) # add with old self.last_sync_stamp
if not os_path_isfile(file_path): # to ensure sync on the first app start
write_file(file_path, OLDEST_SYNC_STAMP)
[docs]
def _load_client_object_info(self, oaio_id: OaioIdType, info_path: str = "") -> OaiObject:
""" load actual client/local oaio info of the oaio specified by its id. """
obj_lit = read_file(os_path_join(info_path or os_path_join(self.client_root, OBJECTS_DIR), oaio_id))
# noinspection PyTypeChecker
obj_dict = literal_eval(obj_lit)
return OaiObject(**obj_dict)
[docs]
def _load_client_objectz(self) -> OaioMapType:
""" load actual client/local oaio infos of all local oai objects.
:return: mapping/dict with objectz stored on the local device.
"""
objectz = {}
info_path = os_path_join(self.client_root, OBJECTS_DIR)
for oaio_id in os.listdir(info_path):
oai_obj = self._load_client_object_info(oaio_id, info_path=info_path)
objectz[oaio_id] = oai_obj
return objectz
[docs]
def _load_server_objectz(self) -> OaioMapType:
""" get from server all newly added and changed objectz of all the subscriptions of the current user.
:return: mapping/dict with objectz subscribed by the user from oaio server.
if the client is offline an empty mapping object.
"""
objectz = {}
res = self._request('get', 'oaio_stampz/')
if res and res.ok:
for oaio_dict in res.json():
oai_obj = OaiObject(**oaio_dict)
objectz[oai_obj.oaio_id] = oai_obj
return objectz
[docs]
def _reconnect_check(self) -> bool:
""" check if the server got already connected and (re)connect if not.
:return: a boolean True value if still connected or successfully reconnected
or False if offline, connection cannot be established, CSRF token not available or on
any other server failure/error; in these cases check the message in the property
:attr:`~OaioClient.error_message` (inherited from :class:`~ae.app_log.ErrorMsgMixin`).
"""
if self.connected:
res = self._request('get', 'current_stamp/', _in_chk=True)
if res and res.ok:
client_stamp = now_stamp()
server_stamp = res.json().get('current_stamp')
if abs(stamp_diff(client_stamp, server_stamp)) > MAX_STAMP_DIFF: # pragma: no cover
self.error_message = f"clocks are out of sync; {client_stamp=} {server_stamp=}"
self.connected = False
return False
return True
self.session.headers.pop('X-CSRFToken') # pragma: no cover
self.connected = False # pragma: no cover
if not self.connected:
res = self._request('post', 'login/', _in_chk=True, json=self.credentials)
if not self._res_err(res, f"user '{self.user_name}' authentication error"):
self.connected = True
return self.connected
[docs]
def _request(self, method: str, slug: str, _in_chk: bool = False, **request_kwargs
) -> Optional[requests.Response]:
""" oaio server request checking CSRF for POST and automatic reconnect.
:param method: http request method (either 'get' or 'post').
:param slug: request slug added to base URL.
:param _in_chk: preventing endless recursion. only True for internal calls from
:meth:`~OaioClient._reconnect_check`, else False.
:param request_kwargs: extra kwargs passed onto the http request :paramref:`~_request.method` call.
:return: http response, or None if connection cannot be established (offline, authentication, ..)
or if any other error occurred (check self.error_message for details).
"""
res = None
url = self.base_url + slug
try:
if not _in_chk and not self._reconnect_check():
return None
hdr_token = self.session.headers.get('X-CSRFToken') # only DELETE|PATCH|POST|PUT|... need a CSRF token
ses_token = self.session.cookies.get('csrftoken')
if method != 'get' and (not hdr_token or hdr_token != ses_token):
if not hdr_token: # works without this if (same: if hdr_token == ses_token), but with 10% more traffic?
res = self.session.get(url) # fetch new/changed CSRF token from oaio server
ses_token = self.session.cookies.get('csrftoken')
if err_msg := self._res_err(res, f"CSRF fetch error in {method=} with {url=}",
failure="" if ses_token else ": empty token", add_err=False):
raise Exception(err_msg) # pylint: disable=broad-exception-raised # pragma: no cover
request_kwargs.setdefault('cookies', {'csrftoken': ses_token})
self.session.headers['X-CSRFToken'] = cast(str, ses_token) # w/o cast in types-requests 2.31.0.20240311
self.session.headers['Referer'] = url # needed only for https requests
met = getattr(self.session, method)
res = met(url, **request_kwargs)
assert res, f"session {method=} call with {request_kwargs=} returned unexpected empty response {res=}"
res.raise_for_status()
self.error_message = ""
return res
# pylint: disable-next=broad-exception-caught # pragma: no cover
except (requests.HTTPError, requests.ConnectionError, AssertionError, Exception) as exception:
# noinspection PyStringConversionWithoutDunderMethod
self._res_err(res, f"{method=} {exception=} {url=} {_in_chk=} {mask_secrets(request_kwargs)=}")
return None
[docs]
def _res_err(self, res: Optional[requests.Response], msg: str, failure: str = "", add_err: bool = True) -> str:
""" web request response error checker, amending error message with status-code. """
if res and res.ok and not failure:
return ""
msg += f"{failure};"
if res:
msg += f" {getattr(res, 'status_code', '?')=}"
try:
msg += f" {res.json()=}"
except (AttributeError, Exception): # pylint: disable=broad-exception-caught # pragma: no cover
msg += f" {getattr(res, 'content', '¿')=}"
if add_err:
self.error_message = msg
return msg
[docs]
def _save_client_object_info(self, oai_obj: OaiObject):
""" save oaio to local oaio info. """
oaio_id = oai_obj.oaio_id
assert oaio_id and oai_obj.client_stamp, f"cannot save oaio ({oai_obj}) locally with empty id or client_stamp"
self.client_objectz[oaio_id] = oai_obj
write_file(os_path_join(self.client_root, OBJECTS_DIR, oaio_id), repr(object_dict(oai_obj)))
[docs]
def _upload_object(self, oai_obj: OaiObject) -> bool:
""" save the specified local/client object and try to send changes to storage and oaio/web servers.
:param oai_obj: :class:`~ae.oaio_model.OaiObject` dataclass instance to update/upload/register.
:return: True if register/upload went well or False on failure or if servers are offline (for
details check the error_message attribute of this instance).
"""
files, csh_api, client_path, server_path = self._client_server_file_api_paths(oai_obj)
if files and csh_api is None:
return False # pragma: no cover
for file_path in files:
assert csh_api is not None # for mypy
if not csh_api.deploy_file(os_path_join(server_path, file_path),
source_path=os_path_join(client_path, file_path)): # pragma: no cover
self.error_message = f"'{file_path}' cloud storage upload error: {csh_api.error_message=} {oai_obj=}"
return False
action = UPLOAD_ACTION if oai_obj.server_stamp else REGISTER_ACTION
res = self._request('post', f'{action}/', json=object_dict(oai_obj))
if self._res_err(res, f"postponed {action} onto server of {oai_obj}"):
return False # pragma: no cover
srv_dict = res.json() # type: ignore # self._res_err ensures that res is not None
assert srv_dict.get('oaio_id') == oai_obj.oaio_id, f"{srv_dict.get('oaio_id')=} not matching {oai_obj.oaio_id=}"
oai_obj.server_stamp = oai_obj.client_stamp
oai_obj.server_values = deepcopy(oai_obj.client_values)
self._save_client_object_info(oai_obj)
return True
# public api of this client instance ##########################################################
@property
def last_sync_stamp(self) -> OaioStampType:
""" timestamp of last synchronization with the oaio/storage servers.
:getter: timestamp of the last server synchronization.
:setter: any assigned error message will be accumulated to recent error messages.
pass an empty string to reset the error message.
"""
if not self._last_sync_stamp:
self._last_sync_stamp = read_file(os_path_join(self.client_root, LAST_SYNC_STAMP_FILE))
# noinspection PyTypeChecker
return self._last_sync_stamp
@last_sync_stamp.setter
def last_sync_stamp(self, stamp: OaioStampType):
write_file(os_path_join(self.client_root, LAST_SYNC_STAMP_FILE), stamp)
self._last_sync_stamp = stamp
[docs]
def register_file(self, file_name: str, # pylint: disable=too-many-arguments, too-many-positional-arguments
file_content: Optional[bytes] = None, root_path: str = "", stamp: OaioStampType = "",
csh_id: OaioCshIdType = "") -> Optional[OaiObject]:
""" register a new oaio file object.
:param file_name: name of the new file object to register.
:param file_content: file content if the file does not exist on any file system, pass as bytes to create it.
:param root_path: root path on local-machine of the new file object to register.
using :attr:`.client_root` if not specified.
:param stamp: optional timestamp (created by :func:`~ae.oaio_model.now_stamp` if not specified).
:param csh_id: cloud storage server id or empty string to use the default cloud storage server.
:return: new :class:`~ae.oaio_model.OaiObject` instance
or None if either stamp or oaio_id are already registered (check self.error_message).
:raises AssertionError: if the file to register does not exist.
"""
stamp = stamp or now_stamp()
values: OaioValuesType = {FILES_VALUES_KEY: [file_name]}
if root_path:
values[ROOT_VALUES_KEY] = placeholder_path(root_path)
oaio_id = ''
file_path = normalize(os_path_join(root_path, file_name))
else:
oaio_id = object_id(user_name=self.user_name, device_id=self.device_id, app_id=self.app_id,
stamp=stamp, values=values)
root_path = self._client_file_root(oaio_id, values)
file_path = os_path_join(root_path, file_name)
if file_content is None:
assert os_path_isfile(file_path), f"file to register not found: {file_path}"
else:
write_bin_file(file_path, file_content, make_dirs=True)
return self.register_object(values, stamp=stamp, oaio_id=oaio_id, csh_id=csh_id)
[docs]
def register_folder(self, root_path: str, stamp: OaioStampType = "",
csh_id: OaioCshIdType = "") -> Optional[OaiObject]:
""" register a new oaio folder object.
:param root_path: root path on the local-machine/client of the new folder object to register.
:param stamp: optional timestamp (created by :func:`~ae.oaio_model.now_stamp` if not specified).
:param csh_id: cloud storage server id or empty string to use the default cloud storage server.
:return: new :class:`~ae.oaio_model.OaiObject` instance
or None if either stamp or oaio_id are already registered (check self.error_message).
:raises AssertionError: if files to register do not exist.
"""
root_path = normalize(root_path)
values = {ROOT_VALUES_KEY: placeholder_path(root_path),
FILES_VALUES_KEY: self._folder_files(root_path)}
return self.register_object(values, stamp=stamp, csh_id=csh_id)
[docs]
def register_object(self, values: OaioValuesType, stamp: OaioStampType = '',
oaio_id: OaioIdType = '', csh_id: OaioCshIdType = '') -> Optional[OaiObject]:
""" register a new oaio data object.
:param values: values data to register as a new oaio object.
:param stamp: optional timestamp (created by :func:`~ae.oaio_model.now_stamp` if not specified).
:param oaio_id: object id (will be created if not passed).
:param csh_id: cloud storage server id. if empty/unspecified, use the default cloud storage server.
:return: new :class:`~ae.oaio_model.OaiObject` instance.
or None if either stamp or oaio_id are already registered (check self.error_message).
:raises AssertionError: on invalid argument values/types or if the oaio_id got already registered.
"""
assert isinstance(values, dict), f"register_object(): values arg must be a dict; {values=}" # OaioValuesType
stamp = stamp or now_stamp()
assert stamp > OLDEST_SYNC_STAMP, f"register_object(): too old {stamp=} specified; <= {OLDEST_SYNC_STAMP=}"
oaio_id = oaio_id or object_id(user_name=self.user_name, device_id=self.device_id, app_id=self.app_id,
stamp=stamp, values=values)
assert oaio_id not in self.client_objectz, f"register_object(): {oaio_id=} got already registered"
oai_obj = OaiObject(
oaio_id=oaio_id,
client_stamp=stamp, # could be changed by server on upload if conflicts with another stamp
client_values=values,
csh_id=csh_id or self.csh_default_id,
username=self.user_name,
device_id=self.device_id,
app_id=self.app_id,
csh_access_right=CREATE_ACCESS_RIGHT, # registering owner always has all access rights
)
self._save_client_object_info(oai_obj) # store new obj in local OBJECTS_DIR
if not self.synchronize_with_server_if_online():
return None # pragma: no cover
return oai_obj
[docs]
def synchronize_with_server_if_online(self) -> bool:
""" synchronize local changes to server and any update/changes done on other clients from server to this client.
.. hint:: if not connected to the oaio server, then this method tries first to (re-)connect.
:return: False if the client is offline or on sync error, else True.
"""
if not self._reconnect_check():
return False
soz = self._load_server_objectz()
self.client_objectz = coz = self._load_client_objectz()
error = False
for cob in sorted(coz.values(), key=lambda _: _.client_stamp):
if cob.client_stamp > cob.server_stamp: # or cob.server_stamp == ''
error = not self._upload_object(cob) or error # _upload_object() extends self.error_message on error
newer = [_ for _ in soz.values() if coz.get(_.oaio_id, OaiObject(_.oaio_id)).client_stamp < _.client_stamp]
for sob in sorted(newer, key=lambda _: _.client_stamp):
error = not self._download_object(sob) or error
if not error:
self.last_sync_stamp = now_stamp()
return not error
[docs]
def unregister_object(self, oaio_id: OaioIdType, wipe_files: bool = False) -> str:
""" unregister/delete oai object.
:param oaio_id: id of the oaio to unregister.
:param wipe_files: pass True to also remove/wipe all attached file(s) on the local machine.
:return: empty string if unregistering was successful, else error message on failure.
"""
oai_obj = self.client_objectz.get(oaio_id)
if oai_obj is None:
self.error_message = f"client object to delete/unregister with id '{oaio_id}' not found"
return self.error_message
registered = oai_obj.server_stamp # if registered on server
if registered and not self._delete_server_object(oai_obj): # check and return errors on server object deletion
return self.error_message
if self._delete_client_object(oaio_id):
if wipe_files:
files, csh_api, client_path, server_path = self._client_server_file_api_paths(oai_obj)
if files:
if registered:
err_msg = csh_api.delete_file_or_folder(server_path) # type: ignore # csh_api is not None
if err_msg:
self.error_message = err_msg
return self.error_message
if os_path_isdir(client_path):
shutil.rmtree(client_path)
self.synchronize_with_server_if_online()
return self.error_message
[docs]
def unsubscribe(self, oaio_id: OaioIdType, user_name: OaioUserIdType) -> str:
""" remove the subscription of an oai object for a user.
:param oaio_id: id of the oaio to unsubscribe.
:param user_name: name of the subscribed user.
:return: empty string if subscription could be removed without error, else error message.
"""
data = {'oaio_id': oaio_id, 'username': user_name, 'access_right': NO_ACCESS_RIGHT}
res = self._request('post', 'subscribe/', json=data)
if not self._res_err(res, f"unsubscribe of {user_name=} and {oaio_id=} failed"):
if user_name == self.user_name:
self._delete_client_object(oaio_id)
return self.error_message
[docs]
def update_file(self, oaio_id: OaioIdType, # pylint: disable=too-many-arguments, too-many-positional-arguments
file_name: str = "", file_content: Optional[bytes] = None, root_path: str = "",
stamp: OaioStampType = "") -> OaiObject:
""" update oai file object locally.
:param oaio_id: id of the oai file object to update.
:param file_name: name (optionally with the file path) of the attached file.
:param file_content: file content if the file name does not exist, pass as bytes to create the file.
:param root_path: root path on local-machine of the new file object to update.
using :attr:`.client_root` if not specified.
:param stamp: optional timestamp (using :func:`~ae.oaio_model.now_stamp` if not specified).
:return: the updated :class:`~ae.oaio_model.OaiObject` dataclass instance.
:raises AssertionError: if the file to update does not exist.
"""
values = deepcopy(self.client_objectz[oaio_id].client_values)
if file_name:
values[FILES_VALUES_KEY] = [file_name]
else:
file_name = values[FILES_VALUES_KEY][0]
if root_path:
values[ROOT_VALUES_KEY] = placeholder_path(root_path)
elif ROOT_VALUES_KEY in values:
root_path = values[ROOT_VALUES_KEY]
else:
root_path = self._client_file_root(oaio_id, values)
file_path = normalize(os_path_join(root_path, file_name))
if file_content is None:
assert os_path_isfile(file_path), f"file to update does not exist in {file_path=}"
else:
write_bin_file(file_path, file_content, make_dirs=True)
return self.update_object(oaio_id, values, stamp=stamp)
[docs]
def update_folder(self, oaio_id: OaioIdType, root_path: str = "", stamp: OaioStampType = "") -> OaiObject:
""" update oai folder object locally.
:param oaio_id: id of the oai folder object to update.
:param root_path: new root path on local-machine of the new folder object to update.
using the current root path if not specified.
:param stamp: optional timestamp (using :func:`~ae.oaio_model.now_stamp` if not specified).
:return: the updated :class:`~ae.oaio_model.OaiObject` dataclass instance.
:raises AssertionError: if the same file is in add_files and in removed_files,
or if one of the files to update does not exist.
"""
values = deepcopy(self.client_objectz[oaio_id].client_values)
if root_path:
values[ROOT_VALUES_KEY] = placeholder_path(root_path)
client_root = self._client_file_root(oaio_id, values)
values[FILES_VALUES_KEY] = self._folder_files(client_root)
return self.update_object(oaio_id, values, stamp=stamp)
[docs]
def update_object(self, oaio_id: OaioIdType, values: OaioValuesType, stamp: OaioStampType = "", reset: bool = False
) -> OaiObject:
""" update oaio data object locally.
:param oaio_id: id of a registered oaio to update.
:param values: values to update within the oaio.
:param stamp: optional timestamp (using :func:`~ae.oaio_model.now_stamp` if not specified).
:param reset: pass True to reset the values before they get updated with the values
specified in :paramref:`~update_object.values`.
:return: the updated :class:`~ae.oaio_model.OaiObject` dataclass instance.
:raises AssertionError: on invalid argument values/types or if the oaio_id did not get registered.
"""
assert oaio_id in self.client_objectz, f"update_object(): oaio object with id '{oaio_id}' not registered"
oai_obj = self.client_objectz[oaio_id]
assert oai_obj.client_stamp, f"update_object(): oai object {oaio_id=} has empty client_stamp; {oai_obj=}"
if stamp:
assert stamp > OLDEST_SYNC_STAMP, f"update_object(): too old stamp {stamp} specified; <={OLDEST_SYNC_STAMP}"
else: # pragma: no cover
stamp = now_stamp()
assert stamp > oai_obj.client_stamp, f"update_object(): got too old stamp {stamp}; > {oai_obj.client_stamp}"
oai_obj.client_stamp = stamp
if reset:
oai_obj.client_values.clear()
oai_obj.client_values.update(values)
self._save_client_object_info(oai_obj)
self.synchronize_with_server_if_online()
return oai_obj
[docs]
def upsert_subscription(self, oaio_id: OaioIdType, user_name: OaioUserIdType,
access_right: OaioAccessRightType = READ_ACCESS_RIGHT) -> int:
""" add/delete the subscriber of an oai object or update the access right of an existing user subscription.
:param oaio_id: id of the oaio to subscribe to or to unsubscribe from.
:param user_name: name of the un-/subscribing user.
:param access_right: either :data:`~ae.oaio_model.NO_ACCESS_RIGHT` to unsubscribe user
or a user access right (one of :data:`~ae.oaio_model.ACCESS_RIGHTS`) to subscribe
the oaio specified by :paramref:`~upsert_subscription.oaio_id`
to a user specified by :paramref:`~upsert_subscription.user_name`.
e.g., to grant update rights, pass the value :data:`~ae.oaio_model.UPDATE_ACCESS_RIGHT`.
if not specified then the value :data:`~ae.oaio_model.READ_ACCESS_RIGHT` will be used.
:return: the primary key integer value (Pubz.Pid) of the added/updated Pubz subscription record
or a zero integer (0) if an error occurred.
"""
data = {'oaio_id': oaio_id, 'username': user_name, 'access_right': access_right}
res = self._request('post', 'subscribe/', json=data) # json-kwarg implies content-type 'application/json' hdr
if self._res_err(res, f"subscription of {user_name=} for {oaio_id=} failed"):
return 0
return res.json().get('Pid', 0) # type: ignore # self._res_err ensures that res is not None
[docs]
def userz_access(self, oaio_id: OaioIdType) -> UserzAccessType:
""" determine a list of all the registered users and their access right to the specified oaio.
:param oaio_id: id of the oaio determine users and access rights for
or an empty or unregistered oaio id if the oaio is not yet registered.
:return: list of dicts with the keys 'username' and 'access_right', or empty list on error. the
returned list is sorted by the access_right (create, delete, update, read) and username.
if an empty/unregistered oaio id got specified, all registered userz will be returned
with the :data:`~ae.oaio_model.NO_ACCESS_RIGHT` access right.
"""
res = self._request('get', f'user_subz/{oaio_id or "_not__existing__oaio_id_"}')
warn_msg = self._res_err(res, f"Warning: OaioClient.userz_access() error; {oaio_id=}", add_err=False)
if warn_msg:
self.vpo(warn_msg)
return []
assert res, "for mypy - not seeing that self._res_err is checking res and returning empty string if not None"
return sorted(res.json(), key=lambda _: _['access_right'] + _['username'])
[docs]
def wipe_user(self, user_name: OaioUserIdType) -> str:
""" wipe the specified user including all its objects, subscriptions and log entries.
:param user_name: name of the subscribing user to be wiped from the database.
:return: empty string if the user (including its subscription and log entries) could be removed,
else the error message.
"""
res = self._request('post', 'wipe_user/', json={'Uid': user_name})
self._res_err(res, f"wipe of user '{user_name}' failed")
return self.error_message