diff --git a/kafka-e2e-kotlin-sample/README.adoc b/kafka-e2e-kotlin-sample/README.adoc index 5d43c42..60a153d 100644 --- a/kafka-e2e-kotlin-sample/README.adoc +++ b/kafka-e2e-kotlin-sample/README.adoc @@ -69,12 +69,11 @@ Customer topic kafkacat output: ``` Key (4 bytes): - Value (21 bytes): John DoeElm Street - Timestamp: 1560236330571 + Value (26 bytes): John DoeElm Street + Timestamp: 1561153134430 Partition: 0 Offset: 0 - Headers: contentType="application/vnd.customer.v1+avro",spring_json_header_types={"contentType":"java.lang.String"} -% Reached end of topic customer [0] at offset 1 + Headers: ``` @@ -82,18 +81,17 @@ Order topic kafkacat output: ``` Key (4 bytes): - Value (4 bytes): ? - Timestamp: 1560236355714 + Value (9 bytes): ? + Timestamp: 1561153313512 Partition: 0 Offset: 0 - Headers: contentType="application/vnd.ordercreatedevent.v1+avro",spring_json_header_types={"contentType":"java.lang.String"} + Headers: Key (4 bytes): - Value (23 bytes): ?John DoeElm Street - Timestamp: 1560236355714 + Value (28 bytes): ?John DoeElm Street + Timestamp: 1561153313512 Partition: 0 Offset: 1 - Headers: spring_json_header_types={"contentType":"java.lang.String"},contentType="application/vnd.ordershippedevent.v1+avro" -% Reached end of topic order [0] at offset 2 + Headers: ``` diff --git a/kafka-e2e-kotlin-sample/customer-service/src/main/resources/application.yml b/kafka-e2e-kotlin-sample/customer-service/src/main/resources/application.yml index bb8f278..29d1c10 100644 --- a/kafka-e2e-kotlin-sample/customer-service/src/main/resources/application.yml +++ b/kafka-e2e-kotlin-sample/customer-service/src/main/resources/application.yml @@ -3,6 +3,8 @@ spring: name: customer-service cloud: stream: + schema-registry-client: + endpoint: http://localhost:8081 kafka: bindings: output: @@ -10,12 +12,19 @@ spring: configuration: key: serializer: org.apache.kafka.common.serialization.IntegerSerializer + value: + serializer: io.confluent.kafka.serializers.KafkaAvroSerializer + subject: + name: + strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy + schema: + registry: + url: ${spring.cloud.stream.schema-registry-client.endpoint} bindings: output: destination: customer - contentType: application/*+avro - schema-registry-client: - endpoint: http://localhost:8081 + producer: + useNativeEncoding: true server: port: 8084 diff --git a/kafka-e2e-kotlin-sample/order-service/src/main/resources/application.yml b/kafka-e2e-kotlin-sample/order-service/src/main/resources/application.yml index 6480dc3..af0603e 100644 --- a/kafka-e2e-kotlin-sample/order-service/src/main/resources/application.yml +++ b/kafka-e2e-kotlin-sample/order-service/src/main/resources/application.yml @@ -3,6 +3,8 @@ spring: name: order-service cloud: stream: + schema-registry-client: + endpoint: http://localhost:8081 kafka: bindings: output: @@ -10,12 +12,19 @@ spring: configuration: key: serializer: org.apache.kafka.common.serialization.IntegerSerializer + value: + serializer: io.confluent.kafka.serializers.KafkaAvroSerializer + subject: + name: + strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy + schema: + registry: + url: ${spring.cloud.stream.schema-registry-client.endpoint} bindings: output: destination: order - contentType: application/*+avro - schema-registry-client: - endpoint: http://localhost:8081 + producer: + useNativeEncoding: true server: port: 8085 diff --git a/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamConfiguration.kt b/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamConfiguration.kt index 10d8e87..dff7dd7 100644 --- a/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamConfiguration.kt +++ b/kafka-e2e-kotlin-sample/shipping-service/src/main/kotlin/kafka/e2e/shipping/stream/ShippingKStreamConfiguration.kt @@ -17,7 +17,6 @@ package kafka.e2e.shipping.stream import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig -import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde import kafka.e2e.customer.Customer import kafka.e2e.order.OrderCreatedEvent @@ -40,7 +39,6 @@ import org.springframework.messaging.handler.annotation.SendTo @Configuration class ShippingKStreamConfiguration { - @StreamListener @SendTo("output") fun process(@Input("input") input: KStream, @Input("order") orderEvent: KStream): KStream { @@ -56,8 +54,6 @@ class ShippingKStreamConfiguration { val orderShippedSerde = SpecificAvroSerde() orderShippedSerde.configure(serdeConfig, false) - - val stateStore: Materialized> = Materialized.`as`>("customer-store") .withKeySerde(intSerde) diff --git a/kafka-e2e-kotlin-sample/shipping-service/src/main/resources/application.yml b/kafka-e2e-kotlin-sample/shipping-service/src/main/resources/application.yml index c75c840..7f0331a 100644 --- a/kafka-e2e-kotlin-sample/shipping-service/src/main/resources/application.yml +++ b/kafka-e2e-kotlin-sample/shipping-service/src/main/resources/application.yml @@ -1,6 +1,3 @@ -spring.cloud.stream.bindings.input.consumer.useNativeDecoding: false -spring.cloud.stream.bindings.order.consumer.useNativeDecoding: false -spring.cloud.stream.bindings.output.producer.useNativeEncoding: false spring: application: name: shipping-service @@ -17,22 +14,34 @@ spring: default: key: serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde -# bindings: -# input: -# consumer: -# valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde + schema: + registry: + url: ${spring.cloud.stream.schema-registry-client.endpoint} + value: + subject: + name: + strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy + bindings: + input: + consumer: + valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde + order: + consumer: + valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde + output: + producer: + valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde bindings: input: destination: customer - contentType: application/*+avro order: destination: order - contentType: application/*+avro output: destination: order - contentType: application/*+avro - server: port: 8086 +logging: + level: + org.springframework.kafka.config: debug