*** a/doc/src/sgml/filelist.sgml
--- b/doc/src/sgml/filelist.sgml
***************
*** 67,76 ****
--- 67,77 ----
+
%allfiles;
*** a/doc/src/sgml/plproxy.sgml
--- b/doc/src/sgml/plproxy.sgml
***************
*** 0 ****
--- 1,221 ----
+
+
+
+ PL/Proxy - Procedural Language for Remote Calls
+
+ PL/Proxy>>
+
+
+ The PL/Proxy procedural language makes it
+ easy to create remote calls.
+
+
+
+ To install PL/Proxy in a particular database, use
+ createlang plproxy dbname>.
+
+
+
+
+ If a language is installed into template1>, all subsequently
+ created databases will have the language installed automatically.
+
+
+
+
+ PL/Proxy is an untrusted language, because it accesses remote databases.
+ That means only superusers can create functions in that language.
+
+
+
+ PL/Proxy Language
+
+
+ Functions in PL/Proxy are declared via the standard
+ CREATE FUNCTION
+ syntax:
+
+
+ CREATE FUNCTION funcname (argument-list)
+ RETURNS return-type
+ AS $$
+ -- PL/Proxy function body
+ $$ LANGUAGE plproxy;
+
+
+
+ The language is similar to PL/pgSQL in string quoting, comments,
+ and semicolons at the end of statements.
+
+ It contains only 4 statements: CONNECT,
+ CLUSTER, RUN and
+ SELECT.
+
+ Each function needs to have either a CONNECT statement or
+ a CLUSTER statement (optionally combined with a RUN statement)
+ to specify where to run the function.
+
+ The SELECT statement is optional; if it is
+ missing, a default query is generated based on the proxy function's
+ signature.
+
+ The RUN statement is also optional; it defaults
+ to RUN ON ANY, which means the query will be run on a
+ random partition.
+
+
+ CONNECT
+ CONNECT 'libpq connstr';
+ Specifies the exact location where to connect and execute the query.
+ If several functions have the same connstr, they will use the same connection.
+
+
+
+ CLUSTER
+
+ CLUSTER 'cluster_name';
+ Specifies the name of the cluster to run on. The cluster name will
+ be passed to the plproxy.get_cluster_* functions.
+
+ CLUSTER cluster_func(..);
+ The cluster name can be decided dynamically based on the proxy function arguments.
+ cluster_func should return the final cluster name as a text value.
+
+
+
+
+ RUN ON
+ RUN ON ALL;
+ The query will be run on all partitions in the cluster in parallel.
+
+ RUN ON ANY;
+ The query will be run on a random partition.
+
+ RUN ON <NR>;
+ Run on partition number <NR>.
+
+ RUN ON partition_func(..);
+ Run partition_func(), which should return one or more hash values.
+ PL/Proxy accepts int2, int4 and int8 values from the hash function.
+ The query will be run on the tagged partitions. If more than one partition was
+ tagged, the query will be sent to them in parallel.
+
+
+
+
+
+ SELECT
+ SELECT .... ;
+ By default, PL/Proxy generates the query based on the function's own signature.
+ This can be overridden by giving an explicit SELECT
+ statement to run.
+
+ Everything after SELECT until the semicolon is
+ taken as SQL to be passed on. Only argument substitution is done on the
+ contents; otherwise the text is passed on unparsed. To avoid a table column being
+ parsed as a function argument, qualify columns with table aliases.
+
+ The query result should have the same number of columns as the function result,
+ with the same names.
+
+
+
+
+
+ Argument substitution
+ Proxy function arguments can be referenced by name or with
+ $n syntax. Everything that is not an argument
+ reference is passed on unchanged.
+
+
+
+
+
+ Configuring PL/Proxy Cluster
+
+ PL/Proxy can be used in either CONNECT mode or CLUSTER mode.
+
+ In CONNECT mode PL/Proxy acts as a pass-through proxy to another database.
+ Each PL/Proxy function contains a libpq connect string for the connection
+ to a database it will proxy the request to.
+
+ PL/Proxy can also be used in CLUSTER mode where it provides support for
+ partitioning data across multiple databases based on a clustering function.
+
+ When using PL/Proxy in CONNECT mode no configuration functions are required.
+ However, using PL/Proxy in CLUSTER mode requires the following configuration
+ functions to be defined.
+
+
+ plproxy.get_cluster_version(cluster_name)
+
+ plproxy.get_cluster_version(cluster_name text) returns integer
+
+ The get_cluster_version function is called on each request; it should return
+ the version number of the current configuration for a particular cluster.
+ If the version number returned by this function differs from the one PL/Proxy
+ has cached, then the partition information will be reloaded
+ by calling the get_cluster_partitions() function.
+
+ This is an example function that does not look up the version number from an
+ external source such as a configuration table.
+
+
+ CREATE OR REPLACE FUNCTION plproxy.get_cluster_version(cluster_name text)
+ RETURNS int4 AS $$
+ BEGIN
+ IF cluster_name = 'a_cluster' THEN
+ RETURN 1;
+ END IF;
+ RAISE EXCEPTION 'Unknown cluster';
+ END;
+ $$ LANGUAGE plpgsql;
+
+
+
+
+
+ plproxy.get_cluster_partitions(cluster_name)
+
+ plproxy.get_cluster_partitions(cluster_name text) returns setof text
+
+ This is called when a new partition configuration needs to be loaded.
+ It should return connect strings to the partitions in the cluster.
+ The connstrings should be returned in the correct order. The total
+ number of connstrings returned must be a power of 2. If two or more
+ connstrings are equal then they will use the same connection.
+
+ If the string "user=" does not appear in a connect string then
+ user=SESSION_USER (the session user's name) will be appended to the connection string by PL/Proxy.
+ This will cause PL/Proxy to connect to the partition database using
+ the same username as was used to connect to the proxy database.
+ Since PL/Proxy does not know any passwords, the partition databases
+ should be using "trust" authentication for connections from the proxy database
+ to allow connections to the partition databases without requiring a password.
+ If the connect strings contain an explicit username then an explicit
+ password can also be set in the connstring.
+
+ The best way to set explicit passwords is to add them to the .pgpass file
+ in the home directory of the operating-system user the PostgreSQL server runs as.
+
+ An example function without the use of separate configuration tables:
+
+
+ CREATE OR REPLACE FUNCTION plproxy.get_cluster_partitions(cluster_name text)
+ RETURNS SETOF text AS $$
+ BEGIN
+ IF cluster_name = 'a_cluster' THEN
+ RETURN NEXT 'dbname=part00 host=127.0.0.1';
+ RETURN NEXT 'dbname=part01 host=127.0.0.1';
+ RETURN NEXT 'dbname=part02 host=127.0.0.1';
+ RETURN NEXT 'dbname=part03 host=127.0.0.1';
+ RETURN;
+ END IF;
+ RAISE EXCEPTION 'Unknown cluster';
+ END;
+ $$ LANGUAGE plpgsql;
+
+
+
+
+
*** a/doc/src/sgml/postgres.sgml
--- b/doc/src/sgml/postgres.sgml
***************
*** 210,219 ****
--- 210,220 ----
&xplang;
&plsql;
&pltcl;
&plperl;
&plpython;
+ &plproxy;
&spi;
*** a/src/include/catalog/pg_pltemplate.h
--- b/src/include/catalog/pg_pltemplate.h
*************** DATA(insert ( "plpgsql" t t "plpgsql_ca
*** 69,75 ****
--- 69,76 ----
DATA(insert ( "pltcl" t t "pltcl_call_handler" _null_ "$libdir/pltcl" _null_ ));
DATA(insert ( "pltclu" f f "pltclu_call_handler" _null_ "$libdir/pltcl" _null_ ));
DATA(insert ( "plperl" t t "plperl_call_handler" "plperl_validator" "$libdir/plperl" _null_ ));
DATA(insert ( "plperlu" f f "plperl_call_handler" "plperl_validator" "$libdir/plperl" _null_ ));
DATA(insert ( "plpythonu" f f "plpython_call_handler" _null_ "$libdir/plpython" _null_ ));
+ DATA(insert ( "plproxy" f f "plproxy_call_handler" _null_ "$libdir/plproxy" _null_ ));
#endif /* PG_PLTEMPLATE_H */
*** a/src/pl/Makefile
--- b/src/pl/Makefile
***************
*** 10,20 ****
subdir = src/pl
top_builddir = ../..
include $(top_builddir)/src/Makefile.global
! DIRS = plpgsql
ifeq ($(with_perl), yes)
DIRS += plperl
endif
--- 10,20 ----
subdir = src/pl
top_builddir = ../..
include $(top_builddir)/src/Makefile.global
! DIRS = plpgsql plproxy
ifeq ($(with_perl), yes)
DIRS += plperl
endif
*** a/src/pl/plproxy/Makefile
--- b/src/pl/plproxy/Makefile
***************
*** 0 ****
--- 1,89 ----
+ #-------------------------------------------------------------------------
+ #
+ # Makefile for the plproxy shared object
+ #
+ # $PostgreSQL$
+ #
+ #-------------------------------------------------------------------------
+
+ subdir = src/pl/plproxy
+ top_builddir = ../../..
+ include $(top_builddir)/src/Makefile.global
+
+ NAME= plproxy
+ # scanner.o and parser.tab.o are built from scanner.l and parser.y
+ # by the flex/bison rules further below.
+ SRCS = cluster.c execute.c function.c main.c \
+ query.c result.c type.c poll_compat.c
+ OBJS = scanner.o parser.tab.o $(SRCS:.c=.o)
+
+ # Regression suites run by "make installcheck"; plpgsql is loaded into the
+ # test database first via REGRESS_OPTS.
+ REGRESS = plproxy_init plproxy_test plproxy_select plproxy_many \
+ plproxy_errors plproxy_clustermap plproxy_dynamic_record
+ REGRESS_OPTS = --load-language=plpgsql
+
+ # The module is a libpq client: it needs libpq headers and the libpq library.
+ override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)
+ SHLIB_LINK = $(libpq_pgport) $(LDFLAGS) $(LIBS)
+ # NOTE(review): empty rpath presumably keeps Makefile.shlib from embedding a
+ # run-time library search path -- confirm against Makefile.shlib.
+ rpath =
+
+
+ all: all-lib
+
+ # Shared library stuff
+ include $(top_srcdir)/src/Makefile.shlib
+
+
+ install: installdirs all install-lib
+
+ installdirs: installdirs-lib
+
+ uninstall: uninstall-lib
+
+
+ # Force these dependencies to be known even without dependency info built:
+ $(OBJS): plproxy.h rowstamp.h
+ execute.o: poll_compat.h
+ poll_compat.o: poll_compat.h
+
+ # The scanner includes the token definitions generated by bison.
+ scanner.o: parser.tab.h
+ parser.tab.h: parser.tab.c
+
+ # Generate the parser; yacc's y.tab.* outputs are renamed to parser.tab.*
+ # in $(srcdir). Without a yacc/bison, fall back to $(missing).
+ $(srcdir)/parser.tab.c: parser.y
+ ifdef YACC
+ $(YACC) -d $(YFLAGS) $<
+ mv -f y.tab.c $(srcdir)/parser.tab.c
+ mv -f y.tab.h $(srcdir)/parser.tab.h
+ else
+ @$(missing) bison $< $@
+ endif
+
+ # Because we use %option case-insensitive, flex's results could vary
+ # depending on what the compile-time locale setting is. Hence, force
+ # it to see LC_CTYPE=C to ensure consistent build results.
+
+ $(srcdir)/scanner.c: scanner.l
+ ifdef FLEX
+ LC_CTYPE=C $(FLEX) $(FLEXFLAGS) -o'$@' $<
+ else
+ @$(missing) flex $< $@
+ endif
+
+ # Generated sources shipped in the distribution tarball.
+ distprep: $(srcdir)/scanner.c $(srcdir)/parser.tab.h $(srcdir)/parser.tab.c
+
+ # parser.tab.c, parser.tab.h and scanner.c are in the distribution tarball,
+ # so they are not cleaned here.
+ clean distclean: clean-lib
+ rm -f $(OBJS)
+ # And the garbage that might have been left behind by partial build:
+ @rm -f y.tab.h y.tab.c y.output lex.yy.c
+
+ # Only maintainer-clean removes the generated parser/scanner sources.
+ maintainer-clean: clean
+ rm -f $(srcdir)/parser.tab.c $(srcdir)/parser.tab.h $(srcdir)/scanner.c
+
+ # Run the regression suite against an installed server; submake makes
+ # sure pg_regress is built first.
+ installcheck: submake
+ $(top_builddir)/src/test/regress/pg_regress \
+ --inputdir=$(srcdir) \
+ --psqldir=$(PSQLDIR) \
+ $(REGRESS_OPTS) $(REGRESS)
+
+ .PHONY: submake
+ submake:
+ $(MAKE) -C $(top_builddir)/src/test/regress pg_regress$(X)
+
*** a/src/pl/plproxy/cluster.c
--- b/src/pl/plproxy/cluster.c
***************
*** 0 ****
--- 1,469 ----
+ /*
+ * PL/Proxy - easy access to partitioned database.
+ *
+ * Copyright (c) 2006 Sven Suursoho, Skype Technologies OÜ
+ * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+ /*
+ * Cluster info management.
+ *
+ * Info structures are kept in separate memory context: cluster_mem.
+ */
+
+ #include "plproxy.h"
+
+ /*
+ * Long-lived memory area for cluster info structures; created by
+ * plproxy_cluster_cache_init() as a child of TopMemoryContext.
+ */
+ static MemoryContext cluster_mem;
+
+ /*
+ * Singly linked list of clusters.
+ *
+ * Searched linearly by name in plproxy_find_cluster(). If there will be
+ * lots of clusters, some faster search method should be used (HTAB probably).
+ */
+ static ProxyCluster *cluster_list = NULL;
+
+ /*
+ * Similar list for fake clusters (for CONNECT functions).
+ *
+ * Cluster name will be the actual connect string.
+ */
+ static ProxyCluster *fake_cluster_list = NULL;
+
+ /* saved SPI plan for version_sql; prepared lazily by plproxy_cluster_plan_init() */
+ static void *version_plan;
+
+ /* saved SPI plan for part_sql; prepared lazily by plproxy_cluster_plan_init() */
+ static void *partlist_plan;
+
+ /*
+ * Query for fetching cluster version. $1 is the cluster name (text); it must
+ * return exactly one non-NULL row, which is read as int4 (see get_version()).
+ */
+ static const char version_sql[] = "select * from plproxy.get_cluster_version($1)";
+
+ /*
+ * Query for fetching cluster partitions: one text connect string per row,
+ * and the row count must be a power of two (see reload_parts()).
+ */
+ static const char part_sql[] = "select * from plproxy.get_cluster_partitions($1)";
+
+
+ /*
+  * Validate a partition count: it must be positive and a power of two,
+  * so that part_count - 1 (stored as part_mask) has only low bits set.
+  */
+ static bool
+ check_valid_partcount(int n)
+ {
+     if (n <= 0)
+         return false;
+ 
+     /* a power of two has exactly one bit set */
+     return (n & (n - 1)) == 0;
+ }
+
+ /*
+  * Create the long-lived memory context that holds all cluster info.
+  *
+  * The SPI plans are not prepared here; plproxy_cluster_plan_init()
+  * prepares them lazily on first use.
+  */
+ void
+ plproxy_cluster_cache_init(void)
+ {
+     MemoryContext ctx;
+ 
+     ctx = AllocSetContextCreate(TopMemoryContext,
+                                 "PL/Proxy cluster context",
+                                 ALLOCSET_SMALL_MINSIZE,
+                                 ALLOCSET_SMALL_INITSIZE,
+                                 ALLOCSET_SMALL_MAXSIZE);
+     cluster_mem = ctx;
+ }
+
+ /*
+  * Prepare and save the SPI plans for the configuration queries.
+  *
+  * Called lazily from plproxy_find_cluster(); does nothing after the first
+  * successful run. The saved plans are published to version_plan and
+  * partlist_plan only after both have been saved. A failure therefore
+  * leaves the module uninitialised, and the next call retries instead of
+  * keeping a NULL plan around.
+  */
+ static void
+ plproxy_cluster_plan_init(void)
+ {
+     void       *tmp_ver_plan,
+                *tmp_part_plan;
+     void       *saved_ver_plan,
+                *saved_part_plan;
+     Oid         types[] = {TEXTOID};
+     static int  init_done = 0;
+ 
+     if (init_done)
+         return;
+ 
+     /*
+      * prepare plans for fetching configuration.
+      */
+     tmp_ver_plan = SPI_prepare(version_sql, 1, types);
+     if (tmp_ver_plan == NULL)
+         elog(ERROR, "PL/Proxy: plproxy.get_cluster_version() SQL fails: %s",
+              SPI_result_code_string(SPI_result));
+ 
+     tmp_part_plan = SPI_prepare(part_sql, 1, types);
+     if (tmp_part_plan == NULL)
+         elog(ERROR, "PL/Proxy: plproxy.get_cluster_partitions() SQL fails: %s",
+              SPI_result_code_string(SPI_result));
+ 
+     /*
+      * Copy the plans to permanent storage. SPI_saveplan() returns NULL on
+      * failure, so check it. If the second save fails, release the first
+      * saved plan so that a retry does not leak it.
+      */
+     saved_ver_plan = SPI_saveplan(tmp_ver_plan);
+     if (saved_ver_plan == NULL)
+         elog(ERROR, "PL/Proxy: cannot save plproxy.get_cluster_version() plan: %s",
+              SPI_result_code_string(SPI_result));
+ 
+     saved_part_plan = SPI_saveplan(tmp_part_plan);
+     if (saved_part_plan == NULL)
+     {
+         int         save_err = SPI_result;
+ 
+         SPI_freeplan(saved_ver_plan);
+         elog(ERROR, "PL/Proxy: cannot save plproxy.get_cluster_partitions() plan: %s",
+              SPI_result_code_string(save_err));
+     }
+ 
+     /*
+      * Store them only if all successful.
+      */
+     version_plan = saved_ver_plan;
+     partlist_plan = saved_part_plan;
+ 
+     init_done = 1;
+ }
+
+ /*
+  * Drop partition and connection data from a cluster.
+  *
+  * Closes every libpq connection, clears any leftover result, and frees
+  * the connect strings. It then frees part_map and conn_list and resets all
+  * counters. Both arrays must be allocated because pfree() is called on
+  * them unconditionally; the caller checks conn_list before calling.
+  */
+ static void
+ free_connlist(ProxyCluster *cluster)
+ {
+     int         n;
+ 
+     for (n = 0; n < cluster->conn_count; n++)
+     {
+         ProxyConnection *c = cluster->conn_list + n;
+ 
+         if (c->db != NULL)
+             PQfinish(c->db);
+         if (c->res != NULL)
+             PQclear(c->res);
+         if (c->connstr != NULL)
+             pfree((void *) c->connstr);
+     }
+ 
+     pfree(cluster->part_map);
+     pfree(cluster->conn_list);
+ 
+     cluster->part_map = NULL;
+     cluster->part_count = 0;
+     cluster->part_mask = 0;
+     cluster->conn_list = NULL;
+     cluster->conn_count = 0;
+ }
+
+ /*
+  * Append a value to a libpq connection string in single quotes.
+  *
+  * Backslashes and single quotes are escaped with a backslash, as libpq
+  * requires. This keeps whitespace or quotes inside the value from
+  * splitting it into extra keyword=value pairs.
+  */
+ static void
+ append_connstr_value(StringInfo buf, const char *val)
+ {
+     const char *p;
+ 
+     appendStringInfoChar(buf, '\'');
+     for (p = val; *p != '\0'; p++)
+     {
+         if (*p == '\'' || *p == '\\')
+             appendStringInfoChar(buf, '\\');
+         appendStringInfoChar(buf, *p);
+     }
+     appendStringInfoChar(buf, '\'');
+ }
+ 
+ /*
+  * Add a new database connection to the cluster unless an identical one exists.
+  *
+  * If connstr has no "user=" key, the session user's name is appended, quoted.
+  * The final string is compared against the existing connections, so equal
+  * connect strings share one ProxyConnection. A new entry takes the next free
+  * slot of conn_list; the caller sized that array (one slot per partition).
+  * The stored connstr is copied into cluster_mem.
+  *
+  * Returns the existing or newly added connection.
+  */
+ static ProxyConnection *
+ add_connection(ProxyCluster *cluster, char *connstr)
+ {
+     int         i;
+     ProxyConnection *conn = NULL;
+     char       *username;
+     StringInfo  final;
+ 
+     final = makeStringInfo();
+     appendStringInfoString(final, connstr);
+ 
+     /* append current user if not specified in connstr */
+     if (strstr(connstr, "user=") == NULL)
+     {
+         username = GetUserNameFromId(GetSessionUserId());
+         appendStringInfoString(final, " user=");
+         append_connstr_value(final, username);
+         pfree(username);
+     }
+ 
+     /* check if already have it */
+     for (i = 0; i < cluster->conn_count; i++)
+     {
+         if (strcmp(cluster->conn_list[i].connstr, final->data) == 0)
+         {
+             conn = &cluster->conn_list[i];
+             break;
+         }
+     }
+ 
+     /* add new connection */
+     if (conn == NULL)
+     {
+         conn = &cluster->conn_list[cluster->conn_count++];
+         conn->connstr = MemoryContextStrdup(cluster_mem, final->data);
+     }
+ 
+     /* the scratch buffer lives in the caller's context; release it now */
+     pfree(final->data);
+     pfree(final);
+ 
+     return conn;
+ }
+
+ /*
+  * Fetch the current configuration version of a cluster.
+  *
+  * Runs plproxy.get_cluster_version(dname) through the saved version_plan
+  * and is called on every execution in CLUSTER mode. The call goes through
+  * plproxy_error() unless the query succeeds and returns exactly one
+  * non-NULL row. The first column is read with DatumGetInt32().
+  */
+ static int
+ get_version(ProxyFunction *func, Datum dname)
+ {
+     Datum       bin_val;
+     bool        isnull;
+     char        nulls[1];
+     int         err;
+ 
+     nulls[0] = (dname == (Datum) NULL) ? 'n' : ' ';
+ 
+     err = SPI_execute_plan(version_plan, &dname, nulls, false, 0);
+     if (err != SPI_OK_SELECT)
+         plproxy_error(func, "get_version: spi error: %s",
+                       SPI_result_code_string(err));
+ 
+     /* SPI_processed is unsigned (64-bit in newer servers); cast it for %d */
+     if (SPI_processed != 1)
+         plproxy_error(func, "get_version: got %d rows",
+                       (int) SPI_processed);
+ 
+     bin_val = SPI_getbinval(SPI_tuptable->vals[0],
+                             SPI_tuptable->tupdesc, 1, &isnull);
+     if (isnull)
+         plproxy_error(func, "get_version: got NULL?");
+ 
+     return DatumGetInt32(bin_val);
+ }
+
+ /*
+  * Load a cluster's partition list via plproxy.get_cluster_partitions().
+  *
+  * The result is validated before the cached state is touched:
+  * - the row count must be a positive power of two;
+  * - column 1 must be text;
+  * - no connect string may be NULL.
+  * Only then is the old connection list freed and part_map/conn_list
+  * rebuilt in cluster_mem. Equal connect strings share one connection
+  * (see add_connection()).
+  *
+  * Always returns 0; problems are reported through plproxy_error().
+  */
+ static int
+ reload_parts(ProxyCluster *cluster, Datum dname, ProxyFunction *func)
+ {
+     int         err,
+                 i,
+                 nparts;
+     bool        isnull;
+     ProxyConnection *conn;
+     char       *connstr;
+     MemoryContext old_ctx;
+     TupleDesc   desc;
+     HeapTuple   row;
+ 
+     /* run query */
+     err = SPI_execute_plan(partlist_plan, &dname, NULL, false, 0);
+     if (err != SPI_OK_SELECT)
+         plproxy_error(func, "get_partlist: spi error: %s",
+                       SPI_result_code_string(err));
+     if (!check_valid_partcount(SPI_processed))
+         plproxy_error(func, "get_partlist: invalid part count");
+     nparts = (int) SPI_processed;
+ 
+     /* check column types */
+     desc = SPI_tuptable->tupdesc;
+     if (desc->natts < 1)
+         plproxy_error(func, "Partition config must have at least 1 columns");
+     if (SPI_gettypeid(desc, 1) != TEXTOID)
+         plproxy_error(func, "partition column 1 must be text");
+ 
+     /*
+      * Reject NULL connect strings before discarding the old connection
+      * list. Otherwise a bad configuration would leave the cluster with a
+      * half-filled partition map.
+      */
+     for (i = 0; i < nparts; i++)
+     {
+         (void) SPI_getbinval(SPI_tuptable->vals[i], desc, 1, &isnull);
+         if (isnull)
+             plproxy_error(func, "connstr must not be NULL");
+     }
+ 
+     /* free old one */
+     if (cluster->conn_list)
+         free_connlist(cluster);
+ 
+     cluster->part_count = nparts;
+     cluster->part_mask = nparts - 1;
+ 
+     /* allocate lists */
+     old_ctx = MemoryContextSwitchTo(cluster_mem);
+     cluster->part_map = palloc0(nparts * sizeof(ProxyConnection *));
+     cluster->conn_list = palloc0(nparts * sizeof(ProxyConnection));
+     MemoryContextSwitchTo(old_ctx);
+ 
+     /* fill values */
+     for (i = 0; i < nparts; i++)
+     {
+         row = SPI_tuptable->vals[i];
+ 
+         connstr = SPI_getvalue(row, desc, 1);
+         if (connstr == NULL)
+             plproxy_error(func, "connstr must not be NULL");
+ 
+         conn = add_connection(cluster, connstr);
+         cluster->part_map[i] = conn;
+     }
+ 
+     return 0;
+ }
+
+ /*
+  * Allocate a zero-initialised cluster called `name` inside cluster_mem.
+  * The name is copied. Linking the cluster into a list is up to the caller.
+  */
+ static ProxyCluster *
+ new_cluster(const char *name)
+ {
+     MemoryContext prev = MemoryContextSwitchTo(cluster_mem);
+     ProxyCluster *result = palloc0(sizeof(*result));
+ 
+     result->name = pstrdup(name);
+     MemoryContextSwitchTo(prev);
+ 
+     return result;
+ }
+
+ /*
+  * Return the fake cluster for a CONNECT function, creating it on first use.
+  *
+  * Fake clusters are keyed by the connect string (used as the cluster name).
+  * Each has version 1 and exactly one partition, mapped to one connection
+  * that uses the same connect string. New clusters are allocated in
+  * cluster_mem and pushed onto the front of fake_cluster_list.
+  */
+ static ProxyCluster *
+ fake_cluster(ProxyFunction *func)
+ {
+     ProxyCluster *cl;
+     ProxyConnection *c;
+     MemoryContext saved_ctx;
+ 
+     /* cached already? */
+     for (cl = fake_cluster_list; cl != NULL; cl = cl->next)
+     {
+         if (strcmp(cl->name, func->connect_str) == 0)
+             return cl;
+     }
+ 
+     saved_ctx = MemoryContextSwitchTo(cluster_mem);
+ 
+     cl = palloc0(sizeof(*cl));
+     cl->name = pstrdup(func->connect_str);
+     cl->version = 1;
+     cl->part_count = 1;
+     cl->part_mask = 0;
+     cl->conn_count = 1;
+     cl->part_map = palloc(sizeof(ProxyConnection *));
+     cl->conn_list = palloc0(sizeof(ProxyConnection));
+ 
+     c = &cl->conn_list[0];
+     cl->part_map[0] = c;
+     c->connstr = pstrdup(cl->name);
+     c->state = C_NONE;
+ 
+     MemoryContextSwitchTo(saved_ctx);
+ 
+     cl->next = fake_cluster_list;
+     fake_cluster_list = cl;
+ 
+     return cl;
+ }
+
+ /*
+  * Compute the cluster name dynamically by running func->cluster_sql.
+  *
+  * The query must return exactly one row whose first column is a non-NULL
+  * text value; anything else goes through plproxy_error(). The returned
+  * string comes from SPI_getvalue(). NOTE(review): it is presumably
+  * allocated in the current SPI context, so it is valid only for this call.
+  */
+ static const char *
+ cluster_resolve_name(ProxyFunction *func, FunctionCallInfo fcinfo)
+ {
+     const char *name;
+     HeapTuple   row;
+     TupleDesc   desc;
+ 
+     plproxy_query_exec(func, fcinfo, func->cluster_sql);
+ 
+     /* SPI_processed is unsigned (64-bit in newer servers); cast it for %d */
+     if (SPI_processed != 1)
+         plproxy_error(func, "'%s' returned %d rows, expected 1",
+                       func->cluster_sql->sql, (int) SPI_processed);
+ 
+     desc = SPI_tuptable->tupdesc;
+     if (SPI_gettypeid(desc, 1) != TEXTOID)
+         plproxy_error(func, "expected text");
+ 
+     row = SPI_tuptable->vals[0];
+     name = SPI_getvalue(row, desc, 1);
+     if (name == NULL)
+         plproxy_error(func, "Cluster name map func returned NULL");
+ 
+     return name;
+ }
+
+ /*
+  * Find a cached cluster or create a new one.
+  *
+  * CONNECT functions get a fake single-connection cluster. Otherwise the
+  * cluster name comes from func->cluster_sql (resolved per call) or from
+  * func->cluster_name. The cluster version is fetched on every call, and the
+  * partition list is reloaded when the version changed or was never loaded.
+  * The func and fcinfo arguments are used for name resolution and error
+  * reporting.
+  */
+ ProxyCluster *
+ plproxy_find_cluster(ProxyFunction *func, FunctionCallInfo fcinfo)
+ {
+     ProxyCluster *cluster;
+     int         cur_version;
+     const char *name;
+     Datum       dname;
+ 
+     /* functions using CONNECT */
+     if (func->connect_str)
+         return fake_cluster(func);
+ 
+     /* initialize plans on demand only */
+     plproxy_cluster_plan_init();
+ 
+     if (func->cluster_sql)
+         name = cluster_resolve_name(func, fcinfo);
+     else
+         name = func->cluster_name;
+ 
+     /* create Datum for name */
+     dname = DirectFunctionCall1(textin, CStringGetDatum(name));
+ 
+     /*
+      * Fetch the version. The config function is expected to reject unknown
+      * clusters, as the documented example does.
+      */
+     cur_version = get_version(func, dname);
+ 
+     /* search if cached */
+     for (cluster = cluster_list; cluster; cluster = cluster->next)
+     {
+         if (strcmp(cluster->name, name) == 0)
+             break;
+     }
+ 
+     /* create if not */
+     if (!cluster)
+     {
+         cluster = new_cluster(name);
+         cluster->next = cluster_list;
+         cluster_list = cluster;
+     }
+ 
+     /*
+      * Update if needed. A freshly created cluster is zero-filled
+      * (version 0, no part_map). Without the part_map test, a configuration
+      * whose get_cluster_version() returns 0 would never be loaded.
+      */
+     if (cluster->part_map == NULL || cur_version != cluster->version)
+     {
+         reload_parts(cluster, dname, func);
+         cluster->version = cur_version;
+     }
+ 
+     return cluster;
+ }
+
+ /*
+  * Per-cluster maintenance.
+  *
+  * Every connection's leftover result is cleared. A connection is closed and
+  * reset to C_NONE if it is not CONNECTION_OK, or if PLPROXY_CONN_LIFETIME > 0
+  * and it was opened at least that many seconds before `now`.
+  */
+ static void
+ clean_cluster(ProxyCluster *cluster, struct timeval * now)
+ {
+     int         idx;
+ 
+     for (idx = 0; idx < cluster->conn_count; idx++)
+     {
+         ProxyConnection *c = &cluster->conn_list[idx];
+         bool        expired;
+         time_t      lived;
+ 
+         if (c->res != NULL)
+         {
+             PQclear(c->res);
+             c->res = NULL;
+         }
+         if (c->db == NULL)
+             continue;
+ 
+         if (PQstatus(c->db) != CONNECTION_OK)
+             expired = true;
+         else if (PLPROXY_CONN_LIFETIME > 0)
+         {
+             lived = now->tv_sec - c->connect_time;
+             expired = (lived >= PLPROXY_CONN_LIFETIME);
+         }
+         else
+             expired = false;
+ 
+         if (!expired)
+             continue;
+ 
+         PQfinish(c->db);
+         c->db = NULL;
+         c->state = C_NONE;
+     }
+ }
+
+ /*
+  * Clean old connections and results from all clusters.
+  *
+  * Runs clean_cluster() over every cached cluster: the real ones first,
+  * then the fake CONNECT clusters.
+  */
+ void
+ plproxy_cluster_maint(struct timeval * now)
+ {
+     ProxyCluster *lists[2];
+     ProxyCluster *cl;
+     int         k;
+ 
+     lists[0] = cluster_list;
+     lists[1] = fake_cluster_list;
+ 
+     for (k = 0; k < 2; k++)
+     {
+         for (cl = lists[k]; cl != NULL; cl = cl->next)
+             clean_cluster(cl, now);
+     }
+ }
*** a/src/pl/plproxy/execute.c
--- b/src/pl/plproxy/execute.c
***************
*** 0 ****
--- 1,724 ----
+ /*
+ * PL/Proxy - easy access to partitioned database.
+ *
+ * Copyright (c) 2006 Sven Suursoho, Skype Technologies OÜ
+ * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+ /*
+ * Actual execution logic is here.
+ *
+ * - Tag particural databases, where query must be sent.
+ * - Send the query.
+ * - Fetch the results.
+ */
+
+ #include "plproxy.h"
+
+ #include <sys/time.h>
+
+ #include "poll_compat.h"
+
+ /*
+  * Report a failure on a connection.
+  *
+  * The message is `desc` followed by libpq's current error message for
+  * conn->db. It is raised through plproxy_error(). NOTE(review): that
+  * presumably does not return (elog(ERROR)-style); confirm in plproxy.h.
+  */
+ static void
+ conn_error(ProxyFunction *func, ProxyConnection *conn, const char *desc)
+ {
+     const char *libpq_msg = PQerrorMessage(conn->db);
+ 
+     plproxy_error(func, "%s: %s", desc, libpq_msg);
+ }
+
+ /*
+  * Return true if two version strings belong to the same "MAJ.MIN" branch.
+  *
+  * Characters are compared up to the second dot. A string that ends right
+  * after "MAJ.MIN" matches one that continues with ".x". Any earlier
+  * difference, including one string ending early, gives false. For example,
+  * "8.3.1" vs "8.3.4" is true, but "8.3.1" vs "8.3devel" is false.
+  */
+ static bool
+ cmp_branch(const char *this, const char *that)
+ {
+     const char *a = this;
+     const char *b = that;
+     int         seen_dot = 0;
+ 
+     while (*a != '\0' || *b != '\0')
+     {
+         /* accept a bare "MAJ.MIN" against "MAJ.MIN.x" */
+         if (seen_dot &&
+             ((*a == '.' && *b == '\0') || (*b == '.' && *a == '\0')))
+             return true;
+ 
+         /* differing characters, or differing lengths */
+         if (*a != *b)
+             return false;
+ 
+         /* the second dot ends the branch part */
+         if (*a == '.')
+         {
+             if (seen_dot)
+                 return true;
+             seen_dot = 1;
+         }
+ 
+         a++;
+         b++;
+     }
+     return true;
+ }
+
+ /*
+  * Push buffered outgoing query data with PQflush().
+  *
+  * The state becomes C_QUERY_WRITE while data remains unsent (PQflush() > 0)
+  * and C_QUERY_READ once everything is sent. A PQflush() failure is reported
+  * via conn_error(), and the state is left unchanged.
+  */
+ static void
+ flush_connection(ProxyFunction *func, ProxyConnection *conn)
+ {
+     int         rc = PQflush(conn->db);
+ 
+     if (rc < 0)
+         conn_error(func, conn, "PQflush");
+     else
+         conn->state = (rc == 0) ? C_QUERY_READ : C_QUERY_WRITE;
+ }
+
+ /*
+  * Small sanity checking for new connections.
+  *
+  * - Records in conn->same_ver whether the remote server runs the same
+  *   major.minor branch as this one. send_query() uses this to decide on
+  *   binary results.
+  * - Makes the remote client_encoding match the local one by sending a
+  *   SET query when they differ.
+  *
+  * Returns 1 and sets conn->tuning when a tuning query was sent; the real
+  * query must then wait until that finishes. Returns 0 when no tuning is
+  * needed. If a tuning round already happened and the encoding still
+  * differs, an error is raised.
+  */
+ static int
+ tune_connection(ProxyFunction *func, ProxyConnection *conn)
+ {
+     const char *this_enc,
+                *dst_enc;
+     const char *dst_ver;
+     StringInfo  sql = NULL;
+ 
+     /*
+      * Check if the target server has the same backend version.
+      * PQparameterStatus() returns NULL when the server did not report
+      * server_version. Treat that as "different" instead of passing NULL
+      * to cmp_branch().
+      */
+     dst_ver = PQparameterStatus(conn->db, "server_version");
+     conn->same_ver = (dst_ver != NULL) ? cmp_branch(dst_ver, PG_VERSION) : false;
+ 
+     /*
+      * sync client_encoding
+      */
+     this_enc = pg_get_client_encoding_name();
+     dst_enc = PQparameterStatus(conn->db, "client_encoding");
+     if (dst_enc && strcmp(this_enc, dst_enc))
+     {
+         if (!sql)
+             sql = makeStringInfo();
+         appendStringInfo(sql, "set client_encoding = '%s'; ", this_enc);
+     }
+ 
+     /*
+      * If a tuning query was already sent, its settings should be active
+      * by now. A remaining difference means the SET did not apply.
+      */
+     if (sql && conn->tuning)
+     {
+         /* display SET query */
+         appendStringInfo(sql, "-- does not seem to apply");
+         conn_error(func, conn, sql->data);
+     }
+ 
+     /*
+      * Send the tuning query. Once it completes, another_result() puts the
+      * connection back to C_READY, and the real query is sent then.
+      */
+     if (sql)
+     {
+         conn->tuning = 1;
+         conn->state = C_QUERY_WRITE;
+         if (!PQsendQuery(conn->db, sql->data))
+             conn_error(func, conn, "PQsendQuery");
+         pfree(sql->data);
+         pfree(sql);
+ 
+         flush_connection(func, conn);
+         return 1;
+     }
+ 
+     conn->tuning = 0;
+     return 0;
+ }
+
+ /*
+  * Send func->remote_sql on conn with PQsendQueryParams(), using the given
+  * parameter value, length and format arrays.
+  *
+  * conn->query_time is stamped first. If tune_connection() had to send a
+  * tuning query, nothing more is sent now; the real query follows when the
+  * connection is C_READY again. Binary results are requested only when
+  * all of these hold:
+  * - PLPROXY_USE_BINARY is set;
+  * - the remote server runs the same branch;
+  * - the return type supports it (has_recv for scalars, use_binary for
+  *   composites).
+  */
+ static void
+ send_query(ProxyFunction *func, ProxyConnection *conn,
+            const char **values, int *plengths, int *pformats)
+ {
+     ProxyQuery *query = func->remote_sql;
+     struct timeval tv;
+     int         want_binary = 0;
+ 
+     gettimeofday(&tv, NULL);
+     conn->query_time = tv.tv_sec;
+ 
+     tune_connection(func, conn);
+     if (conn->tuning)
+         return;
+ 
+     if (PLPROXY_USE_BINARY && conn->same_ver)
+     {
+         if (func->ret_scalar)
+             want_binary = func->ret_scalar->has_recv ? 1 : 0;
+         else
+             want_binary = func->ret_composite->use_binary ? 1 : 0;
+     }
+ 
+     conn->state = C_QUERY_WRITE;
+     if (!PQsendQueryParams(conn->db,
+                            query->sql,
+                            query->arg_count,
+                            NULL,            /* paramTypes */
+                            values,          /* paramValues */
+                            plengths,        /* paramLengths */
+                            pformats,        /* paramFormats */
+                            want_binary))    /* resultFormat: 0 text, 1 binary */
+         conn_error(func, conn, "PQsendQueryParams");
+ 
+     flush_connection(func, conn);
+ }
+
+ /*
+  * Decide whether an existing idle connection can be reused.
+  *
+  * Returns false (drop the connection) in any of these cases:
+  * - it is not CONNECTION_OK;
+  * - PLPROXY_CONN_LIFETIME > 0 and the connection is at least that many
+  *   seconds old;
+  * - it has been idle for at least PLPROXY_IDLE_CONN_CHECK seconds and
+  *   unexpected input is pending on its socket.
+  * Returns true otherwise. A poll() failure other than EINTR is reported
+  * through plproxy_error().
+  */
+ static bool
+ check_old_conn(ProxyFunction *func, ProxyConnection *conn, struct timeval * now)
+ {
+     time_t      t;
+     int         res;
+     struct pollfd pfd;
+ 
+     if (PQstatus(conn->db) != CONNECTION_OK)
+         return false;
+ 
+     /* check if too old */
+     if (PLPROXY_CONN_LIFETIME > 0)
+     {
+         t = now->tv_sec - conn->connect_time;
+         if (t >= PLPROXY_CONN_LIFETIME)
+             return false;
+     }
+ 
+     /* how long it has been idle */
+     t = now->tv_sec - conn->query_time;
+     if (t < PLPROXY_IDLE_CONN_CHECK)
+         return true;
+ 
+     /*
+      * A simple way to check whether an old connection is stable: look for
+      * pending events. An idle connection should have none; if there are
+      * any, drop it. Retry the zero-timeout poll when it is interrupted by
+      * a signal.
+      */
+     pfd.fd = PQsocket(conn->db);
+     pfd.events = POLLIN;
+     do
+     {
+         pfd.revents = 0;
+         res = poll(&pfd, 1, 0);
+     } while (res < 0 && errno == EINTR);
+ 
+     if (res > 0)
+     {
+         elog(WARNING, "PL/Proxy: detected unstable connection");
+         return false;
+     }
+     if (res < 0)
+         plproxy_error(func, "check_old_conn: poll() failed: %s",
+                       strerror(errno));
+ 
+     /* seems ok */
+     return true;
+ }
+
+ /*
+ * Check existing conn status or launch a new conn before sending a query.
+ *
+ * A C_DONE/C_READY connection is reused if check_old_conn() accepts it.
+ * Otherwise any existing connection is closed (with a NOTICE). A new
+ * non-blocking connection is then started with PQconnectStart(), and the
+ * state is left at C_CONNECT_WRITE for poll_conns()/handle_conn() to drive.
+ * Failure to start the connection is reported as an error.
+ */
+ static void
+ prepare_conn(ProxyFunction *func, ProxyConnection *conn)
+ {
+ struct timeval now;
+
+ gettimeofday(&now, NULL);
+
+ /*
+ * Expected states here are C_DONE, C_READY or C_NONE. A connecting or
+ * querying state presumably means an earlier call stopped midway
+ * (NOTE(review): confirm), so such a connection is treated as stale.
+ * The cases below fall through on purpose.
+ */
+ switch (conn->state)
+ {
+ case C_DONE:
+ conn->state = C_READY;
+ /* fallthrough */
+ case C_READY:
+ if (check_old_conn(func, conn, &now))
+ return;
+ /* fallthrough: connection failed the health check, drop it */
+
+ case C_CONNECT_READ:
+ case C_CONNECT_WRITE:
+ case C_QUERY_READ:
+ case C_QUERY_WRITE:
+ /* close rotten connection */
+ elog(NOTICE, "PL/Proxy: dropping stale conn");
+ PQfinish(conn->db);
+ conn->db = NULL;
+ conn->state = C_NONE;
+ /* fallthrough */
+ case C_NONE:
+ break;
+ }
+
+ conn->connect_time = now.tv_sec;
+
+ /* launch new connection (non-blocking; completed via PQconnectPoll later) */
+ conn->db = PQconnectStart(conn->connstr);
+ if (conn->db == NULL)
+ plproxy_error(func, "No memory for PGconn");
+
+ /* tag connection dirty */
+ conn->state = C_CONNECT_WRITE;
+
+ if (PQstatus(conn->db) == CONNECTION_BAD)
+ conn_error(func, conn, "PQconnectStart");
+ }
+
+ /*
+  * The connection has a result available: fetch one with PQgetResult().
+  *
+  * Returns true if more results may follow. Returns false once PQgetResult()
+  * returns NULL, which means the query is finished. At that point the state
+  * becomes C_READY after a tuning query (so the real query can be sent) or
+  * C_DONE after the real query.
+  *
+  * A tuple result is kept in conn->res; a second tuple result is an error.
+  * Command-OK results are discarded. Any other status is reported as a
+  * remote error. PGresults belong to libpq and must be freed explicitly with
+  * PQclear(), so the new result is freed before any error is raised.
+  */
+ static bool
+ another_result(ProxyFunction *func, ProxyConnection *conn)
+ {
+     PGresult   *res;
+ 
+     /* got one */
+     res = PQgetResult(conn->db);
+     if (res == NULL)
+     {
+         if (conn->tuning)
+             conn->state = C_READY;
+         else
+             conn->state = C_DONE;
+         return false;
+     }
+ 
+     switch (PQresultStatus(res))
+     {
+         case PGRES_TUPLES_OK:
+             if (conn->res)
+             {
+                 PQclear(res);
+                 conn_error(func, conn, "double result?");
+                 break;
+             }
+             conn->res = res;
+             break;
+         case PGRES_COMMAND_OK:
+             PQclear(res);
+             break;
+         default:
+             PQclear(res);
+             conn_error(func, conn, "remote error");
+     }
+     return true;
+ }
+
+ /*
+  * Advance a connection after poll() reported activity on its socket.
+  *
+  * - Connecting: drive PQconnectPoll(). Record whether it next needs to
+  *   read or write, or set C_READY once connected. Failure is an error.
+  * - Writing a query: keep flushing.
+  * - Reading: consume input, then collect complete results until libpq
+  *   would block or the query is finished.
+  * - Idle states (C_NONE, C_DONE, C_READY): nothing to do.
+  */
+ static void
+ handle_conn(ProxyFunction *func, ProxyConnection *conn)
+ {
+     switch (conn->state)
+     {
+         case C_CONNECT_READ:
+         case C_CONNECT_WRITE:
+             switch (PQconnectPoll(conn->db))
+             {
+                 case PGRES_POLLING_WRITING:
+                     conn->state = C_CONNECT_WRITE;
+                     break;
+                 case PGRES_POLLING_READING:
+                     conn->state = C_CONNECT_READ;
+                     break;
+                 case PGRES_POLLING_OK:
+                     conn->state = C_READY;
+                     break;
+                 case PGRES_POLLING_ACTIVE:
+                 case PGRES_POLLING_FAILED:
+                     conn_error(func, conn, "PQconnectPoll");
+                     break;
+             }
+             break;
+ 
+         case C_QUERY_WRITE:
+             flush_connection(func, conn);
+             break;
+ 
+         case C_QUERY_READ:
+             if (PQconsumeInput(conn->db) == 0)
+                 conn_error(func, conn, "PQconsumeInput");
+ 
+             /* stop when the next result is incomplete or none are left */
+             while (!PQisBusy(conn->db) && another_result(func, conn))
+             {
+                 /* another_result() stored or discarded one result */
+             }
+             break;
+ 
+         case C_NONE:
+         case C_DONE:
+         case C_READY:
+             break;
+     }
+ }
+
+ /*
+  * poll() event mask a connection is waiting for: POLLIN while connecting
+  * or querying and needing to read, POLLOUT while needing to write. Returns
+  * 0 when the connection has nothing outstanding (C_DONE, C_READY, C_NONE).
+  */
+ static short
+ pending_poll_events(const ProxyConnection *conn)
+ {
+     switch (conn->state)
+     {
+         case C_CONNECT_READ:
+         case C_QUERY_READ:
+             return POLLIN;
+         case C_CONNECT_WRITE:
+         case C_QUERY_WRITE:
+             return POLLOUT;
+         case C_DONE:
+         case C_READY:
+         case C_NONE:
+             break;
+     }
+     return 0;
+ }
+ 
+ /*
+  * Check whether tagged connections have interesting events.
+  *
+  * Waits up to one second with poll() on every tagged (run_on) connection
+  * that is connecting or running a query. Each connection whose socket
+  * reported events is passed to handle_conn(). The pollfd array is a
+  * malloc'd cache kept across calls and grown to at least
+  * max(64, conn_count) entries.
+  *
+  * Returns 0 on timeout or EINTR, 1 otherwise.
+  */
+ static int
+ poll_conns(ProxyFunction *func, ProxyCluster *cluster)
+ {
+     static struct pollfd *pfd_cache = NULL;
+     static int  pfd_allocated = 0;
+ 
+     ProxyConnection *conn;
+     struct pollfd *slot;
+     short       events;
+     int         i,
+                 rc,
+                 nfds = 0;
+ 
+     /* grow the cached pollfd array if this cluster has more connections */
+     if (pfd_allocated < cluster->conn_count)
+     {
+         int         want = (cluster->conn_count < 64) ? 64 : cluster->conn_count;
+         struct pollfd *grown;
+ 
+         if (pfd_cache == NULL)
+             grown = malloc(want * sizeof(struct pollfd));
+         else
+             grown = realloc(pfd_cache, want * sizeof(struct pollfd));
+         if (grown == NULL)
+             elog(ERROR, "no mem for pollfd cache");
+         pfd_cache = grown;
+         pfd_allocated = want;
+     }
+ 
+     /* register every tagged connection that waits on its socket */
+     for (i = 0; i < cluster->conn_count; i++)
+     {
+         conn = &cluster->conn_list[i];
+         if (!conn->run_on)
+             continue;
+ 
+         events = pending_poll_events(conn);
+         if (events == 0)
+             continue;
+ 
+         slot = &pfd_cache[nfds++];
+         slot->fd = PQsocket(conn->db);
+         slot->events = events;
+         slot->revents = 0;
+     }
+ 
+     /* wait for events */
+     rc = poll(pfd_cache, nfds, 1000);
+     if (rc == 0)
+         return 0;
+     if (rc < 0)
+     {
+         if (errno == EINTR)
+             return 0;
+         plproxy_error(func, "poll() failed: %s", strerror(errno));
+     }
+ 
+     /*
+      * Walk the same connections in the same order. Each one registered
+      * above owns the next pollfd slot.
+      */
+     slot = pfd_cache;
+     for (i = 0; i < cluster->conn_count; i++)
+     {
+         conn = &cluster->conn_list[i];
+         if (!conn->run_on || pending_poll_events(conn) == 0)
+             continue;
+ 
+         if (slot->fd != PQsocket(conn->db))
+             elog(WARNING, "fd order from poll() is messed up?");
+ 
+         if (slot->revents)
+             handle_conn(func, conn);
+ 
+         slot++;
+     }
+     return 1;
+ }
+
+ /*
+  * Run the query on all tagged connections in parallel.
+  *
+  * Each tagged connection is prepared with prepare_conn().  The query is
+  * sent as soon as the connection is C_READY.  The function then polls
+  * until every tagged connection reaches C_DONE, checking for interrupts
+  * between rounds.
+  *
+  * Afterwards every tagged connection must hold a PGRES_TUPLES_OK result,
+  * and no untagged one may hold a result.  Row counts are summed into
+  * cluster->ret_total.  Violations are reported via plproxy_error().
+  */
+ static void
+ remote_execute(ProxyFunction *func,
+                const char **values, int *plengths, int *pformats)
+ {
+     ExecStatusType err;
+     ProxyConnection *conn;
+     ProxyCluster *cluster = func->cur_cluster;
+     int i,
+         pending;
+
+     /* either launch connection or send query */
+     for (i = 0; i < cluster->conn_count; i++)
+     {
+         conn = &cluster->conn_list[i];
+         if (!conn->run_on)
+             continue;
+
+         /* check if conn is alive, and launch if not */
+         prepare_conn(func, conn);
+
+         /* if conn is ready, then send query away */
+         if (conn->state == C_READY)
+             send_query(func, conn, values, plengths, pformats);
+     }
+
+     /* now loop until all results have arrived */
+     pending = 1;
+     while (pending)
+     {
+         /* allow postgres to cancel processing */
+         CHECK_FOR_INTERRUPTS();
+
+         /* wait for events; no activity means nothing to recheck */
+         if (poll_conns(func, cluster) == 0)
+             continue;
+
+         /* recheck */
+         pending = 0;
+         for (i = 0; i < cluster->conn_count; i++)
+         {
+             conn = &cluster->conn_list[i];
+             if (!conn->run_on)
+                 continue;
+
+             /* login finished, send query */
+             if (conn->state == C_READY)
+                 send_query(func, conn, values, plengths, pformats);
+
+             if (conn->state != C_DONE)
+                 pending++;
+         }
+     }
+
+     /* review results, calculate total */
+     for (i = 0; i < cluster->conn_count; i++)
+     {
+         conn = &cluster->conn_list[i];
+
+         /* a result must exist on exactly the tagged connections */
+         if (!conn->run_on != !conn->res)
+             plproxy_error(func, "run_on does not match res");
+
+         if (!conn->run_on)
+             continue;
+
+         if (conn->state != C_DONE)
+             plproxy_error(func, "Unfinished connection");
+         if (conn->res == NULL)
+             plproxy_error(func, "Lost result");
+
+         err = PQresultStatus(conn->res);
+         if (err != PGRES_TUPLES_OK)
+             plproxy_error(func, "Remote error: %s",
+                           PQresultErrorMessage(conn->res));
+
+         cluster->ret_total += PQntuples(conn->res);
+     }
+ }
+
+ /*
+  * Evaluate the RUN ON hash expression and tag the selected partitions.
+  *
+  * Executes the cached hash query (func->hash_sql).  Each result row must
+  * be a non-NULL int2, int4 or int8.  Its value, masked with
+  * cluster->part_mask, indexes part_map; that partition's run_on is set.
+  * A function that is not set-returning must get exactly one hash row.
+  */
+ static void
+ tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo)
+ {
+     ProxyCluster *cluster = func->cur_cluster;
+     TupleDesc tupdesc;
+     Oid hash_type;
+     int row_nr;
+
+     /* run the prepared hash query; rows are read from SPI_tuptable */
+     plproxy_query_exec(func, fcinfo, func->hash_sql);
+
+     tupdesc = SPI_tuptable->tupdesc;
+     hash_type = SPI_gettypeid(tupdesc, 1);
+
+     for (row_nr = 0; row_nr < SPI_processed; row_nr++)
+     {
+         HeapTuple tuple = SPI_tuptable->vals[row_nr];
+         bool is_null;
+         Datum hash_datum = SPI_getbinval(tuple, tupdesc, 1, &is_null);
+         uint32 part_nr = 0;
+
+         if (is_null)
+             plproxy_error(func, "Hash function returned NULL");
+
+         switch (hash_type)
+         {
+             case INT4OID:
+                 part_nr = DatumGetInt32(hash_datum);
+                 break;
+             case INT8OID:
+                 part_nr = DatumGetInt64(hash_datum);
+                 break;
+             case INT2OID:
+                 part_nr = DatumGetInt16(hash_datum);
+                 break;
+             default:
+                 plproxy_error(func, "Hash result must be int2, int4 or int8");
+         }
+
+         /* reduce the hash to a partition index */
+         part_nr &= cluster->part_mask;
+         cluster->part_map[part_nr]->run_on = 1;
+     }
+
+     /* only set-returning functions may hit zero or several partitions */
+     if (SPI_processed != 1 && !fcinfo->flinfo->fn_retset)
+         plproxy_error(func, "Only set-returning function"
+                       " allows hashcount <> 1");
+ }
+
+ /*
+  * Reset the per-call result state of a cluster before a new query.
+  *
+  * Frees any PGresult left over from the previous call and clears each
+  * connection's row position and run_on tag.  A NULL cluster is a no-op.
+  * Connection state is not touched here; prepare_conn() checks it.
+  */
+ void
+ plproxy_clean_results(ProxyCluster *cluster)
+ {
+     ProxyConnection *conn;
+     int n;
+
+     if (cluster == NULL)
+         return;
+
+     cluster->ret_total = 0;
+     cluster->ret_cur_conn = 0;
+
+     for (n = 0, conn = cluster->conn_list; n < cluster->conn_count; n++, conn++)
+     {
+         if (conn->res != NULL)
+         {
+             PQclear(conn->res);
+             conn->res = NULL;
+         }
+         conn->pos = 0;
+         conn->run_on = 0;
+     }
+ }
+
+ /*
+  * Select target partitions and execute the remote query on them.
+  *
+  * Clears previous results, then tags partitions according to the
+  * function's RUN ON type (hash, all, exact number or random).  Next it
+  * converts the arguments used by the remote query into libpq parameter
+  * form and runs the query via remote_execute().  Length/format arrays are
+  * passed only when at least one parameter was encoded in binary.
+  *
+  * Remote parameter i is taken from local argument arg_lookup[i].  Both
+  * the NULL test and the value fetch must use that local index; the
+  * remote SELECT may reorder or omit arguments.
+  */
+ void
+ plproxy_exec(ProxyFunction *func, FunctionCallInfo fcinfo)
+ {
+     const char *values[FUNC_MAX_ARGS];
+     int plengths[FUNC_MAX_ARGS];
+     int pformats[FUNC_MAX_ARGS];
+     int i;
+     int gotbin;
+     ProxyCluster *cluster = func->cur_cluster;
+
+     /* clean old results */
+     plproxy_clean_results(cluster);
+
+     /* tag interesting partitions */
+     switch (func->run_type)
+     {
+         case R_HASH:
+             tag_hash_partitions(func, fcinfo);
+             break;
+         case R_ALL:
+             for (i = 0; i < cluster->part_count; i++)
+                 cluster->part_map[i]->run_on = 1;
+             break;
+         case R_EXACT:
+             i = func->exact_nr;
+             if (i < 0 || i >= cluster->part_count)
+                 plproxy_error(func, "part number out of range");
+             cluster->part_map[i]->run_on = 1;
+             break;
+         case R_ANY:
+             i = random() & cluster->part_mask;
+             cluster->part_map[i]->run_on = 1;
+             break;
+         default:
+             plproxy_error(func, "uninitialized run_type");
+     }
+
+     /* prepare args */
+     gotbin = 0;
+     for (i = 0; i < func->remote_sql->arg_count; i++)
+     {
+         /* local argument that feeds remote parameter slot i */
+         int idx = func->remote_sql->arg_lookup[i];
+
+         plengths[i] = 0;
+         pformats[i] = 0;
+         if (PG_ARGISNULL(idx))
+         {
+             values[i] = NULL;
+         }
+         else
+         {
+             bool bin = PLPROXY_USE_BINARY;
+
+             values[i] = plproxy_send_type(func->arg_types[idx],
+                                           PG_GETARG_DATUM(idx),
+                                           bin,
+                                           &plengths[i],
+                                           &pformats[i]);
+
+             if (pformats[i])
+                 gotbin = 1;
+         }
+     }
+
+     if (gotbin)
+         remote_execute(func, values, plengths, pformats);
+     else
+         remote_execute(func, values, NULL, NULL);
+ }
*** a/src/pl/plproxy/expected/plproxy_clustermap.out
--- b/src/pl/plproxy/expected/plproxy_clustermap.out
***************
*** 0 ****
--- 1,71 ----
+ create or replace function plproxy.get_cluster_version(cluster_name text)
+ returns integer as $$
+ begin
+ if cluster_name = 'testcluster' then
+ return 6;
+ elsif cluster_name = 'map0' then
+ return 1;
+ elsif cluster_name = 'map1' then
+ return 1;
+ elsif cluster_name = 'map2' then
+ return 1;
+ elsif cluster_name = 'map3' then
+ return 1;
+ end if;
+ raise exception 'no such cluster: %', cluster_name;
+ end; $$ language plpgsql;
+ create or replace function plproxy.get_cluster_partitions(cluster_name text)
+ returns setof text as $$
+ begin
+ if cluster_name = 'testcluster' then
+ return next 'host=127.0.0.1 dbname=test_part0';
+ return next 'host=127.0.0.1 dbname=test_part1';
+ return next 'host=127.0.0.1 dbname=test_part2';
+ return next 'host=127.0.0.1 dbname=test_part3';
+ elsif cluster_name = 'map0' then
+ return next 'host=127.0.0.1 dbname=test_part0';
+ elsif cluster_name = 'map1' then
+ return next 'host=127.0.0.1 dbname=test_part1';
+ elsif cluster_name = 'map2' then
+ return next 'host=127.0.0.1 dbname=test_part2';
+ elsif cluster_name = 'map3' then
+ return next 'host=127.0.0.1 dbname=test_part3';
+ else
+ raise exception 'no such cluster: %', cluster_name;
+ end if;
+ return;
+ end; $$ language plpgsql;
+ create function map_cluster(part integer) returns text as $$
+ begin
+ return 'map' || part;
+ end;
+ $$ language plpgsql;
+ create function test_clustermap(part integer) returns setof text as $$
+ cluster map_cluster(part);
+ run on 0;
+ select current_database();
+ $$ language plproxy;
+ select * from test_clustermap(0);
+ test_clustermap
+ -----------------
+ test_part0
+ (1 row)
+
+ select * from test_clustermap(1);
+ test_clustermap
+ -----------------
+ test_part1
+ (1 row)
+
+ select * from test_clustermap(2);
+ test_clustermap
+ -----------------
+ test_part2
+ (1 row)
+
+ select * from test_clustermap(3);
+ test_clustermap
+ -----------------
+ test_part3
+ (1 row)
+
*** a/src/pl/plproxy/expected/plproxy_dynamic_record.out
--- b/src/pl/plproxy/expected/plproxy_dynamic_record.out
***************
*** 0 ****
--- 1,51 ----
+ -- dynamic query support testing
+ create or replace function dynamic_query(q text)
+ returns setof record as $x$
+ cluster 'map0';
+ run on all;
+ $x$ language plproxy;
+ \c test_part0
+ create or replace function dynamic_query(q text)
+ returns setof record as $x$
+ declare
+ ret record;
+ begin
+ for ret in execute q loop
+ return next ret;
+ end loop;
+ return;
+ end;
+ $x$ language plpgsql;
+ create table dynamic_query_test (
+ id integer,
+ username text,
+ other text
+ );
+ insert into dynamic_query_test values ( 1, 'user1', 'blah');
+ insert into dynamic_query_test values ( 2, 'user2', 'foo');
+ \c regression
+ select * from dynamic_query('select * from dynamic_query_test') as (id integer, username text, other text);
+ id | username | other
+ ----+----------+-------
+ 1 | user1 | blah
+ 2 | user2 | foo
+ (2 rows)
+
+ select * from dynamic_query('select id, username from dynamic_query_test') as foo(id integer, username text);
+ id | username
+ ----+----------
+ 1 | user1
+ 2 | user2
+ (2 rows)
+
+ -- test errors
+ select * from dynamic_query('select * from dynamic_query_test');
+ ERROR: a column definition list is required for functions returning "record"
+ create or replace function dynamic_query_select()
+ returns setof record as $x$
+ cluster 'map0';
+ run on all;
+ select id, username from dynamic_query_test;
+ $x$ language plproxy;
+ select * from dynamic_query_select() as (id integer, username text);
+ ERROR: PL/Proxy function public.dynamic_query_select(0): SELECT statement not allowed for dynamic RECORD functions
*** a/src/pl/plproxy/expected/plproxy_errors.out
--- b/src/pl/plproxy/expected/plproxy_errors.out
***************
*** 0 ****
--- 1,66 ----
+ -- test bad arg
+ create function test_err1(dat text)
+ returns text as $$
+ cluster 'testcluster';
+ run on hashtext(username);
+ $$ language plproxy;
+ select * from test_err1('dat');
+ ERROR: column "username" does not exist
+ LINE 1: select * from hashtext(username)
+ ^
+ QUERY: select * from hashtext(username)
+ create function test_err2(dat text)
+ returns text as $$
+ cluster 'testcluster';
+ run on hashtext($2);
+ $$ language plproxy;
+ select * from test_err2('dat');
+ ERROR: PL/Proxy function public.test_err2(1): Compile error at line 3: invalid argument reference: $2
+ create function test_err3(dat text)
+ returns text as $$
+ cluster 'nonexists';
+ run on hashtext($1);
+ $$ language plproxy;
+ select * from test_err3('dat');
+ ERROR: no such cluster: nonexists
+ CONTEXT: SQL statement "select * from plproxy.get_cluster_version($1)"
+ -- should work
+ create function test_err_none(dat text)
+ returns text as $$
+ cluster 'testcluster';
+ run on hashtext($1);
+ select 'ok';
+ $$ language plproxy;
+ select * from test_err_none('dat');
+ test_err_none
+ ---------------
+ ok
+ (1 row)
+
+ --- result map errors
+ create function test_map_err1(dat text)
+ returns text as $$ cluster 'testcluster'; run on 0;
+ select dat as "foo", 'asd' as "bar";
+ $$ language plproxy;
+ select * from test_map_err1('dat');
+ ERROR: PL/Proxy function public.test_map_err1(1): single field function but got record
+ create function test_map_err2(dat text, out res1 text, out res2 text)
+ returns record as $$ cluster 'testcluster'; run on 0;
+ select dat as res1;
+ $$ language plproxy;
+ select * from test_map_err2('dat');
+ ERROR: PL/Proxy function public.test_map_err2(1): Got too few fields from remote end
+ create function test_map_err3(dat text, out res1 text, out res2 text)
+ returns record as $$ cluster 'testcluster'; run on 0;
+ select dat as res1, 'foo' as res_none;
+ $$ language plproxy;
+ select * from test_map_err3('dat');
+ ERROR: PL/Proxy function public.test_map_err3(1): Field res2 does not exists in result
+ create function test_map_err4(dat text, out res1 text, out res2 text)
+ returns record as $$
+ --cluster 'testcluster';
+ run on hashtext(dat);
+ select dat as res2, 'foo' as res1;
+ $$ language plproxy;
+ select * from test_map_err4('dat');
+ ERROR: PL/Proxy function public.test_map_err4(1): Compile error at line 5: CLUSTER statement missing
*** a/src/pl/plproxy/expected/plproxy_init.out
--- b/src/pl/plproxy/expected/plproxy_init.out
***************
*** 0 ****
--- 1,2 ----
+ CREATE LANGUAGE plproxy;
+ \set ECHO none
*** a/src/pl/plproxy/expected/plproxy_many.out
--- b/src/pl/plproxy/expected/plproxy_many.out
***************
*** 0 ****
--- 1,116 ----
+ create or replace function plproxy.get_cluster_version(cluster_name text)
+ returns integer as $$
+ begin
+ if cluster_name = 'testcluster' then
+ return 6;
+ end if;
+ raise exception 'no such cluster: %', cluster_name;
+ end; $$ language plpgsql;
+ create or replace function plproxy.get_cluster_partitions(cluster_name text)
+ returns setof text as $$
+ begin
+ if cluster_name = 'testcluster' then
+ return next 'host=127.0.0.1 dbname=test_part0';
+ return next 'host=127.0.0.1 dbname=test_part1';
+ return next 'host=127.0.0.1 dbname=test_part2';
+ return next 'host=127.0.0.1 dbname=test_part3';
+ return;
+ end if;
+ raise exception 'no such cluster: %', cluster_name;
+ end; $$ language plpgsql;
+ \c test_part0
+ create function test_multi(part integer, username text)
+ returns integer as $$ begin return 0; end; $$ language plpgsql;
+ \c test_part1
+ create function test_multi(part integer, username text)
+ returns integer as $$ begin return 1; end; $$ language plpgsql;
+ \c test_part2
+ create function test_multi(part integer, username text)
+ returns integer as $$ begin return 2; end; $$ language plpgsql;
+ \c test_part3
+ create function test_multi(part integer, username text)
+ returns integer as $$ begin return 3; end; $$ language plpgsql;
+ \c regression
+ create function test_multi(part integer, username text)
+ returns integer as $$ cluster 'testcluster'; run on int4(part); $$ language plproxy;
+ select test_multi(0, 'foo');
+ test_multi
+ ------------
+ 0
+ (1 row)
+
+ select test_multi(1, 'foo');
+ test_multi
+ ------------
+ 1
+ (1 row)
+
+ select test_multi(2, 'foo');
+ test_multi
+ ------------
+ 2
+ (1 row)
+
+ select test_multi(3, 'foo');
+ test_multi
+ ------------
+ 3
+ (1 row)
+
+ -- test RUN ON ALL
+ drop function test_multi(integer, text);
+ create function test_multi(part integer, username text)
+ returns setof integer as $$ cluster 'testcluster'; run on all; $$ language plproxy;
+ select test_multi(0, 'foo');
+ test_multi
+ ------------
+ 0
+ 1
+ 2
+ 3
+ (4 rows)
+
+ -- test RUN ON 2
+ drop function test_multi(integer, text);
+ create function test_multi(part integer, username text)
+ returns setof integer as $$ cluster 'testcluster'; run on 2; $$ language plproxy;
+ select test_multi(0, 'foo');
+ test_multi
+ ------------
+ 2
+ (1 row)
+
+ -- test RUN ON RANDOM
+ select setseed(0);
+ setseed
+ ---------
+
+ (1 row)
+
+ drop function test_multi(integer, text);
+ create function test_multi(part integer, username text)
+ returns setof integer as $$ cluster 'testcluster'; run on any; $$ language plproxy;
+ select test_multi(0, 'foo');
+ test_multi
+ ------------
+ 3
+ (1 row)
+
+ select test_multi(0, 'foo');
+ test_multi
+ ------------
+ 2
+ (1 row)
+
+ select test_multi(0, 'foo');
+ test_multi
+ ------------
+ 1
+ (1 row)
+
+ select test_multi(0, 'foo');
+ test_multi
+ ------------
+ 3
+ (1 row)
+
*** a/src/pl/plproxy/expected/plproxy_select.out
--- b/src/pl/plproxy/expected/plproxy_select.out
***************
*** 0 ****
--- 1,37 ----
+ -- test regular sql
+ create function test_select(xuser text, tmp boolean)
+ returns integer as $x$
+ cluster 'testcluster';
+ run on hashtext(xuser);
+ select /*********
+ junk ;
+ ********** ****/ id from sel_test where username = xuser
+ and ';' <> 'as;d''a ; sd'
+ and $tmp$ ; 'a' $tmp$ <> 'as;d''a ; sd'
+ and $tmp$ $ $$ $foo$tmp$ <> 'x';
+ $x$ language plproxy;
+ \c test_part
+ create table sel_test (
+ id integer,
+ username text
+ );
+ insert into sel_test values ( 1, 'user');
+ \c regression
+ select * from test_select('user', true);
+ test_select
+ -------------
+ 1
+ (1 row)
+
+ select * from test_select('xuser', false);
+ ERROR: PL/Proxy function public.test_select(2): bug: no result
+ -- test errors
+ create function test_select_err(xuser text, tmp boolean)
+ returns integer as $$
+ cluster 'testcluster';
+ run on hashtext(xuser);
+ select id from sel_test where username = xuser;
+ select id from sel_test where username = xuser;
+ $$ language plproxy;
+ select * from test_select_err('user', true);
+ ERROR: PL/Proxy function public.test_select_err(2): Compile error at line 5: Only one SELECT statement allowed
*** a/src/pl/plproxy/expected/plproxy_test.out
--- b/src/pl/plproxy/expected/plproxy_test.out
***************
*** 0 ****
--- 1,312 ----
+ -- test normal function
+ create function testfunc(username text, id integer, data text)
+ returns text as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy;
+ \c test_part
+ create function testfunc(username text, id integer, data text)
+ returns text as $$ begin return 'username=' || username; end; $$ language plpgsql;
+ \c regression
+ select * from testfunc('user', 1, 'foo');
+ testfunc
+ ---------------
+ username=user
+ (1 row)
+
+ select * from testfunc('user', 1, 'foo');
+ testfunc
+ ---------------
+ username=user
+ (1 row)
+
+ select * from testfunc('user', 1, 'foo');
+ testfunc
+ ---------------
+ username=user
+ (1 row)
+
+ -- test setof text
+ create function test_set(username text, num integer)
+ returns setof text as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy;
+ \c test_part
+ create function test_set(username text, num integer)
+ returns setof text as $$
+ declare i integer;
+ begin
+ i := 0;
+ while i < num loop
+ return next 'username=' || username || ' row=' || i;
+ i := i + 1;
+ end loop;
+ return;
+ end; $$ language plpgsql;
+ \c regression
+ select * from test_set('user', 1);
+ test_set
+ ---------------------
+ username=user row=0
+ (1 row)
+
+ select * from test_set('user', 0);
+ test_set
+ ----------
+ (0 rows)
+
+ select * from test_set('user', 3);
+ test_set
+ ---------------------
+ username=user row=0
+ username=user row=1
+ username=user row=2
+ (3 rows)
+
+ -- test record
+ create type ret_test_rec as ( id integer, dat text);
+ create function test_record(username text, num integer)
+ returns ret_test_rec as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy;
+ \c test_part
+ create type ret_test_rec as ( id integer, dat text);
+ create function test_record(username text, num integer)
+ returns ret_test_rec as $$
+ declare ret ret_test_rec%rowtype;
+ begin
+ ret := (num, username);
+ return ret;
+ end; $$ language plpgsql;
+ \c regression
+ select * from test_record('user', 3);
+ id | dat
+ ----+------
+ 3 | user
+ (1 row)
+
+ -- test setof record
+ create function test_record_set(username text, num integer)
+ returns setof ret_test_rec as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy;
+ \c test_part
+ create function test_record_set(username text, num integer)
+ returns setof ret_test_rec as $$
+ declare ret ret_test_rec%rowtype; i integer;
+ begin
+ i := 0;
+ while i < num loop
+ ret := (i, username);
+ i := i + 1;
+ return next ret;
+ end loop;
+ return;
+ end; $$ language plpgsql;
+ \c regression
+ select * from test_record_set('user', 1);
+ id | dat
+ ----+------
+ 0 | user
+ (1 row)
+
+ select * from test_record_set('user', 0);
+ id | dat
+ ----+-----
+ (0 rows)
+
+ select * from test_record_set('user', 3);
+ id | dat
+ ----+------
+ 0 | user
+ 1 | user
+ 2 | user
+ (3 rows)
+
+ -- test void
+ create function test_void(username text, num integer)
+ returns void as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy;
+ \c test_part
+ create function test_void(username text, num integer)
+ returns void as $$
+ begin
+ return;
+ end; $$ language plpgsql;
+ -- look what void actually looks
+ select * from test_void('void', 2);
+ test_void
+ -----------
+
+ (1 row)
+
+ select test_void('void', 2);
+ test_void
+ -----------
+
+ (1 row)
+
+ \c regression
+ select * from test_void('user', 1);
+ test_void
+ -----------
+
+ (1 row)
+
+ select * from test_void('user', 3);
+ test_void
+ -----------
+
+ (1 row)
+
+ select test_void('user', 3);
+ test_void
+ -----------
+
+ (1 row)
+
+ select test_void('user', 3);
+ test_void
+ -----------
+
+ (1 row)
+
+ -- test normal outargs
+ create function test_out1(username text, id integer, out data text)
+ as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy;
+ \c test_part
+ create function test_out1(username text, id integer, out data text)
+ returns text as $$ begin data := 'username=' || username; return; end; $$ language plpgsql;
+ \c regression
+ select * from test_out1('user', 1);
+ data
+ ---------------
+ username=user
+ (1 row)
+
+ -- test complicated outargs
+ create function test_out2(username text, id integer, out out_id integer, xdata text, inout xdata2 text, out odata text)
+ as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy;
+ \c test_part
+ create function test_out2(username text, id integer, out out_id integer, xdata text, inout xdata2 text, out odata text)
+ as $$ begin
+ out_id = id;
+ xdata2 := xdata2 || xdata;
+ odata := 'username=' || username;
+ return;
+ end; $$ language plpgsql;
+ \c regression
+ select * from test_out2('user', 1, 'xdata', 'xdata2');
+ out_id | xdata2 | odata
+ --------+-------------+---------------
+ 1 | xdata2xdata | username=user
+ (1 row)
+
+ -- test various types
+ create function test_types(username text, inout vbool boolean, inout xdate timestamp, inout bin bytea)
+ as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy;
+ \c test_part
+ create function test_types(username text, inout vbool boolean, inout xdate timestamp, inout bin bytea)
+ as $$ begin return; end; $$ language plpgsql;
+ \c regression
+ select * from test_types('types', true, '2009-11-04 12:12:02', E'a\\000\\001\\002b');
+ vbool | xdate | bin
+ -------+--------------------------+----------------
+ t | Wed Nov 04 12:12:02 2009 | a\000\001\002b
+ (1 row)
+
+ select * from test_types('types', NULL, NULL, NULL);
+ vbool | xdate | bin
+ -------+-------+-----
+ | |
+ (1 row)
+
+ -- test user defined types
+ create domain posint as int4 check (value > 0);
+ create type struct as (id int4, data text);
+ create function test_types2(username text, inout v_posint posint, inout v_struct struct, inout arr int8[])
+ as $$ cluster 'testcluster'; run on 0; $$ language plproxy;
+ \c test_part
+ create domain posint as int4 check (value > 0);
+ create type struct as (id int4, data text);
+ create function test_types2(username text, inout v_posint posint, inout v_struct struct, inout arr int8[])
+ as $$ begin return; end; $$ language plpgsql;
+ \c regression
+ select * from test_types2('types', 4, (2, 'asd'), array[1,2,3]);
+ v_posint | v_struct | arr
+ ----------+----------+---------
+ 4 | (2,asd) | {1,2,3}
+ (1 row)
+
+ select * from test_types2('types', NULL, NULL, NULL);
+ v_posint | v_struct | arr
+ ----------+----------+-----
+ | (,) |
+ (1 row)
+
+ -- test CONNECT
+ create function test_connect1() returns text
+ as $$ connect 'dbname=test_part'; select current_database(); $$ language plproxy;
+ select * from test_connect1();
+ test_connect1
+ ---------------
+ test_part
+ (1 row)
+
+ -- test quoting function
+ create type "RetWeird" as (
+ "ColId" int4,
+ "ColData" text
+ );
+ create function "testQuoting"(username text, id integer, data text)
+ returns "RetWeird" as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy;
+ \c test_part
+ create type "RetWeird" as (
+ "ColId" int4,
+ "ColData" text
+ );
+ create function "testQuoting"(username text, id integer, data text)
+ returns "RetWeird" as $$ select 1::int4, 'BazOoka'::text $$ language sql;
+ \c regression
+ select * from "testQuoting"('user', '1', 'dat');
+ ColId | ColData
+ -------+---------
+ 1 | BazOoka
+ (1 row)
+
+ -- test hash types function
+ create or replace function t_hash16(int4) returns int2 as $$
+ declare
+ res int2;
+ begin
+ res = $1::int2;
+ return res;
+ end;
+ $$ language plpgsql;
+ create or replace function t_hash64(int4) returns int8 as $$
+ declare
+ res int8;
+ begin
+ res = $1;
+ return res;
+ end;
+ $$ language plpgsql;
+ create function test_hash16(id integer, data text)
+ returns text as $$ cluster 'testcluster'; run on t_hash16(id); select data; $$ language plproxy;
+ select * from test_hash16('0', 'hash16');
+ test_hash16
+ -------------
+ hash16
+ (1 row)
+
+ create function test_hash64(id integer, data text)
+ returns text as $$ cluster 'testcluster'; run on t_hash64(id); select data; $$ language plproxy;
+ select * from test_hash64('0', 'hash64');
+ test_hash64
+ -------------
+ hash64
+ (1 row)
+
+ -- test argument difference
+ \c test_part
+ create function test_difftypes(username text, out val1 int2, out val2 float8)
+ as $$ begin val1 = 1; val2 = 3;return; end; $$ language plpgsql;
+ \c regression
+ create function test_difftypes(username text, out val1 int4, out val2 float4)
+ as $$ cluster 'testcluster'; run on 0; $$ language plproxy;
+ select * from test_difftypes('types');
+ val1 | val2
+ ------+------
+ 1 | 3
+ (1 row)
+
*** a/src/pl/plproxy/function.c
--- b/src/pl/plproxy/function.c
***************
*** 0 ****
--- 1,479 ----
+ /*
+ * PL/Proxy - easy access to partitioned database.
+ *
+ * Copyright (c) 2006 Sven Suursoho, Skype Technologies OÜ
+ * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+ /*
+ * Function compilation and caching.
+ *
+ * Functions here are called with CurrentMemoryContext == SPI Proc context.
+ * They switch to per-function context only during allocations.
+ */
+
+ #include "plproxy.h"
+
+
+ /*
+ * Function cache entry.
+ *
+ * As PL/Proxy does not do trigger functions,
+ * it's enough to index just on OID.
+ *
+ * This structure is kept in HTAB's context; 'function' points to
+ * a ProxyFunction that lives in its own per-function memory context.
+ */
+ typedef struct
+ {
+ /* Key value (function OID). Must be at the start: the hash table uses the leading keysize bytes as the key */
+ Oid oid;
+ /* Pointer to compiled function data */
+ ProxyFunction *function;
+ } HashEntry;
+
+ /*
+  * Compiled-function cache, keyed by function OID.
+  * Created once by plproxy_function_cache_init().
+  */
+ static HTAB *fn_cache = NULL;
+
+ /*
+  * Function currently being compiled, if any.
+  *
+  * Keeping a reference here avoids leaking a half-built function
+  * when compilation throws an error.
+  */
+ static ProxyFunction *partial_func = NULL;
+
+
+
+ /*
+  * Allocate 'size' bytes in the function's private memory context
+  * (func->ctx), so the memory lives exactly as long as the function.
+  */
+ void *
+ plproxy_func_alloc(ProxyFunction *func, int size)
+ {
+     MemoryContext owner = func->ctx;
+
+     return MemoryContextAlloc(owner, size);
+ }
+
+ /*
+  * Duplicate a NUL-terminated string into the function's memory context.
+  */
+ char *
+ plproxy_func_strdup(ProxyFunction *func, const char *s)
+ {
+     int nbytes = strlen(s) + 1;    /* include the terminating NUL */
+     char *copy;
+
+     copy = plproxy_func_alloc(func, nbytes);
+     memcpy(copy, s, nbytes);
+     return copy;
+ }
+
+
+ /*
+  * Create the PL/Proxy function cache.
+  *
+  * Must be called only once; the Assert rejects re-initialization in
+  * assert-enabled builds.  The table is keyed by Oid, hashed with
+  * oid_hash, and created with an initial size estimate of 128 entries.
+  */
+ void
+ plproxy_function_cache_init(void)
+ {
+     HASHCTL hash_ctl;
+     const int initial_size = 128;
+
+     Assert(fn_cache == NULL);
+
+     MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+     hash_ctl.keysize = sizeof(Oid);
+     hash_ctl.entrysize = sizeof(HashEntry);
+     hash_ctl.hash = oid_hash;
+
+     fn_cache = hash_create("PL/Proxy function cache", initial_size,
+                            &hash_ctl, HASH_ELEM | HASH_FUNCTION);
+ }
+
+
+ /*
+  * Look up a compiled function by OID.
+  * Returns NULL when the function is not cached.
+  */
+ static ProxyFunction *
+ fn_cache_lookup(Oid fn_oid)
+ {
+     HashEntry *entry = hash_search(fn_cache, &fn_oid, HASH_FIND, NULL);
+
+     return (entry != NULL) ? entry->function : NULL;
+ }
+
+
+ /*
+  * Add a compiled function to the cache under its OID.
+  * The OID must not already be present (checked by Assert only).
+  */
+ static void
+ fn_cache_insert(ProxyFunction *func)
+ {
+     bool found;
+     HashEntry *entry;
+
+     entry = hash_search(fn_cache, &func->oid, HASH_ENTER, &found);
+     Assert(!found);
+
+     entry->function = func;
+ }
+
+
+ /*
+  * Remove a function's entry from the cache.
+  *
+  * The entry is expected to exist; this is verified only in assert-enabled
+  * builds.  Only the hash entry is removed; the ProxyFunction itself is
+  * not freed here.
+  */
+ static void
+ fn_cache_delete(ProxyFunction *func)
+ {
+     HashEntry *hentry;
+
+     hentry = hash_search(fn_cache, &func->oid, HASH_REMOVE, NULL);
+     Assert(hentry != NULL);
+     /* only read by Assert(); avoid set-but-unused warnings otherwise */
+     (void) hentry;
+ }
+
+ /*
+  * Does the function return an untyped RECORD?
+  *
+  * True when the declared return type is RECORD and either the argument
+  * modes or the argument names are missing from pg_proc.  Such a record
+  * has no row type of its own, so the caller must supply an AS (...) clause.
+  */
+ static bool
+ fn_returns_dynamic_record(HeapTuple proc_tuple)
+ {
+     Form_pg_proc proc = (Form_pg_proc) GETSTRUCT(proc_tuple);
+     bool has_arg_info;
+
+     if (proc->prorettype != RECORDOID)
+         return false;
+
+     has_arg_info = !heap_attisnull(proc_tuple, Anum_pg_proc_proargmodes)
+         && !heap_attisnull(proc_tuple, Anum_pg_proc_proargnames);
+
+     return !has_arg_info;
+ }
+
+ /*
+  * Create an empty ProxyFunction for the called function.
+  *
+  * A dedicated small memory context is created under TopMemoryContext.
+  * The struct is zero-allocated in it and filled with the context, the
+  * function OID, the pg_proc stamp and the dynamic-RECORD flag.  The
+  * caller's memory context is restored before returning.
+  */
+ static ProxyFunction *
+ fn_new(FunctionCallInfo fcinfo, HeapTuple proc_tuple)
+ {
+     MemoryContext func_ctx;
+     MemoryContext saved_ctx;
+     ProxyFunction *func;
+
+     func_ctx = AllocSetContextCreate(TopMemoryContext,
+                                      "PL/Proxy function context",
+                                      ALLOCSET_SMALL_MINSIZE,
+                                      ALLOCSET_SMALL_INITSIZE,
+                                      ALLOCSET_SMALL_MAXSIZE);
+     saved_ctx = MemoryContextSwitchTo(func_ctx);
+
+     func = palloc0(sizeof(*func));
+     func->ctx = func_ctx;
+     func->oid = fcinfo->flinfo->fn_oid;
+     plproxy_set_stamp(&func->stamp, proc_tuple);
+
+     /* palloc0 left the flag at zero; only set it when needed */
+     if (fn_returns_dynamic_record(proc_tuple))
+         func->dynamic_record = 1;
+
+     MemoryContextSwitchTo(saved_ctx);
+     return func;
+ }
+
+
+ /*
+  * Destroy a function and everything it owns.
+  *
+  * If 'in_cache' is true the function is first unlinked from the cache.
+  * The cached hash and cluster plans are then freed explicitly, and
+  * finally the per-function memory context (holding the struct itself)
+  * is deleted.
+  */
+ static void
+ fn_delete(ProxyFunction *func, bool in_cache)
+ {
+     if (in_cache)
+         fn_cache_delete(func);
+
+     /* cached plans are released before the context goes away */
+     plproxy_query_freeplan(func->hash_sql);
+     plproxy_query_freeplan(func->cluster_sql);
+
+     /* func itself lives in func->ctx, so this must come last */
+     MemoryContextDelete(func->ctx);
+ }
+
+ /*
+  * Build the quoted, schema-qualified function name ("schema"."name")
+  * and store it in func->name, allocated in the function's context.
+  *
+  * NOTE(review): func->name is not set yet when the namespace lookup
+  * fails; confirm plproxy_error() tolerates that.
+  */
+ static void
+ fn_set_name(ProxyFunction *func, HeapTuple proc_tuple)
+ {
+     /* each quoted name may double in size, plus quotes, dot and NUL */
+     char buf[NAMEDATALEN * 4 + 2 + 1 + 2 + 1];
+     Form_pg_proc proc = (Form_pg_proc) GETSTRUCT(proc_tuple);
+     Oid ns_oid = proc->pronamespace;
+     HeapTuple ns_tuple;
+     Form_pg_namespace ns;
+
+     ns_tuple = SearchSysCache(NAMESPACEOID,
+                               ObjectIdGetDatum(ns_oid), 0, 0, 0);
+     if (!HeapTupleIsValid(ns_tuple))
+         plproxy_error(func, "Cannot find namespace %u", ns_oid);
+     ns = (Form_pg_namespace) GETSTRUCT(ns_tuple);
+
+     snprintf(buf, sizeof(buf), "%s.%s",
+              quote_identifier(NameStr(ns->nspname)),
+              quote_identifier(NameStr(proc->proname)));
+     func->name = plproxy_func_strdup(func, buf);
+
+     ReleaseSysCache(ns_tuple);
+ }
+
+ /*
+  * Fetch the function's source text from pg_proc.prosrc and hand it to
+  * the PL/Proxy parser.  A NULL source is reported as an error.
+  */
+ static void
+ fn_parse(ProxyFunction *func, HeapTuple proc_tuple)
+ {
+     Datum prosrc;
+     Datum detoasted;
+     bool src_isnull;
+
+     prosrc = SysCacheGetAttr(PROCOID, proc_tuple, Anum_pg_proc_prosrc, &src_isnull);
+     if (src_isnull)
+         plproxy_error(func, "procedure source datum is null");
+
+     detoasted = PointerGetDatum(PG_DETOAST_DATUM_PACKED(prosrc));
+
+     plproxy_run_parser(func, VARDATA_ANY(detoasted), VARSIZE_ANY_EXHDR(detoasted));
+
+     /* free only a copy made by detoasting, never the cached original */
+     if (detoasted != prosrc)
+         pfree(DatumGetPointer(detoasted));
+ }
+
+ /*
+  * Collect type info and names of the function's input arguments.
+  *
+  * Fills func->arg_types / func->arg_names (allocated in the function's
+  * context) and func->arg_count.  Output-only columns are skipped: OUT
+  * parameters ('o') and, on servers that support RETURNS TABLE, its
+  * result columns ('t').  Callers do not pass either kind, so counting
+  * them as inputs would misalign argument positions.  IN and INOUT
+  * arguments are kept.
+  */
+ static void
+ fn_get_arguments(ProxyFunction *func,
+                  FunctionCallInfo fcinfo,
+                  HeapTuple proc_tuple)
+ {
+     Oid *types;
+     char **names,
+          *modes;
+     int i,
+         pos,
+         total;
+     ProxyType *type;
+
+     total = get_func_arg_info(proc_tuple, &types, &names, &modes);
+
+     /* sized for all args; only input args are actually stored */
+     func->arg_types = plproxy_func_alloc(func, sizeof(ProxyType *) * total);
+     func->arg_names = plproxy_func_alloc(func, sizeof(char *) * total);
+     func->arg_count = 0;
+
+     for (i = 0; i < total; i++)
+     {
+         /* modes == NULL means every argument is IN */
+         if (modes && (modes[i] == 'o' || modes[i] == 't'))
+             continue;
+         type = plproxy_find_type_info(func, types[i], 1);
+         pos = func->arg_count++;
+         func->arg_types[pos] = type;
+         if (names && names[i])
+             func->arg_names[pos] = plproxy_func_strdup(func, names[i]);
+         else
+             func->arg_names[pos] = NULL;
+     }
+ }
+
+ /*
+  * Get info about return type.
+  *
+  * Fills one of ret_scalar or ret_composite.  For a composite result it
+  * also allocates func->result_map with one int per result column; for a
+  * scalar result result_map is set to NULL.  RECORD and OTHER result
+  * classes are rejected with plproxy_error().
+  *
+  * fcinfo and proc_tuple: only fcinfo is used here (for
+  * get_call_result_type()).
+  */
+ static void
+ fn_get_return_type(ProxyFunction *func,
+ FunctionCallInfo fcinfo,
+ HeapTuple proc_tuple)
+ {
+ Oid ret_oid;
+ TupleDesc ret_tup;
+ TypeFuncClass rtc;
+ MemoryContext old_ctx;
+ int natts;
+
+
+ /*
+  * get_call_result_type() will return newly allocated tuple,
+  * except in case of untyped RECORD functions.
+  *
+  * The lookup runs with func->ctx active so that a newly allocated
+  * descriptor lives as long as the function; for untyped RECORD the
+  * descriptor is not ours, so it is explicitly copied into func->ctx.
+  */
+ old_ctx = MemoryContextSwitchTo(func->ctx);
+ rtc = get_call_result_type(fcinfo, &ret_oid, &ret_tup);
+ if (func->dynamic_record && ret_tup)
+ ret_tup = CreateTupleDescCopy(ret_tup);
+ MemoryContextSwitchTo(old_ctx);
+
+ switch (rtc)
+ {
+ case TYPEFUNC_COMPOSITE:
+ /* result_map contents are filled in per result, elsewhere */
+ func->ret_composite = plproxy_composite_info(func, ret_tup);
+ natts = func->ret_composite->tupdesc->natts;
+ func->result_map = plproxy_func_alloc(func, natts * sizeof(int));
+ break;
+ case TYPEFUNC_SCALAR:
+ /* third arg 0: type info is for receiving (for_send = false) */
+ func->ret_scalar = plproxy_find_type_info(func, ret_oid, 0);
+ func->result_map = NULL;
+ break;
+ case TYPEFUNC_RECORD:
+ case TYPEFUNC_OTHER:
+ /* fixme: void type here? */
+ plproxy_error(func, "unsupported type");
+ break;
+ }
+ }
+
+ /*
+  * Check if cached ->ret_composite is still valid for this call and
+  * rebuild it if not.
+  *
+  * Used for cached functions returning untyped RECORD, where every call
+  * site supplies its own column list.  When the current call's row
+  * descriptor differs from the cached one, the composite info, result_map
+  * and the generated remote query are all rebuilt in func->ctx.
+  */
+ static void
+ fn_refresh_record(FunctionCallInfo fcinfo,
+ 				  ProxyFunction *func,
+ 				  HeapTuple proc_tuple)
+ {
+ 	TypeFuncClass rtc;
+ 	TupleDesc	tuple_current,
+ 				tuple_cached;
+ 	MemoryContext old_ctx;
+ 	ProxyQuery *old_sql;
+ 	int			natts;
+
+ 	/*
+ 	 * Compare cached tuple to current one.
+ 	 */
+ 	tuple_cached = func->ret_composite->tupdesc;
+ 	rtc = get_call_result_type(fcinfo, NULL, &tuple_current);
+
+ 	/*
+ 	 * Checked at runtime rather than with Assert(): with asserts compiled
+ 	 * out, a non-composite result would hand a NULL descriptor to
+ 	 * equalTupleDescs() (and rtc would be an unused variable).
+ 	 */
+ 	if (rtc != TYPEFUNC_COMPOSITE || tuple_current == NULL)
+ 		plproxy_error(func, "dynamic RECORD result type could not be resolved");
+
+ 	if (equalTupleDescs(tuple_current, tuple_cached))
+ 		return;
+
+ 	/* move to function context */
+ 	old_ctx = MemoryContextSwitchTo(func->ctx);
+ 	tuple_current = CreateTupleDescCopy(tuple_current);
+ 	MemoryContextSwitchTo(old_ctx);
+
+ 	/* release old data */
+ 	plproxy_free_composite(func->ret_composite);
+ 	pfree(func->result_map);
+
+ 	/*
+ 	 * Free the generated query's members as well as the struct itself;
+ 	 * freeing only the struct leaks the SQL text and argument map into
+ 	 * the long-lived function context on every column-list change.
+ 	 * Dynamic RECORD functions cannot have an explicit SELECT, so this is
+ 	 * always a query built by plproxy_standard_query().
+ 	 */
+ 	old_sql = func->remote_sql;
+ 	func->remote_sql = NULL;
+ 	if (old_sql)
+ 	{
+ 		if (old_sql->sql)
+ 			pfree(old_sql->sql);
+ 		if (old_sql->arg_lookup)
+ 			pfree(old_sql->arg_lookup);
+ 		pfree(old_sql);
+ 	}
+
+ 	/* construct new data */
+ 	func->ret_composite = plproxy_composite_info(func, tuple_current);
+ 	natts = func->ret_composite->tupdesc->natts;
+ 	func->result_map = plproxy_func_alloc(func, natts * sizeof(int));
+ 	func->remote_sql = plproxy_standard_query(func, true);
+ }
+
+ /*
+  * Uncached path of compilation: build a new ProxyFunction from the
+  * pg_proc tuple, parse its body and prepare its local queries.
+  *
+  * The newly created function is published in partial_func before any
+  * step that can fail, so an error half-way can be cleaned up by the next
+  * compile.  The validate flag is currently not used in this function.
+  */
+ static ProxyFunction *
+ fn_compile(FunctionCallInfo fcinfo,
+ HeapTuple proc_tuple,
+ bool validate)
+ {
+ ProxyFunction *f;
+ Form_pg_proc proc_struct;
+
+ /* PL/Proxy functions must be declared VOLATILE ('v') */
+ proc_struct = (Form_pg_proc) GETSTRUCT(proc_tuple);
+ if (proc_struct->provolatile != 'v')
+ elog(ERROR, "PL/Proxy functions must be volatile");
+
+ f = fn_new(fcinfo, proc_tuple);
+
+ /* keep reference in case of error half-way */
+ partial_func = f;
+
+ /*
+  * info from system tables
+  *
+  * Return type and arguments must be known before parsing: query
+  * building during the parse uses the argument count/names, and the
+  * default query below uses the return type info.
+  */
+ fn_set_name(f, proc_tuple);
+ fn_get_return_type(f, fcinfo, proc_tuple);
+ fn_get_arguments(f, fcinfo, proc_tuple);
+
+ /* parse body */
+ fn_parse(f, proc_tuple);
+
+ /* untyped RECORD needs a generated query that carries the AS (...) list */
+ if (f->dynamic_record && f->remote_sql)
+ plproxy_error(f, "SELECT statement not allowed for dynamic RECORD functions");
+
+ /* create SELECT stmt if not specified */
+ if (f->remote_sql == NULL)
+ f->remote_sql = plproxy_standard_query(f, true);
+
+ /* prepare local queries (cluster name resolution, hashing) */
+ if (f->cluster_sql)
+ plproxy_query_prepare(f, fcinfo, f->cluster_sql);
+ if (f->hash_sql)
+ plproxy_query_prepare(f, fcinfo, f->hash_sql);
+
+ /* sanity check: several partitions can only be returned as a set */
+ if (f->run_type == R_ALL && !fcinfo->flinfo->fn_retset)
+ plproxy_error(f, "RUN ON ALL requires set-returning function");
+
+ return f;
+ }
+
+ /*
+  * Compile and cache PL/Proxy function.
+  *
+  * Looks up the function being called (fcinfo->flinfo->fn_oid) in the
+  * function cache.  A cached entry whose stamp no longer matches the
+  * current pg_proc row is deleted and recompiled.  For a cached function
+  * returning untyped RECORD, the result descriptor is re-checked on every
+  * call.  validate is only passed on to fn_compile().
+  */
+ ProxyFunction *
+ plproxy_compile(FunctionCallInfo fcinfo, bool validate)
+ {
+ ProxyFunction *f;
+ HeapTuple proc_tuple;
+ Oid oid;
+
+ /*
+  * clean interrupted compile: fn_compile() leaves a half-built function
+  * in partial_func if it errors out before the entry is cached
+  */
+ if (partial_func)
+ {
+ fn_delete(partial_func, false);
+ partial_func = NULL;
+ }
+
+ /* get current fn oid */
+ oid = fcinfo->flinfo->fn_oid;
+
+ /* lookup the pg_proc tuple; released at the end of this function */
+ proc_tuple = SearchSysCache(PROCOID, ObjectIdGetDatum(oid), 0, 0, 0);
+ if (!HeapTupleIsValid(proc_tuple))
+ elog(ERROR, "cache lookup failed for function %u", oid);
+
+ /* fn_extra not used, do lookup */
+ f = fn_cache_lookup(oid);
+
+ /* if cached, is it still valid? (function may have been replaced) */
+ if (f && !plproxy_check_stamp(&f->stamp, proc_tuple))
+ {
+ fn_delete(f, true);
+ f = NULL;
+ }
+
+ if (!f)
+ {
+ f = fn_compile(fcinfo, proc_tuple, validate);
+
+ fn_cache_insert(f);
+
+ /* now its safe to drop reference */
+ partial_func = NULL;
+ }
+ else if (f->dynamic_record)
+ {
+ /* in case of untyped RECORD, check if cached type is valid */
+ fn_refresh_record(fcinfo, f, proc_tuple);
+ }
+
+ ReleaseSysCache(proc_tuple);
+
+ return f;
+ }
*** a/src/pl/plproxy/main.c
--- b/src/pl/plproxy/main.c
***************
*** 0 ****
--- 1,214 ----
+ /*
+ * PL/Proxy - easy access to partitioned database.
+ *
+ * Copyright (c) 2006 Sven Suursoho, Skype Technologies OÜ
+ * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+ /*
+ * External interface for PostgreSQL core.
+ *
+ * List of memory contexts that are touched by this code:
+ *
+ * - Query context that is active when plproxy_call_handler is called.
+ * Function results should be allocated from here.
+ *
+ * - SPI Proc context that activates in SPI_connect() and is freed
+ * in SPI_finish(). This is used for compile-time short-term storage.
+ *
+ * - HTAB has its own memory context.
+ *
+ * - ProxyFunction->ctx for long-term allocations for functions.
+ *
+ * - cluster_mem where info about clusters is stored.
+ *
+ * - SPI_saveplan() stores plan info in separate context,
+ * so it must be freed explicitly.
+ *
+ * - libpq uses malloc() so it must be freed explicitly
+ *
+ * Because SPI functions do not honour CurrentMemoryContext
+ * and code should not have assumptions whether core
+ * functions do allocations or not, the per-function and
+ * cluster MemoryContext is switched on only when doing actual
+ * allocations. Otherwise the default context is kept.
+ */
+
+ #include "plproxy.h"
+
+ #include
+
+ /* mark this shared library as a PostgreSQL loadable module */
+ PG_MODULE_MAGIC;
+
+ /* the call handler uses the version-1 calling convention */
+ PG_FUNCTION_INFO_V1(plproxy_call_handler);
+
+ /*
+  * Centralised error reporting.
+  *
+  * Formats the message (truncated to 1023 bytes), frees any pending
+  * results of the function's current cluster, then raises ERROR via
+  * elog(), so it does not return to the caller.
+  *
+  * The error can be raised early in compilation, before func->name has
+  * been filled in (e.g. when the namespace lookup for the name itself
+  * fails), so a placeholder is printed instead of passing NULL to %s.
+  */
+ void
+ plproxy_error(ProxyFunction *func, const char *fmt,...)
+ {
+ 	char		msg[1024];
+ 	va_list		ap;
+
+ 	va_start(ap, fmt);
+ 	vsnprintf(msg, sizeof(msg), fmt, ap);
+ 	va_end(ap);
+
+ 	plproxy_clean_results(func->cur_cluster);
+
+ 	elog(ERROR, "PL/Proxy function %s(%d): %s",
+ 		 func->name ? func->name : "<unknown>",
+ 		 func->arg_count, msg);
+ }
+
+ /*
+  * One-time library initialization: set up the function and cluster
+  * caches.
+  *
+  * Called while SPI is connected, which keeps the setup code simple.
+  * Every call after the first one does nothing.
+  */
+ static void
+ plproxy_startup_init(void)
+ {
+ 	static bool done = false;
+
+ 	if (!done)
+ 	{
+ 		plproxy_function_cache_init();
+ 		plproxy_cluster_cache_init();
+ 		done = true;
+ 	}
+ }
+
+ /*
+  * Regular maintenance over all clusters.
+  *
+  * Runs plproxy_cluster_maint() at most once per PLPROXY_MAINT_PERIOD
+  * seconds (the same constant the header documents as the maintenance
+  * period).  If the wall clock has moved backwards since the last run,
+  * maintenance runs immediately rather than being postponed until the
+  * clock catches up with the old timestamp.
+  */
+ static void
+ run_maint(void)
+ {
+ 	static struct timeval last = {0, 0};
+ 	struct timeval now;
+
+ 	gettimeofday(&now, NULL);
+ 	if (now.tv_sec >= last.tv_sec &&
+ 		now.tv_sec - last.tv_sec < PLPROXY_MAINT_PERIOD)
+ 		return;
+ 	last = now;
+
+ 	plproxy_cluster_maint(&now);
+ }
+
+ /*
+  * Do compilation and execution under SPI.
+  *
+  * Connects to SPI, performs one-time initialization, compiles (or fetches
+  * from cache) the called function, resolves the cluster, and runs the
+  * remote query, leaving the results attached to func->cur_cluster.
+  * Result conversion will be done without SPI, by the caller.
+  *
+  * Returns the (cached) ProxyFunction; errors are raised via ERROR.
+  */
+ static ProxyFunction *
+ compile_and_execute(FunctionCallInfo fcinfo)
+ {
+ int err;
+ ProxyFunction *func;
+ ProxyCluster *cluster;
+
+ /* prepare SPI */
+ err = SPI_connect();
+ if (err != SPI_OK_CONNECT)
+ elog(ERROR, "SPI_connect: %s", SPI_result_code_string(err));
+
+ /* do the initialization also under SPI */
+ plproxy_startup_init();
+
+ /* compile code */
+ func = plproxy_compile(fcinfo, false);
+
+ /* get actual cluster to run on */
+ cluster = plproxy_find_cluster(func, fcinfo);
+
+ /* fetch PGresults; they stay attached to the cluster for the caller */
+ func->cur_cluster = cluster;
+ plproxy_exec(func, fcinfo);
+
+ /* done with SPI */
+ err = SPI_finish();
+ if (err != SPI_OK_FINISH)
+ elog(ERROR, "SPI_finish: %s", SPI_result_code_string(err));
+
+ return func;
+ }
+
+ /*
+  * Set-returning function support, value-per-call mode.
+  *
+  * On the first call the function is compiled and executed and the
+  * ProxyFunction is remembered in the SRF context.  Each call then
+  * returns one row; once the current cluster reports no rows left, the
+  * results are released and the set is finished.
+  */
+ static Datum
+ handle_ret_set(FunctionCallInfo fcinfo)
+ {
+ 	FuncCallContext *srf;
+ 	ProxyFunction *func;
+
+ 	if (SRF_IS_FIRSTCALL())
+ 	{
+ 		ProxyFunction *compiled = compile_and_execute(fcinfo);
+
+ 		srf = SRF_FIRSTCALL_INIT();
+ 		srf->user_fctx = compiled;
+ 	}
+
+ 	srf = SRF_PERCALL_SETUP();
+ 	func = srf->user_fctx;
+
+ 	if (func->cur_cluster->ret_total <= 0)
+ 	{
+ 		plproxy_clean_results(func->cur_cluster);
+ 		SRF_RETURN_DONE(srf);
+ 	}
+
+ 	SRF_RETURN_NEXT(srf, plproxy_result(func, fcinfo));
+ }
+
+ /*
+  * Call handler invoked by the PostgreSQL function manager for every
+  * PL/Proxy function call; main entry point for the rest of the code.
+  *
+  * Triggers are rejected.  Periodic cluster maintenance runs before each
+  * plain call and before the first call of a set-returning scan.
+  */
+ Datum
+ plproxy_call_handler(PG_FUNCTION_ARGS)
+ {
+ 	ProxyFunction *func;
+ 	Datum		result;
+ 	bool		is_srf;
+
+ 	if (CALLED_AS_TRIGGER(fcinfo))
+ 		elog(ERROR, "PL/Proxy procedures can't be used as triggers");
+
+ 	is_srf = fcinfo->flinfo->fn_retset;
+
+ 	/* maintenance (stale results, connection lifetime) */
+ 	if (!is_srf || SRF_IS_FIRSTCALL())
+ 		run_maint();
+
+ 	if (is_srf)
+ 		return handle_ret_set(fcinfo);
+
+ 	func = compile_and_execute(fcinfo);
+ 	result = plproxy_result(func, fcinfo);
+ 	plproxy_clean_results(func->cur_cluster);
+ 	return result;
+ }
*** a/src/pl/plproxy/parser.y
--- b/src/pl/plproxy/parser.y
***************
*** 0 ****
--- 1,203 ----
+ %{
+ /*
+ * PL/Proxy - easy access to partitioned database.
+ *
+ * Copyright (c) 2006 Sven Suursoho, Skype Technologies OÜ
+ * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+ #include "plproxy.h"
+
+ /* define scanner.c functions */
+ void plproxy_yy_scan_bytes(const char *bytes, int len);
+
+ /*
+  * avoid permanent allocations: malloc/free used by the generated parser
+  * are redirected to palloc/pfree, i.e. the current memory context
+  */
+ #define malloc palloc
+ #define free pfree
+
+ /* remove unused code (location printing; message translation wrapper) */
+ #define YY_LOCATION_PRINT(File, Loc) (0)
+ #define YY_(x) (x)
+
+ /* during parsing, keep reference to function here */
+ static ProxyFunction *xfunc;
+
+ /* remember what happened: set to 1 once the statement has been seen */
+ static int got_run, got_cluster, got_connect;
+
+ /* temporary query buffers, finished by plproxy_run_parser() */
+ static QueryBuffer *cluster_sql;
+ static QueryBuffer *select_sql;
+ static QueryBuffer *hash_sql;
+
+ /* points to one of the above ones; SQL tokens are appended here */
+ static QueryBuffer *cur_sql;
+
+ /* keep the resetting code together with variables */
+ static void reset_parser_vars(void)
+ {
+ got_run = got_cluster = got_connect = 0;
+ cur_sql = select_sql = cluster_sql = hash_sql = NULL;
+ xfunc = NULL;
+ }
+
+ %}
+
+ %name-prefix="plproxy_yy"
+
+ /* keyword and value tokens produced by the scanner */
+ %token CONNECT CLUSTER RUN ON ALL ANY SELECT
+ %token IDENT CONST NUMBER FNCALL STRING
+ %token SQLIDENT SQLPART
+
+ /* semantic values are the token texts */
+ %union
+ {
+ const char *str;
+ }
+
+ %%
+
+ /* function body: any sequence of statements */
+ body: | body stmt ;
+
+ stmt: cluster_stmt | run_stmt | select_stmt | connect_stmt ;
+
+ /*
+  * CONNECT 'connstr'; -- implies exact run type.
+  * NOTE: the duplicate check runs after connect_spec already stored the
+  * string; the error aborts compilation anyway.
+  */
+ connect_stmt: CONNECT connect_spec ';' {
+ if (got_connect)
+ yyerror("Only one CONNECT statement allowed");
+ xfunc->run_type = R_EXACT;
+ got_connect = 1; }
+ ;
+
+ connect_spec: STRING { xfunc->connect_str = plproxy_func_strdup(xfunc, $1); }
+ ;
+
+ /* CLUSTER 'name'; or CLUSTER func(args); */
+ cluster_stmt: CLUSTER cluster_spec ';' {
+ if (got_cluster)
+ yyerror("Only one CLUSTER statement allowed");
+ got_cluster = 1; }
+ ;
+
+ cluster_spec: cluster_name | cluster_func sql_token_list
+ ;
+
+ /* dynamic cluster name: builds a local "select <FNCALL text>..." query */
+ cluster_func: FNCALL { cluster_sql = plproxy_query_start(xfunc, false);
+ cur_sql = cluster_sql;
+ plproxy_query_add_const(cur_sql, "select ");
+ plproxy_query_add_const(cur_sql, $1); }
+ ;
+
+ cluster_name: STRING { xfunc->cluster_name = plproxy_func_strdup(xfunc, $1); }
+ ;
+
+ /* RUN ON hashfunc(args) | <number> | ANY | ALL; */
+ run_stmt: RUN ON run_spec ';' { if (got_run)
+ yyerror("Only one RUN statement allowed");
+ got_run = 1; }
+ ;
+
+ run_spec: hash_func sql_token_list { xfunc->run_type = R_HASH; }
+ | NUMBER { xfunc->run_type = R_EXACT; xfunc->exact_nr = atoi($1); }
+ | ANY { xfunc->run_type = R_ANY; }
+ | ALL { xfunc->run_type = R_ALL; }
+ ;
+
+ /* hash function call: builds a local "select * from <FNCALL text>..." query */
+ hash_func: FNCALL { hash_sql = plproxy_query_start(xfunc, false);
+ cur_sql = hash_sql;
+ plproxy_query_add_const(cur_sql, "select * from ");
+ plproxy_query_add_const(cur_sql, $1); }
+ ;
+
+ /* explicit remote query; its parameter references get type casts */
+ select_stmt: sql_start sql_token_list ';' ;
+
+ sql_start: SELECT { if (select_sql)
+ yyerror("Only one SELECT statement allowed");
+ select_sql = plproxy_query_start(xfunc, true);
+ cur_sql = select_sql;
+ plproxy_query_add_const(cur_sql, $1); }
+ ;
+ /* SQL text is appended to whichever buffer cur_sql points at */
+ sql_token_list: sql_token
+ | sql_token_list sql_token
+ ;
+ sql_token: SQLPART { plproxy_query_add_const(cur_sql, $1); }
+ | SQLIDENT { if (!plproxy_query_add_ident(cur_sql, $1))
+ yyerror("invalid argument reference: %s", $1); }
+ ;
+
+ %%
+
+ /*
+  * Report a parse error and abort compilation.
+  *
+  * The current scanner line number is captured first, then the scanner
+  * is destroyed, and finally plproxy_error() raises ERROR (it does not
+  * return).
+  */
+ void yyerror(const char *fmt, ...)
+ {
+ 	int			line;
+ 	char		errmsg[1024];
+ 	va_list		args;
+
+ 	/* must be read before the scanner state goes away */
+ 	line = plproxy_yyget_lineno();
+
+ 	va_start(args, fmt);
+ 	vsnprintf(errmsg, sizeof(errmsg), fmt, args);
+ 	va_end(args);
+
+ 	plproxy_yylex_destroy();
+
+ 	plproxy_error(xfunc, "Compile error at line %d: %s", line, errmsg);
+ }
+
+
+ /*
+  * Actually run the flex/bison parser over a function body.
+  *
+  * body/len: function source text, not necessarily NUL-terminated.
+  * On success, fills func->run_type and, when present, hash_sql,
+  * remote_sql and cluster_sql (copied into the function's context by
+  * plproxy_query_finish()).  Errors are raised via yyerror().
+  */
+ void plproxy_run_parser(ProxyFunction *func, const char *body, int len)
+ {
+ /* reset variables, in case there was error exit */
+ reset_parser_vars();
+
+ /* make current function visible to parser */
+ xfunc = func;
+
+ /* By default expect RUN ON ANY; */
+ xfunc->run_type = R_ANY;
+
+ /* reinitialize scanner */
+ plproxy_yylex_startup();
+
+ /* setup scanner */
+ plproxy_yy_scan_bytes(body, len);
+
+ /* run parser */
+ yyparse();
+
+ /*
+  * check for mandatory statements
+  *
+  * Done before plproxy_yylex_destroy() below because yyerror() destroys
+  * the scanner itself.
+  */
+ if (got_connect) {
+ if (got_cluster || got_run)
+ yyerror("CONNECT cannot be used with CLUSTER/RUN");
+ } else {
+ if (!got_cluster)
+ yyerror("CLUSTER statement missing");
+ }
+
+ /* release scanner resources */
+ plproxy_yylex_destroy();
+
+ /* copy hash data if needed */
+ if (xfunc->run_type == R_HASH)
+ xfunc->hash_sql = plproxy_query_finish(hash_sql);
+
+ /* store sql */
+ if (select_sql)
+ xfunc->remote_sql = plproxy_query_finish(select_sql);
+
+ if (cluster_sql)
+ xfunc->cluster_sql = plproxy_query_finish(cluster_sql);
+
+ reset_parser_vars();
+ }
+
*** a/src/pl/plproxy/plproxy.h
--- b/src/pl/plproxy/plproxy.h
***************
*** 0 ****
--- 1,301 ----
+ /*
+ * PL/Proxy - easy access to partitioned database.
+ *
+ * Copyright (c) 2006 Sven Suursoho, Skype Technologies OÜ
+ * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+ /*
+ * Data structures for PL/Proxy function handler.
+ */
+
+ #ifndef plproxy_h_included
+ #define plproxy_h_included
+
+ #include
+ #include
+ #include
+ #include
+
+ #include
+ #include
+ #include
+ #include
+ #include
+ #include
+ #include
+ #include
+ #include
+ #include
+ #include
+ #include
+
+ #include "rowstamp.h"
+
+ #include
+
+ /*
+ * Is PL/Proxy allowed to use binary I/O?
+ * 0 = no, always use text format.
+ */
+ #define PLPROXY_USE_BINARY 0
+
+ /*
+ * How long a connection can stay open (in secs).
+ * Set 0 to disable.
+ */
+ #define PLPROXY_CONN_LIFETIME (3*60*60)
+
+ /*
+ * Maintenance period in seconds. Connections will be freed
+ * from stale results, and checked for lifetime.
+ */
+ #define PLPROXY_MAINT_PERIOD (2*60)
+
+ /*
+ * Check connections that are idle more than this many seconds.
+ * Set 0 to always check.
+ */
+ #define PLPROXY_IDLE_CONN_CHECK 2
+
+ /* Flag indicating where function should be executed */
+ typedef enum RunOnType
+ {
+ 	R_HASH = 1,					/* partition(s) returned by hash function */
+ 	R_ALL = 2,					/* on all partitions */
+ 	R_ANY = 3,					/* decide randomly during runtime */
+ 	R_EXACT = 4					/* exact part number; also used by CONNECT */
+ } RunOnType;
+
+ /*
+  * Connection states for async handler.
+  *
+  * No comma after the last enumerator: a trailing comma in an enumerator
+  * list is only valid from C99 on and is rejected or warned about by
+  * strict C89 compilers.
+  */
+ typedef enum ConnState
+ {
+ 	C_NONE = 0,					/* no connection object yet */
+ 	C_CONNECT_WRITE,			/* login phase: sending data */
+ 	C_CONNECT_READ,				/* login phase: waiting for server */
+ 	C_READY,					/* connection ready for query */
+ 	C_QUERY_WRITE,				/* query phase: sending data */
+ 	C_QUERY_READ,				/* query phase: waiting for server */
+ 	C_DONE						/* query done, result available */
+ } ConnState;
+
+ /*
+  * Single database connection.
+  *
+  * db and res are libpq objects (malloc-based), so they are not released
+  * by PostgreSQL memory contexts and must be freed explicitly.
+  */
+ typedef struct
+ {
+ const char *connstr; /* Connection string for libpq */
+
+ /* state */
+ PGconn *db; /* libpq connection handle */
+ PGresult *res; /* last resultset */
+ int pos; /* Current position inside res */
+ ConnState state; /* Connection state */
+ time_t connect_time; /* When connection was started */
+ time_t query_time; /* When last query was sent */
+ bool run_on; /* True if this connection should be used */
+ bool same_ver; /* True if dest backend has same X.Y ver */
+ bool tuning; /* True if tuning query is running on conn */
+ } ProxyConnection;
+
+ /*
+  * Info about one cluster.
+  *
+  * The ret_* fields track progress while results are handed out row by
+  * row; ret_total reaching 0 means all rows have been returned.
+  */
+ typedef struct ProxyCluster
+ {
+ struct ProxyCluster *next; /* Pointer for building singly-linked list */
+
+ const char *name; /* Cluster name */
+ int version; /* Cluster version */
+
+ int part_count; /* Number of partitions - power of 2 */
+ int part_mask; /* Mask to use to get part number from hash */
+ ProxyConnection **part_map; /* Pointers to conn_list */
+
+ int conn_count; /* Number of actual database connections */
+ ProxyConnection *conn_list; /* List of actual database connections */
+
+ int ret_cur_conn; /* Result walking: index of current conn */
+ int ret_cur_pos; /* Result walking: index of current row */
+ int ret_total; /* Result walking: total rows left */
+ } ProxyCluster;
+
+ /*
+ * Type info cache.
+ *
+ * As the decision to send/receive binary may
+ * change in runtime, both text and binary
+ * function calls must be cached.
+ *
+ * Which member of the io union is valid depends on for_send:
+ * io.out when true, io.in when false.
+ */
+ typedef struct ProxyType
+ {
+ char *name; /* Name of the type */
+ Oid type_oid; /* Oid of the type */
+
+ Oid io_param; /* Extra arg for input_func */
+ bool for_send; /* True if for outputting */
+ bool has_send; /* Has binary output */
+ bool has_recv; /* Has binary input */
+ bool by_value; /* False if Datum is a pointer to data */
+
+ /* I/O functions */
+ union
+ {
+ struct
+ {
+ FmgrInfo output_func;
+ FmgrInfo send_func;
+ } out;
+ struct
+ {
+ FmgrInfo input_func;
+ FmgrInfo recv_func;
+ } in;
+ } io;
+ } ProxyType;
+
+ /*
+ * Info cache for composite return type.
+ *
+ * There is AttInMetadata in core, but it does not support
+ * binary receive, so need our own struct.
+ *
+ * type_list and name_list have tupdesc->natts entries.
+ */
+ typedef struct ProxyComposite
+ {
+ TupleDesc tupdesc; /* Return tuple descriptor */
+ ProxyType **type_list; /* Column type info */
+ char **name_list; /* Quoted column names */
+ bool use_binary; /* True if all columns support binary recv */
+ } ProxyComposite;
+
+ /* Temp structure for query parsing (opaque; defined in query.c) */
+ typedef struct QueryBuffer QueryBuffer;
+
+ /*
+ * Parsed query where references to function arguments
+ * are replaced with local args numbered sequentially: $1..$n.
+ *
+ * arg_lookup[i] is the index of the function argument bound to $(i+1).
+ */
+ typedef struct ProxyQuery
+ {
+ char *sql; /* Prepared SQL string */
+ int arg_count; /* Argument count for ->sql */
+ int *arg_lookup; /* Maps local references to function args */
+ void *plan; /* Optional prepared plan for local queries */
+ } ProxyQuery;
+
+ /*
+ * Complete info about compiled function.
+ *
+ * Note: only IN and INOUT arguments are cached here.
+ */
+ typedef struct ProxyFunction
+ {
+ const char *name; /* Fully-qualified and quoted function name */
+ Oid oid; /* Function OID */
+ MemoryContext ctx; /* Where runtime allocations should happen */
+
+ RowStamp stamp; /* for pg_proc cache validation */
+
+ ProxyType **arg_types; /* Info about arguments */
+ char **arg_names; /* Argument names, may contain NULLs */
+ short arg_count; /* Argument count of proxy function */
+
+ /* if the function returns untyped RECORD that needs AS clause */
+ bool dynamic_record;
+
+ /* One of them is defined, other NULL */
+ ProxyType *ret_scalar; /* Type info for scalar return val */
+ ProxyComposite *ret_composite; /* Type info for composite return val */
+
+ /* data from function body */
+ const char *cluster_name; /* Cluster where function should run */
+ ProxyQuery *cluster_sql; /* Optional query for name resolving */
+
+ RunOnType run_type; /* Run type */
+ ProxyQuery *hash_sql; /* Hash execution for R_HASH */
+ int exact_nr; /* Number from RUN ON <number>, for R_EXACT */
+ const char *connect_str; /* libpq string for CONNECT function */
+
+ /*
+ * calculated data
+ */
+
+ ProxyQuery *remote_sql; /* query to be run remotely */
+
+ /*
+ * current execution data
+ */
+
+ /*
+ * Cluster to be executed on. In case of CONNECT,
+ * function's private fake cluster object.
+ */
+ ProxyCluster *cur_cluster;
+
+ /*
+ * Maps result field num to libpq column num.
+ * It is filled for each result. NULL when scalar result.
+ */
+ int *result_map;
+ } ProxyFunction;
+
+ /*
+  * Let GCC-compatible compilers check the printf-style format strings
+  * given to the error reporting functions against their arguments.
+  */
+ #ifdef __GNUC__
+ #define PLPROXY_PRINTF_ATTR(fmt_pos, arg_pos) __attribute__((format(printf, fmt_pos, arg_pos)))
+ #else
+ #define PLPROXY_PRINTF_ATTR(fmt_pos, arg_pos)
+ #endif
+
+ /* main.c */
+ Datum plproxy_call_handler(PG_FUNCTION_ARGS);
+ void plproxy_error(ProxyFunction *func, const char *fmt,...) PLPROXY_PRINTF_ATTR(2, 3);
+
+ /* function.c */
+ void plproxy_function_cache_init(void);
+ void *plproxy_func_alloc(ProxyFunction *func, int size);
+ char *plproxy_func_strdup(ProxyFunction *func, const char *s);
+ ProxyFunction *plproxy_compile(FunctionCallInfo fcinfo, bool validate);
+
+ /* execute.c */
+ void plproxy_exec(ProxyFunction *func, FunctionCallInfo fcinfo);
+ void plproxy_clean_results(ProxyCluster *cluster);
+
+ /* scanner.c */
+ int plproxy_yyget_lineno(void);
+ int plproxy_yylex_destroy(void);
+ int plproxy_yylex(void);
+ void plproxy_scanner_sqlmode(bool val);
+ void plproxy_yylex_startup(void);
+
+ /* parser.y */
+ void plproxy_run_parser(ProxyFunction *func, const char *body, int len);
+ void plproxy_yyerror(const char *fmt,...) PLPROXY_PRINTF_ATTR(1, 2);
+
+ /* type.c */
+ ProxyComposite *plproxy_composite_info(ProxyFunction *func, TupleDesc tupdesc);
+ ProxyType *plproxy_find_type_info(ProxyFunction *func, Oid oid, bool for_send);
+ char *plproxy_send_type(ProxyType *type, Datum val, bool allow_bin, int *len, int *fmt);
+ Datum plproxy_recv_type(ProxyType *type, char *str, int len, bool bin);
+ HeapTuple plproxy_recv_composite(ProxyComposite *meta, char **values, int *lengths, int *fmts);
+ void plproxy_free_type(ProxyType *type);
+ void plproxy_free_composite(ProxyComposite *meta);
+
+ /* cluster.c */
+ void plproxy_cluster_cache_init(void);
+ ProxyCluster *plproxy_find_cluster(ProxyFunction *func, FunctionCallInfo fcinfo);
+ void plproxy_cluster_maint(struct timeval * now);
+
+ /* result.c */
+ Datum plproxy_result(ProxyFunction *func, FunctionCallInfo fcinfo);
+
+ /* query.c */
+ QueryBuffer *plproxy_query_start(ProxyFunction *func, bool add_types);
+ bool plproxy_query_add_const(QueryBuffer *q, const char *data);
+ bool plproxy_query_add_ident(QueryBuffer *q, const char *ident);
+ ProxyQuery *plproxy_query_finish(QueryBuffer *q);
+ ProxyQuery *plproxy_standard_query(ProxyFunction *func, bool add_types);
+ void plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q);
+ void plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q);
+ void plproxy_query_freeplan(ProxyQuery *q);
+
+ #endif
*** a/src/pl/plproxy/poll_compat.c
--- b/src/pl/plproxy/poll_compat.c
***************
*** 0 ****
--- 1,140 ----
+
+ #include "postgres.h"
+
+ #include "poll_compat.h"
+
+ #ifdef PLPROXY_POLL_COMPAT
+
+ /*
+ * Emulate poll() with select()
+ */
+
+ #include
+ #include
+
+ /*
+ * dynamic buffer for fd_set to avoid depending on FD_SETSIZE
+ *
+ * set: malloc'ed bit set (NULL until first resize)
+ * alloc_bytes: current size of set in bytes
+ */
+
+ struct fd_buf {
+ fd_set *set;
+ int alloc_bytes;
+ };
+
+ /* Clear every bit in the set, if the set has been allocated yet. */
+ static void fdbuf_zero(struct fd_buf *buf)
+ {
+ 	if (buf->set == NULL)
+ 		return;
+ 	memset(buf->set, 0, buf->alloc_bytes);
+ }
+
+ /*
+  * Make sure buf has room for the bit of file descriptor fd.
+  *
+  * The requested size is the byte holding fd's bit plus 32 bytes of extra
+  * room (intended to guarantee alignment).  Allocations start at 256 bytes
+  * (2048 fds) and double until big enough; newly added bytes are zeroed.
+  * Returns false when (re)allocation fails, leaving the old buffer intact.
+  */
+ static bool fdbuf_resize(struct fd_buf *buf, int fd)
+ {
+ 	int		needed = fd / 8 + 32;
+ 	int		size;
+ 	uint8  *mem;
+
+ 	if (buf->alloc_bytes >= needed)
+ 		return true;
+
+ 	for (size = 256; size < needed; size *= 2)
+ 		;
+
+ 	mem = buf->set ? realloc(buf->set, size) : malloc(size);
+ 	if (mem == NULL)
+ 		return false;
+
+ 	/* only the tail beyond the old size needs clearing */
+ 	memset(mem + buf->alloc_bytes, 0, size - buf->alloc_bytes);
+
+ 	buf->set = (fd_set *) mem;
+ 	buf->alloc_bytes = size;
+ 	return true;
+ }
+
+ /*
+  * poll() emulation on top of select().
+  *
+  * Only POLLIN and POLLOUT are supported.  As POSIX poll() specifies,
+  * entries with a negative fd are ignored and get revents = 0.  revents is
+  * cleared for every entry before waiting, so it is also 0 after a timeout.
+  *
+  * Returns the number of entries with non-zero revents, 0 on timeout, or
+  * -1 with errno set: EINVAL for a timeout below -1, ENOMEM when the fd
+  * sets cannot be grown, or whatever select() reported.
+  */
+ int poll(struct pollfd *fds, nfds_t nfds, int timeout_ms)
+ {
+ 	static struct fd_buf readfds = { NULL, 0 };
+ 	static struct fd_buf writefds = { NULL, 0 };
+
+ 	struct pollfd *pf;
+ 	nfds_t		i;
+ 	int			res,
+ 				fd_max = 0;
+ 	struct timeval *tv = NULL;
+ 	struct timeval tvreal;
+
+ 	/* convert timeout_ms to timeval; -1 means wait forever (tv = NULL) */
+ 	if (timeout_ms >= 0)
+ 	{
+ 		tvreal.tv_sec = timeout_ms / 1000;
+ 		tvreal.tv_usec = (timeout_ms % 1000) * 1000;
+ 		tv = &tvreal;
+ 	}
+ 	else if (timeout_ms < -1)
+ 		goto err_inval;
+
+ 	/*
+ 	 * Convert pollfds to fd sets.
+ 	 */
+ 	fdbuf_zero(&readfds);
+ 	fdbuf_zero(&writefds);
+ 	for (i = 0; i < nfds; i++)
+ 	{
+ 		pf = fds + i;
+ 		pf->revents = 0;
+
+ 		/* POSIX: a negative fd is skipped, it is not an error */
+ 		if (pf->fd < 0)
+ 			continue;
+
+ 		/* sets must be equal size */
+ 		if (!fdbuf_resize(&readfds, pf->fd))
+ 			goto err_nomem;
+ 		if (!fdbuf_resize(&writefds, pf->fd))
+ 			goto err_nomem;
+
+ 		if (pf->events & POLLIN)
+ 			FD_SET(pf->fd, readfds.set);
+ 		if (pf->events & POLLOUT)
+ 			FD_SET(pf->fd, writefds.set);
+ 		if (pf->fd > fd_max)
+ 			fd_max = pf->fd;
+ 	}
+
+ 	res = select(fd_max + 1, readfds.set, writefds.set, NULL, tv);
+ 	if (res <= 0)
+ 		return res;
+
+ 	/*
+ 	 * select() and poll() count fd-s differently,
+ 	 * need to recount them here.
+ 	 */
+ 	res = 0;
+
+ 	for (i = 0; i < nfds; i++)
+ 	{
+ 		pf = fds + i;
+ 		if (pf->fd < 0)
+ 			continue;
+ 		if ((pf->events & POLLIN) && FD_ISSET(pf->fd, readfds.set))
+ 			pf->revents |= POLLIN;
+ 		if ((pf->events & POLLOUT) && FD_ISSET(pf->fd, writefds.set))
+ 			pf->revents |= POLLOUT;
+ 		if (pf->revents)
+ 			res += 1;
+ 	}
+ 	return res;
+
+ err_nomem:
+ 	errno = ENOMEM;
+ 	return -1;
+
+ err_inval:
+ 	errno = EINVAL;
+ 	return -1;
+ }
+
+ #endif /* PLPROXY_POLL_COMPAT */
+
*** a/src/pl/plproxy/poll_compat.h
--- b/src/pl/plproxy/poll_compat.h
***************
*** 0 ****
--- 1,58 ----
+
+ #ifndef POLL_COMPAT_H
+ #define POLL_COMPAT_H
+
+ /* define to test poll() compat function (forces the emulation) */
+ #if 0
+ #define PLPROXY_POLL_COMPAT
+ #endif
+
+ #include
+
+ /* see if real poll() can be used; fall back to emulation otherwise */
+ #ifndef PLPROXY_POLL_COMPAT
+ #ifdef HAVE_POLL_H
+ #include
+ #else
+ #ifdef HAVE_SYS_POLL_H
+ #include
+ #else
+ #define PLPROXY_POLL_COMPAT
+ #endif
+ #endif
+ #endif
+
+ /*
+ * Emulate poll() with select(), if needed.
+ */
+ #ifdef PLPROXY_POLL_COMPAT
+
+ /* in/out event types */
+ #define POLLIN (1 << 0)
+ #define POLLOUT (1 << 1)
+
+ /* rest are unused in this implementation */
+ #define POLLHUP (1 << 2)
+ #define POLLPRI (1 << 3)
+ #define POLLNVAL (1 << 4)
+ #define POLLERR (1 << 5)
+
+ /*
+  * avoid namespace conflicts: the emulation's symbols get plproxy-specific
+  * names, while code keeps using the standard spellings
+  */
+ #define pollfd plproxy_compat_pollfd
+ #define poll plproxy_compat_poll
+ #define nfds_t plproxy_compat_nfds_t
+
+ struct pollfd {
+ int fd;
+ short events;
+ short revents;
+ };
+
+ typedef unsigned long nfds_t;
+
+ int poll(struct pollfd *fds, nfds_t nfds, int timeout_ms);
+
+ #endif /* PLPROXY_POLL_COMPAT */
+
+ #endif /* POLL_COMPAT_H */
+
*** a/src/pl/plproxy/query.c
--- b/src/pl/plproxy/query.c
***************
*** 0 ****
--- 1,316 ----
+ /*
+ * PL/Proxy - easy access to partitioned database.
+ *
+ * Copyright (c) 2006 Sven Suursoho, Skype Technologies OÜ
+ * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+ /*
+ * SQL statement generation helpers.
+ */
+
+ #include "plproxy.h"
+
+ /*
+ * Temporary info structure for generation.
+ *
+ * Later it will be used to make ProxyQuery.
+ */
+ struct QueryBuffer
+ {
+ ProxyFunction *func; /* function whose arguments may be referenced */
+ StringInfo sql; /* SQL text built so far */
+ int arg_count; /* number of distinct $n references used */
+ int *arg_lookup; /* $(i+1) -> function argument index */
+ bool add_types; /* append ::type casts to parameter references */
+ };
+
+ /*
+  * Allocate a temporary QueryBuffer for building a query for func.
+  *
+  * add_types selects whether parameter references get a "::type" cast.
+  * arg_lookup is sized for every function argument, the most distinct
+  * parameter references a query can contain.
+  */
+ QueryBuffer *
+ plproxy_query_start(ProxyFunction *func, bool add_types)
+ {
+ 	QueryBuffer *buf;
+
+ 	buf = palloc(sizeof(*buf));
+ 	buf->func = func;
+ 	buf->sql = makeStringInfo();
+ 	buf->arg_count = 0;
+ 	buf->add_types = add_types;
+ 	buf->arg_lookup = palloc(sizeof(int) * func->arg_count);
+
+ 	return buf;
+ }
+
+ /*
+  * Append a literal SQL fragment to the query as-is.
+  * Cannot fail; always returns true.
+  */
+ bool
+ plproxy_query_add_const(QueryBuffer *q, const char *data)
+ {
+ 	appendStringInfo(q->sql, "%s", data);
+ 	return true;
+ }
+
+ /*
+  * Append a reference to SQL parameter sql_idx (0-based) to buf, i.e.
+  * "$<sql_idx+1>", optionally followed by "::<type>" using the type name
+  * of the proxy function's fn_idx-th argument.
+  *
+  * The text is appended straight into the StringInfo.  Formatting into a
+  * fixed-size stack buffer overflows for longer type names: e.g.
+  * "$10::timestamp without time zone" already needs 33 bytes.
+  */
+ static void
+ add_ref(StringInfo buf, int sql_idx, ProxyFunction *func, int fn_idx, bool add_type)
+ {
+ 	if (add_type)
+ 		appendStringInfo(buf, "$%d::%s", sql_idx + 1,
+ 						 func->arg_types[fn_idx]->name);
+ 	else
+ 		appendStringInfo(buf, "$%d", sql_idx + 1);
+ }
+
+ /*
+  * Add a SQL identifier to the query that may possibly be
+  * a parameter reference.
+  *
+  * An identifier of the form "$N" refers to argument N (1-based); any
+  * other identifier refers to an argument if it matches an argument name
+  * case-insensitively.  A reference is emitted as a $n parameter, and
+  * repeated references to the same argument reuse the same $n.
+  * Identifiers that are not argument references are copied verbatim.
+  *
+  * Returns false only for a "$N" reference outside 1..arg_count.
+  */
+ bool
+ plproxy_query_add_ident(QueryBuffer *q, const char *ident)
+ {
+ 	int			i,
+ 				fn_idx = -1,
+ 				sql_idx = -1;
+
+ 	if (ident[0] == '$')
+ 	{
+ 		fn_idx = atoi(ident + 1) - 1;
+ 		if (fn_idx < 0 || fn_idx >= q->func->arg_count)
+ 			return false;
+ 	}
+ 	else
+ 	{
+ 		for (i = 0; i < q->func->arg_count; i++)
+ 		{
+ 			/* unnamed arguments have a NULL name; only $N can reach them */
+ 			if (q->func->arg_names[i] == NULL)
+ 				continue;
+ 			if (strcasecmp(ident, q->func->arg_names[i]) == 0)
+ 			{
+ 				fn_idx = i;
+ 				break;
+ 			}
+ 		}
+ 	}
+
+ 	if (fn_idx < 0)
+ 	{
+ 		/* not an argument reference */
+ 		appendStringInfoString(q->sql, ident);
+ 		return true;
+ 	}
+
+ 	/* reuse an existing $n for this argument, if any */
+ 	for (i = 0; i < q->arg_count; i++)
+ 	{
+ 		if (q->arg_lookup[i] == fn_idx)
+ 		{
+ 			sql_idx = i;
+ 			break;
+ 		}
+ 	}
+ 	if (sql_idx < 0)
+ 	{
+ 		sql_idx = q->arg_count++;
+ 		q->arg_lookup[sql_idx] = fn_idx;
+ 	}
+ 	add_ref(q->sql, sql_idx, q->func, fn_idx, q->add_types);
+ 	return true;
+ }
+
+ /*
+  * Turn a temporary QueryBuffer into a ProxyQuery.
+  *
+  * The SQL text and the argument map are copied into the function's
+  * memory context (func->ctx); the QueryBuffer and everything it owns is
+  * then released.  The returned query has no prepared plan yet.
+  */
+ ProxyQuery *
+ plproxy_query_finish(QueryBuffer *q)
+ {
+ 	ProxyQuery *res;
+ 	MemoryContext saved_ctx;
+ 	int			lookup_bytes;
+
+ 	lookup_bytes = q->arg_count * sizeof(int);
+
+ 	saved_ctx = MemoryContextSwitchTo(q->func->ctx);
+ 	res = palloc(sizeof(*res));
+ 	res->sql = pstrdup(q->sql->data);
+ 	res->arg_count = q->arg_count;
+ 	res->arg_lookup = palloc(lookup_bytes);
+ 	res->plan = NULL;
+ 	memcpy(res->arg_lookup, q->arg_lookup, lookup_bytes);
+ 	MemoryContextSwitchTo(saved_ctx);
+
+ 	/* the temporary buffer is no longer needed */
+ 	pfree(q->sql->data);
+ 	pfree(q->sql);
+ 	pfree(q->arg_lookup);
+ 	memset(q, 0, sizeof(*q));
+ 	pfree(q);
+
+ 	return res;
+ }
+
+ /*
+  * Build the default remote query from the function's own signature:
+  * "select ... from funcname($1, $2, ...)".
+  *
+  * Composite results select every column by name with a cast to its local
+  * type.  Scalar results are selected through the alias "r".  An untyped
+  * RECORD result additionally gets an "as (name type, ...)" column
+  * definition list.  Every function argument maps 1:1 to an SQL parameter.
+  * The ProxyQuery and its text are allocated via plproxy_func_alloc() and
+  * plproxy_func_strdup().
+  */
+ ProxyQuery *
+ plproxy_standard_query(ProxyFunction *func, bool add_types)
+ {
+ 	ProxyComposite *rec = func->ret_composite;
+ 	ProxyQuery *pq;
+ 	StringInfoData buf;
+ 	int lookup_len;
+ 	int col;
+ 	int arg;
+
+ 	pq = plproxy_func_alloc(func, sizeof(*pq));
+ 	pq->sql = NULL;
+ 	pq->plan = NULL;
+ 	pq->arg_count = func->arg_count;
+ 	lookup_len = pq->arg_count * sizeof(int);
+ 	pq->arg_lookup = plproxy_func_alloc(func, lookup_len);
+
+ 	initStringInfo(&buf);
+ 	appendStringInfoString(&buf, "select ");
+
+ 	if (rec == NULL)
+ 	{
+ 		/* column names are not available: select the bare result */
+ 		appendStringInfo(&buf, "r::%s", func->ret_scalar->name);
+ 	}
+ 	else
+ 	{
+ 		/* name and cast every result column */
+ 		for (col = 0; col < rec->tupdesc->natts; col++)
+ 		{
+ 			if (col > 0)
+ 				appendStringInfoString(&buf, ", ");
+ 			appendStringInfo(&buf, "%s::%s", rec->name_list[col],
+ 							 rec->type_list[col]->name);
+ 		}
+ 	}
+
+ 	/* the call itself, with each argument passed as its own parameter */
+ 	appendStringInfo(&buf, " from %s(", func->name);
+ 	for (arg = 0; arg < func->arg_count; arg++)
+ 	{
+ 		if (arg > 0)
+ 			appendStringInfoChar(&buf, ',');
+ 		add_ref(&buf, arg, func, arg, add_types);
+ 		pq->arg_lookup[arg] = arg;
+ 	}
+ 	appendStringInfoChar(&buf, ')');
+
+ 	/* an untyped RECORD needs its column types spelled out in AS (...) */
+ 	if (func->dynamic_record)
+ 	{
+ 		appendStringInfoString(&buf, " as (");
+ 		for (col = 0; col < rec->tupdesc->natts; col++)
+ 		{
+ 			if (col > 0)
+ 				appendStringInfoString(&buf, ", ");
+ 			appendStringInfo(&buf, "%s %s", rec->name_list[col],
+ 							 rec->type_list[col]->name);
+ 		}
+ 		appendStringInfoChar(&buf, ')');
+ 	}
+
+ 	if (func->ret_scalar)
+ 		appendStringInfoString(&buf, " r");
+
+ 	pq->sql = plproxy_func_strdup(func, buf.data);
+ 	pfree(buf.data);
+
+ 	return pq;
+ }
+
+ /*
+  * Prepare ProxyQuery for local execution.
+  *
+  * Collects the local type OID of every SQL parameter (via q->arg_lookup),
+  * prepares q->sql with SPI and stores a saved copy of the plan in q->plan.
+  * If SPI cannot prepare or save the plan, this is reported through
+  * plproxy_error() with SPI's result code, instead of leaving q->plan NULL
+  * and failing later with a less specific error.
+  */
+ void
+ plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q)
+ {
+ 	int i;
+ 	Oid types[FUNC_MAX_ARGS];
+ 	void *plan;
+
+ 	/* collect parameter types in SQL parameter order */
+ 	for (i = 0; i < q->arg_count; i++)
+ 	{
+ 		int idx = q->arg_lookup[i];
+
+ 		types[i] = func->arg_types[idx]->type_oid;
+ 	}
+
+ 	/* prepare & store plan */
+ 	plan = SPI_prepare(q->sql, q->arg_count, types);
+ 	if (plan == NULL)
+ 		plproxy_error(func, "query '%s' could not be prepared: %s",
+ 					  q->sql, SPI_result_code_string(SPI_result));
+
+ 	q->plan = SPI_saveplan(plan);
+ 	if (q->plan == NULL)
+ 		plproxy_error(func, "query '%s' plan could not be saved: %s",
+ 					  q->sql, SPI_result_code_string(SPI_result));
+ }
+
+ /*
+  * Execute ProxyQuery locally.
+  *
+  * Binds the current call's arguments to the SQL parameters according to
+  * q->arg_lookup and runs the saved plan read-only with no row limit.
+  * Any result other than SPI_OK_SELECT is reported through plproxy_error().
+  *
+  * Result will be in SPI_tuptable.
+  */
+ void
+ plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q)
+ {
+ 	int i;
+ 	int idx;
+ 	int err;
+ 	char arg_nulls[FUNC_MAX_ARGS];
+ 	Datum arg_values[FUNC_MAX_ARGS];
+
+ 	/* fill args; SPI marks NULL values with 'n' and others with ' ' */
+ 	for (i = 0; i < q->arg_count; i++)
+ 	{
+ 		idx = q->arg_lookup[i];
+ 		if (PG_ARGISNULL(idx))
+ 		{
+ 			arg_nulls[i] = 'n';
+ 			arg_values[i] = (Datum) NULL;
+ 		}
+ 		else
+ 		{
+ 			arg_nulls[i] = ' ';
+ 			arg_values[i] = PG_GETARG_DATUM(idx);
+ 		}
+ 	}
+
+ 	/* run query: read_only = true, count = 0 (no row limit) */
+ 	err = SPI_execute_plan(q->plan, arg_values, arg_nulls, true, 0);
+ 	if (err != SPI_OK_SELECT)
+ 		plproxy_error(func, "query '%s' failed: %s",
+ 					  q->sql, SPI_result_code_string(err));
+ }
+
+ /*
+  * Release the saved SPI plan of q, if it has one, and clear q->plan.
+  * Accepts q == NULL and queries that were never prepared.
+  */
+ void
+ plproxy_query_freeplan(ProxyQuery *q)
+ {
+ 	if (q != NULL && q->plan != NULL)
+ 	{
+ 		SPI_freeplan(q->plan);
+ 		q->plan = NULL;
+ 	}
+ }
*** a/src/pl/plproxy/result.c
--- b/src/pl/plproxy/result.c
***************
*** 0 ****
--- 1,222 ----
+ /*
+ * PL/Proxy - easy access to partitioned database.
+ *
+ * Copyright (c) 2006 Sven Suursoho, Skype Technologies OÜ
+ * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+ /*
+ * Conversion from PGresult to Datum.
+ *
+ * Functions here are called with CurrentMemoryContext == query context
+ * so that palloc()-ed memory stays valid after return to postgres.
+ */
+
+ #include "plproxy.h"
+
+ /*
+  * Report whether result column col of res is named exactly aname
+  * (case-sensitive comparison).  A column without a name is reported
+  * through plproxy_error().
+  */
+ static bool
+ name_matches(ProxyFunction *func, const char *aname, PGresult *res, int col)
+ {
+ 	const char *colname = PQfname(res, col);
+
+ 	if (colname == NULL)
+ 		plproxy_error(func, "Unnamed result column %d", col + 1);
+
+ 	return strcmp(aname, colname) == 0;
+ }
+
+ /*
+  * Fill func->result_map for the result set res.
+  *
+  * A scalar-returning function must get exactly one column back.  For a
+  * composite return type, the result must have exactly as many columns as
+  * the local tuple descriptor, and every local attribute must find a remote
+  * column with the same name.  result_map[i] then holds the remote column
+  * index for local attribute i, so remote columns may arrive in any order.
+  * Column types are not compared with the local types here.
+  */
+ static void
+ map_results(ProxyFunction *func, PGresult *res)
+ {
+ 	int i;
+ 	int j;
+ 	int natts;
+ 	int nfields = PQnfields(res);
+ 	Form_pg_attribute a;
+ 	const char *aname;
+
+ 	if (func->ret_scalar)
+ 	{
+ 		if (nfields != 1)
+ 			plproxy_error(func,
+ 						  "single field function but got record");
+ 		return;
+ 	}
+
+ 	natts = func->ret_composite->tupdesc->natts;
+ 	if (nfields < natts)
+ 		plproxy_error(func, "Got too few fields from remote end");
+ 	if (nfields > natts)
+ 		plproxy_error(func, "Got too many fields from remote end");
+
+ 	for (i = 0; i < natts; i++)
+ 	{
+ 		/* ->name_list has quoted names, take unquoted from ->tupdesc */
+ 		a = func->ret_composite->tupdesc->attrs[i];
+ 		aname = NameStr(a->attname);
+
+ 		func->result_map[i] = -1;
+ 		if (name_matches(func, aname, res, i))
+ 		{
+ 			/* fast case: 1:1 mapping */
+ 			func->result_map[i] = i;
+ 		}
+ 		else
+ 		{
+ 			/* slow case: messed up ordering, search the other columns */
+ 			for (j = 0; j < nfields; j++)
+ 			{
+ 				/* already tried this one */
+ 				if (j == i)
+ 					continue;
+
+ 				/*
+ 				 * fixme: somehow remember the ones that are already mapped?
+ 				 */
+ 				if (name_matches(func, aname, res, j))
+ 				{
+ 					func->result_map[i] = j;
+ 					break;
+ 				}
+ 			}
+ 		}
+ 		if (func->result_map[i] < 0)
+ 			plproxy_error(func,
+ 						  "Field %s does not exists in result", aname);
+ 	}
+ }
+
+ /*
+  * Return the next connection that still has unreturned rows.
+  *
+  * Scanning starts at cluster->ret_cur_conn, which is advanced past
+  * connections without a result or with all rows already returned.  When a
+  * connection is used for the first time (pos == 0), its result columns
+  * are mapped with map_results().  Finding no such connection is reported
+  * through plproxy_error() as an internal bug.
+  */
+ static ProxyConnection *
+ walk_results(ProxyFunction *func, ProxyCluster *cluster)
+ {
+ 	ProxyConnection *conn;
+
+ 	while (cluster->ret_cur_conn < cluster->conn_count)
+ 	{
+ 		conn = &cluster->conn_list[cluster->ret_cur_conn];
+
+ 		if (conn->res != NULL && conn->pos != PQntuples(conn->res))
+ 		{
+ 			if (conn->pos == 0)
+ 				map_results(func, conn->res);
+ 			return conn;
+ 		}
+ 		cluster->ret_cur_conn++;
+ 	}
+
+ 	plproxy_error(func, "bug: no result");
+ 	return NULL;
+ }
+
+ /*
+  * Build a tuple from the current row (conn->pos) of conn->res.
+  *
+  * Local attribute i is read from remote column func->result_map[i].  NULL
+  * columns are passed on as a NULL value with zero length and format 0;
+  * other columns carry their value, length and wire format from libpq.
+  *
+  * The per-column arrays are sized from the tuple descriptor instead of
+  * being fixed FUNC_MAX_ARGS-sized stack arrays.  Nothing here limits a
+  * composite return type to FUNC_MAX_ARGS columns, so a wider type would
+  * have overrun those arrays.
+  */
+ static Datum
+ return_composite(ProxyFunction *func, ProxyConnection *conn, FunctionCallInfo fcinfo)
+ {
+ 	ProxyComposite *meta = func->ret_composite;
+ 	int natts = meta->tupdesc->natts;
+ 	char **values;
+ 	int *fmts;
+ 	int *lengths;
+ 	HeapTuple tup;
+ 	int i;
+ 	int col;
+
+ 	values = palloc(natts * sizeof(char *));
+ 	lengths = palloc(natts * sizeof(int));
+ 	fmts = palloc(natts * sizeof(int));
+
+ 	for (i = 0; i < natts; i++)
+ 	{
+ 		col = func->result_map[i];
+ 		if (PQgetisnull(conn->res, conn->pos, col))
+ 		{
+ 			values[i] = NULL;
+ 			lengths[i] = 0;
+ 			fmts[i] = 0;
+ 		}
+ 		else
+ 		{
+ 			values[i] = PQgetvalue(conn->res, conn->pos, col);
+ 			lengths[i] = PQgetlength(conn->res, conn->pos, col);
+ 			fmts[i] = PQfformat(conn->res, col);
+ 		}
+ 	}
+ 	tup = plproxy_recv_composite(meta, values, lengths, fmts);
+
+ 	/*
+ 	 * NOTE(review): assumes plproxy_recv_composite() keeps no references to
+ 	 * these arrays after returning.  Freeing them stops a long result set
+ 	 * from accumulating garbage in the query context.
+ 	 */
+ 	pfree(values);
+ 	pfree(lengths);
+ 	pfree(fmts);
+
+ 	return HeapTupleGetDatum(tup);
+ }
+
+ /*
+  * Convert the single column of the current row (conn->pos) into a Datum.
+  *
+  * A VOID return type always yields a zero Datum without inspecting the
+  * row.  A remote NULL sets fcinfo->isnull and yields a zero Datum.  Any
+  * other value is converted with plproxy_recv_type() using the column's
+  * length and wire format.
+  */
+ static Datum
+ return_scalar(ProxyFunction *func, ProxyConnection *conn, FunctionCallInfo fcinfo)
+ {
+ 	PGresult *res = conn->res;
+ 	int row = conn->pos;
+ 	char *val;
+
+ 	if (func->ret_scalar->type_oid == VOIDOID)
+ 		return (Datum) NULL;
+
+ 	if (PQgetisnull(res, row, 0))
+ 	{
+ 		fcinfo->isnull = true;
+ 		return (Datum) NULL;
+ 	}
+
+ 	val = PQgetvalue(res, row, 0);
+ 	if (val == NULL)
+ 		plproxy_error(func, "unexcpected NULL");
+
+ 	return plproxy_recv_type(func->ret_scalar, val,
+ 							 PQgetlength(res, row, 0),
+ 							 PQfformat(res, 0));
+ }
+
+ /*
+  * Return the next result row as a Datum.
+  *
+  * Picks the connection that still has rows, converts its current row as a
+  * composite or scalar value depending on the function's return type, then
+  * advances that connection's row position and decrements
+  * cluster->ret_total.
+  */
+ Datum
+ plproxy_result(ProxyFunction *func, FunctionCallInfo fcinfo)
+ {
+ 	ProxyCluster *cluster = func->cur_cluster;
+ 	ProxyConnection *conn = walk_results(func, cluster);
+ 	Datum result;
+
+ 	result = func->ret_composite
+ 		? return_composite(func, conn, fcinfo)
+ 		: return_scalar(func, conn, fcinfo);
+
+ 	conn->pos++;
+ 	cluster->ret_total--;
+
+ 	return result;
+ }
*** a/src/pl/plproxy/rowstamp.h
--- b/src/pl/plproxy/rowstamp.h
***************
*** 0 ****
--- 1,27 ----
+
+ #ifndef PLPROXY_ROWSTAMP_H
+ #define PLPROXY_ROWSTAMP_H
+
+ /*
+  * Stamp structure to detect row changes.
+  *
+  * Records a tuple's xmin and its t_self item pointer.
+  * plproxy_check_stamp() reports whether a tuple still carries the same
+  * pair; a mismatch in either means it is not the row version that was
+  * stamped.
+  *
+  * Identifiers beginning with a double underscore are reserved for the C
+  * implementation, so the include guard uses a project prefix instead.
+  *
+  * This header includes nothing itself.  The includer must already have the
+  * PostgreSQL definitions of TransactionId, ItemPointerData, HeapTuple,
+  * HeapTupleHeaderGetXmin() and ItemPointerEquals() in scope.
+  */
+
+ typedef struct RowStamp {
+ 	TransactionId xmin;		/* xmin of the stamped tuple */
+ 	ItemPointerData tid;	/* t_self of the stamped tuple */
+ } RowStamp;
+
+ /* Record the xmin and t_self of tup in stamp. */
+ static inline void plproxy_set_stamp(RowStamp *stamp, HeapTuple tup)
+ {
+ 	stamp->xmin = HeapTupleHeaderGetXmin(tup->t_data);
+ 	stamp->tid = tup->t_self;
+ }
+
+ /* True if tup has the same xmin and t_self as recorded in stamp. */
+ static inline bool plproxy_check_stamp(RowStamp *stamp, HeapTuple tup)
+ {
+ 	return stamp->xmin == HeapTupleHeaderGetXmin(tup->t_data)
+ 		&& ItemPointerEquals(&stamp->tid, &tup->t_self);
+ }
+
+ #endif /* PLPROXY_ROWSTAMP_H */
+
*** a/src/pl/plproxy/scanner.l
--- b/src/pl/plproxy/scanner.l
***************
*** 0 ****
--- 1,320 ----
+ %{
+
+ /*
+ * PL/Proxy - easy access to partitioned database.
+ *
+ * Copyright (c) 2006 Sven Suursoho, Skype Technologies OÜ
+ * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+ #include "plproxy.h"
+ #include "parser.tab.h"
+
+ /* for standard_conforming_strings */
+ #include
+
+ /*
+  * Prototypes for flex-generated functions.  They are declared here only
+  * to silence compiler warnings about the generated scanner (presumably
+  * missing-prototype warnings).
+  * NOTE(review): the first five are declared without the plproxy_ prefix,
+  * unlike the rest; confirm they resolve to the prefixed scanner functions.
+  */
+ int yyget_lineno(void);
+ int yyget_leng(void);
+ FILE *yyget_in(void);
+ FILE *yyget_out(void);
+ char *yyget_text(void);
+ void plproxy_yyset_lineno(int);
+ void plproxy_yyset_in(FILE *);
+ void plproxy_yyset_out(FILE *);
+ int plproxy_yyget_debug(void);
+ void plproxy_yyset_debug(int);
+ int plproxy_yylex_destroy(void);
+
+ /* point to parser value: token values are stored in plproxy_yylval */
+ #define yylval plproxy_yylval
+
+ /*
+  * Allocate in CurrentMemoryContext.
+  *
+  * If we want to support flex 2.5.4, we cannot use
+  * options noyyalloc, noyyrealloc, noyyfree.
+  *
+  * So the scanner's malloc()/realloc()/free() calls are redirected to
+  * palloc()/repalloc()/pfree() by the macros below.  free() is wrapped so
+  * that NULL pointers are skipped, as libc free() would; pfree() is never
+  * handed NULL.
+  */
+
+ #define malloc palloc
+ #define realloc repalloc
+ #define free(p) do { if (p) pfree(p); } while (0)
+
+
+ /*
+  * Calculate numeric flex version: (major * 1000 + minor) * 1000 + subminor,
+  * so e.g. flex 2.5.31 becomes 2005031.  Major and minor versions must be
+  * defined; if YY_FLEX_SUBMINOR_VERSION is not, it is taken as 0.
+  */
+ #if !defined(YY_FLEX_MAJOR_VERSION) || !defined(YY_FLEX_MINOR_VERSION)
+ #error Flex required
+ #endif
+ #ifndef YY_FLEX_SUBMINOR_VERSION
+ #define YY_FLEX_SUBMINOR_VERSION 0
+ #endif
+ #define FLXVER ((YY_FLEX_MAJOR_VERSION*1000 + YY_FLEX_MINOR_VERSION)*1000 + YY_FLEX_SUBMINOR_VERSION)
+
+ /*
+  * Reset scanner state before starting a new scan.
+  *
+  * The current buffer pointer (flex before 2.5.31) or the buffer stack
+  * (newer flex) is cleared first.  plproxy_yylex_destroy() is then called
+  * to reset the remaining scanner state without touching buffers left over
+  * from a previous scan.
+  * NOTE(review): presumably those leftovers were palloc'd in a memory
+  * context that has since been reset, so freeing them would be unsafe;
+  * confirm.
+  */
+ void plproxy_yylex_startup(void)
+ {
+ /* there may be stale pointers around, drop them */
+ #if FLXVER < 2005031
+ (YY_CURRENT_BUFFER) = NULL;
+ #else
+ (yy_buffer_stack) = NULL;
+ #endif
+ plproxy_yylex_destroy();
+ }
+
+ /*
+  * compat stuff for older flex
+  *
+  * For flex versions before 2.5.31 (FLXVER < 2005031), minimal versions of
+  * yylex_destroy() and yyget_lineno() are defined here; such versions are
+  * assumed not to provide them.
+  */
+ #if FLXVER < 2005031
+
+ /* old flex */
+
+ /*
+  * Delete the current buffer and reset start state, init flag and line
+  * number so that the next scan starts from scratch.  Always returns 0.
+  */
+ int plproxy_yylex_destroy(void)
+ {
+ plproxy_yy_delete_buffer(YY_CURRENT_BUFFER);
+ YY_CURRENT_BUFFER = NULL;
+ yy_start = 0;
+ yy_init = 1;
+ yylineno = 1;
+ return 0;
+ }
+
+ /* current line number tracked by the scanner (%option yylineno) */
+ int plproxy_yyget_lineno(void)
+ {
+ return yylineno;
+ }
+
+ #endif
+
+ /* own error handling: fatal scanner errors go to plproxy_yyerror() */
+ #define YY_FATAL_ERROR(msg) plproxy_yyerror(msg)
+
+ /*
+  * disable stdio related code: reading from the input stream always reports
+  * EOF, so input has to come from an in-memory buffer
+  * (NOTE(review): presumably set up with one of the yy_scan_*() calls;
+  * confirm in the caller).
+  */
+ #define YY_INPUT(buf, res, maxlen) { res = 0; }
+
+ /* shortcut for returning the matched text as an SQLPART token */
+ #define RETPART do { yylval.str = yytext; return SQLPART; } while (0)
+
+ /* dollar quoting helpers: remember / match the opening $tag$ */
+ static void dlr_start(const char *txt);
+ static bool dlr_stop(const char *txt);
+
+ /* strip surrounding single quotes and collapse doubled '' pairs */
+ static const char *unquote(const char *qstr, bool std);
+
+ %}
+
+ %option 8bit case-insensitive
+ %option warn nodefault yylineno
+ %option nounput noyywrap never-interactive
+ %option prefix="plproxy_yy"
+
+ /*
+  * Exclusive start conditions:
+  *   sql     - inside an SQL fragment (entered after SELECT or a function call)
+  *   qident  - inside a double-quoted SQL identifier
+  *   stdq    - inside a '...' string when standard_conforming_strings is on
+  *   extq    - inside an E'...' string, or '...' when it is off
+  *   longcom - inside a C-style comment within SQL
+  *   dolq    - inside a dollar-quoted string
+  *   plcom   - inside a C-style comment in PL/Proxy code
+  */
+ %x sql
+ %x qident
+ %x stdq
+ %x extq
+ %x longcom
+ %x dolq
+ %x plcom
+
+ /* whitespace */
+ SPACE [ \t\n\r]
+
+ /* sql ident. include dotted parts also */
+ WORD [a-z][a-z0-9_]*
+ IDENT {WORD}({SPACE}*[.]{SPACE}*{WORD})*
+
+ /* argument ref by val: $1 */
+ NUMIDENT [$][0-9]+
+
+ /* regular int value for hash spec */
+ PLNUMBER [0-9]+
+
+ /* SQL numeric value */
+ SQLNUM [0-9][.0-9]*
+
+ /*
+  * Symbols that may exist in sql. They must be matched one-by-one,
+  * to avoid conflicts with combos.
+  *
+  * Excludes: [$'";`]
+  */
+ SQLSYM [-!#%&()*+,/:<=>?@\[\]^{|}~]
+
+ /* Dollar quote ID: the tag between the two '$' signs */
+ DOLQ_START [a-z\200-\377_]
+ DOLQ_CONT [a-z\200-\377_0-9]
+ DOLQ ({DOLQ_START}{DOLQ_CONT}*)
+
+ %%
+
+ 	/*
+ 	 * Rules without a start condition apply only in INITIAL (PL/Proxy
+ 	 * code), because every other state is exclusive (%x).  Rules for the
+ 	 * other states are prefixed with their start condition.
+ 	 */
+
+ 	/* PL/Proxy language keywords */
+
+ cluster			{ return CLUSTER; }
+ connect			{ return CONNECT; }
+ run			{ return RUN; }
+ on			{ return ON; }
+ all			{ return ALL; }
+ any			{ return ANY; }
+ select			{ BEGIN(sql); yylval.str = yytext; return SELECT; }
+
+ 	/* function call */
+
+ {IDENT}{SPACE}*[(]	{ BEGIN(sql); yylval.str = yytext; return FNCALL; }
+
+ 	/* PL/Proxy language comments/whitespace */
+
+ {SPACE}+		{ }
+ [-][-][^\n]*		{ }
+ [/][*]			{ BEGIN(plcom); }
+ <plcom>[^*/]+		{ }
+ <plcom>[*]+[^*/]+	{ }
+ <plcom>[*]+[/]		{ BEGIN(INITIAL); }
+ <plcom>.		{ }
+
+ 	/* PL/Proxy non-keyword elements */
+
+ {IDENT}			{ yylval.str = yytext; return IDENT; }
+ {NUMIDENT}		{ yylval.str = yytext; return IDENT; }
+ {PLNUMBER}		{ yylval.str = yytext; return NUMBER; }
+ [']([^']+|[']['])*[']	{ yylval.str = unquote(yytext, true); return STRING; }
+
+ 	/* unparsed symbol, let parser decide */
+
+ .			{ return *(yytext); }
+
+ 	/*
+ 	 * Following is parser for SQL statements.
+ 	 */
+
+ 	/* SQL line comment */
+
+ <sql>[-][-][^\n]*	{ /* \n will be parsed as whitespace */ }
+
+ 	/* C comment, parse it as whitespace */
+
+ <sql>[/][*]		{ BEGIN(longcom); }
+ <longcom>[^*/]+		{ }
+ <longcom>[*]+[^*/]+	{ }
+ <longcom>[*]+[/]	{ BEGIN(sql); yylval.str = " "; return SQLPART; }
+ <longcom>.		{ }
+
+ 	/* Dollar quoted string */
+
+ <sql>[$]{DOLQ}?[$]	{ BEGIN(dolq); dlr_start(yytext); RETPART; }
+ <dolq>[^$]+		{ RETPART; }
+ <dolq>[$]{DOLQ}?[$]	{ if (dlr_stop(yytext)) { BEGIN(sql); RETPART; }
+ 			  /* if wrong one, report only 1 char */
+ 			  else { yyless(1); RETPART; } }
+ <dolq>[$][^$]*		{ RETPART; }
+
+ 	/* quoted identifier */
+
+ <sql>["]		{ BEGIN(qident); RETPART; }
+ <qident>[^"]+		{ RETPART; }
+ <qident>[\\].		{ RETPART; }
+ <qident>["]		{ BEGIN(sql); RETPART; }
+
+ 	/* quoted string start */
+
+ <sql>E[']		{ BEGIN(extq); RETPART; }
+ <sql>[']		{ if (standard_conforming_strings)
+ 			    BEGIN(stdq); else BEGIN(extq);
+ 			  RETPART; }
+
+ 	/* SQL standard quoted string body */
+
+ <stdq>[^']+		{ RETPART; }
+ <stdq>['][']		{ RETPART; }
+ <stdq>[']		{ BEGIN(sql); RETPART; }
+
+ 	/* extended quoted string body */
+
+ <extq>[^'\\]+		{ RETPART; }
+ <extq>['][']		{ RETPART; }
+ <extq>[\\].		{ RETPART; }
+ <extq>[']		{ BEGIN(sql); RETPART; }
+ <extq>.			{ RETPART; }
+
+ 	/* SQL identifier */
+
+ <sql>{IDENT}		{ yylval.str = yytext; return SQLIDENT; }
+
+ 	/* $x argument reference */
+
+ <sql>{NUMIDENT}		{ yylval.str = yytext; return SQLIDENT; }
+
+ 	/* SQL number */
+
+ <sql>{SQLNUM}		{ RETPART; }
+
+ 	/* SQL symbol, parse them one-by-one */
+
+ <sql>{SQLSYM}		{ RETPART; }
+
+ 	/* compress whitespace to single ' ' */
+
+ <sql>{SPACE}+		{ yylval.str = " "; return SQLPART; }
+
+ 	/* SQL statement end */
+
+ <sql>[;]		{ BEGIN(INITIAL); return *(yytext); }
+
+ 	/* unparsed symbol, let the parser error out */
+
+ <sql>.			{ return *(yytext); }
+
+ %%
+
+ /* tag of the currently open dollar quote (e.g. "$tmp$"); NULL when none */
+ static char *dlr_token = NULL;
+
+ /* Remember the opening dollar-quote tag so its closing tag can be matched. */
+ static void
+ dlr_start(const char *txt)
+ {
+ 	char *tag = pstrdup(txt);
+
+ 	dlr_token = tag;
+ }
+
+ /*
+  * Is txt the closing tag of the currently open dollar quote?
+  * On a match, the stored tag is released and cleared.
+  */
+ static bool
+ dlr_stop(const char *txt)
+ {
+ 	if (strcmp(txt, dlr_token) != 0)
+ 		return false;
+
+ 	pfree(dlr_token);
+ 	dlr_token = NULL;
+ 	return true;
+ }
+
+ /*
+  * Strip the quotes from a single-quoted literal and collapse each doubled
+  * quote ('') into one.  qstr is the whole matched token, including the
+  * opening and closing quote.  The std flag is currently not consulted.
+  *
+  * The result is palloc'd in the current memory context and not freed here.
+  */
+ static const char *unquote(const char *qstr, bool std)
+ {
+ 	StringInfoData out;
+ 	const char *src = qstr + 1;
+
+ 	initStringInfo(&out);
+ 	while (*src != '\0')
+ 	{
+ 		if (*src == '\'')
+ 		{
+ 			/* closing quote, or first half of a doubled quote */
+ 			src++;
+ 			if (*src == '\0')
+ 				break;
+ 		}
+ 		appendStringInfoChar(&out, *src);
+ 		src++;
+ 	}
+ 	/* out.data is intentionally leaked to the caller */
+ 	return out.data;
+ }
+
*** a/src/pl/plproxy/sql/plproxy_clustermap.sql
--- b/src/pl/plproxy/sql/plproxy_clustermap.sql
***************
*** 0 ****
--- 1,56 ----
+ create or replace function plproxy.get_cluster_version(cluster_name text)
+ returns integer as $$
+ begin
+ if cluster_name = 'testcluster' then
+ return 6;
+ elsif cluster_name = 'map0' then
+ return 1;
+ elsif cluster_name = 'map1' then
+ return 1;
+ elsif cluster_name = 'map2' then
+ return 1;
+ elsif cluster_name = 'map3' then
+ return 1;
+ end if;
+ raise exception 'no such cluster: %', cluster_name;
+ end; $$ language plpgsql;
+
+ create or replace function plproxy.get_cluster_partitions(cluster_name text)
+ returns setof text as $$
+ begin
+ if cluster_name = 'testcluster' then
+ return next 'host=127.0.0.1 dbname=test_part0';
+ return next 'host=127.0.0.1 dbname=test_part1';
+ return next 'host=127.0.0.1 dbname=test_part2';
+ return next 'host=127.0.0.1 dbname=test_part3';
+ elsif cluster_name = 'map0' then
+ return next 'host=127.0.0.1 dbname=test_part0';
+ elsif cluster_name = 'map1' then
+ return next 'host=127.0.0.1 dbname=test_part1';
+ elsif cluster_name = 'map2' then
+ return next 'host=127.0.0.1 dbname=test_part2';
+ elsif cluster_name = 'map3' then
+ return next 'host=127.0.0.1 dbname=test_part3';
+ else
+ raise exception 'no such cluster: %', cluster_name;
+ end if;
+ return;
+ end; $$ language plpgsql;
+
+ create function map_cluster(part integer) returns text as $$
+ begin
+ return 'map' || part;
+ end;
+ $$ language plpgsql;
+
+ create function test_clustermap(part integer) returns setof text as $$
+ cluster map_cluster(part);
+ run on 0;
+ select current_database();
+ $$ language plproxy;
+
+ select * from test_clustermap(0);
+ select * from test_clustermap(1);
+ select * from test_clustermap(2);
+ select * from test_clustermap(3);
+
*** a/src/pl/plproxy/sql/plproxy_dynamic_record.sql
--- b/src/pl/plproxy/sql/plproxy_dynamic_record.sql
***************
*** 0 ****
--- 1,43 ----
+ -- dynamic query support testing
+ create or replace function dynamic_query(q text)
+ returns setof record as $x$
+ cluster 'map0';
+ run on all;
+ $x$ language plproxy;
+
+ \c test_part0
+ create or replace function dynamic_query(q text)
+ returns setof record as $x$
+ declare
+ ret record;
+ begin
+ for ret in execute q loop
+ return next ret;
+ end loop;
+ return;
+ end;
+ $x$ language plpgsql;
+ create table dynamic_query_test (
+ id integer,
+ username text,
+ other text
+ );
+ insert into dynamic_query_test values ( 1, 'user1', 'blah');
+ insert into dynamic_query_test values ( 2, 'user2', 'foo');
+
+ \c regression
+ select * from dynamic_query('select * from dynamic_query_test') as (id integer, username text, other text);
+ select * from dynamic_query('select id, username from dynamic_query_test') as foo(id integer, username text);
+
+
+ -- test errors
+ select * from dynamic_query('select * from dynamic_query_test');
+
+ create or replace function dynamic_query_select()
+ returns setof record as $x$
+ cluster 'map0';
+ run on all;
+ select id, username from dynamic_query_test;
+ $x$ language plproxy;
+ select * from dynamic_query_select() as (id integer, username text);
+
*** a/src/pl/plproxy/sql/plproxy_errors.sql
--- b/src/pl/plproxy/sql/plproxy_errors.sql
***************
*** 0 ****
--- 1,63 ----
+
+ -- test bad arg
+ create function test_err1(dat text)
+ returns text as $$
+ cluster 'testcluster';
+ run on hashtext(username);
+ $$ language plproxy;
+ select * from test_err1('dat');
+
+ create function test_err2(dat text)
+ returns text as $$
+ cluster 'testcluster';
+ run on hashtext($2);
+ $$ language plproxy;
+ select * from test_err2('dat');
+
+ create function test_err3(dat text)
+ returns text as $$
+ cluster 'nonexists';
+ run on hashtext($1);
+ $$ language plproxy;
+ select * from test_err3('dat');
+
+ -- should work
+ create function test_err_none(dat text)
+ returns text as $$
+ cluster 'testcluster';
+ run on hashtext($1);
+ select 'ok';
+ $$ language plproxy;
+ select * from test_err_none('dat');
+
+ --- result map errors
+ create function test_map_err1(dat text)
+ returns text as $$ cluster 'testcluster'; run on 0;
+ select dat as "foo", 'asd' as "bar";
+ $$ language plproxy;
+ select * from test_map_err1('dat');
+
+ create function test_map_err2(dat text, out res1 text, out res2 text)
+ returns record as $$ cluster 'testcluster'; run on 0;
+ select dat as res1;
+ $$ language plproxy;
+ select * from test_map_err2('dat');
+
+ create function test_map_err3(dat text, out res1 text, out res2 text)
+ returns record as $$ cluster 'testcluster'; run on 0;
+ select dat as res1, 'foo' as res_none;
+ $$ language plproxy;
+ select * from test_map_err3('dat');
+
+ create function test_map_err4(dat text, out res1 text, out res2 text)
+ returns record as $$
+ --cluster 'testcluster';
+ run on hashtext(dat);
+ select dat as res2, 'foo' as res1;
+ $$ language plproxy;
+ select * from test_map_err4('dat');
+
+
+
+
+
*** a/src/pl/plproxy/sql/plproxy_init.sql
--- b/src/pl/plproxy/sql/plproxy_init.sql
***************
*** 0 ****
--- 1,57 ----
+
+
+ CREATE LANGUAGE plproxy;
+
+ \set ECHO none
+
+ -- Echo is off from here on, so the setup below is not echoed into the
+ -- regression output.
+
+ -- create cluster info functions
+ -- Stub configuration: 'testcluster' reports version 5 and has a single
+ -- partition, database test_part; any other cluster name raises an error.
+ create schema plproxy;
+ create or replace function plproxy.get_cluster_version(cluster_name text)
+ returns integer as $$
+ begin
+ if cluster_name = 'testcluster' then
+ return 5;
+ end if;
+ raise exception 'no such cluster: %', cluster_name;
+ end; $$ language plpgsql;
+
+ create or replace function
+ plproxy.get_cluster_partitions(cluster_name text)
+ returns setof text as $$
+ begin
+ if cluster_name = 'testcluster' then
+ return next 'host=127.0.0.1 dbname=test_part';
+ return;
+ end if;
+ raise exception 'no such cluster: %', cluster_name;
+ end; $$ language plpgsql;
+
+ -------------------------------------------------
+ -- initialize partition databases
+ -- (re)create test_part and test_part0..test_part3, each with plpgsql
+ -- installed for the remote-side test functions
+ -------------------------------------------------
+ drop database if exists test_part;
+ create database test_part;
+ \c test_part
+ create language plpgsql;
+
+ drop database if exists test_part0;
+ create database test_part0;
+ \c test_part0
+ create language plpgsql;
+
+ drop database if exists test_part1;
+ create database test_part1;
+ \c test_part1
+ create language plpgsql;
+
+ drop database if exists test_part2;
+ create database test_part2;
+ \c test_part2
+ create language plpgsql;
+
+ drop database if exists test_part3;
+ create database test_part3;
+ \c test_part3
+ create language plpgsql;
+
+
*** a/src/pl/plproxy/sql/plproxy_many.sql
--- b/src/pl/plproxy/sql/plproxy_many.sql
***************
*** 0 ****
--- 1,66 ----
+ create or replace function plproxy.get_cluster_version(cluster_name text)
+ returns integer as $$
+ begin
+ if cluster_name = 'testcluster' then
+ return 6;
+ end if;
+ raise exception 'no such cluster: %', cluster_name;
+ end; $$ language plpgsql;
+
+ create or replace function plproxy.get_cluster_partitions(cluster_name text)
+ returns setof text as $$
+ begin
+ if cluster_name = 'testcluster' then
+ return next 'host=127.0.0.1 dbname=test_part0';
+ return next 'host=127.0.0.1 dbname=test_part1';
+ return next 'host=127.0.0.1 dbname=test_part2';
+ return next 'host=127.0.0.1 dbname=test_part3';
+ return;
+ end if;
+ raise exception 'no such cluster: %', cluster_name;
+ end; $$ language plpgsql;
+
+ \c test_part0
+ create function test_multi(part integer, username text)
+ returns integer as $$ begin return 0; end; $$ language plpgsql;
+ \c test_part1
+ create function test_multi(part integer, username text)
+ returns integer as $$ begin return 1; end; $$ language plpgsql;
+ \c test_part2
+ create function test_multi(part integer, username text)
+ returns integer as $$ begin return 2; end; $$ language plpgsql;
+ \c test_part3
+ create function test_multi(part integer, username text)
+ returns integer as $$ begin return 3; end; $$ language plpgsql;
+
+ \c regression
+ create function test_multi(part integer, username text)
+ returns integer as $$ cluster 'testcluster'; run on int4(part); $$ language plproxy;
+ select test_multi(0, 'foo');
+ select test_multi(1, 'foo');
+ select test_multi(2, 'foo');
+ select test_multi(3, 'foo');
+
+ -- test RUN ON ALL
+ drop function test_multi(integer, text);
+ create function test_multi(part integer, username text)
+ returns setof integer as $$ cluster 'testcluster'; run on all; $$ language plproxy;
+ select test_multi(0, 'foo');
+
+ -- test RUN ON 2
+ drop function test_multi(integer, text);
+ create function test_multi(part integer, username text)
+ returns setof integer as $$ cluster 'testcluster'; run on 2; $$ language plproxy;
+ select test_multi(0, 'foo');
+
+ -- test RUN ON RANDOM
+ select setseed(0);
+ drop function test_multi(integer, text);
+ create function test_multi(part integer, username text)
+ returns setof integer as $$ cluster 'testcluster'; run on any; $$ language plproxy;
+ select test_multi(0, 'foo');
+ select test_multi(0, 'foo');
+ select test_multi(0, 'foo');
+ select test_multi(0, 'foo');
+
+
*** a/src/pl/plproxy/sql/plproxy_select.sql
--- b/src/pl/plproxy/sql/plproxy_select.sql
***************
*** 0 ****
--- 1,37 ----
+
+ -- test regular sql
+ create function test_select(xuser text, tmp boolean)
+ returns integer as $x$
+ cluster 'testcluster';
+ run on hashtext(xuser);
+ select /*********
+ junk ;
+ ********** ****/ id from sel_test where username = xuser
+ and ';' <> 'as;d''a ; sd'
+ and $tmp$ ; 'a' $tmp$ <> 'as;d''a ; sd'
+ and $tmp$ $ $$ $foo$tmp$ <> 'x';
+ $x$ language plproxy;
+
+ \c test_part
+ create table sel_test (
+ id integer,
+ username text
+ );
+ insert into sel_test values ( 1, 'user');
+
+ \c regression
+ select * from test_select('user', true);
+ select * from test_select('xuser', false);
+
+
+ -- test errors
+ create function test_select_err(xuser text, tmp boolean)
+ returns integer as $$
+ cluster 'testcluster';
+ run on hashtext(xuser);
+ select id from sel_test where username = xuser;
+ select id from sel_test where username = xuser;
+ $$ language plproxy;
+
+ select * from test_select_err('user', true);
+
*** a/src/pl/plproxy/sql/plproxy_test.sql
--- b/src/pl/plproxy/sql/plproxy_test.sql
***************
*** 0 ****
--- 1,200 ----
+
+ -- test normal function
+ create function testfunc(username text, id integer, data text)
+ returns text as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy;
+ \c test_part
+ create function testfunc(username text, id integer, data text)
+ returns text as $$ begin return 'username=' || username; end; $$ language plpgsql;
+ \c regression
+ select * from testfunc('user', 1, 'foo');
+ select * from testfunc('user', 1, 'foo');
+ select * from testfunc('user', 1, 'foo');
+
+
+ -- test setof text
+ create function test_set(username text, num integer)
+ returns setof text as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy;
+ \c test_part
+ create function test_set(username text, num integer)
+ returns setof text as $$
+ declare i integer;
+ begin
+ i := 0;
+ while i < num loop
+ return next 'username=' || username || ' row=' || i;
+ i := i + 1;
+ end loop;
+ return;
+ end; $$ language plpgsql;
+ \c regression
+ select * from test_set('user', 1);
+ select * from test_set('user', 0);
+ select * from test_set('user', 3);
+
+ -- test record
+ create type ret_test_rec as ( id integer, dat text);
+ create function test_record(username text, num integer)
+ returns ret_test_rec as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy;
+ \c test_part
+ create type ret_test_rec as ( id integer, dat text);
+ create function test_record(username text, num integer)
+ returns ret_test_rec as $$
+ declare ret ret_test_rec%rowtype;
+ begin
+ ret := (num, username);
+ return ret;
+ end; $$ language plpgsql;
+ \c regression
+ select * from test_record('user', 3);
+
+ -- test setof record
+ create function test_record_set(username text, num integer)
+ returns setof ret_test_rec as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy;
+ \c test_part
+ create function test_record_set(username text, num integer)
+ returns setof ret_test_rec as $$
+ declare ret ret_test_rec%rowtype; i integer;
+ begin
+ i := 0;
+ while i < num loop
+ ret := (i, username);
+ i := i + 1;
+ return next ret;
+ end loop;
+ return;
+ end; $$ language plpgsql;
+ \c regression
+ select * from test_record_set('user', 1);
+ select * from test_record_set('user', 0);
+ select * from test_record_set('user', 3);
+
+
+ -- test void
+ create function test_void(username text, num integer)
+ returns void as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy;
+ \c test_part
+ create function test_void(username text, num integer)
+ returns void as $$
+ begin
+ return;
+ end; $$ language plpgsql;
+ -- look what void actually looks
+ select * from test_void('void', 2);
+ select test_void('void', 2);
+ \c regression
+ select * from test_void('user', 1);
+ select * from test_void('user', 3);
+ select test_void('user', 3);
+ select test_void('user', 3);
+
+
+ -- test normal outargs: a single OUT parameter
+ create function test_out1(username text, id integer, out data text)
+ as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy;
+ \c test_part
+ create function test_out1(username text, id integer, out data text)
+ returns text as $$ begin data := 'username=' || username; return; end; $$ language plpgsql;
+ \c regression
+ select * from test_out1('user', 1);
+
+ -- test complicated outargs: mixed IN, OUT and INOUT parameters
+ create function test_out2(username text, id integer, out out_id integer, xdata text, inout xdata2 text, out odata text)
+ as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy;
+ \c test_part
+ create function test_out2(username text, id integer, out out_id integer, xdata text, inout xdata2 text, out odata text)
+ as $$ begin
+ out_id = id;
+ xdata2 := xdata2 || xdata;
+ odata := 'username=' || username;
+ return;
+ end; $$ language plpgsql;
+ \c regression
+ select * from test_out2('user', 1, 'xdata', 'xdata2');
+
+ -- test various scalar types, with real values and with NULLs
+ create function test_types(username text, inout vbool boolean, inout xdate timestamp, inout bin bytea)
+ as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy;
+ \c test_part
+ create function test_types(username text, inout vbool boolean, inout xdate timestamp, inout bin bytea)
+ as $$ begin return; end; $$ language plpgsql;
+ \c regression
+ select * from test_types('types', true, '2009-11-04 12:12:02', E'a\\000\\001\\002b');
+ select * from test_types('types', NULL, NULL, NULL);
+
+
+ -- test user defined types: a domain, a composite type and an array
+ create domain posint as int4 check (value > 0);
+ create type struct as (id int4, data text);
+
+ create function test_types2(username text, inout v_posint posint, inout v_struct struct, inout arr int8[])
+ as $$ cluster 'testcluster'; run on 0; $$ language plproxy;
+
+ \c test_part
+ create domain posint as int4 check (value > 0);
+ create type struct as (id int4, data text);
+ create function test_types2(username text, inout v_posint posint, inout v_struct struct, inout arr int8[])
+ as $$ begin return; end; $$ language plpgsql;
+ \c regression
+ select * from test_types2('types', 4, (2, 'asd'), array[1,2,3]);
+ select * from test_types2('types', NULL, NULL, NULL);
+
+ -- test CONNECT with an explicit connect string
+ create function test_connect1() returns text
+ as $$ connect 'dbname=test_part'; select current_database(); $$ language plproxy;
+ select * from test_connect1();
+
+
+ -- test quoting of mixed-case function, type and column names
+ create type "RetWeird" as (
+ "ColId" int4,
+ "ColData" text
+ );
+
+ create function "testQuoting"(username text, id integer, data text)
+ returns "RetWeird" as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy;
+ \c test_part
+ create type "RetWeird" as (
+ "ColId" int4,
+ "ColData" text
+ );
+ create function "testQuoting"(username text, id integer, data text)
+ returns "RetWeird" as $$ select 1::int4, 'BazOoka'::text $$ language sql;
+ \c regression
+ select * from "testQuoting"('user', '1', 'dat');
+
+ -- test RUN ON with hash functions returning int2 and int8
+ create or replace function t_hash16(int4) returns int2 as $$
+ declare
+ res int2;
+ begin
+ res = $1::int2;
+ return res;
+ end;
+ $$ language plpgsql;
+
+ create or replace function t_hash64(int4) returns int8 as $$
+ declare
+ res int8;
+ begin
+ res = $1;
+ return res;
+ end;
+ $$ language plpgsql;
+
+ create function test_hash16(id integer, data text)
+ returns text as $$ cluster 'testcluster'; run on t_hash16(id); select data; $$ language plproxy;
+ select * from test_hash16('0', 'hash16');
+
+ create function test_hash64(id integer, data text)
+ returns text as $$ cluster 'testcluster'; run on t_hash64(id); select data; $$ language plproxy;
+ select * from test_hash64('0', 'hash64');
+
+ -- test result types differing between proxy and partition functions
+ \c test_part
+ create function test_difftypes(username text, out val1 int2, out val2 float8)
+ as $$ begin val1 = 1; val2 = 3;return; end; $$ language plpgsql;
+ \c regression
+ create function test_difftypes(username text, out val1 int4, out val2 float4)
+ as $$ cluster 'testcluster'; run on 0; $$ language plproxy;
+ select * from test_difftypes('types');
+
*** a/src/pl/plproxy/type.c
--- b/src/pl/plproxy/type.c
***************
*** 0 ****
--- 1,336 ----
+ /*
+ * PL/Proxy - easy access to partitioned database.
+ *
+ * Copyright (c) 2006 Sven Suursoho, Skype Technologies OÜ
+ * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+ /*
+ * Caches I/O info about scalar values.
+ */
+
+ #include "plproxy.h"
+
+ /*
+  * usable_binary
+  *		Decide whether values of type 'oid' may be transferred in binary.
+  *
+  * Only the explicitly whitelisted types return true.  The date/time types
+  * are listed separately only to document why they are excluded; they fall
+  * into the same "use text" result as every unlisted type.
+  */
+ static bool
+ usable_binary(Oid oid)
+ {
+ 	bool		ok;
+ 
+ 	switch (oid)
+ 	{
+ 		case BOOLOID:
+ 		case INT2OID:
+ 		case INT4OID:
+ 		case INT8OID:
+ 		case TEXTOID:
+ 		case BPCHAROID:
+ 		case VARCHAROID:
+ 		case FLOAT4OID:
+ 		case FLOAT8OID:
+ 		case NUMERICOID:
+ 		case BYTEAOID:
+ 			ok = true;
+ 			break;
+ 
+ 		/* binary format depends on integer vs. float datetimes */
+ 		case TIMESTAMPOID:
+ 		case TIMESTAMPTZOID:
+ 		case DATEOID:
+ 		case TIMEOID:
+ 		/* the interval binary format changed in 8.1 */
+ 		case INTERVALOID:
+ 		default:
+ 			ok = false;
+ 			break;
+ 	}
+ 
+ 	return ok;
+ }
+
+ /*
+  * plproxy_composite_info
+  *		Describe every column of a row type for later I/O.
+  *
+  * The ProxyComposite struct and its two arrays are allocated in func->ctx,
+  * and the (blessed) tuple descriptor is stored as-is.  Column names are
+  * quoted and copied with plproxy_func_strdup(); per-column type info comes
+  * from plproxy_find_type_info() in receive mode.  If any column lacks a
+  * binary receive function, use_binary is cleared for the whole row.
+  * Dropped columns are rejected via plproxy_error().
+  *
+  * Modeled on TupleDescGetAttInMetadata.
+  */
+ ProxyComposite *
+ plproxy_composite_info(ProxyFunction *func, TupleDesc tupdesc)
+ {
+ 	int			natts = tupdesc->natts;
+ 	int			col;
+ 	ProxyComposite *info;
+ 	MemoryContext saved_ctx;
+ 
+ 	/* allocate the descriptor pieces in the function's context */
+ 	saved_ctx = MemoryContextSwitchTo(func->ctx);
+ 	info = palloc(sizeof(*info));
+ 	info->type_list = palloc(sizeof(ProxyType *) * natts);
+ 	info->name_list = palloc0(sizeof(char *) * natts);
+ 	info->tupdesc = BlessTupleDesc(tupdesc);
+ 	info->use_binary = 1;
+ 	MemoryContextSwitchTo(saved_ctx);
+ 
+ 	for (col = 0; col < natts; col++)
+ 	{
+ 		Form_pg_attribute attr = tupdesc->attrs[col];
+ 		ProxyType  *coltype;
+ 
+ 		if (attr->attisdropped)
+ 			plproxy_error(func, "dropped attrs not supported");
+ 
+ 		info->name_list[col] =
+ 			plproxy_func_strdup(func, quote_identifier(NameStr(attr->attname)));
+ 
+ 		coltype = plproxy_find_type_info(func, attr->atttypid, false);
+ 		info->type_list[col] = coltype;
+ 
+ 		/* one text-only column forces text transfer for the whole row */
+ 		if (!coltype->has_recv)
+ 			info->use_binary = 0;
+ 	}
+ 
+ 	return info;
+ }
+
+ /*
+  * plproxy_free_composite
+  *		Release a ProxyComposite and everything it owns.
+  *
+  * Frees each column's ProxyType and name, then both arrays, the tuple
+  * descriptor (via FreeTupleDesc) and finally the struct itself.
+  */
+ void
+ plproxy_free_composite(ProxyComposite *rec)
+ {
+ 	const int	ncols = rec->tupdesc->natts;
+ 	int			col = 0;
+ 
+ 	while (col < ncols)
+ 	{
+ 		plproxy_free_type(rec->type_list[col]);
+ 		pfree(rec->name_list[col]);
+ 		col++;
+ 	}
+ 
+ 	pfree(rec->type_list);
+ 	pfree(rec->name_list);
+ 	FreeTupleDesc(rec->tupdesc);
+ 	pfree(rec);
+ }
+
+ /*
+  * plproxy_free_type
+  *		Release a ProxyType together with its name string.
+  *
+  * The embedded FmgrInfo structs are not cleaned up separately; this relies
+  * on the I/O functions not having stored anything in ->fn_extra.
+  */
+ void
+ plproxy_free_type(ProxyType *type)
+ {
+ 	if (type->name != NULL)
+ 		pfree(type->name);
+ 	pfree(type);
+ }
+
+ /*
+  * plproxy_recv_composite
+  *		Form a heap tuple from per-column values returned by libpq.
+  *
+  * meta		row description from plproxy_composite_info()
+  * values	column values; a NULL pointer marks a SQL NULL
+  * lengths	byte length of each value
+  * fmts		per-column format flag (non-zero = binary)
+  *
+  * Each value is converted with plproxy_recv_type().  heap_formtuple copies
+  * the data into the new tuple, so converted by-reference datums are freed
+  * afterwards.
+  *
+  * Modeled on BuildTupleFromCStrings.
+  */
+ HeapTuple
+ plproxy_recv_composite(ProxyComposite *meta, char **values, int *lengths, int *fmts)
+ {
+ 	TupleDesc	desc = meta->tupdesc;
+ 	int			ncols = desc->natts;
+ 	Datum	   *datums = (Datum *) palloc(ncols * sizeof(Datum));
+ 	char	   *nullflags = (char *) palloc(ncols * sizeof(char));
+ 	HeapTuple	result;
+ 	int			col;
+ 
+ 	for (col = 0; col < ncols; col++)
+ 	{
+ 		if (desc->attrs[col]->attisdropped)
+ 			elog(ERROR, "dropped attrs not supported");
+ 
+ 		datums[col] = plproxy_recv_type(meta->type_list[col], values[col],
+ 										lengths[col], fmts[col]);
+ 		nullflags[col] = (values[col] == NULL) ? 'n' : ' ';
+ 	}
+ 
+ 	result = heap_formtuple(desc, datums, nullflags);
+ 
+ 	/* the tuple holds its own copy now; drop our converted values */
+ 	for (col = 0; col < ncols; col++)
+ 	{
+ 		if (nullflags[col] != 'n' && !meta->type_list[col]->by_value)
+ 			pfree(DatumGetPointer(datums[col]));
+ 	}
+ 	pfree(datums);
+ 	pfree(nullflags);
+ 
+ 	return result;
+ }
+
+ /*
+  * plproxy_find_type_info
+  *		Look up a type in pg_type and prepare its I/O functions.
+  *
+  * func		owning function; the ProxyType is allocated with
+  *			plproxy_func_alloc() and the FmgrInfo structs in func->ctx
+  * oid		type to look up
+  * for_send	true: prepare typoutput (plus typsend when usable);
+  *			false: prepare typinput (plus typreceive when usable)
+  *
+  * ->name holds the quoted type name, schema-qualified unless the type
+  * lives in pg_catalog.  A binary send/receive function is prepared only
+  * when pg_type has one and usable_binary() accepts the type.
+  *
+  * Base, composite, domain and enum types are accepted; of the pseudo-types
+  * only void is.  Anything else is reported via plproxy_error().
+  */
+ ProxyType *
+ plproxy_find_type_info(ProxyFunction *func, Oid oid, bool for_send)
+ {
+ 	ProxyType  *type;
+ 	HeapTuple	t_type,
+ 				t_nsp;
+ 	Form_pg_type s_type;
+ 	Form_pg_namespace s_nsp;
+ 	/* two quoted identifiers (quoting may double each char), a dot, NUL */
+ 	char		namebuf[NAMEDATALEN * 4 + 2 + 1 + 2 + 1];
+ 	Oid			nsoid;
+ 
+ 	/* fetch pg_type row */
+ 	t_type = SearchSysCache(TYPEOID, ObjectIdGetDatum(oid), 0, 0, 0);
+ 	if (!HeapTupleIsValid(t_type))
+ 		plproxy_error(func, "cache lookup failed for type %u", oid);
+ 
+ 	s_type = (Form_pg_type) GETSTRUCT(t_type);
+ 	nsoid = s_type->typnamespace;
+ 
+ 	/* build the quoted name, schema-qualified outside pg_catalog */
+ 	if (nsoid != PG_CATALOG_NAMESPACE)
+ 	{
+ 		t_nsp = SearchSysCache(NAMESPACEOID, ObjectIdGetDatum(nsoid), 0, 0, 0);
+ 		if (!HeapTupleIsValid(t_nsp))
+ 			plproxy_error(func, "cache lookup failed for namespace %u", nsoid);
+ 		s_nsp = (Form_pg_namespace) GETSTRUCT(t_nsp);
+ 		snprintf(namebuf, sizeof(namebuf), "%s.%s",
+ 				 quote_identifier(NameStr(s_nsp->nspname)),
+ 				 quote_identifier(NameStr(s_type->typname)));
+ 		ReleaseSysCache(t_nsp);
+ 	}
+ 	else
+ 	{
+ 		snprintf(namebuf, sizeof(namebuf), "%s",
+ 				 quote_identifier(NameStr(s_type->typname)));
+ 	}
+ 
+ 	/*
+ 	 * Sanity check on the type kind.  Every case ends in an explicit break;
+ 	 * unknown kinds get their own message instead of being misreported as
+ 	 * pseudo-types.
+ 	 */
+ 	switch (s_type->typtype)
+ 	{
+ 		case 'b':				/* base type */
+ 		case 'c':				/* composite type */
+ 		case 'd':				/* domain */
+ 		case 'e':				/* enum: usable_binary() keeps it on text I/O */
+ 			break;
+ 		case 'p':				/* pseudo-type: only void is supported */
+ 			if (oid != VOIDOID)
+ 				plproxy_error(func, "unsupported pseudo type: %s (%u)",
+ 							  namebuf, oid);
+ 			break;
+ 		default:
+ 			plproxy_error(func, "unsupported type code '%c': %s (%u)",
+ 						  s_type->typtype, namebuf, oid);
+ 			break;
+ 	}
+ 
+ 	/* allocate & fill structure */
+ 	type = plproxy_func_alloc(func, sizeof(*type));
+ 	memset(type, 0, sizeof(*type));
+ 
+ 	type->type_oid = oid;
+ 	type->io_param = getTypeIOParam(t_type);
+ 	type->for_send = for_send;
+ 	type->by_value = s_type->typbyval;
+ 	type->name = plproxy_func_strdup(func, namebuf);
+ 
+ 	/* always set up the text function; add the binary one when usable */
+ 	if (for_send)
+ 	{
+ 		fmgr_info_cxt(s_type->typoutput, &type->io.out.output_func, func->ctx);
+ 		if (OidIsValid(s_type->typsend) && usable_binary(oid))
+ 		{
+ 			fmgr_info_cxt(s_type->typsend, &type->io.out.send_func, func->ctx);
+ 			type->has_send = 1;
+ 		}
+ 	}
+ 	else
+ 	{
+ 		fmgr_info_cxt(s_type->typinput, &type->io.in.input_func, func->ctx);
+ 		if (OidIsValid(s_type->typreceive) && usable_binary(oid))
+ 		{
+ 			fmgr_info_cxt(s_type->typreceive, &type->io.in.recv_func, func->ctx);
+ 			type->has_recv = 1;
+ 		}
+ 	}
+ 
+ 	ReleaseSysCache(t_type);
+ 
+ 	return type;
+ }
+
+
+ /*
+  * plproxy_send_type
+  *		Convert a Datum into a libpq query parameter.
+  *
+  * If allow_bin is true and the type has a binary send function, the binary
+  * representation is returned with *len = its byte length and *fmt = 1.
+  * Otherwise the text output is returned with *len = 0 and *fmt = 0.
+  *
+  * In the binary case the result points into the bytea built by the send
+  * function (past its header), so it is not a separately palloc'd chunk.
+  */
+ char *
+ plproxy_send_type(ProxyType *type, Datum val, bool allow_bin, int *len, int *fmt)
+ {
+ 	Assert(type->for_send == 1);
+ 
+ 	if (!(allow_bin && type->has_send))
+ 	{
+ 		char	   *txt = OutputFunctionCall(&type->io.out.output_func, val);
+ 
+ 		*len = 0;
+ 		*fmt = 0;
+ 		return txt;
+ 	}
+ 	else
+ 	{
+ 		bytea	   *packed = SendFunctionCall(&type->io.out.send_func, val);
+ 
+ 		*len = VARSIZE(packed) - VARHDRSZ;
+ 		*fmt = 1;
+ 		return VARDATA(packed);
+ 	}
+ }
+
+ /*
+  * setFixedStringInfo
+  *		Make a StringInfo describe an existing buffer, without copying.
+  *
+  * StringInfo conventionally expects a trailing NUL after the data; libpq
+  * result values already carry one, and binary receive functions should
+  * not depend on it anyway.  Because maxlen equals len, the StringInfo may
+  * only be read from, never appended to.
+  */
+ static void
+ setFixedStringInfo(StringInfo str, void *data, int len)
+ {
+ 	str->data = (char *) data;
+ 	str->len = len;
+ 	str->maxlen = len;
+ 	str->cursor = 0;
+ }
+
+ /*
+  * plproxy_recv_type
+  *		Convert one libpq result value into a Datum.
+  *
+  * type		receive-mode ProxyType from plproxy_find_type_info()
+  * val		value text/bytes from libpq, or NULL for a SQL NULL (the
+  *			convention plproxy_recv_composite() uses)
+  * len		byte length of val; only used for binary format
+  * bin		true if val is in binary format
+  *
+  * A NULL val is handed to the fmgr call as a NULL input in both formats,
+  * so strict I/O functions yield (Datum) 0 and non-strict ones are called
+  * with a NULL argument, rather than decoding an empty buffer.
+  */
+ Datum
+ plproxy_recv_type(ProxyType *type, char *val, int len, bool bin)
+ {
+ 	Datum		res;
+ 	StringInfoData buf;
+ 
+ 	Assert(type->for_send == 0);
+ 
+ 	if (bin)
+ 	{
+ 		if (!type->has_recv)
+ 			elog(ERROR, "PL/Proxy: type %u recv not supported", type->type_oid);
+ 
+ 		if (val == NULL)
+ 		{
+ 			/* SQL NULL: ReceiveFunctionCall accepts a NULL StringInfo */
+ 			res = ReceiveFunctionCall(&type->io.in.recv_func,
+ 									  NULL, type->io_param, -1);
+ 		}
+ 		else
+ 		{
+ 			/* wrap libpq's buffer directly to avoid an unnecessary copy */
+ 			setFixedStringInfo(&buf, val, len);
+ 			res = ReceiveFunctionCall(&type->io.in.recv_func,
+ 									  &buf, type->io_param, -1);
+ 		}
+ 	}
+ 	else
+ 	{
+ 		/* InputFunctionCall likewise treats a NULL string as SQL NULL */
+ 		res = InputFunctionCall(&type->io.in.input_func,
+ 								val, type->io_param, -1);
+ 	}
+ 	return res;
+ }