Kafka Streams binding lifecycle changes
Start Kafka Streams bindgings only when they are not running. Similarly, stop them only if they are running. Without these guards in the bindings for KStream, KTable and GlobalKTable, it may cause NPE's due to the backing concurrent collections in KafkaStreamsRegistry not finding the proper KafkaStreams object, especially when the StreamsBuilderFactory bean is already stopped through the binder provided manager.
This commit is contained in:
@@ -102,16 +102,20 @@ public class GlobalKTableBinder extends
|
||||
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
super.start();
|
||||
GlobalKTableBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
|
||||
if (!streamsBuilderFactoryBean.isRunning()) {
|
||||
super.start();
|
||||
GlobalKTableBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void stop() {
|
||||
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
|
||||
super.stop();
|
||||
GlobalKTableBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
|
||||
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
|
||||
if (streamsBuilderFactoryBean.isRunning()) {
|
||||
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
|
||||
super.stop();
|
||||
GlobalKTableBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
|
||||
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -131,16 +131,20 @@ class KStreamBinder extends
|
||||
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
super.start();
|
||||
KStreamBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
|
||||
if (!streamsBuilderFactoryBean.isRunning()) {
|
||||
super.start();
|
||||
KStreamBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void stop() {
|
||||
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
|
||||
super.stop();
|
||||
KStreamBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
|
||||
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
|
||||
if (streamsBuilderFactoryBean.isRunning()) {
|
||||
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
|
||||
super.stop();
|
||||
KStreamBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
|
||||
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -192,16 +196,20 @@ class KStreamBinder extends
|
||||
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
super.start();
|
||||
KStreamBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
|
||||
if (!streamsBuilderFactoryBean.isRunning()) {
|
||||
super.start();
|
||||
KStreamBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void stop() {
|
||||
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
|
||||
super.stop();
|
||||
KStreamBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
|
||||
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
|
||||
if (streamsBuilderFactoryBean.isRunning()) {
|
||||
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
|
||||
super.stop();
|
||||
KStreamBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
|
||||
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -103,16 +103,20 @@ class KTableBinder extends
|
||||
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
super.start();
|
||||
KTableBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
|
||||
if (!streamsBuilderFactoryBean.isRunning()) {
|
||||
super.start();
|
||||
KTableBinder.this.kafkaStreamsRegistry.registerKafkaStreams(streamsBuilderFactoryBean);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void stop() {
|
||||
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
|
||||
super.stop();
|
||||
KTableBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
|
||||
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
|
||||
if (streamsBuilderFactoryBean.isRunning()) {
|
||||
final KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
|
||||
super.stop();
|
||||
KTableBinder.this.kafkaStreamsRegistry.unregisterKafkaStreams(kafkaStreams);
|
||||
KafkaStreamsBinderUtils.closeDlqProducerFactories(kafkaStreamsBindingInformationCatalogue, streamsBuilderFactoryBean);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user