diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 5610c07..04d322a 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -494,14 +494,18 @@ HasSubscriptionRelations(Oid subid) /* * Get the relations for the subscription. * - * If rel_type is SUB_REL_KIND_SEQUENCE, get only the sequences. If rel_type is - * SUB_REL_KIND_TABLE, get only the tables. If rel_type is SUB_REL_KIND_ALL, - * get both tables and sequences. + * rel_type: + * If SUB_REL_KIND_SEQUENCE, return only the sequences. + * If SUB_REL_KIND_TABLE, return only the tables. + * If SUB_REL_KIND_ALL, return both tables and sequences. + * + * not_all_relations: * If not_all_relations is true for SUB_REL_KIND_TABLE and SUB_REL_KIND_ALL, * return only the relations that are not in a ready state, otherwise return all * the relations of the subscription. If not_all_relations is true for * SUB_REL_KIND_SEQUENCE, return only the sequences that are in init state, * otherwise return all the sequences of the subscription. + * * The returned list is palloc'ed in the current memory context. */ List * diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index d23901a..2f9ff8b 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -879,11 +879,13 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, * Update the subscription to refresh both the publication and the publication * objects associated with the subscription. * - * If the copy_data parameter is true, the function will set the state - * to "init"; otherwise, it will set the state to "ready". When the - * validate_publications is provided with a publication list, the function - * checks that the specified publications exist on the publisher. If - * refresh_all_sequences is true, it will mark all sequences with "init" state + * If 'copy_data' parameter is true, the function will set the state + * to "init"; otherwise, it will set the state to "ready". + * + * When 'validate_publications' is provided with a publication list, the function + * checks that the specified publications exist on the publisher. + * + * If 'refresh_all_sequences' is true, it will mark all sequences with "init" state * for re-synchronization; otherwise, only the newly added relations and * sequences will be updated based on the copy_data parameter. */ @@ -932,8 +934,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, if (validate_publications) check_publications(wrconn, validate_publications); + /* Get the table list from publisher. */ if (reltype == SUB_REL_KIND_ALL) - /* Get the table list from publisher. */ pubrel_names = fetch_table_list(wrconn, sub->publications); /* Get the sequence list from publisher. */ @@ -1050,9 +1052,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, sub_remove_rels[remove_rel_len++].state = state; /* - * Since one sequence sync workers synchronizes all the - * sequences, stop the worker only if relation kind is not - * sequence. + * A single sequence-sync worker synchronizes all sequences, + * so only stop workers when relation kind is not sequence. */ if (relkind != RELKIND_SEQUENCE) logicalrep_worker_stop(sub->oid, relid); @@ -1088,6 +1089,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, sub->name, get_rel_relkind(relid) == RELKIND_SEQUENCE ? "sequence" : "table"))); } + /* * In case of REFRESH PUBLICATION SEQUENCES, the existing sequences * should be re-synchronized. diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 17759c2..86be218 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -237,7 +237,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, * Walks the workers array and searches for one that matches given * subscription id and relid. * - * We are only interested in the leader apply worker, table sync worker and + * We are only interested in the leader apply worker, table sync worker, or * sequence sync worker. */ LogicalRepWorker * @@ -877,7 +877,7 @@ logicalrep_launcher_onexit(int code, Datum arg) } /* - * Update the failure time for the sequence sync worker in the subscription's + * Update the failure time of the sequence sync worker in the subscription's * apply worker. * * This function is invoked when the sequence sync worker exits due to a @@ -889,6 +889,7 @@ logicalrep_seqsyncworker_failuretime(int code, Datum arg) LogicalRepWorker *worker; LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + worker = logicalrep_apply_worker_find(MyLogicalRepWorker->subid, true); if (worker) worker->sequencesync_failure_time = GetCurrentTimestamp(); diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c index 7b1d071..45782c6 100644 --- a/src/backend/replication/logical/sequencesync.c +++ b/src/backend/replication/logical/sequencesync.c @@ -49,7 +49,7 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid remoteid, XLogRecPtr *lsn) /* * In the event of crash we can lose (skip over) as many values as we - * pre-logged. We might get duplicate values in this kind of scenarios. So + * pre-logged. We might get duplicate values in this kind of scenario. So * use (last_value + log_cnt) to avoid it. */ appendStringInfo(&cmd, "SELECT (last_value + log_cnt), page_lsn " @@ -87,7 +87,7 @@ fetch_remote_sequence_data(WalReceiverConn *conn, Oid remoteid, XLogRecPtr *lsn) * Copy existing data of a sequence from publisher. * * Fetch the sequence value from the publisher and set the subscriber sequence - * withe the retreived value. Caller is responsible for locking the local + * withe the retrieved value. Caller is responsible for locking the local * relation. */ static XLogRecPtr @@ -115,9 +115,9 @@ copy_sequence(WalReceiverConn *conn, Relation rel) " AND c.relname = %s", quote_literal_cstr(nspname), quote_literal_cstr(relname)); + res = walrcv_exec(conn, cmd.data, lengthof(tableRow), tableRow); - if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), @@ -176,8 +176,9 @@ LogicalRepSyncSequences(void) */ #define MAX_SEQUENCES_SYNC_PER_BATCH 100 - /* Get the sequences that should be synchronized. */ StartTransactionCommand(); + + /* Get the sequences that should be synchronized. */ sequences = GetSubscriptionRelations(subid, SUB_REL_KIND_SEQUENCE, true); /* Allocate the tracking info in a permanent memory context. */ @@ -191,7 +192,6 @@ LogicalRepSyncSequences(void) } MemoryContextSwitchTo(oldctx); - CommitTransactionCommand(); /* Is the use of a password mandatory? */ @@ -272,8 +272,8 @@ LogicalRepSyncSequences(void) table_close(sequence_rel, NoLock); /* - * Verify whether the current batch of sequences is synchronized or if - * there are no remaining sequences to synchronize. + * Have we reached the end of the current batch of sequences, + * or last remaining sequences to synchronize? */ if ((((curr_seq + 1) % MAX_SEQUENCES_SYNC_PER_BATCH) == 0) || (curr_seq + 1) == seq_count) @@ -292,6 +292,7 @@ LogicalRepSyncSequences(void) get_subscription_name(subid, false), get_rel_name(done_seq->relid))); } + /* Commit this batch, and prepare for next batch. */ CommitTransactionCommand(); start_txn = true; } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 313e5eb..9f77a78 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -143,6 +143,8 @@ void pg_attribute_noreturn() finish_sync_worker(LogicalRepWorkerType wtype) { + Assert(wtype == WORKERTYPE_TABLESYNC || wtype == WORKERTYPE_SEQUENCESYNC); + /* * Commit any outstanding transaction. This is the usual case, unless * there was nothing to do for the table. @@ -171,7 +173,7 @@ finish_sync_worker(LogicalRepWorkerType wtype) /* Find the leader apply worker and signal it. */ logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid); - /* No need to set the sequence failure time when it is a clean exit */ + /* This is a clean exit, so no need to set a sequence failure time. */ if (wtype == WORKERTYPE_SEQUENCESYNC) cancel_before_shmem_exit(logicalrep_seqsyncworker_failuretime, 0); diff --git a/src/test/subscription/t/034_sequences.pl b/src/test/subscription/t/034_sequences.pl index 8f4871d..ecc17f5 100644 --- a/src/test/subscription/t/034_sequences.pl +++ b/src/test/subscription/t/034_sequences.pl @@ -127,7 +127,6 @@ $result = $node_subscriber->safe_psql( 'postgres', qq( ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION SEQUENCES )); - $node_subscriber->poll_query_until('postgres', $synced_query) or die "Timed out while waiting for subscriber to synchronize data";