Server-Sent Events (Spring WebFlux)

Rukmal Senavirathne
3 min readNov 30, 2024

--

In the current and future world of real-time applications, Users must keep updating live and real-time data. SSE was initially proposed at the start of 2004. In 2006 Opera Web Browser implemented and experimented with this feature called Server-Sent Events.

SSE is a technology that is used to push real-time updates to clients over HTTP. It is an efficient way to deliver real-time data to clients without needing for the client to poll the server.

This is different from a Web Socket, SSE is an one-way of communication, Sever sends data to the client but the client cannot send data to a server like Web Socket.

The client can establish a long-lived HTTP connection.

Messages can be sent as text-based in a specific format over the connection.

Browsers handle reconnection automatically if the connection drops.

I have created an example using spring webflux. It contains a Server and a Client both. For both, I have used spring webflux.

My use case is when currency rates are uploaded to the server clients need to get real-time updates from server with the rates like USD to AED (USDAED as 3.67).

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
  1. Server

The server contains the following classes.

add/currency endpoint is used to upload currency rates to the server

{
"pair": "USDAED",
"rate": 280.15
}

then if a client has subscribed to /currency endpoint they will get real-time updates.

ServiceContoller

@RestController
@RequestMapping("/api/v1")
public class ServiceController {

private final CurrencyHandler processor;

public ServiceController(CurrencyHandler processor) {
this.processor = processor;
}

@PostMapping("/add/currency")
@ResponseStatus(HttpStatus.CREATED)
public Mono<Currency> send(@RequestBody Currency currency) {
LOGGER.info("Received '{}'", currency);
processor.publish(currency);
return Mono.just(currency);
}

@GetMapping(path = "/currency", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<Object>> consumer() {
return Flux.create(sink -> processor.subscribe(sink::next)).map(
currency -> ServerSentEvent.builder().data(currency).event("goal").build());
}
}

Currencyhandler

@Service
public class CurrencyHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(CurrencyHandler.class);

private final List<Consumer<Currency>> listeners = new CopyOnWriteArrayList<>();

public void subscribe(Consumer<Currency> listener) {
listeners.add(listener);
LOGGER.info("New one added, total consumer: {}", listeners.size());
}

public void publish(Currency currency) {
LOGGER.info("Processing live score: {}", currency);
listeners.forEach(listener -> listener.accept(currency));
}
}

Currency

public class Currency {

public String getPair() {
return pair;
}

public void setPair(String pair) {
this.pair = pair;
}

public double getRate() {
return rate;
}

public void setRate(double rate) {
this.rate = rate;
}

private String pair;

private double rate;
}

2. Client

The client contains the following classes.

SSEClient listening to /currency endpoint, when it gets data it prints rates.

@Component
public class SseClient {

private final WebClient webClient;

public SseClient(WebClient.Builder webClientBuilder) {
this.webClient = webClientBuilder.baseUrl("http://localhost:8080").build(); // Replace with your SSE server's base URL
}

public void listenToSseStream() {
Flux<Currency> eventStream = webClient.get()
.uri("/api/v1/currency") // Replace with your server's SSE endpoint
.retrieve()
.bodyToFlux(Currency.class)
.retryWhen(Retry.backoff(2, Duration.ofSeconds(2))
.maxBackoff(Duration.ofSeconds(10))
.doBeforeRetry(retrySignal -> System.out.println("Retrying connection..."))
);

eventStream.subscribe(
event -> System.out.println("Received event: " + event.getRate()), // Handle events
error -> System.err.println("Error occurred: " + error), // Handle errors
() -> System.out.println("SSE stream completed!") // Handle completion
);
}
}

SSEService

@Service
public class SseService {

private final SseClient sseClient;

public SseService(SseClient sseClient) {
this.sseClient = sseClient;
}

public void startListening() {
System.out.println("Starting SSE Listener...");
sseClient.listenToSseStream();
}
}

References

Sign up to discover human stories that deepen your understanding of the world.

Free

Distraction-free reading. No ads.

Organize your knowledge with lists and highlights.

Tell your story. Find your audience.

Membership

Read member-only stories

Support writers you read most

Earn money for your writing

Listen to audio narrations

Read offline with the Medium app

--

--

Rukmal Senavirathne
Rukmal Senavirathne

Written by Rukmal Senavirathne

Senior Software Engineer at GTN. Graduated from the Department of Computer Science and Engineering at the University of Moratuwa.

No responses yet

Write a response