diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index a11f3abc82..12c5c4dbfc 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -13358,7 +13358,7 @@ NULL baz</literallayout>(3 rows)</entry>
       <entry>
        array of the argument type
       </entry>
-      <entry>No</entry>
+      <entry>Yes</entry>
       <entry>input values, including nulls, concatenated into an array</entry>
      </row>
 
@@ -13372,7 +13372,7 @@ NULL baz</literallayout>(3 rows)</entry>
       <entry>
        same as argument data type
       </entry>
-      <entry>No</entry>
+      <entry>Yes</entry>
       <entry>input arrays concatenated into array of one higher dimension
        (inputs must all have same dimensionality,
         and cannot be empty or NULL)</entry>
@@ -13633,7 +13633,7 @@ NULL baz</literallayout>(3 rows)</entry>
       <entry>
        same as argument types
       </entry>
-      <entry>No</entry>
+      <entry>Yes</entry>
       <entry>input values concatenated into a string, separated by delimiter</entry>
      </row>
 
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index ed6b680ed8..c87620632e 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -572,9 +572,27 @@ get_agg_clause_costs_walker(Node *node, get_agg_clause_costs_context *context)
 			 * whether they have serialization/deserialization functions; if
 			 * not, we can't serialize partial-aggregation results.
 			 */
-			else if (aggtranstype == INTERNALOID &&
-					 (!OidIsValid(aggserialfn) || !OidIsValid(aggdeserialfn)))
-				costs->hasNonSerial = true;
+			else if (aggtranstype == INTERNALOID)
+			{
+				if (!OidIsValid(aggserialfn) || !OidIsValid(aggdeserialfn))
+					costs->hasNonSerial = true;
+
+				/*
+				 * array_agg_serialize and array_agg_deserialize make use of
+				 * the aggregate non-byval input type's send and receive
+				 * functions.  There's a chance that the type being aggregated
+				 * has one or both of these functions missing.  In this case
+				 * we must not allow the aggregate's serial and deserial
+				 * functions to be used.  It would be nice not to have special
+				 * case this and instead provide some sort of supporting
+				 * function within the aggregate to do this, but for now, that
+				 * seems like overkill for this one case.
+				 */
+				if ((aggserialfn == F_ARRAY_AGG_SERIALIZE ||
+					 aggdeserialfn == F_ARRAY_AGG_DESERIALIZE) &&
+					 !agg_args_support_sendreceive(aggref))
+					costs->hasNonSerial = true;
+			}
 		}
 
 		/*
diff --git a/src/backend/parser/parse_agg.c b/src/backend/parser/parse_agg.c
index 377a7ed6d0..e0517d02a8 100644
--- a/src/backend/parser/parse_agg.c
+++ b/src/backend/parser/parse_agg.c
@@ -14,6 +14,7 @@
  */
 #include "postgres.h"
 
+#include "access/htup_details.h"
 #include "catalog/pg_aggregate.h"
 #include "catalog/pg_constraint_fn.h"
 #include "catalog/pg_type.h"
@@ -29,7 +30,7 @@
 #include "rewrite/rewriteManip.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
-
+#include "utils/syscache.h"
 
 typedef struct
 {
@@ -1873,6 +1874,40 @@ resolve_aggregate_transtype(Oid aggfuncid,
 	return aggtranstype;
 }
 
+/*
+ * agg_args_support_sendreceive
+ *		Returns true if all non-byval of aggref's arg types have send and
+ *		receive functions.
+ */
+bool
+agg_args_support_sendreceive(Aggref *aggref)
+{
+	ListCell *lc;
+
+	foreach(lc, aggref->args)
+	{
+		HeapTuple	typeTuple;
+		Form_pg_type pt;
+		TargetEntry *tle = (TargetEntry *) lfirst(lc);
+		Oid type = exprType((Node *) tle->expr);
+
+		typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type));
+		if (!HeapTupleIsValid(typeTuple))
+			elog(ERROR, "cache lookup failed for type %u", type);
+
+		pt = (Form_pg_type) GETSTRUCT(typeTuple);
+
+		if (!pt->typbyval &&
+			(!OidIsValid(pt->typsend) || !OidIsValid(pt->typreceive)))
+		{
+			ReleaseSysCache(typeTuple);
+			return false;
+		}
+		ReleaseSysCache(typeTuple);
+	}
+	return true;
+}
+
 /*
  * Create an expression tree for the transition function of an aggregate.
  * This is needed so that polymorphic functions can be used within an
diff --git a/src/backend/utils/adt/array_userfuncs.c b/src/backend/utils/adt/array_userfuncs.c
index cb7a6b6d01..0007a2864c 100644
--- a/src/backend/utils/adt/array_userfuncs.c
+++ b/src/backend/utils/adt/array_userfuncs.c
@@ -13,12 +13,32 @@
 #include "postgres.h"
 
 #include "catalog/pg_type.h"
+#include "libpq/pqformat.h"
 #include "common/int.h"
 #include "utils/array.h"
+#include "utils/datum.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/typcache.h"
 
+/*
+ * SerialIOData
+ *		Used for caching element-type data in array_agg_serialize
+ */
+typedef struct SerialIOData
+{
+	FmgrInfo	typsend;
+} SerialIOData;
+
+/*
+ * DeserialIOData
+ *		Used for caching element-type data in array_agg_deserialize
+ */
+typedef struct DeserialIOData
+{
+	FmgrInfo	typreceive;
+	Oid			typioparam;
+} DeserialIOData;
 
 static Datum array_position_common(FunctionCallInfo fcinfo);
 
@@ -498,6 +518,320 @@ array_agg_transfn(PG_FUNCTION_ARGS)
 	PG_RETURN_POINTER(state);
 }
 
+Datum
+array_agg_combine(PG_FUNCTION_ARGS)
+{
+	ArrayBuildState *state1;
+	ArrayBuildState *state2;
+	MemoryContext agg_context;
+	MemoryContext old_context;
+	int			i;
+
+	if (!AggCheckCallContext(fcinfo, &agg_context))
+		elog(ERROR, "aggregate function called in non-aggregate context");
+
+	state1 = PG_ARGISNULL(0) ? NULL : (ArrayBuildState *) PG_GETARG_POINTER(0);
+	state2 = PG_ARGISNULL(1) ? NULL : (ArrayBuildState *) PG_GETARG_POINTER(1);
+
+	if (state2 == NULL)
+	{
+		/*
+		 * NULL state2 is easy, just return state1, which we know is already
+		 * in the agg_context
+		 */
+		if (state1 == NULL)
+			PG_RETURN_NULL();
+		PG_RETURN_POINTER(state1);
+	}
+
+	if (state1 == NULL)
+	{
+		/* We must copy state2's data into the agg_context */
+		state1 = initArrayResultWithSize(state2->element_type, agg_context,
+										 false, state2->alen);
+
+		old_context = MemoryContextSwitchTo(agg_context);
+
+		for (i = 0; i < state2->nelems; i++)
+		{
+			if (!state2->dnulls[i])
+				state1->dvalues[i] = datumCopy(state2->dvalues[i],
+											   state1->typbyval,
+											   state1->typlen);
+			else
+				state1->dvalues[i] = (Datum) 0;
+		}
+
+		MemoryContextSwitchTo(old_context);
+
+		memcpy(state1->dnulls, state2->dnulls, sizeof(bool) * state2->nelems);
+
+		state1->nelems = state2->nelems;
+
+		PG_RETURN_POINTER(state1);
+	}
+	else if (state2->nelems > 0)
+	{
+		/* We only need to combine the two states if state2 has any elements */
+		int			reqsize = state1->nelems + state2->nelems;
+		MemoryContext oldContext = MemoryContextSwitchTo(state1->mcontext);
+
+		Assert(state1->element_type == state2->element_type);
+
+		/* Enlarge state1 arrays if needed */
+		if (state1->alen < reqsize)
+		{
+			/* Use a power of 2 size rather than allocating just reqsize */
+			while (state1->alen < reqsize)
+				state1->alen *= 2;
+
+			state1->dvalues = (Datum *) repalloc(state1->dvalues,
+												 state1->alen * sizeof(Datum));
+			state1->dnulls = (bool *) repalloc(state1->dnulls,
+											   state1->alen * sizeof(bool));
+		}
+
+		/* Copy in the state2 elements to the end of the state1 arrays */
+		for (i = 0; i < state2->nelems; i++)
+		{
+			if (!state2->dnulls[i])
+				state1->dvalues[i + state1->nelems] =
+					datumCopy(state2->dvalues[i],
+							  state1->typbyval,
+							  state1->typlen);
+			else
+				state1->dvalues[i + state1->nelems] = (Datum) 0;
+		}
+
+		memcpy(&state1->dnulls[state1->nelems], state2->dnulls,
+			   sizeof(bool) * state2->nelems);
+
+		state1->nelems = reqsize;
+
+		MemoryContextSwitchTo(oldContext);
+	}
+
+	PG_RETURN_POINTER(state1);
+}
+
+/*
+ * array_agg_serialize
+ *		Serialize ArrayBuildState into bytea.
+ */
+Datum
+array_agg_serialize(PG_FUNCTION_ARGS)
+{
+	ArrayBuildState *state;
+	StringInfoData buf;
+	bytea	   *result;
+
+	/* cannot be called directly because of internal-type argument */
+	Assert(AggCheckCallContext(fcinfo, NULL));
+
+	state = (ArrayBuildState *) PG_GETARG_POINTER(0);
+
+	pq_begintypsend(&buf);
+
+	/*
+	 * element_type. Putting this first is more convenient in deserialization
+	 */
+	pq_sendint32(&buf, state->element_type);
+
+	/*
+	 * nelems -- send first so we know how large to make the dvalues and
+	 * dnulls array during deserialization.
+	 */
+	pq_sendint64(&buf, state->nelems);
+
+	/* alen can be decided during deserialization */
+
+	/* typlen */
+	pq_sendint16(&buf, state->typlen);
+
+	/* typbyval */
+	pq_sendbyte(&buf, state->typbyval);
+
+	/* typalign */
+	pq_sendbyte(&buf, state->typalign);
+
+	/* dnulls */
+	pq_sendbytes(&buf, (char *) state->dnulls, sizeof(bool) * state->nelems);
+
+	/*
+	 * dvalues.  By agreement with array_agg_deserialize, when the element
+	 * type is byval, we just transmit the Datum array as-is, including any
+	 * null elements.  For by-ref types, we must invoke the element type's
+	 * send function, and we skip null elements (which is why the nulls flags
+	 * must be sent first).
+	 */
+	if (state->typbyval)
+		pq_sendbytes(&buf, (char *) state->dvalues,
+					 sizeof(Datum) * state->nelems);
+	else
+	{
+		SerialIOData *iodata;
+		int			i;
+
+		/* Avoid repeat catalog lookups for typsend function */
+		iodata = (SerialIOData *) fcinfo->flinfo->fn_extra;
+		if (iodata == NULL)
+		{
+			Oid			typsend;
+			bool		typisvarlena;
+
+			iodata = (SerialIOData *)
+				MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
+								   sizeof(SerialIOData));
+			getTypeBinaryOutputInfo(state->element_type, &typsend,
+									&typisvarlena);
+			fmgr_info_cxt(typsend, &iodata->typsend,
+						  fcinfo->flinfo->fn_mcxt);
+			fcinfo->flinfo->fn_extra = (void *) iodata;
+		}
+
+		for (i = 0; i < state->nelems; i++)
+		{
+			bytea	   *outputbytes;
+
+			if (state->dnulls[i])
+				continue;
+			outputbytes = SendFunctionCall(&iodata->typsend,
+										   state->dvalues[i]);
+			pq_sendint32(&buf, VARSIZE(outputbytes) - VARHDRSZ);
+			pq_sendbytes(&buf, VARDATA(outputbytes),
+						 VARSIZE(outputbytes) - VARHDRSZ);
+		}
+	}
+
+	result = pq_endtypsend(&buf);
+
+	PG_RETURN_BYTEA_P(result);
+}
+
+Datum
+array_agg_deserialize(PG_FUNCTION_ARGS)
+{
+	bytea	   *sstate;
+	ArrayBuildState *result;
+	StringInfoData buf;
+	Oid			element_type;
+	int64		nelems;
+	const char *temp;
+
+	if (!AggCheckCallContext(fcinfo, NULL))
+		elog(ERROR, "aggregate function called in non-aggregate context");
+
+	sstate = PG_GETARG_BYTEA_PP(0);
+
+	/*
+	 * Copy the bytea into a StringInfo so that we can "receive" it using the
+	 * standard recv-function infrastructure.
+	 */
+	initStringInfo(&buf);
+	appendBinaryStringInfo(&buf,
+						   VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate));
+
+	/* element_type */
+	element_type = pq_getmsgint(&buf, 4);
+
+	/* nelems */
+	nelems = pq_getmsgint64(&buf);
+
+	/* Create output ArrayBuildState with the needed number of elements */
+	result = initArrayResultWithSize(element_type, CurrentMemoryContext,
+									 false, nelems);
+	result->nelems = nelems;
+
+	/* typlen */
+	result->typlen = pq_getmsgint(&buf, 2);
+
+	/* typbyval */
+	result->typbyval = pq_getmsgbyte(&buf);
+
+	/* typalign */
+	result->typalign = pq_getmsgbyte(&buf);
+
+	/* dnulls */
+	temp = pq_getmsgbytes(&buf, sizeof(bool) * nelems);
+	memcpy(result->dnulls, temp, sizeof(bool) * nelems);
+
+	/* dvalues --- see comment in array_agg_serialize */
+	if (result->typbyval)
+	{
+		temp = pq_getmsgbytes(&buf, sizeof(Datum) * nelems);
+		memcpy(result->dvalues, temp, sizeof(Datum) * nelems);
+	}
+	else
+	{
+		DeserialIOData *iodata;
+		int			i;
+
+		/* Avoid repeat catalog lookups for typreceive function */
+		iodata = (DeserialIOData *) fcinfo->flinfo->fn_extra;
+		if (iodata == NULL)
+		{
+			Oid			typreceive;
+
+			iodata = (DeserialIOData *)
+				MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
+								   sizeof(DeserialIOData));
+			getTypeBinaryInputInfo(element_type, &typreceive,
+								   &iodata->typioparam);
+			fmgr_info_cxt(typreceive, &iodata->typreceive,
+						  fcinfo->flinfo->fn_mcxt);
+			fcinfo->flinfo->fn_extra = (void *) iodata;
+		}
+
+		for (i = 0; i < nelems; i++)
+		{
+			int			itemlen;
+			StringInfoData elem_buf;
+			char		csave;
+
+			if (result->dnulls[i])
+			{
+				result->dvalues[i] = (Datum) 0;
+				continue;
+			}
+
+			itemlen = pq_getmsgint(&buf, 4);
+			if (itemlen < 0 || itemlen > (buf.len - buf.cursor))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
+						 errmsg("insufficient data left in message")));
+
+			/*
+			 * Rather than copying data around, we just set up a phony
+			 * StringInfo pointing to the correct portion of the input buffer.
+			 * We assume we can scribble on the input buffer so as to maintain
+			 * the convention that StringInfos have a trailing null.
+			 */
+			elem_buf.data = &buf.data[buf.cursor];
+			elem_buf.maxlen = itemlen + 1;
+			elem_buf.len = itemlen;
+			elem_buf.cursor = 0;
+
+			buf.cursor += itemlen;
+
+			csave = buf.data[buf.cursor];
+			buf.data[buf.cursor] = '\0';
+
+			/* Now call the element's receiveproc */
+			result->dvalues[i] = ReceiveFunctionCall(&iodata->typreceive,
+													 &elem_buf,
+													 iodata->typioparam,
+													 -1);
+
+			buf.data[buf.cursor] = csave;
+		}
+	}
+
+	pq_getmsgend(&buf);
+	pfree(buf.data);
+
+	PG_RETURN_POINTER(result);
+}
+
 Datum
 array_agg_finalfn(PG_FUNCTION_ARGS)
 {
@@ -577,6 +911,305 @@ array_agg_array_transfn(PG_FUNCTION_ARGS)
 	PG_RETURN_POINTER(state);
 }
 
+Datum
+array_agg_array_combine(PG_FUNCTION_ARGS)
+{
+	ArrayBuildStateArr *state1;
+	ArrayBuildStateArr *state2;
+	MemoryContext agg_context;
+	MemoryContext old_context;
+
+	if (!AggCheckCallContext(fcinfo, &agg_context))
+		elog(ERROR, "aggregate function called in non-aggregate context");
+
+	state1 = PG_ARGISNULL(0) ? NULL : (ArrayBuildStateArr *) PG_GETARG_POINTER(0);
+	state2 = PG_ARGISNULL(1) ? NULL : (ArrayBuildStateArr *) PG_GETARG_POINTER(1);
+
+	if (state2 == NULL)
+	{
+		/*
+		 * NULL state2 is easy, just return state1, which we know is already
+		 * in the agg_context
+		 */
+		if (state1 == NULL)
+			PG_RETURN_NULL();
+		PG_RETURN_POINTER(state1);
+	}
+
+	if (state1 == NULL)
+	{
+		/* We must copy state2's data into the agg_context */
+		old_context = MemoryContextSwitchTo(agg_context);
+
+		state1 = initArrayResultArr(state2->array_type, InvalidOid,
+									agg_context, false);
+
+		state1->abytes = state2->abytes;
+		state1->data = (char *) palloc(state1->abytes);
+
+		if (state2->nullbitmap)
+		{
+			int			size = (state2->aitems + 7) / 8;
+
+			state1->nullbitmap = (bits8 *) palloc(size);
+			memcpy(state1->nullbitmap, state2->nullbitmap, size);
+		}
+
+		memcpy(state1->data, state2->data, state2->nbytes);
+		state1->nbytes = state2->nbytes;
+		state1->aitems = state2->aitems;
+		state1->nitems = state2->nitems;
+		state1->ndims = state2->ndims;
+		memcpy(state1->dims, state2->dims, sizeof(state2->dims));
+		memcpy(state1->lbs, state2->lbs, sizeof(state2->lbs));
+		state1->array_type = state2->array_type;
+		state1->element_type = state2->element_type;
+
+		MemoryContextSwitchTo(old_context);
+
+		PG_RETURN_POINTER(state1);
+	}
+
+	/* We only need to combine the two states if state2 has any items */
+	else if (state2->nitems > 0)
+	{
+		MemoryContext oldContext;
+		int			reqsize = state1->nbytes + state2->nbytes;
+		int			i;
+
+		/*
+		 * Check the states are compatible with each other.  Ensure we use the
+		 * same error messages that are listed in accumArrayResultArr so that
+		 * the same error is shown as would have been if we'd not used the
+		 * combine function for the aggregation.
+		 */
+		if (state1->ndims != state2->ndims)
+			ereport(ERROR,
+					(errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
+					 errmsg("cannot accumulate arrays of different dimensionality")));
+
+		/* Check dimensions match ignoring the first dimension. */
+		for (i = 1; i < state1->ndims; i++)
+		{
+			if (state1->dims[i] != state2->dims[i] || state1->lbs[i] != state2->lbs[i])
+				ereport(ERROR,
+						(errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
+						 errmsg("cannot accumulate arrays of different dimensionality")));
+		}
+
+
+		oldContext = MemoryContextSwitchTo(state1->mcontext);
+
+		/*
+		 * If there's not enough space in state1 then we'll need to reallocate
+		 * more.
+		 */
+		if (state1->abytes < reqsize)
+		{
+			/* use a power of 2 size rather than allocating just reqsize */
+			while (state1->abytes < reqsize)
+				state1->abytes *= 2;
+
+			state1->data = (char *) repalloc(state1->data, state1->abytes);
+		}
+
+		if (state2->nullbitmap)
+		{
+			int			newnitems = state1->nitems + state2->nitems;
+
+			if (state1->nullbitmap == NULL)
+			{
+				/*
+				 * First input with nulls; we must retrospectively handle any
+				 * previous inputs by marking all their items non-null.
+				 */
+				state1->aitems = 256;
+				while (state1->aitems <= newnitems)
+					state1->aitems *= 2;
+				state1->nullbitmap = (bits8 *) palloc((state1->aitems + 7) / 8);
+				array_bitmap_copy(state1->nullbitmap, 0,
+								  NULL, 0,
+								  state1->nitems);
+			}
+			else if (newnitems > state1->aitems)
+			{
+				int			newaitems = state1->aitems + state2->aitems;
+
+				while (state1->aitems < newaitems)
+					state1->aitems *= 2;
+
+				state1->nullbitmap = (bits8 *)
+					repalloc(state1->nullbitmap, (state1->aitems + 7) / 8);
+			}
+			array_bitmap_copy(state1->nullbitmap, state1->nitems,
+							  state2->nullbitmap, 0,
+							  state2->nitems);
+		}
+
+		memcpy(state1->data + state1->nbytes, state2->data, state2->nbytes);
+		state1->nbytes += state2->nbytes;
+		state1->nitems += state2->nitems;
+
+		state1->dims[0] += state2->dims[0];
+		/* remaing dims already match, per test above */
+
+		Assert(state1->array_type == state2->array_type);
+		Assert(state1->element_type = state2->element_type);
+
+		MemoryContextSwitchTo(oldContext);
+	}
+
+	PG_RETURN_POINTER(state1);
+}
+
+/*
+ * array_agg_array_serialize
+ *		Serialize ArrayBuildStateArr into bytea.
+ */
+Datum
+array_agg_array_serialize(PG_FUNCTION_ARGS)
+{
+	ArrayBuildStateArr *state;
+	StringInfoData buf;
+	bytea	   *result;
+
+	/* cannot be called directly because of internal-type argument */
+	Assert(AggCheckCallContext(fcinfo, NULL));
+
+	state = (ArrayBuildStateArr *) PG_GETARG_POINTER(0);
+
+	pq_begintypsend(&buf);
+
+	/*
+	 * element_type. Putting this first is more convenient in deserialization
+	 * so that we can init the new state sooner.
+	 */
+	pq_sendint32(&buf, state->element_type);
+
+	/* array_type */
+	pq_sendint32(&buf, state->array_type);
+
+	/* nbytes */
+	pq_sendint32(&buf, state->nbytes);
+
+	/* data */
+	pq_sendbytes(&buf, state->data, state->nbytes);
+
+	/* abytes */
+	pq_sendint32(&buf, state->abytes);
+
+	/* aitems */
+	pq_sendint32(&buf, state->aitems);
+
+	/* nullbitmap */
+	if (state->nullbitmap)
+	{
+		Assert(state->aitems > 0);
+		pq_sendbytes(&buf, (char *) state->nullbitmap, (state->aitems + 7) / 8);
+	}
+
+	/* nitems */
+	pq_sendint32(&buf, state->nitems);
+
+	/* ndims */
+	pq_sendint32(&buf, state->ndims);
+
+	/* dims: XXX should we just send ndims elements? */
+	pq_sendbytes(&buf, (char *) state->dims, sizeof(state->dims));
+
+	/* lbs */
+	pq_sendbytes(&buf, (char *) state->lbs, sizeof(state->lbs));
+
+	result = pq_endtypsend(&buf);
+
+	PG_RETURN_BYTEA_P(result);
+}
+
+Datum
+array_agg_array_deserialize(PG_FUNCTION_ARGS)
+{
+	bytea	   *sstate;
+	ArrayBuildStateArr *result;
+	StringInfoData buf;
+	Oid			element_type;
+	Oid			array_type;
+	int			nbytes;
+	const char *temp;
+
+	/* cannot be called directly because of internal-type argument */
+	Assert(AggCheckCallContext(fcinfo, NULL));
+
+	sstate = PG_GETARG_BYTEA_PP(0);
+
+	/*
+	 * Copy the bytea into a StringInfo so that we can "receive" it using the
+	 * standard recv-function infrastructure.
+	 */
+	initStringInfo(&buf);
+	appendBinaryStringInfo(&buf,
+						   VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate));
+
+	/* element_type */
+	element_type = pq_getmsgint(&buf, 4);
+
+	/* array_type */
+	array_type = pq_getmsgint(&buf, 4);
+
+	/* nbytes */
+	nbytes = pq_getmsgint(&buf, 4);
+
+	result = initArrayResultArr(array_type, element_type,
+								CurrentMemoryContext, false);
+
+	result->abytes = 1024;
+	while (result->abytes < nbytes)
+		result->abytes *= 2;
+
+	result->data = (char *) palloc(result->abytes);
+
+	/* data */
+	temp = pq_getmsgbytes(&buf, nbytes);
+	memcpy(result->data, temp, nbytes);
+	result->nbytes = nbytes;
+
+	/* abytes */
+	result->abytes = pq_getmsgint(&buf, 4);
+
+	/* aitems: might be 0 */
+	result->aitems = pq_getmsgint(&buf, 4);
+
+	/* nullbitmap */
+	if (result->aitems > 0)
+	{
+		int			size = (result->aitems + 7) / 8;
+
+		result->nullbitmap = (bits8 *) palloc(size);
+		temp = pq_getmsgbytes(&buf, size);
+		memcpy(result->nullbitmap, temp, size);
+	}
+	else
+		result->nullbitmap = NULL;
+
+	/* nitems */
+	result->nitems = pq_getmsgint(&buf, 4);
+
+	/* ndims */
+	result->ndims = pq_getmsgint(&buf, 4);
+
+	/* dims */
+	temp = pq_getmsgbytes(&buf, sizeof(result->dims));
+	memcpy(result->dims, temp, sizeof(result->dims));
+
+	/* lbs */
+	temp = pq_getmsgbytes(&buf, sizeof(result->lbs));
+	memcpy(result->lbs, temp, sizeof(result->lbs));
+
+	pq_getmsgend(&buf);
+	pfree(buf.data);
+
+	PG_RETURN_POINTER(result);
+}
+
 Datum
 array_agg_array_finalfn(PG_FUNCTION_ARGS)
 {
diff --git a/src/backend/utils/adt/arrayfuncs.c b/src/backend/utils/adt/arrayfuncs.c
index 0cbdbe5587..12107043f2 100644
--- a/src/backend/utils/adt/arrayfuncs.c
+++ b/src/backend/utils/adt/arrayfuncs.c
@@ -4996,11 +4996,29 @@ array_insert_slice(ArrayType *destArray,
  * subcontext=false.
  *
  * In cases when the array build states have different lifetimes, using a
- * single memory context is impractical. Instead, pass subcontext=true so that
- * the array build states can be freed individually.
+ * single memory context is impractical.  Instead, pass subcontext=true so
+ * that the array build states can be freed individually.
  */
 ArrayBuildState *
 initArrayResult(Oid element_type, MemoryContext rcontext, bool subcontext)
+{
+	/*
+	 * When using a subcontext, we can afford to start with a somewhat larger
+	 * initial array size.  Without subcontexts, we'd better hope that most of
+	 * the states stay small ...
+	 */
+	return initArrayResultWithSize(element_type, rcontext, subcontext,
+								   subcontext ? 64 : 8);
+}
+
+/*
+ * initArrayResultWithSize
+ *		As initArrayResult, but allow the initial size of the allocated arrays
+ *		to be specified.
+ */
+ArrayBuildState *
+initArrayResultWithSize(Oid element_type, MemoryContext rcontext,
+						bool subcontext, int initsize)
 {
 	ArrayBuildState *astate;
 	MemoryContext arr_context = rcontext;
@@ -5015,7 +5033,7 @@ initArrayResult(Oid element_type, MemoryContext rcontext, bool subcontext)
 		MemoryContextAlloc(arr_context, sizeof(ArrayBuildState));
 	astate->mcontext = arr_context;
 	astate->private_cxt = subcontext;
-	astate->alen = (subcontext ? 64 : 8);	/* arbitrary starting array size */
+	astate->alen = initsize;
 	astate->dvalues = (Datum *)
 		MemoryContextAlloc(arr_context, astate->alen * sizeof(Datum));
 	astate->dnulls = (bool *)
diff --git a/src/backend/utils/adt/varlena.c b/src/backend/utils/adt/varlena.c
index 4346410d5a..fff89e76eb 100644
--- a/src/backend/utils/adt/varlena.c
+++ b/src/backend/utils/adt/varlena.c
@@ -453,29 +453,50 @@ bytea_string_agg_transfn(PG_FUNCTION_ARGS)
 
 	state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
 
-	/* Append the value unless null. */
+	/* Append the value unless null, preceding it with the delimiter. */
 	if (!PG_ARGISNULL(1))
 	{
 		bytea	   *value = PG_GETARG_BYTEA_PP(1);
+		bool		isfirst = false;
 
-		/* On the first time through, we ignore the delimiter. */
+		/*
+		 * We store the delimiter for each aggregated item, even the first
+		 * one, though you might think that could be discarded immediately.
+		 * Otherwise, partial aggregation can't work, because the combine
+		 * function needs the delimiter for the first item in the second
+		 * aggregated state.  (IOW, what we now think is the first item might
+		 * not be first overall.)  The finalfn is responsible for stripping
+		 * off the first delimiter.  So that it can tell how much to strip, we
+		 * store the length of the first delimiter in the StringInfo's cursor
+		 * field, which we don't otherwise need here.
+		 */
 		if (state == NULL)
+		{
 			state = makeStringAggState(fcinfo);
-		else if (!PG_ARGISNULL(2))
+			isfirst = true;
+		}
+
+		if (!PG_ARGISNULL(2))
 		{
 			bytea	   *delim = PG_GETARG_BYTEA_PP(2);
 
-			appendBinaryStringInfo(state, VARDATA_ANY(delim), VARSIZE_ANY_EXHDR(delim));
+			appendBinaryStringInfo(state, VARDATA_ANY(delim),
+								   VARSIZE_ANY_EXHDR(delim));
+			if (isfirst)
+				state->cursor = VARSIZE_ANY_EXHDR(delim);
 		}
 
-		appendBinaryStringInfo(state, VARDATA_ANY(value), VARSIZE_ANY_EXHDR(value));
+		appendBinaryStringInfo(state, VARDATA_ANY(value),
+							   VARSIZE_ANY_EXHDR(value));
 	}
 
 	/*
 	 * The transition type for string_agg() is declared to be "internal",
 	 * which is a pass-by-value type the same size as a pointer.
 	 */
-	PG_RETURN_POINTER(state);
+	if (state)
+		PG_RETURN_POINTER(state);
+	PG_RETURN_NULL();
 }
 
 Datum
@@ -490,11 +511,13 @@ bytea_string_agg_finalfn(PG_FUNCTION_ARGS)
 
 	if (state != NULL)
 	{
+		/* As per comment in transfn, strip data before the cursor position */
 		bytea	   *result;
+		int			strippedlen = state->len - state->cursor;
 
-		result = (bytea *) palloc(state->len + VARHDRSZ);
-		SET_VARSIZE(result, state->len + VARHDRSZ);
-		memcpy(VARDATA(result), state->data, state->len);
+		result = (bytea *) palloc(strippedlen + VARHDRSZ);
+		SET_VARSIZE(result, strippedlen + VARHDRSZ);
+		memcpy(VARDATA(result), &state->data[state->cursor], strippedlen);
 		PG_RETURN_BYTEA_P(result);
 	}
 	else
@@ -4653,23 +4676,171 @@ string_agg_transfn(PG_FUNCTION_ARGS)
 
 	state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
 
-	/* Append the value unless null. */
+	/* Append the value unless null, preceding it with the delimiter. */
 	if (!PG_ARGISNULL(1))
 	{
-		/* On the first time through, we ignore the delimiter. */
+		text	   *value = PG_GETARG_TEXT_PP(1);
+		bool		isfirst = false;
+
+		/*
+		 * We store the delimiter for each aggregated item, even the first
+		 * one, though you might think that could be discarded immediately.
+		 * Otherwise, partial aggregation can't work, because the combine
+		 * function needs the delimiter for the first item in the second
+		 * aggregated state.  (IOW, what we now think is the first item might
+		 * not be first overall.)  The finalfn is responsible for stripping
+		 * off the first delimiter.  So that it can tell how much to strip, we
+		 * store the length of the first delimiter in the StringInfo's cursor
+		 * field, which we don't otherwise need here.
+		 */
 		if (state == NULL)
+		{
 			state = makeStringAggState(fcinfo);
-		else if (!PG_ARGISNULL(2))
-			appendStringInfoText(state, PG_GETARG_TEXT_PP(2));	/* delimiter */
+			isfirst = true;
+		}
 
-		appendStringInfoText(state, PG_GETARG_TEXT_PP(1));	/* value */
+		if (!PG_ARGISNULL(2))
+		{
+			text	   *delim = PG_GETARG_TEXT_PP(2);
+
+			appendStringInfoText(state, delim);
+			if (isfirst)
+				state->cursor = VARSIZE_ANY_EXHDR(delim);
+		}
+
+		appendStringInfoText(state, value);
 	}
 
 	/*
 	 * The transition type for string_agg() is declared to be "internal",
 	 * which is a pass-by-value type the same size as a pointer.
 	 */
-	PG_RETURN_POINTER(state);
+	if (state)
+		PG_RETURN_POINTER(state);
+	PG_RETURN_NULL();
+}
+
+/*
+ * string_agg_combine
+ *		Aggregate combine function for string_agg(text) and string_agg(bytea)
+ */
+Datum
+string_agg_combine(PG_FUNCTION_ARGS)
+{
+	StringInfo	state1;
+	StringInfo	state2;
+	MemoryContext agg_context;
+
+	if (!AggCheckCallContext(fcinfo, &agg_context))
+		elog(ERROR, "aggregate function called in non-aggregate context");
+
+	state1 = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
+	state2 = PG_ARGISNULL(1) ? NULL : (StringInfo) PG_GETARG_POINTER(1);
+
+	if (state2 == NULL)
+	{
+		/*
+		 * NULL state2 is easy, just return state1, which we know is already
+		 * in the agg_context
+		 */
+		if (state1 == NULL)
+			PG_RETURN_NULL();
+		PG_RETURN_POINTER(state1);
+	}
+
+	if (state1 == NULL)
+	{
+		/* We must copy state2's data into the agg_context */
+		MemoryContext old_context;
+
+		old_context = MemoryContextSwitchTo(agg_context);
+		state1 = makeStringAggState(fcinfo);
+		appendBinaryStringInfo(state1, state2->data, state2->len);
+		state1->cursor = state2->cursor;
+		MemoryContextSwitchTo(old_context);
+	}
+	else if (state2->len > 0)
+	{
+		/* Combine ... state1->cursor does not change in this case */
+		appendBinaryStringInfo(state1, state2->data, state2->len);
+	}
+
+	PG_RETURN_POINTER(state1);
+}
+
+/*
+ * string_agg_serialize
+ *		Aggregate serialize function for string_agg(text) and string_agg(bytea)
+ *
+ * This is strict, so we need not handle NULL input
+ */
+Datum
+string_agg_serialize(PG_FUNCTION_ARGS)
+{
+	StringInfo	state;
+	StringInfoData buf;
+	bytea	   *result;
+
+	/* cannot be called directly because of internal-type argument */
+	Assert(AggCheckCallContext(fcinfo, NULL));
+
+	state = (StringInfo) PG_GETARG_POINTER(0);
+
+	pq_begintypsend(&buf);
+
+	/* cursor */
+	pq_sendint(&buf, state->cursor, 4);
+
+	/* data */
+	pq_sendbytes(&buf, state->data, state->len);
+
+	result = pq_endtypsend(&buf);
+
+	PG_RETURN_BYTEA_P(result);
+}
+
+/*
+ * string_agg_deserialize
+ *		Aggregate deserial function for string_agg(text) and string_agg(bytea)
+ *
+ * This is strict, so we need not handle NULL input
+ */
+Datum
+string_agg_deserialize(PG_FUNCTION_ARGS)
+{
+	bytea	   *sstate;
+	StringInfo	result;
+	StringInfoData buf;
+	char	   *data;
+	int			datalen;
+
+	/* cannot be called directly because of internal-type argument */
+	Assert(AggCheckCallContext(fcinfo, NULL));
+
+	sstate = PG_GETARG_BYTEA_PP(0);
+
+	/*
+	 * Copy the bytea into a StringInfo so that we can "receive" it using the
+	 * standard recv-function infrastructure.
+	 */
+	initStringInfo(&buf);
+	appendBinaryStringInfo(&buf,
+						   VARDATA_ANY(sstate), VARSIZE_ANY_EXHDR(sstate));
+
+	result = makeStringAggState(fcinfo);
+
+	/* cursor */
+	result->cursor = pq_getmsgint(&buf, 4);
+
+	/* data */
+	datalen = VARSIZE_ANY_EXHDR(sstate) - 4;
+	data = (char *) pq_getmsgbytes(&buf, datalen);
+	appendBinaryStringInfo(result, data, datalen);
+
+	pq_getmsgend(&buf);
+	pfree(buf.data);
+
+	PG_RETURN_POINTER(result);
 }
 
 Datum
@@ -4683,7 +4854,11 @@ string_agg_finalfn(PG_FUNCTION_ARGS)
 	state = PG_ARGISNULL(0) ? NULL : (StringInfo) PG_GETARG_POINTER(0);
 
 	if (state != NULL)
-		PG_RETURN_TEXT_P(cstring_to_text_with_len(state->data, state->len));
+	{
+		/* As per comment in transfn, strip data before the cursor position */
+		PG_RETURN_TEXT_P(cstring_to_text_with_len(&state->data[state->cursor],
+												  state->len - state->cursor));
+	}
 	else
 		PG_RETURN_NULL();
 }
diff --git a/src/include/catalog/pg_aggregate.h b/src/include/catalog/pg_aggregate.h
index 125bb5b479..b624f4eeaf 100644
--- a/src/include/catalog/pg_aggregate.h
+++ b/src/include/catalog/pg_aggregate.h
@@ -300,14 +300,14 @@ DATA(insert ( 2243	n 0 bitor		-				bitor	-	-	-				-				-				f f r r 0	1560	0	0
 DATA(insert ( 2901	n 0 xmlconcat2	-				-		-	-	-				-				-				f f r r 0	142		0	0		0	_null_ _null_ ));
 
 /* array */
-DATA(insert ( 2335	n 0 array_agg_transfn		array_agg_finalfn		-	-	-	-		-				-				t f r r 0	2281	0	0		0	_null_ _null_ ));
-DATA(insert ( 4053	n 0 array_agg_array_transfn array_agg_array_finalfn -	-	-	-		-				-				t f r r 0	2281	0	0		0	_null_ _null_ ));
+DATA(insert ( 2335	n 0 array_agg_transfn		array_agg_finalfn		array_agg_combine		array_agg_serialize			array_agg_deserialize		-		-				-				t f r r 0	2281	0	0		0	_null_ _null_ ));
+DATA(insert ( 4053	n 0 array_agg_array_transfn array_agg_array_finalfn array_agg_array_combine	array_agg_array_serialize	array_agg_array_deserialize	-		-				-				t f r r 0	2281	0	0		0	_null_ _null_ ));
 
 /* text */
-DATA(insert ( 3538	n 0 string_agg_transfn	string_agg_finalfn	-	-	-	-				-				-				f f r r 0	2281	0	0		0	_null_ _null_ ));
+DATA(insert ( 3538	n 0 string_agg_transfn	string_agg_finalfn	string_agg_combine	string_agg_serialize	string_agg_deserialize	-				-				-				f f r r 0	2281	0	0		0	_null_ _null_ ));
 
 /* bytea */
-DATA(insert ( 3545	n 0 bytea_string_agg_transfn	bytea_string_agg_finalfn	-	-	-	-				-				-		f f r r 0	2281	0	0		0	_null_ _null_ ));
+DATA(insert ( 3545	n 0 bytea_string_agg_transfn	bytea_string_agg_finalfn	string_agg_combine	string_agg_serialize	string_agg_deserialize	-				-				-		f f r r 0	2281	0	0		0	_null_ _null_ ));
 
 /* json */
 DATA(insert ( 3175	n 0 json_agg_transfn	json_agg_finalfn			-	-	-	-				-				-				f f r r 0	2281	0	0		0	_null_ _null_ ));
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index ec50afcdf0..3933a133ba 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -942,12 +942,24 @@ DATA(insert OID = 3168 (  array_replace    PGNSP PGUID 12 1 0 0 0 f f f f f i s
 DESCR("replace any occurrences of an element in an array");
 DATA(insert OID = 2333 (  array_agg_transfn   PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2281 "2281 2776" _null_ _null_ _null_ _null_ _null_ array_agg_transfn _null_ _null_ _null_ ));
 DESCR("aggregate transition function");
+DATA(insert OID = 3423 (  array_agg_combine	 PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2281 "2281 2281" _null_ _null_ _null_ _null_ _null_ array_agg_combine _null_ _null_ _null_ ));
+DESCR("aggregate combine function");
+DATA(insert OID = 3424 (  array_agg_serialize    PGNSP PGUID 12 1 0 0 0 f f f t f i s 1 0 17 "2281" _null_ _null_ _null_ _null_ _null_ array_agg_serialize _null_ _null_ _null_ ));
+DESCR("aggregate serial function");
+DATA(insert OID = 3425 (  array_agg_deserialize	 PGNSP PGUID 12 1 0 0 0 f f f t f i s 2 0 2281 "17 2281" _null_ _null_ _null_ _null_ _null_ array_agg_deserialize _null_ _null_ _null_ ));
+DESCR("aggregate deserial function");
 DATA(insert OID = 2334 (  array_agg_finalfn   PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2277 "2281 2776" _null_ _null_ _null_ _null_ _null_ array_agg_finalfn _null_ _null_ _null_ ));
 DESCR("aggregate final function");
 DATA(insert OID = 2335 (  array_agg		   PGNSP PGUID 12 1 0 0 0 a f f f f i s 1 0 2277 "2776" _null_ _null_ _null_ _null_ _null_ aggregate_dummy _null_ _null_ _null_ ));
 DESCR("concatenate aggregate input into an array");
 DATA(insert OID = 4051 (  array_agg_array_transfn	PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2281 "2281 2277" _null_ _null_ _null_ _null_ _null_ array_agg_array_transfn _null_ _null_ _null_ ));
 DESCR("aggregate transition function");
+DATA(insert OID = 3426 (  array_agg_array_combine	PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2281 "2281 2281" _null_ _null_ _null_ _null_ _null_ array_agg_array_combine _null_ _null_ _null_ ));
+DESCR("aggregate combine function");
+DATA(insert OID = 3427 (  array_agg_array_serialize PGNSP PGUID 12 1 0 0 0 f f f t f i s 1 0 17 "2281" _null_ _null_ _null_ _null_ _null_ array_agg_array_serialize _null_ _null_ _null_ ));
+DESCR("aggregate serial function");
+DATA(insert OID = 3428 (  array_agg_array_deserialize	PGNSP PGUID 12 1 0 0 0 f f f t f i s 2 0 2281 "17 2281" _null_ _null_ _null_ _null_ _null_ array_agg_array_deserialize _null_ _null_ _null_ ));
+DESCR("aggregate deserial function");
 DATA(insert OID = 4052 (  array_agg_array_finalfn	PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2277 "2281 2277" _null_ _null_ _null_ _null_ _null_ array_agg_array_finalfn _null_ _null_ _null_ ));
 DESCR("aggregate final function");
 DATA(insert OID = 4053 (  array_agg		   PGNSP PGUID 12 1 0 0 0 a f f f f i s 1 0 2277 "2277" _null_ _null_ _null_ _null_ _null_ aggregate_dummy _null_ _null_ _null_ ));
@@ -2726,6 +2738,12 @@ DESCR("aggregate final function");
 
 DATA(insert OID = 3535 (  string_agg_transfn		PGNSP PGUID 12 1 0 0 0 f f f f f i s 3 0 2281 "2281 25 25" _null_ _null_ _null_ _null_ _null_ string_agg_transfn _null_ _null_ _null_ ));
 DESCR("aggregate transition function");
+DATA(insert OID = 3429 (  string_agg_combine		PGNSP PGUID 12 1 0 0 0 f f f f f i s 2 0 2281 "2281 2281" _null_ _null_ _null_ _null_ _null_ string_agg_combine _null_ _null_ _null_ ));
+DESCR("aggregate combine function");
+DATA(insert OID = 3430 (  string_agg_serialize		PGNSP PGUID 12 1 0 0 0 f f f t f i s 1 0 17 "2281" _null_ _null_ _null_ _null_ _null_ string_agg_serialize _null_ _null_ _null_ ));
+DESCR("aggregate serial function");
+DATA(insert OID = 3431 (  string_agg_deserialize	PGNSP PGUID 12 1 0 0 0 f f f t f i s 2 0 2281 "17 2281" _null_ _null_ _null_ _null_ _null_ string_agg_deserialize _null_ _null_ _null_ ));
+DESCR("aggregate deserial function");
 DATA(insert OID = 3536 (  string_agg_finalfn		PGNSP PGUID 12 1 0 0 0 f f f f f i s 1 0 25 "2281" _null_ _null_ _null_ _null_ _null_ string_agg_finalfn _null_ _null_ _null_ ));
 DESCR("aggregate final function");
 DATA(insert OID = 3538 (  string_agg				PGNSP PGUID 12 1 0 0 0 a f f f f i s 2 0 25 "25 25" _null_ _null_ _null_ _null_ _null_ aggregate_dummy _null_ _null_ _null_ ));
diff --git a/src/include/parser/parse_agg.h b/src/include/parser/parse_agg.h
index 0e6adffe57..39df0c5174 100644
--- a/src/include/parser/parse_agg.h
+++ b/src/include/parser/parse_agg.h
@@ -35,6 +35,8 @@ extern Oid resolve_aggregate_transtype(Oid aggfuncid,
 							Oid *inputTypes,
 							int numArguments);
 
+extern bool agg_args_support_sendreceive(Aggref *aggref);
+
 extern void build_aggregate_transfn_expr(Oid *agg_input_types,
 							 int agg_num_inputs,
 							 int agg_num_direct_inputs,
diff --git a/src/include/utils/array.h b/src/include/utils/array.h
index afbb532e9c..f99e05a258 100644
--- a/src/include/utils/array.h
+++ b/src/include/utils/array.h
@@ -393,6 +393,9 @@ extern bool array_contains_nulls(ArrayType *array);
 
 extern ArrayBuildState *initArrayResult(Oid element_type,
 				MemoryContext rcontext, bool subcontext);
+extern ArrayBuildState *initArrayResultWithSize(Oid element_type,
+						MemoryContext rcontext,
+						bool subcontext, int initsize);
 extern ArrayBuildState *accumArrayResult(ArrayBuildState *astate,
 				 Datum dvalue, bool disnull,
 				 Oid element_type,
diff --git a/src/test/regress/expected/aggregates.out b/src/test/regress/expected/aggregates.out
index f85e913850..406fd21411 100644
--- a/src/test/regress/expected/aggregates.out
+++ b/src/test/regress/expected/aggregates.out
@@ -1381,6 +1381,106 @@ select string_agg(v, decode('ee', 'hex')) from bytea_test_table;
 (1 row)
 
 drop table bytea_test_table;
+-- Test parallel string_agg and array_agg
+create table pagg_test (x int, y int);
+insert into pagg_test
+select (case x % 4 when 1 then null else x end), x % 10
+from generate_series(1,5000) x;
+set parallel_setup_cost TO 0;
+set parallel_tuple_cost TO 0;
+set parallel_leader_participation TO 0;
+set min_parallel_table_scan_size = 0;
+set bytea_output = 'escape';
+-- create a view as we otherwise have to repeat this query a few times.
+create view v_pagg_test AS
+select
+	y,
+	min(t) AS tmin,max(t) AS tmax,count(distinct t) AS tndistinct,
+	min(b) AS bmin,max(b) AS bmax,count(distinct b) AS bndistinct,
+	min(a) AS amin,max(a) AS amax,count(distinct a) AS andistinct,
+	min(aa) AS aamin,max(aa) AS aamax,count(distinct aa) AS aandistinct
+from (
+	select
+		y,
+		unnest(regexp_split_to_array(a1.t, ','))::int AS t,
+		unnest(regexp_split_to_array(a1.b::text, ',')) AS b,
+		unnest(a1.a) AS a,
+		unnest(a1.aa) AS aa
+	from (
+		select
+			y,
+			string_agg(x::text, ',') AS t,
+			string_agg(x::text::bytea, ',') AS b,
+			array_agg(x) AS a,
+			array_agg(ARRAY[x]) AS aa
+		from pagg_test
+		group by y
+	) a1
+) a2
+group by y;
+-- Ensure results are correct.
+select * from v_pagg_test order by y;
+ y | tmin | tmax | tndistinct | bmin | bmax | bndistinct | amin | amax | andistinct | aamin | aamax | aandistinct 
+---+------+------+------------+------+------+------------+------+------+------------+-------+-------+-------------
+ 0 |   10 | 5000 |        500 | 10   | 990  |        500 |   10 | 5000 |        500 |    10 |  5000 |         500
+ 1 |   11 | 4991 |        250 | 1011 | 991  |        250 |   11 | 4991 |        250 |    11 |  4991 |         250
+ 2 |    2 | 4992 |        500 | 1002 | 992  |        500 |    2 | 4992 |        500 |     2 |  4992 |         500
+ 3 |    3 | 4983 |        250 | 1003 | 983  |        250 |    3 | 4983 |        250 |     3 |  4983 |         250
+ 4 |    4 | 4994 |        500 | 1004 | 994  |        500 |    4 | 4994 |        500 |     4 |  4994 |         500
+ 5 |   15 | 4995 |        250 | 1015 | 995  |        250 |   15 | 4995 |        250 |    15 |  4995 |         250
+ 6 |    6 | 4996 |        500 | 1006 | 996  |        500 |    6 | 4996 |        500 |     6 |  4996 |         500
+ 7 |    7 | 4987 |        250 | 1007 | 987  |        250 |    7 | 4987 |        250 |     7 |  4987 |         250
+ 8 |    8 | 4998 |        500 | 1008 | 998  |        500 |    8 | 4998 |        500 |     8 |  4998 |         500
+ 9 |   19 | 4999 |        250 | 1019 | 999  |        250 |   19 | 4999 |        250 |    19 |  4999 |         250
+(10 rows)
+
+-- Ensure parallel aggregation is actually being used.
+explain (costs off) select * from v_pagg_test order by y;
+                                   QUERY PLAN                                   
+--------------------------------------------------------------------------------
+ GroupAggregate
+   Group Key: pagg_test.y
+   ->  Sort
+         Sort Key: pagg_test.y
+         ->  Result
+               ->  ProjectSet
+                     ->  Finalize GroupAggregate
+                           Group Key: pagg_test.y
+                           ->  Gather Merge
+                                 Workers Planned: 2
+                                 ->  Partial GroupAggregate
+                                       Group Key: pagg_test.y
+                                       ->  Sort
+                                             Sort Key: pagg_test.y
+                                             ->  Parallel Seq Scan on pagg_test
+(15 rows)
+
+set max_parallel_workers_per_gather = 0;
+-- Ensure results are the same without parallel aggregation.
+select * from v_pagg_test order by y;
+ y | tmin | tmax | tndistinct | bmin | bmax | bndistinct | amin | amax | andistinct | aamin | aamax | aandistinct 
+---+------+------+------------+------+------+------------+------+------+------------+-------+-------+-------------
+ 0 |   10 | 5000 |        500 | 10   | 990  |        500 |   10 | 5000 |        500 |    10 |  5000 |         500
+ 1 |   11 | 4991 |        250 | 1011 | 991  |        250 |   11 | 4991 |        250 |    11 |  4991 |         250
+ 2 |    2 | 4992 |        500 | 1002 | 992  |        500 |    2 | 4992 |        500 |     2 |  4992 |         500
+ 3 |    3 | 4983 |        250 | 1003 | 983  |        250 |    3 | 4983 |        250 |     3 |  4983 |         250
+ 4 |    4 | 4994 |        500 | 1004 | 994  |        500 |    4 | 4994 |        500 |     4 |  4994 |         500
+ 5 |   15 | 4995 |        250 | 1015 | 995  |        250 |   15 | 4995 |        250 |    15 |  4995 |         250
+ 6 |    6 | 4996 |        500 | 1006 | 996  |        500 |    6 | 4996 |        500 |     6 |  4996 |         500
+ 7 |    7 | 4987 |        250 | 1007 | 987  |        250 |    7 | 4987 |        250 |     7 |  4987 |         250
+ 8 |    8 | 4998 |        500 | 1008 | 998  |        500 |    8 | 4998 |        500 |     8 |  4998 |         500
+ 9 |   19 | 4999 |        250 | 1019 | 999  |        250 |   19 | 4999 |        250 |    19 |  4999 |         250
+(10 rows)
+
+-- Clean up
+reset max_parallel_workers_per_gather;
+reset bytea_output;
+reset min_parallel_table_scan_size;
+reset parallel_leader_participation;
+reset parallel_tuple_cost;
+reset parallel_setup_cost;
+drop view v_pagg_test;
+drop table pagg_test;
 -- FILTER tests
 select min(unique1) filter (where unique1 > 100) from tenk1;
  min 
diff --git a/src/test/regress/sql/aggregates.sql b/src/test/regress/sql/aggregates.sql
index 506d0442d7..9894aa2b2b 100644
--- a/src/test/regress/sql/aggregates.sql
+++ b/src/test/regress/sql/aggregates.sql
@@ -520,6 +520,68 @@ select string_agg(v, decode('ee', 'hex')) from bytea_test_table;
 
 drop table bytea_test_table;
 
+-- Test parallel string_agg and array_agg
+create table pagg_test (x int, y int);
+insert into pagg_test
+select (case x % 4 when 1 then null else x end), x % 10
+from generate_series(1,5000) x;
+
+set parallel_setup_cost TO 0;
+set parallel_tuple_cost TO 0;
+set parallel_leader_participation TO 0;
+set min_parallel_table_scan_size = 0;
+set bytea_output = 'escape';
+
+-- create a view as we otherwise have to repeat this query a few times.
+create view v_pagg_test AS
+select
+	y,
+	min(t) AS tmin,max(t) AS tmax,count(distinct t) AS tndistinct,
+	min(b) AS bmin,max(b) AS bmax,count(distinct b) AS bndistinct,
+	min(a) AS amin,max(a) AS amax,count(distinct a) AS andistinct,
+	min(aa) AS aamin,max(aa) AS aamax,count(distinct aa) AS aandistinct
+from (
+	select
+		y,
+		unnest(regexp_split_to_array(a1.t, ','))::int AS t,
+		unnest(regexp_split_to_array(a1.b::text, ',')) AS b,
+		unnest(a1.a) AS a,
+		unnest(a1.aa) AS aa
+	from (
+		select
+			y,
+			string_agg(x::text, ',') AS t,
+			string_agg(x::text::bytea, ',') AS b,
+			array_agg(x) AS a,
+			array_agg(ARRAY[x]) AS aa
+		from pagg_test
+		group by y
+	) a1
+) a2
+group by y;
+
+-- Ensure results are correct.
+select * from v_pagg_test order by y;
+
+-- Ensure parallel aggregation is actually being used.
+explain (costs off) select * from v_pagg_test order by y;
+
+set max_parallel_workers_per_gather = 0;
+
+-- Ensure results are the same without parallel aggregation.
+select * from v_pagg_test order by y;
+
+-- Clean up
+reset max_parallel_workers_per_gather;
+reset bytea_output;
+reset min_parallel_table_scan_size;
+reset parallel_leader_participation;
+reset parallel_tuple_cost;
+reset parallel_setup_cost;
+
+drop view v_pagg_test;
+drop table pagg_test;
+
 -- FILTER tests
 
 select min(unique1) filter (where unique1 > 100) from tenk1;
