Author: movead
Date:   Fri Jun 12 17:13:26 2020 +0800
src/backend/access/transam/Makefile       |   2 +
src/backend/access/transam/csn_log.c      | 438 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 
src/backend/access/transam/csn_snapshot.c | 340 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
src/backend/access/transam/twophase.c     |  28 ++++++++++++++
src/backend/access/transam/varsup.c       |   2 +
src/backend/access/transam/xact.c         |  29 +++++++++++++++
src/backend/access/transam/xlog.c         |  12 ++++++
src/backend/storage/ipc/ipci.c            |   6 +++
src/backend/storage/ipc/procarray.c       |  32 ++++++++++++++++
src/backend/storage/lmgr/lwlock.c         |   2 +
src/backend/storage/lmgr/lwlocknames.txt  |   1 +
src/backend/storage/lmgr/proc.c           |   3 ++
src/backend/utils/misc/guc.c              |  10 +++++
src/backend/utils/probes.d                |   2 +
src/backend/utils/time/snapmgr.c          |  49 ++++++++++++++++++++++++-
src/bin/initdb/initdb.c                   |   3 +-
src/include/access/csn_log.h              |  30 +++++++++++++++
src/include/access/csn_snapshot.h         |  58 +++++++++++++++++++++++++++++
src/include/datatype/timestamp.h          |   3 ++
src/include/fmgr.h                        |   1 +
src/include/portability/instr_time.h      |  10 +++++
src/include/storage/lwlock.h              |   1 +
src/include/storage/proc.h                |  12 ++++++
src/include/utils/snapshot.h              |   9 +++++
src/test/regress/expected/sysviews.out    |   3 +-
25 files changed, 1082 insertions(+), 4 deletions(-)
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 595e02de72..fc0321ee6b 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -15,6 +15,8 @@ include $(top_builddir)/src/Makefile.global
 OBJS = \
 	clog.o \
 	commit_ts.o \
+	csn_log.o \
+	csn_snapshot.o \
 	generic_xlog.o \
 	multixact.o \
 	parallel.o \
diff --git a/src/backend/access/transam/csn_log.c b/src/backend/access/transam/csn_log.c
new file mode 100644
index 0000000000..4e0b8d64e4
--- /dev/null
+++ b/src/backend/access/transam/csn_log.c
@@ -0,0 +1,438 @@
+/*-----------------------------------------------------------------------------
+ *
+ * csn_log.c
+ *		Track commit sequence numbers of finished transactions
+ *
+ * This module provides SLRU to store CSN for each transaction.  This
+ * mapping need to be kept only for xid's greater then oldestXid, but
+ * that can require arbitrary large amounts of memory in case of long-lived
+ * transactions.  Because of same lifetime and persistancy requirements
+ * this module is quite similar to subtrans.c
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/csn_log.c
+ *
+ *-----------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/csn_log.h"
+#include "access/slru.h"
+#include "access/subtrans.h"
+#include "access/transam.h"
+#include "miscadmin.h"
+#include "pg_trace.h"
+#include "utils/snapmgr.h"
+
+bool enable_csn_snapshot;
+
+/*
+ * Defines for CSNLog page sizes.  A page is the same BLCKSZ as is used
+ * everywhere else in Postgres.
+ *
+ * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
+ * CSNLog page numbering also wraps around at
+ * 0xFFFFFFFF/CSN_LOG_XACTS_PER_PAGE, and CSNLog segment numbering at
+ * 0xFFFFFFFF/CLOG_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
+ * explicit notice of that fact in this module, except when comparing segment
+ * and page numbers in TruncateCSNLog (see CSNLogPagePrecedes).
+ */
+
+/* We store the commit CSN for each xid */
+#define CSN_LOG_XACTS_PER_PAGE (BLCKSZ / sizeof(XidCSN))
+
+#define TransactionIdToPage(xid)	((xid) / (TransactionId) CSN_LOG_XACTS_PER_PAGE)
+#define TransactionIdToPgIndex(xid) ((xid) % (TransactionId) CSN_LOG_XACTS_PER_PAGE)
+
+/*
+ * Link to shared-memory data structures for CLOG control
+ */
+static SlruCtlData CSNLogCtlData;
+#define CsnlogCtl (&CSNLogCtlData)
+
+static int	ZeroCSNLogPage(int pageno);
+static bool CSNLogPagePrecedes(int page1, int page2);
+static void CSNLogSetPageStatus(TransactionId xid, int nsubxids,
+									  TransactionId *subxids,
+									  XidCSN csn, int pageno);
+static void CSNLogSetCSNInSlot(TransactionId xid, XidCSN csn,
+									  int slotno);
+
+/*
+ * CSNLogSetCSN
+ *
+ * Record XidCSN of transaction and its subtransaction tree.
+ *
+ * xid is a single xid to set status for. This will typically be the top level
+ * transactionid for a top level commit or abort. It can also be a
+ * subtransaction when we record transaction aborts.
+ *
+ * subxids is an array of xids of length nsubxids, representing subtransactions
+ * in the tree of xid. In various cases nsubxids may be zero.
+ *
+ * csn is the commit sequence number of the transaction. It should be
+ * AbortedCSN for abort cases.
+ */
+void
+CSNLogSetCSN(TransactionId xid, int nsubxids,
+					 TransactionId *subxids, XidCSN csn)
+{
+	int			pageno;
+	int			i = 0;
+	int			offset = 0;
+
+	/* Callers of CSNLogSetCSN() must check GUC params */
+	Assert(enable_csn_snapshot);
+
+	Assert(TransactionIdIsValid(xid));
+
+	pageno = TransactionIdToPage(xid);		/* get page of parent */
+	for (;;)
+	{
+		int			num_on_page = 0;
+
+		while (i < nsubxids && TransactionIdToPage(subxids[i]) == pageno)
+		{
+			num_on_page++;
+			i++;
+		}
+
+		CSNLogSetPageStatus(xid,
+							num_on_page, subxids + offset,
+							csn, pageno);
+		if (i >= nsubxids)
+			break;
+
+		offset = i;
+		pageno = TransactionIdToPage(subxids[offset]);
+		xid = InvalidTransactionId;
+	}
+}
+
+/*
+ * Record the final state of transaction entries in the csn log for
+ * all entries on a single page.  Atomic only on this page.
+ *
+ * Otherwise API is same as TransactionIdSetTreeStatus()
+ */
+static void
+CSNLogSetPageStatus(TransactionId xid, int nsubxids,
+						   TransactionId *subxids,
+						   XidCSN csn, int pageno)
+{
+	int			slotno;
+	int			i;
+
+	LWLockAcquire(CSNLogControlLock, LW_EXCLUSIVE);
+
+	slotno = SimpleLruReadPage(CsnlogCtl, pageno, true, xid);
+
+	/* Subtransactions first, if needed ... */
+	for (i = 0; i < nsubxids; i++)
+	{
+		Assert(CsnlogCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i]));
+		CSNLogSetCSNInSlot(subxids[i],	csn, slotno);
+	}
+
+	/* ... then the main transaction */
+	if (TransactionIdIsValid(xid))
+		CSNLogSetCSNInSlot(xid, csn, slotno);
+
+	CsnlogCtl->shared->page_dirty[slotno] = true;
+
+	LWLockRelease(CSNLogControlLock);
+}
+
+/*
+ * Sets the commit status of a single transaction.
+ */
+static void
+CSNLogSetCSNInSlot(TransactionId xid, XidCSN csn, int slotno)
+{
+	int			entryno = TransactionIdToPgIndex(xid);
+	XidCSN *ptr;
+
+	Assert(LWLockHeldByMe(CSNLogControlLock));
+
+	ptr = (XidCSN *) (CsnlogCtl->shared->page_buffer[slotno] + entryno * sizeof(XLogRecPtr));
+
+	*ptr = csn;
+}
+
+/*
+ * Interrogate the state of a transaction in the log.
+ *
+ * NB: this is a low-level routine and is NOT the preferred entry point
+ * for most uses; TransactionIdGetXidCSN() in csn_snapshot.c is the
+ * intended caller.
+ */
+XidCSN
+CSNLogGetCSNByXid(TransactionId xid)
+{
+	int			pageno = TransactionIdToPage(xid);
+	int			entryno = TransactionIdToPgIndex(xid);
+	int			slotno;
+	XidCSN *ptr;
+	XidCSN	xid_csn;
+
+	/* Callers of CSNLogGetCSNByXid() must check GUC params */
+	Assert(enable_csn_snapshot);
+
+	/* Can't ask about stuff that might not be around anymore */
+	Assert(TransactionIdFollowsOrEquals(xid, TransactionXmin));
+
+	/* lock is acquired by SimpleLruReadPage_ReadOnly */
+
+	slotno = SimpleLruReadPage_ReadOnly(CsnlogCtl, pageno, xid);
+	ptr = (XidCSN *) (CsnlogCtl->shared->page_buffer[slotno] + entryno * sizeof(XLogRecPtr));
+	xid_csn = *ptr;
+
+	LWLockRelease(CSNLogControlLock);
+
+	return xid_csn;
+}
+
+/*
+ * Number of shared CSNLog buffers.
+ */
+static Size
+CSNLogShmemBuffers(void)
+{
+	return Min(32, Max(4, NBuffers / 512));
+}
+
+/*
+ * Reserve shared memory for CsnlogCtl.
+ */
+Size
+CSNLogShmemSize(void)
+{
+	if (!enable_csn_snapshot)
+		return 0;
+
+	return SimpleLruShmemSize(CSNLogShmemBuffers(), 0);
+}
+
+/*
+ * Initialization of shared memory for CSNLog.
+ */
+void
+CSNLogShmemInit(void)
+{
+	if (!enable_csn_snapshot)
+		return;
+
+	CsnlogCtl->PagePrecedes = CSNLogPagePrecedes;
+	SimpleLruInit(CsnlogCtl, "CSNLog Ctl", CSNLogShmemBuffers(), 0,
+				  CSNLogControlLock, "pg_csn", LWTRANCHE_CSN_LOG_BUFFERS);
+}
+
+/*
+ * This func must be called ONCE on system install.  It creates the initial
+ * CSNLog segment.  The pg_csn directory is assumed to have been
+ * created by initdb, and CSNLogShmemInit must have been called already.
+ */
+void
+BootStrapCSNLog(void)
+{
+	int			slotno;
+
+	if (!enable_csn_snapshot)
+		return;
+
+	LWLockAcquire(CSNLogControlLock, LW_EXCLUSIVE);
+
+	/* Create and zero the first page of the commit log */
+	slotno = ZeroCSNLogPage(0);
+
+	/* Make sure it's written out */
+	SimpleLruWritePage(CsnlogCtl, slotno);
+	Assert(!CsnlogCtl->shared->page_dirty[slotno]);
+
+	LWLockRelease(CSNLogControlLock);
+}
+
+/*
+ * Initialize (or reinitialize) a page of CSNLog to zeroes.
+ *
+ * The page is not actually written, just set up in shared memory.
+ * The slot number of the new page is returned.
+ *
+ * Control lock must be held at entry, and will be held at exit.
+ */
+static int
+ZeroCSNLogPage(int pageno)
+{
+	Assert(LWLockHeldByMe(CSNLogControlLock));
+	return SimpleLruZeroPage(CsnlogCtl, pageno);
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend startup,
+ * after StartupXLOG has initialized ShmemVariableCache->nextXid.
+ *
+ * oldestActiveXID is the oldest XID of any prepared transaction, or nextXid
+ * if there are none.
+ */
+void
+StartupCSNLog(TransactionId oldestActiveXID)
+{
+	int			startPage;
+	int			endPage;
+
+	if (!enable_csn_snapshot)
+		return;
+
+	/*
+	 * Since we don't expect pg_csn to be valid across crashes, we
+	 * initialize the currently-active page(s) to zeroes during startup.
+	 * Whenever we advance into a new page, ExtendCSNLog will likewise
+	 * zero the new page without regard to whatever was previously on disk.
+	 */
+	LWLockAcquire(CSNLogControlLock, LW_EXCLUSIVE);
+
+	startPage = TransactionIdToPage(oldestActiveXID);
+	endPage = TransactionIdToPage(XidFromFullTransactionId(ShmemVariableCache->nextFullXid));
+
+	while (startPage != endPage)
+	{
+		(void) ZeroCSNLogPage(startPage);
+		startPage++;
+		/* must account for wraparound */
+		if (startPage > TransactionIdToPage(MaxTransactionId))
+			startPage = 0;
+	}
+	(void) ZeroCSNLogPage(startPage);
+
+	LWLockRelease(CSNLogControlLock);
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend shutdown
+ */
+void
+ShutdownCSNLog(void)
+{
+	if (!enable_csn_snapshot)
+		return;
+
+	/*
+	 * Flush dirty CSNLog pages to disk.
+	 *
+	 * This is not actually necessary from a correctness point of view. We do
+	 * it merely as a debugging aid.
+	 */
+	TRACE_POSTGRESQL_CSNLOG_CHECKPOINT_START(false);
+	SimpleLruFlush(CsnlogCtl, false);
+	TRACE_POSTGRESQL_CSNLOG_CHECKPOINT_DONE(false);
+}
+
+/*
+ * Perform a checkpoint --- either during shutdown, or on-the-fly
+ */
+void
+CheckPointCSNLog(void)
+{
+	if (!enable_csn_snapshot)
+		return;
+
+	/*
+	 * Flush dirty CSNLog pages to disk.
+	 *
+	 * This is not actually necessary from a correctness point of view. We do
+	 * it merely to improve the odds that writing of dirty pages is done by
+	 * the checkpoint process and not by backends.
+	 */
+	TRACE_POSTGRESQL_CSNLOG_CHECKPOINT_START(true);
+	SimpleLruFlush(CsnlogCtl, true);
+	TRACE_POSTGRESQL_CSNLOG_CHECKPOINT_DONE(true);
+}
+
+/*
+ * Make sure that CSNLog has room for a newly-allocated XID.
+ *
+ * NB: this is called while holding XidGenLock.  We want it to be very fast
+ * most of the time; even when it's not so fast, no actual I/O need happen
+ * unless we're forced to write out a dirty clog or xlog page to make room
+ * in shared memory.
+ */
+void
+ExtendCSNLog(TransactionId newestXact)
+{
+	int			pageno;
+
+	if (!enable_csn_snapshot)
+		return;
+
+	/*
+	 * No work except at first XID of a page.  But beware: just after
+	 * wraparound, the first XID of page zero is FirstNormalTransactionId.
+	 */
+	if (TransactionIdToPgIndex(newestXact) != 0 &&
+		!TransactionIdEquals(newestXact, FirstNormalTransactionId))
+		return;
+
+	pageno = TransactionIdToPage(newestXact);
+
+	LWLockAcquire(CSNLogControlLock, LW_EXCLUSIVE);
+
+	/* Zero the page and make an XLOG entry about it */
+	ZeroCSNLogPage(pageno);
+
+	LWLockRelease(CSNLogControlLock);
+}
+
+/*
+ * Remove all CSNLog segments before the one holding the passed
+ * transaction ID.
+ *
+ * This is normally called during checkpoint, with oldestXact being the
+ * oldest TransactionXmin of any running transaction.
+ */
+void
+TruncateCSNLog(TransactionId oldestXact)
+{
+	int			cutoffPage;
+
+	if (!enable_csn_snapshot)
+		return;
+
+	/*
+	 * The cutoff point is the start of the segment containing oldestXact. We
+	 * pass the *page* containing oldestXact to SimpleLruTruncate. We step
+	 * back one transaction to avoid passing a cutoff page that hasn't been
+	 * created yet in the rare case that oldestXact would be the first item on
+	 * a page and oldestXact == next XID.  In that case, if we didn't subtract
+	 * one, we'd trigger SimpleLruTruncate's wraparound detection.
+	 */
+	TransactionIdRetreat(oldestXact);
+	cutoffPage = TransactionIdToPage(oldestXact);
+
+	SimpleLruTruncate(CsnlogCtl, cutoffPage);
+}
+
+/*
+ * Decide which of two CSNLog page numbers is "older" for truncation
+ * purposes.
+ *
+ * We need to use comparison of TransactionIds here in order to do the right
+ * thing with wraparound XID arithmetic.  However, if we are asked about
+ * page number zero, we don't want to hand InvalidTransactionId to
+ * TransactionIdPrecedes: it'll get weird about permanent xact IDs.  So,
+ * offset both xids by FirstNormalTransactionId to avoid that.
+ */
+static bool
+CSNLogPagePrecedes(int page1, int page2)
+{
+	TransactionId xid1;
+	TransactionId xid2;
+
+	xid1 = ((TransactionId) page1) * CSN_LOG_XACTS_PER_PAGE;
+	xid1 += FirstNormalTransactionId;
+	xid2 = ((TransactionId) page2) * CSN_LOG_XACTS_PER_PAGE;
+	xid2 += FirstNormalTransactionId;
+
+	return TransactionIdPrecedes(xid1, xid2);
+}
diff --git a/src/backend/access/transam/csn_snapshot.c b/src/backend/access/transam/csn_snapshot.c
new file mode 100644
index 0000000000..bcc5bac757
--- /dev/null
+++ b/src/backend/access/transam/csn_snapshot.c
@@ -0,0 +1,340 @@
+/*-------------------------------------------------------------------------
+ *
+ * csn_snapshot.c
+ *		Support for cross-node snapshot isolation.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/csn_snapshot.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/csn_log.h"
+#include "access/csn_snapshot.h"
+#include "access/transam.h"
+#include "access/twophase.h"
+#include "access/xact.h"
+#include "portability/instr_time.h"
+#include "storage/lmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/shmem.h"
+#include "storage/spin.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/snapmgr.h"
+#include "miscadmin.h"
+
+/* Raise a warning if imported snapshot_csn exceeds ours by this value. */
+#define SNAP_DESYNC_COMPLAIN (1*NSECS_PER_SEC) /* 1 second */
+
+/*
+ * CSNSnapshotState
+ *
+ * Do not trust local clocks to be strictly monotonical and save last acquired
+ * value so later we can compare next timestamp with it. Accessed through
+ * GenerateCSN().
+ */
+typedef struct
+{
+	SnapshotCSN		 last_max_csn;
+	volatile slock_t lock;
+} CSNSnapshotState;
+
+static CSNSnapshotState *csnState;
+
+/*
+ * Enables this module.
+ */
+extern bool enable_csn_snapshot;
+
+
+/* Estimate shared memory space needed */
+Size
+CSNSnapshotShmemSize(void)
+{
+	Size	size = 0;
+
+	if (enable_csn_snapshot)
+	{
+		size += MAXALIGN(sizeof(CSNSnapshotState));
+	}
+
+	return size;
+}
+
+/* Init shared memory structures */
+void
+CSNSnapshotShmemInit()
+{
+	bool found;
+
+	if (enable_csn_snapshot)
+	{
+		csnState = ShmemInitStruct("csnState",
+								sizeof(CSNSnapshotState),
+								&found);
+		if (!found)
+		{
+			csnState->last_max_csn = 0;
+			SpinLockInit(&csnState->lock);
+		}
+	}
+}
+
+/*
+ * GenerateCSN
+ *
+ * Generate SnapshotCSN which is actually a local time. Also we are forcing
+ * this time to be always increasing. Since now it is not uncommon to have
+ * millions of read transactions per second we are trying to use nanoseconds
+ * if such time resolution is available.
+ */
+SnapshotCSN
+GenerateCSN(bool locked)
+{
+	instr_time	current_time;
+	SnapshotCSN	csn;
+
+	Assert(enable_csn_snapshot);
+
+	/*
+	 * TODO: create some macro that add small random shift to current time.
+	 */
+	INSTR_TIME_SET_CURRENT(current_time);
+	csn = (SnapshotCSN) INSTR_TIME_GET_NANOSEC(current_time);
+
+	/* TODO: change to atomics? */
+	if (!locked)
+		SpinLockAcquire(&csnState->lock);
+
+	if (csn <= csnState->last_max_csn)
+		csn = ++csnState->last_max_csn;
+	else
+		csnState->last_max_csn = csn;
+
+	if (!locked)
+		SpinLockRelease(&csnState->lock);
+
+	return csn;
+}
+
+/*
+ * TransactionIdGetXidCSN
+ *
+ * Get XidCSN for specified TransactionId taking care about special xids,
+ * xids beyond TransactionXmin and InDoubt states.
+ */
+XidCSN
+TransactionIdGetXidCSN(TransactionId xid)
+{
+	XidCSN xid_csn;
+
+	Assert(enable_csn_snapshot);
+
+	/* Handle permanent TransactionId's for which we don't have mapping */
+	if (!TransactionIdIsNormal(xid))
+	{
+		if (xid == InvalidTransactionId)
+			return AbortedXidCSN;
+		if (xid == FrozenTransactionId || xid == BootstrapTransactionId)
+			return FrozenXidCSN;
+		Assert(false); /* Should not happend */
+	}
+
+	/*
+	 * For xids which less then TransactionXmin CSNLog can be already
+	 * trimmed but we know that such transaction is definetly not concurrently
+	 * running according to any snapshot including timetravel ones. Callers
+	 * should check TransactionDidCommit after.
+	 */
+	if (TransactionIdPrecedes(xid, TransactionXmin))
+		return FrozenXidCSN;
+
+	/* Read XidCSN from SLRU */
+	xid_csn = CSNLogGetCSNByXid(xid);
+
+	/*
+	 * If we faced InDoubt state then transaction is beeing committed and we
+	 * should wait until XidCSN will be assigned so that visibility check
+	 * could decide whether tuple is in snapshot. See also comments in
+	 * CSNSnapshotPrecommit().
+	 */
+	if (XidCSNIsInDoubt(xid_csn))
+	{
+		XactLockTableWait(xid, NULL, NULL, XLTW_None);
+		xid_csn = CSNLogGetCSNByXid(xid);
+		Assert(XidCSNIsNormal(xid_csn) ||
+				XidCSNIsAborted(xid_csn));
+	}
+
+	Assert(XidCSNIsNormal(xid_csn) ||
+			XidCSNIsInProgress(xid_csn) ||
+			XidCSNIsAborted(xid_csn));
+
+	return xid_csn;
+}
+
+/*
+ * XidInvisibleInCSNSnapshot
+ *
+ * Version of XidInMVCCSnapshot for transactions. For non-imported
+ * csn snapshots this should give same results as XidInLocalMVCCSnapshot
+ * (except that aborts will be shown as invisible without going to clog) and to
+ * ensure such behaviour XidInMVCCSnapshot is coated with asserts that checks
+ * identicalness of XidInvisibleInCSNSnapshot/XidInLocalMVCCSnapshot in
+ * case of ordinary snapshot.
+ */
+bool
+XidInvisibleInCSNSnapshot(TransactionId xid, Snapshot snapshot)
+{
+	XidCSN csn;
+
+	Assert(enable_csn_snapshot);
+
+	csn = TransactionIdGetXidCSN(xid);
+
+	if (XidCSNIsNormal(csn))
+	{
+		if (csn < snapshot->snapshot_csn)
+			return false;
+		else
+			return true;
+	}
+	else if (XidCSNIsFrozen(csn))
+	{
+		/* It is bootstrap or frozen transaction */
+		return false;
+	}
+	else
+	{
+		/* It is aborted or in-progress */
+		Assert(XidCSNIsAborted(csn) || XidCSNIsInProgress(csn));
+		if (XidCSNIsAborted(csn))
+			Assert(TransactionIdDidAbort(xid));
+		return true;
+	}
+}
+
+
+/*****************************************************************************
+ * Functions to handle transactions commit.
+ *
+ * For local transactions CSNSnapshotPrecommit sets InDoubt state before
+ * ProcArrayEndTransaction is called and transaction data potetntially becomes
+ * visible to other backends. ProcArrayEndTransaction (or ProcArrayRemove in
+ * twophase case) then acquires xid_csn under ProcArray lock and stores it
+ * in proc->assignedXidCsn. It's important that xid_csn for commit is
+ * generated under ProcArray lock, otherwise snapshots won't
+ * be equivalent. Consequent call to CSNSnapshotCommit will write
+ * proc->assignedXidCsn to CSNLog.
+ *
+ *
+ * CSNSnapshotAbort is slightly different comparing to commit because abort
+ * can skip InDoubt phase and can be called for transaction subtree.
+ *****************************************************************************/
+
+
+/*
+ * CSNSnapshotAbort
+ *
+ * Abort transaction in CsnLog. We can skip InDoubt state for aborts
+ * since no concurrent transactions allowed to see aborted data anyway.
+ */
+void
+CSNSnapshotAbort(PGPROC *proc, TransactionId xid,
+					int nsubxids, TransactionId *subxids)
+{
+	if (!enable_csn_snapshot)
+		return;
+
+	CSNLogSetCSN(xid, nsubxids, subxids, AbortedXidCSN);
+
+	/*
+	 * Clean assignedXidCsn anyway, as it was possibly set in
+	 * XidSnapshotAssignCsnCurrent.
+	 */
+	pg_atomic_write_u64(&proc->assignedXidCsn, InProgressXidCSN);
+}
+
+/*
+ * CSNSnapshotPrecommit
+ *
+ * Set InDoubt status for local transaction that we are going to commit.
+ * This step is needed to achieve consistency between local snapshots and
+ * csn-based snapshots. We don't hold ProcArray lock while writing
+ * csn for transaction in SLRU but instead we set InDoubt status before
+ * transaction is deleted from ProcArray so the readers who will read csn
+ * in the gap between ProcArray removal and XidCSN assignment can wait
+ * until XidCSN is finally assigned. See also TransactionIdGetXidCSN().
+ *
+ * This should be called only from parallel group leader before backend is
+ * deleted from ProcArray.
+ */
+void
+CSNSnapshotPrecommit(PGPROC *proc, TransactionId xid,
+					int nsubxids, TransactionId *subxids)
+{
+	XidCSN oldassignedXidCsn = InProgressXidCSN;
+	bool in_progress;
+
+	if (!enable_csn_snapshot)
+		return;
+
+	/* Set InDoubt status if it is local transaction */
+	in_progress = pg_atomic_compare_exchange_u64(&proc->assignedXidCsn,
+												 &oldassignedXidCsn,
+												 InDoubtXidCSN);
+	if (in_progress)
+	{
+		Assert(XidCSNIsInProgress(oldassignedXidCsn));
+		CSNLogSetCSN(xid, nsubxids,
+						   subxids, InDoubtXidCSN);
+	}
+	else
+	{
+		/* Otherwise we should have valid XidCSN by this time */
+		Assert(XidCSNIsNormal(oldassignedXidCsn));
+		Assert(XidCSNIsInDoubt(CSNLogGetCSNByXid(xid)));
+	}
+}
+
+/*
+ * CSNSnapshotCommit
+ *
+ * Write XidCSN that were acquired earlier to CsnLog. Should be
+ * preceded by CSNSnapshotPrecommit() so readers can wait until we finally
+ * finished writing to SLRU.
+ *
+ * Should be called after ProcArrayEndTransaction, but before releasing
+ * transaction locks, so that TransactionIdGetXidCSN can wait on this
+ * lock for XidCSN.
+ */
+void
+CSNSnapshotCommit(PGPROC *proc, TransactionId xid,
+					int nsubxids, TransactionId *subxids)
+{
+	volatile XidCSN assigned_xid_csn;
+
+	if (!enable_csn_snapshot)
+		return;
+
+	if (!TransactionIdIsValid(xid))
+	{
+		assigned_xid_csn = pg_atomic_read_u64(&proc->assignedXidCsn);
+		Assert(XidCSNIsInProgress(assigned_xid_csn));
+		return;
+	}
+
+	/* Finally write resulting XidCSN in SLRU */
+	assigned_xid_csn = pg_atomic_read_u64(&proc->assignedXidCsn);
+	Assert(XidCSNIsNormal(assigned_xid_csn));
+	CSNLogSetCSN(xid, nsubxids,
+						   subxids, assigned_xid_csn);
+
+	/* Reset for next transaction */
+	pg_atomic_write_u64(&proc->assignedXidCsn, InProgressXidCSN);
+}
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index e1904877fa..af5f388c12 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -77,6 +77,8 @@
 #include <unistd.h>
 
 #include "access/commit_ts.h"
+#include "access/csn_snapshot.h"
+#include "access/csn_log.h"
 #include "access/htup_details.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
@@ -1479,8 +1481,34 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
 									   hdr->nabortrels, abortrels,
 									   gid);
 
+	/*
+	 * CSNSnapshot callbacks that should be called right before we are
+	 * going to become visible. Details in comments to this functions.
+	 */
+	if (isCommit)
+		CSNSnapshotPrecommit(proc, xid, hdr->nsubxacts, children);
+	else
+		CSNSnapshotAbort(proc, xid, hdr->nsubxacts, children);
+
+
 	ProcArrayRemove(proc, latestXid);
 
+	/*
+	 * Stamp our transaction with XidCSN in CSNLog.
+	 * Should be called after ProcArrayEndTransaction, but before releasing
+	 * transaction locks, since TransactionIdGetXidCSN relies on
+	 * XactLockTableWait to await xid_csn.
+	 */
+	if (isCommit)
+	{
+		CSNSnapshotCommit(proc, xid, hdr->nsubxacts, children);
+	}
+	else
+	{
+		Assert(XidCSNIsInProgress(
+				   pg_atomic_read_u64(&proc->assignedXidCsn)));
+	}
+
 	/*
 	 * In case we fail while running the callbacks, mark the gxact invalid so
 	 * no one else will try to commit/rollback, and so it will be recycled if
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index e14b53bf9e..b045ed09f3 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -15,6 +15,7 @@
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/csn_log.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/xact.h"
@@ -173,6 +174,7 @@ GetNewTransactionId(bool isSubXact)
 	 * Extend pg_subtrans and pg_commit_ts too.
 	 */
 	ExtendCLOG(xid);
+	ExtendCSNLog(xid);
 	ExtendCommitTs(xid);
 	ExtendSUBTRANS(xid);
 
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index cd30b62d36..8dcf951954 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -21,6 +21,7 @@
 #include <unistd.h>
 
 #include "access/commit_ts.h"
+#include "access/csn_snapshot.h"
 #include "access/multixact.h"
 #include "access/parallel.h"
 #include "access/subtrans.h"
@@ -1433,6 +1434,14 @@ RecordTransactionCommit(void)
 
 	/* Reset XactLastRecEnd until the next transaction writes something */
 	XactLastRecEnd = 0;
+
+	/*
+	 * Mark our transaction as InDoubt in CsnLog and get ready for
+	 * commit.
+	 */
+	if (markXidCommitted)
+		CSNSnapshotPrecommit(MyProc, xid, nchildren, children);
+
 cleanup:
 	/* Clean up local data */
 	if (rels)
@@ -1694,6 +1703,11 @@ RecordTransactionAbort(bool isSubXact)
 	 */
 	TransactionIdAbortTree(xid, nchildren, children);
 
+	/*
+	 * Mark our transaction as Aborted in CsnLog.
+	 */
+	CSNSnapshotAbort(MyProc, xid, nchildren, children);
+
 	END_CRIT_SECTION();
 
 	/* Compute latestXid while we have the child XIDs handy */
@@ -2183,6 +2197,21 @@ CommitTransaction(void)
 	 */
 	ProcArrayEndTransaction(MyProc, latestXid);
 
+	/*
+	 * Stamp our transaction with XidCSN in CsnLog.
+	 * Should be called after ProcArrayEndTransaction, but before releasing
+	 * transaction locks.
+	 */
+	if (!is_parallel_worker)
+	{
+		TransactionId  xid = GetTopTransactionIdIfAny();
+		TransactionId *subxids;
+		int			   nsubxids;
+
+		nsubxids = xactGetCommittedChildren(&subxids);
+		CSNSnapshotCommit(MyProc, xid, nsubxids, subxids);
+	}
+
 	/*
 	 * This is all post-commit cleanup.  Note that if an error is raised here,
 	 * it's too late to abort the transaction.  This should be just
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 55cac186dc..5b41aa58c3 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -24,6 +24,7 @@
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/csn_log.h"
 #include "access/heaptoast.h"
 #include "access/multixact.h"
 #include "access/rewriteheap.h"
@@ -5342,6 +5343,7 @@ BootStrapXLOG(void)
 
 	/* Bootstrap the commit log, too */
 	BootStrapCLOG();
+	BootStrapCSNLog();
 	BootStrapCommitTs();
 	BootStrapSUBTRANS();
 	BootStrapMultiXact();
@@ -7059,6 +7061,7 @@ StartupXLOG(void)
 			 * maintained during recovery and need not be started yet.
 			 */
 			StartupCLOG();
+			StartupCSNLog(oldestActiveXID);
 			StartupSUBTRANS(oldestActiveXID);
 
 			/*
@@ -7876,6 +7879,7 @@ StartupXLOG(void)
 	if (standbyState == STANDBY_DISABLED)
 	{
 		StartupCLOG();
+		StartupCSNLog(oldestActiveXID);
 		StartupSUBTRANS(oldestActiveXID);
 	}
 
@@ -8523,6 +8527,7 @@ ShutdownXLOG(int code, Datum arg)
 		CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
 	}
 	ShutdownCLOG();
+	ShutdownCSNLog();
 	ShutdownCommitTs();
 	ShutdownSUBTRANS();
 	ShutdownMultiXact();
@@ -9095,7 +9100,10 @@ CreateCheckPoint(int flags)
 	 * StartupSUBTRANS hasn't been called yet.
 	 */
 	if (!RecoveryInProgress())
+	{
 		TruncateSUBTRANS(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+		TruncateCSNLog(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+	}
 
 	/* Real work is done, but log and update stats before releasing lock. */
 	LogCheckpointEnd(false);
@@ -9171,6 +9179,7 @@ static void
 CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 {
 	CheckPointCLOG();
+	CheckPointCSNLog();
 	CheckPointCommitTs();
 	CheckPointSUBTRANS();
 	CheckPointMultiXact();
@@ -9455,7 +9464,10 @@ CreateRestartPoint(int flags)
 	 * this because StartupSUBTRANS hasn't been called yet.
 	 */
 	if (EnableHotStandby)
+	{
 		TruncateSUBTRANS(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+		TruncateCSNLog(GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT));
+	}
 
 	/* Real work is done, but log and update before releasing lock. */
 	LogCheckpointEnd(true);
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 427b0d59cd..7122babfd6 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -16,11 +16,13 @@
 
 #include "access/clog.h"
 #include "access/commit_ts.h"
+#include "access/csn_log.h"
 #include "access/heapam.h"
 #include "access/multixact.h"
 #include "access/nbtree.h"
 #include "access/subtrans.h"
 #include "access/twophase.h"
+#include "access/csn_snapshot.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -125,6 +127,7 @@ CreateSharedMemoryAndSemaphores(void)
 		size = add_size(size, ProcGlobalShmemSize());
 		size = add_size(size, XLOGShmemSize());
 		size = add_size(size, CLOGShmemSize());
+		size = add_size(size, CSNLogShmemSize());
 		size = add_size(size, CommitTsShmemSize());
 		size = add_size(size, SUBTRANSShmemSize());
 		size = add_size(size, TwoPhaseShmemSize());
@@ -143,6 +146,7 @@ CreateSharedMemoryAndSemaphores(void)
 		size = add_size(size, WalSndShmemSize());
 		size = add_size(size, WalRcvShmemSize());
 		size = add_size(size, ApplyLauncherShmemSize());
+		size = add_size(size, CSNSnapshotShmemSize());
 		size = add_size(size, SnapMgrShmemSize());
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
@@ -213,6 +217,7 @@ CreateSharedMemoryAndSemaphores(void)
 	 */
 	XLOGShmemInit();
 	CLOGShmemInit();
+	CSNLogShmemInit();
 	CommitTsShmemInit();
 	SUBTRANSShmemInit();
 	MultiXactShmemInit();
@@ -264,6 +269,7 @@ CreateSharedMemoryAndSemaphores(void)
 	SyncScanShmemInit();
 	AsyncShmemInit();
 
+	CSNSnapshotShmemInit();
 #ifdef EXEC_BACKEND
 
 	/*
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 3c2b369615..5f491cf6e9 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -46,6 +46,8 @@
 #include <signal.h>
 
 #include "access/clog.h"
+#include "access/csn_log.h"
+#include "access/csn_snapshot.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/twophase.h"
@@ -352,6 +354,14 @@ ProcArrayRemove(PGPROC *proc, TransactionId latestXid)
 		if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
 								  latestXid))
 			ShmemVariableCache->latestCompletedXid = latestXid;
+
+		/*
+		 * Assign xid csn while holding ProcArrayLock for
+		 * COMMIT PREPARED. After lock is released consequent
+		 * CSNSnapshotCommit() will write this value to CsnLog.
+		 */
+		if (XidCSNIsInDoubt(pg_atomic_read_u64(&proc->assignedXidCsn)))
+			pg_atomic_write_u64(&proc->assignedXidCsn, GenerateCSN(false));
 	}
 	else
 	{
@@ -467,6 +477,16 @@ ProcArrayEndTransactionInternal(PGPROC *proc, PGXACT *pgxact,
 	if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
 							  latestXid))
 		ShmemVariableCache->latestCompletedXid = latestXid;
+
+	/*
+	 * Assign xid csn while holding ProcArrayLock for
+	 * COMMIT.
+	 *
+	 * TODO: in case of group commit we can generate one CSNSnapshot for
+	 * whole group to save time on timestamp aquisition.
+	 */
+	if (XidCSNIsInDoubt(pg_atomic_read_u64(&proc->assignedXidCsn)))
+		pg_atomic_write_u64(&proc->assignedXidCsn, GenerateCSN(false));
 }
 
 /*
@@ -833,6 +853,7 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
 	while (TransactionIdPrecedes(latestObservedXid, running->nextXid))
 	{
 		ExtendSUBTRANS(latestObservedXid);
+		ExtendCSNLog(latestObservedXid);
 		TransactionIdAdvance(latestObservedXid);
 	}
 	TransactionIdRetreat(latestObservedXid);	/* = running->nextXid - 1 */
@@ -1511,6 +1532,7 @@ GetSnapshotData(Snapshot snapshot)
 	int			count = 0;
 	int			subcount = 0;
 	bool		suboverflowed = false;
+	XidCSN	xid_csn = FrozenXidCSN;
 	TransactionId replication_slot_xmin = InvalidTransactionId;
 	TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
 
@@ -1708,6 +1730,13 @@ GetSnapshotData(Snapshot snapshot)
 	if (!TransactionIdIsValid(MyPgXact->xmin))
 		MyPgXact->xmin = TransactionXmin = xmin;
 
+	/*
+	 * Take XidCSN under ProcArrayLock so the snapshot stays
+	 * synchronized.
+	 */
+	if (enable_csn_snapshot)
+		xid_csn = GenerateCSN(false);
+
 	LWLockRelease(ProcArrayLock);
 
 	/*
@@ -1778,6 +1807,8 @@ GetSnapshotData(Snapshot snapshot)
 		MaintainOldSnapshotTimeMapping(snapshot->whenTaken, xmin);
 	}
 
+	snapshot->snapshot_csn = xid_csn;
+
 	return snapshot;
 }
 
@@ -3335,6 +3366,7 @@ RecordKnownAssignedTransactionIds(TransactionId xid)
 		while (TransactionIdPrecedes(next_expected_xid, xid))
 		{
 			TransactionIdAdvance(next_expected_xid);
+			ExtendCSNLog(next_expected_xid);
 			ExtendSUBTRANS(next_expected_xid);
 		}
 		Assert(next_expected_xid == xid);
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 2fa90cc095..77b8426e71 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -134,6 +134,8 @@ static const char *const BuiltinTrancheNames[] = {
 	"CommitTSBuffer",
 	/* LWTRANCHE_SUBTRANS_BUFFER: */
 	"SubtransBuffer",
+	/* LWTRANCHE_CSN_LOG_BUFFERS */
+	"CsnLogBuffer",
 	/* LWTRANCHE_MULTIXACTOFFSET_BUFFER: */
 	"MultiXactOffsetBuffer",
 	/* LWTRANCHE_MULTIXACTMEMBER_BUFFER: */
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index e6985e8eed..3c95ce4aac 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -50,3 +50,4 @@ MultiXactTruncationLock				41
 OldSnapshotTimeMapLock				42
 LogicalRepWorkerLock				43
 XactTruncationLock					44
+CSNLogControlLock				    45
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index f5eef6fa4e..da2868dd6f 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -37,6 +37,7 @@
 
 #include "access/transam.h"
 #include "access/twophase.h"
+#include "access/csn_snapshot.h"
 #include "access/xact.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -441,6 +442,8 @@ InitProcess(void)
 	MyProc->clogGroupMemberLsn = InvalidXLogRecPtr;
 	Assert(pg_atomic_read_u32(&MyProc->clogGroupNext) == INVALID_PGPROCNO);
 
+	pg_atomic_init_u64(&MyProc->assignedXidCsn, InProgressXidCSN);
+
 	/*
 	 * Acquire ownership of the PGPROC's latch, so that we can use WaitLatch
 	 * on it.  That allows us to repoint the process latch, which so far
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 28b2fc72d6..6634804de6 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -28,6 +28,7 @@
 
 #include "access/commit_ts.h"
 #include "access/gin.h"
+#include "access/csn_snapshot.h"
 #include "access/rmgr.h"
 #include "access/tableam.h"
 #include "access/transam.h"
@@ -1163,6 +1164,15 @@ static struct config_bool ConfigureNamesBool[] =
 		false,
 		NULL, NULL, NULL
 	},
+	{
+		{"enable_csn_snapshot", PGC_POSTMASTER, RESOURCES_MEM,
+			gettext_noop("Enable csn-base snapshot."),
+			gettext_noop("Used to achieve REPEATEBLE READ isolation level for postgres_fdw transactions.")
+		},
+		&enable_csn_snapshot,
+		true, /* XXX: set true to simplify tesing. XXX2: Seems that RESOURCES_MEM isn't the best catagory */
+		NULL, NULL, NULL
+	},
 	{
 		{"ssl", PGC_SIGHUP, CONN_AUTH_SSL,
 			gettext_noop("Enables SSL connections."),
diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d
index a0b0458108..679c531622 100644
--- a/src/backend/utils/probes.d
+++ b/src/backend/utils/probes.d
@@ -77,6 +77,8 @@ provider postgresql {
 	probe clog__checkpoint__done(bool);
 	probe subtrans__checkpoint__start(bool);
 	probe subtrans__checkpoint__done(bool);
+	probe csnlog__checkpoint__start(bool);
+	probe csnlog__checkpoint__done(bool);
 	probe multixact__checkpoint__start(bool);
 	probe multixact__checkpoint__done(bool);
 	probe twophase__checkpoint__start();
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 1c063c592c..e2baeb9222 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -229,6 +229,7 @@ static TimestampTz AlignTimestampToMinuteBoundary(TimestampTz ts);
 static Snapshot CopySnapshot(Snapshot snapshot);
 static void FreeSnapshot(Snapshot snapshot);
 static void SnapshotResetXmin(void);
+static bool XidInLocalMVCCSnapshot(TransactionId xid, Snapshot snapshot);
 
 /*
  * Snapshot fields to be serialized.
@@ -247,6 +248,7 @@ typedef struct SerializedSnapshotData
 	CommandId	curcid;
 	TimestampTz whenTaken;
 	XLogRecPtr	lsn;
+	XidCSN	xid_csn;
 } SerializedSnapshotData;
 
 Size
@@ -2115,6 +2117,7 @@ SerializeSnapshot(Snapshot snapshot, char *start_address)
 	serialized_snapshot.curcid = snapshot->curcid;
 	serialized_snapshot.whenTaken = snapshot->whenTaken;
 	serialized_snapshot.lsn = snapshot->lsn;
+	serialized_snapshot.xid_csn = snapshot->snapshot_csn;
 
 	/*
 	 * Ignore the SubXID array if it has overflowed, unless the snapshot was
@@ -2189,6 +2192,7 @@ RestoreSnapshot(char *start_address)
 	snapshot->curcid = serialized_snapshot.curcid;
 	snapshot->whenTaken = serialized_snapshot.whenTaken;
 	snapshot->lsn = serialized_snapshot.lsn;
+	snapshot->snapshot_csn = serialized_snapshot.xid_csn;
 
 	/* Copy XIDs, if present. */
 	if (serialized_snapshot.xcnt > 0)
@@ -2229,6 +2233,47 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
 
 /*
  * XidInMVCCSnapshot
+ *
+ * Check whether this xid is in snapshot. When enable_csn_snapshot is
+ * switched off just call XidInLocalMVCCSnapshot().
+ */
+bool
+XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+{
+	bool in_snapshot;
+
+	in_snapshot = XidInLocalMVCCSnapshot(xid, snapshot);
+
+	if (!enable_csn_snapshot)
+	{
+		Assert(XidCSNIsFrozen(snapshot->snapshot_csn));
+		return in_snapshot;
+	}
+
+	if (in_snapshot)
+	{
+		/*
+		 * This xid may be already in unknown state and in that case
+		 * we must wait and recheck.
+		 */
+		return XidInvisibleInCSNSnapshot(xid, snapshot);
+	}
+	else
+	{
+#ifdef USE_ASSERT_CHECKING
+		/* Check that csn snapshot gives the same results as local one */
+		if (XidInvisibleInCSNSnapshot(xid, snapshot))
+		{
+			XidCSN gcsn = TransactionIdGetXidCSN(xid);
+			Assert(XidCSNIsAborted(gcsn));
+		}
+#endif
+		return false;
+	}
+}
+
+/*
+ * XidInLocalMVCCSnapshot
  *		Is the given XID still-in-progress according to the snapshot?
  *
  * Note: GetSnapshotData never stores either top xid or subxids of our own
@@ -2237,8 +2282,8 @@ RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc)
  * TransactionIdIsCurrentTransactionId first, except when it's known the
  * XID could not be ours anyway.
  */
-bool
-XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
+static bool
+XidInLocalMVCCSnapshot(TransactionId xid, Snapshot snapshot)
 {
 	uint32		i;
 
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index 786672b1b6..a52c01889d 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -220,7 +220,8 @@ static const char *const subdirs[] = {
 	"pg_xact",
 	"pg_logical",
 	"pg_logical/snapshots",
-	"pg_logical/mappings"
+	"pg_logical/mappings",
+	"pg_csn"
 };
 
 
diff --git a/src/include/access/csn_log.h b/src/include/access/csn_log.h
new file mode 100644
index 0000000000..9b9611127d
--- /dev/null
+++ b/src/include/access/csn_log.h
@@ -0,0 +1,30 @@
+/*
+ * csn_log.h
+ *
+ * Commit-Sequence-Number log.
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/csn_log.h
+ */
+#ifndef CSNLOG_H
+#define CSNLOG_H
+
+#include "access/xlog.h"
+#include "utils/snapshot.h"
+
+extern void CSNLogSetCSN(TransactionId xid, int nsubxids,
+							   TransactionId *subxids, XidCSN csn);
+extern XidCSN CSNLogGetCSNByXid(TransactionId xid);
+
+extern Size CSNLogShmemSize(void);
+extern void CSNLogShmemInit(void);
+extern void BootStrapCSNLog(void);
+extern void StartupCSNLog(TransactionId oldestActiveXID);
+extern void ShutdownCSNLog(void);
+extern void CheckPointCSNLog(void);
+extern void ExtendCSNLog(TransactionId newestXact);
+extern void TruncateCSNLog(TransactionId oldestXact);
+
+#endif   /* CSNLOG_H */
\ No newline at end of file
diff --git a/src/include/access/csn_snapshot.h b/src/include/access/csn_snapshot.h
new file mode 100644
index 0000000000..1894586204
--- /dev/null
+++ b/src/include/access/csn_snapshot.h
@@ -0,0 +1,58 @@
+/*-------------------------------------------------------------------------
+ *
+ * csn_snapshot.h
+ *	  Support for cross-node snapshot isolation.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/csn_snapshot.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CSN_SNAPSHOT_H
+#define CSN_SNAPSHOT_H
+
+#include "port/atomics.h"
+#include "storage/lock.h"
+#include "utils/snapshot.h"
+#include "utils/guc.h"
+
+/*
+ * snapshot.h is used in frontend code so atomic variant of SnapshotCSN type
+ * is defined here.
+ */
+typedef pg_atomic_uint64 CSN_atomic;
+
+#define InProgressXidCSN	 	UINT64CONST(0x0)
+#define AbortedXidCSN	 		UINT64CONST(0x1)
+#define FrozenXidCSN		 	UINT64CONST(0x2)
+#define InDoubtXidCSN	 		UINT64CONST(0x3)
+#define FirstNormalXidCSN 		UINT64CONST(0x4)
+
+#define XidCSNIsInProgress(csn)	((csn) == InProgressXidCSN)
+#define XidCSNIsAborted(csn)		((csn) == AbortedXidCSN)
+#define XidCSNIsFrozen(csn)		((csn) == FrozenXidCSN)
+#define XidCSNIsInDoubt(csn)		((csn) == InDoubtXidCSN)
+#define XidCSNIsNormal(csn)		((csn) >= FirstNormalXidCSN)
+
+
+
+
+extern Size CSNSnapshotShmemSize(void);
+extern void CSNSnapshotShmemInit(void);
+
+extern SnapshotCSN GenerateCSN(bool locked);
+
+extern bool XidInvisibleInCSNSnapshot(TransactionId xid, Snapshot snapshot);
+
+extern XidCSN TransactionIdGetXidCSN(TransactionId xid);
+
+extern void CSNSnapshotAbort(PGPROC *proc, TransactionId xid, int nsubxids,
+								TransactionId *subxids);
+extern void CSNSnapshotPrecommit(PGPROC *proc, TransactionId xid, int nsubxids,
+									TransactionId *subxids);
+extern void CSNSnapshotCommit(PGPROC *proc, TransactionId xid, int nsubxids,
+									TransactionId *subxids);
+
+#endif							/* CSN_SNAPSHOT_H */
diff --git a/src/include/datatype/timestamp.h b/src/include/datatype/timestamp.h
index 6be6d35d1e..583b1beea5 100644
--- a/src/include/datatype/timestamp.h
+++ b/src/include/datatype/timestamp.h
@@ -93,6 +93,9 @@ typedef struct
 #define USECS_PER_MINUTE INT64CONST(60000000)
 #define USECS_PER_SEC	INT64CONST(1000000)
 
+#define NSECS_PER_SEC	INT64CONST(1000000000)
+#define NSECS_PER_USEC	INT64CONST(1000)
+
 /*
  * We allow numeric timezone offsets up to 15:59:59 either way from Greenwich.
  * Currently, the record holders for wackiest offsets in actual use are zones
diff --git a/src/include/fmgr.h b/src/include/fmgr.h
index d349510b7c..5cdf2e17cb 100644
--- a/src/include/fmgr.h
+++ b/src/include/fmgr.h
@@ -280,6 +280,7 @@ extern struct varlena *pg_detoast_datum_packed(struct varlena *datum);
 #define PG_GETARG_FLOAT4(n)  DatumGetFloat4(PG_GETARG_DATUM(n))
 #define PG_GETARG_FLOAT8(n)  DatumGetFloat8(PG_GETARG_DATUM(n))
 #define PG_GETARG_INT64(n)	 DatumGetInt64(PG_GETARG_DATUM(n))
+#define PG_GETARG_UINT64(n)	 DatumGetUInt64(PG_GETARG_DATUM(n))
 /* use this if you want the raw, possibly-toasted input datum: */
 #define PG_GETARG_RAW_VARLENA_P(n)	((struct varlena *) PG_GETARG_POINTER(n))
 /* use this if you want the input datum de-toasted: */
diff --git a/src/include/portability/instr_time.h b/src/include/portability/instr_time.h
index d6459327cc..4ac23da654 100644
--- a/src/include/portability/instr_time.h
+++ b/src/include/portability/instr_time.h
@@ -141,6 +141,9 @@ typedef struct timespec instr_time;
 #define INSTR_TIME_GET_MICROSEC(t) \
 	(((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) ((t).tv_nsec / 1000))
 
+#define INSTR_TIME_GET_NANOSEC(t) \
+	(((uint64) (t).tv_sec * (uint64) 1000000000) + (uint64) ((t).tv_nsec))
+
 #else							/* !HAVE_CLOCK_GETTIME */
 
 /* Use gettimeofday() */
@@ -205,6 +208,10 @@ typedef struct timeval instr_time;
 #define INSTR_TIME_GET_MICROSEC(t) \
 	(((uint64) (t).tv_sec * (uint64) 1000000) + (uint64) (t).tv_usec)
 
+#define INSTR_TIME_GET_NANOSEC(t) \
+	(((uint64) (t).tv_sec * (uint64) 1000000000) + \
+		(uint64) (t).tv_usec * (uint64) 1000)
+
 #endif							/* HAVE_CLOCK_GETTIME */
 
 #else							/* WIN32 */
@@ -237,6 +244,9 @@ typedef LARGE_INTEGER instr_time;
 #define INSTR_TIME_GET_MICROSEC(t) \
 	((uint64) (((double) (t).QuadPart * 1000000.0) / GetTimerFrequency()))
 
+#define INSTR_TIME_GET_NANOSEC(t) \
+	((uint64) (((double) (t).QuadPart * 1000000000.0) / GetTimerFrequency()))
+
 static inline double
 GetTimerFrequency(void)
 {
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index c04ae97148..3b9d248913 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -197,6 +197,7 @@ typedef enum BuiltinTrancheIds
 	LWTRANCHE_XACT_BUFFER = NUM_INDIVIDUAL_LWLOCKS,
 	LWTRANCHE_COMMITTS_BUFFER,
 	LWTRANCHE_SUBTRANS_BUFFER,
+	LWTRANCHE_CSN_LOG_BUFFERS,
 	LWTRANCHE_MULTIXACTOFFSET_BUFFER,
 	LWTRANCHE_MULTIXACTMEMBER_BUFFER,
 	LWTRANCHE_NOTIFY_BUFFER,
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 1ee9000b2b..8c8df6049e 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -15,8 +15,10 @@
 #define _PROC_H_
 
 #include "access/clog.h"
+#include "access/csn_snapshot.h"
 #include "access/xlogdefs.h"
 #include "lib/ilist.h"
+#include "utils/snapshot.h"
 #include "storage/latch.h"
 #include "storage/lock.h"
 #include "storage/pg_sema.h"
@@ -203,6 +205,16 @@ struct PGPROC
 	PGPROC	   *lockGroupLeader;	/* lock group leader, if I'm a member */
 	dlist_head	lockGroupMembers;	/* list of members, if I'm a leader */
 	dlist_node	lockGroupLink;	/* my member link, if I'm a member */
+
+	/*
+	 * assignedXidCsn holds XidCSN for this transaction.  It is generated
+	 * under a ProcArray lock and later is writter to a CSNLog.  This
+	 * variable defined as atomic only for case of group commit, in all other
+	 * scenarios only backend responsible for this proc entry is working with
+	 * this variable.
+	 */
+	CSN_atomic assignedXidCsn;
+
 };
 
 /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. */
diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index 4796edb63a..9f622c76a7 100644
--- a/src/include/utils/snapshot.h
+++ b/src/include/utils/snapshot.h
@@ -121,6 +121,9 @@ typedef enum SnapshotType
 typedef struct SnapshotData *Snapshot;
 
 #define InvalidSnapshot		((Snapshot) NULL)
+typedef uint64 XidCSN;
+typedef uint64 SnapshotCSN;
+extern bool enable_csn_snapshot;
 
 /*
  * Struct representing all kind of possible snapshots.
@@ -201,6 +204,12 @@ typedef struct SnapshotData
 
 	TimestampTz whenTaken;		/* timestamp when snapshot was taken */
 	XLogRecPtr	lsn;			/* position in the WAL stream when taken */
+
+	/*
+	 * SnapshotCSN for snapshot isolation support.
+	 * Will be used only if enable_csn_snapshot is enabled.
+	 */
+	SnapshotCSN	snapshot_csn;
 } SnapshotData;
 
 #endif							/* SNAPSHOT_H */
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index a126f0ad61..86a5df0cba 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -73,6 +73,7 @@ select name, setting from pg_settings where name like 'enable%';
               name              | setting 
 --------------------------------+---------
  enable_bitmapscan              | on
+ enable_csn_snapshot            | on
  enable_gathermerge             | on
  enable_groupingsets_hash_disk  | off
  enable_hashagg                 | on
@@ -92,7 +93,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_seqscan                 | on
  enable_sort                    | on
  enable_tidscan                 | on
-(20 rows)
+(21 rows)
 
 -- Test that the pg_timezone_names and pg_timezone_abbrevs views are
 -- more-or-less working.  We can't test their contents in any great detail
