Compare commits
24 Commits
v2.0.0.RC1
...
v2.0.0.RC3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b8267ea81e | ||
|
|
2b595b004f | ||
|
|
de45edc962 | ||
|
|
2406fe5237 | ||
|
|
b5a0013e1e | ||
|
|
c814ad5595 | ||
|
|
def2c3d0ed | ||
|
|
10a44d1e44 | ||
|
|
0de078ca48 | ||
|
|
8035e25359 | ||
|
|
bcf15ed3be | ||
|
|
d37ef750ad | ||
|
|
d8baca5a66 | ||
|
|
b9d7f1f537 | ||
|
|
ad819ece92 | ||
|
|
e254968eaf | ||
|
|
25a64e1d85 | ||
|
|
353c89ab63 | ||
|
|
227d8f39f6 | ||
|
|
555d3ccbd8 | ||
|
|
d7c5c9c13b | ||
|
|
e23af0ccc1 | ||
|
|
3fd93e7de8 | ||
|
|
90a778da6a |
10
pom.xml
10
pom.xml
@@ -2,20 +2,20 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.0.0.RC1</version>
|
||||
<version>2.0.0.RC3</version>
|
||||
<packaging>pom</packaging>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-build</artifactId>
|
||||
<version>2.0.0.RC1</version>
|
||||
<version>2.0.0.RC2</version>
|
||||
<relativePath />
|
||||
</parent>
|
||||
<properties>
|
||||
<java.version>1.8</java.version>
|
||||
<spring-kafka.version>2.1.3.RELEASE</spring-kafka.version>
|
||||
<spring-integration-kafka.version>3.0.2.RELEASE</spring-integration-kafka.version>
|
||||
<spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
|
||||
<spring-integration-kafka.version>3.0.3.RELEASE</spring-integration-kafka.version>
|
||||
<kafka.version>1.0.0</kafka.version>
|
||||
<spring-cloud-stream.version>2.0.0.RC1</spring-cloud-stream.version>
|
||||
<spring-cloud-stream.version>2.0.0.RC3</spring-cloud-stream.version>
|
||||
</properties>
|
||||
<modules>
|
||||
<module>spring-cloud-stream-binder-kafka</module>
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.0.0.RC1</version>
|
||||
<version>2.0.0.RC3</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<description>Spring Cloud Starter Stream Kafka</description>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.0.0.RC1</version>
|
||||
<version>2.0.0.RC3</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder Core</description>
|
||||
|
||||
@@ -0,0 +1,62 @@
|
||||
/*
|
||||
* Copyright 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.properties;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Properties for configuring topics.
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @since 2.0
|
||||
*
|
||||
*/
|
||||
public class KafkaAdminProperties {
|
||||
|
||||
private Short replicationFactor;
|
||||
|
||||
private Map<Integer, List<Integer>> replicasAssignments = new HashMap<>();
|
||||
|
||||
private Map<String, String> configuration = new HashMap<>();
|
||||
|
||||
public Short getReplicationFactor() {
|
||||
return this.replicationFactor;
|
||||
}
|
||||
|
||||
public void setReplicationFactor(Short replicationFactor) {
|
||||
this.replicationFactor = replicationFactor;
|
||||
}
|
||||
|
||||
public Map<Integer, List<Integer>> getReplicasAssignments() {
|
||||
return this.replicasAssignments;
|
||||
}
|
||||
|
||||
public void setReplicasAssignments(Map<Integer, List<Integer>> replicasAssignments) {
|
||||
this.replicasAssignments = replicasAssignments;
|
||||
}
|
||||
|
||||
public Map<String, String> getConfiguration() {
|
||||
return this.configuration;
|
||||
}
|
||||
|
||||
public void setConfiguration(Map<String, String> configuration) {
|
||||
this.configuration = configuration;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2015-2017 the original author or authors.
|
||||
* Copyright 2015-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.boot.context.properties.DeprecatedConfigurationProperty;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
@@ -45,7 +46,7 @@ public class KafkaBinderConfigurationProperties {
|
||||
|
||||
private final Transaction transaction = new Transaction();
|
||||
|
||||
@Autowired(required = false)
|
||||
@Autowired
|
||||
private KafkaProperties kafkaProperties;
|
||||
|
||||
private String[] zkNodes = new String[] { "localhost" };
|
||||
@@ -86,7 +87,7 @@ public class KafkaBinderConfigurationProperties {
|
||||
|
||||
private String requiredAcks = "1";
|
||||
|
||||
private int replicationFactor = 1;
|
||||
private short replicationFactor = 1;
|
||||
|
||||
private int fetchSize = 1024 * 1024;
|
||||
|
||||
@@ -110,6 +111,13 @@ public class KafkaBinderConfigurationProperties {
|
||||
return this.transaction;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @return the connection String
|
||||
* @deprecated connection to zookeeper is no longer necessary
|
||||
*/
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
@Deprecated
|
||||
public String getZkConnectionString() {
|
||||
return toConnectionString(this.zkNodes, this.defaultZkPort);
|
||||
}
|
||||
@@ -126,26 +134,68 @@ public class KafkaBinderConfigurationProperties {
|
||||
return this.headers;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @return the window.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public int getOffsetUpdateTimeWindow() {
|
||||
return this.offsetUpdateTimeWindow;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @return the count.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public int getOffsetUpdateCount() {
|
||||
return this.offsetUpdateCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @return the timeout.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public int getOffsetUpdateShutdownTimeout() {
|
||||
return this.offsetUpdateShutdownTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Zookeeper nodes.
|
||||
* @return the nodes.
|
||||
* @deprecated connection to zookeeper is no longer necessary
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
|
||||
public String[] getZkNodes() {
|
||||
return this.zkNodes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Zookeeper nodes.
|
||||
* @param zkNodes the nodes.
|
||||
* @deprecated connection to zookeeper is no longer necessary
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
|
||||
public void setZkNodes(String... zkNodes) {
|
||||
this.zkNodes = zkNodes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Zookeeper port.
|
||||
* @param defaultZkPort the port.
|
||||
* @deprecated connection to zookeeper is no longer necessary
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
|
||||
public void setDefaultZkPort(String defaultZkPort) {
|
||||
this.defaultZkPort = defaultZkPort;
|
||||
}
|
||||
@@ -166,30 +216,79 @@ public class KafkaBinderConfigurationProperties {
|
||||
this.headers = headers;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @param offsetUpdateTimeWindow the window.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public void setOffsetUpdateTimeWindow(int offsetUpdateTimeWindow) {
|
||||
this.offsetUpdateTimeWindow = offsetUpdateTimeWindow;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @param offsetUpdateCount the count.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public void setOffsetUpdateCount(int offsetUpdateCount) {
|
||||
this.offsetUpdateCount = offsetUpdateCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @param offsetUpdateShutdownTimeout the timeout.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public void setOffsetUpdateShutdownTimeout(int offsetUpdateShutdownTimeout) {
|
||||
this.offsetUpdateShutdownTimeout = offsetUpdateShutdownTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Zookeeper session timeout.
|
||||
* @return the timeout.
|
||||
* @deprecated connection to zookeeper is no longer necessary
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
|
||||
public int getZkSessionTimeout() {
|
||||
return this.zkSessionTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Zookeeper session timeout.
|
||||
* @param zkSessionTimeout the timout
|
||||
* @deprecated connection to zookeeper is no longer necessary
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
|
||||
public void setZkSessionTimeout(int zkSessionTimeout) {
|
||||
this.zkSessionTimeout = zkSessionTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Zookeeper connection timeout.
|
||||
* @return the timout.
|
||||
* @deprecated connection to zookeeper is no longer necessary
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
|
||||
public int getZkConnectionTimeout() {
|
||||
return this.zkConnectionTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Zookeeper connection timeout.
|
||||
* @param zkConnectionTimeout the timeout.
|
||||
* @deprecated connection to zookeeper is no longer necessary
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
|
||||
public void setZkConnectionTimeout(int zkConnectionTimeout) {
|
||||
this.zkConnectionTimeout = zkConnectionTimeout;
|
||||
}
|
||||
@@ -212,10 +311,24 @@ public class KafkaBinderConfigurationProperties {
|
||||
return StringUtils.arrayToCommaDelimitedString(fullyFormattedHosts);
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @return the wait.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public int getMaxWait() {
|
||||
return this.maxWait;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer user.
|
||||
* @param maxWait the wait.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public void setMaxWait(int maxWait) {
|
||||
this.maxWait = maxWait;
|
||||
}
|
||||
@@ -232,18 +345,32 @@ public class KafkaBinderConfigurationProperties {
|
||||
this.requiredAcks = requiredAcks;
|
||||
}
|
||||
|
||||
public int getReplicationFactor() {
|
||||
public short getReplicationFactor() {
|
||||
return this.replicationFactor;
|
||||
}
|
||||
|
||||
public void setReplicationFactor(int replicationFactor) {
|
||||
public void setReplicationFactor(short replicationFactor) {
|
||||
this.replicationFactor = replicationFactor;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @return the size.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public int getFetchSize() {
|
||||
return this.fetchSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @param fetchSize the size.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public void setFetchSize(int fetchSize) {
|
||||
this.fetchSize = fetchSize;
|
||||
}
|
||||
@@ -264,10 +391,24 @@ public class KafkaBinderConfigurationProperties {
|
||||
this.healthTimeout = healthTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @return the queue size.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public int getQueueSize() {
|
||||
return this.queueSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @param queueSize the queue size.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public void setQueueSize(int queueSize) {
|
||||
this.queueSize = queueSize;
|
||||
}
|
||||
@@ -288,10 +429,26 @@ public class KafkaBinderConfigurationProperties {
|
||||
this.autoAddPartitions = autoAddPartitions;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used; set properties such as this via {@link #getConfiguration()
|
||||
* configuration}.
|
||||
* @return the size.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0, set properties such as this via 'configuration'")
|
||||
public int getSocketBufferSize() {
|
||||
return this.socketBufferSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used; set properties such as this via {@link #getConfiguration()
|
||||
* configuration}.
|
||||
* @param socketBufferSize the size.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0, set properties such as this via 'configuration'")
|
||||
public void setSocketBufferSize(int socketBufferSize) {
|
||||
this.socketBufferSize = socketBufferSize;
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2016-2017 the original author or authors.
|
||||
* Copyright 2016-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -53,6 +53,8 @@ public class KafkaConsumerProperties {
|
||||
both
|
||||
}
|
||||
|
||||
private boolean ackEachRecord;
|
||||
|
||||
private boolean autoRebalanceEnabled = true;
|
||||
|
||||
private boolean autoCommitOffset = true;
|
||||
@@ -61,6 +63,8 @@ public class KafkaConsumerProperties {
|
||||
|
||||
private StartOffset startOffset;
|
||||
|
||||
private boolean resetOffsets;
|
||||
|
||||
private boolean enableDlq;
|
||||
|
||||
private String dlqName;
|
||||
@@ -79,6 +83,16 @@ public class KafkaConsumerProperties {
|
||||
|
||||
private Map<String, String> configuration = new HashMap<>();
|
||||
|
||||
private KafkaAdminProperties admin = new KafkaAdminProperties();
|
||||
|
||||
public boolean isAckEachRecord() {
|
||||
return this.ackEachRecord;
|
||||
}
|
||||
|
||||
public void setAckEachRecord(boolean ackEachRecord) {
|
||||
this.ackEachRecord = ackEachRecord;
|
||||
}
|
||||
|
||||
public boolean isAutoCommitOffset() {
|
||||
return this.autoCommitOffset;
|
||||
}
|
||||
@@ -95,6 +109,14 @@ public class KafkaConsumerProperties {
|
||||
this.startOffset = startOffset;
|
||||
}
|
||||
|
||||
public boolean isResetOffsets() {
|
||||
return this.resetOffsets;
|
||||
}
|
||||
|
||||
public void setResetOffsets(boolean resetOffsets) {
|
||||
this.resetOffsets = resetOffsets;
|
||||
}
|
||||
|
||||
public boolean isEnableDlq() {
|
||||
return this.enableDlq;
|
||||
}
|
||||
@@ -111,10 +133,22 @@ public class KafkaConsumerProperties {
|
||||
this.autoCommitOnError = autoCommitOnError;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @return the interval.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
public int getRecoveryInterval() {
|
||||
return this.recoveryInterval;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @param recoveryInterval the interval.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
public void setRecoveryInterval(int recoveryInterval) {
|
||||
this.recoveryInterval = recoveryInterval;
|
||||
}
|
||||
@@ -182,4 +216,12 @@ public class KafkaConsumerProperties {
|
||||
this.idleEventInterval = idleEventInterval;
|
||||
}
|
||||
|
||||
public KafkaAdminProperties getAdmin() {
|
||||
return this.admin;
|
||||
}
|
||||
|
||||
public void setAdmin(KafkaAdminProperties admin) {
|
||||
this.admin = admin;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -44,6 +44,8 @@ public class KafkaProducerProperties {
|
||||
|
||||
private Map<String, String> configuration = new HashMap<>();
|
||||
|
||||
private KafkaAdminProperties admin = new KafkaAdminProperties();
|
||||
|
||||
public int getBufferSize() {
|
||||
return this.bufferSize;
|
||||
}
|
||||
@@ -101,6 +103,15 @@ public class KafkaProducerProperties {
|
||||
this.configuration = configuration;
|
||||
}
|
||||
|
||||
public KafkaAdminProperties getAdmin() {
|
||||
return this.admin;
|
||||
}
|
||||
|
||||
public void setAdmin(KafkaAdminProperties admin) {
|
||||
this.admin = admin;
|
||||
}
|
||||
|
||||
|
||||
public enum CompressionType {
|
||||
none,
|
||||
gzip,
|
||||
|
||||
@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka.provisioning;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
@@ -44,6 +45,7 @@ import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||
import org.springframework.cloud.stream.binder.BinderException;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaAdminProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
|
||||
@@ -67,6 +69,7 @@ import org.springframework.util.StringUtils;
|
||||
* @author Gary Russell
|
||||
* @author Ilayaperumal Gopinathan
|
||||
* @author Simon Flandergan
|
||||
* @author Oleg Zhurakousky
|
||||
*/
|
||||
public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsumerProperties<KafkaConsumerProperties>,
|
||||
ExtendedProducerProperties<KafkaProducerProperties>>, InitializingBean {
|
||||
@@ -77,19 +80,18 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
|
||||
private final KafkaBinderConfigurationProperties configurationProperties;
|
||||
|
||||
private final AdminClient adminClient;
|
||||
private final int operationTimeout = DEFAULT_OPERATION_TIMEOUT;
|
||||
|
||||
private final Map<String, Object> adminClientProperties;
|
||||
|
||||
private RetryOperations metadataRetryOperations;
|
||||
|
||||
private final int operationTimeout = DEFAULT_OPERATION_TIMEOUT;
|
||||
|
||||
public KafkaTopicProvisioner(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
|
||||
KafkaProperties kafkaProperties) {
|
||||
Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null");
|
||||
Map<String, Object> adminClientProperties = kafkaProperties.buildAdminProperties();
|
||||
this.adminClientProperties = kafkaProperties.buildAdminProperties();
|
||||
this.configurationProperties = kafkaBinderConfigurationProperties;
|
||||
normalalizeBootPropsWithBinder(adminClientProperties, kafkaProperties, kafkaBinderConfigurationProperties);
|
||||
this.adminClient = AdminClient.create(adminClientProperties);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -118,33 +120,38 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProducerDestination provisionProducerDestination(final String name, ExtendedProducerProperties<KafkaProducerProperties> properties) {
|
||||
public ProducerDestination provisionProducerDestination(final String name,
|
||||
ExtendedProducerProperties<KafkaProducerProperties> properties) {
|
||||
|
||||
if (this.logger.isInfoEnabled()) {
|
||||
this.logger.info("Using kafka topic for outbound: " + name);
|
||||
}
|
||||
KafkaTopicUtils.validateTopicName(name);
|
||||
createTopic(name, properties.getPartitionCount(), false);
|
||||
if (this.configurationProperties.isAutoCreateTopics() && adminClient != null) {
|
||||
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(name));
|
||||
KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.all();
|
||||
try (AdminClient adminClient = AdminClient.create(this.adminClientProperties)) {
|
||||
createTopic(adminClient, name, properties.getPartitionCount(), false, properties.getExtension().getAdmin());
|
||||
int partitions = 0;
|
||||
if (this.configurationProperties.isAutoCreateTopics()) {
|
||||
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(name));
|
||||
KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.all();
|
||||
|
||||
try {
|
||||
Map<String, TopicDescription> topicDescriptions = all.get(operationTimeout, TimeUnit.SECONDS);
|
||||
Map<String, TopicDescription> topicDescriptions = null;
|
||||
try {
|
||||
topicDescriptions = all.get(this.operationTimeout, TimeUnit.SECONDS);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new ProvisioningException("Problems encountered with partitions finding", e);
|
||||
}
|
||||
TopicDescription topicDescription = topicDescriptions.get(name);
|
||||
int partitions = topicDescription.partitions().size();
|
||||
return new KafkaProducerDestination(name, partitions);
|
||||
partitions = topicDescription.partitions().size();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new ProvisioningException("Problems encountered with partitions finding", e);
|
||||
}
|
||||
}
|
||||
else {
|
||||
return new KafkaProducerDestination(name);
|
||||
return new KafkaProducerDestination(name, partitions);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConsumerDestination provisionConsumerDestination(final String name, final String group, ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
|
||||
public ConsumerDestination provisionConsumerDestination(final String name, final String group,
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
|
||||
|
||||
KafkaTopicUtils.validateTopicName(name);
|
||||
boolean anonymous = !StringUtils.hasText(group);
|
||||
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
|
||||
@@ -153,25 +160,32 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
throw new IllegalArgumentException("Instance count cannot be zero");
|
||||
}
|
||||
int partitionCount = properties.getInstanceCount() * properties.getConcurrency();
|
||||
createTopic(name, partitionCount, properties.getExtension().isAutoRebalanceEnabled());
|
||||
if (this.configurationProperties.isAutoCreateTopics() && adminClient != null) {
|
||||
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(name));
|
||||
KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.all();
|
||||
try {
|
||||
Map<String, TopicDescription> topicDescriptions = all.get(operationTimeout, TimeUnit.SECONDS);
|
||||
TopicDescription topicDescription = topicDescriptions.get(name);
|
||||
int partitions = topicDescription.partitions().size();
|
||||
ConsumerDestination dlqTopic = createDlqIfNeedBe(name, group, properties, anonymous, partitions);
|
||||
if (dlqTopic != null) {
|
||||
return dlqTopic;
|
||||
ConsumerDestination consumerDestination = new KafkaConsumerDestination(name);
|
||||
try (AdminClient adminClient = createAdminClient()) {
|
||||
createTopic(adminClient, name, partitionCount, properties.getExtension().isAutoRebalanceEnabled(),
|
||||
properties.getExtension().getAdmin());
|
||||
if (this.configurationProperties.isAutoCreateTopics()) {
|
||||
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(name));
|
||||
KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.all();
|
||||
try {
|
||||
Map<String, TopicDescription> topicDescriptions = all.get(operationTimeout, TimeUnit.SECONDS);
|
||||
TopicDescription topicDescription = topicDescriptions.get(name);
|
||||
int partitions = topicDescription.partitions().size();
|
||||
consumerDestination = createDlqIfNeedBe(adminClient, name, group, properties, anonymous, partitions);
|
||||
if (consumerDestination == null) {
|
||||
consumerDestination = new KafkaConsumerDestination(name, partitions);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new ProvisioningException("provisioning exception", e);
|
||||
}
|
||||
return new KafkaConsumerDestination(name, partitions);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new ProvisioningException("provisioning exception", e);
|
||||
}
|
||||
}
|
||||
return new KafkaConsumerDestination(name);
|
||||
return consumerDestination;
|
||||
}
|
||||
|
||||
AdminClient createAdminClient() {
|
||||
return AdminClient.create(this.adminClientProperties);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -209,14 +223,15 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
});
|
||||
}
|
||||
|
||||
private ConsumerDestination createDlqIfNeedBe(String name, String group,
|
||||
private ConsumerDestination createDlqIfNeedBe(AdminClient adminClient, String name, String group,
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> properties,
|
||||
boolean anonymous, int partitions) {
|
||||
if (properties.getExtension().isEnableDlq() && !anonymous) {
|
||||
String dlqTopic = StringUtils.hasText(properties.getExtension().getDlqName()) ?
|
||||
properties.getExtension().getDlqName() : "error." + name + "." + group;
|
||||
try {
|
||||
createTopicAndPartitions(dlqTopic, partitions, properties.getExtension().isAutoRebalanceEnabled());
|
||||
createTopicAndPartitions(adminClient, dlqTopic, partitions,
|
||||
properties.getExtension().isAutoRebalanceEnabled(), properties.getExtension().getAdmin());
|
||||
}
|
||||
catch (Throwable throwable) {
|
||||
if (throwable instanceof Error) {
|
||||
@@ -231,9 +246,10 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
return null;
|
||||
}
|
||||
|
||||
private void createTopic(String name, int partitionCount, boolean tolerateLowerPartitionsOnBroker) {
|
||||
private void createTopic(AdminClient adminClient, String name, int partitionCount, boolean tolerateLowerPartitionsOnBroker,
|
||||
KafkaAdminProperties properties) {
|
||||
try {
|
||||
createTopicIfNecessary(name, partitionCount, tolerateLowerPartitionsOnBroker);
|
||||
createTopicIfNecessary(adminClient, name, partitionCount, tolerateLowerPartitionsOnBroker, properties);
|
||||
}
|
||||
catch (Throwable throwable) {
|
||||
if (throwable instanceof Error) {
|
||||
@@ -245,16 +261,14 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
}
|
||||
}
|
||||
|
||||
private void createTopicIfNecessary(final String topicName, final int partitionCount,
|
||||
boolean tolerateLowerPartitionsOnBroker) throws Throwable {
|
||||
if (this.configurationProperties.isAutoCreateTopics() && adminClient != null) {
|
||||
createTopicAndPartitions(topicName, partitionCount, tolerateLowerPartitionsOnBroker);
|
||||
private void createTopicIfNecessary(AdminClient adminClient, final String topicName, final int partitionCount,
|
||||
boolean tolerateLowerPartitionsOnBroker, KafkaAdminProperties properties) throws Throwable {
|
||||
|
||||
if (this.configurationProperties.isAutoCreateTopics()) {
|
||||
createTopicAndPartitions(adminClient, topicName, partitionCount, tolerateLowerPartitionsOnBroker,
|
||||
properties);
|
||||
}
|
||||
else if (this.configurationProperties.isAutoCreateTopics() && adminClient == null) {
|
||||
this.logger.warn("Auto creation of topics is enabled, but Kafka AdminUtils class is not present on the classpath. " +
|
||||
"No topic will be created by the binder");
|
||||
}
|
||||
else if (!this.configurationProperties.isAutoCreateTopics()) {
|
||||
else {
|
||||
this.logger.info("Auto creation of topics is disabled.");
|
||||
}
|
||||
}
|
||||
@@ -262,9 +276,12 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
/**
|
||||
* Creates a Kafka topic if needed, or try to increase its partition count to the
|
||||
* desired number.
|
||||
* @param adminClient
|
||||
* @param adminProperties
|
||||
*/
|
||||
private void createTopicAndPartitions(final String topicName, final int partitionCount,
|
||||
boolean tolerateLowerPartitionsOnBroker) throws Throwable {
|
||||
private void createTopicAndPartitions(AdminClient adminClient, final String topicName, final int partitionCount,
|
||||
boolean tolerateLowerPartitionsOnBroker, KafkaAdminProperties adminProperties) throws Throwable {
|
||||
|
||||
ListTopicsResult listTopicsResult = adminClient.listTopics();
|
||||
KafkaFuture<Set<String>> namesFutures = listTopicsResult.names();
|
||||
|
||||
@@ -298,14 +315,26 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (!names.contains(topicName)) {
|
||||
else {
|
||||
// always consider minPartitionCount for topic creation
|
||||
final int effectivePartitionCount = Math.max(this.configurationProperties.getMinPartitionCount(),
|
||||
partitionCount);
|
||||
this.metadataRetryOperations.execute(context -> {
|
||||
|
||||
NewTopic newTopic = new NewTopic(topicName, effectivePartitionCount,
|
||||
(short) configurationProperties.getReplicationFactor());
|
||||
NewTopic newTopic;
|
||||
Map<Integer, List<Integer>> replicasAssignments = adminProperties.getReplicasAssignments();
|
||||
if (replicasAssignments != null && replicasAssignments.size() > 0) {
|
||||
newTopic = new NewTopic(topicName, adminProperties.getReplicasAssignments());
|
||||
}
|
||||
else {
|
||||
newTopic = new NewTopic(topicName, effectivePartitionCount,
|
||||
adminProperties.getReplicationFactor() != null
|
||||
? adminProperties.getReplicationFactor()
|
||||
: configurationProperties.getReplicationFactor());
|
||||
}
|
||||
if (adminProperties.getConfiguration().size() > 0) {
|
||||
newTopic.configs(adminProperties.getConfiguration());
|
||||
}
|
||||
CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));
|
||||
try {
|
||||
createTopicsResult.all().get(operationTimeout, TimeUnit.SECONDS);
|
||||
@@ -318,6 +347,10 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
logger.warn("Attempt to create topic: " + topicName + ". Topic already exists.");
|
||||
}
|
||||
}
|
||||
else {
|
||||
logger.error("Failed to create topics", e.getCause());
|
||||
throw e.getCause();
|
||||
}
|
||||
}
|
||||
else {
|
||||
logger.error("Failed to create topics", e.getCause());
|
||||
@@ -365,10 +398,6 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
|
||||
private final int partitions;
|
||||
|
||||
KafkaProducerDestination(String destinationName) {
|
||||
this(destinationName, 0);
|
||||
}
|
||||
|
||||
KafkaProducerDestination(String destinationName, Integer partitions) {
|
||||
this.producerDestinationName = destinationName;
|
||||
this.partitions = partitions;
|
||||
@@ -420,10 +449,6 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
return this.consumerDestinationName;
|
||||
}
|
||||
|
||||
public String getDlqName() {
|
||||
return dlqName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "KafkaConsumerDestination{" +
|
||||
|
||||
@@ -55,10 +55,11 @@ public class KafkaTopicProvisionerTests {
|
||||
binderConfig.getConfiguration().put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ts.getFile().getAbsolutePath());
|
||||
binderConfig.setBrokers("localhost:9092");
|
||||
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig, bootConfig);
|
||||
AdminClient adminClient = KafkaTestUtils.getPropertyValue(provisioner, "adminClient", AdminClient.class);
|
||||
AdminClient adminClient = provisioner.createAdminClient();
|
||||
assertThat(KafkaTestUtils.getPropertyValue(adminClient, "client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class);
|
||||
Map configs = KafkaTestUtils.getPropertyValue(adminClient, "client.selector.channelBuilder.configs", Map.class);
|
||||
assertThat(((List) configs.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)).get(0)).isEqualTo("localhost:1234");
|
||||
adminClient.close();
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@@ -73,13 +74,13 @@ public class KafkaTopicProvisionerTests {
|
||||
binderConfig.getConfiguration().put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ts.getFile().getAbsolutePath());
|
||||
binderConfig.setBrokers("localhost:1234");
|
||||
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig, bootConfig);
|
||||
AdminClient adminClient = KafkaTestUtils.getPropertyValue(provisioner, "adminClient", AdminClient.class);
|
||||
AdminClient adminClient = provisioner.createAdminClient();
|
||||
assertThat(KafkaTestUtils.getPropertyValue(adminClient, "client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class);
|
||||
Map configs = KafkaTestUtils.getPropertyValue(adminClient, "client.selector.channelBuilder.configs", Map.class);
|
||||
assertThat(((List) configs.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)).get(0)).isEqualTo("localhost:1234");
|
||||
adminClient.close();
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Test
|
||||
public void brokersInvalid() throws Exception {
|
||||
KafkaProperties bootConfig = new KafkaProperties();
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.0.0.RC1</version>
|
||||
<version>2.0.0.RC3</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>
|
||||
|
||||
@@ -1,13 +1,7 @@
|
||||
== Kafka Streams Binding Capabilities of Spring Cloud Stream
|
||||
== Usage
|
||||
|
||||
Spring Cloud Stream Kafka support also includes a binder specifically designed for Apache Kafka Streams binding.
|
||||
Using this binder, applications can be written that leverage the Apache Kafka Streams API.
|
||||
For more information on Kafka Streams, see https://kafka.apache.org/documentation/streams/developer-guide[Kafka Streams API Developer Manual]
|
||||
|
||||
Kafka Streams support in Spring Cloud Stream is based on the foundations provided by the Spring Kafka project.
|
||||
For details on that support, see http://docs.spring.io/spring-kafka/reference/html/_reference.html#kafka-streams[Kafaka Streams Support in Spring Kafka].
|
||||
|
||||
Here are the maven coordinates for the Spring Cloud Stream Kafka Streams binder artifact.
|
||||
For using the Kafka Streams binder, you just need to add it to your Spring Cloud Stream application, using the following
|
||||
Maven coordinates:
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
@@ -17,13 +11,29 @@ Here are the maven coordinates for the Spring Cloud Stream Kafka Streams binder
|
||||
</dependency>
|
||||
----
|
||||
|
||||
High level streams DSL provided through the Kafka Streams API can be used through Spring Cloud Stream support.
|
||||
Some minimal support for writing applications using the processor API is also available through the binder.
|
||||
Kafka Streams applications using the Spring Cloud Stream support can be written using the processor model, i.e. messages read from an inbound topic and messages written to an outbound topic or using the sink style where it does not have an output binding.
|
||||
== Kafka Streams Binder Overview
|
||||
|
||||
=== Usage example of high level streams DSL
|
||||
Spring Cloud Stream's Apache Kafka support also includes a binder implementation designed explicitly for Apache Kafka
|
||||
Streams binding. With this native integration, a Spring Cloud Stream "processor" application can directly use the
|
||||
https://kafka.apache.org/documentation/streams/developer-guide[Apache Kafka Streams] APIs in the core business logic.
|
||||
|
||||
This application will listen from a Kafka topic and write the word count for each unique word that it sees in a 5 seconds time window.
|
||||
Kafka Streams binder implementation builds on the foundation provided by the http://docs.spring.io/spring-kafka/reference/html/_reference.html#kafka-streams[Kafka Streams in Spring Kafka]
|
||||
project.
|
||||
|
||||
As part of this native integration, the high-level https://docs.confluent.io/current/streams/developer-guide/dsl-api.html[Streams DSL]
|
||||
provided by the Kafka Streams API is available for use in the business logic, too.
|
||||
|
||||
An early version of the https://docs.confluent.io/current/streams/developer-guide/processor-api.html[Processor API]
|
||||
support is available as well.
|
||||
|
||||
As noted early-on, Kafka Streams support in Spring Cloud Stream strictly only available for use in the Processor model.
|
||||
A model in which the messages read from an inbound topic, business processing can be applied, and the transformed messages
|
||||
can be written to an outbound topic. It can also be used in Processor applications with a no-outbound destination.
|
||||
|
||||
=== Streams DSL
|
||||
|
||||
This application consumes data from a Kafka topic (e.g., `words`), computes word count for each unique word in a 5 seconds
|
||||
time window, and the computed results are sent to a downstream topic (e.g., `counts`) for further processing.
|
||||
|
||||
[source]
|
||||
----
|
||||
@@ -48,25 +58,130 @@ public class WordCountProcessorApplication {
|
||||
}
|
||||
----
|
||||
|
||||
If you build it as a Spring Boot uber jar, you can run the above example in the following way:
|
||||
Once built as a uber-jar (e.g., `wordcount-processor.jar`), you can run the above example like the following.
|
||||
|
||||
[source]
|
||||
----
|
||||
java -jar uber.jar --spring.cloud.stream.bindings.input.destination=words --spring.cloud.stream.bindings.output.destination=counts
|
||||
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.input.destination=words --spring.cloud.stream.bindings.output.destination=counts
|
||||
----
|
||||
|
||||
This means that the application will listen from the incoming Kafka topic `words` and write to the output topic `counts`.
|
||||
This application will consume messages from the Kafka topic `words` and the computed results are published to an output
|
||||
topic `counts`.
|
||||
|
||||
Spring Cloud Stream will ensure that the messages from both the incoming and outgoing topics are bound as KStream objects.
|
||||
Applications can exclusively focus on the business aspects of the code, i.e. writing the logic required in the processor rather than setting up the streams specific configuration required by the Kafka Streams infrastructure.
|
||||
All such infrastructure details are handled by the framework.
|
||||
Spring Cloud Stream will ensure that the messages from both the incoming and outgoing topics are automatically bound as
|
||||
KStream objects. As a developer, you can exclusively focus on the business aspects of the code, i.e. writing the logic
|
||||
required in the processor. Setting up the Streams DSL specific configuration required by the Kafka Streams infrastructure
|
||||
is automatically handled by the framework.
|
||||
|
||||
=== Multiple Input bindings on the inbound
|
||||
== Configuration Options
|
||||
|
||||
Spring Cloud Stream Kafka Streams binder allows the users to write applications with multiple bindings.
|
||||
There are use cases in which you may want to have multiple incoming KStream objects or a combination of KStream and KTable objects.
|
||||
Both of these flavors are supported.
|
||||
Here are some examples.
|
||||
This section contains the configuration options used by the Kafka Streams binder.
|
||||
|
||||
For common configuration options and properties pertaining to binder, refer to the <<binding-properties,core documentation>>.
|
||||
|
||||
=== Kafka Streams Properties
|
||||
|
||||
The following properties are available at the binder level and must be prefixed with `spring.cloud.stream.kafka.binder.`
|
||||
literal.
|
||||
|
||||
configuration::
|
||||
Map with a key/value pair containing properties pertaining to Apache Kafka Streams API.
|
||||
This property must be prefixed with `spring.cloud.stream.kafka.streams.binder.`.
|
||||
Following are some examples of using this property.
|
||||
|
||||
[source]
|
||||
----
|
||||
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
|
||||
----
|
||||
|
||||
For more information about all the properties that may go into streams configuration, see StreamsConfig JavaDocs in
|
||||
Apache Kafka Streams docs.
|
||||
|
||||
brokers::
|
||||
Broker URL
|
||||
+
|
||||
Default: `localhost`
|
||||
zkNodes::
|
||||
Zookeeper URL
|
||||
+
|
||||
Default: `localhost`
|
||||
serdeError::
|
||||
Deserialization error handler type.
|
||||
Possible values are - `logAndContinue`, `logAndFail` or `sendToDlq`
|
||||
+
|
||||
Default: `logAndFail`
|
||||
applicationId::
|
||||
Application ID for all the stream configurations in the current application context.
|
||||
You can override the application id for an individual `StreamListener` method using the `group` property on the binding.
|
||||
You have to ensure that you are using the same group name for all input bindings in the case of multiple inputs on the same methods.
|
||||
+
|
||||
Default: `default`
|
||||
|
||||
The following properties are _only_ available for Kafka Streams producers and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.`
|
||||
literal.
|
||||
|
||||
keySerde::
|
||||
key serde to use
|
||||
+
|
||||
Default: `none`.
|
||||
valueSerde::
|
||||
value serde to use
|
||||
+
|
||||
Default: `none`.
|
||||
useNativeEncoding::
|
||||
flag to enable native encoding
|
||||
+
|
||||
Default: `false`.
|
||||
|
||||
The following properties are _only_ available for Kafka Streams consumers and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.consumer.`
|
||||
literal.
|
||||
|
||||
keySerde::
|
||||
key serde to use
|
||||
+
|
||||
Default: `none`.
|
||||
valueSerde::
|
||||
value serde to use
|
||||
+
|
||||
Default: `none`.
|
||||
materializedAs::
|
||||
state store to materialize when using incoming KTable types
|
||||
+
|
||||
Default: `none`.
|
||||
useNativeDecoding::
|
||||
flag to enable native decoding
|
||||
+
|
||||
Default: `false`.
|
||||
dlqName::
|
||||
DLQ topic name.
|
||||
+
|
||||
Default: `none`.
|
||||
|
||||
=== TimeWindow properties:
|
||||
|
||||
Windowing is an important concept in stream processing applications. Following properties are available to configure
|
||||
time-window computations.
|
||||
|
||||
spring.cloud.stream.kafka.streams.timeWindow.length::
|
||||
When this property is given, you can autowire a `TimeWindows` bean into the application.
|
||||
The value is expressed in milliseconds.
|
||||
+
|
||||
Default: `none`.
|
||||
spring.cloud.stream.kstream.timeWindow.advanceBy::
|
||||
Value is given in milliseconds.
|
||||
+
|
||||
Default: `none`.
|
||||
|
||||
== Multiple Input Bindings
|
||||
|
||||
For use cases that requires multiple incoming KStream objects or a combination of KStream and KTable objects, the Kafka
|
||||
Streams binder provides multiple bindings support.
|
||||
|
||||
Let's see it in action.
|
||||
|
||||
=== Multiple Input Bindings as a Sink
|
||||
|
||||
[source]
|
||||
----
|
||||
@@ -91,17 +206,19 @@ interface KStreamKTableBinding {
|
||||
|
||||
----
|
||||
|
||||
In the above example, the application is written in a sink style, i.e. there are no output bindings and the application has to make the decision to what needs to happen.
|
||||
Most likely, when you write applications this way, you might want to send the information downstream or store them in a state store (See below for Queryable State Stores).
|
||||
In the above example, the application is written as a sink, i.e. there are no output bindings and the application has to
|
||||
decide concerning downstream processing. When you write applications in this style, you might want to send the information
|
||||
downstream or store them in a state store (See below for Queryable State Stores).
|
||||
|
||||
In the case of incoming KTable, if you want to materialize it as a state store, you have to express that through the following property.
|
||||
In the case of incoming KTable, if you want to materialize the computations to a state store, you have to express it
|
||||
through the following property.
|
||||
|
||||
[source]
|
||||
----
|
||||
spring.cloud.stream.kafka.streams.bindings.inputTable.consumer.materializedAs: all-songs
|
||||
----
|
||||
|
||||
Here is an example for multiple input bindings and an output binding (processor style).
|
||||
=== Multiple Input Bindings as a Processor
|
||||
|
||||
[source]
|
||||
----
|
||||
@@ -125,15 +242,16 @@ interface KStreamKTableBinding extends KafkaStreamsProcessor {
|
||||
|
||||
----
|
||||
|
||||
=== Support for branching in Kafka Streams API
|
||||
== Multiple Output Bindings (aka Branching)
|
||||
|
||||
Kafka Streams allow outbound data to be split into multiple topics based on some predicates.
|
||||
Spring Cloud Stream Kafka Streams binder provides support for this feature without losing the overall programming model exposed through `StreamListener` in the end user application.
|
||||
You write the application in the usual way as demonstrated above in the word count example.
|
||||
When using the branching feature, you are required to do a few things.
|
||||
First, you need to make sure that your return type is `KStream[]` instead of a regular `KStream`.
|
||||
Then you need to use the `SendTo` annotation containing the output bindings in the order (example below).
|
||||
For each of these output bindings, you need to configure destination, content-type etc. as required by any other standard Spring Cloud Stream application
|
||||
Kafka Streams allow outbound data to be split into multiple topics based on some predicates. The Kafka Streams binder provides
|
||||
support for this feature without compromising the programming model exposed through `StreamListener` in the end user application.
|
||||
|
||||
You can write the application in the usual way as demonstrated above in the word count example. However, when using the
|
||||
branching feature, you are required to do a few things. First, you need to make sure that your return type is `KStream[]`
|
||||
instead of a regular `KStream`. Second, you need to use the `SendTo` annotation containing the output bindings in the order
|
||||
(see example below). For each of these output bindings, you need to configure destination, content-type etc., complying with
|
||||
the standard Spring Cloud Stream expectations.
|
||||
|
||||
Here is an example:
|
||||
|
||||
@@ -181,7 +299,7 @@ public static class WordCountProcessorApplication {
|
||||
}
|
||||
----
|
||||
|
||||
Then in the properties:
|
||||
Properties:
|
||||
|
||||
[source]
|
||||
----
|
||||
@@ -210,18 +328,24 @@ spring.cloud.stream.bindings.input:
|
||||
headerMode: raw
|
||||
----
|
||||
|
||||
=== Message conversion in Spring Cloud Stream Kafka Streams applications
|
||||
== Message Conversion
|
||||
|
||||
Spring Cloud Stream Kafka Streams binder allows the usage of usual patterns for content type conversions as in other message channel based binder applications.
|
||||
Many Kafka Streams operations - that are part of the actual application and not at the inbound and outbound - need to know the type of SerDe’s used to correctly transform key and value data.
|
||||
Therefore, it may be more natural to rely on the SerDe facilities provided by the Apache Kafka Streams library itself for inbound and outbound conversions rather than using the content type conversions offered by the framework.
|
||||
On the other hand, you might be already familiar with the content type conversion patterns in spring cloud stream and want to keep using them for inbound and outbound conversions.
|
||||
Both options are supported in the Spring Cloud Stream binder for Apache Kafka Streams.
|
||||
Similar to message-channel based binder applications, the Kafka Streams binder adapts to the out-of-the-box content-type
|
||||
conversions without any compromise.
|
||||
|
||||
It is typical for Kafka Streams operations to know the type of SerDe’s used to transform the key and value correctly.
|
||||
Therefore, it may be more natural to rely on the SerDe facilities provided by the Apache Kafka Streams library itself at
|
||||
the inbound and outbound conversions rather than using the content-type conversions offered by the framework.
|
||||
On the other hand, you might be already familiar with the content-type conversion patterns provided by the framework, and
|
||||
that, you'd like to continue using for inbound and outbound conversions.
|
||||
|
||||
Both the options are supported in the Kafka Streams binder implementation.
|
||||
|
||||
==== Outbound serialization
|
||||
|
||||
If native encoding is disabled (which is the default), then the framework will convert the message using the contentType set by the user (or the default content type of application/json).
|
||||
It will ignore any Serde set on the outbound in this case for outbound serialization.
|
||||
If native encoding is disabled (which is the default), then the framework will convert the message using the contentType
|
||||
set by the user (otherwise, the default `application/json` will be applied). It will ignore any SerDe set on the outbound
|
||||
in this case for outbound serialization.
|
||||
|
||||
Here is the property to set the contentType on the outbound.
|
||||
|
||||
@@ -237,17 +361,19 @@ Here is the property to enable native encoding.
|
||||
spring.cloud.stream.bindings.output.nativeEncoding: true
|
||||
----
|
||||
|
||||
If native encoding is enabled on the output binding (user has to explicitly enable it as above), then the framework will skip doing any message conversion on the outbound.
|
||||
In that case, it will use the Serde set by the user.
|
||||
First, it checks for the `valueSerde` property set on the actual output binding. Here is an example
|
||||
If native encoding is enabled on the output binding (user has to enable it as above explicitly), then the framework will
|
||||
skip any form of automatic message conversion on the outbound. In that case, it will switch to the Serde set by the user.
|
||||
The `valueSerde` property set on the actual output binding will be used. Here is an example.
|
||||
|
||||
[source]
|
||||
----
|
||||
spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
----
|
||||
If this property is not set, then it will default to the common value Serde - `spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde`.
|
||||
If this property is not set, then it will use the "default" SerDe: `spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde`.
|
||||
|
||||
It is worth to mention that Spring Cloud Stream Kafka Streams binder does not serialize the keys on outbound, rather it is always done by Kafka itself.
|
||||
Therefore, you either have to specify the keySerde property on the binding or it will default to the application wide common keySerde set on the streams configuration.
|
||||
It is worth to mention that Kafka Streams binder does not serialize the keys on outbound - it simply relies on Kafka itself.
|
||||
Therefore, you either have to specify the `keySerde` property on the binding or it will default to the application-wide common
|
||||
`keySerde`.
|
||||
|
||||
Binding level key serde:
|
||||
|
||||
@@ -283,17 +409,20 @@ interface KStreamProcessorWithBranches {
|
||||
}
|
||||
----
|
||||
|
||||
If nativeEncoding is set, then you can set different Serde values on these individual output bindings as below.
|
||||
If `nativeEncoding` is set, then you can set different SerDe's on individual output bindings as below.
|
||||
|
||||
[source]
|
||||
----
|
||||
spring.cloud.stream.kstream.bindings.output1.producer.valueSerde=IntegerSerde
|
||||
spring.cloud.stream.kstream.bindings.outpu2t.producer.valueSerde=StringSerde
|
||||
spring.cloud.stream.kstream.bindings.output2.producer.valueSerde=StringSerde
|
||||
spring.cloud.stream.kstream.bindings.output3.producer.valueSerde=JsonSerde
|
||||
----
|
||||
|
||||
Then if you have `SendTo` like this, @SendTo({"output1", "output2", "output3"}), the `KStream[]` from the branches are applied with proper Serde objects as defined above.
|
||||
If you are not enabling nativeEncoding, you can then set different contentType values on the output bindings as below.
|
||||
In that case, the framework will use the appropriate message converter to convert the messages before sending to Kafka.
|
||||
Then if you have `SendTo` like this, @SendTo({"output1", "output2", "output3"}), the `KStream[]` from the branches are
|
||||
applied with proper SerDe objects as defined above. If you are not enabling `nativeEncoding`, you can then set different
|
||||
contentType values on the output bindings as below. In that case, the framework will use the appropriate message converter
|
||||
to convert the messages before sending to Kafka.
|
||||
|
||||
[source]
|
||||
----
|
||||
spring.cloud.stream.bindings.output1.contentType: application/json
|
||||
@@ -303,10 +432,11 @@ spring.cloud.stream.bindings.output3.contentType: application/octet-stream
|
||||
|
||||
==== Inbound Deserialization
|
||||
|
||||
Similar rules apply to data deserialization on the inbound as in the case of outbound serialization.
|
||||
Similar rules apply to data deserialization on the inbound.
|
||||
|
||||
If native decoding is disabled (which is the default), then the framework will convert the message using the contentType set by the user (or the default content type of application/json).
|
||||
It will ignore any Serde set on the inbound in this case for inbound dserialization.
|
||||
If native decoding is disabled (which is the default), then the framework will convert the message using the contentType
|
||||
set by the user (otherwise, the default `application/json` will be applied). It will ignore any SerDe set on the inbound
|
||||
in this case for inbound deserialization.
|
||||
|
||||
Here is the property to set the contentType on the inbound.
|
||||
|
||||
@@ -322,18 +452,20 @@ Here is the property to enable native decoding.
|
||||
spring.cloud.stream.bindings.input.nativeDecoding: true
|
||||
----
|
||||
|
||||
If native decoding is enabled on the input binding (user has to explicitly enable it as above), then the framework will skip doing any message conversion on the inbound.
|
||||
In that case, it will use the Serde set by the user.
|
||||
First, it checks for the `valueSerde` property set on the actual input binding. Here is an example
|
||||
If native decoding is enabled on the input binding (user has to enable it as above explicitly), then the framework will
|
||||
skip doing any message conversion on the inbound. In that case, it will switch to the SerDe set by the user. The `valueSerde`
|
||||
property set on the actual output binding will be used. Here is an example.
|
||||
|
||||
[source]
|
||||
----
|
||||
spring.cloud.stream.kafka.streams.bindings.input.consumer.valueSerde: org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
----
|
||||
If this property is not set, then it will default to the common value Serde - `spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde`.
|
||||
|
||||
It is worth to mention that Spring Cloud Stream Kafka Streams binder does not deserialize the keys on inbound, rather it is always done by Kafka itself.
|
||||
Therefore, you either have to specify the keySerde property on the binding or it will default to the application wide common keySerde set on the streams configuration.
|
||||
If this property is not set, it will use the default SerDe: `spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde`.
|
||||
|
||||
It is worth to mention that Kafka Streams binder does not deserialize the keys on inbound - it simply relies on Kafka itself.
|
||||
Therefore, you either have to specify the `keySerde` property on the binding or it will default to the application-wide common
|
||||
`keySerde`.
|
||||
|
||||
Binding level key serde:
|
||||
|
||||
@@ -349,53 +481,64 @@ Common Key serde:
|
||||
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
|
||||
----
|
||||
|
||||
As in the case of KStream branching on the outbound, the benefit of setting value Serde per binding is that if you have multiple input bindings (multiple KStreams) and they all require separate value Serdes, then you can configure them individually.
|
||||
If you use the common configuration approach, then that is not possible.
|
||||
As in the case of KStream branching on the outbound, the benefit of setting value SerDe per binding is that if you have
|
||||
multiple input bindings (multiple KStreams object) and they all require separate value SerDe's, then you can configure
|
||||
them individually. If you use the common configuration approach, then this feature won't be applicable.
|
||||
|
||||
==== Error handling on Deserialization exceptions
|
||||
== Error Handling
|
||||
|
||||
Apache Kafka Streams now provide the capability for natively handling exceptions from deserialization errors.
|
||||
Apache Kafka Streams provide the capability for natively handling exceptions from deserialization errors.
|
||||
For details on this support, please see https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers[this]
|
||||
Out of the box, Apache Kafka Streams provide two kinds of deserialization exception handlers - logAndContinue and logAndFail.
|
||||
As the name indicates, the former will log the error and continue processing next records and the latter will log the error and fai..
|
||||
LogAndFail is the default deserialization exception handler.
|
||||
Out of the box, Apache Kafka Streams provide two kinds of deserialization exception handlers - `logAndContinue` and `logAndFail`.
|
||||
As the name indicates, the former will log the error and continue processing the next records and the latter will log the
|
||||
error and fail. `LogAndFail` is the default deserialization exception handler.
|
||||
|
||||
=== Handling Deserialization Exceptions
|
||||
|
||||
Kafka Streams binder supports a selection of exception handlers through the following properties.
|
||||
|
||||
Spring Cloud Stream binder for Apache Kafka Streams allows to specify these exception handlers through the following properties.
|
||||
[source]
|
||||
----
|
||||
spring.cloud.stream.kafka.streams.binder.serdeError: logAndContinue
|
||||
----
|
||||
|
||||
In addition to the above two deserialization exception handlers, the binder also provides a third one for sending the bad records (poison pills) to a DLQ topic.
|
||||
Here is how you enable this DLQ exception handler.
|
||||
In addition to the above two deserialization exception handlers, the binder also provides a third one for sending the erroneous
|
||||
records (poison pills) to a DLQ topic. Here is how you enable this DLQ exception handler.
|
||||
|
||||
[source]
|
||||
----
|
||||
spring.cloud.stream.kafka.streams.binder.serdeError: sendToDlq
|
||||
----
|
||||
When the above property is set, then all records in error from deserialization are sent to the DLQ topic.
|
||||
First it checks, if there is a `dlqName` property is set on the binding itself using the following property.
|
||||
When the above property is set, all the deserialization error records are automatically sent to the DLQ topic.
|
||||
|
||||
[source]
|
||||
----
|
||||
spring.cloud.stream.kafka.streams.bindings.input.consumer.dlqName: foo-dlq
|
||||
----
|
||||
If this is set, then the records in error are sent to the topic `foo-dlq`.
|
||||
If this is not set, then it will create a DLQ topic called `error.<input-topic-name>.<group-name>`.
|
||||
|
||||
A couple of things to keep in mind when using the exception handling feature through Spring Cloud Stream binder for Apache Kafka Streams.
|
||||
If this is set, then the error records are sent to the topic `foo-dlq`. If this is not set, then it will create a DLQ
|
||||
topic with the name `error.<input-topic-name>.<group-name>`.
|
||||
|
||||
* The property `spring.cloud.stream.kafka.streams.binder.serdeError` is applicable for the entire application.
|
||||
This implies that if there are multiple `StreamListener` methods in the same application, this property is applied to all of them.
|
||||
* The exception handling for deserialization works consistently with native deserialization and framework provided message conversion.
|
||||
A couple of things to keep in mind when using the exception handling feature in Kafka Streams binder.
|
||||
|
||||
==== Handling Non-Deserialization exceptions
|
||||
* The property `spring.cloud.stream.kafka.streams.binder.serdeError` is applicable for the entire application. This implies
|
||||
that if there are multiple `StreamListener` methods in the same application, this property is applied to all of them.
|
||||
* The exception handling for deserialization works consistently with native deserialization and framework provided message
|
||||
conversion.
|
||||
|
||||
Other kinds of error handling is limited in Apache Kafka Streams currently and it is up to the end user applications to handle any such application level errors.
|
||||
One side effect of providing a DLQ for deserialization exception handlers as above is that, it provides a way to get access to the DLQ sending bean directly from your application.
|
||||
=== Handling Non-Deserialization Exceptions
|
||||
|
||||
For general error handling in Kafka Streams binder, it is up to the end user applications to handle application level errors.
|
||||
As a side effect of providing a DLQ for deserialization exception handlers, Kafka Streams binder provides a way to get
|
||||
access to the DLQ sending bean directly from your application.
|
||||
Once you get access to that bean, you can programmatically send any exception records from your application to the DLQ.
|
||||
Here is an example for how you may do that.
|
||||
Keep in mind that, this approach only works out of the box when you use the low level processor API in your application as below.
|
||||
It still remains hard to achieve the same using the high level DSL without the library natively providing error handling support, but this example provides some hints to work around.
|
||||
|
||||
It continues to remain hard to robust error handling using the high-level DSL; Kafka Streams doesn't natively support error
|
||||
handling yet.
|
||||
|
||||
However, when you use the low-level Processor API in your application, there are options to control this behavior. See
|
||||
below.
|
||||
|
||||
|
||||
[source]
|
||||
----
|
||||
@@ -434,11 +577,11 @@ public KStream<?, WordCount> process(KStream<Object, String> input) {
|
||||
}
|
||||
----
|
||||
|
||||
=== Support for interactive queries
|
||||
== Interactive Queries
|
||||
|
||||
As part of the public API of the binder, it now exposes a class called `QueryableStoreRegistry`.
|
||||
You can access this as a Spring bean in your application.
|
||||
One easy way to get access to this bean from your application is to autowire the bean as below.
|
||||
As part of the public Kafka Streams binder API, we expose a class called `QueryableStoreRegistry`. You can access this
|
||||
as a Spring bean in your application. An easy way to get access to this bean from your application is to "autowire" the bean
|
||||
in your application.
|
||||
|
||||
[source]
|
||||
----
|
||||
@@ -446,113 +589,10 @@ One easy way to get access to this bean from your application is to autowire the
|
||||
private QueryableStoreRegistry queryableStoreRegistry;
|
||||
----
|
||||
|
||||
Once you gain access to this bean, then you can find out the particular state store that you are interested in.
|
||||
Here is an example:
|
||||
Once you gain access to this bean, then you can query for the particular state-store that you are interested. See below.
|
||||
|
||||
[source]
|
||||
----
|
||||
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
|
||||
queryableStoreRegistry.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
|
||||
----
|
||||
Then you can retrieve the data that you stored in this store during the execution of your application.
|
||||
|
||||
=== Kafka Streams properties
|
||||
|
||||
We covered all the relevant properties that you need when writing Kafka Streams applications using Spring Cloud Stream, scattered in the above sections, but here they are again.
|
||||
|
||||
The following properties are available at the binder level and must be prefixed with `spring.cloud.stream.kafka.binder.`.
|
||||
|
||||
configuration::
|
||||
Map with a key/value pair containing properties pertaining to Apache Kafka Streams API.
|
||||
This property must be prefixed with `spring.cloud.stream.kafka.streams.binder.`.
|
||||
Following are some examples of using this property.
|
||||
[source]
|
||||
----
|
||||
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
|
||||
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
|
||||
----
|
||||
|
||||
For more information about all the properties that may go into streams configuration, see StreamsConfig JavaDocs in Apache Kafka Streams docs.
|
||||
|
||||
brokers::
|
||||
Broker URL
|
||||
+
|
||||
Default: `localhost`
|
||||
zkNodes::
|
||||
Zookeeper URL
|
||||
+
|
||||
Default: `localhost`
|
||||
serdeError::
|
||||
Deserialization error handler type.
|
||||
Possible values are - `logAndContinue`, `logAndFail` or `sendToDlq`
|
||||
+
|
||||
Default: `logAndFail`
|
||||
applicationId::
|
||||
Application ID for all the stream configurations in the current application context.
|
||||
You can override the application id for an individual `StreamListener` method using the `group` property on the binding.
|
||||
You have to ensure that you are using the same group name for all input bindings in the case of multiple inputs on the same methods.
|
||||
+
|
||||
Default: `default`
|
||||
|
||||
|
||||
The following properties are available for Kafka Streams producers only and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.`.
|
||||
|
||||
keySerde::
|
||||
key serde to use
|
||||
+
|
||||
Default: `none`.
|
||||
valueSerde::
|
||||
value serde to use
|
||||
+
|
||||
Default: `none`.
|
||||
useNativeEncoding::
|
||||
flag to enable native encoding
|
||||
+
|
||||
Default: `false`.
|
||||
|
||||
The following properties are available for Kafka Streams consumers only and must be prefixed with `spring.cloud.stream.kafka.streams.bindings.<binding name>.consumer.`.
|
||||
|
||||
keySerde::
|
||||
key serde to use
|
||||
+
|
||||
Default: `none`.
|
||||
valueSerde::
|
||||
value serde to use
|
||||
+
|
||||
Default: `none`.
|
||||
materializedAs::
|
||||
state store to materialize when using incoming KTable types
|
||||
+
|
||||
Default: `none`.
|
||||
useNativeDecoding::
|
||||
flag to enable native decoding
|
||||
+
|
||||
Default: `false`.
|
||||
dlqName::
|
||||
DLQ topic name.
|
||||
+
|
||||
Default: `none`.
|
||||
|
||||
Other common properties used from core Spring Cloud Stream.
|
||||
|
||||
[source]
|
||||
----
|
||||
spring.cloud.stream.bindings.<binding name>.destination
|
||||
spring.cloud.stream.bindings.<binding name>.group
|
||||
----
|
||||
|
||||
TimeWindow properties:
|
||||
|
||||
Windowing is an important concept in stream processing applications.
|
||||
Following properties are available for configuring time windows.
|
||||
|
||||
spring.cloud.stream.kafka.streams.timeWindow.length::
|
||||
When this property is given, you can autowire a `TimeWindows` bean into the application.
|
||||
The value is expressed in milliseconds.
|
||||
+
|
||||
Default: `none`.
|
||||
spring.cloud.stream.kstream.timeWindow.advanceBy::
|
||||
Value is given in milliseconds.
|
||||
+
|
||||
Default: `none`.
|
||||
----
|
||||
@@ -7,7 +7,7 @@ In addition, this guide also explains the Kafka Streams binding capabilities of
|
||||
|
||||
== Usage
|
||||
|
||||
For using the Apache Kafka binder, you just need to add it to your Spring Cloud Stream application, using the following Maven coordinates:
|
||||
To use Apache Kafka binder all you need is to add `spring-cloud-stream-binder-kafka` as a dependency to your Spring Cloud Stream application. Below is a Maven example:
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
@@ -38,6 +38,11 @@ The Apache Kafka Binder implementation maps each destination to an Apache Kafka
|
||||
The consumer group maps directly to the same Apache Kafka concept.
|
||||
Partitioning also maps directly to Apache Kafka partitions as well.
|
||||
|
||||
The binder currently uses the Apache Kafka `kafka-clients` 1.0.0 jar and is designed to be used with a broker at least that version.
|
||||
This client can communicate with older brokers (refer to the Kafka documentation), but certain features may not be available.
|
||||
For example, with versions earlier than 0.11.x.x, native headers are not supported.
|
||||
Also, 0.11.x.x does not support the `autoAddPartitions` property.
|
||||
|
||||
== Configuration Options
|
||||
|
||||
This section contains the configuration options used by the Apache Kafka binder.
|
||||
@@ -55,42 +60,24 @@ spring.cloud.stream.kafka.binder.defaultBrokerPort::
|
||||
This sets the default port when no port is configured in the broker list.
|
||||
+
|
||||
Default: `9092`.
|
||||
spring.cloud.stream.kafka.binder.zkNodes::
|
||||
A list of ZooKeeper nodes to which the Kafka binder can connect.
|
||||
+
|
||||
Default: `localhost`.
|
||||
spring.cloud.stream.kafka.binder.defaultZkPort::
|
||||
`zkNodes` allows hosts specified with or without port information (e.g., `host1,host2:port2`).
|
||||
This sets the default port when no port is configured in the node list.
|
||||
+
|
||||
Default: `2181`.
|
||||
spring.cloud.stream.kafka.binder.configuration::
|
||||
Key/Value map of client properties (both producers and consumer) passed to all clients created by the binder.
|
||||
Due to the fact that these properties will be used by both producers and consumers, usage should be restricted to common properties, especially security settings.
|
||||
Due to the fact that these properties will be used by both producers and consumers, usage should be restricted to common properties, for example, security settings.
|
||||
+
|
||||
Default: Empty map.
|
||||
spring.cloud.stream.kafka.binder.headers::
|
||||
The list of custom headers that will be transported by the binder.
|
||||
Only required when communicating with older applications (<= 1.3.x) with a `kafka-clients` version < 0.11.0.0; newer versions support headers natively.
|
||||
+
|
||||
Default: empty.
|
||||
spring.cloud.stream.kafka.binder.healthTimeout::
|
||||
The time to wait to get partition information in seconds; default 60.
|
||||
Health will report as down if this timer expires.
|
||||
Health will report as down if this timer expires.
|
||||
+
|
||||
Default: 10.
|
||||
spring.cloud.stream.kafka.binder.offsetUpdateTimeWindow::
|
||||
The frequency, in milliseconds, with which offsets are saved.
|
||||
Ignored if `0`.
|
||||
+
|
||||
Default: `10000`.
|
||||
spring.cloud.stream.kafka.binder.offsetUpdateCount::
|
||||
The frequency, in number of updates, which which consumed offsets are persisted.
|
||||
Ignored if `0`.
|
||||
Mutually exclusive with `offsetUpdateTimeWindow`.
|
||||
+
|
||||
Default: `0`.
|
||||
spring.cloud.stream.kafka.binder.requiredAcks::
|
||||
The number of required acks on the broker.
|
||||
See the Kafka documentation for the producer `acks` property.
|
||||
+
|
||||
Default: `1`.
|
||||
spring.cloud.stream.kafka.binder.minPartitionCount::
|
||||
@@ -101,6 +88,7 @@ It can be superseded by the `partitionCount` setting of the producer or by the v
|
||||
Default: `1`.
|
||||
spring.cloud.stream.kafka.binder.replicationFactor::
|
||||
The replication factor of auto-created topics if `autoCreateTopics` is active.
|
||||
Can be overriden on each binding.
|
||||
+
|
||||
Default: `1`.
|
||||
spring.cloud.stream.kafka.binder.autoCreateTopics::
|
||||
@@ -116,10 +104,6 @@ If set to `false`, the binder will rely on the partition size of the topic being
|
||||
If the partition count of the target topic is smaller than the expected value, the binder will fail to start.
|
||||
+
|
||||
Default: `false`.
|
||||
spring.cloud.stream.kafka.binder.socketBufferSize::
|
||||
Size (in bytes) of the socket buffer to be used by the Kafka consumers.
|
||||
+
|
||||
Default: `2097152`.
|
||||
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix::
|
||||
Enable transactions in the binder; see `transaction.id` in the Kafka documentation and https://docs.spring.io/spring-kafka/reference/html/_reference.html#transactions[Transactions] in the `spring-kafka` documentation.
|
||||
When transactions are enabled, individual `producer` properties are ignored and all producers use the `spring.cloud.stream.kafka.binder.transaction.producer.*` properties.
|
||||
@@ -131,12 +115,37 @@ spring.cloud.stream.kafka.binder.transaction.producer.*::
|
||||
+
|
||||
Default: See individual producer properties.
|
||||
|
||||
spring.cloud.stream.kafka.binder.headerMapperBeanName::
|
||||
The bean name of a `KafkaHeaderMapper` used for mapping `spring-messaging` headers to/from Kafka headers.
|
||||
Use this, for example, if you wish to customize the trusted packages in a `DefaultKafkaHeaderMapper`, which uses JSON deserialization for the headers.
|
||||
+
|
||||
Default: none.
|
||||
|
||||
[[kafka-consumer-properties]]
|
||||
=== Kafka Consumer Properties
|
||||
|
||||
The following properties are available for Kafka consumers only and
|
||||
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.consumer.`.
|
||||
|
||||
admin.configuration::
|
||||
A `Map` of Kafka topic properties used when provisioning topics.
|
||||
e.g. `spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0`
|
||||
+
|
||||
Default: none.
|
||||
|
||||
admin.replicas-assignment::
|
||||
A Map<Integer, List<Integer>> of replica assignments, with the key being the partition and value the assignments.
|
||||
Used when provisioning new topics.
|
||||
See `NewTopic` javadocs in the `kafka-clients` jar.
|
||||
+
|
||||
Default: none.
|
||||
|
||||
admin.replication-factor::
|
||||
The replication factor to use when provisioning topics; overrides the binder-wide setting.
|
||||
Ignored if `replicas-assignments` is present.
|
||||
+
|
||||
Default: none (the binder-wide default of 1 is used).
|
||||
|
||||
autoRebalanceEnabled::
|
||||
When `true`, topic partitions will be automatically rebalanced between the members of a consumer group.
|
||||
When `false`, each consumer will be assigned a fixed set of partitions based on `spring.cloud.stream.instanceCount` and `spring.cloud.stream.instanceIndex`.
|
||||
@@ -144,12 +153,21 @@ This requires both `spring.cloud.stream.instanceCount` and `spring.cloud.stream.
|
||||
The property `spring.cloud.stream.instanceCount` must typically be greater than 1 in this case.
|
||||
+
|
||||
Default: `true`.
|
||||
ackEachRecord::
|
||||
When `autoCommitOffset` is `true`, whether to commit the offset after each record is processed.
|
||||
By default, offsets are committed after all records in the batch of records returned by `consumer.poll()` have been processed.
|
||||
The number of records returned by a poll can be controlled with the `max.poll.recods` Kafka property, set via the consumer `configuration` property.
|
||||
Setting this to true may cause a degradation in performance, but reduces the likelihood of redelivered records when a failure occurs.
|
||||
Also see the binder `requiredAcks` property, which also affects the performance of committing offsets.
|
||||
+
|
||||
Default: `false`.
|
||||
autoCommitOffset::
|
||||
Whether to autocommit offsets when a message has been processed.
|
||||
If set to `false`, a header with the key `kafka_acknowledgment` of the type `org.springframework.kafka.support.Acknowledgment` header will be present in the inbound message.
|
||||
Applications may use this header for acknowledging messages.
|
||||
See the examples section for details.
|
||||
When this property is set to `false`, Kafka binder will set the ack mode to `org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL`.
|
||||
When this property is set to `false`, Kafka binder will set the ack mode to `org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL` and the application is responsible for acknowledging records.
|
||||
Also see `ackEachRecord`.
|
||||
+
|
||||
Default: `true`.
|
||||
autoCommitOnError::
|
||||
@@ -159,14 +177,15 @@ If set to `true`, it will always auto-commit (if auto-commit is enabled).
|
||||
If not set (default), it effectively has the same value as `enableDlq`, auto-committing erroneous messages if they are sent to a DLQ, and not committing them otherwise.
|
||||
+
|
||||
Default: not set.
|
||||
recoveryInterval::
|
||||
The interval between connection recovery attempts, in milliseconds.
|
||||
resetOffsets::
|
||||
Whether to reset offsets on the consumer to the value provided by startOffset.
|
||||
+
|
||||
Default: `5000`.
|
||||
Default: `false`.
|
||||
startOffset::
|
||||
The starting offset for new groups.
|
||||
Allowed values: `earliest`, `latest`.
|
||||
If the consumer group is set explicitly for the consumer 'binding' (via `spring.cloud.stream.bindings.<channelName>.group`), then 'startOffset' is set to `earliest`; otherwise it is set to `latest` for the `anonymous` consumer group.
|
||||
Also see `resetOffsets`.
|
||||
+
|
||||
Default: null (equivalent to `earliest`).
|
||||
enableDlq::
|
||||
@@ -214,6 +233,25 @@ Default: `30000`
|
||||
The following properties are available for Kafka producers only and
|
||||
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.producer.`.
|
||||
|
||||
admin.configuration::
|
||||
A `Map` of Kafka topic properties used when provisioning new topics.
|
||||
e.g. `spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0`
|
||||
+
|
||||
Default: none.
|
||||
|
||||
admin.replicas-assignment::
|
||||
A Map<Integer, List<Integer>> of replica assignments, with the key being the partition and value the assignments.
|
||||
Used when provisioning new topics.
|
||||
See `NewTopic` javadocs in the `kafka-clients` jar.
|
||||
+
|
||||
Default: none.
|
||||
|
||||
admin.replication-factor::
|
||||
The replication factor to use when provisioning new topics; overrides the binder-wide setting.
|
||||
Ignored if `replicas-assignments` is present.
|
||||
+
|
||||
Default: none (the binder-wide default of 1 is used).
|
||||
|
||||
bufferSize::
|
||||
Upper limit, in bytes, of how much data the Kafka producer will attempt to batch before sending.
|
||||
+
|
||||
@@ -229,15 +267,15 @@ batchTimeout::
|
||||
Default: `0`.
|
||||
messageKeyExpression::
|
||||
A SpEL expression evaluated against the outgoing message used to populate the key of the produced Kafka message.
|
||||
For example `headers.key` or `payload.myKey`.
|
||||
For example `headers['myKey']`; the payload cannot be used because by the time this expression is evaluated, the payload is already in the form of a `byte[]`.
|
||||
+
|
||||
Default: `none`.
|
||||
headerPatterns::
|
||||
A comma-delimited list of simple patterns to match spring-messaging headers to be mapped to the kafka `Headers` in the `ProducerRecord`.
|
||||
Patterns can begin or end with the wildcard character (asterisk).
|
||||
Patterns can be negated by prefixing with `!`; matching stops after the first match (positive or negative).
|
||||
For example `!foo,fo*` will pass `fox` but not `foo`.
|
||||
`id` and `timestamp` are never mapped.
|
||||
Patterns can begin or end with the wildcard character (asterisk).
|
||||
Patterns can be negated by prefixing with `!`; matching stops after the first match (positive or negative).
|
||||
For example `!foo,fo*` will pass `fox` but not `foo`.
|
||||
`id` and `timestamp` are never mapped.
|
||||
+
|
||||
Default: `*` (all headers - except the `id` and `timestamp`)
|
||||
configuration::
|
||||
@@ -314,7 +352,6 @@ Here is an example of launching a Spring Cloud Stream application with SASL and
|
||||
----
|
||||
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
|
||||
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
|
||||
--spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \
|
||||
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
|
||||
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
|
||||
----
|
||||
@@ -343,7 +380,6 @@ Here is an example of launching a Spring Cloud Stream application with SASL and
|
||||
[source]
|
||||
----
|
||||
java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
|
||||
--spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \
|
||||
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
|
||||
--spring.cloud.stream.kafka.binder.autoCreateTopics=false \
|
||||
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
|
||||
@@ -366,7 +402,7 @@ KafkaClient {
|
||||
};
|
||||
----
|
||||
|
||||
If the topics required already exist on the broker, or will be created by an administrator, autocreation can be turned off and only client JAAS properties need to be sent. As an alternative to setting `spring.cloud.stream.kafka.binder.autoCreateTopics` you can simply remove the broker dependency from the application. See <<exclude-admin-utils>> for details.
|
||||
If the topics required already exist on the broker, or will be created by an administrator, autocreation can be turned off and only client JAAS properties need to be sent.
|
||||
|
||||
[NOTE]
|
||||
====
|
||||
@@ -421,103 +457,6 @@ public class Application {
|
||||
}
|
||||
----
|
||||
|
||||
|
||||
==== Using the binder with Apache Kafka 0.10
|
||||
|
||||
The default Kafka support in Spring Cloud Stream Kafka binder is for Kafka version 0.10.1.1. The binder also supports connecting to other 0.10 based versions and 0.9 clients.
|
||||
In order to do this, when you create the project that contains your application, include `spring-cloud-starter-stream-kafka` as you normally would do for the default binder.
|
||||
Then add these dependencies at the top of the `<dependencies>` section in the pom.xml file to override the dependencies.
|
||||
|
||||
Here is an example for downgrading your application to 0.10.0.1. Since it is still on the 0.10 line, the default `spring-kafka` and `spring-integration-kafka` versions can be retained.
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
<version>0.10.0.1</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>0.10.0.1</version>
|
||||
</dependency>
|
||||
----
|
||||
|
||||
Here is another example of using 0.9.0.1 version.
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<version>1.0.5.RELEASE</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
<version>2.0.1.RELEASE</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
<version>0.9.0.1</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>0.9.0.1</version>
|
||||
</dependency>
|
||||
|
||||
----
|
||||
|
||||
[NOTE]
|
||||
====
|
||||
The versions above are provided only for the sake of the example.
|
||||
For best results, we recommend using the most recent 0.10-compatible versions of the projects.
|
||||
====
|
||||
|
||||
[[exclude-admin-utils]]
|
||||
==== Excluding Kafka broker jar from the classpath of the binder based application
|
||||
|
||||
The Apache Kafka Binder uses the administrative utilities which are part of the Apache Kafka server library to create and reconfigure topics.
|
||||
If the inclusion of the Apache Kafka server library and its dependencies is not necessary at runtime because the application will rely on the topics being configured administratively, the Kafka binder allows for Apache Kafka server dependency to be excluded from the application.
|
||||
|
||||
If you use non default versions for Kafka dependencies as advised above, all you have to do is not to include the kafka broker dependency.
|
||||
If you use the default Kafka version, then ensure that you exclude the kafka broker jar from the `spring-cloud-starter-stream-kafka` dependency as following.
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
----
|
||||
|
||||
If you exclude the Apache Kafka server dependency and the topic is not present on the server, then the Apache Kafka broker will create the topic if auto topic creation is enabled on the server.
|
||||
Please keep in mind that if you are relying on this, then the Kafka server will use the default number of partitions and replication factors.
|
||||
On the other hand, if auto topic creation is disabled on the server, then care must be taken before running the application to create the topic with the desired number of partitions.
|
||||
|
||||
If you want to have full control over how partitions are allocated, then leave the default settings as they are, i.e. do not exclude the kafka broker jar and ensure that `spring.cloud.stream.kafka.binder.autoCreateTopics` is set to `true`, which is the default.
|
||||
|
||||
[[kafka-error-channels]]
|
||||
== Error Channels
|
||||
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
include::overview.adoc[leveloffset=+1]
|
||||
include::dlq.adoc[leveloffset=+1]
|
||||
include::partitions.adoc[leveloffset=+1]
|
||||
include::kafka-streams.adoc[leveloffset=+1]
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.0.0.RC1</version>
|
||||
<version>2.0.0.RC3</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
||||
@@ -26,10 +26,12 @@ import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
|
||||
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binding.BindingService;
|
||||
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
|
||||
import org.springframework.cloud.stream.config.BindingServiceProperties;
|
||||
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
|
||||
@@ -41,6 +43,7 @@ import org.springframework.util.ObjectUtils;
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
@EnableConfigurationProperties(KafkaStreamsExtendedBindingProperties.class)
|
||||
@ConditionalOnBean(BindingService.class)
|
||||
public class KafkaStreamsBinderSupportAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.0.0.RC1</version>
|
||||
<version>2.0.0.RC3</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
@@ -18,6 +18,11 @@
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-actuator</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-configuration-processor</artifactId>
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2016-2017 the original author or authors.
|
||||
* Copyright 2016-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -20,8 +20,10 @@ import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.TimeGauge;
|
||||
import io.micrometer.core.instrument.binder.MeterBinder;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@@ -32,21 +34,28 @@ import org.apache.kafka.common.PartitionInfo;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
|
||||
import org.springframework.cloud.stream.binder.BindingCreatedEvent;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
/**
|
||||
* Metrics for Kafka binder.
|
||||
*
|
||||
* @author Henryk Konsek
|
||||
* @author Soby Chacko
|
||||
* @author Artem Bilan
|
||||
* @author Oleg Zhurakousky
|
||||
* @author Jon Schneider
|
||||
*/
|
||||
public class KafkaBinderMetrics implements MeterBinder {
|
||||
public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<BindingCreatedEvent> {
|
||||
|
||||
private final static Log LOG = LogFactory.getLog(KafkaBinderMetrics.class);
|
||||
|
||||
static final String METRIC_PREFIX = "spring.cloud.stream.binder.kafka";
|
||||
static final String METRIC_NAME = "spring.cloud.stream.binder.kafka.offset";
|
||||
|
||||
private final KafkaMessageChannelBinder binder;
|
||||
|
||||
@@ -54,23 +63,29 @@ public class KafkaBinderMetrics implements MeterBinder {
|
||||
|
||||
private ConsumerFactory<?, ?> defaultConsumerFactory;
|
||||
|
||||
private final MeterRegistry meterRegistry;
|
||||
|
||||
public KafkaBinderMetrics(KafkaMessageChannelBinder binder,
|
||||
KafkaBinderConfigurationProperties binderConfigurationProperties,
|
||||
ConsumerFactory<?, ?> defaultConsumerFactory) {
|
||||
ConsumerFactory<?, ?> defaultConsumerFactory, @Nullable MeterRegistry meterRegistry) {
|
||||
|
||||
this.binder = binder;
|
||||
this.binderConfigurationProperties = binderConfigurationProperties;
|
||||
this.defaultConsumerFactory = defaultConsumerFactory;
|
||||
this.meterRegistry = meterRegistry;
|
||||
}
|
||||
|
||||
public KafkaBinderMetrics(KafkaMessageChannelBinder binder,
|
||||
KafkaBinderConfigurationProperties binderConfigurationProperties) {
|
||||
this(binder, binderConfigurationProperties, null);
|
||||
|
||||
this(binder, binderConfigurationProperties, null, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void bindTo(MeterRegistry registry) {
|
||||
for (Map.Entry<String, KafkaMessageChannelBinder.TopicInformation> topicInfo : this.binder.getTopicsInUse()
|
||||
.entrySet()) {
|
||||
|
||||
if (!topicInfo.getValue().isConsumerTopic()) {
|
||||
continue;
|
||||
}
|
||||
@@ -78,47 +93,67 @@ public class KafkaBinderMetrics implements MeterBinder {
|
||||
String topic = topicInfo.getKey();
|
||||
String group = topicInfo.getValue().getConsumerGroup();
|
||||
|
||||
try (Consumer<?, ?> metadataConsumer = createConsumerFactory(group).createConsumer()) {
|
||||
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
|
||||
List<TopicPartition> topicPartitions = new LinkedList<>();
|
||||
for (PartitionInfo partitionInfo : partitionInfos) {
|
||||
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
|
||||
}
|
||||
Map<TopicPartition, Long> endOffsets = metadataConsumer.endOffsets(topicPartitions);
|
||||
long lag = 0;
|
||||
for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
|
||||
OffsetAndMetadata current = metadataConsumer.committed(endOffset.getKey());
|
||||
if (current != null) {
|
||||
lag += endOffset.getValue() - current.offset();
|
||||
}
|
||||
else {
|
||||
lag += endOffset.getValue();
|
||||
}
|
||||
}
|
||||
registry.gauge(String.format("%s.%s.%s.lag", METRIC_PREFIX, group, topic), lag);
|
||||
TimeGauge.builder(METRIC_NAME, this, TimeUnit.MILLISECONDS,
|
||||
o -> calculateConsumerLagOnTopic(topic, group))
|
||||
.tag("group", group)
|
||||
.tag("topic", topic)
|
||||
.description("Consumer lag for a particular group and topic")
|
||||
.register(registry);
|
||||
}
|
||||
}
|
||||
|
||||
private double calculateConsumerLagOnTopic(String topic, String group) {
|
||||
long lag = 0;
|
||||
try (Consumer<?, ?> metadataConsumer = createConsumerFactory(group).createConsumer()) {
|
||||
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
|
||||
List<TopicPartition> topicPartitions = new LinkedList<>();
|
||||
for (PartitionInfo partitionInfo : partitionInfos) {
|
||||
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.debug("Cannot generate metric for topic: " + topic, e);
|
||||
|
||||
Map<TopicPartition, Long> endOffsets = metadataConsumer.endOffsets(topicPartitions);
|
||||
|
||||
for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
|
||||
OffsetAndMetadata current = metadataConsumer.committed(endOffset.getKey());
|
||||
if (current != null) {
|
||||
lag += endOffset.getValue() - current.offset();
|
||||
}
|
||||
else {
|
||||
lag += endOffset.getValue();
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
LOG.debug("Cannot generate metric for topic: " + topic, e);
|
||||
}
|
||||
return lag;
|
||||
}
|
||||
|
||||
private ConsumerFactory<?, ?> createConsumerFactory(String group) {
|
||||
if (defaultConsumerFactory != null) {
|
||||
return defaultConsumerFactory;
|
||||
if (this.defaultConsumerFactory == null) {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
if (!ObjectUtils.isEmpty(binderConfigurationProperties.getConsumerConfiguration())) {
|
||||
props.putAll(binderConfigurationProperties.getConsumerConfiguration());
|
||||
}
|
||||
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
|
||||
this.binderConfigurationProperties.getKafkaConnectionString());
|
||||
}
|
||||
props.put("group.id", group);
|
||||
this.defaultConsumerFactory = new DefaultKafkaConsumerFactory<>(props);
|
||||
}
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
if (!ObjectUtils.isEmpty(binderConfigurationProperties.getConsumerConfiguration())) {
|
||||
props.putAll(binderConfigurationProperties.getConsumerConfiguration());
|
||||
|
||||
return this.defaultConsumerFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(BindingCreatedEvent event) {
|
||||
if (this.meterRegistry != null) {
|
||||
// meters are idempotent when called with the same arguments so safe to call it multiple times
|
||||
this.bindTo(this.meterRegistry);
|
||||
}
|
||||
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
|
||||
this.binderConfigurationProperties.getKafkaConnectionString());
|
||||
}
|
||||
props.put("group.id", group);
|
||||
return new DefaultKafkaConsumerFactory<>(props);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -28,7 +28,9 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
@@ -80,7 +82,9 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
|
||||
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
|
||||
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
|
||||
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
|
||||
import org.springframework.kafka.listener.config.ContainerProperties;
|
||||
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
|
||||
import org.springframework.kafka.support.KafkaHeaderMapper;
|
||||
@@ -325,7 +329,8 @@ public class KafkaMessageChannelBinder extends
|
||||
|
||||
Collection<PartitionInfo> listenedPartitions;
|
||||
|
||||
if (extendedConsumerProperties.getExtension().isAutoRebalanceEnabled() ||
|
||||
boolean groupManagement = extendedConsumerProperties.getExtension().isAutoRebalanceEnabled();
|
||||
if (groupManagement ||
|
||||
extendedConsumerProperties.getInstanceCount() == 1) {
|
||||
listenedPartitions = allPartitions;
|
||||
}
|
||||
@@ -354,6 +359,7 @@ public class KafkaMessageChannelBinder extends
|
||||
}
|
||||
containerProperties.setIdleEventInterval(extendedConsumerProperties.getExtension().getIdleEventInterval());
|
||||
int concurrency = Math.min(extendedConsumerProperties.getConcurrency(), listenedPartitions.size());
|
||||
resetOffsets(extendedConsumerProperties, consumerFactory, groupManagement, containerProperties);
|
||||
@SuppressWarnings("rawtypes")
|
||||
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer =
|
||||
new ConcurrentMessageListenerContainer(consumerFactory, containerProperties) {
|
||||
@@ -366,8 +372,14 @@ public class KafkaMessageChannelBinder extends
|
||||
};
|
||||
messageListenerContainer.setConcurrency(concurrency);
|
||||
// these won't be needed if the container is made a bean
|
||||
messageListenerContainer.setApplicationEventPublisher(getApplicationContext());
|
||||
if (getApplicationEventPublisher() != null) {
|
||||
messageListenerContainer.setApplicationEventPublisher(getApplicationEventPublisher());
|
||||
}
|
||||
else if (getApplicationContext() != null) {
|
||||
messageListenerContainer.setApplicationEventPublisher(getApplicationContext());
|
||||
}
|
||||
messageListenerContainer.setBeanName(destination.getName() + ".container");
|
||||
// end of these won't be needed...
|
||||
if (!extendedConsumerProperties.getExtension().isAutoCommitOffset()) {
|
||||
messageListenerContainer.getContainerProperties()
|
||||
.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
|
||||
@@ -376,6 +388,9 @@ public class KafkaMessageChannelBinder extends
|
||||
else {
|
||||
messageListenerContainer.getContainerProperties()
|
||||
.setAckOnError(isAutoCommitOnError(extendedConsumerProperties));
|
||||
if (extendedConsumerProperties.getExtension().isAckEachRecord()) {
|
||||
messageListenerContainer.getContainerProperties().setAckMode(AckMode.RECORD);
|
||||
}
|
||||
}
|
||||
if (this.logger.isDebugEnabled()) {
|
||||
this.logger.debug(
|
||||
@@ -397,6 +412,58 @@ public class KafkaMessageChannelBinder extends
|
||||
return kafkaMessageDrivenChannelAdapter;
|
||||
}
|
||||
|
||||
/*
|
||||
* Reset the offsets if needed; may update the offsets in in the container's
|
||||
* topicPartitionInitialOffsets.
|
||||
*/
|
||||
private void resetOffsets(
|
||||
final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
|
||||
final ConsumerFactory<?, ?> consumerFactory, boolean groupManagement,
|
||||
final ContainerProperties containerProperties) {
|
||||
|
||||
boolean resetOffsets = extendedConsumerProperties.getExtension().isResetOffsets();
|
||||
final Object resetTo = consumerFactory.getConfigurationProperties().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
|
||||
final AtomicBoolean initialAssignment = new AtomicBoolean(true);
|
||||
if (!"earliest".equals(resetTo) && "!latest".equals(resetTo)) {
|
||||
logger.warn("no (or unknown) " + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
|
||||
" property cannot reset");
|
||||
resetOffsets = false;
|
||||
}
|
||||
if (groupManagement && resetOffsets) {
|
||||
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
|
||||
|
||||
@Override
|
||||
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
|
||||
// no op
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
|
||||
// no op
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
|
||||
if (initialAssignment.getAndSet(false)) {
|
||||
if ("earliest".equals(resetTo)) {
|
||||
consumer.seekToBeginning(tps);
|
||||
}
|
||||
else if ("latest".equals(resetTo)) {
|
||||
consumer.seekToEnd(tps);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
else if (resetOffsets) {
|
||||
Arrays.stream(containerProperties.getTopicPartitions())
|
||||
.map(tpio -> new TopicPartitionInitialOffset(tpio.topic(), tpio.partition(),
|
||||
// SK GH-599 "earliest".equals(resetTo) ? SeekPosition.BEGINNING : SeekPosition.END))
|
||||
"earliest".equals(resetTo) ? 0L : Long.MAX_VALUE))
|
||||
.collect(Collectors.toList()).toArray(containerProperties.getTopicPartitions());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PolledConsumerResources createPolledConsumerResources(String name, String group,
|
||||
ConsumerDestination destination, ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
|
||||
@@ -617,7 +684,7 @@ public class KafkaMessageChannelBinder extends
|
||||
}
|
||||
}
|
||||
|
||||
private ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
|
||||
protected ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2015-2017 the original author or authors.
|
||||
* Copyright 2015-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -17,38 +17,31 @@
|
||||
package org.springframework.cloud.stream.binder.kafka.config;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.binder.MeterBinder;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.JaasLoginModuleConfiguration;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Import;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
|
||||
import org.springframework.kafka.support.LoggingProducerListener;
|
||||
import org.springframework.kafka.support.ProducerListener;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
/**
|
||||
* @author David Turanski
|
||||
@@ -58,24 +51,21 @@ import org.springframework.util.ObjectUtils;
|
||||
* @author Ilayaperumal Gopinathan
|
||||
* @author Henryk Konsek
|
||||
* @author Gary Russell
|
||||
* @author Oleg Zhurakousky
|
||||
* @author Artem Bilan
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnMissingBean(Binder.class)
|
||||
@Import({ PropertyPlaceholderAutoConfiguration.class})
|
||||
@Import({KafkaAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class, KafkaBinderHealthIndicatorConfiguration.class })
|
||||
@EnableConfigurationProperties({ KafkaExtendedBindingProperties.class })
|
||||
public class KafkaBinderConfiguration {
|
||||
|
||||
protected static final Log logger = LogFactory.getLog(KafkaBinderConfiguration.class);
|
||||
|
||||
@Autowired
|
||||
private KafkaExtendedBindingProperties kafkaExtendedBindingProperties;
|
||||
|
||||
@Autowired
|
||||
private ProducerListener producerListener;
|
||||
|
||||
@Autowired
|
||||
private ApplicationContext context;
|
||||
|
||||
@Autowired
|
||||
private KafkaProperties kafkaProperties;
|
||||
|
||||
@@ -91,7 +81,8 @@ public class KafkaBinderConfiguration {
|
||||
|
||||
@Bean
|
||||
KafkaMessageChannelBinder kafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
|
||||
KafkaTopicProvisioner provisioningProvider) {
|
||||
KafkaTopicProvisioner provisioningProvider) {
|
||||
|
||||
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
|
||||
configurationProperties, provisioningProvider);
|
||||
kafkaMessageChannelBinder.setProducerListener(producerListener);
|
||||
@@ -105,40 +96,37 @@ public class KafkaBinderConfiguration {
|
||||
return new LoggingProducerListener();
|
||||
}
|
||||
|
||||
@Bean
|
||||
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder,
|
||||
KafkaBinderConfigurationProperties configurationProperties) {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
if (!ObjectUtils.isEmpty(configurationProperties.getConsumerConfiguration())) {
|
||||
props.putAll(configurationProperties.getConsumerConfiguration());
|
||||
}
|
||||
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
|
||||
}
|
||||
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
|
||||
KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(kafkaMessageChannelBinder,
|
||||
consumerFactory);
|
||||
indicator.setTimeout(configurationProperties.getHealthTimeout());
|
||||
return indicator;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public MeterBinder kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder,
|
||||
KafkaBinderConfigurationProperties configurationProperties) {
|
||||
return new KafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public KafkaJaasLoginModuleInitializer jaasInitializer() throws IOException {
|
||||
return new KafkaJaasLoginModuleInitializer();
|
||||
}
|
||||
|
||||
/**
|
||||
* A conditional configuration for the {@link KafkaBinderMetrics} bean when the
|
||||
* {@link MeterRegistry} class is in classpath, as well as a {@link MeterRegistry} bean is
|
||||
* present in the application context.
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnClass(MeterRegistry.class)
|
||||
@ConditionalOnBean(MeterRegistry.class)
|
||||
protected class KafkaBinderMetricsConfiguration {
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(KafkaBinderMetrics.class)
|
||||
public MeterBinder kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder,
|
||||
KafkaBinderConfigurationProperties configurationProperties,
|
||||
MeterRegistry meterRegistry) {
|
||||
|
||||
return new KafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties, null, meterRegistry);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class JaasConfigurationProperties {
|
||||
|
||||
private JaasLoginModuleConfiguration kafka;
|
||||
|
||||
private JaasLoginModuleConfiguration zookeeper;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
/*
|
||||
* Copyright 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.config;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Oleg Zhurakousky
|
||||
*
|
||||
*/
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnClass(name="org.springframework.boot.actuate.health.HealthIndicator")
|
||||
class KafkaBinderHealthIndicatorConfiguration {
|
||||
|
||||
@Bean
|
||||
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder,
|
||||
KafkaBinderConfigurationProperties configurationProperties) {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
if (!ObjectUtils.isEmpty(configurationProperties.getConsumerConfiguration())) {
|
||||
props.putAll(configurationProperties.getConsumerConfiguration());
|
||||
}
|
||||
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
|
||||
}
|
||||
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
|
||||
KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(kafkaMessageChannelBinder,
|
||||
consumerFactory);
|
||||
indicator.setTimeout(configurationProperties.getHealthTimeout());
|
||||
return indicator;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,64 @@
|
||||
/*
|
||||
* Copyright 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaAdminProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
|
||||
import org.springframework.cloud.stream.config.BinderFactoryConfiguration;
|
||||
import org.springframework.cloud.stream.config.BindingServiceConfiguration;
|
||||
import org.springframework.integration.config.EnableIntegration;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* @author Gary Russell
|
||||
* @since 2.0
|
||||
*
|
||||
*/
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(classes = {KafkaBinderConfiguration.class,
|
||||
BinderFactoryConfiguration.class,
|
||||
BindingServiceConfiguration.class })
|
||||
@TestPropertySource(properties = {
|
||||
"spring.cloud.stream.kafka.bindings.input.consumer.admin.replication-factor=2",
|
||||
"spring.cloud.stream.kafka.bindings.input.consumer.admin.replicas-assignments.0=0,1",
|
||||
"spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0" })
|
||||
@EnableIntegration
|
||||
public class AdminConfigTests {
|
||||
|
||||
@Autowired
|
||||
private KafkaMessageChannelBinder binder;
|
||||
|
||||
@Test
|
||||
public void testProps() {
|
||||
KafkaConsumerProperties consumerProps = this.binder.getExtendedConsumerProperties("input");
|
||||
KafkaAdminProperties admin = consumerProps.getAdmin();
|
||||
assertThat(admin.getReplicationFactor()).isEqualTo((short) 2);
|
||||
assertThat(admin.getReplicasAssignments().get(0)).isEqualTo(Arrays.asList(0, 1));
|
||||
assertThat(admin.getConfiguration().get("message.format.version")).isEqualTo("0.9.0.0");
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2016-2017 the original author or authors.
|
||||
* Copyright 2016-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -28,14 +28,12 @@ import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
@@ -50,8 +48,7 @@ import static org.junit.Assert.assertTrue;
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@SpringBootTest(classes = { KafkaBinderAutoConfigurationPropertiesTest.KafkaBinderConfigProperties.class,
|
||||
KafkaBinderConfiguration.class })
|
||||
@SpringBootTest(classes = {KafkaBinderConfiguration.class })
|
||||
@TestPropertySource(locations = "classpath:binder-config-autoconfig.properties")
|
||||
public class KafkaBinderAutoConfigurationPropertiesTest {
|
||||
|
||||
@@ -124,13 +121,4 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
|
||||
bootstrapServers.add("10.98.09.196:9092");
|
||||
assertTrue(((List<String>) configs.get("bootstrap.servers")).containsAll(bootstrapServers));
|
||||
}
|
||||
|
||||
public static class KafkaBinderConfigProperties {
|
||||
|
||||
@Bean
|
||||
KafkaProperties kafkaProperties() {
|
||||
return new KafkaProperties();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,9 +20,9 @@ import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.search.Search;
|
||||
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
@@ -31,6 +31,7 @@ import org.apache.kafka.common.PartitionInfo;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentMatchers;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
@@ -70,21 +71,21 @@ public class KafkaBinderMetricsTest {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
org.mockito.BDDMockito.given(consumerFactory.createConsumer()).willReturn(consumer);
|
||||
org.mockito.BDDMockito.given(binder.getTopicsInUse()).willReturn(topicsInUse);
|
||||
metrics = new KafkaBinderMetrics(binder, kafkaBinderConfigurationProperties, consumerFactory);
|
||||
org.mockito.BDDMockito.given(consumer.endOffsets(org.mockito.Matchers.anyCollectionOf(TopicPartition.class)))
|
||||
metrics = new KafkaBinderMetrics(binder, kafkaBinderConfigurationProperties, consumerFactory, null);
|
||||
org.mockito.BDDMockito.given(consumer.endOffsets(ArgumentMatchers.anyCollection()))
|
||||
.willReturn(java.util.Collections.singletonMap(new TopicPartition(TEST_TOPIC, 0), 1000L));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldIndicateLag() {
|
||||
org.mockito.BDDMockito.given(consumer.committed(org.mockito.Matchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
|
||||
org.mockito.BDDMockito.given(consumer.committed(ArgumentMatchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
|
||||
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
|
||||
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
|
||||
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
metrics.bindTo(meterRegistry);
|
||||
assertThat(meterRegistry.getMeters()).hasSize(1);
|
||||
Search group = meterRegistry.find(String.format("%s.%s.%s.lag", KafkaBinderMetrics.METRIC_PREFIX, "group", TEST_TOPIC));
|
||||
assertThat(group.gauge().value()).isEqualTo(500.0);
|
||||
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge()
|
||||
.value(TimeUnit.MILLISECONDS)).isEqualTo(500.0);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -92,15 +93,15 @@ public class KafkaBinderMetricsTest {
|
||||
Map<TopicPartition, Long> endOffsets = new HashMap<>();
|
||||
endOffsets.put(new TopicPartition(TEST_TOPIC, 0), 1000L);
|
||||
endOffsets.put(new TopicPartition(TEST_TOPIC, 1), 1000L);
|
||||
org.mockito.BDDMockito.given(consumer.endOffsets(org.mockito.Matchers.anyCollectionOf(TopicPartition.class))).willReturn(endOffsets);
|
||||
org.mockito.BDDMockito.given(consumer.committed(org.mockito.Matchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
|
||||
org.mockito.BDDMockito.given(consumer.endOffsets(ArgumentMatchers.anyCollection())).willReturn(endOffsets);
|
||||
org.mockito.BDDMockito.given(consumer.committed(ArgumentMatchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
|
||||
List<PartitionInfo> partitions = partitions(new Node(0, null, 0), new Node(0, null, 0));
|
||||
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
|
||||
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
metrics.bindTo(meterRegistry);
|
||||
assertThat(meterRegistry.getMeters()).hasSize(1);
|
||||
Search group = meterRegistry.find(String.format("%s.%s.%s.lag", KafkaBinderMetrics.METRIC_PREFIX, "group", TEST_TOPIC));
|
||||
assertThat(group.gauge().value()).isEqualTo(1000.0);
|
||||
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge()
|
||||
.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -110,8 +111,8 @@ public class KafkaBinderMetricsTest {
|
||||
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
metrics.bindTo(meterRegistry);
|
||||
assertThat(meterRegistry.getMeters()).hasSize(1);
|
||||
Search group = meterRegistry.find(String.format("%s.%s.%s.lag", KafkaBinderMetrics.METRIC_PREFIX, "group", TEST_TOPIC));
|
||||
assertThat(group.gauge().value()).isEqualTo(1000.0);
|
||||
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge()
|
||||
.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -100,6 +100,8 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
|
||||
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
|
||||
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
@@ -1095,6 +1097,11 @@ public class KafkaBinderTests extends
|
||||
"testManualAckIsNotPossibleWhenAutoCommitOffsetIsEnabledOnTheBinder", "test", moduleInputChannel,
|
||||
consumerProperties);
|
||||
|
||||
|
||||
AbstractMessageListenerContainer<?, ?> container = TestUtils.getPropertyValue(consumerBinding,
|
||||
"lifecycle.messageListenerContainer", AbstractMessageListenerContainer.class);
|
||||
assertThat(container.getContainerProperties().getAckMode()).isEqualTo(AckMode.BATCH);
|
||||
|
||||
String testPayload1 = "foo" + UUID.randomUUID().toString();
|
||||
Message<?> message1 = org.springframework.integration.support.MessageBuilder.withPayload(
|
||||
testPayload1.getBytes()).build();
|
||||
@@ -1131,12 +1138,17 @@ public class KafkaBinderTests extends
|
||||
QueueChannel inbound1 = new QueueChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
consumerProperties.getExtension().setAckEachRecord(true);
|
||||
Binding<MessageChannel> consumerBinding1 = binder.bindConsumer(testDestination, "test1", inbound1,
|
||||
consumerProperties);
|
||||
QueueChannel inbound2 = new QueueChannel();
|
||||
Binding<MessageChannel> consumerBinding2 = binder.bindConsumer(testDestination, "test2", inbound2,
|
||||
consumerProperties);
|
||||
|
||||
AbstractMessageListenerContainer<?, ?> container = TestUtils.getPropertyValue(consumerBinding2,
|
||||
"lifecycle.messageListenerContainer", AbstractMessageListenerContainer.class);
|
||||
assertThat(container.getContainerProperties().getAckMode()).isEqualTo(AckMode.RECORD);
|
||||
|
||||
Message<?> receivedMessage1 = receive(inbound1);
|
||||
assertThat(receivedMessage1).isNotNull();
|
||||
assertThat(new String((byte[]) receivedMessage1.getPayload(), StandardCharsets.UTF_8)).isEqualTo(testPayload);
|
||||
|
||||
@@ -17,10 +17,20 @@
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.common.PartitionInfo;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||
@@ -28,9 +38,24 @@ import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
|
||||
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
|
||||
import org.springframework.context.support.GenericApplicationContext;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.test.util.TestUtils;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||
import static org.mockito.ArgumentMatchers.anyInt;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.BDDMockito.given;
|
||||
import static org.mockito.BDDMockito.willAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
/**
|
||||
* @author Gary Russell
|
||||
@@ -76,4 +101,134 @@ public class KafkaBinderUnitTests {
|
||||
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOffsetResetWithGroupManagementEarliest() throws Exception {
|
||||
testOffsetResetWithGroupManagement(true, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOffsetResetWithGroupManagementLatest() throws Throwable {
|
||||
testOffsetResetWithGroupManagement(false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOffsetResetWithManualAssignmentEarliest() throws Exception {
|
||||
testOffsetResetWithGroupManagement(true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOffsetResetWithGroupManualAssignmentLatest() throws Throwable {
|
||||
testOffsetResetWithGroupManagement(false, false);
|
||||
}
|
||||
|
||||
private void testOffsetResetWithGroupManagement(final boolean earliest, boolean groupManage) throws Exception {
|
||||
final List<TopicPartition> partitions = new ArrayList<>();
|
||||
partitions.add(new TopicPartition("foo", 0));
|
||||
partitions.add(new TopicPartition("foo", 1));
|
||||
KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties();
|
||||
KafkaTopicProvisioner provisioningProvider = mock(KafkaTopicProvisioner.class);
|
||||
ConsumerDestination dest = mock(ConsumerDestination.class);
|
||||
given(dest.getName()).willReturn("foo");
|
||||
given(provisioningProvider.provisionConsumerDestination(anyString(), anyString(), any())).willReturn(dest);
|
||||
final AtomicInteger part = new AtomicInteger();
|
||||
willAnswer(i -> {
|
||||
return partitions.stream()
|
||||
.map(p -> new PartitionInfo("foo", part.getAndIncrement(), null, null, null))
|
||||
.collect(Collectors.toList());
|
||||
}).given(provisioningProvider).getPartitionsForTopic(anyInt(), anyBoolean(), any());
|
||||
@SuppressWarnings("unchecked")
|
||||
final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
willAnswer(i -> {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
return new ConsumerRecords<>(Collections.emptyMap());
|
||||
}).given(consumer).poll(anyLong());
|
||||
willAnswer(i -> {
|
||||
((org.apache.kafka.clients.consumer.ConsumerRebalanceListener) i.getArgument(1))
|
||||
.onPartitionsAssigned(partitions);
|
||||
latch.countDown();
|
||||
latch.countDown();
|
||||
return null;
|
||||
}).given(consumer).subscribe(eq(Collections.singletonList("foo")),
|
||||
any(org.apache.kafka.clients.consumer.ConsumerRebalanceListener.class));
|
||||
willAnswer(i -> {
|
||||
latch.countDown();
|
||||
return null;
|
||||
}).given(consumer).seek(any(), anyLong());
|
||||
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties, provisioningProvider) {
|
||||
|
||||
@Override
|
||||
protected ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
|
||||
|
||||
return new ConsumerFactory<byte[], byte[]>() {
|
||||
|
||||
@Override
|
||||
public Consumer<byte[], byte[]> createConsumer() {
|
||||
return consumer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Consumer<byte[], byte[]> createConsumer(String arg0) {
|
||||
return consumer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Consumer<byte[], byte[]> createConsumer(String arg0, String arg1) {
|
||||
return consumer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAutoCommit() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getConfigurationProperties() {
|
||||
return Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
|
||||
earliest ? "earliest" : "latest");
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
};
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
binder.setApplicationContext(context);
|
||||
MessageChannel channel = new DirectChannel();
|
||||
KafkaConsumerProperties extension = new KafkaConsumerProperties();
|
||||
extension.setResetOffsets(true);
|
||||
extension.setAutoRebalanceEnabled(groupManage);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<KafkaConsumerProperties>(
|
||||
extension);
|
||||
consumerProperties.setInstanceCount(1);
|
||||
binder.bindConsumer("foo", "bar", channel, consumerProperties);
|
||||
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
|
||||
if (groupManage) {
|
||||
if (earliest) {
|
||||
verify(consumer).seekToBeginning(partitions);
|
||||
}
|
||||
else {
|
||||
verify(consumer).seekToEnd(partitions);
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (earliest) {
|
||||
verify(consumer).seek(partitions.get(0), 0L);
|
||||
verify(consumer).seek(partitions.get(1), 0L);
|
||||
}
|
||||
else {
|
||||
verify(consumer).seek(partitions.get(0), Long.MAX_VALUE);
|
||||
verify(consumer).seek(partitions.get(1), Long.MAX_VALUE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,111 @@
|
||||
/*
|
||||
* Copyright 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.integration;
|
||||
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.binder.MeterBinder;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.test.context.FilteredClassLoader;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
import org.springframework.cloud.stream.messaging.Sink;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.test.rule.KafkaEmbedded;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* @author Artem Bilan
|
||||
* @author Oleg Zhurakousky
|
||||
* @author Jon Schneider
|
||||
*
|
||||
* @since 2.0
|
||||
*/
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE,
|
||||
properties = "spring.cloud.stream.bindings.input.group=" + KafkaBinderActuatorTests.TEST_CONSUMER_GROUP)
|
||||
public class KafkaBinderActuatorTests {
|
||||
|
||||
static final String TEST_CONSUMER_GROUP = "testGroup";
|
||||
|
||||
private static final String KAFKA_BROKERS_PROPERTY = "spring.kafka.bootstrap-servers";
|
||||
|
||||
@ClassRule
|
||||
public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true);
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
System.setProperty(KAFKA_BROKERS_PROPERTY, kafkaEmbedded.getBrokersAsString());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void clean() {
|
||||
System.clearProperty(KAFKA_BROKERS_PROPERTY);
|
||||
}
|
||||
|
||||
@Autowired
|
||||
private MeterRegistry meterRegistry;
|
||||
|
||||
@Autowired
|
||||
private KafkaTemplate<?, byte[]> kafkaTemplate;
|
||||
|
||||
@Test
|
||||
public void testKafkaBinderMetricsExposed() {
|
||||
this.kafkaTemplate.send(Sink.INPUT, null, "foo".getBytes());
|
||||
this.kafkaTemplate.flush();
|
||||
|
||||
assertThat(this.meterRegistry.get("spring.cloud.stream.binder.kafka.offset")
|
||||
.tag("group", TEST_CONSUMER_GROUP)
|
||||
.tag("topic", Sink.INPUT)
|
||||
.timeGauge().value()).isGreaterThan(0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKafkaBinderMetricsWhenNoMicrometer() {
|
||||
new ApplicationContextRunner()
|
||||
.withUserConfiguration(KafkaMetricsTestConfig.class)
|
||||
.withClassLoader(new FilteredClassLoader("io.micrometer.core"))
|
||||
.run(context -> {
|
||||
assertThat(context.getBeanNamesForType(MeterRegistry.class)).isEmpty();
|
||||
assertThat(context.getBeanNamesForType(MeterBinder.class)).isEmpty();
|
||||
});
|
||||
}
|
||||
|
||||
@EnableBinding(Sink.class)
|
||||
@EnableAutoConfiguration
|
||||
public static class KafkaMetricsTestConfig {
|
||||
|
||||
@StreamListener(Sink.INPUT)
|
||||
public void process(String payload) throws InterruptedException {
|
||||
// Artificial slow listener to emulate consumer lag
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user