From 68d5222520c245427d6e3a9ecc95657a36212f09 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Tue, 15 Sep 2020 07:17:26 -0400
Subject: [PATCH 1/3] Support decoding of two-phase transactions

Until now two-phase transactions were decoded at COMMIT, just like
regular transaction. During replay, two-phase transactions were
translated into regular transactions on the subscriber, and the GID
was not forwarded to it.

This patch allows PREPARE-time decoding two-phase transactions (if
the output plugin supports this capability), in which case the
transactions are replayed at PREPARE and then committed later when
COMMIT PREPARED arrives.

Includes documentation changes.
---
 contrib/test_decoding/expected/prepared.out     | 185 ++++++++++++++++++---
 contrib/test_decoding/sql/prepared.sql          |  77 ++++++++-
 contrib/test_decoding/test_decoding.c           | 181 ++++++++++++++++++++
 doc/src/sgml/logicaldecoding.sgml               | 127 +++++++++++++-
 src/backend/replication/logical/decode.c        | 141 ++++++++++++++--
 src/backend/replication/logical/logical.c       | 194 ++++++++++++++++++++++
 src/backend/replication/logical/reorderbuffer.c | 209 +++++++++++++++++++++---
 src/include/replication/output_plugin.h         |  46 ++++++
 src/include/replication/reorderbuffer.h         |  78 ++++++++-
 9 files changed, 1165 insertions(+), 73 deletions(-)

diff --git a/contrib/test_decoding/expected/prepared.out b/contrib/test_decoding/expected/prepared.out
index 46e915d..94fb0c9 100644
--- a/contrib/test_decoding/expected/prepared.out
+++ b/contrib/test_decoding/expected/prepared.out
@@ -6,19 +6,50 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
  init
 (1 row)
 
-CREATE TABLE test_prepared1(id int);
-CREATE TABLE test_prepared2(id int);
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
 -- test simple successful use of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (1);
 PREPARE TRANSACTION 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:1
+ PREPARE TRANSACTION 'test_prepared#1'
+(3 rows)
+
 COMMIT PREPARED 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+               data                
+-----------------------------------
+ COMMIT PREPARED 'test_prepared#1'
+(1 row)
+
 INSERT INTO test_prepared1 VALUES (2);
 -- test abort of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (3);
 PREPARE TRANSACTION 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:2
+ COMMIT
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:3
+ PREPARE TRANSACTION 'test_prepared#2'
+(6 rows)
+
 ROLLBACK PREPARED 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                data                 
+-------------------------------------
+ ROLLBACK PREPARED 'test_prepared#2'
+(1 row)
+
 INSERT INTO test_prepared1 VALUES (4);
 -- test prepared xact containing ddl
 BEGIN;
@@ -26,45 +57,149 @@ INSERT INTO test_prepared1 VALUES (5);
 ALTER TABLE test_prepared1 ADD COLUMN data text;
 INSERT INTO test_prepared1 VALUES (6, 'frakbar');
 PREPARE TRANSACTION 'test_prepared#3';
--- test that we decode correctly while an uncommitted prepared xact
--- with ddl exists.
--- separate table because of the lock from the ALTER
--- this will come before the '5' row above, as this commits before it.
-INSERT INTO test_prepared2 VALUES (7);
-COMMIT PREPARED 'test_prepared#3';
--- make sure stuff still works
-INSERT INTO test_prepared1 VALUES (8);
-INSERT INTO test_prepared2 VALUES (9);
--- cleanup
-DROP TABLE test_prepared1;
-DROP TABLE test_prepared2;
--- show results
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'test_prepared1'::regclass;
+    relation     | locktype |        mode         
+-----------------+----------+---------------------
+ test_prepared_1 | relation | RowExclusiveLock
+ test_prepared_1 | relation | AccessExclusiveLock
+(2 rows)
+
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
                                   data                                   
 -------------------------------------------------------------------------
  BEGIN
- table public.test_prepared1: INSERT: id[integer]:1
- COMMIT
- BEGIN
- table public.test_prepared1: INSERT: id[integer]:2
- COMMIT
- BEGIN
  table public.test_prepared1: INSERT: id[integer]:4
  COMMIT
  BEGIN
- table public.test_prepared2: INSERT: id[integer]:7
- COMMIT
- BEGIN
  table public.test_prepared1: INSERT: id[integer]:5
  table public.test_prepared1: INSERT: id[integer]:6 data[text]:'frakbar'
+ PREPARE TRANSACTION 'test_prepared#3'
+(7 rows)
+
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+-- We should see '7' before '5' in our results since it commits first.
+--
+INSERT INTO test_prepared2 VALUES (7);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                        data                        
+----------------------------------------------------
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:7
  COMMIT
+(3 rows)
+
+COMMIT PREPARED 'test_prepared#3';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+               data                
+-----------------------------------
+ COMMIT PREPARED 'test_prepared#3'
+(1 row)
+
+-- make sure stuff still works
+INSERT INTO test_prepared1 VALUES (8);
+INSERT INTO test_prepared2 VALUES (9);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                data                                
+--------------------------------------------------------------------
  BEGIN
  table public.test_prepared1: INSERT: id[integer]:8 data[text]:null
  COMMIT
  BEGIN
  table public.test_prepared2: INSERT: id[integer]:9
  COMMIT
-(22 rows)
+(6 rows)
+
+-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block
+-- logical decoding.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (10, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (11, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+BEGIN;
+insert into test_prepared2 values (12);
+PREPARE TRANSACTION 'test_prepared_lock2';
+COMMIT PREPARED 'test_prepared_lock2';
+SELECT 'pg_class' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'pg_class'::regclass;
+ relation | locktype | mode 
+----------+----------+------
+(0 rows)
+
+-- Shouldn't timeout on 2pc decoding.
+SET statement_timeout = '1s';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                    data                                    
+----------------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:10 data[text]:'othercol'
+ table public.test_prepared1: INSERT: id[integer]:11 data[text]:'othercol2'
+ PREPARE TRANSACTION 'test_prepared_lock'
+ BEGIN
+ table public.test_prepared2: INSERT: id[integer]:12
+ PREPARE TRANSACTION 'test_prepared_lock2'
+ COMMIT PREPARED 'test_prepared_lock2'
+(8 rows)
+
+RESET statement_timeout;
+COMMIT PREPARED 'test_prepared_lock';
+-- will work normally after we commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                 data                 
+--------------------------------------
+ COMMIT PREPARED 'test_prepared_lock'
+(1 row)
+
+-- test savepoints
+BEGIN;
+SAVEPOINT test_savepoint;
+CREATE TABLE test_prepared_savepoint (a int);
+PREPARE TRANSACTION 'test_prepared_savepoint';
+COMMIT PREPARED 'test_prepared_savepoint';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                   data                    
+-------------------------------------------
+ COMMIT PREPARED 'test_prepared_savepoint'
+(1 row)
+
+-- test that a GID containing "nodecode" gets decoded at commit prepared time
+BEGIN;
+INSERT INTO test_prepared1 VALUES (20);
+PREPARE TRANSACTION 'test_prepared_nodecode';
+-- should show nothing
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+COMMIT PREPARED 'test_prepared_nodecode';
+-- should be decoded now
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                data                                 
+---------------------------------------------------------------------
+ BEGIN
+ table public.test_prepared1: INSERT: id[integer]:20 data[text]:null
+ COMMIT
+(3 rows)
+
+-- cleanup
+DROP TABLE test_prepared1;
+DROP TABLE test_prepared2;
+-- show results. There should be nothing to show
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
 
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
diff --git a/contrib/test_decoding/sql/prepared.sql b/contrib/test_decoding/sql/prepared.sql
index e726397..ca801e4 100644
--- a/contrib/test_decoding/sql/prepared.sql
+++ b/contrib/test_decoding/sql/prepared.sql
@@ -2,21 +2,25 @@
 SET synchronous_commit = on;
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 
-CREATE TABLE test_prepared1(id int);
-CREATE TABLE test_prepared2(id int);
+CREATE TABLE test_prepared1(id integer primary key);
+CREATE TABLE test_prepared2(id integer primary key);
 
 -- test simple successful use of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (1);
 PREPARE TRANSACTION 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 COMMIT PREPARED 'test_prepared#1';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 INSERT INTO test_prepared1 VALUES (2);
 
 -- test abort of a prepared xact
 BEGIN;
 INSERT INTO test_prepared1 VALUES (3);
 PREPARE TRANSACTION 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 ROLLBACK PREPARED 'test_prepared#2';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 INSERT INTO test_prepared1 VALUES (4);
 
@@ -27,24 +31,83 @@ ALTER TABLE test_prepared1 ADD COLUMN data text;
 INSERT INTO test_prepared1 VALUES (6, 'frakbar');
 PREPARE TRANSACTION 'test_prepared#3';
 
--- test that we decode correctly while an uncommitted prepared xact
--- with ddl exists.
+SELECT 'test_prepared_1' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'test_prepared1'::regclass;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
--- separate table because of the lock from the ALTER
--- this will come before the '5' row above, as this commits before it.
+-- Test that we decode correctly while an uncommitted prepared xact
+-- with ddl exists.
+--
+-- Use a separate table for the concurrent transaction because the lock from
+-- the ALTER will stop us inserting into the other one.
+--
+-- We should see '7' before '5' in our results since it commits first.
+--
 INSERT INTO test_prepared2 VALUES (7);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 COMMIT PREPARED 'test_prepared#3';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- make sure stuff still works
 INSERT INTO test_prepared1 VALUES (8);
 INSERT INTO test_prepared2 VALUES (9);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Check `CLUSTER` (as operation that hold exclusive lock) doesn't block
+-- logical decoding.
+BEGIN;
+INSERT INTO test_prepared1 VALUES (10, 'othercol');
+CLUSTER test_prepared1 USING test_prepared1_pkey;
+INSERT INTO test_prepared1 VALUES (11, 'othercol2');
+PREPARE TRANSACTION 'test_prepared_lock';
+
+BEGIN;
+insert into test_prepared2 values (12);
+PREPARE TRANSACTION 'test_prepared_lock2';
+COMMIT PREPARED 'test_prepared_lock2';
+
+SELECT 'pg_class' AS relation, locktype, mode
+FROM pg_locks
+WHERE locktype = 'relation'
+  AND relation = 'pg_class'::regclass;
+
+-- Shouldn't timeout on 2pc decoding.
+SET statement_timeout = '1s';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+RESET statement_timeout;
+
+COMMIT PREPARED 'test_prepared_lock';
+
+-- will work normally after we commit
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- test savepoints
+BEGIN;
+SAVEPOINT test_savepoint;
+CREATE TABLE test_prepared_savepoint (a int);
+PREPARE TRANSACTION 'test_prepared_savepoint';
+COMMIT PREPARED 'test_prepared_savepoint';
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- test that a GID containing "nodecode" gets decoded at commit prepared time
+BEGIN;
+INSERT INTO test_prepared1 VALUES (20);
+PREPARE TRANSACTION 'test_prepared_nodecode';
+-- should show nothing
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+COMMIT PREPARED 'test_prepared_nodecode';
+-- should be decoded now
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 -- cleanup
 DROP TABLE test_prepared1;
 DROP TABLE test_prepared2;
 
--- show results
+-- show results. There should be nothing to show
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 
 SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index e60ab34..149e7f6 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -11,12 +11,16 @@
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
+#include "miscadmin.h"
 
+#include "access/transam.h"
 #include "catalog/pg_type.h"
 
 #include "replication/logical.h"
 #include "replication/origin.h"
 
+#include "storage/procarray.h"
+
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -36,6 +40,7 @@ typedef struct
 	bool		skip_empty_xacts;
 	bool		xact_wrote_changes;
 	bool		only_local;
+	TransactionId	check_xid; /* track abort of this txid */
 } TestDecodingData;
 
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -49,6 +54,8 @@ static void pg_output_begin(LogicalDecodingContext *ctx,
 							bool last_write);
 static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
 								 ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
+static void pg_decode_abort_txn(LogicalDecodingContext *ctx,
+								ReorderBufferTXN *txn, XLogRecPtr abort_lsn);
 static void pg_decode_change(LogicalDecodingContext *ctx,
 							 ReorderBufferTXN *txn, Relation rel,
 							 ReorderBufferChange *change);
@@ -88,6 +95,19 @@ static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
 									  ReorderBufferTXN *txn,
 									  int nrelations, Relation relations[],
 									  ReorderBufferChange *change);
+static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+									 ReorderBufferTXN *txn,
+									 TransactionId xid, const char *gid);
+static void pg_decode_prepare_txn(LogicalDecodingContext *ctx,
+								  ReorderBufferTXN *txn,
+								  XLogRecPtr prepare_lsn);
+static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx,
+										  ReorderBufferTXN *txn,
+										  XLogRecPtr commit_lsn);
+static void pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx,
+										 ReorderBufferTXN *txn,
+										 XLogRecPtr abort_lsn);
+
 
 void
 _PG_init(void)
@@ -106,6 +126,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->change_cb = pg_decode_change;
 	cb->truncate_cb = pg_decode_truncate;
 	cb->commit_cb = pg_decode_commit_txn;
+	cb->abort_cb = pg_decode_abort_txn;
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
 	cb->message_cb = pg_decode_message;
@@ -116,6 +137,11 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->stream_change_cb = pg_decode_stream_change;
 	cb->stream_message_cb = pg_decode_stream_message;
 	cb->stream_truncate_cb = pg_decode_stream_truncate;
+	cb->filter_prepare_cb = pg_decode_filter_prepare;
+	cb->prepare_cb = pg_decode_prepare_txn;
+	cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
+	cb->abort_prepared_cb = pg_decode_abort_prepared_txn;
+
 }
 
 
@@ -136,11 +162,14 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->include_timestamp = false;
 	data->skip_empty_xacts = false;
 	data->only_local = false;
+	data->check_xid = InvalidTransactionId;
 
 	ctx->output_plugin_private = data;
 
 	opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
 	opt->receive_rewrites = false;
+	/* this plugin supports decoding of 2pc */
+	opt->enable_twophase = true;
 
 	foreach(option, ctx->output_plugin_options)
 	{
@@ -227,6 +256,32 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 						 errmsg("could not parse value \"%s\" for parameter \"%s\"",
 								strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "check-xid") == 0)
+		{
+			if (elem->arg)
+			{
+				errno = 0;
+				data->check_xid = (TransactionId)
+					strtoul(strVal(elem->arg), NULL, 0);
+
+				if (errno == EINVAL || errno == ERANGE)
+					ereport(FATAL,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("check-xid is not a valid number: \"%s\"",
+								strVal(elem->arg))));
+			}
+			else
+				ereport(FATAL,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("check-xid needs an input value")));
+
+			if (data->check_xid <= 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("Specify positive value for parameter \"%s\","
+								" you specified \"%s\"",
+								elem->defname, strVal(elem->arg))));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -297,6 +352,116 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	OutputPluginWrite(ctx, true);
 }
 
+/* ABORT callback */
+static void
+pg_decode_abort_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					XLogRecPtr abort_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+	if (data->include_xids)
+		appendStringInfo(ctx->out, "ABORT %u", txn->xid);
+	else
+		appendStringInfoString(ctx->out, "ABORT");
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+/*
+ * Filter out two-phase transactions.
+ *
+ * Each plugin can implement its own filtering logic. Here
+ * we demonstrate a simple logic by checking the GID. If the
+ * GID contains the "_nodecode" substring, then we filter
+ * it out.
+ */
+static bool
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+						 TransactionId xid, const char *gid)
+{
+	if (strstr(gid, "_nodecode") != NULL)
+		return true;
+
+	return false;
+}
+
+/* PREPARE callback */
+static void
+pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+					  XLogRecPtr prepare_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	if (data->skip_empty_xacts && !data->xact_wrote_changes)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "PREPARE TRANSACTION %s",
+					 quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+/* COMMIT PREPARED callback */
+static void
+pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+							  XLogRecPtr commit_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "COMMIT PREPARED %s",
+					 quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
+/* ABORT PREPARED callback */
+static void
+pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+							 XLogRecPtr abort_lsn)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "ROLLBACK PREPARED %s",
+					 quote_literal_cstr(txn->gid));
+
+	if (data->include_xids)
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_timestamp)
+		appendStringInfo(ctx->out, " (at %s)",
+						 timestamptz_to_str(txn->commit_time));
+
+	OutputPluginWrite(ctx, true);
+}
+
 static bool
 pg_decode_filter(LogicalDecodingContext *ctx,
 				 RepOriginId origin_id)
@@ -455,6 +620,22 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	}
 	data->xact_wrote_changes = true;
 
+	/* if check_xid is specified */
+	if (TransactionIdIsValid(data->check_xid))
+	{
+		elog(LOG, "waiting for %u to abort", data->check_xid);
+		while (TransactionIdIsInProgress(data->check_xid))
+		{
+			CHECK_FOR_INTERRUPTS();
+			pg_usleep(10000L);
+		}
+		if (!TransactionIdIsInProgress(data->check_xid) &&
+			   !TransactionIdDidCommit(data->check_xid))
+			elog(LOG, "%u aborted", data->check_xid);
+
+		Assert(TransactionIdDidAbort(data->check_xid));
+	}
+
 	class_form = RelationGetForm(relation);
 	tupdesc = RelationGetDescr(relation);
 
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 813a037..1cddfeb 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -386,7 +386,12 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeChangeCB change_cb;
     LogicalDecodeTruncateCB truncate_cb;
     LogicalDecodeCommitCB commit_cb;
+    LogicalDecodeAbortCB abort_cb;
     LogicalDecodeMessageCB message_cb;
+    LogicalDecodeFilterPrepareCB filter_prepare_cb;
+    LogicalDecodePrepareCB prepare_cb;
+    LogicalDecodeCommitPreparedCB commit_prepared_cb;
+    LogicalDecodeAbortPreparedCB abort_prepared_cb;
     LogicalDecodeFilterByOriginCB filter_by_origin_cb;
     LogicalDecodeShutdownCB shutdown_cb;
     LogicalDecodeStreamStartCB stream_start_cb;
@@ -477,7 +482,13 @@ CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
      never get
      decoded. Successful savepoints are
      folded into the transaction containing them in the order they were
-     executed within that transaction.
+     executed within that transaction. A transaction that is prepared for
+     a two-phase commit using <command>PREPARE TRANSACTION</command> will
+     also be decoded if the output plugin callbacks needed for decoding
+     them are provided. It is possible that the current transaction which
+     is being decoded is aborted concurrently via a <command>ROLLBACK PREPARED</command>
+     command. In that case, the logical decoding of this transaction will
+     be aborted too.
     </para>
 
     <note>
@@ -578,6 +589,71 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
      </para>
     </sect3>
 
+    <sect3 id="logicaldecoding-output-plugin-prepare">
+     <title>Transaction Prepare Callback</title>
+
+     <para>
+      The optional <function>prepare_cb</function> callback is called whenever
+      a transaction which is prepared for two-phase commit has been
+      decoded. The <function>change_cb</function> callbacks for all modified
+      rows will have been called before this, if there have been any modified
+      rows.
+<programlisting>
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+                                        ReorderBufferTXN *txn,
+                                        XLogRecPtr prepare_lsn);
+</programlisting>
+     </para>
+    </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-commit-prepared">
+     <title>Commit Prepared Transaction Callback</title>
+
+     <para>
+      The optional <function>commit_prepared_cb</function> callback is called whenever
+      a commit prepared transaction has been decoded. The <parameter>gid</parameter> field,
+      which is part of the <parameter>txn</parameter> parameter can be used in this
+      callback.
+<programlisting>
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+                                               ReorderBufferTXN *txn,
+                                               XLogRecPtr commit_lsn);
+</programlisting>
+     </para>
+    </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-abort-prepared">
+     <title>Rollback Prepared Transaction Callback</title>
+
+     <para>
+      The optional <function>abort_prepared_cb</function> callback is called whenever
+      a rollback prepared transaction has been decoded. The <parameter>gid</parameter> field,
+      which is part of the <parameter>txn</parameter> parameter can be used in this
+      callback.
+<programlisting>
+typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx,
+                                              ReorderBufferTXN *txn,
+                                              XLogRecPtr abort_lsn);
+</programlisting>
+     </para>
+    </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-abort">
+     <title>Transaction Abort Callback</title>
+
+     <para>
+      The required <function>abort_cb</function> callback is called whenever
+      a transaction abort has to be initiated. This can happen if we are
+      decoding a transaction that has been prepared for two-phase commit and
+      a concurrent rollback happens while we are decoding it.
+<programlisting>
+typedef void (*LogicalDecodeAbortCB) (struct LogicalDecodingContext *ctx,
+                                       ReorderBufferTXN *txn,
+                                       XLogRecPtr abort_lsn);
+</programlisting>
+     </para>
+    </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-change">
      <title>Change Callback</title>
 
@@ -587,7 +663,13 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
       an <command>INSERT</command>, <command>UPDATE</command>,
       or <command>DELETE</command>. Even if the original command modified
       several rows at once the callback will be called individually for each
-      row.
+      row. The <function>change_cb</function> callback may access system or
+      user catalog tables to aid in the process of outputting the row
+      modification details. In case of decoding a prepared (but yet
+      uncommitted) transaction or decoding of an uncommitted transaction, this
+      change callback might also error out due to simultaneous rollback of
+      this very same transaction. In that case, the logical decoding of this
+      aborted transaction is stopped gracefully.
 <programlisting>
 typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
@@ -664,6 +746,39 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ct
      </para>
      </sect3>
 
+     <sect3 id="logicaldecoding-output-plugin-filter-prepare">
+     <title>Prepare Filter Callback</title>
+
+     <para>
+       The optional <function>filter_prepare_cb</function> callback
+       is called to determine whether data that is part of the current
+       two-phase commit transaction should be considered for decode
+       at this prepare stage or as a regular one-phase transaction at
+       <command>COMMIT PREPARED</command> time later. To signal that
+       decoding should be skipped, return <literal>true</literal>;
+       <literal>false</literal> otherwise. When the callback is not
+       defined, <literal>false</literal> is assumed (i.e. nothing is
+       filtered).
+<programlisting>
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+                                              ReorderBufferTXN *txn,
+                                              TransactionId xid,
+                                              const char *gid);
+</programlisting>
+      The <parameter>ctx</parameter> parameter has the same contents
+      as for the other callbacks. The <parameter>txn</parameter> parameter
+      contains meta information about the transaction. The <parameter>xid</parameter>
+      contains the XID because <parameter>txn</parameter> can be NULL in some cases.
+      The <parameter>gid</parameter> is the identifier that later identifies this
+      transaction for <command>COMMIT PREPARED</command> or <command>ROLLBACK PREPARED</command>.
+     </para>
+     <para>
+      The callback has to provide the same static answer for a given combination of
+      <parameter>xid</parameter> and <parameter>gid</parameter> every time it is
+      called.
+     </para>
+     </sect3>
+
     <sect3 id="logicaldecoding-output-plugin-message">
      <title>Generic Message Callback</title>
 
@@ -685,7 +800,13 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
       non-transactional and the XID was not assigned yet in the transaction
       which logged the message. The <parameter>lsn</parameter> has WAL
       location of the message. The <parameter>transactional</parameter> says
-      if the message was sent as transactional or not.
+      if the message was sent as transactional or not. Similar to the change
+      callback, in case of decoding a prepared (but yet uncommitted)
+      transaction or decoding of an uncommitted transaction, this message 
+      callback might also error out due to simultaneous rollback of
+      this very same transaction. In that case, the logical decoding of this
+      aborted transaction is stopped gracefully.
+
       The <parameter>prefix</parameter> is arbitrary null-terminated prefix
       which can be used for identifying interesting messages for the current
       plugin. And finally the <parameter>message</parameter> parameter holds
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index f21f61d..63d5acf 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -70,6 +70,9 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						 xl_xact_parsed_commit *parsed, TransactionId xid);
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						xl_xact_parsed_abort *parsed, TransactionId xid);
+static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+						xl_xact_parsed_prepare * parsed);
+
 
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
@@ -312,17 +315,34 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			}
 			break;
 		case XLOG_XACT_PREPARE:
+			{
+				xl_xact_parsed_prepare parsed;
+				xl_xact_prepare *xlrec;
+				/* check that output plugin is capable of twophase decoding */
+				if (!ctx->options.enable_twophase)
+				{
+					ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
+					break;
+				}
 
-			/*
-			 * Currently decoding ignores PREPARE TRANSACTION and will just
-			 * decode the transaction when the COMMIT PREPARED is sent or
-			 * throw away the transaction's contents when a ROLLBACK PREPARED
-			 * is received. In the future we could add code to expose prepared
-			 * transactions in the changestream allowing for a kind of
-			 * distributed 2PC.
-			 */
-			ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
-			break;
+				/* ok, parse it */
+				xlrec = (xl_xact_prepare *)XLogRecGetData(r);
+				ParsePrepareRecord(XLogRecGetInfo(buf->record),
+									xlrec, &parsed);
+
+				/* does output plugin want this particular transaction? */
+				if (ctx->callbacks.filter_prepare_cb &&
+					ReorderBufferPrepareNeedSkip(reorder, parsed.twophase_xid,
+												parsed.twophase_gid))
+				{
+					ReorderBufferProcessXid(reorder, parsed.twophase_xid,
+											buf->origptr);
+					break;
+				}
+
+				DecodePrepare(ctx, buf, &parsed);
+				break;
+			}
 		default:
 			elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
 	}
@@ -647,9 +667,82 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 								 buf->origptr, buf->endptr);
 	}
 
+	/*
+	 * Decide if we're processing COMMIT PREPARED, or a regular COMMIT.
+	 * Regular commit simply triggers a replay of transaction changes from the
+	 * reorder buffer. For COMMIT PREPARED that however already happened at
+	 * PREPARE time, and so we only need to notify the subscriber that the GID
+	 * finally committed.
+	 *
+	 * For output plugins that do not support PREPARE-time decoding of
+	 * two-phase transactions, we never even see the PREPARE and all two-phase
+	 * transactions simply fall through to the second branch.
+	 */
+	if (TransactionIdIsValid(parsed->twophase_xid) &&
+		ReorderBufferTxnIsPrepared(ctx->reorder,
+								   parsed->twophase_xid, parsed->twophase_gid))
+	{
+		Assert(xid == parsed->twophase_xid);
+		/* we are processing COMMIT PREPARED */
+		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+									commit_time, origin_id, origin_lsn,
+									parsed->twophase_gid, true);
+	}
+	else
+	{
+		/* replay actions of all transaction + subtransactions in order */
+		ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
+							commit_time, origin_id, origin_lsn);
+	}
+}
+
+/*
+ * Decode PREPARE record. Similar logic as in COMMIT
+ */
+static void
+DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+			  xl_xact_parsed_prepare * parsed)
+{
+	XLogRecPtr	origin_lsn = parsed->origin_lsn;
+	TimestampTz commit_time = parsed->origin_timestamp;
+	XLogRecPtr	origin_id = XLogRecGetOrigin(buf->record);
+	int			i;
+	TransactionId xid = parsed->twophase_xid;
+
+	/*
+	 * Process invalidation messages, even if we're not interested in the
+	 * transaction's contents, since the various caches need to always be
+	 * consistent.
+	 */
+	if (parsed->nmsgs > 0)
+	{
+		if (!ctx->fast_forward)
+			ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
+										  parsed->nmsgs, parsed->msgs);
+		ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
+	}
+
+	/*
+	 * Tell the reorderbuffer about the surviving subtransactions. We need to
+	 * do this because the main transaction itself has not committed since we
+	 * are in the prepare phase right now. So we need to be sure the snapshot
+	 * is setup correctly for the main transaction in case all changes
+	 * happened in subtransanctions
+	 */
+	for (i = 0; i < parsed->nsubxacts; i++)
+	{
+		ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
+								 buf->origptr, buf->endptr);
+	}
+
+	if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+		(parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
+		ctx->fast_forward || FilterByOrigin(ctx, origin_id))
+		return;
+
 	/* replay actions of all transaction + subtransactions in order */
-	ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
-						commit_time, origin_id, origin_lsn);
+	ReorderBufferPrepare(ctx->reorder, xid, buf->origptr, buf->endptr,
+						 commit_time, origin_id, origin_lsn, parsed->twophase_gid);
 }
 
 /*
@@ -661,6 +754,30 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid)
 {
 	int			i;
+	XLogRecPtr	origin_lsn = InvalidXLogRecPtr;
+	TimestampTz commit_time = 0;
+	XLogRecPtr	origin_id = XLogRecGetOrigin(buf->record);
+
+	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+	{
+		origin_lsn = parsed->origin_lsn;
+		commit_time = parsed->origin_timestamp;
+	}
+
+	/*
+	 * If it's ROLLBACK PREPARED then handle it via callbacks.
+	 */
+	if (TransactionIdIsValid(xid) &&
+		!SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) &&
+		parsed->dbId == ctx->slot->data.database &&
+		!FilterByOrigin(ctx, origin_id) &&
+		ReorderBufferTxnIsPrepared(ctx->reorder, xid, parsed->twophase_gid))
+	{
+		ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
+									commit_time, origin_id, origin_lsn,
+									parsed->twophase_gid, false);
+		return;
+	}
 
 	for (i = 0; i < parsed->nsubxacts; i++)
 	{
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 0f6af95..0ae5da7 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -58,6 +58,16 @@ static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
 static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
 static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							  XLogRecPtr commit_lsn);
+static void abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+							 XLogRecPtr abort_lsn);
+static bool filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+									  TransactionId xid, const char *gid);
+static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+							   XLogRecPtr prepare_lsn);
+static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+									   XLogRecPtr commit_lsn);
+static void abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+									  XLogRecPtr abort_lsn);
 static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 							  Relation relation, ReorderBufferChange *change);
 static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
@@ -206,6 +216,11 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->apply_change = change_cb_wrapper;
 	ctx->reorder->apply_truncate = truncate_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
+	ctx->reorder->abort = abort_cb_wrapper;
+	ctx->reorder->filter_prepare = filter_prepare_cb_wrapper;
+	ctx->reorder->prepare = prepare_cb_wrapper;
+	ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
+	ctx->reorder->abort_prepared = abort_prepared_cb_wrapper;
 	ctx->reorder->message = message_cb_wrapper;
 
 	/*
@@ -782,6 +797,140 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 }
 
 static void
+abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				 XLogRecPtr abort_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "abort";
+	state.report_location = txn->final_lsn; /* beginning of abort record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* do the actual work: call callback */
+	ctx->callbacks.abort_cb(ctx, txn, abort_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				   XLogRecPtr prepare_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "prepare";
+	state.report_location = txn->final_lsn; /* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* If the plugin support 2 phase commits then prepare callback is mandatory */
+	if (ctx->options.enable_twophase && ctx->callbacks.prepare_cb == NULL)
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("Output plugin did not register prepare_cb callback")));
+
+	/* do the actual work: call callback */
+	ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						   XLogRecPtr commit_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "commit_prepared";
+	state.report_location = txn->final_lsn; /* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* If the plugin support 2 phase commits then commit prepared callback is mandatory */
+	if (ctx->options.enable_twophase && ctx->callbacks.commit_prepared_cb == NULL)
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("Output plugin did not register commit_prepared_cb callback")));
+
+	/* do the actual work: call callback */
+	ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
+abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						  XLogRecPtr abort_lsn)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "abort_prepared";
+	state.report_location = txn->final_lsn; /* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn->xid;
+	ctx->write_location = txn->end_lsn; /* points to the end of the record */
+
+	/* If the plugin support 2 phase commits then abort prepared callback is mandatory */
+	if (ctx->options.enable_twophase && ctx->callbacks.abort_prepared_cb == NULL)
+		ereport(ERROR,
+			(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("Output plugin did not register abort_prepared_cb callback")));
+
+	/* do the actual work: call callback */
+	ctx->callbacks.abort_prepared_cb(ctx, txn, abort_lsn);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+static void
 change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  Relation relation, ReorderBufferChange *change)
 {
@@ -858,6 +1007,51 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 	error_context_stack = errcallback.previous;
 }
 
+static bool
+filter_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+						  TransactionId xid, const char *gid)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+	bool		ret;
+
+	/*
+	 * Skip if decoding of twophase at PREPARE time is not enabled. In that
+	 * case all twophase transactions are considered filtered out and will be
+	 * applied as regular transactions at COMMIT PREPARED.
+	 */
+	if (!ctx->options.enable_twophase)
+		return true;
+
+	/*
+	 * The filter_prepare callback is optional. When not supplied, all
+	 * prepared transactions should go through.
+	 */
+	if (!ctx->callbacks.filter_prepare_cb)
+		return false;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "filter_prepare";
+	state.report_location = InvalidXLogRecPtr;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+
+	/* do the actual work: call callback */
+	ret = ctx->callbacks.filter_prepare_cb(ctx, txn, xid, gid);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+
+	return ret;
+}
+
 bool
 filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 1975d62..d6556be 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -413,6 +413,11 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	}
 
 	/* free data that's contained */
+	if (txn->gid != NULL)
+	{
+		pfree(txn->gid);
+		txn->gid = NULL;
+	}
 
 	if (txn->tuplecid_hash != NULL)
 	{
@@ -1987,7 +1992,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			prev_lsn = change->lsn;
 
 			/* Set the current xid to detect concurrent aborts. */
-			if (streaming)
+			if (streaming || rbtxn_prepared(change->txn))
 			{
 				curtxn = change->txn;
 				SetupCheckXidLive(curtxn->xid);
@@ -2249,7 +2254,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					break;
 			}
 		}
-
 		/*
 		 * There's a speculative insertion remaining, just clean in up, it
 		 * can't have been successful, otherwise we'd gotten a confirmation
@@ -2278,7 +2282,24 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			}
 		}
 		else
-			rb->commit(rb, txn, commit_lsn);
+		{
+			/*
+			 * Call abort/commit/prepare callback, depending on the transaction
+ 			 * state.
+ 			 *
+ 			 * If the transaction aborted during apply (which currently can happen
+ 			 * only for prepared transactions), simply call the abort callback.
+ 			 *
+ 			 * Otherwise call either PREPARE (for twophase transactions) or COMMIT
+ 			 * (for regular ones).
+ 			 */
+			if (rbtxn_rollback(txn))
+				rb->abort(rb, txn, commit_lsn);
+			else if (rbtxn_prepared(txn))
+				rb->prepare(rb, txn, commit_lsn);
+			else
+				rb->commit(rb, txn, commit_lsn);
+		}
 
 		/* this is just a sanity check against bad output plugin behaviour */
 		if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
@@ -2361,8 +2382,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			 * This error can only occur when we are sending the data in
 			 * streaming mode and the streaming is not finished yet.
 			 */
-			Assert(streaming);
-			Assert(stream_started);
+			Assert(streaming || rbtxn_prepared(txn));
+			Assert(stream_started  || rbtxn_prepared(txn));
 
 			/* Cleanup the temporary error state. */
 			FlushErrorState();
@@ -2370,10 +2391,19 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			errdata = NULL;
 			curtxn->concurrent_abort = true;
 
-			/* Reset the TXN so that it is allowed to stream remaining data. */
-			ReorderBufferResetTXN(rb, txn, snapshot_now,
-								  command_id, prev_lsn,
-								  specinsert);
+			/* If streaming, reset the TXN so that it is allowed to stream remaining data. */
+			if (streaming && stream_started)
+			{
+				ReorderBufferResetTXN(rb, txn, snapshot_now,
+									  command_id, prev_lsn,
+									  specinsert);
+			}
+			else
+			{
+				elog(LOG, "stopping decoding of %s (%u)",
+						txn->gid[0] != '\0'? txn->gid:"", txn->xid);
+				rb->abort(rb, txn, commit_lsn);
+			}
 		}
 		else
 		{
@@ -2395,23 +2425,16 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
  * This interface is called once a toplevel commit is read for both streamed
  * as well as non-streamed transactions.
  */
-void
-ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
-					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
-					TimestampTz commit_time,
-					RepOriginId origin_id, XLogRecPtr origin_lsn)
+static void
+ReorderBufferCommitInternal(ReorderBufferTXN *txn,
+                            ReorderBuffer *rb, TransactionId xid,
+					        XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					        TimestampTz commit_time,
+					        RepOriginId origin_id, XLogRecPtr origin_lsn)
 {
-	ReorderBufferTXN *txn;
 	Snapshot	snapshot_now;
 	CommandId	command_id = FirstCommandId;
 
-	txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
-								false);
-
-	/* unknown transaction, nothing to replay */
-	if (txn == NULL)
-		return;
-
 	txn->final_lsn = commit_lsn;
 	txn->end_lsn = end_lsn;
 	txn->commit_time = commit_time;
@@ -2453,6 +2476,140 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 }
 
 /*
+ * Ask output plugin whether we want to skip this PREPARE and send
+ * this transaction as a regular commit later.
+ */
+bool
+ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid, const char *gid)
+{
+   ReorderBufferTXN *txn;
+
+   txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
+
+   return rb->filter_prepare(rb, txn, xid, gid);
+}
+
+
+/*
+ * Commit a transaction.
+ *
+ * See comments for ReorderBufferCommitInternal()
+ */
+void
+ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+                   XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+                   TimestampTz commit_time,
+                   RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+   ReorderBufferTXN *txn;
+
+   txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+                               false);
+
+   /* unknown transaction, nothing to replay */
+   if (txn == NULL)
+       return;
+
+   ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn,
+                               commit_time, origin_id, origin_lsn);
+}
+
+/*
+ * Prepare a twophase transaction. It calls ReorderBufferCommitInternal()
+ * since all prepared transactions need to be decoded at PREPARE time.
+ */
+void
+ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+                    XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+                    TimestampTz commit_time,
+                    RepOriginId origin_id, XLogRecPtr origin_lsn,
+                    char *gid)
+{
+   ReorderBufferTXN *txn;
+
+   txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+                               false);
+
+   /* unknown transaction, nothing to replay */
+   if (txn == NULL)
+       return;
+
+   txn->txn_flags |= RBTXN_PREPARE;
+   txn->gid = palloc(strlen(gid) + 1); /* trailing '\0' */
+   strcpy(txn->gid, gid);
+
+   ReorderBufferCommitInternal(txn, rb, xid, commit_lsn, end_lsn,
+                               commit_time, origin_id, origin_lsn);
+}
+
+/*
+ * Check whether this transaction was sent as prepared to subscribers.
+ * Called while handling commit|abort prepared.
+ */
+bool
+ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid,
+                          const char *gid)
+{
+   ReorderBufferTXN *txn;
+
+   txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+                               false);
+
+   /*
+    * Always call the prepare filter. It's the job of the prepare filter to
+    * give us the *same* response for a given xid across multiple calls
+    * (including ones on restart)
+    */
+   return !(rb->filter_prepare(rb, txn, xid, gid));
+}
+
+/*
+ * Send standalone xact event. This is used to handle COMMIT/ABORT PREPARED.
+ */
+void
+ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+                           XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+                           TimestampTz commit_time,
+                           RepOriginId origin_id, XLogRecPtr origin_lsn,
+                           char *gid, bool is_commit)
+{
+   ReorderBufferTXN *txn;
+
+   /*
+    * The transaction may or may not exist (during restarts for example).
+    * Anyways, 2PC transactions do not contain any reorderbuffers. So allow
+    * it to be created below.
+    */
+   txn = ReorderBufferTXNByXid(rb, xid, true, NULL, commit_lsn,
+                               true);
+
+   txn->final_lsn = commit_lsn;
+   txn->end_lsn = end_lsn;
+   txn->commit_time = commit_time;
+   txn->origin_id = origin_id;
+   txn->origin_lsn = origin_lsn;
+   /* this txn is obviously prepared */
+   txn->txn_flags |= RBTXN_PREPARE;
+   txn->gid = palloc(strlen(gid) + 1); /* trailing '\0' */
+   strcpy(txn->gid, gid);
+
+   if (is_commit)
+   {
+       txn->txn_flags |= RBTXN_COMMIT_PREPARED;
+       rb->commit_prepared(rb, txn, commit_lsn);
+   }
+   else
+   {
+       txn->txn_flags |= RBTXN_ROLLBACK_PREPARED;
+       rb->abort_prepared(rb, txn, commit_lsn);
+   }
+
+   /* cleanup: make sure there's no cache pollution */
+   ReorderBufferExecuteInvalidations(rb, txn);
+   ReorderBufferCleanupTXN(rb, txn);
+}
+
+/*
  * Abort a transaction that possibly has previous changes. Needs to be first
  * called for subtransactions and then for the toplevel xid.
  *
@@ -2495,7 +2652,13 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
 	/* cosmetic... */
 	txn->final_lsn = lsn;
 
-	/* remove potential on-disk data, and deallocate */
+    /*
+     * remove potential on-disk data, and deallocate.
+     *
+     * We remove it even for prepared transactions (GID is enough to
+     * commit/abort those later).
+     */
+
 	ReorderBufferCleanupTXN(rb, txn);
 }
 
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index b78c796..f6ca87f 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -27,6 +27,7 @@ typedef struct OutputPluginOptions
 {
 	OutputPluginOutputType output_type;
 	bool		receive_rewrites;
+	bool		enable_twophase;
 } OutputPluginOptions;
 
 /*
@@ -78,6 +79,46 @@ typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
 									   XLogRecPtr commit_lsn);
 
 /*
+ * Called for an implicit ABORT of a transaction.
+ */
+typedef void (*LogicalDecodeAbortCB) (struct LogicalDecodingContext *ctx,
+									  ReorderBufferTXN *txn,
+									  XLogRecPtr abort_lsn);
+
+ /*
+  * Called before decoding of PREPARE record to decide whether this
+  * transaction should be decoded with separate calls to prepare and
+  * commit_prepared/abort_prepared callbacks or wait till COMMIT PREPARED and
+  * sent as usual transaction.
+  */
+typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+											  ReorderBufferTXN *txn,
+											  TransactionId xid,
+											  const char *gid);
+
+/*
+ * Called for PREPARE record unless it was filtered by filter_prepare()
+ * callback.
+ */
+typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
+										ReorderBufferTXN *txn,
+										XLogRecPtr prepare_lsn);
+
+/*
+ * Called for COMMIT PREPARED.
+ */
+typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
+											   ReorderBufferTXN *txn,
+											   XLogRecPtr commit_lsn);
+
+/*
+ * Called for ROLLBACK PREPARED.
+ */
+typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx,
+											  ReorderBufferTXN *txn,
+											  XLogRecPtr abort_lsn);
+
+/*
  * Called for the generic logical decoding messages.
  */
 typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
@@ -170,7 +211,12 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeChangeCB change_cb;
 	LogicalDecodeTruncateCB truncate_cb;
 	LogicalDecodeCommitCB commit_cb;
+	LogicalDecodeAbortCB abort_cb;
 	LogicalDecodeMessageCB message_cb;
+	LogicalDecodeFilterPrepareCB filter_prepare_cb;
+	LogicalDecodePrepareCB prepare_cb;
+	LogicalDecodeCommitPreparedCB commit_prepared_cb;
+	LogicalDecodeAbortPreparedCB abort_prepared_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 	/* streaming of changes */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 1ae17d5..820840a 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -10,6 +10,7 @@
 #define REORDERBUFFER_H
 
 #include "access/htup_details.h"
+#include "access/twophase.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
 #include "utils/hsearch.h"
@@ -162,9 +163,14 @@ typedef struct ReorderBufferChange
 #define RBTXN_HAS_CATALOG_CHANGES 0x0001
 #define RBTXN_IS_SUBXACT          0x0002
 #define RBTXN_IS_SERIALIZED       0x0004
-#define RBTXN_IS_STREAMED         0x0008
-#define RBTXN_HAS_TOAST_INSERT    0x0010
-#define RBTXN_HAS_SPEC_INSERT     0x0020
+#define RBTXN_PREPARE             0x0008
+#define RBTXN_COMMIT_PREPARED     0x0010
+#define RBTXN_ROLLBACK_PREPARED   0x0020
+#define RBTXN_COMMIT              0x0040
+#define RBTXN_ROLLBACK            0x0080
+#define RBTXN_IS_STREAMED         0x0100
+#define RBTXN_HAS_TOAST_INSERT    0x0200
+#define RBTXN_HAS_SPEC_INSERT     0x0400
 
 /* Does the transaction have catalog changes? */
 #define rbtxn_has_catalog_changes(txn) \
@@ -218,6 +224,17 @@ typedef struct ReorderBufferChange
 	((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
 )
 
+/* is this txn prepared? */
+#define rbtxn_prepared(txn)            (txn->txn_flags & RBTXN_PREPARE)
+/* was this prepared txn committed in the meanwhile? */
+#define rbtxn_commit_prepared(txn)     (txn->txn_flags & RBTXN_COMMIT_PREPARED)
+/* was this prepared txn aborted in the meanwhile? */
+#define rbtxn_rollback_prepared(txn)   (txn->txn_flags & RBTXN_ROLLBACK_PREPARED)
+/* was this txn committed in the meanwhile? */
+#define rbtxn_commit(txn)              (txn->txn_flags & RBTXN_COMMIT)
+/* was this prepared txn aborted in the meanwhile? */
+#define rbtxn_rollback(txn)            (txn->txn_flags & RBTXN_ROLLBACK)
+
 typedef struct ReorderBufferTXN
 {
 	/* See above */
@@ -229,6 +246,9 @@ typedef struct ReorderBufferTXN
 	/* Xid of top-level transaction, if known */
 	TransactionId toplevel_xid;
 
+	/* In case of 2PC we need to pass GID to output plugin */
+	char         *gid;
+
 	/*
 	 * LSN of the first data carrying, WAL record with knowledge about this
 	 * xid. This is allowed to *not* be first record adorned with this xid, if
@@ -390,6 +410,39 @@ typedef void (*ReorderBufferCommitCB) (ReorderBuffer *rb,
 									   ReorderBufferTXN *txn,
 									   XLogRecPtr commit_lsn);
 
+/* abort callback signature */
+typedef void (*ReorderBufferAbortCB) (
+                                     ReorderBuffer *rb,
+                                     ReorderBufferTXN *txn,
+                                     XLogRecPtr abort_lsn);
+
+typedef bool (*ReorderBufferFilterPrepareCB) (
+                                             ReorderBuffer *rb,
+                                             ReorderBufferTXN *txn,
+                                             TransactionId xid,
+                                             const char *gid);
+
+/* prepare callback signature */
+typedef void (*ReorderBufferPrepareCB) (
+                                       ReorderBuffer *rb,
+                                       ReorderBufferTXN *txn,
+                                       XLogRecPtr prepare_lsn);
+
+/* commit prepared callback signature */
+typedef void (*ReorderBufferCommitPreparedCB) (
+                                              ReorderBuffer *rb,
+                                              ReorderBufferTXN *txn,
+                                              XLogRecPtr commit_lsn);
+
+/* abort prepared callback signature */
+typedef void (*ReorderBufferAbortPreparedCB) (
+                                             ReorderBuffer *rb,
+                                             ReorderBufferTXN *txn,
+                                             XLogRecPtr abort_lsn);
+
+
+
+
 /* message callback signature */
 typedef void (*ReorderBufferMessageCB) (ReorderBuffer *rb,
 										ReorderBufferTXN *txn,
@@ -482,6 +535,11 @@ struct ReorderBuffer
 	ReorderBufferApplyChangeCB apply_change;
 	ReorderBufferApplyTruncateCB apply_truncate;
 	ReorderBufferCommitCB commit;
+	ReorderBufferAbortCB abort;
+	ReorderBufferFilterPrepareCB filter_prepare;
+	ReorderBufferPrepareCB prepare;
+	ReorderBufferCommitPreparedCB commit_prepared;
+	ReorderBufferAbortPreparedCB abort_prepared;
 	ReorderBufferMessageCB message;
 
 	/*
@@ -548,6 +606,11 @@ void		ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapsho
 void		ReorderBufferCommit(ReorderBuffer *, TransactionId,
 								XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 								TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
+void		ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
+                           XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+                           TimestampTz commit_time,
+                           RepOriginId origin_id, XLogRecPtr origin_lsn,
+                           char *gid, bool is_commit);
 void		ReorderBufferAssignChild(ReorderBuffer *, TransactionId, TransactionId, XLogRecPtr commit_lsn);
 void		ReorderBufferCommitChild(ReorderBuffer *, TransactionId, TransactionId,
 									 XLogRecPtr commit_lsn, XLogRecPtr end_lsn);
@@ -571,6 +634,15 @@ void		ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog
 bool		ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
 bool		ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
 
+bool ReorderBufferPrepareNeedSkip(ReorderBuffer *rb, TransactionId xid,
+							 const char *gid);
+bool ReorderBufferTxnIsPrepared(ReorderBuffer *rb, TransactionId xid,
+						   const char *gid);
+void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
+					 XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+					 TimestampTz commit_time,
+					 RepOriginId origin_id, XLogRecPtr origin_lsn,
+					 char *gid);
 ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
 TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
 
-- 
1.8.3.1

