diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index 85e6515ac2..9328830b2e 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -858,6 +858,11 @@ RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
 	 */
 	WaitForCommands(AH, pipefd);
 
+	/*
+	 * Commit any BLOB batch transaction that is still open.
+	 */
+	CommitBlobTransaction((Archive *)AH);
+
 	/*
 	 * Disconnect from database and clean up.
 	 */
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index 9ef2f2017e..65519791e9 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -223,6 +223,8 @@ typedef struct Archive
 	int			numWorkers;		/* number of parallel processes */
 	char	   *sync_snapshot_id;	/* sync snapshot id for parallel operation */
 
+	int			blobBatchSize;	/* # of blobs to restore per transaction */
+
 	/* info needed for string escaping */
 	int			encoding;		/* libpq code for client_encoding */
 	bool		std_strings;	/* standard_conforming_strings */
@@ -293,6 +295,7 @@ extern void WriteData(Archive *AHX, const void *data, size_t dLen);
 extern int	StartLO(Archive *AHX, Oid oid);
 extern int	EndLO(Archive *AHX, Oid oid);
 
+extern void CommitBlobTransaction(Archive *AHX);
 extern void CloseArchive(Archive *AHX);
 
 extern void SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt);
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 256d1e35a4..43be945064 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -45,6 +45,7 @@
 #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
 #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
 
+static int		blobBatchCount = 0; /* # of BLOBs in the open batch txn */
 
 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
 							   const pg_compress_specification compression_spec,
@@ -258,6 +259,23 @@ CloseArchive(Archive *AHX)
 		pg_fatal("could not close output file: %m");
 }
 
+/* Public */
+void
+CommitBlobTransaction(Archive *AHX)
+{
+	ArchiveHandle *AH = (ArchiveHandle *) AHX;
+
+	if (blobBatchCount > 0)
+	{
+		ahprintf(AH, "--\n");
+		ahprintf(AH, "-- End BLOB restore batch\n");
+		ahprintf(AH, "--\n");
+		ahprintf(AH, "COMMIT;\n\n");
+
+		blobBatchCount = 0;
+	}
+}
+
 /* Public */
 void
 SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
@@ -719,6 +737,8 @@ RestoreArchive(Archive *AHX)
 			ahprintf(AH, "COMMIT;\n\n");
 	}
 
+	CommitBlobTransaction(AHX);
+
 	if (AH->public.verbose)
 		dumpTimestamp(AH, "Completed on", time(NULL));
 
@@ -3543,6 +3563,66 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
 {
 	RestoreOptions *ropt = AH->public.ropt;
 
+	/*
+	 * We restore BLOBs in batches to reduce XID consumption.  Skip the
+	 * batching in --single-transaction mode: everything already runs in one
+	 * transaction there, and an extra BEGIN/COMMIT would be an error.
+	 *
+	 * NOTE(review): blobBatchCount is file-static shared state; confirm it
+	 * is safe when parallel workers are threads (Windows) rather than
+	 * forked processes.
+	 */
+	if (strcmp(te->desc, "BLOB") == 0 && AH->public.blobBatchSize > 1 &&
+		!ropt->single_txn)
+	{
+		if (blobBatchCount > 0)
+		{
+			/* We are inside a BLOB restore transaction */
+			if (blobBatchCount >= AH->public.blobBatchSize)
+			{
+				/*
+				 * We did reach the batch size with the previous BLOB.
+				 * Commit and start a new batch.
+				 */
+				ahprintf(AH, "--\n");
+				ahprintf(AH, "-- BLOB batch size reached\n");
+				ahprintf(AH, "--\n");
+				ahprintf(AH, "COMMIT;\n");
+				ahprintf(AH, "BEGIN;\n\n");
+
+				blobBatchCount = 1;
+			}
+			else
+			{
+				/* This one still fits into the current batch */
+				blobBatchCount++;
+			}
+		}
+		else
+		{
+			/* Not inside a transaction, start a new batch */
+			ahprintf(AH, "--\n");
+			ahprintf(AH, "-- Start BLOB restore batch\n");
+			ahprintf(AH, "--\n");
+			ahprintf(AH, "BEGIN;\n\n");
+
+			blobBatchCount = 1;
+		}
+	}
+	else
+	{
+		/* Not a BLOB. If we have a BLOB batch open, close it. */
+		if (blobBatchCount > 0)
+		{
+			ahprintf(AH, "--\n");
+			ahprintf(AH, "-- End BLOB restore batch\n");
+			ahprintf(AH, "--\n");
+			ahprintf(AH, "COMMIT;\n\n");
+
+			blobBatchCount = 0;
+		}
+	}
+
 	/* Select owner, schema, tablespace and default AM as necessary */
 	_becomeOwner(AH, te);
 	_selectOutputSchema(AH, te->namespace);
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index e863913849..2c6d49732b 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -205,11 +205,20 @@ static inline void dumpComment(Archive *fout, const char *type,
 							   const char *name, const char *namespace,
 							   const char *owner, CatalogId catalogId,
 							   int subid, DumpId dumpId);
+static bool dumpCommentQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag,
+							 const char *type, const char *name,
+							 const char *namespace, const char *owner,
+							 CatalogId catalogId, int subid, DumpId dumpId,
+							 const char *initdb_comment);
 static int	findComments(Oid classoid, Oid objoid, CommentItem **items);
 static void collectComments(Archive *fout);
 static void dumpSecLabel(Archive *fout, const char *type, const char *name,
 						 const char *namespace, const char *owner,
 						 CatalogId catalogId, int subid, DumpId dumpId);
+static bool dumpSecLabelQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag,
+							  const char *type, const char *name,
+							  const char *namespace, const char *owner,
+							  CatalogId catalogId, int subid, DumpId dumpId);
 static int	findSecLabels(Oid classoid, Oid objoid, SecLabelItem **items);
 static void collectSecLabels(Archive *fout);
 static void dumpDumpableObject(Archive *fout, DumpableObject *dobj);
@@ -265,6 +274,12 @@ static DumpId dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
 					  const char *type, const char *name, const char *subname,
 					  const char *nspname, const char *owner,
 					  const DumpableAcl *dacl);
+static bool dumpACLQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag,
+						 DumpId objDumpId, DumpId altDumpId,
+						 const char *type, const char *name,
+						 const char *subname,
+						 const char *nspname, const char *owner,
+						 const DumpableAcl *dacl);
 
 static void getDependencies(Archive *fout);
 static void BuildArchiveDependencies(Archive *fout);
@@ -3641,10 +3656,42 @@ dumpLO(Archive *fout, const LoInfo *loinfo)
 {
 	PQExpBuffer cquery = createPQExpBuffer();
 	PQExpBuffer dquery = createPQExpBuffer();
+	PQExpBuffer tag = createPQExpBuffer();
+	teSection	section = SECTION_PRE_DATA;
 
 	appendPQExpBuffer(cquery,
 					  "SELECT pg_catalog.lo_create('%s');\n",
 					  loinfo->dobj.name);
+	/*
+	 * In binary upgrade mode we put all the queries to restore
+	 * one large object into a single TOC entry and emit it as
+	 * SECTION_DATA so that they can be restored in parallel.
+	 */
+	if (fout->dopt->binary_upgrade)
+	{
+		section = SECTION_DATA;
+
+		/* Dump comment if any */
+		if (loinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
+			dumpCommentQuery(fout, cquery, tag, "LARGE OBJECT",
+							 loinfo->dobj.name, NULL, loinfo->rolname,
+							 loinfo->dobj.catId, 0, loinfo->dobj.dumpId, NULL);
+
+		/* Dump security label if any */
+		if (loinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
+			dumpSecLabelQuery(fout, cquery, tag, "LARGE OBJECT",
+							  loinfo->dobj.name,
+							  NULL, loinfo->rolname,
+							  loinfo->dobj.catId, 0, loinfo->dobj.dumpId);
+
+		/* Dump ACL if any */
+		if (loinfo->dobj.dump & DUMP_COMPONENT_ACL)
+			dumpACLQuery(fout, cquery, tag,
+						 loinfo->dobj.dumpId, InvalidDumpId, "LARGE OBJECT",
+						 loinfo->dobj.name, NULL,
+						 NULL, loinfo->rolname, &loinfo->dacl);
+	}
+	destroyPQExpBuffer(tag);	/* scratch only; the TOC tag is dobj.name */
 
 	appendPQExpBuffer(dquery,
 					  "SELECT pg_catalog.lo_unlink('%s');\n",
@@ -3655,27 +3702,28 @@ dumpLO(Archive *fout, const LoInfo *loinfo)
 					 ARCHIVE_OPTS(.tag = loinfo->dobj.name,
 								  .owner = loinfo->rolname,
 								  .description = "BLOB",
-								  .section = SECTION_PRE_DATA,
+								  .section = section,
 								  .createStmt = cquery->data,
 								  .dropStmt = dquery->data));
 
-	/* Dump comment if any */
-	if (loinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
-		dumpComment(fout, "LARGE OBJECT", loinfo->dobj.name,
-					NULL, loinfo->rolname,
-					loinfo->dobj.catId, 0, loinfo->dobj.dumpId);
-
-	/* Dump security label if any */
-	if (loinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
-		dumpSecLabel(fout, "LARGE OBJECT", loinfo->dobj.name,
-					 NULL, loinfo->rolname,
-					 loinfo->dobj.catId, 0, loinfo->dobj.dumpId);
-
-	/* Dump ACL if any */
-	if (loinfo->dobj.dump & DUMP_COMPONENT_ACL)
-		dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId, "LARGE OBJECT",
-				loinfo->dobj.name, NULL,
-				NULL, loinfo->rolname, &loinfo->dacl);
+	if (!fout->dopt->binary_upgrade)
+	{
+		/* Dump comment if any */
+		if (loinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
+			dumpComment(fout, "LARGE OBJECT", loinfo->dobj.name,
+						NULL, loinfo->rolname,
+						loinfo->dobj.catId, 0, loinfo->dobj.dumpId);
+		/* Dump security label if any */
+		if (loinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
+			dumpSecLabel(fout, "LARGE OBJECT", loinfo->dobj.name,
+						 NULL, loinfo->rolname,
+						 loinfo->dobj.catId, 0, loinfo->dobj.dumpId);
+		/* Dump ACL if any */
+		if (loinfo->dobj.dump & DUMP_COMPONENT_ACL)
+			dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId, "LARGE OBJECT",
+					loinfo->dobj.name, NULL,
+					NULL, loinfo->rolname, &loinfo->dacl);
+	}
 
 	destroyPQExpBuffer(cquery);
 	destroyPQExpBuffer(dquery);
@@ -9899,6 +9947,38 @@ dumpCommentExtended(Archive *fout, const char *type,
 					const char *owner, CatalogId catalogId,
 					int subid, DumpId dumpId,
 					const char *initdb_comment)
+{
+	PQExpBuffer query = createPQExpBuffer();
+	PQExpBuffer tag = createPQExpBuffer();
+
+	if (dumpCommentQuery(fout, query, tag, type, name, namespace, owner,
+						 catalogId, subid, dumpId, initdb_comment))
+	{
+		/*
+		 * We mark comments as SECTION_NONE because they really belong in the
+		 * same section as their parent, whether that is pre-data or
+		 * post-data.
+		 */
+		ArchiveEntry(fout, nilCatalogId, createDumpId(),
+					 ARCHIVE_OPTS(.tag = tag->data,
+								  .namespace = namespace,
+								  .owner = owner,
+								  .description = "COMMENT",
+								  .section = SECTION_NONE,
+								  .createStmt = query->data,
+								  .deps = &dumpId,
+								  .nDeps = 1));
+	}
+	destroyPQExpBuffer(query);
+	destroyPQExpBuffer(tag);
+}
+
+static bool
+dumpCommentQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag,
+				 const char *type, const char *name,
+				 const char *namespace, const char *owner,
+				 CatalogId catalogId, int subid, DumpId dumpId,
+				 const char *initdb_comment)
 {
 	DumpOptions *dopt = fout->dopt;
 	CommentItem *comments;
@@ -9906,19 +9986,19 @@ dumpCommentExtended(Archive *fout, const char *type,
 
 	/* do nothing, if --no-comments is supplied */
 	if (dopt->no_comments)
-		return;
+		return false;
 
 	/* Comments are schema not data ... except LO comments are data */
 	if (strcmp(type, "LARGE OBJECT") != 0)
 	{
 		if (dopt->dataOnly)
-			return;
+			return false;
 	}
 	else
 	{
 		/* We do dump LO comments in binary-upgrade mode */
 		if (dopt->schemaOnly && !dopt->binary_upgrade)
-			return;
+			return false;
 	}
 
 	/* Search for comments associated with catalogId, using table */
@@ -9956,9 +10036,6 @@ dumpCommentExtended(Archive *fout, const char *type,
 	/* If a comment exists, build COMMENT ON statement */
 	if (ncomments > 0)
 	{
-		PQExpBuffer query = createPQExpBuffer();
-		PQExpBuffer tag = createPQExpBuffer();
-
 		appendPQExpBuffer(query, "COMMENT ON %s ", type);
 		if (namespace && *namespace)
 			appendPQExpBuffer(query, "%s.", fmtId(namespace));
@@ -9968,24 +10045,10 @@ dumpCommentExtended(Archive *fout, const char *type,
 
 		appendPQExpBuffer(tag, "%s %s", type, name);
 
-		/*
-		 * We mark comments as SECTION_NONE because they really belong in the
-		 * same section as their parent, whether that is pre-data or
-		 * post-data.
-		 */
-		ArchiveEntry(fout, nilCatalogId, createDumpId(),
-					 ARCHIVE_OPTS(.tag = tag->data,
-								  .namespace = namespace,
-								  .owner = owner,
-								  .description = "COMMENT",
-								  .section = SECTION_NONE,
-								  .createStmt = query->data,
-								  .deps = &dumpId,
-								  .nDeps = 1));
-
-		destroyPQExpBuffer(query);
-		destroyPQExpBuffer(tag);
+		return true;
 	}
+
+	return false;
 }
 
 /*
@@ -14939,23 +15002,60 @@ dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
 		const DumpableAcl *dacl)
 {
 	DumpId		aclDumpId = InvalidDumpId;
+	PQExpBuffer query = createPQExpBuffer();
+	PQExpBuffer tag = createPQExpBuffer();
+
+	if (dumpACLQuery(fout, query, tag, objDumpId, altDumpId,
+					 type, name, subname, nspname, owner, dacl))
+	{
+		DumpId		aclDeps[2];
+		int			nDeps = 0;
+
+		/* dumpACLQuery has already filled in "tag" */
+		aclDeps[nDeps++] = objDumpId;
+		if (altDumpId != InvalidDumpId)
+			aclDeps[nDeps++] = altDumpId;
+
+		aclDumpId = createDumpId();
+
+		ArchiveEntry(fout, nilCatalogId, aclDumpId,
+					 ARCHIVE_OPTS(.tag = tag->data,
+								  .namespace = nspname,
+								  .owner = owner,
+								  .description = "ACL",
+								  .section = SECTION_NONE,
+								  .createStmt = query->data,
+								  .deps = aclDeps,
+								  .nDeps = nDeps));
+	}
+
+	destroyPQExpBuffer(query);
+	destroyPQExpBuffer(tag);
+
+	return aclDumpId;
+}
+
+static bool
+dumpACLQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag,
+			 DumpId objDumpId, DumpId altDumpId,
+			 const char *type, const char *name, const char *subname,
+			 const char *nspname, const char *owner,
+			 const DumpableAcl *dacl)
+{
 	DumpOptions *dopt = fout->dopt;
 	const char *acls = dacl->acl;
 	const char *acldefault = dacl->acldefault;
 	char		privtype = dacl->privtype;
 	const char *initprivs = dacl->initprivs;
 	const char *baseacls;
-	PQExpBuffer sql;
 
 	/* Do nothing if ACL dump is not enabled */
 	if (dopt->aclsSkip)
-		return InvalidDumpId;
+		return false;
 
 	/* --data-only skips ACLs *except* large object ACLs */
 	if (dopt->dataOnly && strcmp(type, "LARGE OBJECT") != 0)
-		return InvalidDumpId;
-
-	sql = createPQExpBuffer();
+		return false;
 
 	/*
 	 * In binary upgrade mode, we don't run an extension's script but instead
@@ -14973,13 +15078,13 @@ dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
 	if (dopt->binary_upgrade && privtype == 'e' &&
 		initprivs && *initprivs != '\0')
 	{
-		appendPQExpBufferStr(sql, "SELECT pg_catalog.binary_upgrade_set_record_init_privs(true);\n");
+		appendPQExpBufferStr(query, "SELECT pg_catalog.binary_upgrade_set_record_init_privs(true);\n");
 		if (!buildACLCommands(name, subname, nspname, type,
 							  initprivs, acldefault, owner,
-							  "", fout->remoteVersion, sql))
+							  "", fout->remoteVersion, query))
 			pg_fatal("could not parse initial ACL list (%s) or default (%s) for object \"%s\" (%s)",
 					 initprivs, acldefault, name, type);
-		appendPQExpBufferStr(sql, "SELECT pg_catalog.binary_upgrade_set_record_init_privs(false);\n");
+		appendPQExpBufferStr(query, "SELECT pg_catalog.binary_upgrade_set_record_init_privs(false);\n");
 	}
 
 	/*
@@ -15001,43 +15106,19 @@ dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
 
 	if (!buildACLCommands(name, subname, nspname, type,
 						  acls, baseacls, owner,
-						  "", fout->remoteVersion, sql))
+						  "", fout->remoteVersion, query))
 		pg_fatal("could not parse ACL list (%s) or default (%s) for object \"%s\" (%s)",
 				 acls, baseacls, name, type);
 
-	if (sql->len > 0)
+	if (query->len > 0 && tag != NULL)
 	{
-		PQExpBuffer tag = createPQExpBuffer();
-		DumpId		aclDeps[2];
-		int			nDeps = 0;
-
 		if (subname)
 			appendPQExpBuffer(tag, "COLUMN %s.%s", name, subname);
 		else
 			appendPQExpBuffer(tag, "%s %s", type, name);
-
-		aclDeps[nDeps++] = objDumpId;
-		if (altDumpId != InvalidDumpId)
-			aclDeps[nDeps++] = altDumpId;
-
-		aclDumpId = createDumpId();
-
-		ArchiveEntry(fout, nilCatalogId, aclDumpId,
-					 ARCHIVE_OPTS(.tag = tag->data,
-								  .namespace = nspname,
-								  .owner = owner,
-								  .description = "ACL",
-								  .section = SECTION_NONE,
-								  .createStmt = sql->data,
-								  .deps = aclDeps,
-								  .nDeps = nDeps));
-
-		destroyPQExpBuffer(tag);
 	}
 
-	destroyPQExpBuffer(sql);
-
-	return aclDumpId;
+	return query->len > 0;
 }
 
 /*
@@ -15062,16 +15143,42 @@ static void
 dumpSecLabel(Archive *fout, const char *type, const char *name,
 			 const char *namespace, const char *owner,
 			 CatalogId catalogId, int subid, DumpId dumpId)
+{
+	PQExpBuffer query = createPQExpBuffer();
+	PQExpBuffer tag = createPQExpBuffer();
+
+	if (dumpSecLabelQuery(fout, query, tag, type, name,
+						  namespace, owner, catalogId, subid, dumpId))
+	{
+		ArchiveEntry(fout, nilCatalogId, createDumpId(),
+					 ARCHIVE_OPTS(.tag = tag->data,
+								  .namespace = namespace,
+								  .owner = owner,
+								  .description = "SECURITY LABEL",
+								  .section = SECTION_NONE,
+								  .createStmt = query->data,
+								  .deps = &dumpId,
+								  .nDeps = 1));
+	}
+
+	destroyPQExpBuffer(query);
+	destroyPQExpBuffer(tag);
+}
+
+static bool
+dumpSecLabelQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag,
+				  const char *type, const char *name,
+				  const char *namespace, const char *owner,
+				  CatalogId catalogId, int subid, DumpId dumpId)
 {
 	DumpOptions *dopt = fout->dopt;
 	SecLabelItem *labels;
 	int			nlabels;
 	int			i;
-	PQExpBuffer query;
 
 	/* do nothing, if --no-security-labels is supplied */
 	if (dopt->no_security_labels)
-		return;
+		return false;
 
 	/*
 	 * Security labels are schema not data ... except large object labels are
@@ -15080,20 +15187,18 @@ dumpSecLabel(Archive *fout, const char *type, const char *name,
 	if (strcmp(type, "LARGE OBJECT") != 0)
 	{
 		if (dopt->dataOnly)
-			return;
+			return false;
 	}
 	else
 	{
 		/* We do dump large object security labels in binary-upgrade mode */
 		if (dopt->schemaOnly && !dopt->binary_upgrade)
-			return;
+			return false;
 	}
 
 	/* Search for security labels associated with catalogId, using table */
 	nlabels = findSecLabels(catalogId.tableoid, catalogId.oid, &labels);
 
-	query = createPQExpBuffer();
-
 	for (i = 0; i < nlabels; i++)
 	{
 		/*
@@ -15114,22 +15219,11 @@ dumpSecLabel(Archive *fout, const char *type, const char *name,
 
 	if (query->len > 0)
 	{
-		PQExpBuffer tag = createPQExpBuffer();
-
 		appendPQExpBuffer(tag, "%s %s", type, name);
-		ArchiveEntry(fout, nilCatalogId, createDumpId(),
-					 ARCHIVE_OPTS(.tag = tag->data,
-								  .namespace = namespace,
-								  .owner = owner,
-								  .description = "SECURITY LABEL",
-								  .section = SECTION_NONE,
-								  .createStmt = query->data,
-								  .deps = &dumpId,
-								  .nDeps = 1));
-		destroyPQExpBuffer(tag);
+		return true;
 	}
 
-	destroyPQExpBuffer(query);
+	return false;
 }
 
 /*
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index 049a100634..2159f72ffb 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -60,6 +60,7 @@ main(int argc, char **argv)
 	int			c;
 	int			exit_code;
 	int			numWorkers = 1;
+	int			blobBatchSize = 0;
 	Archive    *AH;
 	char	   *inputFileSpec;
 	static int	disable_triggers = 0;
@@ -123,6 +124,7 @@ main(int argc, char **argv)
 		{"no-publications", no_argument, &no_publications, 1},
 		{"no-security-labels", no_argument, &no_security_labels, 1},
 		{"no-subscriptions", no_argument, &no_subscriptions, 1},
+		{"restore-blob-batch-size", required_argument, NULL, 4},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -286,6 +288,10 @@ main(int argc, char **argv)
 				set_dump_section(optarg, &(opts->dumpSections));
 				break;
 
+			case 4:				/* # of blobs to restore per transaction */
+				blobBatchSize = atoi(optarg);
+				break;
+
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -405,6 +411,7 @@ main(int argc, char **argv)
 		SortTocFromFile(AH);
 
 	AH->numWorkers = numWorkers;
+	AH->blobBatchSize = blobBatchSize;
 
 	if (opts->tocSummary)
 		PrintTOCSummary(AH);
@@ -478,6 +485,8 @@ usage(const char *progname)
 	printf(_("  --use-set-session-authorization\n"
 			 "                               use SET SESSION AUTHORIZATION commands instead of\n"
 			 "                               ALTER OWNER commands to set ownership\n"));
+	printf(_("  --restore-blob-batch-size=NUM\n"
+			 "                               attempt to restore NUM large objects per transaction\n"));
 
 	printf(_("\nConnection options:\n"));
 	printf(_("  -h, --host=HOSTNAME      database server host or socket directory\n"));
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index fa52aa2c22..459d834ac3 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -84,7 +84,7 @@ output_check_banner(bool live_check)
 
 
 void
-check_and_dump_old_cluster(bool live_check)
+check_and_dump_old_cluster(bool live_check, DbDumpStats **stats)
 {
 	/* -- OLD -- */
 
@@ -202,12 +202,46 @@
 	 * the old server is running.
 	 */
 	if (!user_opts.check)
+	{
+		*stats = collect_db_stats();
 		generate_old_dump();
+	}
 
 	if (!live_check)
 		stop_postmaster(false);
 }
 
+/*
+ * collect_db_stats()
+ *
+ * Gather per-database statistics from the old cluster that let pg_upgrade
+ * choose an efficient restore strategy.  Currently this is only the number
+ * of large objects in each database; the returned array is indexed in the
+ * same order as old_cluster.dbarr.dbs.  Caller keeps ownership.
+ */
+DbDumpStats *
+collect_db_stats(void)
+{
+	int			dbnum;
+	DbDumpStats *stats = (DbDumpStats *) pg_malloc(sizeof(DbDumpStats));
+
+	stats->large_objects = (uint64 *) pg_malloc(old_cluster.dbarr.ndbs * sizeof(uint64));
+	prep_status("Collecting database stats");
+	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+	{
+		PGresult   *res;
+		DbInfo	   *active_db = &old_cluster.dbarr.dbs[dbnum];
+		PGconn	   *conn = connectToServer(&old_cluster, active_db->db_name);
+
+		res = executeQueryOrDie(conn, "SELECT count(*) from pg_largeobject_metadata");
+		stats->large_objects[dbnum] = atoll(PQgetvalue(res, 0, 0));
+		PQclear(res);
+		PQfinish(conn);
+	}
+	check_ok();
+
+	return stats;
+}
 
 void
 check_new_cluster(void)
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 3960af4036..12605200b5 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -54,7 +54,8 @@
 static void set_locale_and_encoding(void);
 static void prepare_new_cluster(void);
 static void prepare_new_globals(void);
-static void create_new_objects(void);
+static void restore_dbs(DbDumpStats *stats, bool parallel_restore);
+static void create_new_objects(DbDumpStats *stats);
 static void copy_xact_xlog_xid(void);
 static void set_frozenxids(bool minmxid_only);
 static void make_outputdirs(char *pgdata);
@@ -82,6 +83,7 @@ main(int argc, char **argv)
 {
 	char	   *deletion_script_file_name = NULL;
 	bool		live_check = false;
+	DbDumpStats *stats;
 
 	/*
 	 * pg_upgrade doesn't currently use common/logging.c, but initialize it
@@ -127,7 +129,7 @@ main(int argc, char **argv)
 
 	check_cluster_compatibility(live_check);
 
-	check_and_dump_old_cluster(live_check);
+	check_and_dump_old_cluster(live_check, &stats);
 
 
 	/* -- NEW -- */
@@ -160,7 +162,7 @@ main(int argc, char **argv)
 
 	prepare_new_globals();
 
-	create_new_objects();
+	create_new_objects(stats);
 
 	stop_postmaster(false);
 
@@ -508,9 +510,72 @@ prepare_new_globals(void)
 	check_ok();
 }
 
+static void
+restore_dbs(DbDumpStats *stats, bool parallel_restore)
+{
+	int			dbnum;
+
+	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+	{
+		char		sql_file_name[MAXPGPATH],
+					log_file_name[MAXPGPATH];
+		DbInfo	   *old_db = &old_cluster.dbarr.dbs[dbnum];
+		const char *create_opts;
+		int			jobs = user_opts.jobs ? user_opts.jobs : 1;
+		bool		threshold_breached = stats && stats->large_objects[dbnum] > LARGE_OBJECTS_THRESOLD;
+
+		/* Skip template1 in this pass */
+		if (strcmp(old_db->db_name, "template1") == 0)
+			continue;
+		/* Each pass restores only its own subset; skip the other one's */
+		if (threshold_breached == parallel_restore)
+			continue;
+
+		pg_log(PG_STATUS, "%s", old_db->db_name);
+		snprintf(sql_file_name, sizeof(sql_file_name), DB_DUMP_FILE_MASK, old_db->db_oid);
+		snprintf(log_file_name, sizeof(log_file_name), DB_DUMP_LOG_FILE_MASK, old_db->db_oid);
+
+		/*
+		 * postgres database will already exist in the target installation, so
+		 * tell pg_restore to drop and recreate it; otherwise we would fail to
+		 * propagate its database-level properties.
+		 */
+		if (strcmp(old_db->db_name, "postgres") == 0)
+			create_opts = "--clean --create";
+		else
+			create_opts = "--create";
+
+		if (parallel_restore)
+			parallel_exec_prog(log_file_name,
+							   NULL,
+							   "\"%s/pg_restore\" %s %s --exit-on-error --verbose "
+							   "--dbname template1 \"%s/%s\"",
+							   new_cluster.bindir,
+							   cluster_conn_opts(&new_cluster),
+							   create_opts,
+							   log_opts.dumpdir,
+							   sql_file_name);
+		else
+			/* Over-threshold db: batch blobs, parallelize inside pg_restore */
+			exec_prog(log_file_name,
+					  NULL,
+					  true,
+					  true,
+					  "\"%s/pg_restore\" %s %s --exit-on-error --verbose "
+					  "--restore-blob-batch-size %d --jobs %d "
+					  "--dbname template1 \"%s/%s\"",
+					  new_cluster.bindir,
+					  cluster_conn_opts(&new_cluster),
+					  create_opts,
+					  LARGE_OBJECTS_THRESOLD,
+					  jobs,
+					  log_opts.dumpdir,
+					  sql_file_name);
+	}
+}
 
 static void
-create_new_objects(void)
+create_new_objects(DbDumpStats *stats)
 {
 	int			dbnum;
 
@@ -557,43 +622,13 @@ create_new_objects(void)
 
 		break;					/* done once we've processed template1 */
 	}
-
-	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
-	{
-		char		sql_file_name[MAXPGPATH],
-					log_file_name[MAXPGPATH];
-		DbInfo	   *old_db = &old_cluster.dbarr.dbs[dbnum];
-		const char *create_opts;
-
-		/* Skip template1 in this pass */
-		if (strcmp(old_db->db_name, "template1") == 0)
-			continue;
-
-		pg_log(PG_STATUS, "%s", old_db->db_name);
-		snprintf(sql_file_name, sizeof(sql_file_name), DB_DUMP_FILE_MASK, old_db->db_oid);
-		snprintf(log_file_name, sizeof(log_file_name), DB_DUMP_LOG_FILE_MASK, old_db->db_oid);
-
-		/*
-		 * postgres database will already exist in the target installation, so
-		 * tell pg_restore to drop and recreate it; otherwise we would fail to
-		 * propagate its database-level properties.
-		 */
-		if (strcmp(old_db->db_name, "postgres") == 0)
-			create_opts = "--clean --create";
-		else
-			create_opts = "--create";
-
-		parallel_exec_prog(log_file_name,
-						   NULL,
-						   "\"%s/pg_restore\" %s %s --exit-on-error --verbose "
-						   "--dbname template1 \"%s/%s\"",
-						   new_cluster.bindir,
-						   cluster_conn_opts(&new_cluster),
-						   create_opts,
-						   log_opts.dumpdir,
-						   sql_file_name);
-	}
-
+	/* First pass: restore, in parallel, the dbs under the LO threshold */
+	restore_dbs(stats, true);
+	/* reap all children */
+	while (reap_child(true) == true)
+		;
+	/* Second pass: restore remaining dbs serially, with pg_restore --jobs */
+	restore_dbs(stats, false);
 	/* reap all children */
 	while (reap_child(true) == true)
 		;
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index a710f325de..f41063dbc7 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -24,6 +24,8 @@
 
 #define MESSAGE_WIDTH		62
 
+#define LARGE_OBJECTS_THRESOLD 200	/* TODO: tune this cutoff; perhaps 10000 */
+
 #define GET_MAJOR_VERSION(v)	((v) / 100)
 
 /* contains both global db information and CREATE DATABASE commands */
@@ -347,6 +349,15 @@ typedef struct
 	ClusterInfo *running_cluster;
 } OSInfo;
 
+/*
+ * Per-database dump statistics, used by pg_upgrade to choose an efficient
+ * pg_restore strategy.
+ */
+typedef struct
+{
+	uint64	   *large_objects;	/* # of large objects, per old-cluster db */
+} DbDumpStats;
+
 
 /*
  * Global variables
@@ -361,7 +372,7 @@ extern OSInfo os_info;
 /* check.c */
 
 void		output_check_banner(bool live_check);
-void		check_and_dump_old_cluster(bool live_check);
+void		check_and_dump_old_cluster(bool live_check, DbDumpStats **stats);
 void		check_new_cluster(void);
 void		report_clusters_compatible(void);
 void		issue_warnings_and_set_wal_level(void);
@@ -369,6 +380,7 @@ void		output_completion_banner(char *deletion_script_file_name);
 void		check_cluster_versions(void);
 void		check_cluster_compatibility(bool live_check);
 void		create_script_for_old_cluster_deletion(char **deletion_script_file_name);
+DbDumpStats* collect_db_stats(void);
 
 
 /* controldata.c */
