From 3a0c60f26090636261fb17936432107c659ed032 Mon Sep 17 00:00:00 2001 From: Pranava Vedagnya Gaddam Date: Thu, 25 Sep 2025 16:34:19 +0530 Subject: [PATCH 1/5] add e2e test --- .../TriggerFunctions.cs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/TriggerFunctions.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/TriggerFunctions.cs index 1299b2c5..24b775aa 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/TriggerFunctions.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/TriggerFunctions.cs @@ -341,6 +341,21 @@ public static void Trigger( } } + internal static class SingleItem_Avro_With_String_Key + { + public static void Trigger( + [KafkaTrigger("LocalBroker", Constants.MyAvroRecordTopicName, ConsumerGroup = Constants.ConsumerGroupID, AvroSchema = MyAvroRecord.SchemaText)] KafkaEventData kafkaEvent, + ILogger log) + { + var myRecord = kafkaEvent.Value; + if (myRecord == null) + { + throw new Exception("MyAvro record is null"); + } + log.LogInformation("{key}:{ticks}:{value}", kafkaEvent.Key, myRecord.Ticks, myRecord.ID); + } + } + // Tests for key avro schema internal static class SingleItem_GenericAvroValue_With_GenericAvroKey_Trigger { From fd32c5e1db8d0f546f7e40cfe103c9fe9c93db8b Mon Sep 17 00:00:00 2001 From: Pranava Vedagnya Gaddam Date: Thu, 25 Sep 2025 18:39:58 +0530 Subject: [PATCH 2/5] remove key schema from local registry --- .../Serialization/LocalSchemaRegistry.cs | 43 +++++-------------- 1 file changed, 11 insertions(+), 32 deletions(-) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/LocalSchemaRegistry.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/LocalSchemaRegistry.cs index 843a8e0c..f5d3fb33 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/LocalSchemaRegistry.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/LocalSchemaRegistry.cs @@ -12,31 +12,25 @@ namespace Microsoft.Azure.WebJobs.Extensions.Kafka /// public class LocalSchemaRegistry : ISchemaRegistryClient { - private readonly string valueSchema; - private readonly string keySchema; - private const int ValueId = 1; - private const int KeyId = 2; - private string valueSubject; - private string keySubject; + private readonly string schema; private List subjects = new List(); - public LocalSchemaRegistry(string valueSchema, string keySchema = null) + public LocalSchemaRegistry(string schema) { - this.valueSchema = valueSchema; - this.keySchema = keySchema; + this.schema = schema; } public int MaxCachedSchemas { - get + get { - return 2; + return 1; } } - public string ConstructKeySubjectName(string topic, string recordType = null) => keySubject = $"{topic}-key"; + public string ConstructKeySubjectName(string topic, string recordType = null) => $"{topic}-key"; - public string ConstructValueSubjectName(string topic, string recordType = null) => valueSubject = $"{topic}-value"; + public string ConstructValueSubjectName(string topic, string recordType = null) => topic; public void Dispose() { @@ -44,7 +38,7 @@ public void Dispose() public Task> GetAllSubjectsAsync() { - return Task.FromResult(subjects); + return Task.FromResult(this.subjects); } public Task GetCompatibilityAsync(string subject = null) @@ -64,28 +58,13 @@ public Task GetRegisteredSchemaAsync(string subject, int versi public Task GetSchemaAsync(string subject, int version) { - if (subject == keySubject) - { - return Task.FromResult(keySchema); - } - else if (subject == valueSubject) - { - return Task.FromResult(valueSchema); - } - return Task.FromResult(null); + return Task.FromResult(this.schema); } public Task GetSchemaAsync(int id, string format = null) { - if (id == KeyId) - { - return Task.FromResult(new Schema(keySchema, SchemaType.Avro)); - } - else if (id == ValueId) - { - return Task.FromResult(new Schema(valueSchema, SchemaType.Avro)); - } - return Task.FromResult(null); + var schema = new Schema(this.schema, SchemaType.Avro); + return Task.FromResult(schema); } public Task GetSchemaIdAsync(string subject, string schema) From 73eaad2852f42e4271637a292d519c682ba1bd34 Mon Sep 17 00:00:00 2001 From: Pranava Vedagnya Gaddam Date: Thu, 25 Sep 2025 18:40:23 +0530 Subject: [PATCH 3/5] use two local schema registry for value and key --- .../Serialization/SerializationHelper.cs | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs index ab2c6a82..f8ff0f43 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs @@ -60,17 +60,16 @@ internal static (object, object) ResolveDeserializers(GetKeyAndValueTypesResult // check for avro deserialization if (specifiedValueAvroSchema != null || specifiedKeyAvroSchema != null) { - // creates local schema registry if no schema registry url is specified - var schemaRegistry = CreateSchemaRegistry(specifiedValueAvroSchema, specifiedKeyAvroSchema, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); - // if value avro schema exists and value type is generic record, create avro deserializer if (!string.IsNullOrWhiteSpace(specifiedValueAvroSchema)) { + // creates local schema registry to store only value schema since no schema registry url is specified + var valueSchemaRegistry = CreateSchemaRegistry(specifiedValueAvroSchema, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); if (isValueGenericRecord || isValueSpecificRecord) { var methodInfo = typeof(SerializationHelper).GetMethod(nameof(CreateAvroValueDeserializer), BindingFlags.Static | BindingFlags.NonPublic); var genericMethod = methodInfo.MakeGenericMethod(valueType); - valueDeserializer = genericMethod.Invoke(null, new object[] { schemaRegistry }); + valueDeserializer = genericMethod.Invoke(null, new object[] { valueSchemaRegistry }); } else { @@ -83,9 +82,11 @@ internal static (object, object) ResolveDeserializers(GetKeyAndValueTypesResult { if (isKeyGenericRecord || isKeySpecificRecord) { + // creates local schema registry to store only value schema since no schema registry url is specified + var keySchemaRegistry = CreateSchemaRegistry(specifiedKeyAvroSchema, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); var methodInfo = typeof(SerializationHelper).GetMethod(nameof(CreateAvroKeyDeserializer), BindingFlags.Static | BindingFlags.NonPublic); var genericMethod = methodInfo.MakeGenericMethod(keyType); - keyDeserializer = genericMethod.Invoke(null, new object[] { schemaRegistry }); + keyDeserializer = genericMethod.Invoke(null, new object[] { keySchemaRegistry }); } else { @@ -102,7 +103,7 @@ internal static (object, object) ResolveSchemaRegistryDeserializers(Type valueTy object valueDeserializer = null; object keyDeserializer = null; - var schemaRegistry = CreateSchemaRegistry(null, null, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); + var schemaRegistry = CreateSchemaRegistry(null, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); if (typeof(GenericRecord).IsAssignableFrom(valueType)) { @@ -172,19 +173,21 @@ internal static (object, object) ResolveSerializers(Type valueType, Type keyType // check for avro serialization if (specifiedValueAvroSchema != null || specifiedKeyAvroSchema != null) { - // create schema registry client - var schemaRegistry = CreateSchemaRegistry(specifiedValueAvroSchema, specifiedKeyAvroSchema, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); object serializer = null; // create serializers for avro - generic or specific records if (isValueGenericRecord || isValueSpecificRecord) { - serializer = Activator.CreateInstance(typeof(AvroSerializer<>).MakeGenericType(valueType), schemaRegistry, null /* config */); + // create schema registry client only for value schema + var valueSchemaRegistry = CreateSchemaRegistry(specifiedValueAvroSchema, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); + serializer = Activator.CreateInstance(typeof(AvroSerializer<>).MakeGenericType(valueType), valueSchemaRegistry, null /* config */); valueSerializer = typeof(SyncOverAsyncSerializerExtensionMethods).GetMethod("AsSyncOverAsync").MakeGenericMethod(valueType).Invoke(null, new object[] { serializer }); } if (isKeyGenericRecord || isKeySpecificRecord) { - serializer = Activator.CreateInstance(typeof(AvroSerializer<>).MakeGenericType(keyType), schemaRegistry, null /* config */); + // create schema registry client only for key schema + var keySchemaRegistry = CreateSchemaRegistry(specifiedKeyAvroSchema, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); + serializer = Activator.CreateInstance(typeof(AvroSerializer<>).MakeGenericType(keyType), keySchemaRegistry, null /* config */); keySerializer = typeof(SyncOverAsyncSerializerExtensionMethods).GetMethod("AsSyncOverAsync").MakeGenericMethod(keyType).Invoke(null, new object[] { serializer }); } } @@ -197,7 +200,7 @@ internal static (object, object) ResolveSchemaRegistrySerializers(Type valueType object valueSerializer = null; object keySerializer = null; - var schemaRegistry = CreateSchemaRegistry(null, null, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); + var schemaRegistry = CreateSchemaRegistry(null, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); if (typeof(GenericRecord).IsAssignableFrom(valueType) || typeof(ISpecificRecord).IsAssignableFrom(valueType)) { @@ -215,11 +218,11 @@ internal static (object, object) ResolveSchemaRegistrySerializers(Type valueType } - private static ISchemaRegistryClient CreateSchemaRegistry(string specifiedValueAvroSchema, string specifiedKeyAvroSchema, string schemaRegistryUrl, string schemaRegistryUsername, string schemaRegistryPassword) + private static ISchemaRegistryClient CreateSchemaRegistry(string specifiedAvroSchema, string schemaRegistryUrl, string schemaRegistryUsername, string schemaRegistryPassword) { - if (!string.IsNullOrWhiteSpace(specifiedValueAvroSchema) || !string.IsNullOrWhiteSpace(specifiedKeyAvroSchema)) + if (!string.IsNullOrWhiteSpace(specifiedAvroSchema)) { - return new LocalSchemaRegistry(specifiedValueAvroSchema, specifiedKeyAvroSchema); + return new LocalSchemaRegistry(specifiedAvroSchema); } if (schemaRegistryUrl != null) { @@ -231,7 +234,7 @@ private static ISchemaRegistryClient CreateSchemaRegistry(string specifiedValueA } return new CachedSchemaRegistryClient(schemaRegistryConfig.ToArray()); } - throw new ArgumentNullException(nameof(specifiedValueAvroSchema), $@"parameter is required when creating an generic avro serializer"); + throw new ArgumentNullException(nameof(specifiedAvroSchema), $@"parameter is required when creating an generic avro serializer"); } internal class GetKeyAndValueTypesResult From dc78d68e49ca8b88b9f96a982b6fe7ced4764b1c Mon Sep 17 00:00:00 2001 From: Pranava Vedagnya Gaddam Date: Thu, 25 Sep 2025 18:43:32 +0530 Subject: [PATCH 4/5] fix comments --- .../Serialization/SerializationHelper.cs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs index f8ff0f43..6a52f608 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs @@ -289,8 +289,6 @@ internal static GetKeyAndValueTypesResult GetKeyAndValueTypes(string valueAvroSc } } - // if schema registry is present, the types must be generic too? - (valueType, valueAvroSchema) = GetTypeAndSchema(valueType, valueAvroSchemaFromAttribute); } @@ -324,7 +322,7 @@ private static (Type type, string avroSchema) GetTypeAndSchema(Type type, string /// - /// Gets if the type can be serialized/deserialized + /// Gets if the type can be serialized/deserialized. /// internal static bool IsDesSerType(Type type) { From e8e0131c4920e1121c2fe68676282cb8cca04082 Mon Sep 17 00:00:00 2001 From: Pranava Vedagnya Gaddam Date: Thu, 25 Sep 2025 19:14:34 +0530 Subject: [PATCH 5/5] remove unused test --- .../TriggerFunctions.cs | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/TriggerFunctions.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/TriggerFunctions.cs index 24b775aa..5f168312 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/TriggerFunctions.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/TriggerFunctions.cs @@ -133,8 +133,6 @@ public static void Trigger( } } - - internal static class MultiItem_KafkaEventData_String_With_Long_Key_Trigger { public static void Trigger( @@ -341,21 +339,6 @@ public static void Trigger( } } - internal static class SingleItem_Avro_With_String_Key - { - public static void Trigger( - [KafkaTrigger("LocalBroker", Constants.MyAvroRecordTopicName, ConsumerGroup = Constants.ConsumerGroupID, AvroSchema = MyAvroRecord.SchemaText)] KafkaEventData kafkaEvent, - ILogger log) - { - var myRecord = kafkaEvent.Value; - if (myRecord == null) - { - throw new Exception("MyAvro record is null"); - } - log.LogInformation("{key}:{ticks}:{value}", kafkaEvent.Key, myRecord.Ticks, myRecord.ID); - } - } - // Tests for key avro schema internal static class SingleItem_GenericAvroValue_With_GenericAvroKey_Trigger {