Руководство по распределенному веб-кроулингу

Узнайте о стратегиях распределенного веб-скраплинга, архитектуре и реальных примерах запуска масштабируемых скреперов на нескольких машинах.
13 мин. чтения
Guide to Distributed Web Crawling blog image

Распределенный веб-краулинг – это стратегия масштабирования веб-краулеров на нескольких машинах, что позволяет преодолеть ограничения одноузловых краулеров. В этой статье мы рассмотрим:

  • Распределенный веб-краулинг в сравнении с веб-краулингом на одном узле
  • Основная архитектура распределенного веб-ползания
  • Реальные примеры распределенного веб-ползания
  • Стратегии внедрения и передовой опыт
  • Распространенные подводные камни и способы их устранения

TL;DR: Распределенный веб-краулинг использует кластер машин для параллельного просмотра веб-сайтов, решая проблемы масштабируемости и скорости, с которыми не могут справиться одноузловые краулеры. Это обеспечивает более высокую пропускную способность и надежность (нет одного узкого места) за счет дополнительной архитектурной сложности и накладных расходов.

Распределенное и одноузловое ползание

Большинству проектов не нужны распределенные системы, однако команды регулярно тратят месяцы на создание сложных распределенных архитектур, когда достаточно одного сервера.

В одноузловом краулере одна машина выполняет все операции по сбору, разбору и хранению информации. Такую систему легче разрабатывать и поддерживать, и она экономит ваши деньги. Она отлично подходит для получения 60-500 страниц в минуту, но по мере роста потребностей в краулинге один узел станет узким местом, поскольку вы будете ограничены процессором, памятью и сетевыми ограничениями.

В отличие от них распределенные краулеры распределяют работу между несколькими узлами, обеспечивая одновременную выборку в масштабе, высокую скорость и повышенную отказоустойчивость. Если один рабочий выходит из строя, другие продолжают работу, повышая тем самым надежность. Компромисс заключается в том, что распределенные системы требуют очередей сообщений, синхронизации границы URL и тщательного проектирования, чтобы избежать дублирования или переполнения целевых сайтов.

Всестороннее сравнение

Аспект Одноузловой Распределенный
Производительность В среднем 4 секунды на страницу, 60-120 страниц в минуту В 30 раз быстрее, 50 000+ запросов в секунду
Масштабируемость Ограниченность ресурсов одной машины Линейное масштабирование по узлам
Отказоустойчивость Единая точка отказа Автоматическое восстановление после отказа, самовосстановление
Географическое распределение Фиксированное местоположение Развертывание в нескольких регионах
Использование ресурсов Только вертикальное масштабирование Оптимизировано горизонтальное масштабирование
Сложность Простая настройка, минимальные накладные расходы Сложная оркестровка, более высокие эксплуатационные расходы
Стоимость Низкие первоначальные инвестиции Более высокие затраты на инфраструктуру, более высокая рентабельность инвестиций при масштабировании
Техническое обслуживание Минимальная эксплуатационная нагрузка Требуется опыт работы с распределенными системами
Обработка данных Только локальная обработка Параллельная обработка на всех узлах
Антиобнаружение Ограниченная ротация ИС Расширенное управление прокси-серверами, отпечатки пальцев

Стоит ли вам переходить на распределенную систему? (Дерево принятия решений)

Дерево решений, показывающее, является ли распространение правильным подходом

Основные строительные блоки и архитектура

Если вы решили использовать распределенный краулинг, следующим шагом будет разбивка того, что вы собираетесь построить. Подумайте об этом, как о сборе высокопроизводительной гоночной команды, где каждый компонент выполняет определенную работу, и все они должны работать слаженно. Вот ключевые компоненты, которые понадобятся для создания системы распределенного поиска:

Планировщик / Очередь (Мозг)

Сердцем распределенного краулера является планировщик или очередь задач, которая координирует работу между узлами, и именно в ней живут ваши URL-адреса, прежде чем они попадут в краулер. Компонент планировщика может также управлять вежливостью (временем) и повторными попытками. Например, вы можете реализовать очереди для конкретных доменов, чтобы один сайт не попадал ко всем работникам сразу.

У планировщиков есть три основных варианта, каждый из которых имеет свою индивидуальность:

  • Кафка: Это чемпион в тяжелом весе. Он создан для работы с огромной пропускной способностью и не боится пота, обрабатывая миллионы сообщений в секунду. Прелесть заключается в том, что он основан на журналах, что идеально подходит для управления границей URL. Вы можете сделать разделение по доменам, чтобы сделать ползание вежливым.
  • RabbitMQ: Это как швейцарский армейский нож. Более гибкая маршрутизация, чем в Kafka, с такими функциями, как приоритетные очереди. RabbitMQ имеет хранилище в памяти, поэтому он быстрее для небольших рабочих нагрузок. Отлично подходит, когда вам нужны разные стратегии поиска для разных типов контента.
  • Celery: Лучший друг разработчика Python. Этот вариант не так эффективен, как другие, но он прост в использовании. Celery идеально подходит для создания прототипов или средних по масштабу работ, когда вам нужно быстро заставить что-то работать.

URL Frontier и дедупликация: Память кроулера

Случалось ли вам случайно просмотреть одну и ту же страницу 1000 раз? Именно здесь вас спасет дедупликация. Вам нужно отслеживать просмотренное, соблюдая при этом вежливость сервера, чтобы не повторять один и тот же домен.

Наборы Redis могут обеспечить идеальную точность, но они занимают много памяти. Фильтры Блума используют на 90 % меньше памяти (1,2 ГБ против 12 ГБ+ для миллиарда URL), но иногда дают ложные срабатывания (могут сказать, что вы не видели URL, когда вы его видели), поэтому вам лучше выбрать эту реализацию Redis:

class DistributedURLFrontier:
    def __init__(self, redis_client):
        self.redis = redis_client

    def add_url(self, url, priority=0):
        domain = urlparse(url).netloc

        # Skip if already seen
        if self.redis.sismember("seen_urls", url):
            return

        # Mark as seen and queue by domain
        self.redis.sadd("seen_urls", url)
        self.redis.lpush(f"queue:{domain}", url)
        self.redis.zadd("priority_queue", {url: priority})

    def get_next_url(self):
        # Get highest priority URL
        result = self.redis.zrevrange("priority_queue", 0, 0)
        if not result:
            return None

        url = result[0]
        domain = urlparse(url).netloc

        # Respect crawl delay (1 second between requests per domain)
        last_crawl = self.redis.get(f"last_crawl:{domain}")
        if last_crawl and time.time() - float(last_crawl) < 1.0:
            return None

        # Remove from queues and update last crawl time
        self.redis.zrem("priority_queue", url)
        self.redis.rpop(f"queue:{domain}")
        self.redis.set(f"last_crawl:{domain}", time.time())

        return url

Рабочие узлы (мышцы)

Рабочие узлы – это рабочие лошадки краулинга. Они представляют собой процессы или машины, которые фактически выполняют работу по наполнению, например, получают URL-адреса и обрабатывают содержимое. На каждом рабочем узле выполняется идентичная логика краулинга (например, один и тот же скрипт или приложение на Python), но они работают параллельно с разными URL из очереди.

Чтобы получить максимальную отдачу от рабочих, необходимо сделать их статичными, чтобы любое состояние (посещенные URL, результаты и т. д.) хранилось в общем хранилище или передавалось через сообщения. Таким образом, любой работник может взяться за любую работу, а когда один из них умирает, другие мгновенно подхватывают слабину, не пропуская ни одного удара.

class DistributedWorker:
    def __init__(self, worker_id, max_concurrent=50):
        self.worker_id = worker_id
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30),
            connector=aiohttp.TCPConnector(limit=100)
        )

    async def crawl_batch(self, urls):
        tasks = [self.crawl_url(url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

    async def crawl_url(self, url):
        async with self.semaphore:
            try:
                async with self.session.get(url) as response:
                    content = await response.text()
                    return {'url': url, 'content': content, 'status': response.status}
            except Exception as e:
                return {'url': url, 'error': str(e)}

Совет: при использовании рабочих не стоит применять кувалду для всего. Вы должны использовать легкие HTTP-рабочие для статического HTML и тяжелые Puppeteer-рабочие для страниц с JavaScript-рендерингом. Разные инструменты, разные пулы рабочих. Вы можете легко выбрать подходящие типы прокси для вашего пула рабочих с помощью нашего исчерпывающего руководства по выбору прокси.

Уровень хранения (Хранилище)

Уровень хранения – это место, где вы сохраняете собранные данные и метаданные, и он часто состоит из двух частей:

  • Хранилище контента обрабатывает большие объемы необработанных HTML, ответов JSON, изображений и PDF-файлов. Считайте, что это ваш цифровой склад. Объектные хранилища, такие как S3, Google Cloud Storage или HDFS, отлично подходят для этой цели, поскольку они бесконечно масштабируются и обрабатывают одновременные записи от нескольких рабочих, не вытирая пота.
  • Вхранилище метаданных хранится структурированное золото, которое вы извлекли – разобранные поля, связи между сущностями, временные метки выполнения и статус успеха/неудачи. Эти данные используются в базах данных, оптимизированных для запросов и обновлений, а не только для хранения.

Распределенные краулеры нуждаются в хранилищах, способных обрабатывать массивные одновременные записи, не захлебываясь. Объектные хранилища, такие как S3 или Google Cloud Storage, отлично справляются с необработанным контентом, поскольку они бесконечно масштабируются, а базы данных NoSQL (MongoDB, Cassandra) или SQL эффективно обрабатывают структурированные метаданные.

Мониторинг и оповещение

Работа распределенного краулера требует контроля за производительностью системы. Вы можете использовать Prometheus и Grafana для создания комплексных панелей мониторинга, которые отслеживают скорость переползания, успешность, время отклика и глубину очереди. Ключевые показатели включают запросы в секунду по доменам, время отклика 95-го процентиля и динамику изменения размера очереди.

Уровень защиты от ботов и уклонения

Масштабная работа в Интернете означает постоянные игры в кошки-мышки с системами защиты от ботов. Вам нужны три уровня защиты: ротация IP-адресов на тысячах прокси-серверов в жилых домах и центрах обработки данных, рандомизация отпечатков пальцев пользовательских агентов и сигнатур браузеров, а также имитация поведения, чтобы избежать шаблонов обнаружения.

Bright Data Web Unlocker предлагает возможности защиты от ботов корпоративного уровня с коэффициентом успешности 99%+, достигаемым благодаря автоматическому решению CAPTCHA, ротации IP-адресов и отпечатку браузера. Его подход, основанный на API, упрощает интеграцию при решении сложных задач по борьбе с ботами.

class BrightDataWebUnlocker:
    def crawl_url(self, url: str, options: Dict = None) -> Dict:
        payload = {
            "url": url,
            "zone": self.zone,
            "format": "raw",
            "country": "US",
            "render_js": True,
            "wait_for_selector": ".content"
        }

        response = requests.post(
            self.base_url,
            headers={"Authorization": f"Bearer {self.api_key}"},
            json=payload,
            timeout=60
        )

Продвинутая ротация прокси реализует проверку работоспособности, географическую оптимизацию и восстановление после сбоев в пулах прокси для жилых домов, центров обработки данных и мобильных устройств. Для успешного управления прокси требуется 1000+ IP-адресов с интеллектуальными алгоритмами ротации.

Избежание отпечатков пальцев рандомизирует пользовательские агенты, отпечатки браузеров и характеристики сети, чтобы предотвратить обнаружение сложными системами защиты от ботов. Сюда входят ротация отпечатков TLS, подмена отпечатков холста и имитация поведенческих моделей.

Реальные примеры использования в реальном мире с примерами кода

Давайте рассмотрим два распространенных случая использования распределенных краулеров и расскажем, как их можно реализовать с помощью фрагментов кода. Для простоты мы будем использовать Python и Celery, но принципы применимы в целом.

Пример 1: Мониторинг цен в электронной коммерции

Представьте, что вы ежедневно отслеживаете цены конкурентов на 50 000 страниц товаров. Если вы попытаетесь использовать одну машину для поиска всех этих URL-адресов, вам придется потратить 12+ часов на ползание по сайту, при условии, что ничего не сломается. Кроме того, большинство сайтов электронной коммерции начнут блокировать вас после нескольких тысяч быстрых запросов с одного и того же IP.

Здесь на помощь приходит распределенное ползание. Вместо одной перегруженной машины вы распределяете эти 50 000 URL по десяткам рабочих, каждый из которых использует разные IP-адреса. То, на что раньше уходило полдня, теперь завершается за 2-3 часа, и вы остаетесь незамеченными для систем защиты от ботов.

Настройка проста. Вам нужно вести списки URL-адресов конкурентов (взять их из sitemaps или discovery crawls), а затем использовать что-то вроде Celery с Redis для распределения работы. Каждое утро вы ставите в очередь все 50 000 URL-адресов, и ваша армия рабочих приступает к работе. Рабочий 1 занимается кроссовками Nike, рабочий 2 – кроссовками Adidas, рабочий 3 – ценами Puma. Все одновременно, все с разных IP.

from celery import Celery
import requests
from bs4 import BeautifulSoup
import random
import time
import re
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Initialize Celery app with Redis as broker
app = Celery('price_monitor', broker='redis://localhost:6379/0')

# Realistic user agents for rotation
USER_AGENTS = [
   "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
   "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
   "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
   "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15"
]

# Proxy pool (replace with your actual proxy service)
PROXY_POOL = [
   "<http://proxy1:8080>",
   "<http://proxy2:8080>",
   "<http://proxy3:8080>",
   # Add your proxy endpoints here
]

def get_session_with_retries():
   """Create a session with retry strategy and random proxy."""
   session = requests.Session()

   # Retry strategy for resilience
   retry_strategy = Retry(
       total=3,
       backoff_factor=1,
       status_forcelist=[429, 500, 502, 503, 504],
   )
   adapter = HTTPAdapter(max_retries=retry_strategy)
   session.mount("http://", adapter)
   session.mount("https://", adapter)

   # Random proxy rotation
   if PROXY_POOL:
       proxy = random.choice(PROXY_POOL)
       session.proxies = {"http": proxy, "https": proxy}

   return session

@app.task(bind=True, max_retries=3)
def fetch_product_price(self, url, site_config=None):
   """Fetches product price with full anti-detection measures."""

   # Human-like delay before starting
   time.sleep(random.uniform(2, 8))

   # Randomized headers to avoid fingerprinting
   headers = {
       "User-Agent": random.choice(USER_AGENTS),
       "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
       "Accept-Language": "en-US,en;q=0.9",
       "Accept-Encoding": "gzip, deflate, br",
       "Connection": "keep-alive",
       "Upgrade-Insecure-Requests": "1",
       "Sec-Fetch-Dest": "document",
       "Sec-Fetch-Mode": "navigate",
       "Sec-Fetch-Site": "none",
       "Cache-Control": "max-age=0"
   }

   try:
       session = get_session_with_retries()
       resp = session.get(url, headers=headers, timeout=30)
       resp.raise_for_status()

       # Parse the page for price
       soup = BeautifulSoup(resp.text, 'html.parser')
       price_value = extract_price(soup, url, site_config)

       if price_value:
           # Store in database (implement your storage logic here)
           store_price_data(url, price_value, resp.status_code)
           return {"url": url, "price": price_value, "status": "success"}
       else:
           return {"url": url, "error": "Price not found", "status": "failed"}

   except requests.exceptions.RequestException as e:
       print(f"Request failed for {url}: {e}")

       # Retry with exponential backoff
       if self.request.retries < self.max_retries:
           raise self.retry(countdown=60 * (2 ** self.request.retries))

       return {"url": url, "error": str(e), "status": "failed"}

def extract_price(soup, url, site_config=None):
   """Extract price using multiple strategies."""

   # Site-specific selectors (customize for each competitor)
   price_selectors = [
       ".price", ".product-price", ".current-price", ".sale-price",
       "[data-price]", ".price-current", ".price-now", ".offer-price"
   ]

   # Try configured selectors first
   if site_config and site_config.get('price_selector'):
       price_selectors.insert(0, site_config['price_selector'])

   price_text = None
   for selector in price_selectors:
       price_elem = soup.select_one(selector)
       if price_elem:
           price_text = price_elem.get_text(strip=True)
           break

   # Try data attributes as fallback
   if not price_text:
       price_elem = soup.find(attrs={"data-price": True})
       if price_elem:
           price_text = price_elem.get("data-price")

   if not price_text:
       return None

   # Clean and parse price
   return parse_price(price_text)

def parse_price(price_text):
   """Parse price from various formats."""
   # Remove common currency symbols and whitespace
   cleaned = re.sub(r'[^\\d.,]', '', price_text)

   # Handle formats like "1,299.99" or "1299.99"
   try:
       # Remove commas and convert to float
       if ',' in cleaned and '.' in cleaned:
           # Format: 1,299.99
           price_value = float(cleaned.replace(',', ''))
       elif ',' in cleaned:
           # Could be European format: 1299,99
           if cleaned.count(',') == 1 and len(cleaned.split(',')[1]) == 2:
               price_value = float(cleaned.replace(',', '.'))
           else:
               # Format: 1,299 (no cents)
               price_value = float(cleaned.replace(',', ''))
       else:
           price_value = float(cleaned)

       return price_value

   except ValueError:
       print(f"Could not parse price from: {price_text}")
       return None

def store_price_data(url, price, status_code):
   """Store price data in your database."""
   # Implement your storage logic here
   # Could be PostgreSQL, MongoDB, or any other database
   print(f"Storing: {url} -> ${price} (Status: {status_code})")

# Site-specific configurations for better accuracy
SITE_CONFIGS = {
   "competitor1.com": {"price_selector": ".price-box .price"},
   "competitor2.com": {"price_selector": "[data-testid='price']"},
   "competitor3.com": {"price_selector": ".product-price-value"},
}

def get_site_config(url):
   """Get site-specific configuration."""
   for domain, config in SITE_CONFIGS.items():
       if domain in url:
           return config
   return None

# Load your 50k product URLs (from database, file, or API)
def load_product_urls():
   """Load URLs from your data source."""
   # Replace with your actual data loading logic
   urls = [
       "<https://competitor1.com/product/123>",
       "<https://competitor2.com/product/456>",
       # ... 49,998 more URLs
   ]
   return urls

# Main execution: dispatch all crawling tasks
def start_daily_price_monitoring():
   """Start the daily price monitoring job."""
   product_urls = load_product_urls()

   print(f"Starting crawl for {len(product_urls)} URLs...")

   for url in product_urls:
       site_config = get_site_config(url)
       fetch_product_price.delay(url, site_config)

   print("All tasks queued successfully!")

# Run with: python -m celery worker -A price_monitor --loglevel=info
# Start monitoring with: start_daily_price_monitoring()

В приведенном выше усовершенствованном коде fetch_product_price – это надежная задача Celery, предназначенная для мониторинга цен в масштабах предприятия. Вызывая delay(url, site_config) для каждого URL, мы ставим задачи в очередь в Redis, где 100+ рабочих могут получить их мгновенно. Распределенный подход превращает 12-часовое ползание на одной машине в 2-3-часовую операцию для всего парка рабочих.

Ключевые производственные соображения:

  • Управление прокси очень важно: в этом примере используется PROXY_POOL, который чередует IP-адреса при каждом запросе, что очень важно для 50 000 URL-адресов. Без этого вы, по сути, осуществляете DoS’инг целевых сайтов с одного IP, гарантируя блокировку.
  • Ограничение скорости на домен: Даже при распределении 50 000 URL-адресов с одного сайта конкурента будут вызывать тревогу, если все они попадут на сайт в течение нескольких минут. Мы включаем человекоподобные задержки(time.sleep(random.uniform(2, 8))), но рассматриваем возможность дросселирования в зависимости от домена.
  • Планирование и мониторинг. Используйте Celery Beat для ежедневного планирования или интегрируйте с Airflow для сложных рабочих процессов. Функция start_daily_price_monitoring() может быть запущена через cron или вашу платформу оркестровки.
  • Интеграция конвейера данных. После каждого просмотра функция store_price_data() сохраняет результаты в вашей базе данных.
  • Устойчивость к сбоям. Код включает логику повторных попыток с экспоненциальным отступлением, но планируйте частичные отказы. Если 5 % URL-адресов постоянно терпят неудачу, выясните, не были ли эти продукты сняты с производства, перемещены, или если на этих конкретных сайтах применяются более жесткие меры по борьбе с ботами, требующие других подходов.

Пример использования 2: SEO и маркетинговые исследования

SEO и маркетинговые исследования требуют просмотра миллионов страниц в двух важнейших направлениях: анализ контента и мониторинг поисковых систем. Вы не просто собираете информацию, вы создаете конкурентные данные, которые требуют скорости, скрытности и точности.

Если вы хотите отслеживать упоминания ключевых слов на 1 миллионе страниц конкурентов и одновременно контролировать рейтинг в SERP для сотен целевых ключевых слов ежедневно, одна машина займет несколько недель и будет заблокирована в течение нескольких часов. Это кричит о необходимости распределенной архитектуры.

Распределенный подход к поиску информации в Интернете можно разделить на два потока:

  • Анализ контента: Просматривайте сайты конкурентов, новостные издания и отраслевые блоги, чтобы отслеживать плотность ключевых слов, пробелы в контенте и тенденции рынка.
  • SERP Surveillance: Мониторинг рейтингов Google/Bing по целевым ключевым словам, отслеживание позиций конкурентов и изменений в функциях SERP
from celery import Celery
import requests
from bs4 import BeautifulSoup
import redis
import hashlib
import json
import time
import random
import re
from urllib.parse import urljoin, urlparse
from dataclasses import dataclass
from typing import List, Dict, Optional
import logging

# Initialize Celery and Redis
app = Celery('seo_intelligence', broker='redis://localhost:6379/0')
redis_client = redis.Redis(host='localhost', port=6379, db=1)

# Anti-detection configurations
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 Safari/605.1.15"
]

PROXY_POOL = [
    "<http://user:[email protected]:8080>",
    "<http://user:[email protected]:8080>",
    # Add your proxy endpoints
]

@dataclass
class KeywordData:
    keyword: str
    frequency: int
    context: List[str]  # Surrounding text snippets
    url: str
    domain: str

@dataclass
class SERPResult:
    keyword: str
    position: int
    title: str
    url: str
    snippet: str
    domain: str

class SEOCrawler:
    def __init__(self):
        self.session = self._create_session()
        
    def _create_session(self):
        session = requests.Session()
        if PROXY_POOL:
            proxy = random.choice(PROXY_POOL)
            session.proxies = {"http": proxy, "https": proxy}
        return session
    
    def _get_headers(self):
        return {
            "User-Agent": random.choice(USER_AGENTS),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Cache-Control": "max-age=0"
        }

# Deduplication utilities
def get_url_hash(url: str) -> str:
    """Generate consistent hash for URL deduplication."""
    return hashlib.md5(url.encode()).hexdigest()

def is_url_processed(url: str) -> bool:
    """Check if URL was already processed today."""
    url_hash = get_url_hash(url)
    today = time.strftime("%Y-%m-%d")
    return redis_client.exists(f"processed:{today}:{url_hash}")

def mark_url_processed(url: str):
    """Mark URL as processed with 24h expiry."""
    url_hash = get_url_hash(url)
    today = time.strftime("%Y-%m-%d")
    redis_client.setex(f"processed:{today}:{url_hash}", 86400, 1)

# Stream 1: Content Intelligence Crawling
@app.task(bind=True, max_retries=3)
def crawl_content_for_keywords(self, url: str, target_keywords: List[str]):
    """Crawl a page and extract keyword intelligence."""
    
    # Skip if already processed today
    if is_url_processed(url):
        return {"status": "skipped", "reason": "already_processed", "url": url}
    
    # Human-like delay
    time.sleep(random.uniform(3, 7))
    
    try:
        crawler = SEOCrawler()
        response = crawler.session.get(
            url, 
            headers=crawler._get_headers(), 
            timeout=30
        )
        response.raise_for_status()
        
        # Extract content and analyze keywords
        soup = BeautifulSoup(response.text, 'html.parser')
        content_data = extract_keyword_intelligence(soup, url, target_keywords)
        
        # Store results
        store_keyword_data(content_data)
        mark_url_processed(url)
        
        return {
            "status": "success",
            "url": url,
            "keywords_found": len(content_data),
            "total_mentions": sum(kd.frequency for kd in content_data)
        }
        
    except Exception as e:
        logging.error(f"Content crawl failed for {url}: {e}")
        if self.request.retries < self.max_retries:
            raise self.retry(countdown=60 * (2 ** self.request.retries))
        return {"status": "failed", "url": url, "error": str(e)}

def extract_keyword_intelligence(soup: BeautifulSoup, url: str, keywords: List[str]) -> List[KeywordData]:
    """Extract keyword data from page content."""
    # Remove script and style elements
    for script in soup(["script", "style", "nav", "footer", "header"]):
        script.decompose()
    
    # Get clean text content
    text = soup.get_text()
    text = re.sub(r'\\s+', ' ', text).strip().lower()
    
    domain = urlparse(url).netloc
    keyword_data = []
    
    for keyword in keywords:
        keyword_lower = keyword.lower()
        
        # Find all occurrences
        pattern = r'\\b' + re.escape(keyword_lower) + r'\\b'
        matches = list(re.finditer(pattern, text))
        
        if matches:
            # Extract context around each match
            contexts = []
            for match in matches[:5]:  # Limit to first 5 for performance
                start = max(0, match.start() - 100)
                end = min(len(text), match.end() + 100)
                context = text[start:end].strip()
                contexts.append(context)
            
            keyword_data.append(KeywordData(
                keyword=keyword,
                frequency=len(matches),
                context=contexts,
                url=url,
                domain=domain
            ))
    
    return keyword_data

# Stream 2: SERP Tracking
@app.task(bind=True, max_retries=3)
def track_serp_rankings(self, keyword: str, search_engine: str = "google"):
    """Track SERP positions for a keyword."""
    
    time.sleep(random.uniform(5, 10))  # Longer delay for search engines
    
    try:
        crawler = SEOCrawler()
        
        if search_engine == "google":
            search_url = f"<https://www.google.com/search?q={keyword}&num=20>"
        else:  # Bing
            search_url = f"<https://www.bing.com/search?q={keyword}&count=20>"
        
        # Special headers for search engines
        headers = crawler._get_headers()
        headers.update({
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Referer": "<https://www.google.com/>" if search_engine == "google" else "<https://www.bing.com/>"
        })
        
        response = crawler.session.get(search_url, headers=headers, timeout=30)
        response.raise_for_status()
        
        # Parse SERP results
        soup = BeautifulSoup(response.text, 'html.parser')
        serp_data = parse_serp_results(soup, keyword, search_engine)
        
        # Store SERP data
        store_serp_data(serp_data)
        
        return {
            "status": "success",
            "keyword": keyword,
            "results_found": len(serp_data),
            "search_engine": search_engine
        }
        
    except Exception as e:
        logging.error(f"SERP tracking failed for '{keyword}': {e}")
        if self.request.retries < self.max_retries:
            raise self.retry(countdown=120 * (2 ** self.request.retries))
        return {"status": "failed", "keyword": keyword, "error": str(e)}

def parse_serp_results(soup: BeautifulSoup, keyword: str, search_engine: str) -> List[SERPResult]:
    """Parse search engine results page."""
    results = []
    position = 1
    
    if search_engine == "google":
        # Google result selectors
        result_elements = soup.select('div.g')
        
        for element in result_elements:
            title_elem = element.select_one('h3')
            link_elem = element.select_one('a[href]')
            snippet_elem = element.select_one('.VwiC3b, .s3v9rd')
            
            if title_elem and link_elem:
                url = link_elem.get('href', '')
                if url.startswith('/url?q='):
                    url = url.split('/url?q=')[1].split('&')[0]
                
                results.append(SERPResult(
                    keyword=keyword,
                    position=position,
                    title=title_elem.get_text(strip=True),
                    url=url,
                    snippet=snippet_elem.get_text(strip=True) if snippet_elem else "",
                    domain=urlparse(url).netloc if url else ""
                ))
                position += 1
                
                if position > 20:  # Limit to top 20
                    break
    
    else:  # Bing
        result_elements = soup.select('.b_algo')
        
        for element in result_elements:
            title_elem = element.select_one('h2 a')
            snippet_elem = element.select_one('.b_caption p')
            
            if title_elem:
                url = title_elem.get('href', '')
                
                results.append(SERPResult(
                    keyword=keyword,
                    position=position,
                    title=title_elem.get_text(strip=True),
                    url=url,
                    snippet=snippet_elem.get_text(strip=True) if snippet_elem else "",
                    domain=urlparse(url).netloc if url else ""
                ))
                position += 1
                
                if position > 20:
                    break
    
    return results

# Data storage functions
def store_keyword_data(keyword_data: List[KeywordData]):
    """Store keyword intelligence in database."""
    for kd in keyword_data:
        data = {
            "keyword": kd.keyword,
            "frequency": kd.frequency,
            "context": kd.context,
            "url": kd.url,
            "domain": kd.domain,
            "crawled_at": time.time()
        }
        # Store in your preferred database (PostgreSQL, MongoDB, etc.)
        redis_client.lpush(f"keyword_data:{kd.keyword}", json.dumps(data))
        print(f"Stored: {kd.keyword} found {kd.frequency} times on {kd.domain}")

def store_serp_data(serp_data: List[SERPResult]):
    """Store SERP tracking data."""
    for result in serp_data:
        data = {
            "keyword": result.keyword,
            "position": result.position,
            "title": result.title,
            "url": result.url,
            "snippet": result.snippet,
            "domain": result.domain,
            "tracked_at": time.time()
        }
        redis_client.lpush(f"serp_data:{result.keyword}", json.dumps(data))
        print(f"SERP: '{result.keyword}' -> #{result.position} {result.domain}")

# Orchestration functions
def start_content_intelligence_crawl(urls: List[str], keywords: List[str]):
    """Launch content crawling across 1M+ URLs."""
    print(f"Starting content intelligence crawl for {len(urls)} URLs...")
    
    for url in urls:
        crawl_content_for_keywords.delay(url, keywords)
    
    print(f"Queued {len(urls)} content crawling tasks")

def start_serp_tracking(keywords: List[str], search_engines: List[str] = ["google", "bing"]):
    """Launch SERP tracking for target keywords."""
    print(f"Starting SERP tracking for {len(keywords)} keywords...")
    
    for keyword in keywords:
        for engine in search_engines:
            track_serp_rankings.delay(keyword, engine)
    
    print(f"Queued {len(keywords) * len(search_engines)} SERP tracking tasks")

# Example usage
if __name__ == "__main__":
    # Target keywords for analysis
    target_keywords = [
        "artificial intelligence", "machine learning", "data science",
        "cloud computing", "cybersecurity", "digital transformation"
    ]
    
    # URLs to crawl for content intelligence (load from your database)
    content_urls = [
        "<https://techcrunch.com/ai>",
        "<https://venturebeat.com/ai>",
        "<https://competitor-blog.com/insights>",
        # ... 999,997 more URLs
    ]
    
    # Keywords to track in SERPs
    serp_keywords = [
        "best AI tools 2025", "enterprise machine learning",
        "data analytics platform", "cloud security solutions"
    ]
    
    # Launch both crawling streams
    start_content_intelligence_crawl(content_urls, target_keywords)
    start_serp_tracking(serp_keywords)

Основные производственные соображения:

  • Интеллектуальная дедупликация: Система использует Redis с 24-часовым сроком действия, чтобы избежать ежедневного повторного поиска одного и того же контента. Для более глубокой дедупликации можно использовать хэширование контента, чтобы обнаружить страницы, изменившие URL-адреса, но сохранившие прежнее содержание.
  • Ограничение скорости с учетом домена: Поиск в SERP требует особой осторожности, поскольку поисковые системы более агрессивно относятся к блокировке. Наш пример включает более длительные задержки (5-10 секунд) для поисковых запросов по сравнению с просмотром контента (3-7 секунд).
  • Отслеживание особенностей SERP: Парсер работает с результатами Google и Bing, но вы можете расширить его, чтобы отслеживать особенные сниппеты, локальные пакеты и другие особенности SERP, которые влияют на вашу стратегию видимости.
  • Интеграция конвейера данных: Храните результаты в предпочтительной базе данных (PostgreSQL для реляционного анализа, MongoDB для гибких схем).

Лучшие практики

Уважайте robots.txt или столкнитесь с последствиями

Проанализируйте robots.txt, прежде чем ставить URL в очередь, и неукоснительно соблюдайте директивы crawl-delay. Игнорирование этих указаний приведет к тому, что весь ваш IP-адрес попадет в черный список быстрее, чем вы скажете “распределенный краулер”. Встраивайте проверку robots.txt непосредственно в границу URL и не возлагайте ее на рабочий узел.

Помимо соблюдения требований robots.txt, вам также следует внедрить комплексные стратегии предотвращения обнаружения во всем распределенном парке.

Всегда ведите журнал для отладки в 3 часа ночи

Когда ваш краулер умирает в полночь, вам нужны метаданные: URL, HTTP-статус, задержка, ID прокси, ID работника и временная метка для каждого запроса. JSON-структурированные журналы спасут ваш рассудок. Вопрос не в том, понадобится ли вам отлаживать производственный сбой, а в том, когда.

Проверяйте все, не доверяйте ничему

Проверка схемы извлеченных данных необходима для выживания ваших распределенных веб-краулеров, поскольку всего один неверный ответ может отравить весь ваш набор данных. Проверяйте типы полей, обязательные поля и свежесть данных при вводе. Выловите мусор на ранней стадии или обнаружите, что он портит ваш анализ спустя месяцы.

Боритесь с долгами за скорость безжалостно

Распределенные системы быстро устаревают. Вам нужно планировать ежемесячную очистку устаревших ключей Redis, неработающих очередей задач и осиротевших рабочих процессов. Мертвые URL накапливаются, пулы прокси-серверов загрязняются заблокированными IP-адресами, а утечки памяти рабочих процессов со временем усугубляются. Техническое обслуживание – не самое приятное занятие, но оно поддерживает здоровье краулера. Технический долг в краулерах увеличивается в геометрической прогрессии, поэтому устраняйте его до того, как он сломает вашу систему.

Общие ловушки распределенного краулинга и способы их избежать

При использовании распределенного веб-кроулинга существует множество распространенных ошибок, с которыми сталкиваются люди, поэтому большинство инженеров ищут альтернативные варианты, такие как наборы данных Bright Data. Некоторые из них включают в себя:

Ловушка “единственной точки отказа”

Строить все вокруг одного экземпляра Redis или главного координатора – плохая идея. Когда он умрет, вся ваша работа остановится.

Исправление: Используйте кластер Redis или несколько экземпляров брокера. Предусмотрите исчезновение координатора, поэтому рабочие должны изящно справляться с перебоями в работе брокера и автоматически переподключаться.

Спираль смерти от повторных попыток

Когда неудачные URL-адреса сразу же возвращаются в основную очередь, это создает бесконечный цикл, который бьет по неработающим конечным точкам и засоряет ваш конвейер.

Исправление: Раздельные очереди повторных попыток с экспоненциальным отступлением. Первая повторная попытка через 1 минуту, затем через 5, затем через 30. После трех неудач отправляйте в очередь мертвых писем для ручного просмотра.

Заблуждение о том, что все работники равны

Распределение задач по кругу предполагает, что у каждого работника одинаковая скорость сети, качество прокси и вычислительная мощность. Реальность часто оказывается сложнее.

Исправление: Внедрите оценку работников на основе коэффициента успешности, задержки и пропускной способности. Передавайте более сложные задания лучшим исполнителям.

Бомба замедленного действия с утечкой памяти

Рабочие механизмы, которые никогда не перезапускаются, накапливают утечки памяти, особенно при разборе неправильно сформированного HTML или обработке больших ответов. Если оставить их в покое, производительность распределенного веб-краулинга будет снижаться до тех пор, пока рабочие не выйдут из строя.

Исправить: Перезапуск рабочих после обработки 1000 заданий или каждые 4 часа. Следите за использованием памяти и внедряйте автоматические выключатели.

Заключение

Теперь у вас есть план распределенного сбора информации, который позволяет масштабировать систему до миллионов страниц. Чтобы углубить свое понимание основ веб-кроулинга, лежащих в основе распределенных систем, прочитайте наш полный обзор веб-кроулеров.

Архитектура проста, но жестокая правда заключается в том, что 90 % команд все равно терпят неудачу, потому что недооценивают сложность защиты от обнаружения распределенной системы веб-ползания. Управление тысячами прокси-серверов, ротация отпечатков пальцев и обработка CAPTCHA превращается в кошмар инженера, который отвлекает от извлечения ценных данных.

Именно для этого и существует API Web Unlocker от Bright Data. Вместо того чтобы тратить месяцы на создание прокси-инфраструктуры, которая ломается каждую неделю, ваши распределенные сотрудники просто направляют запросы через API Web Unlocker с 99 %+ коэффициентом успешности.

Никакого управления прокси, никакого вращения отпечатков пальцев, никакого решения CAPTCHA – только надежное извлечение данных в масштабе. Ваша команда инженеров сосредоточится на создании бизнес-логики, а Bright Data займется игрой в кошки-мышки с системами защиты от ботов.

Математика проста: домашние средства защиты от обнаружения стоят месяцы инженерного времени и постоянных проблем с обслуживанием, в то время как Web Unlocker стоит в разы меньше, обеспечивая при этом надежность корпоративного уровня. Так что перестаньте изобретать колесо и начните извлекать полезную информацию. Получите бесплатную учетную запись Bright Data сегодня и превратите распределенный краулер из бремени обслуживания в конкурентное преимущество.