Compare commits

...

15 Commits

Author SHA1 Message Date
Soby Chacko
32adf574ac Fix concurrency issues with Kafka Streams binder
See this commit on the master branch for details:
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/commit/1cc50c1a
2020-02-13 15:02:36 -05:00
Adriano Scheffer
fc1e4602fb Always call value serde configure method
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/836

Remove redundant call to serde configure

Closes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/836
2020-02-13 14:47:13 -05:00
Soby Chacko
0ea0abb5b9 Kafka Streams binder health indicator issues
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/842

When a health indicator is run against a topic with multiple partitions,
the Kafka Streams binder overwrites the information. Addressing this issue.
2020-02-13 14:41:06 -05:00
Soby Chacko
11bdcd5884 Topic properties not applied on kstream binding
On KStream producer binding, topic properties are not applied
by the provisioner. This is because the extended properties object
that is passed to producer binding is erroneously overwritten by a
default instance. Addressing this issue.

Resolves #684
2019-09-06 16:31:01 -04:00
Walliee
dd201ef60b Add hook to specify sendTimeoutExpression
resolves #724
2019-09-03 10:28:26 -04:00
buildmaster
8aa18c73f0 Bumping versions to 2.2.2.BUILD-SNAPSHOT after release 2019-08-28 06:43:49 +00:00
buildmaster
84a58bc202 Going back to snapshots 2019-08-28 06:43:49 +00:00
buildmaster
d6ca68e20d Update SNAPSHOT to 2.2.1.RELEASE 2019-08-28 06:40:48 +00:00
Gary Russell
665f087956 GH-689: Support producer-only transactions
Resolves #689
2019-08-26 09:43:12 -04:00
Soby Chacko
4a4d42da41 Backport
Topic provisioning - Kafka streams table types

* Topic provisioning for consumers ignores topic.properties settings if binding type is KTable or GlobalKTable
* Modify tests to verify the behavior during KTable/GlobalKTable bindings
* Polishing

Resolves #687
2019-07-05 17:36:52 -04:00
Gary Russell
fba5dbb22f GH-677: Fix resetOffsets with concurrency
Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/677

The logic for resetting offsets only on the initial assignment used a simple
boolean; this is insufficient when concurrency is > 1.

Use a concurrent set instead to determine whether or not a particular topic/partition
has been sought.

Also, change the `initial` argument on `KafkaBindingRebalanceListener.onPartitionsAssigned()`
to be derived from a `ThreadLocal` and add javadocs about retaining the state by partition.

**backport to all supported versions** (Except `KafkaBindingRebalanceListener` which did
not exist before 2.1.x)

polishing

polishing
2019-06-18 16:39:28 -04:00
Oleg Zhurakousky
6d14276d10 Added author tag 2019-06-13 20:21:15 +02:00
Vladislav Fefelov
74783cfa59 Use single Executor Service in Kafka Binder health indicator
Resolves #665
2019-06-13 20:21:04 +02:00
Oleg Zhurakousky
8fb0516d96 Prepared docs for 2.2.1.RELEASE 2019-06-06 16:19:07 +02:00
iguissouma
683e3f96ba Use try with resources when creating AdminClient
Use try with resources when creating AdminClient to release all associated resources.
Fixes gh-660
2019-06-04 09:39:00 -04:00
25 changed files with 505 additions and 55 deletions

View File

@@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.1.BUILD-SNAPSHOT</version>
<version>2.2.2.BUILD-SNAPSHOT</version>
</parent>
<packaging>pom</packaging>
<name>spring-cloud-stream-binder-kafka-docs</name>
@@ -124,10 +124,10 @@
<sourceDocumentName>${docs.main}.adoc</sourceDocumentName>
<attributes>
<spring-cloud-stream-version>${project.version}</spring-cloud-stream-version>
<docs-url>https://cloud.spring.io/</docs-url>
<docs-version></docs-version>
<!-- <docs-version>${project.version}/</docs-version> -->
<!-- <docs-url>https://cloud.spring.io/spring-cloud-static/</docs-url> -->
<!-- <docs-url>https://cloud.spring.io/</docs-url> -->
<!-- <docs-version></docs-version> -->
<docs-version>${project.version}/</docs-version>
<docs-url>https://cloud.spring.io/spring-cloud-static/</docs-url>
</attributes>
</configuration>
<executions>

View File

@@ -290,6 +290,13 @@ sync::
Whether the producer is synchronous.
+
Default: `false`.
sendTimeoutExpression::
A SpEL expression evaluated against the outgoing message used to evaluate the time to wait for ack when synchronous publish is enabled -- for example, `headers['mySendTimeout']`.
The value of the timeout is in milliseconds.
With versions before 3.0, the payload could not be used unless native encoding was being used because, by the time this expression was evaluated, the payload was already in the form of a `byte[]`.
Now, the expression is evaluated before the payload is converted.
+
Default: `none`.
batchTimeout::
How long the producer waits to allow more messages to accumulate in the same batch before sending the messages.
(Normally, the producer does not wait at all and simply sends all the messages that accumulated while the previous send was in progress.) A non-zero value may increase throughput at the expense of latency.
@@ -502,6 +509,50 @@ public class Application {
}
----
[[kafka-transactional-binder]]
=== Transactional Binder
Enable transactions by setting `spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix` to a non-empty value, e.g. `tx-`.
When used in a processor application, the consumer starts the transaction; any records sent on the consumer thread participate in the same transaction.
When the listener exits normally, the listener container will send the offset to the transaction and commit it.
A common producer factory is used for all producer bindings configure using `spring.cloud.stream.kafka.binder.transaction.producer.*` properties; individual binding Kafka producer properties are ignored.
If you wish to use transactions in a source application, or from some arbitrary thread for producer-only transaction (e.g. `@Scheduled` method), you must get a reference to the transactional producer factory and define a `KafkaTransactionManager` bean using it.
====
[source, java]
----
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
return new KafkaTransactionManager<>(pf);
}
----
====
Notice that we get a reference to the binder using the `BinderFactory`; use `null` in the first argument when there is only one binder configured.
If more than one binder is configured, use the binder name to get the reference.
Once we have a reference to the binder, we can obtain a reference to the `ProducerFactory` and create a transaction manager.
Then you would just normal Spring transaction support, e.g. `TransactionTemplate` or `@Transactional`, for example:
====
[source, java]
----
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
----
====
If you wish to synchronize producer-only transactions with those from some other transaction manager, use a `ChainedTransactionManager`.
[[kafka-error-channels]]
=== Error Channels

View File

@@ -2,12 +2,12 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.1.BUILD-SNAPSHOT</version>
<version>2.2.2.BUILD-SNAPSHOT</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>2.1.4.BUILD-SNAPSHOT</version>
<version>2.1.6.RELEASE</version>
<relativePath />
</parent>
<properties>
@@ -15,7 +15,7 @@
<spring-kafka.version>2.2.2.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>3.1.0.RELEASE</spring-integration-kafka.version>
<kafka.version>2.0.0</kafka.version>
<spring-cloud-stream.version>2.2.0.BUILD-SNAPSHOT</spring-cloud-stream.version>
<spring-cloud-stream.version>2.2.2.BUILD-SNAPSHOT</spring-cloud-stream.version>
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.1.BUILD-SNAPSHOT</version>
<version>2.2.2.BUILD-SNAPSHOT</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.1.BUILD-SNAPSHOT</version>
<version>2.2.2.BUILD-SNAPSHOT</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>

View File

@@ -40,6 +40,8 @@ public class KafkaProducerProperties {
private boolean sync;
private Expression sendTimeoutExpression;
private int batchTimeout;
private Expression messageKeyExpression;
@@ -75,6 +77,14 @@ public class KafkaProducerProperties {
this.sync = sync;
}
public Expression getSendTimeoutExpression() {
return this.sendTimeoutExpression;
}
public void setSendTimeoutExpression(Expression sendTimeoutExpression) {
this.sendTimeoutExpression = sendTimeoutExpression;
}
public int getBatchTimeout() {
return this.batchTimeout;
}

View File

@@ -466,11 +466,11 @@ public class KafkaTopicProvisioner implements
// In some cases, the above partition query may not throw an UnknownTopic..Exception for various reasons.
// For that, we are forcing another query to ensure that the topic is present on the server.
if (CollectionUtils.isEmpty(partitions)) {
final AdminClient adminClient = AdminClient
.create(this.adminClientProperties);
final DescribeTopicsResult describeTopicsResult = adminClient
try (AdminClient adminClient = AdminClient
.create(this.adminClientProperties)) {
final DescribeTopicsResult describeTopicsResult = adminClient
.describeTopics(Collections.singletonList(topicName));
try {
describeTopicsResult.all().get();
}
catch (ExecutionException ex) {

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.1.BUILD-SNAPSHOT</version>
<version>2.2.2.BUILD-SNAPSHOT</version>
</parent>
<properties>

View File

@@ -118,4 +118,8 @@ public class GlobalKTableBinder extends
.getExtendedPropertiesEntryClass();
}
public void setKafkaStreamsExtendedBindingProperties(
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties) {
this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
}
}

View File

@@ -25,6 +25,7 @@ import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -67,10 +68,14 @@ public class GlobalKTableBinderConfiguration {
@Bean
public GlobalKTableBinder GlobalKTableBinder(
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
KafkaTopicProvisioner kafkaTopicProvisioner,
@Qualifier("kafkaStreamsDlqDispatchers") Map<String, KafkaStreamsDlqDispatch> kafkaStreamsDlqDispatchers) {
return new GlobalKTableBinder(binderConfigurationProperties,
GlobalKTableBinder globalKTableBinder = new GlobalKTableBinder(binderConfigurationProperties,
kafkaTopicProvisioner, kafkaStreamsDlqDispatchers);
globalKTableBinder.setKafkaStreamsExtendedBindingProperties(
kafkaStreamsExtendedBindingProperties);
return globalKTableBinder;
}
}

View File

@@ -116,7 +116,7 @@ class KStreamBinder extends
ExtendedProducerProperties<KafkaStreamsProducerProperties> properties) {
// @checkstyle:on
ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties = new ExtendedProducerProperties<>(
new KafkaProducerProperties());
properties.getExtension());
this.kafkaTopicProvisioner.provisionProducerDestination(name,
extendedProducerProperties);
Serde<?> keySerde = this.keyValueSerdeResolver

View File

@@ -122,4 +122,9 @@ class KTableBinder extends
.getExtendedPropertiesEntryClass();
}
public void setKafkaStreamsExtendedBindingProperties(
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties) {
this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
}
}

View File

@@ -25,6 +25,7 @@ import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -68,10 +69,12 @@ public class KTableBinderConfiguration {
public KTableBinder kTableBinder(
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
@Qualifier("kafkaStreamsDlqDispatchers") Map<String, KafkaStreamsDlqDispatch> kafkaStreamsDlqDispatchers) {
KTableBinder kStreamBinder = new KTableBinder(binderConfigurationProperties,
KTableBinder kTableBinder = new KTableBinder(binderConfigurationProperties,
kafkaTopicProvisioner, kafkaStreamsDlqDispatchers);
return kStreamBinder;
kTableBinder.setKafkaStreamsExtendedBindingProperties(kafkaStreamsExtendedBindingProperties);
return kTableBinder;
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2019-2019 the original author or authors.
* Copyright 2019-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -70,12 +71,22 @@ class KafkaStreamsBinderHealthIndicator extends AbstractHealthIndicator {
final Map<String, Object> details = new HashMap<>();
for (TaskMetadata metadata : taskMetadata) {
details.put("taskId", metadata.taskId());
details.put("partitions",
metadata.topicPartitions().stream().map(
p -> "partition=" + p.partition() + ", topic=" + p.topic())
.collect(Collectors.toList()));
if (details.containsKey("partitions")) {
@SuppressWarnings("unchecked")
List<String> partitionsInfo = (List<String>) details.get("partitions");
partitionsInfo.addAll(addPartitionsInfo(metadata));
}
else {
details.put("partitions",
addPartitionsInfo(metadata));
}
}
return details;
}
private static List<String> addPartitionsInfo(TaskMetadata metadata) {
return metadata.topicPartitions().stream().map(
p -> "partition=" + p.partition() + ", topic=" + p.topic())
.collect(Collectors.toList());
}
}

View File

@@ -466,8 +466,8 @@ public class KafkaStreamsFunctionProcessor implements ApplicationContextAware {
}
int concurrency = this.bindingServiceProperties.getConsumerProperties(inboundName).getConcurrency();
// override concurrency if set at the individual binding level.
if (concurrency > 1) {
// override concurrency (set potentially on the binding).
if (concurrency >= 1) {
streamConfigGlobalProperties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, concurrency);
}

View File

@@ -539,8 +539,8 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator
int concurrency = this.bindingServiceProperties.getConsumerProperties(inboundName)
.getConcurrency();
// override concurrency if set at the individual binding level.
if (concurrency > 1) {
// override concurrency (set potentially on the binding).
if (concurrency >= 1) {
streamConfigGlobalProperties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
concurrency);
}

View File

@@ -200,7 +200,7 @@ public class KeyValueSerdeResolver {
Serde.class)
: Serdes.ByteArray();
}
valueSerde.configure(this.streamConfigGlobalProperties, false);
return valueSerde;
}
}

View File

@@ -41,8 +41,14 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsApplicationSupportProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -75,7 +81,8 @@ public class StreamToGlobalKTableJoinIntegrationTests {
SpringApplication app = new SpringApplication(
StreamToGlobalKTableJoinIntegrationTests.OrderEnricherApplication.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext ignored = app.run("--server.port=0",
ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.destination=orders",
"--spring.cloud.stream.bindings.input-x.destination=customers",
@@ -112,10 +119,44 @@ public class StreamToGlobalKTableJoinIntegrationTests {
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=10000",
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId"
+ "=StreamToGlobalKTableJoinIntegrationTests-abc",
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.topic.properties.cleanup.policy=compact",
"--spring.cloud.stream.kafka.streams.bindings.input-x.consumer.topic.properties.cleanup.policy=compact",
"--spring.cloud.stream.kafka.streams.bindings.input-y.consumer.topic.properties.cleanup.policy=compact",
"--spring.cloud.stream.kafka.streams.binder.brokers="
+ embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes="
+ embeddedKafka.getZookeeperConnectionString())) {
+ embeddedKafka.getZookeeperConnectionString());
try {
// Testing certain ancillary configuration of GlobalKTable around topics creation.
// See this issue: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/687
BinderFactory binderFactory = context.getBeanFactory()
.getBean(BinderFactory.class);
Binder<KStream, ? extends ConsumerProperties, ? extends ProducerProperties> kStreamBinder = binderFactory
.getBinder("kstream", KStream.class);
KafkaStreamsConsumerProperties input = (KafkaStreamsConsumerProperties) ((ExtendedPropertiesBinder) kStreamBinder)
.getExtendedConsumerProperties("input");
String cleanupPolicy = input.getTopic().getProperties().get("cleanup.policy");
assertThat(cleanupPolicy).isEqualTo("compact");
Binder<GlobalKTable, ? extends ConsumerProperties, ? extends ProducerProperties> globalKTableBinder = binderFactory
.getBinder("globalktable", GlobalKTable.class);
KafkaStreamsConsumerProperties inputX = (KafkaStreamsConsumerProperties) ((ExtendedPropertiesBinder) globalKTableBinder)
.getExtendedConsumerProperties("input-x");
String cleanupPolicyX = inputX.getTopic().getProperties().get("cleanup.policy");
assertThat(cleanupPolicyX).isEqualTo("compact");
KafkaStreamsConsumerProperties inputY = (KafkaStreamsConsumerProperties) ((ExtendedPropertiesBinder) globalKTableBinder)
.getExtendedConsumerProperties("input-y");
String cleanupPolicyY = inputY.getTopic().getProperties().get("cleanup.policy");
assertThat(cleanupPolicyY).isEqualTo("compact");
Map<String, Object> senderPropsCustomer = KafkaTestUtils
.producerProps(embeddedKafka);
senderPropsCustomer.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
@@ -225,7 +266,10 @@ public class StreamToGlobalKTableJoinIntegrationTests {
pfCustomer.destroy();
pfProduct.destroy();
pfOrder.destroy();
}
finally {
consumer.close();
context.close();
}
}

View File

@@ -46,8 +46,14 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsApplicationSupportProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -90,7 +96,7 @@ public class StreamToTableJoinIntegrationTests {
consumer = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "output-topic-1");
try (ConfigurableApplicationContext ignored = app.run("--server.port=0",
ConfigurableApplicationContext context = app.run("--server.port=0",
"--spring.jmx.enabled=false",
"--spring.cloud.stream.bindings.input.destination=user-clicks-1",
"--spring.cloud.stream.bindings.input-x.destination=user-regions-1",
@@ -117,10 +123,25 @@ public class StreamToTableJoinIntegrationTests {
"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=10000",
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId"
+ "=StreamToTableJoinIntegrationTests-abc",
"--spring.cloud.stream.kafka.streams.bindings.input-x.consumer.topic.properties.cleanup.policy=compact",
"--spring.cloud.stream.kafka.streams.binder.brokers="
+ embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.streams.binder.zkNodes="
+ embeddedKafka.getZookeeperConnectionString())) {
+ embeddedKafka.getZookeeperConnectionString());
try {
// Testing certain ancillary configuration of GlobalKTable around topics creation.
// See this issue: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/687
BinderFactory binderFactory = context.getBeanFactory()
.getBean(BinderFactory.class);
Binder<KTable, ? extends ConsumerProperties, ? extends ProducerProperties> ktableBinder = binderFactory
.getBinder("ktable", KTable.class);
KafkaStreamsConsumerProperties inputX = (KafkaStreamsConsumerProperties) ((ExtendedPropertiesBinder) ktableBinder)
.getExtendedConsumerProperties("input-x");
String cleanupPolicyX = inputX.getTopic().getProperties().get("cleanup.policy");
assertThat(cleanupPolicyX).isEqualTo("compact");
// Input 1: Region per user (multiple records allowed per user).
List<KeyValue<String, String>> userRegions = Arrays.asList(new KeyValue<>(
@@ -193,6 +214,7 @@ public class StreamToTableJoinIntegrationTests {
}
finally {
consumer.close();
context.close();
}
}

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.1.BUILD-SNAPSHOT</version>
<version>2.2.2.BUILD-SNAPSHOT</version>
</parent>
<dependencies>

View File

@@ -30,9 +30,11 @@ import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
/**
* Health indicator for Kafka.
@@ -43,11 +45,15 @@ import org.springframework.kafka.core.ConsumerFactory;
* @author Gary Russell
* @author Laur Aliste
* @author Soby Chacko
* @author Vladislav Fefelov
*/
public class KafkaBinderHealthIndicator implements HealthIndicator {
public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBean {
private static final int DEFAULT_TIMEOUT = 60;
private final ExecutorService executor = Executors.newSingleThreadExecutor(
new CustomizableThreadFactory("kafka-binder-health-"));
private final KafkaMessageChannelBinder binder;
private final ConsumerFactory<?, ?> consumerFactory;
@@ -72,8 +78,7 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
@Override
public Health health() {
ExecutorService exec = Executors.newSingleThreadExecutor();
Future<Health> future = exec.submit(this::buildHealthStatus);
Future<Health> future = executor.submit(this::buildHealthStatus);
try {
return future.get(this.timeout, TimeUnit.SECONDS);
}
@@ -91,9 +96,6 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
return Health.down().withDetail("Failed to retrieve partition information in",
this.timeout + " seconds").build();
}
finally {
exec.shutdownNow();
}
}
private Health buildHealthStatus() {
@@ -146,4 +148,9 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
}
}
@Override
public void destroy() throws Exception {
executor.shutdown();
}
}

View File

@@ -55,11 +55,16 @@ public interface KafkaBindingRebalanceListener {
/**
* Invoked when partitions are initially assigned or after a rebalance. Applications
* might only want to perform seek operations on an initial assignment.
* might only want to perform seek operations on an initial assignment. While the
* 'initial' argument is true for each thread (when concurrency is greater than 1),
* implementations should keep track of exactly which partitions have been sought.
* There is a race in that a rebalance could occur during startup and so a topic/
* partition that has been sought on one thread may be re-assigned to another
* thread and you may not wish to re-seek it at that time.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment.
* @param initial true if this is the initial assignment on the current thread.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions, boolean initial) {

View File

@@ -28,9 +28,9 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.regex.Pattern;
@@ -104,6 +104,7 @@ import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.kafka.support.TopicPartitionInitialOffset.SeekPosition;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
@@ -284,6 +285,17 @@ public class KafkaMessageChannelBinder extends
return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
}
/**
* Return a reference to the binder's transaction manager's producer factory (if
* configured). Use this to create a transaction manager in a bean definition when you
* wish to use producer-only transactions.
* @return the transaction manager, or null.
*/
@Nullable
public ProducerFactory<byte[], byte[]> getTransactionalProducerFactory() {
return this.transactionManager == null ? null : this.transactionManager.getProducerFactory();
}
@Override
protected MessageHandler createProducerMessageHandler(
final ProducerDestination destination,
@@ -581,6 +593,7 @@ public class KafkaMessageChannelBinder extends
public void setupRebalanceListener(
final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
final ContainerProperties containerProperties) {
Assert.isTrue(!extendedConsumerProperties.getExtension().isResetOffsets(),
"'resetOffsets' cannot be set when a KafkaBindingRebalanceListener is provided");
final String bindingName = bindingNameHolder.get();
@@ -590,7 +603,7 @@ public class KafkaMessageChannelBinder extends
containerProperties
.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
private boolean initial = true;
private final ThreadLocal<Boolean> initialAssignment = new ThreadLocal<>();
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
@@ -612,11 +625,15 @@ public class KafkaMessageChannelBinder extends
public void onPartitionsAssigned(Consumer<?, ?> consumer,
Collection<TopicPartition> partitions) {
try {
Boolean initial = this.initialAssignment.get();
if (initial == null) {
initial = Boolean.TRUE;
}
userRebalanceListener.onPartitionsAssigned(bindingName,
consumer, partitions, this.initial);
consumer, partitions, initial);
}
finally {
this.initial = false;
this.initialAssignment.set(Boolean.FALSE);
}
}
@@ -663,20 +680,22 @@ public class KafkaMessageChannelBinder extends
boolean resetOffsets = extendedConsumerProperties.getExtension().isResetOffsets();
final Object resetTo = consumerFactory.getConfigurationProperties()
.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
final AtomicBoolean initialAssignment = new AtomicBoolean(true);
if (!"earliest".equals(resetTo) && !"latest".equals(resetTo)) {
logger.warn("no (or unknown) " + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
+ " property cannot reset");
resetOffsets = false;
}
if (groupManagement && resetOffsets) {
containerProperties
.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
Set<TopicPartition> sought = ConcurrentHashMap.newKeySet();
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(
Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
// no op
if (logger.isInfoEnabled()) {
logger.info("Partitions revoked: " + tps);
}
}
@Override
@@ -686,14 +705,23 @@ public class KafkaMessageChannelBinder extends
}
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer,
Collection<TopicPartition> tps) {
if (initialAssignment.getAndSet(false)) {
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
if (logger.isInfoEnabled()) {
logger.info("Partitions assigned: " + tps);
}
List<TopicPartition> toSeek = tps.stream()
.filter(tp -> {
boolean shouldSeek = !sought.contains(tp);
sought.add(tp);
return shouldSeek;
})
.collect(Collectors.toList());
if (toSeek.size() > 0) {
if ("earliest".equals(resetTo)) {
consumer.seekToBeginning(tps);
consumer.seekToBeginning(toSeek);
}
else if ("latest".equals(resetTo)) {
consumer.seekToEnd(tps);
consumer.seekToEnd(toSeek);
}
}
}
@@ -1133,6 +1161,9 @@ public class KafkaMessageChannelBinder extends
if (producerProperties.getExtension().isSync()) {
setSync(true);
}
if (producerProperties.getExtension().getSendTimeoutExpression() != null) {
setSendTimeoutExpression(producerProperties.getExtension().getSendTimeoutExpression());
}
this.producerFactory = producerFactory;
}

View File

@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.admin.AdminClient;
@@ -91,7 +92,9 @@ import org.springframework.cloud.stream.provisioning.ProvisioningException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
@@ -2256,6 +2259,28 @@ public class KafkaBinderTests extends
producerBinding.unbind();
}
@Test
@SuppressWarnings("unchecked")
public void testSendTimeoutExpressionProducerMetadata() throws Exception {
Binder binder = getBinder(createConfigurationProperties());
DirectChannel output = new DirectChannel();
String testTopicName = UUID.randomUUID().toString();
ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
properties.getExtension().setSync(true);
SpelExpressionParser parser = new SpelExpressionParser();
Expression sendTimeoutExpression = parser.parseExpression("5000");
properties.getExtension().setSendTimeoutExpression(sendTimeoutExpression);
Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName,
output, properties);
DirectFieldAccessor accessor = new DirectFieldAccessor(
extractEndpoint(producerBinding));
KafkaProducerMessageHandler wrappedInstance = (KafkaProducerMessageHandler) accessor
.getWrappedInstance();
assertThat(new DirectFieldAccessor(wrappedInstance).getPropertyValue("sendTimeoutExpression")
.equals(sendTimeoutExpression));
producerBinding.unbind();
}
@Test
@SuppressWarnings("unchecked")
public void testAutoCreateTopicsDisabledOnBinderStillWorksAsLongAsBrokerCreatesTopic()
@@ -3007,6 +3032,81 @@ public class KafkaBinderTests extends
}
}
@Test
@SuppressWarnings("unchecked")
public void testResetOffsets() throws Exception {
Binding<?> producerBinding = null;
Binding<?> consumerBinding = null;
try {
String testPayload = "test";
ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
DirectChannel moduleOutputChannel = createBindableChannel("output",
createProducerBindingProperties(producerProperties));
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
consumerProperties.setConcurrency(2);
consumerProperties.setInstanceCount(5); // 10 partitions across 2 threads
consumerProperties.getExtension().setResetOffsets(true);
DirectChannel moduleInputChannel = createBindableChannel("input",
createConsumerBindingProperties(consumerProperties));
String testTopicName = "existing" + System.currentTimeMillis();
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
configurationProperties.setAutoAddPartitions(true);
Binder binder = getBinder(configurationProperties);
producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel,
producerProperties);
consumerBinding = binder.bindConsumer(testTopicName, "testReset",
moduleInputChannel, consumerProperties);
// Let the consumer actually bind to the producer before sending a msg
binderBindUnbindLatency();
IntStream.range(0, 10).forEach(i -> moduleOutputChannel.send(MessageBuilder.withPayload(testPayload)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
.setHeader(KafkaHeaders.PARTITION_ID, i)
.build()));
CountDownLatch latch1 = new CountDownLatch(10);
CountDownLatch latch2 = new CountDownLatch(20);
AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
AtomicInteger received = new AtomicInteger();
moduleInputChannel.subscribe(message1 -> {
try {
inboundMessageRef.set((Message<byte[]>) message1);
}
finally {
received.incrementAndGet();
latch1.countDown();
latch2.countDown();
}
});
assertThat(latch1.await(10, TimeUnit.SECONDS)).as("Failed to receive messages").isTrue();
consumerBinding.unbind();
consumerBinding = binder.bindConsumer(testTopicName, "testReset",
moduleInputChannel, consumerProperties);
assertThat(latch2.await(10, TimeUnit.SECONDS)).as("Failed to receive message").isTrue();
binder.bindConsumer(testTopicName + "-x", "testReset",
moduleInputChannel, consumerProperties).unbind(); // cause another rebalance
assertThat(received.get()).as("Unexpected reset").isEqualTo(20);
assertThat(inboundMessageRef.get()).isNotNull();
assertThat(inboundMessageRef.get().getPayload()).isEqualTo("test".getBytes());
assertThat(inboundMessageRef.get().getHeaders()).containsEntry("contentType",
MimeTypeUtils.TEXT_PLAIN);
}
finally {
if (producerBinding != null) {
producerBinding.unbind();
}
if (consumerBinding != null) {
consumerBinding.unbind();
}
}
}
private final class FailingInvocationCountingMessageHandler
implements MessageHandler {

View File

@@ -0,0 +1,152 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.integration;
import java.util.Map;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Gary Russell
* @since 2.1.4
*
*/
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
"spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx.",
"spring.cloud.stream.kafka.binder.transaction.producer.configuration.retries=99",
"spring.cloud.stream.kafka.binder.transaction.producer.configuration.acks=all"})
public class ProducerOnlyTransactionTests {
private static final String KAFKA_BROKERS_PROPERTY = "spring.cloud.stream.kafka.binder.brokers";
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, "output")
.brokerProperty(KafkaConfig.TransactionsTopicReplicationFactorProp(), "1")
.brokerProperty(KafkaConfig.TransactionsTopicMinISRProp(), "1");
@Autowired
private Sender sender;
@Autowired
private MessageChannel output;
@BeforeClass
public static void setup() {
System.setProperty(KAFKA_BROKERS_PROPERTY,
embeddedKafka.getEmbeddedKafka().getBrokersAsString());
}
@AfterClass
public static void clean() {
System.clearProperty(KAFKA_BROKERS_PROPERTY);
}
@Test
public void testProducerTx() {
this.sender.DoInTransaction(this.output);
assertThat(this.sender.isInTx()).isTrue();
Map<String, Object> props = KafkaTestUtils.consumerProps("consumeTx", "false",
embeddedKafka.getEmbeddedKafka());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase());
Consumer<?, ?> consumer = new KafkaConsumer<>(props);
embeddedKafka.getEmbeddedKafka().consumeFromAllEmbeddedTopics(consumer);
ConsumerRecord<?, ?> record = KafkaTestUtils.getSingleRecord(consumer, "output");
assertThat(record.value()).isEqualTo("foo".getBytes());
}
@EnableBinding(Source.class)
@EnableAutoConfiguration
@EnableTransactionManagement
public static class Config {
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
try {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTransactionManager<byte[], byte[]> tm = new KafkaTransactionManager<>(pf);
tm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return tm;
}
catch (BeanCreationException e) { // needed to avoid other tests in this package failing when there is no binder
return null;
}
}
@Bean
public Sender sender() {
return new Sender();
}
}
public static class Sender {
private boolean isInTx;
@Transactional
public void DoInTransaction(MessageChannel output) {
this.isInTx = TransactionSynchronizationManager.isActualTransactionActive();
output.send(new GenericMessage<>("foo"));
}
public boolean isInTx() {
return this.isInTx;
}
}
}