#!/usr/bin/env bash

export PGUSER=${PGUSER:-postgres}
export PGDATABASE=${PGDATABASE:-postgres}
export SRC_PORT=${SRC_PORT:-5432}
export DST_PORT=${DST_PORT:-5433}

NTABLES=${NTABLES:-100}
BATCH_SIZE=${BATCH_SIZE:-8}
NROWS=${NROWS:-10000}

PSQL="$(which psql)"
psql() {
    ${PSQL} -X -v ON_ERROR_STOP=1 "$@"
}

for port in {${SRC_PORT},${DST_PORT}}; do
    for tnum in $(seq 0 ${NTABLES}); do
        echo "CREATE TABLE t${tnum} (key INTEGER PRIMARY KEY, val INTEGER);";
    done | psql -p $port -q;
done

psql -p ${SRC_PORT} -c "CREATE PUBLICATION pub FOR TABLE t0;"
psql -p ${DST_PORT} <<EOF
CREATE SUBSCRIPTION sub
  CONNECTION 'port=${SRC_PORT} user=${PGUSER} dbname=${PGDATABASE}'
  PUBLICATION pub;
EOF

SENTINEL=$(mktemp)
trap "rm -f ${SENTINEL}" EXIT

for tnum in $(seq 0 ${NTABLES}); do
    (
        for val in $(seq 1 ${NROWS}); do
            [ -e ${SENTINEL} ] || break
            for key in $(seq 1 ${NROWS}); do
                [ -e ${SENTINEL} ] || break
                echo -n "INSERT INTO t${tnum} VALUES (${key}, ${val})"
                echo    " ON CONFLICT (key) DO UPDATE SET val = EXCLUDED.val;";
            done;
        done | psql -p ${SRC_PORT} -q
    ) &
done

tables_synced_count() {
    psql -p ${DST_PORT} -t <<EOF
        SELECT COUNT(sr.*)
          FROM pg_subscription_rel AS sr
         INNER JOIN pg_subscription AS s ON (s.oid = srsubid)
         WHERE subname = 'sub'
           AND srrelid = ANY('{$1}'::regclass[])
           AND srsubstate = 'r'
EOF
}

ntables_synced=1
while [ ${ntables_synced} -lt ${NTABLES} ]; do
    target=$((${ntables_synced} + ${BATCH_SIZE} - 1))
    target=$((${target} > ${NTABLES} ? ${NTABLES} : ${target}))
    tnames=$(for tnum in $(seq $ntables_synced $target); do echo -n ", t${tnum}"; done | cut -c3-)
    cmd=$(for tnum in $(seq $ntables_synced $target); do echo -n ", ADD TABLE t${tnum}"; done | cut -c3-)
    echo "Adding table(s) ${tnames}"
    psql -p ${SRC_PORT} -q -c "ALTER PUBLICATION pub ADD TABLE ${tnames};"
    psql -p ${DST_PORT} -q -c "ALTER SUBSCRIPTION sub REFRESH PUBLICATION;"
    while sleep 1; do
        [ "$(tables_synced_count "${tnames}")" -eq $(($target - $ntables_synced + 1)) ] && break
    done
    ntables_synced=$((${target} + 1))
done

rm -f ${SENTINEL}
echo "Waiting for data generators ..."
wait

echo "Waiting for replication lag ..."
SRC_LSN=$(echo $(psql -p ${SRC_PORT} -q -t -c "SELECT pg_current_wal_lsn()"))

wal_sync_status() {
    psql -p ${DST_PORT} -q -t <<EOF
        SELECT pg_wal_lsn_diff('${SRC_LSN}', latest_end_lsn) <= 0
          FROM pg_stat_subscription
         WHERE subname = 'sub'
           AND leader_pid IS NULL AND relid IS NULL
EOF
}

while sleep 1; do
    [ x"$(wal_sync_status)" == x" t" ] && break
done

# PG 14 seems to need some extra time, for some reason
sleep 10

echo "Verifying data"
for tnum in $(seq 0 ${NTABLES}); do
    if ! diff -q &>/dev/null \
        <(psql -p ${SRC_PORT} -c "COPY (SELECT * FROM t${tnum} ORDER BY key) TO STDOUT") \
        <(psql -p ${DST_PORT} -c "COPY (SELECT * FROM t${tnum} ORDER BY key) TO STDOUT"); then
        echo "Table t${tnum} has mismatch";
    fi
done
