mirror of
https://github.com/xtekky/gpt4free.git
synced 2024-12-26 12:41:56 +03:00
101 lines
3.7 KiB
Python
101 lines
3.7 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import logging
|
|
|
|
from . import debug, version
|
|
from .models import Model
|
|
from .client import Client, AsyncClient
|
|
from .typing import Messages, CreateResult, AsyncResult, Union
|
|
from .errors import StreamNotSupportedError, ModelNotAllowedError
|
|
from .cookies import get_cookies, set_cookies
|
|
from .providers.types import ProviderType
|
|
from .providers.base_provider import AsyncGeneratorProvider
|
|
from .client.service import get_model_and_provider, get_last_provider
|
|
|
|
#Configure "g4f" logger
|
|
logger = logging.getLogger(__name__)
|
|
log_handler = logging.StreamHandler()
|
|
log_handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
|
|
logger.addHandler(log_handler)
|
|
|
|
logger.setLevel(logging.ERROR)
|
|
|
|
class ChatCompletion:
|
|
@staticmethod
|
|
def create(model : Union[Model, str],
|
|
messages : Messages,
|
|
provider : Union[ProviderType, str, None] = None,
|
|
stream : bool = False,
|
|
auth : Union[str, None] = None,
|
|
ignored : list[str] = None,
|
|
ignore_working: bool = False,
|
|
ignore_stream: bool = False,
|
|
patch_provider: callable = None,
|
|
**kwargs) -> Union[CreateResult, str]:
|
|
model, provider = get_model_and_provider(
|
|
model, provider, stream,
|
|
ignored, ignore_working,
|
|
ignore_stream or kwargs.get("ignore_stream_and_auth")
|
|
)
|
|
|
|
if auth is not None:
|
|
kwargs['auth'] = auth
|
|
|
|
if "proxy" not in kwargs:
|
|
proxy = os.environ.get("G4F_PROXY")
|
|
if proxy:
|
|
kwargs['proxy'] = proxy
|
|
|
|
if patch_provider:
|
|
provider = patch_provider(provider)
|
|
|
|
result = provider.create_completion(model, messages, stream=stream, **kwargs)
|
|
|
|
return result if stream else ''.join([str(chunk) for chunk in result])
|
|
|
|
@staticmethod
|
|
def create_async(model : Union[Model, str],
|
|
messages : Messages,
|
|
provider : Union[ProviderType, str, None] = None,
|
|
stream : bool = False,
|
|
ignored : list[str] = None,
|
|
ignore_working: bool = False,
|
|
patch_provider: callable = None,
|
|
**kwargs) -> Union[AsyncResult, str]:
|
|
model, provider = get_model_and_provider(model, provider, False, ignored, ignore_working)
|
|
|
|
if stream:
|
|
if isinstance(provider, type) and issubclass(provider, AsyncGeneratorProvider):
|
|
return provider.create_async_generator(model, messages, **kwargs)
|
|
raise StreamNotSupportedError(f'{provider.__name__} does not support "stream" argument in "create_async"')
|
|
|
|
if patch_provider:
|
|
provider = patch_provider(provider)
|
|
|
|
return provider.create_async(model, messages, **kwargs)
|
|
|
|
class Completion:
|
|
@staticmethod
|
|
def create(model : Union[Model, str],
|
|
prompt : str,
|
|
provider : Union[ProviderType, None] = None,
|
|
stream : bool = False,
|
|
ignored : list[str] = None, **kwargs) -> Union[CreateResult, str]:
|
|
allowed_models = [
|
|
'code-davinci-002',
|
|
'text-ada-001',
|
|
'text-babbage-001',
|
|
'text-curie-001',
|
|
'text-davinci-002',
|
|
'text-davinci-003'
|
|
]
|
|
if model not in allowed_models:
|
|
raise ModelNotAllowedError(f'Can\'t use {model} with Completion.create()')
|
|
|
|
model, provider = get_model_and_provider(model, provider, stream, ignored)
|
|
|
|
result = provider.create_completion(model, [{"role": "user", "content": prompt}], stream=stream, **kwargs)
|
|
|
|
return result if stream else ''.join(result)
|