Installing and using the postgres docker image

Using the postgres docker image, is the quickest way to get started with any of your postgres development environments, or if you need to test any features of the postgres database. Official postgres database images are available for download and use from docker hub. You can get images of postgres 11,12,13,14 and 15 from docker hub. In this blog post we will see how to get started with downloading and running this image on Ubuntu Linux.

The steps outlined below assume that you have one of the newer versions of ubuntu installed on a host and that you are using the linux user named “ubuntu” to run docker.

As root execute the following commands on the ubuntu host to download docker. These steps also setup the ubuntu user to run docker containers.

            mkdir -m 0755 -p /etc/apt/keyrings
            curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg
            echo \
            "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu \
            $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
            apt-get update -y
            apt-get install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin -y
            systemctl disable --now docker.service docker.socket
            su - ubuntu -c "dockerd-rootless-setuptool.sh install"

Now you are ready to download and run the postgres container.

Login as the “ubuntu” user and Create a text file named pg.yaml file, that has the following instructions. The following instructions assume that you have a directory /u01/pg/data which you want to use to store your postgres datafiles.

services:
  db:
    image: postgres:latest
    restart: always
    environment:
      POSTGRES_PASSWORD: MyPassword
    ports:
      - "5432:5432"
    volumes:
      - /u01/pg/data:/var/lib/postgresql/data

Now we can run docker to download and run this image using the following command

docker compose -f /home/ubuntu/tmp/pg.yaml up -d db

This command downloads and installs the latest postgres image (Which at the time of writing is 15.2) You can now connect to the postgres database from the ubuntu host as shown below.

psql -h localhost -p 5432 -d postgres -U postgres 

You can login as root to your postgres container using the command below

docker exec -it cc18a27f8850 bash

where cc18a27f8850 is the container id (You can get your container id by running the command, docker container ls -a.

Once you are logged in as root to the postgres container, you can su – postgres and then run commands to stop and start postgres. pg_ctl can be found in /usr/lib/postgresql/15/bin.

Subscribing to PostgreSql logical replication using python and psycopg2

When postgresql is used as a transactional database, there are use cases where the data changes from the transactional database are captured and send to other databases like your datamart or datawarehouse. You could use cloud services like Aws Dms or replication software like Debazium to do this. In this blog post I will show you how to use python to read changes (cdc, change data capture) from a postgresql database using the wal2json output plugin and psycopg2.

When you are compiling postgresql from source code, you can enable the output plugins test_decoder or wal2json as shown below.

            cd /home/postgres/tmp/postgresql-14.4/contrib/test_decoding
            make PG_CONFIG=/u01/pg/14/bin/pg_config
            make PG_CONFIG=/u01/pg/14/bin/pg_config install
            cd /home/postgres/tmp/postgresql-14.4/
            wget https://github.com/eulerto/wal2json/archive/refs/tags/wal2json_2_4.tar.gz
            tar -xzvf wal2json_2_4.tar.gz
            cd wal2json-wal2json_2_4/ 
            make PG_CONFIG=/u01/pg/14/bin/pg_config
            make PG_CONFIG=/u01/pg/14/bin/pg_config install

where /home/postgres/tmp/postgresql-14.4 is the directory where you untarred your postgres source code into, before compiling and installing postgres to /u01/pg/14

In order to proceed you need to have installed the library psycopg2 with python3 (Eg: pip install psycopg2)

You also need to make sure that the parameter wal_level is set to ‘logical’ in the postgresql.conf file of your postgres database.

create a table named books in your postgresql database.

create table books (bookid bigint primary key,bookname varchar(100));

Insert a few rows into the table.

insert into books values (1,'First Book');
insert into books values (2,'Second Book');
insert into books values (3,'Third Book');

The python library psycopg2 has a module named extras which provides helpers to read from postgres logical replication publishers. We will be using the functions from this module, namely create_replication_slot , start_replication and consume_stream to create the publisher and subscriber for logical replication.

Here is the code sample for pglogical.py

from __future__ import print_function
import sys
import psycopg2
import psycopg2.extras

conn = psycopg2.connect(
    'host=localhost user=postgres port=5432',
    connection_factory=psycopg2.extras.LogicalReplicationConnection)
cur = conn.cursor()
replication_options = {
'include-xids':'1',
'include-timestamp':'1',
'pretty-print':'1'
}
try:
    cur.start_replication(
        slot_name='pytest', decode=True,
        options=replication_options)
except psycopg2.ProgrammingError:
    cur.create_replication_slot('pytest', output_plugin='wal2json')
    cur.start_replication(
        slot_name='pytest', decode=True,
        options=replication_options)


class DemoConsumer(object):
    def __call__(self, msg):
        print(msg.payload)
        msg.cursor.send_feedback(flush_lsn=msg.data_start)

democonsumer = DemoConsumer()

print("Starting streaming, press Control-C to end...", file=sys.stderr)
try:
   cur.consume_stream(democonsumer)
except KeyboardInterrupt:
   cur.close()
   conn.close()
   print("The slot 'pytest' still exists. Drop it with "
         "SELECT pg_drop_replication_slot('pytest'); if no longer needed.",
         file=sys.stderr)
   print("WARNING: Transaction logs will accumulate in pg_xlog "
         "until the slot is dropped.", file=sys.stderr)

The code above is a modified version of the code, published by Marco Nenciarini here.

The code above uses the wal2json output plugin, you can change it to use the test_decoding output plugin, but you will also have to change the replication_options variable to those supported by test_decoding.

You can run the following command to create the publication and the subscriber.

python3 pglogical.py

You will see a prompt saying “Starting streaming, press Control-C to end…”

Let us now make a few changes to the books table.

insert into books values (5,'Fifth Book');

do $$
<<first_block>>
begin
    insert into books values (6,'Sixth Book');
    delete from books where bookid = 3;
end first_block $$;

If you now go back to the screen where you ran your python program, you can see the following messages on screen.

{
        "xid": 741,
        "timestamp": "2022-07-11 21:29:37.301299+00",
        "change": [
                {
                        "kind": "insert",
                        "schema": "public",
                        "table": "books",
                        "columnnames": ["bookid", "bookname"],
                        "columntypes": ["bigint", "character varying(100)"],
                        "columnvalues": [5, "Fifth Book"]
                }
        ]
}
{
        "xid": 742,
        "timestamp": "2022-07-11 21:33:07.913942+00",
        "change": [
                {
                        "kind": "insert",
                        "schema": "public",
                        "table": "books",
                        "columnnames": ["bookid", "bookname"],
                        "columntypes": ["bigint", "character varying(100)"],
                        "columnvalues": [6, "Sixth Book"]
                }
                ,{
                        "kind": "delete",
                        "schema": "public",
                        "table": "books",
                        "oldkeys": {
                                "keynames": ["bookid"],
                                "keytypes": ["bigint"],
                                "keyvalues": [3]
                        }
                }
        ]
}

These are the changes the python subscriber program is reading from postgres the logical replication publisher.

You can then write these changes either to a csv file  or to another database as you choose.