Skip to the content.

Working with Relational Database using R2dbc DatabaseClient

In Spring 5.2 or previous versions, when using relational databases in your applications, you have to use Jdbc or JPA to operate with the databases. Since Spring 5.3, things will be changed. The R2dbc’s DatabaseClient which is originally part of the Spring Data R2dbc is completely refactored and will be migrated to the core Spring framework.

To access the relational databases, R2dbc will be an official alternative of Jdbc in the new Spring 5.3.

R2dbc

Configuring DatabaseClient

To use R2dbc, first of all, add corresponding drivers into your project dependencies. Check the Introduction to R2dbc for details.

<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-postgresql</artifactId>
</dependency>

And make sure you are using the latest Spring 5.3 which is managed by Spring Boot 2.4 BOM, use Spring Boot BOM to simplify the dependency management.

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.4.0-M3</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

Also add the following dependencies to create a plain Spring application.

<!-- spring webflux support -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-webflux</artifactId>
</dependency>

<!-- Spring R2dbc -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-r2dbc</artifactId>
</dependency>

<!-- Jackson modules -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jdk8</artifactId>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
</dependency>

<!-- Reactor/Netty runtime -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-buffer</artifactId>
</dependency>

<dependency>
    <groupId>io.projectreactor.netty</groupId>
    <artifactId>reactor-netty</artifactId>
</dependency>

<!-- Project Lombok: to erase the getter and setters, equals, hashCode, toString in source codes, and clean your codes -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

<!-- logging -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>jcl-over-slf4j</artifactId>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
</dependency>

In a Spring Boot application, just needs to add spring-boot-webflux-starter into your dependencies.

Like the DataSource beans of the Jdbc support, to activate R2dbc feature, declare a ConnectionFactory bean instead.

@Bean
public ConnectionFactory connectionFactory() {

    // postgres
    return new PostgresqlConnectionFactory(
        PostgresqlConnectionConfiguration.builder()
        .host("localhost")
        .database("test")
        .username("user")
        .password("password")
        .build()
    );
}

Now you can create a DatabaseClient bean which is dependent on the existing ConnectionFactory bean.

@Bean
DatabaseClient databaseClient(ConnectionFactory connectionFactory) {
    return DatabaseClient.builder()
        .connectionFactory(connectionFactory)
        //.bindMarkers(() -> BindMarkersFactory.named(":", "", 20).create())
        .namedParameters(true)
        .build();
}

The DatabaseClient.builder() provides a flexible way to setup the parameter binding strategy in SQL queries.

If you are using Spring Boot, just need to setup spring.r2dbc.url, spring.r2dbc.username, spring.r2dbc.password properties in the application.properties file, it will autoconfigure ConnectionFactory and DatabaseClient for you.

Using DatabaseClient

Once a DatabaseClient is available in Spring container, you can inject it in your Repository class , and use it to execute queries, such as, select, insert, update, delete, etc.

Create a PostRepository class to envelope the basic CRUD operations.

@RequiredArgsConstructor
@Component
@Slf4j
public class PostRepository {

    public static final BiFunction<Row, RowMetadata, Post> MAPPING_FUNCTION = (row, rowMetaData) -> Post.builder()
            .id(row.get("id", UUID.class))
            .title(row.get("title", String.class))
            .content(row.get("content", String.class))
            .build();

    private final DatabaseClient databaseClient;

    public Flux<Post> findByTitleContains(String name) {
        return this.databaseClient
                .sql("SELECT * FROM posts WHERE title LIKE :title")
                .bind("title", "%" + name + "%")
                .map(MAPPING_FUNCTION)
                .all();
    }

    public Flux<Post> findAll() {
        return this.databaseClient
                .sql("SELECT * FROM posts")
                .filter((statement, executeFunction) -> statement.fetchSize(10).execute())
                .map(MAPPING_FUNCTION)
                .all();
    }

    public Mono<Post> findById(UUID id) {
        return this.databaseClient
                .sql("SELECT * FROM posts WHERE id=:id")
                .bind("id", id)
                .map(MAPPING_FUNCTION)
                .one();
    }

    public Mono<UUID> save(Post p) {
        return this.databaseClient.sql("INSERT INTO  posts (title, content) VALUES (:title, :content, :metadata)")
                .filter((statement, executeFunction) -> statement.returnGeneratedValues("id").execute())
                .bind("title", p.getTitle())
                .bind("content", p.getContent())
                .fetch()
                .first()
                .map(r -> (UUID) r.get("id"));
    }

    public Mono<Integer> update(Post p) {
        return this.databaseClient.sql("UPDATE posts set title=:title, content=:content WHERE id=:id")
                .bind("title", p.getTitle())
                .bind("content", p.getContent())
                .bind("id", p.getId())
                .fetch()
                .rowsUpdated();
    }

    public Mono<Integer> deleteById(UUID id) {
        return this.databaseClient.sql("DELETE FROM posts WHERE id=:id")
                .bind("id", id)
                .fetch()
                .rowsUpdated();
    }
}

In the above codes, it used a POJO class Post to wrap the query results.

@Data
@ToString
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Post {

    private UUID id;

    private String title;

    private String content;

}

Declare a ConnectionFactoryInitializer bean to initialize the database, such as preparing the schema if not existed , and inserting some sample data.

@Bean
public ConnectionFactoryInitializer initializer(ConnectionFactory connectionFactory) {

    ConnectionFactoryInitializer initializer = new ConnectionFactoryInitializer();
    initializer.setConnectionFactory(connectionFactory);

    CompositeDatabasePopulator populator = new CompositeDatabasePopulator();
    populator.addPopulators(new ResourceDatabasePopulator(new ClassPathResource("schema.sql")));
    populator.addPopulators(new ResourceDatabasePopulator(new ClassPathResource("data.sql")));
    initializer.setDatabasePopulator(populator);

    return initializer;
}

Here we use a schema.sql to create the tables and the data.sql to initialize some sample data.

-- schema.sql
CREATE TABLE IF NOT EXISTS posts (
    id UUID DEFAULT uuid_generate_v4(),
    title VARCHAR(255),
    content VARCHAR(255),
    PRIMARY KEY (id)
);

-- data.sql
DELETE FROM posts;
INSERT INTO  posts (title, content) VALUES ('R2dbc is refined', 'R2dbc is now part of Spring framework core');

In the Jdbc world, there are some solutions to initialize and maintain the database schema and data, such as Flyway and Liquibase. There is a new project nkonev/r2dbc-migrate which try to port similar features into the R2dbc world.

To initialize some sample data in a plain Spring application, create a bean to listen the ContextRefreshEvent.

@Component
@Slf4j
@RequiredArgsConstructor
public class DataInitializer {

    private final DatabaseClient databaseClient;

    @EventListener(value = ContextRefreshedEvent.class)
    public void init() throws Exception {
        log.info("start data initialization...");
        this.databaseClient
                .sql("INSERT INTO  posts (title, content) VALUES (:title, :content)")
                .filter((statement, executeFunction) -> statement.returnGeneratedValues("id").execute())
                .bind("title", "my first post")
                .bind("content", "content of my first post")
                .fetch()
                .first()
                .subscribe(
                        data -> log.info("inserted data : {}", data),
                        error -> log.info("error: {}", error)
                );

    }
}

Spring Boot includes a series of application lifecycle events, in a Spring Boot application, listen the ApplicationReadyEvent instead.

Transaction Management

Spring R2dbc provides a R2dbcTransactionManager which implements ReactiveTransactionManager, with a Spring transaction manager, you can control the transaction by the declaration way or programmatic approaches.

To enable transaction support, first of all, you should add @EnableTransactionManagement on the configuration class.

@Configuration
@EnableTransactionManagement
class DatabaseConfig{}

Then declare a R2dbcTransactionManager bean.

@Bean
ReactiveTransactionManager transactionManager(ConnectionFactory connectionFactory) {
    return new R2dbcTransactionManager(connectionFactory);
}

Now it is ready to add a @Transacational annotation on classes or methods to control transaction unit.

// on classes
@Transactional
class Service{}

// on methods
class Service{
    
    @Transactional
    public void someMethod() {}
}

For programmatic transaction control, the traditional Jdbc provides a TransactionTemplate. In Spring reactive stack, Spring provides a similar TransactionalOperator.

Declare an extra TransactionalOperator bean.

@Bean
TransactionalOperator transactionalOperator(ReactiveTransactionManager transactionManager) {
    return TransactionalOperator.create(transactionManager);
}

Then apply operator::transactional in the reactive pipelines to wrap a series of operations into a transaction unit.

@Autowired
TransactionalOperator operator;

posts
    .save(Post.builder().title("another post").content("content of another post").build())
    .map(p -> {
        p.setTitle("new Title");
        return p;
    })
    .flatMap(posts::save)
    .flatMap(saved -> comments
             .save(Comment.builder()
                   .content("dummy comments")
                   .postId(saved.getId())
                   .build()
                  )
            )
    .log()
    .then()
    .thenMany(posts.findAll())
    .as(operator::transactional)
    .subscribe(
        data -> log.info("saved data: {}", data),
        err -> log.error("err: {}", err)
    );

More info of Spring transaction, check the Transaction Management chapter of Spring reference documentation.

Exposing RESTful APIs

Let’s move to the web layer, and expose the CRUD operations as RESTful APIs.

Create a WebConfig configuration class to enable Spring Webflux feature.

@Configuration
@EnableWebFlux
public class WebConfig implements WebFluxConfigurer {

    @Autowired
    private ObjectMapper objectMapper;

    @Override
    public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
        configurer.defaultCodecs()
                .jackson2JsonEncoder(new Jackson2JsonEncoder(objectMapper, MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN));
        configurer.defaultCodecs()
                .jackson2JsonDecoder(new Jackson2JsonDecoder(objectMapper, MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN));
    }

    @Bean
    public RouterFunction<ServerResponse> routes(PostHandler postController) {
        return route()
                .GET("/posts", postController::all)
                .POST("/posts", postController::create)
                .GET("/posts/{id}", postController::get)
                .PUT("/posts/{id}", postController::update)
                .DELETE("/posts/{id}", postController::delete)
                .build();
    }
}

In this example, we use RouterFunction instead of traditional @RestController.

And usually add a configuration to customize the Jackson 2 ObjectMapper.

@Configuration
public class Jackson2ObjectMapperConfig {

    @Bean
    public ObjectMapper objectMapper() {

        var builder = Jackson2ObjectMapperBuilder.json();
        builder.serializationInclusion(JsonInclude.Include.NON_EMPTY);
        builder.featuresToDisable(
                SerializationFeature.WRITE_DATES_AS_TIMESTAMPS,
                SerializationFeature.FAIL_ON_EMPTY_BEANS,
                DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES,
                DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
        builder.featuresToEnable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
        builder.modulesToInstall(JavaTimeModule.class);

        return builder.build();
    }
}

In a Spring Boot application, it will autoconfigure these for you, you just need to declare a RouterFunction bean and focus on your business.

Add the missing PostHanlder class in the RouterFunction bean to handle the incoming request.

@Component
public class PostHandler {

    private final PostRepository posts;

    public PostHandler(PostRepository posts) {
        this.posts = posts;
    }

    public Mono<ServerResponse> all(ServerRequest req) {
        return ok().body(this.posts.findAll(), Post.class);
    }

    public Mono<ServerResponse> create(ServerRequest req) {
        return req.bodyToMono(Post.class)
                .flatMap(this.posts::save)
                .flatMap(id -> created(URI.create("/posts/" + id)).build());
    }

    public Mono<ServerResponse> get(ServerRequest req) {
        return this.posts.findById(UUID.fromString(req.pathVariable("id")))
                .flatMap(post -> ok().body(Mono.just(post), Post.class))
                .switchIfEmpty(notFound().build());
    }

    public Mono<ServerResponse> update(ServerRequest req) {
        var existed = this.posts.findById(UUID.fromString(req.pathVariable("id")));
        return Mono
                .zip(
                        (data) -> {
                            Post p = (Post) data[0];
                            Post p2 = (Post) data[1];
                            if (p2 != null && StringUtils.hasText(p2.getTitle())) {
                                p.setTitle(p2.getTitle());
                            }

                            if (p2 != null && StringUtils.hasText(p2.getContent())) {
                                p.setContent(p2.getContent());
                            }

                            if (p2 != null && p2.getMetadata() != null) {
                                p.setMetadata(p2.getMetadata());
                            }

                            if (p2 != null && p2.getStatus() != null) {
                                p.setStatus(p2.getStatus());
                            }
                            return p;
                        },
                        existed,
                        req.bodyToMono(Post.class)
                )
                .cast(Post.class)
                .flatMap(this.posts::update)
                .flatMap(post -> noContent().build());
    }

    public Mono<ServerResponse> delete(ServerRequest req) {
        return this.posts.deleteById(UUID.fromString(req.pathVariable("id")))
                .flatMap(deleted -> noContent().build());
    }

}

In a plain Spring application, you need create a custom main class to start the application.

@Configuration
@ComponentScan
@PropertySource(value = "classpath:application.properties", ignoreResourceNotFound = true)
public class Application {

    @Value("${server.port:8080}")
    private int port = 8080;

    public static void main(String[] args) throws Exception {
        try (AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(
                Application.class)) {
            context.getBean(HttpServer.class).bindNow().onDispose().block();
        }
    }

    @Profile("default")
    @Bean
    public HttpServer nettyHttpServer(ApplicationContext context) {
        HttpHandler handler = WebHttpHandlerBuilder.applicationContext(context).build();
        ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
        HttpServer httpServer = HttpServer.create().host("localhost").port(this.port);
        return httpServer.handle(adapter);
    }

}

Now you can run the application in IDEs directly. You also can package the application into a jar file, and then execute the jar file using command line, check the configuration of maven-assembly-plugin in the pom.xml file.

In Spring Boot application, you can run the generated Spring boot Application class from IDEs directly, and you can also use preconfigured Spring Boot maven plugin to package and run the application.

Grab a copy of this sample code from my github, and play it yourself.