Fix concurrency issues with Kafka Streams binder

See this commit on the master branch for details:
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/commit/1cc50c1a
This commit is contained in:
Soby Chacko
2020-02-13 15:00:35 -05:00
parent fc1e4602fb
commit 32adf574ac
2 changed files with 4 additions and 4 deletions

View File

@@ -466,8 +466,8 @@ public class KafkaStreamsFunctionProcessor implements ApplicationContextAware {
}
int concurrency = this.bindingServiceProperties.getConsumerProperties(inboundName).getConcurrency();
// override concurrency if set at the individual binding level.
if (concurrency > 1) {
// override concurrency (set potentially on the binding).
if (concurrency >= 1) {
streamConfigGlobalProperties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, concurrency);
}

View File

@@ -539,8 +539,8 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator
int concurrency = this.bindingServiceProperties.getConsumerProperties(inboundName)
.getConcurrency();
// override concurrency if set at the individual binding level.
if (concurrency > 1) {
// override concurrency (set potentially on the binding).
if (concurrency >= 1) {
streamConfigGlobalProperties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
concurrency);
}