mirror of
https://github.com/xtekky/gpt4free.git
synced 2024-11-27 13:42:19 +03:00
21e56a1af8
I used this repository (https://github.com/waylaidwanderer/node-chatgpt-api/) as a reference to fix all the bugs related to Bing "personality." I included all the required fields in the allowedMessageTypes and optionsSets (as well as sliceIds) to allow it to respond to any requests it actually supports. Will also finish the code to fully implement the image generation functionality.
487 lines
19 KiB
Python
487 lines
19 KiB
Python
from __future__ import annotations
|
|
|
|
import string
|
|
import random
|
|
import json
|
|
import os
|
|
import re
|
|
import io
|
|
import base64
|
|
import numpy as np
|
|
import uuid
|
|
import urllib.parse
|
|
from PIL import Image
|
|
from aiohttp import ClientSession, ClientTimeout
|
|
from ..typing import AsyncResult, Messages
|
|
from .base_provider import AsyncGeneratorProvider
|
|
|
|
class Tones():
|
|
creative = "Creative"
|
|
balanced = "Balanced"
|
|
precise = "Precise"
|
|
|
|
default_cookies = {
|
|
'SRCHD' : 'AF=NOFORM',
|
|
'PPLState' : '1',
|
|
'KievRPSSecAuth': '',
|
|
'SUID' : '',
|
|
'SRCHUSR' : '',
|
|
'SRCHHPGUSR' : '',
|
|
}
|
|
|
|
class Bing(AsyncGeneratorProvider):
|
|
url = "https://bing.com/chat"
|
|
working = True
|
|
supports_gpt_4 = True
|
|
|
|
@staticmethod
|
|
def create_async_generator(
|
|
model: str,
|
|
messages: Messages,
|
|
proxy: str = None,
|
|
cookies: dict = None,
|
|
tone: str = Tones.creative,
|
|
image: str = None,
|
|
**kwargs
|
|
) -> AsyncResult:
|
|
if len(messages) < 2:
|
|
prompt = messages[0]["content"]
|
|
context = None
|
|
else:
|
|
prompt = messages[-1]["content"]
|
|
context = create_context(messages[:-1])
|
|
|
|
if not cookies or "SRCHD" not in cookies:
|
|
cookies = default_cookies
|
|
return stream_generate(prompt, tone, image, context, proxy, cookies)
|
|
|
|
def create_context(messages: Messages):
|
|
return "".join(
|
|
f"[{message['role']}]" + ("(#message)" if message['role']!="system" else "(#additional_instructions)") + f"\n{message['content']}\n\n"
|
|
for message in messages
|
|
)
|
|
|
|
class Conversation():
|
|
def __init__(self, conversationId: str, clientId: str, conversationSignature: str, imageInfo: dict=None) -> None:
|
|
self.conversationId = conversationId
|
|
self.clientId = clientId
|
|
self.conversationSignature = conversationSignature
|
|
self.imageInfo = imageInfo
|
|
|
|
async def create_conversation(session: ClientSession, tone: str, image: str = None, proxy: str = None) -> Conversation:
|
|
url = 'https://www.bing.com/turing/conversation/create?bundleVersion=1.1199.4'
|
|
async with await session.get(url, proxy=proxy) as response:
|
|
data = await response.json()
|
|
|
|
conversationId = data.get('conversationId')
|
|
clientId = data.get('clientId')
|
|
conversationSignature = response.headers.get('X-Sydney-Encryptedconversationsignature')
|
|
|
|
if not conversationId or not clientId or not conversationSignature:
|
|
raise Exception('Failed to create conversation.')
|
|
conversation = Conversation(conversationId, clientId, conversationSignature, None)
|
|
if isinstance(image,str):
|
|
try:
|
|
config = {
|
|
"visualSearch": {
|
|
"maxImagePixels": 360000,
|
|
"imageCompressionRate": 0.7,
|
|
"enableFaceBlurDebug": 0,
|
|
}
|
|
}
|
|
is_data_uri_an_image(image)
|
|
img_binary_data = extract_data_uri(image)
|
|
is_accepted_format(img_binary_data)
|
|
img = Image.open(io.BytesIO(img_binary_data))
|
|
width, height = img.size
|
|
max_image_pixels = config['visualSearch']['maxImagePixels']
|
|
compression_rate = config['visualSearch']['imageCompressionRate']
|
|
|
|
if max_image_pixels / (width * height) < 1:
|
|
new_width = int(width * np.sqrt(max_image_pixels / (width * height)))
|
|
new_height = int(height * np.sqrt(max_image_pixels / (width * height)))
|
|
else:
|
|
new_width = width
|
|
new_height = height
|
|
try:
|
|
orientation = get_orientation(img)
|
|
except Exception:
|
|
orientation = None
|
|
new_img = process_image(orientation, img, new_width, new_height)
|
|
new_img_binary_data = compress_image_to_base64(new_img, compression_rate)
|
|
data, boundary = build_image_upload_api_payload(new_img_binary_data, conversation, tone)
|
|
headers = session.headers.copy()
|
|
headers["content-type"] = f'multipart/form-data; boundary={boundary}'
|
|
headers["referer"] = 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx'
|
|
headers["origin"] = 'https://www.bing.com'
|
|
async with await session.post("https://www.bing.com/images/kblob", data=data, headers=headers, proxy=proxy) as image_upload_response:
|
|
if image_upload_response.status != 200:
|
|
raise Exception("Failed to upload image.")
|
|
|
|
image_info = await image_upload_response.json()
|
|
if not image_info.get('blobId'):
|
|
raise Exception("Failed to parse image info.")
|
|
result = {'bcid': image_info.get('blobId', "")}
|
|
result['blurredBcid'] = image_info.get('processedBlobId', "")
|
|
if result['blurredBcid'] != "":
|
|
result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['blurredBcid']
|
|
elif result['bcid'] != "":
|
|
result["imageUrl"] = "https://www.bing.com/images/blob?bcid=" + result['bcid']
|
|
result['originalImageUrl'] = (
|
|
"https://www.bing.com/images/blob?bcid="
|
|
+ result['blurredBcid']
|
|
if config['visualSearch']["enableFaceBlurDebug"]
|
|
else "https://www.bing.com/images/blob?bcid="
|
|
+ result['bcid']
|
|
)
|
|
conversation.imageInfo = result
|
|
except Exception as e:
|
|
print(f"An error happened while trying to send image: {str(e)}")
|
|
return conversation
|
|
|
|
async def list_conversations(session: ClientSession) -> list:
|
|
url = "https://www.bing.com/turing/conversation/chats"
|
|
async with session.get(url) as response:
|
|
response = await response.json()
|
|
return response["chats"]
|
|
|
|
async def delete_conversation(session: ClientSession, conversation: Conversation, proxy: str = None) -> list:
|
|
url = "https://sydney.bing.com/sydney/DeleteSingleConversation"
|
|
json = {
|
|
"conversationId": conversation.conversationId,
|
|
"conversationSignature": conversation.conversationSignature,
|
|
"participant": {"id": conversation.clientId},
|
|
"source": "cib",
|
|
"optionsSets": ["autosave"]
|
|
}
|
|
async with session.post(url, json=json, proxy=proxy) as response:
|
|
response = await response.json()
|
|
return response["result"]["value"] == "Success"
|
|
|
|
class Defaults:
|
|
delimiter = "\x1e"
|
|
ip_address = f"13.{random.randint(104, 107)}.{random.randint(0, 255)}.{random.randint(0, 255)}"
|
|
|
|
allowedMessageTypes = [
|
|
"ActionRequest",
|
|
"Chat",
|
|
"Context",
|
|
# "Disengaged", unwanted
|
|
"Progress",
|
|
# "AdsQuery", unwanted
|
|
"SemanticSerp",
|
|
"GenerateContentQuery",
|
|
"SearchQuery",
|
|
# The following message types should not be added so that it does not flood with
|
|
# useless messages (such as "Analyzing images" or "Searching the web") while it's retrieving the AI response
|
|
# "InternalSearchQuery",
|
|
# "InternalSearchResult",
|
|
"RenderCardRequest",
|
|
# "RenderContentRequest"
|
|
]
|
|
|
|
sliceIds = [
|
|
'abv2',
|
|
'srdicton',
|
|
'convcssclick',
|
|
'stylewv2',
|
|
'contctxp2tf',
|
|
'802fluxv1pc_a',
|
|
'806log2sphs0',
|
|
'727savemem',
|
|
'277teditgnds0',
|
|
'207hlthgrds0',
|
|
]
|
|
|
|
location = {
|
|
"locale": "en-US",
|
|
"market": "en-US",
|
|
"region": "US",
|
|
"locationHints": [
|
|
{
|
|
"country": "United States",
|
|
"state": "California",
|
|
"city": "Los Angeles",
|
|
"timezoneoffset": 8,
|
|
"countryConfidence": 8,
|
|
"Center": {"Latitude": 34.0536909, "Longitude": -118.242766},
|
|
"RegionType": 2,
|
|
"SourceType": 1,
|
|
}
|
|
],
|
|
}
|
|
|
|
headers = {
|
|
'accept': '*/*',
|
|
'accept-language': 'en-US,en;q=0.9',
|
|
'cache-control': 'max-age=0',
|
|
'sec-ch-ua': '"Chromium";v="110", "Not A(Brand";v="24", "Microsoft Edge";v="110"',
|
|
'sec-ch-ua-arch': '"x86"',
|
|
'sec-ch-ua-bitness': '"64"',
|
|
'sec-ch-ua-full-version': '"110.0.1587.69"',
|
|
'sec-ch-ua-full-version-list': '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"',
|
|
'sec-ch-ua-mobile': '?0',
|
|
'sec-ch-ua-model': '""',
|
|
'sec-ch-ua-platform': '"Windows"',
|
|
'sec-ch-ua-platform-version': '"15.0.0"',
|
|
'sec-fetch-dest': 'document',
|
|
'sec-fetch-mode': 'navigate',
|
|
'sec-fetch-site': 'none',
|
|
'sec-fetch-user': '?1',
|
|
'upgrade-insecure-requests': '1',
|
|
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.69',
|
|
'x-edge-shopping-flag': '1',
|
|
'x-forwarded-for': ip_address,
|
|
}
|
|
|
|
optionsSets = [
|
|
'nlu_direct_response_filter',
|
|
'deepleo',
|
|
'disable_emoji_spoken_text',
|
|
'responsible_ai_policy_235',
|
|
'enablemm',
|
|
'iyxapbing',
|
|
'iycapbing',
|
|
'gencontentv3',
|
|
'fluxsrtrunc',
|
|
'fluxtrunc',
|
|
'fluxv1',
|
|
'rai278',
|
|
'replaceurl',
|
|
'eredirecturl',
|
|
'nojbfedge'
|
|
]
|
|
|
|
def format_message(msg: dict) -> str:
|
|
return json.dumps(msg, ensure_ascii=False) + Defaults.delimiter
|
|
|
|
def build_image_upload_api_payload(image_bin: str, conversation: Conversation, tone: str):
|
|
payload = {
|
|
'invokedSkills': ["ImageById"],
|
|
'subscriptionId': "Bing.Chat.Multimodal",
|
|
'invokedSkillsRequestData': {
|
|
'enableFaceBlur': True
|
|
},
|
|
'convoData': {
|
|
'convoid': "",
|
|
'convotone': tone
|
|
}
|
|
}
|
|
knowledge_request = {
|
|
'imageInfo': {},
|
|
'knowledgeRequest': payload
|
|
}
|
|
boundary="----WebKitFormBoundary" + ''.join(random.choices(string.ascii_letters + string.digits, k=16))
|
|
data = (
|
|
f'--{boundary}'
|
|
+ '\r\nContent-Disposition: form-data; name="knowledgeRequest"\r\n\r\n'
|
|
+ json.dumps(knowledge_request, ensure_ascii=False)
|
|
+ "\r\n--"
|
|
+ boundary
|
|
+ '\r\nContent-Disposition: form-data; name="imageBase64"\r\n\r\n'
|
|
+ image_bin
|
|
+ "\r\n--"
|
|
+ boundary
|
|
+ "--\r\n"
|
|
)
|
|
return data, boundary
|
|
|
|
def is_data_uri_an_image(data_uri: str):
|
|
try:
|
|
# Check if the data URI starts with 'data:image' and contains an image format (e.g., jpeg, png, gif)
|
|
if not re.match(r'data:image/(\w+);base64,', data_uri):
|
|
raise ValueError("Invalid data URI image.")
|
|
# Extract the image format from the data URI
|
|
image_format = re.match(r'data:image/(\w+);base64,', data_uri).group(1)
|
|
# Check if the image format is one of the allowed formats (jpg, jpeg, png, gif)
|
|
if image_format.lower() not in ['jpeg', 'jpg', 'png', 'gif']:
|
|
raise ValueError("Invalid image format (from mime file type).")
|
|
except Exception as e:
|
|
raise e
|
|
|
|
def is_accepted_format(binary_data: bytes) -> bool:
|
|
try:
|
|
check = False
|
|
if binary_data.startswith(b'\xFF\xD8\xFF'):
|
|
check = True # It's a JPEG image
|
|
elif binary_data.startswith(b'\x89PNG\r\n\x1a\n'):
|
|
check = True # It's a PNG image
|
|
elif binary_data.startswith(b'GIF87a') or binary_data.startswith(b'GIF89a'):
|
|
check = True # It's a GIF image
|
|
elif binary_data.startswith(b'\x89JFIF') or binary_data.startswith(b'JFIF\x00'):
|
|
check = True # It's a JPEG image
|
|
elif binary_data.startswith(b'\xFF\xD8'):
|
|
check = True # It's a JPEG image
|
|
elif binary_data.startswith(b'RIFF') and binary_data[8:12] == b'WEBP':
|
|
check = True # It's a WebP image
|
|
# else we raise ValueError
|
|
if not check:
|
|
raise ValueError("Invalid image format (from magic code).")
|
|
except Exception as e:
|
|
raise e
|
|
|
|
def extract_data_uri(data_uri: str) -> bytes:
|
|
try:
|
|
data = data_uri.split(",")[1]
|
|
data = base64.b64decode(data)
|
|
return data
|
|
except Exception as e:
|
|
raise e
|
|
|
|
def get_orientation(data: bytes) -> int:
|
|
try:
|
|
if data[:2] != b'\xFF\xD8':
|
|
raise Exception('NotJpeg')
|
|
with Image.open(data) as img:
|
|
exif_data = img._getexif()
|
|
if exif_data is not None:
|
|
orientation = exif_data.get(274) # 274 corresponds to the orientation tag in EXIF
|
|
if orientation is not None:
|
|
return orientation
|
|
except Exception:
|
|
pass
|
|
|
|
def process_image(orientation: int, img: Image.Image, new_width: int, new_height: int) -> Image.Image:
|
|
try:
|
|
# Initialize the canvas
|
|
new_img = Image.new("RGB", (new_width, new_height), color="#FFFFFF")
|
|
if orientation:
|
|
if orientation > 4:
|
|
img = img.transpose(Image.FLIP_LEFT_RIGHT)
|
|
if orientation in [3, 4]:
|
|
img = img.transpose(Image.ROTATE_180)
|
|
if orientation in [5, 6]:
|
|
img = img.transpose(Image.ROTATE_270)
|
|
if orientation in [7, 8]:
|
|
img = img.transpose(Image.ROTATE_90)
|
|
new_img.paste(img, (0, 0))
|
|
return new_img
|
|
except Exception as e:
|
|
raise e
|
|
|
|
def compress_image_to_base64(img, compression_rate) -> str:
|
|
try:
|
|
output_buffer = io.BytesIO()
|
|
img.save(output_buffer, format="JPEG", quality=int(compression_rate * 100))
|
|
return base64.b64encode(output_buffer.getvalue()).decode('utf-8')
|
|
except Exception as e:
|
|
raise e
|
|
|
|
def create_message(conversation: Conversation, prompt: str, tone: str, context: str=None) -> str:
|
|
options_sets = Defaults.optionsSets
|
|
if tone == Tones.creative:
|
|
options_sets.append("h3imaginative")
|
|
elif tone == Tones.precise:
|
|
options_sets.append("h3precise")
|
|
elif tone == Tones.balanced:
|
|
options_sets.append("galileo")
|
|
else:
|
|
options_sets.append("harmonyv3")
|
|
|
|
request_id = str(uuid.uuid4())
|
|
struct = {
|
|
'arguments': [
|
|
{
|
|
'source': 'cib',
|
|
'optionsSets': options_sets,
|
|
'allowedMessageTypes': Defaults.allowedMessageTypes,
|
|
'sliceIds': Defaults.sliceIds,
|
|
'traceId': os.urandom(16).hex(),
|
|
'isStartOfSession': True,
|
|
'requestId': request_id,
|
|
'message': Defaults.location | {
|
|
'author': 'user',
|
|
'inputMethod': 'Keyboard',
|
|
'text': prompt,
|
|
'messageType': 'Chat',
|
|
'requestId': request_id,
|
|
'messageId': request_id,
|
|
},
|
|
"scenario": "SERP",
|
|
'tone': tone,
|
|
'spokenTextMode': 'None',
|
|
'conversationId': conversation.conversationId,
|
|
'participant': {
|
|
'id': conversation.clientId
|
|
},
|
|
}
|
|
],
|
|
'invocationId': '1',
|
|
'target': 'chat',
|
|
'type': 4
|
|
}
|
|
if conversation.imageInfo != None and "imageUrl" in conversation.imageInfo and "originalImageUrl" in conversation.imageInfo:
|
|
struct['arguments'][0]['message']['originalImageUrl'] = conversation.imageInfo['originalImageUrl']
|
|
struct['arguments'][0]['message']['imageUrl'] = conversation.imageInfo['imageUrl']
|
|
struct['arguments'][0]['experienceType'] = None
|
|
struct['arguments'][0]['attachedFileInfo'] = {"fileName": None, "fileType": None}
|
|
if context:
|
|
struct['arguments'][0]['previousMessages'] = [{
|
|
"author": "user",
|
|
"description": context,
|
|
"contextType": "WebPage",
|
|
"messageType": "Context",
|
|
"messageId": "discover-web--page-ping-mriduna-----"
|
|
}]
|
|
return format_message(struct)
|
|
|
|
async def stream_generate(
|
|
prompt: str,
|
|
tone: str,
|
|
image: str = None,
|
|
context: str = None,
|
|
proxy: str = None,
|
|
cookies: dict = None
|
|
):
|
|
async with ClientSession(
|
|
timeout=ClientTimeout(total=900),
|
|
cookies=cookies,
|
|
headers=Defaults.headers,
|
|
) as session:
|
|
conversation = await create_conversation(session, tone, image, proxy)
|
|
try:
|
|
async with session.ws_connect('wss://sydney.bing.com/sydney/ChatHub', autoping=False, params={'sec_access_token': conversation.conversationSignature}, proxy=proxy) as wss:
|
|
|
|
await wss.send_str(format_message({'protocol': 'json', 'version': 1}))
|
|
await wss.receive(timeout=900)
|
|
await wss.send_str(create_message(conversation, prompt, tone, context))
|
|
|
|
response_txt = ''
|
|
returned_text = ''
|
|
final = False
|
|
while not final:
|
|
msg = await wss.receive(timeout=900)
|
|
objects = msg.data.split(Defaults.delimiter)
|
|
for obj in objects:
|
|
if obj is None or not obj:
|
|
continue
|
|
|
|
response = json.loads(obj)
|
|
if response.get('type') == 1 and response['arguments'][0].get('messages'):
|
|
message = response['arguments'][0]['messages'][0]
|
|
if (message['contentOrigin'] != 'Apology'):
|
|
if 'adaptiveCards' in message:
|
|
card = message['adaptiveCards'][0]['body'][0]
|
|
if "text" in card:
|
|
response_txt = card.get('text')
|
|
if message.get('messageType'):
|
|
inline_txt = card['inlines'][0].get('text')
|
|
response_txt += inline_txt + '\n'
|
|
elif message.get('contentType') == "IMAGE":
|
|
query = urllib.parse.quote(message.get('text'))
|
|
url = f"\nhttps://www.bing.com/images/create?q={query}"
|
|
response_txt += url
|
|
final = True
|
|
if response_txt.startswith(returned_text):
|
|
new = response_txt[len(returned_text):]
|
|
if new != "\n":
|
|
yield new
|
|
returned_text = response_txt
|
|
elif response.get('type') == 2:
|
|
result = response['item']['result']
|
|
if result.get('error'):
|
|
raise Exception(f"{result['value']}: {result['message']}")
|
|
return
|
|
finally:
|
|
await delete_conversation(session, conversation, proxy)
|