Compare commits
19 Commits

| Author | SHA1 | Date |
|---|---|---|
| | de4b5a9443 | |
| | a090614709 | |
| | a3f7ca9756 | |
| | 82b07a0120 | |
| | 0d0cf8dcb7 | |
| | cbfe03be2f | |
| | 1f0c6cabc6 | |
| | 0c61cedc85 | |
| | 44210f1b72 | |
| | 7007f9494a | |
| | da268bb6dd | |
| | e03baefbaf | |
| | 01396b6573 | |
| | 3f009a8267 | |
| | 31bb86b002 | |
| | cc2dfd1d08 | |
| | fc768ba695 | |
| | 38d6deb4d5 | |
| | 2a0b9015de | |
pom.xml
@@ -2,20 +2,20 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-	<version>2.1.0.M1</version>
+	<version>2.1.0.M2</version>
 	<packaging>pom</packaging>
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-build</artifactId>
-		<version>2.0.2.RELEASE</version>
+		<version>2.1.0.M1</version>
 		<relativePath />
 	</parent>
 	<properties>
 		<java.version>1.8</java.version>
-		<spring-kafka.version>2.1.5.RELEASE</spring-kafka.version>
-		<spring-integration-kafka.version>3.0.3.RELEASE</spring-integration-kafka.version>
-		<kafka.version>1.0.1</kafka.version>
-		<spring-cloud-stream.version>2.1.0.M1</spring-cloud-stream.version>
+		<spring-kafka.version>2.2.0.M2</spring-kafka.version>
+		<spring-integration-kafka.version>3.1.0.M1</spring-integration-kafka.version>
+		<kafka.version>2.0.0</kafka.version>
+		<spring-cloud-stream.version>2.1.0.M2</spring-cloud-stream.version>
 	</properties>
 	<modules>
 		<module>spring-cloud-stream-binder-kafka</module>
@@ -47,6 +47,13 @@
 				<artifactId>kafka-clients</artifactId>
 				<version>${kafka.version}</version>
 			</dependency>
+			<dependency>
+				<groupId>org.apache.kafka</groupId>
+				<artifactId>kafka-clients</artifactId>
+				<version>${kafka.version}</version>
+				<classifier>test</classifier>
+				<scope>test</scope>
+			</dependency>
 			<dependency>
 				<groupId>org.springframework.kafka</groupId>
 				<artifactId>spring-kafka</artifactId>
@@ -101,6 +108,27 @@
 				</exclusion>
 			</exclusions>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_2.11</artifactId>
+			<classifier>test</classifier>
+			<scope>test</scope>
+			<version>${kafka.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>jline</groupId>
+					<artifactId>jline</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>log4j</groupId>
+					<artifactId>log4j</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
 	</dependencies>
 </dependencyManagement>
@@ -4,7 +4,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>2.1.0.M1</version>
+		<version>2.1.0.M2</version>
 	</parent>
 	<artifactId>spring-cloud-starter-stream-kafka</artifactId>
 	<description>Spring Cloud Starter Stream Kafka</description>
@@ -5,7 +5,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>2.1.0.M1</version>
+		<version>2.1.0.M2</version>
 	</parent>
 	<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
 	<description>Spring Cloud Stream Kafka Binder Core</description>
@@ -5,7 +5,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>2.1.0.M1</version>
+		<version>2.1.0.M2</version>
 	</parent>

 	<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>
@@ -652,3 +652,23 @@ else {
 //query from the remote host
 }
 ----
+
+== Accessing the underlying KafkaStreams object
+
+`StreamsBuilderFactoryBean` from spring-kafka, which is responsible for constructing the `KafkaStreams` object, can be accessed programmatically.
+Each `StreamsBuilderFactoryBean` is registered as `stream-builder`, appended with the `StreamListener` method name.
+If your `StreamListener` method is named `process`, for example, the stream builder bean is named `stream-builder-process`.
+Since this is a factory bean, it should be accessed by prepending an ampersand (`&`) to the bean name when accessing it programmatically.
+The following example assumes that the `StreamListener` method is named `process`:
+
+[source]
+----
+StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
+KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
+----
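Once the `KafkaStreams` object is in hand, it can also be used to query a state store interactively. The following is a minimal sketch based on the word-count integration test in this changeset; the store name `foo-WordCounts` is specific to that test:

[source]
----
ReadOnlyWindowStore<Object, Object> store =
        kafkaStreams.store("foo-WordCounts", QueryableStoreTypes.windowStore());
----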
+== State Cleanup
+
+By default, the `KafkaStreams.cleanup()` method is called when the binding is stopped.
+See https://docs.spring.io/spring-kafka/reference/html/_reference.html#_configuration[the Spring Kafka documentation].
+To modify this behavior, simply add a single `CleanupConfig` `@Bean` (configured to clean up on start, on stop, or neither) to the application context; the bean will be detected and wired into the factory bean.
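As a concrete instance, the word-count integration test in this changeset registers the following bean, which cleans up local state on start but not on stop:

[source]
----
@Bean
public CleanupConfig cleanupConfig() {
    return new CleanupConfig(true, false);
}
----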
@@ -488,6 +488,6 @@ You can consume these exceptions with your own Spring Integration flow.

 The Kafka binder module exposes the following metrics:

-`spring.cloud.stream.binder.kafka.someGroup.someTopic.lag`: This metric indicates how many messages have not yet been consumed from a given binder's topic by a given consumer group.
-For example, if the value of the metric `spring.cloud.stream.binder.kafka.myGroup.myTopic.lag` is `1000`, the consumer group named `myGroup` has `1000` messages waiting to be consumed from the topic called `myTopic`.
+`spring.cloud.stream.binder.kafka.offset`: This metric indicates how many messages have not yet been consumed from a given binder's topic by a given consumer group.
+The metrics provided are based on the Micrometer metrics library. The metric contains the consumer group information, the topic, and the actual lag in committed offset from the latest offset on the topic.
 This metric is particularly useful for providing auto-scaling feedback to a PaaS platform.
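A minimal sketch of how an auto-scaler might read this gauge through Micrometer's `MeterRegistry`; the `group` and `topic` tag names below are assumptions, not confirmed by this changeset:

[source]
----
double lag = meterRegistry.get("spring.cloud.stream.binder.kafka.offset")
        .tag("group", "myGroup")   // assumed tag name
        .tag("topic", "myTopic")   // assumed tag name
        .gauge()
        .value();
if (lag > 1000) {
    // signal the platform to add consumer instances, up to the partition count
}
----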
@@ -10,7 +10,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>2.1.0.M1</version>
+		<version>2.1.0.M2</version>
 	</parent>

 	<dependencies>
@@ -45,11 +45,6 @@
 			<groupId>org.springframework.kafka</groupId>
 			<artifactId>spring-kafka-test</artifactId>
 		</dependency>
-		<dependency>
-			<groupId>org.apache.kafka</groupId>
-			<artifactId>kafka_2.11</artifactId>
-			<classifier>test</classifier>
-		</dependency>
 		<!-- Added back since Kafka still depends on it, but it has been removed by Boot due to EOL -->
 		<dependency>
 			<groupId>log4j</groupId>
@@ -62,5 +57,24 @@
 			<artifactId>spring-cloud-stream-binder-test</artifactId>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-autoconfigure-processor</artifactId>
+			<optional>true</optional>
+		</dependency>
+		<!-- Following dependencies are needed to support Kafka 1.1.0 client-->
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_2.11</artifactId>
+			<version>${kafka.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_2.11</artifactId>
+			<version>${kafka.version}</version>
+			<classifier>test</classifier>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 </project>
@@ -16,17 +16,18 @@
package org.springframework.cloud.stream.binder.kafka.streams;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

/**
 * @author Marius Bogoevici
@@ -34,18 +35,30 @@ import org.springframework.context.annotation.Configuration;
 * @author Soby Chacko
 */
@Configuration
@Import({KafkaAutoConfiguration.class})
public class KStreamBinderConfiguration {

	private static final Log logger = LogFactory.getLog(KStreamBinderConfiguration.class);

	@Autowired
	private KafkaProperties kafkaProperties;

	@Autowired
	private KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties;

	@Bean
	@ConditionalOnBean(name = "outerContext")
	public BeanFactoryPostProcessor outerContextBeanFactoryPostProcessor() {
		return beanFactory -> {
			ApplicationContext outerContext = (ApplicationContext) beanFactory.getBean("outerContext");
			beanFactory.registerSingleton(KafkaStreamsBinderConfigurationProperties.class.getSimpleName(), outerContext
					.getBean(KafkaStreamsBinderConfigurationProperties.class));
			beanFactory.registerSingleton(KafkaStreamsMessageConversionDelegate.class.getSimpleName(), outerContext
					.getBean(KafkaStreamsMessageConversionDelegate.class));
			beanFactory.registerSingleton(KafkaStreamsBindingInformationCatalogue.class.getSimpleName(), outerContext
					.getBean(KafkaStreamsBindingInformationCatalogue.class));
			beanFactory.registerSingleton(KeyValueSerdeResolver.class.getSimpleName(), outerContext
					.getBean(KeyValueSerdeResolver.class));
			beanFactory.registerSingleton(KafkaStreamsExtendedBindingProperties.class.getSimpleName(), outerContext
					.getBean(KafkaStreamsExtendedBindingProperties.class));
		};
	}

	@Bean
-	public KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties binderConfigurationProperties) {
+	public KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties binderConfigurationProperties,
+			KafkaProperties kafkaProperties) {
		return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties);
	}
@@ -54,7 +67,8 @@ public class KStreamBinderConfiguration {
			KafkaTopicProvisioner kafkaTopicProvisioner,
			KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate,
			KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
-			KeyValueSerdeResolver keyValueSerdeResolver) {
+			KeyValueSerdeResolver keyValueSerdeResolver,
+			KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties) {
		KStreamBinder kStreamBinder = new KStreamBinder(binderConfigurationProperties, kafkaTopicProvisioner,
				KafkaStreamsMessageConversionDelegate, KafkaStreamsBindingInformationCatalogue,
				keyValueSerdeResolver);
@@ -16,27 +16,37 @@
package org.springframework.cloud.stream.binder.kafka.streams;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Soby Chacko
 */
@Configuration
public class KTableBinderConfiguration {

	@Autowired
	private KafkaProperties kafkaProperties;

	@Autowired
	private KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties;

	@Bean
	@ConditionalOnBean(name = "outerContext")
	public BeanFactoryPostProcessor outerContextBeanFactoryPostProcessor() {
		return beanFactory -> {
			ApplicationContext outerContext = (ApplicationContext) beanFactory.getBean("outerContext");
			beanFactory.registerSingleton(KafkaStreamsBinderConfigurationProperties.class.getSimpleName(), outerContext
					.getBean(KafkaStreamsBinderConfigurationProperties.class));
			beanFactory.registerSingleton(KafkaStreamsBindingInformationCatalogue.class.getSimpleName(), outerContext
					.getBean(KafkaStreamsBindingInformationCatalogue.class));
		};
	}

	@Bean
-	public KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties binderConfigurationProperties) {
+	public KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties binderConfigurationProperties,
+			KafkaProperties kafkaProperties) {
		return new KafkaTopicProvisioner(binderConfigurationProperties, kafkaProperties);
	}
@@ -25,7 +25,9 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;

import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
@@ -34,9 +36,11 @@ import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStr
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
import org.springframework.cloud.stream.config.BindingServiceConfiguration;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.util.ObjectUtils;

/**
@@ -46,6 +50,7 @@ import org.springframework.util.ObjectUtils;
 */
@EnableConfigurationProperties(KafkaStreamsExtendedBindingProperties.class)
@ConditionalOnBean(BindingService.class)
+@AutoConfigureAfter(BindingServiceConfiguration.class)
public class KafkaStreamsBinderSupportAutoConfiguration {

	@Bean
@@ -99,10 +104,12 @@ public class KafkaStreamsBinderSupportAutoConfiguration {
			KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
			KStreamStreamListenerParameterAdapter kafkaStreamListenerParameterAdapter,
			Collection<StreamListenerResultAdapter> streamListenerResultAdapters,
-			KafkaStreamsBinderConfigurationProperties binderConfigurationProperties) {
+			KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
+			ObjectProvider<CleanupConfig> cleanupConfig) {
		return new KafkaStreamsStreamListenerSetupMethodOrchestrator(bindingServiceProperties,
				kafkaStreamsExtendedBindingProperties, keyValueSerdeResolver, kafkaStreamsBindingInformationCatalogue,
-				kafkaStreamListenerParameterAdapter, streamListenerResultAdapters, binderConfigurationProperties);
+				kafkaStreamListenerParameterAdapter, streamListenerResultAdapters, binderConfigurationProperties,
+				cleanupConfig.getIfUnique());
	}

	@Bean
@@ -41,7 +41,7 @@ import org.springframework.util.StringUtils;
 *
 * @author Soby Chacko
 */
-class KafkaStreamsMessageConversionDelegate {
+public class KafkaStreamsMessageConversionDelegate {

	private static final ThreadLocal<KeyValue<Object, Object>> keyValueThreadLocal = new ThreadLocal<>();
@@ -182,12 +182,6 @@ class KafkaStreamsMessageConversionDelegate {
			}
		}

-		@SuppressWarnings("deprecation")
-		@Override
-		public void punctuate(long timestamp) {
-
-		}
-
		@Override
		public void close() {
@@ -21,21 +21,19 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import java.util.UUID;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
@@ -64,6 +62,7 @@ import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.MethodParameter;
import org.springframework.core.annotation.AnnotationUtils;
+import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
@@ -86,6 +85,7 @@ import org.springframework.util.StringUtils;
 *
 * @author Soby Chacko
 * @author Lei Chen
+ * @author Gary Russell
 */
class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListenerSetupMethodOrchestrator, ApplicationContextAware {
@@ -107,6 +107,8 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
	private final KafkaStreamsBinderConfigurationProperties binderConfigurationProperties;

+	private final CleanupConfig cleanupConfig;
+
	private ConfigurableApplicationContext applicationContext;

	KafkaStreamsStreamListenerSetupMethodOrchestrator(BindingServiceProperties bindingServiceProperties,
@@ -115,7 +117,8 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
			KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
			StreamListenerParameterAdapter streamListenerParameterAdapter,
			Collection<StreamListenerResultAdapter> streamListenerResultAdapters,
-			KafkaStreamsBinderConfigurationProperties binderConfigurationProperties) {
+			KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
+			CleanupConfig cleanupConfig) {
		this.bindingServiceProperties = bindingServiceProperties;
		this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
		this.keyValueSerdeResolver = keyValueSerdeResolver;
@@ -123,6 +126,7 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
		this.streamListenerParameterAdapter = streamListenerParameterAdapter;
		this.streamListenerResultAdapters = streamListenerResultAdapters;
		this.binderConfigurationProperties = binderConfigurationProperties;
+		this.cleanupConfig = cleanupConfig;
	}

	@Override
@@ -291,6 +295,7 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
	}

	private <K,V> KTable<K,V> materializedAs(StreamsBuilder streamsBuilder, String destination, String storeName, Serde<K> k, Serde<V> v) {
		return streamsBuilder.table(bindingServiceProperties.getBindingDestination(destination),
				Materialized.<K, V, KeyValueStore<Bytes, byte[]>>as(storeName)
						.withKeySerde(k)
@@ -384,14 +389,6 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
	private StreamsConfig buildStreamsBuilderAndRetrieveConfig(Method method, ApplicationContext applicationContext,
			BindingProperties bindingProperties) {
		ConfigurableListableBeanFactory beanFactory = this.applicationContext.getBeanFactory();
-		StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean();
-		streamsBuilder.setAutoStartup(false);
-		String uuid = UUID.randomUUID().toString();
-		BeanDefinition streamsBuilderBeanDefinition =
-				BeanDefinitionBuilder.genericBeanDefinition((Class<StreamsBuilderFactoryBean>) streamsBuilder.getClass(), () -> streamsBuilder)
-						.getRawBeanDefinition();
-		((BeanDefinitionRegistry) beanFactory).registerBeanDefinition("stream-builder-" + uuid, streamsBuilderBeanDefinition);
-		StreamsBuilderFactoryBean streamsBuilderX = applicationContext.getBean("&stream-builder-" + uuid, StreamsBuilderFactoryBean.class);
		String group = bindingProperties.getGroup();
		if (!StringUtils.hasText(group)) {
			group = binderConfigurationProperties.getApplicationId();
@@ -418,12 +415,20 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
				return super.getConfiguredInstance(key, clazz);
			}
		};
+		StreamsBuilderFactoryBean streamsBuilder = this.cleanupConfig == null
+				? new StreamsBuilderFactoryBean(streamsConfig)
+				: new StreamsBuilderFactoryBean(streamsConfig, this.cleanupConfig);
+		streamsBuilder.setAutoStartup(false);
+		BeanDefinition streamsBuilderBeanDefinition =
+				BeanDefinitionBuilder.genericBeanDefinition((Class<StreamsBuilderFactoryBean>) streamsBuilder.getClass(), () -> streamsBuilder)
+						.getRawBeanDefinition();
+		((BeanDefinitionRegistry) beanFactory).registerBeanDefinition("stream-builder-" + method.getName(), streamsBuilderBeanDefinition);
+		StreamsBuilderFactoryBean streamsBuilderX = applicationContext.getBean("&stream-builder-" + method.getName(), StreamsBuilderFactoryBean.class);
		BeanDefinition streamsConfigBeanDefinition =
				BeanDefinitionBuilder.genericBeanDefinition((Class<StreamsConfig>) streamsConfig.getClass(), () -> streamsConfig)
						.getRawBeanDefinition();
-		((BeanDefinitionRegistry) beanFactory).registerBeanDefinition("streamsConfig-" + uuid, streamsConfigBeanDefinition);
+		((BeanDefinitionRegistry) beanFactory).registerBeanDefinition("streamsConfig-" + method.getName(), streamsConfigBeanDefinition);

-		streamsBuilder.setStreamsConfig(streamsConfig);
		methodStreamsBuilderFactoryBeanMap.put(method, streamsBuilderX);
		return streamsConfig;
	}
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2018 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.binder.kafka.streams.bootstrap;
+
+import org.apache.kafka.streams.kstream.KStream;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.springframework.boot.WebApplicationType;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.builder.SpringApplicationBuilder;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.annotation.Input;
+import org.springframework.cloud.stream.annotation.StreamListener;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.kafka.test.rule.KafkaEmbedded;
+
+/**
+ * @author Soby Chacko
+ */
+@Ignore("Temporarily disabling the test as builds are getting slower due to this.")
+public class KafkaStreamsBinderBootstrapTest {
+
+	@ClassRule
+	public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
+
+	@Test
+	public void testKafkaStreamsBinderWithCustomEnvironmentCanStart() {
+		ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(SimpleApplication.class)
+				.web(WebApplicationType.NONE)
+				.run("--spring.cloud.stream.bindings.input.destination=foo",
+						"--spring.cloud.stream.bindings.input.binder=kBind1",
+						"--spring.cloud.stream.binders.kBind1.type=kstream",
+						"--spring.cloud.stream.binders.kBind1.environment.spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
+						"--spring.cloud.stream.binders.kBind1.environment.spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
+
+		applicationContext.close();
+	}
+
+	@Test
+	public void testKafkaStreamsBinderWithStandardConfigurationCanStart() {
+		ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(SimpleApplication.class)
+				.web(WebApplicationType.NONE)
+				.run("--spring.cloud.stream.bindings.input.destination=foo",
+						"--spring.cloud.stream.kafka.streams.binder.brokers=" + embeddedKafka.getBrokersAsString(),
+						"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
+
+		applicationContext.close();
+	}
+
+	@SpringBootApplication
+	@EnableBinding(StreamSourceProcessor.class)
+	static class SimpleApplication {
+
+		@StreamListener
+		public void handle(@Input("input") KStream<Object, String> stream) {
+
+		}
+	}
+
+	interface StreamSourceProcessor {
+
+		@Input("input")
+		KStream<?, ?> inputStream();
+	}
+}
@@ -14,7 +14,7 @@
 * limitations under the License.
 */

-package org.springframework.cloud.stream.binder.kafka.streams;
+package org.springframework.cloud.stream.binder.kafka.streams.integration;

import java.util.Arrays;
import java.util.Map;
@@ -72,7 +72,7 @@ public abstract class DeserializationErrorHandlerByKafkaTests {
			"error.word1.groupx", "error.word2.groupx");

	@SpyBean
-	KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate;
+	org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate;

	private static Consumer<String, String> consumer;
@@ -14,7 +14,7 @@
 * limitations under the License.
 */

-package org.springframework.cloud.stream.binder.kafka.streams;
+package org.springframework.cloud.stream.binder.kafka.streams.integration;

import java.util.Map;
@@ -66,7 +66,7 @@ public abstract class DeserializtionErrorHandlerByBinderTests {
			"error.foos1.fooz-group", "error.foos2.fooz-group");

	@SpyBean
-	KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate;
+	org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate;

	private static Consumer<Integer, String> consumer;
@@ -14,7 +14,7 @@
 * limitations under the License.
 */

-package org.springframework.cloud.stream.binder.kafka.streams;
+package org.springframework.cloud.stream.binder.kafka.streams.integration;

import java.util.ArrayList;
import java.util.Arrays;
@@ -14,7 +14,7 @@
 * limitations under the License.
 */

-package org.springframework.cloud.stream.binder.kafka.streams;
+package org.springframework.cloud.stream.binder.kafka.streams.integration;

import java.util.Map;
@@ -1,5 +1,5 @@
/*
- * Copyright 2017 the original author or authors.
+ * Copyright 2017-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@
 * limitations under the License.
 */

-package org.springframework.cloud.stream.binder.kafka.streams;
+package org.springframework.cloud.stream.binder.kafka.streams.integration;

import java.util.Arrays;
import java.util.Date;
@@ -24,11 +24,14 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -45,9 +48,13 @@ import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsApplicationSupportProperties;
import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.integration.test.util.TestUtils;
+import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.StreamsBuilderFactoryBean;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
@@ -101,7 +108,17 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
				"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
		try {
			receiveAndValidate(context);
-		} finally {
+			//Assertions on StreamBuilderFactoryBean
+			StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
+			KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
+			ReadOnlyWindowStore<Object, Object> store = kafkaStreams.store("foo-WordCounts", QueryableStoreTypes.windowStore());
+			assertThat(store).isNotNull();
+			CleanupConfig cleanup = TestUtils.getPropertyValue(streamsBuilderFactoryBean, "cleanupConfig",
+					CleanupConfig.class);
+			assertThat(cleanup.cleanupOnStart()).isTrue();
+			assertThat(cleanup.cleanupOnStop()).isFalse();
+		}
+		finally {
			context.close();
		}
	}
@@ -129,8 +146,6 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
		public KStream<?, WordCount> process(@Input("input") KStream<Object, String> input) {

-			input.map((k,v) -> {
-				System.out.println(k);
-				System.out.println(v);
-				return new KeyValue<>(k,v);
-			});
			return input
@@ -143,6 +158,11 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
					.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
		}

+		@Bean
+		public CleanupConfig cleanupConfig() {
+			return new CleanupConfig(true, false);
+		}
+
	}

	static class WordCount {
@@ -14,7 +14,7 @@
 * limitations under the License.
 */

-package org.springframework.cloud.stream.binder.kafka.streams;
+package org.springframework.cloud.stream.binder.kafka.streams.integration;

import java.util.Map;
@@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
@@ -39,6 +40,7 @@ import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
+import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
@@ -137,7 +139,7 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
					.filter((key, product) -> product.getId() == 123)
					.map((key, value) -> new KeyValue<>(value.id, value))
					.groupByKey(Serialized.with(new Serdes.IntegerSerde(), new JsonSerde<>(Product.class)))
-					.count("prod-id-count-store")
+					.count(Materialized.as("prod-id-count-store"))
					.toStream()
					.map((key, value) -> new KeyValue<>(null, "Count for product with ID 123: " + value));
		}
@@ -14,7 +14,7 @@
 * limitations under the License.
 */

-package org.springframework.cloud.stream.binder.kafka.streams;
+package org.springframework.cloud.stream.binder.kafka.streams.integration;

import java.util.Arrays;
import java.util.Map;
@@ -71,7 +71,7 @@ public abstract class KafkaStreamsNativeEncodingDecodingTests {
	public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "counts");

	@SpyBean
-	KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate;
+	org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate;

	private static Consumer<String, String> consumer;
@@ -14,7 +14,7 @@
 * limitations under the License.
 */

-package org.springframework.cloud.stream.binder.kafka.streams;
+package org.springframework.cloud.stream.binder.kafka.streams.integration;

import java.util.Map;
@@ -116,11 +116,6 @@ public class KafkaStreamsStateStoreIntegrationTests {
				processed = true;
			}

-			@Override
-			public void punctuate(long l) {
-
-			}
-
			@Override
			public void close() {
				if (state != null) {
@@ -1,5 +1,5 @@
/*
- * Copyright 2017 the original author or authors.
+ * Copyright 2017-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@
 * limitations under the License.
 */

-package org.springframework.cloud.stream.binder.kafka.streams;
+package org.springframework.cloud.stream.binder.kafka.streams.integration;

import java.util.Map;
@@ -38,9 +38,12 @@ import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.integration.test.util.TestUtils;
+import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.StreamsBuilderFactoryBean;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -92,7 +95,15 @@ public class KafkastreamsBinderPojoInputStringOutputIntegrationTests {
				"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
		try {
			receiveAndValidateFoo(context);
-		} finally {
+			//Assertions on StreamBuilderFactoryBean
+			StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process",
+					StreamsBuilderFactoryBean.class);
+			CleanupConfig cleanup = TestUtils.getPropertyValue(streamsBuilderFactoryBean, "cleanupConfig",
+					CleanupConfig.class);
+			assertThat(cleanup.cleanupOnStart()).isFalse();
+			assertThat(cleanup.cleanupOnStop()).isTrue();
+		}
+		finally {
			context.close();
		}
	}
@@ -14,7 +14,7 @@
 * limitations under the License.
 */

-package org.springframework.cloud.stream.binder.kafka.streams;
+package org.springframework.cloud.stream.binder.kafka.streams.integration;

import java.util.ArrayList;
import java.util.Arrays;
@@ -14,7 +14,7 @@
 * limitations under the License.
 */

-package org.springframework.cloud.stream.binder.kafka.streams;
+package org.springframework.cloud.stream.binder.kafka.streams.integration;

import java.util.Arrays;
import java.util.Date;
@@ -1,5 +1,5 @@
eclipse.preferences.version=1
org.eclipse.jdt.ui.ignorelowercasenames=true
-org.eclipse.jdt.ui.importorder=java;javax;com;org;org.springframework;ch.qos;\#;
+org.eclipse.jdt.ui.importorder=java;javax;com;io.micrometer;org;org.springframework;ch.qos;\#;
org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99
@@ -10,7 +10,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>2.1.0.M1</version>
+		<version>2.1.0.M2</version>
 	</parent>

 	<dependencies>
@@ -60,6 +60,26 @@
 			<artifactId>spring-cloud-stream-binder-test</artifactId>
 			<scope>test</scope>
 		</dependency>
+		<!-- Following dependencies are needed to support Kafka 1.1.0 client-->
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka-clients</artifactId>
+			<version>${kafka.version}</version>
+			<classifier>test</classifier>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_2.11</artifactId>
+			<version>${kafka.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka_2.11</artifactId>
+			<version>${kafka.version}</version>
+			<classifier>test</classifier>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>

 </project>
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-2017 the original author or authors.
+ * Copyright 2016-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -78,25 +78,31 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
	public Health call() {
		try {
			if (metadataConsumer == null) {
-				metadataConsumer = consumerFactory.createConsumer();
-			}
-			Set<String> downMessages = new HashSet<>();
-			for (String topic : KafkaBinderHealthIndicator.this.binder.getTopicsInUse().keySet()) {
-				List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
-				for (PartitionInfo partitionInfo : partitionInfos) {
-					if (KafkaBinderHealthIndicator.this.binder.getTopicsInUse().get(topic).getPartitionInfos()
-							.contains(partitionInfo) && partitionInfo.leader().id() == -1) {
-						downMessages.add(partitionInfo.toString());
+				synchronized (KafkaBinderHealthIndicator.this) {
+					if (metadataConsumer == null) {
+						metadataConsumer = consumerFactory.createConsumer();
+					}
+				}
+			}
+			synchronized (metadataConsumer) {
+				Set<String> downMessages = new HashSet<>();
+				for (String topic : KafkaBinderHealthIndicator.this.binder.getTopicsInUse().keySet()) {
+					List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
+					for (PartitionInfo partitionInfo : partitionInfos) {
+						if (KafkaBinderHealthIndicator.this.binder.getTopicsInUse().get(topic).getPartitionInfos()
+								.contains(partitionInfo) && partitionInfo.leader().id() == -1) {
+							downMessages.add(partitionInfo.toString());
+						}
					}
				}
-			}
-			if (downMessages.isEmpty()) {
-				return Health.up().build();
-			}
-			else {
-				return Health.down()
-						.withDetail("Following partitions in use have no leaders: ", downMessages.toString())
-						.build();
+				if (downMessages.isEmpty()) {
+					return Health.up().build();
+				}
+				else {
+					return Health.down()
+							.withDetail("Following partitions in use have no leaders: ", downMessages.toString())
+							.build();
+				}
			}
		}
		catch (Exception e) {
@@ -20,11 +20,17 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.TimeGauge;
import io.micrometer.core.instrument.binder.MeterBinder;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
@@ -51,9 +57,12 @@ import org.springframework.util.ObjectUtils;
 * @author Oleg Zhurakousky
 * @author Jon Schneider
 * @author Thomas Cheyney
+ * @author Gary Russell
 */
public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<BindingCreatedEvent> {

+	private static final int DEFAULT_TIMEOUT = 60;
+
	private final static Log LOG = LogFactory.getLog(KafkaBinderMetrics.class);

	static final String METRIC_NAME = "spring.cloud.stream.binder.kafka.offset";
@@ -68,6 +77,8 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
	private Consumer<?, ?> metadataConsumer;

+	private int timeout = DEFAULT_TIMEOUT;
+
	public KafkaBinderMetrics(KafkaMessageChannelBinder binder,
			KafkaBinderConfigurationProperties binderConfigurationProperties,
			ConsumerFactory<?, ?> defaultConsumerFactory, @Nullable MeterRegistry meterRegistry) {
@@ -84,6 +95,10 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
		this(binder, binderConfigurationProperties, null, null);
	}

+	public void setTimeout(int timeout) {
+		this.timeout = timeout;
+	}
+
	@Override
	public void bindTo(MeterRegistry registry) {
		for (Map.Entry<String, KafkaMessageChannelBinder.TopicInformation> topicInfo : this.binder.getTopicsInUse()
@@ -106,33 +121,56 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
	}

	private double calculateConsumerLagOnTopic(String topic, String group) {
-		long lag = 0;
+		ExecutorService exec = Executors.newSingleThreadExecutor();
+		Future<Long> future = exec.submit(() -> {
+			long lag = 0;
+			try {
+				if (metadataConsumer == null) {
+					synchronized (KafkaBinderMetrics.this) {
+						if (metadataConsumer == null) {
+							metadataConsumer = createConsumerFactory(group).createConsumer();
+						}
+					}
+				}
+				synchronized (metadataConsumer) {
+					List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
+					List<TopicPartition> topicPartitions = new LinkedList<>();
+					for (PartitionInfo partitionInfo : partitionInfos) {
+						topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+					}
+
+					Map<TopicPartition, Long> endOffsets = metadataConsumer.endOffsets(topicPartitions);
+
+					for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
+						OffsetAndMetadata current = metadataConsumer.committed(endOffset.getKey());
+						if (current != null) {
+							lag += endOffset.getValue() - current.offset();
+						}
+						else {
+							lag += endOffset.getValue();
+						}
+					}
+				}
+			}
+			catch (Exception e) {
+				LOG.debug("Cannot generate metric for topic: " + topic, e);
+			}
+			return lag;
+		});
		try {
-			if (metadataConsumer == null) {
-				metadataConsumer = createConsumerFactory(group).createConsumer();
-			}
-			List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
-			List<TopicPartition> topicPartitions = new LinkedList<>();
-			for (PartitionInfo partitionInfo : partitionInfos) {
-				topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
-			}
-
-			Map<TopicPartition, Long> endOffsets = metadataConsumer.endOffsets(topicPartitions);
-
-			for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
-				OffsetAndMetadata current = metadataConsumer.committed(endOffset.getKey());
-				if (current != null) {
-					lag += endOffset.getValue() - current.offset();
-				}
-				else {
-					lag += endOffset.getValue();
-				}
-			}
+			return future.get(this.timeout, TimeUnit.SECONDS);
		}
-		catch (Exception e) {
-			LOG.debug("Cannot generate metric for topic: " + topic, e);
+		catch (InterruptedException e) {
+			Thread.currentThread().interrupt();
+			return 0L;
		}
-		return lag;
+		catch (ExecutionException | TimeoutException e) {
+			return 0L;
+		}
+		finally {
+			exec.shutdownNow();
+		}
	}

	private ConsumerFactory<?, ?> createConsumerFactory(String group) {
@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka;

import java.io.PrintWriter;
import java.io.StringWriter;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -67,32 +68,32 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerPro
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
+import org.springframework.cloud.stream.binding.MessageConverterConfigurer.PartitioningInterceptor;
+import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.context.Lifecycle;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
+import org.springframework.integration.StaticMessageHeaderAccessor;
+import org.springframework.integration.acks.AcknowledgmentCallback;
import org.springframework.integration.channel.ChannelInterceptorAware;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.integration.kafka.inbound.KafkaMessageSource;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
import org.springframework.integration.kafka.support.RawRecordHeaderErrorMessageStrategy;
-import org.springframework.integration.support.AcknowledgmentCallback;
-import org.springframework.integration.support.AcknowledgmentCallback.Status;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.MessageBuilder;
-import org.springframework.integration.support.StaticMessageHeaderAccessor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
-import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
-import org.springframework.kafka.listener.config.ContainerProperties;
+import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaders;
@@ -105,6 +106,7 @@ import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
+import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
@@ -130,12 +132,21 @@ public class KafkaMessageChannelBinder extends
		AbstractMessageChannelBinder<ExtendedConsumerProperties<KafkaConsumerProperties>, ExtendedProducerProperties<KafkaProducerProperties>, KafkaTopicProvisioner>
		implements ExtendedPropertiesBinder<MessageChannel, KafkaConsumerProperties, KafkaProducerProperties> {

	public static final String X_EXCEPTION_FQCN = "x-exception-fqcn";

	public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";

	public static final String X_EXCEPTION_MESSAGE = "x-exception-message";

	public static final String X_ORIGINAL_TOPIC = "x-original-topic";

	public static final String X_ORIGINAL_PARTITION = "x-original-partition";

	public static final String X_ORIGINAL_OFFSET = "x-original-offset";

	public static final String X_ORIGINAL_TIMESTAMP = "x-original-timestamp";

	public static final String X_ORIGINAL_TIMESTAMP_TYPE = "x-original-timestamp-type";

	private final KafkaBinderConfigurationProperties configurationProperties;
@@ -207,6 +218,14 @@ public class KafkaMessageChannelBinder extends
	protected MessageHandler createProducerMessageHandler(final ProducerDestination destination,
			ExtendedProducerProperties<KafkaProducerProperties> producerProperties, MessageChannel errorChannel)
			throws Exception {
+		throw new IllegalStateException("The abstract binder should not call this method");
+	}
+
+	@Override
+	protected MessageHandler createProducerMessageHandler(final ProducerDestination destination,
+			ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
+			MessageChannel channel, MessageChannel errorChannel)
+			throws Exception {
		/*
		 * IMPORTANT: With a transactional binder, individual producer properties for Kafka are
		 * ignored; the global binder (spring.cloud.stream.kafka.binder.transaction.producer.*)
@@ -226,20 +245,18 @@ public class KafkaMessageChannelBinder extends
			return partitionsFor;
		});
		this.topicsInUse.put(destination.getName(), new TopicInformation(null, partitions));
-		if (producerProperties.getPartitionCount() < partitions.size()) {
+		if (producerProperties.isPartitioned() && producerProperties.getPartitionCount() < partitions.size()) {
			if (this.logger.isInfoEnabled()) {
				this.logger.info("The `partitionCount` of the producer for topic " + destination.getName() + " is "
						+ producerProperties.getPartitionCount() + ", smaller than the actual partition count of "
-						+ partitions.size() + " of the topic. The larger number will be used instead.");
+						+ partitions.size() + " for the topic. The larger number will be used instead.");
			}
-			/*
-			 * This is dirty; it relies on the fact that we, and the partition interceptor, share a
-			 * hard reference to the producer properties instance. But I don't see another way to fix
-			 * it since the interceptor has already been added to the channel, and we don't have
-			 * access to the channel here; if we did, we could inject the proper partition count
-			 * there. TODO: Consider this when doing the 2.0 binder restructuring.
-			 */
			producerProperties.setPartitionCount(partitions.size());
+			List<ChannelInterceptor> interceptors = ((ChannelInterceptorAware) channel).getChannelInterceptors();
+			interceptors.forEach(interceptor -> {
+				if (interceptor instanceof PartitioningInterceptor) {
+					((PartitioningInterceptor) interceptor).setPartitionCount(partitions.size());
+				}
+			});
		}

		KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFB);
@@ -398,14 +415,14 @@ public class KafkaMessageChannelBinder extends
// end of these won't be needed...
if (!extendedConsumerProperties.getExtension().isAutoCommitOffset()) {
messageListenerContainer.getContainerProperties()
.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
.setAckMode(ContainerProperties.AckMode.MANUAL);
messageListenerContainer.getContainerProperties().setAckOnError(false);
}
else {
messageListenerContainer.getContainerProperties()
.setAckOnError(isAutoCommitOnError(extendedConsumerProperties));
if (extendedConsumerProperties.getExtension().isAckEachRecord()) {
messageListenerContainer.getContainerProperties().setAckMode(AckMode.RECORD);
messageListenerContainer.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
}
}
if (this.logger.isDebugEnabled()) {
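With autoCommitOffset disabled the container runs in MANUAL ack mode, so the application must acknowledge each message itself via the Acknowledgment header. A minimal consumer-side sketch; the handler shape and process(...) call are illustrative, not taken from this changeset:

import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

public void handle(Message<?> message) {
    // Present only when the binding sets ...consumer.autoCommitOffset=false.
    Acknowledgment acknowledgment = message.getHeaders()
            .get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
    process(message.getPayload()); // process(...) stands in for application logic
    if (acknowledgment != null) {
        acknowledgment.acknowledge(); // commits the offset for this record
    }
}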
@@ -656,7 +673,7 @@ public class KafkaMessageChannelBinder extends
DlqSender<?,?> dlqSender = new DlqSender(kafkaTemplate);

return message -> {
@SuppressWarnings("unchecked")

final ConsumerRecord<Object, Object> record = message.getHeaders()
.get(KafkaHeaders.RAW_DATA, ConsumerRecord.class);

@@ -683,11 +700,25 @@ public class KafkaMessageChannelBinder extends
Headers kafkaHeaders = new RecordHeaders(record.headers().toArray());
AtomicReference<ConsumerRecord<?, ?>> recordToSend = new AtomicReference<>(record);
if (message.getPayload() instanceof Throwable) {

Throwable throwable = (Throwable) message.getPayload();

HeaderMode headerMode = properties.getHeaderMode();

if (headerMode == null || HeaderMode.headers.equals(headerMode)) {
kafkaHeaders.add(new RecordHeader(X_ORIGINAL_TOPIC,
record.topic().getBytes(StandardCharsets.UTF_8)));

kafkaHeaders.add(
new RecordHeader(X_ORIGINAL_TOPIC, record.topic().getBytes(StandardCharsets.UTF_8)));
kafkaHeaders.add(new RecordHeader(X_ORIGINAL_PARTITION,
ByteBuffer.allocate(Integer.BYTES).putInt(record.partition()).array()));
kafkaHeaders.add(new RecordHeader(X_ORIGINAL_OFFSET,
ByteBuffer.allocate(Long.BYTES).putLong(record.offset()).array()));
kafkaHeaders.add(new RecordHeader(X_ORIGINAL_TIMESTAMP,
ByteBuffer.allocate(Long.BYTES).putLong(record.timestamp()).array()));
kafkaHeaders.add(new RecordHeader(X_ORIGINAL_TIMESTAMP_TYPE,
record.timestampType().toString().getBytes(StandardCharsets.UTF_8)));
kafkaHeaders.add(new RecordHeader(X_EXCEPTION_FQCN,
throwable.getClass().getName().getBytes(StandardCharsets.UTF_8)));
kafkaHeaders.add(new RecordHeader(X_EXCEPTION_MESSAGE,
throwable.getMessage().getBytes(StandardCharsets.UTF_8)));
kafkaHeaders.add(new RecordHeader(X_EXCEPTION_STACKTRACE,
else if (HeaderMode.embeddedHeaders.equals(headerMode)) {
try {
MessageValues messageValues = EmbeddedHeaderUtils
.extractHeaders(MessageBuilder.withPayload((byte[]) record.value()).build(),
false);
.extractHeaders(MessageBuilder.withPayload((byte[]) record.value()).build(), false);
messageValues.put(X_ORIGINAL_TOPIC, record.topic());
messageValues.put(X_ORIGINAL_PARTITION, record.partition());
messageValues.put(X_ORIGINAL_OFFSET, record.offset());
messageValues.put(X_ORIGINAL_TIMESTAMP, record.timestamp());
messageValues.put(X_ORIGINAL_TIMESTAMP_TYPE, record.timestampType().toString());
messageValues.put(X_EXCEPTION_FQCN, throwable.getClass().getName());
messageValues.put(X_EXCEPTION_MESSAGE, throwable.getMessage());
messageValues.put(X_EXCEPTION_STACKTRACE, getStackTraceAsString(throwable));

final String[] headersToEmbed = new ArrayList<>(messageValues.keySet()).toArray(
new String[messageValues.keySet().size()]);
final String[] headersToEmbed = new ArrayList<>(messageValues.keySet())
.toArray(new String[messageValues.keySet().size()]);
byte[] payload = EmbeddedHeaderUtils.embedHeaders(messageValues,
EmbeddedHeaderUtils.headersToEmbed(headersToEmbed));
recordToSend.set(new ConsumerRecord<Object, Object>(record.topic(), record.partition(),
@@ -715,8 +750,7 @@ public class KafkaMessageChannelBinder extends
}
}
String dlqName = StringUtils.hasText(kafkaConsumerProperties.getDlqName())
? kafkaConsumerProperties.getDlqName()
: "error." + record.topic() + "." + group;
? kafkaConsumerProperties.getDlqName() : "error." + record.topic() + "." + group;
dlqSender.sendToDlq(recordToSend.get(), kafkaHeaders, dlqName);
};
}
@@ -747,10 +781,10 @@ public class KafkaMessageChannelBinder extends
((MessagingException) message.getPayload()).getFailedMessage());
if (ack != null) {
if (isAutoCommitOnError(properties)) {
ack.acknowledge(Status.REJECT);
ack.acknowledge(AcknowledgmentCallback.Status.REJECT);
}
else {
ack.acknowledge(Status.REQUEUE);
ack.acknowledge(AcknowledgmentCallback.Status.REQUEUE);
}
}
}

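These Status values decide what happens when a polled consumer's handler throws: REQUEUE redelivers the record, REJECT discards it (or lets the DLQ path above take it). A minimal sketch of the application-side loop, assuming a PollableMessageSource binding like the ones exercised in the tests below; pollLoop and process(...) are hypothetical names:

import org.springframework.cloud.stream.binder.PollableMessageSource;

// Sketch: each poll() runs the handler once; if it throws, the binder consults
// AcknowledgmentCallback.Status (REQUEUE to redeliver, REJECT to discard/DLQ).
public void pollLoop(PollableMessageSource source) {
    boolean received = source.poll(message -> {
        process(message.getPayload()); // process(...) stands in for application logic
    });
    if (!received) {
        // nothing available; back off before polling again
    }
}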
@@ -18,6 +18,8 @@ package org.springframework.cloud.stream.binder.kafka.config;

import java.io.IOException;

import javax.security.auth.login.AppConfigurationEntry;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;

@@ -59,7 +61,8 @@ import org.springframework.lang.Nullable;
*/
@Configuration
@ConditionalOnMissingBean(Binder.class)
@Import({KafkaAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class, KafkaBinderHealthIndicatorConfiguration.class })
@Import({ KafkaAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class,
KafkaBinderHealthIndicatorConfiguration.class })
@EnableConfigurationProperties({ KafkaExtendedBindingProperties.class })
public class KafkaBinderConfiguration {

@@ -84,7 +87,7 @@ public class KafkaBinderConfiguration {

@Bean
KafkaMessageChannelBinder kafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider, @Nullable ListenerContainerCustomizer<AbstractMessageListenerContainer<?,?>> listenerContainerCustomizer) {
KafkaTopicProvisioner provisioningProvider, @Nullable ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> listenerContainerCustomizer) {

KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
configurationProperties, provisioningProvider, listenerContainerCustomizer);
@@ -100,8 +103,34 @@ public class KafkaBinderConfiguration {
}

@Bean
public KafkaJaasLoginModuleInitializer jaasInitializer() throws IOException {
return new KafkaJaasLoginModuleInitializer();
@ConditionalOnMissingBean(KafkaJaasLoginModuleInitializer.class)
public KafkaJaasLoginModuleInitializer jaasInitializer(KafkaBinderConfigurationProperties configurationProperties) throws IOException {
KafkaJaasLoginModuleInitializer kafkaJaasLoginModuleInitializer = new KafkaJaasLoginModuleInitializer();
JaasLoginModuleConfiguration jaas = configurationProperties.getJaas();
if (jaas != null) {
kafkaJaasLoginModuleInitializer.setLoginModule(jaas.getLoginModule());

KafkaJaasLoginModuleInitializer.ControlFlag controlFlag = null;
AppConfigurationEntry.LoginModuleControlFlag controlFlagValue = jaas.getControlFlagValue();

if (AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL.equals(controlFlagValue)) {
controlFlag = KafkaJaasLoginModuleInitializer.ControlFlag.OPTIONAL;
}
else if (AppConfigurationEntry.LoginModuleControlFlag.REQUIRED.equals(controlFlagValue)) {
controlFlag = KafkaJaasLoginModuleInitializer.ControlFlag.REQUIRED;
}
else if (AppConfigurationEntry.LoginModuleControlFlag.REQUISITE.equals(controlFlagValue)) {
controlFlag = KafkaJaasLoginModuleInitializer.ControlFlag.REQUISITE;
}
else if (AppConfigurationEntry.LoginModuleControlFlag.SUFFICIENT.equals(controlFlagValue)) {
controlFlag = KafkaJaasLoginModuleInitializer.ControlFlag.SUFFICIENT;
}
if (controlFlag != null) {
kafkaJaasLoginModuleInitializer.setControlFlag(controlFlag);
}
kafkaJaasLoginModuleInitializer.setOptions(jaas.getOptions());
}
return kafkaJaasLoginModuleInitializer;
}

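With this change the binder's own JAAS settings are finally wired into the initializer. A hedged sketch of what driving that from configuration might look like, reusing the bootstrap pattern from KafkaBinderBootstrapTest below; the property names follow the binder's jaas.* configuration keys and the Kerberos values are illustrative, not from this changeset:

// Sketch: supplying spring.cloud.stream.kafka.binder.jaas.* at startup
// (SimpleApplication is the test bootstrap class used further down).
ConfigurableApplicationContext context = new SpringApplicationBuilder(SimpleApplication.class)
        .web(WebApplicationType.NONE)
        .run("--spring.cloud.stream.kafka.binder.jaas.loginModule=com.sun.security.auth.module.Krb5LoginModule",
                "--spring.cloud.stream.kafka.binder.jaas.controlFlag=REQUIRED",
                "--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true");
context.close();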
/**

@@ -46,7 +46,8 @@ import static org.assertj.core.api.Assertions.assertThat;
@TestPropertySource(properties = {
"spring.cloud.stream.kafka.bindings.input.consumer.admin.replication-factor=2",
"spring.cloud.stream.kafka.bindings.input.consumer.admin.replicas-assignments.0=0,1",
"spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0" })
"spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0",
"spring.main.allow-bean-definition-overriding=true"})
@EnableIntegration
public class AdminConfigTests {


@@ -73,7 +73,7 @@ public class KafkaBinderHealthIndicatorTest {
@Test
public void kafkaBinderIsUp() {
final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group", partitions));
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group1-healthIndicator", partitions));
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
Health health = indicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
@@ -82,7 +82,7 @@ public class KafkaBinderHealthIndicatorTest {
@Test
public void kafkaBinderIsDown() {
final List<PartitionInfo> partitions = partitions(new Node(-1, null, 0));
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group", partitions));
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group2-healthIndicator", partitions));
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
Health health = indicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
@@ -91,7 +91,7 @@ public class KafkaBinderHealthIndicatorTest {
@Test(timeout = 5000)
public void kafkaBinderDoesNotAnswer() {
final List<PartitionInfo> partitions = partitions(new Node(-1, null, 0));
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group", partitions));
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group3-healthIndicator", partitions));
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willAnswer(new Answer<Object>() {

@Override
@@ -110,7 +110,7 @@ public class KafkaBinderHealthIndicatorTest {
@Test
public void createsConsumerOnceWhenInvokedMultipleTimes() {
final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group", partitions));
topicsInUse.put(TEST_TOPIC, new KafkaMessageChannelBinder.TopicInformation("group4-healthIndicator", partitions));
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);

indicator.health();

@@ -84,11 +84,11 @@ public class KafkaBinderMetricsTest {
public void shouldIndicateLag() {
org.mockito.BDDMockito.given(consumer.committed(ArgumentMatchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group1-metrics", partitions));
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
metrics.bindTo(meterRegistry);
assertThat(meterRegistry.getMeters()).hasSize(1);
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge()
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group1-metrics").tag("topic", TEST_TOPIC).timeGauge()
.value(TimeUnit.MILLISECONDS)).isEqualTo(500.0);
}

@@ -100,22 +100,22 @@ public class KafkaBinderMetricsTest {
org.mockito.BDDMockito.given(consumer.endOffsets(ArgumentMatchers.anyCollection())).willReturn(endOffsets);
org.mockito.BDDMockito.given(consumer.committed(ArgumentMatchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
List<PartitionInfo> partitions = partitions(new Node(0, null, 0), new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group2-metrics", partitions));
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
metrics.bindTo(meterRegistry);
assertThat(meterRegistry.getMeters()).hasSize(1);
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge()
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group2-metrics").tag("topic", TEST_TOPIC).timeGauge()
.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);
}

@Test
public void shouldIndicateFullLagForNotCommittedGroups() {
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group3-metrics", partitions));
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
metrics.bindTo(meterRegistry);
assertThat(meterRegistry.getMeters()).hasSize(1);
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge()
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group3-metrics").tag("topic", TEST_TOPIC).timeGauge()
.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);
}

@@ -130,11 +130,11 @@ public class KafkaBinderMetricsTest {
@Test
public void createsConsumerOnceWhenInvokedMultipleTimes() {
final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group4-metrics", partitions));

metrics.bindTo(meterRegistry);

TimeGauge gauge = meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge();
TimeGauge gauge = meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group4-metrics").tag("topic", TEST_TOPIC).timeGauge();
gauge.value(TimeUnit.MILLISECONDS);
assertThat(gauge.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);

@@ -147,11 +147,11 @@ public class KafkaBinderMetricsTest {
.willReturn(consumer);

final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group5-metrics", partitions));

metrics.bindTo(meterRegistry);

TimeGauge gauge = meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge();
TimeGauge gauge = meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group5-metrics").tag("topic", TEST_TOPIC).timeGauge();
assertThat(gauge.value(TimeUnit.MILLISECONDS)).isEqualTo(0);
assertThat(gauge.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);


@@ -17,6 +17,7 @@
package org.springframework.cloud.stream.binder.kafka;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
@@ -30,10 +31,10 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
@@ -48,6 +49,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
@@ -80,6 +82,7 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerPro
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.utils.KafkaTopicUtils;
import org.springframework.cloud.stream.binding.MessageConverterConfigurer.PartitioningInterceptor;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.context.ApplicationContext;
@@ -100,8 +103,8 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
@@ -109,7 +112,7 @@ import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.test.core.BrokerAddress;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
@@ -118,11 +121,11 @@ import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;
@@ -149,7 +152,7 @@ public class KafkaBinderTests extends
private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName();

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10, "error.pollableDlq.group");
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 10, "error.pollableDlq.group-pcWithDlq");

private KafkaTestBinder binder;

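Note the recurring pattern in the rest of this diff: the tests migrate from the KafkaEmbedded JUnit rule to EmbeddedKafkaRule, which wraps the broker, so every broker lookup gains a getEmbeddedKafka() hop. As a quick before/after sketch taken from the hunks below:

// Before: the rule itself exposed broker details.
// String brokers = embeddedKafka.getBrokersAsString();

// After: the rule wraps the embedded broker, so dereference it first.
String brokers = embeddedKafka.getEmbeddedKafka().getBrokersAsString();
BrokerAddress[] addresses = embeddedKafka.getEmbeddedKafka().getBrokerAddresses();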
@@ -209,7 +212,7 @@ public class KafkaBinderTests extends
private KafkaBinderConfigurationProperties createConfigurationProperties() {
KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties(
new TestKafkaProperties());
BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
BrokerAddress[] brokerAddresses = embeddedKafka.getEmbeddedKafka().getBrokerAddresses();
List<String> bAddresses = new ArrayList<>();
for (BrokerAddress bAddress : brokerAddresses) {
bAddresses.add(bAddress.toString());
@@ -223,7 +226,7 @@ public class KafkaBinderTests extends
return consumerFactory().createConsumer().partitionsFor(topic).size();
}

private void invokeCreateTopic(String topic, int partitions, int replicationFactor) throws Throwable {
private void invokeCreateTopic(String topic, int partitions, int replicationFactor) throws Exception {

NewTopic newTopic = new NewTopic(topic, partitions,
(short) replicationFactor);
@@ -242,7 +245,7 @@ public class KafkaBinderTests extends
timeoutMultiplier = Double.parseDouble(multiplier);
}

BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
BrokerAddress[] brokerAddresses = embeddedKafka.getEmbeddedKafka().getBrokerAddresses();
List<String> bAddresses = new ArrayList<>();
for (BrokerAddress bAddress : brokerAddresses) {
bAddresses.add(bAddress.toString());
@@ -334,9 +337,9 @@ public class KafkaBinderTests extends
Assertions.assertThat(inboundMessageRef.get().getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE)).isNull();
Assertions.assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE))
.isEqualTo(MimeTypeUtils.TEXT_PLAIN);
Assertions.assertThat(inboundMessageRef.get().getHeaders().get("foo")).isInstanceOf(MimeType.class);
MimeType actual = (MimeType) inboundMessageRef.get().getHeaders().get("foo");
Assertions.assertThat(actual).isEqualTo(MimeTypeUtils.TEXT_PLAIN);
Assertions.assertThat(inboundMessageRef.get().getHeaders().get("foo")).isInstanceOf(String.class);
String actual = (String) inboundMessageRef.get().getHeaders().get("foo");
Assertions.assertThat(actual).isEqualTo(MimeTypeUtils.TEXT_PLAIN.toString());
producerBinding.unbind();
consumerBinding.unbind();
}
@@ -493,7 +496,7 @@ public class KafkaBinderTests extends
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC))
.isEqualTo("foo.bar".getBytes(StandardCharsets.UTF_8));
assertThat(new String((byte[]) receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE)))
.startsWith("failed to send Message to channel 'input'");
.startsWith("Dispatcher failed to deliver Message; nested exception is java.lang.RuntimeException: fail");
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_STACKTRACE))
.isNotNull();
binderBindUnbindLatency();
@@ -660,21 +663,51 @@ public class KafkaBinderTests extends
assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload.getBytes());
if (HeaderMode.embeddedHeaders.equals(headerMode)) {
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());

assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC))
.isEqualTo(producerName);

assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_PARTITION)).isEqualTo(0);

assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_OFFSET))
.isEqualTo(0);

assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_TIMESTAMP)).isNotNull();

assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_TIMESTAMP_TYPE))
.isEqualTo(TimestampType.CREATE_TIME.toString());

assertThat(((String) receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE)))
.startsWith("failed to send Message to channel 'input'");
.startsWith("Dispatcher failed to deliver Message; nested exception is java.lang.RuntimeException: fail");
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_STACKTRACE))
.isNotNull();
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_FQCN)).isNotNull();
}
else if (!HeaderMode.none.equals(headerMode)) {
assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts());

assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC))
.isEqualTo(producerName.getBytes(StandardCharsets.UTF_8));

assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_PARTITION))
.isEqualTo(ByteBuffer.allocate(Integer.BYTES).putInt(0).array());

assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_OFFSET))
.isEqualTo(ByteBuffer.allocate(Long.BYTES).putLong(0).array());


assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_TIMESTAMP)).isNotNull();

assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_TIMESTAMP_TYPE))
.isEqualTo(TimestampType.CREATE_TIME.toString().getBytes());

assertThat(new String((byte[]) receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_MESSAGE)))
.startsWith("failed to send Message to channel 'input'");
.startsWith("Dispatcher failed to deliver Message; nested exception is java.lang.RuntimeException: fail");

assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_STACKTRACE))
.isNotNull();

assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_EXCEPTION_FQCN)).isNotNull();
}
else {
assertThat(receivedMessage.getHeaders().get(KafkaMessageChannelBinder.X_ORIGINAL_TOPIC)).isNull();
@@ -1132,7 +1165,7 @@ public class KafkaBinderTests extends

AbstractMessageListenerContainer<?, ?> container = TestUtils.getPropertyValue(consumerBinding,
"lifecycle.messageListenerContainer", AbstractMessageListenerContainer.class);
assertThat(container.getContainerProperties().getAckMode()).isEqualTo(AckMode.BATCH);
assertThat(container.getContainerProperties().getAckMode()).isEqualTo(ContainerProperties.AckMode.BATCH);

String testPayload1 = "foo" + UUID.randomUUID().toString();
Message<?> message1 = org.springframework.integration.support.MessageBuilder.withPayload(
@@ -1179,7 +1212,7 @@ public class KafkaBinderTests extends

AbstractMessageListenerContainer<?, ?> container = TestUtils.getPropertyValue(consumerBinding2,
"lifecycle.messageListenerContainer", AbstractMessageListenerContainer.class);
assertThat(container.getContainerProperties().getAckMode()).isEqualTo(AckMode.RECORD);
assertThat(container.getContainerProperties().getAckMode()).isEqualTo(ContainerProperties.AckMode.RECORD);

Message<?> receivedMessage1 = receive(inbound1);
assertThat(receivedMessage1).isNotNull();
@@ -1221,6 +1254,7 @@ public class KafkaBinderTests extends
producerProperties.setPartitionKeyExpression(spelExpressionParser.parseExpression("payload"));
producerProperties.setPartitionSelectorExpression(spelExpressionParser.parseExpression("hashCode()"));
producerProperties.setPartitionCount(3);
invokeCreateTopic("output", 6, 1);

DirectChannel output = createBindableChannel("output", createProducerBindingProperties(producerProperties));
output.setBeanName("test.output");
@@ -1232,7 +1266,14 @@ public class KafkaBinderTests extends
}
catch (UnsupportedOperationException ignored) {
}

List<ChannelInterceptor> interceptors = output.getChannelInterceptors();
AtomicInteger count = new AtomicInteger();
interceptors.forEach(interceptor -> {
if (interceptor instanceof PartitioningInterceptor) {
count.set(TestUtils.getPropertyValue(interceptor, "partitionHandler.partitionCount", Integer.class));
}
});
assertThat(count.get()).isEqualTo(6);
Message<Integer> message2 = org.springframework.integration.support.MessageBuilder.withPayload(2)
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "foo")
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 42)
@@ -2423,7 +2464,7 @@ public class KafkaBinderTests extends
assertThat(inbound.getHeaders().get(BinderHeaders.NATIVE_HEADERS_PRESENT)).isNull();

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testSendAndReceiveWithMixedMode", "false",
embeddedKafka);
embeddedKafka.getEmbeddedKafka());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
@@ -2460,9 +2501,9 @@ public class KafkaBinderTests extends
PollableSource<MessageHandler> inboundBindTarget = new DefaultPollableMessageSource(this.messageConverter);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProps = createConsumerProperties();
consumerProps.setMultiplex(true);
Binding<PollableSource<MessageHandler>> binding = binder.bindPollableConsumer("pollable,anotherOne", "group",
Binding<PollableSource<MessageHandler>> binding = binder.bindPollableConsumer("pollable,anotherOne", "group-polledConsumer",
inboundBindTarget, consumerProps);
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
KafkaTemplate template = new KafkaTemplate(new DefaultKafkaProducerFactory<>(producerProps));
template.send("pollable", "testPollable");
boolean polled = inboundBindTarget.poll(m -> {
@@ -2501,8 +2542,8 @@ public class KafkaBinderTests extends
properties.setMaxAttempts(2);
properties.setBackOffInitialInterval(0);
properties.getExtension().setEnableDlq(true);
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
Binding<PollableSource<MessageHandler>> binding = binder.bindPollableConsumer("pollableDlq", "group",
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
Binding<PollableSource<MessageHandler>> binding = binder.bindPollableConsumer("pollableDlq", "group-pcWithDlq",
inboundBindTarget, properties);
KafkaTemplate template = new KafkaTemplate(new DefaultKafkaProducerFactory<>(producerProps));
template.send("pollableDlq", "testPollableDLQ");
@@ -2518,12 +2559,12 @@ public class KafkaBinderTests extends
catch (MessageHandlingException e) {
assertThat(e.getCause().getMessage()).isEqualTo("test DLQ");
}
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("dlq", "false", embeddedKafka);
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("dlq", "false", embeddedKafka.getEmbeddedKafka());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer consumer = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "error.pollableDlq.group");
ConsumerRecord deadLetter = KafkaTestUtils.getSingleRecord(consumer, "error.pollableDlq.group");
embeddedKafka.getEmbeddedKafka().consumeFromAnEmbeddedTopic(consumer, "error.pollableDlq.group-pcWithDlq");
ConsumerRecord deadLetter = KafkaTestUtils.getSingleRecord(consumer, "error.pollableDlq.group-pcWithDlq");
assertThat(deadLetter).isNotNull();
assertThat(deadLetter.value()).isEqualTo("testPollableDLQ");
binding.unbind();
@@ -2534,7 +2575,7 @@ public class KafkaBinderTests extends
@Test
public void testTopicPatterns() throws Exception {
try (AdminClient admin = AdminClient.create(Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
embeddedKafka.getBrokersAsString()))) {
embeddedKafka.getEmbeddedKafka().getBrokersAsString()))) {
admin.createTopics(Collections.singletonList(new NewTopic("topicPatterns.1", 1, (short) 1))).all().get();
Binder binder = getBinder();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
@@ -2549,7 +2590,7 @@ public class KafkaBinderTests extends
Binding<MessageChannel> consumerBinding = binder.bindConsumer("topicPatterns\\..*",
"testTopicPatterns", moduleInputChannel, consumerProperties);
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(
KafkaTestUtils.producerProps(embeddedKafka));
KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka()));
KafkaTemplate template = new KafkaTemplate(pf);
template.send("topicPatterns.1", "foo");
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();

@@ -17,6 +17,7 @@
package org.springframework.cloud.stream.binder.kafka;

import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -36,6 +37,7 @@ import org.apache.kafka.common.TopicPartition;
import org.junit.Test;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
@@ -82,25 +84,25 @@ public class KafkaBinderUnitTests {
method.setAccessible(true);

// test default for anon
Object factory = method.invoke(binder, true, "foo", ecp);
Object factory = method.invoke(binder, true, "foo-1", ecp);
Map<?, ?> configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("latest");

// test default for named
factory = method.invoke(binder, false, "foo", ecp);
factory = method.invoke(binder, false, "foo-2", ecp);
configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");

// binder level setting
binderConfigurationProperties.setConfiguration(
Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"));
factory = method.invoke(binder, false, "foo", ecp);
factory = method.invoke(binder, false, "foo-3", ecp);
configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("latest");

// consumer level setting
consumerProps.setConfiguration(Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
factory = method.invoke(binder, false, "foo", ecp);
factory = method.invoke(binder, false, "foo-4", ecp);
configs = TestUtils.getPropertyValue(factory, "configs", Map.class);
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
}
@@ -131,38 +133,38 @@ public class KafkaBinderUnitTests {

@Test
public void testOffsetResetWithGroupManagementEarliest() throws Exception {
testOffsetResetWithGroupManagement(true, true);
testOffsetResetWithGroupManagement(true, true, "foo-100", "testOffsetResetWithGroupManagementEarliest");
}

@Test
public void testOffsetResetWithGroupManagementLatest() throws Throwable {
testOffsetResetWithGroupManagement(false, true);
testOffsetResetWithGroupManagement(false, true, "foo-101", "testOffsetResetWithGroupManagementLatest");
}

@Test
public void testOffsetResetWithManualAssignmentEarliest() throws Exception {
testOffsetResetWithGroupManagement(true, false);
testOffsetResetWithGroupManagement(true, false, "foo-102", "testOffsetResetWithManualAssignmentEarliest");
}

@Test
public void testOffsetResetWithGroupManualAssignmentLatest() throws Throwable {
testOffsetResetWithGroupManagement(false, false);
testOffsetResetWithGroupManagement(false, false, "foo-103", "testOffsetResetWithGroupManualAssignmentLatest");
}

private void testOffsetResetWithGroupManagement(final boolean earliest, boolean groupManage) throws Exception {
private void testOffsetResetWithGroupManagement(final boolean earliest, boolean groupManage, String topic, String group) throws Exception {
final List<TopicPartition> partitions = new ArrayList<>();
partitions.add(new TopicPartition("foo", 0));
partitions.add(new TopicPartition("foo", 1));
partitions.add(new TopicPartition(topic, 0));
partitions.add(new TopicPartition(topic, 1));
KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties(
new TestKafkaProperties());
KafkaTopicProvisioner provisioningProvider = mock(KafkaTopicProvisioner.class);
ConsumerDestination dest = mock(ConsumerDestination.class);
given(dest.getName()).willReturn("foo");
given(dest.getName()).willReturn(topic);
given(provisioningProvider.provisionConsumerDestination(anyString(), anyString(), any())).willReturn(dest);
final AtomicInteger part = new AtomicInteger();
willAnswer(i -> {
return partitions.stream()
.map(p -> new PartitionInfo("foo", part.getAndIncrement(), null, null, null))
.map(p -> new PartitionInfo(topic, part.getAndIncrement(), null, null, null))
.collect(Collectors.toList());
}).given(provisioningProvider).getPartitionsForTopic(anyInt(), anyBoolean(), any());
@SuppressWarnings("unchecked")
@@ -176,14 +178,14 @@ public class KafkaBinderUnitTests {
Thread.currentThread().interrupt();
}
return new ConsumerRecords<>(Collections.emptyMap());
}).given(consumer).poll(anyLong());
}).given(consumer).poll(any(Duration.class));
willAnswer(i -> {
((org.apache.kafka.clients.consumer.ConsumerRebalanceListener) i.getArgument(1))
.onPartitionsAssigned(partitions);
latch.countDown();
latch.countDown();
return null;
}).given(consumer).subscribe(eq(Collections.singletonList("foo")),
}).given(consumer).subscribe(eq(Collections.singletonList(topic)),
any(org.apache.kafka.clients.consumer.ConsumerRebalanceListener.class));
willAnswer(i -> {
latch.countDown();
@@ -193,7 +195,7 @@ public class KafkaBinderUnitTests {

@Override
protected ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {

return new ConsumerFactory<byte[], byte[]>() {

@@ -212,6 +214,11 @@ public class KafkaBinderUnitTests {
return consumer;
}

@Override
public Consumer<byte[], byte[]> createConsumer(String groupId, String clientIdPrefix, String clientIdSuffix) {
return consumer;
}

@Override
public boolean isAutoCommit() {
return false;
@@ -222,7 +229,7 @@ public class KafkaBinderUnitTests {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
earliest ? "earliest" : "latest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "bar");
props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
return props;
}

@@ -240,7 +247,7 @@ public class KafkaBinderUnitTests {
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<KafkaConsumerProperties>(
extension);
consumerProperties.setInstanceCount(1);
binder.bindConsumer("foo", "bar", channel, consumerProperties);
Binding<MessageChannel> messageChannelBinding = binder.bindConsumer(topic, group, channel, consumerProperties);
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
if (groupManage) {
if (earliest) {
@@ -260,7 +267,7 @@ public class KafkaBinderUnitTests {
verify(consumer).seek(partitions.get(1), Long.MAX_VALUE);
}
}
messageChannelBinding.unbind();
}


}

@@ -34,7 +34,7 @@ import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProv
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.retry.support.RetryTemplate;

@@ -53,13 +53,13 @@ import static org.mockito.Mockito.spy;
public class KafkaTransactionTests {

@ClassRule
public static final KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);
public static final EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1);

@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testProducerRunsInTx() {
KafkaProperties kafkaProperties = new TestKafkaProperties();
kafkaProperties.setBootstrapServers(Collections.singletonList(embeddedKafka.getBrokersAsString()));
kafkaProperties.setBootstrapServers(Collections.singletonList(embeddedKafka.getEmbeddedKafka().getBrokersAsString()));
KafkaBinderConfigurationProperties configurationProperties =
new KafkaBinderConfigurationProperties(kafkaProperties);
configurationProperties.getTransaction().setTransactionIdPrefix("foo-");

@@ -23,7 +23,7 @@ import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;

/**
* @author Marius Bogoevici
@@ -31,14 +31,14 @@ import org.springframework.kafka.test.rule.KafkaEmbedded;
public class KafkaBinderBootstrapTest {

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 10);

@Test
public void testKafkaBinderConfiguration() throws Exception {
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(SimpleApplication.class)
.web(WebApplicationType.NONE)
.run("--spring.cloud.stream.kafka.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
.run("--spring.cloud.stream.kafka.binder.brokers=" + embeddedKafka.getEmbeddedKafka().getBrokersAsString(),
"--spring.cloud.stream.kafka.binder.zkNodes=" + embeddedKafka.getEmbeddedKafka().getZookeeperConnectionString());
applicationContext.close();
}


@@ -21,7 +21,6 @@ import java.util.Map;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -43,7 +42,7 @@ import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.messaging.MessageChannel;
import org.springframework.test.context.junit4.SpringRunner;

@@ -61,16 +60,16 @@ import static org.assertj.core.api.Assertions.assertThat;
properties = "spring.cloud.stream.bindings.input.group=" + KafkaBinderActuatorTests.TEST_CONSUMER_GROUP)
public class KafkaBinderActuatorTests {

static final String TEST_CONSUMER_GROUP = "testGroup";
static final String TEST_CONSUMER_GROUP = "testGroup-actuatorTests";

private static final String KAFKA_BROKERS_PROPERTY = "spring.kafka.bootstrap-servers";

@ClassRule
public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true);
public static EmbeddedKafkaRule kafkaEmbedded = new EmbeddedKafkaRule(1, true);

@BeforeClass
public static void setup() {
System.setProperty(KAFKA_BROKERS_PROPERTY, kafkaEmbedded.getBrokersAsString());
System.setProperty(KAFKA_BROKERS_PROPERTY, kafkaEmbedded.getEmbeddedKafka().getBrokersAsString());
}

@AfterClass
