Dealing with Postgres specific Json/Enum type and Notifier/Listener with R2dbc
In the previous posts, we have explored the R2dbc support for H2, MySQL, Postgres and MSSQL, but every database has its own specific functionality, for example, Postgres drivers for R2dbc has built-in support of JSON and Enum type. In this post, we will focus on these specific features provided in Postgres.
JSON type
Postgres support two forms of JSON types.
- json - storing data as textual form in databases
- jsonb - storing data as binary form in databases
For example, add a metadata column to the posts table we’ve created in the previous posts.
CREATE TABLE IF NOT EXISTS posts (
-- other column definitions are omiited
metadata json default '{}',
PRIMARY KEY (id)
);
The json data type can be converted to Java io.r2dbc.postgresql.codec.Json
and reverse.
class Post {
//other properties are omiited
@Column("metadata")// use for Spring Data R2dbc
private Json metadata;
}
If you are using DatabaseClient
to insert data into tables, bind a value of the Json
type to the json data type, eg.
this.databaseClient
.sql("INSERT INTO posts (title, content, metadata) VALUES (:title, :content, :metadata)")
.filter((statement, executeFunction) -> statement.returnGeneratedValues("id").execute())
.bind("title", "my first post")
.bind("content", "content of my first post")
.bind("metadata", Json.of("{\"tags\":[\"spring\", \"r2dbc\"]}"))
.fetch()
.first()
.subscribe(
data -> log.info("inserted data : {}", data),
error -> log.info("error: {}", error)
);
When retrieving data from databases, you need to convert the json data type to Java Json
type.
// code fragments of row result mapping
row.get("metadata", Json.class)
If you are using Spring Data R2dbc APIs to operate the databases, either R2dbcEntityTemplate
or Repository
interface, it will do the conversion work automatically.
// query by id
this.template.selectOne(Query.query(where("id").is(id)), Post.class);
// save
this.template.insert(Post.class)
.using(p)
.map(post -> post.getId());
In a WebMVC/WebFlux application, if you wan to expose RESTful APIs and use the entity class(eg. Post
class here) as the payload of the request or response body, you need to customize the serialization or deserialization of Json
class.
@Slf4j
public class PgJsonObjectSerializer extends JsonSerializer<Json> {
@Override
public void serialize(Json value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
var text = value.asString();
log.info("The raw json value from PostgresSQL JSON type:" + text);
JsonFactory factory = new JsonFactory();
JsonParser parser = factory.createParser(text);
var node = gen.getCodec().readTree(parser);
serializers.defaultSerializeValue(node, gen);
}
}
The above PgJsonObjectSerializer
reads the raw json as string which is escaped, and use Jackson to parse it and regenerate to json literal form in the response by Jackson.
@Slf4j
public class PgJsonObjectDeserializer extends JsonDeserializer<Json> {
@Override
public Json deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, JsonProcessingException {
var value = ctxt.readTree(p);
log.info("read json value :", value);
return Json.of(value.toString());
}
}
The PgJsonObjectDeserializer
converts incoming json string to Json
object.
Then apply then on the fields of the entity.
public class Post {
// other fields
@JsonSerialize(using = PgJsonObjectSerializer.class)
@JsonDeserialize(using = PgJsonObjectDeserializer.class)
private Json metadata;
}
Or register them in Jackson ObjectMapper
configuration.
var builder = Jackson2ObjectMapperBuilder.json();
//...
// register globally or on class fields
builder.serializers(new PgJsonObjectSerializer())
.deserializers(new PgJsonObjectDeserializer())
If it is a Spring Boot application, just need to declare a @JsonComponent
annotated component, Spring Boot will register it automatically.
@Slf4j
@JsonComponent
class PgJsonObjectJsonComponent {
static class Deserializer extends JsonDeserializer<Json> {
@Override
public Json deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
var value = ctxt.readTree(p);
log.info("read json value :{}", value);
return Json.of(value.toString());
}
}
static class Serializer extends JsonSerializer<Json> {
@Override
public void serialize(Json value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
var text = value.asString();
log.info("The raw json value from PostgresSQL JSON type:{}", text);
JsonFactory factory = new JsonFactory();
JsonParser parser = factory.createParser(text);
var node = gen.getCodec().readTree(parser);
serializers.defaultSerializeValue(node, gen);
}
}
}
Enum type
Postgres database supports custom types, you can create your enum type and limit the inserting values in a set of predefined items.
DO $$ BEGIN
CREATE TYPE post_status AS ENUM( 'DRAFT', 'PENDING_MODERATION', 'PUBLISHED');
EXCEPTION
WHEN duplicate_object THEN null;
END $$;
The scripts will try to create a custom type post_status
and only accept one of DRAFT
, PENDING_MODERATION
, PUBLISHED
.
The
ConnectionFactoryInitializer
read the scripts line by line terminated by “;” which caused exceptions when the application starts up. I move this scripts into the init.sql of Postgres docker service defined indocker-compose.yml
, it will be initialized when the docker is starting.
Define a post_status
enum data type in the table script.
CREATE TABLE IF NOT EXISTS posts (
// other fields
status post_status default 'DRAFT',
PRIMARY KEY (id)
);
Create a Java Enum class and define a stauts
field in the Post
entity.
public class Post {
//...
@Column("status")
private Status status;
}
enum Status {
DRAFT, PENDING_MODERATION, PUBLISHED;
}
To encode and decode the enum data type automatically, register a Postgres specific EnumCodec
in the connection configuration which is shipped with the official Postgres R2dc driver.
PostgresqlConnectionConfiguration.builder()
...
.codecRegistrar(EnumCodec.builder().withEnum("post_status", Post.Status.class).build())
.build()
Then you can use DatabaseClient
to read and write data of Java enum type like this.
// read from rowset directly.
row.get("status", Post.Status.class)
// bind java Enum type to parameter placeholder directly
.bind("status", p.getStatus())
When using Spring Data R2dbc, it registered a collection of converters in its MappingContext
in the AbstractR2dbcConfiguration
. It will detect the data type and Java type and apply the correct converters when reading and writing data.
Ideally when it fails, it will return to use the built-in solution from the baked drivers, but I got exceptions when reading data from databases. Follow the official Spring Data R2dbc reference document, you can create your custom converters to resolve this issue.
Create s @ReadingConverter
to convert the data type to Java type.
@ReadingConverter
public class PostReadingConverter implements Converter<Row, Post> {
@Override
public Post convert(Row row) {
return Post.builder()
.id(row.get("id", UUID.class))
.title(row.get("title", String.class))
.content(row.get("content", String.class))
.status(row.get("status", Post.Status.class))
.metadata(row.get("metadata", Json.class))
.createdAt(row.get("created_at", LocalDateTime.class))
.updatedAt(row.get("updated_at", LocalDateTime.class))
.version(row.get("version", Long.class))
.build();
}
}
Creating a @WritingConverter
to handle the conversion of Java Enum type to enum data type. Spring Data R2dbc has a EnumWriteSupport
to simplify the work.
@WritingConverter
public class PostStatusWritingConverter extends EnumWriteSupport<Post.Status> {
}
Register them in the R2dbc configuration.
public class DatabaseConfig extends AbstractR2dbcConfiguration {
//...
@Override
protected List<Object> getCustomConverters() {
return List.of(new PostReadingConverter(), new PostStatusWritingConverter());
}
}
Spring Data R2dbc can convert Java Enum type to a textual data type and reverse automatically. You can just need to declare the enum column as a textual type, eg.
status VARCHAR(255) default 'DRAFT',
No need custom converters in this case.
The disadvantage of this usage is that it will loss the constraints of the database schema itself, you are responsible for ensuring the correctness of the data being inserted in the tables.
Notify/Listen
Postgres provides a simple notification mechanism, you can use it as a message broker.
Create a Nofitier
to send a message.
@Component
@Slf4j
class Notifier {
@Autowired
@Qualifier("pgConnectionFactory")
ConnectionFactory pgConnectionFactory;
PostgresqlConnection sender;
@PostConstruct
public void initialize() throws InterruptedException {
sender = Mono.from(pgConnectionFactory.create())
.cast(PostgresqlConnection.class)
.block();
}
public Mono<Void> send() {
return sender.createStatement("NOTIFY mymessage, 'Hello world at " + LocalDateTime.now() + "'")
.execute()
.flatMap(PostgresqlResult::getRowsUpdated)
.log("sending notification::")
.then();
}
@PreDestroy
public void destroy() {
sender.close().subscribe();
}
}
And receiving the message in a Listener
bean.
@Component
@Slf4j
class Listener {
@Autowired
@Qualifier("pgConnectionFactory")
ConnectionFactory pgConnectionFactory;
PostgresqlConnection receiver;
@PostConstruct
public void initialize() throws InterruptedException {
receiver = Mono.from(pgConnectionFactory.create())
.cast(PostgresqlConnection.class)
.block();
receiver.createStatement("LISTEN mymessage")
.execute()
.flatMap(PostgresqlResult::getRowsUpdated)
.log("listen::")
.subscribe();
receiver.getNotifications()
.delayElements(Duration.ofSeconds(1))
.log()
.subscribe(
data -> log.info("notifications: {}", data)
);
}
@PreDestroy
public void destroy() {
receiver.close().subscribe();
}
}
Add a routing rule to send a message, check the logging of listener side.
.GET("/hello", request -> notifier
.send()
.flatMap((v) -> noContent().build())
)
Grab the sample codes from my Github.