*** a/doc/src/sgml/filelist.sgml --- b/doc/src/sgml/filelist.sgml *************** *** 67,76 **** --- 67,77 ---- + %allfiles; *** a/doc/src/sgml/plproxy.sgml --- b/doc/src/sgml/plproxy.sgml *************** *** 0 **** --- 1,221 ---- + + + + PL/Proxy - Procedural Language for Remote Calls + + PL/Proxy + + + The PL/Proxy procedural language makes it + easy to create remote calls. + + + + To install PL/Proxy in a particular database, use + createlang plproxy dbname. + + + + + If a language is installed into template1, all subsequently + created databases will have the language installed automatically. + + + + + PL/Proxy is an untrusted language, because it accesses remote databases. + That means only superusers can create functions in that language. + + + + PL/Proxy Language + + + Functions in PL/Proxy are declared via the standard + + syntax: + + + CREATE FUNCTION funcname (argument-list) + RETURNS return-type + AS $$ + -- PL/Proxy function body + $$ LANGUAGE plproxy; + + + + The language is similar to plpgsql: string quoting, comments and + the semicolon at the end of each statement work the same way. + + It contains only 4 statements: CONNECT, + CLUSTER, RUN and + SELECT. + + Each function needs to have either CONNECT or + a pair of CLUSTER + RUN statements + to specify where to run the function. + + The SELECT statement is optional; if it is + missing, a default query is generated based on the proxy function + signature. + + The RUN statement is also optional; it defaults + to RUN ON ANY, which means the query will be run on a + random partition. + + + CONNECT + CONNECT 'libpq connstr'; + Specifies the exact location where to connect and execute the query. + If several functions have the same connstr, they will use the same connection. + + + + CLUSTER + + CLUSTER 'cluster_name'; + Specifies the exact cluster name to run on. The cluster name will + be passed to plproxy.get_cluster_* functions. + + CLUSTER cluster_func(..); + The cluster name can be decided dynamically based on the proxy function arguments.
+ cluster_func should return the final cluster name as a text value. + + + + + RUN ON + RUN ON ALL; + The query will be run on all partitions in the cluster in parallel. + + RUN ON ANY; + The query will be run on a random partition. + + RUN ON <NR>; + Run on partition number <NR>. + + RUN ON partition_func(..); + Run partition_func(), which should return one or more hash values. + PL/Proxy accepts int2, int4 and int8 values from the hash function. + The query will be run on the tagged partitions. If more than one partition was + tagged, the query will be sent to them in parallel. + + + + + + SELECT + SELECT .... ; + By default, PL/Proxy generates the query based on its own signature. + This can be overridden by giving an explicit SELECT + statement to run. + + Everything after SELECT until the semicolon is + taken as SQL to be passed on. Only argument substitution is done on the + contents; otherwise the text is passed on unparsed. To avoid a table column being + parsed as a function argument, table aliases should be used. + + The query result should have the same number of columns as the function result + and the same column names. + + + + + + Argument substitution + Proxy function arguments can be referenced by name or with + $n syntax. Everything that is not an argument + reference is passed on unchanged. + + + + + + Configuring PL/Proxy Cluster + + PL/Proxy can be used in either CONNECT mode or CLUSTER mode. + + In CONNECT mode PL/Proxy acts as a pass-through proxy to another database. + Each PL/Proxy function contains a libpq connect string for the connection + to a database it will proxy the request to. + + PL/Proxy can also be used in CLUSTER mode, where it provides support for + partitioning data across multiple databases based on a clustering function. + + When using PL/Proxy in CONNECT mode, no configuration functions are required. + However, using PL/Proxy in CLUSTER mode requires the following configuration + functions to be defined.
+ + + plproxy.get_cluster_version(cluster_name) + + plproxy.get_cluster_version(cluster_name text) returns integer + + The get_cluster_version function is called on each request; it should return + the version number of the current configuration for a particular cluster. + If the version number returned by this function differs from the one plproxy + has cached, then the partition information will be reloaded + by calling the get_cluster_partitions() function. + + This is an example function that does not look up the version number from an + external source such as a configuration table. + + + CREATE OR REPLACE FUNCTION plproxy.get_cluster_version(cluster_name text) + RETURNS int4 AS $$ + BEGIN + IF cluster_name = 'a_cluster' THEN + RETURN 1; + END IF; + RAISE EXCEPTION 'Unknown cluster'; + END; + $$ LANGUAGE plpgsql; + + + + + + plproxy.get_cluster_partitions(cluster_name) + + plproxy.get_cluster_partitions(cluster_name text) returns setof text + + This is called when a new partition configuration needs to be loaded. + It should return connect strings to the partitions in the cluster. + The connstrings should be returned in the correct order. The total + number of connstrings returned must be a power of 2. If two or more + connstrings are equal then they will use the same connection. + + If the string "user=" does not appear in a connect string then + user=CURRENT_USER will be appended to the connection string by PL/Proxy. + This will cause PL/Proxy to connect to the partition database using + the same username as was used to connect to the proxy database. + Since plproxy does not know any passwords, the partition databases + should be using "trust" authentication for connections from the proxy database, + so that the proxy database can connect to them without requiring a password. + If the connect strings contain an explicit username then an explicit + password can also be set in the connstring.
+ + The best way to set explicit passwords is to add them to the .pgpass file + in the home directory of the operating system user the PostgreSQL server runs as. + + An example function without the use of separate configuration tables: + + + CREATE OR REPLACE FUNCTION plproxy.get_cluster_partitions(cluster_name text) + RETURNS SETOF text AS $$ + BEGIN + IF cluster_name = 'a_cluster' THEN + RETURN NEXT 'dbname=part00 host=127.0.0.1'; + RETURN NEXT 'dbname=part01 host=127.0.0.1'; + RETURN NEXT 'dbname=part02 host=127.0.0.1'; + RETURN NEXT 'dbname=part03 host=127.0.0.1'; + RETURN; + END IF; + RAISE EXCEPTION 'Unknown cluster'; + END; + $$ LANGUAGE plpgsql; + + + + + *** a/doc/src/sgml/postgres.sgml --- b/doc/src/sgml/postgres.sgml *************** *** 210,219 **** --- 210,220 ---- &xplang; &plsql; &pltcl; &plperl; &plpython; + &plproxy; &spi; *** a/src/include/catalog/pg_pltemplate.h --- b/src/include/catalog/pg_pltemplate.h *************** DATA(insert ( "plpgsql" t t "plpgsql_ca *** 69,75 **** --- 69,76 ---- DATA(insert ( "pltcl" t t "pltcl_call_handler" _null_ "$libdir/pltcl" _null_ )); DATA(insert ( "pltclu" f f "pltclu_call_handler" _null_ "$libdir/pltcl" _null_ )); DATA(insert ( "plperl" t t "plperl_call_handler" "plperl_validator" "$libdir/plperl" _null_ )); DATA(insert ( "plperlu" f f "plperl_call_handler" "plperl_validator" "$libdir/plperl" _null_ )); DATA(insert ( "plpythonu" f f "plpython_call_handler" _null_ "$libdir/plpython" _null_ )); + DATA(insert ( "plproxy" f f "plproxy_call_handler" _null_ "$libdir/plproxy" _null_ )); #endif /* PG_PLTEMPLATE_H */ *** a/src/pl/Makefile --- b/src/pl/Makefile *************** *** 10,20 **** subdir = src/pl top_builddir = ../.. include $(top_builddir)/src/Makefile.global ! DIRS = plpgsql ifeq ($(with_perl), yes) DIRS += plperl endif --- 10,20 ---- subdir = src/pl top_builddir = ../.. include $(top_builddir)/src/Makefile.global ! 
DIRS = plpgsql plproxy
  ifeq ($(with_perl), yes)
  DIRS += plperl
  endif
*** a/src/pl/plproxy/Makefile
--- b/src/pl/plproxy/Makefile
***************
*** 0 ****
--- 1,89 ----
+ #-------------------------------------------------------------------------
+ #
+ # Makefile for the plproxy shared object
+ #
+ # $PostgreSQL$
+ #
+ #-------------------------------------------------------------------------
+
+ subdir = src/pl/plproxy
+ top_builddir = ../../..
+ include $(top_builddir)/src/Makefile.global
+
+ NAME= plproxy
+ # scanner.o and parser.tab.o are compiled from generated sources
+ # (scanner.c from scanner.l, parser.tab.c from parser.y); see the
+ # generator rules below.
+ SRCS = cluster.c execute.c function.c main.c \
+ 	query.c result.c type.c poll_compat.c
+ OBJS = scanner.o parser.tab.o $(SRCS:.c=.o)
+
+ # Regression scripts, run by the installcheck target below.
+ REGRESS = plproxy_init plproxy_test plproxy_select plproxy_many \
+ 	plproxy_errors plproxy_clustermap plproxy_dynamic_record
+ REGRESS_OPTS = --load-language=plpgsql
+
+ override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)
+ SHLIB_LINK = $(libpq_pgport) $(LDFLAGS) $(LIBS)
+ # NOTE(review): rpath is emptied on purpose, presumably so the shared
+ # object does not embed a runtime library search path - confirm.
+ rpath =
+
+
+ all: all-lib
+
+ # Shared library stuff
+ include $(top_srcdir)/src/Makefile.shlib
+
+
+ install: installdirs all install-lib
+
+ installdirs: installdirs-lib
+
+ uninstall: uninstall-lib
+
+
+ # Force these dependencies to be known even without dependency info built:
+ $(OBJS): plproxy.h rowstamp.h
+ execute.o: poll_compat.h
+ poll_compat.o: poll_compat.h
+
+ scanner.o: parser.tab.h
+ parser.tab.h: parser.tab.c
+
+ # "yacc -d" writes y.tab.c and y.tab.h; they are moved into $(srcdir)
+ # under the parser.tab.* names used above.
+ $(srcdir)/parser.tab.c: parser.y
+ ifdef YACC
+ 	$(YACC) -d $(YFLAGS) $<
+ 	mv -f y.tab.c $(srcdir)/parser.tab.c
+ 	mv -f y.tab.h $(srcdir)/parser.tab.h
+ else
+ 	@$(missing) bison $< $@
+ endif
+
+ # Because we use %option case-insensitive, flex's results could vary
+ # depending on what the compile-time locale setting is. Hence, force
+ # it to see LC_CTYPE=C to ensure consistent build results.
+
+ $(srcdir)/scanner.c: scanner.l
+ ifdef FLEX
+ 	LC_CTYPE=C $(FLEX) $(FLEXFLAGS) -o'$@' $<
+ else
+ 	@$(missing) flex $< $@
+ endif
+
+ distprep: $(srcdir)/scanner.c $(srcdir)/parser.tab.h $(srcdir)/parser.tab.c
+
+ # parser.tab.c, parser.tab.h and scanner.c are in the distribution tarball,
+ # so they are not cleaned here.
+ clean distclean: clean-lib
+ 	rm -f $(OBJS)
+ # And the garbage that might have been left behind by partial build:
+ 	@rm -f y.tab.h y.tab.c y.output lex.yy.c
+
+ maintainer-clean: clean
+ 	rm -f $(srcdir)/parser.tab.c $(srcdir)/parser.tab.h $(srcdir)/scanner.c
+
+ # Builds pg_regress first (via submake), then runs $(REGRESS) with
+ # test inputs taken from $(srcdir).
+ installcheck: submake
+ 	$(top_builddir)/src/test/regress/pg_regress \
+ 		--inputdir=$(srcdir) \
+ 		--psqldir=$(PSQLDIR) \
+ 		$(REGRESS_OPTS) $(REGRESS)
+
+ .PHONY: submake
+ submake:
+ 	$(MAKE) -C $(top_builddir)/src/test/regress pg_regress$(X)
+
*** a/src/pl/plproxy/cluster.c
--- b/src/pl/plproxy/cluster.c
***************
*** 0 ****
--- 1,469 ----
+ /*
+  * PL/Proxy - easy access to partitioned database.
+  *
+  * Copyright (c) 2006 Sven Suursoho, Skype Technologies OÜ
+  * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+  *
+  * Permission to use, copy, modify, and distribute this software for any
+  * purpose with or without fee is hereby granted, provided that the above
+  * copyright notice and this permission notice appear in all copies.
+  *
+  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+  */
+
+ /*
+  * Cluster info management.
+  *
+  * Info structures are kept in separate memory context: cluster_mem.
+  */
+
+ #include "plproxy.h"
+
+ /* Permanent memory area for cluster info structures */
+ static MemoryContext cluster_mem;
+
+ /*
+  * Singly linked list of clusters.
+  *
+  * For searching by name.  If there will be lots of clusters
+  * should use some faster search method, HTAB probably.
+  */
+ static ProxyCluster *cluster_list = NULL;
+
+ /*
+  * Similar list for fake clusters (for CONNECT functions).
+  *
+  * Cluster name will be actual connect string.
+  */
+ static ProxyCluster *fake_cluster_list = NULL;
+
+ /* plan for fetching cluster version */
+ static void *version_plan;
+
+ /* plan for fetching cluster partitions */
+ static void *partlist_plan;
+
+ /* query for fetching cluster version */
+ static const char version_sql[] = "select * from plproxy.get_cluster_version($1)";
+
+ /* query for fetching cluster partitions */
+ static const char part_sql[] = "select * from plproxy.get_cluster_partitions($1)";
+
+
+ /*
+  * Partition count must be non-zero and a power of 2, so that
+  * (count - 1) can be used as a bit mask for selecting a partition.
+  */
+ static bool
+ check_valid_partcount(int n)
+ {
+     return (n > 0) && !(n & (n - 1));
+ }
+
+ /*
+  * Create the long-lived memory context for cluster info structures.
+  *
+  * SPI plans are not prepared here; plproxy_cluster_plan_init() prepares
+  * them on first use.
+  */
+ void
+ plproxy_cluster_cache_init(void)
+ {
+     cluster_mem = AllocSetContextCreate(TopMemoryContext,
+                                         "PL/Proxy cluster context",
+                                         ALLOCSET_SMALL_MINSIZE,
+                                         ALLOCSET_SMALL_INITSIZE,
+                                         ALLOCSET_SMALL_MAXSIZE);
+ }
+
+ /*
+  * Initialize the configuration-query plans on demand.
+  *
+  * Uses SPI_prepare()/SPI_saveplan(), so the caller needs an active SPI
+  * connection.  The static plan pointers and the init flag are assigned
+  * only after both plans were saved successfully; on any failure an
+  * error is raised and the next call retries from scratch.
+  */
+ static void
+ plproxy_cluster_plan_init(void)
+ {
+     void       *tmp_ver_plan,
+                *tmp_part_plan;
+     void       *saved_ver_plan,
+                *saved_part_plan;
+     Oid         types[] = {TEXTOID};
+     int         save_result;
+     static bool init_done = false;
+
+     if (init_done)
+         return;
+
+     /*
+      * prepare plans for fetching configuration.
+      */
+     tmp_ver_plan = SPI_prepare(version_sql, 1, types);
+     if (tmp_ver_plan == NULL)
+         elog(ERROR, "PL/Proxy: plproxy.get_cluster_version() SQL fails: %s",
+              SPI_result_code_string(SPI_result));
+
+     tmp_part_plan = SPI_prepare(part_sql, 1, types);
+     if (tmp_part_plan == NULL)
+         elog(ERROR, "PL/Proxy: plproxy.get_cluster_partitions() SQL fails: %s",
+              SPI_result_code_string(SPI_result));
+
+     /*
+      * Save them into long-lived memory.  SPI_saveplan() returns NULL on
+      * failure, which must not be stored as a usable plan.
+      */
+     saved_ver_plan = SPI_saveplan(tmp_ver_plan);
+     if (saved_ver_plan == NULL)
+         elog(ERROR, "PL/Proxy: SPI_saveplan failed: %s",
+              SPI_result_code_string(SPI_result));
+
+     saved_part_plan = SPI_saveplan(tmp_part_plan);
+     if (saved_part_plan == NULL)
+     {
+         /* do not leak the first saved plan; a retry will save it again */
+         save_result = SPI_result;
+         SPI_freeplan(saved_ver_plan);
+         elog(ERROR, "PL/Proxy: SPI_saveplan failed: %s",
+              SPI_result_code_string(save_result));
+     }
+
+     /*
+      * Store them only if all successful.
+      */
+     version_plan = saved_ver_plan;
+     partlist_plan = saved_part_plan;
+
+     init_done = true;
+ }
+
+ /*
+  * Drop partition and connection data from cluster.
+  *
+  * Closes every open libpq connection, clears any pending result, frees
+  * the connect strings and both arrays, and resets the counters so the
+  * cluster holds no partitions.
+  */
+ static void
+ free_connlist(ProxyCluster *cluster)
+ {
+     int         i;
+     ProxyConnection *conn;
+
+     for (i = 0; i < cluster->conn_count; i++)
+     {
+         conn = &cluster->conn_list[i];
+         if (conn->db)
+             PQfinish(conn->db);
+         if (conn->res)
+             PQclear(conn->res);
+         if (conn->connstr)
+             pfree((void *) conn->connstr);
+     }
+     pfree(cluster->part_map);
+     pfree(cluster->conn_list);
+
+     cluster->part_map = NULL;
+     cluster->part_count = 0;
+     cluster->part_mask = 0;
+     cluster->conn_list = NULL;
+     cluster->conn_count = 0;
+ }
+
+ /*
+  * Add new database connection if it does not exist yet.
+  */
+ static ProxyConnection *
+ add_connection(ProxyCluster *cluster, char *connstr)
+ {
+     /*
+      * Returns the existing entry whose final connect string matches, or
+      * appends a new entry to cluster->conn_list.  The caller must have
+      * sized conn_list for one more entry (reload_parts allocates one
+      * slot per partition).
+      */
+     int         i;
+     ProxyConnection *conn;
+     const char *username;
+     const char *p;
+     StringInfo  final;
+
+     final = makeStringInfo();
+     appendStringInfoString(final, connstr);
+
+     /* append current user if not specified in connstr */
+     if (strstr(connstr, "user=") == NULL)
+     {
+         username = GetUserNameFromId(GetSessionUserId());
+
+         /*
+          * libpq connect-string values may be single-quoted, with \' and
+          * \\ as escapes.  Quote the name so users with spaces, quotes or
+          * backslashes in their names still produce a valid string.
+          */
+         appendStringInfoString(final, " user='");
+         for (p = username; *p; p++)
+         {
+             if (*p == '\'' || *p == '\\')
+                 appendStringInfoChar(final, '\\');
+             appendStringInfoChar(final, *p);
+         }
+         appendStringInfoChar(final, '\'');
+     }
+
+     /* check if already have it */
+     conn = NULL;
+     for (i = 0; i < cluster->conn_count; i++)
+     {
+         if (strcmp(cluster->conn_list[i].connstr, final->data) == 0)
+         {
+             conn = &cluster->conn_list[i];
+             break;
+         }
+     }
+
+     /* add new connection; the stored copy lives in cluster_mem */
+     if (conn == NULL)
+     {
+         conn = &cluster->conn_list[cluster->conn_count++];
+         conn->connstr = MemoryContextStrdup(cluster_mem, final->data);
+     }
+
+     /* the temporary buffer was allocated in the current context */
+     pfree(final->data);
+     pfree(final);
+
+     return conn;
+ }
+
+ /*
+  * Fetch cluster version.
+  * Called for each execution.
+  *
+  * Runs plproxy.get_cluster_version(dname) and returns its single,
+  * non-NULL integer result; anything else is reported as an error.
+  */
+ static int
+ get_version(ProxyFunction *func, Datum dname)
+ {
+     Datum       bin_val;
+     bool        isnull;
+     char        nulls[1];
+     int         err;
+
+     nulls[0] = (dname == (Datum) NULL) ? 'n' : ' ';
+
+     err = SPI_execute_plan(version_plan, &dname, nulls, false, 0);
+     if (err != SPI_OK_SELECT)
+         plproxy_error(func, "get_version: spi error: %s",
+                       SPI_result_code_string(err));
+     if (SPI_processed != 1)
+         plproxy_error(func, "get_version: got %d rows",
+                       (int) SPI_processed);
+
+     bin_val = SPI_getbinval(SPI_tuptable->vals[0],
+                             SPI_tuptable->tupdesc, 1, &isnull);
+     if (isnull)
+         plproxy_error(func, "get_version: got NULL?");
+
+     return DatumGetInt32(bin_val);
+ }
+
+ /*
+  * Fetch list of parts and rebuild the cluster's connection tables.
+  *
+  * Each row returned by plproxy.get_cluster_partitions(dname) gives one
+  * partition's connect string in its first (text) column, in partition
+  * order.  Equal connect strings share one ProxyConnection.
+  */
+ static int
+ reload_parts(ProxyCluster *cluster, Datum dname, ProxyFunction *func)
+ {
+     int         err,
+                 i;
+     ProxyConnection *conn;
+     char       *connstr;
+     MemoryContext old_ctx;
+     TupleDesc   desc;
+     HeapTuple   row;
+     bool        isnull;
+
+     /* run query */
+     err = SPI_execute_plan(partlist_plan, &dname, NULL, false, 0);
+     if (err != SPI_OK_SELECT)
+         plproxy_error(func, "get_partlist: spi error");
+     if (!check_valid_partcount(SPI_processed))
+         plproxy_error(func, "get_partlist: invalid part count");
+
+     /* check column types */
+     desc = SPI_tuptable->tupdesc;
+     if (desc->natts < 1)
+         plproxy_error(func, "Partition config must have at least 1 columns");
+     if (SPI_gettypeid(desc, 1) != TEXTOID)
+         plproxy_error(func, "partition column 1 must be text");
+
+     /*
+      * Validate every row before dropping the old configuration, so that a
+      * bad row does not leave the cluster with a half-filled partition map.
+      */
+     for (i = 0; i < SPI_processed; i++)
+     {
+         (void) SPI_getbinval(SPI_tuptable->vals[i], desc, 1, &isnull);
+         if (isnull)
+             plproxy_error(func, "connstr must not be NULL");
+     }
+
+     /* free old one */
+     if (cluster->conn_list)
+         free_connlist(cluster);
+
+     cluster->part_count = SPI_processed;
+     cluster->part_mask = cluster->part_count - 1;
+
+     /* allocate lists */
+     old_ctx = MemoryContextSwitchTo(cluster_mem);
+     cluster->part_map = palloc0(SPI_processed * sizeof(ProxyConnection *));
+     cluster->conn_list = palloc0(SPI_processed * sizeof(ProxyConnection));
+     MemoryContextSwitchTo(old_ctx);
+
+     /* fill values */
+     for (i = 0; i < SPI_processed; i++)
+     {
+         row = SPI_tuptable->vals[i];
+
+         connstr = SPI_getvalue(row, desc, 1);
+         if (connstr == NULL)
+             plproxy_error(func, "connstr must not be NULL");
+
+         conn = add_connection(cluster, connstr);
+         cluster->part_map[i] = conn;
+     }
+
+     return 0;
+ }
+
+ /* allocate new cluster */
+ static ProxyCluster *
+ new_cluster(const char *name)
+ {
+     ProxyCluster *cluster;
+     MemoryContext old_ctx;
+
+     /* the struct and its name must outlive the current call */
+     old_ctx = MemoryContextSwitchTo(cluster_mem);
+
+     cluster = palloc0(sizeof(*cluster));
+     cluster->name = pstrdup(name);
+
+     MemoryContextSwitchTo(old_ctx);
+
+     return cluster;
+ }
+
+ /*
+  * Get cached or create new fake cluster.
+  *
+  * A CONNECT function gets a one-partition cluster whose name and only
+  * connect string are func->connect_str.  Fake clusters are kept in
+  * fake_cluster_list and created with a fixed version of 1.
+  */
+ static ProxyCluster *
+ fake_cluster(ProxyFunction *func)
+ {
+     ProxyCluster *cluster;
+     ProxyConnection *conn;
+     MemoryContext old_ctx;
+
+     /* search if cached */
+     for (cluster = fake_cluster_list; cluster; cluster = cluster->next)
+     {
+         if (strcmp(cluster->name, func->connect_str) == 0)
+             return cluster;
+     }
+
+     /* create if not; new_cluster() zero-fills the struct in cluster_mem */
+     cluster = new_cluster(func->connect_str);
+     cluster->version = 1;
+     cluster->part_count = 1;
+     cluster->part_mask = 0;
+     cluster->conn_count = 1;
+
+     old_ctx = MemoryContextSwitchTo(cluster_mem);
+
+     cluster->part_map = palloc(sizeof(ProxyConnection *));
+     cluster->conn_list = palloc0(sizeof(ProxyConnection));
+     conn = &cluster->conn_list[0];
+     cluster->part_map[0] = conn;
+
+     conn->connstr = pstrdup(cluster->name);
+     conn->state = C_NONE;
+
+     MemoryContextSwitchTo(old_ctx);
+
+     cluster->next = fake_cluster_list;
+     fake_cluster_list = cluster;
+
+     return cluster;
+ }
+
+ /*
+  * Call resolve function
+  *
+  * Executes func->cluster_sql through plproxy_query_exec() (with the
+  * call's fcinfo) and returns the single non-NULL text value it produced,
+  * which is used as the cluster name.
+  */
+ static const char *
+ cluster_resolve_name(ProxyFunction *func, FunctionCallInfo fcinfo)
+ {
+     const char *name;
+     HeapTuple   row;
+     TupleDesc   desc;
+
+     plproxy_query_exec(func, fcinfo, func->cluster_sql);
+
+     if (SPI_processed != 1)
+         plproxy_error(func, "'%s' returned %d rows, expected 1",
+                       func->cluster_sql->sql, (int) SPI_processed);
+
+     desc = SPI_tuptable->tupdesc;
+     if (SPI_gettypeid(desc, 1) != TEXTOID)
+         plproxy_error(func, "expected text");
+
+     row = SPI_tuptable->vals[0];
+     name = SPI_getvalue(row, desc, 1);
+     if (name == NULL)
+         plproxy_error(func, "Cluster name map func returned NULL");
+
+     return name;
+ }
+
+ /*
+  * Find cached cluster or create new one.
+  *
+  * CONNECT functions get a fake cluster (see fake_cluster()).  Otherwise
+  * the cluster name is func->cluster_name, or the result of running
+  * func->cluster_sql when that is set (fcinfo is only needed for that).
+  * func is also used for error reporting.
+  *
+  * The partition list is (re)loaded when plproxy.get_cluster_version()
+  * returns a value different from the cached one, or when no partition
+  * list has been loaded for the cluster yet.
+  */
+ ProxyCluster *
+ plproxy_find_cluster(ProxyFunction *func, FunctionCallInfo fcinfo)
+ {
+     ProxyCluster *cluster;
+     int         cur_version;
+     const char *name;
+     Datum       dname;
+
+     /* functions used CONNECT */
+     if (func->connect_str)
+         return fake_cluster(func);
+
+     /* initialize plans on demand only */
+     plproxy_cluster_plan_init();
+
+     if (func->cluster_sql)
+         name = cluster_resolve_name(func, fcinfo);
+     else
+         name = func->cluster_name;
+
+     /* create Datum for name */
+     dname = DirectFunctionCall1(textin, CStringGetDatum(name));
+
+     /* fetch serial, also check if exists */
+     cur_version = get_version(func, dname);
+
+     /* search if cached */
+     for (cluster = cluster_list; cluster; cluster = cluster->next)
+     {
+         if (strcmp(cluster->name, name) == 0)
+             break;
+     }
+
+     /* create if not */
+     if (!cluster)
+     {
+         cluster = new_cluster(name);
+         cluster->next = cluster_list;
+         cluster_list = cluster;
+     }
+
+     /*
+      * Update if needed.  A freshly created cluster is zero-filled, i.e.
+      * version 0 with no part_map; without the part_map check a config
+      * function returning version 0 would never get its partitions loaded
+      * and later lookups would dereference a NULL part_map.
+      */
+     if (cluster->part_map == NULL || cur_version != cluster->version)
+     {
+         reload_parts(cluster, dname, func);
+         cluster->version = cur_version;
+     }
+
+     return cluster;
+ }
+
+ /*
+  * Per-cluster maintenance: clear leftover results, and close connections
+  * that are not CONNECTION_OK or - when PLPROXY_CONN_LIFETIME > 0 - were
+  * opened at least that many seconds before "now".
+  */
+ static void
+ clean_cluster(ProxyCluster *cluster, struct timeval * now)
+ {
+     ProxyConnection *conn;
+     time_t      age;
+     int         i;
+     bool        drop;
+
+     for (i = 0; i < cluster->conn_count; i++)
+     {
+         conn = &cluster->conn_list[i];
+         if (conn->res)
+         {
+             PQclear(conn->res);
+             conn->res = NULL;
+         }
+         if (!conn->db)
+             continue;
+
+         drop = false;
+         if (PQstatus(conn->db) != CONNECTION_OK)
+         {
+             drop = true;
+         }
+         else if (PLPROXY_CONN_LIFETIME > 0)
+         {
+             age = now->tv_sec - conn->connect_time;
+             if (age >= PLPROXY_CONN_LIFETIME)
+                 drop = true;
+         }
+
+         if (drop)
+         {
+             PQfinish(conn->db);
+             conn->db = NULL;
+             conn->state = C_NONE;
+         }
+     }
+ }
+
+ /*
+  * Clean old connections and results from all clusters.
+  */
+ void
+ plproxy_cluster_maint(struct timeval * now)
+ {
+     ProxyCluster *cluster;
+
+     /* both regular and CONNECT (fake) clusters hold connections */
+     for (cluster = cluster_list; cluster; cluster = cluster->next)
+         clean_cluster(cluster, now);
+     for (cluster = fake_cluster_list; cluster; cluster = cluster->next)
+         clean_cluster(cluster, now);
+ }
*** a/src/pl/plproxy/execute.c
--- b/src/pl/plproxy/execute.c
***************
*** 0 ****
--- 1,724 ----
+ /*
+  * PL/Proxy - easy access to partitioned database.
+  *
+  * Copyright (c) 2006 Sven Suursoho, Skype Technologies OÜ
+  * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+  *
+  * Permission to use, copy, modify, and distribute this software for any
+  * purpose with or without fee is hereby granted, provided that the above
+  * copyright notice and this permission notice appear in all copies.
+  *
+  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+  */
+
+ /*
+  * Actual execution logic is here.
+  *
+  * - Tag particular databases, where query must be sent.
+  * - Send the query.
+  * - Fetch the results.
+  */
+
+ #include "plproxy.h"
+
+ #include <sys/time.h>
+
+ #include "poll_compat.h"
+
+ /* some error happened: report desc together with libpq's error message */
+ static void
+ conn_error(ProxyFunction *func, ProxyConnection *conn, const char *desc)
+ {
+     plproxy_error(func, "%s: %s",
+                   desc, PQerrorMessage(conn->db));
+ }
+
+ /*
+  * Compare if major/minor match.  Works on "MAJ.MIN.*"
+  *
+  * Returns true when both strings are equal up to the second dot, or
+  * when one string ends exactly where the other has its second dot
+  * (e.g. "8.3" vs "8.3.1").
+  */
+ static bool
+ cmp_branch(const char *this, const char *that)
+ {
+     int         dot = 0;
+     int         i;
+
+     for (i = 0; this[i] || that[i]; i++)
+     {
+         /* allow just maj.min version */
+         if (dot && this[i] == '.' && !that[i])
+             return true;
+         if (dot && that[i] == '.' && !this[i])
+             return true;
+
+         /* compare, different length is also handled here */
+         if (this[i] != that[i])
+             return false;
+
+         /* stop on second dot */
+         if (this[i] == '.' && dot++)
+             return true;
+     }
+     return true;
+ }
+
+ /*
+  * Push buffered query data to the server.  PQflush() > 0 means data is
+  * still pending (C_QUERY_WRITE); 0 means all sent, so wait for the
+  * result (C_QUERY_READ); < 0 is an error.
+  */
+ static void
+ flush_connection(ProxyFunction *func, ProxyConnection *conn)
+ {
+     int         res;
+
+     /* flush it down */
+     res = PQflush(conn->db);
+
+     /* set actual state */
+     if (res > 0)
+         conn->state = C_QUERY_WRITE;
+     else if (res == 0)
+         conn->state = C_QUERY_READ;
+     else
+         conn_error(func, conn, "PQflush");
+ }
+
+ /*
+  * Small sanity checking for new connections.
+  *
+  * - Records in conn->same_ver whether the remote server reports the same
+  *   major/minor version as this backend.
+  * - If the remote client_encoding differs from ours, sends a SET query
+  *   to sync it and marks the connection as tuning.  If the encoding still
+  *   differs on the next call, that is reported as an error.
+  *
+  * NOTE(review): standard_conforming_strings is not compared here.
+  *
+  * Returns 1 if a tuning query was sent, 0 if the connection is usable.
+  */
+ static int
+ tune_connection(ProxyFunction *func, ProxyConnection *conn)
+ {
+     const char *this_enc,
+                *dst_enc;
+     const char *dst_ver;
+     StringInfo  sql = NULL;
+
+     /*
+      * check if target server has same backend version.  libpq returns
+      * NULL when the parameter is unknown; treat that as "different".
+      */
+     dst_ver = PQparameterStatus(conn->db, "server_version");
+     conn->same_ver = (dst_ver != NULL) && cmp_branch(dst_ver, PG_VERSION);
+
+     /*
+      * sync client_encoding
+      */
+     this_enc = pg_get_client_encoding_name();
+     dst_enc = PQparameterStatus(conn->db, "client_encoding");
+     if (dst_enc && strcmp(this_enc, dst_enc))
+     {
+         if (!sql)
+             sql = makeStringInfo();
+         appendStringInfo(sql, "set client_encoding = '%s'; ", this_enc);
+     }
+
+     /*
+      * if second time in this function, they should be active already.
+      */
+     if (sql && conn->tuning)
+     {
+         /* display SET query */
+         appendStringInfo(sql, "-- does not seem to apply");
+         conn_error(func, conn, sql->data);
+     }
+
+     /*
+      * send tuning query
+      */
+     if (sql)
+     {
+         conn->tuning = 1;
+         conn->state = C_QUERY_WRITE;
+         if (!PQsendQuery(conn->db, sql->data))
+             conn_error(func, conn, "PQsendQuery");
+         pfree(sql->data);
+         pfree(sql);
+
+         flush_connection(func, conn);
+         return 1;
+     }
+
+     conn->tuning = 0;
+     return 0;
+ }
+
+ /*
+  * Send the function's remote query on one connection.
+  *
+  * If tune_connection() had to send a tuning query first, returns without
+  * sending; the query is sent once the connection is ready again.  Binary
+  * result format is requested only when PLPROXY_USE_BINARY is set and the
+  * remote server has the same major/minor version.
+  */
+ static void
+ send_query(ProxyFunction *func, ProxyConnection *conn,
+            const char **values, int *plengths, int *pformats)
+ {
+     int         res;
+     struct timeval now;
+     ProxyQuery *q = func->remote_sql;
+     int         binary_result = 0;
+
+     gettimeofday(&now, NULL);
+     conn->query_time = now.tv_sec;
+
+     tune_connection(func, conn);
+     if (conn->tuning)
+         return;
+
+     /* use binary result only on same backend ver */
+     if (PLPROXY_USE_BINARY && conn->same_ver)
+     {
+         /* binary recv for non-record types */
+         if (func->ret_scalar)
+         {
+             if (func->ret_scalar->has_recv)
+                 binary_result = 1;
+         }
+         else
+         {
+             if (func->ret_composite->use_binary)
+                 binary_result = 1;
+         }
+     }
+
+     /* send query */
+     conn->state = C_QUERY_WRITE;
+     res = PQsendQueryParams(conn->db, q->sql, q->arg_count,
+                             NULL,       /* paramTypes */
+                             values,     /* paramValues */
+                             plengths,   /* paramLengths */
+                             pformats,   /* paramFormats */
+                             binary_result);     /* resultformat, 0-text, 1-bin */
+     if (!res)
+         conn_error(func, conn, "PQsendQueryParams");
+
+     /* flush it down */
+     flush_connection(func, conn);
+ }
+
+ /*
+  * Decide whether a previously used connection can be reused.
+  *
+  * Returns false if conn should be dropped: its status is not
+  * CONNECTION_OK, it is older than PLPROXY_CONN_LIFETIME (when that is
+  * > 0), or - after being idle at least PLPROXY_IDLE_CONN_CHECK seconds -
+  * it has input pending although no query is running.
+  */
+ static bool
+ check_old_conn(ProxyFunction *func, ProxyConnection *conn, struct timeval * now)
+ {
+     time_t      t;
+     int         res;
+     struct pollfd pfd;
+
+     if (PQstatus(conn->db) != CONNECTION_OK)
+         return false;
+
+     /* check if too old */
+     if (PLPROXY_CONN_LIFETIME > 0)
+     {
+         t = now->tv_sec - conn->connect_time;
+         if (t >= PLPROXY_CONN_LIFETIME)
+             return false;
+     }
+
+     /* how long has it been idle */
+     t = now->tv_sec - conn->query_time;
+     if (t < PLPROXY_IDLE_CONN_CHECK)
+         return true;
+
+     /*
+      * Simple way to check if old connection is stable - look if there
+      * are events pending.  If there are drop the connection.
+      */
+ intr_loop:
+     pfd.fd = PQsocket(conn->db);
+     pfd.events = POLLIN;
+     pfd.revents = 0;
+
+     res = poll(&pfd, 1, 0);
+     if (res > 0)
+     {
+         elog(WARNING, "PL/Proxy: detected unstable connection");
+         return false;
+     }
+     else if (res < 0)
+     {
+         if (errno == EINTR)
+             goto intr_loop;
+         plproxy_error(func, "check_old_conn: select failed: %s",
+                       strerror(errno));
+     }
+
+     /* seems ok */
+     return true;
+ }
+
+ /*
+  * check existing conn status or launch new conn
+  *
+  * A C_DONE/C_READY connection is reused if check_old_conn() accepts it.
+  * Any other open connection (including one left mid-connect or
+  * mid-query) is closed, and a new non-blocking connect is started.
+  */
+ static void
+ prepare_conn(ProxyFunction *func, ProxyConnection *conn)
+ {
+     struct timeval now;
+
+     gettimeofday(&now, NULL);
+
+     /* state should be C_READY or C_NONE */
+     switch (conn->state)
+     {
+         case C_DONE:
+             conn->state = C_READY;
+             /* fallthrough */
+         case C_READY:
+             if (check_old_conn(func, conn, &now))
+                 return;
+             /* fallthrough: connection is not reusable, drop it */
+         case C_CONNECT_READ:
+         case C_CONNECT_WRITE:
+         case C_QUERY_READ:
+         case C_QUERY_WRITE:
+             /* close rotten connection */
+             elog(NOTICE, "PL/Proxy: dropping stale conn");
+             PQfinish(conn->db);
+             conn->db = NULL;
+             conn->state = C_NONE;
+             /* fallthrough */
+         case C_NONE:
+             break;
+     }
+
+     conn->connect_time = now.tv_sec;
+
+     /* launch new connection */
+     conn->db = PQconnectStart(conn->connstr);
+     if (conn->db == NULL)
+         plproxy_error(func, "No memory for PGconn");
+
+     /* tag connection dirty */
+     conn->state = C_CONNECT_WRITE;
+
+     if (PQstatus(conn->db) == CONNECTION_BAD)
+         conn_error(func, conn, "PQconnectStart");
+ }
+
+ /*
+  * Connection has a resultset available, fetch it.
+  *
+  * Returns true if there may be more results coming,
+  * false if all done.
+  */
+ static bool
+ another_result(ProxyFunction *func, ProxyConnection *conn)
+ {
+     PGresult   *res;
+
+     /* got one */
+     res = PQgetResult(conn->db);
+     if (res == NULL)
+     {
+         /* end of results: after a tuning query the conn is ready again */
+         if (conn->tuning)
+             conn->state = C_READY;
+         else
+             conn->state = C_DONE;
+         return false;
+     }
+
+     switch (PQresultStatus(res))
+     {
+         case PGRES_TUPLES_OK:
+             if (conn->res)
+             {
+                 /* PGresult is allocated by libpq, release it before erroring */
+                 PQclear(res);
+                 conn_error(func, conn, "double result?");
+                 break;
+             }
+             conn->res = res;
+             break;
+         case PGRES_COMMAND_OK:
+             PQclear(res);
+             break;
+         default:
+             PQclear(res);
+             conn_error(func, conn, "remote error");
+     }
+     return true;
+ }
+
+ /*
+  * Called when poll() reported that conn is available for reading/writing.
+  *
+  * It should call postgres handlers and then change state if needed.
+  */
+ static void
+ handle_conn(ProxyFunction *func, ProxyConnection *conn)
+ {
+     int         res;
+     PostgresPollingStatusType poll_res;
+
+     switch (conn->state)
+     {
+         case C_CONNECT_READ:
+         case C_CONNECT_WRITE:
+             poll_res = PQconnectPoll(conn->db);
+             switch (poll_res)
+             {
+                 case PGRES_POLLING_WRITING:
+                     conn->state = C_CONNECT_WRITE;
+                     break;
+                 case PGRES_POLLING_READING:
+                     conn->state = C_CONNECT_READ;
+                     break;
+                 case PGRES_POLLING_OK:
+                     conn->state = C_READY;
+                     break;
+                 case PGRES_POLLING_ACTIVE:
+                 case PGRES_POLLING_FAILED:
+                     conn_error(func, conn, "PQconnectPoll");
+             }
+             break;
+         case C_QUERY_WRITE:
+             flush_connection(func, conn);
+             break;
+         case C_QUERY_READ:
+             res = PQconsumeInput(conn->db);
+             if (res == 0)
+                 conn_error(func, conn, "PQconsumeInput");
+
+             /* loop until PQgetResult returns NULL */
+             while (1)
+             {
+                 /* if PQisBusy, then incomplete result */
+                 if (PQisBusy(conn->db))
+                     break;
+
+                 /* got one */
+                 if (!another_result(func, conn))
+                     break;
+             }
+             break;
+         case C_NONE:
+         case C_DONE:
+         case C_READY:
+             break;
+     }
+ }
+
+ /*
+  * Check if tagged connections have interesting events.
+  *
+  * Currently uses poll(), which should be enough
+  * for a small number of sockets.
+  *
+  * Returns 0 when nothing happened (poll() timeout or EINTR), so the
+  * caller should just wait again; returns 1 when connection states
+  * should be rechecked.
+  */
+ static int
+ poll_conns(ProxyFunction *func, ProxyCluster *cluster)
+ {
+ 	static struct pollfd *pfd_cache = NULL;
+ 	static int	pfd_allocated = 0;
+ 
+ 	int			i,
+ 				res,
+ 				fd;
+ 	ProxyConnection *conn;
+ 	struct pollfd *pf;
+ 	int			numfds = 0;
+ 	int			ev = 0;
+ 
+ 	/* grow the process-lifetime pollfd array (at least 64 slots) */
+ 	if (pfd_allocated < cluster->conn_count)
+ 	{
+ 		struct pollfd *tmp;
+ 		int			num = cluster->conn_count;
+ 
+ 		if (num < 64)
+ 			num = 64;
+ 		if (pfd_cache == NULL)
+ 			tmp = malloc(num * sizeof(struct pollfd));
+ 		else
+ 			tmp = realloc(pfd_cache, num * sizeof(struct pollfd));
+ 		if (!tmp)
+ 			elog(ERROR, "no mem for pollfd cache");
+ 		pfd_cache = tmp;
+ 		pfd_allocated = num;
+ 	}
+ 
+ 	for (i = 0; i < cluster->conn_count; i++)
+ 	{
+ 		conn = &cluster->conn_list[i];
+ 		if (!conn->run_on)
+ 			continue;
+ 
+ 		/* decide what to do */
+ 		switch (conn->state)
+ 		{
+ 			case C_DONE:
+ 			case C_READY:
+ 			case C_NONE:
+ 				continue;
+ 			case C_CONNECT_READ:
+ 			case C_QUERY_READ:
+ 				ev = POLLIN;
+ 				break;
+ 			case C_CONNECT_WRITE:
+ 			case C_QUERY_WRITE:
+ 				ev = POLLOUT;
+ 				break;
+ 		}
+ 
+ 		/* add fd to proper set */
+ 		pf = pfd_cache + numfds++;
+ 		pf->fd = PQsocket(conn->db);
+ 		pf->events = ev;
+ 		pf->revents = 0;
+ 	}
+ 
+ 	/*
+ 	 * Nothing to wait for (e.g. no partition was tagged).  poll() with no
+ 	 * fds would just sleep and return 0, and the caller would then never
+ 	 * recheck the states and loop forever; ask it to recheck instead.
+ 	 */
+ 	if (numfds == 0)
+ 		return 1;
+ 
+ 	/* wait for events */
+ 	res = poll(pfd_cache, numfds, 1000);
+ 	if (res == 0)
+ 		return 0;
+ 	if (res < 0)
+ 	{
+ 		if (errno == EINTR)
+ 			return 0;
+ 		plproxy_error(func, "poll() failed: %s", strerror(errno));
+ 	}
+ 
+ 	/* now recheck the conns */
+ 	pf = pfd_cache;
+ 	for (i = 0; i < cluster->conn_count; i++)
+ 	{
+ 		conn = &cluster->conn_list[i];
+ 		if (!conn->run_on)
+ 			continue;
+ 
+ 		switch (conn->state)
+ 		{
+ 			case C_DONE:
+ 			case C_READY:
+ 			case C_NONE:
+ 				continue;
+ 			case C_CONNECT_READ:
+ 			case C_QUERY_READ:
+ 			case C_CONNECT_WRITE:
+ 			case C_QUERY_WRITE:
+ 				break;
+ 		}
+ 
+ 		/*
+ 		 * they should be in same order as called,
+ 		 */
+ 		fd = PQsocket(conn->db);
+ 		if (pf->fd != fd)
+ 			elog(WARNING, "fd order from poll() is messed up?");
+ 
+ 		if (pf->revents)
+ 			handle_conn(func, conn);
+ 
+ 		pf++;
+ 	}
+ 	return 1;
+ }
+ 
+ /*
+  * Run the query on all tagged connections in parallel.
+  *
+  * Prepares every tagged connection, sends the query where possible, waits
+  * until all of them reach C_DONE, then validates the results and adds
+  * their row counts to cluster->ret_total.
+  */
+ static void
+ remote_execute(ProxyFunction *func,
+ 			   const char **values, int *plengths, int *pformats)
+ {
+ 	ExecStatusType err;
+ 	ProxyConnection *conn;
+ 	ProxyCluster *cluster = func->cur_cluster;
+ 	int			i,
+ 				pending;
+ 
+ 	/* either launch connection or send query */
+ 	for (i = 0; i < cluster->conn_count; i++)
+ 	{
+ 		conn = &cluster->conn_list[i];
+ 		if (!conn->run_on)
+ 			continue;
+ 
+ 		/* check if conn is alive, and launch if not */
+ 		prepare_conn(func, conn);
+ 
+ 		/* if conn is ready, then send query away */
+ 		if (conn->state == C_READY)
+ 			send_query(func, conn, values, plengths, pformats);
+ 	}
+ 
+ 	/* now loop until all results are arrived */
+ 	pending = 1;
+ 	while (pending)
+ 	{
+ 		/* allow postgres to cancel processing */
+ 		CHECK_FOR_INTERRUPTS();
+ 
+ 		/* wait for events; 0 means nothing changed */
+ 		if (poll_conns(func, cluster) == 0)
+ 			continue;
+ 
+ 		/* recheck */
+ 		pending = 0;
+ 		for (i = 0; i < cluster->conn_count; i++)
+ 		{
+ 			conn = &cluster->conn_list[i];
+ 			if (!conn->run_on)
+ 				continue;
+ 
+ 			/* login finished, send query */
+ 			if (conn->state == C_READY)
+ 				send_query(func, conn, values, plengths, pformats);
+ 
+ 			if (conn->state != C_DONE)
+ 				pending++;
+ 		}
+ 	}
+ 
+ 	/* review results, calculate total */
+ 	for (i = 0; i < cluster->conn_count; i++)
+ 	{
+ 		conn = &cluster->conn_list[i];
+ 
+ 		/* a tagged connection must have a result, an untagged one must not */
+ 		if ((conn->run_on || conn->res)
+ 			&& !(conn->run_on && conn->res))
+ 			plproxy_error(func, "run_on does not match res");
+ 
+ 		if (!conn->run_on)
+ 			continue;
+ 
+ 		if (conn->state != C_DONE)
+ 			plproxy_error(func, "Unfinished connection");
+ 		if (conn->res == NULL)
+ 			plproxy_error(func, "Lost result");
+ 
+ 		err = PQresultStatus(conn->res);
+ 		if (err != PGRES_TUPLES_OK)
+ 			plproxy_error(func, "Remote error: %s",
+ 						  PQresultErrorMessage(conn->res));
+ 
+ 		cluster->ret_total += PQntuples(conn->res);
+ 	}
+ }
+ 
+ /*
+  * Run hash function and tag connections.
+  *
+  * Every row returned by the hash query tags one partition
+  * (hash value & part_mask).  A non-set-returning function must get
+  * exactly one row.
+  */
+ static void
+ tag_hash_partitions(ProxyFunction *func, FunctionCallInfo fcinfo)
+ {
+ 	int			i;
+ 	TupleDesc	desc;
+ 	Oid			htype;
+ 	ProxyCluster *cluster = func->cur_cluster;
+ 
+ 	/* execute cached plan */
+ 	plproxy_query_exec(func, fcinfo, func->hash_sql);
+ 
+ 	/* get header */
+ 	desc = SPI_tuptable->tupdesc;
+ 	htype = SPI_gettypeid(desc, 1);
+ 
+ 	/* tag connections */
+ 	for (i = 0; i < SPI_processed; i++)
+ 	{
+ 		bool		isnull;
+ 		uint32		hashval = 0;
+ 		HeapTuple	row = SPI_tuptable->vals[i];
+ 		Datum		val = SPI_getbinval(row, desc, 1, &isnull);
+ 
+ 		if (isnull)
+ 			plproxy_error(func, "Hash function returned NULL");
+ 
+ 		if (htype == INT4OID)
+ 			hashval = DatumGetInt32(val);
+ 		else if (htype == INT8OID)
+ 			hashval = DatumGetInt64(val);
+ 		else if (htype == INT2OID)
+ 			hashval = DatumGetInt16(val);
+ 		else
+ 			plproxy_error(func, "Hash result must be int2, int4 or int8");
+ 
+ 		/*
+ 		 * NOTE(review): indexing part_map this way presumes part_mask is
+ 		 * part_count - 1 (power-of-two partition count) -- confirm where
+ 		 * part_mask is set.
+ 		 */
+ 		hashval &= cluster->part_mask;
+ 		cluster->part_map[hashval]->run_on = 1;
+ 	}
+ 
+ 	/* sanity check */
+ 	if (SPI_processed != 1 && !fcinfo->flinfo->fn_retset)
+ 		plproxy_error(func, "Only set-returning function"
+ 					  " allows hashcount <> 1");
+ }
+ 
+ /*
+  * Clean old results and prepare for new one.
+  *
+  * Frees every stored PGresult and clears the per-connection position and
+  * run_on tag.  Connection state is left alone; prepare_conn checks it.
+  */
+ void
+ plproxy_clean_results(ProxyCluster *cluster)
+ {
+ 	int			i;
+ 	ProxyConnection *conn;
+ 
+ 	if (!cluster)
+ 		return;
+ 
+ 	cluster->ret_total = 0;
+ 	cluster->ret_cur_conn = 0;
+ 
+ 	for (i = 0; i < cluster->conn_count; i++)
+ 	{
+ 		conn = &cluster->conn_list[i];
+ 		if (conn->res)
+ 		{
+ 			PQclear(conn->res);
+ 			conn->res = NULL;
+ 		}
+ 		conn->pos = 0;
+ 		conn->run_on = 0;
+ 	}
+ 	/* conn state checks are done in prepare_conn */
+ }
+ 
+ /*
+  * Select partitions and execute query on them.
+  *
+  * Tags partitions according to func->run_type, converts the arguments
+  * referenced by the remote query into libpq parameters and runs the query.
+  */
+ void
+ plproxy_exec(ProxyFunction *func, FunctionCallInfo fcinfo)
+ {
+ 	const char *values[FUNC_MAX_ARGS];
+ 	int			plengths[FUNC_MAX_ARGS];
+ 	int			pformats[FUNC_MAX_ARGS];
+ 	int			i;
+ 	int			gotbin;
+ 	ProxyCluster *cluster = func->cur_cluster;
+ 
+ 	/* clean old results */
+ 	plproxy_clean_results(cluster);
+ 
+ 	/* tag interesting partitions */
+ 	switch (func->run_type)
+ 	{
+ 		case R_HASH:
+ 			tag_hash_partitions(func, fcinfo);
+ 			break;
+ 		case R_ALL:
+ 			for (i = 0; i < cluster->part_count; i++)
+ 				cluster->part_map[i]->run_on = 1;
+ 			break;
+ 		case R_EXACT:
+ 			i = func->exact_nr;
+ 			if (i < 0 || i >= cluster->part_count)
+ 				plproxy_error(func, "part number out of range");
+ 			cluster->part_map[i]->run_on = 1;
+ 			break;
+ 		case R_ANY:
+ 			i = random() & cluster->part_mask;
+ 			cluster->part_map[i]->run_on = 1;
+ 			break;
+ 		default:
+ 			plproxy_error(func, "uninitialized run_type");
+ 	}
+ 
+ 	/* prepare args */
+ 	gotbin = 0;
+ 	for (i = 0; i < func->remote_sql->arg_count; i++)
+ 	{
+ 		/*
+ 		 * Query parameter i is bound to function argument idx; both the
+ 		 * NULL check and the value must use idx.
+ 		 */
+ 		int			idx = func->remote_sql->arg_lookup[i];
+ 
+ 		plengths[i] = 0;
+ 		pformats[i] = 0;
+ 		if (PG_ARGISNULL(idx))
+ 		{
+ 			values[i] = NULL;
+ 		}
+ 		else
+ 		{
+ 			bool		bin = PLPROXY_USE_BINARY;
+ 
+ 			values[i] = plproxy_send_type(func->arg_types[idx],
+ 										  PG_GETARG_DATUM(idx),
+ 										  bin,
+ 										  &plengths[i],
+ 										  &pformats[i]);
+ 
+ 			if (pformats[i])
+ 				gotbin = 1;
+ 		}
+ 	}
+ 
+ 	/* only pass length/format arrays when some parameter is binary */
+ 	if (gotbin)
+ 		remote_execute(func, values, plengths, pformats);
+ 	else
+ 		remote_execute(func, values, NULL, NULL);
+ }
*** a/src/pl/plproxy/expected/plproxy_clustermap.out
--- b/src/pl/plproxy/expected/plproxy_clustermap.out
***************
*** 0 ****
--- 1,71 ----
+ create or replace function plproxy.get_cluster_version(cluster_name text) + returns integer as $$ + begin + if cluster_name = 'testcluster' then + return 6; + elsif cluster_name = 'map0' then + return 1; + elsif cluster_name = 'map1' then + return 1; + elsif cluster_name = 'map2' then + return 1; + elsif cluster_name = 'map3' then + return 1; + end if; + raise exception 'no such cluster: %', cluster_name; + end; $$ language plpgsql; + create or replace function plproxy.get_cluster_partitions(cluster_name text) + returns setof text as $$ + begin + if cluster_name = 'testcluster' then + return next 'host=127.0.0.1 dbname=test_part0'; + return next 'host=127.0.0.1 dbname=test_part1'; + return next 'host=127.0.0.1 dbname=test_part2'; + return next 'host=127.0.0.1 dbname=test_part3'; + elsif cluster_name = 'map0' then + return next 'host=127.0.0.1 dbname=test_part0'; + elsif
cluster_name = 'map1' then + return next 'host=127.0.0.1 dbname=test_part1'; + elsif cluster_name = 'map2' then + return next 'host=127.0.0.1 dbname=test_part2'; + elsif cluster_name = 'map3' then + return next 'host=127.0.0.1 dbname=test_part3'; + else + raise exception 'no such cluster: %', cluster_name; + end if; + return; + end; $$ language plpgsql; + create function map_cluster(part integer) returns text as $$ + begin + return 'map' || part; + end; + $$ language plpgsql; + create function test_clustermap(part integer) returns setof text as $$ + cluster map_cluster(part); + run on 0; + select current_database(); + $$ language plproxy; + select * from test_clustermap(0); + test_clustermap + ----------------- + test_part0 + (1 row) + + select * from test_clustermap(1); + test_clustermap + ----------------- + test_part1 + (1 row) + + select * from test_clustermap(2); + test_clustermap + ----------------- + test_part2 + (1 row) + + select * from test_clustermap(3); + test_clustermap + ----------------- + test_part3 + (1 row) + *** a/src/pl/plproxy/expected/plproxy_dynamic_record.out --- b/src/pl/plproxy/expected/plproxy_dynamic_record.out *************** *** 0 **** --- 1,51 ---- + -- dynamic query support testing + create or replace function dynamic_query(q text) + returns setof record as $x$ + cluster 'map0'; + run on all; + $x$ language plproxy; + \c test_part0 + create or replace function dynamic_query(q text) + returns setof record as $x$ + declare + ret record; + begin + for ret in execute q loop + return next ret; + end loop; + return; + end; + $x$ language plpgsql; + create table dynamic_query_test ( + id integer, + username text, + other text + ); + insert into dynamic_query_test values ( 1, 'user1', 'blah'); + insert into dynamic_query_test values ( 2, 'user2', 'foo'); + \c regression + select * from dynamic_query('select * from dynamic_query_test') as (id integer, username text, other text); + id | username | other + ----+----------+------- + 1 | user1 | 
blah + 2 | user2 | foo + (2 rows) + + select * from dynamic_query('select id, username from dynamic_query_test') as foo(id integer, username text); + id | username + ----+---------- + 1 | user1 + 2 | user2 + (2 rows) + + -- test errors + select * from dynamic_query('select * from dynamic_query_test'); + ERROR: a column definition list is required for functions returning "record" + create or replace function dynamic_query_select() + returns setof record as $x$ + cluster 'map0'; + run on all; + select id, username from dynamic_query_test; + $x$ language plproxy; + select * from dynamic_query_select() as (id integer, username text); + ERROR: PL/Proxy function public.dynamic_query_select(0): SELECT statement not allowed for dynamic RECORD functions *** a/src/pl/plproxy/expected/plproxy_errors.out --- b/src/pl/plproxy/expected/plproxy_errors.out *************** *** 0 **** --- 1,66 ---- + -- test bad arg + create function test_err1(dat text) + returns text as $$ + cluster 'testcluster'; + run on hashtext(username); + $$ language plproxy; + select * from test_err1('dat'); + ERROR: column "username" does not exist + LINE 1: select * from hashtext(username) + ^ + QUERY: select * from hashtext(username) + create function test_err2(dat text) + returns text as $$ + cluster 'testcluster'; + run on hashtext($2); + $$ language plproxy; + select * from test_err2('dat'); + ERROR: PL/Proxy function public.test_err2(1): Compile error at line 3: invalid argument reference: $2 + create function test_err3(dat text) + returns text as $$ + cluster 'nonexists'; + run on hashtext($1); + $$ language plproxy; + select * from test_err3('dat'); + ERROR: no such cluster: nonexists + CONTEXT: SQL statement "select * from plproxy.get_cluster_version($1)" + -- should work + create function test_err_none(dat text) + returns text as $$ + cluster 'testcluster'; + run on hashtext($1); + select 'ok'; + $$ language plproxy; + select * from test_err_none('dat'); + test_err_none + --------------- + ok + (1 
row) + + --- result map errors + create function test_map_err1(dat text) + returns text as $$ cluster 'testcluster'; run on 0; + select dat as "foo", 'asd' as "bar"; + $$ language plproxy; + select * from test_map_err1('dat'); + ERROR: PL/Proxy function public.test_map_err1(1): single field function but got record + create function test_map_err2(dat text, out res1 text, out res2 text) + returns record as $$ cluster 'testcluster'; run on 0; + select dat as res1; + $$ language plproxy; + select * from test_map_err2('dat'); + ERROR: PL/Proxy function public.test_map_err2(1): Got too few fields from remote end + create function test_map_err3(dat text, out res1 text, out res2 text) + returns record as $$ cluster 'testcluster'; run on 0; + select dat as res1, 'foo' as res_none; + $$ language plproxy; + select * from test_map_err3('dat'); + ERROR: PL/Proxy function public.test_map_err3(1): Field res2 does not exists in result + create function test_map_err4(dat text, out res1 text, out res2 text) + returns record as $$ + --cluster 'testcluster'; + run on hashtext(dat); + select dat as res2, 'foo' as res1; + $$ language plproxy; + select * from test_map_err4('dat'); + ERROR: PL/Proxy function public.test_map_err4(1): Compile error at line 5: CLUSTER statement missing *** a/src/pl/plproxy/expected/plproxy_init.out --- b/src/pl/plproxy/expected/plproxy_init.out *************** *** 0 **** --- 1,2 ---- + CREATE LANGUAGE plproxy; + \set ECHO none *** a/src/pl/plproxy/expected/plproxy_many.out --- b/src/pl/plproxy/expected/plproxy_many.out *************** *** 0 **** --- 1,116 ---- + create or replace function plproxy.get_cluster_version(cluster_name text) + returns integer as $$ + begin + if cluster_name = 'testcluster' then + return 6; + end if; + raise exception 'no such cluster: %', cluster_name; + end; $$ language plpgsql; + create or replace function plproxy.get_cluster_partitions(cluster_name text) + returns setof text as $$ + begin + if cluster_name = 'testcluster' then + 
return next 'host=127.0.0.1 dbname=test_part0'; + return next 'host=127.0.0.1 dbname=test_part1'; + return next 'host=127.0.0.1 dbname=test_part2'; + return next 'host=127.0.0.1 dbname=test_part3'; + return; + end if; + raise exception 'no such cluster: %', cluster_name; + end; $$ language plpgsql; + \c test_part0 + create function test_multi(part integer, username text) + returns integer as $$ begin return 0; end; $$ language plpgsql; + \c test_part1 + create function test_multi(part integer, username text) + returns integer as $$ begin return 1; end; $$ language plpgsql; + \c test_part2 + create function test_multi(part integer, username text) + returns integer as $$ begin return 2; end; $$ language plpgsql; + \c test_part3 + create function test_multi(part integer, username text) + returns integer as $$ begin return 3; end; $$ language plpgsql; + \c regression + create function test_multi(part integer, username text) + returns integer as $$ cluster 'testcluster'; run on int4(part); $$ language plproxy; + select test_multi(0, 'foo'); + test_multi + ------------ + 0 + (1 row) + + select test_multi(1, 'foo'); + test_multi + ------------ + 1 + (1 row) + + select test_multi(2, 'foo'); + test_multi + ------------ + 2 + (1 row) + + select test_multi(3, 'foo'); + test_multi + ------------ + 3 + (1 row) + + -- test RUN ON ALL + drop function test_multi(integer, text); + create function test_multi(part integer, username text) + returns setof integer as $$ cluster 'testcluster'; run on all; $$ language plproxy; + select test_multi(0, 'foo'); + test_multi + ------------ + 0 + 1 + 2 + 3 + (4 rows) + + -- test RUN ON 2 + drop function test_multi(integer, text); + create function test_multi(part integer, username text) + returns setof integer as $$ cluster 'testcluster'; run on 2; $$ language plproxy; + select test_multi(0, 'foo'); + test_multi + ------------ + 2 + (1 row) + + -- test RUN ON RANDOM + select setseed(0); + setseed + --------- + + (1 row) + + drop function 
test_multi(integer, text); + create function test_multi(part integer, username text) + returns setof integer as $$ cluster 'testcluster'; run on any; $$ language plproxy; + select test_multi(0, 'foo'); + test_multi + ------------ + 3 + (1 row) + + select test_multi(0, 'foo'); + test_multi + ------------ + 2 + (1 row) + + select test_multi(0, 'foo'); + test_multi + ------------ + 1 + (1 row) + + select test_multi(0, 'foo'); + test_multi + ------------ + 3 + (1 row) + *** a/src/pl/plproxy/expected/plproxy_select.out --- b/src/pl/plproxy/expected/plproxy_select.out *************** *** 0 **** --- 1,37 ---- + -- test regular sql + create function test_select(xuser text, tmp boolean) + returns integer as $x$ + cluster 'testcluster'; + run on hashtext(xuser); + select /********* + junk ; + ********** ****/ id from sel_test where username = xuser + and ';' <> 'as;d''a ; sd' + and $tmp$ ; 'a' $tmp$ <> 'as;d''a ; sd' + and $tmp$ $ $$ $foo$tmp$ <> 'x'; + $x$ language plproxy; + \c test_part + create table sel_test ( + id integer, + username text + ); + insert into sel_test values ( 1, 'user'); + \c regression + select * from test_select('user', true); + test_select + ------------- + 1 + (1 row) + + select * from test_select('xuser', false); + ERROR: PL/Proxy function public.test_select(2): bug: no result + -- test errors + create function test_select_err(xuser text, tmp boolean) + returns integer as $$ + cluster 'testcluster'; + run on hashtext(xuser); + select id from sel_test where username = xuser; + select id from sel_test where username = xuser; + $$ language plproxy; + select * from test_select_err('user', true); + ERROR: PL/Proxy function public.test_select_err(2): Compile error at line 5: Only one SELECT statement allowed *** a/src/pl/plproxy/expected/plproxy_test.out --- b/src/pl/plproxy/expected/plproxy_test.out *************** *** 0 **** --- 1,312 ---- + -- test normal function + create function testfunc(username text, id integer, data text) + returns text as $$ 
cluster 'testcluster'; run on hashtext(username); $$ language plproxy; + \c test_part + create function testfunc(username text, id integer, data text) + returns text as $$ begin return 'username=' || username; end; $$ language plpgsql; + \c regression + select * from testfunc('user', 1, 'foo'); + testfunc + --------------- + username=user + (1 row) + + select * from testfunc('user', 1, 'foo'); + testfunc + --------------- + username=user + (1 row) + + select * from testfunc('user', 1, 'foo'); + testfunc + --------------- + username=user + (1 row) + + -- test setof text + create function test_set(username text, num integer) + returns setof text as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy; + \c test_part + create function test_set(username text, num integer) + returns setof text as $$ + declare i integer; + begin + i := 0; + while i < num loop + return next 'username=' || username || ' row=' || i; + i := i + 1; + end loop; + return; + end; $$ language plpgsql; + \c regression + select * from test_set('user', 1); + test_set + --------------------- + username=user row=0 + (1 row) + + select * from test_set('user', 0); + test_set + ---------- + (0 rows) + + select * from test_set('user', 3); + test_set + --------------------- + username=user row=0 + username=user row=1 + username=user row=2 + (3 rows) + + -- test record + create type ret_test_rec as ( id integer, dat text); + create function test_record(username text, num integer) + returns ret_test_rec as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy; + \c test_part + create type ret_test_rec as ( id integer, dat text); + create function test_record(username text, num integer) + returns ret_test_rec as $$ + declare ret ret_test_rec%rowtype; + begin + ret := (num, username); + return ret; + end; $$ language plpgsql; + \c regression + select * from test_record('user', 3); + id | dat + ----+------ + 3 | user + (1 row) + + -- test setof record + create function 
test_record_set(username text, num integer) + returns setof ret_test_rec as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy; + \c test_part + create function test_record_set(username text, num integer) + returns setof ret_test_rec as $$ + declare ret ret_test_rec%rowtype; i integer; + begin + i := 0; + while i < num loop + ret := (i, username); + i := i + 1; + return next ret; + end loop; + return; + end; $$ language plpgsql; + \c regression + select * from test_record_set('user', 1); + id | dat + ----+------ + 0 | user + (1 row) + + select * from test_record_set('user', 0); + id | dat + ----+----- + (0 rows) + + select * from test_record_set('user', 3); + id | dat + ----+------ + 0 | user + 1 | user + 2 | user + (3 rows) + + -- test void + create function test_void(username text, num integer) + returns void as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy; + \c test_part + create function test_void(username text, num integer) + returns void as $$ + begin + return; + end; $$ language plpgsql; + -- look what void actually looks + select * from test_void('void', 2); + test_void + ----------- + + (1 row) + + select test_void('void', 2); + test_void + ----------- + + (1 row) + + \c regression + select * from test_void('user', 1); + test_void + ----------- + + (1 row) + + select * from test_void('user', 3); + test_void + ----------- + + (1 row) + + select test_void('user', 3); + test_void + ----------- + + (1 row) + + select test_void('user', 3); + test_void + ----------- + + (1 row) + + -- test normal outargs + create function test_out1(username text, id integer, out data text) + as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy; + \c test_part + create function test_out1(username text, id integer, out data text) + returns text as $$ begin data := 'username=' || username; return; end; $$ language plpgsql; + \c regression + select * from test_out1('user', 1); + data + --------------- + 
username=user + (1 row) + + -- test complicated outargs + create function test_out2(username text, id integer, out out_id integer, xdata text, inout xdata2 text, out odata text) + as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy; + \c test_part + create function test_out2(username text, id integer, out out_id integer, xdata text, inout xdata2 text, out odata text) + as $$ begin + out_id = id; + xdata2 := xdata2 || xdata; + odata := 'username=' || username; + return; + end; $$ language plpgsql; + \c regression + select * from test_out2('user', 1, 'xdata', 'xdata2'); + out_id | xdata2 | odata + --------+-------------+--------------- + 1 | xdata2xdata | username=user + (1 row) + + -- test various types + create function test_types(username text, inout vbool boolean, inout xdate timestamp, inout bin bytea) + as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy; + \c test_part + create function test_types(username text, inout vbool boolean, inout xdate timestamp, inout bin bytea) + as $$ begin return; end; $$ language plpgsql; + \c regression + select * from test_types('types', true, '2009-11-04 12:12:02', E'a\\000\\001\\002b'); + vbool | xdate | bin + -------+--------------------------+---------------- + t | Wed Nov 04 12:12:02 2009 | a\000\001\002b + (1 row) + + select * from test_types('types', NULL, NULL, NULL); + vbool | xdate | bin + -------+-------+----- + | | + (1 row) + + -- test user defined types + create domain posint as int4 check (value > 0); + create type struct as (id int4, data text); + create function test_types2(username text, inout v_posint posint, inout v_struct struct, inout arr int8[]) + as $$ cluster 'testcluster'; run on 0; $$ language plproxy; + \c test_part + create domain posint as int4 check (value > 0); + create type struct as (id int4, data text); + create function test_types2(username text, inout v_posint posint, inout v_struct struct, inout arr int8[]) + as $$ begin return; end; $$ 
language plpgsql; + \c regression + select * from test_types2('types', 4, (2, 'asd'), array[1,2,3]); + v_posint | v_struct | arr + ----------+----------+--------- + 4 | (2,asd) | {1,2,3} + (1 row) + + select * from test_types2('types', NULL, NULL, NULL); + v_posint | v_struct | arr + ----------+----------+----- + | (,) | + (1 row) + + -- test CONNECT + create function test_connect1() returns text + as $$ connect 'dbname=test_part'; select current_database(); $$ language plproxy; + select * from test_connect1(); + test_connect1 + --------------- + test_part + (1 row) + + -- test quoting function + create type "RetWeird" as ( + "ColId" int4, + "ColData" text + ); + create function "testQuoting"(username text, id integer, data text) + returns "RetWeird" as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy; + \c test_part + create type "RetWeird" as ( + "ColId" int4, + "ColData" text + ); + create function "testQuoting"(username text, id integer, data text) + returns "RetWeird" as $$ select 1::int4, 'BazOoka'::text $$ language sql; + \c regression + select * from "testQuoting"('user', '1', 'dat'); + ColId | ColData + -------+--------- + 1 | BazOoka + (1 row) + + -- test hash types function + create or replace function t_hash16(int4) returns int2 as $$ + declare + res int2; + begin + res = $1::int2; + return res; + end; + $$ language plpgsql; + create or replace function t_hash64(int4) returns int8 as $$ + declare + res int8; + begin + res = $1; + return res; + end; + $$ language plpgsql; + create function test_hash16(id integer, data text) + returns text as $$ cluster 'testcluster'; run on t_hash16(id); select data; $$ language plproxy; + select * from test_hash16('0', 'hash16'); + test_hash16 + ------------- + hash16 + (1 row) + + create function test_hash64(id integer, data text) + returns text as $$ cluster 'testcluster'; run on t_hash64(id); select data; $$ language plproxy; + select * from test_hash64('0', 'hash64'); + test_hash64 + 
------------- + hash64 + (1 row) +
*** a/src/pl/plproxy/function.c
--- b/src/pl/plproxy/function.c
***************
*** 0 ****
--- 1,479 ----
+ /*
+  * PL/Proxy - easy access to partitioned database.
+  *
+  * Copyright (c) 2006 Sven Suursoho, Skype Technologies OÜ
+  * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+  *
+  * Permission to use, copy, modify, and distribute this software for any
+  * purpose with or without fee is hereby granted, provided that the above
+  * copyright notice and this permission notice appear in all copies.
+  *
+  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+  */
+ 
+ /*
+  * Function compilation and caching.
+  *
+  * Functions here are called with CurrentMemoryContext == SPP Proc context.
+  * They switch to per-function context only during allocations.
+  */
+ 
+ #include "plproxy.h"
+ 
+ 
+ /*
+  * One entry of the function cache.
+  *
+  * PL/Proxy has no trigger functions, so the function OID alone is a
+  * sufficient key.  Entries are stored inside the HTAB.
+  */
+ typedef struct
+ {
+ 	/* hash key -- dynahash requires it to be the first field */
+ 	Oid			oid;
+ 	/* compiled function */
+ 	ProxyFunction *function;
+ } HashEntry;
+ 
+ /* OID -> compiled function */
+ static HTAB *fn_cache = NULL;
+ 
+ /*
+  * Function currently being compiled.
+  *
+  * Kept here so that an error thrown mid-compile does not leak it;
+  * plproxy_compile() releases it on the next call.
+  */
+ static ProxyFunction *partial_func = NULL;
+ 
+ 
+ 
+ /* palloc() size bytes in the function's own memory context */
+ void *
+ plproxy_func_alloc(ProxyFunction *func, int size)
+ {
+ 	void	   *mem = MemoryContextAlloc(func->ctx, size);
+ 
+ 	return mem;
+ }
+ 
+ /* Copy a NUL-terminated string into the function's memory context */
+ char *
+ plproxy_func_strdup(ProxyFunction *func, const char *s)
+ {
+ 	int			nbytes = strlen(s) + 1;		/* include the terminator */
+ 	char	   *copy;
+ 
+ 	copy = plproxy_func_alloc(func, nbytes);
+ 	memcpy(copy, s, nbytes);
+ 	return copy;
+ }
+ 
+ 
+ /* Create the (empty) OID-keyed function cache; must be called only once */
+ void
+ plproxy_function_cache_init(void)
+ {
+ 	HASHCTL		hctl;
+ 	int			initial_size = 128;
+ 
+ 	Assert(fn_cache == NULL);
+ 
+ 	MemSet(&hctl, 0, sizeof(hctl));
+ 	hctl.keysize = sizeof(Oid);
+ 	hctl.entrysize = sizeof(HashEntry);
+ 	hctl.hash = oid_hash;
+ 	fn_cache = hash_create("PL/Proxy function cache", initial_size, &hctl,
+ 						   HASH_ELEM | HASH_FUNCTION);
+ }
+ 
+ 
+ /* Return the cached function for fn_oid, or NULL when not cached */
+ static ProxyFunction *
+ fn_cache_lookup(Oid fn_oid)
+ {
+ 	HashEntry  *entry = hash_search(fn_cache, &fn_oid, HASH_FIND, NULL);
+ 
+ 	return (entry != NULL) ? entry->function : NULL;
+ }
+ 
+ 
+ /* Register func in the cache; its OID must not be present yet */
+ static void
+ fn_cache_insert(ProxyFunction *func)
+ {
+ 	bool		found;
+ 	HashEntry  *entry = hash_search(fn_cache, &func->oid, HASH_ENTER, &found);
+ 
+ 	Assert(found == false);
+ 	entry->function = func;
+ }
+ 
+ 
+ /* Drop func's entry from the cache; the entry must exist */
+ static void
+ fn_cache_delete(ProxyFunction *func)
+ {
+ 	HashEntry  *removed = hash_search(fn_cache, &func->oid, HASH_REMOVE, NULL);
+ 
+ 	Assert(removed != NULL);
+ }
+ 
+ /*
+  * True when the function returns RECORD without named OUT arguments,
+  * i.e. the result columns must be given by the caller's AS clause.
+  */
+ static bool
+ fn_returns_dynamic_record(HeapTuple proc_tuple)
+ {
+ 	Form_pg_proc proc_struct = (Form_pg_proc) GETSTRUCT(proc_tuple);
+ 
+ 	if (proc_struct->prorettype != RECORDOID)
+ 		return false;
+ 	if (heap_attisnull(proc_tuple, Anum_pg_proc_proargmodes))
+ 		return true;
+ 	if (heap_attisnull(proc_tuple, Anum_pg_proc_proargnames))
+ 		return true;
+ 	return false;
+ }
+ 
+ /*
+  * Create a zeroed ProxyFunction inside a fresh child context of
+  * TopMemoryContext; everything belonging to the function is later
+  * allocated in that context.
+  */
+ static ProxyFunction *
+ fn_new(FunctionCallInfo fcinfo, HeapTuple proc_tuple)
+ {
+ 	MemoryContext fctx;
+ 	MemoryContext prev_ctx;
+ 	ProxyFunction *func;
+ 
+ 	fctx = AllocSetContextCreate(TopMemoryContext,
+ 								 "PL/Proxy function context",
+ 								 ALLOCSET_SMALL_MINSIZE,
+ 								 ALLOCSET_SMALL_INITSIZE,
+ 								 ALLOCSET_SMALL_MAXSIZE);
+ 	prev_ctx = MemoryContextSwitchTo(fctx);
+ 
+ 	func = palloc0(sizeof(*func));
+ 	func->ctx = fctx;
+ 	func->oid = fcinfo->flinfo->fn_oid;
+ 	plproxy_set_stamp(&func->stamp, proc_tuple);
+ 	if (fn_returns_dynamic_record(proc_tuple))
+ 		func->dynamic_record = 1;
+ 
+ 	MemoryContextSwitchTo(prev_ctx);
+ 	return func;
+ }
+ 
+ 
+ /*
+  * Release a function: optionally unlink it from the cache, free its
+  * cached local-query plans, then drop its whole memory context.
+  */
+ static void
+ fn_delete(ProxyFunction *func, bool in_cache)
+ {
+ 	if (in_cache)
+ 		fn_cache_delete(func);
+ 
+ 	plproxy_query_freeplan(func->hash_sql);
+ 	plproxy_query_freeplan(func->cluster_sql);
+ 
+ 	MemoryContextDelete(func->ctx);
+ }
+ 
+ /*
+  * Store the quoted, schema-qualified name ("nsp.func") in func->name.
+  */
+ static void
+ fn_set_name(ProxyFunction *func, HeapTuple proc_tuple)
+ {
+ 	/* 2 names, each may double when quoted, 2x "" + '.' + NUL */
+ 	char		namebuf[NAMEDATALEN * 4 + 2 + 1 + 2 + 1];
+ 	Form_pg_proc proc_struct = (Form_pg_proc) GETSTRUCT(proc_tuple);
+ 	Oid			nsoid = proc_struct->pronamespace;
+ 	HeapTuple	ns_tup;
+ 	Form_pg_namespace ns_struct;
+ 
+ 	ns_tup = SearchSysCache(NAMESPACEOID,
+ 							ObjectIdGetDatum(nsoid), 0, 0, 0);
+ 	if (!HeapTupleIsValid(ns_tup))
+ 		plproxy_error(func, "Cannot find namespace %u", nsoid);
+ 	ns_struct = (Form_pg_namespace) GETSTRUCT(ns_tup);
+ 
+ 	snprintf(namebuf, sizeof(namebuf), "%s.%s",
+ 			 quote_identifier(NameStr(ns_struct->nspname)),
+ 			 quote_identifier(NameStr(proc_struct->proname)));
+ 	func->name = plproxy_func_strdup(func, namebuf);
+ 
+ 	ReleaseSysCache(ns_tup);
+ }
+ 
+ /*
+  * Fetch the function body (prosrc) and hand it to the parser.
+  */
+ static void
+ fn_parse(ProxyFunction *func, HeapTuple proc_tuple)
+ {
+ 	bool		isnull;
+ 	Datum		raw;
+ 	Datum		plain;
+ 	char	   *body;
+ 	int			body_len;
+ 
+ 	raw = SysCacheGetAttr(PROCOID, proc_tuple, Anum_pg_proc_prosrc, &isnull);
+ 	if (isnull)
+ 		plproxy_error(func, "procedure source datum is null");
+ 
+ 	/* may return a freshly detoasted copy */
+ 	plain = PointerGetDatum(PG_DETOAST_DATUM_PACKED(raw));
+ 	body = VARDATA_ANY(plain);
+ 	body_len = VARSIZE_ANY_EXHDR(plain);
+ 
+ 	plproxy_run_parser(func, body, body_len);
+ 
+ 	/* free the copy only if detoasting actually made one */
+ 	if (plain != raw)
+ 		pfree(DatumGetPointer(plain));
+ }
+ 
+ /*
+  * Get info about own arguments.
+  *
+  * Records type and name of every argument that is passed in by the
+  * caller (IN, INOUT, VARIADIC).  OUT arguments and RETURNS TABLE result
+  * columns are skipped, as they are not inputs.
+  */
+ static void
+ fn_get_arguments(ProxyFunction *func,
+ 				 FunctionCallInfo fcinfo,
+ 				 HeapTuple proc_tuple)
+ {
+ 	Oid		   *types;
+ 	char	  **names,
+ 			   *modes;
+ 	int			i,
+ 				pos,
+ 				total;
+ 	ProxyType  *type;
+ 
+ 	total = get_func_arg_info(proc_tuple, &types, &names, &modes);
+ 
+ 	/* sized for all arguments; only input ones are stored */
+ 	func->arg_types = plproxy_func_alloc(func, sizeof(ProxyType *) * total);
+ 	func->arg_names = plproxy_func_alloc(func, sizeof(char *) * total);
+ 	func->arg_count = 0;
+ 
+ 	for (i = 0; i < total; i++)
+ 	{
+ 		/*
+ 		 * 'o' = OUT argument, 't' = RETURNS TABLE column; both are outputs
+ 		 * and must not be counted as call arguments.
+ 		 */
+ 		if (modes && (modes[i] == 'o' || modes[i] == 't'))
+ 			continue;
+ 		type = plproxy_find_type_info(func, types[i], 1);
+ 		pos = func->arg_count++;
+ 		func->arg_types[pos] = type;
+ 		if (names && names[i])
+ 			func->arg_names[pos] = plproxy_func_strdup(func, names[i]);
+ 		else
+ 			func->arg_names[pos] = NULL;
+ 	}
+ }
+ 
+ /*
+  * Get info about return type.
+  *
+  * Fills one of ret_scalar or ret_composite; for composite results also
+  * allocates result_map with one int per result column.
+  */
+ static void
+ fn_get_return_type(ProxyFunction *func,
+ 				   FunctionCallInfo fcinfo,
+ 				   HeapTuple proc_tuple)
+ {
+ 	Oid			ret_oid;
+ 	TupleDesc	ret_tup;
+ 	TypeFuncClass rtc;
+ 	MemoryContext old_ctx;
+ 	int			natts;
+ 
+ 
+ 	/*
+ 	 * get_call_result_type() will return newly allocated tuple,
+ 	 * except in case of untyped RECORD functions.
+ 	 */
+ 	old_ctx = MemoryContextSwitchTo(func->ctx);
+ 	rtc = get_call_result_type(fcinfo, &ret_oid, &ret_tup);
+ 	if (func->dynamic_record && ret_tup)
+ 		ret_tup = CreateTupleDescCopy(ret_tup);
+ 	MemoryContextSwitchTo(old_ctx);
+ 
+ 	switch (rtc)
+ 	{
+ 		case TYPEFUNC_COMPOSITE:
+ 			func->ret_composite = plproxy_composite_info(func, ret_tup);
+ 			natts = func->ret_composite->tupdesc->natts;
+ 			func->result_map = plproxy_func_alloc(func, natts * sizeof(int));
+ 			break;
+ 		case TYPEFUNC_SCALAR:
+ 			func->ret_scalar = plproxy_find_type_info(func, ret_oid, 0);
+ 			func->result_map = NULL;
+ 			break;
+ 		case TYPEFUNC_RECORD:
+ 		case TYPEFUNC_OTHER:
+ 			/* fixme: void type here? */
+ 			plproxy_error(func, "unsupported type");
+ 			break;
+ 	}
+ }
+ 
+ /*
+  * Check if cached ->ret_composite is valid, refresh if needed.
+  *
+  * Called from plproxy_compile() for cached untyped RECORD functions,
+  * whose result row type comes from the call site's AS (...) clause.
+  * If the current descriptor differs from the cached one, the composite
+  * info, result_map and the generated remote query are rebuilt.
+ */
+ static void
+ fn_refresh_record(FunctionCallInfo fcinfo,
+ 				  ProxyFunction *func,
+ 				  HeapTuple proc_tuple)
+ {
+
+ 	TypeFuncClass rtc;
+ 	TupleDesc tuple_current, tuple_cached;
+ 	MemoryContext old_ctx;
+ 	int natts;
+
+ 	/*
+ 	 * Compare cached tuple to current one.
+ 	 */
+ 	tuple_cached = func->ret_composite->tupdesc;
+ 	rtc = get_call_result_type(fcinfo, NULL, &tuple_current);
+ 	Assert(rtc == TYPEFUNC_COMPOSITE);
+ 	if (equalTupleDescs(tuple_current, tuple_cached))
+ 		return;
+
+ 	/* move to function context */
+ 	old_ctx = MemoryContextSwitchTo(func->ctx);
+ 	tuple_current = CreateTupleDescCopy(tuple_current);
+ 	MemoryContextSwitchTo(old_ctx);
+
+ 	/*
+ 	 * release old data; only the ProxyQuery struct itself is freed here,
+ 	 * not its separately allocated sql / arg_lookup members
+ 	 */
+ 	plproxy_free_composite(func->ret_composite);
+ 	pfree(func->result_map);
+ 	pfree(func->remote_sql);
+
+ 	/*
+ 	 * construct new data; remote_sql is always the generated standard
+ 	 * query here, as fn_compile() rejects SELECT for dynamic RECORD
+ 	 */
+ 	func->ret_composite = plproxy_composite_info(func, tuple_current);
+ 	natts = func->ret_composite->tupdesc->natts;
+ 	func->result_map = plproxy_func_alloc(func, natts * sizeof(int));
+ 	func->remote_sql = plproxy_standard_query(func, true);
+ }
+
+ /*
+  * Slow path of compilation: collect catalog info, parse the function
+  * body, generate the remote query when no SELECT was given and prepare
+  * local queries.  The validate argument is not used in this function.
+  */
+ static ProxyFunction *
+ fn_compile(FunctionCallInfo fcinfo,
+ 		   HeapTuple proc_tuple,
+ 		   bool validate)
+ {
+ 	ProxyFunction *f;
+ 	Form_pg_proc proc_struct;
+
+ 	proc_struct = (Form_pg_proc) GETSTRUCT(proc_tuple);
+ 	if (proc_struct->provolatile != 'v')
+ 		elog(ERROR, "PL/Proxy functions must be volatile");
+
+ 	f = fn_new(fcinfo, proc_tuple);
+
+ 	/* keep reference in case of error half-way; plproxy_compile() cleans it */
+ 	partial_func = f;
+
+ 	/* info from system tables */
+ 	fn_set_name(f, proc_tuple);
+ 	fn_get_return_type(f, fcinfo, proc_tuple);
+ 	fn_get_arguments(f, fcinfo, proc_tuple);
+
+ 	/* parse body */
+ 	fn_parse(f, proc_tuple);
+
+ 	/* untyped RECORD row type may change per call, see fn_refresh_record() */
+ 	if (f->dynamic_record && f->remote_sql)
+ 		plproxy_error(f, "SELECT statement not allowed for dynamic RECORD functions");
+
+ 	/* create SELECT stmt if not specified */
+ 	if (f->remote_sql == NULL)
+ 		f->remote_sql = plproxy_standard_query(f, true);
+
+ 	/*
+ 	 * prepare local queries (CLUSTER func(...) and RUN ON func(...))
+ 	 * as saved SPI plans
+ 	 */
+ 	if (f->cluster_sql)
+ 		plproxy_query_prepare(f, fcinfo, f->cluster_sql);
+ 	if (f->hash_sql)
+ 		plproxy_query_prepare(f, fcinfo, f->hash_sql);
+
+ 	/* sanity check */
+ 	if (f->run_type == R_ALL && !fcinfo->flinfo->fn_retset)
+ 		plproxy_error(f, "RUN ON ALL requires set-returning function");
+
+ 	return f;
+ }
+
+ /*
+  * Compile and cache PL/Proxy function.
+  *
+  * Returns the cached ProxyFunction for the called function's OID when
+  * its pg_proc row is unchanged (plproxy_check_stamp), otherwise compiles
+  * it anew and inserts it into the cache.  For cached untyped RECORD
+  * functions the result row type is re-checked on every call.
+  */
+ ProxyFunction *
+ plproxy_compile(FunctionCallInfo fcinfo, bool validate)
+ {
+ 	ProxyFunction *f;
+ 	HeapTuple proc_tuple;
+ 	Oid oid;
+
+ 	/* a previous compile failed half-way (see fn_compile); drop leftovers */
+ 	if (partial_func)
+ 	{
+ 		fn_delete(partial_func, false);
+ 		partial_func = NULL;
+ 	}
+
+ 	/* get current fn oid */
+ 	oid = fcinfo->flinfo->fn_oid;
+
+ 	/* lookup the pg_proc tuple */
+ 	proc_tuple = SearchSysCache(PROCOID, ObjectIdGetDatum(oid), 0, 0, 0);
+ 	if (!HeapTupleIsValid(proc_tuple))
+ 		elog(ERROR, "cache lookup failed for function %u", oid);
+
+ 	/* fn_extra not used, do lookup */
+ 	f = fn_cache_lookup(oid);
+
+ 	/* if cached, is it still valid? */
+ 	if (f && !plproxy_check_stamp(&f->stamp, proc_tuple))
+ 	{
+ 		fn_delete(f, true);
+ 		f = NULL;
+ 	}
+
+ 	if (!f)
+ 	{
+ 		f = fn_compile(fcinfo, proc_tuple, validate);
+
+ 		fn_cache_insert(f);
+
+ 		/* compiled and cached: it's now safe to drop the reference */
+ 		partial_func = NULL;
+ 	}
+ 	else if (f->dynamic_record)
+ 	{
+ 		/* in case of untyped RECORD, check if cached type is valid */
+ 		fn_refresh_record(fcinfo, f, proc_tuple);
+ 	}
+
+ 	ReleaseSysCache(proc_tuple);
+
+ 	return f;
+ }
*** a/src/pl/plproxy/main.c
--- b/src/pl/plproxy/main.c
***************
*** 0 ****
--- 1,214 ----
+ /*
+  * PL/Proxy - easy access to partitioned database.
+  *
+  * Copyright (c) 2006 Sven Suursoho, Skype Technologies OÜ
+  * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+  *
+  * Permission to use, copy, modify, and distribute this software for any
+  * purpose with or without fee is hereby granted, provided that the above
+  * copyright notice and this permission notice appear in all copies.
+  *
+  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+  */
+
+ /*
+  * External interface for PostgreSQL core.
+  *
+  * List of memory contexts that are touched by this code:
+  *
+  * - Query context that is active when plproxy_call_handler is called.
+  *   Function results should be allocated from here.
+  *
+  * - SPI Proc context that activates in SPI_connect() and is freed
+  *   in SPI_finish().  This is used for compile-time short-term storage.
+  *
+  * - HTAB has its own memory context.
+  *
+  * - ProxyFunction->ctx for long-term allocations for functions.
+  *
+  * - cluster_mem where info about clusters is stored.
+  *
+  * - SPI_saveplan() stores plan info in separate context,
+  *   so it must be freed explicitly.
+  *
+  * - libpq uses malloc() so it must be freed explicitly.
+  *
+  * Because SPI functions do not honour CurrentMemoryContext
+  * and code should not have assumptions whether core
+  * functions do allocations or not, the per-function and
+  * cluster MemoryContext is switched on only when doing actual
+  * allocations.  Otherwise the default context is kept.
+  */
+
+ #include "plproxy.h"
+
+ /* gettimeofday() and struct timeval, used by run_maint() */
+ #include <sys/time.h>
+
+ PG_MODULE_MAGIC;
+
+ PG_FUNCTION_INFO_V1(plproxy_call_handler);
+
+ /*
+  * Centralised error reporting.
+  *
+  * Formats the message, frees any pending results of the current cluster
+  * and raises ERROR via elog(), so this function does not return.
+  *
+  * It can be reached before compilation has filled in the function
+  * (e.g. while resolving its name), so a missing name or cluster is
+  * tolerated instead of being dereferenced / passed to "%s".
+  */
+ void
+ plproxy_error(ProxyFunction *func, const char *fmt,...)
+ {
+ 	char msg[1024];
+ 	va_list ap;
+
+ 	va_start(ap, fmt);
+ 	vsnprintf(msg, sizeof(msg), fmt, ap);
+ 	va_end(ap);
+
+ 	/* no cluster may be attached yet, e.g. for errors during compile */
+ 	if (func->cur_cluster)
+ 		plproxy_clean_results(func->cur_cluster);
+
+ 	/* NULL "%s" argument is undefined behaviour; use a placeholder */
+ 	elog(ERROR, "PL/Proxy function %s(%d): %s",
+ 		 func->name ? func->name : "<unknown>", func->arg_count, msg);
+ }
+
+ /*
+  * Library load-time initialization.
+  * Do the initialization when SPI is active to simplify the code.
+  * Runs the cache initializers only once per backend.
+  */
+ static void
+ plproxy_startup_init(void)
+ {
+ 	static bool initialized = false;
+
+ 	if (initialized)
+ 		return;
+
+ 	plproxy_function_cache_init();
+ 	plproxy_cluster_cache_init();
+
+ 	initialized = true;
+ }
+
+ /*
+  * Regular maintenance over all clusters.
+  *
+  * Runs plproxy_cluster_maint() at most once per PLPROXY_MAINT_PERIOD
+  * seconds; calls in between return immediately.
+  */
+ static void
+ run_maint(void)
+ {
+ 	static struct timeval last = {0, 0};
+ 	struct timeval now;
+
+ 	gettimeofday(&now, NULL);
+ 	if (now.tv_sec - last.tv_sec < PLPROXY_MAINT_PERIOD)
+ 		return;
+ 	last = now;
+
+ 	plproxy_cluster_maint(&now);
+ }
+
+ /*
+  * Do compilation and execution under SPI.
+  *
+  * Result conversion will be done without SPI.
+  */
+ static ProxyFunction *
+ compile_and_execute(FunctionCallInfo fcinfo)
+ {
+ 	int err;
+ 	ProxyFunction *func;
+ 	ProxyCluster *cluster;
+
+ 	/* prepare SPI */
+ 	err = SPI_connect();
+ 	if (err != SPI_OK_CONNECT)
+ 		elog(ERROR, "SPI_connect: %s", SPI_result_code_string(err));
+
+ 	/* do the initialization also under SPI */
+ 	plproxy_startup_init();
+
+ 	/* compile code */
+ 	func = plproxy_compile(fcinfo, false);
+
+ 	/* get actual cluster to run on */
+ 	cluster = plproxy_find_cluster(func, fcinfo);
+
+ 	/* fetch PGresults */
+ 	func->cur_cluster = cluster;
+ 	plproxy_exec(func, fcinfo);
+
+ 	/* done with SPI */
+ 	err = SPI_finish();
+ 	if (err != SPI_OK_FINISH)
+ 		elog(ERROR, "SPI_finish: %s", SPI_result_code_string(err));
+
+ 	return func;
+ }
+
+ /*
+  * Logic for set-returning functions.
+  *
+  * Currently it uses the simplest, return
+  * one value/tuple per call mechanism.
+  *
+  * On the first call the function is compiled and executed
+  * (compile_and_execute) and kept in user_fctx; each call then returns
+  * one row via plproxy_result() while cur_cluster->ret_total > 0, and the
+  * results are cleaned up before signalling the end of the set.
+ */
+ static Datum
+ handle_ret_set(FunctionCallInfo fcinfo)
+ {
+ 	ProxyFunction *func;
+ 	FuncCallContext *ret_ctx;
+
+ 	if (SRF_IS_FIRSTCALL())
+ 	{
+ 		func = compile_and_execute(fcinfo);
+ 		ret_ctx = SRF_FIRSTCALL_INIT();
+ 		ret_ctx->user_fctx = func;
+ 	}
+
+ 	ret_ctx = SRF_PERCALL_SETUP();
+ 	func = ret_ctx->user_fctx;
+
+ 	if (func->cur_cluster->ret_total > 0)
+ 	{
+ 		SRF_RETURN_NEXT(ret_ctx, plproxy_result(func, fcinfo));
+ 	}
+ 	else
+ 	{
+ 		plproxy_clean_results(func->cur_cluster);
+ 		SRF_RETURN_DONE(ret_ctx);
+ 	}
+ }
+
+ /*
+  * The PostgreSQL function & trigger manager calls this function
+  * for execution of PL/Proxy procedures.
+  *
+  * Main entry point for rest of the code.  Trigger calls are rejected;
+  * set-returning functions go through handle_ret_set(), all others are
+  * compiled, executed and converted to a single result here.
+  */
+ Datum
+ plproxy_call_handler(PG_FUNCTION_ARGS)
+ {
+ 	ProxyFunction *func;
+ 	Datum ret;
+
+ 	if (CALLED_AS_TRIGGER(fcinfo))
+ 		elog(ERROR, "PL/Proxy procedures can't be used as triggers");
+
+ 	/*
+ 	 * periodic cluster maintenance (run_maint); for set-returning
+ 	 * functions only on the first call of the set
+ 	 */
+ 	if (!fcinfo->flinfo->fn_retset || SRF_IS_FIRSTCALL())
+ 		run_maint();
+
+ 	if (fcinfo->flinfo->fn_retset)
+ 	{
+ 		ret = handle_ret_set(fcinfo);
+ 	}
+ 	else
+ 	{
+ 		func = compile_and_execute(fcinfo);
+ 		ret = plproxy_result(func, fcinfo);
+ 		plproxy_clean_results(func->cur_cluster);
+ 	}
+ 	return ret;
+ }
*** a/src/pl/plproxy/parser.y
--- b/src/pl/plproxy/parser.y
***************
*** 0 ****
--- 1,203 ----
+ %{
+ /*
+  * PL/Proxy - easy access to partitioned database.
+  *
+  * Copyright (c) 2006 Sven Suursoho, Skype Technologies OÜ
+  * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+  *
+  * Permission to use, copy, modify, and distribute this software for any
+  * purpose with or without fee is hereby granted, provided that the above
+  * copyright notice and this permission notice appear in all copies.
+  *
+  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+  * MERCHANTABILITY AND FITNESS.
IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+  */
+
+ #include "plproxy.h"
+
+ /* scanner.c function used below; not declared in plproxy.h */
+ void plproxy_yy_scan_bytes(const char *bytes, int len);
+
+ /*
+  * avoid permanent allocations: malloc()/free() used by the generated
+  * parser code go through palloc()/pfree() instead
+  */
+ #define malloc palloc
+ #define free pfree
+
+ /* remove unused code */
+ #define YY_LOCATION_PRINT(File, Loc) (0)
+ #define YY_(x) (x)
+
+ /* during parsing, keep reference to function here */
+ static ProxyFunction *xfunc;
+
+ /* remember what happened: set to 1 once the statement has been seen */
+ static int got_run, got_cluster, got_connect;
+
+ /* query buffers for CLUSTER func(..), SELECT .. and RUN ON func(..) */
+ static QueryBuffer *cluster_sql;
+ static QueryBuffer *select_sql;
+ static QueryBuffer *hash_sql;
+
+ /* points to one of the above ones; sql_token rules append to it */
+ static QueryBuffer *cur_sql;
+
+ /* keep the resetting code together with variables */
+ static void reset_parser_vars(void)
+ {
+ 	got_run = got_cluster = got_connect = 0;
+ 	cur_sql = select_sql = cluster_sql = hash_sql = NULL;
+ 	xfunc = NULL;
+ }
+
+ %}
+
+ %name-prefix="plproxy_yy"
+
+ /*
+  * NOTE(review): the actions below use $1 as a string (the only %union
+  * member is "str"), but no %token line carries a <str> type tag in this
+  * copy - it may have been lost; confirm against upstream.
+  */
+ %token CONNECT CLUSTER RUN ON ALL ANY SELECT
+ %token IDENT CONST NUMBER FNCALL STRING
+ %token SQLIDENT SQLPART
+
+ %union
+ {
+ 	const char *str;
+ }
+
+ %%
+
+ /* function body: any sequence of statements */
+ body: | body stmt ;
+
+ stmt: cluster_stmt | run_stmt | select_stmt | connect_stmt ;
+
+ /* CONNECT 'connstr';  -- fixed target, run type R_EXACT */
+ connect_stmt: CONNECT connect_spec ';' {
+ 					if (got_connect)
+ 						yyerror("Only one CONNECT statement allowed");
+ 					xfunc->run_type = R_EXACT;
+ 					got_connect = 1; }
+ 			;
+
+ connect_spec: STRING { xfunc->connect_str = plproxy_func_strdup(xfunc, $1); }
+ 			;
+
+ /* CLUSTER 'name';  or  CLUSTER func(args); */
+ cluster_stmt: CLUSTER cluster_spec ';' {
+ 					if (got_cluster)
+ 						yyerror("Only one CLUSTER statement allowed");
+ 					got_cluster = 1; }
+ 			;
+
+ cluster_spec: cluster_name | cluster_func sql_token_list
+ 			;
+
+ /* start local query "select <func>(...)" that resolves the cluster name */
+ cluster_func: FNCALL { cluster_sql = plproxy_query_start(xfunc, false);
+ 					   cur_sql = cluster_sql;
+ 					   plproxy_query_add_const(cur_sql, "select ");
+ 					   plproxy_query_add_const(cur_sql, $1); }
+ 			;
+
+ cluster_name: STRING { xfunc->cluster_name = plproxy_func_strdup(xfunc, $1); }
+ 			;
+
+ run_stmt: RUN ON run_spec ';' { if (got_run)
+ 									yyerror("Only one RUN statement allowed");
+ 								got_run = 1; }
+ 		;
+
+ /* RUN ON func(args) | <number> | ANY | ALL */
+ run_spec: hash_func sql_token_list { xfunc->run_type = R_HASH; }
+ 		| NUMBER { xfunc->run_type = R_EXACT; xfunc->exact_nr = atoi($1); }
+ 		| ANY { xfunc->run_type = R_ANY; }
+ 		| ALL { xfunc->run_type = R_ALL; }
+ 		;
+
+ /* start local query "select * from <func>(...)" returning hash value(s) */
+ hash_func: FNCALL { hash_sql = plproxy_query_start(xfunc, false);
+ 					cur_sql = hash_sql;
+ 					plproxy_query_add_const(cur_sql, "select * from ");
+ 					plproxy_query_add_const(cur_sql, $1); }
+ 		;
+
+ select_stmt: sql_start sql_token_list ';' ;
+
+ /* explicit SELECT to run remotely; argument references get ::type casts */
+ sql_start: SELECT { if (select_sql)
+ 						yyerror("Only one SELECT statement allowed");
+ 					select_sql = plproxy_query_start(xfunc, true);
+ 					cur_sql = select_sql;
+ 					plproxy_query_add_const(cur_sql, $1); }
+ 		;
+ sql_token_list: sql_token
+ 			| sql_token_list sql_token
+ 			;
+ /* raw SQL text is copied as-is; identifiers may be argument references */
+ sql_token: SQLPART { plproxy_query_add_const(cur_sql, $1); }
+ 		| SQLIDENT { if (!plproxy_query_add_ident(cur_sql, $1))
+ 						yyerror("invalid argument reference: %s", $1); }
+ 		;
+
+ %%
+
+ /*
+  * report parser error.
+  *
+  * Formats the message, releases the scanner and raises the error via
+  * plproxy_error() together with the current scanner line number.
+  */
+ void yyerror(const char *fmt, ...)
+ {
+ 	char buf[1024];
+ 	int lineno = plproxy_yyget_lineno();
+ 	va_list ap;
+
+ 	va_start(ap, fmt);
+ 	vsnprintf(buf, sizeof(buf), fmt, ap);
+ 	va_end(ap);
+
+ 	/* release scanner state before raising the error */
+ 	plproxy_yylex_destroy();
+
+ 	plproxy_error(xfunc, "Compile error at line %d: %s", lineno, buf);
+ }
+
+
+ /*
+  * actually run the flex/bison parser
+  *
+  * Parses len bytes of body and stores the results in func: run_type,
+  * connect/cluster info and the finished remote, hash and cluster queries.
+  */
+ void plproxy_run_parser(ProxyFunction *func, const char *body, int len)
+ {
+ 	/* reset variables, in case there was error exit */
+ 	reset_parser_vars();
+
+ 	/* make current function visible to parser */
+ 	xfunc = func;
+
+ 	/* By default expect RUN ON ANY; */
+ 	xfunc->run_type = R_ANY;
+
+ 	/* reinitialize scanner */
+ 	plproxy_yylex_startup();
+
+ 	/* setup scanner */
+ 	plproxy_yy_scan_bytes(body, len);
+
+ 	/* run parser */
+ 	yyparse();
+
+ 	/* check for mandatory statements */
+ 	if (got_connect) {
+ 		if (got_cluster || got_run)
+ 			yyerror("CONNECT cannot be used with CLUSTER/RUN");
+ 	} else {
+ 		if (!got_cluster)
+ 			yyerror("CLUSTER statement missing");
+ 	}
+
+ 	/* release scanner resources */
+ 	plproxy_yylex_destroy();
+
+ 	/* finish the RUN ON func(..) query; only R_HASH builds one */
+ 	if (xfunc->run_type == R_HASH)
+ 		xfunc->hash_sql = plproxy_query_finish(hash_sql);
+
+ 	/* store sql */
+ 	if (select_sql)
+ 		xfunc->remote_sql = plproxy_query_finish(select_sql);
+
+ 	if (cluster_sql)
+ 		xfunc->cluster_sql = plproxy_query_finish(cluster_sql);
+
+ 	reset_parser_vars();
+ }
+
*** a/src/pl/plproxy/plproxy.h
--- b/src/pl/plproxy/plproxy.h
***************
*** 0 ****
--- 1,301 ----
+ /*
+  * PL/Proxy - easy access to partitioned database.
+  *
+  * Copyright (c) 2006 Sven Suursoho, Skype Technologies OÜ
+  * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+  *
+  * Permission to use, copy, modify, and distribute this software for any
+  * purpose with or without fee is hereby granted, provided that the above
+  * copyright notice and this permission notice appear in all copies.
+  *
+  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+  */
+
+ /*
+  * Data structures for PL/Proxy function handler.
+  */
+
+ #ifndef plproxy_h_included
+ #define plproxy_h_included
+
+ /*
+  * NOTE(review): the header names of the bare #include lines below are
+  * missing in this copy (angle-bracketed text appears to have been lost);
+  * restore them from upstream before building.
+  */
+ #include
+ #include
+ #include
+ #include
+
+ #include
+ #include
+ #include
+ #include
+ #include
+ #include
+ #include
+ #include
+ #include
+ #include
+ #include
+ #include
+
+ #include "rowstamp.h"
+
+ #include
+
+ /*
+  * Is PL/Proxy allowed to use binary I/O?
+  */
+ #define PLPROXY_USE_BINARY 0
+
+ /*
+  * How long a connection can stay open (in secs).
+  * Set 0 to disable.
+  */
+ #define PLPROXY_CONN_LIFETIME (3*60*60)
+
+ /*
+  * Maintenance period in seconds.  Connections will be freed
+  * from stale results, and checked for lifetime.
+  */
+ #define PLPROXY_MAINT_PERIOD (2*60)
+
+ /*
+  * Check connections that are idle more than this many seconds.
+  * Set 0 to always check.
+  */
+ #define PLPROXY_IDLE_CONN_CHECK 2
+
+ /* Flag indicating where function should be executed */
+ typedef enum RunOnType
+ {
+ 	R_HASH = 1,		/* partition(s) returned by hash function */
+ 	R_ALL = 2,		/* on all partitions */
+ 	R_ANY = 3,		/* decide randomly during runtime */
+ 	R_EXACT = 4		/* exact part number (RUN ON <nr>); also used by CONNECT */
+ } RunOnType;
+
+ /* Connection states for async handler */
+ typedef enum ConnState
+ {
+ 	C_NONE = 0,			/* no connection object yet */
+ 	C_CONNECT_WRITE,	/* login phase: sending data */
+ 	C_CONNECT_READ,		/* login phase: waiting for server */
+ 	C_READY,			/* connection ready for query */
+ 	C_QUERY_WRITE,		/* query phase: sending data */
+ 	C_QUERY_READ,		/* query phase: waiting for server */
+ 	C_DONE,				/* query done, result available */
+ } ConnState;
+
+ /* Single database connection */
+ typedef struct
+ {
+ 	const char *connstr;		/* Connection string for libpq */
+
+ 	/* state */
+ 	PGconn *db;					/* libpq connection handle */
+ 	PGresult *res;				/* last resultset */
+ 	int pos;					/* Current position inside res */
+ 	ConnState state;			/* Connection state */
+ 	time_t connect_time;		/* When connection was started */
+ 	time_t query_time;			/* When last query was sent */
+ 	bool run_on;				/* True if this connection should be used */
+ 	bool same_ver;				/* True if dest backend has same X.Y ver */
+ 	bool tuning;				/* True if tuning query is running on conn */
+ } ProxyConnection;
+
+ /* Info about one cluster */
+ typedef struct ProxyCluster
+ {
+ 	struct ProxyCluster *next;	/* Pointer for building singly-linked list */
+
+ 	const char *name;			/* Cluster name */
+ 	int version;				/* Cluster version */
+
+ 	int part_count;				/* Number of partitions - power of 2 */
+ 	int part_mask;				/* Mask to use to get part number from hash */
+ 	ProxyConnection **part_map;	/* Pointers to conn_list */
+
+ 	int conn_count;				/* Number of actual database connections */
+ 	ProxyConnection *conn_list;	/* List of actual database connections */
+
+ 	int ret_cur_conn;			/* Result walking: index of current conn */
+ 	int ret_cur_pos;			/* Result walking: index of current row */
+ 	int ret_total;				/* Result walking: total rows left */
+ } ProxyCluster;
+
+ /*
+  * Type info cache.
+  *
+  * As the decision to send/receive binary may
+  * change in runtime, both text and binary
+  * function calls must be cached.
+  */
+ typedef struct ProxyType
+ {
+ 	char *name;					/* Name of the type */
+ 	Oid type_oid;				/* Oid of the type */
+
+ 	Oid io_param;				/* Extra arg for input_func */
+ 	bool for_send;				/* True if for outputting */
+ 	bool has_send;				/* Has binary output */
+ 	bool has_recv;				/* Has binary input */
+ 	bool by_value;				/* False if Datum is a pointer to data */
+
+ 	/* I/O functions; presumably "out" when for_send, "in" otherwise */
+ 	union
+ 	{
+ 		struct
+ 		{
+ 			FmgrInfo output_func;
+ 			FmgrInfo send_func;
+ 		} out;
+ 		struct
+ 		{
+ 			FmgrInfo input_func;
+ 			FmgrInfo recv_func;
+ 		} in;
+ 	} io;
+ } ProxyType;
+
+ /*
+  * Info cache for composite return type.
+  *
+  * There is AttInMetadata in core, but it does not support
+  * binary receive, so need our own struct.
+  */
+ typedef struct ProxyComposite
+ {
+ 	TupleDesc tupdesc;			/* Return tuple descriptor */
+ 	ProxyType **type_list;		/* Column type info */
+ 	char **name_list;			/* Quoted column names */
+ 	bool use_binary;			/* True if all columns support binary recv */
+ } ProxyComposite;
+
+ /* Temp structure for query parsing; opaque, defined in query.c */
+ typedef struct QueryBuffer QueryBuffer;
+
+ /*
+  * Parsed query where references to function arguments
+  * are replaced with local args numbered sequentially: $1..$n.
+  */
+ typedef struct ProxyQuery
+ {
+ 	char *sql;					/* Prepared SQL string */
+ 	int arg_count;				/* Argument count for ->sql */
+ 	int *arg_lookup;			/* Maps local references to function args */
+ 	void *plan;					/* Optional prepared plan for local queries */
+ } ProxyQuery;
+
+ /*
+  * Complete info about compiled function.
+  *
+  * Note: only IN and INOUT arguments are cached here.
+  */
+ typedef struct ProxyFunction
+ {
+ 	const char *name;			/* Fully-qualified and quoted function name */
+ 	Oid oid;					/* Function OID */
+ 	MemoryContext ctx;			/* Where runtime allocations should happen */
+
+ 	RowStamp stamp;				/* for pg_proc cache validation */
+
+ 	ProxyType **arg_types;		/* Info about arguments */
+ 	char **arg_names;			/* Argument names, may contain NULLs */
+ 	short arg_count;			/* Argument count of proxy function */
+
+ 	/* if the function returns untyped RECORD that needs AS clause */
+ 	bool dynamic_record;
+
+ 	/* One of them is defined, other NULL */
+ 	ProxyType *ret_scalar;		/* Type info for scalar return val */
+ 	ProxyComposite *ret_composite;	/* Type info for composite return val */
+
+ 	/* data from function body */
+ 	const char *cluster_name;	/* Cluster where function should run */
+ 	ProxyQuery *cluster_sql;	/* Optional query for name resolving */
+
+ 	RunOnType run_type;			/* Run type */
+ 	ProxyQuery *hash_sql;		/* Hash execution for R_HASH */
+ 	int exact_nr;				/* Partition number for R_EXACT (RUN ON <nr>) */
+ 	const char *connect_str;	/* libpq string for CONNECT function */
+
+ 	/*
+ 	 * calculated data
+ 	 */
+
+ 	ProxyQuery *remote_sql;		/* query to be run remotely */
+
+ 	/*
+ 	 * current execution data
+ 	 */
+
+ 	/*
+ 	 * Cluster to be executed on. In case of CONNECT,
+ 	 * function's private fake cluster object.
+ 	 */
+ 	ProxyCluster *cur_cluster;
+
+ 	/*
+ 	 * Maps result field num to libpq column num.
+ 	 * It is filled for each result. NULL when scalar result.
+ 	 */
+ 	int *result_map;
+ } ProxyFunction;
+
+ /* main.c; plproxy_error() raises ERROR and does not return */
+ Datum plproxy_call_handler(PG_FUNCTION_ARGS);
+ void plproxy_error(ProxyFunction *func, const char *fmt,...);
+
+ /* function.c */
+ void plproxy_function_cache_init(void);
+ void *plproxy_func_alloc(ProxyFunction *func, int size);
+ char *plproxy_func_strdup(ProxyFunction *func, const char *s);
+ ProxyFunction *plproxy_compile(FunctionCallInfo fcinfo, bool validate);
+
+ /* execute.c */
+ void plproxy_exec(ProxyFunction *func, FunctionCallInfo fcinfo);
+ void plproxy_clean_results(ProxyCluster *cluster);
+
+ /* scanner.c */
+ int plproxy_yyget_lineno(void);
+ int plproxy_yylex_destroy(void);
+ int plproxy_yylex(void);
+ void plproxy_scanner_sqlmode(bool val);
+ void plproxy_yylex_startup(void);
+
+ /* parser.y */
+ void plproxy_run_parser(ProxyFunction *func, const char *body, int len);
+ void plproxy_yyerror(const char *fmt,...);
+
+ /* type.c */
+ ProxyComposite *plproxy_composite_info(ProxyFunction *func, TupleDesc tupdesc);
+ ProxyType *plproxy_find_type_info(ProxyFunction *func, Oid oid, bool for_send);
+ char *plproxy_send_type(ProxyType *type, Datum val, bool allow_bin, int *len, int *fmt);
+ Datum plproxy_recv_type(ProxyType *type, char *str, int len, bool bin);
+ HeapTuple plproxy_recv_composite(ProxyComposite *meta, char **values, int *lengths, int *fmts);
+ void plproxy_free_type(ProxyType *type);
+ void plproxy_free_composite(ProxyComposite *meta);
+
+ /* cluster.c */
+ void plproxy_cluster_cache_init(void);
+ ProxyCluster *plproxy_find_cluster(ProxyFunction *func, FunctionCallInfo fcinfo);
+ void plproxy_cluster_maint(struct timeval * now);
+
+ /* result.c */
+ Datum plproxy_result(ProxyFunction *func, FunctionCallInfo fcinfo);
+
+ /* query.c */
+ QueryBuffer *plproxy_query_start(ProxyFunction *func, bool add_types);
+ bool plproxy_query_add_const(QueryBuffer *q, const char *data);
+ bool plproxy_query_add_ident(QueryBuffer *q, const char *ident);
+ ProxyQuery *plproxy_query_finish(QueryBuffer *q);
+ ProxyQuery *plproxy_standard_query(ProxyFunction *func, bool add_types);
+ void plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q);
+ void plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q);
+ void plproxy_query_freeplan(ProxyQuery *q);
+
+ #endif
*** a/src/pl/plproxy/poll_compat.c
--- b/src/pl/plproxy/poll_compat.c
***************
*** 0 ****
--- 1,140 ----
+
+ #include "postgres.h"
+
+ #include "poll_compat.h"
+
+ #ifdef PLPROXY_POLL_COMPAT
+
+ /*
+  * Emulate poll() with select()
+  */
+
+ /* NOTE(review): header names of these includes are missing in this copy */
+ #include
+ #include
+
+ /*
+  * dynamic buffer for fd_set to avoid depending on FD_SETSIZE
+  */
+
+ struct fd_buf {
+ 	fd_set *set;		/* bitmap storage, grown by fdbuf_resize() */
+ 	int alloc_bytes;	/* bytes currently allocated for set */
+ };
+
+ /* clear all bits of an already allocated set */
+ static void fdbuf_zero(struct fd_buf *buf)
+ {
+ 	if (buf->set)
+ 		memset(buf->set, 0, buf->alloc_bytes);
+ }
+
+ /*
+  * Make sure buf has room for a bit for fd.  Grows in powers of two
+  * starting at 256 bytes and zeroes the new area.  Returns false if the
+  * allocation fails (buf is then left unchanged).
+  */
+ static bool fdbuf_resize(struct fd_buf *buf, int fd)
+ {
+ 	/* get some extra room for guaranteed alignment */
+ 	int need_bytes = fd/8 + 32;
+ 	/* default - 2048 fds */
+ 	int alloc = 256;
+ 	uint8 *ptr;
+
+ 	if (buf->alloc_bytes < need_bytes)
+ 	{
+ 		while (alloc < need_bytes)
+ 			alloc *= 2;
+
+ 		if (!buf->set)
+ 			ptr = malloc(alloc);
+ 		else
+ 			ptr = realloc(buf->set, alloc);
+
+ 		if (!ptr)
+ 			return false;
+
+ 		/* clean new area */
+ 		memset(ptr + buf->alloc_bytes, 0, alloc - buf->alloc_bytes);
+
+ 		buf->set = (fd_set *)ptr;
+ 		buf->alloc_bytes = alloc;
+ 	}
+ 	return true;
+ }
+
+ /*
+  * poll() replacement on top of select().  Only POLLIN and POLLOUT are
+  * handled.  A negative fd fails with EBADF (POSIX poll() would ignore
+  * it instead) and a timeout below -1 fails with EINVAL.  The fd sets are
+  * static and reused between calls, so this is not reentrant.
+  */
+ int poll(struct pollfd *fds, nfds_t nfds, int timeout_ms)
+ {
+ 	static struct fd_buf readfds = { NULL, 0 };
+ 	static struct fd_buf writefds = { NULL, 0 };
+
+ 	struct pollfd *pf;
+ 	int i, res, fd_max = 0;
+ 	struct timeval *tv = NULL;
+ 	struct timeval tvreal;
+
+ 	/* convert timeout_ms to timeval; -1 leaves tv NULL (wait forever) */
+ 	if (timeout_ms >= 0)
+ 	{
+ 		tvreal.tv_sec = timeout_ms / 1000;
+ 		tvreal.tv_usec = (timeout_ms % 1000) * 1000;
+ 		tv = &tvreal;
+ 	} else if (timeout_ms < -1)
+ 		goto err_inval;
+
+ 	/*
+ 	 * Convert pollfds to fd sets.
+ 	 */
+ 	fdbuf_zero(&readfds);
+ 	fdbuf_zero(&writefds);
+ 	for (i = 0; i < nfds; i++)
+ 	{
+ 		pf = fds + i;
+ 		if (pf->fd < 0)
+ 			goto err_badf;
+
+ 		/* sets must be equal size */
+ 		if (!fdbuf_resize(&readfds, pf->fd))
+ 			goto err_nomem;
+ 		if (!fdbuf_resize(&writefds, pf->fd))
+ 			goto err_nomem;
+
+ 		if (pf->events & POLLIN)
+ 			FD_SET(pf->fd, readfds.set);
+ 		if (pf->events & POLLOUT)
+ 			FD_SET(pf->fd, writefds.set);
+ 		if (pf->fd > fd_max)
+ 			fd_max = pf->fd;
+ 	}
+
+ 	res = select(fd_max + 1, readfds.set, writefds.set, NULL, tv);
+ 	if (res <= 0)
+ 		return res;
+
+ 	/*
+ 	 * select() and poll() count fd-s differently,
+ 	 * need to recount them here.
+ 	 */
+ 	res = 0;
+
+ 	for (i = 0; i < nfds; i++)
+ 	{
+ 		pf = fds + i;
+ 		pf->revents = 0;
+ 		if ((pf->events & POLLIN) && FD_ISSET(pf->fd, readfds.set))
+ 			pf->revents |= POLLIN;
+ 		if ((pf->events & POLLOUT) && FD_ISSET(pf->fd, writefds.set))
+ 			pf->revents |= POLLOUT;
+ 		if (pf->revents)
+ 			res += 1;
+ 	}
+ 	return res;
+
+ err_nomem:
+ 	errno = ENOMEM;
+ 	return -1;
+
+ err_badf:
+ 	errno = EBADF;
+ 	return -1;
+ err_inval:
+ 	errno = EINVAL;
+ 	return -1;
+ }
+
+ #endif /* PLPROXY_POLL_COMPAT */
+
*** a/src/pl/plproxy/poll_compat.h
--- b/src/pl/plproxy/poll_compat.h
***************
*** 0 ****
--- 1,58 ----
+
+ #ifndef POLL_COMPAT_H
+ #define POLL_COMPAT_H
+
+ /* define to test poll() compat function */
+ #if 0
+ #define PLPROXY_POLL_COMPAT
+ #endif
+
+ #include
+
+ /* see if real poll() can be used */
+ #ifndef PLPROXY_POLL_COMPAT
+ #ifdef HAVE_POLL_H
+ #include
+ #else
+ #ifdef HAVE_SYS_POLL_H
+ #include
+ #else
+ #define PLPROXY_POLL_COMPAT
+ #endif
+ #endif
+ #endif
+
+ /*
+  * Emulate poll() with select(), if needed.
+  */
+ #ifdef PLPROXY_POLL_COMPAT
+
+ /* in/out event types */
+ #define POLLIN (1 << 0)
+ #define POLLOUT (1 << 1)
+
+ /* rest are unused in this implementation */
+ #define POLLHUP (1 << 2)
+ #define POLLPRI (1 << 3)
+ #define POLLNVAL (1 << 4)
+ #define POLLERR (1 << 5)
+
+ /* avoid namespace conflicts: the compat code uses its own symbol names */
+ #define pollfd plproxy_compat_pollfd
+ #define poll plproxy_compat_poll
+ #define nfds_t plproxy_compat_nfds_t
+
+ /* same members as the POSIX struct pollfd */
+ struct pollfd {
+ 	int fd;
+ 	short events;
+ 	short revents;
+ };
+
+ typedef unsigned long nfds_t;
+
+ int poll(struct pollfd *fds, nfds_t nfds, int timeout_ms);
+
+ #endif /* PLPROXY_POLL_COMPAT */
+
+ #endif /* POLL_COMPAT_H */
+
*** a/src/pl/plproxy/query.c
--- b/src/pl/plproxy/query.c
***************
*** 0 ****
--- 1,316 ----
+ /*
+  * PL/Proxy - easy access to partitioned database.
+  *
+  * Copyright (c) 2006 Sven Suursoho, Skype Technologies OÜ
+  * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+  *
+  * Permission to use, copy, modify, and distribute this software for any
+  * purpose with or without fee is hereby granted, provided that the above
+  * copyright notice and this permission notice appear in all copies.
+  *
+  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+  */
+
+ /*
+  * SQL statement generation helpers.
+  */
+
+ #include "plproxy.h"
+
+ /*
+  * Temporary info structure for generation.
+  *
+  * Later it will be used to make ProxyQuery.
+  */
+ struct QueryBuffer
+ {
+ 	ProxyFunction *func;	/* function whose arguments may be referenced */
+ 	StringInfo sql;			/* SQL text generated so far */
+ 	int arg_count;			/* number of distinct arguments referenced */
+ 	int *arg_lookup;		/* local $n (0-based) -> function argument index */
+ 	bool add_types;			/* append ::type casts to argument references */
+ };
+
+ /*
+  * Prepare temporary structure for query generation.
+  *
+  * Allocated in the current memory context; plproxy_query_finish()
+  * copies the result into func->ctx and frees this buffer.
+  */
+ QueryBuffer *
+ plproxy_query_start(ProxyFunction *func, bool add_types)
+ {
+ 	QueryBuffer *q = palloc(sizeof(*q));
+
+ 	q->func = func;
+ 	q->sql = makeStringInfo();
+ 	q->arg_count = 0;
+ 	q->add_types = add_types;
+ 	/* each function argument gets at most one slot in arg_lookup */
+ 	q->arg_lookup = palloc(sizeof(int) * func->arg_count);
+ 	return q;
+ }
+
+ /*
+  * Add string fragment to query.  Always returns true.
+  */
+ bool
+ plproxy_query_add_const(QueryBuffer *q, const char *data)
+ {
+ 	appendStringInfoString(q->sql, data);
+ 	return true;
+ }
+
+ /*
+  * Helper for adding a parameter reference to the query.
+  *
+  * Appends "$<sql_idx + 1>" and, when add_type is set, "::<type name>" of
+  * the referenced function argument.  Formats straight into buf: type
+  * names can be long (e.g. "timestamp without time zone" or
+  * schema-qualified names), so a fixed-size temporary buffer overflows.
+  */
+ static void
+ add_ref(StringInfo buf, int sql_idx, ProxyFunction *func, int fn_idx, bool add_type)
+ {
+ 	if (add_type)
+ 		appendStringInfo(buf, "$%d::%s", sql_idx + 1,
+ 						 func->arg_types[fn_idx]->name);
+ 	else
+ 		appendStringInfo(buf, "$%d", sql_idx + 1);
+ }
+
+ /*
+  * Add a SQL identifier to the query that may possibly be
+  * a parameter reference.
+  *
+  * "$n" refers to the n-th (1-based) function argument; any other
+  * identifier is matched case-insensitively against the argument names.
+  * References are renumbered to local $1..$k in order of first use, and
+  * identifiers that match no argument are copied as-is.  Returns false
+  * only for an out-of-range "$n".
+  */
+ bool
+ plproxy_query_add_ident(QueryBuffer *q, const char *ident)
+ {
+ 	int i,
+ 		fn_idx = -1,
+ 		sql_idx = -1;
+
+ 	if (ident[0] == '$')
+ 	{
+ 		/* strtol() saturates on overflow where atoi() is undefined */
+ 		long nr = strtol(ident + 1, NULL, 10);
+
+ 		if (nr < 1 || nr > q->func->arg_count)
+ 			return false;
+ 		fn_idx = (int) nr - 1;
+ 	}
+ 	else
+ 	{
+ 		for (i = 0; i < q->func->arg_count; i++)
+ 		{
+ 			/* unnamed arguments have a NULL name; only $n can reach them */
+ 			if (q->func->arg_names[i] == NULL)
+ 				continue;
+ 			if (strcasecmp(ident, q->func->arg_names[i]) == 0)
+ 			{
+ 				fn_idx = i;
+ 				break;
+ 			}
+ 		}
+ 	}
+ 	if (fn_idx >= 0)
+ 	{
+ 		/* reuse the local number if this argument was referenced before */
+ 		for (i = 0; i < q->arg_count; i++)
+ 		{
+ 			if (q->arg_lookup[i] == fn_idx)
+ 			{
+ 				sql_idx = i;
+ 				break;
+ 			}
+ 		}
+ 		if (sql_idx < 0)
+ 		{
+ 			sql_idx = q->arg_count++;
+ 			q->arg_lookup[sql_idx] = fn_idx;
+ 		}
+ 		add_ref(q->sql, sql_idx, q->func, fn_idx, q->add_types);
+ 	}
+ 	else
+ 		appendStringInfoString(q->sql, ident);
+ 	return true;
+ }
+
+ /*
+  * Create a ProxyQuery based on temporary QueryBuffer.
+  *
+  * Copies the SQL text and argument mapping into func->ctx and frees the
+  * temporary buffer; q must not be used afterwards.
+  */
+ ProxyQuery *
+ plproxy_query_finish(QueryBuffer *q)
+ {
+ 	ProxyQuery *pq;
+ 	MemoryContext old;
+ 	int len;
+
+ 	old = MemoryContextSwitchTo(q->func->ctx);
+
+ 	pq = palloc(sizeof(*pq));
+ 	pq->sql = pstrdup(q->sql->data);
+ 	pq->arg_count = q->arg_count;
+ 	len = q->arg_count * sizeof(int);
+ 	pq->arg_lookup = palloc(len);
+ 	pq->plan = NULL;
+ 	memcpy(pq->arg_lookup, q->arg_lookup, len);
+
+ 	MemoryContextSwitchTo(old);
+
+ 	/* release the temporary buffer; not strictly needed, but let's be tidy */
+ 	pfree(q->sql->data);
+ 	pfree(q->sql);
+ 	pfree(q->arg_lookup);
+ 	memset(q, 0, sizeof(*q));
+ 	pfree(q);
+
+ 	return pq;
+ }
+
+ /*
+  * Generate a function call based on own signature.
+  *
+  * Builds "select <cols> from <func>($1, ..., $n)", passing all cached
+  * (IN/INOUT) arguments in order.  Composite results select each column
+  * with an explicit cast, scalar results are selected as "r::<type>" from
+  * alias r, and untyped RECORD functions get an "as (...)" column list.
+  * The result is allocated with plproxy_func_alloc()/plproxy_func_strdup().
+  */
+ ProxyQuery *
+ plproxy_standard_query(ProxyFunction *func, bool add_types)
+ {
+ 	StringInfoData sql;
+ 	ProxyQuery *pq;
+ 	int i,
+ 		len;
+
+ 	pq = plproxy_func_alloc(func, sizeof(*pq));
+ 	pq->sql = NULL;
+ 	pq->plan = NULL;
+ 	pq->arg_count = func->arg_count;
+ 	len = pq->arg_count * sizeof(int);
+ 	pq->arg_lookup = plproxy_func_alloc(func, len);
+
+ 	initStringInfo(&sql);
+ 	appendStringInfo(&sql, "select ");
+
+ 	/* try to fill in all result column names */
+ 	if (func->ret_composite)
+ 	{
+ 		ProxyComposite *t = func->ret_composite;
+ 		for (i = 0; i < t->tupdesc->natts; i++)
+ 		{
+ 			appendStringInfo(&sql, "%s%s::%s",
+ 							 ((i > 0) ? ", " : ""),
+ 							 t->name_list[i],
+ 							 t->type_list[i]->name);
+ 		}
+ 	}
+ 	else
+ 		/* names not available, do a simple query */
+ 		appendStringInfo(&sql, "r::%s", func->ret_scalar->name);
+
+ 	/* function call */
+ 	appendStringInfo(&sql, " from %s(", func->name);
+
+ 	/* fill in function arguments; local $n maps 1:1 to argument n */
+ 	for (i = 0; i < func->arg_count; i++)
+ 	{
+ 		if (i > 0)
+ 			appendStringInfoChar(&sql, ',');
+
+ 		add_ref(&sql, i, func, i, add_types);
+ 		pq->arg_lookup[i] = i;
+ 	}
+ 	appendStringInfoChar(&sql, ')');
+
+ 	/*
+ 	 * Untyped RECORD needs types specified in AS (..) clause.
+ 	 */
+ 	if (func->dynamic_record)
+ 	{
+ 		ProxyComposite *t = func->ret_composite;
+ 		appendStringInfo(&sql, " as (");
+ 		for (i = 0; i < t->tupdesc->natts; i++)
+ 		{
+ 			appendStringInfo(&sql, "%s%s %s",
+ 							 ((i > 0) ? ", " : ""),
+ 							 t->name_list[i],
+ 							 t->type_list[i]->name);
+ 		}
+ 		appendStringInfoChar(&sql, ')');
+ 	}
+
+ 	/* alias referenced by the "r::<type>" column of scalar results */
+ 	if (func->ret_scalar)
+ 		appendStringInfo(&sql, " r");
+
+ 	pq->sql = plproxy_func_strdup(func, sql.data);
+ 	pfree(sql.data);
+
+ 	return pq;
+ }
+
+ /*
+  * Prepare ProxyQuery for local execution
+  *
+  * Builds and saves an SPI plan for q->sql, with parameter types taken
+  * from the referenced function arguments.  Must run while connected to
+  * SPI.  Raises an error through plproxy_error() if SPI fails.
+  */
+ void
+ plproxy_query_prepare(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q)
+ {
+ 	int i;
+ 	Oid types[FUNC_MAX_ARGS];
+ 	void *plan;
+
+ 	/* collect parameter types in local $1..$n order */
+ 	for (i = 0; i < q->arg_count; i++)
+ 	{
+ 		int idx = q->arg_lookup[i];
+
+ 		types[i] = func->arg_types[idx]->type_oid;
+ 	}
+
+ 	/* prepare & store plan; both calls return NULL and set SPI_result on failure */
+ 	plan = SPI_prepare(q->sql, q->arg_count, types);
+ 	if (plan == NULL)
+ 		plproxy_error(func, "SPI_prepare failed for '%s': %s",
+ 					  q->sql, SPI_result_code_string(SPI_result));
+ 	q->plan = SPI_saveplan(plan);
+ 	if (q->plan == NULL)
+ 		plproxy_error(func, "SPI_saveplan failed for '%s': %s",
+ 					  q->sql, SPI_result_code_string(SPI_result));
+ }
+
+ /*
+  * Execute ProxyQuery locally.
+  *
+  * Binds the referenced function arguments (NULLs included) and runs the
+  * saved plan read-only; any outcome other than SPI_OK_SELECT is an error.
+  * Result will be in SPI_tuptable.
+  */
+ void
+ plproxy_query_exec(ProxyFunction *func, FunctionCallInfo fcinfo, ProxyQuery *q)
+ {
+ 	int i,
+ 		idx,
+ 		err;
+ 	char arg_nulls[FUNC_MAX_ARGS];
+ 	Datum arg_values[FUNC_MAX_ARGS];
+
+ 	/* fill args; SPI marks NULLs with 'n' and non-NULLs with ' ' */
+ 	for (i = 0; i < q->arg_count; i++)
+ 	{
+ 		idx = q->arg_lookup[i];
+ 		if (PG_ARGISNULL(idx))
+ 		{
+ 			arg_nulls[i] = 'n';
+ 			arg_values[i] = (Datum) 0;
+ 		}
+ 		else
+ 		{
+ 			arg_nulls[i] = ' ';
+ 			arg_values[i] = PG_GETARG_DATUM(idx);
+ 		}
+ 	}
+
+ 	/* run query: read_only = true, no row limit (0) */
+ 	err = SPI_execute_plan(q->plan, arg_values, arg_nulls, true, 0);
+ 	if (err != SPI_OK_SELECT)
+ 		plproxy_error(func, "query '%s' failed: %s",
+ 					  q->sql, SPI_result_code_string(err));
+ }
+
+ /*
+  * Free cached plan.
+  */
+ void
+ plproxy_query_freeplan(ProxyQuery *q)
+ {
+ 	/* nothing to release without a query or a prepared plan */
+ 	if (q != NULL && q->plan != NULL)
+ 	{
+ 		SPI_freeplan(q->plan);
+ 		/* forget the pointer so a repeated call is a no-op */
+ 		q->plan = NULL;
+ 	}
+ }
*** a/src/pl/plproxy/result.c
--- b/src/pl/plproxy/result.c
***************
*** 0 ****
--- 1,222 ----
+ /*
+  * PL/Proxy - easy access to partitioned database.
+  *
+  * Copyright (c) 2006 Sven Suursoho, Skype Technologies OÜ
+  * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ
+  *
+  * Permission to use, copy, modify, and distribute this software for any
+  * purpose with or without fee is hereby granted, provided that the above
+  * copyright notice and this permission notice appear in all copies.
+  *
+  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+  */
+ 
+ /*
+  * Conversion from PGresult to Datum.
+  *
+  * Functions here are called with CurrentMemoryContext == query context
+  * so that palloc()-ed memory stays valid after return to postgres.
+ */ + + #include "plproxy.h" + + static bool + name_matches(ProxyFunction *func, const char *aname, PGresult *res, int col) + { + const char *fname = PQfname(res, col); + + if (fname == NULL) + plproxy_error(func, "Unnamed result column %d", col + 1); + if (strcmp(aname, fname) == 0) + return true; + return false; + } + + /* fill func->result_map */ + static void + map_results(ProxyFunction *func, PGresult *res) + { + int i, + j, + natts, + nfields = PQnfields(res); + Form_pg_attribute a; + const char *aname; + + if (func->ret_scalar) + { + if (nfields != 1) + plproxy_error(func, + "single field function but got record"); + return; + } + + natts = func->ret_composite->tupdesc->natts; + if (nfields < natts) + plproxy_error(func, "Got too few fields from remote end"); + if (nfields > natts) + plproxy_error(func, "Got too many fields from remote end"); + + for (i = 0; i < natts; i++) + { + /* ->name_list has quoted names, take unquoted from ->tupdesc */ + a = func->ret_composite->tupdesc->attrs[i]; + aname = NameStr(a->attname); + + func->result_map[i] = -1; + if (name_matches(func, aname, res, i)) + /* fast case: 1:1 mapping */ + func->result_map[i] = i; + else + { + /* slow case: messed up ordering */ + for (j = 0; j < nfields; j++) + { + /* already tried this one */ + if (j == i) + continue; + + /* + * fixme: somehow remember the ones that are already mapped? + */ + if (name_matches(func, aname, res, j)) + { + func->result_map[i] = j; + break; + } + } + } + if (func->result_map[i] < 0) + plproxy_error(func, + "Field %s does not exists in result", aname); + + /* oid sanity check. does not seem to work. 
*/ + if (0) + { + Oid arg_oid = func->ret_composite->type_list[i]->type_oid; + Oid col_oid = PQftype(res, func->result_map[i]); + + if (arg_oid < 2000 || col_oid < 2000) + { + if (arg_oid != col_oid) + elog(WARNING, "oids do not match:%d/%d", + arg_oid, col_oid); + } + } + } + } + + /* Return connection where are unreturned rows */ + static ProxyConnection * + walk_results(ProxyFunction *func, ProxyCluster *cluster) + { + ProxyConnection *conn; + + for (; cluster->ret_cur_conn < cluster->conn_count; + cluster->ret_cur_conn++) + { + conn = cluster->conn_list + cluster->ret_cur_conn; + if (conn->res == NULL) + continue; + if (conn->pos == PQntuples(conn->res)) + continue; + + /* first time on this connection? */ + if (conn->pos == 0) + map_results(func, conn->res); + + return conn; + } + + plproxy_error(func, "bug: no result"); + return NULL; + } + + /* Return a tuple */ + static Datum + return_composite(ProxyFunction *func, ProxyConnection *conn, FunctionCallInfo fcinfo) + { + int i, + col; + char *values[FUNC_MAX_ARGS]; + int fmts[FUNC_MAX_ARGS]; + int lengths[FUNC_MAX_ARGS]; + HeapTuple tup; + ProxyComposite *meta = func->ret_composite; + + for (i = 0; i < meta->tupdesc->natts; i++) + { + col = func->result_map[i]; + if (PQgetisnull(conn->res, conn->pos, col)) + { + values[i] = NULL; + lengths[i] = 0; + fmts[i] = 0; + } + else + { + values[i] = PQgetvalue(conn->res, conn->pos, col); + lengths[i] = PQgetlength(conn->res, conn->pos, col); + fmts[i] = PQfformat(conn->res, col); + } + } + tup = plproxy_recv_composite(meta, values, lengths, fmts); + return HeapTupleGetDatum(tup); + } + + /* Return scalar value */ + static Datum + return_scalar(ProxyFunction *func, ProxyConnection *conn, FunctionCallInfo fcinfo) + { + Datum dat; + char *val; + PGresult *res = conn->res; + int row = conn->pos; + + if (func->ret_scalar->type_oid == VOIDOID) + { + dat = (Datum) NULL; + } + else if (PQgetisnull(res, row, 0)) + { + fcinfo->isnull = true; + dat = (Datum) NULL; + } + else + { 
+ val = PQgetvalue(res, row, 0); + if (val == NULL) + plproxy_error(func, "unexcpected NULL"); + dat = plproxy_recv_type(func->ret_scalar, val, + PQgetlength(res, row, 0), + PQfformat(res, 0)); + } + return dat; + } + + /* Return next result Datum */ + Datum + plproxy_result(ProxyFunction *func, FunctionCallInfo fcinfo) + { + Datum dat; + ProxyCluster *cluster = func->cur_cluster; + ProxyConnection *conn; + + conn = walk_results(func, cluster); + + if (func->ret_composite) + dat = return_composite(func, conn, fcinfo); + else + dat = return_scalar(func, conn, fcinfo); + + cluster->ret_total--; + conn->pos++; + + return dat; + } *** a/src/pl/plproxy/rowstamp.h --- b/src/pl/plproxy/rowstamp.h *************** *** 0 **** --- 1,27 ---- + + #ifndef __ROWSTAMP_H__ + #define __ROWSTAMP_H__ + + /* + * Stamp structure to detect row changes. + */ + + typedef struct RowStamp { + TransactionId xmin; + ItemPointerData tid; + } RowStamp; + + static inline void plproxy_set_stamp(RowStamp *stamp, HeapTuple tup) + { + stamp->xmin = HeapTupleHeaderGetXmin(tup->t_data); + stamp->tid = tup->t_self; + } + + static inline bool plproxy_check_stamp(RowStamp *stamp, HeapTuple tup) + { + return stamp->xmin == HeapTupleHeaderGetXmin(tup->t_data) + && ItemPointerEquals(&stamp->tid, &tup->t_self); + } + + #endif + *** a/src/pl/plproxy/scanner.l --- b/src/pl/plproxy/scanner.l *************** *** 0 **** --- 1,320 ---- + %{ + + /* + * PL/Proxy - easy access to partitioned database. + * + * Copyright (c) 2006 Sven Suursoho, Skype Technologies OÜ + * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + + #include "plproxy.h" + #include "parser.tab.h" + + /* for standard_conforming_strings + * NOTE(review): the header name is missing from the #include below in + * this copy, and the flex start-condition prefixes (sql, plcom, ...) + * appear to be missing from the rules section; angle-bracketed text + * looks stripped. Restore both before building. + */ + #include + + /* declare flex-generated functions to silence missing-prototype warnings */ + int yyget_lineno(void); + int yyget_leng(void); + FILE *yyget_in(void); + FILE *yyget_out(void); + char *yyget_text(void); + void plproxy_yyset_lineno(int); + void plproxy_yyset_in(FILE *); + void plproxy_yyset_out(FILE *); + int plproxy_yyget_debug(void); + void plproxy_yyset_debug(int); + int plproxy_yylex_destroy(void); + + /* point to parser value */ + #define yylval plproxy_yylval + + /* + * Allocate in CurrentMemoryContext. + * + * If we want to support flex 2.5.4, we cannot use + * options noyyalloc, noyyrealloc, noyyfree. + * + * Thus malloc() et al. are redefined below to use palloc() and friends. + */ + + #define malloc palloc + #define realloc repalloc + #define free(p) do { if (p) pfree(p); } while (0) + + + /* + * Calculate numeric flex version.
+ */ + #if !defined(YY_FLEX_MAJOR_VERSION) || !defined(YY_FLEX_MINOR_VERSION) + #error Flex required + #endif + #ifndef YY_FLEX_SUBMINOR_VERSION + #define YY_FLEX_SUBMINOR_VERSION 0 + #endif + #define FLXVER ((YY_FLEX_MAJOR_VERSION*1000 + YY_FLEX_MINOR_VERSION)*1000 + YY_FLEX_SUBMINOR_VERSION) + + void plproxy_yylex_startup(void) + { + /* there may be stale pointers around; forget them (without freeing) before resetting the scanner */ + #if FLXVER < 2005031 + (YY_CURRENT_BUFFER) = NULL; + #else + (yy_buffer_stack) = NULL; + #endif + plproxy_yylex_destroy(); + } + + /* + * compat stuff for flex older than 2.5.31: provide yylex_destroy() + * and yyget_lineno() + */ + #if FLXVER < 2005031 + + /* old flex */ + + int plproxy_yylex_destroy(void) + { + plproxy_yy_delete_buffer(YY_CURRENT_BUFFER); + YY_CURRENT_BUFFER = NULL; + yy_start = 0; + yy_init = 1; + yylineno = 1; + return 0; + } + + int plproxy_yyget_lineno(void) + { + return yylineno; + } + + #endif + + /* own error handling */ + #define YY_FATAL_ERROR(msg) plproxy_yyerror(msg) + + /* disable stdio related code: YY_INPUT never supplies more input */ + #define YY_INPUT(buf, res, maxlen) { res = 0; } + + /* shortcut for returning the matched text as an SQLPART token */ + #define RETPART do { yylval.str = yytext; return SQLPART; } while (0) + + /* dollar quoting helpers */ + static void dlr_start(const char *txt); + static bool dlr_stop(const char *txt); + + static const char *unquote(const char *qstr, bool std); + + %} + + %option 8bit case-insensitive + %option warn nodefault yylineno + %option nounput noyywrap never-interactive + %option prefix="plproxy_yy" + + /* states */ + %x sql + %x qident + %x stdq + %x extq + %x longcom + %x dolq + %x plcom + + /* whitespace */ + SPACE [ \t\n\r] + + /* sql ident. include dotted parts also */ + WORD [a-z][a-z0-9_]* + IDENT {WORD}({SPACE}*[.]{SPACE}*{WORD})* + + /* positional argument reference: $1 */ + NUMIDENT [$][0-9]+ + + /* regular int value for hash spec */ + PLNUMBER [0-9]+ + + /* SQL numeric value */ + SQLNUM [0-9][.0-9]* + + /* + * Symbols that may exist in sql. They must be matched one-by-one, + * to avoid conflicts with combos.
+ * + * Excludes: [$'";`] + */ + SQLSYM [-!#%&()*+,/:<=>?@\[\]^{|}~] + + /* Dollar quote ID */ + DOLQ_START [a-z\200-\377_] + DOLQ_CONT [a-z\200-\377_0-9] + DOLQ ({DOLQ_START}{DOLQ_CONT}*) + + %% + + /* PL/Proxy language keywords */ + + cluster { return CLUSTER; } + connect { return CONNECT; } + run { return RUN; } + on { return ON; } + all { return ALL; } + any { return ANY; } + select { BEGIN(sql); yylval.str = yytext; return SELECT; } + + /* function call */ + + {IDENT}{SPACE}*[(] { BEGIN(sql); yylval.str = yytext; return FNCALL; } + + /* PL/Proxy language comments/whitespace */ + + {SPACE}+ { } + [-][-][^\n]* { } + [/][*] { BEGIN(plcom); } + [^*/]+ { } + [*]+[^*/]+ { } + [*]+[/] { BEGIN(INITIAL); } + . { } + + /* PL/Proxy non-keyword elements */ + + {IDENT} { yylval.str = yytext; return IDENT; } + {NUMIDENT} { yylval.str = yytext; return IDENT; } + {PLNUMBER} { yylval.str = yytext; return NUMBER; } + [']([^']+|[']['])*['] { yylval.str = unquote(yytext, true); return STRING; } + + /* unparsed symbol, let parser decide */ + + . { return *(yytext); } + + /* + * Following are the rules for SQL statements. + */ + + /* SQL line comment */ + + [-][-][^\n]* { /* \n will be parsed as whitespace */ } + + /* C comment, parse it as whitespace */ + + [/][*] { BEGIN(longcom); } + [^*/]+ { } + [*]+[^*/]+ { } + [*]+[/] { BEGIN(sql); yylval.str = " "; return SQLPART; } + . { } + + /* Dollar quoted string */ + + [$]{DOLQ}?[$] { BEGIN(dolq); dlr_start(yytext); RETPART; } + [^$]+ { RETPART; } + [$]{DOLQ}?[$] { if (dlr_stop(yytext)) { BEGIN(sql); RETPART; } + /* if wrong one, report only 1 char */ + else { yyless(1); RETPART; } } + [$][^$]* { RETPART; } + + /* quoted identifier */ + + ["] { BEGIN(qident); RETPART; } + [^"]+ { RETPART; } + [\\].
{ RETPART; } + ["] { BEGIN(sql); RETPART; } + + /* quoted string start */ + + E['] { BEGIN(extq); RETPART; } + ['] { if (standard_conforming_strings) + BEGIN(stdq); else BEGIN(extq); + RETPART; } + + /* SQL standard quoted string body */ + + [^']+ { RETPART; } + [']['] { RETPART; } + ['] { BEGIN(sql); RETPART; } + + /* extended quoted string body */ + + [^'\\]+ { RETPART; } + [']['] { RETPART; } + [\\]. { RETPART; } + ['] { BEGIN(sql); RETPART; } + . { RETPART; } + + /* SQL identifier */ + + {IDENT} { yylval.str = yytext; return SQLIDENT; } + + /* $x argument reference */ + + {NUMIDENT} { yylval.str = yytext; return SQLIDENT; } + + /* SQL number */ + + {SQLNUM} { RETPART; } + + /* SQL symbol, parse them one-by-one */ + + {SQLSYM} { RETPART; } + + /* compress whitespace to single ' ' */ + + {SPACE}+ { yylval.str = " "; return SQLPART; } + + /* SQL statement end */ + + [;] { BEGIN(INITIAL); return *(yytext); } + + /* unparsed symbol, let the parser error out */ + + . { return *(yytext); } + + %% + + static char *dlr_token = NULL; + + /* remember the opening dollar-quote tag */ + static void dlr_start(const char *txt) + { + dlr_token = pstrdup(txt); + } + + /* check if txt matches the stored tag; on a match, forget the stored tag */ + static bool dlr_stop(const char *txt) + { + bool res = strcmp(txt, dlr_token) == 0; + if (res) { + pfree(dlr_token); + dlr_token = NULL; + } + return res; + } + + /* + * Strip the surrounding single quotes from a PL/Proxy string literal + * and collapse doubled quotes ('') into one. + * NOTE(review): the std argument is currently unused. + */ + static const char *unquote(const char *qstr, bool std) + { + const char *p; + StringInfoData buf; + + initStringInfo(&buf); + for (p = qstr + 1; *p; p++) { + if (*p == '\'') { + if (*++p == 0) + break; + appendStringInfoChar(&buf, *p); + } else + appendStringInfoChar(&buf, *p); + } + /* buf.data is handed to the caller; it is not freed here */ + return buf.data; + } + *** a/src/pl/plproxy/sql/plproxy_clustermap.sql --- b/src/pl/plproxy/sql/plproxy_clustermap.sql *************** *** 0 **** --- 1,56 ---- + create or replace function plproxy.get_cluster_version(cluster_name text) + returns integer as $$ + begin + if cluster_name = 'testcluster' then + return 6; + elsif
cluster_name = 'map0' then + return 1; + elsif cluster_name = 'map1' then + return 1; + elsif cluster_name = 'map2' then + return 1; + elsif cluster_name = 'map3' then + return 1; + end if; + raise exception 'no such cluster: %', cluster_name; + end; $$ language plpgsql; + + create or replace function plproxy.get_cluster_partitions(cluster_name text) + returns setof text as $$ + begin + if cluster_name = 'testcluster' then + return next 'host=127.0.0.1 dbname=test_part0'; + return next 'host=127.0.0.1 dbname=test_part1'; + return next 'host=127.0.0.1 dbname=test_part2'; + return next 'host=127.0.0.1 dbname=test_part3'; + elsif cluster_name = 'map0' then + return next 'host=127.0.0.1 dbname=test_part0'; + elsif cluster_name = 'map1' then + return next 'host=127.0.0.1 dbname=test_part1'; + elsif cluster_name = 'map2' then + return next 'host=127.0.0.1 dbname=test_part2'; + elsif cluster_name = 'map3' then + return next 'host=127.0.0.1 dbname=test_part3'; + else + raise exception 'no such cluster: %', cluster_name; + end if; + return; + end; $$ language plpgsql; + + create function map_cluster(part integer) returns text as $$ + begin + return 'map' || part; + end; + $$ language plpgsql; + + create function test_clustermap(part integer) returns setof text as $$ + cluster map_cluster(part); + run on 0; + select current_database(); + $$ language plproxy; + + select * from test_clustermap(0); + select * from test_clustermap(1); + select * from test_clustermap(2); + select * from test_clustermap(3); + *** a/src/pl/plproxy/sql/plproxy_dynamic_record.sql --- b/src/pl/plproxy/sql/plproxy_dynamic_record.sql *************** *** 0 **** --- 1,43 ---- + -- dynamic query support testing + create or replace function dynamic_query(q text) + returns setof record as $x$ + cluster 'map0'; + run on all; + $x$ language plproxy; + + \c test_part0 + create or replace function dynamic_query(q text) + returns setof record as $x$ + declare + ret record; + begin + for ret in execute q loop +
return next ret; + end loop; + return; + end; + $x$ language plpgsql; + create table dynamic_query_test ( + id integer, + username text, + other text + ); + insert into dynamic_query_test values ( 1, 'user1', 'blah'); + insert into dynamic_query_test values ( 2, 'user2', 'foo'); + + \c regression + select * from dynamic_query('select * from dynamic_query_test') as (id integer, username text, other text); + select * from dynamic_query('select id, username from dynamic_query_test') as foo(id integer, username text); + + + -- test errors + select * from dynamic_query('select * from dynamic_query_test'); + + create or replace function dynamic_query_select() + returns setof record as $x$ + cluster 'map0'; + run on all; + select id, username from dynamic_query_test; + $x$ language plproxy; + select * from dynamic_query_select() as (id integer, username text); + *** a/src/pl/plproxy/sql/plproxy_errors.sql --- b/src/pl/plproxy/sql/plproxy_errors.sql *************** *** 0 **** --- 1,63 ---- + + -- test bad arg + create function test_err1(dat text) + returns text as $$ + cluster 'testcluster'; + run on hashtext(username); + $$ language plproxy; + select * from test_err1('dat'); + + create function test_err2(dat text) + returns text as $$ + cluster 'testcluster'; + run on hashtext($2); + $$ language plproxy; + select * from test_err2('dat'); + + create function test_err3(dat text) + returns text as $$ + cluster 'nonexists'; + run on hashtext($1); + $$ language plproxy; + select * from test_err3('dat'); + + -- should work + create function test_err_none(dat text) + returns text as $$ + cluster 'testcluster'; + run on hashtext($1); + select 'ok'; + $$ language plproxy; + select * from test_err_none('dat'); + + --- result map errors + create function test_map_err1(dat text) + returns text as $$ cluster 'testcluster'; run on 0; + select dat as "foo", 'asd' as "bar"; + $$ language plproxy; + select * from test_map_err1('dat'); + + create function test_map_err2(dat text, out res1
text, out res2 text) + returns record as $$ cluster 'testcluster'; run on 0; + select dat as res1; + $$ language plproxy; + select * from test_map_err2('dat'); + + create function test_map_err3(dat text, out res1 text, out res2 text) + returns record as $$ cluster 'testcluster'; run on 0; + select dat as res1, 'foo' as res_none; + $$ language plproxy; + select * from test_map_err3('dat'); + + create function test_map_err4(dat text, out res1 text, out res2 text) + returns record as $$ + --cluster 'testcluster'; + run on hashtext(dat); + select dat as res2, 'foo' as res1; + $$ language plproxy; + select * from test_map_err4('dat'); + + + + + *** a/src/pl/plproxy/sql/plproxy_init.sql --- b/src/pl/plproxy/sql/plproxy_init.sql *************** *** 0 **** --- 1,57 ---- + + + CREATE LANGUAGE plproxy; + + \set ECHO none + + -- create cluster info functions + create schema plproxy; + create or replace function plproxy.get_cluster_version(cluster_name text) + returns integer as $$ + begin + if cluster_name = 'testcluster' then + return 5; + end if; + raise exception 'no such cluster: %', cluster_name; + end; $$ language plpgsql; + + create or replace function + plproxy.get_cluster_partitions(cluster_name text) + returns setof text as $$ + begin + if cluster_name = 'testcluster' then + return next 'host=127.0.0.1 dbname=test_part'; + return; + end if; + raise exception 'no such cluster: %', cluster_name; + end; $$ language plpgsql; + + ------------------------------------------------- + -- intialize part + ------------------------------------------------- + drop database if exists test_part; + create database test_part; + \c test_part + create language plpgsql; + + drop database if exists test_part0; + create database test_part0; + \c test_part0 + create language plpgsql; + + drop database if exists test_part1; + create database test_part1; + \c test_part1 + create language plpgsql; + + drop database if exists test_part2; + create database test_part2; + \c test_part2 + create
language plpgsql; + + drop database if exists test_part3; + create database test_part3; + \c test_part3 + create language plpgsql; + + *** a/src/pl/plproxy/sql/plproxy_many.sql --- b/src/pl/plproxy/sql/plproxy_many.sql *************** *** 0 **** --- 1,66 ---- + create or replace function plproxy.get_cluster_version(cluster_name text) + returns integer as $$ + begin + if cluster_name = 'testcluster' then + return 6; + end if; + raise exception 'no such cluster: %', cluster_name; + end; $$ language plpgsql; + + create or replace function plproxy.get_cluster_partitions(cluster_name text) + returns setof text as $$ + begin + if cluster_name = 'testcluster' then + return next 'host=127.0.0.1 dbname=test_part0'; + return next 'host=127.0.0.1 dbname=test_part1'; + return next 'host=127.0.0.1 dbname=test_part2'; + return next 'host=127.0.0.1 dbname=test_part3'; + return; + end if; + raise exception 'no such cluster: %', cluster_name; + end; $$ language plpgsql; + + \c test_part0 + create function test_multi(part integer, username text) + returns integer as $$ begin return 0; end; $$ language plpgsql; + \c test_part1 + create function test_multi(part integer, username text) + returns integer as $$ begin return 1; end; $$ language plpgsql; + \c test_part2 + create function test_multi(part integer, username text) + returns integer as $$ begin return 2; end; $$ language plpgsql; + \c test_part3 + create function test_multi(part integer, username text) + returns integer as $$ begin return 3; end; $$ language plpgsql; + + \c regression + create function test_multi(part integer, username text) + returns integer as $$ cluster 'testcluster'; run on int4(part); $$ language plproxy; + select test_multi(0, 'foo'); + select test_multi(1, 'foo'); + select test_multi(2, 'foo'); + select test_multi(3, 'foo'); + + -- test RUN ON ALL + drop function test_multi(integer, text); + create function test_multi(part integer, username text) + returns setof integer as $$ cluster 'testcluster'; run
on all; $$ language plproxy; + select test_multi(0, 'foo'); + + -- test RUN ON 2 + drop function test_multi(integer, text); + create function test_multi(part integer, username text) + returns setof integer as $$ cluster 'testcluster'; run on 2; $$ language plproxy; + select test_multi(0, 'foo'); + + -- test RUN ON RANDOM + select setseed(0); + drop function test_multi(integer, text); + create function test_multi(part integer, username text) + returns setof integer as $$ cluster 'testcluster'; run on any; $$ language plproxy; + select test_multi(0, 'foo'); + select test_multi(0, 'foo'); + select test_multi(0, 'foo'); + select test_multi(0, 'foo'); + + *** a/src/pl/plproxy/sql/plproxy_select.sql --- b/src/pl/plproxy/sql/plproxy_select.sql *************** *** 0 **** --- 1,37 ---- + + -- test regular sql + create function test_select(xuser text, tmp boolean) + returns integer as $x$ + cluster 'testcluster'; + run on hashtext(xuser); + select /********* + junk ; + ********** ****/ id from sel_test where username = xuser + and ';' <> 'as;d''a ; sd' + and $tmp$ ; 'a' $tmp$ <> 'as;d''a ; sd' + and $tmp$ $ $$ $foo$tmp$ <> 'x'; + $x$ language plproxy; + + \c test_part + create table sel_test ( + id integer, + username text + ); + insert into sel_test values ( 1, 'user'); + + \c regression + select * from test_select('user', true); + select * from test_select('xuser', false); + + + -- test errors + create function test_select_err(xuser text, tmp boolean) + returns integer as $$ + cluster 'testcluster'; + run on hashtext(xuser); + select id from sel_test where username = xuser; + select id from sel_test where username = xuser; + $$ language plproxy; + + select * from test_select_err('user', true); + *** a/src/pl/plproxy/sql/plproxy_test.sql --- b/src/pl/plproxy/sql/plproxy_test.sql *************** *** 0 **** --- 1,200 ---- + + -- test normal function + create function testfunc(username text, id integer, data text) + returns text as $$ cluster 'testcluster'; run on
hashtext(username); $$ language plproxy; + \c test_part + create function testfunc(username text, id integer, data text) + returns text as $$ begin return 'username=' || username; end; $$ language plpgsql; + \c regression + select * from testfunc('user', 1, 'foo'); + select * from testfunc('user', 1, 'foo'); + select * from testfunc('user', 1, 'foo'); + + + -- test setof text + create function test_set(username text, num integer) + returns setof text as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy; + \c test_part + create function test_set(username text, num integer) + returns setof text as $$ + declare i integer; + begin + i := 0; + while i < num loop + return next 'username=' || username || ' row=' || i; + i := i + 1; + end loop; + return; + end; $$ language plpgsql; + \c regression + select * from test_set('user', 1); + select * from test_set('user', 0); + select * from test_set('user', 3); + + -- test record + create type ret_test_rec as ( id integer, dat text); + create function test_record(username text, num integer) + returns ret_test_rec as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy; + \c test_part + create type ret_test_rec as ( id integer, dat text); + create function test_record(username text, num integer) + returns ret_test_rec as $$ + declare ret ret_test_rec%rowtype; + begin + ret := (num, username); + return ret; + end; $$ language plpgsql; + \c regression + select * from test_record('user', 3); + + -- test setof record + create function test_record_set(username text, num integer) + returns setof ret_test_rec as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy; + \c test_part + create function test_record_set(username text, num integer) + returns setof ret_test_rec as $$ + declare ret ret_test_rec%rowtype; i integer; + begin + i := 0; + while i < num loop + ret := (i, username); + i := i + 1; + return next ret; + end loop; + return; + end; $$ language plpgsql; + \c
regression + select * from test_record_set('user', 1); + select * from test_record_set('user', 0); + select * from test_record_set('user', 3); + + + -- test void + create function test_void(username text, num integer) + returns void as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy; + \c test_part + create function test_void(username text, num integer) + returns void as $$ + begin + return; + end; $$ language plpgsql; + -- look what void actually looks + select * from test_void('void', 2); + select test_void('void', 2); + \c regression + select * from test_void('user', 1); + select * from test_void('user', 3); + select test_void('user', 3); + select test_void('user', 3); + + + -- test normal outargs + create function test_out1(username text, id integer, out data text) + as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy; + \c test_part + create function test_out1(username text, id integer, out data text) + returns text as $$ begin data := 'username=' || username; return; end; $$ language plpgsql; + \c regression + select * from test_out1('user', 1); + + -- test complicated outargs + create function test_out2(username text, id integer, out out_id integer, xdata text, inout xdata2 text, out odata text) + as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy; + \c test_part + create function test_out2(username text, id integer, out out_id integer, xdata text, inout xdata2 text, out odata text) + as $$ begin + out_id = id; + xdata2 := xdata2 || xdata; + odata := 'username=' || username; + return; + end; $$ language plpgsql; + \c regression + select * from test_out2('user', 1, 'xdata', 'xdata2'); + + -- test various types + create function test_types(username text, inout vbool boolean, inout xdate timestamp, inout bin bytea) + as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy; + \c test_part + create function test_types(username text, inout vbool boolean, inout xdate
timestamp, inout bin bytea) + as $$ begin return; end; $$ language plpgsql; + \c regression + select * from test_types('types', true, '2009-11-04 12:12:02', E'a\\000\\001\\002b'); + select * from test_types('types', NULL, NULL, NULL); + + + -- test user defined types + create domain posint as int4 check (value > 0); + create type struct as (id int4, data text); + + create function test_types2(username text, inout v_posint posint, inout v_struct struct, inout arr int8[]) + as $$ cluster 'testcluster'; run on 0; $$ language plproxy; + + \c test_part + create domain posint as int4 check (value > 0); + create type struct as (id int4, data text); + create function test_types2(username text, inout v_posint posint, inout v_struct struct, inout arr int8[]) + as $$ begin return; end; $$ language plpgsql; + \c regression + select * from test_types2('types', 4, (2, 'asd'), array[1,2,3]); + select * from test_types2('types', NULL, NULL, NULL); + + -- test CONNECT + create function test_connect1() returns text + as $$ connect 'dbname=test_part'; select current_database(); $$ language plproxy; + select * from test_connect1(); + + + -- test quoting function + create type "RetWeird" as ( + "ColId" int4, + "ColData" text + ); + + create function "testQuoting"(username text, id integer, data text) + returns "RetWeird" as $$ cluster 'testcluster'; run on hashtext(username); $$ language plproxy; + \c test_part + create type "RetWeird" as ( + "ColId" int4, + "ColData" text + ); + create function "testQuoting"(username text, id integer, data text) + returns "RetWeird" as $$ select 1::int4, 'BazOoka'::text $$ language sql; + \c regression + select * from "testQuoting"('user', '1', 'dat'); + + -- test hash types function + create or replace function t_hash16(int4) returns int2 as $$ + declare + res int2; + begin + res = $1::int2; + return res; + end; + $$ language plpgsql; + + create or replace function t_hash64(int4) returns int8 as $$ + declare + res int8; + begin + res = $1; + return
res; + end; + $$ language plpgsql; + + create function test_hash16(id integer, data text) + returns text as $$ cluster 'testcluster'; run on t_hash16(id); select data; $$ language plproxy; + select * from test_hash16('0', 'hash16'); + + create function test_hash64(id integer, data text) + returns text as $$ cluster 'testcluster'; run on t_hash64(id); select data; $$ language plproxy; + select * from test_hash64('0', 'hash64'); + + -- test argument difference + \c test_part + create function test_difftypes(username text, out val1 int2, out val2 float8) + as $$ begin val1 = 1; val2 = 3;return; end; $$ language plpgsql; + \c regression + create function test_difftypes(username text, out val1 int4, out val2 float4) + as $$ cluster 'testcluster'; run on 0; $$ language plproxy; + select * from test_difftypes('types'); + *** a/src/pl/plproxy/type.c --- b/src/pl/plproxy/type.c *************** *** 0 **** --- 1,336 ---- + /* + * PL/Proxy - easy access to partitioned database. + * + * Copyright (c) 2006 Sven Suursoho, Skype Technologies OÜ + * Copyright (c) 2007 Marko Kreen, Skype Technologies OÜ + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + + /* + * Caches I/O info about scalar values. + */ + + #include "plproxy.h" + + /* + * Checks if we can safely use binary.
+  */
+ static bool usable_binary(Oid oid)
+ {
+ 	switch (oid)
+ 	{
+ 		case BOOLOID:
+ 		case INT2OID:
+ 		case INT4OID:
+ 		case INT8OID:
+ 		case TEXTOID:
+ 		case BPCHAROID:
+ 		case VARCHAROID:
+ 		case FLOAT4OID:
+ 		case FLOAT8OID:
+ 		case NUMERICOID:
+ 		case BYTEAOID:
+ 			return true;
+ 
+ 		/* integer vs. float issue */
+ 		case TIMESTAMPOID:
+ 		case TIMESTAMPTZOID:
+ 		case DATEOID:
+ 		case TIMEOID:
+ 
+ 		/* interval binary fmt changed in 8.1 */
+ 		case INTERVALOID:
+ 		default:
+ 			return false;
+ 	}
+ }
+ 
+ /*
+  * Collects info about fields of a composite type.
+  *
+  * Based on TupleDescGetAttInMetadata.
+  */
+ ProxyComposite *
+ plproxy_composite_info(ProxyFunction *func, TupleDesc tupdesc)
+ {
+ 	int			i,
+ 				natts = tupdesc->natts;
+ 	ProxyComposite *ret;
+ 	MemoryContext old_ctx;
+ 	Form_pg_attribute a;
+ 	ProxyType  *type;
+ 	const char *name;
+ 
+ 	old_ctx = MemoryContextSwitchTo(func->ctx);
+ 
+ 	ret = palloc(sizeof(*ret));
+ 	ret->type_list = palloc(sizeof(ProxyType *) * natts);
+ 	ret->name_list = palloc0(sizeof(char *) * natts);
+ 	ret->tupdesc = BlessTupleDesc(tupdesc);
+ 	ret->use_binary = 1;
+ 
+ 	MemoryContextSwitchTo(old_ctx);
+ 
+ 	for (i = 0; i < natts; i++)
+ 	{
+ 		a = tupdesc->attrs[i];
+ 		if (a->attisdropped)
+ 			plproxy_error(func, "dropped attrs not supported");
+ 
+ 		name = quote_identifier(NameStr(a->attname));
+ 		ret->name_list[i] = plproxy_func_strdup(func, name);
+ 
+ 		type = plproxy_find_type_info(func, a->atttypid, 0);
+ 		ret->type_list[i] = type;
+ 
+ 		if (!type->has_recv)
+ 			ret->use_binary = 0;
+ 	}
+ 
+ 	return ret;
+ }
+ 
+ /* Frees a ProxyComposite: per-column type info and names, tupdesc, and the struct. */
+ void
+ plproxy_free_composite(ProxyComposite *rec)
+ {
+ 	int			i;
+ 	int			natts = rec->tupdesc->natts;
+ 
+ 	for (i = 0; i < natts; i++)
+ 	{
+ 		plproxy_free_type(rec->type_list[i]);
+ 		pfree(rec->name_list[i]);
+ 	}
+ 	pfree(rec->type_list);
+ 	pfree(rec->name_list);
+ 	FreeTupleDesc(rec->tupdesc);
+ 	pfree(rec);
+ }
+ 
+ /* Frees a ProxyType built by plproxy_find_type_info, including its name. */
+ void
+ plproxy_free_type(ProxyType *type)
+ {
+ 	if (type->name)
+ 		pfree(type->name);
+ 
+ 	/* hopefully I/O functions do not use ->fn_extra */
+ 	pfree(type);
+ }
+ 
+ /*
+  * Build result tuple from binary or CString values; NULL in values[] marks SQL NULL.
+  *
+  * Based on BuildTupleFromCStrings.
+  */
+ HeapTuple
+ plproxy_recv_composite(ProxyComposite *meta, char **values, int *lengths, int *fmts)
+ {
+ 	TupleDesc	tupdesc = meta->tupdesc;
+ 	int			natts = tupdesc->natts;
+ 	Datum	   *dvalues;
+ 	char	   *nulls;
+ 	int			i;
+ 	HeapTuple	tuple;
+ 
+ 	dvalues = (Datum *) palloc(natts * sizeof(Datum));
+ 	nulls = (char *) palloc(natts * sizeof(char));
+ 
+ 	/* Call the recv function for each attribute */
+ 	for (i = 0; i < natts; i++)
+ 	{
+ 		if (tupdesc->attrs[i]->attisdropped)
+ 			elog(ERROR, "dropped attrs not supported");
+ 
+ 		dvalues[i] = plproxy_recv_type(meta->type_list[i],
+ 									   values[i], lengths[i], fmts[i]);
+ 		nulls[i] = (values[i] != NULL) ? ' ' : 'n';
+ 	}
+ 
+ 	/* Form a tuple */
+ 	tuple = heap_formtuple(tupdesc, dvalues, nulls);
+ 
+ 	/*
+ 	 * Release locally palloc'd space.
+ 	 */
+ 	for (i = 0; i < natts; i++)
+ 	{
+ 		if (nulls[i] == 'n')
+ 			continue;
+ 		if (meta->type_list[i]->by_value)
+ 			continue;
+ 		pfree(DatumGetPointer(dvalues[i]));
+ 	}
+ 	pfree(dvalues);
+ 	pfree(nulls);
+ 
+ 	return tuple;
+ }
+ 
+ /* Find info about a type and set up its text (and, if usable, binary) I/O functions */
+ ProxyType *
+ plproxy_find_type_info(ProxyFunction *func, Oid oid, bool for_send)
+ {
+ 	ProxyType  *type;
+ 	HeapTuple	t_type,
+ 				t_nsp;
+ 	Form_pg_type s_type;
+ 	Form_pg_namespace s_nsp;
+ 	char		namebuf[NAMEDATALEN * 4 + 2 + 1 + 2 + 1];
+ 	Oid			nsoid;
+ 
+ 	/* fetch pg_type row */
+ 	t_type = SearchSysCache(TYPEOID, ObjectIdGetDatum(oid), 0, 0, 0);
+ 	if (!HeapTupleIsValid(t_type))
+ 		plproxy_error(func, "cache lookup failed for type %u", oid);
+ 
+ 	/* pg_catalog types are named unqualified, all others schema-qualified */
+ 	s_type = (Form_pg_type) GETSTRUCT(t_type);
+ 	nsoid = s_type->typnamespace;
+ 
+ 	if (nsoid != PG_CATALOG_NAMESPACE)
+ 	{
+ 		t_nsp = SearchSysCache(NAMESPACEOID, ObjectIdGetDatum(nsoid), 0, 0, 0);
+ 		if (!HeapTupleIsValid(t_nsp))
+ 			plproxy_error(func, "cache lookup failed for namespace %u", nsoid);
+ 		s_nsp = (Form_pg_namespace) GETSTRUCT(t_nsp);
+ 		snprintf(namebuf, sizeof(namebuf), "%s.%s",
+ 				 quote_identifier(NameStr(s_nsp->nspname)),
+ 				 quote_identifier(NameStr(s_type->typname)));
+ 		ReleaseSysCache(t_nsp);
+ 	}
+ 	else
+ 	{
+ 		snprintf(namebuf, sizeof(namebuf), "%s", quote_identifier(NameStr(s_type->typname)));
+ 	}
+ 
+ 	/* sanity check: allow base, composite, domain and enum types, plus void */
+ 	switch (s_type->typtype)
+ 	{
+ 		case 'p':
+ 			if (oid != VOIDOID)
+ 				plproxy_error(func, "unsupported pseudo type: %s (%u)",
+ 							  namebuf, oid);
+ 			break;
+ 		case 'b': case 'c': case 'd': case 'e':	/* base, composite, domain, enum */
+ 			break;
+ 		default:
+ 			plproxy_error(func, "unsupported type: %s (%u)", namebuf, oid);
+ 	}
+ 
+ 	/* allocate & fill structure */
+ 	type = plproxy_func_alloc(func, sizeof(*type));
+ 	memset(type, 0, sizeof(*type));
+ 
+ 	type->type_oid = oid;
+ 	type->io_param = getTypeIOParam(t_type);
+ 	type->for_send = for_send;
+ 	type->by_value = s_type->typbyval;
+ 	type->name = plproxy_func_strdup(func, namebuf);
+ 
+ 	/* decide what function is needed */
+ 	if (for_send)
+ 	{
+ 		fmgr_info_cxt(s_type->typoutput, &type->io.out.output_func, func->ctx);
+ 		if (OidIsValid(s_type->typsend) && usable_binary(oid))
+ 		{
+ 			fmgr_info_cxt(s_type->typsend, &type->io.out.send_func, func->ctx);
+ 			type->has_send = 1;
+ 		}
+ 	}
+ 	else
+ 	{
+ 		fmgr_info_cxt(s_type->typinput, &type->io.in.input_func, func->ctx);
+ 		if (OidIsValid(s_type->typreceive) && usable_binary(oid))
+ 		{
+ 			fmgr_info_cxt(s_type->typreceive, &type->io.in.recv_func, func->ctx);
+ 			type->has_recv = 1;
+ 		}
+ 	}
+ 
+ 	ReleaseSysCache(t_type);
+ 
+ 	return type;
+ }
+ 
+ /* Convert a Datum to libpq parameter: binary (*fmt = 1) if allowed and supported, else text (*fmt = 0) */
+ char *
+ plproxy_send_type(ProxyType *type, Datum val, bool allow_bin, int *len, int *fmt)
+ {
+ 	bytea	   *bin;
+ 	char	   *res;
+ 
+ 	Assert(type->for_send == 1);
+ 
+ 	if (allow_bin && type->has_send)
+ 	{
+ 		bin = SendFunctionCall(&type->io.out.send_func, val);
+ 		res = VARDATA(bin);
+ 		*len = VARSIZE(bin) - VARHDRSZ;
+ 		*fmt = 1;
+ 	}
+ 	else
+ 	{
+ 		res = OutputFunctionCall(&type->io.out.output_func, val);
+ 		*len = 0;
+ 		*fmt = 0;
+ 	}
+ 	return res;
+ }
+ 
+ /*
+  * Point StringInfo to fixed buffer, without copying.
+  *
+  * StringInfo convention is a NUL byte at data[len]; libpq
+  * values are NUL-terminated already, so that holds here.
+  *
+  * Binary receive functions should not rely on it anyway.
+  */
+ static void
+ setFixedStringInfo(StringInfo str, void *data, int len)
+ {
+ 	str->data = data;
+ 	str->maxlen = len;
+ 	str->len = len;
+ 	str->cursor = 0;
+ }
+ 
+ /* Convert a libpq result to Datum; val == NULL stands for an SQL NULL */
+ Datum
+ plproxy_recv_type(ProxyType *type, char *val, int len, bool bin)
+ {
+ 	Datum		res;
+ 	StringInfoData buf;
+ 
+ 	Assert(type->for_send == 0);
+ 
+ 	if (bin)
+ 	{
+ 		if (!type->has_recv)
+ 			elog(ERROR, "PL/Proxy: type %u recv not supported", type->type_oid);
+ 
+ 		/* avoid unnecessary copy; a NULL StringInfo tells fmgr the value is NULL */
+ 		if (val != NULL)
+ 			setFixedStringInfo(&buf, val, len);
+ 		res = ReceiveFunctionCall(&type->io.in.recv_func,
+ 								  val != NULL ? &buf : NULL, type->io_param, -1);
+ 	}
+ 	else
+ 	{
+ 		res = InputFunctionCall(&type->io.in.input_func,
+ 								val, type->io_param, -1);
+ 	}
+ 	return res;
+ }