From fc203704634e9561680550fd7800e908e8816932 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Fri, 26 Jan 2024 11:31:41 +0900
Subject: [PATCH v6 3/3] Use max-heap to efficiently select largest
 transactions in ReorderBuffer.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Previously, when selecting the transaction to evict during logical
decoding, we check all transactions to find the largest
transaction. Which could lead to a significant replication lag
especially in case where there are many subtransactions.

This commit improves the eviction algorithm in ReorderBuffer using the
max-heap with transaction size as the key to find the largest
transaction depending on the number of transactions being decoded.

Overall algorithm:

There are two memory track states: REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP
and REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP.

REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP is the starting state, where we do
not update the max-heap when updating the memory counter. We build the
max-heap just before selecting large transactions. Therefore, in this
state, we can update the memory counter with no additional costs but
need O(n) time to get the largest transaction, where n is the number of
transactions including top-level transactions and subtransactions.

Once we build the max-heap, we switch to
REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP state, where we also update
the max-heap when updating the memory counter. The intention is to
efficiently retrieve the largest transaction in O(1) time instead of
incurring the cost of memory counter updates (O(log n)). We remain in
this state as long as the number of transactions is larger than the
threshold, REORDER_BUFFER_MEM_TRACK_THRESHOLD. Otherwise, we switch back
to REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP and reset the max-heap.

The performance benchmark results showed significant speed up (more
than x30 speed up on my machine) in decoding a transaction with 100k
subtransactions, whereas there is no visible overhead in other cases.

XXX: update typedef.list

Reviewed-by: Amit Kapila, Hayato Kuroda, Vignesh C, Ajin Cherian,
Tomas Vondra, Shubham Khanna, Álvaro Herrera, Euler Taveira
Discussion: https://postgr.es/m/CAD21AoAfKTgrBrLq96GcTv9d6k97zaQcDM-rxfKEt4GSe0qnaQ%40mail.gmail.com
---
 .../replication/logical/reorderbuffer.c       | 183 +++++++++++++++---
 src/include/replication/reorderbuffer.h       |  21 ++
 2 files changed, 176 insertions(+), 28 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 91b9618d7e..f077e998a3 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -67,6 +67,26 @@
  *	  allocator, evicting the oldest changes would make it more likely the
  *	  memory gets actually freed.
  *
+ *	  We use a max-heap with transaction size as the key to efficiently find
+ *	  the largest transaction. The max-heap state is managed in two states:
+ *	  REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP and REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP.
+ *
+ *	  REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP is the starting state, where we do
+ *	  not update the max-heap when updating the memory counter. We build the
+ *	  max-heap just before selecting large transactions. Therefore, in this
+ *	  state, we can update the memory counter with no additional costs but
+ *	  need O(n) time to get the largest transaction, where n is the number of
+ *	  transactions including top-level transactions and subtransactions.
+ *
+ *	  Once we build the max-heap, we switch to
+ *	  REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP state, where we also update
+ *	  the max-heap when updating the memory counter. The intention is to
+ *	  efficiently retrieve the largest transaction in O(1) time instead of
+ *	  incurring the cost of memory counter updates (O(log n)). We remain in
+ *	  this state as long as the number of transactions is larger than the
+ *	  threshold, REORDER_BUFFER_MEM_TRACK_THRESHOLD. Otherwise, we switch back
+ *	  to REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP and reset the max-heap.
+ *
  *	  We still rely on max_changes_in_memory when loading serialized changes
  *	  back into memory. At that point we can't use the memory limit directly
  *	  as we load the subxacts independently. One option to deal with this
@@ -109,6 +129,11 @@
 #include "utils/rel.h"
 #include "utils/relfilenumbermap.h"
 
+/*
+ * The threshold of the number of transactions in the max-heap (rb->txn_heap)
+ * to switch the state.
+ */
+#define REORDER_BUFFER_MEM_TRACK_THRESHOLD 1024
 
 /* entry for a hash table we use to map from xid to our transaction state */
 typedef struct ReorderBufferTXNByIdEnt
@@ -295,7 +320,10 @@ static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *t
 static Size ReorderBufferChangeSize(ReorderBufferChange *change);
 static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 											ReorderBufferChange *change,
+											ReorderBufferTXN *txn,
 											bool addition, Size sz);
+static int ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg);
+static void ReorderBufferMaybeChangeNoMaxHeap(ReorderBuffer *rb);
 
 /*
  * Allocate a new ReorderBuffer and clean out any old serialized state from
@@ -357,6 +385,15 @@ ReorderBufferAllocate(void)
 	buffer->outbufsize = 0;
 	buffer->size = 0;
 
+	/*
+	 * Don't start with a lower number than REORDER_BUFFER_MEM_TRACK_THRESHOLD, since
+	 * we add at least REORDER_BUFFER_MEM_TRACK_THRESHOLD entries at once.
+	 */
+	buffer->memtrack_state = REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP;
+	buffer->txn_heap = binaryheap_allocate(REORDER_BUFFER_MEM_TRACK_THRESHOLD * 2,
+										   ReorderBufferTXNSizeCompare,
+										   true, NULL);
+
 	buffer->spillTxns = 0;
 	buffer->spillCount = 0;
 	buffer->spillBytes = 0;
@@ -487,7 +524,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
 {
 	/* update memory accounting info */
 	if (upd_mem)
-		ReorderBufferChangeMemoryUpdate(rb, change, false,
+		ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
 										ReorderBufferChangeSize(change));
 
 	/* free contained data */
@@ -818,7 +855,7 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 	txn->nentries_mem++;
 
 	/* update memory accounting information */
-	ReorderBufferChangeMemoryUpdate(rb, change, true,
+	ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
 									ReorderBufferChangeSize(change));
 
 	/* process partial change */
@@ -1529,7 +1566,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		/* Check we're not mixing changes from different transactions. */
 		Assert(change->txn == txn);
 
-		ReorderBufferReturnChange(rb, change, true);
+		ReorderBufferReturnChange(rb, change, false);
 	}
 
 	/*
@@ -1588,8 +1625,14 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	if (rbtxn_is_serialized(txn))
 		ReorderBufferRestoreCleanup(rb, txn);
 
+	/* Update the memory counter */
+	ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
+
 	/* deallocate */
 	ReorderBufferReturnTXN(rb, txn);
+
+	/* check the memory track state */
+	ReorderBufferMaybeChangeNoMaxHeap(rb);
 }
 
 /*
@@ -3172,26 +3215,32 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
  * counters instead - we don't really care about subtransactions as we
  * can't stream them individually anyway, and we only pick toplevel
  * transactions for eviction. So only toplevel transactions matter.
+ *
+ * XXX Not sure the naming is great, it seems pretty similar to the earlier
+ * function, can be quite confusing. Why do we even need the separate function
+ * and can't simply call ReorderBufferChangeMemoryUpdate from everywhere?
  */
 static void
 ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 								ReorderBufferChange *change,
+								ReorderBufferTXN *txn,
 								bool addition, Size sz)
 {
-	ReorderBufferTXN *txn;
 	ReorderBufferTXN *toptxn;
 
-	Assert(change->txn);
-
 	/*
 	 * Ignore tuple CID changes, because those are not evicted when reaching
 	 * memory limit. So we just don't count them, because it might easily
 	 * trigger a pointless attempt to spill.
 	 */
-	if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
+	if (change && change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
+		return;
+
+	if (sz == 0)
 		return;
 
-	txn = change->txn;
+	txn = txn != NULL ? txn : change->txn;
+	Assert(txn != NULL);
 
 	/*
 	 * Update the total size in top level as well. This is later used to
@@ -3206,6 +3255,15 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 
 		/* Update the total size in the top transaction. */
 		toptxn->total_size += sz;
+
+		/* Update the max-heap as well if necessary */
+		if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP)
+		{
+			if ((txn->size - sz) == 0)
+				binaryheap_add(rb->txn_heap, PointerGetDatum(txn));
+			else
+				binaryheap_update_up(rb->txn_heap, PointerGetDatum(txn));
+		}
 	}
 	else
 	{
@@ -3215,11 +3273,43 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 
 		/* Update the total size in the top transaction. */
 		toptxn->total_size -= sz;
+
+		/* Update the max-heap as well if necessary */
+		if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP)
+		{
+			if (txn->size == 0)
+				binaryheap_remove_node_ptr(rb->txn_heap, PointerGetDatum(txn));
+			else
+				binaryheap_update_down(rb->txn_heap, PointerGetDatum(txn));
+		}
 	}
 
 	Assert(txn->size <= rb->size);
 }
 
+/*
+ * Switch to NO_MAXHEAP state and reset the max-heap if the number of
+ * transactions got lower than the threshold.
+ */
+static void
+ReorderBufferMaybeChangeNoMaxHeap(ReorderBuffer *rb)
+{
+	if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP)
+		return;
+
+	/*
+	 * If we add and remove transactions right around the threshold,
+	 * we could easily end up "thrashing". It is more efficient if we
+	 * accept a certain amount, say 90%, of transactions to switch back
+	 * to the NO_MAXHEAP state.
+	 */
+	if (binaryheap_size(rb->txn_heap) < REORDER_BUFFER_MEM_TRACK_THRESHOLD * 0.9)
+	{
+		rb->memtrack_state = REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP;
+		binaryheap_reset(rb->txn_heap);
+	}
+}
+
 /*
  * Add new (relfilelocator, tid) -> (cmin, cmax) mappings.
  *
@@ -3472,31 +3562,45 @@ ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
 
 /*
  * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
- *
- * XXX With many subtransactions this might be quite slow, because we'll have
- * to walk through all of them. There are some options how we could improve
- * that: (a) maintain some secondary structure with transactions sorted by
- * amount of changes, (b) not looking for the entirely largest transaction,
- * but e.g. for transaction using at least some fraction of the memory limit,
- * and (c) evicting multiple transactions at once, e.g. to free a given portion
- * of the memory limit (e.g. 50%).
  */
 static ReorderBufferTXN *
 ReorderBufferLargestTXN(ReorderBuffer *rb)
 {
-	HASH_SEQ_STATUS hash_seq;
-	ReorderBufferTXNByIdEnt *ent;
 	ReorderBufferTXN *largest = NULL;
 
-	hash_seq_init(&hash_seq, rb->by_txn);
-	while ((ent = hash_seq_search(&hash_seq)) != NULL)
+	/*
+	 * Build the max-heap to pick the largest transaction if not yet. We will
+	 * run a heap assembly step at the end, which is more efficient.
+	 */
+	if (rb->memtrack_state == REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP)
 	{
-		ReorderBufferTXN *txn = ent->txn;
+		HASH_SEQ_STATUS hash_seq;
+		ReorderBufferTXNByIdEnt *ent;
 
-		/* if the current transaction is larger, remember it */
-		if ((!largest) || (txn->size > largest->size))
-			largest = txn;
+		hash_seq_init(&hash_seq, rb->by_txn);
+		while ((ent = hash_seq_search(&hash_seq)) != NULL)
+		{
+			ReorderBufferTXN *txn = ent->txn;
+
+			if (txn->size == 0)
+				continue;
+
+			binaryheap_add_unordered(rb->txn_heap, PointerGetDatum(txn));
+		}
+
+		binaryheap_build(rb->txn_heap);
+
+		/*
+		 * The max-heap is ready now. We remain in this state at least until
+		 * we free up enough transactions to bring the total memory usage
+		 * below the limit.
+		 */
+		rb->memtrack_state = REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP;
 	}
+	else
+		Assert(binaryheap_size(rb->txn_heap) > 0);
+
+	largest = (ReorderBufferTXN *) DatumGetPointer(binaryheap_first(rb->txn_heap));
 
 	Assert(largest);
 	Assert(largest->size > 0);
@@ -3638,6 +3742,9 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 		Assert(txn->nentries_mem == 0);
 	}
 
+	/* check the memory track state */
+	ReorderBufferMaybeChangeNoMaxHeap(rb);
+
 	/* We must be under the memory limit now. */
 	Assert(rb->size < logical_decoding_work_mem * 1024L);
 }
@@ -3707,11 +3814,14 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 		ReorderBufferSerializeChange(rb, txn, fd, change);
 		dlist_delete(&change->node);
-		ReorderBufferReturnChange(rb, change, true);
+		ReorderBufferReturnChange(rb, change, false);
 
 		spilled++;
 	}
 
+	/* Update the memory counter */
+	ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
+
 	/* update the statistics iff we have spilled anything */
 	if (spilled)
 	{
@@ -4493,7 +4603,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	 * update the accounting too (subtracting the size from the counters). And
 	 * we don't want to underflow there.
 	 */
-	ReorderBufferChangeMemoryUpdate(rb, change, true,
+	ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
 									ReorderBufferChangeSize(change));
 }
 
@@ -4905,9 +5015,9 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	MemoryContextSwitchTo(oldcontext);
 
 	/* subtract the old change size */
-	ReorderBufferChangeMemoryUpdate(rb, change, false, old_size);
+	ReorderBufferChangeMemoryUpdate(rb, change, NULL, false, old_size);
 	/* now add the change back, with the correct size */
-	ReorderBufferChangeMemoryUpdate(rb, change, true,
+	ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
 									ReorderBufferChangeSize(change));
 }
 
@@ -5273,3 +5383,20 @@ restart:
 		*cmax = ent->cmax;
 	return true;
 }
+
+/*
+ * Compare between sizes of two transactions. This is for a binary heap
+ * comparison function.
+ */
+static int
+ReorderBufferTXNSizeCompare(Datum a, Datum b, void *arg)
+{
+	ReorderBufferTXN	*ta = (ReorderBufferTXN *) DatumGetPointer(a);
+	ReorderBufferTXN	*tb = (ReorderBufferTXN *) DatumGetPointer(b);
+
+	if (ta->size < tb->size)
+		return -1;
+	if (ta->size > tb->size)
+		return 1;
+	return 0;
+}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0b2c95f7aa..f0d352cfcc 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
 #define REORDERBUFFER_H
 
 #include "access/htup_details.h"
+#include "lib/binaryheap.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
@@ -531,6 +532,22 @@ typedef void (*ReorderBufferUpdateProgressTxnCB) (
 												  ReorderBufferTXN *txn,
 												  XLogRecPtr lsn);
 
+/* State of how to track the memory usage of each transaction being decoded */
+typedef enum ReorderBufferMemTrackState
+{
+	/*
+	 * We don't update max-heap while updating the memory counter. The
+	 * max-heap is built before use.
+	 */
+	REORDER_BUFFER_MEM_TRACK_NO_MAXHEAP,
+
+	/*
+	 * We also update the max-heap when updating the memory counter so
+	 * the heap property is always preserved.
+	 */
+	REORDER_BUFFER_MEM_TRACK_MAINTAIN_MAXHEAP,
+} ReorderBufferMemTrackState;
+
 struct ReorderBuffer
 {
 	/*
@@ -631,6 +648,10 @@ struct ReorderBuffer
 	/* memory accounting */
 	Size		size;
 
+	/* Max-heap for sizes of all top-level and sub transactions */
+	ReorderBufferMemTrackState memtrack_state;
+	binaryheap	*txn_heap;
+
 	/*
 	 * Statistics about transactions spilled to disk.
 	 *
-- 
2.39.3

