Compare commits

..

16 Commits

Author SHA1 Message Date
Soby Chacko
b8267ea81e 2.0.0.RC3 Release 2018-03-12 15:23:09 -04:00
Gary Russell
2b595b004f GH-337: Add ackEachRecord property
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/337
Resolves #338
2018-03-10 11:02:50 -05:00
Gary Russell
de45edc962 Add missing binder headerMapperBeanName to doc. 2018-03-08 11:46:03 -05:00
Gary Russell
2406fe5237 Event Publisher Polishing
Now that the abstract binder makes its event publisher available to subclasses,
use it, if present, instead of the application context.
In most cases, they will be the same object, but the user might override the
publisher.

Resolves #336
2018-03-08 09:23:14 -05:00
Oleg Zhurakousky
b5a0013e1e GH-330 Polishing
Resolves #330
Resolves #334
2018-03-08 09:11:11 -05:00
Gary Russell
c814ad5595 GH-330: Kafka Topic Provisioning Improvements
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/330

- allow override of binder-wide `replicationFactor` for each binding
- allow specific partition/replica configuration
- allow setting `NewTopic.configs()` properties, similar to the consumer and producer
- use a new `AdminClient` for provisioning (and `close()` it) instead of keeping a long-lived connection open.
2018-03-08 09:10:27 -05:00
Oleg Zhurakousky
def2c3d0ed SCST-GH-1259 Polishing
- added '@DeprecatedConfigurationProperty'
- minor doc polishing

Resolves #335
2018-03-07 16:04:44 -05:00
Gary Russell
10a44d1e44 SCST-GH-1259: Kafka Binder Doc Polishing
Fixes https://github.com/spring-cloud/spring-cloud-stream/issues/1259

Also deprecate properties that are no longer used.

Missed a save.
2018-03-07 16:03:36 -05:00
Oleg Zhurakousky
0de078ca48 GH-326 added KafkaAutoConfiguration to KafkaBinderConfiguration
- added KafkaAutoConfiguration to the @Import of KafkaBinderConfiguration
- removed 'optional' flag for KafkaProperties from KafkaBinderConfigurationProperties
- fixed KafkaBinderAutoConfigurationPropertiesTest

Resolves #326
Resolves #333
2018-03-06 13:30:41 -05:00
Gary Russell
8035e25359 GH-67: Workaround for SK GH-599
Fixes #67

Spring Kafka currently doesn't support `TPIO.SeekPosition` for initial offsets.
Instead, use 0 and `Long.MAX_VALUE` for `BEGINNING` and `END` respectively.

Resolves #331
2018-03-06 13:16:53 -05:00
Artem Bilan
bcf15ed3be GH-328: Make KafkaBinderMetrics bean conditional
Fixes: spring-cloud/spring-cloud-stream-binder-kafka#328

Since we consider a Micrometer dependency as an optional, it would be
better do not expose beans which depends of that library

* Move `KafkaBinderMetrics` to its own `@Configuration` class with
appropriate conditions on the classpath and beans presence
* Add an `ApplicationContextRunner`-based test-case to achieve a
condition when Micrometer is not in classpath via `FilteredClassLoader`
hook

Resolves #328
Resolves #332
2018-03-06 12:40:16 -05:00
Gary Russell
d37ef750ad GH-67: Reinstate resetOffsets property
Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/67

Currently only supported with group management (`autoRebalanceEnabled` - default true).

See https://github.com/spring-projects/spring-kafka/issues/599

Resolves #67
Resolves #329
2018-03-05 20:50:17 -05:00
Oleg Zhurakousky
d8baca5a66 Polishing
Resolves #325
2018-03-05 20:03:24 -05:00
Jon Schneider
b9d7f1f537 Change KafkaBinderMetrics to a tagged TimeGauge 2018-03-05 19:56:12 -05:00
Oleg Zhurakousky
ad819ece92 GH-322 added @Conditional for 'KafkaBinderHealthIndicator' bean 2018-03-02 09:08:39 -05:00
Soby Chacko
e254968eaf Next version: 2.0.0.BUILD-SNAPSHOT 2018-03-01 09:30:11 -05:00
23 changed files with 891 additions and 302 deletions

View File

@@ -2,7 +2,7 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RC2</version>
<version>2.0.0.RC3</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
@@ -15,7 +15,7 @@
<spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>3.0.3.RELEASE</spring-integration-kafka.version>
<kafka.version>1.0.0</kafka.version>
<spring-cloud-stream.version>2.0.0.RC2</spring-cloud-stream.version>
<spring-cloud-stream.version>2.0.0.RC3</spring-cloud-stream.version>
</properties>
<modules>
<module>spring-cloud-stream-binder-kafka</module>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RC2</version>
<version>2.0.0.RC3</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RC2</version>
<version>2.0.0.RC3</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>

View File

@@ -0,0 +1,62 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.properties;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Properties for configuring topics.
*
* @author Gary Russell
* @since 2.0
*
*/
public class KafkaAdminProperties {
private Short replicationFactor;
private Map<Integer, List<Integer>> replicasAssignments = new HashMap<>();
private Map<String, String> configuration = new HashMap<>();
public Short getReplicationFactor() {
return this.replicationFactor;
}
public void setReplicationFactor(Short replicationFactor) {
this.replicationFactor = replicationFactor;
}
public Map<Integer, List<Integer>> getReplicasAssignments() {
return this.replicasAssignments;
}
public void setReplicasAssignments(Map<Integer, List<Integer>> replicasAssignments) {
this.replicasAssignments = replicasAssignments;
}
public Map<String, String> getConfiguration() {
return this.configuration;
}
public void setConfiguration(Map<String, String> configuration) {
this.configuration = configuration;
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015-2017 the original author or authors.
* Copyright 2015-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.DeprecatedConfigurationProperty;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
@@ -45,7 +46,7 @@ public class KafkaBinderConfigurationProperties {
private final Transaction transaction = new Transaction();
@Autowired(required = false)
@Autowired
private KafkaProperties kafkaProperties;
private String[] zkNodes = new String[] { "localhost" };
@@ -86,7 +87,7 @@ public class KafkaBinderConfigurationProperties {
private String requiredAcks = "1";
private int replicationFactor = 1;
private short replicationFactor = 1;
private int fetchSize = 1024 * 1024;
@@ -110,6 +111,13 @@ public class KafkaBinderConfigurationProperties {
return this.transaction;
}
/**
* No longer used.
* @return the connection String
* @deprecated connection to zookeeper is no longer necessary
*/
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
@Deprecated
public String getZkConnectionString() {
return toConnectionString(this.zkNodes, this.defaultZkPort);
}
@@ -126,26 +134,68 @@ public class KafkaBinderConfigurationProperties {
return this.headers;
}
/**
* No longer used.
* @return the window.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public int getOffsetUpdateTimeWindow() {
return this.offsetUpdateTimeWindow;
}
/**
* No longer used.
* @return the count.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public int getOffsetUpdateCount() {
return this.offsetUpdateCount;
}
/**
* No longer used.
* @return the timeout.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public int getOffsetUpdateShutdownTimeout() {
return this.offsetUpdateShutdownTimeout;
}
/**
* Zookeeper nodes.
* @return the nodes.
* @deprecated connection to zookeeper is no longer necessary
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
public String[] getZkNodes() {
return this.zkNodes;
}
/**
* Zookeeper nodes.
* @param zkNodes the nodes.
* @deprecated connection to zookeeper is no longer necessary
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
public void setZkNodes(String... zkNodes) {
this.zkNodes = zkNodes;
}
/**
* Zookeeper port.
* @param defaultZkPort the port.
* @deprecated connection to zookeeper is no longer necessary
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
public void setDefaultZkPort(String defaultZkPort) {
this.defaultZkPort = defaultZkPort;
}
@@ -166,30 +216,79 @@ public class KafkaBinderConfigurationProperties {
this.headers = headers;
}
/**
* No longer used.
* @param offsetUpdateTimeWindow the window.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public void setOffsetUpdateTimeWindow(int offsetUpdateTimeWindow) {
this.offsetUpdateTimeWindow = offsetUpdateTimeWindow;
}
/**
* No longer used.
* @param offsetUpdateCount the count.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public void setOffsetUpdateCount(int offsetUpdateCount) {
this.offsetUpdateCount = offsetUpdateCount;
}
/**
* No longer used.
* @param offsetUpdateShutdownTimeout the timeout.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public void setOffsetUpdateShutdownTimeout(int offsetUpdateShutdownTimeout) {
this.offsetUpdateShutdownTimeout = offsetUpdateShutdownTimeout;
}
/**
* Zookeeper session timeout.
* @return the timeout.
* @deprecated connection to zookeeper is no longer necessary
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
public int getZkSessionTimeout() {
return this.zkSessionTimeout;
}
/**
* Zookeeper session timeout.
* @param zkSessionTimeout the timout
* @deprecated connection to zookeeper is no longer necessary
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
public void setZkSessionTimeout(int zkSessionTimeout) {
this.zkSessionTimeout = zkSessionTimeout;
}
/**
* Zookeeper connection timeout.
* @return the timout.
* @deprecated connection to zookeeper is no longer necessary
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
public int getZkConnectionTimeout() {
return this.zkConnectionTimeout;
}
/**
* Zookeeper connection timeout.
* @param zkConnectionTimeout the timeout.
* @deprecated connection to zookeeper is no longer necessary
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "No longer necessary since 2.0")
public void setZkConnectionTimeout(int zkConnectionTimeout) {
this.zkConnectionTimeout = zkConnectionTimeout;
}
@@ -212,10 +311,24 @@ public class KafkaBinderConfigurationProperties {
return StringUtils.arrayToCommaDelimitedString(fullyFormattedHosts);
}
/**
* No longer used.
* @return the wait.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public int getMaxWait() {
return this.maxWait;
}
/**
* No longer user.
* @param maxWait the wait.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public void setMaxWait(int maxWait) {
this.maxWait = maxWait;
}
@@ -232,18 +345,32 @@ public class KafkaBinderConfigurationProperties {
this.requiredAcks = requiredAcks;
}
public int getReplicationFactor() {
public short getReplicationFactor() {
return this.replicationFactor;
}
public void setReplicationFactor(int replicationFactor) {
public void setReplicationFactor(short replicationFactor) {
this.replicationFactor = replicationFactor;
}
/**
* No longer used.
* @return the size.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public int getFetchSize() {
return this.fetchSize;
}
/**
* No longer used.
* @param fetchSize the size.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public void setFetchSize(int fetchSize) {
this.fetchSize = fetchSize;
}
@@ -264,10 +391,24 @@ public class KafkaBinderConfigurationProperties {
this.healthTimeout = healthTimeout;
}
/**
* No longer used.
* @return the queue size.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public int getQueueSize() {
return this.queueSize;
}
/**
* No longer used.
* @param queueSize the queue size.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0")
public void setQueueSize(int queueSize) {
this.queueSize = queueSize;
}
@@ -288,10 +429,26 @@ public class KafkaBinderConfigurationProperties {
this.autoAddPartitions = autoAddPartitions;
}
/**
* No longer used; set properties such as this via {@link #getConfiguration()
* configuration}.
* @return the size.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0, set properties such as this via 'configuration'")
public int getSocketBufferSize() {
return this.socketBufferSize;
}
/**
* No longer used; set properties such as this via {@link #getConfiguration()
* configuration}.
* @param socketBufferSize the size.
* @deprecated
*/
@Deprecated
@DeprecatedConfigurationProperty(reason = "Not used since 2.0, set properties such as this via 'configuration'")
public void setSocketBufferSize(int socketBufferSize) {
this.socketBufferSize = socketBufferSize;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016-2017 the original author or authors.
* Copyright 2016-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -53,6 +53,8 @@ public class KafkaConsumerProperties {
both
}
private boolean ackEachRecord;
private boolean autoRebalanceEnabled = true;
private boolean autoCommitOffset = true;
@@ -61,6 +63,8 @@ public class KafkaConsumerProperties {
private StartOffset startOffset;
private boolean resetOffsets;
private boolean enableDlq;
private String dlqName;
@@ -79,6 +83,16 @@ public class KafkaConsumerProperties {
private Map<String, String> configuration = new HashMap<>();
private KafkaAdminProperties admin = new KafkaAdminProperties();
public boolean isAckEachRecord() {
return this.ackEachRecord;
}
public void setAckEachRecord(boolean ackEachRecord) {
this.ackEachRecord = ackEachRecord;
}
public boolean isAutoCommitOffset() {
return this.autoCommitOffset;
}
@@ -95,6 +109,14 @@ public class KafkaConsumerProperties {
this.startOffset = startOffset;
}
public boolean isResetOffsets() {
return this.resetOffsets;
}
public void setResetOffsets(boolean resetOffsets) {
this.resetOffsets = resetOffsets;
}
public boolean isEnableDlq() {
return this.enableDlq;
}
@@ -111,10 +133,22 @@ public class KafkaConsumerProperties {
this.autoCommitOnError = autoCommitOnError;
}
/**
* No longer used.
* @return the interval.
* @deprecated
*/
@Deprecated
public int getRecoveryInterval() {
return this.recoveryInterval;
}
/**
* No longer used.
* @param recoveryInterval the interval.
* @deprecated
*/
@Deprecated
public void setRecoveryInterval(int recoveryInterval) {
this.recoveryInterval = recoveryInterval;
}
@@ -182,4 +216,12 @@ public class KafkaConsumerProperties {
this.idleEventInterval = idleEventInterval;
}
public KafkaAdminProperties getAdmin() {
return this.admin;
}
public void setAdmin(KafkaAdminProperties admin) {
this.admin = admin;
}
}

View File

@@ -44,6 +44,8 @@ public class KafkaProducerProperties {
private Map<String, String> configuration = new HashMap<>();
private KafkaAdminProperties admin = new KafkaAdminProperties();
public int getBufferSize() {
return this.bufferSize;
}
@@ -101,6 +103,15 @@ public class KafkaProducerProperties {
this.configuration = configuration;
}
public KafkaAdminProperties getAdmin() {
return this.admin;
}
public void setAdmin(KafkaAdminProperties admin) {
this.admin = admin;
}
public enum CompressionType {
none,
gzip,

View File

@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka.provisioning;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -44,6 +45,7 @@ import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.BinderException;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaAdminProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
@@ -67,6 +69,7 @@ import org.springframework.util.StringUtils;
* @author Gary Russell
* @author Ilayaperumal Gopinathan
* @author Simon Flandergan
* @author Oleg Zhurakousky
*/
public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsumerProperties<KafkaConsumerProperties>,
ExtendedProducerProperties<KafkaProducerProperties>>, InitializingBean {
@@ -77,19 +80,18 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
private final KafkaBinderConfigurationProperties configurationProperties;
private final AdminClient adminClient;
private final int operationTimeout = DEFAULT_OPERATION_TIMEOUT;
private final Map<String, Object> adminClientProperties;
private RetryOperations metadataRetryOperations;
private final int operationTimeout = DEFAULT_OPERATION_TIMEOUT;
public KafkaTopicProvisioner(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
KafkaProperties kafkaProperties) {
Assert.isTrue(kafkaProperties != null, "KafkaProperties cannot be null");
Map<String, Object> adminClientProperties = kafkaProperties.buildAdminProperties();
this.adminClientProperties = kafkaProperties.buildAdminProperties();
this.configurationProperties = kafkaBinderConfigurationProperties;
normalalizeBootPropsWithBinder(adminClientProperties, kafkaProperties, kafkaBinderConfigurationProperties);
this.adminClient = AdminClient.create(adminClientProperties);
}
/**
@@ -118,33 +120,38 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
}
@Override
public ProducerDestination provisionProducerDestination(final String name, ExtendedProducerProperties<KafkaProducerProperties> properties) {
public ProducerDestination provisionProducerDestination(final String name,
ExtendedProducerProperties<KafkaProducerProperties> properties) {
if (this.logger.isInfoEnabled()) {
this.logger.info("Using kafka topic for outbound: " + name);
}
KafkaTopicUtils.validateTopicName(name);
createTopic(name, properties.getPartitionCount(), false);
if (this.configurationProperties.isAutoCreateTopics() && adminClient != null) {
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(name));
KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.all();
try (AdminClient adminClient = AdminClient.create(this.adminClientProperties)) {
createTopic(adminClient, name, properties.getPartitionCount(), false, properties.getExtension().getAdmin());
int partitions = 0;
if (this.configurationProperties.isAutoCreateTopics()) {
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(name));
KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.all();
try {
Map<String, TopicDescription> topicDescriptions = all.get(operationTimeout, TimeUnit.SECONDS);
Map<String, TopicDescription> topicDescriptions = null;
try {
topicDescriptions = all.get(this.operationTimeout, TimeUnit.SECONDS);
}
catch (Exception e) {
throw new ProvisioningException("Problems encountered with partitions finding", e);
}
TopicDescription topicDescription = topicDescriptions.get(name);
int partitions = topicDescription.partitions().size();
return new KafkaProducerDestination(name, partitions);
partitions = topicDescription.partitions().size();
}
catch (Exception e) {
throw new ProvisioningException("Problems encountered with partitions finding", e);
}
}
else {
return new KafkaProducerDestination(name);
return new KafkaProducerDestination(name, partitions);
}
}
@Override
public ConsumerDestination provisionConsumerDestination(final String name, final String group, ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
public ConsumerDestination provisionConsumerDestination(final String name, final String group,
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
KafkaTopicUtils.validateTopicName(name);
boolean anonymous = !StringUtils.hasText(group);
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
@@ -153,25 +160,32 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
throw new IllegalArgumentException("Instance count cannot be zero");
}
int partitionCount = properties.getInstanceCount() * properties.getConcurrency();
createTopic(name, partitionCount, properties.getExtension().isAutoRebalanceEnabled());
if (this.configurationProperties.isAutoCreateTopics() && adminClient != null) {
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(name));
KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.all();
try {
Map<String, TopicDescription> topicDescriptions = all.get(operationTimeout, TimeUnit.SECONDS);
TopicDescription topicDescription = topicDescriptions.get(name);
int partitions = topicDescription.partitions().size();
ConsumerDestination dlqTopic = createDlqIfNeedBe(name, group, properties, anonymous, partitions);
if (dlqTopic != null) {
return dlqTopic;
ConsumerDestination consumerDestination = new KafkaConsumerDestination(name);
try (AdminClient adminClient = createAdminClient()) {
createTopic(adminClient, name, partitionCount, properties.getExtension().isAutoRebalanceEnabled(),
properties.getExtension().getAdmin());
if (this.configurationProperties.isAutoCreateTopics()) {
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(name));
KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.all();
try {
Map<String, TopicDescription> topicDescriptions = all.get(operationTimeout, TimeUnit.SECONDS);
TopicDescription topicDescription = topicDescriptions.get(name);
int partitions = topicDescription.partitions().size();
consumerDestination = createDlqIfNeedBe(adminClient, name, group, properties, anonymous, partitions);
if (consumerDestination == null) {
consumerDestination = new KafkaConsumerDestination(name, partitions);
}
}
catch (Exception e) {
throw new ProvisioningException("provisioning exception", e);
}
return new KafkaConsumerDestination(name, partitions);
}
catch (Exception e) {
throw new ProvisioningException("provisioning exception", e);
}
}
return new KafkaConsumerDestination(name);
return consumerDestination;
}
AdminClient createAdminClient() {
return AdminClient.create(this.adminClientProperties);
}
/**
@@ -209,14 +223,15 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
});
}
private ConsumerDestination createDlqIfNeedBe(String name, String group,
private ConsumerDestination createDlqIfNeedBe(AdminClient adminClient, String name, String group,
ExtendedConsumerProperties<KafkaConsumerProperties> properties,
boolean anonymous, int partitions) {
if (properties.getExtension().isEnableDlq() && !anonymous) {
String dlqTopic = StringUtils.hasText(properties.getExtension().getDlqName()) ?
properties.getExtension().getDlqName() : "error." + name + "." + group;
try {
createTopicAndPartitions(dlqTopic, partitions, properties.getExtension().isAutoRebalanceEnabled());
createTopicAndPartitions(adminClient, dlqTopic, partitions,
properties.getExtension().isAutoRebalanceEnabled(), properties.getExtension().getAdmin());
}
catch (Throwable throwable) {
if (throwable instanceof Error) {
@@ -231,9 +246,10 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
return null;
}
private void createTopic(String name, int partitionCount, boolean tolerateLowerPartitionsOnBroker) {
private void createTopic(AdminClient adminClient, String name, int partitionCount, boolean tolerateLowerPartitionsOnBroker,
KafkaAdminProperties properties) {
try {
createTopicIfNecessary(name, partitionCount, tolerateLowerPartitionsOnBroker);
createTopicIfNecessary(adminClient, name, partitionCount, tolerateLowerPartitionsOnBroker, properties);
}
catch (Throwable throwable) {
if (throwable instanceof Error) {
@@ -245,16 +261,14 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
}
}
private void createTopicIfNecessary(final String topicName, final int partitionCount,
boolean tolerateLowerPartitionsOnBroker) throws Throwable {
if (this.configurationProperties.isAutoCreateTopics() && adminClient != null) {
createTopicAndPartitions(topicName, partitionCount, tolerateLowerPartitionsOnBroker);
private void createTopicIfNecessary(AdminClient adminClient, final String topicName, final int partitionCount,
boolean tolerateLowerPartitionsOnBroker, KafkaAdminProperties properties) throws Throwable {
if (this.configurationProperties.isAutoCreateTopics()) {
createTopicAndPartitions(adminClient, topicName, partitionCount, tolerateLowerPartitionsOnBroker,
properties);
}
else if (this.configurationProperties.isAutoCreateTopics() && adminClient == null) {
this.logger.warn("Auto creation of topics is enabled, but Kafka AdminUtils class is not present on the classpath. " +
"No topic will be created by the binder");
}
else if (!this.configurationProperties.isAutoCreateTopics()) {
else {
this.logger.info("Auto creation of topics is disabled.");
}
}
@@ -262,9 +276,12 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
/**
* Creates a Kafka topic if needed, or try to increase its partition count to the
* desired number.
* @param adminClient
* @param adminProperties
*/
private void createTopicAndPartitions(final String topicName, final int partitionCount,
boolean tolerateLowerPartitionsOnBroker) throws Throwable {
private void createTopicAndPartitions(AdminClient adminClient, final String topicName, final int partitionCount,
boolean tolerateLowerPartitionsOnBroker, KafkaAdminProperties adminProperties) throws Throwable {
ListTopicsResult listTopicsResult = adminClient.listTopics();
KafkaFuture<Set<String>> namesFutures = listTopicsResult.names();
@@ -298,14 +315,26 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
}
}
}
else if (!names.contains(topicName)) {
else {
// always consider minPartitionCount for topic creation
final int effectivePartitionCount = Math.max(this.configurationProperties.getMinPartitionCount(),
partitionCount);
this.metadataRetryOperations.execute(context -> {
NewTopic newTopic = new NewTopic(topicName, effectivePartitionCount,
(short) configurationProperties.getReplicationFactor());
NewTopic newTopic;
Map<Integer, List<Integer>> replicasAssignments = adminProperties.getReplicasAssignments();
if (replicasAssignments != null && replicasAssignments.size() > 0) {
newTopic = new NewTopic(topicName, adminProperties.getReplicasAssignments());
}
else {
newTopic = new NewTopic(topicName, effectivePartitionCount,
adminProperties.getReplicationFactor() != null
? adminProperties.getReplicationFactor()
: configurationProperties.getReplicationFactor());
}
if (adminProperties.getConfiguration().size() > 0) {
newTopic.configs(adminProperties.getConfiguration());
}
CreateTopicsResult createTopicsResult = adminClient.createTopics(Collections.singletonList(newTopic));
try {
createTopicsResult.all().get(operationTimeout, TimeUnit.SECONDS);
@@ -318,6 +347,10 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
logger.warn("Attempt to create topic: " + topicName + ". Topic already exists.");
}
}
else {
logger.error("Failed to create topics", e.getCause());
throw e.getCause();
}
}
else {
logger.error("Failed to create topics", e.getCause());
@@ -365,10 +398,6 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
private final int partitions;
KafkaProducerDestination(String destinationName) {
this(destinationName, 0);
}
KafkaProducerDestination(String destinationName, Integer partitions) {
this.producerDestinationName = destinationName;
this.partitions = partitions;
@@ -420,10 +449,6 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
return this.consumerDestinationName;
}
public String getDlqName() {
return dlqName;
}
@Override
public String toString() {
return "KafkaConsumerDestination{" +

View File

@@ -55,10 +55,11 @@ public class KafkaTopicProvisionerTests {
binderConfig.getConfiguration().put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ts.getFile().getAbsolutePath());
binderConfig.setBrokers("localhost:9092");
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig, bootConfig);
AdminClient adminClient = KafkaTestUtils.getPropertyValue(provisioner, "adminClient", AdminClient.class);
AdminClient adminClient = provisioner.createAdminClient();
assertThat(KafkaTestUtils.getPropertyValue(adminClient, "client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class);
Map configs = KafkaTestUtils.getPropertyValue(adminClient, "client.selector.channelBuilder.configs", Map.class);
assertThat(((List) configs.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)).get(0)).isEqualTo("localhost:1234");
adminClient.close();
}
@SuppressWarnings("rawtypes")
@@ -73,13 +74,13 @@ public class KafkaTopicProvisionerTests {
binderConfig.getConfiguration().put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ts.getFile().getAbsolutePath());
binderConfig.setBrokers("localhost:1234");
KafkaTopicProvisioner provisioner = new KafkaTopicProvisioner(binderConfig, bootConfig);
AdminClient adminClient = KafkaTestUtils.getPropertyValue(provisioner, "adminClient", AdminClient.class);
AdminClient adminClient = provisioner.createAdminClient();
assertThat(KafkaTestUtils.getPropertyValue(adminClient, "client.selector.channelBuilder")).isInstanceOf(SslChannelBuilder.class);
Map configs = KafkaTestUtils.getPropertyValue(adminClient, "client.selector.channelBuilder.configs", Map.class);
assertThat(((List) configs.get(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)).get(0)).isEqualTo("localhost:1234");
adminClient.close();
}
@SuppressWarnings("rawtypes")
@Test
public void brokersInvalid() throws Exception {
KafkaProperties bootConfig = new KafkaProperties();

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RC2</version>
<version>2.0.0.RC3</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>

View File

@@ -7,7 +7,7 @@ In addition, this guide also explains the Kafka Streams binding capabilities of
== Usage
For using the Apache Kafka binder, you just need to add it to your Spring Cloud Stream application, using the following Maven coordinates:
To use Apache Kafka binder all you need is to add `spring-cloud-stream-binder-kafka` as a dependency to your Spring Cloud Stream application. Below is a Maven example:
[source,xml]
----
@@ -38,6 +38,11 @@ The Apache Kafka Binder implementation maps each destination to an Apache Kafka
The consumer group maps directly to the same Apache Kafka concept.
Partitioning also maps directly to Apache Kafka partitions as well.
The binder currently uses the Apache Kafka `kafka-clients` 1.0.0 jar and is designed to be used with a broker at least that version.
This client can communicate with older brokers (refer to the Kafka documentation), but certain features may not be available.
For example, with versions earlier than 0.11.x.x, native headers are not supported.
Also, 0.11.x.x does not support the `autoAddPartitions` property.
== Configuration Options
This section contains the configuration options used by the Apache Kafka binder.
@@ -55,42 +60,24 @@ spring.cloud.stream.kafka.binder.defaultBrokerPort::
This sets the default port when no port is configured in the broker list.
+
Default: `9092`.
spring.cloud.stream.kafka.binder.zkNodes::
A list of ZooKeeper nodes to which the Kafka binder can connect.
+
Default: `localhost`.
spring.cloud.stream.kafka.binder.defaultZkPort::
`zkNodes` allows hosts specified with or without port information (e.g., `host1,host2:port2`).
This sets the default port when no port is configured in the node list.
+
Default: `2181`.
spring.cloud.stream.kafka.binder.configuration::
Key/Value map of client properties (both producers and consumer) passed to all clients created by the binder.
Due to the fact that these properties will be used by both producers and consumers, usage should be restricted to common properties, especially security settings.
Due to the fact that these properties will be used by both producers and consumers, usage should be restricted to common properties, for example, security settings.
+
Default: Empty map.
spring.cloud.stream.kafka.binder.headers::
The list of custom headers that will be transported by the binder.
Only required when communicating with older applications (<= 1.3.x) with a `kafka-clients` version < 0.11.0.0; newer versions support headers natively.
+
Default: empty.
spring.cloud.stream.kafka.binder.healthTimeout::
The time to wait to get partition information in seconds; default 60.
Health will report as down if this timer expires.
Health will report as down if this timer expires.
+
Default: 10.
spring.cloud.stream.kafka.binder.offsetUpdateTimeWindow::
The frequency, in milliseconds, with which offsets are saved.
Ignored if `0`.
+
Default: `10000`.
spring.cloud.stream.kafka.binder.offsetUpdateCount::
The frequency, in number of updates, which which consumed offsets are persisted.
Ignored if `0`.
Mutually exclusive with `offsetUpdateTimeWindow`.
+
Default: `0`.
spring.cloud.stream.kafka.binder.requiredAcks::
The number of required acks on the broker.
See the Kafka documentation for the producer `acks` property.
+
Default: `1`.
spring.cloud.stream.kafka.binder.minPartitionCount::
@@ -101,6 +88,7 @@ It can be superseded by the `partitionCount` setting of the producer or by the v
Default: `1`.
spring.cloud.stream.kafka.binder.replicationFactor::
The replication factor of auto-created topics if `autoCreateTopics` is active.
Can be overriden on each binding.
+
Default: `1`.
spring.cloud.stream.kafka.binder.autoCreateTopics::
@@ -116,10 +104,6 @@ If set to `false`, the binder will rely on the partition size of the topic being
If the partition count of the target topic is smaller than the expected value, the binder will fail to start.
+
Default: `false`.
spring.cloud.stream.kafka.binder.socketBufferSize::
Size (in bytes) of the socket buffer to be used by the Kafka consumers.
+
Default: `2097152`.
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix::
Enable transactions in the binder; see `transaction.id` in the Kafka documentation and https://docs.spring.io/spring-kafka/reference/html/_reference.html#transactions[Transactions] in the `spring-kafka` documentation.
When transactions are enabled, individual `producer` properties are ignored and all producers use the `spring.cloud.stream.kafka.binder.transaction.producer.*` properties.
@@ -131,12 +115,37 @@ spring.cloud.stream.kafka.binder.transaction.producer.*::
+
Default: See individual producer properties.
spring.cloud.stream.kafka.binder.headerMapperBeanName::
The bean name of a `KafkaHeaderMapper` used for mapping `spring-messaging` headers to/from Kafka headers.
Use this, for example, if you wish to customize the trusted packages in a `DefaultKafkaHeaderMapper`, which uses JSON deserialization for the headers.
+
Default: none.
[[kafka-consumer-properties]]
=== Kafka Consumer Properties
The following properties are available for Kafka consumers only and
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.consumer.`.
admin.configuration::
A `Map` of Kafka topic properties used when provisioning topics.
e.g. `spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0`
+
Default: none.
admin.replicas-assignment::
A Map<Integer, List<Integer>> of replica assignments, with the key being the partition and value the assignments.
Used when provisioning new topics.
See `NewTopic` javadocs in the `kafka-clients` jar.
+
Default: none.
admin.replication-factor::
The replication factor to use when provisioning topics; overrides the binder-wide setting.
Ignored if `replicas-assignments` is present.
+
Default: none (the binder-wide default of 1 is used).
autoRebalanceEnabled::
When `true`, topic partitions will be automatically rebalanced between the members of a consumer group.
When `false`, each consumer will be assigned a fixed set of partitions based on `spring.cloud.stream.instanceCount` and `spring.cloud.stream.instanceIndex`.
@@ -144,12 +153,21 @@ This requires both `spring.cloud.stream.instanceCount` and `spring.cloud.stream.
The property `spring.cloud.stream.instanceCount` must typically be greater than 1 in this case.
+
Default: `true`.
ackEachRecord::
When `autoCommitOffset` is `true`, whether to commit the offset after each record is processed.
By default, offsets are committed after all records in the batch of records returned by `consumer.poll()` have been processed.
The number of records returned by a poll can be controlled with the `max.poll.recods` Kafka property, set via the consumer `configuration` property.
Setting this to true may cause a degradation in performance, but reduces the likelihood of redelivered records when a failure occurs.
Also see the binder `requiredAcks` property, which also affects the performance of committing offsets.
+
Default: `false`.
autoCommitOffset::
Whether to autocommit offsets when a message has been processed.
If set to `false`, a header with the key `kafka_acknowledgment` of the type `org.springframework.kafka.support.Acknowledgment` header will be present in the inbound message.
Applications may use this header for acknowledging messages.
See the examples section for details.
When this property is set to `false`, Kafka binder will set the ack mode to `org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL`.
When this property is set to `false`, Kafka binder will set the ack mode to `org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL` and the application is responsible for acknowledging records.
Also see `ackEachRecord`.
+
Default: `true`.
autoCommitOnError::
@@ -159,14 +177,15 @@ If set to `true`, it will always auto-commit (if auto-commit is enabled).
If not set (default), it effectively has the same value as `enableDlq`, auto-committing erroneous messages if they are sent to a DLQ, and not committing them otherwise.
+
Default: not set.
recoveryInterval::
The interval between connection recovery attempts, in milliseconds.
resetOffsets::
Whether to reset offsets on the consumer to the value provided by startOffset.
+
Default: `5000`.
Default: `false`.
startOffset::
The starting offset for new groups.
Allowed values: `earliest`, `latest`.
If the consumer group is set explicitly for the consumer 'binding' (via `spring.cloud.stream.bindings.<channelName>.group`), then 'startOffset' is set to `earliest`; otherwise it is set to `latest` for the `anonymous` consumer group.
Also see `resetOffsets`.
+
Default: null (equivalent to `earliest`).
enableDlq::
@@ -214,6 +233,25 @@ Default: `30000`
The following properties are available for Kafka producers only and
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.producer.`.
admin.configuration::
A `Map` of Kafka topic properties used when provisioning new topics.
e.g. `spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0`
+
Default: none.
admin.replicas-assignment::
A Map<Integer, List<Integer>> of replica assignments, with the key being the partition and value the assignments.
Used when provisioning new topics.
See `NewTopic` javadocs in the `kafka-clients` jar.
+
Default: none.
admin.replication-factor::
The replication factor to use when provisioning new topics; overrides the binder-wide setting.
Ignored if `replicas-assignments` is present.
+
Default: none (the binder-wide default of 1 is used).
bufferSize::
Upper limit, in bytes, of how much data the Kafka producer will attempt to batch before sending.
+
@@ -229,15 +267,15 @@ batchTimeout::
Default: `0`.
messageKeyExpression::
A SpEL expression evaluated against the outgoing message used to populate the key of the produced Kafka message.
For example `headers.key` or `payload.myKey`.
For example `headers['myKey']`; the payload cannot be used because by the time this expression is evaluated, the payload is already in the form of a `byte[]`.
+
Default: `none`.
headerPatterns::
A comma-delimited list of simple patterns to match spring-messaging headers to be mapped to the kafka `Headers` in the `ProducerRecord`.
Patterns can begin or end with the wildcard character (asterisk).
Patterns can be negated by prefixing with `!`; matching stops after the first match (positive or negative).
For example `!foo,fo*` will pass `fox` but not `foo`.
`id` and `timestamp` are never mapped.
Patterns can begin or end with the wildcard character (asterisk).
Patterns can be negated by prefixing with `!`; matching stops after the first match (positive or negative).
For example `!foo,fo*` will pass `fox` but not `foo`.
`id` and `timestamp` are never mapped.
+
Default: `*` (all headers - except the `id` and `timestamp`)
configuration::
@@ -314,7 +352,6 @@ Here is an example of launching a Spring Cloud Stream application with SASL and
----
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
----
@@ -343,7 +380,6 @@ Here is an example of launching a Spring Cloud Stream application with SASL and
[source]
----
java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.kafka.binder.zkNodes=secure.zookeeper:2181 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.autoCreateTopics=false \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
@@ -366,7 +402,7 @@ KafkaClient {
};
----
If the topics required already exist on the broker, or will be created by an administrator, autocreation can be turned off and only client JAAS properties need to be sent. As an alternative to setting `spring.cloud.stream.kafka.binder.autoCreateTopics` you can simply remove the broker dependency from the application. See <<exclude-admin-utils>> for details.
If the topics required already exist on the broker, or will be created by an administrator, autocreation can be turned off and only client JAAS properties need to be sent.
[NOTE]
====
@@ -421,103 +457,6 @@ public class Application {
}
----
==== Using the binder with Apache Kafka 0.10
The default Kafka support in Spring Cloud Stream Kafka binder is for Kafka version 0.10.1.1. The binder also supports connecting to other 0.10 based versions and 0.9 clients.
In order to do this, when you create the project that contains your application, include `spring-cloud-starter-stream-kafka` as you normally would do for the default binder.
Then add these dependencies at the top of the `<dependencies>` section in the pom.xml file to override the dependencies.
Here is an example for downgrading your application to 0.10.0.1. Since it is still on the 0.10 line, the default `spring-kafka` and `spring-integration-kafka` versions can be retained.
[source,xml]
----
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
</dependency>
----
Here is another example of using 0.9.0.1 version.
[source,xml]
----
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.0.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
----
[NOTE]
====
The versions above are provided only for the sake of the example.
For best results, we recommend using the most recent 0.10-compatible versions of the projects.
====
[[exclude-admin-utils]]
==== Excluding Kafka broker jar from the classpath of the binder based application
The Apache Kafka Binder uses the administrative utilities which are part of the Apache Kafka server library to create and reconfigure topics.
If the inclusion of the Apache Kafka server library and its dependencies is not necessary at runtime because the application will rely on the topics being configured administratively, the Kafka binder allows for Apache Kafka server dependency to be excluded from the application.
If you use non default versions for Kafka dependencies as advised above, all you have to do is not to include the kafka broker dependency.
If you use the default Kafka version, then ensure that you exclude the kafka broker jar from the `spring-cloud-starter-stream-kafka` dependency as following.
[source,xml]
----
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
</exclusion>
</exclusions>
</dependency>
----
If you exclude the Apache Kafka server dependency and the topic is not present on the server, then the Apache Kafka broker will create the topic if auto topic creation is enabled on the server.
Please keep in mind that if you are relying on this, then the Kafka server will use the default number of partitions and replication factors.
On the other hand, if auto topic creation is disabled on the server, then care must be taken before running the application to create the topic with the desired number of partitions.
If you want to have full control over how partitions are allocated, then leave the default settings as they are, i.e. do not exclude the kafka broker jar and ensure that `spring.cloud.stream.kafka.binder.autoCreateTopics` is set to `true`, which is the default.
[[kafka-error-channels]]
== Error Channels

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RC2</version>
<version>2.0.0.RC3</version>
</parent>
<dependencies>

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RC2</version>
<version>2.0.0.RC3</version>
</parent>
<dependencies>

View File

@@ -20,10 +20,11 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.TimeGauge;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
@@ -48,12 +49,13 @@ import org.springframework.util.ObjectUtils;
* @author Soby Chacko
* @author Artem Bilan
* @author Oleg Zhurakousky
* @author Jon Schneider
*/
public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<BindingCreatedEvent> {
private final static Log LOG = LogFactory.getLog(KafkaBinderMetrics.class);
static final String METRIC_PREFIX = "spring.cloud.stream.binder.kafka";
static final String METRIC_NAME = "spring.cloud.stream.binder.kafka.offset";
private final KafkaMessageChannelBinder binder;
@@ -91,8 +93,12 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
String topic = topicInfo.getKey();
String group = topicInfo.getValue().getConsumerGroup();
registry.gauge(String.format("%s.%s.%s.lag", METRIC_PREFIX, group, topic), this,
o -> calculateConsumerLagOnTopic(topic, group));
TimeGauge.builder(METRIC_NAME, this, TimeUnit.MILLISECONDS,
o -> calculateConsumerLagOnTopic(topic, group))
.tag("group", group)
.tag("topic", topic)
.description("Consumer lag for a particular group and topic")
.register(registry);
}
}
@@ -104,6 +110,7 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
for (PartitionInfo partitionInfo : partitionInfos) {
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
Map<TopicPartition, Long> endOffsets = metadataConsumer.endOffsets(topicPartitions);
for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {

View File

@@ -28,7 +28,9 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -80,7 +82,9 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaderMapper;
@@ -325,7 +329,8 @@ public class KafkaMessageChannelBinder extends
Collection<PartitionInfo> listenedPartitions;
if (extendedConsumerProperties.getExtension().isAutoRebalanceEnabled() ||
boolean groupManagement = extendedConsumerProperties.getExtension().isAutoRebalanceEnabled();
if (groupManagement ||
extendedConsumerProperties.getInstanceCount() == 1) {
listenedPartitions = allPartitions;
}
@@ -354,6 +359,7 @@ public class KafkaMessageChannelBinder extends
}
containerProperties.setIdleEventInterval(extendedConsumerProperties.getExtension().getIdleEventInterval());
int concurrency = Math.min(extendedConsumerProperties.getConcurrency(), listenedPartitions.size());
resetOffsets(extendedConsumerProperties, consumerFactory, groupManagement, containerProperties);
@SuppressWarnings("rawtypes")
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer =
new ConcurrentMessageListenerContainer(consumerFactory, containerProperties) {
@@ -366,8 +372,14 @@ public class KafkaMessageChannelBinder extends
};
messageListenerContainer.setConcurrency(concurrency);
// these won't be needed if the container is made a bean
messageListenerContainer.setApplicationEventPublisher(getApplicationContext());
if (getApplicationEventPublisher() != null) {
messageListenerContainer.setApplicationEventPublisher(getApplicationEventPublisher());
}
else if (getApplicationContext() != null) {
messageListenerContainer.setApplicationEventPublisher(getApplicationContext());
}
messageListenerContainer.setBeanName(destination.getName() + ".container");
// end of these won't be needed...
if (!extendedConsumerProperties.getExtension().isAutoCommitOffset()) {
messageListenerContainer.getContainerProperties()
.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
@@ -376,6 +388,9 @@ public class KafkaMessageChannelBinder extends
else {
messageListenerContainer.getContainerProperties()
.setAckOnError(isAutoCommitOnError(extendedConsumerProperties));
if (extendedConsumerProperties.getExtension().isAckEachRecord()) {
messageListenerContainer.getContainerProperties().setAckMode(AckMode.RECORD);
}
}
if (this.logger.isDebugEnabled()) {
this.logger.debug(
@@ -397,6 +412,58 @@ public class KafkaMessageChannelBinder extends
return kafkaMessageDrivenChannelAdapter;
}
/*
* Reset the offsets if needed; may update the offsets in in the container's
* topicPartitionInitialOffsets.
*/
private void resetOffsets(
final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
final ConsumerFactory<?, ?> consumerFactory, boolean groupManagement,
final ContainerProperties containerProperties) {
boolean resetOffsets = extendedConsumerProperties.getExtension().isResetOffsets();
final Object resetTo = consumerFactory.getConfigurationProperties().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
final AtomicBoolean initialAssignment = new AtomicBoolean(true);
if (!"earliest".equals(resetTo) && "!latest".equals(resetTo)) {
logger.warn("no (or unknown) " + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
" property cannot reset");
resetOffsets = false;
}
if (groupManagement && resetOffsets) {
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
// no op
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
// no op
}
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
if (initialAssignment.getAndSet(false)) {
if ("earliest".equals(resetTo)) {
consumer.seekToBeginning(tps);
}
else if ("latest".equals(resetTo)) {
consumer.seekToEnd(tps);
}
}
}
});
}
else if (resetOffsets) {
Arrays.stream(containerProperties.getTopicPartitions())
.map(tpio -> new TopicPartitionInitialOffset(tpio.topic(), tpio.partition(),
// SK GH-599 "earliest".equals(resetTo) ? SeekPosition.BEGINNING : SeekPosition.END))
"earliest".equals(resetTo) ? 0L : Long.MAX_VALUE))
.collect(Collectors.toList()).toArray(containerProperties.getTopicPartitions());
}
}
@Override
protected PolledConsumerResources createPolledConsumerResources(String name, String group,
ConsumerDestination destination, ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
@@ -617,7 +684,7 @@ public class KafkaMessageChannelBinder extends
}
}
private ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
protected ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2015-2017 the original author or authors.
* Copyright 2015-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,40 +17,31 @@
package org.springframework.cloud.stream.binder.kafka.config;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.properties.JaasLoginModuleConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.lang.Nullable;
import org.springframework.util.ObjectUtils;
/**
* @author David Turanski
@@ -61,24 +52,20 @@ import org.springframework.util.ObjectUtils;
* @author Henryk Konsek
* @author Gary Russell
* @author Oleg Zhurakousky
* @author Artem Bilan
*/
@Configuration
@ConditionalOnMissingBean(Binder.class)
@Import({ PropertyPlaceholderAutoConfiguration.class})
@Import({KafkaAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class, KafkaBinderHealthIndicatorConfiguration.class })
@EnableConfigurationProperties({ KafkaExtendedBindingProperties.class })
public class KafkaBinderConfiguration {
protected static final Log logger = LogFactory.getLog(KafkaBinderConfiguration.class);
@Autowired
private KafkaExtendedBindingProperties kafkaExtendedBindingProperties;
@Autowired
private ProducerListener producerListener;
@Autowired
private ApplicationContext context;
@Autowired
private KafkaProperties kafkaProperties;
@@ -94,7 +81,8 @@ public class KafkaBinderConfiguration {
@Bean
KafkaMessageChannelBinder kafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider) {
KafkaTopicProvisioner provisioningProvider) {
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
configurationProperties, provisioningProvider);
kafkaMessageChannelBinder.setProducerListener(producerListener);
@@ -108,41 +96,37 @@ public class KafkaBinderConfiguration {
return new LoggingProducerListener();
}
@Bean
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder,
KafkaBinderConfigurationProperties configurationProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
if (!ObjectUtils.isEmpty(configurationProperties.getConsumerConfiguration())) {
props.putAll(configurationProperties.getConsumerConfiguration());
}
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
}
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(kafkaMessageChannelBinder,
consumerFactory);
indicator.setTimeout(configurationProperties.getHealthTimeout());
return indicator;
}
@Bean
public MeterBinder kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder,
KafkaBinderConfigurationProperties configurationProperties,
@Nullable MeterRegistry meterRegistry) {
return new KafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties, null, meterRegistry);
}
@Bean
public KafkaJaasLoginModuleInitializer jaasInitializer() throws IOException {
return new KafkaJaasLoginModuleInitializer();
}
/**
* A conditional configuration for the {@link KafkaBinderMetrics} bean when the
* {@link MeterRegistry} class is in classpath, as well as a {@link MeterRegistry} bean is
* present in the application context.
*/
@Configuration
@ConditionalOnClass(MeterRegistry.class)
@ConditionalOnBean(MeterRegistry.class)
protected class KafkaBinderMetricsConfiguration {
@Bean
@ConditionalOnMissingBean(KafkaBinderMetrics.class)
public MeterBinder kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder,
KafkaBinderConfigurationProperties configurationProperties,
MeterRegistry meterRegistry) {
return new KafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties, null, meterRegistry);
}
}
public static class JaasConfigurationProperties {
private JaasLoginModuleConfiguration kafka;
private JaasLoginModuleConfiguration zookeeper;
}
}

View File

@@ -0,0 +1,63 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.util.ObjectUtils;
/**
*
* @author Oleg Zhurakousky
*
*/
@Configuration
@ConditionalOnClass(name="org.springframework.boot.actuate.health.HealthIndicator")
class KafkaBinderHealthIndicatorConfiguration {
@Bean
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder,
KafkaBinderConfigurationProperties configurationProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
if (!ObjectUtils.isEmpty(configurationProperties.getConsumerConfiguration())) {
props.putAll(configurationProperties.getConsumerConfiguration());
}
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
}
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(kafkaMessageChannelBinder,
consumerFactory);
indicator.setTimeout(configurationProperties.getHealthTimeout());
return indicator;
}
}

View File

@@ -0,0 +1,64 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.util.Arrays;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaAdminProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.config.BinderFactoryConfiguration;
import org.springframework.cloud.stream.config.BindingServiceConfiguration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Gary Russell
* @since 2.0
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {KafkaBinderConfiguration.class,
BinderFactoryConfiguration.class,
BindingServiceConfiguration.class })
@TestPropertySource(properties = {
"spring.cloud.stream.kafka.bindings.input.consumer.admin.replication-factor=2",
"spring.cloud.stream.kafka.bindings.input.consumer.admin.replicas-assignments.0=0,1",
"spring.cloud.stream.kafka.bindings.input.consumer.admin.configuration.message.format.version=0.9.0.0" })
@EnableIntegration
public class AdminConfigTests {
@Autowired
private KafkaMessageChannelBinder binder;
@Test
public void testProps() {
KafkaConsumerProperties consumerProps = this.binder.getExtendedConsumerProperties("input");
KafkaAdminProperties admin = consumerProps.getAdmin();
assertThat(admin.getReplicationFactor()).isEqualTo((short) 2);
assertThat(admin.getReplicasAssignments().get(0)).isEqualTo(Arrays.asList(0, 1));
assertThat(admin.getConfiguration().get("message.format.version")).isEqualTo("0.9.0.0");
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016-2017 the original author or authors.
* Copyright 2016-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,14 +28,12 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -50,8 +48,7 @@ import static org.junit.Assert.assertTrue;
* @author Ilayaperumal Gopinathan
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = { KafkaBinderAutoConfigurationPropertiesTest.KafkaBinderConfigProperties.class,
KafkaBinderConfiguration.class })
@SpringBootTest(classes = {KafkaBinderConfiguration.class })
@TestPropertySource(locations = "classpath:binder-config-autoconfig.properties")
public class KafkaBinderAutoConfigurationPropertiesTest {
@@ -124,13 +121,4 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
bootstrapServers.add("10.98.09.196:9092");
assertTrue(((List<String>) configs.get("bootstrap.servers")).containsAll(bootstrapServers));
}
public static class KafkaBinderConfigProperties {
@Bean
KafkaProperties kafkaProperties() {
return new KafkaProperties();
}
}
}

View File

@@ -20,9 +20,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.search.Search;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -31,6 +31,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@@ -71,20 +72,20 @@ public class KafkaBinderMetricsTest {
org.mockito.BDDMockito.given(consumerFactory.createConsumer()).willReturn(consumer);
org.mockito.BDDMockito.given(binder.getTopicsInUse()).willReturn(topicsInUse);
metrics = new KafkaBinderMetrics(binder, kafkaBinderConfigurationProperties, consumerFactory, null);
org.mockito.BDDMockito.given(consumer.endOffsets(org.mockito.Matchers.anyCollectionOf(TopicPartition.class)))
org.mockito.BDDMockito.given(consumer.endOffsets(ArgumentMatchers.anyCollection()))
.willReturn(java.util.Collections.singletonMap(new TopicPartition(TEST_TOPIC, 0), 1000L));
}
@Test
public void shouldIndicateLag() {
org.mockito.BDDMockito.given(consumer.committed(org.mockito.Matchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
org.mockito.BDDMockito.given(consumer.committed(ArgumentMatchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
metrics.bindTo(meterRegistry);
assertThat(meterRegistry.getMeters()).hasSize(1);
Search group = meterRegistry.find(String.format("%s.%s.%s.lag", KafkaBinderMetrics.METRIC_PREFIX, "group", TEST_TOPIC));
assertThat(group.gauge().value()).isEqualTo(500.0);
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge()
.value(TimeUnit.MILLISECONDS)).isEqualTo(500.0);
}
@Test
@@ -92,15 +93,15 @@ public class KafkaBinderMetricsTest {
Map<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(new TopicPartition(TEST_TOPIC, 0), 1000L);
endOffsets.put(new TopicPartition(TEST_TOPIC, 1), 1000L);
org.mockito.BDDMockito.given(consumer.endOffsets(org.mockito.Matchers.anyCollectionOf(TopicPartition.class))).willReturn(endOffsets);
org.mockito.BDDMockito.given(consumer.committed(org.mockito.Matchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
org.mockito.BDDMockito.given(consumer.endOffsets(ArgumentMatchers.anyCollection())).willReturn(endOffsets);
org.mockito.BDDMockito.given(consumer.committed(ArgumentMatchers.any(TopicPartition.class))).willReturn(new OffsetAndMetadata(500));
List<PartitionInfo> partitions = partitions(new Node(0, null, 0), new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
metrics.bindTo(meterRegistry);
assertThat(meterRegistry.getMeters()).hasSize(1);
Search group = meterRegistry.find(String.format("%s.%s.%s.lag", KafkaBinderMetrics.METRIC_PREFIX, "group", TEST_TOPIC));
assertThat(group.gauge().value()).isEqualTo(1000.0);
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge()
.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);
}
@Test
@@ -110,8 +111,8 @@ public class KafkaBinderMetricsTest {
org.mockito.BDDMockito.given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
metrics.bindTo(meterRegistry);
assertThat(meterRegistry.getMeters()).hasSize(1);
Search group = meterRegistry.find(String.format("%s.%s.%s.lag", KafkaBinderMetrics.METRIC_PREFIX, "group", TEST_TOPIC));
assertThat(group.gauge().value()).isEqualTo(1000.0);
assertThat(meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge()
.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);
}
@Test

View File

@@ -100,6 +100,8 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
@@ -1095,6 +1097,11 @@ public class KafkaBinderTests extends
"testManualAckIsNotPossibleWhenAutoCommitOffsetIsEnabledOnTheBinder", "test", moduleInputChannel,
consumerProperties);
AbstractMessageListenerContainer<?, ?> container = TestUtils.getPropertyValue(consumerBinding,
"lifecycle.messageListenerContainer", AbstractMessageListenerContainer.class);
assertThat(container.getContainerProperties().getAckMode()).isEqualTo(AckMode.BATCH);
String testPayload1 = "foo" + UUID.randomUUID().toString();
Message<?> message1 = org.springframework.integration.support.MessageBuilder.withPayload(
testPayload1.getBytes()).build();
@@ -1131,12 +1138,17 @@ public class KafkaBinderTests extends
QueueChannel inbound1 = new QueueChannel();
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.getExtension().setAutoRebalanceEnabled(false);
consumerProperties.getExtension().setAckEachRecord(true);
Binding<MessageChannel> consumerBinding1 = binder.bindConsumer(testDestination, "test1", inbound1,
consumerProperties);
QueueChannel inbound2 = new QueueChannel();
Binding<MessageChannel> consumerBinding2 = binder.bindConsumer(testDestination, "test2", inbound2,
consumerProperties);
AbstractMessageListenerContainer<?, ?> container = TestUtils.getPropertyValue(consumerBinding2,
"lifecycle.messageListenerContainer", AbstractMessageListenerContainer.class);
assertThat(container.getContainerProperties().getAckMode()).isEqualTo(AckMode.RECORD);
Message<?> receivedMessage1 = receive(inbound1);
assertThat(receivedMessage1).isNotNull();
assertThat(new String((byte[]) receivedMessage1.getPayload(), StandardCharsets.UTF_8)).isEqualTo(testPayload);

View File

@@ -17,10 +17,20 @@
package org.springframework.cloud.stream.binder.kafka;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
@@ -28,9 +38,24 @@ import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.messaging.MessageChannel;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
/**
* @author Gary Russell
@@ -76,4 +101,134 @@ public class KafkaBinderUnitTests {
assertThat(configs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)).isEqualTo("earliest");
}
@Test
public void testOffsetResetWithGroupManagementEarliest() throws Exception {
testOffsetResetWithGroupManagement(true, true);
}
@Test
public void testOffsetResetWithGroupManagementLatest() throws Throwable {
testOffsetResetWithGroupManagement(false, true);
}
@Test
public void testOffsetResetWithManualAssignmentEarliest() throws Exception {
testOffsetResetWithGroupManagement(true, false);
}
@Test
public void testOffsetResetWithGroupManualAssignmentLatest() throws Throwable {
testOffsetResetWithGroupManagement(false, false);
}
private void testOffsetResetWithGroupManagement(final boolean earliest, boolean groupManage) throws Exception {
final List<TopicPartition> partitions = new ArrayList<>();
partitions.add(new TopicPartition("foo", 0));
partitions.add(new TopicPartition("foo", 1));
KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties();
KafkaTopicProvisioner provisioningProvider = mock(KafkaTopicProvisioner.class);
ConsumerDestination dest = mock(ConsumerDestination.class);
given(dest.getName()).willReturn("foo");
given(provisioningProvider.provisionConsumerDestination(anyString(), anyString(), any())).willReturn(dest);
final AtomicInteger part = new AtomicInteger();
willAnswer(i -> {
return partitions.stream()
.map(p -> new PartitionInfo("foo", part.getAndIncrement(), null, null, null))
.collect(Collectors.toList());
}).given(provisioningProvider).getPartitionsForTopic(anyInt(), anyBoolean(), any());
@SuppressWarnings("unchecked")
final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
final CountDownLatch latch = new CountDownLatch(2);
willAnswer(i -> {
try {
Thread.sleep(100);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return new ConsumerRecords<>(Collections.emptyMap());
}).given(consumer).poll(anyLong());
willAnswer(i -> {
((org.apache.kafka.clients.consumer.ConsumerRebalanceListener) i.getArgument(1))
.onPartitionsAssigned(partitions);
latch.countDown();
latch.countDown();
return null;
}).given(consumer).subscribe(eq(Collections.singletonList("foo")),
any(org.apache.kafka.clients.consumer.ConsumerRebalanceListener.class));
willAnswer(i -> {
latch.countDown();
return null;
}).given(consumer).seek(any(), anyLong());
KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(configurationProperties, provisioningProvider) {
@Override
protected ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
return new ConsumerFactory<byte[], byte[]>() {
@Override
public Consumer<byte[], byte[]> createConsumer() {
return consumer;
}
@Override
public Consumer<byte[], byte[]> createConsumer(String arg0) {
return consumer;
}
@Override
public Consumer<byte[], byte[]> createConsumer(String arg0, String arg1) {
return consumer;
}
@Override
public boolean isAutoCommit() {
return false;
}
@Override
public Map<String, Object> getConfigurationProperties() {
return Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
earliest ? "earliest" : "latest");
}
};
}
};
GenericApplicationContext context = new GenericApplicationContext();
context.refresh();
binder.setApplicationContext(context);
MessageChannel channel = new DirectChannel();
KafkaConsumerProperties extension = new KafkaConsumerProperties();
extension.setResetOffsets(true);
extension.setAutoRebalanceEnabled(groupManage);
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = new ExtendedConsumerProperties<KafkaConsumerProperties>(
extension);
consumerProperties.setInstanceCount(1);
binder.bindConsumer("foo", "bar", channel, consumerProperties);
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
if (groupManage) {
if (earliest) {
verify(consumer).seekToBeginning(partitions);
}
else {
verify(consumer).seekToEnd(partitions);
}
}
else {
if (earliest) {
verify(consumer).seek(partitions.get(0), 0L);
verify(consumer).seek(partitions.get(1), 0L);
}
else {
verify(consumer).seek(partitions.get(0), Long.MAX_VALUE);
verify(consumer).seek(partitions.get(1), Long.MAX_VALUE);
}
}
}
}

View File

@@ -17,7 +17,7 @@
package org.springframework.cloud.stream.binder.kafka.integration;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.search.Search;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -27,7 +27,9 @@ import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.FilteredClassLoader;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
@@ -40,12 +42,12 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Artem Bilan
* @author Oleg Zhurakousky
* @author Jon Schneider
*
* @since 2.0
*/
@RunWith(SpringRunner.class)
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.NONE,
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE,
properties = "spring.cloud.stream.bindings.input.group=" + KafkaBinderActuatorTests.TEST_CONSUMER_GROUP)
public class KafkaBinderActuatorTests {
@@ -74,15 +76,24 @@ public class KafkaBinderActuatorTests {
@Test
public void testKafkaBinderMetricsExposed() {
Search search = this.meterRegistry.find(
String.format("%s.%s.%s.lag", "spring.cloud.stream.binder.kafka", TEST_CONSUMER_GROUP, Sink.INPUT));
assertThat(search.gauge()).isNotNull();
this.kafkaTemplate.send(Sink.INPUT, null, "foo".getBytes());
this.kafkaTemplate.flush();
assertThat(search.gauge().value()).isGreaterThan(0);
assertThat(this.meterRegistry.get("spring.cloud.stream.binder.kafka.offset")
.tag("group", TEST_CONSUMER_GROUP)
.tag("topic", Sink.INPUT)
.timeGauge().value()).isGreaterThan(0);
}
@Test
public void testKafkaBinderMetricsWhenNoMicrometer() {
new ApplicationContextRunner()
.withUserConfiguration(KafkaMetricsTestConfig.class)
.withClassLoader(new FilteredClassLoader("io.micrometer.core"))
.run(context -> {
assertThat(context.getBeanNamesForType(MeterRegistry.class)).isEmpty();
assertThat(context.getBeanNamesForType(MeterBinder.class)).isEmpty();
});
}
@EnableBinding(Sink.class)