From 037aac9afdc988608afb8cac231a11ce7d4d830b Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 26 Jan 2024 11:31:41 +0900
Subject: [PATCH v7 3/3] Improve eviction algorithm in Reorderbuffer using
 max-heap for many subtransactions.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Previously, when selecting the transaction to evict during logical
decoding, we check all transactions to find the largest
transaction. Which could lead to a significant replication lag
especially in case where there are many subtransactions.

This commit improves the eviction algorithm in ReorderBuffer using the
max-heap with transaction size as the key to efficiently find the
largest transaction.

Overall algorithm:

There are two memory track states: REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP
and REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP.

REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP is the starting state, where we do
not update the max-heap when updating the memory counter. We build the
max-heap just before selecting large transactions. Therefore, in this
state, we can update the memory counter with no additional costs but
need O(n) time to get the largest transaction, where n is the number of
transactions including top-level transactions and subtransactions.

Once we build the max-heap, we switch to
REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP state, where we also update
the max-heap when updating the memory counter. The intention is to
efficiently retrieve the largest transaction in O(1) time instead of
incurring the cost of memory counter updates (O(log n)). To minimize
the overhead of maintaining the max-heap, we batch memory updates when
cleaning up all changes. We remain in this state as long as the number
of transactions is larger than the threshold,
REORDER_BUFFER_MEM_TRACK_THRESHOLD. Otherwise, we switch back to
REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP and reset the max-heap.

The performance benchmark results showed significant speed up (more
than x30 speed up on my machine) in decoding a transaction with 100k
subtransactions, whereas there is no visible overhead in other cases.

Reviewed-by: Amit Kapila, Hayato Kuroda, Vignesh C, Ajin Cherian,
Tomas Vondra, Shubham Khanna, Álvaro Herrera, Euler Taveira
Discussion: https://postgr.es/m/CAD21AoAfKTgrBrLq96GcTv9d6k97zaQcDM-rxfKEt4GSe0qnaQ%40mail.gmail.com
---
 .../replication/logical/reorderbuffer.c       | 188 +++++++++++++++---
 src/include/replication/reorderbuffer.h       |  21 ++
 src/tools/pgindent/typedefs.list              |   1 +
 3 files changed, 181 insertions(+), 29 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 91b9618d7e..3bc40fd7b6 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -67,6 +67,26 @@
  *	  allocator, evicting the oldest changes would make it more likely the
  *	  memory gets actually freed.
  *
+ *	  We use a max-heap with transaction size as the key to efficiently find
+ *	  the largest transaction. The max-heap state is managed in two states:
+ *	  REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP and REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP.
+ *
+ *	  REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP is the starting state, where we do
+ *	  not update the max-heap when updating the memory counter. We build the
+ *	  max-heap just before selecting large transactions. Therefore, in this
+ *	  state, we can update the memory counter with no additional costs but
+ *	  need O(n) time to get the largest transaction, where n is the number of
+ *	  transactions including top-level transactions and subtransactions.
+ *
+ *	  Once we build the max-heap, we switch to
+ *	  REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP state, where we also update
+ *	  the max-heap when updating the memory counter. The intention is to
+ *	  efficiently retrieve the largest transaction in O(1) time instead of
+ *	  incurring the cost of memory counter updates (O(log n)). We remain in
+ *	  this state as long as the number of transactions is larger than the
+ *	  threshold, REORDER_BUFFER_MEM_TRACK_THRESHOLD. Otherwise, we switch back
+ *	  to REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP and reset the max-heap.
+ *
  *	  We still rely on max_changes_in_memory when loading serialized changes
  *	  back into memory. At that point we can't use the memory limit directly
  *	  as we load the subxacts independently. One option to deal with this
@@ -109,6 +129,15 @@
 #include "utils/rel.h"
 #include "utils/relfilenumbermap.h"
 
+/*
+ * Threshold of the total number of top-level and sub transactions that controls
+ * whether we switch the memory track state. While the MAINTAIN_HEAP state is
+ * effective when there are many transactions being decoded, in many systems
+ * there is generally no need to use it as long as all transactions being decoded
+ * are top-level transactions. Therefore, we use MaxConnections as the threshold
+ * so we can prevent switch to the state unless we use subtransactions.
+ */
+#define REORDER_BUFFER_MEM_TRACK_THRESHOLD	MaxConnections
 
 /* entry for a hash table we use to map from xid to our transaction state */
 typedef struct ReorderBufferTXNByIdEnt
@@ -295,7 +324,10 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
 static Size ReorderBufferChangeSize(ReorderBufferChange *change);
 static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 											ReorderBufferChange *change,
+											ReorderBufferTXN *txn,
 											bool addition, Size sz);
+static int	ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg);
+static void ReorderBufferMaybeChangeNoMaxHeap(ReorderBuffer *rb);
 
 /*
  * Allocate a new ReorderBuffer and clean out any old serialized state from
@@ -357,6 +389,16 @@ ReorderBufferAllocate(void)
 	buffer->outbufsize = 0;
 	buffer->size = 0;
 
+	/*
+	 * Don't start with a lower number than
+	 * REORDER_BUFFER_MEM_TRACK_THRESHOLD, since we add at least
+	 * REORDER_BUFFER_MEM_TRACK_THRESHOLD entries at once.
+	 */
+	buffer->memtrack_state = REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP;
+	buffer->txn_heap = binaryheap_allocate(REORDER_BUFFER_MEM_TRACK_THRESHOLD * 2,
+										   ReorderBufferTXNSizeCompare,
+										   true, NULL);
+
 	buffer->spillTxns = 0;
 	buffer->spillCount = 0;
 	buffer->spillBytes = 0;
@@ -487,7 +529,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
 {
 	/* update memory accounting info */
 	if (upd_mem)
-		ReorderBufferChangeMemoryUpdate(rb, change, false,
+		ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
 										ReorderBufferChangeSize(change));
 
 	/* free contained data */
@@ -818,7 +860,7 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 	txn->nentries_mem++;
 
 	/* update memory accounting information */
-	ReorderBufferChangeMemoryUpdate(rb, change, true,
+	ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
 									ReorderBufferChangeSize(change));
 
 	/* process partial change */
@@ -1529,7 +1571,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		/* Check we're not mixing changes from different transactions. */
 		Assert(change->txn == txn);
 
-		ReorderBufferReturnChange(rb, change, true);
+		ReorderBufferReturnChange(rb, change, false);
 	}
 
 	/*
@@ -1588,8 +1630,14 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	if (rbtxn_is_serialized(txn))
 		ReorderBufferRestoreCleanup(rb, txn);
 
+	/* Update the memory counter */
+	ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
+
 	/* deallocate */
 	ReorderBufferReturnTXN(rb, txn);
+
+	/* check the memory track state */
+	ReorderBufferMaybeChangeNoMaxHeap(rb);
 }
 
 /*
@@ -1639,9 +1687,12 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
 		/* remove the change from it's containing list */
 		dlist_delete(&change->node);
 
-		ReorderBufferReturnChange(rb, change, true);
+		ReorderBufferReturnChange(rb, change, false);
 	}
 
+	/* Update the memory counter */
+	ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
+
 	/*
 	 * Mark the transaction as streamed.
 	 *
@@ -3176,22 +3227,24 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
 static void
 ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 								ReorderBufferChange *change,
+								ReorderBufferTXN *txn,
 								bool addition, Size sz)
 {
-	ReorderBufferTXN *txn;
 	ReorderBufferTXN *toptxn;
 
-	Assert(change->txn);
-
 	/*
 	 * Ignore tuple CID changes, because those are not evicted when reaching
 	 * memory limit. So we just don't count them, because it might easily
 	 * trigger a pointless attempt to spill.
 	 */
-	if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
+	if (change && change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
 		return;
 
-	txn = change->txn;
+	if (sz == 0)
+		return;
+
+	txn = txn != NULL ? txn : change->txn;
+	Assert(txn != NULL);
 
 	/*
 	 * Update the total size in top level as well. This is later used to
@@ -3206,6 +3259,15 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 
 		/* Update the total size in the top transaction. */
 		toptxn->total_size += sz;
+
+		/* Update the max-heap as well if necessary */
+		if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP)
+		{
+			if ((txn->size - sz) == 0)
+				binaryheap_add(rb->txn_heap, PointerGetDatum(txn));
+			else
+				binaryheap_update_up(rb->txn_heap, PointerGetDatum(txn));
+		}
 	}
 	else
 	{
@@ -3215,11 +3277,42 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 
 		/* Update the total size in the top transaction. */
 		toptxn->total_size -= sz;
+
+		/* Update the max-heap as well if necessary */
+		if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP)
+		{
+			if (txn->size == 0)
+				binaryheap_remove_node_ptr(rb->txn_heap, PointerGetDatum(txn));
+			else
+				binaryheap_update_down(rb->txn_heap, PointerGetDatum(txn));
+		}
 	}
 
 	Assert(txn->size <= rb->size);
 }
 
+/*
+ * Switch to NO_MAXHEAP state and reset the max-heap if the number of
+ * transactions got lower than the threshold.
+ */
+static void
+ReorderBufferMaybeChangeNoMaxHeap(ReorderBuffer *rb)
+{
+	if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP)
+		return;
+
+	/*
+	 * If we add and remove transactions right around the threshold, we could
+	 * easily end up "thrashing". To avoid it, we adapt 10% of transactions to
+	 * switch back to the NO_MAXHEAP state.
+	 */
+	if (binaryheap_size(rb->txn_heap) < REORDER_BUFFER_MEM_TRACK_THRESHOLD * 0.9)
+	{
+		rb->memtrack_state = REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP;
+		binaryheap_reset(rb->txn_heap);
+	}
+}
+
 /*
  * Add new (relfilelocator, tid) -> (cmin, cmax) mappings.
  *
@@ -3472,31 +3565,45 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
 
 /*
  * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
- *
- * XXX With many subtransactions this might be quite slow, because we'll have
- * to walk through all of them. There are some options how we could improve
- * that: (a) maintain some secondary structure with transactions sorted by
- * amount of changes, (b) not looking for the entirely largest transaction,
- * but e.g. for transaction using at least some fraction of the memory limit,
- * and (c) evicting multiple transactions at once, e.g. to free a given portion
- * of the memory limit (e.g. 50%).
  */
 static ReorderBufferTXN *
 ReorderBufferLargestTXN(ReorderBuffer *rb)
 {
-	HASH_SEQ_STATUS hash_seq;
-	ReorderBufferTXNByIdEnt *ent;
 	ReorderBufferTXN *largest = NULL;
 
-	hash_seq_init(&hash_seq, rb->by_txn);
-	while ((ent = hash_seq_search(&hash_seq)) != NULL)
+	/*
+	 * Build the max-heap to pick the largest transaction if not yet. We will
+	 * run a heap assembly step at the end, which is more efficient.
+	 */
+	if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP)
 	{
-		ReorderBufferTXN *txn = ent->txn;
+		HASH_SEQ_STATUS hash_seq;
+		ReorderBufferTXNByIdEnt *ent;
 
-		/* if the current transaction is larger, remember it */
-		if ((!largest) || (txn->size > largest->size))
-			largest = txn;
+		hash_seq_init(&hash_seq, rb->by_txn);
+		while ((ent = hash_seq_search(&hash_seq)) != NULL)
+		{
+			ReorderBufferTXN *txn = ent->txn;
+
+			if (txn->size == 0)
+				continue;
+
+			binaryheap_add_unordered(rb->txn_heap, PointerGetDatum(txn));
+		}
+
+		binaryheap_build(rb->txn_heap);
+
+		/*
+		 * The max-heap is ready now. We remain in this state at least until
+		 * we free up enough transactions to bring the total memory usage
+		 * below the limit.
+		 */
+		rb->memtrack_state = REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP;
 	}
+	else
+		Assert(binaryheap_size(rb->txn_heap) > 0);
+
+	largest = (ReorderBufferTXN *) DatumGetPointer(binaryheap_first(rb->txn_heap));
 
 	Assert(largest);
 	Assert(largest->size > 0);
@@ -3638,6 +3745,9 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 		Assert(txn->nentries_mem == 0);
 	}
 
+	/* check the memory track state */
+	ReorderBufferMaybeChangeNoMaxHeap(rb);
+
 	/* We must be under the memory limit now. */
 	Assert(rb->size < logical_decoding_work_mem * 1024L);
 }
@@ -3707,11 +3817,14 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 		ReorderBufferSerializeChange(rb, txn, fd, change);
 		dlist_delete(&change->node);
-		ReorderBufferReturnChange(rb, change, true);
+		ReorderBufferReturnChange(rb, change, false);
 
 		spilled++;
 	}
 
+	/* Update the memory counter */
+	ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
+
 	/* update the statistics iff we have spilled anything */
 	if (spilled)
 	{
@@ -4493,7 +4606,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	 * update the accounting too (subtracting the size from the counters). And
 	 * we don't want to underflow there.
 	 */
-	ReorderBufferChangeMemoryUpdate(rb, change, true,
+	ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
 									ReorderBufferChangeSize(change));
 }
 
@@ -4905,9 +5018,9 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	MemoryContextSwitchTo(oldcontext);
 
 	/* subtract the old change size */
-	ReorderBufferChangeMemoryUpdate(rb, change, false, old_size);
+	ReorderBufferChangeMemoryUpdate(rb, change, NULL, false, old_size);
 	/* now add the change back, with the correct size */
-	ReorderBufferChangeMemoryUpdate(rb, change, true,
+	ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
 									ReorderBufferChangeSize(change));
 }
 
@@ -5273,3 +5386,20 @@ restart:
 		*cmax = ent->cmax;
 	return true;
 }
+
+/*
+ * Compare between sizes of two transactions. This is for a binary heap
+ * comparison function.
+ */
+static int
+ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg)
+{
+	ReorderBufferTXN *ta = (ReorderBufferTXN *) DatumGetPointer(a);
+	ReorderBufferTXN *tb = (ReorderBufferTXN *) DatumGetPointer(b);
+
+	if (ta->size < tb->size)
+		return -1;
+	if (ta->size > tb->size)
+		return 1;
+	return 0;
+}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0b2c95f7aa..1f0ad2b94e 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
 #define REORDERBUFFER_H
 
 #include "access/htup_details.h"
+#include "lib/binaryheap.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
@@ -531,6 +532,22 @@ typedef void (*ReorderBufferUpdateProgressTxnCB) (
 												  ReorderBufferTXN *txn,
 												  XLogRecPtr lsn);
 
+/* State of how to track the memory usage of each transaction being decoded */
+typedef enum ReorderBufferMemTrackState
+{
+	/*
+	 * We don't update max-heap while updating the memory counter. The
+	 * max-heap is built before use.
+	 */
+	REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP,
+
+	/*
+	 * We also update the max-heap when updating the memory counter so the
+	 * heap property is always preserved.
+	 */
+	REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP,
+} ReorderBufferMemTrackState;
+
 struct ReorderBuffer
 {
 	/*
@@ -631,6 +648,10 @@ struct ReorderBuffer
 	/* memory accounting */
 	Size		size;
 
+	/* Max-heap for sizes of all top-level and sub transactions */
+	ReorderBufferMemTrackState memtrack_state;
+	binaryheap *txn_heap;
+
 	/*
 	 * Statistics about transactions spilled to disk.
 	 *
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 82ee10afac..b672853858 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -4056,3 +4056,4 @@ ws_options
 ws_file_info
 PathKeyInfo
 bh_nodeidx_entry
+ReorderBufferMemTrackState
-- 
2.39.3

