contrib/dblink update
| От | Joe Conway |
|---|---|
| Тема | contrib/dblink update |
| Дата | |
| Msg-id | 3CBA5374.1010400@joeconway.com обсуждение исходный текст |
| Ответы |
Re: contrib/dblink update
Re: contrib/dblink update Re: contrib/dblink update |
| Список | pgsql-patches |
Attached is an update to contrib/dblink. Please apply if there are no
objections.
Major changes:
- removed cursor wrap around input sql to allow for remote
execution of INSERT/UPDATE/DELETE
- dblink now returns a resource id instead of a real pointer
- added several utility functions
I'm still hoping to add explicit cursor open/fetch/close support before
7.3 is released, but I need a bit more time on that.
On a somewhat unrelated topic, I never got any feedback on the
unknownin/out patch and the mb_substring patch. Is there anything else I
need to do to get those applied?
Thanks,
Joe
diff -cNr dblink.orig/README.dblink dblink/README.dblink
*** dblink.orig/README.dblink Thu Dec 13 02:48:39 2001
--- dblink/README.dblink Sun Apr 14 20:02:06 2002
***************
*** 3,9 ****
*
* Functions returning results from a remote database
*
! * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written agreement
--- 3,10 ----
*
* Functions returning results from a remote database
*
! * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
! * ALL RIGHTS RESERVED;
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written agreement
***************
*** 25,36 ****
*/
! Version 0.3 (14 June, 2001):
! Function to test returning data set from remote database
! Tested under Linux (Red Hat 6.2 and 7.0) and PostgreSQL 7.1 and 7.2devel
Release Notes:
Version 0.3
- fixed dblink invalid pointer causing corrupt elog message
- fixed dblink_tok improper handling of null results
--- 26,44 ----
*/
! Version 0.4 (7 April, 2002):
! Functions allowing remote database INSERT/UPDATE/DELETE/SELECT, and
! various utility functions.
! Tested under Linux (Red Hat 7.2) and PostgreSQL 7.2 and 7.3devel
Release Notes:
+ Version 0.4
+ - removed cursor wrap around input sql to allow for remote
+ execution of INSERT/UPDATE/DELETE
+ - dblink now returns a resource id instead of a real pointer
+ - added several utility functions -- see below
+
Version 0.3
- fixed dblink invalid pointer causing corrupt elog message
- fixed dblink_tok improper handling of null results
***************
*** 51,64 ****
installs following functions into database template1:
! dblink() - returns a pointer to results from remote query
! dblink_tok() - extracts and returns individual field results
Documentation
==================================================================
Name
! dblink -- Returns a pointer to a data set from a remote database
Synopsis
--- 59,94 ----
installs following functions into database template1:
! dblink(text,text) RETURNS setof int
! - returns a resource id for results from remote query
! dblink_tok(int,int) RETURNS text
! - extracts and returns individual field results
! dblink_strtok(text,text,int) RETURNS text
! - extracts and returns individual token from delimited text
! dblink_get_pkey(name) RETURNS setof text
! - returns the field names of a relation's primary key fields
! dblink_last_oid(int) RETURNS oid
! - returns the last inserted oid
! dblink_build_sql_insert(name,int2vector,int2,_text,_text) RETURNS text
! - builds an insert statement using a local tuple, replacing the
! selection key field values with alternate supplied values
! dblink_build_sql_delete(name,int2vector,int2,_text) RETURNS text
! - builds a delete statement using supplied values for selection
! key field values
! dblink_build_sql_update(name,int2vector,int2,_text,_text) RETURNS text
! - builds an update statement using a local tuple, replacing the
! selection key field values with alternate supplied values
! dblink_current_query() RETURNS text
! - returns the current query string
! dblink_replace(text,text,text) RETURNS text
! - replace all occurences of substring-a in the input-string
! with substring-b
Documentation
==================================================================
Name
! dblink -- Returns a resource id for a data set from a remote database
Synopsis
***************
*** 78,84 ****
Outputs
! Returns setof int (pointer)
Example usage
--- 108,114 ----
Outputs
! Returns setof int (res_id)
Example usage
***************
*** 94,106 ****
Synopsis
! dblink_tok(int pointer, int fnumber)
Inputs
! pointer
! a pointer returned by a call to dblink()
fnumber
--- 124,136 ----
Synopsis
! dblink_tok(int res_id, int fnumber)
Inputs
! res_id
! a resource id returned by a call to dblink()
fnumber
***************
*** 131,136 ****
--- 161,415 ----
select f1, f2 from myremotetable where f1 like 'bytea%';
==================================================================
+ Name
+
+ dblink_strtok -- Extracts and returns individual token from delimited text
+
+ Synopsis
+
+ dblink_strtok(text inputstring, text delimiter, int posn) RETURNS text
+
+ Inputs
+
+ inputstring
+
+ any string you want to parse a token out of;
+ e.g. 'f=1&g=3&h=4'
+
+ delimiter
+
+ a single character to use as the delimiter;
+ e.g. '&' or '='
+
+ posn
+
+ the position of the token of interest, 0 based;
+ e.g. 1
+
+ Outputs
+
+ Returns text
+
+ Example usage
+
+ test=# select dblink_strtok(dblink_strtok('f=1&g=3&h=4','&',1),'=',1);
+ dblink_strtok
+ ---------------
+ 3
+ (1 row)
+
+ ==================================================================
+ Name
+
+ dblink_get_pkey -- returns the field names of a relation's primary
+ key fields
+
+ Synopsis
+
+ dblink_get_pkey(name relname) RETURNS setof text
+
+ Inputs
+
+ relname
+
+ any relation name;
+ e.g. 'foobar'
+
+ Outputs
+
+ Returns setof text -- one row for each primary key field, in order of
+ precedence
+
+ Example usage
+
+ test=# select dblink_get_pkey('foobar');
+ dblink_get_pkey
+ -----------------
+ f1
+ f2
+ f3
+ f4
+ f5
+ (5 rows)
+
+
+ ==================================================================
+ Name
+
+ dblink_last_oid -- Returns last inserted oid
+
+ Synopsis
+
+ dblink_last_oid(int res_id) RETURNS oid
+
+ Inputs
+
+ res_id
+
+ any resource id returned by dblink function;
+
+ Outputs
+
+ Returns oid of last inserted tuple
+
+ Example usage
+
+ test=# select dblink_last_oid(dblink('hostaddr=127.0.0.1 port=5432 dbname=mydb user=postgres password=mypasswd'
+ ,'insert into mytable (f1, f2) values (1,2)'));
+
+ dblink_last_oid
+ ----------------
+ 16553
+ (1 row)
+
+
+ ==================================================================
+ Name
+
+ dblink_build_sql_insert -- builds an insert statement using a local
+ tuple, replacing the selection key field
+ values with alternate supplied values
+ dblink_build_sql_delete -- builds a delete statement using supplied
+ values for selection key field values
+ dblink_build_sql_update -- builds an update statement using a local
+ tuple, replacing the selection key field
+ values with alternate supplied values
+
+
+ Synopsis
+
+ dblink_build_sql_insert(name relname
+ ,int2vector primary_key_attnums
+ ,int2 num_primary_key_atts
+ ,_text src_pk_att_vals_array
+ ,_text tgt_pk_att_vals_array) RETURNS text
+ dblink_build_sql_delete(name relname
+ ,int2vector primary_key_attnums
+ ,int2 num_primary_key_atts
+ ,_text tgt_pk_att_vals_array) RETURNS text
+ dblink_build_sql_update(name relname
+ ,int2vector primary_key_attnums
+ ,int2 num_primary_key_atts
+ ,_text src_pk_att_vals_array
+ ,_text tgt_pk_att_vals_array) RETURNS text
+
+ Inputs
+
+ relname
+
+ any relation name;
+ e.g. 'foobar'
+
+ primary_key_attnums
+
+ vector of primary key attnums (1 based, see pg_index.indkey);
+ e.g. '1 2'
+
+ num_primary_key_atts
+
+ number of primary key attnums in the vector; e.g. 2
+
+ src_pk_att_vals_array
+
+ array of primary key values, used to look up the local matching
+ tuple, the values of which are then used to construct the SQL
+ statement
+
+ tgt_pk_att_vals_array
+
+ array of primary key values, used to replace the local tuple
+ values in the SQL statement
+
+ Outputs
+
+ Returns text -- requested SQL statement
+
+ Example usage
+
+ test=# select dblink_build_sql_insert('foo','1 2',2,'{"1", "a"}','{"1", "b''a"}');
+ dblink_build_sql_insert
+ --------------------------------------------------
+ INSERT INTO foo(f1,f2,f3) VALUES('1','b''a','1')
+ (1 row)
+
+ test=# select dblink_build_sql_delete('MyFoo','1 2',2,'{"1", "b"}');
+ dblink_build_sql_delete
+ ---------------------------------------------
+ DELETE FROM "MyFoo" WHERE f1='1' AND f2='b'
+ (1 row)
+
+ test=# select dblink_build_sql_update('foo','1 2',2,'{"1", "a"}','{"1", "b"}');
+ dblink_build_sql_update
+ -------------------------------------------------------------
+ UPDATE foo SET f1='1',f2='b',f3='1' WHERE f1='1' AND f2='b'
+ (1 row)
+
+
+ ==================================================================
+ Name
+
+ dblink_current_query -- returns the current query string
+
+ Synopsis
+
+ dblink_current_query () RETURNS text
+
+ Inputs
+
+ None
+
+ Outputs
+
+ Returns text -- a copy of the currently executing query
+
+ Example usage
+
+ test=# select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where
proname= ''byteacat''') as f1) as t1;
+ dblink_current_query
+
-----------------------------------------------------------------------------------------------------------------------------------------------------
+ select dblink_current_query() from (select dblink('dbname=template1','select oid, proname from pg_proc where proname
=''byteacat''') as f1) as t1;
+ (1 row)
+
+
+ ==================================================================
+ Name
+
+ dblink_replace -- replace all occurences of substring-a in the
+ input-string with substring-b
+
+ Synopsis
+
+ dblink_replace(text input-string, text substring-a, text substring-b) RETURNS text
+
+ Inputs
+
+ input-string
+
+ the starting string, before replacement of substring-a
+
+ substring-a
+
+ the substring to find and replace
+
+ substring-b
+
+ the substring to be substituted in place of substring-a
+
+ Outputs
+
+ Returns text -- a copy of the starting string, but with all occurences of
+ substring-a replaced with substring-b
+
+ Example usage
+
+ test=# select dblink_replace('12345678901234567890','56','hello');
+ dblink_replace
+ ----------------------------
+ 1234hello78901234hello7890
+ (1 row)
+
+ ==================================================================
+
-- Joe Conway
diff -cNr dblink.orig/dblink.c dblink/dblink.c
*** dblink.orig/dblink.c Wed Oct 24 22:49:19 2001
--- dblink/dblink.c Sun Apr 14 20:03:30 2002
***************
*** 3,9 ****
*
* Functions returning results from a remote database
*
! * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written agreement
--- 3,10 ----
*
* Functions returning results from a remote database
*
! * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
! * ALL RIGHTS RESERVED;
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written agreement
***************
*** 26,48 ****
#include "dblink.h"
PG_FUNCTION_INFO_V1(dblink);
Datum
dblink(PG_FUNCTION_ARGS)
{
! PGconn *conn = NULL;
! PGresult *res = NULL;
! dblink_results *results;
! char *optstr;
! char *sqlstatement;
! char *curstr = "DECLARE mycursor CURSOR FOR ";
! char *execstatement;
! char *msg;
! int ntuples = 0;
! ReturnSetInfo *rsi;
!
! if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
! elog(ERROR, "dblink: NULL arguments are not permitted");
if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
elog(ERROR, "dblink: function called in context that does not accept a set result");
--- 27,49 ----
#include "dblink.h"
+ /* Global */
+ List *res_id = NIL;
+ int res_id_index = 0;
+
PG_FUNCTION_INFO_V1(dblink);
Datum
dblink(PG_FUNCTION_ARGS)
{
! PGconn *conn = NULL;
! PGresult *res = NULL;
! dblink_results *results;
! char *optstr;
! char *sqlstatement;
! char *execstatement;
! char *msg;
! int ntuples = 0;
! ReturnSetInfo *rsi;
if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
elog(ERROR, "dblink: function called in context that does not accept a set result");
***************
*** 61,81 ****
elog(ERROR, "dblink: connection error: %s", msg);
}
! res = PQexec(conn, "BEGIN");
! if (PQresultStatus(res) != PGRES_COMMAND_OK)
! {
! msg = pstrdup(PQerrorMessage(conn));
! PQclear(res);
! PQfinish(conn);
! elog(ERROR, "dblink: begin error: %s", msg);
! }
! PQclear(res);
!
! execstatement = (char *) palloc(strlen(curstr) + strlen(sqlstatement) + 1);
if (execstatement != NULL)
{
! strcpy(execstatement, curstr);
! strcat(execstatement, sqlstatement);
strcat(execstatement, "\0");
}
else
--- 62,71 ----
elog(ERROR, "dblink: connection error: %s", msg);
}
! execstatement = (char *) palloc(strlen(sqlstatement) + 1);
if (execstatement != NULL)
{
! strcpy(execstatement, sqlstatement);
strcat(execstatement, "\0");
}
else
***************
*** 94,163 ****
/*
* got results, start fetching them
*/
- PQclear(res);
-
- res = PQexec(conn, "FETCH ALL in mycursor");
- if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
- {
- msg = pstrdup(PQerrorMessage(conn));
- PQclear(res);
- PQfinish(conn);
- elog(ERROR, "dblink: sql error: %s", msg);
- }
-
ntuples = PQntuples(res);
! if (ntuples > 0)
! {
!
! results = init_dblink_results(fcinfo->flinfo->fn_mcxt);
! results->tup_num = 0;
! results->res = res;
! res = NULL;
!
! fcinfo->flinfo->fn_extra = (void *) results;
!
! results = NULL;
! results = fcinfo->flinfo->fn_extra;
!
! /* close the cursor */
! res = PQexec(conn, "CLOSE mycursor");
! PQclear(res);
!
! /* commit the transaction */
! res = PQexec(conn, "COMMIT");
! PQclear(res);
!
! /* close the connection to the database and cleanup */
! PQfinish(conn);
!
! rsi = (ReturnSetInfo *) fcinfo->resultinfo;
! rsi->isDone = ExprMultipleResult;
!
! PG_RETURN_POINTER(results);
!
! }
! else
! {
! PQclear(res);
! /* close the cursor */
! res = PQexec(conn, "CLOSE mycursor");
! PQclear(res);
! /* commit the transaction */
! res = PQexec(conn, "COMMIT");
! PQclear(res);
! /* close the connection to the database and cleanup */
! PQfinish(conn);
! rsi = (ReturnSetInfo *) fcinfo->resultinfo;
! rsi->isDone = ExprEndResult;
! PG_RETURN_NULL();
! }
}
}
else
--- 84,119 ----
/*
* got results, start fetching them
*/
ntuples = PQntuples(res);
! /*
! * increment resource index
! */
! res_id_index++;
! results = init_dblink_results(fcinfo->flinfo->fn_mcxt);
! results->tup_num = 0;
! results->res_id_index = res_id_index;
! results->res = res;
! /*
! * Append node to res_id to hold pointer to results.
! * Needed by dblink_tok to access the data
! */
! append_res_ptr(results);
! /*
! * save pointer to results for the next function manager call
! */
! fcinfo->flinfo->fn_extra = (void *) results;
! /* close the connection to the database and cleanup */
! PQfinish(conn);
! rsi = (ReturnSetInfo *) fcinfo->resultinfo;
! rsi->isDone = ExprMultipleResult;
! PG_RETURN_INT32(res_id_index);
}
}
else
***************
*** 165,173 ****
/*
* check for more results
*/
-
results = fcinfo->flinfo->fn_extra;
results->tup_num++;
ntuples = PQntuples(results->res);
if (results->tup_num < ntuples)
--- 121,130 ----
/*
* check for more results
*/
results = fcinfo->flinfo->fn_extra;
+
results->tup_num++;
+ res_id_index = results->res_id_index;
ntuples = PQntuples(results->res);
if (results->tup_num < ntuples)
***************
*** 179,196 ****
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprMultipleResult;
! PG_RETURN_POINTER(results);
!
}
else
{
/*
* or if no more, clean things up
*/
-
results = fcinfo->flinfo->fn_extra;
PQclear(results->res);
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprEndResult;
--- 136,154 ----
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprMultipleResult;
! PG_RETURN_INT32(res_id_index);
}
else
{
/*
* or if no more, clean things up
*/
results = fcinfo->flinfo->fn_extra;
+ remove_res_ptr(results);
PQclear(results->res);
+ pfree(results);
+ fcinfo->flinfo->fn_extra = NULL;
rsi = (ReturnSetInfo *) fcinfo->resultinfo;
rsi->isDone = ExprEndResult;
***************
*** 214,249 ****
dblink_tok(PG_FUNCTION_ARGS)
{
dblink_results *results;
! int fldnum;
! text *result_text;
! char *result;
! int nfields = 0;
! int text_len = 0;
!
! if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
! elog(ERROR, "dblink: NULL arguments are not permitted");
! results = (dblink_results *) PG_GETARG_POINTER(0);
if (results == NULL)
! elog(ERROR, "dblink: function called with invalid result pointer");
fldnum = PG_GETARG_INT32(1);
if (fldnum < 0)
! elog(ERROR, "dblink: field number < 0 not permitted");
nfields = PQnfields(results->res);
if (fldnum > (nfields - 1))
! elog(ERROR, "dblink: field number %d does not exist", fldnum);
if (PQgetisnull(results->res, results->tup_num, fldnum) == 1)
- {
-
PG_RETURN_NULL();
-
- }
else
{
-
text_len = PQgetlength(results->res, results->tup_num, fldnum);
result = (char *) palloc(text_len + 1);
--- 172,208 ----
dblink_tok(PG_FUNCTION_ARGS)
{
dblink_results *results;
! int fldnum;
! text *result_text;
! char *result;
! int nfields = 0;
! int text_len = 0;
! results = get_res_ptr(PG_GETARG_INT32(0));
if (results == NULL)
! {
! if (res_id != NIL)
! {
! freeList(res_id);
! res_id = NIL;
! res_id_index = 0;
! }
!
! elog(ERROR, "dblink_tok: function called with invalid resource id");
! }
fldnum = PG_GETARG_INT32(1);
if (fldnum < 0)
! elog(ERROR, "dblink_tok: field number < 0 not permitted");
nfields = PQnfields(results->res);
if (fldnum > (nfields - 1))
! elog(ERROR, "dblink_tok: field number %d does not exist", fldnum);
if (PQgetisnull(results->res, results->tup_num, fldnum) == 1)
PG_RETURN_NULL();
else
{
text_len = PQgetlength(results->res, results->tup_num, fldnum);
result = (char *) palloc(text_len + 1);
***************
*** 259,270 ****
--- 218,838 ----
result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result)));
PG_RETURN_TEXT_P(result_text);
+ }
+ }
+
+
+ /*
+ * dblink_strtok
+ * parse input string
+ * return ord item (0 based)
+ * based on provided field separator
+ */
+ PG_FUNCTION_INFO_V1(dblink_strtok);
+ Datum
+ dblink_strtok(PG_FUNCTION_ARGS)
+ {
+ char *fldtext;
+ char *fldsep;
+ int fldnum;
+ char *buffer;
+ text *result_text;
+
+ fldtext = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(0))));
+ fldsep = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(PG_GETARG_TEXT_P(1))));
+ fldnum = PG_GETARG_INT32(2);
+
+ if (fldtext[0] == '\0')
+ {
+ elog(ERROR, "get_strtok: blank list not permitted");
+ }
+ if (fldsep[0] == '\0')
+ {
+ elog(ERROR, "get_strtok: blank field separator not permitted");
+ }
+ buffer = get_strtok(fldtext, fldsep, fldnum);
+
+ pfree(fldtext);
+ pfree(fldsep);
+
+ if (buffer == NULL)
+ {
+ PG_RETURN_NULL();
+ }
+ else
+ {
+ result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(buffer)));
+ pfree(buffer);
+
+ PG_RETURN_TEXT_P(result_text);
}
}
/*
+ * dblink_get_pkey
+ *
+ * Return comma delimited list of primary key
+ * fields for the supplied relation,
+ * or NULL if none exists.
+ */
+ PG_FUNCTION_INFO_V1(dblink_get_pkey);
+ Datum
+ dblink_get_pkey(PG_FUNCTION_ARGS)
+ {
+ char *relname;
+ Oid relid;
+ char **result;
+ text *result_text;
+ int16 numatts;
+ ReturnSetInfo *rsi;
+ dblink_array_results *ret_set;
+
+ if (fcinfo->resultinfo == NULL || !IsA(fcinfo->resultinfo, ReturnSetInfo))
+ elog(ERROR, "dblink: function called in context that does not accept a set result");
+
+ if (fcinfo->flinfo->fn_extra == NULL)
+ {
+ relname = NameStr(*PG_GETARG_NAME(0));
+
+ /*
+ * Convert relname to rel OID.
+ */
+ relid = get_relid_from_relname(relname);
+ if (!OidIsValid(relid))
+ elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist",
+ relname);
+
+ /*
+ * get an array of attnums.
+ */
+ result = get_pkey_attnames(relid, &numatts);
+
+ if ((result != NULL) && (numatts > 0))
+ {
+ ret_set = init_dblink_array_results(fcinfo->flinfo->fn_mcxt);
+
+ ret_set->elem_num = 0;
+ ret_set->num_elems = numatts;
+ ret_set->res = result;
+
+ fcinfo->flinfo->fn_extra = (void *) ret_set;
+
+ rsi = (ReturnSetInfo *) fcinfo->resultinfo;
+ rsi->isDone = ExprMultipleResult;
+
+ result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
+
+ PG_RETURN_TEXT_P(result_text);
+ }
+ else
+ {
+ rsi = (ReturnSetInfo *) fcinfo->resultinfo;
+ rsi->isDone = ExprEndResult;
+
+ PG_RETURN_NULL();
+ }
+ }
+ else
+ {
+ /*
+ * check for more results
+ */
+ ret_set = fcinfo->flinfo->fn_extra;
+ ret_set->elem_num++;
+ result = ret_set->res;
+
+ if (ret_set->elem_num < ret_set->num_elems)
+ {
+ /*
+ * fetch next one
+ */
+ rsi = (ReturnSetInfo *) fcinfo->resultinfo;
+ rsi->isDone = ExprMultipleResult;
+
+ result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(result[ret_set->elem_num])));
+ PG_RETURN_TEXT_P(result_text);
+ }
+ else
+ {
+ int i;
+
+ /*
+ * or if no more, clean things up
+ */
+ for (i = 0; i < ret_set->num_elems; i++)
+ pfree(result[i]);
+
+ pfree(ret_set->res);
+ pfree(ret_set);
+
+ rsi = (ReturnSetInfo *) fcinfo->resultinfo;
+ rsi->isDone = ExprEndResult;
+
+ PG_RETURN_NULL();
+ }
+ }
+ PG_RETURN_NULL();
+ }
+
+
+ /*
+ * dblink_last_oid
+ * return last inserted oid
+ */
+ PG_FUNCTION_INFO_V1(dblink_last_oid);
+ Datum
+ dblink_last_oid(PG_FUNCTION_ARGS)
+ {
+ dblink_results *results;
+
+ results = get_res_ptr(PG_GETARG_INT32(0));
+ if (results == NULL)
+ {
+ if (res_id != NIL)
+ {
+ freeList(res_id);
+ res_id = NIL;
+ res_id_index = 0;
+ }
+
+ elog(ERROR, "dblink_tok: function called with invalid resource id");
+ }
+
+ PG_RETURN_OID(PQoidValue(results->res));
+ }
+
+
+ /*
+ * dblink_build_sql_insert
+ *
+ * Used to generate an SQL insert statement
+ * based on an existing tuple in a local relation.
+ * This is useful for selectively replicating data
+ * to another server via dblink.
+ *
+ * API:
+ * <relname> - name of local table of interest
+ * <pkattnums> - an int2vector of attnums which will be used
+ * to identify the local tuple of interest
+ * <pknumatts> - number of attnums in pkattnums
+ * <src_pkattvals_arry> - text array of key values which will be used
+ * to identify the local tuple of interest
+ * <tgt_pkattvals_arry> - text array of key values which will be used
+ * to build the string for execution remotely. These are substituted
+ * for their counterparts in src_pkattvals_arry
+ */
+ PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
+ Datum
+ dblink_build_sql_insert(PG_FUNCTION_ARGS)
+ {
+ Oid relid;
+ char *relname;
+ int16 *pkattnums;
+ int16 pknumatts;
+ char **src_pkattvals;
+ char **tgt_pkattvals;
+ ArrayType *src_pkattvals_arry;
+ ArrayType *tgt_pkattvals_arry;
+ int src_ndim;
+ int *src_dim;
+ int src_nitems;
+ int tgt_ndim;
+ int *tgt_dim;
+ int tgt_nitems;
+ int i;
+ char *ptr;
+ char *sql;
+ text *sql_text;
+
+ relname = NameStr(*PG_GETARG_NAME(0));
+
+ /*
+ * Convert relname to rel OID.
+ */
+ relid = get_relid_from_relname(relname);
+ if (!OidIsValid(relid))
+ elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist",
+ relname);
+
+ pkattnums = (int16 *) PG_GETARG_POINTER(1);
+ pknumatts = PG_GETARG_INT16(2);
+ /*
+ * There should be at least one key attribute
+ */
+ if (pknumatts == 0)
+ elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0.");
+
+ src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
+ tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
+
+ /*
+ * Source array is made up of key values that will be used to
+ * locate the tuple of interest from the local system.
+ */
+ src_ndim = ARR_NDIM(src_pkattvals_arry);
+ src_dim = ARR_DIMS(src_pkattvals_arry);
+ src_nitems = ArrayGetNItems(src_ndim, src_dim);
+
+ /*
+ * There should be one source array key value for each key attnum
+ */
+ if (src_nitems != pknumatts)
+ elog(ERROR, "dblink_build_sql_insert: source key array length does not match number of key attributes.");
+
+ /*
+ * get array of pointers to c-strings from the input source array
+ */
+ src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
+ ptr = ARR_DATA_PTR(src_pkattvals_arry);
+ for (i = 0; i < src_nitems; i++)
+ {
+ src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
+ ptr += INTALIGN(*(int32 *) ptr);
+ }
+
+ /*
+ * Target array is made up of key values that will be used to
+ * build the SQL string for use on the remote system.
+ */
+ tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
+ tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
+ tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
+
+ /*
+ * There should be one target array key value for each key attnum
+ */
+ if (tgt_nitems != pknumatts)
+ elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes.");
+
+ /*
+ * get array of pointers to c-strings from the input target array
+ */
+ tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
+ ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
+ for (i = 0; i < tgt_nitems; i++)
+ {
+ tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
+ ptr += INTALIGN(*(int32 *) ptr);
+ }
+
+ /*
+ * Prep work is finally done. Go get the SQL string.
+ */
+ sql = get_sql_insert(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
+
+ /*
+ * Make it into TEXT for return to the client
+ */
+ sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
+
+ /*
+ * And send it
+ */
+ PG_RETURN_TEXT_P(sql_text);
+ }
+
+
+ /*
+ * dblink_build_sql_delete
+ *
+ * Used to generate an SQL delete statement.
+ * This is useful for selectively replicating a
+ * delete to another server via dblink.
+ *
+ * API:
+ * <relname> - name of remote table of interest
+ * <pkattnums> - an int2vector of attnums which will be used
+ * to identify the remote tuple of interest
+ * <pknumatts> - number of attnums in pkattnums
+ * <tgt_pkattvals_arry> - text array of key values which will be used
+ * to build the string for execution remotely.
+ */
+ PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
+ Datum
+ dblink_build_sql_delete(PG_FUNCTION_ARGS)
+ {
+ Oid relid;
+ char *relname;
+ int16 *pkattnums;
+ int16 pknumatts;
+ char **tgt_pkattvals;
+ ArrayType *tgt_pkattvals_arry;
+ int tgt_ndim;
+ int *tgt_dim;
+ int tgt_nitems;
+ int i;
+ char *ptr;
+ char *sql;
+ text *sql_text;
+
+ relname = NameStr(*PG_GETARG_NAME(0));
+
+ /*
+ * Convert relname to rel OID.
+ */
+ relid = get_relid_from_relname(relname);
+ if (!OidIsValid(relid))
+ elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist",
+ relname);
+
+ pkattnums = (int16 *) PG_GETARG_POINTER(1);
+ pknumatts = PG_GETARG_INT16(2);
+ /*
+ * There should be at least one key attribute
+ */
+ if (pknumatts == 0)
+ elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0.");
+
+ tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
+
+ /*
+ * Target array is made up of key values that will be used to
+ * build the SQL string for use on the remote system.
+ */
+ tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
+ tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
+ tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
+
+ /*
+ * There should be one target array key value for each key attnum
+ */
+ if (tgt_nitems != pknumatts)
+ elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes.");
+
+ /*
+ * get array of pointers to c-strings from the input target array
+ */
+ tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
+ ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
+ for (i = 0; i < tgt_nitems; i++)
+ {
+ tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
+ ptr += INTALIGN(*(int32 *) ptr);
+ }
+
+ /*
+ * Prep work is finally done. Go get the SQL string.
+ */
+ sql = get_sql_delete(relid, pkattnums, pknumatts, tgt_pkattvals);
+
+ /*
+ * Make it into TEXT for return to the client
+ */
+ sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
+
+ /*
+ * And send it
+ */
+ PG_RETURN_TEXT_P(sql_text);
+ }
+
+
+ /*
+ * dblink_build_sql_update
+ *
+ * Used to generate an SQL update statement
+ * based on an existing tuple in a local relation.
+ * This is useful for selectively replicating data
+ * to another server via dblink.
+ *
+ * API:
+ * <relname> - name of local table of interest
+ * <pkattnums> - an int2vector of attnums which will be used
+ * to identify the local tuple of interest
+ * <pknumatts> - number of attnums in pkattnums
+ * <src_pkattvals_arry> - text array of key values which will be used
+ * to identify the local tuple of interest
+ * <tgt_pkattvals_arry> - text array of key values which will be used
+ * to build the string for execution remotely. These are substituted
+ * for their counterparts in src_pkattvals_arry
+ */
+ PG_FUNCTION_INFO_V1(dblink_build_sql_update);
+ Datum
+ dblink_build_sql_update(PG_FUNCTION_ARGS)
+ {
+ Oid relid;
+ char *relname;
+ int16 *pkattnums;
+ int16 pknumatts;
+ char **src_pkattvals;
+ char **tgt_pkattvals;
+ ArrayType *src_pkattvals_arry;
+ ArrayType *tgt_pkattvals_arry;
+ int src_ndim;
+ int *src_dim;
+ int src_nitems;
+ int tgt_ndim;
+ int *tgt_dim;
+ int tgt_nitems;
+ int i;
+ char *ptr;
+ char *sql;
+ text *sql_text;
+
+ relname = NameStr(*PG_GETARG_NAME(0));
+
+ /*
+ * Convert relname to rel OID.
+ */
+ relid = get_relid_from_relname(relname);
+ if (!OidIsValid(relid))
+ elog(ERROR, "dblink_get_pkey: relation \"%s\" does not exist",
+ relname);
+
+ pkattnums = (int16 *) PG_GETARG_POINTER(1);
+ pknumatts = PG_GETARG_INT16(2);
+ /*
+ * There should be one source array key values for each key attnum
+ */
+ if (pknumatts == 0)
+ elog(ERROR, "dblink_build_sql_insert: number of key attributes must be > 0.");
+
+ src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
+ tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
+
+ /*
+ * Source array is made up of key values that will be used to
+ * locate the tuple of interest from the local system.
+ */
+ src_ndim = ARR_NDIM(src_pkattvals_arry);
+ src_dim = ARR_DIMS(src_pkattvals_arry);
+ src_nitems = ArrayGetNItems(src_ndim, src_dim);
+
+ /*
+ * There should be one source array key value for each key attnum
+ */
+ if (src_nitems != pknumatts)
+ elog(ERROR, "dblink_build_sql_insert: source key array length does not match number of key attributes.");
+
+ /*
+ * get array of pointers to c-strings from the input source array
+ */
+ src_pkattvals = (char **) palloc(src_nitems * sizeof(char *));
+ ptr = ARR_DATA_PTR(src_pkattvals_arry);
+ for (i = 0; i < src_nitems; i++)
+ {
+ src_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
+ ptr += INTALIGN(*(int32 *) ptr);
+ }
+
+ /*
+ * Target array is made up of key values that will be used to
+ * build the SQL string for use on the remote system.
+ */
+ tgt_ndim = ARR_NDIM(tgt_pkattvals_arry);
+ tgt_dim = ARR_DIMS(tgt_pkattvals_arry);
+ tgt_nitems = ArrayGetNItems(tgt_ndim, tgt_dim);
+
+ /*
+ * There should be one target array key value for each key attnum
+ */
+ if (tgt_nitems != pknumatts)
+ elog(ERROR, "dblink_build_sql_insert: target key array length does not match number of key attributes.");
+
+ /*
+ * get array of pointers to c-strings from the input target array
+ */
+ tgt_pkattvals = (char **) palloc(tgt_nitems * sizeof(char *));
+ ptr = ARR_DATA_PTR(tgt_pkattvals_arry);
+ for (i = 0; i < tgt_nitems; i++)
+ {
+ tgt_pkattvals[i] = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(ptr)));
+ ptr += INTALIGN(*(int32 *) ptr);
+ }
+
+ /*
+ * Prep work is finally done. Go get the SQL string.
+ */
+ sql = get_sql_update(relid, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
+
+ /*
+ * Make it into TEXT for return to the client
+ */
+ sql_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(sql)));
+
+ /*
+ * And send it
+ */
+ PG_RETURN_TEXT_P(sql_text);
+ }
+
+
+ /*
+ * dblink_current_query
+ * return the current query string
+ * to allow its use in (among other things)
+ * rewrite rules
+ */
+ PG_FUNCTION_INFO_V1(dblink_current_query);
+ Datum
+ dblink_current_query(PG_FUNCTION_ARGS)
+ {
+ text *result_text;
+
+ result_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(debug_query_string)));
+ PG_RETURN_TEXT_P(result_text);
+ }
+
+
+ /*
+ * dblink_replace_text
+ * replace all occurences of 'old_sub_str' in 'orig_str'
+ * with 'new_sub_str' to form 'new_str'
+ *
+ * returns 'orig_str' if 'old_sub_str' == '' or 'orig_str' == ''
+ * otherwise returns 'new_str'
+ */
+ PG_FUNCTION_INFO_V1(dblink_replace_text);
+ Datum
+ dblink_replace_text(PG_FUNCTION_ARGS)
+ {
+ text *left_text;
+ text *right_text;
+ text *buf_text;
+ text *ret_text;
+ char *ret_str;
+ int curr_posn;
+ text *src_text = PG_GETARG_TEXT_P(0);
+ int src_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(src_text)));
+ text *from_sub_text = PG_GETARG_TEXT_P(1);
+ int from_sub_text_len = DatumGetInt32(DirectFunctionCall1(textlen, PointerGetDatum(from_sub_text)));
+ text *to_sub_text = PG_GETARG_TEXT_P(2);
+ char *to_sub_str = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(to_sub_text)));
+ StringInfo str = makeStringInfo();
+
+ if (src_text_len == 0 || from_sub_text_len == 0)
+ PG_RETURN_TEXT_P(src_text);
+
+ buf_text = DatumGetTextPCopy(PointerGetDatum(src_text));
+ curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text),
PointerGetDatum(from_sub_text)));
+
+ while (curr_posn > 0)
+ {
+ left_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text), 1,
DatumGetInt32(DirectFunctionCall2(textpos,PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) - 1));
+ right_text = DatumGetTextP(DirectFunctionCall3(text_substr, PointerGetDatum(buf_text),
DatumGetInt32(DirectFunctionCall2(textpos,PointerGetDatum(buf_text), PointerGetDatum(from_sub_text))) +
from_sub_text_len,-1));
+
+ appendStringInfo(str, DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(left_text))));
+ appendStringInfo(str, to_sub_str);
+
+ pfree(buf_text);
+ pfree(left_text);
+ buf_text = right_text;
+ curr_posn = DatumGetInt32(DirectFunctionCall2(textpos, PointerGetDatum(buf_text),
PointerGetDatum(from_sub_text)));
+ }
+
+ appendStringInfo(str, DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(buf_text))));
+ pfree(buf_text);
+
+ ret_str = pstrdup(str->data);
+ ret_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(ret_str)));
+
+ PG_RETURN_TEXT_P(ret_text);
+ }
+
+
+ /*************************************************************
* internal functions
*/
***************
*** 285,293 ****
--- 853,1408 ----
MemSet(retval, 0, sizeof(dblink_results));
retval->tup_num = -1;
+ retval->res_id_index =-1;
retval->res = NULL;
MemoryContextSwitchTo(oldcontext);
return retval;
}
+
+
+ /*
+ * init_dblink_array_results
+ * - create an empty dblink_array_results data structure
+ */
+ dblink_array_results *
+ init_dblink_array_results(MemoryContext fn_mcxt)
+ {
+ MemoryContext oldcontext;
+ dblink_array_results *retval;
+
+ oldcontext = MemoryContextSwitchTo(fn_mcxt);
+
+ retval = (dblink_array_results *) palloc(sizeof(dblink_array_results));
+ MemSet(retval, 0, sizeof(dblink_array_results));
+
+ retval->elem_num = -1;
+ retval->num_elems = 0;
+ retval->res = NULL;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return retval;
+ }
+
+ /*
+ * get_pkey_attnames
+ *
+ * Get the primary key attnames for the given relation.
+ * Return NULL, and set numatts = 0, if no primary key exists.
+ */
+ char **
+ get_pkey_attnames(Oid relid, int16 *numatts)
+ {
+ Relation indexRelation;
+ ScanKeyData entry;
+ HeapScanDesc scan;
+ HeapTuple indexTuple;
+ int i;
+ char **result = NULL;
+ Relation rel;
+ TupleDesc tupdesc;
+
+ /*
+ * Open relation using relid, get tupdesc
+ */
+ rel = relation_open(relid, AccessShareLock);
+ tupdesc = rel->rd_att;
+
+ /*
+ * Initialize numatts to 0 in case no primary key
+ * exists
+ */
+ *numatts = 0;
+
+ /*
+ * Use relid to get all related indexes
+ */
+ indexRelation = heap_openr(IndexRelationName, AccessShareLock);
+ ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indrelid,
+ F_OIDEQ, ObjectIdGetDatum(relid));
+ scan = heap_beginscan(indexRelation, false, SnapshotNow,
+ 1, &entry);
+
+ while (HeapTupleIsValid(indexTuple = heap_getnext(scan, 0)))
+ {
+ Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
+
+ /*
+ * We're only interested if it is the primary key
+ */
+ if (index->indisprimary == TRUE)
+ {
+ i = 0;
+ while (index->indkey[i++] != 0)
+ (*numatts)++;
+
+ if (*numatts > 0)
+ {
+ result = (char **) palloc(*numatts * sizeof(char *));
+ for (i = 0; i < *numatts; i++)
+ result[i] = SPI_fname(tupdesc, index->indkey[i]);
+ }
+ break;
+ }
+ }
+ heap_endscan(scan);
+ heap_close(indexRelation, AccessShareLock);
+ relation_close(rel, AccessShareLock);
+
+ return result;
+ }
+
+
+ /*
+ * get_strtok
+ *
+ * parse input string
+ * return ord item (0 based)
+ * based on provided field separator
+ */
+ char *
+ get_strtok(char *fldtext, char *fldsep, int fldnum)
+ {
+ int j = 0;
+ char *result;
+
+ if (fldnum < 0)
+ {
+ elog(ERROR, "get_strtok: field number < 0 not permitted");
+ }
+
+ if (fldsep[0] == '\0')
+ {
+ elog(ERROR, "get_strtok: blank field separator not permitted");
+ }
+
+ result = strtok(fldtext, fldsep);
+ for (j = 1; j < fldnum + 1; j++)
+ {
+ result = strtok(NULL, fldsep);
+ if (result == NULL)
+ return NULL;
+ }
+
+ return pstrdup(result);
+ }
+
+ char *
+ get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
+ {
+ Relation rel;
+ char *relname;
+ HeapTuple tuple;
+ TupleDesc tupdesc;
+ int natts;
+ StringInfo str = makeStringInfo();
+ char *sql = NULL;
+ char *val = NULL;
+ int16 key;
+ unsigned int i;
+
+ /*
+ * Open relation using relid
+ */
+ rel = relation_open(relid, AccessShareLock);
+ relname = RelationGetRelationName(rel);
+ tupdesc = rel->rd_att;
+ natts = tupdesc->natts;
+
+ tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
+
+ appendStringInfo(str, "INSERT INTO %s(", quote_ident_cstr(relname));
+ for (i = 0; i < natts; i++)
+ {
+ if (i > 0)
+ appendStringInfo(str, ",");
+
+ appendStringInfo(str, NameStr(tupdesc->attrs[i]->attname));
+ }
+
+ appendStringInfo(str, ") VALUES(");
+
+ /*
+ * remember attvals are 1 based
+ */
+ for (i = 0; i < natts; i++)
+ {
+ if (i > 0)
+ appendStringInfo(str, ",");
+
+ if (tgt_pkattvals != NULL)
+ key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
+ else
+ key = -1;
+
+ if (key > -1)
+ val = pstrdup(tgt_pkattvals[key]);
+ else
+ val = SPI_getvalue(tuple, tupdesc, i + 1);
+
+ if (val != NULL)
+ {
+ appendStringInfo(str, quote_literal_cstr(val));
+ pfree(val);
+ }
+ else
+ appendStringInfo(str, "NULL");
+ }
+ appendStringInfo(str, ")");
+
+ sql = pstrdup(str->data);
+ pfree(str->data);
+ pfree(str);
+ relation_close(rel, AccessShareLock);
+
+ return (sql);
+ }
+
+ char *
+ get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals)
+ {
+ Relation rel;
+ char *relname;
+ TupleDesc tupdesc;
+ int natts;
+ StringInfo str = makeStringInfo();
+ char *sql = NULL;
+ char *val = NULL;
+ unsigned int i;
+
+ /*
+ * Open relation using relid
+ */
+ rel = relation_open(relid, AccessShareLock);
+ relname = RelationGetRelationName(rel);
+ tupdesc = rel->rd_att;
+ natts = tupdesc->natts;
+
+ appendStringInfo(str, "DELETE FROM %s WHERE ", quote_ident_cstr(relname));
+ for (i = 0; i < pknumatts; i++)
+ {
+ int16 pkattnum = pkattnums[i];
+
+ if (i > 0)
+ appendStringInfo(str, " AND ");
+
+ appendStringInfo(str, NameStr(tupdesc->attrs[pkattnum - 1]->attname));
+
+ if (tgt_pkattvals != NULL)
+ val = pstrdup(tgt_pkattvals[i]);
+ else
+ elog(ERROR, "Target key array must not be NULL");
+
+ if (val != NULL)
+ {
+ appendStringInfo(str, "=");
+ appendStringInfo(str, quote_literal_cstr(val));
+ pfree(val);
+ }
+ else
+ appendStringInfo(str, "IS NULL");
+ }
+
+ sql = pstrdup(str->data);
+ pfree(str->data);
+ pfree(str);
+ relation_close(rel, AccessShareLock);
+
+ return (sql);
+ }
+
+ char *
+ get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals)
+ {
+ Relation rel;
+ char *relname;
+ HeapTuple tuple;
+ TupleDesc tupdesc;
+ int natts;
+ StringInfo str = makeStringInfo();
+ char *sql = NULL;
+ char *val = NULL;
+ int16 key;
+ int i;
+
+ /*
+ * Open relation using relid
+ */
+ rel = relation_open(relid, AccessShareLock);
+ relname = RelationGetRelationName(rel);
+ tupdesc = rel->rd_att;
+ natts = tupdesc->natts;
+
+ tuple = get_tuple_of_interest(relid, pkattnums, pknumatts, src_pkattvals);
+
+ appendStringInfo(str, "UPDATE %s SET ", quote_ident_cstr(relname));
+
+ for (i = 0; i < natts; i++)
+ {
+ if (i > 0)
+ appendStringInfo(str, ",");
+
+ appendStringInfo(str, NameStr(tupdesc->attrs[i]->attname));
+ appendStringInfo(str, "=");
+
+ if (tgt_pkattvals != NULL)
+ key = get_attnum_pk_pos(pkattnums, pknumatts, i + 1);
+ else
+ key = -1;
+
+ if (key > -1)
+ val = pstrdup(tgt_pkattvals[key]);
+ else
+ val = SPI_getvalue(tuple, tupdesc, i + 1);
+
+ if (val != NULL)
+ {
+ appendStringInfo(str, quote_literal_cstr(val));
+ pfree(val);
+ }
+ else
+ appendStringInfo(str, "NULL");
+ }
+
+ appendStringInfo(str, " WHERE ");
+
+ for (i = 0; i < pknumatts; i++)
+ {
+ int16 pkattnum = pkattnums[i];
+
+ if (i > 0)
+ appendStringInfo(str, " AND ");
+
+ appendStringInfo(str, NameStr(tupdesc->attrs[pkattnum - 1]->attname));
+
+ if (tgt_pkattvals != NULL)
+ val = pstrdup(tgt_pkattvals[i]);
+ else
+ val = SPI_getvalue(tuple, tupdesc, pkattnum);
+
+ if (val != NULL)
+ {
+ appendStringInfo(str, "=");
+ appendStringInfo(str, quote_literal_cstr(val));
+ pfree(val);
+ }
+ else
+ appendStringInfo(str, "IS NULL");
+ }
+
+ sql = pstrdup(str->data);
+ pfree(str->data);
+ pfree(str);
+ relation_close(rel, AccessShareLock);
+
+ return (sql);
+ }
+
+ /*
+ * Return a properly quoted literal value.
+ * Uses quote_literal in quote.c
+ */
+ static char *
+ quote_literal_cstr(char *rawstr)
+ {
+ text *rawstr_text;
+ text *result_text;
+ char *result;
+
+ rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
+ result_text = DatumGetTextP(DirectFunctionCall1(quote_literal, PointerGetDatum(rawstr_text)));
+ result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
+
+ return result;
+ }
+
+ /*
+ * Return a properly quoted identifier.
+ * Uses quote_ident in quote.c
+ */
+ static char *
+ quote_ident_cstr(char *rawstr)
+ {
+ text *rawstr_text;
+ text *result_text;
+ char *result;
+
+ rawstr_text = DatumGetTextP(DirectFunctionCall1(textin, CStringGetDatum(rawstr)));
+ result_text = DatumGetTextP(DirectFunctionCall1(quote_ident, PointerGetDatum(rawstr_text)));
+ result = DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(result_text)));
+
+ return result;
+ }
+
+ int16
+ get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key)
+ {
+ int i;
+
+ /*
+ * Not likely a long list anyway, so just scan for
+ * the value
+ */
+ for (i = 0; i < pknumatts; i++)
+ if (key == pkattnums[i])
+ return i;
+
+ return -1;
+ }
+
+ HeapTuple
+ get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals)
+ {
+ Relation rel;
+ char *relname;
+ TupleDesc tupdesc;
+ StringInfo str = makeStringInfo();
+ char *sql = NULL;
+ int ret;
+ HeapTuple tuple;
+ int i;
+ char *val = NULL;
+
+ /*
+ * Open relation using relid
+ */
+ rel = relation_open(relid, AccessShareLock);
+ relname = RelationGetRelationName(rel);
+ tupdesc = rel->rd_att;
+
+ /*
+ * Connect to SPI manager
+ */
+ if ((ret = SPI_connect()) < 0)
+ elog(ERROR, "get_tuple_of_interest: SPI_connect returned %d", ret);
+
+ /*
+ * Build sql statement to look up tuple of interest
+ * Use src_pkattvals as the criteria.
+ */
+ appendStringInfo(str, "SELECT * from %s WHERE ", relname);
+
+ for (i = 0; i < pknumatts; i++)
+ {
+ int16 pkattnum = pkattnums[i];
+
+ if (i > 0)
+ appendStringInfo(str, " AND ");
+
+ appendStringInfo(str, NameStr(tupdesc->attrs[pkattnum - 1]->attname));
+
+ val = pstrdup(src_pkattvals[i]);
+ if (val != NULL)
+ {
+ appendStringInfo(str, "=");
+ appendStringInfo(str, quote_literal_cstr(val));
+ pfree(val);
+ }
+ else
+ appendStringInfo(str, "IS NULL");
+ }
+
+ sql = pstrdup(str->data);
+ pfree(str->data);
+ pfree(str);
+ /*
+ * Retrieve the desired tuple
+ */
+ ret = SPI_exec(sql, 0);
+ pfree(sql);
+
+ /*
+ * Only allow one qualifying tuple
+ */
+ if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
+ {
+ elog(ERROR, "get_tuple_of_interest: Source criteria may not match more than one record.");
+ }
+ else if (ret == SPI_OK_SELECT && SPI_processed == 1)
+ {
+ SPITupleTable *tuptable = SPI_tuptable;
+ tuple = SPI_copytuple(tuptable->vals[0]);
+
+ return tuple;
+ }
+ else
+ {
+ /*
+ * no qualifying tuples
+ */
+ return NULL;
+ }
+
+ /*
+ * never reached, but keep compiler quiet
+ */
+ return NULL;
+ }
+
+ Oid
+ get_relid_from_relname(char *relname)
+ {
+ #ifdef NamespaceRelationName
+ Oid relid;
+
+ relid = RelnameGetRelid(relname);
+ #else
+ Relation rel;
+ Oid relid;
+
+ rel = relation_openr(relname, AccessShareLock);
+ relid = RelationGetRelid(rel);
+ relation_close(rel, AccessShareLock);
+ #endif /* NamespaceRelationName */
+
+ return relid;
+ }
+
+ dblink_results *
+ get_res_ptr(int32 res_id_index)
+ {
+ List *ptr;
+
+ /*
+ * short circuit empty list
+ */
+ if(res_id == NIL)
+ return NULL;
+
+ /*
+ * OK, should be good to go
+ */
+ foreach(ptr, res_id)
+ {
+ dblink_results *this_res_id = (dblink_results *) lfirst(ptr);
+ if (this_res_id->res_id_index == res_id_index)
+ return this_res_id;
+ }
+ return NULL;
+ }
+
+ /*
+ * Add node to global List res_id
+ */
+ void
+ append_res_ptr(dblink_results *results)
+ {
+ res_id = lappend(res_id, results);
+ }
+
+ /*
+ * Remove node from global List
+ * using res_id_index
+ */
+ void
+ remove_res_ptr(dblink_results *results)
+ {
+ res_id = lremove(results, res_id);
+
+ if (res_id == NIL)
+ res_id_index = 0;
+ }
+
+
diff -cNr dblink.orig/dblink.h dblink/dblink.h
*** dblink.orig/dblink.h Mon Nov 5 09:46:22 2001
--- dblink/dblink.h Sun Apr 14 18:54:39 2002
***************
*** 3,9 ****
*
* Functions returning results from a remote database
*
! * Copyright (c) Joseph Conway <joe.conway@mail.com>, 2001;
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written agreement
--- 3,10 ----
*
* Functions returning results from a remote database
*
! * Copyright (c) Joseph Conway <mail@joeconway.com>, 2001, 2002,
! * ALL RIGHTS RESERVED;
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without a written agreement
***************
*** 33,42 ****
--- 34,64 ----
#include "libpq-int.h"
#include "fmgr.h"
#include "access/tupdesc.h"
+ #include "access/heapam.h"
+ #include "catalog/catname.h"
+ #include "catalog/pg_index.h"
+ #include "catalog/pg_type.h"
#include "executor/executor.h"
+ #include "executor/spi.h"
+ #include "lib/stringinfo.h"
#include "nodes/nodes.h"
#include "nodes/execnodes.h"
+ #include "nodes/pg_list.h"
+ #include "parser/parse_type.h"
+ #include "tcop/tcopprot.h"
#include "utils/builtins.h"
+ #include "utils/fmgroids.h"
+ #include "utils/array.h"
+ #include "utils/syscache.h"
+
+ #ifdef NamespaceRelationName
+ #include "catalog/namespace.h"
+ #endif /* NamespaceRelationName */
+
+ /*
+ * Max SQL statement size
+ */
+ #define DBLINK_MAX_SQLSTATE_SIZE 16384
/*
* This struct holds the results of the remote query.
***************
*** 50,70 ****
int tup_num;
/*
* the actual query results
*/
PGresult *res;
-
} dblink_results;
/*
* External declarations
*/
extern Datum dblink(PG_FUNCTION_ARGS);
extern Datum dblink_tok(PG_FUNCTION_ARGS);
/*
* Internal declarations
*/
dblink_results *init_dblink_results(MemoryContext fn_mcxt);
#endif /* DBLINK_H */
--- 72,145 ----
int tup_num;
/*
+ * resource index number for this context
+ */
+ int res_id_index;
+
+ /*
* the actual query results
*/
PGresult *res;
} dblink_results;
+
+ /*
+ * This struct holds results in the form of an array.
+ * Use fn_extra to hold a pointer to it across calls
+ */
+ typedef struct
+ {
+ /*
+ * elem being accessed
+ */
+ int elem_num;
+
+ /*
+ * number of elems
+ */
+ int num_elems;
+
+ /*
+ * the actual array
+ */
+ void *res;
+
+ } dblink_array_results;
+
/*
* External declarations
*/
extern Datum dblink(PG_FUNCTION_ARGS);
extern Datum dblink_tok(PG_FUNCTION_ARGS);
+ extern Datum dblink_strtok(PG_FUNCTION_ARGS);
+ extern Datum dblink_get_pkey(PG_FUNCTION_ARGS);
+ extern Datum dblink_last_oid(PG_FUNCTION_ARGS);
+ extern Datum dblink_build_sql_insert(PG_FUNCTION_ARGS);
+ extern Datum dblink_build_sql_delete(PG_FUNCTION_ARGS);
+ extern Datum dblink_build_sql_update(PG_FUNCTION_ARGS);
+ extern Datum dblink_current_query(PG_FUNCTION_ARGS);
+ extern Datum dblink_replace_text(PG_FUNCTION_ARGS);
/*
* Internal declarations
*/
dblink_results *init_dblink_results(MemoryContext fn_mcxt);
+ dblink_array_results *init_dblink_array_results(MemoryContext fn_mcxt);
+ char **get_pkey_attnames(Oid relid, int16 *numatts);
+ char *get_strtok(char *fldtext, char *fldsep, int fldnum);
+ char *getvalue(HeapTuple tuple, TupleDesc tupdesc, int fnumber);
+ char *get_sql_insert(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
+ char *get_sql_delete(Oid relid, int16 *pkattnums, int16 pknumatts, char **tgt_pkattvals);
+ char *get_sql_update(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals, char **tgt_pkattvals);
+ static char *quote_literal_cstr(char *rawstr);
+ static char *quote_ident_cstr(char *rawstr);
+ int16 get_attnum_pk_pos(int16 *pkattnums, int16 pknumatts, int16 key);
+ HeapTuple get_tuple_of_interest(Oid relid, int16 *pkattnums, int16 pknumatts, char **src_pkattvals);
+ Oid get_relid_from_relname(char *relname);
+ dblink_results *get_res_ptr(int32 res_id_index);
+ void append_res_ptr(dblink_results *results);
+ void remove_res_ptr(dblink_results *results);
+
+ extern char *debug_query_string;
#endif /* DBLINK_H */
diff -cNr dblink.orig/dblink.sql.in dblink/dblink.sql.in
*** dblink.orig/dblink.sql.in Thu Jun 14 09:49:03 2001
--- dblink/dblink.sql.in Fri Apr 12 14:36:49 2002
***************
*** 1,5 ****
! CREATE FUNCTION dblink (text,text) RETURNS setof int
! AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c';
! CREATE FUNCTION dblink_tok (int,int) RETURNS text
! AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c';
--- 1,38 ----
! CREATE OR REPLACE FUNCTION dblink (text,text) RETURNS setof int
! AS 'MODULE_PATHNAME','dblink' LANGUAGE 'c'
! WITH (isstrict);
! CREATE OR REPLACE FUNCTION dblink_tok (int,int) RETURNS text
! AS 'MODULE_PATHNAME','dblink_tok' LANGUAGE 'c'
! WITH (isstrict);
!
! CREATE OR REPLACE FUNCTION dblink_strtok (text,text,int) RETURNS text
! AS 'MODULE_PATHNAME','dblink_strtok' LANGUAGE 'c'
! WITH (iscachable, isstrict);
!
! CREATE OR REPLACE FUNCTION dblink_get_pkey (name) RETURNS setof text
! AS 'MODULE_PATHNAME','dblink_get_pkey' LANGUAGE 'c'
! WITH (isstrict);
!
! CREATE OR REPLACE FUNCTION dblink_last_oid (int) RETURNS oid
! AS 'MODULE_PATHNAME','dblink_last_oid' LANGUAGE 'c'
! WITH (isstrict);
!
! CREATE OR REPLACE FUNCTION dblink_build_sql_insert (name, int2vector, int2, _text, _text) RETURNS text
! AS 'MODULE_PATHNAME','dblink_build_sql_insert' LANGUAGE 'c'
! WITH (isstrict);
!
! CREATE OR REPLACE FUNCTION dblink_build_sql_delete (name, int2vector, int2, _text) RETURNS text
! AS 'MODULE_PATHNAME','dblink_build_sql_delete' LANGUAGE 'c'
! WITH (isstrict);
!
! CREATE OR REPLACE FUNCTION dblink_build_sql_update (name, int2vector, int2, _text, _text) RETURNS text
! AS 'MODULE_PATHNAME','dblink_build_sql_update' LANGUAGE 'c'
! WITH (isstrict);
!
! CREATE OR REPLACE FUNCTION dblink_current_query () RETURNS text
! AS 'MODULE_PATHNAME','dblink_current_query' LANGUAGE 'c';
!
! CREATE OR REPLACE FUNCTION dblink_replace (text,text,text) RETURNS text
! AS 'MODULE_PATHNAME','dblink_replace_text' LANGUAGE 'c'
! WITH (iscachable, isstrict);
В списке pgsql-patches по дате отправления: