contrib/dblink update

From Joe Conway
Subject contrib/dblink update
Date
Msg-id 3CBA5374.1010400@joeconway.com
Responses Re: contrib/dblink update  (Tom Lane <tgl@sss.pgh.pa.us>)
Re: contrib/dblink update  (Bruce Momjian <pgman@candle.pha.pa.us>)
Re: contrib/dblink update  (Bruce Momjian <pgman@candle.pha.pa.us>)
List pgsql-patches
Attached is an update to contrib/dblink. Please apply if there are no
objections.

Major changes:
   - removed cursor wrap around input sql to allow for remote
     execution of INSERT/UPDATE/DELETE
   - dblink now returns a resource id instead of a real pointer
   - added several utility functions
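
To illustrate the resource id change, here is a minimal sketch in plain C of the general idea: callers receive a small integer, and a private registry maps it back to the real pointer. This is not the code in the patch, which keeps its registry in a backend List and uses helpers named append_res_ptr, get_res_ptr and remove_res_ptr. The names res_register, res_lookup and res_release below are hypothetical and exist only for this example.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical registry node; stands in for the patch's list of results. */
typedef struct res_node
{
    int              id;    /* resource id handed to the caller */
    void            *res;   /* the real pointer, never exposed */
    struct res_node *next;
} res_node;

static res_node *res_list = NULL;   /* registry of live results */
static int       res_next_id = 0;   /* last id handed out */

/* Register a result pointer and return its new resource id (> 0). */
int
res_register(void *res)
{
    res_node *node;

    assert(res != NULL);
    node = malloc(sizeof(res_node));
    if (node == NULL)
        return 0;           /* 0 is never a valid id */
    node->id = ++res_next_id;
    node->res = res;
    node->next = res_list;
    res_list = node;
    return node->id;
}

/* Look up a resource id; returns NULL for unknown or released ids. */
void *
res_lookup(int id)
{
    res_node *node;

    for (node = res_list; node != NULL; node = node->next)
        if (node->id == id)
            return node->res;
    return NULL;
}

/* Forget a resource id; returns 1 if it was found, 0 otherwise. */
int
res_release(int id)
{
    res_node **link;

    for (link = &res_list; *link != NULL; link = &(*link)->next)
    {
        if ((*link)->id == id)
        {
            res_node *dead = *link;

            *link = dead->next;
            free(dead);
            return 1;
        }
    }
    return 0;
}
```

The point of the indirection is that a bad or stale id simply fails the lookup and can be reported as an error, whereas a bad pointer passed in from SQL would be dereferenced directly.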

I'm still hoping to add explicit cursor open/fetch/close support before
7.3 is released, but I need a bit more time on that.

On a somewhat unrelated topic, I never got any feedback on the
unknownin/out patch and the mb_substring patch. Is there anything else I
need to do to get those applied?

Thanks,

Joe
diff -cNr dblink.orig/README.dblink dblink/README.dblink
*** dblink.orig/README.dblink    Thu Dec 13 02:48:39 2001
--- dblink/README.dblink    Sun Apr 14 20:02:06 2002
***************
*** 3,9 ****
   *
   * Functions returning results from a remote database
   *
!  * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
   *
   * Permission to use, copy, modify, and distribute this software and its
   * documentation for any purpose, without fee, and without a written agreement
--- 3,10 ----
   *
   * Functions returning results from a remote database
   *
!  * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
!  * ALL RIGHTS RESERVED;
   *
   * Permission to use, copy, modify, and distribute this software and its
   * documentation for any purpose, without fee, and without a written agreement
***************
*** 25,36 ****
   */


! Version 0.3 (14 June, 2001):
!   Function to test returning data set from remote database
!   Tested under Linux (Red Hat 6.2 and 7.0) and PostgreSQL 7.1 and 7.2devel

  Release Notes:

    Version 0.3
      - fixed dblink invalid pointer causing corrupt elog message
      - fixed dblink_tok improper handling of null results
--- 26,44 ----
   */


! Version 0.4 (7 April, 2002):
!   Functions allowing remote database INSERT/UPDATE/DELETE/SELECT, and
!   various utility functions.
!   Tested under Linux (Red Hat 7.2) and PostgreSQL 7.2 and 7.3devel

  Release Notes:

+   Version 0.4
+     - removed cursor wrap around input sql to allow for remote
+       execution of INSERT/UPDATE/DELETE
+     - dblink now returns a resource id instead of a real pointer
+     - added several utility functions -- see below
+
    Version 0.3
      - fixed dblink invalid pointer causing corrupt elog message
      - fixed dblink_tok improper handling of null results
***************
*** 51,64 ****

    installs following functions into database template1:

!      dblink() - returns a pointer to results from remote query
!      dblink_tok() - extracts and returns individual field results

  Documentation
  ==================================================================
  Name

! dblink -- Returns a pointer to a data set from a remote database

  Synopsis

--- 59,94 ----

    installs following functions into database template1:

!      dblink(text,text) RETURNS setof int
!        - returns a resource id for results from remote query
!      dblink_tok(int,int) RETURNS text
!        - extracts and returns individual field results
!      dblink_strtok(text,text,int) RETURNS text
!        - extracts and returns individual token from delimited text
!      dblink_get_pkey(name) RETURNS setof text
!        - returns the field names of a relation's primary key fields
!      dblink_last_oid(int) RETURNS oid
!        - returns the last inserted oid
!      dblink_build_sql_insert(name,int2vector,int2,_text,_text) RETURNS text
!        - builds an insert statement using a local tuple, replacing the
!          selection key field values with alternate supplied values
!      dblink_build_sql_delete(name,int2vector,int2,_text) RETURNS text
!        - builds a delete statement using supplied values for selection
!          key field values
!      dblink_build_sql_update(name,int2vector,int2,_text,_text) RETURNS text
!        - builds an update statement using a local tuple, replacing the
!          selection key field values with alternate supplied values
!      dblink_current_query() RETURNS text
!        - returns the current query string
!      dblink_replace(text,text,text) RETURNS text
!        - replace all occurrences of substring-a in the input-string
!          with substring-b

  Documentation
  ==================================================================
  Name

! dblink -- Returns a resource id for a data set from a remote database

  Synopsis

***************
*** 78,84 ****

  Outputs

!   Returns setof int (pointer)

  Example usage

--- 108,114 ----

  Outputs

!   Returns setof int (res_id)

  Example usage

***************
*** 94,106 ****

  Synopsis

! dblink_tok(int pointer, int fnumber)

  Inputs

!   pointer

!     a pointer returned by a call to dblink()

    fnumber

--- 124,136 ----

  Synopsis

! dblink_tok(int res_id, int fnumber)

  Inputs

!   res_id

!     a resource id returned by a call to dblink()

    fnumber

***************
*** 131,136 ****
--- 161,415 ----
     select f1, f2 from myremotetable where f1 like 'bytea%';

  ==================================================================
+ Name
+
+ dblink_strtok -- Extracts and returns individual token from delimited text
+
+ Synopsis
+
+ dblink_strtok(text inputstring, text delimiter, int posn) RETURNS text
+
+ Inputs
+
+   inputstring
+
+     any string you want to parse a token out of;
+     e.g. 'f=1&g=3&h=4'
+
+   delimiter
+
+     a single character to use as the delimiter;
+     e.g. '&' or '='
+
+   posn
+
+     the position of the token of interest, 0 based;
+     e.g. 1
+
+ Outputs
+
+   Returns text
+
+ Example usage
+
+ test=# select dblink_strtok(dblink_strtok('f=1&g=3&h=4','&',1),'=',1);
+  dblink_strtok
+ ---------------
+  3
+ (1 row)
+
+ ==================================================================
+ Name
+
+ dblink_get_pkey -- returns the field names of a relation's primary
+                    key fields
+
+ Synopsis
+
+ dblink_get_pkey(name relname) RETURNS setof text
+
+ Inputs
+
+   relname
+
+     any relation name;
+     e.g. 'foobar'
+
+ Outputs
+
+   Returns setof text -- one row for each primary key field, in order of
+                         precedence
+
+ Example usage
+
+ test=# select dblink_get_pkey('foobar');
+  dblink_get_pkey
+ -----------------
+  f1
+  f2
+  f3
+  f4
+  f5
+ (5 rows)
+
+
+ ==================================================================
+ Name
+
+ dblink_last_oid -- Returns last inserted oid
+
+ Synopsis
+
+ dblink_last_oid(int res_id) RETURNS oid
+
+ Inputs
+
+   res_id
+
+     any resource id returned by dblink function;
+
+ Outputs
+
+   Returns oid of last inserted tuple
+
+ Example usage
+
+ test=# select dblink_last_oid(dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
+                ,'insert into mytable (f1, f2) values (1,2)'));
+
+  dblink_last_oid
+ ----------------
+  16553
+ (1 row)
+
+
+ ==================================================================
+ Name
+
+ dblink_build_sql_insert -- builds an insert statement using a local
+                            tuple, replacing the selection key field
+                            values with alternate supplied values
+ dblink_build_sql_delete -- builds a delete statement using supplied
+                            values for selection key field values
+ dblink_build_sql_update -- builds an update statement using a local
+                            tuple, replacing the selection key field
+                            values with alternate supplied values
+
+
+ Synopsis
+
+ dblink_build_sql_insert(name relname
+                          ,int2vector primary_key_attnums
+                          ,int2 num_primary_key_atts
+                          ,_text src_pk_att_vals_array
+                          ,_text tgt_pk_att_vals_array) RETURNS text
+ dblink_build_sql_delete(name relname
+                          ,int2vector primary_key_attnums
+                          ,int2 num_primary_key_atts
+                          ,_text tgt_pk_att_vals_array) RETURNS text
+ dblink_build_sql_update(name relname
+                          ,int2vector primary_key_attnums
+                          ,int2 num_primary_key_atts
+                          ,_text src_pk_att_vals_array
+                          ,_text tgt_pk_att_vals_array) RETURNS text
+
+ Inputs
+
+   relname
+
+     any relation name;
+     e.g. 'foobar'
+
+   primary_key_attnums
+
+     vector of primary key attnums (1 based, see pg_index.indkey);
+     e.g. '1 2'
+
+   num_primary_key_atts
+
+     number of primary key attnums in the vector; e.g. 2
+
+   src_pk_att_vals_array
+
+     array of primary key values, used to look up the local matching
+     tuple, the values of which are then used to construct the SQL
+     statement
+
+   tgt_pk_att_vals_array
+
+     array of primary key values, used to replace the local tuple
+     values in the SQL statement
+
+ Outputs
+
+   Returns text -- requested SQL statement
+
+ Example usage
+
+ test=# select dblink_build_sql_insert('foo','1 2',2,'{"1", "a"}','{"1", "b''a"}');
+              dblink_build_sql_insert
+ --------------------------------------------------
+  INSERT INTO foo(f1,f2,f3) VALUES('1','b''a','1')
+ (1 row)
+
+ test=# select dblink_build_sql_delete('MyFoo','1 2',2,'{"1", "b"}');
+            dblink_build_sql_delete
+ ---------------------------------------------
+  DELETE FROM "MyFoo" WHERE f1='1' AND f2='b'
+ (1 row)
+
+ test=# select dblink_build_sql_update('foo','1 2',2,'{"1", "a"}','{"1", "b"}');
+                    dblink_build_sql_update
+ -------------------------------------------------------------
+  UPDATE foo SET f1='1',f2='b',f3='1' WHERE f1='1' AND f2='b'
+ (1 row)
+
+
+ ==================================================================
+ Name
+
+ dblink_current_query -- returns the current query string
+
+ Synopsis
+
+ dblink_current_query () RETURNS text
+
+ Inputs
+
+   None
+
+ Outputs
+
+   Returns text -- a copy of the currently executing query
+
+ Example usage
+
+ test=# select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname = ''byteacat''') as f1) as t1;
+                                                                 dblink_current_query
+ -----------------------------------------------------------------------------------------------------------------------------------------------------
+  select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname = ''byteacat''') as f1) as t1;
+ (1 row)
+
+
+ ==================================================================
+ Name
+
+ dblink_replace -- replace all occurrences of substring-a in the
+                   input-string with substring-b
+
+ Synopsis
+
+ dblink_replace(text input-string, text substring-a, text substring-b) RETURNS text
+
+ Inputs
+
+   input-string
+
+     the starting string, before replacement of substring-a
+
+   substring-a
+
+     the substring to find and replace
+
+   substring-b
+
+     the substring to be substituted in place of substring-a
+
+ Outputs
+
+   Returns text -- a copy of the starting string, but with all occurrences of
+                   substring-a replaced with substring-b
+
+ Example usage
+
+ test=# select dblink_replace('12345678901234567890','56','hello');
+        dblink_replace
+ ----------------------------
+  1234hello78901234hello7890
+ (1 row)
+
+ ==================================================================
+

  -- Joe Conway

diff -cNr dblink.orig/dblink.c dblink/dblink.c
*** dblink.orig/dblink.c    Wed Oct 24 22:49:19 2001
--- dblink/dblink.c    Sun Apr 14 20:03:30 2002
***************
*** 3,9 ****
   *
   * Functions returning results from a remote database
   *
!  * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
   *
   * Permission to use, copy, modify, and distribute this software and its
   * documentation for any purpose, without fee, and without a written agreement
--- 3,10 ----
   *
   * Functions returning results from a remote database
   *
!  * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
!  * ALL RIGHTS RESERVED;
   *
   * Permission to use, copy, modify, and distribute this software and its
   * documentation for any purpose, without fee, and without a written agreement
***************
*** 26,48 ****

  #include "dblink.h"

  PG_FUNCTION_INFO_V1(dblink);
  Datum
  dblink(PG_FUNCTION_ARGS)
  {
!     PGconn       *conn = NULL;
!     PGresult   *res = NULL;
!     dblink_results *results;
!     char       *optstr;
!     char       *sqlstatement;
!     char       *curstr = "DECLARE mycursor CURSOR FOR ";
!     char       *execstatement;
!     char       *msg;
!     int            ntuples = 0;
!     ReturnSetInfo *rsi;
!
!     if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
!         elog(ERROR, "dblink: NULL arguments are not permitted");

      if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
          elog(ERROR, "dblink: function called in context that does not accept a set result");
--- 27,49 ----

  #include "dblink.h"

+ /* Global */
+ List    *res_id = NIL;
+ int        res_id_index = 0;
+
  PG_FUNCTION_INFO_V1(dblink);
  Datum
  dblink(PG_FUNCTION_ARGS)
  {
!     PGconn            *conn = NULL;
!     PGresult        *res = NULL;
!     dblink_results    *results;
!     char            *optstr;
!     char            *sqlstatement;
!     char            *execstatement;
!     char            *msg;
!     int                ntuples = 0;
!     ReturnSetInfo    *rsi;

      if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
          elog(ERROR, "dblink: function called in context that does not accept a set result");
***************
*** 61,81 ****
              elog(ERROR, "dblink: connection error: %s", msg);
          }

!         res = PQexec(conn, "BEGIN");
!         if (PQresultStatus(res) != PGRES_COMMAND_OK)
!         {
!             msg = pstrdup(PQerrorMessage(conn));
!             PQclear(res);
!             PQfinish(conn);
!             elog(ERROR, "dblink: begin error: %s", msg);
!         }
!         PQclear(res);
!
!         execstatement = (char *) palloc(strlen(curstr) + strlen(sqlstatement) + 1);
          if (execstatement != NULL)
          {
!             strcpy(execstatement, curstr);
!             strcat(execstatement, sqlstatement);
              strcat(execstatement, "\0");
          }
          else
--- 62,71 ----
              elog(ERROR, "dblink: connection error: %s", msg);
          }

!         execstatement = (char *) palloc(strlen(sqlstatement) + 1);
          if (execstatement != NULL)
          {
!             strcpy(execstatement, sqlstatement);
              strcat(execstatement, "\0");
          }
          else
***************
*** 94,163 ****
              /*
               * got results, start fetching them
               */
-             PQclear(res);
-
-             res = PQexec(conn, "FETCH ALL in mycursor");
-             if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
-             {
-                 msg = pstrdup(PQerrorMessage(conn));
-                 PQclear(res);
-                 PQfinish(conn);
-                 elog(ERROR, "dblink: sql error: %s", msg);
-             }
-
              ntuples = PQntuples(res);

!             if (ntuples > 0)
!             {
!
!                 results = init_dblink_results(fcinfo->flinfo->fn_mcxt);
!                 results->tup_num = 0;
!                 results->res = res;
!                 res = NULL;
!
!                 fcinfo->flinfo->fn_extra = (void *) results;
!
!                 results = NULL;
!                 results = fcinfo->flinfo->fn_extra;
!
!                 /* close the cursor */
!                 res = PQexec(conn, "CLOSE mycursor");
!                 PQclear(res);
!
!                 /* commit the transaction */
!                 res = PQexec(conn, "COMMIT");
!                 PQclear(res);
!
!                 /* close the connection to the database and cleanup */
!                 PQfinish(conn);
!
!                 rsi = (ReturnSetInfo *) fcinfo->resultinfo;
!                 rsi->isDone = ExprMultipleResult;
!
!                 PG_RETURN_POINTER(results);
!
!             }
!             else
!             {

!                 PQclear(res);

!                 /* close the cursor */
!                 res = PQexec(conn, "CLOSE mycursor");
!                 PQclear(res);

!                 /* commit the transaction */
!                 res = PQexec(conn, "COMMIT");
!                 PQclear(res);

!                 /* close the connection to the database and cleanup */
!                 PQfinish(conn);

!                 rsi = (ReturnSetInfo *) fcinfo->resultinfo;
!                 rsi->isDone = ExprEndResult;

!                 PG_RETURN_NULL();
!             }
          }
      }
      else
--- 84,119 ----
              /*
               * got results, start fetching them
               */
              ntuples = PQntuples(res);

!             /*
!              * increment resource index
!              */
!             res_id_index++;

!             results = init_dblink_results(fcinfo->flinfo->fn_mcxt);
!             results->tup_num = 0;
!             results->res_id_index = res_id_index;
!             results->res = res;

!             /*
!              * Append node to res_id to hold pointer to results.
!              * Needed by dblink_tok to access the data
!              */
!             append_res_ptr(results);

!             /*
!              * save pointer to results for the next function manager call
!              */
!             fcinfo->flinfo->fn_extra = (void *) results;

!             /* close the connection to the database and cleanup */
!             PQfinish(conn);

!             rsi = (ReturnSetInfo *) fcinfo->resultinfo;
!             rsi->isDone = ExprMultipleResult;

!             PG_RETURN_INT32(res_id_index);
          }
      }
      else
***************
*** 165,173 ****
          /*
           * check for more results
           */
-
          results = fcinfo->flinfo->fn_extra;
          results->tup_num++;
          ntuples = PQntuples(results->res);

          if (results->tup_num < ntuples)
--- 121,130 ----
          /*
           * check for more results
           */
          results = fcinfo->flinfo->fn_extra;
+
          results->tup_num++;
+         res_id_index = results->res_id_index;
          ntuples = PQntuples(results->res);

          if (results->tup_num < ntuples)
***************
*** 179,196 ****
              rsi = (ReturnSetInfo *) fcinfo->resultinfo;
              rsi->isDone = ExprMultipleResult;

!             PG_RETURN_POINTER(results);
!
          }
          else
          {
              /*
               * or if no more, clean things up
               */
-
              results = fcinfo->flinfo->fn_extra;

              PQclear(results->res);

              rsi = (ReturnSetInfo *) fcinfo->resultinfo;
              rsi->isDone = ExprEndResult;
--- 136,154 ----
              rsi = (ReturnSetInfo *) fcinfo->resultinfo;
              rsi->isDone = ExprMultipleResult;

!             PG_RETURN_INT32(res_id_index);
          }
          else
          {
              /*
               * or if no more, clean things up
               */
              results = fcinfo->flinfo->fn_extra;

+             remove_res_ptr(results);
              PQclear(results->res);
+             pfree(results);
+             fcinfo->flinfo->fn_extra = NULL;

              rsi = (ReturnSetInfo *) fcinfo->resultinfo;
              rsi->isDone = ExprEndResult;
***************
*** 214,249 ****
  dblink_tok(PG_FUNCTION_ARGS)
  {
      dblink_results *results;
!     int            fldnum;
!     text       *result_text;
!     char       *result;
!     int            nfields = 0;
!     int            text_len = 0;
!
!     if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
!         elog(ERROR, "dblink: NULL arguments are not permitted");

!     results = (dblink_results *) PG_GETARG_POINTER(0);
      if (results == NULL)
!         elog(ERROR, "dblink: function called with invalid result pointer");

      fldnum = PG_GETARG_INT32(1);
      if (fldnum < 0)
!         elog(ERROR, "dblink: field number < 0 not permitted");

      nfields = PQnfields(results->res);
      if (fldnum > (nfields - 1))
!         elog(ERROR, "dblink: field number %d does not exist", fldnum);

      if (PQgetisnull(results->res, results->tup_num, fldnum) == 1)
-     {
-
          PG_RETURN_NULL();
-
-     }
      else
      {
-
          text_len = PQgetlength(results->res, results->tup_num, fldnum);

          result = (char *) palloc(text_len + 1);
--- 172,208 ----
  dblink_tok(PG_FUNCTION_ARGS)
  {
      dblink_results *results;
!     int                fldnum;
!     text            *result_text;
!     char            *result;
!     int                nfields = 0;
!     int                text_len = 0;

!     results = get_res_ptr(PG_GETARG_INT32(0));
      if (results == NULL)
!     {
!         if (res_id != NIL)
!         {
!             freeList(res_id);
!             res_id = NIL;
!             res_id_index = 0;
!         }
!
!         elog(ERROR, "dblink_tok: function called with invalid resource id");
!     }

      fldnum = PG_GETARG_INT32(1);
      if (fldnum < 0)
!         elog(ERROR, "dblink_tok: field number < 0 not permitted");

      nfields = PQnfields(results->res);
      if (fldnum > (nfields - 1))
!         elog(ERROR, "dblink_tok: field number %d does not exist", fldnum);

      if (PQgetisnull(results->res, results->tup_num, fldnum) == 1)
          PG_RETURN_NULL();
      else
      {
          text_len = PQgetlength(results->res, results->tup_num, fldnum);

          result = (char *) palloc(text_len + 1);
***************
*** 259,270 ****
--- 218,838 ----
          result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result)));

          PG_RETURN_TEXT_P(result_text);
+     }
+ }
+
+
+ /*
+  * dblink_strtok
+  * parse input string
+  * return ord item (0 based)
+  * based on provided field separator
+  */
+ PG_FUNCTION_INFO_V1(dblink_strtok);
+ Datum
+ dblink_strtok(PG_FUNCTION_ARGS)
+ {
+     char        *fldtext;
+     char        *fldsep;
+     int            fldnum;
+     char        *buffer;
+     text        *result_text;
+
+     fldtext = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0))));
+     fldsep = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
+     fldnum = PG_GETARG_INT32(2);
+
+     if (fldtext[0] == '\0')
+     {
+         elog(ERROR, "dblink_strtok: blank list not permitted");
+     }
+     if (fldsep[0] == '\0')
+     {
+         elog(ERROR, "dblink_strtok: blank field separator not permitted");
+     }

+     buffer = get_strtok(fldtext, fldsep, fldnum);
+
+     pfree(fldtext);
+     pfree(fldsep);
+
+     if (buffer == NULL)
+     {
+         PG_RETURN_NULL();
+     }
+     else
+     {
+         result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(buffer)));
+         pfree(buffer);
+
+         PG_RETURN_TEXT_P(result_text);
      }
  }


  /*
+  * dblink_get_pkey
+  *
+  * Return the primary key field names for the
+  * supplied relation, one row per field,
+  * or no rows if none exists.
+  */
+ PG_FUNCTION_INFO_V1(dblink_get_pkey);
+ Datum
+ dblink_get_pkey(PG_FUNCTION_ARGS)
+ {
+     char                    *relname;
+     Oid                        relid;
+     char                    **result;
+     text                    *result_text;
+     int16                    numatts;
+     ReturnSetInfo            *rsi;
+     dblink_array_results    *ret_set;
+
+     if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
+         elog(ERROR, "dblink: function called in context that does not accept a set result");
+
+     if (fcinfo->flinfo->fn_extra == NULL)
+     {
+         relname = NameStr(*PG_GETARG_NAME(0));
+
+         /*
+          * Convert relname to rel OID.
+          */
+         relid = get_relid_from_relname(relname);
+         if (!OidIsValid(relid))
+             elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist",
+                  relname);
+
+         /*
+          * get an array of primary key attribute names.
+          */
+         result = get_pkey_attnames(relid, &numatts);
+
+         if ((result != NULL) && (numatts > 0))
+         {
+             ret_set = init_dblink_array_results(fcinfo->flinfo->fn_mcxt);
+
+             ret_set->elem_num = 0;
+             ret_set->num_elems = numatts;
+             ret_set->res = result;
+
+             fcinfo->flinfo->fn_extra = (void *) ret_set;
+
+             rsi = (ReturnSetInfo *) fcinfo->resultinfo;
+             rsi->isDone = ExprMultipleResult;
+
+             result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
+
+             PG_RETURN_TEXT_P(result_text);
+         }
+         else
+         {
+             rsi = (ReturnSetInfo *) fcinfo->resultinfo;
+             rsi->isDone = ExprEndResult;
+
+             PG_RETURN_NULL();
+         }
+     }
+     else
+     {
+         /*
+          * check for more results
+          */
+         ret_set = fcinfo->flinfo->fn_extra;
+         ret_set->elem_num++;
+         result = ret_set->res;
+
+         if (ret_set->elem_num < ret_set->num_elems)
+         {
+             /*
+              * fetch next one
+              */
+             rsi = (ReturnSetInfo *) fcinfo->resultinfo;
+             rsi->isDone = ExprMultipleResult;
+
+             result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
+             PG_RETURN_TEXT_P(result_text);
+         }
+         else
+         {
+             int        i;
+
+             /*
+              * or if no more, clean things up
+              */
+             for (i = 0; i < ret_set->num_elems; i++)
+                 pfree(result[i]);
+
+             pfree(ret_set->res);
+             pfree(ret_set);
+
+             rsi = (ReturnSetInfo *) fcinfo->resultinfo;
+             rsi->isDone = ExprEndResult;
+
+             PG_RETURN_NULL();
+         }
+     }
+     PG_RETURN_NULL();
+ }
+
+
+ /*
+  * dblink_last_oid
+  * return last inserted oid
+  */
+ PG_FUNCTION_INFO_V1(dblink_last_oid);
+ Datum
+ dblink_last_oid(PG_FUNCTION_ARGS)
+ {
+     dblink_results *results;
+
+     results = get_res_ptr(PG_GETARG_INT32(0));
+     if (results == NULL)
+     {
+         if (res_id != NIL)
+         {
+             freeList(res_id);
+             res_id = NIL;
+             res_id_index = 0;
+         }
+
+         elog(ERROR, "dblink_last_oid: function called with invalid resource id");
+     }
+
+     PG_RETURN_OID(PQoidValue(results->res));
+ }
+
+
+ /*
+  * dblink_build_sql_insert
+  *
+  * Used to generate an SQL insert statement
+  * based on an existing tuple in a local relation.
+  * This is useful for selectively replicating data
+  * to another server via dblink.
+  *
+  * API:
+  * <relname> - name of local table of interest
+  * <pkattnums> - an int2vector of attnums which will be used
+  * to identify the local tuple of interest
+  * <pknumatts> - number of attnums in pkattnums
+  * <src_pkattvals_arry> - text array of key values which will be used
+  * to identify the local tuple of interest
+  * <tgt_pkattvals_arry> - text array of key values which will be used
+  * to build the string for execution remotely. These are substituted
+  * for their counterparts in src_pkattvals_arry
+  */
+ PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
+ Datum
+ dblink_build_sql_insert(PG_FUNCTION_ARGS)
+ {
+     Oid            relid;
+     char        *relname;
+     int16        *pkattnums;
+     int16        pknumatts;
+     char        **src_pkattvals;
+     char        **tgt_pkattvals;
+     ArrayType    *src_pkattvals_arry;
+     ArrayType    *tgt_pkattvals_arry;
+     int            src_ndim;
+     int            *src_dim;
+     int            src_nitems;
+     int            tgt_ndim;
+     int            *tgt_dim;
+     int            tgt_nitems;
+     int            i;
+     char        *ptr;
+     char        *sql;
+     text        *sql_text;
+
+     relname = NameStr(*PG_GETARG_NAME(0));
+
+     /*
+      * Convert relname to rel OID.
+      */
+     relid = get_relid_from_relname(relname);
+     if (!OidIsValid(relid))
+         elog(ERROR, "dblink_build_sql_insert: relation \"%s\" does not exist",
+              relname);
+
+     pkattnums = (int16 *) PG_GETARG_POINTER(1);
+     pknumatts = PG_GETARG_INT16(2);
+     /*
+      * There should be at least one key attribute
+      */
+     if (pknumatts == 0)
+         elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0.");
+
+     src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
+     tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
+
+     /*
+      * Source array is made up of key values that will be used to
+      * locate the tuple of interest from the local system.
+      */
+     src_ndim = ARR_NDIM(src_pkattvals_arry);
+     src_dim = ARR_DIMS(src_pkattvals_arry);
+     src_nitems = ArrayGetNItems(src_ndim, src_dim);
+
+     /*
+      * There should be one source array key value for each key attnum
+      */
+     if (src_nitems != pknumatts)
+         elog(ERROR, "dblink_build_sql_insert: source key array length does not match number of key attributes.");
+
+     /*
+      * get array of pointers to c-strings from the input source array
+      */
+     src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
+     ptr = ARR_DATA_PTR(src_pkattvals_arry);
+     for (i = 0; i < src_nitems; i++)
+     {
+         src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
+         ptr += INTALIGN(*(int32 *) ptr);
+     }
+
+     /*
+      * Target array is made up of key values that will be used to
+      * build the SQL string for use on the remote system.
+      */
+     tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
+     tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
+     tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
+
+     /*
+      * There should be one target array key value for each key attnum
+      */
+     if (tgt_nitems != pknumatts)
+         elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes.");
+
+     /*
+      * get array of pointers to c-strings from the input target array
+      */
+     tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
+     ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
+     for (i = 0; i < tgt_nitems; i++)
+     {
+         tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
+         ptr += INTALIGN(*(int32 *) ptr);
+     }
+
+     /*
+      * Prep work is finally done. Go get the SQL string.
+      */
+     sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
+
+     /*
+      * Make it into TEXT for return to the client
+      */
+     sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
+
+     /*
+      * And send it
+      */
+     PG_RETURN_TEXT_P(sql_text);
+ }
+
+
+ /*
+  * dblink_build_sql_delete
+  *
+  * Used to generate an SQL delete statement.
+  * This is useful for selectively replicating a
+  * delete to another server via dblink.
+  *
+  * API:
+  * <relname> - name of remote table of interest
+  * <pkattnums> - an int2vector of attnums which will be used
+  * to identify the remote tuple of interest
+  * <pknumatts> - number of attnums in pkattnums
+  * <tgt_pkattvals_arry> - text array of key values which will be used
+  * to build the string for execution remotely.
+  */
+ PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
+ Datum
+ dblink_build_sql_delete(PG_FUNCTION_ARGS)
+ {
+     Oid            relid;
+     char        *relname;
+     int16        *pkattnums;
+     int16        pknumatts;
+     char        **tgt_pkattvals;
+     ArrayType    *tgt_pkattvals_arry;
+     int            tgt_ndim;
+     int            *tgt_dim;
+     int            tgt_nitems;
+     int            i;
+     char        *ptr;
+     char        *sql;
+     text        *sql_text;
+
+     relname = NameStr(*PG_GETARG_NAME(0));
+
+     /*
+      * Convert relname to rel OID.
+      */
+     relid = get_relid_from_relname(relname);
+     if (!OidIsValid(relid))
+         elog(ERROR, "dblink_build_sql_delete: relation \"%s\" does not exist",
+              relname);
+
+     pkattnums = (int16 *) PG_GETARG_POINTER(1);
+     pknumatts = PG_GETARG_INT16(2);
+     /*
+      * There should be at least one key attribute
+      */
+     if (pknumatts == 0)
+         elog(ERROR, "dblink_build_sql_delete: number of key attributes must be > 0.");
+
+     tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
+
+     /*
+      * Target array is made up of key values that will be used to
+      * build the SQL string for use on the remote system.
+      */
+     tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
+     tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
+     tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
+
+     /*
+      * There should be one target array key value for each key attnum
+      */
+     if (tgt_nitems != pknumatts)
+         elog(ERROR, "dblink_build_sql_delete: target key array length does not match number of key attributes.");
+
+     /*
+      * get array of pointers to c-strings from the input target array
+      */
+     tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
+     ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
+     for (i = 0; i < tgt_nitems; i++)
+     {
+         tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
+         ptr += INTALIGN(*(int32 *) ptr);
+     }
+
+     /*
+      * Prep work is finally done. Go get the SQL string.
+      */
+     sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals);
+
+     /*
+      * Make it into TEXT for return to the client
+      */
+     sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
+
+     /*
+      * And send it
+      */
+     PG_RETURN_TEXT_P(sql_text);
+ }
+
+
+ /*
+  * dblink_build_sql_update
+  *
+  * Used to generate an SQL update statement
+  * based on an existing tuple in a local relation.
+  * This is useful for selectively replicating data
+  * to another server via dblink.
+  *
+  * API:
+  * <relname> - name of local table of interest
+  * <pkattnums> - an int2vector of attnums which will be used
+  * to identify the local tuple of interest
+  * <pknumatts> - number of attnums in pkattnums
+  * <src_pkattvals_arry> - text array of key values which will be used
+  * to identify the local tuple of interest
+  * <tgt_pkattvals_arry> - text array of key values which will be used
+  * to build the string for execution remotely. These are substituted
+  * for their counterparts in src_pkattvals_arry
+  */
+ PG_FUNCTION_INFO_V1(dblink_build_sql_update);
+ Datum
+ dblink_build_sql_update(PG_FUNCTION_ARGS)
+ {
+     Oid            relid;
+     char        *relname;
+     int16        *pkattnums;
+     int16        pknumatts;
+     char        **src_pkattvals;
+     char        **tgt_pkattvals;
+     ArrayType    *src_pkattvals_arry;
+     ArrayType    *tgt_pkattvals_arry;
+     int            src_ndim;
+     int            *src_dim;
+     int            src_nitems;
+     int            tgt_ndim;
+     int            *tgt_dim;
+     int            tgt_nitems;
+     int            i;
+     char        *ptr;
+     char        *sql;
+     text        *sql_text;
+
+     relname = NameStr(*PG_GETARG_NAME(0));
+
+     /*
+      * Convert relname to rel OID.
+      */
+     relid = get_relid_from_relname(relname);
+     if (!OidIsValid(relid))
+         elog(ERROR, "dblink_build_sql_update: relation \"%s\" does not exist",
+              relname);
+
+     pkattnums = (int16 *) PG_GETARG_POINTER(1);
+     pknumatts = PG_GETARG_INT16(2);
+     /*
+      * There should be at least one key attribute
+      */
+     if (pknumatts == 0)
+         elog(ERROR, "dblink_build_sql_update: number of key attributes must be > 0.");
+
+     src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
+     tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
+
+     /*
+      * Source array is made up of key values that will be used to
+      * locate the tuple of interest from the local system.
+      */
+     src_ndim = ARR_NDIM(src_pkattvals_arry);
+     src_dim = ARR_DIMS(src_pkattvals_arry);
+     src_nitems = ArrayGetNItems(src_ndim, src_dim);
+
+     /*
+      * There should be one source array key value for each key attnum
+      */
+     if (src_nitems != pknumatts)
+         elog(ERROR, "dblink_build_sql_update: source key array length does not match number of key attributes.");
+
+     /*
+      * get array of pointers to c-strings from the input source array
+      */
+     src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
+     ptr = ARR_DATA_PTR(src_pkattvals_arry);
+     for (i = 0; i < src_nitems; i++)
+     {
+         src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
+         ptr += INTALIGN(*(int32 *) ptr);
+     }
+
+     /*
+      * Target array is made up of key values that will be used to
+      * build the SQL string for use on the remote system.
+      */
+     tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
+     tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
+     tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
+
+     /*
+      * There should be one target array key value for each key attnum
+      */
+     if (tgt_nitems != pknumatts)
+         elog(ERROR, "dblink_build_sql_update: target key array length does not match number of key attributes.");
+
+     /*
+      * get array of pointers to c-strings from the input target array
+      */
+     tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
+     ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
+     for (i = 0; i < tgt_nitems; i++)
+     {
+         tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
+         ptr += INTALIGN(*(int32 *) ptr);
+     }
+
+     /*
+      * Prep work is finally done. Go get the SQL string.
+      */
+     sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
+
+     /*
+      * Make it into TEXT for return to the client
+      */
+     sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
+
+     /*
+      * And send it
+      */
+     PG_RETURN_TEXT_P(sql_text);
+ }
+
+
+ /*
+  * dblink_current_query
+  * return the current query string
+  * to allow its use in (among other things)
+  * rewrite rules
+  */
+ PG_FUNCTION_INFO_V1(dblink_current_query);
+ Datum
+ dblink_current_query(PG_FUNCTION_ARGS)
+ {
+     text        *result_text;
+
+     result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(debug_query_string)));
+     PG_RETURN_TEXT_P(result_text);
+ }
+
+
+ /*
+  * dblink_replace_text
+  * replace all occurrences of 'old_sub_str' in 'orig_str'
+  * with 'new_sub_str' to form 'new_str'
+  *
+  * returns 'orig_str' if 'old_sub_str' == '' or 'orig_str' == ''
+  * otherwise returns 'new_str'
+  */
+ PG_FUNCTION_INFO_V1(dblink_replace_text);
+ Datum
+ dblink_replace_text(PG_FUNCTION_ARGS)
+ {
+     text        *left_text;
+     text        *right_text;
+     text        *buf_text;
+     text        *ret_text;
+     char        *ret_str;
+     int            curr_posn;
+     text        *src_text = PG_GETARG_TEXT_P(0);
+     int            src_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(src_text)));
+     text        *from_sub_text = PG_GETARG_TEXT_P(1);
+     int            from_sub_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(from_sub_text)));
+     text        *to_sub_text = PG_GETARG_TEXT_P(2);
+     char        *to_sub_str = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(to_sub_text)));
+     StringInfo    str = makeStringInfo();
+
+     if (src_text_len == 0 || from_sub_text_len == 0)
+         PG_RETURN_TEXT_P(src_text);
+
+     buf_text = DatumGetTextPCopy(PointerGetDatum(src_text));
+     curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text)));
+
+     while (curr_posn > 0)
+     {
+         left_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), 1, DatumGetInt32(DirectFunctionCall2(textpos,PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) - 1));
+         right_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), DatumGetInt32(DirectFunctionCall2(textpos,PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) + from_sub_text_len,-1));
+
+         appendStringInfo(str, "%s", DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(left_text))));
+         appendStringInfo(str, "%s", to_sub_str);
+
+         pfree(buf_text);
+         pfree(left_text);
+         buf_text = right_text;
+         curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text), PointerGetDatum(from_sub_text)));
+     }
+
+     appendStringInfo(str, "%s", DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(buf_text))));
+     pfree(buf_text);
+
+     ret_str = pstrdup(str->data);
+     ret_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(ret_str)));
+
+     PG_RETURN_TEXT_P(ret_text);
+ }
+
+
+ /*************************************************************
   * internal functions
   */

***************
*** 285,293 ****
--- 853,1408 ----
      MemSet(retval, 0, sizeof(dblink_results));

      retval->tup_num = -1;
+     retval->res_id_index = -1;
      retval->res = NULL;

      MemoryContextSwitchTo(oldcontext);

      return retval;
  }
+
+
+ /*
+  * init_dblink_array_results
+  *     - create an empty dblink_array_results data structure
+  */
+ dblink_array_results *
+ init_dblink_array_results(MemoryContext fn_mcxt)
+ {
+     MemoryContext oldcontext;
+     dblink_array_results *retval;
+
+     oldcontext = MemoryContextSwitchTo(fn_mcxt);
+
+     retval = (dblink_array_results *) palloc(sizeof(dblink_array_results));
+     MemSet(retval, 0, sizeof(dblink_array_results));
+
+     retval->elem_num = -1;
+     retval->num_elems = 0;
+     retval->res = NULL;
+
+     MemoryContextSwitchTo(oldcontext);
+
+     return retval;
+ }
+
+ /*
+  * get_pkey_attnames
+  *
+  * Get the primary key attnames for the given relation.
+  * Return NULL, and set numatts = 0, if no primary key exists.
+  */
+ char **
+ get_pkey_attnames(Oid relid, int16 *numatts)
+ {
+     Relation        indexRelation;
+     ScanKeyData        entry;
+     HeapScanDesc    scan;
+     HeapTuple        indexTuple;
+     int                i;
+     char            **result = NULL;
+     Relation        rel;
+     TupleDesc        tupdesc;
+
+     /*
+      * Open relation using relid, get tupdesc
+      */
+     rel = relation_open(relid, AccessShareLock);
+     tupdesc = rel->rd_att;
+
+     /*
+      * Initialize numatts to 0 in case no primary key
+      * exists
+      */
+     *numatts = 0;
+
+     /*
+      * Use relid to get all related indexes
+      */
+     indexRelation = heap_openr(IndexRelationName, AccessShareLock);
+     ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid,
+                            F_OIDEQ, ObjectIdGetDatum(relid));
+     scan = heap_beginscan(indexRelation, false, SnapshotNow,
+                           1, &entry);
+
+     while (HeapTupleIsValid(indexTuple = heap_getnext(scan, 0)))
+     {
+         Form_pg_index    index = (Form_pg_index) GETSTRUCT(indexTuple);
+
+         /*
+          * We're only interested if it is the primary key
+          */
+         if (index->indisprimary == TRUE)
+         {
+             i = 0;
+             while (index->indkey[i++] != 0)
+                 (*numatts)++;
+
+             if (*numatts > 0)
+             {
+                 result = (char **) palloc(*numatts * sizeof(char *));
+                 for (i = 0; i < *numatts; i++)
+                     result[i] = SPI_fname(tupdesc, index->indkey[i]);
+             }
+             break;
+         }
+     }
+     heap_endscan(scan);
+     heap_close(indexRelation, AccessShareLock);
+     relation_close(rel, AccessShareLock);
+
+     return result;
+ }
+
+
+ /*
+  * get_strtok
+  *
+  * parse input string
+  * return ord item (0 based)
+  * based on provided field separator
+  */
+ char *
+ get_strtok(char *fldtext, char *fldsep, int fldnum)
+ {
+     int            j = 0;
+     char        *result;
+
+     if (fldnum < 0)
+     {
+         elog(ERROR, "get_strtok: field number < 0 not permitted");
+     }
+
+     if (fldsep[0] == '\0')
+     {
+         elog(ERROR, "get_strtok: blank field separator not permitted");
+     }
+
+     result = strtok(fldtext, fldsep);
+     for (j = 1; j < fldnum + 1; j++)
+     {
+         result = strtok(NULL, fldsep);
+         if (result == NULL)
+             return NULL;
+     }
+
+     return pstrdup(result);
+ }
+
+ char *
+ get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
+ {
+     Relation        rel;
+     char            *relname;
+     HeapTuple        tuple;
+     TupleDesc        tupdesc;
+     int                natts;
+     StringInfo        str = makeStringInfo();
+     char            *sql = NULL;
+     char            *val = NULL;
+     int16            key;
+     unsigned int    i;
+
+     /*
+      * Open relation using relid
+      */
+     rel = relation_open(relid, AccessShareLock);
+     relname =  RelationGetRelationName(rel);
+     tupdesc = rel->rd_att;
+     natts = tupdesc->natts;
+
+     tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
+
+     appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname));
+     for (i = 0; i < natts; i++)
+     {
+         if (i > 0)
+             appendStringInfo(str, ",");
+
+         appendStringInfo(str, "%s", NameStr(tupdesc->attrs[i]->attname));
+     }
+
+     appendStringInfo(str, ") VALUES(");
+
+     /*
+      * remember attvals are 1 based
+      */
+     for (i = 0; i < natts; i++)
+     {
+         if (i > 0)
+             appendStringInfo(str, ",");
+
+         if (tgt_pkattvals != NULL)
+             key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
+         else
+             key = -1;
+
+         if (key > -1)
+             val = pstrdup(tgt_pkattvals[key]);
+         else
+             val = SPI_getvalue(tuple, tupdesc, i + 1);
+
+         if (val != NULL)
+         {
+             appendStringInfo(str, "%s", quote_literal_cstr(val));
+             pfree(val);
+         }
+         else
+             appendStringInfo(str, "NULL");
+     }
+     appendStringInfo(str, ")");
+
+     sql = pstrdup(str->data);
+     pfree(str->data);
+     pfree(str);
+     relation_close(rel, AccessShareLock);
+
+     return (sql);
+ }
+
+ char *
+ get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals)
+ {
+     Relation        rel;
+     char            *relname;
+     TupleDesc        tupdesc;
+     int                natts;
+     StringInfo        str = makeStringInfo();
+     char            *sql = NULL;
+     char            *val = NULL;
+     unsigned int    i;
+
+     /*
+      * Open relation using relid
+      */
+     rel = relation_open(relid, AccessShareLock);
+     relname =  RelationGetRelationName(rel);
+     tupdesc = rel->rd_att;
+     natts = tupdesc->natts;
+
+     appendStringInfo(str, "DELETE FROM %s WHERE ", quote_ident_cstr(relname));
+     for (i = 0; i < pknumatts; i++)
+     {
+         int16    pkattnum = pkattnums[i];
+
+         if (i > 0)
+             appendStringInfo(str, " AND ");
+
+         appendStringInfo(str, "%s", NameStr(tupdesc->attrs[pkattnum - 1]->attname));
+
+         if (tgt_pkattvals != NULL)
+             val = pstrdup(tgt_pkattvals[i]);
+         else
+             elog(ERROR, "Target key array must not be NULL");
+
+         if (val != NULL)
+         {
+             appendStringInfo(str, "=");
+             appendStringInfo(str, "%s", quote_literal_cstr(val));
+             pfree(val);
+         }
+         else
+             appendStringInfo(str, " IS NULL");
+     }
+
+     sql = pstrdup(str->data);
+     pfree(str->data);
+     pfree(str);
+     relation_close(rel, AccessShareLock);
+
+     return (sql);
+ }
+
+ char *
+ get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
+ {
+     Relation        rel;
+     char            *relname;
+     HeapTuple        tuple;
+     TupleDesc        tupdesc;
+     int                natts;
+     StringInfo        str = makeStringInfo();
+     char            *sql = NULL;
+     char            *val = NULL;
+     int16            key;
+     int                i;
+
+     /*
+      * Open relation using relid
+      */
+     rel = relation_open(relid, AccessShareLock);
+     relname =  RelationGetRelationName(rel);
+     tupdesc = rel->rd_att;
+     natts = tupdesc->natts;
+
+     tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
+
+     appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname));
+
+     for (i = 0; i < natts; i++)
+     {
+         if (i > 0)
+             appendStringInfo(str, ",");
+
+         appendStringInfo(str, "%s", NameStr(tupdesc->attrs[i]->attname));
+         appendStringInfo(str, "=");
+
+         if (tgt_pkattvals != NULL)
+             key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
+         else
+             key = -1;
+
+         if (key > -1)
+             val = pstrdup(tgt_pkattvals[key]);
+         else
+             val = SPI_getvalue(tuple, tupdesc, i + 1);
+
+         if (val != NULL)
+         {
+             appendStringInfo(str, "%s", quote_literal_cstr(val));
+             pfree(val);
+         }
+         else
+             appendStringInfo(str, "NULL");
+     }
+
+     appendStringInfo(str, " WHERE ");
+
+     for (i = 0; i < pknumatts; i++)
+     {
+         int16    pkattnum = pkattnums[i];
+
+         if (i > 0)
+             appendStringInfo(str, " AND ");
+
+         appendStringInfo(str, "%s", NameStr(tupdesc->attrs[pkattnum - 1]->attname));
+
+         if (tgt_pkattvals != NULL)
+             val = pstrdup(tgt_pkattvals[i]);
+         else
+             val = SPI_getvalue(tuple, tupdesc, pkattnum);
+
+         if (val != NULL)
+         {
+             appendStringInfo(str, "=");
+             appendStringInfo(str, "%s", quote_literal_cstr(val));
+             pfree(val);
+         }
+         else
+             appendStringInfo(str, " IS NULL");
+     }
+
+     sql = pstrdup(str->data);
+     pfree(str->data);
+     pfree(str);
+     relation_close(rel, AccessShareLock);
+
+     return (sql);
+ }
+
+ /*
+  * Return a properly quoted literal value.
+  * Uses quote_literal in quote.c
+  */
+ static char *
+ quote_literal_cstr(char *rawstr)
+ {
+     text        *rawstr_text;
+     text        *result_text;
+     char        *result;
+
+     rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
+     result_text = DatumGetTextP(DirectFunctionCall1(quote_literal, PointerGetDatum(rawstr_text)));
+     result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
+
+     return result;
+ }
+
+ /*
+  * Return a properly quoted identifier.
+  * Uses quote_ident in quote.c
+  */
+ static char *
+ quote_ident_cstr(char *rawstr)
+ {
+     text        *rawstr_text;
+     text        *result_text;
+     char        *result;
+
+     rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
+     result_text = DatumGetTextP(DirectFunctionCall1(quote_ident, PointerGetDatum(rawstr_text)));
+     result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
+
+     return result;
+ }
+
+ int16
+ get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key)
+ {
+     int        i;
+
+     /*
+      * Not likely a long list anyway, so just scan for
+      * the value
+      */
+     for (i = 0; i < pknumatts; i++)
+         if (key == pkattnums[i])
+             return i;
+
+     return -1;
+ }
+
+ HeapTuple
+ get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals)
+ {
+     Relation        rel;
+     char            *relname;
+     TupleDesc        tupdesc;
+     StringInfo        str = makeStringInfo();
+     char            *sql = NULL;
+     int                ret;
+     HeapTuple        tuple;
+     int                i;
+     char            *val = NULL;
+
+     /*
+      * Open relation using relid
+      */
+     rel = relation_open(relid, AccessShareLock);
+     relname =  RelationGetRelationName(rel);
+     tupdesc = rel->rd_att;
+
+     /*
+      * Connect to SPI manager
+      */
+     if ((ret = SPI_connect()) < 0)
+         elog(ERROR, "get_tuple_of_interest: SPI_connect returned %d", ret);
+
+     /*
+      * Build sql statement to look up tuple of interest
+      * Use src_pkattvals as the criteria.
+      */
+     appendStringInfo(str, "SELECT * from %s WHERE ", quote_ident_cstr(relname));
+
+     for (i = 0; i < pknumatts; i++)
+     {
+         int16    pkattnum = pkattnums[i];
+
+         if (i > 0)
+             appendStringInfo(str, " AND ");
+
+         appendStringInfo(str, "%s", NameStr(tupdesc->attrs[pkattnum - 1]->attname));
+
+         val = pstrdup(src_pkattvals[i]);
+         if (val != NULL)
+         {
+             appendStringInfo(str, "=");
+             appendStringInfo(str, "%s", quote_literal_cstr(val));
+             pfree(val);
+         }
+         else
+             appendStringInfo(str, " IS NULL");
+     }
+
+     sql = pstrdup(str->data);
+     pfree(str->data);
+     pfree(str);
+     /*
+      * Retrieve the desired tuple
+      */
+     ret = SPI_exec(sql, 0);
+     pfree(sql);
+
+     /*
+      * Only allow one qualifying tuple
+      */
+     if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
+     {
+         elog(ERROR, "get_tuple_of_interest: Source criteria may not match more than one record.");
+     }
+     else if (ret == SPI_OK_SELECT && SPI_processed == 1)
+     {
+         SPITupleTable *tuptable = SPI_tuptable;
+         tuple = SPI_copytuple(tuptable->vals[0]);
+
+         return tuple;
+     }
+     else
+     {
+         /*
+          * no qualifying tuples
+          */
+         return NULL;
+     }
+
+     /*
+      * never reached, but keep compiler quiet
+      */
+     return NULL;
+ }
+
+ Oid
+ get_relid_from_relname(char *relname)
+ {
+ #ifdef NamespaceRelationName
+     Oid                relid;
+
+     relid = RelnameGetRelid(relname);
+ #else
+     Relation        rel;
+     Oid                relid;
+
+     rel = relation_openr(relname, AccessShareLock);
+     relid = RelationGetRelid(rel);
+     relation_close(rel, AccessShareLock);
+ #endif   /* NamespaceRelationName */
+
+     return relid;
+ }
+
+ dblink_results    *
+ get_res_ptr(int32 res_id_index)
+ {
+     List    *ptr;
+
+     /*
+      * short circuit empty list
+      */
+     if(res_id == NIL)
+         return NULL;
+
+     /*
+      * OK, should be good to go
+      */
+     foreach(ptr, res_id)
+     {
+         dblink_results    *this_res_id = (dblink_results *) lfirst(ptr);
+         if (this_res_id->res_id_index == res_id_index)
+             return this_res_id;
+     }
+     return NULL;
+ }
+
+ /*
+  * Add node to global List res_id
+  */
+ void
+ append_res_ptr(dblink_results *results)
+ {
+     res_id = lappend(res_id, results);
+ }
+
+ /*
+  * Remove node from global List
+  * using res_id_index
+  */
+ void
+ remove_res_ptr(dblink_results *results)
+ {
+     res_id = lremove(results, res_id);
+
+     if (res_id == NIL)
+         res_id_index = 0;
+ }
+
+
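Illustrative note, not part of the patch: get_sql_delete above builds its statement by joining "column=literal" terms with " AND " and quoting each key value. A minimal standalone sketch of that shape follows, assuming plain libc and a fixed-size output buffer instead of StringInfo. The names build_delete_sql, append_str and append_literal are hypothetical, and the quoting here only doubles single quotes; the backend's quote_literal remains the authority on what else needs escaping.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Append s to the NUL-terminated string in out; -1 if it would not fit. */
static int
append_str(char *out, size_t outsize, const char *s)
{
    size_t      used = strlen(out);
    size_t      add = strlen(s);

    if (used + add + 1 > outsize)
        return -1;
    memcpy(out + used, s, add + 1);
    return 0;
}

/* Append val as a single-quoted literal, doubling embedded single quotes. */
static int
append_literal(char *out, size_t outsize, const char *val)
{
    size_t      used = strlen(out);

    /* worst case: every byte doubled, plus two quotes and the NUL */
    if (used + 2 * strlen(val) + 3 > outsize)
        return -1;

    out[used++] = '\'';
    for (; *val != '\0'; val++)
    {
        if (*val == '\'')
            out[used++] = '\'';
        out[used++] = *val;
    }
    out[used++] = '\'';
    out[used] = '\0';
    return 0;
}

/*
 * Build "DELETE FROM rel WHERE c1='v1' AND c2 IS NULL ..." into out.
 * A NULL entry in keyvals produces an IS NULL test.
 * Returns 0 on success, -1 if the buffer is too small.
 */
int
build_delete_sql(char *out, size_t outsize, const char *relname,
                 const char **colnames, const char **keyvals, int nkeys)
{
    int         i;

    assert(out != NULL && outsize > 0 && nkeys > 0);
    out[0] = '\0';

    if (append_str(out, outsize, "DELETE FROM ") < 0 ||
        append_str(out, outsize, relname) < 0 ||
        append_str(out, outsize, " WHERE ") < 0)
        return -1;

    for (i = 0; i < nkeys; i++)
    {
        if (i > 0 && append_str(out, outsize, " AND ") < 0)
            return -1;
        if (append_str(out, outsize, colnames[i]) < 0)
            return -1;

        if (keyvals[i] == NULL)
        {
            if (append_str(out, outsize, " IS NULL") < 0)
                return -1;
        }
        else if (append_str(out, outsize, "=") < 0 ||
                 append_literal(out, outsize, keyvals[i]) < 0)
            return -1;
    }
    return 0;
}
```

Called with relname "foo", columns {"f1", "f2"} and key values {"1", "O'Brien"}, the sketch is meant to produce a statement whose second term carries the doubled quote. Data is always copied byte for byte rather than passed as a format string, so a '%' in a key value is harmless.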
diff -cNr dblink.orig/dblink.h dblink/dblink.h
*** dblink.orig/dblink.h    Mon Nov  5 09:46:22 2001
--- dblink/dblink.h    Sun Apr 14 18:54:39 2002
***************
*** 3,9 ****
   *
   * Functions returning results from a remote database
   *
!  * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
   *
   * Permission to use, copy, modify, and distribute this software and its
   * documentation for any purpose, without fee, and without a written agreement
--- 3,10 ----
   *
   * Functions returning results from a remote database
   *
!  * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
!  * ALL RIGHTS RESERVED;
   *
   * Permission to use, copy, modify, and distribute this software and its
   * documentation for any purpose, without fee, and without a written agreement
***************
*** 33,42 ****
--- 34,64 ----
  #include "libpq-int.h"
  #include "fmgr.h"
  #include "access/tupdesc.h"
+ #include "access/heapam.h"
+ #include "catalog/catname.h"
+ #include "catalog/pg_index.h"
+ #include "catalog/pg_type.h"
  #include "executor/executor.h"
+ #include "executor/spi.h"
+ #include "lib/stringinfo.h"
  #include "nodes/nodes.h"
  #include "nodes/execnodes.h"
+ #include "nodes/pg_list.h"
+ #include "parser/parse_type.h"
+ #include "tcop/tcopprot.h"
  #include "utils/builtins.h"
+ #include "utils/fmgroids.h"
+ #include "utils/array.h"
+ #include "utils/syscache.h"
+
+ #ifdef NamespaceRelationName
+ #include "catalog/namespace.h"
+ #endif   /* NamespaceRelationName */
+
+ /*
+  * Max SQL statement size
+  */
+ #define DBLINK_MAX_SQLSTATE_SIZE        16384

  /*
   * This struct holds the results of the remote query.
***************
*** 50,70 ****
      int            tup_num;

      /*
       * the actual query results
       */
      PGresult   *res;
-
  }    dblink_results;

  /*
   * External declarations
   */
  extern Datum dblink(PG_FUNCTION_ARGS);
  extern Datum dblink_tok(PG_FUNCTION_ARGS);

  /*
   * Internal declarations
   */
  dblink_results *init_dblink_results(MemoryContext fn_mcxt);

  #endif   /* DBLINK_H */
--- 72,145 ----
      int            tup_num;

      /*
+      * resource index number for this context
+      */
+     int            res_id_index;
+
+     /*
       * the actual query results
       */
      PGresult   *res;
  }    dblink_results;

+
+ /*
+  * This struct holds results in the form of an array.
+  * Use fn_extra to hold a pointer to it across calls
+  */
+ typedef struct
+ {
+     /*
+      * elem being accessed
+      */
+     int            elem_num;
+
+     /*
+      * number of elems
+      */
+     int            num_elems;
+
+     /*
+      * the actual array
+      */
+     void        *res;
+
+ }    dblink_array_results;
+
  /*
   * External declarations
   */
  extern Datum dblink(PG_FUNCTION_ARGS);
  extern Datum dblink_tok(PG_FUNCTION_ARGS);
+ extern Datum dblink_strtok(PG_FUNCTION_ARGS);
+ extern Datum dblink_get_pkey(PG_FUNCTION_ARGS);
+ extern Datum dblink_last_oid(PG_FUNCTION_ARGS);
+ extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS);
+ extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS);
+ extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);
+ extern Datum dblink_current_query(PG_FUNCTION_ARGS);
+ extern Datum dblink_replace_text(PG_FUNCTION_ARGS);

  /*
   * Internal declarations
   */
  dblink_results *init_dblink_results(MemoryContext fn_mcxt);
+ dblink_array_results *init_dblink_array_results(MemoryContext fn_mcxt);
+ char **get_pkey_attnames(Oid relid, int16 *numatts);
+ char *get_strtok(char *fldtext, char *fldsep, int fldnum);
+ char *getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber);
+ char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
+ char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals);
+ char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
+ static char *quote_literal_cstr(char *rawstr);
+ static char *quote_ident_cstr(char *rawstr);
+ int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key);
+ HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals);
+ Oid get_relid_from_relname(char *relname);
+ dblink_results    *get_res_ptr(int32 res_id_index);
+ void append_res_ptr(dblink_results *results);
+ void remove_res_ptr(dblink_results *results);
+
+ extern char    *debug_query_string;

  #endif   /* DBLINK_H */
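Illustrative note, not part of the patch: the res_id_index field and the get_res_ptr / append_res_ptr / remove_res_ptr declarations above are what let dblink hand callers a small integer instead of a real pointer. A minimal standalone sketch of such a registry follows, assuming a hand-rolled singly linked list in place of the backend List type. The names res_node, register_res, lookup_res and release_res are hypothetical, and the way new ids are numbered here is an assumption, since that part of dblink.c is not shown in this section.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical stand-in for dblink_results: only what the lookup needs. */
typedef struct res_node
{
    int              res_id_index;  /* integer id handed to the caller */
    void            *res;           /* opaque payload (a PGresult * in the patch) */
    struct res_node *next;
} res_node;

static res_node *res_list = NULL;   /* plays the role of the global List res_id */
static int       next_index = 0;    /* plays the role of the global res_id_index */

/* Store res and return its id, or -1 on allocation failure. */
int
register_res(void *res)
{
    res_node   *node;

    assert(res != NULL);
    node = malloc(sizeof(res_node));
    if (node == NULL)
        return -1;

    node->res_id_index = ++next_index;  /* numbering scheme is an assumption */
    node->res = res;
    node->next = res_list;
    res_list = node;
    return node->res_id_index;
}

/* Linear scan, as in get_res_ptr; returns NULL for an unknown id. */
void *
lookup_res(int id)
{
    res_node   *node;

    for (node = res_list; node != NULL; node = node->next)
        if (node->res_id_index == id)
            return node->res;
    return NULL;
}

/* Unlink the entry for id; returns 1 if it existed, 0 otherwise. */
int
release_res(int id)
{
    res_node  **link = &res_list;

    while (*link != NULL)
    {
        if ((*link)->res_id_index == id)
        {
            res_node   *dead = *link;

            *link = dead->next;
            free(dead);
            /* like remove_res_ptr: restart numbering once the list is empty */
            if (res_list == NULL)
                next_index = 0;
            return 1;
        }
        link = &(*link)->next;
    }
    return 0;
}
```

The point of the indirection is that a stale or forged id simply fails the lookup and yields NULL, whereas a stale or forged pointer would be dereferenced.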
diff -cNr dblink.orig/dblink.sql.in dblink/dblink.sql.in
*** dblink.orig/dblink.sql.in    Thu Jun 14 09:49:03 2001
--- dblink/dblink.sql.in    Fri Apr 12 14:36:49 2002
***************
*** 1,5 ****
! CREATE FUNCTION dblink (text,text) RETURNS setof int
!   AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c';

! CREATE FUNCTION dblink_tok (int,int) RETURNS text
!   AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c';
--- 1,38 ----
! CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof int
!   AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c'
!   WITH (isstrict);

! CREATE OR REPLACE FUNCTION dblink_tok (int,int) RETURNS text
!   AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c'
!   WITH (isstrict);
!
! CREATE OR REPLACE FUNCTION dblink_strtok (text,text,int) RETURNS text
!   AS 'MODULE_PATHNAME','dblink_strtok' LANGUAGE 'c'
!   WITH (iscachable, isstrict);
!
! CREATE OR REPLACE FUNCTION dblink_get_pkey (name) RETURNS setof text
!   AS 'MODULE_PATHNAME','dblink_get_pkey' LANGUAGE 'c'
!   WITH (isstrict);
!
! CREATE OR REPLACE FUNCTION dblink_last_oid (int) RETURNS oid
!   AS 'MODULE_PATHNAME','dblink_last_oid' LANGUAGE 'c'
!   WITH (isstrict);
!
! CREATE OR REPLACE FUNCTION dblink_build_sql_insert (name, int2vector, int2, _text, _text) RETURNS text
!   AS 'MODULE_PATHNAME','dblink_build_sql_insert' LANGUAGE 'c'
!   WITH (isstrict);
!
! CREATE OR REPLACE FUNCTION dblink_build_sql_delete (name, int2vector, int2, _text) RETURNS text
!   AS 'MODULE_PATHNAME','dblink_build_sql_delete' LANGUAGE 'c'
!   WITH (isstrict);
!
! CREATE OR REPLACE FUNCTION dblink_build_sql_update (name, int2vector, int2, _text, _text) RETURNS text
!   AS 'MODULE_PATHNAME','dblink_build_sql_update' LANGUAGE 'c'
!   WITH (isstrict);
!
! CREATE OR REPLACE FUNCTION dblink_current_query () RETURNS text
!   AS 'MODULE_PATHNAME','dblink_current_query' LANGUAGE 'c';
!
! CREATE OR REPLACE FUNCTION dblink_replace (text,text,text) RETURNS text
!   AS 'MODULE_PATHNAME','dblink_replace_text' LANGUAGE 'c'
!   WITH (iscachable, isstrict);
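Illustrative note, not part of the patch: dblink_replace, declared above, replaces every occurrence of one substring with another and returns the input unchanged when either the input or the search string is empty. A minimal standalone sketch of the same behavior follows, assuming plain libc and byte-wise matching via strstr. The patch itself goes through textpos and text_substr, so any multibyte handling it gets from those is not reproduced here. The name replace_all is hypothetical.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * Return a newly malloc'd copy of src in which every non-overlapping
 * occurrence of from is replaced by to. If src or from is empty, the
 * result is an unchanged copy of src. Returns NULL on allocation
 * failure. The caller frees the result.
 */
char *
replace_all(const char *src, const char *from, const char *to)
{
    size_t      from_len;
    size_t      to_len;
    size_t      count = 0;
    const char *p;
    char       *result;
    char       *out;

    assert(src != NULL && from != NULL && to != NULL);

    from_len = strlen(from);
    to_len = strlen(to);

    /* nothing to search in, or nothing to search for: plain copy */
    if (src[0] == '\0' || from_len == 0)
    {
        result = malloc(strlen(src) + 1);
        if (result != NULL)
            strcpy(result, src);
        return result;
    }

    /* first pass: count matches so the output can be sized exactly */
    for (p = src; (p = strstr(p, from)) != NULL; p += from_len)
        count++;

    result = malloc(strlen(src) - count * from_len + count * to_len + 1);
    if (result == NULL)
        return NULL;

    /* second pass: copy the text left of each match, then the replacement */
    out = result;
    while ((p = strstr(src, from)) != NULL)
    {
        size_t      left_len = (size_t) (p - src);

        memcpy(out, src, left_len);
        out += left_len;
        memcpy(out, to, to_len);
        out += to_len;
        src = p + from_len;     /* continue with the text right of the match */
    }
    strcpy(out, src);           /* whatever remains after the last match */

    return result;
}
```

As in the patch's loop, the search resumes after the replaced span, so replacement text is never rescanned and matching "aa" in "aaaa" consumes two pairs.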
