Fix regression on TopicExistsException

Fix #116
This commit is contained in:
Marius Bogoevici
2017-03-31 12:42:19 -04:00
committed by Thomas Risberg
parent 88d4b8eef5
commit 47aaf29e3e

View File

@@ -201,8 +201,22 @@ public class KafkaTopicProvisioner implements ProvisioningProvider<ExtendedConsu
@Override
public Object doWithRetry(RetryContext context) throws RuntimeException {
adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount,
configurationProperties.getReplicationFactor(), new Properties());
try {
adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount,
configurationProperties.getReplicationFactor(), new Properties());
}
catch (Exception e) {
String exceptionClass = e.getClass().getName();
if (exceptionClass.equals("kafka.common.TopicExistsException")
|| exceptionClass.equals("org.apache.kafka.common.errors.TopicExistsException")) {
if (logger.isWarnEnabled()) {
logger.warn("Attempt to create topic: " + topicName + ". Topic already exists.");
}
}
else {
throw e;
}
}
return null;
}
});