Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2851464bd0 | ||
|
|
da049fc980 | ||
|
|
170166ac57 | ||
|
|
5b880a8104 | ||
|
|
42d3b92c7b | ||
|
|
577ffbb67f | ||
|
|
3770db7844 | ||
|
|
406e20f19c | ||
|
|
648188fc6b | ||
|
|
63b306d34c | ||
|
|
5cd8e06ec6 | ||
|
|
79be11c9e9 |
@@ -7,7 +7,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.2.1</version>
|
||||
<version>3.2.2</version>
|
||||
</parent>
|
||||
<packaging>jar</packaging>
|
||||
<name>spring-cloud-stream-binder-kafka-docs</name>
|
||||
|
||||
@@ -9,6 +9,7 @@
|
||||
|spring.cloud.stream.dynamic-destinations | `[]` | A list of destinations that can be bound dynamically. If set, only listed destinations can be bound.
|
||||
|spring.cloud.stream.function.batch-mode | `false` |
|
||||
|spring.cloud.stream.function.bindings | |
|
||||
|spring.cloud.stream.input-bindings | | A semi-colon delimited string to explicitly define input bindings (specifically for cases when there is no implicit trigger to create such bindings such as Function, Supplier or Consumer).
|
||||
|spring.cloud.stream.instance-count | `1` | The number of deployed instances of an application. Default: 1. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-count" where 'foo' is the name of the binding.
|
||||
|spring.cloud.stream.instance-index | `0` | The instance id of the application: a number from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index" where 'foo' is the name of the binding.
|
||||
|spring.cloud.stream.instance-index-list | | A list of instance id's from 0 to instanceCount-1. Used for partitioning and with Kafka. NOTE: Could also be managed per individual binding "spring.cloud.stream.bindings.foo.consumer.instance-index-list" where 'foo' is the name of the binding. This setting will override the one set in 'spring.cloud.stream.instance-index'
|
||||
@@ -54,9 +55,10 @@
|
||||
|spring.cloud.stream.metrics.meter-filter | | Pattern to control the 'meters' one wants to capture. By default all 'meters' will be captured. For example, 'spring.integration.*' will only capture metric information for meters whose name starts with 'spring.integration'.
|
||||
|spring.cloud.stream.metrics.properties | | Application properties that should be added to the metrics payload For example: `spring.application**`.
|
||||
|spring.cloud.stream.metrics.schedule-interval | `60s` | Interval expressed as Duration for scheduling metrics snapshots publishing. Defaults to 60 seconds
|
||||
|spring.cloud.stream.output-bindings | | A semi-colon delimited string to explicitly define output bindings (specifically for cases when there is no implicit trigger to create such bindings such as Function, Supplier or Consumer).
|
||||
|spring.cloud.stream.override-cloud-connectors | `false` | This property is only applicable when the cloud profile is active and Spring Cloud Connectors are provided with the application. If the property is false (the default), the binder detects a suitable bound service (for example, a RabbitMQ service bound in Cloud Foundry for the RabbitMQ binder) and uses it for creating connections (usually through Spring Cloud Connectors). When set to true, this property instructs binders to completely ignore the bound services and rely on Spring Boot properties (for example, relying on the spring.rabbitmq.* properties provided in the environment for the RabbitMQ binder). The typical usage of this property is to be nested in a customized environment when connecting to multiple systems.
|
||||
|spring.cloud.stream.pollable-source | `none` | A semi-colon delimited list of binding names of pollable sources. Binding names follow the same naming convention as functions. For example, name '...pollable-source=foobar' will be accessible as 'foobar-iin-0'' binding
|
||||
|spring.cloud.stream.sendto.destination | `none` | The name of the header used to determine the name of the output destination
|
||||
|spring.cloud.stream.source | | A colon delimited string representing the names of the sources based on which source bindings will be created. This is primarily to support cases where source binding may be required without providing a corresponding Supplier. (e.g., for cases where the actual source of data is outside of scope of spring-cloud-stream - HTTP -> Stream)
|
||||
|spring.cloud.stream.source | | A semi-colon delimited string representing the names of the sources based on which source bindings will be created. This is primarily to support cases where source binding may be required without providing a corresponding Supplier. (e.g., for cases where the actual source of data is outside of scope of spring-cloud-stream - HTTP -> Stream) @deprecated use {@link #outputBindings}
|
||||
|
||||
|===
|
||||
@@ -826,7 +826,7 @@ It will use that for inbound deserialization.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public Serde<Foo() customSerde{
|
||||
public Serde<Foo> customSerde() {
|
||||
...
|
||||
}
|
||||
|
||||
@@ -1261,7 +1261,7 @@ ReadOnlyKeyValueStore<Object, Object> keyValueStore =
|
||||
----
|
||||
|
||||
During the startup, the above method call to retrieve the store might fail.
|
||||
For e.g it might still be in the middle of initializing the state store.
|
||||
For example, it might still be in the middle of initializing the state store.
|
||||
In such cases, it will be useful to retry this operation.
|
||||
Kafka Streams binder provides a simple retry mechanism to accommodate this.
|
||||
|
||||
@@ -1296,6 +1296,10 @@ else {
|
||||
}
|
||||
----
|
||||
|
||||
For more information on these host finding methods, please see the Javadoc on the methods.
|
||||
For these methods also, during startup, if the underlying KafkaStreams objects are not ready, they might throw exceptions.
|
||||
The aforementioned retry properties are applicable for these methods as well.
|
||||
|
||||
==== Other API methods available through the InteractiveQueryService
|
||||
|
||||
Use the following API method to retrieve the `KeyQueryMetadata` object associated with the combination of given store and key.
|
||||
|
||||
@@ -660,41 +660,10 @@ See this https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/m
|
||||
===== Example: Pausing and Resuming the Consumer
|
||||
|
||||
If you wish to suspend consumption but not cause a partition rebalance, you can pause and resume the consumer.
|
||||
This is facilitated by adding the `Consumer` as a parameter to your `@StreamListener`.
|
||||
To resume, you need an `ApplicationListener` for `ListenerContainerIdleEvent` instances.
|
||||
This is facilitated by managing the binding lifecycle as shown in **Binding visualization and control** in the Spring Cloud Stream documentation, using `State.PAUSED` and `State.RESUMED`.
|
||||
|
||||
To resume, you can use an `ApplicationListener` (or `@EventListener` method) to receive `ListenerContainerIdleEvent` instances.
|
||||
The frequency at which events are published is controlled by the `idleEventInterval` property.
|
||||
Since the consumer is not thread-safe, you must call these methods on the calling thread.
|
||||
|
||||
The following simple application shows how to pause and resume:
|
||||
|
||||
[source, java]
|
||||
----
|
||||
@SpringBootApplication
|
||||
@EnableBinding(Sink.class)
|
||||
public class Application {
|
||||
|
||||
public static void main(String[] args) {
|
||||
SpringApplication.run(Application.class, args);
|
||||
}
|
||||
|
||||
@StreamListener(Sink.INPUT)
|
||||
public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
|
||||
System.out.println(in);
|
||||
consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
|
||||
return event -> {
|
||||
System.out.println(event);
|
||||
if (event.getConsumer().paused().size() > 0) {
|
||||
event.getConsumer().resume(event.getConsumer().paused());
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
----
|
||||
|
||||
[[kafka-transactional-binder]]
|
||||
=== Transactional Binder
|
||||
@@ -971,3 +940,27 @@ public AdminClientConfigCustomizer adminClientConfigCustomizer() {
|
||||
};
|
||||
}
|
||||
```
|
||||
|
||||
[[custom-kafka-binder-health-indicator]]
|
||||
=== Custom Kafka Binder Health Indicator
|
||||
|
||||
Kafka binder activates a default health indicator when Spring Boot actuator is on the classpath.
|
||||
This health indicator checks the health of the binder and any communication issues with the Kafka broker.
|
||||
If an application wants to disable this default health check implementation and include a custom implementation, then it can provide an implementation for `KafkaBinderHealth` interface.
|
||||
`KafkaBinderHealth` is a marker interface that extends from `HealthIndicator`.
|
||||
In the custom implementation, it must provide an implementation for the `health()` method.
|
||||
The custom implementation must be present in the application configuration as a bean.
|
||||
When the binder discovers the custom implementation, it will use that instead of the default implementation.
|
||||
Here is an example of such a custom implementation bean in the application.
|
||||
|
||||
```
|
||||
@Bean
|
||||
public KafkaBinderHealth kafkaBinderHealthIndicator() {
|
||||
return new KafkaBinderHealth() {
|
||||
@Override
|
||||
public Health health() {
|
||||
// custom implementation details.
|
||||
}
|
||||
};
|
||||
}
|
||||
```
|
||||
|
||||
10
pom.xml
10
pom.xml
@@ -2,12 +2,12 @@
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.2.1</version>
|
||||
<version>3.2.2</version>
|
||||
<packaging>pom</packaging>
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-build</artifactId>
|
||||
<version>3.1.0</version>
|
||||
<version>3.1.1</version>
|
||||
<relativePath />
|
||||
</parent>
|
||||
<scm>
|
||||
@@ -21,10 +21,10 @@
|
||||
</scm>
|
||||
<properties>
|
||||
<java.version>1.8</java.version>
|
||||
<spring-kafka.version>2.8.0</spring-kafka.version>
|
||||
<spring-integration-kafka.version>5.5.5</spring-integration-kafka.version>
|
||||
<spring-kafka.version>2.8.2</spring-kafka.version>
|
||||
<spring-integration-kafka.version>5.5.8</spring-integration-kafka.version>
|
||||
<kafka.version>3.0.0</kafka.version>
|
||||
<spring-cloud-stream.version>3.2.1</spring-cloud-stream.version>
|
||||
<spring-cloud-stream.version>3.2.2</spring-cloud-stream.version>
|
||||
<maven-checkstyle-plugin.failsOnError>true</maven-checkstyle-plugin.failsOnError>
|
||||
<maven-checkstyle-plugin.failsOnViolation>true</maven-checkstyle-plugin.failsOnViolation>
|
||||
<maven-checkstyle-plugin.includeTestSourceDirectory>true</maven-checkstyle-plugin.includeTestSourceDirectory>
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.2.1</version>
|
||||
<version>3.2.2</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
|
||||
<description>Spring Cloud Starter Stream Kafka</description>
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.2.1</version>
|
||||
<version>3.2.2</version>
|
||||
</parent>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-core</artifactId>
|
||||
<description>Spring Cloud Stream Kafka Binder Core</description>
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.2.1</version>
|
||||
<version>3.2.2</version>
|
||||
</parent>
|
||||
|
||||
<properties>
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2019-2021 the original author or authors.
|
||||
* Copyright 2019-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -19,7 +19,9 @@ package org.springframework.cloud.stream.binder.kafka.streams;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
@@ -40,9 +42,10 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.apache.kafka.streams.kstream.KTable;
|
||||
import org.apache.kafka.streams.kstream.Materialized;
|
||||
import org.apache.kafka.streams.processor.Processor;
|
||||
import org.apache.kafka.streams.processor.ProcessorContext;
|
||||
import org.apache.kafka.streams.processor.TimestampExtractor;
|
||||
import org.apache.kafka.streams.processor.api.Processor;
|
||||
import org.apache.kafka.streams.processor.api.Record;
|
||||
import org.apache.kafka.streams.processor.api.RecordMetadata;
|
||||
import org.apache.kafka.streams.state.KeyValueStore;
|
||||
import org.apache.kafka.streams.state.StoreBuilder;
|
||||
|
||||
@@ -449,12 +452,15 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
|
||||
//See this issue for more context: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1003
|
||||
if (StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes())) {
|
||||
AtomicBoolean matched = new AtomicBoolean();
|
||||
AtomicReference<String> topicObject = new AtomicReference<>();
|
||||
AtomicReference<Headers> headersObject = new AtomicReference<>();
|
||||
// Processor to retrieve the header value.
|
||||
stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matched));
|
||||
stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matched, topicObject, headersObject));
|
||||
// Branching based on event type match.
|
||||
final KStream<?, ?>[] branch = stream.branch((key, value) -> matched.getAndSet(false));
|
||||
// Deserialize if we have a branch from above.
|
||||
final KStream<?, Object> deserializedKStream = branch[0].mapValues(value -> valueSerde.deserializer().deserialize(null, ((Bytes) value).get()));
|
||||
final KStream<?, Object> deserializedKStream = branch[0].mapValues(value -> valueSerde.deserializer().deserialize(
|
||||
topicObject.get(), headersObject.get(), ((Bytes) value).get()));
|
||||
return getkStream(bindingProperties, deserializedKStream, nativeDecoding);
|
||||
}
|
||||
return getkStream(bindingProperties, stream, nativeDecoding);
|
||||
@@ -549,14 +555,18 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
|
||||
consumed);
|
||||
if (StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes())) {
|
||||
AtomicBoolean matched = new AtomicBoolean();
|
||||
AtomicReference<String> topicObject = new AtomicReference<>();
|
||||
AtomicReference<Headers> headersObject = new AtomicReference<>();
|
||||
|
||||
final KStream<?, ?> stream = kTable.toStream();
|
||||
|
||||
// Processor to retrieve the header value.
|
||||
stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matched));
|
||||
stream.process(() -> eventTypeProcessor(kafkaStreamsConsumerProperties, matched, topicObject, headersObject));
|
||||
// Branching based on event type match.
|
||||
final KStream<?, ?>[] branch = stream.branch((key, value) -> matched.getAndSet(false));
|
||||
// Deserialize if we have a branch from above.
|
||||
final KStream<?, Object> deserializedKStream = branch[0].mapValues(value -> valueSerde.deserializer().deserialize(null, ((Bytes) value).get()));
|
||||
final KStream<?, Object> deserializedKStream = branch[0].mapValues(value -> valueSerde.deserializer().deserialize(
|
||||
topicObject.get(), headersObject.get(), ((Bytes) value).get()));
|
||||
|
||||
return deserializedKStream.toTable();
|
||||
}
|
||||
@@ -581,19 +591,27 @@ public abstract class AbstractKafkaStreamsBinderProcessor implements Application
|
||||
return consumed;
|
||||
}
|
||||
|
||||
private <K, V> Processor<K, V> eventTypeProcessor(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, AtomicBoolean matched) {
|
||||
return new Processor() {
|
||||
private <K, V> Processor<K, V, Void, Void> eventTypeProcessor(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties,
|
||||
AtomicBoolean matched, AtomicReference<String> topicObject, AtomicReference<Headers> headersObject) {
|
||||
return new Processor<K, V, Void, Void>() {
|
||||
|
||||
ProcessorContext context;
|
||||
org.apache.kafka.streams.processor.api.ProcessorContext<?, ?> context;
|
||||
|
||||
@Override
|
||||
public void init(ProcessorContext context) {
|
||||
public void init(org.apache.kafka.streams.processor.api.ProcessorContext<Void, Void> context) {
|
||||
Processor.super.init(context);
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void process(Object key, Object value) {
|
||||
final Headers headers = this.context.headers();
|
||||
public void process(Record<K, V> record) {
|
||||
final Headers headers = record.headers();
|
||||
headersObject.set(headers);
|
||||
final Optional<RecordMetadata> optional = this.context.recordMetadata();
|
||||
if (optional.isPresent()) {
|
||||
final RecordMetadata recordMetadata = optional.get();
|
||||
topicObject.set(recordMetadata.topic());
|
||||
}
|
||||
final Iterable<Header> eventTypeHeader = headers.headers(kafkaStreamsConsumerProperties.getEventTypeHeaderKey());
|
||||
if (eventTypeHeader != null && eventTypeHeader.iterator().hasNext()) {
|
||||
String eventTypeFromHeader = new String(eventTypeHeader.iterator().next().value());
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2018-2021 the original author or authors.
|
||||
* Copyright 2018-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -84,15 +84,7 @@ public class InteractiveQueryService {
|
||||
*/
|
||||
public <T> T getQueryableStore(String storeName, QueryableStoreType<T> storeType) {
|
||||
|
||||
RetryTemplate retryTemplate = new RetryTemplate();
|
||||
|
||||
KafkaStreamsBinderConfigurationProperties.StateStoreRetry stateStoreRetry = this.binderConfigurationProperties.getStateStoreRetry();
|
||||
RetryPolicy retryPolicy = new SimpleRetryPolicy(stateStoreRetry.getMaxAttempts());
|
||||
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
|
||||
backOffPolicy.setBackOffPeriod(stateStoreRetry.getBackoffPeriod());
|
||||
|
||||
retryTemplate.setBackOffPolicy(backOffPolicy);
|
||||
retryTemplate.setRetryPolicy(retryPolicy);
|
||||
final RetryTemplate retryTemplate = getRetryTemplate();
|
||||
|
||||
KafkaStreams contextSpecificKafkaStreams = getThreadContextSpecificKafkaStreams();
|
||||
|
||||
@@ -191,7 +183,7 @@ public class InteractiveQueryService {
|
||||
* through all the consumer instances under the same application id and retrieves the
|
||||
* proper host.
|
||||
*
|
||||
* Note that the end user applications must provide `applicaiton.server` as a
|
||||
* Note that the end user applications must provide `application.server` as a
|
||||
* configuration property for all the application instances when calling this method.
|
||||
* If this is not available, then null maybe returned.
|
||||
* @param <K> generic type for key
|
||||
@@ -201,11 +193,40 @@ public class InteractiveQueryService {
|
||||
* @return the {@link HostInfo} where the key for the provided store is hosted currently
|
||||
*/
|
||||
public <K> HostInfo getHostInfo(String store, K key, Serializer<K> serializer) {
|
||||
final KeyQueryMetadata keyQueryMetadata = this.kafkaStreamsRegistry.getKafkaStreams()
|
||||
.stream()
|
||||
.map((k) -> Optional.ofNullable(k.queryMetadataForKey(store, key, serializer)))
|
||||
.filter(Optional::isPresent).map(Optional::get).findFirst().orElse(null);
|
||||
return keyQueryMetadata != null ? keyQueryMetadata.getActiveHost() : null;
|
||||
final RetryTemplate retryTemplate = getRetryTemplate();
|
||||
|
||||
|
||||
return retryTemplate.execute(context -> {
|
||||
Throwable throwable = null;
|
||||
try {
|
||||
final KeyQueryMetadata keyQueryMetadata = this.kafkaStreamsRegistry.getKafkaStreams()
|
||||
.stream()
|
||||
.map((k) -> Optional.ofNullable(k.queryMetadataForKey(store, key, serializer)))
|
||||
.filter(Optional::isPresent).map(Optional::get).findFirst().orElse(null);
|
||||
if (keyQueryMetadata != null) {
|
||||
return keyQueryMetadata.activeHost();
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
throwable = e;
|
||||
}
|
||||
throw new IllegalStateException(
|
||||
"Error when retrieving state store", throwable != null ? throwable : new Throwable("Kafka Streams is not ready."));
|
||||
});
|
||||
}
|
||||
|
||||
private RetryTemplate getRetryTemplate() {
|
||||
RetryTemplate retryTemplate = new RetryTemplate();
|
||||
|
||||
KafkaStreamsBinderConfigurationProperties.StateStoreRetry stateStoreRetry = this.binderConfigurationProperties.getStateStoreRetry();
|
||||
RetryPolicy retryPolicy = new SimpleRetryPolicy(stateStoreRetry.getMaxAttempts());
|
||||
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
|
||||
backOffPolicy.setBackOffPeriod(stateStoreRetry.getBackoffPeriod());
|
||||
|
||||
retryTemplate.setBackOffPolicy(backOffPolicy);
|
||||
retryTemplate.setRetryPolicy(retryPolicy);
|
||||
|
||||
return retryTemplate;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2018-2019 the original author or authors.
|
||||
* Copyright 2018-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -18,6 +18,7 @@ package org.springframework.cloud.stream.binder.kafka.streams;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
@@ -69,6 +70,7 @@ import org.springframework.util.StringUtils;
|
||||
*
|
||||
* @author Soby Chacko
|
||||
* @author Lei Chen
|
||||
* @author Eduard Domínguez
|
||||
*/
|
||||
public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
|
||||
@@ -96,14 +98,14 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
KafkaStreamsConsumerProperties extendedConsumerProperties) {
|
||||
String keySerdeString = extendedConsumerProperties.getKeySerde();
|
||||
|
||||
return getKeySerde(keySerdeString);
|
||||
return getKeySerde(keySerdeString, extendedConsumerProperties.getConfiguration());
|
||||
}
|
||||
|
||||
public Serde<?> getInboundKeySerde(
|
||||
KafkaStreamsConsumerProperties extendedConsumerProperties, ResolvableType resolvableType) {
|
||||
String keySerdeString = extendedConsumerProperties.getKeySerde();
|
||||
|
||||
return getKeySerde(keySerdeString, resolvableType);
|
||||
return getKeySerde(keySerdeString, resolvableType, extendedConsumerProperties.getConfiguration());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -120,7 +122,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
String valueSerdeString = extendedConsumerProperties.getValueSerde();
|
||||
try {
|
||||
if (consumerProperties != null && consumerProperties.isUseNativeDecoding()) {
|
||||
valueSerde = getValueSerde(valueSerdeString);
|
||||
valueSerde = getValueSerde(valueSerdeString, extendedConsumerProperties.getConfiguration());
|
||||
}
|
||||
else {
|
||||
valueSerde = Serdes.ByteArray();
|
||||
@@ -140,7 +142,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
String valueSerdeString = extendedConsumerProperties.getValueSerde();
|
||||
try {
|
||||
if (consumerProperties != null && consumerProperties.isUseNativeDecoding()) {
|
||||
valueSerde = getValueSerde(valueSerdeString, resolvableType);
|
||||
valueSerde = getValueSerde(valueSerdeString, resolvableType, extendedConsumerProperties.getConfiguration());
|
||||
}
|
||||
else {
|
||||
valueSerde = Serdes.ByteArray();
|
||||
@@ -158,11 +160,11 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
* @return configurd {@link Serde} for the outbound key.
|
||||
*/
|
||||
public Serde<?> getOuboundKeySerde(KafkaStreamsProducerProperties properties) {
|
||||
return getKeySerde(properties.getKeySerde());
|
||||
return getKeySerde(properties.getKeySerde(), properties.getConfiguration());
|
||||
}
|
||||
|
||||
public Serde<?> getOuboundKeySerde(KafkaStreamsProducerProperties properties, ResolvableType resolvableType) {
|
||||
return getKeySerde(properties.getKeySerde(), resolvableType);
|
||||
return getKeySerde(properties.getKeySerde(), resolvableType, properties.getConfiguration());
|
||||
}
|
||||
|
||||
|
||||
@@ -179,7 +181,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
try {
|
||||
if (producerProperties.isUseNativeEncoding()) {
|
||||
valueSerde = getValueSerde(
|
||||
kafkaStreamsProducerProperties.getValueSerde());
|
||||
kafkaStreamsProducerProperties.getValueSerde(), kafkaStreamsProducerProperties.getConfiguration());
|
||||
}
|
||||
else {
|
||||
valueSerde = Serdes.ByteArray();
|
||||
@@ -197,7 +199,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
try {
|
||||
if (producerProperties.isUseNativeEncoding()) {
|
||||
valueSerde = getValueSerde(
|
||||
kafkaStreamsProducerProperties.getValueSerde(), resolvableType);
|
||||
kafkaStreamsProducerProperties.getValueSerde(), resolvableType, kafkaStreamsProducerProperties.getConfiguration());
|
||||
}
|
||||
else {
|
||||
valueSerde = Serdes.ByteArray();
|
||||
@@ -215,7 +217,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
* @return {@link Serde} for the state store key.
|
||||
*/
|
||||
public Serde<?> getStateStoreKeySerde(String keySerdeString) {
|
||||
return getKeySerde(keySerdeString);
|
||||
return getKeySerde(keySerdeString, (Map<String, ?>) null);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -225,14 +227,14 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
*/
|
||||
public Serde<?> getStateStoreValueSerde(String valueSerdeString) {
|
||||
try {
|
||||
return getValueSerde(valueSerdeString);
|
||||
return getValueSerde(valueSerdeString, (Map<String, ?>) null);
|
||||
}
|
||||
catch (ClassNotFoundException ex) {
|
||||
throw new IllegalStateException("Serde class not found: ", ex);
|
||||
}
|
||||
}
|
||||
|
||||
private Serde<?> getKeySerde(String keySerdeString) {
|
||||
private Serde<?> getKeySerde(String keySerdeString, Map<String, ?> extendedConfiguration) {
|
||||
Serde<?> keySerde;
|
||||
try {
|
||||
if (StringUtils.hasText(keySerdeString)) {
|
||||
@@ -241,8 +243,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
else {
|
||||
keySerde = getFallbackSerde("default.key.serde");
|
||||
}
|
||||
keySerde.configure(this.streamConfigGlobalProperties, true);
|
||||
|
||||
keySerde.configure(combineStreamConfigProperties(extendedConfiguration), true);
|
||||
}
|
||||
catch (ClassNotFoundException ex) {
|
||||
throw new IllegalStateException("Serde class not found: ", ex);
|
||||
@@ -250,7 +251,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
return keySerde;
|
||||
}
|
||||
|
||||
private Serde<?> getKeySerde(String keySerdeString, ResolvableType resolvableType) {
|
||||
private Serde<?> getKeySerde(String keySerdeString, ResolvableType resolvableType, Map<String, ?> extendedConfiguration) {
|
||||
Serde<?> keySerde = null;
|
||||
try {
|
||||
if (StringUtils.hasText(keySerdeString)) {
|
||||
@@ -267,7 +268,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
keySerde = Serdes.ByteArray();
|
||||
}
|
||||
}
|
||||
keySerde.configure(this.streamConfigGlobalProperties, true);
|
||||
keySerde.configure(combineStreamConfigProperties(extendedConfiguration), true);
|
||||
}
|
||||
catch (ClassNotFoundException ex) {
|
||||
throw new IllegalStateException("Serde class not found: ", ex);
|
||||
@@ -380,7 +381,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
}
|
||||
|
||||
|
||||
private Serde<?> getValueSerde(String valueSerdeString)
|
||||
private Serde<?> getValueSerde(String valueSerdeString, Map<String, ?> extendedConfiguration)
|
||||
throws ClassNotFoundException {
|
||||
Serde<?> valueSerde;
|
||||
if (StringUtils.hasText(valueSerdeString)) {
|
||||
@@ -389,7 +390,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
else {
|
||||
valueSerde = getFallbackSerde("default.value.serde");
|
||||
}
|
||||
valueSerde.configure(this.streamConfigGlobalProperties, false);
|
||||
valueSerde.configure(combineStreamConfigProperties(extendedConfiguration), false);
|
||||
return valueSerde;
|
||||
}
|
||||
|
||||
@@ -403,7 +404,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private Serde<?> getValueSerde(String valueSerdeString, ResolvableType resolvableType)
|
||||
private Serde<?> getValueSerde(String valueSerdeString, ResolvableType resolvableType, Map<String, ?> extendedConfiguration)
|
||||
throws ClassNotFoundException {
|
||||
Serde<?> valueSerde = null;
|
||||
if (StringUtils.hasText(valueSerdeString)) {
|
||||
@@ -422,7 +423,7 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
valueSerde = Serdes.ByteArray();
|
||||
}
|
||||
}
|
||||
valueSerde.configure(streamConfigGlobalProperties, false);
|
||||
valueSerde.configure(combineStreamConfigProperties(extendedConfiguration), false);
|
||||
return valueSerde;
|
||||
}
|
||||
|
||||
@@ -430,4 +431,15 @@ public class KeyValueSerdeResolver implements ApplicationContextAware {
|
||||
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||||
context = (ConfigurableApplicationContext) applicationContext;
|
||||
}
|
||||
|
||||
private Map<String, ?> combineStreamConfigProperties(Map<String, ?> extendedConfiguration) {
|
||||
if (extendedConfiguration != null && !extendedConfiguration.isEmpty()) {
|
||||
Map<String, Object> streamConfiguration = new HashMap(this.streamConfigGlobalProperties);
|
||||
streamConfiguration.putAll(extendedConfiguration);
|
||||
return streamConfiguration;
|
||||
}
|
||||
else {
|
||||
return this.streamConfigGlobalProperties;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2017-2021 the original author or authors.
|
||||
* Copyright 2017-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
import org.apache.kafka.common.serialization.IntegerSerializer;
|
||||
import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.apache.kafka.streams.KafkaStreams;
|
||||
import org.apache.kafka.streams.KeyQueryMetadata;
|
||||
import org.apache.kafka.streams.KeyValue;
|
||||
@@ -125,11 +126,39 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
|
||||
catch (Exception ignored) {
|
||||
|
||||
}
|
||||
|
||||
Mockito.verify(mockKafkaStreams, times(3))
|
||||
.store(StoreQueryParameters.fromNameAndType("foo", storeType));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStateStoreRetrievalRetryForHostInfoService() {
|
||||
StreamsBuilderFactoryBean mock = Mockito.mock(StreamsBuilderFactoryBean.class);
|
||||
KafkaStreams mockKafkaStreams = Mockito.mock(KafkaStreams.class);
|
||||
Mockito.when(mock.getKafkaStreams()).thenReturn(mockKafkaStreams);
|
||||
KafkaStreamsRegistry kafkaStreamsRegistry = new KafkaStreamsRegistry();
|
||||
kafkaStreamsRegistry.registerKafkaStreams(mock);
|
||||
Mockito.when(mock.isRunning()).thenReturn(true);
|
||||
Properties mockProperties = new Properties();
|
||||
mockProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, "foobarApp-123");
|
||||
Mockito.when(mock.getStreamsConfiguration()).thenReturn(mockProperties);
|
||||
KafkaStreamsBinderConfigurationProperties binderConfigurationProperties =
|
||||
new KafkaStreamsBinderConfigurationProperties(new KafkaProperties());
|
||||
binderConfigurationProperties.getStateStoreRetry().setMaxAttempts(3);
|
||||
InteractiveQueryService interactiveQueryService = new InteractiveQueryService(kafkaStreamsRegistry,
|
||||
binderConfigurationProperties);
|
||||
|
||||
QueryableStoreType<ReadOnlyKeyValueStore<Object, Object>> storeType = QueryableStoreTypes.keyValueStore();
|
||||
final StringSerializer serializer = new StringSerializer();
|
||||
try {
|
||||
interactiveQueryService.getHostInfo("foo", "fooKey", serializer);
|
||||
}
|
||||
catch (Exception ignored) {
|
||||
|
||||
}
|
||||
Mockito.verify(mockKafkaStreams, times(3))
|
||||
.queryMetadataForKey("foo", "fooKey", serializer);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore
|
||||
public void testKstreamBinderWithPojoInputAndStringOuput() throws Exception {
|
||||
|
||||
@@ -20,6 +20,9 @@ import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import com.fasterxml.jackson.databind.JavaType;
|
||||
import com.fasterxml.jackson.databind.type.TypeFactory;
|
||||
import org.apache.kafka.common.header.Headers;
|
||||
import org.apache.kafka.common.security.JaasUtils;
|
||||
import org.apache.kafka.streams.kstream.GlobalKTable;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
@@ -28,18 +31,22 @@ import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolver;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
|
||||
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
|
||||
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
|
||||
|
||||
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
* @author Eduard Domínguez
|
||||
*/
|
||||
public class KafkaStreamsBinderBootstrapTest {
|
||||
|
||||
@@ -111,7 +118,7 @@ public class KafkaStreamsBinderBootstrapTest {
|
||||
+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.application-id"
|
||||
+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foo",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.configuration.spring.json.value.type.method=com.test.MyClass",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.configuration.spring.json.value.type.method=" + this.getClass().getName() + ".determineType",
|
||||
"--spring.cloud.stream.kafka.streams.bindings.input3-in-0.consumer.application-id"
|
||||
+ "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foobar",
|
||||
"--spring.cloud.stream.kafka.streams.binder.brokers="
|
||||
@@ -134,10 +141,28 @@ public class KafkaStreamsBinderBootstrapTest {
|
||||
final StreamsBuilderFactoryBean input3SBFB = applicationContext.getBean("&stream-builder-input3", StreamsBuilderFactoryBean.class);
|
||||
final Properties streamsConfiguration3 = input3SBFB.getStreamsConfiguration();
|
||||
assertThat(streamsConfiguration3.containsKey("spring.json.value.type.method")).isFalse();
|
||||
applicationContext.getBean(KeyValueSerdeResolver.class);
|
||||
|
||||
String configuredSerdeTypeResolver = (String) new DirectFieldAccessor(input2SBFB.getKafkaStreams())
|
||||
.getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeResolver.arg$2");
|
||||
|
||||
assertThat(this.getClass().getName() + ".determineType").isEqualTo(configuredSerdeTypeResolver);
|
||||
|
||||
String configuredKeyDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams())
|
||||
.getPropertyValue("taskTopology.processorNodes[0].keyDeserializer.typeMapper.classIdFieldName"));
|
||||
assertThat(DefaultJackson2JavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredKeyDeserializerFieldName);
|
||||
|
||||
String configuredValueDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams())
|
||||
.getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeMapper.classIdFieldName"));
|
||||
assertThat(DefaultJackson2JavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredValueDeserializerFieldName);
|
||||
|
||||
applicationContext.close();
|
||||
}
|
||||
|
||||
public static JavaType determineType(byte[] data, Headers headers) {
|
||||
return TypeFactory.defaultInstance().constructParametricType(Map.class, String.class, String.class);
|
||||
}
|
||||
|
||||
@SpringBootApplication
|
||||
static class SimpleKafkaStreamsApplication {
|
||||
|
||||
@@ -149,7 +174,7 @@ public class KafkaStreamsBinderBootstrapTest {
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Consumer<KTable<Object, String>> input2() {
|
||||
public Consumer<KTable<Map<String, String>, Map<String, String>>> input2() {
|
||||
return s -> {
|
||||
// No-op consumer
|
||||
};
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
|
||||
<version>3.2.1</version>
|
||||
<version>3.2.2</version>
|
||||
</parent>
|
||||
|
||||
<dependencies>
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
/*
|
||||
* Copyright 2022-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka;
|
||||
|
||||
import org.springframework.boot.actuate.health.HealthIndicator;
|
||||
|
||||
/**
|
||||
* Marker interface used for custom KafkaBinderHealth indicator implementations.
|
||||
*
|
||||
* @author Soby Chacko
|
||||
* @since 3.2.2
|
||||
*/
|
||||
public interface KafkaBinderHealth extends HealthIndicator {
|
||||
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2016-2021 the original author or authors.
|
||||
* Copyright 2016-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -35,7 +35,6 @@ import org.apache.kafka.common.PartitionInfo;
|
||||
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.boot.actuate.health.Health;
|
||||
import org.springframework.boot.actuate.health.HealthIndicator;
|
||||
import org.springframework.boot.actuate.health.Status;
|
||||
import org.springframework.boot.actuate.health.StatusAggregator;
|
||||
import org.springframework.kafka.core.ConsumerFactory;
|
||||
@@ -55,7 +54,7 @@ import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
|
||||
* @author Chukwubuikem Ume-Ugwa
|
||||
* @author Taras Danylchuk
|
||||
*/
|
||||
public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBean {
|
||||
public class KafkaBinderHealthIndicator implements KafkaBinderHealth, DisposableBean {
|
||||
|
||||
private static final int DEFAULT_TIMEOUT = 60;
|
||||
|
||||
@@ -73,7 +72,7 @@ public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBe
|
||||
private boolean considerDownWhenAnyPartitionHasNoLeader;
|
||||
|
||||
public KafkaBinderHealthIndicator(KafkaMessageChannelBinder binder,
|
||||
ConsumerFactory<?, ?> consumerFactory) {
|
||||
ConsumerFactory<?, ?> consumerFactory) {
|
||||
this.binder = binder;
|
||||
this.consumerFactory = consumerFactory;
|
||||
}
|
||||
@@ -219,7 +218,7 @@ public class KafkaBinderHealthIndicator implements HealthIndicator, DisposableBe
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() throws Exception {
|
||||
public void destroy() {
|
||||
executor.shutdown();
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2018-2019 the original author or authors.
|
||||
* Copyright 2018-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -24,6 +24,8 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||
|
||||
import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealth;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
|
||||
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
|
||||
@@ -38,11 +40,13 @@ import org.springframework.util.ObjectUtils;
|
||||
*
|
||||
* @author Oleg Zhurakousky
|
||||
* @author Chukwubuikem Ume-Ugwa
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
|
||||
@Configuration
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
@ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
|
||||
@ConditionalOnEnabledHealthIndicator("binders")
|
||||
@ConditionalOnMissingBean(KafkaBinderHealth.class)
|
||||
public class KafkaBinderHealthIndicatorConfiguration {
|
||||
|
||||
@Bean
|
||||
|
||||
@@ -0,0 +1,72 @@
|
||||
/*
|
||||
* Copyright 2022-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.binder.kafka.bootstrap;
|
||||
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.boot.actuate.health.Health;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealth;
|
||||
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
|
||||
|
||||
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
|
||||
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
|
||||
|
||||
/**
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class KafkaBinderCustomHealthCheckTests {
|
||||
|
||||
@ClassRule
|
||||
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, 10);
|
||||
|
||||
@Test
|
||||
public void testCustomHealthIndicatorIsActivated() {
|
||||
ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder(
|
||||
CustomHealthCheckApplication.class).web(WebApplicationType.NONE).run(
|
||||
"--spring.cloud.stream.kafka.binder.brokers="
|
||||
+ embeddedKafka.getEmbeddedKafka().getBrokersAsString());
|
||||
final KafkaBinderHealth kafkaBinderHealth = applicationContext.getBean(KafkaBinderHealth.class);
|
||||
assertThat(kafkaBinderHealth).isInstanceOf(CustomHealthIndicator.class);
|
||||
assertThatThrownBy(() -> applicationContext.getBean(KafkaBinderHealthIndicator.class)).isInstanceOf(NoSuchBeanDefinitionException.class);
|
||||
applicationContext.close();
|
||||
}
|
||||
|
||||
@SpringBootApplication
|
||||
static class CustomHealthCheckApplication {
|
||||
|
||||
@Bean
|
||||
public CustomHealthIndicator kafkaBinderHealthIndicator() {
|
||||
return new CustomHealthIndicator();
|
||||
}
|
||||
}
|
||||
|
||||
static class CustomHealthIndicator implements KafkaBinderHealth {
|
||||
|
||||
@Override
|
||||
public Health health() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user