mirror of
https://github.com/xtekky/gpt4free.git
synced 2024-12-23 11:02:40 +03:00
Merge pull request #1564 from hlohaus/gemini
Add Gemini Provider with image upload and generation
This commit is contained in:
commit
999bc2d617
26
README.md
26
README.md
@ -316,7 +316,7 @@ For generating images with Bing and for the OpenAi Chat you need cookies or a t
|
||||
```python
|
||||
from g4f import set_cookies
|
||||
|
||||
set_cookies(".bing", {
|
||||
set_cookies(".bing.com", {
|
||||
"_U": "cookie value"
|
||||
})
|
||||
set_cookies("chat.openai.com", {
|
||||
@ -336,6 +336,30 @@ pip install browser_cookie3
|
||||
pip install g4f[webdriver]
|
||||
```
|
||||
|
||||
##### Image Upload & Generation
|
||||
|
||||
Image upload and generation are supported by three main providers:
|
||||
|
||||
- **Bing & Other GPT-4 Providers:** Utilizes Microsoft's Image Creator.
|
||||
- **Google Gemini:** Available for free accounts with IP addresses outside Europe.
|
||||
- **OpenaiChat with GPT-4:** Accessible for users with a Plus subscription.
|
||||
|
||||
```python
|
||||
import g4f
|
||||
|
||||
# Setting up the request for image creation
|
||||
response = g4f.ChatCompletion.create(
|
||||
model=g4f.models.default, # Using the default model
|
||||
provider=g4f.Provider.Gemini, # Specifying the provider as Gemini
|
||||
messages=[{"role": "user", "content": "Create an image like this"}],
|
||||
image=open("images/g4f.png", "rb"), # Image input can be a data URI, bytes, PIL Image, or IO object
|
||||
image_name="g4f.png" # Optional: specifying the filename
|
||||
)
|
||||
|
||||
# Displaying the response
|
||||
print(response)
|
||||
```
|
||||
|
||||
##### Using Browser
|
||||
|
||||
Some providers using a browser to bypass the bot protection. They using the selenium webdriver to control the browser. The browser settings and the login data are saved in a custom directory. If the headless mode is enabled, the browser windows are loaded invisibly. For performance reasons, it is recommended to reuse the browser instances and close them yourself at the end:
|
||||
|
@ -5,9 +5,9 @@ from .retry_provider import RetryProvider
|
||||
from .base_provider import AsyncProvider, AsyncGeneratorProvider
|
||||
from .create_images import CreateImagesProvider
|
||||
from .deprecated import *
|
||||
from .selenium import *
|
||||
from .needs_auth import *
|
||||
from .unfinished import *
|
||||
from .selenium import *
|
||||
|
||||
from .AiAsk import AiAsk
|
||||
from .AiChatOnline import AiChatOnline
|
||||
|
@ -23,7 +23,7 @@ from ..helper import get_cookies, get_connector
|
||||
from ...webdriver import WebDriver, get_driver_cookies, get_browser
|
||||
from ...base_provider import ProviderType
|
||||
from ...image import ImageResponse
|
||||
from ...errors import MissingRequirementsError, MissingAccessToken
|
||||
from ...errors import MissingRequirementsError, MissingAuthError
|
||||
|
||||
BING_URL = "https://www.bing.com"
|
||||
TIMEOUT_LOGIN = 1200
|
||||
@ -210,7 +210,7 @@ class CreateImagesBing:
|
||||
try:
|
||||
self.cookies = get_cookies_from_browser(self.proxy)
|
||||
except MissingRequirementsError as e:
|
||||
raise MissingAccessToken(f'Missing "_U" cookie. {e}')
|
||||
raise MissingAuthError(f'Missing "_U" cookie. {e}')
|
||||
yield asyncio.run(self.create_async(prompt))
|
||||
|
||||
async def create_async(self, prompt: str) -> ImageResponse:
|
||||
@ -225,7 +225,7 @@ class CreateImagesBing:
|
||||
"""
|
||||
cookies = self.cookies or get_cookies(".bing.com", False)
|
||||
if "_U" not in cookies:
|
||||
raise MissingAccessToken('Missing "_U" cookie')
|
||||
raise MissingAuthError('Missing "_U" cookie')
|
||||
proxy = os.environ.get("G4F_PROXY")
|
||||
async with create_session(cookies, proxy) as session:
|
||||
images = await create_images(session, prompt, self.proxy)
|
||||
|
205
g4f/Provider/needs_auth/Gemini.py
Normal file
205
g4f/Provider/needs_auth/Gemini.py
Normal file
@ -0,0 +1,205 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import json
|
||||
import random
|
||||
import re
|
||||
|
||||
from aiohttp import ClientSession
|
||||
|
||||
try:
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
from ...typing import Messages, Cookies, ImageType, AsyncResult
|
||||
from ..base_provider import AsyncGeneratorProvider
|
||||
from ..helper import format_prompt, get_cookies
|
||||
from ...errors import MissingAuthError, MissingRequirementsError
|
||||
from ...image import to_bytes, ImageResponse
|
||||
from ...webdriver import get_browser, get_driver_cookies
|
||||
|
||||
REQUEST_HEADERS = {
|
||||
"authority": "gemini.google.com",
|
||||
"origin": "https://gemini.google.com",
|
||||
"referer": "https://gemini.google.com/",
|
||||
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
|
||||
'x-same-domain': '1',
|
||||
}
|
||||
REQUEST_BL_PARAM = "boq_assistant-bard-web-server_20240201.08_p8"
|
||||
REQUEST_URL = "https://gemini.google.com/_/BardChatUi/data/assistant.lamda.BardFrontendService/StreamGenerate"
|
||||
UPLOAD_IMAGE_URL = "https://content-push.googleapis.com/upload/"
|
||||
UPLOAD_IMAGE_HEADERS = {
|
||||
"authority": "content-push.googleapis.com",
|
||||
"accept": "*/*",
|
||||
"accept-language": "en-US,en;q=0.7",
|
||||
"authorization": "Basic c2F2ZXM6cyNMdGhlNmxzd2F2b0RsN3J1d1U=",
|
||||
"content-type": "application/x-www-form-urlencoded;charset=UTF-8",
|
||||
"origin": "https://gemini.google.com",
|
||||
"push-id": "feeds/mcudyrk2a4khkz",
|
||||
"referer": "https://gemini.google.com/",
|
||||
"x-goog-upload-command": "start",
|
||||
"x-goog-upload-header-content-length": "",
|
||||
"x-goog-upload-protocol": "resumable",
|
||||
"x-tenant-id": "bard-storage",
|
||||
}
|
||||
|
||||
class Gemini(AsyncGeneratorProvider):
|
||||
url = "https://gemini.google.com"
|
||||
needs_auth = True
|
||||
working = True
|
||||
supports_stream = False
|
||||
|
||||
@classmethod
|
||||
async def create_async_generator(
|
||||
cls,
|
||||
model: str,
|
||||
messages: Messages,
|
||||
proxy: str = None,
|
||||
cookies: Cookies = None,
|
||||
image: ImageType = None,
|
||||
image_name: str = None,
|
||||
**kwargs
|
||||
) -> AsyncResult:
|
||||
prompt = format_prompt(messages)
|
||||
|
||||
if not cookies:
|
||||
driver = None
|
||||
try:
|
||||
driver = get_browser(proxy=proxy)
|
||||
try:
|
||||
driver.get(f"{cls.url}/app")
|
||||
WebDriverWait(driver, 5).until(
|
||||
EC.visibility_of_element_located((By.CSS_SELECTOR, "div.ql-editor.textarea"))
|
||||
)
|
||||
except:
|
||||
login_url = os.environ.get("G4F_LOGIN_URL")
|
||||
if login_url:
|
||||
yield f"Please login: [Google Gemini]({login_url})\n\n"
|
||||
WebDriverWait(driver, 240).until(
|
||||
EC.visibility_of_element_located((By.CSS_SELECTOR, "div.ql-editor.textarea"))
|
||||
)
|
||||
cookies = get_driver_cookies(driver)
|
||||
except MissingRequirementsError:
|
||||
pass
|
||||
finally:
|
||||
if driver:
|
||||
driver.close()
|
||||
|
||||
if not cookies:
|
||||
cookies = get_cookies(".google.com", False)
|
||||
if "__Secure-1PSID" not in cookies:
|
||||
raise MissingAuthError('Missing "__Secure-1PSID" cookie')
|
||||
|
||||
image_url = await cls.upload_image(to_bytes(image), image_name, proxy) if image else None
|
||||
|
||||
async with ClientSession(
|
||||
cookies=cookies,
|
||||
headers=REQUEST_HEADERS
|
||||
) as session:
|
||||
async with session.get(cls.url, proxy=proxy) as response:
|
||||
text = await response.text()
|
||||
match = re.search(r'SNlM0e\":\"(.*?)\"', text)
|
||||
if match:
|
||||
snlm0e = match.group(1)
|
||||
else:
|
||||
raise RuntimeError("SNlM0e not found")
|
||||
|
||||
params = {
|
||||
'bl': REQUEST_BL_PARAM,
|
||||
'_reqid': random.randint(1111, 9999),
|
||||
'rt': 'c'
|
||||
}
|
||||
data = {
|
||||
'at': snlm0e,
|
||||
'f.req': json.dumps([None, json.dumps(cls.build_request(
|
||||
prompt,
|
||||
image_url=image_url,
|
||||
image_name=image_name
|
||||
))])
|
||||
}
|
||||
async with session.post(
|
||||
REQUEST_URL,
|
||||
data=data,
|
||||
params=params,
|
||||
proxy=proxy
|
||||
) as response:
|
||||
response = await response.text()
|
||||
response_part = json.loads(json.loads(response.splitlines()[-5])[0][2])
|
||||
if response_part[4] is None:
|
||||
response_part = json.loads(json.loads(response.splitlines()[-7])[0][2])
|
||||
|
||||
content = response_part[4][0][1][0]
|
||||
image_prompt = None
|
||||
match = re.search(r'\[Imagen of (.*?)\]', content)
|
||||
if match:
|
||||
image_prompt = match.group(1)
|
||||
content = content.replace(match.group(0), '')
|
||||
|
||||
yield content
|
||||
if image_prompt:
|
||||
images = [image[0][3][3] for image in response_part[4][0][12][7][0]]
|
||||
resolved_images = []
|
||||
for image in images:
|
||||
async with session.get(image, allow_redirects=False) as fetch:
|
||||
image = fetch.headers["location"]
|
||||
async with session.get(image, allow_redirects=False) as fetch:
|
||||
image = fetch.headers["location"]
|
||||
resolved_images.append(image)
|
||||
yield ImageResponse(resolved_images, image_prompt, {"orginal_links": images})
|
||||
|
||||
def build_request(
|
||||
prompt: str,
|
||||
conversation_id: str = "",
|
||||
response_id: str = "",
|
||||
choice_id: str = "",
|
||||
image_url: str = None,
|
||||
image_name: str = None,
|
||||
tools: list[list[str]] = []
|
||||
) -> list:
|
||||
image_list = [[[image_url, 1], image_name]] if image_url else []
|
||||
return [
|
||||
[prompt, 0, None, image_list, None, None, 0],
|
||||
["en"],
|
||||
[conversation_id, response_id, choice_id, None, None, []],
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
[1],
|
||||
0,
|
||||
[],
|
||||
tools,
|
||||
1,
|
||||
0,
|
||||
]
|
||||
|
||||
async def upload_image(image: bytes, image_name: str = None, proxy: str = None):
|
||||
async with ClientSession(
|
||||
headers=UPLOAD_IMAGE_HEADERS
|
||||
) as session:
|
||||
async with session.options(UPLOAD_IMAGE_URL, proxy=proxy) as reponse:
|
||||
reponse.raise_for_status()
|
||||
|
||||
headers = {
|
||||
"size": str(len(image)),
|
||||
"x-goog-upload-command": "start"
|
||||
}
|
||||
data = f"File name: {image_name}" if image_name else None
|
||||
async with session.post(
|
||||
UPLOAD_IMAGE_URL, headers=headers, data=data, proxy=proxy
|
||||
) as response:
|
||||
response.raise_for_status()
|
||||
upload_url = response.headers["X-Goog-Upload-Url"]
|
||||
|
||||
async with session.options(upload_url, headers=headers) as response:
|
||||
response.raise_for_status()
|
||||
|
||||
headers["x-goog-upload-command"] = "upload, finalize"
|
||||
headers["X-Goog-Upload-Offset"] = "0"
|
||||
async with session.post(
|
||||
upload_url, headers=headers, data=image, proxy=proxy
|
||||
) as response:
|
||||
response.raise_for_status()
|
||||
return await response.text()
|
@ -25,7 +25,7 @@ from ...webdriver import get_browser, get_driver_cookies
|
||||
from ...typing import AsyncResult, Messages, Cookies, ImageType
|
||||
from ...requests import StreamSession
|
||||
from ...image import to_image, to_bytes, ImageResponse, ImageRequest
|
||||
from ...errors import MissingRequirementsError, MissingAccessToken
|
||||
from ...errors import MissingRequirementsError, MissingAuthError
|
||||
|
||||
|
||||
class OpenaiChat(AsyncGeneratorProvider, ProviderModelMixin):
|
||||
@ -99,7 +99,8 @@ class OpenaiChat(AsyncGeneratorProvider, ProviderModelMixin):
|
||||
cls,
|
||||
session: StreamSession,
|
||||
headers: dict,
|
||||
image: ImageType
|
||||
image: ImageType,
|
||||
image_name: str = None
|
||||
) -> ImageRequest:
|
||||
"""
|
||||
Upload an image to the service and get the download URL
|
||||
@ -118,7 +119,7 @@ class OpenaiChat(AsyncGeneratorProvider, ProviderModelMixin):
|
||||
# Convert the image to a bytes object and get the size
|
||||
data_bytes = to_bytes(image)
|
||||
data = {
|
||||
"file_name": f"{image.width}x{image.height}.{extension}",
|
||||
"file_name": image_name if image_name else f"{image.width}x{image.height}.{extension}",
|
||||
"file_size": len(data_bytes),
|
||||
"use_case": "multimodal"
|
||||
}
|
||||
@ -338,7 +339,7 @@ class OpenaiChat(AsyncGeneratorProvider, ProviderModelMixin):
|
||||
try:
|
||||
access_token, cookies = cls.browse_access_token(proxy)
|
||||
except MissingRequirementsError:
|
||||
raise MissingAccessToken(f'Missing "access_token"')
|
||||
raise MissingAuthError(f'Missing "access_token"')
|
||||
cls._cookies = cookies
|
||||
|
||||
headers = {"Authorization": f"Bearer {access_token}"}
|
||||
@ -351,7 +352,7 @@ class OpenaiChat(AsyncGeneratorProvider, ProviderModelMixin):
|
||||
try:
|
||||
image_response = None
|
||||
if image:
|
||||
image_response = await cls.upload_image(session, headers, image)
|
||||
image_response = await cls.upload_image(session, headers, image, kwargs.get("image_name"))
|
||||
except Exception as e:
|
||||
yield e
|
||||
end_turn = EndTurn()
|
||||
@ -438,21 +439,18 @@ class OpenaiChat(AsyncGeneratorProvider, ProviderModelMixin):
|
||||
Returns:
|
||||
tuple[str, dict]: A tuple containing the access token and cookies.
|
||||
"""
|
||||
driver = get_browser(proxy=proxy)
|
||||
try:
|
||||
with get_browser(proxy=proxy) as driver:
|
||||
driver.get(f"{cls.url}/")
|
||||
WebDriverWait(driver, timeout).until(EC.presence_of_element_located((By.ID, "prompt-textarea")))
|
||||
access_token = driver.execute_script(
|
||||
"let session = await fetch('/api/auth/session');"
|
||||
"let data = await session.json();"
|
||||
"let accessToken = data['accessToken'];"
|
||||
"let expires = new Date(); expires.setTime(expires.getTime() + 60 * 60 * 24 * 7);"
|
||||
"let expires = new Date(); expires.setTime(expires.getTime() + 60 * 60 * 4);"
|
||||
"document.cookie = 'access_token=' + accessToken + ';expires=' + expires.toUTCString() + ';path=/';"
|
||||
"return accessToken;"
|
||||
)
|
||||
return access_token, get_driver_cookies(driver)
|
||||
finally:
|
||||
driver.quit()
|
||||
|
||||
@classmethod
|
||||
async def get_arkose_token(cls, session: StreamSession) -> str:
|
||||
|
@ -3,7 +3,8 @@ from __future__ import annotations
|
||||
import requests
|
||||
|
||||
from ...typing import Any, CreateResult, Messages
|
||||
from ..base_provider import AbstractProvider
|
||||
from ..base_provider import AbstractProvider, ProviderModelMixin
|
||||
from ...errors import MissingAuthError
|
||||
|
||||
models = {
|
||||
"theb-ai": "TheB.AI",
|
||||
@ -29,13 +30,16 @@ models = {
|
||||
"qwen-7b-chat": "Qwen 7B"
|
||||
}
|
||||
|
||||
class ThebApi(AbstractProvider):
|
||||
class ThebApi(AbstractProvider, ProviderModelMixin):
|
||||
url = "https://theb.ai"
|
||||
working = True
|
||||
needs_auth = True
|
||||
default_model = "gpt-3.5-turbo"
|
||||
models = list(models)
|
||||
|
||||
@staticmethod
|
||||
@classmethod
|
||||
def create_completion(
|
||||
cls,
|
||||
model: str,
|
||||
messages: Messages,
|
||||
stream: bool,
|
||||
@ -43,8 +47,8 @@ class ThebApi(AbstractProvider):
|
||||
proxy: str = None,
|
||||
**kwargs
|
||||
) -> CreateResult:
|
||||
if model and model not in models:
|
||||
raise ValueError(f"Model are not supported: {model}")
|
||||
if not auth:
|
||||
raise MissingAuthError("Missing auth")
|
||||
headers = {
|
||||
'accept': 'application/json',
|
||||
'authorization': f'Bearer {auth}',
|
||||
@ -54,7 +58,7 @@ class ThebApi(AbstractProvider):
|
||||
# models = dict([(m["id"], m["name"]) for m in response])
|
||||
# print(json.dumps(models, indent=4))
|
||||
data: dict[str, Any] = {
|
||||
"model": model if model else "gpt-3.5-turbo",
|
||||
"model": cls.get_model(model),
|
||||
"messages": messages,
|
||||
"stream": False,
|
||||
"model_params": {
|
||||
|
@ -1,4 +1,4 @@
|
||||
from .Bard import Bard
|
||||
from .Gemini import Gemini
|
||||
from .Raycast import Raycast
|
||||
from .Theb import Theb
|
||||
from .ThebApi import ThebApi
|
||||
|
@ -20,6 +20,7 @@ class Bard(AbstractProvider):
|
||||
url = "https://bard.google.com"
|
||||
working = True
|
||||
needs_auth = True
|
||||
webdriver = True
|
||||
|
||||
@classmethod
|
||||
def create_completion(
|
@ -2,4 +2,5 @@ from .AItianhuSpace import AItianhuSpace
|
||||
from .MyShell import MyShell
|
||||
from .PerplexityAi import PerplexityAi
|
||||
from .Phind import Phind
|
||||
from .TalkAi import TalkAi
|
||||
from .TalkAi import TalkAi
|
||||
from .Bard import Bard
|
@ -91,7 +91,7 @@ class ChatCompletion:
|
||||
auth : Union[str, None] = None,
|
||||
ignored : list[str] = None,
|
||||
ignore_working: bool = False,
|
||||
ignore_stream_and_auth: bool = False,
|
||||
ignore_stream: bool = False,
|
||||
patch_provider: callable = None,
|
||||
**kwargs) -> Union[CreateResult, str]:
|
||||
"""
|
||||
@ -105,7 +105,7 @@ class ChatCompletion:
|
||||
auth (Union[str, None], optional): Authentication token or credentials, if required.
|
||||
ignored (list[str], optional): List of provider names to be ignored.
|
||||
ignore_working (bool, optional): If True, ignores the working status of the provider.
|
||||
ignore_stream_and_auth (bool, optional): If True, ignores the stream and authentication requirement checks.
|
||||
ignore_stream (bool, optional): If True, ignores the stream and authentication requirement checks.
|
||||
patch_provider (callable, optional): Function to modify the provider.
|
||||
**kwargs: Additional keyword arguments.
|
||||
|
||||
@ -118,10 +118,11 @@ class ChatCompletion:
|
||||
ProviderNotWorkingError: If the provider is not operational.
|
||||
StreamNotSupportedError: If streaming is requested but not supported by the provider.
|
||||
"""
|
||||
model, provider = get_model_and_provider(model, provider, stream, ignored, ignore_working, ignore_stream_and_auth)
|
||||
|
||||
if not ignore_stream_and_auth and provider.needs_auth and not auth:
|
||||
raise AuthenticationRequiredError(f'{provider.__name__} requires authentication (use auth=\'cookie or token or jwt ...\' param)')
|
||||
model, provider = get_model_and_provider(
|
||||
model, provider, stream,
|
||||
ignored, ignore_working,
|
||||
ignore_stream or kwargs.get("ignore_stream_and_auth")
|
||||
)
|
||||
|
||||
if auth:
|
||||
kwargs['auth'] = auth
|
||||
@ -135,7 +136,7 @@ class ChatCompletion:
|
||||
provider = patch_provider(provider)
|
||||
|
||||
result = provider.create_completion(model, messages, stream, **kwargs)
|
||||
return result if stream else ''.join(result)
|
||||
return result if stream else ''.join([str(chunk) for chunk in result])
|
||||
|
||||
@staticmethod
|
||||
def create_async(model : Union[Model, str],
|
||||
|
@ -7,9 +7,6 @@ class ProviderNotWorkingError(Exception):
|
||||
class StreamNotSupportedError(Exception):
|
||||
pass
|
||||
|
||||
class AuthenticationRequiredError(Exception):
|
||||
pass
|
||||
|
||||
class ModelNotFoundError(Exception):
|
||||
pass
|
||||
|
||||
@ -37,5 +34,5 @@ class MissingRequirementsError(Exception):
|
||||
class MissingAiohttpSocksError(MissingRequirementsError):
|
||||
pass
|
||||
|
||||
class MissingAccessToken(Exception):
|
||||
class MissingAuthError(Exception):
|
||||
pass
|
@ -154,7 +154,7 @@
|
||||
<option value="Bing">Bing</option>
|
||||
<option value="OpenaiChat">OpenaiChat</option>
|
||||
<option value="HuggingChat">HuggingChat</option>
|
||||
<option value="Bard">Bard</option>
|
||||
<option value="Gemini">Gemini</option>
|
||||
<option value="Liaobots">Liaobots</option>
|
||||
<option value="Phind">Phind</option>
|
||||
<option value="">----</option>
|
||||
|
@ -162,7 +162,7 @@ class Backend_Api:
|
||||
"provider": provider,
|
||||
"messages": messages,
|
||||
"stream": True,
|
||||
"ignore_stream_and_auth": True,
|
||||
"ignore_stream": True,
|
||||
"patch_provider": patch,
|
||||
**kwargs
|
||||
}
|
||||
|
25
g4f/image.py
25
g4f/image.py
@ -46,9 +46,8 @@ def to_image(image: ImageType, is_svg: bool = False) -> Image:
|
||||
return open_image(BytesIO(image))
|
||||
elif not isinstance(image, Image):
|
||||
image = open_image(image)
|
||||
copy = image.copy()
|
||||
copy.format = image.format
|
||||
return copy
|
||||
image.load()
|
||||
return image
|
||||
return image
|
||||
|
||||
def is_allowed_extension(filename: str) -> bool:
|
||||
@ -210,20 +209,28 @@ def format_images_markdown(images, alt: str, preview: str = None) -> str:
|
||||
end_flag = "<!-- generated images end -->\n"
|
||||
return f"\n{start_flag}{images}\n{end_flag}\n"
|
||||
|
||||
def to_bytes(image: Image) -> bytes:
|
||||
def to_bytes(image: ImageType) -> bytes:
|
||||
"""
|
||||
Converts the given image to bytes.
|
||||
|
||||
Args:
|
||||
image (Image.Image): The image to convert.
|
||||
image (ImageType): The image to convert.
|
||||
|
||||
Returns:
|
||||
bytes: The image as bytes.
|
||||
"""
|
||||
bytes_io = BytesIO()
|
||||
image.save(bytes_io, image.format)
|
||||
image.seek(0)
|
||||
return bytes_io.getvalue()
|
||||
if isinstance(image, bytes):
|
||||
return image
|
||||
elif isinstance(image, str):
|
||||
is_data_uri_an_image(image)
|
||||
return extract_data_uri(image)
|
||||
elif isinstance(image, Image):
|
||||
bytes_io = BytesIO()
|
||||
image.save(bytes_io, image.format)
|
||||
image.seek(0)
|
||||
return bytes_io.getvalue()
|
||||
else:
|
||||
return image.read()
|
||||
|
||||
class ImageResponse:
|
||||
def __init__(
|
||||
|
Loading…
Reference in New Issue
Block a user