2023-11-20 16:00:40 +03:00
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
from platformdirs import user_config_dir
|
2023-12-02 07:40:07 +03:00
|
|
|
from selenium.webdriver.remote.webdriver import WebDriver
|
|
|
|
from undetected_chromedriver import Chrome, ChromeOptions
|
2023-12-07 09:18:05 +03:00
|
|
|
from selenium.webdriver.common.by import By
|
|
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
|
|
from os import path
|
2023-12-06 11:35:36 +03:00
|
|
|
from . import debug
|
2023-12-02 07:40:07 +03:00
|
|
|
|
2023-11-20 16:00:40 +03:00
|
|
|
try:
|
|
|
|
from pyvirtualdisplay import Display
|
|
|
|
has_pyvirtualdisplay = True
|
|
|
|
except ImportError:
|
|
|
|
has_pyvirtualdisplay = False
|
|
|
|
|
|
|
|
def get_browser(
    user_data_dir: str = None,
    headless: bool = False,
    proxy: str = None,
    options: ChromeOptions = None
) -> WebDriver:
    """Create an undetected-chromedriver ``Chrome`` instance.

    Args:
        user_data_dir: Browser profile directory. When ``None``, the
            per-user config directory for "g4f" (via ``user_config_dir``)
            is used instead.
        headless: Forwarded unchanged to ``Chrome(headless=...)``.
        proxy: When truthy, added to the options as
            ``--proxy-server=<proxy>``.
        options: Options object to start from. A fresh ``ChromeOptions``
            is created when this is falsy.

    Returns:
        The newly created ``Chrome`` driver.
    """
    # Identity check: only a missing argument falls back to the default
    # directory; any explicitly passed value is forwarded as given.
    if user_data_dir is None:
        user_data_dir = user_config_dir("g4f")
    if user_data_dir and debug.logging:
        print("Open browser with config dir:", user_data_dir)
    if not options:
        options = ChromeOptions()
    if proxy:
        options.add_argument(f'--proxy-server={proxy}')
    # Use the system chromedriver when it exists; otherwise pass None.
    # NOTE(review): presumably undetected_chromedriver then provides its
    # own driver binary - confirm against its documentation.
    driver = '/usr/bin/chromedriver'
    if not path.isfile(driver):
        driver = None
    return Chrome(
        options=options,
        user_data_dir=user_data_dir,
        driver_executable_path=driver,
        headless=headless
    )
|
|
|
|
|
|
|
|
def bypass_cloudflare(driver: WebDriver, url: str, timeout: int) -> None:
    """Open *url* and wait until the page body is no longer marked "no-js".

    If the loaded page's ``<body>`` has exactly the class ``no-js``, the
    page is treated as a Cloudflare challenge: the function makes a
    best-effort attempt to click the input inside the
    ``#turnstile-wrapper`` iframe. Failures of that attempt are ignored.
    In every case it then waits up to *timeout* seconds for a body without
    the ``no-js`` class.

    Args:
        driver: Browser to drive.
        url: Address to open.
        timeout: Seconds to wait for the final (unprotected) page.

    Raises:
        Whatever ``WebDriverWait.until`` raises when the final wait does
        not succeed within *timeout*.
    """
    # Open website
    driver.get(url)
    # Is cloudflare protection
    if driver.find_element(By.TAG_NAME, "body").get_attribute("class") == "no-js":
        if debug.logging:
            print("Cloudflare protection detected:", url)
        try:
            # Click button in iframe. ``until`` returns the located
            # element, so it is used directly instead of a second lookup.
            iframe = WebDriverWait(driver, 5).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, "#turnstile-wrapper iframe"))
            )
            driver.switch_to.frame(iframe)
            checkbox = WebDriverWait(driver, 5).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, "#challenge-stage input"))
            )
            checkbox.click()
        except Exception as error:
            # Best effort only: the challenge may resolve without a click.
            # ``Exception`` (not a bare except) keeps KeyboardInterrupt and
            # SystemExit propagating.
            if debug.logging:
                print("Cloudflare challenge click failed:", error)
        finally:
            # Always leave the iframe so the wait below sees the top document.
            driver.switch_to.default_content()
    # No cloudflare protection
    WebDriverWait(driver, timeout).until(
        EC.presence_of_element_located((By.CSS_SELECTOR, "body:not(.no-js)"))
    )
|
2023-11-20 16:00:40 +03:00
|
|
|
|
|
|
|
class WebDriverSession():
    """Context manager that hands out a WebDriver and cleans it up on exit.

    If a ``webdriver`` was supplied, ``__enter__`` returns it untouched and
    the session never closes it. Otherwise a browser is created with
    ``get_browser`` and closed/quit in ``__exit__``.
    """

    def __init__(
        self,
        webdriver: WebDriver = None,
        user_data_dir: str = None,
        headless: bool = False,
        virtual_display: bool = False,
        proxy: str = None,
        options: ChromeOptions = None
    ):
        """Store the session settings; no browser is started here.

        Args:
            webdriver: Existing driver to hand out instead of creating one.
            user_data_dir: Profile directory forwarded to ``get_browser``.
            headless: Headless flag forwarded to ``get_browser``.
            virtual_display: Request a 1920x1080 ``Display``; ignored when
                pyvirtualdisplay could not be imported.
            proxy: Proxy forwarded to ``get_browser``.
            options: Options forwarded to ``get_browser`` by ``__enter__``.
        """
        self.webdriver = webdriver
        self.user_data_dir = user_data_dir
        self.headless = headless
        # Holds the Display object (not the boolean flag), or None.
        self.virtual_display = None
        if has_pyvirtualdisplay and virtual_display:
            self.virtual_display = Display(size=(1920, 1080))
        self.proxy = proxy
        self.options = options
        # Driver created by this session; only this one is quit on exit.
        self.default_driver = None

    def reopen(
        self,
        user_data_dir: str = None,
        headless: bool = False,
        virtual_display: bool = False
    ) -> WebDriver:
        """Quit the session's own driver (if any) and start a new browser.

        Args:
            user_data_dir: Profile directory; falls back to the one given
                at construction when ``None``.
            headless: Headless flag for the new browser.
            virtual_display: When false, an existing virtual display is
                stopped and dropped before the new browser starts.

        Returns:
            The newly created driver, also stored as ``default_driver``.
        """
        if user_data_dir is None:
            user_data_dir = self.user_data_dir
        if self.default_driver:
            self.default_driver.quit()
        if not virtual_display and self.virtual_display:
            self.virtual_display.stop()
            self.virtual_display = None
        # ``self.options`` is deliberately not passed here, matching the
        # original call. NOTE(review): presumably because an options object
        # cannot be reused for a second Chrome instance - confirm.
        self.default_driver = get_browser(user_data_dir, headless, self.proxy)
        return self.default_driver

    def __enter__(self) -> WebDriver:
        """Return the supplied driver, or start the display and a new browser."""
        if self.webdriver:
            return self.webdriver
        if self.virtual_display:
            self.virtual_display.start()
        try:
            self.default_driver = get_browser(self.user_data_dir, self.headless, self.proxy, self.options)
        except BaseException:
            # __exit__ is not called when __enter__ raises, so stop the
            # display started above here before re-raising.
            if self.virtual_display:
                self.virtual_display.stop()
            raise
        return self.default_driver

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close and quit the session's own driver, then stop the display."""
        # NOTE(review): when a ``webdriver`` was supplied, __enter__ never
        # starts the display, yet it is still stopped below - verify that
        # stopping a never-started Display is harmless.
        try:
            if self.default_driver:
                try:
                    self.default_driver.close()
                except Exception:
                    # The window may already be gone; quit() still runs.
                    pass
                self.default_driver.quit()
        finally:
            # Stop the display even if quit() raised.
            if self.virtual_display:
                self.virtual_display.stop()
|