Server-Sent Events (Spring WebFlux)

Rukmal Senavirathne
3 min read · Nov 30, 2024


Modern applications increasingly need to show users live, continuously updating data. Server-Sent Events (SSE) was first proposed in 2004, and in 2006 the Opera web browser shipped an experimental implementation of the feature.

SSE is a technology for pushing real-time updates to clients over HTTP. It delivers data efficiently because the client does not need to poll the server.

Unlike WebSocket, SSE is one-way: the server sends data to the client, but the client cannot send data back to the server over the same connection.

The client establishes a long-lived HTTP connection.

Messages are sent over that connection as text in a specific format (text/event-stream).

Browsers handle reconnection automatically if the connection drops.
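
To make the text format and the reconnection behaviour a little more concrete, here is a minimal sketch of how a single event looks on the wire. The SseFrame class and its format method are hypothetical helpers written only for illustration; they are not part of the example project, and in Spring WebFlux the framework does this encoding for you.

```java
/** Minimal sketch of the text/event-stream wire format (hypothetical helper for illustration). */
class SseFrame {

    /**
     * Formats one event. Null fields are omitted, and multi-line data
     * is split into one "data:" line per line of payload.
     */
    static String format(String id, String event, String data, Long retryMillis) {
        StringBuilder sb = new StringBuilder();
        if (id != null) sb.append("id: ").append(id).append('\n');           // remembered by the client as the last event ID
        if (event != null) sb.append("event: ").append(event).append('\n');  // event name, defaults to "message" if absent
        if (retryMillis != null) sb.append("retry: ").append(retryMillis).append('\n'); // reconnection delay hint in ms
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        sb.append('\n'); // a blank line terminates the event
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(format("1", "currency", "{\"pair\":\"USDAED\",\"rate\":3.67}", 3000L));
    }
}
```

The id and retry fields are what make automatic reconnection useful: a browser waits roughly the retry interval before reconnecting and sends the last id it saw in the Last-Event-ID request header, so a server can resume from that point if it chooses to.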

I have created an example using Spring WebFlux. It contains both a server and a client, and both are built with Spring WebFlux.

My use case: when currency rates are uploaded to the server, subscribed clients should receive the new rates in real time, for example USD to AED (USDAED at 3.67).

Both the server and the client need the WebFlux starter dependency:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

1. Server

The server contains the following classes.

The /api/v1/add/currency endpoint is used to upload a currency rate to the server, with a request body such as:

{
    "pair": "USDAED",
    "rate": 3.67
}

Any client that has subscribed to the /api/v1/currency endpoint then receives the update in real time.

ServiceController

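import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/api/v1")
public class ServiceController {

    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceController.class);

    private final CurrencyHandler processor;

    public ServiceController(CurrencyHandler processor) {
        this.processor = processor;
    }

    // Accepts a new rate and fans it out to every subscribed SSE client.
    @PostMapping("/add/currency")
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<Currency> send(@RequestBody Currency currency) {
        LOGGER.info("Received {} = {}", currency.getPair(), currency.getRate());
        processor.publish(currency);
        return Mono.just(currency);
    }

    // Each subscriber registers a listener; every published rate is wrapped in an SSE event.
    // Note: the listener is never deregistered when a client disconnects;
    // CurrencyHandler would need an unsubscribe method for that.
    @GetMapping(path = "/currency", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<Currency>> consumer() {
        return Flux.<Currency>create(sink -> processor.subscribe(sink::next))
                .map(currency -> ServerSentEvent.<Currency>builder()
                        .data(currency)
                        .event("currency")
                        .build());
    }
}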

CurrencyHandler

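import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class CurrencyHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(CurrencyHandler.class);

    // Thread-safe list: listeners are added from request threads while publish iterates.
    private final List<Consumer<Currency>> listeners = new CopyOnWriteArrayList<>();

    public void subscribe(Consumer<Currency> listener) {
        listeners.add(listener);
        LOGGER.info("New listener added, total listeners: {}", listeners.size());
    }

    public void publish(Currency currency) {
        LOGGER.info("Publishing currency rate: {} = {}", currency.getPair(), currency.getRate());
        listeners.forEach(listener -> listener.accept(currency));
    }
}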
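
One limitation of the handler above is that listeners are only ever added, so a client that disconnects stays in the list. A minimal sketch of one way to address this, assuming we are free to change the subscribe signature, is to return a handle that removes the listener again. The ListenerRegistry class below is a hypothetical, framework-free stand-in for CurrencyHandler, not the code used in the example project.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical stand-in for CurrencyHandler that supports unsubscribing. */
class ListenerRegistry<T> {

    private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();

    /** Registers a listener and returns a handle that removes it again. */
    Runnable subscribe(Consumer<T> listener) {
        listeners.add(listener);
        return () -> listeners.remove(listener);
    }

    /** Delivers the value to every listener that is currently registered. */
    void publish(T value) {
        listeners.forEach(listener -> listener.accept(value));
    }

    int size() {
        return listeners.size();
    }

    public static void main(String[] args) {
        ListenerRegistry<String> registry = new ListenerRegistry<>();
        Runnable cancel = registry.subscribe(v -> System.out.println("got " + v));
        registry.publish("USDAED=3.67"); // delivered to the listener
        cancel.run();                    // e.g. when the SSE client disconnects
        registry.publish("USDAED=3.68"); // nobody is listening any more
        System.out.println("listeners: " + registry.size());
    }
}
```

In the controller, such a handle could plausibly be wired to the sink's lifecycle, for example by passing it to FluxSink.onDispose inside Flux.create, so that the listener is removed when the stream is cancelled or terminates.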

Currency

public class Currency {

    private String pair;

    private double rate;

    public String getPair() {
        return pair;
    }

    public void setPair(String pair) {
        this.pair = pair;
    }

    public double getRate() {
        return rate;
    }

    public void setRate(double rate) {
        this.rate = rate;
    }
}

2. Client

The client contains the following classes.

SseClient listens to the /api/v1/currency endpoint and prints each rate it receives.

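import java.time.Duration;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.util.retry.Retry;

@Component
public class SseClient {

    private final WebClient webClient;

    public SseClient(WebClient.Builder webClientBuilder) {
        // Replace with your SSE server's base URL
        this.webClient = webClientBuilder.baseUrl("http://localhost:8080").build();
    }

    public void listenToSseStream() {
        Flux<Currency> eventStream = webClient.get()
                .uri("/api/v1/currency") // Replace with your server's SSE endpoint
                .accept(MediaType.TEXT_EVENT_STREAM) // Ask explicitly for an event stream
                .retrieve()
                .bodyToFlux(Currency.class) // The data of each event is decoded into a Currency
                // Retry at most twice, starting at 2 s and backing off up to 10 s
                .retryWhen(Retry.backoff(2, Duration.ofSeconds(2))
                        .maxBackoff(Duration.ofSeconds(10))
                        .doBeforeRetry(retrySignal -> System.out.println("Retrying connection...")));

        eventStream.subscribe(
                event -> System.out.println("Received event: " + event.getRate()), // Handle events
                error -> System.err.println("Error occurred: " + error),           // Handle errors
                () -> System.out.println("SSE stream completed!"));                // Handle completion
    }
}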
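
WebClient hides the parsing of the event stream, so it can help to see roughly what any SSE client has to do with the raw lines it receives. The sketch below is a simplified, hypothetical parser (SseLineParser is not part of the example project or of Spring) that only collects the data payload of each event and ignores the event, id and retry fields.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical parser for the lines of a text/event-stream response. */
class SseLineParser {

    /** Returns the data payload of every complete event found in the given lines. */
    static List<String> parseData(List<String> lines) {
        List<String> events = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        for (String line : lines) {
            if (line.isEmpty()) {
                // A blank line ends the current event; dispatch it if it carried data.
                if (hasData) events.add(data.toString());
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith(":")) {
                // Comment line, often used as a keep-alive; ignored.
            } else if (line.startsWith("data:")) {
                String value = line.substring(5);
                if (value.startsWith(" ")) value = value.substring(1); // strip one optional space
                if (hasData) data.append('\n'); // multiple data lines are joined with newlines
                data.append(value);
                hasData = true;
            }
            // Other fields (event, id, retry) are ignored in this sketch.
        }
        return events; // an event without a terminating blank line is dropped
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "event: currency",
                "data: {\"pair\":\"USDAED\",\"rate\":3.67}",
                "");
        parseData(lines).forEach(payload -> System.out.println("Received: " + payload));
    }
}
```

Outside of Spring, the same function could be fed with lines read from the JDK's java.net.http.HttpClient using HttpResponse.BodyHandlers.ofLines(), although a real client would also need to handle reconnection itself.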

SseService

import org.springframework.stereotype.Service;

@Service
public class SseService {

    private final SseClient sseClient;

    public SseService(SseClient sseClient) {
        this.sseClient = sseClient;
    }

    // Thin wrapper that starts the SSE listener
    public void startListening() {
        System.out.println("Starting SSE Listener...");
        sseClient.listenToSseStream();
    }
}


Written by Rukmal Senavirathne

Senior Software Engineer at GTN. Graduated from the Department of Computer Science and Engineering at the University of Moratuwa.
