Server-Sent Events (Spring WebFlux)
In the current and future world of real-time applications, Users must keep updating live and real-time data. SSE was initially proposed at the start of 2004. In 2006 Opera Web Browser implemented and experimented with this feature called Server-Sent Events.
SSE is a technology that is used to push real-time updates to clients over HTTP. It is an efficient way to deliver real-time data to clients without needing for the client to poll the server.
This is different from a Web Socket, SSE is an one-way of communication, Sever sends data to the client but the client cannot send data to a server like Web Socket.
The client can establish a long-lived HTTP connection.
Messages can be sent as text-based in a specific format over the connection.
Browsers handle reconnection automatically if the connection drops.
I have created an example using spring webflux. It contains a Server and a Client both. For both, I have used spring webflux.
My use case is when currency rates are uploaded to the server clients need to get real-time updates from server with the rates like USD to AED (USDAED as 3.67).
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
- Server
The server contains the following classes.
add/currency endpoint is used to upload currency rates to the server
{
"pair": "USDAED",
"rate": 280.15
}
then if a client has subscribed to /currency endpoint they will get real-time updates.
ServiceContoller
@RestController
@RequestMapping("/api/v1")
public class ServiceController {
private final CurrencyHandler processor;
public ServiceController(CurrencyHandler processor) {
this.processor = processor;
}
@PostMapping("/add/currency")
@ResponseStatus(HttpStatus.CREATED)
public Mono<Currency> send(@RequestBody Currency currency) {
LOGGER.info("Received '{}'", currency);
processor.publish(currency);
return Mono.just(currency);
}
@GetMapping(path = "/currency", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Object>> consumer() {
return Flux.create(sink -> processor.subscribe(sink::next)).map(
currency -> ServerSentEvent.builder().data(currency).event("goal").build());
}
}
Currencyhandler
@Service
public class CurrencyHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(CurrencyHandler.class);
private final List<Consumer<Currency>> listeners = new CopyOnWriteArrayList<>();
public void subscribe(Consumer<Currency> listener) {
listeners.add(listener);
LOGGER.info("New one added, total consumer: {}", listeners.size());
}
public void publish(Currency currency) {
LOGGER.info("Processing live score: {}", currency);
listeners.forEach(listener -> listener.accept(currency));
}
}
Currency
public class Currency {
public String getPair() {
return pair;
}
public void setPair(String pair) {
this.pair = pair;
}
public double getRate() {
return rate;
}
public void setRate(double rate) {
this.rate = rate;
}
private String pair;
private double rate;
}
2. Client
The client contains the following classes.
SSEClient listening to /currency endpoint, when it gets data it prints rates.
@Component
public class SseClient {
private final WebClient webClient;
public SseClient(WebClient.Builder webClientBuilder) {
this.webClient = webClientBuilder.baseUrl("http://localhost:8080").build(); // Replace with your SSE server's base URL
}
public void listenToSseStream() {
Flux<Currency> eventStream = webClient.get()
.uri("/api/v1/currency") // Replace with your server's SSE endpoint
.retrieve()
.bodyToFlux(Currency.class)
.retryWhen(Retry.backoff(2, Duration.ofSeconds(2))
.maxBackoff(Duration.ofSeconds(10))
.doBeforeRetry(retrySignal -> System.out.println("Retrying connection..."))
);
eventStream.subscribe(
event -> System.out.println("Received event: " + event.getRate()), // Handle events
error -> System.err.println("Error occurred: " + error), // Handle errors
() -> System.out.println("SSE stream completed!") // Handle completion
);
}
}
SSEService
@Service
public class SseService {
private final SseClient sseClient;
public SseService(SseClient sseClient) {
this.sseClient = sseClient;
}
public void startListening() {
System.out.println("Starting SSE Listener...");
sseClient.listenToSseStream();
}
}
References