GH-677: Fix resetOffsets with concurrency
Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/677

The logic for resetting offsets only on the initial assignment used a simple boolean; this is insufficient when concurrency is greater than 1. Use a concurrent set instead to determine whether a particular topic/partition has already been sought.

Also, change the `initial` argument on `KafkaBindingRebalanceListener.onPartitionsAssigned()` to be derived from a `ThreadLocal`, and add javadocs about retaining the sought state by partition.

**backport to all supported versions** (except `KafkaBindingRebalanceListener`, which did not exist before 2.1.x)
commit fba5dbb22f (parent 6d14276d10), committed by Soby Chacko
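The commit message describes the pattern at the heart of the fix; the following is a minimal, standalone sketch of that pattern (illustrative class and method names, not the binder's actual code). With concurrency greater than 1, each consumer thread receives its own assignment callback, so a single boolean flips to false after the first thread's callback and the remaining threads never seek; keying the decision by topic/partition in a set backed by ConcurrentHashMap avoids that.

import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

class SeekOncePerPartition {

    // shared across all consumer threads of the binding
    private final Set<TopicPartition> sought = ConcurrentHashMap.newKeySet();

    void onAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // Set.add() returns false when another thread already recorded the partition,
        // so only first-time assignments are collected for seeking.
        List<TopicPartition> toSeek = partitions.stream()
                .filter(this.sought::add)
                .collect(Collectors.toList());
        if (!toSeek.isEmpty()) {
            consumer.seekToBeginning(toSeek); // or seekToEnd(toSeek), depending on auto.offset.reset
        }
    }

}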
@@ -55,11 +55,16 @@ public interface KafkaBindingRebalanceListener {

    /**
     * Invoked when partitions are initially assigned or after a rebalance. Applications
-    * might only want to perform seek operations on an initial assignment.
+    * might only want to perform seek operations on an initial assignment. While the
+    * 'initial' argument is true for each thread (when concurrency is greater than 1),
+    * implementations should keep track of exactly which partitions have been sought.
+    * There is a race in that a rebalance could occur during startup and so a topic/
+    * partition that has been sought on one thread may be re-assigned to another
+    * thread and you may not wish to re-seek it at that time.
     * @param bindingName the name of the binding.
     * @param consumer the consumer.
     * @param partitions the partitions.
-    * @param initial true if this is the initial assignment.
+    * @param initial true if this is the initial assignment on the current thread.
     */
    default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions, boolean initial) {
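For applications that provide their own rebalance listener, a hedged sketch of an implementation following the javadoc above: it keeps its own per-partition state rather than relying solely on 'initial'. The class name, the seek-to-beginning behavior, and the interface's package are assumptions for illustration, not part of this commit.

import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

import org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener; // package assumed

class SeekToBeginningOnceListener implements KafkaBindingRebalanceListener {

    private final Set<TopicPartition> sought = ConcurrentHashMap.newKeySet();

    @Override
    public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions, boolean initial) {
        for (TopicPartition tp : partitions) {
            // add() is atomic: a partition already sought on another thread (e.g. because of a
            // rebalance during startup) is not re-sought, regardless of 'initial'.
            if (this.sought.add(tp)) {
                consumer.seekToBeginning(Collections.singleton(tp));
            }
        }
    }

}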
@@ -28,9 +28,9 @@ import java.util.HashMap;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.regex.Pattern;
@@ -581,6 +581,7 @@ public class KafkaMessageChannelBinder extends

    public void setupRebalanceListener(
            final ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties,
            final ContainerProperties containerProperties) {

+        Assert.isTrue(!extendedConsumerProperties.getExtension().isResetOffsets(),
+                "'resetOffsets' cannot be set when a KafkaBindingRebalanceListener is provided");
        final String bindingName = bindingNameHolder.get();
@@ -590,7 +591,7 @@ public class KafkaMessageChannelBinder extends

        containerProperties
                .setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

-                    private boolean initial = true;
+                    private final ThreadLocal<Boolean> initialAssignment = new ThreadLocal<>();

                    @Override
                    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
@@ -612,11 +613,15 @@ public class KafkaMessageChannelBinder extends

                    public void onPartitionsAssigned(Consumer<?, ?> consumer,
                            Collection<TopicPartition> partitions) {
                        try {
+                            Boolean initial = this.initialAssignment.get();
+                            if (initial == null) {
+                                initial = Boolean.TRUE;
+                            }
                            userRebalanceListener.onPartitionsAssigned(bindingName,
-                                    consumer, partitions, this.initial);
+                                    consumer, partitions, initial);
                        }
                        finally {
-                            this.initial = false;
+                            this.initialAssignment.set(Boolean.FALSE);
                        }
                    }
@@ -663,20 +668,22 @@ public class KafkaMessageChannelBinder extends

        boolean resetOffsets = extendedConsumerProperties.getExtension().isResetOffsets();
        final Object resetTo = consumerFactory.getConfigurationProperties()
                .get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-        final AtomicBoolean initialAssignment = new AtomicBoolean(true);
        if (!"earliest".equals(resetTo) && !"latest".equals(resetTo)) {
            logger.warn("no (or unknown) " + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
                    + " property cannot reset");
            resetOffsets = false;
        }
        if (groupManagement && resetOffsets) {
-            containerProperties
-                    .setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
+            Set<TopicPartition> sought = ConcurrentHashMap.newKeySet();
+            containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

                @Override
                public void onPartitionsRevokedBeforeCommit(
                        Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
-                    // no op
+                    if (logger.isInfoEnabled()) {
+                        logger.info("Partitions revoked: " + tps);
+                    }
                }

                @Override
@@ -686,14 +693,23 @@ public class KafkaMessageChannelBinder extends
                }

                @Override
-                public void onPartitionsAssigned(Consumer<?, ?> consumer,
-                        Collection<TopicPartition> tps) {
-                    if (initialAssignment.getAndSet(false)) {
+                public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> tps) {
+                    if (logger.isInfoEnabled()) {
+                        logger.info("Partitions assigned: " + tps);
+                    }
+                    List<TopicPartition> toSeek = tps.stream()
+                            .filter(tp -> {
+                                boolean shouldSeek = !sought.contains(tp);
+                                sought.add(tp);
+                                return shouldSeek;
+                            })
+                            .collect(Collectors.toList());
+                    if (toSeek.size() > 0) {
                        if ("earliest".equals(resetTo)) {
-                            consumer.seekToBeginning(tps);
+                            consumer.seekToBeginning(toSeek);
                        }
                        else if ("latest".equals(resetTo)) {
-                            consumer.seekToEnd(tps);
+                            consumer.seekToEnd(toSeek);
                        }
                    }
                }
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.admin.AdminClient;
@@ -3007,6 +3008,81 @@ public class KafkaBinderTests extends
        }
    }

+    @Test
+    @SuppressWarnings("unchecked")
+    public void testResetOffsets() throws Exception {
+        Binding<?> producerBinding = null;
+        Binding<?> consumerBinding = null;
+        try {
+            String testPayload = "test";
+
+            ExtendedProducerProperties<KafkaProducerProperties> producerProperties = createProducerProperties();
+
+            DirectChannel moduleOutputChannel = createBindableChannel("output",
+                    createProducerBindingProperties(producerProperties));
+
+            ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
+            consumerProperties.setConcurrency(2);
+            consumerProperties.setInstanceCount(5); // 10 partitions across 2 threads
+            consumerProperties.getExtension().setResetOffsets(true);
+
+            DirectChannel moduleInputChannel = createBindableChannel("input",
+                    createConsumerBindingProperties(consumerProperties));
+
+            String testTopicName = "existing" + System.currentTimeMillis();
+            KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
+            configurationProperties.setAutoAddPartitions(true);
+            Binder binder = getBinder(configurationProperties);
+            producerBinding = binder.bindProducer(testTopicName, moduleOutputChannel,
+                    producerProperties);
+
+            consumerBinding = binder.bindConsumer(testTopicName, "testReset",
+                    moduleInputChannel, consumerProperties);
+            // Let the consumer actually bind to the producer before sending a msg
+            binderBindUnbindLatency();
+            IntStream.range(0, 10).forEach(i -> moduleOutputChannel.send(MessageBuilder.withPayload(testPayload)
+                    .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
+                    .setHeader(KafkaHeaders.PARTITION_ID, i)
+                    .build()));
+            CountDownLatch latch1 = new CountDownLatch(10);
+            CountDownLatch latch2 = new CountDownLatch(20);
+            AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
+            AtomicInteger received = new AtomicInteger();
+            moduleInputChannel.subscribe(message1 -> {
+                try {
+                    inboundMessageRef.set((Message<byte[]>) message1);
+                }
+                finally {
+                    received.incrementAndGet();
+                    latch1.countDown();
+                    latch2.countDown();
+                }
+            });
+            assertThat(latch1.await(10, TimeUnit.SECONDS)).as("Failed to receive messages").isTrue();
+            consumerBinding.unbind();
+            consumerBinding = binder.bindConsumer(testTopicName, "testReset",
+                    moduleInputChannel, consumerProperties);
+            assertThat(latch2.await(10, TimeUnit.SECONDS)).as("Failed to receive message").isTrue();
+            binder.bindConsumer(testTopicName + "-x", "testReset",
+                    moduleInputChannel, consumerProperties).unbind(); // cause another rebalance
+            assertThat(received.get()).as("Unexpected reset").isEqualTo(20);
+
+            assertThat(inboundMessageRef.get()).isNotNull();
+            assertThat(inboundMessageRef.get().getPayload()).isEqualTo("test".getBytes());
+            assertThat(inboundMessageRef.get().getHeaders()).containsEntry("contentType",
+                    MimeTypeUtils.TEXT_PLAIN);
+        }
+        finally {
+            if (producerBinding != null) {
+                producerBinding.unbind();
+            }
+            if (consumerBinding != null) {
+                consumerBinding.unbind();
+            }
+        }
+    }
+
    private final class FailingInvocationCountingMessageHandler
            implements MessageHandler {