Testing topic provisioning properties for KStream

Resolves #684
This commit is contained in:
Soby Chacko
2019-08-23 18:53:32 -04:00
parent 8145ab19fb
commit 16bb3e2f62

View File

@@ -54,6 +54,7 @@ import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.annotations.KafkaStreamsProcessor;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsProducerProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.CleanupConfig;
@@ -111,6 +112,7 @@ public class StreamToTableJoinIntegrationTests {
"--spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId"
+ "=StreamToTableJoinIntegrationTests-abc",
"--spring.cloud.stream.kafka.streams.bindings.input-x.consumer.topic.properties.cleanup.policy=compact",
"--spring.cloud.stream.kafka.streams.bindings.output.producer.topic.properties.cleanup.policy=compact",
"--spring.cloud.stream.kafka.streams.binder.brokers="
+ embeddedKafka.getBrokersAsString());
try {
@@ -128,6 +130,16 @@ public class StreamToTableJoinIntegrationTests {
assertThat(cleanupPolicyX).isEqualTo("compact");
Binder<KStream, ? extends ConsumerProperties, ? extends ProducerProperties> kStreamBinder = binderFactory
.getBinder("kstream", KStream.class);
KafkaStreamsProducerProperties producerProperties = (KafkaStreamsProducerProperties) ((ExtendedPropertiesBinder) kStreamBinder)
.getExtendedProducerProperties("output");
String cleanupPolicyOutput = producerProperties.getTopic().getProperties().get("cleanup.policy");
assertThat(cleanupPolicyOutput).isEqualTo("compact");
// Input 1: Region per user (multiple records allowed per user).
List<KeyValue<String, String>> userRegions = Arrays.asList(new KeyValue<>(
"alice", "asia"), /* Alice lived in Asia originally... */