cmoncrawl.middleware.stompware
Classes
- class cmoncrawl.middleware.stompware.StompAggregator(queue_host: str, queue_port: int, url: str, index_agg: GatewayAggregator, heartbeat: int = 10000)
Aggregator that queries the Common Crawl index and sends the results to a queue using the STOMP protocol. It creates a queue named queue.{url} and sends the results to it. It also creates a topic named topic.poisson_pill.{url} and, when it finishes, sends a message with type poisson_pill to that topic.
- Parameters:
queue_host (str) – The host of the queue
queue_port (int) – The port of the queue
url (str) – The URL used to build the queue name queue.{url} and the topic name topic.poisson_pill.{url}
index_agg (GatewayAggregator) – The index aggregator whose results are sent to the queue
heartbeat (int, optional) – The heartbeat interval of the STOMP connection (STOMP heart-beats are specified in milliseconds). Defaults to 10000.
- async aggregate(filter_duplicates: bool = True)
Aggregates the results of the index aggregator and sends them to the queue. If filter_duplicates is True, each message carries the DUPL_ID_HEADER header, which Artemis uses to detect and discard duplicate messages.
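To make the naming scheme and the duplicate filtering concrete, here is a minimal standalone sketch. It does not use cmoncrawl itself: the helpers destinations, record_frame and pill_frame are hypothetical, the value "_AMQ_DUPL_ID" for DUPL_ID_HEADER is an assumption (it is the property name Artemis uses for duplicate detection), and both the hash-based duplicate ID and the "type" header on the pill message are likewise assumptions.

```python
# Hypothetical sketch of the destinations and headers described for StompAggregator.
# Assumptions: DUPL_ID_HEADER equals Artemis's "_AMQ_DUPL_ID" property name, the
# duplicate ID is a hash of the serialized record, and the pill type travels in a
# "type" header. None of these details are confirmed by the cmoncrawl docs.
import hashlib
import json
from typing import Dict, Tuple

DUPL_ID_HEADER = "_AMQ_DUPL_ID"  # assumed value


def destinations(url: str) -> Tuple[str, str]:
    """Return (queue name, poison-pill topic name) following the documented scheme."""
    return f"queue.{url}", f"topic.poisson_pill.{url}"


def record_frame(
    record: Dict[str, str], filter_duplicates: bool = True
) -> Tuple[str, Dict[str, str]]:
    """Serialize one index record and build its STOMP headers."""
    body = json.dumps(record, sort_keys=True)
    headers: Dict[str, str] = {}
    if filter_duplicates:
        # Identical records produce identical IDs, so the broker can drop repeats.
        headers[DUPL_ID_HEADER] = hashlib.sha256(body.encode("utf-8")).hexdigest()
    return body, headers


def pill_frame() -> Tuple[str, Dict[str, str]]:
    """Build the end-of-stream message sent to the topic (header name assumed)."""
    return "", {"type": "poisson_pill"}


if __name__ == "__main__":
    queue, topic = destinations("example.com")
    print(queue, topic)
    print(record_frame({"url": "https://example.com/a"}))
```

If the underlying client is stomp.py (also an assumption; these docs only say STOMP), each (body, headers) pair could be passed to conn.send(destination, body, headers=headers).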
- class cmoncrawl.middleware.stompware.StompProcessor(queue_host: str, queue_port: int, pills_to_die: int | None, queue_size: int, timeout: int, addresses: List[str], pipeline: ProcessorPipeline, heartbeat: int = 10000)
Processor that listens to one or more queues and processes the messages using a pipeline. Once it has received enough poisson_pill messages (see pills_to_die), it stops listening if no message arrives for timeout minutes.
- Parameters:
queue_host (str) – The host of the queue
queue_port (int) – The port of the queue
pills_to_die (int | None) – The number of poisson_pill messages to receive before dying. May be None.
queue_size (int) – The size of the queue
timeout (int) – The timeout in minutes
addresses (List[str]) – The addresses of the queues to listen to
pipeline (ProcessorPipeline) – The pipeline to use for processing
heartbeat (int, optional) – The heartbeat interval of the STOMP connection (STOMP heart-beats are specified in milliseconds). Defaults to 10000.
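The shutdown rule in the StompProcessor description can be modelled as a small state machine kept apart from any networking, which makes it easy to test. This is a hypothetical sketch, not the library's code: the ShutdownPolicy class is invented for illustration, it assumes that pills_to_die=None disables pill-based shutdown, and it assumes the idle timer restarts on every received message, pills included.

```python
# Hypothetical model of the StompProcessor shutdown rule (not cmoncrawl's code).
# Assumptions: pills_to_die=None means pills never trigger shutdown, and the
# idle clock restarts on every received message, including poison pills.
from dataclasses import dataclass
from typing import Optional


@dataclass
class ShutdownPolicy:
    pills_to_die: Optional[int]
    timeout_minutes: float
    pills_seen: int = 0
    last_message_at: float = 0.0  # seconds, e.g. taken from time.monotonic()

    def on_message(self, msg_type: str, now: float) -> None:
        """Record a received message and count poison pills."""
        self.last_message_at = now
        if msg_type == "poisson_pill":
            self.pills_seen += 1

    def should_stop(self, now: float) -> bool:
        """Stop only after enough pills AND timeout minutes with no message."""
        if self.pills_to_die is None or self.pills_seen < self.pills_to_die:
            return False
        idle_seconds = now - self.last_message_at
        return idle_seconds >= self.timeout_minutes * 60
```

A consumer loop would call on_message for each incoming frame and poll should_stop(time.monotonic()) periodically; keeping the rule free of I/O lets it be checked with fixed timestamps.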