2024-03-19 20:48:32 +03:00
|
|
|
from __future__ import annotations
|
|
|
|
|
2024-03-16 12:48:37 +03:00
|
|
|
import logging
|
|
|
|
import json
|
2024-03-19 20:48:32 +03:00
|
|
|
import os.path
|
2024-03-16 12:48:37 +03:00
|
|
|
from typing import Iterator
|
2024-03-19 20:48:32 +03:00
|
|
|
from uuid import uuid4
|
|
|
|
from functools import partial
|
2024-03-16 12:48:37 +03:00
|
|
|
|
|
|
|
try:
|
|
|
|
import webview
|
2024-03-19 20:48:32 +03:00
|
|
|
import platformdirs
|
|
|
|
from plyer import camera
|
|
|
|
from plyer import filechooser
|
2024-03-19 22:28:13 +03:00
|
|
|
app_storage_path = platformdirs.user_pictures_dir
|
2024-03-19 20:48:32 +03:00
|
|
|
has_plyer = True
|
|
|
|
except ImportError:
|
|
|
|
has_plyer = False
|
|
|
|
try:
|
|
|
|
from android.runnable import run_on_ui_thread
|
|
|
|
from android.storage import app_storage_path
|
|
|
|
from android.permissions import request_permissions, Permission
|
|
|
|
from android.permissions import _RequestPermissionsManager
|
|
|
|
_RequestPermissionsManager.register_callback()
|
|
|
|
from .android_gallery import user_select_image
|
|
|
|
has_android = True
|
|
|
|
except ImportError:
|
|
|
|
run_on_ui_thread = lambda a : a
|
|
|
|
user_select_image = partial(
|
|
|
|
filechooser.open_file,
|
|
|
|
path=platformdirs.user_pictures_dir(),
|
|
|
|
filters=[["Image", "*.jpg", "*.jpeg", "*.png", "*.webp", "*.svg"]],
|
|
|
|
)
|
|
|
|
has_android = False
|
2024-03-16 12:48:37 +03:00
|
|
|
|
|
|
|
from g4f import version, models
|
|
|
|
from g4f import get_last_provider, ChatCompletion
|
|
|
|
from g4f.errors import VersionNotFoundError
|
|
|
|
from g4f.Provider import ProviderType, __providers__, __map__
|
|
|
|
from g4f.providers.base_provider import ProviderModelMixin
|
|
|
|
from g4f.Provider.bing.create_images import patch_provider
|
2024-03-16 20:07:53 +03:00
|
|
|
from g4f.Provider.Bing import Conversation
|
|
|
|
|
|
|
|
conversations: dict[str, Conversation] = {}
|
|
|
|
|
|
|
|
class Api():
|
2024-03-16 12:48:37 +03:00
|
|
|
|
|
|
|
def get_models(self) -> list[str]:
|
|
|
|
"""
|
|
|
|
Return a list of all models.
|
|
|
|
|
|
|
|
Fetches and returns a list of all available models in the system.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
List[str]: A list of model names.
|
|
|
|
"""
|
|
|
|
return models._all_models
|
|
|
|
|
|
|
|
def get_provider_models(self, provider: str) -> list[dict]:
|
|
|
|
if provider in __map__:
|
|
|
|
provider: ProviderType = __map__[provider]
|
|
|
|
if issubclass(provider, ProviderModelMixin):
|
|
|
|
return [{"model": model, "default": model == provider.default_model} for model in provider.get_models()]
|
|
|
|
elif provider.supports_gpt_35_turbo or provider.supports_gpt_4:
|
|
|
|
return [
|
|
|
|
*([{"model": "gpt-4", "default": not provider.supports_gpt_4}] if provider.supports_gpt_4 else []),
|
|
|
|
*([{"model": "gpt-3.5-turbo", "default": not provider.supports_gpt_4}] if provider.supports_gpt_35_turbo else [])
|
|
|
|
]
|
|
|
|
else:
|
|
|
|
return [];
|
|
|
|
|
|
|
|
def get_providers(self) -> list[str]:
|
|
|
|
"""
|
|
|
|
Return a list of all working providers.
|
|
|
|
"""
|
|
|
|
return [provider.__name__ for provider in __providers__ if provider.working]
|
|
|
|
|
|
|
|
def get_version(self):
|
|
|
|
"""
|
|
|
|
Returns the current and latest version of the application.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
dict: A dictionary containing the current and latest version.
|
|
|
|
"""
|
|
|
|
try:
|
|
|
|
current_version = version.utils.current_version
|
|
|
|
except VersionNotFoundError:
|
|
|
|
current_version = None
|
|
|
|
return {
|
|
|
|
"version": current_version,
|
|
|
|
"latest_version": version.utils.latest_version,
|
|
|
|
}
|
|
|
|
|
|
|
|
def generate_title(self):
|
|
|
|
"""
|
|
|
|
Generates and returns a title based on the request data.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
dict: A dictionary with the generated title.
|
|
|
|
"""
|
|
|
|
return {'title': ''}
|
|
|
|
|
|
|
|
def get_conversation(self, options: dict, **kwargs) -> Iterator:
|
2024-03-19 20:48:32 +03:00
|
|
|
window = webview.windows[0]
|
|
|
|
if hasattr(self, "image") and self.image is not None:
|
|
|
|
kwargs["image"] = open(self.image, "rb")
|
2024-03-16 12:48:37 +03:00
|
|
|
for message in self._create_response_stream(
|
2024-03-16 20:07:53 +03:00
|
|
|
self._prepare_conversation_kwargs(options, kwargs),
|
|
|
|
options.get("conversation_id")
|
2024-03-16 12:48:37 +03:00
|
|
|
):
|
2024-03-17 12:51:08 +03:00
|
|
|
if not window.evaluate_js(f"if (!this.abort) this.add_message_chunk({json.dumps(message)}); !this.abort && !this.error;"):
|
|
|
|
break
|
2024-03-19 20:48:32 +03:00
|
|
|
self.image = None
|
|
|
|
self.set_selected(None)
|
|
|
|
|
|
|
|
@run_on_ui_thread
|
|
|
|
def choose_file(self):
|
|
|
|
self.request_permissions()
|
|
|
|
filechooser.open_file(
|
|
|
|
path=platformdirs.user_pictures_dir(),
|
|
|
|
on_selection=print
|
|
|
|
)
|
|
|
|
|
|
|
|
@run_on_ui_thread
|
|
|
|
def choose_image(self):
|
|
|
|
self.request_permissions()
|
|
|
|
user_select_image(
|
|
|
|
on_selection=self.on_image_selection
|
|
|
|
)
|
|
|
|
|
|
|
|
@run_on_ui_thread
|
|
|
|
def take_picture(self):
|
|
|
|
self.request_permissions()
|
|
|
|
filename = os.path.join(app_storage_path(), f"chat-{uuid4()}.png")
|
|
|
|
camera.take_picture(filename=filename, on_complete=self.on_camera)
|
|
|
|
|
|
|
|
def on_image_selection(self, filename):
|
|
|
|
if filename is not None and os.path.exists(filename):
|
|
|
|
self.image = filename
|
|
|
|
else:
|
|
|
|
self.image = None
|
|
|
|
self.set_selected(None if self.image is None else "image")
|
|
|
|
|
|
|
|
def on_camera(self, filename):
|
|
|
|
if filename is not None and os.path.exists(filename):
|
|
|
|
self.image = filename
|
|
|
|
else:
|
|
|
|
self.image = None
|
|
|
|
self.set_selected(None if self.image is None else "camera")
|
|
|
|
|
|
|
|
def set_selected(self, input_id: str = None):
|
|
|
|
window = webview.windows[0]
|
|
|
|
if window is not None:
|
|
|
|
window.evaluate_js(
|
|
|
|
f"document.querySelector(`.file-label.selected`)?.classList.remove(`selected`);"
|
|
|
|
)
|
|
|
|
if input_id is not None and input_id in ("image", "camera"):
|
|
|
|
window.evaluate_js(
|
|
|
|
f'document.querySelector(`label[for="{input_id}"]`)?.classList.add(`selected`);'
|
|
|
|
)
|
|
|
|
|
|
|
|
def request_permissions(self):
|
|
|
|
if has_android:
|
|
|
|
request_permissions([
|
|
|
|
Permission.CAMERA,
|
|
|
|
Permission.READ_EXTERNAL_STORAGE,
|
|
|
|
Permission.WRITE_EXTERNAL_STORAGE
|
|
|
|
])
|
2024-03-16 12:48:37 +03:00
|
|
|
|
|
|
|
def _prepare_conversation_kwargs(self, json_data: dict, kwargs: dict):
|
|
|
|
"""
|
|
|
|
Prepares arguments for chat completion based on the request data.
|
|
|
|
|
|
|
|
Reads the request and prepares the necessary arguments for handling
|
|
|
|
a chat completion request.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
dict: Arguments prepared for chat completion.
|
|
|
|
"""
|
|
|
|
provider = json_data.get('provider', None)
|
|
|
|
if "image" in kwargs and provider is None:
|
|
|
|
provider = "Bing"
|
|
|
|
if provider == 'OpenaiChat':
|
|
|
|
kwargs['auto_continue'] = True
|
|
|
|
|
|
|
|
messages = json_data['messages']
|
|
|
|
if json_data.get('web_search'):
|
|
|
|
if provider == "Bing":
|
|
|
|
kwargs['web_search'] = True
|
|
|
|
else:
|
|
|
|
from .internet import get_search_message
|
|
|
|
messages[-1]["content"] = get_search_message(messages[-1]["content"])
|
|
|
|
|
2024-03-16 20:07:53 +03:00
|
|
|
conversation_id = json_data.get("conversation_id")
|
|
|
|
if conversation_id and conversation_id in conversations:
|
|
|
|
kwargs["conversation"] = conversations[conversation_id]
|
|
|
|
|
2024-03-16 12:48:37 +03:00
|
|
|
model = json_data.get('model')
|
|
|
|
model = model if model else models.default
|
|
|
|
patch = patch_provider if json_data.get('patch_provider') else None
|
|
|
|
|
|
|
|
return {
|
|
|
|
"model": model,
|
|
|
|
"provider": provider,
|
|
|
|
"messages": messages,
|
|
|
|
"stream": True,
|
|
|
|
"ignore_stream": True,
|
|
|
|
"patch_provider": patch,
|
2024-03-16 20:07:53 +03:00
|
|
|
"return_conversation": True,
|
2024-03-16 12:48:37 +03:00
|
|
|
**kwargs
|
|
|
|
}
|
|
|
|
|
2024-03-16 20:07:53 +03:00
|
|
|
def _create_response_stream(self, kwargs, conversation_id: str) -> Iterator:
|
2024-03-16 12:48:37 +03:00
|
|
|
"""
|
|
|
|
Creates and returns a streaming response for the conversation.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
kwargs (dict): Arguments for creating the chat completion.
|
|
|
|
|
|
|
|
Yields:
|
|
|
|
str: JSON formatted response chunks for the stream.
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
Exception: If an error occurs during the streaming process.
|
|
|
|
"""
|
|
|
|
try:
|
|
|
|
first = True
|
|
|
|
for chunk in ChatCompletion.create(**kwargs):
|
|
|
|
if first:
|
|
|
|
first = False
|
2024-03-16 20:07:53 +03:00
|
|
|
yield self._format_json("provider", get_last_provider(True))
|
|
|
|
if isinstance(chunk, Conversation):
|
|
|
|
conversations[conversation_id] = chunk
|
|
|
|
yield self._format_json("conversation", conversation_id)
|
|
|
|
elif isinstance(chunk, Exception):
|
2024-03-16 12:48:37 +03:00
|
|
|
logging.exception(chunk)
|
2024-03-16 20:07:53 +03:00
|
|
|
yield self._format_json("message", get_error_message(chunk))
|
2024-03-16 12:48:37 +03:00
|
|
|
else:
|
2024-03-16 20:07:53 +03:00
|
|
|
yield self._format_json("content", chunk)
|
2024-03-16 12:48:37 +03:00
|
|
|
except Exception as e:
|
|
|
|
logging.exception(e)
|
|
|
|
yield self._format_json('error', get_error_message(e))
|
|
|
|
|
|
|
|
def _format_json(self, response_type: str, content):
|
|
|
|
"""
|
|
|
|
Formats and returns a JSON response.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
response_type (str): The type of the response.
|
|
|
|
content: The content to be included in the response.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
str: A JSON formatted string.
|
|
|
|
"""
|
|
|
|
return {
|
|
|
|
'type': response_type,
|
|
|
|
response_type: content
|
|
|
|
}
|
|
|
|
|
|
|
|
def get_error_message(exception: Exception) -> str:
|
|
|
|
"""
|
|
|
|
Generates a formatted error message from an exception.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
exception (Exception): The exception to format.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
str: A formatted error message string.
|
|
|
|
"""
|
|
|
|
return f"{get_last_provider().__name__}: {type(exception).__name__}: {exception}"
|