Compare commits

...

12 Commits

Author SHA1 Message Date
buildmaster
8cac84629f Update SNAPSHOT to 2.1.2.RELEASE 2019-03-04 12:57:07 +00:00
buildmaster
ad7f92ff89 Bumping versions 2019-03-04 11:58:41 +00:00
Soby Chacko
17baad5d8a KafkaStreamsStateStore with multiple input bindings
When KafkaStreamsStateStore annotation is used on a method with multiple input bindings,
it throws an exception. The reason is that each successive input binding after the first one
is trying to recreate the store that is already created. Fixing this issue.

Resolves #551
2019-02-28 17:56:36 -05:00
Oleg Zhurakousky
38b06e32f3 Upgraded stream core version 2019-02-26 19:56:40 +01:00
buildmaster
41a3c741e3 Bumping versions to 2.1.2.BUILD-SNAPSHOT after release 2019-02-15 15:36:46 +00:00
buildmaster
1cf05e3085 Going back to snapshots 2019-02-15 15:36:46 +00:00
buildmaster
d8a55b41c7 Update SNAPSHOT to 2.1.1.RELEASE 2019-02-15 15:35:22 +00:00
Soby Chacko
e96174664f Disable checkstyle plugin on 2.1.x branch 2019-02-15 10:17:44 -05:00
buildmaster
fa6bf19ebe Bumping versions 2019-02-14 10:09:25 +00:00
Oleg Zhurakousky
46051cd1a4 upgraded stream dependency to 2.1.1.BUILD-SNAPSHOT 2019-02-14 10:16:41 +01:00
Soby Chacko
1cb8cbbcc0 Fixed merge conflicts 2019-02-14 09:34:18 +01:00
Oleg Zhurakousky
87f91640f4 GH-1601 satellite changes to the core 2019-02-05 07:09:41 +01:00
14 changed files with 297 additions and 84 deletions
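Below, a minimal sketch of the pattern addressed by commit 17baad5d8a (a @KafkaStreamsStateStore method with more than one input binding). The binding names and store attributes here are hypothetical; the commit's actual integration test appears in the file diffs that follow.

@StreamListener
@KafkaStreamsStateStore(name = "mystate", type = KafkaStreamsStateStoreProperties.StoreType.WINDOW, lengthMs = 300000)
public void process(@Input("input1") KStream<Object, String> first,
        @Input("input2") KStream<Object, String> second) {
    // before the fix, the binder tried to register the "mystate" store once
    // per input binding, and the second registration threw an exception;
    // after the fix, the store is registered only once per method
}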

View File

@@ -1,4 +1,8 @@
-// Do not edit this file (e.g. go instead to src/main/asciidoc)
+////
+DO NOT EDIT THIS FILE. IT WAS GENERATED.
+Manual changes to this file will be lost when it is generated again.
+Edit the files in the src/main/asciidoc/ directory instead.
+////
:jdkversion: 1.8
:github-tag: master
@@ -338,12 +342,20 @@ Ignored if `replicas-assignments` is present.
+
Default: none (the binder-wide default of 1 is used).
NOTE: The Kafka binder uses the `partitionCount` setting of the producer as a hint to create a topic with the given partition count (in conjunction with the `minPartitionCount`, the maximum of the two being the value being used).
Exercise caution when configuring both `minPartitionCount` for a binder and `partitionCount` for an application, as the larger value is used.
If a topic already exists with a smaller partition count and `autoAddPartitions` is disabled (the default), the binder fails to start.
If a topic already exists with a smaller partition count and `autoAddPartitions` is enabled, new partitions are added.
If a topic already exists with a larger number of partitions than the maximum of (`minPartitionCount` or `partitionCount`), the existing partition count is used.
compression::
Set the `compression.type` producer property.
Supported values are `none`, `gzip`, `snappy` and `lz4`.
If you override the `kafka-clients` jar to 2.1.0 (or later), as discussed in the https://docs.spring.io/spring-kafka/docs/2.2.x/reference/html/deps-for-21x.html[Spring for Apache Kafka documentation], and wish to use `zstd` compression, use `spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd`.
+
Default: `none`.
==== Usage examples
In this section, we show the use of the preceding properties for specific scenarios.
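As a hedged illustration of the preceding properties (the binding name `output` and all values are assumed here, not taken from the original file):

spring.cloud.stream.kafka.binder.minPartitionCount=4
spring.cloud.stream.bindings.output.producer.partitionCount=10
# The topic is provisioned with max(4, 10) = 10 partitions. If it already
# exists with fewer partitions, enable the following so the binder adds the
# missing partitions instead of failing to start:
spring.cloud.stream.kafka.binder.autoAddPartitions=true

spring.cloud.stream.kafka.bindings.output.producer.compression=gzip
# zstd requires the kafka-clients 2.1.0 (or later) override and goes through
# the pass-through configuration map:
spring.cloud.stream.kafka.bindings.output.producer.configuration.compression.type=zstd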

View File

@@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-<version>2.1.1.BUILD-SNAPSHOT</version>
+<version>2.1.2.RELEASE</version>
</parent>
<packaging>pom</packaging>
<name>spring-cloud-stream-binder-kafka-docs</name>

pom.xml
View File

@@ -2,12 +2,12 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-<version>2.1.1.BUILD-SNAPSHOT</version>
+<version>2.1.2.RELEASE</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
-<version>2.1.0.BUILD-SNAPSHOT</version>
+<version>2.1.3.RELEASE</version>
<relativePath />
</parent>
<properties>
@@ -15,7 +15,7 @@
<spring-kafka.version>2.2.2.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>3.1.0.RELEASE</spring-integration-kafka.version>
<kafka.version>2.0.0</kafka.version>
-<spring-cloud-stream.version>2.1.0.BUILD-SNAPSHOT</spring-cloud-stream.version>
+<spring-cloud-stream.version>2.1.2.RELEASE</spring-cloud-stream.version>
</properties>
<modules>
<module>spring-cloud-stream-binder-kafka</module>
@@ -142,35 +142,35 @@
</plugins>
</pluginManagement>
<plugins>
-<plugin>
-<groupId>org.apache.maven.plugins</groupId>
-<artifactId>maven-checkstyle-plugin</artifactId>
-<dependencies>
-<dependency>
-<groupId>org.springframework.cloud</groupId>
-<artifactId>spring-cloud-stream-tools</artifactId>
-<version>${spring-cloud-stream.version}</version>
-</dependency>
-</dependencies>
-<executions>
-<execution>
-<id>checkstyle-validation</id>
-<phase>validate</phase>
-<configuration>
-<configLocation>checkstyle.xml</configLocation>
-<headerLocation>checkstyle-header.txt</headerLocation>
-<suppressionsLocation>checkstyle-suppressions.xml</suppressionsLocation>
-<encoding>UTF-8</encoding>
-<consoleOutput>true</consoleOutput>
-<failsOnError>true</failsOnError>
-<includeTestSourceDirectory>true</includeTestSourceDirectory>
-</configuration>
-<goals>
-<goal>check</goal>
-</goals>
-</execution>
-</executions>
-</plugin>
+<!--<plugin>-->
+<!--<groupId>org.apache.maven.plugins</groupId>-->
+<!--<artifactId>maven-checkstyle-plugin</artifactId>-->
+<!--<dependencies>-->
+<!--<dependency>-->
+<!--<groupId>org.springframework.cloud</groupId>-->
+<!--<artifactId>spring-cloud-stream-tools</artifactId>-->
+<!--<version>${spring-cloud-stream.version}</version>-->
+<!--</dependency>-->
+<!--</dependencies>-->
+<!--<executions>-->
+<!--<execution>-->
+<!--<id>checkstyle-validation</id>-->
+<!--<phase>validate</phase>-->
+<!--<configuration>-->
+<!--<configLocation>checkstyle.xml</configLocation>-->
+<!--<headerLocation>checkstyle-header.txt</headerLocation>-->
+<!--<suppressionsLocation>checkstyle-suppressions.xml</suppressionsLocation>-->
+<!--<encoding>UTF-8</encoding>-->
+<!--<consoleOutput>true</consoleOutput>-->
+<!--<failsOnError>true</failsOnError>-->
+<!--<includeTestSourceDirectory>true</includeTestSourceDirectory>-->
+<!--</configuration>-->
+<!--<goals>-->
+<!--<goal>check</goal>-->
+<!--</goals>-->
+<!--</execution>-->
+<!--</executions>-->
+<!--</plugin>-->
</plugins>
</build>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-<version>2.1.1.BUILD-SNAPSHOT</version>
+<version>2.1.2.RELEASE</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-<version>2.1.1.BUILD-SNAPSHOT</version>
+<version>2.1.2.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>

View File

@@ -16,6 +16,8 @@
package org.springframework.cloud.stream.binder.kafka.properties;
+import java.util.Map;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
@@ -39,6 +41,11 @@ public class KafkaExtendedBindingProperties
return DEFAULTS_PREFIX;
}
+@Override
+public Map<String, KafkaBindingProperties> getBindings() {
+return this.doGetBindings();
+}
@Override
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
return KafkaBindingProperties.class;

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-<version>2.1.1.BUILD-SNAPSHOT</version>
+<version>2.1.2.RELEASE</version>
</parent>
<properties>

View File

@@ -17,9 +17,11 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import java.lang.reflect.Method;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -78,14 +80,15 @@ import org.springframework.util.StringUtils;
/**
* Kafka Streams specific implementation for {@link StreamListenerSetupMethodOrchestrator}
* that overrides the default mechanisms for invoking StreamListener adapters.
*
* <p>
* The orchestration primarily focus on the following areas:
* <p>
* 1. Allow multiple KStream output bindings (KStream branching) by allowing more than one
* output values on {@link SendTo} 2. Allow multiple inbound bindings for multiple KStream
* and or KTable/GlobalKTable types. 3. Each StreamListener method that it orchestrates
* gets its own {@link StreamsBuilderFactoryBean} and {@link StreamsConfig}
*
* 1. Allow multiple KStream output bindings (KStream branching) by allowing more than one output values on {@link SendTo}
* 2. Allow multiple inbound bindings for multiple KStream and or KTable/GlobalKTable types.
* 3. Each StreamListener method that it orchestrates gets its own {@link StreamsBuilderFactoryBean} and {@link StreamsConfig}
*
* @author Soby Chacko
* @author Lei Chen
* @author Gary Russell
*/
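As a hedged sketch of point 1 above (KStream branching routed to multiple output bindings; the binding names and predicates are illustrative only):

@StreamListener("input")
@SendTo({ "english", "french" })
public KStream<Object, String>[] process(KStream<Object, String> input) {
    // each branch returned here is sent to the output binding at the same
    // position in the @SendTo value
    return input.branch(
            (key, value) -> value.startsWith("en"),
            (key, value) -> value.startsWith("fr"));
}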
@@ -107,6 +110,8 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
private final Map<Method, StreamsBuilderFactoryBean> methodStreamsBuilderFactoryBeanMap = new HashMap<>();
+private final Map<Method, List<String>> registeredStoresPerMethod = new HashMap<>();
private final CleanupConfig cleanupConfig;
private ConfigurableApplicationContext applicationContext;
@@ -157,7 +162,8 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
@Override
@SuppressWarnings({"rawtypes", "unchecked"})
-public void orchestrateStreamListenerSetupMethod(StreamListener streamListener, Method method, Object bean) {
+public void orchestrateStreamListenerSetupMethod(StreamListener streamListener,
+Method method, Object bean) {
String[] methodAnnotatedOutboundNames = getOutboundBindingTargetNames(method);
validateStreamListenerMethod(streamListener, method, methodAnnotatedOutboundNames);
String methodAnnotatedInboundName = streamListener.value();
@@ -250,11 +256,14 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
Topology.AutoOffsetReset autoOffsetReset = null;
if (startOffset != null) {
switch (startOffset) {
-case earliest : autoOffsetReset = Topology.AutoOffsetReset.EARLIEST;
+case earliest:
+autoOffsetReset = Topology.AutoOffsetReset.EARLIEST;
break;
-case latest : autoOffsetReset = Topology.AutoOffsetReset.LATEST;
+case latest:
+autoOffsetReset = Topology.AutoOffsetReset.LATEST;
break;
+default:
+break;
-default: break;
}
}
if (extendedConsumerProperties.isResetOffsets()) {
@@ -385,10 +394,11 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
}
}
-private KStream<?, ?> getkStream(String inboundName, KafkaStreamsStateStoreProperties storeSpec,
-BindingProperties bindingProperties,
-StreamsBuilder streamsBuilder,
-Serde<?> keySerde, Serde<?> valueSerde, Topology.AutoOffsetReset autoOffsetReset) {
+private KStream<?, ?> getkStream(String inboundName,
+KafkaStreamsStateStoreProperties storeSpec,
+BindingProperties bindingProperties, StreamsBuilder streamsBuilder,
+Serde<?> keySerde, Serde<?> valueSerde,
+Topology.AutoOffsetReset autoOffsetReset) {
if (storeSpec != null) {
StoreBuilder storeBuilder = buildStateStore(storeSpec);
streamsBuilder.addStateStore(storeBuilder);
@@ -426,8 +436,11 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
return stream;
}
-private void enableNativeDecodingForKTableAlways(Class<?> parameterType, BindingProperties bindingProperties) {
-if (parameterType.isAssignableFrom(KTable.class) || parameterType.isAssignableFrom(GlobalKTable.class)) {
+private void enableNativeDecodingForKTableAlways(Class<?> parameterType,
+BindingProperties bindingProperties) {
+if (parameterType.isAssignableFrom(KTable.class)
+|| parameterType.isAssignableFrom(GlobalKTable.class)) {
if (bindingProperties.getConsumer() == null) {
bindingProperties.setConsumer(new ConsumerProperties());
}
@@ -437,9 +450,10 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
}
@SuppressWarnings({"unchecked"})
-private void buildStreamsBuilderAndRetrieveConfig(Method method, ApplicationContext applicationContext,
-String inboundName) {
-ConfigurableListableBeanFactory beanFactory = this.applicationContext.getBeanFactory();
+private void buildStreamsBuilderAndRetrieveConfig(Method method,
+ApplicationContext applicationContext, String inboundName) {
+ConfigurableListableBeanFactory beanFactory = this.applicationContext
+.getBeanFactory();
Map<String, Object> streamConfigGlobalProperties = applicationContext.getBean("streamConfigGlobalProperties", Map.class);
@@ -471,7 +485,8 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
StreamsBuilderFactoryBean streamsBuilder = this.cleanupConfig == null
? new StreamsBuilderFactoryBean(kafkaStreamsConfiguration)
-: new StreamsBuilderFactoryBean(kafkaStreamsConfiguration, this.cleanupConfig);
+: new StreamsBuilderFactoryBean(kafkaStreamsConfiguration,
+this.cleanupConfig);
streamsBuilder.setAutoStartup(false);
BeanDefinition streamsBuilderBeanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<StreamsBuilderFactoryBean>) streamsBuilder.getClass(), () -> streamsBuilder)
@@ -486,7 +501,8 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}
-private void validateStreamListenerMethod(StreamListener streamListener, Method method, String[] methodAnnotatedOutboundNames) {
+private void validateStreamListenerMethod(StreamListener streamListener,
+Method method, String[] methodAnnotatedOutboundNames) {
String methodAnnotatedInboundName = streamListener.value();
if (methodAnnotatedOutboundNames != null) {
for (String s : methodAnnotatedOutboundNames) {
@@ -541,21 +557,26 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
}
@SuppressWarnings({"unchecked"})
-private static KafkaStreamsStateStoreProperties buildStateStoreSpec(Method method) {
-KafkaStreamsStateStore spec = AnnotationUtils.findAnnotation(method, KafkaStreamsStateStore.class);
-if (spec != null) {
-Assert.isTrue(!ObjectUtils.isEmpty(spec.name()), "name cannot be empty");
-Assert.isTrue(spec.name().length() >= 1, "name cannot be empty.");
-KafkaStreamsStateStoreProperties props = new KafkaStreamsStateStoreProperties();
-props.setName(spec.name());
-props.setType(spec.type());
-props.setLength(spec.lengthMs());
-props.setKeySerdeString(spec.keySerde());
-props.setRetention(spec.retentionMs());
-props.setValueSerdeString(spec.valueSerde());
-props.setCacheEnabled(spec.cache());
-props.setLoggingDisabled(!spec.logging());
-return props;
+private KafkaStreamsStateStoreProperties buildStateStoreSpec(Method method) {
+if (!this.registeredStoresPerMethod.containsKey(method)) {
+KafkaStreamsStateStore spec = AnnotationUtils.findAnnotation(method,
+KafkaStreamsStateStore.class);
+if (spec != null) {
+Assert.isTrue(!ObjectUtils.isEmpty(spec.name()), "name cannot be empty");
+Assert.isTrue(spec.name().length() >= 1, "name cannot be empty.");
+this.registeredStoresPerMethod.put(method, new ArrayList<>());
+this.registeredStoresPerMethod.get(method).add(spec.name());
+KafkaStreamsStateStoreProperties props = new KafkaStreamsStateStoreProperties();
+props.setName(spec.name());
+props.setType(spec.type());
+props.setLength(spec.lengthMs());
+props.setKeySerdeString(spec.keySerde());
+props.setRetention(spec.retentionMs());
+props.setValueSerdeString(spec.valueSerde());
+props.setCacheEnabled(spec.cache());
+props.setLoggingDisabled(!spec.logging());
+return props;
+}
+}
return null;
}

View File

@@ -16,6 +16,8 @@
package org.springframework.cloud.stream.binder.kafka.streams.properties;
+import java.util.Map;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
@@ -36,6 +38,11 @@ public class KafkaStreamsExtendedBindingProperties
return DEFAULTS_PREFIX;
}
+@Override
+public Map<String, KafkaStreamsBindingProperties> getBindings() {
+return this.doGetBindings();
+}
@Override
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
return KafkaStreamsBindingProperties.class;

View File

@@ -77,7 +77,42 @@ public class KafkaStreamsStateStoreIntegrationTests {
}
}
-private void receiveAndValidateFoo(ConfigurableApplicationContext context) throws Exception {
+@Test
+public void testSameStateStoreIsCreatedOnlyOnceWhenMultipleInputBindingsArePresent() throws Exception {
+SpringApplication app = new SpringApplication(ProductCountApplicationWithMultipleInputBindings.class);
+app.setWebApplicationType(WebApplicationType.NONE);
+ConfigurableApplicationContext context = app.run("--server.port=0",
+"--spring.jmx.enabled=false",
+"--spring.cloud.stream.bindings.input.destination=foobar",
+"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000",
+"--spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde"
++ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
+"--spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde"
++ "=org.apache.kafka.common.serialization.Serdes$StringSerde",
+"--spring.cloud.stream.kafka.streams.bindings.input1.consumer.applicationId"
++ "=KafkaStreamsStateStoreIntegrationTests-abc",
+"--spring.cloud.stream.kafka.streams.binder.brokers="
++ embeddedKafka.getBrokersAsString(),
+"--spring.cloud.stream.kafka.streams.binder.zkNodes="
++ embeddedKafka.getZookeeperConnectionString());
+try {
+Thread.sleep(2000);
+// We are not particularly interested in querying the state store here, as that is verified by the other test
+// in this class. This test verifies that the same store is not attempted to be created by multiple input bindings.
+// Normally, that will cause an exception to be thrown. However by not getting any exceptions, we are verifying
+// that the binder is handling it appropriately.
+//For more info, see this issue: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/551
+}
+catch (Exception e) {
+throw e;
+}
+finally {
+context.close();
+}
+}
+private void receiveAndValidateFoo(ConfigurableApplicationContext context)
+throws Exception {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true);
@@ -129,6 +164,44 @@ public class KafkaStreamsStateStoreIntegrationTests {
}
}
+@EnableBinding(KafkaStreamsProcessorY.class)
+@EnableAutoConfiguration
+public static class ProductCountApplicationWithMultipleInputBindings {
+WindowStore<Object, String> state;
+boolean processed;
+@StreamListener
+@KafkaStreamsStateStore(name = "mystate", type = KafkaStreamsStateStoreProperties.StoreType.WINDOW, lengthMs = 300000)
+@SuppressWarnings({ "deprecation", "unchecked" })
+public void process(@Input("input1")KStream<Object, Product> input, @Input("input2")KStream<Object, Product> input2) {
+input.process(() -> new Processor<Object, Product>() {
+@Override
+public void init(ProcessorContext processorContext) {
+state = (WindowStore) processorContext.getStateStore("mystate");
+}
+@Override
+public void process(Object s, Product product) {
+processed = true;
+}
+@Override
+public void close() {
+if (state != null) {
+state.close();
+}
+}
+}, "mystate");
+//simple use of input2, we are not using input2 for anything other than triggering some test behavior.
+input2.foreach((key, value) -> { });
+}
+}
public static class Product {
Integer id;
@@ -147,4 +220,13 @@ public class KafkaStreamsStateStoreIntegrationTests {
@Input("input")
KStream<?, ?> input();
}
+interface KafkaStreamsProcessorY {
+@Input("input1")
+KStream<?, ?> input1();
+@Input("input2")
+KStream<?, ?> input2();
+}
}

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-<version>2.1.1.BUILD-SNAPSHOT</version>
+<version>2.1.2.RELEASE</version>
</parent>
<dependencies>

View File

@@ -38,6 +38,8 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfi
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@@ -127,17 +129,13 @@ public class KafkaBinderConfiguration {
return kafkaJaasLoginModuleInitializer;
}
-/**
-* A conditional configuration for the {@link KafkaBinderMetrics} bean when the
-* {@link MeterRegistry} class is in classpath, as well as a {@link MeterRegistry} bean is
-* present in the application context.
-*/
@Configuration
-@ConditionalOnClass(MeterRegistry.class)
-@ConditionalOnBean(MeterRegistry.class)
-@ConditionalOnMissingBean(value = KafkaBinderMetrics.class, name = "outerContext")
+@ConditionalOnClass(name = "io.micrometer.core.instrument.MeterRegistry")
protected class KafkaBinderMetricsConfiguration {
@Bean
+@ConditionalOnBean(MeterRegistry.class)
+@ConditionalOnMissingBean(KafkaBinderMetrics.class)
public MeterBinder kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder,
KafkaBinderConfigurationProperties configurationProperties,
@@ -145,7 +143,25 @@ public class KafkaBinderConfiguration {
return new KafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties, null, meterRegistry);
}
}
+@Configuration
+@ConditionalOnBean(name = "outerContext")
+@ConditionalOnMissingBean(KafkaBinderMetrics.class)
+@ConditionalOnClass(name = "io.micrometer.core.instrument.MeterRegistry")
+protected class KafkaBinderMetricsConfigurationWithMultiBinder {
+@Bean
+public MeterBinder kafkaBinderMetrics(
+KafkaMessageChannelBinder kafkaMessageChannelBinder,
+KafkaBinderConfigurationProperties configurationProperties,
+ConfigurableApplicationContext context) {
+MeterRegistry meterRegistry = context.getBean("outerContext", ApplicationContext.class)
+.getBean(MeterRegistry.class);
+return new KafkaBinderMetrics(kafkaMessageChannelBinder,
+configurationProperties, null, meterRegistry);
+}
+}
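For context, a hedged sketch of the multi-binder configuration this new class targets, mirroring the new MultiBinderMeterRegistryTest below (the binder name `inbound` and broker address are assumed):

spring.cloud.stream.bindings.input.destination=foo
spring.cloud.stream.bindings.input.binder=inbound
spring.cloud.stream.binders.inbound.type=kafka
spring.cloud.stream.binders.inbound.environment.spring.cloud.stream.kafka.binder.brokers=localhost:9092
# With multiple binders, each binder runs in a child application context; the
# MeterRegistry lives in the parent context, registered under the bean name
# "outerContext", which is where this configuration looks it up.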
/**
@@ -158,5 +174,4 @@ public class KafkaBinderConfiguration {
private JaasLoginModuleConfiguration zookeeper;
}
}

View File

@@ -2606,7 +2606,6 @@ public class KafkaBinderTests extends
throw new RequeueCurrentMessageException();
});
}
-fail("Expected exception");
}
catch (MessageHandlingException e) {
assertThat(e.getCause()).isInstanceOf(RequeueCurrentMessageException.class);

View File

@@ -0,0 +1,70 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.bootstrap;
import io.micrometer.core.instrument.MeterRegistry;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Soby Chacko
*/
public class MultiBinderMeterRegistryTest {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
@Test
public void testMetricsWorkWithMultiBinders() {
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(
SimpleApplication.class).web(WebApplicationType.NONE).run(
"--spring.cloud.stream.bindings.input.destination=foo",
"--spring.cloud.stream.bindings.input.binder=inbound",
"--spring.cloud.stream.bindings.input.group=testGroupabc",
"--spring.cloud.stream.binders.inbound.type=kafka",
"--spring.cloud.stream.binders.inbound.environment"
+ ".spring.cloud.stream.kafka.binder.brokers" + "="
+ embeddedKafka.getBrokersAsString());
final MeterRegistry meterRegistry = applicationContext.getBean(MeterRegistry.class);
assertThat(meterRegistry).isNotNull();
assertThat(meterRegistry.get("spring.cloud.stream.binder.kafka.offset")
.tag("group", "testGroupabc")
.tag("topic", "foo").gauge().value()).isNotNull();
applicationContext.close();
}
@SpringBootApplication
@EnableBinding(Sink.class)
static class SimpleApplication {
}
}