gpt4free/g4f/Provider/Blackbox.py

160 lines
5.6 KiB
Python

from __future__ import annotations
import re
import random
import string
from aiohttp import ClientSession
from ..typing import AsyncResult, Messages, ImageType
from ..image import ImageResponse, to_data_uri
from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
class Blackbox(AsyncGeneratorProvider, ProviderModelMixin):
url = "https://www.blackbox.ai"
api_endpoint = "https://www.blackbox.ai/api/chat"
working = True
supports_stream = True
supports_system_message = True
supports_message_history = True
default_model = 'blackbox'
models = [
'blackbox',
'gemini-1.5-flash',
"llama-3.1-8b",
'llama-3.1-70b',
'llama-3.1-405b',
'ImageGenerationLV45LJp',
'GPT-4o',
'Gemini-PRO',
'Claude-Sonnet-3.5',
]
model_aliases = {
"gemini-flash": "gemini-1.5-flash",
"flux": "ImageGenerationLV45LJp",
"gpt-4o": "GPT-4o",
"gemini-pro": "Gemini-PRO",
"claude-3.5-sonnet": "Claude-Sonnet-3.5",
}
agentMode = {
'ImageGenerationLV45LJp': {'mode': True, 'id': "ImageGenerationLV45LJp", 'name': "Image Generation"},
}
trendingAgentMode = {
"blackbox": {},
"gemini-1.5-flash": {'mode': True, 'id': 'Gemini'},
"llama-3.1-8b": {'mode': True, 'id': "llama-3.1-8b"},
'llama-3.1-70b': {'mode': True, 'id': "llama-3.1-70b"},
'llama-3.1-405b': {'mode': True, 'id': "llama-3.1-405b"},
}
userSelectedModel = {
"GPT-4o": "GPT-4o",
"Gemini-PRO": "Gemini-PRO",
'Claude-Sonnet-3.5': "Claude-Sonnet-3.5",
}
@classmethod
def get_model(cls, model: str) -> str:
if model in cls.models:
return model
elif model in cls.userSelectedModel:
return model
elif model in cls.model_aliases:
return cls.model_aliases[model]
else:
return cls.default_model
@classmethod
async def create_async_generator(
cls,
model: str,
messages: Messages,
proxy: str = None,
image: ImageType = None,
image_name: str = None,
**kwargs
) -> AsyncResult:
model = cls.get_model(model)
headers = {
"accept": "*/*",
"accept-language": "en-US,en;q=0.9",
"cache-control": "no-cache",
"content-type": "application/json",
"origin": cls.url,
"pragma": "no-cache",
"referer": f"{cls.url}/",
"sec-ch-ua": '"Not;A=Brand";v="24", "Chromium";v="128"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Linux"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
}
if model in cls.userSelectedModel:
prefix = f"@{cls.userSelectedModel[model]}"
if not messages[0]['content'].startswith(prefix):
messages[0]['content'] = f"{prefix} {messages[0]['content']}"
async with ClientSession(headers=headers) as session:
if image is not None:
messages[-1]["data"] = {
"fileText": image_name,
"imageBase64": to_data_uri(image)
}
random_id = ''.join(random.choices(string.ascii_letters + string.digits, k=7))
data = {
"messages": messages,
"id": random_id,
"previewToken": None,
"userId": None,
"codeModelMode": True,
"agentMode": {},
"trendingAgentMode": {},
"userSelectedModel": None,
"isMicMode": False,
"maxTokens": 99999999,
"playgroundTopP": 0.9,
"playgroundTemperature": 0.5,
"isChromeExt": False,
"githubToken": None,
"clickedAnswer2": False,
"clickedAnswer3": False,
"clickedForceWebSearch": False,
"visitFromDelta": False,
"mobileClient": False,
"webSearchMode": False,
}
if model in cls.agentMode:
data["agentMode"] = cls.agentMode[model]
elif model in cls.trendingAgentMode:
data["trendingAgentMode"] = cls.trendingAgentMode[model]
elif model in cls.userSelectedModel:
data["userSelectedModel"] = cls.userSelectedModel[model]
async with session.post(cls.api_endpoint, json=data, proxy=proxy) as response:
response.raise_for_status()
if model == 'ImageGenerationLV45LJp':
response_text = await response.text()
url_match = re.search(r'https://storage\.googleapis\.com/[^\s\)]+', response_text)
if url_match:
image_url = url_match.group(0)
yield ImageResponse(image_url, alt=messages[-1]['content'])
else:
raise Exception("Image URL not found in the response")
else:
async for chunk in response.content.iter_any():
if chunk:
decoded_chunk = chunk.decode()
decoded_chunk = re.sub(r'\$@\$v=[^$]+\$@\$', '', decoded_chunk)
if decoded_chunk.strip():
yield decoded_chunk