Handling tombstones in Kafka Streams binder

Handle tombstones gracefully in the Kafka Streams binder.
Modify tests to verify the new behavior.

Resolves spring-cloud/spring-cloud-stream-binder-kafka#294

* Addressing PR review comments
Soby Chacko
2018-09-05 12:17:54 -04:00
committed by Artem Bilan
parent ea288c62eb
commit e869516e3d
3 changed files with 73 additions and 44 deletions
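For context: in Kafka, a tombstone is a record whose value is null; on a log-compacted topic it marks the record's key for deletion. The producer sketch below is not part of this commit (broker address, topic, and class name are assumed for illustration); it shows the kind of record the binder must now tolerate on the inbound side.

	import java.util.Properties;

	import org.apache.kafka.clients.producer.KafkaProducer;
	import org.apache.kafka.clients.producer.ProducerRecord;
	import org.apache.kafka.common.serialization.StringSerializer;

	public class TombstoneProducerSketch {

		public static void main(String[] args) {
			Properties props = new Properties();
			props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
			props.put("key.serializer", StringSerializer.class.getName());
			props.put("value.serializer", StringSerializer.class.getName());
			try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
				// The null value is the tombstone: it marks the key "some-key" for
				// deletion on a compacted topic and must not break downstream consumers.
				producer.send(new ProducerRecord<>("words", "some-key", null));
			}
		}
	}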

KafkaStreamsMessageConversionDelegate.java

@@ -19,6 +19,8 @@ package org.springframework.cloud.stream.binder.kafka.streams;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.processor.Processor;
@@ -43,6 +45,8 @@ import org.springframework.util.StringUtils;
  */
 public class KafkaStreamsMessageConversionDelegate {

+	private final static Log LOG = LogFactory.getLog(KafkaStreamsMessageConversionDelegate.class);
+
 	private static final ThreadLocal<KeyValue<Object, Object>> keyValueThreadLocal = new ThreadLocal<>();

 	private final CompositeMessageConverterFactory compositeMessageConverterFactory;
@@ -105,32 +109,34 @@ public class KafkaStreamsMessageConversionDelegate {
 			boolean isValidRecord = false;
 			try {
-				if (valueClass.isAssignableFrom(o2.getClass())) {
-					keyValueThreadLocal.set(new KeyValue<>(o, o2));
-				}
-				else if (o2 instanceof Message) {
-					if (valueClass.isAssignableFrom(((Message) o2).getPayload().getClass())) {
-						keyValueThreadLocal.set(new KeyValue<>(o, ((Message) o2).getPayload()));
-					}
-					else {
-						convertAndSetMessage(o, valueClass, messageConverter, (Message) o2);
-					}
-				}
-				else if (o2 instanceof String || o2 instanceof byte[]) {
-					Message<?> message = MessageBuilder.withPayload(o2).build();
-					convertAndSetMessage(o, valueClass, messageConverter, message);
-				}
-				else {
-					keyValueThreadLocal.set(new KeyValue<>(o, o2));
-				}
-				isValidRecord = true;
+				//if the record is a tombstone, ignore and exit from processing further.
+				if (o2 != null) {
+					if (valueClass.isAssignableFrom(o2.getClass())) {
+						keyValueThreadLocal.set(new KeyValue<>(o, o2));
+					} else if (o2 instanceof Message) {
+						if (valueClass.isAssignableFrom(((Message) o2).getPayload().getClass())) {
+							keyValueThreadLocal.set(new KeyValue<>(o, ((Message) o2).getPayload()));
+						} else {
+							convertAndSetMessage(o, valueClass, messageConverter, (Message) o2);
+						}
+					} else if (o2 instanceof String || o2 instanceof byte[]) {
+						Message<?> message = MessageBuilder.withPayload(o2).build();
+						convertAndSetMessage(o, valueClass, messageConverter, message);
+					} else {
+						keyValueThreadLocal.set(new KeyValue<>(o, o2));
+					}
+					isValidRecord = true;
+				} else {
+					LOG.info("Received a tombstone record. This will be skipped from further processing.");
+				}
 			}
 			catch (Exception ignored) {
 				//pass through
 			}
 			return isValidRecord;
 		},
-		//sedond filter that catches any messages for which an exception thrown in the first filter above.
+		//second filter that catches any messages for which an exception was thrown in the first filter above.
 		(k, v) -> true
 		);
 		//process errors from the second filter in the branch above.
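The essence of the change above, isolated as a standalone sketch: tombstones short-circuit before any payload conversion is attempted. Class and method names here are illustrative, not the binder's actual API.

	import org.apache.kafka.streams.StreamsBuilder;
	import org.apache.kafka.streams.kstream.KStream;

	public class TombstoneFilterSketch {

		// Drops tombstones (null values) before the conversion path, mirroring
		// the o2 != null guard introduced in the filter above.
		public static KStream<Object, Object> skipTombstones(StreamsBuilder builder, String topic) {
			KStream<Object, Object> input = builder.stream(topic);
			return input.filter((key, value) -> value != null);
		}
	}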
@@ -164,21 +170,21 @@ public class KafkaStreamsMessageConversionDelegate {
 		@Override
 		public void process(Object o, Object o2) {
-			if (kstreamBindingInformationCatalogue.isDlqEnabled(bindingTarget)) {
-				String destination = context.topic();
-				if (o2 instanceof Message) {
-					Message message = (Message) o2;
-					sendToDlqAndContinue.sendToDlq(destination, (byte[]) o, (byte[]) message.getPayload(), context.partition());
-				}
-				else {
-					sendToDlqAndContinue.sendToDlq(destination, (byte[]) o, (byte[]) o2, context.partition());
-				}
-			}
-			else if (kstreamBinderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.logAndFail) {
-				throw new IllegalStateException("Inbound deserialization failed.");
-			}
-			else if (kstreamBinderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.logAndContinue) {
-				//quietly pass through. No action needed, this is similar to log and continue.
-			}
+			//Only continue if the record was not a tombstone.
+			if (o2 != null) {
+				if (kstreamBindingInformationCatalogue.isDlqEnabled(bindingTarget)) {
+					String destination = context.topic();
+					if (o2 instanceof Message) {
+						Message message = (Message) o2;
+						sendToDlqAndContinue.sendToDlq(destination, (byte[]) o, (byte[]) message.getPayload(), context.partition());
+					} else {
+						sendToDlqAndContinue.sendToDlq(destination, (byte[]) o, (byte[]) o2, context.partition());
+					}
+				} else if (kstreamBinderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.logAndFail) {
+					throw new IllegalStateException("Inbound deserialization failed.");
+				} else if (kstreamBinderConfigurationProperties.getSerdeError() == KafkaStreamsBinderConfigurationProperties.SerdeError.logAndContinue) {
+					//quietly pass through. No action needed, this is similar to log and continue.
+				}
+			}
 		}
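The branching in this processor is easier to see as a plain decision sequence. The sketch below uses simplified stand-ins for the binder's DLQ hook and configuration enum (names are assumptions, not the binder's API): the tombstone check comes first, then the DLQ path, then the serde-error policies.

	public class SerdeErrorDecisionSketch {

		enum SerdeError { sendToDlq, logAndFail, logAndContinue }

		// Illustrative stand-in for the binder's SendToDlqAndContinue component.
		interface DlqSender {
			void sendToDlq(String topic, byte[] key, byte[] value, int partition);
		}

		static void handle(byte[] key, byte[] value, boolean dlqEnabled,
				SerdeError policy, DlqSender dlq, String topic, int partition) {
			if (value == null) {
				return; // tombstone: nothing failed deserialization, nothing to dead-letter
			}
			if (dlqEnabled) {
				dlq.sendToDlq(topic, key, value, partition);
			}
			else if (policy == SerdeError.logAndFail) {
				throw new IllegalStateException("Inbound deserialization failed.");
			}
			// logAndContinue: fall through quietly
		}
	}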

KafkaStreamsStreamListenerSetupMethodOrchestrator.java

@@ -64,7 +64,6 @@ import org.springframework.core.MethodParameter;
 import org.springframework.core.annotation.AnnotationUtils;
 import org.springframework.kafka.core.CleanupConfig;
 import org.springframework.kafka.core.StreamsBuilderFactoryBean;
-import org.springframework.messaging.Message;
 import org.springframework.messaging.MessageHeaders;
 import org.springframework.messaging.handler.annotation.SendTo;
 import org.springframework.messaging.support.MessageBuilder;
@@ -363,11 +362,11 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
 				stream = stream.mapValues(value -> {
 					Object returnValue;
 					String contentType = bindingProperties.getContentType();
-					if (!StringUtils.isEmpty(contentType) && !nativeDecoding) {
-						Message<?> message = MessageBuilder.withPayload(value)
-								.setHeader(MessageHeaders.CONTENT_TYPE, contentType).build();
-						returnValue = message;
-					} else {
+					if (value != null && !StringUtils.isEmpty(contentType) && !nativeDecoding) {
+						returnValue = MessageBuilder.withPayload(value)
+								.setHeader(MessageHeaders.CONTENT_TYPE, contentType).build();
+					}
+					else {
 						returnValue = value;
 					}
 					return returnValue;
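The same rule applies on this path: only non-null values get wrapped in a Message with a content-type header, so a tombstone passes through unchanged as a null. A minimal sketch of that guard, with assumed names:

	import org.apache.kafka.streams.kstream.KStream;
	import org.springframework.messaging.MessageHeaders;
	import org.springframework.messaging.support.MessageBuilder;

	public class OutboundWrapSketch {

		// Wraps non-null payloads in a Message carrying the content type;
		// tombstones (null values) pass through unchanged.
		static KStream<Object, Object> wrap(KStream<Object, Object> stream, String contentType) {
			return stream.mapValues(value -> value == null ? null
					: MessageBuilder.withPayload(value)
							.setHeader(MessageHeaders.CONTENT_TYPE, contentType).build());
		}
	}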

KafkaStreamsBinderWordCountIntegrationTests.java

@@ -16,6 +16,7 @@
 package org.springframework.cloud.stream.binder.kafka.streams.integration;

+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.Map;
@@ -23,6 +24,7 @@ import java.util.Map;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
@@ -116,6 +118,9 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
 		KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
 		ReadOnlyWindowStore<Object, Object> store = kafkaStreams.store("foo-WordCounts", QueryableStoreTypes.windowStore());
 		assertThat(store).isNotNull();
+
+		sendTombStoneRecordsAndVerifyGracefulHandling();
+
 		CleanupConfig cleanup = TestUtils.getPropertyValue(streamsBuilderFactoryBean, "cleanupConfig",
 				CleanupConfig.class);
 		assertThat(cleanup.cleanupOnStart()).isTrue();
@@ -129,11 +134,33 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
 	private void receiveAndValidate(ConfigurableApplicationContext context) throws Exception {
 		Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
 		DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
-		KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
-		template.setDefaultTopic("words");
-		template.sendDefault("foobar");
-		ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, "counts");
-		assertThat(cr.value().contains("\"word\":\"foobar\",\"count\":1")).isTrue();
+		try {
+			KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
+			template.setDefaultTopic("words");
+			template.sendDefault("foobar");
+			ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, "counts");
+			assertThat(cr.value().contains("\"word\":\"foobar\",\"count\":1")).isTrue();
+		}
+		finally {
+			pf.destroy();
+		}
 	}

+	private void sendTombStoneRecordsAndVerifyGracefulHandling() throws Exception {
+		Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
+		DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
+		try {
+			KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
+			template.setDefaultTopic("words");
+			template.sendDefault(null);
+			ConsumerRecords<String, String> received = consumer.poll(Duration.ofMillis(5000));
+			//By asserting that the received records are empty, we ensure that the
+			//tombstone record was handled gracefully by the binder.
+			assertThat(received.isEmpty()).isTrue();
+		}
+		finally {
+			pf.destroy();
+		}
+	}
+
 	@EnableBinding(KafkaStreamsProcessor.class)
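For completeness, a tombstone sent from application code is just a null payload; the test above uses sendDefault(null), and the keyed form with KafkaTemplate looks like the sketch below (topic name, key, and value types are assumed).

	import org.springframework.kafka.core.KafkaTemplate;

	public class TombstoneSenderSketch {

		// KafkaTemplate accepts a null payload and produces a record with a
		// null value, i.e. a tombstone for the given key.
		static void sendTombstone(KafkaTemplate<Integer, String> template, Integer key) {
			template.send("words", key, null);
		}
	}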
@@ -148,9 +175,6 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
 		@SendTo("output")
 		public KStream<?, WordCount> process(@Input("input") KStream<Object, String> input) {
-			input.map((k,v) -> {
-				return new KeyValue<>(k,v);
-			});
 			return input
 					.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
 					.map((key, value) -> new KeyValue<>(value, value))