Fix PartitioningInterceptor CCE

The newly added DefaultPartitioningInteceptor must be explicitly
checked in order to avoid a CCE.

Related to resolving https://github.com/spring-cloud/spring-cloud-stream/issues/2245

Specifically for this: https://github.com/spring-cloud/spring-cloud-stream/issues/2245#issuecomment-977663452
This commit is contained in:
Soby Chacko
2021-11-24 13:12:52 -05:00
parent e9a8b4af7e
commit e7bf404fce

View File

@@ -419,10 +419,14 @@ public class KafkaMessageChannelBinder extends
List<ChannelInterceptor> interceptors = ((InterceptableChannel) channel)
.getInterceptors();
interceptors.forEach((interceptor) -> {
if (interceptor instanceof PartitioningInterceptor || interceptor instanceof DefaultPartitioningInterceptor) {
if (interceptor instanceof PartitioningInterceptor) {
((PartitioningInterceptor) interceptor)
.setPartitionCount(partitions.size());
}
else if (interceptor instanceof DefaultPartitioningInterceptor) {
((DefaultPartitioningInterceptor) interceptor)
.setPartitionCount(partitions.size());
}
});
}