Re: Speed dblink using alternate libpq tuple storage

Поиск
Список
Период
Сортировка
От Kyotaro HORIGUCHI
Тема Re: Speed dblink using alternate libpq tuple storage
Дата
Msg-id 20120202.165137.155927529.horiguchi.kyotaro@oss.ntt.co.jp
обсуждение исходный текст
Ответ на Re: Speed dblink using alternate libpq tuple storage  (Kyotaro HORIGUCHI <horiguchi.kyotaro@oss.ntt.co.jp>)
Ответы Re: Speed dblink using alternate libpq tuple storage  (Marko Kreen <markokr@gmail.com>)
Список pgsql-hackers
Hello, this is a new version of dblink.c.

- Memory is now properly freed when realloc() returns NULL in storeHandler().

- Fixed a bug where free() in finishStoreInfo() could be passed a garbage
pointer when the malloc() for sinfo->valbuflen failed.

regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 36a8e3e..28c967c 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -63,11 +63,23 @@ typedef struct remoteConn    bool        newXactForCursor;        /* Opened a transaction for a
cursor*/} remoteConn;
 
+typedef struct storeInfo          /* state for streaming one remote result into a tuplestore */
+{
+    Tuplestorestate *tuplestore;  /* receives converted rows; installed as rsinfo->setResult */
+    int nattrs;                   /* column count of the expected result tupdesc */
+    MemoryContext oldcontext;     /* context current before initStoreInfo(); restored by finishStoreInfo() */
+    AttInMetadata *attinmeta;     /* input-function metadata for BuildTupleFromCStrings() */
+    char** valbuf;                /* malloc'd per-column buffers holding NUL-terminated values */
+    int *valbuflen;               /* allocated size of each valbuf[i]; -1 until allocated */
+    bool error_occurred;          /* set on column-count mismatch or tuple-build error */
+    bool nummismatch;             /* remote column count differed from nattrs */
+    ErrorData *edata;             /* error caught in storeHandler(), for the caller to re-throw */
+} storeInfo;
+/* * Internal declarations */static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
-static void materializeResult(FunctionCallInfo fcinfo, PGresult *res);static remoteConn *getConnectionByName(const
char*name);static HTAB *createConnHash(void);static void createNewConnection(const char *name, remoteConn *rconn);
 
@@ -90,6 +102,10 @@ static char *escape_param_str(const char *from);static void validate_pkattnums(Relation rel,
          int2vector *pkattnums_arg, int32 pknumatts_arg,                   int **pkattnums, int *pknumatts);
 
+static void initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo);
+static void finishStoreInfo(storeInfo *sinfo);
+static int storeHandler(PGresult *res, void *param, PGrowValue *columns);
+/* Global */static remoteConn *pconn = NULL;
@@ -503,6 +519,7 @@ dblink_fetch(PG_FUNCTION_ARGS)    char       *curname = NULL;    int            howmany = 0;
bool       fail = true;    /* default to backward compatible */
 
+    storeInfo   storeinfo;    DBLINK_INIT;
@@ -559,15 +576,36 @@ dblink_fetch(PG_FUNCTION_ARGS)    appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
/*
 
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result retuned by PQexec below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQregisterRowProcessor(conn, storeHandler, &storeinfo);
+
+    /*     * Try to execute the query.  Note that since libpq uses malloc, the     * PGresult will be long-lived even
thoughwe are still in a short-lived     * memory context.     */    res = PQexec(conn, buf.data);
 
+    finishStoreInfo(&storeinfo);
+    if (!res ||        (PQresultStatus(res) != PGRES_COMMAND_OK &&         PQresultStatus(res) != PGRES_TUPLES_OK))
{
+        /* finishStoreInfo saves the fields referred to below. */
+        if (storeinfo.nummismatch)
+        {
+            /* This is only for backward compatibility */
+            ereport(ERROR,
+                    (errcode(ERRCODE_DATATYPE_MISMATCH),
+                     errmsg("remote query result rowtype does not match "
+                            "the specified FROM clause rowtype")));
+        }
+        else if (storeinfo.edata)
+            ReThrowError(storeinfo.edata);
+        dblink_res_error(conname, res, "could not fetch from cursor", fail);        return (Datum) 0;    }
@@ -579,8 +617,8 @@ dblink_fetch(PG_FUNCTION_ARGS)                (errcode(ERRCODE_INVALID_CURSOR_NAME),
errmsg("cursor \"%s\" does not exist", curname)));    }
 
+    PQclear(res);
-    materializeResult(fcinfo, res);    return (Datum) 0;}
@@ -640,6 +678,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)    remoteConn *rconn = NULL;    bool
      fail = true;    /* default to backward compatible */    bool        freeconn = false;
 
+    storeInfo   storeinfo;    /* check to see if caller supports us returning a tuplestore */    if (rsinfo == NULL ||
!IsA(rsinfo,ReturnSetInfo))
 
@@ -715,164 +754,225 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)    rsinfo->setResult = NULL;
rsinfo->setDesc= NULL;
 
+
+    /*
+     * Result is stored into storeinfo.tuplestore instead of
+     * res->result retuned by PQexec/PQgetResult below
+     */
+    initStoreInfo(&storeinfo, fcinfo);
+    PQregisterRowProcessor(conn, storeHandler, &storeinfo);
+    /* synchronous query, or async result retrieval */    if (!is_async)        res = PQexec(conn, sql);    else
-    {        res = PQgetResult(conn);
-        /* NULL means we're all done with the async results */
-        if (!res)
-            return (Datum) 0;
-    }
-    /* if needed, close the connection to the database and cleanup */
-    if (freeconn)
-        PQfinish(conn);
+    finishStoreInfo(&storeinfo);
-    if (!res ||
-        (PQresultStatus(res) != PGRES_COMMAND_OK &&
-         PQresultStatus(res) != PGRES_TUPLES_OK))
+    /* NULL res from async get means we're all done with the results */
+    if (res || !is_async)    {
-        dblink_res_error(conname, res, "could not execute query", fail);
-        return (Datum) 0;
+        if (freeconn)
+            PQfinish(conn);
+
+        if (!res ||
+            (PQresultStatus(res) != PGRES_COMMAND_OK &&
+             PQresultStatus(res) != PGRES_TUPLES_OK))
+        {
+            /* finishStoreInfo saves the fields referred to below. */
+            if (storeinfo.nummismatch)
+            {
+                /* This is only for backward compatibility */
+                ereport(ERROR,
+                        (errcode(ERRCODE_DATATYPE_MISMATCH),
+                         errmsg("remote query result rowtype does not match "
+                                "the specified FROM clause rowtype")));
+            }
+            else if (storeinfo.edata)
+                ReThrowError(storeinfo.edata);
+
+            dblink_res_error(conname, res, "could not execute query", fail);
+            return (Datum) 0;
+        }    }
+    PQclear(res);
-    materializeResult(fcinfo, res);    return (Datum) 0;}
-/*
- * Materialize the PGresult to return them as the function result.
- * The res will be released in this function.
- */static void
-materializeResult(FunctionCallInfo fcinfo, PGresult *res)
+initStoreInfo(storeInfo *sinfo, FunctionCallInfo fcinfo){    ReturnSetInfo *rsinfo = (ReturnSetInfo *)
fcinfo->resultinfo;
+    TupleDesc    tupdesc;
+    int i;
+    /*
+     * Prepare *sinfo for streaming a remote result into a tuplestore, and
+     * install that tuplestore, together with a copy of the expected result
+     * tupdesc, as the function's materialized result in rsinfo.  Each call
+     * must be paired with finishStoreInfo(), which frees the value buffers
+     * and restores the memory context switched below.
+     *
+     * NOTE(review): the removed materializeResult() asserted
+     * rsinfo->returnMode == SFRM_Materialize and, for PGRES_COMMAND_OK
+     * results, returned PQcmdStatus() as a single "status" row; neither is
+     * done on this path -- confirm both changes are intended.
+     */
+    switch (get_call_result_type(fcinfo, NULL, &tupdesc))
+    {
+        case TYPEFUNC_COMPOSITE:
+            /* success */
+            break;
+        case TYPEFUNC_RECORD:
+            /* failed to determine actual type of RECORD */
+            ereport(ERROR,
+                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                     errmsg("function returning record called in context "
+                            "that cannot accept type record")));
+            break;
+        default:
+            /* result type isn't composite */
+            elog(ERROR, "return type must be a row type");
+            break;
+    }
+    /*
+     * Stay in per-query memory until finishStoreInfo() switches back, so the
+     * tuplestore, the tupdesc copy and attinmeta outlive this call.
+     * NOTE(review): anything else palloc'd before finishStoreInfo() runs --
+     * presumably including the tuple storeHandler() builds for every row --
+     * also accumulates in this context; consider a short-lived per-row
+     * context.
+     */
+    sinfo->oldcontext = MemoryContextSwitchTo(
+        rsinfo->econtext->ecxt_per_query_memory);
+
+    /* make sure we have a persistent copy of the tupdesc */
+    tupdesc = CreateTupleDescCopy(tupdesc);
+
+    sinfo->error_occurred = FALSE;
+    sinfo->nummismatch = FALSE;
+    sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+    sinfo->edata = NULL;
+    sinfo->nattrs = tupdesc->natts;
+    sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+    sinfo->valbuf = NULL;
+    sinfo->valbuflen = NULL;
+    /*
+     * NOTE(review): if nattrs can be 0, malloc(0) may legally return NULL
+     * and would then be reported as "out of memory" below.
+     */
+    /* Preallocate one value-buffer pointer and one buffer length per column. */
+    sinfo->valbuf = (char **)malloc(sinfo->nattrs * sizeof(char*));
+    if (sinfo->valbuf)
+        sinfo->valbuflen = (int *)malloc(sinfo->nattrs * sizeof(int));
+    if (sinfo->valbuflen == NULL)
+    {
+        if (sinfo->valbuf)
+            free(sinfo->valbuf);    /* left dangling; safe only because ereport(ERROR) below does not return */
-    Assert(rsinfo->returnMode == SFRM_Materialize);
+        ereport(ERROR,
+                (errcode(ERRCODE_OUT_OF_MEMORY),
+                 errmsg("out of memory")));
+    }
-    PG_TRY();
+    for (i = 0 ; i < sinfo->nattrs ; i++)    {
-        TupleDesc    tupdesc;
-        bool        is_sql_cmd = false;
-        int            ntuples;
-        int            nfields;
+        sinfo->valbuf[i] = NULL;
+        sinfo->valbuflen[i] = -1;
+    }
-        if (PQresultStatus(res) == PGRES_COMMAND_OK)
-        {
-            is_sql_cmd = true;
+    rsinfo->setResult = sinfo->tuplestore;    /* rows are appended later by storeHandler() */
+    rsinfo->setDesc = tupdesc;
+}
-            /*
-             * need a tuple descriptor representing one TEXT column to return
-             * the command status string as our result tuple
-             */
-            tupdesc = CreateTemplateTupleDesc(1, false);
-            TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
-                               TEXTOID, -1, 0);
-            ntuples = 1;
-            nfields = 1;
-        }
-        else
+static void
+finishStoreInfo(storeInfo *sinfo)
+{
+    int i;
+    /*
+     * Free the malloc'd per-column value buffers in *sinfo and switch back
+     * to the memory context that was current when initStoreInfo() ran.
+     *
+     * The buffer pointers are reset to NULL, so calling this twice does not
+     * double-free.  That happens when storeHandler() hits a column-count
+     * mismatch and the caller then calls this again after PQexec or
+     * PQgetResult.  The repeated switch merely re-selects sinfo->oldcontext.
+     * sinfo->tuplestore is left alone: it stays installed as
+     * rsinfo->setResult.
+     */
+    if (sinfo->valbuf)
+    {
+        for (i = 0 ; i < sinfo->nattrs ; i++)        {
-            Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
+            if (sinfo->valbuf[i])
+                free(sinfo->valbuf[i]);
+        }
+        free(sinfo->valbuf);
+        sinfo->valbuf = NULL;
+    }
-            is_sql_cmd = false;
+    if (sinfo->valbuflen)
+    {
+        free(sinfo->valbuflen);
+        sinfo->valbuflen = NULL;
+    }
+    MemoryContextSwitchTo(sinfo->oldcontext);    /* undo the switch made in initStoreInfo() */
+}
-            /* get a tuple descriptor for our result type */
-            switch (get_call_result_type(fcinfo, NULL, &tupdesc))
-            {
-                case TYPEFUNC_COMPOSITE:
-                    /* success */
-                    break;
-                case TYPEFUNC_RECORD:
-                    /* failed to determine actual type of RECORD */
-                    ereport(ERROR,
-                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                        errmsg("function returning record called in context "
-                               "that cannot accept type record")));
-                    break;
-                default:
-                    /* result type isn't composite */
-                    elog(ERROR, "return type must be a row type");
-                    break;
-            }
+static int
+storeHandler(PGresult *res, void *param, PGrowValue *columns)
+{    /*
+     * Row processor registered with PQregisterRowProcessor().  It copies
+     * one remote row's column values into NUL-terminated buffers, converts
+     * them with BuildTupleFromCStrings(), and appends the resulting tuple to
+     * sinfo->tuplestore.  param is the storeInfo prepared by
+     * initStoreInfo().
+     *
+     * Returns TRUE when the row was stored.  Returns FALSE when:
+     *   - an earlier call already failed;
+     *   - the column count differs from the expected tupdesc (sets
+     *     nummismatch);
+     *   - a value buffer cannot be allocated; or
+     *   - tuple construction raised an error (copied to edata).
+     * Tuple-construction errors are caught here and left for the caller to
+     * re-throw once PQexec or PQgetResult returns.
+     *
+     * NOTE(review): cstrs is a C99 variable-length array, which is
+     * undefined behavior if PQnfields() returns 0.  Once the mismatch
+     * check passes, fields == nattrs, so a char * array preallocated in
+     * initStoreInfo() would do instead.
+     */
+    storeInfo *sinfo = (storeInfo *)param;
+    HeapTuple  tuple;
+    int        fields = PQnfields(res);
+    int        i;
+    char      *cstrs[PQnfields(res)];
-            /* make sure we have a persistent copy of the tupdesc */
-            tupdesc = CreateTupleDescCopy(tupdesc);
-            ntuples = PQntuples(res);
-            nfields = PQnfields(res);
-        }
+    if (sinfo->error_occurred)
+        return FALSE;
-        /*
-         * check result and tuple descriptor have the same number of columns
-         */
-        if (nfields != tupdesc->natts)
-            ereport(ERROR,
-                    (errcode(ERRCODE_DATATYPE_MISMATCH),
-                     errmsg("remote query result rowtype does not match "
-                            "the specified FROM clause rowtype")));
+    if (sinfo->nattrs != fields)
+    {
+        sinfo->error_occurred = TRUE;
+        sinfo->nummismatch = TRUE;
+        finishStoreInfo(sinfo);
+
+        /* This error is reported by the caller (dblink_record_internal()
+         * or dblink_fetch()), so do not set an error message here.  The
+         * finishStoreInfo() call above already released the value buffers
+         * and restored the caller's memory context.
+         * NOTE(review): the removed code checked the column count once per
+         * result, even when it had no rows.  If this handler runs only for
+         * data rows, a mismatching empty result is no longer reported. */
+        return FALSE;
+    }
-        if (ntuples > 0)
+    /*
+     * Datatype input functions expect NUL-terminated strings.  The column
+     * values arrive as (value, len) pairs and are not assumed to be
+     * terminated, so copy each non-NULL value into a buffer and terminate
+     * it.  A negative len is passed on as a NULL value.
+     */
+    for(i = 0 ; i < fields ; i++)
+    {
+        int len = columns[i].len;
+        if (len < 0)
+            cstrs[i] = NULL;
+        else        {
-            AttInMetadata *attinmeta;
-            Tuplestorestate *tupstore;
-            MemoryContext oldcontext;
-            int            row;
-            char      **values;
-
-            attinmeta = TupleDescGetAttInMetadata(tupdesc);
-
-            oldcontext = MemoryContextSwitchTo(
-                                    rsinfo->econtext->ecxt_per_query_memory);
-            tupstore = tuplestore_begin_heap(true, false, work_mem);
-            rsinfo->setResult = tupstore;
-            rsinfo->setDesc = tupdesc;
-            MemoryContextSwitchTo(oldcontext);
-
-            values = (char **) palloc(nfields * sizeof(char *));
-
-            /* put all tuples into the tuplestore */
-            for (row = 0; row < ntuples; row++)
-            {
-                HeapTuple    tuple;
+            char *tmp = NULL;
-                if (!is_sql_cmd)
-                {
-                    int            i;
+            /*
+             * Use malloc() for the first allocation and realloc()
+             * afterwards, so this also works on platforms whose realloc()
+             * does not accept a NULL pointer.
+             * NOTE(review): sinfo->valbuflen[i] is recorded below but never
+             * consulted, so every non-NULL value costs a realloc().  The
+             * existing buffer could be reused when
+             * valbuflen[i] >= len + 1.
+             */
+            if (sinfo->valbuf[i] == NULL)
+                tmp = (char *)malloc(len + 1);
+            else
+                tmp = (char *)realloc(sinfo->valbuf[i], len + 1);
-                    for (i = 0; i < nfields; i++)
-                    {
-                        if (PQgetisnull(res, row, i))
-                            values[i] = NULL;
-                        else
-                            values[i] = PQgetvalue(res, row, i);
-                    }
-                }
-                else
-                {
-                    values[0] = PQcmdStatus(res);
-                }
+            /*
+             * If realloc() fails, the old block is still referenced by
+             * sinfo->valbuf[i] and is freed later by finishStoreInfo().
+             * NOTE(review): this path sets neither error_occurred nor
+             * edata, so the caller can only report a generic failure.
+             */
+            if (tmp == NULL)
+                return FALSE;
-                /* build the tuple and put it into the tuplestore. */
-                tuple = BuildTupleFromCStrings(attinmeta, values);
-                tuplestore_puttuple(tupstore, tuple);
-            }
+            sinfo->valbuf[i] = tmp;
+            sinfo->valbuflen[i] = len + 1;
-            /* clean up and return the tuplestore */
-            tuplestore_donestoring(tupstore);
+            cstrs[i] = sinfo->valbuf[i];
+            memcpy(cstrs[i], columns[i].value, len);
+            cstrs[i][len] = '\0';        }
+    }
-        PQclear(res);
+    PG_TRY();
+    {
+        tuple = BuildTupleFromCStrings(sinfo->attinmeta, cstrs);
+        tuplestore_puttuple(sinfo->tuplestore, tuple);    }    PG_CATCH();    {
-        /* be sure to release the libpq result */
-        PQclear(res);
-        PG_RE_THROW();
+        MemoryContext context;
+        /*
+         * Save a copy of the error for the caller to re-throw with
+         * ReThrowError().  The copy is allocated in sinfo->oldcontext, the
+         * context that was current when initStoreInfo() ran.  Then clear
+         * the error state instead of propagating it out of this libpq
+         * callback.
+         * NOTE(review): callers only look at edata when the result status
+         * is not OK.  Confirm that returning FALSE here always makes
+         * PQexec or PQgetResult report failure; otherwise the error is
+         * silently lost.
+         */
+        sinfo->error_occurred = TRUE;
+        context = MemoryContextSwitchTo(sinfo->oldcontext);
+        sinfo->edata = CopyErrorData();
+        MemoryContextSwitchTo(context);
+        FlushErrorState();
+        return FALSE;    }    PG_END_TRY();
+
+    return TRUE;}/*

В списке pgsql-hackers по дате отправления:

Предыдущее
От: Heikki Linnakangas
Дата:
Сообщение: Re: Scaling XLog insertion (was Re: Moving more work outside WALInsertLock)
Следующее
От: Fujii Masao
Дата:
Сообщение: Re: pg_stats_recovery view