@@ -10,7 +10,7 @@ This means the Dead-Letter topic must have at least as many partitions as the or
|
||||
To change this behavior, add a `DlqPartitionFunction` implementation as a `@Bean` to the application context.
|
||||
Only one such bean can be present.
|
||||
The function is provided with the consumer group, the failed `ConsumerRecord` and the exception.
|
||||
For example, if you always with to route to partition 0, you might use:
|
||||
For example, if you always want to route to partition 0, you might use:
|
||||
|
||||
====
|
||||
[source, java]
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -40,7 +40,7 @@ The Apache Kafka Binder implementation maps each destination to an Apache Kafka
|
||||
The consumer group maps directly to the same Apache Kafka concept.
|
||||
Partitioning also maps directly to Apache Kafka partitions as well.
|
||||
|
||||
The binder currently uses the Apache Kafka `kafka-clients` 1.0.0 jar and is designed to be used with a broker of at least that version.
|
||||
The binder currently uses the Apache Kafka `kafka-clients` version `2.3.1`.
|
||||
This client can communicate with older brokers (see the Kafka documentation), but certain features may not be available.
|
||||
For example, with versions earlier than 0.11.x.x, native headers are not supported.
|
||||
Also, 0.11.x.x does not support the `autoAddPartitions` property.
|
||||
|
||||
@@ -37,10 +37,10 @@ import org.apache.kafka.common.serialization.Serdes;
|
||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.apache.kafka.streams.KeyValue;
|
||||
import org.apache.kafka.streams.kstream.Grouped;
|
||||
import org.apache.kafka.streams.kstream.Joined;
|
||||
import org.apache.kafka.streams.kstream.KStream;
|
||||
import org.apache.kafka.streams.kstream.KTable;
|
||||
import org.apache.kafka.streams.kstream.Serialized;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
|
||||
@@ -444,7 +444,7 @@ public class StreamToTableJoinFunctionTests {
|
||||
Joined.with(Serdes.String(), Serdes.Long(), null))
|
||||
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
|
||||
regionWithClicks.getClicks()))
|
||||
.groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
|
||||
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
|
||||
.reduce(Long::sum)
|
||||
.toStream()));
|
||||
}
|
||||
@@ -461,7 +461,7 @@ public class StreamToTableJoinFunctionTests {
|
||||
Joined.with(Serdes.String(), Serdes.Long(), null))
|
||||
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
|
||||
regionWithClicks.getClicks()))
|
||||
.groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
|
||||
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
|
||||
.reduce(Long::sum)
|
||||
.toStream());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user