SVN Commit by dpage: r4229 - in trunk/pgadmin3/xtra/pgagent: . include
От | svn@pgadmin.org |
---|---|
Тема | SVN Commit by dpage: r4229 - in trunk/pgadmin3/xtra/pgagent: . include |
Дата | |
Msg-id | 200505202149.j4KLnNhW028277@developer.pgadmin.org обсуждение исходный текст |
Список | pgadmin-hackers |
Author: dpage Date: 2005-05-20 22:49:23 +0100 (Fri, 20 May 2005) New Revision: 4229 Modified: trunk/pgadmin3/xtra/pgagent/connection.cpp trunk/pgadmin3/xtra/pgagent/include/job.h trunk/pgadmin3/xtra/pgagent/job.cpp trunk/pgadmin3/xtra/pgagent/pgAgent.cpp Log: Add threading support to pgAgent to allow multiple jobs to start in the same timeslot. Seems to work OK with no memory leaks or sync problems, however more testing is required. Also need to add code to keep track of threads that are created so they can be killed if the app terminates. Modified: trunk/pgadmin3/xtra/pgagent/connection.cpp =================================================================== --- trunk/pgadmin3/xtra/pgagent/connection.cpp 2005-05-20 15:44:01 UTC (rev 4228) +++ trunk/pgadmin3/xtra/pgagent/connection.cpp 2005-05-20 21:49:23 UTC (rev 4229) @@ -13,6 +13,7 @@ DBconn *DBconn::primaryConn; wxString DBconn::basicConnectString; +static wxMutex s_PoolLock; DBconn::DBconn(const wxString &name) { @@ -64,6 +65,8 @@ DBconn *DBconn::InitConnection(const wxString &connectString) { + wxMutexLocker lock(s_PoolLock); + basicConnectString=connectString; wxString dbname; @@ -96,6 +99,8 @@ DBconn *DBconn::Get(const wxString &dbname) { + wxMutexLocker lock(s_PoolLock); + DBconn *thisConn = primaryConn, *testConn; // find an existing connection @@ -132,50 +137,77 @@ void DBconn::Return() { + wxMutexLocker lock(s_PoolLock); + LogMessage(_("Returning connection to database ") + this->dbname, LOG_DEBUG); inUse = false; } void DBconn::ClearConnections(bool all) { + wxMutexLocker lock(s_PoolLock); + if (all) LogMessage(_("Clearing all connections"), LOG_DEBUG); else LogMessage(_("Clearing inactive connections"), LOG_DEBUG); DBconn *thisConn=primaryConn, *deleteConn; + int total=0, free=0, deleted=0; - // Find the last connection - while (thisConn->next != 0) - thisConn = thisConn->next; + if (thisConn) + { - // Delete connections as required - // If a connection is not in use, delete it, and reset the next and 
previous - // pointers appropriately. If it is in use, don't touch it. - while (thisConn->prev != 0) - { - if ((!thisConn->inUse) || all) + total++; + + // Find the last connection + while (thisConn->next != 0) { - deleteConn = thisConn; + total++; + + if (!thisConn->inUse) + free++; - thisConn = deleteConn->prev; - - thisConn->next = deleteConn->next; - - if (deleteConn->next) - deleteConn->next->prev = deleteConn->prev; - - delete deleteConn; + thisConn = thisConn->next; } - else + if (!thisConn->inUse) + free++; + + // Delete connections as required + // If a connection is not in use, delete it, and reset the next and previous + // pointers appropriately. If it is in use, don't touch it. + while (thisConn->prev != 0) { - thisConn = thisConn->prev; + if ((!thisConn->inUse) || all) + { + deleteConn = thisConn; + thisConn = deleteConn->prev; + thisConn->next = deleteConn->next; + if (deleteConn->next) + deleteConn->next->prev = deleteConn->prev; + delete deleteConn; + deleted++; + } + else + { + thisConn = thisConn->prev; + } } + + if (all) + { + delete thisConn; + deleted++; + } + + wxString tmp; + tmp.Printf(_("Connection stats: total - %d, free - %d, deleted - %d"), total, free, deleted); + LogMessage(tmp, LOG_DEBUG); + } + else + LogMessage(_("No connections found!"), LOG_DEBUG); - if (all) - delete thisConn; - } Modified: trunk/pgadmin3/xtra/pgagent/include/job.h =================================================================== --- trunk/pgadmin3/xtra/pgagent/include/job.h 2005-05-20 15:44:01 UTC (rev 4228) +++ trunk/pgadmin3/xtra/pgagent/include/job.h 2005-05-20 21:49:23 UTC (rev 4229) @@ -37,11 +37,14 @@ public: JobThread(const wxString &jid); ~JobThread(); + bool Runnable() { return runnable; } virtual void *Entry(); private: wxString jobid; + bool runnable; + Job *job; }; #endif // JOB_H Modified: trunk/pgadmin3/xtra/pgagent/job.cpp =================================================================== --- trunk/pgadmin3/xtra/pgagent/job.cpp 2005-05-20 
15:44:01 UTC (rev 4228) +++ trunk/pgadmin3/xtra/pgagent/job.cpp 2005-05-20 21:49:23 UTC (rev 4229) @@ -11,12 +11,16 @@ #include "pgAgent.h" +wxSemaphore *getDb; + Job::Job(DBconn *conn, const wxString &jid) { threadConn=conn; jobid=jid; status=wxT(""); + LogMessage(_("Starting job: ") + jobid, LOG_DEBUG); + int rc=threadConn->ExecuteVoid( wxT("UPDATE pgagent.pga_job SET jobagentid=") + backendPid + wxT(", joblastrun=now() ") wxT(" WHERE jobagentid IS NULL AND jobid=") + jobid); @@ -58,6 +62,8 @@ ); } threadConn->Return(); + + LogMessage(_("Completed job: ") + jobid, LOG_DEBUG); } @@ -117,7 +123,7 @@ stepConn=DBconn::Get(steps->GetString(wxT("jstdbname"))); if (stepConn) { - LogMessage(_("Executing step ") + stepid + _(" on database ") + steps->GetString(wxT("jstdbname")),LOG_DEBUG); + LogMessage(_("Executing step ") + stepid + _(" (part of job ") + jobid + wxT(")"), LOG_DEBUG); rc=stepConn->ExecuteVoid(steps->GetString(wxT("jstcode"))); stepConn->Return(); } @@ -168,7 +174,16 @@ : wxThread(wxTHREAD_DETACHED) { LogMessage(_("Creating job thread for job ") + jid, LOG_DEBUG); + + runnable = false; jobid = jid; + + DBconn *threadConn=DBconn::Get(serviceDBname); + job = new Job(threadConn, jobid); + + if (job->Runnable()) + runnable = true; + } @@ -180,7 +195,11 @@ void *JobThread::Entry() { - LogMessage(_("Running job thread for job ") + jobid, LOG_DEBUG); + if (runnable) + { + job->Execute(); + delete job; + } return(NULL); } \ No newline at end of file Modified: trunk/pgadmin3/xtra/pgagent/pgAgent.cpp =================================================================== --- trunk/pgadmin3/xtra/pgagent/pgAgent.cpp 2005-05-20 15:44:01 UTC (rev 4228) +++ trunk/pgadmin3/xtra/pgagent/pgAgent.cpp 2005-05-20 21:49:23 UTC (rev 4229) @@ -89,32 +89,25 @@ if (res) { - wxString jobid=res->GetString(wxT("jobid")); - delete res; + while(res->HasData()) + { + wxString jobid=res->GetString(wxT("jobid")); - if (jobid != wxT("")) - { - DBconn *threadConn=DBconn::Get(serviceDBname); - 
Job job(threadConn, jobid); + JobThread *jt = new JobThread(jobid); + + if (jt->Runnable()) + { + jt->Create(); + jt->Run(); + foundJobToExecute = true; + } + res->MoveNext(); - if (job.Runnable()) - { - foundJobToExecute=true; - LogMessage(_("Running job: ") + jobid, LOG_DEBUG); - - // JobThread *jt = new JobThread(jobid); - // jt->Run(); - // jt->Wait(); - - job.Execute(); - LogMessage(_("Completed job: ") + jobid, LOG_DEBUG); - } - } - else - { - LogMessage(_("No jobs to run - sleeping..."), LOG_DEBUG); - WaitAWhile(); - } + } + + delete res; + LogMessage(_("Sleeping..."), LOG_DEBUG); + WaitAWhile(); } else {
В списке pgadmin-hackers по дате отправления: