KTable event type routing

Introduce event type based routing for KTable types. This is
already available for KStream types.
See this commit: 386a361a66

Adding a new DeserializationExceptionHandler to address the event type routing
use case for GlobalKTable.
See this comment: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1003#issuecomment-799847819

Upgrade Spring Kafka to 2.6.6
Upgrade Spring Integration to 5.4.4
This commit is contained in:
Soby Chacko
2021-03-15 20:19:04 -04:00
parent 7cc001ac4c
commit 47e7ca07a8
6 changed files with 189 additions and 39 deletions

View File

@@ -12,8 +12,8 @@
</parent>
<properties>
<java.version>1.8</java.version>
<spring-kafka.version>2.6.5</spring-kafka.version>
<spring-integration-kafka.version>5.4.3</spring-integration-kafka.version>
<spring-kafka.version>2.6.6</spring-kafka.version>
<spring-integration-kafka.version>5.4.4</spring-integration-kafka.version>
<kafka.version>2.6.1</kafka.version>
<spring-cloud-schema-registry.version>1.1.2-SNAPSHOT</spring-cloud-schema-registry.version>
<spring-cloud-stream.version>3.1.2-SNAPSHOT</spring-cloud-stream.version>

View File

@@ -314,6 +314,11 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
streamConfiguration.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER,
applicationContext.getBean(SendToDlqAndContinue.class));
}
else if (deserializationExceptionHandler == DeserializationExceptionHandler.skipAndContinue) {
streamConfiguration.put(
StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
SkipAndContinueExceptionHandler.class);
}
KafkaStreamsConfiguration kafkaStreamsConfiguration = new KafkaStreamsConfiguration(streamConfiguration);
@@ -439,36 +444,7 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
if (StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes())) {
AtomicBoolean matched = new AtomicBoolean();
// Processor to retrieve the header value.
stream.process(() -> new Processor() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Object key, Object value) {
final Headers headers = this.context.headers();
final Iterable<Header> eventTypeHeader = headers.headers(kafkaStreamsConsumerProperties.getEventTypeHeaderKey());
if (eventTypeHeader != null && eventTypeHeader.iterator().hasNext()) {
String eventTypeFromHeader = new String(eventTypeHeader.iterator().next().value());
final String[] eventTypesFromBinding = StringUtils.commaDelimitedListToStringArray(kafkaStreamsConsumerProperties.getEventTypes());
for (String eventTypeFromBinding : eventTypesFromBinding) {
if (eventTypeFromHeader.equals(eventTypeFromBinding)) {
matched.set(true);
break;
}
}
}
}
@Override
public void close() {
}
});
stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matched));
// Branching based on event type match.
final KStream<?, ?>[] branch = stream.branch((key, value) -> matched.getAndSet(false));
// Deserialize if we have a branch from above.
@@ -554,12 +530,31 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
StreamsBuilder streamsBuilder, Serde<?> keySerde,
Serde<?> valueSerde, String materializedAs, String bindingDestination,
Topology.AutoOffsetReset autoOffsetReset) {
final Consumed<?, ?> consumed = getConsumed(kafkaStreamsConsumerProperties, keySerde, valueSerde, autoOffsetReset);
return materializedAs != null
final Serde<?> valueSerdeToUse = StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes()) ?
new Serdes.BytesSerde() : valueSerde;
final Consumed<?, ?> consumed = getConsumed(kafkaStreamsConsumerProperties, keySerde, valueSerdeToUse, autoOffsetReset);
final KTable<?, ?> kTable = materializedAs != null
? materializedAs(streamsBuilder, bindingDestination, materializedAs,
keySerde, valueSerde, autoOffsetReset, kafkaStreamsConsumerProperties)
keySerde, valueSerdeToUse, autoOffsetReset, kafkaStreamsConsumerProperties)
: streamsBuilder.table(bindingDestination,
consumed);
if (StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes())) {
AtomicBoolean matched = new AtomicBoolean();
final KStream<?, ?> stream = kTable.toStream();
// Processor to retrieve the header value.
stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matched));
// Branching based on event type match.
final KStream<?, ?>[] branch = stream.branch((key, value) -> matched.getAndSet(false));
// Deserialize if we have a branch from above.
final KStream<?, Object> deserializedKStream = branch[0].mapValues(value -> valueSerde.deserializer().deserialize(null, ((Bytes) value).get()));
return deserializedKStream.toTable();
}
return kTable;
}
private <K, V> Consumed<K, V> getConsumed(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties,
@@ -576,4 +571,37 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
}
return consumed;
}
private <K, V> Processor<K, V> eventTypeProcessor(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, AtomicBoolean matched) {
	// Stateless pass-through processor: it inspects the configured event type header on each
	// incoming record and flips the shared flag when the header value matches one of the
	// comma-delimited event types declared on the binding. The flag is consumed (and reset)
	// by the branching predicate downstream.
	return new Processor() {

		ProcessorContext processorContext;

		@Override
		public void init(ProcessorContext context) {
			this.processorContext = context;
		}

		@Override
		public void process(Object key, Object value) {
			final Headers recordHeaders = this.processorContext.headers();
			final Iterable<Header> candidates = recordHeaders.headers(kafkaStreamsConsumerProperties.getEventTypeHeaderKey());
			if (candidates == null || !candidates.iterator().hasNext()) {
				// No event type header on this record; leave the matched flag untouched.
				return;
			}
			// Only the first header value under the configured key is considered.
			final String eventTypeFromHeader = new String(candidates.iterator().next().value());
			for (String eventTypeFromBinding : StringUtils.commaDelimitedListToStringArray(kafkaStreamsConsumerProperties.getEventTypes())) {
				if (eventTypeFromHeader.equals(eventTypeFromBinding)) {
					matched.set(true);
					break;
				}
			}
		}

		@Override
		public void close() {
			// Nothing to clean up.
		}
	};
}
}

View File

@@ -38,6 +38,10 @@ public enum DeserializationExceptionHandler {
* Deserialization error handler with DLQ send.
* See {@link org.springframework.kafka.streams.RecoveringDeserializationExceptionHandler}
*/
sendToDlq
sendToDlq,
/**
* Deserialization error handler that silently skips the error and continues.
*/
skipAndContinue;
}

View File

@@ -16,8 +16,6 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.List;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.springframework.cloud.stream.binder.AbstractBinder;
@@ -33,7 +31,6 @@ import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStr
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsProducerProperties;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.StringUtils;

View File

@@ -0,0 +1,46 @@
/*
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
/**
*
* {@link DeserializationExceptionHandler} that silently skips
* deserialization exceptions and continues processing.
*
* @author Soby Chacko
* @since 3.1.2
*/
public class SkipAndContinueExceptionHandler implements DeserializationExceptionHandler {

	/**
	 * Always resume processing: the record that failed deserialization is dropped
	 * and the stream thread moves on to the next record.
	 */
	@Override
	public DeserializationHandlerResponse handle(final ProcessorContext context,
			final ConsumerRecord<byte[], byte[]> record, final Exception exception) {
		return DeserializationHandlerResponse.CONTINUE;
	}

	@Override
	public void configure(final Map<String, ?> configs) {
		// This handler is stateless and requires no configuration.
	}
}

View File

@@ -20,6 +20,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
@@ -29,7 +31,9 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -48,6 +52,7 @@ import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.util.Assert;
import static org.assertj.core.api.Assertions.assertThat;
@@ -61,6 +66,8 @@ public class KafkaStreamsEventTypeRoutingTests {
private static Consumer<Integer, Foo> consumer;
private static CountDownLatch LATCH = new CountDownLatch(3);
@BeforeClass
public static void setUp() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("test-group-1", "false",
@@ -149,6 +156,61 @@ public class KafkaStreamsEventTypeRoutingTests {
}
}
@Test
public void testRoutingWorksBasedOnEventTypesConsumer() throws Exception {
// Boot the binder application with event type routing enabled on the KTable consumer binding.
SpringApplication app = new SpringApplication(EventTypeRoutingTestConfig.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext context = app.run(
"--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.function.definition=consumer",
"--spring.cloud.stream.bindings.consumer-in-0.destination=foo-consumer-1",
// Only records whose event type header matches 'foo' or 'bar' should reach the consumer.
"--spring.cloud.stream.kafka.streams.bindings.consumer-in-0.consumer.eventTypes=foo,bar",
"--spring.cloud.stream.kafka.streams.binder.functions.consumer.applicationId=consumer-id-foo-0",
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
senderProps.put("value.serializer", JsonSerializer.class);
DefaultKafkaProducerFactory<Integer, Foo> pf = new DefaultKafkaProducerFactory<>(senderProps);
try {
KafkaTemplate<Integer, Foo> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("foo-consumer-1");
// Record 1: event_type header 'foo' -> matches the binding config, should be routed.
Foo foo1 = new Foo();
foo1.setFoo("foo-1");
Headers headers = new RecordHeaders();
headers.add(new RecordHeader("event_type", "foo".getBytes()));
final ProducerRecord<Integer, Foo> producerRecord1 = new ProducerRecord<>("foo-consumer-1", 0, 56, foo1, headers);
template.send(producerRecord1);
// Record 2: no event type header -> expected to be filtered out by the routing branch.
Foo foo2 = new Foo();
foo2.setFoo("foo-2");
final ProducerRecord<Integer, Foo> producerRecord2 = new ProducerRecord<>("foo-consumer-1", 0, 57, foo2);
template.send(producerRecord2);
// Record 3: reuses the 'foo' header -> routed.
Foo foo3 = new Foo();
foo3.setFoo("foo-3");
final ProducerRecord<Integer, Foo> producerRecord3 = new ProducerRecord<>("foo-consumer-1", 0, 58, foo3, headers);
template.send(producerRecord3);
// Record 4: event_type header 'bar' -> also configured on the binding, routed.
Foo foo4 = new Foo();
foo4.setFoo("foo-4");
Headers headers1 = new RecordHeaders();
headers1.add(new RecordHeader("event_type", "bar".getBytes()));
final ProducerRecord<Integer, Foo> producerRecord4 = new ProducerRecord<>("foo-consumer-1", 0, 59, foo4, headers1);
template.send(producerRecord4);
// The latch is initialized to 3; exactly the three matching records should count it down.
Assert.isTrue(LATCH.await(10, TimeUnit.SECONDS), "Foo");
}
finally {
pf.destroy();
}
}
}
@EnableAutoConfiguration
public static class EventTypeRoutingTestConfig {
@@ -157,6 +219,19 @@ public class KafkaStreamsEventTypeRoutingTests {
return input -> input;
}
@Bean
public java.util.function.Consumer<KTable<Integer, Foo>> consumer() {
return ktable -> ktable.toStream().foreach((key, value) -> {
LATCH.countDown();
});
}
@Bean
public java.util.function.Consumer<GlobalKTable<Integer, Foo>> global() {
return ktable -> {
};
}
}
static class Foo {