Restore Kafka Streams error state behavior in the binder, equivalent
to Kafka Streams prior to 2.8. Starting with 2.8, users can customize
the way uncaught errors are interpreted via an UncaughtExceptionHandler
in the application. The binder now sets a default handler that shuts down
the Kafka Streams client if there is an uncaught error.
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1110
At the moment, the Kafka Streams binder only allows KStream bindings on the outbound.
There is a delegation mechanism through which we can still use KStream for the output binding
while allowing applications to provide a KTable type as the function return type.
Update docs.
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1085
Resolves #1105
* Adding a delay in testDlqWithNativeSerializationEnabledOnDlqProducer to avoid a race condition.
Awaitility is used to wait for the proper condition in this test.
* In the Micrometer registry tests, properly wait for the first message to arrive on the outbound
so that the producer factory gets a chance to fully add the Micrometer producer listener
before the test assertions start.
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1103
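The fix in both tests is the same pattern: poll for the expected condition instead of asserting immediately. A minimal stdlib sketch of the polling loop that Awaitility's `await().atMost(...).until(...)` encapsulates (the simulated async message here is illustrative):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class AwaitExample {

    // Poll until the condition holds or the timeout elapses, like Awaitility's
    // await().atMost(timeout).until(condition).
    static boolean awaitUntil(BooleanSupplier condition, Duration timeout) {
        Instant deadline = Instant.now().plus(timeout);
        while (Instant.now().isBefore(deadline)) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(50);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return condition.getAsBoolean();
    }

    public static void main(String[] args) {
        AtomicInteger received = new AtomicInteger();
        // Simulate a message arriving asynchronously on another thread.
        new Thread(() -> {
            try { Thread.sleep(200); } catch (InterruptedException ignored) { }
            received.incrementAndGet();
        }).start();

        boolean arrived = awaitUntil(() -> received.get() > 0, Duration.ofSeconds(5));
        System.out.println(arrived ? "condition met" : "timed out");
    }
}
```

Unlike a fixed `Thread.sleep`, the loop returns as soon as the condition becomes true, so tests stay fast while still tolerating slow CI machines.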
Composed function definitions can start with a `java.util.function.Function`
or `java.util.function.BiFunction` and compose with other functions or consumers.
In the case of a consumer, this needs to be the last unit in the function definition.
`java.util.function.BiConsumer` is not eligible for function composition.
The first component in the definition can be a curried function as well.
Adding tests and docs.
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1088
Resolves #1091
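The composition rules above can be illustrated with plain `java.util.function` types (the lambdas are hypothetical): a `Function` or `BiFunction` can keep composing via `andThen`, while a `Consumer` returns nothing and can therefore only terminate the chain.

```java
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

public class CompositionExample {

    // First unit: a Function (the binder also allows a BiFunction, or a
    // curried Function<A, Function<B, C>>, as the first component).
    static final Function<String, String> UPPER_CASE = String::toUpperCase;
    static final Function<String, Integer> LENGTH = String::length;

    // Function composed with another Function: still a Function.
    static final Function<String, Integer> UPPER_THEN_LENGTH = UPPER_CASE.andThen(LENGTH);

    // A BiFunction can also start the chain and compose via andThen.
    static final BiFunction<String, String, Integer> CONCAT_THEN_LENGTH =
            ((BiFunction<String, String, String>) String::concat).andThen(LENGTH);

    public static void main(String[] args) {
        System.out.println(UPPER_THEN_LENGTH.apply("kafka"));            // 5
        System.out.println(CONCAT_THEN_LENGTH.apply("spring", "cloud")); // 11

        // A Consumer must be the last unit: it accepts a value but produces
        // none, so nothing further can be composed after it.
        Consumer<Integer> sink = n -> System.out.println("consumed " + n);
        sink.accept(UPPER_THEN_LENGTH.apply("stream"));                  // consumed 6
    }
}
```

This also makes clear why `BiConsumer` cannot participate: it neither produces a value to pass on nor can it receive the single value a preceding function emits.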
Before running the JAAS initializer security tests, remove references to
JAAS-related config files created previously in other tests. This is done
through clearing a system property (java.security.auth.login.config) in
the tests. Without clearing this property, these tests run into a race condition.
Fixes https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1094
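The cleanup amounts to clearing that property before the security tests run. A minimal sketch (the property name is the real one; the surrounding class and file path are illustrative):

```java
public class JaasPropertyCleanup {

    static final String JAAS_CONFIG_PROPERTY = "java.security.auth.login.config";

    // Clear any JAAS config file reference left behind by earlier tests so the
    // security tests start from a clean state instead of racing on stale config.
    static void clearStaleJaasConfig() {
        System.clearProperty(JAAS_CONFIG_PROPERTY);
    }

    public static void main(String[] args) {
        // Simulate a previous test leaving a config file reference behind.
        System.setProperty(JAAS_CONFIG_PROPERTY, "/tmp/previous-test-jaas.conf");
        clearStaleJaasConfig();
        System.out.println(System.getProperty(JAAS_CONFIG_PROPERTY)); // prints "null"
    }
}
```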
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1081
The unintended consequence of the missing scheduler.shutdown() call:
1) First, we created a pool with 1 thread.
2) Then we lost the reference to it and created a pool with 2 threads.
3) But the first pool has not yet been collected by the GC, so we now have 3 threads in total. And so on.
Each thread does nothing, but it takes system memory and takes part in the scheduling process.
After 30 topics, for example, we potentially have (30+1)*15=465 threads.
It is already serious additional load on the switching contexts and the native memory.
Removed waiting after scheduler stop request.
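The leak described above can be reproduced with plain `java.util.concurrent`: replacing a scheduler without shutting the previous one down strands its non-daemon threads, which stay alive even after the reference is dropped. A minimal sketch of the fixed pattern (class and method names are illustrative):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SchedulerLeakExample {

    // Replace the scheduler; shutting down the old one releases its threads.
    static ScheduledExecutorService replace(ScheduledExecutorService old, int threads) {
        if (old != null) {
            old.shutdown(); // without this call the old pool's threads leak
        }
        return Executors.newScheduledThreadPool(threads);
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService pool = null;
        // Simulate growing the pool per topic: 1 thread, then 2, then 3 ...
        for (int threads = 1; threads <= 3; threads++) {
            pool = replace(pool, threads);
            // Touch the pool so its core threads actually start.
            pool.schedule(() -> { }, 1, TimeUnit.MILLISECONDS);
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("all pools shut down");
    }
}
```

Note that `shutdown()` returns immediately and lets in-flight tasks finish, which is why no extra waiting after the stop request is needed.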
checkstyle fixes
Going forward, binder will use the default ack on error settings
in Spring Kafka, which is true by default. If applications
need to change this behavior, then a custom error handler must
be provided through ListenerContainerCustomizer.
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1079
This commit effectively removes the changes introduced by a4ad9e2c0b.
Removing the spring.factories mechanism of registering binders in favor of using spring.binders.
This commit effectively reverts 4e6881830a
Remove the spring.factories changes introduced in the commit mentioned above.
Revert the binder to use spring.binders mechanism to register binder configurations.
When destination-is-pattern property is enabled and native deserialization
is used, then Kafka Streams binder does not use the correct Serde types.
Fixing this issue by providing the proper Serdes to the StreamsBuilder.
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1051
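With native decoding, deserialization happens inside Kafka Streams using the configured Serdes, so the binder must hand exactly those Serdes to the StreamsBuilder. A hypothetical binding configuration of the affected shape (the binding name is illustrative, and the property names follow the binder's documented conventions but should be checked against the reference docs):

```properties
# subscribe to the input by topic pattern
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.destinationIsPattern=true
# native decoding: Kafka Streams deserializes using the Serdes below
spring.cloud.stream.bindings.process-in-0.consumer.useNativeDecoding=true
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=org.apache.kafka.common.serialization.Serdes$StringSerde
```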
When functional beans are discovered from libraries on the classpath,
it causes issues when Kafka Streams functions are scanned and bootstrapped.
The binder expects users to provide the function definition property even though
the application does not directly include, or is not aware of, these functional beans.
Fixing this issue by excluding non-Kafka Streams functions from scanning.
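With the fix, library-provided `java.util.function` beans are no longer picked up as Kafka Streams candidates; when an application does have multiple functional beans, the one to bind is still selected explicitly (the bean name below is hypothetical):

```properties
# Tell the binder which functional bean(s) to bind.
spring.cloud.function.definition=processWordCount
```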
Null check around adding the Micrometer listener to the StreamsBuilder.
Addressing issues raised by the comments here:
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1030#issuecomment-804039087
Resolves https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1046
When using ack mode `MANUAL_IMMEDIATE`, we must acknowledge the delivery on
the container thread, otherwise the commit will be queued for later processing,
possibly causing out of order commits, incorrectly reducing the committed offset.
Wait for the send to complete before acknowledging; use a timeout slightly larger
than the configured producer delivery timeout to avoid premature timeouts.
Tested with user-provided test case.
Fix missing timeout buffer.
More context for this issue: https://gitter.im/spring-cloud/spring-cloud-stream?at=6050f43595e23446e43cacd1
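The wait-then-ack pattern can be sketched with stdlib futures: block on the send result on the container thread, with a timeout slightly larger than the producer's delivery.timeout.ms, and only then acknowledge. The `Acknowledgment` interface here is a stand-in for the Spring Kafka type, and the timings are illustrative:

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AckAfterSendExample {

    // Stand-in for Spring Kafka's Acknowledgment in MANUAL_IMMEDIATE mode.
    interface Acknowledgment {
        void acknowledge();
    }

    // Wait for the send to complete, then acknowledge on the same (container)
    // thread. Returns false if the send did not complete within the delivery
    // timeout plus a small buffer, so the commit never runs ahead of the send.
    static boolean sendThenAck(Future<?> sendResult, Acknowledgment ack,
            Duration deliveryTimeout, Duration buffer) {
        // Buffer past delivery.timeout.ms so we do not time out before the producer does.
        long waitMillis = deliveryTimeout.plus(buffer).toMillis();
        try {
            sendResult.get(waitMillis, TimeUnit.MILLISECONDS);
        }
        catch (TimeoutException e) {
            return false; // do not ack an unconfirmed send
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        catch (ExecutionException e) {
            throw new IllegalStateException("send failed", e);
        }
        ack.acknowledge(); // still on the container thread: commit is immediate and in order
        return true;
    }

    public static void main(String[] args) {
        boolean acked = sendThenAck(
                CompletableFuture.completedFuture(null), // a send that already succeeded
                () -> System.out.println("acknowledged"),
                Duration.ofSeconds(30), Duration.ofSeconds(5));
        System.out.println(acked); // true
    }
}
```

Acknowledging from any other thread would merely queue the commit, which is exactly the out-of-order behavior the fix avoids.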