gpt4free/g4f/Provider/PerplexityAi.py
Heiner Lohaus 951a1332a7 Fix create_event_loop function
Add PerplexityAi Provider
2023-09-20 23:06:52 +02:00

87 lines
2.9 KiB
Python

from __future__ import annotations
import json
import time
import base64
from curl_cffi.requests import AsyncSession
from .base_provider import AsyncProvider, format_prompt
class PerplexityAi(AsyncProvider):
url = "https://www.perplexity.ai"
working = True
supports_gpt_35_turbo = True
_sources = []
@classmethod
async def create_async(
cls,
model: str,
messages: list[dict[str, str]],
proxy: str = None,
**kwargs
) -> str:
url = cls.url + "/socket.io/?EIO=4&transport=polling"
async with AsyncSession(proxies={"https": proxy}, impersonate="chrome107") as session:
url_session = "https://www.perplexity.ai/api/auth/session"
response = await session.get(url_session)
response = await session.get(url, params={"t": timestamp()})
response.raise_for_status()
sid = json.loads(response.text[1:])["sid"]
data = '40{"jwt":"anonymous-ask-user"}'
response = await session.post(url, params={"t": timestamp(), "sid": sid}, data=data)
response.raise_for_status()
data = "424" + json.dumps([
"perplexity_ask",
format_prompt(messages),
{
"version":"2.1",
"source":"default",
"language":"en",
"timezone": time.tzname[0],
"search_focus":"internet",
"mode":"concise"
}
])
response = await session.post(url, params={"t": timestamp(), "sid": sid}, data=data)
response.raise_for_status()
while True:
response = await session.get(url, params={"t": timestamp(), "sid": sid})
response.raise_for_status()
for line in response.text.splitlines():
if line.startswith("434"):
result = json.loads(json.loads(line[3:])[0]["text"])
cls._sources = [{
"name": source["name"],
"url": source["url"],
"snippet": source["snippet"]
} for source in result["web_results"]]
return result["answer"]
@classmethod
def get_sources(cls):
return cls._sources
@classmethod
@property
def params(cls):
params = [
("model", "str"),
("messages", "list[dict[str, str]]"),
("stream", "bool"),
("proxy", "str"),
]
param = ", ".join([": ".join(p) for p in params])
return f"g4f.provider.{cls.__name__} supports: ({param})"
def timestamp() -> str:
return base64.urlsafe_b64encode(int(time.time()-1407782612).to_bytes(4, 'big')).decode()