Re: Multiplexing SUGUSR1

Поиск
Список
Период
Сортировка
От Greg Stark
Тема Re: Multiplexing SUGUSR1
Дата
Msg-id 0791A9FC-DBBF-4CDD-880A-D7B151295A54@enterprisedb.com
обсуждение исходный текст
Ответ на Multiplexing SUGUSR1  (Heikki Linnakangas <heikki.linnakangas@enterprisedb.com>)
Ответы Re: Multiplexing SUGUSR1  (Heikki Linnakangas <heikki.linnakangas@enterprisedb.com>)
Список pgsql-hackers
Does this signal multiplexing solve the "out of signals" problem we  
have generally? I need another signal for the progress indicator too.  
Or is this only useful for other users which need the same locks or  
other resources?

-- 
Greg


On 8 Dec 2008, at 08:04, Heikki Linnakangas <heikki.linnakangas@enterprisedb.com > wrote:

> I've been looking at the signal handling part of the synchronous  
> replication patch. It looks OK, but one thing makes me worry.
>
> To set or clear the flag from PGPROC, to send or handle a signal, we  
> have to acquire ProcArrayLock. Is that safe to do in a signal  
> handler? And is the performance impact of that acceptable?
>
>
> Another observation is that the patch introduces a new function  
> called SendProcSignal. Nothing wrong with that, except that there's  
> an existing function called ProcSendSignal, just above  
> SendProcSignal, so there's some potential for confusion. The old  
> ProcSendSignal function uses the per-backend semaphore to wake up a  
> backend. It's only used to wait for the cleanup lock in bufmgr.c.  
> I'm tempted to remove that altogether, and use the new signal  
> multiplexing for that too, but OTOH if it works, maybe I shouldn't  
> touch it.
>
> Attached is a patch with some minor changes I've made. Mostly  
> cosmetic, but I did modify the sinval code so that ProcState has  
> PGPROC pointer instead of backend pid, so that we don't need to  
> search the ProcArray to find the PGPROC struct of the backend we're  
> signaling.
>
> -- 
>  Heikki Linnakangas
>  EnterpriseDB   http://www.enterprisedb.com
> *** a/src/backend/access/transam/twophase.c --- b/src/backend/access/ 
> transam/twophase.c *************** *** 287,292 ****  
> MarkAsPreparing(TransactionId xid, const char *gid, --- 287,293 ----  
> gxact->proc.databaseId = databaseid; gxact->proc.roleId = owner;  
> gxact->proc.inCommit = false; + gxact->proc.signalFlags = 0;        
> gxact->proc.vacuumFlags = 0; gxact->proc.lwWaiting = false; gxact- 
> >proc.lwExclusive = false; *** a/src/backend/commands/async.c --- b/ 
> src/backend/commands/async.c *************** *** 915,923 ****  
> EnableNotifyInterrupt(void) *     a frontend command. Signal handler  
> execution of inbound notifies *     is disabled until the next  
> EnableNotifyInterrupt call. * ! *     The SIGUSR1 signal handler also  
> needs to call this, so as to ! *     prevent conflicts if one signal  
> interrupts the other. So we ! *     must return the previous state of  
> the flag. */ bool DisableNotifyInterrupt(void) --- 915,924 ---- *     a  
> frontend command. Signal handler execution of inbound notifies *     is  
> disabled until the next EnableNotifyInterrupt call. * ! *     This also  
> needs to be called when SIGUSR1 with ! * PROCSIG_CATCHUP_INTERRUPT  
> is received, so as to prevent conflicts  ! *     if one signal  
> interrupts the other. So we must return the previous ! * state of  
> the flag. */ bool DisableNotifyInterrupt(void) *************** ***  
> 954,960 **** ProcessIncomingNotify(void) nulls[Natts_pg_listener];  
> bool catchup_enabled; ! /* Must prevent SIGUSR1 interrupt while I am  
> running */ catchup_enabled = DisableCatchupInterrupt(); if  
> (Trace_notify) --- 955,961 ---- nulls[Natts_pg_listener]; bool  
> catchup_enabled; ! /* Must prevent catchup interrupt while I am  
> running */ catchup_enabled = DisableCatchupInterrupt(); if  
> (Trace_notify) *** a/src/backend/postmaster/autovacuum.c --- b/src/ 
> backend/postmaster/autovacuum.c *************** *** 1477,1483 ****  
> AutoVacWorkerMain(int argc, char *argv[]) pqsignal(SIGALRM,  
> handle_sig_alarm); pqsignal(SIGPIPE, SIG_IGN); ! pqsignal(SIGUSR1,  
> CatchupInterruptHandler); /* We don't listen for async notifies */  
> pqsignal(SIGUSR2, SIG_IGN); pqsignal(SIGFPE, FloatExceptionHandler);  
> --- 1477,1483 ---- pqsignal(SIGALRM, handle_sig_alarm);  
> pqsignal(SIGPIPE, SIG_IGN); ! pqsignal(SIGUSR1,  
> proc_sigusr1_handler); /* We don't listen for async notifies */  
> pqsignal(SIGUSR2, SIG_IGN); pqsignal(SIGFPE, FloatExceptionHandler);  
> *** a/src/backend/storage/ipc/sinval.c --- b/src/backend/storage/ipc/ 
> sinval.c *************** *** 27,33 **** * need a way to give an idle  
> backend a swift kick in the rear and make * it catch up before the  
> sinval queue overflows and forces it to go * through a cache reset  
> exercise.    This is done by sending SIGUSR1 ! * to any backend that  
> gets too far behind. * * State for catchup events consists of two  
> flags: one saying whether * the signal handler is currently allowed  
> to call ProcessCatchupEvent --- 27,34 ---- * need a way to give an  
> idle backend a swift kick in the rear and make * it catch up before  
> the sinval queue overflows and forces it to go * through a cache  
> reset exercise.    This is done by sending SIGUSR1 ! * with  
> PROCSIG_CATCHUP_INTERRUPT to any backend that gets too far ! *  
> behind. * * State for catchup events consists of two flags: one  
> saying whether * the signal handler is currently allowed to call  
> ProcessCatchupEvent *************** *** 144,152 ****  
> ReceiveSharedInvalidMessages( /* !  * CatchupInterruptHandler * ! *  
> This is the signal handler for SIGUSR1. *    * If we are idle  
> (catchupInterruptEnabled is set), we can safely * invoke  
> ProcessCatchupEvent directly. Otherwise, just set a flag --- 145,154  
> ---- /* ! * HandleCatchupInterrupt * ! * This is called when SIGUSR1  
> with PROCSIG_CATCHUP_INTERRUPT is ! * received. * * If we are idle  
> (catchupInterruptEnabled is set), we can safely * invoke  
> ProcessCatchupEvent directly. Otherwise, just set a flag  
> *************** *** 156,168 **** ReceiveSharedInvalidMessages( *  
> since there's no longer any reason to do anything.) */ void !  
> CatchupInterruptHandler(SIGNAL_ARGS) { - int     save_errno = errno; - / 
> * !      * Note: this is a SIGNAL HANDLER.    You must be very wary what  
> you do !      * here. */ /* Don't joggle the elbow of proc_exit */ ---  
> 158,168 ---- * since there's no longer any reason to do anything.)  
> */ void ! HandleCatchupInterrupt(void) { /* ! * Note: this is called  
> by a SIGNAL HANDLER.     ! * You must be very wary what you do here.  
> */ /* Don't joggle the elbow of proc_exit */ *************** ***  
> 216,223 **** CatchupInterruptHandler(SIGNAL_ARGS) */  
> catchupInterruptOccurred = 1; } - - errno = save_errno; } /* ---  
> 216,221 ---- *************** *** 289,295 ****  
> DisableCatchupInterrupt(void) /* * ProcessCatchupEvent * ! * Respond  
> to a catchup event (SIGUSR1) from another backend. * * This is  
> called either directly from the SIGUSR1 signal handler, * or the  
> next time control reaches the outer idle loop (assuming --- 287,294  
> ---- /* * ProcessCatchupEvent * ! * Respond to a catchup event  
> (SIGUSR1 with PROCSIG_CATCHUP_INTERRUPT) ! * from another backend. *  
> * This is called either directly from the SIGUSR1 signal handler, *  
> or the next time control reaches the outer idle loop (assuming *** a/ 
> src/backend/storage/ipc/sinvaladt.c --- b/src/backend/storage/ipc/ 
> sinvaladt.c *************** *** 21,26 **** --- 21,27 ---- #include  
> "storage/backendid.h" #include "storage/ipc.h" #include "storage/ 
> proc.h" + #include "storage/procarray.h" #include "storage/shmem.h"  
> #include "storage/sinvaladt.h" #include "storage/spin.h"  
> *************** *** 136,144 **** /* Per-backend state in shared  
> invalidation structure */ typedef struct ProcState { ! /* procPid is  
> zero in an inactive ProcState array entry. */ ! pid_t     procPid;     /*  
> PID of backend, for signaling */ ! /* nextMsgNum is meaningless if  
> procPid == 0 or resetState is true. */       int     nextMsgNum;     /* next  
> message number to read */ bool     resetState;     /* backend needs to  
> reset its state */ bool     signaled;     /* backend has been sent catchup  
> signal */ --- 137,145 ---- /* Per-backend state in shared  
> invalidation structure */ typedef struct ProcState { ! /* proc is  
> NULL in an inactive ProcState array entry. */ ! PGPROC     *proc;     /*  
> PGPROC entry of backend, for signaling */ ! /* nextMsgNum is
> meaningless if proc == NULL or resetState is true. */ int  
> nextMsgNum;     /* next message number to read */ bool     resetState;     /*  
> backend needs to reset its state */ bool     signaled;     /* backend has  
> been sent catchup signal */ *************** *** 235,241 ****  
> CreateSharedInvalidationState(void)       /* Mark all backends  
> inactive, and initialize nextLXID */ for (i = 0; i < shmInvalBuffer- 
> >maxBackends; i++)       { ! shmInvalBuffer->procState[i].procPid =  
> 0;     /* inactive */ shmInvalBuffer->procState[i].nextMsgNum = 0;     /*  
> meaningless */ shmInvalBuffer->procState[i].resetState = false;  
> shmInvalBuffer->procState[i].signaled = false; --- 236,242 ---- /*  
> Mark all backends inactive, and initialize nextLXID */ for (i = 0; i  
> < shmInvalBuffer->maxBackends; i++)       { ! shmInvalBuffer- 
> >procState[i].proc = NULL;     /* inactive */ shmInvalBuffer- 
> >procState[i].nextMsgNum = 0;     /* meaningless */ shmInvalBuffer- 
> >procState[i].resetState = false; shmInvalBuffer- 
> >procState[i].signaled = false; *************** *** 266,272 ****  
> SharedInvalBackendInit(void) /* Look for a free entry in the  
> procState array */ for (index = 0; index < segP->lastBackend; index+ 
> +) { ! if (segP->procState[index].procPid == 0)     /* inactive slot?  
> */ { stateP = &segP->procState[index]; break; --- 267,273 ---- /*  
> Look for a free entry in the procState array */ for (index = 0;  
> index < segP->lastBackend; index++) { ! if (segP- 
> >procState[index].proc == NULL)     /* inactive slot? */ { stateP =  
> &segP->procState[index]; break; *************** *** 278,284 ****  
> SharedInvalBackendInit(void) if (segP->lastBackend < segP- 
> >maxBackends) { stateP = &segP->procState[segP->lastBackend]; !  
> Assert(stateP->procPid == 0); segP->lastBackend++; } else ---  
> 279,285 ---- if (segP->lastBackend < segP->maxBackends) { stateP =  
> &segP->procState[segP->lastBackend]; ! Assert(stateP->proc == NULL);  
> segP->lastBackend++; } else *************** *** 303,309 ****  
> SharedInvalBackendInit(void) nextLocalTransactionId = stateP- 
> >nextLXID; /* mark myself active, with all extant messages already  
> read */ ! stateP->procPid = MyProcPid; stateP->nextMsgNum = segP- 
> >maxMsgNum; stateP->resetState = false; stateP->signaled = false;  
> --- 304,310 ---- nextLocalTransactionId = stateP->nextLXID; /* mark  
> myself active, with all extant messages already read */ ! stateP- 
> >proc = MyProc; stateP->nextMsgNum = segP->maxMsgNum; stateP- 
> >resetState = false; stateP->signaled = false; *************** ***  
> 341,347 **** CleanupInvalidationState(int status, Datum arg) stateP- 
> >nextLXID = nextLocalTransactionId; /* Mark myself inactive */ !  
> stateP->procPid = 0; stateP->nextMsgNum = 0; stateP->resetState =  
> false; stateP->signaled = false; --- 342,348 ---- stateP->nextLXID =  
> nextLocalTransactionId;          /* Mark myself inactive */ ! stateP- 
> >proc = NULL; stateP->nextMsgNum = 0; stateP->resetState = false;  
> stateP->signaled = false; *************** *** 349,355 ****  
> CleanupInvalidationState(int status, Datum arg) /* Recompute index  
> of last active backend */ for (i = segP->lastBackend; i > 0; i--)  
> { ! if (segP->procState[i - 1].procPid != 0) break;       } segP- 
> >lastBackend = i; --- 350,356 ---- /* Recompute index of last active  
> backend */ for (i = segP->lastBackend; i > 0; i--) { ! if (segP- 
> >procState[i - 1].proc != NULL) break; } segP->lastBackend = i;  
> *************** *** 374,380 **** BackendIdIsActive(int backendID)  
> { ProcState *stateP = &segP->procState[backendID - 1]; ! result =  
> (stateP->procPid != 0); } else result = false; --- 375,381 ----  
> { ProcState *stateP = &segP->procState[backendID - 1]; ! result =  
> (stateP->proc != NULL); } else result = false; *************** ***  
> 590,596 **** SICleanupQueue(bool callerHasWriteLock, int minFree)  
> int n = stateP->nextMsgNum; /* Ignore if inactive or already in  
> reset state */ ! if (stateP->procPid == 0 || stateP->resetState)  
> continue; /* --- 591,597 ---- int     n = stateP->nextMsgNum; /* Ignore  
> if inactive or already in reset state */ ! if (stateP->proc == NULL  
> || stateP->resetState) continue; /* *************** *** 644,661 ****  
> SICleanupQueue(bool callerHasWriteLock, int minFree) segP- 
> >nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM; / 
> * ! * Lastly, signal anyone who needs a catchup interrupt. Since  
> kill() ! * might not be fast, we don't want to hold locks while  
> executing it. */ if (needSig) { ! pid_t    his_pid = needSig->procPid;  
> needSig->signaled = true; LWLockRelease(SInvalReadLock);  
> LWLockRelease(SInvalWriteLock); ! elog(DEBUG4, "sending sinval  
> catchup signal to PID %d", (int) his_pid); ! kill(his_pid, SIGUSR1);  
> if (callerHasWriteLock) LWLockAcquire(SInvalWriteLock,  
> LW_EXCLUSIVE); } --- 645,664 ---- segP->nextThreshold = (numMsgs /  
> CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM; /* ! * Lastly, signal anyone  
> who needs a catchup interrupt. Since ! * SendProcSignal() might not  
> be fast, we don't want to hold locks while ! * executing it. */ if  
> (needSig) { ! PGPROC *his_proc = needSig->proc; needSig->signaled =  
> true; LWLockRelease(SInvalReadLock);  
> LWLockRelease(SInvalWriteLock); ! elog(DEBUG4, "sending sinval  
> catchup signal to PID %d", ! (int) his_proc->pid); !  
> SendProcSignal(his_proc, PROCSIG_CATCHUP_INTERRUPT); if  
> (callerHasWriteLock) LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); }  
> *** a/src/backend/storage/lmgr/proc.c --- b/src/backend/storage/lmgr/ 
> proc.c *************** *** 289,294 **** InitProcess(void) ---  
> 289,295 ---- MyProc->databaseId = InvalidOid; MyProc->roleId =  
> InvalidOid; MyProc->inCommit = false; + MyProc->signalFlags = 0;  
> MyProc->vacuumFlags = 0; if (IsAutoVacuumWorkerProcess()) MyProc- 
> >vacuumFlags |= PROC_IS_AUTOVACUUM; *************** *** 428,433 ****  
> InitAuxiliaryProcess(void) --- 429,435 ---- MyProc->databaseId =  
> InvalidOid; MyProc->roleId = InvalidOid; MyProc->inCommit = false; +  
> MyProc->signalFlags = 0; /* we don't set the "is autovacuum" flag in  
> the launcher */ MyProc->vacuumFlags = 0; MyProc->lwWaiting = false;  
> *************** *** 1277,1282 **** ProcSendSignal(int pid) ---  
> 1279,1330 ---- PGSemaphoreUnlock(&proc->sem); } + /* + *  
> SendProcSignal - send the signal with the reason to the process + *  
> (such as backend, autovacuum worker and auxiliary process) + *  
> identified by proc. + */ + void + SendProcSignal(PGPROC *proc, uint8  
> reason) + { + int pid; + + if (proc == NULL) + return; + +  
> LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + pid = proc->pid; + if  
> (pid != 0) + proc->signalFlags |= reason; +  
> LWLockRelease(ProcArrayLock); + +     /* Send SIGUSR1 to the process */  
> +     kill(pid, SIGUSR1); + } + + /* + * CheckProcSignal - check to see  
> if the particular reason has been + * signaled, and clear the signal  
> flag. Should be called after + * receiving SIGUSR1. + */ + bool +  
> CheckProcSignal(uint8 reason) + { +     LWLockAcquire(ProcArrayLock,  
> LW_EXCLUSIVE); + + /* Careful here --- don't clear flag if we  
> haven't seen it set */ + if (MyProc->signalFlags & reason) + { +  
> MyProc->signalFlags &= ~reason; + LWLockRelease(ProcArrayLock); +  
> return true; + } + + LWLockRelease(ProcArrayLock); + +     return  
> false; + } + / 
> *** 
> *** 
> *** 
> ********************************************************************  
> * SIGALRM interrupt support *** a/src/backend/tcop/postgres.c --- b/ 
> src/backend/tcop/postgres.c *************** *** 2437,2442 ****  
> drop_unnamed_stmt(void) --- 2437,2464 ---- */ /* + *  
> proc_sigusr1_handler - handle SIGUSR1 signal. + * + * SIGUSR1 is  
> multiplexed to handle multiple different events. The signalFlags + *  
> bitmask in PGPROC indicates which events have been signaled. + */ +  
> void + proc_sigusr1_handler(SIGNAL_ARGS) + { + int     save_errno =  
> errno; + + if (CheckProcSignal(PROCSIG_CATCHUP_INTERRUPT)) + { + /*  
> + * Catchup interrupt has been sent. + */ +
> HandleCatchupInterrupt(); + } + + errno = save_errno; + } + + /* *  
> quickdie() occurs when signalled SIGQUIT by the postmaster. * * Some  
> backend has bought the farm, *************** *** 3180,3186 ****  
> PostgresMain(int argc, char *argv[], const char *username) * of  
> output during who-knows-what operation... */ pqsignal(SIGPIPE,  
> SIG_IGN); ! pqsignal(SIGUSR1, CatchupInterruptHandler);  
> pqsignal(SIGUSR2, NotifyInterruptHandler); pqsignal(SIGFPE,  
> FloatExceptionHandler); --- 3202,3208 ---- * of output during who- 
> knows-what operation... */ pqsignal(SIGPIPE, SIG_IGN); !  
> pqsignal(SIGUSR1, proc_sigusr1_handler); pqsignal(SIGUSR2,  
> NotifyInterruptHandler); pqsignal(SIGFPE, FloatExceptionHandler);  
> *** a/src/include/storage/proc.h --- b/src/include/storage/proc.h  
> *************** *** 38,43 **** struct XidCache --- 38,46 ----  
> TransactionId xids[PGPROC_MAX_CACHED_SUBXIDS]; }; + /* Signals for  
> PGPROC->signalFlags */ + #define PROCSIG_CATCHUP_INTERRUPT    0x01    /*  
> catchup interrupt */ + /* Flags for PGPROC->vacuumFlags */ #define      
> PROC_IS_AUTOVACUUM    0x01    /* is it an autovac worker? */ #define      
> PROC_IN_VACUUM     0x02    /* currently running lazy vacuum */  
> *************** *** 91,96 **** struct PGPROC --- 94,100 ---- bool  
> inCommit;     /* true if within commit critical section */ + uint8      
> signalFlags;    /* bitmask of signals raised, see above */ uint8      
> vacuumFlags;    /* vacuum-related flags, see above */ /* Info about  
> LWLock the process is currently waiting for, if any. */  
> *************** *** 171,176 **** extern void LockWaitCancel(void);  
> --- 175,183 ---- extern void ProcWaitForSignal(void); extern void  
> ProcSendSignal(int pid); + extern void SendProcSignal(PGPROC *proc,  
> uint8 reason); + extern bool CheckProcSignal(uint8 reason); + extern  
> bool enable_sig_alarm(int delayms, bool is_statement_timeout);    
> extern bool disable_sig_alarm(bool is_statement_timeout); extern  
> void handle_sig_alarm(SIGNAL_ARGS); *** a/src/include/storage/ 
> sinval.h --- b/src/include/storage/sinval.h *************** ***  
> 90,96 **** extern void ReceiveSharedInvalidMessages( void  
> (*resetFunction) (void)); /* signal handler for catchup events  
> (SIGUSR1) */ ! extern void CatchupInterruptHandler(SIGNAL_ARGS); /*  
> * enable/disable processing of catchup events directly from signal  
> handler. --- 90,96 ---- void (*resetFunction) (void)); /* signal  
> handler for catchup events (SIGUSR1) */ ! extern void  
> HandleCatchupInterrupt(void); /* * enable/disable processing of  
> catchup events directly from signal handler. *** a/src/include/tcop/ 
> tcopprot.h --- b/src/include/tcop/tcopprot.h *************** ***  
> 56,61 **** extern List *pg_plan_queries(List *querytrees, int  
> cursorOptions, --- 56,62 ---- extern bool assign_max_stack_depth(int  
> newval, bool doit, GucSource source); + extern void  
> proc_sigusr1_handler(SIGNAL_ARGS); extern void die(SIGNAL_ARGS);    
> extern void quickdie(SIGNAL_ARGS); extern void authdie(SIGNAL_ARGS);
>
> -- 
> Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
> To make changes to your subscription:
> http://www.postgresql.org/mailpref/pgsql-hackers


В списке pgsql-hackers по дате отправления:

Предыдущее
От: Andrew Gierth
Дата:
Сообщение: Regexps vs. locale
Следующее
От: Heikki Linnakangas
Дата:
Сообщение: Re: Multiplexing SUGUSR1