Skip to content
Draft

WIP #7843

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions scripts/consumer_performance_benchmark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from concurrent.futures import ProcessPoolExecutor
from logging import getLogger

from snuba.cli.rust_consumer import rust_consumer_impl

NUM_CONSUMERS = 4

logger = getLogger(__name__)

if __name__ == "__main__":
    import logging

    # The root logger defaults to WARNING, so without explicit configuration
    # the logger.info(...) calls below would produce no output at all.
    logging.basicConfig(level=logging.INFO)

    # Fan out NUM_CONSUMERS rust consumers across separate processes; the
    # `with` block waits for all of them to finish before exiting.
    with ProcessPoolExecutor(max_workers=NUM_CONSUMERS) as executor:
        futures = []
        for i in range(NUM_CONSUMERS):
            # Lazy %-style args instead of an f-string: the message is only
            # formatted if the log record is actually emitted.
            logger.info("starting consumer %d", i)
            futures.append(
                executor.submit(
                    rust_consumer_impl,
                    ("eap_items",),
                    "eap_items_group",
                    no_strict_offset_reset=True,
                    auto_offset_reset="latest",
                    enforce_schema=True,
                )
            )
        # submit() swallows worker exceptions unless the future is inspected;
        # result() re-raises them here so a crashed consumer is not silent.
        for future in futures:
            future.result()
121 changes: 86 additions & 35 deletions snuba/cli/rust_consumer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import sys
from dataclasses import asdict
from typing import Optional, Sequence
from typing import Sequence

import click

Expand All @@ -26,7 +26,6 @@
)
@click.option(
"--auto-offset-reset",
default="earliest",
type=click.Choice(["error", "earliest", "latest"]),
help="Kafka consumer auto offset reset.",
)
Expand All @@ -37,13 +36,11 @@
)
@click.option(
"--queued-max-messages-kbytes",
default=settings.DEFAULT_QUEUED_MAX_MESSAGE_KBYTES,
type=int,
help="Maximum number of kilobytes per topic+partition in the local consumer queue.",
)
@click.option(
"--queued-min-messages",
default=settings.DEFAULT_QUEUED_MIN_MESSAGES,
type=int,
help="Minimum number of messages per topic+partition librdkafka tries to maintain in the local consumer queue.",
)
Expand Down Expand Up @@ -76,13 +73,11 @@
)
@click.option(
"--max-batch-size",
default=settings.DEFAULT_MAX_BATCH_SIZE,
type=int,
help="Max number of messages to batch in memory before writing to Kafka.",
)
@click.option(
"--max-batch-time-ms",
default=settings.DEFAULT_MAX_BATCH_TIME_MS,
type=int,
help="Max length of time to buffer messages in memory before writing to Kafka.",
)
Expand All @@ -91,7 +86,6 @@
"log_level",
type=click.Choice(["error", "warn", "info", "debug", "trace"], False),
help="Logging level to use.",
default="info",
)
@click.option(
"--concurrency",
Expand All @@ -107,48 +101,40 @@
"use_rust_processor",
is_flag=True,
help="Use the Rust (if available) or Python message processor",
default=True,
)
@click.option(
"--group-instance-id",
type=str,
default=None,
help="Kafka group instance id. passing a value here will run kafka with static membership.",
)
@click.option(
"--python-max-queue-depth",
type=int,
default=None,
help="How many messages should be queued up in the Python message processor before backpressure kicks in. Defaults to the number of processes.",
)
@click.option(
"--max-poll-interval-ms",
type=int,
default=30000,
)
@click.option(
"--async-inserts",
is_flag=True,
default=False,
help="Enable async inserts for ClickHouse",
)
@click.option(
"--max-dlq-buffer-length",
type=int,
default=25000,
help="Set a per-partition limit to the length of the DLQ buffer",
)
@click.option(
"--health-check-file",
default=None,
type=str,
help="Arroyo will touch this file at intervals to indicate health. If not provided, no health check is performed.",
)
@click.option(
"--enforce-schema",
type=bool,
is_flag=True,
default=False,
help="Enforce schema on the raw events topic.",
)
@click.option(
Expand All @@ -159,36 +145,30 @@
@click.option(
"--batch-write-timeout-ms",
type=int,
default=None,
help="Optional timeout for batch writer client connecting and sending request to Clickhouse",
)
@click.option(
"--quantized-rebalance-consumer-group-delay-secs",
type=int,
default=None,
help="Quantized rebalancing means that during deploys, rebalancing is triggered across all pods within a consumer group at the same time. The value is used by the pods to align their group join/leave activity to some multiple of the delay",
)
@click.option(
"--join-timeout-ms",
type=int,
default=1000,
help="number of milliseconds to wait for the current batch to be flushed by the consumer in case of rebalance",
)
@click.option(
"--health-check",
default="arroyo",
type=click.Choice(["snuba", "arroyo"]),
help="Specify which health check to use for the consumer. If not specified, the default Arroyo health check is used.",
)
@click.option(
"--use-row-binary",
is_flag=True,
default=False,
help="Use RowBinary format for ClickHouse inserts instead of JSONEachRow. Currently only supported for EAPItemsProcessor.",
)
@click.option(
"--consumer-version",
default="v2",
type=click.Choice(["v1", "v2"]),
help="DEPRECATED: value is ignored.",
)
Expand All @@ -200,32 +180,103 @@ def rust_consumer(
no_strict_offset_reset: bool,
queued_max_messages_kbytes: int,
queued_min_messages: int,
raw_events_topic: Optional[str],
commit_log_topic: Optional[str],
replacements_topic: Optional[str],
raw_events_topic: str | None,
commit_log_topic: str | None,
replacements_topic: str | None,
bootstrap_servers: Sequence[str],
commit_log_bootstrap_servers: Sequence[str],
replacement_bootstrap_servers: Sequence[str],
max_batch_size: int,
max_batch_time_ms: int,
log_level: str,
concurrency: Optional[int],
clickhouse_concurrency: Optional[int],
concurrency: int | None,
clickhouse_concurrency: int | None,
use_rust_processor: bool,
group_instance_id: Optional[str],
group_instance_id: str | None,
max_poll_interval_ms: int,
async_inserts: bool,
health_check: str,
python_max_queue_depth: Optional[int],
health_check_file: Optional[str],
python_max_queue_depth: int | None,
health_check_file: str | None,
enforce_schema: bool,
stop_at_timestamp: Optional[int],
batch_write_timeout_ms: Optional[int],
max_dlq_buffer_length: Optional[int],
quantized_rebalance_consumer_group_delay_secs: Optional[int],
join_timeout_ms: Optional[int],
stop_at_timestamp: int | None,
batch_write_timeout_ms: int | None,
max_dlq_buffer_length: int | None,
quantized_rebalance_consumer_group_delay_secs: int | None,
join_timeout_ms: int | None,
use_row_binary: bool,
consumer_version: Optional[str],
consumer_version: str | None,
) -> None:
"""
Experimental alternative to `snuba consumer`
"""
rust_consumer_impl(
storage_names=storage_names,
consumer_group=consumer_group,
auto_offset_reset=auto_offset_reset,
no_strict_offset_reset=no_strict_offset_reset,
queued_max_messages_kbytes=queued_max_messages_kbytes,
queued_min_messages=queued_min_messages,
raw_events_topic=raw_events_topic,
commit_log_topic=commit_log_topic,
replacements_topic=replacements_topic,
bootstrap_servers=bootstrap_servers,
commit_log_bootstrap_servers=commit_log_bootstrap_servers,
replacement_bootstrap_servers=replacement_bootstrap_servers,
max_batch_size=max_batch_size,
max_batch_time_ms=max_batch_time_ms,
log_level=log_level,
concurrency=concurrency,
clickhouse_concurrency=clickhouse_concurrency,
use_rust_processor=use_rust_processor,
group_instance_id=group_instance_id,
max_poll_interval_ms=max_poll_interval_ms,
async_inserts=async_inserts,
health_check=health_check,
python_max_queue_depth=python_max_queue_depth,
health_check_file=health_check_file,
enforce_schema=enforce_schema,
stop_at_timestamp=stop_at_timestamp,
batch_write_timeout_ms=batch_write_timeout_ms,
max_dlq_buffer_length=max_dlq_buffer_length,
quantized_rebalance_consumer_group_delay_secs=quantized_rebalance_consumer_group_delay_secs,
join_timeout_ms=join_timeout_ms,
use_row_binary=use_row_binary,
)


def rust_consumer_impl(
storage_names: Sequence[str],
consumer_group: str,
auto_offset_reset: str = "earliest",
no_strict_offset_reset: bool = False,
queued_max_messages_kbytes: int = settings.DEFAULT_QUEUED_MAX_MESSAGE_KBYTES,
queued_min_messages: int = settings.DEFAULT_QUEUED_MIN_MESSAGES,
raw_events_topic: str | None = None,
commit_log_topic: str | None = None,
replacements_topic: str | None = None,
bootstrap_servers: Sequence[str] = (),
commit_log_bootstrap_servers: Sequence[str] = (),
replacement_bootstrap_servers: Sequence[str] = (),
max_batch_size: int = settings.DEFAULT_MAX_BATCH_SIZE,
max_batch_time_ms: int = settings.DEFAULT_MAX_BATCH_TIME_MS,
log_level: str = "info",
concurrency: int | None = None,
clickhouse_concurrency: int | None = None,
use_rust_processor: bool | None = True,
group_instance_id: str | None = None,
max_poll_interval_ms: int | None = 30000,
async_inserts: bool | None = False,
health_check: str = "arroyo",
python_max_queue_depth: int | None = None,
health_check_file: str | None = None,
enforce_schema: bool = False,
stop_at_timestamp: int | None = None,
batch_write_timeout_ms: int | None = None,
max_dlq_buffer_length: int | None = 25000,
quantized_rebalance_consumer_group_delay_secs: int | None = None,
join_timeout_ms: int | None = 1000,
use_row_binary: bool = False,
) -> None:
"""
Experimental alternative to `snuba consumer`
Expand Down
Loading