Re: logical copy_replication_slot issues
| От | Arseny Sher |
|---|---|
| Тема | Re: logical copy_replication_slot issues |
| Дата | |
| Msg-id | 87imjh7nky.fsf@ars-thinkpad обсуждение исходный текст |
| Ответ на | Re: logical copy_replication_slot issues (Arseny Sher <a.sher@postgrespro.ru>) |
| Ответы |
Re: logical copy_replication_slot issues
|
| Список | pgsql-hackers |
I wrote:
> It looks good to me now.
After this sat in my head for a while, I remembered that
CreateInitDecodingContext pegs not only the LSN but also the xmin, so the
attached patch makes a minor comment correction.
While looking at the nearby code, I found it weird that
GetOldestSafeDecodingTransactionId checks PGXACT->xid, not xmin. I don't
want to investigate this at the moment though, and it is not for this thread.
Also not for this thread, but I've noticed that
pg_copy_logical_replication_slot doesn't allow changing the plugin name,
which is an omission in my view. It would be useful and trivial to do.
-- cheers, arseny
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 2c9d5de6d9..da634bef0e 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -121,7 +121,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
*/
static void
create_logical_replication_slot(char *name, char *plugin,
- bool temporary, XLogRecPtr restart_lsn)
+ bool temporary, XLogRecPtr restart_lsn,
+ bool find_startpoint)
{
LogicalDecodingContext *ctx = NULL;
@@ -139,16 +140,18 @@ create_logical_replication_slot(char *name, char *plugin,
temporary ? RS_TEMPORARY : RS_EPHEMERAL);
/*
- * Create logical decoding context, to build the initial snapshot.
+ * Create logical decoding context to find start point or, if we don't
+ * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
*/
ctx = CreateInitDecodingContext(plugin, NIL,
- false, /* do not build snapshot */
+ false, /* do not build data snapshot */
restart_lsn,
logical_read_local_xlog_page, NULL, NULL,
NULL);
/* build initial snapshot, might take a while */
- DecodingContextFindStartpoint(ctx);
+ if (find_startpoint)
+ DecodingContextFindStartpoint(ctx);
/* don't need the decoding context anymore */
FreeDecodingContext(ctx);
@@ -179,7 +182,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
create_logical_replication_slot(NameStr(*name),
NameStr(*plugin),
temporary,
- InvalidXLogRecPtr);
+ InvalidXLogRecPtr,
+ true);
values[0] = NameGetDatum(&MyReplicationSlot->data.name);
values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
@@ -683,10 +687,19 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
/* Create new slot and acquire it */
if (logical_slot)
+ {
+ /*
+ * WAL required for building snapshot could be removed as we haven't
+ * reserved WAL yet. So we create a new logical replication slot
+ * without building an initial snapshot. A reasonable start point for
+ * decoding will be provided by the source slot.
+ */
create_logical_replication_slot(NameStr(*dst_name),
plugin,
temporary,
- src_restart_lsn);
+ src_restart_lsn,
+ false);
+ }
else
create_physical_replication_slot(NameStr(*dst_name),
true,
@@ -703,6 +716,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
TransactionId copy_xmin;
TransactionId copy_catalog_xmin;
XLogRecPtr copy_restart_lsn;
+ XLogRecPtr copy_confirmed_flush;
bool copy_islogical;
char *copy_name;
@@ -714,6 +728,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
copy_xmin = src->data.xmin;
copy_catalog_xmin = src->data.catalog_xmin;
copy_restart_lsn = src->data.restart_lsn;
+ copy_confirmed_flush = src->data.confirmed_flush;
/* for existence check */
copy_name = pstrdup(NameStr(src->data.name));
@@ -738,6 +753,13 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
NameStr(*src_name)),
errdetail("The source replication slot was modified incompatibly during the copy operation.")));
+ /* The source slot must have a consistent snapshot */
+ if (src_islogical && XLogRecPtrIsInvalid(copy_confirmed_flush))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("cannot copy a logical replication slot that doesn't have confirmed_flush_lsn"),
+ errhint("Retry when the source replication slot creation is finished.")));
+
/* Install copied values again */
SpinLockAcquire(&MyReplicationSlot->mutex);
MyReplicationSlot->effective_xmin = copy_effective_xmin;
@@ -746,6 +768,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
MyReplicationSlot->data.xmin = copy_xmin;
MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
+ MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
SpinLockRelease(&MyReplicationSlot->mutex);
ReplicationSlotMarkDirty();
В списке pgsql-hackers по дате отправления: