Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
822da62
First cut at AssociatedNameStrategy refactor
rayokota Jan 13, 2026
4bab228
Minor fix
rayokota Jan 13, 2026
827a334
Minor fix
rayokota Jan 13, 2026
9d22fec
Fix async usage
rayokota Jan 15, 2026
27a602d
Implement associated name strategy
rayokota Jan 23, 2026
54c081a
Minor refactor
rayokota Jan 24, 2026
a524eb6
Minor cleanup
rayokota Jan 24, 2026
748016a
Add more assoc apis
rayokota Jan 29, 2026
1cf3782
handle 404
rayokota Feb 7, 2026
2a8141a
Minor opt
rayokota Feb 11, 2026
a8b5e8a
Minor renaming
rayokota Feb 23, 2026
4d3d8a6
Incorporate review feedback
rayokota Feb 24, 2026
96f1d99
Incorporate review feedback
rayokota Feb 24, 2026
9e3dc5a
Add tests
rayokota Feb 25, 2026
33a6e62
Fix RecordType handling in subject name strategy
rayokota Feb 25, 2026
890a30b
Fix AssociatedNameStrategy caching
rayokota Feb 25, 2026
d59e325
Minor fix
rayokota Feb 25, 2026
298c191
Minor fix
rayokota Feb 25, 2026
d8251be
Minor fix
rayokota Feb 27, 2026
0b3eefe
Make Associated the default strategy
rayokota Feb 27, 2026
851b1c3
Fallback if sr client is null
rayokota Feb 27, 2026
a698717
Minor fixes
rayokota Feb 27, 2026
55db7ce
Set default in serdes
rayokota Feb 27, 2026
737abfc
Move setting of strategy before config null check
rayokota Feb 27, 2026
e860739
Move setting of strategy before config null check
rayokota Feb 27, 2026
c36e97c
Update docs
rayokota Feb 27, 2026
0e9d8a2
Minor renaming
rayokota Feb 27, 2026
f6e2485
Add association example
rayokota Mar 4, 2026
f350f9d
Minor cleanup
rayokota Mar 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016-2019 Confluent Inc.
// Copyright 2016-2019 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -73,7 +73,8 @@ public AvroDeserializer(ISchemaRegistryClient schemaRegistryClient, AvroDeserial
if (config == null) { return; }

var nonAvroConfig = config
.Where(item => !item.Key.StartsWith("avro.") && !item.Key.StartsWith("rules."));
.Where(item => !item.Key.StartsWith("avro.") && !item.Key.StartsWith("rules.")
&& !item.Key.StartsWith("subject.name.strategy."));
if (nonAvroConfig.Count() > 0)
{
throw new ArgumentException($"AvroDeserializer: unknown configuration parameter {nonAvroConfig.First().Key}.");
Expand Down
5 changes: 3 additions & 2 deletions src/Confluent.SchemaRegistry.Serdes.Avro/AvroSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016-2018 Confluent Inc.
// Copyright 2016-2018 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -91,7 +91,8 @@ public AvroSerializer(ISchemaRegistryClient schemaRegistryClient, AvroSerializer
if (config == null) { return; }

var nonAvroConfig = config
.Where(item => !item.Key.StartsWith("avro.") && !item.Key.StartsWith("rules."));
.Where(item => !item.Key.StartsWith("avro.") && !item.Key.StartsWith("rules.")
&& !item.Key.StartsWith("subject.name.strategy."));
if (nonAvroConfig.Count() > 0)
{
throw new ArgumentException($"AvroSerializer: unknown configuration parameter {nonAvroConfig.First().Key}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public GenericDeserializerImpl(

if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
if (config.UseLatestWithMetadata != null) { this.useLatestWithMetadata = config.UseLatestWithMetadata; }
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToAsyncDelegate(schemaRegistryClient, config); }
if (config.SchemaIdStrategy != null) { this.schemaIdDecoder = config.SchemaIdStrategy.Value.ToDeserializer(); }
}

Expand All @@ -67,7 +67,7 @@ private async Task<GenericRecord> Deserialize(string topic, Headers headers, Rea
// Note: topic is not necessary for deserialization (or knowing if it's a key
// or value) only the schema id is needed.

string subject = GetSubjectName(topic, isKey, null);
string subject = await GetSubjectName(topic, isKey, null).ConfigureAwait(false);
RegisteredSchema latestSchema = null;
if (subject != null)
{
Expand All @@ -89,7 +89,7 @@ private async Task<GenericRecord> Deserialize(string topic, Headers headers, Rea
(writerSchemaJson, writerSchema) = await GetWriterSchema(subject, writerId).ConfigureAwait(false);
if (subject == null)
{
subject = GetSubjectName(topic, isKey, writerSchema.Fullname);
subject = await GetSubjectName(topic, isKey, writerSchema.Fullname).ConfigureAwait(false);
if (subject != null)
{
latestSchema = await GetReaderSchema(subject)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public GenericSerializerImpl(
if (config.UseSchemaId != null) { this.useSchemaId = config.UseSchemaId.Value; }
if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
if (config.UseLatestWithMetadata != null) { this.useLatestWithMetadata = config.UseLatestWithMetadata; }
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToAsyncDelegate(schemaRegistryClient, config); }
if (config.SchemaIdStrategy != null) { this.schemaIdEncoder = config.SchemaIdStrategy.Value.ToEncoder(); }

if (this.useLatestVersion && this.autoRegisterSchema)
Expand Down Expand Up @@ -132,7 +132,7 @@ public async Task<byte[]> Serialize(string topic, Headers headers, GenericRecord
// better to use hash functions based on the writerSchemaString
// object reference, not value.

subject = GetSubjectName(topic, isKey, data.Schema.Fullname);
subject = await GetSubjectName(topic, isKey, data.Schema.Fullname).ConfigureAwait(false);
var subjectSchemaPair = new KeyValuePair<string, string>(subject, writerSchemaString);
latestSchema = await GetReaderSchema(subject)
.ConfigureAwait(continueOnCapturedContext: false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public SpecificDeserializerImpl(

if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
if (config.UseLatestWithMetadata != null) { this.useLatestWithMetadata = config.UseLatestWithMetadata; }
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToAsyncDelegate(schemaRegistryClient, config); }
if (config.SchemaIdStrategy != null) { this.schemaIdDecoder = config.SchemaIdStrategy.Value.ToDeserializer(); }
}

Expand All @@ -118,7 +118,7 @@ public async Task<T> Deserialize(string topic, Headers headers, ReadOnlyMemory<b
// Note: topic is not necessary for deserialization (or knowing if it's a key
// or value) only the schema id is needed.

string subject = GetSubjectName(topic, isKey, null);
string subject = await GetSubjectName(topic, isKey, null).ConfigureAwait(false);
RegisteredSchema latestSchema = null;
if (subject != null)
{
Expand All @@ -138,7 +138,7 @@ public async Task<T> Deserialize(string topic, Headers headers, ReadOnlyMemory<b
(writerSchemaJson, writerSchema) = await GetWriterSchema(subject, writerId).ConfigureAwait(false);
if (subject == null)
{
subject = GetSubjectName(topic, isKey, writerSchema.Fullname);
subject = await GetSubjectName(topic, isKey, writerSchema.Fullname).ConfigureAwait(false);
if (subject != null)
{
latestSchema = await GetReaderSchema(subject)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public SpecificSerializerImpl(
if (config.UseSchemaId != null) { this.useSchemaId = config.UseSchemaId.Value; }
if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
if (config.UseLatestWithMetadata != null) { this.useLatestWithMetadata = config.UseLatestWithMetadata; }
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToAsyncDelegate(schemaRegistryClient, config); }
if (config.SchemaIdStrategy != null) { this.schemaIdEncoder = config.SchemaIdStrategy.Value.ToEncoder(); }

if (this.useLatestVersion && this.autoRegisterSchema)
Expand Down Expand Up @@ -173,7 +173,7 @@ public async Task<byte[]> Serialize(string topic, Headers headers, T data, bool
fullname = ((Avro.RecordSchema)((ISpecificRecord)data).Schema).Fullname;
}

subject = GetSubjectName(topic, isKey, fullname);
subject = await GetSubjectName(topic, isKey, fullname).ConfigureAwait(false);
var subjectSchemaPair = new KeyValuePair<string, string>(subject, currentSchemaData.WriterSchemaString);
latestSchema = await GetReaderSchema(subject)
.ConfigureAwait(continueOnCapturedContext: false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,16 @@ public JsonDeserializer(ISchemaRegistryClient schemaRegistryClient, JsonDeserial
if (config == null) { return; }

var nonJsonConfig = config
.Where(item => !item.Key.StartsWith("json.") && !item.Key.StartsWith("rules."));
.Where(item => !item.Key.StartsWith("json.") && !item.Key.StartsWith("rules.")
&& !item.Key.StartsWith("subject.name.strategy."));
if (nonJsonConfig.Count() > 0)
{
throw new ArgumentException($"JsonDeserializer: unknown configuration parameter {nonJsonConfig.First().Key}.");
}

if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
if (config.UseLatestWithMetadata != null) { this.useLatestWithMetadata = config.UseLatestWithMetadata; }
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToAsyncDelegate(schemaRegistryClient, config); }
if (config.SchemaIdStrategy != null) { this.schemaIdDecoder = config.SchemaIdStrategy.Value.ToDeserializer(); }
if (config.Validate != null) { this.validate = config.Validate.Value; }
}
Expand Down Expand Up @@ -172,7 +173,7 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i

bool isKey = context.Component == MessageComponentType.Key;
string topic = context.Topic;
string subject = GetSubjectName(topic, isKey, null);
string subject = await GetSubjectName(topic, isKey, null).ConfigureAwait(false);
RegisteredSchema latestSchema = null;
if (subject != null)
{
Expand All @@ -197,7 +198,7 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i
.ConfigureAwait(false);
if (subject == null)
{
subject = GetSubjectName(topic, isKey, writerSchema.Title);
subject = await GetSubjectName(topic, isKey, writerSchema.Title).ConfigureAwait(false);
if (subject != null)
{
latestSchema = await GetReaderSchema(subject)
Expand Down
8 changes: 5 additions & 3 deletions src/Confluent.SchemaRegistry.Serdes.Json/JsonSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, JsonSerializer
if (config == null) { return; }

var nonJsonConfig = config
.Where(item => !item.Key.StartsWith("json.") && !item.Key.StartsWith("rules."));
.Where(item => !item.Key.StartsWith("json.") && !item.Key.StartsWith("rules.")
&& !item.Key.StartsWith("subject.name.strategy."));
if (nonJsonConfig.Count() > 0)
{
throw new ArgumentException($"JsonSerializer: unknown configuration parameter {nonJsonConfig.First().Key}");
Expand All @@ -130,7 +131,7 @@ public JsonSerializer(ISchemaRegistryClient schemaRegistryClient, JsonSerializer
if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
if (config.LatestCompatibilityStrict != null) { this.latestCompatibilityStrict = config.LatestCompatibilityStrict.Value; }
if (config.UseLatestWithMetadata != null) { this.useLatestWithMetadata = config.UseLatestWithMetadata; }
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToAsyncDelegate(schemaRegistryClient, config); }
if (config.SchemaIdStrategy != null) { this.schemaIdEncoder = config.SchemaIdStrategy.Value.ToEncoder(); }
if (config.Validate != null) { this.validate = config.Validate.Value; }

Expand Down Expand Up @@ -207,7 +208,8 @@ public override async Task<byte[]> SerializeAsync(T value, SerializationContext
await serdeMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
try
{
subject = GetSubjectName(context.Topic, context.Component == MessageComponentType.Key, this.schemaFullname);
subject = await GetSubjectName(context.Topic, context.Component == MessageComponentType.Key, this.schemaFullname)
.ConfigureAwait(false);
latestSchema = await GetReaderSchema(subject, new Schema(schemaText, ReferenceList, SchemaType.Json))
.ConfigureAwait(continueOnCapturedContext: false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public ProtobufDeserializer(ISchemaRegistryClient schemaRegistryClient, Protobuf
if (config == null) { return; }

var nonProtobufConfig = config
.Where(item => !item.Key.StartsWith("protobuf.") && !item.Key.StartsWith("rules."));
.Where(item => !item.Key.StartsWith("protobuf.") && !item.Key.StartsWith("rules.")
&& !item.Key.StartsWith("subject.name.strategy."));
if (nonProtobufConfig.Count() > 0)
{
throw new ArgumentException($"ProtobufDeserializer: unknown configuration parameter {nonProtobufConfig.First().Key}");
Expand All @@ -87,7 +88,7 @@ public ProtobufDeserializer(ISchemaRegistryClient schemaRegistryClient, Protobuf

if (config.UseLatestVersion != null) { this.useLatestVersion = config.UseLatestVersion.Value; }
if (config.UseLatestWithMetadata != null) { this.useLatestWithMetadata = config.UseLatestWithMetadata; }
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToAsyncDelegate(schemaRegistryClient, config); }
if (config.SchemaIdStrategy != null) { this.schemaIdDecoder = config.SchemaIdStrategy.Value.ToDeserializer(); }
}

Expand All @@ -114,7 +115,7 @@ public override async Task<T> DeserializeAsync(ReadOnlyMemory<byte> data, bool i

bool isKey = context.Component == MessageComponentType.Key;
string topic = context.Topic;
string subject = GetSubjectName(topic, isKey, null);
string subject = await GetSubjectName(topic, isKey, null).ConfigureAwait(false);

// Currently Protobuf does not support migration rules because of lack of support for DynamicMessage
// See https://github.com/protocolbuffers/protobuf/issues/658
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public ProtobufSerializer(ISchemaRegistryClient schemaRegistryClient, ProtobufSe
}

var nonProtobufConfig = config
.Where(item => !item.Key.StartsWith("protobuf.") && !item.Key.StartsWith("rules."));
.Where(item => !item.Key.StartsWith("protobuf.") && !item.Key.StartsWith("rules.")
&& !item.Key.StartsWith("subject.name.strategy."));
if (nonProtobufConfig.Count() > 0)
{
throw new ArgumentException($"ProtobufSerializer: unknown configuration parameter {nonProtobufConfig.First().Key}");
Expand All @@ -94,7 +95,7 @@ public ProtobufSerializer(ISchemaRegistryClient schemaRegistryClient, ProtobufSe
{
throw new NotSupportedException("ProtobufSerializer: UseDeprecatedFormat is no longer supported");
}
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToDelegate(); }
if (config.SubjectNameStrategy != null) { this.subjectNameStrategy = config.SubjectNameStrategy.Value.ToAsyncDelegate(schemaRegistryClient, config); }
if (config.SchemaIdStrategy != null) { this.schemaIdEncoder = config.SchemaIdStrategy.Value.ToEncoder(); }
this.referenceSubjectNameStrategy = config.ReferenceSubjectNameStrategy == null
? ReferenceSubjectNameStrategy.ReferenceName.ToDelegate()
Expand Down Expand Up @@ -231,7 +232,8 @@ public override async Task<byte[]> SerializeAsync(T value, SerializationContext
await serdeMutex.WaitAsync().ConfigureAwait(continueOnCapturedContext: false);
try
{
subject = GetSubjectName(context.Topic, context.Component == MessageComponentType.Key, fullname);
subject = await GetSubjectName(context.Topic, context.Component == MessageComponentType.Key, fullname)
.ConfigureAwait(false);
latestSchema = await GetReaderSchema(subject)
.ConfigureAwait(continueOnCapturedContext: false);

Expand Down
39 changes: 16 additions & 23 deletions src/Confluent.SchemaRegistry/AsyncSerde.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public abstract class AsyncSerde<TParsedSchema>
protected bool useLatestVersion = false;
protected bool latestCompatibilityStrict = false;
protected IDictionary<string, string> useLatestWithMetadata = null;
protected SubjectNameStrategyDelegate subjectNameStrategy = null;
protected AsyncSubjectNameStrategyDelegate subjectNameStrategy = null;

protected SemaphoreSlim serdeMutex = new SemaphoreSlim(1);

Expand All @@ -65,29 +65,22 @@ protected AsyncSerde(ISchemaRegistryClient schemaRegistryClient, SerdeConfig con
}
}

protected string GetSubjectName(string topic, bool isKey, string recordType)
protected async Task<string> GetSubjectName(string topic, bool isKey, string recordType)
{
try
{
string subject = this.subjectNameStrategy != null
// use the subject name strategy specified in the serializer config if available.
? this.subjectNameStrategy(
new SerializationContext(
isKey ? MessageComponentType.Key : MessageComponentType.Value,
topic),
recordType)
// else fall back to the deprecated config from (or default as currently supplied by) SchemaRegistry.
: schemaRegistryClient == null
? null
: isKey
? schemaRegistryClient.ConstructKeySubjectName(topic, recordType)
: schemaRegistryClient.ConstructValueSubjectName(topic, recordType);
return subject;
}
catch (Exception e)
{
return null;
}
string subject = this.subjectNameStrategy != null
// use the subject name strategy specified in the serializer config if available.
? await this.subjectNameStrategy(
new SerializationContext(
isKey ? MessageComponentType.Key : MessageComponentType.Value,
topic),
recordType).ConfigureAwait(false)
// else fall back to the deprecated config from (or default as currently supplied by) SchemaRegistry.
: schemaRegistryClient == null
? null
: isKey
? schemaRegistryClient.ConstructKeySubjectName(topic, recordType)
: schemaRegistryClient.ConstructValueSubjectName(topic, recordType);
return subject;
}

protected async Task<(Schema, TParsedSchema)> GetWriterSchema(string subject, SchemaId writerId, string format = null)
Expand Down
Loading