Skip to content

Commit 1607db9

Browse files
committed
#654 collecting kafka metrics
1 parent a5b4a31 commit 1607db9

File tree

7 files changed

+205
-9
lines changed

7 files changed

+205
-9
lines changed

gateleen-kafka/README_kafka.md

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,4 +448,30 @@ This sequence diagrams shows the process when messages are sent to Kafka:
448448
│ <────────────────────│ │ │ │ │ │
449449
│ │ │ │ │ │ │
450450
│ └┬┘ │ │ │ │
451-
```
451+
```
452+
453+
### Micrometer metrics
454+
The kafka feature is monitored with micrometer. The following metrics are available:
455+
* gateleen_kafka_send_success_messages_total
456+
* gateleen_kafka_send_fail_messages_total
457+
* gateleen_kafka_validation_fail_messages_total
458+
459+
Additional tags are provided to specify the topic.
460+
461+
Example metrics:
462+
463+
```
464+
# HELP gateleen_kafka_send_success_messages_total Amount of successfully sent kafka messages
465+
# TYPE gateleen_kafka_send_success_messages_total counter
466+
gateleen_kafka_send_success_messages_total{topic="my-topic-1",} 0.0
467+
gateleen_kafka_send_fail_messages_total{topic="my-topic-1",} 455.0
468+
gateleen_kafka_send_success_messages_total{topic="my-topic-2",} 256.0
469+
gateleen_kafka_send_success_messages_total{topic="my-topic-3",} 6.0
470+
gateleen_kafka_send_fail_messages_total{topic="my-topic-4",} 222.0
471+
# HELP gateleen_kafka_validation_fail_messages_total Amount of failed kafka message validations
472+
# TYPE gateleen_kafka_validation_fail_messages_total counter
473+
gateleen_kafka_validation_fail_messages_total{topic="my-topic-6",} 212.0
474+
```
475+
476+
To enable `gateleen_kafka_send_success_messages_total` and `gateleen_kafka_send_fail_messages_total` metrics, set a `MeterRegistry` instance by calling `setMeterRegistry(MeterRegistry meterRegistry)` method in `KafkaMessageSender` class.
477+
To enable `gateleen_kafka_validation_fail_messages_total` metrics, set a `MeterRegistry` instance by calling `setMeterRegistry(MeterRegistry meterRegistry)` method in `KafkaMessageValidator` class.

gateleen-kafka/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
<artifactId>gateleen-validation</artifactId>
2323
<version>${project.version}</version>
2424
</dependency>
25+
<dependency>
26+
<groupId>io.micrometer</groupId>
27+
<artifactId>micrometer-core</artifactId>
28+
</dependency>
2529
<!-- TEST dependencies -->
2630
<dependency>
2731
<groupId>org.swisspush.gateleen</groupId>

gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaHandler.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class KafkaHandler extends ConfigurationResourceConsumer {
5252
private final KafkaMessageSender kafkaMessageSender;
5353
private final Map<String, Object> properties;
5454
private final KafkaProducerRecordBuilder kafkaProducerRecordBuilder;
55-
private KafkaMessageValidator kafkaMessageValidator;
55+
private final KafkaMessageValidator kafkaMessageValidator;
5656

5757
private boolean initialized = false;
5858

@@ -140,8 +140,6 @@ private Future<Void> initializeKafkaConfiguration(Buffer configuration) {
140140
Promise<Void> promise = Promise.promise();
141141
final List<KafkaConfiguration> kafkaConfigurations = KafkaConfigurationParser.parse(configuration, properties);
142142

143-
144-
145143
repository.closeAll().future().onComplete((event -> {
146144
for (KafkaConfiguration kafkaConfiguration : kafkaConfigurations) {
147145
repository.addKafkaProducer(kafkaConfiguration);

gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageSender.java

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.swisspush.gateleen.kafka;
22

3+
import io.micrometer.core.instrument.Counter;
4+
import io.micrometer.core.instrument.MeterRegistry;
35
import io.vertx.core.CompositeFuture;
46
import io.vertx.core.Future;
57
import io.vertx.core.Promise;
@@ -9,7 +11,9 @@
911
import org.slf4j.Logger;
1012
import org.slf4j.LoggerFactory;
1113

14+
import java.util.HashMap;
1215
import java.util.List;
16+
import java.util.Map;
1317
import java.util.function.Function;
1418

1519
import static java.util.stream.Collectors.toList;
@@ -18,6 +22,22 @@ public class KafkaMessageSender {
1822

1923
private static final Logger log = LoggerFactory.getLogger(KafkaMessageSender.class);
2024

25+
private MeterRegistry meterRegistry;
26+
private final Map<String, Counter> successSendCounterMap = new HashMap<>();
27+
private final Map<String, Counter> failSendCounterMap = new HashMap<>();
28+
29+
public static final String SUCCESS_SEND_MESSAGES_METRIC = "gateleen.kafka.send.success.messages";
30+
public static final String SUCCESS_SEND_MESSAGES_METRIC_DESCRIPTION = "Amount of successfully sent kafka messages";
31+
public static final String FAIL_SEND_MESSAGES_METRIC = "gateleen.kafka.send.fail.messages";
32+
public static final String FAIL_SEND_MESSAGES_METRIC_DESCRIPTION = "Amount of failed kafka message sendings";
33+
public static final String TOPIC = "topic";
34+
35+
public void setMeterRegistry(MeterRegistry meterRegistry) {
36+
this.meterRegistry = meterRegistry;
37+
successSendCounterMap.clear();
38+
failSendCounterMap.clear();
39+
}
40+
2141
Future<Void> sendMessages(KafkaProducer<String, String> kafkaProducer,
2242
List<KafkaProducerRecord<String, String>> messages) {
2343
Promise<Void> promise = Promise.promise();
@@ -44,7 +64,45 @@ private Future<Void> sendMessage(KafkaProducer<String, String> kafkaProducer, Ka
4464
return kafkaProducer.send(message).compose((Function<RecordMetadata, Future<Void>>) metadata -> {
4565
log.debug("Message successfully sent to kafka topic '{}' on partition {} with offset {}. Timestamp: {}",
4666
metadata.getTopic(), metadata.getPartition(), metadata.getOffset(), metadata.getTimestamp());
67+
incrementSuccessCount(metadata.getTopic());
4768
return Future.succeededFuture();
48-
}).onFailure(throwable -> log.warn("Failed to send message with key '{}' to kafka. Cause: {}", message.key(), throwable));
69+
}).onFailure(throwable -> {
70+
log.warn("Failed to send message with key '{}' to kafka. Cause: {}", message.key(), throwable);
71+
incrementFailCount1(message.topic());
72+
});
73+
}
74+
75+
private void incrementSuccessCount(String topic) {
76+
Counter counter = successSendCounterMap.get(topic);
77+
if(counter != null) {
78+
counter.increment();
79+
return;
80+
}
81+
82+
if(meterRegistry != null) {
83+
Counter newCounter = Counter.builder(SUCCESS_SEND_MESSAGES_METRIC)
84+
.description(SUCCESS_SEND_MESSAGES_METRIC_DESCRIPTION)
85+
.tag(TOPIC, topic)
86+
.register(meterRegistry);
87+
newCounter.increment();
88+
successSendCounterMap.put(topic, newCounter);
89+
}
90+
}
91+
92+
private void incrementFailCount1(String topic) {
93+
Counter counter = failSendCounterMap.get(topic);
94+
if(counter != null) {
95+
counter.increment();
96+
return;
97+
}
98+
99+
if(meterRegistry != null) {
100+
Counter newCounter = Counter.builder(FAIL_SEND_MESSAGES_METRIC)
101+
.description(FAIL_SEND_MESSAGES_METRIC_DESCRIPTION)
102+
.tag(TOPIC, topic)
103+
.register(meterRegistry);
104+
newCounter.increment();
105+
failSendCounterMap.put(topic, newCounter);
106+
}
49107
}
50108
}

gateleen-kafka/src/main/java/org/swisspush/gateleen/kafka/KafkaMessageValidator.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.swisspush.gateleen.kafka;
22

3+
import io.micrometer.core.instrument.Counter;
4+
import io.micrometer.core.instrument.MeterRegistry;
35
import io.vertx.core.CompositeFuture;
46
import io.vertx.core.Future;
57
import io.vertx.core.buffer.Buffer;
@@ -14,6 +16,7 @@
1416
import org.swisspush.gateleen.validation.ValidationUtil;
1517
import org.swisspush.gateleen.validation.Validator;
1618

19+
import java.util.HashMap;
1720
import java.util.List;
1821
import java.util.Map;
1922
import java.util.Optional;
@@ -26,11 +29,23 @@ public class KafkaMessageValidator {
2629
private final Validator validator;
2730
private final Logger log = LoggerFactory.getLogger(KafkaHandler.class);
2831

32+
private MeterRegistry meterRegistry;
33+
private final Map<String, Counter> failedToValidateCounterMap = new HashMap<>();
34+
35+
public static final String FAIL_VALIDATION_MESSAGES_METRIC = "gateleen.kafka.validation.fail.messages";
36+
public static final String FAIL_VALIDATION_MESSAGES_METRIC_DESCRIPTION = "Amount of failed kafka message validations";
37+
public static final String TOPIC = "topic";
38+
2939
public KafkaMessageValidator(ValidationResourceManager validationResourceManager, Validator validator) {
3040
this.validationResourceManager = validationResourceManager;
3141
this.validator = validator;
3242
}
3343

44+
public void setMeterRegistry(MeterRegistry meterRegistry) {
45+
this.meterRegistry = meterRegistry;
46+
failedToValidateCounterMap.clear();
47+
}
48+
3449
public Future<ValidationResult> validateMessages(HttpServerRequest request, List<KafkaProducerRecord<String, String>> kafkaProducerRecords) {
3550
if (kafkaProducerRecords.isEmpty()) {
3651
return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
@@ -49,6 +64,8 @@ public Future<ValidationResult> validateMessages(HttpServerRequest request, List
4964

5065
SchemaLocation schemaLocation = optionalSchemaLocation.get();
5166

67+
String topic = kafkaProducerRecords.get(0).topic();
68+
5269
@SuppressWarnings("rawtypes") //https://github.com/eclipse-vertx/vert.x/issues/2627
5370
List<Future> futures = kafkaProducerRecords.stream()
5471
.map(message -> validator.validateWithSchemaLocation(schemaLocation, Buffer.buffer(message.value()), log))
@@ -57,10 +74,31 @@ public Future<ValidationResult> validateMessages(HttpServerRequest request, List
5774
return CompositeFuture.all(futures).compose(compositeFuture -> {
5875
for (Object o : compositeFuture.list()) {
5976
if (((ValidationResult) o).getValidationStatus() != ValidationStatus.VALIDATED_POSITIV) {
77+
incrementValidationFailCount(topic);
6078
return Future.succeededFuture((ValidationResult) o);
6179
}
6280
}
6381
return Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV));
82+
}, throwable -> {
83+
incrementValidationFailCount(topic);
84+
return Future.failedFuture(throwable);
6485
});
6586
}
87+
88+
private void incrementValidationFailCount(String topic) {
89+
Counter counter = failedToValidateCounterMap.get(topic);
90+
if(counter != null) {
91+
counter.increment();
92+
return;
93+
}
94+
95+
if(meterRegistry != null) {
96+
Counter newCounter = Counter.builder(FAIL_VALIDATION_MESSAGES_METRIC)
97+
.description(FAIL_VALIDATION_MESSAGES_METRIC_DESCRIPTION)
98+
.tag(TOPIC, topic)
99+
.register(meterRegistry);
100+
newCounter.increment();
101+
failedToValidateCounterMap.put(topic, newCounter);
102+
}
103+
}
66104
}

gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageSenderTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.swisspush.gateleen.kafka;
22

3+
import io.micrometer.core.instrument.Counter;
4+
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
35
import io.vertx.core.*;
46
import io.vertx.core.buffer.Buffer;
57
import io.vertx.core.json.JsonArray;
@@ -35,11 +37,14 @@ public class KafkaMessageSenderTest {
3537

3638
private KafkaProducer<String, String> producer;
3739
private KafkaMessageSender kafkaMessageSender;
40+
private SimpleMeterRegistry meterRegistry;
3841

3942
@Before
4043
public void setUp() {
4144
producer = Mockito.mock(KafkaProducer.class);
4245
kafkaMessageSender = new KafkaMessageSender();
46+
meterRegistry = new SimpleMeterRegistry();
47+
kafkaMessageSender.setMeterRegistry(meterRegistry);
4348
}
4449

4550
@Test
@@ -57,6 +62,9 @@ public void sendSingleMessage(TestContext context) throws ValidationException {
5762
});
5863

5964
Mockito.verify(producer, times(1)).send(eq(records.get(0)));
65+
66+
Counter counter = meterRegistry.get(KafkaMessageSender.SUCCESS_SEND_MESSAGES_METRIC).tag(KafkaMessageSender.TOPIC, topic).counter();
67+
context.assertEquals(1.0, counter.count(), "Counter for topic `myTopic` should have been incremented by 1");
6068
}
6169

6270
@Test
@@ -74,6 +82,9 @@ public void sendSingleMessageWithoutKey(TestContext context) throws ValidationEx
7482
});
7583

7684
Mockito.verify(producer, times(1)).send(eq(records.get(0)));
85+
86+
Counter counter = meterRegistry.get(KafkaMessageSender.SUCCESS_SEND_MESSAGES_METRIC).tag(KafkaMessageSender.TOPIC, topic).counter();
87+
context.assertEquals(1.0, counter.count(), "Counter for topic `myTopic` should have been incremented by 1");
7788
}
7889

7990
@Test
@@ -98,6 +109,9 @@ public void sendMultipleMessages(TestContext context) throws ValidationException
98109
context.assertEquals(records.get(0), recordCaptor.getAllValues().get(0));
99110
context.assertEquals(records.get(1), recordCaptor.getAllValues().get(1));
100111
context.assertEquals(records.get(2), recordCaptor.getAllValues().get(2));
112+
113+
Counter counter = meterRegistry.get(KafkaMessageSender.SUCCESS_SEND_MESSAGES_METRIC).tag(KafkaMessageSender.TOPIC, topic).counter();
114+
context.assertEquals(3.0, counter.count(), "Counter for topic `myTopic` should have been incremented by 3");
101115
}
102116

103117
@Test
@@ -124,6 +138,12 @@ public void sendMultipleMessagesWithFailingMessage(TestContext context) throws V
124138
context.assertEquals(records.get(0), recordCaptor.getAllValues().get(0));
125139
context.assertEquals(records.get(1), recordCaptor.getAllValues().get(1));
126140
context.assertEquals(records.get(2), recordCaptor.getAllValues().get(2));
141+
142+
Counter successCounter = meterRegistry.get(KafkaMessageSender.SUCCESS_SEND_MESSAGES_METRIC).tag(KafkaMessageSender.TOPIC, topic).counter();
143+
context.assertEquals(2.0, successCounter.count(), "Success counter for topic `myTopic` should have been incremented by 2");
144+
145+
Counter failCounter = meterRegistry.get(KafkaMessageSender.FAIL_SEND_MESSAGES_METRIC).tag(KafkaMessageSender.TOPIC, topic).counter();
146+
context.assertEquals(1.0, failCounter.count(), "Fail counter for topic `myTopic` should have been incremented by 1");
127147
}
128148

129149
private JsonObject buildSingleRecordPayload(String key){

gateleen-kafka/src/test/java/org/swisspush/gateleen/kafka/KafkaMessageValidatorTest.java

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package org.swisspush.gateleen.kafka;
22

3+
import io.micrometer.core.instrument.Counter;
4+
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
35
import io.vertx.core.Future;
4-
import io.vertx.core.Vertx;
56
import io.vertx.core.buffer.Buffer;
67
import io.vertx.core.http.HttpMethod;
78
import io.vertx.core.http.HttpServerResponse;
@@ -36,18 +37,18 @@
3637
@RunWith(VertxUnitRunner.class)
3738
public class KafkaMessageValidatorTest {
3839

39-
private Vertx vertx;
4040
private KafkaMessageValidator messageValidator;
4141
private Validator validator;
4242
private ValidationResourceManager validationResourceManager;
43+
private SimpleMeterRegistry meterRegistry;
4344

4445
@Before
4546
public void setUp() {
46-
vertx = Vertx.vertx();
4747
validationResourceManager = Mockito.mock(ValidationResourceManager.class);
4848
validator = Mockito.mock(Validator.class);
49-
49+
meterRegistry = new SimpleMeterRegistry();
5050
messageValidator = new KafkaMessageValidator(validationResourceManager, validator);
51+
messageValidator.setMeterRegistry(meterRegistry);
5152
}
5253

5354
@Test
@@ -141,6 +142,10 @@ public void testValidateMessagesMatchingValidationResourceEntry(TestContext cont
141142
context.assertEquals(ValidationStatus.COULD_NOT_VALIDATE, event.result().getValidationStatus());
142143
verify(validationResourceManager, times(2)).getValidationResource();
143144
verify(validator, times(1)).validateWithSchemaLocation(any(), any(), any());
145+
146+
Counter counter = meterRegistry.get(KafkaMessageValidator.FAIL_VALIDATION_MESSAGES_METRIC).tag(KafkaMessageSender.TOPIC, "myOtherTopic").counter();
147+
context.assertEquals(1.0, counter.count(), "Counter for topic `myOtherTopic` should have been incremented by 1");
148+
144149
async.complete();
145150
});
146151

@@ -176,6 +181,10 @@ public void testValidateMessagesWithFailInValidator(TestContext context) {
176181
context.assertTrue(event.failed());
177182
verify(validationResourceManager, times(2)).getValidationResource();
178183
verify(validator, times(2)).validateWithSchemaLocation(any(), any(), any());
184+
185+
Counter counter = meterRegistry.get(KafkaMessageValidator.FAIL_VALIDATION_MESSAGES_METRIC).tag(KafkaMessageSender.TOPIC, "myOtherTopic").counter();
186+
context.assertEquals(1.0, counter.count(), "Counter for topic `myOtherTopic` should have been incremented by 1");
187+
179188
async.complete();
180189
});
181190

@@ -217,4 +226,47 @@ public void testValidateMultipleMessages(TestContext context) {
217226
});
218227

219228
}
229+
230+
@Test
231+
public void testValidateMultipleMessagesWithValidatedNegative(TestContext context) {
232+
Async async = context.async();
233+
234+
ValidationResource validationResource = new ValidationResource();
235+
validationResource.addResource(
236+
Map.of(ValidationResource.METHOD_PROPERTY, "PUT",
237+
ValidationResource.URL_PROPERTY, "/path/to/myTopic",
238+
ValidationResource.SCHEMA_LOCATION_PROPERTY, "/path/to/schema"
239+
));
240+
241+
when(validationResourceManager.getValidationResource()).thenReturn(validationResource);
242+
243+
HttpServerResponse response = spy(new StreamingResponse(new HeadersMultiMap()));
244+
StreamingRequest request = new StreamingRequest(HttpMethod.PUT, "/path/to/myTopic", "", new HeadersMultiMap(), response);
245+
246+
String payload_1 = new JsonObject().encode();
247+
String payload_2 = new JsonObject().put("foo", "bar").encode();
248+
String payload_3 = new JsonObject().put("abc", "def").encode();
249+
List<KafkaProducerRecord<String, String>> kafkaProducerRecords = new ArrayList<>();
250+
kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", payload_1));
251+
kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", payload_2));
252+
kafkaProducerRecords.add(KafkaProducerRecord.create("myOtherTopic", payload_3));
253+
254+
when(validator.validateWithSchemaLocation(any(), any(), any())).thenReturn(
255+
Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_POSITIV)));
256+
when(validator.validateWithSchemaLocation(any(), eq(Buffer.buffer(payload_2)), any())).thenReturn(
257+
Future.succeededFuture(new ValidationResult(ValidationStatus.VALIDATED_NEGATIV)));
258+
259+
messageValidator.validateMessages(request, kafkaProducerRecords).onComplete(event -> {
260+
context.assertTrue(event.succeeded());
261+
context.assertEquals(ValidationStatus.VALIDATED_NEGATIV, event.result().getValidationStatus());
262+
verify(validationResourceManager, times(2)).getValidationResource();
263+
verify(validator, times(3)).validateWithSchemaLocation(any(), any(), any());
264+
265+
Counter counter = meterRegistry.get(KafkaMessageValidator.FAIL_VALIDATION_MESSAGES_METRIC).tag(KafkaMessageSender.TOPIC, "myOtherTopic").counter();
266+
context.assertEquals(1.0, counter.count(), "Counter for topic `myOtherTopic` should have been incremented by 1");
267+
268+
async.complete();
269+
});
270+
271+
}
220272
}

0 commit comments

Comments
 (0)