Compare commits

...

15 Commits

Author SHA1 Message Date
buildmaster
2851464bd0 Update SNAPSHOT to 3.2.2 2022-02-16 18:29:43 +00:00
Soby Chacko
da049fc980 Update Spring Kafka/SIK versions 2022-02-16 10:08:33 -05:00
Gary Russell
170166ac57 GH-1195: Fix Pause/Resume Documentation
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1195

Remove obsolete documentation.

**cherry-pick to 3.2.x**
2022-02-07 14:33:45 -05:00
Rex Ijiekhuamen
5b880a8104 Fixed invalid java code snippet 2022-01-24 10:23:17 -05:00
Soby Chacko
42d3b92c7b Test package changes 2022-01-18 15:52:18 -05:00
Soby Chacko
577ffbb67f Enable custom binder health check impelementation
Currently, KafkaBinderHealthIndicator is not customizable and included by default
when Spring Boot actuator is on the classpath. Fix this by allowing the application
to provide a custom implementation. A new marker interface called KafkaBinderHealth
can be used by the applicaiton to provide a custom HealthIndicator implementation, in
which case, the binder's default implementation will be excluded.

Tests and docs changes.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1180
2022-01-13 09:56:32 -05:00
Soby Chacko
3770db7844 Retries for HostInfo in InteractiveQueryService
InteractiveQueryService methods for finding the host info for Kafka Streams
currently throw exceptions if the underlying KafkaStreams are not ready yet.
Introduce a retry mechanism so that the users can control the behaviour of
these methods by providing the following properties.

spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts (default 1)
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backoffPeriod (default 1000 ms).

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1185
2022-01-11 18:49:23 -05:00
Eduard Domínguez
406e20f19c Fix: KeySerde setup not using expected key type headers
checkstyle fixes
2022-01-11 14:37:20 -05:00
Soby Chacko
648188fc6b Event type routing improvements (Kafka Streams)
When routing by event types, the deserializer omits the
topic and header information. Fixing this issue.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1186
2022-01-05 19:30:36 -05:00
Eduard Domínguez
63b306d34c GH-1176: KeyValueSerdeResolver improvements
Use extended properties when initializing Consumer and Producer Serdes.

Updated copyright years and authors.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1176
2021-12-10 14:02:29 -05:00
buildmaster
5cd8e06ec6 Bumping versions to 3.2.2-SNAPSHOT after release 2021-12-01 16:58:07 +00:00
buildmaster
79be11c9e9 Going back to snapshots 2021-12-01 16:58:07 +00:00
buildmaster
fc4358ba10 Update SNAPSHOT to 3.2.1 2021-12-01 16:55:37 +00:00
buildmaster
f3d2287b70 Bumping versions to 3.2.1-SNAPSHOT after release 2021-12-01 13:16:23 +00:00
buildmaster
220ae98bcc Going back to snapshots 2021-12-01 13:16:23 +00:00
18 changed files with 315 additions and 107 deletions

View File

@@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.2.0</version>
<version>3.2.2</version>
</parent>
<packaging>jar</packaging>
<name>spring-cloud-stream-binder-kafka-docs</name>

View File

@@ -9,6 +9,7 @@
|spring.cloud.stream.dynamic-destinations | `[]` | A list of destinations that can be bound dynamically. If set, only listed destinations can be bound.
|spring.cloud.stream.function.batch-mode | `false` |
|spring.cloud.stream.function.bindings | |
|spring.cloud.stream.input-bindings | | A semi-colon delimited string to explicitly define input bindings (specifically for cases when there is no implicit trigger to create such bindings such as Function, Supplier or Consumer).
|spring.cloud.stream.instance-count | `1` | The number of deployed instances of an application. Default: 1. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-count" where 'foo' is the name of the binding.
|spring.cloud.stream.instance-index | `0` | The instance id of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index" where 'foo' is the name of the binding.
|spring.cloud.stream.instance-index-list | | A list of instance id's from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index-list" where 'foo' is the name of the binding. This setting will override the one set in 'spring.cloud.stream.instance-index'
@@ -54,9 +55,10 @@
|spring.cloud.stream.metrics.meter-filter | | Pattern to control the 'meters' one wants to capture. By default all 'meters' will be captured. For example, 'spring.integration.*' will only capture metric information for meters whose name starts with 'spring.integration'.
|spring.cloud.stream.metrics.properties | | Application properties that should be added to the metrics payload For example: `spring.application**`.
|spring.cloud.stream.metrics.schedule-interval | `60s` | Interval expressed as Duration for scheduling metrics snapshots publishing. Defaults to 60 seconds
|spring.cloud.stream.output-bindings | | A semi-colon delimited string to explicitly define output bindings (specifically for cases when there is no implicit trigger to create such bindings such as Function, Supplier or Consumer).
|spring.cloud.stream.override-cloud-connectors | `false` | This property is only applicable when the cloud profile is active and Spring Cloud Connectors are provided with the application. If the property is false (the default), the binder detects a suitable bound service (for example, a RabbitMQ service bound in Cloud Foundry for the RabbitMQ binder) and uses it for creating connections (usually through Spring Cloud Connectors). When set to true, this property instructs binders to completely ignore the bound services and rely on Spring Boot properties (for example, relying on the spring.rabbitmq.* properties provided in the environment for the RabbitMQ binder). The typical usage of this property is to be nested in a customized environment when connecting to multiple systems.
|spring.cloud.stream.pollable-source | `none` | A semi-colon delimited list of binding names of pollable sources. Binding names follow the same naming convention as functions. For example, name '...pollable-source=foobar' will be accessible as 'foobar-iin-0'' binding
|spring.cloud.stream.sendto.destination | `none` | The name of the header used to determine the name of the output destination
|spring.cloud.stream.source | | A colon delimited string representing the names of the sources based on which source bindings will be created. This is primarily to support cases where source binding may be required without providing a corresponding Supplier. (e.g., for cases where the actual source of data is outside of scope of spring-cloud-stream - HTTP -> Stream)
|spring.cloud.stream.source | | A semi-colon delimited string representing the names of the sources based on which source bindings will be created. This is primarily to support cases where source binding may be required without providing a corresponding Supplier. (e.g., for cases where the actual source of data is outside of scope of spring-cloud-stream - HTTP -> Stream) @deprecated use {@link #outputBindings}
|===

View File

@@ -826,7 +826,7 @@ It will use that for inbound deserialization.
```
@Bean
public Serde<Foo() customSerde{
public Serde<Foo> customSerde() {
...
}
@@ -1261,7 +1261,7 @@ ReadOnlyKeyValueStore<Object, Object> keyValueStore =
----
During the startup, the above method call to retrieve the store might fail.
For e.g it might still be in the middle of initializing the state store.
For example, it might still be in the middle of initializing the state store.
In such cases, it will be useful to retry this operation.
Kafka Streams binder provides a simple retry mechanism to accommodate this.
@@ -1296,6 +1296,10 @@ else {
}
----
For more information on these host finding methods, please see the Javadoc on the methods.
For these methods also, during startup, if the underlying KafkaStreams objects are not ready, they might throw exceptions.
The aforementioned retry properties are applicable for these methods as well.
==== Other API methods available through the InteractiveQueryService
Use the following API method to retrieve the `KeyQueryMetadata` object associated with the combination of given store and key.

View File

@@ -660,41 +660,10 @@ See this https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/m
===== Example: Pausing and Resuming the Consumer
If you wish to suspend consumption but not cause a partition rebalance, you can pause and resume the consumer.
This is facilitated by adding the `Consumer` as a parameter to your `@StreamListener`.
To resume, you need an `ApplicationListener` for `ListenerContainerIdleEvent` instances.
This is facilitated by managing the binding lifecycle as shown in **Binding visualization and control** in the Spring Cloud Stream documentation, using `State.PAUSED` and `State.RESUMED`.
To resume, you can use an `ApplicationListener` (or `@EventListener` method) to receive `ListenerContainerIdleEvent` instances.
The frequency at which events are published is controlled by the `idleEventInterval` property.
Since the consumer is not thread-safe, you must call these methods on the calling thread.
The following simple application shows how to pause and resume:
[source, java]
----
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
}
@Bean
public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
return event -> {
System.out.println(event);
if (event.getConsumer().paused().size() > 0) {
event.getConsumer().resume(event.getConsumer().paused());
}
};
}
}
----
[[kafka-transactional-binder]]
=== Transactional Binder
@@ -971,3 +940,27 @@ public AdminClientConfigCustomizer adminClientConfigCustomizer() {
};
}
```
[[custom-kafka-binder-health-indicator]]
=== Custom Kafka Binder Health Indicator
Kafka binder activates a default health indicator when Spring Boot actuator is on the classpath.
This health indicator checks the health of the binder and any communication issues with the Kafka broker.
If an application wants to disable this default health check implementation and include a custom implementation, then it can provide an implementation for `KafkaBinderHealth` interface.
`KafkaBinderHealth` is a marker interface that extends from `HealthIndicator`.
In the custom implementation, it must provide an implementation for the `health()` method.
The custom implementation must be present in the application configuration as a bean.
When the binder discovers the custom implementation, it will use that instead of the default implementation.
Here is an example of such a custom implementation bean in the application.
```
@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator() {
return new KafkaBinderHealth() {
@Override
public Health health() {
// custom implementation details.
}
};
}
```

10
pom.xml
View File

@@ -2,12 +2,12 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.2.0</version>
<version>3.2.2</version>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-build</artifactId>
<version>3.1.0</version>
<version>3.1.1</version>
<relativePath />
</parent>
<scm>
@@ -21,10 +21,10 @@
</scm>
<properties>
<java.version>1.8</java.version>
<spring-kafka.version>2.8.0</spring-kafka.version>
<spring-integration-kafka.version>5.5.5</spring-integration-kafka.version>
<spring-kafka.version>2.8.2</spring-kafka.version>
<spring-integration-kafka.version>5.5.8</spring-integration-kafka.version>
<kafka.version>3.0.0</kafka.version>
<spring-cloud-stream.version>3.2.0</spring-cloud-stream.version>
<spring-cloud-stream.version>3.2.2</spring-cloud-stream.version>
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>

View File

@@ -4,7 +4,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.2.0</version>
<version>3.2.2</version>
</parent>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<description>Spring Cloud Starter Stream Kafka</description>

View File

@@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.2.0</version>
<version>3.2.2</version>
</parent>
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
<description>Spring Cloud Stream Kafka Binder Core</description>

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.2.0</version>
<version>3.2.2</version>
</parent>
<properties>

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 the original author or authors.
* Copyright 2019-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,7 +19,9 @@ package org.springframework.cloud.stream.binder.kafka.streams;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
@@ -40,9 +42,10 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -449,12 +452,15 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
//See this issue for more context: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1003
if (StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes())) {
AtomicBoolean matched = new AtomicBoolean();
AtomicReference<String> topicObject = new AtomicReference<>();
AtomicReference<Headers> headersObject = new AtomicReference<>();
// Processor to retrieve the header value.
stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matched));
stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matched, topicObject, headersObject));
// Branching based on event type match.
final KStream<?, ?>[] branch = stream.branch((key, value) -> matched.getAndSet(false));
// Deserialize if we have a branch from above.
final KStream<?, Object> deserializedKStream = branch[0].mapValues(value -> valueSerde.deserializer().deserialize(null, ((Bytes) value).get()));
final KStream<?, Object> deserializedKStream = branch[0].mapValues(value -> valueSerde.deserializer().deserialize(
topicObject.get(), headersObject.get(), ((Bytes) value).get()));
return getkStream(bindingProperties, deserializedKStream, nativeDecoding);
}
return getkStream(bindingProperties, stream, nativeDecoding);
@@ -549,14 +555,18 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
consumed);
if (StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes())) {
AtomicBoolean matched = new AtomicBoolean();
AtomicReference<String> topicObject = new AtomicReference<>();
AtomicReference<Headers> headersObject = new AtomicReference<>();
final KStream<?, ?> stream = kTable.toStream();
// Processor to retrieve the header value.
stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matched));
stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matched, topicObject, headersObject));
// Branching based on event type match.
final KStream<?, ?>[] branch = stream.branch((key, value) -> matched.getAndSet(false));
// Deserialize if we have a branch from above.
final KStream<?, Object> deserializedKStream = branch[0].mapValues(value -> valueSerde.deserializer().deserialize(null, ((Bytes) value).get()));
final KStream<?, Object> deserializedKStream = branch[0].mapValues(value -> valueSerde.deserializer().deserialize(
topicObject.get(), headersObject.get(), ((Bytes) value).get()));
return deserializedKStream.toTable();
}
@@ -581,19 +591,27 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
return consumed;
}
private <K, V> Processor<K, V> eventTypeProcessor(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, AtomicBoolean matched) {
return new Processor() {
private <K, V> Processor<K, V, Void, Void> eventTypeProcessor(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties,
AtomicBoolean matched, AtomicReference<String> topicObject, AtomicReference<Headers> headersObject) {
return new Processor<K, V, Void, Void>() {
ProcessorContext context;
org.apache.kafka.streams.processor.api.ProcessorContext<?, ?> context;
@Override
public void init(ProcessorContext context) {
public void init(org.apache.kafka.streams.processor.api.ProcessorContext<Void, Void> context) {
Processor.super.init(context);
this.context = context;
}
@Override
public void process(Object key, Object value) {
final Headers headers = this.context.headers();
public void process(Record<K, V> record) {
final Headers headers = record.headers();
headersObject.set(headers);
final Optional<RecordMetadata> optional = this.context.recordMetadata();
if (optional.isPresent()) {
final RecordMetadata recordMetadata = optional.get();
topicObject.set(recordMetadata.topic());
}
final Iterable<Header> eventTypeHeader = headers.headers(kafkaStreamsConsumerProperties.getEventTypeHeaderKey());
if (eventTypeHeader != null && eventTypeHeader.iterator().hasNext()) {
String eventTypeFromHeader = new String(eventTypeHeader.iterator().next().value());

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -84,15 +84,7 @@ public class InteractiveQueryService {
*/
public <T> T getQueryableStore(String storeName, QueryableStoreType<T> storeType) {
RetryTemplate retryTemplate = new RetryTemplate();
KafkaStreamsBinderConfigurationProperties.StateStoreRetry stateStoreRetry = this.binderConfigurationProperties.getStateStoreRetry();
RetryPolicy retryPolicy = new SimpleRetryPolicy(stateStoreRetry.getMaxAttempts());
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(stateStoreRetry.getBackoffPeriod());
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
final RetryTemplate retryTemplate = getRetryTemplate();
KafkaStreams contextSpecificKafkaStreams = getThreadContextSpecificKafkaStreams();
@@ -191,7 +183,7 @@ public class InteractiveQueryService {
* through all the consumer instances under the same application id and retrieves the
* proper host.
*
* Note that the end user applications must provide `applicaiton.server` as a
* Note that the end user applications must provide `application.server` as a
* configuration property for all the application instances when calling this method.
* If this is not available, then null maybe returned.
* @param <K> generic type for key
@@ -201,11 +193,40 @@ public class InteractiveQueryService {
* @return the {@link HostInfo} where the key for the provided store is hosted currently
*/
public <K> HostInfo getHostInfo(String store, K key, Serializer<K> serializer) {
final KeyQueryMetadata keyQueryMetadata = this.kafkaStreamsRegistry.getKafkaStreams()
.stream()
.map((k) -> Optional.ofNullable(k.queryMetadataForKey(store, key, serializer)))
.filter(Optional::isPresent).map(Optional::get).findFirst().orElse(null);
return keyQueryMetadata != null ? keyQueryMetadata.getActiveHost() : null;
final RetryTemplate retryTemplate = getRetryTemplate();
return retryTemplate.execute(context -> {
Throwable throwable = null;
try {
final KeyQueryMetadata keyQueryMetadata = this.kafkaStreamsRegistry.getKafkaStreams()
.stream()
.map((k) -> Optional.ofNullable(k.queryMetadataForKey(store, key, serializer)))
.filter(Optional::isPresent).map(Optional::get).findFirst().orElse(null);
if (keyQueryMetadata != null) {
return keyQueryMetadata.activeHost();
}
}
catch (Exception e) {
throwable = e;
}
throw new IllegalStateException(
"Error when retrieving state store", throwable != null ? throwable : new Throwable("Kafka Streams is not ready."));
});
}
private RetryTemplate getRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
KafkaStreamsBinderConfigurationProperties.StateStoreRetry stateStoreRetry = this.binderConfigurationProperties.getStateStoreRetry();
RetryPolicy retryPolicy = new SimpleRetryPolicy(stateStoreRetry.getMaxAttempts());
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(stateStoreRetry.getBackoffPeriod());
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
/**

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2018-2019 the original author or authors.
* Copyright 2018-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka.streams;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
@@ -69,6 +70,7 @@ import org.springframework.util.StringUtils;
*
* @author Soby Chacko
* @author Lei Chen
* @author Eduard Domínguez
*/
public class KeyValueSerdeResolver implements ApplicationContextAware {
@@ -96,14 +98,14 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
KafkaStreamsConsumerProperties extendedConsumerProperties) {
String keySerdeString = extendedConsumerProperties.getKeySerde();
return getKeySerde(keySerdeString);
return getKeySerde(keySerdeString, extendedConsumerProperties.getConfiguration());
}
public Serde<?> getInboundKeySerde(
KafkaStreamsConsumerProperties extendedConsumerProperties, ResolvableType resolvableType) {
String keySerdeString = extendedConsumerProperties.getKeySerde();
return getKeySerde(keySerdeString, resolvableType);
return getKeySerde(keySerdeString, resolvableType, extendedConsumerProperties.getConfiguration());
}
/**
@@ -120,7 +122,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
String valueSerdeString = extendedConsumerProperties.getValueSerde();
try {
if (consumerProperties != null && consumerProperties.isUseNativeDecoding()) {
valueSerde = getValueSerde(valueSerdeString);
valueSerde = getValueSerde(valueSerdeString, extendedConsumerProperties.getConfiguration());
}
else {
valueSerde = Serdes.ByteArray();
@@ -140,7 +142,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
String valueSerdeString = extendedConsumerProperties.getValueSerde();
try {
if (consumerProperties != null && consumerProperties.isUseNativeDecoding()) {
valueSerde = getValueSerde(valueSerdeString, resolvableType);
valueSerde = getValueSerde(valueSerdeString, resolvableType, extendedConsumerProperties.getConfiguration());
}
else {
valueSerde = Serdes.ByteArray();
@@ -158,11 +160,11 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
* @return configurd {@link Serde} for the outbound key.
*/
public Serde<?> getOuboundKeySerde(KafkaStreamsProducerProperties properties) {
return getKeySerde(properties.getKeySerde());
return getKeySerde(properties.getKeySerde(), properties.getConfiguration());
}
public Serde<?> getOuboundKeySerde(KafkaStreamsProducerProperties properties, ResolvableType resolvableType) {
return getKeySerde(properties.getKeySerde(), resolvableType);
return getKeySerde(properties.getKeySerde(), resolvableType, properties.getConfiguration());
}
@@ -179,7 +181,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
try {
if (producerProperties.isUseNativeEncoding()) {
valueSerde = getValueSerde(
kafkaStreamsProducerProperties.getValueSerde());
kafkaStreamsProducerProperties.getValueSerde(), kafkaStreamsProducerProperties.getConfiguration());
}
else {
valueSerde = Serdes.ByteArray();
@@ -197,7 +199,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
try {
if (producerProperties.isUseNativeEncoding()) {
valueSerde = getValueSerde(
kafkaStreamsProducerProperties.getValueSerde(), resolvableType);
kafkaStreamsProducerProperties.getValueSerde(), resolvableType, kafkaStreamsProducerProperties.getConfiguration());
}
else {
valueSerde = Serdes.ByteArray();
@@ -215,7 +217,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
* @return {@link Serde} for the state store key.
*/
public Serde<?> getStateStoreKeySerde(String keySerdeString) {
return getKeySerde(keySerdeString);
return getKeySerde(keySerdeString, (Map<String, ?>) null);
}
/**
@@ -225,14 +227,14 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
*/
public Serde<?> getStateStoreValueSerde(String valueSerdeString) {
try {
return getValueSerde(valueSerdeString);
return getValueSerde(valueSerdeString, (Map<String, ?>) null);
}
catch (ClassNotFoundException ex) {
throw new IllegalStateException("Serde class not found: ", ex);
}
}
private Serde<?> getKeySerde(String keySerdeString) {
private Serde<?> getKeySerde(String keySerdeString, Map<String, ?> extendedConfiguration) {
Serde<?> keySerde;
try {
if (StringUtils.hasText(keySerdeString)) {
@@ -241,8 +243,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
else {
keySerde = getFallbackSerde("default.key.serde");
}
keySerde.configure(this.streamConfigGlobalProperties, true);
keySerde.configure(combineStreamConfigProperties(extendedConfiguration), true);
}
catch (ClassNotFoundException ex) {
throw new IllegalStateException("Serde class not found: ", ex);
@@ -250,7 +251,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
return keySerde;
}
private Serde<?> getKeySerde(String keySerdeString, ResolvableType resolvableType) {
private Serde<?> getKeySerde(String keySerdeString, ResolvableType resolvableType, Map<String, ?> extendedConfiguration) {
Serde<?> keySerde = null;
try {
if (StringUtils.hasText(keySerdeString)) {
@@ -267,7 +268,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
keySerde = Serdes.ByteArray();
}
}
keySerde.configure(this.streamConfigGlobalProperties, true);
keySerde.configure(combineStreamConfigProperties(extendedConfiguration), true);
}
catch (ClassNotFoundException ex) {
throw new IllegalStateException("Serde class not found: ", ex);
@@ -380,7 +381,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
}
private Serde<?> getValueSerde(String valueSerdeString)
private Serde<?> getValueSerde(String valueSerdeString, Map<String, ?> extendedConfiguration)
throws ClassNotFoundException {
Serde<?> valueSerde;
if (StringUtils.hasText(valueSerdeString)) {
@@ -389,7 +390,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
else {
valueSerde = getFallbackSerde("default.value.serde");
}
valueSerde.configure(this.streamConfigGlobalProperties, false);
valueSerde.configure(combineStreamConfigProperties(extendedConfiguration), false);
return valueSerde;
}
@@ -403,7 +404,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
}
@SuppressWarnings("unchecked")
private Serde<?> getValueSerde(String valueSerdeString, ResolvableType resolvableType)
private Serde<?> getValueSerde(String valueSerdeString, ResolvableType resolvableType, Map<String, ?> extendedConfiguration)
throws ClassNotFoundException {
Serde<?> valueSerde = null;
if (StringUtils.hasText(valueSerdeString)) {
@@ -422,7 +423,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
valueSerde = Serdes.ByteArray();
}
}
valueSerde.configure(streamConfigGlobalProperties, false);
valueSerde.configure(combineStreamConfigProperties(extendedConfiguration), false);
return valueSerde;
}
@@ -430,4 +431,15 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
context = (ConfigurableApplicationContext) applicationContext;
}
private Map<String, ?> combineStreamConfigProperties(Map<String, ?> extendedConfiguration) {
if (extendedConfiguration != null && !extendedConfiguration.isEmpty()) {
Map<String, Object> streamConfiguration = new HashMap(this.streamConfigGlobalProperties);
streamConfiguration.putAll(extendedConfiguration);
return streamConfiguration;
}
else {
return this.streamConfigGlobalProperties;
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2017-2021 the original author or authors.
* Copyright 2017-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.KeyValue;
@@ -125,11 +126,39 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
catch (Exception ignored) {
}
Mockito.verify(mockKafkaStreams, times(3))
.store(StoreQueryParameters.fromNameAndType("foo", storeType));
}
@Test
public void testStateStoreRetrievalRetryForHostInfoService() {
StreamsBuilderFactoryBean mock = Mockito.mock(StreamsBuilderFactoryBean.class);
KafkaStreams mockKafkaStreams = Mockito.mock(KafkaStreams.class);
Mockito.when(mock.getKafkaStreams()).thenReturn(mockKafkaStreams);
KafkaStreamsRegistry kafkaStreamsRegistry = new KafkaStreamsRegistry();
kafkaStreamsRegistry.registerKafkaStreams(mock);
Mockito.when(mock.isRunning()).thenReturn(true);
Properties mockProperties = new Properties();
mockProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "foobarApp-123");
Mockito.when(mock.getStreamsConfiguration()).thenReturn(mockProperties);
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties =
new KafkaStreamsBinderConfigurationProperties(new KafkaProperties());
binderConfigurationProperties.getStateStoreRetry().setMaxAttempts(3);
InteractiveQueryService interactiveQueryService = new InteractiveQueryService(kafkaStreamsRegistry,
binderConfigurationProperties);
QueryableStoreType<ReadOnlyKeyValueStore<Object, Object>> storeType = QueryableStoreTypes.keyValueStore();
final StringSerializer serializer = new StringSerializer();
try {
interactiveQueryService.getHostInfo("foo", "fooKey", serializer);
}
catch (Exception ignored) {
}
Mockito.verify(mockKafkaStreams, times(3))
.queryMetadataForKey("foo", "fooKey", serializer);
}
@Test
@Ignore
public void testKstreamBinderWithPojoInputAndStringOuput() throws Exception {

View File

@@ -20,6 +20,9 @@ import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
@@ -28,18 +31,22 @@ import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolver;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
/**
* @author Soby Chacko
* @author Eduard Domínguez
*/
public class KafkaStreamsBinderBootstrapTest {
@@ -111,7 +118,7 @@ public class KafkaStreamsBinderBootstrapTest {
+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart",
"--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.application-id"
+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foo",
"--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.configuration.spring.json.value.type.method=com.test.MyClass",
"--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.configuration.spring.json.value.type.method=" + this.getClass().getName() + ".determineType",
"--spring.cloud.stream.kafka.streams.bindings.input3-in-0.consumer.application-id"
+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foobar",
"--spring.cloud.stream.kafka.streams.binder.brokers="
@@ -134,10 +141,28 @@ public class KafkaStreamsBinderBootstrapTest {
final StreamsBuilderFactoryBean input3SBFB = applicationContext.getBean("&stream-builder-input3", StreamsBuilderFactoryBean.class);
final Properties streamsConfiguration3 = input3SBFB.getStreamsConfiguration();
assertThat(streamsConfiguration3.containsKey("spring.json.value.type.method")).isFalse();
applicationContext.getBean(KeyValueSerdeResolver.class);
String configuredSerdeTypeResolver = (String) new DirectFieldAccessor(input2SBFB.getKafkaStreams())
.getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeResolver.arg$2");
assertThat(this.getClass().getName() + ".determineType").isEqualTo(configuredSerdeTypeResolver);
String configuredKeyDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams())
.getPropertyValue("taskTopology.processorNodes[0].keyDeserializer.typeMapper.classIdFieldName"));
assertThat(DefaultJackson2JavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredKeyDeserializerFieldName);
String configuredValueDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams())
.getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeMapper.classIdFieldName"));
assertThat(DefaultJackson2JavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredValueDeserializerFieldName);
applicationContext.close();
}
public static JavaType determineType(byte[] data, Headers headers) {
return TypeFactory.defaultInstance().constructParametricType(Map.class, String.class, String.class);
}
@SpringBootApplication
static class SimpleKafkaStreamsApplication {
@@ -149,7 +174,7 @@ public class KafkaStreamsBinderBootstrapTest {
}
@Bean
public Consumer<KTable<Object, String>> input2() {
public Consumer<KTable<Map<String, String>, Map<String, String>>> input2() {
return s -> {
// No-op consumer
};

View File

@@ -10,7 +10,7 @@
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.2.0</version>
<version>3.2.2</version>
</parent>
<dependencies>

View File

@@ -0,0 +1,29 @@
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import org.springframework.boot.actuate.health.HealthIndicator;
/**
* Marker interface used for custom KafkaBinderHealth indicator implementations.
*
* @author Soby Chacko
* @since 3.2.2
*/
public interface KafkaBinderHealth extends HealthIndicator {
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -35,7 +35,6 @@ import org.apache.kafka.common.PartitionInfo;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.actuate.health.Status;
import org.springframework.boot.actuate.health.StatusAggregator;
import org.springframework.kafka.core.ConsumerFactory;
@@ -55,7 +54,7 @@ import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
* @author Chukwubuikem Ume-Ugwa
* @author Taras Danylchuk
*/
public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBean {
public class KafkaBinderHealthIndicator implements KafkaBinderHealth, DisposableBean {
private static final int DEFAULT_TIMEOUT = 60;
@@ -73,7 +72,7 @@ public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBe
private boolean considerDownWhenAnyPartitionHasNoLeader;
public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder,
ConsumerFactory<?, ?> consumerFactory) {
ConsumerFactory<?, ?> consumerFactory) {
this.binder = binder;
this.consumerFactory = consumerFactory;
}
@@ -219,7 +218,7 @@ public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBe
}
@Override
public void destroy() throws Exception {
public void destroy() {
executor.shutdown();
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2018-2019 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,6 +24,8 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealth;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
@@ -38,11 +40,13 @@ import org.springframework.util.ObjectUtils;
*
* @author Oleg Zhurakousky
* @author Chukwubuikem Ume-Ugwa
* @author Soby Chacko
*/
@Configuration
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
@ConditionalOnEnabledHealthIndicator("binders")
@ConditionalOnMissingBean(KafkaBinderHealth.class)
public class KafkaBinderHealthIndicatorConfiguration {
@Bean

View File

@@ -0,0 +1,72 @@
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka.bootstrap;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealth;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
/**
* @author Soby Chacko
*/
public class KafkaBinderCustomHealthCheckTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 10);
@Test
public void testCustomHealthIndicatorIsActivated() {
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(
CustomHealthCheckApplication.class).web(WebApplicationType.NONE).run(
"--spring.cloud.stream.kafka.binder.brokers="
+ embeddedKafka.getEmbeddedKafka().getBrokersAsString());
final KafkaBinderHealth kafkaBinderHealth = applicationContext.getBean(KafkaBinderHealth.class);
assertThat(kafkaBinderHealth).isInstanceOf(CustomHealthIndicator.class);
assertThatThrownBy(() -> applicationContext.getBean(KafkaBinderHealthIndicator.class)).isInstanceOf(NoSuchBeanDefinitionException.class);
applicationContext.close();
}
@SpringBootApplication
static class CustomHealthCheckApplication {
@Bean
public CustomHealthIndicator kafkaBinderHealthIndicator() {
return new CustomHealthIndicator();
}
}
static class CustomHealthIndicator implements KafkaBinderHealth {
@Override
public Health health() {
return null;
}
}
}