How to extract from Common Crawl (practice)

Now that we know which steps are needed to extract data from Common Crawl and how they map to cmoncrawl primitives, let's see how to do it in practice.

Pipeline

We already know how to get the domain records, and we also know how to download, extract and save the data. The pipeline combines all but the first step into a single object that can be used to extract data from Common Crawl.

To create a pipeline, simply initialize cmoncrawl.processor.pipeline.pipeline.ProcessorPipeline with a Downloader, a Router and a Streamer. You can then call its cmoncrawl.processor.pipeline.pipeline.ProcessorPipeline.process_domain_record() method with a domain record and it will run the whole pipeline for that single record.

Note

Exceptions are not handled by the pipeline; they are propagated to the caller, so you can handle them as you wish.
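
As a minimal sketch, assuming a router, streamer and dao set up as in the full examples below, a domain_record obtained from one of the aggregators, and an async context to await in:

downloader = AsyncDownloader(dao)
pipeline = ProcessorPipeline(downloader=downloader, router=router, outstreamer=streamer)
try:
    # download -> extract -> save for a single domain record; the empty dict
    # is assumed to be an additional-info argument, mirroring the
    # (record, {}) tuples passed to extract() later in this guide
    await pipeline.process_domain_record(domain_record, {})
except Exception as e:
    # the pipeline propagates exceptions to the caller
    print(f"Failed to process {domain_record}: {e}")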

Simultaneous querying and extracting

Now all we need to resolve is how to effectively connect querying the index with downloading/extracting (the pipeline). One way is to query the index and, whenever we get a domain record, pass it straight to the pipeline; this is exactly how cmoncrawl.middleware.synchronized.query_and_extract() works. This works great when we use the Gateway DAO, as querying the index takes about the same time as downloading/extracting. This is how we can do it:

Simultaneously query and extract data from Common Crawl
from typing import Any, Dict
from bs4 import BeautifulSoup
from cmoncrawl.aggregator.gateway_query import GatewayAggregator
from cmoncrawl.processor.pipeline.extractor import BaseExtractor
from cmoncrawl.processor.pipeline.pipeline import ProcessorPipeline
from cmoncrawl.processor.pipeline.downloader import AsyncDownloader
from cmoncrawl.processor.pipeline.router import Router
from cmoncrawl.processor.pipeline.streamer import StreamerFileJSON
from cmoncrawl.common.types import MatchType, PipeMetadata
from cmoncrawl.middleware.synchronized import query_and_extract
from cmoncrawl.processor.dao.s3 import S3Dao
from pathlib import Path


class YourCustomExtractor(BaseExtractor):
    def extract_soup(self, soup: BeautifulSoup, metadata: PipeMetadata) -> Dict[str, Any] | None:
        return {"title": "Dummy"}

your_custom_extractor = YourCustomExtractor()

# Register our custom extractor with the router and route bbc.com URLs to it
router = Router()
router.load_extractor("ext", your_custom_extractor)
router.register_route("ext", ".*bbc.com.*")
streamer = StreamerFileJSON(Path("extracted"), max_directory_size=1000, max_file_size=100)

# Run inside an async context (e.g. wrap in an async main() and call asyncio.run(main()))
async with S3Dao(aws_profile="dev") as dao:
    downloader = AsyncDownloader(dao)
    pipeline = ProcessorPipeline(downloader=downloader, router=router, outstreamer=streamer)

    index_agg = GatewayAggregator(
        urls=["bbc.com"],
        match_type=MatchType.DOMAIN,
        limit=1000,
    )

    processed_urls = await query_and_extract(index_agg, pipeline)

Query records and then extract

The other way is to query the index for all records first and download/extract them afterwards. This approach works great with Athena, as the query takes around 1-2 minutes. We can then use multiprocessing to process the data and asyncio queues to download it faster. This is how we can do it:

Query and extract data from Common Crawl
from cmoncrawl.aggregator.athena_query import AthenaAggregator
from typing import Any, Dict
from bs4 import BeautifulSoup
from cmoncrawl.processor.pipeline.extractor import BaseExtractor
from cmoncrawl.processor.pipeline.pipeline import ProcessorPipeline
from cmoncrawl.processor.pipeline.downloader import AsyncDownloader
from cmoncrawl.processor.pipeline.router import Router
from cmoncrawl.processor.pipeline.streamer import StreamerFileJSON
from cmoncrawl.common.types import MatchType, PipeMetadata
from cmoncrawl.middleware.synchronized import extract
from cmoncrawl.processor.dao.s3 import S3Dao
from pathlib import Path

# Query: collect all the domain records first
# (run inside an async context, e.g. wrap in an async main() and call asyncio.run(main()))
records = []
async with AthenaAggregator(
    urls=["bbc.com"],
    match_type=MatchType.DOMAIN,
    limit=1000,
    bucket_name="test-dev-cmoncrawl",
    aws_profile="dev",
) as agg:
    async for record in agg:
        records.append(record)

# Then extract

class YourCustomExtractor(BaseExtractor):
    def extract_soup(self, soup: BeautifulSoup, metadata: PipeMetadata) -> Dict[str, Any] | None:
        return {"title": "Dummy"}

your_custom_extractor = YourCustomExtractor()

# Register our custom extractor with the router and route bbc.com URLs to it
router = Router()
router.load_extractor("ext", your_custom_extractor)
router.register_route("ext", ".*bbc.com.*")
streamer = StreamerFileJSON(Path("extracted"), max_directory_size=1000, max_file_size=100)

async with S3Dao(aws_profile="dev") as dao:
    downloader = AsyncDownloader(dao)
    pipeline = ProcessorPipeline(downloader=downloader, router=router, outstreamer=streamer)

    # Pass the records queried above to the pipeline, each paired with an
    # (empty) additional-info dict
    processed_urls = await extract(pipeline=pipeline, records=[(rec, {}) for rec in records])

To leverage multiprocessing, simply split the records into n chunks and start a new process for each chunk.
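
As an illustration, here is a hedged sketch of that idea. It assumes records was collected as in the example above, that DomainRecord objects are picklable, and that each worker writes to its own output directory; the worker count, chunking scheme and directory layout are choices made for this example, not part of the cmoncrawl API:

import asyncio
from concurrent.futures import ProcessPoolExecutor

def process_chunk(chunk_id: int, chunk):
    # Each worker builds its own router, streamer and pipeline, and writes
    # into a per-worker directory to avoid clashes between processes.
    router = Router()
    router.load_extractor("ext", YourCustomExtractor())
    router.register_route("ext", ".*bbc.com.*")
    streamer = StreamerFileJSON(Path(f"extracted/{chunk_id}"), max_directory_size=1000, max_file_size=100)

    async def run():
        async with S3Dao(aws_profile="dev") as dao:
            pipeline = ProcessorPipeline(
                downloader=AsyncDownloader(dao), router=router, outstreamer=streamer
            )
            return await extract(pipeline=pipeline, records=[(rec, {}) for rec in chunk])

    return asyncio.run(run())

n_processes = 4
chunks = [records[i::n_processes] for i in range(n_processes)]
with ProcessPoolExecutor(max_workers=n_processes) as pool:
    results = list(pool.map(process_chunk, range(n_processes), chunks))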

Distributed simultaneous high-throughput querying and extracting

Lastly, you can leverage cmoncrawl.middleware.stompware.StompAggregator to query and send data to a queue using the STOMP protocol, and simultaneously retrieve the data from the queue and extract it using cmoncrawl.middleware.stompware.StompProcessor.

Be cooperative

If you plan to use the multiprocessing or distributed approach, please be nice to others and limit the number of requests in your Downloader/Aggregator accordingly.