Ensure that topic is created where broker automatically creates it
This commit is contained in:
@@ -1012,6 +1012,9 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
QueueChannel input = new QueueChannel();
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName, output,
|
||||
createProducerProperties());
|
||||
|
||||
waitUntilTopicCreated(getZkUtils(configurationProperties), testTopicName, 5000);
|
||||
|
||||
String testPayload = "foo1-" + UUID.randomUUID().toString();
|
||||
output.send(new GenericMessage<>(testPayload.getBytes()));
|
||||
|
||||
@@ -1024,6 +1027,21 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
|
||||
private void waitUntilTopicCreated(ZkUtils zkUtils, String testTopicName, int timeoutInMillis) throws Exception {
|
||||
int partitionSize = 0;
|
||||
long currentTimeMillis = System.currentTimeMillis();
|
||||
do {
|
||||
try {
|
||||
partitionSize = invokePartitionSize(testTopicName, zkUtils);
|
||||
Thread.sleep(100);
|
||||
}
|
||||
catch (Exception e) {
|
||||
//falls through
|
||||
}
|
||||
}
|
||||
while (partitionSize == 0 && (currentTimeMillis + timeoutInMillis) > System.currentTimeMillis());
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testAutoConfigureTopicsDisabledSucceedsIfTopicExisting() throws Exception {
|
||||
|
||||
Reference in New Issue
Block a user