Compare commits

...

26 Commits

Author SHA1 Message Date
Soby Chacko
5fb0b4329b Fix tests
Fixing a test where the underlying change was not cherry-picked,
but the test was. Removing the test changes.
2018-11-19 16:40:12 -05:00
Oleg Zhurakousky
87e1b35d55 Fixed tests related to GH-1527 change in core 2018-11-19 15:56:21 -05:00
Soby Chacko
999740597a Remove unused imports 2018-11-19 15:37:46 -05:00
Soby Chacko
475273f5db 2.0.2.RELEASE 2018-11-19 15:34:33 -05:00
Soby Chacko
8591cc59a5 Backport Kafka HeaderMapper (2.0.x only)
- Backport DefaultKafkaHeaderMapper from Spring Kafka on 2.0.x branch
   in order to address the MediaType content type related issues on the apps.
 - Rename DefaultKafkaHeaderMapper to BinderHeaderMapper and deprecate it
   as we already use the proper version from Spring Kafka on the master branch.
 - Fix test.
2018-11-16 14:38:52 -05:00
Soby Chacko
bea31ce135 Update Kafka binder metrics docs
Fix the wrong metric name used in the Kafka binder metrics for consumer offset lag.
Update the description.

Resolves #422
2018-08-02 09:59:12 -04:00
Soby Chacko
ec503d4025 JAAS initializer regression
Fix JAAS initializer with setting the missing properties.

Resoves #419

Polishing
2018-07-28 08:37:14 -04:00
Soby Chacko
8d94cd2b43 Autoconfigure optimization
Add spring-boot-autoconfigure-processor to the kafka-streams binder for
auto configuration optimization.

Resolves #406
2018-07-24 16:09:23 -04:00
Soby Chacko
7adbc06b5c Next update version: 2.0.2.BUILD-SNAPSHOT 2018-07-11 18:53:53 -04:00
Soby Chacko
d67c98334f 2.0.1.RELEASE 2018-07-11 17:36:18 -04:00
Soby Chacko
3a4f047e9c Update dependencies
spring-cloud-build to 2.0.2.RELEASE
spring-kafka to 2.1.7.RELEASE
2018-07-11 17:34:19 -04:00
Gary Russell
725d2a0de2 GH-404: Synchronize shared consumer
Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/404

The fix for issue https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/231
added a shared consumer but the consumer is not thread safe. Add synchronization.

Also, a timeout was added to the `KafkaBinderHealthIndicator` but not to the
`KafkaBinderMetrics` which has a similar shared consumer; add a timeout there.
2018-07-11 16:15:18 -04:00
Soby Chacko
c7dc56e7d2 Provide programmatic access to KafkaStreams object
Providing access to the underlying StreamBuilderFactoryBean by making the bean name
deterministic. Eariler, the binder was using UUID to make the stream builder factory
bean names unique in the event of multiple StreamListeners. Switching to use the
method name instead to keep the StreamBuilder factory beans unique while providing
a deterministic way to giving it programmatic access.

Polishing docs

Fixes #396
2018-07-11 16:14:31 -04:00
Soby Chacko
5c594816bd Fix bad link in docs
Resolves #359
2018-07-11 16:13:39 -04:00
Soby Chacko
c941e2d735 Fix typo in kafka streams docs
Resolves #400
2018-07-11 16:13:25 -04:00
UltimaPhoenix
8a1c2c504d Fix unit test
Remove unnecessary semicolon

Replace deprecated method with the new one

Test refactoring
2018-07-11 16:13:06 -04:00
Artem Bilan
dd48bf1540 GH-381: Remove duplicated SCSt-binder-test dep
Fixes spring-cloud/spring-cloud-stream-binder-kafka#381
2018-07-11 16:12:15 -04:00
Thomas Cheyney
3450b4b360 Reuse Kafka consumer Metrics
Polishing
2018-07-11 16:11:47 -04:00
Soby Chacko
78a8baf81f Kafka Streams initializr image for docs 2018-07-11 16:11:09 -04:00
Soby Chacko
1ea69a10a4 Revert "GH-360: Improve Binder Producer/Consumer Config"
This reverts commit 64431426aa.
2018-07-11 16:09:55 -04:00
Soby Chacko
8f61919069 Revert "Allow Kafka Streams state store creation"
This reverts commit 369c46ce77.
2018-07-11 16:09:35 -04:00
Lei Chen
369c46ce77 Allow Kafka Streams state store creation
* Allow Kafka Streams state store creation when using process/transform method in DSL
 * Add unit test for state store
 * Address code review comments
 * Add author and javadocs
 * Integration test fixing for state store
 * Polishing
2018-07-05 10:17:35 -04:00
Gary Russell
64431426aa GH-360: Improve Binder Producer/Consumer Config
Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/360

`producer-configuration` and `consumer-configuration` improperly appear in content-assist.

These are methods used by the binders to get merged configuration data (boot and binder).

Rename the methods and add `producerProperties` and `consumerProperties` to allow
configuration.
2018-04-27 12:46:49 -04:00
slamhan
f77dc50de9 QueryableStore retrieval stops at InvalidStateStoreException
If there are multiple streams, there is a code path that throws
a premature InvalidStateStoreException. Fixing that issue.

Fixes #366

Polishing.
2018-04-24 11:06:30 -04:00
Danish Garg
d141ad3647 Changed occurances of map calls on kafka streams to mapValues
Resolves #357
2018-04-11 15:37:09 -04:00
Oleg Zhurakousky
75dd5f202a Created new maintenance branch 2.0.1 2018-04-06 15:14:37 -04:00
26 changed files with 617 additions and 121 deletions

10
pom.xml
View File

@@ -2,20 +2,20 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RELEASE</version>
<version>2.0.2.RELEASE</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>2.0.0.RELEASE</version>
<version>2.0.4.RELEASE</version>
<relativePath />
</parent>
<properties>
<java.version>1.8</java.version>
<spring-kafka.version>2.1.5.RELEASE</spring-kafka.version>
<spring-kafka.version>2.1.10.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>3.0.3.RELEASE</spring-integration-kafka.version>
<kafka.version>1.0.1</kafka.version>
<spring-cloud-stream.version>2.0.0.RELEASE</spring-cloud-stream.version>
<kafka.version>1.0.2</kafka.version>
<spring-cloud-stream.version>2.0.2.RELEASE</spring-cloud-stream.version>
</properties>
<modules>
<module>spring-cloud-stream-binder-kafka</module>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RELEASE</version>
<version>2.0.2.RELEASE</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RELEASE</version>
<version>2.0.2.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RELEASE</version>
<version>2.0.2.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>

Binary file not shown.

After

Width:  |  Height:  |  Size: 119 KiB

View File

@@ -81,7 +81,7 @@ For common configuration options and properties pertaining to binder, refer to t
=== Kafka Streams Properties
The following properties are available at the binder level and must be prefixed with `spring.cloud.stream.kafka.binder.`
The following properties are available at the binder level and must be prefixed with `spring.cloud.stream.kafka.streams.binder.`
literal.
configuration::
@@ -595,4 +595,18 @@ Once you gain access to this bean, then you can query for the particular state-s
----
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
queryableStoreRegistry.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
----
== Accessing the underlying KafkaStreams object
`StreamBuilderFactoryBean` from spring-kafka that is responsible for constructing the `KafkaStreams` object can be accessed programmatically.
Each `StreamBuilderFactoryBean` is registered as `stream-builder` and appended with the `StreamListener` method name.
If your `StreamListener` method is named as `process` for example, the stream builder bean is named as `stream-builder-process`.
Since this is a factory bean, it should be accessed by prepending an ampersand (`&`) when accessing it programmatically.
Following is an example and it assumes the `StreamListener` method is named as `process`
[source]
----
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
----

View File

@@ -454,7 +454,7 @@ public class Application {
== Error Channels
Starting with version 1.3, the binder unconditionally sends exceptions to an error channel for each consumer destination and can also be configured to send async producer send failures to an error channel.
See <<binder-error-channels>> for more information.
See <<spring-cloud-stream-overview-error-handling>> for more information.
The payload of the `ErrorMessage` for a send failure is a `KafkaSendFailureException` with properties:
@@ -469,6 +469,6 @@ You can consume these exceptions with your own Spring Integration flow.
Kafka binder module exposes the following metrics:
`spring.cloud.stream.binder.kafka.someGroup.someTopic.lag`: This metric indicates how many messages have not been yet consumed from a given binder's topic by a given consumer group.
For example, if the value of the metric `spring.cloud.stream.binder.kafka.myGroup.myTopic.lag` is `1000`, the consumer group named `myGroup` has `1000` messages waiting to be consumed from the topic calle `myTopic`.
`spring.cloud.stream.binder.kafka.offset`: This metric indicates how many messages have not been yet consumed from a given binder's topic by a given consumer group.
The metrics provided are based on the Mircometer metrics library. The metric contains the consumer group information, topic and the actual lag in committed offset from the latest offset on the topic.
This metric is particularly useful for providing auto-scaling feedback to a PaaS platform.

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RELEASE</version>
<version>2.0.2.RELEASE</version>
</parent>
<dependencies>
@@ -62,5 +62,10 @@
<artifactId>spring-cloud-stream-binder-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
</project>
</project>

View File

@@ -23,10 +23,12 @@ import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProv
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Soby Chacko
*/
@Configuration
public class KTableBinderConfiguration {
@Autowired

View File

@@ -73,7 +73,7 @@ class KafkaStreamsMessageConversionDelegate {
String contentType = this.kstreamBindingInformationCatalogue.getContentType(outboundBindTarget);
MessageConverter messageConverter = compositeMessageConverterFactory.getMessageConverterForAllRegistered();
return outboundBindTarget.map((k, v) -> {
return outboundBindTarget.mapValues((v) -> {
Message<?> message = v instanceof Message<?> ? (Message<?>) v :
MessageBuilder.withPayload(v).build();
Map<String, Object> headers = new HashMap<>(message.getHeaders());
@@ -81,9 +81,9 @@ class KafkaStreamsMessageConversionDelegate {
headers.put(MessageHeaders.CONTENT_TYPE, contentType);
}
MessageHeaders messageHeaders = new MessageHeaders(headers);
return new KeyValue<>(k,
return
messageConverter.toMessage(message.getPayload(),
messageHeaders).getPayload());
messageHeaders).getPayload();
});
}
@@ -137,10 +137,10 @@ class KafkaStreamsMessageConversionDelegate {
processErrorFromDeserialization(bindingTarget, branch[1]);
//first branch above is the branch where the messages are converted, let it go through further processing.
return branch[0].map((o, o2) -> {
KeyValue<Object, Object> objectObjectKeyValue = keyValueThreadLocal.get();
return branch[0].mapValues((o2) -> {
Object objectValue = keyValueThreadLocal.get().value;
keyValueThreadLocal.remove();
return objectObjectKeyValue;
return objectValue;
});
}

View File

@@ -20,14 +20,12 @@ import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
@@ -300,18 +298,18 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
else {
LOG.info("Native decoding is disabled for " + inboundName + ". Inbound message conversion done by Spring Cloud Stream.");
}
stream = stream.map((key, value) -> {
KeyValue<Object, Object> keyValue;
stream = stream.mapValues(value -> {
Object returnValue;
String contentType = bindingProperties.getContentType();
if (!StringUtils.isEmpty(contentType) && !nativeDecoding) {
Message<?> message = MessageBuilder.withPayload(value)
.setHeader(MessageHeaders.CONTENT_TYPE, contentType).build();
keyValue = new KeyValue<>(key, message);
returnValue = message;
} else {
returnValue = value;
}
else {
keyValue = new KeyValue<>(key, value);
}
return keyValue;
return returnValue;
});
return stream;
}
@@ -332,12 +330,11 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
ConfigurableListableBeanFactory beanFactory = this.applicationContext.getBeanFactory();
StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean();
streamsBuilder.setAutoStartup(false);
String uuid = UUID.randomUUID().toString();
BeanDefinition streamsBuilderBeanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<StreamsBuilderFactoryBean>) streamsBuilder.getClass(), () -> streamsBuilder)
.getRawBeanDefinition();
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition("stream-builder-" + uuid, streamsBuilderBeanDefinition);
StreamsBuilderFactoryBean streamsBuilderX = applicationContext.getBean("&stream-builder-" + uuid, StreamsBuilderFactoryBean.class);
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition("stream-builder-" + method.getName(), streamsBuilderBeanDefinition);
StreamsBuilderFactoryBean streamsBuilderX = applicationContext.getBean("&stream-builder-" + method.getName(), StreamsBuilderFactoryBean.class);
String group = bindingProperties.getGroup();
if (!StringUtils.hasText(group)) {
group = binderConfigurationProperties.getApplicationId();
@@ -367,7 +364,7 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
BeanDefinition streamsConfigBeanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<StreamsConfig>) streamsConfig.getClass(), () -> streamsConfig)
.getRawBeanDefinition();
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition("streamsConfig-" + uuid, streamsConfigBeanDefinition);
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition("streamsConfig-" + method.getName(), streamsConfigBeanDefinition);
streamsBuilder.setStreamsConfig(streamsConfig);
methodStreamsBuilderFactoryBeanMap.put(method, streamsBuilderX);

View File

@@ -20,6 +20,7 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreType;
/**
@@ -27,6 +28,7 @@ import org.apache.kafka.streams.state.QueryableStoreType;
* the user applications.
*
* @author Soby Chacko
* @author Renwei Han
* @since 2.0.0
*/
public class QueryableStoreRegistry {
@@ -44,9 +46,14 @@ public class QueryableStoreRegistry {
public <T> T getQueryableStoreType(String storeName, QueryableStoreType<T> storeType) {
for (KafkaStreams kafkaStream : kafkaStreams) {
T store = kafkaStream.store(storeName, storeType);
if (store != null) {
return store;
try{
T store = kafkaStream.store(storeName, storeType);
if (store != null) {
return store;
}
}
catch (InvalidStateStoreException ignored) {
//pass through
}
}
return null;

View File

@@ -106,10 +106,10 @@ public class KafkaStreamsBinderPojoInputAndPrimitiveTypeOutputTests {
template.sendDefault("{\"id\":\"123\"}");
ConsumerRecord<Integer, String> cr = KafkaTestUtils.getSingleRecord(consumer, "counts-id");
assertThat(cr.key().equals(123));
assertThat(cr.key()).isEqualTo(123);
ObjectMapper om = new ObjectMapper();
Long aLong = om.readValue(cr.value(), Long.class);
assertThat(aLong.equals(1L));
assertThat(aLong).isEqualTo(1L);
}
@EnableBinding(KafkaStreamsProcessor.class)

View File

@@ -24,11 +24,14 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -48,6 +51,7 @@ import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
@@ -101,6 +105,12 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
try {
receiveAndValidate(context);
//Assertions on StreamBuilderFactoryBean
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
ReadOnlyWindowStore<Object, Object> store = kafkaStreams.store("foo-WordCounts", QueryableStoreTypes.windowStore());
assertThat(store).isNotNull();
} finally {
context.close();
}

View File

@@ -1,5 +1,5 @@
eclipse.preferences.version=1
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=java;javax;com;org;org.springframework;ch.qos;\#;
org.eclipse.jdt.ui.importorder=java;javax;com;io.micrometer;org;org.springframework;ch.qos;\#;
org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RELEASE</version>
<version>2.0.2.RELEASE</version>
</parent>
<dependencies>
@@ -37,11 +37,6 @@
<artifactId>spring-boot-autoconfigure</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>

View File

@@ -0,0 +1,344 @@
/*
* Copyright 2017-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.kafka.support.AbstractKafkaHeaderMapper;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
/**
* Default header mapper for Apache Kafka.
* Most headers in {@link org.springframework.kafka.support.KafkaHeaders} are not mapped on outbound messages.
* The exceptions are correlation and reply headers for request/reply
* messaging.
* Header types are added to a special header {@link #JSON_TYPES}.
*
* @author Gary Russell
* @since 2.0.2
*
*/
@Deprecated
public class BinderHeaderMapper extends AbstractKafkaHeaderMapper {
private static final List<String> DEFAULT_TRUSTED_PACKAGES =
Arrays.asList(
"java.util",
"java.lang"
);
private static final List<String> DEFAULT_TO_STRING_CLASSES =
Arrays.asList(
"org.springframework.util.MimeType",
"org.springframework.http.MediaType"
);
/**
* Header name for java types of other headers.
*/
public static final String JSON_TYPES = "spring_json_header_types";
private final ObjectMapper objectMapper;
private final Set<String> trustedPackages = new LinkedHashSet<>(DEFAULT_TRUSTED_PACKAGES);
private final Set<String> toStringClasses = new LinkedHashSet<>(DEFAULT_TO_STRING_CLASSES);
/**
* Construct an instance with the default object mapper and default header patterns
* for outbound headers; all inbound headers are mapped. The default pattern list is
* {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
* {@link KafkaHeaders} are never mapped as headers since they represent data in
* consumer/producer records.
* @see #BinderHeaderMapper(ObjectMapper)
*/
public BinderHeaderMapper() {
this(new ObjectMapper());
}
/**
* Construct an instance with the provided object mapper and default header patterns
* for outbound headers; all inbound headers are mapped. The patterns are applied in
* order, stopping on the first match (positive or negative). Patterns are negated by
* preceding them with "!". The default pattern list is
* {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
* {@link KafkaHeaders} are never mapped as headers since they represent data in
* consumer/producer records.
* @param objectMapper the object mapper.
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
*/
public BinderHeaderMapper(ObjectMapper objectMapper) {
this(objectMapper,
"!" + MessageHeaders.ID,
"!" + MessageHeaders.TIMESTAMP,
"*");
}
/**
* Construct an instance with a default object mapper and the provided header patterns
* for outbound headers; all inbound headers are mapped. The patterns are applied in
* order, stopping on the first match (positive or negative). Patterns are negated by
* preceding them with "!". The patterns will replace the default patterns; you
* generally should not map the {@code "id" and "timestamp"} headers. Note:
* most of the headers in {@link KafkaHeaders} are ever mapped as headers since they
* represent data in consumer/producer records.
* @param patterns the patterns.
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
*/
public BinderHeaderMapper(String... patterns) {
this(new ObjectMapper(), patterns);
}
/**
* Construct an instance with the provided object mapper and the provided header
* patterns for outbound headers; all inbound headers are mapped. The patterns are
* applied in order, stopping on the first match (positive or negative). Patterns are
* negated by preceding them with "!". The patterns will replace the default patterns;
* you generally should not map the {@code "id" and "timestamp"} headers. Note: most
* of the headers in {@link KafkaHeaders} are never mapped as headers since they
* represent data in consumer/producer records.
* @param objectMapper the object mapper.
* @param patterns the patterns.
* @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
*/
public BinderHeaderMapper(ObjectMapper objectMapper, String... patterns) {
super(patterns);
Assert.notNull(objectMapper, "'objectMapper' must not be null");
Assert.noNullElements(patterns, "'patterns' must not have null elements");
this.objectMapper = objectMapper;
}
/**
* Return the object mapper.
* @return the mapper.
*/
protected ObjectMapper getObjectMapper() {
return this.objectMapper;
}
/**
* Provide direct access to the trusted packages set for subclasses.
* @return the trusted packages.
* @since 2.2
*/
protected Set<String> getTrustedPackages() {
return this.trustedPackages;
}
/**
* Provide direct access to the toString() classes by subclasses.
* @return the toString() classes.
* @since 2.2
*/
protected Set<String> getToStringClasses() {
return this.toStringClasses;
}
/**
* Add packages to the trusted packages list (default {@code java.util, java.lang}) used
* when constructing objects from JSON.
* If any of the supplied packages is {@code "*"}, all packages are trusted.
* If a class for a non-trusted package is encountered, the header is returned to the
* application with value of type {@link NonTrustedHeaderType}.
* @param trustedPackages the packages to trust.
*/
public void addTrustedPackages(String... trustedPackages) {
if (trustedPackages != null) {
for (String whiteList : trustedPackages) {
if ("*".equals(whiteList)) {
this.trustedPackages.clear();
break;
}
else {
this.trustedPackages.add(whiteList);
}
}
}
}
/**
* Add class names that the outbound mapper should perform toString() operations on
* before mapping.
* @param classNames the class names.
* @since 2.2
*/
public void addToStringClasses(String... classNames) {
this.toStringClasses.addAll(Arrays.asList(classNames));
}
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final Map<String, String> jsonHeaders = new HashMap<>();
headers.forEach((k, v) -> {
if (matches(k, v)) {
if (v instanceof byte[]) {
target.add(new RecordHeader(k, (byte[]) v));
}
else {
try {
Object value = v;
String className = v.getClass().getName();
if (this.toStringClasses.contains(className)) {
value = v.toString();
className = "java.lang.String";
}
target.add(new RecordHeader(k, getObjectMapper().writeValueAsBytes(value)));
jsonHeaders.put(k, className);
}
catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug("Could not map " + k + " with type " + v.getClass().getName());
}
}
}
}
});
if (jsonHeaders.size() > 0) {
try {
target.add(new RecordHeader(JSON_TYPES, getObjectMapper().writeValueAsBytes(jsonHeaders)));
}
catch (IllegalStateException | JsonProcessingException e) {
logger.error("Could not add json types header", e);
}
}
}
@SuppressWarnings("unchecked")
@Override
public void toHeaders(Headers source, final Map<String, Object> headers) {
Map<String, String> types = null;
Iterator<Header> iterator = source.iterator();
while (iterator.hasNext()) {
Header next = iterator.next();
if (next.key().equals(JSON_TYPES)) {
try {
types = getObjectMapper().readValue(next.value(), HashMap.class);
}
catch (IOException e) {
logger.error("Could not decode json types: " + new String(next.value()), e);
}
break;
}
}
final Map<String, String> jsonTypes = types;
source.forEach(h -> {
if (!(h.key().equals(JSON_TYPES))) {
if (jsonTypes != null && jsonTypes.containsKey(h.key())) {
Class<?> type = Object.class;
String requestedType = jsonTypes.get(h.key());
boolean trusted = false;
try {
trusted = trusted(requestedType);
if (trusted) {
type = ClassUtils.forName(requestedType, null);
}
}
catch (Exception e) {
logger.error("Could not load class for header: " + h.key(), e);
}
if (trusted) {
try {
headers.put(h.key(), getObjectMapper().readValue(h.value(), type));
}
catch (IOException e) {
logger.error("Could not decode json type: " + new String(h.value()) + " for key: " + h.key(),
e);
headers.put(h.key(), h.value());
}
}
else {
headers.put(h.key(), new NonTrustedHeaderType(h.value(), requestedType));
}
}
else {
headers.put(h.key(), h.value());
}
}
});
}
protected boolean trusted(String requestedType) {
if (!this.trustedPackages.isEmpty()) {
int lastDot = requestedType.lastIndexOf(".");
if (lastDot < 0) {
return false;
}
String packageName = requestedType.substring(0, lastDot);
for (String trustedPackage : this.trustedPackages) {
if (packageName.equals(trustedPackage) || packageName.startsWith(trustedPackage + ".")) {
return true;
}
}
return false;
}
return true;
}
/**
* Represents a header that could not be decoded due to an untrusted type.
*/
public static class NonTrustedHeaderType {
private final byte[] headerValue;
private final String untrustedType;
NonTrustedHeaderType(byte[] headerValue, String untrustedType) { // NOSONAR
this.headerValue = headerValue; // NOSONAR
this.untrustedType = untrustedType;
}
public byte[] getHeaderValue() {
return this.headerValue;
}
public String getUntrustedType() {
return this.untrustedType;
}
@Override
public String toString() {
try {
return "NonTrustedHeaderType [headerValue=" + new String(this.headerValue, StandardCharsets.UTF_8)
+ ", untrustedType=" + this.untrustedType + "]";
}
catch (Exception e) {
return "NonTrustedHeaderType [headerValue=" + Arrays.toString(this.headerValue) + ", untrustedType="
+ this.untrustedType + "]";
}
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016-2017 the original author or authors.
* Copyright 2016-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -78,25 +78,31 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
public Health call() {
try {
if (metadataConsumer == null) {
metadataConsumer = consumerFactory.createConsumer();
}
Set<String> downMessages = new HashSet<>();
for (String topic : KafkaBinderHealthIndicator.this.binder.getTopicsInUse().keySet()) {
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
if (KafkaBinderHealthIndicator.this.binder.getTopicsInUse().get(topic).getPartitionInfos()
.contains(partitionInfo) && partitionInfo.leader().id() == -1) {
downMessages.add(partitionInfo.toString());
synchronized(KafkaBinderHealthIndicator.this) {
if (metadataConsumer == null) {
metadataConsumer = consumerFactory.createConsumer();
}
}
}
if (downMessages.isEmpty()) {
return Health.up().build();
}
else {
return Health.down()
.withDetail("Following partitions in use have no leaders: ", downMessages.toString())
.build();
synchronized (metadataConsumer) {
Set<String> downMessages = new HashSet<>();
for (String topic : KafkaBinderHealthIndicator.this.binder.getTopicsInUse().keySet()) {
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
if (KafkaBinderHealthIndicator.this.binder.getTopicsInUse().get(topic).getPartitionInfos()
.contains(partitionInfo) && partitionInfo.leader().id() == -1) {
downMessages.add(partitionInfo.toString());
}
}
}
if (downMessages.isEmpty()) {
return Health.up().build();
}
else {
return Health.down()
.withDetail("Following partitions in use have no leaders: ", downMessages.toString())
.build();
}
}
}
catch (Exception e) {

View File

@@ -20,11 +20,17 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.TimeGauge;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
@@ -50,9 +56,13 @@ import org.springframework.util.ObjectUtils;
* @author Artem Bilan
* @author Oleg Zhurakousky
* @author Jon Schneider
* @author Thomas Cheyney
* @author Gary Russell
*/
public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<BindingCreatedEvent> {
private static final int DEFAULT_TIMEOUT = 60;
private final static Log LOG = LogFactory.getLog(KafkaBinderMetrics.class);
static final String METRIC_NAME = "spring.cloud.stream.binder.kafka.offset";
@@ -65,6 +75,10 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
private final MeterRegistry meterRegistry;
private Consumer<?, ?> metadataConsumer;
private int timeout = DEFAULT_TIMEOUT;
public KafkaBinderMetrics(KafkaMessageChannelBinder binder,
KafkaBinderConfigurationProperties binderConfigurationProperties,
ConsumerFactory<?, ?> defaultConsumerFactory, @Nullable MeterRegistry meterRegistry) {
@@ -81,6 +95,10 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
this(binder, binderConfigurationProperties, null, null);
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
@Override
public void bindTo(MeterRegistry registry) {
for (Map.Entry<String, KafkaMessageChannelBinder.TopicInformation> topicInfo : this.binder.getTopicsInUse()
@@ -103,30 +121,56 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
}
private double calculateConsumerLagOnTopic(String topic, String group) {
long lag = 0;
try (Consumer<?, ?> metadataConsumer = createConsumerFactory(group).createConsumer()) {
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = new LinkedList<>();
for (PartitionInfo partitionInfo : partitionInfos) {
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
ExecutorService exec = Executors.newSingleThreadExecutor();
Future<Long> future = exec.submit(() -> {
Map<TopicPartition, Long> endOffsets = metadataConsumer.endOffsets(topicPartitions);
for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
OffsetAndMetadata current = metadataConsumer.committed(endOffset.getKey());
if (current != null) {
lag += endOffset.getValue() - current.offset();
long lag = 0;
try {
if (metadataConsumer == null) {
synchronized(KafkaBinderMetrics.this) {
if (metadataConsumer == null) {
metadataConsumer = createConsumerFactory(group).createConsumer();
}
}
}
else {
lag += endOffset.getValue();
synchronized (metadataConsumer) {
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = new LinkedList<>();
for (PartitionInfo partitionInfo : partitionInfos) {
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
Map<TopicPartition, Long> endOffsets = metadataConsumer.endOffsets(topicPartitions);
for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
OffsetAndMetadata current = metadataConsumer.committed(endOffset.getKey());
if (current != null) {
lag += endOffset.getValue() - current.offset();
}
else {
lag += endOffset.getValue();
}
}
}
}
catch (Exception e) {
LOG.debug("Cannot generate metric for topic: " + topic, e);
}
return lag;
});
try {
return future.get(this.timeout, TimeUnit.SECONDS);
}
catch (Exception e) {
LOG.debug("Cannot generate metric for topic: " + topic, e);
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return 0L;
}
catch (ExecutionException | TimeoutException e) {
return 0L;
}
finally {
exec.shutdownNow();
}
return lag;
}
private ConsumerFactory<?, ?> createConsumerFactory(String group) {

View File

@@ -90,7 +90,6 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMo
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.ProducerListener;
@@ -268,10 +267,10 @@ public class KafkaMessageChannelBinder extends
if (!patterns.contains("!" + MessageHeaders.ID)) {
patterns.add(0, "!" + MessageHeaders.ID);
}
mapper = new DefaultKafkaHeaderMapper(patterns.toArray(new String[patterns.size()]));
mapper = new BinderHeaderMapper(patterns.toArray(new String[patterns.size()]));
}
else {
mapper = new DefaultKafkaHeaderMapper();
mapper = new BinderHeaderMapper();
}
}
handler.setHeaderMapper(mapper);
@@ -547,7 +546,7 @@ public class KafkaMessageChannelBinder extends
KafkaHeaderMapper.class);
}
if (mapper == null) {
DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper() {
BinderHeaderMapper headerMapper = new BinderHeaderMapper() {
@Override
public void toHeaders(Headers source, Map<String, Object> headers) {

View File

@@ -18,6 +18,8 @@ package org.springframework.cloud.stream.binder.kafka.config;
import java.io.IOException;
import javax.security.auth.login.AppConfigurationEntry;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
@@ -56,7 +58,8 @@ import org.springframework.kafka.support.ProducerListener;
*/
@Configuration
@ConditionalOnMissingBean(Binder.class)
@Import({KafkaAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class, KafkaBinderHealthIndicatorConfiguration.class })
@Import({ KafkaAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class,
KafkaBinderHealthIndicatorConfiguration.class })
@EnableConfigurationProperties({ KafkaExtendedBindingProperties.class })
public class KafkaBinderConfiguration {
@@ -81,7 +84,7 @@ public class KafkaBinderConfiguration {
@Bean
KafkaMessageChannelBinder kafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider) {
KafkaTopicProvisioner provisioningProvider) {
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
configurationProperties, provisioningProvider);
@@ -97,8 +100,34 @@ public class KafkaBinderConfiguration {
}
@Bean
public KafkaJaasLoginModuleInitializer jaasInitializer() throws IOException {
return new KafkaJaasLoginModuleInitializer();
@ConditionalOnMissingBean(KafkaJaasLoginModuleInitializer.class)
public KafkaJaasLoginModuleInitializer jaasInitializer(KafkaBinderConfigurationProperties configurationProperties) throws IOException {
KafkaJaasLoginModuleInitializer kafkaJaasLoginModuleInitializer = new KafkaJaasLoginModuleInitializer();
JaasLoginModuleConfiguration jaas = configurationProperties.getJaas();
if (jaas != null) {
kafkaJaasLoginModuleInitializer.setLoginModule(jaas.getLoginModule());
KafkaJaasLoginModuleInitializer.ControlFlag controlFlag = null;
AppConfigurationEntry.LoginModuleControlFlag controlFlagValue = jaas.getControlFlagValue();
if (AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL.equals(controlFlagValue)) {
controlFlag = KafkaJaasLoginModuleInitializer.ControlFlag.OPTIONAL;
}
else if (AppConfigurationEntry.LoginModuleControlFlag.REQUIRED.equals(controlFlagValue)) {
controlFlag = KafkaJaasLoginModuleInitializer.ControlFlag.REQUIRED;
}
else if (AppConfigurationEntry.LoginModuleControlFlag.REQUISITE.equals(controlFlagValue)) {
controlFlag = KafkaJaasLoginModuleInitializer.ControlFlag.REQUISITE;
}
else if (AppConfigurationEntry.LoginModuleControlFlag.SUFFICIENT.equals(controlFlagValue)) {
controlFlag = KafkaJaasLoginModuleInitializer.ControlFlag.SUFFICIENT;
}
if (controlFlag != null) {
kafkaJaasLoginModuleInitializer.setControlFlag(controlFlag);
}
kafkaJaasLoginModuleInitializer.setOptions(jaas.getOptions());
}
return kafkaJaasLoginModuleInitializer;
}
/**

View File

@@ -41,7 +41,9 @@ import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.util.ReflectionUtils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
@@ -74,11 +76,11 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField,
producerFactory);
assertTrue(producerConfigs.get("batch.size").equals(10));
assertTrue(producerConfigs.get("key.serializer").equals(LongSerializer.class));
assertTrue(producerConfigs.get("key.deserializer") == null);
assertTrue(producerConfigs.get("value.serializer").equals(LongSerializer.class));
assertTrue(producerConfigs.get("value.deserializer") == null);
assertTrue(producerConfigs.get("compression.type").equals("snappy"));
assertEquals(producerConfigs.get("key.serializer"), LongSerializer.class);
assertNull(producerConfigs.get("key.deserializer"));
assertEquals(producerConfigs.get("value.serializer"), LongSerializer.class);
assertNull(producerConfigs.get("value.deserializer"));
assertEquals("snappy", producerConfigs.get("compression.type"));
List<String> bootstrapServers = new ArrayList<>();
bootstrapServers.add("10.98.09.199:9092");
bootstrapServers.add("10.98.09.196:9092");
@@ -95,12 +97,12 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
ReflectionUtils.makeAccessible(consumerFactoryConfigField);
Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField,
consumerFactory);
assertTrue(consumerConfigs.get("key.deserializer").equals(LongDeserializer.class));
assertTrue(consumerConfigs.get("key.serializer") == null);
assertTrue(consumerConfigs.get("value.deserializer").equals(LongDeserializer.class));
assertTrue(consumerConfigs.get("value.serialized") == null);
assertTrue(consumerConfigs.get("group.id").equals("groupIdFromBootConfig"));
assertTrue(consumerConfigs.get("auto.offset.reset").equals("earliest"));
assertEquals(consumerConfigs.get("key.deserializer"), LongDeserializer.class);
assertNull(consumerConfigs.get("key.serializer"));
assertEquals(consumerConfigs.get("value.deserializer"), LongDeserializer.class);
assertNull(consumerConfigs.get("value.serialized"));
assertEquals("groupIdFromBootConfig", consumerConfigs.get("group.id"));
assertEquals("earliest", consumerConfigs.get("auto.offset.reset"));
assertTrue((((List<String>) consumerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));
}

View File

@@ -41,6 +41,7 @@ import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.util.ReflectionUtils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -75,11 +76,11 @@ public class KafkaBinderConfigurationPropertiesTest {
ReflectionUtils.makeAccessible(producerFactoryConfigField);
Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField,
producerFactory);
assertTrue(producerConfigs.get("batch.size").equals("12345"));
assertTrue(producerConfigs.get("linger.ms").equals("100"));
assertTrue(producerConfigs.get("key.serializer").equals(ByteArraySerializer.class));
assertTrue(producerConfigs.get("value.serializer").equals(ByteArraySerializer.class));
assertTrue(producerConfigs.get("compression.type").equals("gzip"));
assertEquals("12345", producerConfigs.get("batch.size"));;
assertEquals("100", producerConfigs.get("linger.ms"));
assertEquals(producerConfigs.get("key.serializer"), ByteArraySerializer.class);
assertEquals(producerConfigs.get("value.serializer"), ByteArraySerializer.class);
assertEquals("gzip", producerConfigs.get("compression.type"));
List<String> bootstrapServers = new ArrayList<>();
bootstrapServers.add("10.98.09.199:9082");
assertTrue((((String) producerConfigs.get("bootstrap.servers")).contains("10.98.09.199:9082")));
@@ -95,8 +96,8 @@ public class KafkaBinderConfigurationPropertiesTest {
ReflectionUtils.makeAccessible(consumerFactoryConfigField);
Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField,
consumerFactory);
assertTrue(consumerConfigs.get("key.deserializer").equals(ByteArrayDeserializer.class));
assertTrue(consumerConfigs.get("value.deserializer").equals(ByteArrayDeserializer.class));
assertEquals(consumerConfigs.get("key.deserializer"), ByteArrayDeserializer.class);
assertEquals(consumerConfigs.get("value.deserializer"), ByteArrayDeserializer.class);
assertTrue((((String) consumerConfigs.get("bootstrap.servers")).contains("10.98.09.199:9082")));
}

View File

@@ -23,9 +23,11 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.TimeGauge;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
@@ -33,6 +35,7 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.TopicInformation;
@@ -43,6 +46,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Henryk Konsek
* @author Thomas Cheyney
*/
public class KafkaBinderMetricsTest {
@@ -123,6 +127,37 @@ public class KafkaBinderMetricsTest {
assertThat(meterRegistry.getMeters()).isEmpty();
}
@Test
public void createsConsumerOnceWhenInvokedMultipleTimes() {
final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
metrics.bindTo(meterRegistry);
TimeGauge gauge = meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge();
gauge.value(TimeUnit.MILLISECONDS);
assertThat(gauge.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);
org.mockito.Mockito.verify(this.consumerFactory).createConsumer();
}
@Test
public void consumerCreationFailsFirstTime() {
org.mockito.BDDMockito.given(consumerFactory.createConsumer()).willThrow(KafkaException.class)
.willReturn(consumer);
final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
metrics.bindTo(meterRegistry);
TimeGauge gauge = meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge();
assertThat(gauge.value(TimeUnit.MILLISECONDS)).isEqualTo(0);
assertThat(gauge.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);
org.mockito.Mockito.verify(this.consumerFactory, Mockito.times(2)).createConsumer();
}
private List<PartitionInfo> partitions(Node... nodes) {
List<PartitionInfo> partitions = new ArrayList<>();
for (int i = 0; i < nodes.length; i++) {

View File

@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
@@ -122,7 +121,6 @@ import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;
@@ -317,10 +315,10 @@ public class KafkaBinderTests extends
moduleOutputChannel.send(message);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
AtomicReference<Message<String>> inboundMessageRef = new AtomicReference<>();
moduleInputChannel.subscribe(message1 -> {
try {
inboundMessageRef.set((Message<byte[]>) message1);
inboundMessageRef.set((Message<String>) message1);
}
finally {
latch.countDown();
@@ -330,13 +328,13 @@ public class KafkaBinderTests extends
Assertions.assertThat(inboundMessageRef.get()).isNotNull();
Assertions.assertThat(new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8)).isEqualTo("foo");
Assertions.assertThat(inboundMessageRef.get().getPayload()).isEqualTo("foo");
Assertions.assertThat(inboundMessageRef.get().getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE)).isNull();
Assertions.assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE))
.isEqualTo(MimeTypeUtils.TEXT_PLAIN);
Assertions.assertThat(inboundMessageRef.get().getHeaders().get("foo")).isInstanceOf(MimeType.class);
MimeType actual = (MimeType) inboundMessageRef.get().getHeaders().get("foo");
Assertions.assertThat(actual).isEqualTo(MimeTypeUtils.TEXT_PLAIN);
Assertions.assertThat(inboundMessageRef.get().getHeaders().get("foo")).isInstanceOf(String.class);
String actual = (String) inboundMessageRef.get().getHeaders().get("foo");
Assertions.assertThat(actual).isEqualTo(MimeTypeUtils.TEXT_PLAIN.toString());
producerBinding.unbind();
consumerBinding.unbind();
}
@@ -368,10 +366,10 @@ public class KafkaBinderTests extends
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN).build();
moduleOutputChannel.send(message);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
AtomicReference<Message<String>> inboundMessageRef = new AtomicReference<>();
moduleInputChannel.subscribe(message1 -> {
try {
inboundMessageRef.set((Message<byte[]>) message1);
inboundMessageRef.set((Message<String>) message1);
}
finally {
latch.countDown();
@@ -380,7 +378,7 @@ public class KafkaBinderTests extends
Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");
assertThat(inboundMessageRef.get()).isNotNull();
assertThat(new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8)).isEqualTo("foo");
assertThat(inboundMessageRef.get().getPayload()).isEqualTo("foo");
assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE))
.isEqualTo(MimeTypeUtils.TEXT_PLAIN);
producerBinding.unbind();
@@ -426,6 +424,13 @@ public class KafkaBinderTests extends
assertThat(new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8)).isEqualTo("foo");
assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE))
.isEqualTo(MimeTypeUtils.APPLICATION_OCTET_STREAM);
Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = ((KafkaTestBinder)binder).getCoreBinder().getTopicsInUse();
assertThat(topicsInUse.keySet()).contains("foo.bar");
KafkaMessageChannelBinder.TopicInformation topic = topicsInUse.get("foo.bar");
assertThat(topic.isConsumerTopic()).isTrue();
assertThat(topic.getConsumerGroup()).isEqualTo("testSendAndReceive");
producerBinding.unbind();
consumerBinding.unbind();
}
@@ -2330,10 +2335,10 @@ public class KafkaBinderTests extends
binderBindUnbindLatency();
moduleOutputChannel.send(message);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
AtomicReference<Message<String>> inboundMessageRef = new AtomicReference<>();
moduleInputChannel.subscribe(message1 -> {
try {
inboundMessageRef.set((Message<byte[]>) message1);
inboundMessageRef.set((Message<String>) message1);
}
finally {
latch.countDown();
@@ -2342,7 +2347,7 @@ public class KafkaBinderTests extends
Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");
assertThat(inboundMessageRef.get()).isNotNull();
assertThat(new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8)).isEqualTo("test");
assertThat(inboundMessageRef.get().getPayload()).isEqualTo("test");
assertThat(inboundMessageRef.get().getHeaders()).containsEntry("contentType", MimeTypeUtils.TEXT_PLAIN);
}
finally {

View File

@@ -19,6 +19,7 @@ package org.springframework.cloud.stream.binder.kafka.bootstrap;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
@@ -35,7 +36,7 @@ public class KafkaBinderBootstrapTest {
@Test
public void testKafkaBinderConfiguration() throws Exception {
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(SimpleApplication.class)
.web(false)
.web(WebApplicationType.NONE)
.run("--spring.cloud.stream.kafka.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
applicationContext.close();