diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 2fbfadd9f0..9fc00bde9e 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3874,12 +3874,6 @@ pgstat_get_wait_io(WaitEventIO w) case WAIT_EVENT_RELATION_MAP_WRITE: event_name = "RelationMapWrite"; break; - case WAIT_EVENT_REORDER_BUFFER_READ: - event_name = "ReorderBufferRead"; - break; - case WAIT_EVENT_REORDER_BUFFER_WRITE: - event_name = "ReorderBufferWrite"; - break; case WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ: event_name = "ReorderLogicalMappingRead"; break; diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 2cfdf1c9ac..f958d27761 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -71,6 +71,7 @@ #include "replication/slot.h" #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */ #include "storage/bufmgr.h" +#include "storage/buffile.h" #include "storage/fd.h" #include "storage/sinval.h" #include "utils/builtins.h" @@ -109,7 +110,7 @@ typedef struct ReorderBufferIterTXNEntry XLogRecPtr lsn; ReorderBufferChange *change; ReorderBufferTXN *txn; - int fd; + TransientBufFile *file; XLogSegNo segno; } ReorderBufferIterTXNEntry; @@ -192,9 +193,11 @@ static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTX static void ReorderBufferCheckSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, - int fd, ReorderBufferChange *change); + TransientBufFile *file, ReorderBufferChange *change); +static void ReorderBufferWriteData(TransientBufFile *file, void *ptr, size_t size, + ReorderBufferTXN *txn); static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, - int *fd, XLogSegNo *segno); + TransientBufFile **file, XLogSegNo *segno); static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, char *change); static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); @@ -468,8 +471,8 @@ ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple) Oid * ReorderBufferGetRelids(ReorderBuffer *rb, int nrelids) { - Oid *relids; - Size alloc_len; + Oid *relids; + Size alloc_len; alloc_len = sizeof(Oid) * nrelids; @@ -988,7 +991,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) for (off = 0; off < state->nr_txns; off++) { - state->entries[off].fd = -1; + state->entries[off].file = NULL; state->entries[off].segno = 0; } @@ -1013,7 +1016,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) { /* serialize remaining changes */ ReorderBufferSerializeTXN(rb, txn); - ReorderBufferRestoreChanges(rb, txn, &state->entries[off].fd, + ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file, &state->entries[off].segno); } @@ -1043,7 +1046,7 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn) /* serialize remaining changes */ ReorderBufferSerializeTXN(rb, cur_txn); ReorderBufferRestoreChanges(rb, cur_txn, - &state->entries[off].fd, + &state->entries[off].file, &state->entries[off].segno); } cur_change = dlist_head_element(ReorderBufferChange, node, @@ -1124,7 +1127,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) dlist_delete(&change->node); dlist_push_tail(&state->old_change, &change->node); - if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->fd, + if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file, &state->entries[off].segno)) { /* successfully restored changes from disk */ @@ -1163,8 +1166,8 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb, for (off = 0; off < state->nr_txns; off++) { - if (state->entries[off].fd != -1) - CloseTransientFile(state->entries[off].fd); + if (state->entries[off].file) + BufFileCloseTransient(state->entries[off].file); } /* free memory we might have "leaked" in the last *Next call */ @@ -1327,8 +1330,8 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) else { /* - * Maybe we already saw this tuple before in this transaction, - * but if so it must have the same cmin. + * Maybe we already saw this tuple before in this transaction, but + * if so it must have the same cmin. */ Assert(ent->cmin == change->data.tuplecid.cmin); @@ -2254,7 +2257,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) { dlist_iter subtxn_i; dlist_mutable_iter change_i; - int fd = -1; + TransientBufFile *file = NULL; XLogSegNo curOpenSegNo = 0; Size spilled = 0; @@ -2281,13 +2284,13 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) * store in segment in which it belongs by start lsn, don't split over * multiple segments tho */ - if (fd == -1 || + if (file == NULL || !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size)) { char path[MAXPGPATH]; - if (fd != -1) - CloseTransientFile(fd); + if (file) + BufFileCloseTransient(file); XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size); @@ -2299,16 +2302,11 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) curOpenSegNo); /* open segment, create it if necessary */ - fd = OpenTransientFile(path, - O_CREAT | O_WRONLY | O_APPEND | PG_BINARY); - - if (fd < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", path))); + file = BufFileOpenTransient(path, + O_CREAT | O_WRONLY | O_APPEND | PG_BINARY); } - ReorderBufferSerializeChange(rb, txn, fd, change); + ReorderBufferSerializeChange(rb, txn, file, change); dlist_delete(&change->node); ReorderBufferReturnChange(rb, change); @@ -2320,8 +2318,8 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->nentries_mem = 0; txn->serialized = true; - if (fd != -1) - CloseTransientFile(fd); + if (file) + BufFileCloseTransient(file); } /* @@ -2329,15 +2327,13 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) */ static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, - int fd, ReorderBufferChange *change) + TransientBufFile *file, ReorderBufferChange *change) { - ReorderBufferDiskChange *ondisk; + ReorderBufferDiskChange hdr; Size sz = sizeof(ReorderBufferDiskChange); - ReorderBufferSerializeReserve(rb, sz); - - ondisk = (ReorderBufferDiskChange *) rb->outbuf; - memcpy(&ondisk->change, change, sizeof(ReorderBufferChange)); + memcpy((char *) &hdr + offsetof(ReorderBufferDiskChange, change), + change, sizeof(ReorderBufferChange)); switch (change->action) { @@ -2347,7 +2343,6 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_DELETE: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: { - char *data; ReorderBufferTupleBuf *oldtup, *newtup; Size oldlen = 0; @@ -2370,66 +2365,55 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, sz += newlen; } - /* make sure we have enough space */ - ReorderBufferSerializeReserve(rb, sz); - - data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); - /* might have been reallocated above */ - ondisk = (ReorderBufferDiskChange *) rb->outbuf; + hdr.size = sz; + ReorderBufferWriteData(file, &hdr, sizeof(ReorderBufferDiskChange), + txn); if (oldlen) { - memcpy(data, &oldtup->tuple, sizeof(HeapTupleData)); - data += sizeof(HeapTupleData); - - memcpy(data, oldtup->tuple.t_data, oldlen); - data += oldlen; + ReorderBufferWriteData(file, &oldtup->tuple, + sizeof(HeapTupleData), txn); + ReorderBufferWriteData(file, oldtup->tuple.t_data, oldlen, + txn); } if (newlen) { - memcpy(data, &newtup->tuple, sizeof(HeapTupleData)); - data += sizeof(HeapTupleData); - - memcpy(data, newtup->tuple.t_data, newlen); - data += newlen; + ReorderBufferWriteData(file, &newtup->tuple, + sizeof(HeapTupleData), txn); + ReorderBufferWriteData(file, newtup->tuple.t_data, newlen, + txn); } break; } case REORDER_BUFFER_CHANGE_MESSAGE: { - char *data; Size prefix_size = strlen(change->data.msg.prefix) + 1; sz += prefix_size + change->data.msg.message_size + sizeof(Size) + sizeof(Size); - ReorderBufferSerializeReserve(rb, sz); - - data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); - /* might have been reallocated above */ - ondisk = (ReorderBufferDiskChange *) rb->outbuf; + hdr.size = sz; + ReorderBufferWriteData(file, &hdr, + sizeof(ReorderBufferDiskChange), + txn); /* write the prefix including the size */ - memcpy(data, &prefix_size, sizeof(Size)); - data += sizeof(Size); - memcpy(data, change->data.msg.prefix, - prefix_size); - data += prefix_size; + ReorderBufferWriteData(file, &prefix_size, sizeof(Size), txn); + ReorderBufferWriteData(file, change->data.msg.prefix, + prefix_size, txn); /* write the message including the size */ - memcpy(data, &change->data.msg.message_size, sizeof(Size)); - data += sizeof(Size); - memcpy(data, change->data.msg.message, - change->data.msg.message_size); - data += change->data.msg.message_size; + ReorderBufferWriteData(file, &change->data.msg.message_size, + sizeof(Size), txn); + ReorderBufferWriteData(file, change->data.msg.message, + change->data.msg.message_size, txn); break; } case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: { Snapshot snap; - char *data; snap = change->data.snapshot; @@ -2438,49 +2422,37 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, sizeof(TransactionId) * snap->subxcnt ; - /* make sure we have enough space */ - ReorderBufferSerializeReserve(rb, sz); - data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); - /* might have been reallocated above */ - ondisk = (ReorderBufferDiskChange *) rb->outbuf; + hdr.size = sz; + ReorderBufferWriteData(file, &hdr, + sizeof(ReorderBufferDiskChange), txn); - memcpy(data, snap, sizeof(SnapshotData)); - data += sizeof(SnapshotData); + ReorderBufferWriteData(file, snap, sizeof(SnapshotData), txn); if (snap->xcnt) - { - memcpy(data, snap->xip, - sizeof(TransactionId) * snap->xcnt); - data += sizeof(TransactionId) * snap->xcnt; - } + ReorderBufferWriteData(file, snap->xip, + sizeof(TransactionId) * snap->xcnt, + txn); if (snap->subxcnt) - { - memcpy(data, snap->subxip, - sizeof(TransactionId) * snap->subxcnt); - data += sizeof(TransactionId) * snap->subxcnt; - } + ReorderBufferWriteData(file, snap->subxip, + sizeof(TransactionId) * snap->subxcnt, + txn); break; } case REORDER_BUFFER_CHANGE_TRUNCATE: { - Size size; - char *data; + Size size; /* account for the OIDs of truncated relations */ size = sizeof(Oid) * change->data.truncate.nrelids; sz += size; - /* make sure we have enough space */ - ReorderBufferSerializeReserve(rb, sz); - - data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); - /* might have been reallocated above */ - ondisk = (ReorderBufferDiskChange *) rb->outbuf; - - memcpy(data, change->data.truncate.relids, size); - data += size; + hdr.size = sz; + ReorderBufferWriteData(file, &hdr, sizeof(ReorderBufferDiskChange), + txn); + ReorderBufferWriteData(file, change->data.truncate.relids, size, + txn); break; } case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: @@ -2489,27 +2461,21 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, /* ReorderBufferChange contains everything important */ break; } +} - ondisk->size = sz; - - errno = 0; - pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE); - if (write(fd, rb->outbuf, ondisk->size) != ondisk->size) - { - int save_errno = errno; - - CloseTransientFile(fd); - - /* if write didn't set errno, assume problem is no disk space */ - errno = save_errno ? save_errno : ENOSPC; +/* + * Wrapper for BufFileWriteTransient() that raises ERROR if the whole chunk + * was not written. XXX Should this be a macro? + */ +static void +ReorderBufferWriteData(TransientBufFile *file, void *ptr, size_t size, + ReorderBufferTXN *txn) +{ + if (BufFileWriteTransient(file, ptr, size) != size) ereport(ERROR, (errcode_for_file_access(), errmsg("could not write to data file for XID %u: %m", txn->xid))); - } - pgstat_report_wait_end(); - - Assert(ondisk->change.action == change->action); } /* @@ -2517,7 +2483,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, */ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, - int *fd, XLogSegNo *segno) + TransientBufFile **file, XLogSegNo *segno) { Size restored = 0; XLogSegNo last_segno; @@ -2545,7 +2511,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, int readBytes; ReorderBufferDiskChange *ondisk; - if (*fd == -1) + if (*file == NULL) { char path[MAXPGPATH]; @@ -2562,18 +2528,13 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, *segno); - *fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); - if (*fd < 0 && errno == ENOENT) + *file = BufFileOpenTransient(path, O_RDONLY | PG_BINARY); + if (*file == NULL) { - *fd = -1; + Assert(errno == ENOENT); (*segno)++; continue; } - else if (*fd < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not open file \"%s\": %m", - path))); } /* @@ -2582,22 +2543,16 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, * end of this file. */ ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange)); - pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ); - readBytes = read(*fd, rb->outbuf, sizeof(ReorderBufferDiskChange)); - pgstat_report_wait_end(); + readBytes = BufFileReadTransient(*file, rb->outbuf, sizeof(ReorderBufferDiskChange)); /* eof */ if (readBytes == 0) { - CloseTransientFile(*fd); - *fd = -1; + BufFileCloseTransient(*file); + *file = NULL; (*segno)++; continue; } - else if (readBytes < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from reorderbuffer spill file: %m"))); else if (readBytes != sizeof(ReorderBufferDiskChange)) ereport(ERROR, (errcode_for_file_access(), @@ -2611,16 +2566,10 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, sizeof(ReorderBufferDiskChange) + ondisk->size); ondisk = (ReorderBufferDiskChange *) rb->outbuf; - pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_READ); - readBytes = read(*fd, rb->outbuf + sizeof(ReorderBufferDiskChange), - ondisk->size - sizeof(ReorderBufferDiskChange)); - pgstat_report_wait_end(); + readBytes = BufFileReadTransient(*file, rb->outbuf + sizeof(ReorderBufferDiskChange), + ondisk->size - sizeof(ReorderBufferDiskChange)); - if (readBytes < 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read from reorderbuffer spill file: %m"))); - else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange)) + if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange)) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes", @@ -2767,7 +2716,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, /* the base struct contains all the data, easy peasy */ case REORDER_BUFFER_CHANGE_TRUNCATE: { - Oid *relids; + Oid *relids; relids = ReorderBufferGetRelids(rb, change->data.truncate.nrelids); diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c index c2c445dbf4..6c426c0509 100644 --- a/src/backend/storage/file/buffile.c +++ b/src/backend/storage/file/buffile.c @@ -41,6 +41,8 @@ #include "postgres.h" +#include + #include "executor/instrument.h" #include "miscadmin.h" #include "pgstat.h" @@ -93,6 +95,27 @@ struct BufFile PGAlignedBlock buffer; }; +/* + * Buffered variant of a transient file. Unlike BufFile this is simpler in + * several ways: 1) it's not split into segments, 2) there's no need of seek, + * 3) there's no need to combine read and write access. + */ +struct TransientBufFile +{ + /* The underlying file. */ + char *path; + int fileFlags; + int fd; + + off_t offset; /* next read/write position in file */ + int pos; /* next read/write position in buffer */ + int nbytes; /* total # of valid bytes in buffer */ + + bool dirty; /* unsaved data in the buffer? */ + + PGAlignedBlock buffer; +}; + static BufFile *makeBufFileCommon(int nfiles); static BufFile *makeBufFile(File firstfile); static void extendBufFile(BufFile *file); @@ -101,6 +124,9 @@ static void BufFileDumpBuffer(BufFile *file); static int BufFileFlush(BufFile *file); static File MakeNewSharedSegment(BufFile *file, int segment); +static void BufFileLoadBufferTransient(TransientBufFile *file); +static void BufFileDumpBufferTransient(TransientBufFile *file); + /* * Create BufFile and perform the common initialization. */ @@ -831,3 +857,267 @@ BufFileAppend(BufFile *target, BufFile *source) return startBlock; } + +/* + * Open TransientBufFile at given path or create one if it does not + * exist. User will be allowed either to write to the file or to read from it, + * according to fileFlags, but not both. + */ +TransientBufFile * +BufFileOpenTransient(const char *path, int fileFlags) +{ + TransientBufFile *file; + int fd; + + /* Either read or write mode, but not both. */ + Assert((fileFlags & O_RDWR) == 0); + + fd = OpenTransientFile(path, fileFlags); + if (fd < 0) + { + /* + * If caller wants to read from file and the file is not there, he + * should be able to handle the condition on his own. + * + * XXX Shouldn't we always let caller evaluate errno? + */ + if (errno == ENOENT && (fileFlags & O_RDONLY)) + return NULL; + + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", path))); + } + + file = (TransientBufFile *) palloc(sizeof(TransientBufFile)); + + file->path = pstrdup(path); + file->fileFlags = fileFlags; + file->fd = fd; + + file->dirty = false; + + file->pos = 0; + file->nbytes = 0; + + if (file->fileFlags & O_APPEND) + { + /* Position the buffer at the end of the file. */ + errno = 0; + file->offset = lseek(file->fd, 0, SEEK_END); + } + else + { + /* Load the initial part of the file. */ + file->offset = 0L; + BufFileLoadBufferTransient(file); + } + + if (errno > 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not initialize TransientBufFile for file \"%s\": %m", + path))); + + return file; +} + +/* + * Close a TransientBufFile. + */ +void +BufFileCloseTransient(TransientBufFile *file) +{ + /* Flush any unwritten data. */ + if (file->fileFlags & O_WRONLY && file->dirty && file->nbytes > 0) + { + BufFileDumpBufferTransient(file); + + /* + * Caller of BufFileWriteTransient() recognizes the failure to flush + * buffer by the returned value, however this function has no return + * code. + */ + if (file->dirty) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not flush file \"%s\": %m", file->path))); + } + + /* + * XXX Should we raise ERROR if non-zero code is returned here, or is it + * enough if the failure cause WARNING during transaction commit? + */ + CloseTransientFile(file->fd); + + pfree(file->path); + pfree(file); +} + +/* + * Like BufFileWrite() except it receives pointer to TransientBufFile. + * + * TODO Reuse the code of BufFileWrite() in a better way than copy & paste. + */ +size_t +BufFileWriteTransient(TransientBufFile *file, void *ptr, size_t size) +{ + size_t nwritten = 0; + size_t nthistime; + + Assert((file->fileFlags & O_WRONLY)); + + while (size > 0) + { + if (file->pos >= BLCKSZ) + { + Assert(file->dirty); + + /* Buffer full, dump it out */ + BufFileDumpBufferTransient(file); + if (file->dirty) + break; /* I/O error */ + } + + nthistime = BLCKSZ - file->pos; + if (nthistime > size) + nthistime = size; + Assert(nthistime > 0); + + memcpy(file->buffer.data + file->pos, ptr, nthistime); + + file->dirty = true; + file->pos += nthistime; + if (file->nbytes < file->pos) + file->nbytes = file->pos; + ptr = (void *) ((char *) ptr + nthistime); + size -= nthistime; + nwritten += nthistime; + } + + return nwritten; +} + +/* + * Like BufFileRead() except it receives pointer to TransientBufFile. + * + * TODO Reuse the code of BufFileRead() in a better way than copy & paste. + */ +size_t +BufFileReadTransient(TransientBufFile *file, void *ptr, size_t size) +{ + size_t nread = 0; + size_t nthistime; + + Assert((file->fileFlags & O_WRONLY) == 0); + + while (size > 0) + { + if (file->pos >= file->nbytes) + { + /* Try to load more data into buffer. */ + file->offset += file->pos; + file->pos = 0; + file->nbytes = 0; + BufFileLoadBufferTransient(file); + if (file->nbytes <= 0) + break; /* no more data available */ + } + + nthistime = file->nbytes - file->pos; + if (nthistime > size) + nthistime = size; + Assert(nthistime > 0); + + memcpy(ptr, file->buffer.data + file->pos, nthistime); + + file->pos += nthistime; + ptr = (void *) ((char *) ptr + nthistime); + size -= nthistime; + nread += nthistime; + } + + return nread; +} + +/* + * Load some data into buffer, if possible, starting from file->offset. At + * call, must have dirty = false, pos and nbytes = 0. On exit, nbytes is + * number of bytes loaded. + */ +static void +BufFileLoadBufferTransient(TransientBufFile *file) +{ + Assert(!file->dirty); + Assert(file->pos == 0 && file->nbytes == 0); + +retry: + + /* + * Read whatever we can get, up to a full bufferload. + */ + errno = 0; + pgstat_report_wait_start(WAIT_EVENT_BUFFILE_READ); + file->nbytes = pg_pread(file->fd, file->buffer.data, + sizeof(file->buffer), file->offset); + pgstat_report_wait_end(); + + if (file->nbytes < 0) + { + /* TODO The W32 specific code, see FileWrite. */ + + /* OK to retry if interrupted */ + if (errno == EINTR) + goto retry; + + return; /* failed to write */ + } + + if (file->nbytes < 0) + file->nbytes = 0; + /* we choose not to advance offset here */ +} + +/* + * Write buffer contents to disk. + */ +static void +BufFileDumpBufferTransient(TransientBufFile *file) +{ + int nwritten; + + /* This function should only be needed during write access ... */ + Assert(file->fileFlags & O_WRONLY); + + /* ... and if there's some work to do. */ + Assert(file->dirty); + Assert(file->nbytes > 0); + +retry: + errno = 0; + pgstat_report_wait_start(WAIT_EVENT_BUFFILE_WRITE); + nwritten = pg_pwrite(file->fd, file->buffer.data, file->nbytes, + file->offset); + pgstat_report_wait_end(); + + /* if write didn't set errno, assume problem is no disk space */ + if (nwritten != file->nbytes && errno == 0) + errno = ENOSPC; + + if (nwritten < 0) + { + /* TODO The W32 specific code, see FileWrite. */ + + /* OK to retry if interrupted */ + if (errno == EINTR) + goto retry; + + return; /* failed to write */ + } + + file->dirty = false; + + file->offset += nwritten; + file->pos = 0; + file->nbytes = 0; +} diff --git a/src/include/pgstat.h b/src/include/pgstat.h index ea6cc8b560..cbaeccced5 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -907,8 +907,6 @@ typedef enum WAIT_EVENT_RELATION_MAP_READ, WAIT_EVENT_RELATION_MAP_SYNC, WAIT_EVENT_RELATION_MAP_WRITE, - WAIT_EVENT_REORDER_BUFFER_READ, - WAIT_EVENT_REORDER_BUFFER_WRITE, WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ, WAIT_EVENT_REPLICATION_SLOT_READ, WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC, diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h index 1fba404fe2..8b2845e33f 100644 --- a/src/include/storage/buffile.h +++ b/src/include/storage/buffile.h @@ -28,9 +28,13 @@ #include "storage/sharedfileset.h" -/* BufFile is an opaque type whose details are not known outside buffile.c. */ +/* + * BufFile and TransientBufFile are opaque types whose details are not known + * outside buffile.c. + */ typedef struct BufFile BufFile; +typedef struct TransientBufFile TransientBufFile; /* * prototypes for functions in buffile.c @@ -51,4 +55,11 @@ extern void BufFileExportShared(BufFile *file); extern BufFile *BufFileOpenShared(SharedFileSet *fileset, const char *name); extern void BufFileDeleteShared(SharedFileSet *fileset, const char *name); +extern TransientBufFile *BufFileOpenTransient(const char *path, int fileFlags); +extern void BufFileCloseTransient(TransientBufFile *file); +extern size_t BufFileWriteTransient(TransientBufFile *file, void *ptr, + size_t size); +extern size_t BufFileReadTransient(TransientBufFile *file, void *ptr, + size_t size); + #endif /* BUFFILE_H */