Compare commits

...

13 Commits

Author SHA1 Message Date
Marius Bogoevici
e3687cc5ab Release 1.1.2.RELEASE 2017-03-06 19:38:38 -05:00
Marius Bogoevici
d9c40d823a Use RELEASE version for Stream and Cloud Dependencies 2017-03-06 19:35:14 -05:00
Barry Commins
ca3b89b405 Configure the health indicator ConsumerFactory to use binder properties
Fixes #79

Replaced try...finally in KafkaBinderHealthIndicator with try-with-resources

Changed KafkaBinderHealthIndicatorTest for consistency
2017-03-06 19:20:58 -05:00
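The health() rewrite is shown in full in the KafkaBinderHealthIndicator.java diff further down. As a minimal sketch of the try-with-resources pattern this commit adopts (the consumerFactory field and the topic name are placeholders, not the project's exact code):

    public Health health() {
        // try-with-resources closes the metadata consumer automatically, even if the broker call throws
        try (Consumer<?, ?> metadataConsumer = consumerFactory.createConsumer()) {
            List<PartitionInfo> partitions = metadataConsumer.partitionsFor("some-topic");
            return (partitions == null || partitions.isEmpty())
                    ? Health.down().build()
                    : Health.up().build();
        }
        catch (Exception e) {
            // failures to reach the broker are reported as DOWN rather than propagated
            return Health.down(e).build();
        }
    }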
Marius Bogoevici
ef220a551a Fix 'spring-cloud-stream-binder-kafka-test-support'
Set scope compile on 'spring-kafka-test' so that the
dependency is transitive.
2017-02-21 19:13:36 -05:00
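Background: Maven does not expose `test`-scoped dependencies to projects that depend on an artifact, so declaring `spring-kafka-test` with `compile` scope in the test-support module lets downstream applications pick it up transitively; the new module's pom further down declares both `junit` and `spring-kafka-test` at compile scope.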
Ilayaperumal Gopinathan
585448dbad Add doc for startOffset usage
- Clarify based on the consumerGroup property

Resolves #48
2017-02-21 13:13:22 -05:00
Marius Bogoevici
4093a24d3e Reinstate spring-cloud-stream-binder-kafka-test-support
Fix #102

We've removed it in favour of using `spring-kafka-test`
directly but that is somewhat inconvenient to the end
user. Also, it's still part of the release train BOM
so it's an oversight on our end.
2017-02-20 23:22:44 +05:30
Soby Chacko
7d4323cb2e Ignore 'TopicExistsException' on creation
Fixes a race condition when topics are created in a stream
and a TopicExistsException is thrown.

Fixes #83

Explicitly checking for TopicExistsException
Cleanup
2017-01-30 18:51:42 -05:00
Marius Bogoevici
76a53049c5 Setting version to 1.1.2.BUILD-SNAPSHOT 2017-01-11 22:40:57 -05:00
Marius Bogoevici
b29498617d Release 1.1.1.RELEASE
Signed-off-by: Marius Bogoevici <mbogoevici@pivotal.io>
2017-01-11 22:32:42 -05:00
Marius Bogoevici
547ea77b2f Update mvnw 2017-01-11 22:13:52 -05:00
Marius Bogoevici
699dc06bc5 Observe bufferSize setting
Fixes #77

Signed-off-by: Marius Bogoevici <mbogoevici@pivotal.io>
2017-01-11 22:13:29 -05:00
Marius Bogoevici
e822e59859 Remove package-info.java 2017-01-11 22:07:38 -05:00
Marius Bogoevici
cc630f2be0 Remove explicit version references for Stream and Kafka binder artifacts
Signed-off-by: Marius Bogoevici <mbogoevici@pivotal.io>
2017-01-04 22:33:38 -05:00
13 changed files with 184 additions and 65 deletions

mvnw (vendored), 25 changed lines

@@ -57,27 +57,27 @@ case "`uname`" in
#
# Look for the Apple JDKs first to preserve the existing behaviour, and then look
# for the new JDKs provided by Oracle.
#
#
if [ -z "$JAVA_HOME" ] && [ -L /System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK ] ; then
#
# Apple JDKs
#
export JAVA_HOME=/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home
fi
if [ -z "$JAVA_HOME" ] && [ -L /System/Library/Java/JavaVirtualMachines/CurrentJDK ] ; then
#
# Apple JDKs
#
export JAVA_HOME=/System/Library/Java/JavaVirtualMachines/CurrentJDK/Contents/Home
fi
if [ -z "$JAVA_HOME" ] && [ -L "/Library/Java/JavaVirtualMachines/CurrentJDK" ] ; then
#
# Oracle JDKs
#
export JAVA_HOME=/Library/Java/JavaVirtualMachines/CurrentJDK/Contents/Home
fi
fi
if [ -z "$JAVA_HOME" ] && [ -x "/usr/libexec/java_home" ]; then
#
@@ -219,16 +219,27 @@ concat_lines() {
export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-$(find_maven_basedir)}
MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS"
# Provide a "standardized" way to retrieve the CLI args that will
# Provide a "standardized" way to retrieve the CLI args that will
# work with both Windows and non-Windows executions.
MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@"
export MAVEN_CMD_LINE_ARGS
WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain
echo "Running version check"
VERSION=$( sed '\!<parent!,\!</parent!d' `dirname $0`/pom.xml | grep '<version' | head -1 | sed -e 's/.*<version>//' -e 's!</version>.*$!!' )
echo "The found version is [${VERSION}]"
if echo $VERSION | egrep -q 'M|RC'; then
echo Activating \"milestone\" profile for version=\"$VERSION\"
echo $MAVEN_ARGS | grep -q milestone || MAVEN_ARGS="$MAVEN_ARGS -Pmilestone"
else
echo Deactivating \"milestone\" profile for version=\"$VERSION\"
echo $MAVEN_ARGS | grep -q milestone && MAVEN_ARGS=$(echo $MAVEN_ARGS | sed -e 's/-Pmilestone//')
fi
exec "$JAVACMD" \
$MAVEN_OPTS \
-classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \
"-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \
${WRAPPER_LAUNCHER} "$@"
${WRAPPER_LAUNCHER} ${MAVEN_ARGS} "$@"
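In plain terms, the block added to mvnw above extracts the project version from pom.xml (skipping the <parent> section), and when that version contains M or RC it appends -Pmilestone to MAVEN_ARGS before launching the wrapper; for other versions it strips any -Pmilestone flag.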

pom.xml (spring-cloud-stream-binder-kafka-parent), 10 changed lines

@@ -2,12 +2,12 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.1.1.BUILD-SNAPSHOT</version>
<version>1.1.2.RELEASE</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>1.2.2.BUILD-SNAPSHOT</version>
<version>1.2.2.RELEASE</version>
<relativePath />
</parent>
<properties>
@@ -15,15 +15,15 @@
<kafka.version>0.9.0.1</kafka.version>
<spring-kafka.version>1.0.5.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>2.0.1.RELEASE</spring-integration-kafka.version>
<spring-cloud-stream.version>1.1.1.BUILD-SNAPSHOT</spring-cloud-stream.version>
<spring-cloud-stream.version>1.1.2.RELEASE</spring-cloud-stream.version>
</properties>
<modules>
<module>spring-cloud-stream-binder-kafka</module>
<module>spring-cloud-starter-stream-kafka</module>
<module>spring-cloud-stream-binder-kafka-docs</module>
<module>spring-cloud-stream-binder-kafka-0.10-test</module>
<module>spring-cloud-stream-binder-kafka-test-support</module>
</modules>
<dependencyManagement>
<dependencies>
<dependency>
@@ -131,7 +131,7 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build-tools</artifactId>
<version>1.2.0.RELEASE</version>
<version>1.2.2.RELEASE</version>
</dependency>
</dependencies>
<executions>

spring-cloud-starter-stream-kafka/pom.xml

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.1.1.BUILD-SNAPSHOT</version>
<version>1.1.2.RELEASE</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>
@@ -20,7 +20,7 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>1.1.1.BUILD-SNAPSHOT</version>
<version>1.1.2.RELEASE</version>
</dependency>
</dependencies>
</project>

spring-cloud-stream-binder-kafka-0.10-test/pom.xml

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.1.1.BUILD-SNAPSHOT</version>
<version>1.1.2.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-0.10-test</artifactId>
<description>Spring Cloud Stream Kafka Binder 0.10 Tests</description>
@@ -28,7 +28,7 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>1.1.1.BUILD-SNAPSHOT</version>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -64,7 +64,7 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>1.1.1.BUILD-SNAPSHOT</version>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
@@ -76,7 +76,7 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-schema</artifactId>
<version>1.1.1.BUILD-SNAPSHOT</version>
<version>${spring-cloud-stream.version}</version>
<scope>test</scope>
</dependency>
<dependency>

spring-cloud-stream-binder-kafka-docs/pom.xml

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.1.1.BUILD-SNAPSHOT</version>
<version>1.1.2.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>
@@ -18,7 +18,7 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>1.1.1.BUILD-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
</dependencies>
<profiles>

spring-cloud-stream-binder-kafka-docs: Kafka binder consumer properties documentation (AsciiDoc)

@@ -153,6 +153,7 @@ Default: `false`.
startOffset::
The starting offset for new groups, or when `resetOffsets` is `true`.
Allowed values: `earliest`, `latest`.
If the consumer group is set explicitly for the consumer 'binding' (via `spring.cloud.stream.bindings.<channelName>.group`), then 'startOffset' is set to `earliest`; otherwise it is set to `latest` for the `anonymous` consumer group.
+
Default: null (equivalent to `earliest`).
enableDlq::
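Regarding the startOffset paragraph added above, a concrete reading (binding name `input` is illustrative): with `spring.cloud.stream.bindings.input.group=myGroup` set, new consumer groups start from the earliest offset; with no group set, the binder uses an anonymous group that starts from the latest offset. Either default can be overridden per binding, presumably via the Kafka binder consumer property `spring.cloud.stream.kafka.bindings.input.consumer.startOffset`.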

spring-cloud-stream-binder-kafka-test-support/pom.xml (new file)

@@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.1.2.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-test-support</artifactId>
<description>Kafka related test classes</description>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>${spring-kafka.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

spring-cloud-stream-binder-kafka/pom.xml

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>1.1.1.BUILD-SNAPSHOT</version>
<version>1.1.2.RELEASE</version>
</parent>
<dependencies>

KafkaBinderHealthIndicator.java (org.springframework.cloud.stream.binder.kafka)

@@ -16,20 +16,16 @@
package org.springframework.cloud.stream.binder.kafka;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfigurationProperties;
import org.springframework.kafka.core.ConsumerFactory;
/**
* Health indicator for Kafka.
@@ -41,24 +37,18 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
private final KafkaMessageChannelBinder binder;
private final KafkaBinderConfigurationProperties configurationProperties;
private final ConsumerFactory<?, ?> consumerFactory;
public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder,
KafkaBinderConfigurationProperties configurationProperties) {
ConsumerFactory<?, ?> consumerFactory) {
this.binder = binder;
this.configurationProperties = configurationProperties;
this.consumerFactory = consumerFactory;
}
@Override
public Health health() {
Map<String, String> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties
.getKafkaConnectionString());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
KafkaConsumer metadataConsumer = new KafkaConsumer(properties);
try {
try (Consumer<?, ?> metadataConsumer = consumerFactory.createConsumer()) {
Set<String> downMessages = new HashSet<>();
for (String topic : this.binder.getTopicsInUse().keySet()) {
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
@@ -78,8 +68,5 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
catch (Exception e) {
return Health.down(e).build();
}
finally {
metadataConsumer.close();
}
}
}

KafkaMessageChannelBinder.java (org.springframework.cloud.stream.binder.kafka)

@@ -247,8 +247,7 @@ public class KafkaMessageChannelBinder extends
}
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(producerProperties.getExtension().getBufferSize()));
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
@@ -471,8 +470,22 @@ public class KafkaMessageChannelBinder extends
@Override
public Object doWithRetry(RetryContext context) throws RuntimeException {
adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount,
configurationProperties.getReplicationFactor(), new Properties());
try {
adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount,
configurationProperties.getReplicationFactor(), new Properties());
}
catch (Exception e) {
String exceptionClass = e.getClass().getName();
if (exceptionClass.equals("kafka.common.TopicExistsException") ||
exceptionClass.equals("org.apache.kafka.common.errors.TopicExistsException")){
if (logger.isWarnEnabled()) {
logger.warn("Attempt to create topic: " + topicName + ". Topic already exists.");
}
}
else {
throw e;
}
}
return null;
}
});
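Note that the catch block above compares the exception's class name as a string against both `kafka.common.TopicExistsException` and `org.apache.kafka.common.errors.TopicExistsException`, which likely avoids a compile-time dependency on one particular Kafka client version, since different versions throw the exception from different packages.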

KafkaBinderConfiguration.java (org.springframework.cloud.stream.binder.kafka.config)

@@ -17,9 +17,13 @@
package org.springframework.cloud.stream.binder.kafka.config;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.springframework.beans.factory.annotation.Autowired;
@@ -46,8 +50,11 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.type.AnnotatedTypeMetadata;
import org.springframework.integration.codec.Codec;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.util.ObjectUtils;
/**
* @author David Turanski
@@ -101,7 +108,15 @@ public class KafkaBinderConfiguration {
@Bean
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder) {
return new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, this.configurationProperties);
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
props.putAll(configurationProperties.getConfiguration());
}
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
return new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, consumerFactory);
}
@Bean(name = "adminUtilsOperation")
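The net effect of the healthIndicator bean change above is that the health check's DefaultKafkaConsumerFactory is built from the binder's own configuration map and broker list, so the indicator probes the same brokers, with the same client settings, as the binder itself.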

package-info.java (org.springframework.cloud.stream.binder.kafka, deleted)

@@ -1,20 +0,0 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* This package contains an implementation of the {@link org.springframework.cloud.stream.binder.Binder} for Kafka.
*/
package org.springframework.cloud.stream.binder.kafka;

KafkaBinderHealthIndicatorTest.java (new file)

@@ -0,0 +1,87 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
/**
* @author Barry Commins
*/
public class KafkaBinderHealthIndicatorTest {
private static final String TEST_TOPIC = "test";
private KafkaBinderHealthIndicator indicator;
@Mock
private DefaultKafkaConsumerFactory consumerFactory;
@Mock
private KafkaConsumer consumer;
@Mock
private KafkaMessageChannelBinder binder;
private Map<String, Collection<PartitionInfo>> topicsInUse = new HashMap<>();
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
given(consumerFactory.createConsumer()).willReturn(consumer);
given(binder.getTopicsInUse()).willReturn(topicsInUse);
indicator = new KafkaBinderHealthIndicator(binder, consumerFactory);
}
@Test
public void kafkaBinderIsUp() {
final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, partitions);
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
Health health = indicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
}
@Test
public void kafkaBinderIsDown() {
final List<PartitionInfo> partitions = partitions(new Node(-1, null, 0));
topicsInUse.put(TEST_TOPIC, partitions);
given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions);
Health health = indicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
}
private List<PartitionInfo> partitions(Node leader) {
List<PartitionInfo> partitions = new ArrayList<>();
partitions.add(new PartitionInfo(TEST_TOPIC, 0, leader, null, null));
return partitions;
}
}