Subscribing to PostgreSql logical replication using python and psycopg2

When postgresql is used as a transactional database, there are use cases where the data changes from the transactional database are captured and send to other databases like your datamart or datawarehouse. You could use cloud services like Aws Dms or replication software like Debazium to do this. In this blog post I will show you how to use python to read changes (cdc, change data capture) from a postgresql database using the wal2json output plugin and psycopg2.

When you are compiling postgresql from source code, you can enable the output plugins test_decoder or wal2json as shown below.

            cd /home/postgres/tmp/postgresql-14.4/contrib/test_decoding
            make PG_CONFIG=/u01/pg/14/bin/pg_config
            make PG_CONFIG=/u01/pg/14/bin/pg_config install
            cd /home/postgres/tmp/postgresql-14.4/
            wget https://github.com/eulerto/wal2json/archive/refs/tags/wal2json_2_4.tar.gz
            tar -xzvf wal2json_2_4.tar.gz
            cd wal2json-wal2json_2_4/ 
            make PG_CONFIG=/u01/pg/14/bin/pg_config
            make PG_CONFIG=/u01/pg/14/bin/pg_config install

where /home/postgres/tmp/postgresql-14.4 is the directory where you untarred your postgres source code into, before compiling and installing postgres to /u01/pg/14

In order to proceed you need to have installed the library psycopg2 with python3 (Eg: pip install psycopg2)

You also need to make sure that the parameter wal_level is set to ‘logical’ in the postgresql.conf file of your postgres database.

create a table named books in your postgresql database.

create table books (bookid bigint primary key,bookname varchar(100));

Insert a few rows into the table.

insert into books values (1,'First Book');
insert into books values (2,'Second Book');
insert into books values (3,'Third Book');

The python library psycopg2 has a module named extras which provides helpers to read from postgres logical replication publishers. We will be using the functions from this module, namely create_replication_slot , start_replication and consume_stream to create the publisher and subscriber for logical replication.

Here is the code sample for pglogical.py

from __future__ import print_function
import sys
import psycopg2
import psycopg2.extras

conn = psycopg2.connect(
    'host=localhost user=postgres port=5432',
    connection_factory=psycopg2.extras.LogicalReplicationConnection)
cur = conn.cursor()
replication_options = {
'include-xids':'1',
'include-timestamp':'1',
'pretty-print':'1'
}
try:
    cur.start_replication(
        slot_name='pytest', decode=True,
        options=replication_options)
except psycopg2.ProgrammingError:
    cur.create_replication_slot('pytest', output_plugin='wal2json')
    cur.start_replication(
        slot_name='pytest', decode=True,
        options=replication_options)


class DemoConsumer(object):
    def __call__(self, msg):
        print(msg.payload)
        msg.cursor.send_feedback(flush_lsn=msg.data_start)

democonsumer = DemoConsumer()

print("Starting streaming, press Control-C to end...", file=sys.stderr)
try:
   cur.consume_stream(democonsumer)
except KeyboardInterrupt:
   cur.close()
   conn.close()
   print("The slot 'pytest' still exists. Drop it with "
         "SELECT pg_drop_replication_slot('pytest'); if no longer needed.",
         file=sys.stderr)
   print("WARNING: Transaction logs will accumulate in pg_xlog "
         "until the slot is dropped.", file=sys.stderr)

The code above is a modified version of the code, published by Marco Nenciarini here.

The code above uses the wal2json output plugin, you can change it to use the test_decoding output plugin, but you will also have to change the replication_options variable to those supported by test_decoding.

You can run the following command to create the publication and the subscriber.

python3 pglogical.py

You will see a prompt saying “Starting streaming, press Control-C to end…”

Let us now make a few changes to the books table.

insert into books values (5,'Fifth Book');

do $$
<<first_block>>
begin
    insert into books values (6,'Sixth Book');
    delete from books where bookid = 3;
end first_block $$;

If you now go back to the screen where you ran your python program, you can see the following messages on screen.

{
        "xid": 741,
        "timestamp": "2022-07-11 21:29:37.301299+00",
        "change": [
                {
                        "kind": "insert",
                        "schema": "public",
                        "table": "books",
                        "columnnames": ["bookid", "bookname"],
                        "columntypes": ["bigint", "character varying(100)"],
                        "columnvalues": [5, "Fifth Book"]
                }
        ]
}
{
        "xid": 742,
        "timestamp": "2022-07-11 21:33:07.913942+00",
        "change": [
                {
                        "kind": "insert",
                        "schema": "public",
                        "table": "books",
                        "columnnames": ["bookid", "bookname"],
                        "columntypes": ["bigint", "character varying(100)"],
                        "columnvalues": [6, "Sixth Book"]
                }
                ,{
                        "kind": "delete",
                        "schema": "public",
                        "table": "books",
                        "oldkeys": {
                                "keynames": ["bookid"],
                                "keytypes": ["bigint"],
                                "keyvalues": [3]
                        }
                }
        ]
}

These are the changes the python subscriber program is reading from postgres the logical replication publisher.

You can then write these changes either to a csv file  or to another database as you choose.

Installing the pg_partman extension

pg_partman is an extension that simplifies the process of partition management in postgres.

Below are the steps that I followed to install the pg_partman extension with postgres 12.8.

Change your current working directory to the directory where you unzipped the postgres 12.8 source code.

cd <SomeStaticPath>/postgresql-12.8/contrib

git clone https://github.com/pgpartman/pg_partman.git

cd pg_partman

make PG_CONFIG=<pghome>/bin/pg_config NO_BGW=1

make install

Then Edit your postgresql.conf file and add pg_partman_bgw to the parameter shared_preload_libraries

Now restart your postgres instance

At this time you are ready to create the extension from postgres and use it.

Use psql to login to your database

CREATE SCHEMA partman;

CREATE EXTENSION pg_partman SCHEMA partman;

\dx (To list the extensions and the version installed in your database)

Loading IMDB data into postgresql

IMDB (Internet Movie Database) makes the movie dataset available for free download at https://datasets.imdbws.com/. The documentation for this dataset can be found at https://www.imdb.com/interfaces/.

In this blog post, I show you how to load this data into a PostgreSql database. The steps are executed from an Ubuntu Linux workstation. I assume that you already have a postgresql database with about 50Gb of free space to upload this data into and you know the connection information.

We’ll use the s32imdbpy.py script that can be downloaded from github .

From the ubuntu workstation, install the following packages

sudo apt install python3-pip
sudo apt-get install libpq-dev

Now install the following python modules

pip3 install Psycopg2
pip3 install imdbpy

Create a new directory, to store the downloaded datafiles and download the datafiles from https://datasets.imdbws.com into this directory

mkdir dat
cd dat
wget https://datasets.imdbws.com/name.basics.tsv.gz
wget https://datasets.imdbws.com/title.akas.tsv.gz
wget https://datasets.imdbws.com/title.basics.tsv.gz
wget https://datasets.imdbws.com/title.crew.tsv.gz
wget https://datasets.imdbws.com/title.episode.tsv.gz
wget https://datasets.imdbws.com/title.principals.tsv.gz
wget https://datasets.imdbws.com/title.ratings.tsv.gz

Login to your postgresql database and create a new schema to hold the imdb tables (This step is optional. If you do not create this schema, then the tables and the corresponding data gets loaded into the public schema).

create schema imdb;

Before you run the script, lets edit the script and make one change, which will enable the script to load the data into the newly created imdb schema. This change will be made in line 183 in the file.

Change
engine = sqlalchemy.create_engine(db_uri, encoding='utf-8', echo=False)
To
engine = sqlalchemy.create_engine(db_uri, encoding='utf-8', echo=False,connect_args={'options': '-csearch_path={}'.format('imdb')})

Then Execute the script as shown below

python3 s32imdbpy.py /home/ubuntu/dat postgresql://username:password@dbhostname/dbname

Where /home/ubuntu/dat is the directory where the imdb files are downloaded into.

This will take some time to load (Close to an hour on a reasonably sized ubuntu workstation) and will consume about 15Gb of space in your postgresql database.

Substitution variables in psql scripts

As postgresql users and administrators, we tend to create lot of scripts, and run them routinely, for common queries we need to execute against the database. It is likely that in some of those scripts, you would want to parameterize values used in filter conditions. Below is an example of how this can be done.

psql is the terminal based front end tool to interact with postgresql. You can run scripts stored in files in the file system, using the \i directive in plsql. You can use the \set command to set variables in psql. If you want to prompt for the value to be entered by the user, you can use the \prompt directive.

\prompt 'Enter Table Name : ' tabname


select last_vacuum,last_autovacuum,last_analyze ,last_autoanalyze,n_live_tup,n_dead_tup
from pg_stat_user_tables
where relname = :'tabname';

Now if you run this from psql you will be first prompted for the table name and then it will display the results for the table name you entered.

postgres=# \i pgstats.sql
Enter Table Name : nflstats

 last_vacuum | last_autovacuum |         last_analyze         |       last_autoanalyze        | n_live_tup | n_dead_tup 
-------------+-----------------+------------------------------+-------------------------------+------------+------------
             |                 | 2020-06-08 18:07:36.29538+00 | 2020-06-08 15:37:48.738104+00 |     270418 |          0