From e7bf404fce3c2a99ebe11aadd1d0dd3b60f6fa16 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Wed, 24 Nov 2021 13:12:52 -0500 Subject: [PATCH] Fix PartitioningInterceptor CCE The newly added DefaultPartitioningInteceptor must be explicitly checked in order to avoid a CCE. Related to resolving https://github.com/spring-cloud/spring-cloud-stream/issues/2245 Specifically for this: https://github.com/spring-cloud/spring-cloud-stream/issues/2245#issuecomment-977663452 --- .../stream/binder/kafka/KafkaMessageChannelBinder.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java index b3211ed2..6a4973de 100644 --- a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java +++ b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java @@ -419,10 +419,14 @@ public class KafkaMessageChannelBinder extends List interceptors = ((InterceptableChannel) channel) .getInterceptors(); interceptors.forEach((interceptor) -> { - if (interceptor instanceof PartitioningInterceptor || interceptor instanceof DefaultPartitioningInterceptor) { + if (interceptor instanceof PartitioningInterceptor) { ((PartitioningInterceptor) interceptor) .setPartitionCount(partitions.size()); } + else if (interceptor instanceof DefaultPartitioningInterceptor) { + ((DefaultPartitioningInterceptor) interceptor) + .setPartitionCount(partitions.size()); + } }); }