mirror of
https://github.com/xtekky/gpt4free.git
synced 2024-12-25 12:16:17 +03:00
131 lines
4.7 KiB
Python
131 lines
4.7 KiB
Python
from __future__ import annotations
|
|
|
|
from aiohttp import ClientSession, ClientResponseError
|
|
import re
|
|
from ..typing import AsyncResult, Messages
|
|
from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
|
|
from .helper import format_prompt
|
|
|
|
|
|
class LiteIcoding(AsyncGeneratorProvider, ProviderModelMixin):
|
|
url = "https://lite.icoding.ink"
|
|
api_endpoint = "/api/v1/gpt/message"
|
|
working = True
|
|
supports_gpt_4 = True
|
|
default_model = "gpt-4o"
|
|
models = [
|
|
'gpt-4o',
|
|
'gpt-4-turbo',
|
|
'claude-3',
|
|
'claude-3.5',
|
|
'gemini-1.5',
|
|
]
|
|
|
|
model_aliases = {
|
|
"gpt-4o-mini": "gpt-4o",
|
|
"gemini-pro": "gemini-1.5",
|
|
}
|
|
|
|
bearer_tokens = [
|
|
"aa3020ee873e40cb8b3f515a0708ebc4",
|
|
"5d69cd271b144226ac1199b3c849a566",
|
|
"62977f48a95844f8853a953679401850",
|
|
"d815b091959e42dd8b7871dfaf879485"
|
|
]
|
|
current_token_index = 0
|
|
|
|
@classmethod
|
|
def get_next_bearer_token(cls):
|
|
token = cls.bearer_tokens[cls.current_token_index]
|
|
cls.current_token_index = (cls.current_token_index + 1) % len(cls.bearer_tokens)
|
|
return token
|
|
|
|
@classmethod
|
|
async def create_async_generator(
|
|
cls,
|
|
model: str,
|
|
messages: Messages,
|
|
proxy: str = None,
|
|
**kwargs
|
|
) -> AsyncResult:
|
|
bearer_token = cls.get_next_bearer_token()
|
|
headers = {
|
|
"Accept": "*/*",
|
|
"Accept-Language": "en-US,en;q=0.9",
|
|
"Authorization": f"Bearer {bearer_token}",
|
|
"Connection": "keep-alive",
|
|
"Content-Type": "application/json;charset=utf-8",
|
|
"DNT": "1",
|
|
"Origin": cls.url,
|
|
"Referer": f"{cls.url}/",
|
|
"Sec-Fetch-Dest": "empty",
|
|
"Sec-Fetch-Mode": "cors",
|
|
"Sec-Fetch-Site": "same-origin",
|
|
"User-Agent": (
|
|
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) "
|
|
"Chrome/126.0.0.0 Safari/537.36"
|
|
),
|
|
"sec-ch-ua": '"Not/A)Brand";v="8", "Chromium";v="126"',
|
|
"sec-ch-ua-mobile": "?0",
|
|
"sec-ch-ua-platform": '"Linux"',
|
|
}
|
|
|
|
data = {
|
|
"model": model,
|
|
"chatId": "-1",
|
|
"messages": [
|
|
{
|
|
"role": msg["role"],
|
|
"content": msg["content"],
|
|
"time": msg.get("time", ""),
|
|
"attachments": msg.get("attachments", []),
|
|
}
|
|
for msg in messages
|
|
],
|
|
"plugins": [],
|
|
"systemPrompt": "",
|
|
"temperature": 0.5,
|
|
}
|
|
|
|
async with ClientSession(headers=headers) as session:
|
|
try:
|
|
async with session.post(
|
|
f"{cls.url}{cls.api_endpoint}", json=data, proxy=proxy
|
|
) as response:
|
|
response.raise_for_status()
|
|
buffer = ""
|
|
full_response = ""
|
|
def decode_content(data):
|
|
bytes_array = bytes([int(b, 16) ^ 255 for b in data.split()])
|
|
return bytes_array.decode('utf-8')
|
|
async for chunk in response.content.iter_any():
|
|
if chunk:
|
|
buffer += chunk.decode()
|
|
while "\n\n" in buffer:
|
|
part, buffer = buffer.split("\n\n", 1)
|
|
if part.startswith("data: "):
|
|
content = part[6:].strip()
|
|
if content and content != "[DONE]":
|
|
content = content.strip('"')
|
|
decoded_content = decode_content(content)
|
|
full_response += decoded_content
|
|
full_response = (
|
|
full_response.replace('""', '')
|
|
.replace('" "', ' ')
|
|
.replace("\\n\\n", "\n\n")
|
|
.replace("\\n", "\n")
|
|
.replace('\\"', '"')
|
|
.strip()
|
|
)
|
|
filtered_response = re.sub(r'\n---\n.*', '', full_response, flags=re.DOTALL)
|
|
cleaned_response = filtered_response.strip().strip('"')
|
|
yield cleaned_response
|
|
|
|
except ClientResponseError as e:
|
|
raise RuntimeError(
|
|
f"ClientResponseError {e.status}: {e.message}, url={e.request_info.url}, data={data}"
|
|
) from e
|
|
|
|
except Exception as e:
|
|
raise RuntimeError(f"Unexpected error: {str(e)}") from e
|