Update webdriver utility functions and add a sample celery task for scraping Google search

This commit is contained in:
Keannu Bernasol 2024-09-24 16:08:28 +08:00
parent 0e902b1f04
commit 9e2e32fba8
11 changed files with 193 additions and 25 deletions

View file

@ -111,7 +111,8 @@ INSTALLED_APPS = [
'payments',
'billing',
'emails',
'notifications'
'notifications',
'search_results'
]
if DEBUG:
@ -393,5 +394,5 @@ CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
DATA_UPLOAD_MAX_NUMBER_FIELDS = 20480
GRAPH_MODELS = {
'app_labels': ['accounts', 'user_groups', 'billing', 'emails', 'payments', 'subscriptions']
'app_labels': ['accounts', 'user_groups', 'billing', 'emails', 'payments', 'subscriptions', 'search_results']
}

View file

View file

@ -0,0 +1,10 @@
from unfold.admin import ModelAdmin
from django.contrib import admin
from .models import SearchResult
@admin.register(SearchResult)
class SearchResultAdmin(ModelAdmin):
    """Admin options for SearchResult, based on unfold's ModelAdmin."""

    model = SearchResult
    # Columns shown in the change list
    list_display = ['id', 'title']
    # Fields matched by the admin search box
    search_fields = ('id', 'title', 'link')

View file

@ -0,0 +1,6 @@
from django.apps import AppConfig
class SearchResultsConfig(AppConfig):
    """Application configuration for the ``search_results`` app."""

    name = 'search_results'
    # Field class used for implicitly created primary keys in this app
    default_auto_field = 'django.db.models.BigAutoField'

View file

@ -0,0 +1,23 @@
# Generated by Django 5.0.6 on 2024-09-24 07:47
from django.db import migrations, models
class Migration(migrations.Migration):
    """Initial migration: creates the SearchResult model."""

    initial = True

    # Nothing has to be applied before this migration
    dependencies = []

    operations = [
        migrations.CreateModel(
            name='SearchResult',
            fields=[
                (
                    'id',
                    models.BigAutoField(
                        auto_created=True,
                        primary_key=True,
                        serialize=False,
                        verbose_name='ID',
                    ),
                ),
                ('title', models.CharField(max_length=1000)),
                ('link', models.CharField(max_length=1000)),
                ('timestamp', models.DateTimeField(auto_now_add=True)),
            ],
        ),
    ]

View file

@ -0,0 +1,7 @@
from django.db import models
class SearchResult(models.Model):
    """One stored search hit: its title, its link and when it was saved."""

    title = models.CharField(max_length=1000)
    link = models.CharField(max_length=1000)
    # Filled in automatically when the row is first created; not editable
    timestamp = models.DateTimeField(auto_now_add=True, editable=False)

    def __str__(self):
        """Return the title so rows are recognisable in the admin and in logs."""
        return self.title

View file

@ -0,0 +1,16 @@
from celery import shared_task
from .models import SearchResult
@shared_task(autoretry_for=(Exception,), retry_kwargs={'max_retries': 0, 'countdown': 5})
def create_search_result(title, link):
    """Save a search hit unless the same title/link pair is already stored.

    Returns a short status message saying whether a row was created or an
    existing one was found.
    """
    # NOTE(review): exists() followed by create() is not atomic, so two
    # workers handling the same hit at once could both insert a row.
    already_saved = SearchResult.objects.filter(title=title, link=link).exists()
    if already_saved:
        return "SearchResult entry already exists"

    SearchResult.objects.create(title=title, link=link)
    return f"Created new SearchResult entry titled: {title}"

View file

@ -1,16 +1,43 @@
from celery import shared_task
from webdriver.utils import setup_webdriver, selenium_action_template
# Sample Celery Selenium function
# TODO: Modify this as needed
from webdriver.utils import setup_webdriver, selenium_action_template, google_search, get_element, get_elements
from selenium.webdriver.common.by import By
from search_results.tasks import create_search_result
@shared_task(autoretry_for=(Exception,), retry_kwargs={'max_retries': 6, 'countdown': 5})
# Task template
@shared_task(autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 5})
def sample_selenium_task():
driver = setup_webdriver()
driver = setup_webdriver(use_proxy=False, use_saved_session=False)
driver.get("Place URL here")
selenium_action_template(driver)
# Place any other actions here after Selenium is done executing
# TODO: Modify this as needed
# Once completed, always close the session
driver.close()
driver.quit()
# Sample task to scrape Google for search results based on a keyword
@shared_task(autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 5})
def simple_google_search():
    """Search Google for a fixed phrase and queue every hit for storage.

    Opens a webdriver session without proxy or saved session, submits the
    search, reads the ``h3`` text and first ``a`` href of each element under
    the results container, and dispatches one ``create_search_result`` task
    per element. The browser session is always released, even on failure.
    """
    driver = setup_webdriver(use_proxy=False, use_saved_session=False)
    try:
        driver.get("https://google.com/")
        google_search(driver, search_term="cat blog posts")

        # Collect the child elements of the Google results container
        search_items = get_elements(
            driver, "xpath", '//*[@id="search"]/div[1]/div[1]/*')

        for item in search_items:
            title = item.find_element(By.TAG_NAME, 'h3').text
            link = item.find_element(By.TAG_NAME, 'a').get_attribute('href')
            create_search_result.apply_async(
                kwargs={"title": title, "link": link})
    finally:
        # Release the browser even when a step above raised; otherwise every
        # failed (and auto-retried) run leaves a session behind. quit() ends
        # the whole session, so a separate close() is not needed.
        # NOTE(review): get_elements() already closes and quits the driver
        # when its lookup fails, so this can run on a finished session -
        # confirm quit() tolerates that for the configured driver type.
        driver.quit()

View file

@ -3,9 +3,7 @@ Settings file to hold constants and functions
"""
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
import os
from config.settings import get_secret
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver import FirefoxOptions
@ -16,6 +14,8 @@ from config.settings import get_secret
from twocaptcha import TwoCaptcha
from whois import whois
from whois.parser import PywhoisError
import os
import random
def take_snapshot(driver, filename='dump.png'):
@ -135,23 +135,19 @@ def setup_webdriver(driver_type="chrome", use_proxy=True, use_saved_session=Fals
print(f'External IP: {ip_address}')
return driver
# These are wrapper functions for quickly automating multiple steps in web scraping (logins, button presses, text inputs, etc.)
# Depending on your use case, you may have to opt out of using this
# Function to get the element once it has loaded in
def get_element(driver, by, key, hidden_element=False, timeout=8):
try:
if by == "xpath":
by = By.XPATH
elif by == "css":
by = By.CSS_SELECTOR
elif by == "id":
by = By.ID
elif by == "tagname":
by = By.TAG_NAME
elif by == "name":
by = By.NAME
elif by == "classname":
by == By.CLASS_NAME
# Convert string-based locators to By objects (By.XPATH, By.CSS, etc.)
if isinstance(by, str):
by = getattr(By, by.upper())
wait = WebDriverWait(driver, timeout=timeout)
if not hidden_element:
element = wait.until(
@ -161,9 +157,39 @@ def get_element(driver, by, key, hidden_element=False, timeout=8):
(by, key)))
return element
except Exception:
dump_html(driver)
take_snapshot(driver)
driver.close()
driver.quit()
raise Exception(f"Unable to get element of {by} value: {key}")
# Short locator names accepted in addition to the attribute names on By
_LOCATOR_ALIASES = {
    "css": "CSS_SELECTOR",
    "tagname": "TAG_NAME",
    "classname": "CLASS_NAME",
}


def _resolve_locator(by):
    """Map a locator name such as "xpath", "css" or "tag name" to its By constant."""
    if not isinstance(by, str):
        return by
    name = by.strip().lower()
    # By's own constants are plain strings ("css selector", "tag name", ...),
    # so spaces are mapped to underscores before the attribute lookup.
    attr = _LOCATOR_ALIASES.get(name, name.upper().replace(" ", "_"))
    return getattr(By, attr)


def get_elements(driver, by, key, hidden_element=False, timeout=8):
    """Wait for and return all elements matching a locator.

    Args:
        driver: The webdriver to search in.
        by: Locator strategy. Accepts a By constant, a By attribute name in
            any case ("xpath", "css_selector"), or one of the short names
            "css", "tagname", "classname".
        key: The selector value for the chosen strategy.
        hidden_element: When true, wait only for presence in the DOM and
            return every match. Otherwise wait for at least one visible
            match and return the visible elements that are also enabled.
        timeout: Seconds to wait before giving up.

    Returns:
        A list of matching elements.

    Raises:
        Exception: If the locator cannot be resolved or the wait fails. Before
            raising, the page HTML and a snapshot are dumped and the driver is
            closed and quit. The original error is attached as the cause.
    """
    try:
        by = _resolve_locator(by)
        wait = WebDriverWait(driver, timeout=timeout)

        if hidden_element:
            return wait.until(
                EC.presence_of_all_elements_located((by, key)))

        visible_elements = wait.until(
            EC.visibility_of_any_elements_located((by, key)))
        return [
            element for element in visible_elements if element.is_enabled()]
    except Exception as err:
        dump_html(driver)
        take_snapshot(driver)
        driver.close()
        driver.quit()
        raise Exception(
            f"Unable to get elements of {by} value: {key}") from err
def execute_selenium_elements(driver, timeout, elements):
try:
for index, element in enumerate(elements):
@ -265,7 +291,7 @@ def solve_captcha(site_key, url, retry_attempts=3, version='v2', enterprise=Fals
use_proxy = False
if CAPTCHA_TESTING:
print('Initializing CAPTCHA solver in dummy mode')
code = "12345"
code = random.randint()
print("CAPTCHA Successful")
return code
@ -340,6 +366,7 @@ def save_browser_session(driver):
def selenium_action_template(driver):
# Data that might need to be entered during webscraping
info = {
"sample_field1": "sample_data",
"sample_field2": "sample_data",
@ -350,7 +377,7 @@ def selenium_action_template(driver):
{
"name": "Enter data for sample field 1",
"type": "input",
"input": "{first_name}",
"input": "{sample_field1}",
# If a site implements canary design releases, you can place the ID for the element in the new design
"default": {
# See get_element() for possible selector types
@ -388,3 +415,54 @@ def selenium_action_template(driver):
# Execute the selenium actions
execute_selenium_elements(driver, 8, elements)
# Sample task for Google search
def google_search(driver, search_term):
    """Enter ``search_term`` into the Google search box and submit it.

    Builds two step descriptions that target the same search-box XPath (one
    of type "input", one of type "input_enter"), replaces the
    "{search_term}" placeholder with the given term, and passes the steps to
    ``execute_selenium_elements`` with a timeout argument of 8.
    """
    search_box_xpath = '//*[@id="APjFqb"]'
    info = {"search_term": search_term}

    elements = [
        {
            "name": "Type in search term",
            "type": "input",
            "input": "{search_term}",
            "default": {"type": "xpath", "key": search_box_xpath},
            "failover": {"type": "xpath", "key": search_box_xpath},
        },
        {
            "name": "Press enter",
            "type": "input_enter",
            "default": {"type": "xpath", "key": search_box_xpath},
            "failover": {"type": "xpath", "key": search_box_xpath},
        },
    ]

    # Replace "{placeholder}" inputs with the matching value from info
    site_form_values = {}
    for step in elements:
        template = step.get('input')
        if template is None or '{' not in template:
            continue

        placeholder = template.strip('{}')
        if placeholder not in info:
            continue

        value = info[placeholder]
        if callable(value):
            # Call a value factory at most once per placeholder and reuse it
            if placeholder not in site_form_values:
                site_form_values[placeholder] = value()
            value = site_form_values[placeholder]
        step['input'] = str(value)

    execute_selenium_elements(driver, 8, elements)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 148 KiB

After

Width:  |  Height:  |  Size: 153 KiB