Event type routing improvements (Kafka Streams)

When routing by event types, the deserializer omits the
topic and header information. Fixing this issue.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1186
This commit is contained in:
Soby Chacko
2022-01-05 19:30:34 -05:00
parent 63b306d34c
commit 648188fc6b

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 the original author or authors.
* Copyright 2019-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,7 +19,9 @@ package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
@@ -40,9 +42,10 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -449,12 +452,15 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
//See this issue for more context: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1003
if (StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes())) {
AtomicBoolean matched = new AtomicBoolean();
AtomicReference<String> topicObject = new AtomicReference<>();
AtomicReference<Headers> headersObject = new AtomicReference<>();
// Processor to retrieve the header value.
stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matched));
stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matched, topicObject, headersObject));
// Branching based on event type match.
final KStream<?, ?>[] branch = stream.branch((key, value) -> matched.getAndSet(false));
// Deserialize if we have a branch from above.
final KStream<?, Object> deserializedKStream = branch[0].mapValues(value -> valueSerde.deserializer().deserialize(null, ((Bytes) value).get()));
final KStream<?, Object> deserializedKStream = branch[0].mapValues(value -> valueSerde.deserializer().deserialize(
topicObject.get(), headersObject.get(), ((Bytes) value).get()));
return getkStream(bindingProperties, deserializedKStream, nativeDecoding);
}
return getkStream(bindingProperties, stream, nativeDecoding);
@@ -549,14 +555,18 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
consumed);
if (StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes())) {
AtomicBoolean matched = new AtomicBoolean();
AtomicReference<String> topicObject = new AtomicReference<>();
AtomicReference<Headers> headersObject = new AtomicReference<>();
final KStream<?, ?> stream = kTable.toStream();
// Processor to retrieve the header value.
stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matched));
stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matched, topicObject, headersObject));
// Branching based on event type match.
final KStream<?, ?>[] branch = stream.branch((key, value) -> matched.getAndSet(false));
// Deserialize if we have a branch from above.
final KStream<?, Object> deserializedKStream = branch[0].mapValues(value -> valueSerde.deserializer().deserialize(null, ((Bytes) value).get()));
final KStream<?, Object> deserializedKStream = branch[0].mapValues(value -> valueSerde.deserializer().deserialize(
topicObject.get(), headersObject.get(), ((Bytes) value).get()));
return deserializedKStream.toTable();
}
@@ -581,19 +591,27 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
return consumed;
}
private <K, V> Processor<K, V> eventTypeProcessor(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, AtomicBoolean matched) {
return new Processor() {
private <K, V> Processor<K, V, Void, Void> eventTypeProcessor(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties,
AtomicBoolean matched, AtomicReference<String> topicObject, AtomicReference<Headers> headersObject) {
return new Processor<K, V, Void, Void>() {
ProcessorContext context;
org.apache.kafka.streams.processor.api.ProcessorContext<?, ?> context;
@Override
public void init(ProcessorContext context) {
public void init(org.apache.kafka.streams.processor.api.ProcessorContext<Void, Void> context) {
Processor.super.init(context);
this.context = context;
}
@Override
public void process(Object key, Object value) {
final Headers headers = this.context.headers();
public void process(Record<K, V> record) {
final Headers headers = record.headers();
headersObject.set(headers);
final Optional<RecordMetadata> optional = this.context.recordMetadata();
if (optional.isPresent()) {
final RecordMetadata recordMetadata = optional.get();
topicObject.set(recordMetadata.topic());
}
final Iterable<Header> eventTypeHeader = headers.headers(kafkaStreamsConsumerProperties.getEventTypeHeaderKey());
if (eventTypeHeader != null && eventTypeHeader.iterator().hasNext()) {
String eventTypeFromHeader = new String(eventTypeHeader.iterator().next().value());