"""
|
|
Selenium scraper for JavaScript-heavy and dynamic websites.
|
|
"""
|
|
from typing import Dict, Any, Optional
|
|
from selenium import webdriver
|
|
from selenium.webdriver.chrome.service import Service
|
|
from selenium.webdriver.chrome.options import Options
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.common.exceptions import (
|
|
TimeoutException,
|
|
NoSuchElementException,
|
|
WebDriverException
|
|
)
|
|
from webdriver_manager.chrome import ChromeDriverManager
|
|
from bs4 import BeautifulSoup
|
|
from scrapers.base_scraper import BaseScraper
|
|
from utils.retry import retry_with_backoff
|
|
from config import SELENIUM_HEADLESS, SELENIUM_IMPLICIT_WAIT, USER_AGENT


class SeleniumScraper(BaseScraper):
    """
    Scraper for dynamic websites using Selenium WebDriver.
    """

    def __init__(self, headless: bool = SELENIUM_HEADLESS, **kwargs):
        """
        Initialize Selenium scraper.

        Args:
            headless: Run browser in headless mode
            **kwargs: Additional arguments for BaseScraper
        """
        super().__init__(**kwargs)
        self.headless = headless
        self.driver = None
        self._initialize_driver()

    def _initialize_driver(self):
        """Initialize Chrome WebDriver with appropriate options."""
        chrome_options = Options()

        if self.headless:
            chrome_options.add_argument("--headless=new")

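        # The flags and experimental options below reduce common automation
        # fingerprints (navigator.webdriver, the "controlled by automated test
        # software" banner) and keep Chrome stable in containers
        # (--no-sandbox, --disable-dev-shm-usage).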
chrome_options.add_argument(f"user-agent={USER_AGENT}")
|
|
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
|
|
chrome_options.add_argument("--disable-dev-shm-usage")
|
|
chrome_options.add_argument("--no-sandbox")
|
|
chrome_options.add_argument("--disable-gpu")
|
|
chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
|
|
chrome_options.add_experimental_option("useAutomationExtension", False)
|
|
|
|
try:
|
|
service = Service(ChromeDriverManager().install())
|
|
self.driver = webdriver.Chrome(service=service, options=chrome_options)
|
|
self.driver.implicitly_wait(SELENIUM_IMPLICIT_WAIT)
|
|
self.logger.info("Chrome WebDriver initialized successfully")
|
|
except WebDriverException as e:
|
|
self.logger.error(f"Failed to initialize WebDriver: {str(e)}")
|
|
raise
|
|
|
|
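    # Transient failures (slow pages, dropped driver sessions) are retried
    # with backoff via the shared helper imported from utils.retry.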
    @retry_with_backoff(
        max_retries=2,
        exceptions=(TimeoutException, WebDriverException)
    )
    def scrape(self, url: str, wait_for: Optional[str] = None, **kwargs) -> Dict[str, Any]:
        """
        Scrape a dynamic website using Selenium.

        Args:
            url: Target URL to scrape
            wait_for: CSS selector to wait for before returning
            **kwargs: Additional parameters

        Returns:
            Dictionary containing page source and BeautifulSoup object
        """
        self.logger.info(f"Scraping URL with Selenium: {url}")
        self.rate_limiter.wait()

        try:
            self.driver.get(url)

            # Wait for specific element if provided
            if wait_for:
                timeout = kwargs.get('timeout', 10)
                WebDriverWait(self.driver, timeout).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, wait_for))
                )

            page_source = self.driver.page_source
            soup = BeautifulSoup(page_source, 'lxml')

            return {
                "url": url,
                "html": page_source,
                "soup": soup,
                "title": self.driver.title,
                "current_url": self.driver.current_url,
                "success": True
            }

        except (TimeoutException, WebDriverException) as e:
            self.logger.error(f"Selenium scraping failed for {url}: {str(e)}")
            return {
                "url": url,
                "error": str(e),
                "success": False
            }

    def click_element(self, selector: str, by: str = By.CSS_SELECTOR, timeout: int = 10):
        """
        Click an element on the page.

        Args:
            selector: Element selector
            by: Selenium locator strategy (default: By.CSS_SELECTOR)
            timeout: Wait timeout in seconds
        """
        try:
            element = WebDriverWait(self.driver, timeout).until(
                EC.element_to_be_clickable((by, selector))
            )
            element.click()
            self.logger.info(f"Clicked element: {selector}")
        except (TimeoutException, NoSuchElementException) as e:
            self.logger.error(f"Failed to click element {selector}: {str(e)}")
            raise

    def fill_form(self, selector: str, text: str, by: str = By.CSS_SELECTOR):
        """
        Fill a form field with text.

        Args:
            selector: Element selector
            text: Text to input
            by: Selenium locator strategy (default: By.CSS_SELECTOR)
        """
        try:
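            # find_element honours the implicit wait configured in
            # _initialize_driver, so briefly delayed fields are still located.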
            element = self.driver.find_element(by, selector)
            element.clear()
            element.send_keys(text)
            self.logger.info(f"Filled form field: {selector}")
        except NoSuchElementException as e:
            self.logger.error(f"Form field not found {selector}: {str(e)}")
            raise

    def execute_script(self, script: str):
        """
        Execute JavaScript in the browser.

        Args:
            script: JavaScript code to execute

        Returns:
            Result of script execution
        """
        return self.driver.execute_script(script)

    def take_screenshot(self, filepath: str):
        """
        Take a screenshot of the current page.

        Args:
            filepath: Path to save the screenshot
        """
        self.driver.save_screenshot(filepath)
        self.logger.info(f"Screenshot saved to {filepath}")

    def cleanup(self):
        """Quit the WebDriver and cleanup resources."""
        if self.driver:
            self.driver.quit()
            self.logger.info("WebDriver closed")
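

# Usage sketch (not part of the original module): drives the scraper end to
# end, assuming the imports above resolve (scrapers/, utils/, config.py) and
# a local Chrome install. The URL and CSS selector below are placeholders
# used purely for illustration.
if __name__ == "__main__":
    scraper = SeleniumScraper(headless=True)
    try:
        # Wait for the <body> element before reading the rendered DOM.
        result = scraper.scrape("https://example.com", wait_for="body")
        if result["success"]:
            print(result["title"])
            print(len(result["html"]), "bytes of rendered HTML")
    finally:
        scraper.cleanup()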