Compare commits
16 Commits
v2.0.0.RC2
...
v2.0.0.RC3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b8267ea81e | ||
|
|
2b595b004f | ||
|
|
de45edc962 | ||
|
|
2406fe5237 | ||
|
|
b5a0013e1e | ||
|
|
c814ad5595 | ||
|
|
def2c3d0ed | ||
|
|
10a44d1e44 | ||
|
|
0de078ca48 | ||
|
|
8035e25359 | ||
|
|
bcf15ed3be | ||
|
|
d37ef750ad | ||
|
|
d8baca5a66 | ||
|
|
b9d7f1f537 | ||
|
|
ad819ece92 | ||
|
|
e254968eaf |
4
pom.xml
4
pom.xml
@@ -2,7 +2,7 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.0.0.RC2</version>
|
||||
<version>2.0.0.RC3</version>
|
||||
<packaging>pom</packaging>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
@@ -15,7 +15,7 @@
|
||||
<spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
|
||||
<spring-integration-kafka.version>3.0.3.RELEASE</spring-integration-kafka.version>
|
||||
<kafka.version>1.0.0</kafka.version>
|
||||
<spring-cloud-stream.version>2.0.0.RC2</spring-cloud-stream.version>
|
||||
<spring-cloud-stream.version>2.0.0.RC3</spring-cloud-stream.version>
|
||||
</properties>
|
||||
<modules>
|
||||
<module>spring-cloud-stream-binder-kafka</module>
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.0.0.RC2</version>
|
||||
<version>2.0.0.RC3</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<description>Spring Cloud Starter Stream Kafka</description>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.0.0.RC2</version>
|
||||
<version>2.0.0.RC3</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder Core</description>
|
||||
|
||||
@@ -0,0 +1,62 @@
|
||||
/*
|
||||
* Copyright 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.properties;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Properties for configuring topics.
|
||||
*
|
||||
* @author Gary Russell
|
||||
* @since 2.0
|
||||
*
|
||||
*/
|
||||
public class KafkaAdminProperties {
|
||||
|
||||
private Short replicationFactor;
|
||||
|
||||
private Map<Integer, List<Integer>> replicasAssignments = new HashMap<>();
|
||||
|
||||
private Map<String, String> configuration = new HashMap<>();
|
||||
|
||||
public Short getReplicationFactor() {
|
||||
return this.replicationFactor;
|
||||
}
|
||||
|
||||
public void setReplicationFactor(Short replicationFactor) {
|
||||
this.replicationFactor = replicationFactor;
|
||||
}
|
||||
|
||||
public Map<Integer, List<Integer>> getReplicasAssignments() {
|
||||
return this.replicasAssignments;
|
||||
}
|
||||
|
||||
public void setReplicasAssignments(Map<Integer, List<Integer>> replicasAssignments) {
|
||||
this.replicasAssignments = replicasAssignments;
|
||||
}
|
||||
|
||||
public Map<String, String> getConfiguration() {
|
||||
return this.configuration;
|
||||
}
|
||||
|
||||
public void setConfiguration(Map<String, String> configuration) {
|
||||
this.configuration = configuration;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2015-2017 the original author or authors.
|
||||
* Copyright 2015-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.boot.context.properties.DeprecatedConfigurationProperty;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
@@ -45,7 +46,7 @@ public class KafkaBinderConfigurationProperties {
|
||||
|
||||
private final Transaction transaction = new Transaction();
|
||||
|
||||
@Autowired(required = false)
|
||||
@Autowired
|
||||
private KafkaProperties kafkaProperties;
|
||||
|
||||
private String[] zkNodes = new String[] { "localhost" };
|
||||
@@ -86,7 +87,7 @@ public class KafkaBinderConfigurationProperties {
|
||||
|
||||
private String requiredAcks = "1";
|
||||
|
||||
private int replicationFactor = 1;
|
||||
private short replicationFactor = 1;
|
||||
|
||||
private int fetchSize = 1024 * 1024;
|
||||
|
||||
@@ -110,6 +111,13 @@ public class KafkaBinderConfigurationProperties {
|
||||
return this.transaction;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @return the connection String
|
||||
* @deprecated connection to zookeeper is no longer necessary
|
||||
*/
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
@Deprecated
|
||||
public String getZkConnectionString() {
|
||||
return toConnectionString(this.zkNodes, this.defaultZkPort);
|
||||
}
|
||||
@@ -126,26 +134,68 @@ public class KafkaBinderConfigurationProperties {
|
||||
return this.headers;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @return the window.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public int getOffsetUpdateTimeWindow() {
|
||||
return this.offsetUpdateTimeWindow;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @return the count.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public int getOffsetUpdateCount() {
|
||||
return this.offsetUpdateCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @return the timeout.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public int getOffsetUpdateShutdownTimeout() {
|
||||
return this.offsetUpdateShutdownTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Zookeeper nodes.
|
||||
* @return the nodes.
|
||||
* @deprecated connection to zookeeper is no longer necessary
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
|
||||
public String[] getZkNodes() {
|
||||
return this.zkNodes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Zookeeper nodes.
|
||||
* @param zkNodes the nodes.
|
||||
* @deprecated connection to zookeeper is no longer necessary
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
|
||||
public void setZkNodes(String... zkNodes) {
|
||||
this.zkNodes = zkNodes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Zookeeper port.
|
||||
* @param defaultZkPort the port.
|
||||
* @deprecated connection to zookeeper is no longer necessary
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
|
||||
public void setDefaultZkPort(String defaultZkPort) {
|
||||
this.defaultZkPort = defaultZkPort;
|
||||
}
|
||||
@@ -166,30 +216,79 @@ public class KafkaBinderConfigurationProperties {
|
||||
this.headers = headers;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @param offsetUpdateTimeWindow the window.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public void setOffsetUpdateTimeWindow(int offsetUpdateTimeWindow) {
|
||||
this.offsetUpdateTimeWindow = offsetUpdateTimeWindow;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @param offsetUpdateCount the count.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public void setOffsetUpdateCount(int offsetUpdateCount) {
|
||||
this.offsetUpdateCount = offsetUpdateCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @param offsetUpdateShutdownTimeout the timeout.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public void setOffsetUpdateShutdownTimeout(int offsetUpdateShutdownTimeout) {
|
||||
this.offsetUpdateShutdownTimeout = offsetUpdateShutdownTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Zookeeper session timeout.
|
||||
* @return the timeout.
|
||||
* @deprecated connection to zookeeper is no longer necessary
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
|
||||
public int getZkSessionTimeout() {
|
||||
return this.zkSessionTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Zookeeper session timeout.
|
||||
* @param zkSessionTimeout the timout
|
||||
* @deprecated connection to zookeeper is no longer necessary
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
|
||||
public void setZkSessionTimeout(int zkSessionTimeout) {
|
||||
this.zkSessionTimeout = zkSessionTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Zookeeper connection timeout.
|
||||
* @return the timout.
|
||||
* @deprecated connection to zookeeper is no longer necessary
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
|
||||
public int getZkConnectionTimeout() {
|
||||
return this.zkConnectionTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* Zookeeper connection timeout.
|
||||
* @param zkConnectionTimeout the timeout.
|
||||
* @deprecated connection to zookeeper is no longer necessary
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
|
||||
public void setZkConnectionTimeout(int zkConnectionTimeout) {
|
||||
this.zkConnectionTimeout = zkConnectionTimeout;
|
||||
}
|
||||
@@ -212,10 +311,24 @@ public class KafkaBinderConfigurationProperties {
|
||||
return StringUtils.arrayToCommaDelimitedString(fullyFormattedHosts);
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @return the wait.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public int getMaxWait() {
|
||||
return this.maxWait;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer user.
|
||||
* @param maxWait the wait.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public void setMaxWait(int maxWait) {
|
||||
this.maxWait = maxWait;
|
||||
}
|
||||
@@ -232,18 +345,32 @@ public class KafkaBinderConfigurationProperties {
|
||||
this.requiredAcks = requiredAcks;
|
||||
}
|
||||
|
||||
public int getReplicationFactor() {
|
||||
public short getReplicationFactor() {
|
||||
return this.replicationFactor;
|
||||
}
|
||||
|
||||
public void setReplicationFactor(int replicationFactor) {
|
||||
public void setReplicationFactor(short replicationFactor) {
|
||||
this.replicationFactor = replicationFactor;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @return the size.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public int getFetchSize() {
|
||||
return this.fetchSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @param fetchSize the size.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public void setFetchSize(int fetchSize) {
|
||||
this.fetchSize = fetchSize;
|
||||
}
|
||||
@@ -264,10 +391,24 @@ public class KafkaBinderConfigurationProperties {
|
||||
this.healthTimeout = healthTimeout;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @return the queue size.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public int getQueueSize() {
|
||||
return this.queueSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @param queueSize the queue size.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
|
||||
public void setQueueSize(int queueSize) {
|
||||
this.queueSize = queueSize;
|
||||
}
|
||||
@@ -288,10 +429,26 @@ public class KafkaBinderConfigurationProperties {
|
||||
this.autoAddPartitions = autoAddPartitions;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used; set properties such as this via {@link #getConfiguration()
|
||||
* configuration}.
|
||||
* @return the size.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0, set properties such as this via 'configuration'")
|
||||
public int getSocketBufferSize() {
|
||||
return this.socketBufferSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used; set properties such as this via {@link #getConfiguration()
|
||||
* configuration}.
|
||||
* @param socketBufferSize the size.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
@DeprecatedConfigurationProperty(reason = "Not used since 2.0, set properties such as this via 'configuration'")
|
||||
public void setSocketBufferSize(int socketBufferSize) {
|
||||
this.socketBufferSize = socketBufferSize;
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2016-2017 the original author or authors.
|
||||
* Copyright 2016-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -53,6 +53,8 @@ public class KafkaConsumerProperties {
|
||||
both
|
||||
}
|
||||
|
||||
private boolean ackEachRecord;
|
||||
|
||||
private boolean autoRebalanceEnabled = true;
|
||||
|
||||
private boolean autoCommitOffset = true;
|
||||
@@ -61,6 +63,8 @@ public class KafkaConsumerProperties {
|
||||
|
||||
private StartOffset startOffset;
|
||||
|
||||
private boolean resetOffsets;
|
||||
|
||||
private boolean enableDlq;
|
||||
|
||||
private String dlqName;
|
||||
@@ -79,6 +83,16 @@ public class KafkaConsumerProperties {
|
||||
|
||||
private Map<String, String> configuration = new HashMap<>();
|
||||
|
||||
private KafkaAdminProperties admin = new KafkaAdminProperties();
|
||||
|
||||
public boolean isAckEachRecord() {
|
||||
return this.ackEachRecord;
|
||||
}
|
||||
|
||||
public void setAckEachRecord(boolean ackEachRecord) {
|
||||
this.ackEachRecord = ackEachRecord;
|
||||
}
|
||||
|
||||
public boolean isAutoCommitOffset() {
|
||||
return this.autoCommitOffset;
|
||||
}
|
||||
@@ -95,6 +109,14 @@ public class KafkaConsumerProperties {
|
||||
this.startOffset = startOffset;
|
||||
}
|
||||
|
||||
public boolean isResetOffsets() {
|
||||
return this.resetOffsets;
|
||||
}
|
||||
|
||||
public void setResetOffsets(boolean resetOffsets) {
|
||||
this.resetOffsets = resetOffsets;
|
||||
}
|
||||
|
||||
public boolean isEnableDlq() {
|
||||
return this.enableDlq;
|
||||
}
|
||||
@@ -111,10 +133,22 @@ public class KafkaConsumerProperties {
|
||||
this.autoCommitOnError = autoCommitOnError;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @return the interval.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
public int getRecoveryInterval() {
|
||||
return this.recoveryInterval;
|
||||
}
|
||||
|
||||
/**
|
||||
* No longer used.
|
||||
* @param recoveryInterval the interval.
|
||||
* @deprecated
|
||||
*/
|
||||
@Deprecated
|
||||
public void setRecoveryInterval(int recoveryInterval) {
|
||||
this.recoveryInterval = recoveryInterval;
|
||||
}
|
||||
@@ -182,4 +216,12 @@ public class KafkaConsumerProperties {
|
||||
this.idleEventInterval = idleEventInterval;
|
||||
}
|
||||
|
||||
public KafkaAdminProperties getAdmin() {
|
||||
return this.admin;
|
||||
}
|
||||
|
||||
public void setAdmin(KafkaAdminProperties admin) {
|
||||
this.admin = admin;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -44,6 +44,8 @@ public class KafkaProducerProperties {
|
||||
|
||||
private Map<String, String> configuration = new HashMap<>();
|
||||
|
||||
private KafkaAdminProperties admin = new KafkaAdminProperties();
|
||||
|
||||
public int getBufferSize() {
|
||||
return this.bufferSize;
|
||||
}
|
||||
@@ -101,6 +103,15 @@ public class KafkaProducerProperties {
|
||||
this.configuration = configuration;
|
||||
}
|
||||
|
||||
public KafkaAdminProperties getAdmin() {
|
||||
return this.admin;
|
||||
}
|
||||
|
||||
public void setAdmin(KafkaAdminProperties admin) {
|
||||
this.admin = admin;
|
||||
}
|
||||
|
||||
|
||||
public enum CompressionType {
|
||||
none,
|
||||
gzip,
|
||||
|
||||
@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka.provisioning;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
@@ -44,6 +45,7 @@ import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||
import org.springframework.cloud.stream.binder.BinderException;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaAdminProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
|
||||
@@ -67,6 +69,7 @@ import org.springframework.util.StringUtils;
|
||||
* @author Gary Russell
|
||||
* @author Ilayaperumal Gopinathan
|
||||
* @author Simon Flandergan
|
||||
* @author Oleg Zhurakousky
|
||||
*/
|
||||
public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsumerProperties<KafkaConsumerProperties>,
|
||||
ExtendedProducerProperties<KafkaProducerProperties>>, InitializingBean {
|
||||
@@ -77,19 +80,18 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
|
||||
private final KafkaBinderConfigurationProperties configurationProperties;
|
||||
|
||||
private final AdminClient adminClient;
|
||||
private final int operationTimeout = DEFAULT_OPERATION_TIMEOUT;
|
||||
|
||||
private final Map<String, Object> adminClientProperties;
|
||||
|
||||
private RetryOperations metadataRetryOperations;
|
||||
|
||||
private final int operationTimeout = DEFAULT_OPERATION_TIMEOUT;
|
||||
|
||||
public KafkaTopicProvisioner(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
|
||||
KafkaProperties kafkaProperties) {
|
||||
Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null");
|
||||
Map<String, Object> adminClientProperties = kafkaProperties.buildAdminProperties();
|
||||
this.adminClientProperties = kafkaProperties.buildAdminProperties();
|
||||
this.configurationProperties = kafkaBinderConfigurationProperties;
|
||||
normalalizeBootPropsWithBinder(adminClientProperties, kafkaProperties, kafkaBinderConfigurationProperties);
|
||||
this.adminClient = AdminClient.create(adminClientProperties);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -118,33 +120,38 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProducerDestination provisionProducerDestination(final String name, ExtendedProducerProperties<KafkaProducerProperties> properties) {
|
||||
public ProducerDestination provisionProducerDestination(final String name,
|
||||
ExtendedProducerProperties<KafkaProducerProperties> properties) {
|
||||
|
||||
if (this.logger.isInfoEnabled()) {
|
||||
this.logger.info("Using kafka topic for outbound: " + name);
|
||||
}
|
||||
KafkaTopicUtils.validateTopicName(name);
|
||||
createTopic(name, properties.getPartitionCount(), false);
|
||||
if (this.configurationProperties.isAutoCreateTopics() && adminClient != null) {
|
||||
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(name));
|
||||
KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.all();
|
||||
try (AdminClient adminClient = AdminClient.create(this.adminClientProperties)) {
|
||||
createTopic(adminClient, name, properties.getPartitionCount(), false, properties.getExtension().getAdmin());
|
||||
int partitions = 0;
|
||||
if (this.configurationProperties.isAutoCreateTopics()) {
|
||||
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(name));
|
||||
KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.all();
|
||||
|
||||
try {
|
||||
Map<String, TopicDescription> topicDescriptions = all.get(operationTimeout, TimeUnit.SECONDS);
|
||||
Map<String, TopicDescription> topicDescriptions = null;
|
||||
try {
|
||||
topicDescriptions = all.get(this.operationTimeout, TimeUnit.SECONDS);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new ProvisioningException("Problems encountered with partitions finding", e);
|
||||
}
|
||||
TopicDescription topicDescription = topicDescriptions.get(name);
|
||||
int partitions = topicDescription.partitions().size();
|
||||
return new KafkaProducerDestination(name, partitions);
|
||||
partitions = topicDescription.partitions().size();
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new ProvisioningException("Problems encountered with partitions finding", e);
|
||||
}
|
||||
}
|
||||
else {
|
||||
return new KafkaProducerDestination(name);
|
||||
return new KafkaProducerDestination(name, partitions);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConsumerDestination provisionConsumerDestination(final String name, final String group, ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
|
||||
public ConsumerDestination provisionConsumerDestination(final String name, final String group,
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
|
||||
|
||||
KafkaTopicUtils.validateTopicName(name);
|
||||
boolean anonymous = !StringUtils.hasText(group);
|
||||
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
|
||||
@@ -153,25 +160,32 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
throw new IllegalArgumentException("Instance count cannot be zero");
|
||||
}
|
||||
int partitionCount = properties.getInstanceCount() * properties.getConcurrency();
|
||||
createTopic(name, partitionCount, properties.getExtension().isAutoRebalanceEnabled());
|
||||
if (this.configurationProperties.isAutoCreateTopics() && adminClient != null) {
|
||||
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(name));
|
||||
KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.all();
|
||||
try {
|
||||
Map<String, TopicDescription> topicDescriptions = all.get(operationTimeout, TimeUnit.SECONDS);
|
||||
TopicDescription topicDescription = topicDescriptions.get(name);
|
||||
int partitions = topicDescription.partitions().size();
|
||||
ConsumerDestination dlqTopic = createDlqIfNeedBe(name, group, properties, anonymous, partitions);
|
||||
if (dlqTopic != null) {
|
||||
return dlqTopic;
|
||||
ConsumerDestination consumerDestination = new KafkaConsumerDestination(name);
|
||||
try (AdminClient adminClient = createAdminClient()) {
|
||||
createTopic(adminClient, name, partitionCount, properties.getExtension().isAutoRebalanceEnabled(),
|
||||
properties.getExtension().getAdmin());
|
||||
if (this.configurationProperties.isAutoCreateTopics()) {
|
||||
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(name));
|
||||
KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.all();
|
||||
try {
|
||||
Map<String, TopicDescription> topicDescriptions = all.get(operationTimeout, TimeUnit.SECONDS);
|
||||
TopicDescription topicDescription = topicDescriptions.get(name);
|
||||
int partitions = topicDescription.partitions().size();
|
||||
consumerDestination = createDlqIfNeedBe(adminClient, name, group, properties, anonymous, partitions);
|
||||
if (consumerDestination == null) {
|
||||
consumerDestination = new KafkaConsumerDestination(name, partitions);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new ProvisioningException("provisioning exception", e);
|
||||
}
|
||||
return new KafkaConsumerDestination(name, partitions);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new ProvisioningException("provisioning exception", e);
|
||||
}
|
||||
}
|
||||
return new KafkaConsumerDestination(name);
|
||||
return consumerDestination;
|
||||
}
|
||||
|
||||
AdminClient createAdminClient() {
|
||||
return AdminClient.create(this.adminClientProperties);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -209,14 +223,15 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
});
|
||||
}
|
||||
|
||||
private ConsumerDestination createDlqIfNeedBe(String name, String group,
|
||||
private ConsumerDestination createDlqIfNeedBe(AdminClient adminClient, String name, String group,
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> properties,
|
||||
boolean anonymous, int partitions) {
|
||||
if (properties.getExtension().isEnableDlq() && !anonymous) {
|
||||
String dlqTopic = StringUtils.hasText(properties.getExtension().getDlqName()) ?
|
||||
properties.getExtension().getDlqName() : "error." + name + "." + group;
|
||||
try {
|
||||
createTopicAndPartitions(dlqTopic, partitions, properties.getExtension().isAutoRebalanceEnabled());
|
||||
createTopicAndPartitions(adminClient, dlqTopic, partitions,
|
||||
properties.getExtension().isAutoRebalanceEnabled(), properties.getExtension().getAdmin());
|
||||
}
|
||||
catch (Throwable throwable) {
|
||||
if (throwable instanceof Error) {
|
||||
@@ -231,9 +246,10 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
return null;
|
||||
}
|
||||
|
||||
private void createTopic(String name, int partitionCount, boolean tolerateLowerPartitionsOnBroker) {
|
||||
private void createTopic(AdminClient adminClient, String name, int partitionCount, boolean tolerateLowerPartitionsOnBroker,
|
||||
KafkaAdminProperties properties) {
|
||||
try {
|
||||
createTopicIfNecessary(name, partitionCount, tolerateLowerPartitionsOnBroker);
|
||||
createTopicIfNecessary(adminClient, name, partitionCount, tolerateLowerPartitionsOnBroker, properties);
|
||||
}
|
||||
catch (Throwable throwable) {
|
||||
if (throwable instanceof Error) {
|
||||
@@ -245,16 +261,14 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
}
|
||||
}
|
||||
|
||||
private void createTopicIfNecessary(final String topicName, final int partitionCount,
|
||||
boolean tolerateLowerPartitionsOnBroker) throws Throwable {
|
||||
if (this.configurationProperties.isAutoCreateTopics() && adminClient != null) {
|
||||
createTopicAndPartitions(topicName, partitionCount, tolerateLowerPartitionsOnBroker);
|
||||
private void createTopicIfNecessary(AdminClient adminClient, final String topicName, final int partitionCount,
|
||||
boolean tolerateLowerPartitionsOnBroker, KafkaAdminProperties properties) throws Throwable {
|
||||
|
||||
if (this.configurationProperties.isAutoCreateTopics()) {
|
||||
createTopicAndPartitions(adminClient, topicName, partitionCount, tolerateLowerPartitionsOnBroker,
|
||||
properties);
|
||||
}
|
||||
else if (this.configurationProperties.isAutoCreateTopics() && adminClient == null) {
|
||||
this.logger.warn("Auto creation of topics is enabled, but Kafka AdminUtils class is not present on the classpath. " +
|
||||
"No topic will be created by the binder");
|
||||
}
|
||||
else if (!this.configurationProperties.isAutoCreateTopics()) {
|
||||
else {
|
||||
this.logger.info("Auto creation of topics is disabled.");
|
||||
}
|
||||
}
|
||||
@@ -262,9 +276,12 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
/**
|
||||
* Creates a Kafka topic if needed, or try to increase its partition count to the
|
||||
* desired number.
|
||||
* @param adminClient
|
||||
* @param adminProperties
|
||||
*/
|
||||
private void createTopicAndPartitions(final String topicName, final int partitionCount,
|
||||
boolean tolerateLowerPartitionsOnBroker) throws Throwable {
|
||||
private void createTopicAndPartitions(AdminClient adminClient, final String topicName, final int partitionCount,
|
||||
boolean tolerateLowerPartitionsOnBroker, KafkaAdminProperties adminProperties) throws Throwable {
|
||||
|
||||
ListTopicsResult listTopicsResult = adminClient.listTopics();
|
||||
KafkaFuture<Set<String>> namesFutures = listTopicsResult.names();
|
||||
|
||||
@@ -298,14 +315,26 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (!names.contains(topicName)) {
|
||||
else {
|
||||
// always consider minPartitionCount for topic creation
|
||||
final int effectivePartitionCount = Math.max(this.configurationProperties.getMinPartitionCount(),
|
||||
partitionCount);
|
||||
this.metadataRetryOperations.execute(context -> {
|
||||
|
||||
NewTopic newTopic = new NewTopic(topicName, effectivePartitionCount,
|
||||
(short) configurationProperties.getReplicationFactor());
|
||||
NewTopic newTopic;
|
||||
Map<Integer, List<Integer>> replicasAssignments = adminProperties.getReplicasAssignments();
|
||||
if (replicasAssignments != null && replicasAssignments.size() > 0) {
|
||||
newTopic = new NewTopic(topicName, adminProperties.getReplicasAssignments());
|
||||
}
|
||||
else {
|
||||
newTopic = new NewTopic(topicName, effectivePartitionCount,
|
||||
adminProperties.getReplicationFactor() != null
|
||||
? adminProperties.getReplicationFactor()
|
||||
: configurationProperties.getReplicationFactor());
|
||||
}
|
||||
if (adminProperties.getConfiguration().size() > 0) {
|
||||
newTopic.configs(adminProperties.getConfiguration());
|
||||
}
|
||||
CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));
|
||||
try {
|
||||
createTopicsResult.all().get(operationTimeout, TimeUnit.SECONDS);
|
||||
@@ -318,6 +347,10 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
logger.warn("Attempt to create topic: " + topicName + ". Topic already exists.");
|
||||
}
|
||||
}
|
||||
else {
|
||||
logger.error("Failed to create topics", e.getCause());
|
||||
throw e.getCause();
|
||||
}
|
||||
}
|
||||
else {
|
||||
logger.error("Failed to create topics", e.getCause());
|
||||
@@ -365,10 +398,6 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
|
||||
private final int partitions;
|
||||
|
||||
KafkaProducerDestination(String destinationName) {
|
||||
this(destinationName, 0);
|
||||
}
|
||||
|
||||
KafkaProducerDestination(String destinationName, Integer partitions) {
|
||||
this.producerDestinationName = destinationName;
|
||||
this.partitions = partitions;
|
||||
@@ -420,10 +449,6 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
|
||||
return this.consumerDestinationName;
|
||||
}
|
||||
|
||||
public String getDlqName() {
|
||||
return dlqName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "KafkaConsumerDestination{" +
|
||||
|
||||
@@ -55,10 +55,11 @@ public class KafkaTopicProvisionerTests {
|
||||
binderConfig.getConfiguration().put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ts.getFile().getAbsolutePath());
|
||||
binderConfig.setBrokers("localhost:9092");
|
||||
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig, bootConfig);
|
||||
AdminClient adminClient = KafkaTestUtils.getPropertyValue(provisioner, "adminClient", AdminClient.class);
|
||||
AdminClient adminClient = provisioner.createAdminClient();
|
||||
assertThat(KafkaTestUtils.getPropertyValue(adminClient, "client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class);
|
||||
Map configs = KafkaTestUtils.getPropertyValue(adminClient, "client.selector.channelBuilder.configs", Map.class);
|
||||
assertThat(((List) configs.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)).get(0)).isEqualTo("localhost:1234");
|
||||
adminClient.close();
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@@ -73,13 +74,13 @@ public class KafkaTopicProvisionerTests {
|
||||
binderConfig.getConfiguration().put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ts.getFile().getAbsolutePath());
|
||||
binderConfig.setBrokers("localhost:1234");
|
||||
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig, bootConfig);
|
||||
AdminClient adminClient = KafkaTestUtils.getPropertyValue(provisioner, "adminClient", AdminClient.class);
|
||||
AdminClient adminClient = provisioner.createAdminClient();
|
||||
assertThat(KafkaTestUtils.getPropertyValue(adminClient, "client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class);
|
||||
Map configs = KafkaTestUtils.getPropertyValue(adminClient, "client.selector.channelBuilder.configs", Map.class);
|
||||
assertThat(((List) configs.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)).get(0)).isEqualTo("localhost:1234");
|
||||
adminClient.close();
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Test
|
||||
public void brokersInvalid() throws Exception {
|
||||
KafkaProperties bootConfig = new KafkaProperties();
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.0.0.RC2</version>
|
||||
<version>2.0.0.RC3</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>
|
||||
|
||||
@@ -7,7 +7,7 @@ In addition, this guide also explains the Kafka Streams binding capabilities of
|
||||
|
||||
== Usage
|
||||
|
||||
For using the Apache Kafka binder, you just need to add it to your Spring Cloud Stream application, using the following Maven coordinates:
|
||||
To use Apache Kafka binder all you need is to add `spring-cloud-stream-binder-kafka` as a dependency to your Spring Cloud Stream application. Below is a Maven example:
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
@@ -38,6 +38,11 @@ The Apache Kafka Binder implementation maps each destination to an Apache Kafka
|
||||
The consumer group maps directly to the same Apache Kafka concept.
|
||||
Partitioning also maps directly to Apache Kafka partitions as well.
|
||||
|
||||
The binder currently uses the Apache Kafka `kafka-clients` 1.0.0 jar and is designed to be used with a broker at least that version.
|
||||
This client can communicate with older brokers (refer to the Kafka documentation), but certain features may not be available.
|
||||
For example, with versions earlier than 0.11.x.x, native headers are not supported.
|
||||
Also, 0.11.x.x does not support the `autoAddPartitions` property.
|
||||
|
||||
== Configuration Options
|
||||
|
||||
This section contains the configuration options used by the Apache Kafka binder.
|
||||
@@ -55,42 +60,24 @@ spring.cloud.stream.kafka.binder.defaultBrokerPort::
|
||||
This sets the default port when no port is configured in the broker list.
|
||||
+
|
||||
Default: `9092`.
|
||||
spring.cloud.stream.kafka.binder.zkNodes::
|
||||
A list of ZooKeeper nodes to which the Kafka binder can connect.
|
||||
+
|
||||
Default: `localhost`.
|
||||
spring.cloud.stream.kafka.binder.defaultZkPort::
|
||||
`zkNodes` allows hosts specified with or without port information (e.g., `host1,host2:port2`).
|
||||
This sets the default port when no port is configured in the node list.
|
||||
+
|
||||
Default: `2181`.
|
||||
spring.cloud.stream.kafka.binder.configuration::
|
||||
Key/Value map of client properties (both producers and consumer) passed to all clients created by the binder.
|
||||
Due to the fact that these properties will be used by both producers and consumers, usage should be restricted to common properties, especially security settings.
|
||||
Due to the fact that these properties will be used by both producers and consumers, usage should be restricted to common properties, for example, security settings.
|
||||
+
|
||||
Default: Empty map.
|
||||
spring.cloud.stream.kafka.binder.headers::
|
||||
The list of custom headers that will be transported by the binder.
|
||||
Only required when communicating with older applications (<= 1.3.x) with a `kafka-clients` version < 0.11.0.0; newer versions support headers natively.
|
||||
+
|
||||
Default: empty.
|
||||
spring.cloud.stream.kafka.binder.healthTimeout::
|
||||
The time to wait to get partition information in seconds; default 60.
|
||||
Health will report as down if this timer expires.
|
||||
Health will report as down if this timer expires.
|
||||
+
|
||||
Default: 10.
|
||||
spring.cloud.stream.kafka.binder.offsetUpdateTimeWindow::
|
||||
The frequency, in milliseconds, with which offsets are saved.
|
||||
Ignored if `0`.
|
||||
+
|
||||
Default: `10000`.
|
||||
spring.cloud.stream.kafka.binder.offsetUpdateCount::
|
||||
The frequency, in number of updates, which which consumed offsets are persisted.
|
||||
Ignored if `0`.
|
||||
Mutually exclusive with `offsetUpdateTimeWindow`.
|
||||
+
|
||||
Default: `0`.
|
||||
spring.cloud.stream.kafka.binder.requiredAcks::
|
||||
The number of required acks on the broker.
|
||||
See the Kafka documentation for the producer `acks` property.
|
||||
+
|
||||
Default: `1`.
|
||||
spring.cloud.stream.kafka.binder.minPartitionCount::
|
||||
@@ -101,6 +88,7 @@ It can be superseded by the `partitionCount` setting of the producer or by the v
|
||||
Default: `1`.
|
||||
spring.cloud.stream.kafka.binder.replicationFactor::
|
||||
The replication factor of auto-created topics if `autoCreateTopics` is active.
|
||||
Can be overriden on each binding.
|
||||
+
|
||||
Default: `1`.
|
||||
spring.cloud.stream.kafka.binder.autoCreateTopics::
|
||||
@@ -116,10 +104,6 @@ If set to `false`, the binder will rely on the partition size of the topic being
|
||||
If the partition count of the target topic is smaller than the expected value, the binder will fail to start.
|
||||
+
|
||||
Default: `false`.
|
||||
spring.cloud.stream.kafka.binder.socketBufferSize::
|
||||
Size (in bytes) of the socket buffer to be used by the Kafka consumers.
|
||||
+
|
||||
Default: `2097152`.
|
||||
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix::
|
||||
Enable transactions in the binder; see `transaction.id` in the Kafka documentation and https://docs.spring.io/spring-kafka/reference/html/_reference.html#transactions[Transactions] in the `spring-kafka` documentation.
|
||||
When transactions are enabled, individual `producer` properties are ignored and all producers use the `spring.cloud.stream.kafka.binder.transaction.producer.*` properties.
|
||||
@@ -131,12 +115,37 @@ spring.cloud.stream.kafka.binder.transaction.producer.*::
|
||||
+
|
||||
Default: See individual producer properties.
|
||||
|
||||
spring.cloud.stream.kafka.binder.headerMapperBeanName::
|
||||
The bean name of a `KafkaHeaderMapper` used for mapping `spring-messaging` headers to/from Kafka headers.
|
||||
Use this, for example, if you wish to customize the trusted packages in a `DefaultKafkaHeaderMapper`, which uses JSON deserialization for the headers.
|
||||
+
|
||||
Default: none.
|
||||
|
||||
[[kafka-consumer-properties]]
|
||||
=== Kafka Consumer Properties
|
||||
|
||||
The following properties are available for Kafka consumers only and
|
||||
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.consumer.`.
|
||||
|
||||
admin.configuration::
|
||||
A `Map` of Kafka topic properties used when provisioning topics.
|
||||
e.g. `spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0`
|
||||
+
|
||||
Default: none.
|
||||
|
||||
admin.replicas-assignment::
|
||||
A Map<Integer, List<Integer>> of replica assignments, with the key being the partition and value the assignments.
|
||||
Used when provisioning new topics.
|
||||
See `NewTopic` javadocs in the `kafka-clients` jar.
|
||||
+
|
||||
Default: none.
|
||||
|
||||
admin.replication-factor::
|
||||
The replication factor to use when provisioning topics; overrides the binder-wide setting.
|
||||
Ignored if `replicas-assignments` is present.
|
||||
+
|
||||
Default: none (the binder-wide default of 1 is used).
|
||||
|
||||
autoRebalanceEnabled::
|
||||
When `true`, topic partitions will be automatically rebalanced between the members of a consumer group.
|
||||
When `false`, each consumer will be assigned a fixed set of partitions based on `spring.cloud.stream.instanceCount` and `spring.cloud.stream.instanceIndex`.
|
||||
@@ -144,12 +153,21 @@ This requires both `spring.cloud.stream.instanceCount` and `spring.cloud.stream.
|
||||
The property `spring.cloud.stream.instanceCount` must typically be greater than 1 in this case.
|
||||
+
|
||||
Default: `true`.
|
||||
ackEachRecord::
|
||||
When `autoCommitOffset` is `true`, whether to commit the offset after each record is processed.
|
||||
By default, offsets are committed after all records in the batch of records returned by `consumer.poll()` have been processed.
|
||||
The number of records returned by a poll can be controlled with the `max.poll.recods` Kafka property, set via the consumer `configuration` property.
|
||||
Setting this to true may cause a degradation in performance, but reduces the likelihood of redelivered records when a failure occurs.
|
||||
Also see the binder `requiredAcks` property, which also affects the performance of committing offsets.
|
||||
+
|
||||
Default: `false`.
|
||||
autoCommitOffset::
|
||||
Whether to autocommit offsets when a message has been processed.
|
||||
If set to `false`, a header with the key `kafka_acknowledgment` of the type `org.springframework.kafka.support.Acknowledgment` header will be present in the inbound message.
|
||||
Applications may use this header for acknowledging messages.
|
||||
See the examples section for details.
|
||||
When this property is set to `false`, Kafka binder will set the ack mode to `org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL`.
|
||||
When this property is set to `false`, Kafka binder will set the ack mode to `org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL` and the application is responsible for acknowledging records.
|
||||
Also see `ackEachRecord`.
|
||||
+
|
||||
Default: `true`.
|
||||
autoCommitOnError::
|
||||
@@ -159,14 +177,15 @@ If set to `true`, it will always auto-commit (if auto-commit is enabled).
|
||||
If not set (default), it effectively has the same value as `enableDlq`, auto-committing erroneous messages if they are sent to a DLQ, and not committing them otherwise.
|
||||
+
|
||||
Default: not set.
|
||||
recoveryInterval::
|
||||
The interval between connection recovery attempts, in milliseconds.
|
||||
resetOffsets::
|
||||
Whether to reset offsets on the consumer to the value provided by startOffset.
|
||||
+
|
||||
Default: `5000`.
|
||||
Default: `false`.
|
||||
startOffset::
|
||||
The starting offset for new groups.
|
||||
Allowed values: `earliest`, `latest`.
|
||||
If the consumer group is set explicitly for the consumer 'binding' (via `spring.cloud.stream.bindings.<channelName>.group`), then 'startOffset' is set to `earliest`; otherwise it is set to `latest` for the `anonymous` consumer group.
|
||||
Also see `resetOffsets`.
|
||||
+
|
||||
Default: null (equivalent to `earliest`).
|
||||
enableDlq::
|
||||
@@ -214,6 +233,25 @@ Default: `30000`
|
||||
The following properties are available for Kafka producers only and
|
||||
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.producer.`.
|
||||
|
||||
admin.configuration::
|
||||
A `Map` of Kafka topic properties used when provisioning new topics.
|
||||
e.g. `spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0`
|
||||
+
|
||||
Default: none.
|
||||
|
||||
admin.replicas-assignment::
|
||||
A Map<Integer, List<Integer>> of replica assignments, with the key being the partition and value the assignments.
|
||||
Used when provisioning new topics.
|
||||
See `NewTopic` javadocs in the `kafka-clients` jar.
|
||||
+
|
||||
Default: none.
|
||||
|
||||
admin.replication-factor::
|
||||
The replication factor to use when provisioning new topics; overrides the binder-wide setting.
|
||||
Ignored if `replicas-assignments` is present.
|
||||
+
|
||||
Default: none (the binder-wide default of 1 is used).
|
||||
|
||||
bufferSize::
|
||||
Upper limit, in bytes, of how much data the Kafka producer will attempt to batch before sending.
|
||||
+
|
||||
@@ -229,15 +267,15 @@ batchTimeout::
|
||||
Default: `0`.
|
||||
messageKeyExpression::
|
||||
A SpEL expression evaluated against the outgoing message used to populate the key of the produced Kafka message.
|
||||
For example `headers.key` or `payload.myKey`.
|
||||
For example `headers['myKey']`; the payload cannot be used because by the time this expression is evaluated, the payload is already in the form of a `byte[]`.
|
||||
+
|
||||
Default: `none`.
|
||||
headerPatterns::
|
||||
A comma-delimited list of simple patterns to match spring-messaging headers to be mapped to the kafka `Headers` in the `ProducerRecord`.
|
||||
Patterns can begin or end with the wildcard character (asterisk).
|
||||
Patterns can be negated by prefixing with `!`; matching stops after the first match (positive or negative).
|
||||
For example `!foo,fo*` will pass `fox` but not `foo`.
|
||||
`id` and `timestamp` are never mapped.
|
||||
Patterns can begin or end with the wildcard character (asterisk).
|
||||
Patterns can be negated by prefixing with `!`; matching stops after the first match (positive or negative).
|
||||
For example `!foo,fo*` will pass `fox` but not `foo`.
|
||||
`id` and `timestamp` are never mapped.
|
||||
+
|
||||
Default: `*` (all headers - except the `id` and `timestamp`)
|
||||
configuration::
|
||||
@@ -314,7 +352,6 @@ Here is an example of launching a Spring Cloud Stream application with SASL and
|
||||
----
|
||||
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
|
||||
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
|
||||
--spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \
|
||||
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
|
||||
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
|
||||
----
|
||||
@@ -343,7 +380,6 @@ Here is an example of launching a Spring Cloud Stream application with SASL and
|
||||
[source]
|
||||
----
|
||||
java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
|
||||
--spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \
|
||||
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
|
||||
--spring.cloud.stream.kafka.binder.autoCreateTopics=false \
|
||||
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
|
||||
@@ -366,7 +402,7 @@ KafkaClient {
|
||||
};
|
||||
----
|
||||
|
||||
If the topics required already exist on the broker, or will be created by an administrator, autocreation can be turned off and only client JAAS properties need to be sent. As an alternative to setting `spring.cloud.stream.kafka.binder.autoCreateTopics` you can simply remove the broker dependency from the application. See <<exclude-admin-utils>> for details.
|
||||
If the topics required already exist on the broker, or will be created by an administrator, autocreation can be turned off and only client JAAS properties need to be sent.
|
||||
|
||||
[NOTE]
|
||||
====
|
||||
@@ -421,103 +457,6 @@ public class Application {
|
||||
}
|
||||
----
|
||||
|
||||
|
||||
==== Using the binder with Apache Kafka 0.10
|
||||
|
||||
The default Kafka support in Spring Cloud Stream Kafka binder is for Kafka version 0.10.1.1. The binder also supports connecting to other 0.10 based versions and 0.9 clients.
|
||||
In order to do this, when you create the project that contains your application, include `spring-cloud-starter-stream-kafka` as you normally would do for the default binder.
|
||||
Then add these dependencies at the top of the `<dependencies>` section in the pom.xml file to override the dependencies.
|
||||
|
||||
Here is an example for downgrading your application to 0.10.0.1. Since it is still on the 0.10 line, the default `spring-kafka` and `spring-integration-kafka` versions can be retained.
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
<version>0.10.0.1</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>0.10.0.1</version>
|
||||
</dependency>
|
||||
----
|
||||
|
||||
Here is another example of using 0.9.0.1 version.
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<version>1.0.5.RELEASE</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.integration</groupId>
|
||||
<artifactId>spring-integration-kafka</artifactId>
|
||||
<version>2.0.1.RELEASE</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
<version>0.9.0.1</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>0.9.0.1</version>
|
||||
</dependency>
|
||||
|
||||
----
|
||||
|
||||
[NOTE]
|
||||
====
|
||||
The versions above are provided only for the sake of the example.
|
||||
For best results, we recommend using the most recent 0.10-compatible versions of the projects.
|
||||
====
|
||||
|
||||
[[exclude-admin-utils]]
|
||||
==== Excluding Kafka broker jar from the classpath of the binder based application
|
||||
|
||||
The Apache Kafka Binder uses the administrative utilities which are part of the Apache Kafka server library to create and reconfigure topics.
|
||||
If the inclusion of the Apache Kafka server library and its dependencies is not necessary at runtime because the application will rely on the topics being configured administratively, the Kafka binder allows for Apache Kafka server dependency to be excluded from the application.
|
||||
|
||||
If you use non default versions for Kafka dependencies as advised above, all you have to do is not to include the kafka broker dependency.
|
||||
If you use the default Kafka version, then ensure that you exclude the kafka broker jar from the `spring-cloud-starter-stream-kafka` dependency as following.
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
----
|
||||
|
||||
If you exclude the Apache Kafka server dependency and the topic is not present on the server, then the Apache Kafka broker will create the topic if auto topic creation is enabled on the server.
|
||||
Please keep in mind that if you are relying on this, then the Kafka server will use the default number of partitions and replication factors.
|
||||
On the other hand, if auto topic creation is disabled on the server, then care must be taken before running the application to create the topic with the desired number of partitions.
|
||||
|
||||
If you want to have full control over how partitions are allocated, then leave the default settings as they are, i.e. do not exclude the kafka broker jar and ensure that `spring.cloud.stream.kafka.binder.autoCreateTopics` is set to `true`, which is the default.
|
||||
|
||||
[[kafka-error-channels]]
|
||||
== Error Channels
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.0.0.RC2</version>
|
||||
<version>2.0.0.RC3</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.0.0.RC2</version>
|
||||
<version>2.0.0.RC3</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
||||
@@ -20,10 +20,11 @@ import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.TimeGauge;
|
||||
import io.micrometer.core.instrument.binder.MeterBinder;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
@@ -48,12 +49,13 @@ import org.springframework.util.ObjectUtils;
|
||||
* @author Soby Chacko
|
||||
* @author Artem Bilan
|
||||
* @author Oleg Zhurakousky
|
||||
* @author Jon Schneider
|
||||
*/
|
||||
public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<BindingCreatedEvent> {
|
||||
|
||||
private final static Log LOG = LogFactory.getLog(KafkaBinderMetrics.class);
|
||||
|
||||
static final String METRIC_PREFIX = "spring.cloud.stream.binder.kafka";
|
||||
static final String METRIC_NAME = "spring.cloud.stream.binder.kafka.offset";
|
||||
|
||||
private final KafkaMessageChannelBinder binder;
|
||||
|
||||
@@ -91,8 +93,12 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
|
||||
String topic = topicInfo.getKey();
|
||||
String group = topicInfo.getValue().getConsumerGroup();
|
||||
|
||||
registry.gauge(String.format("%s.%s.%s.lag", METRIC_PREFIX, group, topic), this,
|
||||
o -> calculateConsumerLagOnTopic(topic, group));
|
||||
TimeGauge.builder(METRIC_NAME, this, TimeUnit.MILLISECONDS,
|
||||
o -> calculateConsumerLagOnTopic(topic, group))
|
||||
.tag("group", group)
|
||||
.tag("topic", topic)
|
||||
.description("Consumer lag for a particular group and topic")
|
||||
.register(registry);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,6 +110,7 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
|
||||
for (PartitionInfo partitionInfo : partitionInfos) {
|
||||
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
|
||||
}
|
||||
|
||||
Map<TopicPartition, Long> endOffsets = metadataConsumer.endOffsets(topicPartitions);
|
||||
|
||||
for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
|
||||
|
||||
@@ -28,7 +28,9 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
@@ -80,7 +82,9 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
|
||||
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
|
||||
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
|
||||
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
|
||||
import org.springframework.kafka.listener.config.ContainerProperties;
|
||||
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
|
||||
import org.springframework.kafka.support.KafkaHeaderMapper;
|
||||
@@ -325,7 +329,8 @@ public class KafkaMessageChannelBinder extends
|
||||
|
||||
Collection<PartitionInfo> listenedPartitions;
|
||||
|
||||
if (extendedConsumerProperties.getExtension().isAutoRebalanceEnabled() ||
|
||||
boolean groupManagement = extendedConsumerProperties.getExtension().isAutoRebalanceEnabled();
|
||||
if (groupManagement ||
|
||||
extendedConsumerProperties.getInstanceCount() == 1) {
|
||||
listenedPartitions = allPartitions;
|
||||
}
|
||||
@@ -354,6 +359,7 @@ public class KafkaMessageChannelBinder extends
|
||||
}
|
||||
containerProperties.setIdleEventInterval(extendedConsumerProperties.getExtension().getIdleEventInterval());
|
||||
int concurrency = Math.min(extendedConsumerProperties.getConcurrency(), listenedPartitions.size());
|
||||
resetOffsets(extendedConsumerProperties, consumerFactory, groupManagement, containerProperties);
|
||||
@SuppressWarnings("rawtypes")
|
||||
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer =
|
||||
new ConcurrentMessageListenerContainer(consumerFactory, containerProperties) {
|
||||
@@ -366,8 +372,14 @@ public class KafkaMessageChannelBinder extends
|
||||
};
|
||||
messageListenerContainer.setConcurrency(concurrency);
|
||||
// these won't be needed if the container is made a bean
|
||||
messageListenerContainer.setApplicationEventPublisher(getApplicationContext());
|
||||
if (getApplicationEventPublisher() != null) {
|
||||
messageListenerContainer.setApplicationEventPublisher(getApplicationEventPublisher());
|
||||
}
|
||||
else if (getApplicationContext() != null) {
|
||||
messageListenerContainer.setApplicationEventPublisher(getApplicationContext());
|
||||
}
|
||||
messageListenerContainer.setBeanName(destination.getName() + ".container");
|
||||
// end of these won't be needed...
|
||||
if (!extendedConsumerProperties.getExtension().isAutoCommitOffset()) {
|
||||
messageListenerContainer.getContainerProperties()
|
||||
.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
|
||||
@@ -376,6 +388,9 @@ public class KafkaMessageChannelBinder extends
|
||||
else {
|
||||
messageListenerContainer.getContainerProperties()
|
||||
.setAckOnError(isAutoCommitOnError(extendedConsumerProperties));
|
||||
if (extendedConsumerProperties.getExtension().isAckEachRecord()) {
|
||||
messageListenerContainer.getContainerProperties().setAckMode(AckMode.RECORD);
|
||||
}
|
||||
}
|
||||
if (this.logger.isDebugEnabled()) {
|
||||
this.logger.debug(
|
||||
@@ -397,6 +412,58 @@ public class KafkaMessageChannelBinder extends
|
||||
return kafkaMessageDrivenChannelAdapter;
|
||||
}
|
||||
|
||||
/*
|
||||
* Reset the offsets if needed; may update the offsets in in the container's
|
||||
* topicPartitionInitialOffsets.
|
||||
*/
|
||||
private void resetOffsets(
|
||||
final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
|
||||
final ConsumerFactory<?, ?> consumerFactory, boolean groupManagement,
|
||||
final ContainerProperties containerProperties) {
|
||||
|
||||
boolean resetOffsets = extendedConsumerProperties.getExtension().isResetOffsets();
|
||||
final Object resetTo = consumerFactory.getConfigurationProperties().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
|
||||
final AtomicBoolean initialAssignment = new AtomicBoolean(true);
|
||||
if (!"earliest".equals(resetTo) && "!latest".equals(resetTo)) {
|
||||
logger.warn("no (or unknown) " + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
|
||||
" property cannot reset");
|
||||
resetOffsets = false;
|
||||
}
|
||||
if (groupManagement && resetOffsets) {
|
||||
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
|
||||
|
||||
@Override
|
||||
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
|
||||
// no op
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
|
||||
// no op
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
|
||||
if (initialAssignment.getAndSet(false)) {
|
||||
if ("earliest".equals(resetTo)) {
|
||||
consumer.seekToBeginning(tps);
|
||||
}
|
||||
else if ("latest".equals(resetTo)) {
|
||||
consumer.seekToEnd(tps);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
else if (resetOffsets) {
|
||||
Arrays.stream(containerProperties.getTopicPartitions())
|
||||
.map(tpio -> new TopicPartitionInitialOffset(tpio.topic(), tpio.partition(),
|
||||
// SK GH-599 "earliest".equals(resetTo) ? SeekPosition.BEGINNING : SeekPosition.END))
|
||||
"earliest".equals(resetTo) ? 0L : Long.MAX_VALUE))
|
||||
.collect(Collectors.toList()).toArray(containerProperties.getTopicPartitions());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PolledConsumerResources createPolledConsumerResources(String name, String group,
|
||||
ConsumerDestination destination, ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
|
||||
@@ -617,7 +684,7 @@ public class KafkaMessageChannelBinder extends
|
||||
}
|
||||
}
|
||||
|
||||
private ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
|
||||
protected ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2015-2017 the original author or authors.
|
||||
* Copyright 2015-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -17,40 +17,31 @@
|
||||
package org.springframework.cloud.stream.binder.kafka.config;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.binder.MeterBinder;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.JaasLoginModuleConfiguration;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Import;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
|
||||
import org.springframework.kafka.support.LoggingProducerListener;
|
||||
import org.springframework.kafka.support.ProducerListener;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
/**
|
||||
* @author David Turanski
|
||||
@@ -61,24 +52,20 @@ import org.springframework.util.ObjectUtils;
|
||||
* @author Henryk Konsek
|
||||
* @author Gary Russell
|
||||
* @author Oleg Zhurakousky
|
||||
* @author Artem Bilan
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnMissingBean(Binder.class)
|
||||
@Import({ PropertyPlaceholderAutoConfiguration.class})
|
||||
@Import({KafkaAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class, KafkaBinderHealthIndicatorConfiguration.class })
|
||||
@EnableConfigurationProperties({ KafkaExtendedBindingProperties.class })
|
||||
public class KafkaBinderConfiguration {
|
||||
|
||||
protected static final Log logger = LogFactory.getLog(KafkaBinderConfiguration.class);
|
||||
|
||||
@Autowired
|
||||
private KafkaExtendedBindingProperties kafkaExtendedBindingProperties;
|
||||
|
||||
@Autowired
|
||||
private ProducerListener producerListener;
|
||||
|
||||
@Autowired
|
||||
private ApplicationContext context;
|
||||
|
||||
@Autowired
|
||||
private KafkaProperties kafkaProperties;
|
||||
|
||||
@@ -94,7 +81,8 @@ public class KafkaBinderConfiguration {
|
||||
|
||||
@Bean
|
||||
KafkaMessageChannelBinder kafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
|
||||
KafkaTopicProvisioner provisioningProvider) {
|
||||
KafkaTopicProvisioner provisioningProvider) {
|
||||
|
||||
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
|
||||
configurationProperties, provisioningProvider);
|
||||
kafkaMessageChannelBinder.setProducerListener(producerListener);
|
||||
@@ -108,41 +96,37 @@ public class KafkaBinderConfiguration {
|
||||
return new LoggingProducerListener();
|
||||
}
|
||||
|
||||
@Bean
|
||||
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder,
|
||||
KafkaBinderConfigurationProperties configurationProperties) {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
if (!ObjectUtils.isEmpty(configurationProperties.getConsumerConfiguration())) {
|
||||
props.putAll(configurationProperties.getConsumerConfiguration());
|
||||
}
|
||||
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
|
||||
}
|
||||
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
|
||||
KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(kafkaMessageChannelBinder,
|
||||
consumerFactory);
|
||||
indicator.setTimeout(configurationProperties.getHealthTimeout());
|
||||
return indicator;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public MeterBinder kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder,
|
||||
KafkaBinderConfigurationProperties configurationProperties,
|
||||
@Nullable MeterRegistry meterRegistry) {
|
||||
return new KafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties, null, meterRegistry);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public KafkaJaasLoginModuleInitializer jaasInitializer() throws IOException {
|
||||
return new KafkaJaasLoginModuleInitializer();
|
||||
}
|
||||
|
||||
/**
|
||||
* A conditional configuration for the {@link KafkaBinderMetrics} bean when the
|
||||
* {@link MeterRegistry} class is in classpath, as well as a {@link MeterRegistry} bean is
|
||||
* present in the application context.
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnClass(MeterRegistry.class)
|
||||
@ConditionalOnBean(MeterRegistry.class)
|
||||
protected class KafkaBinderMetricsConfiguration {
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean(KafkaBinderMetrics.class)
|
||||
public MeterBinder kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder,
|
||||
KafkaBinderConfigurationProperties configurationProperties,
|
||||
MeterRegistry meterRegistry) {
|
||||
|
||||
return new KafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties, null, meterRegistry);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class JaasConfigurationProperties {
|
||||
|
||||
private JaasLoginModuleConfiguration kafka;
|
||||
|
||||
private JaasLoginModuleConfiguration zookeeper;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
/*
|
||||
* Copyright 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.config;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Oleg Zhurakousky
|
||||
*
|
||||
*/
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnClass(name="org.springframework.boot.actuate.health.HealthIndicator")
|
||||
class KafkaBinderHealthIndicatorConfiguration {
|
||||
|
||||
@Bean
|
||||
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder,
|
||||
KafkaBinderConfigurationProperties configurationProperties) {
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||
if (!ObjectUtils.isEmpty(configurationProperties.getConsumerConfiguration())) {
|
||||
props.putAll(configurationProperties.getConsumerConfiguration());
|
||||
}
|
||||
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
|
||||
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
|
||||
}
|
||||
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
|
||||
KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(kafkaMessageChannelBinder,
|
||||
consumerFactory);
|
||||
indicator.setTimeout(configurationProperties.getHealthTimeout());
|
||||
return indicator;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,64 @@
|
||||
/*
|
||||
* Copyright 2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaAdminProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
|
||||
import org.springframework.cloud.stream.config.BinderFactoryConfiguration;
|
||||
import org.springframework.cloud.stream.config.BindingServiceConfiguration;
|
||||
import org.springframework.integration.config.EnableIntegration;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* @author Gary Russell
|
||||
* @since 2.0
|
||||
*
|
||||
*/
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(classes = {KafkaBinderConfiguration.class,
|
||||
BinderFactoryConfiguration.class,
|
||||
BindingServiceConfiguration.class })
|
||||
@TestPropertySource(properties = {
|
||||
"spring.cloud.stream.kafka.bindings.input.consumer.admin.replication-factor=2",
|
||||
"spring.cloud.stream.kafka.bindings.input.consumer.admin.replicas-assignments.0=0,1",
|
||||
"spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0" })
|
||||
@EnableIntegration
|
||||
public class AdminConfigTests {
|
||||
|
||||
@Autowired
|
||||
private KafkaMessageChannelBinder binder;
|
||||
|
||||
@Test
|
||||
public void testProps() {
|
||||
KafkaConsumerProperties consumerProps = this.binder.getExtendedConsumerProperties("input");
|
||||
KafkaAdminProperties admin = consumerProps.getAdmin();
|
||||
assertThat(admin.getReplicationFactor()).isEqualTo((short) 2);
|
||||
assertThat(admin.getReplicasAssignments().get(0)).isEqualTo(Arrays.asList(0, 1));
|
||||
assertThat(admin.getConfiguration().get("message.format.version")).isEqualTo("0.9.0.0");
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2016-2017 the original author or authors.
|
||||
* Copyright 2016-2018 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -28,14 +28,12 @@ import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
@@ -50,8 +48,7 @@ import static org.junit.Assert.assertTrue;
|
||||
* @author Ilayaperumal Gopinathan
|
||||
*/
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@SpringBootTest(classes = { KafkaBinderAutoConfigurationPropertiesTest.KafkaBinderConfigProperties.class,
|
||||
KafkaBinderConfiguration.class })
|
||||
@SpringBootTest(classes = {KafkaBinderConfiguration.class })
|
||||
@TestPropertySource(locations = "classpath:binder-config-autoconfig.properties")
|
||||
public class KafkaBinderAutoConfigurationPropertiesTest {
|
||||
|
||||
@@ -124,13 +121,4 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
|
||||
bootstrapServers.add("10.98.09.196:9092");
|
||||
assertTrue(((List<String>) configs.get("bootstrap.servers")).containsAll(bootstrapServers));
|
||||
}
|
||||
|
||||
public static class KafkaBinderConfigProperties {
|
||||
|
||||
@Bean
|
||||
KafkaProperties kafkaProperties() {
|
||||
return new KafkaProperties();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,9 +20,9 @@ import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.search.Search;
|
||||
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
|
||||
import org.apache.kafka.clients.consumer.KafkaConsumer;
|
||||
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
|
||||
@@ -31,6 +31,7 @@ import org.apache.kafka.common.PartitionInfo;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentMatchers;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
@@ -71,20 +72,20 @@ public class KafkaBinderMetricsTest {
|
||||
org.mockito.BDDMockito.given(consumerFactory.createConsumer()).willReturn(consumer);
|
||||
org.mockito.BDDMockito.given(binder.getTopicsInUse()).willReturn(topicsInUse);
|
||||
metrics = new KafkaBinderMetrics(binder, kafkaBinderConfigurationProperties, consumerFactory, null);
|
||||
org.mockito.BDDMockito.given(consumer.endOffsets(org.mockito.Matchers.anyCollectionOf(TopicPartition.class)))
|
||||
org.mockito.BDDMockito.given(consumer.endOffsets(ArgumentMatchers.anyCollection()))
|
||||
.willReturn(java.util.Collections.singletonMap(new TopicPartition(TEST_TOPIC, 0), 1000L));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldIndicateLag() {
|
||||
org.mockito.BDDMockito.given(consumer.committed(org.mockito.Matchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
|
||||
org.mockito.BDDMockito.given(consumer.committed(ArgumentMatchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
|
||||
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
|
||||
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
|
||||
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
metrics.bindTo(meterRegistry);
|
||||
assertThat(meterRegistry.getMeters()).hasSize(1);
|
||||
Search group = meterRegistry.find(String.format("%s.%s.%s.lag", KafkaBinderMetrics.METRIC_PREFIX, "group", TEST_TOPIC));
|
||||
assertThat(group.gauge().value()).isEqualTo(500.0);
|
||||
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge()
|
||||
.value(TimeUnit.MILLISECONDS)).isEqualTo(500.0);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -92,15 +93,15 @@ public class KafkaBinderMetricsTest {
|
||||
Map<TopicPartition, Long> endOffsets = new HashMap<>();
|
||||
endOffsets.put(new TopicPartition(TEST_TOPIC, 0), 1000L);
|
||||
endOffsets.put(new TopicPartition(TEST_TOPIC, 1), 1000L);
|
||||
org.mockito.BDDMockito.given(consumer.endOffsets(org.mockito.Matchers.anyCollectionOf(TopicPartition.class))).willReturn(endOffsets);
|
||||
org.mockito.BDDMockito.given(consumer.committed(org.mockito.Matchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
|
||||
org.mockito.BDDMockito.given(consumer.endOffsets(ArgumentMatchers.anyCollection())).willReturn(endOffsets);
|
||||
org.mockito.BDDMockito.given(consumer.committed(ArgumentMatchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
|
||||
List<PartitionInfo> partitions = partitions(new Node(0, null, 0), new Node(0, null, 0));
|
||||
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
|
||||
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
metrics.bindTo(meterRegistry);
|
||||
assertThat(meterRegistry.getMeters()).hasSize(1);
|
||||
Search group = meterRegistry.find(String.format("%s.%s.%s.lag", KafkaBinderMetrics.METRIC_PREFIX, "group", TEST_TOPIC));
|
||||
assertThat(group.gauge().value()).isEqualTo(1000.0);
|
||||
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge()
|
||||
.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -110,8 +111,8 @@ public class KafkaBinderMetricsTest {
|
||||
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
|
||||
metrics.bindTo(meterRegistry);
|
||||
assertThat(meterRegistry.getMeters()).hasSize(1);
|
||||
Search group = meterRegistry.find(String.format("%s.%s.%s.lag", KafkaBinderMetrics.METRIC_PREFIX, "group", TEST_TOPIC));
|
||||
assertThat(group.gauge().value()).isEqualTo(1000.0);
|
||||
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge()
|
||||
.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -100,6 +100,8 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||
import org.springframework.kafka.core.KafkaTemplate;
|
||||
import org.springframework.kafka.core.ProducerFactory;
|
||||
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
|
||||
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
|
||||
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
|
||||
import org.springframework.kafka.support.Acknowledgment;
|
||||
import org.springframework.kafka.support.KafkaHeaders;
|
||||
@@ -1095,6 +1097,11 @@ public class KafkaBinderTests extends
|
||||
"testManualAckIsNotPossibleWhenAutoCommitOffsetIsEnabledOnTheBinder", "test", moduleInputChannel,
|
||||
consumerProperties);
|
||||
|
||||
|
||||
AbstractMessageListenerContainer<?, ?> container = TestUtils.getPropertyValue(consumerBinding,
|
||||
"lifecycle.messageListenerContainer", AbstractMessageListenerContainer.class);
|
||||
assertThat(container.getContainerProperties().getAckMode()).isEqualTo(AckMode.BATCH);
|
||||
|
||||
String testPayload1 = "foo" + UUID.randomUUID().toString();
|
||||
Message<?> message1 = org.springframework.integration.support.MessageBuilder.withPayload(
|
||||
testPayload1.getBytes()).build();
|
||||
@@ -1131,12 +1138,17 @@ public class KafkaBinderTests extends
|
||||
QueueChannel inbound1 = new QueueChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
|
||||
consumerProperties.getExtension().setAckEachRecord(true);
|
||||
Binding<MessageChannel> consumerBinding1 = binder.bindConsumer(testDestination, "test1", inbound1,
|
||||
consumerProperties);
|
||||
QueueChannel inbound2 = new QueueChannel();
|
||||
Binding<MessageChannel> consumerBinding2 = binder.bindConsumer(testDestination, "test2", inbound2,
|
||||
consumerProperties);
|
||||
|
||||
AbstractMessageListenerContainer<?, ?> container = TestUtils.getPropertyValue(consumerBinding2,
|
||||
"lifecycle.messageListenerContainer", AbstractMessageListenerContainer.class);
|
||||
assertThat(container.getContainerProperties().getAckMode()).isEqualTo(AckMode.RECORD);
|
||||
|
||||
Message<?> receivedMessage1 = receive(inbound1);
|
||||
assertThat(receivedMessage1).isNotNull();
|
||||
assertThat(new String((byte[]) receivedMessage1.getPayload(), StandardCharsets.UTF_8)).isEqualTo(testPayload);
|
||||
|
||||
@@ -17,10 +17,20 @@
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.kafka.clients.consumer.Consumer;
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecords;
|
||||
import org.apache.kafka.common.PartitionInfo;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||
@@ -28,9 +38,24 @@ import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
|
||||
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
|
||||
import org.springframework.context.support.GenericApplicationContext;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.test.util.TestUtils;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyBoolean;
|
||||
import static org.mockito.ArgumentMatchers.anyInt;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.BDDMockito.given;
|
||||
import static org.mockito.BDDMockito.willAnswer;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
/**
|
||||
* @author Gary Russell
|
||||
@@ -76,4 +101,134 @@ public class KafkaBinderUnitTests {
|
||||
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOffsetResetWithGroupManagementEarliest() throws Exception {
|
||||
testOffsetResetWithGroupManagement(true, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOffsetResetWithGroupManagementLatest() throws Throwable {
|
||||
testOffsetResetWithGroupManagement(false, true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOffsetResetWithManualAssignmentEarliest() throws Exception {
|
||||
testOffsetResetWithGroupManagement(true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOffsetResetWithGroupManualAssignmentLatest() throws Throwable {
|
||||
testOffsetResetWithGroupManagement(false, false);
|
||||
}
|
||||
|
||||
private void testOffsetResetWithGroupManagement(final boolean earliest, boolean groupManage) throws Exception {
|
||||
final List<TopicPartition> partitions = new ArrayList<>();
|
||||
partitions.add(new TopicPartition("foo", 0));
|
||||
partitions.add(new TopicPartition("foo", 1));
|
||||
KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties();
|
||||
KafkaTopicProvisioner provisioningProvider = mock(KafkaTopicProvisioner.class);
|
||||
ConsumerDestination dest = mock(ConsumerDestination.class);
|
||||
given(dest.getName()).willReturn("foo");
|
||||
given(provisioningProvider.provisionConsumerDestination(anyString(), anyString(), any())).willReturn(dest);
|
||||
final AtomicInteger part = new AtomicInteger();
|
||||
willAnswer(i -> {
|
||||
return partitions.stream()
|
||||
.map(p -> new PartitionInfo("foo", part.getAndIncrement(), null, null, null))
|
||||
.collect(Collectors.toList());
|
||||
}).given(provisioningProvider).getPartitionsForTopic(anyInt(), anyBoolean(), any());
|
||||
@SuppressWarnings("unchecked")
|
||||
final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
willAnswer(i -> {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
return new ConsumerRecords<>(Collections.emptyMap());
|
||||
}).given(consumer).poll(anyLong());
|
||||
willAnswer(i -> {
|
||||
((org.apache.kafka.clients.consumer.ConsumerRebalanceListener) i.getArgument(1))
|
||||
.onPartitionsAssigned(partitions);
|
||||
latch.countDown();
|
||||
latch.countDown();
|
||||
return null;
|
||||
}).given(consumer).subscribe(eq(Collections.singletonList("foo")),
|
||||
any(org.apache.kafka.clients.consumer.ConsumerRebalanceListener.class));
|
||||
willAnswer(i -> {
|
||||
latch.countDown();
|
||||
return null;
|
||||
}).given(consumer).seek(any(), anyLong());
|
||||
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties, provisioningProvider) {
|
||||
|
||||
@Override
|
||||
protected ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
|
||||
|
||||
return new ConsumerFactory<byte[], byte[]>() {
|
||||
|
||||
@Override
|
||||
public Consumer<byte[], byte[]> createConsumer() {
|
||||
return consumer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Consumer<byte[], byte[]> createConsumer(String arg0) {
|
||||
return consumer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Consumer<byte[], byte[]> createConsumer(String arg0, String arg1) {
|
||||
return consumer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAutoCommit() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getConfigurationProperties() {
|
||||
return Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
|
||||
earliest ? "earliest" : "latest");
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
|
||||
};
|
||||
GenericApplicationContext context = new GenericApplicationContext();
|
||||
context.refresh();
|
||||
binder.setApplicationContext(context);
|
||||
MessageChannel channel = new DirectChannel();
|
||||
KafkaConsumerProperties extension = new KafkaConsumerProperties();
|
||||
extension.setResetOffsets(true);
|
||||
extension.setAutoRebalanceEnabled(groupManage);
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<KafkaConsumerProperties>(
|
||||
extension);
|
||||
consumerProperties.setInstanceCount(1);
|
||||
binder.bindConsumer("foo", "bar", channel, consumerProperties);
|
||||
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
|
||||
if (groupManage) {
|
||||
if (earliest) {
|
||||
verify(consumer).seekToBeginning(partitions);
|
||||
}
|
||||
else {
|
||||
verify(consumer).seekToEnd(partitions);
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (earliest) {
|
||||
verify(consumer).seek(partitions.get(0), 0L);
|
||||
verify(consumer).seek(partitions.get(1), 0L);
|
||||
}
|
||||
else {
|
||||
verify(consumer).seek(partitions.get(0), Long.MAX_VALUE);
|
||||
verify(consumer).seek(partitions.get(1), Long.MAX_VALUE);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
package org.springframework.cloud.stream.binder.kafka.integration;
|
||||
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.search.Search;
|
||||
import io.micrometer.core.instrument.binder.MeterBinder;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
@@ -27,7 +27,9 @@ import org.junit.runner.RunWith;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.test.context.FilteredClassLoader;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
import org.springframework.cloud.stream.messaging.Sink;
|
||||
@@ -40,12 +42,12 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||
/**
|
||||
* @author Artem Bilan
|
||||
* @author Oleg Zhurakousky
|
||||
* @author Jon Schneider
|
||||
*
|
||||
* @since 2.0
|
||||
*/
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(
|
||||
webEnvironment = SpringBootTest.WebEnvironment.NONE,
|
||||
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE,
|
||||
properties = "spring.cloud.stream.bindings.input.group=" + KafkaBinderActuatorTests.TEST_CONSUMER_GROUP)
|
||||
public class KafkaBinderActuatorTests {
|
||||
|
||||
@@ -74,15 +76,24 @@ public class KafkaBinderActuatorTests {
|
||||
|
||||
@Test
|
||||
public void testKafkaBinderMetricsExposed() {
|
||||
Search search = this.meterRegistry.find(
|
||||
String.format("%s.%s.%s.lag", "spring.cloud.stream.binder.kafka", TEST_CONSUMER_GROUP, Sink.INPUT));
|
||||
|
||||
assertThat(search.gauge()).isNotNull();
|
||||
|
||||
this.kafkaTemplate.send(Sink.INPUT, null, "foo".getBytes());
|
||||
this.kafkaTemplate.flush();
|
||||
|
||||
assertThat(search.gauge().value()).isGreaterThan(0);
|
||||
assertThat(this.meterRegistry.get("spring.cloud.stream.binder.kafka.offset")
|
||||
.tag("group", TEST_CONSUMER_GROUP)
|
||||
.tag("topic", Sink.INPUT)
|
||||
.timeGauge().value()).isGreaterThan(0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKafkaBinderMetricsWhenNoMicrometer() {
|
||||
new ApplicationContextRunner()
|
||||
.withUserConfiguration(KafkaMetricsTestConfig.class)
|
||||
.withClassLoader(new FilteredClassLoader("io.micrometer.core"))
|
||||
.run(context -> {
|
||||
assertThat(context.getBeanNamesForType(MeterRegistry.class)).isEmpty();
|
||||
assertThat(context.getBeanNamesForType(MeterBinder.class)).isEmpty();
|
||||
});
|
||||
}
|
||||
|
||||
@EnableBinding(Sink.class)
|
||||
|
||||
Reference in New Issue
Block a user