Subscribing to PostgreSql logical replication using python and psycopg2

When postgresql is used as a transactional database, there are use cases where the data changes from the transactional database are captured and send to other databases like your datamart or datawarehouse. You could use cloud services like Aws Dms or replication software like Debazium to do this. In this blog post I will show you how to use python to read changes (cdc, change data capture) from a postgresql database using the wal2json output plugin and psycopg2.

When you are compiling postgresql from source code, you can enable the output plugins test_decoder or wal2json as shown below.

            cd /home/postgres/tmp/postgresql-14.4/contrib/test_decoding
            make PG_CONFIG=/u01/pg/14/bin/pg_config
            make PG_CONFIG=/u01/pg/14/bin/pg_config install
            cd /home/postgres/tmp/postgresql-14.4/
            wget https://github.com/eulerto/wal2json/archive/refs/tags/wal2json_2_4.tar.gz
            tar -xzvf wal2json_2_4.tar.gz
            cd wal2json-wal2json_2_4/ 
            make PG_CONFIG=/u01/pg/14/bin/pg_config
            make PG_CONFIG=/u01/pg/14/bin/pg_config install

where /home/postgres/tmp/postgresql-14.4 is the directory where you untarred your postgres source code into, before compiling and installing postgres to /u01/pg/14

In order to proceed you need to have installed the library psycopg2 with python3 (Eg: pip install psycopg2)

You also need to make sure that the parameter wal_level is set to ‘logical’ in the postgresql.conf file of your postgres database.

create a table named books in your postgresql database.

create table books (bookid bigint primary key,bookname varchar(100));

Insert a few rows into the table.

insert into books values (1,'First Book');
insert into books values (2,'Second Book');
insert into books values (3,'Third Book');

The python library psycopg2 has a module named extras which provides helpers to read from postgres logical replication publishers. We will be using the functions from this module, namely create_replication_slot , start_replication and consume_stream to create the publisher and subscriber for logical replication.

Here is the code sample for pglogical.py

from __future__ import print_function
import sys
import psycopg2
import psycopg2.extras

conn = psycopg2.connect(
    'host=localhost user=postgres port=5432',
    connection_factory=psycopg2.extras.LogicalReplicationConnection)
cur = conn.cursor()
replication_options = {
'include-xids':'1',
'include-timestamp':'1',
'pretty-print':'1'
}
try:
    cur.start_replication(
        slot_name='pytest', decode=True,
        options=replication_options)
except psycopg2.ProgrammingError:
    cur.create_replication_slot('pytest', output_plugin='wal2json')
    cur.start_replication(
        slot_name='pytest', decode=True,
        options=replication_options)


class DemoConsumer(object):
    def __call__(self, msg):
        print(msg.payload)
        msg.cursor.send_feedback(flush_lsn=msg.data_start)

democonsumer = DemoConsumer()

print("Starting streaming, press Control-C to end...", file=sys.stderr)
try:
   cur.consume_stream(democonsumer)
except KeyboardInterrupt:
   cur.close()
   conn.close()
   print("The slot 'pytest' still exists. Drop it with "
         "SELECT pg_drop_replication_slot('pytest'); if no longer needed.",
         file=sys.stderr)
   print("WARNING: Transaction logs will accumulate in pg_xlog "
         "until the slot is dropped.", file=sys.stderr)

The code above is a modified version of the code, published by Marco Nenciarini here.

The code above uses the wal2json output plugin, you can change it to use the test_decoding output plugin, but you will also have to change the replication_options variable to those supported by test_decoding.

You can run the following command to create the publication and the subscriber.

python3 pglogical.py

You will see a prompt saying “Starting streaming, press Control-C to end…”

Let us now make a few changes to the books table.

insert into books values (5,'Fifth Book');

do $$
<<first_block>>
begin
    insert into books values (6,'Sixth Book');
    delete from books where bookid = 3;
end first_block $$;

If you now go back to the screen where you ran your python program, you can see the following messages on screen.

{
        "xid": 741,
        "timestamp": "2022-07-11 21:29:37.301299+00",
        "change": [
                {
                        "kind": "insert",
                        "schema": "public",
                        "table": "books",
                        "columnnames": ["bookid", "bookname"],
                        "columntypes": ["bigint", "character varying(100)"],
                        "columnvalues": [5, "Fifth Book"]
                }
        ]
}
{
        "xid": 742,
        "timestamp": "2022-07-11 21:33:07.913942+00",
        "change": [
                {
                        "kind": "insert",
                        "schema": "public",
                        "table": "books",
                        "columnnames": ["bookid", "bookname"],
                        "columntypes": ["bigint", "character varying(100)"],
                        "columnvalues": [6, "Sixth Book"]
                }
                ,{
                        "kind": "delete",
                        "schema": "public",
                        "table": "books",
                        "oldkeys": {
                                "keynames": ["bookid"],
                                "keytypes": ["bigint"],
                                "keyvalues": [3]
                        }
                }
        ]
}

These are the changes the python subscriber program is reading from postgres the logical replication publisher.

You can then write these changes either to a csv file  or to another database as you choose.

Graph CPU usage on exadata using oswatcher files

On the oracle database machine, oswatcher is installed during setup time, both on the database nodes and the exadata cells. This utility collects linux operating system level statistics, which comes in very handy when troubleshooting operating system level issues. The data is collected in text files. There is a Java based utility (OSWG) provided by oracle support to graph the contents of these files, however that utility does not work on the oswatcher files generated on exadata.

Here is a python script that can graph the cpu used from the mpstat information that oswatcher captures. It has been tested on new oswatcher files on an x3-2. You need to first install a python environment that has the “numpy” and “matplotlib” modules installed.

Install a Python Virtualenv.

If you create multiple applications using Python and end up using different versions, it is easier to maintain different virtualenv’s. You can create a python virtualenv as shown below (On ubuntu linux).

curl -O https://pypi.python.org/packages/source/v/virtualenv/virtualenv-1.9.1.tar.gz
tar -xzvf virtualenv-1.9.1.tar.gz
cd virtualenv-1.9.1
python virtualenv.py ../p273env2
. p273env2/bin/activate
pip install numpy
sudo apt-get install libfreetype6-dev
pip install matplotlib

Now that you have a python environment, with your required libraries, you can go ahead and execute the script as shown below.

The oswatcher files in /opt/oracle/oswatcher are .bz2 files and there will be one file per hour per day. Copy the mpstat .bz2 files into a directory and use bunzip2 to unzip them. In this example let us say that the directory name is /u01/oswatcher/mpstat/tmp

You can now run the script as shown below

python parseoswmp.py  /u01/oswatcher/mpstat/tmp
or
python parseoswmp.py  /u01/oswatcher/mpstat/tmp '06/14/2013 05:00:00 AM' '06/14/2013 07:00:00 AM'

The first command will graph the cpu usage for the entire time range in all those files and the second command graphs the cpu information for the date and time range you have specified.

It creates a file in the current directory, named oswmpstat.png, which has the graph.

You can find the full script here.

You can find a sample output graph here.

Using Python 3

I have been writing some python scripts for awr analysis and trending. Since python 2.7 is no longer being enhanced, i have now switched to using python 3. Lot of python applications and frameworks still does not support python 3 (Notably the Django framework). Good news is that cx_oracle works with python 3.

The steps to install cx_oracle with python 3 are very similar to the steps that i had outlined in my previous post on installing cx_oracle with python 2.7.

The difference is that

– You have to first install python3 and python3-dev (On ubuntu, you can just use the ubuntu software center to do this)

– Then download the cx_oracle 5.1.1 source code only tar ball from http://cx-oracle.sourceforge.net/

– login as root, untar the tar file, cd to the cx_Oracle-5.1.1 directory

– Then run /usr/bin/python3 setup.py install

That does it and now oracle connectivity is in place.

I’ve also been using the matplotlib library along with Python to plot graphs with the awr and oswatcher data files. matplotlib also works with python 3.

– You have to first install libpng, libpng-dev, libfreetype6, libfreetype6-dev (Use the ubuntu software center)

– Download the numpy source code tar ball.

– Extract the tar file, login as root, cd to the directory and run /usr/bin/python3 setup.py install

– Installing matplotlib Ref :

– Download the matplotlib source code tar file

– Login as root, cd to the directory

– /usr/bin/python3 setup.py build

– /usr/bin/python3 setup.py install

Now you should have matplotlib working with python3

Enjoy your python scripting


Installing cx-oracle with 11.2.0.2 on ubuntu

cx_Oracle is a Python extension module that allows access to Oracle databases and conforms to the Python database API specification. Below are the steps i used to setup cx_Oracle, with 11.2.0.2, python 2.6 on Ubuntu 10.04 (Lucid Lynx).

1) Installed Oracle 11.2.0.2 Enterprise Edition on Ubuntu (You can also configure this by just installing the 11.2.0.2 instant client too)

2) Download cx_Oracle Source code from http://cx-oracle.sourceforge.net/

3) Install python-dev on ubuntu (Or else you will get compile errors (Like file Python.h not found) when you try to install cx-oracle)

– sudo apt-get install python-dev

4) Login as root

5) export ORACLE_HOME=/u01/11gr2/db_1;export LD_LIBRARY_PATH=$ORACLE_HOME/lib:$LD_LIBRARY_PATH

6) cd <dir-where-cx-oracle-untarred>/cx_Oracle-5.0.4

7) python setup.py install

Once it is installed you can run the sample program from http://wiki.oracle.com/page/Python to make sure it works.