sygil-webui/scripts/hydrus_api/__init__.py

767 lines
26 KiB
Python

# Copyright (C) 2021 cryzed
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import enum
import json
import os
import typing as T
from collections import abc
import requests
__version__ = "4.0.0"
DEFAULT_API_URL = "http://127.0.0.1:45869/"
HYDRUS_METADATA_ENCODING = "utf-8"
AUTHENTICATION_TIMEOUT_CODE = 419
class HydrusAPIException(Exception):
pass
class ConnectionError(HydrusAPIException, requests.ConnectTimeout):
pass
class APIError(HydrusAPIException):
def __init__(self, response: requests.Response):
super().__init__(response.text)
self.response = response
class MissingParameter(APIError):
pass
class InsufficientAccess(APIError):
pass
class DatabaseLocked(APIError):
pass
class ServerError(APIError):
pass
# Customize IntEnum, so we can just do str(Enum.member) to get the string representation of its value unmodified,
# without users having to access .value explicitly
class StringableIntEnum(enum.IntEnum):
def __str__(self):
return str(self.value)
@enum.unique
class Permission(StringableIntEnum):
IMPORT_URLS = 0
IMPORT_FILES = 1
ADD_TAGS = 2
SEARCH_FILES = 3
MANAGE_PAGES = 4
MANAGE_COOKIES = 5
MANAGE_DATABASE = 6
ADD_NOTES = 7
@enum.unique
class URLType(StringableIntEnum):
POST_URL = 0
FILE_URL = 2
GALLERY_URL = 3
WATCHABLE_URL = 4
UNKNOWN_URL = 5
@enum.unique
class ImportStatus(StringableIntEnum):
IMPORTABLE = 0
SUCCESS = 1
EXISTS = 2
PREVIOUSLY_DELETED = 3
FAILED = 4
VETOED = 7
@enum.unique
class TagAction(StringableIntEnum):
ADD = 0
DELETE = 1
PEND = 2
RESCIND_PENDING = 3
PETITION = 4
RESCIND_PETITION = 5
@enum.unique
class TagStatus(StringableIntEnum):
CURRENT = 0
PENDING = 1
DELETED = 2
PETITIONED = 3
@enum.unique
class PageType(StringableIntEnum):
GALLERY_DOWNLOADER = 1
SIMPLE_DOWNLOADER = 2
HARD_DRIVE_IMPORT = 3
PETITIONS = 5
FILE_SEARCH = 6
URL_DOWNLOADER = 7
DUPLICATES = 8
THREAD_WATCHER = 9
PAGE_OF_PAGES = 10
@enum.unique
class FileSortType(StringableIntEnum):
FILE_SIZE = 0
DURATION = 1
IMPORT_TIME = 2
FILE_TYPE = 3
RANDOM = 4
WIDTH = 5
HEIGHT = 6
RATIO = 7
NUMBER_OF_PIXELS = 8
NUMBER_OF_TAGS = 9
NUMBER_OF_MEDIA_VIEWS = 10
TOTAL_MEDIA_VIEWTIME = 11
APPROXIMATE_BITRATE = 12
HAS_AUDIO = 13
MODIFIED_TIME = 14
FRAMERATE = 15
NUMBER_OF_FRAMES = 16
class BinaryFileLike(T.Protocol):
def read(self):
...
# The client should accept all objects that either support the iterable or mapping protocol. We must ensure that objects
# are either lists or dicts, so Python's json module can handle them
class JSONEncoder(json.JSONEncoder):
def default(self, object_: T.Any):
if isinstance(object_, abc.Mapping):
return dict(object_)
if isinstance(object_, abc.Iterable):
return list(object_)
return super().default(object_)
class Client:
VERSION = 31
# Access Management
_GET_API_VERSION_PATH = "/api_version"
_REQUEST_NEW_PERMISSIONS_PATH = "/request_new_permissions"
_GET_SESSION_KEY_PATH = "/session_key"
_VERIFY_ACCESS_KEY_PATH = "/verify_access_key"
_GET_SERVICES_PATH = "/get_services"
# Adding Files
_ADD_FILE_PATH = "/add_files/add_file"
_DELETE_FILES_PATH = "/add_files/delete_files"
_UNDELETE_FILES_PATH = "/add_files/undelete_files"
_ARCHIVE_FILES_PATH = "/add_files/archive_files"
_UNARCHIVE_FILES_PATH = "/add_files/unarchive_files"
# Adding Tags
_CLEAN_TAGS_PATH = "/add_tags/clean_tags"
_SEARCH_TAGS_PATH = "/add_tags/search_tags"
_ADD_TAGS_PATH = "/add_tags/add_tags"
# Adding URLs
_GET_URL_FILES_PATH = "/add_urls/get_url_files"
_GET_URL_INFO_PATH = "/add_urls/get_url_info"
_ADD_URL_PATH = "/add_urls/add_url"
_ASSOCIATE_URL_PATH = "/add_urls/associate_url"
# Adding Notes
_SET_NOTES_PATH = "/add_notes/set_notes"
_DELETE_NOTES_PATH = "/add_notes/delete_notes"
# Managing Cookies and HTTP Headers
_GET_COOKIES_PATH = "/manage_cookies/get_cookies"
_SET_COOKIES_PATH = "/manage_cookies/set_cookies"
_SET_USER_AGENT_PATH = "/manage_headers/set_user_agent"
# Managing Pages
_GET_PAGES_PATH = "/manage_pages/get_pages"
_GET_PAGE_INFO_PATH = "/manage_pages/get_page_info"
_ADD_FILES_TO_PAGE_PATH = "/manage_pages/add_files"
_FOCUS_PAGE_PATH = "/manage_pages/focus_page"
# Searching and Fetching Files
_SEARCH_FILES_PATH = "/get_files/search_files"
_GET_FILE_METADATA_PATH = "/get_files/file_metadata"
_GET_FILE_PATH = "/get_files/file"
_GET_THUMBNAIL_PATH = "/get_files/thumbnail"
# Managing the Database
_LOCK_DATABASE_PATH = "/manage_database/lock_on"
_UNLOCK_DATABASE_PATH = "/manage_database/lock_off"
_MR_BONES_PATH = "/manage_database/mr_bones"
def __init__(
self,
access_key = None,
api_url: str = DEFAULT_API_URL,
session = None,
):
"""
See https://hydrusnetwork.github.io/hydrus/help/client_api.html for documentation.
"""
self.access_key = access_key
self.api_url = api_url.rstrip("/")
self.session = session or requests.Session()
def _api_request(self, method: str, path: str, **kwargs: T.Any):
if self.access_key is not None:
kwargs.setdefault("headers", {}).update({"Hydrus-Client-API-Access-Key": self.access_key})
# Make sure we use our custom JSONEncoder that can serialize all objects that implement the iterable or mapping
# protocol
json_data = kwargs.pop("json", None)
if json_data is not None:
kwargs["data"] = json.dumps(json_data, cls=JSONEncoder)
# Since we aren't using the json keyword-argument, we have to set the Content-Type manually
kwargs["headers"]["Content-Type"] = "application/json"
try:
response = self.session.request(method, self.api_url + path, **kwargs)
except requests.RequestException as error:
# Re-raise connection and timeout errors as hydrus.ConnectionErrors so these are more easy to handle for
# client applications
raise ConnectionError(*error.args)
try:
response.raise_for_status()
except requests.HTTPError:
if response.status_code == requests.codes.bad_request:
raise MissingParameter(response)
elif response.status_code in {
requests.codes.unauthorized,
requests.codes.forbidden,
AUTHENTICATION_TIMEOUT_CODE,
}:
raise InsufficientAccess(response)
elif response.status_code == requests.codes.service_unavailable:
raise DatabaseLocked(response)
elif response.status_code == requests.codes.server_error:
raise ServerError(response)
raise APIError(response)
return response
def get_api_version(self):
response = self._api_request("GET", self._GET_API_VERSION_PATH)
return response.json()
def request_new_permissions(self, name, permissions):
response = self._api_request(
"GET",
self._REQUEST_NEW_PERMISSIONS_PATH,
params={"name": name, "basic_permissions": json.dumps(permissions, cls=JSONEncoder)},
)
return response.json()["access_key"]
def get_session_key(self):
response = self._api_request("GET", self._GET_SESSION_KEY_PATH)
return response.json()["session_key"]
def verify_access_key(self):
response = self._api_request("GET", self._VERIFY_ACCESS_KEY_PATH)
return response.json()
def get_services(self):
response = self._api_request("GET", self._GET_SERVICES_PATH)
return response.json()
def add_file(self, path_or_file: T.Union[str, os.PathLike, BinaryFileLike]):
if isinstance(path_or_file, (str, os.PathLike)):
response = self._api_request("POST", self._ADD_FILE_PATH, json={"path": os.fspath(path_or_file)})
else:
response = self._api_request(
"POST",
self._ADD_FILE_PATH,
data=path_or_file.read(),
headers={"Content-Type": "application/octet-stream"},
)
return response.json()
def delete_files(
self,
hashes = None,
file_ids = None,
file_service_name = None,
file_service_key = None,
reason = None
):
if hashes is None and file_ids is None:
raise ValueError("At least one of hashes, file_ids is required")
if file_service_name is not None and file_service_key is not None:
raise ValueError("Exactly one of file_service_name, file_service_key is required")
payload: dict[str, T.Any] = {}
if hashes is not None:
payload["hashes"] = hashes
if file_ids is not None:
payload["file_ids"] = file_ids
if file_service_name is not None:
payload["file_service_name"] = file_service_name
if file_service_key is not None:
payload["file_service_key"] = file_service_key
if reason is not None:
payload["reason"] = reason
self._api_request("POST", self._DELETE_FILES_PATH, json=payload)
def undelete_files(
self,
hashes = None,
file_ids = None,
file_service_name = None,
file_service_key = None,
):
if hashes is None and file_ids is None:
raise ValueError("At least one of hashes, file_ids is required")
if file_service_name is not None and file_service_key is not None:
raise ValueError("Exactly one of file_service_name, file_service_key is required")
payload: dict[str, T.Any] = {}
if hashes is not None:
payload["hashes"] = hashes
if file_ids is not None:
payload["file_ids"] = file_ids
if file_service_name is not None:
payload["file_service_name"] = file_service_name
if file_service_key is not None:
payload["file_service_key"] = file_service_key
self._api_request("POST", self._UNDELETE_FILES_PATH, json=payload)
def archive_files(
self,
hashes = None,
file_ids = None
):
if hashes is None and file_ids is None:
raise ValueError("At least one of hashes, file_ids is required")
payload: dict[str, T.Any] = {}
if hashes is not None:
payload["hashes"] = hashes
if file_ids is not None:
payload["file_ids"] = file_ids
self._api_request("POST", self._ARCHIVE_FILES_PATH, json=payload)
def unarchive_files(
self,
hashes = None,
file_ids = None
):
if hashes is None and file_ids is None:
raise ValueError("At least one of hashes, file_ids is required")
payload: dict[str, T.Any] = {}
if hashes is not None:
payload["hashes"] = hashes
if file_ids is not None:
payload["file_ids"] = file_ids
self._api_request("POST", self._UNARCHIVE_FILES_PATH, json=payload)
def clean_tags(self, tags ):
response = self._api_request("GET", self._CLEAN_TAGS_PATH, params={"tags": json.dumps(tags, cls=JSONEncoder)})
return response.json()["tags"]
def search_tags(
self,
search: str,
tag_service_key = None,
tag_service_name = None
):
if tag_service_name is not None and tag_service_key is not None:
raise ValueError("Exactly one of tag_service_name, tag_service_key is required")
payload: dict[str, T.Any] = {"search": search}
if tag_service_key is not None:
payload["tag_service_key"] = tag_service_key
if tag_service_name is not None:
payload["tag_service_name"] = tag_service_name
response = self._api_request("GET", self._SEARCH_TAGS_PATH, params=payload)
return response.json()["tags"]
def add_tags(
self,
hashes = None,
file_ids = None,
service_names_to_tags = None,
service_keys_to_tags = None,
service_names_to_actions_to_tags = None,
service_keys_to_actions_to_tags = None,
):
if hashes is None and file_ids is None:
raise ValueError("At least one of hashes, file_ids is required")
if (
service_names_to_tags is None
and service_keys_to_tags is None
and service_names_to_actions_to_tags is None
and service_keys_to_actions_to_tags is None
):
raise ValueError(
"At least one of service_names_to_tags, service_keys_to_tags, service_names_to_actions_to_tags or "
"service_keys_to_actions_to_tags is required"
)
payload: dict[str, T.Any] = {}
if hashes is not None:
payload["hashes"] = hashes
if file_ids is not None:
payload["file_ids"] = file_ids
if service_names_to_tags is not None:
payload["service_names_to_tags"] = service_names_to_tags
if service_keys_to_tags is not None:
payload["service_keys_to_tags"] = service_keys_to_tags
if service_names_to_actions_to_tags is not None:
payload["service_names_to_actions_to_tags"] = service_names_to_actions_to_tags
if service_keys_to_actions_to_tags is not None:
payload["service_keys_to_actions_to_tags"] = service_keys_to_actions_to_tags
self._api_request("POST", self._ADD_TAGS_PATH, json=payload)
def get_url_files(self, url: str):
response = self._api_request("GET", self._GET_URL_FILES_PATH, params={"url": url})
return response.json()
def get_url_info(self, url: str):
response = self._api_request("GET", self._GET_URL_INFO_PATH, params={"url": url})
return response.json()
def add_url(
self,
url: str,
destination_page_key = None,
destination_page_name = None,
show_destination_page = None,
service_names_to_additional_tags = None,
service_keys_to_additional_tags = None,
filterable_tags = None,
):
if destination_page_key is not None and destination_page_name is not None:
raise ValueError("Exactly one of destination_page_key, destination_page_name is required")
payload: dict[str, T.Any] = {"url": url}
if destination_page_key is not None:
payload["destination_page_key"] = destination_page_key
if destination_page_name is not None:
payload["destination_page_name"] = destination_page_name
if show_destination_page is not None:
payload["show_destination_page"] = show_destination_page
if service_names_to_additional_tags is not None:
payload["service_names_to_additional_tags"] = service_names_to_additional_tags
if service_keys_to_additional_tags is not None:
payload["service_keys_to_additional_tags"] = service_keys_to_additional_tags
if filterable_tags is not None:
payload["filterable_tags"] = filterable_tags
response = self._api_request("POST", self._ADD_URL_PATH, json=payload)
return response.json()
def associate_url(
self,
hashes = None,
file_ids = None,
urls_to_add = None,
urls_to_delete = None,
):
if hashes is None and file_ids is None:
raise ValueError("At least one of hashes, file_ids is required")
if urls_to_add is None and urls_to_delete is None:
raise ValueError("At least one of urls_to_add, urls_to_delete is required")
payload: dict[str, T.Any] = {}
if hashes is not None:
payload["hashes"] = hashes
if file_ids is not None:
payload["file_ids"] = file_ids
if urls_to_add is not None:
urls_to_add = urls_to_add
payload["urls_to_add"] = urls_to_add
if urls_to_delete is not None:
urls_to_delete = urls_to_delete
payload["urls_to_delete"] = urls_to_delete
self._api_request("POST", self._ASSOCIATE_URL_PATH, json=payload)
def set_notes(self, notes , hash_= None, file_id = None):
if (hash_ is None and file_id is None) or (hash_ is not None and file_id is not None):
raise ValueError("Exactly one of hash_, file_id is required")
payload: dict[str, T.Any] = {"notes": notes}
if hash_ is not None:
payload["hash"] = hash_
if file_id is not None:
payload["file_id"] = file_id
self._api_request("POST", self._SET_NOTES_PATH, json=payload)
def delete_notes(
self,
note_names ,
hash_ = None,
file_id = None
):
if (hash_ is None and file_id is None) or (hash_ is not None and file_id is not None):
raise ValueError("Exactly one of hash_, file_id is required")
payload: dict[str, T.Any] = {"note_names": note_names}
if hash_ is not None:
payload["hash"] = hash_
if file_id is not None:
payload["file_id"] = file_id
self._api_request("POST", self._DELETE_NOTES_PATH, json=payload)
def get_cookies(self, domain: str):
response = self._api_request("GET", self._GET_COOKIES_PATH, params={"domain": domain})
return response.json()["cookies"]
def set_cookies(self, cookies ):
self._api_request("POST", self._SET_COOKIES_PATH, json={"cookies": cookies})
def set_user_agent(self, user_agent: str):
self._api_request("POST", self._SET_USER_AGENT_PATH, json={"user-agent": user_agent})
def get_pages(self):
response = self._api_request("GET", self._GET_PAGES_PATH)
return response.json()["pages"]
def get_page_info(self, page_key: str, simple = None):
parameters = {"page_key": page_key}
if simple is not None:
parameters["simple"] = json.dumps(simple, cls=JSONEncoder)
response = self._api_request("GET", self._GET_PAGE_INFO_PATH, params=parameters)
return response.json()["page_info"]
def add_files_to_page(
self,
page_key: str,
file_ids = None,
hashes = None
):
if file_ids is None and hashes is None:
raise ValueError("At least one of file_ids, hashes is required")
payload: dict[str, T.Any] = {"page_key": page_key}
if file_ids is not None:
payload["file_ids"] = file_ids
if hashes is not None:
payload["hashes"] = hashes
self._api_request("POST", self._ADD_FILES_TO_PAGE_PATH, json=payload)
def focus_page(self, page_key: str):
self._api_request("POST", self._FOCUS_PAGE_PATH, json={"page_key": page_key})
def search_files(
self,
tags,
file_service_name = None,
file_service_key = None,
tag_service_name = None,
tag_service_key = None,
file_sort_type = None,
file_sort_asc = None,
return_hashes = None,
):
if file_service_name is not None and file_service_key is not None:
raise ValueError("Exactly one of file_service_name, file_service_key is required")
if tag_service_name is not None and tag_service_key is not None:
raise ValueError("Exactly one of tag_service_name, tag_service_key is required")
parameters: dict[str, T.Union[str, int]] = {"tags": json.dumps(tags, cls=JSONEncoder)}
if file_service_name is not None:
parameters["file_service_name"] = file_service_name
if file_service_key is not None:
parameters["file_service_key"] = file_service_key
if tag_service_name is not None:
parameters["tag_service_name"] = tag_service_name
if tag_service_key is not None:
parameters["tag_service_key"] = tag_service_key
if file_sort_type is not None:
parameters["file_sort_type"] = file_sort_type
if file_sort_asc is not None:
parameters["file_sort_asc"] = json.dumps(file_sort_asc, cls=JSONEncoder)
if return_hashes is not None:
parameters["return_hashes"] = json.dumps(return_hashes, cls=JSONEncoder)
response = self._api_request("GET", self._SEARCH_FILES_PATH, params=parameters)
return response.json()["hashes" if return_hashes else "file_ids"]
def get_file_metadata(
self,
hashes = None,
file_ids = None,
create_new_file_ids = None,
only_return_identifiers = None,
only_return_basic_information = None,
detailed_url_information = None,
hide_service_name_tags = None,
include_notes = None
):
if hashes is None and file_ids is None:
raise ValueError("At least one of hashes, file_ids is required")
parameters = {}
if hashes is not None:
parameters["hashes"] = json.dumps(hashes, cls=JSONEncoder)
if file_ids is not None:
parameters["file_ids"] = json.dumps(file_ids, cls=JSONEncoder)
if create_new_file_ids is not None:
parameters["create_new_file_ids"] = json.dumps(create_new_file_ids, cls=JSONEncoder)
if only_return_identifiers is not None:
parameters["only_return_identifiers"] = json.dumps(only_return_identifiers, cls=JSONEncoder)
if only_return_basic_information is not None:
parameters["only_return_basic_information"] = json.dumps(only_return_basic_information, cls=JSONEncoder)
if detailed_url_information is not None:
parameters["detailed_url_information"] = json.dumps(detailed_url_information, cls=JSONEncoder)
if hide_service_name_tags is not None:
parameters["hide_service_name_tags"] = json.dumps(hide_service_name_tags, cls=JSONEncoder)
if include_notes is not None:
parameters["include_notes"] = json.dumps(include_notes, cls=JSONEncoder)
response = self._api_request("GET", self._GET_FILE_METADATA_PATH, params=parameters)
return response.json()["metadata"]
def get_file(self, hash_ = None, file_id = None):
if (hash_ is None and file_id is None) or (hash_ is not None and file_id is not None):
raise ValueError("Exactly one of hash_, file_id is required")
parameters: dict[str, T.Union[str, int]] = {}
if hash_ is not None:
parameters["hash"] = hash_
if file_id is not None:
parameters["file_id"] = file_id
return self._api_request("GET", self._GET_FILE_PATH, params=parameters, stream=True)
def get_thumbnail(self, hash_ = None, file_id = None):
if (hash_ is None and file_id is None) or (hash_ is not None and file_id is not None):
raise ValueError("Exactly one of hash_, file_id is required")
parameters: dict[str, T.Union[str, int]] = {}
if hash_ is not None:
parameters["hash"] = hash_
if file_id is not None:
parameters["file_id"] = file_id
return self._api_request("GET", self._GET_THUMBNAIL_PATH, params=parameters, stream=True)
def lock_database(self):
self._api_request("POST", self._LOCK_DATABASE_PATH)
def unlock_database(self):
self._api_request("POST", self._UNLOCK_DATABASE_PATH)
def get_mr_bones(self):
return self._api_request("GET", self._MR_BONES_PATH).json()["boned_stats"]
def add_and_tag_files(
self,
paths_or_files,
tags ,
service_names = None,
service_keys = None,
):
"""Convenience method to add and tag multiple files at the same time.
If service_names and service_keys aren't specified, the default service name "my tags" will be used. If a file
already exists in Hydrus, it will also be tagged.
Returns:
list[dict[str, T.Any]]: Returns results of all `Client.add_file()` calls, matching the order of the
paths_or_files iterable
"""
if service_names is None and service_keys is None:
service_names = ("my tags",)
results = []
hashes = set()
for path_or_file in paths_or_files:
result = self.add_file(path_or_file)
results.append(result)
if result["status"] != ImportStatus.FAILED:
hashes.add(result["hash"])
service_names_to_tags = {name: tags for name in service_names} if service_names is not None else None
service_keys_to_tags = {key: tags for key in service_keys} if service_keys is not None else None
# Ignore type, we know that hashes only contains strings
self.add_tags(hashes, service_names_to_tags=service_names_to_tags, service_keys_to_tags=service_keys_to_tags) # type: ignore
return results
def get_page_list(self):
"""Convenience method that returns a flattened version of the page tree from `Client.get_pages()`.
Returns:
list[dict[str, T.Any]]: A list of every "pages" value in the page tree in pre-order (NLR)
"""
tree = self.get_pages()
pages = []
def walk_tree(page: dict[str, T.Any]):
pages.append(page)
# Ignore type, we know that pages is always a list
for sub_page in page.get("pages", ()): # type: ignore
# Ignore type, we know that sub_page is always a dict
walk_tree(sub_page) # type: ignore
walk_tree(tree)
return pages
__all__ = [
"__version__",
"DEFAULT_API_URL",
"HYDRUS_METADATA_ENCODING",
"HydrusAPIException",
"ConnectionError",
"APIError",
"MissingParameter",
"InsufficientAccess",
"DatabaseLocked",
"ServerError",
"Permission",
"URLType",
"ImportStatus",
"TagAction",
"TagStatus",
"PageType",
"FileSortType",
"Client",
]