diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/LocalSchemaRegistry.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/LocalSchemaRegistry.cs index 843a8e0c..f5d3fb33 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/LocalSchemaRegistry.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/LocalSchemaRegistry.cs @@ -12,31 +12,25 @@ namespace Microsoft.Azure.WebJobs.Extensions.Kafka /// public class LocalSchemaRegistry : ISchemaRegistryClient { - private readonly string valueSchema; - private readonly string keySchema; - private const int ValueId = 1; - private const int KeyId = 2; - private string valueSubject; - private string keySubject; + private readonly string schema; private List subjects = new List(); - public LocalSchemaRegistry(string valueSchema, string keySchema = null) + public LocalSchemaRegistry(string schema) { - this.valueSchema = valueSchema; - this.keySchema = keySchema; + this.schema = schema; } public int MaxCachedSchemas { - get + get { - return 2; + return 1; } } - public string ConstructKeySubjectName(string topic, string recordType = null) => keySubject = $"{topic}-key"; + public string ConstructKeySubjectName(string topic, string recordType = null) => $"{topic}-key"; - public string ConstructValueSubjectName(string topic, string recordType = null) => valueSubject = $"{topic}-value"; + public string ConstructValueSubjectName(string topic, string recordType = null) => topic; public void Dispose() { @@ -44,7 +38,7 @@ public void Dispose() public Task> GetAllSubjectsAsync() { - return Task.FromResult(subjects); + return Task.FromResult(this.subjects); } public Task GetCompatibilityAsync(string subject = null) @@ -64,28 +58,13 @@ public Task GetRegisteredSchemaAsync(string subject, int versi public Task GetSchemaAsync(string subject, int version) { - if (subject == keySubject) - { - return Task.FromResult(keySchema); - } - else if (subject == valueSubject) - { - return Task.FromResult(valueSchema); - } - return Task.FromResult(null); + return Task.FromResult(this.schema); } public Task GetSchemaAsync(int id, string format = null) { - if (id == KeyId) - { - return Task.FromResult(new Schema(keySchema, SchemaType.Avro)); - } - else if (id == ValueId) - { - return Task.FromResult(new Schema(valueSchema, SchemaType.Avro)); - } - return Task.FromResult(null); + var schema = new Schema(this.schema, SchemaType.Avro); + return Task.FromResult(schema); } public Task GetSchemaIdAsync(string subject, string schema) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs index ab2c6a82..6a52f608 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs @@ -60,17 +60,16 @@ internal static (object, object) ResolveDeserializers(GetKeyAndValueTypesResult // check for avro deserialization if (specifiedValueAvroSchema != null || specifiedKeyAvroSchema != null) { - // creates local schema registry if no schema registry url is specified - var schemaRegistry = CreateSchemaRegistry(specifiedValueAvroSchema, specifiedKeyAvroSchema, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); - // if value avro schema exists and value type is generic record, create avro deserializer if (!string.IsNullOrWhiteSpace(specifiedValueAvroSchema)) { + // creates local schema registry to store only value schema since no schema registry url is specified + var valueSchemaRegistry = CreateSchemaRegistry(specifiedValueAvroSchema, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); if (isValueGenericRecord || isValueSpecificRecord) { var methodInfo = typeof(SerializationHelper).GetMethod(nameof(CreateAvroValueDeserializer), BindingFlags.Static | BindingFlags.NonPublic); var genericMethod = methodInfo.MakeGenericMethod(valueType); - valueDeserializer = genericMethod.Invoke(null, new object[] { schemaRegistry }); + valueDeserializer = genericMethod.Invoke(null, new object[] { valueSchemaRegistry }); } else { @@ -83,9 +82,11 @@ internal static (object, object) ResolveDeserializers(GetKeyAndValueTypesResult { if (isKeyGenericRecord || isKeySpecificRecord) { + // creates local schema registry to store only value schema since no schema registry url is specified + var keySchemaRegistry = CreateSchemaRegistry(specifiedKeyAvroSchema, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); var methodInfo = typeof(SerializationHelper).GetMethod(nameof(CreateAvroKeyDeserializer), BindingFlags.Static | BindingFlags.NonPublic); var genericMethod = methodInfo.MakeGenericMethod(keyType); - keyDeserializer = genericMethod.Invoke(null, new object[] { schemaRegistry }); + keyDeserializer = genericMethod.Invoke(null, new object[] { keySchemaRegistry }); } else { @@ -102,7 +103,7 @@ internal static (object, object) ResolveSchemaRegistryDeserializers(Type valueTy object valueDeserializer = null; object keyDeserializer = null; - var schemaRegistry = CreateSchemaRegistry(null, null, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); + var schemaRegistry = CreateSchemaRegistry(null, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); if (typeof(GenericRecord).IsAssignableFrom(valueType)) { @@ -172,19 +173,21 @@ internal static (object, object) ResolveSerializers(Type valueType, Type keyType // check for avro serialization if (specifiedValueAvroSchema != null || specifiedKeyAvroSchema != null) { - // create schema registry client - var schemaRegistry = CreateSchemaRegistry(specifiedValueAvroSchema, specifiedKeyAvroSchema, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); object serializer = null; // create serializers for avro - generic or specific records if (isValueGenericRecord || isValueSpecificRecord) { - serializer = Activator.CreateInstance(typeof(AvroSerializer<>).MakeGenericType(valueType), schemaRegistry, null /* config */); + // create schema registry client only for value schema + var valueSchemaRegistry = CreateSchemaRegistry(specifiedValueAvroSchema, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); + serializer = Activator.CreateInstance(typeof(AvroSerializer<>).MakeGenericType(valueType), valueSchemaRegistry, null /* config */); valueSerializer = typeof(SyncOverAsyncSerializerExtensionMethods).GetMethod("AsSyncOverAsync").MakeGenericMethod(valueType).Invoke(null, new object[] { serializer }); } if (isKeyGenericRecord || isKeySpecificRecord) { - serializer = Activator.CreateInstance(typeof(AvroSerializer<>).MakeGenericType(keyType), schemaRegistry, null /* config */); + // create schema registry client only for key schema + var keySchemaRegistry = CreateSchemaRegistry(specifiedKeyAvroSchema, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); + serializer = Activator.CreateInstance(typeof(AvroSerializer<>).MakeGenericType(keyType), keySchemaRegistry, null /* config */); keySerializer = typeof(SyncOverAsyncSerializerExtensionMethods).GetMethod("AsSyncOverAsync").MakeGenericMethod(keyType).Invoke(null, new object[] { serializer }); } } @@ -197,7 +200,7 @@ internal static (object, object) ResolveSchemaRegistrySerializers(Type valueType object valueSerializer = null; object keySerializer = null; - var schemaRegistry = CreateSchemaRegistry(null, null, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); + var schemaRegistry = CreateSchemaRegistry(null, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); if (typeof(GenericRecord).IsAssignableFrom(valueType) || typeof(ISpecificRecord).IsAssignableFrom(valueType)) { @@ -215,11 +218,11 @@ internal static (object, object) ResolveSchemaRegistrySerializers(Type valueType } - private static ISchemaRegistryClient CreateSchemaRegistry(string specifiedValueAvroSchema, string specifiedKeyAvroSchema, string schemaRegistryUrl, string schemaRegistryUsername, string schemaRegistryPassword) + private static ISchemaRegistryClient CreateSchemaRegistry(string specifiedAvroSchema, string schemaRegistryUrl, string schemaRegistryUsername, string schemaRegistryPassword) { - if (!string.IsNullOrWhiteSpace(specifiedValueAvroSchema) || !string.IsNullOrWhiteSpace(specifiedKeyAvroSchema)) + if (!string.IsNullOrWhiteSpace(specifiedAvroSchema)) { - return new LocalSchemaRegistry(specifiedValueAvroSchema, specifiedKeyAvroSchema); + return new LocalSchemaRegistry(specifiedAvroSchema); } if (schemaRegistryUrl != null) { @@ -231,7 +234,7 @@ private static ISchemaRegistryClient CreateSchemaRegistry(string specifiedValueA } return new CachedSchemaRegistryClient(schemaRegistryConfig.ToArray()); } - throw new ArgumentNullException(nameof(specifiedValueAvroSchema), $@"parameter is required when creating an generic avro serializer"); + throw new ArgumentNullException(nameof(specifiedAvroSchema), $@"parameter is required when creating an generic avro serializer"); } internal class GetKeyAndValueTypesResult @@ -286,8 +289,6 @@ internal static GetKeyAndValueTypesResult GetKeyAndValueTypes(string valueAvroSc } } - // if schema registry is present, the types must be generic too? - (valueType, valueAvroSchema) = GetTypeAndSchema(valueType, valueAvroSchemaFromAttribute); } @@ -321,7 +322,7 @@ private static (Type type, string avroSchema) GetTypeAndSchema(Type type, string /// - /// Gets if the type can be serialized/deserialized + /// Gets if the type can be serialized/deserialized. /// internal static bool IsDesSerType(Type type) { diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/TriggerFunctions.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/TriggerFunctions.cs index 1299b2c5..5f168312 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/TriggerFunctions.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/TriggerFunctions.cs @@ -133,8 +133,6 @@ public static void Trigger( } } - - internal static class MultiItem_KafkaEventData_String_With_Long_Key_Trigger { public static void Trigger(