Update interactive query sample

Update the sample to properly handle multiple instatnces
Update README
This commit is contained in:
Soby Chacko
2018-05-17 12:08:00 -04:00
parent a1bd8929b6
commit c6bdca132b
5 changed files with 161 additions and 18 deletions

View File

@@ -9,18 +9,33 @@ There is a REST service provided as part of the application that can be used to
=== Running the app:
We will run 2 instances of the processor application to demonstrate that regardless of which instance hosts the keys, the REST endpoint will serve the requests.
For more information on how this is done, please take a look at the application code.
1. `docker-compose up -d`
2. Start the confluent schema registry: The following command is based on the confluent platform.
2. Start the confluent schema registry: The following command is based on the confluent platform and assume that you are at the root of the confluent platform's root directory.
`./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties`
3. Go to the root of the repository and do: `./mvnw clean package`
4. `java -jar target/kafka-streams-interactive-query-0.0.1-SNAPSHOT.jar`
4. `java -jar target/kafka-streams-interactive-query-advanced-0.0.1-SNAPSHOT.jar`
5. On another terminal session:
`java -jar target/kafka-streams-interactive-query-advanced-0.0.1-SNAPSHOT.jar --server.port=8082 --spring.cloud.stream.kafka.streams.binder.configuration.application.server=localhost:8082'
5. Run the stand-alone `Producers` application to generate data and start the processing.
Keep it running for a while.
6. Go to the URL: http://localhost:8080/charts/top-five?genre=Punk
keep refreshing the URL and you will see the song play count information changes.
Take a look at the console sessions for the applications and you will see that it may not be the processor started on 8080 that serves this request.
7. Go to the URL: http://localhost:8082/charts/top-five?genre=Punk
Take a look at the console sessions for the applications and you will see that it may not be the processor started on 8082 that serves this request.
8. Once you are done with running the sample, stop the docker containers and the schema registry.

View File

@@ -47,6 +47,7 @@
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
<version>2.1.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>

View File

@@ -21,13 +21,13 @@ import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import kafka.streams.interactive.query.avro.PlayEvent;
import kafka.streams.interactive.query.avro.Song;
import kafka.streams.interactive.query.avro.SongPlayCount;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
@@ -37,10 +37,11 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry;
import org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryServices;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import java.io.*;
import java.util.*;
@@ -54,7 +55,7 @@ public class KafkaStreamsInteractiveQuerySample {
static final String ALL_SONGS = "all-songs";
@Autowired
private QueryableStoreRegistry queryableStoreRegistry;
private InteractiveQueryServices interactiveQueryServices;
public static void main(String[] args) {
SpringApplication.run(KafkaStreamsInteractiveQuerySample.class, args);
@@ -169,16 +170,45 @@ public class KafkaStreamsInteractiveQuerySample {
@RestController
public class FooController {
private final Log logger = LogFactory.getLog(getClass());
@RequestMapping("/song/idx")
public SongBean song(@RequestParam(value="id") Long id) {
final ReadOnlyKeyValueStore<Long, Song> songStore =
interactiveQueryServices.getQueryableStore(KafkaStreamsInteractiveQuerySample.ALL_SONGS, QueryableStoreTypes.<Long, Song>keyValueStore());
final Song song = songStore.get(id);
if (song == null) {
throw new IllegalArgumentException("hi");
}
return new SongBean(song.getArtist(), song.getAlbum(), song.getName());
}
@RequestMapping("/charts/top-five")
public List<SongPlayCountBean> greeting(@RequestParam(value="genre") String genre) {
return topFiveSongs(KafkaStreamsInteractiveQuerySample.TOP_FIVE_KEY, KafkaStreamsInteractiveQuerySample.TOP_FIVE_SONGS_STORE);
@SuppressWarnings("unchecked")
public List<SongPlayCountBean> topFive(@RequestParam(value="genre") String genre) {
HostInfo hostInfo = interactiveQueryServices.getHostInfo(KafkaStreamsInteractiveQuerySample.TOP_FIVE_SONGS_STORE,
KafkaStreamsInteractiveQuerySample.TOP_FIVE_KEY, new StringSerializer());
if (interactiveQueryServices.getCurrentHostInfo().equals(hostInfo)) {
logger.info("Top Five songs request served from same host: " + hostInfo);
return topFiveSongs(KafkaStreamsInteractiveQuerySample.TOP_FIVE_KEY, KafkaStreamsInteractiveQuerySample.TOP_FIVE_SONGS_STORE);
}
else {
//find the store from the proper instance.
logger.info("Top Five songs request served from different host: " + hostInfo);
RestTemplate restTemplate = new RestTemplate();
return restTemplate.postForObject(
String.format("http://%s:%d/%s", hostInfo.host(),
hostInfo.port(), "charts/top-five?genre=Punk"), "punk", List.class);
}
}
private List<SongPlayCountBean> topFiveSongs(final String key,
final String storeName) {
final ReadOnlyKeyValueStore<String, TopFiveSongs> topFiveStore =
queryableStoreRegistry.getQueryableStoreType(storeName, QueryableStoreTypes.<String, TopFiveSongs>keyValueStore());
interactiveQueryServices.getQueryableStore(storeName, QueryableStoreTypes.<String, TopFiveSongs>keyValueStore());
// Get the value from the store
final TopFiveSongs value = topFiveStore.get(key);
@@ -187,12 +217,31 @@ public class KafkaStreamsInteractiveQuerySample {
}
final List<SongPlayCountBean> results = new ArrayList<>();
value.forEach(songPlayCount -> {
final ReadOnlyKeyValueStore<Long, Song> songStore =
queryableStoreRegistry.getQueryableStoreType(KafkaStreamsInteractiveQuerySample.ALL_SONGS, QueryableStoreTypes.<Long, Song>keyValueStore());
final Song song = songStore.get(songPlayCount.getSongId());
results.add(new SongPlayCountBean(song.getArtist(),song.getAlbum(), song.getName(),
songPlayCount.getPlays()));
HostInfo hostInfo = interactiveQueryServices.getHostInfo(KafkaStreamsInteractiveQuerySample.ALL_SONGS,
songPlayCount.getSongId(), new LongSerializer());
if (interactiveQueryServices.getCurrentHostInfo().equals(hostInfo)) {
logger.info("Song info request served from same host: " + hostInfo);
final ReadOnlyKeyValueStore<Long, Song> songStore =
interactiveQueryServices.getQueryableStore(KafkaStreamsInteractiveQuerySample.ALL_SONGS, QueryableStoreTypes.<Long, Song>keyValueStore());
final Song song = songStore.get(songPlayCount.getSongId());
results.add(new SongPlayCountBean(song.getArtist(),song.getAlbum(), song.getName(),
songPlayCount.getPlays()));
}
else {
logger.info("Song info request served from different host: " + hostInfo);
RestTemplate restTemplate = new RestTemplate();
SongBean song = restTemplate.postForObject(
String.format("http://%s:%d/%s", hostInfo.host(),
hostInfo.port(), "song/idx?id=" + songPlayCount.getSongId()), "id", SongBean.class);
results.add(new SongPlayCountBean(song.getArtist(),song.getAlbum(), song.getName(),
songPlayCount.getPlays()));
}
});
return results;
}

View File

@@ -0,0 +1,75 @@
package kafka.streams.interactive.query;
import java.util.Objects;
/**
* @author Soby Chacko
*/
public class SongBean {
private String artist;
private String album;
private String name;
public SongBean() {}
public SongBean(final String artist, final String album, final String name) {
this.artist = artist;
this.album = album;
this.name = name;
}
public String getArtist() {
return artist;
}
public void setArtist(final String artist) {
this.artist = artist;
}
public String getAlbum() {
return album;
}
public void setAlbum(final String album) {
this.album = album;
}
public String getName() {
return name;
}
public void setName(final String name) {
this.name = name;
}
@Override
public String toString() {
return "SongBean{" +
"artist='" + artist + '\'' +
", album='" + album + '\'' +
", name='" + name + '\'' +
'}';
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final SongBean that = (SongBean) o;
return Objects.equals(artist, that.artist) &&
Objects.equals(album, that.album) &&
Objects.equals(name, that.name);
}
@Override
public int hashCode() {
return Objects.hash(artist, album, name);
}
}

View File

@@ -22,4 +22,7 @@ spring.cloud.stream.kafka.streams.binder:
zkNodes: localhost #192.168.99.100
configuration:
schema.registry.url: http://localhost:8081
commit.interval.ms: 1000
commit.interval.ms: 1000
spring.cloud.stream.kafka.streams.binder.autoAddPartitions: true
spring.cloud.stream.kafka.streams.binder.minPartitionCount: 4
spring.cloud.stream.kafka.streams.binder.configuration.application.server: localhost:8080