#!/bin/bash

# TCP ports of the two local instances: the publisher runs from data_N1 and
# the subscriber from data_N2.
port_publisher='5431'
port_subscriber='5432'

# PostgreSQL binary directory.
# NOTE(review): nothing in the visible script reads this variable; initdb,
# pg_ctl and psql are all resolved through PATH -- confirm whether it is needed.
bindir='/usr/local/pgsql/bin/'

echo '##########'
echo '#Clean up#'
echo '##########'

# Stop instances left over from a previous run, subscriber (N2) first and
# publisher (N1) second. "pg_ctl status" exits 0 only when a server is running
# in the given data directory, so a first run (no directories yet) or an
# already-stopped node is skipped without printing an error. The status output
# is discarded on purpose; only its exit code is used.
for node_dir in data_N2 data_N1; do
  if pg_ctl status -D "$node_dir" > /dev/null 2>&1; then
    pg_ctl stop -D "$node_dir"
  fi
done

# Remove only what this script creates: the two data directories and the two
# server logs passed to "pg_ctl -l". A bare "*log" glob combined with "rm -r"
# would also delete any unrelated file or directory in the current directory
# whose name ends in "log". -f keeps a first run (nothing to delete) quiet.
rm -rf -- data_N1 data_N2 N1.log N2.log

echo '########'
echo '#Set up#'
echo '########'

# Create the two clusters with "postgres" as the bootstrap superuser.
# Abort on failure: every later step assumes both clusters exist, and carrying
# on would only append settings to a missing or stale configuration file.
initdb -D data_N1 -U postgres || exit 1
initdb -D data_N2 -U postgres || exit 1

# Publisher (N1): logical WAL level, its own port, a 5 second WAL sender
# timeout and debug3 server logging.
cat << EOF >> data_N1/postgresql.conf
wal_level = logical
port = $port_publisher
wal_sender_timeout = 5s
log_min_messages = debug3
EOF

# Subscriber (N2): logical WAL level and its own port.
cat << EOF >> data_N2/postgresql.conf
wal_level = logical
port = $port_subscriber
EOF

# Boot database instances
# -w waits until each server has finished starting; the server logs go to
# N1.log and N2.log. Stop here if either node cannot be started.

pg_ctl -D data_N1 start -w -l N1.log || exit 1
pg_ctl -D data_N2 start -w -l N2.log || exit 1

# Setup as publisher/subscriber

# Command prefixes for talking to each node as user "postgres".
publisher_psql=(psql -U postgres -p "$port_publisher")
subscriber_psql=(psql -U postgres -p "$port_subscriber")

# Publisher: the replicated table, the pg_walinspect extension (queried by the
# final WAL check) and a publication that covers the table.
"${publisher_psql[@]}" -c "CREATE TABLE tbl (a int, b int);"
"${publisher_psql[@]}" -c "CREATE EXTENSION pg_walinspect"
"${publisher_psql[@]}" -c "CREATE PUBLICATION pub FOR TABLE tbl;"

# Subscriber: a table of the same shape and a subscription to the publication.
# copy_data = off is passed, so existing rows are not copied at creation time.
"${subscriber_psql[@]}" -c "CREATE TABLE tbl (a int, b int);"
"${subscriber_psql[@]}" -c "CREATE SUBSCRIPTION sub CONNECTION 'user=postgres dbname=postgres port=$port_publisher' PUBLICATION pub WITH (copy_data = off)"

# Generate changes on the publisher: insert five rows (1..5) into tbl.
# NOTE: only an INSERT is issued here; no VACUUM is run by this script.

psql -U postgres -p $port_publisher -c "INSERT INTO tbl VALUES (generate_series(1, 5))"

## NOTE: if a CHECKPOINT is run here, the HEAP/HEAP2 xlog records will disappear

## psql -U postgres -p $port_publisher -c "CHECKPOINT"

# Wait a short, fixed time so that the subscriber can catch up.
# NOTE(review): this is a plain delay, not a synchronization point; the value
# equals the wal_sender_timeout configured for the publisher -- confirm whether
# that is intentional.

sleep 5s

# Emulate the stop/start cycle of pg_upgrade: shut down both nodes, subscriber
# (N2) first and publisher (N1) second.

for node_dir in data_N2 data_N1; do
  pg_ctl stop -D "$node_dir"
done

# Only the publisher is started again. -o "-b" hands the -b switch to the
# server process; the subscriber stays down.
pg_ctl -D data_N1 start -w -l N1.log -o "-b"

printf '%s\n' \
  '###############################################################' \
  '#Check difference of WAL position between publisher/subscriber#' \
  '###############################################################'

# List every WAL record on the publisher from the replication slot's
# confirmed_flush_lsn up to the end LSN 'FFFFFFFF/FFFFFFFF', numbering the
# rows in output order. Any rows returned are WAL records located at or after
# the position recorded in confirmed_flush_lsn.
wal_gap_query="
WITH tmp as (
    SELECT confirmed_flush_lsn FROM pg_replication_slots
)
SELECT row_number() over (), start_lsn, end_lsn, prev_lsn, xid, resource_manager, record_type,
       record_length, main_data_length, fpi_length, description, block_ref
       FROM tmp, pg_get_wal_records_info(tmp.confirmed_flush_lsn, 'FFFFFFFF/FFFFFFFF');
"

psql -U postgres -p "$port_publisher" -c "$wal_gap_query"
