From 648188fc6b1f3d3dd146d5567dd063570aff04f0 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 5 Jan 2022 19:30:34 -0500 Subject: [PATCH] Event type routing improvements (Kafka Streams) When routing by event types, the deserializer is invoked without the topic and header information. This commit fixes that issue. Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1186 --- .../AbstractKafkaStreamsBinderProcessor.java | 44 +++++++++++++------ 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java index 21480f0f..9c787b58 100644 --- a/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java +++ b/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -19,7 +19,9 @@ package org.springframework.cloud.stream.binder.kafka.streams; import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import org.apache.commons.logging.Log; @@ -40,9 +42,10 @@ import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; @@ -449,12 +452,15 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application //See this issue for more context: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1003 if (StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes())) { AtomicBoolean matched = new AtomicBoolean(); + AtomicReference topicObject = new AtomicReference<>(); + AtomicReference headersObject = new AtomicReference<>(); // Processor to retrieve the header value. - stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matched)); + stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matched, topicObject, headersObject)); // Branching based on event type match. final KStream[] branch = stream.branch((key, value) -> matched.getAndSet(false)); // Deserialize if we have a branch from above. 
- final KStream deserializedKStream = branch[0].mapValues(value -> valueSerde.deserializer().deserialize(null, ((Bytes) value).get())); + final KStream deserializedKStream = branch[0].mapValues(value -> valueSerde.deserializer().deserialize( + topicObject.get(), headersObject.get(), ((Bytes) value).get())); return getkStream(bindingProperties, deserializedKStream, nativeDecoding); } return getkStream(bindingProperties, stream, nativeDecoding); @@ -549,14 +555,18 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application consumed); if (StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes())) { AtomicBoolean matched = new AtomicBoolean(); + AtomicReference topicObject = new AtomicReference<>(); + AtomicReference headersObject = new AtomicReference<>(); + final KStream stream = kTable.toStream(); // Processor to retrieve the header value. - stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matched)); + stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matched, topicObject, headersObject)); // Branching based on event type match. final KStream[] branch = stream.branch((key, value) -> matched.getAndSet(false)); // Deserialize if we have a branch from above. 
- final KStream deserializedKStream = branch[0].mapValues(value -> valueSerde.deserializer().deserialize(null, ((Bytes) value).get())); + final KStream deserializedKStream = branch[0].mapValues(value -> valueSerde.deserializer().deserialize( + topicObject.get(), headersObject.get(), ((Bytes) value).get())); return deserializedKStream.toTable(); } @@ -581,19 +591,27 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application return consumed; } - private Processor eventTypeProcessor(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, AtomicBoolean matched) { - return new Processor() { + private Processor eventTypeProcessor(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, + AtomicBoolean matched, AtomicReference topicObject, AtomicReference headersObject) { + return new Processor() { - ProcessorContext context; + org.apache.kafka.streams.processor.api.ProcessorContext context; @Override - public void init(ProcessorContext context) { + public void init(org.apache.kafka.streams.processor.api.ProcessorContext context) { + Processor.super.init(context); this.context = context; } @Override - public void process(Object key, Object value) { - final Headers headers = this.context.headers(); + public void process(Record record) { + final Headers headers = record.headers(); + headersObject.set(headers); + final Optional optional = this.context.recordMetadata(); + if (optional.isPresent()) { + final RecordMetadata recordMetadata = optional.get(); + topicObject.set(recordMetadata.topic()); + } final Iterable
eventTypeHeader = headers.headers(kafkaStreamsConsumerProperties.getEventTypeHeaderKey()); if (eventTypeHeader != null && eventTypeHeader.iterator().hasNext()) { String eventTypeFromHeader = new String(eventTypeHeader.iterator().next().value());