From 3ebe28045217778bb145ff12c91e593b940e4818 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 15 May 2024 05:43:48 +0000
Subject: [PATCH v21 4/5] Optimize Logical Replication apply with multi inserts

---
 src/backend/executor/execReplication.c   |  39 +++
 src/backend/replication/logical/proto.c  |  24 ++
 src/backend/replication/logical/worker.c | 351 ++++++++++++++++++++++-
 src/include/executor/executor.h          |   4 +
 src/include/replication/logicalproto.h   |   2 +
 src/tools/pgindent/typedefs.list         |   2 +
 6 files changed, 409 insertions(+), 13 deletions(-)

diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index d0a89cd577..fae1375537 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -544,6 +544,45 @@ ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 	}
 }
 
+void
+ExecRelationMultiInsert(TableModifyState *MultiInsertState,
+						ResultRelInfo *resultRelInfo,
+						EState *estate, TupleTableSlot *slot)
+{
+	bool		skip_tuple = false;
+	Relation	rel = resultRelInfo->ri_RelationDesc;
+
+	/* For now we support only tables. */
+	Assert(rel->rd_rel->relkind == RELKIND_RELATION);
+
+	CheckCmdReplicaIdentity(rel, CMD_INSERT);
+
+	/* BEFORE ROW INSERT Triggers */
+	if (resultRelInfo->ri_TrigDesc &&
+		resultRelInfo->ri_TrigDesc->trig_insert_before_row)
+	{
+		if (!ExecBRInsertTriggers(estate, resultRelInfo, slot))
+			skip_tuple = true;	/* "do nothing" */
+	}
+
+	if (!skip_tuple)
+	{
+		/* Compute stored generated columns */
+		if (rel->rd_att->constr &&
+			rel->rd_att->constr->has_generated_stored)
+			ExecComputeStoredGenerated(resultRelInfo, estate, slot,
+									   CMD_INSERT);
+
+		/* Check the constraints of the tuple */
+		if (rel->rd_att->constr)
+			ExecConstraints(resultRelInfo, slot, estate);
+		if (rel->rd_rel->relispartition)
+			ExecPartitionCheck(resultRelInfo, slot, estate, true);
+
+		table_modify_buffer_insert(MultiInsertState, slot);
+	}
+}
+
 /*
  * Find the searchslot tuple and update it with data in the slot,
  * update the indexes, and execute any constraints and per-row triggers.
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 95c09c9516..46d38aebd2 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -427,6 +427,30 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
 	logicalrep_write_tuple(out, rel, newslot, binary, columns);
 }
 
+LogicalRepRelId
+logicalrep_read_relid(StringInfo in)
+{
+	LogicalRepRelId relid;
+
+	/* read the relation id */
+	relid = pq_getmsgint(in, 4);
+
+	return relid;
+}
+
+void
+logicalrep_read_insert_v2(StringInfo in, LogicalRepTupleData *newtup)
+{
+	char		action;
+
+	action = pq_getmsgbyte(in);
+	if (action != 'N')
+		elog(ERROR, "expected new tuple but got %d",
+			 action);
+
+	logicalrep_read_tuple(in, newtup);
+}
+
 /*
  * Read INSERT from stream.
  *
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b5a80fe3e8..d62772f590 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -148,7 +148,6 @@
 #include <unistd.h>
 
 #include "access/table.h"
-#include "access/tableam.h"
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "catalog/indexing.h"
@@ -416,6 +415,30 @@ static inline void reset_apply_error_context_info(void);
 static TransApplyAction get_transaction_apply_action(TransactionId xid,
 													 ParallelApplyWorkerInfo **winfo);
 
+typedef enum LRMultiInsertReturnStatus
+{
+	LR_MULTI_INSERT_NONE = 0,
+	LR_MULTI_INSERT_REL_SKIPPED,
+	LR_MULTI_INSERT_DISALLOWED,
+	LR_MULTI_INSERT_DONE,
+} LRMultiInsertReturnStatus;
+
+static TableModifyState *MultiInsertState = NULL;
+static LogicalRepRelMapEntry *LastRel = NULL;
+static LogicalRepRelId LastMultiInsertRelId = InvalidOid;
+static ApplyExecutionData *LastEData = NULL;
+static TupleTableSlot *LastRemoteSlot = NULL;
+
+typedef struct LRModifyBufferFlushContext
+{
+	ResultRelInfo *resultRelInfo;
+	EState	   *estate;
+} LRModifyBufferFlushContext;
+
+static LRModifyBufferFlushContext *modify_buffer_flush_context = NULL;
+static void LRModifyBufferFlushCallback(void *context, TupleTableSlot *slot);
+static void FinishMultiInserts(void);
+
 /*
  * Form the origin name for the subscription.
  *
@@ -1017,6 +1040,8 @@ apply_handle_commit(StringInfo s)
 {
 	LogicalRepCommitData commit_data;
 
+	FinishMultiInserts();
+
 	logicalrep_read_commit(s, &commit_data);
 
 	if (commit_data.commit_lsn != remote_final_lsn)
@@ -1043,6 +1068,8 @@ apply_handle_begin_prepare(StringInfo s)
 {
 	LogicalRepPreparedTxnData begin_data;
 
+	FinishMultiInserts();
+
 	/* Tablesync should never receive prepare. */
 	if (am_tablesync_worker())
 		ereport(ERROR,
@@ -1109,6 +1136,8 @@ apply_handle_prepare(StringInfo s)
 {
 	LogicalRepPreparedTxnData prepare_data;
 
+	FinishMultiInserts();
+
 	logicalrep_read_prepare(s, &prepare_data);
 
 	if (prepare_data.prepare_lsn != remote_final_lsn)
@@ -1171,6 +1200,8 @@ apply_handle_commit_prepared(StringInfo s)
 	LogicalRepCommitPreparedTxnData prepare_data;
 	char		gid[GIDSIZE];
 
+	FinishMultiInserts();
+
 	logicalrep_read_commit_prepared(s, &prepare_data);
 	set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_lsn);
 
@@ -1220,6 +1251,8 @@ apply_handle_rollback_prepared(StringInfo s)
 	LogicalRepRollbackPreparedTxnData rollback_data;
 	char		gid[GIDSIZE];
 
+	FinishMultiInserts();
+
 	logicalrep_read_rollback_prepared(s, &rollback_data);
 	set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_end_lsn);
 
@@ -1277,6 +1310,8 @@ apply_handle_stream_prepare(StringInfo s)
 	/* Save the message before it is consumed. */
 	StringInfoData original_msg = *s;
 
+	FinishMultiInserts();
+
 	if (in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -1304,6 +1339,8 @@ apply_handle_stream_prepare(StringInfo s)
 			apply_spooled_messages(MyLogicalRepWorker->stream_fileset,
 								   prepare_data.xid, prepare_data.prepare_lsn);
 
+			FinishMultiInserts();
+
 			/* Mark the transaction as prepared. */
 			apply_handle_prepare_internal(&prepare_data);
 
@@ -1407,6 +1444,8 @@ apply_handle_stream_prepare(StringInfo s)
 static void
 apply_handle_origin(StringInfo s)
 {
+	FinishMultiInserts();
+
 	/*
 	 * ORIGIN message can only come inside streaming transaction or inside
 	 * remote transaction and before any actual writes.
@@ -1473,6 +1512,8 @@ apply_handle_stream_start(StringInfo s)
 	/* Save the message before it is consumed. */
 	StringInfoData original_msg = *s;
 
+	FinishMultiInserts();
+
 	if (in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -1628,6 +1669,8 @@ apply_handle_stream_stop(StringInfo s)
 	ParallelApplyWorkerInfo *winfo;
 	TransApplyAction apply_action;
 
+	FinishMultiInserts();
+
 	if (!in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -1821,6 +1864,8 @@ apply_handle_stream_abort(StringInfo s)
 	StringInfoData original_msg = *s;
 	bool		toplevel_xact;
 
+	FinishMultiInserts();
+
 	if (in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -2138,6 +2183,8 @@ apply_handle_stream_commit(StringInfo s)
 	/* Save the message before it is consumed. */
 	StringInfoData original_msg = *s;
 
+	FinishMultiInserts();
+
 	if (in_streamed_transaction)
 		ereport(ERROR,
 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -2159,6 +2206,8 @@ apply_handle_stream_commit(StringInfo s)
 			apply_spooled_messages(MyLogicalRepWorker->stream_fileset, xid,
 								   commit_data.commit_lsn);
 
+			FinishMultiInserts();
+
 			apply_handle_commit_internal(&commit_data);
 
 			/* Unlink the files with serialized changes and subxact info. */
@@ -2302,6 +2351,8 @@ apply_handle_relation(StringInfo s)
 {
 	LogicalRepRelation *rel;
 
+	FinishMultiInserts();
+
 	if (handle_streamed_transaction(LOGICAL_REP_MSG_RELATION, s))
 		return;
 
@@ -2325,6 +2376,8 @@ apply_handle_type(StringInfo s)
 {
 	LogicalRepTyp typ;
 
+	FinishMultiInserts();
+
 	if (handle_streamed_transaction(LOGICAL_REP_MSG_TYPE, s))
 		return;
 
@@ -2363,16 +2416,126 @@ TargetPrivilegesCheck(Relation rel, AclMode mode)
 						RelationGetRelationName(rel))));
 }
 
-/*
- * Handle INSERT message.
- */
+static void
+FinishMultiInserts(void)
+{
+	LogicalRepMsgType saved_command;
+
+	if (MultiInsertState == NULL)
+		return;
+
+	Assert(OidIsValid(LastMultiInsertRelId));
+	Assert(LastEData != NULL);
+
+	/* Set relation for error callback */
+	apply_error_callback_arg.rel = LastRel;
+
+	/* Set current command for error callback */
+	saved_command = apply_error_callback_arg.command;
+	apply_error_callback_arg.command = LOGICAL_REP_MSG_INSERT;
+
+	ExecDropSingleTupleTableSlot(LastRemoteSlot);
+	LastRemoteSlot = NULL;
+
+	table_modify_end(MultiInsertState);
+	MultiInsertState = NULL;
+	LastMultiInsertRelId = InvalidOid;
+
+	pfree(modify_buffer_flush_context);
+	modify_buffer_flush_context = NULL;
+
+	ExecCloseIndices(LastEData->targetRelInfo);
+
+	finish_edata(LastEData);
+	LastEData = NULL;
+
+	/* Reset relation for error callback */
+	apply_error_callback_arg.rel = NULL;
+
+	/* Reset the current command */
+	apply_error_callback_arg.command = saved_command;
+
+	logicalrep_rel_close(LastRel, NoLock);
+	LastRel = NULL;
+
+	end_replication_step();
+}
 
 static void
-apply_handle_insert(StringInfo s)
+LRModifyBufferFlushCallback(void *context, TupleTableSlot *slot)
+{
+	LRModifyBufferFlushContext *ctx = (LRModifyBufferFlushContext *) context;
+	ResultRelInfo *resultRelInfo = ctx->resultRelInfo;
+	EState	   *estate = ctx->estate;
+	LogicalRepMsgType saved_command;
+
+	/* Quick exit if no indexes or no triggers */
+	if (!(resultRelInfo->ri_NumIndices > 0 ||
+		  (resultRelInfo->ri_TrigDesc != NULL &&
+		   (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			resultRelInfo->ri_TrigDesc->trig_insert_new_table))))
+		return;
+
+	/* Set relation for error callback */
+	apply_error_callback_arg.rel = LastRel;
+
+	/* Set current command for error callback */
+	saved_command = apply_error_callback_arg.command;
+	apply_error_callback_arg.command = LOGICAL_REP_MSG_INSERT;
+
+	/* Caller must take care of opening and closing the indices */
+
+	/*
+	 * If there are any indexes, update them for all the inserted tuples, and
+	 * run AFTER ROW INSERT triggers.
+	 */
+	if (resultRelInfo->ri_NumIndices > 0)
+	{
+		List	   *recheckIndexes;
+
+		recheckIndexes =
+			ExecInsertIndexTuples(resultRelInfo,
+								  slot, estate, false,
+								  false, NULL, NIL, false);
+
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, recheckIndexes,
+							 NULL);
+
+		list_free(recheckIndexes);
+	}
+
+	/*
+	 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
+	 * anyway.
+	 */
+	else if (resultRelInfo->ri_TrigDesc != NULL &&
+			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+	{
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, NIL,
+							 NULL);
+	}
+
+	/*
+	 * XXX we should in theory pass a TransitionCaptureState object to the
+	 * above to capture transition tuples, but after statement triggers don't
+	 * actually get fired by replication yet anyway
+	 */
+
+	/* Reset relation for error callback */
+	apply_error_callback_arg.rel = NULL;
+
+	/* Reset the current command */
+	apply_error_callback_arg.command = saved_command;
+}
+
+static LRMultiInsertReturnStatus
+do_multi_inserts(StringInfo s, LogicalRepRelId *relid)
 {
 	LogicalRepRelMapEntry *rel;
 	LogicalRepTupleData newtup;
-	LogicalRepRelId relid;
 	UserContext ucxt;
 	ApplyExecutionData *edata;
 	EState	   *estate;
@@ -2380,17 +2543,143 @@ apply_handle_insert(StringInfo s)
 	MemoryContext oldctx;
 	bool		run_as_owner;
 
+	if (MultiInsertState == NULL)
+		begin_replication_step();
+
+	*relid = logicalrep_read_relid(s);
+
+	if (MultiInsertState != NULL &&
+		(LastMultiInsertRelId != InvalidOid &&
+		 *relid != InvalidOid &&
+		 LastMultiInsertRelId != *relid))
+		FinishMultiInserts();
+
+	if (MultiInsertState == NULL)
+		rel = logicalrep_rel_open(*relid, RowExclusiveLock);
+	else
+		rel = LastRel;
+
+	if (!should_apply_changes_for_rel(rel))
+	{
+		Assert(MultiInsertState == NULL);
+
+		/*
+		 * The relation can't become interesting in the middle of the
+		 * transaction so it's safe to unlock it.
+		 */
+		logicalrep_rel_close(rel, RowExclusiveLock);
+		end_replication_step();
+		return LR_MULTI_INSERT_REL_SKIPPED;
+	}
+
+	/* For a partitioned table, let's not do multi inserts. */
+	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	{
+		Assert(MultiInsertState == NULL);
+
+		/*
+		 * The relation can't become interesting in the middle of the
+		 * transaction so it's safe to unlock it.
+		 */
+		logicalrep_rel_close(rel, RowExclusiveLock);
+		end_replication_step();
+		return LR_MULTI_INSERT_DISALLOWED;
+	}
+
 	/*
-	 * Quick return if we are skipping data modification changes or handling
-	 * streamed transactions.
+	 * Make sure that any user-supplied code runs as the table owner, unless
+	 * the user has opted out of that behavior.
 	 */
-	if (is_skipping_changes() ||
-		handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
-		return;
+	run_as_owner = MySubscription->runasowner;
+	if (!run_as_owner)
+		SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+
+	/* Set relation for error callback */
+	apply_error_callback_arg.rel = rel;
+
+	if (MultiInsertState == NULL)
+	{
+		oldctx = MemoryContextSwitchTo(TopTransactionContext);
+
+		/* Initialize the executor state. */
+		LastEData = edata = create_edata_for_relation(rel);
+		estate = edata->estate;
+
+		LastRemoteSlot = remoteslot = MakeTupleTableSlot(RelationGetDescr(rel->localrel),
+														 &TTSOpsVirtual);
+
+		modify_buffer_flush_context = (LRModifyBufferFlushContext *) palloc(sizeof(LRModifyBufferFlushContext));
+		modify_buffer_flush_context->resultRelInfo = edata->targetRelInfo;
+		modify_buffer_flush_context->estate = estate;
+
+		MultiInsertState = table_modify_begin(edata->targetRelInfo->ri_RelationDesc,
+											  TM_FLAG_MULTI_INSERTS |
+											  TM_FLAG_BAS_BULKWRITE,
+											  GetCurrentCommandId(true),
+											  0,
+											  LRModifyBufferFlushCallback,
+											  modify_buffer_flush_context);
+		LastRel = rel;
+		LastMultiInsertRelId = *relid;
+
+		/* We must open indexes here. */
+		ExecOpenIndices(edata->targetRelInfo, false);
+
+		MemoryContextSwitchTo(oldctx);
+	}
+	else
+	{
+		CommandId	cid;
+
+		edata = LastEData;
+		estate = edata->estate;
+		ResetExprContext(GetPerTupleExprContext(estate));
+		ExecClearTuple(LastRemoteSlot);
+		remoteslot = LastRemoteSlot;
+		cid = GetCurrentCommandId(true);
+		MultiInsertState->cid = cid;
+		estate->es_output_cid = cid;
+	}
+
+	/* Process and store remote tuple in the slot */
+	logicalrep_read_insert_v2(s, &newtup);
+	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+	slot_store_data(remoteslot, rel, &newtup);
+	slot_fill_defaults(rel, estate, remoteslot);
+	MemoryContextSwitchTo(oldctx);
+
+	TargetPrivilegesCheck(edata->targetRelInfo->ri_RelationDesc, ACL_INSERT);
+	ExecRelationMultiInsert(MultiInsertState, edata->targetRelInfo, estate, remoteslot);
+
+	/* Reset relation for error callback */
+	apply_error_callback_arg.rel = NULL;
+
+	if (!run_as_owner)
+		RestoreUserContext(&ucxt);
+
+	Assert(MultiInsertState != NULL);
+
+	CommandCounterIncrement();
+
+	return LR_MULTI_INSERT_DONE;
+}
+
+static bool
+do_single_inserts(StringInfo s, LogicalRepRelId relid)
+{
+	LogicalRepRelMapEntry *rel;
+	LogicalRepTupleData newtup;
+	UserContext ucxt;
+	ApplyExecutionData *edata;
+	EState	   *estate;
+	TupleTableSlot *remoteslot;
+	MemoryContext oldctx;
+	bool		run_as_owner;
+
+	Assert(relid != InvalidOid);
 
 	begin_replication_step();
 
-	relid = logicalrep_read_insert(s, &newtup);
 	rel = logicalrep_rel_open(relid, RowExclusiveLock);
 	if (!should_apply_changes_for_rel(rel))
 	{
@@ -2400,7 +2689,7 @@ apply_handle_insert(StringInfo s)
 		 */
 		logicalrep_rel_close(rel, RowExclusiveLock);
 		end_replication_step();
-		return;
+		return false;
 	}
 
 	/*
@@ -2422,6 +2711,7 @@ apply_handle_insert(StringInfo s)
 										&TTSOpsVirtual);
 
 	/* Process and store remote tuple in the slot */
+	logicalrep_read_insert_v2(s, &newtup);
 	oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 	slot_store_data(remoteslot, rel, &newtup);
 	slot_fill_defaults(rel, estate, remoteslot);
@@ -2446,6 +2736,35 @@ apply_handle_insert(StringInfo s)
 	logicalrep_rel_close(rel, NoLock);
 
 	end_replication_step();
+
+	return true;
+}
+
+/*
+ * Handle INSERT message.
+ */
+static void
+apply_handle_insert(StringInfo s)
+{
+	LRMultiInsertReturnStatus mi_status;
+	LogicalRepRelId relid;
+
+	/*
+	 * Quick return if we are skipping data modification changes or handling
+	 * streamed transactions.
+	 */
+	if (is_skipping_changes() ||
+		handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
+		return;
+
+	mi_status = do_multi_inserts(s, &relid);
+	if (mi_status == LR_MULTI_INSERT_REL_SKIPPED ||
+		mi_status == LR_MULTI_INSERT_DONE)
+		return;
+
+	do_single_inserts(s, relid);
+
+	return;
 }
 
 /*
@@ -2532,6 +2851,8 @@ apply_handle_update(StringInfo s)
 	MemoryContext oldctx;
 	bool		run_as_owner;
 
+	FinishMultiInserts();
+
 	/*
 	 * Quick return if we are skipping data modification changes or handling
 	 * streamed transactions.
@@ -2713,6 +3034,8 @@ apply_handle_delete(StringInfo s)
 	MemoryContext oldctx;
 	bool		run_as_owner;
 
+	FinishMultiInserts();
+
 	/*
 	 * Quick return if we are skipping data modification changes or handling
 	 * streamed transactions.
@@ -3154,6 +3477,8 @@ apply_handle_truncate(StringInfo s)
 	ListCell   *lc;
 	LOCKMODE	lockmode = AccessExclusiveLock;
 
+	FinishMultiInserts();
+
 	/*
 	 * Quick return if we are skipping data modification changes or handling
 	 * streamed transactions.
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 9770752ea3..8f10ea977b 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -14,6 +14,7 @@
 #ifndef EXECUTOR_H
 #define EXECUTOR_H
 
+#include "access/tableam.h"
 #include "executor/execdesc.h"
 #include "fmgr.h"
 #include "nodes/lockoptions.h"
@@ -656,6 +657,9 @@ extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
 
 extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
 									 EState *estate, TupleTableSlot *slot);
+extern void ExecRelationMultiInsert(TableModifyState *MultiInsertState,
+									ResultRelInfo *resultRelInfo,
+									EState *estate, TupleTableSlot *slot);
 extern void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
 									 EState *estate, EPQState *epqstate,
 									 TupleTableSlot *searchslot, TupleTableSlot *slot);
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index c409638a2e..3f3a7f0a31 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -226,6 +226,8 @@ extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									Relation rel,
 									TupleTableSlot *newslot,
 									bool binary, Bitmapset *columns);
+extern LogicalRepRelId logicalrep_read_relid(StringInfo in);
+extern void logicalrep_read_insert_v2(StringInfo in, LogicalRepTupleData *newtup);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
 									Relation rel,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 5e3e900cb8..8463343325 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1468,6 +1468,8 @@ LPTHREAD_START_ROUTINE
 LPTSTR
 LPVOID
 LPWSTR
+LRModifyBufferFlushContext
+LRMultiInsertReturnStatus
 LSEG
 LUID
 LVRelState
-- 
2.34.1

