gpt4free/g4f/Provider/MetaAI.py
2024-04-20 10:43:53 +02:00

199 lines
8.2 KiB
Python

import json
import uuid
import random
import time
import uuid
from typing import Dict, List
from aiohttp import ClientSession, BaseConnector
from ..typing import AsyncResult, Messages, Cookies
from ..requests import raise_for_status, DEFAULT_HEADERS
from ..image import ImageResponse
from .base_provider import AsyncGeneratorProvider
from .helper import format_prompt, get_connector, get_cookies
class MetaAI(AsyncGeneratorProvider):
url = "https://www.meta.ai"
working = True
def __init__(self, proxy: str = None, connector: BaseConnector = None):
self.session = ClientSession(connector=get_connector(connector, proxy), headers=DEFAULT_HEADERS)
@classmethod
async def create_async_generator(
cls,
model: str,
messages: Messages,
proxy: str = None,
**kwargs
) -> AsyncResult:
#cookies = get_cookies(".meta.ai", False, True)
async for chunk in cls(proxy).prompt(format_prompt(messages)):
yield chunk
async def get_access_token(self, cookies: Cookies, birthday: str = "1999-01-01") -> str:
url = "https://www.meta.ai/api/graphql/"
payload = {
"lsd": cookies["lsd"],
"fb_api_caller_class": "RelayModern",
"fb_api_req_friendly_name": "useAbraAcceptTOSForTempUserMutation",
"variables": json.dumps({
"dob": birthday,
"icebreaker_type": "TEXT",
"__relay_internal__pv__WebPixelRatiorelayprovider": 1,
}),
"doc_id": "7604648749596940",
}
headers = {
"x-fb-friendly-name": "useAbraAcceptTOSForTempUserMutation",
"x-fb-lsd": cookies["lsd"],
"x-asbd-id": "129477",
"alt-used": "www.meta.ai",
"sec-fetch-site": "same-origin"
}
async with self.session.post(url, headers=headers, cookies=cookies, data=payload) as response:
await raise_for_status(response, "Fetch access_token failed")
auth_json = await response.json(content_type=None)
access_token = auth_json["data"]["xab_abra_accept_terms_of_service"]["new_temp_user_auth"]["access_token"]
return access_token
async def prompt(self, message: str, cookies: Cookies = None) -> AsyncResult:
access_token = None
if cookies is None:
cookies = await self.get_cookies()
access_token = await self.get_access_token(cookies)
else:
cookies = await self.get_cookies(cookies)
url = "https://graph.meta.ai/graphql?locale=user"
#url = "https://www.meta.ai/api/graphql/"
payload = {
"access_token": access_token,
#"lsd": cookies["lsd"],
"fb_api_caller_class": "RelayModern",
"fb_api_req_friendly_name": "useAbraSendMessageMutation",
"variables": json.dumps({
"message": {"sensitive_string_value": message},
"externalConversationId": str(uuid.uuid4()),
"offlineThreadingId": generate_offline_threading_id(),
"suggestedPromptIndex": None,
"flashVideoRecapInput": {"images": []},
"flashPreviewInput": None,
"promptPrefix": None,
"entrypoint": "ABRA__CHAT__TEXT",
"icebreaker_type": "TEXT",
"__relay_internal__pv__AbraDebugDevOnlyrelayprovider": False,
"__relay_internal__pv__WebPixelRatiorelayprovider": 1,
}),
"server_timestamps": "true",
"doc_id": "7783822248314888",
}
headers = {
"x-asbd-id": "129477",
"x-fb-friendly-name": "useAbraSendMessageMutation",
#"x-fb-lsd": cookies["lsd"],
}
async with self.session.post(url, headers=headers, cookies=cookies, data=payload) as response:
await raise_for_status(response, "Fetch response failed")
last_snippet_len = 0
fetch_id = None
async for line in response.content:
try:
json_line = json.loads(line)
except json.JSONDecodeError:
continue
bot_response_message = json_line.get("data", {}).get("node", {}).get("bot_response_message", {})
streaming_state = bot_response_message.get("streaming_state")
fetch_id = bot_response_message.get("fetch_id")
if streaming_state in ("STREAMING", "OVERALL_DONE"):
#imagine_card = bot_response_message["imagine_card"]
snippet = bot_response_message["snippet"]
yield snippet[last_snippet_len:]
last_snippet_len = len(snippet)
elif streaming_state == "OVERALL_DONE":
break
#if last_streamed_response is None:
# if attempts > 3:
# raise Exception("MetaAI is having issues and was not able to respond (Server Error)")
# access_token = await self.get_access_token()
# return await self.prompt(message=message, attempts=attempts + 1)
if fetch_id is not None:
sources = await self.fetch_sources(fetch_id, cookies, access_token)
if sources is not None:
yield sources
async def get_cookies(self, cookies: Cookies = None) -> dict:
async with self.session.get("https://www.meta.ai/", cookies=cookies) as response:
await raise_for_status(response, "Fetch home failed")
text = await response.text()
if cookies is None:
cookies = {
"_js_datr": self.extract_value(text, "_js_datr"),
"abra_csrf": self.extract_value(text, "abra_csrf"),
"datr": self.extract_value(text, "datr"),
}
cookies["lsd"] = self.extract_value(text, start_str='"LSD",[],{"token":"', end_str='"}')
return cookies
async def fetch_sources(self, fetch_id: str, cookies: Cookies, access_token: str) -> List[Dict]:
url = "https://graph.meta.ai/graphql?locale=user"
payload = {
"access_token": access_token,
"fb_api_caller_class": "RelayModern",
"fb_api_req_friendly_name": "AbraSearchPluginDialogQuery",
"variables": json.dumps({"abraMessageFetchID": fetch_id}),
"server_timestamps": "true",
"doc_id": "6946734308765963",
}
headers = {
"authority": "graph.meta.ai",
"x-fb-friendly-name": "AbraSearchPluginDialogQuery",
}
async with self.session.post(url, headers=headers, cookies=cookies, data=payload) as response:
await raise_for_status(response)
response_json = await response.json()
try:
message = response_json["data"]["message"]
if message is not None:
searchResults = message["searchResults"]
if searchResults is not None:
return Sources(searchResults["references"])
except (KeyError, TypeError):
raise RuntimeError(f"Response: {response_json}")
@staticmethod
def extract_value(text: str, key: str = None, start_str = None, end_str = '",') -> str:
if start_str is None:
start_str = f'{key}":{{"value":"'
start = text.find(start_str)
if start >= 0:
start+= len(start_str)
end = text.find(end_str, start)
return text[start:end]
def generate_offline_threading_id() -> str:
"""
Generates an offline threading ID.
Returns:
str: The generated offline threading ID.
"""
# Generate a random 64-bit integer
random_value = random.getrandbits(64)
# Get the current timestamp in milliseconds
timestamp = int(time.time() * 1000)
# Combine timestamp and random value
threading_id = (timestamp << 22) | (random_value & ((1 << 22) - 1))
return str(threading_id)
class Sources():
def __init__(self, list: List[Dict[str, str]]) -> None:
self.list = list
def __str__(self) -> str:
return "\n\n" + ("\n".join([f"[{link['title']}]({link['link']})" for link in self.list]))