From 71cf350965e596aaf0bf7bb74395a35eef3c0f9d Mon Sep 17 00:00:00 2001
From: Daniel Gustafsson <daniel@yesql.se>
Date: Mon, 16 Dec 2019 15:06:25 +0100
Subject: [PATCH] Online enable and disable of data checksums

This allows data checksums to be enabled, or disabled, in a running
cluster without requiring a restart. The data checksum mode will be
set to "inprogress" while the checksumhelper background worker walks
the database and rewrites all the data pages to include checksums.
Once all pages have been checksummed, the data checksum mode turns
to "on". The synchronization of the checksumhelper is based on the
procbarrier infrastructure.

This adds a new section in the documention for checksums, covering
the basics on how to enable and disable with the various means that
are possible. More information can probably be added covering other
aspects of data checksums.

In passing, this patch removes the SAMPLE procbarrier and replaces
it with the CHECKSUM procbarrier, as there is now a consumer of the
procbarrier API and the "dummy" barrier can be removed (it was needed
to keep the enum with something).
---
 doc/src/sgml/func.sgml                      |  65 ++
 doc/src/sgml/monitoring.sgml                |   4 +
 doc/src/sgml/ref/initdb.sgml                |   1 +
 doc/src/sgml/wal.sgml                       |  99 ++
 src/backend/access/rmgrdesc/xlogdesc.c      |  16 +
 src/backend/access/transam/xlog.c           | 131 ++-
 src/backend/access/transam/xlogfuncs.c      |  57 ++
 src/backend/catalog/system_views.sql        |   5 +
 src/backend/postmaster/Makefile             |   1 +
 src/backend/postmaster/bgworker.c           |   7 +
 src/backend/postmaster/checksumhelper.c     | 950 ++++++++++++++++++++
 src/backend/postmaster/pgstat.c             |   9 +
 src/backend/replication/basebackup.c        |   2 +-
 src/backend/replication/logical/decode.c    |   1 +
 src/backend/storage/ipc/ipci.c              |   2 +
 src/backend/storage/ipc/procsignal.c        |  24 +-
 src/backend/storage/lmgr/lwlocknames.txt    |   1 +
 src/backend/storage/page/README             |   3 +-
 src/backend/storage/page/bufpage.c          |   6 +-
 src/backend/utils/adt/pgstatfuncs.c         |   4 +-
 src/backend/utils/misc/guc.c                |  36 +-
 src/bin/pg_upgrade/controldata.c            |   9 +
 src/bin/pg_upgrade/pg_upgrade.h             |   2 +-
 src/include/access/xlog.h                   |   9 +-
 src/include/access/xlog_internal.h          |   7 +
 src/include/catalog/pg_control.h            |   1 +
 src/include/catalog/pg_proc.dat             |  16 +
 src/include/pgstat.h                        |   5 +-
 src/include/postmaster/checksumhelper.h     |  31 +
 src/include/storage/bufpage.h               |   1 +
 src/include/storage/checksum.h              |   7 +
 src/include/storage/procsignal.h            |   7 +-
 src/test/Makefile                           |   3 +-
 src/test/checksum/.gitignore                |   2 +
 src/test/checksum/Makefile                  |  24 +
 src/test/checksum/README                    |  22 +
 src/test/checksum/t/001_standby_checksum.pl | 104 +++
 37 files changed, 1622 insertions(+), 52 deletions(-)
 create mode 100644 src/backend/postmaster/checksumhelper.c
 create mode 100644 src/include/postmaster/checksumhelper.h
 create mode 100644 src/test/checksum/.gitignore
 create mode 100644 src/test/checksum/Makefile
 create mode 100644 src/test/checksum/README
 create mode 100644 src/test/checksum/t/001_standby_checksum.pl

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 72072e7545..823a5fdf41 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -21316,6 +21316,71 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
 
   </sect2>
 
+  <sect2 id="functions-admin-checksum">
+   <title>Data Checksum Functions</title>
+
+   <para>
+    The functions shown in <xref linkend="functions-checksums-table" /> can
+    be used to enable or disable data checksums in a running cluster.
+    See <xref linkend="checksums" /> for details.
+   </para>
+
+   <table id="functions-checksums-table">
+    <title>Checksum <acronym>SQL</acronym> Functions</title>
+    <tgroup cols="3">
+     <thead>
+      <row>
+       <entry>Function</entry>
+       <entry>Return Type</entry>
+       <entry>Description</entry>
+      </row>
+     </thead>
+     <tbody>
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_enable_data_checksums</primary>
+        </indexterm>
+        <literal><function>pg_enable_data_checksums(<optional><parameter>cost_delay</parameter> <type>int</type>, <parameter>cost_limit</parameter> <type>int</type></optional>)</function></literal>
+       </entry>
+       <entry>
+        void
+       </entry>
+       <entry>
+        <para>
+         Initiates data checksums for the cluster. This will switch the data checksums mode
+         to <literal>inprogress</literal> as well as start a background worker that will process
+         all data in the database and enable checksums for it. When all data pages have had
+         checksums enabled, the cluster will automatically switch data checksums mode to
+         <literal>on</literal>.
+        </para>
+        <para>
+         If <parameter>cost_delay</parameter> and <parameter>cost_limit</parameter> are
+         specified, the speed of the process is throttled using the same principles as
+         <link linkend="runtime-config-resource-vacuum-cost">Cost-based Vacuum Delay</link>.
+        </para>
+       </entry>
+      </row>
+      <row>
+       <entry>
+        <indexterm>
+         <primary>pg_disable_data_checksums</primary>
+        </indexterm>
+        <literal><function>pg_disable_data_checksums()</function></literal>
+       </entry>
+       <entry>
+        void
+       </entry>
+       <entry>
+        Disables data checksums for the cluster.
+       </entry>
+      </row>
+     </tbody>
+    </tgroup>
+   </table>
+
+  </sect2>
+
   <sect2 id="functions-admin-dbobject">
    <title>Database Object Management Functions</title>
 
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 0bfd6151c4..628580dfd1 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1339,6 +1339,10 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
          <entry><literal>CheckpointStart</literal></entry>
          <entry>Waiting for a checkpoint to start.</entry>
         </row>
+        <row>
+         <entry><literal>ChecksumEnableStartcondition</literal></entry>
+         <entry>Waiting for transactions to finish before starting to enable checksums.</entry>
+        </row>
         <row>
          <entry><literal>ClogGroupUpdate</literal></entry>
          <entry>Waiting for group leader to update transaction status at transaction end.</entry>
diff --git a/doc/src/sgml/ref/initdb.sgml b/doc/src/sgml/ref/initdb.sgml
index da5c8f5307..69b7f91cbc 100644
--- a/doc/src/sgml/ref/initdb.sgml
+++ b/doc/src/sgml/ref/initdb.sgml
@@ -220,6 +220,7 @@ PostgreSQL documentation
         are calculated for all objects, in all databases. All checksum
         failures will be reported in the
         <xref linkend="pg-stat-database-view"/> view.
+        See <xref linkend="checksums" /> for details.
        </para>
       </listitem>
      </varlistentry>
diff --git a/doc/src/sgml/wal.sgml b/doc/src/sgml/wal.sgml
index 4eb8feb903..2d8693def1 100644
--- a/doc/src/sgml/wal.sgml
+++ b/doc/src/sgml/wal.sgml
@@ -230,6 +230,105 @@
   </para>
  </sect1>
 
+ <sect1 id="checksums">
+  <title>Data Checksums</title>
+  <indexterm>
+   <primary>checksums</primary>
+  </indexterm>
+
+  <para>
+   Data pages are not checksum protected by default, but this can optionally be
+   enabled for a cluster.  When enabled, each data page will be assigned a
+   checksum that is updated when the page is written and verified every time
+   the page is read. Only data pages are protected by checksums, internal data
+   structures and temporary files are not.
+  </para>
+
+  <para>
+   Checksums are normally enabled when the cluster is initialized using <link
+   linkend="app-initdb-data-checksums"><application>initdb</application></link>.
+   They can also be enabled or disabled at a later timne, either as an offline
+   operation or in a running cluster. In all cases, checksums are enabled or
+   disabled at the full cluster level, and cannot be specified individually for
+   databases or tables.
+  </para>
+
+  <para>
+   The current state of checksums in the cluster can be verified by viewing the
+   value of the read-only configuration variable <xref
+   linkend="guc-data-checksums" /> by issuing the command <command>SHOW
+   data_checksums</command>.
+  </para>
+
+  <para>
+   When attempting to recover from corrupt data it may be necessary to bypass
+   the checksum protection in order to recover data. To do this, temporarily
+   set the configuration parameter <xref linkend="guc-ignore-checksum-failure" />.
+  </para>
+
+  <sect2 id="checksums-enable-disable">
+   <title>On-line Enabling of Checksums</title>
+
+   <para>
+    Checksums can be enabled or disabled online, by calling the appropriate
+    <link linkend="functions-admin-checksum">functions</link>.
+    Disabling of checksums takes effect immediately when the function is called.
+   </para>
+
+   <para>
+    Enabling checksums will put the cluster checksum mode in
+    <literal>inprogress</literal> mode.  During this time, checksums will be
+    written but not verified. In addition to this, a background worker process
+    is started that enables checksums on all existing data in the cluster. Once
+    this worker has completed processing all databases in the cluster, the
+    checksum mode will automatically switch to <literal>on</literal>. The
+    processing will consume a background worker process, make sure that
+    <varname>max_worker_processes</varname> allows for at least one more
+    additional process.
+   </para>
+
+   <para>
+    The process will initially wait for all open transactions to finish before
+    it starts, so that it can be certain that there are no tables that have been
+    created inside a transaction that has not committed yet and thus would not
+    be visible to the process enabling checksums. It will also, for each database,
+    wait for all pre-existing temporary tables to get removed before it finishes.
+    If long-lived temporary tables are used in the application it may be necessary
+    to terminate these application connections to allow the process to complete.
+    Information about open transactions and connections with temporary tables is
+    written to log.
+   </para>
+
+   <para>
+    If the cluster is stopped while in <literal>inprogress</literal> mode, for
+    any reason, then this process must be restarted manually. To do this,
+    re-execute the function <function>pg_enable_data_checksums()</function>
+    once the cluster has been restarted. It is not possible to resume the work,
+    the process has to start over from the beginning.
+   </para>
+
+   <note>
+    <para>
+     Enabling checksums can cause significant I/O to the system, as most of the
+     database pages will need to be rewritten, and will be written both to the
+     data files and the WAL.
+    </para>
+   </note>
+
+  </sect2>
+
+  <sect2 id="checksums-enable-disable">
+   <title>Off-line Enabling of Checksums</title>
+
+   <para>
+    The <link linkend="pg_checksums"><application>pg_checksums</application></link>
+    application can be used to enable or disable data checksums, as well as 
+    verify checksums, on an offline cluster.
+   </para>
+
+  </sect2>
+ </sect1>
+
   <sect1 id="wal-intro">
    <title>Write-Ahead Logging (<acronym>WAL</acronym>)</title>
 
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index 1cd97852e8..779567736d 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -18,6 +18,7 @@
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "catalog/pg_control.h"
+#include "storage/bufpage.h"
 #include "utils/guc.h"
 #include "utils/timestamp.h"
 
@@ -140,6 +141,18 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
 						 xlrec.ThisTimeLineID, xlrec.PrevTimeLineID,
 						 timestamptz_to_str(xlrec.end_time));
 	}
+	else if (info == XLOG_CHECKSUMS)
+	{
+		xl_checksum_state xlrec;
+
+		memcpy(&xlrec, rec, sizeof(xl_checksum_state));
+		if (xlrec.new_checksumtype == PG_DATA_CHECKSUM_VERSION)
+			appendStringInfo(buf, "on");
+		else if (xlrec.new_checksumtype == PG_DATA_CHECKSUM_INPROGRESS_VERSION)
+			appendStringInfo(buf, "inprogress");
+		else
+			appendStringInfo(buf, "off");
+	}
 }
 
 const char *
@@ -185,6 +198,9 @@ xlog_identify(uint8 info)
 		case XLOG_FPI_FOR_HINT:
 			id = "FPI_FOR_HINT";
 			break;
+		case XLOG_CHECKSUMS:
+			id = "CHECKSUMS";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 7f4f784c0e..7ad1f02321 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -866,6 +866,7 @@ static void SetLatestXTime(TimestampTz xtime);
 static void SetCurrentChunkStartTime(TimestampTz xtime);
 static void CheckRequiredParameterValues(void);
 static void XLogReportParameters(void);
+static void XlogChecksums(ChecksumType new_type);
 static void checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI,
 								TimeLineID prevTLI);
 static void LocalSetXLogInsertAllowed(void);
@@ -940,6 +941,8 @@ static void WALInsertLockAcquireExclusive(void);
 static void WALInsertLockRelease(void);
 static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
 
+static bool DataChecksumsInProgress(void);
+
 /*
  * Insert an XLOG record represented by an already-constructed chain of data
  * chunks.  This is a low-level routine; to construct the WAL record header
@@ -1048,7 +1051,7 @@ XLogInsertRecord(XLogRecData *rdata,
 		Assert(RedoRecPtr < Insert->RedoRecPtr);
 		RedoRecPtr = Insert->RedoRecPtr;
 	}
-	doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites);
+	doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites || DataChecksumsInProgress());
 
 	if (doPageWrites &&
 		(!prevDoPageWrites ||
@@ -4775,10 +4778,6 @@ ReadControlFile(void)
 		(SizeOfXLogLongPHD - SizeOfXLogShortPHD);
 
 	CalculateCheckpointSegments();
-
-	/* Make the initdb settings visible as GUC variables, too */
-	SetConfigOption("data_checksums", DataChecksumsEnabled() ? "yes" : "no",
-					PGC_INTERNAL, PGC_S_OVERRIDE);
 }
 
 /*
@@ -4815,12 +4814,92 @@ GetMockAuthenticationNonce(void)
  * Are checksums enabled for data pages?
  */
 bool
-DataChecksumsEnabled(void)
+DataChecksumsNeedWrite(void)
 {
 	Assert(ControlFile != NULL);
 	return (ControlFile->data_checksum_version > 0);
 }
 
+bool
+DataChecksumsNeedVerify(void)
+{
+	Assert(ControlFile != NULL);
+
+	/*
+	 * Only verify checksums if they are fully enabled in the cluster. In
+	 * inprogress state they are only updated, not verified.
+	 */
+	return (ControlFile->data_checksum_version == PG_DATA_CHECKSUM_VERSION);
+}
+
+static inline bool
+DataChecksumsInProgress(void)
+{
+	return (ControlFile->data_checksum_version == PG_DATA_CHECKSUM_INPROGRESS_VERSION);
+}
+
+void
+SetDataChecksumsInProgress(void)
+{
+	Assert(ControlFile != NULL);
+	if (ControlFile->data_checksum_version > 0)
+		return;
+
+	XlogChecksums(PG_DATA_CHECKSUM_INPROGRESS_VERSION);
+
+	LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+	ControlFile->data_checksum_version = PG_DATA_CHECKSUM_INPROGRESS_VERSION;
+	UpdateControlFile();
+	LWLockRelease(ControlFileLock);
+	WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_CHECKSUM));
+}
+
+void
+SetDataChecksumsOn(void)
+{
+	Assert(ControlFile != NULL);
+
+	LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+
+	if (ControlFile->data_checksum_version != PG_DATA_CHECKSUM_INPROGRESS_VERSION)
+	{
+		LWLockRelease(ControlFileLock);
+		elog(ERROR, "checksums not in inprogress mode");
+	}
+
+	ControlFile->data_checksum_version = PG_DATA_CHECKSUM_VERSION;
+	UpdateControlFile();
+	LWLockRelease(ControlFileLock);
+	WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_CHECKSUM));
+
+	XlogChecksums(PG_DATA_CHECKSUM_VERSION);
+}
+
+void
+SetDataChecksumsOff(void)
+{
+	LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+
+	ControlFile->data_checksum_version = 0;
+	UpdateControlFile();
+	LWLockRelease(ControlFileLock);
+	WaitForProcSignalBarrier(EmitProcSignalBarrier(PROCSIGNAL_BARRIER_CHECKSUM));
+
+	XlogChecksums(0);
+}
+
+/* guc hook */
+const char *
+show_data_checksums(void)
+{
+	if (ControlFile->data_checksum_version == PG_DATA_CHECKSUM_VERSION)
+		return "on";
+	else if (ControlFile->data_checksum_version == PG_DATA_CHECKSUM_INPROGRESS_VERSION)
+		return "inprogress";
+	else
+		return "off";
+}
+
 /*
  * Returns a fake LSN for unlogged relations.
  *
@@ -7777,6 +7856,17 @@ StartupXLOG(void)
 	 */
 	CompleteCommitTsInitialization();
 
+	/*
+	 * If we reach this point with checksums in inprogress state, we notify
+	 * the user that they need to manually restart the process to enable
+	 * checksums. This is because we cannot launch a dynamic background worker
+	 * directly from here, it has to be launched from a regular backend.
+	 */
+	if (ControlFile->data_checksum_version == PG_DATA_CHECKSUM_INPROGRESS_VERSION)
+		ereport(WARNING,
+				(errmsg("checksum state is \"inprogress\" with no worker"),
+				 errhint("Either disable or enable checksums by calling the pg_disable_data_checksums() or pg_enable_data_checksums() functions.")));
+
 	/*
 	 * All done with end-of-recovery actions.
 	 *
@@ -9500,6 +9590,24 @@ XLogReportParameters(void)
 	}
 }
 
+/*
+ * Log the new state of checksums
+ */
+static void
+XlogChecksums(ChecksumType new_type)
+{
+	xl_checksum_state xlrec;
+	XLogRecPtr	recptr;
+
+	xlrec.new_checksumtype = new_type;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, sizeof(xl_checksum_state));
+
+	recptr = XLogInsert(RM_XLOG_ID, XLOG_CHECKSUMS);
+	XLogFlush(recptr);
+}
+
 /*
  * Update full_page_writes in shared memory, and write an
  * XLOG_FPW_CHANGE record if necessary.
@@ -9951,6 +10059,17 @@ xlog_redo(XLogReaderState *record)
 		/* Keep track of full_page_writes */
 		lastFullPageWrites = fpw;
 	}
+	else if (info == XLOG_CHECKSUMS)
+	{
+		xl_checksum_state state;
+
+		memcpy(&state, XLogRecGetData(record), sizeof(xl_checksum_state));
+
+		LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+		ControlFile->data_checksum_version = state.new_checksumtype;
+		UpdateControlFile();
+		LWLockRelease(ControlFileLock);
+	}
 }
 
 #ifdef WAL_DEBUG
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index d4e21f90a8..89ef077d59 100644
--- a/src/backend/access/transam/xlogfuncs.c
+++ b/src/backend/access/transam/xlogfuncs.c
@@ -25,6 +25,7 @@
 #include "catalog/pg_type.h"
 #include "funcapi.h"
 #include "miscadmin.h"
+#include "postmaster/checksumhelper.h"
 #include "pgstat.h"
 #include "replication/walreceiver.h"
 #include "storage/fd.h"
@@ -770,3 +771,59 @@ pg_promote(PG_FUNCTION_ARGS)
 			(errmsg("server did not promote within %d seconds", wait_seconds)));
 	PG_RETURN_BOOL(false);
 }
+
+/*
+ * Disables checksums for the cluster, unless already disabled.
+ *
+ * Has immediate effect - the checksums are set to off right away.
+ */
+Datum
+disable_data_checksums(PG_FUNCTION_ARGS)
+{
+	/*
+	 * If we don't need to write new checksums, then clearly they are already
+	 * disabled.
+	 */
+	if (!DataChecksumsNeedWrite())
+		ereport(ERROR,
+				(errmsg("data checksums already disabled")));
+
+	ShutdownChecksumHelperIfRunning();
+
+	SetDataChecksumsOff();
+
+	PG_RETURN_VOID();
+}
+
+/*
+ * Enables checksums for the cluster, unless already enabled.
+ *
+ * Supports vacuum-like cost-based throttling, to limit system load.
+ * Starts a background worker that updates checksums on existing data.
+ */
+Datum
+enable_data_checksums(PG_FUNCTION_ARGS)
+{
+	int			cost_delay = PG_GETARG_INT32(0);
+	int			cost_limit = PG_GETARG_INT32(1);
+
+	if (cost_delay < 0)
+		ereport(ERROR,
+				(errmsg("cost delay cannot be less than zero")));
+	if (cost_limit <= 0)
+		ereport(ERROR,
+				(errmsg("cost limit must be a positive value")));
+
+	/*
+	 * Allow state change from "off" or from "inprogress", since this is how
+	 * we restart the worker if necessary.
+	 */
+	if (DataChecksumsNeedVerify())
+		ereport(ERROR,
+				(errmsg("data checksums already enabled")));
+
+	SetDataChecksumsInProgress();
+	StartChecksumHelperLauncher(cost_delay, cost_limit);
+
+	PG_RETURN_VOID();
+}
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index c9e75f4370..34be3b1246 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1187,6 +1187,11 @@ CREATE OR REPLACE FUNCTION
   RETURNS boolean STRICT VOLATILE LANGUAGE INTERNAL AS 'pg_promote'
   PARALLEL SAFE;
 
+CREATE OR REPLACE FUNCTION pg_enable_data_checksums (
+        cost_delay int DEFAULT 0, cost_limit int DEFAULT 100)
+  RETURNS void STRICT VOLATILE LANGUAGE internal AS 'enable_data_checksums'
+  PARALLEL RESTRICTED;
+
 -- legacy definition for compatibility with 9.3
 CREATE OR REPLACE FUNCTION
   json_populate_record(base anyelement, from_json json, use_json_as_text boolean DEFAULT false)
diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile
index bfdf6a833d..73df17e5f3 100644
--- a/src/backend/postmaster/Makefile
+++ b/src/backend/postmaster/Makefile
@@ -17,6 +17,7 @@ OBJS = \
 	bgworker.o \
 	bgwriter.o \
 	checkpointer.o \
+	checksumhelper.o \
 	fork_process.o \
 	interrupt.o \
 	pgarch.o \
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 75fc0d5d33..fb1bd0dace 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -18,6 +18,7 @@
 #include "pgstat.h"
 #include "port/atomics.h"
 #include "postmaster/bgworker_internals.h"
+#include "postmaster/checksumhelper.h"
 #include "postmaster/interrupt.h"
 #include "postmaster/postmaster.h"
 #include "replication/logicallauncher.h"
@@ -128,6 +129,12 @@ static const struct
 	},
 	{
 		"ApplyWorkerMain", ApplyWorkerMain
+	},
+	{
+		"ChecksumHelperLauncherMain", ChecksumHelperLauncherMain
+	},
+	{
+		"ChecksumHelperWorkerMain", ChecksumHelperWorkerMain
 	}
 };
 
diff --git a/src/backend/postmaster/checksumhelper.c b/src/backend/postmaster/checksumhelper.c
new file mode 100644
index 0000000000..fba7f4cfdb
--- /dev/null
+++ b/src/backend/postmaster/checksumhelper.c
@@ -0,0 +1,950 @@
+/*-------------------------------------------------------------------------
+ *
+ * checksumhelper.c
+ *	  Background worker to walk the database and write checksums to pages
+ *
+ * When enabling data checksums on a database at initdb time or with
+ * pg_checksums, no extra process is required as each page is checksummed, and
+ * verified, at accesses.  When enabling checksums on an already running
+ * cluster, which was not initialized with checksums, this helper worker will
+ * ensure that all pages are checksummed before verification of the checksums
+ * is turned on.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/postmaster/checksumhelper.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/xact.h"
+#include "catalog/pg_class.h"
+#include "catalog/pg_database.h"
+#include "commands/vacuum.h"
+#include "common/relpath.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/bgwriter.h"
+#include "postmaster/checksumhelper.h"
+#include "storage/bufmgr.h"
+#include "storage/checksum.h"
+#include "storage/lmgr.h"
+#include "storage/ipc.h"
+#include "storage/procarray.h"
+#include "storage/smgr.h"
+#include "tcop/tcopprot.h"
+#include "utils/lsyscache.h"
+#include "utils/ps_status.h"
+
+#define CHECKSUMHELPER_MAX_DB_RETRIES 5
+
+typedef enum
+{
+	CHECKSUMHELPER_SUCCESSFUL = 0,
+	CHECKSUMHELPER_ABORTED,
+	CHECKSUMHELPER_FAILED,
+	CHECKSUMHELPER_RETRYDB,
+}			ChecksumHelperResult;
+
+typedef struct ChecksumHelperShmemStruct
+{
+	/*
+	 * Access to launcher_started and abort must be protected by
+	 * ChecksumHelperLock.
+	 */
+	bool		launcher_started;
+	bool		abort;
+
+	/*
+	 * Access to other members can be done without a lock, as while they are
+	 * in shared memory, they are never concurrently accessed. When a worker
+	 * is running, the launcher is only waiting for that worker to finish.
+	 */
+	ChecksumHelperResult success;
+	bool		process_shared_catalogs;
+	/* Parameter values set on start */
+	int			cost_delay;
+	int			cost_limit;
+}			ChecksumHelperShmemStruct;
+
+/* Shared memory segment for checksumhelper */
+static ChecksumHelperShmemStruct * ChecksumHelperShmem;
+
+/* Bookkeeping for work to do */
+typedef struct ChecksumHelperDatabase
+{
+	Oid			dboid;
+	char	   *dbname;
+}			ChecksumHelperDatabase;
+
+typedef struct ChecksumHelperRelation
+{
+	Oid			reloid;
+	char		relkind;
+}			ChecksumHelperRelation;
+
+typedef struct ChecksumHelperResultEntry
+{
+	Oid						dboid;
+	ChecksumHelperResult	result;
+	int						retries;
+}			ChecksumHelperResultEntry;
+
+
+/* Prototypes */
+static List *BuildDatabaseList(void);
+static List *BuildRelationList(bool include_shared);
+static List *BuildTempTableList(void);
+static ChecksumHelperResult ProcessDatabase(ChecksumHelperDatabase * db);
+static void launcher_cancel_handler(SIGNAL_ARGS);
+
+/*
+ * Main entry point for checksumhelper launcher process.
+ */
+void
+StartChecksumHelperLauncher(int cost_delay, int cost_limit)
+{
+	BackgroundWorker bgw;
+	BackgroundWorkerHandle *bgw_handle;
+
+	LWLockAcquire(ChecksumHelperLock, LW_EXCLUSIVE);
+	if (ChecksumHelperShmem->abort)
+	{
+		LWLockRelease(ChecksumHelperLock);
+		ereport(ERROR,
+				(errmsg("checksum enabling has been aborted")));
+	}
+
+	if (ChecksumHelperShmem->launcher_started)
+	{
+		/* Failed to set means somebody else started */
+		LWLockRelease(ChecksumHelperLock);
+		ereport(NOTICE,
+				(errmsg("checksums are already being enabled")));
+		return;
+	}
+
+	ChecksumHelperShmem->cost_delay = cost_delay;
+	ChecksumHelperShmem->cost_limit = cost_limit;
+
+	memset(&bgw, 0, sizeof(bgw));
+	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
+	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ChecksumHelperLauncherMain");
+	snprintf(bgw.bgw_name, BGW_MAXLEN, "checksumhelper launcher");
+	snprintf(bgw.bgw_type, BGW_MAXLEN, "checksumhelper launcher");
+	bgw.bgw_restart_time = BGW_NEVER_RESTART;
+	bgw.bgw_notify_pid = MyProcPid;
+	bgw.bgw_main_arg = (Datum) 0;
+
+	ChecksumHelperShmem->launcher_started = true;
+	LWLockRelease(ChecksumHelperLock);
+
+	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
+	{
+		LWLockAcquire(ChecksumHelperLock, LW_EXCLUSIVE);
+		ChecksumHelperShmem->launcher_started = false;
+		LWLockRelease(ChecksumHelperLock);
+		ereport(ERROR,
+				(errmsg("failed to start background worker to enable checksums")));
+	}
+}
+
+/*
+ * ShutdownChecksumHelperIfRunning
+ *		Request shutdown of the checksumhelper
+ *
+ * This does not turn off processing immediately, it signals the checksum
+ * process to end when done with the current block.
+ */
+void
+ShutdownChecksumHelperIfRunning(void)
+{
+	/* If the launcher isn't started, there is nothing to shut down */
+	LWLockAcquire(ChecksumHelperLock, LW_EXCLUSIVE);
+	if (ChecksumHelperShmem->launcher_started)
+		ChecksumHelperShmem->abort = true;
+	LWLockRelease(ChecksumHelperLock);
+}
+
+/*
+ * ProcessSingleRelationFork
+ *		Enable checksums in a single relation/fork.
+ *
+ * Returns true if successful, and false if *aborted*. On error, an actual
+ * error is raised in the lower levels.
+ */
+static bool
+ProcessSingleRelationFork(Relation reln, ForkNumber forkNum, BufferAccessStrategy strategy)
+{
+	BlockNumber numblocks = RelationGetNumberOfBlocksInFork(reln, forkNum);
+	BlockNumber b;
+	char		activity[NAMEDATALEN * 2 + 128];
+
+	for (b = 0; b < numblocks; b++)
+	{
+		Buffer		buf = ReadBufferExtended(reln, forkNum, b, RBM_NORMAL, strategy);
+
+		/*
+		 * Report to pgstat every 100 blocks (so as not to "spam")
+		 */
+		if ((b % 100) == 0)
+		{
+			snprintf(activity, sizeof(activity) - 1, "processing: %s.%s (%s block %d/%d)",
+					 get_namespace_name(RelationGetNamespace(reln)), RelationGetRelationName(reln),
+					 forkNames[forkNum], b, numblocks);
+			pgstat_report_activity(STATE_RUNNING, activity);
+		}
+
+		/* Need to get an exclusive lock before we can flag as dirty */
+		LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+
+		/*
+			processed_databases++;
+		 * Mark the buffer as dirty and force a full page write.  We have to
+		 * re-write the page to WAL even if the checksum hasn't changed,
+		 * because if there is a replica it might have a slightly different
+		 * version of the page with an invalid checksum, caused by unlogged
+		 * changes (e.g. hintbits) on the master happening while checksums
+		 * were off. This can happen if there was a valid checksum on the page
+		 * at one point in the past, so only when checksums are first on, then
+		 * off, and then turned on again.
+		 */
+		START_CRIT_SECTION();
+		MarkBufferDirty(buf);
+		log_newpage_buffer(buf, false);
+		END_CRIT_SECTION();
+
+		UnlockReleaseBuffer(buf);
+
+		/*
+		 * This is the only place where we check if we are asked to abort, the
+		 * abortion will bubble up from here. It's safe to check this without
+		 * a lock, because if we miss it being set, we will try again soon.
+		 */
+		if (ChecksumHelperShmem->abort)
+			return false;
+
+		vacuum_delay_point();
+	}
+
+	return true;
+}
+
+/*
+ * ProcessSingleRelationByOid
+ *		Process a single relation based on oid.
+ *
+ * Returns true if successful, and false if *aborted*. On error, an actual error
+ * is raised in the lower levels.
+ */
+static bool
+ProcessSingleRelationByOid(Oid relationId, BufferAccessStrategy strategy)
+{
+	Relation	rel;
+	ForkNumber	fnum;
+	bool		aborted = false;
+
+	StartTransactionCommand();
+
+	elog(DEBUG2, "background worker \"checksumhelper\" starting to process relation %u", relationId);
+	rel = try_relation_open(relationId, AccessShareLock);
+	if (rel == NULL)
+	{
+		/*
+		 * Relation no longer exist. We consider this a success, since there
+		 * are no pages in it that need checksums, and thus return true.
+		 */
+		elog(DEBUG1, "background worker \"checksumhelper\" skipping relation %u as it no longer exists", relationId);
+		CommitTransactionCommand();
+		pgstat_report_activity(STATE_IDLE, NULL);
+		return true;
+	}
+	RelationOpenSmgr(rel);
+
+	for (fnum = 0; fnum <= MAX_FORKNUM; fnum++)
+	{
+		if (smgrexists(rel->rd_smgr, fnum))
+		{
+			if (!ProcessSingleRelationFork(rel, fnum, strategy))
+			{
+				aborted = true;
+				break;
+			}
+		}
+	}
+	relation_close(rel, AccessShareLock);
+	elog(DEBUG2, "background worker \"checksumhelper\" done with relation %u: %s",
+		 relationId, (aborted ? "aborted" : "finished"));
+
+	CommitTransactionCommand();
+
+	pgstat_report_activity(STATE_IDLE, NULL);
+
+	return !aborted;
+}
+
+/*
+ * ProcessDatabase
+ *		Enable checksums in a single database.
+ *
+ * We do this by launching a dynamic background worker into this database, and
+ * waiting for it to finish.  We have to do this in a separate worker, since
+ * each process can only be connected to one database during its lifetime.
+ */
+static ChecksumHelperResult
+ProcessDatabase(ChecksumHelperDatabase * db)
+{
+	BackgroundWorker bgw;
+	BackgroundWorkerHandle *bgw_handle;
+	BgwHandleStatus status;
+	pid_t		pid;
+	char		activity[NAMEDATALEN + 64];
+
+	ChecksumHelperShmem->success = CHECKSUMHELPER_FAILED;
+
+	memset(&bgw, 0, sizeof(bgw));
+	bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
+	snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres");
+	snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ChecksumHelperWorkerMain");
+	snprintf(bgw.bgw_name, BGW_MAXLEN, "checksumhelper worker");
+	snprintf(bgw.bgw_type, BGW_MAXLEN, "checksumhelper worker");
+	bgw.bgw_restart_time = BGW_NEVER_RESTART;
+	bgw.bgw_notify_pid = MyProcPid;
+	bgw.bgw_main_arg = ObjectIdGetDatum(db->dboid);
+
+	/*
+	 * If there are no worker slots available, make sure we retry processing
+	 * this database. This will make the checksumhelper move on to the next
+	 * database and quite likely fail with the same problem. Maybe we need a
+	 * backoff to avoid running through all the databases here in short order.
+	 */
+	if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle))
+	{
+		ereport(WARNING,
+				(errmsg("failed to start worker for enabling checksums in \"%s\", retrying",
+						db->dbname),
+				 errhint("The max_worker_processes setting might be too low.")));
+		return CHECKSUMHELPER_RETRYDB;
+	}
+
+	status = WaitForBackgroundWorkerStartup(bgw_handle, &pid);
+	if (status == BGWH_STOPPED)
+	{
+		ereport(WARNING,
+				(errmsg("could not start background worker for enabling checksums in \"%s\"",
+						db->dbname),
+				 errhint("More details on the error might be found in the server log.")));
+		return CHECKSUMHELPER_FAILED;
+	}
+
+	/*
+	 * If the postmaster crashed we cannot end up with checksums enabled
+	 * clusterwide so we have no alternative other than exiting.
+	 */
+	if (status == BGWH_POSTMASTER_DIED)
+		ereport(FATAL,
+				(errmsg("cannot enable checksums without the postmaster process"),
+				 errhint("Restart the database and restart the checksumming process by calling pg_enable_data_checksums().")));
+
+	Assert(status == BGWH_STARTED);
+	ereport(DEBUG1,
+			(errmsg("started background worker \"checksumhelper\" in database \"%s\"",
+					db->dbname)));
+
+	snprintf(activity, sizeof(activity) - 1,
+			 "Waiting for worker in database %s (pid %d)", db->dbname, pid);
+	pgstat_report_activity(STATE_RUNNING, activity);
+
+	status = WaitForBackgroundWorkerShutdown(bgw_handle);
+	if (status == BGWH_POSTMASTER_DIED)
+		ereport(FATAL,
+				(errmsg("postmaster exited during checksum processing in \"%s\"",
+						db->dbname),
+				 errhint("Restart the database and restart the checksumming process by calling pg_enable_data_checksums().")));
+
+	if (ChecksumHelperShmem->success == CHECKSUMHELPER_ABORTED)
+		ereport(LOG,
+				(errmsg("background worker for enabling checksums was aborted during processing in \"%s\"",
+						db->dbname)));
+
+	ereport(DEBUG1,
+			(errmsg("background worker \"checksumhelper\" in \"%s\" completed",
+					db->dbname)));
+
+	pgstat_report_activity(STATE_IDLE, NULL);
+
+	return ChecksumHelperShmem->success;
+}
+
+static void
+launcher_exit(int code, Datum arg)
+{
+	LWLockAcquire(ChecksumHelperLock, LW_EXCLUSIVE);
+	ChecksumHelperShmem->abort = false;
+	ChecksumHelperShmem->launcher_started = false;
+	LWLockRelease(ChecksumHelperLock);
+}
+
+static void
+launcher_cancel_handler(SIGNAL_ARGS)
+{
+	LWLockAcquire(ChecksumHelperLock, LW_EXCLUSIVE);
+	ChecksumHelperShmem->abort = true;
+	LWLockRelease(ChecksumHelperLock);
+}
+
+static void
+WaitForAllTransactionsToFinish(void)
+{
+	TransactionId waitforxid;
+
+	LWLockAcquire(XidGenLock, LW_SHARED);
+	waitforxid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
+	LWLockRelease(XidGenLock);
+
+	while (true)
+	{
+		TransactionId oldestxid = GetOldestActiveTransactionId();
+
+		if (TransactionIdPrecedes(oldestxid, waitforxid))
+		{
+			char		activity[64];
+
+			/* Oldest running xid is older than us, so wait */
+			snprintf(activity, sizeof(activity), "Waiting for current transactions to finish (waiting for %u)", waitforxid);
+			pgstat_report_activity(STATE_RUNNING, activity);
+
+			/* Retry every 5 seconds */
+			ResetLatch(MyLatch);
+			(void) WaitLatch(MyLatch,
+							 WL_LATCH_SET | WL_TIMEOUT,
+							 5000,
+							 WAIT_EVENT_CHECKSUM_ENABLE_STARTCONDITION);
+		}
+		else
+		{
+			pgstat_report_activity(STATE_IDLE, NULL);
+			return;
+		}
+	}
+}
+
+void
+ChecksumHelperLauncherMain(Datum arg)
+{
+	List	   *DatabaseList;
+	HTAB	   *ProcessedDatabases = NULL;
+	ListCell   *lc;
+	HASHCTL		hash_ctl;
+	bool		found_failed = false;
+
+	on_shmem_exit(launcher_exit, 0);
+
+	ereport(DEBUG1,
+			(errmsg("background worker \"checksumhelper\" launcher started")));
+
+	pqsignal(SIGTERM, die);
+	pqsignal(SIGINT, launcher_cancel_handler);
+
+	BackgroundWorkerUnblockSignals();
+
+	init_ps_display(pgstat_get_backend_desc(B_CHECKSUMHELPER_LAUNCHER),
+					"", "", "");
+
+	/* Initialize a hash tracking all processed databases */
+	memset(&hash_ctl, 0, sizeof(hash_ctl));
+	hash_ctl.keysize = sizeof(Oid);
+	hash_ctl.entrysize = sizeof(ChecksumHelperResultEntry);
+	ProcessedDatabases = hash_create("Processed databases",
+									 64,
+									 &hash_ctl,
+									 HASH_ELEM | HASH_BLOBS);
+
+	/*
+	 * Initialize a connection to shared catalogs only.
+	 */
+	BackgroundWorkerInitializeConnection(NULL, NULL, 0);
+
+	/*
+	 * Set up so first run processes shared catalogs, but not once in every
+	 * db.
+	 */
+	ChecksumHelperShmem->process_shared_catalogs = true;
+
+	while (true)
+	{
+		int			processed_databases = 0;
+
+		/*
+		 * Get a list of all databases to process. This may include databases
+		 * that were created during our runtime.
+		 *
+		 * Since a database can be created as a copy of any other database
+		 * (which may not have existed in our last run), we have to repeat
+		 * this loop until no new databases show up in the list. Since we wait
+		 * for all pre-existing transactions finish, this way we can be
+		 * certain that there are no databases left without checksums.
+		 */
+		DatabaseList = BuildDatabaseList();
+
+		/*
+		 * If there are no databases at all to checksum, we can exit
+		 * immediately as there is no work to do. This can probably never
+		 * happen, but just in case.
+		 */
+		if (list_length(DatabaseList) == 0)
+			return;
+
+		foreach(lc, DatabaseList)
+		{
+			ChecksumHelperDatabase *db = (ChecksumHelperDatabase *) lfirst(lc);
+			ChecksumHelperResult result;
+			ChecksumHelperResultEntry *entry;
+			bool			found;
+
+			elog(DEBUG1, "Starting processing of database %s with oid %u", db->dbname, db->dboid);
+
+			entry = (ChecksumHelperResultEntry *) hash_search(ProcessedDatabases, &db->dboid,
+								HASH_FIND, NULL);
+
+			/* Skip if this database has been processed already */
+			if (entry)
+			{
+				if (entry->result == CHECKSUMHELPER_RETRYDB)
+				{
+					/*
+					 * Limit the number of retries to avoid infinite looping
+					 * in case there simply wont be enough workers in the
+					 * cluster to finish this operation.
+					 */
+					if (entry->retries > CHECKSUMHELPER_MAX_DB_RETRIES)
+						entry->result = CHECKSUMHELPER_FAILED;
+				}
+
+				if (entry->result != CHECKSUMHELPER_RETRYDB)
+				{
+					pfree(db->dbname);
+					pfree(db);
+					continue;
+				}
+			}
+
+			result = ProcessDatabase(db);
+			processed_databases++;
+
+			if (result == CHECKSUMHELPER_SUCCESSFUL)
+			{
+				/*
+				 * If one database has completed shared catalogs, we don't
+				 * have to process them again.
+				 */
+				if (ChecksumHelperShmem->process_shared_catalogs)
+					ChecksumHelperShmem->process_shared_catalogs = false;
+			}
+			else if (result == CHECKSUMHELPER_ABORTED)
+				/* Abort flag set, so exit the whole process */
+				return;
+
+			entry = hash_search(ProcessedDatabases, &db->dboid, HASH_ENTER, &found);
+			entry->dboid = db->dboid;
+			entry->result = result;
+			if (!found)
+				entry->retries = 0;
+			else
+				entry->retries++;
+
+			pfree(db->dbname);
+			pfree(db);
+		}
+
+		elog(DEBUG1,
+			 "completed one pass over all databases for checksum enabling, %i databases processed",
+			 processed_databases);
+
+		list_free(DatabaseList);
+
+		/*
+		 * If no databases were processed in this run of the loop, we have now
+		 * finished all databases and no concurrently created ones can exist.
+		 */
+		if (processed_databases == 0)
+			break;
+	}
+
+	/*
+	 * ProcessedDatabases now has all databases and the results of their
+	 * processing. Failure to enable checksums for a database can be because
+	 * they actually failed for some reason, or because the database was
+	 * dropped between us getting the database list and trying to process it.
+	 * Get a fresh list of databases to detect the second case where the
+	 * database was dropped before we had started processing it. If a database
+	 * still exists, but enabling checksums failed then we fail the entire
+	 * checksumming process and exit with an error.
+	 */
+	DatabaseList = BuildDatabaseList();
+
+	foreach(lc, DatabaseList)
+	{
+		ChecksumHelperDatabase *db = (ChecksumHelperDatabase *) lfirst(lc);
+		ChecksumHelperResult *entry;
+		bool		found;
+
+		entry = hash_search(ProcessedDatabases, (void *) &db->dboid,
+							HASH_FIND, &found);
+
+		/*
+		 * We are only interested in the databases where the failed database
+		 * still exist.
+		 */
+		if (found && *entry == CHECKSUMHELPER_FAILED)
+		{
+			ereport(WARNING,
+					(errmsg("failed to enable checksums in \"%s\"",
+							db->dbname)));
+			found_failed = found;
+			continue;
+		}
+	}
+
+	if (found_failed)
+	{
+		/* Disable checksums on cluster, because we failed */
+		SetDataChecksumsOff();
+		ereport(ERROR,
+				(errmsg("checksums failed to get enabled in all databases, aborting"),
+				 errhint("The server log might have more information on the error.")));
+	}
+
+	/*
+	 * Force a checkpoint to get everything out to disk. XXX: this should
+	 * probably not be an IMMEDIATE checkpoint, but leave it there for now for
+	 * testing.
+	 */
+	RequestCheckpoint(CHECKPOINT_FORCE | CHECKPOINT_WAIT | CHECKPOINT_IMMEDIATE);
+
+	/*
+	 * Everything has been processed, so flag checksums enabled.
+	 */
+	SetDataChecksumsOn();
+
+	ereport(LOG,
+			(errmsg("checksums enabled clusterwide")));
+}
+
+/*
+ * ChecksumHelperShmemSize
+ *		Compute required space for checksumhelper-related shared memory
+ */
+Size
+ChecksumHelperShmemSize(void)
+{
+	Size		size;
+
+	size = sizeof(ChecksumHelperShmemStruct);
+	size = MAXALIGN(size);
+
+	return size;
+}
+
+/*
+ * ChecksumHelperShmemInit
+ *		Allocate and initialize checksumhelper-related shared memory
+ */
+void
+ChecksumHelperShmemInit(void)
+{
+	bool		found;
+
+	ChecksumHelperShmem = (ChecksumHelperShmemStruct *)
+		ShmemInitStruct("ChecksumHelper Data",
+						ChecksumHelperShmemSize(),
+						&found);
+
+	if (!found)
+	{
+		MemSet(ChecksumHelperShmem, 0, ChecksumHelperShmemSize());
+	}
+}
+
+/*
+ * BuildDatabaseList
+ *		Compile a list of all currently available databases in the cluster
+ *
+ * This creates the list of databases for the checksumhelper workers to add
+ * checksums to.
+ */
+static List *
+BuildDatabaseList(void)
+{
+	List	   *DatabaseList = NIL;
+	Relation	rel;
+	TableScanDesc scan;
+	HeapTuple	tup;
+	MemoryContext ctx = CurrentMemoryContext;
+	MemoryContext oldctx;
+
+	StartTransactionCommand();
+
+	rel = table_open(DatabaseRelationId, AccessShareLock);
+
+	/*
+	 * Before we do this, wait for all pending transactions to finish. This
+	 * will ensure there are no concurrently running CREATE DATABASE, which
+	 * could cause us to miss the creation of a database that was copied
+	 * without checksums.
+	 */
+	WaitForAllTransactionsToFinish();
+
+	scan = table_beginscan_catalog(rel, 0, NULL);
+
+	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+	{
+		Form_pg_database pgdb = (Form_pg_database) GETSTRUCT(tup);
+		ChecksumHelperDatabase *db;
+
+		oldctx = MemoryContextSwitchTo(ctx);
+
+		db = (ChecksumHelperDatabase *) palloc(sizeof(ChecksumHelperDatabase));
+
+		db->dboid = pgdb->oid;
+		db->dbname = pstrdup(NameStr(pgdb->datname));
+
+		DatabaseList = lappend(DatabaseList, db);
+
+		MemoryContextSwitchTo(oldctx);
+	}
+
+	table_endscan(scan);
+	table_close(rel, AccessShareLock);
+
+	CommitTransactionCommand();
+
+	return DatabaseList;
+}
+
+/*
+ * BuildRelationList
+ *		Compile a list of all relations in the database
+ *
+ * If shared is true, both shared relations and local ones are returned, else
+ * all non-shared relations are returned.
+ * Temp tables are not included.
+ */
+static List *
+BuildRelationList(bool include_shared)
+{
+	List	   *RelationList = NIL;
+	Relation	rel;
+	TableScanDesc scan;
+	HeapTuple	tup;
+	MemoryContext ctx = CurrentMemoryContext;
+	MemoryContext oldctx;
+
+	StartTransactionCommand();
+
+	rel = table_open(RelationRelationId, AccessShareLock);
+	scan = table_beginscan_catalog(rel, 0, NULL);
+
+	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+	{
+		Form_pg_class pgc = (Form_pg_class) GETSTRUCT(tup);
+		ChecksumHelperRelation *relentry;
+
+		if (pgc->relpersistence == RELPERSISTENCE_TEMP)
+			continue;
+
+		if (pgc->relisshared && !include_shared)
+			continue;
+
+		/*
+		 * Only include relations types that have local storage
+		 */
+		if (pgc->relkind == RELKIND_VIEW ||
+			pgc->relkind == RELKIND_COMPOSITE_TYPE ||
+			pgc->relkind == RELKIND_FOREIGN_TABLE)
+			continue;
+
+		oldctx = MemoryContextSwitchTo(ctx);
+		relentry = (ChecksumHelperRelation *) palloc(sizeof(ChecksumHelperRelation));
+
+		relentry->reloid = pgc->oid;
+		relentry->relkind = pgc->relkind;
+
+		RelationList = lappend(RelationList, relentry);
+
+		MemoryContextSwitchTo(oldctx);
+	}
+
+	table_endscan(scan);
+	table_close(rel, AccessShareLock);
+
+	CommitTransactionCommand();
+
+	return RelationList;
+}
+
+/*
+ * BuildTempTableList
+ *		Compile a list of all temporary tables in database
+ *
+ * Returns a List of oids.
+ */
+static List *
+BuildTempTableList(void)
+{
+	List	   *RelationList = NIL;
+	Relation	rel;
+	TableScanDesc scan;
+	HeapTuple	tup;
+	MemoryContext ctx = CurrentMemoryContext;
+	MemoryContext oldctx;
+
+	StartTransactionCommand();
+
+	rel = table_open(RelationRelationId, AccessShareLock);
+	scan = table_beginscan_catalog(rel, 0, NULL);
+
+	while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+	{
+		Form_pg_class pgc = (Form_pg_class) GETSTRUCT(tup);
+
+		if (pgc->relpersistence != RELPERSISTENCE_TEMP)
+			continue;
+
+		oldctx = MemoryContextSwitchTo(ctx);
+		RelationList = lappend_oid(RelationList, pgc->oid);
+		MemoryContextSwitchTo(oldctx);
+	}
+
+	table_endscan(scan);
+	table_close(rel, AccessShareLock);
+
+	CommitTransactionCommand();
+
+	return RelationList;
+}
+
+/*
+ * Main function for enabling checksums in a single database
+ */
+void
+ChecksumHelperWorkerMain(Datum arg)
+{
+	Oid			dboid = DatumGetObjectId(arg);
+	List	   *RelationList = NIL;
+	List	   *InitialTempTableList = NIL;
+	ListCell   *lc;
+	BufferAccessStrategy strategy;
+	bool		aborted = false;
+
+	pqsignal(SIGTERM, die);
+
+	BackgroundWorkerUnblockSignals();
+
+	init_ps_display(pgstat_get_backend_desc(B_CHECKSUMHELPER_WORKER), "", "", "");
+
+	ereport(DEBUG1,
+			(errmsg("background worker \"checksumhelper\" starting for database oid %d",
+					dboid)));
+
+	BackgroundWorkerInitializeConnectionByOid(dboid, InvalidOid, BGWORKER_BYPASS_ALLOWCONN);
+
+	/*
+	 * Get a list of all temp tables present as we start in this database. We
+	 * need to wait until they are all gone until we are done, since we cannot
+	 * access those files and modify them.
+	 */
+	InitialTempTableList = BuildTempTableList();
+
+	/*
+	 * Enable vacuum cost delay, if any.
+	 */
+	VacuumCostDelay = ChecksumHelperShmem->cost_delay;
+	VacuumCostLimit = ChecksumHelperShmem->cost_limit;
+	VacuumCostActive = (VacuumCostDelay > 0);
+	VacuumCostBalance = 0;
+	VacuumPageHit = 0;
+	VacuumPageMiss = 0;
+	VacuumPageDirty = 0;
+
+	/*
+	 * Create and set the vacuum strategy as our buffer strategy.
+	 */
+	strategy = GetAccessStrategy(BAS_VACUUM);
+
+	RelationList = BuildRelationList(ChecksumHelperShmem->process_shared_catalogs);
+	foreach(lc, RelationList)
+	{
+		ChecksumHelperRelation *rel = (ChecksumHelperRelation *) lfirst(lc);
+
+		if (!ProcessSingleRelationByOid(rel->reloid, strategy))
+		{
+			aborted = true;
+			break;
+		}
+	}
+	list_free_deep(RelationList);
+
+	if (aborted)
+	{
+		ChecksumHelperShmem->success = CHECKSUMHELPER_ABORTED;
+		ereport(DEBUG1,
+				(errmsg("background worker \"checksumhelper\" aborted in database oid %d",
+						dboid)));
+		return;
+	}
+
+	/*
+	 * Wait for all temp tables that existed when we started to go away. This
+	 * is necessary since we cannot "reach" them to enable checksums. Any temp
+	 * tables created after we started will already have checksums in them
+	 * (due to the inprogress state), so those are safe.
+	 */
+	while (true)
+	{
+		List	   *CurrentTempTables;
+		ListCell   *lc;
+		int			numleft;
+		char		activity[64];
+
+		CurrentTempTables = BuildTempTableList();
+		numleft = 0;
+		foreach(lc, InitialTempTableList)
+		{
+			if (list_member_oid(CurrentTempTables, lfirst_oid(lc)))
+				numleft++;
+		}
+		list_free(CurrentTempTables);
+
+		if (numleft == 0)
+			break;
+
+		/* At least one temp table left to wait for */
+		snprintf(activity, sizeof(activity), "Waiting for %d temp tables to be removed", numleft);
+		pgstat_report_activity(STATE_RUNNING, activity);
+
+		/* Retry every 5 seconds */
+		ResetLatch(MyLatch);
+		(void) WaitLatch(MyLatch,
+						 WL_LATCH_SET | WL_TIMEOUT,
+						 5000,
+						 WAIT_EVENT_CHECKSUM_ENABLE_STARTCONDITION);
+	}
+
+	list_free(InitialTempTableList);
+
+	ChecksumHelperShmem->success = CHECKSUMHELPER_SUCCESSFUL;
+	ereport(DEBUG1,
+			(errmsg("background worker \"checksumhelper\" completed in database oid %d",
+					dboid)));
+}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 51c486bebd..3754a7b0ae 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3752,6 +3752,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 		case WAIT_EVENT_CHECKPOINT_START:
 			event_name = "CheckpointStart";
 			break;
+		case WAIT_EVENT_CHECKSUM_ENABLE_STARTCONDITION:
+			event_name = "ChecksumEnableStartcondition";
+			break;
 		case WAIT_EVENT_CLOG_GROUP_UPDATE:
 			event_name = "ClogGroupUpdate";
 			break;
@@ -4303,6 +4306,12 @@ pgstat_get_backend_desc(BackendType backendType)
 		case B_WAL_WRITER:
 			backendDesc = "walwriter";
 			break;
+		case B_CHECKSUMHELPER_LAUNCHER:
+			backendDesc = "checksumhelper launcher";
+			break;
+		case B_CHECKSUMHELPER_WORKER:
+			backendDesc = "checksumhelper worker";
+			break;
 	}
 
 	return backendDesc;
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index dea8aab45e..c54e059041 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -1385,7 +1385,7 @@ sendFile(const char *readfilename, const char *tarfilename, struct stat *statbuf
 
 	_tarWriteHeader(tarfilename, NULL, statbuf, false);
 
-	if (!noverify_checksums && DataChecksumsEnabled())
+	if (!noverify_checksums && DataChecksumsNeedVerify())
 	{
 		char	   *filename;
 
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5e1dc8a651..39ef348cc4 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -196,6 +196,7 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		case XLOG_FPW_CHANGE:
 		case XLOG_FPI_FOR_HINT:
 		case XLOG_FPI:
+		case XLOG_CHECKSUMS:
 			break;
 		default:
 			elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 427b0d59cd..be422954f2 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -27,6 +27,7 @@
 #include "postmaster/autovacuum.h"
 #include "postmaster/bgworker_internals.h"
 #include "postmaster/bgwriter.h"
+#include "postmaster/checksumhelper.h"
 #include "postmaster/postmaster.h"
 #include "replication/logicallauncher.h"
 #include "replication/origin.h"
@@ -255,6 +256,7 @@ CreateSharedMemoryAndSemaphores(void)
 	WalSndShmemInit();
 	WalRcvShmemInit();
 	ApplyLauncherShmemInit();
+	ChecksumHelperShmemInit();
 
 	/*
 	 * Set up other modules that need some shared memory space
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 65d3946386..f2088a9c8e 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -92,7 +92,6 @@ static volatile ProcSignalSlot *MyProcSignalSlot = NULL;
 
 static bool CheckProcSignal(ProcSignalReason reason);
 static void CleanupProcSignalState(int status, Datum arg);
-static void ProcessBarrierPlaceholder(void);
 
 /*
  * ProcSignalShmemSize
@@ -456,8 +455,14 @@ ProcessProcSignalBarrier(void)
 	 * unconditionally, but it's more efficient to call only the ones that
 	 * might need us to do something based on the flags.
 	 */
-	if (BARRIER_SHOULD_CHECK(flags, PROCSIGNAL_BARRIER_PLACEHOLDER))
-		ProcessBarrierPlaceholder();
+	if (BARRIER_SHOULD_CHECK(flags, PROCSIGNAL_BARRIER_CHECKSUM))
+	{
+		/*
+		 * By virtue of getting here (i.e. interrupts being processed), we
+		 * know that this backend won't have any in-progress writes (which
+		 * might have missed the checksum change).
+		 */
+	}
 
 	/*
 	 * State changes related to all types of barriers that might have been
@@ -469,19 +474,6 @@ ProcessProcSignalBarrier(void)
 	pg_atomic_write_u64(&MyProcSignalSlot->pss_barrierGeneration, generation);
 }
 
-static void
-ProcessBarrierPlaceholder(void)
-{
-	/*
-	 * XXX. This is just a placeholder until the first real user of this
-	 * machinery gets committed. Rename PROCSIGNAL_BARRIER_PLACEHOLDER to
-	 * PROCSIGNAL_BARRIER_SOMETHING_ELSE where SOMETHING_ELSE is something
-	 * appropriately descriptive. Get rid of this function and instead have
-	 * ProcessBarrierSomethingElse. Most likely, that function should live
-	 * in the file pertaining to that subsystem, rather than here.
-	 */
-}
-
 /*
  * CheckProcSignal - check to see if a particular reason has been
  * signaled, and clear the signal flag.  Should be called after receiving
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index db47843229..d50b4b13e1 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -49,3 +49,4 @@ MultiXactTruncationLock				41
 OldSnapshotTimeMapLock				42
 LogicalRepWorkerLock				43
 CLogTruncationLock					44
+ChecksumHelperLock					45
diff --git a/src/backend/storage/page/README b/src/backend/storage/page/README
index 5127d98da3..f873fb0eea 100644
--- a/src/backend/storage/page/README
+++ b/src/backend/storage/page/README
@@ -9,7 +9,8 @@ have a very low measured incidence according to research on large server farms,
 http://www.cs.toronto.edu/~bianca/papers/sigmetrics09.pdf, discussed
 2010/12/22 on -hackers list.
 
-Current implementation requires this be enabled system-wide at initdb time.
+Checksums can be enabled at initdb time, but can also be turned on and off
+using pg_enable_data_checksums()/pg_disable_data_checksums() at runtime.
 
 The checksum is not valid at all times on a data page!!
 The checksum is valid when the page leaves the shared pool and is checked
diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c
index f47176753d..b7a4760633 100644
--- a/src/backend/storage/page/bufpage.c
+++ b/src/backend/storage/page/bufpage.c
@@ -94,7 +94,7 @@ PageIsVerified(Page page, BlockNumber blkno)
 	 */
 	if (!PageIsNew(page))
 	{
-		if (DataChecksumsEnabled())
+		if (DataChecksumsNeedVerify())
 		{
 			checksum = pg_checksum_page((char *) page, blkno);
 
@@ -1171,7 +1171,7 @@ PageSetChecksumCopy(Page page, BlockNumber blkno)
 	static char *pageCopy = NULL;
 
 	/* If we don't need a checksum, just return the passed-in data */
-	if (PageIsNew(page) || !DataChecksumsEnabled())
+	if (PageIsNew(page) || !DataChecksumsNeedWrite())
 		return (char *) page;
 
 	/*
@@ -1198,7 +1198,7 @@ void
 PageSetChecksumInplace(Page page, BlockNumber blkno)
 {
 	/* If we don't need a checksum, just return */
-	if (PageIsNew(page) || !DataChecksumsEnabled())
+	if (PageIsNew(page) || !DataChecksumsNeedWrite())
 		return;
 
 	((PageHeader) page)->pd_checksum = pg_checksum_page((char *) page, blkno);
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 74f899f24d..654ef89663 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -1533,7 +1533,7 @@ pg_stat_get_db_checksum_failures(PG_FUNCTION_ARGS)
 	int64		result;
 	PgStat_StatDBEntry *dbentry;
 
-	if (!DataChecksumsEnabled())
+	if (!DataChecksumsNeedWrite())
 		PG_RETURN_NULL();
 
 	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
@@ -1551,7 +1551,7 @@ pg_stat_get_db_checksum_last_failure(PG_FUNCTION_ARGS)
 	TimestampTz result;
 	PgStat_StatDBEntry *dbentry;
 
-	if (!DataChecksumsEnabled())
+	if (!DataChecksumsNeedWrite())
 		PG_RETURN_NULL();
 
 	if ((dbentry = pgstat_fetch_stat_dbentry(dbid)) == NULL)
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index e44f71e991..177206e58d 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -33,6 +33,7 @@
 #include "access/transam.h"
 #include "access/twophase.h"
 #include "access/xact.h"
+#include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_authid.h"
@@ -72,6 +73,7 @@
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "storage/bufmgr.h"
+#include "storage/checksum.h"
 #include "storage/dsm_impl.h"
 #include "storage/fd.h"
 #include "storage/large_object.h"
@@ -477,6 +479,16 @@ static struct config_enum_entry shared_memory_options[] = {
 	{NULL, 0, false}
 };
 
+/*
+ * Options for data_checksums enum.
+ */
+static const struct config_enum_entry data_checksum_options[] = {
+	{"on", DATA_CHECKSUMS_ON, true},
+	{"off", DATA_CHECKSUMS_OFF, true},
+	{"inprogress", DATA_CHECKSUMS_INPROGRESS, true},
+	{NULL, 0, false}
+};
+
 /*
  * Options for enum values stored in other modules
  */
@@ -584,7 +596,7 @@ static int	max_identifier_length;
 static int	block_size;
 static int	segment_size;
 static int	wal_block_size;
-static bool data_checksums;
+static int	data_checksums_tmp;
 static bool integer_datetimes;
 static bool assert_enabled;
 static char *recovery_target_timeline_string;
@@ -1844,17 +1856,6 @@ static struct config_bool ConfigureNamesBool[] =
 		NULL, NULL, NULL
 	},
 
-	{
-		{"data_checksums", PGC_INTERNAL, PRESET_OPTIONS,
-			gettext_noop("Shows whether data checksums are turned on for this cluster."),
-			NULL,
-			GUC_NOT_IN_SAMPLE | GUC_DISALLOW_IN_FILE
-		},
-		&data_checksums,
-		false,
-		NULL, NULL, NULL
-	},
-
 	{
 		{"syslog_sequence_numbers", PGC_SIGHUP, LOGGING_WHERE,
 			gettext_noop("Add sequence number to syslog messages to avoid duplicate suppression."),
@@ -4613,6 +4614,17 @@ static struct config_enum ConfigureNamesEnum[] =
 		check_ssl_max_protocol_version, NULL, NULL
 	},
 
+	{
+		{"data_checksums", PGC_INTERNAL, PRESET_OPTIONS,
+			gettext_noop("Shows whether data checksums are turned on for this cluster."),
+			NULL,
+			GUC_NOT_IN_SAMPLE | GUC_DISALLOW_IN_FILE
+		},
+		&data_checksums_tmp,
+		DATA_CHECKSUMS_OFF, data_checksum_options,
+		NULL, NULL, show_data_checksums
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL
diff --git a/src/bin/pg_upgrade/controldata.c b/src/bin/pg_upgrade/controldata.c
index 00d71e3a8a..04a6999cf9 100644
--- a/src/bin/pg_upgrade/controldata.c
+++ b/src/bin/pg_upgrade/controldata.c
@@ -657,6 +657,15 @@ check_control_data(ControlData *oldctrl,
 	 * check_for_isn_and_int8_passing_mismatch().
 	 */
 
+	/*
+	 * If checksums have been turned on in the old cluster, but the
+	 * checksumhelper have yet to finish, then disallow upgrading. The user
+	 * should either let the process finish, or turn off checksums, before
+	 * retrying.
+	 */
+	if (oldctrl->data_checksum_version == 2)
+		pg_fatal("checksum enabling in old cluster is in progress\n");
+
 	/*
 	 * We might eventually allow upgrades from checksum to no-checksum
 	 * clusters.
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index b156b516cc..3f033a6854 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -220,7 +220,7 @@ typedef struct
 	uint32		large_object;
 	bool		date_is_int;
 	bool		float8_pass_by_value;
-	bool		data_checksum_version;
+	uint32		data_checksum_version;
 } ControlData;
 
 /*
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 98b033fc20..69df34cc3b 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -189,7 +189,7 @@ extern PGDLLIMPORT int wal_level;
  * of the bits make it to disk, but the checksum wouldn't match.  Also WAL-log
  * them if forced by wal_log_hints=on.
  */
-#define XLogHintBitIsNeeded() (DataChecksumsEnabled() || wal_log_hints)
+#define XLogHintBitIsNeeded() (DataChecksumsNeedWrite() || wal_log_hints)
 
 /* Do we need to WAL-log information required only for Hot Standby and logical replication? */
 #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_REPLICA)
@@ -292,7 +292,12 @@ extern TimestampTz GetCurrentChunkReplayStartTime(void);
 extern void UpdateControlFile(void);
 extern uint64 GetSystemIdentifier(void);
 extern char *GetMockAuthenticationNonce(void);
-extern bool DataChecksumsEnabled(void);
+extern bool DataChecksumsNeedWrite(void);
+extern bool DataChecksumsNeedVerify(void);
+extern void SetDataChecksumsInProgress(void);
+extern void SetDataChecksumsOn(void);
+extern void SetDataChecksumsOff(void);
+extern const char *show_data_checksums(void);
 extern XLogRecPtr GetFakeLSNForUnloggedRel(void);
 extern Size XLOGShmemSize(void);
 extern void XLOGShmemInit(void);
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 087918d41d..56d2efd5b0 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -25,6 +25,7 @@
 #include "lib/stringinfo.h"
 #include "pgtime.h"
 #include "storage/block.h"
+#include "storage/checksum.h"
 #include "storage/relfilenode.h"
 
 
@@ -245,6 +246,12 @@ typedef struct xl_restore_point
 	char		rp_name[MAXFNAMELEN];
 } xl_restore_point;
 
+/* Information logged when checksum level is changed */
+typedef struct xl_checksum_state
+{
+	ChecksumType new_checksumtype;
+}			xl_checksum_state;
+
 /* End of recovery mark, when we don't do an END_OF_RECOVERY checkpoint */
 typedef struct xl_end_of_recovery
 {
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index de5670e538..73a5495335 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -76,6 +76,7 @@ typedef struct CheckPoint
 #define XLOG_END_OF_RECOVERY			0x90
 #define XLOG_FPI_FOR_HINT				0xA0
 #define XLOG_FPI						0xB0
+#define XLOG_CHECKSUMS					0xC0
 
 
 /*
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index fcf2a1214c..2d81d17948 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10682,6 +10682,22 @@
   proargnames => '{max_data_alignment,database_block_size,blocks_per_segment,wal_block_size,bytes_per_wal_segment,max_identifier_length,max_index_columns,max_toast_chunk_size,large_object_chunk_size,float8_pass_by_value,data_page_checksum_version}',
   prosrc => 'pg_control_init' },
 
+{ oid => '4142',
+  descr => 'disable data checksums',
+  proname => 'pg_disable_data_checksums', provolatile => 'v', prorettype => 'bool',
+  proparallel => 'r',
+  proargtypes => '',
+  prosrc => 'disable_data_checksums' },
+
+{ oid => '4035',
+  descr => 'enable data checksums',
+  proname => 'pg_enable_data_checksums', provolatile => 'v', prorettype => 'void',
+  proparallel => 'r',
+  proargtypes => 'int4 int4', proallargtypes => '{int4,int4}',
+  proargmodes => '{i,i}',
+  proargnames => '{cost_delay,cost_limit}',
+  prosrc => 'enable_data_checksums' },
+
 # collation management functions
 { oid => '3445', descr => 'import collations from operating system',
   proname => 'pg_import_system_collations', procost => '100',
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 36b530bc27..2f30138a26 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -727,7 +727,9 @@ typedef enum BackendType
 	B_STARTUP,
 	B_WAL_RECEIVER,
 	B_WAL_SENDER,
-	B_WAL_WRITER
+	B_WAL_WRITER,
+	B_CHECKSUMHELPER_LAUNCHER,
+	B_CHECKSUMHELPER_WORKER
 } BackendType;
 
 
@@ -823,6 +825,7 @@ typedef enum
 	WAIT_EVENT_CLOG_GROUP_UPDATE,
 	WAIT_EVENT_CHECKPOINT_DONE,
 	WAIT_EVENT_CHECKPOINT_START,
+	WAIT_EVENT_CHECKSUM_ENABLE_STARTCONDITION,
 	WAIT_EVENT_EXECUTE_GATHER,
 	WAIT_EVENT_HASH_BATCH_ALLOCATING,
 	WAIT_EVENT_HASH_BATCH_ELECTING,
diff --git a/src/include/postmaster/checksumhelper.h b/src/include/postmaster/checksumhelper.h
new file mode 100644
index 0000000000..888269f3ac
--- /dev/null
+++ b/src/include/postmaster/checksumhelper.h
@@ -0,0 +1,31 @@
+/*-------------------------------------------------------------------------
+ *
+ * checksumhelper.h
+ *	  header file for checksum helper background worker
+ *
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/postmaster/checksumhelper.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CHECKSUMHELPER_H
+#define CHECKSUMHELPER_H
+
+/* Shared memory */
+extern Size ChecksumHelperShmemSize(void);
+extern void ChecksumHelperShmemInit(void);
+
+/* Start the background processes for enabling checksums */
+void		StartChecksumHelperLauncher(int cost_delay, int cost_limit);
+
+/* Shutdown the background processes, if any */
+void		ShutdownChecksumHelperIfRunning(void);
+
+/* Background worker entrypoints */
+void		ChecksumHelperLauncherMain(Datum arg);
+void		ChecksumHelperWorkerMain(Datum arg);
+
+#endif							/* CHECKSUMHELPER_H */
diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h
index 870ecb51b7..22c5e02175 100644
--- a/src/include/storage/bufpage.h
+++ b/src/include/storage/bufpage.h
@@ -198,6 +198,7 @@ typedef PageHeaderData *PageHeader;
  */
 #define PG_PAGE_LAYOUT_VERSION		4
 #define PG_DATA_CHECKSUM_VERSION	1
+#define PG_DATA_CHECKSUM_INPROGRESS_VERSION		2
 
 /* ----------------------------------------------------------------
  *						page support macros
diff --git a/src/include/storage/checksum.h b/src/include/storage/checksum.h
index 6e77744cbc..c991dec6a4 100644
--- a/src/include/storage/checksum.h
+++ b/src/include/storage/checksum.h
@@ -15,6 +15,13 @@
 
 #include "storage/block.h"
 
+typedef enum ChecksumType
+{
+	DATA_CHECKSUMS_OFF = 0,
+	DATA_CHECKSUMS_ON,
+	DATA_CHECKSUMS_INPROGRESS
+}			ChecksumType;
+
 /*
  * Compute the checksum for a Postgres page.  The page must be aligned on a
  * 4-byte boundary.
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index 90607df106..a99eb1fe37 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -47,12 +47,7 @@ typedef enum
 
 typedef enum
 {
-	/*
-	 * XXX. PROCSIGNAL_BARRIER_PLACEHOLDER should be replaced when the first
-	 * real user of the ProcSignalBarrier mechanism is added. It's just here
-	 * for now because we can't have an empty enum.
-	 */
-	PROCSIGNAL_BARRIER_PLACEHOLDER = 0
+	PROCSIGNAL_BARRIER_CHECKSUM = 0
 } ProcSignalBarrierType;
 
 /*
diff --git a/src/test/Makefile b/src/test/Makefile
index efb206aa75..6469ac94a4 100644
--- a/src/test/Makefile
+++ b/src/test/Makefile
@@ -12,7 +12,8 @@ subdir = src/test
 top_builddir = ../..
 include $(top_builddir)/src/Makefile.global
 
-SUBDIRS = perl regress isolation modules authentication recovery subscription
+SUBDIRS = perl regress isolation modules authentication recovery subscription \
+			checksum
 
 # Test suites that are not safe by default but can be run if selected
 # by the user via the whitespace-separated list in variable
diff --git a/src/test/checksum/.gitignore b/src/test/checksum/.gitignore
new file mode 100644
index 0000000000..871e943d50
--- /dev/null
+++ b/src/test/checksum/.gitignore
@@ -0,0 +1,2 @@
+# Generated by test suite
+/tmp_check/
diff --git a/src/test/checksum/Makefile b/src/test/checksum/Makefile
new file mode 100644
index 0000000000..22a3b64dd8
--- /dev/null
+++ b/src/test/checksum/Makefile
@@ -0,0 +1,24 @@
+#-------------------------------------------------------------------------
+#
+# Makefile for src/test/checksum
+#
+# Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+# Portions Copyright (c) 1994, Regents of the University of California
+#
+# src/test/checksum/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/test/checksum
+top_builddir = ../../..
+include $(top_builddir)/src/Makefile.global
+
+check:
+	$(prove_check)
+
+installcheck:
+	$(prove_installcheck)
+
+clean distclean maintainer-clean:
+	rm -rf tmp_check
+
diff --git a/src/test/checksum/README b/src/test/checksum/README
new file mode 100644
index 0000000000..e3fbd2bdb5
--- /dev/null
+++ b/src/test/checksum/README
@@ -0,0 +1,22 @@
+src/test/checksum/README
+
+Regression tests for data checksums
+===================================
+
+This directory contains a test suite for enabling data checksums
+in a running cluster with streaming replication.
+
+Running the tests
+=================
+
+    make check
+
+or
+
+    make installcheck
+
+NOTE: This creates a temporary installation (in the case of "check"),
+with multiple nodes, be they master or standby(s) for the purpose of
+the tests.
+
+NOTE: This requires the --enable-tap-tests argument to configure.
diff --git a/src/test/checksum/t/001_standby_checksum.pl b/src/test/checksum/t/001_standby_checksum.pl
new file mode 100644
index 0000000000..891743fa6c
--- /dev/null
+++ b/src/test/checksum/t/001_standby_checksum.pl
@@ -0,0 +1,104 @@
+# Test suite for testing enabling data checksums with streaming replication
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 10;
+
+my $MAX_TRIES = 30;
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1);
+$node_master->start;
+my $backup_name = 'my_backup';
+
+# Take backup
+$node_master->backup($backup_name);
+
+# Create streaming standby linking to master
+my $node_standby_1 = get_new_node('standby_1');
+$node_standby_1->init_from_backup($node_master, $backup_name,
+	has_streaming => 1);
+$node_standby_1->start;
+
+# Create some content on master to have un-checksummed data in the cluster
+$node_master->safe_psql('postgres',
+	"CREATE TABLE t AS SELECT generate_series(1,10000) AS a;");
+
+# Wait for standbys to catch up
+$node_master->wait_for_catchup($node_standby_1, 'replay',
+	$node_master->lsn('insert'));
+
+# Check that checksums are turned off
+my $result = $node_master->safe_psql('postgres',
+	"SELECT setting FROM pg_catalog.pg_settings WHERE name = 'data_checksums';");
+is($result, "off", 'ensure checksums are turned off on master');
+
+$result = $node_standby_1->safe_psql('postgres',
+	"SELECT setting FROM pg_catalog.pg_settings WHERE name = 'data_checksums';");
+is($result, "off", 'ensure checksums are turned off on standby_1');
+
+# Enable checksums for the cluster
+$node_master->safe_psql('postgres', "SELECT pg_enable_data_checksums();");
+
+# Ensure that the master has switched to inprogress immediately
+$result = $node_master->safe_psql('postgres',
+	"SELECT setting FROM pg_catalog.pg_settings WHERE name = 'data_checksums';");
+is($result, "inprogress", 'ensure checksums are in progress on master');
+
+# Wait for checksum enable to be replayed
+$node_master->wait_for_catchup($node_standby_1, 'replay');
+
+# Ensure that the standby has switched to inprogress or on
+# Normally it would be "inprogress", but it is theoretically possible for the master
+# to complete the checksum enabling *and* have the standby replay that record before
+# we reach the check below.
+$result = $node_standby_1->safe_psql('postgres',
+	"SELECT setting FROM pg_catalog.pg_settings WHERE name = 'data_checksums';");
+cmp_ok($result, '~~', ["inprogress", "on"], 'ensure checksums are on or in progress on standby_1');
+
+# Insert some more data which should be checksummed on INSERT
+$node_master->safe_psql('postgres',
+	"INSERT INTO t VALUES (generate_series(1,10000));");
+
+# Wait for checksums enabled on the master
+for (my $i = 0; $i < $MAX_TRIES; $i++)
+{
+	$result = $node_master->safe_psql('postgres',
+		"SELECT setting FROM pg_catalog.pg_settings WHERE name = 'data_checksums';");
+	last if ($result eq 'on');
+	sleep(1);
+}
+is ($result, "on", 'ensure checksums are enabled on master');
+
+# Wait for checksums enabled on the standby
+for (my $i = 0; $i < $MAX_TRIES; $i++)
+{
+	$result = $node_standby_1->safe_psql('postgres',
+		"SELECT setting FROM pg_catalog.pg_settings WHERE name = 'data_checksums';");
+	last if ($result eq 'on');
+	sleep(1);
+}
+is ($result, "on", 'ensure checksums are enabled on standby');
+
+$result = $node_master->safe_psql('postgres', "SELECT count(a) FROM t");
+is ($result, "20000", 'ensure we can safely read all data with checksums');
+
+# Disable checksums and ensure it's propagated to standby and that we can
+# still read all data
+$node_master->safe_psql('postgres', "SELECT pg_disable_data_checksums();");
+$result = $node_master->safe_psql('postgres',
+	"SELECT setting FROM pg_catalog.pg_settings WHERE name = 'data_checksums';");
+is($result, "off", 'ensure checksums are off on master');
+
+# Wait for checksum disable to be replayed
+$node_master->wait_for_catchup($node_standby_1, 'replay');
+
+# Ensure that the standby has switched to off
+$result = $node_standby_1->safe_psql('postgres',
+	"SELECT setting FROM pg_catalog.pg_settings WHERE name = 'data_checksums';");
+is($result, "off", 'ensure checksums are off on standby_1');
+
+$result = $node_master->safe_psql('postgres', "SELECT count(a) FROM t");
+is ($result, "20000", 'ensure we can safely read all data without checksums');
-- 
2.21.0 (Apple Git-122.2)

