4.0.x changes for Kafka Streams tests

Migrating StreamListener based Kafka Streams binder tests to
use the funcitonal model
This commit is contained in:
Soby Chacko
2021-11-08 19:21:49 -05:00
parent d37cbc79d7
commit c32be995f6
13 changed files with 85 additions and 1303 deletions

View File

@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -38,7 +39,6 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
@@ -46,9 +46,6 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
@@ -61,7 +58,6 @@ import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.internal.verification.VerificationModeFactory.times;
@@ -125,12 +121,13 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
}
@Test
@Ignore
public void testKstreamBinderWithPojoInputAndStringOuput() throws Exception {
public void testKstreamBinderWithPojoInputAndStringOuput() {
SpringApplication app = new SpringApplication(ProductCountApplication.class);
app.setWebApplicationType(WebApplicationType.NONE);
ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.function.bindings.process-in-0=input",
"--spring.cloud.stream.function.bindings.process-out-0=output",
"--spring.cloud.stream.bindings.input.destination=foos",
"--spring.cloud.stream.bindings.output.destination=counts-id",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
@@ -203,15 +200,13 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
}
@EnableBinding(KafkaStreamsProcessor.class)
@EnableAutoConfiguration
public static class ProductCountApplication {
@StreamListener("input")
@SendTo("output")
public KStream<?, String> process(KStream<Object, Product> input) {
@Bean
public Function<KStream<Object, Product>, KStream<?, String>> process() {
return input.filter((key, product) -> product.getId() == 123)
return input -> input.filter((key, product) -> product.getId() == 123)
.map((key, value) -> new KeyValue<>(value.id, value))
.groupByKey(Grouped.with(new Serdes.IntegerSerde(),
new JsonSerde<>(Product.class)))

View File

@@ -1,271 +0,0 @@
/*
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams.integration;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.cloud.stream.binder.kafka.utils.DlqPartitionFunction;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
/**
* @author Soby Chacko
*/
@RunWith(SpringRunner.class)
@ContextConfiguration
@DirtiesContext
public abstract class DeserializationErrorHandlerByKafkaTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
"abc-DeserializationErrorHandlerByKafkaTests-In",
"xyz-DeserializationErrorHandlerByKafkaTests-In",
"DeserializationErrorHandlerByKafkaTests-out",
"error.abc-DeserializationErrorHandlerByKafkaTests-In.group",
"error.xyz-DeserializationErrorHandlerByKafkaTests-In.group",
"error.word1.groupx",
"error.word2.groupx");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule
.getEmbeddedKafka();
@SpyBean
org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate conversionDelegate;
private static Consumer<String, String> consumer;
@BeforeClass
public static void setUp() {
System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers",
embeddedKafka.getBrokersAsString());
System.setProperty("server.port", "0");
System.setProperty("spring.jmx.enabled", "false");
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("fooc", "false",
embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
consumerProps);
consumer = cf.createConsumer();
embeddedKafka.consumeFromEmbeddedTopics(consumer, "DeserializationErrorHandlerByKafkaTests-out", "DeserializationErrorHandlerByKafkaTests-out");
}
@AfterClass
public static void tearDown() {
consumer.close();
System.clearProperty("spring.cloud.stream.kafka.streams.binder.brokers");
System.clearProperty("server.port");
System.clearProperty("spring.jmx.enabled");
}
@SpringBootTest(properties = {
"spring.cloud.stream.bindings.input.destination=abc-DeserializationErrorHandlerByKafkaTests-In",
"spring.cloud.stream.bindings.output.destination=DeserializationErrorHandlerByKafkaTests-Out",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=deser-kafka-dlq",
"spring.cloud.stream.bindings.input.group=group",
"spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=sendToDlq",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde="
+ "org.apache.kafka.common.serialization.Serdes$IntegerSerde" }, webEnvironment = SpringBootTest.WebEnvironment.NONE)
public static class DeserializationByKafkaAndDlqTests
extends DeserializationErrorHandlerByKafkaTests {
@Test
@Ignore
public void test() {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(
senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("abc-DeserializationErrorHandlerByKafkaTests-In");
template.sendDefault(1, null, "foobar");
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("foobar",
"false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
consumerProps);
Consumer<String, String> consumer1 = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer1, "error.abc-DeserializationErrorHandlerByKafkaTests-In.group");
ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer1,
"error.abc-DeserializationErrorHandlerByKafkaTests-In.group");
assertThat(cr.value()).isEqualTo("foobar");
assertThat(cr.partition()).isEqualTo(0); // custom partition function
// Ensuring that the deserialization was indeed done by Kafka natively
verify(conversionDelegate, never()).deserializeOnInbound(any(Class.class),
any(KStream.class));
verify(conversionDelegate, never()).serializeOnOutbound(any(KStream.class));
}
}
@SpringBootTest(properties = {
"spring.cloud.stream.bindings.input.destination=xyz-DeserializationErrorHandlerByKafkaTests-In",
"spring.cloud.stream.bindings.output.destination=DeserializationErrorHandlerByKafkaTests-Out",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=deser-kafka-dlq",
"spring.cloud.stream.bindings.input.group=group",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.deserializationExceptionHandler=sendToDlq",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde="
+ "org.apache.kafka.common.serialization.Serdes$IntegerSerde" }, webEnvironment = SpringBootTest.WebEnvironment.NONE)
public static class DeserializationByKafkaAndDlqPerBindingTests
extends DeserializationErrorHandlerByKafkaTests {
@Test
public void test() {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(
senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("xyz-DeserializationErrorHandlerByKafkaTests-In");
template.sendDefault(1, null, "foobar");
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("foobar",
"false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
consumerProps);
Consumer<String, String> consumer1 = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer1, "error.xyz-DeserializationErrorHandlerByKafkaTests-In.group");
ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer1,
"error.xyz-DeserializationErrorHandlerByKafkaTests-In.group");
assertThat(cr.value()).isEqualTo("foobar");
assertThat(cr.partition()).isEqualTo(0); // custom partition function
// Ensuring that the deserialization was indeed done by Kafka natively
verify(conversionDelegate, never()).deserializeOnInbound(any(Class.class),
any(KStream.class));
verify(conversionDelegate, never()).serializeOnOutbound(any(KStream.class));
}
}
@SpringBootTest(properties = {
"spring.cloud.stream.bindings.input.destination=word1,word2",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=deser-kafka-dlq-multi-input",
"spring.cloud.stream.bindings.input.group=groupx",
"spring.cloud.stream.kafka.streams.binder.serdeError=sendToDlq",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde="
+ "org.apache.kafka.common.serialization.Serdes$IntegerSerde" }, webEnvironment = SpringBootTest.WebEnvironment.NONE)
// @checkstyle:on
public static class DeserializationByKafkaAndDlqTestsWithMultipleInputs
extends DeserializationErrorHandlerByKafkaTests {
@Test
public void test() {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(
senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("word1");
template.sendDefault("foobar");
template.setDefaultTopic("word2");
template.sendDefault("foobar");
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("foobarx",
"false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
consumerProps);
Consumer<String, String> consumer1 = cf.createConsumer();
embeddedKafka.consumeFromEmbeddedTopics(consumer1, "error.word1.groupx",
"error.word2.groupx");
ConsumerRecord<String, String> cr1 = KafkaTestUtils.getSingleRecord(consumer1,
"error.word1.groupx");
assertThat(cr1.value()).isEqualTo("foobar");
ConsumerRecord<String, String> cr2 = KafkaTestUtils.getSingleRecord(consumer1,
"error.word2.groupx");
assertThat(cr2.value()).isEqualTo("foobar");
// Ensuring that the deserialization was indeed done by Kafka natively
verify(conversionDelegate, never()).deserializeOnInbound(any(Class.class),
any(KStream.class));
verify(conversionDelegate, never()).serializeOnOutbound(any(KStream.class));
}
}
@EnableBinding(KafkaStreamsProcessor.class)
@EnableAutoConfiguration
@PropertySource("classpath:/org/springframework/cloud/stream/binder/kstream/integTest-1.properties")
public static class WordCountProcessorApplication {
@StreamListener("input")
@SendTo("output")
public KStream<?, String> process(KStream<Object, String> input) {
return input
.flatMapValues(
value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofMillis(5000))).count(Materialized.as("foo-WordCounts-x"))
.toStream().map((key, value) -> new KeyValue<>(null,
"Count for " + key.key() + " : " + value));
}
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, rec, ex) -> 0;
}
}
}

View File

@@ -1,286 +0,0 @@
/*
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams.integration;
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
/**
* @author Soby Chacko
*/
@RunWith(SpringRunner.class)
@ContextConfiguration
@DirtiesContext
public abstract class DeserializtionErrorHandlerByBinderTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
"foos", "goos",
"counts-id", "error.foos.foobar-group", "error.goos.foobar-group", "error.foos1.fooz-group",
"error.foos2.fooz-group");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule
.getEmbeddedKafka();
@SpyBean
org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate conversionDelegate;
private static Consumer<Integer, String> consumer;
@BeforeClass
public static void setUp() throws Exception {
System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers",
embeddedKafka.getBrokersAsString());
System.setProperty("server.port", "0");
System.setProperty("spring.jmx.enabled", "false");
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("kafka-streams-dlq-tests", "false",
embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(
consumerProps);
consumer = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "counts-id");
}
@AfterClass
public static void tearDown() {
consumer.close();
System.clearProperty("spring.cloud.stream.kafka.streams.binder.brokers");
System.clearProperty("server.port");
System.clearProperty("spring.jmx.enabled");
}
@SpringBootTest(properties = {
"spring.cloud.stream.bindings.input.consumer.useNativeDecoding=false",
"spring.cloud.stream.bindings.output.producer.useNativeEncoding=false",
"spring.cloud.stream.bindings.input.destination=foos",
"spring.cloud.stream.bindings.output.destination=counts-id",
"spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde"
+ "=org.apache.kafka.common.serialization.Serdes$IntegerSerde",
"spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
"spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler=sendToDlq",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id"
+ "=deserializationByBinderAndDlqTests",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqPartitions=1",
"spring.cloud.stream.bindings.input.group=foobar-group" }, webEnvironment = SpringBootTest.WebEnvironment.NONE)
public static class DeserializationByBinderAndDlqTests
extends DeserializtionErrorHandlerByBinderTests {
@Test
@Ignore
public void test() {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(
senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("foos");
template.sendDefault(1, 7, "hello");
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("foobar",
"false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
consumerProps);
Consumer<String, String> consumer1 = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer1,
"error.foos.foobar-group");
ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer1,
"error.foos.foobar-group");
assertThat(cr.value()).isEqualTo("hello");
assertThat(cr.partition()).isEqualTo(0);
// Ensuring that the deserialization was indeed done by the binder
verify(conversionDelegate).deserializeOnInbound(any(Class.class),
any(KStream.class));
}
}
@SpringBootTest(properties = {
"spring.cloud.stream.bindings.input.consumer.useNativeDecoding=false",
"spring.cloud.stream.bindings.output.producer.useNativeEncoding=false",
"spring.cloud.stream.bindings.input.destination=goos",
"spring.cloud.stream.bindings.output.destination=counts-id",
"spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde"
+ "=org.apache.kafka.common.serialization.Serdes$IntegerSerde",
"spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.deserializationExceptionHandler=sendToDlq",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id"
+ "=deserializationByBinderAndDlqTests",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqPartitions=1",
"spring.cloud.stream.bindings.input.group=foobar-group" }, webEnvironment = SpringBootTest.WebEnvironment.NONE)
public static class DeserializationByBinderAndDlqSetOnConsumerBindingTests
extends DeserializtionErrorHandlerByBinderTests {
@Test
public void test() {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(
senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("goos");
template.sendDefault(1, 7, "hello");
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("foobar",
"false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
consumerProps);
Consumer<String, String> consumer1 = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer1,
"error.goos.foobar-group");
ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer1,
"error.goos.foobar-group");
assertThat(cr.value()).isEqualTo("hello");
assertThat(cr.partition()).isEqualTo(0);
// Ensuring that the deserialization was indeed done by the binder
verify(conversionDelegate).deserializeOnInbound(any(Class.class),
any(KStream.class));
}
}
@SpringBootTest(properties = {
"spring.cloud.stream.bindings.input.consumer.useNativeDecoding=false",
"spring.cloud.stream.bindings.output.producer.useNativeEncoding=false",
"spring.cloud.stream.bindings.input.destination=foos1,foos2",
"spring.cloud.stream.bindings.output.destination=counts-id",
"spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde"
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
"spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
"spring.cloud.stream.kafka.streams.binder.serdeError=sendToDlq",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id"
+ "=deserializationByBinderAndDlqTestsWithMultipleInputs",
"spring.cloud.stream.bindings.input.group=fooz-group" }, webEnvironment = SpringBootTest.WebEnvironment.NONE)
public static class DeserializationByBinderAndDlqTestsWithMultipleInputs
extends DeserializtionErrorHandlerByBinderTests {
@Test
@SuppressWarnings("unchecked")
public void test() {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(
senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("foos1");
template.sendDefault("hello");
template.setDefaultTopic("foos2");
template.sendDefault("hello");
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("foobar1",
"false", embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
consumerProps);
Consumer<String, String> consumer1 = cf.createConsumer();
embeddedKafka.consumeFromEmbeddedTopics(consumer1, "error.foos1.fooz-group",
"error.foos2.fooz-group");
ConsumerRecord<String, String> cr1 = KafkaTestUtils.getSingleRecord(consumer1,
"error.foos1.fooz-group");
assertThat(cr1.value().equals("hello")).isTrue();
ConsumerRecord<String, String> cr2 = KafkaTestUtils.getSingleRecord(consumer1,
"error.foos2.fooz-group");
assertThat(cr2.value().equals("hello")).isTrue();
// Ensuring that the deserialization was indeed done by the binder
verify(conversionDelegate).deserializeOnInbound(any(Class.class),
any(KStream.class));
}
}
@EnableBinding(KafkaStreamsProcessor.class)
@EnableAutoConfiguration
public static class ProductCountApplication {
@StreamListener("input")
@SendTo("output")
public KStream<Integer, Long> process(KStream<Object, Product> input) {
return input.filter((key, product) -> product.getId() == 123)
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Grouped.with(new JsonSerde<>(Product.class),
new JsonSerde<>(Product.class)))
.windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
.count(Materialized.as("id-count-store-x")).toStream()
.map((key, value) -> new KeyValue<>(key.key().id, value));
}
}
static class Product {
Integer id;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
}
}

View File

@@ -20,6 +20,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -39,12 +40,7 @@ import org.springframework.boot.actuate.health.CompositeHealthContributor;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.KafkaStreamsCustomizer;
@@ -56,7 +52,6 @@ import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@@ -207,6 +202,8 @@ public class KafkaStreamsBinderHealthIndicatorTests {
SpringApplication app = new SpringApplication(KStreamApplication.class);
app.setWebApplicationType(WebApplicationType.NONE);
return app.run("--server.port=0", "--spring.jmx.enabled=false",
"--spring.cloud.stream.function.bindings.process-in-0=input",
"--spring.cloud.stream.function.bindings.process-out-0=output",
"--spring.cloud.stream.bindings.input.destination=in",
"--spring.cloud.stream.bindings.output.destination=out",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
@@ -225,6 +222,11 @@ public class KafkaStreamsBinderHealthIndicatorTests {
SpringApplication app = new SpringApplication(AnotherKStreamApplication.class);
app.setWebApplicationType(WebApplicationType.NONE);
return app.run("--server.port=0", "--spring.jmx.enabled=false",
"--spring.cloud.function.definition=process;process2",
"--spring.cloud.stream.function.bindings.process-in-0=input",
"--spring.cloud.stream.function.bindings.process-out-0=output",
"--spring.cloud.stream.function.bindings.process2-in-0=input2",
"--spring.cloud.stream.function.bindings.process2-out-0=output2",
"--spring.cloud.stream.bindings.input.destination=in",
"--spring.cloud.stream.bindings.output.destination=out",
"--spring.cloud.stream.bindings.input2.destination=in2",
@@ -242,14 +244,12 @@ public class KafkaStreamsBinderHealthIndicatorTests {
+ embeddedKafka.getBrokersAsString());
}
@EnableBinding(KafkaStreamsProcessor.class)
@EnableAutoConfiguration
public static class KStreamApplication {
@StreamListener("input")
@SendTo("output")
public KStream<Object, Product> process(KStream<Object, Product> input) {
return input.filter((key, product) -> {
@Bean
public Function<KStream<Object, Product>, KStream<Object, Product>> process() {
return input -> input.filter((key, product) -> {
if (product.getId() != 123) {
throw new IllegalArgumentException();
}
@@ -259,14 +259,12 @@ public class KafkaStreamsBinderHealthIndicatorTests {
}
@EnableBinding({ KafkaStreamsProcessor.class, KafkaStreamsProcessorX.class })
@EnableAutoConfiguration
public static class AnotherKStreamApplication {
@StreamListener("input")
@SendTo("output")
public KStream<Object, Product> process(KStream<Object, Product> input) {
return input.filter((key, product) -> {
@Bean
public Function<KStream<Object, Product>, KStream<Object, Product>> process() {
return input -> input.filter((key, product) -> {
if (product.getId() != 123) {
throw new IllegalArgumentException();
}
@@ -274,10 +272,9 @@ public class KafkaStreamsBinderHealthIndicatorTests {
});
}
@StreamListener("input2")
@SendTo("output2")
public KStream<Object, Product> process2(KStream<Object, Product> input) {
return input.filter((key, product) -> {
@Bean
public Function<KStream<Object, Product>, KStream<Object, Product>> process2() {
return input -> input.filter((key, product) -> {
if (product.getId() != 123) {
throw new IllegalArgumentException();
}
@@ -300,16 +297,6 @@ public class KafkaStreamsBinderHealthIndicatorTests {
}
public interface KafkaStreamsProcessorX {
@Input("input2")
KStream<?, ?> input();
@Output("output2")
KStream<?, ?> output();
}
public static class Product {
Integer id;
@@ -323,5 +310,4 @@ public class KafkaStreamsBinderHealthIndicatorTests {
}
}
}

View File

@@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -37,10 +38,6 @@ import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.CleanupConfig;
@@ -50,7 +47,6 @@ import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
import static org.assertj.core.api.Assertions.assertThat;
@@ -100,6 +96,8 @@ public class KafkaStreamsBinderMultipleInputTopicsTest {
ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.function.bindings.process-in-0=input",
"--spring.cloud.stream.function.bindings.process-out-0=output",
"--spring.cloud.stream.bindings.input.destination=words1,words2",
"--spring.cloud.stream.bindings.output.destination=counts",
"--spring.cloud.stream.bindings.output.contentType=application/json",
@@ -146,21 +144,13 @@ public class KafkaStreamsBinderMultipleInputTopicsTest {
assertThat(wordCounts.contains("{\"word\":\"foobar2\",\"count\":1}")).isTrue();
}
@EnableBinding(KafkaStreamsProcessor.class)
@EnableAutoConfiguration
static class WordCountProcessorApplication {
@StreamListener
@SendTo("output")
public KStream<?, WordCount> process(
@Input("input") KStream<Object, String> input) {
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
input.map((k, v) -> {
System.out.println(k);
System.out.println(v);
return new KeyValue<>(k, v);
});
return input
return input -> input
.flatMapValues(
value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))

View File

@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka.streams.integration;
import java.time.Duration;
import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -36,10 +37,8 @@ import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
@@ -47,7 +46,6 @@ import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
import static org.assertj.core.api.Assertions.assertThat;
@@ -89,6 +87,8 @@ public class KafkaStreamsBinderPojoInputAndPrimitiveTypeOutputTests {
app.setWebApplicationType(WebApplicationType.NONE);
ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.function.bindings.process-in-0=input",
"--spring.cloud.stream.function.bindings.process-out-0=output",
"--spring.cloud.stream.bindings.input.destination=foos",
"--spring.cloud.stream.bindings.output.destination=counts-id",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
@@ -122,24 +122,19 @@ public class KafkaStreamsBinderPojoInputAndPrimitiveTypeOutputTests {
assertThat(cr.value()).isEqualTo(1L);
}
@EnableBinding(KafkaStreamsProcessor.class)
@EnableAutoConfiguration
public static class ProductCountApplication {
@StreamListener("input")
@SendTo("output")
public KStream<Integer, Long> process(KStream<Object, Product> input) {
return input.filter((key, product) -> product.getId() == 123)
@Bean
public Function<KStream<Object, Product>, KStream<Integer, Long>> process() {
return input -> input.filter((key, product) -> product.getId() == 123)
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Grouped.with(new JsonSerde<>(Product.class),
new JsonSerde<>(Product.class)))
.windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
.count(Materialized.as("id-count-store-x")).toStream()
.map((key, value) -> {
return new KeyValue<>(key.key().id, value);
});
.map((key, value) -> new KeyValue<>(key.key().id, value));
}
}
public static class Product {

View File

@@ -1,186 +0,0 @@
/*
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams.integration;
import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.SpyBean;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.util.StopWatch;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
/**
* @author Soby Chacko
*/
@RunWith(SpringRunner.class)
@ContextConfiguration
@DirtiesContext
public abstract class KafkaStreamsNativeEncodingDecodingTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
"decode-counts", "decode-counts-1");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule
.getEmbeddedKafka();
@SpyBean
org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate conversionDelegate;
private static Consumer<String, String> consumer;
@BeforeClass
public static void setUp() {
System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers",
embeddedKafka.getBrokersAsString());
System.setProperty("server.port", "0");
System.setProperty("spring.jmx.enabled", "false");
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group", "false",
embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(
consumerProps);
consumer = cf.createConsumer();
embeddedKafka.consumeFromEmbeddedTopics(consumer, "decode-counts", "decode-counts-1");
}
@AfterClass
public static void tearDown() {
consumer.close();
System.clearProperty("spring.cloud.stream.kafka.streams.binder.brokers");
System.clearProperty("server.port");
System.clearProperty("spring.jmx.enabled");
}
@SpringBootTest(properties = {
"spring.cloud.stream.bindings.input.destination=decode-words-1",
"spring.cloud.stream.bindings.output.destination=decode-counts-1",
"spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId"
+ "=NativeEncodingDecodingEnabledTests-abc" }, webEnvironment = SpringBootTest.WebEnvironment.NONE)
public static class NativeEncodingDecodingEnabledTests
extends KafkaStreamsNativeEncodingDecodingTests {
@Test
public void test() throws Exception {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(
senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("decode-words-1");
template.sendDefault("foobar");
ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer,
"decode-counts-1");
assertThat(cr.value().equals("Count for foobar : 1")).isTrue();
verify(conversionDelegate, never()).serializeOnOutbound(any(KStream.class));
verify(conversionDelegate, never()).deserializeOnInbound(any(Class.class),
any(KStream.class));
}
}
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
"spring.cloud.stream.bindings.input.destination=decode-words",
"spring.cloud.stream.bindings.output.destination=decode-counts",
"spring.cloud.stream.bindings.input.consumer.useNativeDecoding=false",
"spring.cloud.stream.bindings.output.producer.useNativeEncoding=false",
"spring.cloud.stream.kafka.streams.bindings.input3.consumer.applicationId"
+ "=hello-NativeEncodingDecodingEnabledTests-xyz" })
public static class NativeEncodingDecodingDisabledTests
extends KafkaStreamsNativeEncodingDecodingTests {
@Test
public void test() {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(
senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("decode-words");
template.sendDefault("foobar");
StopWatch stopWatch = new StopWatch();
stopWatch.start();
System.out.println("Starting: ");
ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer,
"decode-counts");
stopWatch.stop();
System.out.println("Total time: " + stopWatch.getTotalTimeSeconds());
assertThat(cr.value().equals("Count for foobar : 1")).isTrue();
verify(conversionDelegate).serializeOnOutbound(any(KStream.class));
verify(conversionDelegate).deserializeOnInbound(any(Class.class),
any(KStream.class));
}
}
@EnableBinding(KafkaStreamsProcessor.class)
@EnableAutoConfiguration
public static class WordCountProcessorApplication {
@StreamListener("input")
@SendTo("output")
public KStream<?, String> process(KStream<Object, String> input) {
return input
.flatMapValues(
value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(5))).count(Materialized.as("foo-WordCounts-x"))
.toStream().map((key, value) -> new KeyValue<>(null,
"Count for " + key.key() + " : " + value));
}
}
}

View File

@@ -18,6 +18,8 @@ package org.springframework.cloud.stream.binder.kafka.streams.integration;
import java.time.Duration;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
@@ -32,11 +34,6 @@ import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsStateStore;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsStateStoreProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -67,6 +64,7 @@ public class KafkaStreamsStateStoreIntegrationTests {
app.setWebApplicationType(WebApplicationType.NONE);
ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.function.bindings.process-in-0=input",
"--spring.cloud.stream.bindings.input.destination=foobar",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde"
@@ -89,41 +87,14 @@ public class KafkaStreamsStateStoreIntegrationTests {
}
}
@Test
public void testKstreamStateStoreBuilderBeansDefinedInApplication() throws Exception {
SpringApplication app = new SpringApplication(StateStoreBeanApplication.class);
app.setWebApplicationType(WebApplicationType.NONE);
ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input3.destination=foobar",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde"
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
"--spring.cloud.stream.kafka.streams.bindings.input3.consumer.applicationId"
+ "=KafkaStreamsStateStoreIntegrationTests-xyzabc-123",
"--spring.cloud.stream.kafka.streams.binder.brokers="
+ embeddedKafka.getBrokersAsString());
try {
Thread.sleep(2000);
receiveAndValidateFoo(context, StateStoreBeanApplication.class);
}
catch (Exception e) {
throw e;
}
finally {
context.close();
}
}
@Test
public void testSameStateStoreIsCreatedOnlyOnceWhenMultipleInputBindingsArePresent() throws Exception {
SpringApplication app = new SpringApplication(ProductCountApplicationWithMultipleInputBindings.class);
app.setWebApplicationType(WebApplicationType.NONE);
ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.function.bindings.process-in-0=input1",
"--spring.cloud.stream.function.bindings.process-in-1=input2",
"--spring.cloud.stream.bindings.input1.destination=foobar",
"--spring.cloud.stream.bindings.input2.destination=hello-foobar",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
@@ -171,22 +142,12 @@ public class KafkaStreamsStateStoreIntegrationTests {
assertThat(state.persistent()).isTrue();
assertThat(productCount.processed).isTrue();
}
else if (clazz.isAssignableFrom(StateStoreBeanApplication.class)) {
StateStoreBeanApplication productCount = context
.getBean(StateStoreBeanApplication.class);
WindowStore<Object, String> state = productCount.state;
assertThat(state != null).isTrue();
assertThat(state.name()).isEqualTo("mystate");
assertThat(state.persistent()).isTrue();
assertThat(productCount.processed).isTrue();
}
else {
fail("Expected assertiond did not happen");
fail("Expected assertions did not happen");
}
}
@EnableBinding(KafkaStreamsProcessorX.class)
@EnableAutoConfiguration
public static class ProductCountApplication {
@@ -194,46 +155,10 @@ public class KafkaStreamsStateStoreIntegrationTests {
boolean processed;
@StreamListener("input")
@KafkaStreamsStateStore(name = "mystate", type = KafkaStreamsStateStoreProperties.StoreType.WINDOW, lengthMs = 300000, retentionMs = 300000)
@SuppressWarnings({ "deprecation", "unchecked" })
public void process(KStream<Object, Product> input) {
@Bean
public Consumer<KStream<Object, Product>> process() {
input.process(() -> new Processor<Object, Product>() {
@Override
public void init(ProcessorContext processorContext) {
state = (WindowStore) processorContext.getStateStore("mystate");
}
@Override
public void process(Object s, Product product) {
processed = true;
}
@Override
public void close() {
if (state != null) {
state.close();
}
}
}, "mystate");
}
}
@EnableBinding(KafkaStreamsProcessorZ.class)
@EnableAutoConfiguration
public static class StateStoreBeanApplication {
WindowStore<Object, String> state;
boolean processed;
@StreamListener("input3")
@SuppressWarnings({"unchecked" })
public void process(KStream<Object, Product> input) {
input.process(() -> new Processor<Object, Product>() {
return input -> input.process(() -> new Processor<Object, Product>() {
@Override
public void init(ProcessorContext processorContext) {
@@ -263,8 +188,6 @@ public class KafkaStreamsStateStoreIntegrationTests {
}
}
@EnableBinding(KafkaStreamsProcessorY.class)
@EnableAutoConfiguration
public static class ProductCountApplicationWithMultipleInputBindings {
@@ -272,33 +195,41 @@ public class KafkaStreamsStateStoreIntegrationTests {
boolean processed;
@StreamListener
@KafkaStreamsStateStore(name = "mystate", type = KafkaStreamsStateStoreProperties.StoreType.WINDOW, lengthMs = 300000, retentionMs = 300000)
@SuppressWarnings({ "deprecation", "unchecked" })
public void process(@Input("input1")KStream<Object, Product> input, @Input("input2")KStream<Object, Product> input2) {
@Bean
public BiConsumer<KStream<Object, Product>, KStream<Object, Product>> process() {
input.process(() -> new Processor<Object, Product>() {
return (input, input2) -> {
@Override
public void init(ProcessorContext processorContext) {
state = (WindowStore) processorContext.getStateStore("mystate");
}
input.process(() -> new Processor<Object, Product>() {
@Override
public void process(Object s, Product product) {
processed = true;
}
@Override
public void close() {
if (state != null) {
state.close();
@Override
public void init(ProcessorContext processorContext) {
state = (WindowStore) processorContext.getStateStore("mystate");
}
}
}, "mystate");
//simple use of input2, we are not using input2 for anything other than triggering some test behavior.
input2.foreach((key, value) -> { });
@Override
public void process(Object s, Product product) {
processed = true;
}
@Override
public void close() {
if (state != null) {
state.close();
}
}
}, "mystate");
//simple use of input2, we are not using input2 for anything other than triggering some test behavior.
input2.foreach((key, value) -> { });
};
}
@Bean
public StoreBuilder mystore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("mystate",
Duration.ofMillis(3), Duration.ofMillis(3), false), Serdes.String(),
Serdes.String());
}
}
@@ -315,25 +246,4 @@ public class KafkaStreamsStateStoreIntegrationTests {
}
}
interface KafkaStreamsProcessorX {
@Input("input")
KStream<?, ?> input();
}
interface KafkaStreamsProcessorY {
@Input("input1")
KStream<?, ?> input1();
@Input("input2")
KStream<?, ?> input2();
}
interface KafkaStreamsProcessorZ {
@Input("input3")
KStream<?, ?> input3();
}
}

View File

@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka.streams.integration;
import java.time.Duration;
import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -35,10 +36,8 @@ import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.CleanupConfig;
@@ -49,7 +48,6 @@ import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
import static org.assertj.core.api.Assertions.assertThat;
@@ -91,6 +89,8 @@ public class KafkastreamsBinderPojoInputStringOutputIntegrationTests {
app.setWebApplicationType(WebApplicationType.NONE);
ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.function.bindings.process-in-0=input",
"--spring.cloud.stream.function.bindings.process-out-0=output",
"--spring.cloud.stream.bindings.input.destination=foos",
"--spring.cloud.stream.bindings.output.destination=counts-id",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
@@ -105,7 +105,7 @@ public class KafkastreamsBinderPojoInputStringOutputIntegrationTests {
receiveAndValidateFoo();
// Assertions on StreamBuilderFactoryBean
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context
.getBean("&stream-builder-ProductCountApplication-process", StreamsBuilderFactoryBean.class);
.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
CleanupConfig cleanup = TestUtils.getPropertyValue(streamsBuilderFactoryBean,
"cleanupConfig", CleanupConfig.class);
assertThat(cleanup.cleanupOnStart()).isFalse();
@@ -128,15 +128,12 @@ public class KafkastreamsBinderPojoInputStringOutputIntegrationTests {
assertThat(cr.value().contains("Count for product with ID 123: 1")).isTrue();
}
@EnableBinding(KafkaStreamsProcessor.class)
@EnableAutoConfiguration
public static class ProductCountApplication {
@StreamListener("input")
@SendTo("output")
public KStream<Integer, String> process(KStream<Object, Product> input) {
return input.filter((key, product) -> product.getId() == 123)
@Bean
public Function<KStream<Object, Product>, KStream<Integer, String>> process() {
return input -> input.filter((key, product) -> product.getId() == 123)
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Grouped.with(new JsonSerde<>(Product.class),
new JsonSerde<>(Product.class)))

View File

@@ -1,95 +0,0 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams.integration;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.stereotype.Component;
import static org.assertj.core.api.Assertions.assertThat;
public class MultiProcessorsWithSameNameAndBindingTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
"counts");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule
.getEmbeddedKafka();
@Test
public void testBinderStartsSuccessfullyWhenTwoProcessorsWithSameNamesAndBindingsPresent() {
SpringApplication app = new SpringApplication(
MultiProcessorsWithSameNameAndBindingTests.WordCountProcessorApplication.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.destination=words",
"--spring.cloud.stream.bindings.input-1.destination=words",
"--spring.cloud.stream.bindings.output.destination=counts",
"--spring.cloud.stream.bindings.output.contentType=application/json",
"--spring.cloud.stream.kafka.streams.binder.brokers="
+ embeddedKafka.getBrokersAsString())) {
StreamsBuilderFactoryBean streamsBuilderFactoryBean1 = context
.getBean("&stream-builder-Foo-process", StreamsBuilderFactoryBean.class);
assertThat(streamsBuilderFactoryBean1).isNotNull();
StreamsBuilderFactoryBean streamsBuilderFactoryBean2 = context
.getBean("&stream-builder-Bar-process", StreamsBuilderFactoryBean.class);
assertThat(streamsBuilderFactoryBean2).isNotNull();
}
}
@EnableBinding(KafkaStreamsProcessorX.class)
@EnableAutoConfiguration
static class WordCountProcessorApplication {
@Component
static class Foo {
@StreamListener
public void process(@Input("input-1") KStream<Object, String> input) {
}
}
//Second class with a stub processor that has the same name as above ("process")
@Component
static class Bar {
@StreamListener
public void process(@Input("input-1") KStream<Object, String> input) {
}
}
}
interface KafkaStreamsProcessorX {
@Input("input-1")
KStream<?, ?> input1();
}
}

View File

@@ -1,184 +0,0 @@
/*
* Copyright 2017-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams.integration;
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import com.example.Sensor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.function.context.converter.avro.AvroSchemaMessageConverter;
import org.springframework.cloud.function.context.converter.avro.AvroSchemaServiceManagerImpl;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.cloud.stream.binder.kafka.streams.integration.utils.TestAvroSerializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeTypeUtils;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Soby Chacko
*/
public class PerRecordAvroContentTypeTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
"received-sensors");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule
.getEmbeddedKafka();
private static Consumer<String, byte[]> consumer;
@BeforeClass
public static void setUp() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("avro-ct-test",
"false", embeddedKafka);
// Receive the data as byte[]
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<String, byte[]> cf = new DefaultKafkaConsumerFactory<>(
consumerProps);
consumer = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "received-sensors");
}
@AfterClass
public static void tearDown() {
consumer.close();
}
@Test
public void testPerRecordAvroConentTypeAndVerifySerialization() throws Exception {
SpringApplication app = new SpringApplication(SensorCountAvroApplication.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext ignored = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.consumer.useNativeDecoding=false",
"--spring.cloud.stream.bindings.output.producer.useNativeEncoding=false",
"--spring.cloud.stream.bindings.input.destination=sensors",
"--spring.cloud.stream.bindings.output.destination=received-sensors",
"--spring.cloud.stream.bindings.output.contentType=application/avro",
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.application-id=per-record-avro-contentType-test",
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
"--spring.cloud.stream.kafka.streams.binder.brokers="
+ embeddedKafka.getBrokersAsString())) {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
// Use a custom avro test serializer
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
TestAvroSerializer.class);
DefaultKafkaProducerFactory<Integer, Sensor> pf = new DefaultKafkaProducerFactory<>(
senderProps);
try {
KafkaTemplate<Integer, Sensor> template = new KafkaTemplate<>(pf, true);
Random random = new Random();
Sensor sensor = new Sensor();
sensor.setId(UUID.randomUUID().toString() + "-v1");
sensor.setAcceleration(random.nextFloat() * 10);
sensor.setVelocity(random.nextFloat() * 100);
sensor.setTemperature(random.nextFloat() * 50);
// Send with avro content type set.
Message<?> message = MessageBuilder.withPayload(sensor)
.setHeader("contentType", "application/avro").build();
template.setDefaultTopic("sensors");
template.send(message);
// Serialized byte[] ^^ is received by the binding process and deserialzed
// it using avro converter.
// Then finally, the data will be output to a return topic as byte[]
// (using the same avro converter).
// Receive the byte[] from return topic
ConsumerRecord<String, byte[]> cr = KafkaTestUtils
.getSingleRecord(consumer, "received-sensors");
final byte[] value = cr.value();
// Convert the byte[] received back to avro object and verify that it is
// the same as the one we sent ^^.
AvroSchemaMessageConverter avroSchemaMessageConverter = new AvroSchemaMessageConverter(new AvroSchemaServiceManagerImpl());
Message<?> receivedMessage = MessageBuilder.withPayload(value)
.setHeader("contentType",
MimeTypeUtils.parseMimeType("application/avro"))
.build();
Sensor messageConverted = (Sensor) avroSchemaMessageConverter
.fromMessage(receivedMessage, Sensor.class);
assertThat(messageConverted).isEqualTo(sensor);
}
finally {
pf.destroy();
}
}
}
@EnableBinding(KafkaStreamsProcessor.class)
@EnableAutoConfiguration
static class SensorCountAvroApplication {
@StreamListener
@SendTo("output")
public KStream<?, Sensor> process(@Input("input") KStream<Object, Sensor> input) {
// return the same Sensor object unchanged so that we can do test
// verifications
return input.map(KeyValue::new);
}
@Bean
public MessageConverter sensorMessageConverter() throws IOException {
return new AvroSchemaMessageConverter(new AvroSchemaServiceManagerImpl());
}
}
}

View File

@@ -1,63 +0,0 @@
/*
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.streams.integration.utils;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.cloud.function.context.converter.avro.AvroSchemaMessageConverter;
import org.springframework.cloud.function.context.converter.avro.AvroSchemaServiceManagerImpl;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
/**
* Custom avro serializer intended to be used for testing only.
*
* @param <S> Target type to serialize
* @author Soby Chacko
*/
public class TestAvroSerializer<S> implements Serializer<S> {
public TestAvroSerializer() {
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public byte[] serialize(String topic, S data) {
AvroSchemaMessageConverter avroSchemaMessageConverter = new AvroSchemaMessageConverter(new AvroSchemaServiceManagerImpl());
Message<?> message = MessageBuilder.withPayload(data).build();
Map<String, Object> headers = new HashMap<>(message.getHeaders());
headers.put(MessageHeaders.CONTENT_TYPE, "application/avro");
MessageHeaders messageHeaders = new MessageHeaders(headers);
final Object payload = avroSchemaMessageConverter
.toMessage(message.getPayload(), messageHeaders).getPayload();
return (byte[]) payload;
}
@Override
public void close() {
}
}

View File

@@ -1,6 +0,0 @@
spring.cloud.stream.bindings.input.destination=DeserializationErrorHandlerByKafkaTests-In
spring.cloud.stream.bindings.output.destination=DeserializationErrorHandlerByKafkaTests-Out
spring.cloud.stream.bindings.output.contentType=application/json
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde