From 6b15ff7e035587277c440ae7dd0218db339c6006 Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Mon, 25 Oct 2021 15:41:43 -0400
Subject: [PATCH v9 1/3] Modify pg_basebackup to use a new COPY subprotocol for
 base backups.

In the new approach, all files across all tablespaces are sent in a
single COPY OUT operation. The CopyData messages are no longer raw
archive content; rather, each message is prefixed with a type byte
that describes its purpose, e.g. 'n' signifies the start of a new
archive and 'd' signifies archive or manifest data. This protocol
is significantly more extensible than the old approach, since we can
later create more message types, though not without concern for
backward compatibility.

The new protocol sends a few things to the client that the old one
did not. First, it sends the name of each archive explicitly, instead
of letting the client compute it. This is intended to make it easier
to write future patches that might send archives in a format other
that tar (e.g. cpio, pax, tar.gz). Second, it sends explicit progress
messages rather than allowing the client to assume that progress is
defined by the number of bytes received. This will help with future
features where the server compresses the data, or sends it someplace
directly rather than transmitting it to the client.

When the new protocol is used, the server generates properly terminated
tar archives, in contrast to the old one which intentionally leaves out
the two blocks of zero bytes that are supposed to occur at the end of
each tar file. Any verison of pg_basebackup new enough to support the
new protocol is also smart enough not to be confused by these padding
blocks, so we need not propagate this kluge.

The old protocol is still supported for compatibility with previous
releases. The new protocol is selected by means of a new
TARGET option to the BASE_BACKUP command. Currently, the
only supported target is 'client'. Support for additional
targets will be added in a later commit.
---
 src/backend/replication/basebackup.c      |  61 ++-
 src/backend/replication/basebackup_copy.c | 277 +++++++++++++-
 src/bin/pg_basebackup/pg_basebackup.c     | 443 +++++++++++++++++++---
 src/include/replication/basebackup_sink.h |   1 +
 4 files changed, 728 insertions(+), 54 deletions(-)

diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 38c82c4619..096455ad02 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -53,6 +53,12 @@
  */
 #define SINK_BUFFER_LENGTH			Max(32768, BLCKSZ)
 
+typedef enum
+{
+	BACKUP_TARGET_COMPAT,
+	BACKUP_TARGET_CLIENT
+} backup_target_type;
+
 typedef struct
 {
 	const char *label;
@@ -62,6 +68,7 @@ typedef struct
 	bool		includewal;
 	uint32		maxrate;
 	bool		sendtblspcmapfile;
+	backup_target_type target;
 	backup_manifest_option manifest;
 	pg_checksum_type manifest_checksum_type;
 } basebackup_options;
@@ -81,6 +88,7 @@ static int64 _tarWriteHeader(bbsink *sink, const char *filename,
 							 const char *linktarget, struct stat *statbuf,
 							 bool sizeonly);
 static void _tarWritePadding(bbsink *sink, int len);
+static void _tarEndArchive(bbsink *sink, backup_target_type target);
 static void convert_link_to_directory(const char *pathbuf, struct stat *statbuf);
 static void perform_base_backup(basebackup_options *opt, bbsink *sink);
 static void parse_basebackup_options(List *options, basebackup_options *opt);
@@ -374,7 +382,10 @@ perform_base_backup(basebackup_options *opt, bbsink *sink)
 				Assert(lnext(state.tablespaces, lc) == NULL);
 			}
 			else
+			{
+				_tarEndArchive(sink, opt->target);
 				bbsink_end_archive(sink);
+			}
 		}
 
 		basebackup_progress_wait_wal_archive(&state);
@@ -611,6 +622,7 @@ perform_base_backup(basebackup_options *opt, bbsink *sink)
 			sendFileWithContent(sink, pathbuf, "", &manifest);
 		}
 
+		_tarEndArchive(sink, opt->target);
 		bbsink_end_archive(sink);
 	}
 
@@ -678,8 +690,10 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 	bool		o_noverify_checksums = false;
 	bool		o_manifest = false;
 	bool		o_manifest_checksums = false;
+	bool		o_target = false;
 
 	MemSet(opt, 0, sizeof(*opt));
+	opt->target = BACKUP_TARGET_COMPAT;
 	opt->manifest = MANIFEST_OPTION_NO;
 	opt->manifest_checksum_type = CHECKSUM_TYPE_CRC32C;
 
@@ -820,6 +834,22 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 								optval)));
 			o_manifest_checksums = true;
 		}
+		else if (strcmp(defel->defname, "target") == 0)
+		{
+			char	   *optval = defGetString(defel);
+
+			if (o_target)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("duplicate option \"%s\"", defel->defname)));
+			if (strcmp(optval, "client") == 0)
+				opt->target = BACKUP_TARGET_CLIENT;
+			else
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("unrecognized target: \"%s\"", optval)));
+			o_target = true;
+		}
 		else
 			ereport(ERROR,
 					errcode(ERRCODE_SYNTAX_ERROR),
@@ -865,8 +895,15 @@ SendBaseBackup(BaseBackupCmd *cmd)
 		set_ps_display(activitymsg);
 	}
 
-	/* Create a basic basebackup sink. */
-	sink = bbsink_copytblspc_new();
+	/*
+	 * If the TARGET option was specified, then we can use the new copy-stream
+	 * protocol. If not, we must fall back to the old and less capable
+	 * copy-tablespace protocol.
+	 */
+	if (opt.target != BACKUP_TARGET_COMPAT)
+		sink = bbsink_copystream_new();
+	else
+		sink = bbsink_copytblspc_new();
 
 	/* Set up network throttling, if client requested it */
 	if (opt.maxrate > 0)
@@ -1695,6 +1732,26 @@ _tarWritePadding(bbsink *sink, int len)
 	}
 }
 
+/*
+ * Tar archives are supposed to end with two blocks of zeroes, so add those,
+ * unless we're using the old copy-tablespace protocol. In that system, the
+ * server must not properly terminate the client archive, and the client is
+ * instead responsible for adding those two blocks of zeroes.
+ */
+static void
+_tarEndArchive(bbsink *sink, backup_target_type target)
+{
+	if (target != BACKUP_TARGET_COMPAT)
+	{
+		/* See comments in _tarWriteHeader for why this must be true. */
+		Assert(sink->bbs_buffer_length >= TAR_BLOCK_SIZE);
+
+		MemSet(sink->bbs_buffer, 0, TAR_BLOCK_SIZE);
+		bbsink_archive_contents(sink, TAR_BLOCK_SIZE);
+		bbsink_archive_contents(sink, TAR_BLOCK_SIZE);
+	}
+}
+
 /*
  * If the entry in statbuf is a link, then adjust statbuf to make it look like a
  * directory, so that it will be written that way.
diff --git a/src/backend/replication/basebackup_copy.c b/src/backend/replication/basebackup_copy.c
index 30bab4546e..57183f4d46 100644
--- a/src/backend/replication/basebackup_copy.c
+++ b/src/backend/replication/basebackup_copy.c
@@ -1,8 +1,27 @@
 /*-------------------------------------------------------------------------
  *
  * basebackup_copy.c
- *	  send basebackup archives using one COPY OUT operation per
- *	  tablespace, and an additional COPY OUT for the backup manifest
+ *	  send basebackup archives using COPY OUT
+ *
+ * We have two different ways of doing this.
+ *
+ * 'copytblspc' is an older method still supported for compatibility
+ * with releases prior to v15. In this method, a separate COPY OUT
+ * operation is used for each tablespace. The manifest, if it is sent,
+ * uses an additional COPY OUT operation.
+ *
+ * 'copystream' sends a starts a single COPY OUT operation and transmits
+ * all the archives and the manifest if present during the course of that
+ * single COPY OUT. Each CopyData message begins with a type byte,
+ * allowing us to signal the start of a new archive, or the manifest,
+ * by some means other than ending the COPY stream. This also allows
+ * this protocol to be extended more easily, since we can include
+ * arbitrary information in the message stream as long as we're certain
+ * that the client will know what to do with it.
+ *
+ * Regardless of which method is used, we sent a result set with
+ * information about the tabelspaces to be included in the backup before
+ * starting COPY OUT. This result has the same format in every method.
  *
  * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
  *
@@ -18,6 +37,52 @@
 #include "libpq/pqformat.h"
 #include "replication/basebackup.h"
 #include "replication/basebackup_sink.h"
+#include "utils/timestamp.h"
+
+typedef struct bbsink_copystream
+{
+	/* Common information for all types of sink. */
+	bbsink		base;
+
+	/*
+	 * Protocol message buffer. We assemble CopyData protocol messages by
+	 * setting the first character of this buffer to 'd' (archive or manifest
+	 * data) and then making base.bbs_buffer point to the second character so
+	 * that the rest of the data gets copied into the message just where we
+	 * want it.
+	 */
+	char	   *msgbuffer;
+
+	/*
+	 * When did we last report progress to the client, and how much progress
+	 * did we report?
+	 */
+	TimestampTz last_progress_report_time;
+	uint64		bytes_done_at_last_time_check;
+} bbsink_copystream;
+
+/*
+ * We don't want to send progress messages to the client excessively
+ * frequently. Ideally, we'd like to send a message when the time since the
+ * last message reaches PROGRESS_REPORT_MILLISECOND_THRESHOLD, but checking
+ * the system time every time we send a tiny bit of data seems too expensive.
+ * So we only check it after the number of bytes sine the last check reaches
+ * PROGRESS_REPORT_BYTE_INTERVAL.
+ */
+#define	PROGRESS_REPORT_BYTE_INTERVAL				65536
+#define PROGRESS_REPORT_MILLISECOND_THRESHOLD		1000
+
+static void bbsink_copystream_begin_backup(bbsink *sink);
+static void bbsink_copystream_begin_archive(bbsink *sink,
+											const char *archive_name);
+static void bbsink_copystream_archive_contents(bbsink *sink, size_t len);
+static void bbsink_copystream_end_archive(bbsink *sink);
+static void bbsink_copystream_begin_manifest(bbsink *sink);
+static void bbsink_copystream_manifest_contents(bbsink *sink, size_t len);
+static void bbsink_copystream_end_manifest(bbsink *sink);
+static void bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
+										 TimeLineID endtli);
+static void bbsink_copystream_cleanup(bbsink *sink);
 
 static void bbsink_copytblspc_begin_backup(bbsink *sink);
 static void bbsink_copytblspc_begin_archive(bbsink *sink,
@@ -38,6 +103,18 @@ static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
 static void SendTablespaceList(List *tablespaces);
 static void send_int8_string(StringInfoData *buf, int64 intval);
 
+const bbsink_ops bbsink_copystream_ops = {
+	.begin_backup = bbsink_copystream_begin_backup,
+	.begin_archive = bbsink_copystream_begin_archive,
+	.archive_contents = bbsink_copystream_archive_contents,
+	.end_archive = bbsink_copystream_end_archive,
+	.begin_manifest = bbsink_copystream_begin_manifest,
+	.manifest_contents = bbsink_copystream_manifest_contents,
+	.end_manifest = bbsink_copystream_end_manifest,
+	.end_backup = bbsink_copystream_end_backup,
+	.cleanup = bbsink_copystream_cleanup
+};
+
 const bbsink_ops bbsink_copytblspc_ops = {
 	.begin_backup = bbsink_copytblspc_begin_backup,
 	.begin_archive = bbsink_copytblspc_begin_archive,
@@ -50,6 +127,202 @@ const bbsink_ops bbsink_copytblspc_ops = {
 	.cleanup = bbsink_copytblspc_cleanup
 };
 
+/*
+ * Create a new 'copystream' bbsink.
+ */
+bbsink *
+bbsink_copystream_new(void)
+{
+	bbsink_copystream *sink = palloc0(sizeof(bbsink_copystream));
+
+	*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_copystream_ops;
+
+	/* Set up for periodic progress reporting. */
+	sink->last_progress_report_time = GetCurrentTimestamp();
+	sink->bytes_done_at_last_time_check = UINT64CONST(0);
+
+	return &sink->base;
+}
+
+/*
+ * Send start-of-backup wire protocol messages.
+ */
+static void
+bbsink_copystream_begin_backup(bbsink *sink)
+{
+	bbsink_copystream *mysink = (bbsink_copystream *) sink;
+	bbsink_state *state = sink->bbs_state;
+
+	/*
+	 * Initialize buffer. We ultimately want to send the archive and manifest
+	 * data by means of CopyData messages where the payload portion of each
+	 * message begins with a type byte, so we set up a buffer that begins with
+	 * a the type byte we're going to need, and then arrange things so that
+	 * the data we're given will be written just after that type byte. That
+	 * will allow us to ship the data with a single call to pq_putmessage and
+	 * without needing any extra copying.
+	 */
+	mysink->msgbuffer = palloc(mysink->base.bbs_buffer_length + 1);
+	mysink->base.bbs_buffer = mysink->msgbuffer + 1;
+	mysink->msgbuffer[0] = 'd'; /* archive or manifest data */
+
+	/* Tell client the backup start location. */
+	SendXlogRecPtrResult(state->startptr, state->starttli);
+
+	/* Send client a list of tablespaces. */
+	SendTablespaceList(state->tablespaces);
+
+	/* Send a CommandComplete message */
+	pq_puttextmessage('C', "SELECT");
+
+	/* Begin COPY stream. This will be used for all archives + manifest. */
+	SendCopyOutResponse();
+}
+
+/*
+ * Send a CopyData message announcing the beginning of a new archive.
+ */
+static void
+bbsink_copystream_begin_archive(bbsink *sink, const char *archive_name)
+{
+	bbsink_state *state = sink->bbs_state;
+	tablespaceinfo *ti;
+	StringInfoData buf;
+
+	ti = list_nth(state->tablespaces, state->tablespace_num);
+	pq_beginmessage(&buf, 'd'); /* CopyData */
+	pq_sendbyte(&buf, 'n');		/* New archive */
+	pq_sendstring(&buf, archive_name);
+	pq_sendstring(&buf, ti->path == NULL ? "" : ti->path);
+	pq_endmessage(&buf);
+}
+
+/*
+ * Send a CopyData message containing a chunk of archive content.
+ */
+static void
+bbsink_copystream_archive_contents(bbsink *sink, size_t len)
+{
+	bbsink_copystream *mysink = (bbsink_copystream *) sink;
+	bbsink_state *state = mysink->base.bbs_state;
+	StringInfoData buf;
+	uint64		targetbytes;
+
+	/* Send the archive content to the client (with leading type byte). */
+	pq_putmessage('d', mysink->msgbuffer, len + 1);
+
+	/* Consider whether to send a progress report to the client. */
+	targetbytes = mysink->bytes_done_at_last_time_check
+		+ PROGRESS_REPORT_BYTE_INTERVAL;
+	if (targetbytes <= state->bytes_done)
+	{
+		TimestampTz now = GetCurrentTimestamp();
+		long		ms;
+
+		/*
+		 * OK, we've sent a decent number of bytes, so check the system time
+		 * to see whether we're due to send a progress report.
+		 */
+		mysink->bytes_done_at_last_time_check = state->bytes_done;
+		ms = TimestampDifferenceMilliseconds(mysink->last_progress_report_time,
+											 now);
+
+		/*
+		 * Send a progress report if enough time has passed. Also send one if
+		 * the system clock was set backward, so that such occurrences don't
+		 * have the effect of suppressing further progress messages.
+		 */
+		if (ms < 0 || ms >= PROGRESS_REPORT_MILLISECOND_THRESHOLD)
+		{
+			mysink->last_progress_report_time = now;
+
+			pq_beginmessage(&buf, 'd'); /* CopyData */
+			pq_sendbyte(&buf, 'p'); /* Progress report */
+			pq_sendint64(&buf, state->bytes_done);
+			pq_endmessage(&buf);
+			pq_flush_if_writable();
+		}
+	}
+}
+
+/*
+ * We don't need to explicitly signal the end of the archive; the client
+ * will figure out that we've reached the end when we begin the next one,
+ * or begin the manifest, or end the COPY stream. However, this seems like
+ * a good time to force out a progress report. One reason for that is that
+ * if this is the last archive, and we don't force a progress report now,
+ * the client will never be told that we sent all the bytes.
+ */
+static void
+bbsink_copystream_end_archive(bbsink *sink)
+{
+	bbsink_copystream *mysink = (bbsink_copystream *) sink;
+	bbsink_state *state = mysink->base.bbs_state;
+	StringInfoData buf;
+
+	mysink->bytes_done_at_last_time_check = state->bytes_done;
+	mysink->last_progress_report_time = GetCurrentTimestamp();
+	pq_beginmessage(&buf, 'd'); /* CopyData */
+	pq_sendbyte(&buf, 'p');		/* Progress report */
+	pq_sendint64(&buf, state->bytes_done);
+	pq_endmessage(&buf);
+	pq_flush_if_writable();
+}
+
+/*
+ * Send a CopyData message announcing the beginning of the backup manifest.
+ */
+static void
+bbsink_copystream_begin_manifest(bbsink *sink)
+{
+	StringInfoData buf;
+
+	pq_beginmessage(&buf, 'd'); /* CopyData */
+	pq_sendbyte(&buf, 'm');		/* Manifest */
+	pq_endmessage(&buf);
+}
+
+/*
+ * Each chunk of manifest data is sent using a CopyData message.
+ */
+static void
+bbsink_copystream_manifest_contents(bbsink *sink, size_t len)
+{
+	bbsink_copystream *mysink = (bbsink_copystream *) sink;
+
+	/* Send the manifest content to the client (with leading type byte). */
+	pq_putmessage('d', mysink->msgbuffer, len + 1);
+}
+
+/*
+ * We don't need an explicit terminator for the backup manifest.
+ */
+static void
+bbsink_copystream_end_manifest(bbsink *sink)
+{
+	/* Do nothing. */
+}
+
+/*
+ * Send end-of-backup wire protocol messages.
+ */
+static void
+bbsink_copystream_end_backup(bbsink *sink, XLogRecPtr endptr,
+							 TimeLineID endtli)
+{
+	SendCopyDone();
+	SendXlogRecPtrResult(endptr, endtli);
+}
+
+/*
+ * Cleanup.
+ */
+static void
+bbsink_copystream_cleanup(bbsink *sink)
+{
+	/* Nothing to do. */
+}
+
 /*
  * Create a new 'copytblspc' bbsink.
  */
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 169afa5645..ffeb6a3117 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -54,6 +54,16 @@ typedef struct TablespaceList
 	TablespaceListCell *tail;
 } TablespaceList;
 
+typedef struct ArchiveStreamState
+{
+	int			tablespacenum;
+	bbstreamer *streamer;
+	bbstreamer *manifest_inject_streamer;
+	PQExpBuffer manifest_buffer;
+	char		manifest_filename[MAXPGPATH];
+	FILE	   *manifest_file;
+} ArchiveStreamState;
+
 typedef struct WriteTarState
 {
 	int			tablespacenum;
@@ -167,6 +177,13 @@ static void progress_report(int tablespacenum, bool force, bool finished);
 static bbstreamer *CreateBackupStreamer(char *archive_name, char *spclocation,
 										bbstreamer **manifest_inject_streamer_p,
 										bool is_recovery_guc_supported);
+static void ReceiveArchiveStreamChunk(size_t r, char *copybuf,
+									  void *callback_data);
+static char GetCopyDataByte(size_t r, char *copybuf, size_t *cursor);
+static char *GetCopyDataString(size_t r, char *copybuf, size_t *cursor);
+static uint64 GetCopyDataUInt64(size_t r, char *copybuf, size_t *cursor);
+static void GetCopyDataEnd(size_t r, char *copybuf, size_t cursor);
+static void ReportCopyDataParseError(size_t r, char *copybuf);
 static void ReceiveTarFile(PGconn *conn, char *archive_name, char *spclocation,
 						   bool tablespacenum);
 static void ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data);
@@ -981,10 +998,11 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
 
 	/*
 	 * We have to parse the archive if (1) we're suppose to extract it, or if
-	 * (2) we need to inject backup_manifest or recovery configuration into it.
+	 * (2) we need to inject backup_manifest or recovery configuration into
+	 * it.
 	 */
 	must_parse_archive = (format == 'p' || inject_manifest ||
-		(spclocation == NULL && writerecoveryconf));
+						  (spclocation == NULL && writerecoveryconf));
 
 	if (format == 'p')
 	{
@@ -1011,8 +1029,8 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
 		/*
 		 * In tar format, we just write the archive without extracting it.
 		 * Normally, we write it to the archive name provided by the caller,
-		 * but when the base directory is "-" that means we need to write
-		 * to standard output.
+		 * but when the base directory is "-" that means we need to write to
+		 * standard output.
 		 */
 		if (strcmp(basedir, "-") == 0)
 		{
@@ -1052,16 +1070,16 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
 	}
 
 	/*
-	 * If we're supposed to inject the backup manifest into the results,
-	 * it should be done here, so that the file content can be injected
-	 * directly, without worrying about the details of the tar format.
+	 * If we're supposed to inject the backup manifest into the results, it
+	 * should be done here, so that the file content can be injected directly,
+	 * without worrying about the details of the tar format.
 	 */
 	if (inject_manifest)
 		manifest_inject_streamer = streamer;
 
 	/*
-	 * If this is the main tablespace and we're supposed to write
-	 * recovery information, arrange to do that.
+	 * If this is the main tablespace and we're supposed to write recovery
+	 * information, arrange to do that.
 	 */
 	if (spclocation == NULL && writerecoveryconf)
 	{
@@ -1072,8 +1090,8 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
 	}
 
 	/*
-	 * If we're doing anything that involves understanding the contents of
-	 * the archive, we'll need to parse it.
+	 * If we're doing anything that involves understanding the contents of the
+	 * archive, we'll need to parse it.
 	 */
 	if (must_parse_archive)
 		streamer = bbstreamer_tar_parser_new(streamer);
@@ -1083,6 +1101,317 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
 	return streamer;
 }
 
+/*
+ * Receive all of the archives the server wants to send - and the backup
+ * manifest if present - as a single COPY stream.
+ */
+static void
+ReceiveArchiveStream(PGconn *conn)
+{
+	ArchiveStreamState state;
+
+	/* Set up initial state. */
+	memset(&state, 0, sizeof(state));
+	state.tablespacenum = -1;
+
+	/* All the real work happens in ReceiveArchiveStreamChunk. */
+	ReceiveCopyData(conn, ReceiveArchiveStreamChunk, &state);
+
+	/* If we wrote the backup manifest to a file, close the file. */
+	if (state.manifest_file !=NULL)
+	{
+		fclose(state.manifest_file);
+		state.manifest_file = NULL;
+	}
+
+	/*
+	 * If we buffered the backup manifest in order to inject it into the
+	 * output tarfile, do that now.
+	 */
+	if (state.manifest_inject_streamer != NULL &&
+		state.manifest_buffer != NULL)
+	{
+		bbstreamer_inject_file(state.manifest_inject_streamer,
+							   "backup_manifest",
+							   state.manifest_buffer->data,
+							   state.manifest_buffer->len);
+		destroyPQExpBuffer(state.manifest_buffer);
+		state.manifest_buffer = NULL;
+	}
+
+	/* If there's still an archive in progress, end processing. */
+	if (state.streamer != NULL)
+	{
+		bbstreamer_finalize(state.streamer);
+		bbstreamer_free(state.streamer);
+		state.streamer = NULL;
+	}
+}
+
+/*
+ * Receive one chunk of data sent by the server as part of a single COPY
+ * stream that includes all archives and the manifest.
+ */
+static void
+ReceiveArchiveStreamChunk(size_t r, char *copybuf, void *callback_data)
+{
+	ArchiveStreamState *state = callback_data;
+	size_t		cursor = 0;
+
+	/* Each CopyData message begins with a type byte. */
+	switch (GetCopyDataByte(r, copybuf, &cursor))
+	{
+		case 'n':
+			{
+				/* New archive. */
+				char	   *archive_name;
+				char	   *spclocation;
+
+				/*
+				 * We force a progress report at the end of each tablespace. A
+				 * new tablespace starts when the previous one ends, except in
+				 * the case of the very first one.
+				 */
+				if (++state->tablespacenum > 0)
+					progress_report(state->tablespacenum, true, false);
+
+				/* Sanity check. */
+				if (state->manifest_buffer != NULL ||
+					state->manifest_file !=NULL)
+				{
+					pg_log_error("archives should precede manifest");
+					exit(1);
+				}
+
+				/* Parse the rest of the CopyData message. */
+				archive_name = GetCopyDataString(r, copybuf, &cursor);
+				spclocation = GetCopyDataString(r, copybuf, &cursor);
+				GetCopyDataEnd(r, copybuf, cursor);
+
+				/*
+				 * Basic sanity checks on the archive name: it shouldn't be
+				 * empty, it shouldn't start with a dot, and it shouldn't
+				 * contain a path separator.
+				 */
+				if (archive_name[0] == '\0' || archive_name[0] == '.' ||
+					strchr(archive_name, '/') != NULL ||
+					strchr(archive_name, '\\') != NULL)
+				{
+					pg_log_error("invalid archive name: \"%s\"",
+								 archive_name);
+					exit(1);
+				}
+
+				/*
+				 * An empty spclocation is treated as NULL. We expect this
+				 * case to occur for the data directory itself, but not for
+				 * any archives that correspond to tablespaces.
+				 */
+				if (spclocation[0] == '\0')
+					spclocation = NULL;
+
+				/* End processing of any prior archive. */
+				if (state->streamer != NULL)
+				{
+					bbstreamer_finalize(state->streamer);
+					bbstreamer_free(state->streamer);
+					state->streamer = NULL;
+				}
+
+				/*
+				 * Create an appropriate backup streamer. We know that
+				 * recovery GUCs are supported, because this protocol can only
+				 * be used on v15+.
+				 */
+				state->streamer =
+					CreateBackupStreamer(archive_name,
+										 spclocation,
+										 &state->manifest_inject_streamer,
+										 true);
+				break;
+			}
+
+		case 'd':
+			{
+				/* Archive or manifest data. */
+				if (state->manifest_buffer != NULL)
+				{
+					/* Manifest data, buffer in memory. */
+					appendPQExpBuffer(state->manifest_buffer, copybuf + 1,
+									  r - 1);
+				}
+				else if (state->manifest_file !=NULL)
+				{
+					/* Manifest data, write to disk. */
+					if (fwrite(copybuf + 1, r - 1, 1,
+							   state->manifest_file) != 1)
+					{
+						/*
+						 * If fwrite() didn't set errno, assume that the
+						 * problem is that we're out of disk space.
+						 */
+						if (errno == 0)
+							errno = ENOSPC;
+						pg_log_error("could not write to file \"%s\": %m",
+									 state->manifest_filename);
+						exit(1);
+					}
+				}
+				else if (state->streamer != NULL)
+				{
+					/* Archive data. */
+					bbstreamer_content(state->streamer, NULL, copybuf + 1,
+									   r - 1, BBSTREAMER_UNKNOWN);
+				}
+				else
+				{
+					pg_log_error("unexpected payload data");
+					exit(1);
+				}
+				break;
+			}
+
+		case 'p':
+			{
+				/*
+				 * Progress report.
+				 *
+				 * The remainder of the message is expected to be an 8-byte
+				 * count of bytes completed.
+				 */
+				totaldone = GetCopyDataUInt64(r, copybuf, &cursor);
+				GetCopyDataEnd(r, copybuf, cursor);
+
+				/*
+				 * The server shouldn't send progres report messages too
+				 * often, so we force an update each time we receive one.
+				 */
+				progress_report(state->tablespacenum, true, false);
+				break;
+			}
+
+		case 'm':
+			{
+				/*
+				 * Manifest data will be sent next. This message is not
+				 * expected to have any further payload data.
+				 */
+				GetCopyDataEnd(r, copybuf, cursor);
+
+				/*
+				 * If we're supposed inject the manifest into the archive, we
+				 * prepare to buffer it in memory; otherwise, we prepare to
+				 * write it to a temporary file.
+				 */
+				if (state->manifest_inject_streamer != NULL)
+					state->manifest_buffer = createPQExpBuffer();
+				else
+				{
+					snprintf(state->manifest_filename,
+							 sizeof(state->manifest_filename),
+							 "%s/backup_manifest.tmp", basedir);
+					state->manifest_file =
+						fopen(state->manifest_filename, "wb");
+					if (state->manifest_file == NULL)
+					{
+						pg_log_error("could not create file \"%s\": %m",
+									 state->manifest_filename);
+						exit(1);
+					}
+				}
+				break;
+			}
+
+		default:
+			ReportCopyDataParseError(r, copybuf);
+			break;
+	}
+}
+
+/*
+ * Get a single byte from a CopyData message.
+ *
+ * Bail out if none remain.
+ */
+static char
+GetCopyDataByte(size_t r, char *copybuf, size_t *cursor)
+{
+	if (*cursor >= r)
+		ReportCopyDataParseError(r, copybuf);
+
+	return copybuf[(*cursor)++];
+}
+
+/*
+ * Get a NUL-terminated string from a CopyData message.
+ *
+ * Bail out if the terminating NUL cannot be found.
+ */
+static char *
+GetCopyDataString(size_t r, char *copybuf, size_t *cursor)
+{
+	size_t		startpos = *cursor;
+	size_t		endpos = startpos;
+
+	while (1)
+	{
+		if (endpos >= r)
+			ReportCopyDataParseError(r, copybuf);
+		if (copybuf[endpos] == '\0')
+			break;
+		++endpos;
+	}
+
+	*cursor = endpos + 1;
+	return &copybuf[startpos];
+}
+
+/*
+ * Get an unsigned 64-bit integer from a CopyData message.
+ *
+ * Bail out if there are not at least 8 bytes remaining.
+ */
+static uint64
+GetCopyDataUInt64(size_t r, char *copybuf, size_t *cursor)
+{
+	uint64		result;
+
+	if (*cursor + sizeof(uint64) > r)
+		ReportCopyDataParseError(r, copybuf);
+	memcpy(&result, &copybuf[*cursor], sizeof(uint64));
+	*cursor += sizeof(uint64);
+	return pg_ntoh64(result);
+}
+
+/*
+ * Bail out if we didn't parse the whole message.
+ */
+static void
+GetCopyDataEnd(size_t r, char *copybuf, size_t cursor)
+{
+	if (r != cursor)
+		ReportCopyDataParseError(r, copybuf);
+}
+
+/*
+ * Report failure to parse a CopyData message from the server. Then exit.
+ *
+ * As a debugging aid, we try to give some hint about what kind of message
+ * provoked the failure. Perhaps this is not detailed enough, but it's not
+ * clear that it's worth expending any more code on what shoud be a
+ * can't-happen case.
+ */
+static void
+ReportCopyDataParseError(size_t r, char *copybuf)
+{
+	if (r == 0)
+		pg_log_error("empty COPY message");
+	else
+		pg_log_error("malformed COPY message of type %d, length %zu",
+					 copybuf[0], r);
+	exit(1);
+}
+
 /*
  * Receive raw tar data from the server, and stream it to the appropriate
  * location. If we're writing a single tarfile to standard output, also
@@ -1336,28 +1665,32 @@ BaseBackup(void)
 	}
 	if (maxrate > 0)
 		AppendIntegerCommandOption(&buf, use_new_option_syntax, "MAX_RATE",
-									  maxrate);
+								   maxrate);
 	if (format == 't')
 		AppendPlainCommandOption(&buf, use_new_option_syntax, "TABLESPACE_MAP");
 	if (!verify_checksums)
 	{
 		if (use_new_option_syntax)
 			AppendIntegerCommandOption(&buf, use_new_option_syntax,
-										  "VERIFY_CHECKSUMS", 0);
+									   "VERIFY_CHECKSUMS", 0);
 		else
 			AppendPlainCommandOption(&buf, use_new_option_syntax,
-										"NOVERIFY_CHECKSUMS");
+									 "NOVERIFY_CHECKSUMS");
 	}
 
 	if (manifest)
 	{
 		AppendStringCommandOption(&buf, use_new_option_syntax, "MANIFEST",
-									 manifest_force_encode ? "force-encode" : "yes");
+								  manifest_force_encode ? "force-encode" : "yes");
 		if (manifest_checksums != NULL)
 			AppendStringCommandOption(&buf, use_new_option_syntax,
-										 "MANIFEST_CHECKSUMS", manifest_checksums);
+									  "MANIFEST_CHECKSUMS", manifest_checksums);
 	}
 
+	if (serverMajor >= 1500)
+		AppendStringCommandOption(&buf, use_new_option_syntax,
+								  "TARGET", "client");
+
 	if (verbose)
 		pg_log_info("initiating base backup, waiting for checkpoint to complete");
 
@@ -1480,46 +1813,56 @@ BaseBackup(void)
 		StartLogStreamer(xlogstart, starttli, sysidentifier);
 	}
 
-	/* Receive a tar file for each tablespace in turn */
-	for (i = 0; i < PQntuples(res); i++)
+	if (serverMajor >= 1500)
 	{
-		char   archive_name[MAXPGPATH];
-		char   *spclocation;
-
-		/*
-		 * If we write the data out to a tar file, it will be named base.tar
-		 * if it's the main data directory or <tablespaceoid>.tar if it's for
-		 * another tablespace. CreateBackupStreamer() will arrange to add .gz
-		 * to the archive name if pg_basebackup is performing compression.
-		 */
-		if (PQgetisnull(res, i, 0))
-		{
-			strlcpy(archive_name, "base.tar", sizeof(archive_name));
-			spclocation = NULL;
-		}
-		else
+		/* Receive a single tar stream with everything. */
+		ReceiveArchiveStream(conn);
+	}
+	else
+	{
+		/* Receive a tar file for each tablespace in turn */
+		for (i = 0; i < PQntuples(res); i++)
 		{
-			snprintf(archive_name, sizeof(archive_name),
-					 "%s.tar", PQgetvalue(res, i, 0));
-			spclocation = PQgetvalue(res, i, 1);
+			char		archive_name[MAXPGPATH];
+			char	   *spclocation;
+
+			/*
+			 * If we write the data out to a tar file, it will be named
+			 * base.tar if it's the main data directory or <tablespaceoid>.tar
+			 * if it's for another tablespace. CreateBackupStreamer() will
+			 * arrange to add .gz to the archive name if pg_basebackup is
+			 * performing compression.
+			 */
+			if (PQgetisnull(res, i, 0))
+			{
+				strlcpy(archive_name, "base.tar", sizeof(archive_name));
+				spclocation = NULL;
+			}
+			else
+			{
+				snprintf(archive_name, sizeof(archive_name),
+						 "%s.tar", PQgetvalue(res, i, 0));
+				spclocation = PQgetvalue(res, i, 1);
+			}
+
+			ReceiveTarFile(conn, archive_name, spclocation, i);
 		}
 
-		ReceiveTarFile(conn, archive_name, spclocation, i);
+		/*
+		 * Now receive backup manifest, if appropriate.
+		 *
+		 * If we're writing a tarfile to stdout, ReceiveTarFile will have
+		 * already processed the backup manifest and included it in the output
+		 * tarfile.  Such a configuration doesn't allow for writing multiple
+		 * files.
+		 *
+		 * If we're talking to an older server, it won't send a backup
+		 * manifest, so don't try to receive one.
+		 */
+		if (!writing_to_stdout && manifest)
+			ReceiveBackupManifest(conn);
 	}
 
-	/*
-	 * Now receive backup manifest, if appropriate.
-	 *
-	 * If we're writing a tarfile to stdout, ReceiveTarFile will have already
-	 * processed the backup manifest and included it in the output tarfile.
-	 * Such a configuration doesn't allow for writing multiple files.
-	 *
-	 * If we're talking to an older server, it won't send a backup manifest,
-	 * so don't try to receive one.
-	 */
-	if (!writing_to_stdout && manifest)
-		ReceiveBackupManifest(conn);
-
 	if (showprogress)
 	{
 		progress_filename = NULL;
diff --git a/src/include/replication/basebackup_sink.h b/src/include/replication/basebackup_sink.h
index e6c073c567..36b9b76c5f 100644
--- a/src/include/replication/basebackup_sink.h
+++ b/src/include/replication/basebackup_sink.h
@@ -282,6 +282,7 @@ extern void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr,
 extern void bbsink_forward_cleanup(bbsink *sink);
 
 /* Constructors for various types of sinks. */
+extern bbsink *bbsink_copystream_new(void);
 extern bbsink *bbsink_copytblspc_new(void);
 extern bbsink *bbsink_progress_new(bbsink *next, bool estimate_backup_size);
 extern bbsink *bbsink_throttle_new(bbsink *next, uint32 maxrate);
-- 
2.24.3 (Apple Git-128)

