|
7 | 7 |
|
8 | 8 | import java.nio.ByteBuffer; |
9 | 9 | import java.util.Collections; |
10 | | -import java.util.Map; |
11 | 10 |
|
12 | 11 | import javax.enterprise.context.ApplicationScoped; |
13 | 12 | import javax.enterprise.inject.Produces; |
14 | 13 | import javax.inject.Inject; |
15 | 14 |
|
16 | 15 | import org.apache.avro.generic.GenericRecord; |
| 16 | +import org.apache.kafka.common.serialization.Deserializer; |
17 | 17 | import org.apache.kafka.common.serialization.Serde; |
18 | 18 | import org.apache.kafka.common.serialization.Serdes; |
19 | 19 | import org.apache.kafka.streams.StreamsBuilder; |
|
24 | 24 | import org.slf4j.Logger; |
25 | 25 | import org.slf4j.LoggerFactory; |
26 | 26 |
|
27 | | -import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig; |
28 | | -import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde; |
| 27 | +import io.apicurio.registry.client.CompatibleClient; |
| 28 | +import io.apicurio.registry.client.RegistryService; |
| 29 | +import io.apicurio.registry.utils.serde.AvroKafkaDeserializer; |
| 30 | +import io.apicurio.registry.utils.serde.AvroKafkaSerializer; |
| 31 | + |
29 | 32 | import io.debezium.examples.cloudevents.dataextractor.model.CloudEvent; |
30 | 33 | import io.debezium.serde.DebeziumSerdes; |
| 34 | + |
| 35 | + |
31 | 36 | /** |
32 | 37 | * Starts up the KStreams pipeline once the source topics have been created. |
33 | 38 | * |
@@ -73,9 +78,10 @@ Topology createStreamTopology() { |
73 | 78 | .mapValues(ce -> ce.data) |
74 | 79 | .to(jsonAvroExtractedTopic, Produced.with(longKeySerde, Serdes.ByteArray())); |
75 | 80 |
|
76 | | - Serde<GenericRecord> genericAvroSerde = new GenericAvroSerde(); |
77 | | - Map<String, String> config = Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); |
78 | | - genericAvroSerde.configure(config, false); |
| 81 | + |
| 82 | + RegistryService service = CompatibleClient.createCompatible(schemaRegistryUrl); |
| 83 | + Deserializer<GenericRecord> deserializer = new AvroKafkaDeserializer(service); |
| 84 | + Serde<GenericRecord> genericAvroSerde = Serdes.serdeFrom(new AvroKafkaSerializer<>(service), deserializer); |
79 | 85 |
|
80 | 86 | builder.stream(avroAvroCustomersTopic, Consumed.with(longKeySerde, genericAvroSerde)) |
81 | 87 | .mapValues(ce -> ((ByteBuffer) ce.get("data")).array()) |
|
0 commit comments