Re: [HACKERS] make async slave to wait for lsn to be replayed
| От | Kyotaro Horiguchi |
|---|---|
| Тема | Re: [HACKERS] make async slave to wait for lsn to be replayed |
| Дата | |
| Msg-id | 20210121.173009.235021120161403875.horikyota.ntt@gmail.com обсуждение исходный текст |
| Ответ на | Re: [HACKERS] make async slave to wait for lsn to be replayed (a.pervushina@postgrespro.ru) |
| Ответы |
Re: [HACKERS] make async slave to wait for lsn to be replayed
|
| Список | pgsql-hackers |
Hello.
At Wed, 18 Nov 2020 15:05:00 +0300, a.pervushina@postgrespro.ru wrote in
> I've changed the BEGIN WAIT FOR LSN statement to core functions
> pg_waitlsn, pg_waitlsn_infinite and pg_waitlsn_no_wait.
> Currently the functions work inside repeatable read transactions, but
> waitlsn creates a snapshot if called first in a transaction block,
> which can possibly lead the transaction to working incorrectly, so the
> function gives a warning.
According to the discussion here, implementing this as functions is not
optimal. As a PoC, I made it a procedure. I'm not sure this is the
correct way to implement a native procedure, but it seems to work as
expected.
> Usage examples
> ==========
> select pg_waitlsn('LSN', timeout);
> select pg_waitlsn_infinite('LSN');
> select pg_waitlsn_no_wait('LSN');
The first and second usages are covered by a single procedure. The last
function is equivalent to pg_last_wal_replay_lsn(). As a result, the
following procedure is provided in the attached patch.
pg_waitlsn(wait_lsn pg_lsn, timeout integer DEFAULT -1)
Any opinions, mainly in comparison to implementing this as a command?
regards.
--
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 470e113b33..4283b98eb4 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -42,6 +42,7 @@
#include "catalog/pg_database.h"
#include "commands/progress.h"
#include "commands/tablespace.h"
+#include "commands/wait.h"
#include "common/controldata_utils.h"
#include "executor/instrument.h"
#include "miscadmin.h"
@@ -7463,6 +7464,15 @@ StartupXLOG(void)
break;
}
+ /*
+ * If we replayed an LSN that someone was waiting for,
+ * set latches in shared memory array to notify the waiter.
+ */
+ if (XLogCtl->lastReplayedEndRecPtr >= GetMinWaitedLSN())
+ {
+ WaitSetLatch(XLogCtl->lastReplayedEndRecPtr);
+ }
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogreader, LOG, false);
} while (record != NULL);
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index fa58afd9d7..c19d49e7a4 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1460,6 +1460,10 @@ LANGUAGE internal
STRICT IMMUTABLE PARALLEL SAFE
AS 'unicode_is_normalized';
+CREATE OR REPLACE PROCEDURE
+ pg_waitlsn(wait_lsn pg_lsn, timeout integer DEFAULT -1)
+ LANGUAGE internal AS 'pg_waitlsn';
+
--
-- The default permissions for functions mean that anyone can execute them.
-- A number of functions shouldn't be executable by just anyone, but rather
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index e8504f0ae4..2c0bd41336 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -60,6 +60,7 @@ OBJS = \
user.o \
vacuum.o \
variable.o \
- view.o
+ view.o \
+ wait.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index f9bbe97b50..959e96b7e0 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -23,6 +23,7 @@
#include "access/syncscan.h"
#include "access/twophase.h"
#include "commands/async.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -149,6 +150,7 @@ CreateSharedMemoryAndSemaphores(void)
size = add_size(size, BTreeShmemSize());
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
+ size = add_size(size, WaitShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@@ -268,6 +270,11 @@ CreateSharedMemoryAndSemaphores(void)
SyncScanShmemInit();
AsyncShmemInit();
+ /*
+ * Init array of events for the wait clause in shared memory
+ */
+ WaitShmemInit();
+
#ifdef EXEC_BACKEND
/*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index c87ffc6549..2b4d73ba2f 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -38,6 +38,7 @@
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xact.h"
+#include "commands/wait.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -713,6 +714,9 @@ LockErrorCleanup(void)
AbortStrongLockAcquire();
+ /* If waitlsn was interrupted, then stop waiting for that LSN */
+ DeleteWaitedLSN();
+
/* Nothing to do if we weren't waiting for a lock */
if (lockAwaited == NULL)
{
diff --git a/src/backend/utils/adt/misc.c b/src/backend/utils/adt/misc.c
index 4096faff9a..90876da120 100644
--- a/src/backend/utils/adt/misc.c
+++ b/src/backend/utils/adt/misc.c
@@ -373,8 +373,6 @@ pg_sleep(PG_FUNCTION_ARGS)
* less than the specified time when WaitLatch is terminated early by a
* non-query-canceling signal such as SIGHUP.
*/
-#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
-
endtime = GetNowFloat() + secs;
for (;;)
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index b5f52d4e4a..918eaedfd5 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11375,4 +11375,8 @@
proname => 'is_normalized', prorettype => 'bool', proargtypes => 'text text',
prosrc => 'unicode_is_normalized' },
+{ oid => '9313', descr => 'wait for LSN to be replayed',
+ proname => 'pg_waitlsn', prokind => 'p',prorettype => 'void', proargtypes => 'pg_lsn int4',
+ proargnames => '{wait_lsn,timeout}',
+ prosrc => 'pg_waitlsn' }
]
diff --git a/src/include/utils/timestamp.h b/src/include/utils/timestamp.h
index 63bf71ac61..6c4ecd704d 100644
--- a/src/include/utils/timestamp.h
+++ b/src/include/utils/timestamp.h
@@ -113,4 +113,6 @@ extern int date2isoyearday(int year, int mon, int mday);
extern bool TimestampTimestampTzRequiresRewrite(void);
+#define GetNowFloat() ((float8) GetCurrentTimestamp() / 1000000.0)
+
#endif /* TIMESTAMP_H */
В списке pgsql-hackers по дате отправления: