How to extract from Common Crawl (practice)
Now that we know which steps are needed to extract data from Common Crawl and
how they map to cmoncrawl
primitives, let's see how to do it in practice.
Pipeline
We already know how to get the domain records, and we also know how to download, extract and save the data. The pipeline allows us to combine all but the first step into a single object that can be used to extract data from Common Crawl.
To create a pipeline, simply initialize cmoncrawl.processor.pipeline.pipeline.ProcessorPipeline
with a Downloader, Router and Streamer.
You can then call its cmoncrawl.processor.pipeline.pipeline.ProcessorPipeline.process_domain_record()
method with a domain record and it will run the whole pipeline for that single record.
Note
Exceptions are not handled by the pipeline; they are propagated to the caller so that you can handle them as you wish, as shown in the sketch below.
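For illustration, here is a minimal sketch of driving the pipeline yourself for a single record and handling failures. It assumes a pipeline and a domain record built as in the full examples below; the empty dict passed as the second argument is an assumption, mirroring the extract() call further down in this guide.

try:
    # `pipeline` and `record` are assumed to be created as in the examples below.
    # The empty dict is an assumed additional-info argument, mirroring the
    # extract() call further down.
    await pipeline.process_domain_record(record, {})
except Exception as e:
    # The pipeline propagates exceptions, so decide here whether to retry,
    # log, or skip the record.
    print(f"Failed to process {record}: {e}")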
Simultaneous querying and extracting
Now all we need to resolve is how to efficiently connect querying the index with downloading/extracting (the pipeline).
One way is to query the index and, whenever we get a domain record, pass it straight to the pipeline. This is exactly how
cmoncrawl.middleware.synchronized.query_and_extract()
works. This works well when we use the Gateway-based index (GatewayAggregator),
as querying the index takes about as long as downloading/extracting. This is how we can do it:
from typing import Any, Dict
from bs4 import BeautifulSoup
from cmoncrawl.aggregator.gateway_query import GatewayAggregator
from cmoncrawl.processor.pipeline.extractor import BaseExtractor
from cmoncrawl.processor.pipeline.pipeline import ProcessorPipeline
from cmoncrawl.processor.pipeline.downloader import AsyncDownloader
from cmoncrawl.processor.pipeline.router import Router
from cmoncrawl.processor.pipeline.streamer import StreamerFileJSON
from cmoncrawl.common.types import MatchType, PipeMetadata
from cmoncrawl.middleware.synchronized import query_and_extract
from cmoncrawl.processor.dao.s3 import S3Dao
from pathlib import Path


class YourCustomExtractor(BaseExtractor):
    def extract_soup(self, soup: BeautifulSoup, metadata: PipeMetadata) -> Dict[str, Any] | None:
        return {"title": "Dummy"}


your_custom_extractor = YourCustomExtractor()

# We register our custom extractor with the router and route all bbc.com URLs to it
router = Router()
router.load_extractor("ext", your_custom_extractor)
router.register_route("ext", ".*bbc.com.*")

# Stream the extracted data to JSON files in the "extracted" directory
streamer = StreamerFileJSON(Path("extracted"), max_directory_size=1000, max_file_size=100)

async with S3Dao(aws_profile="dev") as dao:
    downloader = AsyncDownloader(dao)
    pipeline = ProcessorPipeline(downloader=downloader, router=router, outstreamer=streamer)
    index_agg = GatewayAggregator(
        urls=["bbc.com"],
        match_type=MatchType.DOMAIN,
        limit=1000,
    )
    processed_urls = await query_and_extract(index_agg, pipeline)
Query records and then extract
The other way is to query the index for all records first and download/extract them afterwards. This approach works well with Athena, as the query takes around 1-2 minutes. With this approach we can then leverage both multiprocessing (for processing) and asyncio (for downloading) to speed things up. This is how we can do it:
from cmoncrawl.aggregator.athena_query import AthenaAggregator
from typing import Any, Dict
from bs4 import BeautifulSoup
from cmoncrawl.processor.pipeline.extractor import BaseExtractor
from cmoncrawl.processor.pipeline.pipeline import ProcessorPipeline
from cmoncrawl.processor.pipeline.downloader import AsyncDownloader
from cmoncrawl.processor.pipeline.router import Router
from cmoncrawl.processor.pipeline.streamer import StreamerFileJSON
from cmoncrawl.common.types import MatchType, PipeMetadata
from cmoncrawl.middleware.synchronized import extract
from cmoncrawl.processor.dao.s3 import S3Dao
from pathlib import Path

# Query all domain records with Athena first
records = []
async with AthenaAggregator(
    urls=["bbc.com"],
    match_type=MatchType.DOMAIN,
    limit=1000,
    bucket_name="test-dev-cmoncrawl",
    aws_profile="dev",
) as agg:
    async for record in agg:
        records.append(record)


# Then extract
class YourCustomExtractor(BaseExtractor):
    def extract_soup(self, soup: BeautifulSoup, metadata: PipeMetadata) -> Dict[str, Any] | None:
        return {"title": "Dummy"}


your_custom_extractor = YourCustomExtractor()

# We register our custom extractor with the router and route all bbc.com URLs to it
router = Router()
router.load_extractor("ext", your_custom_extractor)
router.register_route("ext", ".*bbc.com.*")

# Stream the extracted data to JSON files in the "extracted" directory
streamer = StreamerFileJSON(Path("extracted"), max_directory_size=1000, max_file_size=100)

async with S3Dao(aws_profile="dev") as dao:
    downloader = AsyncDownloader(dao)
    pipeline = ProcessorPipeline(downloader=downloader, router=router, outstreamer=streamer)
    processed_urls = await extract(pipeline=pipeline, records=[(rec, {}) for rec in records])
To leverage multiprocessing, simply divide the records into n chunks and start a new process for each chunk, as the sketch below illustrates.
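Here is a hedged sketch of that chunking approach. The worker function, the number of processes and the per-chunk output directories are illustrative choices, not part of cmoncrawl; the cmoncrawl names are the ones imported in the example above.

import asyncio
from multiprocessing import Process


def worker(chunk_id, chunk):
    # Each process builds its own DAO, pipeline and output streamer, since
    # network resources and open files cannot be shared across processes.
    # This sketch reuses the `router` defined above; with the spawn start
    # method you would need to re-create it inside the worker instead.
    async def run():
        streamer = StreamerFileJSON(
            Path(f"extracted/{chunk_id}"), max_directory_size=1000, max_file_size=100
        )
        async with S3Dao(aws_profile="dev") as dao:
            pipeline = ProcessorPipeline(
                downloader=AsyncDownloader(dao), router=router, outstreamer=streamer
            )
            await extract(pipeline=pipeline, records=[(rec, {}) for rec in chunk])

    asyncio.run(run())


n_processes = 4  # illustrative value
chunks = [records[i::n_processes] for i in range(n_processes)]
processes = [Process(target=worker, args=(i, chunk)) for i, chunk in enumerate(chunks)]
for p in processes:
    p.start()
for p in processes:
    p.join()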
Distributed simultaneous high-throughput querying and extracting
Lastly, you can leverage cmoncrawl.middleware.stompware.StompAggregator
to query the index and send the domain records to a queue using the STOMP protocol,
and simultaneously retrieve them from the queue and extract them using cmoncrawl.middleware.stompware.StompProcessor.
Be cooperative
If you plan to use the multiprocessing or distributed approach, please be nice to others and limit the number of requests made by the Downloader/Aggregator accordingly. One generic way to do so is sketched below.
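If you prefer not to rely on any library-specific rate-limit options, a generic sketch using an asyncio.Semaphore to cap concurrency could look like this. The limit of 10 is arbitrary, `pipeline` and `records` are assumed to come from the examples above, and the empty dict passed to process_domain_record is an assumption mirroring the extract() call above.

import asyncio

# Cap the number of domain records processed concurrently; 10 is an
# arbitrary illustrative value.
semaphore = asyncio.Semaphore(10)


async def process_politely(record):
    async with semaphore:
        # The empty dict is an assumed additional-info argument, mirroring
        # the extract() call above.
        return await pipeline.process_domain_record(record, {})


# return_exceptions=True collects failures instead of raising, so one bad
# record does not abort the whole batch.
results = await asyncio.gather(
    *(process_politely(rec) for rec in records), return_exceptions=True
)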