From e0329c7a8a886e51a4d09e25ab22b70c8afc2079 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Sun, 2 Feb 2020 22:24:22 +1300
Subject: [PATCH] Prefetch cache lines while building hash join table.

Since tuple insertion is expected to generate cache misses
most of the time for large hash tables, build a small software
prefetch pipeline, on compilers that provide a builtin for
that.

Experimental code only!

Discussion: https://postgr.es/m/CAEepm%3D2y9HM9QP%2BHhRZdQ3pU6FShSMyu%3DV1uHXhQ5gG-dketHg%40mail.gmail.com
---
 config/c-compiler.m4                |  17 +++++
 configure                           |  40 ++++++++++
 configure.in                        |   3 +
 src/backend/executor/nodeHash.c     | 112 ++++++++++++++++++++++++++--
 src/backend/executor/nodeHashjoin.c |   2 +
 src/include/c.h                     |   8 ++
 src/include/executor/hashjoin.h     |  15 ++++
 src/include/executor/nodeHash.h     |   2 +
 src/include/pg_config.h.in          |   3 +
 9 files changed, 196 insertions(+), 6 deletions(-)

diff --git a/config/c-compiler.m4 b/config/c-compiler.m4
index 71b645839d..656c259e9b 100644
--- a/config/c-compiler.m4
+++ b/config/c-compiler.m4
@@ -394,6 +394,23 @@ AC_DEFINE_UNQUOTED(AS_TR_CPP([HAVE$1]), 1,
                    [Define to 1 if your compiler understands $1.])
 fi])# PGAC_CHECK_BUILTIN_FUNC
 
+# PGAC_CHECK_BUILTIN_VOID_FUNC
+# ----------------------------
+# Variant of PGAC_CHECK_BUILTIN_FUNC for built-ins returning void.
+AC_DEFUN([PGAC_CHECK_BUILTIN_VOID_FUNC],
+[AC_CACHE_CHECK(for $1, pgac_cv$1,
+[AC_LINK_IFELSE([AC_LANG_PROGRAM([
+void
+call$1($2)
+{
+    $1(x);
+}], [])],
+[pgac_cv$1=yes],
+[pgac_cv$1=no])])
+if test x"${pgac_cv$1}" = xyes ; then
+AC_DEFINE_UNQUOTED(AS_TR_CPP([HAVE$1]), 1,
+                   [Define to 1 if your compiler understands $1.])
+fi])# PGAC_CHECK_BUILTIN_VOID_FUNC
 
 
 # PGAC_PROG_VARCC_VARFLAGS_OPT
diff --git a/configure b/configure
index 702adba839..73d9f92bec 100755
--- a/configure
+++ b/configure
@@ -15240,6 +15240,46 @@ _ACEOF
 
 fi
 
+# Can we use a built-in to prefetch memory?
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for __builtin_prefetch" >&5
+$as_echo_n "checking for __builtin_prefetch... " >&6; }
+if ${pgac_cv__builtin_prefetch+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+void
+call__builtin_prefetch(void *x)
+{
+    __builtin_prefetch(x);
+}
+int
+main ()
+{
+
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  pgac_cv__builtin_prefetch=yes
+else
+  pgac_cv__builtin_prefetch=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $pgac_cv__builtin_prefetch" >&5
+$as_echo "$pgac_cv__builtin_prefetch" >&6; }
+if test x"${pgac_cv__builtin_prefetch}" = xyes ; then
+
+cat >>confdefs.h <<_ACEOF
+#define HAVE__BUILTIN_PREFETCH 1
+_ACEOF
+
+fi
+
 ac_fn_c_check_func "$LINENO" "fseeko" "ac_cv_func_fseeko"
 if test "x$ac_cv_func_fseeko" = xyes; then :
   $as_echo "#define HAVE_FSEEKO 1" >>confdefs.h
diff --git a/configure.in b/configure.in
index 8165f70039..a6173dd362 100644
--- a/configure.in
+++ b/configure.in
@@ -1660,6 +1660,9 @@ PGAC_CHECK_BUILTIN_FUNC([__builtin_clz], [unsigned int x])
 PGAC_CHECK_BUILTIN_FUNC([__builtin_ctz], [unsigned int x])
 PGAC_CHECK_BUILTIN_FUNC([__builtin_popcount], [unsigned int x])
 
+# Can we use a built-in to prefetch memory?
+PGAC_CHECK_BUILTIN_VOID_FUNC([__builtin_prefetch], [void *x])
+
 AC_REPLACE_FUNCS(fseeko)
 case $host_os in
 	# NetBSD uses a custom fseeko/ftello built on fsetpos/fgetpos
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index b6d5084908..a44a358fee 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -68,6 +68,10 @@ static inline HashJoinTuple ExecParallelHashNextTuple(HashJoinTable table,
 static inline void ExecParallelHashPushTuple(dsa_pointer_atomic *head,
 											 HashJoinTuple tuple,
 											 dsa_pointer tuple_shared);
+static inline void ExecParallelHashEnqueueTuple(HashJoinTable hashtable,
+												HashJoinTuple tuple,
+												dsa_pointer tuple_shared,
+												int bucketno);
 static void ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch);
 static void ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable);
 static void ExecParallelHashRepartitionFirst(HashJoinTable hashtable);
@@ -79,6 +83,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable hashtable,
 										  size_t size);
 static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
 static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
+static void ExecHashResetInsertQueue(HashJoinTable hashtable);
 
 
 /* ----------------------------------------------------------------
@@ -189,6 +194,7 @@ MultiExecPrivateHash(HashState *node)
 			hashtable->totalTuples += 1;
 		}
 	}
+	ExecHashFlushInsertQueue(hashtable);
 
 	/* resize the hash table if needed (NTUP_PER_BUCKET exceeded) */
 	if (hashtable->nbuckets != hashtable->nbuckets_optimal)
@@ -289,6 +295,7 @@ MultiExecParallelHash(HashState *node)
 					ExecParallelHashTableInsert(hashtable, slot, hashvalue);
 				hashtable->partialTuples++;
 			}
+			ExecParallelHashFlushInsertQueue(hashtable);
 
 			/*
 			 * Make sure that any tuples we wrote to disk are visible to
@@ -512,6 +519,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 	hashtable->parallel_state = state->parallel_state;
 	hashtable->area = state->ps.state->es_query_dsa;
 	hashtable->batches = NULL;
+	ExecHashResetInsertQueue(hashtable);
 
 #ifdef HJDEBUG
 	printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n",
@@ -905,6 +913,8 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
 	nbatch = oldnbatch * 2;
 	Assert(nbatch > 1);
 
+	ExecHashFlushInsertQueue(hashtable);
+
 #ifdef HJDEBUG
 	printf("Hashjoin %p: increasing nbatch to %d because space = %zu\n",
 		   hashtable, nbatch, hashtable->spaceUsed);
@@ -1062,6 +1072,8 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable)
 
 	Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
 
+	ExecHashResetInsertQueue(hashtable);
+
 	/*
 	 * It's unlikely, but we need to be prepared for new participants to show
 	 * up while we're in the middle of this operation so we need to switch on
@@ -1461,6 +1473,7 @@ ExecHashIncreaseNumBuckets(HashJoinTable hashtable)
 
 	memset(hashtable->buckets.unshared, 0,
 		   hashtable->nbuckets * sizeof(HashJoinTuple));
+	ExecHashResetInsertQueue(hashtable);
 
 	/* scan through all tuples in all chunks to rebuild the hash table */
 	for (chunk = hashtable->chunks; chunk != NULL; chunk = chunk->next.unshared)
@@ -1501,6 +1514,8 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
 
 	Assert(BarrierPhase(&pstate->build_barrier) == PHJ_BUILD_HASHING_INNER);
 
+	ExecHashResetInsertQueue(hashtable);
+
 	/*
 	 * It's unlikely, but we need to be prepared for new participants to show
 	 * up while we're in the middle of this operation so we need to switch on
@@ -1579,6 +1594,55 @@ ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable)
 	}
 }
 
+static inline void
+ExecHashPushTuple(HashJoinTable hashtable, HashJoinTuple tuple, int bucketno)
+{
+	tuple->next.unshared = hashtable->buckets.unshared[bucketno];
+	hashtable->buckets.unshared[bucketno] = tuple;
+}
+
+static inline void
+ExecHashEnqueueTuple(HashJoinTable hashtable, HashJoinTuple tuple, int bucketno)
+{
+	HashJoinTableInserter *inserter = &hashtable->inserter;
+	int			i = inserter->head;
+
+	/* Push the oldest item into the hash table. */
+	if (inserter->queue[i].tuple)
+		ExecHashPushTuple(hashtable,
+						  inserter->queue[i].tuple,
+						  inserter->queue[i].bucketno);
+
+	/* Start fetching the cache line, and add it to the queue. */
+	pg_prefetch_mem(&hashtable->buckets.unshared[bucketno]);
+	inserter->queue[i].tuple = tuple;
+	inserter->queue[i].bucketno = bucketno;
+	inserter->head = (i + 1) % HJ_INSERTION_QUEUE_DEPTH;
+}
+
+void
+ExecHashFlushInsertQueue(HashJoinTable hashtable)
+{
+	HashJoinTableInserter *inserter = &hashtable->inserter;
+
+	for (int i = 0; i < HJ_INSERTION_QUEUE_DEPTH; ++i)
+	{
+		if (inserter->queue[i].tuple)
+		{
+			ExecHashPushTuple(hashtable,
+							  inserter->queue[i].tuple,
+							  inserter->queue[i].bucketno);
+			inserter->queue[i].tuple = NULL;
+		}
+	}
+}
+
+static inline void
+ExecHashResetInsertQueue(HashJoinTable hashtable)
+{
+	memset(&hashtable->inserter, 0, sizeof(hashtable->inserter));
+}
+
 /*
  * ExecHashTableInsert
  *		insert a tuple into the hash table depending on the hash value
@@ -1631,8 +1695,7 @@ ExecHashTableInsert(HashJoinTable hashtable,
 		HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
 
 		/* Push it onto the front of the bucket's list */
-		hashTuple->next.unshared = hashtable->buckets.unshared[bucketno];
-		hashtable->buckets.unshared[bucketno] = hashTuple;
+		ExecHashEnqueueTuple(hashtable, hashTuple, bucketno);
 
 		/*
 		 * Increase the (optimal) number of buckets if we just exceeded the
@@ -1711,8 +1774,7 @@ retry:
 		memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
 
 		/* Push it onto the front of the bucket's list */
-		ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
-								  hashTuple, shared);
+		ExecParallelHashEnqueueTuple(hashtable, hashTuple, shared, bucketno);
 	}
 	else
 	{
@@ -1758,14 +1820,14 @@ ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable,
 
 	ExecHashGetBucketAndBatch(hashtable, hashvalue, &bucketno, &batchno);
 	Assert(batchno == hashtable->curbatch);
+
 	hashTuple = ExecParallelHashTupleAlloc(hashtable,
 										   HJTUPLE_OVERHEAD + tuple->t_len,
 										   &shared);
 	hashTuple->hashvalue = hashvalue;
 	memcpy(HJTUPLE_MINTUPLE(hashTuple), tuple, tuple->t_len);
 	HeapTupleHeaderClearMatch(HJTUPLE_MINTUPLE(hashTuple));
-	ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno],
-							  hashTuple, shared);
+	ExecParallelHashEnqueueTuple(hashtable, hashTuple, shared, bucketno);
 
 	if (shouldFree)
 		heap_free_minimal_tuple(tuple);
@@ -3224,6 +3286,44 @@ ExecParallelHashPushTuple(dsa_pointer_atomic *head,
 	}
 }
 
+static inline void
+ExecParallelHashEnqueueTuple(HashJoinTable hashtable, HashJoinTuple tuple,
+							 dsa_pointer tuple_shared, int bucketno)
+{
+	HashJoinTableInserter *inserter = &hashtable->inserter;
+	int			i = inserter->head;
+
+	/* Push the oldest item into the hash table. */
+	if (inserter->queue[i].tuple)
+		ExecParallelHashPushTuple(&hashtable->buckets.shared[inserter->queue[i].bucketno],
+								  inserter->queue[i].tuple,
+								  inserter->queue[i].tuple_shared);
+
+	/* Start fetching the cache line, and add it to the queue. */
+	pg_prefetch_mem(&hashtable->buckets.shared[bucketno]);
+	inserter->queue[i].tuple = tuple;
+	inserter->queue[i].tuple_shared = tuple_shared;
+	inserter->queue[i].bucketno = bucketno;
+	inserter->head = (i + 1) % HJ_INSERTION_QUEUE_DEPTH;
+}
+
+void
+ExecParallelHashFlushInsertQueue(HashJoinTable hashtable)
+{
+	HashJoinTableInserter *inserter = &hashtable->inserter;
+
+	for (int i = 0; i < HJ_INSERTION_QUEUE_DEPTH; ++i)
+	{
+		if (inserter->queue[i].tuple)
+		{
+			ExecParallelHashPushTuple(&hashtable->buckets.shared[inserter->queue[i].bucketno],
+									  inserter->queue[i].tuple,
+									  inserter->queue[i].tuple_shared);
+			inserter->queue[i].tuple = NULL;
+		}
+	}
+}
+
 /*
  * Prepare to work on a given batch.
  */
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index c901a80923..93a12bd44e 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -1056,6 +1056,7 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
 			 */
 			ExecHashTableInsert(hashtable, slot, hashvalue);
 		}
+		ExecHashFlushInsertQueue(hashtable);
 
 		/*
 		 * after we build the hash table, the inner batch file is no longer
@@ -1160,6 +1161,7 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 						ExecParallelHashTableInsertCurrentBatch(hashtable, slot,
 																hashvalue);
 					}
+					ExecParallelHashFlushInsertQueue(hashtable);
 					sts_end_parallel_scan(inner_tuples);
 					BarrierArriveAndWait(batch_barrier,
 										 WAIT_EVENT_HASH_BATCH_LOADING);
diff --git a/src/include/c.h b/src/include/c.h
index 6898229b43..0546b13589 100644
--- a/src/include/c.h
+++ b/src/include/c.h
@@ -278,6 +278,14 @@
 #endif
 #endif
 
+/* Do we have support for prefetching memory? */
+#if defined(HAVE__BUILTIN_PREFETCH)
+#define pg_prefetch_mem(a) __builtin_prefetch(a)
+#elif defined(_MSC_VER)
+#define pg_prefetch_mem(a) _m_prefetch(a)
+#else
+#define pg_prefetch_mem(a)
+#endif
 
 /* ----------------------------------------------------------------
  *				Section 2:	bool, true, false
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 79b634e8ed..8acb1d40f4 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -281,6 +281,19 @@ typedef struct ParallelHashJoinState
 #define PHJ_GROW_BUCKETS_REINSERTING	2
 #define PHJ_GROW_BUCKETS_PHASE(n)		((n) % 3)	/* circular phases */
 
+#define HJ_INSERTION_QUEUE_DEPTH		4
+
+typedef struct HashJoinTableInserter
+{
+	struct
+	{
+		HashJoinTupleData *tuple;
+		dsa_pointer	tuple_shared;
+		int			bucketno;
+	}			queue[HJ_INSERTION_QUEUE_DEPTH];
+	int			head;
+} HashJoinTableInserter;
+
 typedef struct HashJoinTableData
 {
 	int			nbuckets;		/* # buckets in the in-memory hash table */
@@ -348,6 +361,8 @@ typedef struct HashJoinTableData
 	MemoryContext hashCxt;		/* context for whole-hash-join storage */
 	MemoryContext batchCxt;		/* context for this-batch-only storage */
 
+	HashJoinTableInserter inserter;
+
 	/* used for dense allocation of tuples (into linked chunks) */
 	HashMemoryChunk chunks;		/* one list for the whole batch */
 
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 1336fde6b4..37825ff5d9 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -75,5 +75,7 @@ extern void ExecHashRetrieveInstrumentation(HashState *node);
 extern void ExecShutdownHash(HashState *node);
 extern void ExecHashGetInstrumentation(HashInstrumentation *instrument,
 									   HashJoinTable hashtable);
+extern void ExecHashFlushInsertQueue(HashJoinTable hashtable);
+extern void ExecParallelHashFlushInsertQueue(HashJoinTable hashtable);
 
 #endif							/* NODEHASH_H */
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 6f485f73cd..f952cc8a81 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -758,6 +758,9 @@
 /* Define to 1 if your compiler understands __builtin_popcount. */
 #undef HAVE__BUILTIN_POPCOUNT
 
+/* Define to 1 if your compiler understands __builtin_prefetch. */
+#undef HAVE__BUILTIN_PREFETCH
+
 /* Define to 1 if your compiler understands __builtin_types_compatible_p. */
 #undef HAVE__BUILTIN_TYPES_COMPATIBLE_P
 
-- 
2.23.0

