cmoncrawl.middleware.stompware

Classes

class cmoncrawl.middleware.stompware.StompAggregator(queue_host: str, queue_port: int, url: str, index_agg: GatewayAggregator, heartbeat: int = 10000)

Aggregator that queries the Common Crawl index and sends the results to a queue using the STOMP protocol. It creates a queue named queue.{url} and sends the results to it. It also creates a topic named topic.poisson_pill.{url} and, when it finishes, sends a message of type poisson_pill to that topic.

Parameters:
  • queue_host (str) – The host of the queue

  • queue_port (int) – The port of the queue

  • url (str) – The URL used to name the queue (queue.{url}) and the poisson_pill topic (topic.poisson_pill.{url})

  • index_agg (GatewayAggregator) – The index aggregator whose results are sent to the queue

  • heartbeat (int, optional) – The STOMP heartbeat interval of the connection, in milliseconds. Defaults to 10000.
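The naming scheme and completion signal described above can be sketched without a real broker. This is a minimal illustration under stated assumptions: `destinations_for`, `FakeBroker`, and `publish` are hypothetical helpers, not part of cmoncrawl, and marking the poisson_pill message through a `type` header is an assumption about how the message type is carried.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


def destinations_for(url: str) -> Tuple[str, str]:
    """Hypothetical helper: (queue, topic) names for a given url."""
    return f"queue.{url}", f"topic.poisson_pill.{url}"


@dataclass
class FakeBroker:
    """Hypothetical in-memory stand-in for a STOMP broker; records each send."""
    sent: Dict[str, List[Tuple[str, Dict[str, str]]]] = field(default_factory=dict)

    def send(self, destination: str, body: str, headers: Dict[str, str]) -> None:
        self.sent.setdefault(destination, []).append((body, headers))


def publish(broker: FakeBroker, url: str, results: List[str]) -> None:
    """Send every result to the queue, then signal completion on the topic."""
    queue, topic = destinations_for(url)
    for result in results:
        broker.send(queue, result, {})
    # Assumption: the message type travels in a "type" header.
    broker.send(topic, "", {"type": "poisson_pill"})


broker = FakeBroker()
publish(broker, "example.com", ["rec-1", "rec-2"])
print(sorted(broker.sent))  # ['queue.example.com', 'topic.poisson_pill.example.com']
```

Sending the pill on a separate topic rather than on the results queue means every subscribed processor can see it, whichever consumer happens to take the individual results.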

async aggregate(filter_duplicates: bool = True)

Aggregates the results of the index aggregator and sends them to the queue. If filter_duplicates is True, it sets the DUPL_ID_HEADER header on the messages it sends, which Artemis (Apache ActiveMQ Artemis) uses to detect and drop duplicate messages.
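Duplicate filtering works by giving each message a stable ID that the broker can compare against IDs it has already seen. The sketch below assumes DUPL_ID_HEADER holds `_AMQ_DUPL_ID`, the property name Artemis documents for duplicate detection, and derives the ID by hashing a hypothetical per-record key. `make_headers` and `DedupQueue` are illustrative only; `DedupQueue` is a toy model of broker-side behaviour, not Artemis itself.

```python
import hashlib
from typing import Dict, List, Set, Tuple

# Assumption: cmoncrawl's DUPL_ID_HEADER is Artemis's "_AMQ_DUPL_ID" property.
DUPL_ID_HEADER = "_AMQ_DUPL_ID"


def make_headers(record_key: str, filter_duplicates: bool = True) -> Dict[str, str]:
    """Hypothetical helper: headers with a deterministic duplicate ID."""
    if not filter_duplicates:
        return {}
    # The same key always hashes to the same ID, so a resent record is
    # recognised as a duplicate.
    return {DUPL_ID_HEADER: hashlib.sha256(record_key.encode("utf-8")).hexdigest()}


class DedupQueue:
    """Toy model of broker-side duplicate detection (not Artemis itself)."""

    def __init__(self) -> None:
        self.seen_ids: Set[str] = set()
        self.messages: List[Tuple[str, Dict[str, str]]] = []

    def send(self, body: str, headers: Dict[str, str]) -> None:
        dup_id = headers.get(DUPL_ID_HEADER)
        if dup_id is not None:
            if dup_id in self.seen_ids:
                return  # drop the duplicate silently
            self.seen_ids.add(dup_id)
        self.messages.append((body, headers))


queue = DedupQueue()
for _ in range(2):  # the same record is sent twice
    queue.send("record body", make_headers("https://example.com/"))
print(len(queue.messages))  # 1: the second copy was dropped
```

Hashing a record key rather than generating a random ID is what makes the filter useful: a record produced twice (for example, on a retry) yields the same ID both times.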

class cmoncrawl.middleware.stompware.StompProcessor(queue_host: str, queue_port: int, pills_to_die: int | None, queue_size: int, timeout: int, addresses: List[str], pipeline: ProcessorPipeline, heartbeat: int = 10000)

Processor that listens to one or more queues and processes the messages using a pipeline. Once it has received enough poisson_pill messages (pills_to_die of them), it stops listening if it does not receive any message for timeout minutes.

Parameters:
  • queue_host (str) – The host of the queue

  • queue_port (int) – The port of the queue

  • pills_to_die (int | None) – The number of poisson_pill messages to receive before shutting down. May be None.

  • queue_size (int) – The size of the queue

  • timeout (int) – The inactivity timeout, in minutes, applied once enough poisson_pill messages have been received

  • addresses (List[str]) – The addresses of the queues to listen to

  • pipeline (ProcessorPipeline) – The pipeline to use for processing

  • heartbeat (int, optional) – The STOMP heartbeat interval of the connection, in milliseconds. Defaults to 10000.
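The stopping rule can be sketched with an asyncio.Queue. In this hypothetical `consume` function, messages are `(type, body)` tuples, the pill count is compared against `pills_to_die`, and the inactivity timeout is given in seconds rather than minutes so the example runs quickly. Handling of `pills_to_die=None` is not modelled, and appending the body stands in for running the ProcessorPipeline.

```python
import asyncio
from typing import List, Tuple

PILL = "poisson_pill"


async def consume(
    messages: "asyncio.Queue[Tuple[str, str]]",
    pills_to_die: int,
    timeout_s: float,
) -> List[str]:
    """Process messages until enough pills arrived and the queue went idle."""
    processed: List[str] = []
    pills = 0
    while True:
        if pills >= pills_to_die:
            # Enough pills seen: stop after timeout_s seconds of silence.
            try:
                msg_type, body = await asyncio.wait_for(messages.get(), timeout_s)
            except asyncio.TimeoutError:
                break
        else:
            # Not enough pills yet: wait as long as it takes.
            msg_type, body = await messages.get()
        if msg_type == PILL:
            pills += 1
        else:
            processed.append(body)  # stand-in for the ProcessorPipeline
    return processed


async def demo() -> List[str]:
    q: "asyncio.Queue[Tuple[str, str]]" = asyncio.Queue()
    for item in [("result", "a"), (PILL, ""), ("result", "b"), (PILL, "")]:
        q.put_nowait(item)
    return await consume(q, pills_to_die=2, timeout_s=0.05)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['a', 'b']
```

Waiting out the timeout after the last pill, instead of stopping immediately, gives results that arrive after the pill (for example, from a slower queue) a chance to be processed.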

class Listener(messages: Queue[Message], listener_stats: ListnerStats)
on_message(frame: Frame)

Called by the STOMP connection when a MESSAGE frame is received.

Parameters:
  • frame (Frame) – The STOMP frame
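STOMP clients such as stomp.py typically invoke listener callbacks on their own receiver thread, so a listener that feeds an asyncio queue has to hand frames to the event loop in a thread-safe way. The sketch below shows one way to do that. `QueueingListener`, the `Frame` and `Stats` stand-ins, and the extra `loop` argument are all hypothetical and do not reproduce the library's Listener.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass
class Frame:
    """Hypothetical stand-in for a STOMP MESSAGE frame."""
    headers: Dict[str, str]
    body: str


@dataclass
class Stats:
    """Hypothetical stand-in for the listener statistics object."""
    messages: int = 0


class QueueingListener:
    """Forwards frames from the STOMP receiver thread to an asyncio.Queue."""

    def __init__(self, messages: "asyncio.Queue[Frame]", stats: Stats,
                 loop: asyncio.AbstractEventLoop) -> None:
        self.messages = messages
        self.stats = stats
        self.loop = loop  # the event loop that owns `messages`

    def on_message(self, frame: Frame) -> None:
        # Runs on the client's receiver thread. asyncio.Queue is not
        # thread-safe, so schedule the put on the loop's own thread.
        self.stats.messages += 1
        self.loop.call_soon_threadsafe(self.messages.put_nowait, frame)


async def demo() -> Tuple[str, int]:
    loop = asyncio.get_running_loop()
    queue: "asyncio.Queue[Frame]" = asyncio.Queue()
    listener = QueueingListener(queue, Stats(), loop)
    # Simulate the receiver thread by calling on_message from a worker thread.
    await loop.run_in_executor(None, listener.on_message, Frame({}, "hello"))
    frame = await queue.get()
    return frame.body, listener.stats.messages


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ('hello', 1)
```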