On Wed, Apr 24, 2019 at 02:36:33AM +0200, Tomas Vondra wrote:
... I still think the idea with an "overflow batch" is worth considering, because it'd allow us to keep the memory usage within work_mem. And after getting familiar with the hash join code again (haven't messed with it since 9.5 or so) I think it should not be all that difficult. I'll give it a try over the weekend if I get bored for a while.
OK, so I took a stab at this, and overall it seems to be workable. The
patches I have are nowhere near committable, but I think the approach
works fairly well - the memory is kept in check, and the performance is
comparable to the "ballancing" approach tested before.
To explain it a bit, the idea is that we can compute how many BufFile
structures we can keep in memory - we can't use more than work_mem/2 for
that, because then we'd mostly eliminate space for the actual data. For
example with 4MB, we know we can keep 128 batches - we need 128 for
outer and inner side, so 256 in total, and 256*8kB = 2MB.
And then, we just increase the number of batches but instead of adding
the BufFile entries, we split batches into slices that we can keep in
memory (say, the 128 batches). And we keep BufFiles for the current one
and an "overflow file" for the other slices. After processing a slice,
we simply switch to the next one, and use the overflow file as a temp
file for the first batch - we redistribute it into the other batches in
the slice and another overflow file.
That's what the v3 patch (named 'single overflow file') does. I does
work, but unfortunately it significantly inflates the amount of data
written to temporary files. Assume we need e.g. 1024 batches, but only
128 fit into memory. That means we'll need 8 slices, and during the
first pass we'll handle 1/8 of the data and write 7/8 to the overflow
file. Then after processing the slice and switching to the next one, we
repeat this dance - 1/8 gets processed, 6/8 written to another overflow
file. So essentially we "forward" about
7/8 + 6/8 + 5/8 + ... + 1/8 = 28/8 = 3.5
of data between slices, and we need to re-shuffle data in each slice,
which amounts to additional 1x data. That's pretty significant overhead,
as will be clear from the measurements I'll present shortly.
But luckily, there's a simple solution to this - instead of writing the
data into a single overflow file, we can create one overflow file for
each slice. That will leave us with the ~1x of additional writes when
distributing data into batches in the current slice, but it eliminates
the main source of write amplification - awalanche-like forwarding of
data between slices.
This relaxes the memory limit a bit again, because we can't really keep
the number of overflow files constrained by work_mem, but we should only
need few of them (much less than when adding one file per batch right
away). For example with 128 in-memory batches, this reduces the amount
of necessary memory 128x.
And this is what v4 (per-slice overflow file) does, pretty much.
Two more comments, regarding memory accounting in previous patches. It
was a bit broken, because we actually need 2x the number of BufFiles. We
needed nbatch files for outer side and nbatch files for inner side, but
we only considered one of those - both when deciding when to increase
the number of batches / increase spaceAllowed, and when reporting the
memory usage. So with large number of batches the reported amount of
used memory was roughly 1/2 of the actual value :-/
The memory accounting was a bit bogus for another reason - spaceUsed
simply tracks the amount of memory for hash table contents. But at the
end we were simply adding the current space for BufFile stuff, ignoring
the fact that that's likely much larger than when the spacePeak value
got stored. For example we might have kept early spaceUsed when it was
almost work_mem, and then added the final large BufFile allocation.
I've fixed both issues in the patches attached to this message. It does
not make a huge difference in practice, but it makes it easier to
compare values between patches.
Now, some test results - I've repeated the simple test with uniform data
set, which is pretty much ideal for hash joins (no unexlectedly large
batches that can't be split, etc.). I've done this with 1M, 5M, 10M, 25M
and 50M rows in the large table (which gets picked for the "hash" side),
and measured how much memory gets used, how many batches, how long it
takes and how much data gets written to temp files.
See the hashjoin-test.sh script for more details.
So, here are the results with work_mem = 4MB (so the number of in-memory
batches for the last two entries is 128). The columns are:
* nbatch - the final number of batches
* memory - memory usage, as reported by explain analyze
* time - duration of the query (without explain analyze) in seconds
* size - size of the large table
* temp - amount of data written to temp files
* amplif - write amplification (temp / size)
1M rows
===================================================================
nbatch memory time size (MB) temp (MB) amplif
-------------------------------------------------------------------
master 256 7681 3.3 730 899 1.23
rebalance 256 7711 3.3 730 884 1.21
single file 1024 4161 7.2 730 3168 4.34
per-slice file 1024 4161 4.7 730 1653 2.26
5M rows
===================================================================
nbatch memory time size (MB) temp (MB) amplif
-------------------------------------------------------------------
master 2048 36353 22 3652 5276 1.44
rebalance 512 16515 18 3652 4169 1.14
single file 4096 4353 156 3652 53897 14.76
per-slice file 4096 4353 28 3652 8106 2.21
10M rows
===================================================================
nbatch memory time size (MB) temp (MB) amplif
-------------------------------------------------------------------
master 4096 69121 61 7303 10556 1.45
rebalance 512 24326 46 7303 7405 1.01
single file 8192 4636 762 7303 211234 28.92
per-slice file 8192 4636 65 7303 16278 2.23
25M rows
===================================================================
nbatch memory time size (MB) temp (MB) amplif
-------------------------------------------------------------------
master 8192 134657 190 7303 24279 1.33
rebalance 1024 36611 158 7303 20024 1.10
single file 16384 6011 4054 7303 1046174 57.32
per-slice file 16384 6011 207 7303 39073 2.14
50M rows
===================================================================
nbatch memory time size (MB) temp (MB) amplif
-------------------------------------------------------------------
master 16384 265729 531 36500 48519 1.33
rebalance 2048 53241 447 36500 48077 1.32
single file - - - 36500 - -
per-slice file 32768 8125 451 36500 78662 2.16
From those numbers it's pretty clear that per-slice overflow file does
by far the best job in enforcing work_mem and minimizing the amount of data spilled to temp files. It does write a bit more data than both master and the simple rebalancing, but that's the cost for enforcing work_mem more strictly. It's generally a bit slower than those two approaches, although on the largest scale it's actually a bit faster than master. I think that's pretty acceptable, considering this is meant to address extreme underestimates where we currently just eat memory. The case with single overflow file performs rather poorly - I haven't even collected data from the largest scale, but considering it spilled 1TB of temp files with a dataset half the size, that's not an issue. (Note that this does not mean it needs 1TB of temp space, those writes are spread over time and the files are created/closed as we go. The system only has ~100GB of free disk space.) Gunther, could you try the v2 and v4 patches on your data set? That would be an interesting data point, I think. regards -- Tomas Vondra http://www.2ndQuadrant.comPostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6ffaa751f2..4d5a6872cc 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable
hashtable,
static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
/* ----------------------------------------------------------------
* ExecHash
@@ -193,10 +194,8 @@ MultiExecPrivateHash(HashState *node)
if (hashtable->nbuckets != hashtable->nbuckets_optimal)
ExecHashIncreaseNumBuckets(hashtable);
- /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
- hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
hashtable->partialTuples = hashtable->totalTuples;
}
@@ -1647,12 +1646,56 @@ ExecHashTableInsert(HashJoinTable hashtable,
/* Account for space used, and back off if we've used too much
*/
hashtable->spaceUsed += hashTupleSize;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
+
+ /*
+ * Consider increasing number of batches.
+ *
+ * Each batch requires a non-trivial amount of memory, because
BufFile
+ * includes a PGAlignedBlock (typically 8kB buffer). So when
doubling
+ * the number of batches, we need to be careful and only allow
that if
+ * it actually has a chance of reducing memory usage.
+ *
+ * In particular, doubling the number of batches is pointless
when
+ *
+ * (spaceUsed / 2) < (nbatches * sizeof(BufFile))
+ *
+ * because we expect to save roughly 1/2 of memory currently
used for
+ * data (rows) at the price of doubling the memory used for
BufFile.
+ *
+ * We can't stop adding batches entirely, because that would
just mean
+ * the batches would need more and more memory. So we need to
increase
+ * the number of batches, even if we can't enforce work_mem
properly.
+ * The goal is to minimize the overall memory usage of the hash
join.
+ *
+ * Note: This applies mostly to cases of significant
underestimates,
+ * resulting in an explosion of the number of batches. The
properly
+ * estimated cases should generally end up using merge join
based on
+ * high cost of the batched hash join.
+ */
if (hashtable->spaceUsed +
- hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+ hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+ hashtable->nbatch * sizeof(PGAlignedBlock) * 2
> hashtable->spaceAllowed)
+ {
ExecHashIncreaseNumBatches(hashtable);
+
+ /*
+ * Consider increasing the resize threshold.
+ *
+ * For well estimated cases this does nothing, because
batches are
+ * expected to account only for small fraction of
work_mem. But if
+ * we significantly underestimate the number of
batches, we may end
+ * up in a situation where BufFile alone exceed
work_mem. So move
+ * the threshold a bit, until the next point where
it'll make sense
+ * to consider adding batches again.
+ */
+ hashtable->spaceAllowed
+ = Max(hashtable->spaceAllowed,
+ hashtable->nbatch *
sizeof(PGAlignedBlock) * 3);
+ }
}
else
{
@@ -1893,6 +1936,21 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
}
}
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+ Size spaceUsed = hashtable->spaceUsed;
+
+ /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+ spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+ /* Account for memory used for batch files (inner + outer) */
+ spaceUsed += hashtable->nbatch * sizeof(PGAlignedBlock) * 2;
+
+ if (spaceUsed > hashtable->spacePeak)
+ hashtable->spacePeak = spaceUsed;
+}
+
/*
* ExecScanHashBucket
* scan a hash bucket for matches to the current outer tuple
@@ -2272,8 +2330,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash
*node, int mcvsToUse)
+ mcvsToUse * sizeof(int);
hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
+ mcvsToUse * sizeof(int);
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
/*
* Create a skew bucket for each MCV hash value.
@@ -2322,8 +2381,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash
*node, int mcvsToUse)
hashtable->nSkewBuckets++;
hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
}
free_attstatsslot(&sslot);
@@ -2411,8 +2471,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
/* Account for space used, and back off if we've used too much */
hashtable->spaceUsed += hashTupleSize;
hashtable->spaceUsedSkew += hashTupleSize;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
+
while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
ExecHashRemoveNextSkewBucket(hashtable);
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 799a22e9d5..c957043599 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -2612,6 +2612,8 @@ show_hash_info(HashState *hashstate, ExplainState *es)
hinstrument.nbatch, es);
ExplainPropertyInteger("Original Hash Batches", NULL,
hinstrument.nbatch_original, es);
+ ExplainPropertyInteger("In-Memory Hash Batches", NULL,
+
hinstrument.nbatch_original, es);
ExplainPropertyInteger("Peak Memory Usage", "kB",
spacePeakKb,
es);
}
@@ -2619,21 +2621,38 @@ show_hash_info(HashState *hashstate, ExplainState *es)
hinstrument.nbuckets_original !=
hinstrument.nbuckets)
{
appendStringInfoSpaces(es->str, es->indent * 2);
- appendStringInfo(es->str,
- "Buckets: %d
(originally %d) Batches: %d (originally %d) Memory Usage: %ldkB\n",
- hinstrument.nbuckets,
-
hinstrument.nbuckets_original,
- hinstrument.nbatch,
-
hinstrument.nbatch_original,
- spacePeakKb);
+ if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+ appendStringInfo(es->str,
+ "Buckets: %d
(originally %d) Batches: %d (originally %d, in-memory %d) Memory Usage:
%ldkB\n",
+
hinstrument.nbuckets,
+
hinstrument.nbuckets_original,
+
hinstrument.nbatch,
+
hinstrument.nbatch_original,
+
hinstrument.nbatch_inmemory,
+ spacePeakKb);
+ else
+ appendStringInfo(es->str,
+ "Buckets: %d
(originally %d) Batches: %d (originally %d) Memory Usage: %ldkB\n",
+
hinstrument.nbuckets,
+
hinstrument.nbuckets_original,
+
hinstrument.nbatch,
+
hinstrument.nbatch_original,
+ spacePeakKb);
}
else
{
appendStringInfoSpaces(es->str, es->indent * 2);
- appendStringInfo(es->str,
- "Buckets: %d Batches:
%d Memory Usage: %ldkB\n",
- hinstrument.nbuckets,
hinstrument.nbatch,
- spacePeakKb);
+ if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+ appendStringInfo(es->str,
+ "Buckets: %d
Batches: %d (in-memory: %d) Memory Usage: %ldkB\n",
+
hinstrument.nbuckets, hinstrument.nbatch,
+
hinstrument.nbatch_inmemory,
+ spacePeakKb);
+ else
+ appendStringInfo(es->str,
+ "Buckets: %d
Batches: %d Memory Usage: %ldkB\n",
+
hinstrument.nbuckets, hinstrument.nbatch,
+ spacePeakKb);
}
}
}
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6ffaa751f2..044d360fd4 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable
hashtable,
static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
/* ----------------------------------------------------------------
* ExecHash
@@ -193,10 +194,8 @@ MultiExecPrivateHash(HashState *node)
if (hashtable->nbuckets != hashtable->nbuckets_optimal)
ExecHashIncreaseNumBuckets(hashtable);
- /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
- hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
hashtable->partialTuples = hashtable->totalTuples;
}
@@ -433,6 +432,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators,
bool keepNulls)
size_t space_allowed;
int nbuckets;
int nbatch;
+ int nbatch_inmemory;
double rows;
int num_skew_mcvs;
int log2_nbuckets;
@@ -462,7 +462,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators,
bool keepNulls)
state->parallel_state
!= NULL ?
state->parallel_state->nparticipants - 1 : 0,
&space_allowed,
- &nbuckets, &nbatch,
&num_skew_mcvs);
+ &nbuckets, &nbatch,
&nbatch_inmemory,
+ &num_skew_mcvs);
/* nbuckets must be a power of 2 */
log2_nbuckets = my_log2(nbuckets);
@@ -489,6 +490,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators,
bool keepNulls)
hashtable->nSkewBuckets = 0;
hashtable->skewBucketNums = NULL;
hashtable->nbatch = nbatch;
+ hashtable->nbatch_inmemory = nbatch_inmemory;
hashtable->curbatch = 0;
hashtable->nbatch_original = nbatch;
hashtable->nbatch_outstart = nbatch;
@@ -498,6 +500,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators,
bool keepNulls)
hashtable->skewTuples = 0;
hashtable->innerBatchFile = NULL;
hashtable->outerBatchFile = NULL;
+ hashtable->innerOverflowFile = NULL;
+ hashtable->outerOverflowFile = NULL;
hashtable->spaceUsed = 0;
hashtable->spacePeak = 0;
hashtable->spaceAllowed = space_allowed;
@@ -559,14 +563,16 @@ ExecHashTableCreate(HashState *state, List
*hashOperators, bool keepNulls)
if (nbatch > 1 && hashtable->parallel_state == NULL)
{
+ int cnt = Min(nbatch, nbatch_inmemory);
+
/*
* allocate and initialize the file arrays in hashCxt (not
needed for
* parallel case which uses shared tuplestores instead of raw
files)
*/
hashtable->innerBatchFile = (BufFile **)
- palloc0(nbatch * sizeof(BufFile *));
+ palloc0(cnt * sizeof(BufFile *));
hashtable->outerBatchFile = (BufFile **)
- palloc0(nbatch * sizeof(BufFile *));
+ palloc0(cnt * sizeof(BufFile *));
/* The files will not be opened until needed... */
/* ... but make sure we have temp tablespaces established for
them */
PrepareTempTablespaces();
@@ -665,6 +671,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool
useskew,
size_t *space_allowed,
int *numbuckets,
int *numbatches,
+ int *numbatches_inmemory,
int *num_skew_mcvs)
{
int tupsize;
@@ -675,6 +682,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool
useskew,
long max_pointers;
long mppow2;
int nbatch = 1;
+ int nbatch_inmemory = 1;
int nbuckets;
double dbuckets;
@@ -795,6 +803,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool
useskew,
space_allowed,
numbuckets,
numbatches,
+
numbatches_inmemory,
num_skew_mcvs);
return;
}
@@ -831,11 +840,22 @@ ExecChooseHashTableSize(double ntuples, int tupwidth,
bool useskew,
nbatch <<= 1;
}
+ /*
+ * See how many batches we can fit into memory (driven mostly by size
+ * of BufFile, with PGAlignedBlock being the largest part of that).
+ * We need one BufFile for inner and outer side, so we count it twice
+ * for each batch, and we stop once we exceed (work_mem/2).
+ */
+ while ((nbatch_inmemory * 2) * sizeof(PGAlignedBlock) * 2
+ <= (work_mem * 1024L / 2))
+ nbatch_inmemory *= 2;
+
Assert(nbuckets > 0);
Assert(nbatch > 0);
*numbuckets = nbuckets;
*numbatches = nbatch;
+ *numbatches_inmemory = nbatch_inmemory;
}
@@ -857,13 +877,21 @@ ExecHashTableDestroy(HashJoinTable hashtable)
*/
if (hashtable->innerBatchFile != NULL)
{
- for (i = 1; i < hashtable->nbatch; i++)
+ int nbatch = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
+
+ for (i = 1; i < nbatch; i++)
{
if (hashtable->innerBatchFile[i])
BufFileClose(hashtable->innerBatchFile[i]);
if (hashtable->outerBatchFile[i])
BufFileClose(hashtable->outerBatchFile[i]);
}
+
+ if (hashtable->innerOverflowFile)
+ BufFileClose(hashtable->innerOverflowFile);
+
+ if (hashtable->outerOverflowFile)
+ BufFileClose(hashtable->outerOverflowFile);
}
/* Release working memory (batchCxt is a child, so it goes away too) */
@@ -909,6 +937,8 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
if (hashtable->innerBatchFile == NULL)
{
+ /* XXX nbatch=1, no need to deal with nbatch_inmemory here */
+
/* we had no file arrays before */
hashtable->innerBatchFile = (BufFile **)
palloc0(nbatch * sizeof(BufFile *));
@@ -919,15 +949,23 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
}
else
{
+ int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory);
+
/* enlarge arrays and zero out added entries */
hashtable->innerBatchFile = (BufFile **)
- repalloc(hashtable->innerBatchFile, nbatch *
sizeof(BufFile *));
+ repalloc(hashtable->innerBatchFile, nbatch_tmp *
sizeof(BufFile *));
hashtable->outerBatchFile = (BufFile **)
- repalloc(hashtable->outerBatchFile, nbatch *
sizeof(BufFile *));
- MemSet(hashtable->innerBatchFile + oldnbatch, 0,
- (nbatch - oldnbatch) * sizeof(BufFile *));
- MemSet(hashtable->outerBatchFile + oldnbatch, 0,
- (nbatch - oldnbatch) * sizeof(BufFile *));
+ repalloc(hashtable->outerBatchFile, nbatch_tmp *
sizeof(BufFile *));
+
+ if (oldnbatch < nbatch_tmp)
+ {
+ MemSet(hashtable->innerBatchFile + oldnbatch, 0,
+ (nbatch_tmp - oldnbatch) * sizeof(BufFile
*));
+ MemSet(hashtable->outerBatchFile + oldnbatch, 0,
+ (nbatch_tmp - oldnbatch) * sizeof(BufFile
*));
+ }
+
+ /* no need to initialize the overflow files explicitly */
}
MemoryContextSwitchTo(oldcxt);
@@ -999,11 +1037,18 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
}
else
{
+ BufFile **batchFile;
+
/* dump it out */
Assert(batchno > curbatch);
+
+ batchFile = ExecHashGetBatchFile(hashtable,
batchno,
+
hashtable->innerBatchFile,
+
&hashtable->innerOverflowFile);
+
ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
hashTuple->hashvalue,
-
&hashtable->innerBatchFile[batchno]);
+
batchFile);
hashtable->spaceUsed -= hashTupleSize;
nfreed++;
@@ -1647,22 +1692,33 @@ ExecHashTableInsert(HashJoinTable hashtable,
/* Account for space used, and back off if we've used too much
*/
hashtable->spaceUsed += hashTupleSize;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
+
+ /* Consider increasing number of batches if we filled work_mem.
*/
if (hashtable->spaceUsed +
- hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+ hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+ Min(hashtable->nbatch, hashtable->nbatch_inmemory) *
sizeof(PGAlignedBlock) * 2 /* inner + outer */
> hashtable->spaceAllowed)
ExecHashIncreaseNumBatches(hashtable);
}
else
{
+ BufFile **batchFile;
+
/*
* put the tuple into a temp file for later batches
*/
Assert(batchno > hashtable->curbatch);
+
+ batchFile = ExecHashGetBatchFile(hashtable, batchno,
+
hashtable->innerBatchFile,
+
&hashtable->innerOverflowFile);
+
ExecHashJoinSaveTuple(tuple,
hashvalue,
-
&hashtable->innerBatchFile[batchno]);
+ batchFile);
}
}
@@ -1893,6 +1949,100 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
}
}
+int
+ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno)
+{
+ int slice,
+ curslice;
+
+ if (hashtable->nbatch <= hashtable->nbatch_inmemory)
+ return batchno;
+
+ slice = batchno / hashtable->nbatch_inmemory;
+ curslice = hashtable->curbatch / hashtable->nbatch_inmemory;
+
+ /* slices can't go backwards */
+ Assert(slice >= curslice);
+
+ /* overflow slice */
+ if (slice > curslice)
+ return -1;
+
+ /* current slice, compute index in the current array */
+ return (batchno % hashtable->nbatch_inmemory);
+}
+
+BufFile **
+ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+ BufFile **batchFiles, BufFile
**overflowFile)
+{
+ int idx = ExecHashGetBatchIndex(hashtable, batchno);
+
+ if (idx == -1)
+ return overflowFile;
+
+ return &batchFiles[idx];
+}
+
+void
+ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable)
+{
+ memset(hashtable->innerBatchFile, 0,
+ hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+ hashtable->innerBatchFile[0] = hashtable->innerOverflowFile;
+ hashtable->innerOverflowFile = NULL;
+
+ memset(hashtable->outerBatchFile, 0,
+ hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+ hashtable->outerBatchFile[0] = hashtable->outerOverflowFile;
+ hashtable->outerOverflowFile = NULL;
+}
+
+int
+ExecHashSwitchToNextBatch(HashJoinTable hashtable)
+{
+ int batchidx;
+
+ hashtable->curbatch++;
+
+ /* see if we skipped to the next batch slice */
+ batchidx = ExecHashGetBatchIndex(hashtable, hashtable->curbatch);
+
+ /* Can't be -1, current batch is in the current slice by definition. */
+ Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
+
+ /*
+ * If we skipped to the next slice of batches, reset the array of files
+ * and use the overflow file as the first batch.
+ */
+ if (batchidx == 0)
+ ExecHashSwitchToNextBatchSlice(hashtable);
+
+ return hashtable->curbatch;
+}
+
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+ Size spaceUsed = hashtable->spaceUsed;
+
+ /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+ spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+ /* Account for memory used for batch files (inner + outer) */
+ spaceUsed += Min(hashtable->nbatch, hashtable->nbatch_inmemory) *
+ sizeof(PGAlignedBlock) * 2;
+
+ /* Account for slice files (inner + outer) */
+ spaceUsed += (hashtable->nbatch / hashtable->nbatch_inmemory) *
+ sizeof(PGAlignedBlock) * 2;
+
+ if (spaceUsed > hashtable->spacePeak)
+ hashtable->spacePeak = spaceUsed;
+}
+
/*
* ExecScanHashBucket
* scan a hash bucket for matches to the current outer tuple
@@ -2272,8 +2422,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash
*node, int mcvsToUse)
+ mcvsToUse * sizeof(int);
hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
+ mcvsToUse * sizeof(int);
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
/*
* Create a skew bucket for each MCV hash value.
@@ -2322,8 +2473,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash
*node, int mcvsToUse)
hashtable->nSkewBuckets++;
hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
}
free_attstatsslot(&sslot);
@@ -2411,8 +2563,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
/* Account for space used, and back off if we've used too much */
hashtable->spaceUsed += hashTupleSize;
hashtable->spaceUsedSkew += hashTupleSize;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
+
while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
ExecHashRemoveNextSkewBucket(hashtable);
@@ -2488,10 +2642,17 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
}
else
{
+ BufFile **batchFile;
+
/* Put the tuple into a temp file for later batches */
Assert(batchno > hashtable->curbatch);
+
+ batchFile = ExecHashGetBatchFile(hashtable, batchno,
+
hashtable->innerBatchFile,
+
&hashtable->innerOverflowFile);
+
ExecHashJoinSaveTuple(tuple, hashvalue,
-
&hashtable->innerBatchFile[batchno]);
+ batchFile);
pfree(hashTuple);
hashtable->spaceUsed -= tupleSize;
hashtable->spaceUsedSkew -= tupleSize;
@@ -2640,6 +2801,7 @@ ExecHashGetInstrumentation(HashInstrumentation
*instrument,
instrument->nbuckets_original = hashtable->nbuckets_original;
instrument->nbatch = hashtable->nbatch;
instrument->nbatch_original = hashtable->nbatch_original;
+ instrument->nbatch_inmemory = Min(hashtable->nbatch,
hashtable->nbatch_inmemory);
instrument->space_peak = hashtable->spacePeak;
}
diff --git a/src/backend/executor/nodeHashjoin.c
b/src/backend/executor/nodeHashjoin.c
index 5922e60eed..e59d4d8003 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -389,15 +389,22 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
if (batchno != hashtable->curbatch &&
node->hj_CurSkewBucketNo ==
INVALID_SKEW_BUCKET_NO)
{
+ BufFile **batchFile;
+
/*
* Need to postpone this outer tuple to
a later batch.
* Save it in the corresponding
outer-batch file.
*/
Assert(parallel_state == NULL);
Assert(batchno > hashtable->curbatch);
+
+ batchFile =
ExecHashGetBatchFile(hashtable, batchno,
+
hashtable->outerBatchFile,
+
&hashtable->outerOverflowFile);
+
ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot),
hashvalue,
-
&hashtable->outerBatchFile[batchno]);
+
batchFile);
/* Loop around, staying in
HJ_NEED_NEW_OUTER state */
continue;
@@ -849,17 +856,19 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
}
else if (curbatch < hashtable->nbatch)
{
- BufFile *file = hashtable->outerBatchFile[curbatch];
+ BufFile **file = ExecHashGetBatchFile(hashtable, curbatch,
+
hashtable->outerBatchFile,
+
&hashtable->outerOverflowFile);
/*
* In outer-join cases, we could get here even though the batch
file
* is empty.
*/
- if (file == NULL)
+ if (*file == NULL)
return NULL;
slot = ExecHashJoinGetSavedTuple(hjstate,
-
file,
+
*file,
hashvalue,
hjstate->hj_OuterTupleSlot);
if (!TupIsNull(slot))
@@ -946,9 +955,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
BufFile *innerFile;
TupleTableSlot *slot;
uint32 hashvalue;
+ int batchidx;
+ int curbatch_old;
nbatch = hashtable->nbatch;
curbatch = hashtable->curbatch;
+ curbatch_old = curbatch;
+
+ /* index of the old batch */
+ batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
+ /* has to be in the current slice of batches */
+ Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
if (curbatch > 0)
{
@@ -956,9 +974,9 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
* We no longer need the previous outer batch file; close it
right
* away to free disk space.
*/
- if (hashtable->outerBatchFile[curbatch])
- BufFileClose(hashtable->outerBatchFile[curbatch]);
- hashtable->outerBatchFile[curbatch] = NULL;
+ if (hashtable->outerBatchFile[batchidx])
+ BufFileClose(hashtable->outerBatchFile[batchidx]);
+ hashtable->outerBatchFile[batchidx] = NULL;
}
else /* we just finished the
first batch */
{
@@ -992,45 +1010,50 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
* scan, we have to rescan outer batches in case they contain tuples
that
* need to be reassigned.
*/
- curbatch++;
+ curbatch = ExecHashSwitchToNextBatch(hashtable);
+ batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
while (curbatch < nbatch &&
- (hashtable->outerBatchFile[curbatch] == NULL ||
- hashtable->innerBatchFile[curbatch] == NULL))
+ (hashtable->outerBatchFile[batchidx] == NULL ||
+ hashtable->innerBatchFile[batchidx] == NULL))
{
- if (hashtable->outerBatchFile[curbatch] &&
+ if (hashtable->outerBatchFile[batchidx] &&
HJ_FILL_OUTER(hjstate))
break; /* must process due to
rule 1 */
- if (hashtable->innerBatchFile[curbatch] &&
+ if (hashtable->innerBatchFile[batchidx] &&
HJ_FILL_INNER(hjstate))
break; /* must process due to
rule 1 */
- if (hashtable->innerBatchFile[curbatch] &&
+ if (hashtable->innerBatchFile[batchidx] &&
nbatch != hashtable->nbatch_original)
break; /* must process due to
rule 2 */
- if (hashtable->outerBatchFile[curbatch] &&
+ if (hashtable->outerBatchFile[batchidx] &&
nbatch != hashtable->nbatch_outstart)
break; /* must process due to
rule 3 */
/* We can ignore this batch. */
/* Release associated temp files right away. */
- if (hashtable->innerBatchFile[curbatch])
- BufFileClose(hashtable->innerBatchFile[curbatch]);
- hashtable->innerBatchFile[curbatch] = NULL;
- if (hashtable->outerBatchFile[curbatch])
- BufFileClose(hashtable->outerBatchFile[curbatch]);
- hashtable->outerBatchFile[curbatch] = NULL;
- curbatch++;
+ if (hashtable->innerBatchFile[batchidx])
+ BufFileClose(hashtable->innerBatchFile[batchidx]);
+ hashtable->innerBatchFile[batchidx] = NULL;
+ if (hashtable->outerBatchFile[batchidx])
+ BufFileClose(hashtable->outerBatchFile[batchidx]);
+ hashtable->outerBatchFile[batchidx] = NULL;
+
+ curbatch = ExecHashSwitchToNextBatch(hashtable);
+ batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
}
if (curbatch >= nbatch)
+ {
+ hashtable->curbatch = curbatch_old;
return false; /* no more batches */
-
- hashtable->curbatch = curbatch;
+ }
/*
* Reload the hash table with the new inner batch (which could be empty)
*/
ExecHashTableReset(hashtable);
- innerFile = hashtable->innerBatchFile[curbatch];
+ innerFile = hashtable->innerBatchFile[batchidx];
if (innerFile != NULL)
{
@@ -1056,15 +1079,15 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
* needed
*/
BufFileClose(innerFile);
- hashtable->innerBatchFile[curbatch] = NULL;
+ hashtable->innerBatchFile[batchidx] = NULL;
}
/*
* Rewind outer batch file (if present), so that we can start reading
it.
*/
- if (hashtable->outerBatchFile[curbatch] != NULL)
+ if (hashtable->outerBatchFile[batchidx] != NULL)
{
- if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L,
SEEK_SET))
+ if (BufFileSeek(hashtable->outerBatchFile[batchidx], 0, 0L,
SEEK_SET))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not rewind hash-join
temporary file: %m")));
diff --git a/src/backend/optimizer/path/costsize.c
b/src/backend/optimizer/path/costsize.c
index c7400941ee..e324869c09 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -3170,6 +3170,7 @@ initial_cost_hashjoin(PlannerInfo *root,
JoinCostWorkspace *workspace,
int num_hashclauses = list_length(hashclauses);
int numbuckets;
int numbatches;
+ int numbatches_inmemory;
int num_skew_mcvs;
size_t space_allowed; /* unused */
@@ -3219,6 +3220,7 @@ initial_cost_hashjoin(PlannerInfo *root,
JoinCostWorkspace *workspace,
&space_allowed,
&numbuckets,
&numbatches,
+ &numbatches_inmemory,
&num_skew_mcvs);
/*
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index a9f9872a78..ef60df5024 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -308,6 +308,7 @@ typedef struct HashJoinTableData
int *skewBucketNums; /* array indexes of active skew
buckets */
int nbatch; /* number of batches */
+ int nbatch_inmemory; /* max number of
in-memory batches */
int curbatch; /* current batch #; 0
during 1st pass */
int nbatch_original; /* nbatch when we
started inner scan */
@@ -329,6 +330,9 @@ typedef struct HashJoinTableData
BufFile **innerBatchFile; /* buffered virtual temp file per batch */
BufFile **outerBatchFile; /* buffered virtual temp file per batch */
+ BufFile *innerOverflowFile; /* temp file for overflow batch batch */
+ BufFile *outerOverflowFile; /* temp file for overflow batch batch */
+
/*
* Info about the datatype-specific hash functions for the datatypes
being
* hashed. These are arrays of the same length as the number of hash
join
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 8d700c06c5..78389bc0cf 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -16,6 +16,7 @@
#include "access/parallel.h"
#include "nodes/execnodes.h"
+#include "storage/buffile.h"
struct SharedHashJoinBatch;
@@ -53,6 +54,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable
hashtable,
uint32 hashvalue,
int *bucketno,
int *batchno);
+extern int ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno);
+extern BufFile **ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+ BufFile **files, BufFile **overflow);
+extern void ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable);
+extern int ExecHashSwitchToNextBatch(HashJoinTable hashtable);
extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext
*econtext);
extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
@@ -66,6 +72,7 @@ extern void ExecChooseHashTableSize(double ntuples, int
tupwidth, bool useskew,
size_t *space_allowed,
int *numbuckets,
int *numbatches,
+ int *numbatches_inmemory,
int *num_skew_mcvs);
extern int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32
hashvalue);
extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 9959c9e31f..6c53c5abd2 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2115,6 +2115,7 @@ typedef struct HashInstrumentation
int nbuckets_original; /* planned number of
buckets */
int nbatch; /* number of batches at
end of execution */
int nbatch_original; /* planned number of
batches */
+ int nbatch_inmemory; /* number of batches
kept in memory */
size_t space_peak; /* speak memory usage in bytes
*/
} HashInstrumentation;
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 799a22e9d5..c957043599 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -2612,6 +2612,8 @@ show_hash_info(HashState *hashstate, ExplainState *es)
hinstrument.nbatch, es);
ExplainPropertyInteger("Original Hash Batches", NULL,
hinstrument.nbatch_original, es);
+ ExplainPropertyInteger("In-Memory Hash Batches", NULL,
+
hinstrument.nbatch_original, es);
ExplainPropertyInteger("Peak Memory Usage", "kB",
spacePeakKb,
es);
}
@@ -2619,21 +2621,38 @@ show_hash_info(HashState *hashstate, ExplainState *es)
hinstrument.nbuckets_original !=
hinstrument.nbuckets)
{
appendStringInfoSpaces(es->str, es->indent * 2);
- appendStringInfo(es->str,
- "Buckets: %d
(originally %d) Batches: %d (originally %d) Memory Usage: %ldkB\n",
- hinstrument.nbuckets,
-
hinstrument.nbuckets_original,
- hinstrument.nbatch,
-
hinstrument.nbatch_original,
- spacePeakKb);
+ if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+ appendStringInfo(es->str,
+ "Buckets: %d
(originally %d) Batches: %d (originally %d, in-memory %d) Memory Usage:
%ldkB\n",
+
hinstrument.nbuckets,
+
hinstrument.nbuckets_original,
+
hinstrument.nbatch,
+
hinstrument.nbatch_original,
+
hinstrument.nbatch_inmemory,
+ spacePeakKb);
+ else
+ appendStringInfo(es->str,
+ "Buckets: %d
(originally %d) Batches: %d (originally %d) Memory Usage: %ldkB\n",
+
hinstrument.nbuckets,
+
hinstrument.nbuckets_original,
+
hinstrument.nbatch,
+
hinstrument.nbatch_original,
+ spacePeakKb);
}
else
{
appendStringInfoSpaces(es->str, es->indent * 2);
- appendStringInfo(es->str,
- "Buckets: %d Batches:
%d Memory Usage: %ldkB\n",
- hinstrument.nbuckets,
hinstrument.nbatch,
- spacePeakKb);
+ if (hinstrument.nbatch != hinstrument.nbatch_inmemory)
+ appendStringInfo(es->str,
+ "Buckets: %d
Batches: %d (in-memory: %d) Memory Usage: %ldkB\n",
+
hinstrument.nbuckets, hinstrument.nbatch,
+
hinstrument.nbatch_inmemory,
+ spacePeakKb);
+ else
+ appendStringInfo(es->str,
+ "Buckets: %d
Batches: %d Memory Usage: %ldkB\n",
+
hinstrument.nbuckets, hinstrument.nbatch,
+ spacePeakKb);
}
}
}
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index 6ffaa751f2..4364eb7cdd 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -80,6 +80,7 @@ static bool ExecParallelHashTuplePrealloc(HashJoinTable
hashtable,
static void ExecParallelHashMergeCounters(HashJoinTable hashtable);
static void ExecParallelHashCloseBatchAccessors(HashJoinTable hashtable);
+static void ExecHashUpdateSpacePeak(HashJoinTable hashtable);
/* ----------------------------------------------------------------
* ExecHash
@@ -193,10 +194,8 @@ MultiExecPrivateHash(HashState *node)
if (hashtable->nbuckets != hashtable->nbuckets_optimal)
ExecHashIncreaseNumBuckets(hashtable);
- /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
- hashtable->spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
hashtable->partialTuples = hashtable->totalTuples;
}
@@ -433,6 +432,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators,
bool keepNulls)
size_t space_allowed;
int nbuckets;
int nbatch;
+ int nbatch_inmemory;
double rows;
int num_skew_mcvs;
int log2_nbuckets;
@@ -462,7 +462,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators,
bool keepNulls)
state->parallel_state
!= NULL ?
state->parallel_state->nparticipants - 1 : 0,
&space_allowed,
- &nbuckets, &nbatch,
&num_skew_mcvs);
+ &nbuckets, &nbatch,
&nbatch_inmemory,
+ &num_skew_mcvs);
/* nbuckets must be a power of 2 */
log2_nbuckets = my_log2(nbuckets);
@@ -489,6 +490,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators,
bool keepNulls)
hashtable->nSkewBuckets = 0;
hashtable->skewBucketNums = NULL;
hashtable->nbatch = nbatch;
+ hashtable->nbatch_inmemory = nbatch_inmemory;
hashtable->curbatch = 0;
hashtable->nbatch_original = nbatch;
hashtable->nbatch_outstart = nbatch;
@@ -498,6 +500,8 @@ ExecHashTableCreate(HashState *state, List *hashOperators,
bool keepNulls)
hashtable->skewTuples = 0;
hashtable->innerBatchFile = NULL;
hashtable->outerBatchFile = NULL;
+ hashtable->innerOverflowFiles = NULL;
+ hashtable->outerOverflowFiles = NULL;
hashtable->spaceUsed = 0;
hashtable->spacePeak = 0;
hashtable->spaceAllowed = space_allowed;
@@ -559,16 +563,30 @@ ExecHashTableCreate(HashState *state, List
*hashOperators, bool keepNulls)
if (nbatch > 1 && hashtable->parallel_state == NULL)
{
+ int cnt = Min(nbatch, nbatch_inmemory);
+
/*
* allocate and initialize the file arrays in hashCxt (not
needed for
* parallel case which uses shared tuplestores instead of raw
files)
*/
hashtable->innerBatchFile = (BufFile **)
- palloc0(nbatch * sizeof(BufFile *));
+ palloc0(cnt * sizeof(BufFile *));
hashtable->outerBatchFile = (BufFile **)
- palloc0(nbatch * sizeof(BufFile *));
+ palloc0(cnt * sizeof(BufFile *));
/* The files will not be opened until needed... */
/* ... but make sure we have temp tablespaces established for
them */
+
+ /* also allocate files for overflow batches */
+ if (nbatch > nbatch_inmemory)
+ {
+ int nslices = (nbatch / nbatch_inmemory);
+
+ hashtable->innerOverflowFiles = (BufFile **)
+ palloc0(nslices * sizeof(BufFile *));
+ hashtable->outerOverflowFiles = (BufFile **)
+ palloc0(nslices * sizeof(BufFile *));
+ }
+
PrepareTempTablespaces();
}
@@ -665,6 +683,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool
useskew,
size_t *space_allowed,
int *numbuckets,
int *numbatches,
+ int *numbatches_inmemory,
int *num_skew_mcvs)
{
int tupsize;
@@ -675,6 +694,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool
useskew,
long max_pointers;
long mppow2;
int nbatch = 1;
+ int nbatch_inmemory = 1;
int nbuckets;
double dbuckets;
@@ -795,6 +815,7 @@ ExecChooseHashTableSize(double ntuples, int tupwidth, bool
useskew,
space_allowed,
numbuckets,
numbatches,
+
numbatches_inmemory,
num_skew_mcvs);
return;
}
@@ -831,11 +852,22 @@ ExecChooseHashTableSize(double ntuples, int tupwidth,
bool useskew,
nbatch <<= 1;
}
+ /*
+ * See how many batches we can fit into memory (driven mostly by size
+ * of BufFile, with PGAlignedBlock being the largest part of that).
+ * We need one BufFile for inner and outer side, so we count it twice
+ * for each batch, and we stop once we exceed (work_mem/2).
+ */
+ while ((nbatch_inmemory * 2) * sizeof(PGAlignedBlock) * 2
+ <= (work_mem * 1024L / 2))
+ nbatch_inmemory *= 2;
+
Assert(nbuckets > 0);
Assert(nbatch > 0);
*numbuckets = nbuckets;
*numbatches = nbatch;
+ *numbatches_inmemory = nbatch_inmemory;
}
@@ -857,13 +889,27 @@ ExecHashTableDestroy(HashJoinTable hashtable)
*/
if (hashtable->innerBatchFile != NULL)
{
- for (i = 1; i < hashtable->nbatch; i++)
+ int n = Min(hashtable->nbatch, hashtable->nbatch_inmemory);
+
+ for (i = 1; i < n; i++)
{
if (hashtable->innerBatchFile[i])
BufFileClose(hashtable->innerBatchFile[i]);
if (hashtable->outerBatchFile[i])
BufFileClose(hashtable->outerBatchFile[i]);
}
+
+ /* number of batch slices */
+ n = hashtable->nbatch / hashtable->nbatch_inmemory;
+
+ for (i = 1; i < n; i++)
+ {
+ if (hashtable->innerOverflowFiles[i])
+ BufFileClose(hashtable->innerOverflowFiles[i]);
+
+ if (hashtable->outerOverflowFiles[i])
+ BufFileClose(hashtable->outerOverflowFiles[i]);
+ }
}
/* Release working memory (batchCxt is a child, so it goes away too) */
@@ -909,6 +955,8 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
if (hashtable->innerBatchFile == NULL)
{
+ /* XXX nbatch=1, no need to deal with nbatch_inmemory here */
+
/* we had no file arrays before */
hashtable->innerBatchFile = (BufFile **)
palloc0(nbatch * sizeof(BufFile *));
@@ -919,15 +967,50 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
}
else
{
+ int nbatch_tmp = Min(nbatch, hashtable->nbatch_inmemory);
+
/* enlarge arrays and zero out added entries */
hashtable->innerBatchFile = (BufFile **)
- repalloc(hashtable->innerBatchFile, nbatch *
sizeof(BufFile *));
+ repalloc(hashtable->innerBatchFile, nbatch_tmp *
sizeof(BufFile *));
hashtable->outerBatchFile = (BufFile **)
- repalloc(hashtable->outerBatchFile, nbatch *
sizeof(BufFile *));
- MemSet(hashtable->innerBatchFile + oldnbatch, 0,
- (nbatch - oldnbatch) * sizeof(BufFile *));
- MemSet(hashtable->outerBatchFile + oldnbatch, 0,
- (nbatch - oldnbatch) * sizeof(BufFile *));
+ repalloc(hashtable->outerBatchFile, nbatch_tmp *
sizeof(BufFile *));
+
+ if (oldnbatch < nbatch_tmp)
+ {
+ MemSet(hashtable->innerBatchFile + oldnbatch, 0,
+ (nbatch_tmp - oldnbatch) * sizeof(BufFile
*));
+ MemSet(hashtable->outerBatchFile + oldnbatch, 0,
+ (nbatch_tmp - oldnbatch) * sizeof(BufFile
*));
+ }
+
+ if (nbatch_tmp > hashtable->nbatch_inmemory)
+ {
+ int nslices = (nbatch / hashtable->nbatch_inmemory);
+
+ if (hashtable->innerOverflowFiles == NULL)
+ {
+ hashtable->innerOverflowFiles = (BufFile **)
+ palloc0(nslices * sizeof(BufFile *));
+ hashtable->outerOverflowFiles = (BufFile **)
+ palloc0(nslices * sizeof(BufFile *));
+ }
+ else
+ {
+ hashtable->innerOverflowFiles = (BufFile **)
+ repalloc(hashtable->innerOverflowFiles,
+ nslices *
sizeof(BufFile *));
+ hashtable->outerOverflowFiles = (BufFile **)
+ repalloc(hashtable->outerOverflowFiles,
+ nslices *
sizeof(BufFile *));
+
+ /* we double the number of batches, so we know
the old
+ * value was nslices/2 exactly */
+ memset(hashtable->innerOverflowFiles +
nslices/2, 0,
+ (nslices/2) * sizeof(BufFile *));
+ memset(hashtable->outerOverflowFiles +
nslices/2, 0,
+ (nslices/2) * sizeof(BufFile *));
+ }
+ }
}
MemoryContextSwitchTo(oldcxt);
@@ -999,11 +1082,18 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
}
else
{
+ BufFile **batchFile;
+
/* dump it out */
Assert(batchno > curbatch);
+
+ batchFile = ExecHashGetBatchFile(hashtable,
batchno,
+
hashtable->innerBatchFile,
+
hashtable->innerOverflowFiles);
+
ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
hashTuple->hashvalue,
-
&hashtable->innerBatchFile[batchno]);
+
batchFile);
hashtable->spaceUsed -= hashTupleSize;
nfreed++;
@@ -1647,22 +1737,33 @@ ExecHashTableInsert(HashJoinTable hashtable,
/* Account for space used, and back off if we've used too much
*/
hashtable->spaceUsed += hashTupleSize;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
+
+ /* Consider increasing number of batches if we filled work_mem.
*/
if (hashtable->spaceUsed +
- hashtable->nbuckets_optimal * sizeof(HashJoinTuple)
+ hashtable->nbuckets_optimal * sizeof(HashJoinTuple) +
+ Min(hashtable->nbatch, hashtable->nbatch_inmemory) *
sizeof(PGAlignedBlock) * 2 /* inner + outer */
> hashtable->spaceAllowed)
ExecHashIncreaseNumBatches(hashtable);
}
else
{
+ BufFile **batchFile;
+
/*
* put the tuple into a temp file for later batches
*/
Assert(batchno > hashtable->curbatch);
+
+ batchFile = ExecHashGetBatchFile(hashtable, batchno,
+
hashtable->innerBatchFile,
+
hashtable->innerOverflowFiles);
+
ExecHashJoinSaveTuple(tuple,
hashvalue,
-
&hashtable->innerBatchFile[batchno]);
+ batchFile);
}
}
@@ -1893,6 +1994,108 @@ ExecHashGetBucketAndBatch(HashJoinTable hashtable,
}
}
+int
+ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno)
+{
+ int slice,
+ curslice;
+
+ if (hashtable->nbatch <= hashtable->nbatch_inmemory)
+ return batchno;
+
+ slice = batchno / hashtable->nbatch_inmemory;
+ curslice = hashtable->curbatch / hashtable->nbatch_inmemory;
+
+ /* slices can't go backwards */
+ Assert(slice >= curslice);
+
+ /* overflow slice */
+ if (slice > curslice)
+ return -1;
+
+ /* current slice, compute index in the current array */
+ return (batchno % hashtable->nbatch_inmemory);
+}
+
+BufFile **
+ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+ BufFile **batchFiles, BufFile
**overflowFiles)
+{
+ int idx = ExecHashGetBatchIndex(hashtable, batchno);
+
+ /* get the right overflow file */
+ if (idx == -1)
+ {
+ int slice = (batchno / hashtable->nbatch_inmemory);
+
+ return &overflowFiles[slice];
+ }
+
+ /* batch file in the current slice */
+ return &batchFiles[idx];
+}
+
+void
+ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable)
+{
+ int slice = (hashtable->curbatch / hashtable->nbatch_inmemory);
+
+ memset(hashtable->innerBatchFile, 0,
+ hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+ hashtable->innerBatchFile[0] = hashtable->innerOverflowFiles[slice];
+ hashtable->innerOverflowFiles[slice] = NULL;
+
+ memset(hashtable->outerBatchFile, 0,
+ hashtable->nbatch_inmemory * sizeof(BufFile *));
+
+ hashtable->outerBatchFile[0] = hashtable->outerOverflowFiles[slice];
+ hashtable->outerOverflowFiles[slice] = NULL;
+}
+
+int
+ExecHashSwitchToNextBatch(HashJoinTable hashtable)
+{
+ int batchidx;
+
+ hashtable->curbatch++;
+
+ /* see if we skipped to the next batch slice */
+ batchidx = ExecHashGetBatchIndex(hashtable, hashtable->curbatch);
+
+ /* Can't be -1, current batch is in the current slice by definition. */
+ Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
+
+ /*
+ * If we skipped to the next slice of batches, reset the array of files
+ * and use the overflow file as the first batch.
+ */
+ if (batchidx == 0)
+ ExecHashSwitchToNextBatchSlice(hashtable);
+
+ return hashtable->curbatch;
+}
+
+static void
+ExecHashUpdateSpacePeak(HashJoinTable hashtable)
+{
+ Size spaceUsed = hashtable->spaceUsed;
+
+ /* Account for the buckets in spaceUsed (reported in EXPLAIN ANALYZE) */
+ spaceUsed += hashtable->nbuckets * sizeof(HashJoinTuple);
+
+ /* Account for memory used for batch files (inner + outer) */
+ spaceUsed += Min(hashtable->nbatch, hashtable->nbatch_inmemory) *
+ sizeof(PGAlignedBlock) * 2;
+
+ /* Account for slice files (inner + outer) */
+ spaceUsed += (hashtable->nbatch / hashtable->nbatch_inmemory) *
+ sizeof(PGAlignedBlock) * 2;
+
+ if (spaceUsed > hashtable->spacePeak)
+ hashtable->spacePeak = spaceUsed;
+}
+
/*
* ExecScanHashBucket
* scan a hash bucket for matches to the current outer tuple
@@ -2272,8 +2475,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash
*node, int mcvsToUse)
+ mcvsToUse * sizeof(int);
hashtable->spaceUsedSkew += nbuckets * sizeof(HashSkewBucket *)
+ mcvsToUse * sizeof(int);
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
/*
* Create a skew bucket for each MCV hash value.
@@ -2322,8 +2526,9 @@ ExecHashBuildSkewHash(HashJoinTable hashtable, Hash
*node, int mcvsToUse)
hashtable->nSkewBuckets++;
hashtable->spaceUsed += SKEW_BUCKET_OVERHEAD;
hashtable->spaceUsedSkew += SKEW_BUCKET_OVERHEAD;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
}
free_attstatsslot(&sslot);
@@ -2411,8 +2616,10 @@ ExecHashSkewTableInsert(HashJoinTable hashtable,
/* Account for space used, and back off if we've used too much */
hashtable->spaceUsed += hashTupleSize;
hashtable->spaceUsedSkew += hashTupleSize;
- if (hashtable->spaceUsed > hashtable->spacePeak)
- hashtable->spacePeak = hashtable->spaceUsed;
+
+ /* refresh info about peak used memory */
+ ExecHashUpdateSpacePeak(hashtable);
+
while (hashtable->spaceUsedSkew > hashtable->spaceAllowedSkew)
ExecHashRemoveNextSkewBucket(hashtable);
@@ -2488,10 +2695,17 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
}
else
{
+ BufFile **batchFile;
+
/* Put the tuple into a temp file for later batches */
Assert(batchno > hashtable->curbatch);
+
+ batchFile = ExecHashGetBatchFile(hashtable, batchno,
+
hashtable->innerBatchFile,
+
hashtable->innerOverflowFiles);
+
ExecHashJoinSaveTuple(tuple, hashvalue,
-
&hashtable->innerBatchFile[batchno]);
+ batchFile);
pfree(hashTuple);
hashtable->spaceUsed -= tupleSize;
hashtable->spaceUsedSkew -= tupleSize;
@@ -2640,6 +2854,7 @@ ExecHashGetInstrumentation(HashInstrumentation
*instrument,
instrument->nbuckets_original = hashtable->nbuckets_original;
instrument->nbatch = hashtable->nbatch;
instrument->nbatch_original = hashtable->nbatch_original;
+ instrument->nbatch_inmemory = Min(hashtable->nbatch,
hashtable->nbatch_inmemory);
instrument->space_peak = hashtable->spacePeak;
}
diff --git a/src/backend/executor/nodeHashjoin.c
b/src/backend/executor/nodeHashjoin.c
index 5922e60eed..a8db71925b 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -389,15 +389,22 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
if (batchno != hashtable->curbatch &&
node->hj_CurSkewBucketNo ==
INVALID_SKEW_BUCKET_NO)
{
+ BufFile **batchFile;
+
/*
* Need to postpone this outer tuple to
a later batch.
* Save it in the corresponding
outer-batch file.
*/
Assert(parallel_state == NULL);
Assert(batchno > hashtable->curbatch);
+
+ batchFile =
ExecHashGetBatchFile(hashtable, batchno,
+
hashtable->outerBatchFile,
+
hashtable->outerOverflowFiles);
+
ExecHashJoinSaveTuple(ExecFetchSlotMinimalTuple(outerTupleSlot),
hashvalue,
-
&hashtable->outerBatchFile[batchno]);
+
batchFile);
/* Loop around, staying in
HJ_NEED_NEW_OUTER state */
continue;
@@ -849,17 +856,19 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode,
}
else if (curbatch < hashtable->nbatch)
{
- BufFile *file = hashtable->outerBatchFile[curbatch];
+ BufFile **file = ExecHashGetBatchFile(hashtable, curbatch,
+
hashtable->outerBatchFile,
+
hashtable->outerOverflowFiles);
/*
* In outer-join cases, we could get here even though the batch
file
* is empty.
*/
- if (file == NULL)
+ if (*file == NULL)
return NULL;
slot = ExecHashJoinGetSavedTuple(hjstate,
-
file,
+
*file,
hashvalue,
hjstate->hj_OuterTupleSlot);
if (!TupIsNull(slot))
@@ -946,9 +955,18 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
BufFile *innerFile;
TupleTableSlot *slot;
uint32 hashvalue;
+ int batchidx;
+ int curbatch_old;
nbatch = hashtable->nbatch;
curbatch = hashtable->curbatch;
+ curbatch_old = curbatch;
+
+ /* index of the old batch */
+ batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
+ /* has to be in the current slice of batches */
+ Assert(batchidx >= 0 && batchidx < hashtable->nbatch_inmemory);
if (curbatch > 0)
{
@@ -956,9 +974,9 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
* We no longer need the previous outer batch file; close it
right
* away to free disk space.
*/
- if (hashtable->outerBatchFile[curbatch])
- BufFileClose(hashtable->outerBatchFile[curbatch]);
- hashtable->outerBatchFile[curbatch] = NULL;
+ if (hashtable->outerBatchFile[batchidx])
+ BufFileClose(hashtable->outerBatchFile[batchidx]);
+ hashtable->outerBatchFile[batchidx] = NULL;
}
else /* we just finished the
first batch */
{
@@ -992,45 +1010,50 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
* scan, we have to rescan outer batches in case they contain tuples
that
* need to be reassigned.
*/
- curbatch++;
+ curbatch = ExecHashSwitchToNextBatch(hashtable);
+ batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
+
while (curbatch < nbatch &&
- (hashtable->outerBatchFile[curbatch] == NULL ||
- hashtable->innerBatchFile[curbatch] == NULL))
+ (hashtable->outerBatchFile[batchidx] == NULL ||
+ hashtable->innerBatchFile[batchidx] == NULL))
{
- if (hashtable->outerBatchFile[curbatch] &&
+ if (hashtable->outerBatchFile[batchidx] &&
HJ_FILL_OUTER(hjstate))
break; /* must process due to
rule 1 */
- if (hashtable->innerBatchFile[curbatch] &&
+ if (hashtable->innerBatchFile[batchidx] &&
HJ_FILL_INNER(hjstate))
break; /* must process due to
rule 1 */
- if (hashtable->innerBatchFile[curbatch] &&
+ if (hashtable->innerBatchFile[batchidx] &&
nbatch != hashtable->nbatch_original)
break; /* must process due to
rule 2 */
- if (hashtable->outerBatchFile[curbatch] &&
+ if (hashtable->outerBatchFile[batchidx] &&
nbatch != hashtable->nbatch_outstart)
break; /* must process due to
rule 3 */
/* We can ignore this batch. */
/* Release associated temp files right away. */
- if (hashtable->innerBatchFile[curbatch])
- BufFileClose(hashtable->innerBatchFile[curbatch]);
- hashtable->innerBatchFile[curbatch] = NULL;
- if (hashtable->outerBatchFile[curbatch])
- BufFileClose(hashtable->outerBatchFile[curbatch]);
- hashtable->outerBatchFile[curbatch] = NULL;
- curbatch++;
+ if (hashtable->innerBatchFile[batchidx])
+ BufFileClose(hashtable->innerBatchFile[batchidx]);
+ hashtable->innerBatchFile[batchidx] = NULL;
+ if (hashtable->outerBatchFile[batchidx])
+ BufFileClose(hashtable->outerBatchFile[batchidx]);
+ hashtable->outerBatchFile[batchidx] = NULL;
+
+ curbatch = ExecHashSwitchToNextBatch(hashtable);
+ batchidx = ExecHashGetBatchIndex(hashtable, curbatch);
}
if (curbatch >= nbatch)
+ {
+ hashtable->curbatch = curbatch_old;
return false; /* no more batches */
-
- hashtable->curbatch = curbatch;
+ }
/*
* Reload the hash table with the new inner batch (which could be empty)
*/
ExecHashTableReset(hashtable);
- innerFile = hashtable->innerBatchFile[curbatch];
+ innerFile = hashtable->innerBatchFile[batchidx];
if (innerFile != NULL)
{
@@ -1056,15 +1079,15 @@ ExecHashJoinNewBatch(HashJoinState *hjstate)
* needed
*/
BufFileClose(innerFile);
- hashtable->innerBatchFile[curbatch] = NULL;
+ hashtable->innerBatchFile[batchidx] = NULL;
}
/*
* Rewind outer batch file (if present), so that we can start reading
it.
*/
- if (hashtable->outerBatchFile[curbatch] != NULL)
+ if (hashtable->outerBatchFile[batchidx] != NULL)
{
- if (BufFileSeek(hashtable->outerBatchFile[curbatch], 0, 0L,
SEEK_SET))
+ if (BufFileSeek(hashtable->outerBatchFile[batchidx], 0, 0L,
SEEK_SET))
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not rewind hash-join
temporary file: %m")));
diff --git a/src/backend/optimizer/path/costsize.c
b/src/backend/optimizer/path/costsize.c
index c7400941ee..e324869c09 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -3170,6 +3170,7 @@ initial_cost_hashjoin(PlannerInfo *root,
JoinCostWorkspace *workspace,
int num_hashclauses = list_length(hashclauses);
int numbuckets;
int numbatches;
+ int numbatches_inmemory;
int num_skew_mcvs;
size_t space_allowed; /* unused */
@@ -3219,6 +3220,7 @@ initial_cost_hashjoin(PlannerInfo *root,
JoinCostWorkspace *workspace,
&space_allowed,
&numbuckets,
&numbatches,
+ &numbatches_inmemory,
&num_skew_mcvs);
/*
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index a9f9872a78..311a0980ee 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -308,6 +308,7 @@ typedef struct HashJoinTableData
int *skewBucketNums; /* array indexes of active skew
buckets */
int nbatch; /* number of batches */
+ int nbatch_inmemory; /* max number of
in-memory batches */
int curbatch; /* current batch #; 0
during 1st pass */
int nbatch_original; /* nbatch when we
started inner scan */
@@ -329,6 +330,9 @@ typedef struct HashJoinTableData
BufFile **innerBatchFile; /* buffered virtual temp file per batch */
BufFile **outerBatchFile; /* buffered virtual temp file per batch */
+ BufFile **innerOverflowFiles; /* temp file for inner overflow batches
*/
+ BufFile **outerOverflowFiles; /* temp file for outer overflow batches
*/
+
/*
* Info about the datatype-specific hash functions for the datatypes
being
* hashed. These are arrays of the same length as the number of hash
join
diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h
index 8d700c06c5..bb6b24a1b4 100644
--- a/src/include/executor/nodeHash.h
+++ b/src/include/executor/nodeHash.h
@@ -16,6 +16,7 @@
#include "access/parallel.h"
#include "nodes/execnodes.h"
+#include "storage/buffile.h"
struct SharedHashJoinBatch;
@@ -53,6 +54,11 @@ extern void ExecHashGetBucketAndBatch(HashJoinTable
hashtable,
uint32 hashvalue,
int *bucketno,
int *batchno);
+extern int ExecHashGetBatchIndex(HashJoinTable hashtable, int batchno);
+extern BufFile **ExecHashGetBatchFile(HashJoinTable hashtable, int batchno,
+ BufFile **batchFiles, BufFile
**overflowFiles);
+extern void ExecHashSwitchToNextBatchSlice(HashJoinTable hashtable);
+extern int ExecHashSwitchToNextBatch(HashJoinTable hashtable);
extern bool ExecScanHashBucket(HashJoinState *hjstate, ExprContext *econtext);
extern bool ExecParallelScanHashBucket(HashJoinState *hjstate, ExprContext
*econtext);
extern void ExecPrepHashTableForUnmatched(HashJoinState *hjstate);
@@ -66,6 +72,7 @@ extern void ExecChooseHashTableSize(double ntuples, int
tupwidth, bool useskew,
size_t *space_allowed,
int *numbuckets,
int *numbatches,
+ int *numbatches_inmemory,
int *num_skew_mcvs);
extern int ExecHashGetSkewBucket(HashJoinTable hashtable, uint32
hashvalue);
extern void ExecHashEstimate(HashState *node, ParallelContext *pcxt);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 9959c9e31f..6c53c5abd2 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2115,6 +2115,7 @@ typedef struct HashInstrumentation
int nbuckets_original; /* planned number of
buckets */
int nbatch; /* number of batches at
end of execution */
int nbatch_original; /* planned number of
batches */
+ int nbatch_inmemory; /* number of batches
kept in memory */
size_t space_peak; /* speak memory usage in bytes
*/
} HashInstrumentation;
hashjoin-test.sh
Description: Bourne shell script
