removed unnecessary props - native serialization/deserialization

This commit is contained in:
Jose Antonio Iñigo Garrido
2019-06-21 23:51:20 +02:00
committed by Soby Chacko
parent 5fd4b3e7e2
commit ff95320be1
5 changed files with 53 additions and 32 deletions

View File

@@ -69,12 +69,11 @@ Customer topic kafkacat output:
```
Key (4 bytes):
Value (21 bytes): John DoeElm Street
Timestamp: 1560236330571
Value (26 bytes): John DoeElm Street
Timestamp: 1561153134430
Partition: 0
Offset: 0
Headers: contentType="application/vnd.customer.v1+avro",spring_json_header_types={"contentType":"java.lang.String"}
% Reached end of topic customer [0] at offset 1
Headers:
```
@@ -82,18 +81,17 @@ Order topic kafkacat output:
```
Key (4 bytes):
Value (4 bytes): ?
Timestamp: 1560236355714
Value (9 bytes): ?
Timestamp: 1561153313512
Partition: 0
Offset: 0
Headers: contentType="application/vnd.ordercreatedevent.v1+avro",spring_json_header_types={"contentType":"java.lang.String"}
Headers:
Key (4 bytes):
Value (23 bytes): ?John DoeElm Street
Timestamp: 1560236355714
Value (28 bytes): ?John DoeElm Street
Timestamp: 1561153313512
Partition: 0
Offset: 1
Headers: spring_json_header_types={"contentType":"java.lang.String"},contentType="application/vnd.ordershippedevent.v1+avro"
% Reached end of topic order [0] at offset 2
Headers:
```

View File

@@ -3,6 +3,8 @@ spring:
name: customer-service
cloud:
stream:
schema-registry-client:
endpoint: http://localhost:8081
kafka:
bindings:
output:
@@ -10,12 +12,19 @@ spring:
configuration:
key:
serializer: org.apache.kafka.common.serialization.IntegerSerializer
value:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
schema:
registry:
url: ${spring.cloud.stream.schema-registry-client.endpoint}
bindings:
output:
destination: customer
contentType: application/*+avro
schema-registry-client:
endpoint: http://localhost:8081
producer:
useNativeEncoding: true
server:
port: 8084

View File

@@ -3,6 +3,8 @@ spring:
name: order-service
cloud:
stream:
schema-registry-client:
endpoint: http://localhost:8081
kafka:
bindings:
output:
@@ -10,12 +12,19 @@ spring:
configuration:
key:
serializer: org.apache.kafka.common.serialization.IntegerSerializer
value:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
schema:
registry:
url: ${spring.cloud.stream.schema-registry-client.endpoint}
bindings:
output:
destination: order
contentType: application/*+avro
schema-registry-client:
endpoint: http://localhost:8081
producer:
useNativeEncoding: true
server:
port: 8085

View File

@@ -17,7 +17,6 @@
package kafka.e2e.shipping.stream
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
import kafka.e2e.customer.Customer
import kafka.e2e.order.OrderCreatedEvent
@@ -40,7 +39,6 @@ import org.springframework.messaging.handler.annotation.SendTo
@Configuration
class ShippingKStreamConfiguration {
@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") orderEvent: KStream<Int, GenericRecord>): KStream<Int, OrderShippedEvent> {
@@ -56,8 +54,6 @@ class ShippingKStreamConfiguration {
val orderShippedSerde = SpecificAvroSerde<OrderShippedEvent>()
orderShippedSerde.configure(serdeConfig, false)
val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
.withKeySerde(intSerde)

View File

@@ -1,6 +1,3 @@
spring.cloud.stream.bindings.input.consumer.useNativeDecoding: false
spring.cloud.stream.bindings.order.consumer.useNativeDecoding: false
spring.cloud.stream.bindings.output.producer.useNativeEncoding: false
spring:
application:
name: shipping-service
@@ -17,22 +14,34 @@ spring:
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
# bindings:
# input:
# consumer:
# valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
schema:
registry:
url: ${spring.cloud.stream.schema-registry-client.endpoint}
value:
subject:
name:
strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
bindings:
input:
consumer:
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
order:
consumer:
valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
output:
producer:
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
bindings:
input:
destination: customer
contentType: application/*+avro
order:
destination: order
contentType: application/*+avro
output:
destination: order
contentType: application/*+avro
server:
port: 8086
logging:
level:
org.springframework.kafka.config: debug