Compare commits
6 Commits
v3.0.0.M2
...
v2.1.1.REL
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d8a55b41c7 | ||
|
|
e96174664f | ||
|
|
fa6bf19ebe | ||
|
|
46051cd1a4 | ||
|
|
1cb8cbbcc0 | ||
|
|
87f91640f4 |
14
README.adoc
14
README.adoc
@@ -1,4 +1,8 @@
|
||||
// Do not edit this file (e.g. go instead to src/main/asciidoc)
|
||||
////
|
||||
DO NOT EDIT THIS FILE. IT WAS GENERATED.
|
||||
Manual changes to this file will be lost when it is generated again.
|
||||
Edit the files in the src/main/asciidoc/ directory instead.
|
||||
////
|
||||
|
||||
:jdkversion: 1.8
|
||||
:github-tag: master
|
||||
@@ -338,12 +342,20 @@ Ignored if `replicas-assignments` is present.
|
||||
+
|
||||
Default: none (the binder-wide default of 1 is used).
|
||||
|
||||
|
||||
NOTE: The Kafka binder uses the `partitionCount` setting of the producer as a hint to create a topic with the given partition count (in conjunction with the `minPartitionCount`, the maximum of the two being the value being used).
|
||||
Exercise caution when configuring both `minPartitionCount` for a binder and `partitionCount` for an application, as the larger value is used.
|
||||
If a topic already exists with a smaller partition count and `autoAddPartitions` is disabled (the default), the binder fails to start.
|
||||
If a topic already exists with a smaller partition count and `autoAddPartitions` is enabled, new partitions are added.
|
||||
If a topic already exists with a larger number of partitions than the maximum of (`minPartitionCount` or `partitionCount`), the existing partition count is used.
|
||||
|
||||
compression::
|
||||
Set the `compression.type` producer property.
|
||||
Supported values are `none`, `gzip`, `snappy` and `lz4`.
|
||||
If you override the `kafka-clients` jar to 2.1.0 (or later), as discussed in the https://docs.spring.io/spring-kafka/docs/2.2.x/reference/html/deps-for-21x.html[Spring for Apache Kafka documentation], and wish to use `zstd` compression, use `spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd`.
|
||||
+
|
||||
Default: `none`.
|
||||
|
||||
==== Usage examples
|
||||
|
||||
In this section, we show the use of the preceding properties for specific scenarios.
|
||||
|
||||
@@ -7,7 +7,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.1.1.BUILD-SNAPSHOT</version>
|
||||
<version>2.1.1.RELEASE</version>
|
||||
</parent>
|
||||
<packaging>pom</packaging>
|
||||
<name>spring-cloud-stream-binder-kafka-docs</name>
|
||||
|
||||
64
pom.xml
64
pom.xml
@@ -2,12 +2,12 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.1.1.BUILD-SNAPSHOT</version>
|
||||
<version>2.1.1.RELEASE</version>
|
||||
<packaging>pom</packaging>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-build</artifactId>
|
||||
<version>2.1.0.BUILD-SNAPSHOT</version>
|
||||
<version>2.1.3.RELEASE</version>
|
||||
<relativePath />
|
||||
</parent>
|
||||
<properties>
|
||||
@@ -15,7 +15,7 @@
|
||||
<spring-kafka.version>2.2.2.RELEASE</spring-kafka.version>
|
||||
<spring-integration-kafka.version>3.1.0.RELEASE</spring-integration-kafka.version>
|
||||
<kafka.version>2.0.0</kafka.version>
|
||||
<spring-cloud-stream.version>2.1.0.BUILD-SNAPSHOT</spring-cloud-stream.version>
|
||||
<spring-cloud-stream.version>2.1.1.RELEASE</spring-cloud-stream.version>
|
||||
</properties>
|
||||
<modules>
|
||||
<module>spring-cloud-stream-binder-kafka</module>
|
||||
@@ -142,35 +142,35 @@
|
||||
</plugins>
|
||||
</pluginManagement>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-checkstyle-plugin</artifactId>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-tools</artifactId>
|
||||
<version>${spring-cloud-stream.version}</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>checkstyle-validation</id>
|
||||
<phase>validate</phase>
|
||||
<configuration>
|
||||
<configLocation>checkstyle.xml</configLocation>
|
||||
<headerLocation>checkstyle-header.txt</headerLocation>
|
||||
<suppressionsLocation>checkstyle-suppressions.xml</suppressionsLocation>
|
||||
<encoding>UTF-8</encoding>
|
||||
<consoleOutput>true</consoleOutput>
|
||||
<failsOnError>true</failsOnError>
|
||||
<includeTestSourceDirectory>true</includeTestSourceDirectory>
|
||||
</configuration>
|
||||
<goals>
|
||||
<goal>check</goal>
|
||||
</goals>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<!--<plugin>-->
|
||||
<!--<groupId>org.apache.maven.plugins</groupId>-->
|
||||
<!--<artifactId>maven-checkstyle-plugin</artifactId>-->
|
||||
<!--<dependencies>-->
|
||||
<!--<dependency>-->
|
||||
<!--<groupId>org.springframework.cloud</groupId>-->
|
||||
<!--<artifactId>spring-cloud-stream-tools</artifactId>-->
|
||||
<!--<version>${spring-cloud-stream.version}</version>-->
|
||||
<!--</dependency>-->
|
||||
<!--</dependencies>-->
|
||||
<!--<executions>-->
|
||||
<!--<execution>-->
|
||||
<!--<id>checkstyle-validation</id>-->
|
||||
<!--<phase>validate</phase>-->
|
||||
<!--<configuration>-->
|
||||
<!--<configLocation>checkstyle.xml</configLocation>-->
|
||||
<!--<headerLocation>checkstyle-header.txt</headerLocation>-->
|
||||
<!--<suppressionsLocation>checkstyle-suppressions.xml</suppressionsLocation>-->
|
||||
<!--<encoding>UTF-8</encoding>-->
|
||||
<!--<consoleOutput>true</consoleOutput>-->
|
||||
<!--<failsOnError>true</failsOnError>-->
|
||||
<!--<includeTestSourceDirectory>true</includeTestSourceDirectory>-->
|
||||
<!--</configuration>-->
|
||||
<!--<goals>-->
|
||||
<!--<goal>check</goal>-->
|
||||
<!--</goals>-->
|
||||
<!--</execution>-->
|
||||
<!--</executions>-->
|
||||
<!--</plugin>-->
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.1.1.BUILD-SNAPSHOT</version>
|
||||
<version>2.1.1.RELEASE</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<description>Spring Cloud Starter Stream Kafka</description>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.1.1.BUILD-SNAPSHOT</version>
|
||||
<version>2.1.1.RELEASE</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder Core</description>
|
||||
|
||||
@@ -16,6 +16,8 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.properties;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
|
||||
@@ -39,6 +41,11 @@ public class KafkaExtendedBindingProperties
|
||||
return DEFAULTS_PREFIX;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, KafkaBindingProperties> getBindings() {
|
||||
return this.doGetBindings();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
|
||||
return KafkaBindingProperties.class;
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.1.1.BUILD-SNAPSHOT</version>
|
||||
<version>2.1.1.RELEASE</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
|
||||
@@ -16,6 +16,8 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.streams.properties;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
|
||||
@@ -36,6 +38,11 @@ public class KafkaStreamsExtendedBindingProperties
|
||||
return DEFAULTS_PREFIX;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, KafkaStreamsBindingProperties> getBindings() {
|
||||
return this.doGetBindings();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
|
||||
return KafkaStreamsBindingProperties.class;
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>2.1.1.BUILD-SNAPSHOT</version>
|
||||
<version>2.1.1.RELEASE</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
||||
@@ -38,6 +38,8 @@ import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfi
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
|
||||
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
|
||||
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Import;
|
||||
@@ -127,17 +129,13 @@ public class KafkaBinderConfiguration {
|
||||
return kafkaJaasLoginModuleInitializer;
|
||||
}
|
||||
|
||||
/**
|
||||
* A conditional configuration for the {@link KafkaBinderMetrics} bean when the
|
||||
* {@link MeterRegistry} class is in classpath, as well as a {@link MeterRegistry} bean is
|
||||
* present in the application context.
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnClass(MeterRegistry.class)
|
||||
@ConditionalOnBean(MeterRegistry.class)
|
||||
@ConditionalOnMissingBean(value = KafkaBinderMetrics.class, name = "outerContext")
|
||||
@ConditionalOnClass(name = "io.micrometer.core.instrument.MeterRegistry")
|
||||
protected class KafkaBinderMetricsConfiguration {
|
||||
|
||||
@Bean
|
||||
@ConditionalOnBean(MeterRegistry.class)
|
||||
@ConditionalOnMissingBean(KafkaBinderMetrics.class)
|
||||
public MeterBinder kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder,
|
||||
KafkaBinderConfigurationProperties configurationProperties,
|
||||
@@ -145,7 +143,25 @@ public class KafkaBinderConfiguration {
|
||||
|
||||
return new KafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties, null, meterRegistry);
|
||||
}
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnBean(name = "outerContext")
|
||||
@ConditionalOnMissingBean(KafkaBinderMetrics.class)
|
||||
@ConditionalOnClass(name = "io.micrometer.core.instrument.MeterRegistry")
|
||||
protected class KafkaBinderMetricsConfigurationWithMultiBinder {
|
||||
|
||||
@Bean
|
||||
public MeterBinder kafkaBinderMetrics(
|
||||
KafkaMessageChannelBinder kafkaMessageChannelBinder,
|
||||
KafkaBinderConfigurationProperties configurationProperties,
|
||||
ConfigurableApplicationContext context) {
|
||||
|
||||
MeterRegistry meterRegistry = context.getBean("outerContext", ApplicationContext.class)
|
||||
.getBean(MeterRegistry.class);
|
||||
return new KafkaBinderMetrics(kafkaMessageChannelBinder,
|
||||
configurationProperties, null, meterRegistry);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -158,5 +174,4 @@ public class KafkaBinderConfiguration {
|
||||
|
||||
private JaasLoginModuleConfiguration zookeeper;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -2606,7 +2606,6 @@ public class KafkaBinderTests extends
|
||||
throw new RequeueCurrentMessageException();
|
||||
});
|
||||
}
|
||||
fail("Expected exception");
|
||||
}
|
||||
catch (MessageHandlingException e) {
|
||||
assertThat(e.getCause()).isInstanceOf(RequeueCurrentMessageException.class);
|
||||
|
||||
@@ -0,0 +1,70 @@
|
||||
/*
|
||||
* Copyright 2019-2019 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.bootstrap;
|
||||
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.messaging.Sink;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.kafka.test.rule.KafkaEmbedded;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class MultiBinderMeterRegistryTest {
|
||||
|
||||
@ClassRule
|
||||
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10);
|
||||
|
||||
@Test
|
||||
public void testMetricsWorkWithMultiBinders() {
|
||||
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(
|
||||
SimpleApplication.class).web(WebApplicationType.NONE).run(
|
||||
"--spring.cloud.stream.bindings.input.destination=foo",
|
||||
"--spring.cloud.stream.bindings.input.binder=inbound",
|
||||
"--spring.cloud.stream.bindings.input.group=testGroupabc",
|
||||
"--spring.cloud.stream.binders.inbound.type=kafka",
|
||||
"--spring.cloud.stream.binders.inbound.environment"
|
||||
+ ".spring.cloud.stream.kafka.binder.brokers" + "="
|
||||
+ embeddedKafka.getBrokersAsString());
|
||||
|
||||
final MeterRegistry meterRegistry = applicationContext.getBean(MeterRegistry.class);
|
||||
|
||||
assertThat(meterRegistry).isNotNull();
|
||||
|
||||
assertThat(meterRegistry.get("spring.cloud.stream.binder.kafka.offset")
|
||||
.tag("group", "testGroupabc")
|
||||
.tag("topic", "foo").gauge().value()).isNotNull();
|
||||
|
||||
applicationContext.close();
|
||||
}
|
||||
|
||||
@SpringBootApplication
|
||||
@EnableBinding(Sink.class)
|
||||
static class SimpleApplication {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user