Skip to content

Commit 42bf79a

Browse files
committed
tests: integration: cover Kafka OTLP resource partitioning
Signed-off-by: Eduardo Silva <[email protected]>
1 parent 845990b commit 42bf79a

3 files changed

Lines changed: 98 additions & 0 deletions

File tree

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
service:
2+
flush: 5
3+
log_level: info
4+
http_server: on
5+
http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}
6+
7+
pipeline:
8+
inputs:
9+
- name: opentelemetry
10+
port: ${FLUENT_BIT_TEST_LISTENER_PORT}
11+
12+
outputs:
13+
- name: kafka
14+
match: "*"
15+
brokers: 127.0.0.1:${TEST_SUITE_KAFKA_PORT}
16+
topics: otlp-topic
17+
format: otlp_json
18+
otlp_logs_partition_by_resource: true
19+
message_key: static-otlp-key
20+
queue_full_retries: 1
21+
rdkafka.api.version.request: false
22+
rdkafka.broker.version.fallback: 0.8.2.0
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
service:
2+
flush: 5
3+
log_level: info
4+
http_server: on
5+
http_port: ${FLUENT_BIT_HTTP_MONITORING_PORT}
6+
7+
pipeline:
8+
inputs:
9+
- name: opentelemetry
10+
port: ${FLUENT_BIT_TEST_LISTENER_PORT}
11+
12+
outputs:
13+
- name: kafka
14+
match: "*"
15+
brokers: 127.0.0.1:${TEST_SUITE_KAFKA_PORT}
16+
topics: otlp-topic
17+
format: otlp_proto
18+
otlp_logs_partition_by_resource: true
19+
message_key: static-otlp-key
20+
queue_full_retries: 1
21+
rdkafka.api.version.request: false
22+
rdkafka.broker.version.fallback: 0.8.2.0

tests/integration/scenarios/out_kafka/tests/test_out_kafka_001.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,3 +645,57 @@ def test_out_kafka_otlp_logs_preserve_resource_schema_urls_across_requests(
645645
assert body_to_schema_url["event-a"] == "schema-a"
646646
assert body_to_schema_url["event-b"] == "schema-b"
647647
assert len(resources) == 2
648+
649+
650+
@pytest.mark.parametrize(
651+
"format_name,config_file",
652+
[
653+
("otlp_json", "out_kafka_otlp_json_partition_by_resource.yaml"),
654+
("otlp_proto", "out_kafka_otlp_proto_partition_by_resource.yaml"),
655+
],
656+
)
657+
def test_out_kafka_otlp_logs_partition_by_resource(format_name, config_file):
658+
service = Service(config_file)
659+
service.start()
660+
service.send_payload_dict(
661+
_build_resource_collision_payload("user-a", "event-a"),
662+
"logs",
663+
)
664+
service.send_payload_dict(
665+
_build_resource_collision_payload("user-b", "event-b"),
666+
"logs",
667+
)
668+
669+
messages = service.wait_for_messages(2, timeout=10)
670+
service.stop()
671+
672+
assert len(messages) == 2
673+
674+
keys = {message["key"] for message in messages}
675+
assert len(keys) == 2
676+
assert b"static-otlp-key" not in keys
677+
678+
body_to_user = {}
679+
for message in messages:
680+
assert message["topic"] == "otlp-topic"
681+
assert message["key"]
682+
683+
payload = _decode_kafka_payload(message, format_name, "logs")
684+
resources = payload["resourceLogs"]
685+
assert len(resources) == 1
686+
687+
resource = resources[0]
688+
user_id = next(
689+
attribute["value"]["stringValue"]
690+
for attribute in resource["resource"]["attributes"]
691+
if attribute["key"] == "user.id"
692+
)
693+
694+
for scope in resource["scopeLogs"]:
695+
for record in scope["logRecords"]:
696+
body_to_user[record["body"]["stringValue"]] = user_id
697+
698+
assert body_to_user == {
699+
"event-a": "user-a",
700+
"event-b": "user-b",
701+
}

0 commit comments

Comments
 (0)