Compare commits

...

6 Commits

Author SHA1 Message Date
buildmaster
d8a55b41c7 Update SNAPSHOT to 2.1.1.RELEASE 2019-02-15 15:35:22 +00:00
Soby Chacko
e96174664f Disable checkstyle plugin on 2.1.x branch 2019-02-15 10:17:44 -05:00
buildmaster
fa6bf19ebe Bumping versions 2019-02-14 10:09:25 +00:00
Oleg Zhurakousky
46051cd1a4 upgraded stream dependency to 2.1.1.BUILD-SNAPSHOT 2019-02-14 10:16:41 +01:00
Soby Chacko
1cb8cbbcc0 Fixed merge conflicts 2019-02-14 09:34:18 +01:00
Oleg Zhurakousky
87f91640f4 GH-1601 satellite changes to the core 2019-02-05 07:09:41 +01:00
12 changed files with 157 additions and 47 deletions

View File

@@ -1,4 +1,8 @@
// Do not edit this file (e.g. go instead to src/main/asciidoc)
////
DO NOT EDIT THIS FILE. IT WAS GENERATED.
Manual changes to this file will be lost when it is generated again.
Edit the files in the src/main/asciidoc/ directory instead.
////
:jdkversion: 1.8
:github-tag: master
@@ -338,12 +342,20 @@ Ignored if `replicas-assignments` is present.
+
Default: none (the binder-wide default of 1 is used).
NOTE: The Kafka binder uses the `partitionCount` setting of the producer as a hint to create a topic with the given partition count (in conjunction with the `minPartitionCount`, the maximum of the two being the value being used).
Exercise caution when configuring both `minPartitionCount` for a binder and `partitionCount` for an application, as the larger value is used.
If a topic already exists with a smaller partition count and `autoAddPartitions` is disabled (the default), the binder fails to start.
If a topic already exists with a smaller partition count and `autoAddPartitions` is enabled, new partitions are added.
If a topic already exists with a larger number of partitions than the maximum of (`minPartitionCount` or `partitionCount`), the existing partition count is used.
compression::
Set the `compression.type` producer property.
Supported values are `none`, `gzip`, `snappy` and `lz4`.
If you override the `kafka-clients` jar to 2.1.0 (or later), as discussed in the https://docs.spring.io/spring-kafka/docs/2.2.x/reference/html/deps-for-21x.html[Spring for Apache Kafka documentation], and wish to use `zstd` compression, use `spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd`.
+
Default: `none`.
==== Usage examples
In this section, we show the use of the preceding properties for specific scenarios.

View File

@@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.1.1.BUILD-SNAPSHOT</version>
<version>2.1.1.RELEASE</version>
</parent>
<packaging>pom</packaging>
<name>spring-cloud-stream-binder-kafka-docs</name>

64
pom.xml
View File

@@ -2,12 +2,12 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.1.1.BUILD-SNAPSHOT</version>
<version>2.1.1.RELEASE</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
<version>2.1.3.RELEASE</version>
<relativePath />
</parent>
<properties>
@@ -15,7 +15,7 @@
<spring-kafka.version>2.2.2.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>3.1.0.RELEASE</spring-integration-kafka.version>
<kafka.version>2.0.0</kafka.version>
<spring-cloud-stream.version>2.1.0.BUILD-SNAPSHOT</spring-cloud-stream.version>
<spring-cloud-stream.version>2.1.1.RELEASE</spring-cloud-stream.version>
</properties>
<modules>
<module>spring-cloud-stream-binder-kafka</module>
@@ -142,35 +142,35 @@
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-tools</artifactId>
<version>${spring-cloud-stream.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>checkstyle-validation</id>
<phase>validate</phase>
<configuration>
<configLocation>checkstyle.xml</configLocation>
<headerLocation>checkstyle-header.txt</headerLocation>
<suppressionsLocation>checkstyle-suppressions.xml</suppressionsLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
</configuration>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
<!--<plugin>-->
<!--<groupId>org.apache.maven.plugins</groupId>-->
<!--<artifactId>maven-checkstyle-plugin</artifactId>-->
<!--<dependencies>-->
<!--<dependency>-->
<!--<groupId>org.springframework.cloud</groupId>-->
<!--<artifactId>spring-cloud-stream-tools</artifactId>-->
<!--<version>${spring-cloud-stream.version}</version>-->
<!--</dependency>-->
<!--</dependencies>-->
<!--<executions>-->
<!--<execution>-->
<!--<id>checkstyle-validation</id>-->
<!--<phase>validate</phase>-->
<!--<configuration>-->
<!--<configLocation>checkstyle.xml</configLocation>-->
<!--<headerLocation>checkstyle-header.txt</headerLocation>-->
<!--<suppressionsLocation>checkstyle-suppressions.xml</suppressionsLocation>-->
<!--<encoding>UTF-8</encoding>-->
<!--<consoleOutput>true</consoleOutput>-->
<!--<failsOnError>true</failsOnError>-->
<!--<includeTestSourceDirectory>true</includeTestSourceDirectory>-->
<!--</configuration>-->
<!--<goals>-->
<!--<goal>check</goal>-->
<!--</goals>-->
<!--</execution>-->
<!--</executions>-->
<!--</plugin>-->
</plugins>
</build>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.1.1.BUILD-SNAPSHOT</version>
<version>2.1.1.RELEASE</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.1.1.BUILD-SNAPSHOT</version>
<version>2.1.1.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>

View File

@@ -16,6 +16,8 @@
package org.springframework.cloud.stream.binder.kafka.properties;
import java.util.Map;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
@@ -39,6 +41,11 @@ public class KafkaExtendedBindingProperties
return DEFAULTS_PREFIX;
}
@Override
public Map<String, KafkaBindingProperties> getBindings() {
return this.doGetBindings();
}
@Override
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
return KafkaBindingProperties.class;

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.1.1.BUILD-SNAPSHOT</version>
<version>2.1.1.RELEASE</version>
</parent>
<properties>

View File

@@ -16,6 +16,8 @@
package org.springframework.cloud.stream.binder.kafka.streams.properties;
import java.util.Map;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
@@ -36,6 +38,11 @@ public class KafkaStreamsExtendedBindingProperties
return DEFAULTS_PREFIX;
}
@Override
public Map<String, KafkaStreamsBindingProperties> getBindings() {
return this.doGetBindings();
}
@Override
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
return KafkaStreamsBindingProperties.class;

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.1.1.BUILD-SNAPSHOT</version>
<version>2.1.1.RELEASE</version>
</parent>
<dependencies>

View File

@@ -38,6 +38,8 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfi
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@@ -127,17 +129,13 @@ public class KafkaBinderConfiguration {
return kafkaJaasLoginModuleInitializer;
}
/**
* A conditional configuration for the {@link KafkaBinderMetrics} bean when the
* {@link MeterRegistry} class is in classpath, as well as a {@link MeterRegistry} bean is
* present in the application context.
*/
@Configuration
@ConditionalOnClass(MeterRegistry.class)
@ConditionalOnBean(MeterRegistry.class)
@ConditionalOnMissingBean(value = KafkaBinderMetrics.class, name = "outerContext")
@ConditionalOnClass(name = "io.micrometer.core.instrument.MeterRegistry")
protected class KafkaBinderMetricsConfiguration {
@Bean
@ConditionalOnBean(MeterRegistry.class)
@ConditionalOnMissingBean(KafkaBinderMetrics.class)
public MeterBinder kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder,
KafkaBinderConfigurationProperties configurationProperties,
@@ -145,7 +143,25 @@ public class KafkaBinderConfiguration {
return new KafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties, null, meterRegistry);
}
}
@Configuration
@ConditionalOnBean(name = "outerContext")
@ConditionalOnMissingBean(KafkaBinderMetrics.class)
@ConditionalOnClass(name = "io.micrometer.core.instrument.MeterRegistry")
protected class KafkaBinderMetricsConfigurationWithMultiBinder {
@Bean
public MeterBinder kafkaBinderMetrics(
KafkaMessageChannelBinder kafkaMessageChannelBinder,
KafkaBinderConfigurationProperties configurationProperties,
ConfigurableApplicationContext context) {
MeterRegistry meterRegistry = context.getBean("outerContext", ApplicationContext.class)
.getBean(MeterRegistry.class);
return new KafkaBinderMetrics(kafkaMessageChannelBinder,
configurationProperties, null, meterRegistry);
}
}
/**
@@ -158,5 +174,4 @@ public class KafkaBinderConfiguration {
private JaasLoginModuleConfiguration zookeeper;
}
}

View File

@@ -2606,7 +2606,6 @@ public class KafkaBinderTests extends
throw new RequeueCurrentMessageException();
});
}
fail("Expected exception");
}
catch (MessageHandlingException e) {
assertThat(e.getCause()).isInstanceOf(RequeueCurrentMessageException.class);

View File

@@ -0,0 +1,70 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.bootstrap;
import io.micrometer.core.instrument.MeterRegistry;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Soby Chacko
*/
public class MultiBinderMeterRegistryTest {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
@Test
public void testMetricsWorkWithMultiBinders() {
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(
SimpleApplication.class).web(WebApplicationType.NONE).run(
"--spring.cloud.stream.bindings.input.destination=foo",
"--spring.cloud.stream.bindings.input.binder=inbound",
"--spring.cloud.stream.bindings.input.group=testGroupabc",
"--spring.cloud.stream.binders.inbound.type=kafka",
"--spring.cloud.stream.binders.inbound.environment"
+ ".spring.cloud.stream.kafka.binder.brokers" + "="
+ embeddedKafka.getBrokersAsString());
final MeterRegistry meterRegistry = applicationContext.getBean(MeterRegistry.class);
assertThat(meterRegistry).isNotNull();
assertThat(meterRegistry.get("spring.cloud.stream.binder.kafka.offset")
.tag("group", "testGroupabc")
.tag("topic", "foo").gauge().value()).isNotNull();
applicationContext.close();
}
@SpringBootApplication
@EnableBinding(Sink.class)
static class SimpleApplication {
}
}