Compare commits
12 Commits
v4.0.0-M1
...
v2.1.2.REL
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8cac84629f | ||
|
|
ad7f92ff89 | ||
|
|
17baad5d8a | ||
|
|
38b06e32f3 | ||
|
|
41a3c741e3 | ||
|
|
1cf05e3085 | ||
|
|
d8a55b41c7 | ||
|
|
e96174664f | ||
|
|
fa6bf19ebe | ||
|
|
46051cd1a4 | ||
|
|
1cb8cbbcc0 | ||
|
|
87f91640f4 |
14
README.adoc
14
README.adoc
@@ -1,4 +1,8 @@
|
||||
// Do not edit this file (e.g. go instead to src/main/asciidoc)
|
||||
////
|
||||
DO NOT EDIT THIS FILE. IT WAS GENERATED.
|
||||
Manual changes to this file will be lost when it is generated again.
|
||||
Edit the files in the src/main/asciidoc/ directory instead.
|
||||
////
|
||||
|
||||
:jdkversion: 1.8
|
||||
:github-tag: master
|
||||
@@ -338,12 +342,20 @@ Ignored if `replicas-assignments` is present.
|
||||
+
|
||||
Default: none (the binder-wide default of 1 is used).
|
||||
|
||||
|
||||
NOTE: The Kafka binder uses the `partitionCount` setting of the producer as a hint to create a topic with the given partition count (in conjunction with the `minPartitionCount`, the maximum of the two being the value being used).
|
||||
Exercise caution when configuring both `minPartitionCount` for a binder and `partitionCount` for an application, as the larger value is used.
|
||||
If a topic already exists with a smaller partition count and `autoAddPartitions` is disabled (the default), the binder fails to start.
|
||||
If a topic already exists with a smaller partition count and `autoAddPartitions` is enabled, new partitions are added.
|
||||
If a topic already exists with a larger number of partitions than the maximum of (`minPartitionCount` or `partitionCount`), the existing partition count is used.
|
||||
|
||||
compression::
|
||||
Set the `compression.type` producer property.
|
||||
Supported values are `none`, `gzip`, `snappy` and `lz4`.
|
||||
If you override the `kafka-clients` jar to 2.1.0 (or later), as discussed in the https://docs.spring.io/spring-kafka/docs/2.2.x/reference/html/deps-for-21x.html[Spring for Apache Kafka documentation], and wish to use `zstd` compression, use `spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd`.
|
||||
+
|
||||
Default: `none`.
|
||||
|
||||
==== Usage examples
|
||||
|
||||
In this section, we show the use of the preceding properties for specific scenarios.
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.1.1.BUILD-SNAPSHOT</version>
|
||||
<version>2.1.2.RELEASE</version>
|
||||
</parent>
|
||||
<packaging>pom</packaging>
|
||||
<name>spring-cloud-stream-binder-kafka-docs</name>
|
||||
|
||||
64
pom.xml
64
pom.xml
@@ -2,12 +2,12 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.1.1.BUILD-SNAPSHOT</version>
|
||||
<version>2.1.2.RELEASE</version>
|
||||
<packaging>pom</packaging>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-build</artifactId>
|
||||
<version>2.1.0.BUILD-SNAPSHOT</version>
|
||||
<version>2.1.3.RELEASE</version>
|
||||
<relativePath />
|
||||
</parent>
|
||||
<properties>
|
||||
@@ -15,7 +15,7 @@
|
||||
<spring-kafka.version>2.2.2.RELEASE</spring-kafka.version>
|
||||
<spring-integration-kafka.version>3.1.0.RELEASE</spring-integration-kafka.version>
|
||||
<kafka.version>2.0.0</kafka.version>
|
||||
<spring-cloud-stream.version>2.1.0.BUILD-SNAPSHOT</spring-cloud-stream.version>
|
||||
<spring-cloud-stream.version>2.1.2.RELEASE</spring-cloud-stream.version>
|
||||
</properties>
|
||||
<modules>
|
||||
<module>spring-cloud-stream-binder-kafka</module>
|
||||
@@ -142,35 +142,35 @@
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-checkstyle-plugin</artifactId>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-tools</artifactId>
|
||||
<version>${spring-cloud-stream.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>checkstyle-validation</id>
|
||||
<phase>validate</phase>
|
||||
<configuration>
|
||||
<configLocation>checkstyle.xml</configLocation>
|
||||
<headerLocation>checkstyle-header.txt</headerLocation>
|
||||
<suppressionsLocation>checkstyle-suppressions.xml</suppressionsLocation>
|
||||
<encoding>UTF-8</encoding>
|
||||
<consoleOutput>true</consoleOutput>
|
||||
<failsOnError>true</failsOnError>
|
||||
<includeTestSourceDirectory>true</includeTestSourceDirectory>
|
||||
</configuration>
|
||||
<goals>
|
||||
<goal>check</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<!--<plugin>-->
|
||||
<!--<groupId>org.apache.maven.plugins</groupId>-->
|
||||
<!--<artifactId>maven-checkstyle-plugin</artifactId>-->
|
||||
<!--<dependencies>-->
|
||||
<!--<dependency>-->
|
||||
<!--<groupId>org.springframework.cloud</groupId>-->
|
||||
<!--<artifactId>spring-cloud-stream-tools</artifactId>-->
|
||||
<!--<version>${spring-cloud-stream.version}</version>-->
|
||||
<!--</dependency>-->
|
||||
<!--</dependencies>-->
|
||||
<!--<executions>-->
|
||||
<!--<execution>-->
|
||||
<!--<id>checkstyle-validation</id>-->
|
||||
<!--<phase>validate</phase>-->
|
||||
<!--<configuration>-->
|
||||
<!--<configLocation>checkstyle.xml</configLocation>-->
|
||||
<!--<headerLocation>checkstyle-header.txt</headerLocation>-->
|
||||
<!--<suppressionsLocation>checkstyle-suppressions.xml</suppressionsLocation>-->
|
||||
<!--<encoding>UTF-8</encoding>-->
|
||||
<!--<consoleOutput>true</consoleOutput>-->
|
||||
<!--<failsOnError>true</failsOnError>-->
|
||||
<!--<includeTestSourceDirectory>true</includeTestSourceDirectory>-->
|
||||
<!--</configuration>-->
|
||||
<!--<goals>-->
|
||||
<!--<goal>check</goal>-->
|
||||
<!--</goals>-->
|
||||
<!--</execution>-->
|
||||
<!--</executions>-->
|
||||
<!--</plugin>-->
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.1.1.BUILD-SNAPSHOT</version>
|
||||
<version>2.1.2.RELEASE</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<description>Spring Cloud Starter Stream Kafka</description>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.1.1.BUILD-SNAPSHOT</version>
|
||||
<version>2.1.2.RELEASE</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder Core</description>
|
||||
|
||||
@@ -16,6 +16,8 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.properties;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
|
||||
@@ -39,6 +41,11 @@ public class KafkaExtendedBindingProperties
|
||||
return DEFAULTS_PREFIX;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, KafkaBindingProperties> getBindings() {
|
||||
return this.doGetBindings();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
|
||||
return KafkaBindingProperties.class;
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.1.1.BUILD-SNAPSHOT</version>
|
||||
<version>2.1.2.RELEASE</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
|
||||
@@ -17,9 +17,11 @@
|
||||
package org.springframework.cloud.stream.binder.kafka.streams;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
@@ -78,14 +80,15 @@ import org.springframework.util.StringUtils;
|
||||
/**
|
||||
* Kafka Streams specific implementation for {@link StreamListenerSetupMethodOrchestrator}
|
||||
* that overrides the default mechanisms for invoking StreamListener adapters.
|
||||
*
|
||||
* <p>
|
||||
* The orchestration primarily focus on the following areas:
|
||||
* <p>
|
||||
* 1. Allow multiple KStream output bindings (KStream branching) by allowing more than one
|
||||
* output values on {@link SendTo} 2. Allow multiple inbound bindings for multiple KStream
|
||||
* and or KTable/GlobalKTable types. 3. Each StreamListener method that it orchestrates
|
||||
* gets its own {@link StreamsBuilderFactoryBean} and {@link StreamsConfig}
|
||||
*
|
||||
* 1. Allow multiple KStream output bindings (KStream branching) by allowing more than one output values on {@link SendTo}
|
||||
* 2. Allow multiple inbound bindings for multiple KStream and or KTable/GlobalKTable types.
|
||||
* 3. Each StreamListener method that it orchestrates gets its own {@link StreamsBuilderFactoryBean} and {@link StreamsConfig}
|
||||
*
|
||||
* @author Soby Chacko
|
||||
* @author Soby Chacko
|
||||
* @author Lei Chen
|
||||
* @author Gary Russell
|
||||
*/
|
||||
@@ -107,6 +110,8 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
|
||||
|
||||
private final Map<Method, StreamsBuilderFactoryBean> methodStreamsBuilderFactoryBeanMap = new HashMap<>();
|
||||
|
||||
private final Map<Method, List<String>> registeredStoresPerMethod = new HashMap<>();
|
||||
|
||||
private final CleanupConfig cleanupConfig;
|
||||
|
||||
private ConfigurableApplicationContext applicationContext;
|
||||
@@ -157,7 +162,8 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
|
||||
|
||||
@Override
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
public void orchestrateStreamListenerSetupMethod(StreamListener streamListener, Method method, Object bean) {
|
||||
public void orchestrateStreamListenerSetupMethod(StreamListener streamListener,
|
||||
Method method, Object bean) {
|
||||
String[] methodAnnotatedOutboundNames = getOutboundBindingTargetNames(method);
|
||||
validateStreamListenerMethod(streamListener, method, methodAnnotatedOutboundNames);
|
||||
String methodAnnotatedInboundName = streamListener.value();
|
||||
@@ -250,11 +256,14 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
|
||||
Topology.AutoOffsetReset autoOffsetReset = null;
|
||||
if (startOffset != null) {
|
||||
switch (startOffset) {
|
||||
case earliest : autoOffsetReset = Topology.AutoOffsetReset.EARLIEST;
|
||||
case earliest:
|
||||
autoOffsetReset = Topology.AutoOffsetReset.EARLIEST;
|
||||
break;
|
||||
case latest : autoOffsetReset = Topology.AutoOffsetReset.LATEST;
|
||||
case latest:
|
||||
autoOffsetReset = Topology.AutoOffsetReset.LATEST;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
default: break;
|
||||
}
|
||||
}
|
||||
if (extendedConsumerProperties.isResetOffsets()) {
|
||||
@@ -385,10 +394,11 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
|
||||
}
|
||||
}
|
||||
|
||||
private KStream<?, ?> getkStream(String inboundName, KafkaStreamsStateStoreProperties storeSpec,
|
||||
BindingProperties bindingProperties,
|
||||
StreamsBuilder streamsBuilder,
|
||||
Serde<?> keySerde, Serde<?> valueSerde, Topology.AutoOffsetReset autoOffsetReset) {
|
||||
private KStream<?, ?> getkStream(String inboundName,
|
||||
KafkaStreamsStateStoreProperties storeSpec,
|
||||
BindingProperties bindingProperties, StreamsBuilder streamsBuilder,
|
||||
Serde<?> keySerde, Serde<?> valueSerde,
|
||||
Topology.AutoOffsetReset autoOffsetReset) {
|
||||
if (storeSpec != null) {
|
||||
StoreBuilder storeBuilder = buildStateStore(storeSpec);
|
||||
streamsBuilder.addStateStore(storeBuilder);
|
||||
@@ -426,8 +436,11 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
|
||||
return stream;
|
||||
}
|
||||
|
||||
private void enableNativeDecodingForKTableAlways(Class<?> parameterType, BindingProperties bindingProperties) {
|
||||
if (parameterType.isAssignableFrom(KTable.class) || parameterType.isAssignableFrom(GlobalKTable.class)) {
|
||||
|
||||
private void enableNativeDecodingForKTableAlways(Class<?> parameterType,
|
||||
BindingProperties bindingProperties) {
|
||||
if (parameterType.isAssignableFrom(KTable.class)
|
||||
|| parameterType.isAssignableFrom(GlobalKTable.class)) {
|
||||
if (bindingProperties.getConsumer() == null) {
|
||||
bindingProperties.setConsumer(new ConsumerProperties());
|
||||
}
|
||||
@@ -437,9 +450,10 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
private void buildStreamsBuilderAndRetrieveConfig(Method method, ApplicationContext applicationContext,
|
||||
String inboundName) {
|
||||
ConfigurableListableBeanFactory beanFactory = this.applicationContext.getBeanFactory();
|
||||
private void buildStreamsBuilderAndRetrieveConfig(Method method,
|
||||
ApplicationContext applicationContext, String inboundName) {
|
||||
ConfigurableListableBeanFactory beanFactory = this.applicationContext
|
||||
.getBeanFactory();
|
||||
|
||||
Map<String, Object> streamConfigGlobalProperties = applicationContext.getBean("streamConfigGlobalProperties", Map.class);
|
||||
|
||||
@@ -471,7 +485,8 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
|
||||
|
||||
StreamsBuilderFactoryBean streamsBuilder = this.cleanupConfig == null
|
||||
? new StreamsBuilderFactoryBean(kafkaStreamsConfiguration)
|
||||
: new StreamsBuilderFactoryBean(kafkaStreamsConfiguration, this.cleanupConfig);
|
||||
: new StreamsBuilderFactoryBean(kafkaStreamsConfiguration,
|
||||
this.cleanupConfig);
|
||||
streamsBuilder.setAutoStartup(false);
|
||||
BeanDefinition streamsBuilderBeanDefinition =
|
||||
BeanDefinitionBuilder.genericBeanDefinition((Class<StreamsBuilderFactoryBean>) streamsBuilder.getClass(), () -> streamsBuilder)
|
||||
@@ -486,7 +501,8 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
|
||||
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
|
||||
}
|
||||
|
||||
private void validateStreamListenerMethod(StreamListener streamListener, Method method, String[] methodAnnotatedOutboundNames) {
|
||||
private void validateStreamListenerMethod(StreamListener streamListener,
|
||||
Method method, String[] methodAnnotatedOutboundNames) {
|
||||
String methodAnnotatedInboundName = streamListener.value();
|
||||
if (methodAnnotatedOutboundNames != null) {
|
||||
for (String s : methodAnnotatedOutboundNames) {
|
||||
@@ -541,21 +557,26 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
|
||||
}
|
||||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
private static KafkaStreamsStateStoreProperties buildStateStoreSpec(Method method) {
|
||||
KafkaStreamsStateStore spec = AnnotationUtils.findAnnotation(method, KafkaStreamsStateStore.class);
|
||||
if (spec != null) {
|
||||
Assert.isTrue(!ObjectUtils.isEmpty(spec.name()), "name cannot be empty");
|
||||
Assert.isTrue(spec.name().length() >= 1, "name cannot be empty.");
|
||||
KafkaStreamsStateStoreProperties props = new KafkaStreamsStateStoreProperties();
|
||||
props.setName(spec.name());
|
||||
props.setType(spec.type());
|
||||
props.setLength(spec.lengthMs());
|
||||
props.setKeySerdeString(spec.keySerde());
|
||||
props.setRetention(spec.retentionMs());
|
||||
props.setValueSerdeString(spec.valueSerde());
|
||||
props.setCacheEnabled(spec.cache());
|
||||
props.setLoggingDisabled(!spec.logging());
|
||||
return props;
|
||||
private KafkaStreamsStateStoreProperties buildStateStoreSpec(Method method) {
|
||||
if (!this.registeredStoresPerMethod.containsKey(method)) {
|
||||
KafkaStreamsStateStore spec = AnnotationUtils.findAnnotation(method,
|
||||
KafkaStreamsStateStore.class);
|
||||
if (spec != null) {
|
||||
Assert.isTrue(!ObjectUtils.isEmpty(spec.name()), "name cannot be empty");
|
||||
Assert.isTrue(spec.name().length() >= 1, "name cannot be empty.");
|
||||
this.registeredStoresPerMethod.put(method, new ArrayList<>());
|
||||
this.registeredStoresPerMethod.get(method).add(spec.name());
|
||||
KafkaStreamsStateStoreProperties props = new KafkaStreamsStateStoreProperties();
|
||||
props.setName(spec.name());
|
||||
props.setType(spec.type());
|
||||
props.setLength(spec.lengthMs());
|
||||
props.setKeySerdeString(spec.keySerde());
|
||||
props.setRetention(spec.retentionMs());
|
||||
props.setValueSerdeString(spec.valueSerde());
|
||||
props.setCacheEnabled(spec.cache());
|
||||
props.setLoggingDisabled(!spec.logging());
|
||||
return props;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -16,6 +16,8 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams.properties;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
|
||||
@@ -36,6 +38,11 @@ public class KafkaStreamsExtendedBindingProperties
|
||||
return DEFAULTS_PREFIX;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, KafkaStreamsBindingProperties> getBindings() {
|
||||
return this.doGetBindings();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
|
||||
return KafkaStreamsBindingProperties.class;
|
||||
|
||||
@@ -77,7 +77,42 @@ public class KafkaStreamsStateStoreIntegrationTests {
|
||||
}
|
||||
}
|
||||
|
||||
private void receiveAndValidateFoo(ConfigurableApplicationContext context) throws Exception {
|
||||
@Test
|
||||
public void testSameStateStoreIsCreatedOnlyOnceWhenMultipleInputBindingsArePresent() throws Exception {
|
||||
SpringApplication app = new SpringApplication(ProductCountApplicationWithMultipleInputBindings.class);
|
||||
app.setWebApplicationType(WebApplicationType.NONE);
|
||||
ConfigurableApplicationContext context = app.run("--server.port=0",
|
||||
"--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.bindings.input.destination=foobar",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde"
|
||||
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
|
||||
+ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input1.consumer.applicationId"
|
||||
+ "=KafkaStreamsStateStoreIntegrationTests-abc",
|
||||
"--spring.cloud.stream.kafka.streams.binder.brokers="
|
||||
+ embeddedKafka.getBrokersAsString(),
|
||||
"--spring.cloud.stream.kafka.streams.binder.zkNodes="
|
||||
+ embeddedKafka.getZookeeperConnectionString());
|
||||
try {
|
||||
Thread.sleep(2000);
|
||||
// We are not particularly interested in querying the state store here, as that is verified by the other test
|
||||
// in this class. This test verifies that the same store is not attempted to be created by multiple input bindings.
|
||||
// Normally, that will cause an exception to be thrown. However by not getting any exceptions, we are verifying
|
||||
// that the binder is handling it appropriately.
|
||||
//For more info, see this issue: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/551
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw e;
|
||||
}
|
||||
finally {
|
||||
context.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void receiveAndValidateFoo(ConfigurableApplicationContext context)
|
||||
throws Exception {
|
||||
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
|
||||
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
|
||||
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
|
||||
@@ -129,6 +164,44 @@ public class KafkaStreamsStateStoreIntegrationTests {
|
||||
}
|
||||
}
|
||||
|
||||
@EnableBinding(KafkaStreamsProcessorY.class)
|
||||
@EnableAutoConfiguration
|
||||
public static class ProductCountApplicationWithMultipleInputBindings {
|
||||
|
||||
WindowStore<Object, String> state;
|
||||
|
||||
boolean processed;
|
||||
|
||||
@StreamListener
|
||||
@KafkaStreamsStateStore(name = "mystate", type = KafkaStreamsStateStoreProperties.StoreType.WINDOW, lengthMs = 300000)
|
||||
@SuppressWarnings({ "deprecation", "unchecked" })
|
||||
public void process(@Input("input1")KStream<Object, Product> input, @Input("input2")KStream<Object, Product> input2) {
|
||||
|
||||
input.process(() -> new Processor<Object, Product>() {
|
||||
|
||||
@Override
|
||||
public void init(ProcessorContext processorContext) {
|
||||
state = (WindowStore) processorContext.getStateStore("mystate");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(Object s, Product product) {
|
||||
processed = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (state != null) {
|
||||
state.close();
|
||||
}
|
||||
}
|
||||
}, "mystate");
|
||||
|
||||
//simple use of input2, we are not using input2 for anything other than triggering some test behavior.
|
||||
input2.foreach((key, value) -> { });
|
||||
}
|
||||
}
|
||||
|
||||
public static class Product {
|
||||
|
||||
Integer id;
|
||||
@@ -147,4 +220,13 @@ public class KafkaStreamsStateStoreIntegrationTests {
|
||||
@Input("input")
|
||||
KStream<?, ?> input();
|
||||
}
|
||||
|
||||
interface KafkaStreamsProcessorY {
|
||||
|
||||
@Input("input1")
|
||||
KStream<?, ?> input1();
|
||||
|
||||
@Input("input2")
|
||||
KStream<?, ?> input2();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.1.1.BUILD-SNAPSHOT</version>
|
||||
<version>2.1.2.RELEASE</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
||||
@@ -38,6 +38,8 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfi
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
|
||||
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Import;
|
||||
@@ -127,17 +129,13 @@ public class KafkaBinderConfiguration {
|
||||
return kafkaJaasLoginModuleInitializer;
|
||||
}
|
||||
|
||||
/**
|
||||
* A conditional configuration for the {@link KafkaBinderMetrics} bean when the
|
||||
* {@link MeterRegistry} class is in classpath, as well as a {@link MeterRegistry} bean is
|
||||
* present in the application context.
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnClass(MeterRegistry.class)
|
||||
@ConditionalOnBean(MeterRegistry.class)
|
||||
@ConditionalOnMissingBean(value = KafkaBinderMetrics.class, name = "outerContext")
|
||||
@ConditionalOnClass(name = "io.micrometer.core.instrument.MeterRegistry")
|
||||
protected class KafkaBinderMetricsConfiguration {
|
||||
|
||||
@Bean
|
||||
@ConditionalOnBean(MeterRegistry.class)
|
||||
@ConditionalOnMissingBean(KafkaBinderMetrics.class)
|
||||
public MeterBinder kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder,
|
||||
KafkaBinderConfigurationProperties configurationProperties,
|
||||
@@ -145,7 +143,25 @@ public class KafkaBinderConfiguration {
|
||||
|
||||
return new KafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties, null, meterRegistry);
|
||||
}
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnBean(name = "outerContext")
|
||||
@ConditionalOnMissingBean(KafkaBinderMetrics.class)
|
||||
@ConditionalOnClass(name = "io.micrometer.core.instrument.MeterRegistry")
|
||||
protected class KafkaBinderMetricsConfigurationWithMultiBinder {
|
||||
|
||||
@Bean
|
||||
public MeterBinder kafkaBinderMetrics(
|
||||
KafkaMessageChannelBinder kafkaMessageChannelBinder,
|
||||
KafkaBinderConfigurationProperties configurationProperties,
|
||||
ConfigurableApplicationContext context) {
|
||||
|
||||
MeterRegistry meterRegistry = context.getBean("outerContext", ApplicationContext.class)
|
||||
.getBean(MeterRegistry.class);
|
||||
return new KafkaBinderMetrics(kafkaMessageChannelBinder,
|
||||
configurationProperties, null, meterRegistry);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -158,5 +174,4 @@ public class KafkaBinderConfiguration {
|
||||
|
||||
private JaasLoginModuleConfiguration zookeeper;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -2606,7 +2606,6 @@ public class KafkaBinderTests extends
|
||||
throw new RequeueCurrentMessageException();
|
||||
});
|
||||
}
|
||||
fail("Expected exception");
|
||||
}
|
||||
catch (MessageHandlingException e) {
|
||||
assertThat(e.getCause()).isInstanceOf(RequeueCurrentMessageException.class);
|
||||
|
||||
@@ -0,0 +1,70 @@
|
||||
/*
|
||||
* Copyright 2019-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.bootstrap;
|
||||
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.messaging.Sink;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.kafka.test.rule.KafkaEmbedded;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class MultiBinderMeterRegistryTest {
|
||||
|
||||
@ClassRule
|
||||
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
|
||||
|
||||
@Test
|
||||
public void testMetricsWorkWithMultiBinders() {
|
||||
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(
|
||||
SimpleApplication.class).web(WebApplicationType.NONE).run(
|
||||
"--spring.cloud.stream.bindings.input.destination=foo",
|
||||
"--spring.cloud.stream.bindings.input.binder=inbound",
|
||||
"--spring.cloud.stream.bindings.input.group=testGroupabc",
|
||||
"--spring.cloud.stream.binders.inbound.type=kafka",
|
||||
"--spring.cloud.stream.binders.inbound.environment"
|
||||
+ ".spring.cloud.stream.kafka.binder.brokers" + "="
|
||||
+ embeddedKafka.getBrokersAsString());
|
||||
|
||||
final MeterRegistry meterRegistry = applicationContext.getBean(MeterRegistry.class);
|
||||
|
||||
assertThat(meterRegistry).isNotNull();
|
||||
|
||||
assertThat(meterRegistry.get("spring.cloud.stream.binder.kafka.offset")
|
||||
.tag("group", "testGroupabc")
|
||||
.tag("topic", "foo").gauge().value()).isNotNull();
|
||||
|
||||
applicationContext.close();
|
||||
}
|
||||
|
||||
@SpringBootApplication
|
||||
@EnableBinding(Sink.class)
|
||||
static class SimpleApplication {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user