Incorporated the review comments on the pull request.

This commit is contained in:
CHANDRAKANT Kumar
2020-08-16 01:15:24 +05:30
parent e57306cb8b
commit cdf0802d63
4 changed files with 24 additions and 46 deletions

View File

@@ -1,25 +0,0 @@
/target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/build/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/

View File

@@ -1,7 +1,7 @@
## Spring WebFlux Concurrency
This module contains articles about concurrency model in Spring WebFlux.
Please note that this assumes Mongo and Kafka to be running on the local machine on default configurations.
### Relevant Articles:

View File

@@ -3,6 +3,9 @@ package com.baeldung.webflux;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Please note that this assumes Mongo and Kafka to be running on the local machine on default configurations.
*/
@SpringBootApplication
public class Application {

View File

@@ -53,31 +53,31 @@ public class Controller {
@GetMapping("/threads/webclient")
public Flux<String> getThreadsWebClient() {
WebClient.create("http://localhost:8080/index")
.get()
.retrieve()
.bodyToMono(String.class)
.subscribeOn(scheduler)
.publishOn(scheduler)
.doOnNext(s -> logger.info("Response: {}", s))
.subscribe();
.get()
.retrieve()
.bodyToMono(String.class)
.subscribeOn(scheduler)
.publishOn(scheduler)
.doOnNext(s -> logger.info("Response: {}", s))
.subscribe();
return Flux.fromIterable(getThreads());
}
@GetMapping("/threads/rxjava")
public Observable<String> getIndexRxJava() {
Observable.fromIterable(Arrays.asList("Hello", "World"))
.map(s -> s.toUpperCase())
.observeOn(io.reactivex.schedulers.Schedulers.trampoline())
.doOnNext(s -> logger.info("String: {}", s))
.subscribe();
.map(s -> s.toUpperCase())
.observeOn(io.reactivex.schedulers.Schedulers.trampoline())
.doOnNext(s -> logger.info("String: {}", s))
.subscribe();
return Observable.fromIterable(getThreads());
}
@GetMapping("/threads/mongodb")
public Flux<String> getIndexMongo() {
personRepository.findAll()
.doOnNext(p -> logger.info("Person: {}", p))
.subscribe();
.doOnNext(p -> logger.info("Person: {}", p))
.subscribe();
return Flux.fromIterable(getThreads());
}
@@ -90,9 +90,9 @@ public class Controller {
SenderOptions<Integer, String> senderOptions = SenderOptions.create(producerProps);
KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions);
Flux<SenderRecord<Integer, String, Integer>> outboundFlux = Flux.range(1, 10)
.map(i -> SenderRecord.create(new ProducerRecord<>("reactive-test", i, "Message_" + i), i));
.map(i -> SenderRecord.create(new ProducerRecord<>("reactive-test", i, "Message_" + i), i));
sender.send(outboundFlux)
.subscribe();
.subscribe();
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@@ -108,7 +108,7 @@ public class Controller {
inboundFlux.subscribe(r -> {
logger.info("Received message: {}", r.value());
r.receiverOffset()
.acknowledge();
.acknowledge();
});
return Flux.fromIterable(getThreads());
}
@@ -120,9 +120,9 @@ public class Controller {
private List<String> getThreads() {
return Thread.getAllStackTraces()
.keySet()
.stream()
.map(t -> String.format("%-20s \t %s \t %d \t %s\n", t.getName(), t.getState(), t.getPriority(), t.isDaemon() ? "Daemon" : "Normal"))
.collect(Collectors.toList());
.keySet()
.stream()
.map(t -> String.format("%-20s \t %s \t %d \t %s\n", t.getName(), t.getState(), t.getPriority(), t.isDaemon() ? "Daemon" : "Normal"))
.collect(Collectors.toList());
}
}