DRF_Template/backend/webdriver/utils.py

391 lines
15 KiB
Python
Raw Normal View History

"""
Helper functions for setting up Selenium webdrivers, running scripted page
actions, solving CAPTCHAs and performing WhoIs lookups
"""
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
import os
from config.settings import get_secret
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver import FirefoxOptions
from selenium import webdriver
import undetected_chromedriver as uc
from config.settings import USE_PROXY, CAPTCHA_TESTING
from config.settings import get_secret
from twocaptcha import TwoCaptcha
from whois import whois
from whois.parser import PywhoisError
def take_snapshot(driver, filename='dump.png'):
    """
    Save a screenshot of the whole page to ``/dumps/<filename>``.

    The browser window is first resized to the document's scroll width and
    to its scroll height plus 5%, then the ``<body>`` element is captured.

    Args:
        driver: Selenium webdriver currently showing the page.
        filename: Name of the image file written inside ``/dumps/``.
    """
    # Measure the full scrollable extent of the document
    scroll_width_js = 'return document.body.parentNode.scrollWidth'
    scroll_height_js = 'return document.body.parentNode.scrollHeight'
    page_width = driver.execute_script(scroll_width_js)
    page_height = driver.execute_script(scroll_height_js)

    # Grow the window so the entire page fits (with 5% extra height)
    padded_height = page_height+(page_height*0.05)
    driver.set_window_size(page_width, padded_height)

    # Capturing the body element avoids any scrollbars
    page_body = driver.find_element(By.TAG_NAME, 'body')
    page_body.screenshot('/dumps/'+filename)
    print('Snapshot saved')
def dump_html(driver, filename='dump.html'):
    """
    Write the driver's current page source to ``/dumps/<filename>``.

    Args:
        driver: Selenium webdriver whose ``page_source`` is saved.
        filename: Name of the UTF-8 encoded file written inside ``/dumps/``.
    """
    destination = '/dumps/'+filename
    with open(destination, mode='w', encoding='utf-8') as html_dump:
        html_dump.write(driver.page_source)
def setup_webdriver(driver_type="chrome", use_proxy=True, use_saved_session=False):
    """
    Start a headless Selenium driver and print its external IP address.

    Args:
        driver_type: ``"chrome"`` (undetected_chromedriver) or ``"firefox"``.
        use_proxy: Route traffic through the SOCKS5 proxy described by the
            ``PROXY_IP`` / ``PROXY_PORT_IP_AUTH`` secrets. Forced to False
            when the ``USE_PROXY`` setting is falsy.
        use_saved_session: Reuse a stored browser profile. Chrome runs from
            ``/tmp_chrome_profile`` (seeded from ``/chrome`` when missing);
            Firefox uses ``/firefox``.

    Returns:
        The started driver, after it has loaded https://api.ipify.org/.

    Raises:
        ValueError: If ``driver_type`` is neither "chrome" nor "firefox".
    """
    # Local import keeps this block self-contained
    import shutil

    # Fail early instead of hitting an unbound ``driver`` further down
    if driver_type not in ("chrome", "firefox"):
        raise ValueError(f'Unsupported driver_type: {driver_type!r}')

    # Manual proxy override via .env variable
    if not USE_PROXY:
        use_proxy = False
    if use_proxy:
        print('Running driver with proxy enabled')
    else:
        print('Running driver with proxy disabled')
    if use_saved_session:
        print('Running with saved session')
    else:
        print('Running without using saved session')

    if driver_type == "chrome":
        print('Using Chrome driver')
        opts = uc.ChromeOptions()
        if use_saved_session:
            # NOTE(review): nesting reconstructed from an indentation-stripped
            # source; confirm the copy should only run when the ephemeral
            # profile directory does not exist yet.
            if os.path.exists("/tmp_chrome_profile"):
                print('Existing Chrome ephemeral profile found')
            else:
                print('No existing Chrome ephemeral profile found')
                os.makedirs("/tmp_chrome_profile", exist_ok=True)
                if os.path.exists('/chrome'):
                    print('Copying Chrome Profile to ephemeral directory')
                    # Flush the code cache from the stored profile as it may
                    # balloon in size over time. A quoted shell glob is never
                    # expanded, so remove the directory directly.
                    shutil.rmtree(
                        '/chrome/Selenium Profile/Code Cache',
                        ignore_errors=True)
                    # Create a copy of the Chrome Profile
                    os.system("cp -r /chrome/* /tmp_chrome_profile")
                    # Remove copied singleton files one by one so a missing
                    # file does not stop the others from being removed
                    for lock_name in ('SingletonLock', 'SingletonSocket',
                                      'SingletonCookie'):
                        try:
                            os.remove(os.path.join(
                                '/tmp_chrome_profile', lock_name))
                        except OSError:
                            pass
                else:
                    print('No existing Chrome Profile found. Creating one from scratch')
            # Specify the user data directory
            opts.add_argument('--user-data-dir=/tmp_chrome_profile')
            opts.add_argument('--profile-directory=Selenium Profile')
        # Set proxy
        if use_proxy:
            opts.add_argument(
                f'--proxy-server=socks5://{get_secret("PROXY_IP")}:{get_secret("PROXY_PORT_IP_AUTH")}')
        opts.add_argument("--disable-extensions")
        opts.add_argument('--disable-application-cache')
        opts.add_argument("--disable-setuid-sandbox")
        opts.add_argument('--disable-dev-shm-usage')
        opts.add_argument("--disable-gpu")
        opts.add_argument("--no-sandbox")
        opts.add_argument("--headless=new")
        driver = uc.Chrome(options=opts)
    else:
        print('Using firefox driver')
        opts = FirefoxOptions()
        if use_saved_session:
            if not os.path.exists("/firefox"):
                print('No profile found')
                os.makedirs("/firefox")
            else:
                print('Existing profile found')
            # Specify a profile if it exists
            opts.profile = "/firefox"
        # Set proxy
        if use_proxy:
            opts.set_preference('network.proxy.type', 1)
            opts.set_preference('network.proxy.socks',
                                get_secret('PROXY_IP'))
            opts.set_preference('network.proxy.socks_port',
                                int(get_secret('PROXY_PORT_IP_AUTH')))
            opts.set_preference('network.proxy.socks_remote_dns', False)
        opts.add_argument('--disable-dev-shm-usage')
        opts.add_argument("--headless")
        opts.add_argument("--disable-gpu")
        driver = webdriver.Firefox(options=opts)

    driver.maximize_window()
    # Check if proxy is working
    driver.get('https://api.ipify.org/')
    body = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located((By.TAG_NAME, "body")))
    ip_address = body.text
    print(f'External IP: {ip_address}')
    return driver
# Function to get the element once it has loaded in
def get_element(driver, by, key, hidden_element=False, timeout=8):
    """
    Wait for an element to load and return it.

    Args:
        driver: Selenium webdriver to search in.
        by: Selector kind: "xpath", "css", "id", "tagname", "name" or
            "classname". Any other value is passed to Selenium unchanged.
        key: Selector value matching ``by``.
        hidden_element: When True only wait for the element to be present
            in the DOM; otherwise wait until it is clickable.
        timeout: Seconds to wait before giving up.

    Returns:
        The located element.

    Raises:
        Exception: If the element could not be obtained within ``timeout``.
    """
    selector_aliases = {
        "xpath": By.XPATH,
        "css": By.CSS_SELECTOR,
        "id": By.ID,
        "tagname": By.TAG_NAME,
        "name": By.NAME,
        "classname": By.CLASS_NAME,
    }
    try:
        by = selector_aliases.get(by, by)
        wait = WebDriverWait(driver, timeout=timeout)
        if hidden_element:
            condition = EC.presence_of_element_located((by, key))
        else:
            # ``cond_a and cond_b`` would only keep cond_b. Clickable already
            # requires the element to be visible and enabled.
            condition = EC.element_to_be_clickable((by, key))
        return wait.until(condition)
    except Exception as error:
        raise Exception(
            f"Unable to get element of {by} value: {key}") from error
def execute_selenium_elements(driver, timeout, elements):
    """
    Run a list of scripted steps against the page loaded in ``driver``.

    Each step is a dict with a "name" and a "type", plus depending on the
    type: "input" (a value or a zero-argument callable), "default" and
    "failover" selectors (dicts with "type" and "key"), an optional
    "keyword" and an optional "hidden" marker.

    Step types handled here: "revert_default_frame", "recaptchav2_callback",
    "click", "switch_to_iframe_click", "input", "input_enter",
    "input_replace" and, together with "keyword", "skip".

    Args:
        driver: Selenium webdriver to act on.
        timeout: Seconds to wait for each element lookup.
        elements: Iterable of step dicts, executed in order.

    Raises:
        Exception: Wrapping any error raised while running the steps. Before
            re-raising, a snapshot and HTML dump are attempted and the driver
            is closed and quit.
    """
    try:
        for step_number, element in enumerate(elements, start=1):
            print('Waiting...')
            # Element may have a keyword specified, check if that exists before running any actions
            if "keyword" in element:
                # Skip a step if the keyword does not exist
                if element['keyword'] not in driver.page_source:
                    print(
                        f'Keyword {element["keyword"]} does not exist. Skipping step: {step_number} - {element["name"]}')
                    continue
                if element['type'] == 'skip':
                    print(
                        f'Keyword {element["keyword"]} does exists. Stopping at step: {step_number} - {element["name"]}')
                    break
            print(f'Step: {step_number} - {element["name"]}')
            step_type = element["type"]

            # Revert to default iframe action
            if step_type == "revert_default_frame":
                driver.switch_to.default_content()
                continue
            # CAPTCHA Callback
            if step_type == "recaptchav2_callback":
                token = _resolve_step_input(element)
                if isinstance(token, list):
                    raise Exception(
                        'Invalid input value specified for "callback" type')
                # Pass the value as a script argument rather than splicing it
                # into the JavaScript source, so quotes cannot break it
                driver.execute_script('onRecaptcha(arguments[0]);', token)
                continue

            site_element = _find_step_element(driver, element, timeout)

            # Clicking an element
            if step_type == "click":
                site_element.click()
            # Switching to an element frame/iframe
            elif step_type == "switch_to_iframe_click":
                driver.switch_to.frame(site_element)
            # Input type simulates user typing
            elif step_type == "input":
                lines = _resolve_step_input(element).splitlines()
                for line_number, line in enumerate(lines):
                    site_element.send_keys(line)
                    # Only send a new line keypress if this is not the last value to enter in the list
                    if line_number != len(lines) - 1:
                        site_element.send_keys(Keys.RETURN)
            elif step_type == "input_enter":
                site_element.send_keys(Keys.RETURN)
            # Input_replace type places values directly. Useful for CAPTCHA
            elif step_type == "input_replace":
                replacement = _resolve_step_input(element)
                if isinstance(replacement, list):
                    raise Exception(
                        'Invalid input value specified for "input_replace" type')
                driver.execute_script(
                    'arguments[0].value = arguments[1];',
                    site_element, replacement)
    except Exception as e:
        _shutdown_failed_driver(driver)
        raise Exception(e) from e


def _resolve_step_input(element):
    """Return a step's "input", calling it first when it is a callable."""
    step_input = element["input"]
    return step_input() if callable(step_input) else step_input


def _find_step_element(driver, element, timeout):
    """Locate a step's element via its "default" selector, else "failover"."""
    hidden = "hidden" in element
    try:
        selector = element["default"]
        return get_element(
            driver, selector["type"], selector["key"],
            hidden_element=hidden, timeout=timeout)
    except Exception:
        print('Failed to find primary element')
        # Without a failover selector, surface the original lookup error
        if "failover" not in element:
            raise
        print('Trying to find legacy element')
        selector = element["failover"]
        return get_element(
            driver, selector["type"], selector["key"],
            hidden_element=hidden, timeout=timeout)


def _shutdown_failed_driver(driver):
    """Save debugging dumps, then close and quit the driver, best effort."""
    try:
        take_snapshot(driver)
        dump_html(driver)
    except Exception as dump_error:
        # A failed dump must not hide the original error or leak the browser
        print(f'Failed to save debugging dumps: {dump_error}')
    try:
        driver.close()
    except Exception as close_error:
        print(f'Failed to close driver window: {close_error}')
    try:
        driver.quit()
    except Exception as quit_error:
        print(f'Failed to quit driver: {quit_error}')
def solve_captcha(site_key, url, retry_attempts=3, version='v2', enterprise=False, use_proxy=True):
    """
    Request a reCAPTCHA solution code from the 2CAPTCHA service.

    Args:
        site_key: reCAPTCHA site key of the target page.
        url: URL of the page showing the CAPTCHA.
        retry_attempts: Number of solve attempts before giving up.
        version: reCAPTCHA version string passed to the solver.
        enterprise: Whether to flag the request as an enterprise CAPTCHA.
        use_proxy: Send the ``PROXY_USER_AUTH`` SOCKS5 proxy to the solver.
            Forced to False when the ``USE_PROXY`` setting is falsy.

    Returns:
        The solution code, or the dummy code "12345" when the
        ``CAPTCHA_TESTING`` setting is enabled.

    Raises:
        Exception: If every attempt fails.
    """
    # Manual proxy override set via $ENV
    proxy_enabled = use_proxy if USE_PROXY else False

    # Dummy mode short-circuits before any solver is contacted
    if CAPTCHA_TESTING:
        print('Initializing CAPTCHA solver in dummy mode')
        print("CAPTCHA Successful")
        return "12345"

    if proxy_enabled:
        print('Using CAPTCHA solver with proxy')
        proxy_settings = {
            'type': 'socks5',
            'uri': get_secret('PROXY_USER_AUTH'),
        }
    else:
        print('Using CAPTCHA solver without proxy')
        proxy_settings = None

    captcha_params = dict(
        url=url,
        sitekey=site_key,
        version=version,
        enterprise=int(bool(enterprise)),
        proxy=proxy_settings,
    )

    # Keep retrying until max attempts is reached
    for _attempt in range(retry_attempts):
        # Solver uses 2CAPTCHA by default
        solver = TwoCaptcha(get_secret("CAPTCHA_API_KEY"))
        try:
            print('Waiting for CAPTCHA code...')
            solution = solver.recaptcha(**captcha_params)
            code = solution["code"]
            print("CAPTCHA Successful")
            return code
        except Exception as error:
            print(f'CAPTCHA Failed! {error}')

    raise Exception("CAPTCHA API Failed!")
def whois_lookup(url):
    """
    Look up the WhoIs record for ``url``.

    Args:
        url: Domain or URL handed to ``whois``.

    Returns:
        The record returned by ``whois``, or None when the lookup raises
        ``PywhoisError`` (no record found).
    """
    try:
        lookup_info = whois(url)
        # TODO: Add your own processing here
    except PywhoisError:
        print(f"No WhoIs record found for {url}")
        # Return explicitly: ``lookup_info`` was never bound on this path
        return None
    return lookup_info
def save_browser_session(driver):
    """
    Copy the running browser's profile back to its persistent location.

    For Firefox drivers the profile directory is read from the
    ``about:support`` page and copied into ``/firefox``. For
    undetected_chromedriver Chrome drivers ``/tmp_chrome_profile`` is copied
    into ``/chrome``. Other driver types are ignored.

    Args:
        driver: The Selenium driver whose session should be saved.
    """
    # Local imports keep this block self-contained
    import shlex
    import shutil

    # Copy over the profile once we finish logging in
    if isinstance(driver, webdriver.Firefox):
        # Copy process for Firefox
        print('Updating saved Firefox profile')
        # Get the current profile directory from about:support page
        driver.get("about:support")
        box = get_element(
            driver, "id", "profile-dir-box", timeout=4)
        temp_profile_path = os.path.join(os.getcwd(), box.text)
        profile_path = '/firefox'
        # Quote the paths so spaces cannot split the command; the glob stays
        # outside the quotes so the shell still expands it
        copy_command = (
            f"cp -r {shlex.quote(temp_profile_path)}/* "
            f"{shlex.quote(profile_path)}")
        # os.system returns the exit status: 0 means the copy succeeded
        if os.system(copy_command) == 0:
            print("Firefox profile saved")
        else:
            print("Failed to save Firefox profile")
    elif isinstance(driver, uc.Chrome):
        # Copy the Chrome profile
        print('Updating non-ephemeral Chrome profile')
        # Flush Code Cache again to speed up copy. The profile directory is
        # "Selenium Profile" (see --profile-directory in setup_webdriver),
        # and a quoted shell glob is never expanded, so remove it directly.
        shutil.rmtree(
            '/tmp_chrome_profile/Selenium Profile/Code Cache',
            ignore_errors=True)
        if os.system("cp -r /tmp_chrome_profile/* /chrome") == 0:
            print("Chrome profile saved")
        else:
            print("Failed to save Chrome profile")
# Sample function
# Call this within a Celery task
# TODO: Modify as needed to your needs
def selenium_action_template(driver):
    """
    Sample flow: fill placeholders in a step list and execute the steps.

    Each step whose "input" is a string placeholder such as
    ``"{sample_field1}"`` has it replaced with the matching ``info`` value.
    Callable ``info`` values are called at most once and the result reused.
    The finished steps are then passed to ``execute_selenium_elements`` with
    a timeout of 8.

    Args:
        driver: Selenium webdriver the steps are executed against.
    """
    info = {
        "sample_field1": "sample_data",
        "sample_field2": "sample_data",
        "captcha_code": lambda: solve_captcha('SITE_KEY', 'SITE_URL')
    }
    elements = [
        {
            "name": "Enter data for sample field 1",
            "type": "input",
            # Placeholder must name a key of ``info`` to be substituted
            "input": "{sample_field1}",
            # If a site implements canary design releases, you can place the ID for the element in the new design
            "default": {
                # See get_element() for possible selector types
                "type": "xpath",
                "key": ''
            },
            # If a site implements canary design releases, you can place the ID for the element in the old design here
            "failover": {
                "type": "xpath",
                "key": ''
            }
        },
    ]
    # Dictionary to store values which will be entered via Selenium
    # Helps prevent duplicates and stale values compared to just using the info variable directly
    site_form_values = {}
    # Fill in final placeholder values in elements
    for element in elements:
        placeholder = element.get('input')
        # Inputs may also be callables; only string placeholders are filled
        if not isinstance(placeholder, str) or '{' not in placeholder:
            continue
        field = placeholder.strip('{}')
        if field not in info:
            continue
        value = info[field]
        # Call callables (e.g. lambdas) once and reuse the stored result
        if callable(value):
            if field not in site_form_values:
                site_form_values[field] = value()
            value = site_form_values[field]
        # Replace the placeholder with the actual value
        element['input'] = str(value)
    # Execute the selenium actions
    execute_selenium_elements(driver, 8, elements)