Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
<PackageVersion Include="Azure.Storage.Blobs" Version="12.17.0"/>
<PackageVersion Include="Azure.Storage.Queues" Version="12.15.0"/>
<PackageVersion Include="ClickHouse.Client" Version="7.9.1"/>
<PackageVersion Include="Confluent.Kafka" Version="2.0.2"/>
<PackageVersion Include="Confluent.Kafka" Version="2.8.0"/>
<PackageVersion Include="Confluent.SchemaRegistry.Serdes.Json" Version="2.8.0" />
<PackageVersion Include="Confluent.SchemaRegistry" Version="2.8.0" />
<PackageVersion Include="Consul" Version="1.6.10.9"/>
<PackageVersion Include="CouchbaseNetClient" Version="3.6.4"/>
<PackageVersion Include="DotPulsar" Version="3.3.2"/>
Expand Down
55 changes: 52 additions & 3 deletions src/Testcontainers.Kafka/KafkaBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ public sealed class KafkaBuilder : ContainerBuilder<KafkaBuilder, KafkaContainer

public const ushort BrokerPort = 9093;

public const ushort ControllerPort = 9094;

public const ushort ZookeeperPort = 2181;

public const string StartupScriptFilePath = "/testcontainers.sh";

private const string ProtocolPrefix = "TC";

/// <summary>
/// Initializes a new instance of the <see cref="KafkaBuilder" /> class.
/// </summary>
Expand Down Expand Up @@ -43,6 +47,48 @@ public override KafkaContainer Build()
return new KafkaContainer(DockerResourceConfiguration);
}

/// <summary>
/// Adds a listener to the Kafka configuration in the format <c>host:port</c>.
/// </summary>
/// <remarks>
/// The host will be included as a network alias, allowing additional connections
/// to the Kafka broker within the same container network.
///
/// This method is useful for registering custom listeners beyond the default ones,
/// enabling specific connection points for Kafka brokers.
///
/// Default listeners include:
/// - <c>PLAINTEXT://0.0.0.0:9092</c>
/// - <c>BROKER://0.0.0.0:9093</c>
/// - <c>CONTROLLER://0.0.0.0:9094</c>
/// </remarks>
/// <param name="kafka">The MsSql database.</param>
/// <returns>A configured instance of <see cref="KafkaBuilder" />.</returns>
public KafkaBuilder WithListener(string kafka)
{
var index = DockerResourceConfiguration.Listeners?.Count() ?? 0;
var protocol = $"{ProtocolPrefix}-{index}";
var listener = $"{protocol}://{kafka}";
var listenerSecurityProtocolMap = $"{protocol}:PLAINTEXT";

var listeners = new[] { listener };

var host = kafka.Split(':')[0];

var currentListeners = DockerResourceConfiguration.Environments["KAFKA_LISTENERS"]
.Split([','], StringSplitOptions.RemoveEmptyEntries)
.Concat([listener]);

var currentListenersSecurityProtocolMap = DockerResourceConfiguration.Environments["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"]
.Split([','], StringSplitOptions.RemoveEmptyEntries)
.Concat([listenerSecurityProtocolMap]);

return Merge(DockerResourceConfiguration, new KafkaConfiguration(listeners, listeners))
.WithEnvironment("KAFKA_LISTENERS", string.Join(",", currentListeners))
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", string.Join(",", currentListenersSecurityProtocolMap))
.WithNetworkAliases(host);
}

/// <inheritdoc />
protected override KafkaBuilder Init()
{
Expand All @@ -51,10 +97,12 @@ protected override KafkaBuilder Init()
.WithPortBinding(KafkaPort, true)
.WithPortBinding(BrokerPort, true)
.WithPortBinding(ZookeeperPort, true)
.WithEnvironment("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KafkaPort + ",BROKER://0.0.0.0:" + BrokerPort)
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
.WithEnvironment("KAFKA_LISTENERS", $"PLAINTEXT://0.0.0.0:{KafkaPort},BROKER://0.0.0.0:{BrokerPort},CONTROLLER://0.0.0.0:{ControllerPort}")
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
.WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
.WithEnvironment("KAFKA_BROKER_ID", "1")
.WithEnvironment("KAFKA_NODE_ID", "1")
.WithEnvironment("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:" + ControllerPort)
.WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
.WithEnvironment("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
.WithEnvironment("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
Expand All @@ -68,6 +116,7 @@ protected override KafkaBuilder Init()
.WithStartupCallback((container, ct) =>
{
const char lf = '\n';
var additionalAdvertisedListeners = string.Join(",", container.AdvertisedListeners ?? Array.Empty<string>());
var startupScript = new StringBuilder();
startupScript.Append("#!/bin/bash");
startupScript.Append(lf);
Expand All @@ -79,7 +128,7 @@ protected override KafkaBuilder Init()
startupScript.Append(lf);
startupScript.Append("zookeeper-server-start zookeeper.properties &");
startupScript.Append(lf);
startupScript.Append("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaPort) + ",BROKER://" + container.IpAddress + ":" + BrokerPort);
startupScript.Append("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaPort) + ",BROKER://" + container.IpAddress + ":" + BrokerPort + "," + additionalAdvertisedListeners);
startupScript.Append(lf);
startupScript.Append("echo '' > /etc/confluent/docker/ensure");
startupScript.Append(lf);
Expand Down
20 changes: 19 additions & 1 deletion src/Testcontainers.Kafka/KafkaConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@ public sealed class KafkaConfiguration : ContainerConfiguration
/// <summary>
/// Initializes a new instance of the <see cref="KafkaConfiguration" /> class.
/// </summary>
public KafkaConfiguration()
/// <param name="listeners">A list of listeners.</param>
/// <param name="advertisedListeners">A list of advertised listeners.</param>
public KafkaConfiguration(
IEnumerable<string> listeners = null,
IEnumerable<string> advertisedListeners = null)
{
Listeners = listeners;
AdvertisedListeners = advertisedListeners;
}

/// <summary>
Expand Down Expand Up @@ -49,5 +55,17 @@ public KafkaConfiguration(KafkaConfiguration resourceConfiguration)
public KafkaConfiguration(KafkaConfiguration oldValue, KafkaConfiguration newValue)
: base(oldValue, newValue)
{
Listeners = BuildConfiguration.Combine(oldValue.Listeners, newValue.Listeners);
AdvertisedListeners = BuildConfiguration.Combine(oldValue.AdvertisedListeners, newValue.AdvertisedListeners);
}

/// <summary>
/// Gets a list of listeners.
/// </summary>
public IEnumerable<string> Listeners { get; }

/// <summary>
/// Gets a list of advertised listeners.
/// </summary>
public IEnumerable<string> AdvertisedListeners { get; }
}
14 changes: 14 additions & 0 deletions src/Testcontainers.Kafka/KafkaContainer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ namespace Testcontainers.Kafka;
[PublicAPI]
public sealed class KafkaContainer : DockerContainer
{
private readonly KafkaConfiguration _configuration;

/// <summary>
/// Initializes a new instance of the <see cref="KafkaContainer" /> class.
/// </summary>
/// <param name="configuration">The container configuration.</param>
public KafkaContainer(KafkaConfiguration configuration)
: base(configuration)
{
_configuration = configuration;
}

/// <summary>
Expand All @@ -21,4 +24,15 @@ public string GetBootstrapAddress()
{
return new UriBuilder("PLAINTEXT", Hostname, GetMappedPublicPort(KafkaBuilder.KafkaPort)).ToString();
}

/// <summary>
/// Gets a list of advertised listeners.
/// </summary>
public IEnumerable<string> AdvertisedListeners
{
get
{
return _configuration.AdvertisedListeners;
}
}
}
2 changes: 2 additions & 0 deletions src/Testcontainers.Kafka/Usings.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
global using System;
global using System.Collections.Generic;
global using System.Linq;
global using System.Text;
global using Docker.DotNet.Models;
global using DotNet.Testcontainers.Builders;
Expand Down
63 changes: 63 additions & 0 deletions tests/Testcontainers.Kafka.Tests/KafkaContainerNetworkTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using System.Collections.Generic;
using System.Text;
using DotNet.Testcontainers.Builders;
using DotNet.Testcontainers.Containers;
using DotNet.Testcontainers.Networks;

namespace Testcontainers.Kafka;

public sealed class KafkaContainerNetworkTest : IAsyncLifetime
{
private INetwork _network;
private KafkaContainer _kafkaContainer;

private IContainer _kCatContainer;
public async Task InitializeAsync()
{
_network = new NetworkBuilder().Build();
_kafkaContainer = new KafkaBuilder()
.WithImage("confluentinc/cp-kafka")
.WithNetwork(_network)
.WithListener("kafka:19092")
.Build();

_kCatContainer = new ContainerBuilder()
.WithImage("confluentinc/cp-kcat")
.WithNetwork(_network)
.WithCommand("-c", "tail -f /dev/null")
.WithEntrypoint("sh")
.WithResourceMapping(Encoding.Default.GetBytes("Message produced by kcat"), "/data/msgs.txt")
.Build();

await _kCatContainer.StartAsync();
await _kafkaContainer.StartAsync();
}

public Task DisposeAsync()
{
return Task.WhenAll(
_kafkaContainer.DisposeAsync().AsTask(),
_kCatContainer.DisposeAsync().AsTask()
);
}

[Fact]
public async Task TestUsageWithListener()
{
// kcat producer
await _kCatContainer.ExecAsync(new List<string>()
{
"kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt"
});


// kcat consumer
var kCatResult = await _kCatContainer.ExecAsync(new List<string>()
{
"kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1"
});

Assert.Contains("Message produced by kcat", kCatResult.Stdout);
}

}
132 changes: 132 additions & 0 deletions tests/Testcontainers.Kafka.Tests/KafkaContainerWithRegistryTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
using System.Diagnostics;
using System.Threading;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using DotNet.Testcontainers.Builders;
using DotNet.Testcontainers.Containers;
using DotNet.Testcontainers.Networks;

namespace Testcontainers.Kafka;

public class KafkaContainerWithRegistryTest : IAsyncLifetime
{
private INetwork _network;
private KafkaContainer _kafkaContainer;
private IContainer _kafkaSchemaRegistry;

const string schema = @"
{
""$schema"": ""http://json-schema.org/draft-07/schema#"",
""$id"": ""http://example.com/product.schema.json"",
""title"": ""User"",
""description"": ""A User"",
""type"": ""object"",
""properties"": {
""age"": {
""description"": ""The age of the user"",
""type"": ""integer""
},
""lastname"": {
""description"": ""Last name of the user"",
""type"": ""string""
},
""firstname"": {
""description"": ""First name of the user"",
""type"": ""string""
}
},
""required"": [""firstname"", ""lastname""]
}";

public async Task InitializeAsync()
{
_network = new NetworkBuilder().Build();
_kafkaContainer = new KafkaBuilder()
.WithImage("confluentinc/cp-kafka")
.WithNetwork(_network)
.WithListener("kafka:19092")
.Build();

_kafkaSchemaRegistry = new ContainerBuilder()
.WithImage("confluentinc/cp-schema-registry:7.8.0")
.DependsOn(_kafkaContainer)
.WithPortBinding(8085, true)
.WithNetworkAliases("schemaregistry")
.WithNetwork(_network)
.WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:19092")
.WithEnvironment("SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8085")
.WithEnvironment("SCHEMA_REGISTRY_HOST_NAME", "schemaregistry")
.WithEnvironment("SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL", "PLAINTEXT")
.WithWaitStrategy(
Wait.ForUnixContainer()
.UntilHttpRequestIsSucceeded(request => request.ForPath("/subjects")
.ForPort(8085))

)
.Build();

await _kafkaContainer.StartAsync();
await _kafkaSchemaRegistry.StartAsync();
}

public async Task DisposeAsync()
{
await Task.WhenAll(
_kafkaContainer.DisposeAsync().AsTask(),
_kafkaSchemaRegistry.DisposeAsync().AsTask()
);
}

/// <summary>
/// Test the usage of the Kafka container with the schema registry.
/// </summary>
[Fact]
public async Task TestUsageWithSchemaRegistry()
{
const string topicName = "user-topic";
var subject = SubjectNameStrategy.Topic.ConstructValueSubjectName(topicName, null);

var bootstrapServers = this._kafkaContainer.GetBootstrapAddress()
.Replace("PLAINTEXT://", "", StringComparison.OrdinalIgnoreCase);

var jsonSerializerConfig = new JsonSerializerConfig
{
BufferBytes = 100,
};

var schemaRegistryUrl = $"http://localhost:{_kafkaSchemaRegistry.GetMappedPublicPort(8085)}";

var schemaRegistryConfig = new SchemaRegistryConfig
{
Url = schemaRegistryUrl,
};
// Init Kafka producer to send a message
var producerConfig = new ProducerConfig
{
BootstrapServers = bootstrapServers,
ClientId = $"test-client-{DateTime.Now.Ticks}",
};
using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);

var schemaId = await schemaRegistry.RegisterSchemaAsync(subject, new Schema(schema, SchemaType.Json));

using var producer = new ProducerBuilder<string, User>(producerConfig)
.SetValueSerializer(new JsonSerializer<User>(schemaRegistry, jsonSerializerConfig))
.Build();

await Assert.ThrowsAsync<SchemaRegistryException>(async () =>
{
try
{
var user = new User { Name = "value", Age = 30 };
await producer.ProduceAsync(topicName, new Message<string, User> { Value = user });
}
catch (Exception e)
{
Assert.True(e is ProduceException<string, User>);
Debug.Assert(e.InnerException != null, "e.InnerException != null");
throw e.InnerException;
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
<PackageReference Include="xunit.runner.visualstudio"/>
<PackageReference Include="xunit"/>
<PackageReference Include="Confluent.Kafka"/>
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Json"/>
<PackageReference Include="Confluent.SchemaRegistry"/>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="../../src/Testcontainers.Kafka/Testcontainers.Kafka.csproj"/>
Expand Down
7 changes: 7 additions & 0 deletions tests/Testcontainers.Kafka.Tests/User.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Testcontainers.Kafka;

public class User
{
public string Name { get; set; }
public int Age { get; set; }
}