Kafka Streams binder tests cleanup
This commit is contained in:
@@ -16,6 +16,7 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams.function;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Date;
|
||||
@@ -74,7 +75,7 @@ public class KafkaStreamsBinderWordCountFunctionTests {
|
||||
|
||||
@ClassRule
|
||||
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
|
||||
"counts", "counts-1", "counts-2");
|
||||
"counts", "counts-1", "counts-2", "counts-5", "counts-6");
|
||||
|
||||
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
|
||||
|
||||
@@ -90,7 +91,7 @@ public class KafkaStreamsBinderWordCountFunctionTests {
|
||||
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
|
||||
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
|
||||
consumer = cf.createConsumer();
|
||||
embeddedKafka.consumeFromEmbeddedTopics(consumer, "counts", "counts-1", "counts-2");
|
||||
embeddedKafka.consumeFromEmbeddedTopics(consumer, "counts", "counts-1", "counts-2", "counts-5", "counts-6");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
@@ -177,22 +178,23 @@ public class KafkaStreamsBinderWordCountFunctionTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKstreamWordCountFunctionWithGeneratedApplicationId() throws Exception {
|
||||
public void testKstreamWordCountWithApplicationIdSpecifiedAtDefaultConsumer() {
|
||||
SpringApplication app = new SpringApplication(WordCountProcessorApplication.class);
|
||||
app.setWebApplicationType(WebApplicationType.NONE);
|
||||
|
||||
try (ConfigurableApplicationContext context = app.run(
|
||||
"--server.port=0",
|
||||
try (ConfigurableApplicationContext context = app.run("--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.bindings.process-in-0.destination=words-1",
|
||||
"--spring.cloud.stream.bindings.process-out-0.destination=counts-1",
|
||||
"--spring.cloud.stream.bindings.process-in-0.destination=words-5",
|
||||
"--spring.cloud.stream.bindings.process-out-0.destination=counts-5",
|
||||
"--spring.cloud.stream.kafka.streams.default.consumer.application-id=testKstreamWordCountWithApplicationIdSpecifiedAtDefaultConsumer",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde" +
|
||||
"=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde" +
|
||||
"=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString())) {
|
||||
receiveAndValidate("words-1", "counts-1");
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde"
|
||||
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
|
||||
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"--spring.cloud.stream.kafka.binder.brokers="
|
||||
+ embeddedKafka.getBrokersAsString())) {
|
||||
receiveAndValidate("words-5", "counts-5");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -204,6 +206,7 @@ public class KafkaStreamsBinderWordCountFunctionTests {
|
||||
try (ConfigurableApplicationContext context = app.run(
|
||||
"--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.kafka.streams.binder.application-id=testKstreamWordCountFunctionWithCustomProducerStreamPartitioner",
|
||||
"--spring.cloud.stream.bindings.process-in-0.destination=words-2",
|
||||
"--spring.cloud.stream.bindings.process-out-0.destination=counts-2",
|
||||
"--spring.cloud.stream.bindings.process-out-0.producer.partitionCount=2",
|
||||
@@ -280,6 +283,45 @@ public class KafkaStreamsBinderWordCountFunctionTests {
|
||||
}
|
||||
}
|
||||
|
||||
// The following test verifies the fixes made for this issue:
|
||||
// https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/774
|
||||
@Test
|
||||
public void testOutboundNullValueIsHandledGracefully()
|
||||
throws Exception {
|
||||
SpringApplication app = new SpringApplication(OutboundNullApplication.class);
|
||||
app.setWebApplicationType(WebApplicationType.NONE);
|
||||
|
||||
try (ConfigurableApplicationContext context = app.run("--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.bindings.process-in-0.destination=words-6",
|
||||
"--spring.cloud.stream.bindings.process-out-0.destination=counts-6",
|
||||
"--spring.cloud.stream.bindings.process-out-0.producer.useNativeEncoding=false",
|
||||
"--spring.cloud.stream.kafka.streams.default.consumer.application-id=testOutboundNullValueIsHandledGracefully",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde"
|
||||
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
|
||||
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"--spring.cloud.stream.kafka.binder.brokers="
|
||||
+ embeddedKafka.getBrokersAsString())) {
|
||||
|
||||
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
|
||||
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(
|
||||
senderProps);
|
||||
try {
|
||||
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
|
||||
template.setDefaultTopic("words-6");
|
||||
template.sendDefault("foobar");
|
||||
ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer,
|
||||
"counts-6");
|
||||
assertThat(cr.value() == null).isTrue();
|
||||
}
|
||||
finally {
|
||||
pf.destroy();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void receiveAndValidate(String in, String out) {
|
||||
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
|
||||
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
|
||||
@@ -387,4 +429,20 @@ public class KafkaStreamsBinderWordCountFunctionTests {
|
||||
return (t, k, v, n) -> k.equals("foo") ? 0 : 1;
|
||||
}
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
static class OutboundNullApplication {
|
||||
|
||||
@Bean
|
||||
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
|
||||
return input -> input
|
||||
.flatMapValues(
|
||||
value -> Arrays.asList(value.toLowerCase().split("\\W+")))
|
||||
.map((key, value) -> new KeyValue<>(value, value))
|
||||
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
|
||||
.windowedBy(TimeWindows.of(Duration.ofSeconds(5))).count(Materialized.as("foobar-WordCounts"))
|
||||
.toStream()
|
||||
.map((key, value) -> new KeyValue<>(null, null));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2017-2018 the original author or authors.
|
||||
* Copyright 2017-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -21,6 +21,7 @@ import java.util.Arrays;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
@@ -42,10 +43,6 @@ import org.junit.Test;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.annotation.Input;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.integration.test.util.TestUtils;
|
||||
@@ -57,7 +54,6 @@ import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.test.EmbeddedKafkaBroker;
|
||||
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
|
||||
import org.springframework.kafka.test.utils.KafkaTestUtils;
|
||||
import org.springframework.messaging.handler.annotation.SendTo;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@@ -66,11 +62,11 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
* @author Soby Chacko
|
||||
* @author Gary Russell
|
||||
*/
|
||||
public class KafkaStreamsBinderWordCountIntegrationTests {
|
||||
public class KafkaStreamsBinderTombstoneTests {
|
||||
|
||||
@ClassRule
|
||||
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
|
||||
"counts", "counts-1");
|
||||
"counts-1");
|
||||
|
||||
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule
|
||||
.getEmbeddedKafka();
|
||||
@@ -85,7 +81,7 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
|
||||
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
|
||||
consumerProps);
|
||||
consumer = cf.createConsumer();
|
||||
embeddedKafka.consumeFromEmbeddedTopics(consumer, "counts", "counts-1");
|
||||
embeddedKafka.consumeFromEmbeddedTopics(consumer, "counts-1");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
@@ -93,31 +89,6 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
|
||||
consumer.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKstreamWordCountWithApplicationIdSpecifiedAtDefaultConsumer()
|
||||
throws Exception {
|
||||
SpringApplication app = new SpringApplication(
|
||||
WordCountProcessorApplication.class);
|
||||
app.setWebApplicationType(WebApplicationType.NONE);
|
||||
|
||||
try (ConfigurableApplicationContext context = app.run("--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.bindings.input.destination=words",
|
||||
"--spring.cloud.stream.bindings.output.destination=counts",
|
||||
"--spring.cloud.stream.kafka.streams.default.consumer.application-id=testKstreamWordCountWithApplicationIdSpecifiedAtDefaultConsumer",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde"
|
||||
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
|
||||
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"--spring.cloud.stream.kafka.streams.timeWindow.length=5000",
|
||||
"--spring.cloud.stream.kafka.streams.timeWindow.advanceBy=0",
|
||||
"--spring.cloud.stream.kafka.binder.brokers="
|
||||
+ embeddedKafka.getBrokersAsString())) {
|
||||
receiveAndValidate("words", "counts");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendToTombstone()
|
||||
throws Exception {
|
||||
@@ -127,24 +98,22 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
|
||||
|
||||
try (ConfigurableApplicationContext context = app.run("--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.bindings.input.destination=words-1",
|
||||
"--spring.cloud.stream.bindings.output.destination=counts-1",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=testKstreamWordCountWithInputBindingLevelApplicationId",
|
||||
"--spring.cloud.stream.bindings.process-in-0.destination=words-1",
|
||||
"--spring.cloud.stream.bindings.process-out-0.destination=counts-1",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.application-id=testKstreamWordCountWithInputBindingLevelApplicationId",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde"
|
||||
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
|
||||
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde=org.springframework.kafka.support.serializer.JsonSerde",
|
||||
"--spring.cloud.stream.kafka.streams.timeWindow.length=5000",
|
||||
"--spring.cloud.stream.kafka.streams.timeWindow.advanceBy=0",
|
||||
"--spring.cloud.stream.bindings.input.consumer.concurrency=2",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=org.springframework.kafka.support.serializer.JsonSerde",
|
||||
"--spring.cloud.stream.bindings.process-in-0.consumer.concurrency=2",
|
||||
"--spring.cloud.stream.kafka.streams.binder.brokers="
|
||||
+ embeddedKafka.getBrokersAsString())) {
|
||||
receiveAndValidate("words-1", "counts-1");
|
||||
// Assertions on StreamBuilderFactoryBean
|
||||
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context
|
||||
.getBean("&stream-builder-WordCountProcessorApplication-process", StreamsBuilderFactoryBean.class);
|
||||
.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
|
||||
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
|
||||
assertThat(kafkaStreams).isNotNull();
|
||||
// Ensure that concurrency settings are mapped to number of stream task
|
||||
@@ -200,26 +169,21 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
|
||||
}
|
||||
}
|
||||
|
||||
@EnableBinding(KafkaStreamsProcessor.class)
|
||||
@EnableAutoConfiguration
|
||||
static class WordCountProcessorApplication {
|
||||
|
||||
@StreamListener
|
||||
@SendTo("output")
|
||||
public KStream<?, WordCount> process(
|
||||
@Input("input") KStream<Object, String> input) {
|
||||
@Bean
|
||||
public Function<KStream<Object, String>, KStream<String, WordCount>> process() {
|
||||
|
||||
return input
|
||||
.flatMapValues(
|
||||
value -> Arrays.asList(value.toLowerCase().split("\\W+")))
|
||||
return input -> input
|
||||
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
|
||||
.map((key, value) -> new KeyValue<>(value, value))
|
||||
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
|
||||
.windowedBy(TimeWindows.of(Duration.ofSeconds(5))).count(Materialized.as("foo-WordCounts"))
|
||||
.windowedBy(TimeWindows.of(5000))
|
||||
.count(Materialized.as("foo-WordCounts"))
|
||||
.toStream()
|
||||
.map((key, value) -> new KeyValue<>(null,
|
||||
new WordCount(key.key(), value,
|
||||
new Date(key.window().start()),
|
||||
new Date(key.window().end()))));
|
||||
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
|
||||
new Date(key.window().start()), new Date(key.window().end()))));
|
||||
}
|
||||
|
||||
@Bean
|
||||
@@ -1,146 +0,0 @@
|
||||
/*
|
||||
* Copyright 2019-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams.integration;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.apache.kafka.streams.kstream.Materialized;
|
||||
import org.apache.kafka.streams.kstream.Serialized;
|
||||
import org.apache.kafka.streams.kstream.TimeWindows;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.annotation.Input;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.test.EmbeddedKafkaBroker;
|
||||
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
|
||||
import org.springframework.kafka.test.utils.KafkaTestUtils;
|
||||
import org.springframework.messaging.handler.annotation.SendTo;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class OutboundValueNullSkippedConversionTest {
|
||||
|
||||
@ClassRule
|
||||
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
|
||||
"counts");
|
||||
|
||||
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule
|
||||
.getEmbeddedKafka();
|
||||
|
||||
private static Consumer<String, String> consumer;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() {
|
||||
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group", "false",
|
||||
embeddedKafka);
|
||||
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
|
||||
consumerProps);
|
||||
consumer = cf.createConsumer();
|
||||
embeddedKafka.consumeFromEmbeddedTopics(consumer, "counts");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
consumer.close();
|
||||
}
|
||||
|
||||
// The following test verifies the fixes made for this issue:
|
||||
// https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/774
|
||||
@Test
|
||||
public void testOutboundNullValueIsHandledGracefully()
|
||||
throws Exception {
|
||||
SpringApplication app = new SpringApplication(
|
||||
OutboundNullApplication.class);
|
||||
app.setWebApplicationType(WebApplicationType.NONE);
|
||||
|
||||
try (ConfigurableApplicationContext context = app.run("--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.bindings.input.destination=words",
|
||||
"--spring.cloud.stream.bindings.output.destination=counts",
|
||||
"--spring.cloud.stream.bindings.output.producer.useNativeEncoding=false",
|
||||
"--spring.cloud.stream.kafka.streams.default.consumer.application-id=testOutboundNullValueIsHandledGracefully",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde"
|
||||
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
|
||||
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"--spring.cloud.stream.kafka.streams.timeWindow.length=5000",
|
||||
"--spring.cloud.stream.kafka.streams.timeWindow.advanceBy=0",
|
||||
"--spring.cloud.stream.kafka.binder.brokers="
|
||||
+ embeddedKafka.getBrokersAsString())) {
|
||||
|
||||
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
|
||||
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(
|
||||
senderProps);
|
||||
try {
|
||||
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
|
||||
template.setDefaultTopic("words");
|
||||
template.sendDefault("foobar");
|
||||
ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer,
|
||||
"counts");
|
||||
assertThat(cr.value() == null).isTrue();
|
||||
}
|
||||
finally {
|
||||
pf.destroy();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@EnableBinding(KafkaStreamsProcessor.class)
|
||||
@EnableAutoConfiguration
|
||||
static class OutboundNullApplication {
|
||||
|
||||
@StreamListener
|
||||
@SendTo("output")
|
||||
public KStream<?, KafkaStreamsBinderWordCountIntegrationTests.WordCount> process(
|
||||
@Input("input") KStream<Object, String> input) {
|
||||
|
||||
return input
|
||||
.flatMapValues(
|
||||
value -> Arrays.asList(value.toLowerCase().split("\\W+")))
|
||||
.map((key, value) -> new KeyValue<>(value, value))
|
||||
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
|
||||
.windowedBy(TimeWindows.of(Duration.ofSeconds(5))).count(Materialized.as("foo-WordCounts"))
|
||||
.toStream()
|
||||
.map((key, value) -> new KeyValue<>(null, null));
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user