From 4a8e0913d003928bd4726ed4da2ab15bf09cecb3 Mon Sep 17 00:00:00 2001
From: Osumi Takamichi <osumi.takamichi@fujitsu.com>
Date: Tue, 9 Nov 2021 11:08:01 +0000
Subject: [PATCH v9] Extend pg_stat_subscription_workers to include general transaction statistics

Categorize transactions of a logical replication subscriber
into three types (commit, error, abort) and introduce
cumulative columns for the counts and the amounts of
data consumed during message apply, respectively.

One scenario that utilizes those columns is to suppress
unnecessary network bandwidth consumption, when more streaming
transaction aborts than expected are observed via the abort_count
column or its bytes counterpart.

The resources consumed by the subscriber are computed
based on the data structure for message apply, which is different
from that of the publisher's decoding processing.

Stats of prepared transactions become persistent so that
the appropriate category for the transaction can be concluded at
either commit prepared or rollback prepared time.

Also, streaming transactions running in parallel on the publisher
can cause data blocks to arrive in unpredictable order. Managing each
streaming transaction by xid makes it possible to handle
stream abort, stream commit and stream prepare properly.

Author: Takamichi Osumi
Discussed & Reviewed-by: Amit Kapila, Masahiko Sawada, Hou Zhijie, Greg Nancarrow, Vignesh C, Ajin Cherian
Discussion: https://www.postgresql.org/message-id/OSBPR01MB48887CA8F40C8D984A6DC00CED199%40OSBPR01MB4888.jpnprd01.prod.outlook.com
---
 doc/src/sgml/monitoring.sgml                     |  95 +++++--
 src/backend/catalog/system_views.sql             |  18 +-
 src/backend/executor/execPartition.c             |  10 +
 src/backend/postmaster/pgstat.c                  | 294 ++++++++++++++++++++++
 src/backend/replication/logical/tablesync.c      |   6 +
 src/backend/replication/logical/worker.c         | 304 +++++++++++++++++++++++
 src/backend/utils/adt/pgstatfuncs.c              |  44 +++-
 src/include/catalog/pg_proc.dat                  |   6 +-
 src/include/executor/execPartition.h             |   1 +
 src/include/pgstat.h                             |  90 +++++++
 src/include/replication/logicalworker.h          |   5 +
 src/test/regress/expected/rules.out              |  16 +-
 src/test/subscription/t/026_error_report.pl      |   8 +-
 src/test/subscription/t/027_worker_xact_stats.pl | 173 +++++++++++++
 src/tools/pgindent/typedefs.list                 |   6 +
 15 files changed, 1032 insertions(+), 44 deletions(-)
 create mode 100644 src/test/subscription/t/027_worker_xact_stats.pl

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 7a4a1a2..3a35efa 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -629,7 +629,7 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
      <row>
       <entry><structname>pg_stat_subscription_workers</structname><indexterm><primary>pg_stat_subscription_workers</primary></indexterm></entry>
-      <entry>At least one row per subscription, showing about errors that
+      <entry>At least one row per subscription, showing transaction statistics and information about errors that
       occurred on subscription.
       See <link linkend="monitoring-pg-stat-subscription-workers">
       <structname>pg_stat_subscription_workers</structname></link> for details.
@@ -3052,9 +3052,10 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
   <para>
    The <structname>pg_stat_subscription_workers</structname> view will contain
-   one row per subscription error reported by workers applying logical
-   replication changes and workers handling the initial data copy of the
-   subscribed tables.
+   one row per subscription, showing corresponding transaction statistics and
+   information about the last error reported by workers applying logical replication
+   changes or by workers handling the initial data copy of the subscribed tables.
+   The transaction size statistics are used only by the apply worker.
   </para>
 
   <table id="pg-stat-subscription-workers" xreflabel="pg_stat_subscription_workers">
@@ -3102,20 +3103,86 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>relid</structfield> <type>oid</type>
+       <structfield>commit_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions successfully applied in this subscription.
+       COMMIT, COMMIT of a streaming transaction and COMMIT PREPARED increment
+       this counter.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>commit_bytes</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Amount of data (in bytes) successfully applied in this subscription,
+       across <literal>commit_count</literal> transactions.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>error_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions that failed to be applied by the table
+       sync worker or main apply worker in this subscription.
+       </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>error_bytes</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Amount of data (in bytes) unsuccessfully applied in this subscription,
+       across <literal>error_count</literal> transactions.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>abort_count</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of transactions aborted in this subscription.
+       ROLLBACK PREPARED and abort of a streaming transaction
+       increment this counter.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>abort_bytes</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Amount of data (in bytes) aborted in this subscription,
+       across <literal>abort_count</literal> transactions.
+       If an unexpected number of streamed transactions are aborted, the
+       wasted network bandwidth can be suppressed by increasing
+       <literal>logical_decoding_work_mem</literal> on the publisher
+       so that it exceeds the size of the whole streamed transaction.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>last_error_relid</structfield> <type>oid</type>
       </para>
       <para>
        OID of the relation that the worker was processing when the
-       error occurred
+       last error occurred
       </para></entry>
      </row>
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>command</structfield> <type>text</type>
+       <structfield>last_error_command</structfield> <type>text</type>
       </para>
       <para>
-       Name of command being applied when the error occurred.  This field
+       Name of the command being applied when the last error occurred.  This field
        is always NULL if the error was reported during the initial data
        copy.
       </para></entry>
@@ -3123,10 +3190,10 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>xid</structfield> <type>xid</type>
+       <structfield>last_error_xid</structfield> <type>xid</type>
       </para>
       <para>
-       Transaction ID of the publisher node being applied when the error
+       Transaction ID of the publisher node being applied when the last error
        occurred.  This field is always NULL if the error was reported
        during the initial data copy.
       </para></entry>
@@ -3134,19 +3201,19 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>error_count</structfield> <type>uint8</type>
+       <structfield>last_error_count</structfield> <type>uint8</type>
       </para>
       <para>
-       Number of consecutive times the error occurred
+       Number of consecutive times the last error occurred
       </para></entry>
      </row>
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>error_message</structfield> <type>text</type>
+       <structfield>last_error_message</structfield> <type>text</type>
       </para>
       <para>
-       The error message
+       The last error message
       </para></entry>
      </row>
 
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index fece48a..9e0cfce 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1267,12 +1267,18 @@ CREATE VIEW pg_stat_subscription_workers AS
         w.subid,
         s.subname,
         w.subrelid,
-        w.relid,
-        w.command,
-        w.xid,
-        w.error_count,
-        w.error_message,
-        w.last_error_time
+	w.commit_count,
+	w.commit_bytes,
+	w.error_count,
+	w.error_bytes,
+	w.abort_count,
+	w.abort_bytes,
+	w.last_error_relid,
+	w.last_error_command,
+	w.last_error_xid,
+	w.last_error_count,
+	w.last_error_message,
+	w.last_error_time
     FROM (SELECT
               oid as subid,
               NULL as relid
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index 5c723bc..20184c6 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -192,6 +192,16 @@ static void find_matching_subplans_recurse(PartitionPruningData *prunedata,
 										   bool initial_prune,
 										   Bitmapset **validsubplans);
 
+/*
+ * PartitionTupleRoutingSize - exported to calculate total data size
+ * of logical replication message apply, because this is one of the
+ * ApplyExecutionData struct members.
+ */
+size_t
+PartitionTupleRoutingSize(void)
+{
+	return sizeof(PartitionTupleRouting);
+}
 
 /*
  * ExecSetupPartitionTupleRouting - sets up information needed during
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index d944361..09c1662 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -54,6 +54,7 @@
 #include "postmaster/fork_process.h"
 #include "postmaster/interrupt.h"
 #include "postmaster/postmaster.h"
+#include "replication/logicalworker.h"
 #include "replication/slot.h"
 #include "replication/walsender.h"
 #include "storage/backendid.h"
@@ -287,6 +288,15 @@ static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
 static HTAB *replSlotStatHash = NULL;
 
 /*
+ * Stats of prepared transactions should be displayed
+ * at either commit prepared or rollback prepared time, even when that
+ * happens after a server restart. We have the apply worker send those statistics
+ * to the stats collector at prepare time and the startup process restore
+ * those at restart if necessary.
+ */
+static HTAB *subWorkerPreparedXactSizeHash = NULL;
+
+/*
  * List of OIDs of databases we need to write out.  If an entry is InvalidOid,
  * it means to write only the shared-catalog stats ("DB 0"); otherwise, we
  * will write both that DB's data and the shared stats.
@@ -326,6 +336,9 @@ static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry,
 static PgStat_StatSubWorkerEntry *pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry,
 															 Oid subid, Oid subrelid,
 															 bool create);
+static PgStat_StatSubWorkerPreparedXactSize *pgstat_get_subworker_prepared_txn(Oid subid,
+																			   char *gid, bool create);
+
 static void pgstat_write_statsfiles(bool permanent, bool allDbs);
 static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent);
 static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep);
@@ -385,6 +398,9 @@ static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
 static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
 static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len);
 static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len);
+static void pgstat_recv_subworker_xact_end(PgStat_MsgSubWorkerXactEnd *msg, int len);
+static void pgstat_recv_subworker_twophase_xact(PgStat_MsgSubWorkerTwophaseXact *msg, int len);
+
 
 /* ------------------------------------------------------------
  * Public functions called from postmaster follow
@@ -1965,6 +1981,62 @@ pgstat_report_replslot_drop(const char *slotname)
 }
 
 /* ----------
+ * pgstat_report_subworker_xact_end() -
+ *
+ *  Tell the collector that a worker transaction has successfully completed.
+ * ----------
+ */
+void
+pgstat_report_subworker_xact_end(Oid subid, Oid subrel,
+								 LogicalRepMsgType command, PgStat_Counter xact_size)
+{
+	PgStat_MsgSubWorkerXactEnd msg;
+
+	Assert(command == 0 /* table sync worker */ ||
+		   command == LOGICAL_REP_MSG_COMMIT ||
+		   command == LOGICAL_REP_MSG_STREAM_ABORT ||
+		   command == LOGICAL_REP_MSG_STREAM_COMMIT);
+
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERXACTEND);
+	msg.m_databaseid = MyDatabaseId;
+	msg.m_subid = subid;
+	msg.m_subrelid = subrel;
+	msg.m_command = command;
+	msg.m_xact_bytes = xact_size;
+	pgstat_send(&msg, sizeof(PgStat_MsgSubWorkerXactEnd));
+
+	reset_apply_error_context_xact_size();
+}
+
+/* ----------
+ * pgstat_report_subworker_twophase_xact() -
+ *
+ *  Tell the collector that a worker transaction has performed a 2PC-related operation.
+ * ----------
+ */
+void
+pgstat_report_subworker_twophase_xact(Oid subid, LogicalRepMsgType command,
+									  PgStat_Counter xact_size, char *gid)
+{
+	PgStat_MsgSubWorkerTwophaseXact msg;
+
+	Assert(command == LOGICAL_REP_MSG_PREPARE ||
+		   command == LOGICAL_REP_MSG_STREAM_PREPARE ||
+		   command == LOGICAL_REP_MSG_COMMIT_PREPARED ||
+		   command == LOGICAL_REP_MSG_ROLLBACK_PREPARED);
+
+	/* setup the message */
+	pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERTWOPHASEXACT);
+	msg.m_databaseid = MyDatabaseId;
+	msg.m_subid = subid;
+	msg.m_command = command;
+	strlcpy(msg.m_gid, gid, sizeof(msg.m_gid));
+	msg.m_xact_bytes = xact_size;
+	pgstat_send(&msg, sizeof(PgStat_MsgSubWorkerTwophaseXact));
+	reset_apply_error_context_xact_size();
+}
+
+/* ----------
  * pgstat_report_subworker_error() -
  *
  *	Tell the collector about the subscription worker error.
@@ -1985,6 +2057,7 @@ pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
 	msg.m_databaseid = MyDatabaseId;
 	msg.m_subid = subid;
 	msg.m_subrelid = subrelid;
+	msg.m_xact_error_bytes = get_apply_error_context_xact_size();
 	msg.m_relid = relid;
 	msg.m_command = command;
 	msg.m_xid = xid;
@@ -1992,6 +2065,8 @@ pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
 	strlcpy(msg.m_message, errmsg, PGSTAT_SUBWORKERERROR_MSGLEN);
 
 	pgstat_send(&msg, len);
+
+	reset_apply_error_context_xact_size();
 }
 
 /* ----------
@@ -3750,6 +3825,14 @@ PgstatCollectorMain(int argc, char *argv[])
 					pgstat_recv_subworker_error(&msg.msg_subworkererror, len);
 					break;
 
+				case PGSTAT_MTYPE_SUBWORKERXACTEND:
+					pgstat_recv_subworker_xact_end(&msg.msg_subworkerxactend, len);
+					break;
+
+				case PGSTAT_MTYPE_SUBWORKERTWOPHASEXACT:
+					pgstat_recv_subworker_twophase_xact(&msg.msg_subworkertwophasexact, len);
+					break;
+
 				default:
 					break;
 			}
@@ -4058,6 +4141,22 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
 	}
 
 	/*
+	 * Write subscription worker's prepared transaction struct
+	 */
+	if (subWorkerPreparedXactSizeHash)
+	{
+		PgStat_StatSubWorkerPreparedXactSize *prepared_size;
+
+		hash_seq_init(&hstat, subWorkerPreparedXactSizeHash);
+		while((prepared_size = (PgStat_StatSubWorkerPreparedXactSize *) hash_seq_search(&hstat)) != NULL)
+		{
+			fputc('P', fpout);
+			rc = fwrite(prepared_size, sizeof(PgStat_StatSubWorkerPreparedXactSize), 1, fpout);
+			(void) rc; 			/* we'll check for error with ferror */
+		}
+	}
+
+	/*
 	 * No more output to be done. Close the temp file and replace the old
 	 * pgstat.stat with it.  The ferror() check replaces testing for error
 	 * after each individual fputc or fwrite above.
@@ -4541,6 +4640,40 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
 					break;
 				}
 
+			case 'P':
+				{
+					PgStat_StatSubWorkerPreparedXactSize buff;
+					PgStat_StatSubWorkerPreparedXactSize *prepared_xact_size;
+
+					if (fread(&buff, 1, sizeof(PgStat_StatSubWorkerPreparedXactSize),
+							  fpin) != sizeof(PgStat_StatSubWorkerPreparedXactSize))
+					{
+						ereport(pgStatRunningInCollector ? LOG : WARNING,
+								(errmsg("corrupted statistics file \"%s\"",
+										statfile)));
+						goto done;
+					}
+
+					if (subWorkerPreparedXactSizeHash == NULL)
+					{
+						HASHCTL		hash_ctl;
+
+						hash_ctl.keysize = sizeof(PgStat_StatSubWorkerPreparedXact);
+						hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerPreparedXactSize);
+						hash_ctl.hcxt = pgStatLocalContext;
+						subWorkerPreparedXactSizeHash = hash_create("Subscription worker stats of prepared txn",
+														PGSTAT_SUBWORKER_HASH_SIZE,
+														&hash_ctl,
+														HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
+					}
+
+					prepared_xact_size =
+						(PgStat_StatSubWorkerPreparedXactSize *) hash_search(subWorkerPreparedXactSizeHash,
+																			 (void *) &buff.key,
+																			 HASH_ENTER, NULL);
+					memcpy(prepared_xact_size, &buff, sizeof(PgStat_StatSubWorkerPreparedXactSize));
+					break;
+				}
 			case 'E':
 				goto done;
 
@@ -6148,6 +6281,112 @@ pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len)
 }
 
 /* ----------
+ * pgstat_recv_subworker_xact_end() -
+ *
+ *	Process a SUBWORKERXACTEND message.
+ * ----------
+ */
+static void
+pgstat_recv_subworker_xact_end(PgStat_MsgSubWorkerXactEnd *msg, int len)
+{
+	PgStat_StatDBEntry *dbentry;
+	PgStat_StatSubWorkerEntry *wentry;
+
+	dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
+	wentry = pgstat_get_subworker_entry(dbentry, msg->m_subid,
+										msg->m_subrelid, true);
+	Assert(wentry);
+
+	/* table sync worker */
+	if (msg->m_command == 0)
+		wentry->xact_commit_count++;
+	else
+	{
+		/* apply worker */
+		switch(msg->m_command)
+		{
+			case LOGICAL_REP_MSG_COMMIT:
+			case LOGICAL_REP_MSG_STREAM_COMMIT:
+				wentry->xact_commit_count++;
+				wentry->xact_commit_bytes += msg->m_xact_bytes;
+				break;
+			case LOGICAL_REP_MSG_STREAM_ABORT:
+				wentry->xact_abort_count++;
+				wentry->xact_abort_bytes += msg->m_xact_bytes;
+				break;
+			default:
+				elog(ERROR, "unexpected logical message type as normal apply end");
+				break;
+		}
+	}
+}
+
+/* ----------
+ * pgstat_recv_subworker_twophase_xact() -
+ *
+ *	Process a SUBWORKERTWOPHASEXACT message.
+ * ----------
+ */
+static void
+pgstat_recv_subworker_twophase_xact(PgStat_MsgSubWorkerTwophaseXact *msg, int len)
+{
+	PgStat_StatSubWorkerPreparedXactSize *prepared_txn;
+	PgStat_StatDBEntry *dbentry;
+	PgStat_StatSubWorkerEntry *wentry;
+	PgStat_StatSubWorkerPreparedXact key;
+
+	prepared_txn = pgstat_get_subworker_prepared_txn(msg->m_subid,
+													 msg->m_gid, true);
+	Assert(prepared_txn);
+	switch(msg->m_command)
+	{
+		case LOGICAL_REP_MSG_PREPARE:
+		case LOGICAL_REP_MSG_STREAM_PREPARE:
+			/*
+			 * Make the size of each prepared transaction persistent
+			 * so that we can update the stats across a server restart
+			 * and update the prepared stats when commit prepared
+			 * or rollback prepared arrives.
+			 */
+			prepared_txn->subid = msg->m_subid;
+			strlcpy(prepared_txn->gid, msg->m_gid, sizeof(prepared_txn->gid));
+			prepared_txn->xact_size = msg->m_xact_bytes;
+			break;
+
+		case LOGICAL_REP_MSG_COMMIT_PREPARED:
+		case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
+			/* Update exported xact stats now */
+			dbentry = pgstat_get_db_entry(msg->m_databaseid, true);
+			wentry = pgstat_get_subworker_entry(dbentry,
+												msg->m_subid,
+												InvalidOid /* apply worker */,
+												true);
+			Assert(wentry);
+			if (msg->m_command == LOGICAL_REP_MSG_COMMIT_PREPARED)
+			{
+				wentry->xact_commit_count++;
+				wentry->xact_commit_bytes += prepared_txn->xact_size;
+			}
+			else
+			{
+				wentry->xact_abort_count++;
+				wentry->xact_abort_bytes += prepared_txn->xact_size;
+			}
+
+			/* Clean up this gid from transaction size hash */
+			key.subid = prepared_txn->subid;
+			strlcpy(key.gid, msg->m_gid, sizeof(key.gid));
+			(void) hash_search(subWorkerPreparedXactSizeHash,
+							   (void *) &key, HASH_REMOVE, NULL);
+			break;
+
+		default:
+			elog(ERROR, "unexpected logical message type as prepare transaction");
+			break;
+	}
+}
+
+/* ----------
  * pgstat_recv_subworker_error() -
  *
  *	Process a SUBWORKERERROR message.
@@ -6166,6 +6405,10 @@ pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len)
 										   msg->m_subrelid, true);
 	Assert(subwentry);
 
+	/* general transaction stats for error */
+	subwentry->xact_error_count++;
+	subwentry->xact_error_bytes += msg->m_xact_error_bytes;
+
 	/*
 	 * Update only the counter and timestamp if we received the same error
 	 * again
@@ -6335,6 +6578,12 @@ pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, Oid subid, Oid subrelid,
 	/* If not found, initialize the new one */
 	if (create && !found)
 	{
+		subwentry->xact_commit_count = 0;
+		subwentry->xact_commit_bytes = 0;
+		subwentry->xact_error_count = 0;
+		subwentry->xact_error_bytes = 0;
+		subwentry->xact_abort_count = 0;
+		subwentry->xact_abort_bytes = 0;
 		subwentry->relid = InvalidOid;
 		subwentry->command = 0;
 		subwentry->xid = InvalidTransactionId;
@@ -6451,3 +6700,48 @@ pgstat_count_slru_truncate(int slru_idx)
 {
 	slru_entry(slru_idx)->m_truncate += 1;
 }
+
+/* ----------
+ * pgstat_get_subworker_prepared_txn
+ *
+ * Return the prepared transaction size entry for the given subscription
+ * OID and gid.
+ * ----------
+ */
+static PgStat_StatSubWorkerPreparedXactSize*
+pgstat_get_subworker_prepared_txn(Oid subid, char *gid, bool create)
+{
+	PgStat_StatSubWorkerPreparedXact key;
+	PgStat_StatSubWorkerPreparedXactSize *prepared_txn_size;
+	HASHACTION	action;
+	bool		found;
+
+	if (subWorkerPreparedXactSizeHash == NULL)
+	{
+		HASHCTL		hash_ctl;
+
+		hash_ctl.keysize = sizeof(PgStat_StatSubWorkerPreparedXact);
+		hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerPreparedXactSize);
+		hash_ctl.hcxt = pgStatLocalContext;
+		subWorkerPreparedXactSizeHash = hash_create("Subscription worker stats of prepared txn",
+													PGSTAT_SUBWORKER_HASH_SIZE,
+													&hash_ctl,
+													HASH_ELEM | HASH_STRINGS);
+	}
+
+	key.subid = subid;
+	memcpy(key.gid, gid, sizeof(key.gid));
+	action = (create ? HASH_ENTER : HASH_FIND);
+	prepared_txn_size = (PgStat_StatSubWorkerPreparedXactSize *) hash_search(subWorkerPreparedXactSizeHash,
+																			 (void *) &key,
+																			 action, &found);
+
+	if (create && !found)
+	{
+		prepared_txn_size->subid = 0;
+		prepared_txn_size->gid[0] = '\0';
+		prepared_txn_size->xact_size = 0;
+	}
+
+	return prepared_txn_size;
+}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f07983a..aff78fd 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1149,6 +1149,12 @@ copy_table_done:
 	MyLogicalRepWorker->relstate_lsn = *origin_startpos;
 	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
+	/* Report the success of table sync. */
+	pgstat_report_subworker_xact_end(MyLogicalRepWorker->subid,
+									 MyLogicalRepWorker->relid,
+									 0 /* no logical message type */,
+									 0 /* xact size */);
+
 	/*
 	 * Finally, wait until the main apply worker tells us to catch up and then
 	 * return to let LogicalRepApplyLoop do it.
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index e2a929b..db7abfc 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -221,21 +221,65 @@ typedef struct ApplyErrorCallbackArg
 	LogicalRepMsgType command;	/* 0 if invalid */
 	LogicalRepRelMapEntry *rel;
 
+	/*
+	 * Store data size of this transaction.
+	 *
+	 * The byte size of a transaction on the publisher is calculated
+	 * by ReorderBufferChangeSize() based on the ReorderBufferChange
+	 * structure. But on the subscriber, the consumed resources are
+	 * not the same as in the publisher's decoding processing and need
+	 * to be computed in a different way. Therefore, the exact same byte
+	 * size is usually not reproduced on the subscriber.
+	 *
+	 * Data size of streaming transactions is managed by streamingXactSize
+	 * for flexible data blocks handling.
+	 */
+	PgStat_Counter bytes;
+
 	/* Remote node information */
 	int			remote_attnum;	/* -1 if invalid */
 	TransactionId remote_xid;
 	TimestampTz ts;				/* commit, rollback, or prepare timestamp */
 } ApplyErrorCallbackArg;
 
+/* Struct to indicate extra consumption of transaction size */
+typedef union ApplyTxnExtraData
+{
+	LogicalRepRelMapEntry *relmapentry;
+	LogicalRepRelation *reprelation;
+	int         *stream_write_len;
+} ApplyTxnExtraData;
+
 static ApplyErrorCallbackArg apply_error_callback_arg =
 {
 	.command = 0,
 	.rel = NULL,
+	.bytes = 0,
 	.remote_attnum = -1,
 	.remote_xid = InvalidTransactionId,
 	.ts = 0,
 };
 
+/*
+ * Two or more streaming transactions in parallel on the publisher
+ * send partial txn data, demarcated by stream start and stream stop,
+ * to the subscriber in an unpredictable order, since whenever the size
+ * of one of the txns reaches the publisher's logical_decoding_work_mem,
+ * that part (and, in the end, the last remaining changes) is streamed.
+ * This creates interleaved blocks of streaming data where
+ * some may be successfully committed while others are
+ * rolled back by stream abort. Therefore, to track the correct byte size
+ * it's necessary to trace each streaming transaction by pairing
+ * its xid with the transaction size.
+ */
+#define PARALLEL_STREAMING_XACTS 32
+typedef struct XactSizeEntry
+{
+	TransactionId	key;
+	PgStat_Counter	xact_size;
+} XactSizeEntry;
+static HTAB *streamingXactSize = NULL;
+
 static MemoryContext ApplyMessageContext = NULL;
 MemoryContext ApplyContext = NULL;
 
@@ -304,6 +348,9 @@ static void maybe_reread_subscription(void);
 /* prototype needed because of stream_commit */
 static void apply_dispatch(StringInfo s);
 
+static void update_apply_change_size(LogicalRepMsgType action,
+									 ApplyTxnExtraData *extra_data);
+
 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
 static void apply_handle_insert_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
@@ -818,6 +865,11 @@ apply_handle_commit(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
 
+	pgstat_report_subworker_xact_end(MyLogicalRepWorker->subid,
+									 InvalidOid,
+									 LOGICAL_REP_MSG_COMMIT,
+									 get_apply_error_context_xact_size());
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
 }
@@ -922,6 +974,11 @@ apply_handle_prepare(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
+	pgstat_report_subworker_twophase_xact(MyLogicalRepWorker->subid,
+										  LOGICAL_REP_MSG_PREPARE,
+										  get_apply_error_context_xact_size(),
+										  prepare_data.gid);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
 }
@@ -963,6 +1020,13 @@ apply_handle_commit_prepared(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
+	update_apply_change_size(LOGICAL_REP_MSG_COMMIT_PREPARED, NULL);
+
+	pgstat_report_subworker_twophase_xact(MyLogicalRepWorker->subid,
+										  LOGICAL_REP_MSG_COMMIT_PREPARED,
+										  get_apply_error_context_xact_size(),
+										  prepare_data.gid);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
 }
@@ -1014,6 +1078,12 @@ apply_handle_rollback_prepared(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(rollback_data.rollback_end_lsn);
 
+	/* send rollback prepared message for this gid */
+	pgstat_report_subworker_twophase_xact(MyLogicalRepWorker->subid,
+										  LOGICAL_REP_MSG_ROLLBACK_PREPARED,
+										  get_apply_error_context_xact_size(),
+										  rollback_data.gid);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 	reset_apply_error_context_info();
 }
@@ -1029,6 +1099,7 @@ static void
 apply_handle_stream_prepare(StringInfo s)
 {
 	LogicalRepPreparedTxnData prepare_data;
+	XactSizeEntry *streamed_entry;
 
 	if (in_streamed_transaction)
 		ereport(ERROR,
@@ -1066,6 +1137,19 @@ apply_handle_stream_prepare(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(prepare_data.end_lsn);
 
+	/*
+	 * Report the prepared streaming xact size to the stats collector
+	 * in a prepared xact manner to make it survive over the restart.
+	 */
+	streamed_entry = hash_search(streamingXactSize,
+								 (void *) &prepare_data.xid,
+								 HASH_FIND, NULL);
+	Assert(streamed_entry);
+	pgstat_report_subworker_twophase_xact(MyLogicalRepWorker->subid,
+										  LOGICAL_REP_MSG_STREAM_PREPARE,
+										  streamed_entry->xact_size,
+										  prepare_data.gid);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 
 	reset_apply_error_context_info();
@@ -1143,6 +1227,8 @@ apply_handle_stream_start(StringInfo s)
 		MyLogicalRepWorker->stream_fileset = palloc(sizeof(FileSet));
 		FileSetInit(MyLogicalRepWorker->stream_fileset);
 
+		update_apply_change_size(LOGICAL_REP_MSG_STREAM_START, NULL);
+
 		MemoryContextSwitchTo(oldctx);
 	}
 
@@ -1193,12 +1279,17 @@ apply_handle_stream_stop(StringInfo s)
 
 /*
  * Handle STREAM abort message.
+ *
+ * Currently, the abort of a streaming subtransaction does not reduce
+ * the recorded size of the streaming transaction, because the
+ * resources have already been consumed anyway.
  */
 static void
 apply_handle_stream_abort(StringInfo s)
 {
 	TransactionId xid;
 	TransactionId subxid;
+	XactSizeEntry *streamed_entry;
 
 	if (in_streamed_transaction)
 		ereport(ERROR,
@@ -1213,8 +1304,36 @@ apply_handle_stream_abort(StringInfo s)
 	 */
 	if (xid == subxid)
 	{
+		bool		found = false;
 		set_apply_error_context_xact(xid, 0);
 		stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+
+		/*
+		 * We've completed handling the stream abort without issue, so
+		 * get ready to report the transaction stats via the normal
+		 * termination route instead of the apply error route.
+		 */
+		streamed_entry = hash_search(streamingXactSize,
+									 (void *) &xid,
+									 HASH_FIND, &found);
+		/*
+		 * It's possible that we get the stream abort
+		 * earlier than any call of write_stream_change that
+		 * would create a hash entry for this xid. In that case,
+		 * the lookup for this xid fails. So just check
+		 * whether we've found it, and report the stream abort only
+		 * when write_stream_change confirmed some writes.
+		 */
+		if (found)
+		{
+			pgstat_report_subworker_xact_end(MyLogicalRepWorker->subid,
+											 InvalidOid,
+											 LOGICAL_REP_MSG_STREAM_ABORT,
+											 streamed_entry->xact_size);
+			(void) hash_search(streamingXactSize,
+							   (void *) &xid,
+							   HASH_REMOVE, NULL);
+		}
 	}
 	else
 	{
@@ -1417,6 +1536,7 @@ apply_handle_stream_commit(StringInfo s)
 {
 	TransactionId xid;
 	LogicalRepCommitData commit_data;
+	XactSizeEntry *streamed_entry;
 
 	if (in_streamed_transaction)
 		ereport(ERROR,
@@ -1438,6 +1558,19 @@ apply_handle_stream_commit(StringInfo s)
 	/* Process any tables that are being synchronized in parallel. */
 	process_syncing_tables(commit_data.end_lsn);
 
+	/* Report and clean up the xid */
+	streamed_entry = hash_search(streamingXactSize,
+								 (void *) &xid,
+								 HASH_FIND, NULL);
+	Assert(streamed_entry);
+	pgstat_report_subworker_xact_end(MyLogicalRepWorker->subid,
+									 InvalidOid,
+									 LOGICAL_REP_MSG_STREAM_COMMIT,
+									 streamed_entry->xact_size);
+	(void) hash_search(streamingXactSize,
+					   (void *) &xid,
+					   HASH_REMOVE, NULL);
+
 	pgstat_report_activity(STATE_IDLE, NULL);
 
 	reset_apply_error_context_info();
@@ -1541,6 +1674,7 @@ apply_handle_insert(StringInfo s)
 	EState	   *estate;
 	TupleTableSlot *remoteslot;
 	MemoryContext oldctx;
+	ApplyTxnExtraData extra_size;
 
 	if (handle_streamed_transaction(LOGICAL_REP_MSG_INSERT, s))
 		return;
@@ -1576,6 +1710,10 @@ apply_handle_insert(StringInfo s)
 	slot_fill_defaults(rel, estate, remoteslot);
 	MemoryContextSwitchTo(oldctx);
 
+	/* Update transaction size */
+	extra_size.relmapentry = rel;
+	update_apply_change_size(LOGICAL_REP_MSG_INSERT, &extra_size);
+
 	/* For a partitioned table, insert the tuple into a partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		apply_handle_tuple_routing(edata,
@@ -1667,6 +1805,7 @@ apply_handle_update(StringInfo s)
 	TupleTableSlot *remoteslot;
 	RangeTblEntry *target_rte;
 	MemoryContext oldctx;
+	ApplyTxnExtraData extra_size;
 
 	if (handle_streamed_transaction(LOGICAL_REP_MSG_UPDATE, s))
 		return;
@@ -1733,6 +1872,10 @@ apply_handle_update(StringInfo s)
 					has_oldtup ? &oldtup : &newtup);
 	MemoryContextSwitchTo(oldctx);
 
+	/* Update transaction size */
+	extra_size.relmapentry = rel;
+	update_apply_change_size(LOGICAL_REP_MSG_UPDATE, &extra_size);
+
 	/* For a partitioned table, apply update to correct partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		apply_handle_tuple_routing(edata,
@@ -1830,6 +1973,7 @@ apply_handle_delete(StringInfo s)
 	EState	   *estate;
 	TupleTableSlot *remoteslot;
 	MemoryContext oldctx;
+	ApplyTxnExtraData extra_size;
 
 	if (handle_streamed_transaction(LOGICAL_REP_MSG_DELETE, s))
 		return;
@@ -1867,6 +2011,10 @@ apply_handle_delete(StringInfo s)
 	slot_store_data(remoteslot, rel, &oldtup);
 	MemoryContextSwitchTo(oldctx);
 
+	/* Update transaction size */
+	extra_size.relmapentry = rel;
+	update_apply_change_size(LOGICAL_REP_MSG_DELETE, &extra_size);
+
 	/* For a partitioned table, apply delete to correct partition. */
 	if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		apply_handle_tuple_routing(edata,
@@ -2418,6 +2566,111 @@ apply_dispatch(StringInfo s)
 }
 
 /*
+ * Subscriber side implementation equivalent to ReorderBufferChangeSize
+ * of the publisher.
+ *
+ * According to the logical replication message type, record major
+ * resource consumptions of this subscription for each message.
+ * At present, do not collect data from generic functions to keep
+ * the code simple, since the complexity-versus-benefit
+ * tradeoff would not be good. Also, add multiple values
+ * at once in order to reduce the number of calls to this function.
+ *
+ * 'extra_data' controls detail handling of data size calculation.
+ */
+static void
+update_apply_change_size(LogicalRepMsgType action, ApplyTxnExtraData *extra_data)
+{
+	int64       size = 0;
+
+	/*
+	 * In streaming mode, stream_write_change is called
+	 * instead of immediate apply. List the message types
+	 * that can be caught by handle_streamed_transaction and
+	 * treat the write length as the size of transaction so
+	 * that we can export it as part of pg_stat_subscription_workers.
+	 */
+	if (in_streamed_transaction &&
+		(action == LOGICAL_REP_MSG_INSERT ||
+		 action == LOGICAL_REP_MSG_UPDATE ||
+		 action == LOGICAL_REP_MSG_DELETE ||
+		 action == LOGICAL_REP_MSG_TRUNCATE ||
+		 action == LOGICAL_REP_MSG_RELATION ||
+		 action == LOGICAL_REP_MSG_TYPE))
+	{
+		size += *extra_data->stream_write_len;
+		add_apply_error_context_xact_size(size);
+		return;
+	}
+
+	switch (action)
+	{
+		/* No special memory consumption */
+		case LOGICAL_REP_MSG_BEGIN:
+		case LOGICAL_REP_MSG_COMMIT:
+		case LOGICAL_REP_MSG_TRUNCATE:
+		case LOGICAL_REP_MSG_TYPE:
+		case LOGICAL_REP_MSG_ORIGIN:
+		case LOGICAL_REP_MSG_MESSAGE:
+		case LOGICAL_REP_MSG_STREAM_STOP:
+		case LOGICAL_REP_MSG_STREAM_ABORT:
+		case LOGICAL_REP_MSG_BEGIN_PREPARE:
+			break;
+
+		case LOGICAL_REP_MSG_INSERT:
+		case LOGICAL_REP_MSG_UPDATE:
+		case LOGICAL_REP_MSG_DELETE:
+			Assert(extra_data != NULL);
+
+			/*
+			 * Compute size based on ApplyExecutionData.
+			 * The size of LogicalRepRelMapEntry can be skipped because
+			 * it is obtained from hash_search in logicalrep_rel_open.
+			 */
+			size += sizeof(ApplyExecutionData) + sizeof(EState) +
+				sizeof(ResultRelInfo) + sizeof(ResultRelInfo);
+
+			/*
+			 * Add some extra size if the target relation is partitioned.
+			 * PartitionTupleRouting isn't exported. Therefore, call the
+			 * function that returns its size instead.
+			 */
+			if (extra_data->relmapentry->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+				size += sizeof(ModifyTableState) + PartitionTupleRoutingSize();
+			break;
+
+		case LOGICAL_REP_MSG_RELATION:
+			Assert(extra_data != NULL);
+
+			/* See logicalrep_read_attrs for the last two */
+			size += sizeof(LogicalRepRelation) +
+				extra_data->reprelation->natts * sizeof(char *) +
+				extra_data->reprelation->natts * sizeof(Oid);
+			break;
+
+		case LOGICAL_REP_MSG_STREAM_START:
+			size += sizeof(FileSet);
+			break;
+
+		case LOGICAL_REP_MSG_PREPARE:
+		case LOGICAL_REP_MSG_COMMIT_PREPARED:
+		case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
+		case LOGICAL_REP_MSG_STREAM_COMMIT:
+		case LOGICAL_REP_MSG_STREAM_PREPARE:
+			size += sizeof(FlushPosition);
+			break;
+
+		default:
+			ereport(ERROR,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("invalid logical replication message type \"%c\"", action)));
+	}
+
+	/* update the total size of consumption */
+	add_apply_error_context_xact_size(size);
+}
+
+/*
  * Figure out which write/flush positions to report to the walsender process.
  *
  * We can't simply report back the last LSN the walsender sent us because the
@@ -3271,6 +3524,9 @@ static void
 stream_write_change(char action, StringInfo s)
 {
 	int			len;
+	int			total_len;
+	bool		found;
+	XactSizeEntry *streamed_entry;
 
 	Assert(in_streamed_transaction);
 	Assert(TransactionIdIsValid(stream_xid));
@@ -3289,6 +3545,16 @@ stream_write_change(char action, StringInfo s)
 	len = (s->len - s->cursor);
 
 	BufFileWrite(stream_fd, &s->data[s->cursor], len);
+
+	/* update xact size by xid */
+	total_len = (s->len - s->cursor) * 2 + sizeof(char) + sizeof(action);
+	streamed_entry = (XactSizeEntry *) hash_search(streamingXactSize,
+												   (void *) &stream_xid,
+												   HASH_ENTER, &found);
+	if (!found)
+		streamed_entry->xact_size = total_len; /* init */
+	else
+		streamed_entry->xact_size += total_len; /* update */
 }
 
 /*
@@ -3426,6 +3692,23 @@ ApplyWorkerMain(Datum main_arg)
 	elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
 		 MySubscription->conninfo);
 
+	/*
+	 * Initialize the apply worker's hash to manage bytes for
+	 * streaming txns.
+	 */
+	if (!am_tablesync_worker() && MySubscription->stream)
+	{
+		HASHCTL     hash_ctl;
+
+		hash_ctl.keysize = sizeof(TransactionId);
+		hash_ctl.entrysize = sizeof(XactSizeEntry);
+		hash_ctl.hcxt = LogicalStreamingContext;
+		streamingXactSize = hash_create("xact size per streaming xid",
+										PARALLEL_STREAMING_XACTS,
+										&hash_ctl,
+										HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+	}
+
 	if (am_tablesync_worker())
 	{
 		char	   *syncslotname;
@@ -3601,6 +3884,27 @@ ApplyWorkerMain(Datum main_arg)
 	proc_exit(0);
 }
 
+/* Exported so that stats collector can utilize this value */
+int64
+get_apply_error_context_xact_size(void)
+{
+	return apply_error_callback_arg.bytes;
+}
+
+/* Add size to apply error bytes */
+void
+add_apply_error_context_xact_size(int64 size)
+{
+	apply_error_callback_arg.bytes += size;
+}
+
+/* Reset information of apply error callback */
+void
+reset_apply_error_context_xact_size(void)
+{
+	apply_error_callback_arg.bytes = 0;
+}
+
 /*
  * Is current process a logical replication worker?
  */
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index aa17b82..c902795 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2411,7 +2411,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS	8
+#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS	14
 	Oid			subid = PG_GETARG_OID(0);
 	Oid			subrelid;
 	TupleDesc	tupdesc;
@@ -2438,17 +2438,29 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 					   OIDOID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "subrelid",
 					   OIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "relid",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "commit_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "commit_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "error_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "error_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "abort_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "abort_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 9, "last_error_relid",
 					   OIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "command",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "last_error_command",
 					   TEXTOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 5, "xid",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 11, "last_error_xid",
 					   XIDOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 6, "error_count",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 12, "last_error_count",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 7, "error_message",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 13, "last_error_message",
 					   TEXTOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_error_time",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 14, "last_error_time",
 					   TIMESTAMPTZOID, -1, 0);
 	BlessTupleDesc(tupdesc);
 
@@ -2466,28 +2478,36 @@ pg_stat_get_subscription_worker(PG_FUNCTION_ARGS)
 	else
 		nulls[i++] = true;
 
-	/* relid */
+	/* transaction stats */
+	values[i++] = Int64GetDatum(wentry->xact_commit_count);
+	values[i++] = Int64GetDatum(wentry->xact_commit_bytes);
+	values[i++] = Int64GetDatum(wentry->xact_error_count);
+	values[i++] = Int64GetDatum(wentry->xact_error_bytes);
+	values[i++] = Int64GetDatum(wentry->xact_abort_count);
+	values[i++] = Int64GetDatum(wentry->xact_abort_bytes);
+
+	/* last_error_relid */
 	if (OidIsValid(wentry->relid))
 		values[i++] = ObjectIdGetDatum(wentry->relid);
 	else
 		nulls[i++] = true;
 
-	/* command */
+	/* last_error_command */
 	if (wentry->command != 0)
 		values[i++] = CStringGetTextDatum(logicalrep_message_type(wentry->command));
 	else
 		nulls[i++] = true;
 
-	/* xid */
+	/* last_error_xid */
 	if (TransactionIdIsValid(wentry->xid))
 		values[i++] = TransactionIdGetDatum(wentry->xid);
 	else
 		nulls[i++] = true;
 
-	/* error_count */
+	/* last_error_count */
 	values[i++] = Int64GetDatum(wentry->error_count);
 
-	/* error_message */
+	/* last_error_message */
 	values[i++] = CStringGetTextDatum(wentry->error_message);
 
 	/* last_error_time */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 528c39d..f276b0b 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5389,9 +5389,9 @@
   proname => 'pg_stat_get_subscription_worker', prorows => '1', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'oid oid',
-  proallargtypes => '{oid,oid,oid,oid,oid,text,xid,int8,text,timestamptz}',
-  proargmodes => '{i,i,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subrelid,subid,subrelid,relid,command,xid,error_count,error_message,last_error_time}',
+  proallargtypes => '{oid,oid,oid,oid,int8,int8,int8,int8,int8,int8,oid,text,xid,int8,text,timestamptz}',
+  proargmodes => '{i,i,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subrelid,subid,subrelid,commit_count,commit_bytes,error_count,error_bytes,abort_count,abort_bytes,last_error_relid,last_error_command,last_error_xid,last_error_count,last_error_message,last_error_time}',
   prosrc => 'pg_stat_get_subscription_worker' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
diff --git a/src/include/executor/execPartition.h b/src/include/executor/execPartition.h
index 694e38b..773e46c 100644
--- a/src/include/executor/execPartition.h
+++ b/src/include/executor/execPartition.h
@@ -110,6 +110,7 @@ typedef struct PartitionPruneState
 	PartitionPruningData *partprunedata[FLEXIBLE_ARRAY_MEMBER];
 } PartitionPruneState;
 
+extern size_t PartitionTupleRoutingSize(void);
 extern PartitionTupleRouting *ExecSetupPartitionTupleRouting(EState *estate,
 															 Relation rel);
 extern ResultRelInfo *ExecFindPartition(ModifyTableState *mtstate,
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 3749bac..91a92da 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -87,6 +87,8 @@ typedef enum StatMsgType
 	PGSTAT_MTYPE_DISCONNECT,
 	PGSTAT_MTYPE_SUBSCRIPTIONPURGE,
 	PGSTAT_MTYPE_SUBWORKERERROR,
+	PGSTAT_MTYPE_SUBWORKERXACTEND,
+	PGSTAT_MTYPE_SUBWORKERTWOPHASEXACT,
 } StatMsgType;
 
 /* ----------
@@ -579,6 +581,56 @@ typedef struct PgStat_MsgSubscriptionPurge
 } PgStat_MsgSubscriptionPurge;
 
 /* ----------
+ * PgStat_MsgSubWorkerXactEnd		Sent by the apply worker or the table sync worker
+ *									to report successful transaction ends.
+ * ----------
+ */
+typedef struct PgStat_MsgSubWorkerXactEnd
+{
+	PgStat_MsgHdr m_hdr;
+
+	/* determine the worker entry */
+	Oid			m_databaseid;
+	Oid			m_subid;
+	Oid			m_subrelid;
+
+	/*
+	 * distinguish between transaction commits and streaming transaction aborts
+	 * that are handled without error.
+	 */
+	LogicalRepMsgType m_command;
+
+	/* memory consumption used by transaction */
+	PgStat_Counter m_xact_bytes;
+
+} PgStat_MsgSubWorkerXactEnd;
+
+/* ----------
+ * PgStat_MsgSubWorkerTwophaseXact	Sent by the apply worker to make size of prepared
+ *									txn persistent over the server restart and make it
+ *									visible after commit prepared or rollback prepared.
+ *									This is separated from PgStat_MsgSubWorkerXactEnd
+ *									so that we can avoid carrying the gid in messages for
+ *									other operations (e.g. normal COMMIT) that usually
+ *									happen more frequently than prepare operations.
+ * ----------
+ */
+typedef struct PgStat_MsgSubWorkerTwophaseXact
+{
+	PgStat_MsgHdr m_hdr;
+
+	/* determine the subscription */
+	Oid			m_databaseid;
+	Oid			m_subid;
+
+	LogicalRepMsgType m_command;
+	char			m_gid[GIDSIZE];
+	int			gid_len;
+	PgStat_Counter m_xact_bytes;
+
+} PgStat_MsgSubWorkerTwophaseXact;
+
+/* ----------
  * PgStat_MsgSubWorkerError		Sent by the apply worker or the table sync worker to
  *								report the error occurred during logical replication.
  * ----------
@@ -598,6 +650,12 @@ typedef struct PgStat_MsgSubWorkerError
 	Oid			m_subrelid;
 
 	/*
+	 * Transaction stats of the subscription need to be updated when an
+	 * error occurs.
+	 */
+	PgStat_Counter m_xact_error_bytes;
+
+	/*
 	 * Oids of the database and the table that the reporter was actually
 	 * processing. m_relid can be InvalidOid if an error occurred during
 	 * worker applying a non-data-modification message such as RELATION.
@@ -790,6 +848,8 @@ typedef union PgStat_Msg
 	PgStat_MsgDisconnect msg_disconnect;
 	PgStat_MsgSubscriptionPurge msg_subscriptionpurge;
 	PgStat_MsgSubWorkerError msg_subworkererror;
+	PgStat_MsgSubWorkerXactEnd msg_subworkerxactend;
+	PgStat_MsgSubWorkerTwophaseXact msg_subworkertwophasexact;
 } PgStat_Msg;
 
 
@@ -1027,6 +1087,16 @@ typedef struct PgStat_StatSubWorkerEntry
 	PgStat_StatSubWorkerKey key;	/* hash key (must be first) */
 
 	/*
+	 * Cumulative transaction statistics of subscription worker
+	 */
+	PgStat_Counter xact_commit_count;
+	PgStat_Counter xact_commit_bytes;
+	PgStat_Counter xact_error_count;
+	PgStat_Counter xact_error_bytes;
+	PgStat_Counter xact_abort_count;
+	PgStat_Counter xact_abort_bytes;
+
+	/*
 	 * Subscription worker error statistics representing an error that
 	 * occurred during application of logical replication or the initial table
 	 * synchronization.
@@ -1040,6 +1110,22 @@ typedef struct PgStat_StatSubWorkerEntry
 	char		error_message[PGSTAT_SUBWORKERERROR_MSGLEN];
 } PgStat_StatSubWorkerEntry;
 
+/* prepared transaction */
+typedef struct PgStat_StatSubWorkerPreparedXact
+{
+	Oid			subid;
+	char		gid[GIDSIZE];
+} PgStat_StatSubWorkerPreparedXact;
+
+typedef struct PgStat_StatSubWorkerPreparedXactSize
+{
+	PgStat_StatSubWorkerPreparedXact key; /* hash key */
+
+	Oid			subid;
+	char		gid[GIDSIZE];
+	PgStat_Counter xact_size;
+} PgStat_StatSubWorkerPreparedXactSize;
+
 /*
  * Working state needed to accumulate per-function-call timing statistics.
  */
@@ -1149,6 +1235,10 @@ extern void pgstat_report_checksum_failure(void);
 extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat);
 extern void pgstat_report_replslot_create(const char *slotname);
 extern void pgstat_report_replslot_drop(const char *slotname);
+extern void pgstat_report_subworker_xact_end(Oid subid, Oid subrel,
+											 LogicalRepMsgType command, PgStat_Counter xact_size);
+extern void pgstat_report_subworker_twophase_xact(Oid subid, LogicalRepMsgType command,
+												  PgStat_Counter xact_size, char *gid);
 extern void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid,
 										  LogicalRepMsgType command,
 										  TransactionId xid, const char *errmsg);
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 2ad61a0..9a8447b 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -16,4 +16,9 @@ extern void ApplyWorkerMain(Datum main_arg);
 
 extern bool IsLogicalWorker(void);
 
+/* for transaction stats */
+extern int64 get_apply_error_context_xact_size(void);
+extern void add_apply_error_context_xact_size(int64 size);
+extern void reset_apply_error_context_xact_size(void);
+
 #endif							/* LOGICALWORKER_H */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index d7d17b7..7445041 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2097,11 +2097,17 @@ pg_stat_subscription| SELECT su.oid AS subid,
 pg_stat_subscription_workers| SELECT w.subid,
     s.subname,
     w.subrelid,
-    w.relid,
-    w.command,
-    w.xid,
+    w.commit_count,
+    w.commit_bytes,
     w.error_count,
-    w.error_message,
+    w.error_bytes,
+    w.abort_count,
+    w.abort_bytes,
+    w.last_error_relid,
+    w.last_error_command,
+    w.last_error_xid,
+    w.last_error_count,
+    w.last_error_message,
     w.last_error_time
    FROM ( SELECT pg_subscription.oid AS subid,
             NULL::oid AS relid
@@ -2111,7 +2117,7 @@ pg_stat_subscription_workers| SELECT w.subid,
             pg_subscription_rel.srrelid AS relid
            FROM pg_subscription_rel
           WHERE (pg_subscription_rel.srsubstate <> 'r'::"char")) sr,
-    (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, relid, command, xid, error_count, error_message, last_error_time)
+    (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, commit_count, commit_bytes, error_count, error_bytes, abort_count, abort_bytes, last_error_relid, last_error_command, last_error_xid, last_error_count, last_error_message, last_error_time)
      JOIN pg_subscription s ON ((w.subid = s.oid)));
 pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid,
     pg_stat_all_indexes.indexrelid,
diff --git a/src/test/subscription/t/026_error_report.pl b/src/test/subscription/t/026_error_report.pl
index 3d23bb5..9ff619e 100644
--- a/src/test/subscription/t/026_error_report.pl
+++ b/src/test/subscription/t/026_error_report.pl
@@ -15,8 +15,8 @@ sub test_subscription_error
 
     my $check_sql = qq[
 SELECT count(1) > 0 FROM pg_stat_subscription_workers
-WHERE relid = '$relname'::regclass];
-    $check_sql .= " AND xid = '$xid'::xid;" if $xid ne '';
+WHERE last_error_relid = '$relname'::regclass];
+    $check_sql .= " AND last_error_xid = '$xid'::xid;" if $xid ne '';
 
     # Wait for the error statistics to be updated.
     $node->poll_query_until(
@@ -26,9 +26,9 @@ WHERE relid = '$relname'::regclass];
     my $result = $node->safe_psql(
 	'postgres',
 	qq[
-SELECT subname, command, relid::regclass, error_count > 0
+SELECT subname, last_error_command, last_error_relid::regclass, last_error_count > 0
 FROM pg_stat_subscription_workers
-WHERE relid = '$relname'::regclass;
+WHERE last_error_relid = '$relname'::regclass;
 ]);
     is($result, $expected_error, $msg);
 }
diff --git a/src/test/subscription/t/027_worker_xact_stats.pl b/src/test/subscription/t/027_worker_xact_stats.pl
new file mode 100644
index 0000000..0057e8d
--- /dev/null
+++ b/src/test/subscription/t/027_worker_xact_stats.pl
@@ -0,0 +1,173 @@
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Tests for subscription worker statistics during apply.
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More tests => 2;
+
+# Create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', qq[
+logical_decoding_work_mem = 64kB
+max_prepared_transactions = 10
+max_wal_senders = 10
+wal_sender_timeout = 0
+]);
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf', qq[
+max_prepared_transactions = 10
+log_min_messages = DEBUG1
+]);
+$node_subscriber->start;
+
+# Setup structures on the publisher and the subscriber
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_tab (a int primary key);");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+my $subopts = 'streaming = on, two_phase = on, copy_data = false';
+$node_subscriber->safe_psql('postgres', qq[
+CREATE SUBSCRIPTION tap_sub
+CONNECTION '$publisher_connstr application_name=$appname'
+PUBLICATION tap_pub WITH ($subopts);
+]);
+
+$node_publisher->wait_for_catchup($appname);
+
+# There's no entry at the beginning
+my $result = $node_subscriber->safe_psql('postgres',
+"SELECT count(*) FROM pg_stat_subscription_workers;");
+is($result, q(0), 'no entry for transaction stats yet');
+
+# COMMIT
+$node_publisher->safe_psql('postgres',
+    "BEGIN; INSERT INTO test_tab VALUES (1); COMMIT;");
+
+$node_subscriber->poll_query_until('postgres',
+"SELECT count(1) = 1 FROM pg_stat_subscription_workers where commit_count = 1;")
+  or die "didn't get updates of xact stats by commit";
+
+# Now, the stats collector should have updated the bytes as well.
+$result = $node_subscriber->safe_psql('postgres',
+"SELECT commit_bytes > 0 FROM pg_stat_subscription_workers;");
+is($result, q(t), 'got consumed bytes');
+
+# STREAM COMMIT
+$node_publisher->safe_psql(
+    'postgres',
+    "BEGIN; INSERT INTO test_tab VALUES(generate_series(1001, 2000)); COMMIT;");
+$node_subscriber->poll_query_until('postgres',
+"SELECT count(1) = 1 FROM pg_stat_subscription_workers where commit_count = 2;")
+  or die "didn't get updates of xact stats by stream commit";
+
+# STREAM PREPARE & COMMIT PREPARED
+# This should restore the xact size before the shutdown.
+$node_publisher->safe_psql(
+    'postgres', q[
+BEGIN;
+INSERT INTO test_tab VALUES(generate_series(2001, 3000));
+PREPARE TRANSACTION 'gid1';
+]);
+
+# This streamed prepare is not displayed until the commit prepared
+# or rollback prepared. Hence, there's no way to confirm that
+# stats collector has received the bytes of prepared transaction.
+# So, instead of checking the view, issue one more committed transaction
+# after the prepare and make sure that this commit's update is done,
+# which should mean the previous streamed prepare is already processed
+# by the stats collector as well.
+$node_publisher->safe_psql('postgres',
+"BEGIN; INSERT INTO test_tab VALUES (2); COMMIT;");
+$node_subscriber->poll_query_until('postgres',
+"SELECT count(1) = 1 FROM pg_stat_subscription_workers where commit_count = 3;")
+  or die "didn't process the updates of committed transaction";
+
+$node_subscriber->restart;
+
+# Get commit_bytes before commit prepared.
+my $tmp = $node_subscriber->safe_psql('postgres',
+"SELECT commit_bytes FROM pg_stat_subscription_workers where commit_count = 3;");
+
+# Commit prepared increments the commit_count.
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'gid1'");
+$node_subscriber->poll_query_until('postgres',
+"SELECT count(1) = 1 FROM pg_stat_subscription_workers where commit_count = 4;")
+  or die "didn't get updates of xact stats by stream prepare and commit prepared";
+
+$node_subscriber->poll_query_until('postgres',
+"SELECT commit_bytes > $tmp FROM pg_stat_subscription_workers where commit_count = 4;");
+
+# STREAM ABORT
+# Store the previous stream_count so that we can recognize
+# another streamed transaction from the increase of this value below.
+$tmp = $node_publisher->safe_psql('postgres',
+"SELECT stream_count FROM pg_stat_replication_slots");
+
+# Cause a new streaming and check it by another session
+my $in = '';
+my $out = '';
+my $timer = IPC::Run::timeout(180);
+my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer,
+	on_error_stop => 1);
+
+$in .= q{
+BEGIN;
+INSERT INTO test_tab VALUES(generate_series(3001, 4000));
+};
+$h->pump_nb;
+
+# Wait until this transaction has certainly been streamed,
+# then roll it back so that a stream abort is sent.
+$node_publisher->poll_query_until('postgres',
+"SELECT stream_count > $tmp FROM pg_stat_replication_slots;")
+  or die "didn't stream data of a new transaction";
+
+$in .= q{
+ROLLBACK;
+\q
+};
+$h->finish;
+
+$node_subscriber->poll_query_until('postgres',
+"SELECT count(1) = 1 FROM pg_stat_subscription_workers where abort_count = 1;")
+  or die "didn't get updates of xact stats by stream abort";
+
+# ROLLBACK PREPARED
+$node_publisher->safe_psql('postgres', q[
+BEGIN;
+INSERT INTO test_tab VALUES (3);
+PREPARE TRANSACTION 'gid2';
+]);
+$node_publisher->safe_psql('postgres', "ROLLBACK PREPARED 'gid2'");
+$node_subscriber->poll_query_until('postgres',
+"SELECT count(1) = 1 FROM pg_stat_subscription_workers where abort_count = 2;")
+  or die "didn't get updates of xact stats by rollback prepared";
+
+# error stats (by duplication error)
+$node_publisher->safe_psql('postgres', q[
+BEGIN;
+INSERT INTO test_tab VALUES (1);
+COMMIT;
+]);
+
+$node_subscriber->poll_query_until('postgres',
+"SELECT count(1) = 1 FROM pg_stat_subscription_workers where error_count > 0;")
+  or die "didn't get updates of xact stats by error";
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e4d78a9..693aae5 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -116,6 +116,7 @@ AppendState
 ApplyErrorCallbackArg
 ApplyExecutionData
 ApplySubXactData
+ApplyTxnExtraData
 Archive
 ArchiveEntryPtrType
 ArchiveFormat
@@ -1947,6 +1948,8 @@ PgStat_MsgSLRU
 PgStat_MsgSubWorkerError
 PgStat_MsgSubWorkerErrorPurge
 PgStat_MsgSubWorkerPurge
+PgStat_MsgSubWorkerTwophaseXact
+PgStat_MsgSubWorkerXactEnd
 PgStat_MsgTabpurge
 PgStat_MsgTabstat
 PgStat_MsgTempFile
@@ -1960,6 +1963,8 @@ PgStat_StatFuncEntry
 PgStat_StatReplSlotEntry
 PgStat_StatSubWorkerEntry
 PgStat_StatSubWorkerKey
+PgStat_StatSubWorkerPreparedXact
+PgStat_StatSubWorkerPreparedXactSize
 PgStat_StatTabEntry
 PgStat_SubXactStatus
 PgStat_TableCounts
@@ -2957,6 +2962,7 @@ XactCallback
 XactCallbackItem
 XactEvent
 XactLockTableWaitInfo
+XactSizeEntry
 XidBoundsViolation
 XidCacheStatus
 XidCommitStatus
-- 
2.2.0

