Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
822da62
First cut at AssociatedNameStrategy refactor
rayokota Jan 13, 2026
4bab228
Minor fix
rayokota Jan 13, 2026
827a334
Minor fix
rayokota Jan 13, 2026
9d22fec
Fix async usage
rayokota Jan 15, 2026
27a602d
Implement associated name strategy
rayokota Jan 23, 2026
54c081a
Minor refactor
rayokota Jan 24, 2026
a524eb6
Minor cleanup
rayokota Jan 24, 2026
748016a
Add more assoc apis
rayokota Jan 29, 2026
1cf3782
handle 404
rayokota Feb 7, 2026
2a8141a
Minor opt
rayokota Feb 11, 2026
a8b5e8a
Minor renaming
rayokota Feb 23, 2026
4d3d8a6
Incorporate review feedback
rayokota Feb 24, 2026
96f1d99
Incorporate review feedback
rayokota Feb 24, 2026
9e3dc5a
Add tests
rayokota Feb 25, 2026
33a6e62
Fix RecordType handling in subject name strategy
rayokota Feb 25, 2026
890a30b
Fix AssociatedNameStrategy caching
rayokota Feb 25, 2026
d59e325
Minor fix
rayokota Feb 25, 2026
298c191
Minor fix
rayokota Feb 25, 2026
d8251be
Minor fix
rayokota Feb 27, 2026
0b3eefe
Make Associated the default strategy
rayokota Feb 27, 2026
851b1c3
Fallback if sr client is null
rayokota Feb 27, 2026
a698717
Minor fixes
rayokota Feb 27, 2026
55db7ce
Set default in serdes
rayokota Feb 27, 2026
737abfc
Move setting of strategy before config null check
rayokota Feb 27, 2026
e860739
Move setting of strategy before config null check
rayokota Feb 27, 2026
c36e97c
Update docs
rayokota Feb 27, 2026
0e9d8a2
Minor renaming
rayokota Feb 27, 2026
f6e2485
Add association example
rayokota Mar 4, 2026
f350f9d
Minor cleanup
rayokota Mar 4, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions Confluent.Kafka.sln
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ProtobufEncryption", "examp
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AvroGenericMigration", "examples\AvroGenericMigration\AvroGenericMigration.csproj", "{10CD6000-59A3-40C9-905F-20F4EE03C1B4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "AvroGenericAssociation", "examples\AvroGenericAssociation\AvroGenericAssociation.csproj", "{C880F178-7705-4814-A048-CBFFE5D16512}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Shared", "Shared", "{B544BD2C-F46D-4C68-8A31-3010E371E640}"
ProjectSection(SolutionItems) = preProject
src\Shared\DictionaryEqualityComparer.cs = src\Shared\DictionaryEqualityComparer.cs
Expand Down Expand Up @@ -652,6 +654,18 @@ Global
{10CD6000-59A3-40C9-905F-20F4EE03C1B4}.Release|x64.Build.0 = Release|Any CPU
{10CD6000-59A3-40C9-905F-20F4EE03C1B4}.Release|x86.ActiveCfg = Release|Any CPU
{10CD6000-59A3-40C9-905F-20F4EE03C1B4}.Release|x86.Build.0 = Release|Any CPU
{C880F178-7705-4814-A048-CBFFE5D16512}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C880F178-7705-4814-A048-CBFFE5D16512}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C880F178-7705-4814-A048-CBFFE5D16512}.Debug|x64.ActiveCfg = Debug|Any CPU
{C880F178-7705-4814-A048-CBFFE5D16512}.Debug|x64.Build.0 = Debug|Any CPU
{C880F178-7705-4814-A048-CBFFE5D16512}.Debug|x86.ActiveCfg = Debug|Any CPU
{C880F178-7705-4814-A048-CBFFE5D16512}.Debug|x86.Build.0 = Debug|Any CPU
{C880F178-7705-4814-A048-CBFFE5D16512}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C880F178-7705-4814-A048-CBFFE5D16512}.Release|Any CPU.Build.0 = Release|Any CPU
{C880F178-7705-4814-A048-CBFFE5D16512}.Release|x64.ActiveCfg = Release|Any CPU
{C880F178-7705-4814-A048-CBFFE5D16512}.Release|x64.Build.0 = Release|Any CPU
{C880F178-7705-4814-A048-CBFFE5D16512}.Release|x86.ActiveCfg = Release|Any CPU
{C880F178-7705-4814-A048-CBFFE5D16512}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{09C3255B-1972-4EB8-91D0-FB9F5CD82BCB} = {1EFCD839-0726-4BCE-B745-1E829991B1BC}
Expand Down Expand Up @@ -694,6 +708,7 @@ Global
{6727B941-3E07-4841-84E0-8EE47E04A3B3} = {9CE4B5F7-9251-4340-BACB-207066A5DBE8}
{6988FB1F-3648-4E5E-821F-55D67CA00FD7} = {9CE4B5F7-9251-4340-BACB-207066A5DBE8}
{10CD6000-59A3-40C9-905F-20F4EE03C1B4} = {9CE4B5F7-9251-4340-BACB-207066A5DBE8}
{C880F178-7705-4814-A048-CBFFE5D16512} = {9CE4B5F7-9251-4340-BACB-207066A5DBE8}
{B544BD2C-F46D-4C68-8A31-3010E371E640} = {1EFCD839-0726-4BCE-B745-1E829991B1BC}
{BA928213-B366-4290-A4AA-E0FA094333FE} = {D5322057-682D-49F2-A2FB-68FA69187B28}
{EDA3CED2-A80E-44F5-B5E4-79E38BC1376F} = {D5322057-682D-49F2-A2FB-68FA69187B28}
Expand Down
13 changes: 13 additions & 0 deletions examples/AvroGenericAssociation/AvroGenericAssociation.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<ProjectTypeGuids>{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids>
<AssemblyName>AvroGenericAssociation</AssemblyName>
</PropertyGroup>

<ItemGroup>
<!-- nuget package reference: <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="2.9.0" /> -->
<ProjectReference Include="../../src/Confluent.SchemaRegistry.Serdes.Avro/Confluent.SchemaRegistry.Serdes.Avro.csproj" />
</ItemGroup>

</Project>
177 changes: 177 additions & 0 deletions examples/AvroGenericAssociation/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright 2025 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using Avro;
using Avro.Generic;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Schema = Confluent.SchemaRegistry.Schema;


namespace Confluent.Kafka.Examples.AvroGenericAssociation
{
/// <summary>
/// Demonstrates how to use the Associated subject name strategy with
/// Avro GenericRecord serialization. This example:
/// 1. Registers a schema under a custom subject name.
/// 2. Creates a STRONG association between the topic and the subject.
/// 3. Produces and consumes messages using the Associated strategy
/// (which looks up the subject via the association).
/// 4. Deletes the association.
/// </summary>
class Program
{
static async Task Main(string[] args)
{
if (args.Length != 3)
{
Console.WriteLine("Usage: .. bootstrapServers schemaRegistryUrl topicName");
return;
}

string bootstrapServers = args[0];
string schemaRegistryUrl = args[1];
string topicName = args[2];
string groupName = "avro-generic-association-example-group";
string subjectName = $"{topicName}-custom-value";

var s = (RecordSchema)RecordSchema.Parse(
@"{
""type"": ""record"",
""name"": ""User"",
""fields"": [
{""name"": ""name"", ""type"": ""string""},
{""name"": ""favorite_number"", ""type"": ""long""},
{""name"": ""favorite_color"", ""type"": ""string""}
]
}"
);

using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = schemaRegistryUrl }))
{
// Step 1: Register the schema under a custom subject name.
var schemaObj = new Schema(s.ToString(), new List<SchemaReference>(), SchemaType.Avro);
var registeredSchema = await schemaRegistry.RegisterSchemaWithResponseAsync(subjectName, schemaObj, false);
Console.WriteLine($"Registered schema under subject '{subjectName}' with id {registeredSchema.Id}");

// Step 2: Create a STRONG association between the topic and the subject.
var associationRequest = new AssociationCreateOrUpdateRequest(
resourceName: topicName,
resourceNamespace: "lkc-123",
resourceId: "lkc-123:" + topicName,
resourceType: "topic",
associations: new List<AssociationCreateOrUpdateInfo>
{
new AssociationCreateOrUpdateInfo(
subject: subjectName,
associationType: "value",
lifecycle: "STRONG",
frozen: null,
schema: null,
normalize: null
)
}
);
var associationResponse = await schemaRegistry.CreateAssociationAsync(associationRequest);
Console.WriteLine($"Created STRONG association: topic '{topicName}' -> subject '{subjectName}'");

// Step 3: Produce and consume using the Associated strategy (the default).
CancellationTokenSource cts = new CancellationTokenSource();
var consumeTask = Task.Run(() =>
{
using (var consumer =
new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName, AutoOffsetReset = AutoOffsetReset.Earliest })
.SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.Build())
{
consumer.Subscribe(topicName);

try
{
while (true)
{
try
{
var consumeResult = consumer.Consume(cts.Token);
Console.WriteLine($"Consumed - Key: {consumeResult.Message.Key}, Value: {consumeResult.Message.Value}");
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
consumer.Close();
}
}
});

using (var producer =
new ProducerBuilder<string, GenericRecord>(new ProducerConfig { BootstrapServers = bootstrapServers })
.SetValueSerializer(new AvroSerializer<GenericRecord>(schemaRegistry, new AvroSerializerConfig
{
AutoRegisterSchemas = false,
UseLatestVersion = true
}))
.Build())
{
Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");

long i = 1;
string text;
while ((text = Console.ReadLine()) != "q")
{
var record = new GenericRecord(s);
record.Add("name", text);
record.Add("favorite_number", i++);
record.Add("favorite_color", "blue");

try
{
var dr = await producer.ProduceAsync(topicName, new Message<string, GenericRecord> { Key = text, Value = record });
Console.WriteLine($"Produced to: {dr.TopicPartitionOffset}");
}
catch (ProduceException<string, GenericRecord> ex)
{
Console.WriteLine($"Error producing message: {ex}");
}
}
}

cts.Cancel();
await consumeTask;

// Step 4: Delete the association.
var associations = await schemaRegistry.GetAssociationsByResourceNameAsync(
topicName, "-", "topic", null, null, 0, -1);
if (associations.Count > 0)
{
await schemaRegistry.DeleteAssociationsAsync(
associations[0].ResourceId, "topic", new List<string> { "value" }, true);
Console.WriteLine($"Deleted associations for topic '{topicName}'");
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016-2019 Confluent Inc.
// Copyright 2016-2019 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -73,7 +73,8 @@ public AvroDeserializer(ISchemaRegistryClient schemaRegistryClient, AvroDeserial
if (config == null) { return; }

var nonAvroConfig = config
.Where(item => !item.Key.StartsWith("avro.") && !item.Key.StartsWith("rules."));
.Where(item => !item.Key.StartsWith("avro.") && !item.Key.StartsWith("rules.")
&& !item.Key.StartsWith("subject.name.strategy."));
if (nonAvroConfig.Count() > 0)
{
throw new ArgumentException($"AvroDeserializer: unknown configuration parameter {nonAvroConfig.First().Key}.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public IDictionary<string, string> UseLatestWithMetadata
/// <summary>
/// Subject name strategy.
///
/// default: SubjectNameStrategy.Topic
/// default: SubjectNameStrategy.Associated
/// </summary>
public SubjectNameStrategy? SubjectNameStrategy
{
Expand Down
5 changes: 3 additions & 2 deletions src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016-2018 Confluent Inc.
// Copyright 2016-2018 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -91,7 +91,8 @@ public AvroSerializer(ISchemaRegistryClient schemaRegistryClient, AvroSerializer
if (config == null) { return; }

var nonAvroConfig = config
.Where(item => !item.Key.StartsWith("avro.") && !item.Key.StartsWith("rules."));
.Where(item => !item.Key.StartsWith("avro.") && !item.Key.StartsWith("rules.")
&& !item.Key.StartsWith("subject.name.strategy."));
if (nonAvroConfig.Count() > 0)
{
throw new ArgumentException($"AvroSerializer: unknown configuration parameter {nonAvroConfig.First().Key}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public IDictionary<string, string> UseLatestWithMetadata
/// <summary>
/// Subject name strategy.
///
/// default: SubjectNameStrategy.Topic
/// default: SubjectNameStrategy.Associated
/// </summary>
public SubjectNameStrategy? SubjectNameStrategy
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ public GenericDeserializerImpl(
AvroDeserializerConfig config,
RuleRegistry ruleRegistry) : base(schemaRegistryClient, config, ruleRegistry)
{
this.subjectNameStrategy = (config?.SubjectNameStrategy ?? SubjectNameStrategy.Associated).ToAsyncDelegate(schemaRegistryClient, config);
Comment thread
rayokota marked this conversation as resolved.

if (config == null) { return; }

if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
if (config.UseLatestWithMetadata != null) { this.useLatestWithMetadata = config.UseLatestWithMetadata; }
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }
if (config.SchemaIdStrategy != null) { this.schemaIdDecoder = config.SchemaIdStrategy.Value.ToDeserializer(); }
}

Expand All @@ -67,7 +68,7 @@ private async Task<GenericRecord> Deserialize(string topic, Headers headers, Rea
// Note: topic is not necessary for deserialization (or knowing if it's a key
// or value) only the schema id is needed.

string subject = GetSubjectName(topic, isKey, null);
string subject = await GetSubjectName(topic, isKey, null).ConfigureAwait(false);
RegisteredSchema latestSchema = null;
if (subject != null)
{
Expand All @@ -89,7 +90,7 @@ private async Task<GenericRecord> Deserialize(string topic, Headers headers, Rea
(writerSchemaJson, writerSchema) = await GetWriterSchema(subject, writerId).ConfigureAwait(false);
if (subject == null)
{
subject = GetSubjectName(topic, isKey, writerSchema.Fullname);
subject = await GetSubjectName(topic, isKey, writerSchema.Fullname).ConfigureAwait(false);
if (subject != null)
{
latestSchema = await GetReaderSchema(subject)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public GenericSerializerImpl(
AvroSerializerConfig config,
RuleRegistry ruleRegistry) : base(schemaRegistryClient, config, ruleRegistry)
{
this.subjectNameStrategy = (config?.SubjectNameStrategy ?? SubjectNameStrategy.Associated).ToAsyncDelegate(schemaRegistryClient, config);
Comment thread
rayokota marked this conversation as resolved.

if (config == null) { return; }

if (config.BufferBytes != null) { this.initialBufferSize = config.BufferBytes.Value; }
Expand All @@ -49,7 +51,6 @@ public GenericSerializerImpl(
if (config.UseSchemaId != null) { this.useSchemaId = config.UseSchemaId.Value; }
if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
if (config.UseLatestWithMetadata != null) { this.useLatestWithMetadata = config.UseLatestWithMetadata; }
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }
if (config.SchemaIdStrategy != null) { this.schemaIdEncoder = config.SchemaIdStrategy.Value.ToEncoder(); }

if (this.useLatestVersion && this.autoRegisterSchema)
Expand Down Expand Up @@ -132,7 +133,7 @@ public async Task<byte[]> Serialize(string topic, Headers headers, GenericRecord
// better to use hash functions based on the writerSchemaString
// object reference, not value.

subject = GetSubjectName(topic, isKey, data.Schema.Fullname);
subject = await GetSubjectName(topic, isKey, data.Schema.Fullname).ConfigureAwait(false);
var subjectSchemaPair = new KeyValuePair<string, string>(subject, writerSchemaString);
latestSchema = await GetReaderSchema(subject)
.ConfigureAwait(continueOnCapturedContext: false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,12 @@ public SpecificDeserializerImpl(
);
}

this.subjectNameStrategy = (config?.SubjectNameStrategy ?? SubjectNameStrategy.Associated).ToAsyncDelegate(schemaRegistryClient, config);
Comment thread
rayokota marked this conversation as resolved.

if (config == null) { return; }

if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
if (config.UseLatestWithMetadata != null) { this.useLatestWithMetadata = config.UseLatestWithMetadata; }
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }
if (config.SchemaIdStrategy != null) { this.schemaIdDecoder = config.SchemaIdStrategy.Value.ToDeserializer(); }
}

Expand All @@ -118,7 +119,7 @@ public async Task<T> Deserialize(string topic, Headers headers, ReadOnlyMemory<b
// Note: topic is not necessary for deserialization (or knowing if it's a key
// or value) only the schema id is needed.

string subject = GetSubjectName(topic, isKey, null);
string subject = await GetSubjectName(topic, isKey, null).ConfigureAwait(false);
RegisteredSchema latestSchema = null;
if (subject != null)
{
Expand All @@ -138,7 +139,7 @@ public async Task<T> Deserialize(string topic, Headers headers, ReadOnlyMemory<b
(writerSchemaJson, writerSchema) = await GetWriterSchema(subject, writerId).ConfigureAwait(false);
if (subject == null)
{
subject = GetSubjectName(topic, isKey, writerSchema.Fullname);
subject = await GetSubjectName(topic, isKey, writerSchema.Fullname).ConfigureAwait(false);
if (subject != null)
{
latestSchema = await GetReaderSchema(subject)
Expand Down
Loading