Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
Zipkin Reporter buffers and sends trace data collected from tracer libraries to a Zipkin compatible backend.

This repository includes a Java reporting library with transport-specific senders.
Transport options include HTTP, Apache ActiveMQ, Apache Kafka, gRPC, RabbitMQ
and Scribe (Apache Thrift). Requires JRE 6 or later.
Transport options include HTTP, Apache ActiveMQ, Apache Kafka, gRPC, RabbitMQ, Scribe (Apache Thrift)
and Apache Pulsar. Requires JRE 6 or later.

# Usage
These components can be called when spans have been recorded and ready to send to zipkin.
Expand Down
2 changes: 1 addition & 1 deletion activemq-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<artifactId>zipkin-reporter-parent</artifactId>
<groupId>io.zipkin.reporter2</groupId>
<version>3.4.4-SNAPSHOT</version>
<version>3.5.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ final class ActiveMQContainer extends GenericContainer<ActiveMQContainer> {
static final int ACTIVEMQ_PORT = 61616;

ActiveMQContainer() {
super(parse("ghcr.io/openzipkin/zipkin-activemq:3.1.1"));
super(parse("ghcr.io/openzipkin/zipkin-activemq:3.4.3"));
withExposedPorts(ACTIVEMQ_PORT);
waitStrategy = Wait.forListeningPorts(ACTIVEMQ_PORT);
withStartupTimeout(Duration.ofSeconds(60));
Expand Down
2 changes: 1 addition & 1 deletion amqp-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter-parent</artifactId>
<version>3.4.4-SNAPSHOT</version>
<version>3.5.0-SNAPSHOT</version>
</parent>

<artifactId>zipkin-sender-amqp-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ final class RabbitMQContainer extends GenericContainer<RabbitMQContainer> {
static final int RABBIT_PORT = 5672;

RabbitMQContainer() {
super(parse("ghcr.io/openzipkin/zipkin-rabbitmq:3.1.1"));
super(parse("ghcr.io/openzipkin/zipkin-rabbitmq:3.4.3"));
withExposedPorts(RABBIT_PORT);
waitStrategy = Wait.forLogMessage(".*Server startup complete.*", 1);
withStartupTimeout(Duration.ofSeconds(60));
Expand Down
8 changes: 7 additions & 1 deletion benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter-parent</artifactId>
<version>3.4.4-SNAPSHOT</version>
<version>3.5.0-SNAPSHOT</version>
</parent>

<artifactId>benchmarks</artifactId>
Expand Down Expand Up @@ -109,6 +109,12 @@
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zipkin-sender-pulsar-client</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class KafkaSenderBenchmarks extends SenderBenchmarks {

static final class KafkaContainer extends GenericContainer<KafkaContainer> {
KafkaContainer() {
super(parse("ghcr.io/openzipkin/zipkin-kafka:3.1.1"));
super(parse("ghcr.io/openzipkin/zipkin-kafka:3.4.3"));
waitStrategy = Wait.forHealthcheck();
// Kafka broker listener port (19092) needs to be exposed for test cases to access it.
addFixedExposedPort(KAFKA_PORT, KAFKA_PORT, InternetProtocol.TCP);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/
package zipkin2.reporter;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import zipkin2.reporter.internal.SenderBenchmarks;
import zipkin2.reporter.pulsar.PulsarSender;

import java.time.Duration;
import java.util.Collections;

import static org.testcontainers.utility.DockerImageName.parse;

public class PulsarSenderBenchmarks extends SenderBenchmarks {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add to the description the run of this benchmark in triple backticks

static final Logger LOGGER = LoggerFactory.getLogger(PulsarSenderBenchmarks.class);

static final class PulsarContainer extends GenericContainer<PulsarContainer> {
static final int BROKER_PORT = 6650;
static final int BROKER_HTTP_PORT = 8080;

PulsarContainer() {
super(parse("ghcr.io/openzipkin/zipkin-pulsar:3.4.3"));
withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT);
String cmd = "/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf " +
"&& bin/pulsar standalone " +
"--no-functions-worker -nss";
withEnv("PULSAR_MEM", "-Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g"); // limit memory usage
waitStrategy = new HttpWaitStrategy()
.forPort(BROKER_HTTP_PORT)
.forStatusCode(200)
.forPath("/admin/v2/clusters")
.withStartupTimeout(Duration.ofSeconds(120));
withCommand("/bin/bash", "-c", cmd);
withLogConsumer(new Slf4jLogConsumer(LOGGER));
}

String serviceUrl() {
return "pulsar://" + getHost() + ":" + getMappedPort(BROKER_PORT);
}
}

PulsarContainer pulsar;
Consumer<byte[]> consumer;

@Override protected BytesMessageSender createSender() throws PulsarClientException {
pulsar = new PulsarContainer();
pulsar.start();

String topicName = "zipkin";
PulsarSender sender = PulsarSender.newBuilder().topic(topicName)
.serviceUrl(pulsar.serviceUrl()).build();

sender.send(Collections.emptyList());
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.serviceUrl())
.build()) {
consumer = client.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionName(topicName)
.subscribe();
}

return sender;
}

@Override protected void afterSenderClose() {
pulsar.stop();
}

// Convenience main entry-point
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(".*" + PulsarSenderBenchmarks.class.getSimpleName() + ".*")
.build();

new Runner(opt).run();
}
}

7 changes: 6 additions & 1 deletion bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter-bom</artifactId>
<name>Zipkin Reporter BOM</name>
<version>3.4.4-SNAPSHOT</version>
<version>3.5.0-SNAPSHOT</version>
<packaging>pom</packaging>
<description>Bill Of Materials POM for all Zipkin reporter artifacts</description>

Expand Down Expand Up @@ -136,6 +136,11 @@
<artifactId>zipkin-reporter-metrics-micrometer</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zipkin-sender-pulsar-client</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
2 changes: 1 addition & 1 deletion brave/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter-parent</artifactId>
<version>3.4.4-SNAPSHOT</version>
<version>3.5.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter-parent</artifactId>
<version>3.4.4-SNAPSHOT</version>
<version>3.5.0-SNAPSHOT</version>
</parent>

<artifactId>zipkin-reporter</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter-parent</artifactId>
<version>3.4.4-SNAPSHOT</version>
<version>3.5.0-SNAPSHOT</version>
</parent>

<artifactId>zipkin-sender-kafka</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ final class KafkaContainer extends GenericContainer<KafkaContainer> {
static final int KAFKA_PORT = 19092;

KafkaContainer() {
super(parse("ghcr.io/openzipkin/zipkin-kafka:3.1.1"));
super(parse("ghcr.io/openzipkin/zipkin-kafka:3.4.3"));
waitStrategy = Wait.forHealthcheck();
// Kafka broker listener port (19092) needs to be exposed for test cases to access it.
addFixedExposedPort(KAFKA_PORT, KAFKA_PORT, InternetProtocol.TCP);
Expand Down
2 changes: 1 addition & 1 deletion libthrift/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter-parent</artifactId>
<version>3.4.4-SNAPSHOT</version>
<version>3.5.0-SNAPSHOT</version>
</parent>

<artifactId>zipkin-sender-libthrift</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ final class ZipkinContainer extends GenericContainer<ZipkinContainer> {
static final int HTTP_PORT = 9411;

ZipkinContainer() {
super(parse("ghcr.io/openzipkin/zipkin:3.1.1"));
super(parse("ghcr.io/openzipkin/zipkin:3.4.3"));
// zipkin-server disables scribe by default.
withEnv("COLLECTOR_SCRIBE_ENABLED", "true");
withExposedPorts(SCRIBE_PORT, HTTP_PORT);
Expand Down
2 changes: 1 addition & 1 deletion metrics-micrometer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter-parent</artifactId>
<version>3.4.4-SNAPSHOT</version>
<version>3.5.0-SNAPSHOT</version>
</parent>

<artifactId>zipkin-reporter-metrics-micrometer</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion okhttp3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter-parent</artifactId>
<version>3.4.4-SNAPSHOT</version>
<version>3.5.0-SNAPSHOT</version>
</parent>

<artifactId>zipkin-sender-okhttp3</artifactId>
Expand Down
3 changes: 2 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter-parent</artifactId>
<version>3.4.4-SNAPSHOT</version>
<version>3.5.0-SNAPSHOT</version>
<packaging>pom</packaging>

<modules>
Expand All @@ -25,6 +25,7 @@
<module>spring-beans</module>
<module>brave</module>
<module>metrics-micrometer</module>
<module>pulsar-client</module>
</modules>

<properties>
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client/bnd.bnd
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Import-Package: \
*
Export-Package: \
zipkin2.reporter.pulsar
91 changes: 91 additions & 0 deletions pulsar-client/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Copyright The OpenZipkin Authors
SPDX-License-Identifier: Apache-2.0

-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>zipkin-reporter-parent</artifactId>
<groupId>io.zipkin.reporter2</groupId>
<version>3.5.0-SNAPSHOT</version>
</parent>

<artifactId>zipkin-sender-pulsar-client</artifactId>
<name>Zipkin Sender: Pulsar Client 4.x</name>

<properties>
<!-- Matches Export-Package in bnd.bnd -->
<module.name>zipkin2.reporter.pulsar</module.name>

<main.basedir>${project.basedir}/..</main.basedir>
<pulsar-client.version>4.0.2</pulsar-client.version>
</properties>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zipkin-reporter</artifactId>
<version>${project.version}</version>
<!-- Senders don't use zipkin types. Excluding allows brave users to
avoid them by default. -->
<exclusions>
<exclusion>
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good. sender name matches the artifact here!

<version>${pulsar-client.version}</version>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
<profile>
<id>release</id>
<properties>
<!-- pulsar 4.0+ is Java 8 bytecode -->
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<maven.compiler.release>8</maven.compiler.release>
</properties>
<build>
<plugins>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>${maven-enforcer-plugin.version}</version>
<executions>
<execution>
<id>enforce-java</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<!-- The only LTS JDK we support that can compile 8 bytecode is 11.
https://www.oracle.com/java/technologies/javase/17-relnote-issues.html -->
<requireJavaVersion>
<version>[11,12)</version>
</requireJavaVersion>
</rules>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Loading