diff --git a/.jdk8 b/.jdk8 new file mode 100644 index 00000000..e69de29b diff --git a/pom.xml b/pom.xml index 6a2e1b9d..7518335e 100644 --- a/pom.xml +++ b/pom.xml @@ -11,9 +11,9 @@ - 1.7 - 0.10.2.0 + 1.8 2.0.0.BUILD-SNAPSHOT + 0.11.0.0 3.0.0.BUILD-SNAPSHOT 2.0.0.BUILD-SNAPSHOT @@ -21,6 +21,7 @@ spring-cloud-stream-binder-kafka spring-cloud-starter-stream-kafka spring-cloud-stream-binder-kafka-docs + spring-cloud-stream-binder-kafka-test spring-cloud-stream-binder-kafka-core spring-cloud-stream-binder-kstream @@ -42,11 +43,6 @@ spring-cloud-stream ${spring-cloud-stream.version} - - org.springframework.cloud - spring-cloud-stream-codec - ${spring-cloud-stream.version} - org.apache.kafka kafka_2.11 @@ -71,6 +67,11 @@ kafka-clients ${kafka.version} + + org.springframework.kafka + spring-kafka + ${spring-kafka.version} + org.springframework.integration spring-integration-kafka @@ -105,6 +106,13 @@ + + org.springframework.cloud + spring-cloud-stream-binder-kafka + ${project.version} + test-jar + test + @@ -152,7 +160,7 @@ org.springframework.cloud spring-cloud-build-tools - 2.0.0.BUILD-SNAPSHOT + ${project.parent.version} diff --git a/spring-cloud-starter-stream-kafka/.jdk8 b/spring-cloud-starter-stream-kafka/.jdk8 new file mode 100644 index 00000000..e69de29b diff --git a/spring-cloud-stream-binder-kafka-core/.jdk8 b/spring-cloud-stream-binder-kafka-core/.jdk8 new file mode 100644 index 00000000..e69de29b diff --git a/spring-cloud-stream-binder-kafka-core/pom.xml b/spring-cloud-stream-binder-kafka-core/pom.xml index 6890ca94..7bc68d35 100644 --- a/spring-cloud-stream-binder-kafka-core/pom.xml +++ b/spring-cloud-stream-binder-kafka-core/pom.xml @@ -1,6 +1,18 @@ 4.0.0 + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + org.springframework.cloud spring-cloud-stream-binder-kafka-parent diff --git a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/admin/Kafka09AdminUtilsOperation.java b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/admin/Kafka09AdminUtilsOperation.java deleted file mode 100644 index 0723f1ed..00000000 --- a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/admin/Kafka09AdminUtilsOperation.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Copyright 2002-2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.binder.kafka.admin; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.Properties; - -import kafka.utils.ZkUtils; - -import org.springframework.util.ClassUtils; -import org.springframework.util.ReflectionUtils; - -/** - * @author Soby Chacko - */ -public class Kafka09AdminUtilsOperation implements AdminUtilsOperation { - - private static Class ADMIN_UTIL_CLASS; - - static { - try { - ADMIN_UTIL_CLASS = ClassUtils.forName("kafka.admin.AdminUtils", null); - } - catch (ClassNotFoundException e) { - throw new IllegalStateException("AdminUtils class not found", e); - } - } - - public void invokeAddPartitions(ZkUtils zkUtils, String topic, int numPartitions, - String replicaAssignmentStr, boolean checkBrokerAvailable) { - try { - Method[] declaredMethods = ADMIN_UTIL_CLASS.getDeclaredMethods(); - Method addPartitions = null; - for (Method m : declaredMethods) { - if (m.getName().equals("addPartitions")) { - addPartitions = m; - } - } - if (addPartitions != null) { - addPartitions.invoke(null, zkUtils, topic, numPartitions, - replicaAssignmentStr, checkBrokerAvailable); - } - else { - throw new InvocationTargetException( - new RuntimeException("method not found")); - } - } - catch (InvocationTargetException e) { - ReflectionUtils.handleInvocationTargetException(e); - } - catch (IllegalAccessException e) { - ReflectionUtils.handleReflectionException(e); - } - - } - - public short errorCodeFromTopicMetadata(String topic, ZkUtils zkUtils) { - try { - Method fetchTopicMetadataFromZk = ReflectionUtils.findMethod(ADMIN_UTIL_CLASS, "fetchTopicMetadataFromZk", String.class, ZkUtils.class); - Object result = fetchTopicMetadataFromZk.invoke(null, topic, zkUtils); - Class topicMetadataClass = ClassUtils.forName("kafka.api.TopicMetadata", null); - Method errorCodeMethod = ReflectionUtils.findMethod(topicMetadataClass, "errorCode"); - return (short) errorCodeMethod.invoke(result); - } - catch (ClassNotFoundException e) { - throw new IllegalStateException("AdminUtils class not found", e); - } - catch (InvocationTargetException e) { - ReflectionUtils.handleInvocationTargetException(e); - } - catch (IllegalAccessException e) { - ReflectionUtils.handleReflectionException(e); - } - return 0; - } - - @SuppressWarnings("unchecked") - public int partitionSize(String topic, ZkUtils zkUtils) { - try { - Method fetchTopicMetadataFromZk = ReflectionUtils.findMethod(ADMIN_UTIL_CLASS, "fetchTopicMetadataFromZk", String.class, ZkUtils.class); - Object result = fetchTopicMetadataFromZk.invoke(null, topic, zkUtils); - Class topicMetadataClass = ClassUtils.forName("kafka.api.TopicMetadata", null); - - Method partitionsMetadata = ReflectionUtils.findMethod(topicMetadataClass, "partitionsMetadata"); - scala.collection.Seq partitionSize = - (scala.collection.Seq)partitionsMetadata.invoke(result); - - return partitionSize.size(); - } - catch (ClassNotFoundException e) { - throw new IllegalStateException("AdminUtils class not found", e); - } - catch (InvocationTargetException e) { - ReflectionUtils.handleInvocationTargetException(e); - } - catch (IllegalAccessException e) { - ReflectionUtils.handleReflectionException(e); - } - return 0; - - } - - public void invokeCreateTopic(ZkUtils zkUtils, String topic, int partitions, - int replicationFactor, Properties topicConfig) { - try { - Method[] declaredMethods = ADMIN_UTIL_CLASS.getDeclaredMethods(); - Method createTopic = null; - for (Method m : declaredMethods) { - if (m.getName().equals("createTopic")) { - createTopic = m; - break; - } - } - if (createTopic != null) { - createTopic.invoke(null, zkUtils, topic, partitions, - replicationFactor, topicConfig); - } - else { - throw new InvocationTargetException( - new RuntimeException("method not found")); - } - } - catch (InvocationTargetException e) { - ReflectionUtils.handleInvocationTargetException(e); - } - catch (IllegalAccessException e) { - ReflectionUtils.handleReflectionException(e); - } - } -} \ No newline at end of file diff --git a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/admin/Kafka10AdminUtilsOperation.java b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/admin/KafkaAdminUtilsOperation.java similarity index 96% rename from spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/admin/Kafka10AdminUtilsOperation.java rename to spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/admin/KafkaAdminUtilsOperation.java index 026ffe6a..9cad3aa7 100644 --- a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/admin/Kafka10AdminUtilsOperation.java +++ b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/admin/KafkaAdminUtilsOperation.java @@ -25,7 +25,7 @@ import org.apache.kafka.common.requests.MetadataResponse; /** * @author Soby Chacko */ -public class Kafka10AdminUtilsOperation implements AdminUtilsOperation { +public class KafkaAdminUtilsOperation implements AdminUtilsOperation { public void invokeAddPartitions(ZkUtils zkUtils, String topic, int numPartitions, String replicaAssignmentStr, boolean checkBrokerAvailable) { diff --git a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java index b5442d4a..8cdd3815 100644 --- a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java +++ b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaBinderConfigurationProperties.java @@ -40,6 +40,8 @@ import org.springframework.util.StringUtils; @ConfigurationProperties(prefix = "spring.cloud.stream.kafka.binder") public class KafkaBinderConfigurationProperties { + private final Transaction transaction = new Transaction(); + @Autowired(required = false) private KafkaProperties kafkaProperties; @@ -96,6 +98,10 @@ public class KafkaBinderConfigurationProperties { private JaasLoginModuleConfiguration jaas; + public Transaction getTransaction() { + return this.transaction; + } + public String getZkConnectionString() { return toConnectionString(this.zkNodes, this.defaultZkPort); } @@ -352,4 +358,24 @@ public class KafkaBinderConfigurationProperties { this.jaas = jaas; } + public static class Transaction { + + private final KafkaProducerProperties producer = new KafkaProducerProperties(); + + private String transactionIdPrefix; + + public String getTransactionIdPrefix() { + return this.transactionIdPrefix; + } + + public void setTransactionIdPrefix(String transactionIdPrefix) { + this.transactionIdPrefix = transactionIdPrefix; + } + + public KafkaProducerProperties getProducer() { + return this.producer; + } + + } + } diff --git a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaConsumerProperties.java b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaConsumerProperties.java index 1d0cbbcb..13368ad2 100644 --- a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaConsumerProperties.java +++ b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaConsumerProperties.java @@ -43,6 +43,8 @@ public class KafkaConsumerProperties { private int recoveryInterval = 5000; + private String[] trustedPackages; + private Map configuration = new HashMap<>(); public boolean isAutoCommitOffset() { @@ -123,4 +125,12 @@ public class KafkaConsumerProperties { public void setDlqName(String dlqName) { this.dlqName = dlqName; } + + public String[] getTrustedPackages() { + return trustedPackages; + } + + public void setTrustedPackages(String[] trustedPackages) { + this.trustedPackages = trustedPackages; + } } diff --git a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaProducerProperties.java b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaProducerProperties.java index 8cc96130..104ae583 100644 --- a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaProducerProperties.java +++ b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/properties/KafkaProducerProperties.java @@ -16,16 +16,17 @@ package org.springframework.cloud.stream.binder.kafka.properties; -import org.springframework.expression.Expression; - import java.util.HashMap; import java.util.Map; import javax.validation.constraints.NotNull; +import org.springframework.expression.Expression; + /** * @author Marius Bogoevici * @author Henryk Konsek + * @author Gary Russell */ public class KafkaProducerProperties { @@ -39,6 +40,8 @@ public class KafkaProducerProperties { private Expression messageKeyExpression; + private String[] headerPatterns; + private Map configuration = new HashMap<>(); public int getBufferSize() { @@ -82,6 +85,14 @@ public class KafkaProducerProperties { this.messageKeyExpression = messageKeyExpression; } + public String[] getHeaderPatterns() { + return this.headerPatterns; + } + + public void setHeaderPatterns(String[] headerPatterns) { + this.headerPatterns = headerPatterns; + } + public Map getConfiguration() { return this.configuration; } diff --git a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner.java b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner.java index a1e9b3dd..bab31e0c 100644 --- a/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner.java +++ b/spring-cloud-stream-binder-kafka-core/src/main/java/org/springframework/cloud/stream/binder/kafka/provisioning/KafkaTopicProvisioner.java @@ -40,8 +40,6 @@ import org.springframework.cloud.stream.provisioning.ConsumerDestination; import org.springframework.cloud.stream.provisioning.ProducerDestination; import org.springframework.cloud.stream.provisioning.ProvisioningException; import org.springframework.cloud.stream.provisioning.ProvisioningProvider; -import org.springframework.retry.RetryCallback; -import org.springframework.retry.RetryContext; import org.springframework.retry.RetryOperations; import org.springframework.retry.backoff.ExponentialBackOffPolicy; import org.springframework.retry.policy.SimpleRetryPolicy; @@ -203,29 +201,25 @@ public class KafkaTopicProvisioner implements ProvisioningProvider() { + this.metadataRetryOperations.execute(context -> { - @Override - public Object doWithRetry(RetryContext context) throws RuntimeException { - - try { - adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount, - configurationProperties.getReplicationFactor(), new Properties()); - } - catch (Exception e) { - String exceptionClass = e.getClass().getName(); - if (exceptionClass.equals("kafka.common.TopicExistsException") - || exceptionClass.equals("org.apache.kafka.common.errors.TopicExistsException")) { - if (logger.isWarnEnabled()) { - logger.warn("Attempt to create topic: " + topicName + ". Topic already exists."); - } - } - else { - throw e; - } - } - return null; + try { + adminUtilsOperation.invokeCreateTopic(zkUtils, topicName, effectivePartitionCount, + configurationProperties.getReplicationFactor(), new Properties()); } + catch (Exception e) { + String exceptionClass = e.getClass().getName(); + if (exceptionClass.equals("kafka.common.TopicExistsException") + || exceptionClass.equals("org.apache.kafka.common.errors.TopicExistsException")) { + if (logger.isWarnEnabled()) { + logger.warn("Attempt to create topic: " + topicName + ". Topic already exists."); + } + } + else { + throw e; + } + } + return null; }); } else { @@ -243,27 +237,23 @@ public class KafkaTopicProvisioner implements ProvisioningProvider> callable) { try { return this.metadataRetryOperations - .execute(new RetryCallback, Exception>() { - - @Override - public Collection doWithRetry(RetryContext context) throws Exception { - Collection partitions = callable.call(); - // do a sanity check on the partition set - int partitionSize = partitions.size(); - if (partitionSize < partitionCount) { - if (tolerateLowerPartitionsOnBroker) { - logger.warn("The number of expected partitions was: " + partitionCount + ", but " - + partitionSize + (partitionSize > 1 ? " have " : " has ") + "been found instead." - + "There will be " + (partitionCount - partitionSize) + " idle consumers"); - } - else { - throw new IllegalStateException("The number of expected partitions was: " - + partitionCount + ", but " + partitionSize - + (partitionSize > 1 ? " have " : " has ") + "been found instead"); - } + .execute(context -> { + Collection partitions = callable.call(); + // do a sanity check on the partition set + int partitionSize = partitions.size(); + if (partitionSize < partitionCount) { + if (tolerateLowerPartitionsOnBroker) { + logger.warn("The number of expected partitions was: " + partitionCount + ", but " + + partitionSize + (partitionSize > 1 ? " have " : " has ") + "been found instead." + + "There will be " + (partitionCount - partitionSize) + " idle consumers"); + } + else { + throw new IllegalStateException("The number of expected partitions was: " + + partitionCount + ", but " + partitionSize + + (partitionSize > 1 ? " have " : " has ") + "been found instead"); } - return partitions; } + return partitions; }); } catch (Exception e) { diff --git a/spring-cloud-stream-binder-kafka-docs/.jdk8 b/spring-cloud-stream-binder-kafka-docs/.jdk8 new file mode 100644 index 00000000..e69de29b diff --git a/spring-cloud-stream-binder-kafka-docs/src/main/asciidoc/index.adoc b/spring-cloud-stream-binder-kafka-docs/src/main/asciidoc/index.adoc index b5bfdec7..6d41943f 100644 --- a/spring-cloud-stream-binder-kafka-docs/src/main/asciidoc/index.adoc +++ b/spring-cloud-stream-binder-kafka-docs/src/main/asciidoc/index.adoc @@ -26,6 +26,8 @@ include::overview.adoc[] include::dlq.adoc[] +include::metrics.adoc[] + = Appendices [appendix] include::building.adoc[] diff --git a/spring-cloud-stream-binder-kafka-docs/src/main/asciidoc/overview.adoc b/spring-cloud-stream-binder-kafka-docs/src/main/asciidoc/overview.adoc index d2c2fb59..63ba12eb 100644 --- a/spring-cloud-stream-binder-kafka-docs/src/main/asciidoc/overview.adoc +++ b/spring-cloud-stream-binder-kafka-docs/src/main/asciidoc/overview.adoc @@ -528,34 +528,6 @@ spring.cloud.stream.kstream.bindings.output.producer.keySerde=org.apache.kafka.c spring.cloud.stream.kstream.bindings.output.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$LongSerde ---- -timewindow.length:: - Many streaming applications written using Kafka Streams involve windowning operations. - If you specify this property, there is a `org.apache.kafka.streams.kstream.TimeWindows` bean automatically provided that can be autowired in applications. - This property must be prefixed with `spring.cloud.stream.kstream.`. - A bean of type `org.apache.kafka.streams.kstream.TimeWindows` is created only if this property is provided. - - Following is an example of using this property. - Values are provided in milliseconds. - -[source] ----- -spring.cloud.stream.kstream.timeWindow.length=5000 ----- - -timewindow.advanceBy:: - This property goes hand in hand with `timewindow.length` and has no effect on its own. - If you provide this property, the generated `org.apache.kafka.streams.kstream.TimeWindows` bean will automatically conatin this information. - This property must be prefixed with `spring.cloud.stream.kstream.`. - - Following is an example of using this property. - Values are provided in milliseconds. - -[source] ----- -spring.cloud.stream.kstream.timeWindow.advanceBy=1000 ----- - - [[kafka-error-channels]] == Error Channels @@ -568,13 +540,3 @@ The payload of the `ErrorMessage` for a send failure is a `KafkaSendFailureExcep * `record` - the raw `ProducerRecord` that was created from the `failedMessage` There is no automatic handling of these exceptions (such as sending to a <>); you can consume these exceptions with your own Spring Integration flow. - - -[[kafka-metrics]] -== Kafka Metrics - -Kafka binder module exposes the following metrics: - -`spring.cloud.stream.binder.kafka.someGroup.someTopic.lag` - this metric indicates how many messages have not been yet consumed from given binder's topic by given consumer group. -For example if the value of the metric `spring.cloud.stream.binder.kafka.myGroup.myTopic.lag` is `1000`, then consumer group `myGroup` has `1000` messages to waiting to be consumed from topic `myTopic`. -This metric is particularly useful to provide auto-scaling feedback to PaaS platform of your choice. diff --git a/spring-cloud-stream-binder-kafka-test/.jdk8 b/spring-cloud-stream-binder-kafka-test/.jdk8 new file mode 100644 index 00000000..e69de29b diff --git a/spring-cloud-stream-binder-kafka-0.10.1-test/pom.xml b/spring-cloud-stream-binder-kafka-test/pom.xml similarity index 70% rename from spring-cloud-stream-binder-kafka-0.10.1-test/pom.xml rename to spring-cloud-stream-binder-kafka-test/pom.xml index 8c6f6b03..d1da4420 100644 --- a/spring-cloud-stream-binder-kafka-0.10.1-test/pom.xml +++ b/spring-cloud-stream-binder-kafka-test/pom.xml @@ -4,18 +4,13 @@ org.springframework.cloud spring-cloud-stream-binder-kafka-parent -<<<<<<< c396c5c756d1c01e1b6b0717ff64ae72ab78cb36 -<<<<<<< b20f4a0e08629664c73e2e6cc1e73ac791509697 - 1.3.1.BUILD-SNAPSHOT -======= 2.0.0.M1 ->>>>>>> Release 2.0.0.M1 ======= 2.0.0.BUILD-SNAPSHOT >>>>>>> Set version to 2.0.0.BUILD-SNAPSHOT - spring-cloud-stream-binder-kafka-0.10.1-test - Spring Cloud Stream Kafka Binder 0.10.1 Tests + spring-cloud-stream-binder-kafka-test + Spring Cloud Stream Kafka Binder Tests http://projects.spring.io/spring-cloud Pivotal Software, Inc. @@ -23,17 +18,20 @@ ${basedir}/../.. - 0.10.1.1 + 0.11.0.0 + 2.0.0.BUILD-SNAPSHOT org.springframework.cloud - spring-cloud-stream-binder-kafka-core + spring-cloud-stream-binder-kafka + test org.springframework.cloud spring-cloud-stream-binder-kafka + test-jar test @@ -45,12 +43,6 @@ org.apache.kafka kafka_2.11 test - - - org.slf4j - slf4j-log4j12 - - org.apache.kafka @@ -74,13 +66,6 @@ org.springframework.integration spring-integration-jmx - - org.springframework.cloud - spring-cloud-stream-binder-kafka - ${project.version} - test-jar - test - org.springframework.cloud spring-cloud-stream-binder-kafka-0.10.2-test @@ -102,39 +87,59 @@ io.confluent kafka-avro-serializer - 3.1.2 + 3.3.0 test io.confluent kafka-schema-registry - 3.1.2 + 3.3.0 test + + org.glassfish.jersey.inject + jersey-hk2 + 2.26-b06 + + + org.glassfish.jersey.bundles.repackaged + jersey-guava + 2.6 + + + spring-snapshots + Spring Snapshots + http://repo.spring.io/libs-snapshot-local + + true + + + false + + + + spring-milestones + Spring Milestones + http://repo.spring.io/libs-milestone-local + + false + + + + spring-releases + Spring Releases + http://repo.spring.io/release + + false + + confluent http://packages.confluent.io/maven/ - - - - org.apache.maven.plugins - maven-jar-plugin - 3.0.2 - - - - test-jar - - - - - - - diff --git a/spring-cloud-stream-binder-kafka-0.10.1-test/src/test/java/org/springframework/cloud/stream/binder/kafka/Kafka_0_10_1_BinderTests.java b/spring-cloud-stream-binder-kafka-test/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java similarity index 71% rename from spring-cloud-stream-binder-kafka-0.10.1-test/src/test/java/org/springframework/cloud/stream/binder/kafka/Kafka_0_10_1_BinderTests.java rename to spring-cloud-stream-binder-kafka-test/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java index c8815cfb..05c4c893 100644 --- a/spring-cloud-stream-binder-kafka-0.10.1-test/src/test/java/org/springframework/cloud/stream/binder/kafka/Kafka_0_10_1_BinderTests.java +++ b/spring-cloud-stream-binder-kafka-test/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java @@ -42,10 +42,11 @@ import org.springframework.cloud.stream.binder.Binding; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.Spy; -import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation; +import org.springframework.cloud.stream.binder.kafka.admin.KafkaAdminUtilsOperation; import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties; import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties; +import org.springframework.cloud.stream.config.BindingProperties; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.kafka.core.ConsumerFactory; @@ -55,14 +56,17 @@ import org.springframework.kafka.test.core.BrokerAddress; import org.springframework.kafka.test.rule.KafkaEmbedded; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.MimeTypeUtils; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertTrue; /** * Integration tests for the {@link KafkaMessageChannelBinder}. - * + * * This test specifically tests for the 0.10.1.x version of Kafka. * * @author Eric Bottard @@ -70,16 +74,16 @@ import static org.junit.Assert.assertTrue; * @author Mark Fisher * @author Ilayaperumal Gopinathan */ -public class Kafka_0_10_1_BinderTests extends Kafka_0_10_2_BinderTests { +public class KafkaBinderTests extends AbstractKafkaBinderTests { private final String CLASS_UNDER_TEST_NAME = KafkaMessageChannelBinder.class.getSimpleName(); @ClassRule public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 10); - private Kafka10TestBinder binder; + private KafkaTestBinder binder; - private Kafka10AdminUtilsOperation adminUtilsOperation = new Kafka10AdminUtilsOperation(); + private final KafkaAdminUtilsOperation adminUtilsOperation = new KafkaAdminUtilsOperation(); @Override protected void binderBindUnbindLatency() throws InterruptedException { @@ -87,14 +91,15 @@ public class Kafka_0_10_1_BinderTests extends Kafka_0_10_2_BinderTests { } @Override - protected Kafka10TestBinder getBinder() { + protected KafkaTestBinder getBinder() { if (binder == null) { KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties(); - binder = new Kafka10TestBinder(binderConfiguration); + binder = new KafkaTestBinder(binderConfiguration); } return binder; } + @Override protected KafkaBinderConfigurationProperties createConfigurationProperties() { KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties(); BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses(); @@ -139,7 +144,7 @@ public class Kafka_0_10_1_BinderTests extends Kafka_0_10_2_BinderTests { @Override protected Binder getBinder(KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties) { - return new Kafka10TestBinder(kafkaBinderConfigurationProperties); + return new KafkaTestBinder(kafkaBinderConfigurationProperties); } @Before @@ -186,6 +191,8 @@ public class Kafka_0_10_1_BinderTests extends Kafka_0_10_2_BinderTests { configurationProperties.getZkSessionTimeout(), configurationProperties.getZkConnectionTimeout(), ZKStringSerializer$.MODULE$); final ZkUtils zkUtils = new ZkUtils(zkClient, null, false); + + Map schemaRegistryProps = new HashMap<>(); schemaRegistryProps.put("kafkastore.connection.url", configurationProperties.getZkConnectionString()); schemaRegistryProps.put("listeners", "http://0.0.0.0:8082"); @@ -238,4 +245,81 @@ public class Kafka_0_10_1_BinderTests extends Kafka_0_10_2_BinderTests { producerBinding.unbind(); consumerBinding.unbind(); } + + @Override + public void testSendAndReceiveWithExplicitConsumerGroupWithRawMode() { + // raw mode no longer needed + } + + @Override + public void testSendAndReceiveWithRawModeAndStringPayload() { + // raw mode no longer needed + } + + @Test + @Override + @SuppressWarnings("unchecked") + public void testSendAndReceiveNoOriginalContentType() throws Exception { + Binder binder = getBinder(); + + BindingProperties producerBindingProperties = createProducerBindingProperties( + createProducerProperties()); + DirectChannel moduleOutputChannel = createBindableChannel("output", + producerBindingProperties); + QueueChannel moduleInputChannel = new QueueChannel(); + Binding producerBinding = binder.bindProducer("bar.0", + moduleOutputChannel, producerBindingProperties.getProducer()); + + ExtendedConsumerProperties consumerProperties = createConsumerProperties(); + consumerProperties.getExtension().setTrustedPackages(new String[] {"org.springframework.util"}); + Binding consumerBinding = binder.bindConsumer("bar.0", + "testSendAndReceiveNoOriginalContentType", moduleInputChannel, + consumerProperties); + binderBindUnbindLatency(); + + //TODO: Will have to fix the MimeType to convert to byte array once this issue has been resolved: + //https://github.com/spring-projects/spring-kafka/issues/424 + Message message = org.springframework.integration.support.MessageBuilder.withPayload("foo".getBytes()) + .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN_VALUE.getBytes()).build(); + moduleOutputChannel.send(message); + Message inbound = receive(moduleInputChannel); + assertThat(inbound).isNotNull(); + assertThat(inbound.getPayload()).isEqualTo("foo".getBytes()); + assertThat(inbound.getHeaders().get(MessageHeaders.CONTENT_TYPE)) + .isEqualTo(MimeTypeUtils.TEXT_PLAIN_VALUE.getBytes()); + producerBinding.unbind(); + consumerBinding.unbind(); + } + + @Test + @Override + @SuppressWarnings("unchecked") + public void testSendAndReceive() throws Exception { + Binder binder = getBinder(); + BindingProperties outputBindingProperties = createProducerBindingProperties( + createProducerProperties()); + DirectChannel moduleOutputChannel = createBindableChannel("output", + outputBindingProperties); + QueueChannel moduleInputChannel = new QueueChannel(); + Binding producerBinding = binder.bindProducer("foo.0", + moduleOutputChannel, outputBindingProperties.getProducer()); + Binding consumerBinding = binder.bindConsumer("foo.0", + "testSendAndReceive", moduleInputChannel, createConsumerProperties()); + // Bypass conversion we are only testing sendReceive + Message message = org.springframework.integration.support.MessageBuilder.withPayload("foo".getBytes()) + .setHeader(MessageHeaders.CONTENT_TYPE, + MimeTypeUtils.APPLICATION_OCTET_STREAM_VALUE.getBytes()) + .build(); + // Let the consumer actually bind to the producer before sending a msg + binderBindUnbindLatency(); + moduleOutputChannel.send(message); + Message inbound = receive(moduleInputChannel); + assertThat(inbound).isNotNull(); + assertThat(inbound.getPayload()).isEqualTo("foo".getBytes()); + assertThat(inbound.getHeaders().get(MessageHeaders.CONTENT_TYPE)) + .isEqualTo(MimeTypeUtils.APPLICATION_OCTET_STREAM_VALUE.getBytes()); + producerBinding.unbind(); + consumerBinding.unbind(); + } + } diff --git a/spring-cloud-stream-binder-kafka-test/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTestBinder.java b/spring-cloud-stream-binder-kafka-test/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTestBinder.java new file mode 100644 index 00000000..5314c8cb --- /dev/null +++ b/spring-cloud-stream-binder-kafka-test/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTestBinder.java @@ -0,0 +1,85 @@ +/* + * Copyright 2015-2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.kafka; + +import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; +import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation; +import org.springframework.cloud.stream.binder.kafka.admin.KafkaAdminUtilsOperation; +import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; +import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties; +import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner; +import org.springframework.cloud.stream.provisioning.ConsumerDestination; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.kafka.support.LoggingProducerListener; +import org.springframework.kafka.support.ProducerListener; + +/** + * Test support class for {@link KafkaMessageChannelBinder}. + * @author Eric Bottard + * @author Marius Bogoevici + * @author David Turanski + * @author Gary Russell + * @author Soby Chacko + */ +public class KafkaTestBinder extends AbstractKafkaTestBinder { + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public KafkaTestBinder(KafkaBinderConfigurationProperties binderConfiguration) { + try { + AdminUtilsOperation adminUtilsOperation = new KafkaAdminUtilsOperation(); + KafkaTopicProvisioner provisioningProvider = + new KafkaTopicProvisioner(binderConfiguration, adminUtilsOperation); + provisioningProvider.afterPropertiesSet(); + + KafkaMessageChannelBinder binder = new KafkaMessageChannelBinder(binderConfiguration, + provisioningProvider) { + + /* + * Some tests use multiple instance indexes for the same topic; we need to make + * the error infrastructure beans unique. + */ + @Override + protected String errorsBaseName(ConsumerDestination destination, String group, + ExtendedConsumerProperties consumerProperties) { + return super.errorsBaseName(destination, group, consumerProperties) + "-" + + consumerProperties.getInstanceIndex(); + } + + }; + + ProducerListener producerListener = new LoggingProducerListener(); + binder.setProducerListener(producerListener); + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class); + setApplicationContext(context); + binder.setApplicationContext(context); + binder.afterPropertiesSet(); + this.setBinder(binder); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Configuration + @EnableIntegration + static class Config { + + } + +} diff --git a/spring-cloud-stream-binder-kafka-test/src/test/java/org/springframework/cloud/stream/binder/kafka/User1.java b/spring-cloud-stream-binder-kafka-test/src/test/java/org/springframework/cloud/stream/binder/kafka/User1.java new file mode 100644 index 00000000..b5c45c9b --- /dev/null +++ b/spring-cloud-stream-binder-kafka-test/src/test/java/org/springframework/cloud/stream/binder/kafka/User1.java @@ -0,0 +1,85 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.kafka; + +import java.io.IOException; + +import org.apache.avro.Schema; +import org.apache.avro.reflect.Nullable; +import org.apache.avro.specific.SpecificRecordBase; + +import org.springframework.core.io.ClassPathResource; + +/** + * @author Marius Bogoevici + * @author Ilayaperumal Gopinathan + */ +public class User1 extends SpecificRecordBase { + + @Nullable + private String name; + + @Nullable + private String favoriteColor; + + public String getName() { + return this.name; + } + + public void setName(String name) { + this.name = name; + } + + public String getFavoriteColor() { + return this.favoriteColor; + } + + public void setFavoriteColor(String favoriteColor) { + this.favoriteColor = favoriteColor; + } + + @Override + public Schema getSchema() { + try { + return new Schema.Parser().parse(new ClassPathResource("schemas/users_v1.schema").getInputStream()); + } + catch (IOException e) { + throw new IllegalStateException(e); + } + } + + @Override + public Object get(int i) { + if (i == 0) { + return getName().toString(); + } + if (i == 1) { + return getFavoriteColor().toString(); + } + return null; + } + + @Override + public void put(int i, Object o) { + if (i == 0) { + setName((String) o); + } + if (i == 1) { + setFavoriteColor((String) o); + } + } +} diff --git a/spring-cloud-stream-binder-kafka/.jdk8 b/spring-cloud-stream-binder-kafka/.jdk8 new file mode 100644 index 00000000..e69de29b diff --git a/spring-cloud-stream-binder-kafka/pom.xml b/spring-cloud-stream-binder-kafka/pom.xml index f995d4b9..c8a939fe 100644 --- a/spring-cloud-stream-binder-kafka/pom.xml +++ b/spring-cloud-stream-binder-kafka/pom.xml @@ -10,37 +10,14 @@ org.springframework.cloud spring-cloud-stream-binder-kafka-parent -<<<<<<< c396c5c756d1c01e1b6b0717ff64ae72ab78cb36 -<<<<<<< b20f4a0e08629664c73e2e6cc1e73ac791509697 -<<<<<<< e3460d6fcef9406fc9c6018f0f828630b929a815 - 1.3.1.BUILD-SNAPSHOT -======= 2.0.0.BUILD-SNAPSHOT ->>>>>>> Update version to 2.0.0.BUILD-SNAPSHOT -======= - 2.0.0.M1 ->>>>>>> Release 2.0.0.M1 -======= - 2.0.0.BUILD-SNAPSHOT ->>>>>>> Set version to 2.0.0.BUILD-SNAPSHOT org.springframework.cloud spring-cloud-stream-binder-kafka-core -<<<<<<< c396c5c756d1c01e1b6b0717ff64ae72ab78cb36 -<<<<<<< b20f4a0e08629664c73e2e6cc1e73ac791509697 -<<<<<<< e3460d6fcef9406fc9c6018f0f828630b929a815 -======= 2.0.0.BUILD-SNAPSHOT ->>>>>>> Update version to 2.0.0.BUILD-SNAPSHOT -======= - 2.0.0.M1 ->>>>>>> Release 2.0.0.M1 -======= - 2.0.0.BUILD-SNAPSHOT ->>>>>>> Set version to 2.0.0.BUILD-SNAPSHOT org.springframework.boot @@ -98,9 +75,6 @@ spring-cloud-stream-binder-test test -<<<<<<< b20f4a0e08629664c73e2e6cc1e73ac791509697 -<<<<<<< e3460d6fcef9406fc9c6018f0f828630b929a815 -======= org.springframework.cloud spring-cloud-stream-schema @@ -119,9 +93,6 @@ 3.2.1 test ->>>>>>> Update version to 2.0.0.BUILD-SNAPSHOT -======= ->>>>>>> Release 2.0.0.M1 diff --git a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderJaasInitializerListener.java b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderJaasInitializerListener.java deleted file mode 100644 index 4d06df00..00000000 --- a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderJaasInitializerListener.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.binder.kafka; - -import java.io.File; -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import javax.security.auth.login.AppConfigurationEntry; -import javax.security.auth.login.Configuration; - -import org.apache.kafka.common.security.JaasUtils; - -import org.springframework.beans.BeansException; -import org.springframework.beans.factory.DisposableBean; -import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; -import org.springframework.context.ApplicationListener; -import org.springframework.context.event.ContextRefreshedEvent; -import org.springframework.util.Assert; - -/** - * @author Marius Bogoevici - */ -public class KafkaBinderJaasInitializerListener implements ApplicationListener, - ApplicationContextAware, DisposableBean { - - public static final String DEFAULT_ZK_LOGIN_CONTEXT_NAME = "Client"; - - private ApplicationContext applicationContext; - - private final boolean ignoreJavaLoginConfigParamSystemProperty; - - private final File placeholderJaasConfiguration; - - public KafkaBinderJaasInitializerListener() throws IOException { - // we ignore the system property if it wasn't originally set at launch - this.ignoreJavaLoginConfigParamSystemProperty = - (System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM) == null); - this.placeholderJaasConfiguration = File.createTempFile("kafka-client-jaas-config-placeholder", "conf"); - this.placeholderJaasConfiguration.deleteOnExit(); - } - - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - this.applicationContext = applicationContext; - } - - @Override - public void destroy() throws Exception { - if (this.ignoreJavaLoginConfigParamSystemProperty) { - System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM); - } - } - - @Override - public void onApplicationEvent(ContextRefreshedEvent event) { - if (event.getSource() == this.applicationContext) { - KafkaBinderConfigurationProperties binderConfigurationProperties = - applicationContext.getBean(KafkaBinderConfigurationProperties.class); - // only use programmatic support if a file is not set via system property - if (ignoreJavaLoginConfigParamSystemProperty - && binderConfigurationProperties.getJaas() != null) { - Map configurationEntries = new HashMap<>(); - AppConfigurationEntry kafkaClientConfigurationEntry = new AppConfigurationEntry - (binderConfigurationProperties.getJaas().getLoginModule(), - binderConfigurationProperties.getJaas().getControlFlagValue(), - binderConfigurationProperties.getJaas().getOptions() != null ? - binderConfigurationProperties.getJaas().getOptions() : - Collections.emptyMap()); - configurationEntries.put(JaasUtils.LOGIN_CONTEXT_CLIENT, - new AppConfigurationEntry[]{ kafkaClientConfigurationEntry }); - Configuration.setConfiguration(new InternalConfiguration(configurationEntries)); - // Workaround for a 0.9 client issue where even if the Configuration is set - // a system property check is performed. - // Since the Configuration already exists, this will be ignored. - if (this.placeholderJaasConfiguration != null) { - System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, this.placeholderJaasConfiguration.getAbsolutePath()); - } - } - } - } - - /** - * A {@link Configuration} set up programmatically by the Kafka binder - */ - public static class InternalConfiguration extends Configuration { - - private final Map configurationEntries; - - public InternalConfiguration(Map configurationEntries) { - Assert.notNull(configurationEntries, " cannot be null"); - Assert.notEmpty(configurationEntries, " cannot be empty"); - this.configurationEntries = configurationEntries; - } - - @Override - public AppConfigurationEntry[] getAppConfigurationEntry(String name) { - return configurationEntries.get(name); - } - } -} diff --git a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java index b8098e88..9240362d 100644 --- a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java +++ b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetrics.java @@ -15,12 +15,13 @@ */ package org.springframework.cloud.stream.binder.kafka; -import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.binder.MeterBinder; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.OffsetAndMetadata; @@ -30,8 +31,6 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.boot.actuate.endpoint.PublicMetrics; -import org.springframework.boot.actuate.metrics.Metric; import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; @@ -42,7 +41,7 @@ import org.springframework.util.ObjectUtils; * * @author Henryk Konsek */ -public class KafkaBinderMetrics implements PublicMetrics { +public class KafkaBinderMetrics implements MeterBinder { private final static Logger LOG = LoggerFactory.getLogger(KafkaBinderMetrics.class); @@ -68,8 +67,7 @@ public class KafkaBinderMetrics implements PublicMetrics { } @Override - public Collection> metrics() { - List> metrics = new LinkedList<>(); + public void bindTo(MeterRegistry registry) { for (Map.Entry topicInfo : this.binder.getTopicsInUse() .entrySet()) { if (!topicInfo.getValue().isConsumerTopic()) { @@ -96,13 +94,12 @@ public class KafkaBinderMetrics implements PublicMetrics { lag += endOffset.getValue(); } } - metrics.add(new Metric<>(String.format("%s.%s.%s.lag", METRIC_PREFIX, group, topic), lag)); + registry.gauge(String.format("%s.%s.%s.lag", METRIC_PREFIX, group, topic), lag); } catch (Exception e) { LOG.debug("Cannot generate metric for topic: " + topic, e); } } - return metrics; } private ConsumerFactory createConsumerFactory(String group) { diff --git a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java index 6ebb0854..1ef6c931 100644 --- a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java +++ b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java @@ -21,16 +21,17 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.Callable; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; @@ -61,16 +62,20 @@ import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.listener.AbstractMessageListenerContainer; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.config.ContainerProperties; +import org.springframework.kafka.support.DefaultKafkaHeaderMapper; +import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.ProducerListener; import org.springframework.kafka.support.SendResult; import org.springframework.kafka.support.TopicPartitionInitialOffset; -import org.springframework.messaging.Message; +import org.springframework.kafka.support.converter.MessagingMessageConverter; +import org.springframework.kafka.transaction.KafkaTransactionManager; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; -import org.springframework.messaging.MessagingException; +import org.springframework.messaging.MessageHeaders; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; @@ -93,37 +98,31 @@ import org.springframework.util.concurrent.ListenableFutureCallback; */ public class KafkaMessageChannelBinder extends AbstractMessageChannelBinder, - ExtendedProducerProperties, KafkaTopicProvisioner> + ExtendedProducerProperties, KafkaTopicProvisioner> implements ExtendedPropertiesBinder { private final KafkaBinderConfigurationProperties configurationProperties; private final Map topicsInUse = new HashMap<>(); + private final KafkaTransactionManager transactionManager; + private ProducerListener producerListener; private KafkaExtendedBindingProperties extendedBindingProperties = new KafkaExtendedBindingProperties(); public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties, KafkaTopicProvisioner provisioningProvider) { - super(false, headersToMap(configurationProperties), provisioningProvider); + super(true, null, provisioningProvider); this.configurationProperties = configurationProperties; - } - - private static String[] headersToMap(KafkaBinderConfigurationProperties configurationProperties) { - String[] headersToMap; - if (ObjectUtils.isEmpty(configurationProperties.getHeaders())) { - headersToMap = BinderHeaders.STANDARD_HEADERS; + if (StringUtils.hasText(configurationProperties.getTransaction().getTransactionIdPrefix())) { + this.transactionManager = new KafkaTransactionManager<>( + getProducerFactory(configurationProperties.getTransaction().getTransactionIdPrefix(), + new ExtendedProducerProperties<>(configurationProperties.getTransaction().getProducer()))); } else { - String[] combinedHeadersToMap = Arrays.copyOfRange(BinderHeaders.STANDARD_HEADERS, 0, - BinderHeaders.STANDARD_HEADERS.length + configurationProperties.getHeaders().length); - System.arraycopy(configurationProperties.getHeaders(), 0, combinedHeadersToMap, - BinderHeaders.STANDARD_HEADERS.length, - configurationProperties.getHeaders().length); - headersToMap = combinedHeadersToMap; + this.transactionManager = null; } - return headersToMap; } public void setExtendedBindingProperties(KafkaExtendedBindingProperties extendedBindingProperties) { @@ -152,7 +151,16 @@ public class KafkaMessageChannelBinder extends protected MessageHandler createProducerMessageHandler(final ProducerDestination destination, ExtendedProducerProperties producerProperties, MessageChannel errorChannel) throws Exception { - final DefaultKafkaProducerFactory producerFB = getProducerFactory(producerProperties); + /* + * IMPORTANT: With a transactional binder, individual producer properties for + * Kafka are ignored; the global binder + * (spring.cloud.stream.kafka.binder.transaction.producer.*) properties are used + * instead, for all producers. A binder is transactional when + * 'spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix' has text. + */ + final ProducerFactory producerFB = this.transactionManager != null + ? this.transactionManager.getProducerFactory() + : getProducerFactory(null, producerProperties); Collection partitions = provisioningProvider.getPartitionsForTopic( producerProperties.getPartitionCount(), false, @@ -195,10 +203,22 @@ public class KafkaMessageChannelBinder extends if (errorChannel != null) { handler.setSendFailureChannel(errorChannel); } + String[] headerPatterns = producerProperties.getExtension().getHeaderPatterns(); + if (headerPatterns != null && headerPatterns.length > 0) { + List patterns = new LinkedList<>(Arrays.asList(headerPatterns)); + if (!patterns.contains("!" + MessageHeaders.TIMESTAMP)) { + patterns.add(0, "!" + MessageHeaders.TIMESTAMP); + } + if (!patterns.contains("!" + MessageHeaders.ID)) { + patterns.add(0, "!" + MessageHeaders.ID); + } + DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper(patterns.toArray(new String[patterns.size()])); + handler.setHeaderMapper(headerMapper); + } return handler; } - private DefaultKafkaProducerFactory getProducerFactory( + private DefaultKafkaProducerFactory getProducerFactory(String transactionIdPrefix, ExtendedProducerProperties producerProperties) { Map props = new HashMap<>(); props.put(ProducerConfig.RETRIES_CONFIG, 0); @@ -227,7 +247,11 @@ public class KafkaMessageChannelBinder extends if (!ObjectUtils.isEmpty(producerProperties.getExtension().getConfiguration())) { props.putAll(producerProperties.getExtension().getConfiguration()); } - return new DefaultKafkaProducerFactory<>(props); + DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(props); + if (transactionIdPrefix != null) { + producerFactory.setTransactionIdPrefix(transactionIdPrefix); + } + return producerFactory; } @Override @@ -284,6 +308,9 @@ public class KafkaMessageChannelBinder extends || extendedConsumerProperties.getExtension().isAutoRebalanceEnabled() ? new ContainerProperties(destination.getName()) : new ContainerProperties(topicPartitionInitialOffsets); + if (this.transactionManager != null) { + containerProperties.setTransactionManager(this.transactionManager); + } int concurrency = Math.min(extendedConsumerProperties.getConcurrency(), listenedPartitions.size()); @SuppressWarnings("rawtypes") final ConcurrentMessageListenerContainer messageListenerContainer = @@ -312,6 +339,14 @@ public class KafkaMessageChannelBinder extends } final KafkaMessageDrivenChannelAdapter kafkaMessageDrivenChannelAdapter = new KafkaMessageDrivenChannelAdapter<>( messageListenerContainer); + MessagingMessageConverter messageConverter = new MessagingMessageConverter(); + DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper(); + String[] trustedPackages = extendedConsumerProperties.getExtension().getTrustedPackages(); + if (!StringUtils.isEmpty(trustedPackages)) { + headerMapper.addTrustedPackages(trustedPackages); + } + messageConverter.setHeaderMapper(headerMapper); + kafkaMessageDrivenChannelAdapter.setMessageConverter(messageConverter); kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory()); ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination, consumerGroup, extendedConsumerProperties); @@ -334,48 +369,46 @@ public class KafkaMessageChannelBinder extends protected MessageHandler getErrorMessageHandler(final ConsumerDestination destination, final String group, final ExtendedConsumerProperties extendedConsumerProperties) { if (extendedConsumerProperties.getExtension().isEnableDlq()) { - DefaultKafkaProducerFactory producerFactory = getProducerFactory( - new ExtendedProducerProperties<>(new KafkaProducerProperties())); + ProducerFactory producerFactory = this.transactionManager != null + ? this.transactionManager.getProducerFactory() + : getProducerFactory(null, new ExtendedProducerProperties<>(new KafkaProducerProperties())); final KafkaTemplate kafkaTemplate = new KafkaTemplate<>(producerFactory); - return new MessageHandler() { + return message -> { + final ConsumerRecord record = message.getHeaders() + .get(KafkaHeaders.RAW_DATA, ConsumerRecord.class); + final byte[] key = record.key() != null ? Utils.toArray(ByteBuffer.wrap((byte[]) record.key())) + : null; + final byte[] payload = record.value() != null + ? Utils.toArray(ByteBuffer.wrap((byte[]) record.value())) : null; + String dlqName = StringUtils.hasText(extendedConsumerProperties.getExtension().getDlqName()) + ? extendedConsumerProperties.getExtension().getDlqName() + : "error." + destination.getName() + "." + group; + ProducerRecord producerRecord = new ProducerRecord<>(dlqName, record.partition(), + key, payload, record.headers()); + ListenableFuture> sentDlq = kafkaTemplate.send(producerRecord); + sentDlq.addCallback(new ListenableFutureCallback>() { + StringBuilder sb = new StringBuilder().append(" a message with key='") + .append(toDisplayString(ObjectUtils.nullSafeToString(key), 50)).append("'") + .append(" and payload='") + .append(toDisplayString(ObjectUtils.nullSafeToString(payload), 50)) + .append("'").append(" received from ") + .append(record.partition()); - @Override - public void handleMessage(Message message) throws MessagingException { - final ConsumerRecord record = message.getHeaders() - .get(KafkaMessageDrivenChannelAdapter.KAFKA_RAW_DATA, ConsumerRecord.class); - final byte[] key = record.key() != null ? Utils.toArray(ByteBuffer.wrap((byte[]) record.key())) - : null; - final byte[] payload = record.value() != null - ? Utils.toArray(ByteBuffer.wrap((byte[]) record.value())) : null; - String dlqName = StringUtils.hasText(extendedConsumerProperties.getExtension().getDlqName()) - ? extendedConsumerProperties.getExtension().getDlqName() - : "error." + destination.getName() + "." + group; - ListenableFuture> sentDlq = kafkaTemplate.send(dlqName, - record.partition(), key, payload); - sentDlq.addCallback(new ListenableFutureCallback>() { - StringBuilder sb = new StringBuilder().append(" a message with key='") - .append(toDisplayString(ObjectUtils.nullSafeToString(key), 50)).append("'") - .append(" and payload='") - .append(toDisplayString(ObjectUtils.nullSafeToString(payload), 50)) - .append("'").append(" received from ") - .append(record.partition()); + @Override + public void onFailure(Throwable ex) { + KafkaMessageChannelBinder.this.logger.error( + "Error sending to DLQ " + sb.toString(), ex); + } - @Override - public void onFailure(Throwable ex) { - KafkaMessageChannelBinder.this.logger.error( - "Error sending to DLQ " + sb.toString(), ex); + @Override + public void onSuccess(SendResult result) { + if (KafkaMessageChannelBinder.this.logger.isDebugEnabled()) { + KafkaMessageChannelBinder.this.logger.debug( + "Sent to DLQ " + sb.toString()); } + } - @Override - public void onSuccess(SendResult result) { - if (KafkaMessageChannelBinder.this.logger.isDebugEnabled()) { - KafkaMessageChannelBinder.this.logger.debug( - "Sent to DLQ " + sb.toString()); - } - } - - }); - } + }); }; } return null; @@ -439,11 +472,11 @@ public class KafkaMessageChannelBinder extends private boolean running = true; - private final DefaultKafkaProducerFactory producerFactory; + private final ProducerFactory producerFactory; ProducerConfigurationMessageHandler(KafkaTemplate kafkaTemplate, String topic, ExtendedProducerProperties producerProperties, - DefaultKafkaProducerFactory producerFactory) { + ProducerFactory producerFactory) { super(kafkaTemplate); setTopicExpression(new LiteralExpression(topic)); setMessageKeyExpression(producerProperties.getExtension().getMessageKeyExpression()); @@ -471,7 +504,9 @@ public class KafkaMessageChannelBinder extends @Override public void stop() { - producerFactory.stop(); + if (this.producerFactory instanceof Lifecycle) { + ((Lifecycle) producerFactory).stop(); + } this.running = false; } diff --git a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java index 88cf0ec4..380c3b8a 100644 --- a/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java +++ b/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java @@ -20,43 +20,34 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import io.micrometer.core.instrument.binder.MeterBinder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.utils.AppInfoParser; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.actuate.endpoint.PublicMetrics; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.stream.binder.Binder; import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator; -import org.springframework.cloud.stream.binder.kafka.KafkaBinderJaasInitializerListener; import org.springframework.cloud.stream.binder.kafka.KafkaBinderMetrics; import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder; import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation; -import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation; -import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation; +import org.springframework.cloud.stream.binder.kafka.admin.KafkaAdminUtilsOperation; import org.springframework.cloud.stream.binder.kafka.properties.JaasLoginModuleConfiguration; import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties; import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner; -import org.springframework.cloud.stream.config.codec.kryo.KryoCodecAutoConfiguration; import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationListener; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Condition; -import org.springframework.context.annotation.ConditionContext; -import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; -import org.springframework.core.type.AnnotatedTypeMetadata; -import org.springframework.integration.codec.Codec; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.ProducerListener; import org.springframework.util.ObjectUtils; @@ -72,15 +63,12 @@ import org.springframework.util.ObjectUtils; */ @Configuration @ConditionalOnMissingBean(Binder.class) -@Import({ KryoCodecAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class}) +@Import({ PropertyPlaceholderAutoConfiguration.class}) @EnableConfigurationProperties({ KafkaBinderConfigurationProperties.class, KafkaExtendedBindingProperties.class }) public class KafkaBinderConfiguration { protected static final Log logger = LogFactory.getLog(KafkaBinderConfiguration.class); - @Autowired - private Codec codec; - @Autowired private KafkaBinderConfigurationProperties configurationProperties; @@ -105,7 +93,6 @@ public class KafkaBinderConfiguration { KafkaMessageChannelBinder kafkaMessageChannelBinder() { KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder( this.configurationProperties, provisioningProvider()); - kafkaMessageChannelBinder.setCodec(this.codec); kafkaMessageChannelBinder.setProducerListener(producerListener); kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties); return kafkaMessageChannelBinder; @@ -136,45 +123,19 @@ public class KafkaBinderConfiguration { } @Bean - public PublicMetrics kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder) { + public MeterBinder kafkaBinderMetrics(KafkaMessageChannelBinder kafkaMessageChannelBinder) { return new KafkaBinderMetrics(kafkaMessageChannelBinder, configurationProperties); } @Bean(name = "adminUtilsOperation") - @Conditional(Kafka09Present.class) - @ConditionalOnClass(name = "kafka.admin.AdminUtils") - public AdminUtilsOperation kafka09AdminUtilsOperation() { - logger.info("AdminUtils selected: Kafka 0.9 AdminUtils"); - return new Kafka09AdminUtilsOperation(); - } - - @Bean(name = "adminUtilsOperation") - @Conditional(Kafka10Present.class) @ConditionalOnClass(name = "kafka.admin.AdminUtils") public AdminUtilsOperation kafka10AdminUtilsOperation() { - logger.info("AdminUtils selected: Kafka 0.10 AdminUtils"); - return new Kafka10AdminUtilsOperation(); + return new KafkaAdminUtilsOperation(); } @Bean - public ApplicationListener jaasInitializer() throws IOException { - return new KafkaBinderJaasInitializerListener(); - } - - static class Kafka10Present implements Condition { - - @Override - public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) { - return AppInfoParser.getVersion().startsWith("0.10"); - } - } - - static class Kafka09Present implements Condition { - - @Override - public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) { - return AppInfoParser.getVersion().startsWith("0.9"); - } + public KafkaJaasLoginModuleInitializer jaasInitializer() throws IOException { + return new KafkaJaasLoginModuleInitializer(); } public static class JaasConfigurationProperties { diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/AbstractKafkaBinderTests.java similarity index 96% rename from spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java rename to spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/AbstractKafkaBinderTests.java index fd3cbe1d..5fc97ee8 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/AbstractKafkaBinderTests.java @@ -16,6 +16,7 @@ package org.springframework.cloud.stream.binder.kafka; +import java.io.IOException; import java.util.Arrays; import java.util.LinkedHashMap; import java.util.List; @@ -27,8 +28,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import kafka.utils.ZKStringSerializer$; +import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -82,6 +87,7 @@ import org.springframework.messaging.support.MessageBuilder; import org.springframework.retry.backoff.FixedBackOffPolicy; import org.springframework.retry.policy.SimpleRetryPolicy; import org.springframework.retry.support.RetryTemplate; +import org.springframework.util.MimeTypeUtils; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.SettableListenableFuture; @@ -90,16 +96,13 @@ import static org.assertj.core.api.Assertions.fail; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; - /** * @author Soby Chacko * @author Ilayaperumal Gopinathan * @author Henryk Konsek * @author Gary Russell */ -public abstract class KafkaBinderTests extends +public abstract class AbstractKafkaBinderTests extends PartitionCapableBinderTests, ExtendedProducerProperties> { @Rule @@ -158,6 +161,7 @@ public abstract class KafkaBinderTests extends moduleInputChannel.subscribe(handler); ExtendedProducerProperties producerProperties = createProducerProperties(); producerProperties.setPartitionCount(2); + producerProperties.getExtension().setHeaderPatterns(new String[] { MessageHeaders.CONTENT_TYPE }); ExtendedConsumerProperties consumerProperties = createConsumerProperties(); consumerProperties.setMaxAttempts(withRetry ? 2 : 1); consumerProperties.setBackOffInitialInterval(100); @@ -183,35 +187,23 @@ public abstract class KafkaBinderTests extends final AtomicReference> boundErrorChannelMessage = new AtomicReference<>(); final AtomicReference> globalErrorChannelMessage = new AtomicReference<>(); final AtomicBoolean hasRecovererInCallStack = new AtomicBoolean(!withRetry); - boundErrorChannel.subscribe(new MessageHandler() { - - @Override - public void handleMessage(Message message) throws MessagingException { - boundErrorChannelMessage.set(message); - String stackTrace = Arrays.toString(new RuntimeException().getStackTrace()); - hasRecovererInCallStack.set(stackTrace.contains("ErrorMessageSendingRecoverer")); - } - - }); - globalErrorChannel.subscribe(new MessageHandler() { - - @Override - public void handleMessage(Message message) throws MessagingException { - globalErrorChannelMessage.set(message); - } - + boundErrorChannel.subscribe(message -> { + boundErrorChannelMessage.set(message); + String stackTrace = Arrays.toString(new RuntimeException().getStackTrace()); + hasRecovererInCallStack.set(stackTrace.contains("ErrorMessageSendingRecoverer")); }); + globalErrorChannel.subscribe(globalErrorChannelMessage::set); Binding dlqConsumerBinding = binder.bindConsumer( "error.dlqTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel, dlqConsumerProperties); binderBindUnbindLatency(); String testMessagePayload = "test." + UUID.randomUUID().toString(); - Message testMessage = MessageBuilder.withPayload(testMessagePayload).build(); + Message testMessage = MessageBuilder.withPayload(testMessagePayload.getBytes()).build(); moduleOutputChannel.send(testMessage); Message receivedMessage = receive(dlqChannel, 3); assertThat(receivedMessage).isNotNull(); - assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload); + assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload.getBytes()); assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts()); binderBindUnbindLatency(); @@ -247,7 +239,7 @@ public abstract class KafkaBinderTests extends "testGroup", moduleInputChannel, consumerProperties); String testMessagePayload = "test." + UUID.randomUUID().toString(); - Message testMessage = MessageBuilder.withPayload(testMessagePayload).build(); + Message testMessage = MessageBuilder.withPayload(testMessagePayload.getBytes()).build(); moduleOutputChannel.send(testMessage); assertThat(handler.getLatch().await((int) (timeoutMultiplier * 1000), TimeUnit.MILLISECONDS)); @@ -255,7 +247,7 @@ public abstract class KafkaBinderTests extends assertThat(handler.getReceivedMessages().entrySet()).hasSize(1); Message receivedMessage = handler.getReceivedMessages().entrySet().iterator().next().getValue(); assertThat(receivedMessage).isNotNull(); - assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload); + assertThat(receivedMessage.getPayload()).isEqualTo(testMessagePayload.getBytes()); assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts()); consumerBinding.unbind(); @@ -265,13 +257,13 @@ public abstract class KafkaBinderTests extends successfulInputChannel, consumerProperties); binderBindUnbindLatency(); String testMessage2Payload = "test." + UUID.randomUUID().toString(); - Message testMessage2 = MessageBuilder.withPayload(testMessage2Payload).build(); + Message testMessage2 = MessageBuilder.withPayload(testMessage2Payload.getBytes()).build(); moduleOutputChannel.send(testMessage2); Message firstReceived = receive(successfulInputChannel); - assertThat(firstReceived.getPayload()).isEqualTo(testMessagePayload); + assertThat(firstReceived.getPayload()).isEqualTo(testMessagePayload.getBytes()); Message secondReceived = receive(successfulInputChannel); - assertThat(secondReceived.getPayload()).isEqualTo(testMessage2Payload); + assertThat(secondReceived.getPayload()).isEqualTo(testMessage2Payload.getBytes()); consumerBinding.unbind(); producerBinding.unbind(); } @@ -304,18 +296,18 @@ public abstract class KafkaBinderTests extends "error.retryTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel, dlqConsumerProperties); String testMessagePayload = "test." + UUID.randomUUID().toString(); - Message testMessage = MessageBuilder.withPayload(testMessagePayload).build(); + Message testMessage = MessageBuilder.withPayload(testMessagePayload.getBytes()).build(); moduleOutputChannel.send(testMessage); Message dlqMessage = receive(dlqChannel, 3); assertThat(dlqMessage).isNotNull(); - assertThat(dlqMessage.getPayload()).isEqualTo(testMessagePayload); + assertThat(dlqMessage.getPayload()).isEqualTo(testMessagePayload.getBytes()); // first attempt fails assertThat(handler.getReceivedMessages().entrySet()).hasSize(1); Message handledMessage = handler.getReceivedMessages().entrySet().iterator().next().getValue(); assertThat(handledMessage).isNotNull(); - assertThat(handledMessage.getPayload()).isEqualTo(testMessagePayload); + assertThat(handledMessage.getPayload()).isEqualTo(testMessagePayload.getBytes()); assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts()); binderBindUnbindLatency(); dlqConsumerBinding.unbind(); @@ -326,11 +318,11 @@ public abstract class KafkaBinderTests extends consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0", "testGroup", successfulInputChannel, consumerProperties); String testMessage2Payload = "test." + UUID.randomUUID().toString(); - Message testMessage2 = MessageBuilder.withPayload(testMessage2Payload).build(); + Message testMessage2 = MessageBuilder.withPayload(testMessage2Payload.getBytes()).build(); moduleOutputChannel.send(testMessage2); Message receivedMessage = receive(successfulInputChannel); - assertThat(receivedMessage.getPayload()).isEqualTo(testMessage2Payload); + assertThat(receivedMessage.getPayload()).isEqualTo(testMessage2Payload.getBytes()); binderBindUnbindLatency(); consumerBinding.unbind(); @@ -367,18 +359,18 @@ public abstract class KafkaBinderTests extends dlqConsumerProperties); String testMessagePayload = "test." + UUID.randomUUID().toString(); - Message testMessage = MessageBuilder.withPayload(testMessagePayload).build(); + Message testMessage = MessageBuilder.withPayload(testMessagePayload.getBytes()).build(); moduleOutputChannel.send(testMessage); Message dlqMessage = receive(dlqChannel, 3); assertThat(dlqMessage).isNotNull(); - assertThat(dlqMessage.getPayload()).isEqualTo(testMessagePayload); + assertThat(dlqMessage.getPayload()).isEqualTo(testMessagePayload.getBytes()); // first attempt fails assertThat(handler.getReceivedMessages().entrySet()).hasSize(1); Message handledMessage = handler.getReceivedMessages().entrySet().iterator().next().getValue(); assertThat(handledMessage).isNotNull(); - assertThat(handledMessage.getPayload()).isEqualTo(testMessagePayload); + assertThat(handledMessage.getPayload()).isEqualTo(testMessagePayload.getBytes()); assertThat(handler.getInvocationCount()).isEqualTo(consumerProperties.getMaxAttempts()); binderBindUnbindLatency(); dlqConsumerBinding.unbind(); @@ -389,11 +381,11 @@ public abstract class KafkaBinderTests extends consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0", "testGroup", successfulInputChannel, consumerProperties); String testMessage2Payload = "test." + UUID.randomUUID().toString(); - Message testMessage2 = MessageBuilder.withPayload(testMessage2Payload).build(); + Message testMessage2 = MessageBuilder.withPayload(testMessage2Payload.getBytes()).build(); moduleOutputChannel.send(testMessage2); Message receivedMessage = receive(successfulInputChannel); - assertThat(receivedMessage.getPayload()).isEqualTo(testMessage2Payload); + assertThat(receivedMessage.getPayload()).isEqualTo(testMessage2Payload.getBytes()); binderBindUnbindLatency(); consumerBinding.unbind(); @@ -598,8 +590,6 @@ public abstract class KafkaBinderTests extends Binder binder = getBinder(createConfigurationProperties()); GenericApplicationContext context = new GenericApplicationContext(); context.refresh(); - // binder.setApplicationContext(context); - // binder.afterPropertiesSet(); DirectChannel output = new DirectChannel(); QueueChannel input1 = new QueueChannel(); @@ -949,20 +939,28 @@ public abstract class KafkaBinderTests extends } }; + ObjectMapper om = new ObjectMapper(); + if (usesExplicitRouting()) { - assertThat(receive0.getPayload()).isEqualTo(0); - assertThat(receive1.getPayload()).isEqualTo(1); - assertThat(receive2.getPayload()).isEqualTo(2); + assertThat(om.readValue((byte[])receive0.getPayload(), Integer.class)).isEqualTo(0); + assertThat(om.readValue((byte[])receive1.getPayload(), Integer.class)).isEqualTo(1); + assertThat(om.readValue((byte[])receive2.getPayload(), Integer.class)).isEqualTo(2); assertThat(receive2).has(correlationHeadersForPayload2); } else { List> receivedMessages = Arrays.asList(receive0, receive1, receive2); - assertThat(receivedMessages).extracting("payload").containsExactlyInAnyOrder(0, 1, 2); + assertThat(receivedMessages).extracting("payload").containsExactlyInAnyOrder(new byte[]{48}, new byte[]{49}, new byte[]{50}); Condition> payloadIs2 = new Condition>() { @Override public boolean matches(Message value) { - return value.getPayload().equals(2); + try { + return om.readValue((byte[]) value.getPayload(), Integer.class).equals(2); + } + catch (IOException e) { + // + } + return false; } }; assertThat(receivedMessages).filteredOn(payloadIs2).areExactly(1, correlationHeadersForPayload2); @@ -1038,11 +1036,12 @@ public abstract class KafkaBinderTests extends assertThat(receive2).isNotNull(); Message receive3 = receive(input3); assertThat(receive3).isNotNull(); + ObjectMapper om = new ObjectMapper(); - assertThat(receive0.getPayload()).isEqualTo(0); - assertThat(receive1.getPayload()).isEqualTo(1); - assertThat(receive2.getPayload()).isEqualTo(2); - assertThat(receive3.getPayload()).isEqualTo(3); + assertThat(om.readValue((byte[])receive0.getPayload(), Integer.class)).isEqualTo(0); + assertThat(om.readValue((byte[])receive1.getPayload(), Integer.class)).isEqualTo(1); + assertThat(om.readValue((byte[])receive2.getPayload(), Integer.class)).isEqualTo(2); + assertThat(om.readValue((byte[])receive3.getPayload(), Integer.class)).isEqualTo(3); input0Binding.unbind(); input1Binding.unbind(); @@ -1454,6 +1453,7 @@ public abstract class KafkaBinderTests extends } } + //TODO: We need to evaluate this test as built in serialization has different meaning in 2.0 @Test @SuppressWarnings("unchecked") public void testBuiltinSerialization() throws Exception { @@ -1461,7 +1461,9 @@ public abstract class KafkaBinderTests extends Binding consumerBinding = null; try { String testPayload = new String("test"); - Message message = MessageBuilder.withPayload(testPayload).build(); + Message message = MessageBuilder.withPayload(testPayload.getBytes()) + .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN_VALUE.getBytes()) + .build(); SubscribableChannel moduleOutputChannel = new DirectChannel(); String testTopicName = "existing" + System.currentTimeMillis(); KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties(); @@ -1484,8 +1486,8 @@ public abstract class KafkaBinderTests extends moduleOutputChannel.send(message); Message inbound = receive(moduleInputChannel, 5); assertThat(inbound).isNotNull(); - assertThat(inbound.getPayload()).isEqualTo("test"); - assertThat(inbound.getHeaders()).containsEntry("contentType", "text/plain"); + assertThat(inbound.getPayload()).isEqualTo("test".getBytes()); + assertThat(inbound.getHeaders()).containsEntry("contentType", "text/plain".getBytes()); } finally { if (producerBinding != null) { @@ -1754,7 +1756,7 @@ public abstract class KafkaBinderTests extends producerProps.setHeaderMode(HeaderMode.raw); producerProps.setErrorChannelEnabled(true); Binding producerBinding = binder.bindProducer("ec.0", moduleOutputChannel, producerProps); - final Message message = MessageBuilder.withPayload("bad").setHeader(MessageHeaders.CONTENT_TYPE, "foo/bar") + final Message message = MessageBuilder.withPayload("bad").setHeader(MessageHeaders.CONTENT_TYPE, "application/json") .build(); SubscribableChannel ec = binder.getApplicationContext().getBean("ec.0.errors", SubscribableChannel.class); final AtomicReference> errorMessage = new AtomicReference<>(); @@ -1785,7 +1787,7 @@ public abstract class KafkaBinderTests extends new DirectFieldAccessor(endpoint).setPropertyValue("kafkaTemplate", new KafkaTemplate(mock(ProducerFactory.class)) { - @Override + @Override // SIK < 2.3 public ListenableFuture send(String topic, Object payload) { sent.set(payload); SettableListenableFuture future = new SettableListenableFuture<>(); @@ -1793,6 +1795,14 @@ public abstract class KafkaBinderTests extends return future; } + @Override // SIK 2.3+ + public ListenableFuture send(ProducerRecord record) { + sent.set(record.value()); + SettableListenableFuture future = new SettableListenableFuture<>(); + future.setException(fooException); + return future; + } + }); moduleOutputChannel.send(message); @@ -1801,7 +1811,8 @@ public abstract class KafkaBinderTests extends assertThat(errorMessage.get().getPayload()).isInstanceOf(KafkaSendFailureException.class); KafkaSendFailureException exception = (KafkaSendFailureException) errorMessage.get().getPayload(); assertThat(exception.getCause()).isSameAs(fooException); - assertThat(new String(((byte[] )exception.getFailedMessage().getPayload()))).isEqualTo(message.getPayload()); + ObjectMapper om = new ObjectMapper(); + assertThat(om.readValue((byte[] )exception.getFailedMessage().getPayload(), String.class)).isEqualTo(message.getPayload()); assertThat(exception.getRecord().value()).isSameAs(sent.get()); producerBinding.unbind(); } @@ -1830,7 +1841,7 @@ public abstract class KafkaBinderTests extends @Override public void handleMessage(Message message) throws MessagingException { invocationCount++; - Long offset = message.getHeaders().get(KafkaBinderTests.this.getKafkaOffsetHeaderKey(), Long.class); + Long offset = message.getHeaders().get(AbstractKafkaBinderTests.this.getKafkaOffsetHeaderKey(), Long.class); // using the offset as key allows to ensure that we don't store duplicate // messages on retry if (!receivedMessages.containsKey(offset)) { diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/AbstractKafkaTestBinder.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/AbstractKafkaTestBinder.java index fa48a603..b8bd62e3 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/AbstractKafkaTestBinder.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/AbstractKafkaTestBinder.java @@ -21,9 +21,6 @@ import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties; import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties; import org.springframework.context.ApplicationContext; -import org.springframework.integration.codec.Codec; -import org.springframework.integration.codec.kryo.PojoCodec; -import org.springframework.integration.tuple.TupleKryoRegistrar; /** * @author Soby Chacko @@ -47,9 +44,4 @@ public abstract class AbstractKafkaTestBinder extends return this.applicationContext; } - protected static Codec getCodec() { - return new PojoCodec(new TupleKryoRegistrar()); - } - } - diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderAutoConfigurationPropertiesTest.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderAutoConfigurationPropertiesTest.java index 5f21fafd..ca9e6333 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderAutoConfigurationPropertiesTest.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderAutoConfigurationPropertiesTest.java @@ -66,10 +66,10 @@ public class KafkaBinderAutoConfigurationPropertiesTest { ExtendedProducerProperties producerProperties = new ExtendedProducerProperties<>( new KafkaProducerProperties()); Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory", - ExtendedProducerProperties.class); + String.class, ExtendedProducerProperties.class); getProducerFactoryMethod.setAccessible(true); DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod - .invoke(this.kafkaMessageChannelBinder, producerProperties); + .invoke(this.kafkaMessageChannelBinder, "foo", producerProperties); Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs", Map.class); ReflectionUtils.makeAccessible(producerFactoryConfigField); diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationPropertiesTest.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationPropertiesTest.java index aa02c349..7a85a8b1 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationPropertiesTest.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationPropertiesTest.java @@ -63,10 +63,10 @@ public class KafkaBinderConfigurationPropertiesTest { ExtendedProducerProperties producerProperties = new ExtendedProducerProperties<>( kafkaProducerProperties); Method getProducerFactoryMethod = KafkaMessageChannelBinder.class.getDeclaredMethod("getProducerFactory", - ExtendedProducerProperties.class); + String.class, ExtendedProducerProperties.class); getProducerFactoryMethod.setAccessible(true); DefaultKafkaProducerFactory producerFactory = (DefaultKafkaProducerFactory) getProducerFactoryMethod - .invoke(this.kafkaMessageChannelBinder, producerProperties); + .invoke(this.kafkaMessageChannelBinder, "bar", producerProperties); Field producerFactoryConfigField = ReflectionUtils.findField(DefaultKafkaProducerFactory.class, "configs", Map.class); ReflectionUtils.makeAccessible(producerFactoryConfigField); diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderJaasInitializerListenerTest.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderJaasInitializerListenerTest.java deleted file mode 100644 index 4cbe8dc0..00000000 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderJaasInitializerListenerTest.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.binder.kafka; - -import javax.security.auth.login.AppConfigurationEntry; - -import com.sun.security.auth.login.ConfigFile; - -import org.apache.kafka.common.security.JaasUtils; -import org.junit.Test; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.core.io.ClassPathResource; - -import static org.assertj.core.api.Assertions.assertThat; - -/** - * @author Marius Bogoevici - */ -public class KafkaBinderJaasInitializerListenerTest { - - @Test - public void testConfigurationParsedCorrectlyWithKafkaClient() throws Exception { - ConfigFile configFile = new ConfigFile(new ClassPathResource("jaas-sample-kafka-only.conf").getURI()); - final AppConfigurationEntry[] kafkaConfigurationArray = configFile.getAppConfigurationEntry(JaasUtils.LOGIN_CONTEXT_CLIENT); - - final ConfigurableApplicationContext context = - SpringApplication.run(SimpleApplication.class, - "--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true", - "--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true", - "--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab", - "--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM"); - javax.security.auth.login.Configuration configuration = javax.security.auth.login.Configuration.getConfiguration(); - - final AppConfigurationEntry[] kafkaConfiguration = configuration.getAppConfigurationEntry(JaasUtils.LOGIN_CONTEXT_CLIENT); - assertThat(kafkaConfiguration).hasSize(1); - assertThat(kafkaConfiguration[0].getOptions()).isEqualTo(kafkaConfigurationArray[0].getOptions()); - context.close(); - } - - @SpringBootApplication - public static class SimpleApplication { - - } -} diff --git a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetricsTest.java b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetricsTest.java index 1814d01b..58b9cbd8 100644 --- a/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetricsTest.java +++ b/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetricsTest.java @@ -16,11 +16,12 @@ package org.springframework.cloud.stream.binder.kafka; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.Node; @@ -31,7 +32,6 @@ import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import org.springframework.boot.actuate.metrics.Metric; import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.TopicInformation; import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; @@ -61,6 +61,8 @@ public class KafkaBinderMetricsTest { @Mock private KafkaMessageChannelBinder binder; + private MeterRegistry meterRegistry = new SimpleMeterRegistry(); + private Map topicsInUse = new HashMap<>(); @Mock @@ -82,11 +84,10 @@ public class KafkaBinderMetricsTest { List partitions = partitions(new Node(0, null, 0)); topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions)); given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions); - Collection> collectedMetrics = metrics.metrics(); - assertThat(collectedMetrics).hasSize(1); - assertThat(collectedMetrics.iterator().next().getName()) - .isEqualTo(String.format("%s.%s.%s.lag", METRIC_PREFIX, "group", TEST_TOPIC)); - assertThat(collectedMetrics.iterator().next().getValue()).isEqualTo(500L); + metrics.bindTo(meterRegistry); + assertThat(meterRegistry.getMeters()).hasSize(1); + MeterRegistry.Search group = meterRegistry.find(String.format("%s.%s.%s.lag", METRIC_PREFIX, "group", TEST_TOPIC)); + assertThat(group.gauge().get().value()).isEqualTo(500.0); } @Test @@ -99,11 +100,10 @@ public class KafkaBinderMetricsTest { List partitions = partitions(new Node(0, null, 0), new Node(0, null, 0)); topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions)); given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions); - Collection> collectedMetrics = metrics.metrics(); - assertThat(collectedMetrics).hasSize(1); - assertThat(collectedMetrics.iterator().next().getName()) - .isEqualTo(String.format("%s.%s.%s.lag", METRIC_PREFIX, "group", TEST_TOPIC)); - assertThat(collectedMetrics.iterator().next().getValue()).isEqualTo(1000L); + metrics.bindTo(meterRegistry); + assertThat(meterRegistry.getMeters()).hasSize(1); + MeterRegistry.Search group = meterRegistry.find(String.format("%s.%s.%s.lag", METRIC_PREFIX, "group", TEST_TOPIC)); + assertThat(group.gauge().get().value()).isEqualTo(1000.0); } @Test @@ -111,19 +111,18 @@ public class KafkaBinderMetricsTest { List partitions = partitions(new Node(0, null, 0)); topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions)); given(consumer.partitionsFor(TEST_TOPIC)).willReturn(partitions); - Collection> collectedMetrics = metrics.metrics(); - assertThat(collectedMetrics).hasSize(1); - assertThat(collectedMetrics.iterator().next().getName()) - .isEqualTo(String.format("%s.%s.%s.lag", METRIC_PREFIX, "group", TEST_TOPIC)); - assertThat(collectedMetrics.iterator().next().getValue()).isEqualTo(1000L); + metrics.bindTo(meterRegistry); + assertThat(meterRegistry.getMeters()).hasSize(1); + MeterRegistry.Search group = meterRegistry.find(String.format("%s.%s.%s.lag", METRIC_PREFIX, "group", TEST_TOPIC)); + assertThat(group.gauge().get().value()).isEqualTo(1000.0); } @Test public void shouldNotCalculateLagForProducerTopics() { List partitions = partitions(new Node(0, null, 0)); topicsInUse.put(TEST_TOPIC, new TopicInformation(null, partitions)); - Collection> collectedMetrics = metrics.metrics(); - assertThat(collectedMetrics).isEmpty(); + metrics.bindTo(meterRegistry); + assertThat(meterRegistry.getMeters()).isEmpty(); } private List partitions(Node... nodes) { diff --git a/spring-cloud-stream-binder-kafka/src/test/resources/logback.xml b/spring-cloud-stream-binder-kafka/src/test/resources/logback.xml index 39803372..ba06d203 100644 --- a/spring-cloud-stream-binder-kafka/src/test/resources/logback.xml +++ b/spring-cloud-stream-binder-kafka/src/test/resources/logback.xml @@ -4,6 +4,7 @@ %d{ISO8601} %5p %t %c{2}:%L - %m%n + diff --git a/spring-cloud-stream-binder-kstream/.jdk8 b/spring-cloud-stream-binder-kstream/.jdk8 new file mode 100644 index 00000000..e69de29b diff --git a/spring-cloud-stream-binder-kstream/pom.xml b/spring-cloud-stream-binder-kstream/pom.xml index 04d9afcc..5cd6fe6d 100644 --- a/spring-cloud-stream-binder-kstream/pom.xml +++ b/spring-cloud-stream-binder-kstream/pom.xml @@ -10,7 +10,7 @@ org.springframework.cloud spring-cloud-stream-binder-kafka-parent - 1.3.1.BUILD-SNAPSHOT + 2.0.0.BUILD-SNAPSHOT @@ -23,10 +23,6 @@ spring-boot-configuration-processor true - - org.springframework.cloud - spring-cloud-stream-codec - org.springframework.boot spring-boot-autoconfigure @@ -52,13 +48,26 @@ org.springframework.kafka spring-kafka-test - test org.apache.kafka kafka_2.11 test test + + + jline + jline + + + org.slf4j + slf4j-log4j12 + + + log4j + log4j + + org.springframework.cloud diff --git a/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/KStreamBinder.java b/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/KStreamBinder.java index 9d482312..a7ece547 100644 --- a/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/KStreamBinder.java +++ b/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/KStreamBinder.java @@ -23,18 +23,13 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KeyValueMapper; import org.springframework.cloud.stream.binder.AbstractBinder; -import org.springframework.cloud.stream.binder.BinderHeaders; import org.springframework.cloud.stream.binder.Binding; import org.springframework.cloud.stream.binder.DefaultBinding; -import org.springframework.cloud.stream.binder.EmbeddedHeaderUtils; import org.springframework.cloud.stream.binder.ExtendedConsumerProperties; import org.springframework.cloud.stream.binder.ExtendedProducerProperties; import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder; -import org.springframework.cloud.stream.binder.HeaderMode; -import org.springframework.cloud.stream.binder.MessageValues; import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties; import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties; @@ -43,8 +38,6 @@ import org.springframework.cloud.stream.binder.kstream.config.KStreamConsumerPro import org.springframework.cloud.stream.binder.kstream.config.KStreamExtendedBindingProperties; import org.springframework.cloud.stream.binder.kstream.config.KStreamProducerProperties; import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; -import org.springframework.util.MimeType; import org.springframework.util.StringUtils; /** @@ -54,8 +47,6 @@ public class KStreamBinder extends AbstractBinder, ExtendedConsumerProperties, ExtendedProducerProperties> implements ExtendedPropertiesBinder, KStreamConsumerProperties, KStreamProducerProperties> { - private String[] headers; - private final KafkaTopicProvisioner kafkaTopicProvisioner; private final KStreamExtendedBindingProperties kStreamExtendedBindingProperties; @@ -67,7 +58,6 @@ public class KStreamBinder extends public KStreamBinder(KafkaBinderConfigurationProperties binderConfigurationProperties, KafkaTopicProvisioner kafkaTopicProvisioner, KStreamExtendedBindingProperties kStreamExtendedBindingProperties, StreamsConfig streamsConfig) { this.binderConfigurationProperties = binderConfigurationProperties; - this.headers = EmbeddedHeaderUtils.headersToEmbed(binderConfigurationProperties.getHeaders()); this.kafkaTopicProvisioner = kafkaTopicProvisioner; this.kStreamExtendedBindingProperties = kStreamExtendedBindingProperties; this.streamsConfig = streamsConfig; @@ -90,43 +80,13 @@ public class KStreamBinder extends ExtendedProducerProperties extendedProducerProperties = new ExtendedProducerProperties( new KafkaProducerProperties()); this.kafkaTopicProvisioner.provisionProducerDestination(name, extendedProducerProperties); - if (HeaderMode.embeddedHeaders.equals(properties.getHeaderMode())) { - outboundBindTarget = outboundBindTarget.map(new KeyValueMapper>() { - @Override - public KeyValue apply(Object k, Object v) { - if (v instanceof Message) { - try { - return new KeyValue<>(k, (Object) KStreamBinder.this.serializeAndEmbedHeadersIfApplicable((Message) v)); - } - catch (Exception e) { - throw new IllegalArgumentException(e); - } - } - else { - throw new IllegalArgumentException("Wrong type of message " + v); - } - } - }); + if (!properties.isUseNativeEncoding()) { + outboundBindTarget = outboundBindTarget + .map((k, v) -> KeyValue.pair(k, (Object) KStreamBinder.this.serializePayloadIfNecessary((Message) v))); } else { - if (!properties.isUseNativeEncoding()) { - outboundBindTarget = outboundBindTarget - .map(new KeyValueMapper>() { - @Override - public KeyValue apply(Object k, Object v) { - return KeyValue.pair(k, (Object) KStreamBinder.this.serializePayloadIfNecessary((Message) v)); - } - }); - } - else { - outboundBindTarget = outboundBindTarget - .map(new KeyValueMapper>() { - @Override - public KeyValue apply(Object k, Object v) { - return KeyValue.pair(k, ((Message) v).getPayload()); - } - }); - } + outboundBindTarget = outboundBindTarget + .map((k, v) -> KeyValue.pair(k, ((Message) v).getPayload())); } if (!properties.isUseNativeEncoding() || StringUtils.hasText(properties.getExtension().getKeySerde()) || StringUtils.hasText(properties.getExtension().getValueSerde())) { try { @@ -167,24 +127,6 @@ public class KStreamBinder extends return new DefaultBinding<>(name, null, outboundBindTarget, null); } - private byte[] serializeAndEmbedHeadersIfApplicable(Message message) throws Exception { - MessageValues transformed = serializePayloadIfNecessary(message); - byte[] payload; - - Object contentType = transformed.get(MessageHeaders.CONTENT_TYPE); - // transform content type headers to String, so that they can be properly embedded - // in JSON - if (contentType instanceof MimeType) { - transformed.put(MessageHeaders.CONTENT_TYPE, contentType.toString()); - } - Object originalContentType = transformed.get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE); - if (originalContentType instanceof MimeType) { - transformed.put(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE, originalContentType.toString()); - } - payload = EmbeddedHeaderUtils.embedHeaders(transformed, headers); - return payload; - } - @Override public KStreamConsumerProperties getExtendedConsumerProperties(String channelName) { return this.kStreamExtendedBindingProperties.getExtendedConsumerProperties(channelName); diff --git a/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/KStreamBoundElementFactory.java b/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/KStreamBoundElementFactory.java index 8c4a5845..7a6cc376 100644 --- a/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/KStreamBoundElementFactory.java +++ b/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/KStreamBoundElementFactory.java @@ -16,9 +16,6 @@ package org.springframework.cloud.stream.binder.kstream; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - import org.aopalliance.intercept.MethodInterceptor; import org.aopalliance.intercept.MethodInvocation; import org.apache.kafka.streams.KeyValue; @@ -27,19 +24,13 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.springframework.aop.framework.ProxyFactory; -import org.springframework.cloud.stream.binder.ConsumerProperties; -import org.springframework.cloud.stream.binder.EmbeddedHeaderUtils; -import org.springframework.cloud.stream.binder.HeaderMode; -import org.springframework.cloud.stream.binder.MessageSerializationUtils; -import org.springframework.cloud.stream.binder.MessageValues; -import org.springframework.cloud.stream.binder.StringConvertingContentTypeResolver; import org.springframework.cloud.stream.binding.AbstractBindingTargetFactory; import org.springframework.cloud.stream.config.BindingProperties; import org.springframework.cloud.stream.config.BindingServiceProperties; import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory; -import org.springframework.integration.codec.Codec; import org.springframework.integration.support.MutableMessageHeaders; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; @@ -55,47 +46,31 @@ public class KStreamBoundElementFactory extends AbstractBindingTargetFactory> payloadTypeCache = new ConcurrentHashMap<>(); - private CompositeMessageConverterFactory compositeMessageConverterFactory; public KStreamBoundElementFactory(KStreamBuilder streamBuilder, BindingServiceProperties bindingServiceProperties, - Codec codec, CompositeMessageConverterFactory compositeMessageConverterFactory) { + CompositeMessageConverterFactory compositeMessageConverterFactory) { super(KStream.class); this.bindingServiceProperties = bindingServiceProperties; this.kStreamBuilder = streamBuilder; - this.codec = codec; this.compositeMessageConverterFactory = compositeMessageConverterFactory; } @Override public KStream createInput(String name) { KStream stream = kStreamBuilder.stream(bindingServiceProperties.getBindingDestination(name)); - ConsumerProperties properties = bindingServiceProperties.getConsumerProperties(name); - if (HeaderMode.embeddedHeaders.equals(properties.getHeaderMode())) { + stream = stream.map((key, value) -> { - stream = stream.map(new KeyValueMapper>() { - @Override - public KeyValue apply(Object key, Object value) { - if (!(value instanceof byte[])) { - return new KeyValue<>(key, value); - } - try { - MessageValues messageValues = EmbeddedHeaderUtils - .extractHeaders(MessageBuilder.withPayload((byte[]) value).build(), true); - messageValues = deserializePayloadIfNecessary(messageValues); - return new KeyValue(null, messageValues.toMessage()); - } - catch (Exception e) { - throw new IllegalArgumentException(e); - } - } - }); - } + BindingProperties bindingProperties = bindingServiceProperties.getBindingProperties(name); + String contentType = bindingProperties.getContentType(); + if (!StringUtils.isEmpty(contentType)) { + + Message message = MessageBuilder.withPayload(value) + .setHeader(MessageHeaders.CONTENT_TYPE, contentType).build(); + return new KeyValue<>(key, message); + } + return new KeyValue<>(key, value); + }); return stream; } @@ -106,16 +81,12 @@ public class KStreamBoundElementFactory extends AbstractBindingTargetFactory delegate); @@ -126,23 +97,32 @@ public class KStreamBoundElementFactory extends AbstractBindingTargetFactory delegate; private final MessageConverter messageConverter; + private final BindingServiceProperties bindingServiceProperties; + private String name; - public KStreamWrapperHandler(MessageConverter messageConverter) { + public KStreamWrapperHandler(MessageConverter messageConverter, + BindingServiceProperties bindingServiceProperties, + String name) { this.messageConverter = messageConverter; + this.bindingServiceProperties = bindingServiceProperties; + this.name = name; } public void wrap(KStream delegate) { Assert.notNull(delegate, "delegate cannot be null"); Assert.isNull(this.delegate, "delegate already set to " + this.delegate); if (messageConverter != null) { - KeyValueMapper> keyValueMapper = new KeyValueMapper>() { - @Override - public KeyValue apply(Object k, Object v) { - Message message = (Message) v; - return new KeyValue(k, - messageConverter.toMessage(message.getPayload(), - new MutableMessageHeaders(((Message) v).getHeaders()))); + KeyValueMapper> keyValueMapper = (k, v) -> { + Message message = (Message) v; + BindingProperties bindingProperties = bindingServiceProperties.getBindingProperties(name); + String contentType = bindingProperties.getContentType(); + MutableMessageHeaders messageHeaders = new MutableMessageHeaders(((Message) v).getHeaders()); + if (!StringUtils.isEmpty(contentType)) { + messageHeaders.put(MessageHeaders.CONTENT_TYPE, contentType); } + return new KeyValue<>(k, + messageConverter.toMessage(message.getPayload(), + messageHeaders)); }; delegate = delegate.map(keyValueMapper); } diff --git a/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/KStreamListenerParameterAdapter.java b/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/KStreamListenerParameterAdapter.java index 01d00b5a..f2a2c0e8 100644 --- a/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/KStreamListenerParameterAdapter.java +++ b/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/KStreamListenerParameterAdapter.java @@ -52,22 +52,22 @@ public class KStreamListenerParameterAdapter implements StreamListenerParameterA final Class valueClass = (resolvableType.getGeneric(1).getRawClass() != null) ? (resolvableType.getGeneric(1).getRawClass()) : Object.class; - return bindingTarget.map(new KeyValueMapper() { - @Override - public Object apply(Object o, Object o2) { - if (valueClass.isAssignableFrom(o2.getClass())) { - return new KeyValue<>(o, o2); - } - else if (o2 instanceof Message) { - return new KeyValue<>(o, messageConverter.fromMessage((Message) o2, valueClass)); - } - else if(o2 instanceof String || o2 instanceof byte[]) { - Message message = MessageBuilder.withPayload(o2).build(); - return new KeyValue<>(o, messageConverter.fromMessage(message, valueClass)); - } - else { - return new KeyValue<>(o, o2); + return bindingTarget.map((KeyValueMapper) (o, o2) -> { + if (valueClass.isAssignableFrom(o2.getClass())) { + return new KeyValue<>(o, o2); + } + else if (o2 instanceof Message) { + if (valueClass.isAssignableFrom(((Message) o2).getPayload().getClass())) { + return new KeyValue<>(o, ((Message) o2).getPayload()); } + return new KeyValue<>(o, messageConverter.fromMessage((Message) o2, valueClass)); + } + else if(o2 instanceof String || o2 instanceof byte[]) { + Message message = MessageBuilder.withPayload(o2).build(); + return new KeyValue<>(o, messageConverter.fromMessage(message, valueClass)); + } + else { + return new KeyValue<>(o, o2); } }); } diff --git a/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/KStreamStreamListenerResultAdapter.java b/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/KStreamStreamListenerResultAdapter.java index 5e4af0a8..e1585ff3 100644 --- a/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/KStreamStreamListenerResultAdapter.java +++ b/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/KStreamStreamListenerResultAdapter.java @@ -21,7 +21,6 @@ import java.io.IOException; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KeyValueMapper; import org.springframework.cloud.stream.binding.StreamListenerResultAdapter; import org.springframework.messaging.Message; @@ -40,15 +39,12 @@ public class KStreamStreamListenerResultAdapter implements StreamListenerResultA @Override @SuppressWarnings("unchecked") public Closeable adapt(KStream streamListenerResult, KStreamBoundElementFactory.KStreamWrapper boundElement) { - boundElement.wrap(streamListenerResult.map(new KeyValueMapper() { - @Override - public Object apply(Object k, Object v) { - if (v instanceof Message) { - return new KeyValue<>(k, v); - } - else { - return new KeyValue<>(k, MessageBuilder.withPayload(v).build()); - } + boundElement.wrap(streamListenerResult.map((k, v) -> { + if (v instanceof Message) { + return new KeyValue<>(k, v); + } + else { + return new KeyValue<>(k, MessageBuilder.withPayload(v).build()); } })); return new NoOpCloseable(); diff --git a/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/config/KStreamBinderConfiguration.java b/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/config/KStreamBinderConfiguration.java index c4f62848..b12881df 100644 --- a/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/config/KStreamBinderConfiguration.java +++ b/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/config/KStreamBinderConfiguration.java @@ -18,27 +18,22 @@ package org.springframework.cloud.stream.binder.kstream.config; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.streams.StreamsConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.stream.binder.kafka.admin.AdminUtilsOperation; -import org.springframework.cloud.stream.binder.kafka.admin.Kafka09AdminUtilsOperation; -import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation; +import org.springframework.cloud.stream.binder.kafka.admin.KafkaAdminUtilsOperation; import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner; import org.springframework.cloud.stream.binder.kstream.KStreamBinder; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Condition; -import org.springframework.context.annotation.ConditionContext; -import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; -import org.springframework.core.type.AnnotatedTypeMetadata; /** * @author Marius Bogoevici + * @author Gary Russell */ @Configuration @EnableConfigurationProperties(KStreamExtendedBindingProperties.class) @@ -63,34 +58,10 @@ public class KStreamBinderConfiguration { } @Bean(name = "adminUtilsOperation") - @Conditional(Kafka09Present.class) - @ConditionalOnClass(name = "kafka.admin.AdminUtils") - public AdminUtilsOperation kafka09AdminUtilsOperation() { - logger.info("AdminUtils selected: Kafka 0.9 AdminUtils"); - return new Kafka09AdminUtilsOperation(); - } - - @Bean(name = "adminUtilsOperation") - @Conditional(Kafka10Present.class) @ConditionalOnClass(name = "kafka.admin.AdminUtils") public AdminUtilsOperation kafka10AdminUtilsOperation() { logger.info("AdminUtils selected: Kafka 0.10 AdminUtils"); - return new Kafka10AdminUtilsOperation(); + return new KafkaAdminUtilsOperation(); } - static class Kafka10Present implements Condition { - - @Override - public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) { - return AppInfoParser.getVersion().startsWith("0.10"); - } - } - - static class Kafka09Present implements Condition { - - @Override - public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata annotatedTypeMetadata) { - return AppInfoParser.getVersion().startsWith("0.9"); - } - } } diff --git a/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/config/KStreamBinderSupportAutoConfiguration.java b/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/config/KStreamBinderSupportAutoConfiguration.java index ee92fb20..c3d1080a 100644 --- a/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/config/KStreamBinderSupportAutoConfiguration.java +++ b/spring-cloud-stream-binder-kstream/src/main/java/org/springframework/cloud/stream/binder/kstream/config/KStreamBinderSupportAutoConfiguration.java @@ -33,7 +33,6 @@ import org.springframework.cloud.stream.binder.kstream.KStreamStreamListenerResu import org.springframework.cloud.stream.config.BindingServiceProperties; import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory; import org.springframework.context.annotation.Bean; -import org.springframework.integration.codec.Codec; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.core.KStreamBuilderFactoryBean; import org.springframework.util.ObjectUtils; @@ -70,10 +69,9 @@ public class KStreamBinderSupportAutoConfiguration { public StreamsConfig streamsConfig(KafkaBinderConfigurationProperties binderConfigurationProperties) { Properties props = new Properties(); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, binderConfigurationProperties.getKafkaConnectionString()); - props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()); - props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "default"); - props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, binderConfigurationProperties.getZkConnectionString()); if (!ObjectUtils.isEmpty(binderConfigurationProperties.getConfiguration())) { props.putAll(binderConfigurationProperties.getConfiguration()); } @@ -94,9 +92,9 @@ public class KStreamBinderSupportAutoConfiguration { @Bean public KStreamBoundElementFactory kStreamBindableTargetFactory(KStreamBuilder kStreamBuilder, - BindingServiceProperties bindingServiceProperties, Codec codec, + BindingServiceProperties bindingServiceProperties, CompositeMessageConverterFactory compositeMessageConverterFactory) { - return new KStreamBoundElementFactory(kStreamBuilder, bindingServiceProperties, codec, + return new KStreamBoundElementFactory(kStreamBuilder, bindingServiceProperties, compositeMessageConverterFactory); } diff --git a/spring-cloud-stream-binder-kstream/src/test/java/org/springframework/cloud/stream/binder/kstream/KStreamBinderPojoInputAndPrimitiveTypeOutputTests.java b/spring-cloud-stream-binder-kstream/src/test/java/org/springframework/cloud/stream/binder/kstream/KStreamBinderPojoInputAndPrimitiveTypeOutputTests.java index 7b857b83..f3e785ea 100644 --- a/spring-cloud-stream-binder-kstream/src/test/java/org/springframework/cloud/stream/binder/kstream/KStreamBinderPojoInputAndPrimitiveTypeOutputTests.java +++ b/spring-cloud-stream-binder-kstream/src/test/java/org/springframework/cloud/stream/binder/kstream/KStreamBinderPojoInputAndPrimitiveTypeOutputTests.java @@ -18,16 +18,13 @@ package org.springframework.cloud.stream.binder.kstream; import java.util.Map; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.Windowed; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -52,20 +49,21 @@ import static org.assertj.core.api.Assertions.assertThat; /** * * @author Soby Chacko + * @author Gary Russell */ public class KStreamBinderPojoInputAndPrimitiveTypeOutputTests { @ClassRule public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "counts-id"); - private static Consumer consumer; + private static Consumer consumer; @BeforeClass public static void setUp() throws Exception { Map consumerProps = KafkaTestUtils.consumerProps("group-id", "false", embeddedKafka); - consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); + //consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class.getName()); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); + DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); consumer = cf.createConsumer(); embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "counts-id"); } @@ -88,12 +86,16 @@ public class KStreamBinderPojoInputAndPrimitiveTypeOutputTests { "--spring.cloud.stream.bindings.output.producer.headerMode=raw", "--spring.cloud.stream.bindings.output.producer.useNativeEncoding=true", "--spring.cloud.stream.kstream.bindings.output.producer.keySerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde", - "--spring.cloud.stream.kstream.bindings.output.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$LongSerde", + "--spring.cloud.stream.kstream.bindings.output.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde", "--spring.cloud.stream.bindings.input.consumer.headerMode=raw", "--spring.cloud.stream.kstream.binder.brokers=" + embeddedKafka.getBrokersAsString(), "--spring.cloud.stream.kstream.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString()); - receiveAndValidateFoo(context); - context.close(); + try { + receiveAndValidateFoo(context); + } + finally { + context.close(); + } } private void receiveAndValidateFoo(ConfigurableApplicationContext context) throws Exception{ @@ -102,10 +104,12 @@ public class KStreamBinderPojoInputAndPrimitiveTypeOutputTests { KafkaTemplate template = new KafkaTemplate<>(pf, true); template.setDefaultTopic("foos"); template.sendDefault("{\"id\":\"123\"}"); - ConsumerRecord cr = KafkaTestUtils.getSingleRecord(consumer, "counts-id"); + ConsumerRecord cr = KafkaTestUtils.getSingleRecord(consumer, "counts-id"); assertThat(cr.key().equals(123)); - assertThat(cr.value().equals(1L)); + ObjectMapper om = new ObjectMapper(); + Long aLong = om.readValue(cr.value(), Long.class); + assertThat(aLong.equals(1L)); } @EnableBinding(KStreamProcessor.class) @@ -116,30 +120,12 @@ public class KStreamBinderPojoInputAndPrimitiveTypeOutputTests { @SendTo("output") public KStream process(KStream input) { return input - .filter(new Predicate() { - - @Override - public boolean test(Object key, Product product) { - return product.getId() == 123; - } - }) - .map(new KeyValueMapper>() { - - @Override - public KeyValue apply(Object key, Product value) { - return new KeyValue<>(value, value); - } - }) + .filter((key, product) -> product.getId() == 123) + .map((key, value) -> new KeyValue<>(value, value)) .groupByKey(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class)) .count(TimeWindows.of(5000), "id-count-store") .toStream() - .map(new KeyValueMapper, Long, KeyValue>() { - - @Override - public KeyValue apply(Windowed key, Long value) { - return new KeyValue<>(key.key().id, value); - } - }); + .map((key, value) -> new KeyValue<>(key.key().id, value)); } } diff --git a/spring-cloud-stream-binder-kstream/src/test/java/org/springframework/cloud/stream/binder/kstream/KStreamBinderWordCountIntegrationTests.java b/spring-cloud-stream-binder-kstream/src/test/java/org/springframework/cloud/stream/binder/kstream/KStreamBinderWordCountIntegrationTests.java index 10bfc32c..089586a7 100644 --- a/spring-cloud-stream-binder-kstream/src/test/java/org/springframework/cloud/stream/binder/kstream/KStreamBinderWordCountIntegrationTests.java +++ b/spring-cloud-stream-binder-kstream/src/test/java/org/springframework/cloud/stream/binder/kstream/KStreamBinderWordCountIntegrationTests.java @@ -18,7 +18,6 @@ package org.springframework.cloud.stream.binder.kstream; import java.util.Arrays; import java.util.Date; -import java.util.List; import java.util.Map; import org.apache.kafka.clients.consumer.Consumer; @@ -27,10 +26,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.ValueMapper; -import org.apache.kafka.streams.kstream.Windowed; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -39,9 +35,11 @@ import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.binder.kstream.annotations.KStreamProcessor; +import org.springframework.cloud.stream.binder.kstream.config.KStreamApplicationSupportProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory; @@ -56,6 +54,7 @@ import static org.assertj.core.api.Assertions.assertThat; * * @author Marius Bogoevici * @author Soby Chacko + * @author Gary Russell */ public class KStreamBinderWordCountIntegrationTests { @@ -92,13 +91,18 @@ public class KStreamBinderWordCountIntegrationTests { "--spring.cloud.stream.kstream.binder.configuration.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde", "--spring.cloud.stream.bindings.output.producer.headerMode=raw", "--spring.cloud.stream.bindings.output.producer.useNativeEncoding=true", + "--spring.cloud.stream.kstream.bindings.output.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde", "--spring.cloud.stream.bindings.input.consumer.headerMode=raw", "--spring.cloud.stream.kstream.timeWindow.length=5000", "--spring.cloud.stream.kstream.timeWindow.advanceBy=0", "--spring.cloud.stream.kstream.binder.brokers=" + embeddedKafka.getBrokersAsString(), "--spring.cloud.stream.kstream.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString()); - receiveAndValidate(context); - context.close(); + try { + receiveAndValidate(context); + } + finally { + context.close(); + } } private void receiveAndValidate(ConfigurableApplicationContext context) throws Exception{ @@ -113,6 +117,7 @@ public class KStreamBinderWordCountIntegrationTests { @EnableBinding(KStreamProcessor.class) @EnableAutoConfiguration + @EnableConfigurationProperties(KStreamApplicationSupportProperties.class) public static class WordCountProcessorApplication { @Autowired @@ -123,30 +128,12 @@ public class KStreamBinderWordCountIntegrationTests { public KStream process(KStream input) { return input - .flatMapValues(new ValueMapper>() { - - @Override - public List apply(String value) { - return Arrays.asList(value.toLowerCase().split("\\W+")); - } - }) - .map(new KeyValueMapper>() { - - @Override - public KeyValue apply(Object key, String value) { - return new KeyValue<>(value, value); - } - }) + .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) + .map((key, value) -> new KeyValue<>(value, value)) .groupByKey(Serdes.String(), Serdes.String()) - .count(timeWindows, "WordCounts") + .count(timeWindows, "foo-WordCounts") .toStream() - .map(new KeyValueMapper, Long, KeyValue>() { - - @Override - public KeyValue apply(Windowed key, Long value) { - return new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))); - } - }); + .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end())))); } } diff --git a/spring-cloud-stream-binder-kstream/src/test/java/org/springframework/cloud/stream/binder/kstream/KStreamInteractiveQueryIntegrationTests.java b/spring-cloud-stream-binder-kstream/src/test/java/org/springframework/cloud/stream/binder/kstream/KStreamInteractiveQueryIntegrationTests.java index 8897e29a..68d249f8 100644 --- a/spring-cloud-stream-binder-kstream/src/test/java/org/springframework/cloud/stream/binder/kstream/KStreamInteractiveQueryIntegrationTests.java +++ b/spring-cloud-stream-binder-kstream/src/test/java/org/springframework/cloud/stream/binder/kstream/KStreamInteractiveQueryIntegrationTests.java @@ -24,8 +24,6 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.junit.AfterClass; @@ -54,6 +52,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** * @author Soby Chacko + * @author Gary Russell */ public class KStreamInteractiveQueryIntegrationTests { @@ -86,13 +85,18 @@ public class KStreamInteractiveQueryIntegrationTests { "--spring.cloud.stream.kstream.binder.configuration.commit.interval.ms=1000", "--spring.cloud.stream.kstream.binder.configuration.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde", "--spring.cloud.stream.kstream.binder.configuration.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde", + "--spring.cloud.stream.kstream.bindings.output.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde", "--spring.cloud.stream.bindings.output.producer.headerMode=raw", "--spring.cloud.stream.bindings.output.producer.useNativeEncoding=true", "--spring.cloud.stream.bindings.input.consumer.headerMode=raw", "--spring.cloud.stream.kstream.binder.brokers=" + embeddedKafka.getBrokersAsString(), "--spring.cloud.stream.kstream.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString()); - receiveAndValidateFoo(context); - context.close(); + try { + receiveAndValidateFoo(context); + } + finally { + context.close(); + } } private void receiveAndValidateFoo(ConfigurableApplicationContext context) throws Exception{ @@ -120,30 +124,12 @@ public class KStreamInteractiveQueryIntegrationTests { public KStream process(KStream input) { return input - .filter(new Predicate() { - - @Override - public boolean test(Object key, Product product) { - return product.getId() == 123; - } - }) - .map(new KeyValueMapper>() { - - @Override - public KeyValue apply(Object key, Product value) { - return new KeyValue<>(value.id, value); - } - }) + .filter((key, product) -> product.getId() == 123) + .map((key, value) -> new KeyValue<>(value.id, value)) .groupByKey(new Serdes.IntegerSerde(), new JsonSerde<>(Product.class)) .count("prod-id-count-store") .toStream() - .map(new KeyValueMapper>() { - - @Override - public KeyValue apply(Integer key, Long value) { - return new KeyValue<>(null, "Count for product with ID 123: " + value); - } - }); + .map((key, value) -> new KeyValue<>(null, "Count for product with ID 123: " + value)); } @Bean @@ -151,7 +137,6 @@ public class KStreamInteractiveQueryIntegrationTests { return new Foo(kStreamBuilderFactoryBean); } - static class Foo { KStreamBuilderFactoryBean kStreamBuilderFactoryBean; diff --git a/spring-cloud-stream-binder-kstream/src/test/java/org/springframework/cloud/stream/binder/kstream/KstreamBinderPojoInputStringOutputIntegrationTests.java b/spring-cloud-stream-binder-kstream/src/test/java/org/springframework/cloud/stream/binder/kstream/KstreamBinderPojoInputStringOutputIntegrationTests.java index b1838ed4..f52fd2ec 100644 --- a/spring-cloud-stream-binder-kstream/src/test/java/org/springframework/cloud/stream/binder/kstream/KstreamBinderPojoInputStringOutputIntegrationTests.java +++ b/spring-cloud-stream-binder-kstream/src/test/java/org/springframework/cloud/stream/binder/kstream/KstreamBinderPojoInputStringOutputIntegrationTests.java @@ -23,10 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.Windowed; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -51,6 +48,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** * @author Marius Bogoevici * @author Soby Chacko + * @author Gary Russell */ public class KstreamBinderPojoInputStringOutputIntegrationTests { @@ -86,11 +84,16 @@ public class KstreamBinderPojoInputStringOutputIntegrationTests { "--spring.cloud.stream.bindings.output.producer.headerMode=raw", "--spring.cloud.stream.bindings.output.producer.useNativeEncoding=true", "--spring.cloud.stream.kstream.bindings.output.producer.keySerde=org.apache.kafka.common.serialization.Serdes$IntegerSerde", + "--spring.cloud.stream.kstream.bindings.output.producer.valueSerde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde", "--spring.cloud.stream.bindings.input.consumer.headerMode=raw", "--spring.cloud.stream.kstream.binder.brokers=" + embeddedKafka.getBrokersAsString(), "--spring.cloud.stream.kstream.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString()); - receiveAndValidateFoo(context); - context.close(); + try { + receiveAndValidateFoo(context); + } + finally { + context.close(); + } } private void receiveAndValidateFoo(ConfigurableApplicationContext context) throws Exception { @@ -112,30 +115,12 @@ public class KstreamBinderPojoInputStringOutputIntegrationTests { public KStream process(KStream input) { return input - .filter(new Predicate() { - - @Override - public boolean test(Object key, Product product) { - return product.getId() == 123; - } - }) - .map(new KeyValueMapper>() { - - @Override - public KeyValue apply(Object key, Product value) { - return new KeyValue<>(value, value); - } - }) + .filter((key, product) -> product.getId() == 123) + .map((key, value) -> new KeyValue<>(value, value)) .groupByKey(new JsonSerde<>(Product.class), new JsonSerde<>(Product.class)) .count(TimeWindows.of(5000), "id-count-store") .toStream() - .map(new KeyValueMapper, Long, KeyValue>() { - - @Override - public KeyValue apply(Windowed key, Long value) { - return new KeyValue<>(key.key().id, "Count for product with ID 123: " + value); - } - }); + .map((key, value) -> new KeyValue<>(key.key().id, "Count for product with ID 123: " + value)); } } diff --git a/update-version.sh b/update-version.sh index 48459701..bc0672b8 100755 --- a/update-version.sh +++ b/update-version.sh @@ -7,7 +7,7 @@ -lines=$(find . -name 'pom.xml' | xargs egrep "SNAPSHOT" | grep -v regex | wc -l) +lines=$(find . -name 'pom.xml' | xargs egrep "SNAPSHOT|M[0-9]|RC[0-9]" | grep -v regex | wc -l) if [ $lines -eq 0 ]; then echo "No snapshots found" else