Re: POC: Cleaning up orphaned files using undo logs

From: Andres Freund
Subject: Re: POC: Cleaning up orphaned files using undo logs
Msg-id: 20190806075626.zgpx3g2wsuebdbvq@alap3.anarazel.de
In response to: Re: POC: Cleaning up orphaned files using undo logs  (Andres Freund <andres@anarazel.de>)
Responses: Re: POC: Cleaning up orphaned files using undo logs  (Andres Freund <andres@anarazel.de>)
Re: POC: Cleaning up orphaned files using undo logs  (Kuntal Ghosh <kuntalghosh.2007@gmail.com>)
Re: POC: Cleaning up orphaned files using undo logs  (Dilip Kumar <dilipbalaut@gmail.com>)
Re: POC: Cleaning up orphaned files using undo logs  (Dilip Kumar <dilipbalaut@gmail.com>)
Re: POC: Cleaning up orphaned files using undo logs  (Amit Kapila <amit.kapila16@gmail.com>)
Re: POC: Cleaning up orphaned files using undo logs  (Dilip Kumar <dilipbalaut@gmail.com>)
List: pgsql-hackers
Hi,

On 2019-08-05 11:29:34 -0700, Andres Freund wrote:
> Need to do something else for a bit. More later.

Here we go.




> + /*
> +  * Compute the header size of the undo record.
> +  */
> +Size
> +UndoRecordHeaderSize(uint16 uur_info)
> +{
> +    Size        size;
> +
> +    /* Add fixed header size. */
> +    size = SizeOfUndoRecordHeader;
> +
> +    /* Add size of transaction header if it presets. */
> +    if ((uur_info & UREC_INFO_TRANSACTION) != 0)
> +        size += SizeOfUndoRecordTransaction;
> +
> +    /* Add size of rmid if it presets. */
> +    if ((uur_info & UREC_INFO_RMID) != 0)
> +        size += sizeof(RmgrId);
> +
> +    /* Add size of reloid if it presets. */
> +    if ((uur_info & UREC_INFO_RELOID) != 0)
> +        size += sizeof(Oid);
> +
> +    /* Add size of fxid if it presets. */
> +    if ((uur_info & UREC_INFO_XID) != 0)
> +        size += sizeof(FullTransactionId);
> +
> +    /* Add size of cid if it presets. */
> +    if ((uur_info & UREC_INFO_CID) != 0)
> +        size += sizeof(CommandId);
> +
> +    /* Add size of forknum if it presets. */
> +    if ((uur_info & UREC_INFO_FORK) != 0)
> +        size += sizeof(ForkNumber);
> +
> +    /* Add size of prevundo if it presets. */
> +    if ((uur_info & UREC_INFO_PREVUNDO) != 0)
> +        size += sizeof(UndoRecPtr);
> +
> +    /* Add size of the block header if it presets. */
> +    if ((uur_info & UREC_INFO_BLOCK) != 0)
> +        size += SizeOfUndoRecordBlock;
> +
> +    /* Add size of the log switch header if it presets. */
> +    if ((uur_info & UREC_INFO_LOGSWITCH) != 0)
> +        size += SizeOfUndoRecordLogSwitch;
> +
> +    /* Add size of the payload header if it presets. */
> +    if ((uur_info & UREC_INFO_PAYLOAD) != 0)
> +        size += SizeOfUndoRecordPayload;

There are numerous blocks with one if for each type, with the body
copied basically verbatim for each alternative. That doesn't seem like a
reasonable approach to me. It means that many places need to be adjusted
when we invariably add another type, and seems likely to lead to bugs
over time.
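
To make that concrete - this is just a rough sketch, not a worked-out
proposal, reusing the UREC_INFO_* flags and SizeOf* constants from the
patch - something table-driven would mean a new optional field only
needs one new array entry:

/*
 * Sketch: describe each optional chunk of an undo record once, in a
 * table, instead of one if-block per chunk in every function that
 * needs sizes.
 */
typedef struct UndoRecordChunk
{
    uint16      flag;           /* UREC_INFO_* bit */
    Size        size;           /* on-disk size of the chunk */
} UndoRecordChunk;

static const UndoRecordChunk undo_record_chunks[] =
{
    {UREC_INFO_TRANSACTION, SizeOfUndoRecordTransaction},
    {UREC_INFO_RMID, sizeof(RmgrId)},
    {UREC_INFO_RELOID, sizeof(Oid)},
    {UREC_INFO_XID, sizeof(FullTransactionId)},
    {UREC_INFO_CID, sizeof(CommandId)},
    {UREC_INFO_FORK, sizeof(ForkNumber)},
    {UREC_INFO_PREVUNDO, sizeof(UndoRecPtr)},
    {UREC_INFO_BLOCK, SizeOfUndoRecordBlock},
    {UREC_INFO_LOGSWITCH, SizeOfUndoRecordLogSwitch},
    {UREC_INFO_PAYLOAD, SizeOfUndoRecordPayload},
};

Size
UndoRecordHeaderSize(uint16 uur_info)
{
    Size        size = SizeOfUndoRecordHeader;
    int         i;

    /* add the size of every optional chunk that is present */
    for (i = 0; i < lengthof(undo_record_chunks); i++)
    {
        if ((uur_info & undo_record_chunks[i].flag) != 0)
            size += undo_record_chunks[i].size;
    }

    return size;
}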

> +    /* Add size of the payload header if it presets. */

FWIW, repeating the same comment, with or without minor differences, 10
times is a bad idea. Especially when the comment doesn't add *any* sort
of information.

Also, "if it presets" presumably is a typo?


> +/*
> + * Compute and return the expected size of an undo record.
> + */
> +Size
> +UndoRecordExpectedSize(UnpackedUndoRecord *uur)
> +{
> +    Size        size;
> +
> +    /* Header size. */
> +    size = UndoRecordHeaderSize(uur->uur_info);
> +
> +    /* Payload data size. */
> +    if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
> +    {
> +        size += uur->uur_payload.len;
> +        size += uur->uur_tuple.len;
> +    }
> +
> +    /* Add undo record length size. */
> +    size += sizeof(uint16);
> +
> +    return size;
> +}
> +
> +/*
> + * Calculate the size of the undo record stored on the page.
> + */
> +static inline Size
> +UndoRecordSizeOnPage(char *page_ptr)
> +{
> +    uint16        uur_info = ((UndoRecordHeader *) page_ptr)->urec_info;
> +    Size        size;
> +
> +    /* Header size. */
> +    size = UndoRecordHeaderSize(uur_info);
> +
> +    /* Payload data size. */
> +    if ((uur_info & UREC_INFO_PAYLOAD) != 0)
> +    {
> +        UndoRecordPayload *payload = (UndoRecordPayload *) (page_ptr + size);
> +
> +        size += payload->urec_payload_len;
> +        size += payload->urec_tuple_len;
> +    }
> +
> +    return size;
> +}
> +
> +/*
> + * Compute size of the Unpacked undo record in memory
> + */
> +Size
> +UnpackedUndoRecordSize(UnpackedUndoRecord *uur)
> +{
> +    Size        size;
> +
> +    size = sizeof(UnpackedUndoRecord);
> +
> +    /* Add payload size if record contains payload data. */
> +    if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
> +    {
> +        size += uur->uur_payload.len;
> +        size += uur->uur_tuple.len;
> +    }
> +
> +    return size;
> +}

These functions are all basically the same. We shouldn't copy code over
and over like this.
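
Again only a sketch, but e.g. the payload-size computation could live in
one small helper, so the variants only differ in where the lengths come
from:

/* sketch: common payload-size logic shared by the size functions */
static inline Size
UndoRecordPayloadSize(uint16 uur_info, Size payload_len, Size tuple_len)
{
    if ((uur_info & UREC_INFO_PAYLOAD) != 0)
        return payload_len + tuple_len;
    return 0;
}

Size
UndoRecordExpectedSize(UnpackedUndoRecord *uur)
{
    return UndoRecordHeaderSize(uur->uur_info) +
        UndoRecordPayloadSize(uur->uur_info,
                              uur->uur_payload.len,
                              uur->uur_tuple.len) +
        sizeof(uint16);         /* trailing record length */
}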


> +/*
> + * Initiate inserting an undo record.
> + *
> + * This function will initialize the context for inserting and undo record
> + * which will be inserted by calling InsertUndoData.
> + */
> +void
> +BeginInsertUndo(UndoPackContext *ucontext, UnpackedUndoRecord *uur)
> +{
> +    ucontext->stage = UNDO_PACK_STAGE_HEADER;
> +    ucontext->already_processed = 0;
> +    ucontext->partial_bytes = 0;
> +
> +    /* Copy undo record header. */
> +    ucontext->urec_hd.urec_type = uur->uur_type;
> +    ucontext->urec_hd.urec_info = uur->uur_info;
> +
> +    /* Copy undo record transaction header if it is present. */
> +    if ((uur->uur_info & UREC_INFO_TRANSACTION) != 0)
> +        memcpy(&ucontext->urec_txn, uur->uur_txn, SizeOfUndoRecordTransaction);
> +
> +    /* Copy rmid if present. */
> +    if ((uur->uur_info & UREC_INFO_RMID) != 0)
> +        ucontext->urec_rmid = uur->uur_rmid;
> +
> +    /* Copy reloid if present. */
> +    if ((uur->uur_info & UREC_INFO_RELOID) != 0)
> +        ucontext->urec_reloid = uur->uur_reloid;
> +
> +    /* Copy fxid if present. */
> +    if ((uur->uur_info & UREC_INFO_XID) != 0)
> +        ucontext->urec_fxid = uur->uur_fxid;
> +
> +    /* Copy cid if present. */
> +    if ((uur->uur_info & UREC_INFO_CID) != 0)
> +        ucontext->urec_cid = uur->uur_cid;
> +
> +    /* Copy undo record relation header if it is present. */
> +    if ((uur->uur_info & UREC_INFO_FORK) != 0)
> +        ucontext->urec_fork = uur->uur_fork;
> +
> +    /* Copy prev undo record pointer if it is present. */
> +    if ((uur->uur_info & UREC_INFO_PREVUNDO) != 0)
> +        ucontext->urec_prevundo = uur->uur_prevundo;
> +
> +    /* Copy undo record block header if it is present. */
> +    if ((uur->uur_info & UREC_INFO_BLOCK) != 0)
> +    {
> +        ucontext->urec_blk.urec_block = uur->uur_block;
> +        ucontext->urec_blk.urec_offset = uur->uur_offset;
> +    }
> +
> +    /* Copy undo record log switch header if it is present. */
> +    if ((uur->uur_info & UREC_INFO_LOGSWITCH) != 0)
> +        memcpy(&ucontext->urec_logswitch, uur->uur_logswitch,
> +               SizeOfUndoRecordLogSwitch);
> +
> +    /* Copy undo record payload header and data if it is present. */
> +    if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
> +    {
> +        ucontext->urec_payload.urec_payload_len = uur->uur_payload.len;
> +        ucontext->urec_payload.urec_tuple_len = uur->uur_tuple.len;
> +        ucontext->urec_payloaddata = uur->uur_payload.data;
> +        ucontext->urec_tupledata = uur->uur_tuple.data;
> +    }
> +    else
> +    {
> +        ucontext->urec_payload.urec_payload_len = 0;
> +        ucontext->urec_payload.urec_tuple_len = 0;
> +    }
> +
> +    /* Compute undo record expected size and store in the context. */
> +    ucontext->undo_len = UndoRecordExpectedSize(uur);
> +}

It really can't be right to have all these fields basically twice, in
UnpackedUndoRecord and UndoPackContext, and then copy them one-by-one.
I mean there are really just some random differences (ordering, some field
names) between the structures, but otherwise they're the same?

What on earth do we gain by this?  This entire intermediate stage makes
no sense at all to me. We copy data into an UnpackedUndoRecord, then we
copy it into an UndoPackContext, with essentially field-by-field copy
logic. Then we have another field-by-field pass that copies the data
into the page.




> +/*
> + * Insert the undo record into the input page from the unpack undo context.
> + *
> + * Caller can  call this function multiple times until desired stage is reached.
> + * This will write the undo record into the page.
> + */
> +void
> +InsertUndoData(UndoPackContext *ucontext, Page page, int starting_byte)
> +{
> +    char       *writeptr = (char *) page + starting_byte;
> +    char       *endptr = (char *) page + BLCKSZ;
> +
> +    switch (ucontext->stage)
> +    {
> +        case UNDO_PACK_STAGE_HEADER:
> +            /* Insert undo record header. */
> +            if (!InsertUndoBytes((char *) &ucontext->urec_hd,
> +                                 SizeOfUndoRecordHeader, &writeptr, endptr,
> +                                 &ucontext->already_processed,
> +                                 &ucontext->partial_bytes))
> +                return;
> +            ucontext->stage = UNDO_PACK_STAGE_TRANSACTION;
> +            /* fall through */
> +
> +        case UNDO_PACK_STAGE_TRANSACTION:
> +            if ((ucontext->urec_hd.urec_info & UREC_INFO_TRANSACTION) != 0)
> +            {
> +                /* Insert undo record transaction header. */
> +                if (!InsertUndoBytes((char *) &ucontext->urec_txn,
> +                                     SizeOfUndoRecordTransaction,
> +                                     &writeptr, endptr,
> +                                     &ucontext->already_processed,
> +                                     &ucontext->partial_bytes))
> +                    return;
> +            }
> +            ucontext->stage = UNDO_PACK_STAGE_RMID;
> +            /* fall through */
> +
> +        case UNDO_PACK_STAGE_RMID:
> +            /* Write rmid(if needed and not already done). */
> +            if ((ucontext->urec_hd.urec_info & UREC_INFO_RMID) != 0)
> +            {
> +                if (!InsertUndoBytes((char *) &(ucontext->urec_rmid), sizeof(RmgrId),
> +                                     &writeptr, endptr,
> +                                     &ucontext->already_processed,
> +                                     &ucontext->partial_bytes))
> +                    return;
> +            }
> +            ucontext->stage = UNDO_PACK_STAGE_RELOID;
> +            /* fall through */
> +
> +        case UNDO_PACK_STAGE_RELOID:
> +            /* Write reloid(if needed and not already done). */
> +            if ((ucontext->urec_hd.urec_info & UREC_INFO_RELOID) != 0)
> +            {
> +                if (!InsertUndoBytes((char *) &(ucontext->urec_reloid), sizeof(Oid),
> +                                     &writeptr, endptr,
> +                                     &ucontext->already_processed,
> +                                     &ucontext->partial_bytes))
> +                    return;
> +            }
> +            ucontext->stage = UNDO_PACK_STAGE_XID;
> +            /* fall through */
> +
> +        case UNDO_PACK_STAGE_XID:
> +            /* Write xid(if needed and not already done). */
> +            if ((ucontext->urec_hd.urec_info & UREC_INFO_XID) != 0)
> +            {
> +                if (!InsertUndoBytes((char *) &(ucontext->urec_fxid), sizeof(FullTransactionId),
> +                                     &writeptr, endptr,
> +                                     &ucontext->already_processed,
> +                                     &ucontext->partial_bytes))
> +                    return;
> +            }
> +            ucontext->stage = UNDO_PACK_STAGE_CID;
> +            /* fall through */
> +
> +        case UNDO_PACK_STAGE_CID:
> +            /* Write cid(if needed and not already done). */
> +            if ((ucontext->urec_hd.urec_info & UREC_INFO_CID) != 0)
> +            {
> +                if (!InsertUndoBytes((char *) &(ucontext->urec_cid), sizeof(CommandId),
> +                                     &writeptr, endptr,
> +                                     &ucontext->already_processed,
> +                                     &ucontext->partial_bytes))
> +                    return;
> +            }
> +            ucontext->stage = UNDO_PACK_STAGE_FORKNUM;
> +            /* fall through */
> +
> +        case UNDO_PACK_STAGE_FORKNUM:
> +            if ((ucontext->urec_hd.urec_info & UREC_INFO_FORK) != 0)
> +            {
> +                /* Insert undo record fork number. */
> +                if (!InsertUndoBytes((char *) &ucontext->urec_fork,
> +                                     sizeof(ForkNumber),
> +                                     &writeptr, endptr,
> +                                     &ucontext->already_processed,
> +                                     &ucontext->partial_bytes))
> +                    return;
> +            }
> +            ucontext->stage = UNDO_PACK_STAGE_PREVUNDO;
> +            /* fall through */
> +
> +        case UNDO_PACK_STAGE_PREVUNDO:
> +            if ((ucontext->urec_hd.urec_info & UREC_INFO_PREVUNDO) != 0)
> +            {
> +                /* Insert undo record blkprev. */
> +                if (!InsertUndoBytes((char *) &ucontext->urec_prevundo,
> +                                     sizeof(UndoRecPtr),
> +                                     &writeptr, endptr,
> +                                     &ucontext->already_processed,
> +                                     &ucontext->partial_bytes))
> +                    return;
> +            }
> +            ucontext->stage = UNDO_PACK_STAGE_BLOCK;
> +            /* fall through */
> +
> +        case UNDO_PACK_STAGE_BLOCK:
> +            if ((ucontext->urec_hd.urec_info & UREC_INFO_BLOCK) != 0)
> +            {
> +                /* Insert undo record block header. */
> +                if (!InsertUndoBytes((char *) &ucontext->urec_blk,
> +                                     SizeOfUndoRecordBlock,
> +                                     &writeptr, endptr,
> +                                     &ucontext->already_processed,
> +                                     &ucontext->partial_bytes))
> +                    return;
> +            }
> +            ucontext->stage = UNDO_PACK_STAGE_LOGSWITCH;
> +            /* fall through */
> +
> +        case UNDO_PACK_STAGE_LOGSWITCH:
> +            if ((ucontext->urec_hd.urec_info & UREC_INFO_LOGSWITCH) != 0)
> +            {
> +                /* Insert undo record transaction header. */
> +                if (!InsertUndoBytes((char *) &ucontext->urec_logswitch,
> +                                     SizeOfUndoRecordLogSwitch,
> +                                     &writeptr, endptr,
> +                                     &ucontext->already_processed,
> +                                     &ucontext->partial_bytes))
> +                    return;
> +            }
> +            ucontext->stage = UNDO_PACK_STAGE_PAYLOAD;
> +            /* fall through */
> +
> +        case UNDO_PACK_STAGE_PAYLOAD:
> +            if ((ucontext->urec_hd.urec_info & UREC_INFO_PAYLOAD) != 0)
> +            {
> +                /* Insert undo record payload header. */
> +                if (!InsertUndoBytes((char *) &ucontext->urec_payload,
> +                                     SizeOfUndoRecordPayload,
> +                                     &writeptr, endptr,
> +                                     &ucontext->already_processed,
> +                                     &ucontext->partial_bytes))
> +                    return;
> +            }
> +            ucontext->stage = UNDO_PACK_STAGE_PAYLOAD_DATA;
> +            /* fall through */
> +
> +        case UNDO_PACK_STAGE_PAYLOAD_DATA:
> +            {
> +                int            len = ucontext->urec_payload.urec_payload_len;
> +
> +                if (len > 0)
> +                {
> +                    /* Insert payload data. */
> +                    if (!InsertUndoBytes((char *) ucontext->urec_payloaddata,
> +                                         len, &writeptr, endptr,
> +                                         &ucontext->already_processed,
> +                                         &ucontext->partial_bytes))
> +                        return;
> +                }
> +                ucontext->stage = UNDO_PACK_STAGE_TUPLE_DATA;
> +            }
> +            /* fall through */
> +
> +        case UNDO_PACK_STAGE_TUPLE_DATA:
> +            {
> +                int            len = ucontext->urec_payload.urec_tuple_len;
> +
> +                if (len > 0)
> +                {
> +                    /* Insert tuple data. */
> +                    if (!InsertUndoBytes((char *) ucontext->urec_tupledata,
> +                                         len, &writeptr, endptr,
> +                                         &ucontext->already_processed,
> +                                         &ucontext->partial_bytes))
> +                        return;
> +                }
> +                ucontext->stage = UNDO_PACK_STAGE_UNDO_LENGTH;
> +            }
> +            /* fall through */
> +
> +        case UNDO_PACK_STAGE_UNDO_LENGTH:
> +            /* Insert undo length. */
> +            if (!InsertUndoBytes((char *) &ucontext->undo_len,
> +                                 sizeof(uint16), &writeptr, endptr,
> +                                 &ucontext->already_processed,
> +                                 &ucontext->partial_bytes))
> +                return;
> +
> +            ucontext->stage = UNDO_PACK_STAGE_DONE;
> +            /* fall through */
> +
> +        case UNDO_PACK_STAGE_DONE:
> +            /* Nothing to be done. */
> +            break;
> +
> +        default:
> +            Assert(0);            /* Invalid stage */
> +    }
> +}

I don't understand. The only purpose of this is that we can partially
write a packed-but-not-actually-packed record onto a bunch of pages? And
for that we have an endless chain of copy and pasted code calling
InsertUndoBytes()? Copying data into shared buffers in tiny increments?

If we need to do this, what is the whole packed record format good for?
Except for adding a bunch of functions with 10++ ifs and nearly
identical code?

Copying data is expensive. Copying data in tiny increments is more
expensive. Copying data in tiny increments, with a bunch of branches, is
even more expensive. Copying data in tiny increments, with a bunch of
branches, is even more expensive, especially when it's shared
memory. Copying data in tiny increments, with a bunch of branches, is
even more expensive, especially when it's shared memory, especially when
all that shared memory is locked at once.


> +/*
> + * Read the undo record from the input page to the unpack undo context.
> + *
> + * Caller can  call this function multiple times until desired stage is reached.
> + * This will read the undo record from the page and store the data into unpack
> + * undo context, which can be later copied to unpacked undo record by calling
> + * FinishUnpackUndo.
> + */
> +void
> +UnpackUndoData(UndoPackContext *ucontext, Page page, int starting_byte)
> +{
> +    char       *readptr = (char *) page + starting_byte;
> +    char       *endptr = (char *) page + BLCKSZ;
> +
> +    switch (ucontext->stage)
> +    {
> +        case UNDO_PACK_STAGE_HEADER:

You know roughly what I'm thinking.




> commit 95d10fb308e3ec6ac8a7b4b5e7af78f6825f4dc8
> Author:     Amit Kapila <amit.kapila@enterprisedb.com>
> AuthorDate: 2019-06-13 15:10:06 +0530
> Commit:     Amit Kapila <amit.kapila@enterprisedb.com>
> CommitDate: 2019-07-31 16:36:52 +0530
>
>     Infrastructure to register and fetch undo action requests.

I'm pretty sure I suggested that before, but this seems the wrong
order. We should have very basic undo functionality in place, even if it
can't actually guarantee that undo gets processed, before this. The
design of this piece depends on understanding the later parts too much.


>     This infrasture provides a way to allow execution of undo actions.  One
>     might think that we can always execute undo actions on error or explicit
>     rollabck by user, however there are cases when that is not posssible.

s/rollabck by user/rollback by a user/


>     For example, (a) if the system crash while doing operation, then after
>     startup, we need a way to perform undo actions; (b) If we get error while
>     performing undo actions.

"doing operation" doesn't sound right. Maybe "performing an operation"?


>     Apart from this, when there are large rollback requests, then it is quite
>     inefficient to perform all the undo actions and then return control to
>     user.

I don't think efficiency is the right word to describe that. I'd argue
that it's probably often at least as efficient to let that rollback be
processed in that context (higher cache locality, preventing that
backend from creating further undo). It's just that doing so has a bad
effect on latency.


>     To allow efficient execution of the undo actions, we create three queues
>     and a hash table for the rollback requests.

Again I don't think efficient is the right descriptor. My understanding
of the goals of having multiple queues is that it helps to achieve
forward progress among separate goals, without losing too much
efficiency.

>     A Xid based priority queue
>     which will allow us to process the requests of older transactions and help
>     us to move oldesdXidHavingUnappliedUndo (this is a xid-horizon below which
>     all the transactions are visible) forward.

"This is an important concern, because ..."




> +/*
> + * Returns the undo record pointer corresponding to first record in the given
> + * block.
> + */
> +UndoRecPtr
> +UndoBlockGetFirstUndoRecord(BlockNumber blkno, UndoRecPtr urec_ptr,
> +                            UndoLogCategory category)
> +{
> +    Buffer buffer;
> +    Page page;
> +    UndoPageHeader    phdr;
> +    RelFileNode        rnode;
> +    UndoLogOffset    log_cur_off;
> +    Size            partial_rec_size;
> +    int                offset_cur_page;
> +
> +    if (!BlockNumberIsValid(blkno))
> +        ereport(ERROR,
> +                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
> +                 errmsg("invalid undo block number")));
> +
> +    UndoRecPtrAssignRelFileNode(rnode, urec_ptr);
> +
> +    buffer = ReadBufferWithoutRelcache(rnode, UndoLogForkNum, blkno,
> +                                       RBM_NORMAL, NULL,
> +                                       RelPersistenceForUndoLogCategory(category));
> +
> +    LockBuffer(buffer, BUFFER_LOCK_SHARE);
> +
> +    page = BufferGetPage(buffer);
> +    phdr = (UndoPageHeader)page;
> +
> +    /* Calculate the size of the partial record. */
> +    partial_rec_size = UndoRecordHeaderSize(phdr->uur_info) +
> +                        phdr->tuple_len + phdr->payload_len -
> +                        phdr->record_offset;
> +
> +    /* calculate the offset in current log. */
> +    offset_cur_page = SizeOfUndoPageHeaderData + partial_rec_size;
> +    log_cur_off = (blkno * BLCKSZ) + offset_cur_page;
> +
> +    UnlockReleaseBuffer(buffer);
> +
> +    /* calculate the undo record pointer based on current offset in log. */
> +    return MakeUndoRecPtr(UndoRecPtrGetLogNo(urec_ptr), log_cur_off);
> +}

Yet another function reading undo blocks. No.


>     The undo requests must appear in both xid and size
> + * requests queues or neither.

Why?

>     As of now we, process the requests from these
> + * queues in a round-robin fashion to give equal priority to all three type
> + * of requests.

*types


> + * The rollback requests exceeding a certain threshold are pushed into both
> + * xid and size based queues.  They are also registered in the hash table.

Why aren't rollbacks below the threshold in the hashtable?


> + * To ensure that backend and discard worker don't register the same request
> + * in the hash table, we always register the request with full_xid and the
> + * start pointer for the transaction in the hash table as key.  Backends
> + * always remember the value of start pointer, but discard worker doesn't know

*the discard worker

There's no explanation as to why we need more than the full_xid
(presumably persistency levels). Nor why you chose not to include those.


> + * the actual start value in case transaction's undo spans across multiple
> + * logs.  The reason for the same is that discard worker might encounter the
> + * log which has overflowed undo records of the transaction first.

"the log which has overflowed undo records of the transaction first" is
confusing. Perhaps "the undo log into which the logically later undo
overflowed, before encountering the logically earlier undo"?


> In such
> + * cases, we need to compute the actual start position.  The first record of a
> + * transaction in each undo log contains a reference to the first record of
> + * this transaction in the previous log.  By following the previous log chain
> + * of this transaction, we find the initial location which is used to register
> + * the request.

It seems wrong that the undo request layer needs to care about any of
this.


> +/* Each worker queue is a binary heap. */
> +typedef struct
> +{
> +    binaryheap *bh;
> +    union
> +    {
> +        UndoXidQueue *xid_elems;
> +        UndoSizeQueue *size_elems;
> +        UndoErrorQueue *error_elems;
> +    }            q_choice;
> +} UndoWorkerQueue;

As we IIRC have decided to change this into a rbtree, I'll ignore
related parts of the current code.  What is the status of that work?
I've checked the git trees, without seeing anything? Your last mail with
patches
https://www.postgresql.org/message-id/CAA4eK1KKAFBCJuPnFtgdc89djv4xO%3DZkAdXvKQinqN4hWiRbvA%40mail.gmail.com
doesn't seem to contain that either?


> +/* Different operations for XID queue */
> +#define InitXidQueue(bh, elems) \
> +( \
> +    UndoWorkerQueues[XID_QUEUE].bh = bh, \
> +    UndoWorkerQueues[XID_QUEUE].q_choice.xid_elems = elems \
> +)
> +
> +#define XidQueueIsEmpty() \
> +    (binaryheap_empty(UndoWorkerQueues[XID_QUEUE].bh))
> +
> +#define GetXidQueueSize() \
> +    (binaryheap_cur_size(UndoWorkerQueues[XID_QUEUE].bh))
> +
> +#define GetXidQueueElem(elem) \
> +    (UndoWorkerQueues[XID_QUEUE].q_choice.xid_elems[elem])
> +
> +#define GetXidQueueTopElem() \
> +( \
> +    AssertMacro(!binaryheap_empty(UndoWorkerQueues[XID_QUEUE].bh)), \
> +    DatumGetPointer(binaryheap_first(UndoWorkerQueues[XID_QUEUE].bh)) \
> +)
> +
> +#define GetXidQueueNthElem(n) \
> +( \
> +    AssertMacro(!XidQueueIsEmpty()), \
> +    DatumGetPointer(binaryheap_nth(UndoWorkerQueues[XID_QUEUE].bh, n)) \
> +)
> +
> +#define SetXidQueueElem(elem, e_dbid, e_full_xid, e_start_urec_ptr) \
> +( \
> +    GetXidQueueElem(elem).dbid = e_dbid, \
> +    GetXidQueueElem(elem).full_xid = e_full_xid, \
> +    GetXidQueueElem(elem).start_urec_ptr = e_start_urec_ptr \
> +)
> +
> +/* Different operations for SIZE queue */
> +#define InitSizeQueue(bh, elems) \
> +( \
> +    UndoWorkerQueues[SIZE_QUEUE].bh = bh, \
> +    UndoWorkerQueues[SIZE_QUEUE].q_choice.size_elems = elems \
> +)
> +
> +#define SizeQueueIsEmpty() \
> +    (binaryheap_empty(UndoWorkerQueues[SIZE_QUEUE].bh))
> +
> +#define GetSizeQueueSize() \
> +    (binaryheap_cur_size(UndoWorkerQueues[SIZE_QUEUE].bh))
> +
> +#define GetSizeQueueElem(elem) \
> +    (UndoWorkerQueues[SIZE_QUEUE].q_choice.size_elems[elem])
> +
> +#define GetSizeQueueTopElem() \
> +( \
> +    AssertMacro(!SizeQueueIsEmpty()), \
> +    DatumGetPointer(binaryheap_first(UndoWorkerQueues[SIZE_QUEUE].bh)) \
> +)
> +
> +#define GetSizeQueueNthElem(n) \
> +( \
> +    AssertMacro(!SizeQueueIsEmpty()), \
> +    DatumGetPointer(binaryheap_nth(UndoWorkerQueues[SIZE_QUEUE].bh, n)) \
> +)
> +
> +#define SetSizeQueueElem(elem, e_dbid, e_full_xid, e_size, e_start_urec_ptr) \
> +( \
> +    GetSizeQueueElem(elem).dbid = e_dbid, \
> +    GetSizeQueueElem(elem).full_xid = e_full_xid, \
> +    GetSizeQueueElem(elem).request_size = e_size, \
> +    GetSizeQueueElem(elem).start_urec_ptr = e_start_urec_ptr \
> +)
> +
> +/* Different operations for Error queue */
> +#define InitErrorQueue(bh, elems) \
> +( \
> +    UndoWorkerQueues[ERROR_QUEUE].bh = bh, \
> +    UndoWorkerQueues[ERROR_QUEUE].q_choice.error_elems = elems \
> +)
> +
> +#define ErrorQueueIsEmpty() \
> +    (binaryheap_empty(UndoWorkerQueues[ERROR_QUEUE].bh))
> +
> +#define GetErrorQueueSize() \
> +    (binaryheap_cur_size(UndoWorkerQueues[ERROR_QUEUE].bh))
> +
> +#define GetErrorQueueElem(elem) \
> +    (UndoWorkerQueues[ERROR_QUEUE].q_choice.error_elems[elem])
> +
> +#define GetErrorQueueTopElem() \
> +( \
> +    AssertMacro(!binaryheap_empty(UndoWorkerQueues[ERROR_QUEUE].bh)), \
> +    DatumGetPointer(binaryheap_first(UndoWorkerQueues[ERROR_QUEUE].bh)) \
> +)
> +
> +#define GetErrorQueueNthElem(n) \
> +( \
> +    AssertMacro(!ErrorQueueIsEmpty()), \
> +    DatumGetPointer(binaryheap_nth(UndoWorkerQueues[ERROR_QUEUE].bh, n)) \
> +)


-ETOOMANYMACROS

I think nearly all of these shouldn't exist. See further below.


> +#define SetErrorQueueElem(elem, e_dbid, e_full_xid, e_start_urec_ptr, e_retry_at, e_occurred_at) \
> +( \
> +    GetErrorQueueElem(elem).dbid = e_dbid, \
> +    GetErrorQueueElem(elem).full_xid = e_full_xid, \
> +    GetErrorQueueElem(elem).start_urec_ptr = e_start_urec_ptr, \
> +    GetErrorQueueElem(elem).next_retry_at = e_retry_at, \
> +    GetErrorQueueElem(elem).err_occurred_at = e_occurred_at \
> +)

It's very very rarely a good idea to have macros that evaluate their
arguments multiple times. It'll also never be a good idea to get the
same element multiple times from a queue.  If needed - I'm very doubtful
of that, given that there's a single caller - it should be a static
inline function that gets the element once, stores it in a local
variable, and then updates all the fields.
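
Roughly like this (sketch only; the TimestampTz types for the two
timestamps are my guess, I haven't checked the struct definition):

/* sketch: evaluate the queue slot exactly once, then fill in the fields */
static inline void
SetErrorQueueElem(int elem, Oid dbid, FullTransactionId full_xid,
                  UndoRecPtr start_urec_ptr, TimestampTz retry_at,
                  TimestampTz occurred_at)
{
    UndoErrorQueue *e =
        &UndoWorkerQueues[ERROR_QUEUE].q_choice.error_elems[elem];

    e->dbid = dbid;
    e->full_xid = full_xid;
    e->start_urec_ptr = start_urec_ptr;
    e->next_retry_at = retry_at;
    e->err_occurred_at = occurred_at;
}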


> +/*
> + * Binary heap comparison function to compare the size of transactions.
> + */
> +static int
> +undo_size_comparator(Datum a, Datum b, void *arg)
> +{
> +    UndoSizeQueue *sizeQueueElem1 = (UndoSizeQueue *) DatumGetPointer(a);
> +    UndoSizeQueue *sizeQueueElem2 = (UndoSizeQueue *) DatumGetPointer(b);
>
It's very odd that the element types are named 'Queue' rather than
something indicating a queue element.


> +/*
> + * Binary heap comparison function to compare the time at which an error
> + * occurred for transactions.
> + *
> + * The error queue is sorted by next_retry_at and err_occurred_at.  Currently,
> + * the next_retry_at has some constant delay time (see PushErrorQueueElem), so
> + * it doesn't make much sense to sort by both values.  However, in future, if
> + * we have some different algorithm for next_retry_at, then it will work
> + * seamlessly.
> + */

Why is it useful to have err_occurred_at be part of the comparison at
all? If we need a tiebreaker, err_occurred_at isn't one (if we can get
conflicts for next_retry_at, then we can also get conflicts in
err_occurred_at).  Seems better to use something actually guaranteed to
be unique as the tiebreaker.


> +int
> +UndoRollbackHashTableSize()
> +{

missing void, at least compared to our common style.


> +    /*
> +     * The rollback hash table is used to avoid duplicate undo requests by
> +     * backends and discard worker.  The table must be able to accomodate all
> +     * active undo requests.  The undo requests must appear in both xid and
> +     * size requests queues or neither.  In same transaction, there can be two
> +     * requests one for logged relations and another for unlogged relations.
> +     * So, the rollback hash table size should be equal to two request queues,
> +     * an error queue (currently this is same as request queue) and max

"the same"? I assume this intended to mean the same size?


> +     * backends. This will ensure that it won't get filled.
> +     */

How does this ensure anything?



> +static int
> +RemoveOldElemsFromXidQueue()

void.



> +/*
> + * Traverse the queue and remove dangling entries, if any.  The queue
> + * entry is considered dangling if the hash table doesn't contain the
> + * corresponding entry.
> + */
> +static int
> +RemoveOldElemsFromSizeQueue()

void.


We shouldn't need this in this form anymore after the rbtree conversion
- but because it again highlights one of my main complaints about all this
work: Don't have multiple copies of essentially equivalent non-trivial
functions. Especially not in the same file.  This is a near verbatim
copy of RemoveOldElemsFromXidQueue.  Without any explanations why it's
needed.

Even if you intended it only as a short-term workaround (e.g. for the
queues not sharing enough of a common base-layout to be able to share
one cleanup routine), at the very least you need to add a FIXME or such
explaining that this needs to be fixed.


> +/*
> + * Traverse the queue and remove dangling entries, if any.  The queue
> + * entry is considered dangling if the hash table doesn't contain the
> + * corresponding entry.
> + */
> +static int
> +RemoveOldElemsFromErrorQueue()
> +{

Another copy.


> +/*
> + * Returns true, if there is some valid request in the given queue, false,
> + * otherwise.
> + *
> + * It fills hkey with hash key corresponding to the nth element of the
> + * specified queue.
> + */
> +static bool
> +GetRollbackHashKeyFromQueue(UndoWorkerQueueType cur_queue, int n,
> +                            RollbackHashKey *hkey)
> +{
> +    if (cur_queue == XID_QUEUE)
> +    {
> +        UndoXidQueue *elem;
> +
> +        /* check if there is a work in the next queue */
> +        if (GetXidQueueSize() <= n)
> +            return false;
> +
> +        elem = (UndoXidQueue *) GetXidQueueNthElem(n);
> +        hkey->full_xid = elem->full_xid;
> +        hkey->start_urec_ptr = elem->start_urec_ptr;
> +    }

This is a slightly different form of copying code repeatedly. Instead of
passing in the queue type, this should get a pointer to the queue passed
in. Functions like Get*QueueSize(), GetErrorQueueNthElem() shouldn't
exist once for each queue type, they should be agnostic as to what the
queue type is, and accept a queue as the parameter.

Yes, there'd still be one additional queue-type-specific check, for the
time being. But that's still a lot less copied code.
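
I.e. something along these lines (sketch, using the binaryheap helpers
the patch already relies on):

/* sketch: queue-agnostic accessors that take the queue as a parameter */
static inline int
UndoQueueSize(UndoWorkerQueue *queue)
{
    return binaryheap_cur_size(queue->bh);
}

static inline void *
UndoQueueNthElem(UndoWorkerQueue *queue, int n)
{
    Assert(!binaryheap_empty(queue->bh));
    return DatumGetPointer(binaryheap_nth(queue->bh, n));
}

The remaining type-specific part - pulling full_xid / start_urec_ptr out
of the element - is then a small switch in the single caller, rather
than a copy of the whole function per queue.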


I also don't think it's a good idea to use RollbackHashKey as the
parameter/function name here. This function doesn't need to know that
it's for a hash table lookup.


> +/*
> + * Fetch the end urec pointer for the transaction and the undo request size.
> + *
> + * end_urecptr_out - This is an INOUT parameter. If end undo pointer is
> + * specified, we use the same to calculate the size.  Else, we calculate
> + * the end undo pointer and return the same.
> + *
> + * last_log_start_urec_ptr_out - This is an OUT parameter.  If a transaction
> + * writes undo records in multiple undo logs, this is set to the start undo
> + * record pointer of this transaction in the last log.  If the transaction
> + * writes undo records only in single undo log, it is set to start_urec_ptr.
> + * This value is used to update the rollback progress of the transaction in
> + * the last log.  Once, we have start location in last log, the start location
> + * in all the previous logs can be computed.  See execute_undo_actions for
> + * more details.
> + *
> + * XXX: We don't calculate the exact undo size.  We always skip the size of
> + * the last undo record (if not already discarded) from the calculation.  This
> + * optimization allows us to skip fetching an undo record for the most
> + * frequent cases where the end pointer and current start pointer belong to
> + * the same log.  A simple subtraction between them gives us the size.  In
> + * future this function can be modified if someone needs the exact undo size.
> + * As of now, we use this function to calculate the undo size for inserting
> + * in the pending undo actions in undo worker's size queue.
> + */
> +uint64
> +FindUndoEndLocationAndSize(UndoRecPtr start_urecptr,
> +                           UndoRecPtr *end_urecptr_out,
> +                           UndoRecPtr *last_log_start_urecptr_out,
> +                           FullTransactionId full_xid)
> +{

This really can't be the right place for this function.



> +/*
> + * Returns true, if we can push the rollback request to undo wrokers, false,

*workers

Also, it's not really queued to workers. Something like "can queue the
rollback request to be executed in the background" would be more
accurate afaict.


> + * otherwise.
> + */
> +static bool
> +CanPushReqToUndoWorker(UndoRecPtr start_urec_ptr, UndoRecPtr end_urec_ptr,
> +                       uint64 req_size)
> +{
> +    /*
> +     * This must be called after acquring RollbackRequestLock as we will check

*acquiring


> +     * the binary heaps which can change.
> +     */
> +    Assert(LWLockHeldByMeInMode(RollbackRequestLock, LW_EXCLUSIVE));
> +
> +    /*
> +     * We normally push the rollback request to undo workers if the size of
> +     * same is above a certain threshold.
> +     */
> +    if (req_size >= rollback_overflow_size * 1024 * 1024)
> +    {

Why is this being checked with the lock held? Seems like this should be
handled in a pre-check?


> +/*
> + * Initialize the hash-table and priority heap based queues for rollback
> + * requests in shared memory.
> + */
> +void
> +PendingUndoShmemInit(void)
> +{
> +    HASHCTL        info;
> +    bool        foundXidQueue = false;
> +    bool        foundSizeQueue = false;
> +    bool        foundErrorQueue = false;
> +    binaryheap *bh;
> +    UndoXidQueue *xid_elems;
> +    UndoSizeQueue *size_elems;
> +    UndoErrorQueue *error_elems;
> +
> +    MemSet(&info, 0, sizeof(info));
> +
> +    info.keysize = sizeof(TransactionId) + sizeof(UndoRecPtr);
> +    info.entrysize = sizeof(RollbackHashEntry);
> +    info.hash = tag_hash;
> +
> +    RollbackHT = ShmemInitHash("Undo Actions Lookup Table",
> +                               UndoRollbackHashTableSize(),
> +                               UndoRollbackHashTableSize(), &info,
> +                               HASH_ELEM | HASH_FUNCTION | HASH_FIXED_SIZE);
> +
> +    bh = binaryheap_allocate_shm("Undo Xid Binary Heap",
> +                                 pending_undo_queue_size,
> +                                 undo_age_comparator,
> +                                 NULL);
> +
> +    xid_elems = (UndoXidQueue *) ShmemInitStruct("Undo Xid Queue Elements",
> +                                                 UndoXidQueueElemsShmSize(),
> +                                                 &foundXidQueue);
> +
> +    Assert(foundXidQueue || !IsUnderPostmaster);
> +
> +    if (!IsUnderPostmaster)
> +        memset(xid_elems, 0, sizeof(UndoXidQueue));
> +
> +    InitXidQueue(bh, xid_elems);
> +
> +    bh = binaryheap_allocate_shm("Undo Size Binary Heap",
> +                                 pending_undo_queue_size,
> +                                 undo_size_comparator,
> +                                 NULL);
> +    size_elems = (UndoSizeQueue *) ShmemInitStruct("Undo Size Queue Elements",
> +                                                   UndoSizeQueueElemsShmSize(),
> +                                                   &foundSizeQueue);
> +    Assert(foundSizeQueue || !IsUnderPostmaster);
> +
> +    if (!IsUnderPostmaster)
> +        memset(size_elems, 0, sizeof(UndoSizeQueue));
> +
> +    InitSizeQueue(bh, size_elems);
> +
> +    bh = binaryheap_allocate_shm("Undo Error Binary Heap",
> +                                 pending_undo_queue_size,
> +                                 undo_err_time_comparator,
> +                                 NULL);
> +
> +    error_elems = (UndoErrorQueue *) ShmemInitStruct("Undo Error Queue Elements",
> +                                                     UndoErrorQueueElemsShmSize(),
> +                                                     &foundErrorQueue);
> +    Assert(foundErrorQueue || !IsUnderPostmaster);
> +
> +    if (!IsUnderPostmaster)
> +        memset(error_elems, 0, sizeof(UndoSizeQueue));
> +
> +    InitErrorQueue(bh, error_elems);

Hm. Aren't you overwriting previously initialized data here with memset
and Init*Queue, when using an EXEC_BACKEND build (e.g. Windows)?

I think all the initialization should only be done once, e.g. only when
ShmemInitStruct() reports via *found that the structure didn't already
exist. And then the other elements should be asserted to also exist/not
exist.

Also, what is the memset() here supposed to be doing? Aren't you just
memsetting() the first element in the queue? Since the queue is
dynamically sized, a static length (sizeof(UndoSizeQueue)) memset()
obviously cannot initialize all the members.

Also, this again is repeating code unnecessarily.
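
For reference, something like this is the pattern I'd expect (sketch;
the sizing and the ordering of the attach step would obviously need
checking against how UndoWorkerQueues is set up):

xid_elems = (UndoXidQueue *) ShmemInitStruct("Undo Xid Queue Elements",
                                             UndoXidQueueElemsShmSize(),
                                             &foundXidQueue);
if (!foundXidQueue)
{
    /* creating the segment: zero the whole array, not just one element */
    memset(xid_elems, 0, UndoXidQueueElemsShmSize());
}
else
{
    /* already initialized elsewhere; just sanity-check consistency */
    Assert(foundSizeQueue && foundErrorQueue);
}

/* attaching the backend-local pointers can then happen unconditionally */
InitXidQueue(bh, xid_elems);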


> +/* Insert the request into an error queue. */
> +bool
> +InsertRequestIntoErrorUndoQueue(volatile UndoRequestInfo * urinfo)
> +{
> +    RollbackHashEntry *rh;
> +
> +    LWLockAcquire(RollbackRequestLock, LW_EXCLUSIVE);
> +
> +    /* We can't insert into an error queue if it is already full. */
> +    if (GetErrorQueueSize() >= pending_undo_queue_size)
> +    {
> +        int            num_removed = 0;
> +
> +        /* Try to remove few elements */
> +        num_removed = RemoveOldElemsFromErrorQueue();

If we kept this, I'd rename these as Prune* and reword the comments to
match. As it stands, the naming makes the code look like we're actually
removing valid entries.


> +/*
> + * Get the next set of pending rollback request for undo worker.

"set"? We only remove one, no?


> + * allow_peek - if true, peeks a few element from each queue to check whether
> + * any request matches current dbid.
> + * remove_from_queue - if true, picks an element from the queue whose dbid
> + * matches current dbid and remove it from the queue before returning the same
> + * to caller.
> + * urinfo - this is an OUT parameter that returns the details of undo request
> + * whose undo action is still pending.
> + * in_other_db_out - this is an OUT parameter.  If we've not found any work
> + * for current database, but there is work for some other database, we set
> + * this parameter as true.
> + */
> +bool
> +UndoGetWork(bool allow_peek, bool remove_from_queue, UndoRequestInfo *urinfo,
> +            bool *in_other_db_out)
> +{


> +        /*
> +         * If some undo worker is already processing the rollback request or
> +         * it is already processed, then we drop that request from the queue
> +         * and fetch the next entry from the queue.
> +         */
> +        if (!rh || UndoRequestIsInProgress(rh))
> +        {
> +            RemoveRequestFromQueue(cur_queue, 0);
> +            cur_undo_queue++;
> +            continue;
> +        }

When is it possible to hit the in-progress case?




> +        /*
> +         * We've found a work for some database.  If we don't want to remove
> +         * the request, we return from here and spawn a worker process to
> +         * apply the same.
> +         */
> +        if (!remove_from_queue)
> +        {
> +            bool        exists;
> +
> +            StartTransactionCommand();
> +            exists = dbid_exists(rh->dbid);
> +            CommitTransactionCommand();
> +
> +            /*
> +             * If the database doesn't exist, just remove the request since we
> +             * no longer need to apply the undo actions.
> +             */
> +            if (!exists)
> +            {
> +                RemoveRequestFromQueue(cur_queue, 0);
> +                RollbackHTRemoveEntry(rh->full_xid, rh->start_urec_ptr, true);
> +                cur_undo_queue++;
> +                continue;
> +            }

I still think there never should be a case in which this is
possible. Dropping a database ought to remove all the associated undo.


> +                /*
> +                 * The worker can perform this request if it is either not
> +                 * connected to any database or the request belongs to the
> +                 * same database to which it is connected.
> +                 */
> +                if ((MyDatabaseId == InvalidOid) ||
> +                    (MyDatabaseId != InvalidOid && MyDatabaseId == rh->dbid))
> +                {
> +                    /* found a work for current database */
> +                    if (in_other_db_out)
> +                        *in_other_db_out = false;
> +
> +                    /*
> +                     * Mark the undo request in hash table as in_progress so
> +                     * that other undo worker doesn't pick the same entry for
> +                     * rollback.
> +                     */
> +                    rh->status = UNDO_REQUEST_INPROGRESS;
> +
> +                    /* set the undo request info to process */
> +                    SetUndoRequestInfoFromRHEntry(urinfo, rh, cur_queue);
> +
> +                    /*
> +                     * Remove the request from queue so that other undo worker
> +                     * doesn't process the same entry.
> +                     */
> +                    RemoveRequestFromQueue(cur_queue, depth);
> +                    LWLockRelease(RollbackRequestLock);
> +                    return true;

Copy of code from above.


> +/*
> + * This function registers the rollback requests.
> + *
> + * Returns true, if the request is registered and will be processed by undo
> + * worker at some later point of time, false, otherwise in which case caller
> + * can process the undo request by itself.
> + *
> + * The caller may execute undo actions itself if the request is not already
> + * present in rollback hash table and can't be pushed to pending undo request
> + * queues.  The two reasons why request can't be pushed are (a) the size of
> + * request is smaller than a threshold and the request is not from discard
> + * worker, (b) the undo request queues are full.
> + *
> + * It is not advisable to apply the undo actions of a very large transaction
> + * in the foreground as that can lead to a delay in retruning the control back

*returning



> +/* different types of undo worker */
> +typedef enum
> +{
> +    XID_QUEUE = 0,
> +    SIZE_QUEUE = 1,
> +    ERROR_QUEUE
> +} UndoWorkerQueueType;

IMO odd to explicitly number two elements of an enum, but not the third.

> +/* This is an entry for undo request queue that is sorted by xid. */
> +typedef struct UndoXidQueue
> +{
> +    FullTransactionId full_xid;
> +    UndoRecPtr    start_urec_ptr;
> +    Oid            dbid;
> +} UndoXidQueue;

As I said before, this isn't a queue, it's a queue entry.


> +/* Reset the undo request info */
> +#define ResetUndoRequestInfo(urinfo) \
> +( \
> +    (urinfo)->full_xid = InvalidFullTransactionId, \
> +    (urinfo)->start_urec_ptr = InvalidUndoRecPtr, \
> +    (urinfo)->end_urec_ptr = InvalidUndoRecPtr, \
> +    (urinfo)->last_log_start_urec_ptr = InvalidUndoRecPtr, \
> +    (urinfo)->dbid = InvalidOid, \
> +    (urinfo)->request_size = 0, \
> +    (urinfo)->undo_worker_queue = InvalidUndoWorkerQueue \
> +)
> +
> +/* set the undo request info from the rollback request */
> +#define SetUndoRequestInfoFromRHEntry(urinfo, rh, cur_queue) \
> +( \
> +    urinfo->full_xid = rh->full_xid, \
> +    urinfo->start_urec_ptr = rh->start_urec_ptr, \
> +    urinfo->end_urec_ptr = rh->end_urec_ptr, \
> +    urinfo->last_log_start_urec_ptr = rh->last_log_start_urec_ptr, \
> +    urinfo->dbid = rh->dbid, \
> +    urinfo->undo_worker_queue = cur_queue \
> +)

See my other complaint about such macros. Multiple evaluation hazard
etc. Also, the different formatting in two consecutively defined macros
is odd.




> +/*-------------------------------------------------------------------------
> + *
> + * undoaction.c
> + *      execute undo actions
> + *
> + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
> + * Portions Copyright (c) 1994, Regents of the University of California
> + *
> + * src/backend/access/undo/undoaction.c
> + *
> + * To apply the undo actions, we collect the undo records in bulk and try to

s/the//g

> + * process them together.  We ensure to update the transaction's progress at
> + * regular intervals so that after a crash we can skip already applied undo.
> + * The undo apply progress is updated in terms of the number of blocks
> + * processed.  Undo apply progress value XACT_APPLY_PROGRESS_COMPLETED
> + * indicates that all the undo is applied, XACT_APPLY_PROGRESS_NOT_STARTED
> + * indicates that no undo action has been applied yet and any other value
> + * indicates that we have applied undo partially and after crash recovery, we
> + * need to start processing the undo from the same location.
> + *-------------------------------------------------------------------------


> +/*
> + * UpdateUndoApplyProgress - Updates how far undo actions from a particular
> + * log have been applied while rolling back a transaction.  This progress is
> + * measured in terms of undo block number of the undo log till which the
> + * undo actions have been applied.
> + */
> +static void
> +UpdateUndoApplyProgress(UndoRecPtr progress_urec_ptr,
> +                        BlockNumber block_num)
> +{
> +    UndoLogCategory category;
> +    UndoRecordInsertContext context = {{0}};
> +
> +    category =
> +        UndoLogNumberGetCategory(UndoRecPtrGetLogNo(progress_urec_ptr));
> +
> +    /*
> +     * We don't need to update the progress for temp tables as they get
> +     * discraded after startup.
> +     */
> +    if (category == UNDO_TEMP)
> +        return;
> +
> +    BeginUndoRecordInsert(&context, category, 1, NULL);
> +
> +    /*
> +     * Prepare and update the undo apply progress in the transaction header.
> +     */
> +    UndoRecordPrepareApplyProgress(&context, progress_urec_ptr, block_num);
> +
> +    START_CRIT_SECTION();
> +
> +    /* Update the progress in the transaction header. */
> +    UndoRecordUpdateTransInfo(&context, 0);
> +
> +    /* WAL log the undo apply progress. */
> +    {
> +        XLogRecPtr    lsn;
> +        xl_undoapply_progress xlrec;
> +
> +        xlrec.urec_ptr = progress_urec_ptr;
> +        xlrec.progress = block_num;
> +
> +        XLogBeginInsert();
> +        XLogRegisterData((char *) &xlrec, sizeof(xlrec));
> +
> +        RegisterUndoLogBuffers(&context, 1);
> +        lsn = XLogInsert(RM_UNDOACTION_ID, XLOG_UNDO_APPLY_PROGRESS);
> +        UndoLogBuffersSetLSN(&context, lsn);
> +    }
> +
> +    END_CRIT_SECTION();
> +
> +    /* Release undo buffers. */
> +    FinishUndoRecordInsert(&context);
> +}

This whole prepare/execute split for updating apply progress and next
undo pointers makes no sense to me.


> +/*
> + * UndoAlreadyApplied - Retruns true, if the actions are already applied,

*returns


> + *    false, otherwise.
> + */
> +static bool
> +UndoAlreadyApplied(FullTransactionId full_xid, UndoRecPtr to_urecptr)
> +{
> +    UnpackedUndoRecord *uur = NULL;
> +    UndoRecordFetchContext    context;
> +
> +    /* Fetch the undo record. */
> +    BeginUndoFetch(&context);
> +    uur = UndoFetchRecord(&context, to_urecptr);
> +    FinishUndoFetch(&context);

Literally all the places that fetch a record, fetch them with exactly
this combination of calls. If that's the pattern, what do we gain by
this split?   Note that UndoBulkFetchRecord does *NOT* use an
UndoRecordFetchContext, for reasons that are beyond me.
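
If the split has to stay, I'd at least expect a trivial wrapper for the
common case, something like (sketch, name made up):

/* sketch: one-shot fetch for the overwhelmingly common pattern */
static UnpackedUndoRecord *
UndoFetchRecordSimple(UndoRecPtr urecptr)
{
    UndoRecordFetchContext context;
    UnpackedUndoRecord *uur;

    BeginUndoFetch(&context);
    uur = UndoFetchRecord(&context, urecptr);
    FinishUndoFetch(&context);

    return uur;
}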


> +static void
> +ProcessAndApplyUndo(FullTransactionId full_xid, UndoRecPtr from_urecptr,
> +                    UndoRecPtr to_urecptr, UndoRecPtr last_log_start_urec_ptr,
> +                    bool complete_xact)
> +{
> +    UndoRecInfo *urecinfo;
> +    UndoRecPtr    urec_ptr = from_urecptr;
> +    int            undo_apply_size;
> +
> +    /*
> +     * We choose maintenance_work_mem to collect the undo records for
> +     * rollbacks as most of the large rollback requests are done by
> +     * background worker which can be considered as maintainence operation.
> +     * However, we can introduce a new guc for this as well.
> +     */
> +    undo_apply_size = maintenance_work_mem * 1024L;
> +
> +    /*
> +     * Fetch the multiple undo records that can fit into undo_apply_size; sort
> +     * them and then rmgr specific callback to process them.  Repeat this
> +     * until we process all the records for the transaction being rolled back.
> +     */
> +    do
> +    {

use for(;;) or while (true).

> +        BlockNumber    progress_block_num = InvalidBlockNumber;
> +        int            i;
> +        int            nrecords;
> +        bool        log_switched = false;
> +        bool        rollback_completed = false;
> +        bool        update_progress = false;
> +        UndoRecPtr    progress_urec_ptr = InvalidUndoRecPtr;
> +        UndoRecInfo    *first_urecinfo;
> +        UndoRecInfo    *last_urecinfo;
> +
> +        CHECK_FOR_INTERRUPTS();
> +
> +        /*
> +         * Fetch multiple undo records at once.
> +         *
> +         * At a time, we only fetch the undo records from a single undo log.
> +         * Once, we process all the undo records from one undo log, we update

s/Once, we process/Once we have processed/

> +         * the last_log_start_urec_ptr and proceed to the previous undo log.
> +         */
> +        urecinfo = UndoBulkFetchRecord(&urec_ptr, last_log_start_urec_ptr,
> +                                       undo_apply_size, &nrecords, false);
> +
> +        /*
> +         * Since the rollback of this transaction is in-progress, there will be
> +         * at least one undo record which is not yet discarded.
> +         */
> +        Assert(nrecords > 0);
> +
> +        /*
> +         * Get the required information from first and last undo record before
> +         * we sort all the records.
> +         */
> +        first_urecinfo = &urecinfo[0];
> +        last_urecinfo = &urecinfo[nrecords - 1];
> +        if (last_urecinfo->uur->uur_info & UREC_INFO_LOGSWITCH)
> +        {
> +            UndoRecordLogSwitch *logswitch = last_urecinfo->uur->uur_logswitch;
> +
> +            /*
> +             * We have crossed the log boundary.  The rest of the undo for
> +             * this transaction is in some other log, the location of which
> +             * can be found from this record.  See commets atop undoaccess.c.

*comments


> +            /*
> +             * We need to save the undo record pointer of the last record from
> +             * previous undo log.  We will use the same as from location in
> +             * next iteration of bulk fetch.
> +             */
> +            Assert(UndoRecPtrIsValid(logswitch->urec_prevurp));
> +            urec_ptr = logswitch->urec_prevurp;
> +
> +            /*
> +             * The last fetched undo record corresponds to the first undo
> +             * record of the current log.  Once, the undo actions are performed
> +             * from this log, we've to mark the progress as completed.
> +             */
> +            progress_urec_ptr = last_urecinfo->urp;
> +
> +            /*
> +             * We also need to save the start location of this transaction in
> +             * previous log.  This will be used in the next iteration of bulk
> +             * fetch and updating progress location.
> +             */
> +            if (complete_xact)
> +            {
> +                Assert(UndoRecPtrIsValid(logswitch->urec_prevlogstart));
> +                last_log_start_urec_ptr = logswitch->urec_prevlogstart;
> +            }
> +
> +            /* We've to update the progress for the current log as completed. */
> +            update_progress = true;
> +        }
> +        else if (complete_xact)
> +        {
> +            if (UndoRecPtrIsValid(urec_ptr))
> +            {
> +                /*
> +                 * There are still some undo actions pending in this log.  So,
> +                 * just update the progress block number.
> +                 */
> +                progress_block_num = UndoRecPtrGetBlockNum(last_urecinfo->urp);
> +
> +                /*
> +                 * If we've not fetched undo records for more than one undo
> +                 * block, we can't update the progress block number.  Because,
> +                 * there can still be undo records in this block that needs to
> +                 * be applied for rolling back this transaction.
> +                 */
> +                if (UndoRecPtrGetBlockNum(first_urecinfo->urp) > progress_block_num)
> +                {
> +                    update_progress = true;
> +                    progress_urec_ptr = last_log_start_urec_ptr;
> +                }
> +            }
> +            else
> +            {
> +                /*
> +                 * Invalid urec_ptr indicates that we have executed all the undo
> +                 * actions for this transaction.  So, mark current log header
> +                 * as complete.
> +                 */
> +                Assert(last_log_start_urec_ptr == to_urecptr);
> +                rollback_completed = true;
> +                update_progress = true;
> +                progress_urec_ptr = last_log_start_urec_ptr;
> +            }
> +        }

This should be in a separate function.
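
Something like the following might be clearer -- this is only a sketch of
the split I have in mind; the function and parameter names are invented,
the types are the ones from the patch:

    /*
     * Consume the last record of a bulk fetch: decide where the next
     * fetch has to start and how far the progress can be advanced.
     */
    static void
    AdvanceUndoApplyState(UndoRecInfo *first_urecinfo,
                          UndoRecInfo *last_urecinfo,
                          bool complete_xact,
                          UndoRecPtr *urec_ptr,
                          UndoRecPtr *last_log_start_urec_ptr,
                          UndoRecPtr *progress_urec_ptr,
                          bool *update_progress,
                          bool *rollback_completed);

That way the caller only deals with fetching, sorting and applying.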


> +        /* Free all undo records. */
> +        for (i = 0; i < nrecords; i++)
> +            UndoRecordRelease(urecinfo[i].uur);
> +
> +        /* Free urp array for the current batch of undo records. */
> +        pfree(urecinfo);

As noted elsewhere, I think that's the wrong memory management
strategy. We should be using a memory context for undo processing, and
then just reset it as a whole.  For one, freeing granularly is
inefficient. But more than that, it also means there's nothing to
prevent memory leaks here.
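
Roughly (sketch only; the context name and where exactly it gets created
are made up):

    MemoryContext undo_apply_context;
    bool          more_records = true;    /* placeholder loop condition */

    undo_apply_context = AllocSetContextCreate(CurrentMemoryContext,
                                               "UndoApplyContext",
                                               ALLOCSET_DEFAULT_SIZES);

    while (more_records)    /* one iteration per bulk fetch */
    {
        MemoryContext oldcontext = MemoryContextSwitchTo(undo_apply_context);

        /* UndoBulkFetchRecord(), sorting, applying the records ... */

        MemoryContextSwitchTo(oldcontext);

        /* throw away everything allocated for this batch in one go */
        MemoryContextReset(undo_apply_context);
    }

    MemoryContextDelete(undo_apply_context);

That gets rid of both the per-record UndoRecordRelease() calls and the
pfree(), and anything leaked in between goes away with the reset.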


> +/*
> + * execute_undo_actions - Execute the undo actions

That's just a restatement of the function name.


> + * full_xid - Transaction id that is getting rolled back.
> + * from_urecptr - undo record pointer from where to start applying undo
> + *                actions.
> + * to_urecptr    - undo record pointer up to which the undo actions need to be
> + *                applied.
> + * complete_xact    - true if rollback is for complete transaction.
> + */
> +void
> +execute_undo_actions(FullTransactionId full_xid, UndoRecPtr from_urecptr,
> +                     UndoRecPtr to_urecptr, bool complete_xact)
> +{

Why is this lower case, but ApplyUndo() camel case? How is a reader
supposed to know which naming convention is used for what?


>  typedef struct TwoPhaseFileHeader
>  {
> @@ -927,6 +928,16 @@ typedef struct TwoPhaseFileHeader
>      uint16        gidlen;            /* length of the GID - GID follows the header */
>      XLogRecPtr    origin_lsn;        /* lsn of this record at origin node */
>      TimestampTz origin_timestamp;    /* time of prepare at origin node */
> +
> +    /*
> +     * We need the locations of the start and end undo record pointers when
> +     * rollbacks are to be performed for prepared transactions using undo-based
> +     * relations.  We need to store this information in the file as the user
> +     * might rollback the prepared transaction after recovery and for that we
> +     * need its start and end undo locations.
> +     */
> +    UndoRecPtr    start_urec_ptr[UndoLogCategories];
> +    UndoRecPtr    end_urec_ptr[UndoLogCategories];
>  } TwoPhaseFileHeader;

Why do we not need that knowledge for undo processing of a non-prepared
transaction?


> @@ -191,6 +195,16 @@ typedef struct TransactionStateData
>      bool        didLogXid;        /* has xid been included in WAL record? */
>      int            parallelModeLevel;    /* Enter/ExitParallelMode counter */
>      bool        chain;            /* start a new block after this one */
> +
> +    /* start and end undo record location for each log category */
> +    UndoRecPtr    startUrecPtr[UndoLogCategories]; /* this is 'to' location */
> +    UndoRecPtr    latestUrecPtr[UndoLogCategories]; /* this is 'from'
> +                                                   * location */
> +    /*
> +     * whether the undo request is registered to be processed by worker later?
> +     */
> +    bool        undoRequestResgistered[UndoLogCategories];
> +

s/Resgistered/Registered/


> @@ -2906,9 +2942,18 @@ CommitTransactionCommand(void)
>               * StartTransactionCommand didn't set the STARTED state
>               * appropriately, while TBLOCK_PARALLEL_INPROGRESS should be ended
>               * by EndParallelWorkerTransaction(), not this function.
> +             *
> +             * TBLOCK_(SUB)UNDO means the error has occurred while applying
> +             * undo for a (sub)transaction.  We can't reach here as while

s/We can't reach here as while/This can't be reached while/



> +             * applying undo via top-level transaction, if we get an error,
> +             * then it is handled by ReleaseResourcesAndProcessUndo

Where and how does it handle that? Maybe I misunderstand what you mean?


> +            case TBLOCK_UNDO:
> +                /*
> +                 * We reach here when we got error while applying undo
> +                 * actions, so we don't want to again start applying it. Undo
> +                 * workers can take care of it.
> +                 *
> +                 * AbortTransaction is already done, still need to release
> +                 * locks and perform cleanup.
> +                 */
> +                ResetUndoActionsInfo();
> +                ResourceOwnerRelease(s->curTransactionOwner,
> +                                     RESOURCE_RELEASE_LOCKS,
> +                                     false,
> +                                     true);
> +                s->state = TRANS_ABORT;
>                  CleanupTransaction();

Hm. Why is it ok that we only perform that cleanup action? Either the
rest of potentially held resources will get cleaned up somehow as well,
in which case this ResourceOwnerRelease() ought to be redundant, or
we're potentially leaking important resources like buffer pins, relcache
references and whatnot here?


> +/*
> + * CheckAndRegisterUndoRequest - Register the request for applying undo
> + *    actions.
> + *
> + * It sets the transaction state to indicate whether the request is pushed to
> + * the background worker which is used later to decide whether to apply the
> + * actions.
> + *
> + * It is important to do this before marking the transaction as aborted in
> + * clog otherwise, it is quite possible that discard worker miss this rollback
> + * request from the computation of oldestXidHavingUnappliedUndo.  This is
> + * because it might do that computation before backend can register it in the
> + * rollback hash table.  So, neither oldestXmin computation will consider it
> + * nor the hash table pass would have that value.
> + */
> +static void
> +CheckAndRegisterUndoRequest()

The empty parameter list should be spelled (void).


> +{
> +    TransactionState s = CurrentTransactionState;
> +    bool    result;
> +    int        i;
> +
> +    /*
> +     * We don't want to apply the undo actions when we are already cleaning up
> +     * for FATAL error.  See ReleaseResourcesAndProcessUndo.
> +     */
> +    if (SemiCritSectionCount > 0)
> +    {
> +        ResetUndoActionsInfo();
> +        return;
> +    }

Wait what? Semi critical sections?



> +    for (i = 0; i < UndoLogCategories; i++)
> +    {
> +        /*
> +         * We can't push the undo actions for temp table to background
> +         * workers as the the temp tables are only accessible in the
> +         * backend that has created them.
> +         */
> +        if (i != UNDO_TEMP && UndoRecPtrIsValid(s->latestUrecPtr[i]))
> +        {
> +            result = RegisterUndoRequest(s->latestUrecPtr[i],
> +                                         s->startUrecPtr[i],
> +                                         MyDatabaseId,
> +                                         GetTopFullTransactionId());
> +            s->undoRequestResgistered[i] = result;
> +        }
> +    }

Given code like this, I have a hard time seeing what the point of having
separate queue entries for the different persistence levels is.




> +void
> +ReleaseResourcesAndProcessUndo(void)
> +{
> +    TransactionState s = CurrentTransactionState;
> +
> +    /*
> +     * We don't want to apply the undo actions when we are already cleaning up
> +     * for FATAL error.  One of the main reasons is that we might be already
> +     * processing undo actions for a (sub)transaction when we reach here
> +     * (for ex. error happens while processing undo actions for a
> +     * subtransaction).
> +     */
> +    if (SemiCritSectionCount > 0)
> +    {
> +        ResetUndoActionsInfo();
> +        return;
> +    }
> +
> +    if (!NeedToPerformUndoActions())
> +        return;
> +
> +    /*
> +     * State should still be TRANS_ABORT from AbortTransaction().
> +     */
> +    if (s->state != TRANS_ABORT)
> +        elog(FATAL, "ReleaseResourcesAndProcessUndo: unexpected state %s",
> +            TransStateAsString(s->state));
> +
> +    /*
> +     * Do abort cleanup processing before applying the undo actions.  We must
> +     * do this before applying the undo actions to remove the effects of
> +     * failed transaction.
> +     */
> +    if (IsSubTransaction())
> +    {
> +        AtSubCleanup_Portals(s->subTransactionId);
> +        s->blockState = TBLOCK_SUBUNDO;
> +    }
> +    else
> +    {
> +        AtCleanup_Portals();    /* now safe to release portal memory */
> +        AtEOXact_Snapshot(false, true); /* and release the transaction's
> +                                         * snapshots */

Why do precisely these actions need to be performed here?


> +        s->fullTransactionId = InvalidFullTransactionId;
> +        s->subTransactionId = TopSubTransactionId;
> +        s->blockState = TBLOCK_UNDO;
> +    }
> +
> +    s->state = TRANS_UNDO;

This seems guaranteed to constantly be out of date with other
modifications of the commit/abort sequence.



> +bool
> +ProcessUndoRequestForEachLogCat(FullTransactionId fxid, Oid dbid,
> +                                UndoRecPtr *end_urec_ptr, UndoRecPtr *start_urec_ptr,
> +                                bool *undoRequestResgistered, bool isSubTrans)
> +{
> +    UndoRequestInfo urinfo;
> +    int            i;
> +    uint32        save_holdoff;
> +    bool        success = true;
> +
> +    for (i = 0; i < UndoLogCategories; i++)
> +    {
> +        if (end_urec_ptr[i] && !undoRequestResgistered[i])
> +        {
> +            save_holdoff = InterruptHoldoffCount;
> +
> +            PG_TRY();
> +            {
> +                /* for subtransactions, we do partial rollback. */
> +                execute_undo_actions(fxid,
> +                                     end_urec_ptr[i],
> +                                     start_urec_ptr[i],
> +                                     !isSubTrans);
> +            }
> +            PG_CATCH();
> +            {
> +                /*
> +                 * Add the request into an error queue so that it can be
> +                 * processed in a timely fashion.
> +                 *
> +                 * If we fail to add the request in an error queue, then mark
> +                 * the entry status as invalid and continue to process the
> +                 * remaining undo requests if any.  This request will be later
> +                 * added back to the queue by discard worker.
> +                 */
> +                ResetUndoRequestInfo(&urinfo);
> +                urinfo.dbid = dbid;
> +                urinfo.full_xid = fxid;
> +                urinfo.start_urec_ptr = start_urec_ptr[i];
> +                if (!InsertRequestIntoErrorUndoQueue(&urinfo))
> +                    RollbackHTMarkEntryInvalid(urinfo.full_xid,
> +                                               urinfo.start_urec_ptr);
> +                /*
> +                 * Errors can reset holdoff count, so restore back.  This is
> +                 * required because this function can be called after holding
> +                 * interrupts.
> +                 */
> +                InterruptHoldoffCount = save_holdoff;
> +
> +                /* Send the error only to server log. */
> +                err_out_to_client(false);
> +                EmitErrorReport();
> +
> +                success = false;
> +
> +                /* We should never reach here when we are in a semi-critical-section. */
> +                Assert(SemiCritSectionCount == 0);

This seems entirely and completely broken. You can't just catch an
exception and continue. What if somebody held an lwlock when the error
was thrown? A buffer pin?  As far as I can tell the semi crit section
stuff doesn't protect you against anything here, because it's not used
exclusively.
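
To illustrate -- not to suggest that catching and continuing is the right
design -- the sigsetjmp() handlers of processes that do handle their own
errors (checkpointer, bgwriter, ...) have to do at least something along
these lines before they can carry on, and none of it happens in the
PG_CATCH() above:

    HOLD_INTERRUPTS();

    LWLockReleaseAll();
    ConditionVariableCancelSleep();
    pgstat_report_wait_end();
    AbortBufferIO();
    UnlockBuffers();
    /* ... plus resowner-driven cleanup of buffer pins, relcache refs, ... */

    RESUME_INTERRUPTS();

(Illustrative, not exhaustive.)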



> +to complete the requests by themselves.  There is an exception to it where when
> +error queue becomes full, we just mark the request as 'invalid' and continue to
> +process other requests if any.  The discard worker will find this errored
> +transaction at later point of time and again add it to the request queues.

You say it's an exception, but you do not explain why that exception is
there.

Nor why that's not a problem for:

> +We have the hard limit (proportional to the size of the rollback hash table)
> +for the number of transactions that can have pending undo.  This can help us
> +in computing the value of oldestXidHavingUnappliedUndo and allowing us not to
> +accumulate pending undo for a long time which will eventually block the
> +discard of undo.



> + * The main responsibility of the discard worker is to discard the undo log
> + * of transactions that are committed and all-visible or are rolledback.  It

*rolled back


> + * also registers the request for aborted transactions in the work queues.
> + * To know more about work queues, see undorequest.c.  It iterates through all
> + * the active logs one-by-one and try to discard the transactions that are old
> + * enough to matter.
> + *
> + * For tranasctions that spans across multiple logs, the log for committed and

*transactions


> + * all-visible transactions are discarded seprately for each log.  This is

*separately


> + * possible as the transactions that span across logs have separate transaction
> + * header for each log.  For aborted transactions, we try to process the actions

*transaction headers


> + * of entire transaction at one-shot as we need to perform the actions starting

*an entire transaction in one shot

> + * from end location to start location.  However, it is possbile that the later

*possible

> + * portion of transaction that is overflowed into a separate log can be processed

*a transaction

> + * separately if we encounter the corresponding log first.  If we want we can
> + * combine the log for processing in that case as well, but there is no clear
> + * advantage of the same.

*of doing so



> +void
> +DiscardWorkerRegister(void)
> +{
> +    BackgroundWorker bgw;
> +
> +    memset(&bgw, 0, sizeof(bgw));
> +    bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
> +        BGWORKER_BACKEND_DATABASE_CONNECTION;

Why is a database needed?


> +    /*
> +     * Scan all the undo logs and intialize the rollback hash table with all
> +     * the pending rollback requests.  This need to be done as a first step
> +     * because only after this the transactions will be allowed to write new
> +     * undo.  See comments atop UndoLogProcess.
> +     */
> +    UndoLogProcess();

Too generic a name.



> @@ -668,6 +676,50 @@ PrepareUndoInsert(UndoRecordInsertContext *context,
>      UndoCompressionInfo *compression_info =
>      &context->undo_compression_info[context->alloc_context.category];
>
> +    if (!InRecovery && IsUnderPostmaster)
> +    {
> +        int try_count = 0;
> +
> +        /*
> +         * If we are not in a recovery and not in a single-user-mode, then undo

s/in a single-user-mode/in single-user-mode/ (although I'd also remove
the dashes)


> +         * generation should not be allowed until we have scanned all the undo
> +         * logs and initialized the hash table with all the aborted
> +         * transaction entries.  See detailed comments in UndoLogProcess.
> +         */
> +        while (!ProcGlobal->rollbackHTInitialized)
> +        {
> +            /* Error out after trying for one minute. */
> +            if (try_count > ROLLBACK_HT_INIT_WAIT_TRY)
> +                ereport(ERROR,
> +                        (errcode(ERRCODE_E_R_E_MODIFYING_SQL_DATA_NOT_PERMITTED),
> +                         errmsg("rollback hash table is not yet initialized, wait for sometime and try again")));
> +
> +            /*
> +             * Rollback hash table is not yet intialized, sleep for 1 second
> +             * and try again.
> +             */
> +            pg_usleep(1000000L);
> +            try_count++;
> +        }
> +    }

I think it's wrong to do this here. We shouldn't open the database for
writes before having performed sufficient initialization. If done like
that, we shouldn't ever get here.  Without such sequencing it's actually
not possible to bring up a standby and allow writes in a normal way -
the first few transactions will just fail. That's not ok.

Nor are new retry loops with sleeps ok IMO.


> +    /*
> +     * If the rollback hash table is already full (excluding one additional
> +     * space for each backend) then don't allow to generate any new undo until
> +     * we apply some of the pending requests and create some space in the hash
> +     * table to accept new rollback requests.  Leave the enough slots in the
> +     * hash table so that there is space for all the backends to register at
> +     * least one request.  This is to protect the situation where one backend
> +     * keep consuming slots reserve for the other backends and suddenly there
> +     * is concurrent undo request from all the backends.  So we always keep
> +     * the space reserve for MaxBackends.
> +     */
> +    if (ProcGlobal->xactsHavingPendingUndo >
> +        (UndoRollbackHashTableSize() - MaxBackends))
> +        ereport(ERROR,
> +                (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
> +                 errmsg("max limit for pending rollback request has reached, wait for sometime and try again")));
> +

Why do we need to do this work every time we're inserting undo? Shouldn't
that just happen once, when first accessing an undo log in a
transaction?
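
I.e. I'd expect something shaped more like this (sketch; the
"checked already" flag is invented, and where it lives is up for grabs):

    /* Only check the capacity the first time this transaction writes undo. */
    if (!undo_limit_checked_this_xact)
    {
        if (ProcGlobal->xactsHavingPendingUndo >
            (UndoRollbackHashTableSize() - MaxBackends))
            ereport(ERROR,
                    (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
                     errmsg("too many transactions with pending undo, retry later")));

        undo_limit_checked_this_xact = true;
    }

with the flag getting reset at transaction end.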


> +    /* There might not be any undo log and hibernation might be needed. */
> +    *hibernate = true;
> +
> +    StartTransactionCommand();

Why do we need this? I assume it's so we can have a resource owner?
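
If the transaction really is only there to get a resource owner, the
aux-process pattern might be enough (sketch, untested; whether it
suffices depends on whether the worker also needs catalog access here):

    CreateAuxProcessResourceOwner();

    /* ... scan the undo logs, pinning/unpinning buffers as needed ... */

    ReleaseAuxProcessResources(true);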


Out of energy.


Greetings,

Andres Freund


