mirror of
https://github.com/lemeow125/DRF_Template.git
synced 2025-04-28 10:41:15 +08:00
Overhauled entire project config, added notifications, email templates, optimized stripe subscriptions, redis caching, and webdriver utilities
This commit is contained in:
parent
7cbe8fd720
commit
99dfcef67b
84 changed files with 4300 additions and 867 deletions
0
backend/webdriver/__init__.py
Normal file
0
backend/webdriver/__init__.py
Normal file
6
backend/webdriver/apps.py
Normal file
6
backend/webdriver/apps.py
Normal file
|
@ -0,0 +1,6 @@
|
|||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class EmailsConfig(AppConfig):
    """Django application configuration for the ``webdriver`` app."""

    # NOTE(review): the class is called ``EmailsConfig`` although it
    # configures the ``webdriver`` app -- presumably carried over from
    # another app. Confirm nothing references it by dotted path before
    # renaming.
    name = "webdriver"
    default_auto_field = "django.db.models.BigAutoField"
|
16
backend/webdriver/tasks.py
Normal file
16
backend/webdriver/tasks.py
Normal file
|
@ -0,0 +1,16 @@
|
|||
from celery import shared_task
|
||||
from webdriver.utils import setup_webdriver, selenium_action_template
|
||||
|
||||
# Sample Celery Selenium function
|
||||
# TODO: Modify this as needed
|
||||
|
||||
|
||||
@shared_task(autoretry_for=(Exception,), retry_kwargs={'max_retries': 6, 'countdown': 5})
def sample_selenium_task():
    """Run the sample Selenium action template as a Celery task.

    A fresh webdriver is created for every run. Any exception makes Celery
    retry the task (up to 6 times, 5 seconds apart, per the decorator).

    The browser session is always shut down, including when the Selenium
    actions raise, so retries do not pile up orphaned browser processes.
    """
    driver = setup_webdriver()
    try:
        selenium_action_template(driver)
        # Place any other actions here after Selenium is done executing
    finally:
        # Once completed (or on failure), always end the session.
        # quit() closes every window and stops the driver process, so a
        # separate close() beforehand is unnecessary -- and close() would
        # raise if the session had already been torn down on failure.
        driver.quit()
|
390
backend/webdriver/utils.py
Normal file
390
backend/webdriver/utils.py
Normal file
|
@ -0,0 +1,390 @@
|
|||
"""
|
||||
Settings file to hold constants and functions
|
||||
"""
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.common.keys import Keys
|
||||
import os
|
||||
from config.settings import get_secret
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
from selenium.webdriver import FirefoxOptions
|
||||
from selenium import webdriver
|
||||
import undetected_chromedriver as uc
|
||||
from config.settings import USE_PROXY, CAPTCHA_TESTING
|
||||
from config.settings import get_secret
|
||||
from twocaptcha import TwoCaptcha
|
||||
from whois import whois
|
||||
from whois.parser import PywhoisError
|
||||
|
||||
|
||||
def take_snapshot(driver, filename='dump.png'):
    """Save a full-page screenshot of the current page to ``/dumps/<filename>``.

    The window is first resized to the document's scroll width and its
    scroll height plus 5%, then the ``<body>`` element is captured.

    Args:
        driver: Selenium WebDriver whose current page is captured.
        filename: Name of the PNG file to write inside ``/dumps/``.

    Returns:
        True if the screenshot file was written, False otherwise.
    """
    # Set window size so the whole document is rendered
    required_width = driver.execute_script(
        'return document.body.parentNode.scrollWidth')
    required_height = driver.execute_script(
        'return document.body.parentNode.scrollHeight')
    driver.set_window_size(
        required_width, required_height + (required_height * 0.05))

    # Take the snapshot; capturing <body> avoids any scrollbars
    body = driver.find_element(By.TAG_NAME, 'body')
    saved = bool(body.screenshot('/dumps/' + filename))

    # WebElement.screenshot() signals an I/O failure by returning False
    # rather than raising, so the outcome must be checked explicitly.
    if saved:
        print('Snapshot saved')
    else:
        print(f'Snapshot could not be saved to /dumps/{filename}')
    return saved
|
||||
|
||||
|
||||
def dump_html(driver, filename='dump.html'):
    """Write the driver's current page source to ``/dumps/<filename>``.

    Args:
        driver: Selenium WebDriver whose ``page_source`` is saved.
        filename: Name of the file to write inside ``/dumps/`` (UTF-8).
    """
    target_path = '/dumps/' + filename
    with open(target_path, 'w', encoding='utf-8') as html_file:
        html_file.write(driver.page_source)
|
||||
|
||||
|
||||
def setup_webdriver(driver_type="chrome", use_proxy=True, use_saved_session=False):
    """Create a headless Selenium driver and verify its connectivity.

    Args:
        driver_type: ``"chrome"`` (undetected-chromedriver) or ``"firefox"``.
        use_proxy: Route traffic through the SOCKS5 proxy taken from the
            ``PROXY_IP`` / ``PROXY_PORT_IP_AUTH`` secrets. Forced to False
            when the ``USE_PROXY`` setting is disabled.
        use_saved_session: Reuse a persisted browser profile
            (``/chrome`` copied to ``/tmp_chrome_profile``, or ``/firefox``).

    Returns:
        The ready-to-use webdriver instance.

    Raises:
        ValueError: If ``driver_type`` is not a supported browser.
        Exception: Whatever the driver raises during start-up or the
            external-IP check; the browser is shut down before re-raising.
    """
    # Manual proxy override via .env variable
    if not USE_PROXY:
        use_proxy = False
    if use_proxy:
        print('Running driver with proxy enabled')
    else:
        print('Running driver with proxy disabled')

    if use_saved_session:
        print('Running with saved session')
    else:
        print('Running without using saved session')

    if driver_type == "chrome":
        driver = _build_chrome_driver(use_proxy, use_saved_session)
    elif driver_type == "firefox":
        driver = _build_firefox_driver(use_proxy, use_saved_session)
    else:
        # Previously this fell through and failed later on an unbound name
        raise ValueError(f'Unsupported driver_type: {driver_type!r}')

    try:
        driver.maximize_window()

        # Check if proxy is working
        driver.get('https://api.ipify.org/')
        body = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.TAG_NAME, "body")))
        ip_address = body.text
        print(f'External IP: {ip_address}')
    except Exception:
        # Do not leave an orphaned browser behind if the check fails
        driver.quit()
        raise
    return driver


def _build_chrome_driver(use_proxy, use_saved_session):
    """Build a headless undetected-chromedriver instance."""
    print('Using Chrome driver')
    opts = uc.ChromeOptions()

    if use_saved_session:
        if os.path.exists("/tmp_chrome_profile"):
            print('Existing Chrome ephemeral profile found')
        else:
            print('No existing Chrome ephemeral profile found')
            os.makedirs("/tmp_chrome_profile", exist_ok=True)
            if os.path.exists('/chrome'):
                print('Copying Chrome Profile to ephemeral directory')
                # Flush any non-essential cache directories from the existing
                # profile as they may balloon in size over time. The glob has
                # to sit outside the quotes, otherwise the shell looks for a
                # file literally named "*" and nothing is removed.
                os.system(
                    'rm -rf "/chrome/Selenium Profile/Code Cache/"*')
                # Create a copy of the Chrome Profile
                os.system("cp -r /chrome/* /tmp_chrome_profile")
                # Remove items related to file locks. Each one is removed
                # independently so a missing file does not skip the others.
                # NOTE(review): the original removed SingletonLock twice;
                # SingletonCookie is presumably what was meant -- confirm.
                for lock_name in ('SingletonLock', 'SingletonSocket',
                                  'SingletonCookie'):
                    try:
                        os.remove(
                            os.path.join('/tmp_chrome_profile', lock_name))
                    except OSError:
                        pass
            else:
                print('No existing Chrome Profile found. Creating one from scratch')

        # Specify the user data directory
        opts.add_argument('--user-data-dir=/tmp_chrome_profile')
        opts.add_argument('--profile-directory=Selenium Profile')

    # Set proxy
    if use_proxy:
        opts.add_argument(
            f'--proxy-server=socks5://{get_secret("PROXY_IP")}:{get_secret("PROXY_PORT_IP_AUTH")}')

    opts.add_argument("--disable-extensions")
    opts.add_argument('--disable-application-cache')
    opts.add_argument("--disable-setuid-sandbox")
    opts.add_argument('--disable-dev-shm-usage')
    opts.add_argument("--disable-gpu")
    opts.add_argument("--no-sandbox")
    opts.add_argument("--headless=new")
    return uc.Chrome(options=opts)


def _build_firefox_driver(use_proxy, use_saved_session):
    """Build a headless Firefox webdriver instance."""
    print('Using firefox driver')
    opts = FirefoxOptions()

    if use_saved_session:
        if not os.path.exists("/firefox"):
            print('No profile found')
            os.makedirs("/firefox")
        else:
            print('Existing profile found')
        # The directory exists at this point either way, so use it as profile
        opts.profile = "/firefox"

    # Set proxy
    if use_proxy:
        opts.set_preference('network.proxy.type', 1)
        opts.set_preference('network.proxy.socks', get_secret('PROXY_IP'))
        opts.set_preference('network.proxy.socks_port',
                            int(get_secret('PROXY_PORT_IP_AUTH')))
        opts.set_preference('network.proxy.socks_remote_dns', False)

    opts.add_argument('--disable-dev-shm-usage')
    opts.add_argument("--headless")
    opts.add_argument("--disable-gpu")
    return webdriver.Firefox(options=opts)
|
||||
|
||||
# Function to get the element once it has loaded in
|
||||
|
||||
|
||||
def get_element(driver, by, key, hidden_element=False, timeout=8):
    """Wait for an element to load and return it.

    Args:
        driver: Selenium WebDriver to search in.
        by: Selector type. One of the short aliases ``"xpath"``, ``"css"``,
            ``"id"``, ``"tagname"``, ``"name"``, ``"classname"``; any other
            value is passed to Selenium unchanged.
        key: Selector value (the XPath, CSS selector, id, ...).
        hidden_element: If True, only wait for the element to be present in
            the DOM. Otherwise wait until it is clickable.
        timeout: Maximum number of seconds to wait.

    Returns:
        The located WebElement.

    Raises:
        Exception: If the element could not be obtained within ``timeout``;
            the underlying Selenium error is attached as the cause.
    """
    selector_aliases = {
        "xpath": By.XPATH,
        "css": By.CSS_SELECTOR,
        "id": By.ID,
        "tagname": By.TAG_NAME,
        "name": By.NAME,
        "classname": By.CLASS_NAME,
    }
    try:
        by = selector_aliases.get(by, by)
        wait = WebDriverWait(driver, timeout=timeout)
        if hidden_element:
            condition = EC.presence_of_element_located((by, key))
        else:
            # element_to_be_clickable already requires the element to be
            # visible (and additionally enabled). Joining two conditions
            # with ``and`` only ever evaluated the second one.
            condition = EC.element_to_be_clickable((by, key))
        return wait.until(condition)
    except Exception as err:
        raise Exception(f"Unable to get element of {by} value: {key}") from err
|
||||
|
||||
|
||||
def execute_selenium_elements(driver, timeout, elements):
    """Run a list of declarative Selenium steps against ``driver``.

    Each step is a dict with at least ``"name"`` and ``"type"``. Supported
    types: ``click``, ``input``, ``input_enter``, ``input_replace``,
    ``switch_to_iframe_click``, ``revert_default_frame``,
    ``recaptchav2_callback`` and ``skip``. Steps that act on a page element
    carry a ``"default"`` locator (``{"type", "key"}``) and may carry a
    ``"failover"`` locator that is tried when the default one is not found.
    An optional ``"keyword"`` makes the step conditional on that text being
    in the page source; a ``skip`` step whose keyword is present stops the
    whole run.

    Args:
        driver: Selenium WebDriver to operate on.
        timeout: Seconds to wait for each element lookup.
        elements: Iterable of step dicts, executed in order.

    Raises:
        Exception: If any step fails. A screenshot and an HTML dump are
            attempted and the driver is shut down before re-raising.
    """
    try:
        for index, element in enumerate(elements):
            print('Waiting...')
            # Element may have a keyword specified, check if that exists before running any actions
            if "keyword" in element:
                keyword_present = element['keyword'] in driver.page_source
                # Skip a step if the keyword does not exist
                if not keyword_present:
                    print(
                        f'Keyword {element["keyword"]} does not exist. Skipping step: {index+1} - {element["name"]}')
                    continue
                if element['type'] == 'skip':
                    print(
                        f'Keyword {element["keyword"]} does exists. Stopping at step: {index+1} - {element["name"]}')
                    break
            print(f'Step: {index+1} - {element["name"]}')

            step_type = element["type"]

            # Revert to default iframe action
            if step_type == "revert_default_frame":
                driver.switch_to.default_content()
                continue

            # CAPTCHA Callback
            if step_type == "recaptchav2_callback":
                values = _resolve_step_input(element)
                if isinstance(values, list):
                    raise Exception(
                        'Invalid input value specified for "callback" type')
                # Pass the value as a script argument instead of splicing it
                # into the JavaScript source, so quotes cannot break it.
                driver.execute_script(
                    'onRecaptcha(arguments[0]);', str(values))
                continue

            site_element = _locate_step_element(driver, element, timeout)

            # Clicking an element
            if step_type == "click":
                site_element.click()
            # Switching to an element frame/iframe
            elif step_type == "switch_to_iframe_click":
                driver.switch_to.frame(site_element)
            # Input type simulates user typing
            elif step_type == "input":
                values = _resolve_step_input(element).splitlines()
                last_position = len(values) - 1
                # For multiple input values
                for position, value in enumerate(values):
                    site_element.send_keys(value)
                    # Only send a new line keypress if this is not the last value to enter in the list
                    if position != last_position:
                        site_element.send_keys(Keys.RETURN)
            elif step_type == "input_enter":
                site_element.send_keys(Keys.RETURN)
            # Input_replace type places values directly. Useful for CAPTCHA
            elif step_type == "input_replace":
                values = _resolve_step_input(element)
                if isinstance(values, list):
                    raise Exception(
                        'Invalid input value specified for "input_replace" type')
                # For single input values
                driver.execute_script(
                    'arguments[0].value = arguments[1];',
                    site_element, str(values))
    except Exception as e:
        try:
            # Debug dumps are best-effort: a failure to write one must not
            # hide the original error or keep the browser from shutting down.
            for dump in (take_snapshot, dump_html):
                try:
                    dump(driver)
                except Exception as dump_error:
                    print(f'Failed to save debug dump: {dump_error}')
        finally:
            # quit() closes every window and stops the driver process
            driver.quit()
        raise Exception(e) from e


def _resolve_step_input(element):
    """Return the step's input value, calling it first if it is callable."""
    raw_value = element["input"]
    return raw_value() if callable(raw_value) else raw_value


def _locate_step_element(driver, element, timeout):
    """Find the step's element via its default locator, then its failover."""
    hidden = "hidden" in element
    try:
        # Try to get default element
        locator = element["default"]
        return get_element(
            driver, locator["type"], locator["key"],
            hidden_element=hidden, timeout=timeout)
    except Exception:
        print('Failed to find primary element')
        if "failover" not in element:
            # Nothing else to try; surface the real lookup error
            raise
        # If that fails, try to get the failover one
        print('Trying to find legacy element')
        locator = element["failover"]
        return get_element(
            driver, locator["type"], locator["key"],
            hidden_element=hidden, timeout=timeout)
|
||||
|
||||
|
||||
def solve_captcha(site_key, url, retry_attempts=3, version='v2', enterprise=False, use_proxy=True):
    """Solve a reCAPTCHA through the 2Captcha service.

    Args:
        site_key: reCAPTCHA site key of the target page.
        url: URL of the page that shows the CAPTCHA.
        retry_attempts: Number of solve attempts before giving up.
        version: reCAPTCHA version passed to the solver (e.g. ``'v2'``).
        enterprise: Whether the CAPTCHA is reCAPTCHA Enterprise.
        use_proxy: Solve through the SOCKS5 proxy from the
            ``PROXY_USER_AUTH`` secret. Forced to False when the
            ``USE_PROXY`` setting is disabled.

    Returns:
        The CAPTCHA response code, or the dummy code ``"12345"`` when
        ``CAPTCHA_TESTING`` is enabled.

    Raises:
        Exception: If every attempt failed; the last solver error is
            attached as the cause.
    """
    # Manual proxy override set via $ENV
    if not USE_PROXY:
        use_proxy = False
    if CAPTCHA_TESTING:
        print('Initializing CAPTCHA solver in dummy mode')
        code = "12345"
        print("CAPTCHA Successful")
        return code

    if use_proxy:
        print('Using CAPTCHA solver with proxy')
    else:
        print('Using CAPTCHA solver without proxy')

    captcha_params = {
        "url": url,
        "sitekey": site_key,
        "version": version,
        "enterprise": 1 if enterprise else 0,
        "proxy": {
            'type': 'socks5',
            'uri': get_secret('PROXY_USER_AUTH')
        } if use_proxy else None
    }

    # Solver uses 2CAPTCHA by default. It does not change between attempts,
    # so build it once rather than on every retry.
    solver = TwoCaptcha(get_secret("CAPTCHA_API_KEY"))
    last_error = None

    # Keep retrying until max attempts is reached
    for _ in range(retry_attempts):
        try:
            print('Waiting for CAPTCHA code...')
            code = solver.recaptcha(**captcha_params)["code"]
            print("CAPTCHA Successful")
            return code
        except Exception as e:
            last_error = e
            print(f'CAPTCHA Failed! {e}')

    # Chain the last failure so the root cause survives in the traceback
    raise Exception("CAPTCHA API Failed!") from last_error
|
||||
|
||||
|
||||
def whois_lookup(url):
    """Look up the WHOIS record for ``url``.

    Args:
        url: Domain or URL to query.

    Returns:
        The result of ``whois(url)``, or None when no WHOIS record was
        found (``PywhoisError``).
    """
    try:
        lookup_info = whois(url)
        # TODO: Add your own processing here
    except PywhoisError:
        print(f"No WhoIs record found for {url}")
        # Without this the function fell through to an unbound variable
        return None
    return lookup_info
|
||||
|
||||
|
||||
def save_browser_session(driver):
    """Copy the driver's live browser profile back to its saved location.

    For Firefox the active profile directory is read from the
    ``about:support`` page and copied into ``/firefox``. For Chrome the
    profile in ``/tmp_chrome_profile`` is copied into ``/chrome``. Other
    driver types are ignored.

    Args:
        driver: A ``webdriver.Firefox`` or ``uc.Chrome`` instance.
    """
    import shlex  # local import: only needed to quote the Firefox path

    # Copy over the profile once we finish logging in
    if isinstance(driver, webdriver.Firefox):
        # Copy process for Firefox
        print('Updating saved Firefox profile')
        # Get the current profile directory from about:support page
        driver.get("about:support")
        box = get_element(
            driver, "id", "profile-dir-box", timeout=4)
        temp_profile_path = os.path.join(os.getcwd(), box.text)
        profile_path = '/firefox'
        # Create the command. The directory is quoted because it comes from
        # page text and may contain spaces; the glob stays unquoted so the
        # shell still expands it.
        copy_command = (
            f"cp -r {shlex.quote(temp_profile_path)}/* "
            f"{shlex.quote(profile_path)}")
        # Copy over the Firefox profile (os.system returns 0 on success)
        if os.system(copy_command) == 0:
            print("Firefox profile saved")
        else:
            print("Failed to save Firefox profile")
    elif isinstance(driver, uc.Chrome):
        # Copy the Chrome profile
        print('Updating non-ephemeral Chrome profile')
        # Flush Code Cache again to speed up copy. The profile directory
        # name matches the "Selenium Profile" used when the driver is set
        # up, and the glob sits outside the quotes so it actually expands.
        os.system(
            'rm -rf "/tmp_chrome_profile/Selenium Profile/Code Cache/"*')
        if os.system("cp -r /tmp_chrome_profile/* /chrome") == 0:
            print("Chrome profile saved")
        else:
            print("Failed to save Chrome profile")
|
||||
|
||||
# Sample function
|
||||
# Call this within a Celery task
|
||||
# TODO: Modify this to suit your needs
|
||||
|
||||
|
||||
def selenium_action_template(driver):
    """Sample Selenium workflow: fill placeholders, then run the steps.

    ``info`` maps field names to values (or to zero-argument callables that
    produce them). Every step whose ``"input"`` is a ``"{field}"``
    placeholder gets it replaced with the matching ``info`` value before
    the steps are handed to ``execute_selenium_elements``.

    Args:
        driver: Selenium WebDriver to run the steps on.
    """
    info = {
        "sample_field1": "sample_data",
        "sample_field2": "sample_data",
        "captcha_code": lambda: solve_captcha('SITE_KEY', 'SITE_URL')
    }

    elements = [
        {
            "name": "Enter data for sample field 1",
            "type": "input",
            # Placeholder must name a key of ``info`` to be substituted
            "input": "{sample_field1}",
            # If a site implements canary design releases, you can place the ID for the element in the new design
            "default": {
                # See get_element() for possible selector types
                "type": "xpath",
                "key": ''
            },
            # If a site implements canary design releases, you can place the ID for the element in the old design here
            "failover": {
                "type": "xpath",
                "key": ''
            }
        },
    ]

    # Dictionary to store values which will be entered via Selenium
    # Helps prevent duplicates and stale values compared to just using the info variable directly
    site_form_values = {}

    # Fill in final fstring values in elements
    for element in elements:
        raw_input = element.get('input')
        # Only string placeholders are substituted here; callable inputs are
        # resolved later, when the step itself is executed.
        if not isinstance(raw_input, str) or '{' not in raw_input:
            continue
        field = raw_input.strip('{}')
        if field not in info:
            continue
        value = info[field]
        # Check if the value is a callable (a lambda function) and call it if so
        if callable(value):
            # Call it only once and reuse the stored result afterwards
            if field not in site_form_values:
                site_form_values[field] = value()
            value = site_form_values[field]
        # Replace the placeholder with the actual value
        element['input'] = str(value)

    # Execute the selenium actions
    execute_selenium_elements(driver, 8, elements)
|
Loading…
Add table
Add a link
Reference in a new issue