From 1305433d20466eddfeacdc218bdae4fe084dadbe Mon Sep 17 00:00:00 2001
From: Ashutosh Sharma <ashu.coek88@gmail.com>
Date: Mon, 16 Mar 2026 06:48:35 +0000
Subject: [PATCH 1/2] Report downstream sent bytes in pg_stat_replication_slots

pg_stat_replication_slots currently tracks bytes added to the reorder
buffer, but it does not track bytes sent to downstream consumers.
Add sent_bytes to expose that metric and help diagnose replication
throughput issues caused by a slow or stalled downstream consumer.

Author: Ashutosh Bapat <ashutosh.bapat.oss@gmail.com>
Author: Ashutosh Sharma <ashu.coek88@gmail.com>
Reviewed-by: Shveta Malik <shveta.malik@gmail.com>
Reviewed-by: Bertrand Drouvot <bertranddrouvot.pg@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Ashutosh Sharma <ashu.coek88@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Discussion: https://www.postgresql.org/message-id/CAExHW5s6KntzUyUoMbKR5dgwRmdV2Ay_2+AnTgYGAzo=Qv61wA@mail.gmail.com
---
 contrib/test_decoding/expected/stats.out      | 48 +++++++++----------
 contrib/test_decoding/sql/stats.sql           |  7 +--
 contrib/test_decoding/t/001_repl_stats.pl     | 13 ++---
 doc/src/sgml/monitoring.sgml                  | 17 +++++++
 src/backend/catalog/system_views.sql          |  1 +
 src/backend/replication/logical/logical.c     |  7 ++-
 .../replication/logical/logicalfuncs.c        |  8 ++++
 .../replication/logical/reorderbuffer.c       |  1 +
 src/backend/replication/walsender.c           |  3 ++
 src/backend/utils/activity/pgstat_replslot.c  |  1 +
 src/backend/utils/adt/pgstatfuncs.c           | 21 ++++----
 src/include/catalog/pg_proc.dat               |  6 +--
 src/include/pgstat.h                          |  1 +
 src/include/replication/reorderbuffer.h       |  1 +
 src/test/recovery/t/006_logical_decoding.pl   | 12 ++---
 src/test/regress/expected/rules.out           |  3 +-
 16 files changed, 96 insertions(+), 54 deletions(-)

diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out
index a9ead3c41aa..2dd45460317 100644
--- a/contrib/test_decoding/expected/stats.out
+++ b/contrib/test_decoding/expected/stats.out
@@ -37,12 +37,12 @@ SELECT pg_stat_force_next_flush();
  
 (1 row)
 
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
-       slot_name        | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count 
-------------------------+------------+-------------+------------+-------------+--------------------
- regression_slot_stats1 | t          | t           | t          | t           | t
- regression_slot_stats2 | t          | t           | t          | t           | t
- regression_slot_stats3 | t          | t           | t          | t           | t
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_bytes > 0 AS sent_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+       slot_name        | spill_txns | spill_count | total_txns | total_bytes | sent_bytes | mem_exceeded_count 
+------------------------+------------+-------------+------------+-------------+------------+--------------------
+ regression_slot_stats1 | t          | t           | t          | t           | t          | t
+ regression_slot_stats2 | t          | t           | t          | t           | t          | t
+ regression_slot_stats3 | t          | t           | t          | t           | t          | t
 (3 rows)
 
 RESET logical_decoding_work_mem;
@@ -53,12 +53,12 @@ SELECT pg_stat_reset_replication_slot('regression_slot_stats1');
  
 (1 row)
 
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
-       slot_name        | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count 
-------------------------+------------+-------------+------------+-------------+--------------------
- regression_slot_stats1 | t          | t           | f          | f           | t
- regression_slot_stats2 | t          | t           | t          | t           | t
- regression_slot_stats3 | t          | t           | t          | t           | t
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_bytes > 0 AS sent_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+       slot_name        | spill_txns | spill_count | total_txns | total_bytes | sent_bytes | mem_exceeded_count 
+------------------------+------------+-------------+------------+-------------+------------+--------------------
+ regression_slot_stats1 | t          | t           | f          | f           | f          | t
+ regression_slot_stats2 | t          | t           | t          | t           | t          | t
+ regression_slot_stats3 | t          | t           | t          | t           | t          | t
 (3 rows)
 
 -- reset stats for all slots
@@ -68,27 +68,27 @@ SELECT pg_stat_reset_replication_slot(NULL);
  
 (1 row)
 
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
-       slot_name        | spill_txns | spill_count | total_txns | total_bytes | mem_exceeded_count 
-------------------------+------------+-------------+------------+-------------+--------------------
- regression_slot_stats1 | t          | t           | f          | f           | t
- regression_slot_stats2 | t          | t           | f          | f           | t
- regression_slot_stats3 | t          | t           | f          | f           | t
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+       slot_name        | spill_txns | spill_count | total_txns | total_bytes | sent_bytes | mem_exceeded_count 
+------------------------+------------+-------------+------------+-------------+------------+--------------------
+ regression_slot_stats1 | t          | t           | f          | f           |          0 | t
+ regression_slot_stats2 | t          | t           | f          | f           |          0 | t
+ regression_slot_stats3 | t          | t           | f          | f           |          0 | t
 (3 rows)
 
 -- verify accessing/resetting stats for non-existent slot does something reasonable
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
-  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | slotsync_skip_count | slotsync_last_skip | stats_reset 
---------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+---------------------+--------------------+-------------
- do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |                  0 |          0 |           0 |                   0 |                    | 
+  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | sent_bytes | slotsync_skip_count | slotsync_last_skip | stats_reset 
+--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+------------+---------------------+--------------------+-------------
+ do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |                  0 |          0 |           0 |          0 |                   0 |                    | 
 (1 row)
 
 SELECT pg_stat_reset_replication_slot('do-not-exist');
 ERROR:  replication slot "do-not-exist" does not exist
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
-  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | slotsync_skip_count | slotsync_last_skip | stats_reset 
---------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+---------------------+--------------------+-------------
- do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |                  0 |          0 |           0 |                   0 |                    | 
+  slot_name   | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | sent_bytes | slotsync_skip_count | slotsync_last_skip | stats_reset 
+--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+------------+---------------------+--------------------+-------------
+ do-not-exist |          0 |           0 |           0 |           0 |            0 |            0 |                  0 |          0 |           0 |          0 |                   0 |                    | 
 (1 row)
 
 -- spilling the xact
diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql
index 6661dbcb85c..9f85455bce1 100644
--- a/contrib/test_decoding/sql/stats.sql
+++ b/contrib/test_decoding/sql/stats.sql
@@ -15,16 +15,17 @@ SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats1', NULL,
 SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats2', NULL, NULL, 'skip-empty-xacts', '1');
 SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot_stats3', NULL, NULL, 'skip-empty-xacts', '1');
 SELECT pg_stat_force_next_flush();
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_bytes > 0 AS sent_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
 RESET logical_decoding_work_mem;
 
 -- reset stats for one slot, others should be unaffected
 SELECT pg_stat_reset_replication_slot('regression_slot_stats1');
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_bytes > 0 AS sent_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
 
 -- reset stats for all slots
 SELECT pg_stat_reset_replication_slot(NULL);
-SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
+SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes, sent_bytes, mem_exceeded_count = 0 AS mem_exceeded_count FROM pg_stat_replication_slots ORDER BY slot_name;
 
 -- verify accessing/resetting stats for non-existent slot does something reasonable
 SELECT * FROM pg_stat_get_replication_slot('do-not-exist');
diff --git a/contrib/test_decoding/t/001_repl_stats.pl b/contrib/test_decoding/t/001_repl_stats.pl
index 6814c792e2b..88899824bae 100644
--- a/contrib/test_decoding/t/001_repl_stats.pl
+++ b/contrib/test_decoding/t/001_repl_stats.pl
@@ -26,7 +26,8 @@ sub test_slot_stats
 	my $result = $node->safe_psql(
 		'postgres', qq[
 		SELECT slot_name, total_txns > 0 AS total_txn,
-			   total_bytes > 0 AS total_bytes
+			   total_bytes > 0 AS total_bytes,
+			   sent_bytes > 0 AS sent_bytes
 			   FROM pg_stat_replication_slots
 			   ORDER BY slot_name]);
 	is($result, $expected, $msg);
@@ -80,9 +81,9 @@ $node->start;
 # restart.
 test_slot_stats(
 	$node,
-	qq(regression_slot1|t|t
-regression_slot2|t|t
-regression_slot3|t|t),
+	qq(regression_slot1|t|t|t
+regression_slot2|t|t|t
+regression_slot3|t|t|t),
 	'check replication statistics are updated');
 
 # Test to remove one of the replication slots and adjust
@@ -104,8 +105,8 @@ $node->start;
 # restart.
 test_slot_stats(
 	$node,
-	qq(regression_slot1|t|t
-regression_slot2|t|t),
+	qq(regression_slot1|t|t|t
+regression_slot2|t|t|t),
 	'check replication statistics after removing the slot file');
 
 # cleanup
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 9c5c6dc490f..addf99e06e9 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1673,6 +1673,23 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+        <structfield>sent_bytes</structfield><type>bigint</type>
+       </para>
+       <para>
+        Amount of transaction changes sent downstream for this slot by the
+        output plugin after applying output plugin filters, if any, and
+        converting it into the output plugin format. After applying filters,
+        the amount of data that gets converted is less than
+        <structfield>total_bytes</structfield>. But the format of data before
+        and after the conversion is different. Hence the value of
+        <structfield>sent_bytes</structfield> is not directly related to the
+        value of <structfield>total_bytes</structfield>.
+       </para>
+      </entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
         <structfield>slotsync_skip_count</structfield><type>bigint</type>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 90d48bc9c80..6e8e6147761 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1105,6 +1105,7 @@ CREATE VIEW pg_stat_replication_slots AS
             s.mem_exceeded_count,
             s.total_txns,
             s.total_bytes,
+            s.sent_bytes,
             s.slotsync_skip_count,
             s.slotsync_last_skip,
             s.stats_reset
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 603a2b94d05..73338959851 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1950,7 +1950,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 		rb->memExceededCount <= 0)
 		return;
 
-	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
+	elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
 		 rb,
 		 rb->spillTxns,
 		 rb->spillCount,
@@ -1960,7 +1960,8 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 		 rb->streamBytes,
 		 rb->memExceededCount,
 		 rb->totalTxns,
-		 rb->totalBytes);
+		 rb->totalBytes,
+		 rb->sentBytes);
 
 	repSlotStat.spill_txns = rb->spillTxns;
 	repSlotStat.spill_count = rb->spillCount;
@@ -1971,6 +1972,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	repSlotStat.mem_exceeded_count = rb->memExceededCount;
 	repSlotStat.total_txns = rb->totalTxns;
 	repSlotStat.total_bytes = rb->totalBytes;
+	repSlotStat.sent_bytes = rb->sentBytes;
 
 	pgstat_report_replslot(ctx->slot, &repSlotStat);
 
@@ -1983,6 +1985,7 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 	rb->memExceededCount = 0;
 	rb->totalTxns = 0;
 	rb->totalBytes = 0;
+	rb->sentBytes = 0;
 }
 
 /*
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 9760818941d..b8a9209c300 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -65,6 +65,7 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
 	Datum		values[3];
 	bool		nulls[3];
 	DecodingOutputState *p;
+	int64		sentBytes = 0;
 
 	/* SQL Datums can only be of a limited length... */
 	if (ctx->out->len > MaxAllocSize - VARHDRSZ)
@@ -74,7 +75,9 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
 
 	memset(nulls, 0, sizeof(nulls));
 	values[0] = LSNGetDatum(lsn);
+	sentBytes += sizeof(XLogRecPtr);
 	values[1] = TransactionIdGetDatum(xid);
+	sentBytes += sizeof(TransactionId);
 
 	/*
 	 * Assert ctx->out is in database encoding when we're writing textual
@@ -87,8 +90,13 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
 
 	/* ick, but cstring_to_text_with_len works for bytea perfectly fine */
 	values[2] = PointerGetDatum(cstring_to_text_with_len(ctx->out->data, ctx->out->len));
+	sentBytes += ctx->out->len;
 
 	tuplestore_putvalues(p->tupstore, p->tupdesc, values, nulls);
+
+	/* Update the amount of data sent downstream. */
+	ctx->reorder->sentBytes += sentBytes;
+
 	p->returned_rows++;
 }
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 682d13c9f22..33df98541b3 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -394,6 +394,7 @@ ReorderBufferAllocate(void)
 	buffer->memExceededCount = 0;
 	buffer->totalTxns = 0;
 	buffer->totalBytes = 0;
+	buffer->sentBytes = 0;
 
 	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
 
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 376ff46340d..0c015406023 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1599,6 +1599,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
 	/* output previously gathered data in a CopyData packet */
 	pq_putmessage_noblock(PqMsg_CopyData, ctx->out->data, ctx->out->len);
 
+	/* Update the amount of data sent downstream. */
+	ctx->reorder->sentBytes += ctx->out->len + 1;	/* +1 for the 'd' */
+
 	CHECK_FOR_INTERRUPTS();
 
 	/* Try to flush pending output to the client */
diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c
index 168ef8f8f45..e324e1d20bd 100644
--- a/src/backend/utils/activity/pgstat_replslot.c
+++ b/src/backend/utils/activity/pgstat_replslot.c
@@ -96,6 +96,7 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re
 	REPLSLOT_ACC(mem_exceeded_count);
 	REPLSLOT_ACC(total_txns);
 	REPLSLOT_ACC(total_bytes);
+	REPLSLOT_ACC(sent_bytes);
 #undef REPLSLOT_ACC
 
 	pgstat_unlock_entry(entry_ref);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index bad5642d9c9..4b6b31ae002 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -2145,7 +2145,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_REPLICATION_SLOT_COLS 13
+#define PG_STAT_GET_REPLICATION_SLOT_COLS 14
 	text	   *slotname_text = PG_GETARG_TEXT_P(0);
 	NameData	slotname;
 	TupleDesc	tupdesc;
@@ -2176,11 +2176,13 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 					   INT8OID, -1, 0);
 	TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_bytes",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 11, "slotsync_skip_count",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 11, "sent_bytes",
 					   INT8OID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 12, "slotsync_last_skip",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 12, "slotsync_skip_count",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 13, "slotsync_last_skip",
 					   TIMESTAMPTZOID, -1, 0);
-	TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset",
+	TupleDescInitEntry(tupdesc, (AttrNumber) 14, "stats_reset",
 					   TIMESTAMPTZOID, -1, 0);
 	TupleDescFinalize(tupdesc);
 	BlessTupleDesc(tupdesc);
@@ -2207,17 +2209,18 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 	values[7] = Int64GetDatum(slotent->mem_exceeded_count);
 	values[8] = Int64GetDatum(slotent->total_txns);
 	values[9] = Int64GetDatum(slotent->total_bytes);
-	values[10] = Int64GetDatum(slotent->slotsync_skip_count);
+	values[10] = Int64GetDatum(slotent->sent_bytes);
+	values[11] = Int64GetDatum(slotent->slotsync_skip_count);
 
 	if (slotent->slotsync_last_skip == 0)
-		nulls[11] = true;
+		nulls[12] = true;
 	else
-		values[11] = TimestampTzGetDatum(slotent->slotsync_last_skip);
+		values[12] = TimestampTzGetDatum(slotent->slotsync_last_skip);
 
 	if (slotent->stat_reset_timestamp == 0)
-		nulls[12] = true;
+		nulls[13] = true;
 	else
-		values[12] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
+		values[13] = TimestampTzGetDatum(slotent->stat_reset_timestamp);
 
 	/* Returns the record as Datum */
 	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 361e2cfffeb..1e94aa2825e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5711,9 +5711,9 @@
 { oid => '6169', descr => 'statistics: information about replication slot',
   proname => 'pg_stat_get_replication_slot', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'text',
-  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_txns,total_bytes,slotsync_skip_count,slotsync_last_skip,stats_reset}',
+  proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_txns,total_bytes,sent_bytes,slotsync_skip_count,slotsync_last_skip,stats_reset}',
   prosrc => 'pg_stat_get_replication_slot' },
 
 { oid => '6230', descr => 'statistics: check if a stats object exists',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 216b93492ba..ca0bb599a1a 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -403,6 +403,7 @@ typedef struct PgStat_StatReplSlotEntry
 	PgStat_Counter mem_exceeded_count;
 	PgStat_Counter total_txns;
 	PgStat_Counter total_bytes;
+	PgStat_Counter sent_bytes;
 	PgStat_Counter slotsync_skip_count;
 	TimestampTz slotsync_last_skip;
 	TimestampTz stat_reset_timestamp;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 2d717a9e152..d3a6c7cd85b 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -699,6 +699,7 @@ struct ReorderBuffer
 	 */
 	int64		totalTxns;		/* total number of transactions sent */
 	int64		totalBytes;		/* total amount of data decoded */
+	int64		sentBytes;		/* amount of data decoded and sent downstream */
 };
 
 
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index 97d11f98b59..5354322e196 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -214,10 +214,10 @@ my $stats_test_slot2 = 'logical_slot';
 # Stats exist for stats test slot 1
 is( $node_primary->safe_psql(
 		'postgres',
-		qq(SELECT total_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
+		qq(SELECT total_bytes > 0, sent_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
 	),
-	qq(t|t),
-	qq(Total bytes is > 0 and stats_reset is NULL for slot '$stats_test_slot1'.)
+	qq(t|t|t),
+	qq(Total bytes and sent bytes are both > 0 and stats_reset is NULL for slot '$stats_test_slot1'.)
 );
 
 # Do reset of stats for stats test slot 1
@@ -235,10 +235,10 @@ $node_primary->safe_psql('postgres',
 
 is( $node_primary->safe_psql(
 		'postgres',
-		qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0 FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
+		qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0, sent_bytes = 0 FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1')
 	),
-	qq(t|t),
-	qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes was set to 0.)
+	qq(t|t|t),
+	qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes and sent_bytes were set to 0.)
 );
 
 # Check that test slot 2 has NULL in reset timestamp
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 71d7262049e..7b84a6d79ad 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2201,11 +2201,12 @@ pg_stat_replication_slots| SELECT s.slot_name,
     s.mem_exceeded_count,
     s.total_txns,
     s.total_bytes,
+    s.sent_bytes,
     s.slotsync_skip_count,
     s.slotsync_last_skip,
     s.stats_reset
    FROM pg_replication_slots r,
-    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_txns, total_bytes, slotsync_skip_count, slotsync_last_skip, stats_reset)
+    LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_txns, total_bytes, sent_bytes, slotsync_skip_count, slotsync_last_skip, stats_reset)
   WHERE (r.datoid IS NOT NULL);
 pg_stat_slru| SELECT name,
     blks_zeroed,
-- 
2.43.0

