Обсуждение: Background worker with Listen

Поиск
Список
Период
Сортировка

Background worker with Listen

От
Ihnat Peter | TSS Group a.s.
Дата:

I am trying to create background worker which listens to notifications and do some work after receiving one.

I got 2 problems:

-          Worker is receiving notifications from every channel not only the registered channel (in my case “foo”)

-          Notifications are not logged in the server log – I cannot store the payloads for further work

Any help is welcomed.

 

Here is the code:

PG_MODULE_MAGIC;

 

void _PG_init(void);

void _PG_fini(void);

 

static volatile sig_atomic_t got_sigterm = false;

static volatile sig_atomic_t got_sigusr1 = false;

static char *notify_database = NULL;

static emit_log_hook_type prev_log_hook = NULL;

                                                

static void

bgw_sigterm(SIGNAL_ARGS)

{

                int save_errno = errno;

                got_sigterm = true;

                if (MyProc)

                                SetLatch(&MyProc->procLatch);

                errno = save_errno;

}

 

static void

bgw_sigusr1(SIGNAL_ARGS)

{

                int save_errno = errno;

                got_sigusr1 = true;

                if (MyProc)

                                SetLatch(&MyProc->procLatch);

               errno = save_errno;

}     

 

static void

notify_main(Datum main_arg)

{

                pqsignal(SIGTERM, bgw_sigterm);

                pqsignal(SIGUSR1, bgw_sigusr1);

 

                BackgroundWorkerUnblockSignals();

                BackgroundWorkerInitializeConnection(notify_database, NULL);

                 EnableNotifyInterrupt();

 

                pgstat_report_activity(STATE_RUNNING, "background_worker");

                StartTransactionCommand();

                Async_Listen("foo");

                CommitTransactionCommand();

                pgstat_report_activity(STATE_IDLE, NULL);

  

                while (!got_sigterm)

                {

                                int           rc;

 

                                rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 10000L);

                                ResetLatch(&MyProc->procLatch);

 

                                if (rc & WL_POSTMASTER_DEATH)

                                                proc_exit(1);

      

                                if (got_sigusr1)

                                {

                                                got_sigusr1 = false;

                                                elog(INFO, " background_worker: notification received");

                                                // DO SOME WORK WITH STORED NOTIFICATIONS

                                }            

          

                }

 

                elog(LOG, "background_worker: finished");

                proc_exit(0);

}

 

static void

store_notification(ErrorData *edata)

{

                // HERE STORE THE NOTIFICATION FROM SERVER LOG

 

                if (prev_log_hook)

                                (*prev_log_hook) (edata);

}

 

void

_PG_init(void)

{

                BackgroundWorker worker;

                DefineCustomStringVariable("postgres", NULL, NULL, &notify_database,

                                           "postgres",

                                           PGC_POSTMASTER, 0, NULL, NULL, NULL);

 

                MemSet(&worker, 0, sizeof(BackgroundWorker));

                snprintf(worker.bgw_name, BGW_MAXLEN, "background_worker");

                worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;

                worker.bgw_start_time = BgWorkerStart_RecoveryFinished;

                worker.bgw_main = notify_main;

                worker.bgw_restart_time = 10;

                worker.bgw_main_arg = (Datum) 0;

                worker.bgw_notify_pid = 0;

                RegisterBackgroundWorker(&worker);

   

                prev_log_hook = emit_log_hook;

                emit_log_hook = store_notification;

}

 

void

_PG_fini(void)

{

                emit_log_hook = prev_log_hook;

}

Re: Background worker with Listen

От
Peter Kardoš
Дата:
Hi Peter!

The solution to this problem would be also interesting for me. We have application which use sending data to background worker and it's look like the asynchronous notification can be ideal solution. But we do not found solution how to work with this notifications. Worker calculating data and update some tables. Amount of data is too big and therefore we can’t calculate and update tables only in fixed period.
If the notification can’t be used for this, then it is possible to use other method how to send data to worker or wake up worker?


Best regards,
Peter K.


From: Ihnat Peter | TSS Group a(dot)s(dot) <Ihnat(at)TSSGROUP(dot)sk>>
>> I got 2 problems:
>>
>> -          Worker is receiving notifications from every channel not only the registered channel (in my case "foo")
>>
>> -          Notifications are not logged in the server log - I cannot store the payloads for further work
>> Any help is welcomed.
>>
>> Here is the code:
>>
>> PG_MODULE_MAGIC;
>>
>> void _PG_init(void);
>> void _PG_fini(void);
>>
>> static volatile sig_atomic_t got_sigterm = false;
>> static volatile sig_atomic_t got_sigusr1 = false;
>> static char *notify_database = NULL;
>> static emit_log_hook_type prev_log_hook = NULL;
>>
>> static void
>> bgw_sigterm(SIGNAL_ARGS)
>> {
>>                 int save_errno = errno;
>>                 got_sigterm = true;
>>                 if (MyProc)
>>                                 SetLatch(&MyProc->procLatch);
>>                 errno = save_errno;
>> }
>>
>> static void
>> bgw_sigusr1(SIGNAL_ARGS)
>> {
>>                 int save_errno = errno;
>>                 got_sigusr1 = true;
>>                 if (MyProc)
>>                                 SetLatch(&MyProc->procLatch);
>>                errno = save_errno;
>> }
>>
>> static void
>> notify_main(Datum main_arg)
>> {
>>                 pqsignal(SIGTERM, bgw_sigterm);
>>                 pqsignal(SIGUSR1, bgw_sigusr1);
>>
>>                 BackgroundWorkerUnblockSignals();
>>                 BackgroundWorkerInitializeConnection(notify_database, NULL);
>>                  EnableNotifyInterrupt();
>>
>>                 pgstat_report_activity(STATE_RUNNING, "background_worker");
>>                 StartTransactionCommand();
>>                 Async_Listen("foo");
>>                 CommitTransactionCommand();
>>                 pgstat_report_activity(STATE_IDLE, NULL);
>>
>>                 while (!got_sigterm)
>>                 {
>>                                 int           rc;
>>
>>                                 rc = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 10000L);
>>                                 ResetLatch(&MyProc->procLatch);
>>
>>                                 if (rc & WL_POSTMASTER_DEATH)
>>                                                 proc_exit(1);
>>
>>                                 if (got_sigusr1)
>>                                 {
>>                                                 got_sigusr1 = false;
>>                                                 elog(INFO, " background_worker: notification received");
>>                                                 // DO SOME WORK WITH STORED NOTIFICATIONS
>>                                 }
>>
>>                 }
>>
>>                 elog(LOG, "background_worker: finished");
>>                 proc_exit(0);
>> }
>>
>> static void
>> store_notification(ErrorData *edata)
>> {
>>                 // HERE STORE THE NOTIFICATION FROM SERVER LOG
>>
>>                 if (prev_log_hook)
>>                                 (*prev_log_hook) (edata);
>> }
>>
>> void
>> _PG_init(void)
>> {
>>                 BackgroundWorker worker;
>>                 DefineCustomStringVariable("postgres", NULL, NULL, &notify_database,
>>                                            "postgres",
>>                                            PGC_POSTMASTER, 0, NULL, NULL, NULL);
>>
>>                 MemSet(&worker, 0, sizeof(BackgroundWorker));
>>                 snprintf(worker.bgw_name, BGW_MAXLEN, "background_worker");
>>                 worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
>>                 worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
>>                 worker.bgw_main = notify_main;
>>                 worker.bgw_restart_time = 10;
>>                 worker.bgw_main_arg = (Datum) 0;
>>                 worker.bgw_notify_pid = 0;
>>                 RegisterBackgroundWorker(&worker);
>>
>>                 prev_log_hook = emit_log_hook;
>>                 emit_log_hook = store_notification;
>> }
>>
>> void
>> _PG_fini(void)
>> {
>>                 emit_log_hook = prev_log_hook;
>> }


Re: Background worker with Listen

От
Jinhua Luo
Дата:
Why not use libpq in worker? i.e. your worker works just like a pure PG client.

In my project, I uses worker in this way and it works well. I do not
use any backend API to access the database.

2016-04-21 15:51 GMT+08:00 Ihnat Peter | TSS Group a.s. <Ihnat@tssgroup.sk>:
> I am trying to create background worker which listens to notifications and
> do some work after receiving one.
>
> I got 2 problems:
>
> -          Worker is receiving notifications from every channel not only the
> registered channel (in my case “foo”)
>
> -          Notifications are not logged in the server log – I cannot store
> the payloads for further work
>
> Any help is welcomed.
>
>
>
> Here is the code:
>
> PG_MODULE_MAGIC;
>
>
>
> void _PG_init(void);
>
> void _PG_fini(void);
>
>
>
> static volatile sig_atomic_t got_sigterm = false;
>
> static volatile sig_atomic_t got_sigusr1 = false;
>
> static char *notify_database = NULL;
>
> static emit_log_hook_type prev_log_hook = NULL;
>
>
>
> static void
>
> bgw_sigterm(SIGNAL_ARGS)
>
> {
>
>                 int save_errno = errno;
>
>                 got_sigterm = true;
>
>                 if (MyProc)
>
>                                 SetLatch(&MyProc->procLatch);
>
>                 errno = save_errno;
>
> }
>
>
>
> static void
>
> bgw_sigusr1(SIGNAL_ARGS)
>
> {
>
>                 int save_errno = errno;
>
>                 got_sigusr1 = true;
>
>                 if (MyProc)
>
>                                 SetLatch(&MyProc->procLatch);
>
>                errno = save_errno;
>
> }
>
>
>
> static void
>
> notify_main(Datum main_arg)
>
> {
>
>                 pqsignal(SIGTERM, bgw_sigterm);
>
>                 pqsignal(SIGUSR1, bgw_sigusr1);
>
>
>
>                 BackgroundWorkerUnblockSignals();
>
>                 BackgroundWorkerInitializeConnection(notify_database, NULL);
>
>                  EnableNotifyInterrupt();
>
>
>
>                 pgstat_report_activity(STATE_RUNNING, "background_worker");
>
>                 StartTransactionCommand();
>
>                 Async_Listen("foo");
>
>                 CommitTransactionCommand();
>
>                 pgstat_report_activity(STATE_IDLE, NULL);
>
>
>
>                 while (!got_sigterm)
>
>                 {
>
>                                 int           rc;
>
>
>
>                                 rc = WaitLatch(&MyProc->procLatch,
> WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 10000L);
>
>                                 ResetLatch(&MyProc->procLatch);
>
>
>
>                                 if (rc & WL_POSTMASTER_DEATH)
>
>                                                 proc_exit(1);
>
>
>
>                                 if (got_sigusr1)
>
>                                 {
>
>                                                 got_sigusr1 = false;
>
>                                                 elog(INFO, "
> background_worker: notification received");
>
>                                                 // DO SOME WORK WITH STORED
> NOTIFICATIONS
>
>                                 }
>
>
>
>                 }
>
>
>
>                 elog(LOG, "background_worker: finished");
>
>                 proc_exit(0);
>
> }
>
>
>
> static void
>
> store_notification(ErrorData *edata)
>
> {
>
>                 // HERE STORE THE NOTIFICATION FROM SERVER LOG
>
>
>
>                 if (prev_log_hook)
>
>                                 (*prev_log_hook) (edata);
>
> }
>
>
>
> void
>
> _PG_init(void)
>
> {
>
>                 BackgroundWorker worker;
>
>                 DefineCustomStringVariable("postgres", NULL, NULL,
> ¬ify_database,
>
>                                            "postgres",
>
>                                            PGC_POSTMASTER, 0, NULL, NULL,
> NULL);
>
>
>
>                 MemSet(&worker, 0, sizeof(BackgroundWorker));
>
>                 snprintf(worker.bgw_name, BGW_MAXLEN, "background_worker");
>
>                 worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
> BGWORKER_BACKEND_DATABASE_CONNECTION;
>
>                 worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
>
>                 worker.bgw_main = notify_main;
>
>                 worker.bgw_restart_time = 10;
>
>                 worker.bgw_main_arg = (Datum) 0;
>
>                 worker.bgw_notify_pid = 0;
>
>                 RegisterBackgroundWorker(&worker);
>
>
>
>                 prev_log_hook = emit_log_hook;
>
>                 emit_log_hook = store_notification;
>
> }
>
>
>
> void
>
> _PG_fini(void)
>
> {
>
>                 emit_log_hook = prev_log_hook;
>
> }


Re: Background worker with Listen

От
Pavel Stehule
Дата:


2016-04-26 11:17 GMT+02:00 Jinhua Luo <luajit.io@gmail.com>:
Why not use libpq in worker? i.e. your worker works just like a pure PG client.

there must be some overhead from using client API on server side.

Regards

Pavel
 

In my project, I uses worker in this way and it works well. I do not
use any backend API to access the database.

2016-04-21 15:51 GMT+08:00 Ihnat Peter | TSS Group a.s. <Ihnat@tssgroup.sk>:
> I am trying to create background worker which listens to notifications and
> do some work after receiving one.
>
> I got 2 problems:
>
> -          Worker is receiving notifications from every channel not only the
> registered channel (in my case “foo”)
>
> -          Notifications are not logged in the server log – I cannot store
> the payloads for further work
>
> Any help is welcomed.
>
>
>
> Here is the code:
>
> PG_MODULE_MAGIC;
>
>
>
> void _PG_init(void);
>
> void _PG_fini(void);
>
>
>
> static volatile sig_atomic_t got_sigterm = false;
>
> static volatile sig_atomic_t got_sigusr1 = false;
>
> static char *notify_database = NULL;
>
> static emit_log_hook_type prev_log_hook = NULL;
>
>
>
> static void
>
> bgw_sigterm(SIGNAL_ARGS)
>
> {
>
>                 int save_errno = errno;
>
>                 got_sigterm = true;
>
>                 if (MyProc)
>
>                                 SetLatch(&MyProc->procLatch);
>
>                 errno = save_errno;
>
> }
>
>
>
> static void
>
> bgw_sigusr1(SIGNAL_ARGS)
>
> {
>
>                 int save_errno = errno;
>
>                 got_sigusr1 = true;
>
>                 if (MyProc)
>
>                                 SetLatch(&MyProc->procLatch);
>
>                errno = save_errno;
>
> }
>
>
>
> static void
>
> notify_main(Datum main_arg)
>
> {
>
>                 pqsignal(SIGTERM, bgw_sigterm);
>
>                 pqsignal(SIGUSR1, bgw_sigusr1);
>
>
>
>                 BackgroundWorkerUnblockSignals();
>
>                 BackgroundWorkerInitializeConnection(notify_database, NULL);
>
>                  EnableNotifyInterrupt();
>
>
>
>                 pgstat_report_activity(STATE_RUNNING, "background_worker");
>
>                 StartTransactionCommand();
>
>                 Async_Listen("foo");
>
>                 CommitTransactionCommand();
>
>                 pgstat_report_activity(STATE_IDLE, NULL);
>
>
>
>                 while (!got_sigterm)
>
>                 {
>
>                                 int           rc;
>
>
>
>                                 rc = WaitLatch(&MyProc->procLatch,
> WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 10000L);
>
>                                 ResetLatch(&MyProc->procLatch);
>
>
>
>                                 if (rc & WL_POSTMASTER_DEATH)
>
>                                                 proc_exit(1);
>
>
>
>                                 if (got_sigusr1)
>
>                                 {
>
>                                                 got_sigusr1 = false;
>
>                                                 elog(INFO, "
> background_worker: notification received");
>
>                                                 // DO SOME WORK WITH STORED
> NOTIFICATIONS
>
>                                 }
>
>
>
>                 }
>
>
>
>                 elog(LOG, "background_worker: finished");
>
>                 proc_exit(0);
>
> }
>
>
>
> static void
>
> store_notification(ErrorData *edata)
>
> {
>
>                 // HERE STORE THE NOTIFICATION FROM SERVER LOG
>
>
>
>                 if (prev_log_hook)
>
>                                 (*prev_log_hook) (edata);
>
> }
>
>
>
> void
>
> _PG_init(void)
>
> {
>
>                 BackgroundWorker worker;
>
>                 DefineCustomStringVariable("postgres", NULL, NULL,
> &notify_database,
>
>                                            "postgres",
>
>                                            PGC_POSTMASTER, 0, NULL, NULL,
> NULL);
>
>
>
>                 MemSet(&worker, 0, sizeof(BackgroundWorker));
>
>                 snprintf(worker.bgw_name, BGW_MAXLEN, "background_worker");
>
>                 worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
> BGWORKER_BACKEND_DATABASE_CONNECTION;
>
>                 worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
>
>                 worker.bgw_main = notify_main;
>
>                 worker.bgw_restart_time = 10;
>
>                 worker.bgw_main_arg = (Datum) 0;
>
>                 worker.bgw_notify_pid = 0;
>
>                 RegisterBackgroundWorker(&worker);
>
>
>
>                 prev_log_hook = emit_log_hook;
>
>                 emit_log_hook = store_notification;
>
> }
>
>
>
> void
>
> _PG_fini(void)
>
> {
>
>                 emit_log_hook = prev_log_hook;
>
> }


--
Sent via pgsql-general mailing list (pgsql-general@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-general

Re: Background worker with Listen

От
Ihnat Peter | TSS Group a.s.
Дата:

I don’t want to create client to do some work in the DB if the DB can do it itself on server side much faster.

The situation is that some data are inserted in to many tables and if these data need to be post processed a notification is generated. Then the worker needs to wake up and do its stuff.

Maybe this can be done in a different way than listening for notifications?

 

From: Pavel Stehule [mailto:pavel.stehule@gmail.com]
Sent: Tuesday, April 26, 2016 11:22 AM
To: Jinhua Luo
Cc: Ihnat Peter | TSS Group a.s.; pgsql-general@postgresql.org
Subject: Re: [GENERAL] Background worker with Listen

 

 

 

2016-04-26 11:17 GMT+02:00 Jinhua Luo <luajit.io@gmail.com>:

Why not use libpq in worker? i.e. your worker works just like a pure PG client.

 

there must be some overhead from using client API on server side.

Regards

Pavel

 


In my project, I uses worker in this way and it works well. I do not
use any backend API to access the database.

2016-04-21 15:51 GMT+08:00 Ihnat Peter | TSS Group a.s. <Ihnat@tssgroup.sk>:
> I am trying to create background worker which listens to notifications and
> do some work after receiving one.
>
> I got 2 problems:
>
> -          Worker is receiving notifications from every channel not only the
> registered channel (in my case “foo”)
>
> -          Notifications are not logged in the server log – I cannot store

> the payloads for further work
>
> Any help is welcomed.
>
>
>
> Here is the code:
>
> PG_MODULE_MAGIC;
>
>
>
> void _PG_init(void);
>
> void _PG_fini(void);
>
>
>
> static volatile sig_atomic_t got_sigterm = false;
>
> static volatile sig_atomic_t got_sigusr1 = false;
>
> static char *notify_database = NULL;
>
> static emit_log_hook_type prev_log_hook = NULL;
>
>
>
> static void
>
> bgw_sigterm(SIGNAL_ARGS)
>
> {
>
>                 int save_errno = errno;
>
>                 got_sigterm = true;
>
>                 if (MyProc)
>
>                                 SetLatch(&MyProc->procLatch);
>
>                 errno = save_errno;
>
> }
>
>
>
> static void
>
> bgw_sigusr1(SIGNAL_ARGS)
>
> {
>
>                 int save_errno = errno;
>
>                 got_sigusr1 = true;
>
>                 if (MyProc)
>
>                                 SetLatch(&MyProc->procLatch);
>
>                errno = save_errno;
>
> }
>
>
>
> static void
>
> notify_main(Datum main_arg)
>
> {
>
>                 pqsignal(SIGTERM, bgw_sigterm);
>
>                 pqsignal(SIGUSR1, bgw_sigusr1);
>
>
>
>                 BackgroundWorkerUnblockSignals();
>
>                 BackgroundWorkerInitializeConnection(notify_database, NULL);
>
>                  EnableNotifyInterrupt();
>
>
>
>                 pgstat_report_activity(STATE_RUNNING, "background_worker");
>
>                 StartTransactionCommand();
>
>                 Async_Listen("foo");
>
>                 CommitTransactionCommand();
>
>                 pgstat_report_activity(STATE_IDLE, NULL);
>
>
>
>                 while (!got_sigterm)
>
>                 {
>
>                                 int           rc;
>
>
>
>                                 rc = WaitLatch(&MyProc->procLatch,
> WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 10000L);
>
>                                 ResetLatch(&MyProc->procLatch);
>
>
>
>                                 if (rc & WL_POSTMASTER_DEATH)
>
>                                                 proc_exit(1);
>
>
>
>                                 if (got_sigusr1)
>
>                                 {
>
>                                                 got_sigusr1 = false;
>
>                                                 elog(INFO, "
> background_worker: notification received");
>
>                                                 // DO SOME WORK WITH STORED
> NOTIFICATIONS
>
>                                 }
>
>
>
>                 }
>
>
>
>                 elog(LOG, "background_worker: finished");
>
>                 proc_exit(0);
>
> }
>
>
>
> static void
>
> store_notification(ErrorData *edata)
>
> {
>
>                 // HERE STORE THE NOTIFICATION FROM SERVER LOG
>
>
>
>                 if (prev_log_hook)
>
>                                 (*prev_log_hook) (edata);
>
> }
>
>
>
> void
>
> _PG_init(void)
>
> {
>
>                 BackgroundWorker worker;
>
>                 DefineCustomStringVariable("postgres", NULL, NULL,
> &notify_database,
>
>                                            "postgres",
>
>                                            PGC_POSTMASTER, 0, NULL, NULL,
> NULL);
>
>
>
>                 MemSet(&worker, 0, sizeof(BackgroundWorker));
>
>                 snprintf(worker.bgw_name, BGW_MAXLEN, "background_worker");
>
>                 worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
> BGWORKER_BACKEND_DATABASE_CONNECTION;
>
>                 worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
>
>                 worker.bgw_main = notify_main;
>
>                 worker.bgw_restart_time = 10;
>
>                 worker.bgw_main_arg = (Datum) 0;
>
>                 worker.bgw_notify_pid = 0;
>
>                 RegisterBackgroundWorker(&worker);
>
>
>
>                 prev_log_hook = emit_log_hook;
>
>                 emit_log_hook = store_notification;
>
> }
>
>
>
> void
>
> _PG_fini(void)
>
> {
>
>                 emit_log_hook = prev_log_hook;
>
> }

--
Sent via pgsql-general mailing list (pgsql-general@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-general

 

Re: Background worker with Listen

От
John R Pierce
Дата:
On 4/27/2016 1:40 AM, Ihnat Peter | TSS Group a.s. wrote:

I don’t want to create client to do some work in the DB if the DB can do it itself on server side much faster.

The situation is that some data are inserted in to many tables and if these data need to be post processed a notification is generated. Then the worker needs to wake up and do its stuff.

Maybe this can be done in a different way than listening for notifications?


that could be done with a trigger.



-- 
john r pierce, recycling bits in santa cruz

Re: Background worker with Listen

От
Ihnat Peter | TSS Group a.s.
Дата:

I don’t want to create client to do some work in the DB if the DB can do it itself on server side much faster.

The situation is that some data are inserted in to many tables and if these data need to be post processed a notification is generated. Then the worker needs to wake up and do its stuff.

Maybe this can be done in a different way than listening for notifications?

 

 

From: Pavel Stehule [mailto:pavel.stehule@gmail.com]
Sent: Tuesday, April 26, 2016 11:22 AM
To: Jinhua Luo
Cc: Ihnat Peter | TSS Group a.s.; pgsql-general@postgresql.org
Subject: Re: [GENERAL] Background worker with Listen

 

 

 

2016-04-26 11:17 GMT+02:00 Jinhua Luo <luajit.io@gmail.com>:

Why not use libpq in worker? i.e. your worker works just like a pure PG client.

 

there must be some overhead from using client API on server side.

Regards

Pavel

 


In my project, I uses worker in this way and it works well. I do not
use any backend API to access the database.

2016-04-21 15:51 GMT+08:00 Ihnat Peter | TSS Group a.s. <Ihnat@tssgroup.sk>:
> I am trying to create background worker which listens to notifications and
> do some work after receiving one.
>
> I got 2 problems:
>
> -          Worker is receiving notifications from every channel not only the
> registered channel (in my case “foo”)
>
> -          Notifications are not logged in the server log – I cannot store

> the payloads for further work
>
> Any help is welcomed.
>
>
>
> Here is the code:
>
> PG_MODULE_MAGIC;
>
>
>
> void _PG_init(void);
>
> void _PG_fini(void);
>
>
>
> static volatile sig_atomic_t got_sigterm = false;
>
> static volatile sig_atomic_t got_sigusr1 = false;
>
> static char *notify_database = NULL;
>
> static emit_log_hook_type prev_log_hook = NULL;
>
>
>
> static void
>
> bgw_sigterm(SIGNAL_ARGS)
>
> {
>
>                 int save_errno = errno;
>
>                 got_sigterm = true;
>
>                 if (MyProc)
>
>                                 SetLatch(&MyProc->procLatch);
>
>                 errno = save_errno;
>
> }
>
>
>
> static void
>
> bgw_sigusr1(SIGNAL_ARGS)
>
> {
>
>                 int save_errno = errno;
>
>                 got_sigusr1 = true;
>
>                 if (MyProc)
>
>                                 SetLatch(&MyProc->procLatch);
>
>                errno = save_errno;
>
> }
>
>
>
> static void
>
> notify_main(Datum main_arg)
>
> {
>
>                 pqsignal(SIGTERM, bgw_sigterm);
>
>                 pqsignal(SIGUSR1, bgw_sigusr1);
>
>
>
>                 BackgroundWorkerUnblockSignals();
>
>                 BackgroundWorkerInitializeConnection(notify_database, NULL);
>
>                  EnableNotifyInterrupt();
>
>
>
>                 pgstat_report_activity(STATE_RUNNING, "background_worker");
>
>                 StartTransactionCommand();
>
>                 Async_Listen("foo");
>
>                 CommitTransactionCommand();
>
>                 pgstat_report_activity(STATE_IDLE, NULL);
>
>
>
>                 while (!got_sigterm)
>
>                 {
>
>                                 int           rc;
>
>
>
>                                 rc = WaitLatch(&MyProc->procLatch,
> WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 10000L);
>
>                                 ResetLatch(&MyProc->procLatch);
>
>
>
>                                 if (rc & WL_POSTMASTER_DEATH)
>
>                                                 proc_exit(1);
>
>
>
>                                 if (got_sigusr1)
>
>                                 {
>
>                                                 got_sigusr1 = false;
>
>                                                 elog(INFO, "
> background_worker: notification received");
>
>                                                 // DO SOME WORK WITH STORED
> NOTIFICATIONS
>
>                                 }
>
>
>
>                 }
>
>
>
>                 elog(LOG, "background_worker: finished");
>
>                 proc_exit(0);
>
> }
>
>
>
> static void
>
> store_notification(ErrorData *edata)
>
> {
>
>                 // HERE STORE THE NOTIFICATION FROM SERVER LOG
>
>
>
>                 if (prev_log_hook)
>
>                                 (*prev_log_hook) (edata);
>
> }
>
>
>
> void
>
> _PG_init(void)
>
> {
>
>                 BackgroundWorker worker;
>
>                 DefineCustomStringVariable("postgres", NULL, NULL,
> &notify_database,
>
>                                            "postgres",
>
>                                            PGC_POSTMASTER, 0, NULL, NULL,
> NULL);
>
>
>
>                 MemSet(&worker, 0, sizeof(BackgroundWorker));
>
>                 snprintf(worker.bgw_name, BGW_MAXLEN, "background_worker");
>
>                 worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
> BGWORKER_BACKEND_DATABASE_CONNECTION;
>
>                 worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
>
>                 worker.bgw_main = notify_main;
>
>                 worker.bgw_restart_time = 10;
>
>                 worker.bgw_main_arg = (Datum) 0;
>
>                 worker.bgw_notify_pid = 0;
>
>                 RegisterBackgroundWorker(&worker);
>
>
>
>                 prev_log_hook = emit_log_hook;
>
>                 emit_log_hook = store_notification;
>
> }
>
>
>
> void
>
> _PG_fini(void)
>
> {
>
>                 emit_log_hook = prev_log_hook;
>
> }

--
Sent via pgsql-general mailing list (pgsql-general@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-general