package com.baeldung.websocket; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.stereotype.Component; import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketSession; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.Duration; import static java.time.LocalDateTime.now; import static java.util.UUID.randomUUID; @Component("ReactiveWebSocketHandler") public class ReactiveWebSocketHandler implements WebSocketHandler { private static final ObjectMapper json = new ObjectMapper(); private Flux eventFlux = Flux.generate(sink -> { Event event = new Event(randomUUID().toString(), now().toString()); try { sink.next(json.writeValueAsString(event)); } catch (JsonProcessingException e) { sink.error(e); } }); private Flux intervalFlux = Flux.interval(Duration.ofMillis(1000L)) .zipWith(eventFlux, (time, event) -> event); @Override public Mono handle(WebSocketSession webSocketSession) { return webSocketSession.send(intervalFlux .map(webSocketSession::textMessage)) .and(webSocketSession.receive() .map(WebSocketMessage::getPayloadAsText).log()); } }