Compare commits
15 Commits
| Author | SHA1 | Date |
|---|---|---|
| | 32adf574ac | |
| | fc1e4602fb | |
| | 0ea0abb5b9 | |
| | 11bdcd5884 | |
| | dd201ef60b | |
| | 8aa18c73f0 | |
| | 84a58bc202 | |
| | d6ca68e20d | |
| | 665f087956 | |
| | 4a4d42da41 | |
| | fba5dbb22f | |
| | 6d14276d10 | |
| | 74783cfa59 | |
| | 8fb0516d96 | |
| | 683e3f96ba | |
docs/pom.xml (10 changed lines)

```diff
@@ -7,7 +7,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>2.2.1.BUILD-SNAPSHOT</version>
+		<version>2.2.2.BUILD-SNAPSHOT</version>
 	</parent>
 	<packaging>pom</packaging>
 	<name>spring-cloud-stream-binder-kafka-docs</name>
@@ -124,10 +124,10 @@
 				<sourceDocumentName>${docs.main}.adoc</sourceDocumentName>
 				<attributes>
 					<spring-cloud-stream-version>${project.version}</spring-cloud-stream-version>
-					<docs-url>https://cloud.spring.io/</docs-url>
-					<docs-version></docs-version>
-					<!-- <docs-version>${project.version}/</docs-version> -->
-					<!-- <docs-url>https://cloud.spring.io/spring-cloud-static/</docs-url> -->
+					<!-- <docs-url>https://cloud.spring.io/</docs-url> -->
+					<!-- <docs-version></docs-version> -->
+					<docs-version>${project.version}/</docs-version>
+					<docs-url>https://cloud.spring.io/spring-cloud-static/</docs-url>
 				</attributes>
 			</configuration>
 			<executions>
```
```diff
@@ -290,6 +290,13 @@ sync::
 Whether the producer is synchronous.
 +
 Default: `false`.
+sendTimeoutExpression::
+A SpEL expression evaluated against the outgoing message used to evaluate the time to wait for ack when synchronous publish is enabled -- for example, `headers['mySendTimeout']`.
+The value of the timeout is in milliseconds.
+With versions before 3.0, the payload could not be used unless native encoding was being used because, by the time this expression was evaluated, the payload was already in the form of a `byte[]`.
+Now, the expression is evaluated before the payload is converted.
++
+Default: `none`.
 batchTimeout::
 How long the producer waits to allow more messages to accumulate in the same batch before sending the messages.
 (Normally, the producer does not wait at all and simply sends all the messages that accumulated while the previous send was in progress.) A non-zero value may increase throughput at the expense of latency.
```
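Because the new `sendTimeoutExpression` is evaluated per message, the timeout can ride along as a header. A minimal sketch, assuming a bound output channel; the `mySendTimeout` header name matches the example in the docs hunk above:

```java
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

// Sketch: with sync=true and sendTimeoutExpression=headers['mySendTimeout'],
// this header makes the producer wait up to 5000 ms for the broker ack of
// this particular message before timing out.
public class SendTimeoutHeaderExample {

	public void send(MessageChannel output) {
		output.send(MessageBuilder.withPayload("hello")
				.setHeader("mySendTimeout", 5000L)
				.build());
	}
}
```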
```diff
@@ -502,6 +509,50 @@ public class Application {
 }
 ----
 
+[[kafka-transactional-binder]]
+=== Transactional Binder
+
+Enable transactions by setting `spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix` to a non-empty value, e.g. `tx-`.
+When used in a processor application, the consumer starts the transaction; any records sent on the consumer thread participate in the same transaction.
+When the listener exits normally, the listener container will send the offset to the transaction and commit it.
+A common producer factory is used for all producer bindings configured using `spring.cloud.stream.kafka.binder.transaction.producer.*` properties; individual binding Kafka producer properties are ignored.
+
+If you wish to use transactions in a source application, or from some arbitrary thread for producer-only transactions (e.g. a `@Scheduled` method), you must get a reference to the transactional producer factory and define a `KafkaTransactionManager` bean using it.
+
+====
+[source, java]
+----
+@Bean
+public PlatformTransactionManager transactionManager(BinderFactory binders) {
+    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
+            MessageChannel.class)).getTransactionalProducerFactory();
+    return new KafkaTransactionManager<>(pf);
+}
+----
+====
+
+Notice that we get a reference to the binder using the `BinderFactory`; use `null` in the first argument when there is only one binder configured.
+If more than one binder is configured, use the binder name to get the reference.
+Once we have a reference to the binder, we can obtain a reference to the `ProducerFactory` and create a transaction manager.
+
+Then you can use normal Spring transaction support, e.g. `TransactionTemplate` or `@Transactional`, for example:
+
+====
+[source, java]
+----
+public static class Sender {
+
+    @Transactional
+    public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
+        stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
+    }
+
+}
+----
+====
+
+If you wish to synchronize producer-only transactions with those from some other transaction manager, use a `ChainedTransactionManager`.
+
 [[kafka-error-channels]]
 === Error Channels
 
```
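The docs hunk above ends by pointing at `ChainedTransactionManager` for synchronizing producer-only transactions with another transaction manager. A sketch of what such a bean might look like, assuming a JDBC `DataSourceTransactionManager` as the second participant (the Kafka transaction manager is the one built from the binder's transactional producer factory, as in the docs example; the bean wiring here is illustrative):

```java
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.data.transaction.ChainedTransactionManager;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;

public class ChainedTxConfig {

	// Transactions begin in list order and commit in reverse order, so the
	// Kafka transaction commits before the JDBC one here; pick the order
	// that suits your failure semantics.
	@Bean
	public PlatformTransactionManager chainedTransactionManager(
			KafkaTransactionManager<byte[], byte[]> kafkaTransactionManager,
			DataSource dataSource) {
		return new ChainedTransactionManager(
				new DataSourceTransactionManager(dataSource), kafkaTransactionManager);
	}
}
```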
pom.xml (6 changed lines)

```diff
@@ -2,12 +2,12 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-	<version>2.2.1.BUILD-SNAPSHOT</version>
+	<version>2.2.2.BUILD-SNAPSHOT</version>
 	<packaging>pom</packaging>
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-build</artifactId>
-		<version>2.1.4.BUILD-SNAPSHOT</version>
+		<version>2.1.6.RELEASE</version>
 		<relativePath />
 	</parent>
 	<properties>
@@ -15,7 +15,7 @@
 		<spring-kafka.version>2.2.2.RELEASE</spring-kafka.version>
 		<spring-integration-kafka.version>3.1.0.RELEASE</spring-integration-kafka.version>
 		<kafka.version>2.0.0</kafka.version>
-		<spring-cloud-stream.version>2.2.0.BUILD-SNAPSHOT</spring-cloud-stream.version>
+		<spring-cloud-stream.version>2.2.2.BUILD-SNAPSHOT</spring-cloud-stream.version>
 		<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
 		<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
 		<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>
```
```diff
@@ -4,7 +4,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>2.2.1.BUILD-SNAPSHOT</version>
+		<version>2.2.2.BUILD-SNAPSHOT</version>
 	</parent>
 	<artifactId>spring-cloud-starter-stream-kafka</artifactId>
 	<description>Spring Cloud Starter Stream Kafka</description>
```

```diff
@@ -5,7 +5,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>2.2.1.BUILD-SNAPSHOT</version>
+		<version>2.2.2.BUILD-SNAPSHOT</version>
 	</parent>
 	<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
 	<description>Spring Cloud Stream Kafka Binder Core</description>
```
```diff
@@ -40,6 +40,8 @@ public class KafkaProducerProperties {
 
 	private boolean sync;
 
+	private Expression sendTimeoutExpression;
+
 	private int batchTimeout;
 
 	private Expression messageKeyExpression;
@@ -75,6 +77,14 @@ public class KafkaProducerProperties {
 		this.sync = sync;
 	}
 
+	public Expression getSendTimeoutExpression() {
+		return this.sendTimeoutExpression;
+	}
+
+	public void setSendTimeoutExpression(Expression sendTimeoutExpression) {
+		this.sendTimeoutExpression = sendTimeoutExpression;
+	}
+
 	public int getBatchTimeout() {
 		return this.batchTimeout;
 	}
```
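These accessors are exercised further down in `KafkaBinderTests#testSendTimeoutExpressionProducerMetadata`; a condensed sketch of the programmatic wiring (class and method names here are illustrative):

```java
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.expression.spel.standard.SpelExpressionParser;

public class SendTimeoutWiring {

	// Enable synchronous sends and give the new property a literal SpEL value
	// of 5000 ms; a header-based expression such as headers['mySendTimeout']
	// works the same way.
	public ExtendedProducerProperties<KafkaProducerProperties> producerProperties() {
		ExtendedProducerProperties<KafkaProducerProperties> properties =
				new ExtendedProducerProperties<>(new KafkaProducerProperties());
		properties.getExtension().setSync(true);
		properties.getExtension().setSendTimeoutExpression(
				new SpelExpressionParser().parseExpression("5000"));
		return properties;
	}
}
```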
```diff
@@ -466,11 +466,11 @@ public class KafkaTopicProvisioner implements
 		// In some cases, the above partition query may not throw an UnknownTopic..Exception for various reasons.
 		// For that, we are forcing another query to ensure that the topic is present on the server.
 		if (CollectionUtils.isEmpty(partitions)) {
-			final AdminClient adminClient = AdminClient
-					.create(this.adminClientProperties);
-			final DescribeTopicsResult describeTopicsResult = adminClient
+			try (AdminClient adminClient = AdminClient
+					.create(this.adminClientProperties)) {
+				final DescribeTopicsResult describeTopicsResult = adminClient
 						.describeTopics(Collections.singletonList(topicName));
 				try {
 					describeTopicsResult.all().get();
 				}
 				catch (ExecutionException ex) {
```
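The change above fixes a resource leak: the `AdminClient` was created but never closed. A standalone sketch of the corrected pattern (the method and parameter names are illustrative):

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;

public final class TopicPresenceCheck {

	// try-with-resources guarantees the AdminClient is closed even when the
	// describe call fails because the topic does not exist on the broker.
	static boolean topicExists(Map<String, Object> adminClientProperties, String topicName)
			throws InterruptedException {
		try (AdminClient adminClient = AdminClient.create(adminClientProperties)) {
			DescribeTopicsResult result = adminClient
					.describeTopics(Collections.singletonList(topicName));
			result.all().get();
			return true;
		}
		catch (ExecutionException ex) {
			// typically caused by UnknownTopicOrPartitionException
			return false;
		}
	}
}
```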
```diff
@@ -10,7 +10,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>2.2.1.BUILD-SNAPSHOT</version>
+		<version>2.2.2.BUILD-SNAPSHOT</version>
 	</parent>
 
 	<properties>
```
```diff
@@ -118,4 +118,8 @@ public class GlobalKTableBinder extends
 				.getExtendedPropertiesEntryClass();
 	}
 
+	public void setKafkaStreamsExtendedBindingProperties(
+			KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties) {
+		this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
+	}
 }
```
```diff
@@ -25,6 +25,7 @@ import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
 import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
 import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
+import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -67,10 +68,14 @@ public class GlobalKTableBinderConfiguration {
 	@Bean
 	public GlobalKTableBinder GlobalKTableBinder(
 			KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
+			KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
 			KafkaTopicProvisioner kafkaTopicProvisioner,
 			@Qualifier("kafkaStreamsDlqDispatchers") Map<String, KafkaStreamsDlqDispatch> kafkaStreamsDlqDispatchers) {
-		return new GlobalKTableBinder(binderConfigurationProperties,
+		GlobalKTableBinder globalKTableBinder = new GlobalKTableBinder(binderConfigurationProperties,
 				kafkaTopicProvisioner, kafkaStreamsDlqDispatchers);
+		globalKTableBinder.setKafkaStreamsExtendedBindingProperties(
+				kafkaStreamsExtendedBindingProperties);
+		return globalKTableBinder;
 	}
 
 }
```
```diff
@@ -116,7 +116,7 @@ class KStreamBinder extends
 			ExtendedProducerProperties<KafkaStreamsProducerProperties> properties) {
 		// @checkstyle:on
 		ExtendedProducerProperties<KafkaProducerProperties> extendedProducerProperties = new ExtendedProducerProperties<>(
-				new KafkaProducerProperties());
+				properties.getExtension());
 		this.kafkaTopicProvisioner.provisionProducerDestination(name,
 				extendedProducerProperties);
 		Serde<?> keySerde = this.keyValueSerdeResolver
```
```diff
@@ -122,4 +122,9 @@ class KTableBinder extends
 				.getExtendedPropertiesEntryClass();
 	}
 
+	public void setKafkaStreamsExtendedBindingProperties(
+			KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties) {
+		this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
+	}
+
 }
```
```diff
@@ -25,6 +25,7 @@ import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
 import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
 import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
+import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -68,10 +69,12 @@ public class KTableBinderConfiguration {
 	public KTableBinder kTableBinder(
 			KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
 			KafkaTopicProvisioner kafkaTopicProvisioner,
+			KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
 			@Qualifier("kafkaStreamsDlqDispatchers") Map<String, KafkaStreamsDlqDispatch> kafkaStreamsDlqDispatchers) {
-		KTableBinder kStreamBinder = new KTableBinder(binderConfigurationProperties,
+		KTableBinder kTableBinder = new KTableBinder(binderConfigurationProperties,
 				kafkaTopicProvisioner, kafkaStreamsDlqDispatchers);
-		return kStreamBinder;
+		kTableBinder.setKafkaStreamsExtendedBindingProperties(kafkaStreamsExtendedBindingProperties);
+		return kTableBinder;
 	}
 
 }
```
```diff
@@ -1,5 +1,5 @@
 /*
- * Copyright 2019-2019 the original author or authors.
+ * Copyright 2019-2020 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
 package org.springframework.cloud.stream.binder.kafka.streams;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -70,12 +71,22 @@ class KafkaStreamsBinderHealthIndicator extends AbstractHealthIndicator {
 		final Map<String, Object> details = new HashMap<>();
 		for (TaskMetadata metadata : taskMetadata) {
 			details.put("taskId", metadata.taskId());
-			details.put("partitions",
-					metadata.topicPartitions().stream().map(
-							p -> "partition=" + p.partition() + ", topic=" + p.topic())
-							.collect(Collectors.toList()));
+			if (details.containsKey("partitions")) {
+				@SuppressWarnings("unchecked")
+				List<String> partitionsInfo = (List<String>) details.get("partitions");
+				partitionsInfo.addAll(addPartitionsInfo(metadata));
+			}
+			else {
+				details.put("partitions",
+						addPartitionsInfo(metadata));
+			}
 		}
 		return details;
 	}
 
+	private static List<String> addPartitionsInfo(TaskMetadata metadata) {
+		return metadata.topicPartitions().stream().map(
+				p -> "partition=" + p.partition() + ", topic=" + p.topic())
+				.collect(Collectors.toList());
+	}
 }
```
```diff
@@ -466,8 +466,8 @@ public class KafkaStreamsFunctionProcessor implements ApplicationContextAware {
 		}
 
 		int concurrency = this.bindingServiceProperties.getConsumerProperties(inboundName).getConcurrency();
-		// override concurrency if set at the individual binding level.
-		if (concurrency > 1) {
+		// override concurrency (set potentially on the binding).
+		if (concurrency >= 1) {
 			streamConfigGlobalProperties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, concurrency);
 		}
```

```diff
@@ -539,8 +539,8 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator
 		int concurrency = this.bindingServiceProperties.getConsumerProperties(inboundName)
 				.getConcurrency();
-		// override concurrency if set at the individual binding level.
-		if (concurrency > 1) {
+		// override concurrency (set potentially on the binding).
+		if (concurrency >= 1) {
 			streamConfigGlobalProperties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
 					concurrency);
 		}
```
```diff
@@ -200,7 +200,7 @@ public class KeyValueSerdeResolver {
 							Serde.class)
 					: Serdes.ByteArray();
 		}
+		valueSerde.configure(this.streamConfigGlobalProperties, false);
 		return valueSerde;
 	}
 
 }
```
```diff
@@ -41,8 +41,14 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.cloud.stream.annotation.EnableBinding;
 import org.springframework.cloud.stream.annotation.Input;
 import org.springframework.cloud.stream.annotation.StreamListener;
+import org.springframework.cloud.stream.binder.Binder;
+import org.springframework.cloud.stream.binder.BinderFactory;
+import org.springframework.cloud.stream.binder.ConsumerProperties;
+import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
+import org.springframework.cloud.stream.binder.ProducerProperties;
 import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
 import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsApplicationSupportProperties;
+import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -75,7 +81,8 @@ public class StreamToGlobalKTableJoinIntegrationTests {
 		SpringApplication app = new SpringApplication(
 				StreamToGlobalKTableJoinIntegrationTests.OrderEnricherApplication.class);
 		app.setWebApplicationType(WebApplicationType.NONE);
-		try (ConfigurableApplicationContext ignored = app.run("--server.port=0",
+
+		ConfigurableApplicationContext context = app.run("--server.port=0",
 				"--spring.jmx.enabled=false",
 				"--spring.cloud.stream.bindings.input.destination=orders",
 				"--spring.cloud.stream.bindings.input-x.destination=customers",
@@ -112,10 +119,44 @@ public class StreamToGlobalKTableJoinIntegrationTests {
 				"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=10000",
 				"--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId"
 						+ "=StreamToGlobalKTableJoinIntegrationTests-abc",
+				"--spring.cloud.stream.kafka.streams.bindings.input.consumer.topic.properties.cleanup.policy=compact",
+				"--spring.cloud.stream.kafka.streams.bindings.input-x.consumer.topic.properties.cleanup.policy=compact",
+				"--spring.cloud.stream.kafka.streams.bindings.input-y.consumer.topic.properties.cleanup.policy=compact",
 				"--spring.cloud.stream.kafka.streams.binder.brokers="
 						+ embeddedKafka.getBrokersAsString(),
 				"--spring.cloud.stream.kafka.streams.binder.zkNodes="
-						+ embeddedKafka.getZookeeperConnectionString())) {
+						+ embeddedKafka.getZookeeperConnectionString());
+		try {
+
+			// Testing certain ancillary configuration of GlobalKTable around topics creation.
+			// See this issue: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/687
+
+			BinderFactory binderFactory = context.getBeanFactory()
+					.getBean(BinderFactory.class);
+
+			Binder<KStream, ? extends ConsumerProperties, ? extends ProducerProperties> kStreamBinder = binderFactory
+					.getBinder("kstream", KStream.class);
+
+			KafkaStreamsConsumerProperties input = (KafkaStreamsConsumerProperties) ((ExtendedPropertiesBinder) kStreamBinder)
+					.getExtendedConsumerProperties("input");
+			String cleanupPolicy = input.getTopic().getProperties().get("cleanup.policy");
+
+			assertThat(cleanupPolicy).isEqualTo("compact");
+
+			Binder<GlobalKTable, ? extends ConsumerProperties, ? extends ProducerProperties> globalKTableBinder = binderFactory
+					.getBinder("globalktable", GlobalKTable.class);
+
+			KafkaStreamsConsumerProperties inputX = (KafkaStreamsConsumerProperties) ((ExtendedPropertiesBinder) globalKTableBinder)
+					.getExtendedConsumerProperties("input-x");
+			String cleanupPolicyX = inputX.getTopic().getProperties().get("cleanup.policy");
+
+			assertThat(cleanupPolicyX).isEqualTo("compact");
+
+			KafkaStreamsConsumerProperties inputY = (KafkaStreamsConsumerProperties) ((ExtendedPropertiesBinder) globalKTableBinder)
+					.getExtendedConsumerProperties("input-y");
+			String cleanupPolicyY = inputY.getTopic().getProperties().get("cleanup.policy");
+
+			assertThat(cleanupPolicyY).isEqualTo("compact");
 			Map<String, Object> senderPropsCustomer = KafkaTestUtils
 					.producerProps(embeddedKafka);
 			senderPropsCustomer.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
@@ -225,7 +266,10 @@
 			pfCustomer.destroy();
 			pfProduct.destroy();
 			pfOrder.destroy();
 		}
+		finally {
+			consumer.close();
+			context.close();
+		}
 
 	}
```
```diff
@@ -46,8 +46,14 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.cloud.stream.annotation.EnableBinding;
 import org.springframework.cloud.stream.annotation.Input;
 import org.springframework.cloud.stream.annotation.StreamListener;
+import org.springframework.cloud.stream.binder.Binder;
+import org.springframework.cloud.stream.binder.BinderFactory;
+import org.springframework.cloud.stream.binder.ConsumerProperties;
+import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
+import org.springframework.cloud.stream.binder.ProducerProperties;
 import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
 import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsApplicationSupportProperties;
+import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -90,7 +96,7 @@ public class StreamToTableJoinIntegrationTests {
 		consumer = cf.createConsumer();
 		embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "output-topic-1");
 
-		try (ConfigurableApplicationContext ignored = app.run("--server.port=0",
+		ConfigurableApplicationContext context = app.run("--server.port=0",
 				"--spring.jmx.enabled=false",
 				"--spring.cloud.stream.bindings.input.destination=user-clicks-1",
 				"--spring.cloud.stream.bindings.input-x.destination=user-regions-1",
@@ -117,10 +123,25 @@ public class StreamToTableJoinIntegrationTests {
 				"--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=10000",
 				"--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId"
 						+ "=StreamToTableJoinIntegrationTests-abc",
+				"--spring.cloud.stream.kafka.streams.bindings.input-x.consumer.topic.properties.cleanup.policy=compact",
 				"--spring.cloud.stream.kafka.streams.binder.brokers="
 						+ embeddedKafka.getBrokersAsString(),
 				"--spring.cloud.stream.kafka.streams.binder.zkNodes="
-						+ embeddedKafka.getZookeeperConnectionString())) {
+						+ embeddedKafka.getZookeeperConnectionString());
+		try {
+			// Testing certain ancillary configuration of GlobalKTable around topics creation.
+			// See this issue: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/687
+			BinderFactory binderFactory = context.getBeanFactory()
+					.getBean(BinderFactory.class);
+
+			Binder<KTable, ? extends ConsumerProperties, ? extends ProducerProperties> ktableBinder = binderFactory
+					.getBinder("ktable", KTable.class);
+
+			KafkaStreamsConsumerProperties inputX = (KafkaStreamsConsumerProperties) ((ExtendedPropertiesBinder) ktableBinder)
+					.getExtendedConsumerProperties("input-x");
+			String cleanupPolicyX = inputX.getTopic().getProperties().get("cleanup.policy");
+
+			assertThat(cleanupPolicyX).isEqualTo("compact");
 
 			// Input 1: Region per user (multiple records allowed per user).
 			List<KeyValue<String, String>> userRegions = Arrays.asList(new KeyValue<>(
@@ -193,6 +214,7 @@
 		}
 		finally {
 			consumer.close();
+			context.close();
 		}
 	}
 
```
```diff
@@ -10,7 +10,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>2.2.1.BUILD-SNAPSHOT</version>
+		<version>2.2.2.BUILD-SNAPSHOT</version>
 	</parent>
 
 	<dependencies>
```
```diff
@@ -30,9 +30,11 @@ import java.util.concurrent.TimeoutException;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.PartitionInfo;
 
+import org.springframework.beans.factory.DisposableBean;
 import org.springframework.boot.actuate.health.Health;
 import org.springframework.boot.actuate.health.HealthIndicator;
 import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
 
 /**
  * Health indicator for Kafka.
@@ -43,11 +45,15 @@ import org.springframework.kafka.core.ConsumerFactory;
  * @author Gary Russell
  * @author Laur Aliste
  * @author Soby Chacko
+ * @author Vladislav Fefelov
  */
-public class KafkaBinderHealthIndicator implements HealthIndicator {
+public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBean {
 
 	private static final int DEFAULT_TIMEOUT = 60;
 
+	private final ExecutorService executor = Executors.newSingleThreadExecutor(
+			new CustomizableThreadFactory("kafka-binder-health-"));
+
 	private final KafkaMessageChannelBinder binder;
 
 	private final ConsumerFactory<?, ?> consumerFactory;
@@ -72,8 +78,7 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
 
 	@Override
 	public Health health() {
-		ExecutorService exec = Executors.newSingleThreadExecutor();
-		Future<Health> future = exec.submit(this::buildHealthStatus);
+		Future<Health> future = executor.submit(this::buildHealthStatus);
 		try {
 			return future.get(this.timeout, TimeUnit.SECONDS);
 		}
@@ -91,9 +96,6 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
 			return Health.down().withDetail("Failed to retrieve partition information in",
 					this.timeout + " seconds").build();
 		}
-		finally {
-			exec.shutdownNow();
-		}
 	}
 
 	private Health buildHealthStatus() {
@@ -146,4 +148,9 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
 		}
 	}
 
+	@Override
+	public void destroy() throws Exception {
+		executor.shutdown();
+	}
+
 }
```
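Taken together, these hunks replace a fresh executor per `health()` call with one named, long-lived worker thread that is shut down when the bean is destroyed, avoiding thread churn on every probe. A distilled sketch of the pattern, independent of the binder internals (class name and abstract hook are illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

public abstract class TimeoutBoundedHealthIndicator implements HealthIndicator, DisposableBean {

	// One named worker thread shared by every health() call, created once.
	private final ExecutorService executor = Executors.newSingleThreadExecutor(
			new CustomizableThreadFactory("kafka-binder-health-"));

	private final int timeout = 60; // seconds, mirroring DEFAULT_TIMEOUT

	@Override
	public Health health() {
		Future<Health> future = this.executor.submit(this::buildHealthStatus);
		try {
			// Bound the check so a hung broker cannot hang the actuator endpoint.
			return future.get(this.timeout, TimeUnit.SECONDS);
		}
		catch (Exception ex) {
			return Health.down(new RuntimeException(ex)).build();
		}
	}

	@Override
	public void destroy() {
		// Shut the executor down once, when the application context closes.
		this.executor.shutdown();
	}

	protected abstract Health buildHealthStatus();
}
```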
```diff
@@ -55,11 +55,16 @@ public interface KafkaBindingRebalanceListener {
 
 	/**
 	 * Invoked when partitions are initially assigned or after a rebalance. Applications
-	 * might only want to perform seek operations on an initial assignment.
+	 * might only want to perform seek operations on an initial assignment. While the
+	 * 'initial' argument is true for each thread (when concurrency is greater than 1),
+	 * implementations should keep track of exactly which partitions have been sought.
+	 * There is a race in that a rebalance could occur during startup and so a topic/
+	 * partition that has been sought on one thread may be re-assigned to another
+	 * thread and you may not wish to re-seek it at that time.
 	 * @param bindingName the name of the binding.
 	 * @param consumer the consumer.
 	 * @param partitions the partitions.
-	 * @param initial true if this is the initial assignment.
+	 * @param initial true if this is the initial assignment on the current thread.
 	 */
 	default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
 			Collection<TopicPartition> partitions, boolean initial) {
```
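Per the clarified javadoc, `initial` is per-thread, so an implementation that must seek exactly once per partition has to do its own bookkeeping. A sketch under that assumption (the seek-to-beginning policy is just an example):

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

import org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener;

public class SeekOnceRebalanceListener implements KafkaBindingRebalanceListener {

	// Shared across consumer threads, so a partition re-assigned to another
	// thread during a startup rebalance is not sought a second time.
	private final Set<TopicPartition> sought = ConcurrentHashMap.newKeySet();

	@Override
	public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
			Collection<TopicPartition> partitions, boolean initial) {
		for (TopicPartition tp : partitions) {
			if (this.sought.add(tp)) { // true only the first time tp is seen
				consumer.seekToBeginning(Collections.singletonList(tp));
			}
		}
	}
}
```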
```diff
@@ -28,9 +28,9 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import java.util.regex.Pattern;
@@ -104,6 +104,7 @@ import org.springframework.kafka.support.TopicPartitionInitialOffset;
 import org.springframework.kafka.support.TopicPartitionInitialOffset.SeekPosition;
 import org.springframework.kafka.support.converter.MessagingMessageConverter;
 import org.springframework.kafka.transaction.KafkaTransactionManager;
+import org.springframework.lang.Nullable;
 import org.springframework.messaging.MessageChannel;
 import org.springframework.messaging.MessageHandler;
 import org.springframework.messaging.MessageHeaders;
@@ -284,6 +285,17 @@ public class KafkaMessageChannelBinder extends
 		return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
 	}
 
+	/**
+	 * Return a reference to the binder's transaction manager's producer factory (if
+	 * configured). Use this to create a transaction manager in a bean definition when you
+	 * wish to use producer-only transactions.
+	 * @return the producer factory, or null.
+	 */
+	@Nullable
+	public ProducerFactory<byte[], byte[]> getTransactionalProducerFactory() {
+		return this.transactionManager == null ? null : this.transactionManager.getProducerFactory();
+	}
+
 	@Override
 	protected MessageHandler createProducerMessageHandler(
 			final ProducerDestination destination,
@@ -581,6 +593,7 @@ public class KafkaMessageChannelBinder extends
 	public void setupRebalanceListener(
 			final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
 			final ContainerProperties containerProperties) {
+		Assert.isTrue(!extendedConsumerProperties.getExtension().isResetOffsets(),
+				"'resetOffsets' cannot be set when a KafkaBindingRebalanceListener is provided");
 		final String bindingName = bindingNameHolder.get();
@@ -590,7 +603,7 @@ public class KafkaMessageChannelBinder extends
 		containerProperties
 				.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
 
-					private boolean initial = true;
+					private final ThreadLocal<Boolean> initialAssignment = new ThreadLocal<>();
 
 					@Override
 					public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
@@ -612,11 +625,15 @@ public class KafkaMessageChannelBinder extends
 					public void onPartitionsAssigned(Consumer<?, ?> consumer,
 							Collection<TopicPartition> partitions) {
 						try {
+							Boolean initial = this.initialAssignment.get();
+							if (initial == null) {
+								initial = Boolean.TRUE;
+							}
 							userRebalanceListener.onPartitionsAssigned(bindingName,
-									consumer, partitions, this.initial);
+									consumer, partitions, initial);
 						}
 						finally {
-							this.initial = false;
+							this.initialAssignment.set(Boolean.FALSE);
 						}
 					}
@@ -663,20 +680,22 @@ public class KafkaMessageChannelBinder extends
 		boolean resetOffsets = extendedConsumerProperties.getExtension().isResetOffsets();
 		final Object resetTo = consumerFactory.getConfigurationProperties()
 				.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-		final AtomicBoolean initialAssignment = new AtomicBoolean(true);
 		if (!"earliest".equals(resetTo) && !"latest".equals(resetTo)) {
 			logger.warn("no (or unknown) " + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
 					+ " property cannot reset");
 			resetOffsets = false;
 		}
 		if (groupManagement && resetOffsets) {
-			containerProperties
-					.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
+			Set<TopicPartition> sought = ConcurrentHashMap.newKeySet();
+			containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
 
 				@Override
 				public void onPartitionsRevokedBeforeCommit(
 						Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
-					// no op
+					if (logger.isInfoEnabled()) {
+						logger.info("Partitions revoked: " + tps);
+					}
 				}
 
 				@Override
@@ -686,14 +705,23 @@ public class KafkaMessageChannelBinder extends
 				}
 
 				@Override
-				public void onPartitionsAssigned(Consumer<?, ?> consumer,
-						Collection<TopicPartition> tps) {
-					if (initialAssignment.getAndSet(false)) {
+				public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
+					if (logger.isInfoEnabled()) {
+						logger.info("Partitions assigned: " + tps);
+					}
+					List<TopicPartition> toSeek = tps.stream()
+							.filter(tp -> {
+								boolean shouldSeek = !sought.contains(tp);
+								sought.add(tp);
+								return shouldSeek;
+							})
+							.collect(Collectors.toList());
+					if (toSeek.size() > 0) {
 						if ("earliest".equals(resetTo)) {
-							consumer.seekToBeginning(tps);
+							consumer.seekToBeginning(toSeek);
 						}
 						else if ("latest".equals(resetTo)) {
-							consumer.seekToEnd(tps);
+							consumer.seekToEnd(toSeek);
 						}
 					}
 				}
@@ -1133,6 +1161,9 @@ public class KafkaMessageChannelBinder extends
 			if (producerProperties.getExtension().isSync()) {
 				setSync(true);
 			}
+			if (producerProperties.getExtension().getSendTimeoutExpression() != null) {
+				setSendTimeoutExpression(producerProperties.getExtension().getSendTimeoutExpression());
+			}
 			this.producerFactory = producerFactory;
 		}
```
```diff
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.clients.admin.AdminClient;
@@ -91,7 +92,9 @@ import org.springframework.cloud.stream.provisioning.ProvisioningException;
 import org.springframework.context.ApplicationContext;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.support.GenericApplicationContext;
+import org.springframework.expression.Expression;
 import org.springframework.expression.common.LiteralExpression;
+import org.springframework.expression.spel.standard.SpelExpressionParser;
 import org.springframework.integration.IntegrationMessageHeaderAccessor;
 import org.springframework.integration.channel.DirectChannel;
 import org.springframework.integration.channel.QueueChannel;
@@ -2256,6 +2259,28 @@ public class KafkaBinderTests extends
 		producerBinding.unbind();
 	}
 
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testSendTimeoutExpressionProducerMetadata() throws Exception {
+		Binder binder = getBinder(createConfigurationProperties());
+		DirectChannel output = new DirectChannel();
+		String testTopicName = UUID.randomUUID().toString();
+		ExtendedProducerProperties<KafkaProducerProperties> properties = createProducerProperties();
+		properties.getExtension().setSync(true);
+		SpelExpressionParser parser = new SpelExpressionParser();
+		Expression sendTimeoutExpression = parser.parseExpression("5000");
+		properties.getExtension().setSendTimeoutExpression(sendTimeoutExpression);
+		Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName,
+				output, properties);
+		DirectFieldAccessor accessor = new DirectFieldAccessor(
+				extractEndpoint(producerBinding));
+		KafkaProducerMessageHandler wrappedInstance = (KafkaProducerMessageHandler) accessor
+				.getWrappedInstance();
+		assertThat(new DirectFieldAccessor(wrappedInstance).getPropertyValue("sendTimeoutExpression")
+				.equals(sendTimeoutExpression));
+		producerBinding.unbind();
+	}
+
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testAutoCreateTopicsDisabledOnBinderStillWorksAsLongAsBrokerCreatesTopic()
@@ -3007,6 +3032,81 @@ public class KafkaBinderTests extends
 		}
 	}
 
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testResetOffsets() throws Exception {
+		Binding<?> producerBinding = null;
+		Binding<?> consumerBinding = null;
+		try {
+			String testPayload = "test";
+
+			ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
+
+			DirectChannel moduleOutputChannel = createBindableChannel("output",
+					createProducerBindingProperties(producerProperties));
+
+			ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
+			consumerProperties.setConcurrency(2);
+			consumerProperties.setInstanceCount(5); // 10 partitions across 2 threads
+			consumerProperties.getExtension().setResetOffsets(true);
+
+			DirectChannel moduleInputChannel = createBindableChannel("input",
+					createConsumerBindingProperties(consumerProperties));
+
+			String testTopicName = "existing" + System.currentTimeMillis();
+			KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
+			configurationProperties.setAutoAddPartitions(true);
+			Binder binder = getBinder(configurationProperties);
+			producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel,
+					producerProperties);
+
+			consumerBinding = binder.bindConsumer(testTopicName, "testReset",
+					moduleInputChannel, consumerProperties);
+			// Let the consumer actually bind to the producer before sending a msg
+			binderBindUnbindLatency();
+			IntStream.range(0, 10).forEach(i -> moduleOutputChannel.send(MessageBuilder.withPayload(testPayload)
+					.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
+					.setHeader(KafkaHeaders.PARTITION_ID, i)
+					.build()));
+			CountDownLatch latch1 = new CountDownLatch(10);
+			CountDownLatch latch2 = new CountDownLatch(20);
+			AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
+			AtomicInteger received = new AtomicInteger();
+			moduleInputChannel.subscribe(message1 -> {
+				try {
+					inboundMessageRef.set((Message<byte[]>) message1);
+				}
+				finally {
+					received.incrementAndGet();
+					latch1.countDown();
+					latch2.countDown();
+				}
+			});
+			assertThat(latch1.await(10, TimeUnit.SECONDS)).as("Failed to receive messages").isTrue();
+			consumerBinding.unbind();
+			consumerBinding = binder.bindConsumer(testTopicName, "testReset",
+					moduleInputChannel, consumerProperties);
+			assertThat(latch2.await(10, TimeUnit.SECONDS)).as("Failed to receive message").isTrue();
+			binder.bindConsumer(testTopicName + "-x", "testReset",
+					moduleInputChannel, consumerProperties).unbind(); // cause another rebalance
+			assertThat(received.get()).as("Unexpected reset").isEqualTo(20);
+
+			assertThat(inboundMessageRef.get()).isNotNull();
+			assertThat(inboundMessageRef.get().getPayload()).isEqualTo("test".getBytes());
+			assertThat(inboundMessageRef.get().getHeaders()).containsEntry("contentType",
+					MimeTypeUtils.TEXT_PLAIN);
+		}
+		finally {
+			if (producerBinding != null) {
+				producerBinding.unbind();
+			}
+			if (consumerBinding != null) {
+				consumerBinding.unbind();
+			}
+		}
+	}
+
 	private final class FailingInvocationCountingMessageHandler
 			implements MessageHandler {
 
```
```diff
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2019-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.binder.kafka.integration;
+
+import java.util.Map;
+
+import kafka.server.KafkaConfig;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.requests.IsolationLevel;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.springframework.beans.factory.BeanCreationException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.cloud.stream.annotation.EnableBinding;
+import org.springframework.cloud.stream.binder.BinderFactory;
+import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
+import org.springframework.cloud.stream.messaging.Source;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
+import org.springframework.kafka.test.utils.KafkaTestUtils;
+import org.springframework.kafka.transaction.KafkaTransactionManager;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.support.GenericMessage;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.annotation.EnableTransactionManagement;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.support.AbstractPlatformTransactionManager;
+import org.springframework.transaction.support.TransactionSynchronizationManager;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * @author Gary Russell
+ * @since 2.1.4
+ *
+ */
+@RunWith(SpringRunner.class)
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
+		"spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx.",
+		"spring.cloud.stream.kafka.binder.transaction.producer.configuration.retries=99",
+		"spring.cloud.stream.kafka.binder.transaction.producer.configuration.acks=all"})
+public class ProducerOnlyTransactionTests {
+
+	private static final String KAFKA_BROKERS_PROPERTY = "spring.cloud.stream.kafka.binder.brokers";
+
+	@ClassRule
+	public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, "output")
+			.brokerProperty(KafkaConfig.TransactionsTopicReplicationFactorProp(), "1")
+			.brokerProperty(KafkaConfig.TransactionsTopicMinISRProp(), "1");
+
+	@Autowired
+	private Sender sender;
+
+	@Autowired
+	private MessageChannel output;
+
+	@BeforeClass
+	public static void setup() {
+		System.setProperty(KAFKA_BROKERS_PROPERTY,
+				embeddedKafka.getEmbeddedKafka().getBrokersAsString());
+	}
+
+	@AfterClass
+	public static void clean() {
+		System.clearProperty(KAFKA_BROKERS_PROPERTY);
+	}
+
+	@Test
+	public void testProducerTx() {
+		this.sender.DoInTransaction(this.output);
+		assertThat(this.sender.isInTx()).isTrue();
+		Map<String, Object> props = KafkaTestUtils.consumerProps("consumeTx", "false",
+				embeddedKafka.getEmbeddedKafka());
+		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+		props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase());
+		Consumer<?, ?> consumer = new KafkaConsumer<>(props);
+		embeddedKafka.getEmbeddedKafka().consumeFromAllEmbeddedTopics(consumer);
+		ConsumerRecord<?, ?> record = KafkaTestUtils.getSingleRecord(consumer, "output");
+		assertThat(record.value()).isEqualTo("foo".getBytes());
+	}
+
+	@EnableBinding(Source.class)
+	@EnableAutoConfiguration
+	@EnableTransactionManagement
+	public static class Config {
+
+		@Bean
+		public PlatformTransactionManager transactionManager(BinderFactory binders) {
+			try {
+				ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
+						MessageChannel.class)).getTransactionalProducerFactory();
+				KafkaTransactionManager<byte[], byte[]> tm = new KafkaTransactionManager<>(pf);
+				tm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
+				return tm;
+			}
+			catch (BeanCreationException e) { // needed to avoid other tests in this package failing when there is no binder
+				return null;
+			}
+		}
+
+		@Bean
+		public Sender sender() {
+			return new Sender();
+		}
+
+	}
+
+	public static class Sender {
+
+		private boolean isInTx;
+
+		@Transactional
+		public void DoInTransaction(MessageChannel output) {
+			this.isInTx = TransactionSynchronizationManager.isActualTransactionActive();
+			output.send(new GenericMessage<>("foo"));
+		}
+
+		public boolean isInTx() {
+			return this.isInTx;
+		}
+
+	}
+
+}
```