Recycle KafkaStreams Objects

In the event Kafka Streams bindings are restarted (stop/start)
using the actuator bindings endpoints, the underlying KafkaStreams
objects are not recycled. After restarting, it still sees the previous
KafkaStreams object. Addressing this issue.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1119
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1120
This commit is contained in:
Soby Chacko
2021-08-12 15:29:46 -04:00
parent e6602f235e
commit 389b45d6bd
7 changed files with 59 additions and 9 deletions

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.springframework.cloud.stream.binder.AbstractBinder;
@@ -58,16 +59,18 @@ public class GlobalKTableBinder extends
// @checkstyle:off
private KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties = new KafkaStreamsExtendedBindingProperties();
private final KafkaStreamsRegistry kafkaStreamsRegistry;
// @checkstyle:on
public GlobalKTableBinder(
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue) {
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, KafkaStreamsRegistry kafkaStreamsRegistry) {
this.binderConfigurationProperties = binderConfigurationProperties;
this.kafkaTopicProvisioner = kafkaTopicProvisioner;
this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
this.kafkaStreamsRegistry = kafkaStreamsRegistry;
}
@Override
@@ -97,9 +100,17 @@ public class GlobalKTableBinder extends
return true;
}
@Override
public synchronized void start() {
super.start();
GlobalKTableBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
}
@Override
public synchronized void stop() {
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
super.stop();
GlobalKTableBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
}
};

View File

@@ -59,10 +59,11 @@ public class GlobalKTableBinderConfiguration {
KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
@Qualifier("streamConfigGlobalProperties") Map<String, Object> streamConfigGlobalProperties) {
@Qualifier("streamConfigGlobalProperties") Map<String, Object> streamConfigGlobalProperties,
KafkaStreamsRegistry kafkaStreamsRegistry) {
GlobalKTableBinder globalKTableBinder = new GlobalKTableBinder(binderConfigurationProperties,
kafkaTopicProvisioner, kafkaStreamsBindingInformationCatalogue);
kafkaTopicProvisioner, kafkaStreamsBindingInformationCatalogue, kafkaStreamsRegistry);
globalKTableBinder.setKafkaStreamsExtendedBindingProperties(
kafkaStreamsExtendedBindingProperties);
return globalKTableBinder;

View File

@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -78,16 +79,19 @@ class KStreamBinder extends
private final KeyValueSerdeResolver keyValueSerdeResolver;
private final KafkaStreamsRegistry kafkaStreamsRegistry;
KStreamBinder(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
KeyValueSerdeResolver keyValueSerdeResolver) {
KeyValueSerdeResolver keyValueSerdeResolver, KafkaStreamsRegistry kafkaStreamsRegistry) {
this.binderConfigurationProperties = binderConfigurationProperties;
this.kafkaTopicProvisioner = kafkaTopicProvisioner;
this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;
this.kafkaStreamsBindingInformationCatalogue = KafkaStreamsBindingInformationCatalogue;
this.keyValueSerdeResolver = keyValueSerdeResolver;
this.kafkaStreamsRegistry = kafkaStreamsRegistry;
}
@Override
@@ -125,9 +129,17 @@ class KStreamBinder extends
return true;
}
@Override
public synchronized void start() {
super.start();
KStreamBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
}
@Override
public synchronized void stop() {
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
super.stop();
KStreamBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
}
};
@@ -178,9 +190,17 @@ class KStreamBinder extends
return false;
}
@Override
public synchronized void start() {
super.start();
KStreamBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
}
@Override
public synchronized void stop() {
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
super.stop();
KStreamBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
}
};

View File

@@ -59,10 +59,10 @@ public class KStreamBinderConfiguration {
KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
KeyValueSerdeResolver keyValueSerdeResolver,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties) {
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties, KafkaStreamsRegistry kafkaStreamsRegistry) {
KStreamBinder kStreamBinder = new KStreamBinder(binderConfigurationProperties,
kafkaTopicProvisioner, KafkaStreamsMessageConversionDelegate,
KafkaStreamsBindingInformationCatalogue, keyValueSerdeResolver);
KafkaStreamsBindingInformationCatalogue, keyValueSerdeResolver, kafkaStreamsRegistry);
kStreamBinder.setKafkaStreamsExtendedBindingProperties(
kafkaStreamsExtendedBindingProperties);
return kStreamBinder;

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.stream.binder.kafka.streams;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.cloud.stream.binder.AbstractBinder;
@@ -59,12 +60,15 @@ class KTableBinder extends
// @checkstyle:on
private final KafkaStreamsRegistry kafkaStreamsRegistry;
KTableBinder(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue) {
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue, KafkaStreamsRegistry kafkaStreamsRegistry) {
this.binderConfigurationProperties = binderConfigurationProperties;
this.kafkaTopicProvisioner = kafkaTopicProvisioner;
this.kafkaStreamsBindingInformationCatalogue = KafkaStreamsBindingInformationCatalogue;
this.kafkaStreamsRegistry = kafkaStreamsRegistry;
}
@Override
@@ -97,9 +101,17 @@ class KTableBinder extends
return true;
}
@Override
public synchronized void start() {
super.start();
KTableBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
}
@Override
public synchronized void stop() {
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
super.stop();
KTableBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
}
};

View File

@@ -59,9 +59,10 @@ public class KTableBinderConfiguration {
KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
@Qualifier("streamConfigGlobalProperties") Map<String, Object> streamConfigGlobalProperties) {
@Qualifier("streamConfigGlobalProperties") Map<String, Object> streamConfigGlobalProperties,
KafkaStreamsRegistry kafkaStreamsRegistry) {
KTableBinder kTableBinder = new KTableBinder(binderConfigurationProperties,
kafkaTopicProvisioner, kafkaStreamsBindingInformationCatalogue);
kafkaTopicProvisioner, kafkaStreamsBindingInformationCatalogue, kafkaStreamsRegistry);
kTableBinder.setKafkaStreamsExtendedBindingProperties(kafkaStreamsExtendedBindingProperties);
return kTableBinder;
}

View File

@@ -62,6 +62,11 @@ public class KafkaStreamsRegistry {
this.streamsBuilderFactoryBeanMap.put(kafkaStreams, streamsBuilderFactoryBean);
}
void unregisterKafkaStreams(KafkaStreams kafkaStreams) {
this.kafkaStreams.remove(kafkaStreams);
this.streamsBuilderFactoryBeanMap.remove(kafkaStreams);
}
/**
*
* @param kafkaStreams {@link KafkaStreams} object