From 9ad3dda95b96137232aef4be7632389921830892 Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Thu, 2 Jul 2020 12:15:17 -0400
Subject: [PATCH 2/8] add binary column to pg_subscription support create and
 alter subcription with binary option

---
 src/backend/catalog/system_views.sql          |  2 +-
 src/backend/commands/subscriptioncmds.c       | 39 +++++++++++++++----
 .../libpqwalreceiver/libpqwalreceiver.c       |  3 ++
 src/backend/replication/logical/worker.c      |  1 +
 src/include/catalog/pg_subscription.h         |  4 ++
 src/include/replication/walreceiver.h         |  1 +
 6 files changed, 42 insertions(+), 8 deletions(-)

diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 5314e9348f..9a853fe48d 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1125,7 +1125,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 
 -- All columns of pg_subscription except subconninfo are readable.
 REVOKE ALL ON pg_subscription FROM public;
-GRANT SELECT (subdbid, subname, subowner, subenabled, subslotname, subpublications)
+GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, subslotname, subpublications)
     ON pg_subscription TO public;
 
 
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 9ebb026187..f2590ff565 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -59,7 +59,7 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 						   bool *enabled, bool *create_slot,
 						   bool *slot_name_given, char **slot_name,
 						   bool *copy_data, char **synchronous_commit,
-						   bool *refresh)
+						   bool *refresh, bool *binary_given, bool *binary)
 {
 	ListCell   *lc;
 	bool		connect_given = false;
@@ -90,6 +90,12 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 		*synchronous_commit = NULL;
 	if (refresh)
 		*refresh = true;
+	if (binary)
+	{
+		*binary_given = false;
+		/* not all versions of pgoutput will understand this option default to false */
+		*binary = false;
+	}
 
 	/* Parse options */
 	foreach(lc, options)
@@ -175,6 +181,11 @@ parse_subscription_options(List *options, bool *connect, bool *enabled_given,
 			refresh_given = true;
 			*refresh = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "binary") == 0 && binary)
+		{
+			*binary_given = true;
+			*binary = defGetBoolean(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -324,8 +335,10 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	bool		slotname_given;
 	char		originname[NAMEDATALEN];
 	bool		create_slot;
-	List	   *publications;
+	bool		binary;
+	bool		binary_given;
 
+	List	   *publications;
 	/*
 	 * Parse and check options.
 	 *
@@ -334,7 +347,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 	parse_subscription_options(stmt->options, &connect, &enabled_given,
 							   &enabled, &create_slot, &slotname_given,
 							   &slotname, &copy_data, &synchronous_commit,
-							   NULL);
+							   NULL, &binary_given, &binary);
 
 	/*
 	 * Since creating a replication slot is not transactional, rolling back
@@ -400,6 +413,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
 		DirectFunctionCall1(namein, CStringGetDatum(stmt->subname));
 	values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
 	values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
+	values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
 	values[Anum_pg_subscription_subconninfo - 1] =
 		CStringGetTextDatum(conninfo);
 	if (slotname)
@@ -669,10 +683,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 				char	   *slotname;
 				bool		slotname_given;
 				char	   *synchronous_commit;
+				bool		binary_given;
+				bool		binary;
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
 										   NULL, &slotname_given, &slotname,
-										   NULL, &synchronous_commit, NULL);
+										   NULL, &synchronous_commit, NULL,
+										   &binary_given, &binary);
 
 				if (slotname_given)
 				{
@@ -697,6 +714,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 					replaces[Anum_pg_subscription_subsynccommit - 1] = true;
 				}
 
+				if (binary_given)
+				{
+				values[Anum_pg_subscription_subbinary - 1] =
+					BoolGetDatum(binary);
+					replaces[Anum_pg_subscription_subbinary - 1] = true;
+				}
+
 				update_tuple = true;
 				break;
 			}
@@ -708,7 +732,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL,
 										   &enabled_given, &enabled, NULL,
-										   NULL, NULL, NULL, NULL, NULL);
+										   NULL, NULL, NULL, NULL, NULL,
+										   NULL, NULL);
 				Assert(enabled_given);
 
 				if (!sub->slotname && enabled)
@@ -746,7 +771,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
 										   NULL, NULL, NULL, &copy_data,
-										   NULL, &refresh);
+										   NULL, &refresh, NULL, NULL);
 
 				values[Anum_pg_subscription_subpublications - 1] =
 					publicationListToArray(stmt->publication);
@@ -783,7 +808,7 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
 
 				parse_subscription_options(stmt->options, NULL, NULL, NULL,
 										   NULL, NULL, NULL, &copy_data,
-										   NULL, NULL);
+										   NULL, NULL, NULL, NULL);
 
 				AlterSubscription_refresh(sub, copy_data);
 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index e4fd1f9bb6..2a9d966ec1 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -402,6 +402,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 		char	   *pubnames_str;
 		List	   *pubnames;
 		char	   *pubnames_literal;
+		bool		binary;
 
 		appendStringInfoString(&cmd, " (");
 
@@ -423,6 +424,8 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 		appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
 		PQfreemem(pubnames_literal);
 		pfree(pubnames_str);
+		if (options->proto.logical.binary)
+			appendStringInfo(&cmd, ", binary 'true'");
 
 		appendStringInfoChar(&cmd, ')');
 	}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c958c4d8a7..b46e8991cd 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2164,6 +2164,7 @@ ApplyWorkerMain(Datum main_arg)
 	options.slotname = myslotname;
 	options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
 	options.proto.logical.publication_names = MySubscription->publications;
+	options.proto.logical.binary = MySubscription->binary;
 
 	/* Start normal logical streaming replication. */
 	walrcv_startstreaming(wrconn, &options);
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index 0a756d42d8..100789a52f 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -48,6 +48,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subenabled;		/* True if the subscription is enabled (the
 								 * worker should be running) */
 
+	bool		subbinary;		/* True if the subscription wants the
+								 * output plugin to send data in binary */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
 	text		subconninfo BKI_FORCE_NOT_NULL;
@@ -73,6 +76,7 @@ typedef struct Subscription
 	char	   *name;			/* Name of the subscription */
 	Oid			owner;			/* Oid of the subscription owner */
 	bool		enabled;		/* Indicates if the subscription is enabled */
+	bool		binary;			/* Indicates if the subscription wants data in binary format */
 	char	   *conninfo;		/* Connection string to the publisher */
 	char	   *slotname;		/* Name of the replication slot */
 	char	   *synccommit;		/* Synchronous commit setting for worker */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index c75dcebea0..1ee316f6d7 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -177,6 +177,7 @@ typedef struct
 		{
 			uint32		proto_version;	/* Logical protocol version */
 			List	   *publication_names;	/* String list of publications */
+			bool		binary;				/* Ask publisher output plugin to use binary */
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
-- 
2.20.1 (Apple Git-117)

