mirror of
https://github.com/xtekky/gpt4free.git
synced 2024-11-27 13:42:19 +03:00
951a1332a7
Add PerplexityAi Provider
154 lines
3.9 KiB
Python
154 lines
3.9 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from asyncio import SelectorEventLoop
|
|
from abc import ABC, abstractmethod
|
|
|
|
import browser_cookie3
|
|
|
|
from ..typing import Any, AsyncGenerator, CreateResult
|
|
|
|
|
|
class BaseProvider(ABC):
|
|
url: str
|
|
working = False
|
|
needs_auth = False
|
|
supports_stream = False
|
|
supports_gpt_35_turbo = False
|
|
supports_gpt_4 = False
|
|
|
|
@staticmethod
|
|
@abstractmethod
|
|
def create_completion(
|
|
model: str,
|
|
messages: list[dict[str, str]],
|
|
stream: bool,
|
|
**kwargs
|
|
) -> CreateResult:
|
|
|
|
raise NotImplementedError()
|
|
|
|
|
|
@classmethod
|
|
@property
|
|
def params(cls):
|
|
params = [
|
|
("model", "str"),
|
|
("messages", "list[dict[str, str]]"),
|
|
("stream", "bool"),
|
|
]
|
|
param = ", ".join([": ".join(p) for p in params])
|
|
return f"g4f.provider.{cls.__name__} supports: ({param})"
|
|
|
|
|
|
class AsyncProvider(BaseProvider):
|
|
@classmethod
|
|
def create_completion(
|
|
cls,
|
|
model: str,
|
|
messages: list[dict[str, str]],
|
|
stream: bool = False,
|
|
**kwargs
|
|
) -> CreateResult:
|
|
loop = create_event_loop()
|
|
try:
|
|
yield loop.run_until_complete(cls.create_async(model, messages, **kwargs))
|
|
finally:
|
|
loop.close()
|
|
|
|
@staticmethod
|
|
@abstractmethod
|
|
async def create_async(
|
|
model: str,
|
|
messages: list[dict[str, str]],
|
|
**kwargs
|
|
) -> str:
|
|
raise NotImplementedError()
|
|
|
|
|
|
class AsyncGeneratorProvider(AsyncProvider):
|
|
supports_stream = True
|
|
|
|
@classmethod
|
|
def create_completion(
|
|
cls,
|
|
model: str,
|
|
messages: list[dict[str, str]],
|
|
stream: bool = True,
|
|
**kwargs
|
|
) -> CreateResult:
|
|
loop = create_event_loop()
|
|
try:
|
|
generator = cls.create_async_generator(
|
|
model,
|
|
messages,
|
|
stream=stream,
|
|
**kwargs
|
|
)
|
|
gen = generator.__aiter__()
|
|
while True:
|
|
try:
|
|
yield loop.run_until_complete(gen.__anext__())
|
|
except StopAsyncIteration:
|
|
break
|
|
finally:
|
|
loop.close()
|
|
|
|
@classmethod
|
|
async def create_async(
|
|
cls,
|
|
model: str,
|
|
messages: list[dict[str, str]],
|
|
**kwargs
|
|
) -> str:
|
|
return "".join([
|
|
chunk async for chunk in cls.create_async_generator(
|
|
model,
|
|
messages,
|
|
stream=False,
|
|
**kwargs
|
|
)
|
|
])
|
|
|
|
@staticmethod
|
|
@abstractmethod
|
|
def create_async_generator(
|
|
model: str,
|
|
messages: list[dict[str, str]],
|
|
**kwargs
|
|
) -> AsyncGenerator:
|
|
raise NotImplementedError()
|
|
|
|
|
|
# Don't create a new event loop in a running async loop.
|
|
# Force use selector event loop on windows and linux use it anyway.
|
|
def create_event_loop() -> SelectorEventLoop:
|
|
try:
|
|
asyncio.get_running_loop()
|
|
except RuntimeError:
|
|
return SelectorEventLoop()
|
|
raise RuntimeError(
|
|
'Use "create_async" instead of "create" function in a async loop.')
|
|
|
|
|
|
_cookies = {}
|
|
|
|
def get_cookies(cookie_domain: str) -> dict:
|
|
if cookie_domain not in _cookies:
|
|
_cookies[cookie_domain] = {}
|
|
try:
|
|
for cookie in browser_cookie3.load(cookie_domain):
|
|
_cookies[cookie_domain][cookie.name] = cookie.value
|
|
except:
|
|
pass
|
|
return _cookies[cookie_domain]
|
|
|
|
|
|
def format_prompt(messages: list[dict[str, str]], add_special_tokens=False):
|
|
if add_special_tokens or len(messages) > 1:
|
|
formatted = "\n".join(
|
|
["%s: %s" % ((message["role"]).capitalize(), message["content"]) for message in messages]
|
|
)
|
|
return f"{formatted}\nAssistant:"
|
|
else:
|
|
return messages[0]["content"] |