diff --git a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java index b3211ed2..6a4973de 100644 --- a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java +++ b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java @@ -419,10 +419,14 @@ public class KafkaMessageChannelBinder extends List interceptors = ((InterceptableChannel) channel) .getInterceptors(); interceptors.forEach((interceptor) -> { - if (interceptor instanceof PartitioningInterceptor || interceptor instanceof DefaultPartitioningInterceptor) { + if (interceptor instanceof PartitioningInterceptor) { ((PartitioningInterceptor) interceptor) .setPartitionCount(partitions.size()); } + else if (interceptor instanceof DefaultPartitioningInterceptor) { + ((DefaultPartitioningInterceptor) interceptor) + .setPartitionCount(partitions.size()); + } }); }