Compare commits
26 Commits
v3.0.0.M2 ... v2.0.2.RELEASE
| Author | SHA1 | Date | |
|---|---|---|---|
| | 5fb0b4329b | | |
| | 87e1b35d55 | | |
| | 999740597a | | |
| | 475273f5db | | |
| | 8591cc59a5 | | |
| | bea31ce135 | | |
| | ec503d4025 | | |
| | 8d94cd2b43 | | |
| | 7adbc06b5c | | |
| | d67c98334f | | |
| | 3a4f047e9c | | |
| | 725d2a0de2 | | |
| | c7dc56e7d2 | | |
| | 5c594816bd | | |
| | c941e2d735 | | |
| | 8a1c2c504d | | |
| | dd48bf1540 | | |
| | 3450b4b360 | | |
| | 78a8baf81f | | |
| | 1ea69a10a4 | | |
| | 8f61919069 | | |
| | 369c46ce77 | | |
| | 64431426aa | | |
| | f77dc50de9 | | |
| | d141ad3647 | | |
| | 75dd5f202a | | |
pom.xml (10 changes)
@@ -2,20 +2,20 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 	<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-	<version>2.0.0.RELEASE</version>
+	<version>2.0.2.RELEASE</version>
 	<packaging>pom</packaging>
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-build</artifactId>
-		<version>2.0.0.RELEASE</version>
+		<version>2.0.4.RELEASE</version>
 		<relativePath />
 	</parent>
 	<properties>
 		<java.version>1.8</java.version>
-		<spring-kafka.version>2.1.5.RELEASE</spring-kafka.version>
+		<spring-kafka.version>2.1.10.RELEASE</spring-kafka.version>
 		<spring-integration-kafka.version>3.0.3.RELEASE</spring-integration-kafka.version>
-		<kafka.version>1.0.1</kafka.version>
-		<spring-cloud-stream.version>2.0.0.RELEASE</spring-cloud-stream.version>
+		<kafka.version>1.0.2</kafka.version>
+		<spring-cloud-stream.version>2.0.2.RELEASE</spring-cloud-stream.version>
 	</properties>
 	<modules>
 		<module>spring-cloud-stream-binder-kafka</module>
@@ -4,7 +4,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>2.0.0.RELEASE</version>
+		<version>2.0.2.RELEASE</version>
 	</parent>
 	<artifactId>spring-cloud-starter-stream-kafka</artifactId>
 	<description>Spring Cloud Starter Stream Kafka</description>
@@ -5,7 +5,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>2.0.0.RELEASE</version>
+		<version>2.0.2.RELEASE</version>
 	</parent>
 	<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
 	<description>Spring Cloud Stream Kafka Binder Core</description>
@@ -5,7 +5,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>2.0.0.RELEASE</version>
+		<version>2.0.2.RELEASE</version>
 	</parent>

 	<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>
Binary file not shown. (new image added, 119 KiB)
@@ -81,7 +81,7 @@ For common configuration options and properties pertaining to binder, refer to t

 === Kafka Streams Properties

-The following properties are available at the binder level and must be prefixed with `spring.cloud.stream.kafka.binder.`
+The following properties are available at the binder level and must be prefixed with `spring.cloud.stream.kafka.streams.binder.`
 literal.

 configuration::
@@ -595,4 +595,18 @@ Once you gain access to this bean, then you can query for the particular state-s
 ----
 ReadOnlyKeyValueStore<Object, Object> keyValueStore =
 	queryableStoreRegistry.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
 ----
+
+== Accessing the underlying KafkaStreams object
+
+The `StreamsBuilderFactoryBean` from spring-kafka that is responsible for constructing the `KafkaStreams` object can be accessed programmatically.
+Each `StreamsBuilderFactoryBean` is registered as `stream-builder`, appended with the `StreamListener` method name.
+If your `StreamListener` method is named `process`, for example, the stream builder bean is named `stream-builder-process`.
+Since this is a factory bean, it should be accessed by prepending an ampersand (`&`) when accessing it programmatically.
+The following example assumes the `StreamListener` method is named `process`:
+
+[source]
+----
+StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
+KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
+----
@@ -454,7 +454,7 @@ public class Application {
 == Error Channels

 Starting with version 1.3, the binder unconditionally sends exceptions to an error channel for each consumer destination and can also be configured to send async producer send failures to an error channel.
-See <<binder-error-channels>> for more information.
+See <<spring-cloud-stream-overview-error-handling>> for more information.

 The payload of the `ErrorMessage` for a send failure is a `KafkaSendFailureException` with properties:
@@ -469,6 +469,6 @@ You can consume these exceptions with your own Spring Integration flow.

 Kafka binder module exposes the following metrics:

-`spring.cloud.stream.binder.kafka.someGroup.someTopic.lag`: This metric indicates how many messages have not been yet consumed from a given binder's topic by a given consumer group.
-For example, if the value of the metric `spring.cloud.stream.binder.kafka.myGroup.myTopic.lag` is `1000`, the consumer group named `myGroup` has `1000` messages waiting to be consumed from the topic called `myTopic`.
+`spring.cloud.stream.binder.kafka.offset`: This metric indicates how many messages have not been yet consumed from a given binder's topic by a given consumer group.
+The metrics provided are based on the Micrometer metrics library. The metric contains the consumer group information, the topic, and the actual lag in committed offset from the latest offset on the topic.
 This metric is particularly useful for providing auto-scaling feedback to a PaaS platform.
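Editor's note: the renamed `spring.cloud.stream.binder.kafka.offset` gauge can be read back through Micrometer. A minimal sketch, assuming a `MeterRegistry` bean and the `group`/`topic` tags that the tests later in this diff use; the class and method names here are illustrative:

```java
import java.util.concurrent.TimeUnit;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.TimeGauge;

public class ConsumerLagProbe {

    private final MeterRegistry meterRegistry;

    public ConsumerLagProbe(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    // Looks up the TimeGauge the binder registers per consumer group/topic
    // pair and returns its current value (the committed-offset lag).
    public double lagFor(String group, String topic) {
        TimeGauge gauge = meterRegistry.get("spring.cloud.stream.binder.kafka.offset")
                .tag("group", group)
                .tag("topic", topic)
                .timeGauge();
        return gauge.value(TimeUnit.MILLISECONDS);
    }
}
```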
@@ -10,7 +10,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>2.0.0.RELEASE</version>
+		<version>2.0.2.RELEASE</version>
 	</parent>

 	<dependencies>
@@ -62,5 +62,10 @@
 			<artifactId>spring-cloud-stream-binder-test</artifactId>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-autoconfigure-processor</artifactId>
+			<optional>true</optional>
+		</dependency>
 	</dependencies>
 </project>
@@ -23,10 +23,12 @@ import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProv
 import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
 import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;

 /**
  * @author Soby Chacko
  */
 @Configuration
 public class KTableBinderConfiguration {

 	@Autowired
@@ -73,7 +73,7 @@ class KafkaStreamsMessageConversionDelegate {
 String contentType = this.kstreamBindingInformationCatalogue.getContentType(outboundBindTarget);
 MessageConverter messageConverter = compositeMessageConverterFactory.getMessageConverterForAllRegistered();

-return outboundBindTarget.map((k, v) -> {
+return outboundBindTarget.mapValues((v) -> {
 	Message<?> message = v instanceof Message<?> ? (Message<?>) v :
 			MessageBuilder.withPayload(v).build();
 	Map<String, Object> headers = new HashMap<>(message.getHeaders());
@@ -81,9 +81,9 @@ class KafkaStreamsMessageConversionDelegate {
 		headers.put(MessageHeaders.CONTENT_TYPE, contentType);
 	}
 	MessageHeaders messageHeaders = new MessageHeaders(headers);
-	return new KeyValue<>(k,
+	return
 			messageConverter.toMessage(message.getPayload(),
-					messageHeaders).getPayload());
+					messageHeaders).getPayload();
 });
 }
@@ -137,10 +137,10 @@ class KafkaStreamsMessageConversionDelegate {
 processErrorFromDeserialization(bindingTarget, branch[1]);

 //first branch above is the branch where the messages are converted, let it go through further processing.
-return branch[0].map((o, o2) -> {
-	KeyValue<Object, Object> objectObjectKeyValue = keyValueThreadLocal.get();
+return branch[0].mapValues((o2) -> {
+	Object objectValue = keyValueThreadLocal.get().value;
 	keyValueThreadLocal.remove();
-	return objectObjectKeyValue;
+	return objectValue;
 });
 }
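Editor's note: the recurring change in this file is `map()` → `mapValues()`. `mapValues` tells Kafka Streams that the key is untouched, so the stream is not flagged for repartitioning before downstream stateful operations, whereas `map` must assume the key may change. A minimal sketch of the distinction, on a hypothetical topology rather than this repository's code:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class MapVsMapValues {

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("words");

        // map((k, v) -> ...) may rewrite the key, so Kafka Streams marks the
        // stream for a repartition topic before any downstream aggregation.
        // mapValues(...) guarantees the key is unchanged; no repartition is needed.
        KStream<String, String> upperCased = input.mapValues(v -> v.toUpperCase());

        upperCased.to("words-upper");
    }
}
```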
@@ -20,14 +20,12 @@ import java.lang.reflect.Method;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.UUID;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
@@ -300,18 +298,18 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
 else {
 	LOG.info("Native decoding is disabled for " + inboundName + ". Inbound message conversion done by Spring Cloud Stream.");
 }

-stream = stream.map((key, value) -> {
-	KeyValue<Object, Object> keyValue;
+stream = stream.mapValues(value -> {
+	Object returnValue;
 	String contentType = bindingProperties.getContentType();
 	if (!StringUtils.isEmpty(contentType) && !nativeDecoding) {
 		Message<?> message = MessageBuilder.withPayload(value)
 				.setHeader(MessageHeaders.CONTENT_TYPE, contentType).build();
-		keyValue = new KeyValue<>(key, message);
-	}
-	else {
-		keyValue = new KeyValue<>(key, value);
+		returnValue = message;
+	} else {
+		returnValue = value;
 	}
-	return keyValue;
+	return returnValue;
 });
 return stream;
 }
@@ -332,12 +330,11 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
 ConfigurableListableBeanFactory beanFactory = this.applicationContext.getBeanFactory();
 StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean();
 streamsBuilder.setAutoStartup(false);
-String uuid = UUID.randomUUID().toString();
 BeanDefinition streamsBuilderBeanDefinition =
 		BeanDefinitionBuilder.genericBeanDefinition((Class<StreamsBuilderFactoryBean>) streamsBuilder.getClass(), () -> streamsBuilder)
 				.getRawBeanDefinition();
-((BeanDefinitionRegistry) beanFactory).registerBeanDefinition("stream-builder-" + uuid, streamsBuilderBeanDefinition);
-StreamsBuilderFactoryBean streamsBuilderX = applicationContext.getBean("&stream-builder-" + uuid, StreamsBuilderFactoryBean.class);
+((BeanDefinitionRegistry) beanFactory).registerBeanDefinition("stream-builder-" + method.getName(), streamsBuilderBeanDefinition);
+StreamsBuilderFactoryBean streamsBuilderX = applicationContext.getBean("&stream-builder-" + method.getName(), StreamsBuilderFactoryBean.class);
 String group = bindingProperties.getGroup();
 if (!StringUtils.hasText(group)) {
 	group = binderConfigurationProperties.getApplicationId();
@@ -367,7 +364,7 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
 BeanDefinition streamsConfigBeanDefinition =
 		BeanDefinitionBuilder.genericBeanDefinition((Class<StreamsConfig>) streamsConfig.getClass(), () -> streamsConfig)
 				.getRawBeanDefinition();
-((BeanDefinitionRegistry) beanFactory).registerBeanDefinition("streamsConfig-" + uuid, streamsConfigBeanDefinition);
+((BeanDefinitionRegistry) beanFactory).registerBeanDefinition("streamsConfig-" + method.getName(), streamsConfigBeanDefinition);

 streamsBuilder.setStreamsConfig(streamsConfig);
 methodStreamsBuilderFactoryBeanMap.put(method, streamsBuilderX);
@@ -20,6 +20,7 @@ import java.util.HashSet;
 import java.util.Set;

 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.state.QueryableStoreType;

 /**
@@ -27,6 +28,7 @@ import org.apache.kafka.streams.state.QueryableStoreType;
  * the user applications.
  *
  * @author Soby Chacko
+ * @author Renwei Han
  * @since 2.0.0
  */
 public class QueryableStoreRegistry {
@@ -44,9 +46,14 @@ public class QueryableStoreRegistry {
 public <T> T getQueryableStoreType(String storeName, QueryableStoreType<T> storeType) {

 	for (KafkaStreams kafkaStream : kafkaStreams) {
-		T store = kafkaStream.store(storeName, storeType);
-		if (store != null) {
-			return store;
+		try {
+			T store = kafkaStream.store(storeName, storeType);
+			if (store != null) {
+				return store;
+			}
+		}
+		catch (InvalidStateStoreException ignored) {
+			//pass through
 		}
 	}
 	return null;
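Editor's note: with this change, `getQueryableStoreType` skips any `KafkaStreams` instance that throws `InvalidStateStoreException` (a store that is still rebalancing, or hosted on another instance) instead of propagating the exception, and returns `null` only when no instance can serve the store. A hedged usage sketch; the store name and wrapper class are illustrative, and the registry is assumed to be injected from the binder:

```java
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class StoreClient {

    private final QueryableStoreRegistry queryableStoreRegistry;

    public StoreClient(QueryableStoreRegistry queryableStoreRegistry) {
        this.queryableStoreRegistry = queryableStoreRegistry;
    }

    public Long countFor(String key) {
        ReadOnlyKeyValueStore<String, Long> store =
                queryableStoreRegistry.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
        // After this patch a null result means "no queryable copy right now"
        // rather than an InvalidStateStoreException bubbling up to the caller.
        return store != null ? store.get(key) : null;
    }
}
```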
@@ -106,10 +106,10 @@ public class KafkaStreamsBinderPojoInputAndPrimitiveTypeOutputTests {
 template.sendDefault("{\"id\":\"123\"}");
 ConsumerRecord<Integer, String> cr = KafkaTestUtils.getSingleRecord(consumer, "counts-id");

-assertThat(cr.key().equals(123));
+assertThat(cr.key()).isEqualTo(123);
 ObjectMapper om = new ObjectMapper();
 Long aLong = om.readValue(cr.value(), Long.class);
-assertThat(aLong.equals(1L));
+assertThat(aLong).isEqualTo(1L);
 }

 @EnableBinding(KafkaStreamsProcessor.class)
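Editor's note: the old assertions were no-ops. `assertThat(cr.key().equals(123))` hands AssertJ a bare `boolean` and never calls a matcher on it, so the test passes regardless of the value; the fix moves the comparison into the matcher. A self-contained illustration:

```java
import static org.assertj.core.api.Assertions.assertThat;

public class AssertionPitfall {

    public static void main(String[] args) {
        Integer key = 456;

        // Silently passes: this builds a BooleanAssert, but no .isTrue()
        // (or any other matcher) ever evaluates it.
        assertThat(key.equals(123));

        // Actually asserts, and on failure reports both values:
        // "expected: 123 but was: 456".
        assertThat(key).isEqualTo(123);
    }
}
```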
@@ -24,11 +24,14 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -48,6 +51,7 @@ import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.StreamsBuilderFactoryBean;
 import org.springframework.kafka.test.rule.KafkaEmbedded;
 import org.springframework.kafka.test.utils.KafkaTestUtils;
 import org.springframework.messaging.handler.annotation.SendTo;
@@ -101,6 +105,12 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
 		"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
 try {
 	receiveAndValidate(context);
+	//Assertions on StreamBuilderFactoryBean
+	StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
+	KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
+	ReadOnlyWindowStore<Object, Object> store = kafkaStreams.store("foo-WordCounts", QueryableStoreTypes.windowStore());
+	assertThat(store).isNotNull();
+
 } finally {
 	context.close();
 }
@@ -1,5 +1,5 @@
 eclipse.preferences.version=1
 org.eclipse.jdt.ui.ignorelowercasenames=true
-org.eclipse.jdt.ui.importorder=java;javax;com;org;org.springframework;ch.qos;\#;
+org.eclipse.jdt.ui.importorder=java;javax;com;io.micrometer;org;org.springframework;ch.qos;\#;
 org.eclipse.jdt.ui.ondemandthreshold=99
 org.eclipse.jdt.ui.staticondemandthreshold=99
@@ -10,7 +10,7 @@
 	<parent>
 		<groupId>org.springframework.cloud</groupId>
 		<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
-		<version>2.0.0.RELEASE</version>
+		<version>2.0.2.RELEASE</version>
 	</parent>

 	<dependencies>
@@ -37,11 +37,6 @@
 			<artifactId>spring-boot-autoconfigure</artifactId>
 			<optional>true</optional>
 		</dependency>
-		<dependency>
-			<groupId>org.springframework.cloud</groupId>
-			<artifactId>spring-cloud-stream-binder-test</artifactId>
-			<scope>test</scope>
-		</dependency>
 		<dependency>
 			<groupId>org.apache.kafka</groupId>
 			<artifactId>kafka-clients</artifactId>
@@ -0,0 +1,344 @@ (new file: BinderHeaderMapper.java; all lines added)
/*
 * Copyright 2017-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.kafka;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;

import org.springframework.kafka.support.AbstractKafkaHeaderMapper;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/**
 * Default header mapper for Apache Kafka.
 * Most headers in {@link org.springframework.kafka.support.KafkaHeaders} are not mapped on outbound messages.
 * The exceptions are correlation and reply headers for request/reply messaging.
 * Header types are added to a special header {@link #JSON_TYPES}.
 *
 * @author Gary Russell
 * @since 2.0.2
 */
@Deprecated
public class BinderHeaderMapper extends AbstractKafkaHeaderMapper {

	private static final List<String> DEFAULT_TRUSTED_PACKAGES =
			Arrays.asList(
					"java.util",
					"java.lang"
			);

	private static final List<String> DEFAULT_TO_STRING_CLASSES =
			Arrays.asList(
					"org.springframework.util.MimeType",
					"org.springframework.http.MediaType"
			);

	/**
	 * Header name for java types of other headers.
	 */
	public static final String JSON_TYPES = "spring_json_header_types";

	private final ObjectMapper objectMapper;

	private final Set<String> trustedPackages = new LinkedHashSet<>(DEFAULT_TRUSTED_PACKAGES);

	private final Set<String> toStringClasses = new LinkedHashSet<>(DEFAULT_TO_STRING_CLASSES);

	/**
	 * Construct an instance with the default object mapper and default header patterns
	 * for outbound headers; all inbound headers are mapped. The default pattern list is
	 * {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
	 * {@link KafkaHeaders} are never mapped as headers since they represent data in
	 * consumer/producer records.
	 * @see #BinderHeaderMapper(ObjectMapper)
	 */
	public BinderHeaderMapper() {
		this(new ObjectMapper());
	}

	/**
	 * Construct an instance with the provided object mapper and default header patterns
	 * for outbound headers; all inbound headers are mapped. The patterns are applied in
	 * order, stopping on the first match (positive or negative). Patterns are negated by
	 * preceding them with "!". The default pattern list is
	 * {@code "!id", "!timestamp" and "*"}. In addition, most of the headers in
	 * {@link KafkaHeaders} are never mapped as headers since they represent data in
	 * consumer/producer records.
	 * @param objectMapper the object mapper.
	 * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
	 */
	public BinderHeaderMapper(ObjectMapper objectMapper) {
		this(objectMapper,
				"!" + MessageHeaders.ID,
				"!" + MessageHeaders.TIMESTAMP,
				"*");
	}

	/**
	 * Construct an instance with a default object mapper and the provided header patterns
	 * for outbound headers; all inbound headers are mapped. The patterns are applied in
	 * order, stopping on the first match (positive or negative). Patterns are negated by
	 * preceding them with "!". The patterns will replace the default patterns; you
	 * generally should not map the {@code "id" and "timestamp"} headers. Note:
	 * most of the headers in {@link KafkaHeaders} are never mapped as headers since they
	 * represent data in consumer/producer records.
	 * @param patterns the patterns.
	 * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
	 */
	public BinderHeaderMapper(String... patterns) {
		this(new ObjectMapper(), patterns);
	}

	/**
	 * Construct an instance with the provided object mapper and the provided header
	 * patterns for outbound headers; all inbound headers are mapped. The patterns are
	 * applied in order, stopping on the first match (positive or negative). Patterns are
	 * negated by preceding them with "!". The patterns will replace the default patterns;
	 * you generally should not map the {@code "id" and "timestamp"} headers. Note: most
	 * of the headers in {@link KafkaHeaders} are never mapped as headers since they
	 * represent data in consumer/producer records.
	 * @param objectMapper the object mapper.
	 * @param patterns the patterns.
	 * @see org.springframework.util.PatternMatchUtils#simpleMatch(String, String)
	 */
	public BinderHeaderMapper(ObjectMapper objectMapper, String... patterns) {
		super(patterns);
		Assert.notNull(objectMapper, "'objectMapper' must not be null");
		Assert.noNullElements(patterns, "'patterns' must not have null elements");
		this.objectMapper = objectMapper;
	}

	/**
	 * Return the object mapper.
	 * @return the mapper.
	 */
	protected ObjectMapper getObjectMapper() {
		return this.objectMapper;
	}

	/**
	 * Provide direct access to the trusted packages set for subclasses.
	 * @return the trusted packages.
	 * @since 2.2
	 */
	protected Set<String> getTrustedPackages() {
		return this.trustedPackages;
	}

	/**
	 * Provide direct access to the toString() classes by subclasses.
	 * @return the toString() classes.
	 * @since 2.2
	 */
	protected Set<String> getToStringClasses() {
		return this.toStringClasses;
	}

	/**
	 * Add packages to the trusted packages list (default {@code java.util, java.lang}) used
	 * when constructing objects from JSON.
	 * If any of the supplied packages is {@code "*"}, all packages are trusted.
	 * If a class for a non-trusted package is encountered, the header is returned to the
	 * application with value of type {@link NonTrustedHeaderType}.
	 * @param trustedPackages the packages to trust.
	 */
	public void addTrustedPackages(String... trustedPackages) {
		if (trustedPackages != null) {
			for (String whiteList : trustedPackages) {
				if ("*".equals(whiteList)) {
					this.trustedPackages.clear();
					break;
				}
				else {
					this.trustedPackages.add(whiteList);
				}
			}
		}
	}

	/**
	 * Add class names that the outbound mapper should perform toString() operations on
	 * before mapping.
	 * @param classNames the class names.
	 * @since 2.2
	 */
	public void addToStringClasses(String... classNames) {
		this.toStringClasses.addAll(Arrays.asList(classNames));
	}

	@Override
	public void fromHeaders(MessageHeaders headers, Headers target) {
		final Map<String, String> jsonHeaders = new HashMap<>();
		headers.forEach((k, v) -> {
			if (matches(k, v)) {
				if (v instanceof byte[]) {
					target.add(new RecordHeader(k, (byte[]) v));
				}
				else {
					try {
						Object value = v;
						String className = v.getClass().getName();
						if (this.toStringClasses.contains(className)) {
							value = v.toString();
							className = "java.lang.String";
						}
						target.add(new RecordHeader(k, getObjectMapper().writeValueAsBytes(value)));
						jsonHeaders.put(k, className);
					}
					catch (Exception e) {
						if (logger.isDebugEnabled()) {
							logger.debug("Could not map " + k + " with type " + v.getClass().getName());
						}
					}
				}
			}
		});
		if (jsonHeaders.size() > 0) {
			try {
				target.add(new RecordHeader(JSON_TYPES, getObjectMapper().writeValueAsBytes(jsonHeaders)));
			}
			catch (IllegalStateException | JsonProcessingException e) {
				logger.error("Could not add json types header", e);
			}
		}
	}

	@SuppressWarnings("unchecked")
	@Override
	public void toHeaders(Headers source, final Map<String, Object> headers) {
		Map<String, String> types = null;
		Iterator<Header> iterator = source.iterator();
		while (iterator.hasNext()) {
			Header next = iterator.next();
			if (next.key().equals(JSON_TYPES)) {
				try {
					types = getObjectMapper().readValue(next.value(), HashMap.class);
				}
				catch (IOException e) {
					logger.error("Could not decode json types: " + new String(next.value()), e);
				}
				break;
			}
		}
		final Map<String, String> jsonTypes = types;
		source.forEach(h -> {
			if (!(h.key().equals(JSON_TYPES))) {
				if (jsonTypes != null && jsonTypes.containsKey(h.key())) {
					Class<?> type = Object.class;
					String requestedType = jsonTypes.get(h.key());
					boolean trusted = false;
					try {
						trusted = trusted(requestedType);
						if (trusted) {
							type = ClassUtils.forName(requestedType, null);
						}
					}
					catch (Exception e) {
						logger.error("Could not load class for header: " + h.key(), e);
					}
					if (trusted) {
						try {
							headers.put(h.key(), getObjectMapper().readValue(h.value(), type));
						}
						catch (IOException e) {
							logger.error("Could not decode json type: " + new String(h.value()) + " for key: " + h.key(),
									e);
							headers.put(h.key(), h.value());
						}
					}
					else {
						headers.put(h.key(), new NonTrustedHeaderType(h.value(), requestedType));
					}
				}
				else {
					headers.put(h.key(), h.value());
				}
			}
		});
	}

	protected boolean trusted(String requestedType) {
		if (!this.trustedPackages.isEmpty()) {
			int lastDot = requestedType.lastIndexOf(".");
			if (lastDot < 0) {
				return false;
			}
			String packageName = requestedType.substring(0, lastDot);
			for (String trustedPackage : this.trustedPackages) {
				if (packageName.equals(trustedPackage) || packageName.startsWith(trustedPackage + ".")) {
					return true;
				}
			}
			return false;
		}
		return true;
	}

	/**
	 * Represents a header that could not be decoded due to an untrusted type.
	 */
	public static class NonTrustedHeaderType {

		private final byte[] headerValue;

		private final String untrustedType;

		NonTrustedHeaderType(byte[] headerValue, String untrustedType) { // NOSONAR
			this.headerValue = headerValue; // NOSONAR
			this.untrustedType = untrustedType;
		}

		public byte[] getHeaderValue() {
			return this.headerValue;
		}

		public String getUntrustedType() {
			return this.untrustedType;
		}

		@Override
		public String toString() {
			try {
				return "NonTrustedHeaderType [headerValue=" + new String(this.headerValue, StandardCharsets.UTF_8)
						+ ", untrustedType=" + this.untrustedType + "]";
			}
			catch (Exception e) {
				return "NonTrustedHeaderType [headerValue=" + Arrays.toString(this.headerValue) + ", untrustedType="
						+ this.untrustedType + "]";
			}
		}

	}

}
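Editor's note: a short sketch of what the new `BinderHeaderMapper` does on the wire, using only the public API above; the header names and values are illustrative:

```java
import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.messaging.MessageHeaders;

public class HeaderMapperDemo {

    public static void main(String[] args) {
        BinderHeaderMapper mapper = new BinderHeaderMapper(new ObjectMapper());

        Map<String, Object> raw = new HashMap<>();
        raw.put("traceId", "abc-123");
        raw.put("retryCount", 2);

        // Outbound: each value is JSON-serialized into a Kafka record header and
        // its Java type is recorded in the "spring_json_header_types" header.
        RecordHeaders recordHeaders = new RecordHeaders();
        mapper.fromHeaders(new MessageHeaders(raw), recordHeaders);

        // Inbound: types listed in JSON_TYPES are reconstructed, but only for
        // trusted packages (java.lang/java.util by default); anything else is
        // handed back as NonTrustedHeaderType.
        Map<String, Object> restored = new HashMap<>();
        mapper.toHeaders(recordHeaders, restored);

        System.out.println(restored.get("traceId"));    // abc-123
        System.out.println(restored.get("retryCount")); // 2
    }
}
```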
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2017 the original author or authors.
+ * Copyright 2016-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -78,25 +78,31 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
 public Health call() {
 	try {
-		if (metadataConsumer == null) {
-			metadataConsumer = consumerFactory.createConsumer();
-		}
-		Set<String> downMessages = new HashSet<>();
-		for (String topic : KafkaBinderHealthIndicator.this.binder.getTopicsInUse().keySet()) {
-			List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
-			for (PartitionInfo partitionInfo : partitionInfos) {
-				if (KafkaBinderHealthIndicator.this.binder.getTopicsInUse().get(topic).getPartitionInfos()
-						.contains(partitionInfo) && partitionInfo.leader().id() == -1) {
-					downMessages.add(partitionInfo.toString());
-				}
-			}
-		}
-		if (downMessages.isEmpty()) {
-			return Health.up().build();
-		}
-		else {
-			return Health.down()
-					.withDetail("Following partitions in use have no leaders: ", downMessages.toString())
-					.build();
+		synchronized (KafkaBinderHealthIndicator.this) {
+			if (metadataConsumer == null) {
+				metadataConsumer = consumerFactory.createConsumer();
+			}
+		}
+		synchronized (metadataConsumer) {
+			Set<String> downMessages = new HashSet<>();
+			for (String topic : KafkaBinderHealthIndicator.this.binder.getTopicsInUse().keySet()) {
+				List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
+				for (PartitionInfo partitionInfo : partitionInfos) {
+					if (KafkaBinderHealthIndicator.this.binder.getTopicsInUse().get(topic).getPartitionInfos()
+							.contains(partitionInfo) && partitionInfo.leader().id() == -1) {
+						downMessages.add(partitionInfo.toString());
+					}
+				}
+			}
+			if (downMessages.isEmpty()) {
+				return Health.up().build();
+			}
+			else {
+				return Health.down()
+						.withDetail("Following partitions in use have no leaders: ", downMessages.toString())
+						.build();
+			}
 		}
 	}
 	catch (Exception e) {
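Editor's note: the health-indicator change is a lazy-initialization-plus-locking pattern: create the shared `metadataConsumer` at most once, then serialize all use of it, because Kafka `Consumer` instances are not thread-safe. The same shape in isolation (an illustrative class, not the repository's code):

```java
import java.util.function.Function;
import java.util.function.Supplier;

public class LazySharedResource<T> {

    private final Supplier<T> factory;
    private T resource; // created at most once; creation guarded by 'this'

    public LazySharedResource(Supplier<T> factory) {
        this.factory = factory;
    }

    public <R> R withResource(Function<T, R> action) {
        synchronized (this) {
            if (resource == null) {
                resource = factory.get(); // e.g. consumerFactory.createConsumer()
            }
        }
        // All callers lock on the resource itself, mirroring the
        // synchronized (metadataConsumer) block in the diff above.
        synchronized (resource) {
            return action.apply(resource);
        }
    }
}
```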
@@ -20,11 +20,17 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;

 import io.micrometer.core.instrument.MeterRegistry;
 import io.micrometer.core.instrument.TimeGauge;
 import io.micrometer.core.instrument.binder.MeterBinder;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -50,9 +56,13 @@ import org.springframework.util.ObjectUtils;
  * @author Artem Bilan
  * @author Oleg Zhurakousky
  * @author Jon Schneider
+ * @author Thomas Cheyney
+ * @author Gary Russell
  */
 public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<BindingCreatedEvent> {

+	private static final int DEFAULT_TIMEOUT = 60;
+
 	private final static Log LOG = LogFactory.getLog(KafkaBinderMetrics.class);

 	static final String METRIC_NAME = "spring.cloud.stream.binder.kafka.offset";
@@ -65,6 +75,10 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
 	private final MeterRegistry meterRegistry;

+	private Consumer<?, ?> metadataConsumer;
+
+	private int timeout = DEFAULT_TIMEOUT;
+
 	public KafkaBinderMetrics(KafkaMessageChannelBinder binder,
 			KafkaBinderConfigurationProperties binderConfigurationProperties,
 			ConsumerFactory<?, ?> defaultConsumerFactory, @Nullable MeterRegistry meterRegistry) {
@@ -81,6 +95,10 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
 		this(binder, binderConfigurationProperties, null, null);
 	}

+	public void setTimeout(int timeout) {
+		this.timeout = timeout;
+	}
+
 	@Override
 	public void bindTo(MeterRegistry registry) {
 		for (Map.Entry<String, KafkaMessageChannelBinder.TopicInformation> topicInfo : this.binder.getTopicsInUse()
@@ -103,30 +121,56 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
 }

 private double calculateConsumerLagOnTopic(String topic, String group) {
-	long lag = 0;
-	try (Consumer<?, ?> metadataConsumer = createConsumerFactory(group).createConsumer()) {
-		List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
-		List<TopicPartition> topicPartitions = new LinkedList<>();
-		for (PartitionInfo partitionInfo : partitionInfos) {
-			topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
-		}
-		Map<TopicPartition, Long> endOffsets = metadataConsumer.endOffsets(topicPartitions);
-		for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
-			OffsetAndMetadata current = metadataConsumer.committed(endOffset.getKey());
-			if (current != null) {
-				lag += endOffset.getValue() - current.offset();
-			}
-			else {
-				lag += endOffset.getValue();
-			}
-		}
-	}
-	catch (Exception e) {
-		LOG.debug("Cannot generate metric for topic: " + topic, e);
-	}
-	return lag;
+	ExecutorService exec = Executors.newSingleThreadExecutor();
+	Future<Long> future = exec.submit(() -> {
+		long lag = 0;
+		try {
+			if (metadataConsumer == null) {
+				synchronized (KafkaBinderMetrics.this) {
+					if (metadataConsumer == null) {
+						metadataConsumer = createConsumerFactory(group).createConsumer();
+					}
+				}
+			}
+			synchronized (metadataConsumer) {
+				List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
+				List<TopicPartition> topicPartitions = new LinkedList<>();
+				for (PartitionInfo partitionInfo : partitionInfos) {
+					topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+				}
+				Map<TopicPartition, Long> endOffsets = metadataConsumer.endOffsets(topicPartitions);
+				for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
+					OffsetAndMetadata current = metadataConsumer.committed(endOffset.getKey());
+					if (current != null) {
+						lag += endOffset.getValue() - current.offset();
+					}
+					else {
+						lag += endOffset.getValue();
+					}
+				}
+			}
+		}
+		catch (Exception e) {
+			LOG.debug("Cannot generate metric for topic: " + topic, e);
+		}
+		return lag;
+	});
+	try {
+		return future.get(this.timeout, TimeUnit.SECONDS);
+	}
+	catch (InterruptedException e) {
+		Thread.currentThread().interrupt();
+		return 0L;
+	}
+	catch (ExecutionException | TimeoutException e) {
+		return 0L;
+	}
+	finally {
+		exec.shutdownNow();
+	}
 }

 private ConsumerFactory<?, ?> createConsumerFactory(String group) {
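Editor's note: besides reusing a single consumer, the lag calculation now runs on a single-thread executor with `future.get(timeout, SECONDS)`, so a wedged broker connection degrades the metric to `0` instead of hanging whatever scrapes it. The pattern in isolation (an illustrative helper, not the repository's code):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedCall {

    // Runs 'task' but gives up after 'timeoutSeconds', returning 'fallback'
    // instead of blocking the caller indefinitely.
    static <T> T callWithTimeout(Callable<T> task, long timeoutSeconds, T fallback) {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            return exec.submit(task).get(timeoutSeconds, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return fallback;
        }
        catch (ExecutionException | TimeoutException e) {
            return fallback;
        }
        finally {
            exec.shutdownNow(); // best-effort cancellation of a stuck task
        }
    }

    public static void main(String[] args) {
        long lag = callWithTimeout(() -> 42L, 5, 0L);
        System.out.println(lag); // 42
    }
}
```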
@@ -90,7 +90,6 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMo
 import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
 import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
 import org.springframework.kafka.listener.config.ContainerProperties;
-import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
 import org.springframework.kafka.support.KafkaHeaderMapper;
 import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.kafka.support.ProducerListener;
@@ -268,10 +267,10 @@ public class KafkaMessageChannelBinder extends
 if (!patterns.contains("!" + MessageHeaders.ID)) {
 	patterns.add(0, "!" + MessageHeaders.ID);
 }
-mapper = new DefaultKafkaHeaderMapper(patterns.toArray(new String[patterns.size()]));
+mapper = new BinderHeaderMapper(patterns.toArray(new String[patterns.size()]));
 }
 else {
-	mapper = new DefaultKafkaHeaderMapper();
+	mapper = new BinderHeaderMapper();
 }
 }
 handler.setHeaderMapper(mapper);
@@ -547,7 +546,7 @@ public class KafkaMessageChannelBinder extends
 			KafkaHeaderMapper.class);
 }
 if (mapper == null) {
-	DefaultKafkaHeaderMapper headerMapper = new DefaultKafkaHeaderMapper() {
+	BinderHeaderMapper headerMapper = new BinderHeaderMapper() {

 		@Override
 		public void toHeaders(Headers source, Map<String, Object> headers) {
@@ -18,6 +18,8 @@ package org.springframework.cloud.stream.binder.kafka.config;

 import java.io.IOException;

+import javax.security.auth.login.AppConfigurationEntry;
+
 import io.micrometer.core.instrument.MeterRegistry;
 import io.micrometer.core.instrument.binder.MeterBinder;
@@ -56,7 +58,8 @@ import org.springframework.kafka.support.ProducerListener;
 */
 @Configuration
 @ConditionalOnMissingBean(Binder.class)
-@Import({KafkaAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class, KafkaBinderHealthIndicatorConfiguration.class })
+@Import({ KafkaAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class,
+		KafkaBinderHealthIndicatorConfiguration.class })
 @EnableConfigurationProperties({ KafkaExtendedBindingProperties.class })
 public class KafkaBinderConfiguration {
@@ -81,7 +84,7 @@ public class KafkaBinderConfiguration {

 @Bean
 KafkaMessageChannelBinder kafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
-		KafkaTopicProvisioner provisioningProvider) {
+	KafkaTopicProvisioner provisioningProvider) {

 	KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(
 			configurationProperties, provisioningProvider);
@@ -97,8 +100,34 @@ public class KafkaBinderConfiguration {
 }

 @Bean
-public KafkaJaasLoginModuleInitializer jaasInitializer() throws IOException {
-	return new KafkaJaasLoginModuleInitializer();
+@ConditionalOnMissingBean(KafkaJaasLoginModuleInitializer.class)
+public KafkaJaasLoginModuleInitializer jaasInitializer(KafkaBinderConfigurationProperties configurationProperties) throws IOException {
+	KafkaJaasLoginModuleInitializer kafkaJaasLoginModuleInitializer = new KafkaJaasLoginModuleInitializer();
+	JaasLoginModuleConfiguration jaas = configurationProperties.getJaas();
+	if (jaas != null) {
+		kafkaJaasLoginModuleInitializer.setLoginModule(jaas.getLoginModule());
+
+		KafkaJaasLoginModuleInitializer.ControlFlag controlFlag = null;
+		AppConfigurationEntry.LoginModuleControlFlag controlFlagValue = jaas.getControlFlagValue();
+
+		if (AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL.equals(controlFlagValue)) {
+			controlFlag = KafkaJaasLoginModuleInitializer.ControlFlag.OPTIONAL;
+		}
+		else if (AppConfigurationEntry.LoginModuleControlFlag.REQUIRED.equals(controlFlagValue)) {
+			controlFlag = KafkaJaasLoginModuleInitializer.ControlFlag.REQUIRED;
+		}
+		else if (AppConfigurationEntry.LoginModuleControlFlag.REQUISITE.equals(controlFlagValue)) {
+			controlFlag = KafkaJaasLoginModuleInitializer.ControlFlag.REQUISITE;
+		}
+		else if (AppConfigurationEntry.LoginModuleControlFlag.SUFFICIENT.equals(controlFlagValue)) {
+			controlFlag = KafkaJaasLoginModuleInitializer.ControlFlag.SUFFICIENT;
+		}
+		if (controlFlag != null) {
+			kafkaJaasLoginModuleInitializer.setControlFlag(controlFlag);
+		}
+		kafkaJaasLoginModuleInitializer.setOptions(jaas.getOptions());
+	}
+	return kafkaJaasLoginModuleInitializer;
 }

 /**
@@ -41,7 +41,9 @@ import org.springframework.test.context.TestPropertySource;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 import org.springframework.util.ReflectionUtils;

+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;

 /**
@@ -74,11 +76,11 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
 Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField,
 		producerFactory);
 assertTrue(producerConfigs.get("batch.size").equals(10));
-assertTrue(producerConfigs.get("key.serializer").equals(LongSerializer.class));
-assertTrue(producerConfigs.get("key.deserializer") == null);
-assertTrue(producerConfigs.get("value.serializer").equals(LongSerializer.class));
-assertTrue(producerConfigs.get("value.deserializer") == null);
-assertTrue(producerConfigs.get("compression.type").equals("snappy"));
+assertEquals(producerConfigs.get("key.serializer"), LongSerializer.class);
+assertNull(producerConfigs.get("key.deserializer"));
+assertEquals(producerConfigs.get("value.serializer"), LongSerializer.class);
+assertNull(producerConfigs.get("value.deserializer"));
+assertEquals("snappy", producerConfigs.get("compression.type"));
 List<String> bootstrapServers = new ArrayList<>();
 bootstrapServers.add("10.98.09.199:9092");
 bootstrapServers.add("10.98.09.196:9092");
@@ -95,12 +97,12 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
 ReflectionUtils.makeAccessible(consumerFactoryConfigField);
 Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField,
 		consumerFactory);
-assertTrue(consumerConfigs.get("key.deserializer").equals(LongDeserializer.class));
-assertTrue(consumerConfigs.get("key.serializer") == null);
-assertTrue(consumerConfigs.get("value.deserializer").equals(LongDeserializer.class));
-assertTrue(consumerConfigs.get("value.serialized") == null);
-assertTrue(consumerConfigs.get("group.id").equals("groupIdFromBootConfig"));
-assertTrue(consumerConfigs.get("auto.offset.reset").equals("earliest"));
+assertEquals(consumerConfigs.get("key.deserializer"), LongDeserializer.class);
+assertNull(consumerConfigs.get("key.serializer"));
+assertEquals(consumerConfigs.get("value.deserializer"), LongDeserializer.class);
+assertNull(consumerConfigs.get("value.serialized"));
+assertEquals("groupIdFromBootConfig", consumerConfigs.get("group.id"));
+assertEquals("earliest", consumerConfigs.get("auto.offset.reset"));
 assertTrue((((List<String>) consumerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));
 }
@@ -41,6 +41,7 @@ import org.springframework.test.context.TestPropertySource;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 import org.springframework.util.ReflectionUtils;

+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -75,11 +76,11 @@ public class KafkaBinderConfigurationPropertiesTest {
 ReflectionUtils.makeAccessible(producerFactoryConfigField);
 Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField,
 		producerFactory);
-assertTrue(producerConfigs.get("batch.size").equals("12345"));
-assertTrue(producerConfigs.get("linger.ms").equals("100"));
-assertTrue(producerConfigs.get("key.serializer").equals(ByteArraySerializer.class));
-assertTrue(producerConfigs.get("value.serializer").equals(ByteArraySerializer.class));
-assertTrue(producerConfigs.get("compression.type").equals("gzip"));
+assertEquals("12345", producerConfigs.get("batch.size"));
+assertEquals("100", producerConfigs.get("linger.ms"));
+assertEquals(producerConfigs.get("key.serializer"), ByteArraySerializer.class);
+assertEquals(producerConfigs.get("value.serializer"), ByteArraySerializer.class);
+assertEquals("gzip", producerConfigs.get("compression.type"));
 List<String> bootstrapServers = new ArrayList<>();
 bootstrapServers.add("10.98.09.199:9082");
 assertTrue((((String) producerConfigs.get("bootstrap.servers")).contains("10.98.09.199:9082")));
@@ -95,8 +96,8 @@ public class KafkaBinderConfigurationPropertiesTest {
 ReflectionUtils.makeAccessible(consumerFactoryConfigField);
 Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField,
 		consumerFactory);
-assertTrue(consumerConfigs.get("key.deserializer").equals(ByteArrayDeserializer.class));
-assertTrue(consumerConfigs.get("value.deserializer").equals(ByteArrayDeserializer.class));
+assertEquals(consumerConfigs.get("key.deserializer"), ByteArrayDeserializer.class);
+assertEquals(consumerConfigs.get("value.deserializer"), ByteArrayDeserializer.class);
 assertTrue((((String) consumerConfigs.get("bootstrap.servers")).contains("10.98.09.199:9082")));
 }
@@ -23,9 +23,11 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;

 import io.micrometer.core.instrument.MeterRegistry;
+import io.micrometer.core.instrument.TimeGauge;
 import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -33,6 +35,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;

 import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.TopicInformation;
@@ -43,6 +46,7 @@ import static org.assertj.core.api.Assertions.assertThat;

 /**
  * @author Henryk Konsek
+ * @author Thomas Cheyney
  */
 public class KafkaBinderMetricsTest {
@@ -123,6 +127,37 @@ public class KafkaBinderMetricsTest {
 	assertThat(meterRegistry.getMeters()).isEmpty();
 }

+@Test
+public void createsConsumerOnceWhenInvokedMultipleTimes() {
+	final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
+	topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
+
+	metrics.bindTo(meterRegistry);
+
+	TimeGauge gauge = meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge();
+	gauge.value(TimeUnit.MILLISECONDS);
+	assertThat(gauge.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);
+
+	org.mockito.Mockito.verify(this.consumerFactory).createConsumer();
+}
+
+@Test
+public void consumerCreationFailsFirstTime() {
+	org.mockito.BDDMockito.given(consumerFactory.createConsumer()).willThrow(KafkaException.class)
+			.willReturn(consumer);
+
+	final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
+	topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
+
+	metrics.bindTo(meterRegistry);
+
+	TimeGauge gauge = meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge();
+	assertThat(gauge.value(TimeUnit.MILLISECONDS)).isEqualTo(0);
+	assertThat(gauge.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);
+
+	org.mockito.Mockito.verify(this.consumerFactory, Mockito.times(2)).createConsumer();
+}
+
 private List<PartitionInfo> partitions(Node... nodes) {
 	List<PartitionInfo> partitions = new ArrayList<>();
 	for (int i = 0; i < nodes.length; i++) {
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;

 import com.fasterxml.jackson.databind.ObjectMapper;

 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.CreateTopicsResult;
@@ -122,7 +121,6 @@ import org.springframework.messaging.support.ErrorMessage;
 import org.springframework.messaging.support.GenericMessage;
 import org.springframework.messaging.support.MessageBuilder;
 import org.springframework.util.Assert;
-import org.springframework.util.MimeType;
 import org.springframework.util.MimeTypeUtils;
 import org.springframework.util.concurrent.ListenableFuture;
 import org.springframework.util.concurrent.SettableListenableFuture;
@@ -317,10 +315,10 @@ public class KafkaBinderTests extends

 moduleOutputChannel.send(message);
 CountDownLatch latch = new CountDownLatch(1);
-AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
+AtomicReference<Message<String>> inboundMessageRef = new AtomicReference<>();
 moduleInputChannel.subscribe(message1 -> {
 	try {
-		inboundMessageRef.set((Message<byte[]>) message1);
+		inboundMessageRef.set((Message<String>) message1);
 	}
 	finally {
 		latch.countDown();
@@ -330,13 +328,13 @@ public class KafkaBinderTests extends

 Assertions.assertThat(inboundMessageRef.get()).isNotNull();
-Assertions.assertThat(new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8)).isEqualTo("foo");
+Assertions.assertThat(inboundMessageRef.get().getPayload()).isEqualTo("foo");
 Assertions.assertThat(inboundMessageRef.get().getHeaders().get(BinderHeaders.BINDER_ORIGINAL_CONTENT_TYPE)).isNull();
 Assertions.assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE))
 		.isEqualTo(MimeTypeUtils.TEXT_PLAIN);
-Assertions.assertThat(inboundMessageRef.get().getHeaders().get("foo")).isInstanceOf(MimeType.class);
-MimeType actual = (MimeType) inboundMessageRef.get().getHeaders().get("foo");
-Assertions.assertThat(actual).isEqualTo(MimeTypeUtils.TEXT_PLAIN);
+Assertions.assertThat(inboundMessageRef.get().getHeaders().get("foo")).isInstanceOf(String.class);
+String actual = (String) inboundMessageRef.get().getHeaders().get("foo");
+Assertions.assertThat(actual).isEqualTo(MimeTypeUtils.TEXT_PLAIN.toString());
 producerBinding.unbind();
 consumerBinding.unbind();
 }
@@ -368,10 +366,10 @@ public class KafkaBinderTests extends
 		.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN).build();
 moduleOutputChannel.send(message);
 CountDownLatch latch = new CountDownLatch(1);
-AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
+AtomicReference<Message<String>> inboundMessageRef = new AtomicReference<>();
 moduleInputChannel.subscribe(message1 -> {
 	try {
-		inboundMessageRef.set((Message<byte[]>) message1);
+		inboundMessageRef.set((Message<String>) message1);
 	}
 	finally {
 		latch.countDown();
@@ -380,7 +378,7 @@ public class KafkaBinderTests extends
 Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");

 assertThat(inboundMessageRef.get()).isNotNull();
-assertThat(new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8)).isEqualTo("foo");
+assertThat(inboundMessageRef.get().getPayload()).isEqualTo("foo");
 assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE))
 		.isEqualTo(MimeTypeUtils.TEXT_PLAIN);
 producerBinding.unbind();
@@ -426,6 +424,13 @@ public class KafkaBinderTests extends
 assertThat(new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8)).isEqualTo("foo");
 assertThat(inboundMessageRef.get().getHeaders().get(MessageHeaders.CONTENT_TYPE))
 		.isEqualTo(MimeTypeUtils.APPLICATION_OCTET_STREAM);
+
+Map<String, KafkaMessageChannelBinder.TopicInformation> topicsInUse = ((KafkaTestBinder) binder).getCoreBinder().getTopicsInUse();
+assertThat(topicsInUse.keySet()).contains("foo.bar");
+KafkaMessageChannelBinder.TopicInformation topic = topicsInUse.get("foo.bar");
+assertThat(topic.isConsumerTopic()).isTrue();
+assertThat(topic.getConsumerGroup()).isEqualTo("testSendAndReceive");
+
 producerBinding.unbind();
 consumerBinding.unbind();
 }
@@ -2330,10 +2335,10 @@ public class KafkaBinderTests extends
 binderBindUnbindLatency();
 moduleOutputChannel.send(message);
 CountDownLatch latch = new CountDownLatch(1);
-AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
+AtomicReference<Message<String>> inboundMessageRef = new AtomicReference<>();
 moduleInputChannel.subscribe(message1 -> {
 	try {
-		inboundMessageRef.set((Message<byte[]>) message1);
+		inboundMessageRef.set((Message<String>) message1);
 	}
 	finally {
 		latch.countDown();
@@ -2342,7 +2347,7 @@ public class KafkaBinderTests extends
 Assert.isTrue(latch.await(5, TimeUnit.SECONDS), "Failed to receive message");

 assertThat(inboundMessageRef.get()).isNotNull();
-assertThat(new String(inboundMessageRef.get().getPayload(), StandardCharsets.UTF_8)).isEqualTo("test");
+assertThat(inboundMessageRef.get().getPayload()).isEqualTo("test");
 assertThat(inboundMessageRef.get().getHeaders()).containsEntry("contentType", MimeTypeUtils.TEXT_PLAIN);
 }
 finally {
@@ -19,6 +19,7 @@ package org.springframework.cloud.stream.binder.kafka.bootstrap;
 import org.junit.ClassRule;
 import org.junit.Test;

+import org.springframework.boot.WebApplicationType;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.builder.SpringApplicationBuilder;
 import org.springframework.context.ConfigurableApplicationContext;
@@ -35,7 +36,7 @@ public class KafkaBinderBootstrapTest {
 @Test
 public void testKafkaBinderConfiguration() throws Exception {
 	ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(SimpleApplication.class)
-			.web(false)
+			.web(WebApplicationType.NONE)
 			.run("--spring.cloud.stream.kafka.binder.brokers=" + embeddedKafka.getBrokersAsString(),
 					"--spring.cloud.stream.kafka.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
 	applicationContext.close();