Re: accounting for memory used for BufFile during hash joins

From: Hubert Zhang
Subject: Re: accounting for memory used for BufFile during hash joins
Date:
Msg-id: CAB0yremvswRAT86Afb9MZ_PaLHyY9BT313-adCHbhMJ=x_GEcg@mail.gmail.com
In reply to: Re: accounting for memory used for BufFile during hash joins (Robert Haas <robertmhaas@gmail.com>)
List: pgsql-hackers

On Fri, Jul 12, 2019 at 1:16 AM Robert Haas <robertmhaas@gmail.com> wrote:
On Mon, May 6, 2019 at 9:49 PM Thomas Munro <thomas.munro@gmail.com> wrote:
> Stepping back a bit, I think there is something fishy about the way we
> detect extreme skew.  Is that a factor in this case?  Right now we
> wait until we have a batch that gets split into child batches
> containing exactly 0% and 100% of the tuples before we give up.
> Previously I had thought of that as merely a waste of time, but
> clearly it's also a waste of unmetered memory.  Oops.
>
> I think our extreme skew detector should go off sooner, because
> otherwise if you have N nicely distributed unique keys and also M
> duplicates of one bad egg key that'll never fit in memory, we keep
> repartitioning until none of the N keys fall into the batch containing
> the key for the M duplicates before we give up!  You can use
> balls-into-bins maths to figure out the number, but I think that means
> we expect to keep splitting until we have N * some_constant batches,
> and that's just silly and liable to create massive numbers of
> partitions proportional to N, even though we're trying to solve a
> problem with M.  In another thread I suggested we should stop when
> (say) 95% of the tuples go to one child batch.  I'm not sure how you
> pick the number.

Another thing that is fishy about this is that we can't split a batch
or a bucket without splitting them all.  Let's say that nbatches *
nbuckets = 16 million. One bucket in one batch contains 90% of the
tuples. Splitting *that* bucket might be a good idea if only 5% of the
tuples end up moving, perhaps even if only 1% end up moving. But, if
you have to double the total number of batches to get that benefit,
it's a lot less compelling, because now you have to rescan the outer
side more times.
 
It seems to me that a good chunk of what's being proposed right now
basically ignores the fact that we're not really responding to the
skew in a very effective way.  Thomas wants to stop splitting all the
buckets when splitting one of the buckets produces only a very small
benefit rather than when it produces no benefit at all, but he's not
asking why we're splitting all of the buckets in the first place.
Tomas wants to slice the array of batches because there are so many of
them, but why are there so many? As he said himself, "it gets to that
many batches because some of the values are very common and we don't
disable the growth earlier."  Realistically, I don't see how there can
be so many batches that we can't even fit the metadata about the
matches into memory unless we're unnecessarily creating a lot of
little tiny batches that we don't really need.


+1 on Robert's suggestion. It's worth finding the root cause of the batch explosion problem.
As Robert pointed out, "we can't split a batch or a bucket without splitting them all". In fact, a hybrid hash join should only split the overflowing batch and avoid splitting small batches that can already be processed in memory. The planner should calculate an initial batch number that ensures the average-sized batch fits in memory for the given data distribution, and the executor should split skewed batches one batch at a time.
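To make the planner half of this concrete, here is a minimal standalone sketch of the sizing idea (the function name choose_nbatch and the byte-based interface are made up for illustration; the real logic lives in ExecChooseHashTableSize in nodeHash.c):
```
#include <stdio.h>

/*
 * Sketch: pick the smallest power-of-2 nbatch such that the average
 * batch fits in the memory budget.  Illustrative only.
 */
static int
choose_nbatch(double inner_rel_bytes, double mem_budget_bytes)
{
    int     nbatch = 1;

    while (inner_rel_bytes / nbatch > mem_budget_bytes)
        nbatch <<= 1;
    return nbatch;
}

int
main(void)
{
    /* e.g. a 3.5GB inner relation with a 1GB budget needs 4 batches */
    printf("nbatch = %d\n", choose_nbatch(3.5e9, 1e9));
    return 0;
}
```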

First, I will show an example to help understand the batch explosion problem.
Suppose we are going to join R and S, and the planner calculates the initial nbatch as 4.
In the first batch run, during the HJ_BUILD_HASHTABLE state we scan R, build the in-memory hash table for batch 1, and spill the other tuples of R into separate batch files (R2-R4). During the HJ_NEED_NEW_OUTER and HJ_SCAN_BUCKET states, we do two things: 1. if a tuple of S belongs to the current batch, match it against the in-memory R1 and emit results to the parent plan node; 2. if a tuple of S doesn't belong to the current batch, spill it to one of the batch files S2-S4. As a result, after the first batch run we have:
6 disk files: batch2(R2,S2), batch3(R3,S3), batch4(R4,S4)
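As a toy illustration of this routing (getbatch() is a made-up stand-in for the real batchno computation shown further below, and keys stand in for hash values; note that batchno is 0-based in the executor, so batchno 0 corresponds to the in-memory batch1):
```
#include <stdio.h>

#define NBATCH 4                /* initial batch number from the example */

/* toy stand-in for the batchno computation */
static int
getbatch(unsigned int hashvalue)
{
    return hashvalue & (NBATCH - 1);
}

int
main(void)
{
    unsigned int keys[] = {4, 9, 3, 14, 7, 0};

    for (int i = 0; i < 6; i++)
    {
        int     batchno = getbatch(keys[i]);

        if (batchno == 0)
            printf("key %u stays in memory (batch1)\n", keys[i]);
        else
            printf("key %u spills to R%d/S%d\n", keys[i],
                   batchno + 1, batchno + 1);
    }
    return 0;
}
```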

Now we run into the HJ_NEED_NEW_BATCH state and begin to process R2 and S2. Suppose the second batch R2 is skewed and needs the batch number increased to 8. While building the in-memory hash table for R2, we split some tuples of R2 into spill file R6 (based on our hash function, tuples belonging to R2 can only be shuffled to R6, not to any other batch). After R2's hash table is built, we begin to probe the tuples in S2. Since the batch number has changed from 4 to 8, some tuples of S2 now belong to S6, and we spill them to disk file S6. The other tuples of S2 are matched against R2 and the results are emitted to the parent plan node. After the second batch is processed, we have:
disk files: batch3(R3,S3), batch4(R4,S4), batch6(R6,S6)

Next, we begin to process R3 and S3. The third batch R3 is not skewed, but our hash function depends on the batch number, which is now 8. So we have to split some tuples of R3 into disk file R7, which is not necessary. When probing S3, we also have to split some tuples of S3 into S7, which is not necessary either. Since R3 could have been loaded into memory entirely, spilling part of R3 to a disk file not only introduces more files and file buffers (the problem Tomas is trying to solve), but also slows down the performance. After the third batch is processed, we have:
disk files: batch4(R4,S4), batch6(R6,S6), batch7(R7,S7)

Next, we begin to process R4 and S4. As with R3, some tuples of R4 need to be spilled to file R8. But suppose that even after this split R4 is still skewed, and we increase the batch number again, to 16. As a result, some tuples of R4 are spilled to files R12 and R16. When probing S4, we similarly have to split some tuples of S4 into S8, S12 and S16. After this step, we have:
disk files: batch6(R6,S6), batch7(R7,S7), batch8(R8,S8), batch12(R12,S12), batch16(R16,S16)

Next, when we begin to process R6 and S6, even though we could build the hash table for R6 entirely in memory, we still have to split R6 based on the new batch number 16 and spill to file R14. This is not necessary.

Now we can conclude that increasing the batch number introduces unnecessary repeated spilling, not only for original batches (R3,S3) but also, in a cascading way, for newly generated batches (R6,S6). In a worse case, suppose R2 is extremely skewed and needs to be split 10 times, while R3 fits in memory entirely. In that case we have to introduce R7, R11, ..., R4095, in total 1023 unnecessary spill files. Each of these files may contain fewer than ten tuples, yet we need to palloc a file buffer (8KB by default) for each of them. This is the so-called batch explosion problem.
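As a quick sanity check of those numbers (plain arithmetic, assuming the default 8kB BufFile buffer):
```
#include <stdio.h>

int
main(void)
{
    int     initial_nbatch = 4;
    int     splits = 10;        /* R2 is split 10 times */
    long    final_nbatch = (long) initial_nbatch << splits;    /* 4096 */

    /*
     * R3's tuples can only land in batches congruent to it modulo the
     * initial batch number, so it scatters over final_nbatch / 4 files,
     * all but one of them unnecessary.
     */
    long    extra_files = final_nbatch / initial_nbatch - 1;   /* 1023 */

    printf("extra spill files: %ld, buffer memory: %ld kB\n",
           extra_files, extra_files * 8);
    return 0;
}
```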

Solutions:
To avoid this unnecessary repeated spilling, I propose to turn the function ExecHashGetBucketAndBatch into a hash function chain that determines the batchno.
Here is the core of the original implementation of ExecHashGetBucketAndBatch:
```
/* nbatch is the global batch number */
*batchno = (hashvalue >> hashtable->log2_nbuckets) & (nbatch - 1);
```
We can see that the original hash function basically takes the hash value modulo the global batch number.
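For context, the surrounding function in nodeHash.c looks roughly like this (quoted from memory, so treat it as a sketch rather than the exact source; it also computes the bucket number):
```
void
ExecHashGetBucketAndBatch(HashJoinTable hashtable,
                          uint32 hashvalue,
                          int *bucketno,
                          int *batchno)
{
    uint32      nbuckets = (uint32) hashtable->nbuckets;
    uint32      nbatch = (uint32) hashtable->nbatch;

    if (nbatch > 1)
    {
        /* we can do MOD by masking, DIV by shifting */
        *bucketno = hashvalue & (nbuckets - 1);
        *batchno = (hashvalue >> hashtable->log2_nbuckets) & (nbatch - 1);
    }
    else
    {
        *bucketno = hashvalue & (nbuckets - 1);
        *batchno = 0;
    }
}
```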

A real hybrid hash join should use a hash function chain to determine the batchno. In the new algorithm, the components of the hash function chain are: MOD by IBN (the initial batch number), MOD by IBN*2, MOD by IBN*4, MOD by IBN*8, etc. A small batch just uses the first hash function in the chain, while a skewed batch uses as many hash functions in the chain as the number of times it has been split.
Here is the new implementation of ExecHashGetBucketAndBatch:
```
/* i is the current batchno we are processing */
/* hashChainLen[i] records how many times batch i has been split */
for (j = 0; j < hashChainLen[i]; j++)
{
    batchno = (hashvalue >> hashtable->log2_nbuckets) &
              (initialBatch * (1 << j) - 1);

    /*
     * If the calculated batchno is still i, we need to try the next hash
     * function in the chain to determine the final batchno; otherwise the
     * tuple was split away at this level and we can return directly.
     */
    if (batchno != i)
        return batchno;
}
return batchno;
```

A quick example: suppose R3's input is 3,7,11,15,19,23,27,31,35,15,27 (note that every key is 3 mod 4).
Suppose the initial batch number is 4 and memory can hold 4 tuples, so the 5th tuple forces a batch split.
Step 1: batch3 processes 3,7,11,15,19 and now needs to split,
            chainLen[3]=2
            batch3: 3,11,19
            batch7: 7,15
Step 2: 23,27,31 arrive
            batch3: 3,11,19,27
            batch7: 7,15,23,31
Step 3: 35 arrives, batch3 needs to split again
            chainLen[3]=3
            batch3: 3,19,35
            batch7: 7,15,23,31
            batch11: 11,27
Step 4: 15 arrives, HashFun1: 15%4=3, HashFun2: 15%8=7;
            since 7!=3, spill 15 to batch7.
Step 5: 27 arrives, 27%4=3, 27%8=3, 27%16=11;
            since 11!=3, spill 27 to batch11.
Final state:
            chainLen[3]=3
            batch3: 3,19,35
            batch7: 7,15,23,31,15
            batch11: 11,27,27
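To sanity-check the example, here is a small standalone simulation of the chain routing (all names are made up; the key itself serves as the hash value and the bucket shift is omitted). Seeding chainLen[] with the final state above reproduces the final assignment:
```
#include <stdio.h>

#define INITIAL_NBATCH  4
#define MAX_BATCH       64

/* how many hash functions each batch's chain has (1 = never split) */
static int  chainLen[MAX_BATCH];

/*
 * Standalone adaptation of the chained batch lookup: walk batch b's
 * chain until some level maps the tuple away from b, else it stays in b.
 */
static int
route(unsigned int hashvalue)
{
    int     b = hashvalue & (INITIAL_NBATCH - 1);
    int     batchno = b;

    for (int j = 1; j < chainLen[b]; j++)
    {
        batchno = hashvalue & (INITIAL_NBATCH * (1 << j) - 1);
        if (batchno != b)
            return batchno;
    }
    return batchno;
}

int
main(void)
{
    unsigned int keys[] = {3, 7, 11, 15, 19, 23, 27, 31, 35, 15, 27};

    for (int b = 0; b < MAX_BATCH; b++)
        chainLen[b] = 1;
    chainLen[3] = 3;            /* batch3 was split twice in the example */

    for (int i = 0; i < 11; i++)
        printf("key %2u -> batch%d\n", keys[i], route(keys[i]));
    return 0;
}
```
Running it sends 3,19,35 to batch3; 7,15,23,31,15 to batch7; and 11,27,27 to batch11, matching the final state above.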

Here is pseudocode for processing batch i:
```
/* Step 1: build the hash table for batch i from its inner file Ri */
while ((tuple = ReadFromFile(R[i])) != NULL)
{
    /* get the batchno from the new hash function chain */
    batchno = NewExecHashGetBucketAndBatch(hashvalue);

    /* spill the tuple if it no longer belongs to the current batch */
    if (batchno != i)
    {
        SpillToFile(tuple, batchno);
        continue;
    }

    flag = InsertTupleToHashTable(HT, tuple);
    if (flag == NEED_SPLIT)
    {
        hashChainLen[i]++;
        /* then call ExecHashIncreaseNumBatches() to do the real split */
    }
}

/* Step 2: probe with the tuples from the outer file Si */
while ((tuple = ReadFromFile(S[i])) != NULL)
{
    batchno = NewExecHashGetBucketAndBatch(hashvalue);
    if (batchno == i)
        ProbeAndEmitMatches(HT, tuple);     /* probe and match */
    else
        SpillToFile(tuple, batchno);
}
```

This solution splits only the batches that need to be split, in a lazy way. If it makes sense, I would like to write a real patch.
Any comments?


--
Thanks

Hubert Zhang
