diff --git a/scripts/hydrus_api/__init__.py b/scripts/hydrus_api/__init__.py
new file mode 100644
index 0000000..63cdf02
--- /dev/null
+++ b/scripts/hydrus_api/__init__.py
@@ -0,0 +1,766 @@
+# Copyright (C) 2021 cryzed
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU Affero General Public License for more details.
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+import enum
+import json
+import os
+import typing as T
+from collections import abc
+import requests
+__version__ = "4.0.0"
+class HydrusAPIException(Exception):
+ pass
+class ConnectionError(HydrusAPIException, requests.ConnectTimeout):
+ pass
+class APIError(HydrusAPIException):
+ def __init__(self, response: requests.Response):
+ super().__init__(response.text)
+ self.response = response
+class MissingParameter(APIError):
+ pass
+class InsufficientAccess(APIError):
+ pass
+class DatabaseLocked(APIError):
+ pass
+class ServerError(APIError):
+ pass
+# Customize IntEnum, so we can just do str(Enum.member) to get the string representation of its value unmodified,
+# without users having to access .value explicitly
+class StringableIntEnum(enum.IntEnum):
+ def __str__(self):
+ return str(self.value)
+class Permission(StringableIntEnum):
+ ADD_TAGS = 2
+class URLType(StringableIntEnum):
+ POST_URL = 0
+ FILE_URL = 2
+class ImportStatus(StringableIntEnum):
+ EXISTS = 2
+ FAILED = 4
+ VETOED = 7
+class TagAction(StringableIntEnum):
+ ADD = 0
+ DELETE = 1
+ PEND = 2
+class TagStatus(StringableIntEnum):
+class PageType(StringableIntEnum):
+class FileSortType(StringableIntEnum):
+ RANDOM = 4
+ WIDTH = 5
+ HEIGHT = 6
+ RATIO = 7
+ HAS_AUDIO = 13
+class BinaryFileLike(T.Protocol):
+ def read(self):
+ ...
+# The client should accept all objects that either support the iterable or mapping protocol. We must ensure that objects
+# are either lists or dicts, so Python's json module can handle them
+class JSONEncoder(json.JSONEncoder):
+ def default(self, object_: T.Any):
+ if isinstance(object_, abc.Mapping):
+ return dict(object_)
+ if isinstance(object_, abc.Iterable):
+ return list(object_)
+ return super().default(object_)
+class Client:
+ VERSION = 31
+ # Access Management
+ _GET_API_VERSION_PATH = "/api_version"
+ _REQUEST_NEW_PERMISSIONS_PATH = "/request_new_permissions"
+ _GET_SESSION_KEY_PATH = "/session_key"
+ _VERIFY_ACCESS_KEY_PATH = "/verify_access_key"
+ _GET_SERVICES_PATH = "/get_services"
+ # Adding Files
+ _ADD_FILE_PATH = "/add_files/add_file"
+ _DELETE_FILES_PATH = "/add_files/delete_files"
+ _UNDELETE_FILES_PATH = "/add_files/undelete_files"
+ _ARCHIVE_FILES_PATH = "/add_files/archive_files"
+ _UNARCHIVE_FILES_PATH = "/add_files/unarchive_files"
+ # Adding Tags
+ _CLEAN_TAGS_PATH = "/add_tags/clean_tags"
+ _SEARCH_TAGS_PATH = "/add_tags/search_tags"
+ _ADD_TAGS_PATH = "/add_tags/add_tags"
+ # Adding URLs
+ _GET_URL_FILES_PATH = "/add_urls/get_url_files"
+ _GET_URL_INFO_PATH = "/add_urls/get_url_info"
+ _ADD_URL_PATH = "/add_urls/add_url"
+ _ASSOCIATE_URL_PATH = "/add_urls/associate_url"
+ # Adding Notes
+ _SET_NOTES_PATH = "/add_notes/set_notes"
+ _DELETE_NOTES_PATH = "/add_notes/delete_notes"
+ # Managing Cookies and HTTP Headers
+ _GET_COOKIES_PATH = "/manage_cookies/get_cookies"
+ _SET_COOKIES_PATH = "/manage_cookies/set_cookies"
+ _SET_USER_AGENT_PATH = "/manage_headers/set_user_agent"
+ # Managing Pages
+ _GET_PAGES_PATH = "/manage_pages/get_pages"
+ _GET_PAGE_INFO_PATH = "/manage_pages/get_page_info"
+ _ADD_FILES_TO_PAGE_PATH = "/manage_pages/add_files"
+ _FOCUS_PAGE_PATH = "/manage_pages/focus_page"
+ # Searching and Fetching Files
+ _SEARCH_FILES_PATH = "/get_files/search_files"
+ _GET_FILE_METADATA_PATH = "/get_files/file_metadata"
+ _GET_FILE_PATH = "/get_files/file"
+ _GET_THUMBNAIL_PATH = "/get_files/thumbnail"
+ # Managing the Database
+ _LOCK_DATABASE_PATH = "/manage_database/lock_on"
+ _UNLOCK_DATABASE_PATH = "/manage_database/lock_off"
+ _MR_BONES_PATH = "/manage_database/mr_bones"
+ def __init__(
+ self,
+ access_key = None,
+ api_url: str = DEFAULT_API_URL,
+ session = None,
+ ):
+ """
+ See https://hydrusnetwork.github.io/hydrus/help/client_api.html for documentation.
+ """
+ self.access_key = access_key
+ self.api_url = api_url.rstrip("/")
+ self.session = session or requests.Session()
+ def _api_request(self, method: str, path: str, **kwargs: T.Any):
+ if self.access_key is not None:
+ kwargs.setdefault("headers", {}).update({"Hydrus-Client-API-Access-Key": self.access_key})
+ # Make sure we use our custom JSONEncoder that can serialize all objects that implement the iterable or mapping
+ # protocol
+ json_data = kwargs.pop("json", None)
+ if json_data is not None:
+ kwargs["data"] = json.dumps(json_data, cls=JSONEncoder)
+ # Since we aren't using the json keyword-argument, we have to set the Content-Type manually
+ kwargs["headers"]["Content-Type"] = "application/json"
+ try:
+ response = self.session.request(method, self.api_url + path, **kwargs)
+ except requests.RequestException as error:
+ # Re-raise connection and timeout errors as hydrus.ConnectionErrors so these are more easy to handle for
+ # client applications
+ raise ConnectionError(*error.args)
+ try:
+ response.raise_for_status()
+ except requests.HTTPError:
+ if response.status_code == requests.codes.bad_request:
+ raise MissingParameter(response)
+ elif response.status_code in {
+ requests.codes.unauthorized,
+ requests.codes.forbidden,
+ }:
+ raise InsufficientAccess(response)
+ elif response.status_code == requests.codes.service_unavailable:
+ raise DatabaseLocked(response)
+ elif response.status_code == requests.codes.server_error:
+ raise ServerError(response)
+ raise APIError(response)
+ return response
+ def get_api_version(self):
+ response = self._api_request("GET", self._GET_API_VERSION_PATH)
+ return response.json()
+ def request_new_permissions(self, name, permissions):
+ response = self._api_request(
+ "GET",
+ params={"name": name, "basic_permissions": json.dumps(permissions, cls=JSONEncoder)},
+ )
+ return response.json()["access_key"]
+ def get_session_key(self):
+ response = self._api_request("GET", self._GET_SESSION_KEY_PATH)
+ return response.json()["session_key"]
+ def verify_access_key(self):
+ response = self._api_request("GET", self._VERIFY_ACCESS_KEY_PATH)
+ return response.json()
+ def get_services(self):
+ response = self._api_request("GET", self._GET_SERVICES_PATH)
+ return response.json()
+ def add_file(self, path_or_file: T.Union[str, os.PathLike, BinaryFileLike]):
+ if isinstance(path_or_file, (str, os.PathLike)):
+ response = self._api_request("POST", self._ADD_FILE_PATH, json={"path": os.fspath(path_or_file)})
+ else:
+ response = self._api_request(
+ "POST",
+ self._ADD_FILE_PATH,
+ data=path_or_file.read(),
+ headers={"Content-Type": "application/octet-stream"},
+ )
+ return response.json()
+ def delete_files(
+ self,
+ hashes = None,
+ file_ids = None,
+ file_service_name = None,
+ file_service_key = None,
+ reason = None
+ ):
+ if hashes is None and file_ids is None:
+ raise ValueError("At least one of hashes, file_ids is required")
+ if file_service_name is not None and file_service_key is not None:
+ raise ValueError("Exactly one of file_service_name, file_service_key is required")
+ payload: dict[str, T.Any] = {}
+ if hashes is not None:
+ payload["hashes"] = hashes
+ if file_ids is not None:
+ payload["file_ids"] = file_ids
+ if file_service_name is not None:
+ payload["file_service_name"] = file_service_name
+ if file_service_key is not None:
+ payload["file_service_key"] = file_service_key
+ if reason is not None:
+ payload["reason"] = reason
+ self._api_request("POST", self._DELETE_FILES_PATH, json=payload)
+ def undelete_files(
+ self,
+ hashes = None,
+ file_ids = None,
+ file_service_name = None,
+ file_service_key = None,
+ ):
+ if hashes is None and file_ids is None:
+ raise ValueError("At least one of hashes, file_ids is required")
+ if file_service_name is not None and file_service_key is not None:
+ raise ValueError("Exactly one of file_service_name, file_service_key is required")
+ payload: dict[str, T.Any] = {}
+ if hashes is not None:
+ payload["hashes"] = hashes
+ if file_ids is not None:
+ payload["file_ids"] = file_ids
+ if file_service_name is not None:
+ payload["file_service_name"] = file_service_name
+ if file_service_key is not None:
+ payload["file_service_key"] = file_service_key
+ self._api_request("POST", self._UNDELETE_FILES_PATH, json=payload)
+ def archive_files(
+ self,
+ hashes = None,
+ file_ids = None
+ ):
+ if hashes is None and file_ids is None:
+ raise ValueError("At least one of hashes, file_ids is required")
+ payload: dict[str, T.Any] = {}
+ if hashes is not None:
+ payload["hashes"] = hashes
+ if file_ids is not None:
+ payload["file_ids"] = file_ids
+ self._api_request("POST", self._ARCHIVE_FILES_PATH, json=payload)
+ def unarchive_files(
+ self,
+ hashes = None,
+ file_ids = None
+ ):
+ if hashes is None and file_ids is None:
+ raise ValueError("At least one of hashes, file_ids is required")
+ payload: dict[str, T.Any] = {}
+ if hashes is not None:
+ payload["hashes"] = hashes
+ if file_ids is not None:
+ payload["file_ids"] = file_ids
+ self._api_request("POST", self._UNARCHIVE_FILES_PATH, json=payload)
+ def clean_tags(self, tags ):
+ response = self._api_request("GET", self._CLEAN_TAGS_PATH, params={"tags": json.dumps(tags, cls=JSONEncoder)})
+ return response.json()["tags"]
+ def search_tags(
+ self,
+ search: str,
+ tag_service_key = None,
+ tag_service_name = None
+ ):
+ if tag_service_name is not None and tag_service_key is not None:
+ raise ValueError("Exactly one of tag_service_name, tag_service_key is required")
+ payload: dict[str, T.Any] = {"search": search}
+ if tag_service_key is not None:
+ payload["tag_service_key"] = tag_service_key
+ if tag_service_name is not None:
+ payload["tag_service_name"] = tag_service_name
+ response = self._api_request("GET", self._SEARCH_TAGS_PATH, params=payload)
+ return response.json()["tags"]
+ def add_tags(
+ self,
+ hashes = None,
+ file_ids = None,
+ service_names_to_tags = None,
+ service_keys_to_tags = None,
+ service_names_to_actions_to_tags = None,
+ service_keys_to_actions_to_tags = None,
+ ):
+ if hashes is None and file_ids is None:
+ raise ValueError("At least one of hashes, file_ids is required")
+ if (
+ service_names_to_tags is None
+ and service_keys_to_tags is None
+ and service_names_to_actions_to_tags is None
+ and service_keys_to_actions_to_tags is None
+ ):
+ raise ValueError(
+ "At least one of service_names_to_tags, service_keys_to_tags, service_names_to_actions_to_tags or "
+ "service_keys_to_actions_to_tags is required"
+ )
+ payload: dict[str, T.Any] = {}
+ if hashes is not None:
+ payload["hashes"] = hashes
+ if file_ids is not None:
+ payload["file_ids"] = file_ids
+ if service_names_to_tags is not None:
+ payload["service_names_to_tags"] = service_names_to_tags
+ if service_keys_to_tags is not None:
+ payload["service_keys_to_tags"] = service_keys_to_tags
+ if service_names_to_actions_to_tags is not None:
+ payload["service_names_to_actions_to_tags"] = service_names_to_actions_to_tags
+ if service_keys_to_actions_to_tags is not None:
+ payload["service_keys_to_actions_to_tags"] = service_keys_to_actions_to_tags
+ self._api_request("POST", self._ADD_TAGS_PATH, json=payload)
+ def get_url_files(self, url: str):
+ response = self._api_request("GET", self._GET_URL_FILES_PATH, params={"url": url})
+ return response.json()
+ def get_url_info(self, url: str):
+ response = self._api_request("GET", self._GET_URL_INFO_PATH, params={"url": url})
+ return response.json()
+ def add_url(
+ self,
+ url: str,
+ destination_page_key = None,
+ destination_page_name = None,
+ show_destination_page = None,
+ service_names_to_additional_tags = None,
+ service_keys_to_additional_tags = None,
+ filterable_tags = None,
+ ):
+ if destination_page_key is not None and destination_page_name is not None:
+ raise ValueError("Exactly one of destination_page_key, destination_page_name is required")
+ payload: dict[str, T.Any] = {"url": url}
+ if destination_page_key is not None:
+ payload["destination_page_key"] = destination_page_key
+ if destination_page_name is not None:
+ payload["destination_page_name"] = destination_page_name
+ if show_destination_page is not None:
+ payload["show_destination_page"] = show_destination_page
+ if service_names_to_additional_tags is not None:
+ payload["service_names_to_additional_tags"] = service_names_to_additional_tags
+ if service_keys_to_additional_tags is not None:
+ payload["service_keys_to_additional_tags"] = service_keys_to_additional_tags
+ if filterable_tags is not None:
+ payload["filterable_tags"] = filterable_tags
+ response = self._api_request("POST", self._ADD_URL_PATH, json=payload)
+ return response.json()
+ def associate_url(
+ self,
+ hashes = None,
+ file_ids = None,
+ urls_to_add = None,
+ urls_to_delete = None,
+ ):
+ if hashes is None and file_ids is None:
+ raise ValueError("At least one of hashes, file_ids is required")
+ if urls_to_add is None and urls_to_delete is None:
+ raise ValueError("At least one of urls_to_add, urls_to_delete is required")
+ payload: dict[str, T.Any] = {}
+ if hashes is not None:
+ payload["hashes"] = hashes
+ if file_ids is not None:
+ payload["file_ids"] = file_ids
+ if urls_to_add is not None:
+ urls_to_add = urls_to_add
+ payload["urls_to_add"] = urls_to_add
+ if urls_to_delete is not None:
+ urls_to_delete = urls_to_delete
+ payload["urls_to_delete"] = urls_to_delete
+ self._api_request("POST", self._ASSOCIATE_URL_PATH, json=payload)
+ def set_notes(self, notes , hash_= None, file_id = None):
+ if (hash_ is None and file_id is None) or (hash_ is not None and file_id is not None):
+ raise ValueError("Exactly one of hash_, file_id is required")
+ payload: dict[str, T.Any] = {"notes": notes}
+ if hash_ is not None:
+ payload["hash"] = hash_
+ if file_id is not None:
+ payload["file_id"] = file_id
+ self._api_request("POST", self._SET_NOTES_PATH, json=payload)
+ def delete_notes(
+ self,
+ note_names ,
+ hash_ = None,
+ file_id = None
+ ):
+ if (hash_ is None and file_id is None) or (hash_ is not None and file_id is not None):
+ raise ValueError("Exactly one of hash_, file_id is required")
+ payload: dict[str, T.Any] = {"note_names": note_names}
+ if hash_ is not None:
+ payload["hash"] = hash_
+ if file_id is not None:
+ payload["file_id"] = file_id
+ self._api_request("POST", self._DELETE_NOTES_PATH, json=payload)
+ def get_cookies(self, domain: str):
+ response = self._api_request("GET", self._GET_COOKIES_PATH, params={"domain": domain})
+ return response.json()["cookies"]
+ def set_cookies(self, cookies ):
+ self._api_request("POST", self._SET_COOKIES_PATH, json={"cookies": cookies})
+ def set_user_agent(self, user_agent: str):
+ self._api_request("POST", self._SET_USER_AGENT_PATH, json={"user-agent": user_agent})
+ def get_pages(self):
+ response = self._api_request("GET", self._GET_PAGES_PATH)
+ return response.json()["pages"]
+ def get_page_info(self, page_key: str, simple = None):
+ parameters = {"page_key": page_key}
+ if simple is not None:
+ parameters["simple"] = json.dumps(simple, cls=JSONEncoder)
+ response = self._api_request("GET", self._GET_PAGE_INFO_PATH, params=parameters)
+ return response.json()["page_info"]
+ def add_files_to_page(
+ self,
+ page_key: str,
+ file_ids = None,
+ hashes = None
+ ):
+ if file_ids is None and hashes is None:
+ raise ValueError("At least one of file_ids, hashes is required")
+ payload: dict[str, T.Any] = {"page_key": page_key}
+ if file_ids is not None:
+ payload["file_ids"] = file_ids
+ if hashes is not None:
+ payload["hashes"] = hashes
+ self._api_request("POST", self._ADD_FILES_TO_PAGE_PATH, json=payload)
+ def focus_page(self, page_key: str):
+ self._api_request("POST", self._FOCUS_PAGE_PATH, json={"page_key": page_key})
+ def search_files(
+ self,
+ tags,
+ file_service_name = None,
+ file_service_key = None,
+ tag_service_name = None,
+ tag_service_key = None,
+ file_sort_type = None,
+ file_sort_asc = None,
+ return_hashes = None,
+ ):
+ if file_service_name is not None and file_service_key is not None:
+ raise ValueError("Exactly one of file_service_name, file_service_key is required")
+ if tag_service_name is not None and tag_service_key is not None:
+ raise ValueError("Exactly one of tag_service_name, tag_service_key is required")
+ parameters: dict[str, T.Union[str, int]] = {"tags": json.dumps(tags, cls=JSONEncoder)}
+ if file_service_name is not None:
+ parameters["file_service_name"] = file_service_name
+ if file_service_key is not None:
+ parameters["file_service_key"] = file_service_key
+ if tag_service_name is not None:
+ parameters["tag_service_name"] = tag_service_name
+ if tag_service_key is not None:
+ parameters["tag_service_key"] = tag_service_key
+ if file_sort_type is not None:
+ parameters["file_sort_type"] = file_sort_type
+ if file_sort_asc is not None:
+ parameters["file_sort_asc"] = json.dumps(file_sort_asc, cls=JSONEncoder)
+ if return_hashes is not None:
+ parameters["return_hashes"] = json.dumps(return_hashes, cls=JSONEncoder)
+ response = self._api_request("GET", self._SEARCH_FILES_PATH, params=parameters)
+ return response.json()["hashes" if return_hashes else "file_ids"]
+ def get_file_metadata(
+ self,
+ hashes = None,
+ file_ids = None,
+ create_new_file_ids = None,
+ only_return_identifiers = None,
+ only_return_basic_information = None,
+ detailed_url_information = None,
+ hide_service_name_tags = None,
+ include_notes = None
+ ):
+ if hashes is None and file_ids is None:
+ raise ValueError("At least one of hashes, file_ids is required")
+ parameters = {}
+ if hashes is not None:
+ parameters["hashes"] = json.dumps(hashes, cls=JSONEncoder)
+ if file_ids is not None:
+ parameters["file_ids"] = json.dumps(file_ids, cls=JSONEncoder)
+ if create_new_file_ids is not None:
+ parameters["create_new_file_ids"] = json.dumps(create_new_file_ids, cls=JSONEncoder)
+ if only_return_identifiers is not None:
+ parameters["only_return_identifiers"] = json.dumps(only_return_identifiers, cls=JSONEncoder)
+ if only_return_basic_information is not None:
+ parameters["only_return_basic_information"] = json.dumps(only_return_basic_information, cls=JSONEncoder)
+ if detailed_url_information is not None:
+ parameters["detailed_url_information"] = json.dumps(detailed_url_information, cls=JSONEncoder)
+ if hide_service_name_tags is not None:
+ parameters["hide_service_name_tags"] = json.dumps(hide_service_name_tags, cls=JSONEncoder)
+ if include_notes is not None:
+ parameters["include_notes"] = json.dumps(include_notes, cls=JSONEncoder)
+ response = self._api_request("GET", self._GET_FILE_METADATA_PATH, params=parameters)
+ return response.json()["metadata"]
+ def get_file(self, hash_ = None, file_id = None):
+ if (hash_ is None and file_id is None) or (hash_ is not None and file_id is not None):
+ raise ValueError("Exactly one of hash_, file_id is required")
+ parameters: dict[str, T.Union[str, int]] = {}
+ if hash_ is not None:
+ parameters["hash"] = hash_
+ if file_id is not None:
+ parameters["file_id"] = file_id
+ return self._api_request("GET", self._GET_FILE_PATH, params=parameters, stream=True)
+ def get_thumbnail(self, hash_ = None, file_id = None):
+ if (hash_ is None and file_id is None) or (hash_ is not None and file_id is not None):
+ raise ValueError("Exactly one of hash_, file_id is required")
+ parameters: dict[str, T.Union[str, int]] = {}
+ if hash_ is not None:
+ parameters["hash"] = hash_
+ if file_id is not None:
+ parameters["file_id"] = file_id
+ return self._api_request("GET", self._GET_THUMBNAIL_PATH, params=parameters, stream=True)
+ def lock_database(self):
+ self._api_request("POST", self._LOCK_DATABASE_PATH)
+ def unlock_database(self):
+ self._api_request("POST", self._UNLOCK_DATABASE_PATH)
+ def get_mr_bones(self):
+ return self._api_request("GET", self._MR_BONES_PATH).json()["boned_stats"]
+ def add_and_tag_files(
+ self,
+ paths_or_files,
+ tags ,
+ service_names = None,
+ service_keys = None,
+ ):
+ """Convenience method to add and tag multiple files at the same time.
+ If service_names and service_keys aren't specified, the default service name "my tags" will be used. If a file
+ already exists in Hydrus, it will also be tagged.
+ Returns:
+ list[dict[str, T.Any]]: Returns results of all `Client.add_file()` calls, matching the order of the
+ paths_or_files iterable
+ """
+ if service_names is None and service_keys is None:
+ service_names = ("my tags",)
+ results = []
+ hashes = set()
+ for path_or_file in paths_or_files:
+ result = self.add_file(path_or_file)
+ results.append(result)
+ if result["status"] != ImportStatus.FAILED:
+ hashes.add(result["hash"])
+ service_names_to_tags = {name: tags for name in service_names} if service_names is not None else None
+ service_keys_to_tags = {key: tags for key in service_keys} if service_keys is not None else None
+ # Ignore type, we know that hashes only contains strings
+ self.add_tags(hashes, service_names_to_tags=service_names_to_tags, service_keys_to_tags=service_keys_to_tags) # type: ignore
+ return results
+ def get_page_list(self):
+ """Convenience method that returns a flattened version of the page tree from `Client.get_pages()`.
+ Returns:
+ list[dict[str, T.Any]]: A list of every "pages" value in the page tree in pre-order (NLR)
+ """
+ tree = self.get_pages()
+ pages = []
+ def walk_tree(page: dict[str, T.Any]):
+ pages.append(page)
+ # Ignore type, we know that pages is always a list
+ for sub_page in page.get("pages", ()): # type: ignore
+ # Ignore type, we know that sub_page is always a dict
+ walk_tree(sub_page) # type: ignore
+ walk_tree(tree)
+ return pages
+__all__ = [
+ "__version__",
+ "HydrusAPIException",
+ "ConnectionError",
+ "APIError",
+ "MissingParameter",
+ "InsufficientAccess",
+ "DatabaseLocked",
+ "ServerError",
+ "Permission",
+ "URLType",
+ "ImportStatus",
+ "TagAction",
+ "TagStatus",
+ "PageType",
+ "FileSortType",
+ "Client",
diff --git a/scripts/hydrus_api/utils.py b/scripts/hydrus_api/utils.py
new file mode 100644
index 0000000..f6aea2a
--- /dev/null
+++ b/scripts/hydrus_api/utils.py
@@ -0,0 +1,102 @@
+# Copyright (C) 2021 cryzed
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU Affero General Public License for more details.
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+import collections
+import os
+import typing as T
+from collections import abc
+from hydrus_api import DEFAULT_API_URL, HYDRUS_METADATA_ENCODING, Client, Permission
+X = T.TypeVar("X")
+class TextFileLike(T.Protocol):
+ def read(self) -> str:
+ pass
+def verify_permissions(
+ client: Client, permissions: abc.Iterable[T.Union[int, Permission]], exact: bool = False
+) -> bool:
+ granted_permissions = set(client.verify_access_key()["basic_permissions"])
+ return granted_permissions == set(permissions) if exact else granted_permissions.issuperset(permissions)
+def cli_request_api_key(
+ name: str,
+ permissions: abc.Iterable[T.Union[int, Permission]],
+ verify: bool = True,
+ exact: bool = False,
+ api_url: str = DEFAULT_API_URL,
+) -> str:
+ while True:
+ input(
+ 'Navigate to "services->review services->local->client api" in the Hydrus client and click "add->from api '
+ 'request". Then press enter to continue...'
+ )
+ access_key = Client(api_url=api_url).request_new_permissions(name, permissions)
+ input("Press OK and then apply in the Hydrus client dialog. Then press enter to continue...")
+ client = Client(access_key, api_url)
+ if verify and not verify_permissions(client, permissions, exact):
+ granted = client.verify_access_key()["basic_permissions"]
+ print(
+ f"The granted permissions ({granted}) differ from the requested permissions ({permissions}), please "
+ "grant all requested permissions."
+ )
+ continue
+ return access_key
+def parse_hydrus_metadata(text: str) -> collections.defaultdict[T.Optional[str], set[str]]:
+ namespaces = collections.defaultdict(set)
+ for line in (line.strip() for line in text.splitlines()):
+ if not line:
+ continue
+ parts = line.split(":", 1)
+ namespace, tag = (None, line) if len(parts) == 1 else parts
+ namespaces[namespace].add(tag)
+ # Ignore type, mypy has trouble figuring out that tag isn't optional
+ return namespaces # type: ignore
+def parse_hydrus_metadata_file(
+ path_or_file: T.Union[str, os.PathLike, TextFileLike]
+) -> collections.defaultdict[T.Optional[str], set[str]]:
+ if isinstance(path_or_file, (str, os.PathLike)):
+ with open(path_or_file, encoding=HYDRUS_METADATA_ENCODING) as file:
+ return parse_hydrus_metadata(file.read())
+ return parse_hydrus_metadata(path_or_file.read())
+# Useful for splitting up requests to get_file_metadata()
+def yield_chunks(sequence: T.Sequence[X], chunk_size: int, offset: int = 0) -> T.Generator[T.Sequence[X], None, None]:
+ while offset < len(sequence):
+ yield sequence[offset : offset + chunk_size]
+ offset += chunk_size
+__all__ = [
+ "verify_permissions",
+ "cli_request_api_key",
+ "parse_hydrus_metadata",
+ "parse_hydrus_metadata_file",
+ "yield_chunks",