gpt4free/g4f/Provider/MagickPen.py

92 lines
3.0 KiB
Python
Raw Normal View History

2024-09-07 01:16:11 +03:00
from __future__ import annotations
2024-09-24 13:23:53 +03:00
from aiohttp import ClientSession
import hashlib
2024-09-07 01:16:11 +03:00
import time
import random
import re
2024-09-24 13:23:53 +03:00
import json
2024-09-07 01:16:11 +03:00
from ..typing import AsyncResult, Messages
from .base_provider import AsyncGeneratorProvider, ProviderModelMixin
from .helper import format_prompt
class MagickPen(AsyncGeneratorProvider, ProviderModelMixin):
url = "https://magickpen.com"
2024-09-24 13:23:53 +03:00
api_endpoint = "https://api.magickpen.com/ask"
2024-09-07 01:16:11 +03:00
working = True
supports_gpt_4 = True
2024-09-24 13:23:53 +03:00
supports_stream = True
supports_system_message = True
supports_message_history = True
2024-09-07 01:16:11 +03:00
2024-09-24 13:23:53 +03:00
default_model = 'gpt-4o-mini'
models = ['gpt-4o-mini']
2024-09-07 01:16:11 +03:00
@classmethod
2024-09-24 13:23:53 +03:00
async def fetch_api_credentials(cls) -> tuple:
url = "https://magickpen.com/_nuxt/9e47cd7579e60a9d1f13.js"
2024-09-07 01:16:11 +03:00
async with ClientSession() as session:
async with session.get(url) as response:
2024-09-24 13:23:53 +03:00
text = await response.text()
# Extract the necessary values from the file
pattern = r'"X-API-Secret":"(\w+)"'
match = re.search(pattern, text)
X_API_SECRET = match.group(1) if match else None
# Generate timestamp and nonce
timestamp = str(int(time.time() * 1000)) # in milliseconds
nonce = str(random.random())
# Generate the signature
s = ["TGDBU9zCgM", timestamp, nonce]
s.sort()
signature_string = ''.join(s)
signature = hashlib.md5(signature_string.encode()).hexdigest()
pattern = r'secret:"(\w+)"'
match = re.search(pattern, text)
secret = match.group(1) if match else None
if X_API_SECRET and timestamp and nonce and secret:
return X_API_SECRET, signature, timestamp, nonce, secret
else:
raise Exception("Unable to extract all the necessary data from the JavaScript file.")
2024-09-07 01:16:11 +03:00
@classmethod
async def create_async_generator(
cls,
model: str,
messages: Messages,
proxy: str = None,
**kwargs
) -> AsyncResult:
model = cls.get_model(model)
2024-09-24 13:23:53 +03:00
X_API_SECRET, signature, timestamp, nonce, secret = await cls.fetch_api_credentials()
2024-09-07 01:16:11 +03:00
headers = {
2024-09-24 13:23:53 +03:00
'accept': 'application/json, text/plain, */*',
'accept-language': 'en-US,en;q=0.9',
'content-type': 'application/json',
'nonce': nonce,
'origin': cls.url,
'referer': f"{cls.url}/",
'secret': secret,
'signature': signature,
'timestamp': timestamp,
'x-api-secret': X_API_SECRET,
2024-09-07 01:16:11 +03:00
}
async with ClientSession(headers=headers) as session:
2024-09-24 13:23:53 +03:00
prompt = format_prompt(messages)
payload = {
'query': prompt,
'turnstileResponse': '',
'action': 'verify'
}
async with session.post(cls.api_endpoint, json=payload, proxy=proxy) as response:
response.raise_for_status()
async for chunk in response.content:
if chunk:
yield chunk.decode()