mirror of
https://github.com/xtekky/gpt4free.git
synced 2024-12-25 04:01:52 +03:00
130 lines
5.2 KiB
Python
130 lines
5.2 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Union
|
|
|
|
from .. import debug, version
|
|
from ..errors import ProviderNotFoundError, ModelNotFoundError, ProviderNotWorkingError, StreamNotSupportedError
|
|
from ..models import Model, ModelUtils, default
|
|
from ..Provider import ProviderUtils
|
|
from ..providers.types import BaseRetryProvider, ProviderType
|
|
from ..providers.retry_provider import IterListProvider
|
|
|
|
def convert_to_provider(provider: str) -> ProviderType:
|
|
if " " in provider:
|
|
provider_list = [ProviderUtils.convert[p] for p in provider.split() if p in ProviderUtils.convert]
|
|
if not provider_list:
|
|
raise ProviderNotFoundError(f'Providers not found: {provider}')
|
|
provider = IterListProvider(provider_list, False)
|
|
elif provider in ProviderUtils.convert:
|
|
provider = ProviderUtils.convert[provider]
|
|
elif provider:
|
|
raise ProviderNotFoundError(f'Provider not found: {provider}')
|
|
return provider
|
|
|
|
def get_model_and_provider(model : Union[Model, str],
|
|
provider : Union[ProviderType, str, None],
|
|
stream : bool,
|
|
ignored : list[str] = None,
|
|
ignore_working: bool = False,
|
|
ignore_stream: bool = False,
|
|
logging: bool = True) -> tuple[str, ProviderType]:
|
|
"""
|
|
Retrieves the model and provider based on input parameters.
|
|
|
|
Args:
|
|
model (Union[Model, str]): The model to use, either as an object or a string identifier.
|
|
provider (Union[ProviderType, str, None]): The provider to use, either as an object, a string identifier, or None.
|
|
stream (bool): Indicates if the operation should be performed as a stream.
|
|
ignored (list[str], optional): List of provider names to be ignored.
|
|
ignore_working (bool, optional): If True, ignores the working status of the provider.
|
|
ignore_stream (bool, optional): If True, ignores the streaming capability of the provider.
|
|
|
|
Returns:
|
|
tuple[str, ProviderType]: A tuple containing the model name and the provider type.
|
|
|
|
Raises:
|
|
ProviderNotFoundError: If the provider is not found.
|
|
ModelNotFoundError: If the model is not found.
|
|
ProviderNotWorkingError: If the provider is not working.
|
|
StreamNotSupportedError: If streaming is not supported by the provider.
|
|
"""
|
|
if debug.version_check:
|
|
debug.version_check = False
|
|
version.utils.check_version()
|
|
|
|
if isinstance(provider, str):
|
|
provider = convert_to_provider(provider)
|
|
|
|
if isinstance(model, str):
|
|
if model in ModelUtils.convert:
|
|
model = ModelUtils.convert[model]
|
|
|
|
if not provider:
|
|
if not model:
|
|
model = default
|
|
provider = model.best_provider
|
|
elif isinstance(model, str):
|
|
if model in ProviderUtils.convert:
|
|
provider = ProviderUtils.convert[model]
|
|
model = provider.default_model if hasattr(provider, "default_model") else ""
|
|
else:
|
|
raise ModelNotFoundError(f'Model not found: {model}')
|
|
elif isinstance(model, Model):
|
|
provider = model.best_provider
|
|
else:
|
|
raise ValueError(f"Unexpected type: {type(model)}")
|
|
if not provider:
|
|
raise ProviderNotFoundError(f'No provider found for model: {model}')
|
|
|
|
provider_name = provider.__name__ if hasattr(provider, "__name__") else type(provider).__name__
|
|
|
|
if isinstance(model, Model):
|
|
model = model.name
|
|
|
|
if not ignore_working and not provider.working:
|
|
raise ProviderNotWorkingError(f"{provider_name} is not working")
|
|
|
|
if isinstance(provider, BaseRetryProvider):
|
|
if not ignore_working:
|
|
provider.providers = [p for p in provider.providers if p.working]
|
|
if ignored:
|
|
provider.providers = [p for p in provider.providers if p.__name__ not in ignored]
|
|
|
|
if not ignore_stream and not provider.supports_stream and stream:
|
|
raise StreamNotSupportedError(f'{provider_name} does not support "stream" argument')
|
|
|
|
if logging:
|
|
if model:
|
|
debug.log(f'Using {provider_name} provider and {model} model')
|
|
else:
|
|
debug.log(f'Using {provider_name} provider')
|
|
|
|
debug.last_provider = provider
|
|
debug.last_model = model
|
|
|
|
return model, provider
|
|
|
|
def get_last_provider(as_dict: bool = False) -> Union[ProviderType, dict[str, str], None]:
|
|
"""
|
|
Retrieves the last used provider.
|
|
|
|
Args:
|
|
as_dict (bool, optional): If True, returns the provider information as a dictionary.
|
|
|
|
Returns:
|
|
Union[ProviderType, dict[str, str]]: The last used provider, either as an object or a dictionary.
|
|
"""
|
|
last = debug.last_provider
|
|
if isinstance(last, BaseRetryProvider):
|
|
last = last.last_provider
|
|
if as_dict:
|
|
if last:
|
|
return {
|
|
"name": last.__name__ if hasattr(last, "__name__") else type(last).__name__,
|
|
"url": last.url,
|
|
"model": debug.last_model,
|
|
"label": getattr(last, "label", None) if hasattr(last, "label") else None
|
|
}
|
|
else:
|
|
return {}
|
|
return last |