gpt4free/g4f/Provider/Bing.py
2023-08-14 11:46:32 +09:00

363 lines
12 KiB
Python

import asyncio
import json
import os
import random
import ssl
import uuid
import aiohttp
import certifi
import requests
from ..typing import Any, AsyncGenerator, CreateResult, Tuple
from .base_provider import BaseProvider
class Bing(BaseProvider):
url = "https://bing.com/chat"
supports_gpt_4 = True
@staticmethod
def create_completion(
model: str,
messages: list[dict[str, str]],
stream: bool,
**kwargs: Any,
) -> CreateResult:
if len(messages) < 2:
prompt = messages[0]["content"]
context = False
else:
prompt = messages[-1]["content"]
context = convert(messages[:-1])
response = run(stream_generate(prompt, jailbreak, context))
for token in response:
yield token
def convert(messages: list[dict[str, str]]):
context = ""
for message in messages:
context += "[%s](#message)\n%s\n\n" % (message["role"], message["content"])
return context
jailbreak = {
"optionsSets": [
"saharasugg",
"enablenewsfc",
"clgalileo",
"gencontentv3",
"nlu_direct_response_filter",
"deepleo",
"disable_emoji_spoken_text",
"responsible_ai_policy_235",
"enablemm",
"h3precise"
# "harmonyv3",
"dtappid",
"cricinfo",
"cricinfov2",
"dv3sugg",
"nojbfedge",
]
}
ssl_context = ssl.create_default_context()
ssl_context.load_verify_locations(certifi.where())
def _format(msg: dict[str, Any]) -> str:
return json.dumps(msg, ensure_ascii=False) + Defaults.delimiter
async def stream_generate(
prompt: str,
mode: dict[str, list[str]] = jailbreak,
context: bool | str = False,
):
timeout = aiohttp.ClientTimeout(total=900)
session = aiohttp.ClientSession(timeout=timeout)
conversationId, clientId, conversationSignature = await create_conversation()
wss = await session.ws_connect(
"wss://sydney.bing.com/sydney/ChatHub",
ssl=ssl_context,
autoping=False,
headers={
"accept": "application/json",
"accept-language": "en-US,en;q=0.9",
"content-type": "application/json",
"sec-ch-ua": '"Not_A Brand";v="99", "Microsoft Edge";v="110", "Chromium";v="110"',
"sec-ch-ua-arch": '"x86"',
"sec-ch-ua-bitness": '"64"',
"sec-ch-ua-full-version": '"109.0.1518.78"',
"sec-ch-ua-full-version-list": '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-model": "",
"sec-ch-ua-platform": '"Windows"',
"sec-ch-ua-platform-version": '"15.0.0"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"x-ms-client-request-id": str(uuid.uuid4()),
"x-ms-useragent": "azsdk-js-api-client-factory/1.0.0-beta.1 core-rest-pipeline/1.10.0 OS/Win32",
"Referer": "https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx",
"Referrer-Policy": "origin-when-cross-origin",
"x-forwarded-for": Defaults.ip_address,
},
)
await wss.send_str(_format({"protocol": "json", "version": 1}))
await wss.receive(timeout=900)
argument: dict[str, Any] = {
**mode,
"source": "cib",
"allowedMessageTypes": Defaults.allowedMessageTypes,
"sliceIds": Defaults.sliceIds,
"traceId": os.urandom(16).hex(),
"isStartOfSession": True,
"message": Defaults.location
| {
"author": "user",
"inputMethod": "Keyboard",
"text": prompt,
"messageType": "Chat",
},
"conversationSignature": conversationSignature,
"participant": {"id": clientId},
"conversationId": conversationId,
}
if context:
argument["previousMessages"] = [
{
"author": "user",
"description": context,
"contextType": "WebPage",
"messageType": "Context",
"messageId": "discover-web--page-ping-mriduna-----",
}
]
struct: dict[str, list[dict[str, Any]] | str | int] = {
"arguments": [argument],
"invocationId": "0",
"target": "chat",
"type": 4,
}
await wss.send_str(_format(struct))
final = False
draw = False
resp_txt = ""
result_text = ""
resp_txt_no_link = ""
cache_text = ""
while not final:
msg = await wss.receive(timeout=900)
objects = msg.data.split(Defaults.delimiter) # type: ignore
for obj in objects: # type: ignore
if obj is None or not obj:
continue
response = json.loads(obj) # type: ignore
if response.get("type") == 1 and response["arguments"][0].get(
"messages",
):
if not draw:
if (
response["arguments"][0]["messages"][0]["contentOrigin"]
!= "Apology"
) and not draw:
resp_txt = result_text + response["arguments"][0]["messages"][
0
]["adaptiveCards"][0]["body"][0].get("text", "")
resp_txt_no_link = result_text + response["arguments"][0][
"messages"
][0].get("text", "")
if response["arguments"][0]["messages"][0].get(
"messageType",
):
resp_txt = (
resp_txt
+ response["arguments"][0]["messages"][0][
"adaptiveCards"
][0]["body"][0]["inlines"][0].get("text")
+ "\n"
)
result_text = (
result_text
+ response["arguments"][0]["messages"][0][
"adaptiveCards"
][0]["body"][0]["inlines"][0].get("text")
+ "\n"
)
if cache_text.endswith(" "):
final = True
if wss and not wss.closed:
await wss.close()
if session and not session.closed:
await session.close()
yield (resp_txt.replace(cache_text, ""))
cache_text = resp_txt
elif response.get("type") == 2:
if response["item"]["result"].get("error"):
if wss and not wss.closed:
await wss.close()
if session and not session.closed:
await session.close()
raise Exception(
f"{response['item']['result']['value']}: {response['item']['result']['message']}"
)
if draw:
cache = response["item"]["messages"][1]["adaptiveCards"][0]["body"][
0
]["text"]
response["item"]["messages"][1]["adaptiveCards"][0]["body"][0][
"text"
] = (cache + resp_txt)
if (
response["item"]["messages"][-1]["contentOrigin"] == "Apology"
and resp_txt
):
response["item"]["messages"][-1]["text"] = resp_txt_no_link
response["item"]["messages"][-1]["adaptiveCards"][0]["body"][0][
"text"
] = resp_txt
# print('Preserved the message from being deleted', file=sys.stderr)
final = True
if wss and not wss.closed:
await wss.close()
if session and not session.closed:
await session.close()
async def create_conversation() -> Tuple[str, str, str]:
create = requests.get(
"https://www.bing.com/turing/conversation/create",
headers={
"authority": "edgeservices.bing.com",
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"accept-language": "en-US,en;q=0.9",
"cache-control": "max-age=0",
"sec-ch-ua": '"Chromium";v="110", "Not A(Brand";v="24", "Microsoft Edge";v="110"',
"sec-ch-ua-arch": '"x86"',
"sec-ch-ua-bitness": '"64"',
"sec-ch-ua-full-version": '"110.0.1587.69"',
"sec-ch-ua-full-version-list": '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-model": '""',
"sec-ch-ua-platform": '"Windows"',
"sec-ch-ua-platform-version": '"15.0.0"',
"sec-fetch-dest": "document",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "none",
"sec-fetch-user": "?1",
"upgrade-insecure-requests": "1",
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.69",
"x-edge-shopping-flag": "1",
"x-forwarded-for": Defaults.ip_address,
},
)
conversationId = create.json().get("conversationId")
clientId = create.json().get("clientId")
conversationSignature = create.json().get("conversationSignature")
if not conversationId or not clientId or not conversationSignature:
raise Exception("Failed to create conversation.")
return conversationId, clientId, conversationSignature
class Defaults:
delimiter = "\x1e"
ip_address = f"13.{random.randint(104, 107)}.{random.randint(0, 255)}.{random.randint(0, 255)}"
allowedMessageTypes = [
"Chat",
"Disengaged",
"AdsQuery",
"SemanticSerp",
"GenerateContentQuery",
"SearchQuery",
"ActionRequest",
"Context",
"Progress",
"AdsQuery",
"SemanticSerp",
]
sliceIds = [
# "222dtappid",
# "225cricinfo",
# "224locals0"
"winmuid3tf",
"osbsdusgreccf",
"ttstmout",
"crchatrev",
"winlongmsgtf",
"ctrlworkpay",
"norespwtf",
"tempcacheread",
"temptacache",
"505scss0",
"508jbcars0",
"515enbotdets0",
"5082tsports",
"515vaoprvs",
"424dagslnv1s0",
"kcimgattcf",
"427startpms0",
]
location = {
"locale": "en-US",
"market": "en-US",
"region": "US",
"locationHints": [
{
"country": "United States",
"state": "California",
"city": "Los Angeles",
"timezoneoffset": 8,
"countryConfidence": 8,
"Center": {"Latitude": 34.0536909, "Longitude": -118.242766},
"RegionType": 2,
"SourceType": 1,
}
],
}
def run(generator: AsyncGenerator[Any | str, Any]):
loop = asyncio.get_event_loop()
gen = generator.__aiter__()
while True:
try:
yield loop.run_until_complete(gen.__anext__())
except StopAsyncIteration:
break