This is the article for day 18 of the Future Advent Calendar 2019.
When adopting a Pub/Sub messaging architecture, people often reach for broker middleware such as Kafka or managed services such as Amazon SNS and Google Cloud Pub/Sub, but PostgreSQL can in fact do Pub/Sub as well.
If you already use PostgreSQL in your work, you can easily achieve loosely coupled communication between systems without building a new Pub/Sub broker.
This article introduces the feature and presents options and considerations for implementing a Pub/Sub client in Java.
NOTIFY/LISTEN
PostgreSQL has three queries related to its Pub/Sub feature:
NOTIFY channel [, payload] (`pg_notify` is also provided as a function with the same behavior)
LISTEN channel
UNLISTEN channel
Let's look at the basic usage and behavior.
-- Start subscribing to channel "foo"
LISTEN foo;
-- Publish the data "hello" to channel "foo"
NOTIFY foo, 'hello';
-- Or
SELECT pg_notify('foo', 'hello');
-- Sessions that have already subscribed to "foo" receive the following notification
-- Asynchronous notification "foo" with payload "hello" received from server process with PID 14728.
-- A notification without a payload is also possible
NOTIFY foo;
-- Asynchronous notification "foo" received from server process with PID 14728.
-- Stop subscribing to channel "foo"
UNLISTEN foo;
It's very simple. Next, I will list the main specifications of this feature and the points to consider when using it. For details, see the official documentation.
A channel is an arbitrary string that serves as the key for Pub/Sub communication. No data is exchanged if the channel given to LISTEN differs from the channel given to NOTIFY.
One session can listen to multiple channels.
Within ASCII, channel names can consist of alphanumeric characters and underscores (_). They are not case sensitive. We have confirmed that multi-byte characters can also be used.
NOTIFY Hello, 'world';
-- Asynchronous notification "Hello" with payload "world" received from server process with PID 14728.
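If an application builds a LISTEN statement by string concatenation, it may want to check the channel name first. The following is a minimal sketch, assuming the conservative rule described above (ASCII letters, digits, and underscores, compared case-insensitively, and not starting with a digit). The class `ChannelNames` and its methods are hypothetical helpers for illustration, not part of any driver, and this rule deliberately rejects multi-byte names.

```java
import java.util.Locale;
import java.util.regex.Pattern;

class ChannelNames {
    // Assumed conservative rule: ASCII letters, digits and underscores, not starting with a digit.
    private static final Pattern SAFE = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    /** Returns the lower-cased name, or throws if it is not a plain ASCII identifier. */
    static String normalize(String channel) {
        if (channel == null || !SAFE.matcher(channel).matches()) {
            throw new IllegalArgumentException("unsafe channel name: " + channel);
        }
        return channel.toLowerCase(Locale.ROOT);
    }

    /** Builds the LISTEN statement text for a validated channel name. */
    static String listenSql(String channel) {
        return "LISTEN " + normalize(channel);
    }

    public static void main(String[] args) {
        System.out.println(listenSql("Foo")); // prints "LISTEN foo"
        try {
            listenSql("foo; DROP TABLE users");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Lower-casing here only mirrors the case-insensitive behavior noted above; an application that needs multi-byte channel names would need a different rule.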
The payload is text only; binary data cannot be sent or received.
To send binary data, convert it to text with the `encode` function, or have the calling application serialize it to a JSON string or similar.
The payload must be shorter than 8000 bytes; if it is longer, the following error is returned.
ERROR: payload string too long
SQL state: 22023
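On the application side, one way to handle both points is to Base64-encode binary data and check the size before sending. This is a minimal sketch under that assumption (Base64 in the application instead of the SQL `encode` function); `PayloadCodec` and `MAX_PAYLOAD_BYTES` are hypothetical names introduced for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

class PayloadCodec {
    // The payload must be shorter than 8000 bytes, so 7999 is the largest accepted size.
    static final int MAX_PAYLOAD_BYTES = 7999;

    /** Encodes binary data as Base64 text and rejects results that exceed the limit. */
    static String encode(byte[] data) {
        String text = Base64.getEncoder().encodeToString(data);
        int size = text.getBytes(StandardCharsets.UTF_8).length;
        if (size > MAX_PAYLOAD_BYTES) {
            throw new IllegalArgumentException("payload too long: " + size + " bytes");
        }
        return text;
    }

    /** Restores the original bytes from a received payload. */
    static byte[] decode(String payload) {
        return Base64.getDecoder().decode(payload);
    }

    public static void main(String[] args) {
        byte[] original = {0x00, (byte) 0xFF, 0x10, 0x20};
        String payload = encode(original);
        System.out.println(payload);
        System.out.println(Arrays.equals(original, decode(payload))); // prints "true"
    }
}
```

Base64 grows the data by about one third, so roughly 5997 bytes of binary input is the most that fits under the limit with this approach.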
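Duplicate notifications are collapsed. If the same channel and payload are notified multiple times within one transaction, only one notification is delivered, and notifications are sent when the transaction commits.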
BEGIN;
NOTIFY foo, 'a';
NOTIFY foo, 'a';
NOTIFY foo, 'a';
NOTIFY foo, 'b';
NOTIFY foo, 'c';
COMMIT;
-- Asynchronous notification "foo" with payload "a" received from server process with PID 14728.
-- Asynchronous notification "foo" with payload "b" received from server process with PID 14728.
-- Asynchronous notification "foo" with payload "c" received from server process with PID 14728.
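The collapsing behavior shown above can be modeled in a few lines of Java. This is only an in-memory illustration of the observable behavior, not how PostgreSQL implements it; the class `TransactionNotifications` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class TransactionNotifications {
    // Insertion-ordered set: an identical (channel, payload) pair is stored only once.
    private final Set<Map.Entry<String, String>> pending = new LinkedHashSet<>();

    /** Queues a notification inside the simulated transaction. */
    void send(String channel, String payload) {
        pending.add(Map.entry(channel, payload));
    }

    /** Simulates COMMIT: returns the notifications to deliver and clears the buffer. */
    List<Map.Entry<String, String>> commit() {
        List<Map.Entry<String, String>> delivered = new ArrayList<>(pending);
        pending.clear();
        return delivered;
    }

    public static void main(String[] args) {
        TransactionNotifications tx = new TransactionNotifications();
        for (String payload : List.of("a", "a", "a", "b", "c")) {
            tx.send("foo", payload);
        }
        // Prints three lines: "foo a", "foo b", "foo c"
        for (Map.Entry<String, String> n : tx.commit()) {
            System.out.println(n.getKey() + " " + n.getValue());
        }
    }
}
```

A practical consequence is that a subscriber cannot count events by counting notifications; if every event matters, the payload needs something that makes it unique, such as an ID.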
Pending notifications are held in a queue, and its current usage can be checked with the `pg_notification_queue_usage` function (expressed as a fraction from 0 to 1).
The following introduces three patterns for implementing the Pub/Sub communication described so far in Java.
PostgreSQL JDBC Driver
This example uses the official PostgreSQL JDBC driver (the official project also provides an implementation example). When using Maven, add the following dependency.
pom.xml
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.2.8</version>
</dependency>
import java.sql.DriverManager;
import java.sql.SQLException;
import org.postgresql.jdbc.PgConnection;

// Create a connection for LISTEN in advance
private final PgConnection listenerConn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PgConnection.class);

/**
 * Starts receiving notifications.
 *
 * @param channel channel
 */
private void startListen(final String channel) {
    try (var stmt = this.listenerConn.createStatement()) {
        stmt.execute("LISTEN " + channel);
        while (true) {
            // Blocks for up to 10 seconds while waiting for notifications
            var notifications = this.listenerConn.getNotifications(10 * 1000);
            if (this.terminated) {
                return;
            }
            if (notifications == null) {
                continue;
            }
            for (var n : notifications) {
                LOG.info("Received Notification: pid={}, channel={}, payload={}", n.getPID(), n.getName(), n.getParameter());
            }
        }
    } catch (SQLException e) {
        LOG.error("exception thrown {}", e.getMessage());
    }
}

/**
 * Notifies the PostgreSQL server.
 *
 * @param channel channel
 * @param payload message payload
 */
private void notify(final String channel, final String payload) {
    try (var conn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PgConnection.class);
         var pstmt = conn.prepareStatement("select pg_notify(?, ?)")) {
        pstmt.setString(1, channel);
        pstmt.setString(2, payload);
        pstmt.execute();
        // Log while the connection is still open
        LOG.info("Notified: pid={}, channel={}, payload={}", conn.getBackendPID(), channel, payload);
    } catch (SQLException e) {
        LOG.error("exception thrown", e);
    }
}
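The shape of the receive loop in `startListen` can be tried without a database. In the sketch below, `BlockingQueue.poll` with a timeout stands in for the blocking `getNotifications(int timeoutMillis)` call; this substitution, the class `LongPollingSketch`, and its methods are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class LongPollingSketch {
    private volatile boolean terminated;

    /** Asks the loop to stop; it exits after the current wait or handler call. */
    void terminate() {
        terminated = true;
    }

    /** Waits up to timeoutMillis per iteration and hands each item to the handler. */
    void listen(BlockingQueue<String> source, long timeoutMillis, Consumer<String> handler) {
        try {
            while (!terminated) {
                String notification = source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (notification == null) {
                    continue; // timed out with nothing received, so wait again
                }
                handler.accept(notification);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and stop
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("hello", "world"));
        LongPollingSketch sketch = new LongPollingSketch();
        List<String> received = new ArrayList<>();
        sketch.listen(queue, 100, n -> {
            received.add(n);
            if (received.size() == 2) {
                sketch.terminate();
            }
        });
        System.out.println(received); // prints "[hello, world]"
    }
}
```

The timeout bounds how long the loop can go without re-checking the termination flag, which is why a finite wait is preferable to blocking indefinitely.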
Calling `PgConnection#getNotifications(int timeoutMillis)` blocks for up to the specified time until notifications arrive, so wrapping it in a loop gives long-polling logic.
When I passed parameters to a NOTIFY query, I got `org.postgresql.util.PSQLException` and could not find a workaround, so I execute `pg_notify` instead.
PGJDBC-NG
pom.xml
<dependency>
    <groupId>com.impossibl.pgjdbc-ng</groupId>
    <artifactId>pgjdbc-ng</artifactId>
    <version>0.8.3</version>
</dependency>
import java.sql.DriverManager;
import java.sql.SQLException;
import com.impossibl.postgres.api.jdbc.PGConnection;
import com.impossibl.postgres.api.jdbc.PGNotificationListener;

// Create a connection for LISTEN in advance
private final PGConnection listenerConn = DriverManager.getConnection(URL, USERNAME, PASSWORD).unwrap(PGConnection.class);

/**
 * Starts receiving notifications.
 *
 * @param channel channel
 */
private void startListen(final String channel) {
    try {
        // Register the callback before issuing LISTEN
        this.listenerConn.addNotificationListener(new PGNotificationListener() {
            @Override
            public void notification(final int pid, final String channel, final String payload) {
                LOG.info("Received Notification: {}, {}, {}", pid, channel, payload);
            }
        });
        try (var stmt = this.listenerConn.createStatement()) {
            stmt.execute("LISTEN " + channel);
        }
    } catch (SQLException e) {
        LOG.error("exception thrown {}", e.getMessage());
    }
}

// notify() is the same as with the PostgreSQL JDBC driver
As you can see, here you can implement the behavior when receiving a notification in the form of an event listener. It is also possible to register a listener by specifying a channel.
R2DBC
R2DBC is a newly developed database connectivity API, an alternative to JDBC designed from the perspective of reactive programming. It is fully compliant with Reactive Streams and claims completely non-blocking I/O.
pom.xml
<dependency>
    <groupId>io.r2dbc</groupId>
    <artifactId>r2dbc-postgresql</artifactId>
    <version>0.8.0.RELEASE</version>
</dependency>
// Set up connections for sending and receiving in advance
private Mono<PostgresqlConnection> receiver;
private Mono<PostgresqlConnection> sender;

var connFactory = new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
        .host("...")
        .port(5432)
        .username("...")
        .password("...")
        .database("...")
        .build());
this.receiver = connFactory.create();
this.sender = connFactory.create();

/**
 * Starts receiving notifications.
 *
 * @param channel channel
 */
private void startListen(final String channel) {
    // flatMapMany flattens the inner flow, so a single subscribe() drives everything
    this.receiver.flatMapMany(pgconn -> pgconn.createStatement("LISTEN " + channel)
            .execute()
            .flatMap(PostgresqlResult::getRowsUpdated)
            .thenMany(pgconn.getNotifications())
            .doOnNext(notification -> LOG.info("Received Notification: {}, {}, {}", notification.getProcessId(), notification.getName(), notification.getParameter()))
            .doOnSubscribe(s -> LOG.info("listen start")))
        .subscribe();
}

/**
 * Notifies the PostgreSQL server.
 *
 * @param channel channel
 * @param payload message payload
 */
private void notify(final String channel, final String payload) {
    // Note: the payload is concatenated into the SQL text, so a single quote in it breaks the statement
    this.sender.flatMap(pgconn -> pgconn.createStatement("NOTIFY " + channel + ", '" + payload + "'")
            .execute()
            .flatMap(PostgresqlResult::getRowsUpdated)
            .then()
            .doOnSubscribe(s -> LOG.info("Notified: channel={}, payload={}", channel, payload)))
        .subscribe();
}
When using R2DBC, you work entirely with the API of Project Reactor, which it depends on.
I will only explain briefly here: you build up a flow that executes a query, handles the result, and sets side-effect actions that run at specified moments, and finally you call `subscribe()` to set that flow in motion.
`doOnNext()` sets the action for when a notification arrives, and `doOnSubscribe()` sets the action for when the flow is subscribed (that is, when the query is executed); here both simply write a log.
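The point that nothing runs until the flow is triggered can be seen in plain Java as well. The following is a rough analogy only: it uses Java's Stream API, not Project Reactor, with `peek()` standing in for a side-effect hook such as `doOnNext()` and the terminal operation standing in for `subscribe()`. The class name `LazyPipelineSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class LazyPipelineSketch {
    /** Builds a pipeline, records when it was assembled, then runs it. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // Assembling the pipeline executes nothing yet; peek() only registers a side effect.
        Stream<String> pipeline = Stream.of("a", "b")
                .peek(p -> log.add("received " + p));
        log.add("assembled");
        // The terminal operation sets the pipeline in motion.
        pipeline.forEach(p -> { });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "[assembled, received a, received b]"
    }
}
```

The analogy stops at laziness: a Stream is pulled synchronously by the caller, whereas a Reactor flow pushes items asynchronously as they arrive.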
At first glance, writing asynchronous stream processing in a style similar to Java's Stream API felt awkward to me, but this page was a great learning experience.
In release 9.0, PostgreSQL's NOTIFY/LISTEN moved the storage of pending events from a system table to an in-memory queue and gained the ability to send a payload with a notification. This improved performance and convenience and gave the feature its current form. The feature itself seems to have existed for a long time, but since nothing had been posted about it on Qiita, I wrote this article partly to organize what we learned from in-house technical verification.
By the way, R2DBC is still under active development and I have been following it since this year. It gained support for NOTIFY/LISTEN in September this year, so I covered it briefly in this article. I will write an article featuring R2DBC soon.