gpt4free/g4f/Provider/helper.py
Heiner Lohaus 3c2755bc72 Add ChatgptDuo and Aibn Provider
Add support for "nest_asyncio",
Reuse event_loops with event_loop_policy
Support for  "create_async" with synchron provider
2023-09-26 10:03:37 +02:00

54 lines
1.8 KiB
Python

from __future__ import annotations
import asyncio
import sys
from asyncio import AbstractEventLoop
import browser_cookie3
# Cookie cache keyed by domain; each entry maps cookie name -> cookie value.
_cookies: dict[str, dict[str, str]] = {}

# Module-private event loop policy: the selector-based policy on Windows,
# otherwise whatever policy asyncio currently has installed.
_event_loop_policy = (
    asyncio.WindowsSelectorEventLoopPolicy()
    if sys.platform == 'win32'
    else asyncio.get_event_loop_policy()
)
# If event loop is already running, handle nested event loops
# If "nest_asyncio" is installed, patch the event loop.
def get_event_loop() -> AbstractEventLoop:
    """Return the event loop provided by the module's event loop policy.

    If no event loop is running in the current thread, the policy's loop
    is returned as is.  If a loop is already running, the policy's loop is
    patched with ``nest_asyncio`` to handle nested event loops, unless its
    class already carries the ``_nest_patched`` marker.

    Returns:
        The event loop obtained from ``_event_loop_policy``.

    Raises:
        RuntimeError: If a loop is running, the policy's loop is not yet
            patched and the "nest_asyncio" package cannot be imported.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so there is no nesting to handle.
        return _event_loop_policy.get_event_loop()

    event_loop = _event_loop_policy.get_event_loop()
    if not hasattr(event_loop.__class__, "_nest_patched"):
        # Import lazily: "nest_asyncio" is optional and only needed here.
        # Only the import is guarded, so an unrelated ImportError raised
        # elsewhere is never mistaken for a missing package.
        try:
            import nest_asyncio
        except ImportError as exc:
            raise RuntimeError(
                'Use "create_async" instead of "create" function in a running event loop. Or install the "nest_asyncio" package.'
            ) from exc
        nest_asyncio.apply(event_loop)
    return event_loop
# Load cookies for a domain from all supported browsers.
# Cache the results in the "_cookies" variable
def get_cookies(cookie_domain: str) -> dict[str, str]:
    """Return the browser cookies for ``cookie_domain`` as a name -> value dict.

    Results are cached in the module-level ``_cookies`` dict, so
    ``browser_cookie3.load`` is only called the first time a domain is
    requested.  Loading is best effort: if it fails, whatever was collected
    up to that point (possibly nothing) is cached and returned.

    Args:
        cookie_domain: Domain name passed to ``browser_cookie3.load``.

    Returns:
        The cached mapping of cookie names to cookie values for the domain.
    """
    if cookie_domain not in _cookies:
        # Register the entry before loading so a failed load is cached too
        # and the browsers are not queried again for this domain.
        domain_cookies: dict[str, str] = {}
        _cookies[cookie_domain] = domain_cookies
        try:
            for cookie in browser_cookie3.load(cookie_domain):
                domain_cookies[cookie.name] = cookie.value
        except Exception:
            # Deliberately best effort: a failed load must not break the
            # caller.  Unlike a bare "except", this still lets
            # KeyboardInterrupt and SystemExit propagate.
            pass
    return _cookies[cookie_domain]
def format_prompt(messages: list[dict[str, str]], add_special_tokens=False) -> str:
    """Flatten a list of chat messages into a single prompt string.

    Args:
        messages: Chat messages, each a dict with "role" and "content" keys.
        add_special_tokens: If true, use the "Role: content" transcript
            format even when there is at most one message.

    Returns:
        With more than one message, or when ``add_special_tokens`` is true:
        one "Role: content" line per message (role capitalized), followed
        by a final "Assistant:" line.  With exactly one message: that
        message's content unchanged.  With no messages and
        ``add_special_tokens`` false: an empty string.
    """
    if add_special_tokens or len(messages) > 1:
        formatted = "\n".join(
            f"{message['role'].capitalize()}: {message['content']}"
            for message in messages
        )
        return f"{formatted}\nAssistant:"
    if not messages:
        # Nothing to format; avoid an IndexError on messages[0].
        return ""
    return messages[0]["content"]