"""
Enhanced Google Maps Reviews Scraper for David's Bridal
Scrapes reviews from Google Maps with parallel processing and improved element detection
"""
| |
|
import asyncio
import csv
import logging
import random
import re
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from queue import Queue

import pandas as pd
from selenium import webdriver
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException, ElementClickInterceptedException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
| |
|
| | |
# Module-wide logging: timestamped INFO-level messages for progress reporting.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
| |
|
class EnhancedGoogleMapsReviewsScraper:
    """Scrape Google Maps reviews for a business location via Selenium/Chrome.

    Workflow: search for the location, open its Reviews tab, scroll the
    reviews pane until no more reviews load (or a target count is reached),
    extract each review's fields, and save everything to a CSV file.

    NOTE(review): despite the ``max_workers`` option and the name of
    ``extract_all_reviews_parallel``, extraction runs sequentially — a single
    Selenium WebDriver instance must not be shared across threads.
    """

    def __init__(self, headless=True, wait_time=10, max_workers=3):
        """Initialize the scraper with Chrome driver options.

        Args:
            headless: Launch Chrome without a visible window.
            wait_time: Explicit-wait timeout in seconds for element lookups.
            max_workers: Reserved for future parallel extraction; currently unused.
        """
        self.wait_time = wait_time
        self.max_workers = max_workers
        self.reviews_queue = Queue()  # reserved for future producer/consumer use
        self.processed_reviews = []
        self.lock = threading.Lock()
        self.setup_driver(headless)

    def setup_driver(self, headless):
        """Set up Chrome driver with appropriate options.

        Configures several flags intended to reduce automation fingerprints.
        Exits the process (``sys.exit(1)``) if the driver cannot be created.
        """
        try:
            chrome_options = Options()
            if headless:
                chrome_options.add_argument("--headless")
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")
            # Hide common automation tells from Google's bot detection.
            chrome_options.add_argument("--disable-blink-features=AutomationControlled")
            chrome_options.add_argument("--disable-extensions")
            chrome_options.add_argument("--disable-gpu")
            chrome_options.add_argument("--remote-debugging-port=9222")
            chrome_options.add_argument("--window-size=1920,1080")
            chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
            chrome_options.add_experimental_option('useAutomationExtension', False)
            chrome_options.add_argument("--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")

            logger.info("Setting up ChromeDriver...")
            service = Service(ChromeDriverManager().install())

            self.driver = webdriver.Chrome(service=service, options=chrome_options)
            # Mask navigator.webdriver, another frequently checked automation flag.
            self.driver.execute_script("Object.defineProperty(navigator, 'webdriver', {get: () => undefined})")
            self.wait = WebDriverWait(self.driver, self.wait_time)
            logger.info("ChromeDriver setup successful")

        except WebDriverException as e:
            logger.error(f"Failed to setup ChromeDriver: {e}")
            sys.exit(1)

    def search_location(self, query):
        """Search for the location on Google Maps and click the first result.

        Args:
            query: Free-text search query (spaces are URL-encoded as '+').

        Returns:
            True if a clickable search result was found and clicked, else False.
        """
        try:
            search_url = f"https://www.google.com/maps/search/{query.replace(' ', '+')}"
            logger.info(f"Navigating to: {search_url}")
            self.driver.get(search_url)

            # Fixed pause to let the results pane render before probing it.
            time.sleep(5)

            # Candidate selectors for the first search-result card.
            result_selectors = [
                "button.hh2c6.G7m0Af",
            ]

            result_found = False
            for selector in result_selectors:
                try:
                    first_result = self.wait.until(
                        EC.element_to_be_clickable((By.CSS_SELECTOR, selector))
                    )
                    # JS click avoids ElementClickInterceptedException from overlays.
                    self.driver.execute_script("arguments[0].click();", first_result)
                    time.sleep(3)
                    result_found = True
                    break
                except TimeoutException:
                    continue

            return result_found

        except Exception as e:
            logger.error(f"Error in search_location: {e}")
            return False

    def click_reviews_tab(self):
        """Click on the reviews tab using the specific element structure.

        Returns:
            True on success, False if the tab could not be found or clicked.
        """
        try:
            reviews_button = self.wait.until(
                EC.element_to_be_clickable((By.CSS_SELECTOR, "button[data-tab-index='1'][aria-label*='Reviews']"))
            )

            # Center the tab in the viewport first; off-screen clicks can be ignored.
            self.driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", reviews_button)
            time.sleep(1)

            self.driver.execute_script("arguments[0].click();", reviews_button)
            logger.info("Successfully clicked reviews tab")

            time.sleep(3)
            return True

        except Exception as e:
            logger.error(f"Could not click reviews tab: {e}")
            return False

    def expand_review_text(self, review_element):
        """Expand review text by clicking the 'See more' button if present.

        Args:
            review_element: WebElement for one review card.

        Returns:
            True if a 'See more' button was found and clicked, else False.
        """
        try:
            more_button = review_element.find_element(
                By.CSS_SELECTOR,
                "button.w8nwRe.kyuRq[aria-label='See more']"
            )

            self.driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", more_button)
            time.sleep(0.5)
            self.driver.execute_script("arguments[0].click();", more_button)
            time.sleep(1)
            return True

        except NoSuchElementException:
            # No 'See more' button means the review is already fully visible.
            return False
        except Exception as e:
            logger.warning(f"Error expanding review text: {e}")
            return False

    def scroll_and_load_reviews(self, target_count=5000):
        """Scroll through the reviews pane to lazy-load reviews.

        Stops when ``target_count`` reviews are loaded, when no new reviews
        appear for several consecutive rounds, or after a hard attempt cap.

        Args:
            target_count: Stop scrolling once this many reviews are loaded.

        Returns:
            The number of review elements loaded (0 on error).
        """
        try:
            scrollable_container = self.driver.find_element(By.CSS_SELECTOR, "div.m6QErb.DxyBCb.kA9KIf.dS8AEf.XiKgde")
            last_review_count = 0
            stagnant_rounds = 0
            max_stagnant_rounds = 5
            scroll_attempts = 0
            max_scroll_attempts = 1000
            # Defensive init so the summary log below is safe even if the
            # loop body never executes.
            current_reviews = 0

            while scroll_attempts < max_scroll_attempts:
                self.driver.execute_script(
                    "arguments[0].scrollTo(0, arguments[0].scrollHeight);",
                    scrollable_container
                )

                # Randomized delay to mimic human scrolling cadence.
                time.sleep(random.uniform(2, 4))

                current_reviews = len(self.driver.find_elements(By.CSS_SELECTOR, "div[data-review-id]"))
                logger.info(f"Attempt {scroll_attempts + 1}: Loaded {current_reviews} reviews (target: {target_count})")

                if current_reviews >= target_count:
                    logger.info("Reached target review count.")
                    break

                # Track rounds with no growth; bail once we're clearly at the end.
                if current_reviews == last_review_count:
                    stagnant_rounds += 1
                    logger.info(f"No new reviews this round. Stagnant rounds: {stagnant_rounds}/{max_stagnant_rounds}")
                    if stagnant_rounds >= max_stagnant_rounds:
                        logger.info("No new reviews after several attempts. Stopping scroll.")
                        break
                else:
                    stagnant_rounds = 0

                last_review_count = current_reviews
                scroll_attempts += 1

                # Periodic longer pause to look less like a bot.
                if scroll_attempts % 10 == 0:
                    logger.info("Taking a longer pause to mimic human browsing...")
                    time.sleep(random.uniform(5, 8))

            logger.info(f"Finished scrolling. Total reviews found: {current_reviews}")
            return current_reviews

        except Exception as e:
            logger.error(f"Error scrolling reviews: {e}")
            return 0

    def extract_single_review_data(self, review_element):
        """Extract data from a single review element.

        Each field is looked up independently so a missing sub-element only
        blanks that field rather than failing the whole review.

        Args:
            review_element: WebElement for one review card (``div[data-review-id]``).

        Returns:
            Dict with keys reviewer_name, rating, review_text, date,
            owner_response, scraped_at, review_id — or None on failure.
        """
        try:
            review_data = {}

            # Expand truncated text first so review_text captures the full body.
            self.expand_review_text(review_element)

            try:
                name_element = review_element.find_element(By.CSS_SELECTOR, "div[class*='d4r55']")
                review_data['reviewer_name'] = name_element.text.strip()
            except NoSuchElementException:
                review_data['reviewer_name'] = "Anonymous"

            try:
                rating_element = review_element.find_element(By.CSS_SELECTOR, "span[role='img'][aria-label*='star']")
                rating_text = rating_element.get_attribute('aria-label')
                review_data['rating'] = self.extract_rating_from_text(rating_text)
            except NoSuchElementException:
                review_data['rating'] = None

            try:
                text_element = review_element.find_element(By.CSS_SELECTOR, "span.wiI7pd")
                review_data['review_text'] = text_element.text.strip()
            except NoSuchElementException:
                review_data['review_text'] = ""

            try:
                date_element = review_element.find_element(By.CSS_SELECTOR, "span.rsqaWe")
                review_data['date'] = date_element.text.strip()
            except NoSuchElementException:
                review_data['date'] = ""

            # NOTE(review): this selector matches the same 'wiI7pd' class as the
            # review text above, so owner_response likely duplicates the review
            # body instead of the owner's reply — verify against live markup.
            try:
                response_element = review_element.find_element(By.CSS_SELECTOR, "div[class*='wiI7pd']")
                review_data['owner_response'] = response_element.text.strip()
            except NoSuchElementException:
                review_data['owner_response'] = ""

            review_data['scraped_at'] = datetime.now().isoformat()
            # Fall back to a millisecond-timestamp id when Maps gives us none.
            review_data['review_id'] = review_element.get_attribute('data-review-id') or f"review_{int(time.time() * 1000)}"

            return review_data

        except Exception as e:
            logger.error(f"Error extracting single review: {e}")
            return None

    def extract_rating_from_text(self, text):
        """Extract a numeric rating from an aria-label string.

        Handles both "N stars" / "N out of 5 stars" phrasing and raw star
        glyphs ('★' or '⭐').

        Args:
            text: aria-label text, possibly None or empty.

        Returns:
            Rating as an int, or None if no rating can be parsed.
        """
        if not text:
            return None

        match = re.search(r'(\d+)\s*(?:out of \d+\s*)?stars?', text.lower())
        if match:
            return int(match.group(1))

        # Fallback: count star glyphs directly.
        star_count = text.count('★') or text.count('⭐')
        if star_count > 0:
            return star_count

        return None

    def process_reviews_batch(self, review_elements, start_idx, end_idx):
        """Process a contiguous slice of review elements.

        Currently unused by the main flow (extraction is done in
        ``extract_all_reviews_parallel``); kept for future batch processing.

        Args:
            review_elements: Full list of review WebElements.
            start_idx: Inclusive start index of the batch.
            end_idx: Exclusive end index (clamped to the list length).

        Returns:
            List of successfully extracted review dicts.
        """
        batch_results = []

        for i in range(start_idx, min(end_idx, len(review_elements))):
            try:
                review_data = self.extract_single_review_data(review_elements[i])
                if review_data:
                    batch_results.append(review_data)
                    logger.info(f"Processed review {i+1}/{len(review_elements)}")
            except Exception as e:
                logger.warning(f"Error processing review {i+1}: {e}")
                continue

        return batch_results

    def extract_all_reviews_parallel(self):
        """Extract all loaded reviews with duplicate removal.

        Despite the name, this runs sequentially — the single WebDriver
        instance cannot safely be used from multiple threads. Duplicates are
        skipped by tracking data-review-id values already processed.

        Returns:
            List of unique review dicts (empty list on error or no reviews).
        """
        try:
            review_elements = self.driver.find_elements(By.CSS_SELECTOR, "div[data-review-id]")
            total_reviews = len(review_elements)
            logger.info(f"Found {total_reviews} review elements to process")

            if total_reviews == 0:
                return []

            processed_review_ids = set()
            all_reviews = []

            for i, review_element in enumerate(review_elements):
                try:
                    # Cheap pre-check: skip elements whose id we've already handled.
                    review_id = review_element.get_attribute('data-review-id')

                    if review_id and review_id in processed_review_ids:
                        logger.debug(f"Skipping duplicate review ID: {review_id}")
                        continue

                    review_data = self.extract_single_review_data(review_element)

                    if review_data and review_data.get('review_id'):
                        processed_review_ids.add(review_data['review_id'])
                        all_reviews.append(review_data)
                        logger.info(f"Processed review {len(all_reviews)}/{total_reviews}")

                except Exception as e:
                    logger.warning(f"Error processing review {i+1}: {e}")
                    continue

            logger.info(f"Successfully extracted {len(all_reviews)} unique reviews")
            return all_reviews

        except Exception as e:
            logger.error(f"Error in review extraction: {e}")
            return []

    def save_to_csv(self, reviews_data, filename="davids_bridal_reviews.csv"):
        """Save reviews to CSV with duplicate removal and stable column order.

        Args:
            reviews_data: List of review dicts from extraction.
            filename: Output CSV path.
        """
        if not reviews_data:
            logger.warning("No reviews data to save")
            return

        try:
            df = pd.DataFrame(reviews_data)

            # Two de-dup passes: exact id match, then content match (catches
            # rows whose id fell back to a generated timestamp).
            initial_count = len(df)
            df = df.drop_duplicates(subset=['review_id'], keep='first')
            df = df.drop_duplicates(subset=['reviewer_name', 'review_text', 'date'], keep='first')

            final_count = len(df)
            if initial_count > final_count:
                logger.info(f"Removed {initial_count - final_count} duplicate reviews")

            column_order = ['reviewer_name', 'rating', 'date', 'review_text', 'owner_response', 'review_id', 'scraped_at']
            df = df.reindex(columns=column_order)

            df.to_csv(filename, index=False, encoding='utf-8')
            # Fixed: log the actual output filename (was a literal "(unknown)").
            logger.info(f"Successfully saved {len(df)} unique reviews to {filename}")

            if 'rating' in df.columns and len(df) > 0:
                avg_rating = df['rating'].mean()
                logger.info(f"Average rating: {avg_rating:.2f}")
                logger.info(f"Rating distribution:\n{df['rating'].value_counts().sort_index()}")

        except Exception as e:
            logger.error(f"Error saving to CSV: {e}")

    def scrape_reviews(self, location_query, output_file="davids_bridal_reviews.csv", target_count=2394):
        """Run the full scrape: search, open reviews, scroll, extract, save.

        Always closes the browser before returning.

        Args:
            location_query: Google Maps search text for the business.
            output_file: CSV path for the results.
            target_count: Stop scrolling once this many reviews are loaded
                (default preserves the previous hard-coded value).

        Returns:
            List of review dicts on success, None on any failure.
        """
        try:
            logger.info("Starting enhanced review scraping...")

            if not self.search_location(location_query):
                logger.error("Failed to find location")
                return None

            if not self.click_reviews_tab():
                logger.error("Failed to access reviews tab")
                return None

            total_loaded = self.scroll_and_load_reviews(target_count=target_count)

            if total_loaded == 0:
                logger.error("No reviews found after scrolling")
                return None

            reviews_data = self.extract_all_reviews_parallel()

            if reviews_data:
                self.save_to_csv(reviews_data, output_file)
                logger.info(f"Successfully scraped {len(reviews_data)} reviews")
                return reviews_data
            else:
                logger.warning("No reviews extracted")
                return None

        except Exception as e:
            logger.error(f"Error during scraping: {e}")
            return None
        finally:
            self.close()

    def close(self):
        """Close the browser driver if it was ever created."""
        if hasattr(self, 'driver'):
            self.driver.quit()
| |
|
def main():
    """Entry point: scrape one David's Bridal location and report the outcome."""
    try:
        # Visible browser window makes interactive debugging easier.
        scraper = EnhancedGoogleMapsReviewsScraper(headless=False, max_workers=3)

        search_query = "David's Bridal Middletown NY"
        logger.info(f"Starting scrape for: {search_query}")

        reviews = scraper.scrape_reviews(
            location_query=search_query,
            output_file="davids_bridal_middletown_reviews.csv",
        )

        if not reviews:
            logger.error("Scraping failed - no reviews collected")
        else:
            logger.info(f"Scraping completed successfully! Total reviews: {len(reviews)}")

    except KeyboardInterrupt:
        logger.info("Scraping interrupted by user")
    except Exception as e:
        logger.error(f"Unexpected error in main: {e}")
| |
|
# Run the scraper only when executed as a script, not when imported.
if __name__ == "__main__":
    main()