Recycle KafkaStreams Objects

In the event Kafka Streams bindings are restarted (stop/start)
using the actuator bindings endpoints, the underlying KafkaStreams
objects are not recycled. After restarting, it still sees the previous
KafkaStreams object. Addressing this issue.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1119
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1120
This commit is contained in:
Soby Chacko
2021-08-12 15:29:46 -04:00
parent d7907bbdcc
commit f4d3715317
7 changed files with 59 additions and 9 deletions

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.stream.binder.kafka.streams; package org.springframework.cloud.stream.binder.kafka.streams;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.GlobalKTable;
import org.springframework.cloud.stream.binder.AbstractBinder; import org.springframework.cloud.stream.binder.AbstractBinder;
@@ -58,16 +59,18 @@ public class GlobalKTableBinder extends
// @checkstyle:off // @checkstyle:off
private KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties = new KafkaStreamsExtendedBindingProperties(); private KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties = new KafkaStreamsExtendedBindingProperties();
private final KafkaStreamsRegistry kafkaStreamsRegistry;
// @checkstyle:on // @checkstyle:on
public GlobalKTableBinder( public GlobalKTableBinder(
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties, KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaTopicProvisioner kafkaTopicProvisioner, KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue) { KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, KafkaStreamsRegistry kafkaStreamsRegistry) {
this.binderConfigurationProperties = binderConfigurationProperties; this.binderConfigurationProperties = binderConfigurationProperties;
this.kafkaTopicProvisioner = kafkaTopicProvisioner; this.kafkaTopicProvisioner = kafkaTopicProvisioner;
this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue; this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
this.kafkaStreamsRegistry = kafkaStreamsRegistry;
} }
@Override @Override
@@ -97,9 +100,17 @@ public class GlobalKTableBinder extends
return true; return true;
} }
@Override
public synchronized void start() {
super.start();
GlobalKTableBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
}
@Override @Override
public synchronized void stop() { public synchronized void stop() {
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
super.stop(); super.stop();
GlobalKTableBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean); KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
} }
}; };

View File

@@ -59,10 +59,11 @@ public class GlobalKTableBinderConfiguration {
KafkaTopicProvisioner kafkaTopicProvisioner, KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties, KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
@Qualifier("streamConfigGlobalProperties") Map<String, Object> streamConfigGlobalProperties) { @Qualifier("streamConfigGlobalProperties") Map<String, Object> streamConfigGlobalProperties,
KafkaStreamsRegistry kafkaStreamsRegistry) {
GlobalKTableBinder globalKTableBinder = new GlobalKTableBinder(binderConfigurationProperties, GlobalKTableBinder globalKTableBinder = new GlobalKTableBinder(binderConfigurationProperties,
kafkaTopicProvisioner, kafkaStreamsBindingInformationCatalogue); kafkaTopicProvisioner, kafkaStreamsBindingInformationCatalogue, kafkaStreamsRegistry);
globalKTableBinder.setKafkaStreamsExtendedBindingProperties( globalKTableBinder.setKafkaStreamsExtendedBindingProperties(
kafkaStreamsExtendedBindingProperties); kafkaStreamsExtendedBindingProperties);
return globalKTableBinder; return globalKTableBinder;

View File

@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -78,16 +79,19 @@ class KStreamBinder extends
private final KeyValueSerdeResolver keyValueSerdeResolver; private final KeyValueSerdeResolver keyValueSerdeResolver;
private final KafkaStreamsRegistry kafkaStreamsRegistry;
KStreamBinder(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties, KStreamBinder(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaTopicProvisioner kafkaTopicProvisioner, KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate, KafkaStreamsMessageConversionDelegate kafkaStreamsMessageConversionDelegate,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue, KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
KeyValueSerdeResolver keyValueSerdeResolver) { KeyValueSerdeResolver keyValueSerdeResolver, KafkaStreamsRegistry kafkaStreamsRegistry) {
this.binderConfigurationProperties = binderConfigurationProperties; this.binderConfigurationProperties = binderConfigurationProperties;
this.kafkaTopicProvisioner = kafkaTopicProvisioner; this.kafkaTopicProvisioner = kafkaTopicProvisioner;
this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate; this.kafkaStreamsMessageConversionDelegate = kafkaStreamsMessageConversionDelegate;
this.kafkaStreamsBindingInformationCatalogue = KafkaStreamsBindingInformationCatalogue; this.kafkaStreamsBindingInformationCatalogue = KafkaStreamsBindingInformationCatalogue;
this.keyValueSerdeResolver = keyValueSerdeResolver; this.keyValueSerdeResolver = keyValueSerdeResolver;
this.kafkaStreamsRegistry = kafkaStreamsRegistry;
} }
@Override @Override
@@ -125,9 +129,17 @@ class KStreamBinder extends
return true; return true;
} }
@Override
public synchronized void start() {
super.start();
KStreamBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
}
@Override @Override
public synchronized void stop() { public synchronized void stop() {
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
super.stop(); super.stop();
KStreamBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean); KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
} }
}; };
@@ -178,9 +190,17 @@ class KStreamBinder extends
return false; return false;
} }
@Override
public synchronized void start() {
super.start();
KStreamBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
}
@Override @Override
public synchronized void stop() { public synchronized void stop() {
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
super.stop(); super.stop();
KStreamBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean); KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
} }
}; };

View File

@@ -59,10 +59,10 @@ public class KStreamBinderConfiguration {
KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate, KafkaStreamsMessageConversionDelegate KafkaStreamsMessageConversionDelegate,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue, KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue,
KeyValueSerdeResolver keyValueSerdeResolver, KeyValueSerdeResolver keyValueSerdeResolver,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties) { KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties, KafkaStreamsRegistry kafkaStreamsRegistry) {
KStreamBinder kStreamBinder = new KStreamBinder(binderConfigurationProperties, KStreamBinder kStreamBinder = new KStreamBinder(binderConfigurationProperties,
kafkaTopicProvisioner, KafkaStreamsMessageConversionDelegate, kafkaTopicProvisioner, KafkaStreamsMessageConversionDelegate,
KafkaStreamsBindingInformationCatalogue, keyValueSerdeResolver); KafkaStreamsBindingInformationCatalogue, keyValueSerdeResolver, kafkaStreamsRegistry);
kStreamBinder.setKafkaStreamsExtendedBindingProperties( kStreamBinder.setKafkaStreamsExtendedBindingProperties(
kafkaStreamsExtendedBindingProperties); kafkaStreamsExtendedBindingProperties);
return kStreamBinder; return kStreamBinder;

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.stream.binder.kafka.streams; package org.springframework.cloud.stream.binder.kafka.streams;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.KTable;
import org.springframework.cloud.stream.binder.AbstractBinder; import org.springframework.cloud.stream.binder.AbstractBinder;
@@ -59,12 +60,15 @@ class KTableBinder extends
// @checkstyle:on // @checkstyle:on
private final KafkaStreamsRegistry kafkaStreamsRegistry;
KTableBinder(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties, KTableBinder(KafkaStreamsBinderConfigurationProperties binderConfigurationProperties,
KafkaTopicProvisioner kafkaTopicProvisioner, KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue) { KafkaStreamsBindingInformationCatalogue KafkaStreamsBindingInformationCatalogue, KafkaStreamsRegistry kafkaStreamsRegistry) {
this.binderConfigurationProperties = binderConfigurationProperties; this.binderConfigurationProperties = binderConfigurationProperties;
this.kafkaTopicProvisioner = kafkaTopicProvisioner; this.kafkaTopicProvisioner = kafkaTopicProvisioner;
this.kafkaStreamsBindingInformationCatalogue = KafkaStreamsBindingInformationCatalogue; this.kafkaStreamsBindingInformationCatalogue = KafkaStreamsBindingInformationCatalogue;
this.kafkaStreamsRegistry = kafkaStreamsRegistry;
} }
@Override @Override
@@ -97,9 +101,17 @@ class KTableBinder extends
return true; return true;
} }
@Override
public synchronized void start() {
super.start();
KTableBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
}
@Override @Override
public synchronized void stop() { public synchronized void stop() {
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
super.stop(); super.stop();
KTableBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean); KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
} }
}; };

View File

@@ -59,9 +59,10 @@ public class KTableBinderConfiguration {
KafkaTopicProvisioner kafkaTopicProvisioner, KafkaTopicProvisioner kafkaTopicProvisioner,
KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties, KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties,
KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue,
@Qualifier("streamConfigGlobalProperties") Map<String, Object> streamConfigGlobalProperties) { @Qualifier("streamConfigGlobalProperties") Map<String, Object> streamConfigGlobalProperties,
KafkaStreamsRegistry kafkaStreamsRegistry) {
KTableBinder kTableBinder = new KTableBinder(binderConfigurationProperties, KTableBinder kTableBinder = new KTableBinder(binderConfigurationProperties,
kafkaTopicProvisioner, kafkaStreamsBindingInformationCatalogue); kafkaTopicProvisioner, kafkaStreamsBindingInformationCatalogue, kafkaStreamsRegistry);
kTableBinder.setKafkaStreamsExtendedBindingProperties(kafkaStreamsExtendedBindingProperties); kTableBinder.setKafkaStreamsExtendedBindingProperties(kafkaStreamsExtendedBindingProperties);
return kTableBinder; return kTableBinder;
} }

View File

@@ -62,6 +62,11 @@ public class KafkaStreamsRegistry {
this.streamsBuilderFactoryBeanMap.put(kafkaStreams, streamsBuilderFactoryBean); this.streamsBuilderFactoryBeanMap.put(kafkaStreams, streamsBuilderFactoryBean);
} }
void unregisterKafkaStreams(KafkaStreams kafkaStreams) {
this.kafkaStreams.remove(kafkaStreams);
this.streamsBuilderFactoryBeanMap.remove(kafkaStreams);
}
/** /**
* *
* @param kafkaStreams {@link KafkaStreams} object * @param kafkaStreams {@link KafkaStreams} object