From b1dea1eefeefc54743a79d63da0d6d5921e93ea5 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@enterprisedb.com>
Date: Mon, 12 Feb 2018 12:29:37 +1300
Subject: [PATCH] Use explicit memory barriers when manipulating
 MyProc->subxids and nxids.

The existing coding predated the availability of memory barrier primitives,
and could potentially behave incorrectly on systems with weaker memory
ordering.

DRAFT -- FOR DISCUSSION ONLY

Thomas Munro
Discussion: https://postgr.es/m/CAEepm%3D1nff0x%3D7i3YQO16jLA2qw-F9O39YmUew4oq-xcBQBs0g%40mail.gmail.com
---
 src/backend/access/transam/varsup.c | 36 +++++++++++++++---------------------
 src/backend/storage/ipc/procarray.c | 21 ++++++++++++++++-----
 2 files changed, 31 insertions(+), 26 deletions(-)

diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 394843f7e91..e9cf86b338c 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -197,6 +197,11 @@ GetNewTransactionId(bool isSubXact)
 	 *
 	 * The same comments apply to the subxact xid count and overflow fields.
 	 *
+	 * Other processes that read nxids should do so before reading xids
+	 * elements with a pg_read_barrier() in between, so that they can be sure
+	 * not to read an uninitialized array element; see
+	 * src/backend/storage/lmgr/README.barrier.
+	 *
 	 * A solution to the atomic-store problem would be to give each PGXACT its
 	 * own spinlock used only for fetching/storing that PGXACT's xid and
 	 * related fields.
@@ -211,31 +216,20 @@ GetNewTransactionId(bool isSubXact)
 	 * window *will* include the parent XID, so they will deliver the correct
 	 * answer later on when someone does have a reason to inquire.)
 	 */
+	if (!isSubXact)
+		MyPgXact->xid = xid;
+	else
 	{
-		/*
-		 * Use volatile pointer to prevent code rearrangement; other backends
-		 * could be examining my subxids info concurrently, and we don't want
-		 * them to see an invalid intermediate state, such as incrementing
-		 * nxids before filling the array entry.  Note we are assuming that
-		 * TransactionId and int fetch/store are atomic.
-		 */
-		volatile PGPROC *myproc = MyProc;
-		volatile PGXACT *mypgxact = MyPgXact;
+		int			nxids = MyPgXact->nxids;
 
-		if (!isSubXact)
-			mypgxact->xid = xid;
-		else
+		if (nxids < PGPROC_MAX_CACHED_SUBXIDS)
 		{
-			int			nxids = mypgxact->nxids;
-
-			if (nxids < PGPROC_MAX_CACHED_SUBXIDS)
-			{
-				myproc->subxids.xids[nxids] = xid;
-				mypgxact->nxids = nxids + 1;
-			}
-			else
-				mypgxact->overflowed = true;
+			MyProc->subxids.xids[nxids] = xid;
+			pg_write_barrier();
+			MyPgXact->nxids = nxids + 1;
 		}
+		else
+			MyPgXact->overflowed = true;
 	}
 
 	LWLockRelease(XidGenLock);
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 1a00011adc3..ba6c629d5db 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1075,8 +1075,8 @@ TransactionIdIsInProgress(TransactionId xid)
 	for (i = 0; i < arrayP->numProcs; i++)
 	{
 		int			pgprocno = arrayP->pgprocnos[i];
-		volatile PGPROC *proc = &allProcs[pgprocno];
-		volatile PGXACT *pgxact = &allPgXact[pgprocno];
+		PGPROC	   *proc = &allProcs[pgprocno];
+		PGXACT	   *pgxact = &allPgXact[pgprocno];
 		TransactionId pxid;
 
 		/* Ignore my own proc --- dealt with it above */
@@ -1107,9 +1107,13 @@ TransactionIdIsInProgress(TransactionId xid)
 			continue;
 
 		/*
-		 * Step 2: check the cached child-Xids arrays
+		 * Step 2: check the cached child-Xids arrays, making sure that we
+		 * observe the array size before we try to read elements in that
+		 * range.
 		 */
-		for (j = pgxact->nxids - 1; j >= 0; j--)
+		j = pgxact->nxids - 1;
+		pg_read_barrier();
+		for (; j >= 0; j--)
 		{
 			/* Fetch xid just once - see GetNewTransactionId */
 			TransactionId cxid = proc->subxids.xids[j];
@@ -1641,6 +1645,10 @@ GetSnapshotData(Snapshot snapshot)
 			 * missing any xids added concurrently, because they must postdate
 			 * xmax.)
 			 *
+			 * By including a read barrier between the read of nxids and the
+			 * array elements, we avoid accessing an uninitialized element,
+			 * because GetNewTransactionId() writes in the opposite order.
+			 *
 			 * Again, our own XIDs are not included in the snapshot.
 			 */
 			if (!suboverflowed)
@@ -1653,8 +1661,9 @@ GetSnapshotData(Snapshot snapshot)
 
 					if (nxids > 0)
 					{
-						volatile PGPROC *proc = &allProcs[pgprocno];
+						PGPROC	   *proc = &allProcs[pgprocno];
 
+						pg_read_barrier();
 						memcpy(snapshot->subxip + subcount,
 							   (void *) proc->subxids.xids,
 							   nxids * sizeof(TransactionId));
@@ -2034,6 +2043,7 @@ GetRunningTransactionData(void)
 			nxids = pgxact->nxids;
 			if (nxids > 0)
 			{
+				pg_read_barrier();
 				memcpy(&xids[count], (void *) proc->subxids.xids,
 					   nxids * sizeof(TransactionId));
 				count += nxids;
@@ -3001,6 +3011,7 @@ ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
 #define XidCacheRemove(i) \
 	do { \
 		MyProc->subxids.xids[i] = MyProc->subxids.xids[MyPgXact->nxids - 1]; \
+		pg_write_barrier(); \
 		MyPgXact->nxids--; \
 	} while (0)
 
-- 
2.15.1

