#!/usr/bin/bash
#

# command-line arguments
OUTDIR=$1       # directory for data dirs, logs and generated scripts
NUMTABLES=$2    # number of replicated tables to create
REFRESH=$3      # refresh the subscription after every REFRESH tables
SLEEP=$4        # seconds to sleep between the setup steps

# how long to run the various tasks (seconds)
DURATION_CHECK=1200
DURATION_ALTER=1000
DURATION_WORKLOAD=1100
DURATION_REFRESH=1250

START_TIMESTAMP=$(date +%s)

echo [$(date)] [$(date +%s)] "NUMTABLES=$NUMTABLES  SLEEP=$SLEEP  REFRESH=$REFRESH"

# cluster configuration
PUB_PORT=5001
PUB_DATA=$OUTDIR/data-pub

SUB_PORT=5002
SUB_DATA=$OUTDIR/data-sub

# initialize both clusters; abort early if initdb fails, everything
# after this point assumes the data directories exist
pg_ctl -D "$PUB_DATA" init || exit 1
pg_ctl -D "$SUB_DATA" init || exit 1

# the publisher needs logical WAL level for logical replication
echo 'wal_level = logical' >> "$PUB_DATA"/postgresql.conf
echo "port = $PUB_PORT" >> "$PUB_DATA"/postgresql.conf
echo "log_line_prefix = '%n %m [%p] [%b] [%a] [%v/%x] '" >> "$PUB_DATA"/postgresql.conf

echo "port = $SUB_PORT" >> "$SUB_DATA"/postgresql.conf
echo "log_line_prefix = '%n %m [%p] [%b] [%a] [%v/%x] '" >> "$SUB_DATA"/postgresql.conf

# start both clusters (-w waits for startup to complete)
pg_ctl -D "$PUB_DATA" -w -l "$OUTDIR"/pg-pub.log start || exit 1
pg_ctl -D "$SUB_DATA" -w -l "$OUTDIR"/pg-sub.log start || exit 1

# create the test databases
createdb -p "$PUB_PORT" test > "$OUTDIR"/init-pub.log 2>&1
createdb -p "$SUB_PORT" test > "$OUTDIR"/init-sub.log 2>&1

# create the same set of tables on publisher and subscriber
for ((i = 1; i <= NUMTABLES; i++)); do
	psql -p $PUB_PORT test -c "CREATE TABLE t_${i} (id serial primary key, value bigint)"
	psql -p $SUB_PORT test -c "CREATE TABLE t_${i} (id serial primary key, value bigint)"
done

# generate the scripts for pgbench (each starts with an empty line)
echo > insert.sql
echo > add.sql
echo > drop.sql
echo > single.sql

for i in $(seq 1 $NUMTABLES); do
	tab="t_$i"

	echo "INSERT INTO $tab (value) VALUES (random() * 1000);" >> insert.sql

	# the ADD/DROP statements get mixed into one huge transaction later
	{
		echo "ALTER PUBLICATION p DROP TABLE $tab;"
		echo "SELECT pg_sleep(random()/100);"
	} >> drop.sql

	{
		echo "ALTER PUBLICATION p ADD TABLE $tab;"
		echo "SELECT pg_sleep(random()/100);"
	} >> add.sql

	# and also a bunch of small transactions, each dropping and then
	# re-adding a single table
	{
		echo "BEGIN;"
		echo "ALTER PUBLICATION p DROP TABLE $tab;"
		echo "SELECT pg_sleep(random()/100);"
		echo "ALTER PUBLICATION p ADD TABLE $tab;"
		echo "SELECT pg_sleep(random()/100);"
		echo "COMMIT;"
		echo "SELECT pg_sleep(random()/10);"
	} >> single.sql
done

# shuffle the inserts a bit, to randomize the insert order
sort -R insert.sql > insert.tmp && mv insert.tmp insert.sql

# shuffle the ALTER PUBLICATION ... ADD/DROP too, but make sure to keep
# DROP before ADD of the same table
#
# use shuf because "sort -R" groups identical lines (e.g. the pg_sleep
# calls) together, and we don't want that
shuf add.sql > add.tmp && mv add.tmp add.sql
shuf drop.sql > drop.tmp && mv drop.tmp drop.sql

# wrap the drops followed by the adds in a single large transaction
echo "BEGIN;" > multi.tmp
cat drop.sql >> multi.tmp
cat add.sql >> multi.tmp
echo "COMMIT;" >> multi.tmp
mv multi.tmp multi.sql

# append one final sleep to the per-transaction script
echo "SELECT pg_sleep(random()/10);" >> single.sql

# move the generated scripts to the output directory
mv insert.sql single.sql multi.sql $OUTDIR/

# run the insert workload on the publisher in the background (the
# duration is an overkill, we terminate it earlier anyway)
pgbench -n -c 8 -f $OUTDIR/insert.sql -p $PUB_PORT -T $DURATION_WORKLOAD test > $OUTDIR/pgbench.log 2>&1 &

# create an (empty, for now) publication on the publisher
echo [$(date)] [$(date +%s)] "creating publication"
psql -p $PUB_PORT test -c "CREATE PUBLICATION p"

# give the workload a little time to make progress
echo [$(date)] [$(date +%s)] "sleeping"
sleep $SLEEP

# create the subscription on the subscriber side
echo [$(date)] [$(date +%s)] "creating subscription"
psql -p $SUB_PORT test -c "CREATE SUBSCRIPTION s CONNECTION 'application_name=test host=localhost user=user dbname=test port=$PUB_PORT' PUBLICATION p"

# and wait a bit again
echo [$(date)] [$(date +%s)] "sleeping"
sleep $SLEEP

# keep refreshing the subscription in a loop, in the background
./refresh.sh $SUB_PORT $DURATION_REFRESH > $OUTDIR/refresh.log 2>&1 &

# add tables to the publication one by one, refreshing the subscription
# after every REFRESH tables
for s in $(seq 1 "$NUMTABLES"); do

	lsn=$(psql -p "$PUB_PORT" test -t -A -c "SELECT pg_current_wal_lsn()")
	echo [$(date)] [$(date +%s)] "adding table $s to publication [$lsn]"

	psql -p "$PUB_PORT" test -c "ALTER PUBLICATION p ADD TABLE t_${s}"

	# use the loop variable "s" here - the stale "t" (left over from an
	# earlier loop) made the refresh fire either never or on every
	# iteration, depending on (NUMTABLES % REFRESH)
	m=$((s % REFRESH))

	if [ "$m" == "0" ]; then

		lsn=$(psql -p "$PUB_PORT" test -t -A -c "SELECT pg_current_wal_lsn()")
		echo [$(date)] [$(date +%s)] "refreshing publication [$lsn]"

		psql -p "$SUB_PORT" test -c "ALTER SUBSCRIPTION s REFRESH PUBLICATION"
		psql -p "$PUB_PORT" test -c "SELECT pg_current_wal_lsn() AS wal_lsn, pg_current_wal_insert_lsn() AS insert_lsn, (SELECT relname FROM pg_class AS c WHERE c.oid = p.prrelid) AS relname, * FROM pg_publication_rel p ORDER BY prrelid" >> "$OUTDIR"/pg_publication_rel.log 2>&1
	fi

done

lsn=$(psql -p $PUB_PORT test -t -A -c "SELECT pg_current_wal_lsn()")
echo [$(date)] [$(date +%s)] "refreshing publication [$lsn]"

# make sure to do one final refresh, so all the tables are known to
# the subscription
psql -p $SUB_PORT test -c "ALTER SUBSCRIPTION s REFRESH PUBLICATION"

# log the current contents of pg_publication_rel
psql -p $PUB_PORT test -c "SELECT pg_current_wal_lsn() AS wal_lsn, pg_current_wal_insert_lsn() AS insert_lsn, (SELECT relname FROM pg_class AS c WHERE c.oid = p.prrelid) AS relname, * FROM pg_publication_rel p ORDER BY prrelid" >> $OUTDIR/pg_publication_rel.log 2>&1

# run pgbench doing ALTER PUBLICATION ADD/DROP in the background - one
# instance with many small transactions, one with a single huge one
pgbench -n -f $OUTDIR/single.sql -p $PUB_PORT -T $DURATION_ALTER test > $OUTDIR/single.log 2>&1 &
pgbench -n -f $OUTDIR/multi.sql -p $PUB_PORT -T $DURATION_ALTER test > $OUTDIR/multi.log 2>&1 &

# wait for all tables to reach the "ready" sync state on the subscriber
# (essentially what wait_for_subscription_sync does)
while true; do
	echo [$(date)] [$(date +%s)] "waiting for subscription sync"

	synced=$(psql -p $SUB_PORT test -t -A -c "SELECT count(*) FROM pg_subscription_rel WHERE srsubstate = 'r'")

	if [[ "$synced" == "$NUMTABLES" ]]; then
		break
	fi

	echo [$(date)] [$(date +%s)] "synced $synced tables out of $NUMTABLES"
	sleep 1
done

echo [$(date)] [$(date +%s)] "subscription sync done"

# wait for the background pgbench runs to finish (if still running)
echo [$(date)] [$(date +%s)] "waiting for pgbench to complete"
wait

# get current LSN on publisher and wait for subscriber to fully catchup (same as wait_for_catchup)
lsn=`psql -p $PUB_PORT test -t -A -c "SELECT pg_current_wal_lsn()"`

echo [`date`] [`date +%s`] "waiting for replay of LSN $lsn"

# remember where the subscriber started and when, so we can estimate
# the replay speed and the remaining time below
start_lsn=`psql -p $PUB_PORT test -t -A -c "SELECT replay_lsn FROM pg_catalog.pg_stat_replication WHERE application_name = 'test'"`
start_time=`date +%s`

while /bin/true; do
	echo [`date`] [`date +%s`] "waiting for catchup $lsn"
	# how far the subscriber got, and how much WAL (in bytes) remains
	# up to the target LSN
	replay_lsn=`psql -p $PUB_PORT test -t -A -c "SELECT replay_lsn FROM pg_catalog.pg_stat_replication WHERE application_name = 'test'"`
	remaining_wal=`psql -p $PUB_PORT test -t -A -c "SELECT ('$lsn' - replay_lsn) FROM pg_catalog.pg_stat_replication WHERE application_name = 'test'"`
	remaining_kb=$((remaining_wal/1024))

	# average replay speed since the start of the wait; the "+ 1"
	# guards against division by zero on the first iteration
	current_time=`date +%s`
	delta_time=$((current_time - start_time + 1))
	delta_wal=`psql -p $PUB_PORT test -t -A -c "SELECT (replay_lsn - '$start_lsn') FROM pg_catalog.pg_stat_replication WHERE application_name = 'test'"`
	# NOTE(review): total_wal is computed but not used anywhere in the
	# visible part of the script
	total_wal=`psql -p $PUB_PORT test -t -A -c "SELECT ('$lsn'::pg_lsn - '$start_lsn'::pg_lsn) FROM pg_catalog.pg_stat_replication WHERE application_name = 'test'"`
	speed=$((delta_wal / delta_time))

	# estimated time to catch up; right after the start the measured
	# speed may still be zero, so avoid dividing by it
	if [ "$speed" != "0" ]; then
		remaining_sec=$((remaining_wal / speed))
	else
		remaining_sec='???'
	fi

	# done once the subscriber replayed past the target LSN and the
	# walsender is in the regular "streaming" state
	r=`psql -p $PUB_PORT test -t -A -c "SELECT replay_lsn >= '$lsn' AND state = 'streaming' FROM pg_catalog.pg_stat_replication WHERE application_name = 'test'"`
	if [ "$r" == "t" ]; then
		break
	fi
	echo [`date`] [`date +%s`] "replay LSN $replay_lsn, remaining $remaining_kb kB / $remaining_sec seconds"
	sleep 1
done

# dump basic info about the subscription and the subscriber

# sequences (if any) created on the subscriber
sql="SELECT oid, relname FROM pg_class AS c WHERE relname like 'seq_%' ORDER BY relname"
psql -p $SUB_PORT test -c "$sql" > $OUTDIR/pg_class.log 2>&1

# per-table sync state of the subscription
sql="SELECT (SELECT relname FROM pg_class AS c WHERE c.oid = r.srrelid) AS relname, * FROM pg_subscription_rel AS r ORDER BY 1"
psql -p $SUB_PORT test -c "$sql" > $OUTDIR/pg_subscription_rel.log 2>&1

# replication progress as seen by the publisher
sql="SELECT pg_current_wal_lsn() AS wal_lsn, pg_current_wal_insert_lsn() AS insert_lsn, * FROM pg_stat_replication"
psql -p $PUB_PORT test -c "$sql" > $OUTDIR/pg_stat_replication.log 2>&1

# check consistency - compare the contents of each table on both ends;
# could be made more thorough (checksums etc.), but good enough here
echo [$(date)] [$(date +%s)] "caught up, check consistency"
r="OK"
c=0
for t in $(seq 1 "$NUMTABLES"); do
	# ORDER BY makes the comparison deterministic - without it the two
	# sides may return identical rows in a different physical order and
	# report a bogus mismatch
	a=$(psql -p "$PUB_PORT" test -t -A -c "SELECT id, value FROM t_${t} ORDER BY id")
	b=$(psql -p "$SUB_PORT" test -t -A -c "SELECT id, value FROM t_${t} ORDER BY id")
	if [ "$a" == "$b" ]; then
		echo [$(date)] [$(date +%s)] "$t OK ($a $b)"
	else
		echo [$(date)] [$(date +%s)] "$t ERROR ($a $b)"
		r="ERROR"
		c=1
	fi
done

# dump the contents of all test tables on both ends, for later
# inspection (this is a data dump, not a cleanup)
for t in `seq 1 $NUMTABLES`; do
	psql -p $PUB_PORT test -c "SELECT * FROM t_${t}" > $OUTDIR/test_${t}_pub.data
	psql -p $SUB_PORT test -c "SELECT * FROM t_${t}" > $OUTDIR/test_${t}_sub.data
done

# stop both clusters (immediate mode, no clean shutdown needed)
pg_ctl -D "$PUB_DATA" -w -m immediate stop
pg_ctl -D "$SUB_DATA" -w -m immediate stop

# if everything went OK, remove the data directories; the ":?" guard
# aborts instead of running "rm -Rf /" if a variable is somehow empty
if [ "$r" == "OK" ]; then
	rm -Rf "${PUB_DATA:?}"
	rm -Rf "${SUB_DATA:?}"
	# remove the server log files too, may be pretty large due to debugging
	rm "$OUTDIR"/pg-pub.log "$OUTDIR"/pg-sub.log
else
	# on error, compress the data directories (explicit "-f -" writes
	# the archive to stdout portably, instead of relying on GNU tar's
	# default)
	tar -cf - "$PUB_DATA" | gzip --fast > "$PUB_DATA".tgz
	tar -cf - "$SUB_DATA" | gzip --fast > "$SUB_DATA".tgz
	# and compress the server log files as well
	pigz --fast "$OUTDIR"/pg-pub.log "$OUTDIR"/pg-sub.log
fi

date

END_TIMESTAMP=$(date +%s)

echo [$(date)] [$(date +%s)] "RUNTIME: " $((END_TIMESTAMP - START_TIMESTAMP))
echo [$(date)] [$(date +%s)] "RESULT: $r"

exit $c
