[FEATURE] ISchemaRegistryProvider: Abstraction and Implementations (Confluent, Azure, Apicurio) #602

@dlrivada

Summary

Define an ISchemaRegistryProvider abstraction and implement it for Confluent Schema Registry, Azure Schema Registry, and Apicurio Registry.

Motivation

A schema registry is mentioned in issues #259 and #309, but no abstraction for one exists yet. A schema registry is essential for:

  • Event versioning and evolution
  • Contract-first development
  • Cross-service compatibility
  • Avoiding breaking changes in event-driven systems

Proposed Solution

Interface Definition

namespace Encina.SchemaRegistry;

public interface ISchemaRegistryProvider
{
    /// <summary>
    /// Registers a new schema version.
    /// </summary>
    Task<SchemaVersion> RegisterSchemaAsync(string subject, string schema, SchemaFormat format, CancellationToken ct = default);

    /// <summary>
    /// Gets the latest schema for a subject.
    /// </summary>
    Task<Schema?> GetLatestSchemaAsync(string subject, CancellationToken ct = default);

    /// <summary>
    /// Gets a specific schema version.
    /// </summary>
    Task<Schema?> GetSchemaByVersionAsync(string subject, int version, CancellationToken ct = default);

    /// <summary>
    /// Gets a schema by its global ID.
    /// </summary>
    Task<Schema?> GetSchemaByIdAsync(int schemaId, CancellationToken ct = default);

    /// <summary>
    /// Checks if a schema is compatible with the subject's compatibility rules.
    /// </summary>
    Task<CompatibilityResult> CheckCompatibilityAsync(string subject, string schema, SchemaFormat format, CancellationToken ct = default);

    /// <summary>
    /// Lists all subjects.
    /// </summary>
    Task<IEnumerable<string>> GetSubjectsAsync(CancellationToken ct = default);

    /// <summary>
    /// Lists all versions for a subject.
    /// </summary>
    Task<IEnumerable<int>> GetVersionsAsync(string subject, CancellationToken ct = default);
}

public enum SchemaFormat { Avro, Json, Protobuf }
public record Schema(int Id, string Subject, int Version, string Content, SchemaFormat Format);
public record SchemaVersion(int Id, int Version);
public record CompatibilityResult(bool IsCompatible, IEnumerable<string>? Messages = null);
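
To illustrate how the contract above might be consumed, here is a minimal sketch of an in-memory provider (useful as a unit-test double) together with a check-then-register flow. `InMemorySchemaRegistry`, the subject name, and the schema string are hypothetical and not part of the proposal. The compatibility rule it applies (a subject may not change format) is a stand-in, not real Avro/JSON/Protobuf compatibility analysis. The interface and records are repeated in compact form only so the snippet compiles on its own.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Usage: check compatibility first, register only if the check passes.
ISchemaRegistryProvider registry = new InMemorySchemaRegistry();
const string subject = "orders-value";            // hypothetical subject name
const string schemaV1 = "{\"type\":\"string\"}";  // placeholder schema content

var check = await registry.CheckCompatibilityAsync(subject, schemaV1, SchemaFormat.Avro);
if (check.IsCompatible)
{
    var registered = await registry.RegisterSchemaAsync(subject, schemaV1, SchemaFormat.Avro);
    Console.WriteLine($"Registered id={registered.Id}, version={registered.Version}");
}
var latest = await registry.GetLatestSchemaAsync(subject);
Console.WriteLine($"Latest version: {latest?.Version}");

// Compact copy of the proposed contract (doc comments omitted).
public enum SchemaFormat { Avro, Json, Protobuf }
public record Schema(int Id, string Subject, int Version, string Content, SchemaFormat Format);
public record SchemaVersion(int Id, int Version);
public record CompatibilityResult(bool IsCompatible, IEnumerable<string>? Messages = null);

public interface ISchemaRegistryProvider
{
    Task<SchemaVersion> RegisterSchemaAsync(string subject, string schema, SchemaFormat format, CancellationToken ct = default);
    Task<Schema?> GetLatestSchemaAsync(string subject, CancellationToken ct = default);
    Task<Schema?> GetSchemaByVersionAsync(string subject, int version, CancellationToken ct = default);
    Task<Schema?> GetSchemaByIdAsync(int schemaId, CancellationToken ct = default);
    Task<CompatibilityResult> CheckCompatibilityAsync(string subject, string schema, SchemaFormat format, CancellationToken ct = default);
    Task<IEnumerable<string>> GetSubjectsAsync(CancellationToken ct = default);
    Task<IEnumerable<int>> GetVersionsAsync(string subject, CancellationToken ct = default);
}

// Hypothetical test double: keeps every registered schema in a list guarded by a lock.
public sealed class InMemorySchemaRegistry : ISchemaRegistryProvider
{
    private readonly object _gate = new();
    private readonly List<Schema> _schemas = new();

    private List<Schema> Snapshot() { lock (_gate) { return _schemas.ToList(); } }

    public Task<SchemaVersion> RegisterSchemaAsync(string subject, string schema, SchemaFormat format, CancellationToken ct = default)
    {
        ct.ThrowIfCancellationRequested();
        lock (_gate)
        {
            // Re-registering identical content returns the existing id and version.
            var existing = _schemas.FirstOrDefault(s => s.Subject == subject && s.Content == schema && s.Format == format);
            if (existing is null)
            {
                var version = _schemas.Count(s => s.Subject == subject) + 1;
                existing = new Schema(_schemas.Count + 1, subject, version, schema, format);
                _schemas.Add(existing);
            }
            return Task.FromResult(new SchemaVersion(existing.Id, existing.Version));
        }
    }

    public Task<Schema?> GetLatestSchemaAsync(string subject, CancellationToken ct = default) =>
        Task.FromResult<Schema?>(Snapshot().LastOrDefault(s => s.Subject == subject));

    public Task<Schema?> GetSchemaByVersionAsync(string subject, int version, CancellationToken ct = default) =>
        Task.FromResult<Schema?>(Snapshot().FirstOrDefault(s => s.Subject == subject && s.Version == version));

    public Task<Schema?> GetSchemaByIdAsync(int schemaId, CancellationToken ct = default) =>
        Task.FromResult<Schema?>(Snapshot().FirstOrDefault(s => s.Id == schemaId));

    public Task<CompatibilityResult> CheckCompatibilityAsync(string subject, string schema, SchemaFormat format, CancellationToken ct = default)
    {
        // Stand-in rule only: reject a format change within an existing subject.
        var conflict = Snapshot().Any(s => s.Subject == subject && s.Format != format);
        return Task.FromResult(conflict
            ? new CompatibilityResult(false, new[] { $"Subject '{subject}' already uses a different format." })
            : new CompatibilityResult(true));
    }

    public Task<IEnumerable<string>> GetSubjectsAsync(CancellationToken ct = default) =>
        Task.FromResult<IEnumerable<string>>(Snapshot().Select(s => s.Subject).Distinct().ToList());

    public Task<IEnumerable<int>> GetVersionsAsync(string subject, CancellationToken ct = default) =>
        Task.FromResult<IEnumerable<int>>(Snapshot().Where(s => s.Subject == subject).Select(s => s.Version).ToList());
}
```

Because callers depend only on ISchemaRegistryProvider, the same check-then-register flow should work unchanged against a Confluent, Azure, or Apicurio implementation once those exist.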

Confluent Schema Registry Implementation

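public class SchemaRegistryConfluent : ISchemaRegistryProvider
{
    private readonly Confluent.SchemaRegistry.ISchemaRegistryClient _client;

    public SchemaRegistryConfluent(Confluent.SchemaRegistry.ISchemaRegistryClient client)
        => _client = client ?? throw new ArgumentNullException(nameof(client));

    public async Task<SchemaVersion> RegisterSchemaAsync(string subject, string schema, SchemaFormat format, CancellationToken ct = default)
    {
        // The Confluent client calls used here take no CancellationToken, so honor it up front.
        ct.ThrowIfCancellationRequested();

        var schemaType = format switch
        {
            SchemaFormat.Avro => Confluent.SchemaRegistry.SchemaType.Avro,
            SchemaFormat.Json => Confluent.SchemaRegistry.SchemaType.Json,
            SchemaFormat.Protobuf => Confluent.SchemaRegistry.SchemaType.Protobuf,
            _ => throw new ArgumentOutOfRangeException(nameof(format), format, "Unsupported schema format.")
        };

        // Fully qualified to avoid a clash with the Encina.SchemaRegistry.Schema record.
        var confluentSchema = new Confluent.SchemaRegistry.Schema(schema, schemaType);
        var id = await _client.RegisterSchemaAsync(subject, confluentSchema);

        // Look up the version of this exact schema. The subject's latest version may belong
        // to a different schema (concurrent registration, or re-registering older content).
        var registered = await _client.LookupSchemaAsync(subject, confluentSchema, ignoreDeletedSchemas: true);
        return new SchemaVersion(id, registered.Version);
    }

    // Remaining ISchemaRegistryProvider members omitted for brevity.
}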

Supported Providers

Provider                   | Package                   | Notes
Confluent Schema Registry  | Confluent.SchemaRegistry  | Standard for Kafka
Azure Schema Registry      | Azure.Data.SchemaRegistry | Event Hubs integration
Apicurio Registry          | REST API                  | Open source alternative

Alternatives Considered

  1. Use Confluent Schema Registry SDK directly: The Confluent SDK is mature but ties you to Kafka. The abstraction allows using Azure Event Hubs or other brokers with schema validation.

  2. Use JSON Schema validation only: JSON Schema is simple but lacks the registry, versioning, and compatibility checks that schema registries provide.

  3. Use contract-first with OpenAPI/AsyncAPI only: These are great for documentation but don't provide runtime schema validation and evolution like registries do.

Affected Packages

  • Encina (core)
  • Encina.EntityFrameworkCore
  • Encina.Dapper.*
  • Encina.ADO.*
  • Encina.AspNetCore
  • Encina.OpenTelemetry
  • Encina.Caching.*
  • Other: Encina.SchemaRegistry (new package), Encina.SchemaRegistry.Confluent, Encina.SchemaRegistry.Azure

Additional Context

Acceptance Criteria

  • ISchemaRegistryProvider interface in new Encina.SchemaRegistry package
  • Confluent Schema Registry implementation
  • Azure Schema Registry implementation
  • Apicurio Registry implementation (REST client)
  • Serializer integration (Avro, Json, Protobuf)
  • ServiceCollection extensions
  • Unit tests
  • Integration tests
  • Documentation

Related Issues

Metadata

Labels

  • area-messaging: Messaging patterns (Outbox, Inbox, Sagas, etc.)
  • area-schema-registry: Schema Registry and event schema governance
  • complexity-medium: Complexity: Medium
  • enhancement: New feature or request
  • new-package: Requires creating a new NuGet package
  • phase-2-functionality: Phase 2: Functionality - new features
  • priority-medium: Priority: Medium (⭐⭐⭐)