Compare commits


44 Commits
4.x ... main

Author SHA1 Message Date
Soby Chacko
31ea106834 Update issue templates 2022-05-11 11:57:10 -04:00
Soby Chacko
02b30bf430 Update issue templates 2022-05-11 11:55:12 -04:00
Soby Chacko
9aac07957c Update issue templates 2022-05-11 11:53:09 -04:00
Soby Chacko
63a7da1b9e Update issue templates 2022-05-11 11:51:30 -04:00
Soby Chacko
2f986495fa Update README.adoc 2022-03-30 11:57:09 -04:00
Soby Chacko
382a3c1c81 Update README.adoc 2022-03-30 11:54:30 -04:00
Soby Chacko
3045019398 Version downgrades
Temporarily downgrade SK to 3.0.0-M1 and Kafka client to 3.0.0
2022-02-22 17:47:26 -05:00
Soby Chacko
e848ec8051 Remove call to deprecated method in Spring Kafka 2022-02-15 11:42:46 -05:00
Gary Russell
c472b185be GH-1195: Fix Pause/Resume Documentation
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1195

Remove obsolete documentation.

**cherry-pick to 3.2.x**
2022-02-07 10:49:19 -05:00
Soby Chacko
235146e29a Revert "Revert "Update documented version of kafka-clients""
This reverts commit 65960e101f.
2022-01-27 10:53:54 -05:00
Soby Chacko
8eecf03827 Revert "Revert "Update Kafka client version to 3.1.0""
This reverts commit 01dbb49313.
2022-01-27 10:53:27 -05:00
Soby Chacko
3f1f7dbbe8 Revert "Revert "Fixed invalid java code snippet""
This reverts commit d2c12873d0.
2022-01-27 10:52:22 -05:00
buildmaster
45245f4b92 Going back to snapshots 2022-01-27 15:39:18 +00:00
buildmaster
9922922036 Update SNAPSHOT to 4.0.0-M1 2022-01-27 15:37:22 +00:00
Oleg Zhurakousky
c3b6610c9f Update SK and SIK version 2022-01-27 16:24:05 +01:00
Oleg Zhurakousky
d2c12873d0 Revert "Fixed invalid java code snippet"
This reverts commit 4cbcb4049b.
2022-01-27 16:16:26 +01:00
Oleg Zhurakousky
01dbb49313 Revert "Update Kafka client version to 3.1.0"
This reverts commit a25e2ea0b3.
2022-01-27 16:16:10 +01:00
Oleg Zhurakousky
65960e101f Revert "Update documented version of kafka-clients"
This reverts commit 69e377e13b.
2022-01-27 16:16:02 +01:00
Jay Lindquist
69e377e13b Update documented version of kafka-clients
It looks like this has been incorrect for a few versions. The main branch is currently pulling in 3.1.0

https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/blob/main/pom.xml#L26
2022-01-26 11:53:37 -05:00
Soby Chacko
a25e2ea0b3 Update Kafka client version to 3.1.0
Test changes
2022-01-25 17:18:04 -05:00
Rex Ijiekhuamen
4cbcb4049b Fixed invalid java code snippet 2022-01-24 10:22:32 -05:00
Soby Chacko
310683987a Temporarily disabling a test 2022-01-19 12:46:23 -05:00
Soby Chacko
cd02be57e5 Test package changes 2022-01-18 15:42:47 -05:00
Soby Chacko
ee888a15ba Fixing test 2022-01-18 12:59:47 -05:00
Soby Chacko
417665773c Merge branch '4.x' into main 2022-01-18 11:50:45 -05:00
Soby Chacko
577ffbb67f Enable custom binder health check implementation
Currently, KafkaBinderHealthIndicator is not customizable and is included by default
when Spring Boot actuator is on the classpath. Fix this by allowing the application
to provide a custom implementation. A new marker interface called KafkaBinderHealth
can be used by the application to provide a custom HealthIndicator implementation, in
which case the binder's default implementation is excluded.

Tests and docs changes.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1180
2022-01-13 09:56:32 -05:00
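The opt-in described in this commit might look like the following configuration class (a minimal sketch, assuming `KafkaBinderHealth` lives in `org.springframework.cloud.stream.binder.kafka` and extends Spring Boot's `HealthIndicator`; the health logic itself is hypothetical):

```java
import org.springframework.boot.actuate.health.Health;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealth;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class CustomKafkaBinderHealthConfig {

    // Per the commit description, providing a KafkaBinderHealth bean causes
    // the binder to exclude its default KafkaBinderHealthIndicator.
    @Bean
    public KafkaBinderHealth kafkaBinderHealthIndicator() {
        return new KafkaBinderHealth() {
            @Override
            public Health health() {
                // Illustrative custom check; replace with real probing logic.
                return Health.up().withDetail("custom", "ok").build();
            }
        };
    }
}
```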
Soby Chacko
3770db7844 Retries for HostInfo in InteractiveQueryService
InteractiveQueryService methods for finding the host info for Kafka Streams
currently throw exceptions if the underlying KafkaStreams are not ready yet.
Introduce a retry mechanism so that users can control the behaviour of
these methods through the following properties.

spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts (default 1)
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backoffPeriod (default 1000 ms).

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1185
2022-01-11 18:49:23 -05:00
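For example, an application could raise the retry limits in `application.properties` (the values shown here are illustrative, not the defaults):

```
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts=5
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backoffPeriod=2000
```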
Eduard Domínguez
406e20f19c Fix: KeySerde setup not using expected key type headers
checkstyle fixes
2022-01-11 14:37:20 -05:00
Soby Chacko
648188fc6b Event type routing improvements (Kafka Streams)
When routing by event types, the deserializer omits the
topic and header information. Fixing this issue.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1186
2022-01-05 19:30:36 -05:00
Eduard Domínguez
63b306d34c GH-1176: KeyValueSerdeResolver improvements
Use extended properties when initializing Consumer and Producer Serdes.

Updated copyright years and authors.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1176
2021-12-10 14:02:29 -05:00
buildmaster
5cd8e06ec6 Bumping versions to 3.2.2-SNAPSHOT after release 2021-12-01 16:58:07 +00:00
buildmaster
79be11c9e9 Going back to snapshots 2021-12-01 16:58:07 +00:00
buildmaster
fc4358ba10 Update SNAPSHOT to 3.2.1 2021-12-01 16:55:37 +00:00
buildmaster
f3d2287b70 Bumping versions to 3.2.1-SNAPSHOT after release 2021-12-01 13:16:23 +00:00
buildmaster
220ae98bcc Going back to snapshots 2021-12-01 13:16:23 +00:00
buildmaster
bd3eebd897 Update SNAPSHOT to 3.2.0 2021-12-01 13:14:14 +00:00
Soby Chacko
ed8683dcc2 KafkaStreams binder health check improvements
Allow health checks on KafkaStreams processors that are currently stopped through the
actuator bindings endpoint. Add this only as an opt-in feature through a new binder-level
property - includeStoppedProcessorsForHealthCheck - which is false by default
to preserve the current health indicator behavior.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1165
Resolves #1175
2021-12-01 10:51:39 +01:00
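Opting in would look like this in `application.properties` (assuming the property sits under the usual Kafka Streams binder prefix, as the commit message suggests):

```
spring.cloud.stream.kafka.streams.binder.includeStoppedProcessorsForHealthCheck=true
```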
Soby Chacko
60b6604988 New tips-tricks-recipes section in docs
Migrate the recipe section in Spring Cloud Stream Samples
repository as Tips, Tricks and Recipes in the Kafka binder main docs.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1173
Resolves #1174
2021-12-01 10:36:31 +01:00
Oleg Zhurakousky
a3e76282b4 Merge pull request #1172 from sobychacko/fix-partitioning-interceptor
Fix PartitioningInterceptor CCE
2021-11-29 18:12:59 +01:00
Soby Chacko
c9687189b7 Fix PartitioningInterceptor CCE
The newly added DefaultPartitioningInterceptor must be explicitly
checked in order to avoid a ClassCastException (CCE).

Related to resolving https://github.com/spring-cloud/spring-cloud-stream/issues/2245

Specifically for this: https://github.com/spring-cloud/spring-cloud-stream/issues/2245#issuecomment-977663452
2021-11-24 13:16:00 -05:00
Soby Chacko
5fcdf28776 GH-1170: Schema registry certificates
Move classpath: resources provided as schema registry certificates
into a local file system location.

Adding test and docs.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1170
2021-11-23 18:23:55 -05:00
Soby Chacko
d334359cd4 Update spring-kafka to 2.8.0 Release 2021-11-23 10:22:18 -05:00
Pommerening, Nico
aff0dc00ef GH-1161: InteractiveQueryService improvements
This PR safeguards state store instances in case there are multiple KafkaStreams
instances present that have distinct application IDs but share state store names.

Change is backwards compatible: In case no KafkaStreams association of the thread
can be found, all local state stores are queried as before.

In case an associated KafkaStreams Instance is found, but required StateStore is
not found in this instance, a warning is issued but backwards compatibility is
preserved by looking up all state stores.

A store within the thread's KafkaStreams instance is preferred over a "foreign" store
with the same name, and a warning is issued if the requested store is not found within
the thread's instance.

The main benefit is to stop randomly selecting stores across all KafkaStreams instances
when a store with the same name is contained in multiple streams instances.

Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1161
2021-11-18 14:08:22 -05:00
Oleg Zhurakousky
7840decc86 Changes related to GH-2245 from core 2021-11-16 14:43:43 +01:00
10 changed files with 39 additions and 55 deletions

.github/ISSUE_TEMPLATE/bug_report.md (new file)

@@ -0,0 +1,10 @@
+---
+name: Bug report
+about: Create a report to help us improve
+title: Please create new issues in https://github.com/spring-cloud/spring-cloud-stream/issues
+labels: ''
+assignees: ''
+---
+Please create all new issues in https://github.com/spring-cloud/spring-cloud-stream/issues. The Kafka binder repository has been relocated to the core Spring Cloud Stream repo.


@@ -26,6 +26,12 @@ It contains information about its design, usage, and configuration options, as w
In addition, this guide explains the Kafka Streams binding capabilities of Spring Cloud Stream.
--
+== ANNOUNCEMENT
+**IMPORTANT: This repository is now migrated as part of core Spring Cloud Stream - https://github.com/spring-cloud/spring-cloud-stream.
+Please create new issues over at the core repository.**
== Apache Kafka Binder
=== Usage


@@ -590,7 +590,7 @@ It will use that for inbound deserialization.
```
@Bean
-public Serde<Foo() customSerde{
+public Serde<Foo> customSerde() {
...
}


@@ -40,7 +40,7 @@ The Apache Kafka Binder implementation maps each destination to an Apache Kafka
The consumer group maps directly to the same Apache Kafka concept.
Partitioning also maps directly to Apache Kafka partitions as well.
-The binder currently uses the Apache Kafka `kafka-clients` version `2.3.1`.
+The binder currently uses the Apache Kafka `kafka-clients` version `3.1.0`.
This client can communicate with older brokers (see the Kafka documentation), but certain features may not be available.
For example, with versions earlier than 0.11.x.x, native headers are not supported.
Also, 0.11.x.x does not support the `autoAddPartitions` property.
@@ -658,41 +658,10 @@ See this https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/m
===== Example: Pausing and Resuming the Consumer
If you wish to suspend consumption but not cause a partition rebalance, you can pause and resume the consumer.
-This is facilitated by adding the `Consumer` as a parameter to your `@StreamListener`.
-To resume, you need an `ApplicationListener` for `ListenerContainerIdleEvent` instances.
+This is facilitated by managing the binding lifecycle as shown in **Binding visualization and control** in the Spring Cloud Stream documentation, using `State.PAUSED` and `State.RESUMED`.
+To resume, you can use an `ApplicationListener` (or `@EventListener` method) to receive `ListenerContainerIdleEvent` instances.
The frequency at which events are published is controlled by the `idleEventInterval` property.
-Since the consumer is not thread-safe, you must call these methods on the calling thread.
-The following simple application shows how to pause and resume:
-[source, java]
-----
-@SpringBootApplication
-@EnableBinding(Sink.class)
-public class Application {
-public static void main(String[] args) {
-SpringApplication.run(Application.class, args);
-}
-@StreamListener(Sink.INPUT)
-public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
-System.out.println(in);
-consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
-}
-@Bean
-public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
-return event -> {
-System.out.println(event);
-if (event.getConsumer().paused().size() > 0) {
-event.getConsumer().resume(event.getConsumer().paused());
-}
-};
-}
-}
-----
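Under the binding-lifecycle approach referenced in this hunk, pausing and resuming is typically driven through the bindings actuator endpoint. A hypothetical invocation (the binding name and port are illustrative, and the endpoint must be exposed):

```shell
# Pause the binding (name "myFunction-in-0" is hypothetical)
curl -X POST -H 'Content-Type: application/json' \
     -d '{"state":"PAUSED"}' \
     http://localhost:8080/actuator/bindings/myFunction-in-0

# Resume it later
curl -X POST -H 'Content-Type: application/json' \
     -d '{"state":"RESUMED"}' \
     http://localhost:8080/actuator/bindings/myFunction-in-0
```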
[[kafka-transactional-binder]]
=== Transactional Binder
@@ -993,4 +962,3 @@ public KafkaBinderHealth kafkaBinderHealthIndicator() {
};
}
```


@@ -21,7 +21,7 @@
</scm>
<properties>
<java.version>17</java.version>
-<spring-kafka.version>3.0.0-SNAPSHOT</spring-kafka.version>
+<spring-kafka.version>3.0.0-M1</spring-kafka.version>
<spring-integration-kafka.version>6.0.0-SNAPSHOT</spring-integration-kafka.version>
<kafka.version>3.0.0</kafka.version>
<spring-cloud-stream.version>4.0.0-SNAPSHOT</spring-cloud-stream.version>


@@ -123,7 +123,6 @@ public class KafkaStreamsInteractiveQueryIntegrationTests {
catch (Exception ignored) {
}
Mockito.verify(mockKafkaStreams, times(3))
.store(StoreQueryParameters.fromNameAndType("foo", storeType));
}


@@ -31,7 +31,6 @@ import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
@@ -39,7 +38,6 @@ import org.springframework.cloud.stream.binder.kafka.streams.KeyValueSerdeResolv
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.support.mapping.DefaultJackson2JavaTypeMapper;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
@@ -143,18 +141,19 @@ public class KafkaStreamsBinderBootstrapTest {
assertThat(streamsConfiguration3.containsKey("spring.json.value.type.method")).isFalse();
applicationContext.getBean(KeyValueSerdeResolver.class);
-String configuredSerdeTypeResolver = (String) new DirectFieldAccessor(input2SBFB.getKafkaStreams())
-.getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeResolver.arg$2");
-assertThat(this.getClass().getName() + ".determineType").isEqualTo(configuredSerdeTypeResolver);
-String configuredKeyDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams())
-.getPropertyValue("taskTopology.processorNodes[0].keyDeserializer.typeMapper.classIdFieldName"));
-assertThat(DefaultJackson2JavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredKeyDeserializerFieldName);
-String configuredValueDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams())
-.getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeMapper.classIdFieldName"));
-assertThat(DefaultJackson2JavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredValueDeserializerFieldName);
+//TODO: In Kafka Streams 3.1, taskTopology field is removed. Re-evaluate this testing strategy.
+// String configuredSerdeTypeResolver = (String) new DirectFieldAccessor(input2SBFB.getKafkaStreams())
+// .getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeResolver.arg$2");
+//
+// assertThat(this.getClass().getName() + ".determineType").isEqualTo(configuredSerdeTypeResolver);
+//
+// String configuredKeyDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams())
+// .getPropertyValue("taskTopology.processorNodes[0].keyDeserializer.typeMapper.classIdFieldName"));
+// assertThat(DefaultJackson2JavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredKeyDeserializerFieldName);
+//
+// String configuredValueDeserializerFieldName = ((String) new DirectFieldAccessor(input2SBFB.getKafkaStreams())
+// .getPropertyValue("taskTopology.processorNodes[0].valDeserializer.typeMapper.classIdFieldName"));
+// assertThat(DefaultJackson2JavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME).isEqualTo(configuredValueDeserializerFieldName);
applicationContext.close();
}


@@ -679,7 +679,7 @@ public class KafkaMessageChannelBinder extends
concurrency = extendedConsumerProperties.getConcurrency();
}
resetOffsetsForAutoRebalance(extendedConsumerProperties, consumerFactory, containerProperties);
-containerProperties.setAuthorizationExceptionRetryInterval(this.configurationProperties.getAuthorizationExceptionRetryInterval());
+containerProperties.setAuthExceptionRetryInterval(this.configurationProperties.getAuthorizationExceptionRetryInterval());
@SuppressWarnings("rawtypes")
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer = new ConcurrentMessageListenerContainer(
consumerFactory, containerProperties) {


@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package org.springframework.cloud.stream.binder.kafka.integration2;
+package org.springframework.cloud.stream.binder.kafka.bootstrap;
import org.junit.ClassRule;
import org.junit.Test;


@@ -23,6 +23,7 @@ import java.util.function.Consumer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -79,6 +80,7 @@ public class KafkaNullConverterTest {
}
@Test
+@Ignore
public void testKafkaNullConverterOutput() throws InterruptedException {
final StreamBridge streamBridge = context.getBean(StreamBridge.class);