Clean up docker-compose and run Black formatter over entire codebase

This commit is contained in:
Keannu Christian Bernasol 2024-10-30 22:09:58 +08:00
parent 6c232b3e89
commit 069aba80b1
60 changed files with 1946 additions and 1485 deletions

View file

@ -2,5 +2,5 @@ from django.apps import AppConfig
class EmailsConfig(AppConfig):
    """Django application configuration for the ``webdriver`` app."""

    # NOTE(review): the class is called EmailsConfig although it configures the
    # "webdriver" app; it looks copied from another app. The name is kept
    # because settings may reference this class by its dotted path - confirm
    # before renaming.

    # Field type used for implicit primary keys on this app's models.
    default_auto_field = "django.db.models.BigAutoField"
    # Python path of the application this configuration applies to.
    name = "webdriver"

View file

@ -1,11 +1,19 @@
from celery import shared_task
from webdriver.utils import setup_webdriver, selenium_action_template, google_search, get_element, get_elements
from webdriver.utils import (
setup_webdriver,
selenium_action_template,
google_search,
get_element,
get_elements,
)
from selenium.webdriver.common.by import By
from search_results.tasks import create_search_result
# Task template
@shared_task(autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 5})
@shared_task(
autoretry_for=(Exception,), retry_kwargs={"max_retries": 3, "countdown": 5}
)
def sample_selenium_task():
driver = setup_webdriver(use_proxy=False, use_saved_session=False)
@ -18,27 +26,29 @@ def sample_selenium_task():
driver.close()
driver.quit()
# Sample task to scrape Google for search results based on a keyword
@shared_task(
    autoretry_for=(Exception,), retry_kwargs={"max_retries": 3, "countdown": 5}
)
def simple_google_search():
    """Search Google for a fixed term and queue one task per result found.

    Starts a Firefox driver without proxy or saved session, runs the search
    through ``google_search`` and, for every child element of the results
    container, reads the ``<h3>`` text and the first ``<a>`` href and passes
    them to ``create_search_result`` as an asynchronous task.

    Raises:
        Exception: anything raised while driving the browser propagates to
            Celery, which retries the task (at most 3 retries, 5 second
            countdown, per the decorator).
    """
    driver = setup_webdriver(
        driver_type="firefox", use_proxy=False, use_saved_session=False
    )
    try:
        driver.get("https://google.com/")
        google_search(driver, search_term="cat blog posts")

        # Each direct child of the results container is treated as one result
        search_items = get_elements(driver, "xpath", '//*[@id="search"]/div[1]/div[1]/*')
        for item in search_items:
            title = item.find_element(By.TAG_NAME, "h3").text
            link = item.find_element(By.TAG_NAME, "a").get_attribute("href")

            create_search_result.apply_async(kwargs={"title": title, "link": link})
    finally:
        # Always end the browser session, even when a step above raised;
        # otherwise every automatic retry would leave a browser running.
        # quit() closes all windows, so a separate close() is not needed.
        driver.quit()

View file

@ -1,6 +1,7 @@
"""
Settings file to hold constants and functions
"""
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from config.settings import get_secret
@ -18,24 +19,26 @@ import os
import random
def take_snapshot(driver, filename="dump.png"):
    """Save a screenshot of the current page's ``<body>`` under ``/dumps/``.

    The window is first resized to the document's scroll width and to its
    scroll height plus 5%, so the whole page fits in the capture.

    Args:
        driver: Selenium WebDriver whose current page is captured.
        filename: Name of the image file to write inside ``/dumps/``.
    """
    # Set window size
    required_width = driver.execute_script(
        "return document.body.parentNode.scrollWidth"
    )
    required_height = driver.execute_script(
        "return document.body.parentNode.scrollHeight"
    )
    driver.set_window_size(required_width, required_height + (required_height * 0.05))
    # Take the snapshot of the <body> element, which avoids any scrollbars
    saved = driver.find_element(By.TAG_NAME, "body").screenshot("/dumps/" + filename)
    # WebElement.screenshot() returns False instead of raising when the file
    # cannot be written, so only report success when it really happened.
    if saved:
        print("Snapshot saved")
    else:
        print("Snapshot could not be saved")
def dump_html(driver, filename="dump.html"):
    """Write the driver's current page source to ``/dumps/<filename>``.

    Args:
        driver: Selenium WebDriver whose ``page_source`` is saved.
        filename: Name of the HTML file to write inside ``/dumps/``.
    """
    # An existing file with the same name is overwritten ("w" mode)
    with open("/dumps/" + filename, "w", encoding="utf-8") as file:
        file.write(driver.page_source)
@ -44,83 +47,83 @@ def setup_webdriver(driver_type="chrome", use_proxy=True, use_saved_session=Fals
if not USE_PROXY:
use_proxy = False
if use_proxy:
print('Running driver with proxy enabled')
print("Running driver with proxy enabled")
else:
print('Running driver with proxy disabled')
print("Running driver with proxy disabled")
if use_saved_session:
print('Running with saved session')
print("Running with saved session")
else:
print('Running without using saved session')
print("Running without using saved session")
if driver_type == "chrome":
print('Using Chrome driver')
print("Using Chrome driver")
opts = uc.ChromeOptions()
if use_saved_session:
if os.path.exists("/tmp_chrome_profile"):
print('Existing Chrome ephemeral profile found')
print("Existing Chrome ephemeral profile found")
else:
print('No existing Chrome ephemeral profile found')
print("No existing Chrome ephemeral profile found")
os.system("mkdir /tmp_chrome_profile")
if os.path.exists('/chrome'):
print('Copying Chrome Profile to ephemeral directory')
if os.path.exists("/chrome"):
print("Copying Chrome Profile to ephemeral directory")
# Flush any non-essential cache directories from the existing profile as they may balloon in size overtime
os.system(
'rm -rf "/chrome/Selenium Profile/Code Cache/*"')
os.system('rm -rf "/chrome/Selenium Profile/Code Cache/*"')
# Create a copy of the Chrome Profile
os.system("cp -r /chrome/* /tmp_chrome_profile")
try:
# Remove some items related to file locks
os.remove('/tmp_chrome_profile/SingletonLock')
os.remove('/tmp_chrome_profile/SingletonSocket')
os.remove('/tmp_chrome_profile/SingletonLock')
os.remove("/tmp_chrome_profile/SingletonLock")
os.remove("/tmp_chrome_profile/SingletonSocket")
os.remove("/tmp_chrome_profile/SingletonLock")
except:
pass
else:
print('No existing Chrome Profile found. Creating one from scratch')
print("No existing Chrome Profile found. Creating one from scratch")
if use_saved_session:
# Specify the user data directory
opts.add_argument(f'--user-data-dir=/tmp_chrome_profile')
opts.add_argument('--profile-directory=Selenium Profile')
opts.add_argument(f"--user-data-dir=/tmp_chrome_profile")
opts.add_argument("--profile-directory=Selenium Profile")
# Set proxy
if use_proxy:
opts.add_argument(
f'--proxy-server=socks5://{get_secret("PROXY_IP")}:{get_secret("PROXY_PORT_IP_AUTH")}')
f'--proxy-server=socks5://{get_secret("PROXY_IP")}:{get_secret("PROXY_PORT_IP_AUTH")}'
)
opts.add_argument("--disable-extensions")
opts.add_argument('--disable-application-cache')
opts.add_argument("--disable-application-cache")
opts.add_argument("--disable-setuid-sandbox")
opts.add_argument('--disable-dev-shm-usage')
opts.add_argument("--disable-dev-shm-usage")
opts.add_argument("--disable-gpu")
opts.add_argument("--no-sandbox")
opts.add_argument("--headless=new")
driver = uc.Chrome(options=opts)
elif driver_type == "firefox":
print('Using firefox driver')
print("Using firefox driver")
opts = FirefoxOptions()
if use_saved_session:
if not os.path.exists("/firefox"):
print('No profile found')
print("No profile found")
os.makedirs("/firefox")
else:
print('Existing profile found')
print("Existing profile found")
# Specify a profile if it exists
opts.profile = "/firefox"
# Set proxy
if use_proxy:
opts.set_preference('network.proxy.type', 1)
opts.set_preference('network.proxy.socks',
get_secret('PROXY_IP'))
opts.set_preference('network.proxy.socks_port',
int(get_secret('PROXY_PORT_IP_AUTH')))
opts.set_preference('network.proxy.socks_remote_dns', False)
opts.set_preference("network.proxy.type", 1)
opts.set_preference("network.proxy.socks", get_secret("PROXY_IP"))
opts.set_preference(
"network.proxy.socks_port", int(get_secret("PROXY_PORT_IP_AUTH"))
)
opts.set_preference("network.proxy.socks_remote_dns", False)
opts.add_argument('--disable-dev-shm-usage')
opts.add_argument("--disable-dev-shm-usage")
opts.add_argument("--headless")
opts.add_argument("--disable-gpu")
driver = webdriver.Firefox(options=opts)
@ -128,13 +131,15 @@ def setup_webdriver(driver_type="chrome", use_proxy=True, use_saved_session=Fals
driver.maximize_window()
# Check if proxy is working
driver.get('https://api.ipify.org/')
driver.get("https://api.ipify.org/")
body = WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.TAG_NAME, "body")))
EC.presence_of_element_located((By.TAG_NAME, "body"))
)
ip_address = body.text
print(f'External IP: {ip_address}')
print(f"External IP: {ip_address}")
return driver
# These are wrapper functions for quickly automating multiple steps in web scraping (logins, button presses, text inputs, etc.)
# Depending on your use case, you may have to opt out of using them
@ -151,10 +156,11 @@ def get_element(driver, by, key, hidden_element=False, timeout=8):
wait = WebDriverWait(driver, timeout=timeout)
if not hidden_element:
element = wait.until(
EC.element_to_be_clickable((by, key)) and EC.visibility_of_element_located((by, key)))
EC.element_to_be_clickable((by, key))
and EC.visibility_of_element_located((by, key))
)
else:
element = wait.until(EC.presence_of_element_located(
(by, key)))
element = wait.until(EC.presence_of_element_located((by, key)))
return element
except Exception:
dump_html(driver)
@ -173,13 +179,12 @@ def get_elements(driver, by, key, hidden_element=False, timeout=8):
wait = WebDriverWait(driver, timeout=timeout)
if hidden_element:
elements = wait.until(
EC.presence_of_all_elements_located((by, key)))
elements = wait.until(EC.presence_of_all_elements_located((by, key)))
else:
visible_elements = wait.until(
EC.visibility_of_any_elements_located((by, key)))
elements = [
element for element in visible_elements if element.is_enabled()]
EC.visibility_of_any_elements_located((by, key))
)
elements = [element for element in visible_elements if element.is_enabled()]
return elements
except Exception:
@ -193,17 +198,22 @@ def get_elements(driver, by, key, hidden_element=False, timeout=8):
def execute_selenium_elements(driver, timeout, elements):
try:
for index, element in enumerate(elements):
print('Waiting...')
print("Waiting...")
# Element may have a keyword specified, check if that exists before running any actions
if "keyword" in element:
# Skip a step if the keyword does not exist
if element['keyword'] not in driver.page_source:
if element["keyword"] not in driver.page_source:
print(
f'Keyword {element["keyword"]} does not exist. Skipping step: {index+1} - {element["name"]}')
f'Keyword {element["keyword"]} does not exist. Skipping step: {index+1} - {element["name"]}'
)
continue
elif element['keyword'] in driver.page_source and element['type'] == 'skip':
elif (
element["keyword"] in driver.page_source
and element["type"] == "skip"
):
print(
f'Keyword {element["keyword"]} does exists. Stopping at step: {index+1} - {element["name"]}')
f'Keyword {element["keyword"]} does exists. Stopping at step: {index+1} - {element["name"]}'
)
break
print(f'Step: {index+1} - {element["name"]}')
# Revert to default iframe action
@ -217,31 +227,47 @@ def execute_selenium_elements(driver, timeout, elements):
else:
values = element["input"]
if type(values) is list:
raise Exception(
'Invalid input value specified for "callback" type')
raise Exception('Invalid input value specified for "callback" type')
else:
# For single input values
driver.execute_script(
f'onRecaptcha("{values}");')
driver.execute_script(f'onRecaptcha("{values}");')
continue
try:
# Try to get default element
if "hidden" in element:
site_element = get_element(
driver, element["default"]["type"], element["default"]["key"], hidden_element=True, timeout=timeout)
driver,
element["default"]["type"],
element["default"]["key"],
hidden_element=True,
timeout=timeout,
)
else:
site_element = get_element(
driver, element["default"]["type"], element["default"]["key"], timeout=timeout)
driver,
element["default"]["type"],
element["default"]["key"],
timeout=timeout,
)
except Exception as e:
print(f'Failed to find primary element')
print(f"Failed to find primary element")
# If that fails, try to get the failover one
print('Trying to find legacy element')
print("Trying to find legacy element")
if "hidden" in element:
site_element = get_element(
driver, element["failover"]["type"], element["failover"]["key"], hidden_element=True, timeout=timeout)
driver,
element["failover"]["type"],
element["failover"]["key"],
hidden_element=True,
timeout=timeout,
)
else:
site_element = get_element(
driver, element["failover"]["type"], element["failover"]["key"], timeout=timeout)
driver,
element["failover"]["type"],
element["failover"]["key"],
timeout=timeout,
)
# Clicking an element
if element["type"] == "click":
site_element.click()
@ -272,11 +298,13 @@ def execute_selenium_elements(driver, timeout, elements):
values = element["input"]
if type(values) is list:
raise Exception(
'Invalid input value specified for "input_replace" type')
'Invalid input value specified for "input_replace" type'
)
else:
# For single input values
driver.execute_script(
f'arguments[0].value = "{values}";', site_element)
f'arguments[0].value = "{values}";', site_element
)
except Exception as e:
take_snapshot(driver)
dump_html(driver)
@ -285,30 +313,33 @@ def execute_selenium_elements(driver, timeout, elements):
raise Exception(e)
def solve_captcha(site_key, url, retry_attempts=3, version='v2', enterprise=False, use_proxy=True):
def solve_captcha(
site_key, url, retry_attempts=3, version="v2", enterprise=False, use_proxy=True
):
# Manual proxy override set via $ENV
if not USE_PROXY:
use_proxy = False
if CAPTCHA_TESTING:
print('Initializing CAPTCHA solver in dummy mode')
print("Initializing CAPTCHA solver in dummy mode")
code = random.randint()
print("CAPTCHA Successful")
return code
elif use_proxy:
print('Using CAPTCHA solver with proxy')
print("Using CAPTCHA solver with proxy")
else:
print('Using CAPTCHA solver without proxy')
print("Using CAPTCHA solver without proxy")
captcha_params = {
"url": url,
"sitekey": site_key,
"version": version,
"enterprise": 1 if enterprise else 0,
"proxy": {
'type': 'socks5',
'uri': get_secret('PROXY_USER_AUTH')
} if use_proxy else None
"proxy": (
{"type": "socks5", "uri": get_secret("PROXY_USER_AUTH")}
if use_proxy
else None
),
}
# Keep retrying until max attempts is reached
@ -316,12 +347,12 @@ def solve_captcha(site_key, url, retry_attempts=3, version='v2', enterprise=Fals
# Solver uses 2CAPTCHA by default
solver = TwoCaptcha(get_secret("CAPTCHA_API_KEY"))
try:
print('Waiting for CAPTCHA code...')
print("Waiting for CAPTCHA code...")
code = solver.recaptcha(**captcha_params)["code"]
print("CAPTCHA Successful")
return code
except Exception as e:
print(f'CAPTCHA Failed! {e}')
print(f"CAPTCHA Failed! {e}")
raise Exception(f"CAPTCHA API Failed!")
@ -339,13 +370,12 @@ def save_browser_session(driver):
# Copy over the profile once we finish logging in
if isinstance(driver, webdriver.Firefox):
# Copy process for Firefox
print('Updating saved Firefox profile')
print("Updating saved Firefox profile")
# Get the current profile directory from about:support page
driver.get("about:support")
box = get_element(
driver, "id", "profile-dir-box", timeout=4)
box = get_element(driver, "id", "profile-dir-box", timeout=4)
temp_profile_path = os.path.join(os.getcwd(), box.text)
profile_path = '/firefox'
profile_path = "/firefox"
# Create the command
copy_command = "cp -r " + temp_profile_path + "/* " + profile_path
# Copy over the Firefox profile
@ -353,13 +383,13 @@ def save_browser_session(driver):
print("Firefox profile saved")
elif isinstance(driver, uc.Chrome):
# Copy the Chrome profile
print('Updating non-ephemeral Chrome profile')
print("Updating non-ephemeral Chrome profile")
# Flush Code Cache again to speed up copy
os.system(
'rm -rf "/tmp_chrome_profile/SimpleDMCA Profile/Code Cache/*"')
os.system('rm -rf "/tmp_chrome_profile/SimpleDMCA Profile/Code Cache/*"')
if os.system("cp -r /tmp_chrome_profile/* /chrome"):
print("Chrome profile saved")
# Sample function
# Call this within a Celery task
# TODO: Modify to suit your needs
@ -370,7 +400,7 @@ def selenium_action_template(driver):
info = {
"sample_field1": "sample_data",
"sample_field2": "sample_data",
"captcha_code": lambda: solve_captcha('SITE_KEY', 'SITE_URL')
"captcha_code": lambda: solve_captcha("SITE_KEY", "SITE_URL"),
}
elements = [
@ -382,13 +412,10 @@ def selenium_action_template(driver):
"default": {
# See get_element() for possible selector types
"type": "xpath",
"key": ''
"key": "",
},
# If a site implements canary design releases, you can place the ID for the element in the old design here
"failover": {
"type": "xpath",
"key": ''
}
"failover": {"type": "xpath", "key": ""},
},
]
@ -398,8 +425,8 @@ def selenium_action_template(driver):
# Fill in final fstring values in elements
for element in elements:
if 'input' in element and '{' in element['input']:
a = element['input'].strip('{}')
if "input" in element and "{" in element["input"]:
a = element["input"].strip("{}")
if a in info:
value = info[a]
# Check if the value is a callable (a lambda function) and call it if so
@ -411,11 +438,12 @@ def selenium_action_template(driver):
# Use the stored value
value = site_form_values[a]
# Replace the placeholder with the actual value
element['input'] = str(value)
element["input"] = str(value)
# Execute the selenium actions
execute_selenium_elements(driver, 8, elements)
# Sample task for Google search
@ -429,40 +457,28 @@ def google_search(driver, search_term):
"name": "Type in search term",
"type": "input",
"input": "{search_term}",
"default": {
"type": "xpath",
"key": '//*[@id="APjFqb"]'
},
"failover": {
"type": "xpath",
"key": '//*[@id="APjFqb"]'
}
"default": {"type": "xpath", "key": '//*[@id="APjFqb"]'},
"failover": {"type": "xpath", "key": '//*[@id="APjFqb"]'},
},
{
"name": "Press enter",
"type": "input_enter",
"default": {
"type": "xpath",
"key": '//*[@id="APjFqb"]'
},
"failover": {
"type": "xpath",
"key": '//*[@id="APjFqb"]'
}
"default": {"type": "xpath", "key": '//*[@id="APjFqb"]'},
"failover": {"type": "xpath", "key": '//*[@id="APjFqb"]'},
},
]
site_form_values = {}
for element in elements:
if 'input' in element and '{' in element['input']:
a = element['input'].strip('{}')
if "input" in element and "{" in element["input"]:
a = element["input"].strip("{}")
if a in info:
value = info[a]
if callable(value):
if a not in site_form_values:
site_form_values[a] = value()
value = site_form_values[a]
element['input'] = str(value)
element["input"] = str(value)
execute_selenium_elements(driver, 8, elements)