mirror of
https://github.com/sd-webui/stable-diffusion-webui.git
synced 2024-12-15 07:12:58 +03:00
Added hydrus_api library which will be used to query hydrus for tags.
This commit is contained in:
parent
cc825e55ea
commit
88a0deccca
766
scripts/hydrus_api/__init__.py
Normal file
766
scripts/hydrus_api/__init__.py
Normal file
@ -0,0 +1,766 @@
|
|||||||
|
# Copyright (C) 2021 cryzed
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as
|
||||||
|
# published by the Free Software Foundation, either version 3 of the
|
||||||
|
# License, or (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import enum
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import typing as T
|
||||||
|
from collections import abc
|
||||||
|
|
||||||
|
import requests
|
||||||
|
|
||||||
|
__version__ = "4.0.0"
|
||||||
|
|
||||||
|
DEFAULT_API_URL = "http://127.0.0.1:45869/"
|
||||||
|
HYDRUS_METADATA_ENCODING = "utf-8"
|
||||||
|
AUTHENTICATION_TIMEOUT_CODE = 419
|
||||||
|
|
||||||
|
|
||||||
|
class HydrusAPIException(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ConnectionError(HydrusAPIException, requests.ConnectTimeout):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class APIError(HydrusAPIException):
|
||||||
|
def __init__(self, response: requests.Response):
|
||||||
|
super().__init__(response.text)
|
||||||
|
self.response = response
|
||||||
|
|
||||||
|
|
||||||
|
class MissingParameter(APIError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class InsufficientAccess(APIError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class DatabaseLocked(APIError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ServerError(APIError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# Customize IntEnum, so we can just do str(Enum.member) to get the string representation of its value unmodified,
|
||||||
|
# without users having to access .value explicitly
|
||||||
|
class StringableIntEnum(enum.IntEnum):
|
||||||
|
def __str__(self):
|
||||||
|
return str(self.value)
|
||||||
|
|
||||||
|
|
||||||
|
@enum.unique
|
||||||
|
class Permission(StringableIntEnum):
|
||||||
|
IMPORT_URLS = 0
|
||||||
|
IMPORT_FILES = 1
|
||||||
|
ADD_TAGS = 2
|
||||||
|
SEARCH_FILES = 3
|
||||||
|
MANAGE_PAGES = 4
|
||||||
|
MANAGE_COOKIES = 5
|
||||||
|
MANAGE_DATABASE = 6
|
||||||
|
ADD_NOTES = 7
|
||||||
|
|
||||||
|
|
||||||
|
@enum.unique
|
||||||
|
class URLType(StringableIntEnum):
|
||||||
|
POST_URL = 0
|
||||||
|
FILE_URL = 2
|
||||||
|
GALLERY_URL = 3
|
||||||
|
WATCHABLE_URL = 4
|
||||||
|
UNKNOWN_URL = 5
|
||||||
|
|
||||||
|
|
||||||
|
@enum.unique
|
||||||
|
class ImportStatus(StringableIntEnum):
|
||||||
|
IMPORTABLE = 0
|
||||||
|
SUCCESS = 1
|
||||||
|
EXISTS = 2
|
||||||
|
PREVIOUSLY_DELETED = 3
|
||||||
|
FAILED = 4
|
||||||
|
VETOED = 7
|
||||||
|
|
||||||
|
|
||||||
|
@enum.unique
|
||||||
|
class TagAction(StringableIntEnum):
|
||||||
|
ADD = 0
|
||||||
|
DELETE = 1
|
||||||
|
PEND = 2
|
||||||
|
RESCIND_PENDING = 3
|
||||||
|
PETITION = 4
|
||||||
|
RESCIND_PETITION = 5
|
||||||
|
|
||||||
|
|
||||||
|
@enum.unique
|
||||||
|
class TagStatus(StringableIntEnum):
|
||||||
|
CURRENT = 0
|
||||||
|
PENDING = 1
|
||||||
|
DELETED = 2
|
||||||
|
PETITIONED = 3
|
||||||
|
|
||||||
|
|
||||||
|
@enum.unique
|
||||||
|
class PageType(StringableIntEnum):
|
||||||
|
GALLERY_DOWNLOADER = 1
|
||||||
|
SIMPLE_DOWNLOADER = 2
|
||||||
|
HARD_DRIVE_IMPORT = 3
|
||||||
|
PETITIONS = 5
|
||||||
|
FILE_SEARCH = 6
|
||||||
|
URL_DOWNLOADER = 7
|
||||||
|
DUPLICATES = 8
|
||||||
|
THREAD_WATCHER = 9
|
||||||
|
PAGE_OF_PAGES = 10
|
||||||
|
|
||||||
|
|
||||||
|
@enum.unique
|
||||||
|
class FileSortType(StringableIntEnum):
|
||||||
|
FILE_SIZE = 0
|
||||||
|
DURATION = 1
|
||||||
|
IMPORT_TIME = 2
|
||||||
|
FILE_TYPE = 3
|
||||||
|
RANDOM = 4
|
||||||
|
WIDTH = 5
|
||||||
|
HEIGHT = 6
|
||||||
|
RATIO = 7
|
||||||
|
NUMBER_OF_PIXELS = 8
|
||||||
|
NUMBER_OF_TAGS = 9
|
||||||
|
NUMBER_OF_MEDIA_VIEWS = 10
|
||||||
|
TOTAL_MEDIA_VIEWTIME = 11
|
||||||
|
APPROXIMATE_BITRATE = 12
|
||||||
|
HAS_AUDIO = 13
|
||||||
|
MODIFIED_TIME = 14
|
||||||
|
FRAMERATE = 15
|
||||||
|
NUMBER_OF_FRAMES = 16
|
||||||
|
|
||||||
|
|
||||||
|
class BinaryFileLike(T.Protocol):
|
||||||
|
def read(self):
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
# The client should accept all objects that either support the iterable or mapping protocol. We must ensure that objects
|
||||||
|
# are either lists or dicts, so Python's json module can handle them
|
||||||
|
class JSONEncoder(json.JSONEncoder):
|
||||||
|
def default(self, object_: T.Any):
|
||||||
|
if isinstance(object_, abc.Mapping):
|
||||||
|
return dict(object_)
|
||||||
|
if isinstance(object_, abc.Iterable):
|
||||||
|
return list(object_)
|
||||||
|
return super().default(object_)
|
||||||
|
|
||||||
|
|
||||||
|
class Client:
|
||||||
|
VERSION = 31
|
||||||
|
|
||||||
|
# Access Management
|
||||||
|
_GET_API_VERSION_PATH = "/api_version"
|
||||||
|
_REQUEST_NEW_PERMISSIONS_PATH = "/request_new_permissions"
|
||||||
|
_GET_SESSION_KEY_PATH = "/session_key"
|
||||||
|
_VERIFY_ACCESS_KEY_PATH = "/verify_access_key"
|
||||||
|
_GET_SERVICES_PATH = "/get_services"
|
||||||
|
|
||||||
|
# Adding Files
|
||||||
|
_ADD_FILE_PATH = "/add_files/add_file"
|
||||||
|
_DELETE_FILES_PATH = "/add_files/delete_files"
|
||||||
|
_UNDELETE_FILES_PATH = "/add_files/undelete_files"
|
||||||
|
_ARCHIVE_FILES_PATH = "/add_files/archive_files"
|
||||||
|
_UNARCHIVE_FILES_PATH = "/add_files/unarchive_files"
|
||||||
|
|
||||||
|
# Adding Tags
|
||||||
|
_CLEAN_TAGS_PATH = "/add_tags/clean_tags"
|
||||||
|
_SEARCH_TAGS_PATH = "/add_tags/search_tags"
|
||||||
|
_ADD_TAGS_PATH = "/add_tags/add_tags"
|
||||||
|
|
||||||
|
# Adding URLs
|
||||||
|
_GET_URL_FILES_PATH = "/add_urls/get_url_files"
|
||||||
|
_GET_URL_INFO_PATH = "/add_urls/get_url_info"
|
||||||
|
_ADD_URL_PATH = "/add_urls/add_url"
|
||||||
|
_ASSOCIATE_URL_PATH = "/add_urls/associate_url"
|
||||||
|
|
||||||
|
# Adding Notes
|
||||||
|
_SET_NOTES_PATH = "/add_notes/set_notes"
|
||||||
|
_DELETE_NOTES_PATH = "/add_notes/delete_notes"
|
||||||
|
|
||||||
|
# Managing Cookies and HTTP Headers
|
||||||
|
_GET_COOKIES_PATH = "/manage_cookies/get_cookies"
|
||||||
|
_SET_COOKIES_PATH = "/manage_cookies/set_cookies"
|
||||||
|
_SET_USER_AGENT_PATH = "/manage_headers/set_user_agent"
|
||||||
|
|
||||||
|
# Managing Pages
|
||||||
|
_GET_PAGES_PATH = "/manage_pages/get_pages"
|
||||||
|
_GET_PAGE_INFO_PATH = "/manage_pages/get_page_info"
|
||||||
|
_ADD_FILES_TO_PAGE_PATH = "/manage_pages/add_files"
|
||||||
|
_FOCUS_PAGE_PATH = "/manage_pages/focus_page"
|
||||||
|
|
||||||
|
# Searching and Fetching Files
|
||||||
|
_SEARCH_FILES_PATH = "/get_files/search_files"
|
||||||
|
_GET_FILE_METADATA_PATH = "/get_files/file_metadata"
|
||||||
|
_GET_FILE_PATH = "/get_files/file"
|
||||||
|
_GET_THUMBNAIL_PATH = "/get_files/thumbnail"
|
||||||
|
|
||||||
|
# Managing the Database
|
||||||
|
_LOCK_DATABASE_PATH = "/manage_database/lock_on"
|
||||||
|
_UNLOCK_DATABASE_PATH = "/manage_database/lock_off"
|
||||||
|
_MR_BONES_PATH = "/manage_database/mr_bones"
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
access_key = None,
|
||||||
|
api_url: str = DEFAULT_API_URL,
|
||||||
|
session = None,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
See https://hydrusnetwork.github.io/hydrus/help/client_api.html for documentation.
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.access_key = access_key
|
||||||
|
self.api_url = api_url.rstrip("/")
|
||||||
|
self.session = session or requests.Session()
|
||||||
|
|
||||||
|
def _api_request(self, method: str, path: str, **kwargs: T.Any):
|
||||||
|
if self.access_key is not None:
|
||||||
|
kwargs.setdefault("headers", {}).update({"Hydrus-Client-API-Access-Key": self.access_key})
|
||||||
|
|
||||||
|
# Make sure we use our custom JSONEncoder that can serialize all objects that implement the iterable or mapping
|
||||||
|
# protocol
|
||||||
|
json_data = kwargs.pop("json", None)
|
||||||
|
if json_data is not None:
|
||||||
|
kwargs["data"] = json.dumps(json_data, cls=JSONEncoder)
|
||||||
|
# Since we aren't using the json keyword-argument, we have to set the Content-Type manually
|
||||||
|
kwargs["headers"]["Content-Type"] = "application/json"
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = self.session.request(method, self.api_url + path, **kwargs)
|
||||||
|
except requests.RequestException as error:
|
||||||
|
# Re-raise connection and timeout errors as hydrus.ConnectionErrors so these are more easy to handle for
|
||||||
|
# client applications
|
||||||
|
raise ConnectionError(*error.args)
|
||||||
|
|
||||||
|
try:
|
||||||
|
response.raise_for_status()
|
||||||
|
except requests.HTTPError:
|
||||||
|
if response.status_code == requests.codes.bad_request:
|
||||||
|
raise MissingParameter(response)
|
||||||
|
elif response.status_code in {
|
||||||
|
requests.codes.unauthorized,
|
||||||
|
requests.codes.forbidden,
|
||||||
|
AUTHENTICATION_TIMEOUT_CODE,
|
||||||
|
}:
|
||||||
|
raise InsufficientAccess(response)
|
||||||
|
elif response.status_code == requests.codes.service_unavailable:
|
||||||
|
raise DatabaseLocked(response)
|
||||||
|
elif response.status_code == requests.codes.server_error:
|
||||||
|
raise ServerError(response)
|
||||||
|
raise APIError(response)
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
def get_api_version(self):
|
||||||
|
response = self._api_request("GET", self._GET_API_VERSION_PATH)
|
||||||
|
return response.json()
|
||||||
|
|
||||||
|
def request_new_permissions(self, name, permissions):
|
||||||
|
response = self._api_request(
|
||||||
|
"GET",
|
||||||
|
self._REQUEST_NEW_PERMISSIONS_PATH,
|
||||||
|
params={"name": name, "basic_permissions": json.dumps(permissions, cls=JSONEncoder)},
|
||||||
|
)
|
||||||
|
return response.json()["access_key"]
|
||||||
|
|
||||||
|
def get_session_key(self):
|
||||||
|
response = self._api_request("GET", self._GET_SESSION_KEY_PATH)
|
||||||
|
return response.json()["session_key"]
|
||||||
|
|
||||||
|
def verify_access_key(self):
|
||||||
|
response = self._api_request("GET", self._VERIFY_ACCESS_KEY_PATH)
|
||||||
|
return response.json()
|
||||||
|
|
||||||
|
def get_services(self):
|
||||||
|
response = self._api_request("GET", self._GET_SERVICES_PATH)
|
||||||
|
return response.json()
|
||||||
|
|
||||||
|
def add_file(self, path_or_file: T.Union[str, os.PathLike, BinaryFileLike]):
|
||||||
|
if isinstance(path_or_file, (str, os.PathLike)):
|
||||||
|
response = self._api_request("POST", self._ADD_FILE_PATH, json={"path": os.fspath(path_or_file)})
|
||||||
|
else:
|
||||||
|
response = self._api_request(
|
||||||
|
"POST",
|
||||||
|
self._ADD_FILE_PATH,
|
||||||
|
data=path_or_file.read(),
|
||||||
|
headers={"Content-Type": "application/octet-stream"},
|
||||||
|
)
|
||||||
|
|
||||||
|
return response.json()
|
||||||
|
|
||||||
|
def delete_files(
|
||||||
|
self,
|
||||||
|
hashes = None,
|
||||||
|
file_ids = None,
|
||||||
|
file_service_name = None,
|
||||||
|
file_service_key = None,
|
||||||
|
reason = None
|
||||||
|
):
|
||||||
|
if hashes is None and file_ids is None:
|
||||||
|
raise ValueError("At least one of hashes, file_ids is required")
|
||||||
|
if file_service_name is not None and file_service_key is not None:
|
||||||
|
raise ValueError("Exactly one of file_service_name, file_service_key is required")
|
||||||
|
|
||||||
|
payload: dict[str, T.Any] = {}
|
||||||
|
if hashes is not None:
|
||||||
|
payload["hashes"] = hashes
|
||||||
|
if file_ids is not None:
|
||||||
|
payload["file_ids"] = file_ids
|
||||||
|
if file_service_name is not None:
|
||||||
|
payload["file_service_name"] = file_service_name
|
||||||
|
if file_service_key is not None:
|
||||||
|
payload["file_service_key"] = file_service_key
|
||||||
|
if reason is not None:
|
||||||
|
payload["reason"] = reason
|
||||||
|
|
||||||
|
self._api_request("POST", self._DELETE_FILES_PATH, json=payload)
|
||||||
|
|
||||||
|
def undelete_files(
|
||||||
|
self,
|
||||||
|
hashes = None,
|
||||||
|
file_ids = None,
|
||||||
|
file_service_name = None,
|
||||||
|
file_service_key = None,
|
||||||
|
):
|
||||||
|
if hashes is None and file_ids is None:
|
||||||
|
raise ValueError("At least one of hashes, file_ids is required")
|
||||||
|
if file_service_name is not None and file_service_key is not None:
|
||||||
|
raise ValueError("Exactly one of file_service_name, file_service_key is required")
|
||||||
|
|
||||||
|
payload: dict[str, T.Any] = {}
|
||||||
|
if hashes is not None:
|
||||||
|
payload["hashes"] = hashes
|
||||||
|
if file_ids is not None:
|
||||||
|
payload["file_ids"] = file_ids
|
||||||
|
if file_service_name is not None:
|
||||||
|
payload["file_service_name"] = file_service_name
|
||||||
|
if file_service_key is not None:
|
||||||
|
payload["file_service_key"] = file_service_key
|
||||||
|
|
||||||
|
self._api_request("POST", self._UNDELETE_FILES_PATH, json=payload)
|
||||||
|
|
||||||
|
def archive_files(
|
||||||
|
self,
|
||||||
|
hashes = None,
|
||||||
|
file_ids = None
|
||||||
|
):
|
||||||
|
if hashes is None and file_ids is None:
|
||||||
|
raise ValueError("At least one of hashes, file_ids is required")
|
||||||
|
|
||||||
|
payload: dict[str, T.Any] = {}
|
||||||
|
if hashes is not None:
|
||||||
|
payload["hashes"] = hashes
|
||||||
|
if file_ids is not None:
|
||||||
|
payload["file_ids"] = file_ids
|
||||||
|
|
||||||
|
self._api_request("POST", self._ARCHIVE_FILES_PATH, json=payload)
|
||||||
|
|
||||||
|
def unarchive_files(
|
||||||
|
self,
|
||||||
|
hashes = None,
|
||||||
|
file_ids = None
|
||||||
|
):
|
||||||
|
if hashes is None and file_ids is None:
|
||||||
|
raise ValueError("At least one of hashes, file_ids is required")
|
||||||
|
|
||||||
|
payload: dict[str, T.Any] = {}
|
||||||
|
if hashes is not None:
|
||||||
|
payload["hashes"] = hashes
|
||||||
|
if file_ids is not None:
|
||||||
|
payload["file_ids"] = file_ids
|
||||||
|
|
||||||
|
self._api_request("POST", self._UNARCHIVE_FILES_PATH, json=payload)
|
||||||
|
|
||||||
|
def clean_tags(self, tags ):
|
||||||
|
response = self._api_request("GET", self._CLEAN_TAGS_PATH, params={"tags": json.dumps(tags, cls=JSONEncoder)})
|
||||||
|
return response.json()["tags"]
|
||||||
|
|
||||||
|
def search_tags(
|
||||||
|
self,
|
||||||
|
search: str,
|
||||||
|
tag_service_key = None,
|
||||||
|
tag_service_name = None
|
||||||
|
):
|
||||||
|
if tag_service_name is not None and tag_service_key is not None:
|
||||||
|
raise ValueError("Exactly one of tag_service_name, tag_service_key is required")
|
||||||
|
|
||||||
|
payload: dict[str, T.Any] = {"search": search}
|
||||||
|
if tag_service_key is not None:
|
||||||
|
payload["tag_service_key"] = tag_service_key
|
||||||
|
if tag_service_name is not None:
|
||||||
|
payload["tag_service_name"] = tag_service_name
|
||||||
|
|
||||||
|
response = self._api_request("GET", self._SEARCH_TAGS_PATH, params=payload)
|
||||||
|
return response.json()["tags"]
|
||||||
|
|
||||||
|
def add_tags(
|
||||||
|
self,
|
||||||
|
hashes = None,
|
||||||
|
file_ids = None,
|
||||||
|
service_names_to_tags = None,
|
||||||
|
service_keys_to_tags = None,
|
||||||
|
service_names_to_actions_to_tags = None,
|
||||||
|
service_keys_to_actions_to_tags = None,
|
||||||
|
):
|
||||||
|
if hashes is None and file_ids is None:
|
||||||
|
raise ValueError("At least one of hashes, file_ids is required")
|
||||||
|
if (
|
||||||
|
service_names_to_tags is None
|
||||||
|
and service_keys_to_tags is None
|
||||||
|
and service_names_to_actions_to_tags is None
|
||||||
|
and service_keys_to_actions_to_tags is None
|
||||||
|
):
|
||||||
|
raise ValueError(
|
||||||
|
"At least one of service_names_to_tags, service_keys_to_tags, service_names_to_actions_to_tags or "
|
||||||
|
"service_keys_to_actions_to_tags is required"
|
||||||
|
)
|
||||||
|
|
||||||
|
payload: dict[str, T.Any] = {}
|
||||||
|
if hashes is not None:
|
||||||
|
payload["hashes"] = hashes
|
||||||
|
if file_ids is not None:
|
||||||
|
payload["file_ids"] = file_ids
|
||||||
|
if service_names_to_tags is not None:
|
||||||
|
payload["service_names_to_tags"] = service_names_to_tags
|
||||||
|
if service_keys_to_tags is not None:
|
||||||
|
payload["service_keys_to_tags"] = service_keys_to_tags
|
||||||
|
if service_names_to_actions_to_tags is not None:
|
||||||
|
payload["service_names_to_actions_to_tags"] = service_names_to_actions_to_tags
|
||||||
|
if service_keys_to_actions_to_tags is not None:
|
||||||
|
payload["service_keys_to_actions_to_tags"] = service_keys_to_actions_to_tags
|
||||||
|
|
||||||
|
self._api_request("POST", self._ADD_TAGS_PATH, json=payload)
|
||||||
|
|
||||||
|
def get_url_files(self, url: str):
|
||||||
|
response = self._api_request("GET", self._GET_URL_FILES_PATH, params={"url": url})
|
||||||
|
return response.json()
|
||||||
|
|
||||||
|
def get_url_info(self, url: str):
|
||||||
|
response = self._api_request("GET", self._GET_URL_INFO_PATH, params={"url": url})
|
||||||
|
return response.json()
|
||||||
|
|
||||||
|
def add_url(
|
||||||
|
self,
|
||||||
|
url: str,
|
||||||
|
destination_page_key = None,
|
||||||
|
destination_page_name = None,
|
||||||
|
show_destination_page = None,
|
||||||
|
service_names_to_additional_tags = None,
|
||||||
|
service_keys_to_additional_tags = None,
|
||||||
|
filterable_tags = None,
|
||||||
|
):
|
||||||
|
if destination_page_key is not None and destination_page_name is not None:
|
||||||
|
raise ValueError("Exactly one of destination_page_key, destination_page_name is required")
|
||||||
|
|
||||||
|
payload: dict[str, T.Any] = {"url": url}
|
||||||
|
if destination_page_key is not None:
|
||||||
|
payload["destination_page_key"] = destination_page_key
|
||||||
|
if destination_page_name is not None:
|
||||||
|
payload["destination_page_name"] = destination_page_name
|
||||||
|
if show_destination_page is not None:
|
||||||
|
payload["show_destination_page"] = show_destination_page
|
||||||
|
if service_names_to_additional_tags is not None:
|
||||||
|
payload["service_names_to_additional_tags"] = service_names_to_additional_tags
|
||||||
|
if service_keys_to_additional_tags is not None:
|
||||||
|
payload["service_keys_to_additional_tags"] = service_keys_to_additional_tags
|
||||||
|
if filterable_tags is not None:
|
||||||
|
payload["filterable_tags"] = filterable_tags
|
||||||
|
|
||||||
|
response = self._api_request("POST", self._ADD_URL_PATH, json=payload)
|
||||||
|
return response.json()
|
||||||
|
|
||||||
|
def associate_url(
|
||||||
|
self,
|
||||||
|
hashes = None,
|
||||||
|
file_ids = None,
|
||||||
|
urls_to_add = None,
|
||||||
|
urls_to_delete = None,
|
||||||
|
):
|
||||||
|
if hashes is None and file_ids is None:
|
||||||
|
raise ValueError("At least one of hashes, file_ids is required")
|
||||||
|
if urls_to_add is None and urls_to_delete is None:
|
||||||
|
raise ValueError("At least one of urls_to_add, urls_to_delete is required")
|
||||||
|
|
||||||
|
payload: dict[str, T.Any] = {}
|
||||||
|
if hashes is not None:
|
||||||
|
payload["hashes"] = hashes
|
||||||
|
if file_ids is not None:
|
||||||
|
payload["file_ids"] = file_ids
|
||||||
|
if urls_to_add is not None:
|
||||||
|
urls_to_add = urls_to_add
|
||||||
|
payload["urls_to_add"] = urls_to_add
|
||||||
|
if urls_to_delete is not None:
|
||||||
|
urls_to_delete = urls_to_delete
|
||||||
|
payload["urls_to_delete"] = urls_to_delete
|
||||||
|
|
||||||
|
self._api_request("POST", self._ASSOCIATE_URL_PATH, json=payload)
|
||||||
|
|
||||||
|
def set_notes(self, notes , hash_= None, file_id = None):
|
||||||
|
if (hash_ is None and file_id is None) or (hash_ is not None and file_id is not None):
|
||||||
|
raise ValueError("Exactly one of hash_, file_id is required")
|
||||||
|
|
||||||
|
payload: dict[str, T.Any] = {"notes": notes}
|
||||||
|
if hash_ is not None:
|
||||||
|
payload["hash"] = hash_
|
||||||
|
if file_id is not None:
|
||||||
|
payload["file_id"] = file_id
|
||||||
|
|
||||||
|
self._api_request("POST", self._SET_NOTES_PATH, json=payload)
|
||||||
|
|
||||||
|
def delete_notes(
|
||||||
|
self,
|
||||||
|
note_names ,
|
||||||
|
hash_ = None,
|
||||||
|
file_id = None
|
||||||
|
):
|
||||||
|
if (hash_ is None and file_id is None) or (hash_ is not None and file_id is not None):
|
||||||
|
raise ValueError("Exactly one of hash_, file_id is required")
|
||||||
|
|
||||||
|
payload: dict[str, T.Any] = {"note_names": note_names}
|
||||||
|
if hash_ is not None:
|
||||||
|
payload["hash"] = hash_
|
||||||
|
if file_id is not None:
|
||||||
|
payload["file_id"] = file_id
|
||||||
|
|
||||||
|
self._api_request("POST", self._DELETE_NOTES_PATH, json=payload)
|
||||||
|
|
||||||
|
def get_cookies(self, domain: str):
|
||||||
|
response = self._api_request("GET", self._GET_COOKIES_PATH, params={"domain": domain})
|
||||||
|
return response.json()["cookies"]
|
||||||
|
|
||||||
|
def set_cookies(self, cookies ):
|
||||||
|
self._api_request("POST", self._SET_COOKIES_PATH, json={"cookies": cookies})
|
||||||
|
|
||||||
|
def set_user_agent(self, user_agent: str):
|
||||||
|
self._api_request("POST", self._SET_USER_AGENT_PATH, json={"user-agent": user_agent})
|
||||||
|
|
||||||
|
def get_pages(self):
|
||||||
|
response = self._api_request("GET", self._GET_PAGES_PATH)
|
||||||
|
return response.json()["pages"]
|
||||||
|
|
||||||
|
def get_page_info(self, page_key: str, simple = None):
|
||||||
|
parameters = {"page_key": page_key}
|
||||||
|
if simple is not None:
|
||||||
|
parameters["simple"] = json.dumps(simple, cls=JSONEncoder)
|
||||||
|
|
||||||
|
response = self._api_request("GET", self._GET_PAGE_INFO_PATH, params=parameters)
|
||||||
|
return response.json()["page_info"]
|
||||||
|
|
||||||
|
def add_files_to_page(
|
||||||
|
self,
|
||||||
|
page_key: str,
|
||||||
|
file_ids = None,
|
||||||
|
hashes = None
|
||||||
|
):
|
||||||
|
if file_ids is None and hashes is None:
|
||||||
|
raise ValueError("At least one of file_ids, hashes is required")
|
||||||
|
|
||||||
|
payload: dict[str, T.Any] = {"page_key": page_key}
|
||||||
|
if file_ids is not None:
|
||||||
|
payload["file_ids"] = file_ids
|
||||||
|
if hashes is not None:
|
||||||
|
payload["hashes"] = hashes
|
||||||
|
|
||||||
|
self._api_request("POST", self._ADD_FILES_TO_PAGE_PATH, json=payload)
|
||||||
|
|
||||||
|
def focus_page(self, page_key: str):
|
||||||
|
self._api_request("POST", self._FOCUS_PAGE_PATH, json={"page_key": page_key})
|
||||||
|
|
||||||
|
def search_files(
|
||||||
|
self,
|
||||||
|
tags,
|
||||||
|
file_service_name = None,
|
||||||
|
file_service_key = None,
|
||||||
|
tag_service_name = None,
|
||||||
|
tag_service_key = None,
|
||||||
|
file_sort_type = None,
|
||||||
|
file_sort_asc = None,
|
||||||
|
return_hashes = None,
|
||||||
|
):
|
||||||
|
if file_service_name is not None and file_service_key is not None:
|
||||||
|
raise ValueError("Exactly one of file_service_name, file_service_key is required")
|
||||||
|
if tag_service_name is not None and tag_service_key is not None:
|
||||||
|
raise ValueError("Exactly one of tag_service_name, tag_service_key is required")
|
||||||
|
|
||||||
|
parameters: dict[str, T.Union[str, int]] = {"tags": json.dumps(tags, cls=JSONEncoder)}
|
||||||
|
if file_service_name is not None:
|
||||||
|
parameters["file_service_name"] = file_service_name
|
||||||
|
if file_service_key is not None:
|
||||||
|
parameters["file_service_key"] = file_service_key
|
||||||
|
|
||||||
|
if tag_service_name is not None:
|
||||||
|
parameters["tag_service_name"] = tag_service_name
|
||||||
|
if tag_service_key is not None:
|
||||||
|
parameters["tag_service_key"] = tag_service_key
|
||||||
|
|
||||||
|
if file_sort_type is not None:
|
||||||
|
parameters["file_sort_type"] = file_sort_type
|
||||||
|
if file_sort_asc is not None:
|
||||||
|
parameters["file_sort_asc"] = json.dumps(file_sort_asc, cls=JSONEncoder)
|
||||||
|
if return_hashes is not None:
|
||||||
|
parameters["return_hashes"] = json.dumps(return_hashes, cls=JSONEncoder)
|
||||||
|
|
||||||
|
response = self._api_request("GET", self._SEARCH_FILES_PATH, params=parameters)
|
||||||
|
return response.json()["hashes" if return_hashes else "file_ids"]
|
||||||
|
|
||||||
|
def get_file_metadata(
|
||||||
|
self,
|
||||||
|
hashes = None,
|
||||||
|
file_ids = None,
|
||||||
|
create_new_file_ids = None,
|
||||||
|
only_return_identifiers = None,
|
||||||
|
only_return_basic_information = None,
|
||||||
|
detailed_url_information = None,
|
||||||
|
hide_service_name_tags = None,
|
||||||
|
include_notes = None
|
||||||
|
):
|
||||||
|
if hashes is None and file_ids is None:
|
||||||
|
raise ValueError("At least one of hashes, file_ids is required")
|
||||||
|
|
||||||
|
parameters = {}
|
||||||
|
if hashes is not None:
|
||||||
|
parameters["hashes"] = json.dumps(hashes, cls=JSONEncoder)
|
||||||
|
if file_ids is not None:
|
||||||
|
parameters["file_ids"] = json.dumps(file_ids, cls=JSONEncoder)
|
||||||
|
|
||||||
|
if create_new_file_ids is not None:
|
||||||
|
parameters["create_new_file_ids"] = json.dumps(create_new_file_ids, cls=JSONEncoder)
|
||||||
|
if only_return_identifiers is not None:
|
||||||
|
parameters["only_return_identifiers"] = json.dumps(only_return_identifiers, cls=JSONEncoder)
|
||||||
|
if only_return_basic_information is not None:
|
||||||
|
parameters["only_return_basic_information"] = json.dumps(only_return_basic_information, cls=JSONEncoder)
|
||||||
|
if detailed_url_information is not None:
|
||||||
|
parameters["detailed_url_information"] = json.dumps(detailed_url_information, cls=JSONEncoder)
|
||||||
|
if hide_service_name_tags is not None:
|
||||||
|
parameters["hide_service_name_tags"] = json.dumps(hide_service_name_tags, cls=JSONEncoder)
|
||||||
|
if include_notes is not None:
|
||||||
|
parameters["include_notes"] = json.dumps(include_notes, cls=JSONEncoder)
|
||||||
|
|
||||||
|
response = self._api_request("GET", self._GET_FILE_METADATA_PATH, params=parameters)
|
||||||
|
return response.json()["metadata"]
|
||||||
|
|
||||||
|
def get_file(self, hash_ = None, file_id = None):
|
||||||
|
if (hash_ is None and file_id is None) or (hash_ is not None and file_id is not None):
|
||||||
|
raise ValueError("Exactly one of hash_, file_id is required")
|
||||||
|
|
||||||
|
parameters: dict[str, T.Union[str, int]] = {}
|
||||||
|
if hash_ is not None:
|
||||||
|
parameters["hash"] = hash_
|
||||||
|
if file_id is not None:
|
||||||
|
parameters["file_id"] = file_id
|
||||||
|
|
||||||
|
return self._api_request("GET", self._GET_FILE_PATH, params=parameters, stream=True)
|
||||||
|
|
||||||
|
def get_thumbnail(self, hash_ = None, file_id = None):
|
||||||
|
if (hash_ is None and file_id is None) or (hash_ is not None and file_id is not None):
|
||||||
|
raise ValueError("Exactly one of hash_, file_id is required")
|
||||||
|
|
||||||
|
parameters: dict[str, T.Union[str, int]] = {}
|
||||||
|
if hash_ is not None:
|
||||||
|
parameters["hash"] = hash_
|
||||||
|
if file_id is not None:
|
||||||
|
parameters["file_id"] = file_id
|
||||||
|
|
||||||
|
return self._api_request("GET", self._GET_THUMBNAIL_PATH, params=parameters, stream=True)
|
||||||
|
|
||||||
|
def lock_database(self):
|
||||||
|
self._api_request("POST", self._LOCK_DATABASE_PATH)
|
||||||
|
|
||||||
|
def unlock_database(self):
|
||||||
|
self._api_request("POST", self._UNLOCK_DATABASE_PATH)
|
||||||
|
|
||||||
|
def get_mr_bones(self):
|
||||||
|
return self._api_request("GET", self._MR_BONES_PATH).json()["boned_stats"]
|
||||||
|
|
||||||
|
def add_and_tag_files(
|
||||||
|
self,
|
||||||
|
paths_or_files,
|
||||||
|
tags ,
|
||||||
|
service_names = None,
|
||||||
|
service_keys = None,
|
||||||
|
):
|
||||||
|
"""Convenience method to add and tag multiple files at the same time.
|
||||||
|
|
||||||
|
If service_names and service_keys aren't specified, the default service name "my tags" will be used. If a file
|
||||||
|
already exists in Hydrus, it will also be tagged.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list[dict[str, T.Any]]: Returns results of all `Client.add_file()` calls, matching the order of the
|
||||||
|
paths_or_files iterable
|
||||||
|
"""
|
||||||
|
if service_names is None and service_keys is None:
|
||||||
|
service_names = ("my tags",)
|
||||||
|
|
||||||
|
results = []
|
||||||
|
hashes = set()
|
||||||
|
for path_or_file in paths_or_files:
|
||||||
|
result = self.add_file(path_or_file)
|
||||||
|
results.append(result)
|
||||||
|
if result["status"] != ImportStatus.FAILED:
|
||||||
|
hashes.add(result["hash"])
|
||||||
|
|
||||||
|
service_names_to_tags = {name: tags for name in service_names} if service_names is not None else None
|
||||||
|
service_keys_to_tags = {key: tags for key in service_keys} if service_keys is not None else None
|
||||||
|
# Ignore type, we know that hashes only contains strings
|
||||||
|
self.add_tags(hashes, service_names_to_tags=service_names_to_tags, service_keys_to_tags=service_keys_to_tags) # type: ignore
|
||||||
|
return results
|
||||||
|
|
||||||
|
def get_page_list(self):
|
||||||
|
"""Convenience method that returns a flattened version of the page tree from `Client.get_pages()`.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list[dict[str, T.Any]]: A list of every "pages" value in the page tree in pre-order (NLR)
|
||||||
|
"""
|
||||||
|
tree = self.get_pages()
|
||||||
|
pages = []
|
||||||
|
|
||||||
|
def walk_tree(page: dict[str, T.Any]):
|
||||||
|
pages.append(page)
|
||||||
|
# Ignore type, we know that pages is always a list
|
||||||
|
for sub_page in page.get("pages", ()): # type: ignore
|
||||||
|
# Ignore type, we know that sub_page is always a dict
|
||||||
|
walk_tree(sub_page) # type: ignore
|
||||||
|
|
||||||
|
walk_tree(tree)
|
||||||
|
return pages
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"__version__",
|
||||||
|
"DEFAULT_API_URL",
|
||||||
|
"HYDRUS_METADATA_ENCODING",
|
||||||
|
"HydrusAPIException",
|
||||||
|
"ConnectionError",
|
||||||
|
"APIError",
|
||||||
|
"MissingParameter",
|
||||||
|
"InsufficientAccess",
|
||||||
|
"DatabaseLocked",
|
||||||
|
"ServerError",
|
||||||
|
"Permission",
|
||||||
|
"URLType",
|
||||||
|
"ImportStatus",
|
||||||
|
"TagAction",
|
||||||
|
"TagStatus",
|
||||||
|
"PageType",
|
||||||
|
"FileSortType",
|
||||||
|
"Client",
|
||||||
|
]
|
102
scripts/hydrus_api/utils.py
Normal file
102
scripts/hydrus_api/utils.py
Normal file
@ -0,0 +1,102 @@
|
|||||||
|
# Copyright (C) 2021 cryzed
|
||||||
|
#
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as
|
||||||
|
# published by the Free Software Foundation, either version 3 of the
|
||||||
|
# License, or (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import collections
|
||||||
|
import os
|
||||||
|
import typing as T
|
||||||
|
from collections import abc
|
||||||
|
|
||||||
|
from hydrus_api import DEFAULT_API_URL, HYDRUS_METADATA_ENCODING, Client, Permission
|
||||||
|
|
||||||
|
X = T.TypeVar("X")
|
||||||
|
|
||||||
|
|
||||||
|
class TextFileLike(T.Protocol):
|
||||||
|
def read(self) -> str:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def verify_permissions(
|
||||||
|
client: Client, permissions: abc.Iterable[T.Union[int, Permission]], exact: bool = False
|
||||||
|
) -> bool:
|
||||||
|
granted_permissions = set(client.verify_access_key()["basic_permissions"])
|
||||||
|
return granted_permissions == set(permissions) if exact else granted_permissions.issuperset(permissions)
|
||||||
|
|
||||||
|
|
||||||
|
def cli_request_api_key(
|
||||||
|
name: str,
|
||||||
|
permissions: abc.Iterable[T.Union[int, Permission]],
|
||||||
|
verify: bool = True,
|
||||||
|
exact: bool = False,
|
||||||
|
api_url: str = DEFAULT_API_URL,
|
||||||
|
) -> str:
|
||||||
|
while True:
|
||||||
|
input(
|
||||||
|
'Navigate to "services->review services->local->client api" in the Hydrus client and click "add->from api '
|
||||||
|
'request". Then press enter to continue...'
|
||||||
|
)
|
||||||
|
access_key = Client(api_url=api_url).request_new_permissions(name, permissions)
|
||||||
|
input("Press OK and then apply in the Hydrus client dialog. Then press enter to continue...")
|
||||||
|
|
||||||
|
client = Client(access_key, api_url)
|
||||||
|
if verify and not verify_permissions(client, permissions, exact):
|
||||||
|
granted = client.verify_access_key()["basic_permissions"]
|
||||||
|
print(
|
||||||
|
f"The granted permissions ({granted}) differ from the requested permissions ({permissions}), please "
|
||||||
|
"grant all requested permissions."
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
return access_key
|
||||||
|
|
||||||
|
|
||||||
|
def parse_hydrus_metadata(text: str) -> collections.defaultdict[T.Optional[str], set[str]]:
|
||||||
|
namespaces = collections.defaultdict(set)
|
||||||
|
for line in (line.strip() for line in text.splitlines()):
|
||||||
|
if not line:
|
||||||
|
continue
|
||||||
|
|
||||||
|
parts = line.split(":", 1)
|
||||||
|
namespace, tag = (None, line) if len(parts) == 1 else parts
|
||||||
|
namespaces[namespace].add(tag)
|
||||||
|
|
||||||
|
# Ignore type, mypy has trouble figuring out that tag isn't optional
|
||||||
|
return namespaces # type: ignore
|
||||||
|
|
||||||
|
|
||||||
|
def parse_hydrus_metadata_file(
|
||||||
|
path_or_file: T.Union[str, os.PathLike, TextFileLike]
|
||||||
|
) -> collections.defaultdict[T.Optional[str], set[str]]:
|
||||||
|
if isinstance(path_or_file, (str, os.PathLike)):
|
||||||
|
with open(path_or_file, encoding=HYDRUS_METADATA_ENCODING) as file:
|
||||||
|
return parse_hydrus_metadata(file.read())
|
||||||
|
|
||||||
|
return parse_hydrus_metadata(path_or_file.read())
|
||||||
|
|
||||||
|
|
||||||
|
# Useful for splitting up requests to get_file_metadata()
|
||||||
|
def yield_chunks(sequence: T.Sequence[X], chunk_size: int, offset: int = 0) -> T.Generator[T.Sequence[X], None, None]:
|
||||||
|
while offset < len(sequence):
|
||||||
|
yield sequence[offset : offset + chunk_size]
|
||||||
|
offset += chunk_size
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"verify_permissions",
|
||||||
|
"cli_request_api_key",
|
||||||
|
"parse_hydrus_metadata",
|
||||||
|
"parse_hydrus_metadata_file",
|
||||||
|
"yield_chunks",
|
||||||
|
]
|
Loading…
Reference in New Issue
Block a user