mirror of
https://github.com/xtekky/gpt4free.git
synced 2024-12-12 13:44:31 +03:00
351 lines
13 KiB
Python
351 lines
13 KiB
Python
import os
|
|
import json
|
|
import random
|
|
import json
|
|
import os
|
|
import uuid
|
|
import ssl
|
|
import certifi
|
|
import aiohttp
|
|
import asyncio
|
|
|
|
import requests
|
|
from ...typing import sha256, Dict, get_type_hints
|
|
|
|
# Provider metadata: endpoint URL, advertised model names and capability flags.
url = "https://bing.com/chat"
model = ["gpt-4"]
supports_stream = True
needs_auth = False

# TLS context handed to the websocket connection: the interpreter's default
# context with the certifi CA bundle loaded on top of it.
ssl_context = ssl.create_default_context()
ssl_context.load_verify_locations(certifi.where())
|
|
|
|
|
|
class optionsSets:
    """Option-set presets that are spread into the chat request arguments."""

    # Shape of a preset: each expected key mapped to the type of its value.
    optionSet: dict = {
        'tone': str,
        'optionsSets': list
    }

    # Preset used as the default ``mode`` of ``stream_generate``.
    jailbreak: dict = {
        "optionsSets": [
            'saharasugg',
            'enablenewsfc',
            'clgalileo',
            'gencontentv3',
            "nlu_direct_response_filter",
            "deepleo",
            "disable_emoji_spoken_text",
            "responsible_ai_policy_235",
            "enablemm",
            # The trailing comma matters: without it this literal is silently
            # concatenated with the next string ("h3precisedtappid").
            "h3precise",
            # "harmonyv3",
            "dtappid",
            "cricinfo",
            "cricinfov2",
            "dv3sugg",
            "nojbfedge",
        ]
    }
|
|
|
|
|
|
class Defaults:
    """Constants shared by the request builders in this module."""

    # Separator appended to every outgoing frame and used to split incoming ones.
    delimiter = "\x1e"

    # Address of the form 13.<104-107>.<0-255>.<0-255>, drawn once when the
    # class body is executed and sent as the ``x-forwarded-for`` header.
    ip_address = "13.{}.{}.{}".format(
        random.randint(104, 107),
        random.randint(0, 255),
        random.randint(0, 255),
    )

    # NOTE: "AdsQuery" and "SemanticSerp" are listed twice; the duplicates are
    # kept so the request payload stays exactly as it was.
    allowedMessageTypes = [
        "Chat",
        "Disengaged",
        "AdsQuery",
        "SemanticSerp",
        "GenerateContentQuery",
        "SearchQuery",
        "ActionRequest",
        "Context",
        "Progress",
        "AdsQuery",
        "SemanticSerp",
    ]

    sliceIds = [
        # "222dtappid",
        # "225cricinfo",
        # "224locals0"
        "winmuid3tf",
        "osbsdusgreccf",
        "ttstmout",
        "crchatrev",
        "winlongmsgtf",
        "ctrlworkpay",
        "norespwtf",
        "tempcacheread",
        "temptacache",
        "505scss0",
        "508jbcars0",
        "515enbotdets0",
        "5082tsports",
        "515vaoprvs",
        "424dagslnv1s0",
        "kcimgattcf",
        "427startpms0",
    ]

    # Locale and location hints merged into the outgoing user message.
    location = {
        "locale": "en-US",
        "market": "en-US",
        "region": "US",
        "locationHints": [
            {
                "country": "United States",
                "state": "California",
                "city": "Los Angeles",
                "timezoneoffset": 8,
                "countryConfidence": 8,
                "Center": {
                    "Latitude": 34.0536909,
                    "Longitude": -118.242766,
                },
                "RegionType": 2,
                "SourceType": 1,
            },
        ],
    }
|
|
|
|
|
|
def _format(msg: dict) -> str:
    """Serialise *msg* as JSON and terminate it with ``Defaults.delimiter``.

    Non-ASCII characters are emitted as-is rather than ``\\u`` escaped.
    """
    payload = json.dumps(msg, ensure_ascii=False)
    return "".join((payload, Defaults.delimiter))
|
|
|
|
|
|
async def create_conversation():
    """Create a new conversation and return its identifiers.

    The ``turing/conversation/create`` endpoint is queried up to five times;
    the first response that contains a conversation id, a client id and a
    conversation signature wins.

    Returns:
        tuple: ``(conversationId, clientId, conversationSignature)``.

    Raises:
        Exception: if none of the attempts returned all three values.
    """
    headers = {
        'authority': 'edgeservices.bing.com',
        'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'accept-language': 'en-US,en;q=0.9',
        'cache-control': 'max-age=0',
        'sec-ch-ua': '"Chromium";v="110", "Not A(Brand";v="24", "Microsoft Edge";v="110"',
        'sec-ch-ua-arch': '"x86"',
        'sec-ch-ua-bitness': '"64"',
        'sec-ch-ua-full-version': '"110.0.1587.69"',
        'sec-ch-ua-full-version-list': '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-model': '""',
        'sec-ch-ua-platform': '"Windows"',
        'sec-ch-ua-platform-version': '"15.0.0"',
        'sec-fetch-dest': 'document',
        'sec-fetch-mode': 'navigate',
        'sec-fetch-site': 'none',
        'sec-fetch-user': '?1',
        'upgrade-insecure-requests': '1',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.69',
        'x-edge-shopping-flag': '1',
        'x-forwarded-for': Defaults.ip_address
    }

    max_attempts = 5
    for _ in range(max_attempts):
        # NOTE: ``requests`` is synchronous, so this call blocks the event loop
        # for the duration of the HTTP round trip.
        create = requests.get('https://www.bing.com/turing/conversation/create',
                              headers=headers)

        try:
            payload = create.json()
        except ValueError:
            # Body was not JSON (e.g. an HTML error page): count it as a
            # failed attempt and try again.
            continue
        if not isinstance(payload, dict):
            continue

        conversationId = payload.get('conversationId')
        clientId = payload.get('clientId')
        conversationSignature = payload.get('conversationSignature')

        # Only a response carrying all three values is usable; anything else
        # triggers the next attempt instead of failing (or returning a partial
        # result) straight away.
        if conversationId and clientId and conversationSignature:
            return conversationId, clientId, conversationSignature

    raise Exception('Failed to create conversation.')
|
|
|
|
|
|
async def stream_generate(prompt: str, mode: optionsSets.optionSet = optionsSets.jailbreak, context: bool or str = False):
    """Send *prompt* over the ChatHub websocket and yield the reply as it grows.

    Args:
        prompt: Text of the user message.
        mode: Mapping spread into the request arguments (an option-set preset).
        context: When truthy, sent as a single previous message of type
            ``Context`` / ``WebPage`` ahead of the prompt.

    Yields:
        str: For every update frame, the current reply text with the previously
        seen text removed (i.e. the newly added part).

    Raises:
        Exception: if the conversation cannot be created, or if the final frame
            reports an error (``"<value>: <message>"``).
    """
    # Create the conversation before any network resource is opened so that a
    # failure here has nothing to clean up.
    conversationId, clientId, conversationSignature = await create_conversation()

    ws_headers = {
        'accept': 'application/json',
        'accept-language': 'en-US,en;q=0.9',
        'content-type': 'application/json',
        'sec-ch-ua': '"Not_A Brand";v="99", "Microsoft Edge";v="110", "Chromium";v="110"',
        'sec-ch-ua-arch': '"x86"',
        'sec-ch-ua-bitness': '"64"',
        'sec-ch-ua-full-version': '"109.0.1518.78"',
        'sec-ch-ua-full-version-list': '"Chromium";v="110.0.5481.192", "Not A(Brand";v="24.0.0.0", "Microsoft Edge";v="110.0.1587.69"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-model': '',
        'sec-ch-ua-platform': '"Windows"',
        'sec-ch-ua-platform-version': '"15.0.0"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-origin',
        'x-ms-client-request-id': str(uuid.uuid4()),
        'x-ms-useragent': 'azsdk-js-api-client-factory/1.0.0-beta.1 core-rest-pipeline/1.10.0 OS/Win32',
        'Referer': 'https://www.bing.com/search?q=Bing+AI&showconv=1&FORM=hpcodx',
        'Referrer-Policy': 'origin-when-cross-origin',
        'x-forwarded-for': Defaults.ip_address
    }

    struct = {
        'arguments': [
            {
                **mode,
                'source': 'cib',
                'allowedMessageTypes': Defaults.allowedMessageTypes,
                'sliceIds': Defaults.sliceIds,
                'traceId': os.urandom(16).hex(),
                'isStartOfSession': True,
                'message': Defaults.location | {
                    'author': 'user',
                    'inputMethod': 'Keyboard',
                    'text': prompt,
                    'messageType': 'Chat'
                },
                'conversationSignature': conversationSignature,
                'participant': {
                    'id': clientId
                },
                'conversationId': conversationId
            }
        ],
        'invocationId': '0',
        'target': 'chat',
        'type': 4
    }

    if context:
        struct['arguments'][0]['previousMessages'] = [
            {
                "author": "user",
                "description": context,
                "contextType": "WebPage",
                "messageType": "Context",
                "messageId": "discover-web--page-ping-mriduna-----"
            }
        ]

    timeout = aiohttp.ClientTimeout(total=900)

    # The context managers close the websocket and the session on every exit
    # path: normal completion, an error frame, a parsing failure, or the
    # consumer closing this generator early.
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.ws_connect('wss://sydney.bing.com/sydney/ChatHub',
                                      ssl=ssl_context, autoping=False,
                                      headers=ws_headers) as wss:
            # Protocol handshake, then the actual request.
            await wss.send_str(_format({'protocol': 'json', 'version': 1}))
            await wss.receive(timeout=900)
            await wss.send_str(_format(struct))

            final = False
            resp_txt = ''      # reply text as of the latest update frame
            result_text = ''   # text accumulated from frames carrying a messageType
            cache_text = ''    # reply text as of the previous yield

            while not final:
                msg = await wss.receive(timeout=900)

                # One websocket message may carry several delimiter-separated
                # JSON records.
                for obj in msg.data.split(Defaults.delimiter):
                    if not obj:
                        continue

                    response = json.loads(obj)
                    frame_type = response.get('type')

                    if frame_type == 1 and response['arguments'][0].get('messages'):
                        message = response['arguments'][0]['messages'][0]

                        if message['contentOrigin'] != 'Apology':
                            card_body = message['adaptiveCards'][0]['body'][0]
                            resp_txt = result_text + card_body.get('text', '')

                            if message.get('messageType'):
                                inline_text = card_body['inlines'][0].get('text')
                                resp_txt = resp_txt + inline_text + '\n'
                                result_text = result_text + inline_text + '\n'

                        # Stop after this frame once the text seen on the
                        # previous frame ended with a space.
                        if cache_text.endswith(' '):
                            final = True

                        yield (resp_txt.replace(cache_text, ''))
                        cache_text = resp_txt

                    elif frame_type == 2:
                        result = response['item']['result']
                        if result.get('error'):
                            raise Exception(
                                f"{result['value']}: {result['message']}")

                        # Every chunk has already been yielded; this frame only
                        # marks the end of the stream.
                        final = True
|
|
|
|
|
|
def run(generator):
    """Drive an async iterable from synchronous code, yielding its items.

    Args:
        generator: An async generator (or any object implementing
            ``__aiter__``) to be consumed.

    Yields:
        Each item produced by *generator*, in order.

    The current thread's event loop is used when one is available and open;
    otherwise a new loop is created and installed, so the function also works
    in worker threads and after a previous loop has been closed.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No event loop is set for this thread.
        loop = None
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    gen = generator.__aiter__()

    try:
        while True:
            try:
                next_val = loop.run_until_complete(gen.__anext__())
            except StopAsyncIteration:
                break
            yield next_val
    finally:
        # Give the async generator a chance to run its own cleanup when the
        # consumer stops early; on an exhausted generator this is a no-op.
        aclose = getattr(gen, 'aclose', None)
        if aclose is not None and not loop.is_closed() and not loop.is_running():
            loop.run_until_complete(aclose())
|
|
|
|
|
|
def convert(messages):
    """Render chat messages as one ``[role](#message)`` transcript string.

    Each message contributes ``"[<role>](#message)\\n<content>\\n\\n"``; an empty
    sequence yields an empty string.
    """
    return "".join(
        "[%s](#message)\n%s\n\n" % (entry['role'], entry['content'])
        for entry in messages
    )
|
|
|
|
|
|
def _create_completion(model: str, messages: list, stream: bool, **kwargs):
    """Yield the reply to the last entry of *messages*, piece by piece.

    The content of the final message is used as the prompt. When there are at
    least two messages, the earlier ones are flattened with ``convert`` and
    passed along as context; otherwise no context is sent. ``model``,
    ``stream`` and ``kwargs`` are accepted but not used here.
    """
    has_history = len(messages) >= 2
    prompt = messages[-1]['content']
    context = convert(messages[:-1]) if has_history else False

    yield from run(stream_generate(prompt, optionsSets.jailbreak, context))
|
|
|
|
|
|
# Summary string: the provider name (this file's name without its three-character
# extension) followed by each positional parameter of ``_create_completion``
# and the name of its annotated type.
params = (
    f'g4f.Providers.{os.path.basename(__file__)[:-3]} supports: '
    + '(%s)' % ', '.join(
        f"{name}: {get_type_hints(_create_completion)[name].__name__}"
        for name in _create_completion.__code__.co_varnames[
            :_create_completion.__code__.co_argcount
        ]
    )
)
|