From 88a0deccca308d8d8b6555ef0f85db29e2d67033 Mon Sep 17 00:00:00 2001
From: ZeroCool940711
Date: Mon, 17 Oct 2022 05:11:01 -0700
Subject: [PATCH] Added hydrus_api library which will be used to query hydrus
 for tags.

---
 scripts/hydrus_api/__init__.py | 766 +++++++++++++++++++++++++++++++++
 scripts/hydrus_api/utils.py    | 102 +++++
 2 files changed, 868 insertions(+)
 create mode 100644 scripts/hydrus_api/__init__.py
 create mode 100644 scripts/hydrus_api/utils.py

diff --git a/scripts/hydrus_api/__init__.py b/scripts/hydrus_api/__init__.py
new file mode 100644
index 0000000..63cdf02
--- /dev/null
+++ b/scripts/hydrus_api/__init__.py
@@ -0,0 +1,766 @@
+# Copyright (C) 2021 cryzed
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import enum
+import json
+import os
+import typing as T
+from collections import abc
+
+import requests
+
+__version__ = "4.0.0"
+
+DEFAULT_API_URL = "http://127.0.0.1:45869/"
+HYDRUS_METADATA_ENCODING = "utf-8"
+AUTHENTICATION_TIMEOUT_CODE = 419
+
+
+class HydrusAPIException(Exception):
+    pass
+
+
+class ConnectionError(HydrusAPIException, requests.ConnectTimeout):
+    pass
+
+
+class APIError(HydrusAPIException):
+    def __init__(self, response: requests.Response):
+        super().__init__(response.text)
+        self.response = response
+
+
+class MissingParameter(APIError):
+    pass
+
+
+class InsufficientAccess(APIError):
+    pass
+
+
+class DatabaseLocked(APIError):
+    pass
+
+
+class ServerError(APIError):
+    pass
+
+
+# Customize IntEnum, so we can just do str(Enum.member) to get the string representation of its value unmodified,
+# without users having to access .value explicitly
+class StringableIntEnum(enum.IntEnum):
+    def __str__(self):
+        return str(self.value)
+
+
+@enum.unique
+class Permission(StringableIntEnum):
+    IMPORT_URLS = 0
+    IMPORT_FILES = 1
+    ADD_TAGS = 2
+    SEARCH_FILES = 3
+    MANAGE_PAGES = 4
+    MANAGE_COOKIES = 5
+    MANAGE_DATABASE = 6
+    ADD_NOTES = 7
+
+
+@enum.unique
+class URLType(StringableIntEnum):
+    POST_URL = 0
+    FILE_URL = 2
+    GALLERY_URL = 3
+    WATCHABLE_URL = 4
+    UNKNOWN_URL = 5
+
+
+@enum.unique
+class ImportStatus(StringableIntEnum):
+    IMPORTABLE = 0
+    SUCCESS = 1
+    EXISTS = 2
+    PREVIOUSLY_DELETED = 3
+    FAILED = 4
+    VETOED = 7
+
+
+@enum.unique
+class TagAction(StringableIntEnum):
+    ADD = 0
+    DELETE = 1
+    PEND = 2
+    RESCIND_PENDING = 3
+    PETITION = 4
+    RESCIND_PETITION = 5
+
+
+@enum.unique
+class TagStatus(StringableIntEnum):
+    CURRENT = 0
+    PENDING = 1
+    DELETED = 2
+    PETITIONED = 3
+
+
+@enum.unique
+class PageType(StringableIntEnum):
+    GALLERY_DOWNLOADER = 1
+    SIMPLE_DOWNLOADER = 2
+    HARD_DRIVE_IMPORT = 3
+    PETITIONS = 5
+    FILE_SEARCH = 6
+    URL_DOWNLOADER = 7
+    DUPLICATES = 8
+    THREAD_WATCHER = 9
+    PAGE_OF_PAGES = 10
+
+
+@enum.unique
+class FileSortType(StringableIntEnum):
+    FILE_SIZE = 0
+    DURATION = 1
+    IMPORT_TIME = 2
+    FILE_TYPE = 3
+    RANDOM = 4
+    WIDTH = 5
+    HEIGHT = 6
+    RATIO = 7
+    NUMBER_OF_PIXELS = 8
+    NUMBER_OF_TAGS = 9
+    NUMBER_OF_MEDIA_VIEWS = 10
+    TOTAL_MEDIA_VIEWTIME = 11
+    APPROXIMATE_BITRATE = 12
+    HAS_AUDIO = 13
+    MODIFIED_TIME = 14
+    FRAMERATE = 15
+    NUMBER_OF_FRAMES = 16
+
+
+class BinaryFileLike(T.Protocol):
+    def read(self):
+        ...
+
+
+# The client should accept all objects that support either the iterable or the mapping protocol. We must ensure that
+# objects are either lists or dicts, so Python's json module can handle them
+class JSONEncoder(json.JSONEncoder):
+    def default(self, object_: T.Any):
+        if isinstance(object_, abc.Mapping):
+            return dict(object_)
+        if isinstance(object_, abc.Iterable):
+            return list(object_)
+        return super().default(object_)
+
+
+class Client:
+    VERSION = 31
+
+    # Access Management
+    _GET_API_VERSION_PATH = "/api_version"
+    _REQUEST_NEW_PERMISSIONS_PATH = "/request_new_permissions"
+    _GET_SESSION_KEY_PATH = "/session_key"
+    _VERIFY_ACCESS_KEY_PATH = "/verify_access_key"
+    _GET_SERVICES_PATH = "/get_services"
+
+    # Adding Files
+    _ADD_FILE_PATH = "/add_files/add_file"
+    _DELETE_FILES_PATH = "/add_files/delete_files"
+    _UNDELETE_FILES_PATH = "/add_files/undelete_files"
+    _ARCHIVE_FILES_PATH = "/add_files/archive_files"
+    _UNARCHIVE_FILES_PATH = "/add_files/unarchive_files"
+
+    # Adding Tags
+    _CLEAN_TAGS_PATH = "/add_tags/clean_tags"
+    _SEARCH_TAGS_PATH = "/add_tags/search_tags"
+    _ADD_TAGS_PATH = "/add_tags/add_tags"
+
+    # Adding URLs
+    _GET_URL_FILES_PATH = "/add_urls/get_url_files"
+    _GET_URL_INFO_PATH = "/add_urls/get_url_info"
+    _ADD_URL_PATH = "/add_urls/add_url"
+    _ASSOCIATE_URL_PATH = "/add_urls/associate_url"
+
+    # Adding Notes
+    _SET_NOTES_PATH = "/add_notes/set_notes"
+    _DELETE_NOTES_PATH = "/add_notes/delete_notes"
+
+    # Managing Cookies and HTTP Headers
+    _GET_COOKIES_PATH = "/manage_cookies/get_cookies"
+    _SET_COOKIES_PATH = "/manage_cookies/set_cookies"
+    _SET_USER_AGENT_PATH = "/manage_headers/set_user_agent"
+
+    # Managing Pages
+    _GET_PAGES_PATH = "/manage_pages/get_pages"
+    _GET_PAGE_INFO_PATH = "/manage_pages/get_page_info"
+    _ADD_FILES_TO_PAGE_PATH = "/manage_pages/add_files"
+    _FOCUS_PAGE_PATH = "/manage_pages/focus_page"
+
+    # Searching and Fetching Files
+    _SEARCH_FILES_PATH = "/get_files/search_files"
+    _GET_FILE_METADATA_PATH = "/get_files/file_metadata"
+    _GET_FILE_PATH = "/get_files/file"
+    _GET_THUMBNAIL_PATH = "/get_files/thumbnail"
+
+    # Managing the Database
+    _LOCK_DATABASE_PATH = "/manage_database/lock_on"
+    _UNLOCK_DATABASE_PATH = "/manage_database/lock_off"
+    _MR_BONES_PATH = "/manage_database/mr_bones"
+
+    def __init__(
+        self,
+        access_key = None,
+        api_url: str = DEFAULT_API_URL,
+        session = None,
+    ):
+        """
+        See https://hydrusnetwork.github.io/hydrus/help/client_api.html for documentation.
+ """ + + self.access_key = access_key + self.api_url = api_url.rstrip("/") + self.session = session or requests.Session() + + def _api_request(self, method: str, path: str, **kwargs: T.Any): + if self.access_key is not None: + kwargs.setdefault("headers", {}).update({"Hydrus-Client-API-Access-Key": self.access_key}) + + # Make sure we use our custom JSONEncoder that can serialize all objects that implement the iterable or mapping + # protocol + json_data = kwargs.pop("json", None) + if json_data is not None: + kwargs["data"] = json.dumps(json_data, cls=JSONEncoder) + # Since we aren't using the json keyword-argument, we have to set the Content-Type manually + kwargs["headers"]["Content-Type"] = "application/json" + + try: + response = self.session.request(method, self.api_url + path, **kwargs) + except requests.RequestException as error: + # Re-raise connection and timeout errors as hydrus.ConnectionErrors so these are more easy to handle for + # client applications + raise ConnectionError(*error.args) + + try: + response.raise_for_status() + except requests.HTTPError: + if response.status_code == requests.codes.bad_request: + raise MissingParameter(response) + elif response.status_code in { + requests.codes.unauthorized, + requests.codes.forbidden, + AUTHENTICATION_TIMEOUT_CODE, + }: + raise InsufficientAccess(response) + elif response.status_code == requests.codes.service_unavailable: + raise DatabaseLocked(response) + elif response.status_code == requests.codes.server_error: + raise ServerError(response) + raise APIError(response) + + return response + + def get_api_version(self): + response = self._api_request("GET", self._GET_API_VERSION_PATH) + return response.json() + + def request_new_permissions(self, name, permissions): + response = self._api_request( + "GET", + self._REQUEST_NEW_PERMISSIONS_PATH, + params={"name": name, "basic_permissions": json.dumps(permissions, cls=JSONEncoder)}, + ) + return response.json()["access_key"] + + def get_session_key(self): + response = self._api_request("GET", self._GET_SESSION_KEY_PATH) + return response.json()["session_key"] + + def verify_access_key(self): + response = self._api_request("GET", self._VERIFY_ACCESS_KEY_PATH) + return response.json() + + def get_services(self): + response = self._api_request("GET", self._GET_SERVICES_PATH) + return response.json() + + def add_file(self, path_or_file: T.Union[str, os.PathLike, BinaryFileLike]): + if isinstance(path_or_file, (str, os.PathLike)): + response = self._api_request("POST", self._ADD_FILE_PATH, json={"path": os.fspath(path_or_file)}) + else: + response = self._api_request( + "POST", + self._ADD_FILE_PATH, + data=path_or_file.read(), + headers={"Content-Type": "application/octet-stream"}, + ) + + return response.json() + + def delete_files( + self, + hashes = None, + file_ids = None, + file_service_name = None, + file_service_key = None, + reason = None + ): + if hashes is None and file_ids is None: + raise ValueError("At least one of hashes, file_ids is required") + if file_service_name is not None and file_service_key is not None: + raise ValueError("Exactly one of file_service_name, file_service_key is required") + + payload: dict[str, T.Any] = {} + if hashes is not None: + payload["hashes"] = hashes + if file_ids is not None: + payload["file_ids"] = file_ids + if file_service_name is not None: + payload["file_service_name"] = file_service_name + if file_service_key is not None: + payload["file_service_key"] = file_service_key + if reason is not None: + payload["reason"] = reason + + 
self._api_request("POST", self._DELETE_FILES_PATH, json=payload) + + def undelete_files( + self, + hashes = None, + file_ids = None, + file_service_name = None, + file_service_key = None, + ): + if hashes is None and file_ids is None: + raise ValueError("At least one of hashes, file_ids is required") + if file_service_name is not None and file_service_key is not None: + raise ValueError("Exactly one of file_service_name, file_service_key is required") + + payload: dict[str, T.Any] = {} + if hashes is not None: + payload["hashes"] = hashes + if file_ids is not None: + payload["file_ids"] = file_ids + if file_service_name is not None: + payload["file_service_name"] = file_service_name + if file_service_key is not None: + payload["file_service_key"] = file_service_key + + self._api_request("POST", self._UNDELETE_FILES_PATH, json=payload) + + def archive_files( + self, + hashes = None, + file_ids = None + ): + if hashes is None and file_ids is None: + raise ValueError("At least one of hashes, file_ids is required") + + payload: dict[str, T.Any] = {} + if hashes is not None: + payload["hashes"] = hashes + if file_ids is not None: + payload["file_ids"] = file_ids + + self._api_request("POST", self._ARCHIVE_FILES_PATH, json=payload) + + def unarchive_files( + self, + hashes = None, + file_ids = None + ): + if hashes is None and file_ids is None: + raise ValueError("At least one of hashes, file_ids is required") + + payload: dict[str, T.Any] = {} + if hashes is not None: + payload["hashes"] = hashes + if file_ids is not None: + payload["file_ids"] = file_ids + + self._api_request("POST", self._UNARCHIVE_FILES_PATH, json=payload) + + def clean_tags(self, tags ): + response = self._api_request("GET", self._CLEAN_TAGS_PATH, params={"tags": json.dumps(tags, cls=JSONEncoder)}) + return response.json()["tags"] + + def search_tags( + self, + search: str, + tag_service_key = None, + tag_service_name = None + ): + if tag_service_name is not None and tag_service_key is not None: + raise ValueError("Exactly one of tag_service_name, tag_service_key is required") + + payload: dict[str, T.Any] = {"search": search} + if tag_service_key is not None: + payload["tag_service_key"] = tag_service_key + if tag_service_name is not None: + payload["tag_service_name"] = tag_service_name + + response = self._api_request("GET", self._SEARCH_TAGS_PATH, params=payload) + return response.json()["tags"] + + def add_tags( + self, + hashes = None, + file_ids = None, + service_names_to_tags = None, + service_keys_to_tags = None, + service_names_to_actions_to_tags = None, + service_keys_to_actions_to_tags = None, + ): + if hashes is None and file_ids is None: + raise ValueError("At least one of hashes, file_ids is required") + if ( + service_names_to_tags is None + and service_keys_to_tags is None + and service_names_to_actions_to_tags is None + and service_keys_to_actions_to_tags is None + ): + raise ValueError( + "At least one of service_names_to_tags, service_keys_to_tags, service_names_to_actions_to_tags or " + "service_keys_to_actions_to_tags is required" + ) + + payload: dict[str, T.Any] = {} + if hashes is not None: + payload["hashes"] = hashes + if file_ids is not None: + payload["file_ids"] = file_ids + if service_names_to_tags is not None: + payload["service_names_to_tags"] = service_names_to_tags + if service_keys_to_tags is not None: + payload["service_keys_to_tags"] = service_keys_to_tags + if service_names_to_actions_to_tags is not None: + payload["service_names_to_actions_to_tags"] = service_names_to_actions_to_tags 
+        if service_keys_to_actions_to_tags is not None:
+            payload["service_keys_to_actions_to_tags"] = service_keys_to_actions_to_tags
+
+        self._api_request("POST", self._ADD_TAGS_PATH, json=payload)
+
+    def get_url_files(self, url: str):
+        response = self._api_request("GET", self._GET_URL_FILES_PATH, params={"url": url})
+        return response.json()
+
+    def get_url_info(self, url: str):
+        response = self._api_request("GET", self._GET_URL_INFO_PATH, params={"url": url})
+        return response.json()
+
+    def add_url(
+        self,
+        url: str,
+        destination_page_key = None,
+        destination_page_name = None,
+        show_destination_page = None,
+        service_names_to_additional_tags = None,
+        service_keys_to_additional_tags = None,
+        filterable_tags = None,
+    ):
+        if destination_page_key is not None and destination_page_name is not None:
+            raise ValueError("Only one of destination_page_key, destination_page_name can be specified")
+
+        payload: dict[str, T.Any] = {"url": url}
+        if destination_page_key is not None:
+            payload["destination_page_key"] = destination_page_key
+        if destination_page_name is not None:
+            payload["destination_page_name"] = destination_page_name
+        if show_destination_page is not None:
+            payload["show_destination_page"] = show_destination_page
+        if service_names_to_additional_tags is not None:
+            payload["service_names_to_additional_tags"] = service_names_to_additional_tags
+        if service_keys_to_additional_tags is not None:
+            payload["service_keys_to_additional_tags"] = service_keys_to_additional_tags
+        if filterable_tags is not None:
+            payload["filterable_tags"] = filterable_tags
+
+        response = self._api_request("POST", self._ADD_URL_PATH, json=payload)
+        return response.json()
+
+    def associate_url(
+        self,
+        hashes = None,
+        file_ids = None,
+        urls_to_add = None,
+        urls_to_delete = None,
+    ):
+        if hashes is None and file_ids is None:
+            raise ValueError("At least one of hashes, file_ids is required")
+        if urls_to_add is None and urls_to_delete is None:
+            raise ValueError("At least one of urls_to_add, urls_to_delete is required")
+
+        payload: dict[str, T.Any] = {}
+        if hashes is not None:
+            payload["hashes"] = hashes
+        if file_ids is not None:
+            payload["file_ids"] = file_ids
+        if urls_to_add is not None:
+            payload["urls_to_add"] = urls_to_add
+        if urls_to_delete is not None:
+            payload["urls_to_delete"] = urls_to_delete
+
+        self._api_request("POST", self._ASSOCIATE_URL_PATH, json=payload)
+
+    def set_notes(self, notes, hash_ = None, file_id = None):
+        if (hash_ is None and file_id is None) or (hash_ is not None and file_id is not None):
+            raise ValueError("Exactly one of hash_, file_id is required")
+
+        payload: dict[str, T.Any] = {"notes": notes}
+        if hash_ is not None:
+            payload["hash"] = hash_
+        if file_id is not None:
+            payload["file_id"] = file_id
+
+        self._api_request("POST", self._SET_NOTES_PATH, json=payload)
+
+    def delete_notes(
+        self,
+        note_names,
+        hash_ = None,
+        file_id = None
+    ):
+        if (hash_ is None and file_id is None) or (hash_ is not None and file_id is not None):
+            raise ValueError("Exactly one of hash_, file_id is required")
+
+        payload: dict[str, T.Any] = {"note_names": note_names}
+        if hash_ is not None:
+            payload["hash"] = hash_
+        if file_id is not None:
+            payload["file_id"] = file_id
+
+        self._api_request("POST", self._DELETE_NOTES_PATH, json=payload)
+
+    def get_cookies(self, domain: str):
+        response = self._api_request("GET", self._GET_COOKIES_PATH, params={"domain": domain})
+        return response.json()["cookies"]
+
+    def set_cookies(self, cookies):
+        self._api_request("POST", self._SET_COOKIES_PATH, json={"cookies": cookies})
+
+    def set_user_agent(self, user_agent: str):
+        self._api_request("POST", self._SET_USER_AGENT_PATH, json={"user-agent": user_agent})
+
+    def get_pages(self):
+        response = self._api_request("GET", self._GET_PAGES_PATH)
+        return response.json()["pages"]
+
+    def get_page_info(self, page_key: str, simple = None):
+        parameters = {"page_key": page_key}
+        if simple is not None:
+            parameters["simple"] = json.dumps(simple, cls=JSONEncoder)
+
+        response = self._api_request("GET", self._GET_PAGE_INFO_PATH, params=parameters)
+        return response.json()["page_info"]
+
+    def add_files_to_page(
+        self,
+        page_key: str,
+        file_ids = None,
+        hashes = None
+    ):
+        if file_ids is None and hashes is None:
+            raise ValueError("At least one of file_ids, hashes is required")
+
+        payload: dict[str, T.Any] = {"page_key": page_key}
+        if file_ids is not None:
+            payload["file_ids"] = file_ids
+        if hashes is not None:
+            payload["hashes"] = hashes
+
+        self._api_request("POST", self._ADD_FILES_TO_PAGE_PATH, json=payload)
+
+    def focus_page(self, page_key: str):
+        self._api_request("POST", self._FOCUS_PAGE_PATH, json={"page_key": page_key})
+
+    def search_files(
+        self,
+        tags,
+        file_service_name = None,
+        file_service_key = None,
+        tag_service_name = None,
+        tag_service_key = None,
+        file_sort_type = None,
+        file_sort_asc = None,
+        return_hashes = None,
+    ):
+        if file_service_name is not None and file_service_key is not None:
+            raise ValueError("Only one of file_service_name, file_service_key can be specified")
+        if tag_service_name is not None and tag_service_key is not None:
+            raise ValueError("Only one of tag_service_name, tag_service_key can be specified")
+
+        parameters: dict[str, T.Union[str, int]] = {"tags": json.dumps(tags, cls=JSONEncoder)}
+        if file_service_name is not None:
+            parameters["file_service_name"] = file_service_name
+        if file_service_key is not None:
+            parameters["file_service_key"] = file_service_key
+
+        if tag_service_name is not None:
+            parameters["tag_service_name"] = tag_service_name
+        if tag_service_key is not None:
+            parameters["tag_service_key"] = tag_service_key
+
+        if file_sort_type is not None:
+            parameters["file_sort_type"] = file_sort_type
+        if file_sort_asc is not None:
+            parameters["file_sort_asc"] = json.dumps(file_sort_asc, cls=JSONEncoder)
+        if return_hashes is not None:
+            parameters["return_hashes"] = json.dumps(return_hashes, cls=JSONEncoder)
+
+        response = self._api_request("GET", self._SEARCH_FILES_PATH, params=parameters)
+        return response.json()["hashes" if return_hashes else "file_ids"]
+
+    def get_file_metadata(
+        self,
+        hashes = None,
+        file_ids = None,
+        create_new_file_ids = None,
+        only_return_identifiers = None,
+        only_return_basic_information = None,
+        detailed_url_information = None,
+        hide_service_name_tags = None,
+        include_notes = None
+    ):
+        if hashes is None and file_ids is None:
+            raise ValueError("At least one of hashes, file_ids is required")
+
+        parameters = {}
+        if hashes is not None:
+            parameters["hashes"] = json.dumps(hashes, cls=JSONEncoder)
+        if file_ids is not None:
+            parameters["file_ids"] = json.dumps(file_ids, cls=JSONEncoder)
+
+        if create_new_file_ids is not None:
+            parameters["create_new_file_ids"] = json.dumps(create_new_file_ids, cls=JSONEncoder)
+        if only_return_identifiers is not None:
+            parameters["only_return_identifiers"] = json.dumps(only_return_identifiers, cls=JSONEncoder)
+        if only_return_basic_information is not None:
parameters["only_return_basic_information"] = json.dumps(only_return_basic_information, cls=JSONEncoder) + if detailed_url_information is not None: + parameters["detailed_url_information"] = json.dumps(detailed_url_information, cls=JSONEncoder) + if hide_service_name_tags is not None: + parameters["hide_service_name_tags"] = json.dumps(hide_service_name_tags, cls=JSONEncoder) + if include_notes is not None: + parameters["include_notes"] = json.dumps(include_notes, cls=JSONEncoder) + + response = self._api_request("GET", self._GET_FILE_METADATA_PATH, params=parameters) + return response.json()["metadata"] + + def get_file(self, hash_ = None, file_id = None): + if (hash_ is None and file_id is None) or (hash_ is not None and file_id is not None): + raise ValueError("Exactly one of hash_, file_id is required") + + parameters: dict[str, T.Union[str, int]] = {} + if hash_ is not None: + parameters["hash"] = hash_ + if file_id is not None: + parameters["file_id"] = file_id + + return self._api_request("GET", self._GET_FILE_PATH, params=parameters, stream=True) + + def get_thumbnail(self, hash_ = None, file_id = None): + if (hash_ is None and file_id is None) or (hash_ is not None and file_id is not None): + raise ValueError("Exactly one of hash_, file_id is required") + + parameters: dict[str, T.Union[str, int]] = {} + if hash_ is not None: + parameters["hash"] = hash_ + if file_id is not None: + parameters["file_id"] = file_id + + return self._api_request("GET", self._GET_THUMBNAIL_PATH, params=parameters, stream=True) + + def lock_database(self): + self._api_request("POST", self._LOCK_DATABASE_PATH) + + def unlock_database(self): + self._api_request("POST", self._UNLOCK_DATABASE_PATH) + + def get_mr_bones(self): + return self._api_request("GET", self._MR_BONES_PATH).json()["boned_stats"] + + def add_and_tag_files( + self, + paths_or_files, + tags , + service_names = None, + service_keys = None, + ): + """Convenience method to add and tag multiple files at the same time. + + If service_names and service_keys aren't specified, the default service name "my tags" will be used. If a file + already exists in Hydrus, it will also be tagged. + + Returns: + list[dict[str, T.Any]]: Returns results of all `Client.add_file()` calls, matching the order of the + paths_or_files iterable + """ + if service_names is None and service_keys is None: + service_names = ("my tags",) + + results = [] + hashes = set() + for path_or_file in paths_or_files: + result = self.add_file(path_or_file) + results.append(result) + if result["status"] != ImportStatus.FAILED: + hashes.add(result["hash"]) + + service_names_to_tags = {name: tags for name in service_names} if service_names is not None else None + service_keys_to_tags = {key: tags for key in service_keys} if service_keys is not None else None + # Ignore type, we know that hashes only contains strings + self.add_tags(hashes, service_names_to_tags=service_names_to_tags, service_keys_to_tags=service_keys_to_tags) # type: ignore + return results + + def get_page_list(self): + """Convenience method that returns a flattened version of the page tree from `Client.get_pages()`. 
+
+        Returns:
+            list[dict[str, T.Any]]: A list of every "pages" value in the page tree in pre-order (NLR)
+        """
+        tree = self.get_pages()
+        pages = []
+
+        def walk_tree(page: dict[str, T.Any]):
+            pages.append(page)
+            # Ignore type, we know that pages is always a list
+            for sub_page in page.get("pages", ()):  # type: ignore
+                # Ignore type, we know that sub_page is always a dict
+                walk_tree(sub_page)  # type: ignore
+
+        walk_tree(tree)
+        return pages
+
+
+__all__ = [
+    "__version__",
+    "DEFAULT_API_URL",
+    "HYDRUS_METADATA_ENCODING",
+    "HydrusAPIException",
+    "ConnectionError",
+    "APIError",
+    "MissingParameter",
+    "InsufficientAccess",
+    "DatabaseLocked",
+    "ServerError",
+    "Permission",
+    "URLType",
+    "ImportStatus",
+    "TagAction",
+    "TagStatus",
+    "PageType",
+    "FileSortType",
+    "Client",
+]
diff --git a/scripts/hydrus_api/utils.py b/scripts/hydrus_api/utils.py
new file mode 100644
index 0000000..f6aea2a
--- /dev/null
+++ b/scripts/hydrus_api/utils.py
@@ -0,0 +1,102 @@
+# Copyright (C) 2021 cryzed
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+import collections
+import os
+import typing as T
+from collections import abc
+
+from hydrus_api import DEFAULT_API_URL, HYDRUS_METADATA_ENCODING, Client, Permission
+
+X = T.TypeVar("X")
+
+
+class TextFileLike(T.Protocol):
+    def read(self) -> str:
+        pass
+
+
+def verify_permissions(
+    client: Client, permissions: abc.Iterable[T.Union[int, Permission]], exact: bool = False
+) -> bool:
+    granted_permissions = set(client.verify_access_key()["basic_permissions"])
+    return granted_permissions == set(permissions) if exact else granted_permissions.issuperset(permissions)
+
+
+def cli_request_api_key(
+    name: str,
+    permissions: abc.Iterable[T.Union[int, Permission]],
+    verify: bool = True,
+    exact: bool = False,
+    api_url: str = DEFAULT_API_URL,
+) -> str:
+    while True:
+        input(
+            'Navigate to "services->review services->local->client api" in the Hydrus client and click "add->from api '
+            'request". Then press enter to continue...'
+        )
+        access_key = Client(api_url=api_url).request_new_permissions(name, permissions)
+        input("Press OK and then apply in the Hydrus client dialog. Then press enter to continue...")
+
+        client = Client(access_key, api_url)
+        if verify and not verify_permissions(client, permissions, exact):
+            granted = client.verify_access_key()["basic_permissions"]
+            print(
+                f"The granted permissions ({granted}) differ from the requested permissions ({permissions}), please "
+                "grant all requested permissions."
+            )
+            continue
+
+        return access_key
+
+
+def parse_hydrus_metadata(text: str) -> collections.defaultdict[T.Optional[str], set[str]]:
+    namespaces = collections.defaultdict(set)
+    for line in (line.strip() for line in text.splitlines()):
+        if not line:
+            continue
+
+        parts = line.split(":", 1)
+        namespace, tag = (None, line) if len(parts) == 1 else parts
+        namespaces[namespace].add(tag)
+
+    # Ignore type, mypy has trouble figuring out that tag isn't optional
+    return namespaces  # type: ignore
+
+
+def parse_hydrus_metadata_file(
+    path_or_file: T.Union[str, os.PathLike, TextFileLike]
+) -> collections.defaultdict[T.Optional[str], set[str]]:
+    if isinstance(path_or_file, (str, os.PathLike)):
+        with open(path_or_file, encoding=HYDRUS_METADATA_ENCODING) as file:
+            return parse_hydrus_metadata(file.read())
+
+    return parse_hydrus_metadata(path_or_file.read())
+
+
+# Useful for splitting up requests to get_file_metadata()
+def yield_chunks(sequence: T.Sequence[X], chunk_size: int, offset: int = 0) -> T.Generator[T.Sequence[X], None, None]:
+    while offset < len(sequence):
+        yield sequence[offset : offset + chunk_size]
+        offset += chunk_size
+
+
+__all__ = [
+    "verify_permissions",
+    "cli_request_api_key",
+    "parse_hydrus_metadata",
+    "parse_hydrus_metadata_file",
+    "yield_chunks",
+]
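
As a usage sketch of what the commit message describes (querying hydrus for tags), the snippet below shows how a webui script could drive the vendored package. It assumes `scripts/` is on sys.path so the package imports as `hydrus_api`, a Hydrus client listening at DEFAULT_API_URL, and a valid access key; the key value, the "system:everything" query, and the `service_names_to_statuses_to_tags` metadata key are illustrative assumptions about the server's response, not something this patch pins down.

    from hydrus_api import Client, Permission
    from hydrus_api.utils import verify_permissions, yield_chunks

    ACCESS_KEY = "replace-with-your-access-key"  # hypothetical placeholder

    client = Client(ACCESS_KEY)  # talks to DEFAULT_API_URL
    if not verify_permissions(client, (Permission.SEARCH_FILES,)):
        raise SystemExit("Access key lacks the search_files permission")

    # Find matching file ids, then fetch their metadata in batches so no
    # single GET request grows too large.
    file_ids = client.search_files(["system:everything"])
    for chunk in yield_chunks(file_ids, 100):
        for metadata in client.get_file_metadata(file_ids=chunk):
            # Tag layout follows the Hydrus client API metadata payload;
            # treat the key name as an assumption, not a guarantee.
            tags = metadata.get("service_names_to_statuses_to_tags", {})
            print(metadata["file_id"], tags)

Chunking through `yield_chunks` mirrors the hint in utils.py ("Useful for splitting up requests to get_file_metadata()"): metadata requests are encoded into the query string, so batching keeps URLs within practical limits.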