Compare commits

...

17 Commits

Author SHA1 Message Date
Soby Chacko
d67c98334f 2.0.1.RELEASE 2018-07-11 17:36:18 -04:00
Soby Chacko
3a4f047e9c Update dependencies
spring-cloud-build to 2.0.2.RELEASE
spring-kafka to 2.1.7.RELEASE
2018-07-11 17:34:19 -04:00
Gary Russell
725d2a0de2 GH-404: Synchronize shared consumer
Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/404

The fix for issue https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/231
added a shared consumer but the consumer is not thread safe. Add synchronization.

Also, a timeout was added to the `KafkaBinderHealthIndicator` but not to the
`KafkaBinderMetrics` which has a similar shared consumer; add a timeout there.
2018-07-11 16:15:18 -04:00
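The pattern this fix applies, visible in the `KafkaBinderHealthIndicator` and `KafkaBinderMetrics` diffs further down, is double-checked lazy creation of the shared consumer plus serialized access to it, with the blocking metadata call bounded by a timeout. A minimal sketch of that pattern follows; class and field names are illustrative, not the binder's actual code:

[source,java]
----
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.PartitionInfo;

// Illustrative sketch of the synchronization pattern; not the binder's code.
public class SharedMetadataConsumer {

    private final Supplier<Consumer<?, ?>> consumerSupplier;

    private Consumer<?, ?> metadataConsumer; // shared, lazily created, NOT thread safe

    public SharedMetadataConsumer(Supplier<Consumer<?, ?>> consumerSupplier) {
        this.consumerSupplier = consumerSupplier;
    }

    public List<PartitionInfo> partitionsFor(String topic, int timeoutSeconds) throws Exception {
        // Double-checked lazy creation: only one thread ever creates the consumer.
        if (this.metadataConsumer == null) {
            synchronized (this) {
                if (this.metadataConsumer == null) {
                    this.metadataConsumer = this.consumerSupplier.get();
                }
            }
        }
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            // KafkaConsumer is not thread safe, so every use is serialized on it;
            // the potentially blocking metadata call is also bounded by a timeout.
            Future<List<PartitionInfo>> future = exec.submit(() -> {
                synchronized (this.metadataConsumer) {
                    return this.metadataConsumer.partitionsFor(topic);
                }
            });
            return future.get(timeoutSeconds, TimeUnit.SECONDS);
        }
        finally {
            exec.shutdownNow();
        }
    }
}
----

The timeout half of this sketch corresponds to the `Future.get(this.timeout, TimeUnit.SECONDS)` call the commit adds to `KafkaBinderMetrics`.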
Soby Chacko
c7dc56e7d2 Provide programmatic access to KafkaStreams object
Providing access to the underlying StreamsBuilderFactoryBean by making the bean name
deterministic. Earlier, the binder used a UUID to make the stream builder factory
bean names unique in the event of multiple StreamListeners. Switching to the
method name instead keeps the StreamsBuilderFactoryBean names unique while providing
a deterministic way of accessing them programmatically.

Polishing docs

Fixes #396
2018-07-11 16:14:31 -04:00
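With the deterministic naming, a `StreamListener` method named `process` yields a factory bean registered as `stream-builder-process`, retrievable with Spring's `&` factory-bean prefix. A short sketch, assuming an application context that contains such a `StreamListener` (this mirrors the docs example and integration test in the diffs below):

[source,java]
----
import org.apache.kafka.streams.KafkaStreams;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;

public final class KafkaStreamsLookup {

    // The "&" prefix asks Spring for the FactoryBean itself rather than
    // the StreamsBuilder object it produces.
    public static KafkaStreams kafkaStreams(ApplicationContext context) {
        StreamsBuilderFactoryBean factoryBean =
                context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
        return factoryBean.getKafkaStreams();
    }
}
----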
Soby Chacko
5c594816bd Fix bad link in docs
Resolves #359
2018-07-11 16:13:39 -04:00
Soby Chacko
c941e2d735 Fix typo in kafka streams docs
Resolves #400
2018-07-11 16:13:25 -04:00
UltimaPhoenix
8a1c2c504d Fix unit test
Remove unnecessary semicolon

Replace deprecated method with the new one

Test refactoring
2018-07-11 16:13:06 -04:00
Artem Bilan
dd48bf1540 GH-381: Remove duplicated SCSt-binder-test dep
Fixes spring-cloud/spring-cloud-stream-binder-kafka#381
2018-07-11 16:12:15 -04:00
Thomas Cheyney
3450b4b360 Reuse Kafka consumer Metrics
Polishing
2018-07-11 16:11:47 -04:00
Soby Chacko
78a8baf81f Kafka Streams initializr image for docs 2018-07-11 16:11:09 -04:00
Soby Chacko
1ea69a10a4 Revert "GH-360: Improve Binder Producer/Consumer Config"
This reverts commit 64431426aa.
2018-07-11 16:09:55 -04:00
Soby Chacko
8f61919069 Revert "Allow Kafka Streams state store creation"
This reverts commit 369c46ce77.
2018-07-11 16:09:35 -04:00
Lei Chen
369c46ce77 Allow Kafka Streams state store creation
* Allow Kafka Streams state store creation when using the process/transform methods in the DSL
* Add unit test for state store
* Address code review comments
* Add author and javadocs
* Integration test fixes for state store
* Polishing
2018-07-05 10:17:35 -04:00
Gary Russell
64431426aa GH-360: Improve Binder Producer/Consumer Config
Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/360

`producer-configuration` and `consumer-configuration` improperly appear in content-assist.

These are methods used by the binders to get merged configuration data (boot and binder).

Rename the methods and add `producerProperties` and `consumerProperties` to allow
configuration.
2018-04-27 12:46:49 -04:00
slamhan
f77dc50de9 QueryableStore retrieval stops at InvalidStateStoreException
If there are multiple streams, there is a code path that throws
a premature InvalidStateStoreException. Fixing that issue.

Fixes #366

Polishing.
2018-04-24 11:06:30 -04:00
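The fix, shown in the `QueryableStoreRegistry` diff further down, wraps each per-instance lookup in a try/catch so that an `InvalidStateStoreException` from one `KafkaStreams` instance no longer aborts the search. A condensed sketch of that loop, using the store API of the Kafka 1.0.x client this release builds against:

[source,java]
----
import java.util.Set;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreType;

// Condensed sketch mirroring the QueryableStoreRegistry change below.
public class StoreLookup {

    private final Set<KafkaStreams> kafkaStreams;

    public StoreLookup(Set<KafkaStreams> kafkaStreams) {
        this.kafkaStreams = kafkaStreams;
    }

    public <T> T getQueryableStoreType(String storeName, QueryableStoreType<T> storeType) {
        for (KafkaStreams kafkaStream : this.kafkaStreams) {
            try {
                // An instance that does not host the store, or is still
                // rebalancing, throws InvalidStateStoreException.
                T store = kafkaStream.store(storeName, storeType);
                if (store != null) {
                    return store;
                }
            }
            catch (InvalidStateStoreException ignored) {
                // keep searching the remaining KafkaStreams instances
            }
        }
        return null;
    }
}
----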
Danish Garg
d141ad3647 Changed occurrences of map calls on Kafka Streams to mapValues
Resolves #357
2018-04-11 15:37:09 -04:00
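`mapValues` is the right call when only the value changes: it declares the key untouched, so Kafka Streams does not flag the stream as key-changing, which with `map` would force a repartition before any downstream stateful operation. An illustrative before/after sketch, separate from the actual changes in the diffs below:

[source,java]
----
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;

public final class MapValuesExample {

    static KStream<String, String> withMap(KStream<String, String> stream) {
        // map() may change the key, so the stream is flagged as key-changing
        // and downstream stateful operations trigger a repartition.
        return stream.map((k, v) -> new KeyValue<>(k, v.toUpperCase()));
    }

    static KStream<String, String> withMapValues(KStream<String, String> stream) {
        // mapValues() guarantees the key is untouched; no repartition needed.
        return stream.mapValues(v -> v.toUpperCase());
    }
}
----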
Oleg Zhurakousky
75dd5f202a Created new maintenance branch 2.0.1 2018-04-06 15:14:37 -04:00
21 changed files with 207 additions and 95 deletions


@@ -2,20 +2,20 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RELEASE</version>
<version>2.0.1.RELEASE</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>2.0.0.RELEASE</version>
<version>2.0.2.RELEASE</version>
<relativePath />
</parent>
<properties>
<java.version>1.8</java.version>
<spring-kafka.version>2.1.5.RELEASE</spring-kafka.version>
<spring-kafka.version>2.1.7.RELEASE</spring-kafka.version>
<spring-integration-kafka.version>3.0.3.RELEASE</spring-integration-kafka.version>
<kafka.version>1.0.1</kafka.version>
<spring-cloud-stream.version>2.0.0.RELEASE</spring-cloud-stream.version>
<spring-cloud-stream.version>2.0.1.RELEASE</spring-cloud-stream.version>
</properties>
<modules>
<module>spring-cloud-stream-binder-kafka</module>


@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RELEASE</version>
<version>2.0.1.RELEASE</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>


@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RELEASE</version>
<version>2.0.1.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>


@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RELEASE</version>
<version>2.0.1.RELEASE</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-docs</artifactId>

Binary file not shown (new image, 119 KiB).


@@ -81,7 +81,7 @@ For common configuration options and properties pertaining to binder, refer to t
=== Kafka Streams Properties
The following properties are available at the binder level and must be prefixed with `spring.cloud.stream.kafka.binder.`
The following properties are available at the binder level and must be prefixed with `spring.cloud.stream.kafka.streams.binder.`
literal.
configuration::
@@ -595,4 +595,18 @@ Once you gain access to this bean, then you can query for the particular state-s
----
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
queryableStoreRegistry.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
----
== Accessing the underlying KafkaStreams object
The `StreamsBuilderFactoryBean` from spring-kafka, which is responsible for constructing the `KafkaStreams` object, can be accessed programmatically.
Each `StreamsBuilderFactoryBean` is registered as `stream-builder` appended with the `StreamListener` method name.
If your `StreamListener` method is named `process`, for example, the stream builder bean is named `stream-builder-process`.
Since this is a factory bean, it should be accessed by prepending an ampersand (`&`) to the bean name when accessing it programmatically.
The following example assumes the `StreamListener` method is named `process`:
[source]
----
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
----


@@ -454,7 +454,7 @@ public class Application {
== Error Channels
Starting with version 1.3, the binder unconditionally sends exceptions to an error channel for each consumer destination and can also be configured to send async producer send failures to an error channel.
See <<binder-error-channels>> for more information.
See <<spring-cloud-stream-overview-error-handling>> for more information.
The payload of the `ErrorMessage` for a send failure is a `KafkaSendFailureException` with properties:


@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RELEASE</version>
<version>2.0.1.RELEASE</version>
</parent>
<dependencies>


@@ -73,7 +73,7 @@ class KafkaStreamsMessageConversionDelegate {
String contentType = this.kstreamBindingInformationCatalogue.getContentType(outboundBindTarget);
MessageConverter messageConverter = compositeMessageConverterFactory.getMessageConverterForAllRegistered();
return outboundBindTarget.map((k, v) -> {
return outboundBindTarget.mapValues((v) -> {
Message<?> message = v instanceof Message<?> ? (Message<?>) v :
MessageBuilder.withPayload(v).build();
Map<String, Object> headers = new HashMap<>(message.getHeaders());
@@ -81,9 +81,9 @@ class KafkaStreamsMessageConversionDelegate {
headers.put(MessageHeaders.CONTENT_TYPE, contentType);
}
MessageHeaders messageHeaders = new MessageHeaders(headers);
return new KeyValue<>(k,
return
messageConverter.toMessage(message.getPayload(),
messageHeaders).getPayload());
messageHeaders).getPayload();
});
}
@@ -137,10 +137,10 @@ class KafkaStreamsMessageConversionDelegate {
processErrorFromDeserialization(bindingTarget, branch[1]);
//first branch above is the branch where the messages are converted, let it go through further processing.
return branch[0].map((o, o2) -> {
KeyValue<Object, Object> objectObjectKeyValue = keyValueThreadLocal.get();
return branch[0].mapValues((o2) -> {
Object objectValue = keyValueThreadLocal.get().value;
keyValueThreadLocal.remove();
return objectObjectKeyValue;
return objectValue;
});
}


@@ -20,14 +20,12 @@ import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
@@ -300,18 +298,18 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
else {
LOG.info("Native decoding is disabled for " + inboundName + ". Inbound message conversion done by Spring Cloud Stream.");
}
stream = stream.map((key, value) -> {
KeyValue<Object, Object> keyValue;
stream = stream.mapValues(value -> {
Object returnValue;
String contentType = bindingProperties.getContentType();
if (!StringUtils.isEmpty(contentType) && !nativeDecoding) {
Message<?> message = MessageBuilder.withPayload(value)
.setHeader(MessageHeaders.CONTENT_TYPE, contentType).build();
keyValue = new KeyValue<>(key, message);
returnValue = message;
} else {
returnValue = value;
}
else {
keyValue = new KeyValue<>(key, value);
}
return keyValue;
return returnValue;
});
return stream;
}
@@ -332,12 +330,11 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
ConfigurableListableBeanFactory beanFactory = this.applicationContext.getBeanFactory();
StreamsBuilderFactoryBean streamsBuilder = new StreamsBuilderFactoryBean();
streamsBuilder.setAutoStartup(false);
String uuid = UUID.randomUUID().toString();
BeanDefinition streamsBuilderBeanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<StreamsBuilderFactoryBean>) streamsBuilder.getClass(), () -> streamsBuilder)
.getRawBeanDefinition();
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition("stream-builder-" + uuid, streamsBuilderBeanDefinition);
StreamsBuilderFactoryBean streamsBuilderX = applicationContext.getBean("&stream-builder-" + uuid, StreamsBuilderFactoryBean.class);
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition("stream-builder-" + method.getName(), streamsBuilderBeanDefinition);
StreamsBuilderFactoryBean streamsBuilderX = applicationContext.getBean("&stream-builder-" + method.getName(), StreamsBuilderFactoryBean.class);
String group = bindingProperties.getGroup();
if (!StringUtils.hasText(group)) {
group = binderConfigurationProperties.getApplicationId();
@@ -367,7 +364,7 @@ class KafkaStreamsStreamListenerSetupMethodOrchestrator implements StreamListene
BeanDefinition streamsConfigBeanDefinition =
BeanDefinitionBuilder.genericBeanDefinition((Class<StreamsConfig>) streamsConfig.getClass(), () -> streamsConfig)
.getRawBeanDefinition();
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition("streamsConfig-" + uuid, streamsConfigBeanDefinition);
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition("streamsConfig-" + method.getName(), streamsConfigBeanDefinition);
streamsBuilder.setStreamsConfig(streamsConfig);
methodStreamsBuilderFactoryBeanMap.put(method, streamsBuilderX);


@@ -20,6 +20,7 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.QueryableStoreType;
/**
@@ -27,6 +28,7 @@ import org.apache.kafka.streams.state.QueryableStoreType;
* the user applications.
*
* @author Soby Chacko
* @author Renwei Han
* @since 2.0.0
*/
public class QueryableStoreRegistry {
@@ -44,9 +46,14 @@ public class QueryableStoreRegistry {
public <T> T getQueryableStoreType(String storeName, QueryableStoreType<T> storeType) {
for (KafkaStreams kafkaStream : kafkaStreams) {
T store = kafkaStream.store(storeName, storeType);
if (store != null) {
return store;
try{
T store = kafkaStream.store(storeName, storeType);
if (store != null) {
return store;
}
}
catch (InvalidStateStoreException ignored) {
//pass through
}
}
return null;


@@ -106,10 +106,10 @@ public class KafkaStreamsBinderPojoInputAndPrimitiveTypeOutputTests {
template.sendDefault("{\"id\":\"123\"}");
ConsumerRecord<Integer, String> cr = KafkaTestUtils.getSingleRecord(consumer, "counts-id");
assertThat(cr.key().equals(123));
assertThat(cr.key()).isEqualTo(123);
ObjectMapper om = new ObjectMapper();
Long aLong = om.readValue(cr.value(), Long.class);
assertThat(aLong.equals(1L));
assertThat(aLong).isEqualTo(1L);
}
@EnableBinding(KafkaStreamsProcessor.class)


@@ -24,11 +24,14 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -48,6 +51,7 @@ import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.SendTo;
@@ -101,6 +105,12 @@ public class KafkaStreamsBinderWordCountIntegrationTests {
"--spring.cloud.stream.kafka.streams.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
try {
receiveAndValidate(context);
//Assertions on StreamBuilderFactoryBean
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
ReadOnlyWindowStore<Object, Object> store = kafkaStreams.store("foo-WordCounts", QueryableStoreTypes.windowStore());
assertThat(store).isNotNull();
} finally {
context.close();
}


@@ -1,5 +1,5 @@
eclipse.preferences.version=1
org.eclipse.jdt.ui.ignorelowercasenames=true
org.eclipse.jdt.ui.importorder=java;javax;com;org;org.springframework;ch.qos;\#;
org.eclipse.jdt.ui.importorder=java;javax;com;io.micrometer;org;org.springframework;ch.qos;\#;
org.eclipse.jdt.ui.ondemandthreshold=99
org.eclipse.jdt.ui.staticondemandthreshold=99


@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>2.0.0.RELEASE</version>
<version>2.0.1.RELEASE</version>
</parent>
<dependencies>
@@ -37,11 +37,6 @@
<artifactId>spring-boot-autoconfigure</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>


@@ -1,5 +1,5 @@
/*
* Copyright 2016-2017 the original author or authors.
* Copyright 2016-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -78,25 +78,31 @@ public class KafkaBinderHealthIndicator implements HealthIndicator {
public Health call() {
try {
if (metadataConsumer == null) {
metadataConsumer = consumerFactory.createConsumer();
}
Set<String> downMessages = new HashSet<>();
for (String topic : KafkaBinderHealthIndicator.this.binder.getTopicsInUse().keySet()) {
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
if (KafkaBinderHealthIndicator.this.binder.getTopicsInUse().get(topic).getPartitionInfos()
.contains(partitionInfo) && partitionInfo.leader().id() == -1) {
downMessages.add(partitionInfo.toString());
synchronized(KafkaBinderHealthIndicator.this) {
if (metadataConsumer == null) {
metadataConsumer = consumerFactory.createConsumer();
}
}
}
if (downMessages.isEmpty()) {
return Health.up().build();
}
else {
return Health.down()
.withDetail("Following partitions in use have no leaders: ", downMessages.toString())
.build();
synchronized (metadataConsumer) {
Set<String> downMessages = new HashSet<>();
for (String topic : KafkaBinderHealthIndicator.this.binder.getTopicsInUse().keySet()) {
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
if (KafkaBinderHealthIndicator.this.binder.getTopicsInUse().get(topic).getPartitionInfos()
.contains(partitionInfo) && partitionInfo.leader().id() == -1) {
downMessages.add(partitionInfo.toString());
}
}
}
if (downMessages.isEmpty()) {
return Health.up().build();
}
else {
return Health.down()
.withDetail("Following partitions in use have no leaders: ", downMessages.toString())
.build();
}
}
}
catch (Exception e) {


@@ -20,11 +20,17 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.TimeGauge;
import io.micrometer.core.instrument.binder.MeterBinder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
@@ -50,9 +56,13 @@ import org.springframework.util.ObjectUtils;
* @author Artem Bilan
* @author Oleg Zhurakousky
* @author Jon Schneider
* @author Thomas Cheyney
* @author Gary Russell
*/
public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<BindingCreatedEvent> {
private static final int DEFAULT_TIMEOUT = 60;
private final static Log LOG = LogFactory.getLog(KafkaBinderMetrics.class);
static final String METRIC_NAME = "spring.cloud.stream.binder.kafka.offset";
@@ -65,6 +75,10 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
private final MeterRegistry meterRegistry;
private Consumer<?, ?> metadataConsumer;
private int timeout = DEFAULT_TIMEOUT;
public KafkaBinderMetrics(KafkaMessageChannelBinder binder,
KafkaBinderConfigurationProperties binderConfigurationProperties,
ConsumerFactory<?, ?> defaultConsumerFactory, @Nullable MeterRegistry meterRegistry) {
@@ -81,6 +95,10 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
this(binder, binderConfigurationProperties, null, null);
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
@Override
public void bindTo(MeterRegistry registry) {
for (Map.Entry<String, KafkaMessageChannelBinder.TopicInformation> topicInfo : this.binder.getTopicsInUse()
@@ -103,30 +121,56 @@ public class KafkaBinderMetrics implements MeterBinder, ApplicationListener<Bind
}
private double calculateConsumerLagOnTopic(String topic, String group) {
long lag = 0;
try (Consumer<?, ?> metadataConsumer = createConsumerFactory(group).createConsumer()) {
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = new LinkedList<>();
for (PartitionInfo partitionInfo : partitionInfos) {
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
ExecutorService exec = Executors.newSingleThreadExecutor();
Future<Long> future = exec.submit(() -> {
Map<TopicPartition, Long> endOffsets = metadataConsumer.endOffsets(topicPartitions);
for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
OffsetAndMetadata current = metadataConsumer.committed(endOffset.getKey());
if (current != null) {
lag += endOffset.getValue() - current.offset();
long lag = 0;
try {
if (metadataConsumer == null) {
synchronized(KafkaBinderMetrics.this) {
if (metadataConsumer == null) {
metadataConsumer = createConsumerFactory(group).createConsumer();
}
}
}
else {
lag += endOffset.getValue();
synchronized (metadataConsumer) {
List<PartitionInfo> partitionInfos = metadataConsumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = new LinkedList<>();
for (PartitionInfo partitionInfo : partitionInfos) {
topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
Map<TopicPartition, Long> endOffsets = metadataConsumer.endOffsets(topicPartitions);
for (Map.Entry<TopicPartition, Long> endOffset : endOffsets.entrySet()) {
OffsetAndMetadata current = metadataConsumer.committed(endOffset.getKey());
if (current != null) {
lag += endOffset.getValue() - current.offset();
}
else {
lag += endOffset.getValue();
}
}
}
}
catch (Exception e) {
LOG.debug("Cannot generate metric for topic: " + topic, e);
}
return lag;
});
try {
return future.get(this.timeout, TimeUnit.SECONDS);
}
catch (Exception e) {
LOG.debug("Cannot generate metric for topic: " + topic, e);
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return 0L;
}
catch (ExecutionException | TimeoutException e) {
return 0L;
}
finally {
exec.shutdownNow();
}
return lag;
}
private ConsumerFactory<?, ?> createConsumerFactory(String group) {


@@ -41,7 +41,9 @@ import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.util.ReflectionUtils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
@@ -74,11 +76,11 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField,
producerFactory);
assertTrue(producerConfigs.get("batch.size").equals(10));
assertTrue(producerConfigs.get("key.serializer").equals(LongSerializer.class));
assertTrue(producerConfigs.get("key.deserializer") == null);
assertTrue(producerConfigs.get("value.serializer").equals(LongSerializer.class));
assertTrue(producerConfigs.get("value.deserializer") == null);
assertTrue(producerConfigs.get("compression.type").equals("snappy"));
assertEquals(producerConfigs.get("key.serializer"), LongSerializer.class);
assertNull(producerConfigs.get("key.deserializer"));
assertEquals(producerConfigs.get("value.serializer"), LongSerializer.class);
assertNull(producerConfigs.get("value.deserializer"));
assertEquals("snappy", producerConfigs.get("compression.type"));
List<String> bootstrapServers = new ArrayList<>();
bootstrapServers.add("10.98.09.199:9092");
bootstrapServers.add("10.98.09.196:9092");
@@ -95,12 +97,12 @@ public class KafkaBinderAutoConfigurationPropertiesTest {
ReflectionUtils.makeAccessible(consumerFactoryConfigField);
Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField,
consumerFactory);
assertTrue(consumerConfigs.get("key.deserializer").equals(LongDeserializer.class));
assertTrue(consumerConfigs.get("key.serializer") == null);
assertTrue(consumerConfigs.get("value.deserializer").equals(LongDeserializer.class));
assertTrue(consumerConfigs.get("value.serialized") == null);
assertTrue(consumerConfigs.get("group.id").equals("groupIdFromBootConfig"));
assertTrue(consumerConfigs.get("auto.offset.reset").equals("earliest"));
assertEquals(consumerConfigs.get("key.deserializer"), LongDeserializer.class);
assertNull(consumerConfigs.get("key.serializer"));
assertEquals(consumerConfigs.get("value.deserializer"), LongDeserializer.class);
assertNull(consumerConfigs.get("value.serialized"));
assertEquals("groupIdFromBootConfig", consumerConfigs.get("group.id"));
assertEquals("earliest", consumerConfigs.get("auto.offset.reset"));
assertTrue((((List<String>) consumerConfigs.get("bootstrap.servers")).containsAll(bootstrapServers)));
}


@@ -41,6 +41,7 @@ import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.util.ReflectionUtils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -75,11 +76,11 @@ public class KafkaBinderConfigurationPropertiesTest {
ReflectionUtils.makeAccessible(producerFactoryConfigField);
Map<String, Object> producerConfigs = (Map<String, Object>) ReflectionUtils.getField(producerFactoryConfigField,
producerFactory);
assertTrue(producerConfigs.get("batch.size").equals("12345"));
assertTrue(producerConfigs.get("linger.ms").equals("100"));
assertTrue(producerConfigs.get("key.serializer").equals(ByteArraySerializer.class));
assertTrue(producerConfigs.get("value.serializer").equals(ByteArraySerializer.class));
assertTrue(producerConfigs.get("compression.type").equals("gzip"));
assertEquals("12345", producerConfigs.get("batch.size"));;
assertEquals("100", producerConfigs.get("linger.ms"));
assertEquals(producerConfigs.get("key.serializer"), ByteArraySerializer.class);
assertEquals(producerConfigs.get("value.serializer"), ByteArraySerializer.class);
assertEquals("gzip", producerConfigs.get("compression.type"));
List<String> bootstrapServers = new ArrayList<>();
bootstrapServers.add("10.98.09.199:9082");
assertTrue((((String) producerConfigs.get("bootstrap.servers")).contains("10.98.09.199:9082")));
@@ -95,8 +96,8 @@ public class KafkaBinderConfigurationPropertiesTest {
ReflectionUtils.makeAccessible(consumerFactoryConfigField);
Map<String, Object> consumerConfigs = (Map<String, Object>) ReflectionUtils.getField(consumerFactoryConfigField,
consumerFactory);
assertTrue(consumerConfigs.get("key.deserializer").equals(ByteArrayDeserializer.class));
assertTrue(consumerConfigs.get("value.deserializer").equals(ByteArrayDeserializer.class));
assertEquals(consumerConfigs.get("key.deserializer"), ByteArrayDeserializer.class);
assertEquals(consumerConfigs.get("value.deserializer"), ByteArrayDeserializer.class);
assertTrue((((String) consumerConfigs.get("bootstrap.servers")).contains("10.98.09.199:9082")));
}


@@ -23,9 +23,11 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.TimeGauge;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
@@ -33,6 +35,7 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.TopicInformation;
@@ -43,6 +46,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Henryk Konsek
* @author Thomas Cheyney
*/
public class KafkaBinderMetricsTest {
@@ -123,6 +127,37 @@ public class KafkaBinderMetricsTest {
assertThat(meterRegistry.getMeters()).isEmpty();
}
@Test
public void createsConsumerOnceWhenInvokedMultipleTimes() {
final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
metrics.bindTo(meterRegistry);
TimeGauge gauge = meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge();
gauge.value(TimeUnit.MILLISECONDS);
assertThat(gauge.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);
org.mockito.Mockito.verify(this.consumerFactory).createConsumer();
}
@Test
public void consumerCreationFailsFirstTime() {
org.mockito.BDDMockito.given(consumerFactory.createConsumer()).willThrow(KafkaException.class)
.willReturn(consumer);
final List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
topicsInUse.put(TEST_TOPIC, new TopicInformation("group", partitions));
metrics.bindTo(meterRegistry);
TimeGauge gauge = meterRegistry.get(KafkaBinderMetrics.METRIC_NAME).tag("group", "group").tag("topic", TEST_TOPIC).timeGauge();
assertThat(gauge.value(TimeUnit.MILLISECONDS)).isEqualTo(0);
assertThat(gauge.value(TimeUnit.MILLISECONDS)).isEqualTo(1000.0);
org.mockito.Mockito.verify(this.consumerFactory, Mockito.times(2)).createConsumer();
}
private List<PartitionInfo> partitions(Node... nodes) {
List<PartitionInfo> partitions = new ArrayList<>();
for (int i = 0; i < nodes.length; i++) {


@@ -19,6 +19,7 @@ package org.springframework.cloud.stream.binder.kafka.bootstrap;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
@@ -35,7 +36,7 @@ public class KafkaBinderBootstrapTest {
@Test
public void testKafkaBinderConfiguration() throws Exception {
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(SimpleApplication.class)
.web(false)
.web(WebApplicationType.NONE)
.run("--spring.cloud.stream.kafka.binder.brokers=" + embeddedKafka.getBrokersAsString(),
"--spring.cloud.stream.kafka.binder.zkNodes=" + embeddedKafka.getZookeeperConnectionString());
applicationContext.close();