""" Settings file to hold constants and functions """ import os import random import undetected_chromedriver as uc from config.settings import CAPTCHA_TESTING, USE_PROXY, get_secret from selenium import webdriver from selenium.webdriver import FirefoxOptions from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.ui import WebDriverWait from twocaptcha import TwoCaptcha from whois import whois from whois.parser import PywhoisError def take_snapshot(driver, filename="dump.png"): # Set window size required_width = driver.execute_script( "return document.body.parentNode.scrollWidth" ) required_height = driver.execute_script( "return document.body.parentNode.scrollHeight" ) driver.set_window_size(required_width, required_height + (required_height * 0.05)) # Take the snapshot driver.find_element(By.TAG_NAME, "body").screenshot( "/dumps/" + filename ) # avoids any scrollbars print("Snapshot saved") def dump_html(driver, filename="dump.html"): # Save the page source to error.html with open(("/dumps/" + filename), "w", encoding="utf-8") as file: file.write(driver.page_source) def setup_webdriver(driver_type="chrome", use_proxy=True, use_saved_session=False): # Manual proxy override via .env variable if not USE_PROXY: use_proxy = False if use_proxy: print("Running driver with proxy enabled") else: print("Running driver with proxy disabled") if use_saved_session: print("Running with saved session") else: print("Running without using saved session") if driver_type == "chrome": print("Using Chrome driver") opts = uc.ChromeOptions() if use_saved_session: if os.path.exists("/tmp_chrome_profile"): print("Existing Chrome ephemeral profile found") else: print("No existing Chrome ephemeral profile found") os.system("mkdir /tmp_chrome_profile") if os.path.exists("/chrome"): print("Copying Chrome Profile to ephemeral directory") # Flush any non-essential cache directories from the existing profile as they may balloon in size overtime os.system('rm -rf "/chrome/Selenium Profile/Code Cache/*"') # Create a copy of the Chrome Profile os.system("cp -r /chrome/* /tmp_chrome_profile") try: # Remove some items related to file locks os.remove("/tmp_chrome_profile/SingletonLock") os.remove("/tmp_chrome_profile/SingletonSocket") os.remove("/tmp_chrome_profile/SingletonLock") except: pass else: print("No existing Chrome Profile found. Creating one from scratch") if use_saved_session: # Specify the user data directory opts.add_argument(f"--user-data-dir=/tmp_chrome_profile") opts.add_argument("--profile-directory=Selenium Profile") # Set proxy if use_proxy: opts.add_argument( f'--proxy-server=socks5://{get_secret("PROXY_IP")}:{get_secret("PROXY_PORT_IP_AUTH")}' ) opts.add_argument("--disable-extensions") opts.add_argument("--disable-application-cache") opts.add_argument("--disable-setuid-sandbox") opts.add_argument("--disable-dev-shm-usage") opts.add_argument("--disable-gpu") opts.add_argument("--no-sandbox") opts.add_argument("--headless=new") driver = uc.Chrome(options=opts) elif driver_type == "firefox": print("Using firefox driver") opts = FirefoxOptions() if use_saved_session: if not os.path.exists("/firefox"): print("No profile found") os.makedirs("/firefox") else: print("Existing profile found") # Specify a profile if it exists opts.profile = "/firefox" # Set proxy if use_proxy: opts.set_preference("network.proxy.type", 1) opts.set_preference("network.proxy.socks", get_secret("PROXY_IP")) opts.set_preference( "network.proxy.socks_port", int(get_secret("PROXY_PORT_IP_AUTH")) ) opts.set_preference("network.proxy.socks_remote_dns", False) opts.add_argument("--disable-dev-shm-usage") opts.add_argument("--headless") opts.add_argument("--disable-gpu") driver = webdriver.Firefox(options=opts) driver.maximize_window() # Check if proxy is working driver.get("https://api.ipify.org/") body = WebDriverWait(driver, 10).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) ip_address = body.text print(f"External IP: {ip_address}") return driver # These are wrapper function for quickly automating multiple steps in webscraping (logins, button presses, text inputs, etc.) # Depending on your use case, you may have to opt out of using this # Function to get the element once it has loaded in def get_element(driver, by, key, hidden_element=False, timeout=8): try: # Convert string-based locators to By objects (By.XPATH, By.CSS, etc.) if isinstance(by, str): by = getattr(By, by.upper()) wait = WebDriverWait(driver, timeout=timeout) if not hidden_element: element = wait.until( EC.element_to_be_clickable((by, key)) and EC.visibility_of_element_located((by, key)) ) else: element = wait.until(EC.presence_of_element_located((by, key))) return element except Exception: dump_html(driver) take_snapshot(driver) driver.close() driver.quit() raise Exception(f"Unable to get element of {by} value: {key}") def get_elements(driver, by, key, hidden_element=False, timeout=8): try: # Convert string-based locators to By objects (By.XPATH, By.CSS, etc.) if isinstance(by, str): by = getattr(By, by.upper()) wait = WebDriverWait(driver, timeout=timeout) if hidden_element: elements = wait.until(EC.presence_of_all_elements_located((by, key))) else: visible_elements = wait.until( EC.visibility_of_any_elements_located((by, key)) ) elements = [element for element in visible_elements if element.is_enabled()] return elements except Exception: dump_html(driver) take_snapshot(driver) driver.close() driver.quit() raise Exception(f"Unable to get elements of {by} value: {key}") def execute_selenium_elements(driver, timeout, elements): try: for index, element in enumerate(elements): print("Waiting...") # Element may have a keyword specified, check if that exists before running any actions if "keyword" in element: # Skip a step if the keyword does not exist if element["keyword"] not in driver.page_source: print( f'Keyword {element["keyword"]} does not exist. Skipping step: {index+1} - {element["name"]}' ) continue elif ( element["keyword"] in driver.page_source and element["type"] == "skip" ): print( f'Keyword {element["keyword"]} does exists. Stopping at step: {index+1} - {element["name"]}' ) break print(f'Step: {index+1} - {element["name"]}') # Revert to default iframe action if element["type"] == "revert_default_frame": driver.switch_to.default_content() continue # CAPTCHA Callback elif element["type"] == "recaptchav2_callback": if callable(element["input"]): values = element["input"]() else: values = element["input"] if type(values) is list: raise Exception('Invalid input value specified for "callback" type') else: # For single input values driver.execute_script(f'onRecaptcha("{values}");') continue try: # Try to get default element if "hidden" in element: site_element = get_element( driver, element["default"]["type"], element["default"]["key"], hidden_element=True, timeout=timeout, ) else: site_element = get_element( driver, element["default"]["type"], element["default"]["key"], timeout=timeout, ) except Exception: print(f"Failed to find primary element") # If that fails, try to get the failover one print("Trying to find legacy element") if "hidden" in element: site_element = get_element( driver, element["failover"]["type"], element["failover"]["key"], hidden_element=True, timeout=timeout, ) else: site_element = get_element( driver, element["failover"]["type"], element["failover"]["key"], timeout=timeout, ) # Clicking an element if element["type"] == "click": site_element.click() # Switching to an element frame/iframe elif element["type"] == "switch_to_iframe_click": driver.switch_to.frame(site_element) # Input type simulates user typing elif element["type"] == "input": if callable(element["input"]): values = element["input"]() else: values = element["input"] values = values.splitlines() # For multiple input values for index, value in enumerate(values): site_element.send_keys(value) # Only send a new line keypress if this is not the last value to enter in the list if index != len(values) - 1: site_element.send_keys(Keys.RETURN) elif element["type"] == "input_enter": site_element.send_keys(Keys.RETURN) # Input_replace type places values directly. Useful for CAPTCHA elif element["type"] == "input_replace": if callable(element["input"]): values = element["input"]() else: values = element["input"] if type(values) is list: raise Exception( 'Invalid input value specified for "input_replace" type' ) else: # For single input values driver.execute_script( f'arguments[0].value = "{values}";', site_element ) except Exception as e: take_snapshot(driver) dump_html(driver) driver.close() driver.quit() raise Exception(e) def solve_captcha( site_key, url, retry_attempts=3, version="v2", enterprise=False, use_proxy=True ): # Manual proxy override set via $ENV if not USE_PROXY: use_proxy = False if CAPTCHA_TESTING: print("Initializing CAPTCHA solver in dummy mode") code = random.randint() print("CAPTCHA Successful") return code elif use_proxy: print("Using CAPTCHA solver with proxy") else: print("Using CAPTCHA solver without proxy") captcha_params = { "url": url, "sitekey": site_key, "version": version, "enterprise": 1 if enterprise else 0, "proxy": ( {"type": "socks5", "uri": get_secret("PROXY_USER_AUTH")} if use_proxy else None ), } # Keep retrying until max attempts is reached for _ in range(retry_attempts): # Solver uses 2CAPTCHA by default solver = TwoCaptcha(get_secret("CAPTCHA_API_KEY")) try: print("Waiting for CAPTCHA code...") code = solver.recaptcha(**captcha_params)["code"] print("CAPTCHA Successful") return code except Exception as e: print(f"CAPTCHA Failed! {e}") raise Exception(f"CAPTCHA API Failed!") def whois_lookup(url): try: lookup_info = whois(url) # TODO: Add your own processing here except PywhoisError: print(f"No WhoIs record found for {url}") return lookup_info def save_browser_session(driver): # Copy over the profile once we finish logging in if isinstance(driver, webdriver.Firefox): # Copy process for Firefox print("Updating saved Firefox profile") # Get the current profile directory from about:support page driver.get("about:support") box = get_element(driver, "id", "profile-dir-box", timeout=4) temp_profile_path = os.path.join(os.getcwd(), box.text) profile_path = "/firefox" # Create the command copy_command = "cp -r " + temp_profile_path + "/* " + profile_path # Copy over the Firefox profile if os.system(copy_command): print("Firefox profile saved") elif isinstance(driver, uc.Chrome): # Copy the Chrome profile print("Updating non-ephemeral Chrome profile") # Flush Code Cache again to speed up copy os.system('rm -rf "/tmp_chrome_profile/SimpleDMCA Profile/Code Cache/*"') if os.system("cp -r /tmp_chrome_profile/* /chrome"): print("Chrome profile saved") # Sample function # Call this within a Celery task # TODO: Modify as needed to your needs def selenium_action_template(driver): # Data that might need to be entered during webscraping info = { "sample_field1": "sample_data", "sample_field2": "sample_data", "captcha_code": lambda: solve_captcha("SITE_KEY", "SITE_URL"), } elements = [ { "name": "Enter data for sample field 1", "type": "input", "input": "{sample_field1}", # If a site implements canary design releases, you can place the ID for the element in the new design "default": { # See get_element() for possible selector types "type": "xpath", "key": "", }, # If a site implements canary design releases, you can place the ID for the element in the old design here "failover": {"type": "xpath", "key": ""}, }, ] # Dictionary to store values which will be entered via Selenium # Helps prevent duplicates and stale values compared to just using the info variable directly site_form_values = {} # Fill in final fstring values in elements for element in elements: if "input" in element and "{" in element["input"]: a = element["input"].strip("{}") if a in info: value = info[a] # Check if the value is a callable (a lambda function) and call it if so if callable(value): # Check if the value has already been called if a not in site_form_values: # Call the value and store it in the dictionary site_form_values[a] = value() # Use the stored value value = site_form_values[a] # Replace the placeholder with the actual value element["input"] = str(value) # Execute the selenium actions execute_selenium_elements(driver, 8, elements) # Sample task for Google search def google_search(driver, search_term): info = { "search_term": search_term, } elements = [ { "name": "Type in search term", "type": "input", "input": "{search_term}", "default": {"type": "xpath", "key": '//*[@id="APjFqb"]'}, "failover": {"type": "xpath", "key": '//*[@id="APjFqb"]'}, }, { "name": "Press enter", "type": "input_enter", "default": {"type": "xpath", "key": '//*[@id="APjFqb"]'}, "failover": {"type": "xpath", "key": '//*[@id="APjFqb"]'}, }, ] site_form_values = {} for element in elements: if "input" in element and "{" in element["input"]: a = element["input"].strip("{}") if a in info: value = info[a] if callable(value): if a not in site_form_values: site_form_values[a] = value() value = site_form_values[a] element["input"] = str(value) execute_selenium_elements(driver, 8, elements)