Refactoring kafka-streams-aggregate sample to use functions

This commit is contained in:
Soby Chacko
2019-10-25 15:23:11 -04:00
parent 594779ee77
commit f2b3aacaab
4 changed files with 192 additions and 27 deletions

View File

@@ -11,25 +11,65 @@
<description>Demo project for Spring Boot</description>
<parent>
<groupId>io.spring.cloud.stream.sample</groupId>
<artifactId>spring-cloud-stream-samples-parent</artifactId>
<version>0.0.1-SNAPSHOT</version>
<relativePath>../..</relativePath>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<spring-cloud.version>Hoxton.BUILD-SNAPSHOT</spring-cloud.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams-test-utils</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
@@ -39,4 +79,55 @@
</plugins>
</build>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/libs-release-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</project>

View File

@@ -16,23 +16,23 @@
package kafka.streams.table.join;
import java.util.function.Consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@@ -47,22 +47,23 @@ public class KafkaStreamsAggregateSample {
SpringApplication.run(KafkaStreamsAggregateSample.class, args);
}
@EnableBinding(KafkaStreamsProcessorX.class)
public static class KafkaStreamsAggregateSampleApplication {
@StreamListener("input")
public void process(KStream<String, DomainEvent> input) {
@Bean
public Consumer<KStream<String, DomainEvent>> aggregate() {
ObjectMapper mapper = new ObjectMapper();
Serde<DomainEvent> domainEventSerde = new JsonSerde<>( DomainEvent.class, mapper );
input
return input -> input
.groupBy(
(s, domainEvent) -> domainEvent.boardUuid,
Serialized.with(null, domainEventSerde))
Grouped.with(null, domainEventSerde))
.aggregate(
String::new,
(s, domainEvent, board) -> board.concat(domainEvent.eventType),
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-events-snapshots").withKeySerde(Serdes.String()).
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-events-snapshots")
.withKeySerde(Serdes.String()).
withValueSerde(Serdes.String())
);
}
@@ -80,9 +81,4 @@ public class KafkaStreamsAggregateSample {
}
}
interface KafkaStreamsProcessorX {
@Input("input")
KStream<?, ?> input();
}
}

View File

@@ -1,9 +1,8 @@
spring.application.name: kafka-streams-aggregate-sample
spring.cloud.stream.bindings.input:
spring.cloud.stream.bindings.aggregate-in-0:
destination: foobar
spring.cloud.stream.kafka.streams.binder:
brokers: localhost #192.168.99.100
configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$BytesSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
commit.interval.ms: 1000

View File

@@ -1,18 +1,97 @@
package kafka.streams.table.join;
import org.junit.Ignore;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.web.server.LocalServerPort;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.client.RestTemplate;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class KafkaStreamsAggregateSampleTests {
@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, "foobar");
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
@Autowired
StreamsBuilderFactoryBean streamsBuilderFactoryBean;
@LocalServerPort
int randomServerPort;
@Before
public void before() {
streamsBuilderFactoryBean.setCloseTimeout(0);
}
@BeforeClass
public static void setUp() {
System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafka.getBrokersAsString());
}
@AfterClass
public static void tearDown() {
System.clearProperty("spring.cloud.stream.kafka.streams.binder.brokers");
}
@Test
@Ignore
public void contextLoads() {
public void testKafkaStreamsWordCountProcessor() throws Exception {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ObjectMapper mapper = new ObjectMapper();
Serde<DomainEvent> domainEventSerde = new JsonSerde<>(DomainEvent.class, mapper);
senderProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, domainEventSerde.serializer().getClass());
DefaultKafkaProducerFactory<String, DomainEvent> pf = new DefaultKafkaProducerFactory<>(senderProps);
try {
KafkaTemplate<String, DomainEvent> template = new KafkaTemplate<>(pf, true);
template.setDefaultTopic("foobar");
DomainEvent ddEvent = new DomainEvent();
ddEvent.setBoardUuid("12345");
ddEvent.setEventType("create-domain-event");
template.sendDefault("", ddEvent);
Thread.sleep(1000);
RestTemplate restTemplate = new RestTemplate();
String fooResourceUrl
= "http://localhost:" + randomServerPort + "/events";
ResponseEntity<String> response
= restTemplate.getForEntity(fooResourceUrl, String.class);
assertThat(response.getBody()).contains("create-domain-event");
}
finally {
pf.destroy();
}
}
}