Compare commits


24 Commits

Author SHA1 Message Date
Soby Chacko
b8267ea81e 2.0.0.RC3 Release 2018-03-12 15:23:09 -04:00
Gary Russell
2b595b004f GH-337: Add ackEachRecord property
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/337
Resolves #338
2018-03-10 11:02:50 -05:00
Gary Russell
de45edc962 Add missing binder headerMapperBeanName to doc. 2018-03-08 11:46:03 -05:00
Gary Russell
2406fe5237 Event Publisher Polishing
Now that the abstract binder makes its event publisher available to subclasses,
use it, if present, instead of the application context.
In most cases, they will be the same object, but the user might override the
publisher.

Resolves #336
2018-03-08 09:23:14 -05:00
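The change described above can be illustrated with a short sketch (not the binder's actual code; the class and field names here are assumptions): prefer an explicitly supplied publisher and fall back to the application context, which itself implements `ApplicationEventPublisher`.

[source,java]
----
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEventPublisher;

// illustrative only: use the overridable publisher if one is present,
// otherwise fall back to the application context
final class EventPublishingSupport {

    private final ApplicationEventPublisher eventPublisher; // may be null when not overridden

    private final ApplicationContext applicationContext;

    EventPublishingSupport(ApplicationEventPublisher eventPublisher, ApplicationContext applicationContext) {
        this.eventPublisher = eventPublisher;
        this.applicationContext = applicationContext;
    }

    void publish(Object event) {
        ApplicationEventPublisher publisher =
                this.eventPublisher != null ? this.eventPublisher : this.applicationContext;
        publisher.publishEvent(event);
    }
}
----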
Oleg Zhurakousky
b5a0013e1e GH-330 Polishing
Resolves #330
Resolves #334
2018-03-08 09:11:11 -05:00
Gary Russell
c814ad5595 GH-330: Kafka Topic Provisioning Improvements
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/330

- allow override of binder-wide `replicationFactor` for each binding
- allow specific partition/replica configuration
- allow setting `NewTopic.configs()` properties, similar to the consumer and producer
- use a new `AdminClient` for provisioning (and `close()` it) instead of keeping a long-lived connection open.
2018-03-08 09:10:27 -05:00
Oleg Zhurakousky
def2c3d0ed SCST-GH-1259 Polishing
- added '@DeprecatedConfigurationProperty'
- minor doc polishing

Resolves #335
2018-03-07 16:04:44 -05:00
Gary Russell
10a44d1e44 SCST-GH-1259: Kafka Binder Doc Polishing
Fixes https://github.com/spring-cloud/spring-cloud-stream/issues/1259

Also deprecate properties that are no longer used.

Missed a save.
2018-03-07 16:03:36 -05:00
Oleg Zhurakousky
0de078ca48 GH-326 added KafkaAutoConfiguration to KafkaBinderConfiguration
- added KafkaAutoConfiguration to the @Import of KafkaBinderConfiguration
- removed 'optional' flag for KafkaProperties from KafkaBinderConfigurationProperties
- fixed KafkaBinderAutoConfigurationPropertiesTest

Resolves #326
Resolves #333
2018-03-06 13:30:41 -05:00
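The arrangement this commit describes can be sketched as follows (illustrative only; the class name and empty body are placeholders, not the real configuration class):

[source,java]
----
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

// sketch: importing Boot's KafkaAutoConfiguration makes a bound KafkaProperties bean
// available to the binder configuration, so the dependency no longer needs to be optional
@Configuration
@Import(KafkaAutoConfiguration.class)
public class KafkaBinderConfigurationSketch {
}
----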
Gary Russell
8035e25359 GH-67: Workaround for SK GH-599
Fixes #67

Spring Kafka currently doesn't support `TPIO.SeekPosition` for initial offsets.
Instead, use 0 and `Long.MAX_VALUE` for `BEGINNING` and `END` respectively.

Resolves #331
2018-03-06 13:16:53 -05:00
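A minimal sketch of the sentinel-offset workaround described in the commit message, assuming Spring Kafka's `TopicPartitionInitialOffset`; the helper class itself is hypothetical:

[source,java]
----
import org.springframework.kafka.support.TopicPartitionInitialOffset;

// hypothetical helper: absolute offsets stand in for the unsupported SeekPosition values;
// 0 approximates BEGINNING and Long.MAX_VALUE approximates END
final class InitialOffsets {

    static TopicPartitionInitialOffset beginning(String topic, int partition) {
        return new TopicPartitionInitialOffset(topic, partition, 0L);
    }

    static TopicPartitionInitialOffset end(String topic, int partition) {
        return new TopicPartitionInitialOffset(topic, partition, Long.MAX_VALUE);
    }
}
----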
Artem Bilan
bcf15ed3be GH-328: Make KafkaBinderMetrics bean conditional
Fixes: spring-cloud/spring-cloud-stream-binder-kafka#328

Since we consider the Micrometer dependency optional, it is better not to
expose beans that depend on that library

* Move `KafkaBinderMetrics` to its own `@Configuration` class with
appropriate conditions on the classpath and beans presence
* Add an `ApplicationContextRunner`-based test case to exercise the
condition when Micrometer is not on the classpath, via the `FilteredClassLoader`
hook

Resolves #328
Resolves #332
2018-03-06 12:40:16 -05:00
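The `FilteredClassLoader` technique mentioned above can be sketched like this; it is not the project's actual test, and the configuration class and bean name are made up for illustration:

[source,java]
----
import io.micrometer.core.instrument.MeterRegistry;
import org.junit.Test;

import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.test.context.FilteredClassLoader;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import static org.assertj.core.api.Assertions.assertThat;

public class MetricsConditionSketchTests {

    @Test
    public void metricsBeanIsSkippedWhenMicrometerIsAbsent() {
        new ApplicationContextRunner()
                .withUserConfiguration(SampleMetricsConfiguration.class)
                // hide Micrometer classes from the context's class loader
                .withClassLoader(new FilteredClassLoader(MeterRegistry.class))
                .run(context -> assertThat(context).doesNotHaveBean("binderMetrics"));
    }

    @Configuration
    static class SampleMetricsConfiguration {

        @Bean
        @ConditionalOnClass(name = "io.micrometer.core.instrument.MeterRegistry")
        public Object binderMetrics() {
            return new Object(); // placeholder for a metrics bean that requires Micrometer
        }
    }
}
----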
Gary Russell
d37ef750ad GH-67: Reinstate resetOffsets property
Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/67

Currently only supported with group management (`autoRebalanceEnabled` - default true).

See https://github.com/spring-projects/spring-kafka/issues/599

Resolves #67
Resolves #329
2018-03-05 20:50:17 -05:00
Oleg Zhurakousky
d8baca5a66 Polishing
Resolves #325
2018-03-05 20:03:24 -05:00
Jon Schneider
b9d7f1f537 Change KafkaBinderMetrics to a tagged TimeGauge 2018-03-05 19:56:12 -05:00
Oleg Zhurakousky
ad819ece92 GH-322 added @Conditional for 'KafkaBinderHealthIndicator' bean 2018-03-02 09:08:39 -05:00
Soby Chacko
e254968eaf Next version: 2.0.0.BUILD-SNAPSHOT 2018-03-01 09:30:11 -05:00
Soby Chacko
25a64e1d85 2.0.0.RC2 Release 2018-03-01 09:15:56 -05:00
Oleg Zhurakousky
353c89ab63 GH-250 Reworked binding event propagation
- Hooked up to the new `BindingCreatedEvent`

Simple polishing

Fixes spring-cloud/spring-cloud-stream-binder-kafka#250
2018-02-28 15:47:47 -05:00
Artem Bilan
227d8f39f6 GH-318: Fix KafkaBinderMetrics for Micrometer
Fixes spring-cloud/spring-cloud-stream-binder-kafka#318

* Use the `ToDoubleFunction`-based `MeterRegistry.gauge()` variant to calculate
the `lag` at runtime instead of a value defined statically at registration time
* Add `KafkaBinderActuatorTests` integration test to demonstrate how
`Binder` is declared in the separate child context and how
`KafkaBinderMetrics` is not visible from the parent context.
This test also verifies the real `gauge` value for the `consumer lag`
and should be used in the future to verify the `KafkaBinderMetrics`
exposure by removing the code after the TODO in `KafkaMetricsTestConfig`
2018-02-27 18:51:24 -05:00
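For illustration, here is a minimal sketch of the `ToDoubleFunction`-based registration style referred to above; the metric name, tags, and the lag-source interface are assumptions, not the binder's actual API:

[source,java]
----
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tags;

// the gauge re-evaluates the function against the state object on every scrape,
// instead of recording a single value computed at registration time
final class ConsumerLagGaugeSketch {

    interface LagSource {
        double currentLag(); // hypothetical: computes lag from committed vs. end offsets
    }

    static void register(MeterRegistry registry, String group, String topic, LagSource lagSource) {
        registry.gauge("spring.cloud.stream.binder.kafka.offset", // illustrative metric name
                Tags.of("group", group, "topic", topic),
                lagSource,
                LagSource::currentLag);
    }
}
----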
Oleg Zhurakousky
555d3ccbd8 GH-319 bumped up SIK and SK versions
Resolves #319
2018-02-27 11:30:36 -05:00
Oleg Zhurakousky
d7c5c9c13b GH-1211 made Actuator optional 2018-02-26 23:03:39 -05:00
Sabby Anandan
e23af0ccc1 Revise Kafka Streams docs (#317)
* Revise Kafka Streams docs

* Remove KStream starter-pom reference

* Remove KStreams from aggregate docs
2018-02-26 19:14:54 -05:00
Soby Chacko
3fd93e7de8 Kafka Streams binder autoconfiguration changes
Make KafkaStreamsBinderSupportAutoConfiguration conditional
on BindingService being present in the BeanFactory.
2018-02-23 16:09:58 -05:00
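A minimal sketch of that conditional arrangement (the class name and empty body are placeholders):

[source,java]
----
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.context.annotation.Configuration;

// sketch: the Kafka Streams support beans are only registered when Spring Cloud Stream's
// BindingService is already present in the BeanFactory
@Configuration
@ConditionalOnBean(BindingService.class)
public class KafkaStreamsBinderSupportAutoConfigurationSketch {
}
----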
Soby Chacko
90a778da6a Next version: 2.0.0.BUILD-SNAPSHOT 2018-02-23 12:53:17 -05:00
26 changed files with 1297 additions and 529 deletions

pom.xml

@@ -2,20 +2,20 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RC1</version>
<version>2.0.0.RC3</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>2.0.0.RC1</version>
<version>2.0.0.RC2</version>
<relativePath />
</parent>
<properties>
<java.version>1.8</java.version>
<spring-kafka.version>2.1.3.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>3.0.2.RELEASE</spring-integration-kafka.version>
<spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>3.0.3.RELEASE</spring-integration-kafka.version>
<kafka.version>1.0.0</kafka.version>
<spring-cloud-stream.version>2.0.0.RC1</spring-cloud-stream.version>
<spring-cloud-stream.version>2.0.0.RC3</spring-cloud-stream.version>
</properties>
<modules>
<module>spring-cloud-stream-binder-kafka</module>


@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RC1</version>
<version>2.0.0.RC3</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>


@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RC1</version>
<version>2.0.0.RC3</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>


@@ -0,0 +1,62 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.properties;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Properties for configuring topics.
*
* @author Gary Russell
* @since 2.0
*
*/
public class KafkaAdminProperties {
private Short replicationFactor;
private Map<Integer, List<Integer>> replicasAssignments = new HashMap<>();
private Map<String, String> configuration = new HashMap<>();
public Short getReplicationFactor() {
return this.replicationFactor;
}
public void setReplicationFactor(Short replicationFactor) {
this.replicationFactor = replicationFactor;
}
public Map<Integer, List<Integer>> getReplicasAssignments() {
return this.replicasAssignments;
}
public void setReplicasAssignments(Map<Integer, List<Integer>> replicasAssignments) {
this.replicasAssignments = replicasAssignments;
}
public Map<String, String> getConfiguration() {
return this.configuration;
}
public void setConfiguration(Map<String, String> configuration) {
this.configuration = configuration;
}
}
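A minimal sketch of how these three properties can be translated into a `NewTopic` (illustrative only; the helper class and the fallback replication factor are assumptions, and the real logic lives in the provisioner shown further below):

[source,java]
----
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.admin.NewTopic;

import org.springframework.cloud.stream.binder.kafka.properties.KafkaAdminProperties;

final class NewTopicFactorySketch {

    static NewTopic newTopic(String topicName, int partitionCount, short defaultReplicationFactor,
            KafkaAdminProperties admin) {
        Map<Integer, List<Integer>> assignments = admin.getReplicasAssignments();
        NewTopic topic;
        if (assignments != null && !assignments.isEmpty()) {
            // explicit partition -> replica broker ids win over plain counts
            topic = new NewTopic(topicName, assignments);
        }
        else {
            short replicationFactor = admin.getReplicationFactor() != null
                    ? admin.getReplicationFactor() : defaultReplicationFactor;
            topic = new NewTopic(topicName, partitionCount, replicationFactor);
        }
        if (!admin.getConfiguration().isEmpty()) {
            // topic-level settings such as cleanup.policy or retention.ms
            topic.configs(admin.getConfiguration());
        }
        return topic;
    }
}
----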


@@ -1,5 +1,5 @@
/*
* Copyright 2015-2017 the original author or authors.
* Copyright 2015-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.DeprecatedConfigurationProperty;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
@@ -45,7 +46,7 @@ public class KafkaBinderConfigurationProperties {
private final Transaction transaction = new Transaction();
@Autowired(required = false)
@Autowired
private KafkaProperties kafkaProperties;
private String[] zkNodes = new String[] { "localhost" };
@@ -86,7 +87,7 @@ public class KafkaBinderConfigurationProperties {
private String requiredAcks = "1";
private int replicationFactor = 1;
private short replicationFactor = 1;
private int fetchSize = 1024 * 1024;
@@ -110,6 +111,13 @@ public class KafkaBinderConfigurationProperties {
return this.transaction;
}
/**
* No longer used.
* @return the connection String
* @deprecated connection to zookeeper is no longer necessary
*/
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
@Deprecated
public String getZkConnectionString() {
return toConnectionString(this.zkNodes, this.defaultZkPort);
}
@@ -126,26 +134,68 @@ public class KafkaBinderConfigurationProperties {
return this.headers;
}
/**
* No longer used.
* @return the window.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public int getOffsetUpdateTimeWindow() {
return this.offsetUpdateTimeWindow;
}
/**
* No longer used.
* @return the count.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public int getOffsetUpdateCount() {
return this.offsetUpdateCount;
}
/**
* No longer used.
* @return the timeout.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public int getOffsetUpdateShutdownTimeout() {
return this.offsetUpdateShutdownTimeout;
}
/**
* Zookeeper nodes.
* @return the nodes.
* @deprecated connection to zookeeper is no longer necessary
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
public String[] getZkNodes() {
return this.zkNodes;
}
/**
* Zookeeper nodes.
* @param zkNodes the nodes.
* @deprecated connection to zookeeper is no longer necessary
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
public void setZkNodes(String... zkNodes) {
this.zkNodes = zkNodes;
}
/**
* Zookeeper port.
* @param defaultZkPort the port.
* @deprecated connection to zookeeper is no longer necessary
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
public void setDefaultZkPort(String defaultZkPort) {
this.defaultZkPort = defaultZkPort;
}
@@ -166,30 +216,79 @@ public class KafkaBinderConfigurationProperties {
this.headers = headers;
}
/**
* No longer used.
* @param offsetUpdateTimeWindow the window.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public void setOffsetUpdateTimeWindow(int offsetUpdateTimeWindow) {
this.offsetUpdateTimeWindow = offsetUpdateTimeWindow;
}
/**
* No longer used.
* @param offsetUpdateCount the count.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public void setOffsetUpdateCount(int offsetUpdateCount) {
this.offsetUpdateCount = offsetUpdateCount;
}
/**
* No longer used.
* @param offsetUpdateShutdownTimeout the timeout.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public void setOffsetUpdateShutdownTimeout(int offsetUpdateShutdownTimeout) {
this.offsetUpdateShutdownTimeout = offsetUpdateShutdownTimeout;
}
/**
* Zookeeper session timeout.
* @return the timeout.
* @deprecated connection to zookeeper is no longer necessary
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
public int getZkSessionTimeout() {
return this.zkSessionTimeout;
}
/**
* Zookeeper session timeout.
* @param zkSessionTimeout the timeout
* @deprecated connection to zookeeper is no longer necessary
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
public void setZkSessionTimeout(int zkSessionTimeout) {
this.zkSessionTimeout = zkSessionTimeout;
}
/**
* Zookeeper connection timeout.
* @return the timeout.
* @deprecated connection to zookeeper is no longer necessary
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
public int getZkConnectionTimeout() {
return this.zkConnectionTimeout;
}
/**
* Zookeeper connection timeout.
* @param zkConnectionTimeout the timeout.
* @deprecated connection to zookeeper is no longer necessary
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
public void setZkConnectionTimeout(int zkConnectionTimeout) {
this.zkConnectionTimeout = zkConnectionTimeout;
}
@@ -212,10 +311,24 @@ public class KafkaBinderConfigurationProperties {
return StringUtils.arrayToCommaDelimitedString(fullyFormattedHosts);
}
/**
* No longer used.
* @return the wait.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public int getMaxWait() {
return this.maxWait;
}
/**
* No longer used.
* @param maxWait the wait.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public void setMaxWait(int maxWait) {
this.maxWait = maxWait;
}
@@ -232,18 +345,32 @@ public class KafkaBinderConfigurationProperties {
this.requiredAcks = requiredAcks;
}
public int getReplicationFactor() {
public short getReplicationFactor() {
return this.replicationFactor;
}
public void setReplicationFactor(int replicationFactor) {
public void setReplicationFactor(short replicationFactor) {
this.replicationFactor = replicationFactor;
}
/**
* No longer used.
* @return the size.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public int getFetchSize() {
return this.fetchSize;
}
/**
* No longer used.
* @param fetchSize the size.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public void setFetchSize(int fetchSize) {
this.fetchSize = fetchSize;
}
@@ -264,10 +391,24 @@ public class KafkaBinderConfigurationProperties {
this.healthTimeout = healthTimeout;
}
/**
* No longer used.
* @return the queue size.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public int getQueueSize() {
return this.queueSize;
}
/**
* No longer used.
* @param queueSize the queue size.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public void setQueueSize(int queueSize) {
this.queueSize = queueSize;
}
@@ -288,10 +429,26 @@ public class KafkaBinderConfigurationProperties {
this.autoAddPartitions = autoAddPartitions;
}
/**
* No longer used; set properties such as this via {@link #getConfiguration()
* configuration}.
* @return the size.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0, set properties such as this via 'configuration'")
public int getSocketBufferSize() {
return this.socketBufferSize;
}
/**
* No longer used; set properties such as this via {@link #getConfiguration()
* configuration}.
* @param socketBufferSize the size.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0, set properties such as this via 'configuration'")
public void setSocketBufferSize(int socketBufferSize) {
this.socketBufferSize = socketBufferSize;
}


@@ -1,5 +1,5 @@
/*
* Copyright 2016-2017 the original author or authors.
* Copyright 2016-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -53,6 +53,8 @@ public class KafkaConsumerProperties {
both
}
private boolean ackEachRecord;
private boolean autoRebalanceEnabled = true;
private boolean autoCommitOffset = true;
@@ -61,6 +63,8 @@ public class KafkaConsumerProperties {
private StartOffset startOffset;
private boolean resetOffsets;
private boolean enableDlq;
private String dlqName;
@@ -79,6 +83,16 @@ public class KafkaConsumerProperties {
private Map<String, String> configuration = new HashMap<>();
private KafkaAdminProperties admin = new KafkaAdminProperties();
public boolean isAckEachRecord() {
return this.ackEachRecord;
}
public void setAckEachRecord(boolean ackEachRecord) {
this.ackEachRecord = ackEachRecord;
}
public boolean isAutoCommitOffset() {
return this.autoCommitOffset;
}
@@ -95,6 +109,14 @@ public class KafkaConsumerProperties {
this.startOffset = startOffset;
}
public boolean isResetOffsets() {
return this.resetOffsets;
}
public void setResetOffsets(boolean resetOffsets) {
this.resetOffsets = resetOffsets;
}
public boolean isEnableDlq() {
return this.enableDlq;
}
@@ -111,10 +133,22 @@ public class KafkaConsumerProperties {
this.autoCommitOnError = autoCommitOnError;
}
/**
* No longer used.
* @return the interval.
* @deprecated
*/
@Deprecated
public int getRecoveryInterval() {
return this.recoveryInterval;
}
/**
* No longer used.
* @param recoveryInterval the interval.
* @deprecated
*/
@Deprecated
public void setRecoveryInterval(int recoveryInterval) {
this.recoveryInterval = recoveryInterval;
}
@@ -182,4 +216,12 @@ public class KafkaConsumerProperties {
this.idleEventInterval = idleEventInterval;
}
public KafkaAdminProperties getAdmin() {
return this.admin;
}
public void setAdmin(KafkaAdminProperties admin) {
this.admin = admin;
}
}


@@ -44,6 +44,8 @@ public class KafkaProducerProperties {
private Map<String, String> configuration = new HashMap<>();
private KafkaAdminProperties admin = new KafkaAdminProperties();
public int getBufferSize() {
return this.bufferSize;
}
@@ -101,6 +103,15 @@ public class KafkaProducerProperties {
this.configuration = configuration;
}
public KafkaAdminProperties getAdmin() {
return this.admin;
}
public void setAdmin(KafkaAdminProperties admin) {
this.admin = admin;
}
public enum CompressionType {
none,
gzip,


@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka.provisioning;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -44,6 +45,7 @@ import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaAdminProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
@@ -67,6 +69,7 @@ import org.springframework.util.StringUtils;
* @author Gary Russell
* @author Ilayaperumal Gopinathan
* @author Simon Flandergan
* @author Oleg Zhurakousky
*/
public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsumerProperties<KafkaConsumerProperties>,
ExtendedProducerProperties<KafkaProducerProperties>>, InitializingBean {
@@ -77,19 +80,18 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
private final KafkaBinderConfigurationProperties configurationProperties;
private final AdminClient adminClient;
private final int operationTimeout = DEFAULT_OPERATION_TIMEOUT;
private final Map<String, Object> adminClientProperties;
private RetryOperations metadataRetryOperations;
private final int operationTimeout = DEFAULT_OPERATION_TIMEOUT;
public KafkaTopicProvisioner(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
KafkaProperties kafkaProperties) {
Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null");
Map<String, Object> adminClientProperties = kafkaProperties.buildAdminProperties();
this.adminClientProperties = kafkaProperties.buildAdminProperties();
this.configurationProperties = kafkaBinderConfigurationProperties;
normalalizeBootPropsWithBinder(adminClientProperties, kafkaProperties, kafkaBinderConfigurationProperties);
this.adminClient = AdminClient.create(adminClientProperties);
}
/**
@@ -118,33 +120,38 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
}
@Override
public ProducerDestination provisionProducerDestination(final String name, ExtendedProducerProperties<KafkaProducerProperties> properties) {
public ProducerDestination provisionProducerDestination(final String name,
ExtendedProducerProperties<KafkaProducerProperties> properties) {
if (this.logger.isInfoEnabled()) {
this.logger.info("Using kafka topic for outbound: " + name);
}
KafkaTopicUtils.validateTopicName(name);
createTopic(name, properties.getPartitionCount(), false);
if (this.configurationProperties.isAutoCreateTopics() && adminClient != null) {
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(name));
KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.all();
try (AdminClient adminClient = AdminClient.create(this.adminClientProperties)) {
createTopic(adminClient, name, properties.getPartitionCount(), false, properties.getExtension().getAdmin());
int partitions = 0;
if (this.configurationProperties.isAutoCreateTopics()) {
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(name));
KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.all();
try {
Map<String, TopicDescription> topicDescriptions = all.get(operationTimeout, TimeUnit.SECONDS);
Map<String, TopicDescription> topicDescriptions = null;
try {
topicDescriptions = all.get(this.operationTimeout, TimeUnit.SECONDS);
}
catch (Exception e) {
throw new ProvisioningException("Problems encountered with partitions finding", e);
}
TopicDescription topicDescription = topicDescriptions.get(name);
int partitions = topicDescription.partitions().size();
return new KafkaProducerDestination(name, partitions);
partitions = topicDescription.partitions().size();
}
catch (Exception e) {
throw new ProvisioningException("Problems encountered with partitions finding", e);
}
}
else {
return new KafkaProducerDestination(name);
return new KafkaProducerDestination(name, partitions);
}
}
@Override
public ConsumerDestination provisionConsumerDestination(final String name, final String group, ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
public ConsumerDestination provisionConsumerDestination(final String name, final String group,
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
KafkaTopicUtils.validateTopicName(name);
boolean anonymous = !StringUtils.hasText(group);
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
@@ -153,25 +160,32 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
throw new IllegalArgumentException("Instance count cannot be zero");
}
int partitionCount = properties.getInstanceCount() * properties.getConcurrency();
createTopic(name, partitionCount, properties.getExtension().isAutoRebalanceEnabled());
if (this.configurationProperties.isAutoCreateTopics() && adminClient != null) {
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(name));
KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.all();
try {
Map<String, TopicDescription> topicDescriptions = all.get(operationTimeout, TimeUnit.SECONDS);
TopicDescription topicDescription = topicDescriptions.get(name);
int partitions = topicDescription.partitions().size();
ConsumerDestination dlqTopic = createDlqIfNeedBe(name, group, properties, anonymous, partitions);
if (dlqTopic != null) {
return dlqTopic;
ConsumerDestination consumerDestination = new KafkaConsumerDestination(name);
try (AdminClient adminClient = createAdminClient()) {
createTopic(adminClient, name, partitionCount, properties.getExtension().isAutoRebalanceEnabled(),
properties.getExtension().getAdmin());
if (this.configurationProperties.isAutoCreateTopics()) {
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(name));
KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.all();
try {
Map<String, TopicDescription> topicDescriptions = all.get(operationTimeout, TimeUnit.SECONDS);
TopicDescription topicDescription = topicDescriptions.get(name);
int partitions = topicDescription.partitions().size();
consumerDestination = createDlqIfNeedBe(adminClient, name, group, properties, anonymous, partitions);
if (consumerDestination == null) {
consumerDestination = new KafkaConsumerDestination(name, partitions);
}
}
catch (Exception e) {
throw new ProvisioningException("provisioning exception", e);
}
return new KafkaConsumerDestination(name, partitions);
}
catch (Exception e) {
throw new ProvisioningException("provisioning exception", e);
}
}
return new KafkaConsumerDestination(name);
return consumerDestination;
}
AdminClient createAdminClient() {
return AdminClient.create(this.adminClientProperties);
}
/**
@@ -209,14 +223,15 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
});
}
private ConsumerDestination createDlqIfNeedBe(String name, String group,
private ConsumerDestination createDlqIfNeedBe(AdminClient adminClient, String name, String group,
ExtendedConsumerProperties<KafkaConsumerProperties> properties,
boolean anonymous, int partitions) {
if (properties.getExtension().isEnableDlq() && !anonymous) {
String dlqTopic = StringUtils.hasText(properties.getExtension().getDlqName()) ?
properties.getExtension().getDlqName() : "error." + name + "." + group;
try {
createTopicAndPartitions(dlqTopic, partitions, properties.getExtension().isAutoRebalanceEnabled());
createTopicAndPartitions(adminClient, dlqTopic, partitions,
properties.getExtension().isAutoRebalanceEnabled(), properties.getExtension().getAdmin());
}
catch (Throwable throwable) {
if (throwable instanceof Error) {
@@ -231,9 +246,10 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
return null;
}
private void createTopic(String name, int partitionCount, boolean tolerateLowerPartitionsOnBroker) {
private void createTopic(AdminClient adminClient, String name, int partitionCount, boolean tolerateLowerPartitionsOnBroker,
KafkaAdminProperties properties) {
try {
createTopicIfNecessary(name, partitionCount, tolerateLowerPartitionsOnBroker);
createTopicIfNecessary(adminClient, name, partitionCount, tolerateLowerPartitionsOnBroker, properties);
}
catch (Throwable throwable) {
if (throwable instanceof Error) {
@@ -245,16 +261,14 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
}
}
private void createTopicIfNecessary(final String topicName, final int partitionCount,
boolean tolerateLowerPartitionsOnBroker) throws Throwable {
if (this.configurationProperties.isAutoCreateTopics() && adminClient != null) {
createTopicAndPartitions(topicName, partitionCount, tolerateLowerPartitionsOnBroker);
private void createTopicIfNecessary(AdminClient adminClient, final String topicName, final int partitionCount,
boolean tolerateLowerPartitionsOnBroker, KafkaAdminProperties properties) throws Throwable {
if (this.configurationProperties.isAutoCreateTopics()) {
createTopicAndPartitions(adminClient, topicName, partitionCount, tolerateLowerPartitionsOnBroker,
properties);
}
else if (this.configurationProperties.isAutoCreateTopics() && adminClient == null) {
this.logger.warn("Auto creation of topics is enabled, but Kafka AdminUtils class is not present on the classpath. " +
"No topic will be created by the binder");
}
else if (!this.configurationProperties.isAutoCreateTopics()) {
else {
this.logger.info("Auto creation of topics is disabled.");
}
}
@@ -262,9 +276,12 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
/**
* Creates a Kafka topic if needed, or try to increase its partition count to the
* desired number.
* @param adminClient
* @param adminProperties
*/
private void createTopicAndPartitions(final String topicName, final int partitionCount,
boolean tolerateLowerPartitionsOnBroker) throws Throwable {
private void createTopicAndPartitions(AdminClient adminClient, final String topicName, final int partitionCount,
boolean tolerateLowerPartitionsOnBroker, KafkaAdminProperties adminProperties) throws Throwable {
ListTopicsResult listTopicsResult = adminClient.listTopics();
KafkaFuture<Set<String>> namesFutures = listTopicsResult.names();
@@ -298,14 +315,26 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
}
}
}
else if (!names.contains(topicName)) {
else {
// always consider minPartitionCount for topic creation
final int effectivePartitionCount = Math.max(this.configurationProperties.getMinPartitionCount(),
partitionCount);
this.metadataRetryOperations.execute(context -> {
NewTopic newTopic = new NewTopic(topicName, effectivePartitionCount,
(short) configurationProperties.getReplicationFactor());
NewTopic newTopic;
Map<Integer, List<Integer>> replicasAssignments = adminProperties.getReplicasAssignments();
if (replicasAssignments != null && replicasAssignments.size() > 0) {
newTopic = new NewTopic(topicName, adminProperties.getReplicasAssignments());
}
else {
newTopic = new NewTopic(topicName, effectivePartitionCount,
adminProperties.getReplicationFactor() != null
? adminProperties.getReplicationFactor()
: configurationProperties.getReplicationFactor());
}
if (adminProperties.getConfiguration().size() > 0) {
newTopic.configs(adminProperties.getConfiguration());
}
CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));
try {
createTopicsResult.all().get(operationTimeout, TimeUnit.SECONDS);
@@ -318,6 +347,10 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
logger.warn("Attempt to create topic: " + topicName + ". Topic already exists.");
}
}
else {
logger.error("Failed to create topics", e.getCause());
throw e.getCause();
}
}
else {
logger.error("Failed to create topics", e.getCause());
@@ -365,10 +398,6 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
private final int partitions;
KafkaProducerDestination(String destinationName) {
this(destinationName, 0);
}
KafkaProducerDestination(String destinationName, Integer partitions) {
this.producerDestinationName = destinationName;
this.partitions = partitions;
@@ -420,10 +449,6 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
return this.consumerDestinationName;
}
public String getDlqName() {
return dlqName;
}
@Override
public String toString() {
return "KafkaConsumerDestination{" +


@@ -55,10 +55,11 @@ public class KafkaTopicProvisionerTests {
binderConfig.getConfiguration().put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ts.getFile().getAbsolutePath());
binderConfig.setBrokers("localhost:9092");
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig, bootConfig);
AdminClient adminClient = KafkaTestUtils.getPropertyValue(provisioner, "adminClient", AdminClient.class);
AdminClient adminClient = provisioner.createAdminClient();
assertThat(KafkaTestUtils.getPropertyValue(adminClient, "client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class);
Map configs = KafkaTestUtils.getPropertyValue(adminClient, "client.selector.channelBuilder.configs", Map.class);
assertThat(((List) configs.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)).get(0)).isEqualTo("localhost:1234");
adminClient.close();
}
@SuppressWarnings("rawtypes")
@@ -73,13 +74,13 @@ public class KafkaTopicProvisionerTests {
binderConfig.getConfiguration().put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ts.getFile().getAbsolutePath());
binderConfig.setBrokers("localhost:1234");
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig, bootConfig);
AdminClient adminClient = KafkaTestUtils.getPropertyValue(provisioner, "adminClient", AdminClient.class);
AdminClient adminClient = provisioner.createAdminClient();
assertThat(KafkaTestUtils.getPropertyValue(adminClient, "client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class);
Map configs = KafkaTestUtils.getPropertyValue(adminClient, "client.selector.channelBuilder.configs", Map.class);
assertThat(((List) configs.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)).get(0)).isEqualTo("localhost:1234");
adminClient.close();
}
@SuppressWarnings("rawtypes")
@Test
public void brokersInvalid() throws Exception {
KafkaProperties bootConfig = new KafkaProperties();


@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RC1</version>
<version>2.0.0.RC3</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>


@@ -1,13 +1,7 @@
== Kafka Streams Binding Capabilities of Spring Cloud Stream
== Usage
Spring Cloud Stream Kafka support also includes a binder specifically designed for Apache Kafka Streams binding.
Using this binder, applications can be written that leverage the Apache Kafka Streams API.
For more information on Kafka Streams, see https://kafka.apache.org/documentation/streams/developer-guide[Kafka Streams API Developer Manual]
Kafka Streams support in Spring Cloud Stream is based on the foundations provided by the Spring Kafka project.
For details on that support, see http://docs.spring.io/spring-kafka/reference/html/_reference.html#kafka-streams[Kafaka Streams Support in Spring Kafka].
Here are the maven coordinates for the Spring Cloud Stream Kafka Streams binder artifact.
To use the Kafka Streams binder, you just need to add it to your Spring Cloud Stream application, using the following
Maven coordinates:
[source,xml]
----
@@ -17,13 +11,29 @@ Here are the maven coordinates for the Spring Cloud Stream Kafka Streams binder
</dependency>
----
High level streams DSL provided through the Kafka Streams API can be used through Spring Cloud Stream support.
Some minimal support for writing applications using the processor API is also available through the binder.
Kafka Streams applications using the Spring Cloud Stream support can be written using the processor model, i.e. messages read from an inbound topic and messages written to an outbound topic or using the sink style where it does not have an output binding.
== Kafka Streams Binder Overview
=== Usage example of high level streams DSL
Spring Cloud Stream's Apache Kafka support also includes a binder implementation designed explicitly for Apache Kafka
Streams binding. With this native integration, a Spring Cloud Stream "processor" application can directly use the
https://kafka.apache.org/documentation/streams/developer-guide[Apache Kafka Streams] APIs in the core business logic.
This application will listen from a Kafka topic and write the word count for each unique word that it sees in a 5 seconds time window.
Kafka Streams binder implementation builds on the foundation provided by the http://docs.spring.io/spring-kafka/reference/html/_reference.html#kafka-streams[Kafka Streams in Spring Kafka]
project.
As part of this native integration, the high-level https://docs.confluent.io/current/streams/developer-guide/dsl-api.html[Streams DSL]
provided by the Kafka Streams API is available for use in the business logic, too.
An early version of the https://docs.confluent.io/current/streams/developer-guide/processor-api.html[Processor API]
support is available as well.
As noted earlier, Kafka Streams support in Spring Cloud Stream is strictly available only for use in the Processor model:
messages are read from an inbound topic, business processing can be applied, and the transformed messages
can be written to an outbound topic. It can also be used in Processor applications with no outbound destination.
=== Streams DSL
This application consumes data from a Kafka topic (e.g., `words`), computes the word count for each unique word in a 5-second
time window, and sends the computed results to a downstream topic (e.g., `counts`) for further processing.
[source]
----
@@ -48,25 +58,130 @@ public class WordCountProcessorApplication {
}
----
If you build it as a Spring Boot uber jar, you can run the above example in the following way:
Once built as an uber-jar (e.g., `wordcount-processor.jar`), you can run the above example as follows.
[source]
----
java -jar uber.jar --spring.cloud.stream.bindings.input.destination=words --spring.cloud.stream.bindings.output.destination=counts
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.input.destination=words --spring.cloud.stream.bindings.output.destination=counts
----
This means that the application will listen from the incoming Kafka topic `words` and write to the output topic `counts`.
This application will consume messages from the Kafka topic `words` and the computed results are published to an output
topic `counts`.
Spring Cloud Stream will ensure that the messages from both the incoming and outgoing topics are bound as KStream objects.
Applications can exclusively focus on the business aspects of the code, i.e. writing the logic required in the processor rather than setting up the streams specific configuration required by the Kafka Streams infrastructure.
All such infrastructure details are handled by the framework.
Spring Cloud Stream will ensure that the messages from both the incoming and outgoing topics are automatically bound as
KStream objects. As a developer, you can exclusively focus on the business aspects of the code, i.e. writing the logic
required in the processor. Setting up the Streams DSL specific configuration required by the Kafka Streams infrastructure
is automatically handled by the framework.
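The full word-count listing is elided in this diff; the following condensed sketch shows the shape of such a processor (the binding names, the 5-second window, and the output format are assumptions made for illustration):

[source,java]
----
import java.util.Arrays;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;

@SpringBootApplication
@EnableBinding(WordCountSketch.Processor.class)
public class WordCountSketch {

    public static void main(String[] args) {
        SpringApplication.run(WordCountSketch.class, args);
    }

    @StreamListener("input")
    @SendTo("output")
    public KStream<String, String> process(KStream<Object, String> input) {
        return input
                // split each value into lower-cased words
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                // re-key by word so that identical words are counted together
                .map((key, word) -> new KeyValue<>(word, word))
                .groupByKey()
                // count per 5-second window, as described above
                .windowedBy(TimeWindows.of(5000))
                .count()
                .toStream()
                .map((window, count) -> new KeyValue<>(window.key(), window.key() + ":" + count));
    }

    interface Processor {

        @Input("input")
        KStream<?, ?> input();

        @Output("output")
        KStream<?, ?> output();
    }
}
----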
=== Multiple Input bindings on the inbound
== Configuration Options
Spring Cloud Stream Kafka Streams binder allows the users to write applications with multiple bindings.
There are use cases in which you may want to have multiple incoming KStream objects or a combination of KStream and KTable objects.
Both of these flavors are supported.
Here are some examples.
This section contains the configuration options used by the Kafka Streams binder.
For common configuration options and properties pertaining to binder, refer to the <<binding-properties,core documentation>>.
=== Kafka Streams Properties
The following properties are available at the binder level and must be prefixed with `spring.cloud.stream.kafka.binder.`
literal.
configuration::
Map with a key/value pair containing properties pertaining to Apache Kafka Streams API.
This property must be prefixed with `spring.cloud.stream.kafka.streams.binder.`.
Following are some examples of using this property.
[source]
----
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
----
For more information about all the properties that may go into streams configuration, see StreamsConfig JavaDocs in
Apache Kafka Streams docs.
brokers::
Broker URL
+
Default: `localhost`
zkNodes::
Zookeeper URL
+
Default: `localhost`
serdeError::
Deserialization error handler type.
Possible values are - `logAndContinue`, `logAndFail` or `sendToDlq`
+
Default: `logAndFail`
applicationId::
Application ID for all the stream configurations in the current application context.
You can override the application id for an individual `StreamListener` method using the `group` property on the binding.
You have to ensure that you are using the same group name for all input bindings in the case of multiple inputs on the same method.
+
Default: `default`
The following properties are _only_ available for Kafka Streams producers and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.`
literal.
keySerde::
key serde to use
+
Default: `none`.
valueSerde::
value serde to use
+
Default: `none`.
useNativeEncoding::
flag to enable native encoding
+
Default: `false`.
The following properties are _only_ available for Kafka Streams consumers and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.consumer.`
literal.
keySerde::
key serde to use
+
Default: `none`.
valueSerde::
value serde to use
+
Default: `none`.
materializedAs::
state store to materialize when using incoming KTable types
+
Default: `none`.
useNativeDecoding::
flag to enable native decoding
+
Default: `false`.
dlqName::
DLQ topic name.
+
Default: `none`.
=== TimeWindow properties:
Windowing is an important concept in stream processing applications. The following properties are available to configure
time-window computations.
spring.cloud.stream.kafka.streams.timeWindow.length::
When this property is given, you can autowire a `TimeWindows` bean into the application.
The value is expressed in milliseconds.
+
Default: `none`.
spring.cloud.stream.kstream.timeWindow.advanceBy::
Value is given in milliseconds.
+
Default: `none`.
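To illustrate the `timeWindow` properties above, a processor can have the resulting `TimeWindows` bean injected; this is a condensed sketch (the `@EnableBinding` setup is the same as in the earlier example, and the binding names are assumptions):

[source,java]
----
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;

public class WindowedCountSketch {

    // made available for autowiring when spring.cloud.stream.kafka.streams.timeWindow.length is set
    @Autowired
    private TimeWindows timeWindows;

    @StreamListener("input")
    @SendTo("output")
    public KStream<String, Long> process(KStream<Object, String> input) {
        return input
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey()
                .windowedBy(this.timeWindows)
                .count()
                .toStream()
                .map((window, count) -> new KeyValue<>(window.key(), count));
    }
}
----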
== Multiple Input Bindings
For use cases that require multiple incoming KStream objects or a combination of KStream and KTable objects, the Kafka
Streams binder provides support for multiple bindings.
Let's see it in action.
=== Multiple Input Bindings as a Sink
[source]
----
@@ -91,17 +206,19 @@ interface KStreamKTableBinding {
----
In the above example, the application is written in a sink style, i.e. there are no output bindings and the application has to make the decision to what needs to happen.
Most likely, when you write applications this way, you might want to send the information downstream or store them in a state store (See below for Queryable State Stores).
In the above example, the application is written as a sink, i.e. there are no output bindings and the application has to
decide what to do with the data downstream. When you write applications in this style, you might want to send the information
downstream or store it in a state store (see below for Queryable State Stores).
In the case of incoming KTable, if you want to materialize it as a state store, you have to express that through the following property.
In the case of incoming KTable, if you want to materialize the computations to a state store, you have to express it
through the following property.
[source]
----
spring.cloud.stream.kafka.streams.bindings.inputTable.consumer.materializedAs: all-songs
----
Here is an example for multiple input bindings and an output binding (processor style).
=== Multiple Input Bindings as a Processor
[source]
----
@@ -125,15 +242,16 @@ interface KStreamKTableBinding extends KafkaStreamsProcessor {
----
=== Support for branching in Kafka Streams API
== Multiple Output Bindings (aka Branching)
Kafka Streams allow outbound data to be split into multiple topics based on some predicates.
Spring Cloud Stream Kafka Streams binder provides support for this feature without losing the overall programming model exposed through `StreamListener` in the end user application.
You write the application in the usual way as demonstrated above in the word count example.
When using the branching feature, you are required to do a few things.
First, you need to make sure that your return type is `KStream[]` instead of a regular `KStream`.
Then you need to use the `SendTo` annotation containing the output bindings in the order (example below).
For each of these output bindings, you need to configure destination, content-type etc. as required by any other standard Spring Cloud Stream application
Kafka Streams allow outbound data to be split into multiple topics based on some predicates. The Kafka Streams binder provides
support for this feature without compromising the programming model exposed through `StreamListener` in the end user application.
You can write the application in the usual way as demonstrated above in the word count example. However, when using the
branching feature, you are required to do a few things. First, you need to make sure that your return type is `KStream[]`
instead of a regular `KStream`. Second, you need to use the `SendTo` annotation containing the output bindings in the order
(see example below). For each of these output bindings, you need to configure destination, content-type etc., complying with
the standard Spring Cloud Stream expectations.
Here is an example:
@@ -181,7 +299,7 @@ public static class WordCountProcessorApplication {
}
----
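The documentation's full branching example is elided in this diff; a condensed sketch of the shape it takes is shown here (the predicates and binding names are made up for illustration):

[source,java]
----
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;

public class BranchingSketch {

    @StreamListener("input")
    @SendTo({ "output1", "output2", "output3" })
    @SuppressWarnings("unchecked")
    public KStream<Object, String>[] process(KStream<Object, String> input) {
        // each predicate routes matching records to the corresponding output binding, in order
        Predicate<Object, String> isShort = (key, value) -> value.length() < 10;
        Predicate<Object, String> isMedium = (key, value) -> value.length() < 100;
        Predicate<Object, String> everythingElse = (key, value) -> true;
        return input.branch(isShort, isMedium, everythingElse);
    }
}
----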
Then in the properties:
Properties:
[source]
----
@@ -210,18 +328,24 @@ spring.cloud.stream.bindings.input:
headerMode: raw
----
=== Message conversion in Spring Cloud Stream Kafka Streams applications
== Message Conversion
Spring Cloud Stream Kafka Streams binder allows the usage of usual patterns for content type conversions as in other message channel based binder applications.
Many Kafka Streams operations - that are part of the actual application and not at the inbound and outbound - need to know the type of SerDes used to correctly transform key and value data.
Therefore, it may be more natural to rely on the SerDe facilities provided by the Apache Kafka Streams library itself for inbound and outbound conversions rather than using the content type conversions offered by the framework.
On the other hand, you might be already familiar with the content type conversion patterns in spring cloud stream and want to keep using them for inbound and outbound conversions.
Both options are supported in the Spring Cloud Stream binder for Apache Kafka Streams.
Similar to message-channel based binder applications, the Kafka Streams binder adapts to the out-of-the-box content-type
conversions without any compromise.
It is typical for Kafka Streams operations to know the type of SerDes used to transform the key and value correctly.
Therefore, it may be more natural to rely on the SerDe facilities provided by the Apache Kafka Streams library itself at
the inbound and outbound conversions rather than using the content-type conversions offered by the framework.
On the other hand, you might already be familiar with the content-type conversion patterns provided by the framework and
would like to continue using them for inbound and outbound conversions.
Both options are supported in the Kafka Streams binder implementation.
==== Outbound serialization
If native encoding is disabled (which is the default), then the framework will convert the message using the contentType set by the user (or the default content type of application/json).
It will ignore any Serde set on the outbound in this case for outbound serialization.
If native encoding is disabled (which is the default), then the framework will convert the message using the contentType
set by the user (otherwise, the default `application/json` will be applied). It will ignore any SerDe set on the outbound
in this case for outbound serialization.
Here is the property to set the contentType on the outbound.
@@ -237,17 +361,19 @@ Here is the property to enable native encoding.
spring.cloud.stream.bindings.output.nativeEncoding: true
----
If native encoding is enabled on the output binding (user has to explicitly enable it as above), then the framework will skip doing any message conversion on the outbound.
In that case, it will use the Serde set by the user.
First, it checks for the `valueSerde` property set on the actual output binding. Here is an example
If native encoding is enabled on the output binding (the user has to enable it explicitly, as shown above), then the framework will
skip any form of automatic message conversion on the outbound. In that case, it will switch to the Serde set by the user.
The `valueSerde` property set on the actual output binding will be used. Here is an example.
[source]
----
spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
----
If this property is not set, then it will default to the common value Serde - `spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde`.
If this property is not set, then it will use the "default" SerDe: `spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde`.
It is worth to mention that Spring Cloud Stream Kafka Streams binder does not serialize the keys on outbound, rather it is always done by Kafka itself.
Therefore, you either have to specify the keySerde property on the binding or it will default to the application wide common keySerde set on the streams configuration.
It is worth mentioning that the Kafka Streams binder does not serialize the keys on the outbound - it simply relies on Kafka itself.
Therefore, you either have to specify the `keySerde` property on the binding or it will default to the application-wide common
`keySerde`.
Binding level key serde:
@@ -283,17 +409,20 @@ interface KStreamProcessorWithBranches {
}
----
If nativeEncoding is set, then you can set different Serde values on these individual output bindings as below.
If `nativeEncoding` is set, then you can set different SerDe's on individual output bindings as below.
[source]
----
spring.cloud.stream.kstream.bindings.output1.producer.valueSerde=IntegerSerde
spring.cloud.stream.kstream.bindings.outpu2t.producer.valueSerde=StringSerde
spring.cloud.stream.kstream.bindings.output2.producer.valueSerde=StringSerde
spring.cloud.stream.kstream.bindings.output3.producer.valueSerde=JsonSerde
----
Then if you have `SendTo` like this, @SendTo({"output1", "output2", "output3"}), the `KStream[]` from the branches are applied with proper Serde objects as defined above.
If you are not enabling nativeEncoding, you can then set different contentType values on the output bindings as below.
In that case, the framework will use the appropriate message converter to convert the messages before sending to Kafka.
Then, if you have `SendTo` like this, @SendTo({"output1", "output2", "output3"}), the `KStream[]` from the branches is
serialized with the corresponding SerDe objects as defined above. If you are not enabling `nativeEncoding`, you can then set different
contentType values on the output bindings as below. In that case, the framework will use the appropriate message converter
to convert the messages before sending to Kafka.
[source]
----
spring.cloud.stream.bindings.output1.contentType: application/json
@@ -303,10 +432,11 @@ spring.cloud.stream.bindings.output3.contentType: application/octet-stream
==== Inbound Deserialization
Similar rules apply to data deserialization on the inbound as in the case of outbound serialization.
Similar rules apply to data deserialization on the inbound.
If native decoding is disabled (which is the default), then the framework will convert the message using the contentType set by the user (or the default content type of application/json).
It will ignore any Serde set on the inbound in this case for inbound dserialization.
If native decoding is disabled (which is the default), then the framework will convert the message using the contentType
set by the user (otherwise, the default `application/json` will be applied). It will ignore any SerDe set on the inbound
in this case for inbound deserialization.
Here is the property to set the contentType on the inbound.
@@ -322,18 +452,20 @@ Here is the property to enable native decoding.
spring.cloud.stream.bindings.input.nativeDecoding: true
----
If native decoding is enabled on the input binding (user has to explicitly enable it as above), then the framework will skip doing any message conversion on the inbound.
In that case, it will use the Serde set by the user.
First, it checks for the `valueSerde` property set on the actual input binding. Here is an example
If native decoding is enabled on the input binding (the user has to enable it explicitly, as shown above), then the framework will
skip doing any message conversion on the inbound. In that case, it will switch to the SerDe set by the user. The `valueSerde`
property set on the actual input binding will be used. Here is an example.
[source]
----
spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
----
If this property is not set, then it will default to the common value Serde - `spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde`.
It is worth to mention that Spring Cloud Stream Kafka Streams binder does not deserialize the keys on inbound, rather it is always done by Kafka itself.
Therefore, you either have to specify the keySerde property on the binding or it will default to the application wide common keySerde set on the streams configuration.
If this property is not set, it will use the default SerDe: `spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde`.
It is worth mentioning that the Kafka Streams binder does not deserialize the keys on the inbound - it simply relies on Kafka itself.
Therefore, you either have to specify the `keySerde` property on the binding or it will default to the application-wide common
`keySerde`.
Binding level key serde:
@@ -349,53 +481,64 @@ Common Key serde:
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
----
As in the case of KStream branching on the outbound, the benefit of setting value Serde per binding is that if you have multiple input bindings (multiple KStreams) and they all require separate value Serdes, then you can configure them individually.
If you use the common configuration approach, then that is not possible.
As in the case of KStream branching on the outbound, the benefit of setting the value SerDe per binding is that if you have
multiple input bindings (multiple KStream objects) and they all require separate value SerDes, then you can configure
them individually. If you use the common configuration approach, then this feature won't be applicable.
==== Error handling on Deserialization exceptions
== Error Handling
Apache Kafka Streams now provide the capability for natively handling exceptions from deserialization errors.
Apache Kafka Streams provide the capability for natively handling exceptions from deserialization errors.
For details on this support, please see https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers[this]
Out of the box, Apache Kafka Streams provide two kinds of deserialization exception handlers - logAndContinue and logAndFail.
As the name indicates, the former will log the error and continue processing next records and the latter will log the error and fai..
LogAndFail is the default deserialization exception handler.
Out of the box, Apache Kafka Streams provide two kinds of deserialization exception handlers - `logAndContinue` and `logAndFail`.
As the name indicates, the former will log the error and continue processing the next records and the latter will log the
error and fail. `LogAndFail` is the default deserialization exception handler.
=== Handling Deserialization Exceptions
Kafka Streams binder supports a selection of exception handlers through the following properties.
Spring Cloud Stream binder for Apache Kafka Streams allows to specify these exception handlers through the following properties.
[source]
----
spring.cloud.stream.kafka.streams.binder.serdeError: logAndContinue
----
In addition to the above two deserialization exception handlers, the binder also provides a third one for sending the erroneous
records (poison pills) to a DLQ topic. Here is how you enable this DLQ exception handler.
[source]
----
spring.cloud.stream.kafka.streams.binder.serdeError: sendToDlq
----
When the above property is set, all records that fail deserialization are automatically sent to the DLQ topic.
The binder first checks whether a `dlqName` is set on the binding itself, using the following property.
[source]
----
spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqName: foo-dlq
----
If this is set, the error records are sent to the topic `foo-dlq`. If this is not set, a DLQ topic is created with the
name `error.<input-topic-name>.<group-name>`.
A couple of things to keep in mind when using the exception handling feature in the Kafka Streams binder.
* The property `spring.cloud.stream.kafka.streams.binder.serdeError` is applicable for the entire application. This implies
that if there are multiple `StreamListener` methods in the same application, this property is applied to all of them.
* The exception handling for deserialization works consistently with both native deserialization and framework provided message
conversion.
=== Handling Non-Deserialization Exceptions
Other kinds of error handling are currently limited in Apache Kafka Streams, so it is up to the end-user application to handle
any such application level errors.
As a side effect of providing a DLQ for deserialization exception handlers, the Kafka Streams binder provides a way to get
access to the DLQ-sending bean directly from your application.
Once you get access to that bean, you can programmatically send any exception records from your application to the DLQ.
It remains hard to achieve robust error handling using the high-level DSL, since Kafka Streams does not yet natively provide
error handling support there. However, when you use the low-level Processor API in your application, there are options to
control this behavior, as shown in the example below.
[source]
----
@@ -434,11 +577,11 @@ public KStream<?, WordCount> process(KStream<Object, String> input) {
}
----
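As a rough sketch of that approach (the `dlqSender` bean and its `sendToDlq(topic, key, value)` method are hypothetical placeholders for the DLQ-sending bean exposed by the binder, `handle(...)` stands in for your own business logic, and the DLQ topic name is illustrative):
[source,java]
----
input.process(() -> new Processor<Object, String>() {

    @Override
    public void init(ProcessorContext context) {
    }

    @Override
    public void process(Object key, String value) {
        try {
            handle(value); // hypothetical application-level processing that may throw
        }
        catch (Exception e) {
            // route the failed record to the DLQ topic instead of failing the stream
            dlqSender.sendToDlq("error.words.my-group", key, value);
        }
    }

    @Override
    public void punctuate(long timestamp) {
    }

    @Override
    public void close() {
    }
});
----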
== Interactive Queries
As part of the public Kafka Streams binder API, we expose a class called `QueryableStoreRegistry`. You can access it
as a Spring bean in your application. An easy way to get access to this bean is to autowire it, as shown below.
[source]
----
@@ -446,113 +589,10 @@ One easy way to get access to this bean from your application is to autowire the
private QueryableStoreRegistry queryableStoreRegistry;
----
Once you gain access to this bean, you can look up the particular state store that you are interested in. Here is an example:
[source]
----
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
queryableStoreRegistry.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
----
Then you can retrieve the data that you stored in this store during the execution of your application.
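For instance, here is a brief sketch of reading from the store obtained above (`"some-key"` is a placeholder key):
[source,java]
----
// Look up a single key...
Object value = keyValueStore.get("some-key");

// ...or iterate over everything held by the local instance of the store.
try (KeyValueIterator<Object, Object> all = keyValueStore.all()) {
    while (all.hasNext()) {
        KeyValue<Object, Object> entry = all.next();
        System.out.println(entry.key + " -> " + entry.value);
    }
}
----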
=== Kafka Streams properties
We covered all the relevant properties that you need when writing Kafka Streams applications using Spring Cloud Stream in the sections above; they are summarized here again for reference.
The following properties are available at the binder level and must be prefixed with `spring.cloud.stream.kafka.streams.binder.`.
configuration::
Map with a key/value pair containing properties pertaining to Apache Kafka Streams API.
This property must be prefixed with `spring.cloud.stream.kafka.streams.binder.`.
Following are some examples of using this property.
[source]
----
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
----
For more information about all the properties that may go into streams configuration, see StreamsConfig JavaDocs in Apache Kafka Streams docs.
brokers::
Broker URL
+
Default: `localhost`
zkNodes::
Zookeeper URL
+
Default: `localhost`
serdeError::
Deserialization error handler type.
Possible values are - `logAndContinue`, `logAndFail` or `sendToDlq`
+
Default: `logAndFail`
applicationId::
Application ID for all the stream configurations in the current application context.
You can override the application id for an individual `StreamListener` method using the `group` property on the binding.
You have to ensure that you are using the same group name for all input bindings in the case of multiple inputs on the same method.
+
Default: `default`
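For example, here is a sketch with placeholder values, assuming an input binding named `input`, that sets a binder-wide application id and overrides the group for that binding:
[source]
----
spring.cloud.stream.kafka.streams.binder.applicationId: my-streams-app
spring.cloud.stream.bindings.input.group: word-count-group
----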
The following properties are available for Kafka Streams producers only and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.`.
keySerde::
key serde to use
+
Default: `none`.
valueSerde::
value serde to use
+
Default: `none`.
useNativeEncoding::
flag to enable native encoding
+
Default: `false`.
The following properties are available for Kafka Streams consumers only and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.consumer.`.
keySerde::
key serde to use
+
Default: `none`.
valueSerde::
value serde to use
+
Default: `none`.
materializedAs::
state store to materialize when using incoming KTable types
+
Default: `none`.
useNativeDecoding::
flag to enable native decoding
+
Default: `false`.
dlqName::
DLQ topic name.
+
Default: `none`.
Other common properties used from core Spring Cloud Stream.
[source]
----
spring.cloud.stream.bindings.<binding name>.destination
spring.cloud.stream.bindings.<binding name>.group
----
TimeWindow properties:
Windowing is an important concept in stream processing applications.
The following properties are available for configuring time windows.
spring.cloud.stream.kafka.streams.timeWindow.length::
When this property is given, you can autowire a `TimeWindows` bean into the application.
The value is expressed in milliseconds.
+
Default: `none`.
spring.cloud.stream.kafka.streams.timeWindow.advanceBy::
Value is given in milliseconds.
+
Default: `none`.
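As a sketch of how the window configuration can be consumed (the binding names `input` and `output` and the store-less counting logic are placeholders), the autowired `TimeWindows` bean can drive a windowed aggregation:
[source,java]
----
@Autowired
private TimeWindows timeWindows; // available when timeWindow.length is set

@StreamListener("input")
@SendTo("output")
public KStream<?, Long> process(KStream<Object, String> input) {
    // count occurrences of each value within every configured time window
    return input
            .groupBy((key, value) -> value)
            .windowedBy(timeWindows)
            .count()
            .toStream()
            .map((windowedKey, count) -> new KeyValue<>(windowedKey.key(), count));
}
----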

View File

@@ -7,7 +7,7 @@ In addition, this guide also explains the Kafka Streams binding capabilities of
== Usage
To use the Apache Kafka binder, you need to add `spring-cloud-stream-binder-kafka` as a dependency to your Spring Cloud Stream application, as shown in the following Maven example:
[source,xml]
----
@@ -38,6 +38,11 @@ The Apache Kafka Binder implementation maps each destination to an Apache Kafka
The consumer group maps directly to the same Apache Kafka concept.
Partitioning also maps directly to Apache Kafka partitions as well.
The binder currently uses the Apache Kafka `kafka-clients` 1.0.0 jar and is designed to be used with a broker at least that version.
This client can communicate with older brokers (refer to the Kafka documentation), but certain features may not be available.
For example, with versions earlier than 0.11.x.x, native headers are not supported.
Also, 0.11.x.x does not support the `autoAddPartitions` property.
== Configuration Options
This section contains the configuration options used by the Apache Kafka binder.
@@ -55,42 +60,24 @@ spring.cloud.stream.kafka.binder.defaultBrokerPort::
This sets the default port when no port is configured in the broker list.
+
Default: `9092`.
spring.cloud.stream.kafka.binder.zkNodes::
A list of ZooKeeper nodes to which the Kafka binder can connect.
+
Default: `localhost`.
spring.cloud.stream.kafka.binder.defaultZkPort::
`zkNodes` allows hosts specified with or without port information (e.g., `host1,host2:port2`).
This sets the default port when no port is configured in the node list.
+
Default: `2181`.
spring.cloud.stream.kafka.binder.configuration::
Key/Value map of client properties (both producers and consumer) passed to all clients created by the binder.
Because these properties are used by both producers and consumers, usage should be restricted to common properties - for example, security settings.
+
Default: Empty map.
spring.cloud.stream.kafka.binder.headers::
The list of custom headers that will be transported by the binder.
Only required when communicating with older applications (<= 1.3.x) with a `kafka-clients` version < 0.11.0.0; newer versions support headers natively.
+
Default: empty.
spring.cloud.stream.kafka.binder.healthTimeout::
The time, in seconds, to wait to get partition information.
Health reports as down if this timer expires.
+
Default: `10`.
spring.cloud.stream.kafka.binder.offsetUpdateTimeWindow::
The frequency, in milliseconds, with which offsets are saved.
Ignored if `0`.
+
Default: `10000`.
spring.cloud.stream.kafka.binder.offsetUpdateCount::
The frequency, in number of updates, with which consumed offsets are persisted.
Ignored if `0`.
Mutually exclusive with `offsetUpdateTimeWindow`.
+
Default: `0`.
spring.cloud.stream.kafka.binder.requiredAcks::
The number of required acks on the broker.
See the Kafka documentation for the producer `acks` property.
+
Default: `1`.
spring.cloud.stream.kafka.binder.minPartitionCount::
@@ -101,6 +88,7 @@ It can be superseded by the `partitionCount` setting of the producer or by the v
Default: `1`.
spring.cloud.stream.kafka.binder.replicationFactor::
The replication factor of auto-created topics if `autoCreateTopics` is active.
Can be overridden on each binding.
+
Default: `1`.
spring.cloud.stream.kafka.binder.autoCreateTopics::
@@ -116,10 +104,6 @@ If set to `false`, the binder will rely on the partition size of the topic being
If the partition count of the target topic is smaller than the expected value, the binder will fail to start.
+
Default: `false`.
spring.cloud.stream.kafka.binder.socketBufferSize::
Size (in bytes) of the socket buffer to be used by the Kafka consumers.
+
Default: `2097152`.
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix::
Enable transactions in the binder; see `transaction.id` in the Kafka documentation and https://docs.spring.io/spring-kafka/reference/html/_reference.html#transactions[Transactions] in the `spring-kafka` documentation.
When transactions are enabled, individual `producer` properties are ignored and all producers use the `spring.cloud.stream.kafka.binder.transaction.producer.*` properties.
@@ -131,12 +115,37 @@ spring.cloud.stream.kafka.binder.transaction.producer.*::
+
Default: See individual producer properties.
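For example, here is a sketch with placeholder values that enables transactions and overrides a couple of settings for the transactional producers:
[source]
----
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix=tx-
spring.cloud.stream.kafka.binder.transaction.producer.configuration.acks=all
spring.cloud.stream.kafka.binder.transaction.producer.configuration.retries=10
----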
spring.cloud.stream.kafka.binder.headerMapperBeanName::
The bean name of a `KafkaHeaderMapper` used for mapping `spring-messaging` headers to/from Kafka headers.
Use this, for example, if you wish to customize the trusted packages in a `DefaultKafkaHeaderMapper`, which uses JSON deserialization for the headers.
+
Default: none.
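A minimal sketch of supplying such a bean (the bean name `customHeaderMapper` and the trusted package are illustrative):
[source,java]
----
@Bean
public DefaultKafkaHeaderMapper customHeaderMapper() {
    DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
    // trust an additional package when JSON-deserializing header values
    mapper.addTrustedPackages("com.example.headers");
    return mapper;
}
----
together with the property `spring.cloud.stream.kafka.binder.headerMapperBeanName=customHeaderMapper`.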
[[kafka-consumer-properties]]
=== Kafka Consumer Properties
The following properties are available for Kafka consumers only and
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.consumer.`.
admin.configuration::
A `Map` of Kafka topic properties used when provisioning topics.
e.g. `spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0`
+
Default: none.
admin.replicas-assignment::
A `Map<Integer, List<Integer>>` of replica assignments, with the key being the partition and the value being the assignments.
Used when provisioning new topics.
See `NewTopic` javadocs in the `kafka-clients` jar.
+
Default: none.
admin.replication-factor::
The replication factor to use when provisioning topics; overrides the binder-wide setting.
Ignored if `replicas-assignment` is present.
+
Default: none (the binder-wide default of 1 is used).
autoRebalanceEnabled::
When `true`, topic partitions will be automatically rebalanced between the members of a consumer group.
When `false`, each consumer will be assigned a fixed set of partitions based on `spring.cloud.stream.instanceCount` and `spring.cloud.stream.instanceIndex`.
@@ -144,12 +153,21 @@ This requires both `spring.cloud.stream.instanceCount` and `spring.cloud.stream.
The property `spring.cloud.stream.instanceCount` must typically be greater than 1 in this case.
+
Default: `true`.
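For example, here is a sketch of static partition assignment for the second of two instances (the binding name and counts are placeholders):
[source]
----
spring.cloud.stream.kafka.bindings.input.consumer.autoRebalanceEnabled=false
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=1
----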
ackEachRecord::
When `autoCommitOffset` is `true`, whether to commit the offset after each record is processed.
By default, offsets are committed after all records in the batch of records returned by `consumer.poll()` have been processed.
The number of records returned by a poll can be controlled with the `max.poll.records` Kafka property, which is set via the consumer `configuration` property.
Setting this to true may cause a degradation in performance, but reduces the likelihood of redelivered records when a failure occurs.
Also see the binder `requiredAcks` property, which also affects the performance of committing offsets.
+
Default: `false`.
autoCommitOffset::
Whether to autocommit offsets when a message has been processed.
If set to `false`, a header with the key `kafka_acknowledgment`, of type `org.springframework.kafka.support.Acknowledgment`, will be present in the inbound message.
Applications may use this header for acknowledging messages.
See the examples section for details.
When this property is set to `false`, the Kafka binder sets the ack mode to `org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL` and the application is responsible for acknowledging records.
Also see `ackEachRecord`.
+
Default: `true`.
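A brief sketch of manual acknowledgment when `autoCommitOffset` is `false` (the binding and processing logic are placeholders):
[source,java]
----
@StreamListener(Sink.INPUT)
public void process(Message<?> message) {
    // process the payload, then acknowledge so that the offset is committed
    Acknowledgment acknowledgment =
            message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
    if (acknowledgment != null) {
        acknowledgment.acknowledge();
    }
}
----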
autoCommitOnError::
@@ -159,14 +177,15 @@ If set to `true`, it will always auto-commit (if auto-commit is enabled).
If not set (default), it effectively has the same value as `enableDlq`, auto-committing erroneous messages if they are sent to a DLQ, and not committing them otherwise.
+
Default: not set.
recoveryInterval::
The interval between connection recovery attempts, in milliseconds.
+
Default: `5000`.
resetOffsets::
Whether to reset offsets on the consumer to the value provided by `startOffset`.
+
Default: `false`.
startOffset::
The starting offset for new groups.
Allowed values: `earliest`, `latest`.
If the consumer group is set explicitly for the consumer 'binding' (via `spring.cloud.stream.bindings.<channelName>.group`), then 'startOffset' is set to `earliest`; otherwise it is set to `latest` for the `anonymous` consumer group.
Also see `resetOffsets`.
+
Default: null (equivalent to `earliest`).
enableDlq::
@@ -214,6 +233,25 @@ Default: `30000`
The following properties are available for Kafka producers only and
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.producer.`.
admin.configuration::
A `Map` of Kafka topic properties used when provisioning new topics.
e.g. `spring.cloud.stream.kafka.bindings.output.producer.admin.configuration.message.format.version=0.9.0.0`
+
Default: none.
admin.replicas-assignment::
A `Map<Integer, List<Integer>>` of replica assignments, with the key being the partition and the value being the assignments.
Used when provisioning new topics.
See `NewTopic` javadocs in the `kafka-clients` jar.
+
Default: none.
admin.replication-factor::
The replication factor to use when provisioning new topics; overrides the binder-wide setting.
Ignored if `replicas-assignment` is present.
+
Default: none (the binder-wide default of 1 is used).
bufferSize::
Upper limit, in bytes, of how much data the Kafka producer will attempt to batch before sending.
+
@@ -229,15 +267,15 @@ batchTimeout::
Default: `0`.
messageKeyExpression::
A SpEL expression evaluated against the outgoing message used to populate the key of the produced Kafka message.
For example, `headers['myKey']`. The payload cannot be used because, by the time this expression is evaluated, the payload is already in the form of a `byte[]`.
+
Default: `none`.
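For example, here is a sketch keying outgoing records by a message header (the binding and header names are placeholders):
[source]
----
spring.cloud.stream.kafka.bindings.output.producer.messageKeyExpression=headers['myKey']
----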
headerPatterns::
A comma-delimited list of simple patterns to match spring-messaging headers to be mapped to the kafka `Headers` in the `ProducerRecord`.
Patterns can begin or end with the wildcard character (asterisk).
Patterns can be negated by prefixing with `!`; matching stops after the first match (positive or negative).
For example `!foo,fo*` will pass `fox` but not `foo`.
`id` and `timestamp` are never mapped.
+
Default: `*` (all headers - except the `id` and `timestamp`)
configuration::
@@ -314,7 +352,6 @@ Here is an example of launching a Spring Cloud Stream application with SASL and
----
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
----
@@ -343,7 +380,6 @@ Here is an example of launching a Spring Cloud Stream application with SASL and
[source]
----
java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.autoCreateTopics=false \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
@@ -366,7 +402,7 @@ KafkaClient {
};
----
If the topics required already exist on the broker, or will be created by an administrator, autocreation can be turned off and only the client JAAS properties need to be sent.
[NOTE]
====
@@ -421,103 +457,6 @@ public class Application {
}
----
==== Using the binder with Apache Kafka 0.10
The default Kafka support in Spring Cloud Stream Kafka binder is for Kafka version 0.10.1.1. The binder also supports connecting to other 0.10 based versions and 0.9 clients.
In order to do this, when you create the project that contains your application, include `spring-cloud-starter-stream-kafka` as you normally would do for the default binder.
Then add these dependencies at the top of the `<dependencies>` section in the pom.xml file to override the dependencies.
Here is an example for downgrading your application to 0.10.0.1. Since it is still on the 0.10 line, the default `spring-kafka` and `spring-integration-kafka` versions can be retained.
[source,xml]
----
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
</dependency>
----
Here is another example of using 0.9.0.1 version.
[source,xml]
----
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.0.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
----
[NOTE]
====
The versions above are provided only for the sake of the example.
For best results, we recommend using the most recent 0.10-compatible versions of the projects.
====
[[exclude-admin-utils]]
==== Excluding Kafka broker jar from the classpath of the binder based application
The Apache Kafka Binder uses the administrative utilities which are part of the Apache Kafka server library to create and reconfigure topics.
If the inclusion of the Apache Kafka server library and its dependencies is not necessary at runtime because the application will rely on the topics being configured administratively, the Kafka binder allows for Apache Kafka server dependency to be excluded from the application.
If you use non-default versions of the Kafka dependencies, as advised above, all you have to do is not include the kafka broker dependency.
If you use the default Kafka version, then ensure that you exclude the kafka broker jar from the `spring-cloud-starter-stream-kafka` dependency as follows.
[source,xml]
----
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
</exclusion>
</exclusions>
</dependency>
----
If you exclude the Apache Kafka server dependency and the topic is not present on the server, then the Apache Kafka broker will create the topic if auto topic creation is enabled on the server.
Please keep in mind that if you are relying on this, then the Kafka server will use the default number of partitions and replication factors.
On the other hand, if auto topic creation is disabled on the server, then care must be taken before running the application to create the topic with the desired number of partitions.
If you want to have full control over how partitions are allocated, then leave the default settings as they are, i.e. do not exclude the kafka broker jar and ensure that `spring.cloud.stream.kafka.binder.autoCreateTopics` is set to `true`, which is the default.
[[kafka-error-channels]]
== Error Channels

View File

@@ -1,4 +1,3 @@
include::overview.adoc[leveloffset=+1]
include::dlq.adoc[leveloffset=+1]
include::partitions.adoc[leveloffset=+1]
include::kafka-streams.adoc[leveloffset=+1]

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RC1</version>
<version>2.0.0.RC3</version>
</parent>
<dependencies>

View File

@@ -26,10 +26,12 @@ import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
@@ -41,6 +43,7 @@ import org.springframework.util.ObjectUtils;
* @author Soby Chacko
*/
@EnableConfigurationProperties(KafkaStreamsExtendedBindingProperties.class)
@ConditionalOnBean(BindingService.class)
public class KafkaStreamsBinderSupportAutoConfiguration {
@Bean

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RC1</version>
<version>2.0.0.RC3</version>
</parent>
<dependencies>
@@ -18,6 +18,11 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016-2017 the original author or authors.
* Copyright 2016-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,8 +20,10 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.TimeGauge;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,21 +34,28 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.springframework.cloud.stream.binder.BindingCreatedEvent;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.context.ApplicationListener;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.ObjectUtils;
/**
* Metrics for Kafka binder.
*
* @author Henryk Konsek
* @author Soby Chacko
* @author Artem Bilan
* @author Oleg Zhurakousky
* @author Jon Schneider
*/
public class KafkaBinderMetrics implements MeterBinder {
public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<BindingCreatedEvent> {
private final static Log LOG = LogFactory.getLog(KafkaBinderMetrics.class);
static final String METRIC_PREFIX = "spring.cloud.stream.binder.kafka";
static final String METRIC_NAME = "spring.cloud.stream.binder.kafka.offset";
private final KafkaMessageChannelBinder binder;
@@ -54,23 +63,29 @@ public class KafkaBinderMetrics implements MeterBinder {
private ConsumerFactory<?, ?> defaultConsumerFactory;
private final MeterRegistry meterRegistry;
public KafkaBinderMetrics(KafkaMessageChannelBinder binder,
KafkaBinderConfigurationProperties binderConfigurationProperties,
ConsumerFactory<?, ?> defaultConsumerFactory) {
ConsumerFactory<?, ?> defaultConsumerFactory, @Nullable MeterRegistry meterRegistry) {
this.binder = binder;
this.binderConfigurationProperties = binderConfigurationProperties;
this.defaultConsumerFactory = defaultConsumerFactory;
this.meterRegistry = meterRegistry;
}
public KafkaBinderMetrics(KafkaMessageChannelBinder binder,
KafkaBinderConfigurationProperties binderConfigurationProperties) {
this(binder, binderConfigurationProperties, null);
this(binder, binderConfigurationProperties, null, null);
}
@Override
public void bindTo(MeterRegistry registry) {
for (Map.Entry<String, KafkaMessageChannelBinder.TopicInformation> topicInfo : this.binder.getTopicsInUse()
.entrySet()) {
if (!topicInfo.getValue().isConsumerTopic()) {
continue;
}
@@ -78,47 +93,67 @@ public class KafkaBinderMetrics implements MeterBinder {
String topic = topicInfo.getKey();
String group = topicInfo.getValue().getConsumerGroup();
try (Consumer<?, ?> metadataConsumer = createConsumerFactory(group).createConsumer()) {
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = new LinkedList<>();
for (PartitionInfo partitionInfo : partitionInfos) {
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
Map<TopicPartition, Long> endOffsets = metadataConsumer.endOffsets(topicPartitions);
long lag = 0;
for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
OffsetAndMetadata current = metadataConsumer.committed(endOffset.getKey());
if (current != null) {
lag += endOffset.getValue() - current.offset();
}
else {
lag += endOffset.getValue();
}
}
registry.gauge(String.format("%s.%s.%s.lag", METRIC_PREFIX, group, topic), lag);
TimeGauge.builder(METRIC_NAME, this, TimeUnit.MILLISECONDS,
o -> calculateConsumerLagOnTopic(topic, group))
.tag("group", group)
.tag("topic", topic)
.description("Consumer lag for a particular group and topic")
.register(registry);
}
}
private double calculateConsumerLagOnTopic(String topic, String group) {
long lag = 0;
try (Consumer<?, ?> metadataConsumer = createConsumerFactory(group).createConsumer()) {
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = new LinkedList<>();
for (PartitionInfo partitionInfo : partitionInfos) {
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
catch (Exception e) {
LOG.debug("Cannot generate metric for topic: " + topic, e);
Map<TopicPartition, Long> endOffsets = metadataConsumer.endOffsets(topicPartitions);
for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
OffsetAndMetadata current = metadataConsumer.committed(endOffset.getKey());
if (current != null) {
lag += endOffset.getValue() - current.offset();
}
else {
lag += endOffset.getValue();
}
}
}
catch (Exception e) {
LOG.debug("Cannot generate metric for topic: " + topic, e);
}
return lag;
}
private ConsumerFactory<?, ?> createConsumerFactory(String group) {
if (defaultConsumerFactory != null) {
return defaultConsumerFactory;
if (this.defaultConsumerFactory == null) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
if (!ObjectUtils.isEmpty(binderConfigurationProperties.getConsumerConfiguration())) {
props.putAll(binderConfigurationProperties.getConsumerConfiguration());
}
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.binderConfigurationProperties.getKafkaConnectionString());
}
props.put("group.id", group);
this.defaultConsumerFactory = new DefaultKafkaConsumerFactory<>(props);
}
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
if (!ObjectUtils.isEmpty(binderConfigurationProperties.getConsumerConfiguration())) {
props.putAll(binderConfigurationProperties.getConsumerConfiguration());
return this.defaultConsumerFactory;
}
@Override
public void onApplicationEvent(BindingCreatedEvent event) {
if (this.meterRegistry != null) {
// meters are idempotent when called with the same arguments so safe to call it multiple times
this.bindTo(this.meterRegistry);
}
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
this.binderConfigurationProperties.getKafkaConnectionString());
}
props.put("group.id", group);
return new DefaultKafkaConsumerFactory<>(props);
}
}

View File

@@ -28,7 +28,9 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -80,7 +82,9 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaderMapper;
@@ -325,7 +329,8 @@ public class KafkaMessageChannelBinder extends
Collection<PartitionInfo> listenedPartitions;
if (extendedConsumerProperties.getExtension().isAutoRebalanceEnabled() ||
boolean groupManagement = extendedConsumerProperties.getExtension().isAutoRebalanceEnabled();
if (groupManagement ||
extendedConsumerProperties.getInstanceCount() == 1) {
listenedPartitions = allPartitions;
}
@@ -354,6 +359,7 @@ public class KafkaMessageChannelBinder extends
}
containerProperties.setIdleEventInterval(extendedConsumerProperties.getExtension().getIdleEventInterval());
int concurrency = Math.min(extendedConsumerProperties.getConcurrency(), listenedPartitions.size());
resetOffsets(extendedConsumerProperties, consumerFactory, groupManagement, containerProperties);
@SuppressWarnings("rawtypes")
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer =
new ConcurrentMessageListenerContainer(consumerFactory, containerProperties) {
@@ -366,8 +372,14 @@ public class KafkaMessageChannelBinder extends
};
messageListenerContainer.setConcurrency(concurrency);
// these won't be needed if the container is made a bean
messageListenerContainer.setApplicationEventPublisher(getApplicationContext());
if (getApplicationEventPublisher() != null) {
messageListenerContainer.setApplicationEventPublisher(getApplicationEventPublisher());
}
else if (getApplicationContext() != null) {
messageListenerContainer.setApplicationEventPublisher(getApplicationContext());
}
messageListenerContainer.setBeanName(destination.getName() + ".container");
// end of these won't be needed...
if (!extendedConsumerProperties.getExtension().isAutoCommitOffset()) {
messageListenerContainer.getContainerProperties()
.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
@@ -376,6 +388,9 @@ public class KafkaMessageChannelBinder extends
else {
messageListenerContainer.getContainerProperties()
.setAckOnError(isAutoCommitOnError(extendedConsumerProperties));
if (extendedConsumerProperties.getExtension().isAckEachRecord()) {
messageListenerContainer.getContainerProperties().setAckMode(AckMode.RECORD);
}
}
if (this.logger.isDebugEnabled()) {
this.logger.debug(
@@ -397,6 +412,58 @@ public class KafkaMessageChannelBinder extends
return kafkaMessageDrivenChannelAdapter;
}
/*
* Reset the offsets if needed; may update the offsets in the container's
* topicPartitionInitialOffsets.
*/
private void resetOffsets(
final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
final ConsumerFactory<?, ?> consumerFactory, boolean groupManagement,
final ContainerProperties containerProperties) {
boolean resetOffsets = extendedConsumerProperties.getExtension().isResetOffsets();
final Object resetTo = consumerFactory.getConfigurationProperties().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
final AtomicBoolean initialAssignment = new AtomicBoolean(true);
if (!"earliest".equals(resetTo) && "!latest".equals(resetTo)) {
logger.warn("no (or unknown) " + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
" property cannot reset");
resetOffsets = false;
}
if (groupManagement && resetOffsets) {
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
// no op
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
// no op
}
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
if (initialAssignment.getAndSet(false)) {
if ("earliest".equals(resetTo)) {
consumer.seekToBeginning(tps);
}
else if ("latest".equals(resetTo)) {
consumer.seekToEnd(tps);
}
}
}
});
}
else if (resetOffsets) {
Arrays.stream(containerProperties.getTopicPartitions())
.map(tpio -> new TopicPartitionInitialOffset(tpio.topic(), tpio.partition(),
// SK GH-599 "earliest".equals(resetTo) ? SeekPosition.BEGINNING : SeekPosition.END))
"earliest".equals(resetTo) ? 0L : Long.MAX_VALUE))
.collect(Collectors.toList()).toArray(containerProperties.getTopicPartitions());
}
}
@Override
protected PolledConsumerResources createPolledConsumerResources(String name, String group,
ConsumerDestination destination, ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
@@ -617,7 +684,7 @@ public class KafkaMessageChannelBinder extends
}
}
private ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
protected ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015-2017 the original author or authors.
* Copyright 2015-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,38 +17,31 @@
package org.springframework.cloud.stream.binder.kafka.config;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.properties.JaasLoginModuleConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.util.ObjectUtils;
/**
* @author David Turanski
@@ -58,24 +51,21 @@ import org.springframework.util.ObjectUtils;
* @author Ilayaperumal Gopinathan
* @author Henryk Konsek
* @author Gary Russell
* @author Oleg Zhurakousky
* @author Artem Bilan
*/
@Configuration
@ConditionalOnMissingBean(Binder.class)
@Import({ PropertyPlaceholderAutoConfiguration.class})
@Import({KafkaAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class, KafkaBinderHealthIndicatorConfiguration.class })
@EnableConfigurationProperties({ KafkaExtendedBindingProperties.class })
public class KafkaBinderConfiguration {
protected static final Log logger = LogFactory.getLog(KafkaBinderConfiguration.class);
@Autowired
private KafkaExtendedBindingProperties kafkaExtendedBindingProperties;
@Autowired
private ProducerListener producerListener;
@Autowired
private ApplicationContext context;
@Autowired
private KafkaProperties kafkaProperties;
@@ -91,7 +81,8 @@ public class KafkaBinderConfiguration {
@Bean
KafkaMessageChannelBinder kafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider) {
KafkaTopicProvisioner provisioningProvider) {
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
configurationProperties, provisioningProvider);
kafkaMessageChannelBinder.setProducerListener(producerListener);
@@ -105,40 +96,37 @@ public class KafkaBinderConfiguration {
return new LoggingProducerListener();
}
@Bean
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder,
KafkaBinderConfigurationProperties configurationProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
if (!ObjectUtils.isEmpty(configurationProperties.getConsumerConfiguration())) {
props.putAll(configurationProperties.getConsumerConfiguration());
}
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
}
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(kafkaMessageChannelBinder,
consumerFactory);
indicator.setTimeout(configurationProperties.getHealthTimeout());
return indicator;
}
@Bean
public MeterBinder kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder,
KafkaBinderConfigurationProperties configurationProperties) {
return new KafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties);
}
@Bean
public KafkaJaasLoginModuleInitializer jaasInitializer() throws IOException {
return new KafkaJaasLoginModuleInitializer();
}
/**
* A conditional configuration for the {@link KafkaBinderMetrics} bean when the
* {@link MeterRegistry} class is in classpath, as well as a {@link MeterRegistry} bean is
* present in the application context.
*/
@Configuration
@ConditionalOnClass(MeterRegistry.class)
@ConditionalOnBean(MeterRegistry.class)
protected class KafkaBinderMetricsConfiguration {
@Bean
@ConditionalOnMissingBean(KafkaBinderMetrics.class)
public MeterBinder kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder,
KafkaBinderConfigurationProperties configurationProperties,
MeterRegistry meterRegistry) {
return new KafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties, null, meterRegistry);
}
}
public static class JaasConfigurationProperties {
private JaasLoginModuleConfiguration kafka;
private JaasLoginModuleConfiguration zookeeper;
}
}

View File

@@ -0,0 +1,63 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.util.ObjectUtils;
/**
*
* @author Oleg Zhurakousky
*
*/
@Configuration
@ConditionalOnClass(name="org.springframework.boot.actuate.health.HealthIndicator")
class KafkaBinderHealthIndicatorConfiguration {
@Bean
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder,
KafkaBinderConfigurationProperties configurationProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
if (!ObjectUtils.isEmpty(configurationProperties.getConsumerConfiguration())) {
props.putAll(configurationProperties.getConsumerConfiguration());
}
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
}
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(kafkaMessageChannelBinder,
consumerFactory);
indicator.setTimeout(configurationProperties.getHealthTimeout());
return indicator;
}
}

View File

@@ -0,0 +1,64 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.util.Arrays;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaAdminProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.config.BinderFactoryConfiguration;
import org.springframework.cloud.stream.config.BindingServiceConfiguration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Gary Russell
* @since 2.0
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {KafkaBinderConfiguration.class,
BinderFactoryConfiguration.class,
BindingServiceConfiguration.class })
@TestPropertySource(properties = {
"spring.cloud.stream.kafka.bindings.input.consumer.admin.replication-factor=2",
"spring.cloud.stream.kafka.bindings.input.consumer.admin.replicas-assignments.0=0,1",
"spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0" })
@EnableIntegration
public class AdminConfigTests {
@Autowired
private KafkaMessageChannelBinder binder;
@Test
public void testProps() {
KafkaConsumerProperties consumerProps = this.binder.getExtendedConsumerProperties("input");
KafkaAdminProperties admin = consumerProps.getAdmin();
assertThat(admin.getReplicationFactor()).isEqualTo((short) 2);
assertThat(admin.getReplicasAssignments().get(0)).isEqualTo(Arrays.asList(0, 1));
assertThat(admin.getConfiguration().get("message.format.version")).isEqualTo("0.9.0.0");
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016-2017 the original author or authors.
* Copyright 2016-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,14 +28,12 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -50,8 +48,7 @@ import static org.junit.Assert.assertTrue;
* @author Ilayaperumal Gopinathan
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = { KafkaBinderAutoConfigurationPropertiesTest.KafkaBinderConfigProperties.class,
KafkaBinderConfiguration.class })
@SpringBootTest(classes = {KafkaBinderConfiguration.class })
@TestPropertySource(locations = "classpath:binder-config-autoconfig.properties")
public class KafkaBinderAutoConfigurationPropertiesTest {
@@ -124,13 +121,4 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
bootstrapServers.add("10.98.09.196:9092");
assertTrue(((List<String>) configs.get("bootstrap.servers")).containsAll(bootstrapServers));
}
public static class KafkaBinderConfigProperties {
@Bean
KafkaProperties kafkaProperties() {
return new KafkaProperties();
}
}
}

View File

@@ -20,9 +20,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.search.Search;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -31,6 +31,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@@ -70,21 +71,21 @@ public class KafkaBinderMetricsTest {
MockitoAnnotations.initMocks(this);
org.mockito.BDDMockito.given(consumerFactory.createConsumer()).willReturn(consumer);
org.mockito.BDDMockito.given(binder.getTopicsInUse()).willReturn(topicsInUse);
metrics = new KafkaBinderMetrics(binder, kafkaBinderConfigurationProperties, consumerFactory);
org.mockito.BDDMockito.given(consumer.endOffsets(org.mockito.Matchers.anyCollectionOf(TopicPartition.class)))
metrics = new KafkaBinderMetrics(binder, kafkaBinderConfigurationProperties, consumerFactory, null);
org.mockito.BDDMockito.given(consumer.endOffsets(ArgumentMatchers.anyCollection()))
.willReturn(java.util.Collections.singletonMap(new TopicPartition(TEST_TOPIC, 0), 1000L));
}
@Test
public void shouldIndicateLag() {
org.mockito.BDDMockito.given(consumer.committed(org.mockito.Matchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
org.mockito.BDDMockito.given(consumer.committed(ArgumentMatchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
metrics.bindTo(meterRegistry);
assertThat(meterRegistry.getMeters()).hasSize(1);
Search group = meterRegistry.find(String.format("%s.%s.%s.lag", KafkaBinderMetrics.METRIC_PREFIX, "group", TEST_TOPIC));
assertThat(group.gauge().value()).isEqualTo(500.0);
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge()
.value(TimeUnit.MILLISECONDS)).isEqualTo(500.0);
}
@Test
@@ -92,15 +93,15 @@ public class KafkaBinderMetricsTest {
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(new TopicPartition(TEST_TOPIC, 0), 1000L);
endOffsets.put(new TopicPartition(TEST_TOPIC, 1), 1000L);
org.mockito.BDDMockito.given(consumer.endOffsets(org.mockito.Matchers.anyCollectionOf(TopicPartition.class))).willReturn(endOffsets);
org.mockito.BDDMockito.given(consumer.committed(org.mockito.Matchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
org.mockito.BDDMockito.given(consumer.endOffsets(ArgumentMatchers.anyCollection())).willReturn(endOffsets);
org.mockito.BDDMockito.given(consumer.committed(ArgumentMatchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
List<PartitionInfo> partitions = partitions(new Node(0, null, 0), new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
metrics.bindTo(meterRegistry);
assertThat(meterRegistry.getMeters()).hasSize(1);
Search group = meterRegistry.find(String.format("%s.%s.%s.lag", KafkaBinderMetrics.METRIC_PREFIX, "group", TEST_TOPIC));
assertThat(group.gauge().value()).isEqualTo(1000.0);
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge()
.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);
}
@Test
@@ -110,8 +111,8 @@ public class KafkaBinderMetricsTest {
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
metrics.bindTo(meterRegistry);
assertThat(meterRegistry.getMeters()).hasSize(1);
Search group = meterRegistry.find(String.format("%s.%s.%s.lag", KafkaBinderMetrics.METRIC_PREFIX, "group", TEST_TOPIC));
assertThat(group.gauge().value()).isEqualTo(1000.0);
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge()
.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);
}
@Test

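The assertions above now resolve the lag meter by name plus group/topic tags and read it as a TimeGauge, instead of searching for a gauge whose name embeds the group and topic. A minimal sketch of registering such a tagged TimeGauge with Micrometer (the metric name is taken from the actuator test further down; the registration code itself is illustrative, not the binder's implementation):

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.TimeGauge;

final class LagTimeGaugeSketch {

    // One TimeGauge per group/topic pair, tagged so tests can look it up with
    // registry.get(name).tag("group", ...).tag("topic", ...).timeGauge().
    static AtomicLong registerLagGauge(MeterRegistry registry, String group, String topic) {
        AtomicLong lag = new AtomicLong();
        TimeGauge.builder("spring.cloud.stream.binder.kafka.offset", lag, TimeUnit.MILLISECONDS, AtomicLong::doubleValue)
                .tag("group", group)
                .tag("topic", topic)
                .register(registry);
        return lag;
    }
}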

@@ -100,6 +100,8 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
@@ -1095,6 +1097,11 @@ public class KafkaBinderTests extends
"testManualAckIsNotPossibleWhenAutoCommitOffsetIsEnabledOnTheBinder", "test", moduleInputChannel,
consumerProperties);
AbstractMessageListenerContainer<?, ?> container = TestUtils.getPropertyValue(consumerBinding,
"lifecycle.messageListenerContainer", AbstractMessageListenerContainer.class);
assertThat(container.getContainerProperties().getAckMode()).isEqualTo(AckMode.BATCH);
String testPayload1 = "foo" + UUID.randomUUID().toString();
Message<?> message1 = org.springframework.integration.support.MessageBuilder.withPayload(
testPayload1.getBytes()).build();
@@ -1131,12 +1138,17 @@ public class KafkaBinderTests extends
QueueChannel inbound1 = new QueueChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
consumerProperties.getExtension().setAckEachRecord(true);
Binding<MessageChannel> consumerBinding1 = binder.bindConsumer(testDestination, "test1", inbound1,
consumerProperties);
QueueChannel inbound2 = new QueueChannel();
Binding<MessageChannel> consumerBinding2 = binder.bindConsumer(testDestination, "test2", inbound2,
consumerProperties);
AbstractMessageListenerContainer<?, ?> container = TestUtils.getPropertyValue(consumerBinding2,
"lifecycle.messageListenerContainer", AbstractMessageListenerContainer.class);
assertThat(container.getContainerProperties().getAckMode()).isEqualTo(AckMode.RECORD);
Message<?> receivedMessage1 = receive(inbound1);
assertThat(receivedMessage1).isNotNull();
assertThat(new String((byte[]) receivedMessage1.getPayload(), StandardCharsets.UTF_8)).isEqualTo(testPayload);

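The two new assertions above tie the ackEachRecord consumer property to the listener container's ack mode: the default stays at BATCH, and setting ackEachRecord=true switches the container to RECORD so each record's offset is committed individually. A minimal sketch of that mapping, with the property as it would appear in configuration (illustrative only; the binder's own wiring is not reproduced here):

import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;

final class AckModeMappingSketch {

    // e.g. spring.cloud.stream.kafka.bindings.input.consumer.ackEachRecord=true
    static AckMode ackModeFor(boolean ackEachRecord) {
        // Commit after every record, or once per batch returned by poll().
        return ackEachRecord ? AckMode.RECORD : AckMode.BATCH;
    }
}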

@@ -17,10 +17,20 @@
package org.springframework.cloud.stream.binder.kafka;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
@@ -28,9 +38,24 @@ import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.messaging.MessageChannel;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
/**
* @author Gary Russell
@@ -76,4 +101,134 @@ public class KafkaBinderUnitTests {
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
}
@Test
public void testOffsetResetWithGroupManagementEarliest() throws Exception {
testOffsetResetWithGroupManagement(true, true);
}
@Test
public void testOffsetResetWithGroupManagementLatest() throws Throwable {
testOffsetResetWithGroupManagement(false, true);
}
@Test
public void testOffsetResetWithManualAssignmentEarliest() throws Exception {
testOffsetResetWithGroupManagement(true, false);
}
@Test
public void testOffsetResetWithManualAssignmentLatest() throws Throwable {
testOffsetResetWithGroupManagement(false, false);
}
private void testOffsetResetWithGroupManagement(final boolean earliest, boolean groupManage) throws Exception {
final List<TopicPartition> partitions = new ArrayList<>();
partitions.add(new TopicPartition("foo", 0));
partitions.add(new TopicPartition("foo", 1));
KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties();
KafkaTopicProvisioner provisioningProvider = mock(KafkaTopicProvisioner.class);
ConsumerDestination dest = mock(ConsumerDestination.class);
given(dest.getName()).willReturn("foo");
given(provisioningProvider.provisionConsumerDestination(anyString(), anyString(), any())).willReturn(dest);
final AtomicInteger part = new AtomicInteger();
willAnswer(i -> {
return partitions.stream()
.map(p -> new PartitionInfo("foo", part.getAndIncrement(), null, null, null))
.collect(Collectors.toList());
}).given(provisioningProvider).getPartitionsForTopic(anyInt(), anyBoolean(), any());
@SuppressWarnings("unchecked")
final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
final CountDownLatch latch = new CountDownLatch(2);
willAnswer(i -> {
try {
Thread.sleep(100);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return new ConsumerRecords<>(Collections.emptyMap());
}).given(consumer).poll(anyLong());
willAnswer(i -> {
((org.apache.kafka.clients.consumer.ConsumerRebalanceListener) i.getArgument(1))
.onPartitionsAssigned(partitions);
latch.countDown();
latch.countDown();
return null;
}).given(consumer).subscribe(eq(Collections.singletonList("foo")),
any(org.apache.kafka.clients.consumer.ConsumerRebalanceListener.class));
willAnswer(i -> {
latch.countDown();
return null;
}).given(consumer).seek(any(), anyLong());
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties, provisioningProvider) {
@Override
protected ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
return new ConsumerFactory<byte[], byte[]>() {
@Override
public Consumer<byte[], byte[]> createConsumer() {
return consumer;
}
@Override
public Consumer<byte[], byte[]> createConsumer(String arg0) {
return consumer;
}
@Override
public Consumer<byte[], byte[]> createConsumer(String arg0, String arg1) {
return consumer;
}
@Override
public boolean isAutoCommit() {
return false;
}
@Override
public Map<String, Object> getConfigurationProperties() {
return Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
earliest ? "earliest" : "latest");
}
};
}
};
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
binder.setApplicationContext(context);
MessageChannel channel = new DirectChannel();
KafkaConsumerProperties extension = new KafkaConsumerProperties();
extension.setResetOffsets(true);
extension.setAutoRebalanceEnabled(groupManage);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<KafkaConsumerProperties>(
extension);
consumerProperties.setInstanceCount(1);
binder.bindConsumer("foo", "bar", channel, consumerProperties);
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
if (groupManage) {
if (earliest) {
verify(consumer).seekToBeginning(partitions);
}
else {
verify(consumer).seekToEnd(partitions);
}
}
else {
if (earliest) {
verify(consumer).seek(partitions.get(0), 0L);
verify(consumer).seek(partitions.get(1), 0L);
}
else {
verify(consumer).seek(partitions.get(0), Long.MAX_VALUE);
verify(consumer).seek(partitions.get(1), Long.MAX_VALUE);
}
}
}
}

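The four variants above pin down what resetOffsets=true means for the two assignment strategies: under group management the consumer is rewound with seekToBeginning/seekToEnd for the whole assignment once partitions are assigned, while with manual partition assignment each partition gets an absolute seek to 0 (earliest) or Long.MAX_VALUE (latest). A minimal sketch of that branch against the plain kafka-clients Consumer API (an illustration of the behaviour the verifications expect, not the binder's actual code):

import java.util.Collection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

final class ResetOffsetsSketch {

    static void resetOffsets(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
            boolean earliest, boolean groupManaged) {
        if (groupManaged) {
            // Called from onPartitionsAssigned(): rewind the whole assignment at once.
            if (earliest) {
                consumer.seekToBeginning(partitions);
            }
            else {
                consumer.seekToEnd(partitions);
            }
        }
        else {
            // Manual assignment: absolute seeks, 0 for the beginning of the log,
            // Long.MAX_VALUE to force a reset to the end.
            for (TopicPartition partition : partitions) {
                consumer.seek(partition, earliest ? 0L : Long.MAX_VALUE);
            }
        }
    }
}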

@@ -0,0 +1,111 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.integration;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.FilteredClassLoader;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Artem Bilan
* @author Oleg Zhurakousky
* @author Jon Schneider
*
* @since 2.0
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE,
properties = "spring.cloud.stream.bindings.input.group=" + KafkaBinderActuatorTests.TEST_CONSUMER_GROUP)
public class KafkaBinderActuatorTests {
static final String TEST_CONSUMER_GROUP = "testGroup";
private static final String KAFKA_BROKERS_PROPERTY = "spring.kafka.bootstrap-servers";
@ClassRule
public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true);
@BeforeClass
public static void setup() {
System.setProperty(KAFKA_BROKERS_PROPERTY, kafkaEmbedded.getBrokersAsString());
}
@AfterClass
public static void clean() {
System.clearProperty(KAFKA_BROKERS_PROPERTY);
}
@Autowired
private MeterRegistry meterRegistry;
@Autowired
private KafkaTemplate<?, byte[]> kafkaTemplate;
@Test
public void testKafkaBinderMetricsExposed() {
this.kafkaTemplate.send(Sink.INPUT, null, "foo".getBytes());
this.kafkaTemplate.flush();
assertThat(this.meterRegistry.get("spring.cloud.stream.binder.kafka.offset")
.tag("group", TEST_CONSUMER_GROUP)
.tag("topic", Sink.INPUT)
.timeGauge().value()).isGreaterThan(0);
}
@Test
public void testKafkaBinderMetricsWhenNoMicrometer() {
new ApplicationContextRunner()
.withUserConfiguration(KafkaMetricsTestConfig.class)
.withClassLoader(new FilteredClassLoader("io.micrometer.core"))
.run(context -> {
assertThat(context.getBeanNamesForType(MeterRegistry.class)).isEmpty();
assertThat(context.getBeanNamesForType(MeterBinder.class)).isEmpty();
});
}
@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class KafkaMetricsTestConfig {
@StreamListener(Sink.INPUT)
public void process(String payload) throws InterruptedException {
// Artificial slow listener to emulate consumer lag
Thread.sleep(1000);
}
}
}
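testKafkaBinderMetricsWhenNoMicrometer removes io.micrometer.core from the classpath with a FilteredClassLoader and expects the context to contain no MeterRegistry and no MeterBinder beans. A minimal sketch of a configuration that satisfies that expectation by guarding the metrics bean on both the Micrometer class and a registry bean (bean and class names other than Micrometer's are assumptions, not the binder's actual configuration):

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnClass(name = "io.micrometer.core.instrument.MeterRegistry")
class BinderMetricsSketchConfiguration {

    // Spring Boot applies every MeterBinder bean to the available registries, so
    // this bean is skipped entirely when Micrometer is filtered from the classpath.
    @Bean
    @ConditionalOnBean(MeterRegistry.class)
    MeterBinder kafkaBinderMetricsSketch() {
        return registry -> { /* register per-group/topic lag gauges here */ };
    }
}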