refactor(g4f/Provider/Blackbox.py): streamline model handling and improve image generation

This commit is contained in:
kqlio67 2024-09-12 23:20:02 +03:00
parent abdb27a6e8
commit a15578e229

View File

@ -1,43 +1,38 @@
from __future__ import annotations from __future__ import annotations
import uuid
import secrets
import re import re
import base64 import json
from aiohttp import ClientSession from aiohttp import ClientSession
from typing import AsyncGenerator, Optional
from ..typing import AsyncResult, Messages, ImageType from ..typing import AsyncResult, Messages, ImageType
from ..image import to_data_uri, ImageResponse from ..image import ImageResponse, to_data_uri
from .base_provider import AsyncGeneratorProvider, ProviderModelMixin from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
class Blackbox(AsyncGeneratorProvider, ProviderModelMixin): class Blackbox(AsyncGeneratorProvider, ProviderModelMixin):
url = "https://www.blackbox.ai" url = "https://www.blackbox.ai"
api_endpoint = "https://www.blackbox.ai/api/chat"
working = True working = True
supports_stream = True
supports_system_message = True
supports_message_history = True
default_model = 'blackbox' default_model = 'blackbox'
models = [ models = [
default_model, 'blackbox',
"gemini-1.5-flash", 'gemini-1.5-flash',
"llama-3.1-8b", "llama-3.1-8b",
'llama-3.1-70b', 'llama-3.1-70b',
'llama-3.1-405b', 'llama-3.1-405b',
'ImageGeneration', 'ImageGenerationLV45LJp'
] ]
model_aliases = { model_config = {
"gemini-flash": "gemini-1.5-flash", "blackbox": {'mode': True, 'id': 'blackbox'},
}
agent_mode_map = {
'ImageGeneration': {"mode": True, "id": "ImageGenerationLV45LJp", "name": "Image Generation"},
}
model_id_map = {
"blackbox": {},
"gemini-1.5-flash": {'mode': True, 'id': 'Gemini'}, "gemini-1.5-flash": {'mode': True, 'id': 'Gemini'},
"llama-3.1-8b": {'mode': True, 'id': "llama-3.1-8b"}, "llama-3.1-8b": {'mode': True, 'id': "llama-3.1-8b"},
'llama-3.1-70b': {'mode': True, 'id': "llama-3.1-70b"}, 'llama-3.1-70b': {'mode': True, 'id': "llama-3.1-70b"},
'llama-3.1-405b': {'mode': True, 'id': "llama-3.1-405b"} 'llama-3.1-405b': {'mode': True, 'id': "llama-3.1-405b"},
'ImageGenerationLV45LJp': {'mode': True, 'id': "ImageGenerationLV45LJp", 'name': "Image Generation"},
} }
@classmethod @classmethod
@ -49,108 +44,80 @@ class Blackbox(AsyncGeneratorProvider, ProviderModelMixin):
else: else:
return cls.default_model return cls.default_model
@classmethod
async def download_image_to_base64_url(cls, url: str) -> str:
async with ClientSession() as session:
async with session.get(url) as response:
image_data = await response.read()
base64_data = base64.b64encode(image_data).decode('utf-8')
mime_type = response.headers.get('Content-Type', 'image/jpeg')
return f"data:{mime_type};base64,{base64_data}"
@classmethod @classmethod
async def create_async_generator( async def create_async_generator(
cls, cls,
model: str, model: str,
messages: Messages, messages: Messages,
proxy: Optional[str] = None, proxy: str = None,
image: Optional[ImageType] = None, image: ImageType = None,
image_name: Optional[str] = None, image_name: str = None,
**kwargs **kwargs
) -> AsyncGenerator[AsyncResult, None]: ) -> AsyncResult:
if image is not None: model = cls.get_model(model)
messages[-1]["data"] = {
"fileText": image_name,
"imageBase64": to_data_uri(image),
"title": str(uuid.uuid4())
}
headers = { headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", "accept": "*/*",
"Accept": "*/*", "accept-language": "en-US,en;q=0.9",
"Accept-Language": "en-US,en;q=0.5", "cache-control": "no-cache",
"Accept-Encoding": "gzip, deflate, br", "content-type": "application/json",
"Referer": cls.url, "origin": cls.url,
"Content-Type": "application/json", "pragma": "no-cache",
"Origin": cls.url, "referer": f"{cls.url}/",
"DNT": "1", "sec-ch-ua": '"Not;A=Brand";v="24", "Chromium";v="128"',
"Sec-GPC": "1", "sec-ch-ua-mobile": "?0",
"Alt-Used": "www.blackbox.ai", "sec-ch-ua-platform": '"Linux"',
"Connection": "keep-alive", "sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"user-agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
} }
async with ClientSession(headers=headers) as session: async with ClientSession(headers=headers) as session:
random_id = secrets.token_hex(16) if image is not None:
random_user_id = str(uuid.uuid4()) messages[-1]["data"] = {
"fileText": image_name,
model = cls.get_model(model) # Resolve the model alias "imageBase64": to_data_uri(image)
}
data = { data = {
"messages": messages, "messages": messages,
"id": random_id, "id": "MRtAuMi",
"userId": random_user_id, "previewToken": None,
"userId": None,
"codeModelMode": True, "codeModelMode": True,
"agentMode": cls.agent_mode_map.get(model, {}), "agentMode": {},
"trendingAgentMode": {}, "trendingAgentMode": {},
"isMicMode": False, "isMicMode": False,
"maxTokens": 1024,
"isChromeExt": False, "isChromeExt": False,
"playgroundMode": False,
"webSearchMode": False,
"userSystemPrompt": "",
"githubToken": None, "githubToken": None,
"trendingAgentModel": cls.model_id_map.get(model, {}), "clickedAnswer2": False,
"maxTokens": None "clickedAnswer3": False,
"clickedForceWebSearch": False,
"visitFromDelta": False,
"mobileClient": False
} }
async with session.post( if model == 'ImageGenerationLV45LJp':
f"{cls.url}/api/chat", json=data, proxy=proxy data["agentMode"] = cls.model_config[model]
) as response: else:
data["trendingAgentMode"] = cls.model_config[model]
async with session.post(cls.api_endpoint, json=data, proxy=proxy) as response:
response.raise_for_status() response.raise_for_status()
full_response = "" if model == 'ImageGenerationLV45LJp':
buffer = "" response_text = await response.text()
image_base64_url = None url_match = re.search(r'https://storage\.googleapis\.com/[^\s\)]+', response_text)
async for chunk in response.content.iter_any(): if url_match:
image_url = url_match.group(0)
yield ImageResponse(image_url, alt=messages[-1]['content'])
else:
raise Exception("Image URL not found in the response")
else:
async for chunk in response.content:
if chunk: if chunk:
decoded_chunk = chunk.decode() decoded_chunk = chunk.decode()
cleaned_chunk = re.sub(r'\$@\$.+?\$@\$|\$@\$', '', decoded_chunk) if decoded_chunk.startswith('$@$v=undefined-rv1$@$'):
decoded_chunk = decoded_chunk[len('$@$v=undefined-rv1$@$'):]
buffer += cleaned_chunk yield decoded_chunk
# Check if there's a complete image line in the buffer
image_match = re.search(r'!\[Generated Image\]\((https?://[^\s\)]+)\)', buffer)
if image_match:
image_url = image_match.group(1)
# Download the image and convert to base64 URL
image_base64_url = await cls.download_image_to_base64_url(image_url)
# Remove the image line from the buffer
buffer = re.sub(r'!\[Generated Image\]\(https?://[^\s\)]+\)', '', buffer)
# Send text line by line
lines = buffer.split('\n')
for line in lines[:-1]:
if line.strip():
full_response += line + '\n'
yield line + '\n'
buffer = lines[-1] # Keep the last incomplete line in the buffer
# Send the remaining buffer if it's not empty
if buffer.strip():
full_response += buffer
yield buffer
# If an image was found, send it as ImageResponse
if image_base64_url:
alt_text = "Generated Image"
image_response = ImageResponse(image_base64_url, alt=alt_text)
yield image_response