Spring 5 has embraced the reactive programming paradigm by introducing a brand new reactive framework called Spring WebFlux.
Spring WebFlux is an asynchronous framework from the bottom up. It can run on Servlet containers using the Servlet 3.1 non-blocking IO API, as well as on other async runtime environments such as Netty or Undertow.
It will be available for use alongside Spring MVC. Yes, Spring MVC is not going anywhere. It’s a popular web framework that developers have been using for a long time.
But you now have a choice between the new reactive framework and the traditional Spring MVC. You can use either of them depending on your use case.
Spring WebFlux uses a library called Reactor for its reactive support. Reactor is an implementation of the Reactive Streams specification.
Reactor provides two main types called Flux and Mono. Both of these types implement the Publisher interface provided by Reactive Streams. Flux is used to represent a stream of 0..N elements and Mono is used to represent a stream of 0..1 elements.
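To make the Publisher idea concrete, here is a minimal sketch of the publish/subscribe contract. It does not use Reactor; it uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the Reactive Streams interfaces. The class name PublisherSketch and the helpers fromList and collect are hypothetical names for illustration, and the sketch skips parts of the specification such as rejecting non-positive demand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class PublisherSketch {

    // Hypothetical publisher: emits the given items on demand, then completes.
    // A list of 0..N items behaves like a Flux, a list of 0..1 items like a Mono.
    static <T> Flow.Publisher<T> fromList(List<T> items) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index = 0;
            private boolean done = false;

            @Override
            public void request(long n) {
                // Emit at most n items, honoring the subscriber's demand.
                for (long i = 0; i < n && index < items.size() && !done; i++) {
                    subscriber.onNext(items.get(index++));
                }
                // Signal completion once every item has been delivered.
                if (index == items.size() && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes, requests everything, and collects whatever is emitted.
    static <T> List<T> collect(Flow.Publisher<T> publisher) {
        List<T> received = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { received.add(item); }
            @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
            @Override public void onComplete() { }
        });
        return received;
    }
}
```

Nothing is emitted until the subscriber calls request, which is the back-pressure mechanism that Flux and Mono build on.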
Although Spring uses Reactor as a core dependency for most of its internal APIs, it also supports the use of RxJava at the application level.
Programming models supported by Spring WebFlux
Spring WebFlux supports two types of programming models:
- Traditional annotation-based model with @Controller, @RequestMapping, and other annotations that you have been using in Spring MVC.
- A brand new functional-style model based on Java 8 lambdas for routing and handling requests.
In this article, we’ll be using the traditional annotation-based programming model. I will write about the functional-style model in a future article.
Let’s build a Reactive Restful Service in Spring Boot
In this article, we’ll build a RESTful API for a mini Twitter application. The application will only have a single domain model called Tweet. Every Tweet will have a text and a createdAt field.
We’ll use MongoDB as our data store along with the reactive MongoDB driver. We’ll build REST APIs for creating, retrieving, updating, and deleting a Tweet. All the REST APIs will be asynchronous and will return a Publisher.
We’ll also learn how to stream data from the database to the client.
Finally, we’ll write integration tests to test all the APIs using the new asynchronous WebTestClient provided by Spring 5.
Creating the Project
Let’s use the Spring Initializr web app to generate our application. Follow the steps below to generate the project –
- Head over to http://start.spring.io
- Enter artifact’s value as webflux-demo
- Add Reactive Web and Reactive MongoDB dependencies
- Click Generate to generate and download the Project.
Once the project is downloaded, unzip it and import it into your favorite IDE. The project’s directory structure should look like this –
Configuring MongoDB
You can configure MongoDB by simply adding the following property to the application.properties file –
spring.data.mongodb.uri=mongodb://localhost:27017/webflux_demo
Spring Boot will read this configuration on startup and automatically configure the data source.
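If the parts of that connection string are unfamiliar, the following sketch pulls them apart with the JDK's java.net.URI. It is only an illustration of what the property encodes: the class name MongoUriParts is hypothetical, and real MongoDB connection strings can also carry credentials, several hosts, and options that this simple parsing does not handle.

```java
import java.net.URI;

class MongoUriParts {
    final String host;
    final int port;
    final String database;

    private MongoUriParts(String host, int port, String database) {
        this.host = host;
        this.port = port;
        this.database = database;
    }

    // Splits a simple single-host connection string into host, port, and database name.
    static MongoUriParts parse(String connectionString) {
        URI uri = URI.create(connectionString);
        // The path looks like "/webflux_demo"; drop the leading slash to get the database.
        String database = uri.getPath().substring(1);
        return new MongoUriParts(uri.getHost(), uri.getPort(), database);
    }
}
```

For the value used above, the host is localhost, the port is 27017 (MongoDB's default), and the database is webflux_demo.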
Creating the Domain Model
Let’s create our domain model – Tweet. Create a new package called model inside the com.example.webfluxdemo package and then create a file named Tweet.java with the following contents –
package com.example.webfluxdemo.model;

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import java.util.Date;

@Document(collection = "tweets")
public class Tweet {
    // Left null for new tweets so that MongoDB generates the id on save
    @Id
    private String id;

    @NotBlank
    @Size(max = 140)
    private String text;

    @NotNull
    private Date createdAt = new Date();

    public Tweet() {
    }

    public Tweet(String text) {
        this.text = text;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getText() {
        return text;
    }

    public void setText(String text) {
        this.text = text;
    }

    public Date getCreatedAt() {
        return createdAt;
    }

    public void setCreatedAt(Date createdAt) {
        this.createdAt = createdAt;
    }
}
Simple enough! The Tweet model contains a text and a createdAt field. The text field is annotated with @NotBlank and @Size to ensure that it is not blank and has a maximum of 140 characters.
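To see what those two constraints amount to, here is a plain-Java sketch of the same checks. It is not how Bean Validation is implemented internally; the class TweetTextRules and its method names are hypothetical, and the rules are written from the documented meaning of @NotBlank (not null, at least one non-whitespace character) and @Size(max = 140) (length of at most 140, with null left to other constraints).

```java
class TweetTextRules {
    static final int MAX_LENGTH = 140;

    // Same idea as @NotBlank: reject null and whitespace-only text.
    static boolean isNotBlank(String text) {
        return text != null && !text.trim().isEmpty();
    }

    // Same idea as @Size(max = 140): null passes here and is caught by the other rule.
    static boolean isWithinSize(String text) {
        return text == null || text.length() <= MAX_LENGTH;
    }

    // A tweet's text is acceptable only when both rules hold.
    static boolean isValid(String text) {
        return isNotBlank(text) && isWithinSize(text);
    }
}
```

In the real application you don't call anything like this yourself. The @Valid annotation on the controller's request body triggers the annotated constraints.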
Creating the Repository
Next, we’re going to create the data access layer which will be used to access the MongoDB database. Create a new package called repository inside com.example.webfluxdemo and then create a new file called TweetRepository.java with the following contents –
package com.example.webfluxdemo.repository;

import com.example.webfluxdemo.model.Tweet;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface TweetRepository extends ReactiveMongoRepository<Tweet, String> {

}
The TweetRepository interface extends ReactiveMongoRepository, which exposes various CRUD methods on the Document.
Spring Boot automatically plugs in an implementation of this interface called SimpleReactiveMongoRepository at runtime.
So you get all the CRUD methods on the Document readily available to you without needing to write any code. Following are some of the methods available from SimpleReactiveMongoRepository –
reactor.core.publisher.Flux<T> findAll();
reactor.core.publisher.Mono<T> findById(ID id);
<S extends T> reactor.core.publisher.Mono<S> save(S entity);
reactor.core.publisher.Mono<Void> delete(T entity);
Notice that all the methods are asynchronous and return a publisher in the form of a Flux or a Mono type.
Creating the Controller Endpoints
Finally, let’s write the APIs that will be exposed to the clients. Create a new package called controller inside com.example.webfluxdemo and then create a new file called TweetController.java with the following contents –
package com.example.webfluxdemo.controller;

import com.example.webfluxdemo.model.Tweet;
import com.example.webfluxdemo.repository.TweetRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.validation.Valid;

@RestController
public class TweetController {

    @Autowired
    private TweetRepository tweetRepository;

    @GetMapping("/tweets")
    public Flux<Tweet> getAllTweets() {
        return tweetRepository.findAll();
    }

    @PostMapping("/tweets")
    public Mono<Tweet> createTweets(@Valid @RequestBody Tweet tweet) {
        return tweetRepository.save(tweet);
    }

    @GetMapping("/tweets/{id}")
    public Mono<ResponseEntity<Tweet>> getTweetById(@PathVariable(value = "id") String tweetId) {
        return tweetRepository.findById(tweetId)
                .map(savedTweet -> ResponseEntity.ok(savedTweet))
                .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    @PutMapping("/tweets/{id}")
    public Mono<ResponseEntity<Tweet>> updateTweet(@PathVariable(value = "id") String tweetId,
                                                   @Valid @RequestBody Tweet tweet) {
        return tweetRepository.findById(tweetId)
                .flatMap(existingTweet -> {
                    existingTweet.setText(tweet.getText());
                    return tweetRepository.save(existingTweet);
                })
                .map(updatedTweet -> new ResponseEntity<>(updatedTweet, HttpStatus.OK))
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
    }

    @DeleteMapping("/tweets/{id}")
    public Mono<ResponseEntity<Void>> deleteTweet(@PathVariable(value = "id") String tweetId) {
        return tweetRepository.findById(tweetId)
                .flatMap(existingTweet ->
                        tweetRepository.delete(existingTweet)
                                .then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK)))
                )
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
    }

    // Tweets are sent to the client as Server-Sent Events
    @GetMapping(value = "/stream/tweets", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Tweet> streamAllTweets() {
        return tweetRepository.findAll();
    }
}
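The find-then-respond shape used by getTweetById and updateTweet can be easier to read in a synchronous form first. The sketch below uses java.util.Optional as a stand-in for Mono, which is an assumption made purely for illustration: Optional is eager and blocking, whereas Mono is lazy and asynchronous, so only the shape of the pipeline carries over. The class FindOrNotFoundSketch and its in-memory map are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class FindOrNotFoundSketch {
    // Hypothetical in-memory store standing in for TweetRepository: id -> text.
    private final Map<String, String> tweets = new HashMap<>();

    void put(String id, String text) {
        tweets.put(id, text);
    }

    Optional<String> findById(String id) {
        return Optional.ofNullable(tweets.get(id));
    }

    // Mirrors getTweetById: map a found value to 200, fall back to 404 when empty.
    int statusForGet(String id) {
        return findById(id)
                .map(text -> 200)
                .orElse(404); // plays the role of defaultIfEmpty
    }

    // Mirrors updateTweet: flatMap chains a dependent step that itself returns a wrapper,
    // and that step runs only when the tweet exists.
    int statusForUpdate(String id, String newText) {
        return findById(id)
                .flatMap(existing -> save(id, newText))
                .map(saved -> 200)
                .orElse(404);
    }

    private Optional<String> save(String id, String text) {
        tweets.put(id, text);
        return Optional.of(text);
    }
}
```

The rule of thumb carries over to Reactor: use map when the function returns a plain value, and flatMap when it returns another Mono, such as the result of tweetRepository.save.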
All the controller endpoints return a Publisher in the form of a Flux or a Mono. The last endpoint is the most interesting one: it sets the content type to text/event-stream, so the tweets are sent to the browser as Server-Sent Events, like this –
data: {"id":"59ba5389d2b2a85ed4ebdafa","text":"tweet1","createdAt":1505383305602}
data: {"id":"59ba5587d2b2a85f93b8ece7","text":"tweet2","createdAt":1505383814847}
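The framing in that output is simple: each event is one or more data: lines followed by a blank line. Here is a minimal sketch of that framing, assuming the payload is already serialized to a JSON string. The class SseFrame is a hypothetical helper for illustration. Spring does this for you when the endpoint produces text/event-stream.

```java
class SseFrame {
    // Formats one payload as a Server-Sent Event: a "data:" line per payload line,
    // terminated by an empty line that marks the end of the event.
    static String toEvent(String payload) {
        StringBuilder event = new StringBuilder();
        for (String line : payload.split("\n", -1)) {
            event.append("data: ").append(line).append('\n');
        }
        return event.append('\n').toString();
    }
}
```

Because the blank line delimits events, a browser's EventSource can hand each tweet to your callback as soon as it arrives, without waiting for the response to end.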
Now that we’re talking about event streams, you might ask: doesn’t the following endpoint also return a stream?
@GetMapping("/tweets")
public Flux<Tweet> getAllTweets() {
    return tweetRepository.findAll();
}
And the answer is yes. Flux<Tweet> represents a stream of tweets. But, by default, it will produce a JSON array, because a stream of individual JSON objects sent to the browser would not be a valid JSON document as a whole. A browser client has no way to consume a stream other than using Server-Sent Events or WebSocket.
However, non-browser clients can request a stream of JSON by setting the Accept header to application/stream+json, and the response will be a stream of JSON similar to Server-Sent Events but without the extra formatting:
{"id":"59ba5389d2b2a85ed4ebdafa","text":"tweet1","createdAt":1505383305602}
{"id":"59ba5587d2b2a85f93b8ece7","text":"tweet2","createdAt":1505383814847}
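As a rough sketch of what such a non-browser client could look like, the code below builds the request with the JDK's java.net.http API (Java 11+) and splits a response body into one JSON document per line. The class JsonStreamClientSketch, its method names, and the base URL you pass in are assumptions for illustration. The request is only built here, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;
import java.util.stream.Collectors;

class JsonStreamClientSketch {
    // Builds a GET request for /tweets that asks for a stream of JSON documents.
    static HttpRequest streamRequest(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/tweets"))
                .header("Accept", "application/stream+json")
                .GET()
                .build();
    }

    // Each non-blank line of the response is a complete JSON document on its own.
    static List<String> splitDocuments(String body) {
        return body.lines()
                .filter(line -> !line.isBlank())
                .collect(Collectors.toList());
    }
}
```

A real client would pass the request to an HttpClient and read the body line by line as it arrives, rather than splitting it after the fact.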
Integration Test with WebTestClient
Spring 5 also provides an asynchronous and reactive HTTP client called WebClient for working with asynchronous and streaming APIs. It is a reactive alternative to RestTemplate.
Moreover, you also get a WebTestClient for writing integration tests. The test client can either run against a live server or be used with a mock request and response.
We’ll use WebTestClient to write integration tests for our REST APIs. Open the WebfluxDemoApplicationTests.java file and add the following tests to it –
package com.example.webfluxdemo;

import com.example.webfluxdemo.model.Tweet;
import com.example.webfluxdemo.repository.TweetRepository;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Mono;

import java.util.Collections;

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class WebfluxDemoApplicationTests {

    @Autowired
    private WebTestClient webTestClient;

    @Autowired
    TweetRepository tweetRepository;

    @Test
    public void testCreateTweet() {
        Tweet tweet = new Tweet("This is a Test Tweet");

        webTestClient.post().uri("/tweets")
                .contentType(MediaType.APPLICATION_JSON_UTF8)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .body(Mono.just(tweet), Tweet.class)
                .exchange()
                .expectStatus().isOk()
                .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
                .expectBody()
                .jsonPath("$.id").isNotEmpty()
                .jsonPath("$.text").isEqualTo("This is a Test Tweet");
    }

    @Test
    public void testGetAllTweets() {
        webTestClient.get().uri("/tweets")
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .exchange()
                .expectStatus().isOk()
                .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
                .expectBodyList(Tweet.class);
    }

    @Test
    public void testGetSingleTweet() {
        // block() is acceptable in test setup: we need the saved id before calling the API
        Tweet tweet = tweetRepository.save(new Tweet("Hello, World!")).block();

        webTestClient.get()
                .uri("/tweets/{id}", Collections.singletonMap("id", tweet.getId()))
                .exchange()
                .expectStatus().isOk()
                .expectBody()
                .consumeWith(response ->
                        Assertions.assertThat(response.getResponseBody()).isNotNull());
    }

    @Test
    public void testUpdateTweet() {
        Tweet tweet = tweetRepository.save(new Tweet("Initial Tweet")).block();
        Tweet newTweetData = new Tweet("Updated Tweet");

        webTestClient.put()
                .uri("/tweets/{id}", Collections.singletonMap("id", tweet.getId()))
                .contentType(MediaType.APPLICATION_JSON_UTF8)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .body(Mono.just(newTweetData), Tweet.class)
                .exchange()
                .expectStatus().isOk()
                .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
                .expectBody()
                .jsonPath("$.text").isEqualTo("Updated Tweet");
    }

    @Test
    public void testDeleteTweet() {
        Tweet tweet = tweetRepository.save(new Tweet("To be deleted")).block();

        webTestClient.delete()
                .uri("/tweets/{id}", Collections.singletonMap("id", tweet.getId()))
                .exchange()
                .expectStatus().isOk();
    }
}
In the above example, I have written tests for all the CRUD APIs. You can run the tests by going to the root directory of the project and typing mvn test.
Conclusion
In this article, we learned the basics of reactive programming with Spring and built a simple RESTful service with the reactive support provided by the Spring WebFlux framework. We also tested all the REST APIs using WebTestClient.
I strongly recommend the following awesome YouTube videos for learning more about reactive programming with Spring and Reactor –
- Reactive Web Applications with Spring 5 – By Rossen Stoyanchev
- Developing Reactive applications with Reactive Streams and Java 8 – By Brian Clozel, Sébastien Deleuze
Thanks for reading, folks! Let me know what you think about the new Spring WebFlux framework in the comment section below.