Обсуждение: [HACKERS] Keeping pg_recvlogical's "feTimestamp" separate from TimestampTz

Поиск
Список
Период
Сортировка

[HACKERS] Keeping pg_recvlogical's "feTimestamp" separate from TimestampTz

От
Tom Lane
Дата:
Thomas Munro pointed out that commit 7c030783a broke things on
--disable-integer-datetimes builds, because somebody cleverly used
TimestampTz to declare timestamp variables, no doubt not having
read the comment (which doesn't even appear in the same file :-()
that
* ... The replication protocol always uses integer timestamps,* regardless of the server setting.

I am not sure that it was really a good design to pretend that the
replication protocol is independent of --disable-integer-datetimes
when the underlying WAL stream most certainly isn't.

However, if we keep it like this, I think we need to expend significantly
more effort on making sure that "feTimestamps" aren't confused with
TimestampTzs.  It's only the sheer happenstance that a couple of new
functions were declared with "TimestampTz *" not "TimestampTz" arguments
that we found this mechanically at all --- otherwise, the compiler doesn't
know any better than to naively cast from int64 to double or vice versa
when asked to.

Moreover, I think there is absolutely nothing stopping somebody from
trying to compare a replication-protocol timestamp to a TimestampTz
taken out of the WAL stream, which will give completely wrong answers
for float timestamps, and which no compiler on earth will warn about.
Even if there are no such occurrences today, does anyone really want
to bet that somebody won't submit a patch tomorrow that subtly depends
on replication-protocol timestamps matching backend timestamps?

I propose that what we need to do is get rid of the dangerous and
none-too-readable-anyway use of "int64" to declare replication-protocol
timestamps, and instead declare them as, say,
typedef struct RPTimestamp{    int64 rptimestamp;} RPTimestamp;

This will entail slightly more notation in the subroutines that actually
do something with the timestamp values, but it will make certain that
you can't assign or compare an RPTimestamp to a backend timestamp without
the compiler complaining about it.
        regards, tom lane



Re: [HACKERS] Keeping pg_recvlogical's "feTimestamp" separate from TimestampTz

От
Tom Lane
Дата:
I wrote:
> I propose that what we need to do is get rid of the dangerous and
> none-too-readable-anyway use of "int64" to declare replication-protocol
> timestamps, and instead declare them as, say,

>     typedef struct RPTimestamp
>     {
>         int64 rptimestamp;
>     } RPTimestamp;

I experimented with this a bit, as in the attached draft patch, and
concluded that use of a struct is probably a bridge too far.  It makes the
patch pretty invasive notationally, and yet it still fails to catch places
where nobody had bothered to even think about float timestamps anywhere
nearby; which there are several of, as per my report
https://www.postgresql.org/message-id/26788.1487455319@sss.pgh.pa.us

We might be able to salvage the parts of this that simply redeclare
appropriate variables as "IntegerTimestamp" rather than "int64", with
the type declaration just being "typedef int64 IntegerTimestamp".
If we go forward with keeping protocol fields as integer timestamps
then I'd want to do that, just so we have some notation in the code
as to what the values are meant to be.  But right at the moment I'm
thinking we'd be better off to change them all to TimestampTz instead,
and adjust the arithmetic on them appropriately.

            regards, tom lane

diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 09ecc15..2f688ec 100644
*** a/src/backend/replication/basebackup.c
--- b/src/backend/replication/basebackup.c
*************** static int64 throttling_counter;
*** 95,101 ****
  static int64 elapsed_min_unit;

  /* The last check of the transfer rate. */
! static int64 throttled_last;

  /*
   * The contents of these directories are removed or recreated during server
--- 95,101 ----
  static int64 elapsed_min_unit;

  /* The last check of the transfer rate. */
! static IntegerTimestamp throttled_last;

  /*
   * The contents of these directories are removed or recreated during server
*************** throttle(size_t increment)
*** 1346,1352 ****
          return;

      /* Time elapsed since the last measurement (and possible wake up). */
!     elapsed = GetCurrentIntegerTimestamp() - throttled_last;
      /* How much should have elapsed at minimum? */
      elapsed_min = elapsed_min_unit * (throttling_counter / throttling_sample);
      sleep = elapsed_min - elapsed;
--- 1346,1352 ----
          return;

      /* Time elapsed since the last measurement (and possible wake up). */
!     elapsed = GetCurrentIntegerTimestamp().integer_ts - throttled_last.integer_ts;
      /* How much should have elapsed at minimum? */
      elapsed_min = elapsed_min_unit * (throttling_counter / throttling_sample);
      sleep = elapsed_min - elapsed;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 0b19fec..30e2f19 100644
*** a/src/backend/replication/logical/worker.c
--- b/src/backend/replication/logical/worker.c
*************** ApplyLoop(void)
*** 989,995 ****
                          start_lsn = pq_getmsgint64(&s);
                          end_lsn = pq_getmsgint64(&s);
                          send_time =
!                             IntegerTimestampToTimestampTz(pq_getmsgint64(&s));

                          if (last_received < start_lsn)
                              last_received = start_lsn;
--- 989,995 ----
                          start_lsn = pq_getmsgint64(&s);
                          end_lsn = pq_getmsgint64(&s);
                          send_time =
!                             Int64ToTimestampTz(pq_getmsgint64(&s));

                          if (last_received < start_lsn)
                              last_received = start_lsn;
*************** ApplyLoop(void)
*** 1009,1015 ****

                          endpos = pq_getmsgint64(&s);
                          timestamp =
!                             IntegerTimestampToTimestampTz(pq_getmsgint64(&s));
                          reply_requested = pq_getmsgbyte(&s);

                          send_feedback(endpos, reply_requested, false);
--- 1009,1015 ----

                          endpos = pq_getmsgint64(&s);
                          timestamp =
!                             Int64ToTimestampTz(pq_getmsgint64(&s));
                          reply_requested = pq_getmsgbyte(&s);

                          send_feedback(endpos, reply_requested, false);
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 5c2e72b..289c806 100644
*** a/src/backend/replication/walreceiver.c
--- b/src/backend/replication/walreceiver.c
*************** XLogWalRcvProcessMsg(unsigned char type,
*** 892,898 ****
                  /* read the fields */
                  dataStart = pq_getmsgint64(&incoming_message);
                  walEnd = pq_getmsgint64(&incoming_message);
!                 sendTime = IntegerTimestampToTimestampTz(
                                            pq_getmsgint64(&incoming_message));
                  ProcessWalSndrMessage(walEnd, sendTime);

--- 892,898 ----
                  /* read the fields */
                  dataStart = pq_getmsgint64(&incoming_message);
                  walEnd = pq_getmsgint64(&incoming_message);
!                 sendTime = Int64ToTimestampTz(
                                            pq_getmsgint64(&incoming_message));
                  ProcessWalSndrMessage(walEnd, sendTime);

*************** XLogWalRcvProcessMsg(unsigned char type,
*** 913,919 ****

                  /* read the fields */
                  walEnd = pq_getmsgint64(&incoming_message);
!                 sendTime = IntegerTimestampToTimestampTz(
                                            pq_getmsgint64(&incoming_message));
                  replyRequested = pq_getmsgbyte(&incoming_message);

--- 913,919 ----

                  /* read the fields */
                  walEnd = pq_getmsgint64(&incoming_message);
!                 sendTime = Int64ToTimestampTz(
                                            pq_getmsgint64(&incoming_message));
                  replyRequested = pq_getmsgbyte(&incoming_message);

*************** XLogWalRcvSendReply(bool force, bool req
*** 1149,1155 ****
      pq_sendint64(&reply_message, writePtr);
      pq_sendint64(&reply_message, flushPtr);
      pq_sendint64(&reply_message, applyPtr);
!     pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
      pq_sendbyte(&reply_message, requestReply ? 1 : 0);

      /* Send it */
--- 1149,1155 ----
      pq_sendint64(&reply_message, writePtr);
      pq_sendint64(&reply_message, flushPtr);
      pq_sendint64(&reply_message, applyPtr);
!     pq_sendint64(&reply_message, GetCurrentIntegerTimestamp().integer_ts);
      pq_sendbyte(&reply_message, requestReply ? 1 : 0);

      /* Send it */
*************** XLogWalRcvSendHSFeedback(bool immed)
*** 1241,1247 ****
      /* Construct the message and send it. */
      resetStringInfo(&reply_message);
      pq_sendbyte(&reply_message, 'h');
!     pq_sendint64(&reply_message, GetCurrentIntegerTimestamp());
      pq_sendint(&reply_message, xmin, 4);
      pq_sendint(&reply_message, nextEpoch, 4);
      walrcv_send(wrconn, reply_message.data, reply_message.len);
--- 1241,1247 ----
      /* Construct the message and send it. */
      resetStringInfo(&reply_message);
      pq_sendbyte(&reply_message, 'h');
!     pq_sendint64(&reply_message, GetCurrentIntegerTimestamp().integer_ts);
      pq_sendint(&reply_message, xmin, 4);
      pq_sendint(&reply_message, nextEpoch, 4);
      walrcv_send(wrconn, reply_message.data, reply_message.len);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index ba506e2..ed0fef8 100644
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
*************** WalSndWriteData(LogicalDecodingContext *
*** 1016,1022 ****
       * several releases by streaming physical replication.
       */
      resetStringInfo(&tmpbuf);
!     pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
      memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
             tmpbuf.data, sizeof(int64));

--- 1016,1022 ----
       * several releases by streaming physical replication.
       */
      resetStringInfo(&tmpbuf);
!     pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp().integer_ts);
      memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
             tmpbuf.data, sizeof(int64));

*************** XLogSendPhysical(void)
*** 2336,2342 ****
       * Fill the send timestamp last, so that it is taken as late as possible.
       */
      resetStringInfo(&tmpbuf);
!     pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp());
      memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
             tmpbuf.data, sizeof(int64));

--- 2336,2342 ----
       * Fill the send timestamp last, so that it is taken as late as possible.
       */
      resetStringInfo(&tmpbuf);
!     pq_sendint64(&tmpbuf, GetCurrentIntegerTimestamp().integer_ts);
      memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
             tmpbuf.data, sizeof(int64));

*************** WalSndKeepalive(bool requestReply)
*** 2844,2850 ****
      resetStringInfo(&output_message);
      pq_sendbyte(&output_message, 'k');
      pq_sendint64(&output_message, sentPtr);
!     pq_sendint64(&output_message, GetCurrentIntegerTimestamp());
      pq_sendbyte(&output_message, requestReply ? 1 : 0);

      /* ... and send it wrapped in CopyData */
--- 2844,2850 ----
      resetStringInfo(&output_message);
      pq_sendbyte(&output_message, 'k');
      pq_sendint64(&output_message, sentPtr);
!     pq_sendint64(&output_message, GetCurrentIntegerTimestamp().integer_ts);
      pq_sendbyte(&output_message, requestReply ? 1 : 0);

      /* ... and send it wrapped in CopyData */
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 3cb5120..9717273 100644
*** a/src/backend/storage/buffer/bufmgr.c
--- b/src/backend/storage/buffer/bufmgr.c
*************** void
*** 4333,4339 ****
  TestForOldSnapshot_impl(Snapshot snapshot, Relation relation)
  {
      if (RelationAllowsEarlyPruning(relation)
!         && (snapshot)->whenTaken < GetOldSnapshotThresholdTimestamp())
          ereport(ERROR,
                  (errcode(ERRCODE_SNAPSHOT_TOO_OLD),
                   errmsg("snapshot too old")));
--- 4333,4339 ----
  TestForOldSnapshot_impl(Snapshot snapshot, Relation relation)
  {
      if (RelationAllowsEarlyPruning(relation)
!         && (snapshot)->whenTaken.integer_ts < GetOldSnapshotThresholdTimestamp().integer_ts)
          ereport(ERROR,
                  (errcode(ERRCODE_SNAPSHOT_TOO_OLD),
                   errmsg("snapshot too old")));
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index cd14667..fcb4093 100644
*** a/src/backend/storage/ipc/procarray.c
--- b/src/backend/storage/ipc/procarray.c
*************** GetSnapshotData(Snapshot snapshot)
*** 1760,1766 ****
           * dummy values that don't require any locking.
           */
          snapshot->lsn = InvalidXLogRecPtr;
!         snapshot->whenTaken = 0;
      }
      else
      {
--- 1760,1766 ----
           * dummy values that don't require any locking.
           */
          snapshot->lsn = InvalidXLogRecPtr;
!         snapshot->whenTaken.integer_ts = 0;
      }
      else
      {
diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
index 9b4c012..9d18875 100644
*** a/src/backend/utils/adt/timestamp.c
--- b/src/backend/utils/adt/timestamp.c
*************** GetCurrentTimestamp(void)
*** 1711,1720 ****
   * with --enable-integer-datetimes, this is identical to GetCurrentTimestamp(),
   * and is implemented as a macro.
   */
! #ifndef HAVE_INT64_TIMESTAMP
! int64
  GetCurrentIntegerTimestamp(void)
  {
      int64        result;
      struct timeval tp;

--- 1711,1721 ----
   * with --enable-integer-datetimes, this is identical to GetCurrentTimestamp(),
   * and is implemented as a macro.
   */
! //#ifndef HAVE_INT64_TIMESTAMP
! IntegerTimestamp
  GetCurrentIntegerTimestamp(void)
  {
+     IntegerTimestamp ts;
      int64        result;
      struct timeval tp;

*************** GetCurrentIntegerTimestamp(void)
*** 1725,1748 ****

      result = (result * USECS_PER_SEC) + tp.tv_usec;

      return result;
  }
  #endif

  /*
!  * IntegerTimestampToTimestampTz -- convert an int64 timestamp to native format
   *
   * When compiled with --enable-integer-datetimes, this is implemented as a
!  * no-op macro.
   */
  #ifndef HAVE_INT64_TIMESTAMP
  TimestampTz
! IntegerTimestampToTimestampTz(int64 timestamp)
  {
      TimestampTz result;

!     result = timestamp / USECS_PER_SEC;
!     result += (timestamp % USECS_PER_SEC) / 1000000.0;

      return result;
  }
--- 1726,1761 ----

      result = (result * USECS_PER_SEC) + tp.tv_usec;

+     ts.integer_ts = result;
+     return ts;
+ }
+ //#endif
+
+ #ifndef HAVE_INT64_TIMESTAMP
+ TimestampTz
+ Int64ToTimestampTz(int64 timestamp)
+ {
+     TimestampTz result;
+
+     result = ((TimestampTz) timestamp) / USECS_PER_SEC;
+
      return result;
  }
  #endif

  /*
!  * IntegerTimestampToTimestampTz -- convert integer timestamp to native format
   *
   * When compiled with --enable-integer-datetimes, this is implemented as a
!  * trivial macro.
   */
  #ifndef HAVE_INT64_TIMESTAMP
  TimestampTz
! IntegerTimestampToTimestampTz(IntegerTimestamp timestamp)
  {
      TimestampTz result;

!     result = ((TimestampTz) timestamp.integer_ts) / USECS_PER_SEC;

      return result;
  }
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 92afc32..338c2e1 100644
*** a/src/backend/utils/time/snapmgr.c
--- b/src/backend/utils/time/snapmgr.c
*************** typedef struct OldSnapshotControlData
*** 83,95 ****
       * only allowed to move forward.
       */
      slock_t        mutex_current;    /* protect current_timestamp */
!     int64        current_timestamp;        /* latest snapshot timestamp */
      slock_t        mutex_latest_xmin;        /* protect latest_xmin and
                                           * next_map_update */
      TransactionId latest_xmin;    /* latest snapshot xmin */
!     int64        next_map_update;    /* latest snapshot valid up to */
      slock_t        mutex_threshold;    /* protect threshold fields */
!     int64        threshold_timestamp;    /* earlier snapshot is old */
      TransactionId threshold_xid;    /* earlier xid may be gone */

      /*
--- 83,95 ----
       * only allowed to move forward.
       */
      slock_t        mutex_current;    /* protect current_timestamp */
!     IntegerTimestamp current_timestamp; /* latest snapshot timestamp */
      slock_t        mutex_latest_xmin;        /* protect latest_xmin and
                                           * next_map_update */
      TransactionId latest_xmin;    /* latest snapshot xmin */
!     IntegerTimestamp next_map_update;    /* latest snapshot valid up to */
      slock_t        mutex_threshold;    /* protect threshold fields */
!     IntegerTimestamp threshold_timestamp;        /* earlier snapshot is old */
      TransactionId threshold_xid;    /* earlier xid may be gone */

      /*
*************** typedef struct OldSnapshotControlData
*** 121,127 ****
       * Persistence is not needed.
       */
      int            head_offset;    /* subscript of oldest tracked time */
!     int64        head_timestamp; /* time corresponding to head xid */
      int            count_used;        /* how many slots are in use */
      TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER];
  } OldSnapshotControlData;
--- 121,127 ----
       * Persistence is not needed.
       */
      int            head_offset;    /* subscript of oldest tracked time */
!     IntegerTimestamp head_timestamp;    /* time corresponding to head xid */
      int            count_used;        /* how many slots are in use */
      TransactionId xid_by_minute[FLEXIBLE_ARRAY_MEMBER];
  } OldSnapshotControlData;
*************** static Snapshot FirstXactSnapshot = NULL
*** 219,225 ****
  static List *exportedSnapshots = NIL;

  /* Prototypes for local functions */
! static int64 AlignTimestampToMinuteBoundary(int64 ts);
  static Snapshot CopySnapshot(Snapshot snapshot);
  static void FreeSnapshot(Snapshot snapshot);
  static void SnapshotResetXmin(void);
--- 219,225 ----
  static List *exportedSnapshots = NIL;

  /* Prototypes for local functions */
! static IntegerTimestamp AlignTimestampToMinuteBoundary(IntegerTimestamp ts);
  static Snapshot CopySnapshot(Snapshot snapshot);
  static void FreeSnapshot(Snapshot snapshot);
  static void SnapshotResetXmin(void);
*************** typedef struct SerializedSnapshotData
*** 239,245 ****
      bool        suboverflowed;
      bool        takenDuringRecovery;
      CommandId    curcid;
!     int64        whenTaken;
      XLogRecPtr    lsn;
  } SerializedSnapshotData;

--- 239,245 ----
      bool        suboverflowed;
      bool        takenDuringRecovery;
      CommandId    curcid;
!     IntegerTimestamp whenTaken;
      XLogRecPtr    lsn;
  } SerializedSnapshotData;

*************** SnapMgrInit(void)
*** 274,288 ****
      if (!found)
      {
          SpinLockInit(&oldSnapshotControl->mutex_current);
!         oldSnapshotControl->current_timestamp = 0;
          SpinLockInit(&oldSnapshotControl->mutex_latest_xmin);
          oldSnapshotControl->latest_xmin = InvalidTransactionId;
!         oldSnapshotControl->next_map_update = 0;
          SpinLockInit(&oldSnapshotControl->mutex_threshold);
!         oldSnapshotControl->threshold_timestamp = 0;
          oldSnapshotControl->threshold_xid = InvalidTransactionId;
          oldSnapshotControl->head_offset = 0;
!         oldSnapshotControl->head_timestamp = 0;
          oldSnapshotControl->count_used = 0;
      }
  }
--- 274,288 ----
      if (!found)
      {
          SpinLockInit(&oldSnapshotControl->mutex_current);
!         oldSnapshotControl->current_timestamp.integer_ts = 0;
          SpinLockInit(&oldSnapshotControl->mutex_latest_xmin);
          oldSnapshotControl->latest_xmin = InvalidTransactionId;
!         oldSnapshotControl->next_map_update.integer_ts = 0;
          SpinLockInit(&oldSnapshotControl->mutex_threshold);
!         oldSnapshotControl->threshold_timestamp.integer_ts = 0;
          oldSnapshotControl->threshold_xid = InvalidTransactionId;
          oldSnapshotControl->head_offset = 0;
!         oldSnapshotControl->head_timestamp.integer_ts = 0;
          oldSnapshotControl->count_used = 0;
      }
  }
*************** ThereAreNoPriorRegisteredSnapshots(void)
*** 1611,1642 ****


  /*
!  * Return an int64 timestamp which is exactly on a minute boundary.
   *
   * If the argument is already aligned, return that value, otherwise move to
   * the next minute boundary following the given time.
   */
! static int64
! AlignTimestampToMinuteBoundary(int64 ts)
  {
!     int64        retval = ts + (USECS_PER_MINUTE - 1);
!
!     return retval - (retval % USECS_PER_MINUTE);
  }

  /*
!  * Get current timestamp for snapshots as int64 that never moves backward.
   */
! int64
  GetSnapshotCurrentTimestamp(void)
  {
!     int64        now = GetCurrentIntegerTimestamp();

      /*
       * Don't let time move backward; if it hasn't advanced, use the old value.
       */
      SpinLockAcquire(&oldSnapshotControl->mutex_current);
!     if (now <= oldSnapshotControl->current_timestamp)
          now = oldSnapshotControl->current_timestamp;
      else
          oldSnapshotControl->current_timestamp = now;
--- 1611,1642 ----


  /*
!  * Return an integer timestamp which is exactly on a minute boundary.
   *
   * If the argument is already aligned, return that value, otherwise move to
   * the next minute boundary following the given time.
   */
! static IntegerTimestamp
! AlignTimestampToMinuteBoundary(IntegerTimestamp ts)
  {
!     ts.integer_ts += USECS_PER_MINUTE - 1;
!     ts.integer_ts -= ts.integer_ts % USECS_PER_MINUTE;
!     return ts;
  }

  /*
!  * Get current timestamp for snapshots as integer that never moves backward.
   */
! IntegerTimestamp
  GetSnapshotCurrentTimestamp(void)
  {
!     IntegerTimestamp now = GetCurrentIntegerTimestamp();

      /*
       * Don't let time move backward; if it hasn't advanced, use the old value.
       */
      SpinLockAcquire(&oldSnapshotControl->mutex_current);
!     if (now.integer_ts <= oldSnapshotControl->current_timestamp.integer_ts)
          now = oldSnapshotControl->current_timestamp;
      else
          oldSnapshotControl->current_timestamp = now;
*************** GetSnapshotCurrentTimestamp(void)
*** 1652,1661 ****
   * XXX: So far, we never trust that a 64-bit value can be read atomically; if
   * that ever changes, we could get rid of the spinlock here.
   */
! int64
  GetOldSnapshotThresholdTimestamp(void)
  {
!     int64        threshold_timestamp;

      SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
      threshold_timestamp = oldSnapshotControl->threshold_timestamp;
--- 1652,1661 ----
   * XXX: So far, we never trust that a 64-bit value can be read atomically; if
   * that ever changes, we could get rid of the spinlock here.
   */
! IntegerTimestamp
  GetOldSnapshotThresholdTimestamp(void)
  {
!     IntegerTimestamp threshold_timestamp;

      SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
      threshold_timestamp = oldSnapshotControl->threshold_timestamp;
*************** GetOldSnapshotThresholdTimestamp(void)
*** 1665,1671 ****
  }

  static void
! SetOldSnapshotThresholdTimestamp(int64 ts, TransactionId xlimit)
  {
      SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
      oldSnapshotControl->threshold_timestamp = ts;
--- 1665,1671 ----
  }

  static void
! SetOldSnapshotThresholdTimestamp(IntegerTimestamp ts, TransactionId xlimit)
  {
      SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
      oldSnapshotControl->threshold_timestamp = ts;
*************** TransactionIdLimitedForOldSnapshots(Tran
*** 1690,1699 ****
          && old_snapshot_threshold >= 0
          && RelationAllowsEarlyPruning(relation))
      {
!         int64        ts = GetSnapshotCurrentTimestamp();
          TransactionId xlimit = recentXmin;
          TransactionId latest_xmin;
!         int64        update_ts;
          bool        same_ts_as_threshold = false;

          SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
--- 1690,1699 ----
          && old_snapshot_threshold >= 0
          && RelationAllowsEarlyPruning(relation))
      {
!         IntegerTimestamp ts = GetSnapshotCurrentTimestamp();
          TransactionId xlimit = recentXmin;
          TransactionId latest_xmin;
!         IntegerTimestamp update_ts;
          bool        same_ts_as_threshold = false;

          SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
*************** TransactionIdLimitedForOldSnapshots(Tran
*** 1715,1732 ****
                  && TransactionIdFollows(latest_xmin, xlimit))
                  xlimit = latest_xmin;

!             ts -= 5 * USECS_PER_SEC;
              SetOldSnapshotThresholdTimestamp(ts, xlimit);

              return xlimit;
          }

!         ts = AlignTimestampToMinuteBoundary(ts)
!             - (old_snapshot_threshold * USECS_PER_MINUTE);

          /* Check for fast exit without LW locking. */
          SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
!         if (ts == oldSnapshotControl->threshold_timestamp)
          {
              xlimit = oldSnapshotControl->threshold_xid;
              same_ts_as_threshold = true;
--- 1715,1732 ----
                  && TransactionIdFollows(latest_xmin, xlimit))
                  xlimit = latest_xmin;

!             ts.integer_ts -= 5 * USECS_PER_SEC;
              SetOldSnapshotThresholdTimestamp(ts, xlimit);

              return xlimit;
          }

!         ts = AlignTimestampToMinuteBoundary(ts);
!         ts.integer_ts -= old_snapshot_threshold * USECS_PER_MINUTE;

          /* Check for fast exit without LW locking. */
          SpinLockAcquire(&oldSnapshotControl->mutex_threshold);
!         if (ts.integer_ts == oldSnapshotControl->threshold_timestamp.integer_ts)
          {
              xlimit = oldSnapshotControl->threshold_xid;
              same_ts_as_threshold = true;
*************** TransactionIdLimitedForOldSnapshots(Tran
*** 1735,1741 ****

          if (!same_ts_as_threshold)
          {
!             if (ts == update_ts)
              {
                  xlimit = latest_xmin;
                  if (NormalTransactionIdFollows(xlimit, recentXmin))
--- 1735,1741 ----

          if (!same_ts_as_threshold)
          {
!             if (ts.integer_ts == update_ts.integer_ts)
              {
                  xlimit = latest_xmin;
                  if (NormalTransactionIdFollows(xlimit, recentXmin))
*************** TransactionIdLimitedForOldSnapshots(Tran
*** 1746,1756 ****
                  LWLockAcquire(OldSnapshotTimeMapLock, LW_SHARED);

                  if (oldSnapshotControl->count_used > 0
!                     && ts >= oldSnapshotControl->head_timestamp)
                  {
                      int            offset;

!                     offset = ((ts - oldSnapshotControl->head_timestamp)
                                / USECS_PER_MINUTE);
                      if (offset > oldSnapshotControl->count_used - 1)
                          offset = oldSnapshotControl->count_used - 1;
--- 1746,1756 ----
                  LWLockAcquire(OldSnapshotTimeMapLock, LW_SHARED);

                  if (oldSnapshotControl->count_used > 0
!                     && ts.integer_ts >= oldSnapshotControl->head_timestamp.integer_ts)
                  {
                      int            offset;

!                     offset = ((ts.integer_ts - oldSnapshotControl->head_timestamp.integer_ts)
                                / USECS_PER_MINUTE);
                      if (offset > oldSnapshotControl->count_used - 1)
                          offset = oldSnapshotControl->count_used - 1;
*************** TransactionIdLimitedForOldSnapshots(Tran
*** 1790,1800 ****
   * Take care of the circular buffer that maps time to xid.
   */
  void
! MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin)
  {
!     int64        ts;
      TransactionId latest_xmin;
!     int64        update_ts;
      bool        map_update_required = false;

      /* Never call this function when old snapshot checking is disabled. */
--- 1790,1800 ----
   * Take care of the circular buffer that maps time to xid.
   */
  void
! MaintainOldSnapshotTimeMapping(IntegerTimestamp whenTaken, TransactionId xmin)
  {
!     IntegerTimestamp ts;
      TransactionId latest_xmin;
!     IntegerTimestamp update_ts;
      bool        map_update_required = false;

      /* Never call this function when old snapshot checking is disabled. */
*************** MaintainOldSnapshotTimeMapping(int64 whe
*** 1809,1815 ****
      SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
      latest_xmin = oldSnapshotControl->latest_xmin;
      update_ts = oldSnapshotControl->next_map_update;
!     if (ts > update_ts)
      {
          oldSnapshotControl->next_map_update = ts;
          map_update_required = true;
--- 1809,1815 ----
      SpinLockAcquire(&oldSnapshotControl->mutex_latest_xmin);
      latest_xmin = oldSnapshotControl->latest_xmin;
      update_ts = oldSnapshotControl->next_map_update;
!     if (ts.integer_ts > update_ts.integer_ts)
      {
          oldSnapshotControl->next_map_update = ts;
          map_update_required = true;
*************** MaintainOldSnapshotTimeMapping(int64 whe
*** 1832,1842 ****
       * processing for this feature; so if something seems unreasonable, just
       * log at DEBUG level and return without doing anything.
       */
!     if (whenTaken < 0)
      {
          elog(DEBUG1,
          "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld",
!              (long) whenTaken);
          return;
      }
      if (!TransactionIdIsNormal(xmin))
--- 1832,1842 ----
       * processing for this feature; so if something seems unreasonable, just
       * log at DEBUG level and return without doing anything.
       */
!     if (whenTaken.integer_ts < 0)
      {
          elog(DEBUG1,
          "MaintainOldSnapshotTimeMapping called with negative whenTaken = %ld",
!              (long) whenTaken.integer_ts);
          return;
      }
      if (!TransactionIdIsNormal(xmin))
*************** MaintainOldSnapshotTimeMapping(int64 whe
*** 1851,1857 ****

      Assert(oldSnapshotControl->head_offset >= 0);
      Assert(oldSnapshotControl->head_offset < OLD_SNAPSHOT_TIME_MAP_ENTRIES);
!     Assert((oldSnapshotControl->head_timestamp % USECS_PER_MINUTE) == 0);
      Assert(oldSnapshotControl->count_used >= 0);
      Assert(oldSnapshotControl->count_used <= OLD_SNAPSHOT_TIME_MAP_ENTRIES);

--- 1851,1857 ----

      Assert(oldSnapshotControl->head_offset >= 0);
      Assert(oldSnapshotControl->head_offset < OLD_SNAPSHOT_TIME_MAP_ENTRIES);
!     Assert((oldSnapshotControl->head_timestamp.integer_ts % USECS_PER_MINUTE) == 0);
      Assert(oldSnapshotControl->count_used >= 0);
      Assert(oldSnapshotControl->count_used <= OLD_SNAPSHOT_TIME_MAP_ENTRIES);

*************** MaintainOldSnapshotTimeMapping(int64 whe
*** 1863,1884 ****
          oldSnapshotControl->count_used = 1;
          oldSnapshotControl->xid_by_minute[0] = xmin;
      }
!     else if (ts < oldSnapshotControl->head_timestamp)
      {
          /* old ts; log it at DEBUG */
          LWLockRelease(OldSnapshotTimeMapLock);
          elog(DEBUG1,
               "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld",
!              (long) whenTaken);
          return;
      }
!     else if (ts <= (oldSnapshotControl->head_timestamp +
                      ((oldSnapshotControl->count_used - 1)
                       * USECS_PER_MINUTE)))
      {
          /* existing mapping; advance xid if possible */
          int            bucket = (oldSnapshotControl->head_offset
!                               + ((ts - oldSnapshotControl->head_timestamp)
                                   / USECS_PER_MINUTE))
          % OLD_SNAPSHOT_TIME_MAP_ENTRIES;

--- 1863,1884 ----
          oldSnapshotControl->count_used = 1;
          oldSnapshotControl->xid_by_minute[0] = xmin;
      }
!     else if (ts.integer_ts < oldSnapshotControl->head_timestamp.integer_ts)
      {
          /* old ts; log it at DEBUG */
          LWLockRelease(OldSnapshotTimeMapLock);
          elog(DEBUG1,
               "MaintainOldSnapshotTimeMapping called with old whenTaken = %ld",
!              (long) whenTaken.integer_ts);
          return;
      }
!     else if (ts.integer_ts <= (oldSnapshotControl->head_timestamp.integer_ts +
                      ((oldSnapshotControl->count_used - 1)
                       * USECS_PER_MINUTE)))
      {
          /* existing mapping; advance xid if possible */
          int            bucket = (oldSnapshotControl->head_offset
!                               + ((ts.integer_ts - oldSnapshotControl->head_timestamp.integer_ts)
                                   / USECS_PER_MINUTE))
          % OLD_SNAPSHOT_TIME_MAP_ENTRIES;

*************** MaintainOldSnapshotTimeMapping(int64 whe
*** 1888,1894 ****
      else
      {
          /* We need a new bucket, but it might not be the very next one. */
!         int            advance = ((ts - oldSnapshotControl->head_timestamp)
                                 / USECS_PER_MINUTE);

          oldSnapshotControl->head_timestamp = ts;
--- 1888,1894 ----
      else
      {
          /* We need a new bucket, but it might not be the very next one. */
!         int            advance = ((ts.integer_ts - oldSnapshotControl->head_timestamp.integer_ts)
                                 / USECS_PER_MINUTE);

          oldSnapshotControl->head_timestamp = ts;
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index d16d08b..53a4bbd 100644
*** a/src/bin/pg_basebackup/pg_recvlogical.c
--- b/src/bin/pg_basebackup/pg_recvlogical.c
*************** static int    outfd = -1;
*** 57,63 ****
  static volatile sig_atomic_t time_to_abort = false;
  static volatile sig_atomic_t output_reopen = false;
  static bool output_isfile;
! static int64 output_last_fsync = -1;
  static bool output_needs_fsync = false;
  static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
  static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
--- 57,63 ----
  static volatile sig_atomic_t time_to_abort = false;
  static volatile sig_atomic_t output_reopen = false;
  static bool output_isfile;
! static IntegerTimestamp output_last_fsync = {-1};
  static bool output_needs_fsync = false;
  static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
  static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
*************** static XLogRecPtr output_fsync_lsn = Inv
*** 65,71 ****
  static void usage(void);
  static void StreamLogicalLog(void);
  static void disconnect_and_exit(int code);
! static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
  static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
                     bool keepalive, XLogRecPtr lsn);

--- 65,71 ----
  static void usage(void);
  static void StreamLogicalLog(void);
  static void disconnect_and_exit(int code);
! static bool flushAndSendFeedback(PGconn *conn, IntegerTimestamp *now);
  static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
                     bool keepalive, XLogRecPtr lsn);

*************** usage(void)
*** 112,118 ****
   * Send a Standby Status Update message to server.
   */
  static bool
! sendFeedback(PGconn *conn, int64 now, bool force, bool replyRequested)
  {
      static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
      static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
--- 112,118 ----
   * Send a Standby Status Update message to server.
   */
  static bool
! sendFeedback(PGconn *conn, IntegerTimestamp now, bool force, bool replyRequested)
  {
      static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
      static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
*************** sendFeedback(PGconn *conn, int64 now, bo
*** 146,152 ****
      len += 8;
      fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
      len += 8;
!     fe_sendint64(now, &replybuf[len]);    /* sendTime */
      len += 8;
      replybuf[len] = replyRequested ? 1 : 0;        /* replyRequested */
      len += 1;
--- 146,152 ----
      len += 8;
      fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
      len += 8;
!     fe_sendint64(now.integer_ts, &replybuf[len]);    /* sendTime */
      len += 8;
      replybuf[len] = replyRequested ? 1 : 0;        /* replyRequested */
      len += 1;
*************** disconnect_and_exit(int code)
*** 175,181 ****
  }

  static bool
! OutputFsync(int64 now)
  {
      output_last_fsync = now;

--- 175,181 ----
  }

  static bool
! OutputFsync(IntegerTimestamp now)
  {
      output_last_fsync = now;

*************** StreamLogicalLog(void)
*** 212,218 ****
  {
      PGresult   *res;
      char       *copybuf = NULL;
!     int64        last_status = -1;
      int            i;
      PQExpBuffer query;

--- 212,218 ----
  {
      PGresult   *res;
      char       *copybuf = NULL;
!     IntegerTimestamp last_status = {-1};
      int            i;
      PQExpBuffer query;

*************** StreamLogicalLog(void)
*** 285,291 ****
          int            r;
          int            bytes_left;
          int            bytes_written;
!         int64        now;
          int            hdr_len;
          XLogRecPtr    cur_record_lsn = InvalidXLogRecPtr;

--- 285,291 ----
          int            r;
          int            bytes_left;
          int            bytes_written;
!         IntegerTimestamp now;
          int            hdr_len;
          XLogRecPtr    cur_record_lsn = InvalidXLogRecPtr;

*************** StreamLogicalLog(void)
*** 365,372 ****
               * response back to the client.
               */
              fd_set        input_mask;
!             int64        message_target = 0;
!             int64        fsync_target = 0;
              struct timeval timeout;
              struct timeval *timeoutptr = NULL;

--- 365,372 ----
               * response back to the client.
               */
              fd_set        input_mask;
!             IntegerTimestamp message_target = {0};
!             IntegerTimestamp fsync_target = {0};
              struct timeval timeout;
              struct timeval *timeoutptr = NULL;

*************** StreamLogicalLog(void)
*** 383,406 ****

              /* Compute when we need to wakeup to send a keepalive message. */
              if (standby_message_timeout)
!                 message_target = last_status + (standby_message_timeout - 1) *
                      ((int64) 1000);

              /* Compute when we need to wakeup to fsync the output file. */
              if (fsync_interval > 0 && output_needs_fsync)
!                 fsync_target = output_last_fsync + (fsync_interval - 1) *
                      ((int64) 1000);

              /* Now compute when to wakeup. */
!             if (message_target > 0 || fsync_target > 0)
              {
!                 int64        targettime;
                  long        secs;
                  int            usecs;

                  targettime = message_target;

!                 if (fsync_target > 0 && fsync_target < targettime)
                      targettime = fsync_target;

                  feTimestampDifference(now,
--- 383,406 ----

              /* Compute when we need to wakeup to send a keepalive message. */
              if (standby_message_timeout)
!                 message_target.integer_ts = last_status.integer_ts + (standby_message_timeout - 1) *
                      ((int64) 1000);

              /* Compute when we need to wakeup to fsync the output file. */
              if (fsync_interval > 0 && output_needs_fsync)
!                 fsync_target.integer_ts = output_last_fsync.integer_ts + (fsync_interval - 1) *
                      ((int64) 1000);

              /* Now compute when to wakeup. */
!             if (message_target.integer_ts > 0 || fsync_target.integer_ts > 0)
              {
!                 IntegerTimestamp targettime;
                  long        secs;
                  int            usecs;

                  targettime = message_target;

!                 if (fsync_target.integer_ts > 0 && fsync_target.integer_ts < targettime.integer_ts)
                      targettime = fsync_target;

                  feTimestampDifference(now,
*************** StreamLogicalLog(void)
*** 622,628 ****

      if (outfd != -1 && strcmp(outfile, "-") != 0)
      {
!         int64        t = feGetCurrentTimestamp();

          /* no need to jump to error on failure here, we're finishing anyway */
          OutputFsync(t);
--- 622,628 ----

      if (outfd != -1 && strcmp(outfile, "-") != 0)
      {
!         IntegerTimestamp t = feGetCurrentTimestamp();

          /* no need to jump to error on failure here, we're finishing anyway */
          OutputFsync(t);
*************** main(int argc, char **argv)
*** 1024,1030 ****
   * feedback.
   */
  static bool
! flushAndSendFeedback(PGconn *conn, TimestampTz *now)
  {
      /* flush data to disk, so that we send a recent flush pointer */
      if (!OutputFsync(*now))
--- 1024,1030 ----
   * feedback.
   */
  static bool
! flushAndSendFeedback(PGconn *conn, IntegerTimestamp *now)
  {
      /* flush data to disk, so that we send a recent flush pointer */
      if (!OutputFsync(*now))
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 5561283..639f739 100644
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
*************** static PGresult *HandleCopyStream(PGconn
*** 42,56 ****
  static int    CopyStreamPoll(PGconn *conn, long timeout_ms);
  static int    CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
  static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
!                     int len, XLogRecPtr blockpos, int64 *last_status);
  static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
                     XLogRecPtr *blockpos);
  static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
                        XLogRecPtr blockpos, XLogRecPtr *stoppos);
  static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
                      XLogRecPtr *stoppos);
! static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
!                              int64 last_status);

  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
                           uint32 *timeline);
--- 42,56 ----
  static int    CopyStreamPoll(PGconn *conn, long timeout_ms);
  static int    CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
  static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf,
!                 int len, XLogRecPtr blockpos, IntegerTimestamp *last_status);
  static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
                     XLogRecPtr *blockpos);
  static PGresult *HandleEndOfCopyStream(PGconn *conn, StreamCtl *stream, char *copybuf,
                        XLogRecPtr blockpos, XLogRecPtr *stoppos);
  static bool CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
                      XLogRecPtr *stoppos);
! static long CalculateCopyStreamSleeptime(IntegerTimestamp now, int standby_message_timeout,
!                              IntegerTimestamp last_status);

  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
                           uint32 *timeline);
*************** writeTimeLineHistoryFile(StreamCtl *stre
*** 319,325 ****
   * Send a Standby Status Update message to server.
   */
  static bool
! sendFeedback(PGconn *conn, XLogRecPtr blockpos, int64 now, bool replyRequested)
  {
      char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
      int            len = 0;
--- 319,325 ----
   * Send a Standby Status Update message to server.
   */
  static bool
! sendFeedback(PGconn *conn, XLogRecPtr blockpos, IntegerTimestamp now, bool replyRequested)
  {
      char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
      int            len = 0;
*************** sendFeedback(PGconn *conn, XLogRecPtr bl
*** 335,341 ****
      len += 8;
      fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
      len += 8;
!     fe_sendint64(now, &replybuf[len]);    /* sendTime */
      len += 8;
      replybuf[len] = replyRequested ? 1 : 0;        /* replyRequested */
      len += 1;
--- 335,341 ----
      len += 8;
      fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
      len += 8;
!     fe_sendint64(now.integer_ts, &replybuf[len]);    /* sendTime */
      len += 8;
      replybuf[len] = replyRequested ? 1 : 0;        /* replyRequested */
      len += 1;
*************** HandleCopyStream(PGconn *conn, StreamCtl
*** 761,767 ****
                   XLogRecPtr *stoppos)
  {
      char       *copybuf = NULL;
!     int64        last_status = -1;
      XLogRecPtr    blockpos = stream->startpos;

      still_sending = true;
--- 761,767 ----
                   XLogRecPtr *stoppos)
  {
      char       *copybuf = NULL;
!     IntegerTimestamp last_status = {-1};
      XLogRecPtr    blockpos = stream->startpos;

      still_sending = true;
*************** HandleCopyStream(PGconn *conn, StreamCtl
*** 769,775 ****
      while (1)
      {
          int            r;
!         int64        now;
          long        sleeptime;

          /*
--- 769,775 ----
      while (1)
      {
          int            r;
!         IntegerTimestamp now;
          long        sleeptime;

          /*
*************** CopyStreamReceive(PGconn *conn, long tim
*** 994,1004 ****
   */
  static bool
  ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
!                     XLogRecPtr blockpos, int64 *last_status)
  {
      int            pos;
      bool        replyRequested;
!     int64        now;

      /*
       * Parse the keepalive message, enclosed in the CopyData message. We just
--- 994,1004 ----
   */
  static bool
  ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
!                     XLogRecPtr blockpos, IntegerTimestamp *last_status)
  {
      int            pos;
      bool        replyRequested;
!     IntegerTimestamp now;

      /*
       * Parse the keepalive message, enclosed in the CopyData message. We just
*************** CheckCopyStreamStop(PGconn *conn, Stream
*** 1253,1269 ****
   * Calculate how long send/receive loops should sleep
   */
  static long
! CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
!                              int64 last_status)
  {
!     int64        status_targettime = 0;
      long        sleeptime;

      if (standby_message_timeout && still_sending)
!         status_targettime = last_status +
              (standby_message_timeout - 1) * ((int64) 1000);

!     if (status_targettime > 0)
      {
          long        secs;
          int            usecs;
--- 1253,1269 ----
   * Calculate how long send/receive loops should sleep
   */
  static long
! CalculateCopyStreamSleeptime(IntegerTimestamp now, int standby_message_timeout,
!                              IntegerTimestamp last_status)
  {
!     IntegerTimestamp status_targettime = {0};
      long        sleeptime;

      if (standby_message_timeout && still_sending)
!         status_targettime.integer_ts = last_status.integer_ts +
              (standby_message_timeout - 1) * ((int64) 1000);

!     if (status_targettime.integer_ts > 0)
      {
          long        secs;
          int            usecs;
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index 31290d3..81c90d6 100644
*** a/src/bin/pg_basebackup/streamutil.c
--- b/src/bin/pg_basebackup/streamutil.c
*************** DropReplicationSlot(PGconn *conn, const
*** 436,448 ****


  /*
!  * Frontend version of GetCurrentTimestamp(), since we are not linked with
!  * backend code. The replication protocol always uses integer timestamps,
   * regardless of the server setting.
   */
! int64
  feGetCurrentTimestamp(void)
  {
      int64        result;
      struct timeval tp;

--- 436,449 ----


  /*
!  * Frontend version of GetCurrentIntegerTimestamp(), since we are not linked
!  * with backend code. The replication protocol always uses integer timestamps,
   * regardless of the server setting.
   */
! IntegerTimestamp
  feGetCurrentTimestamp(void)
  {
+     IntegerTimestamp ts;
      int64        result;
      struct timeval tp;

*************** feGetCurrentTimestamp(void)
*** 453,459 ****

      result = (result * USECS_PER_SEC) + tp.tv_usec;

!     return result;
  }

  /*
--- 454,461 ----

      result = (result * USECS_PER_SEC) + tp.tv_usec;

!     ts.integer_ts = result;
!     return ts;
  }

  /*
*************** feGetCurrentTimestamp(void)
*** 461,470 ****
   * backend code.
   */
  void
! feTimestampDifference(int64 start_time, int64 stop_time,
                        long *secs, int *microsecs)
  {
!     int64        diff = stop_time - start_time;

      if (diff <= 0)
      {
--- 463,472 ----
   * backend code.
   */
  void
! feTimestampDifference(IntegerTimestamp start_time, IntegerTimestamp stop_time,
                        long *secs, int *microsecs)
  {
!     int64        diff = stop_time.integer_ts - start_time.integer_ts;

      if (diff <= 0)
      {
*************** feTimestampDifference(int64 start_time,
*** 483,493 ****
   * linked with backend code.
   */
  bool
! feTimestampDifferenceExceeds(int64 start_time,
!                              int64 stop_time,
                               int msec)
  {
!     int64        diff = stop_time - start_time;

      return (diff >= msec * INT64CONST(1000));
  }
--- 485,495 ----
   * linked with backend code.
   */
  bool
! feTimestampDifferenceExceeds(IntegerTimestamp start_time,
!                              IntegerTimestamp stop_time,
                               int msec)
  {
!     int64        diff = stop_time.integer_ts - start_time.integer_ts;

      return (diff >= msec * INT64CONST(1000));
  }
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 663bfac..7c1a125 100644
*** a/src/bin/pg_basebackup/streamutil.h
--- b/src/bin/pg_basebackup/streamutil.h
*************** extern bool RunIdentifySystem(PGconn *co
*** 38,48 ****
                    TimeLineID *starttli,
                    XLogRecPtr *startpos,
                    char **db_name);
! extern int64 feGetCurrentTimestamp(void);
! extern void feTimestampDifference(int64 start_time, int64 stop_time,
                        long *secs, int *microsecs);

! extern bool feTimestampDifferenceExceeds(int64 start_time, int64 stop_time,
                               int msec);
  extern void fe_sendint64(int64 i, char *buf);
  extern int64 fe_recvint64(char *buf);
--- 38,50 ----
                    TimeLineID *starttli,
                    XLogRecPtr *startpos,
                    char **db_name);
! extern IntegerTimestamp feGetCurrentTimestamp(void);
! extern void feTimestampDifference(IntegerTimestamp start_time,
!                       IntegerTimestamp stop_time,
                        long *secs, int *microsecs);

! extern bool feTimestampDifferenceExceeds(IntegerTimestamp start_time,
!                              IntegerTimestamp stop_time,
                               int msec);
  extern void fe_sendint64(int64 i, char *buf);
  extern int64 fe_recvint64(char *buf);
diff --git a/src/include/c.h b/src/include/c.h
index 91e5baa..a7c0774 100644
*** a/src/include/c.h
--- b/src/include/c.h
*************** typedef unsigned PG_INT128_TYPE uint128;
*** 346,351 ****
--- 346,362 ----
  #endif

  /*
+  * In some places we prefer to work with timestamps that are integers
+  * regardless of the value of HAVE_INT64_TIMESTAMP.  To avoid coding errors
+  * from confusing such timestamps with Timestamp/TimestampTz, we use this
+  * datatype:
+  */
+ typedef struct IntegerTimestamp
+ {
+     int64        integer_ts;
+ } IntegerTimestamp;
+
+ /*
   * Size
   *        Size of any memory resident object, as returned by sizeof.
   */
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index 2618cc4..fb345a0 100644
*** a/src/include/utils/snapmgr.h
--- b/src/include/utils/snapmgr.h
*************** extern PGDLLIMPORT int old_snapshot_thre
*** 51,58 ****

  extern Size SnapMgrShmemSize(void);
  extern void SnapMgrInit(void);
! extern int64 GetSnapshotCurrentTimestamp(void);
! extern int64 GetOldSnapshotThresholdTimestamp(void);

  extern bool FirstSnapshotSet;

--- 51,58 ----

  extern Size SnapMgrShmemSize(void);
  extern void SnapMgrInit(void);
! extern IntegerTimestamp GetSnapshotCurrentTimestamp(void);
! extern IntegerTimestamp GetOldSnapshotThresholdTimestamp(void);

  extern bool FirstSnapshotSet;

*************** extern void DeleteAllExportedSnapshotFil
*** 93,99 ****
  extern bool ThereAreNoPriorRegisteredSnapshots(void);
  extern TransactionId TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
                                      Relation relation);
! extern void MaintainOldSnapshotTimeMapping(int64 whenTaken, TransactionId xmin);

  extern char *ExportSnapshot(Snapshot snapshot);

--- 93,99 ----
  extern bool ThereAreNoPriorRegisteredSnapshots(void);
  extern TransactionId TransactionIdLimitedForOldSnapshots(TransactionId recentXmin,
                                      Relation relation);
! extern void MaintainOldSnapshotTimeMapping(IntegerTimestamp whenTaken, TransactionId xmin);

  extern char *ExportSnapshot(Snapshot snapshot);

diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h
index ebd5d5a..66d554b 100644
*** a/src/include/utils/snapshot.h
--- b/src/include/utils/snapshot.h
*************** typedef struct SnapshotData
*** 107,113 ****
      uint32        regd_count;        /* refcount on RegisteredSnapshots */
      pairingheap_node ph_node;    /* link in the RegisteredSnapshots heap */

!     int64        whenTaken;        /* timestamp when snapshot was taken */
      XLogRecPtr    lsn;            /* position in the WAL stream when taken */
  } SnapshotData;

--- 107,113 ----
      uint32        regd_count;        /* refcount on RegisteredSnapshots */
      pairingheap_node ph_node;    /* link in the RegisteredSnapshots heap */

!     IntegerTimestamp whenTaken; /* timestamp when snapshot was taken */
      XLogRecPtr    lsn;            /* position in the WAL stream when taken */
  } SnapshotData;

diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index 21651b1..ae9eb1c 100644
*** a/src/include/utils/timestamp.h
--- b/src/include/utils/timestamp.h
*************** extern bool TimestampDifferenceExceeds(T
*** 105,116 ****
   * Prototypes for functions to deal with integer timestamps, when the native
   * format is float timestamps.
   */
  #ifndef HAVE_INT64_TIMESTAMP
! extern int64 GetCurrentIntegerTimestamp(void);
! extern TimestampTz IntegerTimestampToTimestampTz(int64 timestamp);
  #else
! #define GetCurrentIntegerTimestamp()    GetCurrentTimestamp()
! #define IntegerTimestampToTimestampTz(timestamp) (timestamp)
  #endif

  extern TimestampTz time_t_to_timestamptz(pg_time_t tm);
--- 105,118 ----
   * Prototypes for functions to deal with integer timestamps, when the native
   * format is float timestamps.
   */
+ extern IntegerTimestamp GetCurrentIntegerTimestamp(void);
  #ifndef HAVE_INT64_TIMESTAMP
! extern TimestampTz Int64ToTimestampTz(int64 timestamp);
! extern TimestampTz IntegerTimestampToTimestampTz(IntegerTimestamp timestamp);
  #else
! //#define GetCurrentIntegerTimestamp()    GetCurrentTimestamp()
! #define Int64ToTimestampTz(timestamp) (timestamp)
! #define IntegerTimestampToTimestampTz(timestamp) ((timestamp).integer_ts)
  #endif

  extern TimestampTz time_t_to_timestamptz(pg_time_t tm);

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers