Discussion: Allow substitute allocators for PGresult.
Hello. This message proposes a pair of patches that make the memory allocator used for PGresult in libpq replaceable.

The comment at the beginning of pqexpbuffer.c says that libpq should not rely on palloc(). Besides, Tom Lane said that palloc should not be visible outside the backend (*1), and I agree with that.

*1: http://archives.postgresql.org/pgsql-hackers/1999-02/msg00364.php

On the other hand, dblink, dblink-plus (our product!), and maybe FDWs that connect to other PostgreSQL servers seem to copy the query result contained in a PGresult into a tuplestore. I guess this is done to avoid leaking memory when processing is terminated halfway.

But copying a whole PGresult is rather expensive, and the cost grows as the received data gets larger. Furthermore, it requires about twice as much memory as the net size of the data. And it is fruitless to copy and modify libpq or to reinvent it from scratch. So we would be happy to be able to use palloc in libpq, at least for PGresult, in such cases in spite of the policy.

For these reasons, I propose to make the allocators for PGresult replaceable.

The modifications are split into two patches.

1. dupEvents() and pqAddTuple() currently get new memory blocks with malloc, but the acquired blocks are finally linked into the PGresult. So I think it is preferable to use pqResultAlloc() or its descendants, consistent with the nature of the place they are linked to. However, there is no PQresultRealloc() and providing one would be costly, so pqAddTuple() is not modified in this patch.

2. Define three function pointers PQpgresult_(malloc|realloc|free) and replace the calls to malloc/realloc/free in the four functions below with these pointers.

   PQmakeEmptyPGresult()
   pqResultAlloc()
   PQclear()
   pqAddTuple()

These patches make it possible for tools that run in a backend process and use libpq to handle a PGresult as it is, with no copy and no additional memory.
(Of course, someone who wants to use a custom allocator for PGresult in standalone tools could also do that using this feature.)

Three files are attached to this message. The first is the patch for "1" above. The second is the patch for "2" above. The third is a very simple sample program.

I have built and briefly tested this on CentOS6, with the sample program mentioned above and valgrind, but not on Windows.

What do you think about this?

Regards,

--
Kyotaro Horiguchi
NTT Open Source Software Center

diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 113aab0..8e32b18 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -49,7 +49,7 @@ static int static_client_encoding = PG_SQL_ASCII;
 static bool static_std_strings = false;
 
 
-static PGEvent *dupEvents(PGEvent *events, int count);
+static PGEvent *dupEvents(PGresult *res, PGEvent *events, int count);
 static bool PQsendQueryStart(PGconn *conn);
 static int PQsendQueryGuts(PGconn *conn,
 				const char *command,
@@ -186,7 +186,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
 		/* copy events last; result must be valid if we need to PQclear */
 		if (conn->nEvents > 0)
 		{
-			result->events = dupEvents(conn->events, conn->nEvents);
+			result->events = dupEvents(result, conn->events, conn->nEvents);
 			if (!result->events)
 			{
 				PQclear(result);
@@ -337,7 +337,7 @@ PQcopyResult(const PGresult *src, int flags)
 	/* Wants to copy PGEvents? */
 	if ((flags & PG_COPYRES_EVENTS) && src->nEvents > 0)
 	{
-		dest->events = dupEvents(src->events, src->nEvents);
+		dest->events = dupEvents(dest, src->events, src->nEvents);
 		if (!dest->events)
 		{
 			PQclear(dest);
@@ -374,7 +374,7 @@ PQcopyResult(const PGresult *src, int flags)
  * Also, the resultInitialized flags are all cleared.
  */
 static PGEvent *
-dupEvents(PGEvent *events, int count)
+dupEvents(PGresult *res, PGEvent *events, int count)
 {
 	PGEvent    *newEvents;
 	int			i;
@@ -382,7 +382,7 @@ dupEvents(PGEvent *events, int count)
 	if (!events || count <= 0)
 		return NULL;
 
-	newEvents = (PGEvent *) malloc(count * sizeof(PGEvent));
+	newEvents = (PGEvent *) pqResultAlloc(res, count * sizeof(PGEvent), TRUE);
 	if (!newEvents)
 		return NULL;
 
@@ -392,14 +392,9 @@ dupEvents(PGEvent *events, int count)
 		newEvents[i].passThrough = events[i].passThrough;
 		newEvents[i].data = NULL;
 		newEvents[i].resultInitialized = FALSE;
-		newEvents[i].name = strdup(events[i].name);
+		newEvents[i].name = pqResultStrdup(res, events[i].name);
 		if (!newEvents[i].name)
-		{
-			while (--i >= 0)
-				free(newEvents[i].name);
-			free(newEvents);
 			return NULL;
-		}
 	}
 
 	return newEvents;
@@ -661,12 +656,8 @@ PQclear(PGresult *res)
 			(void) res->events[i].proc(PGEVT_RESULTDESTROY, &evt,
 									   res->events[i].passThrough);
 		}
-		free(res->events[i].name);
 	}
 
-	if (res->events)
-		free(res->events);
-
 	/* Free all the subsidiary blocks */
 	while ((block = res->curBlock) != NULL)
 	{

diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..3b26c7c 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,6 @@ PQconnectStartParams      157
 PQping                    158
 PQpingParams              159
 PQlibVersion              160
+PQpgresult_malloc         161
+PQpgresult_realloc        162
+PQpgresult_free           163
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 8e32b18..a574848 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -67,6 +67,15 @@ static int PQsendDescribe(PGconn *conn, char desc_type,
 			   const char *desc_target);
 static int check_field_number(const PGresult *res, int field_num);
 
+/* ---
+ * malloc/realloc/free for PGresult is replaceable for in-backend use
+ * Note that the events having the event id PGEVT_RESULTDESTROY won't
+ * fire when you free the memory blocks for PGresult without
+ * PQclear().
+ */
+void *(*PQpgresult_malloc)(size_t size) = malloc;
+void *(*PQpgresult_realloc)(void *ptr, size_t size) = realloc;
+void (*PQpgresult_free)(void *ptr) = free;
 
 /* ----------------
  * Space management for PGresult.
@@ -138,7 +147,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
 {
 	PGresult   *result;
 
-	result = (PGresult *) malloc(sizeof(PGresult));
+	result = (PGresult *) PQpgresult_malloc(sizeof(PGresult));
 	if (!result)
 		return NULL;
 
@@ -536,7 +545,8 @@ pqResultAlloc(PGresult *res, size_t nBytes, bool isBinary)
 	 */
 	if (nBytes >= PGRESULT_SEP_ALLOC_THRESHOLD)
 	{
-		block = (PGresult_data *) malloc(nBytes + PGRESULT_BLOCK_OVERHEAD);
+		block =
+			(PGresult_data *) PQpgresult_malloc(nBytes + PGRESULT_BLOCK_OVERHEAD);
 		if (!block)
 			return NULL;
 		space = block->space + PGRESULT_BLOCK_OVERHEAD;
@@ -560,7 +570,7 @@ pqResultAlloc(PGresult *res, size_t nBytes, bool isBinary)
 	}
 
 	/* Otherwise, start a new block. */
-	block = (PGresult_data *) malloc(PGRESULT_DATA_BLOCKSIZE);
+	block = (PGresult_data *) PQpgresult_malloc(PGRESULT_DATA_BLOCKSIZE);
 	if (!block)
 		return NULL;
 	block->next = res->curBlock;
@@ -662,12 +672,12 @@ PQclear(PGresult *res)
 	while ((block = res->curBlock) != NULL)
 	{
 		res->curBlock = block->next;
-		free(block);
+		PQpgresult_free(block);
 	}
 
 	/* Free the top-level tuple pointer array */
 	if (res->tuples)
-		free(res->tuples);
+		PQpgresult_free(res->tuples);
 
 	/* zero out the pointer fields to catch programming errors */
 	res->attDescs = NULL;
@@ -679,7 +689,7 @@ PQclear(PGresult *res)
 	/* res->curBlock was zeroed out earlier */
 
 	/* Free the PGresult structure itself */
-	free(res);
+	PQpgresult_free(res);
 }
 
 /*
@@ -844,10 +854,11 @@ pqAddTuple(PGresult *res, PGresAttValue *tup)
 
 		if (res->tuples == NULL)
 			newTuples = (PGresAttValue **)
-				malloc(newSize * sizeof(PGresAttValue *));
+				PQpgresult_malloc(newSize * sizeof(PGresAttValue *));
 		else
 			newTuples = (PGresAttValue **)
-				realloc(res->tuples, newSize * sizeof(PGresAttValue *));
+				PQpgresult_realloc(res->tuples,
+								   newSize * sizeof(PGresAttValue *));
 		if (!newTuples)
 			return FALSE;		/* malloc or realloc failed */
 		res->tupArrSize = newSize;
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index d13a5b9..c958df1 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -226,6 +226,14 @@ typedef struct pgresAttDesc
 } PGresAttDesc;
 
 /* ----------------
+ * malloc/realloc/free for PGresult is replaceable for in-backend use
+ * ----------------
+ */
+extern void *(*PQpgresult_malloc)(size_t size);
+extern void *(*PQpgresult_realloc)(void *ptr, size_t size);
+extern void (*PQpgresult_free)(void *ptr);
+
+/* ----------------
  * Exported functions of libpq
  * ----------------
  */
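The idea behind the second patch, routing every allocation for a result through replaceable function pointers, can be illustrated with a small standalone sketch. This is not libpq code: `result_malloc`, `result_free`, `DemoResult` and the `demo_*` functions are hypothetical stand-ins for `PQpgresult_malloc`, `PQpgresult_free` and `PGresult`, and the counting allocator is only an assumed example of what an application might plug in.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-ins for the proposed PQpgresult_malloc/free pointers. */
static void *(*result_malloc)(size_t size) = malloc;
static void (*result_free)(void *ptr) = free;

/* Number of blocks currently handed out by the counting allocator. */
static size_t live_blocks = 0;

static void *counting_malloc(size_t size)
{
	void *p = malloc(size);

	if (p)
		live_blocks++;
	return p;
}

static void counting_free(void *ptr)
{
	if (ptr)
	{
		assert(live_blocks > 0);
		live_blocks--;
		free(ptr);
	}
}

/* Swap the allocator process-wide, as the patch's globals would. */
void use_counting_allocator(void)
{
	result_malloc = counting_malloc;
	result_free = counting_free;
}

size_t demo_live_blocks(void)
{
	return live_blocks;
}

/* Toy result object: one header plus one subsidiary block. */
typedef struct DemoResult
{
	char *block;
} DemoResult;

DemoResult *demo_result_make(size_t nbytes)
{
	DemoResult *res = result_malloc(sizeof(DemoResult));

	if (!res)
		return NULL;
	res->block = result_malloc(nbytes);
	if (!res->block)
	{
		result_free(res);
		return NULL;
	}
	return res;
}

/* Counterpart of PQclear: frees through the same pointers. */
void demo_result_clear(DemoResult *res)
{
	if (!res)
		return;
	result_free(res->block);
	result_free(res);
}
```

Calling `use_counting_allocator()` before `demo_result_make()` makes both blocks of the toy result go through the counting functions. Because the pointers are process-wide, every other user of the same library in the process would be affected too.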
On 11.11.2011 11:18, Kyotaro HORIGUCHI wrote:
> The comment at the beginning of pqexpbuffer.c says that libpq
> should not rely on palloc(). Besides, Tom Lane said that palloc
> should not be visible outside the backend (*1), and I agree with
> that.
>
> *1: http://archives.postgresql.org/pgsql-hackers/1999-02/msg00364.php
>
> On the other hand, dblink, dblink-plus (our product!), and
> maybe FDWs that connect to other PostgreSQL servers seem to copy
> the query result contained in a PGresult into a tuplestore. I
> guess this is done to avoid leaking memory when processing is
> terminated halfway.
>
> But copying a whole PGresult is rather expensive, and the
> cost grows as the received data gets larger. Furthermore,
> it requires about twice as much memory as the net size of the
> data. And it is fruitless to copy and modify libpq or to reinvent
> it from scratch. So we would be happy to be able to use palloc in
> libpq, at least for PGresult, in such cases in spite of the policy.
>
> For these reasons, I propose to make the allocators for PGresult
> replaceable.

You could use the resource owner mechanism to track them. Register a callback function with RegisterResourceReleaseCallback(). Whenever a PGresult is returned from libpq, add it to e.g. a linked list, kept in TopMemoryContext, and also store a reference to CurrentResourceOwner in the list element. In the callback function, scan through the list and free all the PGresults associated with the resource owner that's being released.

--
Heikki Linnakangas
EnterpriseDB http://www.enterprisedb.com
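The bookkeeping Heikki describes can be sketched outside the backend as follows. This is only a model of the idea: `Owner`, `Result`, `track_result()` and `release_owner()` are hypothetical stand-ins (for a resource owner, a PGresult, the code that records each result, and the release callback, respectively), and plain `malloc` stands in for an allocation in TopMemoryContext.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-ins; NOT the backend's ResourceOwner or libpq's PGresult. */
typedef struct Owner
{
	int id;
} Owner;

typedef struct Result
{
	int cleared;				/* set to 1 once the result has been freed */
} Result;

/* One list element per live result, remembering who owns it. */
typedef struct TrackedResult
{
	Result *res;
	const Owner *owner;
	struct TrackedResult *next;
} TrackedResult;

static TrackedResult *tracked = NULL;

/* Stand-in for PQclear(). */
static void clear_result(Result *res)
{
	res->cleared = 1;
}

/* Call right after a result is obtained; returns 0 on out of memory. */
int track_result(Result *res, const Owner *owner)
{
	TrackedResult *t;

	assert(res != NULL && owner != NULL);
	t = malloc(sizeof(TrackedResult));
	if (!t)
		return 0;
	t->res = res;
	t->owner = owner;
	t->next = tracked;
	tracked = t;
	return 1;
}

/*
 * Body of the release callback: free every result that belongs to the
 * owner being released and unlink it.  Returns the number freed.
 */
int release_owner(const Owner *owner)
{
	TrackedResult **link = &tracked;
	int freed = 0;

	while (*link)
	{
		TrackedResult *t = *link;

		if (t->owner == owner)
		{
			*link = t->next;
			clear_result(t->res);
			free(t);
			freed++;
		}
		else
			link = &t->next;
	}
	return freed;
}
```

In the backend, `release_owner()` would presumably be invoked from the function registered with RegisterResourceReleaseCallback(), and code that calls PQclear() itself would also have to unlink the result from the list so it is not freed twice.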
Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:
> On 11.11.2011 11:18, Kyotaro HORIGUCHI wrote:
>> The comment at the beginning of pqexpbuffer.c says that libpq
>> should not rely on palloc(). Besides, Tom Lane said that palloc
>> should not be visible outside the backend (*1), and I agree with
>> that.
>>
>> *1: http://archives.postgresql.org/pgsql-hackers/1999-02/msg00364.php
>>
>> On the other hand, dblink, dblink-plus (our product!), and
>> maybe FDWs that connect to other PostgreSQL servers seem to copy
>> the query result contained in a PGresult into a tuplestore. I
>> guess this is done to avoid leaking memory when processing is
>> terminated halfway.
>>
>> But copying a whole PGresult is rather expensive, and the
>> cost grows as the received data gets larger. Furthermore,
>> it requires about twice as much memory as the net size of the
>> data. And it is fruitless to copy and modify libpq or to reinvent
>> it from scratch. So we would be happy to be able to use palloc in
>> libpq, at least for PGresult, in such cases in spite of the policy.
>>
>> For these reasons, I propose to make the allocators for PGresult
>> replaceable.

> You could use the resource owner mechanism to track them.

Heikki's idea is probably superior so far as PG backend usage is concerned in isolation, but I wonder if there are scenarios where a client application would like to be able to manage libpq's allocations. If so, Kyotaro-san's approach would solve more problems than just dblink's.

However, the bigger picture here is that I think Kyotaro-san's desire to not have dblink return a tuplestore may be misplaced. Tuplestores can spill to disk, while PGresults don't; so the larger the result, the more important it is to push it into a tuplestore and PQclear it as soon as possible.

Despite that worry, it'd likely be a good idea to adopt one or the other of these solutions anyway, because I think there are corner cases where dblink.c can leak a PGresult --- for instance, what if dblink_res_error fails due to out-of-memory before reaching PQclear? And we could get rid of the awkward and none-too-cheap PG_TRY blocks that it uses to try to defend against such leaks in other places. So I'm in favor of making a change along that line, although I'd want to see more evidence before considering changing dblink to not return tuplestores.

regards, tom lane
* Tom Lane (tgl@sss.pgh.pa.us) wrote:
> Heikki's idea is probably superior so far as PG backend usage is
> concerned in isolation, but I wonder if there are scenarios where a
> client application would like to be able to manage libpq's allocations.

The answer to that is certainly 'yes'. It was one of the first things that I complained about when moving from Oracle to PG. With OCI, you can bulk load results directly into application-allocated memory areas.

Haven't been following the dblink discussion, so not going to comment about that piece.

Thanks,

Stephen
Stephen Frost <sfrost@snowman.net> writes:
> * Tom Lane (tgl@sss.pgh.pa.us) wrote:
>> Heikki's idea is probably superior so far as PG backend usage is
>> concerned in isolation, but I wonder if there are scenarios where a
>> client application would like to be able to manage libpq's allocations.

> The answer to that is certainly 'yes'. It was one of the first things
> that I complained about when moving from Oracle to PG. With OCI, you
> can bulk load results directly into application-allocated memory areas.

Well, loading data in a form whereby the application can access it without going through the PGresult accessor functions would be an entirely different (and vastly larger) project. I'm not sure I want to open that can of worms --- it seems like you could write a huge amount of code trying to provide every format someone might want, and still find that there were impedance mismatches for many applications.

AIUI Kyotaro-san is just suggesting that the app should be able to provide a substitute malloc function for use in allocating PGresult space (and not, I think, anything else that libpq allocates internally). Basically this would allow PGresults to be cleaned up with methods other than calling PQclear on each one. It wouldn't affect how you'd interact with one while you had it. That seems like pretty much exactly what we want for preventing memory leaks in the backend; but is it going to be useful for other apps?

regards, tom lane
On Sat, Nov 12, 2011 at 12:48 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
> AIUI Kyotaro-san is just suggesting that the app should be able to
> provide a substitute malloc function for use in allocating PGresult
> space (and not, I think, anything else that libpq allocates internally).
> Basically this would allow PGresults to be cleaned up with methods other
> than calling PQclear on each one. It wouldn't affect how you'd interact
> with one while you had it. That seems like pretty much exactly what we
> want for preventing memory leaks in the backend; but is it going to be
> useful for other apps?

I think it will.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
Kyotaro HORIGUCHI <horiguchi.kyotaro@oss.ntt.co.jp> writes:
> Hello. This message is a proposal of a pair of patches that
> enables the memory allocator for PGresult in libpq to be
> replaced.

Since there seems to be rough consensus that something like this would be a good idea, I looked more closely at the details of the patch. I think the design could use some adjustment.

To start with, the patch proposes exposing some global variables that affect the behavior of libpq process-wide. This seems like a pretty bad design, because a single process could contain multiple usages of libpq with different requirements. As an example, if dblink.c were to set these variables inside a backend process, it would break usage of libpq from PL/Perl via DBI. Global variables also tend to be a bad idea whenever you think about multi-threaded applications --- they require locking facilities, which are not in this patch.

I think it'd be better to consider the PGresult alloc/free functions to be a property of a PGconn, which you'd set with a function call along the lines of

    PQsetResultAllocator(conn, alloc_func, realloc_func, free_func)

after having successfully opened a connection. Then we just have some more PGconn fields (and I guess PGresult will need a copy of the free_func pointer) and no new global variables.

I am also feeling dubious about whether it's a good idea to expect the functions to have exactly the signature of malloc/free. They are essentially callbacks, and in most places where a library provides for callbacks, it's customary to include a "void *" passthrough argument in case the callback needs some context information. I am not sure that dblink.c would need such a thing, but if we're trying to design a general-purpose feature, then we probably should have it. The cost would be having shim functions inside libpq for the default case, but it doesn't seem likely that they'd cost enough to notice.

The patch lacks any user documentation, which it surely must have if we are claiming this is a user-visible feature. And I think it could use some attention to updating code comments, notably the large block about PGresult space management near the top of fe-exec.c.

Usually, when writing a feature of this sort, it's a good idea to implement a prototype use-case to make sure you've not overlooked anything. So I'd feel happier about the patch if it came along with a patch to make dblink.c use it to prevent memory leaks.

regards, tom lane
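A rough standalone sketch of the per-connection design suggested above, with a "void *" passthrough argument and default shims around malloc/realloc/free. Nothing here is an existing libpq API: `DemoConn`, `DemoResult`, `demo_set_result_allocator()` and the `stats_*` callbacks are hypothetical names chosen for illustration, and PQsetResultAllocator itself is only a name floated in this thread.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Callback types: like malloc/realloc/free plus a passthrough pointer. */
typedef void *(*ResAllocFn)(size_t size, void *arg);
typedef void *(*ResReallocFn)(void *ptr, size_t size, void *arg);
typedef void (*ResFreeFn)(void *ptr, void *arg);

/* Hypothetical connection object carrying its own allocator. */
typedef struct DemoConn
{
	ResAllocFn alloc_fn;
	ResReallocFn realloc_fn;
	ResFreeFn free_fn;
	void *arg;
} DemoConn;

/* The result keeps a copy of free_fn so it can outlive the connection. */
typedef struct DemoResult
{
	ResFreeFn free_fn;
	void *arg;
	void *data;
} DemoResult;

/* Default shims: ignore the passthrough and call the C library. */
static void *default_alloc(size_t size, void *arg) { (void) arg; return malloc(size); }
static void *default_realloc(void *p, size_t size, void *arg) { (void) arg; return realloc(p, size); }
static void default_free(void *p, void *arg) { (void) arg; free(p); }

void demo_conn_init(DemoConn *conn)
{
	conn->alloc_fn = default_alloc;
	conn->realloc_fn = default_realloc;
	conn->free_fn = default_free;
	conn->arg = NULL;
}

void demo_set_result_allocator(DemoConn *conn, ResAllocFn a, ResReallocFn r,
							   ResFreeFn f, void *arg)
{
	assert(conn && a && r && f);
	conn->alloc_fn = a;
	conn->realloc_fn = r;
	conn->free_fn = f;
	conn->arg = arg;
}

DemoResult *demo_make_result(const DemoConn *conn, size_t nbytes)
{
	DemoResult *res = conn->alloc_fn(sizeof(DemoResult), conn->arg);

	if (!res)
		return NULL;
	res->free_fn = conn->free_fn;
	res->arg = conn->arg;
	res->data = conn->alloc_fn(nbytes, conn->arg);
	if (!res->data)
	{
		conn->free_fn(res, conn->arg);
		return NULL;
	}
	return res;
}

void demo_clear_result(DemoResult *res)
{
	ResFreeFn free_fn = res->free_fn;
	void *arg = res->arg;

	free_fn(res->data, arg);
	free_fn(res, arg);
}

/* Example callbacks that use the passthrough to keep per-caller statistics. */
typedef struct Stats { size_t allocs; size_t frees; } Stats;

void *stats_alloc(size_t size, void *arg)
{
	void *p = malloc(size);

	if (p)
		((Stats *) arg)->allocs++;
	return p;
}

void *stats_realloc(void *p, size_t size, void *arg) { (void) arg; return realloc(p, size); }

void stats_free(void *p, void *arg)
{
	((Stats *) arg)->frees++;
	free(p);
}
```

With this shape, two connections in the same process can use different allocators without interfering: one initialized with `demo_conn_init()` keeps the malloc shims, while another that called `demo_set_result_allocator(&conn, stats_alloc, stats_realloc, stats_free, &stats)` counts its own result allocations.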
Heikki Linnakangas <heikki.linnakangas@enterprisedb.com> writes:
> On 11.11.2011 11:18, Kyotaro HORIGUCHI wrote:
>> For these reasons, I propose to make allocators for PGresult
>> replaceable.

> You could use the resource owner mechanism to track them.

BTW, I just thought of a potentially fatal objection to making PGresult allocation depend on palloc: libpq is absolutely not prepared to handle losing control on out-of-memory. While I'm not certain that its behavior with malloc is entirely desirable either (it tends to loop in hopes of getting the memory next time), we cannot just plop in palloc in place of malloc and imagine that we're not breaking it.

This makes me think that Heikki's approach is by far the more tenable one, so far as dblink is concerned. Perhaps the substitute-malloc idea is still useful for some other application, but I'm inclined to put that idea on the back burner until we have a concrete use case for it.

regards, tom lane
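The objection above can be made concrete with a small standalone model. It assumes an allocator that, instead of returning NULL, jumps out to an error handler on failure (here via setjmp/longjmp, as an assumed stand-in for an error-throwing allocator); `DemoConn`, `demo_read_row()` and the other names are hypothetical and not libpq code.

```c
#include <assert.h>
#include <setjmp.h>
#include <stddef.h>
#include <stdlib.h>

static jmp_buf error_handler;
static int fail_next_alloc = 0;

/* Test hook: make the next allocation "run out of memory". */
void demo_fail_next_alloc(int on)
{
	fail_next_alloc = on;
}

/* Allocator that never returns NULL: on failure it jumps away instead. */
static void *jumping_alloc(size_t size)
{
	if (fail_next_alloc)
		longjmp(error_handler, 1);
	return malloc(size);
}

typedef struct DemoConn
{
	int busy;					/* 1 while a row is being read */
	char *buf;
} DemoConn;

/*
 * Library-style code written for malloc semantics: it expects NULL on
 * failure and restores conn->busy itself on every return path.
 */
int demo_read_row(DemoConn *conn, void *(*alloc)(size_t))
{
	assert(conn != NULL);
	conn->busy = 1;
	conn->buf = alloc(32);
	if (!conn->buf)
	{
		conn->busy = 0;			/* cleanup path, never reached on a jump */
		return 0;
	}
	free(conn->buf);
	conn->buf = NULL;
	conn->busy = 0;
	return 1;
}

/* Caller that installs the error handler, as a backend would. */
int demo_run(DemoConn *conn)
{
	if (setjmp(error_handler) != 0)
		return -1;				/* control was lost inside the library */
	return demo_read_row(conn, jumping_alloc);
}
```

After `demo_fail_next_alloc(1)`, `demo_run()` returns -1 and the connection is left with `busy` still set, because the library's own cleanup path never ran. That is the kind of half-updated state the message warns about when an allocator that does not return on failure is dropped in place of malloc.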
On 12/11/2011 07:36, Robert Haas wrote:
> On Sat, Nov 12, 2011 at 12:48 AM, Tom Lane <tgl@sss.pgh.pa.us> wrote:
>> AIUI Kyotaro-san is just suggesting that the app should be able to
>> provide a substitute malloc function for use in allocating PGresult
>> space (and not, I think, anything else that libpq allocates internally).
>> Basically this would allow PGresults to be cleaned up with methods other
>> than calling PQclear on each one. It wouldn't affect how you'd interact
>> with one while you had it. That seems like pretty much exactly what we
>> want for preventing memory leaks in the backend; but is it going to be
>> useful for other apps?
>
> I think it will.

Maybe I'm just talking nonsense, as I only have a little experience hacking the pgsql and pdo-pgsql extensions, but to me it seems like something that could easily avoid an extra duplication of the data returned by PQgetvalue. To me that seems a pretty nice win.

Cheers

--
Matteo Beccati
Development & Consulting - http://www.beccati.com/
* Tom Lane (tgl@sss.pgh.pa.us) wrote:
> Well, loading data in a form whereby the application can access it
> without going through the PGresult accessor functions would be an
> entirely different (and vastly larger) project.

Looking through the thread, I agree that it's a different thing than what's being discussed here.

> I'm not sure I want
> to open that can of worms --- it seems like you could write a huge
> amount of code trying to provide every format someone might want,
> and still find that there were impedance mismatches for many
> applications.

The OCI approach is actually very similar to how we handle our catalogs internally. Imagine you define a C struct which matches your table structure, then you allocate 5000 (or however many) of those, and give the 'getResult' call the base pointer and an integer array of offsets into that structure for each of the columns. There might have been a few other minor things (like some notion of how to handle NULLs), but it was pretty straightforward from the C perspective, imv.

Trying to provide alternative formats (I'm guessing you were referring to something like XML? Or some complex structure?) would certainly be a whole different ballgame.

Thanks,

Stephen

> AIUI Kyotaro-san is just suggesting that the app should be able to
> provide a substitute malloc function for use in allocating PGresult
> space (and not, I think, anything else that libpq allocates internally).
> Basically this would allow PGresults to be cleaned up with methods other
> than calling PQclear on each one. It wouldn't affect how you'd interact
> with one while you had it. That seems like pretty much exactly what we
> want for preventing memory leaks in the backend; but is it going to be
> useful for other apps?
>
> regards, tom lane
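The struct-plus-offsets approach described above might look roughly like this in C. It is a sketch of the idea only, not OCI's or libpq's API: `DemoColumn` and `demo_get_result()` are hypothetical, the "server" data is modelled as in-memory arrays of fixed-width values, and NULL handling is left out.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Application-defined struct that mirrors the table's columns. */
typedef struct Row
{
	int id;
	double price;
} Row;

/* One source column: fixed-width values stored contiguously (an assumption). */
typedef struct DemoColumn
{
	const void *values;
	size_t width;
} DemoColumn;

/*
 * Copy up to max_rows rows into the caller's array.  'base' points at the
 * first struct, 'row_size' is sizeof one struct, and offsets[c] says where
 * column c lives inside each struct.  Returns the number of rows written.
 */
size_t demo_get_result(const DemoColumn *cols, size_t ncols, size_t nrows,
					   void *base, size_t row_size, size_t max_rows,
					   const size_t *offsets)
{
	size_t n = nrows < max_rows ? nrows : max_rows;
	size_t r, c;

	assert(cols != NULL && base != NULL && offsets != NULL);
	for (r = 0; r < n; r++)
	{
		char *row = (char *) base + r * row_size;

		for (c = 0; c < ncols; c++)
			memcpy(row + offsets[c],
				   (const char *) cols[c].values + r * cols[c].width,
				   cols[c].width);
	}
	return n;
}
```

The caller would build the offsets with `offsetof(Row, id)` and `offsetof(Row, price)`, pass `sizeof(Row)` as the row size, and afterwards read `rows[i].id` directly without any accessor functions.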
Hello,

At Fri, 11 Nov 2011 11:29:30 +0200, Heikki Linnakangas wrote
> You could use the resource owner mechanism to track
> them. Register a callback function with
> RegisterResourceReleaseCallback().

Thank you for letting me know about it. I have dug up a message on pgsql-hackers referring to the mechanism in the discussion about postgresql-fdw. I'll put further thought into dblink-plus, taking it into account.

By the way, memory management for results in libpq is worth considering as a separate issue.

At Sat, 12 Nov 2011 12:29:50 -0500, Tom Lane wrote
> To start with, the patch proposes exposing some global
> variables that affect the behavior of libpq process-wide. This
> seems like a pretty bad design, because a single process could
> contain multiple usages of libpq

You're right to say the design is bad. I designed it to have minimal impact on libpq by limiting its usage and placing all responsibility on the users, that is, the developers of the modules using it. If there are other applications that want to use their own allocators, there are some points to be considered.

Considering multi-threading, I think it is preferable to make libpq write the PGresult into memory blocks passed from the application, as OCI does, instead of letting libpq itself request them.

This approach hands the responsibility for memory management to the user and lets them avoid memory exhaustion by their own means.

On the other hand, with this approach libpq may be unable to write all of the data received from the server into the memory block it was handed. So the API must be able to return partial data to the caller.

Going further, if libpq could store the result directly into user-allocated memory through a tuplestore-like interface, performance would be better when the final storage is a tuplestore itself.

I would be happy with part-by-part passing of the result. So I will think about this as the next issue.

> So I'd feel happier about the patch if it came along with a
> patch to make dblink.c use it to prevent memory leaks.

I take it this is about my original patch.

Mmm, I heard that dblink copies the received data from the PGresult into a tuplestore not only to avoid memory leaks, but also to use less memory (after the copy is finished).

I think I could show you a patch that ignores the latter, but it might take some time since I would have to start by understanding dblink and tuplestore closely...

If I find that RegisterResourceReleaseCallback falls short of our requirements, I will show it. If not, I will withdraw this patch from the ongoing CF and propose another patch based on the discussion above at another time.

Please let me have a little more time.

regards,

--
Kyotaro Horiguchi
NTT Open Source Software Center
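The "part-by-part" idea in this message, where the application hands over a memory block and the library returns as much as fits, could be sketched as below. It is an assumed interface for illustration only: `DemoStream`, `demo_fetch_rows()` and `demo_stream_done()` are hypothetical, and the incoming rows are modelled as an in-memory array of C strings.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stream of rows still to be delivered to the application. */
typedef struct DemoStream
{
	const char *const *rows;	/* NUL-terminated row values */
	size_t nrows;
	size_t next;				/* index of the first undelivered row */
} DemoStream;

/*
 * Copy as many whole rows as fit into the caller's buffer, packed back to
 * back with their terminating NULs.  Returns the number of rows written and
 * advances the stream, so the caller can call again for the remainder.
 * A return of 0 while rows remain means the next row does not fit at all.
 */
size_t demo_fetch_rows(DemoStream *s, char *buf, size_t bufsize)
{
	size_t used = 0;
	size_t count = 0;

	assert(s != NULL && buf != NULL);
	while (s->next < s->nrows)
	{
		size_t len = strlen(s->rows[s->next]) + 1;

		if (len > bufsize - used)
			break;				/* partial result: stop at a row boundary */
		memcpy(buf + used, s->rows[s->next], len);
		used += len;
		s->next++;
		count++;
	}
	return count;
}

int demo_stream_done(const DemoStream *s)
{
	return s->next >= s->nrows;
}
```

The application would call `demo_fetch_rows()` in a loop with its own buffer until `demo_stream_done()` reports that everything has been delivered, which keeps peak memory under the caller's control.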
Hello,

me> I'll put further thought into dblink-plus taking it into
me> account.
..
me> Please let me have a little more time.

I've asked the developer of dblink-plus about RegisterResourceReleaseCallback(). He said that the function is poorly compatible with the current implementation. In addition, storing directly into a tuplestore seems to me a better idea than a palloc'ed PGresult.

So I tried to make libpq/PGresult able to handle an alternative tuple store by hinting to PGconn, and modified dblink to use the mechanism as the first sample code.

I will show it as a series of patches in the next message.

regards,

--
Kyotaro Horiguchi
NTT Open Source Software Center
Hello,

This is the next version of "Allow substitute allocators for PGresult".

Totally changing the concept from the previous one, this patch allows libpq to handle an alternative tuple store for received tuples.

The design guidelines are shown below.

- No need to modify existing client code of libpq.

- Existing libpq clients run with roughly the same performance, and dblink with the modification runs somewhat faster and requires less memory.

I have roughly measured run time and memory requirements for three configurations on CentOS6 on Vbox with 2GB memory and 4 cores, running on a Win7 Core i7 host, transferring (30 bytes * 2 cols) * 2000000 tuples (120MB net) within this virtual machine. The results are below.

                                xfer time   Peak RSS
Original                      : 6.02s       850MB
libpq patch + Original dblink : 6.11s       850MB
full patch                    : 4.44s       643MB

xfer time here is the mean of five 'real time's measured by running an SQL script like this after a warmup run.

=== test.sql
select dblink_connect('c', 'host=localhost port=5432 dbname=test');
select * from dblink('c', 'select a,c from foo limit 2000000') as (a text, b bytea) limit 1;

select dblink_disconnect('c');
===
$ for i in $(seq 1 10); do time psql test -f t.sql; done
===

Peak RSS is measured by picking up the heap Rss in /proc/[pid]/smaps.

The patched libpq with the original dblink seems somewhat slower, but the difference also seems to be within the error range. If this amount of slowdown is not acceptable, it might be improved by restoring the previous static call route, at the cost of some redundancy in the code.

On the other hand, the full patch version is clearly faster and requires less memory. Isn't it nice?

This patch consists of two sub patches.

The first is a patch for libpq that allows the tuple storage mechanism to be rewired. The default behavior is not changed, so existing libpq clients should run with it.

The second modifies dblink to store received tuples directly into a tuplestore using the mechanism above.
regards, -- Kyotaro Horiguchi NTT Open Source Software Center diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index a360d78..1af8df6 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -160,7 +160,3 @@ PQconnectStartParams 157PQping 158PQpingParams 159PQlibVersion 160 -PQregisterTupleAdder 161 -PQgetAsCstring 162 -PQgetAddTupleParam 163 -PQsetAddTupleErrMes 164 diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 437be26..50f3f83 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -2692,7 +2692,6 @@ makeEmptyPGconn(void) conn->allow_ssl_try = true; conn->wait_ssl_try = false;#endif - conn->addTupleFunc = NULL; /* * We try to send at least 8K at a time, which is the usual size of pipe @@ -5065,10 +5064,3 @@ PQregisterThreadLock(pgthreadlock_t newhandler) return prev;} - -void -PQregisterTupleAdder(PGconn *conn, addTupleFunction func, void *param) -{ - conn->addTupleFunc = func; - conn->addTupleFuncParam = param; -} diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index c8ec9bd..113aab0 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -48,6 +48,7 @@ char *const pgresStatus[] = {static int static_client_encoding = PG_SQL_ASCII;static bool static_std_strings= false; +static PGEvent *dupEvents(PGEvent *events, int count);static bool PQsendQueryStart(PGconn *conn);static int PQsendQueryGuts(PGconn*conn, @@ -65,9 +66,7 @@ static PGresult *PQexecFinish(PGconn *conn);static int PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target);static int check_field_number(const PGresult *res, int field_num); -static void *pqDefaultAddTupleFunc(PGresult *res, AddTupFunc func, - int id, size_t len); -static void *pqAddTuple(PGresult *res, PGresAttValue *tup); +/* ---------------- * Space management for PGresult. 
@@ -161,9 +160,6 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) result->curBlock = NULL; result->curOffset= 0; result->spaceLeft = 0; - result->addTupleFunc = pqDefaultAddTupleFunc; - result->addTupleFuncParam = NULL; - result->addTupleFuncErrMes = NULL; if (conn) { @@ -198,12 +194,6 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) } result->nEvents = conn->nEvents; } - - if (conn->addTupleFunc) - { - result->addTupleFunc = conn->addTupleFunc; - result->addTupleFuncParam = conn->addTupleFuncParam; - } } else { @@ -497,33 +487,6 @@ PQresultAlloc(PGresult *res, size_t nBytes) return pqResultAlloc(res, nBytes, TRUE);} -void * -pqDefaultAddTupleFunc(PGresult *res, AddTupFunc func, int id, size_t len) -{ - void *p; - - switch (func) - { - case ADDTUP_ALLOC_TEXT: - return pqResultAlloc(res, len, TRUE); - - case ADDTUP_ALLOC_BINARY: - p = pqResultAlloc(res, len, FALSE); - - if (id == -1) - res->addTupleFuncParam = p; - - return p; - - case ADDTUP_ADD_TUPLE: - return pqAddTuple(res, res->addTupleFuncParam); - - default: - /* Ignore */ - break; - } - return NULL; -}/* * pqResultAlloc - * Allocate subsidiary storage for a PGresult. @@ -867,9 +830,9 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)/* * pqAddTuple * add a row pointerto the PGresult structure, growing it if necessary - * Returns tup if OK, NULL if not enough memory to add the row. 
+ * Returns TRUE if OK, FALSE if not enough memory to add the row */ -static void * +intpqAddTuple(PGresult *res, PGresAttValue *tup){ if (res->ntups >= res->tupArrSize) @@ -895,13 +858,13 @@ pqAddTuple(PGresult *res, PGresAttValue *tup) newTuples = (PGresAttValue **) realloc(res->tuples, newSize * sizeof(PGresAttValue *)); if (!newTuples) - return NULL; /* malloc or realloc failed */ + return FALSE; /* malloc or realloc failed */ res->tupArrSize = newSize; res->tuples = newTuples; } res->tuples[res->ntups] = tup; res->ntups++; - return tup; + return TRUE;}/* @@ -2859,43 +2822,6 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num) return 0;} -/* PQgetAsCString - * returns the field as C string. - */ -char * -PQgetAsCstring(PGresAttValue *attval) -{ - return attval->len == NULL_LEN ? NULL : attval->value; -} - -/* PQgetAddTupleParam - * Get the pointer to the contextual parameter from PGresult which is - * registered to PGconn by PQregisterTupleAdder - */ -void * -PQgetAddTupleParam(const PGresult *res) -{ - if (!res) - return NULL; - return res->addTupleFuncParam; -} - -/* PQsetAddTupleErrMes - * Set the error message pass back to the caller of addTupleFunc - * mes must be a malloc'ed memory block and it is released by the - * caller of addTupleFunc if set. - * You can replace the previous message by alternative mes, or clear - * it with NULL. - */ -void -PQsetAddTupleErrMes(PGresult *res, char *mes) -{ - /* Free existing message */ - if (res->addTupleFuncErrMes) - free(res->addTupleFuncErrMes); - res->addTupleFuncErrMes = mes; -} -/* PQnparams: * returns the number of input parameters of a prepared statement. 
*/ diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c index c7f74ae..77c4d5a 100644 --- a/src/interfaces/libpq/fe-protocol2.c +++ b/src/interfaces/libpq/fe-protocol2.c @@ -733,10 +733,9 @@ getAnotherTuple(PGconn *conn, bool binary) if (conn->curTuple == NULL) { conn->curTuple= (PGresAttValue *) - result->addTupleFunc(result, ADDTUP_ALLOC_BINARY, -1, - nfields * sizeof(PGresAttValue)); + pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE); if (conn->curTuple == NULL) - goto addTupleError; + goto outOfMemory; MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue)); /* @@ -758,7 +757,7 @@ getAnotherTuple(PGconn *conn, bool binary) { bitmap = (char *) malloc(nbytes); if (!bitmap) - goto addTupleError; + goto outOfMemory; } if (pqGetnchar(bitmap, nbytes, conn)) @@ -788,12 +787,9 @@ getAnotherTuple(PGconn *conn, bool binary) vlen = 0; if (tup[i].value == NULL) { - AddTupFunc func = - (binary ? ADDTUP_ALLOC_BINARY : ADDTUP_ALLOC_TEXT); - tup[i].value = - (char *) result->addTupleFunc(result, func, i, vlen + 1); + tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary); if (tup[i].value == NULL) - goto addTupleError; + goto outOfMemory; } tup[i].len = vlen; /* read in the value */ @@ -816,9 +812,8 @@ getAnotherTuple(PGconn *conn, bool binary) } /* Success! Store the completed tuple in the result*/ - if (!result->addTupleFunc(result, ADDTUP_ADD_TUPLE, 0, 0)) - goto addTupleError; - + if (!pqAddTuple(result, tup)) + goto outOfMemory; /* and reset for a new message */ conn->curTuple = NULL; @@ -826,7 +821,7 @@ getAnotherTuple(PGconn *conn, bool binary) free(bitmap); return 0; -addTupleError: +outOfMemory: /* Replace partially constructed result with an error result */ /* @@ -834,21 +829,8 @@ addTupleError: * there's not enough memory to concatenate messages... 
      */
     pqClearAsyncResult(conn);
-    resetPQExpBuffer(&conn->errorMessage);
-
-    /*
-     * If error message is passed from addTupleFunc, set it into
-     * PGconn, assume out of memory if not.
-     */
-    appendPQExpBufferStr(&conn->errorMessage,
-                         libpq_gettext(result->addTupleFuncErrMes ?
-                                       result->addTupleFuncErrMes :
-                                       "out of memory for query result\n"));
-    if (result->addTupleFuncErrMes)
-    {
-        free(result->addTupleFuncErrMes);
-        result->addTupleFuncErrMes = NULL;
-    }
+    printfPQExpBuffer(&conn->errorMessage,
+                      libpq_gettext("out of memory for query result\n"));
 
     /*
      * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index d14b57a..45a84d8 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -634,10 +634,9 @@ getAnotherTuple(PGconn *conn, int msgLength)
     if (conn->curTuple == NULL)
     {
         conn->curTuple = (PGresAttValue *)
-            result->addTupleFunc(result, ADDTUP_ALLOC_BINARY, -1,
-                                 nfields * sizeof(PGresAttValue));
+            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
         if (conn->curTuple == NULL)
-            goto addTupleError;
+            goto outOfMemory;
         MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
     }
     tup = conn->curTuple;
@@ -674,12 +673,11 @@ getAnotherTuple(PGconn *conn, int msgLength)
             vlen = 0;
         if (tup[i].value == NULL)
         {
-            AddTupFunc func = (result->attDescs[i].format != 0 ?
-                               ADDTUP_ALLOC_BINARY : ADDTUP_ALLOC_TEXT);
-            tup[i].value =
-                (char *) result->addTupleFunc(result, func, i, vlen + 1);
+            bool        isbinary = (result->attDescs[i].format != 0);
+
+            tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
             if (tup[i].value == NULL)
-                goto addTupleError;
+                goto outOfMemory;
         }
         tup[i].len = vlen;
         /* read in the value */
@@ -691,36 +689,22 @@ getAnotherTuple(PGconn *conn, int msgLength)
     }
 
     /* Success!
Store the completed tuple in the result */
-    if (!result->addTupleFunc(result, ADDTUP_ADD_TUPLE, 0, 0))
-        goto addTupleError;
-
+    if (!pqAddTuple(result, tup))
+        goto outOfMemory;
     /* and reset for a new message */
     conn->curTuple = NULL;
 
     return 0;
 
-addTupleError:
+outOfMemory:
 
     /*
      * Replace partially constructed result with an error result. First
      * discard the old result to try to win back some memory.
      */
     pqClearAsyncResult(conn);
-    resetPQExpBuffer(&conn->errorMessage);
-
-    /*
-     * If error message is passed from addTupleFunc, set it into
-     * PGconn, assume out of memory if not.
-     */
-    appendPQExpBufferStr(&conn->errorMessage,
-                         libpq_gettext(result->addTupleFuncErrMes ?
-                                       result->addTupleFuncErrMes :
-                                       "out of memory for query result\n"));
-    if (result->addTupleFuncErrMes)
-    {
-        free(result->addTupleFuncErrMes);
-        result->addTupleFuncErrMes = NULL;
-    }
+    printfPQExpBuffer(&conn->errorMessage,
+                      libpq_gettext("out of memory for query result\n"));
     pqSaveErrorResult(conn);
 
     /* Discard the failed message by pretending we read it */
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index bdce294..d13a5b9 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -116,16 +116,6 @@ typedef enum
     PQPING_NO_ATTEMPT           /* connection not attempted (bad params) */
 } PGPing;
 
-/* AddTupFunc is one of the parameters of addTupleFunc that decides
- * the function of the addTupleFunction. See addTupleFunction for
- * details */
-typedef enum
-{
-    ADDTUP_ALLOC_TEXT,          /* Returns non-aligned memory for text value */
-    ADDTUP_ALLOC_BINARY,        /* Returns aligned memory for binary value */
-    ADDTUP_ADD_TUPLE            /* Adds tuple data into tuple storage */
-} AddTupFunc;
-
 /* PGconn encapsulates a connection to the backend.
  * The contents of this struct are not supposed to be known to applications.
  */
@@ -235,12 +225,6 @@ typedef struct pgresAttDesc
     int         atttypmod;      /* type-specific modifier info */
 } PGresAttDesc;
 
-typedef struct pgresAttValue
-{
-    int         len;            /* length in bytes of the value */
-    char       *value;          /* actual value, plus terminating zero byte */
-} PGresAttValue;
-
 /* ----------------
  * Exported functions of libpq
  * ----------------
@@ -432,52 +416,6 @@ extern PGPing PQping(const char *conninfo);
 extern PGPing PQpingParams(const char *const * keywords,
              const char *const * values, int expand_dbname);
 
-/*
- * Typedef for tuple storage function.
- *
- * This function pointer is used for tuple storage function in
- * PGresult and PGconn.
- *
- * addTupleFunction is called for four types of function designated by
- * the enum AddTupFunc.
- *
- * id is the identifier for allocated memory block. The caller sets -1
- * for PGresAttValue array, and 0 to number of cols - 1 for each
- * column.
- *
- * ADDTUP_ALLOC_TEXT requests the size bytes memory block for a text
- * value which may not be alingned to the word boundary.
- *
- * ADDTUP_ALLOC_BINARY requests the size bytes memory block for a
- * binary value which is aligned to the word boundary.
- *
- * ADDTUP_ADD_TUPLE requests to add tuple data into storage, and
- * free the memory blocks allocated by this function if necessary.
- * id and size are ignored.
- *
- * This function must return non-NULL value for success and must
- * return NULL for failure and may set error message by
- * PQsetAddTupleErrMes in malloc'ed memory. Assumed by caller as out
- * of memory if the error message is NULL on failure. This function is
- * assumed not to throw any exception.
- */
- typedef void *(*addTupleFunction)(PGresult *res, AddTupFunc func,
-                                   int id, size_t size);
-
-/*
- * Register alternative tuple storage function to PGconn.
- *
- * By registering this function, pg_result disables its own tuple
- * storage and calls it to append rows one by one.
- *
- * func is tuple store function. See addTupleFunction.
- *
- * addTupFuncParam is contextual storage that can be get with
- * PQgetAddTupleParam in func.
- */
-extern void PQregisterTupleAdder(PGconn *conn, addTupleFunction func,
-                                 void *addTupFuncParam);
-
 /* Force the write buffer to be written (or at least try) */
 extern int  PQflush(PGconn *conn);
 
@@ -516,9 +454,6 @@ extern char *PQcmdTuples(PGresult *res);
 extern char *PQgetvalue(const PGresult *res, int tup_num, int field_num);
 extern int  PQgetlength(const PGresult *res, int tup_num, int field_num);
 extern int  PQgetisnull(const PGresult *res, int tup_num, int field_num);
-extern char *PQgetAsCstring(PGresAttValue *attdesc);
-extern void *PQgetAddTupleParam(const PGresult *res);
-extern void PQsetAddTupleErrMes(PGresult *res, char *mes);
 extern int  PQnparams(const PGresult *res);
 extern Oid  PQparamtype(const PGresult *res, int param_num);
 
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 45e4c93..64dfcb2 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -134,6 +134,12 @@ typedef struct pgresParamDesc
 #define NULL_LEN        (-1)    /* pg_result len for NULL value */
 
+typedef struct pgresAttValue
+{
+    int         len;            /* length in bytes of the value */
+    char       *value;          /* actual value, plus terminating zero byte */
+} PGresAttValue;
+
 /* Typedef for message-field list entries */
 typedef struct pgMessageField
 {
@@ -203,11 +209,6 @@ struct pg_result
     PGresult_data *curBlock;    /* most recently allocated block */
     int         curOffset;      /* start offset of free space in block */
     int         spaceLeft;      /* number of free bytes remaining in block */
-
-    addTupleFunction addTupleFunc; /* Tuple storage function. See
-                                    * addTupleFunction for details.
*/
-    void *addTupleFuncParam;       /* Contextual parameter for addTupleFunc */
-    char *addTupleFuncErrMes;      /* Error message returned from addTupFunc */
 };
 
 /* PGAsyncStatusType defines the state of the query-execution state machine */
@@ -442,13 +443,6 @@ struct pg_conn
 
     /* Buffer for receiving various parts of messages */
     PQExpBufferData workBuffer; /* expansible string */
-
-    /* Tuple store function. The two fields below is copied to newly
-     * created PGresult if addTupleFunc is not NULL. Use default
-     * function if addTupleFunc is NULL. */
-    addTupleFunction addTupleFunc; /* Tuple storage function. See
-                                    * addTupleFunction for details. */
-    void *addTupleFuncParam;       /* Contextual parameter for addTupFunc */
 };
 
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
@@ -513,6 +507,7 @@ extern void pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 /* This lets gcc check the format string for consistency. */
 __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
 
+extern int  pqAddTuple(PGresult *res, PGresAttValue *tup);
 extern void pqSaveMessageField(PGresult *res, char code,
                    const char *value);
 extern void pqSaveParameterStatus(PGconn *conn, const char *name,
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 62c810a..fb2e10e 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,23 @@ typedef struct remoteConn
     bool        newXactForCursor;       /* Opened a transaction for a cursor */
 } remoteConn;
 
+typedef struct storeInfo
+{
+    Tuplestorestate *tuplestore;
+    int nattrs;
+    AttInMetadata *attinmeta;
+    MemoryContext oldcontext;
+    char *attrvalbuf;
+    void **valbuf;
+    size_t *valbufsize;
+    bool error_occurred;
+    bool nummismatch;
+} storeInfo;
+
 /*
  * Internal declarations
  */
 static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);
 static remoteConn *getConnectionByName(const char *name);
 static HTAB *createConnHash(void);
 static void
createNewConnection(const char *name, remoteConn *rconn);
@@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);
 static void validate_pkattnums(Relation rel,
                    int2vector *pkattnums_arg, int32 pknumatts_arg,
                    int **pkattnums, int *pknumatts);
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static void *addTuple(PGresult *res, AddTupFunc func, int id, size_t size);
+
 /* Global */
 static remoteConn *pconn = NULL;
@@ -503,6 +519,7 @@ dblink_fetch(PG_FUNCTION_ARGS)
     char       *curname = NULL;
     int         howmany = 0;
     bool        fail = true;    /* default to backward compatible */
+    storeInfo   storeinfo;
 
     DBLINK_INIT;
 
@@ -559,15 +576,30 @@ dblink_fetch(PG_FUNCTION_ARGS)
     appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
 
     /*
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result retuned by PQexec below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQregisterTupleAdder(conn, addTuple, &storeinfo);
+
+    /*
      * Try to execute the query.  Note that since libpq uses malloc, the
      * PGresult will be long-lived even though we are still in a short-lived
      * memory context.
      */
     res = PQexec(conn, buf.data);
+    finishStoreInfo(&storeinfo);
+
     if (!res ||
         (PQresultStatus(res) != PGRES_COMMAND_OK &&
          PQresultStatus(res) != PGRES_TUPLES_OK))
     {
+        /* This is only for backward compatibility */
+        if (storeinfo.nummismatch)
+            ereport(ERROR,
+                    (errcode(ERRCODE_DATATYPE_MISMATCH),
+                     errmsg("remote query result rowtype does not match "
+                            "the specified FROM clause rowtype")));
         dblink_res_error(conname, res, "could not fetch from cursor", fail);
         return (Datum) 0;
     }
@@ -580,7 +612,6 @@ dblink_fetch(PG_FUNCTION_ARGS)
                  errmsg("cursor \"%s\" does not exist", curname)));
     }
 
-    materializeResult(fcinfo, res);
     return (Datum) 0;
 }
 
@@ -640,6 +671,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     remoteConn *rconn = NULL;
     bool        fail = true;    /* default to backward compatible */
     bool        freeconn = false;
+    storeInfo   storeinfo;
 
     /* check to see if caller supports us returning a tuplestore */
     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
@@ -715,164 +747,206 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     rsinfo->setResult = NULL;
     rsinfo->setDesc = NULL;
 
+
+    /*
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result retuned by PQexec/PQgetResult below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQregisterTupleAdder(conn, addTuple, &storeinfo);
+
     /* synchronous query, or async result retrieval */
     if (!is_async)
         res = PQexec(conn, sql);
     else
-    {
         res = PQgetResult(conn);
-        /* NULL means we're all done with the async results */
-        if (!res)
-            return (Datum) 0;
-    }
 
-    /* if needed, close the connection to the database and cleanup */
-    if (freeconn)
-        PQfinish(conn);
+    finishStoreInfo(&storeinfo);
 
-    if (!res ||
-        (PQresultStatus(res) != PGRES_COMMAND_OK &&
-         PQresultStatus(res) != PGRES_TUPLES_OK))
+    /* NULL res from async get means we're all done with the results */
+    if (res || !is_async)
     {
-        dblink_res_error(conname, res, "could not execute query", fail);
-        return (Datum) 0;
+        if (freeconn)
+            PQfinish(conn);
+
+        if (!res ||
+            (PQresultStatus(res) != PGRES_COMMAND_OK &&
+             PQresultStatus(res) != PGRES_TUPLES_OK))
+        {
+            /* This is only for backward compatibility */
+            if (storeinfo.nummismatch)
+            {
+                ereport(ERROR,
+                        (errcode(ERRCODE_DATATYPE_MISMATCH),
+                         errmsg("remote query result rowtype does not match "
+                                "the specified FROM clause rowtype")));
+            }
+            dblink_res_error(conname, res, "could not execute query", fail);
+            return (Datum) 0;
+        }
     }
 
-    materializeResult(fcinfo, res);
     return (Datum) 0;
 }
 
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */
 static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo)
 {
     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
-
-    Assert(rsinfo->returnMode == SFRM_Materialize);
-
-    PG_TRY();
+    TupleDesc   tupdesc;
+    int         i;
+
+    switch (get_call_result_type(fcinfo, NULL, &tupdesc))
     {
-        TupleDesc   tupdesc;
-        bool        is_sql_cmd = false;
-        int         ntuples;
-        int         nfields;
+        case TYPEFUNC_COMPOSITE:
+            /* success */
+            break;
+        case TYPEFUNC_RECORD:
+            /* failed to determine actual type of RECORD */
+            ereport(ERROR,
+                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                     errmsg("function returning record called in context "
+                            "that cannot accept type record")));
+            break;
+        default:
+            /* result type isn't composite */
+            elog(ERROR, "return type must be a row type");
+            break;
+    }
+
+    sinfo->oldcontext = MemoryContextSwitchTo(
+        rsinfo->econtext->ecxt_per_query_memory);
+
+    /* make sure we have a persistent copy of the tupdesc */
+    tupdesc = CreateTupleDescCopy(tupdesc);
+
+    sinfo->error_occurred = FALSE;
+    sinfo->nummismatch = FALSE;
+    sinfo->nattrs = tupdesc->natts;
+    sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+    sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+    sinfo->valbuf = (void **)malloc(sinfo->nattrs * sizeof(void *));
+    sinfo->valbufsize = (size_t *)malloc(sinfo->nattrs * sizeof(size_t));
+    for (i = 0 ; i
< sinfo->nattrs ; i++)
+    {
+        sinfo->valbuf[i] = NULL;
+        sinfo->valbufsize[i] = 0;
+    }
 
-        if (PQresultStatus(res) == PGRES_COMMAND_OK)
-        {
-            is_sql_cmd = true;
-
-            /*
-             * need a tuple descriptor representing one TEXT column to return
-             * the command status string as our result tuple
-             */
-            tupdesc = CreateTemplateTupleDesc(1, false);
-            TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
-                               TEXTOID, -1, 0);
-            ntuples = 1;
-            nfields = 1;
-        }
-        else
-        {
-            Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+    /* Preallocate memory of same size with PGresAttDesc array for values. */
+    sinfo->attrvalbuf = (char *) malloc(sinfo->nattrs * sizeof(PGresAttValue));
 
-            is_sql_cmd = false;
+    rsinfo->setResult = sinfo->tuplestore;
+    rsinfo->setDesc = tupdesc;
+}
 
-            /* get a tuple descriptor for our result type */
-            switch (get_call_result_type(fcinfo, NULL, &tupdesc))
-            {
-                case TYPEFUNC_COMPOSITE:
-                    /* success */
-                    break;
-                case TYPEFUNC_RECORD:
-                    /* failed to determine actual type of RECORD */
-                    ereport(ERROR,
-                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                        errmsg("function returning record called in context "
-                               "that cannot accept type record")));
-                    break;
-                default:
-                    /* result type isn't composite */
-                    elog(ERROR, "return type must be a row type");
-                    break;
-            }
+static void
+finishStoreInfo(storeInfo *sinfo)
+{
+    int i;
 
-            /* make sure we have a persistent copy of the tupdesc */
-            tupdesc = CreateTupleDescCopy(tupdesc);
-            ntuples = PQntuples(res);
-            nfields = PQnfields(res);
+    for (i = 0 ; i < sinfo->nattrs ; i++)
+    {
+        if (sinfo->valbuf[i])
+        {
+            free(sinfo->valbuf[i]);
+            sinfo->valbuf[i] = NULL;
         }
+    }
+    if (sinfo->attrvalbuf)
+        free(sinfo->attrvalbuf);
+    sinfo->attrvalbuf = NULL;
+    MemoryContextSwitchTo(sinfo->oldcontext);
+}
 
-        /*
-         * check result and tuple descriptor have the same number of columns
-         */
-        if (nfields != tupdesc->natts)
-            ereport(ERROR,
-                    (errcode(ERRCODE_DATATYPE_MISMATCH),
-                     errmsg("remote query result rowtype does not match "
-                            "the specified FROM clause
rowtype")));
+static void *
+addTuple(PGresult *res, AddTupFunc func, int id, size_t size)
+{
+    storeInfo *sinfo = (storeInfo *)PQgetAddTupleParam(res);
+    HeapTuple   tuple;
+    int fields = PQnfields(res);
+    int i;
+    PGresAttValue *attval;
+    char        **cstrs;
 
-        if (ntuples > 0)
-        {
-            AttInMetadata *attinmeta;
-            Tuplestorestate *tupstore;
-            MemoryContext oldcontext;
-            int         row;
-            char      **values;
-
-            attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
-            oldcontext = MemoryContextSwitchTo(
-                                    rsinfo->econtext->ecxt_per_query_memory);
-            tupstore = tuplestore_begin_heap(true, false, work_mem);
-            rsinfo->setResult = tupstore;
-            rsinfo->setDesc = tupdesc;
-            MemoryContextSwitchTo(oldcontext);
+    if (sinfo->error_occurred)
+        return NULL;
 
-            values = (char **) palloc(nfields * sizeof(char *));
+    switch (func)
+    {
+        case ADDTUP_ALLOC_TEXT:
+        case ADDTUP_ALLOC_BINARY:
+            if (id == -1)
+                return sinfo->attrvalbuf;
+
+            if (id < 0 || id >= sinfo->nattrs)
+                return NULL;
 
-            /* put all tuples into the tuplestore */
-            for (row = 0; row < ntuples; row++)
+            if (sinfo->valbufsize[id] < size)
             {
-                HeapTuple   tuple;
+                if (sinfo->valbuf[id] == NULL)
+                    sinfo->valbuf[id] = malloc(size);
+                else
+                    sinfo->valbuf[id] = realloc(sinfo->valbuf[id], size);
+                sinfo->valbufsize[id] = size;
+            }
+            return sinfo->valbuf[id];
 
-                if (!is_sql_cmd)
-                {
-                    int         i;
+        case ADDTUP_ADD_TUPLE:
+            break;   /* Go through */
+        default:
+            /* Ignore */
+            break;
+    }
 
-                    for (i = 0; i < nfields; i++)
-                    {
-                        if (PQgetisnull(res, row, i))
-                            values[i] = NULL;
-                        else
-                            values[i] = PQgetvalue(res, row, i);
-                    }
-                }
-                else
-                {
-                    values[0] = PQcmdStatus(res);
-                }
+    if (sinfo->nattrs != fields)
+    {
+        sinfo->error_occurred = TRUE;
+        sinfo->nummismatch = TRUE;
+        finishStoreInfo(sinfo);
 
-                /* build the tuple and put it into the tuplestore.
*/
-                tuple = BuildTupleFromCStrings(attinmeta, values);
-                tuplestore_puttuple(tupstore, tuple);
-            }
+        PQsetAddTupleErrMes(res,
+                            strdup("function returning record called in "
+                                   "context that cannot accept type record"));
+        return NULL;
+    }
 
-            /* clean up and return the tuplestore */
-            tuplestore_donestoring(tupstore);
-        }
+    /*
+     * Rewrite PGresAttDesc[] to char(*)[] in-place.
+     */
+    Assert(sizeof(char*) <= sizeof(PGresAttValue));
+    attval = (PGresAttValue *)sinfo->attrvalbuf;
+    cstrs   = (char **)sinfo->attrvalbuf;
+    for(i = 0 ; i < fields ; i++)
+        cstrs[i] = PQgetAsCstring(attval++);
 
-        PQclear(res);
+    PG_TRY();
+    {
+        tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+        tuplestore_puttuple(sinfo->tuplestore, tuple);
     }
     PG_CATCH();
     {
-        /* be sure to release the libpq result */
-        PQclear(res);
-        PG_RE_THROW();
+        /*
+         * Return the error message in the exception to the caller and
+         * cancel the exception.
+         */
+        ErrorData *edata;
+
+        sinfo->error_occurred = TRUE;
+        sinfo->nummismatch = TRUE;
+
+        finishStoreInfo(sinfo);
+
+        edata = CopyErrorData();
+        FlushErrorState();
+
+        PQsetAddTupleErrMes(res, strdup(edata->message));
+        return NULL;
     }
     PG_END_TRY();
+
+    return sinfo->attrvalbuf;
 }
 
 /*
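
The callback contract used by addTuple() in the dblink changes above can be tried out without a patched libpq. The caller first asks for the PGresAttValue array with id -1, then asks for one buffer per column with ids 0 to ncols - 1, and finally signals ADDTUP_ADD_TUPLE. What follows is only a minimal standalone sketch under stated assumptions, not code from the patch: RowSink, sink_init, sink_free, sink_add_tuple and feed_row are hypothetical names, AttValue stands in for PGresAttValue, and feed_row merely imitates what getAnotherTuple() does for a text row. One deliberate difference from addTuple(): the sketch checks the result of realloc before it records the new buffer size.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Same three requests as the AddTupFunc enum in the patch. */
typedef enum
{
    ADDTUP_ALLOC_TEXT,          /* buffer for a text value */
    ADDTUP_ALLOC_BINARY,        /* aligned buffer for a binary value */
    ADDTUP_ADD_TUPLE            /* the row is complete: store it */
} AddTupFunc;

#define NULL_LEN (-1)           /* length marker for an SQL NULL */

/* Stand-in for PGresAttValue (hypothetical, for this sketch only). */
typedef struct
{
    int   len;
    char *value;
} AttValue;

/* Hypothetical receiver state, loosely modelled on dblink's storeInfo. */
typedef struct
{
    int       nattrs;
    AttValue *attrs;            /* handed out for id == -1 */
    char    **valbuf;           /* one reusable buffer per column */
    size_t   *valbufsize;
    int       rows_stored;
} RowSink;

static int
sink_init(RowSink *sink, int nattrs)
{
    int i;

    sink->nattrs = nattrs;
    sink->rows_stored = 0;
    sink->attrs = calloc(nattrs, sizeof(AttValue));
    sink->valbuf = malloc(nattrs * sizeof(char *));
    sink->valbufsize = calloc(nattrs, sizeof(size_t));
    if (!sink->attrs || !sink->valbuf || !sink->valbufsize)
        return -1;
    for (i = 0; i < nattrs; i++)
        sink->valbuf[i] = NULL;
    return 0;
}

static void
sink_free(RowSink *sink)
{
    int i;

    if (sink->valbuf)
        for (i = 0; i < sink->nattrs; i++)
            free(sink->valbuf[i]);
    free(sink->valbuf);
    free(sink->valbufsize);
    free(sink->attrs);
}

/* The callback: returns non-NULL on success, NULL on failure. */
static void *
sink_add_tuple(RowSink *sink, AddTupFunc func, int id, size_t size)
{
    switch (func)
    {
        case ADDTUP_ALLOC_TEXT:
        case ADDTUP_ALLOC_BINARY:
            if (id == -1)
                return sink->attrs;
            if (id < 0 || id >= sink->nattrs)
                return NULL;
            if (sink->valbufsize[id] < size)
            {
                /* grow the per-column buffer, keeping the old one on failure */
                char *p = realloc(sink->valbuf[id], size);

                if (!p)
                    return NULL;
                sink->valbuf[id] = p;
                sink->valbufsize[id] = size;
            }
            return sink->valbuf[id];

        case ADDTUP_ADD_TUPLE:
            /* a real receiver would copy the row into its own store here */
            sink->rows_stored++;
            return sink->attrs;
    }
    return NULL;
}

/* Imitates the caller side: one text row, NULL pointer meaning SQL NULL. */
static int
feed_row(RowSink *sink, const char *const *cols)
{
    AttValue *tup;
    int       i;

    assert(sink != NULL && cols != NULL);

    tup = sink_add_tuple(sink, ADDTUP_ALLOC_BINARY, -1,
                         sink->nattrs * sizeof(AttValue));
    if (!tup)
        return -1;
    for (i = 0; i < sink->nattrs; i++)
    {
        size_t vlen = cols[i] ? strlen(cols[i]) : 0;
        char  *buf = sink_add_tuple(sink, ADDTUP_ALLOC_TEXT, i, vlen + 1);

        if (!buf)
            return -1;
        memcpy(buf, cols[i] ? cols[i] : "", vlen + 1);
        tup[i].value = buf;
        tup[i].len = cols[i] ? (int) vlen : NULL_LEN;
    }
    return sink_add_tuple(sink, ADDTUP_ADD_TUPLE, 0, 0) ? 0 : -1;
}
```

To experiment, call sink_init() for the expected column count, call feed_row() once per row, and inspect rows_stored afterwards. Because the per-column buffers are reused from row to row, a receiver that wants to keep the values must copy them while handling ADDTUP_ADD_TUPLE, which is what the tuplestore does in the dblink patch.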
Ouch! I'm sorry for sending a reverse patch for the first modification.

This is an amendment of the message below. The body text is copied into this message.

http://archives.postgresql.org/message-id/20111201.192419.103527179.horiguchi.kyotaro@oss.ntt.co.jp

=======
Hello,

This is the next version of "Allow substitute allocators for PGresult".

Changing the concept completely from the previous version, this patch allows libpq to use an alternative tuple store for received tuples.

The design guidelines are as follows.

 - No need to modify existing libpq client code.

 - Existing libpq clients run with roughly the same performance, while the modified dblink runs somewhat faster and requires less memory.

I have roughly measured the run time and memory requirement for three configurations on CentOS6 on Vbox (2GB mem, 4 cores) running on a Win7 Core i7 host, transferring (30 bytes * 2 cols) * 2000000 tuples (120MB net) within this virtual machine. The results are below.

                                  xfer time    Peak RSS
Original                        : 6.02s        850MB
libpq patch + Original dblink   : 6.11s        850MB
full patch                      : 4.44s        643MB

The xfer time here is the mean of five 'real' times measured by running an SQL script like this after a warmup run.

=== test.sql
select dblink_connect('c', 'host=localhost port=5432 dbname=test');
select * from dblink('c', 'select a,c from foo limit 2000000') as (a text, b bytea) limit 1;
select dblink_disconnect('c');
===
$ for i in $(seq 1 10); do time psql test -f t.sql; done
===

Peak RSS is measured by picking up the heap Rss in /proc/[pid]/smaps.

The patched libpq with the original dblink seems somewhat slower, but the difference also seems to be within the error range. If this amount of slowdown is not permissible, it might be improved by restoring the previous static call route, at the cost of some extra redundancy in the code.

On the other hand, the full patch version is clearly faster and requires less memory. Isn't it nice?

This patch consists of two sub-patches.

The first is a patch for libpq that allows the tuple storage mechanism to be rewired.
The default behavior is not changed, however, so existing libpq clients should run with it.

The second modifies dblink to store received tuples directly into a tuplestore using the mechanism above.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..a360d78 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,7 @@ PQconnectStartParams      157
 PQping                    158
 PQpingParams              159
 PQlibVersion              160
+PQregisterTupleAdder      161
+PQgetAsCstring            162
+PQgetAddTupleParam        163
+PQsetAddTupleErrMes       164
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 50f3f83..437be26 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2692,6 +2692,7 @@ makeEmptyPGconn(void)
     conn->allow_ssl_try = true;
     conn->wait_ssl_try = false;
 #endif
+    conn->addTupleFunc = NULL;
 
     /*
      * We try to send at least 8K at a time, which is the usual size of pipe
@@ -5064,3 +5065,10 @@ PQregisterThreadLock(pgthreadlock_t newhandler)
 
     return prev;
 }
+
+void
+PQregisterTupleAdder(PGconn *conn, addTupleFunction func, void *param)
+{
+    conn->addTupleFunc = func;
+    conn->addTupleFuncParam = param;
+}
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index 113aab0..c8ec9bd 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -48,7 +48,6 @@ char     *const pgresStatus[] = {
 static int  static_client_encoding = PG_SQL_ASCII;
 static bool static_std_strings = false;
 
-static PGEvent *dupEvents(PGEvent *events, int count);
 static bool PQsendQueryStart(PGconn *conn);
 static int PQsendQueryGuts(PGconn *conn,
@@ -66,7 +65,9 @@ static PGresult *PQexecFinish(PGconn *conn);
 static int PQsendDescribe(PGconn *conn, char desc_type,
                const char *desc_target);
 static int  check_field_number(const PGresult *res, int field_num);
-
+static void *pqDefaultAddTupleFunc(PGresult *res, AddTupFunc func,
+                                   int
id, size_t len);
+static void *pqAddTuple(PGresult *res, PGresAttValue *tup);
 
 /* ----------------
  * Space management for PGresult.
@@ -160,6 +161,9 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
     result->curBlock = NULL;
     result->curOffset = 0;
     result->spaceLeft = 0;
+    result->addTupleFunc = pqDefaultAddTupleFunc;
+    result->addTupleFuncParam = NULL;
+    result->addTupleFuncErrMes = NULL;
 
     if (conn)
     {
@@ -194,6 +198,12 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
             }
             result->nEvents = conn->nEvents;
         }
+
+        if (conn->addTupleFunc)
+        {
+            result->addTupleFunc = conn->addTupleFunc;
+            result->addTupleFuncParam = conn->addTupleFuncParam;
+        }
     }
     else
     {
@@ -487,6 +497,33 @@ PQresultAlloc(PGresult *res, size_t nBytes)
     return pqResultAlloc(res, nBytes, TRUE);
 }
 
+void *
+pqDefaultAddTupleFunc(PGresult *res, AddTupFunc func, int id, size_t len)
+{
+    void *p;
+
+    switch (func)
+    {
+        case ADDTUP_ALLOC_TEXT:
+            return pqResultAlloc(res, len, TRUE);
+
+        case ADDTUP_ALLOC_BINARY:
+            p = pqResultAlloc(res, len, FALSE);
+
+            if (id == -1)
+                res->addTupleFuncParam = p;
+
+            return p;
+
+        case ADDTUP_ADD_TUPLE:
+            return pqAddTuple(res, res->addTupleFuncParam);
+
+        default:
+            /* Ignore */
+            break;
+    }
+    return NULL;
+}
 
 /*
  * pqResultAlloc -
  *      Allocate subsidiary storage for a PGresult.
@@ -830,9 +867,9 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)
 /*
  * pqAddTuple
  *    add a row pointer to the PGresult structure, growing it if necessary
- *    Returns TRUE if OK, FALSE if not enough memory to add the row
+ *    Returns tup if OK, NULL if not enough memory to add the row.
  */
-int
+static void *
 pqAddTuple(PGresult *res, PGresAttValue *tup)
 {
     if (res->ntups >= res->tupArrSize)
@@ -858,13 +895,13 @@ pqAddTuple(PGresult *res, PGresAttValue *tup)
             newTuples = (PGresAttValue **)
                 realloc(res->tuples, newSize * sizeof(PGresAttValue *));
         if (!newTuples)
-            return FALSE;       /* malloc or realloc failed */
+            return NULL;        /* malloc or realloc failed */
         res->tupArrSize = newSize;
         res->tuples = newTuples;
     }
     res->tuples[res->ntups] = tup;
     res->ntups++;
-    return TRUE;
+    return tup;
 }
 
 /*
@@ -2822,6 +2859,43 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num)
         return 0;
 }
 
+/* PQgetAsCString
+ *  returns the field as C string.
+ */
+char *
+PQgetAsCstring(PGresAttValue *attval)
+{
+    return attval->len == NULL_LEN ? NULL : attval->value;
+}
+
+/* PQgetAddTupleParam
+ *  Get the pointer to the contextual parameter from PGresult which is
+ *  registered to PGconn by PQregisterTupleAdder
+ */
+void *
+PQgetAddTupleParam(const PGresult *res)
+{
+    if (!res)
+        return NULL;
+    return res->addTupleFuncParam;
+}
+
+/* PQsetAddTupleErrMes
+ *  Set the error message pass back to the caller of addTupleFunc
+ *  mes must be a malloc'ed memory block and it is released by the
+ *  caller of addTupleFunc if set.
+ *  You can replace the previous message by alternative mes, or clear
+ *  it with NULL.
+ */
+void
+PQsetAddTupleErrMes(PGresult *res, char *mes)
+{
+    /* Free existing message */
+    if (res->addTupleFuncErrMes)
+        free(res->addTupleFuncErrMes);
+    res->addTupleFuncErrMes = mes;
+}
+
 /* PQnparams:
  *  returns the number of input parameters of a prepared statement.
  */
diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c
index 77c4d5a..c7f74ae 100644
--- a/src/interfaces/libpq/fe-protocol2.c
+++ b/src/interfaces/libpq/fe-protocol2.c
@@ -733,9 +733,10 @@ getAnotherTuple(PGconn *conn, bool binary)
     if (conn->curTuple == NULL)
     {
         conn->curTuple = (PGresAttValue *)
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
+            result->addTupleFunc(result, ADDTUP_ALLOC_BINARY, -1,
+                                 nfields * sizeof(PGresAttValue));
         if (conn->curTuple == NULL)
-            goto outOfMemory;
+            goto addTupleError;
         MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
 
         /*
@@ -757,7 +758,7 @@ getAnotherTuple(PGconn *conn, bool binary)
     {
         bitmap = (char *) malloc(nbytes);
         if (!bitmap)
-            goto outOfMemory;
+            goto addTupleError;
     }
 
     if (pqGetnchar(bitmap, nbytes, conn))
@@ -787,9 +788,12 @@ getAnotherTuple(PGconn *conn, bool binary)
                 vlen = 0;
             if (tup[i].value == NULL)
             {
-                tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary);
+                AddTupFunc func =
+                    (binary ? ADDTUP_ALLOC_BINARY : ADDTUP_ALLOC_TEXT);
+                tup[i].value =
+                    (char *) result->addTupleFunc(result, func, i, vlen + 1);
                 if (tup[i].value == NULL)
-                    goto outOfMemory;
+                    goto addTupleError;
             }
             tup[i].len = vlen;
             /* read in the value */
@@ -812,8 +816,9 @@ getAnotherTuple(PGconn *conn, bool binary)
     }
 
     /* Success!  Store the completed tuple in the result */
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
+    if (!result->addTupleFunc(result, ADDTUP_ADD_TUPLE, 0, 0))
+        goto addTupleError;
+
     /* and reset for a new message */
     conn->curTuple = NULL;
 
@@ -821,7 +826,7 @@ getAnotherTuple(PGconn *conn, bool binary)
         free(bitmap);
     return 0;
 
-outOfMemory:
+addTupleError:
     /* Replace partially constructed result with an error result */
 
     /*
@@ -829,8 +834,21 @@ outOfMemory:
      * there's not enough memory to concatenate messages...
      */
     pqClearAsyncResult(conn);
-    printfPQExpBuffer(&conn->errorMessage,
-                      libpq_gettext("out of memory for query result\n"));
+    resetPQExpBuffer(&conn->errorMessage);
+
+    /*
+     * If error message is passed from addTupleFunc, set it into
+     * PGconn, assume out of memory if not.
+     */
+    appendPQExpBufferStr(&conn->errorMessage,
+                         libpq_gettext(result->addTupleFuncErrMes ?
+                                       result->addTupleFuncErrMes :
+                                       "out of memory for query result\n"));
+    if (result->addTupleFuncErrMes)
+    {
+        free(result->addTupleFuncErrMes);
+        result->addTupleFuncErrMes = NULL;
+    }
 
     /*
      * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 45a84d8..d14b57a 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -634,9 +634,10 @@ getAnotherTuple(PGconn *conn, int msgLength)
     if (conn->curTuple == NULL)
     {
         conn->curTuple = (PGresAttValue *)
-            pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE);
+            result->addTupleFunc(result, ADDTUP_ALLOC_BINARY, -1,
+                                 nfields * sizeof(PGresAttValue));
         if (conn->curTuple == NULL)
-            goto outOfMemory;
+            goto addTupleError;
         MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue));
     }
     tup = conn->curTuple;
@@ -673,11 +674,12 @@ getAnotherTuple(PGconn *conn, int msgLength)
             vlen = 0;
         if (tup[i].value == NULL)
         {
-            bool        isbinary = (result->attDescs[i].format != 0);
-
-            tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary);
+            AddTupFunc func = (result->attDescs[i].format != 0 ?
+                               ADDTUP_ALLOC_BINARY : ADDTUP_ALLOC_TEXT);
+            tup[i].value =
+                (char *) result->addTupleFunc(result, func, i, vlen + 1);
             if (tup[i].value == NULL)
-                goto outOfMemory;
+                goto addTupleError;
         }
         tup[i].len = vlen;
         /* read in the value */
@@ -689,22 +691,36 @@ getAnotherTuple(PGconn *conn, int msgLength)
     }
 
     /* Success!
Store the completed tuple in the result */
-    if (!pqAddTuple(result, tup))
-        goto outOfMemory;
+    if (!result->addTupleFunc(result, ADDTUP_ADD_TUPLE, 0, 0))
+        goto addTupleError;
+
     /* and reset for a new message */
     conn->curTuple = NULL;
 
     return 0;
 
-outOfMemory:
+addTupleError:
 
     /*
      * Replace partially constructed result with an error result. First
      * discard the old result to try to win back some memory.
      */
     pqClearAsyncResult(conn);
-    printfPQExpBuffer(&conn->errorMessage,
-                      libpq_gettext("out of memory for query result\n"));
+    resetPQExpBuffer(&conn->errorMessage);
+
+    /*
+     * If error message is passed from addTupleFunc, set it into
+     * PGconn, assume out of memory if not.
+     */
+    appendPQExpBufferStr(&conn->errorMessage,
+                         libpq_gettext(result->addTupleFuncErrMes ?
+                                       result->addTupleFuncErrMes :
+                                       "out of memory for query result\n"));
+    if (result->addTupleFuncErrMes)
+    {
+        free(result->addTupleFuncErrMes);
+        result->addTupleFuncErrMes = NULL;
+    }
     pqSaveErrorResult(conn);
 
     /* Discard the failed message by pretending we read it */
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index d13a5b9..bdce294 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -116,6 +116,16 @@ typedef enum
     PQPING_NO_ATTEMPT           /* connection not attempted (bad params) */
 } PGPing;
 
+/* AddTupFunc is one of the parameters of addTupleFunc that decides
+ * the function of the addTupleFunction. See addTupleFunction for
+ * details */
+typedef enum
+{
+    ADDTUP_ALLOC_TEXT,          /* Returns non-aligned memory for text value */
+    ADDTUP_ALLOC_BINARY,        /* Returns aligned memory for binary value */
+    ADDTUP_ADD_TUPLE            /* Adds tuple data into tuple storage */
+} AddTupFunc;
+
 /* PGconn encapsulates a connection to the backend.
* The contents of this struct are not supposed to be known to applications.*/ @@ -225,6 +235,12 @@ typedef struct pgresAttDesc int atttypmod; /* type-specific modifier info */} PGresAttDesc; +typedef struct pgresAttValue +{ + int len; /* length in bytes of the value */ + char *value; /* actual value, plus terminating zero byte */ +} PGresAttValue; +/* ---------------- * Exported functions of libpq * ---------------- @@ -416,6 +432,52 @@ extern PGPing PQping(const char *conninfo);extern PGPing PQpingParams(const char *const * keywords, const char *const * values, int expand_dbname); +/* + * Typedef for tuple storage function. + * + * This function pointer is used for tuple storage function in + * PGresult and PGconn. + * + * addTupleFunction is called for four types of function designated by + * the enum AddTupFunc. + * + * id is the identifier for allocated memory block. The caller sets -1 + * for PGresAttValue array, and 0 to number of cols - 1 for each + * column. + * + * ADDTUP_ALLOC_TEXT requests the size bytes memory block for a text + * value which may not be alingned to the word boundary. + * + * ADDTUP_ALLOC_BINARY requests the size bytes memory block for a + * binary value which is aligned to the word boundary. + * + * ADDTUP_ADD_TUPLE requests to add tuple data into storage, and + * free the memory blocks allocated by this function if necessary. + * id and size are ignored. + * + * This function must return non-NULL value for success and must + * return NULL for failure and may set error message by + * PQsetAddTupleErrMes in malloc'ed memory. Assumed by caller as out + * of memory if the error message is NULL on failure. This function is + * assumed not to throw any exception. + */ + typedef void *(*addTupleFunction)(PGresult *res, AddTupFunc func, + int id, size_t size); + +/* + * Register alternative tuple storage function to PGconn. 
+ * + * By registering this function, pg_result disables its own tuple + * storage and calls it to append rows one by one. + * + * func is tuple store function. See addTupleFunction. + * + * addTupFuncParam is contextual storage that can be get with + * PQgetAddTupleParam in func. + */ +extern void PQregisterTupleAdder(PGconn *conn, addTupleFunction func, + void *addTupFuncParam); +/* Force the write buffer to be written (or at least try) */extern int PQflush(PGconn *conn); @@ -454,6 +516,9 @@ extern char *PQcmdTuples(PGresult *res);extern char *PQgetvalue(const PGresult *res, int tup_num, intfield_num);extern int PQgetlength(const PGresult *res, int tup_num, int field_num);extern int PQgetisnull(constPGresult *res, int tup_num, int field_num); +extern char *PQgetAsCstring(PGresAttValue *attdesc); +extern void *PQgetAddTupleParam(const PGresult *res); +extern void PQsetAddTupleErrMes(PGresult *res, char *mes);extern int PQnparams(const PGresult *res);extern Oid PQparamtype(const PGresult *res, int param_num); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 64dfcb2..45e4c93 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -134,12 +134,6 @@ typedef struct pgresParamDesc#define NULL_LEN (-1) /* pg_result len for NULL value */ -typedef struct pgresAttValue -{ - int len; /* length in bytes of the value */ - char *value; /* actual value, plus terminating zero byte */ -} PGresAttValue; -/* Typedef for message-field list entries */typedef struct pgMessageField{ @@ -209,6 +203,11 @@ struct pg_result PGresult_data *curBlock; /* most recently allocated block */ int curOffset; /* start offset of free space in block */ int spaceLeft; /* number of free bytesremaining in block */ + + addTupleFunction addTupleFunc; /* Tuple storage function. See + * addTupleFunction for details. 
*/ + void *addTupleFuncParam; /* Contextual parameter for addTupleFunc */ + char *addTupleFuncErrMes; /* Error message returned from addTupFunc */};/* PGAsyncStatusType defines the stateof the query-execution state machine */ @@ -443,6 +442,13 @@ struct pg_conn /* Buffer for receiving various parts of messages */ PQExpBufferData workBuffer;/* expansible string */ + + /* Tuple store function. The two fields below is copied to newly + * created PGresult if addTupleFunc is not NULL. Use default + * function if addTupleFunc is NULL. */ + addTupleFunction addTupleFunc; /* Tuple storage function. See + * addTupleFunction for details. */ + void *addTupleFuncParam; /* Contextual parameter for addTupFunc */};/* PGcancel stores all data necessary to cancela connection. A copy of this @@ -507,7 +513,6 @@ extern voidpqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)/* This lets gcc check theformat string for consistency. */__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3))); -extern int pqAddTuple(PGresult *res, PGresAttValue *tup);extern void pqSaveMessageField(PGresult *res, char code, const char *value);extern void pqSaveParameterStatus(PGconn *conn, const char *name, diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 62c810a..fb2e10e 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -63,11 +63,23 @@ typedef struct remoteConn bool newXactForCursor; /* Opened a transaction for a cursor*/} remoteConn; +typedef struct storeInfo +{ + Tuplestorestate *tuplestore; + int nattrs; + AttInMetadata *attinmeta; + MemoryContext oldcontext; + char *attrvalbuf; + void **valbuf; + size_t *valbufsize; + bool error_occurred; + bool nummismatch; +} storeInfo; +/* * Internal declarations */static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async); -static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);static remoteConn *getConnectionByName(const char*name);static HTAB *createConnHash(void);static void 
createNewConnection(const char *name, remoteConn *rconn); @@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);static void validate_pkattnums(Relation rel, int2vector *pkattnums_arg, int32 pknumatts_arg, int **pkattnums, int *pknumatts); +static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo); +static void finishStoreInfo(storeInfo *sinfo); +static void *addTuple(PGresult *res, AddTupFunc func, int id, size_t size); +/* Global */static remoteConn *pconn = NULL; @@ -503,6 +519,7 @@ dblink_fetch(PG_FUNCTION_ARGS) char *curname = NULL; int howmany = 0; bool fail = true; /* default to backward compatible */ + storeInfo storeinfo; DBLINK_INIT; @@ -559,15 +576,30 @@ dblink_fetch(PG_FUNCTION_ARGS) appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname); /* + * Result is stored into storeinfo.tuplestore instead of + * res->result retuned by PQexec below + */ + initStoreInfo(&storeinfo, fcinfo); + PQregisterTupleAdder(conn, addTuple, &storeinfo); + + /* * Try to execute the query. Note that since libpq uses malloc, the * PGresult will be long-lived even thoughwe are still in a short-lived * memory context. 
*/ res = PQexec(conn, buf.data); + finishStoreInfo(&storeinfo); + if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { + /* This is only for backward compatibility */ + if (storeinfo.nummismatch) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); dblink_res_error(conname, res, "could not fetchfrom cursor", fail); return (Datum) 0; } @@ -580,7 +612,6 @@ dblink_fetch(PG_FUNCTION_ARGS) errmsg("cursor \"%s\" does not exist", curname))); } - materializeResult(fcinfo, res); return (Datum) 0;} @@ -640,6 +671,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible */ bool freeconn = false; + storeInfo storeinfo; /* check to see if caller supports us returning a tuplestore */ if (rsinfo == NULL || !IsA(rsinfo,ReturnSetInfo)) @@ -715,164 +747,206 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) rsinfo->setResult = NULL; rsinfo->setDesc= NULL; + + /* + * Result is stored into storeinfo.tuplestore instead of + * res->result retuned by PQexec/PQgetResult below + */ + initStoreInfo(&storeinfo, fcinfo); + PQregisterTupleAdder(conn, addTuple, &storeinfo); + /* synchronous query, or async result retrieval */ if (!is_async) res = PQexec(conn, sql); else - { res = PQgetResult(conn); - /* NULL means we're all done with the async results */ - if (!res) - return (Datum) 0; - } - /* if needed, close the connection to the database and cleanup */ - if (freeconn) - PQfinish(conn); + finishStoreInfo(&storeinfo); - if (!res || - (PQresultStatus(res) != PGRES_COMMAND_OK && - PQresultStatus(res) != PGRES_TUPLES_OK)) + /* NULL res from async get means we're all done with the results */ + if (res || !is_async) { - dblink_res_error(conname, res, "could not execute query", fail); - return (Datum) 0; + if (freeconn) + PQfinish(conn); + + if (!res || 
+ (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) + { + /* This is only for backward compatibility */ + if (storeinfo.nummismatch) + { + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + } + dblink_res_error(conname, res, "could not execute query", fail); + return (Datum) 0; + } } - materializeResult(fcinfo, res); return (Datum) 0;} -/* - * Materialize the PGresult to return them as the function result. - * The res will be released in this function. - */static void -materializeResult(FunctionCallInfo fcinfo, PGresult *res) +initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo){ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; - - Assert(rsinfo->returnMode == SFRM_Materialize); - - PG_TRY(); + TupleDesc tupdesc; + int i; + + switch (get_call_result_type(fcinfo, NULL, &tupdesc)) { - TupleDesc tupdesc; - bool is_sql_cmd = false; - int ntuples; - int nfields; + case TYPEFUNC_COMPOSITE: + /* success */ + break; + case TYPEFUNC_RECORD: + /* failed to determine actual type of RECORD */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + break; + default: + /* result type isn't composite */ + elog(ERROR, "return type must be a row type"); + break; + } + + sinfo->oldcontext = MemoryContextSwitchTo( + rsinfo->econtext->ecxt_per_query_memory); + + /* make sure we have a persistent copy of the tupdesc */ + tupdesc = CreateTupleDescCopy(tupdesc); + + sinfo->error_occurred = FALSE; + sinfo->nummismatch = FALSE; + sinfo->nattrs = tupdesc->natts; + sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem); + sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc); + sinfo->valbuf = (void **)malloc(sinfo->nattrs * sizeof(void *)); + sinfo->valbufsize = (size_t *)malloc(sinfo->nattrs * sizeof(size_t)); + for (i = 0 ; i 
< sinfo->nattrs ; i++) + { + sinfo->valbuf[i] = NULL; + sinfo->valbufsize[i] = 0; + } - if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - is_sql_cmd = true; - - /* - * need a tuple descriptor representing one TEXT column to return - * the command status string as our result tuple - */ - tupdesc = CreateTemplateTupleDesc(1, false); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", - TEXTOID, -1, 0); - ntuples = 1; - nfields = 1; - } - else - { - Assert(PQresultStatus(res) == PGRES_TUPLES_OK); + /* Preallocate memory of same size with PGresAttDesc array for values. */ + sinfo->attrvalbuf = (char *) malloc(sinfo->nattrs * sizeof(PGresAttValue)); - is_sql_cmd = false; + rsinfo->setResult = sinfo->tuplestore; + rsinfo->setDesc = tupdesc; +} - /* get a tuple descriptor for our result type */ - switch (get_call_result_type(fcinfo, NULL, &tupdesc)) - { - case TYPEFUNC_COMPOSITE: - /* success */ - break; - case TYPEFUNC_RECORD: - /* failed to determine actual type of RECORD */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("function returning record called in context " - "that cannot accept type record"))); - break; - default: - /* result type isn't composite */ - elog(ERROR, "return type must be a row type"); - break; - } +static void +finishStoreInfo(storeInfo *sinfo) +{ + int i; - /* make sure we have a persistent copy of the tupdesc */ - tupdesc = CreateTupleDescCopy(tupdesc); - ntuples = PQntuples(res); - nfields = PQnfields(res); + for (i = 0 ; i < sinfo->nattrs ; i++) + { + if (sinfo->valbuf[i]) + { + free(sinfo->valbuf[i]); + sinfo->valbuf[i] = NULL; } + } + if (sinfo->attrvalbuf) + free(sinfo->attrvalbuf); + sinfo->attrvalbuf = NULL; + MemoryContextSwitchTo(sinfo->oldcontext); +} - /* - * check result and tuple descriptor have the same number of columns - */ - if (nfields != tupdesc->natts) - ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("remote query result rowtype does not match " - "the specified FROM clause 
rowtype"))); +static void * +addTuple(PGresult *res, AddTupFunc func, int id, size_t size) +{ + storeInfo *sinfo = (storeInfo *)PQgetAddTupleParam(res); + HeapTuple tuple; + int fields = PQnfields(res); + int i; + PGresAttValue *attval; + char **cstrs; - if (ntuples > 0) - { - AttInMetadata *attinmeta; - Tuplestorestate *tupstore; - MemoryContext oldcontext; - int row; - char **values; - - attinmeta = TupleDescGetAttInMetadata(tupdesc); - - oldcontext = MemoryContextSwitchTo( - rsinfo->econtext->ecxt_per_query_memory); - tupstore = tuplestore_begin_heap(true, false, work_mem); - rsinfo->setResult = tupstore; - rsinfo->setDesc = tupdesc; - MemoryContextSwitchTo(oldcontext); + if (sinfo->error_occurred) + return NULL; - values = (char **) palloc(nfields * sizeof(char *)); + switch (func) + { + case ADDTUP_ALLOC_TEXT: + case ADDTUP_ALLOC_BINARY: + if (id == -1) + return sinfo->attrvalbuf; + + if (id < 0 || id >= sinfo->nattrs) + return NULL; - /* put all tuples into the tuplestore */ - for (row = 0; row < ntuples; row++) + if (sinfo->valbufsize[id] < size) { - HeapTuple tuple; + if (sinfo->valbuf[id] == NULL) + sinfo->valbuf[id] = malloc(size); + else + sinfo->valbuf[id] = realloc(sinfo->valbuf[id], size); + sinfo->valbufsize[id] = size; + } + return sinfo->valbuf[id]; - if (!is_sql_cmd) - { - int i; + case ADDTUP_ADD_TUPLE: + break; /* Go through */ + default: + /* Ignore */ + break; + } - for (i = 0; i < nfields; i++) - { - if (PQgetisnull(res, row, i)) - values[i] = NULL; - else - values[i] = PQgetvalue(res, row, i); - } - } - else - { - values[0] = PQcmdStatus(res); - } + if (sinfo->nattrs != fields) + { + sinfo->error_occurred = TRUE; + sinfo->nummismatch = TRUE; + finishStoreInfo(sinfo); - /* build the tuple and put it into the tuplestore. 
*/ - tuple = BuildTupleFromCStrings(attinmeta, values); - tuplestore_puttuple(tupstore, tuple); - } + PQsetAddTupleErrMes(res, + strdup("function returning record called in " + "context that cannot accept type record")); + return NULL; + } - /* clean up and return the tuplestore */ - tuplestore_donestoring(tupstore); - } + /* + * Rewrite PGresAttDesc[] to char(*)[] in-place. + */ + Assert(sizeof(char*) <= sizeof(PGresAttValue)); + attval = (PGresAttValue *)sinfo->attrvalbuf; + cstrs = (char **)sinfo->attrvalbuf; + for(i = 0 ; i < fields ; i++) + cstrs[i] = PQgetAsCstring(attval++); - PQclear(res); + PG_TRY(); + { + tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs); + tuplestore_puttuple(sinfo->tuplestore, tuple); } PG_CATCH(); { - /* be sure to release the libpq result */ - PQclear(res); - PG_RE_THROW(); + /* + * Return the error message in the exception to the caller and + * cancel the exception. + */ + ErrorData *edata; + + sinfo->error_occurred = TRUE; + sinfo->nummismatch = TRUE; + + finishStoreInfo(sinfo); + + edata = CopyErrorData(); + FlushErrorState(); + + PQsetAddTupleErrMes(res, strdup(edata->message)); + return NULL; } PG_END_TRY(); + + return sinfo->attrvalbuf;}/*
Hello, The documentation had slipped my mind. This is the patch to add the documentation of PGresult custom storage. It shows in section '31.19. Alternative result storage'. regards, -- Kyotaro Horiguchi NTT Open Source Software Center diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 252ff8c..dc2acb6 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -7229,6 +7229,325 @@ int PQisthreadsafe(); </sect1> + <sect1 id="libpq-alterstorage"> + <title>Alternative result storage</title> + + <indexterm zone="libpq-alterstorage"> + <primary>PGresult</primary> + <secondary>PGconn</secondary> + </indexterm> + + <para> + As the standard usage, users can get the result of command + execution from <structname>PGresult</structname> aquired + with <function>PGgetResult</function> + from <structname>PGConn</structname>. While the memory areas for + the PGresult are allocated with malloc() internally within calls of + command execution functions such as <function>PQexec</function> + and <function>PQgetResult</function>. If you have difficulties to + handle the result records in the form of PGresult, you can instruct + PGconn to store them into your own storage instead of PGresult. + </para> + + <variablelist> + <varlistentry id="libpq-registertupleadder"> + <term> + <function>PQregisterTupleAdder</function> + <indexterm> + <primary>PQregisterTupleAdder</primary> + </indexterm> + </term> + + <listitem> + <para> + Sets a function to allocate memory for each tuple and column + values, and add the completed tuple into your storage. +<synopsis> +void PQregisterTupleAdder(PGconn *conn, + addTupleFunction func, + void *param); +</synopsis> + </para> + + <para> + <variablelist> + <varlistentry> + <term><parameter>conn</parameter></term> + <listitem> + <para> + The connection object to set the tuple adder + function. PGresult created from this connection calles + this function to store the result tuples instead of + storing into its internal storage. 
+ </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>func</parameter></term> + <listitem> + <para> + Tuple adder function to set. NULL means to use the + default storage. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>param</parameter></term> + <listitem> + <para> + A pointer to contextual parameter passed + to <parameter>func</parameter>. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-addtuplefunction"> + <term> + <type>addTupleFunction</type> + <indexterm> + <primary>addTupleFunction</primary> + </indexterm> + </term> + + <listitem> + <para> + The type for the callback function to serve memory blocks for + each tuple and its column values, and to add the constructed + tuple into your own storage. +<synopsis> +typedef enum +{ + ADDTUP_ALLOC_TEXT, + ADDTUP_ALLOC_BINARY, + ADDTUP_ADD_TUPLE +} AddTupFunc; + +void *(*addTupleFunction)(PGresult *res, + AddTupFunc func, + int id, + size_t size); +</synopsis> + </para> + + <para> + Generally this function must return NULL for failure and should + set the error message + with <function>PGsetAddTupleErrMes</function> if the cause is + other than out of memory. This funcion must not throw any + exception. This function is called in the sequence following. 
+ + <itemizedlist spacing="compact"> + <listitem> + <simpara>Call with <parameter>func</parameter> + = <firstterm>ADDTUP_ALLOC_BINARY</firstterm> + and <parameter>id</parameter> = -1 to request the memory + for tuple used as an array + of <type>PGresAttValue</type> </simpara> + </listitem> + <listitem> + <simpara>Call with <parameter>func</parameter> + = <firstterm>ADDTUP_ALLOC_TEXT</firstterm> + or <firstterm>ADDTUP_ALLOC_TEXT</firstterm> + and <parameter>id</parameter> is zero or positive number + to request the memory for each column value in current + tuple.</simpara> + </listitem> + <listitem> + <simpara>Call with <parameter>func</parameter> + = <firstterm>ADDTUP_ADD_TUPLE</firstterm> to request the + constructed tuple to store.</simpara> + </listitem> + </itemizedlist> + </para> + <para> + Calling <type>addTupleFunction</type> + with <parameter>func</parameter> = + <firstterm>ADDTUP_ALLOC_TEXT</firstterm> is telling to return a + memory block with at least <parameter>size</parameter> bytes + which may not be aligned to the word boundary. + <parameter>id</parameter> is a zero or positive number + distinguishes the usage of requested memory block, that is the + position of the column for which the memory block is used. + </para> + <para> + When <parameter>func</parameter> + = <firstterm>ADDTUP_ALLOC_BINARY</firstterm>, this function is + telled to return a memory block with at + least <parameter>size</parameter> bytes which is aligned to the + word boundary. + <parameter>id</parameter> is the identifier distinguishes the + usage of requested memory block. -1 means that it is used as an + array of <type>PGresAttValue</type> to store the tuple. Zero or + positive numbers have the same meanings as for + <firstterm>ADDTUP_ALLOC_BINARY</firstterm>. + </para> + <para>When <parameter>func</parameter> + = <firstterm>ADDTUP_ADD_TUPLE</firstterm>, this function is + telled to store the <type>PGresAttValue</type> structure + constructed by the caller into your storage. 
The pointer to the + tuple structure is not passed so you should memorize the + pointer to the memory block passed the caller on + <parameter>func</parameter> + = <parameter>ADDTUP_ALLOC_BINARY</parameter> + with <parameter>id</parameter> is -1. This function must return + any non-NULL values for success. You must properly put back the + memory blocks passed to the caller for this function if needed. + </para> + <variablelist> + <varlistentry> + <term><parameter>res</parameter></term> + <listitem> + <para> + A pointer to the <type>PGresult</type> object. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>func</parameter></term> + <listitem> + <para> + An <type>enum</type> value telling the function to perform. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>param</parameter></term> + <listitem> + <para> + A pointer to contextual parameter passed to func. + </para> + </listitem> + </varlistentry> + </variablelist> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-pqgestasctring"> + <term> + <function>PQgetAsCstring</function> + <indexterm> + <primary>PQgetAsCstring</primary> + </indexterm> + </term> + <listitem> + <para> + Get the value of the column pointed + by <parameter>attval</parameter> in the form of + zero-terminated C string. Returns NULL if the value is null. +<synopsis> +char *PQgetAsCstring(PGresAttValue *attval) +</synopsis> + </para> + <para> + <variablelist> + <varlistentry> + <term><parameter>attval</parameter></term> + <listitem> + <para> + A pointer to the <type>PGresAttValue</type> object + to retrieve the value. 
+ </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-pqgetaddtupleparam"> + <term> + <function>PQgetAddTupleParam</function> + <indexterm> + <primary>PQgetAddTupleParam</primary> + </indexterm> + </term> + <listitem> + <para> + Get the pointer passed to <function>PQregisterTupleAdder</function> + as <parameter>param</parameter>. +<synopsis> +void *PQgetTupleParam(PGresult *res) +</synopsis> + </para> + <para> + <variablelist> + <varlistentry> + <term><parameter>res</parameter></term> + <listitem> + <para> + A pointer to the <type>PGresult</type> object. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-pqsetaddtupleerrmes"> + <term> + <function>PQsetAddTupleErrMes</function> + <indexterm> + <primary>PQsetAddTupleErrMes</primary> + </indexterm> + </term> + <listitem> + <para> + Set the message for the error occurred in <type>addTupleFunction</type>. + If this message is not set, the error is assumed to be out of + memory. +<synopsis> +void PQsetAddTupleErrMes(PGresult *res, char *mes) +</synopsis> + </para> + <para> + <variablelist> + <varlistentry> + <term><parameter>res</parameter></term> + <listitem> + <para> + A pointer to the <type>PGresult</type> object + in <type>addTupleFunction</type>. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>mes</parameter></term> + <listitem> + <para> + A pointer to the memory block containing the error + message, which must be allocated by alloc(). The + memory block will be freed with free() in the caller + of + <type>addTupleFunction</type> only if it returns NULL. + </para> + <para> + If <parameter>res</parameter> already has a message + previously set, it is freed and the given message is + set. Set NULL to cancel the the costom message. 
+ </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + </sect1> + + <sect1 id="libpq-build"> <title>Building <application>libpq</application> Programs</title>
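The call sequence described in the documentation patch above (one request with id = -1 for the PGresAttValue array, one request per column, then ADDTUP_ADD_TUPLE) can be sketched without libpq. This is only an illustration under stated assumptions: the enum and struct are stand-ins copied from the patch, while RowSink, row_sink_cb, feed_row and NCOLS are hypothetical names. In the patch the callback receives a PGresult * and fetches its context with PQgetAddTupleParam(res); here the context is passed directly so that the example compiles on its own. Unlike addTuple() in the dblink patch, the sketch checks the result of realloc() before recording the new buffer size.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Stand-ins mirroring the declarations in the patch; NOT the real libpq headers. */
typedef enum
{
    ADDTUP_ALLOC_TEXT,      /* memory for a text value, may be unaligned */
    ADDTUP_ALLOC_BINARY,    /* aligned memory for a binary value */
    ADDTUP_ADD_TUPLE        /* store the completed tuple */
} AddTupFunc;

typedef struct
{
    int   len;              /* length in bytes, -1 for SQL NULL */
    char *value;            /* value plus terminating zero byte */
} PGresAttValue;

#define NCOLS 2             /* hypothetical fixed column count */

/* Hypothetical caller-side storage: reusable buffers plus running totals. */
typedef struct RowSink
{
    PGresAttValue row[NCOLS];       /* handed out when id == -1 */
    char         *colbuf[NCOLS];    /* per-column buffers, grown on demand */
    size_t        colsize[NCOLS];
    long          nrows;
    long          nbytes;
} RowSink;

/* Callback in the style of addTupleFunction; returns NULL on failure. */
static void *
row_sink_cb(RowSink *sink, AddTupFunc func, int id, size_t size)
{
    switch (func)
    {
        case ADDTUP_ALLOC_TEXT:
        case ADDTUP_ALLOC_BINARY:
            if (id == -1)
                return size <= sizeof(sink->row) ? sink->row : NULL;
            if (id < 0 || id >= NCOLS)
                return NULL;
            if (sink->colsize[id] < size)
            {
                char *p = realloc(sink->colbuf[id], size);

                if (p == NULL)
                    return NULL;    /* old buffer and size stay valid */
                sink->colbuf[id] = p;
                sink->colsize[id] = size;
            }
            return sink->colbuf[id];

        case ADDTUP_ADD_TUPLE:
            /* "Store" the tuple: here we only count rows and payload bytes. */
            for (int i = 0; i < NCOLS; i++)
                if (sink->row[i].len >= 0)
                    sink->nbytes += sink->row[i].len;
            sink->nrows++;
            return sink;            /* any non-NULL value means success */
    }
    return NULL;
}

/* Plays the role of getAnotherTuple(): drives the callback for one row. */
static int
feed_row(RowSink *sink, const char *vals[NCOLS])
{
    PGresAttValue *tup;

    assert(sink != NULL);
    tup = row_sink_cb(sink, ADDTUP_ALLOC_BINARY, -1,
                      NCOLS * sizeof(PGresAttValue));
    if (tup == NULL)
        return 0;
    memset(tup, 0, NCOLS * sizeof(PGresAttValue));

    for (int i = 0; i < NCOLS; i++)
    {
        size_t vlen;
        char  *buf;

        if (vals[i] == NULL)
        {
            tup[i].len = -1;        /* SQL NULL */
            tup[i].value = NULL;
            continue;
        }
        vlen = strlen(vals[i]);
        buf = row_sink_cb(sink, ADDTUP_ALLOC_TEXT, i, vlen + 1);
        if (buf == NULL)
            return 0;
        memcpy(buf, vals[i], vlen + 1);
        tup[i].len = (int) vlen;
        tup[i].value = buf;
    }
    return row_sink_cb(sink, ADDTUP_ADD_TUPLE, 0, 0) != NULL;
}
```

Because the column buffers are reused from row to row, a real handler would have to copy the values somewhere durable in the ADDTUP_ADD_TUPLE step, which is what the dblink patch does by building a HeapTuple and putting it into the tuplestore.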
On 12/01/2011 05:48 AM, Kyotaro HORIGUCHI wrote:
>                                  xfer time   Peak RSS
> Original                       : 6.02s       850MB
> libpq patch + Original dblink  : 6.11s       850MB
> full patch                     : 4.44s       643MB

These look like interesting results. Currently Tom is listed as the reviewer on this patch, based on comments made before the CF really started. And the patch has incorrectly been sitting in "Waiting for author" for the last week; oops. I'm not sure what to do with this one now except raise a general call to see if anyone wants to take a look at it, now that it seems to be in good enough shape to deliver measurable results.

-- Greg Smith 2ndQuadrant US greg@2ndQuadrant.com Baltimore, MD PostgreSQL Training, Services, and 24x7 Support www.2ndQuadrant.us
Greg Smith <greg@2ndQuadrant.com> writes:
> On 12/01/2011 05:48 AM, Kyotaro HORIGUCHI wrote:
>>                                  xfer time   Peak RSS
>> Original                       : 6.02s       850MB
>> libpq patch + Original dblink  : 6.11s       850MB
>> full patch                     : 4.44s       643MB

> These look like interesting results. Currently Tom is listed as the
> reviewer on this patch, based on comments made before the CF really
> started. And the patch has incorrectly been sitting in "Waiting
> for author" for the last week; oops. I'm not sure what to do with this
> one now except raise a general call to see if anyone wants to take a
> look at it, now that it seems to be in good enough shape to deliver
> measurable results.

I did list myself as reviewer some time ago, but if anyone else wants to take it I won't be offended ;-)

regards, tom lane
On Thu, Dec 8, 2011 at 5:41 AM, Kyotaro HORIGUCHI <horiguchi.kyotaro@oss.ntt.co.jp> wrote:
> This is the patch to add the documentation of PGresult custom
> storage. It shows in section '31.19. Alternative result
> storage'.

It would be good to consolidate this into the main patch.

I find the names of the functions added here to be quite confusing and would suggest renaming them. I expected PQgetAsCstring to do something similar to PQgetvalue, but the code is completely different, and even after reading the documentation I still don't understand what that function is supposed to be used for. Why "as cstring"? What would the other option be?

I also don't think the "add tuple" terminology is particularly good. It's not obvious from the name that what you're doing is overriding the way memory is allocated and results are stored.

Also, what about the problem Tom mentioned here?

http://archives.postgresql.org/message-id/1042.1321123761@sss.pgh.pa.us

-- Robert Haas EnterpriseDB: http://www.enterprisedb.com The Enterprise PostgreSQL Company
Hello, thank you for taking the time to comment.

At Wed, 21 Dec 2011 11:09:59 -0500, Robert Haas <robertmhaas@gmail.com> wrote
> I find the names of the functions added here to be quite
> confusing and would suggest renaming them. I expected
> PQgetAsCstring to do something similar to PQgetvalue, but the
> code is completely different,

To be honest, I have felt the same perplexity. If the problem is simply one of naming, I could propose another name, "PQreadAttValue", though that is not very good either.

> and even after reading the documentation I still don't
> understand what that function is supposed to be used for. Why
> "as cstring"? What would the other option be?

Is the problem the poor description, or the raison d'être of the function?

The function exists for several reasons: getAnotherTuple internally stores the field values of the tuples sent from the server in the form of PGresAttValue; the only route I have found for storing a tuple into TupleStore is BuildTupleFromCStrings() followed by tuplestore_puttuple(), which is what dblink does in materializeResult(); a C string is of course the most natural format in a C program; I hesitated to modify execTuples.c; and I wanted to hide the details of PGresAttValue.

Taking it as given that the values are passed as PGresAttValue * (for reasons of performance and to limit the extent of the modification), the "adding tuple" function has to get the values out of the struct. From the viewpoint of authority ("encapsulation", in other words) versus convenience, this can be done in two ways: with knowledge of the structure, or without it. PQgetAsCstring is the latter approach. (This is inconsistent with the fact that the definition of PGresAttValue is moved from libpq-int.h into libpq-fe.h; under this approach the details of the structure should be hidden, as they are for PGresult.) But it is not obvious that this choice is better than the other one.
If we consider PGresAttValue simple and stable enough that its details need not be hidden, PQgetAsCstring can be removed and the problem goes away. PGresAttValue would need documentation in that case. I would prefer to handle PGresAttValue directly if that causes no problem.

> I also don't think the "add tuple" terminology is particularly good.
> It's not obvious from the name that what you're doing is overriding
> the way memory is allocated and results are stored.

This phrase was originally taken from pqAddTuple() in fe-exec.c and was not changed after that function was merged with the other functions. I propose "tuple storage handler" as an alternative.

- typedef void *(*addTupleFunction)(...);
+ typedef void *(*tupleStorageHandler)(...);

- typedef enum { ADDTUP_*, } AddTupFunc;
+ typedef enum { TSHANDLER_*, } TSHandlerCommand;

- void *PQgetAddTupleParam(...);
+ void *PQgetTupleStorageHandlerContext(...);

- void PQregisterTupleAdder(...);
+ void PQregisterTupleStoreHandler(...);

- addTupleFunction PGresult.addTupleFunc;
+ tupleStorageHandler PGresult.tupleStorageHandlerFunc;

- void *PGresult.addTupleFuncParam;
+ void *PGresult.tupleStorageHandlerContext;

- char *PGresult.addTupleFuncErrMes;
+ void *PGresult.tupleStorageHandlerErrMes;

> Also, what about the problem Tom mentioned here?
>
> http://archives.postgresql.org/message-id/1042.1321123761@sss.pgh.pa.us

The plan of simply replacing the mallocs with something like palloc has been abandoned because its scope was too narrow.

dblink-plus copies the whole PGresult into TupleStore in order to avoid orphaning memory on SIGINT. The resource owner mechanism is applicable to that in principle, but hard in practice because the current implementation could not accept it without radical modification. In addition, dblink does the same thing, probably for the same reason as dblink-plus and, as far as I have heard, for another reason as well. Whatever the reason, both dblink and dblink-plus do the same thing, and it can make performance lower than expected.
If TupleStore (TupleDesc) is preferable to PGresult for in-backend use, and ordinary (client-side) libpq users can handle only PGresult, then I think a mechanism like this patch is required to maintain compatibility. Conversely, if there is no significant reason to use TupleStore in the backend (which would imply that existing mechanisms like the resource owner can inexpensively protect the backend from the possible inconvenience of using PGresult storage there), PGresult should be used as it is. I think TupleStore, which is designed for use in the backend, is preferable for this usage, and I do not want to fetch the data by way of a detour through PGresult.

regards, -- Kyotaro Horiguchi NTT Open Source Software Center
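The two ways of reading a PGresAttValue discussed in this message, through an accessor or with direct knowledge of the struct, can be compared side by side. The following is a rough sketch, not the patch's code: the struct and NULL_LEN are stand-ins copied from libpq-int.h as shown in the diff, att_as_cstring is a hypothetical equivalent of what the documentation patch describes for PQgetAsCstring (return NULL for a null value, otherwise the zero-terminated string), and row_to_cstrings and row_payload_bytes are illustrative names only.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Stand-ins mirroring the patch; NOT the real libpq headers. */
#define NULL_LEN (-1)           /* len value that marks an SQL NULL */

typedef struct pgresAttValue
{
    int   len;                  /* length in bytes of the value */
    char *value;                /* actual value, plus terminating zero byte */
} PGresAttValue;

/*
 * Accessor style: the caller never looks inside the struct.
 * Hypothetical equivalent of the documented PQgetAsCstring behaviour.
 */
static char *
att_as_cstring(const PGresAttValue *att)
{
    return att->len == NULL_LEN ? NULL : att->value;
}

/* Build the char *[] that a BuildTupleFromCStrings-style consumer expects. */
static void
row_to_cstrings(const PGresAttValue *row, int nfields, char **out)
{
    assert(nfields >= 0);
    for (int i = 0; i < nfields; i++)
        out[i] = att_as_cstring(&row[i]);
}

/*
 * Direct style: the caller depends on the struct layout and on the
 * NULL_LEN convention, but gets at the length without a strlen().
 */
static size_t
row_payload_bytes(const PGresAttValue *row, int nfields)
{
    size_t total = 0;

    for (int i = 0; i < nfields; i++)
        if (row[i].len != NULL_LEN)
            total += (size_t) row[i].len;
    return total;
}
```

The accessor keeps the struct opaque at the cost of one more exported function; the direct style needs no extra function but makes the layout of PGresAttValue part of the public interface, which is the trade-off raised above.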
One patch that fell off the truck during a turn in the November CommitFest was "Allow substitute allocators for PGresult". Re-reading the whole thing again, it actually turned into a rather different submission in the middle, and I know I didn't follow that shift correctly then. I'm replying to its thread but have changed the subject to reflect that change. From a procedural point of view, I don't feel right kicking this back to its author on a Friday night when the deadline for resubmitting it would be Sunday. Instead I've refreshed the patch myself and am adding it to the January CommitFest. The new patch is a single file; it's easy enough to split out the dblink changes if someone wants to work with the pieces separately.

After my meta-review I think we should get another reviewer familiar with using dblink to look at this next. This is fundamentally a performance patch now. Some results and benchmarking code were submitted along with it; the other issues are moot if those aren't reproducible. The secondary goal for a new review here is to provide another opinion on the naming issues and abstraction concerns raised so far.

To clear out the original line of thinking, this is not a replacement low-level storage allocator anymore. The idea of using such a mechanism to help catch memory leaks has also been dropped.

Instead this adds and documents a new path for libpq callers to more directly receive tuples, for both improved speed and lower memory usage. dblink has been modified to use this new mechanism. Benchmarking by the author suggests no significant change in libpq speed when only that change was made, while the modified dblink using the new mechanism was significantly faster. It jumped from 332K tuples/sec to 450K, a 35% gain, and had a lower memory footprint too.
Test methodology and those results are at http://archives.postgresql.org/pgsql-hackers/2011-12/msg00008.php

Robert Haas did a quick code review of this already; it, along with the author's responses mixed in, is at http://archives.postgresql.org/pgsql-hackers/2011-12/msg01149.php

I see two areas of contention there:

- There are several naming bits no one is happy with yet. Robert didn't like some of them, but neither did Kyotaro. I don't have an opinion myself. Is it the case that some changes to the existing code's terminology are what's actually needed to make this all better? Or is this just fundamentally warty and there's nothing to be done about it? Dunno.

- There is an abstraction wrapper vs. coding convenience trade-off centering around PGresAttValue. It sounded to me like it raised always-fun questions like "where's the right place for the line between libpq-fe.h and libpq-int.h to be?"

dblink is pretty popular, and this is a big performance win for it. If naming and API boundary issues are the worst problems here, this sounds like something well worth pursuing as part of 9.2's still advancing performance theme.

--
Greg Smith 2ndQuadrant US greg@2ndQuadrant.com Baltimore, MD
PostgreSQL Training, Services, and 24x7 Support www.2ndQuadrant.com
Attachments
Hello,

This is a revised and rebased version of the patch.

a. The old term `Add Tuple Function' is changed to `Store Handler'. The reason for not using `storage' is simply the length of the symbols.

b. I couldn't find a place to put PQgetAsCString() in. It is removed, and storeHandler()@dblink.c touches PGresAttValue directly in this new patch. The definition of PGresAttValue now stays in libpq-fe.h and is provided with a comment.

c. Refined the error handling of dblink.c. I think it preserves the previous behavior for column number mismatch and type conversion exceptions.

d. The documentation is revised.

> It jumped from 332K tuples/sec to 450K, a 35% gain, and had a
> lower memory footprint too. Test methodology and those results
> are at
> http://archives.postgresql.org/pgsql-hackers/2011-12/msg00008.php

I am disappointed to find, on re-measuring, that the gain has become lower than that.

On CentOS 6.2, with the other conditions the same as in the previous test, overall performance became higher, the loss from the libpq patch was 1.8%, and the gain from the full patch fell to 5.6%. The reduction in memory usage was unchanged.

Original             : 3.96s 100.0%
w/libpq patch        : 4.03s 101.8%
w/libpq+dblink patch : 3.74s  94.4%

The attachments are listed below.

libpq_altstore_20120117.patch
  - Allow alternative storage for libpq.

dblink_perf_20120117.patch
  - Modify dblink to use the alternative storage mechanism.

libpq_altstore_doc_20120117.patch
  - Documentation for libpq_altstore. Appears as "31.19. Alternative result storage".

regards,

--
Kyotaro Horiguchi
NTT Open Source Software Center

diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..83525e1 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,6 @@ PQconnectStartParams 157
 PQping 158
 PQpingParams 159
 PQlibVersion 160
+PQregisterStoreHandler 161
+PQgetStoreHandlerParam 163
+PQsetStoreHandlerErrMes 164
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index d454538..5559f0b 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2692,6 +2692,7 @@ makeEmptyPGconn(void)
 	conn->allow_ssl_try = true;
 	conn->wait_ssl_try = false;
 #endif
+	conn->storeHandler = NULL;
 
 	/*
 	 * We try to send at least 8K at a time, which is the usual size of pipe
@@ -5076,3 +5077,10 @@ PQregisterThreadLock(pgthreadlock_t newhandler)
 
 	return prev;
 }
+
+void
+PQregisterStoreHandler(PGconn *conn, StoreHandler func, void *param)
+{
+	conn->storeHandler = func;
+	conn->storeHandlerParam = param;
+}
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index b743566..96e5974 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -67,6 +67,10 @@ static int PQsendDescribe(PGconn *conn, char desc_type,
 			   const char *desc_target);
 static int	check_field_number(const PGresult *res, int field_num);
 
+static void *pqDefaultStoreHandler(PGresult *res, PQStoreFunc func,
+								   int id, size_t len);
+static void *pqAddTuple(PGresult *res, PGresAttValue *tup);
+
 /* ----------------
  * Space management for PGresult.
@@ -160,6 +164,9 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) result->curBlock = NULL; result->curOffset= 0; result->spaceLeft = 0; + result->storeHandler = pqDefaultStoreHandler; + result->storeHandlerParam = NULL; + result->storeHandlerErrMes = NULL; if (conn) { @@ -194,6 +201,12 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) } result->nEvents = conn->nEvents; } + + if (conn->storeHandler) + { + result->storeHandler = conn->storeHandler; + result->storeHandlerParam = conn->storeHandlerParam; + } } else { @@ -487,6 +500,33 @@ PQresultAlloc(PGresult *res, size_t nBytes) return pqResultAlloc(res, nBytes, TRUE);} +void * +pqDefaultStoreHandler(PGresult *res, PQStoreFunc func, int id, size_t len) +{ + void *p; + + switch (func) + { + case PQSF_ALLOC_TEXT: + return pqResultAlloc(res, len, TRUE); + + case PQSF_ALLOC_BINARY: + p = pqResultAlloc(res, len, FALSE); + + if (id == -1) + res->storeHandlerParam = p; + + return p; + + case PQSF_ADD_TUPLE: + return pqAddTuple(res, res->storeHandlerParam); + + default: + /* Ignore */ + break; + } + return NULL; +}/* * pqResultAlloc - * Allocate subsidiary storage for a PGresult. @@ -830,9 +870,9 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)/* * pqAddTuple * add a row pointerto the PGresult structure, growing it if necessary - * Returns TRUE if OK, FALSE if not enough memory to add the row + * Returns tup if OK, NULL if not enough memory to add the row. 
*/ -int +static void *pqAddTuple(PGresult *res, PGresAttValue *tup){ if (res->ntups >= res->tupArrSize) @@ -858,13 +898,13 @@ pqAddTuple(PGresult *res, PGresAttValue *tup) newTuples = (PGresAttValue **) realloc(res->tuples, newSize * sizeof(PGresAttValue *)); if (!newTuples) - return FALSE; /* malloc or realloc failed */ + return NULL; /* malloc or realloc failed */ res->tupArrSize = newSize; res->tuples = newTuples; } res->tuples[res->ntups] = tup; res->ntups++; - return TRUE; + return tup;}/* @@ -2822,6 +2862,35 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num) return 0;} +/* PQgetAddStoreHandlerParam + * Get the pointer to the contextual parameter from PGresult which is + * registered to PGconn by PQregisterStoreHandler + */ +void * +PQgetStoreHandlerParam(const PGresult *res) +{ + if (!res) + return NULL; + return res->storeHandlerParam; +} + +/* PQsetStorHandlerErrMes + * Set the error message pass back to the caller of StoreHandler. + * + * mes must be a malloc'ed memory block and it will be released by + * the caller of StoreHandler. You can replace the previous message + * by alternative mes, or clear it with NULL. The previous one will + * be freed internally. + */ +void +PQsetStoreHandlerErrMes(PGresult *res, char *mes) +{ + /* Free existing message */ + if (res->storeHandlerErrMes) + free(res->storeHandlerErrMes); + res->storeHandlerErrMes = mes; +} +/* PQnparams: * returns the number of input parameters of a prepared statement. 
*/ diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c index a7c3899..205502b 100644 --- a/src/interfaces/libpq/fe-protocol2.c +++ b/src/interfaces/libpq/fe-protocol2.c @@ -733,9 +733,10 @@ getAnotherTuple(PGconn *conn, bool binary) if (conn->curTuple == NULL) { conn->curTuple= (PGresAttValue *) - pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE); + result->storeHandler(result, PQSF_ALLOC_BINARY, -1, + nfields * sizeof(PGresAttValue)); if (conn->curTuple == NULL) - goto outOfMemory; + goto addTupleError; MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue)); /* @@ -757,7 +758,7 @@ getAnotherTuple(PGconn *conn, bool binary) { bitmap = (char *) malloc(nbytes); if (!bitmap) - goto outOfMemory; + goto addTupleError; } if (pqGetnchar(bitmap, nbytes, conn)) @@ -787,9 +788,12 @@ getAnotherTuple(PGconn *conn, bool binary) vlen = 0; if (tup[i].value == NULL) { - tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary); + PQStoreFunc func = + (binary ? PQSF_ALLOC_BINARY : PQSF_ALLOC_TEXT); + tup[i].value = + (char *) result->storeHandler(result, func, i, vlen + 1); if (tup[i].value == NULL) - goto outOfMemory; + goto addTupleError; } tup[i].len = vlen; /* read in the value */ @@ -812,8 +816,9 @@ getAnotherTuple(PGconn *conn, bool binary) } /* Success! Store the completed tuple in the result*/ - if (!pqAddTuple(result, tup)) - goto outOfMemory; + if (!result->storeHandler(result, PQSF_ADD_TUPLE, 0, 0)) + goto addTupleError; + /* and reset for a new message */ conn->curTuple = NULL; @@ -821,7 +826,7 @@ getAnotherTuple(PGconn *conn, bool binary) free(bitmap); return 0; -outOfMemory: +addTupleError: /* Replace partially constructed result with an error result */ /* @@ -829,8 +834,21 @@ outOfMemory: * there's not enough memory to concatenate messages... 
*/ pqClearAsyncResult(conn); - printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("out of memory for query result\n")); + resetPQExpBuffer(&conn->errorMessage); + + /* + * If error message is passed from addTupleFunc, set it into + * PGconn, assume out of memory if not. + */ + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext(result->storeHandlerErrMes ? + result->storeHandlerErrMes : + "out of memory for query result\n")); + if (result->storeHandlerErrMes) + { + free(result->storeHandlerErrMes); + result->storeHandlerErrMes = NULL; + } /* * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 892dcbc..117c38a 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -634,9 +634,10 @@ getAnotherTuple(PGconn *conn, int msgLength) if (conn->curTuple == NULL) { conn->curTuple= (PGresAttValue *) - pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE); + result->storeHandler(result, PQSF_ALLOC_BINARY, -1, + nfields * sizeof(PGresAttValue)); if (conn->curTuple == NULL) - goto outOfMemory; + goto addTupleError; MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue)); } tup = conn->curTuple; @@ -673,11 +674,12 @@ getAnotherTuple(PGconn *conn, int msgLength) vlen = 0; if (tup[i].value == NULL) { - bool isbinary = (result->attDescs[i].format != 0); - - tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary); + PQStoreFunc func = (result->attDescs[i].format != 0 ? + PQSF_ALLOC_BINARY : PQSF_ALLOC_TEXT); + tup[i].value = + (char *) result->storeHandler(result, func, i, vlen + 1); if (tup[i].value == NULL) - goto outOfMemory; + goto addTupleError; } tup[i].len = vlen; /* read in the value */ @@ -689,22 +691,36 @@ getAnotherTuple(PGconn *conn, int msgLength) } /* Success! 
Store the completed tuple in theresult */ - if (!pqAddTuple(result, tup)) - goto outOfMemory; + if (!result->storeHandler(result, PQSF_ADD_TUPLE, 0, 0)) + goto addTupleError; + /* and reset for a new message */ conn->curTuple = NULL; return 0; -outOfMemory: +addTupleError: /* * Replace partially constructed result with an error result. First * discard the old resultto try to win back some memory. */ pqClearAsyncResult(conn); - printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("out of memory for query result\n")); + resetPQExpBuffer(&conn->errorMessage); + + /* + * If error message is passed from addTupleFunc, set it into + * PGconn, assume out of memory if not. + */ + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext(result->storeHandlerErrMes ? + result->storeHandlerErrMes : + "out of memory for query result\n")); + if (result->storeHandlerErrMes) + { + free(result->storeHandlerErrMes); + result->storeHandlerErrMes = NULL; + } pqSaveErrorResult(conn); /* Discard the failed message by pretending we read it */ diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index ef26ab9..6d86fa0 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -116,6 +116,16 @@ typedef enum PQPING_NO_ATTEMPT /* connection not attempted (bad params) */} PGPing; +/* PQStoreFunc is the enum for one of the parameters of storeHandler + * that decides what to do. See the typedef StoreHandler for + * details */ +typedef enum +{ + PQSF_ALLOC_TEXT, /* Requested non-aligned memory for text value */ + PQSF_ALLOC_BINARY, /* Requested aligned memory for binary value */ + PQSF_ADD_TUPLE /* Requested to add tuple data into store */ +} PQStoreFunc; +/* PGconn encapsulates a connection to the backend. 
* The contents of this struct are not supposed to be known to applications.*/ @@ -149,6 +159,15 @@ typedef struct pgNotify struct pgNotify *next; /* list link */} PGnotify; +/* PGresAttValue represents a value of one tuple field in string form. + NULL is represented as len < 0. Otherwise value points to a null + terminated C string with the length of len. */ +typedef struct pgresAttValue +{ + int len; /* length in bytes of the value */ + char *value; /* actual value, plus terminating zero byte */ +} PGresAttValue; +/* Function types for notice-handling callbacks */typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);typedefvoid (*PQnoticeProcessor) (void *arg, const char *message); @@ -416,6 +435,52 @@ extern PGPing PQping(const char *conninfo);extern PGPing PQpingParams(const char *const * keywords, const char *const * values, int expand_dbname); +/* + * Typedef for alternative result store handler. + * + * This function pointer is used for alternative result store handler + * callback in PGresult and PGconn. + * + * StoreHandler is called for three functions designated by the enum + * PQStoreFunc. + * + * id is the identifier for allocated memory block. The caller sets -1 + * for PGresAttValue array, and 0 to number of cols - 1 for each + * column. + * + * PQSF_ALLOC_TEXT requests the size bytes memory block for a text + * value which may not be alingned to the word boundary. + * + * PQSF_ALLOC_BINARY requests the size bytes memory block for a binary + * value which is aligned to the word boundary. + * + * PQSF_ADD_TUPLE requests to add tuple data into the result store, + * and free the memory blocks allocated by this function if necessary. + * id and size are to be ignored for this function. + * + * This function must return non-NULL value for success and must + * return NULL for failure and may set error message by + * PQsetStoreHandlerErrMes. It is assumed by caller as out of memory + * when the error message is NULL on failure. 
This function is assumed + * not to throw any exception. + */ +typedef void *(*StoreHandler)(PGresult *res, PQStoreFunc func, + int id, size_t size); + +/* + * Register alternative result store function to PGconn. + * + * By registering this function, pg_result disables its own result + * store and calls it to append rows one by one. + * + * func is tuple store function. See the typedef StoreHandler. + * + * storeHandlerParam is the contextual variable that can be get with + * PQgetStoreHandlerParam in StoreHandler. + */ +extern void PQregisterStoreHandler(PGconn *conn, StoreHandler func, + void *storeHandlerParam); +/* Force the write buffer to be written (or at least try) */extern int PQflush(PGconn *conn); @@ -454,6 +519,8 @@ extern char *PQcmdTuples(PGresult *res);extern char *PQgetvalue(const PGresult *res, int tup_num, intfield_num);extern int PQgetlength(const PGresult *res, int tup_num, int field_num);extern int PQgetisnull(constPGresult *res, int tup_num, int field_num); +extern void *PQgetStoreHandlerParam(const PGresult *res); +extern void PQsetStoreHandlerErrMes(PGresult *res, char *mes);extern int PQnparams(const PGresult *res);extern Oid PQparamtype(const PGresult *res, int param_num); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index d967d60..e28e712 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -134,12 +134,6 @@ typedef struct pgresParamDesc#define NULL_LEN (-1) /* pg_result len for NULL value */ -typedef struct pgresAttValue -{ - int len; /* length in bytes of the value */ - char *value; /* actual value, plus terminating zero byte */ -} PGresAttValue; -/* Typedef for message-field list entries */typedef struct pgMessageField{ @@ -209,6 +203,11 @@ struct pg_result PGresult_data *curBlock; /* most recently allocated block */ int curOffset; /* start offset of free space in block */ int spaceLeft; /* number of free bytesremaining in block */ + + StoreHandler storeHandler; /* 
Result store handler. See + * StoreHandler for details. */ + void *storeHandlerParam; /* Contextual parameter for storeHandler */ + char *storeHandlerErrMes; /* Error message from storeHandler */};/* PGAsyncStatusType defines the state of the query-executionstate machine */ @@ -443,6 +442,13 @@ struct pg_conn /* Buffer for receiving various parts of messages */ PQExpBufferData workBuffer;/* expansible string */ + + /* Tuple store handler. The two fields below is copied to newly + * created PGresult if tupStoreHandler is not NULL. Use default + * function if NULL. */ + StoreHandler storeHandler; /* Result store handler. See + * StoreHandler for details. */ + void *storeHandlerParam; /* Contextual parameter for storeHandler */};/* PGcancel stores all data necessary to cancela connection. A copy of this @@ -507,7 +513,6 @@ extern voidpqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)/* This lets gcc check theformat string for consistency. */__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3))); -extern int pqAddTuple(PGresult *res, PGresAttValue *tup);extern void pqSaveMessageField(PGresult *res, char code, const char *value);extern void pqSaveParameterStatus(PGconn *conn, const char *name, diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 36a8e3e..a8685a9 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -63,11 +63,24 @@ typedef struct remoteConn bool newXactForCursor; /* Opened a transaction for a cursor*/} remoteConn; +typedef struct storeInfo +{ + Tuplestorestate *tuplestore; + int nattrs; + AttInMetadata *attinmeta; + MemoryContext oldcontext; + char *attrvalbuf; + void **valbuf; + size_t *valbufsize; + bool error_occurred; + bool nummismatch; + ErrorData *edata; +} storeInfo; +/* * Internal declarations */static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async); -static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);static remoteConn *getConnectionByName(const char*name);static 
HTAB *createConnHash(void);static void createNewConnection(const char *name, remoteConn *rconn); @@ -90,6 +103,10 @@ static char *escape_param_str(const char *from);static void validate_pkattnums(Relation rel, int2vector *pkattnums_arg, int32 pknumatts_arg, int **pkattnums, int *pknumatts); +static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo); +static void finishStoreInfo(storeInfo *sinfo); +static void *storeHandler(PGresult *res, PQStoreFunc func, int id, size_t size); +/* Global */static remoteConn *pconn = NULL; @@ -503,6 +520,7 @@ dblink_fetch(PG_FUNCTION_ARGS) char *curname = NULL; int howmany = 0; bool fail = true; /* default to backward compatible */ + storeInfo storeinfo; DBLINK_INIT; @@ -559,15 +577,36 @@ dblink_fetch(PG_FUNCTION_ARGS) appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname); /* + * Result is stored into storeinfo.tuplestore instead of + * res->result retuned by PQexec below + */ + initStoreInfo(&storeinfo, fcinfo); + PQregisterStoreHandler(conn, storeHandler, &storeinfo); + + /* * Try to execute the query. Note that since libpq uses malloc, the * PGresult will be long-lived even thoughwe are still in a short-lived * memory context. */ res = PQexec(conn, buf.data); + finishStoreInfo(&storeinfo); + if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { + /* finishStoreInfo saves the fields referred to below. 
*/ + if (storeinfo.nummismatch) + { + /* This is only for backward compatibility */ + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + } + else if (storeinfo.edata) + ReThrowError(storeinfo.edata); + dblink_res_error(conname, res, "could not fetch from cursor", fail); return (Datum) 0; } @@ -579,8 +618,8 @@ dblink_fetch(PG_FUNCTION_ARGS) (errcode(ERRCODE_INVALID_CURSOR_NAME), errmsg("cursor \"%s\" does not exist", curname))); } + PQclear(res); - materializeResult(fcinfo, res); return (Datum) 0;} @@ -640,6 +679,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible */ bool freeconn = false; + storeInfo storeinfo; /* check to see if caller supports us returning a tuplestore */ if (rsinfo == NULL || !IsA(rsinfo,ReturnSetInfo)) @@ -715,164 +755,213 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) rsinfo->setResult = NULL; rsinfo->setDesc= NULL; + + /* + * Result is stored into storeinfo.tuplestore instead of + * res->result retuned by PQexec/PQgetResult below + */ + initStoreInfo(&storeinfo, fcinfo); + PQregisterStoreHandler(conn, storeHandler, &storeinfo); + /* synchronous query, or async result retrieval */ if (!is_async) res = PQexec(conn, sql); else - { res = PQgetResult(conn); - /* NULL means we're all done with the async results */ - if (!res) - return (Datum) 0; - } - /* if needed, close the connection to the database and cleanup */ - if (freeconn) - PQfinish(conn); + finishStoreInfo(&storeinfo); - if (!res || - (PQresultStatus(res) != PGRES_COMMAND_OK && - PQresultStatus(res) != PGRES_TUPLES_OK)) + /* NULL res from async get means we're all done with the results */ + if (res || !is_async) { - dblink_res_error(conname, res, "could not execute query", fail); - return (Datum) 0; + if (freeconn) + PQfinish(conn); + + if (!res || + (PQresultStatus(res) != 
PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) + { + /* finishStoreInfo saves the fields referred to below. */ + if (storeinfo.nummismatch) + { + /* This is only for backward compatibility */ + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + } + else if (storeinfo.edata) + ReThrowError(storeinfo.edata); + + dblink_res_error(conname, res, "could not execute query", fail); + return (Datum) 0; + } } + PQclear(res); - materializeResult(fcinfo, res); return (Datum) 0;} -/* - * Materialize the PGresult to return them as the function result. - * The res will be released in this function. - */static void -materializeResult(FunctionCallInfo fcinfo, PGresult *res) +initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo){ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; - - Assert(rsinfo->returnMode == SFRM_Materialize); - - PG_TRY(); + TupleDesc tupdesc; + int i; + + switch (get_call_result_type(fcinfo, NULL, &tupdesc)) { - TupleDesc tupdesc; - bool is_sql_cmd = false; - int ntuples; - int nfields; + case TYPEFUNC_COMPOSITE: + /* success */ + break; + case TYPEFUNC_RECORD: + /* failed to determine actual type of RECORD */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + break; + default: + /* result type isn't composite */ + elog(ERROR, "return type must be a row type"); + break; + } + + sinfo->oldcontext = MemoryContextSwitchTo( + rsinfo->econtext->ecxt_per_query_memory); + + /* make sure we have a persistent copy of the tupdesc */ + tupdesc = CreateTupleDescCopy(tupdesc); + + sinfo->error_occurred = FALSE; + sinfo->nummismatch = FALSE; + sinfo->edata = NULL; + sinfo->nattrs = tupdesc->natts; + sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem); + sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc); + sinfo->valbuf = 
(void **)malloc(sinfo->nattrs * sizeof(void *)); + sinfo->valbufsize = (size_t *)malloc(sinfo->nattrs * sizeof(size_t)); + for (i = 0 ; i < sinfo->nattrs ; i++) + { + sinfo->valbuf[i] = NULL; + sinfo->valbufsize[i] = 0; + } - if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - is_sql_cmd = true; - - /* - * need a tuple descriptor representing one TEXT column to return - * the command status string as our result tuple - */ - tupdesc = CreateTemplateTupleDesc(1, false); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", - TEXTOID, -1, 0); - ntuples = 1; - nfields = 1; - } - else - { - Assert(PQresultStatus(res) == PGRES_TUPLES_OK); + /* Preallocate memory of same size with PGresAttDesc array for values. */ + sinfo->attrvalbuf = (char *) malloc(sinfo->nattrs * sizeof(PGresAttValue)); - is_sql_cmd = false; + rsinfo->setResult = sinfo->tuplestore; + rsinfo->setDesc = tupdesc; +} - /* get a tuple descriptor for our result type */ - switch (get_call_result_type(fcinfo, NULL, &tupdesc)) - { - case TYPEFUNC_COMPOSITE: - /* success */ - break; - case TYPEFUNC_RECORD: - /* failed to determine actual type of RECORD */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("function returning record called in context " - "that cannot accept type record"))); - break; - default: - /* result type isn't composite */ - elog(ERROR, "return type must be a row type"); - break; - } +static void +finishStoreInfo(storeInfo *sinfo) +{ + int i; - /* make sure we have a persistent copy of the tupdesc */ - tupdesc = CreateTupleDescCopy(tupdesc); - ntuples = PQntuples(res); - nfields = PQnfields(res); + for (i = 0 ; i < sinfo->nattrs ; i++) + { + if (sinfo->valbuf[i]) + { + free(sinfo->valbuf[i]); + sinfo->valbuf[i] = NULL; } + } + if (sinfo->attrvalbuf) + free(sinfo->attrvalbuf); + sinfo->attrvalbuf = NULL; + MemoryContextSwitchTo(sinfo->oldcontext); +} - /* - * check result and tuple descriptor have the same number of columns - */ - if (nfields != tupdesc->natts) - 
ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("remote query result rowtype does not match " - "the specified FROM clause rowtype"))); +static void * +storeHandler(PGresult *res, PQStoreFunc func, int id, size_t size) +{ + storeInfo *sinfo = (storeInfo *)PQgetStoreHandlerParam(res); + HeapTuple tuple; + int fields = PQnfields(res); + int i; + PGresAttValue *attval; + char **cstrs; - if (ntuples > 0) - { - AttInMetadata *attinmeta; - Tuplestorestate *tupstore; - MemoryContext oldcontext; - int row; - char **values; - - attinmeta = TupleDescGetAttInMetadata(tupdesc); - - oldcontext = MemoryContextSwitchTo( - rsinfo->econtext->ecxt_per_query_memory); - tupstore = tuplestore_begin_heap(true, false, work_mem); - rsinfo->setResult = tupstore; - rsinfo->setDesc = tupdesc; - MemoryContextSwitchTo(oldcontext); + if (sinfo->error_occurred) + return NULL; + + switch (func) + { + case PQSF_ALLOC_TEXT: + case PQSF_ALLOC_BINARY: + if (id == -1) + return sinfo->attrvalbuf; - values = (char **) palloc(nfields * sizeof(char *)); + if (id < 0 || id >= sinfo->nattrs) + return NULL; - /* put all tuples into the tuplestore */ - for (row = 0; row < ntuples; row++) + if (sinfo->valbufsize[id] < size) { - HeapTuple tuple; + if (sinfo->valbuf[id] == NULL) + sinfo->valbuf[id] = malloc(size); + else + sinfo->valbuf[id] = realloc(sinfo->valbuf[id], size); + sinfo->valbufsize[id] = size; + } + return sinfo->valbuf[id]; - if (!is_sql_cmd) - { - int i; + case PQSF_ADD_TUPLE: + break; /* Go through */ + default: + /* Ignore */ + break; + } - for (i = 0; i < nfields; i++) - { - if (PQgetisnull(res, row, i)) - values[i] = NULL; - else - values[i] = PQgetvalue(res, row, i); - } - } - else - { - values[0] = PQcmdStatus(res); - } + if (sinfo->nattrs != fields) + { + sinfo->error_occurred = TRUE; + sinfo->nummismatch = TRUE; + finishStoreInfo(sinfo); - /* build the tuple and put it into the tuplestore. 
*/ - tuple = BuildTupleFromCStrings(attinmeta, values); - tuplestore_puttuple(tupstore, tuple); - } + /* This error will be processed in + * dblink_record_internal(). So do not set error message + * here. */ + return NULL; + } - /* clean up and return the tuplestore */ - tuplestore_donestoring(tupstore); - } + /* + * Rewrite PGresAttValue[] to char(*)[] in-place. + */ + Assert(sizeof(char*) <= sizeof(PGresAttValue)); - PQclear(res); + attval = (PGresAttValue *)sinfo->attrvalbuf; + cstrs = (char **)sinfo->attrvalbuf; + for(i = 0 ; i < fields ; i++) + { + if (attval->len < 0) + cstrs[i] = NULL; + else + cstrs[i] = attval->value; + } + + PG_TRY(); + { + tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs); + tuplestore_puttuple(sinfo->tuplestore, tuple); } PG_CATCH(); { - /* be sure to release the libpq result */ - PQclear(res); - PG_RE_THROW(); + MemoryContext context; + /* + * Store exception for later ReThrow and cancel the exception. + */ + sinfo->error_occurred = TRUE; + context = MemoryContextSwitchTo(sinfo->oldcontext); + sinfo->edata = CopyErrorData(); + MemoryContextSwitchTo(context); + FlushErrorState(); + + return NULL; } PG_END_TRY(); + + return sinfo->attrvalbuf;}/* diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 72c9384..8803999 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -7233,6 +7233,293 @@ int PQisthreadsafe(); </sect1> + <sect1 id="libpq-alterstorage"> + <title>Alternative result storage</title> + + <indexterm zone="libpq-alterstorage"> + <primary>PGresult</primary> + <secondary>PGconn</secondary> + </indexterm> + + <para> + As the standard usage, users can get the result of command + execution from <structname>PGresult</structname> aquired + with <function>PGgetResult</function> + from <structname>PGConn</structname>. 
While the memory areas for + the PGresult are allocated with malloc() internally within calls of + command execution functions such as <function>PQexec</function> + and <function>PQgetResult</function>. If you have difficulties to + handle the result records in the form of PGresult, you can instruct + PGconn to store them into your own storage instead of PGresult. + </para> + + <variablelist> + <varlistentry id="libpq-registerstorehandler"> + <term> + <function>PQregisterStoreHandler</function> + <indexterm> + <primary>PQregisterStoreHandler</primary> + </indexterm> + </term> + + <listitem> + <para> + Sets a callback function to allocate memory for each tuple and + column values, and add the complete tuple into the alternative + result storage. +<synopsis> +void PQregisterStoreHandler(PGconn *conn, + StoreHandler func, + void *param); +</synopsis> + </para> + + <para> + <variablelist> + <varlistentry> + <term><parameter>conn</parameter></term> + <listitem> + <para> + The connection object to set the storage handler + function. PGresult created from this connection calls this + function to store the result instead of storing into its + internal storage. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>func</parameter></term> + <listitem> + <para> + Storage handler function to set. NULL means to use the + default storage. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>param</parameter></term> + <listitem> + <para> + A pointer to contextual parameter passed + to <parameter>func</parameter>. You can get this poiner + in <type>StoreHandler</type> + by <function>PQgetStoreHandlerParam</function>. 
+ </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-storehandler"> + <term> + <type>StoreHandler</type> + <indexterm> + <primary>StoreHandler</primary> + </indexterm> + </term> + + <listitem> + <para> + The type for the storage handler callback function. +<synopsis> +typedef enum +{ + PQSF_ALLOC_TEXT, + PQSF_ALLOC_BINARY, + PQSF_ADD_TUPLE +} PQStoreFunc; + +void *(*StoreHandler)(PGresult *res, + PQStoreFunc func, + int id, + size_t size); +</synopsis> + </para> + + <para> + Generally this function must return NULL for failure and should + set the error message + with <function>PQsetStoreHandlerErrMes</function> if the cause + is other than out of memory. This function must not throw any + exception. This function is called in the following sequence. + + <itemizedlist spacing="compact"> + <listitem> + <simpara>Call with <parameter>func</parameter> + = <firstterm>PQSF_ALLOC_BINARY</firstterm> + and <parameter>id</parameter> = -1 to request the memory + for a tuple to be used as an array + of <type>PGresAttValue</type>. </simpara> + </listitem> + <listitem> + <simpara>Call with <parameter>func</parameter> + = <firstterm>PQSF_ALLOC_TEXT</firstterm> + or <firstterm>PQSF_ALLOC_BINARY</firstterm> + and <parameter>id</parameter> from zero to the number of columns + - 1 to request the memory for each column value in the current + tuple.</simpara> + </listitem> + <listitem> + <simpara>Call with <parameter>func</parameter> + = <firstterm>PQSF_ADD_TUPLE</firstterm> to request the + constructed tuple to be stored.</simpara> + </listitem> + </itemizedlist> + </para> + <para> + Calling <type>StoreHandler</type> + with <parameter>func</parameter> = + <firstterm>PQSF_ALLOC_TEXT</firstterm> asks it to return a + memory block with at least <parameter>size</parameter> bytes + which need not be aligned to the word boundary. 
+ <parameter>id</parameter> is a zero or positive number that + identifies the usage of the requested memory block, that is, the + position of the column for which the memory block is used. + </para> + <para> + When <parameter>func</parameter> + = <firstterm>PQSF_ALLOC_BINARY</firstterm>, this function is + asked to return a memory block with at + least <parameter>size</parameter> bytes which is aligned to the + word boundary. + <parameter>id</parameter> is the identifier that distinguishes the + usage of the requested memory block. -1 means that it is used as an + array of <type>PGresAttValue</type> to store the tuple. Zero or + positive numbers have the same meanings as for + <firstterm>PQSF_ALLOC_TEXT</firstterm>. + </para> + <para>When <parameter>func</parameter> + = <firstterm>PQSF_ADD_TUPLE</firstterm>, this function is + asked to store the <type>PGresAttValue</type> structure + constructed by the caller into your storage. The pointer to the + tuple structure is not passed, so you should remember the + pointer to the memory block passed back to the caller on + <parameter>func</parameter> + = <parameter>PQSF_ALLOC_BINARY</parameter> + with <parameter>id</parameter> = -1. This function must return + any non-NULL value for success. You must properly put back the + memory blocks passed to the caller in this function if needed. + </para> + <variablelist> + <varlistentry> + <term><parameter>res</parameter></term> + <listitem> + <para> + A pointer to the <type>PGresult</type> object. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>func</parameter></term> + <listitem> + <para> + An <type>enum</type> value telling the function to perform. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>param</parameter></term> + <listitem> + <para> + A pointer to contextual parameter passed to func. 
+ </para> + </listitem> + </varlistentry> + </variablelist> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-pqgetstorehandlerparam"> + <term> + <function>PQgetStoreHandlerParam</function> + <indexterm> + <primary>PQgetStoreHandlerParam</primary> + </indexterm> + </term> + <listitem> + <para> + Get the pointer passed to <function>PQregisterStoreHandler</function> + as <parameter>param</parameter>. +<synopsis> +void *PQgetStoreHandlerParam(PGresult *res) +</synopsis> + </para> + <para> + <variablelist> + <varlistentry> + <term><parameter>res</parameter></term> + <listitem> + <para> + A pointer to the <type>PGresult</type> object. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-pqsetstorehandlererrmes"> + <term> + <function>PQsetStoreHandlerErrMes</function> + <indexterm> + <primary>PQsetStoreHandlerErrMes</primary> + </indexterm> + </term> + <listitem> + <para> + Set the message for the error occurred + in <type>StoreHandler</type>. If this message is not set, the + caller assumes the error to be out of memory. +<synopsis> +void PQsetStoreHandlerErrMes(PGresult *res, char *mes) +</synopsis> + </para> + <para> + <variablelist> + <varlistentry> + <term><parameter>res</parameter></term> + <listitem> + <para> + A pointer to the <type>PGresult</type> object + passed to <type>StoreHandler</type>. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>mes</parameter></term> + <listitem> + <para> + A pointer to the memory block containing the error + message, which is allocated + by <function>malloc()</function>. The memory block + will be freed with <function>free()</function> in the + caller of + <type>StoreHandler</type> only if it returns NULL. + </para> + <para> + If <parameter>res</parameter> already has a message previously + set, it is freed and then the given message is set. 
Set NULL + to cancel the custom message. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + </sect1> + + <sect1 id="libpq-build"> <title>Building <application>libpq</application> Programs</title>
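The three-step call sequence described in the StoreHandler documentation above can be sketched roughly as follows. This is only an illustration, not code from the patch: `AttValue`, `StoreFunc`, `StoreState` and `store_handler` are hypothetical local stand-ins for `PGresAttValue`, `PQStoreFunc` and a handler. A real handler would take a `PGresult *` and presumably reach its state through `PQgetStoreHandlerParam()`; here the state is passed directly so that the sketch compiles without libpq.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins for the patch's types (NOT libpq's own headers). */
typedef struct { int len; char *value; } AttValue;   /* like PGresAttValue */
typedef enum { SF_ALLOC_TEXT, SF_ALLOC_BINARY, SF_ADD_TUPLE } StoreFunc;

/* Hypothetical state a handler might keep between calls. */
typedef struct
{
    AttValue *cur;      /* tuple block handed out for id == -1 */
    int       nfields;  /* number of columns per tuple */
    int       stored;   /* tuples accepted so far */
    size_t    bytes;    /* total bytes of non-NULL column data seen */
} StoreState;

/* Returns NULL on failure, non-NULL on success, as the documentation asks. */
static void *store_handler(StoreState *st, StoreFunc func, int id, size_t size)
{
    int i;

    assert(st != NULL);
    switch (func)
    {
        case SF_ALLOC_BINARY:
            if (id == -1)
            {
                /* Step 1: block for the tuple itself; remember it for step 3. */
                st->cur = calloc(1, size);
                return st->cur;
            }
            return malloc(size);    /* Step 2: binary column value */

        case SF_ALLOC_TEXT:
            return malloc(size);    /* Step 2: text column value */

        case SF_ADD_TUPLE:
            /* Step 3: consume the tuple built by the caller, then release it. */
            if (st->cur == NULL)
                return NULL;
            for (i = 0; i < st->nfields; i++)
            {
                if (st->cur[i].len >= 0)
                {
                    st->bytes += (size_t) st->cur[i].len;
                    free(st->cur[i].value);
                }
            }
            free(st->cur);
            st->cur = NULL;
            st->stored++;
            return st;
    }
    return NULL;
}
```

The caller (libpq in the patch) would request the tuple block with id = -1, request one block per column, fill them in, and finally call with the add-tuple function code. The sketch frees every block once the tuple has been consumed; a handler that keeps the data would hand the blocks over to its own storage instead.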
On Tue, Jan 17, 2012 at 05:53:33PM +0900, Kyotaro HORIGUCHI wrote:
> Hello, This is revised and rebased version of the patch.
>
> a. Old term `Add Tuple Function' is changed to 'Store
>    Handler'. The reason why not `storage' is simply length of the
>    symbols.
>
> b. I couldn't find the place to settle PGgetAsCString() in. It is
>    removed and storeHandler()@dblink.c touches PGresAttValue
>    directly in this new patch. Definition of PGresAttValue stays
>    in libpq-fe.h and provided with comment.
>
> c. Refine error handling of dblink.c. I think it preserves the
>    previous behavior for column number mismatch and type
>    conversion exception.
>
> d. Document is revised.

First, my priority is on-the-fly result processing, not allocation optimization. And this patch seems to make it possible: I can process results row-by-row, without the need to buffer all of them in PGresult. Which is great!

But the current API seems clumsy, I guess it's because the patch grew from trying to replace the low-level allocator.

I would like to propose a better one-shot API with:

    void *(*RowStoreHandler)(PGresult *res, PGresAttValue *columns);

where the PGresAttValue * is allocated once, inside PGresult. And the pointers inside point directly to the network buffer.

Of course this requires replacing the current per-column malloc+copy pattern with a per-row parse+handle pattern, but I think the resulting API will be better:

1) Pass-through processing does not need to care about unnecessary per-row allocations.

2) Handlers that want a copy of the row (like regular libpq) can optimize allocations by having a "global" view of the row. (E.g. one allocation for row header + data.)

This also optimizes call patterns: first libpq parses the packet, then the row handler processes the row, no unnecessary back-and-forth.

Summary: the current API has various assumptions about how the row is processed, let's remove those.

-- 
marko
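Point 2 above (one allocation for row header + data) could look roughly like the sketch below. It is a minimal illustration under stated assumptions, not code from the thread: `AttValue` is a hypothetical local stand-in for `PGresAttValue`, `copy_row` is an invented helper name, a negative `len` is taken to mean NULL, and column values are assumed not to be NUL-terminated because they would point into the receive buffer.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for PGresAttValue (NOT libpq's own definition). */
typedef struct { int len; char *value; } AttValue;

/*
 * Copy one row into a single malloc'ed block laid out as
 *   [AttValue x nfields][value 0 '\0'][value 1 '\0']...
 * Returns NULL if out of memory. The caller releases the whole row
 * with one free().
 */
static AttValue *copy_row(const AttValue *columns, int nfields)
{
    size_t    total = (size_t) nfields * sizeof(AttValue);
    AttValue *row;
    char     *data;
    int       i;

    assert(columns != NULL && nfields > 0);

    /* First pass: the "global" view of the row gives the total size. */
    for (i = 0; i < nfields; i++)
        if (columns[i].len >= 0)
            total += (size_t) columns[i].len + 1;

    row = malloc(total);
    if (row == NULL)
        return NULL;

    /* Second pass: copy the values right behind the header array. */
    data = (char *) (row + nfields);
    for (i = 0; i < nfields; i++)
    {
        row[i].len = columns[i].len;
        if (columns[i].len < 0)
        {
            row[i].value = NULL;    /* NULL column: nothing to copy */
            continue;
        }
        if (columns[i].len > 0)
            memcpy(data, columns[i].value, (size_t) columns[i].len);
        data[columns[i].len] = '\0';    /* terminate the private copy */
        row[i].value = data;
        data += columns[i].len + 1;
    }
    return row;
}
```

A pass-through handler would skip the copy entirely and read the values in place; only handlers that need the row to outlive the callback pay for the single allocation.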
> > c. Refine error handling of dblink.c. I think it preserves the
> >    previous behavior for column number mismatch and type
> >    conversion exception.

Hello,

I don't know whether this covers the following issue. I mention it in case you haven't noticed it and would like to handle this rather cosmetic issue as well.

http://archives.postgresql.org/pgsql-bugs/2011-08/msg00113.php

best regards,
Marc Mamin
On Sat, Jan 21, 2012 at 1:52 PM, Marc Mamin <M.Mamin@intershop.de> wrote:
>> > c. Refine error handling of dblink.c. I think it preserves the
>> >    previous behavior for column number mismatch and type
>> >    conversion exception.
>
> Hello,
>
> I don't know if this cover following issue.
> I just mention it for the case you didn't notice it and would like to
> handle this rather cosmetic issue as well.
>
> http://archives.postgresql.org/pgsql-bugs/2011-08/msg00113.php

It is not relevant to this thread, but it does seem like a good idea to implement. It should be a simple matter of creating a handler that uses dblink_res_error() to report the notice. Perhaps you could create and submit the patch yourself?

For reference, here is the full flow in PL/Proxy:

1) PQsetNoticeReceiver: https://github.com/markokr/plproxy-dev/blob/master/src/execute.c#L422
2) handle_notice: https://github.com/markokr/plproxy-dev/blob/master/src/execute.c#L370
3) plproxy_remote_error: https://github.com/markokr/plproxy-dev/blob/master/src/main.c#L82

-- 
marko
Thank you for the comment,

> First, my priority is on-the-fly result processing,
> not allocation optimization. And this patch seems to make
> it possible: I can process results row-by-row, without the
> need to buffer all of them in PGresult. Which is great!
>
> But the current API seems clumsy, I guess it's because the
> patch grew from trying to replace the low-level allocator.

Exactly.

> I would like to propose a better one-shot API with:
>
> void *(*RowStoreHandler)(PGresult *res, PGresAttValue *columns);
>
> where the PGresAttValue * is allocated once, inside PGresult.
> And the pointers inside point directly to the network buffer.

Good catch, thank you. The patch carries over too much from the old implementation. As you say, there is no need to copy the data inside getAnotherTuple to do this.

> Of course this requires replacing the current per-column malloc+copy
> pattern with a per-row parse+handle pattern, but I think the resulting
> API will be better:
>
> 1) Pass-through processing does not need to care about unnecessary
>    per-row allocations.
>
> 2) Handlers that want a copy of the row (like regular libpq)
>    can optimize allocations by having a "global" view of the row.
>    (E.g. one allocation for row header + data.)
>
> This also optimizes call patterns: first libpq parses the packet,
> then the row handler processes the row, no unnecessary back-and-forth.
>
> Summary: the current API has various assumptions about how the row is
> processed, let's remove those.

Thank you. I will rewrite the patch to realize this.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
Hello,

This is a new version of the patch formerly known as 'alternative storage for libpq'.

- Changed the concept from 'Storage handler' to 'Alternative Row Processor'. Symbol names are changed accordingly.

- The callback function is modified according to the comment.

- Because of time constraints, I did only minimal checking of this patch. Its purpose is to show the new implementation.

- Performance has not been measured for this patch, for the same reason. I will do that next Monday.

- The meaning of PGresAttValue is changed. The field 'value' now contains a value withOUT a terminating zero. From what I've seen, this change seems to have no effect on any other portion of the PostgreSQL source tree.

> > I would like to propose a better one-shot API with:
> >
> > void *(*RowStoreHandler)(PGresult *res, PGresAttValue *columns);
...
> > 1) Pass-through processing does not need to care about unnecessary
> >    per-row allocations.
> >
> > 2) Handlers that want a copy of the row (like regular libpq)
> >    can optimize allocations by having a "global" view of the row.
> >    (E.g. one allocation for row header + data.)

I expect the new implementation to be far better than the original.
regargs, -- Kyotaro Horiguchi NTT Open Source Software Center diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt index 1af8df6..c47af3a 100644 --- a/src/interfaces/libpq/exports.txt +++ b/src/interfaces/libpq/exports.txt @@ -160,3 +160,6 @@ PQconnectStartParams 157PQping 158PQpingParams 159PQlibVersion 160 +PQregisterRowProcessor 161 +PQgetRowProcessorParam 163 +PQsetRowProcessorErrMes 164 diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index d454538..93803d5 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -2692,6 +2692,7 @@ makeEmptyPGconn(void) conn->allow_ssl_try = true; conn->wait_ssl_try = false;#endif + conn->rowProcessor = NULL; /* * We try to send at least 8K at a time, which is the usual size of pipe @@ -5076,3 +5077,10 @@ PQregisterThreadLock(pgthreadlock_t newhandler) return prev;} + +void +PQregisterRowProcessor(PGconn *conn, RowProcessor func, void *param) +{ + conn->rowProcessor = func; + conn->rowProcessorParam = param; +} diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index b743566..5d78b39 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -66,7 +66,7 @@ static PGresult *PQexecFinish(PGconn *conn);static int PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target);static int check_field_number(const PGresult *res, int field_num); - +static void *pqAddTuple(PGresult *res, PGresAttValue *columns);/* ---------------- * Space management for PGresult. 
@@ -160,6 +160,9 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) result->curBlock = NULL; result->curOffset= 0; result->spaceLeft = 0; + result->rowProcessor = pqAddTuple; + result->rowProcessorParam = NULL; + result->rowProcessorErrMes = NULL; if (conn) { @@ -194,6 +197,12 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) } result->nEvents = conn->nEvents; } + + if (conn->rowProcessor) + { + result->rowProcessor = conn->rowProcessor; + result->rowProcessorParam = conn->rowProcessorParam; + } } else { @@ -445,7 +454,7 @@ PQsetvalue(PGresult *res, int tup_num, int field_num, char *value, int len) } /* add itto the array */ - if (!pqAddTuple(res, tup)) + if (pqAddTuple(res, tup) == NULL) return FALSE; } @@ -701,7 +710,6 @@ pqClearAsyncResult(PGconn *conn) if (conn->result) PQclear(conn->result); conn->result =NULL; - conn->curTuple = NULL;}/* @@ -756,7 +764,6 @@ pqPrepareAsyncResult(PGconn *conn) */ res = conn->result; conn->result = NULL; /* handingover ownership to caller */ - conn->curTuple = NULL; /* just in case */ if (!res) res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR); else @@ -829,12 +836,17 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)/* * pqAddTuple - * add a row pointer to the PGresult structure, growing it if necessary - * Returns TRUE if OK, FALSE if not enough memory to add the row + * add a row to the PGresult structure, growing it if necessary + * Returns the pointer to the new tuple if OK, NULL if not enough + * memory to add the row. 
*/ -int -pqAddTuple(PGresult *res, PGresAttValue *tup) +void * +pqAddTuple(PGresult *res, PGresAttValue *columns){ + PGresAttValue *tup; + int nfields = res->numAttributes; + int i; + if (res->ntups >= res->tupArrSize) { /* @@ -858,13 +870,39 @@ pqAddTuple(PGresult *res, PGresAttValue *tup) newTuples = (PGresAttValue **) realloc(res->tuples, newSize * sizeof(PGresAttValue *)); if (!newTuples) - return FALSE; /* malloc or realloc failed */ + return NULL; /* malloc or realloc failed */ res->tupArrSize = newSize; res->tuples = newTuples; } + + tup = (PGresAttValue *) + pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE); + if (tup == NULL) return NULL; + memcpy(tup, columns, nfields * sizeof(PGresAttValue)); + + for (i = 0 ; i < nfields ; i++) + { + tup[i].len = columns[i].len; + if (tup[i].len == NULL_LEN) + { + tup[i].value = res->null_field; + } + else + { + bool isbinary = (res->attDescs[i].format != 0); + tup[i].value = + (char *)pqResultAlloc(res, tup[i].len + 1, isbinary); + if (tup[i].value == NULL) + return NULL; + memcpy(tup[i].value, columns[i].value, tup[i].len); + /* We have to terminate this ourselves */ + tup[i].value[tup[i].len] = '\0'; + } + } + res->tuples[res->ntups] = tup; res->ntups++; - return TRUE; + return tup;}/* @@ -1223,7 +1261,6 @@ PQsendQueryStart(PGconn *conn) /* initialize async result-accumulation state */ conn->result= NULL; - conn->curTuple = NULL; /* ready to send command message */ return true; @@ -2822,6 +2859,35 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num) return 0;} +/* PQgetAddRowProcessorParam + * Get the pointer to the contextual parameter from PGresult which is + * registered to PGconn by PQregisterRowProcessor + */ +void * +PQgetRowProcessorParam(const PGresult *res) +{ + if (!res) + return NULL; + return res->rowProcessorParam; +} + +/* PQsetRowProcessorErrMes + * Set the error message pass back to the caller of RowProcessor. 
+ * + * mes must be a malloc'ed memory block and it will be released by + * the caller of RowProcessor. You can replace the previous message + * by alternative mes, or clear it with NULL. The previous one will + * be freed internally. + */ +void +PQsetRowProcessorErrMes(PGresult *res, char *mes) +{ + /* Free existing message */ + if (res->rowProcessorErrMes) + free(res->rowProcessorErrMes); + res->rowProcessorErrMes = mes; +} +/* PQnparams: * returns the number of input parameters of a prepared statement. */ diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c index ce0eac3..546534a 100644 --- a/src/interfaces/libpq/fe-misc.c +++ b/src/interfaces/libpq/fe-misc.c @@ -219,6 +219,25 @@ pqGetnchar(char *s, size_t len, PGconn *conn)}/* + * pqGetnchar: + * skip len bytes in input buffer. + */ +int +pqSkipnchar(size_t len, PGconn *conn) +{ + if (len > (size_t) (conn->inEnd - conn->inCursor)) + return EOF; + + conn->inCursor += len; + + if (conn->Pfdebug) + fprintf(conn->Pfdebug, "From backend (%lu skipped)\n", + (unsigned long) len); + + return 0; +} + +/* * pqPutnchar: * write exactly len bytes to the current message */ @@ -238,6 +257,7 @@ pqPutnchar(const char *s, size_t len, PGconn *conn) return 0;} +/* * pqGetInt * read a 2 or 4 byte integer and convert from network byte order diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c index a7c3899..9abbb29 100644 --- a/src/interfaces/libpq/fe-protocol2.c +++ b/src/interfaces/libpq/fe-protocol2.c @@ -715,7 +715,7 @@ getAnotherTuple(PGconn *conn, bool binary){ PGresult *result = conn->result; int nfields= result->numAttributes; - PGresAttValue *tup; + PGresAttValue tup[result->numAttributes]; /* the backend sends us a bitmap of which attributes are null */ char std_bitmap[64]; /* used unless it doesn't fit */ @@ -729,26 +729,11 @@ getAnotherTuple(PGconn *conn, bool binary) result->binary = binary; - /* Allocate tuple space if first time for this data message */ - if 
(conn->curTuple == NULL) + if (binary) { - conn->curTuple = (PGresAttValue *) - pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE); - if (conn->curTuple == NULL) - goto outOfMemory; - MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue)); - - /* - * If it's binary, fix the column format indicators. We assume the - * backend will consistently send either B or D, not a mix. - */ - if (binary) - { - for (i = 0; i < nfields; i++) - result->attDescs[i].format = 1; - } + for (i = 0; i < nfields; i++) + result->attDescs[i].format = 1; } - tup = conn->curTuple; /* Get the null-value bitmap */ nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE; @@ -757,7 +742,7 @@ getAnotherTuple(PGconn *conn, bool binary) { bitmap = (char *) malloc(nbytes); if (!bitmap) - goto outOfMemory; + goto rowProcessError; } if (pqGetnchar(bitmap, nbytes, conn)) @@ -785,19 +770,17 @@ getAnotherTuple(PGconn *conn, bool binary) vlen = vlen - 4; if (vlen < 0) vlen = 0; - if (tup[i].value == NULL) - { - tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary); - if (tup[i].value == NULL) - goto outOfMemory; - } + + /* + * Buffer content may be shifted on reloading data. So we must + * set the pointer to the value on every scan. + */ + tup[i].value = conn->inBuffer + conn->inCursor; tup[i].len = vlen; - /* read in the value */ + /* Skip the value */ if (vlen > 0) - if (pqGetnchar((char *) (tup[i].value), vlen, conn)) + if (pqSkipnchar(vlen, conn)) goto EOFexit; - /* we have to terminate this ourselves */ - tup[i].value[vlen] = '\0'; } /* advance the bitmap stuff */ bitcnt++; @@ -812,16 +795,15 @@ getAnotherTuple(PGconn *conn, bool binary) } /* Success! 
Store the completed tuple in the result*/ - if (!pqAddTuple(result, tup)) - goto outOfMemory; - /* and reset for a new message */ - conn->curTuple = NULL; + if (!result->rowProcessor(result, tup)) + goto rowProcessError; if (bitmap != std_bitmap) free(bitmap); return 0; -outOfMemory: +rowProcessError: + /* Replace partially constructed result with an error result */ /* @@ -829,8 +811,21 @@ outOfMemory: * there's not enough memory to concatenate messages... */ pqClearAsyncResult(conn); - printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("out of memory for query result\n")); + resetPQExpBuffer(&conn->errorMessage); + + /* + * If error message is passed from addTupleFunc, set it into + * PGconn, assume out of memory if not. + */ + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext(result->rowProcessorErrMes ? + result->rowProcessorErrMes : + "out of memory for query result\n")); + if (result->rowProcessorErrMes) + { + free(result->rowProcessorErrMes); + result->rowProcessorErrMes = NULL; + } /* * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 892dcbc..18342c7 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -625,22 +625,12 @@ getAnotherTuple(PGconn *conn, int msgLength){ PGresult *result = conn->result; int nfields = result->numAttributes; - PGresAttValue *tup; + PGresAttValue tup[result->numAttributes]; int tupnfields; /* # fields from tuple */ int vlen; /* length of the current field value */ int i; /* Allocate tuple space if firsttime for this data message */ - if (conn->curTuple == NULL) - { - conn->curTuple = (PGresAttValue *) - pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE); - if (conn->curTuple == NULL) - goto outOfMemory; - MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue)); - } - tup = conn->curTuple; - /* Get the field count and make sure it's what we expect */ if 
(pqGetInt(&tupnfields, 2, conn)) return EOF; @@ -671,40 +661,46 @@ getAnotherTuple(PGconn *conn, int msgLength) } if (vlen < 0) vlen = 0; - if (tup[i].value == NULL) - { - bool isbinary = (result->attDescs[i].format != 0); - tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary); - if (tup[i].value == NULL) - goto outOfMemory; - } - tup[i].len = vlen; - /* read in the value */ + /* + * Buffer content may be shifted on reloading data. So we must + * set the pointer to the value every scan. + */ + tup[i].value = conn->inBuffer + conn->inCursor; + tup[i].len = vlen; if (vlen > 0) - if (pqGetnchar((char *) (tup[i].value), vlen, conn)) + if (pqSkipnchar(vlen, conn)) return EOF; - /* we have to terminate this ourselves */ - tup[i].value[vlen] = '\0'; } /* Success! Store the completed tuple in the result */ - if (!pqAddTuple(result, tup)) - goto outOfMemory; - /* and reset for a new message */ - conn->curTuple = NULL; - + if (!result->rowProcessor(result, tup)) + goto rowProcessError; + return 0; -outOfMemory: +rowProcessError: /* * Replace partially constructed result with an error result. First * discard the old resultto try to win back some memory. */ pqClearAsyncResult(conn); - printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("out of memory for query result\n")); + resetPQExpBuffer(&conn->errorMessage); + + /* + * If error message is passed from addTupleFunc, set it into + * PGconn, assume out of memory if not. + */ + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext(result->rowProcessorErrMes ? 
+ result->rowProcessorErrMes : + "out of memory for query result\n")); + if (result->rowProcessorErrMes) + { + free(result->rowProcessorErrMes); + result->rowProcessorErrMes = NULL; + } pqSaveErrorResult(conn); /* Discard the failed message by pretending we read it */ diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index ef26ab9..0931211 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -149,6 +149,15 @@ typedef struct pgNotify struct pgNotify *next; /* list link */} PGnotify; +/* PGresAttValue represents a value of one tuple field in string form. + NULL is represented as len < 0. Otherwise value points to a string + without null termination of the length of len. */ +typedef struct pgresAttValue +{ + int len; /* length in bytes of the value */ + char *value; /* actual value, without null termination */ +} PGresAttValue; +/* Function types for notice-handling callbacks */typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);typedefvoid (*PQnoticeProcessor) (void *arg, const char *message); @@ -416,6 +425,31 @@ extern PGPing PQping(const char *conninfo);extern PGPing PQpingParams(const char *const * keywords, const char *const * values, int expand_dbname); +/* + * Typedef for alternative row processor. + * + * This function must return non-NULL value for success and must + * return NULL for failure and may set error message by + * PQsetRowProcessorErrMes. It is assumed by caller as out of memory + * when the error message is NULL on failure. This function is assumed + * not to throw any exception. + */ +typedef void *(*RowProcessor)(PGresult *res, PGresAttValue *columns); + +/* + * Register alternative result store function to PGconn. + * + * By registering this function, pg_result disables its own result + * store and calls it for rows one by one. + * + * func is row processor function. See the typedef RowProcessor. 
+ * + * rowProcessorParam is the contextual variable that can be get with + * PQgetRowProcessorParam in RowProcessor. + */ +extern void PQregisterRowProcessor(PGconn *conn, RowProcessor func, + void *rowProcessorParam); +/* Force the write buffer to be written (or at least try) */extern int PQflush(PGconn *conn); @@ -454,6 +488,8 @@ extern char *PQcmdTuples(PGresult *res);extern char *PQgetvalue(const PGresult *res, int tup_num, intfield_num);extern int PQgetlength(const PGresult *res, int tup_num, int field_num);extern int PQgetisnull(constPGresult *res, int tup_num, int field_num); +extern void *PQgetRowProcessorParam(const PGresult *res); +extern void PQsetRowProcessorErrMes(PGresult *res, char *mes);extern int PQnparams(const PGresult *res);extern Oid PQparamtype(const PGresult *res, int param_num); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index d967d60..51ac927 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -134,12 +134,6 @@ typedef struct pgresParamDesc#define NULL_LEN (-1) /* pg_result len for NULL value */ -typedef struct pgresAttValue -{ - int len; /* length in bytes of the value */ - char *value; /* actual value, plus terminating zero byte */ -} PGresAttValue; -/* Typedef for message-field list entries */typedef struct pgMessageField{ @@ -209,6 +203,11 @@ struct pg_result PGresult_data *curBlock; /* most recently allocated block */ int curOffset; /* start offset of free space in block */ int spaceLeft; /* number of free bytesremaining in block */ + + RowProcessor rowProcessor; /* Result row processor handler. See + * RowProcessor for details. 
*/ + void *rowProcessorParam; /* Contextual parameter for rowProcessor */ + char *rowProcessorErrMes; /* Error message from rowProcessor */};/* PGAsyncStatusType defines the state of the query-executionstate machine */ @@ -398,7 +397,6 @@ struct pg_conn /* Status for asynchronous result construction */ PGresult *result; /* result being constructed */ - PGresAttValue *curTuple; /* tuple currently being read */#ifdef USE_SSL bool allow_ssl_try; /* Allowedto try SSL negotiation */ @@ -443,6 +441,13 @@ struct pg_conn /* Buffer for receiving various parts of messages */ PQExpBufferData workBuffer;/* expansible string */ + + /* Tuple store handler. The two fields below is copied to newly + * created PGresult if rowProcessor is not NULL. Use default + * function if NULL. */ + RowProcessor rowProcessor; /* Result row processor. See + * RowProcessor for details. */ + void *rowProcessorParam; /* Contextual parameter for rowProcessor */};/* PGcancel stores all data necessary to cancela connection. A copy of this @@ -507,7 +512,6 @@ extern voidpqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)/* This lets gcc check theformat string for consistency. 
*/__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3))); -extern int pqAddTuple(PGresult *res, PGresAttValue *tup);extern void pqSaveMessageField(PGresult *res, char code, const char *value);extern void pqSaveParameterStatus(PGconn *conn, const char *name, @@ -560,6 +564,7 @@ extern int pqGets(PQExpBuffer buf, PGconn *conn);extern int pqGets_append(PQExpBuffer buf, PGconn*conn);extern int pqPuts(const char *s, PGconn *conn);extern int pqGetnchar(char *s, size_t len, PGconn *conn); +extern int pqSkipnchar(size_t len, PGconn *conn);extern int pqPutnchar(const char *s, size_t len, PGconn *conn);externint pqGetInt(int *result, size_t bytes, PGconn *conn);extern int pqPutInt(int value, size_t bytes, PGconn*conn); diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 72c9384..9ad3bfd 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -7233,6 +7233,215 @@ int PQisthreadsafe(); </sect1> + <sect1 id="libpq-alterrowprocessor"> + <title>Alternative row processor</title> + + <indexterm zone="libpq-alterrowprocessor"> + <primary>PGresult</primary> + <secondary>PGconn</secondary> + </indexterm> + + <para> + As the standard usage, users can get the result of command + execution from <structname>PGresult</structname> aquired + with <function>PGgetResult</function> + from <structname>PGConn</structname>. While the memory areas for + the PGresult are allocated with malloc() internally within calls of + command execution functions such as <function>PQexec</function> + and <function>PQgetResult</function>. If you have difficulties to + handle the result records in the form of PGresult, you can instruct + PGconn to pass every row to your own row processor instead of + storing into PGresult. 
+ </para> + + <variablelist> + <varlistentry id="libpq-registerrowprocessor"> + <term> + <function>PQregisterRowProcessor</function> + <indexterm> + <primary>PQregisterRowProcessor</primary> + </indexterm> + </term> + + <listitem> + <para> + Sets a callback function to process each row. +<synopsis> +void PQregisterRowProcessor(PGconn *conn, + RowProcessor func, + void *param); +</synopsis> + </para> + + <para> + <variablelist> + <varlistentry> + <term><parameter>conn</parameter></term> + <listitem> + <para> + The connection object on which to set the row processor + function. PGresult created from this connection calls this + function to process each row. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>func</parameter></term> + <listitem> + <para> + Row processor function to set. NULL means to use the + default processor. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>param</parameter></term> + <listitem> + <para> + A pointer to a contextual parameter passed + to <parameter>func</parameter>. You can get this pointer + in <type>RowProcessor</type> + by <function>PQgetRowProcessorParam</function>. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-rowprocessor"> + <term> + <type>RowProcessor</type> + <indexterm> + <primary>RowProcessor</primary> + </indexterm> + </term> + + <listitem> + <para> + The type for the row processor callback function. +<synopsis> +void *(*RowProcessor)(PGresult *res, + PGresAttValue *columns); +</synopsis> + </para> + + <para> + Generally this function must return NULL for failure and should + set the error message + with <function>PQsetRowProcessorErrMes</function> if the cause + is other than out of memory. This function must not throw any + exception. 
+ </para> + <variablelist> + <varlistentry> + <term><parameter>res</parameter></term> + <listitem> + <para> + A pointer to the <type>PGresult</type> object. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>columns</parameter></term> + <listitem> + <para> + An column values of the row to process. + </para> + </listitem> + </varlistentry> + </variablelist> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-pqgetrowprocessorparam"> + <term> + <function>PQgetRowProcessorParam</function> + <indexterm> + <primary>PQgetRowProcessorParam</primary> + </indexterm> + </term> + <listitem> + <para> + Get the pointer passed to <function>PQregisterRowProcessor</function> + as <parameter>param</parameter>. +<synopsis> +void *PQgetRowProcessorParam(PGresult *res) +</synopsis> + </para> + <para> + <variablelist> + <varlistentry> + <term><parameter>res</parameter></term> + <listitem> + <para> + A pointer to the <type>PGresult</type> object. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-pqsetrowprocessorerrmes"> + <term> + <function>PQsetRowProcessorErrMes</function> + <indexterm> + <primary>PQsetRowProcessorErrMes</primary> + </indexterm> + </term> + <listitem> + <para> + Set the message for the error occurred + in <type>RowProcessor</type>. If this message is not set, the + caller assumes the error to be out of memory. +<synopsis> +void PQsetRowProcessorErrMes(PGresult *res, char *mes) +</synopsis> + </para> + <para> + <variablelist> + <varlistentry> + <term><parameter>res</parameter></term> + <listitem> + <para> + A pointer to the <type>PGresult</type> object + passed to <type>RowProcessor</type>. 
+ </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>mes</parameter></term> + <listitem> + <para> + A pointer to the memory block containing the error message, + which is allocated by <function>malloc()</function>. The + memory block will be freed with <function>free()</function> in + the caller of <type>RowProcessor</type> only if it returns NULL. + </para> + <para> + If <parameter>res</parameter> already has a message previously + set, it is freed and then the given message is set. Set NULL + to cancel the the costom message. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + </sect1> + + <sect1 id="libpq-build"> <title>Building <application>libpq</application> Programs</title> diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 36a8e3e..195ad21 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -63,11 +63,23 @@ typedef struct remoteConn bool newXactForCursor; /* Opened a transaction for a cursor*/} remoteConn; +typedef struct storeInfo +{ + Tuplestorestate *tuplestore; + int nattrs; + MemoryContext oldcontext; + AttInMetadata *attinmeta; + char** valbuf; + int *valbuflen; + bool error_occurred; + bool nummismatch; + ErrorData *edata; +} storeInfo; +/* * Internal declarations */static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async); -static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);static remoteConn *getConnectionByName(const char*name);static HTAB *createConnHash(void);static void createNewConnection(const char *name, remoteConn *rconn); @@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);static void validate_pkattnums(Relation rel, int2vector *pkattnums_arg, int32 pknumatts_arg, int **pkattnums, int *pknumatts); +static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo); +static void finishStoreInfo(storeInfo *sinfo); +static void *storeHandler(PGresult *res, 
PGresAttValue *columns); +/* Global */static remoteConn *pconn = NULL; @@ -503,6 +519,7 @@ dblink_fetch(PG_FUNCTION_ARGS) char *curname = NULL; int howmany = 0; bool fail = true; /* default to backward compatible */ + storeInfo storeinfo; DBLINK_INIT; @@ -559,15 +576,36 @@ dblink_fetch(PG_FUNCTION_ARGS) appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname); /* + * Result is stored into storeinfo.tuplestore instead of + * res->result retuned by PQexec below + */ + initStoreInfo(&storeinfo, fcinfo); + PQregisterRowProcessor(conn, storeHandler, &storeinfo); + + /* * Try to execute the query. Note that since libpq uses malloc, the * PGresult will be long-lived even thoughwe are still in a short-lived * memory context. */ res = PQexec(conn, buf.data); + finishStoreInfo(&storeinfo); + if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { + /* finishStoreInfo saves the fields referred to below. */ + if (storeinfo.nummismatch) + { + /* This is only for backward compatibility */ + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + } + else if (storeinfo.edata) + ReThrowError(storeinfo.edata); + dblink_res_error(conname, res, "could not fetch from cursor", fail); return (Datum) 0; } @@ -579,8 +617,8 @@ dblink_fetch(PG_FUNCTION_ARGS) (errcode(ERRCODE_INVALID_CURSOR_NAME), errmsg("cursor \"%s\" does not exist", curname))); } + PQclear(res); - materializeResult(fcinfo, res); return (Datum) 0;} @@ -640,6 +678,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible */ bool freeconn = false; + storeInfo storeinfo; /* check to see if caller supports us returning a tuplestore */ if (rsinfo == NULL || !IsA(rsinfo,ReturnSetInfo)) @@ -715,164 +754,205 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) rsinfo->setResult = NULL; 
rsinfo->setDesc= NULL; + + /* + * Result is stored into storeinfo.tuplestore instead of + * res->result retuned by PQexec/PQgetResult below + */ + initStoreInfo(&storeinfo, fcinfo); + PQregisterRowProcessor(conn, storeHandler, &storeinfo); + /* synchronous query, or async result retrieval */ if (!is_async) res = PQexec(conn, sql); else - { res = PQgetResult(conn); - /* NULL means we're all done with the async results */ - if (!res) - return (Datum) 0; - } - /* if needed, close the connection to the database and cleanup */ - if (freeconn) - PQfinish(conn); + finishStoreInfo(&storeinfo); - if (!res || - (PQresultStatus(res) != PGRES_COMMAND_OK && - PQresultStatus(res) != PGRES_TUPLES_OK)) + /* NULL res from async get means we're all done with the results */ + if (res || !is_async) { - dblink_res_error(conname, res, "could not execute query", fail); - return (Datum) 0; + if (freeconn) + PQfinish(conn); + + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) + { + /* finishStoreInfo saves the fields referred to below. */ + if (storeinfo.nummismatch) + { + /* This is only for backward compatibility */ + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + } + else if (storeinfo.edata) + ReThrowError(storeinfo.edata); + + dblink_res_error(conname, res, "could not execute query", fail); + return (Datum) 0; + } } + PQclear(res); - materializeResult(fcinfo, res); return (Datum) 0;} -/* - * Materialize the PGresult to return them as the function result. - * The res will be released in this function. 
- */static void -materializeResult(FunctionCallInfo fcinfo, PGresult *res) +initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo){ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; - - Assert(rsinfo->returnMode == SFRM_Materialize); - - PG_TRY(); + TupleDesc tupdesc; + int i; + + switch (get_call_result_type(fcinfo, NULL, &tupdesc)) { - TupleDesc tupdesc; - bool is_sql_cmd = false; - int ntuples; - int nfields; + case TYPEFUNC_COMPOSITE: + /* success */ + break; + case TYPEFUNC_RECORD: + /* failed to determine actual type of RECORD */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + break; + default: + /* result type isn't composite */ + elog(ERROR, "return type must be a row type"); + break; + } + + sinfo->oldcontext = MemoryContextSwitchTo( + rsinfo->econtext->ecxt_per_query_memory); + + /* make sure we have a persistent copy of the tupdesc */ + tupdesc = CreateTupleDescCopy(tupdesc); + + sinfo->error_occurred = FALSE; + sinfo->nummismatch = FALSE; + sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc); + sinfo->edata = NULL; + sinfo->nattrs = tupdesc->natts; + sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem); + + /* Preallocate memory of same size with c string array for values. 
*/ + sinfo->valbuf = (char **) malloc(sinfo->nattrs * sizeof(char*)); + sinfo->valbuflen = (int *)malloc(sinfo->nattrs * sizeof(int)); + for (i = 0 ; i < sinfo->nattrs ; i++) + { + sinfo->valbuf[i] = NULL; + sinfo->valbuflen[i] = -1; + } - if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - is_sql_cmd = true; - - /* - * need a tuple descriptor representing one TEXT column to return - * the command status string as our result tuple - */ - tupdesc = CreateTemplateTupleDesc(1, false); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", - TEXTOID, -1, 0); - ntuples = 1; - nfields = 1; - } - else - { - Assert(PQresultStatus(res) == PGRES_TUPLES_OK); + rsinfo->setResult = sinfo->tuplestore; + rsinfo->setDesc = tupdesc; +} - is_sql_cmd = false; +static void +finishStoreInfo(storeInfo *sinfo) +{ + int i; - /* get a tuple descriptor for our result type */ - switch (get_call_result_type(fcinfo, NULL, &tupdesc)) + if (sinfo->valbuf) + { + for (i = 0 ; i < sinfo->nattrs ; i++) + { + if (sinfo->valbuf[i]) { - case TYPEFUNC_COMPOSITE: - /* success */ - break; - case TYPEFUNC_RECORD: - /* failed to determine actual type of RECORD */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("function returning record called in context " - "that cannot accept type record"))); - break; - default: - /* result type isn't composite */ - elog(ERROR, "return type must be a row type"); - break; + free(sinfo->valbuf[i]); + sinfo->valbuf[i] = NULL; } - - /* make sure we have a persistent copy of the tupdesc */ - tupdesc = CreateTupleDescCopy(tupdesc); - ntuples = PQntuples(res); - nfields = PQnfields(res); } + free(sinfo->valbuf); + sinfo->valbuf = NULL; + } - /* - * check result and tuple descriptor have the same number of columns - */ - if (nfields != tupdesc->natts) - ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("remote query result rowtype does not match " - "the specified FROM clause rowtype"))); - - if (ntuples > 0) - { - AttInMetadata *attinmeta; - 
Tuplestorestate *tupstore; - MemoryContext oldcontext; - int row; - char **values; - - attinmeta = TupleDescGetAttInMetadata(tupdesc); - - oldcontext = MemoryContextSwitchTo( - rsinfo->econtext->ecxt_per_query_memory); - tupstore = tuplestore_begin_heap(true, false, work_mem); - rsinfo->setResult = tupstore; - rsinfo->setDesc = tupdesc; - MemoryContextSwitchTo(oldcontext); + if (sinfo->valbuflen) + { + free(sinfo->valbuflen); + sinfo->valbuflen = NULL; + } + MemoryContextSwitchTo(sinfo->oldcontext); +} - values = (char **) palloc(nfields * sizeof(char *)); +static void * +storeHandler(PGresult *res, PGresAttValue *columns) +{ + storeInfo *sinfo = (storeInfo *)PQgetRowProcessorParam(res); + HeapTuple tuple; + int fields = PQnfields(res); + int i; + char *cstrs[PQnfields(res)]; - /* put all tuples into the tuplestore */ - for (row = 0; row < ntuples; row++) - { - HeapTuple tuple; + if (sinfo->error_occurred) + return NULL; - if (!is_sql_cmd) - { - int i; + if (sinfo->nattrs != fields) + { + sinfo->error_occurred = TRUE; + sinfo->nummismatch = TRUE; + finishStoreInfo(sinfo); - for (i = 0; i < nfields; i++) - { - if (PQgetisnull(res, row, i)) - values[i] = NULL; - else - values[i] = PQgetvalue(res, row, i); - } - } - else - { - values[0] = PQcmdStatus(res); - } + /* This error will be processed in + * dblink_record_internal(). So do not set error message + * here. */ + return NULL; + } - /* build the tuple and put it into the tuplestore. */ - tuple = BuildTupleFromCStrings(attinmeta, values); - tuplestore_puttuple(tupstore, tuple); + /* + * value input functions assumes that the value string is + * terminated by zero. We should make the values to be so. 
+ */ + for(i = 0 ; i < fields ; i++) + { + int len = columns[i].len; + if (len < 0) + cstrs[i] = NULL; + else + { + if (sinfo->valbuf[i] == NULL) + { + sinfo->valbuf[i] = (char *)malloc(len + 1); + sinfo->valbuflen[i] = len + 1; + } + else if (sinfo->valbuflen[i] < len + 1) + { + sinfo->valbuf[i] = (char *)realloc(sinfo->valbuf[i], len + 1); + sinfo->valbuflen[i] = len + 1; } - /* clean up and return the tuplestore */ - tuplestore_donestoring(tupstore); + cstrs[i] = sinfo->valbuf[i]; + memcpy(cstrs[i], columns[i].value, len); + cstrs[i][len] = '\0'; } + } - PQclear(res); + PG_TRY(); + { + tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs); + tuplestore_puttuple(sinfo->tuplestore, tuple); } PG_CATCH(); { - /* be sure to release the libpq result */ - PQclear(res); - PG_RE_THROW(); + MemoryContext context; + /* + * Store exception for later ReThrow and cancel the exception. + */ + sinfo->error_occurred = TRUE; + context = MemoryContextSwitchTo(sinfo->oldcontext); + sinfo->edata = CopyErrorData(); + MemoryContextSwitchTo(context); + FlushErrorState(); + + return NULL; } PG_END_TRY(); + + return columns;}/*
On Fri, Jan 27, 2012 at 2:57 AM, Kyotaro HORIGUCHI <horiguchi.kyotaro@oss.ntt.co.jp> wrote:
> Hello, This is a new version of the patch formerly known as
> 'alternative storage for libpq'.

I took a quick look at the patch and the docs. It looks good, and I agree with the rationale and implementation. I see you covered the pqsetvalue case, which is nice. I expect libpq C API clients coded for performance will immediately gravitate to this API.

> - The meaning of PGresAttValue is changed. The field 'value' now
>   contains a value withOUT terminating zero. This change seems to
>   have no effect on any other portion within the whole source
>   tree of postgresql from what I've seen.

This is a minor point of concern. This function was exposed to support libpqtypes (which your stuff complements very nicely, by the way) and I quickly confirmed that removal of the null terminator didn't cause any problems there. I doubt anyone else is inspecting the structure directly (I also searched the archives and didn't find anything).

This needs to be advertised very loudly in the docs -- I understand why this was done, but it's a pretty big change in the way the API works.

merlin
On Fri, Jan 27, 2012 at 05:57:01PM +0900, Kyotaro HORIGUCHI wrote:
> Hello, This is a new version of the patch formerly known as
> 'alternative storage for libpq'.
>
> - Changed the concept to 'Alternative Row Processor' from
>   'Storage handler'. Symbol names are also changed.
>
> - Callback function is modified following to the comment.
>
> - From the restriction of time, I did minimum check for this
>   patch. The purpose of this patch is to show the new implement.
>
> - Proformance is not measured for this patch for the same
>   reason. I will do that on next monday.
>
> - The meaning of PGresAttValue is changed. The field 'value' now
>   contains a value withOUT terminating zero. This change seems to
>   have no effect on any other portion within the whole source
>   tree of postgresql from what I've seen.

I think we have the general structure in place. Good.

Minor notes:

= rowhandler api =

* It returns bool, so void* is wrong. Instead, libpq style is to use int, with 1=OK, 0=Failure. It seems that was also the old pqAddTuple() convention.

* Drop PQgetRowProcessorParam(); instead give param as an argument.

* PQsetRowProcessorErrMes() should strdup() the message. That gets rid of allocator requirements in the API. It also makes it safe to pass static strings there. If strdup() fails, fall back to the generic no-mem message.

* Create a new struct to replace PGresAttValue for rowhandler usage. The RowHandler API is pretty unique and self-contained. It should have its own struct. The main reason is that this allows it to be documented properly. Otherwise the minor details get lost, as they are different from libpq-internal usage. It also allows the two structs to be improved separately. (PGresRawValue?)

* Stop storing null_value into ->value. It's a libpq internal detail. Instead, ->value should always point into the buffer where the value info is located, even for NULL. This makes it safe to simply subtract pointers to get a row size estimate. It seems pqAddTuple() already does the null_value logic, so there is no need to do it in the rowhandler api.

= libpq =

Currently it's confusing whether rowProcessor can be NULL, and what should be done if so. I think it's better to fix usage so that it is always set.

* PQregisterRowProcessor() should set the default handler if func==NULL.

* Never set rowProcessor directly, always via PQregisterRowProcessor().

* Drop all if(rowProcessor) checks.

= dblink =

* There are malloc failure checks missing in initStoreInfo() & storeHandler().

--
marko

PS. You did not hear it from me, but most raw values are actually nul-terminated in the protocol. Think big-endian. And those which are not, you can make so, as the data is not touched anymore. You cannot do it for the last value, as the next byte may not be allocated. But you could memmove() it to a lower address so you can null-terminate it. I'm not suggesting it for the official patch, but it would be fun to know if such a hack is benchmarkable, and benchmarkable on a realistic load.
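The "think big-endian" remark in the PS can be illustrated with a small sketch. It assumes the protocol 3 row layout, where every column is a 4-byte big-endian length followed by the raw bytes, so a value is immediately followed by the high byte of the next length, which is zero for any length below 16MB. The helper names put_field() and is_nul_terminated_in_place() are made up for this illustration and are not part of libpq.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/*
 * Append one column in the row layout assumed here: a 4-byte big-endian
 * length followed by the raw bytes, with no terminator.
 * Returns the offset just past the appended column.
 */
static size_t
put_field(unsigned char *buf, size_t off, const char *val)
{
    uint32_t    len = (uint32_t) strlen(val);

    buf[off++] = (unsigned char) (len >> 24);
    buf[off++] = (unsigned char) (len >> 16);
    buf[off++] = (unsigned char) (len >> 8);
    buf[off++] = (unsigned char) len;
    memcpy(buf + off, val, len);
    return off + len;
}

/*
 * Hypothetical helper: reports whether the byte just past a value is
 * already zero, i.e. whether the value can be read as a C string in place.
 * The caller must guarantee that value[len] lies inside the buffer, which
 * is exactly what cannot be guaranteed for the last value of a message.
 */
static int
is_nul_terminated_in_place(const unsigned char *value, size_t len)
{
    assert(value != NULL);
    return value[len] == 0;
}
```

With two columns "abc" and "de" written into a buffer pre-filled with 0xFF, the first value is followed by the zero high byte of the second length and reads as a C string in place, while the last value is followed by whatever happens to come next in the buffer.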
On Fri, Jan 27, 2012 at 09:35:04AM -0600, Merlin Moncure wrote:
> On Fri, Jan 27, 2012 at 2:57 AM, Kyotaro HORIGUCHI
> > - The meaning of PGresAttValue is changed. The field 'value' now
> >   contains a value withOUT terminating zero. This change seems to
> >   have no effect on any other portion within the whole source
> >   tree of postgresql from what I've seen.
>
> This is a minor point of concern. This function was exposed to
> support libpqtypes (which your stuff compliments very nicely by the
> way) and I quickly confirmed removal of the null terminator didn't
> cause any problems there. I doubt anyone else is inspecting the
> structure directly (also searched the archives and didn't find
> anything).
>
> This needs to be advertised very loudly in the docs -- I understand
> why this was done but it's a pretty big change in the way the api
> works.

Note that the non-NUL-terminated PGresAttValue is only used for the row handler, so no existing usage is affected.

But I agree that using the same struct in different situations is confusing, thus the request for a separate struct for row handler usage.

--
marko
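The row handler shape requested in the review (a dedicated value struct, an int return with 1=OK and 0=failure, the parameter passed as an argument, and an error message that the library copies) might look roughly like the sketch below. Every type and function name in it (RowValue, Result, RowHandler, set_row_error, row_error, sum_lengths, reject_nulls) is a hypothetical stand-in chosen for this illustration, not a libpq symbol, and the copy is done with malloc()/memcpy() instead of strdup() only to keep the sketch within ISO C.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins; none of these names are libpq symbols. */
typedef struct RowValue
{
    int         len;        /* length in bytes, negative for SQL NULL */
    const char *value;      /* raw bytes, not NUL-terminated */
} RowValue;

typedef struct Result
{
    int         nfields;
    char       *errmsg;     /* owned copy of the handler's message, or NULL */
} Result;

/* Returns 1 for success and 0 for failure. */
typedef int (*RowHandler) (Result *res, void *param, const RowValue *cols);

/* Store a private copy of msg, so callers may pass static strings. */
static void
set_row_error(Result *res, const char *msg)
{
    assert(res != NULL);
    free(res->errmsg);
    res->errmsg = NULL;
    if (msg != NULL)
    {
        size_t      n = strlen(msg) + 1;

        res->errmsg = malloc(n);
        if (res->errmsg != NULL)
            memcpy(res->errmsg, msg, n);
        /* if the copy fails, errmsg stays NULL and the generic text applies */
    }
}

/* Message the caller would report after a handler returned 0. */
static const char *
row_error(const Result *res)
{
    return res->errmsg ? res->errmsg : "out of memory for query result";
}

/* Example handler: adds up the byte lengths of all non-NULL columns. */
static int
sum_lengths(Result *res, void *param, const RowValue *cols)
{
    long       *total = (long *) param;
    int         i;

    for (i = 0; i < res->nfields; i++)
        if (cols[i].len >= 0)
            *total += cols[i].len;
    return 1;
}

/* Example handler: fails with a custom message on the first NULL column. */
static int
reject_nulls(Result *res, void *param, const RowValue *cols)
{
    int         i;

    (void) param;
    for (i = 0; i < res->nfields; i++)
        if (cols[i].len < 0)
        {
            set_row_error(res, "NULL value not accepted");
            return 0;
        }
    return 1;
}
```

Because the message is copied, a handler can pass a string literal, and a handler that fails without setting a message is reported as out of memory.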
Thank you for the comments; this is a revised version of the patch.

The performance gain is larger than expected. The measurement script now runs the query via dblink ten times to stabilize the measurement, so the figures are about ten times larger than the previous ones.

                        sec    % to Original
Original             : 31.5       100.0%
RowProcessor patch   : 31.3        99.4%
dblink patch         : 24.6        78.1%

The RowProcessor patch alone causes no loss, or a very small gain, and the full patch gives a 22% gain on the benchmark(*1).

The modifications are listed below.

- PGresAttValue is no longer used for this mechanism; PGrowValue has been added instead. PGresAttValue has been put back into libpq-int.h.

- pqAddTuple() is restored to its original form, and a new function pqAddRow() is used as the RowProcessor. (The previous pqAddTuple() implementation buggily mixed the two usages of PGresAttValue.)

- PQgetRowProcessorParam has been dropped. The contextual parameter is passed as one of the parameters of RowProcessor().

- RowProcessor() returns int (as bool; is that the libpq convention?) instead of void *. (Actually, void * had already become useless as of the previous patch.)

- PQsetRowProcessorErrMes() is changed to do strdup internally.

- The callers of RowProcessor() no longer set null_field to PGrowValue.value. In addition, the PGrowValue[] which RowProcessor() receives has nfields + 1 elements, so that a rough estimate can be made by cols[nfields].value - cols[0].value - something. The something here is 4 * nfields for protocol3 and 4 * (non-null fields) for protocol2. I fear that this applies only to textual transfer usage...

- PQregisterRowProcessor() sets the default handler when given NULL. (pg_conn|pg_result).rowProcessor cannot be NULL during its lifetime.

- initStoreInfo() and storeHandler() have been provided with malloc error handling.

And more..

- getAnotherTuple()@fe-protocol2.c is not tested at all.

- The uniformity of the column sizes in the test data prevents realloc from being executed in dblink... More testing should be done.
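To make the row size estimate described above concrete, here is a minimal sketch. It assumes the protocol 3 layout, where each column is a 4-byte big-endian length followed by the raw bytes and a NULL is sent as length -1 with no bytes. RowValue, scan_row() and data_bytes() are hypothetical names for this illustration, and unlike the patch the sketch treats every negative length as NULL. With value pointers placed just after each length field, the pointer difference spans the value bytes plus the nfields - 1 length fields that lie between cols[0].value and cols[nfields].value, so subtracting 4 * (nfields - 1) gives the exact byte count, and the rougher 4 * nfields figure comes out 4 bytes lower.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for the per-column value passed to a row processor. */
typedef struct RowValue
{
    int         len;                /* length in bytes, -1 for SQL NULL */
    const unsigned char *value;     /* points just past the length field */
} RowValue;

/*
 * Scan nfields length-prefixed columns from buf and fill cols[0..nfields];
 * the extra element cols[nfields] marks the end of the row.
 * Returns 1 on success, 0 if the buffer is too short.
 */
static int
scan_row(const unsigned char *buf, size_t buflen, int nfields, RowValue *cols)
{
    size_t      pos = 0;
    int         i;

    for (i = 0; i < nfields; i++)
    {
        uint32_t    raw;
        int         len;

        if (buflen - pos < 4)
            return 0;
        raw = ((uint32_t) buf[pos] << 24) | ((uint32_t) buf[pos + 1] << 16) |
              ((uint32_t) buf[pos + 2] << 8) | (uint32_t) buf[pos + 3];
        pos += 4;

        /* simplification of this sketch: any negative length means NULL */
        len = (raw > 0x7FFFFFFFu) ? -1 : (int) raw;

        cols[i].value = buf + pos;
        cols[i].len = len;
        if (len > 0)
        {
            if ((size_t) len > buflen - pos)
                return 0;
            pos += (size_t) len;
        }
    }
    cols[nfields].value = buf + pos;
    cols[nfields].len = -1;
    return 1;
}

/* Exact number of value bytes in the row, computed from pointers only. */
static ptrdiff_t
data_bytes(const RowValue *cols, int nfields)
{
    assert(nfields > 0);
    return cols[nfields].value - cols[0].value - 4 * (nfields - 1);
}
```

For a row holding "abc", NULL and "de", the pointer difference is 13 bytes and data_bytes() reports 5, the summed length of the two non-NULL values.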
regards,

=====
(*1) The benchmark is done as follows,

==test.sql
select dblink_connect('c', 'host=localhost dbname=test');
select * from dblink('c', 'select a,c from foo limit 2000000') as (a text, b bytea) limit 1;
...(repeat 9 times more)
select dblink_disconnect('c');
==

$ for i in $(seq 1 10); do time psql test -f t.sql; done

The environment is
  CentOS 6.2 on VirtualBox on Core i7 965 3.2GHz
  # of processor  1
  Allocated mem   2GB

Test DB schema is
 Column | Type  | Modifiers
--------+-------+-----------
 a      | text  |
 b      | text  |
 c      | bytea |
Indexes:
    "foo_a_bt" btree (a)
    "foo_c_bt" btree (c)

test=# select count(*), min(length(a)) as a_min, max(length(a)) as a_max, min(length(c)) as c_min, max(length(c)) as c_max from foo;
  count  | a_min | a_max | c_min | c_max
---------+-------+-------+-------+-------
 2000000 |    29 |    29 |    29 |    29
(1 row)

--
Kyotaro Horiguchi
NTT Open Source Software Center

diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 1af8df6..5ed083c 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -160,3 +160,5 @@ PQconnectStartParams      157
 PQping                    158
 PQpingParams              159
 PQlibVersion              160
+PQregisterRowProcessor    161
+PQsetRowProcessorErrMes   162
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index d454538..4fe2f41 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -2692,6 +2692,8 @@ makeEmptyPGconn(void)
     conn->allow_ssl_try = true;
     conn->wait_ssl_try = false;
 #endif
+    conn->rowProcessor = pqAddRow;
+    conn->rowProcessorParam = NULL;
 
     /*
      * We try to send at least 8K at a time, which is the usual size of pipe
@@ -5076,3 +5078,10 @@ PQregisterThreadLock(pgthreadlock_t newhandler)
 
     return prev;
 }
+
+void
+PQregisterRowProcessor(PGconn *conn, RowProcessor func, void *param)
+{
+    conn->rowProcessor = (func ?
func : pqAddRow); + conn->rowProcessorParam = param; +} diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index b743566..82914fd 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -66,6 +66,7 @@ static PGresult *PQexecFinish(PGconn *conn);static int PQsendDescribe(PGconn *conn, char desc_type, const char *desc_target);static int check_field_number(const PGresult *res, int field_num); +static int pqAddTuple(PGresult *res, PGresAttValue *tup);/* ---------------- @@ -160,6 +161,9 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) result->curBlock = NULL; result->curOffset= 0; result->spaceLeft = 0; + result->rowProcessor = pqAddRow; + result->rowProcessorParam = NULL; + result->rowProcessorErrMes = NULL; if (conn) { @@ -194,6 +198,10 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status) } result->nEvents = conn->nEvents; } + + /* copy row processor settings */ + result->rowProcessor = conn->rowProcessor; + result->rowProcessorParam = conn->rowProcessorParam; } else { @@ -701,7 +709,6 @@ pqClearAsyncResult(PGconn *conn) if (conn->result) PQclear(conn->result); conn->result =NULL; - conn->curTuple = NULL;}/* @@ -756,7 +763,6 @@ pqPrepareAsyncResult(PGconn *conn) */ res = conn->result; conn->result = NULL; /* handingover ownership to caller */ - conn->curTuple = NULL; /* just in case */ if (!res) res = PQmakeEmptyPGresult(conn, PGRES_FATAL_ERROR); else @@ -828,9 +834,52 @@ pqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)}/* + * pqAddRow + * add a row to the PGresult structure, growing it if necessary + * Returns TRUE if OK, FALSE if not enough memory to add the row. 
+ */ +int +pqAddRow(PGresult *res, void *param, PGrowValue *columns) +{ + PGresAttValue *tup; + int nfields = res->numAttributes; + int i; + + tup = (PGresAttValue *) + pqResultAlloc(res, nfields * sizeof(PGresAttValue), TRUE); + if (tup == NULL) return FALSE; + + memcpy(tup, columns, nfields * sizeof(PGresAttValue)); + + for (i = 0 ; i < nfields ; i++) + { + tup[i].len = columns[i].len; + if (tup[i].len == NULL_LEN) + { + tup[i].value = res->null_field; + } + else + { + bool isbinary = (res->attDescs[i].format != 0); + tup[i].value = + (char *)pqResultAlloc(res, tup[i].len + 1, isbinary); + if (tup[i].value == NULL) + return FALSE; + + memcpy(tup[i].value, columns[i].value, tup[i].len); + /* We have to terminate this ourselves */ + tup[i].value[tup[i].len] = '\0'; + } + } + + return pqAddTuple(res, tup); +} + +/* * pqAddTuple - * add a row pointer to the PGresult structure, growing it if necessary - * Returns TRUE if OK, FALSE if not enough memory to add the row + * add a row POINTER to the PGresult structure, growing it if + * necessary Returns TRUE if OK, FALSE if not enough memory to add + * the row. */intpqAddTuple(PGresult *res, PGresAttValue *tup) @@ -1223,7 +1272,6 @@ PQsendQueryStart(PGconn *conn) /* initialize async result-accumulation state */ conn->result= NULL; - conn->curTuple = NULL; /* ready to send command message */ return true; @@ -2822,6 +2870,30 @@ PQgetisnull(const PGresult *res, int tup_num, int field_num) return 0;} +/* PQsetRowProcessorErrMes + * Set the error message pass back to the caller of RowProcessor. + * + * You can replace the previous message by alternative mes, or clear + * it with NULL. + */ +void +PQsetRowProcessorErrMes(PGresult *res, char *mes) +{ + /* Free existing message */ + if (res->rowProcessorErrMes) + free(res->rowProcessorErrMes); + + /* + * Set the duped message if mes is not NULL. Failure of strdup + * will be handled as 'Out of memory' by the caller of the + * RowProcessor. 
+ */ + if (mes) + res->rowProcessorErrMes = strdup(mes); + else + res->rowProcessorErrMes = NULL; +} +/* PQnparams: * returns the number of input parameters of a prepared statement. */ diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c index ce0eac3..d11cb3c 100644 --- a/src/interfaces/libpq/fe-misc.c +++ b/src/interfaces/libpq/fe-misc.c @@ -219,6 +219,25 @@ pqGetnchar(char *s, size_t len, PGconn *conn)}/* + * pqGetnchar: + * skip len bytes in input buffer. + */ +int +pqSkipnchar(size_t len, PGconn *conn) +{ + if (len > (size_t) (conn->inEnd - conn->inCursor)) + return EOF; + + conn->inCursor += len; + + if (conn->Pfdebug) + fprintf(conn->Pfdebug, "From backend (%lu skipped)\n", + (unsigned long) len); + + return 0; +} + +/* * pqPutnchar: * write exactly len bytes to the current message */ diff --git a/src/interfaces/libpq/fe-protocol2.c b/src/interfaces/libpq/fe-protocol2.c index a7c3899..496c42e 100644 --- a/src/interfaces/libpq/fe-protocol2.c +++ b/src/interfaces/libpq/fe-protocol2.c @@ -715,7 +715,7 @@ getAnotherTuple(PGconn *conn, bool binary){ PGresult *result = conn->result; int nfields= result->numAttributes; - PGresAttValue *tup; + PGrowValue rowval[result->numAttributes + 1]; /* the backend sends us a bitmap of which attributes are null */ char std_bitmap[64]; /* used unless it doesn't fit */ @@ -729,26 +729,11 @@ getAnotherTuple(PGconn *conn, bool binary) result->binary = binary; - /* Allocate tuple space if first time for this data message */ - if (conn->curTuple == NULL) + if (binary) { - conn->curTuple = (PGresAttValue *) - pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE); - if (conn->curTuple == NULL) - goto outOfMemory; - MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue)); - - /* - * If it's binary, fix the column format indicators. We assume the - * backend will consistently send either B or D, not a mix. 
- */ - if (binary) - { - for (i = 0; i < nfields; i++) - result->attDescs[i].format = 1; - } + for (i = 0; i < nfields; i++) + result->attDescs[i].format = 1; } - tup = conn->curTuple; /* Get the null-value bitmap */ nbytes = (nfields + BITS_PER_BYTE - 1) / BITS_PER_BYTE; @@ -757,7 +742,7 @@ getAnotherTuple(PGconn *conn, bool binary) { bitmap = (char *) malloc(nbytes); if (!bitmap) - goto outOfMemory; + goto rowProcessError; } if (pqGetnchar(bitmap, nbytes, conn)) @@ -771,34 +756,31 @@ getAnotherTuple(PGconn *conn, bool binary) for (i = 0; i < nfields; i++) { if (!(bmap& 0200)) - { - /* if the field value is absent, make it a null string */ - tup[i].value = result->null_field; - tup[i].len = NULL_LEN; - } + vlen = NULL_LEN; + else if (pqGetInt(&vlen, 4, conn)) + goto EOFexit; else { - /* get the value length (the first four bytes are for length) */ - if (pqGetInt(&vlen, 4, conn)) - goto EOFexit; if (!binary) vlen = vlen - 4; if (vlen < 0) vlen = 0; - if (tup[i].value == NULL) - { - tup[i].value = (char *) pqResultAlloc(result, vlen + 1, binary); - if (tup[i].value == NULL) - goto outOfMemory; - } - tup[i].len = vlen; - /* read in the value */ - if (vlen > 0) - if (pqGetnchar((char *) (tup[i].value), vlen, conn)) - goto EOFexit; - /* we have to terminate this ourselves */ - tup[i].value[vlen] = '\0'; } + + /* + * Buffer content may be shifted on reloading additional + * data. So we must set all pointers on every scan. + * + * rowval[i].value always points to the next address of the + * length field even if the value length is zero or the value + * is NULL for the access safety. + */ + rowval[i].value = conn->inBuffer + conn->inCursor; + rowval[i].len = vlen; + /* Skip the value */ + if (vlen > 0 && pqSkipnchar(vlen, conn)) + goto EOFexit; + /* advance the bitmap stuff */ bitcnt++; if (bitcnt == BITS_PER_BYTE) @@ -811,17 +793,33 @@ getAnotherTuple(PGconn *conn, bool binary) bmap <<= 1; } - /* Success! 
Store the completed tuple in the result */ - if (!pqAddTuple(result, tup)) - goto outOfMemory; - /* and reset for a new message */ - conn->curTuple = NULL; + /* + * Set rowval[nfields] for the access safety. We can estimate the + * length of the buffer to store by + * + * rowval[nfields].value - rowval[0].value - 4 * (# of non-nulls)). + */ + rowval[nfields].value = conn->inBuffer + conn->inCursor; + rowval[nfields].len = NULL_LEN; + + /* Success! Pass the completed row values to rowProcessor */ + if (!result->rowProcessor(result, result->rowProcessorParam, rowval)) + goto rowProcessError; + + /* Free garbage message. */ + if (result->rowProcessorErrMes) + { + free(result->rowProcessorErrMes); + result->rowProcessorErrMes = NULL; + } if (bitmap != std_bitmap) free(bitmap); + return 0; -outOfMemory: +rowProcessError: + /* Replace partially constructed result with an error result */ /* @@ -829,8 +827,21 @@ outOfMemory: * there's not enough memory to concatenate messages... */ pqClearAsyncResult(conn); - printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("out of memory for query result\n")); + resetPQExpBuffer(&conn->errorMessage); + + /* + * If error message is passed from RowProcessor, set it into + * PGconn, assume out of memory if not. + */ + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext(result->rowProcessorErrMes ? 
+ result->rowProcessorErrMes : + "out of memory for query result\n")); + if (result->rowProcessorErrMes) + { + free(result->rowProcessorErrMes); + result->rowProcessorErrMes = NULL; + } /* * XXX: if PQmakeEmptyPGresult() fails, there's probably not much we can diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 892dcbc..b7c6118 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -625,22 +625,12 @@ getAnotherTuple(PGconn *conn, int msgLength){ PGresult *result = conn->result; int nfields = result->numAttributes; - PGresAttValue *tup; + PGrowValue rowval[result->numAttributes + 1]; int tupnfields; /* # fields from tuple */ int vlen; /* length of the current field value */ int i; /* Allocate tuple space iffirst time for this data message */ - if (conn->curTuple == NULL) - { - conn->curTuple = (PGresAttValue *) - pqResultAlloc(result, nfields * sizeof(PGresAttValue), TRUE); - if (conn->curTuple == NULL) - goto outOfMemory; - MemSet(conn->curTuple, 0, nfields * sizeof(PGresAttValue)); - } - tup = conn->curTuple; - /* Get the field count and make sure it's what we expect */ if (pqGetInt(&tupnfields, 2, conn)) return EOF; @@ -663,48 +653,70 @@ getAnotherTuple(PGconn *conn, int msgLength) if (pqGetInt(&vlen, 4, conn)) returnEOF; if (vlen == -1) - { - /* null field */ - tup[i].value = result->null_field; - tup[i].len = NULL_LEN; - continue; - } - if (vlen < 0) + vlen = NULL_LEN; + else if (vlen < 0) vlen = 0; - if (tup[i].value == NULL) - { - bool isbinary = (result->attDescs[i].format != 0); + + /* + * Buffer content may be shifted on reloading additional + * data. So we must set all pointers on every scan. + * + * rowval[i].value always points to the next address of the + * length field even if the value length is zero or the value + * is NULL for the access safety. 
+ */ + rowval[i].value = conn->inBuffer + conn->inCursor; + rowval[i].len = vlen; - tup[i].value = (char *) pqResultAlloc(result, vlen + 1, isbinary); - if (tup[i].value == NULL) - goto outOfMemory; - } - tup[i].len = vlen; - /* read in the value */ - if (vlen > 0) - if (pqGetnchar((char *) (tup[i].value), vlen, conn)) - return EOF; - /* we have to terminate this ourselves */ - tup[i].value[vlen] = '\0'; + /* Skip to the next length field */ + if (vlen > 0 && pqSkipnchar(vlen, conn)) + return EOF; } - /* Success! Store the completed tuple in the result */ - if (!pqAddTuple(result, tup)) - goto outOfMemory; - /* and reset for a new message */ - conn->curTuple = NULL; + /* + * Set rowval[nfields] for the access safety. We can estimate the + * length of the buffer to store by + * + * rowval[nfields].value - rowval[0].value - 4 * nfields. + */ + rowval[nfields].value = conn->inBuffer + conn->inCursor; + rowval[nfields].len = NULL_LEN; + + /* Success! Pass the completed row values to rowProcessor */ + if (!result->rowProcessor(result, result->rowProcessorParam, rowval)) + goto rowProcessError; + + /* Free garbage error message. */ + if (result->rowProcessorErrMes) + { + free(result->rowProcessorErrMes); + result->rowProcessorErrMes = NULL; + } return 0; -outOfMemory: +rowProcessError: /* * Replace partially constructed result with an error result. First * discard the old resultto try to win back some memory. */ pqClearAsyncResult(conn); - printfPQExpBuffer(&conn->errorMessage, - libpq_gettext("out of memory for query result\n")); + resetPQExpBuffer(&conn->errorMessage); + + /* + * If error message is passed from addTupleFunc, set it into + * PGconn, assume out of memory if not. + */ + appendPQExpBufferStr(&conn->errorMessage, + libpq_gettext(result->rowProcessorErrMes ? 
+ result->rowProcessorErrMes : + "out of memory for query result\n")); + if (result->rowProcessorErrMes) + { + free(result->rowProcessorErrMes); + result->rowProcessorErrMes = NULL; + } pqSaveErrorResult(conn); /* Discard the failed message by pretending we read it */ diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h index ef26ab9..27ef007 100644 --- a/src/interfaces/libpq/libpq-fe.h +++ b/src/interfaces/libpq/libpq-fe.h @@ -149,6 +149,16 @@ typedef struct pgNotify struct pgNotify *next; /* list link */} PGnotify; +/* PGrowValue represents a value of one tuple field in string form, + used by RowProcessor. NULL is represented as len < 0. Otherwise + value points to a string without null termination of the length of + len. */ +typedef struct pgRowValue +{ + int len; /* length in bytes of the value */ + char *value; /* actual value, without null termination */ +} PGrowValue; +/* Function types for notice-handling callbacks */typedef void (*PQnoticeReceiver) (void *arg, const PGresult *res);typedefvoid (*PQnoticeProcessor) (void *arg, const char *message); @@ -416,6 +426,32 @@ extern PGPing PQping(const char *conninfo);extern PGPing PQpingParams(const char *const * keywords, const char *const * values, int expand_dbname); +/* + * Typedef for alternative row processor. + * + * This function must return 1 for success and must return 0 for + * failure and may set error message by PQsetRowProcessorErrMes. It + * is assumed by caller as out of memory when the error message is not + * set on failure. This function is assumed not to throw any + * exception. + */ + typedef int (*RowProcessor)(PGresult *res, void *param, + PGrowValue *columns); + +/* + * Register alternative result store function to PGconn. + * + * By registering this function, pg_result disables its own result + * store and calls it for rows one by one. + * + * func is row processor function. See the typedef RowProcessor. 
+ * + * rowProcessorParam is the contextual variable that passed to + * RowProcessor. + */ +extern void PQregisterRowProcessor(PGconn *conn, RowProcessor func, + void *rowProcessorParam); +/* Force the write buffer to be written (or at least try) */extern int PQflush(PGconn *conn); @@ -454,6 +490,7 @@ extern char *PQcmdTuples(PGresult *res);extern char *PQgetvalue(const PGresult *res, int tup_num, intfield_num);extern int PQgetlength(const PGresult *res, int tup_num, int field_num);extern int PQgetisnull(constPGresult *res, int tup_num, int field_num); +extern void PQsetRowProcessorErrMes(PGresult *res, char *mes);extern int PQnparams(const PGresult *res);extern Oid PQparamtype(const PGresult *res, int param_num); diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index d967d60..06d8b26 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -209,6 +209,11 @@ struct pg_result PGresult_data *curBlock; /* most recently allocated block */ int curOffset; /* start offset of free space in block */ int spaceLeft; /* number of free bytesremaining in block */ + + RowProcessor rowProcessor; /* Result row processor handler. See + * RowProcessor for details. */ + void *rowProcessorParam; /* Contextual parameter for rowProcessor */ + char *rowProcessorErrMes; /* Error message from rowProcessor */};/* PGAsyncStatusType defines the state of the query-executionstate machine */ @@ -398,7 +403,6 @@ struct pg_conn /* Status for asynchronous result construction */ PGresult *result; /* result being constructed */ - PGresAttValue *curTuple; /* tuple currently being read */#ifdef USE_SSL bool allow_ssl_try; /* Allowedto try SSL negotiation */ @@ -443,6 +447,13 @@ struct pg_conn /* Buffer for receiving various parts of messages */ PQExpBufferData workBuffer;/* expansible string */ + + /* Tuple store handler. The two fields below is copied to newly + * created PGresult if rowProcessor is not NULL. 
Use default + * function if NULL. */ + RowProcessor rowProcessor; /* Result row processor. See + * RowProcessor for details. */ + void *rowProcessorParam; /* Contextual parameter for rowProcessor */};/* PGcancel stores all data necessary to cancela connection. A copy of this @@ -507,7 +518,7 @@ extern voidpqInternalNotice(const PGNoticeHooks *hooks, const char *fmt,...)/* This lets gcc check theformat string for consistency. */__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3))); -extern int pqAddTuple(PGresult *res, PGresAttValue *tup); +extern int pqAddRow(PGresult *res, void *param, PGrowValue *columns);extern void pqSaveMessageField(PGresult *res, charcode, const char *value);extern void pqSaveParameterStatus(PGconn *conn, const char *name, @@ -560,6 +571,7 @@ extern int pqGets(PQExpBuffer buf, PGconn *conn);extern int pqGets_append(PQExpBuffer buf, PGconn*conn);extern int pqPuts(const char *s, PGconn *conn);extern int pqGetnchar(char *s, size_t len, PGconn *conn); +extern int pqSkipnchar(size_t len, PGconn *conn);extern int pqPutnchar(const char *s, size_t len, PGconn *conn);externint pqGetInt(int *result, size_t bytes, PGconn *conn);extern int pqPutInt(int value, size_t bytes, PGconn*conn); diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml index 72c9384..5417df1 100644 --- a/doc/src/sgml/libpq.sgml +++ b/doc/src/sgml/libpq.sgml @@ -7233,6 +7233,199 @@ int PQisthreadsafe(); </sect1> + <sect1 id="libpq-alterrowprocessor"> + <title>Alternative row processor</title> + + <indexterm zone="libpq-alterrowprocessor"> + <primary>PGresult</primary> + <secondary>PGconn</secondary> + </indexterm> + + <para> + As the standard usage, users can get the result of command + execution from <structname>PGresult</structname> aquired + with <function>PGgetResult</function> + from <structname>PGConn</structname>. 
While the memory areas for + the PGresult are allocated with malloc() internally within calls of + command execution functions such as <function>PQexec</function> + and <function>PQgetResult</function>. If you have difficulties to + handle the result records in the form of PGresult, you can instruct + PGconn to pass every row to your own row processor instead of + storing into PGresult. + </para> + + <variablelist> + <varlistentry id="libpq-registerrowprocessor"> + <term> + <function>PQregisterRowProcessor</function> + <indexterm> + <primary>PQregisterRowProcessor</primary> + </indexterm> + </term> + + <listitem> + <para> + Sets a callback function to process each row. +<synopsis> +void PQregisterRowProcessor(PGconn *conn, + RowProcessor func, + void *param); +</synopsis> + </para> + + <para> + <variablelist> + <varlistentry> + <term><parameter>conn</parameter></term> + <listitem> + <para> + The connection object to set the storage handler + function. PGresult created from this connection calls this + function to process each row. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>func</parameter></term> + <listitem> + <para> + Storage handler function to set. NULL means to use the + default processor. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>param</parameter></term> + <listitem> + <para> + A pointer to contextual parameter passed + to <parameter>func</parameter>. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-rowprocessor"> + <term> + <type>RowProcessor</type> + <indexterm> + <primary>RowProcessor</primary> + </indexterm> + </term> + + <listitem> + <para> + The type for the row processor callback function. 
+<synopsis> +bool (*RowProcessor)(PGresult *res, + void *param, + PGrowValue *columns); + +typedef struct +{ + int len; /* length in bytes of the value */ + char *value; /* actual value, without null termination */ +} PGrowValue; + +</synopsis> + </para> + + <para> + This function must return TRUE for success, and FALSE for + failure. On failure this function should set the error message + with <function>PGsetRowProcessorErrMes</function> if the cause + is other than out of memory. This funcion must not throw any + exception. + </para> + <variablelist> + <varlistentry> + + <term><parameter>res</parameter></term> + <listitem> + <para> + A pointer to the <type>PGresult</type> object. + </para> + </listitem> + </varlistentry> + <varlistentry> + + <term><parameter>param</parameter></term> + <listitem> + <para> + A pointer to contextual parameter which is registered + by <function>PQregisterRowProcessor</function>. + </para> + </listitem> + </varlistentry> + <varlistentry> + + <term><parameter>columns</parameter></term> + <listitem> + <para> + Column values of the row to process. + </para> + </listitem> + </varlistentry> + </variablelist> + </listitem> + </varlistentry> + </variablelist> + + <variablelist> + <varlistentry id="libpq-pqsetrowprocessorerrmes"> + <term> + <function>PQsetRowProcessorErrMes</function> + <indexterm> + <primary>PQsetRowProcessorErrMes</primary> + </indexterm> + </term> + <listitem> + <para> + Set the message for the error occurred + in <type>RowProcessor</type>. If this message is not set, the + caller assumes the error to be out of memory. +<synopsis> +void PQsetRowProcessorErrMes(PGresult *res, char *mes) +</synopsis> + </para> + <para> + <variablelist> + <varlistentry> + <term><parameter>res</parameter></term> + <listitem> + <para> + A pointer to the <type>PGresult</type> object + passed to <type>RowProcessor</type>. 
+ </para> + </listitem> + </varlistentry> + <varlistentry> + <term><parameter>mes</parameter></term> + <listitem> + <para> + Error message. This will be copied internally so there is + no need to care of the scope. + </para> + <para> + If <parameter>res</parameter> already has a message previously + set, it will be overritten. Set NULL to cancel the the costom + message. + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + </listitem> + </varlistentry> + </variablelist> + </sect1> + + <sect1 id="libpq-build"> <title>Building <application>libpq</application> Programs</title> diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 36a8e3e..e6edcd5 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -63,11 +63,23 @@ typedef struct remoteConn bool newXactForCursor; /* Opened a transaction for a cursor*/} remoteConn; +typedef struct storeInfo +{ + Tuplestorestate *tuplestore; + int nattrs; + MemoryContext oldcontext; + AttInMetadata *attinmeta; + char** valbuf; + int *valbuflen; + bool error_occurred; + bool nummismatch; + ErrorData *edata; +} storeInfo; +/* * Internal declarations */static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async); -static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);static remoteConn *getConnectionByName(const char*name);static HTAB *createConnHash(void);static void createNewConnection(const char *name, remoteConn *rconn); @@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);static void validate_pkattnums(Relation rel, int2vector *pkattnums_arg, int32 pknumatts_arg, int **pkattnums, int *pknumatts); +static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo); +static void finishStoreInfo(storeInfo *sinfo); +static int storeHandler(PGresult *res, void *param, PGrowValue *columns); +/* Global */static remoteConn *pconn = NULL; @@ -503,6 +519,7 @@ dblink_fetch(PG_FUNCTION_ARGS) char *curname = NULL; int howmany = 0; bool fail = true; /* 
default to backward compatible */ + storeInfo storeinfo; DBLINK_INIT; @@ -559,15 +576,36 @@ dblink_fetch(PG_FUNCTION_ARGS) appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname); /* + * Result is stored into storeinfo.tuplestore instead of + * res->result retuned by PQexec below + */ + initStoreInfo(&storeinfo, fcinfo); + PQregisterRowProcessor(conn, storeHandler, &storeinfo); + + /* * Try to execute the query. Note that since libpq uses malloc, the * PGresult will be long-lived even thoughwe are still in a short-lived * memory context. */ res = PQexec(conn, buf.data); + finishStoreInfo(&storeinfo); + if (!res || (PQresultStatus(res) != PGRES_COMMAND_OK && PQresultStatus(res) != PGRES_TUPLES_OK)) { + /* finishStoreInfo saves the fields referred to below. */ + if (storeinfo.nummismatch) + { + /* This is only for backward compatibility */ + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + } + else if (storeinfo.edata) + ReThrowError(storeinfo.edata); + dblink_res_error(conname, res, "could not fetch from cursor", fail); return (Datum) 0; } @@ -579,8 +617,8 @@ dblink_fetch(PG_FUNCTION_ARGS) (errcode(ERRCODE_INVALID_CURSOR_NAME), errmsg("cursor \"%s\" does not exist", curname))); } + PQclear(res); - materializeResult(fcinfo, res); return (Datum) 0;} @@ -640,6 +678,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible */ bool freeconn = false; + storeInfo storeinfo; /* check to see if caller supports us returning a tuplestore */ if (rsinfo == NULL || !IsA(rsinfo,ReturnSetInfo)) @@ -715,164 +754,214 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) rsinfo->setResult = NULL; rsinfo->setDesc= NULL; + + /* + * Result is stored into storeinfo.tuplestore instead of + * res->result retuned by PQexec/PQgetResult below + */ + initStoreInfo(&storeinfo, fcinfo); + 
PQregisterRowProcessor(conn, storeHandler, &storeinfo); + /* synchronous query, or async result retrieval */ if (!is_async) res = PQexec(conn, sql); else - { res = PQgetResult(conn); - /* NULL means we're all done with the async results */ - if (!res) - return (Datum) 0; - } - /* if needed, close the connection to the database and cleanup */ - if (freeconn) - PQfinish(conn); + finishStoreInfo(&storeinfo); - if (!res || - (PQresultStatus(res) != PGRES_COMMAND_OK && - PQresultStatus(res) != PGRES_TUPLES_OK)) + /* NULL res from async get means we're all done with the results */ + if (res || !is_async) { - dblink_res_error(conname, res, "could not execute query", fail); - return (Datum) 0; + if (freeconn) + PQfinish(conn); + + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) + { + /* finishStoreInfo saves the fields referred to below. */ + if (storeinfo.nummismatch) + { + /* This is only for backward compatibility */ + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + } + else if (storeinfo.edata) + ReThrowError(storeinfo.edata); + + dblink_res_error(conname, res, "could not execute query", fail); + return (Datum) 0; + } } + PQclear(res); - materializeResult(fcinfo, res); return (Datum) 0;} -/* - * Materialize the PGresult to return them as the function result. - * The res will be released in this function. 
- */static void -materializeResult(FunctionCallInfo fcinfo, PGresult *res) +initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo){ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + int i; + + switch (get_call_result_type(fcinfo, NULL, &tupdesc)) + { + case TYPEFUNC_COMPOSITE: + /* success */ + break; + case TYPEFUNC_RECORD: + /* failed to determine actual type of RECORD */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + break; + default: + /* result type isn't composite */ + elog(ERROR, "return type must be a row type"); + break; + } + + sinfo->oldcontext = MemoryContextSwitchTo( + rsinfo->econtext->ecxt_per_query_memory); + + /* make sure we have a persistent copy of the tupdesc */ + tupdesc = CreateTupleDescCopy(tupdesc); + + sinfo->error_occurred = FALSE; + sinfo->nummismatch = FALSE; + sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc); + sinfo->edata = NULL; + sinfo->nattrs = tupdesc->natts; + sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem); + + /* Preallocate memory of same size with c string array for values. 
*/ + sinfo->valbuf = (char **) malloc(sinfo->nattrs * sizeof(char*)); + sinfo->valbuflen = (int *)malloc(sinfo->nattrs * sizeof(int)); + if (sinfo->valbuf == NULL || sinfo->valbuflen == NULL) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); - Assert(rsinfo->returnMode == SFRM_Materialize); - - PG_TRY(); + for (i = 0 ; i < sinfo->nattrs ; i++) { - TupleDesc tupdesc; - bool is_sql_cmd = false; - int ntuples; - int nfields; + sinfo->valbuf[i] = NULL; + sinfo->valbuflen[i] = -1; + } - if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - is_sql_cmd = true; - - /* - * need a tuple descriptor representing one TEXT column to return - * the command status string as our result tuple - */ - tupdesc = CreateTemplateTupleDesc(1, false); - TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", - TEXTOID, -1, 0); - ntuples = 1; - nfields = 1; - } - else - { - Assert(PQresultStatus(res) == PGRES_TUPLES_OK); + rsinfo->setResult = sinfo->tuplestore; + rsinfo->setDesc = tupdesc; +} - is_sql_cmd = false; +static void +finishStoreInfo(storeInfo *sinfo) +{ + int i; - /* get a tuple descriptor for our result type */ - switch (get_call_result_type(fcinfo, NULL, &tupdesc)) + if (sinfo->valbuf) + { + for (i = 0 ; i < sinfo->nattrs ; i++) + { + if (sinfo->valbuf[i]) { - case TYPEFUNC_COMPOSITE: - /* success */ - break; - case TYPEFUNC_RECORD: - /* failed to determine actual type of RECORD */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("function returning record called in context " - "that cannot accept type record"))); - break; - default: - /* result type isn't composite */ - elog(ERROR, "return type must be a row type"); - break; + free(sinfo->valbuf[i]); + sinfo->valbuf[i] = NULL; } - - /* make sure we have a persistent copy of the tupdesc */ - tupdesc = CreateTupleDescCopy(tupdesc); - ntuples = PQntuples(res); - nfields = PQnfields(res); } + free(sinfo->valbuf); + sinfo->valbuf = NULL; + } - /* - * check result and tuple descriptor 
have the same number of columns - */ - if (nfields != tupdesc->natts) - ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("remote query result rowtype does not match " - "the specified FROM clause rowtype"))); - - if (ntuples > 0) - { - AttInMetadata *attinmeta; - Tuplestorestate *tupstore; - MemoryContext oldcontext; - int row; - char **values; - - attinmeta = TupleDescGetAttInMetadata(tupdesc); - - oldcontext = MemoryContextSwitchTo( - rsinfo->econtext->ecxt_per_query_memory); - tupstore = tuplestore_begin_heap(true, false, work_mem); - rsinfo->setResult = tupstore; - rsinfo->setDesc = tupdesc; - MemoryContextSwitchTo(oldcontext); - - values = (char **) palloc(nfields * sizeof(char *)); + if (sinfo->valbuflen) + { + free(sinfo->valbuflen); + sinfo->valbuflen = NULL; + } + MemoryContextSwitchTo(sinfo->oldcontext); +} - /* put all tuples into the tuplestore */ - for (row = 0; row < ntuples; row++) - { - HeapTuple tuple; +static int +storeHandler(PGresult *res, void *param, PGrowValue *columns) +{ + storeInfo *sinfo = (storeInfo *)param; + HeapTuple tuple; + int fields = PQnfields(res); + int i; + char *cstrs[PQnfields(res)]; - if (!is_sql_cmd) - { - int i; + if (sinfo->error_occurred) + return FALSE; - for (i = 0; i < nfields; i++) - { - if (PQgetisnull(res, row, i)) - values[i] = NULL; - else - values[i] = PQgetvalue(res, row, i); - } - } - else - { - values[0] = PQcmdStatus(res); - } + if (sinfo->nattrs != fields) + { + sinfo->error_occurred = TRUE; + sinfo->nummismatch = TRUE; + finishStoreInfo(sinfo); + + /* This error will be processed in + * dblink_record_internal(). So do not set error message + * here. */ + return FALSE; + } - /* build the tuple and put it into the tuplestore. */ - tuple = BuildTupleFromCStrings(attinmeta, values); - tuplestore_puttuple(tupstore, tuple); + /* + * value input functions assumes that the input string is + * terminated by zero. We should make the values to be so. 
+ */ + for(i = 0 ; i < fields ; i++) + { + int len = columns[i].len; + if (len < 0) + cstrs[i] = NULL; + else + { + if (sinfo->valbuf[i] == NULL) + { + sinfo->valbuf[i] = (char *)malloc(len + 1); + sinfo->valbuflen[i] = len + 1; + } + else if (sinfo->valbuflen[i] < len + 1) + { + sinfo->valbuf[i] = (char *)realloc(sinfo->valbuf[i], len + 1); + sinfo->valbuflen[i] = len + 1; } - /* clean up and return the tuplestore */ - tuplestore_donestoring(tupstore); + if (sinfo->valbuf[i] == NULL) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + + cstrs[i] = sinfo->valbuf[i]; + memcpy(cstrs[i], columns[i].value, len); + cstrs[i][len] = '\0'; } + } - PQclear(res); + PG_TRY(); + { + tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs); + tuplestore_puttuple(sinfo->tuplestore, tuple); } PG_CATCH(); { - /* be sure to release the libpq result */ - PQclear(res); - PG_RE_THROW(); + MemoryContext context; + /* + * Store exception for later ReThrow and cancel the exception. + */ + sinfo->error_occurred = TRUE; + context = MemoryContextSwitchTo(sinfo->oldcontext); + sinfo->edata = CopyErrorData(); + MemoryContextSwitchTo(context); + FlushErrorState(); + return FALSE; } PG_END_TRY(); + + return TRUE;}/*
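A note on the dblink part of the patch: storeHandler() has to turn each PGrowValue, which is not null-terminated, into a C string before BuildTupleFromCStrings() can use it. The following is a minimal standalone sketch of just that step, not the patch's code. RowValue, ColBuf and row_to_cstrings are hypothetical names; RowValue only mirrors the PGrowValue layout proposed in the patch so that the sketch compiles without a patched libpq.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Stand-in for the proposed PGrowValue (assumption: same two fields). */
typedef struct
{
    int   len;    /* length in bytes; negative means SQL NULL */
    char *value;  /* points into the receive buffer, NOT null-terminated */
} RowValue;

/* Hypothetical per-column scratch buffer, grown on demand and reused. */
typedef struct
{
    char  *buf;
    size_t cap;
} ColBuf;

/*
 * Copy one row into null-terminated strings. out[i] is NULL for SQL NULL,
 * otherwise it points at bufs[i].buf. Returns 1 on success and 0 on out of
 * memory, following the return convention described for RowProcessor.
 */
static int
row_to_cstrings(const RowValue *cols, int nfields, ColBuf *bufs, char **out)
{
    int i;

    assert(nfields >= 0);

    for (i = 0; i < nfields; i++)
    {
        size_t need;

        if (cols[i].len < 0)
        {
            out[i] = NULL;
            continue;
        }

        need = (size_t) cols[i].len + 1;   /* room for the terminator */
        if (bufs[i].cap < need)
        {
            /* realloc(NULL, n) behaves like malloc(n) */
            char *p = realloc(bufs[i].buf, need);

            if (p == NULL)
                return 0;   /* old buffer is still owned by bufs[i] */
            bufs[i].buf = p;
            bufs[i].cap = need;
        }

        memcpy(bufs[i].buf, cols[i].value, (size_t) cols[i].len);
        bufs[i].buf[cols[i].len] = '\0';
        out[i] = bufs[i].buf;
    }
    return 1;
}
```

Unlike the realloc() call in the patch, the sketch keeps the old pointer until the new one is known to be good, so a failed realloc() does not leak the buffer. The caller remains responsible for freeing each bufs[i].buf when the query is finished.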
On Mon, Jan 30, 2012 at 06:06:57PM +0900, Kyotaro HORIGUCHI wrote:
> The gain of performance is more than expected. Measure script now
> does query via dblink ten times for stability of measuring, so
> the figures become about ten times longer than the previous ones.
>
>                        sec    % to Original
>   Original           : 31.5   100.0%
>   RowProcessor patch : 31.3    99.4%
>   dblink patch       : 24.6    78.1%
>
> RowProcessor patch alone makes no loss or very-little gain, and
> full patch gives us 22% gain for the benchmark(*1).

Excellent!

> - The callers of RowProcessor() no more set null_field to
>   PGrowValue.value. Plus, the PGrowValue[] which RowProcessor()
>   receives has nfields + 1 elements to be able to make rough
>   estimate by cols->value[nfields].value - cols->value[0].value -
>   something. The something here is 4 * nfields for protocol3 and
>   4 * (non-null fields) for protocol2. I fear that this applies
>   only for textual transfer usage...

An exact estimate is not important here. And the (nfields + 1) element
feels like a bit too much magic, considering that most users probably
do not need it. Without it, the logic would be:

    total = last.value - first.value + ((last.len > 0) ? last.len : 0)

which isn't too complex. So I think we can remove it.

= Problems =

* Remove the dubious memcpy() in pqAddRow()

* I think the dynamic arrays in getAnotherTuple() are not portable
  enough, please do proper allocation for the array. I guess in
  PQsetResultAttrs()?

= Minor notes =

These can be argued either way; if you don't like some suggestion,
you can drop it.

* Move PQregisterRowProcessor() into fe-exec.c, then we can make
  pqAddRow static.

* Should PQclear() free the RowProcessor error msg? It seems it should
  not get outside of getAnotherTuple(), but that's not certain.
  Perhaps it would be clearer to free it here too.

* Remove this part of the comment in getAnotherTuple():

     * Buffer content may be shifted on reloading additional
     * data. So we must set all pointers on every scan.

  It's confusing why it needs to clarify that, as there is nobody
  expecting it.

* PGrowValue documentation should mention that the ->value pointer is
  always valid.

* dblink: Perhaps some of those mallocs() could be replaced with
  pallocs() or even StringInfo, which already does the realloc dance?
  I'm not familiar with dblink and the various struct lifetimes there,
  so I don't know if that actually makes sense or not.

It seems this patch is getting ReadyForCommitter soon...

-- 
marko
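To illustrate the size estimate suggested in the review, here is a minimal standalone sketch of the "total" formula without the extra (nfields + 1) element. RowValue and row_span_bytes are hypothetical names; RowValue only mirrors the PGrowValue layout from the patch. The sketch relies on the patch's stated guarantee that ->value always points just past the column's length word, even for NULL or empty values.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for the proposed PGrowValue (assumption: same two fields). */
typedef struct
{
    int   len;    /* length in bytes; negative means SQL NULL */
    char *value;  /* points just past this column's length word */
} RowValue;

/*
 * Rough upper bound on the bytes needed to store one row: the distance
 * from the start of the first value to the end of the last one. The
 * result still includes the length words that sit between the values,
 * so it overestimates the payload by a few bytes per column.
 */
static size_t
row_span_bytes(const RowValue *cols, int nfields)
{
    const RowValue *first;
    const RowValue *last;

    assert(nfields > 0);

    first = &cols[0];
    last = &cols[nfields - 1];

    return (size_t) (last->value - first->value)
        + (last->len > 0 ? (size_t) last->len : 0);
}
```

For a protocol 3 row laid out as [len]"42" [len=-1] [len]"abc", the span is 2 + 4 + 4 + 3 = 13 bytes, while the actual payload is 5 bytes. The overestimate is harmless when the value is only used to size a buffer.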