The following example demonstrates controlling logical decoding using the SQL interface.
Before you can use logical decoding, you must set
wal_level to logical and
max_replication_slots to at least 1. Then, you
should connect to the target database (in the example
below, postgres) as a superuser.
lightdb@postgres=# -- Create a slot named 'regression_slot' using the output plugin 'wal2sql'
lightdb@postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'wal2sql');
slot_name | lsn
-----------------+------------
regression_slot | 0/23F5E930
(1 row)
lightdb@postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
slot_name | plugin | slot_type | database | active | restart_lsn | confirmed_flush_lsn
-----------------+---------+-----------+----------+--------+-------------+---------------------
regression_slot | wal2sql | logical | postgres | f | 0/23F5E8F8 | 0/23F5E930
(1 row)
lightdb@postgres=# -- There are no changes to see yet
lightdb@postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----+-----+------
(0 rows)
lightdb@postgres=# CREATE TABLE data(id serial primary key, data text);
CREATE TABLE
lightdb@postgres=# -- DDL isn't replicated, so all you'll see is an empty JSON change list
lightdb@postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
------------+------+----------
0/23F914F8 | 2048 | {"C":[]}
(1 row)
lightdb@postgres=# -- Once changes are read, they're consumed and not emitted
lightdb@postgres=# -- in a subsequent call:
lightdb@postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----+-----+------
(0 rows)
lightdb@postgres=# BEGIN;
lightdb@postgres=*# INSERT INTO data(data) VALUES('1');
lightdb@postgres=*# INSERT INTO data(data) VALUES('2');
lightdb@postgres=*# COMMIT;
lightdb@postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
------------+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
0/23FC83D8 | 2054 | {"C":[{"K":"I","S":" ","T":"data","CN":["id","data"],"columntypes":["integer","text"],"CV":[1,"1"],"pk":{"pkv":[1]}},{"K":"I","S":" ","T":"data","CN":["id","data"],"columntypes":["integer","text"],"CV":[2,"2"],"pk":{"pkv":[2]}}]}
(1 row)
lightdb@postgres=# INSERT INTO data(data) VALUES('3');
lightdb@postgres=# -- You can also peek ahead in the change stream without consuming changes
lightdb@postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
lsn | xid | data
------------+------+--------------------------------------------------------------------------------------------------------------------------
0/23FC8928 | 2055 | {"C":[{"K":"I","S":" ","T":"data","CN":["id","data"],"columntypes":["integer","text"],"CV":[3,"3"],"pk":{"pkv":[3]}}]}
(1 row)
lightdb@postgres=# -- The next call to pg_logical_slot_peek_changes() returns the same changes again
lightdb@postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL);
lsn | xid | data
------------+------+--------------------------------------------------------------------------------------------------------------------------
0/23FC8928 | 2055 | {"C":[{"K":"I","S":" ","T":"data","CN":["id","data"],"columntypes":["integer","text"],"CV":[3,"3"],"pk":{"pkv":[3]}}]}
(1 row)
lightdb@postgres=# -- Options can be passed to the output plugin to influence the formatting
lightdb@postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-timestamp', 'on');
lsn | xid | data
------------+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------
0/23FC8928 | 2055 | {"timestamp":"2024-07-05 17:31:08.395304+08","C":[{"K":"I","S":" ","T":"data","CN":["id","data"],"columntypes":["integer","text"],"CV":[3,"3"],"pk":{"pkv":[3]}}]}
(1 row)
lightdb@postgres=# -- Remember to destroy a slot you no longer need to stop it consuming
lightdb@postgres=# -- server resources:
lightdb@postgres=# SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
-----------------------
(1 row)
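The difference between pg_logical_slot_get_changes() and pg_logical_slot_peek_changes() shown in the session above can be pictured with a small in-memory model. This is only an illustrative sketch, not how the server implements slots: the ToySlot class, its fields, and the change strings are all hypothetical, and the only behavior modelled is the one visible in the example, namely that peeking leaves changes pending while getting consumes them.

```python
from dataclasses import dataclass, field


@dataclass
class ToySlot:
    """Hypothetical in-memory stand-in for a logical replication slot."""
    changes: list = field(default_factory=list)  # decoded changes, oldest first
    confirmed: int = 0  # index of the first change not yet consumed

    def append(self, change):
        # A committed transaction makes one more change visible to the slot.
        self.changes.append(change)

    def peek_changes(self):
        # Return pending changes without moving the consumed position.
        return self.changes[self.confirmed:]

    def get_changes(self):
        # Return pending changes and mark them as consumed.
        pending = self.changes[self.confirmed:]
        self.confirmed = len(self.changes)
        return pending


slot = ToySlot()
slot.append("insert id=3")

first_peek = slot.peek_changes()   # change is returned
second_peek = slot.peek_changes()  # same change is returned again
consumed = slot.get_changes()      # change is returned and consumed
after_get = slot.get_changes()     # nothing left to return

print(first_peek, second_peek, consumed, after_get)
```

In this model the position only moves forward in get_changes(), which mirrors why the second get call in the session returns zero rows while repeated peek calls keep returning the same row.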
The following example shows how logical decoding is controlled over the
streaming replication protocol, using the
program lt_recvlogical included in the LightDB
distribution. This requires that client authentication is set up to allow
replication connections
(see Section 25.2.5.1) and
that max_wal_senders is set sufficiently high to allow
an additional connection.
$ lt_recvlogical -d postgres --slot=test --create-slot
$ lt_recvlogical -d postgres --slot=test --start -f -
Control+Z
$ ltsql -d postgres -c "INSERT INTO data(data) VALUES('4');"
$ fg
{"C":[{"K":"I","S":" ","T":"data","CN":["id","data"],"columntypes":["integer","text"],"CV":[4,"4"],"pk":{"pkv":[4]}}]}
Control+C
$ lt_recvlogical -d postgres --slot=test --drop-slot
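A client that reads this stream typically needs to turn each JSON line into something easier to work with. The following is a minimal sketch of that step using only the Python standard library. The summarize_changes function is a hypothetical helper, and the interpretation of the keys is an assumption inferred from the sample output above ("C" as the list of changes, "K" as the change kind, "T" as the table name, "CN" and "CV" as parallel lists of column names and values); consult the output plugin's documentation for the authoritative format.

```python
import json


def summarize_changes(payload: str) -> list:
    """Turn one JSON payload into (kind, table, row) tuples.

    Key meanings are assumed from the sample output, not from a
    documented specification.
    """
    doc = json.loads(payload)
    summary = []
    for change in doc.get("C", []):
        # Pair each column name with its value; tolerate missing lists.
        row = dict(zip(change.get("CN", []), change.get("CV", [])))
        summary.append((change["K"], change["T"], row))
    return summary


# The line printed by the streaming example above.
line = (
    '{"C":[{"K":"I","S":" ","T":"data","CN":["id","data"],'
    '"columntypes":["integer","text"],"CV":[4,"4"],"pk":{"pkv":[4]}}]}'
)

rows = summarize_changes(line)
empty = summarize_changes('{"C":[]}')  # payload seen after DDL in the SQL example

print(rows)
print(empty)
```

Reading one line at a time from the tool's standard output and passing it to a function like this keeps the parsing independent of how the stream is transported.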