Making AdminUtils optional
polishing, docs Addressing PR review comments Polishing: Refactoring topic creation methods Addressing PR comments: Use ClassUtils, Kafka AppInfoParser and polishing
This commit is contained in:
@@ -271,3 +271,32 @@ Then add these dependencies at the top of the `<dependencies>` section in the po
|
||||
The versions above are provided only for the sake of the example.
|
||||
For best results, we recommend using the most recent 0.10-compatible versions of the projects.
|
||||
====
|
||||
|
||||
==== Excluding Kafka broker jar from the classpath of the binder based application
|
||||
If the inclusion of the Kafka broker is not desired at runtime, Kafka binder allows this particular dependency to be excluded from the binder so that it will not be part of the classpath.
|
||||
However, topic creation will be disabled from the binder if this dependency is excluded.
|
||||
|
||||
If you use Kafka 10 dependencies as advised above, all you have to do is not to include the kafka broker dependency.
|
||||
If you use Kafka 0.9, then ensure that you exclude the kafka broker jar from the `spring-cloud-starter-stream-kafka` dependency as following.
|
||||
|
||||
[source,xml]
|
||||
----
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka_2.11</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
----
|
||||
|
||||
If you exclude the kafka broker dependency and the topic is not present on the server, then Kafka broker will create the topic if auto topic creation is enabled on the server.
|
||||
Please keep in mind that if you are relying on this, then the Kafka server will use the default number of partitions and replication factors.
|
||||
On the other hand, if auto topic creation is disabled on the server, then care must be taken before running the application to create the topic with the desired number of partitions.
|
||||
|
||||
If you want to have full control over how partitions are allocated, then leave the default settings as they are, i.e. do not exclude the kafka broker jar and ensure that `spring.cloud.stream.kafka.binder.autoCreateTopics` is set to `true` which is the default.
|
||||
|
||||
|
||||
|
||||
@@ -198,8 +198,8 @@ public class KafkaMessageChannelBinder extends
|
||||
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {
|
||||
|
||||
KafkaTopicUtils.validateTopicName(destination);
|
||||
|
||||
Collection<PartitionInfo> partitions = ensureTopicCreated(destination, producerProperties.getPartitionCount());
|
||||
createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(destination, producerProperties.getPartitionCount());
|
||||
Collection<PartitionInfo> partitions = getPartitionsForTopic(destination, producerProperties.getPartitionCount());
|
||||
|
||||
if (producerProperties.getPartitionCount() < partitions.size()) {
|
||||
if (this.logger.isInfoEnabled()) {
|
||||
@@ -221,13 +221,13 @@ public class KafkaMessageChannelBinder extends
|
||||
|
||||
@Override
|
||||
protected String createProducerDestinationIfNecessary(String name,
|
||||
ExtendedProducerProperties<KafkaProducerProperties> properties) {
|
||||
ExtendedProducerProperties<KafkaProducerProperties> properties) {
|
||||
if (this.logger.isInfoEnabled()) {
|
||||
this.logger.info("Using kafka topic for outbound: " + name);
|
||||
}
|
||||
KafkaTopicUtils.validateTopicName(name);
|
||||
|
||||
Collection<PartitionInfo> partitions = ensureTopicCreated(name, properties.getPartitionCount());
|
||||
createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, properties.getPartitionCount());
|
||||
Collection<PartitionInfo> partitions = getPartitionsForTopic(name, properties.getPartitionCount());
|
||||
if (properties.getPartitionCount() < partitions.size()) {
|
||||
if (this.logger.isInfoEnabled()) {
|
||||
this.logger.info("The `partitionCount` of the producer for topic " + name + " is "
|
||||
@@ -271,9 +271,9 @@ public class KafkaMessageChannelBinder extends
|
||||
if (properties.getInstanceCount() == 0) {
|
||||
throw new IllegalArgumentException("Instance count cannot be zero");
|
||||
}
|
||||
|
||||
Collection<PartitionInfo> allPartitions = ensureTopicCreated(name,
|
||||
properties.getInstanceCount() * properties.getConcurrency());
|
||||
int partitionCount = properties.getInstanceCount() * properties.getConcurrency();
|
||||
createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(name, partitionCount);
|
||||
Collection<PartitionInfo> allPartitions = getPartitionsForTopic(name, partitionCount);
|
||||
|
||||
Collection<PartitionInfo> listenedPartitions;
|
||||
|
||||
@@ -297,7 +297,7 @@ public class KafkaMessageChannelBinder extends
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
protected MessageProducer createConsumerEndpoint(String name, String group, Collection<PartitionInfo> destination,
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
|
||||
boolean anonymous = !StringUtils.hasText(group);
|
||||
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
|
||||
"DLQ support is not available for anonymous subscriptions");
|
||||
@@ -427,11 +427,24 @@ public class KafkaMessageChannelBinder extends
|
||||
return topicPartitionInitialOffsets;
|
||||
}
|
||||
|
||||
private void createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(final String topicName, final int partitionCount) {
|
||||
if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation != null) {
|
||||
createTopicAndPartitions(topicName, partitionCount);
|
||||
}
|
||||
else if (this.configurationProperties.isAutoCreateTopics() && adminUtilsOperation == null) {
|
||||
this.logger.warn("Auto creation of topics is enabled, but Kafka AdminUtils class is not present on the classpath. " +
|
||||
"No topic will be created by the binder");
|
||||
}
|
||||
else if (!this.configurationProperties.isAutoCreateTopics()) {
|
||||
this.logger.warn("Auto creation of topics is disabled.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a Kafka topic if needed, or try to increase its partition count to the
|
||||
* desired number.
|
||||
*/
|
||||
private Collection<PartitionInfo> ensureTopicCreated(final String topicName, final int partitionCount) {
|
||||
private void createTopicAndPartitions(final String topicName, final int partitionCount) {
|
||||
|
||||
final ZkUtils zkUtils = ZkUtils.apply(this.configurationProperties.getZkConnectionString(),
|
||||
this.configurationProperties.getZkSessionTimeout(),
|
||||
@@ -459,62 +472,59 @@ public class KafkaMessageChannelBinder extends
|
||||
}
|
||||
}
|
||||
else if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode()) {
|
||||
if (this.configurationProperties.isAutoCreateTopics()) {
|
||||
// always consider minPartitionCount for topic creation
|
||||
final int effectivePartitionCount = Math.max(this.configurationProperties.getMinPartitionCount(),
|
||||
partitionCount);
|
||||
// always consider minPartitionCount for topic creation
|
||||
final int effectivePartitionCount = Math.max(this.configurationProperties.getMinPartitionCount(),
|
||||
partitionCount);
|
||||
|
||||
this.metadataRetryOperations.execute(new RetryCallback<Object, RuntimeException>() {
|
||||
this.metadataRetryOperations.execute(new RetryCallback<Object, RuntimeException>() {
|
||||
|
||||
@Override
|
||||
public Object doWithRetry(RetryContext context) throws RuntimeException {
|
||||
@Override
|
||||
public Object doWithRetry(RetryContext context) throws RuntimeException {
|
||||
|
||||
adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount,
|
||||
configurationProperties.getReplicationFactor(), new Properties());
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
else {
|
||||
throw new BinderException("Topic " + topicName + " does not exist");
|
||||
}
|
||||
adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount,
|
||||
configurationProperties.getReplicationFactor(), new Properties());
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
else {
|
||||
throw new BinderException("Error fetching Kafka topic metadata: ",
|
||||
ErrorMapping.exceptionFor(errorCode));
|
||||
}
|
||||
try {
|
||||
return this.metadataRetryOperations
|
||||
.execute(new RetryCallback<Collection<PartitionInfo>, Exception>() {
|
||||
|
||||
@Override
|
||||
public Collection<PartitionInfo> doWithRetry(RetryContext context) throws Exception {
|
||||
Collection<PartitionInfo> partitions =
|
||||
getProducerFactory(
|
||||
new ExtendedProducerProperties<>(new KafkaProducerProperties()))
|
||||
.createProducer().partitionsFor(topicName);
|
||||
|
||||
// do a sanity check on the partition set
|
||||
if (partitions.size() < partitionCount) {
|
||||
throw new IllegalStateException("The number of expected partitions was: "
|
||||
+ partitionCount + ", but " + partitions.size()
|
||||
+ (partitions.size() > 1 ? " have " : " has ") + "been found instead");
|
||||
}
|
||||
return partitions;
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (Exception e) {
|
||||
this.logger.error("Cannot initialize Binder", e);
|
||||
throw new BinderException("Cannot initialize binder:", e);
|
||||
}
|
||||
|
||||
}
|
||||
finally {
|
||||
zkUtils.close();
|
||||
}
|
||||
}
|
||||
|
||||
private Collection<PartitionInfo> getPartitionsForTopic(final String topicName, final int partitionCount) {
|
||||
try {
|
||||
return this.metadataRetryOperations
|
||||
.execute(new RetryCallback<Collection<PartitionInfo>, Exception>() {
|
||||
|
||||
@Override
|
||||
public Collection<PartitionInfo> doWithRetry(RetryContext context) throws Exception {
|
||||
Collection<PartitionInfo> partitions =
|
||||
getProducerFactory(
|
||||
new ExtendedProducerProperties<>(new KafkaProducerProperties()))
|
||||
.createProducer().partitionsFor(topicName);
|
||||
|
||||
// do a sanity check on the partition set
|
||||
if (partitions.size() < partitionCount) {
|
||||
throw new IllegalStateException("The number of expected partitions was: "
|
||||
+ partitionCount + ", but " + partitions.size()
|
||||
+ (partitions.size() > 1 ? " have " : " has ") + "been found instead");
|
||||
}
|
||||
return partitions;
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (Exception e) {
|
||||
this.logger.error("Cannot initialize Binder", e);
|
||||
throw new BinderException("Cannot initialize binder:", e);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void initDlqProducer() {
|
||||
try {
|
||||
if (this.dlqProducer == null) {
|
||||
|
||||
@@ -24,6 +24,7 @@ import java.util.Properties;
|
||||
import kafka.api.PartitionMetadata;
|
||||
import kafka.utils.ZkUtils;
|
||||
|
||||
import org.springframework.util.ClassUtils;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
@@ -31,13 +32,11 @@ import org.springframework.util.ReflectionUtils;
|
||||
*/
|
||||
public class Kafka10AdminUtilsOperation implements AdminUtilsOperation {
|
||||
|
||||
private static ClassLoader CLASS_LOADER = Kafka10AdminUtilsOperation.class.getClassLoader();
|
||||
|
||||
private static Class<?> ADMIN_UTIL_CLASS;
|
||||
|
||||
static {
|
||||
try {
|
||||
ADMIN_UTIL_CLASS = CLASS_LOADER.loadClass("kafka.admin.AdminUtils");
|
||||
ADMIN_UTIL_CLASS = ClassUtils.forName("kafka.admin.AdminUtils", null);
|
||||
}
|
||||
catch (ClassNotFoundException e) {
|
||||
throw new IllegalStateException("AdminUtils class not found", e);
|
||||
@@ -77,7 +76,7 @@ public class Kafka10AdminUtilsOperation implements AdminUtilsOperation {
|
||||
Method fetchTopicMetadataFromZk = ReflectionUtils.findMethod(ADMIN_UTIL_CLASS, "fetchTopicMetadataFromZk", String.class, ZkUtils.class);
|
||||
|
||||
Object result = fetchTopicMetadataFromZk.invoke(null, topic, zkUtils);
|
||||
Class<?> topicMetadataClass = CLASS_LOADER.loadClass("org.apache.kafka.common.requests.MetadataResponse$TopicMetadata");
|
||||
Class<?> topicMetadataClass = ClassUtils.forName("org.apache.kafka.common.requests.MetadataResponse$TopicMetadata", null);
|
||||
|
||||
Method errorCodeMethod = ReflectionUtils.findMethod(topicMetadataClass, "error");
|
||||
Object obj = errorCodeMethod.invoke(result);
|
||||
@@ -103,7 +102,7 @@ public class Kafka10AdminUtilsOperation implements AdminUtilsOperation {
|
||||
try {
|
||||
Method fetchTopicMetadataFromZk = ReflectionUtils.findMethod(ADMIN_UTIL_CLASS, "fetchTopicMetadataFromZk", String.class, ZkUtils.class);
|
||||
Object result = fetchTopicMetadataFromZk.invoke(null, topic, zkUtils);
|
||||
Class<?> topicMetadataClass = CLASS_LOADER.loadClass("org.apache.kafka.common.requests.MetadataResponse$TopicMetadata");
|
||||
Class<?> topicMetadataClass = ClassUtils.forName("org.apache.kafka.common.requests.MetadataResponse$TopicMetadata", null);
|
||||
|
||||
Method partitionsMetadata = ReflectionUtils.findMethod(topicMetadataClass, "partitionMetadata");
|
||||
List<PartitionMetadata> foo = (List<PartitionMetadata>) partitionsMetadata.invoke(result);
|
||||
|
||||
@@ -16,13 +16,14 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.config;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
|
||||
import kafka.admin.AdminUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.kafka.common.utils.AppInfoParser;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.PropertyPlaceholderAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
@@ -58,7 +59,7 @@ import org.springframework.kafka.support.ProducerListener;
|
||||
@EnableConfigurationProperties({KafkaBinderConfigurationProperties.class, KafkaExtendedBindingProperties.class})
|
||||
public class KafkaBinderConfiguration {
|
||||
|
||||
protected final Log logger = LogFactory.getLog(getClass());
|
||||
protected static final Log logger = LogFactory.getLog(KafkaBinderConfiguration.class);
|
||||
|
||||
@Autowired
|
||||
private Codec codec;
|
||||
@@ -75,6 +76,9 @@ public class KafkaBinderConfiguration {
|
||||
@Autowired
|
||||
private ApplicationContext context;
|
||||
|
||||
@Autowired (required = false)
|
||||
private AdminUtilsOperation adminUtilsOperation;
|
||||
|
||||
@Bean
|
||||
KafkaMessageChannelBinder kafkaMessageChannelBinder() {
|
||||
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
|
||||
@@ -82,7 +86,6 @@ public class KafkaBinderConfiguration {
|
||||
kafkaMessageChannelBinder.setCodec(this.codec);
|
||||
//kafkaMessageChannelBinder.setProducerListener(producerListener);
|
||||
kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
|
||||
AdminUtilsOperation adminUtilsOperation = context.getBean(AdminUtilsOperation.class);
|
||||
kafkaMessageChannelBinder.setAdminUtilsOperation(adminUtilsOperation);
|
||||
return kafkaMessageChannelBinder;
|
||||
}
|
||||
@@ -112,54 +115,21 @@ public class KafkaBinderConfiguration {
|
||||
return new Kafka10AdminUtilsOperation();
|
||||
}
|
||||
|
||||
private static Method getMethod(ClassLoader classLoader, String methodName) {
|
||||
try {
|
||||
Class<?> adminUtilClass = classLoader.loadClass("kafka.admin.AdminUtils");
|
||||
Method[] declaredMethods = adminUtilClass.getDeclaredMethods();
|
||||
for (Method m : declaredMethods) {
|
||||
if (m.getName().equals(methodName)) {
|
||||
return m;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (ClassNotFoundException e) {
|
||||
throw new IllegalStateException("AdminUtils not found", e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@ConditionalOnClass(AdminUtils.class)
|
||||
static class Kafka10Condition implements Condition {
|
||||
|
||||
@Override
|
||||
public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) {
|
||||
ClassLoader classLoader = Kafka10Condition.class.getClassLoader();
|
||||
Method addPartitions = getMethod(classLoader, "addPartitions");
|
||||
if (addPartitions != null) {
|
||||
Class<?>[] parameterTypes = addPartitions.getParameterTypes();
|
||||
Class clazz = parameterTypes[parameterTypes.length - 1];
|
||||
if (clazz.getName().equals("kafka.admin.RackAwareMode")) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
return AppInfoParser.getVersion().startsWith("0.10");
|
||||
}
|
||||
}
|
||||
|
||||
@ConditionalOnClass(AdminUtils.class)
|
||||
static class Kafka09Condition implements Condition {
|
||||
|
||||
@Override
|
||||
public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) {
|
||||
|
||||
ClassLoader classLoader = Kafka09Condition.class.getClassLoader();
|
||||
Method addPartitions = getMethod(classLoader, "addPartitions");
|
||||
if (addPartitions != null) {
|
||||
Class<?>[] parameterTypes = addPartitions.getParameterTypes();
|
||||
Class clazz = parameterTypes[parameterTypes.length - 1];
|
||||
if (!clazz.getName().equals("kafka.admin.RackAwareMode")) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
return AppInfoParser.getVersion().startsWith("0.9");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -60,7 +60,6 @@ import org.springframework.retry.policy.SimpleRetryPolicy;
|
||||
import org.springframework.retry.support.RetryTemplate;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
@@ -907,7 +906,7 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testAutoCreateTopicsDisabledFailsIfTopicMissing() throws Exception {
|
||||
public void testAutoCreateTopicsDisabledOnBinderStillWorksAsLongAsBrokerCreatesTopic() throws Exception {
|
||||
KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
|
||||
configurationProperties.setAutoCreateTopics(false);
|
||||
Binder binder = getBinder(configurationProperties);
|
||||
@@ -919,15 +918,21 @@ public abstract class KafkaBinderTests extends PartitionCapableBinderTests<Abstr
|
||||
setMetadataRetryOperations(binder, metatadataRetrievalRetryOperations);
|
||||
DirectChannel output = new DirectChannel();
|
||||
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties = createConsumerProperties();
|
||||
String testTopicName = "nonexisting" + System.currentTimeMillis();
|
||||
try {
|
||||
binder.bindConsumer(testTopicName, "test", output, consumerProperties);
|
||||
fail();
|
||||
}
|
||||
catch (Exception e) {
|
||||
assertThat(e).isInstanceOf(BinderException.class);
|
||||
assertThat(e).hasMessageContaining("Topic " + testTopicName + " does not exist");
|
||||
}
|
||||
String testTopicName = "createdByBroker-" + System.currentTimeMillis();
|
||||
|
||||
QueueChannel input = new QueueChannel();
|
||||
Binding<MessageChannel> producerBinding = binder.bindProducer(testTopicName, output,
|
||||
createProducerProperties());
|
||||
String testPayload = "foo1-" + UUID.randomUUID().toString();
|
||||
output.send(new GenericMessage<>(testPayload.getBytes()));
|
||||
|
||||
Binding<MessageChannel> consumerBinding = binder.bindConsumer(testTopicName, "test", input, consumerProperties);
|
||||
Message<byte[]> receivedMessage2 = (Message<byte[]>) receive(input);
|
||||
assertThat(receivedMessage2).isNotNull();
|
||||
assertThat(new String(receivedMessage2.getPayload())).isEqualTo(testPayload);
|
||||
|
||||
producerBinding.unbind();
|
||||
consumerBinding.unbind();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
Reference in New Issue
Block a user