Compare commits
8 Commits
v3.0.0.M2...v2.2.1.REL
| Author | SHA1 | Date |
|---|---|---|
| | d6ca68e20d | |
| | 665f087956 | |
| | 4a4d42da41 | |
| | fba5dbb22f | |
| | 6d14276d10 | |
| | 74783cfa59 | |
| | 8fb0516d96 | |
| | 683e3f96ba | |
52 README.adoc
@@ -162,6 +162,9 @@ Default: none.
[[kafka-consumer-properties]]
==== Kafka Consumer Properties

NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.

The following properties are available for Kafka consumers only and
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.consumer.`.
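For illustration, assuming a channel named `input` (a hypothetical binding name), a Kafka consumer property such as `autoCommitOffset` would be set as follows:

====
[source]
----
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
----
====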
@@ -284,6 +287,9 @@ Default: none (the binder-wide default of 1 is used).
[[kafka-producer-properties]]
==== Kafka Producer Properties

NOTE: To avoid repetition, Spring Cloud Stream supports setting values for all channels, in the format of `spring.cloud.stream.default.<property>=<value>`.

The following properties are available for Kafka producers only and
must be prefixed with `spring.cloud.stream.kafka.bindings.<channelName>.producer.`.
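Similarly, assuming a channel named `output` (again hypothetical), a Kafka producer property such as `bufferSize` would be set as follows:

====
[source]
----
spring.cloud.stream.kafka.bindings.output.producer.bufferSize=16384
----
====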
@@ -516,6 +522,50 @@ public class Application {
}
----

[[kafka-transactional-binder]]
=== Transactional Binder

Enable transactions by setting `spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix` to a non-empty value, e.g. `tx-`.
When used in a processor application, the consumer starts the transaction; any records sent on the consumer thread participate in the same transaction.
When the listener exits normally, the listener container will send the offset to the transaction and commit it.
A common producer factory is used for all producer bindings, configured using the `spring.cloud.stream.kafka.binder.transaction.producer.*` properties; individual binding Kafka producer properties are ignored.
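Taken together, a minimal transactional configuration might look like the following sketch; the `retries` and `acks` values are borrowed from the ProducerOnlyTransactionTests added later in this change set:

====
[source]
----
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix=tx-
spring.cloud.stream.kafka.binder.transaction.producer.configuration.retries=99
spring.cloud.stream.kafka.binder.transaction.producer.configuration.acks=all
----
====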

If you wish to use transactions in a source application, or from some arbitrary thread for producer-only transactions (e.g. a `@Scheduled` method), you must get a reference to the transactional producer factory and define a `KafkaTransactionManager` bean using it.

====
[source, java]
----
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    return new KafkaTransactionManager<>(pf);
}
----
====

Notice that we get a reference to the binder using the `BinderFactory`; use `null` in the first argument when there is only one binder configured.
If more than one binder is configured, use the binder name to get the reference.
Once we have a reference to the binder, we can obtain a reference to the `ProducerFactory` and create a transaction manager.

You can then use normal Spring transaction support, e.g. `TransactionTemplate` or `@Transactional`, for example:

====
[source, java]
----
public static class Sender {

    @Transactional
    public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
        stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
    }

}
----
====

If you wish to synchronize producer-only transactions with those from some other transaction manager, use a `ChainedTransactionManager`.
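A minimal sketch of such a bean, assuming a JDBC `DataSourceTransactionManager` as the other transaction manager (any other `PlatformTransactionManager` can be chained the same way):

====
[source, java]
----
@Bean
public ChainedTransactionManager chainedTransactionManager(
        KafkaTransactionManager<byte[], byte[]> kafkaTransactionManager,
        DataSourceTransactionManager jdbcTransactionManager) {
    // Transactions are started in the order given and committed/rolled back in reverse order.
    return new ChainedTransactionManager(kafkaTransactionManager, jdbcTransactionManager);
}
----
====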

[[kafka-error-channels]]
=== Error Channels
@@ -731,4 +781,4 @@ added after the original pull request but before a merge.
if you are fixing an existing issue please add `Fixes gh-XXXX` at the end of the commit
message (where XXXX is the issue number).

// ======================================================================================
// ======================================================================================
10 docs/pom.xml
@@ -7,7 +7,7 @@
<parent>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
    <version>2.2.1.BUILD-SNAPSHOT</version>
    <version>2.2.1.RELEASE</version>
</parent>
<packaging>pom</packaging>
<name>spring-cloud-stream-binder-kafka-docs</name>
@@ -124,10 +124,10 @@
<sourceDocumentName>${docs.main}.adoc</sourceDocumentName>
<attributes>
    <spring-cloud-stream-version>${project.version}</spring-cloud-stream-version>
    <docs-url>https://cloud.spring.io/</docs-url>
    <docs-version></docs-version>
    <!-- <docs-version>${project.version}/</docs-version> -->
    <!-- <docs-url>https://cloud.spring.io/spring-cloud-static/</docs-url> -->
    <!-- <docs-url>https://cloud.spring.io/</docs-url> -->
    <!-- <docs-version></docs-version> -->
    <docs-version>${project.version}/</docs-version>
    <docs-url>https://cloud.spring.io/spring-cloud-static/</docs-url>
</attributes>
</configuration>
<executions>
@@ -502,6 +502,50 @@ public class Application {
}
----

[[kafka-transactional-binder]]
=== Transactional Binder

Enable transactions by setting `spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix` to a non-empty value, e.g. `tx-`.
When used in a processor application, the consumer starts the transaction; any records sent on the consumer thread participate in the same transaction.
When the listener exits normally, the listener container will send the offset to the transaction and commit it.
A common producer factory is used for all producer bindings, configured using the `spring.cloud.stream.kafka.binder.transaction.producer.*` properties; individual binding Kafka producer properties are ignored.

If you wish to use transactions in a source application, or from some arbitrary thread for producer-only transactions (e.g. a `@Scheduled` method), you must get a reference to the transactional producer factory and define a `KafkaTransactionManager` bean using it.

====
[source, java]
----
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    return new KafkaTransactionManager<>(pf);
}
----
====

Notice that we get a reference to the binder using the `BinderFactory`; use `null` in the first argument when there is only one binder configured.
If more than one binder is configured, use the binder name to get the reference.
Once we have a reference to the binder, we can obtain a reference to the `ProducerFactory` and create a transaction manager.

You can then use normal Spring transaction support, e.g. `TransactionTemplate` or `@Transactional`, for example:

====
[source, java]
----
public static class Sender {

    @Transactional
    public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
        stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
    }

}
----
====

If you wish to synchronize producer-only transactions with those from some other transaction manager, use a `ChainedTransactionManager`.

[[kafka-error-channels]]
=== Error Channels
6 pom.xml
@@ -2,12 +2,12 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.2.1.BUILD-SNAPSHOT</version>
<version>2.2.1.RELEASE</version>
<packaging>pom</packaging>
<parent>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-build</artifactId>
    <version>2.1.4.BUILD-SNAPSHOT</version>
    <version>2.1.6.RELEASE</version>
    <relativePath />
</parent>
<properties>
@@ -15,7 +15,7 @@
<spring-kafka.version>2.2.2.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>3.1.0.RELEASE</spring-integration-kafka.version>
<kafka.version>2.0.0</kafka.version>
<spring-cloud-stream.version>2.2.0.BUILD-SNAPSHOT</spring-cloud-stream.version>
<spring-cloud-stream.version>2.2.1.RELEASE</spring-cloud-stream.version>
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>
@@ -4,7 +4,7 @@
<parent>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
    <version>2.2.1.BUILD-SNAPSHOT</version>
    <version>2.2.1.RELEASE</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>
@@ -5,7 +5,7 @@
<parent>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
    <version>2.2.1.BUILD-SNAPSHOT</version>
    <version>2.2.1.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>
@@ -466,11 +466,11 @@ public class KafkaTopicProvisioner implements
// In some cases, the above partition query may not throw an UnknownTopic..Exception for various reasons.
// For that, we are forcing another query to ensure that the topic is present on the server.
if (CollectionUtils.isEmpty(partitions)) {
    final AdminClient adminClient = AdminClient
            .create(this.adminClientProperties);
    final DescribeTopicsResult describeTopicsResult = adminClient
    try (AdminClient adminClient = AdminClient
            .create(this.adminClientProperties)) {
        final DescribeTopicsResult describeTopicsResult = adminClient
                .describeTopics(Collections.singletonList(topicName));
        try {
            describeTopicsResult.all().get();
        }
        catch (ExecutionException ex) {
@@ -10,7 +10,7 @@
<parent>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
    <version>2.2.1.BUILD-SNAPSHOT</version>
    <version>2.2.1.RELEASE</version>
</parent>

<properties>
@@ -118,4 +118,8 @@ public class GlobalKTableBinder extends
            .getExtendedPropertiesEntryClass();
}

public void setKafkaStreamsExtendedBindingProperties(
        KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties) {
    this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
}

}
@@ -25,6 +25,7 @@ import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -67,10 +68,14 @@ public class GlobalKTableBinderConfiguration {
@Bean
public GlobalKTableBinder GlobalKTableBinder(
        KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
        KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
        KafkaTopicProvisioner kafkaTopicProvisioner,
        @Qualifier("kafkaStreamsDlqDispatchers") Map<String, KafkaStreamsDlqDispatch> kafkaStreamsDlqDispatchers) {
    return new GlobalKTableBinder(binderConfigurationProperties,
    GlobalKTableBinder globalKTableBinder = new GlobalKTableBinder(binderConfigurationProperties,
            kafkaTopicProvisioner, kafkaStreamsDlqDispatchers);
    globalKTableBinder.setKafkaStreamsExtendedBindingProperties(
            kafkaStreamsExtendedBindingProperties);
    return globalKTableBinder;
}

}
@@ -122,4 +122,9 @@ class KTableBinder extends
            .getExtendedPropertiesEntryClass();
}

public void setKafkaStreamsExtendedBindingProperties(
        KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties) {
    this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
}

}
@@ -25,6 +25,7 @@ import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -68,10 +69,12 @@ public class KTableBinderConfiguration {
public KTableBinder kTableBinder(
        KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
        KafkaTopicProvisioner kafkaTopicProvisioner,
        KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
        @Qualifier("kafkaStreamsDlqDispatchers") Map<String, KafkaStreamsDlqDispatch> kafkaStreamsDlqDispatchers) {
    KTableBinder kStreamBinder = new KTableBinder(binderConfigurationProperties,
    KTableBinder kTableBinder = new KTableBinder(binderConfigurationProperties,
            kafkaTopicProvisioner, kafkaStreamsDlqDispatchers);
    return kStreamBinder;
    kTableBinder.setKafkaStreamsExtendedBindingProperties(kafkaStreamsExtendedBindingProperties);
    return kTableBinder;
}

}
@@ -41,8 +41,14 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsApplicationSupportProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -75,7 +81,8 @@ public class StreamToGlobalKTableJoinIntegrationTests {
SpringApplication app = new SpringApplication(
        StreamToGlobalKTableJoinIntegrationTests.OrderEnricherApplication.class);
app.setWebApplicationType(WebApplicationType.NONE);
try (ConfigurableApplicationContext ignored = app.run("--server.port=0",
ConfigurableApplicationContext context = app.run("--server.port=0",
        "--spring.jmx.enabled=false",
        "--spring.cloud.stream.bindings.input.destination=orders",
        "--spring.cloud.stream.bindings.input-x.destination=customers",
@@ -112,10 +119,44 @@ public class StreamToGlobalKTableJoinIntegrationTests {
        "--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=10000",
        "--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId"
                + "=StreamToGlobalKTableJoinIntegrationTests-abc",
        "--spring.cloud.stream.kafka.streams.bindings.input.consumer.topic.properties.cleanup.policy=compact",
        "--spring.cloud.stream.kafka.streams.bindings.input-x.consumer.topic.properties.cleanup.policy=compact",
        "--spring.cloud.stream.kafka.streams.bindings.input-y.consumer.topic.properties.cleanup.policy=compact",
        "--spring.cloud.stream.kafka.streams.binder.brokers="
                + embeddedKafka.getBrokersAsString(),
        "--spring.cloud.stream.kafka.streams.binder.zkNodes="
                + embeddedKafka.getZookeeperConnectionString())) {
                + embeddedKafka.getZookeeperConnectionString());
try {

    // Testing certain ancillary configuration of GlobalKTable around topics creation.
    // See this issue: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/687

    BinderFactory binderFactory = context.getBeanFactory()
            .getBean(BinderFactory.class);

    Binder<KStream, ? extends ConsumerProperties, ? extends ProducerProperties> kStreamBinder = binderFactory
            .getBinder("kstream", KStream.class);

    KafkaStreamsConsumerProperties input = (KafkaStreamsConsumerProperties) ((ExtendedPropertiesBinder) kStreamBinder)
            .getExtendedConsumerProperties("input");
    String cleanupPolicy = input.getTopic().getProperties().get("cleanup.policy");

    assertThat(cleanupPolicy).isEqualTo("compact");

    Binder<GlobalKTable, ? extends ConsumerProperties, ? extends ProducerProperties> globalKTableBinder = binderFactory
            .getBinder("globalktable", GlobalKTable.class);

    KafkaStreamsConsumerProperties inputX = (KafkaStreamsConsumerProperties) ((ExtendedPropertiesBinder) globalKTableBinder)
            .getExtendedConsumerProperties("input-x");
    String cleanupPolicyX = inputX.getTopic().getProperties().get("cleanup.policy");

    assertThat(cleanupPolicyX).isEqualTo("compact");

    KafkaStreamsConsumerProperties inputY = (KafkaStreamsConsumerProperties) ((ExtendedPropertiesBinder) globalKTableBinder)
            .getExtendedConsumerProperties("input-y");
    String cleanupPolicyY = inputY.getTopic().getProperties().get("cleanup.policy");

    assertThat(cleanupPolicyY).isEqualTo("compact");
    Map<String, Object> senderPropsCustomer = KafkaTestUtils
            .producerProps(embeddedKafka);
    senderPropsCustomer.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
@@ -225,7 +266,10 @@ public class StreamToGlobalKTableJoinIntegrationTests {
    pfCustomer.destroy();
    pfProduct.destroy();
    pfOrder.destroy();
}
finally {
    consumer.close();
    context.close();
}
}
@@ -46,8 +46,14 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsApplicationSupportProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
@@ -90,7 +96,7 @@ public class StreamToTableJoinIntegrationTests {
consumer = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "output-topic-1");

try (ConfigurableApplicationContext ignored = app.run("--server.port=0",
ConfigurableApplicationContext context = app.run("--server.port=0",
        "--spring.jmx.enabled=false",
        "--spring.cloud.stream.bindings.input.destination=user-clicks-1",
        "--spring.cloud.stream.bindings.input-x.destination=user-regions-1",
@@ -117,10 +123,25 @@ public class StreamToTableJoinIntegrationTests {
        "--spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=10000",
        "--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId"
                + "=StreamToTableJoinIntegrationTests-abc",
        "--spring.cloud.stream.kafka.streams.bindings.input-x.consumer.topic.properties.cleanup.policy=compact",
        "--spring.cloud.stream.kafka.streams.binder.brokers="
                + embeddedKafka.getBrokersAsString(),
        "--spring.cloud.stream.kafka.streams.binder.zkNodes="
                + embeddedKafka.getZookeeperConnectionString())) {
                + embeddedKafka.getZookeeperConnectionString());
try {
    // Testing certain ancillary configuration of GlobalKTable around topics creation.
    // See this issue: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/687
    BinderFactory binderFactory = context.getBeanFactory()
            .getBean(BinderFactory.class);

    Binder<KTable, ? extends ConsumerProperties, ? extends ProducerProperties> ktableBinder = binderFactory
            .getBinder("ktable", KTable.class);

    KafkaStreamsConsumerProperties inputX = (KafkaStreamsConsumerProperties) ((ExtendedPropertiesBinder) ktableBinder)
            .getExtendedConsumerProperties("input-x");
    String cleanupPolicyX = inputX.getTopic().getProperties().get("cleanup.policy");

    assertThat(cleanupPolicyX).isEqualTo("compact");

    // Input 1: Region per user (multiple records allowed per user).
    List<KeyValue<String, String>> userRegions = Arrays.asList(new KeyValue<>(
@@ -193,6 +214,7 @@ public class StreamToTableJoinIntegrationTests {
}
finally {
    consumer.close();
    context.close();
}
}
@@ -10,7 +10,7 @@
<parent>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
    <version>2.2.1.BUILD-SNAPSHOT</version>
    <version>2.2.1.RELEASE</version>
</parent>

<dependencies>
@@ -30,9 +30,11 @@ import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

/**
 * Health indicator for Kafka.
@@ -43,11 +45,15 @@ import org.springframework.kafka.core.ConsumerFactory;
 * @author Gary Russell
 * @author Laur Aliste
 * @author Soby Chacko
 * @author Vladislav Fefelov
 */
public class KafkaBinderHealthIndicator implements HealthIndicator {
public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBean {

    private static final int DEFAULT_TIMEOUT = 60;

    private final ExecutorService executor = Executors.newSingleThreadExecutor(
            new CustomizableThreadFactory("kafka-binder-health-"));

    private final KafkaMessageChannelBinder binder;

    private final ConsumerFactory<?, ?> consumerFactory;
@@ -72,8 +78,7 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {

@Override
public Health health() {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<Health> future = exec.submit(this::buildHealthStatus);
    Future<Health> future = executor.submit(this::buildHealthStatus);
    try {
        return future.get(this.timeout, TimeUnit.SECONDS);
    }
@@ -91,9 +96,6 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
        return Health.down().withDetail("Failed to retrieve partition information in",
                this.timeout + " seconds").build();
    }
    finally {
        exec.shutdownNow();
    }
}

private Health buildHealthStatus() {
@@ -146,4 +148,9 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
    }
}

@Override
public void destroy() throws Exception {
    executor.shutdown();
}

}
@@ -55,11 +55,16 @@ public interface KafkaBindingRebalanceListener {

/**
 * Invoked when partitions are initially assigned or after a rebalance. Applications
 * might only want to perform seek operations on an initial assignment.
 * might only want to perform seek operations on an initial assignment. While the
 * 'initial' argument is true for each thread (when concurrency is greater than 1),
 * implementations should keep track of exactly which partitions have been sought.
 * There is a race in that a rebalance could occur during startup and so a topic/
 * partition that has been sought on one thread may be re-assigned to another
 * thread and you may not wish to re-seek it at that time.
 * @param bindingName the name of the binding.
 * @param consumer the consumer.
 * @param partitions the partitions.
 * @param initial true if this is the initial assignment.
 * @param initial true if this is the initial assignment on the current thread.
 */
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
        Collection<TopicPartition> partitions, boolean initial) {
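A sketch of an implementation that follows the advice in this javadoc, tracking sought partitions in a concurrent set so that each topic/partition is sought only once across consumer threads (the class name and the seek-to-beginning policy are illustrative):

====
[source, java]
----
public class SeekToBeginningRebalanceListener implements KafkaBindingRebalanceListener {

    private final Set<TopicPartition> sought = ConcurrentHashMap.newKeySet();

    @Override
    public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions, boolean initial) {
        // Set.add() returns true only for partitions not already sought, on any thread.
        List<TopicPartition> toSeek = partitions.stream()
                .filter(this.sought::add)
                .collect(Collectors.toList());
        if (!toSeek.isEmpty()) {
            consumer.seekToBeginning(toSeek);
        }
    }

}
----
====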
@@ -28,9 +28,9 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.regex.Pattern;
@@ -104,6 +104,7 @@ import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.kafka.support.TopicPartitionInitialOffset.SeekPosition;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
@@ -284,6 +285,17 @@ public class KafkaMessageChannelBinder extends
    return this.extendedBindingProperties.getExtendedPropertiesEntryClass();
}

/**
 * Return a reference to the binder's transaction manager's producer factory (if
 * configured). Use this to create a transaction manager in a bean definition when you
 * wish to use producer-only transactions.
 * @return the producer factory, or null.
 */
@Nullable
public ProducerFactory<byte[], byte[]> getTransactionalProducerFactory() {
    return this.transactionManager == null ? null : this.transactionManager.getProducerFactory();
}

@Override
protected MessageHandler createProducerMessageHandler(
        final ProducerDestination destination,
@@ -581,6 +593,7 @@ public class KafkaMessageChannelBinder extends
public void setupRebalanceListener(
        final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
        final ContainerProperties containerProperties) {

    Assert.isTrue(!extendedConsumerProperties.getExtension().isResetOffsets(),
            "'resetOffsets' cannot be set when a KafkaBindingRebalanceListener is provided");
    final String bindingName = bindingNameHolder.get();
@@ -590,7 +603,7 @@ public class KafkaMessageChannelBinder extends
containerProperties
        .setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

            private boolean initial = true;
            private final ThreadLocal<Boolean> initialAssignment = new ThreadLocal<>();

            @Override
            public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
@@ -612,11 +625,15 @@ public class KafkaMessageChannelBinder extends
public void onPartitionsAssigned(Consumer<?, ?> consumer,
        Collection<TopicPartition> partitions) {
    try {
        Boolean initial = this.initialAssignment.get();
        if (initial == null) {
            initial = Boolean.TRUE;
        }
        userRebalanceListener.onPartitionsAssigned(bindingName,
                consumer, partitions, this.initial);
                consumer, partitions, initial);
    }
    finally {
        this.initial = false;
        this.initialAssignment.set(Boolean.FALSE);
    }
}
@@ -663,20 +680,22 @@ public class KafkaMessageChannelBinder extends
boolean resetOffsets = extendedConsumerProperties.getExtension().isResetOffsets();
final Object resetTo = consumerFactory.getConfigurationProperties()
        .get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
final AtomicBoolean initialAssignment = new AtomicBoolean(true);
if (!"earliest".equals(resetTo) && !"latest".equals(resetTo)) {
    logger.warn("no (or unknown) " + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
            + " property cannot reset");
    resetOffsets = false;
}
if (groupManagement && resetOffsets) {
    containerProperties
            .setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
    Set<TopicPartition> sought = ConcurrentHashMap.newKeySet();
    containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

        @Override
        public void onPartitionsRevokedBeforeCommit(
                Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
            // no op

            if (logger.isInfoEnabled()) {
                logger.info("Partitions revoked: " + tps);
            }
        }

        @Override
@@ -686,14 +705,23 @@ public class KafkaMessageChannelBinder extends
        }

        @Override
        public void onPartitionsAssigned(Consumer<?, ?> consumer,
                Collection<TopicPartition> tps) {
            if (initialAssignment.getAndSet(false)) {
        public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
            if (logger.isInfoEnabled()) {
                logger.info("Partitions assigned: " + tps);
            }
            List<TopicPartition> toSeek = tps.stream()
                    .filter(tp -> {
                        boolean shouldSeek = !sought.contains(tp);
                        sought.add(tp);
                        return shouldSeek;
                    })
                    .collect(Collectors.toList());
            if (toSeek.size() > 0) {
                if ("earliest".equals(resetTo)) {
                    consumer.seekToBeginning(tps);
                    consumer.seekToBeginning(toSeek);
                }
                else if ("latest".equals(resetTo)) {
                    consumer.seekToEnd(tps);
                    consumer.seekToEnd(toSeek);
                }
            }
        }
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.admin.AdminClient;
@@ -3007,6 +3008,81 @@ public class KafkaBinderTests extends
    }
}

@Test
@SuppressWarnings("unchecked")
public void testResetOffsets() throws Exception {
    Binding<?> producerBinding = null;
    Binding<?> consumerBinding = null;
    try {
        String testPayload = "test";

        ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();

        DirectChannel moduleOutputChannel = createBindableChannel("output",
                createProducerBindingProperties(producerProperties));

        ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
        consumerProperties.setConcurrency(2);
        consumerProperties.setInstanceCount(5); // 10 partitions across 2 threads
        consumerProperties.getExtension().setResetOffsets(true);

        DirectChannel moduleInputChannel = createBindableChannel("input",
                createConsumerBindingProperties(consumerProperties));

        String testTopicName = "existing" + System.currentTimeMillis();
        KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
        configurationProperties.setAutoAddPartitions(true);
        Binder binder = getBinder(configurationProperties);
        producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel,
                producerProperties);

        consumerBinding = binder.bindConsumer(testTopicName, "testReset",
                moduleInputChannel, consumerProperties);
        // Let the consumer actually bind to the producer before sending a msg
        binderBindUnbindLatency();
        IntStream.range(0, 10).forEach(i -> moduleOutputChannel.send(MessageBuilder.withPayload(testPayload)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
                .setHeader(KafkaHeaders.PARTITION_ID, i)
                .build()));
        CountDownLatch latch1 = new CountDownLatch(10);
        CountDownLatch latch2 = new CountDownLatch(20);
        AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
        AtomicInteger received = new AtomicInteger();
        moduleInputChannel.subscribe(message1 -> {
            try {
                inboundMessageRef.set((Message<byte[]>) message1);
            }
            finally {
                received.incrementAndGet();
                latch1.countDown();
                latch2.countDown();
            }
        });
        assertThat(latch1.await(10, TimeUnit.SECONDS)).as("Failed to receive messages").isTrue();
        consumerBinding.unbind();
        consumerBinding = binder.bindConsumer(testTopicName, "testReset",
                moduleInputChannel, consumerProperties);
        assertThat(latch2.await(10, TimeUnit.SECONDS)).as("Failed to receive message").isTrue();
        binder.bindConsumer(testTopicName + "-x", "testReset",
                moduleInputChannel, consumerProperties).unbind(); // cause another rebalance
        assertThat(received.get()).as("Unexpected reset").isEqualTo(20);

        assertThat(inboundMessageRef.get()).isNotNull();
        assertThat(inboundMessageRef.get().getPayload()).isEqualTo("test".getBytes());
        assertThat(inboundMessageRef.get().getHeaders()).containsEntry("contentType",
                MimeTypeUtils.TEXT_PLAIN);
    }
    finally {
        if (producerBinding != null) {
            producerBinding.unbind();
        }
        if (consumerBinding != null) {
            consumerBinding.unbind();
        }
    }
}

private final class FailingInvocationCountingMessageHandler
        implements MessageHandler {
@@ -0,0 +1,152 @@
/*
 * Copyright 2019-2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka.integration;

import java.util.Map;

import kafka.server.KafkaConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binder.BinderFactory;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
import org.springframework.transaction.support.TransactionSynchronizationManager;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * @author Gary Russell
 * @since 2.1.4
 *
 */
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {
        "spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=tx.",
        "spring.cloud.stream.kafka.binder.transaction.producer.configuration.retries=99",
        "spring.cloud.stream.kafka.binder.transaction.producer.configuration.acks=all"})
public class ProducerOnlyTransactionTests {

    private static final String KAFKA_BROKERS_PROPERTY = "spring.cloud.stream.kafka.binder.brokers";

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, "output")
            .brokerProperty(KafkaConfig.TransactionsTopicReplicationFactorProp(), "1")
            .brokerProperty(KafkaConfig.TransactionsTopicMinISRProp(), "1");

    @Autowired
    private Sender sender;

    @Autowired
    private MessageChannel output;

    @BeforeClass
    public static void setup() {
        System.setProperty(KAFKA_BROKERS_PROPERTY,
                embeddedKafka.getEmbeddedKafka().getBrokersAsString());
    }

    @AfterClass
    public static void clean() {
        System.clearProperty(KAFKA_BROKERS_PROPERTY);
    }

    @Test
    public void testProducerTx() {
        this.sender.DoInTransaction(this.output);
        assertThat(this.sender.isInTx()).isTrue();
        Map<String, Object> props = KafkaTestUtils.consumerProps("consumeTx", "false",
                embeddedKafka.getEmbeddedKafka());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.name().toLowerCase());
        Consumer<?, ?> consumer = new KafkaConsumer<>(props);
        embeddedKafka.getEmbeddedKafka().consumeFromAllEmbeddedTopics(consumer);
        ConsumerRecord<?, ?> record = KafkaTestUtils.getSingleRecord(consumer, "output");
        assertThat(record.value()).isEqualTo("foo".getBytes());
    }

    @EnableBinding(Source.class)
    @EnableAutoConfiguration
    @EnableTransactionManagement
    public static class Config {

        @Bean
        public PlatformTransactionManager transactionManager(BinderFactory binders) {
            try {
                ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                        MessageChannel.class)).getTransactionalProducerFactory();
                KafkaTransactionManager<byte[], byte[]> tm = new KafkaTransactionManager<>(pf);
                tm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
                return tm;
            }
            catch (BeanCreationException e) { // needed to avoid other tests in this package failing when there is no binder
                return null;
            }
        }

        @Bean
        public Sender sender() {
            return new Sender();
        }

    }

    public static class Sender {

        private boolean isInTx;

        @Transactional
        public void DoInTransaction(MessageChannel output) {
            this.isInTx = TransactionSynchronizationManager.isActualTransactionActive();
            output.send(new GenericMessage<>("foo"));
        }

        public boolean isInTx() {
            return this.isInTx;
        }

    }

}