Re: [BUGS] BUG #6748: sequence value may be conflict in some cases

Поиск
Список
Период
Сортировка
От Tom Lane
Тема Re: [BUGS] BUG #6748: sequence value may be conflict in some cases
Дата
Msg-id 9025.1343179266@sss.pgh.pa.us
обсуждение исходный текст
Ответ на Re: [BUGS] BUG #6748: sequence value may be conflict in some cases  (Tom Lane <tgl@sss.pgh.pa.us>)
Список pgsql-hackers
I wrote:
> The tuple in buffers has log_cnt = 1, is_called = false, but the initial
> XLOG_SEQ_LOG record shows log_cnt = 0, is_called = true.  So if we crash
> at this point, after recovery it looks like one nextval() has already
> been done.  However, AlterSequence generates another XLOG_SEQ_LOG record
> based on what's in shared buffers, so after replay of that, we're back
> to the "original" state where it does not appear that any nextval() has
> been done.

> I'm of the opinion that this kluge needs to be removed; it's just insane
> that we're not logging the same state we leave in our buffers.

Attached is a WIP patch for that.  I had to leave nextval() logging
a different state from what it puts into buffers, since without that
we can't have the optimization of one-WAL-record-per-N-nextval-calls,
which seems worth preserving.  But I was able to get rid of the other
routines' doing it.  I think it's all right for nextval to do it because
the only difference between what it logs and what it leaves in the
buffer is that last_value is advanced and log_cnt is zero; in particular
there is not a difference in is_called state.

For awhile I was convinced that nextval() was broken in another way,
because it does MarkBufferDirty() before changing the buffer contents,
which sure *looks* wrong; what if a checkpoint writes the
not-yet-modified buffer contents and then clears the dirty bit?
However, I now think this is safe because we hold the buffer content
lock exclusively until we've finished making the changes.  A checkpoint
might observe the dirty bit set and attempt to write the page, but it
will block trying to get shared buffer content lock until we complete
the changes.  So that should be all right, and I've added some comments
to explain it, but if anyone thinks it's wrong speak up!

Another thing that I think is a real bug is that AlterSequence needs
to reset log_cnt to zero any time it changes any of the sequence
generation parameters, to ensure that the new values will be applied
promptly.  I've done that in the attached.

I also modified the code so that log_cnt is initialized to zero not one
when a sequence is created or reset to is_called = false.  This doesn't
make any functional difference given the change to force logging a WAL
record when is_called becomes set, but I think it's a good change
anyway: it doesn't make any sense anymore to imply that one sequence
value has been pre-reserved.  However, if anyone's particularly annoyed
by the first diff hunk in the regression tests, we could undo that.

Lastly, I modified read_seq_tuple (nee read_info) to return a HeapTuple
pointer to the sequence's tuple, and made use of that to make the rest
of the code simpler and less dependent on page-layout assumptions.
At this point this is only a code-beautification change; it was
functionally necessary in an earlier version of the patch, but I didn't
back it out when I got rid of the change that made it necessary.

Oh, there are also some hunks in here to add debug logging, which I was
using for testing.  I'm undecided whether to remove those or clean them
up some more and wrap them in something like #ifdef SEQUENCE_DEBUG.

One other point about the regression test diffs: formerly, the typical
state seen at line 191 of sequence.out was log_cnt = 32.  Now it is 31,
because the first nextval call increases log_cnt to 32 and then the
second decreases it to 31.  The failure mode described in
http://archives.postgresql.org/pgsql-hackers/2008-08/msg01359.php
is no longer possible, because a checkpoint before the first nextval
call won't change its behavior.  But instead, if a checkpoint happens
*between* those two nextvals, the second nextval will decide to create
a WAL entry and it will increase log_cnt to 32.  So we still need the
variant expected file sequence_1.out, but now it shows log_cnt = 32
instead of the other way around.

Comments anyone?

            regards, tom lane

diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 34b74f6c3844a3a172cd660357f8ccbc48f57f3c..69054b546cec1bb71ff79b18702a0e1edb3a45fb 100644
*** a/src/backend/commands/sequence.c
--- b/src/backend/commands/sequence.c
***************
*** 38,44 ****
  /*
   * We don't want to log each fetching of a value from a sequence,
   * so we pre-log a few fetches in advance. In the event of
!  * crash we can lose as much as we pre-logged.
   */
  #define SEQ_LOG_VALS    32

--- 38,44 ----
  /*
   * We don't want to log each fetching of a value from a sequence,
   * so we pre-log a few fetches in advance. In the event of
!  * crash we can lose (skip over) as many values as we pre-logged.
   */
  #define SEQ_LOG_VALS    32

*************** typedef struct SeqTableData
*** 73,79 ****
      int64        cached;            /* last value already cached for nextval */
      /* if last != cached, we have not used up all the cached values */
      int64        increment;        /* copy of sequence's increment field */
!     /* note that increment is zero until we first do read_info() */
  } SeqTableData;

  typedef SeqTableData *SeqTable;
--- 73,79 ----
      int64        cached;            /* last value already cached for nextval */
      /* if last != cached, we have not used up all the cached values */
      int64        increment;        /* copy of sequence's increment field */
!     /* note that increment is zero until we first do read_seq_tuple() */
  } SeqTableData;

  typedef SeqTableData *SeqTable;
*************** static void fill_seq_with_data(Relation
*** 90,96 ****
  static int64 nextval_internal(Oid relid);
  static Relation open_share_lock(SeqTable seq);
  static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
! static Form_pg_sequence read_info(SeqTable elm, Relation rel, Buffer *buf);
  static void init_params(List *options, bool isInit,
              Form_pg_sequence new, List **owned_by);
  static void do_setval(Oid relid, int64 next, bool iscalled);
--- 90,97 ----
  static int64 nextval_internal(Oid relid);
  static Relation open_share_lock(SeqTable seq);
  static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel);
! static Form_pg_sequence read_seq_tuple(SeqTable elm, Relation rel,
!                Buffer *buf, HeapTuple seqtuple);
  static void init_params(List *options, bool isInit,
              Form_pg_sequence new, List **owned_by);
  static void do_setval(Oid relid, int64 next, bool iscalled);
*************** DefineSequence(CreateSeqStmt *seq)
*** 187,193 ****
              case SEQ_COL_LOG:
                  coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
                  coldef->colname = "log_cnt";
!                 value[i - 1] = Int64GetDatum((int64) 1);
                  break;
              case SEQ_COL_CYCLE:
                  coldef->typeName = makeTypeNameFromOid(BOOLOID, -1);
--- 188,194 ----
              case SEQ_COL_LOG:
                  coldef->typeName = makeTypeNameFromOid(INT8OID, -1);
                  coldef->colname = "log_cnt";
!                 value[i - 1] = Int64GetDatum((int64) 0);
                  break;
              case SEQ_COL_CYCLE:
                  coldef->typeName = makeTypeNameFromOid(BOOLOID, -1);
*************** ResetSequence(Oid seq_relid)
*** 247,256 ****
      SeqTable    elm;
      Form_pg_sequence seq;
      Buffer        buf;
!     Page        page;
      HeapTuple    tuple;
-     HeapTupleData tupledata;
-     ItemId        lp;

      /*
       * Read the old sequence.  This does a bit more work than really
--- 248,255 ----
      SeqTable    elm;
      Form_pg_sequence seq;
      Buffer        buf;
!     HeapTupleData seqtuple;
      HeapTuple    tuple;

      /*
       * Read the old sequence.  This does a bit more work than really
*************** ResetSequence(Oid seq_relid)
*** 258,275 ****
       * indeed a sequence.
       */
      init_sequence(seq_relid, &elm, &seq_rel);
!     read_info(elm, seq_rel, &buf);

      /*
       * Copy the existing sequence tuple.
       */
!     page = BufferGetPage(buf);
!     lp = PageGetItemId(page, FirstOffsetNumber);
!     Assert(ItemIdIsNormal(lp));
!
!     tupledata.t_data = (HeapTupleHeader) PageGetItem(page, lp);
!     tupledata.t_len = ItemIdGetLength(lp);
!     tuple = heap_copytuple(&tupledata);

      /* Now we're done with the old page */
      UnlockReleaseBuffer(buf);
--- 257,268 ----
       * indeed a sequence.
       */
      init_sequence(seq_relid, &elm, &seq_rel);
!     (void) read_seq_tuple(elm, seq_rel, &buf, &seqtuple);

      /*
       * Copy the existing sequence tuple.
       */
!     tuple = heap_copytuple(&seqtuple);

      /* Now we're done with the old page */
      UnlockReleaseBuffer(buf);
*************** ResetSequence(Oid seq_relid)
*** 281,287 ****
      seq = (Form_pg_sequence) GETSTRUCT(tuple);
      seq->last_value = seq->start_value;
      seq->is_called = false;
!     seq->log_cnt = 1;

      /*
       * Create a new storage file for the sequence.    We want to keep the
--- 274,280 ----
      seq = (Form_pg_sequence) GETSTRUCT(tuple);
      seq->last_value = seq->start_value;
      seq->is_called = false;
!     seq->log_cnt = 0;

      /*
       * Create a new storage file for the sequence.    We want to keep the
*************** fill_seq_with_data(Relation rel, HeapTup
*** 378,389 ****
          xl_seq_rec    xlrec;
          XLogRecPtr    recptr;
          XLogRecData rdata[2];
-         Form_pg_sequence newseq = (Form_pg_sequence) GETSTRUCT(tuple);
-
-         /* We do not log first nextval call, so "advance" sequence here */
-         /* Note we are scribbling on local tuple, not the disk buffer */
-         newseq->is_called = true;
-         newseq->log_cnt = 0;

          xlrec.node = rel->rd_node;
          rdata[0].data = (char *) &xlrec;
--- 371,376 ----
*************** fill_seq_with_data(Relation rel, HeapTup
*** 398,403 ****
--- 385,398 ----

          recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);

+         {
+             Form_pg_sequence seq = (Form_pg_sequence) GETSTRUCT(tuple);
+
+             elog(LOG, "DefineSequence WAL entry: name %s, last_value %lld, log_cnt %lld, is_called %d",
+                  NameStr(seq->sequence_name),
+                  seq->last_value, seq->log_cnt, seq->is_called);
+         }
+
          PageSetLSN(page, recptr);
          PageSetTLI(page, ThisTimeLineID);
      }
*************** AlterSequence(AlterSeqStmt *stmt)
*** 419,425 ****
      SeqTable    elm;
      Relation    seqrel;
      Buffer        buf;
!     Page        page;
      Form_pg_sequence seq;
      FormData_pg_sequence new;
      List       *owned_by;
--- 414,420 ----
      SeqTable    elm;
      Relation    seqrel;
      Buffer        buf;
!     HeapTupleData seqtuple;
      Form_pg_sequence seq;
      FormData_pg_sequence new;
      List       *owned_by;
*************** AlterSequence(AlterSeqStmt *stmt)
*** 442,449 ****
                         stmt->sequence->relname);

      /* lock page' buffer and read tuple into new sequence structure */
!     seq = read_info(elm, seqrel, &buf);
!     page = BufferGetPage(buf);

      /* Copy old values of options into workspace */
      memcpy(&new, seq, sizeof(FormData_pg_sequence));
--- 437,443 ----
                         stmt->sequence->relname);

      /* lock page' buffer and read tuple into new sequence structure */
!     seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);

      /* Copy old values of options into workspace */
      memcpy(&new, seq, sizeof(FormData_pg_sequence));
*************** AlterSequence(AlterSeqStmt *stmt)
*** 456,465 ****
      elm->cached = elm->last;

      /* Now okay to update the on-disk tuple */
-     memcpy(seq, &new, sizeof(FormData_pg_sequence));
-
      START_CRIT_SECTION();

      MarkBufferDirty(buf);

      /* XLOG stuff */
--- 450,459 ----
      elm->cached = elm->last;

      /* Now okay to update the on-disk tuple */
      START_CRIT_SECTION();

+     memcpy(seq, &new, sizeof(FormData_pg_sequence));
+
      MarkBufferDirty(buf);

      /* XLOG stuff */
*************** AlterSequence(AlterSeqStmt *stmt)
*** 468,473 ****
--- 462,468 ----
          xl_seq_rec    xlrec;
          XLogRecPtr    recptr;
          XLogRecData rdata[2];
+         Page        page = BufferGetPage(buf);

          xlrec.node = seqrel->rd_node;
          rdata[0].data = (char *) &xlrec;
*************** AlterSequence(AlterSeqStmt *stmt)
*** 475,488 ****
          rdata[0].buffer = InvalidBuffer;
          rdata[0].next = &(rdata[1]);

!         rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
!         rdata[1].len = ((PageHeader) page)->pd_special -
!             ((PageHeader) page)->pd_upper;
          rdata[1].buffer = InvalidBuffer;
          rdata[1].next = NULL;

          recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);

          PageSetLSN(page, recptr);
          PageSetTLI(page, ThisTimeLineID);
      }
--- 470,486 ----
          rdata[0].buffer = InvalidBuffer;
          rdata[0].next = &(rdata[1]);

!         rdata[1].data = (char *) seqtuple.t_data;
!         rdata[1].len = seqtuple.t_len;
          rdata[1].buffer = InvalidBuffer;
          rdata[1].next = NULL;

          recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);

+         elog(LOG, "AlterSequence WAL entry: name %s, last_value %lld, log_cnt %lld, is_called %d",
+              NameStr(seq->sequence_name),
+              seq->last_value, seq->log_cnt, seq->is_called);
+
          PageSetLSN(page, recptr);
          PageSetTLI(page, ThisTimeLineID);
      }
*************** nextval_internal(Oid relid)
*** 541,546 ****
--- 539,545 ----
      Relation    seqrel;
      Buffer        buf;
      Page        page;
+     HeapTupleData seqtuple;
      Form_pg_sequence seq;
      int64        incby,
                  maxv,
*************** nextval_internal(Oid relid)
*** 579,585 ****
      }

      /* lock page' buffer and read tuple */
!     seq = read_info(elm, seqrel, &buf);
      page = BufferGetPage(buf);

      last = next = result = seq->last_value;
--- 578,584 ----
      }

      /* lock page' buffer and read tuple */
!     seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
      page = BufferGetPage(buf);

      last = next = result = seq->last_value;
*************** nextval_internal(Oid relid)
*** 591,599 ****

      if (!seq->is_called)
      {
!         rescnt++;                /* last_value if not called */
          fetch--;
-         log--;
      }

      /*
--- 590,597 ----

      if (!seq->is_called)
      {
!         rescnt++;                /* return last_value if not is_called */
          fetch--;
      }

      /*
*************** nextval_internal(Oid relid)
*** 606,612 ****
       * checkpoint would fail to advance the sequence past the logged values.
       * In this case we may as well fetch extra values.
       */
!     if (log < fetch)
      {
          /* forced log to satisfy local demand for values */
          fetch = log = fetch + SEQ_LOG_VALS;
--- 604,610 ----
       * checkpoint would fail to advance the sequence past the logged values.
       * In this case we may as well fetch extra values.
       */
!     if (log < fetch || !seq->is_called)
      {
          /* forced log to satisfy local demand for values */
          fetch = log = fetch + SEQ_LOG_VALS;
*************** nextval_internal(Oid relid)
*** 624,629 ****
--- 622,630 ----
          }
      }

+     elog(LOG, "nextval: fetch = %lld, log = %lld, logit = %d",
+          fetch, log, logit);
+
      while (fetch)                /* try to fetch cache [+ log ] numbers */
      {
          /*
*************** nextval_internal(Oid relid)
*** 697,704 ****
--- 698,715 ----

      last_used_seq = elm;

+     /* ready to change the on-disk (or really, in-buffer) tuple */
      START_CRIT_SECTION();

+     /*
+      * We must mark the buffer dirty before doing XLogInsert(); see notes in
+      * SyncOneBuffer().  However, we don't apply the desired changes just yet.
+      * This looks like a violation of the buffer update protocol, but it is
+      * in fact safe because we hold exclusive lock on the buffer.  Any other
+      * process, including a checkpoint, that tries to examine the buffer
+      * contents will block until we release the lock, and then will see the
+      * final state that we install below.
+      */
      MarkBufferDirty(buf);

      /* XLOG stuff */
*************** nextval_internal(Oid relid)
*** 708,743 ****
          XLogRecPtr    recptr;
          XLogRecData rdata[2];

!         xlrec.node = seqrel->rd_node;
!         rdata[0].data = (char *) &xlrec;
!         rdata[0].len = sizeof(xl_seq_rec);
!         rdata[0].buffer = InvalidBuffer;
!         rdata[0].next = &(rdata[1]);

          /* set values that will be saved in xlog */
          seq->last_value = next;
          seq->is_called = true;
          seq->log_cnt = 0;

!         rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
!         rdata[1].len = ((PageHeader) page)->pd_special -
!             ((PageHeader) page)->pd_upper;
          rdata[1].buffer = InvalidBuffer;
          rdata[1].next = NULL;

          recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);

          PageSetLSN(page, recptr);
          PageSetTLI(page, ThisTimeLineID);
      }

!     /* update on-disk data */
      seq->last_value = last;        /* last fetched number */
      seq->is_called = true;
      seq->log_cnt = log;            /* how much is logged */

      END_CRIT_SECTION();

      UnlockReleaseBuffer(buf);

      relation_close(seqrel, NoLock);
--- 719,768 ----
          XLogRecPtr    recptr;
          XLogRecData rdata[2];

!         /*
!          * We don't log the current state of the tuple, but rather the state
!          * as it would appear after "log" more fetches.  This lets us skip
!          * that many future WAL records, at the cost that we lose those
!          * sequence values if we crash.
!          */

          /* set values that will be saved in xlog */
          seq->last_value = next;
          seq->is_called = true;
          seq->log_cnt = 0;

!         xlrec.node = seqrel->rd_node;
!         rdata[0].data = (char *) &xlrec;
!         rdata[0].len = sizeof(xl_seq_rec);
!         rdata[0].buffer = InvalidBuffer;
!         rdata[0].next = &(rdata[1]);
!
!         rdata[1].data = (char *) seqtuple.t_data;
!         rdata[1].len = seqtuple.t_len;
          rdata[1].buffer = InvalidBuffer;
          rdata[1].next = NULL;

          recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG, rdata);

+         elog(LOG, "nextval WAL entry: name %s, last_value %lld, log_cnt %lld, is_called %d",
+              NameStr(seq->sequence_name),
+              seq->last_value, seq->log_cnt, seq->is_called);
+
          PageSetLSN(page, recptr);
          PageSetTLI(page, ThisTimeLineID);
      }

!     /* Now update sequence tuple to the intended final state */
      seq->last_value = last;        /* last fetched number */
      seq->is_called = true;
      seq->log_cnt = log;            /* how much is logged */

      END_CRIT_SECTION();

+     elog(LOG, "nextval result: name %s, last_value %lld, log_cnt %lld, is_called %d",
+          NameStr(seq->sequence_name),
+          seq->last_value, seq->log_cnt, seq->is_called);
+
      UnlockReleaseBuffer(buf);

      relation_close(seqrel, NoLock);
*************** do_setval(Oid relid, int64 next, bool is
*** 830,835 ****
--- 855,861 ----
      SeqTable    elm;
      Relation    seqrel;
      Buffer        buf;
+     HeapTupleData seqtuple;
      Form_pg_sequence seq;

      /* open and AccessShareLock sequence */
*************** do_setval(Oid relid, int64 next, bool is
*** 846,852 ****
          PreventCommandIfReadOnly("setval()");

      /* lock page' buffer and read tuple */
!     seq = read_info(elm, seqrel, &buf);

      if ((next < seq->min_value) || (next > seq->max_value))
      {
--- 872,878 ----
          PreventCommandIfReadOnly("setval()");

      /* lock page' buffer and read tuple */
!     seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);

      if ((next < seq->min_value) || (next > seq->max_value))
      {
*************** do_setval(Oid relid, int64 next, bool is
*** 874,881 ****
--- 900,912 ----
      /* In any case, forget any future cached numbers */
      elm->cached = elm->last;

+     /* ready to change the on-disk (or really, in-buffer) tuple */
      START_CRIT_SECTION();

+     seq->last_value = next;        /* last fetched number */
+     seq->is_called = iscalled;
+     seq->log_cnt = 0;
+
      MarkBufferDirty(buf);

      /* XLOG stuff */
*************** do_setval(Oid relid, int64 next, bool is
*** 892,905 ****
          rdata[0].buffer = InvalidBuffer;
          rdata[0].next = &(rdata[1]);

!         /* set values that will be saved in xlog */
!         seq->last_value = next;
!         seq->is_called = true;
!         seq->log_cnt = 0;
!
!         rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper;
!         rdata[1].len = ((PageHeader) page)->pd_special -
!             ((PageHeader) page)->pd_upper;
          rdata[1].buffer = InvalidBuffer;
          rdata[1].next = NULL;

--- 923,930 ----
          rdata[0].buffer = InvalidBuffer;
          rdata[0].next = &(rdata[1]);

!         rdata[1].data = (char *) seqtuple.t_data;
!         rdata[1].len = seqtuple.t_len;
          rdata[1].buffer = InvalidBuffer;
          rdata[1].next = NULL;

*************** do_setval(Oid relid, int64 next, bool is
*** 909,921 ****
          PageSetTLI(page, ThisTimeLineID);
      }

-     /* save info in sequence relation */
-     seq->last_value = next;        /* last fetched number */
-     seq->is_called = iscalled;
-     seq->log_cnt = (iscalled) ? 0 : 1;
-
      END_CRIT_SECTION();

      UnlockReleaseBuffer(buf);

      relation_close(seqrel, NoLock);
--- 934,945 ----
          PageSetTLI(page, ThisTimeLineID);
      }

      END_CRIT_SECTION();

+     elog(LOG, "setval result: name %s, last_value %lld, log_cnt %lld, is_called %d",
+          NameStr(seq->sequence_name),
+          seq->last_value, seq->log_cnt, seq->is_called);
+
      UnlockReleaseBuffer(buf);

      relation_close(seqrel, NoLock);
*************** init_sequence(Oid relid, SeqTable *p_elm
*** 1066,1078 ****
  }


! /* Given an opened relation, lock the page buffer and find the tuple */
  static Form_pg_sequence
! read_info(SeqTable elm, Relation rel, Buffer *buf)
  {
      Page        page;
      ItemId        lp;
-     HeapTupleData tuple;
      sequence_magic *sm;
      Form_pg_sequence seq;

--- 1090,1109 ----
  }


! /*
!  * Given an opened sequence relation, lock the page buffer and find the tuple
!  *
!  * *buf receives the reference to the pinned-and-ex-locked buffer
!  * *seqtuple receives the reference to the sequence tuple proper
!  *        (this arg should point to a local variable of type HeapTupleData)
!  *
!  * Function's return value points to the data payload of the tuple
!  */
  static Form_pg_sequence
! read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple)
  {
      Page        page;
      ItemId        lp;
      sequence_magic *sm;
      Form_pg_sequence seq;

*************** read_info(SeqTable elm, Relation rel, Bu
*** 1088,1094 ****

      lp = PageGetItemId(page, FirstOffsetNumber);
      Assert(ItemIdIsNormal(lp));
!     tuple.t_data = (HeapTupleHeader) PageGetItem(page, lp);

      /*
       * Previous releases of Postgres neglected to prevent SELECT FOR UPDATE on
--- 1119,1128 ----

      lp = PageGetItemId(page, FirstOffsetNumber);
      Assert(ItemIdIsNormal(lp));
!
!     /* Note we currently only bother to set these two fields of *seqtuple */
!     seqtuple->t_data = (HeapTupleHeader) PageGetItem(page, lp);
!     seqtuple->t_len = ItemIdGetLength(lp);

      /*
       * Previous releases of Postgres neglected to prevent SELECT FOR UPDATE on
*************** read_info(SeqTable elm, Relation rel, Bu
*** 1098,1112 ****
       * bit update, ie, don't bother to WAL-log it, since we can certainly do
       * this again if the update gets lost.
       */
!     if (HeapTupleHeaderGetXmax(tuple.t_data) != InvalidTransactionId)
      {
!         HeapTupleHeaderSetXmax(tuple.t_data, InvalidTransactionId);
!         tuple.t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
!         tuple.t_data->t_infomask |= HEAP_XMAX_INVALID;
          SetBufferCommitInfoNeedsSave(*buf);
      }

!     seq = (Form_pg_sequence) GETSTRUCT(&tuple);

      /* this is a handy place to update our copy of the increment */
      elm->increment = seq->increment_by;
--- 1132,1146 ----
       * bit update, ie, don't bother to WAL-log it, since we can certainly do
       * this again if the update gets lost.
       */
!     if (HeapTupleHeaderGetXmax(seqtuple->t_data) != InvalidTransactionId)
      {
!         HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId);
!         seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED;
!         seqtuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
          SetBufferCommitInfoNeedsSave(*buf);
      }

!     seq = (Form_pg_sequence) GETSTRUCT(seqtuple);

      /* this is a handy place to update our copy of the increment */
      elm->increment = seq->increment_by;
*************** init_params(List *options, bool isInit,
*** 1210,1215 ****
--- 1244,1256 ----
                   defel->defname);
      }

+     /*
+      * We must reset log_cnt when isInit or when changing any parameters
+      * that would affect future nextval allocations.
+      */
+     if (isInit)
+         new->log_cnt = 0;
+
      /* INCREMENT BY */
      if (increment_by != NULL)
      {
*************** init_params(List *options, bool isInit,
*** 1218,1223 ****
--- 1259,1265 ----
              ereport(ERROR,
                      (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                       errmsg("INCREMENT must not be zero")));
+         new->log_cnt = 0;
      }
      else if (isInit)
          new->increment_by = 1;
*************** init_params(List *options, bool isInit,
*** 1227,1256 ****
--- 1269,1307 ----
      {
          new->is_cycled = intVal(is_cycled->arg);
          Assert(BoolIsValid(new->is_cycled));
+         new->log_cnt = 0;
      }
      else if (isInit)
          new->is_cycled = false;

      /* MAXVALUE (null arg means NO MAXVALUE) */
      if (max_value != NULL && max_value->arg)
+     {
          new->max_value = defGetInt64(max_value);
+         new->log_cnt = 0;
+     }
      else if (isInit || max_value != NULL)
      {
          if (new->increment_by > 0)
              new->max_value = SEQ_MAXVALUE;        /* ascending seq */
          else
              new->max_value = -1;    /* descending seq */
+         new->log_cnt = 0;
      }

      /* MINVALUE (null arg means NO MINVALUE) */
      if (min_value != NULL && min_value->arg)
+     {
          new->min_value = defGetInt64(min_value);
+         new->log_cnt = 0;
+     }
      else if (isInit || min_value != NULL)
      {
          if (new->increment_by > 0)
              new->min_value = 1; /* ascending seq */
          else
              new->min_value = SEQ_MINVALUE;        /* descending seq */
+         new->log_cnt = 0;
      }

      /* crosscheck min/max */
*************** init_params(List *options, bool isInit,
*** 1312,1324 ****
          else
              new->last_value = new->start_value;
          new->is_called = false;
!         new->log_cnt = 1;
      }
      else if (isInit)
      {
          new->last_value = new->start_value;
          new->is_called = false;
-         new->log_cnt = 1;
      }

      /* crosscheck RESTART (or current value, if changing MIN/MAX) */
--- 1363,1374 ----
          else
              new->last_value = new->start_value;
          new->is_called = false;
!         new->log_cnt = 0;
      }
      else if (isInit)
      {
          new->last_value = new->start_value;
          new->is_called = false;
      }

      /* crosscheck RESTART (or current value, if changing MIN/MAX) */
*************** init_params(List *options, bool isInit,
*** 1361,1366 ****
--- 1411,1417 ----
                       errmsg("CACHE (%s) must be greater than zero",
                              buf)));
          }
+         new->log_cnt = 0;
      }
      else if (isInit)
          new->cache_value = 1;
*************** pg_sequence_parameters(PG_FUNCTION_ARGS)
*** 1473,1478 ****
--- 1524,1530 ----
      SeqTable    elm;
      Relation    seqrel;
      Buffer        buf;
+     HeapTupleData seqtuple;
      Form_pg_sequence seq;

      /* open and AccessShareLock sequence */
*************** pg_sequence_parameters(PG_FUNCTION_ARGS)
*** 1500,1506 ****

      memset(isnull, 0, sizeof(isnull));

!     seq = read_info(elm, seqrel, &buf);

      values[0] = Int64GetDatum(seq->start_value);
      values[1] = Int64GetDatum(seq->min_value);
--- 1552,1558 ----

      memset(isnull, 0, sizeof(isnull));

!     seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);

      values[0] = Int64GetDatum(seq->start_value);
      values[1] = Int64GetDatum(seq->min_value);
*************** seq_redo(XLogRecPtr lsn, XLogRecord *rec
*** 1555,1561 ****

      item = (char *) xlrec + sizeof(xl_seq_rec);
      itemsz = record->xl_len - sizeof(xl_seq_rec);
!     itemsz = MAXALIGN(itemsz);
      if (PageAddItem(localpage, (Item) item, itemsz,
                      FirstOffsetNumber, false, false) == InvalidOffsetNumber)
          elog(PANIC, "seq_redo: failed to add item to page");
--- 1607,1625 ----

      item = (char *) xlrec + sizeof(xl_seq_rec);
      itemsz = record->xl_len - sizeof(xl_seq_rec);
!
!     {
!         HeapTupleData tseq;
!         FormData_pg_sequence seq;
!
!         tseq.t_data = (HeapTupleHeader) item;
!         memcpy(&seq, GETSTRUCT(&tseq), sizeof(seq));
!         elog(LOG, "replay seq log len %d: name %s, last_value %lld, log_cnt %lld, is_called %d",
!              (int) itemsz,
!              NameStr(seq.sequence_name),
!              seq.last_value, seq.log_cnt, seq.is_called);
!     }
!
      if (PageAddItem(localpage, (Item) item, itemsz,
                      FirstOffsetNumber, false, false) == InvalidOffsetNumber)
          elog(PANIC, "seq_redo: failed to add item to page");
diff --git a/src/test/regress/expected/sequence.out b/src/test/regress/expected/sequence.out
index 578778c312972b3eb81fce157bdccdb50311093a..87feb08b14afa302928bf17f7409bde3a8441ea2 100644
*** a/src/test/regress/expected/sequence.out
--- b/src/test/regress/expected/sequence.out
*************** ALTER TABLE foo_seq RENAME TO foo_seq_ne
*** 170,176 ****
  SELECT * FROM foo_seq_new;
   sequence_name | last_value | start_value | increment_by |      max_value      | min_value | cache_value | log_cnt |
is_cycled| is_called  

---------------+------------+-------------+--------------+---------------------+-----------+-------------+---------+-----------+-----------
!  foo_seq       |          1 |           1 |            1 | 9223372036854775807 |         1 |           1 |       1 |
f        | f 
  (1 row)

  SELECT nextval('foo_seq_new');
--- 170,176 ----
  SELECT * FROM foo_seq_new;
   sequence_name | last_value | start_value | increment_by |      max_value      | min_value | cache_value | log_cnt |
is_cycled| is_called  

---------------+------------+-------------+--------------+---------------------+-----------+-------------+---------+-----------+-----------
!  foo_seq       |          1 |           1 |            1 | 9223372036854775807 |         1 |           1 |       0 |
f        | f 
  (1 row)

  SELECT nextval('foo_seq_new');
*************** SELECT nextval('foo_seq_new');
*** 188,194 ****
  SELECT * FROM foo_seq_new;
   sequence_name | last_value | start_value | increment_by |      max_value      | min_value | cache_value | log_cnt |
is_cycled| is_called  

---------------+------------+-------------+--------------+---------------------+-----------+-------------+---------+-----------+-----------
!  foo_seq       |          2 |           1 |            1 | 9223372036854775807 |         1 |           1 |      32 |
f        | t 
  (1 row)

  DROP SEQUENCE foo_seq_new;
--- 188,194 ----
  SELECT * FROM foo_seq_new;
   sequence_name | last_value | start_value | increment_by |      max_value      | min_value | cache_value | log_cnt |
is_cycled| is_called  

---------------+------------+-------------+--------------+---------------------+-----------+-------------+---------+-----------+-----------
!  foo_seq       |          2 |           1 |            1 | 9223372036854775807 |         1 |           1 |      31 |
f        | t 
  (1 row)

  DROP SEQUENCE foo_seq_new;
diff --git a/src/test/regress/expected/sequence_1.out b/src/test/regress/expected/sequence_1.out
index 22399ab68dc5e0b9da4a61ce3ec0af98dfcb58b5..124967ede112027da53e4a91b69955039d7fd37a 100644
*** a/src/test/regress/expected/sequence_1.out
--- b/src/test/regress/expected/sequence_1.out
*************** ALTER TABLE foo_seq RENAME TO foo_seq_ne
*** 170,176 ****
  SELECT * FROM foo_seq_new;
   sequence_name | last_value | start_value | increment_by |      max_value      | min_value | cache_value | log_cnt |
is_cycled| is_called  

---------------+------------+-------------+--------------+---------------------+-----------+-------------+---------+-----------+-----------
!  foo_seq       |          1 |           1 |            1 | 9223372036854775807 |         1 |           1 |       1 |
f        | f 
  (1 row)

  SELECT nextval('foo_seq_new');
--- 170,176 ----
  SELECT * FROM foo_seq_new;
   sequence_name | last_value | start_value | increment_by |      max_value      | min_value | cache_value | log_cnt |
is_cycled| is_called  

---------------+------------+-------------+--------------+---------------------+-----------+-------------+---------+-----------+-----------
!  foo_seq       |          1 |           1 |            1 | 9223372036854775807 |         1 |           1 |       0 |
f        | f 
  (1 row)

  SELECT nextval('foo_seq_new');
*************** SELECT nextval('foo_seq_new');
*** 188,194 ****
  SELECT * FROM foo_seq_new;
   sequence_name | last_value | start_value | increment_by |      max_value      | min_value | cache_value | log_cnt |
is_cycled| is_called  

---------------+------------+-------------+--------------+---------------------+-----------+-------------+---------+-----------+-----------
!  foo_seq       |          2 |           1 |            1 | 9223372036854775807 |         1 |           1 |      31 |
f        | t 
  (1 row)

  DROP SEQUENCE foo_seq_new;
--- 188,194 ----
  SELECT * FROM foo_seq_new;
   sequence_name | last_value | start_value | increment_by |      max_value      | min_value | cache_value | log_cnt |
is_cycled| is_called  

---------------+------------+-------------+--------------+---------------------+-----------+-------------+---------+-----------+-----------
!  foo_seq       |          2 |           1 |            1 | 9223372036854775807 |         1 |           1 |      32 |
f        | t 
  (1 row)

  DROP SEQUENCE foo_seq_new;
*************** DROP SEQUENCE myseq2;
*** 234,239 ****
--- 234,242 ----
  --
  -- Alter sequence
  --
+ ALTER SEQUENCE IF EXISTS sequence_test2 RESTART WITH 24
+      INCREMENT BY 4 MAXVALUE 36 MINVALUE 5 CYCLE;
+ NOTICE:  relation "sequence_test2" does not exist, skipping
  CREATE SEQUENCE sequence_test2 START WITH 32;
  SELECT nextval('sequence_test2');
   nextval

В списке pgsql-hackers по дате отправления:

Предыдущее
От: Merlin Moncure
Дата:
Сообщение: Re: [patch] libpq one-row-at-a-time API
Следующее
От: Tom Lane
Дата:
Сообщение: Re: canceling autovacuum task woes