Re: [HACKERS] WAL logging problem in 9.4.3?

Поиск
Список
Период
Сортировка
От Kyotaro HORIGUCHI
Тема Re: [HACKERS] WAL logging problem in 9.4.3?
Дата
Msg-id 20190402.195406.20162559.horiguchi.kyotaro@lab.ntt.co.jp
обсуждение исходный текст
Ответ на Re: [HACKERS] WAL logging problem in 9.4.3?  (Noah Misch <noah@leadboat.com>)
Ответы Re: [HACKERS] WAL logging problem in 9.4.3?  (Robert Haas <robertmhaas@gmail.com>)
Список pgsql-hackers
Thank you for reviewing.

At Sun, 31 Mar 2019 15:31:58 -0700, Noah Misch <noah@leadboat.com> wrote in <20190331223158.GB891537@rfd.leadboat.com>
> On Sun, Mar 10, 2019 at 07:27:08PM -0700, Noah Misch wrote:
> > On Mon, Mar 04, 2019 at 12:24:48PM +0900, Kyotaro HORIGUCHI wrote:
> > > +/*
> > > + * RelationRemovePendingSync() -- remove pendingSync entry for a relation
> > > + */
> > > +void
> > > +RelationRemovePendingSync(Relation rel)
> > 
> > What is the coding rule for deciding when to call this?  Currently, only
> > ATExecSetTableSpace() calls this.  CLUSTER doesn't call it, despite behaving
> > much like ALTER TABLE SET TABLESPACE behaves.
> 
> This question still applies.  (The function name did change from
> RelationRemovePendingSync() to RelationInvalidateWALRequirements().)

It is called for heap_register_sync()'ed relations to avoid
syncing useless or trying to sync nonexistent files. I modifed
all CLUSTER, COPY FROM, CREATE AS, REFRESH MATVIEW and SET
TABLESPACE uses the function. (The function is renamed to
table_relation_invalidate_walskip()).

I noticed that heap_register_sync and friends are now a kind of
Table-AM function. So I added .relation_register_walskip and
.relation_invalidate_walskip in TableAMRoutine and moved the
heap_register_sync stuff as heapam_relation_register_walskip and
friends. .finish_bulk_insert() is modified to be used only
WAL-skip is active on the relation. (0004, 0005) But I'm not sure
that is the right direction.

(RelWALRequirements is renamed to RelWALSkip)

The change made smgrFinishBulkInsert (known as smgrDoPendingSync)
need to call a tableam interface. Relation is required to call it
in the designed way but relcache cannot live until there. In the
attached patch 0005, a new member TableAmRoutine *tableam is
added to RelWalSkip and calls finish_bulk_insert() via the
tableAm. But I'm quite uneasy with that...

> On Mon, Mar 25, 2019 at 09:32:04PM +0900, Kyotaro HORIGUCHI wrote:
> > At Wed, 20 Mar 2019 22:48:35 -0700, Noah Misch <noah@leadboat.com> wrote in
<20190321054835.GB3842129@rfd.leadboat.com>
> Again, I do want them in the code.  Please restore them, but use a mechanism
> like CACHE_elog() so they're built only if one defines a preprocessor symbol.

Ah, sorry. I restored the messages using STORAGE_elog(). I also
needed this. (SMGR_ might be better but I'm not sure.)

> On Tue, Mar 26, 2019 at 04:35:07PM +0900, Kyotaro HORIGUCHI wrote:
> > +    smgrProcessWALRequirementInval(s->subTransactionId, false);
> 
> The smgrProcessWALRequirementInval() calls almost certainly belong in
> CommitSubTransaction() and AbortSubTransaction(), not in these functions.  By
> doing it here, you'd get the wrong behavior in a subtransaction created via a
> plpgsql "BEGIN ... EXCEPTION WHEN OTHERS THEN" block.

Thanks. Moved it to AtSubAbort_smgr() and AtSubCommit_smgr(). (0005)

> > +/*
> > + * Process pending invalidation of WAL requirements happened in the
> > + * subtransaction
> > + */
> > +void
> > +smgrProcessWALRequirementInval(SubTransactionId sxid, bool isCommit)
> > +{
> > +    HASH_SEQ_STATUS status;
> > +    RelWalRequirement *walreq;
> > +
> > +    if (!walRequirements)
> > +        return;
> > +
> > +    /* We expect that we don't have walRequirements in almost all cases */
> > +    hash_seq_init(&status, walRequirements);
> > +
> > +    while ((walreq = hash_seq_search(&status)) != NULL)
> > +    {
> > +        /* remove useless entry */
> > +        if (isCommit ?
> > +            walreq->invalidate_sxid == sxid :
> > +            walreq->create_sxid == sxid)
> > +            hash_search(walRequirements, &walreq->relnode, HASH_REMOVE, NULL);
> 
> Do not remove entries during subtransaction commit, because a parent
> subtransaction might still abort.  See other CommitSubTransaction() callees
> for examples of correct subtransaction handling.  AtEOSubXact_Files() is one
> simple example.

Thanks. smgrProcessWALSkipInval() (0005) is changed so that:

 - If a RelWalSkip entry is created in aborted subtransaction,
   remove it.

 - If a RelWalSkip entry is created then invalidated in committed
   subtransaction, remove it.

 - If a RelWalSkip entry is created and committed, change the
   creator subtransaction to the parent subtransaction.

 - If a RelWalSkip entry is create elsewhere and invalidated in
   committed subtransaction, move the invalidation to the parent
   subtransaction.

 - If a RelWalSkip entry is created elsewhere and invalidated in
   aborted subtransaction, cancel the invalidation.

Test is added as test3a2 and test3a3. (0001)

> > @@ -3567,15 +3602,26 @@ heap_update
> >           */
> >          if (RelationIsAccessibleInLogicalDecoding(relation))
> >          {
> > -            log_heap_new_cid(relation, &oldtup);
> > -            log_heap_new_cid(relation, heaptup);
> > +            if (oldbuf_needs_wal)
> > +                log_heap_new_cid(relation, &oldtup);
> > +            if (newbuf_needs_wal)
> > +                log_heap_new_cid(relation, heaptup);
> 
> These if(...) conditions are always true, since they're redundant with
> RelationIsAccessibleInLogicalDecoding(relation).  Remove the conditions or
> replace them with asserts.

Ah.. I see. It is not the minimal case. Added a comment and an
assertion. (0006)

+  * catalog. Both oldbuf_needs_wal and newbuf_needs_wal must be true
+  * when logical decoding is active.

> By using DELETE and INSERT records to implement an UPDATE, you lose the ctid
> chain and infomask bits that were present before crash recovery.  If that's
> okay in these circumstances, please write a comment explaining why.

Sounds reasonable. Added a comment. (Honestly I completely forgot
about that.. Thanks!) (0006)

+  * Insert log record. Using delete or insert log loses HOT chain
+  * information but that happens only when newbuf is different from
+  * buffer, where HOT cannot happen.


> > @@ -1096,7 +1097,9 @@ _bt_insertonpg(Relation rel,
> >   |  |  | cachedBlock = BufferGetBlockNumber(buf);
> >  
> >   |  | /* XLOG stuff */
> > - |  | if (RelationNeedsWAL(rel))
> > + |  | if (BufferNeedsWAL(rel, buf) ||
> > + |  |  | (!P_ISLEAF(lpageop) && BufferNeedsWAL(rel, cbuf)) ||
> > + |  |  | (BufferIsValid(metabuf) && BufferNeedsWAL(rel, metabuf)))
> 
> This appears to have the same problem that heap_update() had in v7; if
> BufferNeedsWAL(rel, buf) is false and BufferNeedsWAL(rel, metabuf) is true, we
> emit WAL for both buffers.  If that can't actually happen today, use asserts.
> 
> I don't want the btree code to get significantly more complicated in order to
> participate in the RelWalRequirement system.  If btree code would get more
> complicated, it's better to have btree continue using the old system.  If
> btree's complexity would be essentially unchanged, it's still good to use the
> new system.

It was broken. I tried to fix it but page split baffled me. I
reverted it and added a comment there explaining the reason for
not applying BufferNeedsWAL stuff to nbtree. WAL-logging skip
feature is now restricted to work only on non-index
heaps. (getWalSkipEntry and RecordPendingSync in 0005)

> > @@ -334,6 +334,10 @@ btbuild(Relation heap, Relation index, IndexInfo *indexInfo)
> >  
> >   | reltuples = _bt_spools_heapscan(heap, index, &buildstate, indexInfo);
> >  
> > + | /* Skip WAL-logging if wal_level = minimal */
> > + | if (!XLogIsNeeded())
> > + |  | RecordWALSkipping(index);
> 
> _bt_load() still has an smgrimmedsync(wstate->index->rd_smgr, MAIN_FORKNUM),
> which should be unnecessary after you add this end-of-transaction sync.  Also,
> this code can reach an assertion failure at wal_level=minimal:
> 
> 910024 2019-03-31 19:12:13.728 GMT LOG:  statement: create temp table x (c int primary key)
> 910024 2019-03-31 19:12:13.729 GMT DEBUG:  CREATE TABLE / PRIMARY KEY will create implicit index "x_pkey" for table
"x"
> 910024 2019-03-31 19:12:13.730 GMT DEBUG:  building index "x_pkey" on table "x" serially
> TRAP: FailedAssertion("!(((rel)->rd_rel->relpersistence == 'p'))", File: "storage.c", Line: 460)

This is what I mentioned as "broken" above. Sorry for the
silly mistake.

> Also, please fix whitespace problems that "git diff --check master" reports.

Thanks. Good to know the command.


After all, this patch set contains the following files.

v10-0001-TAP-test-for-copy-truncation-optimization.patch

 Tap test script. Multi-level subtransaction case is added.

v10-0002-Write-WAL-for-empty-nbtree-index-build.patch

 As mentioned above, nbtree patch has been shrinked to the
 initial state of a workaround. Comment is rewrited. (v9-0002 +
 v9-0008)

v10-0003-Move-XLOG-stuff-from-heap_insert-and-heap_delete.patch

 Not substantially changed.

v10-0004-Add-new-interface-to-TableAmRoutine.patch

 New file. Adds two new interfaces to TableAmRoutine and modified
 one interface.

v10-0005-Add-infrastructure-to-WAL-logging-skip-feature.patch

 Heavily revised version of v9-0004.
   Some functions are renamed.
   Fixed subtransaction handling.
   Added STORAGE_elog() stuff.
   Uses table-am functions.
   Changes heapam stuff.

v10-0006-Fix-WAL-skipping-feature.patch

  Revised version of v9-0005 + v9-0006 + v9-0007.

    Added comment and assertion in heap_insert().

v10-0007-Remove-TABLE-HEAP_INSERT_SKIP_WAL.patch

 Separated from v9-0005 so that subsequent patches are sane.

 Removes TABLE/HEAP_ISNERT_SKIP_WAL.
 
regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
From 55c85f06a9dc0a77f4cc6b02d4538b2e7169b3dc Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 11 Oct 2018 10:03:21 +0900
Subject: [PATCH 1/7] TAP test for copy-truncation optimization.

---
 src/test/recovery/t/017_wal_optimize.pl | 291 ++++++++++++++++++++++++++++++++
 1 file changed, 291 insertions(+)
 create mode 100644 src/test/recovery/t/017_wal_optimize.pl

diff --git a/src/test/recovery/t/017_wal_optimize.pl b/src/test/recovery/t/017_wal_optimize.pl
new file mode 100644
index 0000000000..4fa8be728e
--- /dev/null
+++ b/src/test/recovery/t/017_wal_optimize.pl
@@ -0,0 +1,291 @@
+# Test WAL replay for optimized TRUNCATE and COPY records
+#
+# WAL truncation is optimized in some cases with TRUNCATE and COPY queries
+# which sometimes interact badly with the other optimizations in line with
+# several setting values of wal_level, particularly when using "minimal" or
+# "replica".  The optimization may be enabled or disabled depending on the
+# scenarios dealt here, and should never result in any type of failures or
+# data loss.
+use strict;
+use warnings;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 24;
+
+sub check_orphan_relfilenodes
+{
+    my($node, $test_name) = @_;
+
+    my $db_oid = $node->safe_psql('postgres',
+       "SELECT oid FROM pg_database WHERE datname = 'postgres'");
+    my $prefix = "base/$db_oid/";
+    my $filepaths_referenced = $node->safe_psql('postgres', "
+       SELECT pg_relation_filepath(oid) FROM pg_class
+       WHERE reltablespace = 0 and relpersistence <> 't' and
+       pg_relation_filepath(oid) IS NOT NULL;");
+    is_deeply([sort(map { "$prefix$_" }
+                    grep(/^[0-9]+$/,
+                         slurp_dir($node->data_dir . "/$prefix")))],
+              [sort split /\n/, $filepaths_referenced],
+              $test_name);
+    return;
+}
+
+# Wrapper routine tunable for wal_level.
+sub run_wal_optimize
+{
+    my $wal_level = shift;
+
+    # Primary needs to have wal_level = minimal here
+    my $node = get_new_node("node_$wal_level");
+    $node->init;
+    $node->append_conf('postgresql.conf', qq(
+wal_level = $wal_level
+));
+    $node->start;
+
+    # Setup
+    my $tablespace_dir = $node->basedir . '/tablespace_other';
+    mkdir ($tablespace_dir);
+    $tablespace_dir = TestLib::real_dir($tablespace_dir);
+    $node->safe_psql('postgres',
+       "CREATE TABLESPACE other LOCATION '$tablespace_dir';");
+
+    # Test direct truncation optimization.  No tuples
+    $node->safe_psql('postgres', "
+        BEGIN;
+        CREATE TABLE test1 (id serial PRIMARY KEY);
+        TRUNCATE test1;
+        COMMIT;");
+
+    $node->stop('immediate');
+    $node->start;
+
+    my $result = $node->safe_psql('postgres', "SELECT count(*) FROM test1;");
+    is($result, qq(0),
+       "wal_level = $wal_level, optimized truncation with empty table");
+
+    # Test truncation with inserted tuples within the same transaction.
+    # Tuples inserted after the truncation should be seen.
+    $node->safe_psql('postgres', "
+        BEGIN;
+        CREATE TABLE test2 (id serial PRIMARY KEY);
+        INSERT INTO test2 VALUES (DEFAULT);
+        TRUNCATE test2;
+        INSERT INTO test2 VALUES (DEFAULT);
+        COMMIT;");
+
+    $node->stop('immediate');
+    $node->start;
+
+    $result = $node->safe_psql('postgres', "SELECT count(*) FROM test2;");
+    is($result, qq(1),
+       "wal_level = $wal_level, optimized truncation with inserted table");
+
+    # Data file for COPY query in follow-up tests.
+    my $basedir = $node->basedir;
+    my $copy_file = "$basedir/copy_data.txt";
+    TestLib::append_to_file($copy_file, qq(20000,30000
+20001,30001
+20002,30002));
+
+    # Test truncation with inserted tuples using COPY.  Tuples copied after the
+    # truncation should be seen.
+    $node->safe_psql('postgres', "
+        BEGIN;
+        CREATE TABLE test3 (id serial PRIMARY KEY, id2 int);
+        INSERT INTO test3 (id, id2) VALUES (DEFAULT, generate_series(1,3000));
+        TRUNCATE test3;
+        COPY test3 FROM '$copy_file' DELIMITER ',';
+        COMMIT;");
+    $node->stop('immediate');
+    $node->start;
+    $result = $node->safe_psql('postgres', "SELECT count(*) FROM test3;");
+    is($result, qq(3),
+       "wal_level = $wal_level, optimized truncation with copied table");
+
+    # Like previous test, but rollback SET TABLESPACE in a subtransaction.
+    $node->safe_psql('postgres', "
+        BEGIN;
+        CREATE TABLE test3a (id serial PRIMARY KEY, id2 int);
+        INSERT INTO test3a (id, id2) VALUES (DEFAULT, generate_series(1,3000));
+        TRUNCATE test3a;
+        SAVEPOINT s; ALTER TABLE test3a SET TABLESPACE other; ROLLBACK TO s;
+        COPY test3a FROM '$copy_file' DELIMITER ',';
+        COMMIT;");
+    $node->stop('immediate');
+    $node->start;
+    $result = $node->safe_psql('postgres', "SELECT count(*) FROM test3a;");
+    is($result, qq(3),
+       "wal_level = $wal_level, SET TABLESPACE in subtransaction");
+
+    # in different subtransaction patterns
+    $node->safe_psql('postgres', "
+        BEGIN;
+        CREATE TABLE test3a2 (id serial PRIMARY KEY, id2 int);
+        INSERT INTO test3a2 (id, id2) VALUES (DEFAULT, generate_series(1,3000));
+        TRUNCATE test3a2;
+        SAVEPOINT s; ALTER TABLE test3a SET TABLESPACE other; RELEASE s;
+        COPY test3a2 FROM '$copy_file' DELIMITER ',';
+        COMMIT;");
+    $node->stop('immediate');
+    $node->start;
+    $result = $node->safe_psql('postgres', "SELECT count(*) FROM test3a;");
+    is($result, qq(3),
+       "wal_level = $wal_level, SET TABLESPACE in subtransaction");
+
+    $node->safe_psql('postgres', "
+        BEGIN;
+        CREATE TABLE test3a3 (id serial PRIMARY KEY, id2 int);
+        INSERT INTO test3a3 (id, id2) VALUES (DEFAULT, generate_series(1,3000));
+        TRUNCATE test3a3;
+        SAVEPOINT s;
+            ALTER TABLE test3a3 SET TABLESPACE other;
+            SAVEPOINT s2;
+                ALTER TABLE test3a3 SET TABLESPACE pg_default;
+            ROLLBACK TO s2;
+            SAVEPOINT s2;
+                ALTER TABLE test3a3 SET TABLESPACE pg_default;
+            RELEASE s2;
+        ROLLBACK TO s;
+        COPY test3a3 FROM '$copy_file' DELIMITER ',';
+        COMMIT;");
+    $node->stop('immediate');
+    $node->start;
+    $result = $node->safe_psql('postgres', "SELECT count(*) FROM test3a;");
+    is($result, qq(3),
+       "wal_level = $wal_level, SET TABLESPACE in subtransaction");
+
+    # UPDATE touches two buffers; one is BufferNeedsWAL(); the other is not.
+    $node->safe_psql('postgres', "
+        BEGIN;
+        CREATE TABLE test3b (id serial PRIMARY KEY, id2 int);
+        INSERT INTO test3b (id, id2) VALUES (DEFAULT, generate_series(1,10000));
+        COPY test3b FROM '$copy_file' DELIMITER ',';  -- set sync_above
+        UPDATE test3b SET id2 = id2 + 1;
+        DELETE FROM test3b;
+        COMMIT;");
+    $node->stop('immediate');
+    $node->start;
+    $result = $node->safe_psql('postgres', "SELECT count(*) FROM test3b;");
+    is($result, qq(0),
+       "wal_level = $wal_level, UPDATE of logged page extends relation");
+
+    # Test truncation with inserted tuples using both INSERT and COPY. Tuples
+    # inserted after the truncation should be seen.
+    $node->safe_psql('postgres', "
+        BEGIN;
+        CREATE TABLE test4 (id serial PRIMARY KEY, id2 int);
+        INSERT INTO test4 (id, id2) VALUES (DEFAULT, generate_series(1,10000));
+        TRUNCATE test4;
+        INSERT INTO test4 (id, id2) VALUES (DEFAULT, 10000);
+        COPY test4 FROM '$copy_file' DELIMITER ',';
+        INSERT INTO test4 (id, id2) VALUES (DEFAULT, 10000);
+        COMMIT;");
+
+    $node->stop('immediate');
+    $node->start;
+    $result = $node->safe_psql('postgres', "SELECT count(*) FROM test4;");
+    is($result, qq(5),
+       "wal_level = $wal_level, optimized truncation with inserted/copied table");
+
+    # Test consistency of COPY with INSERT for table created in the same
+    # transaction.
+    $node->safe_psql('postgres', "
+        BEGIN;
+        CREATE TABLE test5 (id serial PRIMARY KEY, id2 int);
+        INSERT INTO test5 VALUES (DEFAULT, 1);
+        COPY test5 FROM '$copy_file' DELIMITER ',';
+        COMMIT;");
+    $node->stop('immediate');
+    $node->start;
+    $result = $node->safe_psql('postgres', "SELECT count(*) FROM test5;");
+    is($result, qq(4),
+       "wal_level = $wal_level, replay of optimized copy with inserted table");
+
+    # Test consistency of COPY that inserts more to the same table using
+    # triggers.  If the INSERTS from the trigger go to the same block data
+    # is copied to, and the INSERTs are WAL-logged, WAL replay will fail when
+    # it tries to replay the WAL record but the "before" image doesn't match,
+    # because not all changes were WAL-logged.
+    $node->safe_psql('postgres', "
+        BEGIN;
+        CREATE TABLE test6 (id serial PRIMARY KEY, id2 text);
+        CREATE FUNCTION test6_before_row_trig() RETURNS trigger
+          LANGUAGE plpgsql as \$\$
+          BEGIN
+            IF new.id2 NOT LIKE 'triggered%' THEN
+              INSERT INTO test6 VALUES (DEFAULT, 'triggered row before' || NEW.id2);
+            END IF;
+            RETURN NEW;
+          END; \$\$;
+        CREATE FUNCTION test6_after_row_trig() RETURNS trigger
+          LANGUAGE plpgsql as \$\$
+          BEGIN
+            IF new.id2 NOT LIKE 'triggered%' THEN
+              INSERT INTO test6 VALUES (DEFAULT, 'triggered row after' || OLD.id2);
+            END IF;
+            RETURN NEW;
+          END; \$\$;
+        CREATE TRIGGER test6_before_row_insert
+          BEFORE INSERT ON test6
+          FOR EACH ROW EXECUTE PROCEDURE test6_before_row_trig();
+        CREATE TRIGGER test6_after_row_insert
+          AFTER INSERT ON test6
+          FOR EACH ROW EXECUTE PROCEDURE test6_after_row_trig();
+        COPY test6 FROM '$copy_file' DELIMITER ',';
+        COMMIT;");
+    $node->stop('immediate');
+    $node->start;
+    $result = $node->safe_psql('postgres', "SELECT count(*) FROM test6;");
+    is($result, qq(9),
+       "wal_level = $wal_level, replay of optimized copy with before trigger");
+
+    # Test consistency of INSERT, COPY and TRUNCATE in same transaction block
+    # with TRUNCATE triggers.
+    $node->safe_psql('postgres', "
+        BEGIN;
+        CREATE TABLE test7 (id serial PRIMARY KEY, id2 text);
+        CREATE FUNCTION test7_before_stat_trig() RETURNS trigger
+          LANGUAGE plpgsql as \$\$
+          BEGIN
+            INSERT INTO test7 VALUES (DEFAULT, 'triggered stat before');
+            RETURN NULL;
+          END; \$\$;
+        CREATE FUNCTION test7_after_stat_trig() RETURNS trigger
+          LANGUAGE plpgsql as \$\$
+          BEGIN
+            INSERT INTO test7 VALUES (DEFAULT, 'triggered stat before');
+            RETURN NULL;
+          END; \$\$;
+        CREATE TRIGGER test7_before_stat_truncate
+          BEFORE TRUNCATE ON test7
+          FOR EACH STATEMENT EXECUTE PROCEDURE test7_before_stat_trig();
+        CREATE TRIGGER test7_after_stat_truncate
+          AFTER TRUNCATE ON test7
+          FOR EACH STATEMENT EXECUTE PROCEDURE test7_after_stat_trig();
+        INSERT INTO test7 VALUES (DEFAULT, 1);
+        TRUNCATE test7;
+        COPY test7 FROM '$copy_file' DELIMITER ',';
+        COMMIT;");
+    $node->stop('immediate');
+    $node->start;
+    $result = $node->safe_psql('postgres', "SELECT count(*) FROM test7;");
+    is($result, qq(4),
+       "wal_level = $wal_level, replay of optimized copy with before trigger");
+
+    # Test redo of temp table creation.
+    $node->safe_psql('postgres', "
+        CREATE TEMP TABLE test8 (id serial PRIMARY KEY, id2 text);");
+    $node->stop('immediate');
+    $node->start;
+
+    check_orphan_relfilenodes($node, "wal_level = $wal_level, no orphan relfilenode remains");
+
+    return;
+}
+
+# Run same test suite for multiple wal_level values.
+run_wal_optimize("minimal");
+run_wal_optimize("replica");
-- 
2.16.3

From fda405f0f0f9a5fa816c426adc5eb8850f20f6eb Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Thu, 11 Oct 2018 10:03:08 +0900
Subject: [PATCH 2/7] Write WAL for empty nbtree index build

After relation truncation indexes are also rebuild. It doesn't emit
WAL in minimal mode and if truncation happened within its creation
transaction, crash recovery leaves an empty index heap, which is
considered broken. This patch forces to emit WAL when an index_build
turns into empty nbtree index.
---
 src/backend/access/nbtree/nbtsort.c | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 14d9545768..5551a9c227 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -622,8 +622,16 @@ _bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
     /* Ensure rd_smgr is open (could have been closed by relcache flush!) */
     RelationOpenSmgr(wstate->index);
 
-    /* XLOG stuff */
-    if (wstate->btws_use_wal)
+    /* XLOG stuff
+     *
+     * Even when wal_level is minimal, WAL is required here if truncation
+     * happened after being created in the same transaction. This is hacky but
+     * we cannot use BufferNeedsWAL() stuff for nbtree since it can emit
+     * atomic WAL records on multiple buffers.
+     */
+    if (wstate->btws_use_wal ||
+        (RelationNeedsWAL(wstate->index) &&
+         (blkno == BTREE_METAPAGE && BTPageGetMeta(page)->btm_root == 0)))
     {
         /* We use the heap NEWPAGE record type for this */
         log_newpage(&wstate->index->rd_node, MAIN_FORKNUM, blkno, page, true);
-- 
2.16.3

From d15655d7bfe0b44c3b027ccdcc36fe0087f823c1 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Mon, 25 Mar 2019 13:29:50 +0900
Subject: [PATCH 3/7] Move XLOG stuff from heap_insert and heap_delete

Succeeding commit makes heap_update emit insert and delete WAL
records. Move out XLOG stuff for insert and delete so that heap_update
can use the stuff.
---
 src/backend/access/heap/heapam.c | 275 ++++++++++++++++++++++-----------------
 1 file changed, 156 insertions(+), 119 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 05ceb6550d..267570b461 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -72,6 +72,11 @@
 
 static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
                     TransactionId xid, CommandId cid, int options);
+static XLogRecPtr log_heap_insert(Relation relation, Buffer buffer,
+                HeapTuple heaptup, int options, bool all_visible_cleared);
+static XLogRecPtr log_heap_delete(Relation relation, Buffer buffer,
+                HeapTuple tp, HeapTuple old_key_tuple, TransactionId new_xmax,
+                bool changingPart, bool all_visible_cleared);
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
                 Buffer newbuf, HeapTuple oldtup,
                 HeapTuple newtup, HeapTuple old_key_tup,
@@ -1875,6 +1880,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
     TransactionId xid = GetCurrentTransactionId();
     HeapTuple    heaptup;
     Buffer        buffer;
+    Page        page;
     Buffer        vmbuffer = InvalidBuffer;
     bool        all_visible_cleared = false;
 
@@ -1911,16 +1917,18 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
      */
     CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);
 
+    page = BufferGetPage(buffer);
+
     /* NO EREPORT(ERROR) from here till changes are logged */
     START_CRIT_SECTION();
 
     RelationPutHeapTuple(relation, buffer, heaptup,
                          (options & HEAP_INSERT_SPECULATIVE) != 0);
 
-    if (PageIsAllVisible(BufferGetPage(buffer)))
+    if (PageIsAllVisible(page))
     {
         all_visible_cleared = true;
-        PageClearAllVisible(BufferGetPage(buffer));
+        PageClearAllVisible(page);
         visibilitymap_clear(relation,
                             ItemPointerGetBlockNumber(&(heaptup->t_self)),
                             vmbuffer, VISIBILITYMAP_VALID_BITS);
@@ -1942,75 +1950,10 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
     /* XLOG stuff */
     if (!(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))
     {
-        xl_heap_insert xlrec;
-        xl_heap_header xlhdr;
         XLogRecPtr    recptr;
-        Page        page = BufferGetPage(buffer);
-        uint8        info = XLOG_HEAP_INSERT;
-        int            bufflags = 0;
 
-        /*
-         * If this is a catalog, we need to transmit combocids to properly
-         * decode, so log that as well.
-         */
-        if (RelationIsAccessibleInLogicalDecoding(relation))
-            log_heap_new_cid(relation, heaptup);
-
-        /*
-         * If this is the single and first tuple on page, we can reinit the
-         * page instead of restoring the whole thing.  Set flag, and hide
-         * buffer references from XLogInsert.
-         */
-        if (ItemPointerGetOffsetNumber(&(heaptup->t_self)) == FirstOffsetNumber &&
-            PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
-        {
-            info |= XLOG_HEAP_INIT_PAGE;
-            bufflags |= REGBUF_WILL_INIT;
-        }
-
-        xlrec.offnum = ItemPointerGetOffsetNumber(&heaptup->t_self);
-        xlrec.flags = 0;
-        if (all_visible_cleared)
-            xlrec.flags |= XLH_INSERT_ALL_VISIBLE_CLEARED;
-        if (options & HEAP_INSERT_SPECULATIVE)
-            xlrec.flags |= XLH_INSERT_IS_SPECULATIVE;
-        Assert(ItemPointerGetBlockNumber(&heaptup->t_self) == BufferGetBlockNumber(buffer));
-
-        /*
-         * For logical decoding, we need the tuple even if we're doing a full
-         * page write, so make sure it's included even if we take a full-page
-         * image. (XXX We could alternatively store a pointer into the FPW).
-         */
-        if (RelationIsLogicallyLogged(relation) &&
-            !(options & HEAP_INSERT_NO_LOGICAL))
-        {
-            xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;
-            bufflags |= REGBUF_KEEP_DATA;
-        }
-
-        XLogBeginInsert();
-        XLogRegisterData((char *) &xlrec, SizeOfHeapInsert);
-
-        xlhdr.t_infomask2 = heaptup->t_data->t_infomask2;
-        xlhdr.t_infomask = heaptup->t_data->t_infomask;
-        xlhdr.t_hoff = heaptup->t_data->t_hoff;
-
-        /*
-         * note we mark xlhdr as belonging to buffer; if XLogInsert decides to
-         * write the whole page to the xlog, we don't need to store
-         * xl_heap_header in the xlog.
-         */
-        XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
-        XLogRegisterBufData(0, (char *) &xlhdr, SizeOfHeapHeader);
-        /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
-        XLogRegisterBufData(0,
-                            (char *) heaptup->t_data + SizeofHeapTupleHeader,
-                            heaptup->t_len - SizeofHeapTupleHeader);
-
-        /* filtering by origin on a row level is much more efficient */
-        XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
-
-        recptr = XLogInsert(RM_HEAP_ID, info);
+        recptr = log_heap_insert(relation, buffer, heaptup,
+                                 options, all_visible_cleared);
 
         PageSetLSN(page, recptr);
     }
@@ -2730,58 +2673,10 @@ l1:
      */
     if (RelationNeedsWAL(relation))
     {
-        xl_heap_delete xlrec;
-        xl_heap_header xlhdr;
         XLogRecPtr    recptr;
 
-        /* For logical decode we need combocids to properly decode the catalog */
-        if (RelationIsAccessibleInLogicalDecoding(relation))
-            log_heap_new_cid(relation, &tp);
-
-        xlrec.flags = 0;
-        if (all_visible_cleared)
-            xlrec.flags |= XLH_DELETE_ALL_VISIBLE_CLEARED;
-        if (changingPart)
-            xlrec.flags |= XLH_DELETE_IS_PARTITION_MOVE;
-        xlrec.infobits_set = compute_infobits(tp.t_data->t_infomask,
-                                              tp.t_data->t_infomask2);
-        xlrec.offnum = ItemPointerGetOffsetNumber(&tp.t_self);
-        xlrec.xmax = new_xmax;
-
-        if (old_key_tuple != NULL)
-        {
-            if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
-                xlrec.flags |= XLH_DELETE_CONTAINS_OLD_TUPLE;
-            else
-                xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY;
-        }
-
-        XLogBeginInsert();
-        XLogRegisterData((char *) &xlrec, SizeOfHeapDelete);
-
-        XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
-
-        /*
-         * Log replica identity of the deleted tuple if there is one
-         */
-        if (old_key_tuple != NULL)
-        {
-            xlhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2;
-            xlhdr.t_infomask = old_key_tuple->t_data->t_infomask;
-            xlhdr.t_hoff = old_key_tuple->t_data->t_hoff;
-
-            XLogRegisterData((char *) &xlhdr, SizeOfHeapHeader);
-            XLogRegisterData((char *) old_key_tuple->t_data
-                             + SizeofHeapTupleHeader,
-                             old_key_tuple->t_len
-                             - SizeofHeapTupleHeader);
-        }
-
-        /* filtering by origin on a row level is much more efficient */
-        XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
-
-        recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE);
-
+        recptr = log_heap_delete(relation, buffer, &tp, old_key_tuple, new_xmax,
+                                 changingPart, all_visible_cleared);
         PageSetLSN(page, recptr);
     }
 
@@ -7245,6 +7140,148 @@ log_heap_visible(RelFileNode rnode, Buffer heap_buffer, Buffer vm_buffer,
     return recptr;
 }
 
+/*
+ * Perform XLogInsert for a heap-insert operation.  Caller must already
+ * have modified the buffer and marked it dirty.
+ */
+XLogRecPtr
+log_heap_insert(Relation relation, Buffer buffer,
+                HeapTuple heaptup, int options, bool all_visible_cleared)
+{
+    xl_heap_insert xlrec;
+    xl_heap_header xlhdr;
+    uint8        info = XLOG_HEAP_INSERT;
+    int            bufflags = 0;
+    Page        page = BufferGetPage(buffer);
+
+    /*
+     * If this is a catalog, we need to transmit combocids to properly
+     * decode, so log that as well.
+     */
+    if (RelationIsAccessibleInLogicalDecoding(relation))
+        log_heap_new_cid(relation, heaptup);
+
+    /*
+     * If this is the single and first tuple on page, we can reinit the
+     * page instead of restoring the whole thing.  Set flag, and hide
+     * buffer references from XLogInsert.
+     */
+    if (ItemPointerGetOffsetNumber(&(heaptup->t_self)) == FirstOffsetNumber &&
+        PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
+    {
+        info |= XLOG_HEAP_INIT_PAGE;
+        bufflags |= REGBUF_WILL_INIT;
+    }
+
+    xlrec.offnum = ItemPointerGetOffsetNumber(&heaptup->t_self);
+    xlrec.flags = 0;
+    if (all_visible_cleared)
+        xlrec.flags |= XLH_INSERT_ALL_VISIBLE_CLEARED;
+    if (options & HEAP_INSERT_SPECULATIVE)
+        xlrec.flags |= XLH_INSERT_IS_SPECULATIVE;
+    Assert(ItemPointerGetBlockNumber(&heaptup->t_self) == BufferGetBlockNumber(buffer));
+
+    /*
+     * For logical decoding, we need the tuple even if we're doing a full
+     * page write, so make sure it's included even if we take a full-page
+     * image. (XXX We could alternatively store a pointer into the FPW).
+     */
+    if (RelationIsLogicallyLogged(relation) &&
+        !(options & HEAP_INSERT_NO_LOGICAL))
+    {
+        xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;
+        bufflags |= REGBUF_KEEP_DATA;
+    }
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, SizeOfHeapInsert);
+
+    xlhdr.t_infomask2 = heaptup->t_data->t_infomask2;
+    xlhdr.t_infomask = heaptup->t_data->t_infomask;
+    xlhdr.t_hoff = heaptup->t_data->t_hoff;
+
+    /*
+     * note we mark xlhdr as belonging to buffer; if XLogInsert decides to
+     * write the whole page to the xlog, we don't need to store
+     * xl_heap_header in the xlog.
+     */
+    XLogRegisterBuffer(0, buffer, REGBUF_STANDARD | bufflags);
+    XLogRegisterBufData(0, (char *) &xlhdr, SizeOfHeapHeader);
+    /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
+    XLogRegisterBufData(0,
+                        (char *) heaptup->t_data + SizeofHeapTupleHeader,
+                        heaptup->t_len - SizeofHeapTupleHeader);
+
+    /* filtering by origin on a row level is much more efficient */
+    XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+    return XLogInsert(RM_HEAP_ID, info);
+}
+
+/*
+ * Perform XLogInsert for a heap-insert operation.  Caller must already
+ * have modified the buffer and marked it dirty.
+ *
+ * NB: heap_abort_speculative() uses the same xlog record and replay
+ * routines.
+ */
+static XLogRecPtr
+log_heap_delete(Relation relation, Buffer buffer,
+                HeapTuple tp, HeapTuple old_key_tuple, TransactionId new_xmax,
+                bool changingPart, bool all_visible_cleared)
+{
+    xl_heap_delete xlrec;
+    xl_heap_header xlhdr;
+
+    /* For logical decode we need combocids to properly decode the catalog */
+    if (RelationIsAccessibleInLogicalDecoding(relation))
+        log_heap_new_cid(relation, tp);
+
+    xlrec.flags = 0;
+    if (all_visible_cleared)
+        xlrec.flags |= XLH_DELETE_ALL_VISIBLE_CLEARED;
+    if (changingPart)
+        xlrec.flags |= XLH_DELETE_IS_PARTITION_MOVE;
+    xlrec.infobits_set = compute_infobits(tp->t_data->t_infomask,
+                                          tp->t_data->t_infomask2);
+    xlrec.offnum = ItemPointerGetOffsetNumber(&tp->t_self);
+    xlrec.xmax = new_xmax;
+
+    if (old_key_tuple != NULL)
+    {
+        if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+            xlrec.flags |= XLH_DELETE_CONTAINS_OLD_TUPLE;
+        else
+            xlrec.flags |= XLH_DELETE_CONTAINS_OLD_KEY;
+    }
+
+    XLogBeginInsert();
+    XLogRegisterData((char *) &xlrec, SizeOfHeapDelete);
+
+    XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
+
+    /*
+     * Log replica identity of the deleted tuple if there is one
+     */
+    if (old_key_tuple != NULL)
+    {
+        xlhdr.t_infomask2 = old_key_tuple->t_data->t_infomask2;
+        xlhdr.t_infomask = old_key_tuple->t_data->t_infomask;
+        xlhdr.t_hoff = old_key_tuple->t_data->t_hoff;
+
+        XLogRegisterData((char *) &xlhdr, SizeOfHeapHeader);
+        XLogRegisterData((char *) old_key_tuple->t_data
+                         + SizeofHeapTupleHeader,
+                         old_key_tuple->t_len
+                         - SizeofHeapTupleHeader);
+    }
+
+    /* filtering by origin on a row level is much more efficient */
+    XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+
+    return XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE);
+}
+
 /*
  * Perform XLogInsert for a heap-update operation.  Caller must already
  * have modified the buffer(s) and marked them dirty.
-- 
2.16.3

From 255e3b3d5998318a9aa7abd0d3f9dab67dd0053a Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Tue, 2 Apr 2019 11:53:36 +0900
Subject: [PATCH 4/7] Add new interface to TableAmRoutine

Add two interface functions to TableAmRoutine, which are related to
WAL-skipping feature.
---
 src/backend/access/table/tableamapi.c |  4 ++
 src/include/access/tableam.h          | 79 +++++++++++++++++++++++------------
 2 files changed, 56 insertions(+), 27 deletions(-)

diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c
index 51c0deaaf2..fef4e523e8 100644
--- a/src/backend/access/table/tableamapi.c
+++ b/src/backend/access/table/tableamapi.c
@@ -94,6 +94,10 @@ GetTableAmRoutine(Oid amhandler)
            (routine->scan_bitmap_next_tuple == NULL));
     Assert(routine->scan_sample_next_block != NULL);
     Assert(routine->scan_sample_next_tuple != NULL);
+    Assert((routine->relation_register_walskip == NULL) ==
+           (routine->relation_invalidate_walskip == NULL) &&
+           (routine->relation_register_walskip == NULL) ==
+           (routine->finish_bulk_insert == NULL));
 
     return routine;
 }
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 4efe178ed1..1a3a3c6711 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -382,19 +382,15 @@ typedef struct TableAmRoutine
 
     /*
      * Perform operations necessary to complete insertions made via
-     * tuple_insert and multi_insert with a BulkInsertState specified. This
-     * e.g. may e.g. used to flush the relation when inserting with
-     * TABLE_INSERT_SKIP_WAL specified.
+     * tuple_insert and multi_insert or page-level copying performed by ALTER
+     * TABLE rewrite. This is called at commit time if WAL-skipping is
+     * activated and the caller decided that any finish work is required to
+     * the file.
      *
-     * Typically callers of tuple_insert and multi_insert will just pass all
-     * the flags the apply to them, and each AM has to decide which of them
-     * make sense for it, and then only take actions in finish_bulk_insert
-     * that make sense for a specific AM.
-     *
-     * Optional callback.
+     * Optional callback. Must be provided when relation_register_walskip is
+     * provided.
      */
-    void        (*finish_bulk_insert) (Relation rel, int options);
-
+    void        (*finish_bulk_insert) (RelFileNode rnode, ForkNumber forkNum);
 
     /* ------------------------------------------------------------------------
      * DDL related functionality.
@@ -447,6 +443,26 @@ typedef struct TableAmRoutine
                                               double *tups_vacuumed,
                                               double *tups_recently_dead);
 
+    /*
+     * Register WAL-skipping on the current storage of rel. WAL-logging on the
+     * relation is skipped and the storage will be synced at commit. Returns
+     * true if successfully registered, and finish_bulk_insert() is called at
+     * commit.
+     *
+     * Optional callback.
+     */
+    void        (*relation_register_walskip) (Relation rel);
+
+    /*
+     * Invalidate registered WAL skipping on the current storage of rel. The
+     * function is called when the storage of the relation is going to be
+     * out-of-use after commit.
+     *
+     * Optional callback. Must be provided when relation_register_walskip is
+     * provided.
+     */
+    void        (*relation_invalidate_walskip) (Relation rel);
+
     /*
      * React to VACUUM command on the relation. The VACUUM might be user
      * triggered or by autovacuum. The specific actions performed by the AM
@@ -1026,8 +1042,7 @@ table_compute_xid_horizon_for_tuples(Relation rel,
  *
  *
  * The BulkInsertState object (if any; bistate can be NULL for default
- * behavior) is also just passed through to RelationGetBufferForTuple. If
- * `bistate` is provided, table_finish_bulk_insert() needs to be called.
+ * behavior) is also just passed through to RelationGetBufferForTuple.
  *
  * On return the slot's tts_tid and tts_tableOid are updated to reflect the
  * insertion. But note that any toasting of fields within the slot is NOT
@@ -1201,20 +1216,6 @@ table_lock_tuple(Relation rel, ItemPointer tid, Snapshot snapshot,
                                        flags, tmfd);
 }
 
-/*
- * Perform operations necessary to complete insertions made via
- * tuple_insert and multi_insert with a BulkInsertState specified. This
- * e.g. may e.g. used to flush the relation when inserting with
- * TABLE_INSERT_SKIP_WAL specified.
- */
-static inline void
-table_finish_bulk_insert(Relation rel, int options)
-{
-    /* optional callback */
-    if (rel->rd_tableam && rel->rd_tableam->finish_bulk_insert)
-        rel->rd_tableam->finish_bulk_insert(rel, options);
-}
-
 
 /* ------------------------------------------------------------------------
  * DDL related functionality.
@@ -1298,6 +1299,30 @@ table_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
                                                    tups_recently_dead);
 }
 
+/*
+ * Register WAL-skipping to the relation. WAL-logging is skipped for the new
+ * pages after this call and the relation file is going to be synced at
+ * commit.
+ */
+static inline void
+table_relation_register_walskip(Relation rel)
+{
+    if (rel->rd_tableam && rel->rd_tableam->relation_register_walskip)
+        rel->rd_tableam->relation_register_walskip(rel);
+}
+
+/*
+ * Unregister WAL-skipping to the relation. Call this when the relation is
+ * going to be out-of-use after commit. WAL-skipping continues but the
+ * relation won't be synced at commit.
+ */
+static inline void
+table_relation_invalidate_walskip(Relation rel)
+{
+    if (rel->rd_tableam && rel->rd_tableam->relation_invalidate_walskip)
+        rel->rd_tableam->relation_invalidate_walskip(rel);
+}
+
 /*
  * Perform VACUUM on the relation. The VACUUM can be user triggered or by
  * autovacuum. The specific actions performed by the AM will depend heavily on
-- 
2.16.3

From 24c9b0b9b9698d86fce3ad129400e3042a2e0afd Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Tue, 2 Apr 2019 18:05:10 +0900
Subject: [PATCH 5/7] Add infrastructure to WAL-logging skip feature

We used to optimize WAL-logging for truncation of in-transaction
crated tables in minimal mode by just signaling by
HEAP_INSERT_SKIP_WAL option on heap operations. This mechanism can
emit WAL records that results in corrupt state for certain series of
in-transaction operations. This patch provides infrastructure to track
pending at-commit fsyncs for a relation and in-transaction
truncations. table_relation_register_walskip() should be used to start
tracking before batch operations like COPY and CLUSTER, and use
BufferNeedsWAL() instead of RelationNeedsWAL() at the places related
to WAL-logging about heap-modifying operations, then remove
call to table_finish_bulk_insert() and the tableam intaface.
---
 src/backend/access/transam/xact.c   |  12 +-
 src/backend/catalog/storage.c       | 612 +++++++++++++++++++++++++++++++++---
 src/backend/commands/tablecmds.c    |   6 +-
 src/backend/storage/buffer/bufmgr.c |  39 ++-
 src/backend/utils/cache/relcache.c  |   3 +
 src/include/catalog/storage.h       |  17 +-
 src/include/storage/bufmgr.h        |   2 +
 src/include/utils/rel.h             |   7 +
 8 files changed, 631 insertions(+), 67 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index e9ed92b70b..33a83dc784 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2102,6 +2102,9 @@ CommitTransaction(void)
     /* close large objects before lower-level cleanup */
     AtEOXact_LargeObject(true);
 
+    /* Flush updates to relations that we didn't WAL-logged */
+    smgrFinishBulkInsert(true);
+
     /*
      * Mark serializable transaction as complete for predicate locking
      * purposes.  This should be done as late as we can put it and still allow
@@ -2334,6 +2337,9 @@ PrepareTransaction(void)
     /* close large objects before lower-level cleanup */
     AtEOXact_LargeObject(true);
 
+    /* Flush updates to relations that we didn't WAL-logged */
+    smgrFinishBulkInsert(true);
+
     /*
      * Mark serializable transaction as complete for predicate locking
      * purposes.  This should be done as late as we can put it and still allow
@@ -2659,6 +2665,7 @@ AbortTransaction(void)
     AtAbort_Notify();
     AtEOXact_RelationMap(false, is_parallel_worker);
     AtAbort_Twophase();
+    smgrFinishBulkInsert(false);    /* abandon pending syncs */
 
     /*
      * Advertise the fact that we aborted in pg_xact (assuming that we got as
@@ -4792,8 +4799,7 @@ CommitSubTransaction(void)
     AtEOSubXact_RelationCache(true, s->subTransactionId,
                               s->parent->subTransactionId);
     AtEOSubXact_Inval(true);
-    AtSubCommit_smgr();
-
+    AtSubCommit_smgr(s->subTransactionId, s->parent->subTransactionId);
     /*
      * The only lock we actually release here is the subtransaction XID lock.
      */
@@ -4970,7 +4976,7 @@ AbortSubTransaction(void)
         ResourceOwnerRelease(s->curTransactionOwner,
                              RESOURCE_RELEASE_AFTER_LOCKS,
                              false, false);
-        AtSubAbort_smgr();
+        AtSubAbort_smgr(s->subTransactionId, s->parent->subTransactionId);
 
         AtEOXact_GUC(false, s->gucNestLevel);
         AtEOSubXact_SPI(false, s->subTransactionId);
diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c
index 72242b2476..4cd112f86c 100644
--- a/src/backend/catalog/storage.c
+++ b/src/backend/catalog/storage.c
@@ -21,6 +21,7 @@
 
 #include "miscadmin.h"
 
+#include "access/tableam.h"
 #include "access/visibilitymap.h"
 #include "access/xact.h"
 #include "access/xlog.h"
@@ -29,10 +30,18 @@
 #include "catalog/storage.h"
 #include "catalog/storage_xlog.h"
 #include "storage/freespace.h"
-#include "storage/smgr.h"
+#include "utils/hsearch.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
+ /* #define STORAGEDEBUG */    /* turns DEBUG elogs on */
+
+#ifdef STORAGEDEBUG
+#define STORAGE_elog(...)                elog(__VA_ARGS__)
+#else
+#define STORAGE_elog(...)
+#endif
+
 /*
  * We keep a list of all relations (represented as RelFileNode values)
  * that have been created or deleted in the current transaction.  When
@@ -64,6 +73,61 @@ typedef struct PendingRelDelete
 
 static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
 
+/*
+ * We also track relation files (RelFileNode values) that have been created
+ * in the same transaction, and that have been modified without WAL-logging
+ * the action (an optimization possible with wal_level=minimal). When we are
+ * about to skip WAL-logging, a RelWalSkip entry is created, and
+ * 'skip_wal_min_blk' is set to the current size of the relation. Any
+ * operations on blocks < skip_wal_min_blk need to be WAL-logged as usual, but
+ * for operations on higher blocks, WAL-logging is skipped.
+
+ *
+ * NB: after WAL-logging has been skipped for a block, we must not WAL-log
+ * any subsequent actions on the same block either. Replaying the WAL record
+ * of the subsequent action might fail otherwise, as the "before" state of
+ * the block might not match, as the earlier actions were not WAL-logged.
+ * Likewise, after we have WAL-logged an operation for a block, we must
+ * WAL-log any subsequent operations on the same page as well. Replaying
+ * a possible full-page-image from the earlier WAL record would otherwise
+ * revert the page to the old state, even if we sync the relation at end
+ * of transaction.
+ *
+ * If a relation is truncated (without creating a new relfilenode), and we
+ * emit a WAL record of the truncation, we can't skip WAL-logging for any
+ * of the truncated blocks anymore, as replaying the truncation record will
+ * destroy all the data inserted after that. But if we have already decided
+ * to skip WAL-logging changes to a relation, and the relation is truncated,
+ * we don't need to WAL-log the truncation either.
+ *
+ * This mechanism is currently only used by heaps. Indexes are always
+ * WAL-logged. Also, this only applies for wal_level=minimal; with higher
+ * WAL levels we need the WAL for PITR/replication anyway.
+ */
+typedef struct RelWalSkip
+{
+    RelFileNode relnode;            /* relation created in same xact */
+    bool        forks[MAX_FORKNUM + 1];    /* target forknums */
+    BlockNumber skip_wal_min_blk;    /* WAL-logging skipped for blocks >=
+                                     * skip_wal_min_blk */
+    BlockNumber wal_log_min_blk;     /* The minimum blk number that requires
+                                     * WAL-logging even if skipped by the
+                                     * above*/
+    SubTransactionId create_sxid;    /* subxid where this entry is created */
+    SubTransactionId invalidate_sxid; /* subxid where this entry is
+                                       * invalidated */
+    const TableAmRoutine *tableam;    /* Table access routine */
+}    RelWalSkip;
+
+/* Relations that need to be fsync'd at commit */
+static HTAB *walSkipHash = NULL;
+
+static RelWalSkip *getWalSkipEntry(Relation rel, bool create);
+static RelWalSkip *getWalSkipEntryRNode(RelFileNode *node,
+                                                      bool create);
+static void smgrProcessWALSkipInval(bool isCommit, SubTransactionId mySubid,
+                        SubTransactionId parentSubid);
+
 /*
  * RelationCreateStorage
  *        Create physical storage for a relation.
@@ -261,31 +325,59 @@ RelationTruncate(Relation rel, BlockNumber nblocks)
      */
     if (RelationNeedsWAL(rel))
     {
-        /*
-         * Make an XLOG entry reporting the file truncation.
-         */
-        XLogRecPtr    lsn;
-        xl_smgr_truncate xlrec;
+        RelWalSkip *walskip;
 
-        xlrec.blkno = nblocks;
-        xlrec.rnode = rel->rd_node;
-        xlrec.flags = SMGR_TRUNCATE_ALL;
-
-        XLogBeginInsert();
-        XLogRegisterData((char *) &xlrec, sizeof(xlrec));
-
-        lsn = XLogInsert(RM_SMGR_ID,
-                         XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
+        /* get pending sync entry, create if not yet */
+        walskip = getWalSkipEntry(rel, true);
 
         /*
-         * Flush, because otherwise the truncation of the main relation might
-         * hit the disk before the WAL record, and the truncation of the FSM
-         * or visibility map. If we crashed during that window, we'd be left
-         * with a truncated heap, but the FSM or visibility map would still
-         * contain entries for the non-existent heap pages.
+         * walskip is null here if rel doesn't support WAL-logging skip,
+         * otherwise check for WAL-skipping status.
          */
-        if (fsm || vm)
-            XLogFlush(lsn);
+        if (walskip == NULL ||
+            walskip->skip_wal_min_blk == InvalidBlockNumber ||
+            walskip->skip_wal_min_blk < nblocks)
+        {
+            /*
+             * If WAL-skipping is enabled, this is the first time truncation
+             * of this relation in this transaction or truncation that leaves
+             * pages that need at-commit fsync.  Make an XLOG entry reporting
+             * the file truncation.
+             */
+            XLogRecPtr        lsn;
+            xl_smgr_truncate xlrec;
+
+            xlrec.blkno = nblocks;
+            xlrec.rnode = rel->rd_node;
+            xlrec.flags = SMGR_TRUNCATE_ALL;
+
+            XLogBeginInsert();
+            XLogRegisterData((char *) &xlrec, sizeof(xlrec));
+
+            lsn = XLogInsert(RM_SMGR_ID,
+                             XLOG_SMGR_TRUNCATE | XLR_SPECIAL_REL_UPDATE);
+
+            STORAGE_elog(DEBUG2,
+                         "WAL-logged truncation of rel %u/%u/%u to %u blocks",
+                         rel->rd_node.spcNode, rel->rd_node.dbNode,
+                         rel->rd_node.relNode, nblocks);
+            /*
+             * Flush, because otherwise the truncation of the main relation
+             * might hit the disk before the WAL record, and the truncation of
+             * the FSM or visibility map. If we crashed during that window,
+             * we'd be left with a truncated heap, but the FSM or visibility
+             * map would still contain entries for the non-existent heap
+             * pages.
+             */
+            if (fsm || vm)
+                XLogFlush(lsn);
+
+            if (walskip)
+            {
+                /* no longer skip WAL-logging for the blocks */
+                walskip->wal_log_min_blk = nblocks;
+            }
+        }
     }
 
     /* Do the real work */
@@ -296,8 +388,7 @@ RelationTruncate(Relation rel, BlockNumber nblocks)
  * Copy a fork's data, block by block.
  */
 void
-RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
-                    ForkNumber forkNum, char relpersistence)
+RelationCopyStorage(Relation srcrel, SMgrRelation dst, ForkNumber forkNum)
 {
     PGAlignedBlock buf;
     Page        page;
@@ -305,6 +396,8 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
     bool        copying_initfork;
     BlockNumber nblocks;
     BlockNumber blkno;
+    SMgrRelation src = srcrel->rd_smgr;
+    char         relpersistence = srcrel->rd_rel->relpersistence;
 
     page = (Page) buf.data;
 
@@ -316,12 +409,33 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
     copying_initfork = relpersistence == RELPERSISTENCE_UNLOGGED &&
         forkNum == INIT_FORKNUM;
 
-    /*
-     * We need to log the copied data in WAL iff WAL archiving/streaming is
-     * enabled AND it's a permanent relation.
-     */
-    use_wal = XLogIsNeeded() &&
-        (relpersistence == RELPERSISTENCE_PERMANENT || copying_initfork);
+    if (relpersistence == RELPERSISTENCE_PERMANENT || copying_initfork)
+    {
+        /*
+         * We need to log the copied data in WAL iff WAL archiving/streaming
+         * is enabled AND it's a permanent relation.
+         */
+        if (XLogIsNeeded())
+            use_wal = true;
+
+        /*
+         * If the rel is WAL-logged, must fsync before commit.  We use
+         * heap_sync to ensure that the toast table gets fsync'd too.  (For a
+         * temp or unlogged rel we don't care since the data will be gone
+         * after a crash anyway.)
+         *
+         * It's obvious that we must do this when not WAL-logging the
+         * copy. It's less obvious that we have to do it even if we did
+         * WAL-log the copied pages. The reason is that since we're copying
+         * outside shared buffers, a CHECKPOINT occurring during the copy has
+         * no way to flush the previously written data to disk (indeed it
+         * won't know the new rel even exists).  A crash later on would replay
+         * WAL from the checkpoint, therefore it wouldn't replay our earlier
+         * WAL entries. If we do not fsync those pages here, they might still
+         * not be on disk when the crash occurs.
+         */
+        RecordPendingSync(srcrel, dst, forkNum);
+    }
 
     nblocks = smgrnblocks(src, forkNum);
 
@@ -358,24 +472,321 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
          */
         smgrextend(dst, forkNum, blkno, buf.data, true);
     }
+}
+
+/*
+ * Do changes to given heap page need to be WAL-logged?
+ *
+ * This takes into account any previous RecordPendingSync() requests.
+ *
+ * Note that it is required to check this before creating any WAL records for
+ * heap pages - it is not merely an optimization! WAL-logging a record, when
+ * we have already skipped a previous WAL record for the same page could lead
+ * to failure at WAL replay, as the "before" state expected by the record
+ * might not match what's on disk. Also, if the heap was truncated earlier, we
+ * must WAL-log any changes to the once-truncated blocks, because replaying
+ * the truncation record will destroy them.
+ */
+bool
+BufferNeedsWAL(Relation rel, Buffer buf)
+{
+    BlockNumber        blkno = InvalidBlockNumber;
+    RelWalSkip *walskip;
+
+    if (!RelationNeedsWAL(rel))
+        return false;
+
+    /* fetch existing pending sync entry */
+    walskip = getWalSkipEntry(rel, false);
 
     /*
-     * If the rel is WAL-logged, must fsync before commit.  We use heap_sync
-     * to ensure that the toast table gets fsync'd too.  (For a temp or
-     * unlogged rel we don't care since the data will be gone after a crash
-     * anyway.)
-     *
-     * It's obvious that we must do this when not WAL-logging the copy. It's
-     * less obvious that we have to do it even if we did WAL-log the copied
-     * pages. The reason is that since we're copying outside shared buffers, a
-     * CHECKPOINT occurring during the copy has no way to flush the previously
-     * written data to disk (indeed it won't know the new rel even exists).  A
-     * crash later on would replay WAL from the checkpoint, therefore it
-     * wouldn't replay our earlier WAL entries. If we do not fsync those pages
-     * here, they might still not be on disk when the crash occurs.
+     * no point in doing further work if we know that we don't skip
+     * WAL-logging.
      */
-    if (relpersistence == RELPERSISTENCE_PERMANENT || copying_initfork)
-        smgrimmedsync(dst, forkNum);
+    if (!walskip)
+    {
+        STORAGE_elog(DEBUG2,
+                     "not skipping WAL-logging for rel %u/%u/%u block %u",
+                     rel->rd_node.spcNode, rel->rd_node.dbNode,
+                     rel->rd_node.relNode, BufferGetBlockNumber(buf));
+        return true;
+    }
+
+    Assert(BufferIsValid(buf));
+
+    blkno = BufferGetBlockNumber(buf);
+
+    /*
+     * We don't skip WAL-logging for pages that once done.
+     */
+    if (walskip->skip_wal_min_blk == InvalidBlockNumber ||
+        walskip->skip_wal_min_blk > blkno)
+    {
+        STORAGE_elog(DEBUG2, "not skipping WAL-logging for rel %u/%u/%u block %u, because skip_wal_min_blk is %u",
+                     rel->rd_node.spcNode, rel->rd_node.dbNode,
+                     rel->rd_node.relNode, blkno, walskip->skip_wal_min_blk);
+        return true;
+    }
+
+    /*
+     * we don't skip WAL-logging for blocks that have got WAL-logged
+     * truncation
+     */
+    if (walskip->wal_log_min_blk != InvalidBlockNumber &&
+        walskip->wal_log_min_blk <= blkno)
+    {
+        STORAGE_elog(DEBUG2, "not skipping WAL-logging for rel %u/%u/%u block %u, because wal_log_min_blk is %u",
+                     rel->rd_node.spcNode, rel->rd_node.dbNode,
+                     rel->rd_node.relNode, blkno, walskip->wal_log_min_blk);
+        return true;
+    }
+
+    STORAGE_elog(DEBUG2, "skipping WAL-logging for rel %u/%u/%u block %u",
+                 rel->rd_node.spcNode, rel->rd_node.dbNode,
+                 rel->rd_node.relNode, blkno);
+
+    return false;
+}
+
+bool
+BlockNeedsWAL(Relation rel, BlockNumber blkno)
+{
+    RelWalSkip *walskip;
+
+    if (!RelationNeedsWAL(rel))
+        return false;
+
+    /* fetch exising pending sync entry */
+    walskip = getWalSkipEntry(rel, false);
+
+    /*
+     * no point in doing further work if we know that we don't skip
+     * WAL-logging.
+     */
+    if (!walskip)
+        return true;
+
+    /*
+     * We don't skip WAL-logging for pages that once done.
+     */
+    if (walskip->skip_wal_min_blk == InvalidBlockNumber ||
+        walskip->skip_wal_min_blk > blkno)
+    {
+        STORAGE_elog(DEBUG2, "not skipping WAL-logging for rel %u/%u/%u block %u, because skip_wal_min_blk is %u",
+                     rel->rd_node.spcNode, rel->rd_node.dbNode,
+                     rel->rd_node.relNode, blkno, walskip->skip_wal_min_blk);
+        return true;
+    }
+
+    /*
+     * we don't skip WAL-logging for blocks that have got WAL-logged
+     * truncation
+     */
+    if (walskip->wal_log_min_blk != InvalidBlockNumber &&
+        walskip->wal_log_min_blk <= blkno)
+    {
+        STORAGE_elog(DEBUG2, "not skipping WAL-logging for rel %u/%u/%u block %u, because wal_log_min_blk is %u",
+                     rel->rd_node.spcNode, rel->rd_node.dbNode,
+                     rel->rd_node.relNode, blkno, walskip->wal_log_min_blk);
+
+        return true;
+    }
+
+    STORAGE_elog(DEBUG2, "skipping WAL-logging for rel %u/%u/%u block %u",
+                 rel->rd_node.spcNode, rel->rd_node.dbNode,
+                 rel->rd_node.relNode, blkno);
+
+    return false;
+}
+
+/*
+ * Remember that the given relation doesn't need WAL-logging for the blocks
+ * after the current block size and for the blocks that are going to be synced
+ * at commit.
+ */
+void
+RecordWALSkipping(Relation rel)
+{
+    RelWalSkip *walskip;
+
+    Assert(RelationNeedsWAL(rel));
+
+    /* get pending sync entry, create if not yet  */
+    walskip = getWalSkipEntry(rel, true);
+
+    if (walskip == NULL)
+        return;
+
+    /*
+     *  Record only the first registration.
+     */
+    if (walskip->skip_wal_min_blk != InvalidBlockNumber)
+    {
+        STORAGE_elog(DEBUG2, "WAL skipping for rel %u/%u/%u was already registered at block %u (new %u)",
+                     rel->rd_node.spcNode, rel->rd_node.dbNode,
+                     rel->rd_node.relNode, walskip->skip_wal_min_blk,
+                     RelationGetNumberOfBlocks(rel));
+        return;
+    }
+
+    STORAGE_elog(DEBUG2, "registering new WAL skipping rel %u/%u/%u at block %u",
+                 rel->rd_node.spcNode, rel->rd_node.dbNode,
+                 rel->rd_node.relNode, RelationGetNumberOfBlocks(rel));
+
+    walskip->skip_wal_min_blk = RelationGetNumberOfBlocks(rel);
+}
+
+/*
+ * Record commit-time file sync. This shouldn't be used mixing with
+ * RecordWALSkipping.
+ */
+void
+RecordPendingSync(Relation rel, SMgrRelation targetsrel, ForkNumber forknum)
+{
+    RelWalSkip *walskip;
+
+    Assert(RelationNeedsWAL(rel));
+
+    /* check for support for this feature */
+    if (rel->rd_tableam == NULL ||
+        rel->rd_tableam->relation_register_walskip == NULL)
+        return;
+
+    walskip = getWalSkipEntryRNode(&targetsrel->smgr_rnode.node, true);
+    walskip->forks[forknum] = true;
+    walskip->skip_wal_min_blk = 0;
+    walskip->tableam = rel->rd_tableam;
+
+    STORAGE_elog(DEBUG2,
+                 "registering new pending sync for rel %u/%u/%u at block %u",
+                 walskip->relnode.spcNode, walskip->relnode.dbNode,
+                 walskip->relnode.relNode, 0);
+}
+
+/*
+ * RelationInvalidateWALSkip() -- invalidate WAL-skip entry
+ */
+void
+RelationInvalidateWALSkip(Relation rel)
+{
+    RelWalSkip *walskip;
+
+    /* we know we don't have one */
+    if (rel->rd_nowalskip)
+        return;
+
+    walskip = getWalSkipEntry(rel, false);
+
+    if (!walskip)
+        return;
+
+    /*
+     * The state is reset at subtransaction commit/abort. No invalidation
+     * request must not come for the same relation in the same subtransaction.
+     */
+    Assert(walskip->invalidate_sxid == InvalidSubTransactionId);
+
+    walskip->invalidate_sxid = GetCurrentSubTransactionId();
+
+    STORAGE_elog(DEBUG2,
+                 "WAL skip of rel %u/%u/%u invalidated by sxid %d",
+                 walskip->relnode.spcNode, walskip->relnode.dbNode,
+                 walskip->relnode.relNode, walskip->invalidate_sxid);
+}
+
+/*
+ * getWalSkipEntry: get WAL skip entry.
+ *
+ * Returns WAL skip entry for the relation. The entry tracks WAL-skipping
+ * blocks for the relation.  The WAL-skipped blocks need fsync at commit time.
+ * Creates one if needed when create is true. If rel doesn't support this
+ * feature, returns true even if create is true.
+ */
+static inline RelWalSkip *
+getWalSkipEntry(Relation rel, bool create)
+{
+    RelWalSkip *walskip_entry = NULL;
+
+    if (rel->rd_walskip)
+        return rel->rd_walskip;
+
+    /* we know we don't have pending sync entry */
+    if (!create && rel->rd_nowalskip)
+        return NULL;
+
+    /* check for support for this feature */
+    if (rel->rd_tableam == NULL ||
+        rel->rd_tableam->relation_register_walskip == NULL)
+    {
+        rel->rd_nowalskip = true;
+        return NULL;
+    }
+
+    walskip_entry = getWalSkipEntryRNode(&rel->rd_node, create);
+
+    if (!walskip_entry)
+    {
+        /* prevent further hash lookup */
+        rel->rd_nowalskip = true;
+        return NULL;
+    }
+
+    walskip_entry->forks[MAIN_FORKNUM] = true;
+    walskip_entry->tableam = rel->rd_tableam;
+
+    /* hold shortcut in Relation */
+    rel->rd_nowalskip = false;
+    rel->rd_walskip = walskip_entry;
+
+    return walskip_entry;
+}
+
+/*
+ * getWalSkipEntryRNode: get WAL skip entry by rnode
+ *
+ * Returns a WAL skip entry for the RelFileNode.
+ */
+static RelWalSkip *
+getWalSkipEntryRNode(RelFileNode *rnode, bool create)
+{
+    RelWalSkip *walskip_entry = NULL;
+    bool            found;
+
+    if (!walSkipHash)
+    {
+        /* First time through: initialize the hash table */
+        HASHCTL        ctl;
+
+        if (!create)
+            return NULL;
+
+        MemSet(&ctl, 0, sizeof(ctl));
+        ctl.keysize = sizeof(RelFileNode);
+        ctl.entrysize = sizeof(RelWalSkip);
+        ctl.hash = tag_hash;
+        walSkipHash = hash_create("pending relation sync table", 5,
+                                   &ctl, HASH_ELEM | HASH_FUNCTION);
+    }
+
+    walskip_entry = (RelWalSkip *)
+        hash_search(walSkipHash, (void *) rnode,
+                    create ? HASH_ENTER: HASH_FIND,    &found);
+
+    if (!walskip_entry)
+        return NULL;
+
+    /* new entry created */
+    if (!found)
+    {
+        memset(&walskip_entry->forks, 0, sizeof(walskip_entry->forks));
+        walskip_entry->wal_log_min_blk = InvalidBlockNumber;
+        walskip_entry->skip_wal_min_blk = InvalidBlockNumber;
+        walskip_entry->create_sxid = GetCurrentSubTransactionId();
+        walskip_entry->invalidate_sxid = InvalidSubTransactionId;
+        walskip_entry->tableam = NULL;
+    }
+
+    return walskip_entry;
 }
 
 /*
@@ -506,6 +917,107 @@ smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr)
     return nrels;
 }
 
+/*
+ * Finish bulk insert of files.
+ */
+void
+smgrFinishBulkInsert(bool isCommit)
+{
+    if (!walSkipHash)
+        return;
+
+    if (isCommit)
+    {
+        HASH_SEQ_STATUS status;
+        RelWalSkip *walskip;
+
+        hash_seq_init(&status, walSkipHash);
+
+        while ((walskip = hash_seq_search(&status)) != NULL)
+        {
+            /*
+             * On commit, process valid entreis. Rollback doesn't need sync on
+             * all changes during the transaction.
+             */
+            if (walskip->skip_wal_min_blk != InvalidBlockNumber &&
+                walskip->invalidate_sxid == InvalidSubTransactionId)
+            {
+                int f;
+
+                FlushRelationBuffersWithoutRelCache(walskip->relnode, false);
+
+                /*
+                 * We mustn't create an entry when the table AM doesn't
+                 * support WAL-skipping.
+                 */
+                Assert (walskip->tableam->finish_bulk_insert);
+
+                /* flush all requested forks  */
+                for (f = MAIN_FORKNUM ; f <= MAX_FORKNUM ; f++)
+                {
+                    if (walskip->forks[f])
+                    {
+                        walskip->tableam->finish_bulk_insert(walskip->relnode, f);
+                        STORAGE_elog(DEBUG2, "finishing bulk insert to rel %u/%u/%u fork %d",
+                                     walskip->relnode.spcNode,
+                                     walskip->relnode.dbNode,
+                                     walskip->relnode.relNode, f);
+                    }
+                }
+            }
+        }
+    }
+
+    hash_destroy(walSkipHash);
+    walSkipHash = NULL;
+}
+
+/*
+ * Process pending invalidation of WAL skip happened in the subtransaction
+ */
+void
+smgrProcessWALSkipInval(bool isCommit, SubTransactionId mySubid,
+                        SubTransactionId parentSubid)
+{
+    HASH_SEQ_STATUS status;
+    RelWalSkip *walskip;
+
+    if (!walSkipHash)
+        return;
+
+    /* We expect that we don't have walSkipHash in almost all cases */
+    hash_seq_init(&status, walSkipHash);
+
+    while ((walskip = hash_seq_search(&status)) != NULL)
+    {
+        if (walskip->create_sxid == mySubid)
+        {
+            /*
+             * The entry was created in this subxact. Remove it on abort, or
+             * on commit after invalidation.
+             */
+            if (!isCommit || walskip->invalidate_sxid == mySubid)
+                hash_search(walSkipHash, &walskip->relnode,
+                            HASH_REMOVE, NULL);
+            /* Treat committing valid entry as creation by the parent. */
+            else if (walskip->invalidate_sxid == InvalidSubTransactionId)
+                walskip->create_sxid = parentSubid;
+        }
+        else if (walskip->invalidate_sxid == mySubid)
+        {
+            /*
+             * This entry was created elsewhere then invalidated by this
+             * subxact. Treat commit as invalidation by the parent. Otherwise
+             * cancel invalidation.
+             */
+            if (isCommit)
+                walskip->invalidate_sxid = parentSubid;
+            else
+                walskip->invalidate_sxid = InvalidSubTransactionId;
+        }
+    }
+}
+
 /*
  *    PostPrepare_smgr -- Clean up after a successful PREPARE
  *
@@ -535,7 +1047,7 @@ PostPrepare_smgr(void)
  * Reassign all items in the pending-deletes list to the parent transaction.
  */
 void
-AtSubCommit_smgr(void)
+AtSubCommit_smgr(SubTransactionId mySubid, SubTransactionId parentSubid)
 {
     int            nestLevel = GetCurrentTransactionNestLevel();
     PendingRelDelete *pending;
@@ -545,6 +1057,9 @@ AtSubCommit_smgr(void)
         if (pending->nestLevel >= nestLevel)
             pending->nestLevel = nestLevel - 1;
     }
+
+    /* Remove invalidated WAL skip in this subtransaction */
+    smgrProcessWALSkipInval(true, mySubid, parentSubid);
 }
 
 /*
@@ -555,9 +1070,12 @@ AtSubCommit_smgr(void)
  * subtransaction will not commit.
  */
 void
-AtSubAbort_smgr(void)
+AtSubAbort_smgr(SubTransactionId mySubid, SubTransactionId parentSubid)
 {
     smgrDoPendingDeletes(false);
+
+    /* Remove invalidated WAL skip in this subtransaction */
+    smgrProcessWALSkipInval(false, mySubid, parentSubid);
 }
 
 void
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 654179297c..8908b77d98 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -11983,8 +11983,7 @@ index_copy_data(Relation rel, RelFileNode newrnode)
     RelationCreateStorage(newrnode, rel->rd_rel->relpersistence);
 
     /* copy main fork */
-    RelationCopyStorage(rel->rd_smgr, dstrel, MAIN_FORKNUM,
-                        rel->rd_rel->relpersistence);
+    RelationCopyStorage(rel, dstrel, MAIN_FORKNUM);
 
     /* copy those extra forks that exist */
     for (ForkNumber forkNum = MAIN_FORKNUM + 1;
@@ -12002,8 +12001,7 @@ index_copy_data(Relation rel, RelFileNode newrnode)
                 (rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED &&
                  forkNum == INIT_FORKNUM))
                 log_smgrcreate(&newrnode, forkNum);
-            RelationCopyStorage(rel->rd_smgr, dstrel, forkNum,
-                                rel->rd_rel->relpersistence);
+            RelationCopyStorage(rel, dstrel, forkNum);
         }
     }
 
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 273e2f385f..f00826712a 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -451,6 +451,7 @@ static BufferDesc *BufferAlloc(SMgrRelation smgr,
             BufferAccessStrategy strategy,
             bool *foundPtr);
 static void FlushBuffer(BufferDesc *buf, SMgrRelation reln);
+static void FlushRelationBuffers_common(SMgrRelation smgr, bool islocal);
 static void AtProcExit_Buffers(int code, Datum arg);
 static void CheckForBufferLeaks(void);
 static int    rnode_comparator(const void *p1, const void *p2);
@@ -3153,20 +3154,40 @@ PrintPinnedBufs(void)
 void
 FlushRelationBuffers(Relation rel)
 {
-    int            i;
-    BufferDesc *bufHdr;
-
     /* Open rel at the smgr level if not already done */
     RelationOpenSmgr(rel);
 
-    if (RelationUsesLocalBuffers(rel))
+    FlushRelationBuffers_common(rel->rd_smgr, RelationUsesLocalBuffers(rel));
+}
+
+/*
+ * Like FlushRelationBuffers(), but the relation is specified by RelFileNode
+ */
+void
+FlushRelationBuffersWithoutRelCache(RelFileNode rnode, bool islocal)
+{
+    FlushRelationBuffers_common(smgropen(rnode, InvalidBackendId), islocal);
+}
+
+/*
+ * Code shared between functions FlushRelationBuffers() and
+ * FlushRelationBuffersWithoutRelCache().
+ */
+static void
+FlushRelationBuffers_common(SMgrRelation smgr, bool islocal)
+{
+    RelFileNode rnode = smgr->smgr_rnode.node;
+    int            i;
+    BufferDesc *bufHdr;
+
+    if (islocal)
     {
         for (i = 0; i < NLocBuffer; i++)
         {
             uint32        buf_state;
 
             bufHdr = GetLocalBufferDescriptor(i);
-            if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
+            if (RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
                 ((buf_state = pg_atomic_read_u32(&bufHdr->state)) &
                  (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
             {
@@ -3183,7 +3204,7 @@ FlushRelationBuffers(Relation rel)
 
                 PageSetChecksumInplace(localpage, bufHdr->tag.blockNum);
 
-                smgrwrite(rel->rd_smgr,
+                smgrwrite(smgr,
                           bufHdr->tag.forkNum,
                           bufHdr->tag.blockNum,
                           localpage,
@@ -3213,18 +3234,18 @@ FlushRelationBuffers(Relation rel)
          * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
          * and saves some cycles.
          */
-        if (!RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node))
+        if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode))
             continue;
 
         ReservePrivateRefCountEntry();
 
         buf_state = LockBufHdr(bufHdr);
-        if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
+        if (RelFileNodeEquals(bufHdr->tag.rnode, rnode) &&
             (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
         {
             PinBuffer_Locked(bufHdr);
             LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
-            FlushBuffer(bufHdr, rel->rd_smgr);
+            FlushBuffer(bufHdr, smgr);
             LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
             UnpinBuffer(bufHdr, true);
         }
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 64f3c2e887..f06d55a8fe 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -75,6 +75,7 @@
 #include "partitioning/partdesc.h"
 #include "rewrite/rewriteDefine.h"
 #include "rewrite/rowsecurity.h"
+#include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/smgr.h"
 #include "utils/array.h"
@@ -5644,6 +5645,8 @@ load_relcache_init_file(bool shared)
         rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
         rel->rd_amcache = NULL;
         MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
+        rel->rd_nowalskip = false;
+        rel->rd_walskip = NULL;
 
         /*
          * Recompute lock and physical addressing info.  This is needed in
diff --git a/src/include/catalog/storage.h b/src/include/catalog/storage.h
index 882dc65c89..83fee7dbfe 100644
--- a/src/include/catalog/storage.h
+++ b/src/include/catalog/storage.h
@@ -23,8 +23,14 @@ extern void RelationCreateStorage(RelFileNode rnode, char relpersistence);
 extern void RelationDropStorage(Relation rel);
 extern void RelationPreserveStorage(RelFileNode rnode, bool atCommit);
 extern void RelationTruncate(Relation rel, BlockNumber nblocks);
-extern void RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
-                                ForkNumber forkNum, char relpersistence);
+extern void RelationCopyStorage(Relation srcrel, SMgrRelation dst,
+                                ForkNumber forkNum);
+extern bool BufferNeedsWAL(Relation rel, Buffer buf);
+extern bool BlockNeedsWAL(Relation rel, BlockNumber blkno);
+extern void RecordWALSkipping(Relation rel);
+extern void RecordPendingSync(Relation rel, SMgrRelation srel,
+                              ForkNumber forknum);
+extern void RelationInvalidateWALSkip(Relation rel);
 
 /*
  * These functions used to be in storage/smgr/smgr.c, which explains the
@@ -32,8 +38,11 @@ extern void RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
  */
 extern void smgrDoPendingDeletes(bool isCommit);
 extern int    smgrGetPendingDeletes(bool forCommit, RelFileNode **ptr);
-extern void AtSubCommit_smgr(void);
-extern void AtSubAbort_smgr(void);
+extern void smgrFinishBulkInsert(bool isCommit);
+extern void AtSubCommit_smgr(SubTransactionId mySubid,
+                             SubTransactionId parentSubid);
+extern void AtSubAbort_smgr(SubTransactionId mySubid,
+                             SubTransactionId parentSubid);
 extern void PostPrepare_smgr(void);
 
 #endif                            /* STORAGE_H */
diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h
index c5826f691d..8a9ea041dd 100644
--- a/src/include/storage/bufmgr.h
+++ b/src/include/storage/bufmgr.h
@@ -189,6 +189,8 @@ extern BlockNumber RelationGetNumberOfBlocksInFork(Relation relation,
                                 ForkNumber forkNum);
 extern void FlushOneBuffer(Buffer buffer);
 extern void FlushRelationBuffers(Relation rel);
+extern void FlushRelationBuffersWithoutRelCache(RelFileNode rnode,
+                                    bool islocal);
 extern void FlushDatabaseBuffers(Oid dbid);
 extern void DropRelFileNodeBuffers(RelFileNodeBackend rnode,
                        ForkNumber forkNum, BlockNumber firstDelBlock);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 54028515a7..b2b46322b2 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -198,6 +198,13 @@ typedef struct RelationData
 
     /* use "struct" here to avoid needing to include pgstat.h: */
     struct PgStat_TableStatus *pgstat_info; /* statistics collection area */
+
+    /*
+     * rd_nowalskip is true if this relation is known not to skip WAL.
+     * Otherwise we need to ask smgr for an entry if rd_walskip is NULL.
+     */
+    bool                rd_nowalskip;
+    struct RelWalSkip   *rd_walskip;
 } RelationData;
 
 
-- 
2.16.3

From 3e816b09365dc8d388832460820a3ee2ca58dc5b Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Tue, 2 Apr 2019 13:29:23 +0900
Subject: [PATCH 6/7] Fix WAL skipping feature.

This patch replaces WAL-skipping means from HEAP_INSERT_SKIP_WAL to
the new infrastructure.
---
 src/backend/access/heap/heapam.c         | 114 +++++++++++++++++++++++--------
 src/backend/access/heap/heapam_handler.c |  88 ++++++++++++++++++------
 src/backend/access/heap/pruneheap.c      |   3 +-
 src/backend/access/heap/rewriteheap.c    |  28 ++------
 src/backend/access/heap/vacuumlazy.c     |   6 +-
 src/backend/access/heap/visibilitymap.c  |   3 +-
 src/backend/commands/cluster.c           |  27 ++++++++
 src/backend/commands/copy.c              |  15 +++-
 src/backend/commands/createas.c          |   7 +-
 src/backend/commands/matview.c           |   7 +-
 src/backend/commands/tablecmds.c         |   8 ++-
 src/include/access/rewriteheap.h         |   2 +-
 12 files changed, 219 insertions(+), 89 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 267570b461..cc516e599d 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -28,6 +28,27 @@
  *      the POSTGRES heap access method used for all POSTGRES
  *      relations.
  *
+ * WAL CONSIDERATIONS
+ *      All heap operations are normally WAL-logged. but there are a few
+ *      exceptions. Temporary and unlogged relations never need to be
+ *      WAL-logged, but we can also skip WAL-logging for a table that was
+ *      created in the same transaction, if we don't need WAL for PITR or WAL
+ *      archival purposes (i.e. if wal_level=minimal), and we fsync() the file
+ *      to disk at COMMIT instead.
+ *
+ *      The same-relation optimization is not employed automatically on all
+ *      updates to a table that was created in the same transaction, because for
+ *      a small number of changes, it's cheaper to just create the WAL records
+ *      than fsync()ing the whole relation at COMMIT. It is only worthwhile for
+ *      (presumably) large operations like COPY, CLUSTER, or VACUUM FULL. Use
+ *      table_relation_register_sync() to initiate such an operation; it will
+ *      cause any subsequent updates to the table to skip WAL-logging, if
+ *      possible, and cause the heap to be synced to disk at COMMIT.
+ *
+ *      To make that work, all modifications to heap must use
+ *      BufferNeedsWAL() to check if WAL-logging is needed in this transaction
+ *      for the given block.
+ *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -51,6 +72,7 @@
 #include "access/xloginsert.h"
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
+#include "catalog/storage.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "port/atomics.h"
@@ -1948,7 +1970,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
     MarkBufferDirty(buffer);
 
     /* XLOG stuff */
-    if (!(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))
+    if (BufferNeedsWAL(relation, buffer))
     {
         XLogRecPtr    recptr;
 
@@ -2058,7 +2080,6 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
     int            ndone;
     PGAlignedBlock scratch;
     Page        page;
-    bool        needwal;
     Size        saveFreeSpace;
     bool        need_tuple_data = RelationIsLogicallyLogged(relation);
     bool        need_cids = RelationIsAccessibleInLogicalDecoding(relation);
@@ -2066,7 +2087,6 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
     /* currently not needed (thus unsupported) for heap_multi_insert() */
     AssertArg(!(options & HEAP_INSERT_NO_LOGICAL));
 
-    needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation);
     saveFreeSpace = RelationGetTargetPageFreeSpace(relation,
                                                    HEAP_DEFAULT_FILLFACTOR);
 
@@ -2108,6 +2128,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
         Buffer        vmbuffer = InvalidBuffer;
         bool        all_visible_cleared = false;
         int            nthispage;
+        bool        needwal;
 
         CHECK_FOR_INTERRUPTS();
 
@@ -2119,6 +2140,7 @@ heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
                                            InvalidBuffer, options, bistate,
                                            &vmbuffer, NULL);
         page = BufferGetPage(buffer);
+        needwal = BufferNeedsWAL(relation, buffer);
 
         /* NO EREPORT(ERROR) from here till changes are logged */
         START_CRIT_SECTION();
@@ -2671,7 +2693,7 @@ l1:
      * NB: heap_abort_speculative() uses the same xlog record and replay
      * routines.
      */
-    if (RelationNeedsWAL(relation))
+    if (BufferNeedsWAL(relation, buffer))
     {
         XLogRecPtr    recptr;
 
@@ -2805,6 +2827,8 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
                 vmbuffer = InvalidBuffer,
                 vmbuffer_new = InvalidBuffer;
     bool        need_toast;
+    bool        oldbuf_needs_wal,
+                newbuf_needs_wal;
     Size        newtupsize,
                 pagefree;
     bool        have_tuple_lock = false;
@@ -3356,7 +3380,7 @@ l2:
 
         MarkBufferDirty(buffer);
 
-        if (RelationNeedsWAL(relation))
+        if (BufferNeedsWAL(relation, buffer))
         {
             xl_heap_lock xlrec;
             XLogRecPtr    recptr;
@@ -3570,26 +3594,55 @@ l2:
         MarkBufferDirty(newbuf);
     MarkBufferDirty(buffer);
 
-    /* XLOG stuff */
-    if (RelationNeedsWAL(relation))
+    /*
+     *  XLOG stuff
+     *
+     * Emit heap-update log. When wal_level = minimal, we may emit insert or
+     * delete record according to wal-optimization.
+     */
+    oldbuf_needs_wal = BufferNeedsWAL(relation, buffer);
+
+    if (newbuf == buffer)
+        newbuf_needs_wal = oldbuf_needs_wal;
+    else
+        newbuf_needs_wal = BufferNeedsWAL(relation, newbuf);
+
+    if (oldbuf_needs_wal || newbuf_needs_wal)
     {
         XLogRecPtr    recptr;
 
         /*
          * For logical decoding we need combocids to properly decode the
-         * catalog.
+         * catalog. Both oldbuf_needs_wal and newbuf_needs_wal must be true
+         * when logical decoding is active.
          */
         if (RelationIsAccessibleInLogicalDecoding(relation))
         {
+            Assert(oldbuf_needs_wal && newbuf_needs_wal);
+
             log_heap_new_cid(relation, &oldtup);
             log_heap_new_cid(relation, heaptup);
         }
 
-        recptr = log_heap_update(relation, buffer,
-                                 newbuf, &oldtup, heaptup,
-                                 old_key_tuple,
-                                 all_visible_cleared,
-                                 all_visible_cleared_new);
+        /*
+         * Insert log record. Using delete or insert log loses HOT chain
+         * information but that happens only when newbuf is different from
+         * buffer, where HOT cannot happen.
+         */
+        if (oldbuf_needs_wal && newbuf_needs_wal)
+            recptr = log_heap_update(relation, buffer, newbuf,
+                                     &oldtup, heaptup,
+                                     old_key_tuple,
+                                     all_visible_cleared,
+                                     all_visible_cleared_new);
+        else if (oldbuf_needs_wal)
+            recptr = log_heap_delete(relation, buffer, &oldtup, old_key_tuple,
+                                     xmax_old_tuple, false,
+                                     all_visible_cleared);
+        else
+            recptr = log_heap_insert(relation, buffer, newtup,
+                                     0, all_visible_cleared_new);
+
         if (newbuf != buffer)
         {
             PageSetLSN(BufferGetPage(newbuf), recptr);
@@ -4467,7 +4520,7 @@ failed:
      * (Also, in a PITR log-shipping or 2PC environment, we have to have XLOG
      * entries for everything anyway.)
      */
-    if (RelationNeedsWAL(relation))
+    if (BufferNeedsWAL(relation, *buffer))
     {
         xl_heap_lock xlrec;
         XLogRecPtr    recptr;
@@ -5219,7 +5272,7 @@ l4:
         MarkBufferDirty(buf);
 
         /* XLOG stuff */
-        if (RelationNeedsWAL(rel))
+        if (BufferNeedsWAL(rel, buf))
         {
             xl_heap_lock_updated xlrec;
             XLogRecPtr    recptr;
@@ -5379,7 +5432,7 @@ heap_finish_speculative(Relation relation, ItemPointer tid)
     htup->t_ctid = *tid;
 
     /* XLOG stuff */
-    if (RelationNeedsWAL(relation))
+    if (BufferNeedsWAL(relation, buffer))
     {
         xl_heap_confirm xlrec;
         XLogRecPtr    recptr;
@@ -5511,7 +5564,7 @@ heap_abort_speculative(Relation relation, ItemPointer tid)
      * The WAL records generated here match heap_delete().  The same recovery
      * routines are used.
      */
-    if (RelationNeedsWAL(relation))
+    if (BufferNeedsWAL(relation, buffer))
     {
         xl_heap_delete xlrec;
         XLogRecPtr    recptr;
@@ -5620,7 +5673,7 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
     MarkBufferDirty(buffer);
 
     /* XLOG stuff */
-    if (RelationNeedsWAL(relation))
+    if (BufferNeedsWAL(relation, buffer))
     {
         xl_heap_inplace xlrec;
         XLogRecPtr    recptr;
@@ -7030,8 +7083,8 @@ log_heap_clean(Relation reln, Buffer buffer,
     xl_heap_clean xlrec;
     XLogRecPtr    recptr;
 
-    /* Caller should not call me on a non-WAL-logged relation */
-    Assert(RelationNeedsWAL(reln));
+    /* Caller should not call me on non-WAL-logged buffers */
+    Assert(BufferNeedsWAL(reln, buffer));
 
     xlrec.latestRemovedXid = latestRemovedXid;
     xlrec.nredirected = nredirected;
@@ -7078,8 +7131,8 @@ log_heap_freeze(Relation reln, Buffer buffer, TransactionId cutoff_xid,
     xl_heap_freeze_page xlrec;
     XLogRecPtr    recptr;
 
-    /* Caller should not call me on a non-WAL-logged relation */
-    Assert(RelationNeedsWAL(reln));
+    /* Caller should not call me on non-WAL-logged buffers */
+    Assert(BufferNeedsWAL(reln, buffer));
     /* nor when there are no tuples to freeze */
     Assert(ntuples > 0);
 
@@ -7305,8 +7358,8 @@ log_heap_update(Relation reln, Buffer oldbuf,
     bool        init;
     int            bufflags;
 
-    /* Caller should not call me on a non-WAL-logged relation */
-    Assert(RelationNeedsWAL(reln));
+    /* Caller should not call me when no buffer needs WAL-logging */
+    Assert(BufferNeedsWAL(reln, newbuf) || BufferNeedsWAL(reln, oldbuf));
 
     XLogBeginInsert();
 
@@ -8910,9 +8963,16 @@ heap2_redo(XLogReaderState *record)
  *    heap_sync        - sync a heap, for use when no WAL has been written
  *
  * This forces the heap contents (including TOAST heap if any) down to disk.
- * If we skipped using WAL, and WAL is otherwise needed, we must force the
- * relation down to disk before it's safe to commit the transaction.  This
- * requires writing out any dirty buffers and then doing a forced fsync.
+ * If we did any changes to the heap bypassing the buffer manager, we must
+ * force the relation down to disk before it's safe to commit the
+ * transaction, because the direct modifications will not be flushed by
+ * the next checkpoint.
+ *
+ * We used to also use this after batch operations like COPY and CLUSTER,
+ * if we skipped using WAL and WAL is otherwise needed, but there were
+ * corner-cases involving other WAL-logged operations to the same
+ * relation, where that was not enough. table_relation_register_sync() should
+ * be used for that purpose instead.
  *
  * Indexes are not touched.  (Currently, index operations associated with
  * the commands that use this are WAL-logged and so do not need fsync.
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 5c96fc91b7..bddf026b81 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -57,6 +57,9 @@ static bool SampleHeapTupleVisible(TableScanDesc scan, Buffer buffer,
                        HeapTuple tuple,
                        OffsetNumber tupoffset);
 
+static void heapam_relation_register_walskip(Relation rel);
+static void heapam_relation_invalidate_walskip(Relation rel);
+
 static const TableAmRoutine heapam_methods;
 
 
@@ -541,14 +544,10 @@ tuple_lock_retry:
 }
 
 static void
-heapam_finish_bulk_insert(Relation relation, int options)
+heapam_finish_bulk_insert(RelFileNode rnode, ForkNumber forkNum)
 {
-    /*
-     * If we skipped writing WAL, then we need to sync the heap (but not
-     * indexes since those use WAL anyway / don't go through tableam)
-     */
-    if (options & HEAP_INSERT_SKIP_WAL)
-        heap_sync(relation);
+    /* Sync the file immedately */
+    smgrimmedsync(smgropen(rnode, InvalidBackendId), forkNum);
 }
 
 
@@ -616,6 +615,12 @@ heapam_relation_copy_data(Relation rel, RelFileNode newrnode)
     dstrel = smgropen(newrnode, rel->rd_backend);
     RelationOpenSmgr(rel);
 
+    /*
+     * Register WAL-skipping for the relation. WAL-logging is skipped and sync
+     * the file at commit if the AM supports the feature.
+     */
+    table_relation_register_walskip(rel);
+
     /*
      * Create and copy all forks of the relation, and schedule unlinking of
      * old physical files.
@@ -626,8 +631,7 @@ heapam_relation_copy_data(Relation rel, RelFileNode newrnode)
     RelationCreateStorage(newrnode, rel->rd_rel->relpersistence);
 
     /* copy main fork */
-    RelationCopyStorage(rel->rd_smgr, dstrel, MAIN_FORKNUM,
-                        rel->rd_rel->relpersistence);
+    RelationCopyStorage(rel, dstrel, MAIN_FORKNUM);
 
     /* copy those extra forks that exist */
     for (ForkNumber forkNum = MAIN_FORKNUM + 1;
@@ -645,8 +649,7 @@ heapam_relation_copy_data(Relation rel, RelFileNode newrnode)
                 (rel->rd_rel->relpersistence == RELPERSISTENCE_UNLOGGED &&
                  forkNum == INIT_FORKNUM))
                 log_smgrcreate(&newrnode, forkNum);
-            RelationCopyStorage(rel->rd_smgr, dstrel, forkNum,
-                                rel->rd_rel->relpersistence);
+            RelationCopyStorage(rel, dstrel, forkNum);
         }
     }
 
@@ -670,7 +673,6 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
     IndexScanDesc indexScan;
     TableScanDesc tableScan;
     HeapScanDesc heapScan;
-    bool        use_wal;
     bool        is_system_catalog;
     Tuplesortstate *tuplesort;
     TupleDesc    oldTupDesc = RelationGetDescr(OldHeap);
@@ -684,15 +686,6 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
     /* Remember if it's a system catalog */
     is_system_catalog = IsSystemRelation(OldHeap);
 
-    /*
-     * We need to log the copied data in WAL iff WAL archiving/streaming is
-     * enabled AND it's a WAL-logged rel.
-     */
-    use_wal = XLogIsNeeded() && RelationNeedsWAL(NewHeap);
-
-    /* use_wal off requires smgr_targblock be initially invalid */
-    Assert(RelationGetTargetBlock(NewHeap) == InvalidBlockNumber);
-
     /* Preallocate values/isnull arrays */
     natts = newTupDesc->natts;
     values = (Datum *) palloc(natts * sizeof(Datum));
@@ -700,7 +693,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
 
     /* Initialize the rewrite operation */
     rwstate = begin_heap_rewrite(OldHeap, NewHeap, OldestXmin, FreezeXid,
-                                 MultiXactCutoff, use_wal);
+                                 MultiXactCutoff);
 
 
     /* Set up sorting if wanted */
@@ -946,6 +939,55 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
     pfree(isnull);
 }
 
+/*
+ *    heapam_relation_register_walskip - register a heap to be WAL-skipped then
+ *                                       synced to disk at commit
+ *
+ * This can be used to skip WAL-logging changes on a relation file. This makes
+ * note of the current size of the relation, and ensures that when the
+ * relation is extended, any changes to the new blocks in the heap, in the
+ * same transaction, will not be WAL-logged. Instead, the heap contents are
+ * flushed to disk at commit.
+ *
+ * This does the same for the TOAST heap, if any. Indexes are not affected.
+ */
+static void
+heapam_relation_register_walskip(Relation rel)
+{
+    /* non-WAL-logged tables never need fsync */
+    if (!RelationNeedsWAL(rel))
+        return;
+
+    RecordWALSkipping(rel);
+    if (OidIsValid(rel->rd_rel->reltoastrelid))
+    {
+        Relation    toastrel;
+
+        toastrel = heap_open(rel->rd_rel->reltoastrelid, AccessShareLock);
+        RecordWALSkipping(toastrel);
+        heap_close(toastrel, AccessShareLock);
+    }
+
+    return;
+}
+
+/*
+ *    heapam_relation_invalidate_walskip    - invalidate registered WAL skipping
+ *
+ *  After some file-replacing operations like CLUSTER, the old file no longe
+ *  needs to be synced to disk. This function invalidates the registered
+ *  WAL-skipping on the current relfilenode of the relation.
+ */
+static void
+heapam_relation_invalidate_walskip(Relation rel)
+{
+    /* non-WAL-logged tables never need fsync */
+    if (!RelationNeedsWAL(rel))
+        return;
+
+    RelationInvalidateWALSkip(rel);
+}
+
 static bool
 heapam_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
                                BufferAccessStrategy bstrategy)
@@ -2423,6 +2465,8 @@ static const TableAmRoutine heapam_methods = {
     .relation_nontransactional_truncate = heapam_relation_nontransactional_truncate,
     .relation_copy_data = heapam_relation_copy_data,
     .relation_copy_for_cluster = heapam_relation_copy_for_cluster,
+    .relation_register_walskip = heapam_relation_register_walskip,
+    .relation_invalidate_walskip = heapam_relation_invalidate_walskip,
     .relation_vacuum = heap_vacuum_rel,
     .scan_analyze_next_block = heapam_scan_analyze_next_block,
     .scan_analyze_next_tuple = heapam_scan_analyze_next_tuple,
diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c
index a3e51922d8..a05659b168 100644
--- a/src/backend/access/heap/pruneheap.c
+++ b/src/backend/access/heap/pruneheap.c
@@ -20,6 +20,7 @@
 #include "access/htup_details.h"
 #include "access/xlog.h"
 #include "catalog/catalog.h"
+#include "catalog/storage.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "storage/bufmgr.h"
@@ -258,7 +259,7 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
         /*
          * Emit a WAL HEAP_CLEAN record showing what we did
          */
-        if (RelationNeedsWAL(relation))
+        if (BufferNeedsWAL(relation, buffer))
         {
             XLogRecPtr    recptr;
 
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index bce4274362..494f7fcd41 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -116,6 +116,7 @@
 #include "access/xloginsert.h"
 
 #include "catalog/catalog.h"
+#include "catalog/storage.h"
 
 #include "lib/ilist.h"
 
@@ -144,7 +145,6 @@ typedef struct RewriteStateData
     Page        rs_buffer;        /* page currently being built */
     BlockNumber rs_blockno;        /* block where page will go */
     bool        rs_buffer_valid;    /* T if any tuples in buffer */
-    bool        rs_use_wal;        /* must we WAL-log inserts? */
     bool        rs_logical_rewrite; /* do we need to do logical rewriting */
     TransactionId rs_oldest_xmin;    /* oldest xmin used by caller to determine
                                      * tuple visibility */
@@ -238,15 +238,13 @@ static void logical_end_heap_rewrite(RewriteState state);
  * oldest_xmin    xid used by the caller to determine which tuples are dead
  * freeze_xid    xid before which tuples will be frozen
  * min_multi    multixact before which multis will be removed
- * use_wal        should the inserts to the new heap be WAL-logged?
  *
  * Returns an opaque RewriteState, allocated in current memory context,
  * to be used in subsequent calls to the other functions.
  */
 RewriteState
 begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xmin,
-                   TransactionId freeze_xid, MultiXactId cutoff_multi,
-                   bool use_wal)
+                   TransactionId freeze_xid, MultiXactId cutoff_multi)
 {
     RewriteState state;
     MemoryContext rw_cxt;
@@ -271,7 +269,6 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm
     /* new_heap needn't be empty, just locked */
     state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
     state->rs_buffer_valid = false;
-    state->rs_use_wal = use_wal;
     state->rs_oldest_xmin = oldest_xmin;
     state->rs_freeze_xid = freeze_xid;
     state->rs_cutoff_multi = cutoff_multi;
@@ -330,7 +327,7 @@ end_heap_rewrite(RewriteState state)
     /* Write the last page, if any */
     if (state->rs_buffer_valid)
     {
-        if (state->rs_use_wal)
+        if (BlockNeedsWAL(state->rs_new_rel, state->rs_blockno))
             log_newpage(&state->rs_new_rel->rd_node,
                         MAIN_FORKNUM,
                         state->rs_blockno,
@@ -344,19 +341,7 @@ end_heap_rewrite(RewriteState state)
                    (char *) state->rs_buffer, true);
     }
 
-    /*
-     * If the rel is WAL-logged, must fsync before commit.  We use heap_sync
-     * to ensure that the toast table gets fsync'd too.
-     *
-     * It's obvious that we must do this when not WAL-logging. It's less
-     * obvious that we have to do it even if we did WAL-log the pages. The
-     * reason is the same as in tablecmds.c's copy_relation_data(): we're
-     * writing data that's not in shared buffers, and so a CHECKPOINT
-     * occurring during the rewriteheap operation won't have fsync'd data we
-     * wrote before the checkpoint.
-     */
-    if (RelationNeedsWAL(state->rs_new_rel))
-        heap_sync(state->rs_new_rel);
+    /* If we skipped using WAL, we will sync the relation at commit */
 
     logical_end_heap_rewrite(state);
 
@@ -654,9 +639,6 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
     {
         int options = HEAP_INSERT_SKIP_FSM;
 
-        if (!state->rs_use_wal)
-            options |= HEAP_INSERT_SKIP_WAL;
-
         /*
          * While rewriting the heap for VACUUM FULL / CLUSTER, make sure data
          * for the TOAST table are not logically decoded.  The main heap is
@@ -695,7 +677,7 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
             /* Doesn't fit, so write out the existing page */
 
             /* XLOG stuff */
-            if (state->rs_use_wal)
+            if (BlockNeedsWAL(state->rs_new_rel, state->rs_blockno))
                 log_newpage(&state->rs_new_rel->rd_node,
                             MAIN_FORKNUM,
                             state->rs_blockno,
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index b5b464e4a9..45139ec70e 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -945,7 +945,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
                  * page has been previously WAL-logged, and if not, do that
                  * now.
                  */
-                if (RelationNeedsWAL(onerel) &&
+                if (BufferNeedsWAL(onerel, buf) &&
                     PageGetLSN(page) == InvalidXLogRecPtr)
                     log_newpage_buffer(buf, true);
 
@@ -1209,7 +1209,7 @@ lazy_scan_heap(Relation onerel, int options, LVRelStats *vacrelstats,
             }
 
             /* Now WAL-log freezing if necessary */
-            if (RelationNeedsWAL(onerel))
+            if (BufferNeedsWAL(onerel, buf))
             {
                 XLogRecPtr    recptr;
 
@@ -1591,7 +1591,7 @@ lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
     MarkBufferDirty(buffer);
 
     /* XLOG stuff */
-    if (RelationNeedsWAL(onerel))
+    if (BufferNeedsWAL(onerel, buffer))
     {
         XLogRecPtr    recptr;
 
diff --git a/src/backend/access/heap/visibilitymap.c b/src/backend/access/heap/visibilitymap.c
index 64dfe06b26..1f5f7d92dd 100644
--- a/src/backend/access/heap/visibilitymap.c
+++ b/src/backend/access/heap/visibilitymap.c
@@ -88,6 +88,7 @@
 #include "access/heapam_xlog.h"
 #include "access/visibilitymap.h"
 #include "access/xlog.h"
+#include "catalog/storage.h"
 #include "miscadmin.h"
 #include "port/pg_bitutils.h"
 #include "storage/bufmgr.h"
@@ -276,7 +277,7 @@ visibilitymap_set(Relation rel, BlockNumber heapBlk, Buffer heapBuf,
         map[mapByte] |= (flags << mapOffset);
         MarkBufferDirty(vmBuf);
 
-        if (RelationNeedsWAL(rel))
+        if (BufferNeedsWAL(rel, heapBuf))
         {
             if (XLogRecPtrIsInvalid(recptr))
             {
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 4f4be1efbf..b5db26fda5 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -612,6 +612,18 @@ rebuild_relation(Relation OldHeap, Oid indexOid, bool verbose)
                                relpersistence,
                                AccessExclusiveLock);
 
+    /*
+     * If wal_level is minimal, we skip WAL-logging even for WAL-logging
+     * relations. The filenode is synced at commit.
+     */
+    if (!XLogIsNeeded())
+    {
+        /* make_new_heap doesn't lock OIDNewHeap */
+        Relation newheap = table_open(OIDNewHeap, AccessShareLock);
+        table_relation_register_walskip(newheap);
+        table_close(newheap, AccessShareLock);
+    }
+
     /* Copy the heap data into the new table in the desired order */
     copy_table_data(OIDNewHeap, tableOid, indexOid, verbose,
                    &swap_toast_by_content, &frozenXid, &cutoffMulti);
@@ -1355,6 +1367,21 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
     /* Zero out possible results from swapped_relation_files */
     memset(mapped_tables, 0, sizeof(mapped_tables));
 
+    /*
+     * Unregister useless pending file-sync. table_relation_unregister_sync
+     * relies on a premise that relation cache has the correct relfilenode and
+     * related members. After swap_relation_files, the relcache entry for the
+     * heaps gets inconsistent with pg_class entry so we should do this before
+     * the call.
+     */
+    if (!XLogIsNeeded())
+    {
+        Relation oldheap = table_open(OIDOldHeap, AccessShareLock);
+
+        table_relation_invalidate_walskip(oldheap);
+        table_close(oldheap, AccessShareLock);
+    }
+
     /*
      * Swap the contents of the heap relations (including any toast tables).
      * Also set old heap's relfrozenxid to frozenXid.
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index c1fd7b78ce..6a85ab890e 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -2437,9 +2437,13 @@ CopyFrom(CopyState cstate)
         (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
          cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId))
     {
-        ti_options |= TABLE_INSERT_SKIP_FSM;
+        /*
+         * We can skip WAL-logging the insertions, unless PITR or streaming
+         * replication is in use. We can skip the FSM in any case.
+         */
         if (!XLogIsNeeded())
-            ti_options |= TABLE_INSERT_SKIP_WAL;
+            table_relation_register_walskip(cstate->rel);
+        ti_options |= TABLE_INSERT_SKIP_FSM;
     }
 
     /*
@@ -3106,7 +3110,12 @@ CopyFrom(CopyState cstate)
 
     FreeExecutorState(estate);
 
-    table_finish_bulk_insert(cstate->rel, ti_options);
+    /*
+     * If we skipped writing WAL, then we will sync the heap at the end of
+     * the transaction. (We used to do it here, but it was later found out
+     * that to be safe, we must also avoid WAL-logging any subsequent
+     * actions on the pages we skipped WAL for). Indexes always use WAL.
+     */
 
     return processed;
 }
diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c
index 43c2fa9124..8b73654413 100644
--- a/src/backend/commands/createas.c
+++ b/src/backend/commands/createas.c
@@ -558,8 +558,9 @@ intorel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
      * We can skip WAL-logging the insertions, unless PITR or streaming
      * replication is in use. We can skip the FSM in any case.
      */
-    myState->ti_options = TABLE_INSERT_SKIP_FSM |
-        (XLogIsNeeded() ? 0 : TABLE_INSERT_SKIP_WAL);
+    if (!XLogIsNeeded())
+        table_relation_register_walskip(intoRelationDesc);
+    myState->ti_options = HEAP_INSERT_SKIP_FSM;
     myState->bistate = GetBulkInsertState();
 
     /* Not using WAL requires smgr_targblock be initially invalid */
@@ -604,7 +605,7 @@ intorel_shutdown(DestReceiver *self)
 
     FreeBulkInsertState(myState->bistate);
 
-    table_finish_bulk_insert(myState->rel, myState->ti_options);
+    /* If we skipped using WAL, we will sync the relation at commit */
 
     /* close rel, but keep lock until commit */
     table_close(myState->rel, NoLock);
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 2aac63296b..33b7bc4c16 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -462,9 +462,10 @@ transientrel_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
      * We can skip WAL-logging the insertions, unless PITR or streaming
      * replication is in use. We can skip the FSM in any case.
      */
-    myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
     if (!XLogIsNeeded())
-        myState->ti_options |= TABLE_INSERT_SKIP_WAL;
+        table_relation_register_walskip(transientrel);
+    myState->ti_options = TABLE_INSERT_SKIP_FSM | TABLE_INSERT_FROZEN;
+
     myState->bistate = GetBulkInsertState();
 
     /* Not using WAL requires smgr_targblock be initially invalid */
@@ -509,7 +510,7 @@ transientrel_shutdown(DestReceiver *self)
 
     FreeBulkInsertState(myState->bistate);
 
-    table_finish_bulk_insert(myState->transientrel, myState->ti_options);
+    /* If we skipped using WAL, we will sync the relation at commit */
 
     /* close transientrel, but keep lock until commit */
     table_close(myState->transientrel, NoLock);
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 8908b77d98..deb147c45a 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -4716,7 +4716,11 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
 
         ti_options = TABLE_INSERT_SKIP_FSM;
         if (!XLogIsNeeded())
-            ti_options |= TABLE_INSERT_SKIP_WAL;
+        {
+            /* Forget old relation's registerd sync */
+            table_relation_invalidate_walskip(oldrel);
+            table_relation_register_walskip(newrel);
+        }
     }
     else
     {
@@ -5000,7 +5004,7 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode)
     {
         FreeBulkInsertState(bistate);
 
-        table_finish_bulk_insert(newrel, ti_options);
+        /* If we skipped writing WAL, then it will be done at commit. */
 
         table_close(newrel, NoLock);
     }
diff --git a/src/include/access/rewriteheap.h b/src/include/access/rewriteheap.h
index 6006249d96..64efecf48b 100644
--- a/src/include/access/rewriteheap.h
+++ b/src/include/access/rewriteheap.h
@@ -23,7 +23,7 @@ typedef struct RewriteStateData *RewriteState;
 
 extern RewriteState begin_heap_rewrite(Relation OldHeap, Relation NewHeap,
                    TransactionId OldestXmin, TransactionId FreezeXid,
-                   MultiXactId MultiXactCutoff, bool use_wal);
+                   MultiXactId MultiXactCutoff);
 extern void end_heap_rewrite(RewriteState state);
 extern void rewrite_heap_tuple(RewriteState state, HeapTuple oldTuple,
                    HeapTuple newTuple);
-- 
2.16.3

From f4a0cc5382805500c3db3d4ec2231cee383841f3 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horiguchi.kyotaro@lab.ntt.co.jp>
Date: Tue, 2 Apr 2019 13:31:33 +0900
Subject: [PATCH 7/7] Remove TABLE/HEAP_INSERT_SKIP_WAL

Remove no-longer-used symbol TABLE/HEAP_INSERT_SKIP_WAL.
---
 src/include/access/heapam.h  |  3 +--
 src/include/access/tableam.h | 11 +++--------
 2 files changed, 4 insertions(+), 10 deletions(-)

diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 4c077755d5..5b084c2f5a 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -29,11 +29,10 @@
 
 
 /* "options" flag bits for heap_insert */
-#define HEAP_INSERT_SKIP_WAL    TABLE_INSERT_SKIP_WAL
 #define HEAP_INSERT_SKIP_FSM    TABLE_INSERT_SKIP_FSM
 #define HEAP_INSERT_FROZEN        TABLE_INSERT_FROZEN
 #define HEAP_INSERT_NO_LOGICAL    TABLE_INSERT_NO_LOGICAL
-#define HEAP_INSERT_SPECULATIVE 0x0010
+#define HEAP_INSERT_SPECULATIVE 0x0008
 
 typedef struct BulkInsertStateData *BulkInsertState;
 
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 1a3a3c6711..b5203dd485 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -100,10 +100,9 @@ typedef struct TM_FailureData
 } TM_FailureData;
 
 /* "options" flag bits for table_insert */
-#define TABLE_INSERT_SKIP_WAL        0x0001
-#define TABLE_INSERT_SKIP_FSM        0x0002
-#define TABLE_INSERT_FROZEN            0x0004
-#define TABLE_INSERT_NO_LOGICAL        0x0008
+#define TABLE_INSERT_SKIP_FSM        0x0001
+#define TABLE_INSERT_FROZEN            0x0002
+#define TABLE_INSERT_NO_LOGICAL        0x0004
 
 /* flag bits fortable_lock_tuple */
 /* Follow tuples whose update is in progress if lock modes don't conflict  */
@@ -1017,10 +1016,6 @@ table_compute_xid_horizon_for_tuples(Relation rel,
  * behaviour of the AM. Several options might be ignored by AMs not supporting
  * them.
  *
- * If the TABLE_INSERT_SKIP_WAL option is specified, the new tuple will not
- * necessarily logged to WAL, even for a non-temp relation. It is the AMs
- * choice whether this optimization is supported.
- *
  * If the TABLE_INSERT_SKIP_FSM option is specified, AMs are free to not reuse
  * free space in the relation. This can save some cycles when we know the
  * relation is new and doesn't contain useful amounts of free space.  It's
-- 
2.16.3


В списке pgsql-hackers по дате отправления:

Предыдущее
От: Thomas Munro
Дата:
Сообщение: Re: Refactoring the checkpointer's fsync request queue
Следующее
От: David Steele
Дата:
Сообщение: Re: GCoS2019--pgBackRest port to Windows (2019)