#!/usr/bin/bash
#
# ./crash-test.sh OUTPUT_DIRECTORY NUMBER_OF_TABLES REFRESH_INTERVAL SLEEP_SECONDS
#
# OUTPUT_DIRECTORY - directory where all the data directories / logs will be stored
# NUMBER_OF_TABLES - number of tables to create, add to publication, ...
# REFRESH_INTERVAL - call REFRESH PUBLICATION after adding this many tables
# SLEEP_SECONDS    - number of seconds to sleep between steps
# 
#
# What this does (briefly)
#
# - initialize and start two clusters - publisher and subscriber
# - create publication/subscription between them
# - start pgbench in the background, to generate changes
# - create given number of tables, add them to the publication
# - once in a while refresh the subscription to sync tables
# - stop the pgbench
# - wait for the subscriber to fully catch up
# - compare data on publisher/subscriber (simple cross-check)
# - if everything went OK, remove the data directories etc.
# - if there was error, keep the data (but compress, to save space)
#

# Positional arguments; see the usage line at the top of this file.
# Extra arguments are ignored, missing ones are an error.
if [ "$#" -lt 4 ]; then
	echo "usage: $0 OUTPUT_DIRECTORY NUMBER_OF_TABLES REFRESH_INTERVAL SLEEP_SECONDS" >&2
	exit 2
fi

OUTDIR=$1
NUMTABLES=$2
REFRESH=$3
SLEEP=$4

# NUMTABLES is used as a loop bound and REFRESH as a modulus further down, so
# both must be positive decimal integers (no leading zero, which shell
# arithmetic would read as octal). Reject anything else up front instead of
# failing half-way through the run.
for value in "$NUMTABLES" "$REFRESH"; do
	if ! [[ "$value" =~ ^[1-9][0-9]*$ ]]; then
		echo "$0: NUMBER_OF_TABLES and REFRESH_INTERVAL must be positive integers (got '$value')" >&2
		exit 2
	fi
done
unset value

echo "$PATH"

# remembered so the total runtime can be reported at the end
START_TIMESTAMP=$(date +%s)

# The whole prefix is quoted: an unquoted [1700000000] is a bracket glob and
# would be replaced by a matching single-character file name in the CWD.
echo "[$(date)] [$(date +%s)] NUMTABLES=$NUMTABLES  SLEEP=$SLEEP  REFRESH=$REFRESH"

# Ports and data directories of the two clusters. The publisher listens on
# 5001 and the subscriber on 5002; both data directories live under OUTDIR.
PUB_PORT=5001
SUB_PORT=5002

PUB_DATA="${OUTDIR}/data-pub"
SUB_DATA="${OUTDIR}/data-sub"

# Initialize both clusters. Nothing below can work without them, so stop
# right away if a setup step fails instead of running into the wait loops.
pg_ctl -D "$PUB_DATA" init || { echo "initializing the publisher cluster failed" >&2; exit 1; }
pg_ctl -D "$SUB_DATA" init || { echo "initializing the subscriber cluster failed" >&2; exit 1; }

# Publisher settings: logical WAL level, its own port, and a log_line_prefix
# that puts timestamps, PID, backend type, application name and transaction
# ids on every log line.
{
	echo 'wal_level = logical'
	echo "port = $PUB_PORT"
	echo "log_line_prefix = '%n %m [%p] [%b] [%a] [%v/%x] '"
} >> "$PUB_DATA/postgresql.conf"

# Subscriber settings: own port and the same log_line_prefix.
{
	echo "port = $SUB_PORT"
	echo "log_line_prefix = '%n %m [%p] [%b] [%a] [%v/%x] '"
} >> "$SUB_DATA/postgresql.conf"

# start both servers (-w waits until startup has completed)
pg_ctl -D "$PUB_DATA" -w -l "$OUTDIR/pg-pub.log" start || { echo "starting the publisher failed, see $OUTDIR/pg-pub.log" >&2; exit 1; }
pg_ctl -D "$SUB_DATA" -w -l "$OUTDIR/pg-sub.log" start || { echo "starting the subscriber failed, see $OUTDIR/pg-sub.log" >&2; exit 1; }

# the database used by every psql / pgbench call in this script
createdb -p "$PUB_PORT" test > "$OUTDIR/init-pub.log" 2>&1 || { echo "createdb on the publisher failed, see $OUTDIR/init-pub.log" >&2; exit 1; }
createdb -p "$SUB_PORT" test > "$OUTDIR/init-sub.log" 2>&1 || { echo "createdb on the subscriber failed, see $OUTDIR/init-sub.log" >&2; exit 1; }

# Create test_1 .. test_NUMTABLES with the same definition on both ends,
# publisher first and then subscriber for each table.
table_columns="(id SERIAL PRIMARY KEY, ts timestamptz, lsn pg_lsn, val TEXT)"
for t in $(seq 1 $NUMTABLES); do
	for port in $PUB_PORT $SUB_PORT; do
		psql -p $port test -c "CREATE TABLE test_${t} ${table_columns}"
	done
done

# Generate the pgbench script: one INSERT per table, in random order, wrapped
# in a single transaction. The file is written straight to its final place in
# OUTDIR, so no fixed-name temporary files are left in (or clash within) the
# current working directory.
{
	echo "BEGIN;"
	for ((t = 1; t <= NUMTABLES; t++)); do
		echo "INSERT INTO test_${t} (ts, lsn, val) VALUES (clock_timestamp(), pg_current_wal_insert_lsn(), md5(random()::text));"
	done | sort -R
	echo "COMMIT;"
} > "$OUTDIR/insert.sql"

# Run pgbench against the publisher in the background, with 8 clients and no
# vacuum (-n). The one-hour limit (-T 3600) is only a safety net: the script
# terminates pgbench itself further down.
pgbench -n -c 8 -f "$OUTDIR/insert.sql" -p "$PUB_PORT" -T 3600 test > "$OUTDIR/pgbench.log" 2>&1 &

# create the (initially empty) publication on the publisher
echo "[$(date)] [$(date +%s)] creating publication"
psql -p "$PUB_PORT" test -c "CREATE PUBLICATION p"

# sleep for a few seconds
echo "[$(date)] [$(date +%s)] sleeping"
sleep "$SLEEP"

# Create the subscription on the subscriber.
#  - application_name=test is what the catch-up queries later in this script
#    filter pg_stat_replication on, so it must stay in sync with them.
#  - The connection is made as the OS user running the script (id -un)
#    rather than a hard-coded account name, so the test is not tied to one
#    developer's machine.
echo "[$(date)] [$(date +%s)] creating subscription"
psql -p "$SUB_PORT" test -c "CREATE SUBSCRIPTION s CONNECTION 'application_name=test host=localhost user=$(id -un) dbname=test port=$PUB_PORT' PUBLICATION p"

# sleep for a few seconds
echo "[$(date)] [$(date +%s)] sleeping"
sleep "$SLEEP"

# add every table in the public schema to the publication in one statement
psql -p "$PUB_PORT" test -c "ALTER PUBLICATION p ADD TABLES IN SCHEMA public"

# Snapshot of pg_publication_rel together with the current LSNs, appended to
# a log file after each refresh.
# NOTE(review): schema-level membership may be recorded in
# pg_publication_namespace rather than pg_publication_rel, in which case this
# returns no rows -- confirm.
pubrel_sql="SELECT pg_current_wal_lsn() AS wal_lsn, pg_current_wal_insert_lsn() AS insert_lsn, (SELECT relname FROM pg_class AS c WHERE c.oid = p.prrelid) AS relname, * FROM pg_publication_rel p ORDER BY prrelid"

# The loop walks all tables but refreshes the subscription at most once, on
# the first table number that is a multiple of REFRESH. (An earlier variant,
# ALTER PUBLICATION p ADD TABLE test_N per iteration, added the tables one by
# one here.)
refreshed=0
for ((t = 1; t <= NUMTABLES; t++)); do

	# This value is overwritten before it is ever printed; the query is kept
	# so every iteration still makes one round trip to the publisher
	# (presumably for pacing -- confirm before removing).
	lsn=$(psql -p "$PUB_PORT" test -t -A -c "SELECT pg_current_wal_lsn()")

	# REFRESH > 0 is checked first so a zero or empty interval cannot trigger
	# a division-by-zero error; the fallback below then does the refresh.
	if ((REFRESH > 0 && t % REFRESH == 0 && refreshed == 0)); then

		lsn=$(psql -p "$PUB_PORT" test -t -A -c "SELECT pg_current_wal_lsn()")
		echo "[$(date)] [$(date +%s)] refreshing publication [$lsn] in loop"

		psql -p "$SUB_PORT" test -c "ALTER SUBSCRIPTION s REFRESH PUBLICATION"
		psql -p "$PUB_PORT" test -c "$pubrel_sql" >> "$OUTDIR/pg_publication_rel.log" 2>&1
		refreshed=1
	fi

done

# fallback: the loop never hit a multiple of REFRESH, refresh now
if ((refreshed == 0)); then
	lsn=$(psql -p "$PUB_PORT" test -t -A -c "SELECT pg_current_wal_lsn()")
	echo "[$(date)] [$(date +%s)] refreshing publication [$lsn]"
	psql -p "$SUB_PORT" test -c "ALTER SUBSCRIPTION s REFRESH PUBLICATION"
fi

psql -p "$PUB_PORT" test -c "$pubrel_sql" >> "$OUTDIR/pg_publication_rel.log" 2>&1

# Wait for all tables to get in sync (essentially what
# wait_for_subscription_sync does): poll once per second until the number of
# pg_subscription_rel entries in state 'r' equals NUMTABLES.
# The shell builtin "true" is used instead of /bin/true; where that path does
# not exist the loop body would never run and the wait would be skipped.
while true; do
	echo "[$(date)] [$(date +%s)] waiting for subscription"
	c=$(psql -p "$SUB_PORT" test -t -A -c "SELECT count(*) FROM pg_subscription_rel WHERE srsubstate = 'r'")
	test_total=$(psql -p "$SUB_PORT" test -t -A -c "SELECT count(*) FROM pg_subscription_rel")
	echo "Total num entries in the rel table are $test_total"
	if [ "$c" == "$NUMTABLES" ]; then
		break
	fi
	echo "[$(date)] [$(date +%s)] synced $c tables out of $NUMTABLES"
	sleep 1
done

# sleep for a few more seconds, to generate more pgbench changes
echo "[$(date)] [$(date +%s)] sleeping"
sleep "$SLEEP"

# Stop the pgbench started by this script, and only that one. $! is the PID
# of the most recent background command, and pgbench is the only command this
# script puts in the background. "killall pgbench" would also terminate
# unrelated pgbench runs on the same machine.
pgbench_pid=$!
kill "$pgbench_pid"

# wait until the publisher no longer has any backend serving pgbench
while true; do
	echo "[$(date)] [$(date +%s)] waiting for pgbench backends to die"
	r=$(psql -p "$PUB_PORT" test -t -A -c "SELECT count(*) FROM pg_stat_activity WHERE application_name = 'pgbench'")
	if [ "$r" == "0" ]; then
		break
	fi
	sleep 1
done

# get current LSN on publisher and wait for subscriber to fully catchup (same as wait_for_catchup)
lsn=$(psql -p "$PUB_PORT" test -t -A -c "SELECT pg_current_wal_lsn()")

echo "[$(date)] [$(date +%s)] waiting for replay of LSN $lsn"

# baseline for the progress estimate printed while waiting
start_lsn=$(psql -p "$PUB_PORT" test -t -A -c "SELECT replay_lsn FROM pg_catalog.pg_stat_replication WHERE application_name = 'test'")
start_time=$(date +%s)

while true; do
	echo "[$(date)] [$(date +%s)] waiting for catchup $lsn"

	# Done once the connection with application_name 'test' is streaming and
	# has replayed everything up to the target LSN. This is checked first so
	# no progress queries are issued once the subscriber has caught up.
	r=$(psql -p "$PUB_PORT" test -t -A -c "SELECT replay_lsn >= '$lsn' AND state = 'streaming' FROM pg_catalog.pg_stat_replication WHERE application_name = 'test'")
	if [ "$r" == "t" ]; then
		break
	fi

	replay_lsn=$(psql -p "$PUB_PORT" test -t -A -c "SELECT replay_lsn FROM pg_catalog.pg_stat_replication WHERE application_name = 'test'")

	# The pg_stat_replication row (or its replay_lsn) may not be there yet, in
	# which case the queries return nothing. Take the baseline as soon as a
	# position becomes available.
	if [ -z "$start_lsn" ] && [ -n "$replay_lsn" ]; then
		start_lsn=$replay_lsn
		start_time=$(date +%s)
	fi

	remaining_wal=$(psql -p "$PUB_PORT" test -t -A -c "SELECT ('$lsn' - replay_lsn) FROM pg_catalog.pg_stat_replication WHERE application_name = 'test'")
	if [ -n "$start_lsn" ]; then
		delta_wal=$(psql -p "$PUB_PORT" test -t -A -c "SELECT (replay_lsn - '$start_lsn') FROM pg_catalog.pg_stat_replication WHERE application_name = 'test'")
	else
		delta_wal=''
	fi

	# Only do arithmetic on values that really are integers: an empty result
	# inside $(( )) is a syntax error that would abort the whole wait loop.
	remaining_kb='???'
	remaining_sec='???'
	if [[ "$remaining_wal" =~ ^-?[0-9]+$ ]]; then
		remaining_kb=$((remaining_wal / 1024))
		if [[ "$delta_wal" =~ ^-?[0-9]+$ ]]; then
			# +1 keeps the divisor non-zero during the first second
			delta_time=$(($(date +%s) - start_time + 1))
			speed=$((delta_wal / delta_time))
			if [ "$speed" != "0" ]; then
				remaining_sec=$((remaining_wal / speed))
			fi
		fi
	fi

	echo "[$(date)] [$(date +%s)] replay LSN $replay_lsn, remaining $remaining_kb kB / $remaining_sec seconds"
	sleep 1
done

# Dump basic catalog/statistics info for later inspection. The output paths
# are quoted so an OUTDIR containing whitespace does not turn the redirection
# into an "ambiguous redirect" error.

# subscriber: OIDs of the relations whose name matches test_%
psql -p "$SUB_PORT" test -c "SELECT oid, relname FROM pg_class AS c WHERE relname like 'test_%' ORDER BY relname" > "$OUTDIR/pg_class.log" 2>&1

# subscriber: per-table subscription state, with the relation name resolved
psql -p "$SUB_PORT" test -c "SELECT (SELECT relname FROM pg_class AS c WHERE c.oid = r.srrelid) AS relname, * FROM pg_subscription_rel AS r ORDER BY 1" > "$OUTDIR/pg_subscription_rel.log" 2>&1

# publisher: replication connections together with the current WAL positions
psql -p "$PUB_PORT" test -c "SELECT pg_current_wal_lsn() AS wal_lsn, pg_current_wal_insert_lsn() AS insert_lsn, * FROM pg_stat_replication" > "$OUTDIR/pg_stat_replication.log" 2>&1

# check consistency - compare number of records, sum, ... could be made more thorough, but good enough
#
# r - overall result, "OK" or "ERROR" (decides how the run is cleaned up)
# c - exit code of the script, 0 or 1
echo "[$(date)] [$(date +%s)] caught up, check consistency"
r="OK"
c=0
for ((t = 1; t <= NUMTABLES; t++)); do
	a=$(psql -p "$PUB_PORT" test -t -A -c "SELECT count(*), sum(id) FROM test_${t}")
	b=$(psql -p "$SUB_PORT" test -t -A -c "SELECT count(*), sum(id) FROM test_${t}")
	# An empty result means the query itself failed (e.g. the server is
	# gone). Two failed queries must not compare as "equal", otherwise a
	# broken run is reported as OK and its data directories are deleted.
	if [ -n "$a" ] && [ "$a" == "$b" ]; then
		echo "[$(date)] [$(date +%s)] $t OK ($a $b)"
	else
		echo "[$(date)] [$(date +%s)] $t ERROR ($a $b)"
		r="ERROR"
		c=1
	fi
done

# Dump the full contents of every table from both ends (ordered by id), so a
# mismatch can be inspected after the clusters are gone.
for ((t = 1; t <= NUMTABLES; t++)); do
	psql -p "$PUB_PORT" test -c "SELECT * FROM test_${t} ORDER BY id" > "$OUTDIR/test_${t}_pub.data"
	psql -p "$SUB_PORT" test -c "SELECT * FROM test_${t} ORDER BY id" > "$OUTDIR/test_${t}_sub.data"
done

# stop both clusters, using pg_ctl's immediate shutdown mode
pg_ctl -D "$PUB_DATA" -w -m immediate stop
pg_ctl -D "$SUB_DATA" -w -m immediate stop

# if went OK, remove the data directories
if [ "$r" == "OK" ]; then
	rm -Rf -- "$PUB_DATA" "$SUB_DATA"
	# remove the log files too, may be pretty large due to debugging
	rm -- "$OUTDIR/pg-pub.log" "$OUTDIR/pg-sub.log"
else
	# For error, archive the data directories. "-f -" makes tar write to
	# stdout explicitly instead of relying on its default archive device.
	# NOTE(review): the uncompressed directories are left in place.
	tar -cf - "$PUB_DATA" | gzip --fast > "$PUB_DATA.tgz"
	tar -cf - "$SUB_DATA" | gzip --fast > "$SUB_DATA.tgz"
	# and also compress the server log files; use pigz when it is installed,
	# plain gzip otherwise
	if command -v pigz > /dev/null 2>&1; then
		pigz --fast "$OUTDIR/pg-pub.log" "$OUTDIR/pg-sub.log"
	else
		gzip --fast "$OUTDIR/pg-pub.log" "$OUTDIR/pg-sub.log"
	fi
fi

date

END_TIMESTAMP=$(date +%s)

# Final summary. The log prefix is quoted so [<epoch>] cannot be expanded as
# a bracket glob against files in the current directory.
echo "[$(date)] [$(date +%s)] RUNTIME:  $((END_TIMESTAMP - START_TIMESTAMP))"
echo "[$(date)] [$(date +%s)] RESULT: $r"

# c is 0 when every table matched and 1 otherwise; if it was never set the
# consistency check did not run, which is reported as a failure.
exit "${c:-1}"
