move to reactive, extract mongodb ex
This commit is contained in:
@@ -0,0 +1,41 @@
|
||||
package com.baeldung.websocket;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.reactive.socket.WebSocketHandler;
|
||||
import org.springframework.web.reactive.socket.WebSocketMessage;
|
||||
import org.springframework.web.reactive.socket.WebSocketSession;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
import static java.time.LocalDateTime.now;
|
||||
import static java.util.UUID.randomUUID;
|
||||
|
||||
@Component
|
||||
public class ReactiveWebSocketHandler implements WebSocketHandler {
|
||||
|
||||
private static final ObjectMapper json = new ObjectMapper();
|
||||
|
||||
private Flux<String> eventFlux = Flux.generate(sink -> {
|
||||
Event event = new Event(randomUUID().toString(), now().toString());
|
||||
try {
|
||||
sink.next(json.writeValueAsString(event));
|
||||
} catch (JsonProcessingException e) {
|
||||
sink.error(e);
|
||||
}
|
||||
});
|
||||
|
||||
private Flux<String> intervalFlux = Flux.interval(Duration.ofMillis(1000L))
|
||||
.zipWith(eventFlux, (time, event) -> event);
|
||||
|
||||
@Override
|
||||
public Mono<Void> handle(WebSocketSession webSocketSession) {
|
||||
return webSocketSession.send(intervalFlux
|
||||
.map(webSocketSession::textMessage))
|
||||
.and(webSocketSession.receive()
|
||||
.map(WebSocketMessage::getPayloadAsText).log());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user