mirror of
https://github.com/xtekky/gpt4free.git
synced 2024-12-18 08:02:09 +03:00
89 lines
2.6 KiB
Python
89 lines
2.6 KiB
Python
from __future__ import annotations
|
|
|
|
import time, random, json
|
|
|
|
from ..requests import StreamSession
|
|
from ..typing import AsyncResult, Messages
|
|
from .base_provider import AsyncGeneratorProvider
|
|
from .helper import format_prompt
|
|
|
|
class MyShell(AsyncGeneratorProvider):
|
|
url = "https://api.myshell.ai/v1/bot/chat/send_message"
|
|
|
|
@classmethod
|
|
async def create_async_generator(
|
|
cls,
|
|
model: str,
|
|
messages: Messages,
|
|
proxy: str = None,
|
|
timeout: int = 120,
|
|
**kwargs
|
|
) -> AsyncResult:
|
|
user_agent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36"
|
|
headers = {
|
|
"User-Agent": user_agent,
|
|
"Myshell-Service-Name": "organics-api",
|
|
"Visitor-Id": generate_visitor_id(user_agent)
|
|
}
|
|
async with StreamSession(
|
|
impersonate="chrome107",
|
|
proxies={"https": proxy},
|
|
timeout=timeout,
|
|
headers=headers
|
|
) as session:
|
|
prompt = format_prompt(messages)
|
|
data = {
|
|
"botId": "1",
|
|
"conversation_scenario": 3,
|
|
"message": prompt,
|
|
"messageType": 1
|
|
}
|
|
async with session.post(cls.url, json=data) as response:
|
|
response.raise_for_status()
|
|
event = None
|
|
async for line in response.iter_lines():
|
|
if line.startswith(b"event: "):
|
|
event = line[7:]
|
|
elif event == b"MESSAGE_REPLY_SSE_ELEMENT_EVENT_NAME_TEXT":
|
|
if line.startswith(b"data: "):
|
|
yield json.loads(line[6:])["content"]
|
|
if event == b"MESSAGE_REPLY_SSE_ELEMENT_EVENT_NAME_TEXT_STREAM_PUSH_FINISHED":
|
|
break
|
|
|
|
|
|
def xor_hash(B: str):
|
|
r = []
|
|
i = 0
|
|
|
|
def o(e, t):
|
|
o_val = 0
|
|
for i in range(len(t)):
|
|
o_val |= r[i] << (8 * i)
|
|
return e ^ o_val
|
|
|
|
for e in range(len(B)):
|
|
t = ord(B[e])
|
|
r.insert(0, 255 & t)
|
|
|
|
if len(r) >= 4:
|
|
i = o(i, r)
|
|
r = []
|
|
|
|
if len(r) > 0:
|
|
i = o(i, r)
|
|
|
|
return hex(i)[2:]
|
|
|
|
def performance() -> str:
|
|
t = int(time.time() * 1000)
|
|
e = 0
|
|
while t == int(time.time() * 1000):
|
|
e += 1
|
|
return hex(t)[2:] + hex(e)[2:]
|
|
|
|
def generate_visitor_id(user_agent: str) -> str:
|
|
f = performance()
|
|
r = hex(int(random.random() * (16**16)))[2:-2]
|
|
d = xor_hash(user_agent)
|
|
e = hex(1080 * 1920)[2:]
|
|
return f"{f}-{r}-{d}-{e}-{f}" |