*** a/doc/src/sgml/catalogs.sgml --- b/doc/src/sgml/catalogs.sgml *************** *** 1986,1992 **** --- 1986,1994 ---- a = no action, r = restrict, c = cascade, + C = each cascade, n = set null, + N = each set null, d = set default *************** *** 1999,2005 **** --- 2001,2009 ---- a = no action, r = restrict, c = cascade, + C = each cascade, n = set null, + N = each set null, d = set default *************** *** 2102,2107 **** --- 2106,2118 ---- If a check constraint, a human-readable representation of the expression + + + confiseach + bool + + If a foreign key, true if an EACH REFERENCES foreign key + *** a/doc/src/sgml/ddl.sgml --- b/doc/src/sgml/ddl.sgml *************** *** 866,871 **** CREATE TABLE order_items ( --- 866,990 ---- + + Each Foreign Key Constraints + + + each foreign key + + + + foreign key array + + + + constraint + each foreign key + + + + referential integrity + + + + Another option you have with foreign keys is to use a + referencing column which is an array of elements with + the same type (or a compatible one) as the referenced + column in the related table. This feature, commonly known + as foreign key arrays, is implemented + in PostgreSQL with each foreign key constraints, + as described in the following example: + + + CREATE TABLE drivers ( + driver_id integer PRIMARY KEY, + first_name text, + last_name text, + ... + ); + + CREATE TABLE races ( + race_id integer PRIMARY KEY, + title text, + race_day DATE, + ... + final_positions integer[] EACH REFERENCES drivers + ); + + + The above example uses an array (final_positions) + to store the results of a race: for each of its elements + a referential integrity check is enforced on the + drivers table. + Note that EACH REFERENCES is an extension + of PostgreSQL and it is not included in the SQL standard. 
+ + + + As far as referential actions are concerned, + when working with foreign key arrays, you have two more + options that can be used with your + EACH REFERENCES definition: + EACH CASCADE and + EACH SET NULL. Depending on + the triggering action (DELETE or + UPDATE) on the referenced table, + every matching element in the referencing array will be either + deleted/updated or set to the null value. + + + + For instance, you can change the definition of the + races table so that a DELETE + on the drivers table will remove + the referencing elements from the final_positions + array: + + + CREATE TABLE races ( + ... + final_positions integer[] EACH REFERENCES drivers + ON DELETE EACH CASCADE ON UPDATE EACH CASCADE + ); + + + Likewise, an UPDATE of + the driver_id column will be propagated + to every matching element of the final_positions + column in the races table. + + + + Even though the most common use case for foreign key arrays + is a single-column key, you can define an each foreign + key constraint on a group of columns. As the following + contrived example shows, it needs to be written in table constraint form: + + CREATE TABLE t1 ( + a integer PRIMARY KEY, + b integer, + c integer[], + FOREIGN KEY (b, EACH c) REFERENCES other_table (c1, c2) + ); + + + On top of standard foreign key requirements, EACH + constraints require that the referencing column is an array + whose element type is compatible with the corresponding referenced column. + The current implementation forbids EACH CASCADE + and EACH SET NULL actions on multi-column + each foreign key constraints. + + + + For more detailed information on foreign key array + options and special cases, please refer to the documentation + for and + . + + + + Exclusion Constraints *** a/doc/src/sgml/func.sgml --- b/doc/src/sgml/func.sgml *************** *** 10318,10323 **** SELECT NULLIF(value, '(none)') ... 
--- 10318,10329 ---- array_prepend + array_remove + + + array_replace + + array_to_string *************** *** 10436,10441 **** SELECT NULLIF(value, '(none)') ... --- 10442,10470 ---- + array_remove(anyarray, anyelement) + + + anyarray + remove each element equal to the given value from the array + (the array must be one-dimensional) + array_remove(ARRAY[1,2,2,3], 2) + {1,3} + + + + + array_replace(anyarray, anyelement, anyelement) + + + anyarray + replace each element in the array that is equal to the given value with a new value + array_replace(ARRAY[1,2,5,4], 5, 3) + {1,2,3,4} + + + + array_to_string(anyarray, text , text) *************** *** 10513,10518 **** SELECT NULLIF(value, '(none)') ... --- 10542,10552 ---- + Calling array_remove on a multi-dimensional + array is not supported and raises an error. + + + See also about the aggregate function array_agg for use with arrays. *** a/doc/src/sgml/ref/create_table.sgml --- b/doc/src/sgml/ref/create_table.sgml *************** *** 51,57 **** CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI DEFAULT default_expr | UNIQUE index_parameters | PRIMARY KEY index_parameters | ! REFERENCES reftable [ ( refcolumn ) ] [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ] [ ON DELETE action ] [ ON UPDATE action ] } [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ] --- 51,57 ---- DEFAULT default_expr | UNIQUE index_parameters | PRIMARY KEY index_parameters | ! [EACH] REFERENCES reftable [ ( refcolumn ) ] [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ] [ ON DELETE action ] [ ON UPDATE action ] } [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ] *************** *** 62,68 **** CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI UNIQUE ( column_name [, ... ] ) index_parameters | PRIMARY KEY ( column_name [, ... 
] ) index_parameters | EXCLUDE [ USING index_method ] ( exclude_element WITH operator [, ... ] ) index_parameters [ WHERE ( predicate ) ] | ! FOREIGN KEY ( column_name [, ... ] ) REFERENCES reftable [ ( refcolumn [, ... ] ) ] [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ] [ ON DELETE action ] [ ON UPDATE action ] } [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ] --- 62,68 ---- UNIQUE ( column_name [, ... ] ) index_parameters | PRIMARY KEY ( column_name [, ... ] ) index_parameters | EXCLUDE [ USING index_method ] ( exclude_element WITH operator [, ... ] ) index_parameters [ WHERE ( predicate ) ] | ! FOREIGN KEY ( [EACH] column_name [, ... ] ) REFERENCES reftable [ ( refcolumn [, ... ] ) ] [ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ] [ ON DELETE action ] [ ON UPDATE action ] } [ DEFERRABLE | NOT DEFERRABLE ] [ INITIALLY DEFERRED | INITIALLY IMMEDIATE ] *************** *** 563,572 **** CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI ! REFERENCES reftable [ ( refcolumn ) ] [ MATCH matchtype ] [ ON DELETE action ] [ ON UPDATE action ] (column constraint) ! FOREIGN KEY ( column [, ... ] ) REFERENCES reftable [ ( refcolumn [, ... ] ) ] [ MATCH matchtype ] [ ON DELETE action ] --- 563,573 ---- ! REFERENCES reftable [ ( refcolumn ) ] [ MATCH matchtype ] [ ON DELETE action ] [ ON UPDATE action ] (column constraint) ! ! FOREIGN KEY ( [EACH] column [, ... ] ) REFERENCES reftable [ ( refcolumn [, ... ] ) ] [ MATCH matchtype ] [ ON DELETE action ] *************** *** 588,593 **** CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI --- 589,606 ---- + In case the column name column + is prepended with the EACH keyword and column is an array of elements compatible + with the corresponding refcolumn + in reftable, an + each foreign key constraint is put in place (see for more information). 
+ Multi-column keys with one or more EACH definitions + are allowed (with some limitations in referential actions as described further down). + + + A value inserted into the referencing column(s) is matched against the values of the referenced table and referenced columns using the given match type. There are three match types: MATCH *************** *** 647,652 **** CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI --- 660,667 ---- Delete any rows referencing the deleted row, or update the value of the referencing column to the new value of the referenced column, respectively. + With each foreign key constraints, CASCADE is allowed + for the ON DELETE action only. *************** *** 668,673 **** CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI --- 683,716 ---- + + + EACH CASCADE + + + EACH CASCADE can be used only with + each foreign key constraints. + Delete any element in the array that is referencing + the deleted row, or update the value of every + referencing element in the array to the new value of the + referenced column, respectively. + The current implementation is limited to single-column foreign keys. + + + + + + EACH SET NULL + + + EACH SET NULL can be used only with + each foreign key constraints. + Set any referencing element in the array to null. + The current implementation is limited to single-column foreign keys. + + + + *************** *** 680,685 **** CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI --- 723,835 ---- + + EACH REFERENCES reftable [ ( refcolumn ) ] [ MATCH matchtype ] [ ON DELETE action ] [ ON UPDATE action ] (column constraint) + + + + The EACH REFERENCES definition specifies + a foreign key array, a special kind of foreign key + constraint (also known as each foreign key constraint). + It requires that the referencing column is an array of elements + of the same type (or a compatible one) as the referenced column + in the referenced table. 
The value of each element of the + referencing array + will be matched against some row of reftable. + + + + Foreign key arrays are an extension of PostgreSQL + and are not included in the SQL standard. + + + + Even with foreign key arrays, modifications in the referenced + column can trigger actions to be performed on the referencing array. + Similarly to standard foreign keys, you can specify these + actions using the ON DELETE and + ON UPDATE clauses. + The following actions are possible for each clause: + + + + NO ACTION + + + Same as standard foreign key constraints. This is the default action. + + + + + + RESTRICT + + + Same as standard foreign key constraints. + + + + + + CASCADE + + + Can be used with ON DELETE only. + Delete any row whose array references a deleted row. + + + + + + SET NULL + + + Set the referencing array column to null. + + + + + + SET DEFAULT + + + Set the referencing array column to the default value. + + + + + + EACH CASCADE + + + Delete any element in the array that is referencing + the deleted row, or update the value of every + referencing element in the array to the new value of the + referenced column, respectively. + In case the referencing array is multi-dimensional, + action on delete is automatically converted to + EACH SET NULL (it is recommended + that you directly use this action if you plan to use + multi-dimensional referencing arrays). + + + + + + EACH SET NULL + + + Set any referencing element in the array to null. + + + + + + + + + DEFERRABLE NOT DEFERRABLE *************** *** 1426,1431 **** CREATE TABLE employees OF employee_type ( --- 1576,1592 ---- effect can be had using the OID feature. + + + Foreign Key Arrays + + + Foreign key arrays (also known as each foreign key constraints) + and the EACH REFERENCES, EACH CASCADE + and EACH SET NULL clauses are a PostgreSQL + extension and are not in the SQL standard. 
+ + *** a/src/backend/catalog/heap.c --- b/src/backend/catalog/heap.c *************** *** 1935,1940 **** StoreRelCheck(Relation rel, char *ccname, Node *expr, --- 1935,1942 ---- NULL, NULL, 0, + false, + NULL, ' ', ' ', ' ', *** a/src/backend/catalog/index.c --- b/src/backend/catalog/index.c *************** *** 1147,1152 **** index_constraint_create(Relation heapRelation, --- 1147,1154 ---- NULL, NULL, 0, + false, + NULL, ' ', ' ', ' ', *** a/src/backend/catalog/information_schema.sql --- b/src/backend/catalog/information_schema.sql *************** *** 1165,1171 **** CREATE VIEW referential_constraints AS --- 1165,1173 ---- CAST( CASE con.confupdtype WHEN 'c' THEN 'CASCADE' + WHEN 'C' THEN 'EACH CASCADE' WHEN 'n' THEN 'SET NULL' + WHEN 'N' THEN 'EACH SET NULL' WHEN 'd' THEN 'SET DEFAULT' WHEN 'r' THEN 'RESTRICT' WHEN 'a' THEN 'NO ACTION' END *************** *** 1173,1183 **** CREATE VIEW referential_constraints AS CAST( CASE con.confdeltype WHEN 'c' THEN 'CASCADE' WHEN 'n' THEN 'SET NULL' WHEN 'd' THEN 'SET DEFAULT' WHEN 'r' THEN 'RESTRICT' WHEN 'a' THEN 'NO ACTION' END ! AS character_data) AS delete_rule FROM (pg_namespace ncon INNER JOIN pg_constraint con ON ncon.oid = con.connamespace --- 1175,1189 ---- CAST( CASE con.confdeltype WHEN 'c' THEN 'CASCADE' + WHEN 'C' THEN 'EACH CASCADE' WHEN 'n' THEN 'SET NULL' + WHEN 'N' THEN 'EACH SET NULL' WHEN 'd' THEN 'SET DEFAULT' WHEN 'r' THEN 'RESTRICT' WHEN 'a' THEN 'NO ACTION' END ! AS character_data) AS delete_rule, ! ! 
con.confiseach AS is_each FROM (pg_namespace ncon INNER JOIN pg_constraint con ON ncon.oid = con.connamespace *** a/src/backend/catalog/pg_constraint.c --- b/src/backend/catalog/pg_constraint.c *************** *** 58,63 **** CreateConstraintEntry(const char *constraintName, --- 58,65 ---- const Oid *ppEqOp, const Oid *ffEqOp, int foreignNKeys, + bool confisEach, + const bool *foreignEach, char foreignUpdateType, char foreignDeleteType, char foreignMatchType, *************** *** 76,81 **** CreateConstraintEntry(const char *constraintName, --- 78,84 ---- Datum values[Natts_pg_constraint]; ArrayType *conkeyArray; ArrayType *confkeyArray; + ArrayType *confeachArray; ArrayType *conpfeqopArray; ArrayType *conppeqopArray; ArrayType *conffeqopArray; *************** *** 126,135 **** CreateConstraintEntry(const char *constraintName, --- 129,144 ---- fkdatums[i] = ObjectIdGetDatum(ffEqOp[i]); conffeqopArray = construct_array(fkdatums, foreignNKeys, OIDOID, sizeof(Oid), true, 'i'); + for (i = 0; i < foreignNKeys; i++) { + fkdatums[i] = BoolGetDatum(foreignEach[i]); + } + confeachArray = construct_array(fkdatums, foreignNKeys, + BOOLOID, 1, true, 'c'); } else { confkeyArray = NULL; + confeachArray = NULL; conpfeqopArray = NULL; conppeqopArray = NULL; conffeqopArray = NULL; *************** *** 171,176 **** CreateConstraintEntry(const char *constraintName, --- 180,186 ---- values[Anum_pg_constraint_conislocal - 1] = BoolGetDatum(conIsLocal); values[Anum_pg_constraint_coninhcount - 1] = Int32GetDatum(conInhCount); values[Anum_pg_constraint_conisonly - 1] = BoolGetDatum(conIsOnly); + values[Anum_pg_constraint_coniseach - 1] = BoolGetDatum(confisEach); if (conkeyArray) values[Anum_pg_constraint_conkey - 1] = PointerGetDatum(conkeyArray); *************** *** 182,187 **** CreateConstraintEntry(const char *constraintName, --- 192,202 ---- else nulls[Anum_pg_constraint_confkey - 1] = true; + if (confeachArray) + values[Anum_pg_constraint_confeach - 1] = PointerGetDatum(confeachArray); + 
else + nulls[Anum_pg_constraint_confeach - 1] = true; + if (conpfeqopArray) values[Anum_pg_constraint_conpfeqop - 1] = PointerGetDatum(conpfeqopArray); else *** a/src/backend/commands/tablecmds.c --- b/src/backend/commands/tablecmds.c *************** *** 35,40 **** --- 35,41 ---- #include "catalog/pg_inherits_fn.h" #include "catalog/pg_namespace.h" #include "catalog/pg_opclass.h" + #include "catalog/pg_operator.h" #include "catalog/pg_tablespace.h" #include "catalog/pg_trigger.h" #include "catalog/pg_type.h" *************** *** 5609,5614 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, --- 5610,5616 ---- Relation pkrel; int16 pkattnum[INDEX_MAX_KEYS]; int16 fkattnum[INDEX_MAX_KEYS]; + bool fkatteach[INDEX_MAX_KEYS]; Oid pktypoid[INDEX_MAX_KEYS]; Oid fktypoid[INDEX_MAX_KEYS]; Oid opclasses[INDEX_MAX_KEYS]; *************** *** 5683,5688 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, --- 5685,5691 ---- */ MemSet(pkattnum, 0, sizeof(pkattnum)); MemSet(fkattnum, 0, sizeof(fkattnum)); + MemSet(fkatteach, 0, sizeof(fkatteach)); MemSet(pktypoid, 0, sizeof(pktypoid)); MemSet(fktypoid, 0, sizeof(fktypoid)); MemSet(opclasses, 0, sizeof(opclasses)); *************** *** 5695,5700 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, --- 5698,5723 ---- fkattnum, fktypoid); /* + * If is an EACH FK, decode the content of the fk_each_attrs array. + * Each node must contain either "t" or "f" and is translated to the + * corresponding boolean value. + */ + if (fkconstraint->fk_is_each) + { + ListCell *l; + int attnum; + + attnum = 0; + foreach(l, fkconstraint->fk_each_attrs) + { + char *value = strVal(lfirst(l)); + fkatteach[attnum] = strcmp("t", value) == 0; + attnum++; + } + + } + + /* * If the attribute list for the referenced table was omitted, lookup the * definition of the primary key and use it. Otherwise, validate the * supplied attribute list. 
In either case, discover the index OID and *************** *** 5736,5741 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, --- 5759,5807 ---- (errcode(ERRCODE_INVALID_FOREIGN_KEY), errmsg("number of referencing and referenced columns for foreign key disagree"))); + /* Enforce each foreign key restrictions */ + if (fkconstraint->fk_is_each) + { + /* + * ON UPDATE CASCADE action is not supported on EACH foreign keys + */ + if (fkconstraint->fk_upd_action == FKCONSTR_ACTION_CASCADE) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FOREIGN_KEY), + errmsg("ON UPDATE CASCADE action is not supported on " + "EACH foreign keys"), + errhint("Use ON UPDATE EACH CASCADE, instead"))); + + /* + * EACH CASCADE and EACH SET NULL actions are only available + * in single-column EACH foreign keys + */ + if (numpks > 1 && + (fkconstraint->fk_upd_action == FKCONSTR_ACTION_ARRCASCADE + || fkconstraint->fk_upd_action == FKCONSTR_ACTION_ARRSETNULL + || fkconstraint->fk_del_action == FKCONSTR_ACTION_ARRCASCADE + || fkconstraint->fk_del_action == FKCONSTR_ACTION_ARRSETNULL)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FOREIGN_KEY), + errmsg("EACH CASCADE and EACH SET NULL actions are only " + "available on single-column EACH foreign keys"))); + } + else + { + /* + * EACH CASCADE and EACH SET NULL actions are only available + * in EACH foreign keys + */ + if (fkconstraint->fk_upd_action == FKCONSTR_ACTION_ARRCASCADE + || fkconstraint->fk_upd_action == FKCONSTR_ACTION_ARRSETNULL + || fkconstraint->fk_del_action == FKCONSTR_ACTION_ARRCASCADE + || fkconstraint->fk_del_action == FKCONSTR_ACTION_ARRSETNULL) + ereport(ERROR, + (errcode(ERRCODE_INVALID_FOREIGN_KEY), + errmsg("EACH CASCADE and EACH SET NULL actions are only " + "available on EACH foreign keys"))); + } + for (i = 0; i < numpks; i++) { Oid pktype = pktypoid[i]; *************** *** 5783,5801 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, elog(ERROR, "missing operator %d(%u,%u) in opfamily %u", 
eqstrategy, opcintype, opcintype, opfamily); ! /* ! * Are there equality operators that take exactly the FK type? Assume ! * we should look through any domain here. ! */ ! fktyped = getBaseType(fktype); ! pfeqop = get_opfamily_member(opfamily, opcintype, fktyped, ! eqstrategy); ! if (OidIsValid(pfeqop)) ! ffeqop = get_opfamily_member(opfamily, fktyped, fktyped, eqstrategy); else ! ffeqop = InvalidOid; /* keep compiler quiet */ if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop))) { --- 5849,5894 ---- elog(ERROR, "missing operator %d(%u,%u) in opfamily %u", eqstrategy, opcintype, opcintype, opfamily); ! if (fkatteach[i]) ! { ! /* ! * For every EACH FK, look if an equality operator that takes ! * exactly the FK element type exists. Assume we should look ! * through any domain here. ! */ ! fktyped=get_base_element_type(fktype); ! if (!OidIsValid(fktyped)) ! ereport(ERROR, ! (errcode(ERRCODE_DATATYPE_MISMATCH), ! errmsg("foreign key constraint \"%s\" " ! "cannot be implemented", ! fkconstraint->conname), ! errdetail("Type of key column \"%s\" " ! "is not an array type: %s", ! strVal(list_nth(fkconstraint->fk_attrs, i)), ! format_type_be(fktype)))); ! ffeqop = ARRAY_EQ_OP; ! ! pfeqop = get_opfamily_member(opfamily, opcintype, fktyped, eqstrategy); + } else ! { ! /* ! * Are there equality operators that take exactly the FK type? Assume ! * we should look through any domain here. ! */ ! fktyped = getBaseType(fktype); ! ! pfeqop = get_opfamily_member(opfamily, opcintype, fktyped, ! eqstrategy); ! if (OidIsValid(pfeqop)) ! ffeqop = get_opfamily_member(opfamily, fktyped, fktyped, ! eqstrategy); ! else ! ffeqop = InvalidOid; /* keep compiler quiet */ ! } if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop))) { *************** *** 5812,5823 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, Oid target_typeids[2]; input_typeids[0] = pktype; ! 
input_typeids[1] = fktype; target_typeids[0] = opcintype; target_typeids[1] = opcintype; if (can_coerce_type(2, input_typeids, target_typeids, COERCION_IMPLICIT)) ! pfeqop = ffeqop = ppeqop; } if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop))) --- 5905,5925 ---- Oid target_typeids[2]; input_typeids[0] = pktype; ! input_typeids[1] = fktyped; target_typeids[0] = opcintype; target_typeids[1] = opcintype; if (can_coerce_type(2, input_typeids, target_typeids, COERCION_IMPLICIT)) ! { ! pfeqop = ppeqop; ! ! /* ! * In caso of an EACH FK the ffeqop must be left untouched ! * otherwise use the primary equality operator. ! */ ! if (!fkatteach[i]) ! ffeqop = ppeqop; ! } } if (!(OidIsValid(pfeqop) && OidIsValid(ffeqop))) *************** *** 5859,5864 **** ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel, --- 5961,5968 ---- ppeqoperators, ffeqoperators, numpks, + fkconstraint->fk_is_each, + fkatteach, fkconstraint->fk_upd_action, fkconstraint->fk_del_action, fkconstraint->fk_matchtype, *************** *** 6606,6616 **** createForeignKeyTriggers(Relation rel, Constraint *fkconstraint, --- 6710,6730 ---- fk_trigger->initdeferred = false; fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_del"); break; + case FKCONSTR_ACTION_ARRCASCADE: + fk_trigger->deferrable = false; + fk_trigger->initdeferred = false; + fk_trigger->funcname = SystemFuncName("RI_FKey_eachcascade_del"); + break; case FKCONSTR_ACTION_SETNULL: fk_trigger->deferrable = false; fk_trigger->initdeferred = false; fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_del"); break; + case FKCONSTR_ACTION_ARRSETNULL: + fk_trigger->deferrable = false; + fk_trigger->initdeferred = false; + fk_trigger->funcname = SystemFuncName("RI_FKey_eachsetnull_del"); + break; case FKCONSTR_ACTION_SETDEFAULT: fk_trigger->deferrable = false; fk_trigger->initdeferred = false; *************** *** 6659,6669 **** createForeignKeyTriggers(Relation rel, Constraint *fkconstraint, --- 6773,6793 ---- fk_trigger->initdeferred = 
false; fk_trigger->funcname = SystemFuncName("RI_FKey_cascade_upd"); break; + case FKCONSTR_ACTION_ARRCASCADE: + fk_trigger->deferrable = false; + fk_trigger->initdeferred = false; + fk_trigger->funcname = SystemFuncName("RI_FKey_eachcascade_upd"); + break; case FKCONSTR_ACTION_SETNULL: fk_trigger->deferrable = false; fk_trigger->initdeferred = false; fk_trigger->funcname = SystemFuncName("RI_FKey_setnull_upd"); break; + case FKCONSTR_ACTION_ARRSETNULL: + fk_trigger->deferrable = false; + fk_trigger->initdeferred = false; + fk_trigger->funcname = SystemFuncName("RI_FKey_eachsetnull_upd"); + break; case FKCONSTR_ACTION_SETDEFAULT: fk_trigger->deferrable = false; fk_trigger->initdeferred = false; *** a/src/backend/commands/trigger.c --- b/src/backend/commands/trigger.c *************** *** 443,448 **** CreateTrigger(CreateTrigStmt *stmt, const char *queryString, --- 443,450 ---- NULL, NULL, 0, + false, + NULL, ' ', ' ', ' ', *************** *** 863,878 **** ConvertTriggerToFK(CreateTrigStmt *stmt, Oid funcoid) --- 865,884 ---- switch (funcoid) { case F_RI_FKEY_CASCADE_UPD: + case F_RI_FKEY_EACHCASCADE_UPD: case F_RI_FKEY_RESTRICT_UPD: case F_RI_FKEY_SETNULL_UPD: + case F_RI_FKEY_EACHSETNULL_UPD: case F_RI_FKEY_SETDEFAULT_UPD: case F_RI_FKEY_NOACTION_UPD: funcnum = 0; break; case F_RI_FKEY_CASCADE_DEL: + case F_RI_FKEY_EACHCASCADE_DEL: case F_RI_FKEY_RESTRICT_DEL: case F_RI_FKEY_SETNULL_DEL: + case F_RI_FKEY_EACHSETNULL_DEL: case F_RI_FKEY_SETDEFAULT_DEL: case F_RI_FKEY_NOACTION_DEL: funcnum = 1; *************** *** 977,988 **** ConvertTriggerToFK(CreateTrigStmt *stmt, Oid funcoid) --- 983,1000 ---- case F_RI_FKEY_CASCADE_UPD: fkcon->fk_upd_action = FKCONSTR_ACTION_CASCADE; break; + case F_RI_FKEY_EACHCASCADE_UPD: + fkcon->fk_upd_action = FKCONSTR_ACTION_ARRCASCADE; + break; case F_RI_FKEY_RESTRICT_UPD: fkcon->fk_upd_action = FKCONSTR_ACTION_RESTRICT; break; case F_RI_FKEY_SETNULL_UPD: fkcon->fk_upd_action = FKCONSTR_ACTION_SETNULL; break; + case 
F_RI_FKEY_EACHSETNULL_UPD: + fkcon->fk_upd_action = FKCONSTR_ACTION_ARRSETNULL; + break; case F_RI_FKEY_SETDEFAULT_UPD: fkcon->fk_upd_action = FKCONSTR_ACTION_SETDEFAULT; break; *************** *** 998,1009 **** ConvertTriggerToFK(CreateTrigStmt *stmt, Oid funcoid) --- 1010,1027 ---- case F_RI_FKEY_CASCADE_DEL: fkcon->fk_del_action = FKCONSTR_ACTION_CASCADE; break; + case F_RI_FKEY_EACHCASCADE_DEL: + fkcon->fk_del_action = FKCONSTR_ACTION_ARRCASCADE; + break; case F_RI_FKEY_RESTRICT_DEL: fkcon->fk_del_action = FKCONSTR_ACTION_RESTRICT; break; case F_RI_FKEY_SETNULL_DEL: fkcon->fk_del_action = FKCONSTR_ACTION_SETNULL; break; + case F_RI_FKEY_EACHSETNULL_DEL: + fkcon->fk_del_action = FKCONSTR_ACTION_ARRSETNULL; + break; case F_RI_FKEY_SETDEFAULT_DEL: fkcon->fk_del_action = FKCONSTR_ACTION_SETDEFAULT; break; *** a/src/backend/commands/typecmds.c --- b/src/backend/commands/typecmds.c *************** *** 2948,2953 **** domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid, --- 2948,2955 ---- NULL, NULL, 0, + false, + NULL, ' ', ' ', ' ', *** a/src/backend/nodes/copyfuncs.c --- b/src/backend/nodes/copyfuncs.c *************** *** 2364,2369 **** _copyConstraint(const Constraint *from) --- 2364,2371 ---- COPY_SCALAR_FIELD(fk_matchtype); COPY_SCALAR_FIELD(fk_upd_action); COPY_SCALAR_FIELD(fk_del_action); + COPY_SCALAR_FIELD(fk_is_each); + COPY_NODE_FIELD(fk_each_attrs); COPY_SCALAR_FIELD(skip_validation); COPY_SCALAR_FIELD(initially_valid); *** a/src/backend/nodes/equalfuncs.c --- b/src/backend/nodes/equalfuncs.c *************** *** 2199,2204 **** _equalConstraint(const Constraint *a, const Constraint *b) --- 2199,2206 ---- COMPARE_SCALAR_FIELD(fk_matchtype); COMPARE_SCALAR_FIELD(fk_upd_action); COMPARE_SCALAR_FIELD(fk_del_action); + COMPARE_SCALAR_FIELD(fk_is_each); + COMPARE_NODE_FIELD(fk_each_attrs); COMPARE_SCALAR_FIELD(skip_validation); COMPARE_SCALAR_FIELD(initially_valid); *** a/src/backend/nodes/outfuncs.c --- b/src/backend/nodes/outfuncs.c 
*************** *** 2626,2631 **** _outConstraint(StringInfo str, const Constraint *node) --- 2626,2633 ---- WRITE_CHAR_FIELD(fk_matchtype); WRITE_CHAR_FIELD(fk_upd_action); WRITE_CHAR_FIELD(fk_del_action); + WRITE_BOOL_FIELD(fk_is_each); + WRITE_NODE_FIELD(fk_each_attrs); WRITE_BOOL_FIELD(skip_validation); WRITE_BOOL_FIELD(initially_valid); break; *** a/src/backend/parser/gram.y --- b/src/backend/parser/gram.y *************** *** 320,326 **** static void processCASbits(int cas_bits, int location, const char *constrType, execute_param_clause using_clause returning_clause opt_enum_val_list enum_val_list table_func_column_list create_generic_options alter_generic_options ! relation_expr_list dostmt_opt_list %type opt_fdw_options fdw_options %type fdw_option --- 320,326 ---- execute_param_clause using_clause returning_clause opt_enum_val_list enum_val_list table_func_column_list create_generic_options alter_generic_options ! relation_expr_list dostmt_opt_list foreignKeyColumnList %type opt_fdw_options fdw_options %type fdw_option *************** *** 379,385 **** static void processCASbits(int cas_bits, int location, const char *constrType, %type def_arg columnElem where_clause where_or_current_clause a_expr b_expr c_expr func_expr AexprConst indirection_el columnref in_expr having_clause func_table array_expr ! ExclusionWhereClause %type ExclusionConstraintList ExclusionConstraintElem %type func_arg_list %type func_arg_expr --- 379,385 ---- %type def_arg columnElem where_clause where_or_current_clause a_expr b_expr c_expr func_expr AexprConst indirection_el columnref in_expr having_clause func_table array_expr ! 
ExclusionWhereClause foreignKeyColumnElem %type ExclusionConstraintList ExclusionConstraintElem %type func_arg_list %type func_arg_expr *************** *** 645,650 **** static void processCASbits(int cas_bits, int location, const char *constrType, --- 645,651 ---- %left JOIN CROSS LEFT FULL RIGHT INNER_P NATURAL /* kluge to keep xml_whitespace_option from causing shift/reduce conflicts */ %right PRESERVE STRIP_P + %nonassoc EACH %% *************** *** 2715,2720 **** ColConstraintElem: --- 2716,2738 ---- n->fk_matchtype = $4; n->fk_upd_action = (char) ($5 >> 8); n->fk_del_action = (char) ($5 & 0xFF); + n->fk_is_each = false; + n->skip_validation = false; + n->initially_valid = true; + $$ = (Node *)n; + } + | EACH REFERENCES qualified_name opt_column_list key_match key_actions + { + Constraint *n = makeNode(Constraint); + n->contype = CONSTR_FOREIGN; + n->location = @1; + n->pktable = $3; + n->fk_attrs = NIL; + n->pk_attrs = $4; + n->fk_matchtype = $5; + n->fk_upd_action = (char) ($6 >> 8); + n->fk_del_action = (char) ($6 & 0xFF); + n->fk_is_each = true; n->skip_validation = false; n->initially_valid = true; $$ = (Node *)n; *************** *** 2900,2917 **** ConstraintElem: yyscanner); $$ = (Node *)n; } ! | FOREIGN KEY '(' columnList ')' REFERENCES qualified_name opt_column_list key_match key_actions ConstraintAttributeSpec { Constraint *n = makeNode(Constraint); n->contype = CONSTR_FOREIGN; n->location = @1; n->pktable = $7; ! n->fk_attrs = $4; n->pk_attrs = $8; n->fk_matchtype = $9; n->fk_upd_action = (char) ($10 >> 8); n->fk_del_action = (char) ($10 & 0xFF); processCASbits($11, @11, "FOREIGN KEY", &n->deferrable, &n->initdeferred, &n->skip_validation, --- 2918,2959 ---- yyscanner); $$ = (Node *)n; } ! | FOREIGN KEY '(' foreignKeyColumnList ')' REFERENCES qualified_name opt_column_list key_match key_actions ConstraintAttributeSpec { Constraint *n = makeNode(Constraint); n->contype = CONSTR_FOREIGN; n->location = @1; n->pktable = $7; ! 
n->fk_attrs = $4; n->pk_attrs = $8; n->fk_matchtype = $9; n->fk_upd_action = (char) ($10 >> 8); n->fk_del_action = (char) ($10 & 0xFF); + + /* + * Split the content of foreignKeyColumnList + * in two separate list. One lis of fileds + * and one list of boolean values. + */ + { + ListCell *i; + + n->fk_attrs = NIL; + n->fk_is_each = false; + n->fk_each_attrs = NIL; + foreach (i, $4) + { + ForeignKeyColumnElem *elem = + (ForeignKeyColumnElem *)lfirst(i); + + n->fk_attrs = lappend(n->fk_attrs, elem->name); + n->fk_is_each |= elem->each; + n->fk_each_attrs = lappend(n->fk_each_attrs, + makeString(elem->each?"t":"f")); + } + } + processCASbits($11, @11, "FOREIGN KEY", &n->deferrable, &n->initdeferred, &n->skip_validation, *************** *** 2921,2926 **** ConstraintElem: --- 2963,2991 ---- } ; + + foreignKeyColumnList: + foreignKeyColumnElem { $$ = list_make1($1); } + | foreignKeyColumnList ',' foreignKeyColumnElem { $$ = lappend($1, $3); } + ; + + foreignKeyColumnElem: + EACH ColId + { + ForeignKeyColumnElem *n = makeNode(ForeignKeyColumnElem); + n->name = (Node *) makeString($2); + n->each = true; + $$ = (Node *) n; + } + | ColId + { + ForeignKeyColumnElem *n = makeNode(ForeignKeyColumnElem); + n->name = (Node *) makeString($1); + n->each = false; + $$ = (Node *) n; + } + ; + opt_column_list: '(' columnList ')' { $$ = $2; } | /*EMPTY*/ { $$ = NIL; } *************** *** 3012,3017 **** key_action: --- 3077,3084 ---- | CASCADE { $$ = FKCONSTR_ACTION_CASCADE; } | SET NULL_P { $$ = FKCONSTR_ACTION_SETNULL; } | SET DEFAULT { $$ = FKCONSTR_ACTION_SETDEFAULT; } + | EACH CASCADE { $$ = FKCONSTR_ACTION_ARRCASCADE; } + | EACH SET NULL_P { $$ = FKCONSTR_ACTION_ARRSETNULL; } ; OptInherit: INHERITS '(' qualified_name_list ')' { $$ = $3; } *** a/src/backend/parser/parse_utilcmd.c --- b/src/backend/parser/parse_utilcmd.c *************** *** 536,541 **** transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column) --- 536,543 ---- * list of FK constraints to be processed 
later. */ constraint->fk_attrs = list_make1(makeString(column->colname)); + constraint->fk_each_attrs = list_make1(makeString( + constraint->fk_is_each?"t":"f")); cxt->fkconstraints = lappend(cxt->fkconstraints, constraint); break; *** a/src/backend/utils/adt/arrayfuncs.c --- b/src/backend/utils/adt/arrayfuncs.c *************** *** 5174,5176 **** array_unnest(PG_FUNCTION_ARGS) --- 5174,5600 ---- SRF_RETURN_DONE(funcctx); } } + + /* + * Remove any occurrence of an element from an array + * + * If used on a multi-dimensional array it will raise an error. + * + */ + Datum + array_remove(PG_FUNCTION_ARGS) + { + ArrayType *v; + Datum old_value = PG_GETARG_DATUM(1); + bool old_value_isnull = PG_ARGISNULL(1); + Oid element_type; + ArrayType *result; + Datum *values; + bool *nulls; + Datum elt; + int ndim; + int *dim; + int nitems; + int nresult; + int i; + int32 nbytes = 0; + int32 dataoffset; + bool hasnulls; + int typlen; + bool typbyval; + char typalign; + char *s; + bits8 *bitmap; + int bitmask; + Oid collation = PG_GET_COLLATION(); + bool changed = false; + TypeCacheEntry *typentry; + FunctionCallInfoData locfcinfo; + + /* + * If the first argument is null + * return NULL + */ + if (PG_ARGISNULL(0)) + PG_RETURN_NULL(); + + v = PG_GETARG_ARRAYTYPE_P(0); + + ndim = ARR_NDIM(v); + + /* + * If used on a multi-dimensional array the matching elements + * will be replaced with NULLs as fallback. + */ + if (ndim > 1) { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("removing elements is not supported on multidimensional arrays"))); + } + + dim = ARR_DIMS(v); + element_type = ARR_ELEMTYPE(v); + nitems = ArrayGetNItems(ndim, dim); + + /* Check for empty array */ + if (nitems <= 0) + { + /* Return empty array */ + PG_RETURN_ARRAYTYPE_P(construct_empty_array(element_type)); + } + + /* + * We arrange to look up the equality function only once per series of + * calls, assuming the element type doesn't change underneath us. 
+ */ + typentry = (TypeCacheEntry *) fcinfo->flinfo->fn_extra; + if (typentry == NULL || + typentry->type_id != element_type) + { + typentry = lookup_type_cache(element_type, + TYPECACHE_EQ_OPR_FINFO); + if (!OidIsValid(typentry->eq_opr_finfo.fn_oid)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("could not identify an equality operator for type %s", + format_type_be(element_type)))); + fcinfo->flinfo->fn_extra = (void *) typentry; + } + typlen = typentry->typlen; + typbyval = typentry->typbyval; + typalign = typentry->typalign; + + /* + * apply the operator to each pair of array elements. + */ + InitFunctionCallInfoData(locfcinfo, &typentry->eq_opr_finfo, 2, + collation, NULL, NULL); + + /* Allocate temporary arrays for new values */ + values = (Datum *) palloc(nitems * sizeof(Datum)); + nulls = (bool *) palloc(nitems * sizeof(bool)); + + /* Loop over source data */ + s = ARR_DATA_PTR(v); + bitmap = ARR_NULLBITMAP(v); + bitmask = 1; + hasnulls = false; + nresult=0; + + for (i = 0; i < nitems; i++) + { + bool isNull; + bool oprresult; + bool skip; + + /* Get source element, checking for NULL */ + if (bitmap && (*bitmap & bitmask) == 0) + { + isNull = true; + skip = old_value_isnull; + } + else + { + elt = fetch_att(s, typbyval, typlen); + s = att_addlength_datum(s, typlen, elt); + s = (char *) att_align_nominal(s, typalign); + isNull = false; + + /* + * Apply the operator to the element pair + */ + locfcinfo.arg[0] = elt; + locfcinfo.arg[1] = old_value; + locfcinfo.argnull[0] = false; + locfcinfo.argnull[1] = false; + locfcinfo.isnull = false; + oprresult = DatumGetBool(FunctionCallInvoke(&locfcinfo)); + if (!oprresult) + { + values[nresult] = elt; + skip = false; + } + else + skip = true; + } + + if (!skip) + { + nulls[nresult] = isNull; + if (isNull) + hasnulls = true; + else + { + /* Update total result size */ + nbytes = att_addlength_datum(nbytes, typlen, values[nresult]); + nbytes = att_align_nominal(nbytes, typalign); + /* This should 
never overflow */ + Assert(AllocSizeIsValid(nbytes)); + } + nresult++; + } + else + changed = true; + + /* advance bitmap pointer if any */ + if (bitmap) + { + bitmask <<= 1; + if (bitmask == 0x100) + { + bitmap++; + bitmask = 1; + } + } + } + + /* + * If not changed just return the original array + */ + if (!changed) + { + pfree(values); + pfree(nulls); + PG_RETURN_ARRAYTYPE_P(v); + } + + /* Allocate and initialize the result array */ + if (hasnulls) + { + dataoffset = ARR_OVERHEAD_WITHNULLS(ndim, nresult); + nbytes += dataoffset; + } + else + { + dataoffset = 0; /* marker for no null bitmap */ + nbytes += ARR_OVERHEAD_NONULLS(ndim); + } + result = (ArrayType *) palloc0(nbytes); + SET_VARSIZE(result, nbytes); + result->ndim = ndim; + result->dataoffset = dataoffset; + result->elemtype = element_type; + memcpy(ARR_DIMS(result), ARR_DIMS(v), 2 * ndim * sizeof(int)); + + /* Adjust the final length */ + ARR_DIMS(result)[0] = nresult; + + CopyArrayEls(result, + values, nulls, nresult, + typlen, typbyval, typalign, + false); + + pfree(values); + pfree(nulls); + + PG_RETURN_ARRAYTYPE_P(result); + } + + /* + * Replace any occurrence of an element in an array + */ + Datum + array_replace(PG_FUNCTION_ARGS) + { + ArrayType *v; + Datum old_value = PG_GETARG_DATUM(1); + bool old_value_isnull = PG_ARGISNULL(1); + Datum new_value = PG_GETARG_DATUM(2); + bool new_value_isnull = PG_ARGISNULL(2); + Oid element_type; + ArrayType *result; + Datum *values; + bool *nulls; + Datum elt; + int *dim; + int ndim; + int nitems; + int i; + int32 nbytes = 0; + int32 dataoffset; + bool hasnulls; + int typlen; + bool typbyval; + char typalign; + char *s; + bits8 *bitmap; + int bitmask; + bool changed = false; + Oid collation = PG_GET_COLLATION(); + + TypeCacheEntry *typentry; + FunctionCallInfoData locfcinfo; + + /* + * If the first argument is null + * return NULL + */ + if (PG_ARGISNULL(0)) + PG_RETURN_NULL(); + + v = PG_GETARG_ARRAYTYPE_P(0); + + ndim = ARR_NDIM(v); + dim = ARR_DIMS(v); + 
element_type = ARR_ELEMTYPE(v); + nitems = ArrayGetNItems(ndim, dim); + + /* Check for empty array */ + if (nitems <= 0) + { + /* Return empty array */ + PG_RETURN_ARRAYTYPE_P(construct_empty_array(element_type)); + } + + /* + * We arrange to look up the equality function only once per series of + * calls, assuming the element type doesn't change underneath us. + */ + typentry = (TypeCacheEntry *) fcinfo->flinfo->fn_extra; + if (typentry == NULL || + typentry->type_id != element_type) + { + typentry = lookup_type_cache(element_type, + TYPECACHE_EQ_OPR_FINFO); + if (!OidIsValid(typentry->eq_opr_finfo.fn_oid)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("could not identify an equality operator for type %s", + format_type_be(element_type)))); + fcinfo->flinfo->fn_extra = (void *) typentry; + } + typlen = typentry->typlen; + typbyval = typentry->typbyval; + typalign = typentry->typalign; + + /* detoast new_value if necessary */ + if (typlen == -1 && !new_value_isnull) + new_value = PointerGetDatum(PG_DETOAST_DATUM(new_value)); + + /* + * apply the operator to each pair of array elements. 
+ */ + InitFunctionCallInfoData(locfcinfo, &typentry->eq_opr_finfo, 2, + collation, NULL, NULL); + + /* Allocate temporary arrays for new values */ + values = (Datum *) palloc(nitems * sizeof(Datum)); + nulls = (bool *) palloc(nitems * sizeof(bool)); + + /* Loop over source data */ + s = ARR_DATA_PTR(v); + bitmap = ARR_NULLBITMAP(v); + bitmask = 1; + hasnulls = false; + + for (i = 0; i < nitems; i++) + { + bool isNull; + bool oprresult; + + /* Get source element, checking for NULL */ + if (bitmap && (*bitmap & bitmask) == 0) + { + if (old_value_isnull) + { + values[i] = new_value; + isNull = false; + changed = true; + } + else + isNull = true; + } + else + { + elt = fetch_att(s, typbyval, typlen); + s = att_addlength_datum(s, typlen, elt); + s = (char *) att_align_nominal(s, typalign); + isNull = false; + + /* + * Apply the operator to the element pair + */ + locfcinfo.arg[0] = elt; + locfcinfo.arg[1] = old_value; + locfcinfo.argnull[0] = false; + locfcinfo.argnull[1] = false; + locfcinfo.isnull = false; + oprresult = DatumGetBool(FunctionCallInvoke(&locfcinfo)); + if (!oprresult) + values[i] = elt; + else + { + changed = true; + if (!new_value_isnull) + values[i] = new_value; + else + isNull = true; + } + } + + nulls[i] = isNull; + if (isNull) + hasnulls = true; + else + { + /* Update total result size */ + nbytes = att_addlength_datum(nbytes, typlen, values[i]); + nbytes = att_align_nominal(nbytes, typalign); + /* check for overflow of total request */ + if (!AllocSizeIsValid(nbytes)) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("array size exceeds the maximum allowed (%d)", + (int) MaxAllocSize))); + } + + /* advance bitmap pointer if any */ + if (bitmap) + { + bitmask <<= 1; + if (bitmask == 0x100) + { + bitmap++; + bitmask = 1; + } + } + } + + /* + * If not changed just return the original array + */ + if (!changed) + { + pfree(values); + pfree(nulls); + PG_RETURN_ARRAYTYPE_P(v); + } + + /* Allocate and initialize the result array */ + 
if (hasnulls) + { + dataoffset = ARR_OVERHEAD_WITHNULLS(ndim, nitems); + nbytes += dataoffset; + } + else + { + dataoffset = 0; /* marker for no null bitmap */ + nbytes += ARR_OVERHEAD_NONULLS(ndim); + } + result = (ArrayType *) palloc0(nbytes); + SET_VARSIZE(result, nbytes); + result->ndim = ndim; + result->dataoffset = dataoffset; + result->elemtype = element_type; + memcpy(ARR_DIMS(result), ARR_DIMS(v), 2 * ndim * sizeof(int)); + + CopyArrayEls(result, + values, nulls, nitems, + typlen, typbyval, typalign, + false); + + pfree(values); + pfree(nulls); + + PG_RETURN_ARRAYTYPE_P(result); + } *** a/src/backend/utils/adt/ri_triggers.c --- b/src/backend/utils/adt/ri_triggers.c *************** *** 77,82 **** --- 77,86 ---- #define RI_PLAN_RESTRICT_UPD_CHECKREF 8 #define RI_PLAN_SETNULL_DEL_DOUPDATE 9 #define RI_PLAN_SETNULL_UPD_DOUPDATE 10 + #define RI_PLAN_EACHCASCADE_DEL_DOUPDATE 11 + #define RI_PLAN_EACHCASCADE_UPD_DOUPDATE 12 + #define RI_PLAN_EACHSETNULL_DEL_DOUPDATE 13 + #define RI_PLAN_EACHSETNULL_UPD_DOUPDATE 14 #define MAX_QUOTED_NAME_LEN (NAMEDATALEN*2+3) #define MAX_QUOTED_REL_NAME_LEN (MAX_QUOTED_NAME_LEN*2) *************** *** 106,117 **** typedef struct RI_ConstraintInfo --- 110,124 ---- NameData conname; /* name of the FK constraint */ Oid pk_relid; /* referenced relation */ Oid fk_relid; /* referencing relation */ + bool confiseach; /* is an EACH FK */ char confupdtype; /* foreign key's ON UPDATE action */ char confdeltype; /* foreign key's ON DELETE action */ char confmatchtype; /* foreign key's match type */ int nkeys; /* number of key columns */ int16 pk_attnums[RI_MAX_NUMKEYS]; /* attnums of referenced cols */ int16 fk_attnums[RI_MAX_NUMKEYS]; /* attnums of referencing cols */ + bool fk_each_atts[RI_MAX_NUMKEYS]; /* referencing cols is + * an EACH FK */ Oid pf_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (PK = * FK) */ Oid pp_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (PK = *************** *** 194,200 **** static void 
ri_GenerateQual(StringInfo buf, const char *sep, const char *leftop, Oid leftoptype, Oid opoid, ! const char *rightop, Oid rightoptype); static void ri_add_cast_to(StringInfo buf, Oid typid); static void ri_GenerateQualCollation(StringInfo buf, Oid collation); static int ri_NullCheck(Relation rel, HeapTuple tup, --- 201,208 ---- const char *sep, const char *leftop, Oid leftoptype, Oid opoid, ! const char *rightop, Oid rightoptype, ! bool is_array); static void ri_add_cast_to(StringInfo buf, Oid typid); static void ri_GenerateQualCollation(StringInfo buf, Oid collation); static int ri_NullCheck(Relation rel, HeapTuple tup, *************** *** 455,460 **** RI_FKey_check(PG_FUNCTION_ARGS) --- 463,469 ---- if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL) { StringInfoData querybuf; + StringInfoData countbuf; char pkrelname[MAX_QUOTED_REL_NAME_LEN]; char attname[MAX_QUOTED_NAME_LEN]; char paramname[16]; *************** *** 466,477 **** RI_FKey_check(PG_FUNCTION_ARGS) --- 475,498 ---- * SELECT 1 FROM ONLY WHERE pkatt1 = $1 [AND ...] FOR SHARE * The type id's for the $ parameters are those of the * corresponding FK attributes. + * + * In case of an EACH foreign key, the previous query is used to count + * the number of matching rows and see if every combination is + * actually referenced. + * The wrapping query is + * SELECT 1 WHERE 1 * + * (SELECT count(DISTINCT y) FROM UNNEST($1) y WHERE y IS NOT NULL) + * [ * ...] 
= (SELECT count(*) FROM () z) * ---------- */ initStringInfo(&querybuf); quoteRelationName(pkrelname, pk_rel); appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x", pkrelname); querysep = "WHERE"; + if (riinfo.confiseach) { + initStringInfo(&countbuf); + appendStringInfo(&countbuf, "SELECT 1 WHERE 1"); + } for (i = 0; i < riinfo.nkeys; i++) { Oid pk_type = RIAttType(pk_rel, riinfo.pk_attnums[i]); *************** *** 480,497 **** RI_FKey_check(PG_FUNCTION_ARGS) quoteOneName(attname, RIAttName(pk_rel, riinfo.pk_attnums[i])); sprintf(paramname, "$%d", i + 1); ri_GenerateQual(&querybuf, querysep, attname, pk_type, riinfo.pf_eq_oprs[i], ! paramname, fk_type); querysep = "AND"; queryoids[i] = fk_type; } appendStringInfo(&querybuf, " FOR SHARE OF x"); ! /* Prepare and save the plan */ ! qplan = ri_PlanCheck(querybuf.data, riinfo.nkeys, queryoids, ! &qkey, fk_rel, pk_rel, true); } /* --- 501,539 ---- quoteOneName(attname, RIAttName(pk_rel, riinfo.pk_attnums[i])); sprintf(paramname, "$%d", i + 1); + /* + * In case of an EACH foreign key, we check that every + * DISTINCT NOT NULL value in the array is present in the PK table. + */ + if (riinfo.fk_each_atts[i]) + { + appendStringInfo(&countbuf, + " * (SELECT count(DISTINCT y) " + "FROM UNNEST(%s) y WHERE y IS NOT NULL)" + , paramname); + } ri_GenerateQual(&querybuf, querysep, attname, pk_type, riinfo.pf_eq_oprs[i], ! paramname, fk_type, ! riinfo.fk_each_atts[i]); querysep = "AND"; queryoids[i] = fk_type; } appendStringInfo(&querybuf, " FOR SHARE OF x"); ! if (riinfo.confiseach) { ! appendStringInfo(&countbuf, " = " ! "(SELECT count(*) FROM (%s) z)", querybuf.data); ! ! /* Prepare and save the plan for each foreign keys */ ! qplan = ri_PlanCheck(countbuf.data, riinfo.nkeys, queryoids, ! &qkey, fk_rel, pk_rel, true); ! } ! else ! /* Prepare and save the plan for standard foreign keys */ ! qplan = ri_PlanCheck(querybuf.data, riinfo.nkeys, queryoids, ! 
&qkey, fk_rel, pk_rel, true); } /* *************** *** 644,650 **** ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel, ri_GenerateQual(&querybuf, querysep, attname, pk_type, riinfo->pp_eq_oprs[i], ! paramname, pk_type); querysep = "AND"; queryoids[i] = pk_type; } --- 686,693 ---- ri_GenerateQual(&querybuf, querysep, attname, pk_type, riinfo->pp_eq_oprs[i], ! paramname, pk_type, ! false); querysep = "AND"; queryoids[i] = pk_type; } *************** *** 801,807 **** RI_FKey_noaction_del(PG_FUNCTION_ARGS) ri_GenerateQual(&querybuf, querysep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type); querysep = "AND"; queryoids[i] = pk_type; } --- 844,851 ---- ri_GenerateQual(&querybuf, querysep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); querysep = "AND"; queryoids[i] = pk_type; } *************** *** 989,995 **** RI_FKey_noaction_upd(PG_FUNCTION_ARGS) ri_GenerateQual(&querybuf, querysep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type); querysep = "AND"; queryoids[i] = pk_type; } --- 1033,1040 ---- ri_GenerateQual(&querybuf, querysep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); querysep = "AND"; queryoids[i] = pk_type; } *************** *** 1151,1157 **** RI_FKey_cascade_del(PG_FUNCTION_ARGS) ri_GenerateQual(&querybuf, querysep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type); querysep = "AND"; queryoids[i] = pk_type; } --- 1196,1203 ---- ri_GenerateQual(&querybuf, querysep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); querysep = "AND"; queryoids[i] = pk_type; } *************** *** 1337,1343 **** RI_FKey_cascade_upd(PG_FUNCTION_ARGS) ri_GenerateQual(&qualbuf, qualsep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type); querysep = ","; qualsep = "AND"; queryoids[i] = pk_type; --- 1383,1390 ---- ri_GenerateQual(&qualbuf, qualsep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type, ! 
false); querysep = ","; qualsep = "AND"; queryoids[i] = pk_type; *************** *** 1510,1516 **** RI_FKey_restrict_del(PG_FUNCTION_ARGS) ri_GenerateQual(&querybuf, querysep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type); querysep = "AND"; queryoids[i] = pk_type; } --- 1557,1564 ---- ri_GenerateQual(&querybuf, querysep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); querysep = "AND"; queryoids[i] = pk_type; } *************** *** 1693,1699 **** RI_FKey_restrict_upd(PG_FUNCTION_ARGS) ri_GenerateQual(&querybuf, querysep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type); querysep = "AND"; queryoids[i] = pk_type; } --- 1741,1748 ---- ri_GenerateQual(&querybuf, querysep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); querysep = "AND"; queryoids[i] = pk_type; } *************** *** 1863,1869 **** RI_FKey_setnull_del(PG_FUNCTION_ARGS) ri_GenerateQual(&qualbuf, qualsep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type); querysep = ","; qualsep = "AND"; queryoids[i] = pk_type; --- 1912,1919 ---- ri_GenerateQual(&qualbuf, qualsep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); querysep = ","; qualsep = "AND"; queryoids[i] = pk_type; *************** *** 2076,2082 **** RI_FKey_setnull_upd(PG_FUNCTION_ARGS) ri_GenerateQual(&qualbuf, qualsep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type); qualsep = "AND"; queryoids[i] = pk_type; } --- 2126,2133 ---- ri_GenerateQual(&qualbuf, qualsep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); qualsep = "AND"; queryoids[i] = pk_type; } *************** *** 2251,2257 **** RI_FKey_setdefault_del(PG_FUNCTION_ARGS) ri_GenerateQual(&qualbuf, qualsep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! 
attname, fk_type); querysep = ","; qualsep = "AND"; queryoids[i] = pk_type; --- 2302,2309 ---- ri_GenerateQual(&qualbuf, qualsep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); querysep = ","; qualsep = "AND"; queryoids[i] = pk_type; *************** *** 2455,2461 **** RI_FKey_setdefault_upd(PG_FUNCTION_ARGS) ri_GenerateQual(&qualbuf, qualsep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type); qualsep = "AND"; queryoids[i] = pk_type; } --- 2507,2514 ---- ri_GenerateQual(&qualbuf, qualsep, paramname, pk_type, riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); qualsep = "AND"; queryoids[i] = pk_type; } *************** *** 2512,2637 **** RI_FKey_setdefault_upd(PG_FUNCTION_ARGS) /* ---------- ! * RI_FKey_keyequal_upd_pk - * ! * Check if we have a key change on an update to a PK relation. This is ! * used by the AFTER trigger queue manager to see if it can skip queuing ! * an instance of an RI trigger. * ---------- */ ! bool ! RI_FKey_keyequal_upd_pk(Trigger *trigger, Relation pk_rel, ! HeapTuple old_row, HeapTuple new_row) { RI_ConstraintInfo riinfo; /* * Get arguments. */ ! ri_FetchConstraintInfo(&riinfo, trigger, pk_rel, true); /* * Nothing to do if no column names to compare given */ if (riinfo.nkeys == 0) ! return true; switch (riinfo.confmatchtype) { case FKCONSTR_MATCH_UNSPECIFIED: case FKCONSTR_MATCH_FULL: ! /* Return true if keys are equal */ ! return ri_KeysEqual(pk_rel, old_row, new_row, &riinfo, true); ! /* Handle MATCH PARTIAL set null delete. */ case FKCONSTR_MATCH_PARTIAL: ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("MATCH PARTIAL not yet implemented"))); ! break; } ! /* Never reached */ elog(ERROR, "invalid confmatchtype"); ! return false; } /* ---------- ! * RI_FKey_keyequal_upd_fk - * ! * Check if we have a key change on an update to an FK relation. This is ! * used by the AFTER trigger queue manager to see if it can skip queuing ! * an instance of an RI trigger. 
* ---------- */ ! bool ! RI_FKey_keyequal_upd_fk(Trigger *trigger, Relation fk_rel, ! HeapTuple old_row, HeapTuple new_row) { RI_ConstraintInfo riinfo; /* * Get arguments. */ ! ri_FetchConstraintInfo(&riinfo, trigger, fk_rel, false); /* * Nothing to do if no column names to compare given */ if (riinfo.nkeys == 0) ! return true; switch (riinfo.confmatchtype) { case FKCONSTR_MATCH_UNSPECIFIED: case FKCONSTR_MATCH_FULL: ! /* Return true if keys are equal */ ! return ri_KeysEqual(fk_rel, old_row, new_row, &riinfo, false); ! /* Handle MATCH PARTIAL set null delete. */ ! case FKCONSTR_MATCH_PARTIAL: ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("MATCH PARTIAL not yet implemented"))); ! break; ! } ! /* Never reached */ ! elog(ERROR, "invalid confmatchtype"); ! return false; ! } ! /* ---------- ! * RI_Initial_Check - ! * ! * Check an entire table for non-matching values using a single query. ! * This is not a trigger procedure, but is called during ALTER TABLE ! * ADD FOREIGN KEY to validate the initial table contents. ! * ! * We expect that the caller has made provision to prevent any problems ! * caused by concurrent actions. This could be either by locking rel and ! * pkrel at ShareRowExclusiveLock or higher, or by otherwise ensuring ! * that triggers implementing the checks are already active. ! * Hence, we do not need to lock individual rows for the check. ! * ! * If the check fails because the current user doesn't have permissions ! * to read both tables, return false to let our caller know that they will ! * need to do something else to check the constraint. ! * ---------- ! */ ! bool ! RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) ! { ! RI_ConstraintInfo riinfo; ! const char *constrname = trigger->tgname; ! StringInfoData querybuf; ! char pkrelname[MAX_QUOTED_REL_NAME_LEN]; ! 
char fkrelname[MAX_QUOTED_REL_NAME_LEN]; char pkattname[MAX_QUOTED_NAME_LEN + 3]; char fkattname[MAX_QUOTED_NAME_LEN + 3]; RangeTblEntry *pkrte; RangeTblEntry *fkrte; ! const char *sep; int i; int save_nestlevel; char workmembuf[32]; --- 2565,3486 ---- /* ---------- ! * RI_FKey_eachcascade_del - * ! * Cascaded delete of array elements in "each" foreign key ! * references at delete event on PK table. * ---------- */ ! Datum ! RI_FKey_eachcascade_del(PG_FUNCTION_ARGS) { + TriggerData *trigdata = (TriggerData *) fcinfo->context; RI_ConstraintInfo riinfo; + Relation fk_rel; + Relation pk_rel; + HeapTuple old_row; + RI_QueryKey qkey; + SPIPlanPtr qplan; + int i; + + /* + * Check that this is a valid trigger call on the right time and event. + */ + ri_CheckTrigger(fcinfo, "RI_FKey_eachcascade_del", RI_TRIGTYPE_DELETE); /* * Get arguments. */ ! ri_FetchConstraintInfo(&riinfo, ! trigdata->tg_trigger, trigdata->tg_relation, true); /* * Nothing to do if no column names to compare given */ if (riinfo.nkeys == 0) ! return PointerGetDatum(NULL); ! ! /* ! * Get the relation descriptors of the FK and PK tables and the old tuple. ! * ! * fk_rel is opened in RowExclusiveLock mode since that's what our ! * eventual UPDATE will get on it. ! */ ! fk_rel = heap_open(riinfo.fk_relid, RowExclusiveLock); ! pk_rel = trigdata->tg_relation; ! old_row = trigdata->tg_trigtuple; switch (riinfo.confmatchtype) { + /* ---------- + * SQL3 11.9 + * General rules 6) a) i): + * MATCH <unspecified> or MATCH FULL + * ... ON DELETE CASCADE + * ---------- + */ case FKCONSTR_MATCH_UNSPECIFIED: case FKCONSTR_MATCH_FULL: ! ri_BuildQueryKeyFull(&qkey, &riinfo, ! RI_PLAN_EACHCASCADE_DEL_DOUPDATE); ! switch (ri_NullCheck(pk_rel, old_row, &qkey, RI_KEYPAIR_PK_IDX)) ! { ! case RI_KEYS_ALL_NULL: ! case RI_KEYS_SOME_NULL: ! ! /* ! * No update - MATCH FULL means there cannot be any ! * reference to old key if it contains NULL ! */ ! heap_close(fk_rel, RowExclusiveLock); ! return PointerGetDatum(NULL); ! !
case RI_KEYS_NONE_NULL: ! ! /* ! * Have a full qualified key - continue below ! */ ! break; ! } ! ! if (SPI_connect() != SPI_OK_CONNECT) ! elog(ERROR, "SPI_connect failed"); ! ! /* ! * Fetch or prepare a saved plan for the cascade delete operation ! */ ! if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL) ! { ! StringInfoData querybuf; ! StringInfoData qualbuf; ! char fkrelname[MAX_QUOTED_REL_NAME_LEN]; ! char attname[MAX_QUOTED_NAME_LEN]; ! char paramname[16]; ! const char *querysep; ! const char *qualsep; ! Oid queryoids[RI_MAX_NUMKEYS]; ! ! /* ---------- ! * The query string built is ! * UPDATE ONLY <fktable> SET fkatt1 = CASE array_ndims(fkatt1) ! * WHEN 1 THEN array_remove(fkatt1, $1) ! * ELSE array_replace(fkatt1, $1, NULL) END [, ...] ! * WHERE $1 = ANY(fkatt1) [AND ...] ! * Each $ parameter and the NULL literal get a cast to the FK ! * element type when that differs from the PK type. ! * The type id's for the $ parameters are those of the ! * corresponding PK attributes. ! * ---------- ! */ ! initStringInfo(&querybuf); ! initStringInfo(&qualbuf); ! quoteRelationName(fkrelname, fk_rel); ! appendStringInfo(&querybuf, "UPDATE ONLY %s SET", fkrelname); ! querysep = ""; ! qualsep = "WHERE"; ! for (i = 0; i < riinfo.nkeys; i++) ! { ! Oid pk_type = RIAttType(pk_rel, riinfo.pk_attnums[i]); ! Oid fk_type = RIAttType(fk_rel, riinfo.fk_attnums[i]); ! Oid fk_element_type = get_base_element_type(fk_type); ! ! quoteOneName(attname, ! RIAttName(fk_rel, riinfo.fk_attnums[i])); ! appendStringInfo(&querybuf, ! "%s %s = CASE array_ndims(%s) " ! "WHEN 1 THEN array_remove(%s, $%d", ! querysep, attname, attname, attname, i + 1); ! if (pk_type != fk_element_type) ! ri_add_cast_to(&querybuf, fk_element_type); ! appendStringInfo(&querybuf, ! ") ELSE array_replace(%s, $%d", ! attname, i + 1); ! if (pk_type != fk_element_type) ! ri_add_cast_to(&querybuf, fk_element_type); ! appendStringInfo(&querybuf, ! ", NULL"); ! if (pk_type != fk_element_type) ! ri_add_cast_to(&querybuf, fk_element_type); ! appendStringInfo(&querybuf, ! ") END"); ! sprintf(paramname, "$%d", i + 1); ! ri_GenerateQual(&qualbuf, qualsep, !
paramname, pk_type, ! riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); ! querysep = ","; ! qualsep = "AND"; ! queryoids[i] = pk_type; ! } ! appendStringInfoString(&querybuf, qualbuf.data); ! ! /* Prepare and save the plan */ ! qplan = ri_PlanCheck(querybuf.data, riinfo.nkeys, queryoids, ! &qkey, fk_rel, pk_rel, true); ! } ! ! /* ! * We have a plan now. Run it to update the existing references. ! */ ! ri_PerformCheck(&qkey, qplan, ! fk_rel, pk_rel, ! old_row, NULL, ! true, /* must detect new rows */ ! SPI_OK_UPDATE, ! NameStr(riinfo.conname)); ! ! if (SPI_finish() != SPI_OK_FINISH) ! elog(ERROR, "SPI_finish failed"); ! ! heap_close(fk_rel, RowExclusiveLock); ! ! return PointerGetDatum(NULL); ! ! /* ! * Handle MATCH PARTIAL cascade delete. ! */ case FKCONSTR_MATCH_PARTIAL: ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("MATCH PARTIAL not yet implemented"))); ! return PointerGetDatum(NULL); } ! /* ! * Never reached ! */ elog(ERROR, "invalid confmatchtype"); ! return PointerGetDatum(NULL); } + /* ---------- ! * RI_FKey_eachcascade_upd - * ! * Cascaded update of array elements in "each" foreign key ! * references at update event on PK table. * ---------- */ ! Datum ! RI_FKey_eachcascade_upd(PG_FUNCTION_ARGS) { + TriggerData *trigdata = (TriggerData *) fcinfo->context; RI_ConstraintInfo riinfo; + Relation fk_rel; + Relation pk_rel; + HeapTuple new_row; + HeapTuple old_row; + RI_QueryKey qkey; + SPIPlanPtr qplan; + int i; + int j; + + /* + * Check that this is a valid trigger call on the right time and event. + */ + ri_CheckTrigger(fcinfo, "RI_FKey_eachcascade_upd", RI_TRIGTYPE_UPDATE); /* * Get arguments. */ ! ri_FetchConstraintInfo(&riinfo, ! trigdata->tg_trigger, trigdata->tg_relation, true); /* * Nothing to do if no column names to compare given */ if (riinfo.nkeys == 0) ! return PointerGetDatum(NULL); ! ! /* ! * Get the relation descriptors of the FK and PK tables and the new and ! * old tuple. ! * !
* fk_rel is opened in RowExclusiveLock mode since that's what our ! * eventual UPDATE will get on it. ! */ ! fk_rel = heap_open(riinfo.fk_relid, RowExclusiveLock); ! pk_rel = trigdata->tg_relation; ! new_row = trigdata->tg_newtuple; ! old_row = trigdata->tg_trigtuple; switch (riinfo.confmatchtype) { + /* ---------- + * SQL3 11.9 + * General rules 7) a) i): + * MATCH <unspecified> or MATCH FULL + * ... ON UPDATE CASCADE + * ---------- + */ case FKCONSTR_MATCH_UNSPECIFIED: case FKCONSTR_MATCH_FULL: ! ri_BuildQueryKeyFull(&qkey, &riinfo, ! RI_PLAN_EACHCASCADE_UPD_DOUPDATE); ! switch (ri_NullCheck(pk_rel, old_row, &qkey, RI_KEYPAIR_PK_IDX)) ! { ! case RI_KEYS_ALL_NULL: ! case RI_KEYS_SOME_NULL: ! /* ! * No update - MATCH FULL means there cannot be any ! * reference to old key if it contains NULL ! */ ! heap_close(fk_rel, RowExclusiveLock); ! return PointerGetDatum(NULL); ! case RI_KEYS_NONE_NULL: ! ! /* ! * Have a full qualified key - continue below ! */ ! break; ! } ! ! /* ! * No need to do anything if old and new keys are equal ! */ ! if (ri_KeysEqual(pk_rel, old_row, new_row, &riinfo, true)) ! { ! heap_close(fk_rel, RowExclusiveLock); ! return PointerGetDatum(NULL); ! } ! ! if (SPI_connect() != SPI_OK_CONNECT) ! elog(ERROR, "SPI_connect failed"); ! ! /* ! * Fetch or prepare a saved plan for the cascaded update of ! * foreign references ! */ ! if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL) ! { ! StringInfoData querybuf; ! StringInfoData qualbuf; ! char fkrelname[MAX_QUOTED_REL_NAME_LEN]; ! char attname[MAX_QUOTED_NAME_LEN]; ! char paramname[16]; ! const char *querysep; ! const char *qualsep; ! Oid queryoids[RI_MAX_NUMKEYS * 2]; ! ! /* ---------- ! * The query string built is ! * UPDATE ONLY <fktable> ! * SET fkatt1 = array_replace(fkatt1, $n, $1) [, ...] ! * WHERE $n = ANY(fkatt1) [AND ...] ! * Here $n (parameter number nkeys + 1 and up) is the value being ! * replaced and $1 (and up) is its replacement. ! * NOTE(review): the ANY form is presumably what ri_GenerateQual ! * emits for EACH columns, as in the delete variant - TODO confirm. ! * The type id's for the $ parameters are those of the ! * corresponding PK attributes. ! * ---------- ! */ ! initStringInfo(&querybuf); ! initStringInfo(&qualbuf); !
quoteRelationName(fkrelname, fk_rel); ! appendStringInfo(&querybuf, "UPDATE ONLY %s SET", fkrelname); ! querysep = ""; ! qualsep = "WHERE"; ! for (i = 0, j = riinfo.nkeys; i < riinfo.nkeys; i++, j++) ! { ! Oid pk_type = RIAttType(pk_rel, riinfo.pk_attnums[i]); ! Oid fk_type = RIAttType(fk_rel, riinfo.fk_attnums[i]); ! Oid fk_element_type = get_base_element_type(fk_type); ! ! quoteOneName(attname, ! RIAttName(fk_rel, riinfo.fk_attnums[i])); ! appendStringInfo(&querybuf, ! "%s %s = array_replace(%s, $%d", ! querysep, attname, attname, j + 1); ! if (pk_type != fk_element_type) ! ri_add_cast_to(&querybuf, fk_element_type); ! appendStringInfo(&querybuf, ", $%d", i + 1); ! if (pk_type != fk_element_type) ! ri_add_cast_to(&querybuf, fk_element_type); ! appendStringInfo(&querybuf,")"); ! sprintf(paramname, "$%d", j + 1); ! ri_GenerateQual(&qualbuf, qualsep, ! paramname, pk_type, ! riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); ! querysep = ","; ! qualsep = "AND"; ! queryoids[i] = pk_type; ! queryoids[j] = pk_type; ! } ! appendStringInfoString(&querybuf, qualbuf.data); ! ! /* Prepare and save the plan */ ! qplan = ri_PlanCheck(querybuf.data, riinfo.nkeys * 2, queryoids, ! &qkey, fk_rel, pk_rel, true); ! } ! ! /* ! * We have a plan now. Run it to update the existing references. ! */ ! ri_PerformCheck(&qkey, qplan, ! fk_rel, pk_rel, ! old_row, new_row, ! true, /* must detect new rows */ ! SPI_OK_UPDATE, ! NameStr(riinfo.conname)); ! ! if (SPI_finish() != SPI_OK_FINISH) ! elog(ERROR, "SPI_finish failed"); ! ! heap_close(fk_rel, RowExclusiveLock); ! ! return PointerGetDatum(NULL); ! ! /* ! * Handle MATCH PARTIAL cascade update. ! */ ! case FKCONSTR_MATCH_PARTIAL: ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("MATCH PARTIAL not yet implemented"))); ! return PointerGetDatum(NULL); ! } ! ! /* ! * Never reached ! */ ! elog(ERROR, "invalid confmatchtype"); ! return PointerGetDatum(NULL); ! } ! ! ! /* ---------- ! 
* RI_FKey_eachsetnull_del - ! * ! * Set foreign key array element references to NULL values ! * at delete event on PK table. ! * ---------- ! */ ! Datum ! RI_FKey_eachsetnull_del(PG_FUNCTION_ARGS) ! { ! TriggerData *trigdata = (TriggerData *) fcinfo->context; ! RI_ConstraintInfo riinfo; ! Relation fk_rel; ! Relation pk_rel; ! HeapTuple old_row; ! RI_QueryKey qkey; ! SPIPlanPtr qplan; ! int i; ! ! /* ! * Check that this is a valid trigger call on the right time and event. ! */ ! ri_CheckTrigger(fcinfo, "RI_FKey_eachsetnull_del", RI_TRIGTYPE_DELETE); ! ! /* ! * Get arguments. ! */ ! ri_FetchConstraintInfo(&riinfo, ! trigdata->tg_trigger, trigdata->tg_relation, true); ! ! /* ! * Nothing to do if no column names to compare given ! */ ! if (riinfo.nkeys == 0) ! return PointerGetDatum(NULL); ! ! /* ! * Get the relation descriptors of the FK and PK tables and the old tuple. ! * ! * fk_rel is opened in RowExclusiveLock mode since that's what our ! * eventual UPDATE will get on it. ! */ ! fk_rel = heap_open(riinfo.fk_relid, RowExclusiveLock); ! pk_rel = trigdata->tg_relation; ! old_row = trigdata->tg_trigtuple; ! ! switch (riinfo.confmatchtype) ! { ! /* ---------- ! * SQL3 11.9 ! * General rules 6) a) ii): ! * MATCH <unspecified> or MATCH FULL ! * ... ON DELETE SET NULL ! * ---------- ! */ ! case FKCONSTR_MATCH_UNSPECIFIED: ! case FKCONSTR_MATCH_FULL: ! ri_BuildQueryKeyFull(&qkey, &riinfo, ! RI_PLAN_EACHSETNULL_DEL_DOUPDATE); ! ! switch (ri_NullCheck(pk_rel, old_row, &qkey, RI_KEYPAIR_PK_IDX)) ! { ! case RI_KEYS_ALL_NULL: ! case RI_KEYS_SOME_NULL: ! ! /* ! * No update - MATCH FULL means there cannot be any ! * reference to old key if it contains NULL ! */ ! heap_close(fk_rel, RowExclusiveLock); ! return PointerGetDatum(NULL); ! ! case RI_KEYS_NONE_NULL: ! ! /* ! * Have a full qualified key - continue below ! */ ! break; ! } ! ! if (SPI_connect() != SPI_OK_CONNECT) ! elog(ERROR, "SPI_connect failed"); ! ! /* ! * Fetch or prepare a saved plan for the set null delete operation ! */ !
if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL) ! { ! StringInfoData querybuf; ! StringInfoData qualbuf; ! char fkrelname[MAX_QUOTED_REL_NAME_LEN]; ! char attname[MAX_QUOTED_NAME_LEN]; ! char paramname[16]; ! const char *querysep; ! const char *qualsep; ! Oid queryoids[RI_MAX_NUMKEYS]; ! ! /* ---------- ! * The query string built is ! * UPDATE ONLY <fktable> ! * SET fkatt1 = array_replace(fkatt1, $1, NULL) [, ...] ! * WHERE $1 = ANY(fkatt1) [AND ...] ! * The type id's for the $ parameters are those of the ! * corresponding PK attributes. ! * ---------- ! */ ! initStringInfo(&querybuf); ! initStringInfo(&qualbuf); ! quoteRelationName(fkrelname, fk_rel); ! appendStringInfo(&querybuf, "UPDATE ONLY %s SET", fkrelname); ! querysep = ""; ! qualsep = "WHERE"; ! for (i = 0; i < riinfo.nkeys; i++) ! { ! Oid pk_type = RIAttType(pk_rel, riinfo.pk_attnums[i]); ! Oid fk_type = RIAttType(fk_rel, riinfo.fk_attnums[i]); ! Oid fk_element_type = get_base_element_type(fk_type); ! ! quoteOneName(attname, ! RIAttName(fk_rel, riinfo.fk_attnums[i])); ! appendStringInfo(&querybuf, ! "%s %s = array_replace(%s, $%d", ! querysep, attname, attname, i + 1); ! if (pk_type != fk_element_type) ! ri_add_cast_to(&querybuf, fk_element_type); ! appendStringInfo(&querybuf, ", NULL"); ! if (pk_type != fk_element_type) ! ri_add_cast_to(&querybuf, fk_element_type); ! appendStringInfo(&querybuf,")"); ! sprintf(paramname, "$%d", i + 1); ! ri_GenerateQual(&qualbuf, qualsep, ! paramname, pk_type, ! riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); ! querysep = ","; ! qualsep = "AND"; ! queryoids[i] = pk_type; ! } ! appendStringInfoString(&querybuf, qualbuf.data); ! ! /* Prepare and save the plan */ ! qplan = ri_PlanCheck(querybuf.data, riinfo.nkeys, queryoids, ! &qkey, fk_rel, pk_rel, true); ! } ! ! /* ! * We have a plan now. Run it to update the existing references. ! */ ! ri_PerformCheck(&qkey, qplan, ! fk_rel, pk_rel, ! old_row, NULL, ! true, /* must detect new rows */ ! SPI_OK_UPDATE, !
NameStr(riinfo.conname)); ! ! if (SPI_finish() != SPI_OK_FINISH) ! elog(ERROR, "SPI_finish failed"); ! ! heap_close(fk_rel, RowExclusiveLock); ! ! return PointerGetDatum(NULL); ! ! /* ! * Handle MATCH PARTIAL set null delete. ! */ ! case FKCONSTR_MATCH_PARTIAL: ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("MATCH PARTIAL not yet implemented"))); ! return PointerGetDatum(NULL); ! } ! ! /* ! * Never reached ! */ ! elog(ERROR, "invalid confmatchtype"); ! return PointerGetDatum(NULL); ! } ! ! ! /* ---------- ! * RI_FKey_eachsetnull_upd - ! * ! * Set foreign key array element references to NULL ! * at update event on PK table. ! * ---------- ! */ ! Datum ! RI_FKey_eachsetnull_upd(PG_FUNCTION_ARGS) ! { ! TriggerData *trigdata = (TriggerData *) fcinfo->context; ! RI_ConstraintInfo riinfo; ! Relation fk_rel; ! Relation pk_rel; ! HeapTuple new_row; ! HeapTuple old_row; ! RI_QueryKey qkey; ! SPIPlanPtr qplan; ! int i; ! bool use_cached_query; ! ! /* ! * Check that this is a valid trigger call on the right time and event. ! */ ! ri_CheckTrigger(fcinfo, "RI_FKey_eachsetnull_upd", RI_TRIGTYPE_UPDATE); ! ! /* ! * Get arguments. ! */ ! ri_FetchConstraintInfo(&riinfo, ! trigdata->tg_trigger, trigdata->tg_relation, true); ! ! /* ! * Nothing to do if no column names to compare given ! */ ! if (riinfo.nkeys == 0) ! return PointerGetDatum(NULL); ! ! /* ! * Get the relation descriptors of the FK and PK tables and the new and ! * old tuple. ! * ! * fk_rel is opened in RowExclusiveLock mode since that's what our ! * eventual UPDATE will get on it. ! */ ! fk_rel = heap_open(riinfo.fk_relid, RowExclusiveLock); ! pk_rel = trigdata->tg_relation; ! new_row = trigdata->tg_newtuple; ! old_row = trigdata->tg_trigtuple; ! ! switch (riinfo.confmatchtype) ! { ! /* ---------- ! * SQL3 11.9 ! * General rules 7) a) ii) 2): ! * MATCH FULL ! * ... ON UPDATE SET NULL ! * ---------- ! */ ! case FKCONSTR_MATCH_UNSPECIFIED: ! case FKCONSTR_MATCH_FULL: !
ri_BuildQueryKeyFull(&qkey, &riinfo, ! RI_PLAN_EACHSETNULL_UPD_DOUPDATE); ! ! switch (ri_NullCheck(pk_rel, old_row, &qkey, RI_KEYPAIR_PK_IDX)) ! { ! case RI_KEYS_ALL_NULL: ! case RI_KEYS_SOME_NULL: ! ! /* ! * No update - MATCH FULL means there cannot be any ! * reference to old key if it contains NULL ! */ ! heap_close(fk_rel, RowExclusiveLock); ! return PointerGetDatum(NULL); ! ! case RI_KEYS_NONE_NULL: ! ! /* ! * Have a full qualified key - continue below ! */ ! break; ! } ! ! /* ! * No need to do anything if old and new keys are equal ! */ ! if (ri_KeysEqual(pk_rel, old_row, new_row, &riinfo, true)) ! { ! heap_close(fk_rel, RowExclusiveLock); ! return PointerGetDatum(NULL); ! } ! ! if (SPI_connect() != SPI_OK_CONNECT) ! elog(ERROR, "SPI_connect failed"); ! ! /* ! * "MATCH <unspecified>" only changes columns corresponding to the ! * referenced columns that have changed in pk_rel. This means the ! * "SET attrn=array_replace(...) [, ...]" string will change as well. ! * In this case, we need to build a temporary plan rather than use ! * our cached plan, unless the update happens to change all ! * columns in the key. Fortunately, for the most common case of a ! * single-column foreign key, this will be true. ! * ! * In case you're wondering, the inequality check works because we ! * know that the old key value has no NULLs (see above). ! */ ! ! use_cached_query = (riinfo.confmatchtype == FKCONSTR_MATCH_FULL) || ! ri_AllKeysUnequal(pk_rel, old_row, new_row, ! &riinfo, true); ! ! /* ! * Fetch or prepare a saved plan for the set null update operation ! * if possible, or build a temporary plan if not. ! */ ! if (!use_cached_query || ! (qplan = ri_FetchPreparedPlan(&qkey)) == NULL) ! { ! StringInfoData querybuf; ! StringInfoData qualbuf; ! char fkrelname[MAX_QUOTED_REL_NAME_LEN]; ! char attname[MAX_QUOTED_NAME_LEN]; ! char paramname[16]; ! const char *querysep; ! const char *qualsep; ! Oid queryoids[RI_MAX_NUMKEYS]; ! ! /* ---------- ! * The query string built is ! * UPDATE ONLY !
* SET fkatt1 = array_replace(fkatt1, $1, NULL) [, ...] ! * WHERE $n = ANY(fkatt1) [AND ...] ! * The type id's for the $ parameters are those of the ! * corresponding PK attributes. ! * ---------- ! */ ! initStringInfo(&querybuf); ! initStringInfo(&qualbuf); ! quoteRelationName(fkrelname, fk_rel); ! appendStringInfo(&querybuf, "UPDATE ONLY %s SET", fkrelname); ! querysep = ""; ! qualsep = "WHERE"; ! for (i = 0; i < riinfo.nkeys; i++) ! { ! Oid pk_type = RIAttType(pk_rel, riinfo.pk_attnums[i]); ! Oid fk_type = RIAttType(fk_rel, riinfo.fk_attnums[i]); ! ! quoteOneName(attname, ! RIAttName(fk_rel, riinfo.fk_attnums[i])); ! ! /* ! * MATCH - only change columns corresponding ! * to changed columns in pk_rel's key ! */ ! if (riinfo.confmatchtype == FKCONSTR_MATCH_FULL || ! !ri_OneKeyEqual(pk_rel, i, old_row, new_row, ! &riinfo, true)) ! { ! Oid fk_element_type = get_base_element_type(fk_type); ! appendStringInfo(&querybuf, ! "%s %s = array_replace(%s, $%d", ! querysep, attname, attname, i + 1); ! if (pk_type != fk_element_type) ! ri_add_cast_to(&querybuf, fk_element_type); ! appendStringInfo(&querybuf, ", NULL"); ! if (pk_type != fk_element_type) ! ri_add_cast_to(&querybuf, fk_element_type); ! appendStringInfo(&querybuf,")"); ! querysep = ","; ! } ! sprintf(paramname, "$%d", i + 1); ! ri_GenerateQual(&qualbuf, qualsep, ! paramname, pk_type, ! riinfo.pf_eq_oprs[i], ! attname, fk_type, ! riinfo.fk_each_atts[i]); ! qualsep = "AND"; ! queryoids[i] = pk_type; ! } ! appendStringInfoString(&querybuf, qualbuf.data); ! ! /* ! * Prepare the plan. Save it only if we're building the ! * "standard" plan. ! */ ! qplan = ri_PlanCheck(querybuf.data, riinfo.nkeys, queryoids, ! &qkey, fk_rel, pk_rel, ! use_cached_query); ! } ! ! /* ! * We have a plan now. Run it to update the existing references. ! */ ! ri_PerformCheck(&qkey, qplan, ! fk_rel, pk_rel, ! old_row, NULL, ! true, /* must detect new rows */ ! SPI_OK_UPDATE, ! NameStr(riinfo.conname)); ! ! if (SPI_finish() != SPI_OK_FINISH) ! 
elog(ERROR, "SPI_finish failed"); ! ! heap_close(fk_rel, RowExclusiveLock); ! ! return PointerGetDatum(NULL); ! ! /* ! * Handle MATCH PARTIAL set null update. ! */ ! case FKCONSTR_MATCH_PARTIAL: ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("MATCH PARTIAL not yet implemented"))); ! return PointerGetDatum(NULL); ! } ! ! /* ! * Never reached ! */ ! elog(ERROR, "invalid confmatchtype"); ! return PointerGetDatum(NULL); ! } ! ! ! /* ---------- ! * RI_FKey_keyequal_upd_pk - ! * ! * Check if we have a key change on an update to a PK relation. This is ! * used by the AFTER trigger queue manager to see if it can skip queuing ! * an instance of an RI trigger. ! * ---------- ! */ ! bool ! RI_FKey_keyequal_upd_pk(Trigger *trigger, Relation pk_rel, ! HeapTuple old_row, HeapTuple new_row) ! { ! RI_ConstraintInfo riinfo; ! ! /* ! * Get arguments. ! */ ! ri_FetchConstraintInfo(&riinfo, trigger, pk_rel, true); ! ! /* ! * Nothing to do if no column names to compare given ! */ ! if (riinfo.nkeys == 0) ! return true; ! ! switch (riinfo.confmatchtype) ! { ! case FKCONSTR_MATCH_UNSPECIFIED: ! case FKCONSTR_MATCH_FULL: ! /* Return true if keys are equal */ ! return ri_KeysEqual(pk_rel, old_row, new_row, &riinfo, true); ! ! /* MATCH PARTIAL is not implemented: report an error. */ ! case FKCONSTR_MATCH_PARTIAL: ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("MATCH PARTIAL not yet implemented"))); ! break; ! } ! ! /* Never reached */ ! elog(ERROR, "invalid confmatchtype"); ! return false; ! } ! ! /* ---------- ! * RI_FKey_keyequal_upd_fk - ! * ! * Check if we have a key change on an update to an FK relation. This is ! * used by the AFTER trigger queue manager to see if it can skip queuing ! * an instance of an RI trigger. ! * ---------- ! */ ! bool ! RI_FKey_keyequal_upd_fk(Trigger *trigger, Relation fk_rel, ! HeapTuple old_row, HeapTuple new_row) ! { ! RI_ConstraintInfo riinfo; ! ! /* ! * Get arguments. ! */ !
ri_FetchConstraintInfo(&riinfo, trigger, fk_rel, false); ! ! /* ! * Nothing to do if no column names to compare given ! */ ! if (riinfo.nkeys == 0) ! return true; ! ! switch (riinfo.confmatchtype) ! { ! case FKCONSTR_MATCH_UNSPECIFIED: ! case FKCONSTR_MATCH_FULL: ! /* Return true if keys are equal */ ! return ri_KeysEqual(fk_rel, old_row, new_row, &riinfo, false); ! ! /* MATCH PARTIAL is not implemented: report an error. */ ! case FKCONSTR_MATCH_PARTIAL: ! ereport(ERROR, ! (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ! errmsg("MATCH PARTIAL not yet implemented"))); ! break; ! } ! ! /* Never reached */ ! elog(ERROR, "invalid confmatchtype"); ! return false; ! } ! ! /* ---------- ! * RI_Initial_Check - ! * ! * Check an entire table for non-matching values using a single query. ! * This is not a trigger procedure, but is called during ALTER TABLE ! * ADD FOREIGN KEY to validate the initial table contents. ! * ! * We expect that the caller has made provision to prevent any problems ! * caused by concurrent actions. This could be either by locking rel and ! * pkrel at ShareRowExclusiveLock or higher, or by otherwise ensuring ! * that triggers implementing the checks are already active. ! * Hence, we do not need to lock individual rows for the check. ! * ! * If the check fails because the current user doesn't have permissions ! * to read both tables, return false to let our caller know that they will ! * need to do something else to check the constraint. ! * ---------- ! */ ! bool ! RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) ! { ! RI_ConstraintInfo riinfo; ! const char *constrname = trigger->tgname; ! StringInfoData querybuf; ! StringInfoData qualbuf; ! StringInfoData recheckbuf; ! char pkrelname[MAX_QUOTED_REL_NAME_LEN]; ! char fkrelname[MAX_QUOTED_REL_NAME_LEN]; char pkattname[MAX_QUOTED_NAME_LEN + 3]; char fkattname[MAX_QUOTED_NAME_LEN + 3]; RangeTblEntry *pkrte; RangeTblEntry *fkrte; !
const char *sep, *qual_sep, *recheck_sep; int i; int save_nestlevel; char workmembuf[32]; *************** *** 2678,2695 **** RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) * The query string built is: * SELECT fk.keycols FROM ONLY relname fk * LEFT OUTER JOIN ONLY pkrelname pk ! * ON (pk.pkkeycol1=fk.keycol1 [AND ...]) * WHERE pk.pkkeycol1 IS NULL AND * For MATCH unspecified: * (fk.keycol1 IS NOT NULL [AND ...]) * For MATCH FULL: * (fk.keycol1 IS NOT NULL [OR ...]) * * We attach COLLATE clauses to the operators when comparing columns * that have different collations. *---------- */ initStringInfo(&querybuf); appendStringInfo(&querybuf, "SELECT "); sep = ""; for (i = 0; i < riinfo.nkeys; i++) --- 3527,3557 ---- * The query string built is: * SELECT fk.keycols FROM ONLY relname fk * LEFT OUTER JOIN ONLY pkrelname pk ! * ON (pk.pkkeycol1=fk.keycol1 [AND ...] ! * [AND NOT EXISTS()]) * WHERE pk.pkkeycol1 IS NULL AND * For MATCH unspecified: * (fk.keycol1 IS NOT NULL [AND ...]) * For MATCH FULL: * (fk.keycol1 IS NOT NULL [OR ...]) * + * In case of an EACH foreign key, a recheck subquery is added to + * the join condition in order to check that every combination of keys + * is actually referenced. + * The RECHECK_SUBQUERY is + * SELECT 1 FROM + * unnest(fk.keycol1) x1(x1) [CROSS JOIN ...] + * LEFT OUTER JOIN ONLY pkrelname pk + * ON (pk.pkkeycol1=x1.x1 [AND ...]) + * WHERE pk.pkkeycol1 IS NULL AND + * (fk.keycol1 IS NOT NULL [AND ...]) + * * We attach COLLATE clauses to the operators when comparing columns * that have different collations. 
*---------- */ initStringInfo(&querybuf); + initStringInfo(&qualbuf); appendStringInfo(&querybuf, "SELECT "); sep = ""; for (i = 0; i < riinfo.nkeys; i++) *************** *** 2706,2711 **** RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) --- 3568,3584 ---- " FROM ONLY %s fk LEFT OUTER JOIN ONLY %s pk ON", fkrelname, pkrelname); + if (riinfo.confiseach) + { + initStringInfo(&recheckbuf); + appendStringInfo(&recheckbuf, + "SELECT 1 FROM"); + appendStringInfo(&qualbuf, + "LEFT OUTER JOIN ONLY %s pk ON", + pkrelname); + recheck_sep = ""; + qual_sep = "("; + } strcpy(pkattname, "pk."); strcpy(fkattname, "fk."); sep = "("; *************** *** 2723,2748 **** RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) ri_GenerateQual(&querybuf, sep, pkattname, pk_type, riinfo.pf_eq_oprs[i], ! fkattname, fk_type); if (pk_coll != fk_coll) ri_GenerateQualCollation(&querybuf, pk_coll); sep = "AND"; } /* * It's sufficient to test any one pk attribute for null to detect a join ! * failure. */ quoteOneName(pkattname, RIAttName(pk_rel, riinfo.pk_attnums[0])); ! appendStringInfo(&querybuf, ") WHERE pk.%s IS NULL AND (", pkattname); sep = ""; for (i = 0; i < riinfo.nkeys; i++) { quoteOneName(fkattname, RIAttName(fk_rel, riinfo.fk_attnums[i])); ! appendStringInfo(&querybuf, "%sfk.%s IS NOT NULL", sep, fkattname); switch (riinfo.confmatchtype) { case FKCONSTR_MATCH_UNSPECIFIED: --- 3596,3664 ---- ri_GenerateQual(&querybuf, sep, pkattname, pk_type, riinfo.pf_eq_oprs[i], ! fkattname, fk_type, ! riinfo.fk_each_atts[i]); if (pk_coll != fk_coll) ri_GenerateQualCollation(&querybuf, pk_coll); sep = "AND"; + + /* + * In case of an EACH foreign key, we check if there is at least + * a value in the array that is not present in the PK table. 
+ */ + if (riinfo.fk_each_atts[i]) { + Oid fk_element_type = get_base_element_type(fk_type); + char unnest_name[16]; + + sprintf(unnest_name, "x%d.x%d", i + 1, i + 1); + appendStringInfo(&recheckbuf, + " %s unnest(%s) x%d(x%d)", + recheck_sep, fkattname, i + 1, i + 1); + ri_GenerateQual(&qualbuf, qual_sep, + pkattname, pk_type, + riinfo.pf_eq_oprs[i], + unnest_name, fk_element_type, + false); + if (pk_coll != fk_coll) + ri_GenerateQualCollation(&qualbuf, pk_coll); + recheck_sep = "CROSS JOIN"; + qual_sep = "AND"; + } } /* * It's sufficient to test any one pk attribute for null to detect a join ! * failure in the recheck subquery. */ quoteOneName(pkattname, RIAttName(pk_rel, riinfo.pk_attnums[0])); ! if (riinfo.confiseach) ! { ! appendStringInfo(&recheckbuf, " %s) WHERE pk.%s IS NULL AND (", ! qualbuf.data, pkattname); ! resetStringInfo(&qualbuf); ! recheck_sep = ""; ! } sep = ""; for (i = 0; i < riinfo.nkeys; i++) { quoteOneName(fkattname, RIAttName(fk_rel, riinfo.fk_attnums[i])); ! appendStringInfo(&qualbuf, "%sfk.%s IS NOT NULL", sep, fkattname); + + /* + * In the recheck subquery we always ignore any NULL value + * inside the array. + */ + if (riinfo.fk_each_atts[i]) + { + appendStringInfo(&recheckbuf, + "%sx%d.x%d IS NOT NULL", + recheck_sep, i + 1, i + 1); + recheck_sep = " AND "; + } + switch (riinfo.confmatchtype) { case FKCONSTR_MATCH_UNSPECIFIED: *************** *** 2762,2768 **** RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel) break; } } ! appendStringInfo(&querybuf, ")"); /* * Temporarily increase work_mem so that the check query can be executed --- 3678,3692 ---- break; } } ! if (riinfo.confiseach) ! appendStringInfo(&querybuf, " AND NOT EXISTS(%s))", recheckbuf.data); ! ! /* ! * It's sufficient to test any one pk attribute for null to detect a join ! * failure. ! */ ! appendStringInfo(&querybuf, ") WHERE pk.%s IS NULL AND (%s)", ! 
pkattname, qualbuf.data); /* * Temporarily increase work_mem so that the check query can be executed *************** *** 2928,2934 **** ri_GenerateQual(StringInfo buf, const char *sep, const char *leftop, Oid leftoptype, Oid opoid, ! const char *rightop, Oid rightoptype) { HeapTuple opertup; Form_pg_operator operform; --- 3852,3859 ---- const char *sep, const char *leftop, Oid leftoptype, Oid opoid, ! const char *rightop, Oid rightoptype, ! bool is_array) { HeapTuple opertup; Form_pg_operator operform; *************** *** 2949,2957 **** ri_GenerateQual(StringInfo buf, ri_add_cast_to(buf, operform->oprleft); appendStringInfo(buf, " OPERATOR(%s.", quote_identifier(nspname)); appendStringInfoString(buf, oprname); ! appendStringInfo(buf, ") %s", rightop); ! if (rightoptype != operform->oprright) ! ri_add_cast_to(buf, operform->oprright); ReleaseSysCache(opertup); } --- 3874,3896 ---- ri_add_cast_to(buf, operform->oprleft); appendStringInfo(buf, " OPERATOR(%s.", quote_identifier(nspname)); appendStringInfoString(buf, oprname); ! /* ! * If rightoptype is an array of leftoptype check equality using ANY(). ! * Needed for array support in foreign keys. ! */ ! if (is_array) ! { ! appendStringInfo(buf, ") ANY (%s", rightop); ! if (rightoptype != get_array_type (operform->oprright)) ! ri_add_cast_to(buf, get_array_type (operform->oprright)); ! appendStringInfo(buf, ")"); ! } ! else ! { ! appendStringInfo(buf, ") %s", rightop); ! if (rightoptype != operform->oprright) ! ri_add_cast_to(buf, operform->oprright); ! } ReleaseSysCache(opertup); } *************** *** 3179,3184 **** ri_FetchConstraintInfo(RI_ConstraintInfo *riinfo, --- 4118,4124 ---- riinfo->confupdtype = conForm->confupdtype; riinfo->confdeltype = conForm->confdeltype; riinfo->confmatchtype = conForm->confmatchtype; + riinfo->confiseach = conForm->confiseach; /* * We expect the arrays to be 1-D arrays of the right types; verify that. 
*************** *** 3219,3224 **** ri_FetchConstraintInfo(RI_ConstraintInfo *riinfo, --- 4159,4180 ---- pfree(arr); /* free de-toasted copy, if any */ adatum = SysCacheGetAttr(CONSTROID, tup, + Anum_pg_constraint_confeach, &isNull); + if (isNull) + elog(ERROR, "null confeach for constraint %u", constraintOid); + arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */ + numkeys = ARR_DIMS(arr)[0]; + if (ARR_NDIM(arr) != 1 || + numkeys != riinfo->nkeys || + numkeys > RI_MAX_NUMKEYS || + ARR_HASNULL(arr) || + ARR_ELEMTYPE(arr) != BOOLOID) + elog(ERROR, "confeach is not a 1-D boolean array"); + memcpy(riinfo->fk_each_atts, ARR_DATA_PTR(arr), numkeys * sizeof(int16)); + if ((Pointer) arr != DatumGetPointer(adatum)) + pfree(arr); /* free de-toasted copy, if any */ + + adatum = SysCacheGetAttr(CONSTROID, tup, Anum_pg_constraint_conpfeqop, &isNull); if (isNull) elog(ERROR, "null conpfeqop for constraint %u", constraintOid); *************** *** 4077,4086 **** RI_FKey_trigger_type(Oid tgfoid) --- 5033,5046 ---- { case F_RI_FKEY_CASCADE_DEL: case F_RI_FKEY_CASCADE_UPD: + case F_RI_FKEY_EACHCASCADE_DEL: + case F_RI_FKEY_EACHCASCADE_UPD: case F_RI_FKEY_RESTRICT_DEL: case F_RI_FKEY_RESTRICT_UPD: case F_RI_FKEY_SETNULL_DEL: case F_RI_FKEY_SETNULL_UPD: + case F_RI_FKEY_EACHSETNULL_DEL: + case F_RI_FKEY_EACHSETNULL_UPD: case F_RI_FKEY_SETDEFAULT_DEL: case F_RI_FKEY_SETDEFAULT_UPD: case F_RI_FKEY_NOACTION_DEL: *** a/src/backend/utils/adt/ruleutils.c --- b/src/backend/utils/adt/ruleutils.c *************** *** 159,164 **** static char *pg_get_viewdef_worker(Oid viewoid, int prettyFlags); --- 159,167 ---- static char *pg_get_triggerdef_worker(Oid trigid, bool pretty); static void decompile_column_index_array(Datum column_index_array, Oid relId, StringInfo buf); + static void decompile_fk_column_index_array(Datum column_index_array, + Datum each_array, + Oid relId, StringInfo buf); static char *pg_get_ruledef_worker(Oid ruleoid, int prettyFlags); static char 
*pg_get_indexdef_worker(Oid indexrelid, int colno, const Oid *excludeOps, *************** *** 1144,1149 **** pg_get_constraintdef_worker(Oid constraintId, bool fullCommand, --- 1147,1153 ---- case CONSTRAINT_FOREIGN: { Datum val; + Datum each; bool isnull; const char *string; *************** *** 1156,1166 **** pg_get_constraintdef_worker(Oid constraintId, bool fullCommand, if (isnull) elog(ERROR, "null conkey for constraint %u", constraintId); ! decompile_column_index_array(val, conForm->conrelid, &buf); /* add foreign relation name */ ! appendStringInfo(&buf, ") REFERENCES %s(", generate_relation_name(conForm->confrelid, NIL)); --- 1160,1177 ---- if (isnull) elog(ERROR, "null conkey for constraint %u", constraintId); + each = SysCacheGetAttr(CONSTROID, tup, + Anum_pg_constraint_confeach, &isnull); + if (isnull) + elog(ERROR, "null confeach for constraint %u", + constraintId); ! decompile_fk_column_index_array(val, each, conForm->conrelid, &buf); ! ! appendStringInfo(&buf, ") REFERENCES "); /* add foreign relation name */ ! 
appendStringInfo(&buf, "%s(", generate_relation_name(conForm->confrelid, NIL)); *************** *** 1207,1215 **** pg_get_constraintdef_worker(Oid constraintId, bool fullCommand, --- 1218,1232 ---- case FKCONSTR_ACTION_CASCADE: string = "CASCADE"; break; + case FKCONSTR_ACTION_ARRCASCADE: + string = "EACH CASCADE"; + break; case FKCONSTR_ACTION_SETNULL: string = "SET NULL"; break; + case FKCONSTR_ACTION_ARRSETNULL: + string = "EACH SET NULL"; + break; case FKCONSTR_ACTION_SETDEFAULT: string = "SET DEFAULT"; break; *************** *** 1233,1241 **** pg_get_constraintdef_worker(Oid constraintId, bool fullCommand, --- 1250,1264 ---- case FKCONSTR_ACTION_CASCADE: string = "CASCADE"; break; + case FKCONSTR_ACTION_ARRCASCADE: + string = "EACH CASCADE"; + break; case FKCONSTR_ACTION_SETNULL: string = "SET NULL"; break; + case FKCONSTR_ACTION_ARRSETNULL: + string = "EACH SET NULL"; + break; case FKCONSTR_ACTION_SETDEFAULT: string = "SET DEFAULT"; break; *************** *** 1440,1445 **** decompile_column_index_array(Datum column_index_array, Oid relId, --- 1463,1513 ---- } } + /* + * Convert an int16[] Datum and an bool[] Datum into a comma-separated + * list of column names for the indicated relation prefixed by + * an optional EACH keyword; append the list to buf. + * + * The two arrays must have the same cardinality. 
+ */ + static void + decompile_fk_column_index_array(Datum column_index_array, + Datum each_array, + Oid relId, StringInfo buf) + { + Datum *keys; + int nKeys; + Datum *bools; + int nBools; + int j; + + /* Extract data from array of int16 */ + deconstruct_array(DatumGetArrayTypeP(column_index_array), + INT2OID, 2, true, 's', + &keys, NULL, &nKeys); + + /* Extract data from array of bool */ + deconstruct_array(DatumGetArrayTypeP(each_array), + BOOLOID, 1, true, 'c', + &bools, NULL, &nBools); + + if (nKeys != nBools) + elog(ERROR, "wrong confeach cardinality"); + + for (j = 0; j < nKeys; j++) + { + char *colName; + char *each; + + colName = get_relid_attribute_name(relId, DatumGetInt16(keys[j])); + each = DatumGetBool(bools[j])?"EACH ":""; + + if (j == 0) + appendStringInfo(buf, "%s%s", each, quote_identifier(colName)); + else + appendStringInfo(buf, ", %s%s", each, quote_identifier(colName)); + } + } /* ---------- * get_expr - Decompile an expression tree *** a/src/include/catalog/pg_constraint.h --- b/src/include/catalog/pg_constraint.h *************** *** 91,96 **** CATALOG(pg_constraint,2606) --- 91,99 ---- /* Has a local definition and cannot be inherited */ bool conisonly; + /* true if an EACH REFERENCE foreign key */ + bool confiseach; + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* * Columns of conrelid that the constraint applies to, if known (this is *************** *** 104,109 **** CATALOG(pg_constraint,2606) --- 107,118 ---- int2 confkey[1]; /* + * If a foreign key, true if EACH foreign key for each column of + * the constraint + */ + bool confeach[1]; + + /* * If a foreign key, the OIDs of the PK = FK equality operators for each * column of the constraint */ *************** *** 150,156 **** typedef FormData_pg_constraint *Form_pg_constraint; * compiler constants for pg_constraint * ---------------- */ ! 
#define Natts_pg_constraint 24 #define Anum_pg_constraint_conname 1 #define Anum_pg_constraint_connamespace 2 #define Anum_pg_constraint_contype 3 --- 159,165 ---- * compiler constants for pg_constraint * ---------------- */ ! #define Natts_pg_constraint 26 #define Anum_pg_constraint_conname 1 #define Anum_pg_constraint_connamespace 2 #define Anum_pg_constraint_contype 3 *************** *** 167,180 **** typedef FormData_pg_constraint *Form_pg_constraint; #define Anum_pg_constraint_conislocal 14 #define Anum_pg_constraint_coninhcount 15 #define Anum_pg_constraint_conisonly 16 ! #define Anum_pg_constraint_conkey 17 ! #define Anum_pg_constraint_confkey 18 ! #define Anum_pg_constraint_conpfeqop 19 ! #define Anum_pg_constraint_conppeqop 20 ! #define Anum_pg_constraint_conffeqop 21 ! #define Anum_pg_constraint_conexclop 22 ! #define Anum_pg_constraint_conbin 23 ! #define Anum_pg_constraint_consrc 24 /* Valid values for contype */ --- 176,191 ---- #define Anum_pg_constraint_conislocal 14 #define Anum_pg_constraint_coninhcount 15 #define Anum_pg_constraint_conisonly 16 ! #define Anum_pg_constraint_coniseach 17 ! #define Anum_pg_constraint_conkey 18 ! #define Anum_pg_constraint_confkey 19 ! #define Anum_pg_constraint_confeach 20 ! #define Anum_pg_constraint_conpfeqop 21 ! #define Anum_pg_constraint_conppeqop 22 ! #define Anum_pg_constraint_conffeqop 23 ! #define Anum_pg_constraint_conexclop 24 ! #define Anum_pg_constraint_conbin 25 ! 
#define Anum_pg_constraint_consrc 26 /* Valid values for contype */ *************** *** 221,226 **** extern Oid CreateConstraintEntry(const char *constraintName, --- 232,239 ---- const Oid *ppEqOp, const Oid *ffEqOp, int foreignNKeys, + bool confisEach, + const bool *foreignEach, char foreignUpdateType, char foreignDeleteType, char foreignMatchType, *** a/src/include/catalog/pg_proc.h --- b/src/include/catalog/pg_proc.h *************** *** 870,875 **** DESCR("aggregate final function"); --- 870,880 ---- DATA(insert OID = 2335 ( array_agg PGNSP PGUID 12 1 0 0 0 t f f f f f i 1 0 2277 "2283" _null_ _null_ _null_ _null_ aggregate_dummy _null_ _null_ _null_ )); DESCR("concatenate aggregate input into an array"); + DATA(insert OID = 3160 ( array_remove PGNSP PGUID 12 1 0 0 0 f f f f f f i 2 0 2277 "2277 2283" _null_ _null_ _null_ _null_ array_remove _null_ _null_ _null_ )); + DESCR("remove any occurrence of an element from an array"); + DATA(insert OID = 3161 ( array_replace PGNSP PGUID 12 1 0 0 0 f f f f f f i 3 0 2277 "2277 2283 2283" _null_ _null_ _null_ _null_ array_replace _null_ _null_ _null_ )); + DESCR("replace any occurrence of an element in an array"); + DATA(insert OID = 760 ( smgrin PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 210 "2275" _null_ _null_ _null_ _null_ smgrin _null_ _null_ _null_ )); DESCR("I/O"); DATA(insert OID = 761 ( smgrout PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 2275 "210" _null_ _null_ _null_ _null_ smgrout _null_ _null_ _null_ )); *************** *** 1984,1989 **** DESCR("referential integrity ON DELETE NO ACTION"); --- 1989,2003 ---- DATA(insert OID = 1655 ( RI_FKey_noaction_upd PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ RI_FKey_noaction_upd _null_ _null_ _null_ )); DESCR("referential integrity ON UPDATE NO ACTION"); + DATA(insert OID = 3162 ( RI_FKey_eachcascade_del PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ RI_FKey_eachcascade_del _null_ _null_ _null_ )); + 
DESCR("referential integrity ON DELETE EACH CASCADE"); + DATA(insert OID = 3164 ( RI_FKey_eachcascade_upd PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ RI_FKey_eachcascade_upd _null_ _null_ _null_ )); + DESCR("referential integrity ON UPDATE EACH CASCADE"); + DATA(insert OID = 3165 ( RI_FKey_eachsetnull_del PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ RI_FKey_eachsetnull_del _null_ _null_ _null_ )); + DESCR("referential integrity ON DELETE EACH SET NULL"); + DATA(insert OID = 3166 ( RI_FKey_eachsetnull_upd PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 2279 "" _null_ _null_ _null_ _null_ RI_FKey_eachsetnull_upd _null_ _null_ _null_ )); + DESCR("referential integrity ON UPDATE EACH SET NULL"); + DATA(insert OID = 1666 ( varbiteq PGNSP PGUID 12 1 0 0 0 f f f t t f i 2 0 16 "1562 1562" _null_ _null_ _null_ _null_ biteq _null_ _null_ _null_ )); DATA(insert OID = 1667 ( varbitne PGNSP PGUID 12 1 0 0 0 f f f t t f i 2 0 16 "1562 1562" _null_ _null_ _null_ _null_ bitne _null_ _null_ _null_ )); DATA(insert OID = 1668 ( varbitge PGNSP PGUID 12 1 0 0 0 f f f t t f i 2 0 16 "1562 1562" _null_ _null_ _null_ _null_ bitge _null_ _null_ _null_ )); *** a/src/include/nodes/nodes.h --- b/src/include/nodes/nodes.h *************** *** 395,400 **** typedef enum NodeTag --- 395,401 ---- T_XmlSerialize, T_WithClause, T_CommonTableExpr, + T_ForeignKeyColumnElem, /* * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h) *** a/src/include/nodes/parsenodes.h --- b/src/include/nodes/parsenodes.h *************** *** 570,575 **** typedef struct DefElem --- 570,589 ---- } DefElem; /* + * ForeignKeyColumnElem - foreign key column (used in foreign key constraint) + * + * For a foreign key attribute, 'name' is the name of the table column, + * and 'each' is true if it is an EACH foreign key column. 
+ */ + typedef struct ForeignKeyColumnElem + { + NodeTag type; + Node *name; /* name of the column, or NULL */ + bool each; /* true if an EACH foreign key */ + + } ForeignKeyColumnElem; + + /* * LockingClause - raw representation of FOR UPDATE/SHARE options * * Note: lockedRels == NIL means "all relations in query". Otherwise it *************** *** 1510,1515 **** typedef enum ConstrType /* types of constraints */ --- 1524,1531 ---- #define FKCONSTR_ACTION_CASCADE 'c' #define FKCONSTR_ACTION_SETNULL 'n' #define FKCONSTR_ACTION_SETDEFAULT 'd' + #define FKCONSTR_ACTION_ARRCASCADE 'C' + #define FKCONSTR_ACTION_ARRSETNULL 'N' /* Foreign key matchtype codes */ #define FKCONSTR_MATCH_FULL 'f' *************** *** 1552,1557 **** typedef struct Constraint --- 1568,1575 ---- char fk_matchtype; /* FULL, PARTIAL, UNSPECIFIED */ char fk_upd_action; /* ON UPDATE action */ char fk_del_action; /* ON DELETE action */ + bool fk_is_each; /* is EACH REFERENCE foreign key? */ + List *fk_each_attrs; /* EACH REFERENCE attrs */ /* Fields used for constraints that allow a NOT VALID specification */ bool skip_validation; /* skip validation of existing rows? 
*/ *** a/src/include/utils/array.h --- b/src/include/utils/array.h *************** *** 211,216 **** extern Datum generate_subscripts_nodir(PG_FUNCTION_ARGS); --- 211,218 ---- extern Datum array_fill(PG_FUNCTION_ARGS); extern Datum array_fill_with_lower_bounds(PG_FUNCTION_ARGS); extern Datum array_unnest(PG_FUNCTION_ARGS); + extern Datum array_remove(PG_FUNCTION_ARGS); + extern Datum array_replace(PG_FUNCTION_ARGS); extern Datum array_ref(ArrayType *array, int nSubscripts, int *indx, int arraytyplen, int elmlen, bool elmbyval, char elmalign, *** a/src/include/utils/builtins.h --- b/src/include/utils/builtins.h *************** *** 984,993 **** extern Datum RI_FKey_noaction_del(PG_FUNCTION_ARGS); --- 984,997 ---- extern Datum RI_FKey_noaction_upd(PG_FUNCTION_ARGS); extern Datum RI_FKey_cascade_del(PG_FUNCTION_ARGS); extern Datum RI_FKey_cascade_upd(PG_FUNCTION_ARGS); + extern Datum RI_FKey_eachcascade_del(PG_FUNCTION_ARGS); + extern Datum RI_FKey_eachcascade_upd(PG_FUNCTION_ARGS); extern Datum RI_FKey_restrict_del(PG_FUNCTION_ARGS); extern Datum RI_FKey_restrict_upd(PG_FUNCTION_ARGS); extern Datum RI_FKey_setnull_del(PG_FUNCTION_ARGS); extern Datum RI_FKey_setnull_upd(PG_FUNCTION_ARGS); + extern Datum RI_FKey_eachsetnull_del(PG_FUNCTION_ARGS); + extern Datum RI_FKey_eachsetnull_upd(PG_FUNCTION_ARGS); extern Datum RI_FKey_setdefault_del(PG_FUNCTION_ARGS); extern Datum RI_FKey_setdefault_upd(PG_FUNCTION_ARGS); *** a/src/test/regress/expected/arrays.out --- b/src/test/regress/expected/arrays.out *************** *** 1542,1547 **** select unnest(array[1,2,3,null,4,null,null,5,6]::text[]); --- 1542,1603 ---- 6 (9 rows) + select array_remove(array[1,2,2,3], 2); + array_remove + -------------- + {1,3} + (1 row) + + select array_remove(array[1,2,2,3], 5); + array_remove + -------------- + {1,2,2,3} + (1 row) + + select array_remove(array[1,NULL,NULL,3], NULL); + array_remove + -------------- + {1,3} + (1 row) + + select array_remove(array['A','C','D','C','R'], 'R'); + 
array_remove + -------------- + {A,C,D,C} + (1 row) + + select array_remove(array[1,3,4,5,6,10], 5); + array_remove + -------------- + {1,3,4,6,10} + (1 row) + + select array_remove('{{1,2,2},{1,4,3}}', 2); -- fails + ERROR: removing elements is not supported on multidimensional arrays + select array_replace(array[1,2,5,4],5,3); + array_replace + --------------- + {1,2,3,4} + (1 row) + + select array_replace(array[1,2,5,4],5,NULL); + array_replace + --------------- + {1,2,NULL,4} + (1 row) + + select array_replace(array[1,2,NULL,4,NULL],NULL,5); + array_replace + --------------- + {1,2,5,4,5} + (1 row) + + select array_replace(array['A','B','D','B'],'B','C'); + array_replace + --------------- + {A,C,D,C} + (1 row) + -- Insert/update on a column that is array of composite create temp table t1 (f1 int8_tbl[]); insert into t1 (f1[5].q1) values(42); *** a/src/test/regress/expected/btree_index.out --- b/src/test/regress/expected/btree_index.out *************** *** 106,129 **** set enable_seqscan to false; set enable_indexscan to true; set enable_bitmapscan to false; select proname from pg_proc where proname like E'RI\\_FKey%del' order by 1; ! proname ! ------------------------ RI_FKey_cascade_del RI_FKey_noaction_del RI_FKey_restrict_del RI_FKey_setdefault_del RI_FKey_setnull_del ! (5 rows) set enable_indexscan to false; set enable_bitmapscan to true; select proname from pg_proc where proname like E'RI\\_FKey%del' order by 1; ! proname ! ------------------------ RI_FKey_cascade_del RI_FKey_noaction_del RI_FKey_restrict_del RI_FKey_setdefault_del RI_FKey_setnull_del ! (5 rows) --- 106,133 ---- set enable_indexscan to true; set enable_bitmapscan to false; select proname from pg_proc where proname like E'RI\\_FKey%del' order by 1; ! proname ! ------------------------- RI_FKey_cascade_del + RI_FKey_eachcascade_del + RI_FKey_eachsetnull_del RI_FKey_noaction_del RI_FKey_restrict_del RI_FKey_setdefault_del RI_FKey_setnull_del ! 
(7 rows) set enable_indexscan to false; set enable_bitmapscan to true; select proname from pg_proc where proname like E'RI\\_FKey%del' order by 1; ! proname ! ------------------------- RI_FKey_cascade_del + RI_FKey_eachcascade_del + RI_FKey_eachsetnull_del RI_FKey_noaction_del RI_FKey_restrict_del RI_FKey_setdefault_del RI_FKey_setnull_del ! (7 rows) *** /dev/null --- b/src/test/regress/expected/each_foreign_key.out *************** *** 0 **** --- 1,1044 ---- + -- EACH FK CONSTRAINTS (FOREIGN KEY ARRAYS) + -- + CREATE TABLE PKTABLEFORARRAY ( ptest1 int PRIMARY KEY, ptest2 text ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- Insert test data into PKTABLEFORARRAY + INSERT INTO PKTABLEFORARRAY VALUES (1, 'Test1'); + INSERT INTO PKTABLEFORARRAY VALUES (2, 'Test2'); + INSERT INTO PKTABLEFORARRAY VALUES (3, 'Test3'); + INSERT INTO PKTABLEFORARRAY VALUES (4, 'Test4'); + INSERT INTO PKTABLEFORARRAY VALUES (5, 'Test5'); + -- Check alter table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[], ftest2 int ); + ALTER TABLE FKTABLEFORARRAY ADD CONSTRAINT FKARRAY FOREIGN KEY (EACH ftest1) REFERENCES PKTABLEFORARRAY; + DROP TABLE FKTABLEFORARRAY; + -- Check alter table with rows + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[], ftest2 int ); + INSERT INTO FKTABLEFORARRAY VALUES ('{1}', 1); + ALTER TABLE FKTABLEFORARRAY ADD CONSTRAINT FKARRAY FOREIGN KEY (EACH ftest1) REFERENCES PKTABLEFORARRAY; + DROP TABLE FKTABLEFORARRAY; + -- Check alter table with failing rows + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[], ftest2 int ); + INSERT INTO FKTABLEFORARRAY VALUES ('{10,1}', 2); + ALTER TABLE FKTABLEFORARRAY ADD CONSTRAINT FKARRAY FOREIGN KEY (EACH ftest1) REFERENCES PKTABLEFORARRAY; + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fkarray" + DETAIL: Key (ftest1)=({10,1}) is not present in table "pktableforarray". 
+ DROP TABLE FKTABLEFORARRAY; + -- Check create table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY, ftest2 int ); + CREATE TABLE FKTABLEFORARRAYMDIM ( ftest1 int[][] EACH REFERENCES PKTABLEFORARRAY, ftest2 int ); + CREATE TABLE FKTABLEFORARRAYNOTNULL ( ftest1 int[] NOT NULL EACH REFERENCES PKTABLEFORARRAY, ftest2 int ); + -- Insert successful rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('{1}', 3); + INSERT INTO FKTABLEFORARRAY VALUES ('{2}', 4); + INSERT INTO FKTABLEFORARRAY VALUES ('{1}', 5); + INSERT INTO FKTABLEFORARRAY VALUES ('{3}', 6); + INSERT INTO FKTABLEFORARRAY VALUES ('{1}', 7); + INSERT INTO FKTABLEFORARRAY VALUES ('{4,5}', 8); + INSERT INTO FKTABLEFORARRAY VALUES ('{4,4}', 9); + INSERT INTO FKTABLEFORARRAY VALUES (NULL, 10); + INSERT INTO FKTABLEFORARRAY VALUES ('{}', 11); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,NULL}', 12); + INSERT INTO FKTABLEFORARRAY VALUES ('{NULL}', 13); + INSERT INTO FKTABLEFORARRAYMDIM VALUES ('{{4,5},{1,2},{1,3}}', 14); + INSERT INTO FKTABLEFORARRAYMDIM VALUES ('{{4,5},{NULL,2},{NULL,3}}', 15); + -- Insert failed rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('{6}', 16); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" + DETAIL: Key (ftest1)=({6}) is not present in table "pktableforarray". + INSERT INTO FKTABLEFORARRAY VALUES ('{4,6}', 17); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" + DETAIL: Key (ftest1)=({4,6}) is not present in table "pktableforarray". + INSERT INTO FKTABLEFORARRAY VALUES ('{6,NULL}', 18); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" + DETAIL: Key (ftest1)=({6,NULL}) is not present in table "pktableforarray". 
+ INSERT INTO FKTABLEFORARRAY VALUES ('{6,NULL,4,NULL}', 19); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" + DETAIL: Key (ftest1)=({6,NULL,4,NULL}) is not present in table "pktableforarray". + INSERT INTO FKTABLEFORARRAYMDIM VALUES ('{{1,2},{6,NULL}}', 20); + ERROR: insert or update on table "fktableforarraymdim" violates foreign key constraint "fktableforarraymdim_ftest1_fkey" + DETAIL: Key (ftest1)=({{1,2},{6,NULL}}) is not present in table "pktableforarray". + INSERT INTO FKTABLEFORARRAYNOTNULL VALUES (NULL, 21); + ERROR: null value in column "ftest1" violates not-null constraint + DETAIL: Failing row contains (null, 21). + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ----------+-------- + {1} | 3 + {2} | 4 + {1} | 5 + {3} | 6 + {1} | 7 + {4,5} | 8 + {4,4} | 9 + | 10 + {} | 11 + {1,NULL} | 12 + {NULL} | 13 + (11 rows) + + -- Delete a row from PK TABLE (must fail due to ON DELETE NO ACTION) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=1; + ERROR: update or delete on table "pktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" on table "fktableforarray" + DETAIL: Key (ptest1)=(1) is still referenced from table "fktableforarray". + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ----------+-------- + {1} | 3 + {2} | 4 + {1} | 5 + {3} | 6 + {1} | 7 + {4,5} | 8 + {4,4} | 9 + | 10 + {} | 11 + {1,NULL} | 12 + {NULL} | 13 + (11 rows) + + -- Update a row from PK TABLE (must fail due to ON UPDATE NO ACTION) + UPDATE PKTABLEFORARRAY SET ptest1=7 WHERE ptest1=1; + ERROR: update or delete on table "pktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" on table "fktableforarray" + DETAIL: Key (ptest1)=(1) is still referenced from table "fktableforarray". 
+ -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ----------+-------- + {1} | 3 + {2} | 4 + {1} | 5 + {3} | 6 + {1} | 7 + {4,5} | 8 + {4,4} | 9 + | 10 + {} | 11 + {1,NULL} | 12 + {NULL} | 13 + (11 rows) + + -- Check UPDATE on FKTABLE + UPDATE FKTABLEFORARRAY SET ftest1=ARRAY[1] WHERE ftest2=4; + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ----------+-------- + {1} | 3 + {1} | 5 + {3} | 6 + {1} | 7 + {4,5} | 8 + {4,4} | 9 + | 10 + {} | 11 + {1,NULL} | 12 + {NULL} | 13 + {1} | 4 + (11 rows) + + DROP TABLE FKTABLEFORARRAY; + DROP TABLE FKTABLEFORARRAYNOTNULL; + DROP TABLE FKTABLEFORARRAYMDIM; + -- Allowed references with actions + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE NO ACTION, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE SET NULL, ftest2 int ); + INSERT INTO PKTABLEFORARRAY VALUES (100, 'TO BE UPDATED'); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,100}', 100); + SELECT COUNT(*) FROM FKTABLEFORARRAY WHERE ftest1 IS NULL; -- Should return 0 + count + ------- + 0 + (1 row) + + UPDATE PKTABLEFORARRAY SET ptest1 = 1000, ptest2 = 'TO BE REMOVED' WHERE ptest1 = 100; + SELECT COUNT(*) FROM FKTABLEFORARRAY WHERE ftest1 IS NULL; -- Should return 1 + count + ------- + 1 + (1 row) + + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE 
FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE NO ACTION, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE NO ACTION, ftest2 int ); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,1000}', 100); + SELECT COUNT(*) FROM FKTABLEFORARRAY; -- Should return 1 + count + ------- + 1 + (1 row) + + DELETE FROM PKTABLEFORARRAY WHERE ptest1 = 1000; -- From the previous example + SELECT COUNT(*) FROM FKTABLEFORARRAY; -- Should return 0 + count + ------- + 0 + (1 row) + + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH 
REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE NO ACTION, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE NO ACTION, ftest2 int ); + INSERT INTO PKTABLEFORARRAY VALUES (100, 'TO BE REMOVED'); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,100}', 100); + SELECT COUNT(*) FROM FKTABLEFORARRAY; -- Should return 1 + count + ------- + 1 + (1 row) + + DELETE FROM PKTABLEFORARRAY WHERE ptest1 = 100; + SELECT 
COUNT(*) FROM FKTABLEFORARRAY WHERE ftest1 IS NULL; -- Should return 1 + count + ------- + 1 + (1 row) + + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE NO ACTION, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 
int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE NO ACTION, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + -- Not allowed references (ON UPDATE CASCADE) + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE CASCADE, ftest2 int ); + ERROR: ON UPDATE CASCADE action is not supported on EACH foreign keys + HINT: Use ON UPDATE EACH CASCADE, instead + DROP TABLE FKTABLEFORARRAY; + ERROR: table "fktableforarray" does not exist + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE CASCADE, ftest2 int ); + ERROR: ON UPDATE CASCADE action is not supported on EACH foreign keys + HINT: Use ON UPDATE EACH CASCADE, instead + DROP TABLE FKTABLEFORARRAY; + ERROR: table "fktableforarray" does not exist + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE CASCADE, ftest2 int ); + ERROR: ON UPDATE CASCADE action is not supported on EACH foreign keys + HINT: Use ON UPDATE EACH CASCADE, instead + DROP TABLE FKTABLEFORARRAY; + ERROR: table "fktableforarray" does not exist + CREATE 
TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE CASCADE, ftest2 int ); + ERROR: ON UPDATE CASCADE action is not supported on EACH foreign keys + HINT: Use ON UPDATE EACH CASCADE, instead + DROP TABLE FKTABLEFORARRAY; + ERROR: table "fktableforarray" does not exist + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE CASCADE, ftest2 int ); + ERROR: ON UPDATE CASCADE action is not supported on EACH foreign keys + HINT: Use ON UPDATE EACH CASCADE, instead + DROP TABLE FKTABLEFORARRAY; + ERROR: table "fktableforarray" does not exist + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE CASCADE, ftest2 int ); + ERROR: ON UPDATE CASCADE action is not supported on EACH foreign keys + HINT: Use ON UPDATE EACH CASCADE, instead + DROP TABLE FKTABLEFORARRAY; + ERROR: table "fktableforarray" does not exist + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE CASCADE, ftest2 int ); + ERROR: ON UPDATE CASCADE action is not supported on EACH foreign keys + HINT: Use ON UPDATE EACH CASCADE, instead + DROP TABLE FKTABLEFORARRAY; + ERROR: table "fktableforarray" does not exist + -- Cleanup + DROP TABLE PKTABLEFORARRAY; + -- Check reference on empty table + CREATE TABLE PKTABLEFORARRAY (ptest1 int PRIMARY KEY); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + CREATE TABLE FKTABLEFORARRAY (ftest1 int[] EACH REFERENCES PKTABLEFORARRAY); + INSERT INTO FKTABLEFORARRAY VALUES ('{}'); + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Repeat a similar test using CHAR(1) keys rather than INTEGER + CREATE TABLE PKTABLEFORARRAY ( ptest1 CHAR(1) PRIMARY KEY, ptest2 text ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- 
Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES ('A', 'Test A'); + INSERT INTO PKTABLEFORARRAY VALUES ('B', 'Test B'); + INSERT INTO PKTABLEFORARRAY VALUES ('C', 'Test C'); + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 char(1)[] EACH REFERENCES PKTABLEFORARRAY ON UPDATE RESTRICT ON DELETE RESTRICT, ftest2 int ); + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('{"A"}', 1); + INSERT INTO FKTABLEFORARRAY VALUES ('{"B"}', 2); + INSERT INTO FKTABLEFORARRAY VALUES ('{"C"}', 3); + INSERT INTO FKTABLEFORARRAY VALUES ('{"A","B","C"}', 4); + -- Insert invalid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('{"D"}', 5); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" + DETAIL: Key (ftest1)=({D}) is not present in table "pktableforarray". + INSERT INTO FKTABLEFORARRAY VALUES ('{"A","B","D"}', 6); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" + DETAIL: Key (ftest1)=({A,B,D}) is not present in table "pktableforarray". + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ---------+-------- + {A} | 1 + {B} | 2 + {C} | 3 + {A,B,C} | 4 + (4 rows) + + -- Delete a row from PK TABLE (must fail due to ON DELETE RESTRICT) + DELETE FROM PKTABLEFORARRAY WHERE ptest1='A'; + ERROR: update or delete on table "pktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" on table "fktableforarray" + DETAIL: Key (ptest1)=(A) is still referenced from table "fktableforarray". 
+ -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ---------+-------- + {A} | 1 + {B} | 2 + {C} | 3 + {A,B,C} | 4 + (4 rows) + + -- Update a row from PK TABLE (must fail due to ON UPDATE RESTRICT) + UPDATE PKTABLEFORARRAY SET ptest1='D' WHERE ptest1='B'; + ERROR: update or delete on table "pktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" on table "fktableforarray" + DETAIL: Key (ptest1)=(B) is still referenced from table "fktableforarray". + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ---------+-------- + {A} | 1 + {B} | 2 + {C} | 3 + {A,B,C} | 4 + (4 rows) + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Repeat a similar test using FLOAT keys coerced from INTEGER + CREATE TABLE PKTABLEFORARRAY ( ptest1 FLOAT PRIMARY KEY, ptest2 text ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (1.0, 'Test 1'); + INSERT INTO PKTABLEFORARRAY VALUES (2.0, 'Test 2'); + INSERT INTO PKTABLEFORARRAY VALUES (3.0, 'Test 3'); + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY, ftest2 int ); + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1], 1); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2], 2); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[3], 3); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,2,3], 4); + -- Insert invalid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[4], 5); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" + DETAIL: Key (ftest1)=({4}) is not present in table "pktableforarray". 
+ INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,2,5], 6); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_ftest1_fkey" + DETAIL: Key (ftest1)=({1,2,5}) is not present in table "pktableforarray". + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Composite primary keys + CREATE TABLE PKTABLEFORARRAY ( id1 CHAR(1), id2 CHAR(1), ptest2 text, PRIMARY KEY (id1, id2) ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES ('A', 'A', 'Test A'); + INSERT INTO PKTABLEFORARRAY VALUES ('A', 'B', 'Test B'); + INSERT INTO PKTABLEFORARRAY VALUES ('B', 'C', 'Test B'); + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( fid1 CHAR(1), fid2 CHAR(1)[], ftest2 text, FOREIGN KEY (fid1, EACH fid2) REFERENCES PKTABLEFORARRAY); + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('A', ARRAY['A','B'], '1'); + INSERT INTO FKTABLEFORARRAY VALUES ('B', ARRAY['C'], '2'); + -- Insert invalid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('A', ARRAY['A','B', 'C'], '3'); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_fid1_fkey" + DETAIL: Key (fid1, fid2)=(A, {A,B,C}) is not present in table "pktableforarray". + INSERT INTO FKTABLEFORARRAY VALUES ('B', ARRAY['A'], '4'); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_fid1_fkey" + DETAIL: Key (fid1, fid2)=(B, {A}) is not present in table "pktableforarray". 
+ -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Test foreign key arrays with composite type + CREATE TYPE INVOICEID AS (year_part INTEGER, progressive_part INTEGER); + CREATE TABLE PKTABLEFORARRAY ( id INVOICEID PRIMARY KEY, ptest2 text); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (ROW(2010, 99), 'Last invoice for 2010'); + INSERT INTO PKTABLEFORARRAY VALUES (ROW(2011, 1), 'First invoice for 2011'); + INSERT INTO PKTABLEFORARRAY VALUES (ROW(2011, 2), 'Second invoice for 2011'); + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, invoice_ids INVOICEID[] EACH REFERENCES PKTABLEFORARRAY ON UPDATE SET NULL ON DELETE SET NULL, ftest2 TEXT ); + NOTICE: CREATE TABLE will create implicit sequence "fktableforarray_id_seq" for serial column "fktableforarray.id" + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "fktableforarray_pkey" for table "fktableforarray" + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY(invoice_ids, ftest2) VALUES (ARRAY['(2010,99)']::INVOICEID[], 'Product A'); + INSERT INTO FKTABLEFORARRAY(invoice_ids, ftest2) VALUES (ARRAY['(2011,1)','(2011,2)']::INVOICEID[], 'Product B'); + INSERT INTO FKTABLEFORARRAY(invoice_ids, ftest2) VALUES (ARRAY['(2011,2)']::INVOICEID[], 'Product C'); + -- Insert invalid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY(invoice_ids, ftest2) VALUES (ARRAY['(2011,99)']::INVOICEID[], 'Product A'); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_invoice_ids_fkey" + DETAIL: Key (invoice_ids)=({"(2011,99)"}) is not present in table "pktableforarray". 
+ INSERT INTO FKTABLEFORARRAY(invoice_ids, ftest2) VALUES (ARRAY['(2011,1)','(2010,1)']::INVOICEID[], 'Product B'); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_invoice_ids_fkey" + DETAIL: Key (invoice_ids)=({"(2011,1)","(2010,1)"}) is not present in table "pktableforarray". + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + id | invoice_ids | ftest2 + ----+-------------------------+----------- + 1 | {"(2010,99)"} | Product A + 2 | {"(2011,1)","(2011,2)"} | Product B + 3 | {"(2011,2)"} | Product C + (3 rows) + + -- Delete a row from PK TABLE (ON DELETE SET NULL) + DELETE FROM PKTABLEFORARRAY WHERE id=ROW(2010,99); + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + id | invoice_ids | ftest2 + ----+-------------------------+----------- + 2 | {"(2011,1)","(2011,2)"} | Product B + 3 | {"(2011,2)"} | Product C + 1 | | Product A + (3 rows) + + -- Update a row from PK TABLE (ON UPDATE SET NULL) + UPDATE PKTABLEFORARRAY SET id=ROW(2011,99) WHERE id=ROW(2011,1); + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + id | invoice_ids | ftest2 + ----+--------------+----------- + 3 | {"(2011,2)"} | Product C + 1 | | Product A + 2 | | Product B + (3 rows) + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + DROP TYPE INVOICEID; + -- Repeat a similar test for SET DEFAULT actions + CREATE TABLE PKTABLEFORARRAY ( ptest1 int PRIMARY KEY, ptest2 int ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (1, 1); + INSERT INTO PKTABLEFORARRAY VALUES (2, 2); + INSERT INTO PKTABLEFORARRAY VALUES (3, 3); + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] NOT NULL DEFAULT ARRAY[]::int[] EACH REFERENCES PKTABLEFORARRAY ON UPDATE SET DEFAULT ON DELETE SET DEFAULT, ftest2 int ); + -- Insert valid rows 
into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1], 1); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2], 2); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[3], 3); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2,3], 4); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,2], 5); + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + --------+-------- + {1} | 1 + {2} | 2 + {3} | 3 + {2,3} | 4 + {1,2} | 5 + (5 rows) + + -- Delete a row from PK TABLE (ON DELETE SET DEFAULT) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=1; + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + --------+-------- + {2} | 2 + {3} | 3 + {2,3} | 4 + {} | 1 + {} | 5 + (5 rows) + + -- Update a row from PK TABLE (ON UPDATE SET DEFAULT) + UPDATE PKTABLEFORARRAY SET ptest1=5 WHERE ptest1=3; + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + --------+-------- + {2} | 2 + {} | 1 + {} | 5 + {} | 3 + {} | 4 + (5 rows) + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Repeat a similar test for CASCADE actions + CREATE TABLE PKTABLEFORARRAY ( ptest1 int PRIMARY KEY, ptest2 int ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (1, 1); + INSERT INTO PKTABLEFORARRAY VALUES (2, 2); + INSERT INTO PKTABLEFORARRAY VALUES (3, 3); + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] NOT NULL DEFAULT ARRAY[]::int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE, ftest2 int ); + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1], 1); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2], 2); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[3], 3); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2,3], 4); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,2], 5); + -- Check FKTABLE + SELECT * FROM 
FKTABLEFORARRAY; + ftest1 | ftest2 + --------+-------- + {1} | 1 + {2} | 2 + {3} | 3 + {2,3} | 4 + {1,2} | 5 + (5 rows) + + -- Delete a row from PK TABLE (ON DELETE CASCADE) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=1; + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + --------+-------- + {2} | 2 + {3} | 3 + {2,3} | 4 + (3 rows) + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Test for EACH SET NULL actions + CREATE TABLE PKTABLEFORARRAY ( ptest1 int PRIMARY KEY, ptest2 int ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (1, 1); + INSERT INTO PKTABLEFORARRAY VALUES (2, 2); + INSERT INTO PKTABLEFORARRAY VALUES (3, 3); + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON UPDATE EACH SET NULL ON DELETE EACH SET NULL, ftest2 int ); + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1], 1); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2], 2); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[3], 3); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2,2,1], 4); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,3,3], 5); + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ---------+-------- + {1} | 1 + {2} | 2 + {3} | 3 + {2,2,1} | 4 + {1,3,3} | 5 + (5 rows) + + -- Delete a row from PK TABLE (ON DELETE EACH SET NULL) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=1; + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ------------+-------- + {2} | 2 + {3} | 3 + {NULL} | 1 + {2,2,NULL} | 4 + {NULL,3,3} | 5 + (5 rows) + + -- Update a row from PK TABLE (ON UPDATE EACH SET NULL) + UPDATE PKTABLEFORARRAY SET ptest1=5 WHERE ptest1=3; + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + 
------------------+-------- + {2} | 2 + {NULL} | 1 + {2,2,NULL} | 4 + {NULL} | 3 + {NULL,NULL,NULL} | 5 + (5 rows) + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Test for EACH CASCADE actions + CREATE TABLE PKTABLEFORARRAY ( ptest1 int PRIMARY KEY, ptest2 int ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (1, 1); + INSERT INTO PKTABLEFORARRAY VALUES (2, 2); + INSERT INTO PKTABLEFORARRAY VALUES (3, 3); + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON UPDATE EACH CASCADE ON DELETE EACH CASCADE, ftest2 int ); + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1], 1); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2], 2); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[3], 3); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2,2,1], 4); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,3,3], 5); + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + ---------+-------- + {1} | 1 + {2} | 2 + {3} | 3 + {2,2,1} | 4 + {1,3,3} | 5 + (5 rows) + + -- Delete a row from PK TABLE (ON DELETE EACH CASCADE) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=1; + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + --------+-------- + {2} | 2 + {3} | 3 + {} | 1 + {2,2} | 4 + {3,3} | 5 + (5 rows) + + -- Update a row from PK TABLE (ON UPDATE EACH CASCADE) + UPDATE PKTABLEFORARRAY SET ptest1=5 WHERE ptest1=3; + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + ftest1 | ftest2 + --------+-------- + {2} | 2 + {} | 1 + {2,2} | 4 + {5} | 3 + {5,5} | 5 + (5 rows) + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Check EACH CASCADE actions with cast + CREATE TABLE PKTABLEFORARRAY (c smallint PRIMARY KEY); + NOTICE: CREATE TABLE / PRIMARY 
KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + CREATE TABLE FKTABLEFORARRAY (c int[] EACH REFERENCES PKTABLEFORARRAY + ON UPDATE EACH CASCADE + ON DELETE EACH CASCADE); + INSERT INTO PKTABLEFORARRAY VALUES (1), (2); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,2}'); + UPDATE PKTABLEFORARRAY SET c = 3 WHERE c = 2; + DELETE FROM PKTABLEFORARRAY WHERE c = 1; + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Check EACH SET NULL actions with cast + CREATE TABLE PKTABLEFORARRAY (c smallint PRIMARY KEY); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + CREATE TABLE FKTABLEFORARRAY (c int[] EACH REFERENCES PKTABLEFORARRAY + ON UPDATE EACH SET NULL + ON DELETE EACH SET NULL); + INSERT INTO PKTABLEFORARRAY VALUES (1), (2); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,2}'); + UPDATE PKTABLEFORARRAY SET c = 3 WHERE c = 2; + DELETE FROM PKTABLEFORARRAY WHERE c = 1; + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Check for an array column referencing another array column (NOT FOREIGN KEY ARRAY) + -- Create primary table with a primary key array + CREATE TABLE PKTABLEFORARRAY ( id INT[] PRIMARY KEY, ptest2 text); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, fids INT[] REFERENCES PKTABLEFORARRAY, ftest2 TEXT ); + NOTICE: CREATE TABLE will create implicit sequence "fktableforarray_id_seq" for serial column "fktableforarray.id" + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "fktableforarray_pkey" for table "fktableforarray" + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES ('{1,1}', 'A'); + INSERT INTO PKTABLEFORARRAY VALUES ('{1,2}', 'B'); + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY (fids, ftest2) VALUES ('{1,1}', 
'Product A'); + INSERT INTO FKTABLEFORARRAY (fids, ftest2) VALUES ('{1,2}', 'Product B'); + -- Insert invalid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY (fids, ftest2) VALUES ('{0,1}', 'Product C'); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_fids_fkey" + DETAIL: Key (fids)=({0,1}) is not present in table "pktableforarray". + INSERT INTO FKTABLEFORARRAY (fids, ftest2) VALUES ('{2,1}', 'Product D'); + ERROR: insert or update on table "fktableforarray" violates foreign key constraint "fktableforarray_fids_fkey" + DETAIL: Key (fids)=({2,1}) is not present in table "pktableforarray". + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + -- Check ARRAY actions on normal FK + -- Create primary table with a primary key array + CREATE TABLE PKTABLEFORARRAY ( id INT PRIMARY KEY, ptest2 text); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforarray_pkey" for table "pktableforarray" + -- Create an each foreign key without an array (ERROR) + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, ftest2 INT EACH REFERENCES PKTABLEFORARRAY ); + NOTICE: CREATE TABLE will create implicit sequence "fktableforarray_id_seq" for serial column "fktableforarray.id" + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "fktableforarray_pkey" for table "fktableforarray" + ERROR: foreign key constraint "fktableforarray_ftest2_fkey" cannot be implemented + DETAIL: Type of key column "ftest2" is not an array type: integer + -- Create the foreign table with forbidden actions (ERROR) + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, ftest2 INT[] REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ); + NOTICE: CREATE TABLE will create implicit sequence "fktableforarray_id_seq" for serial column "fktableforarray.id" + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "fktableforarray_pkey" for table "fktableforarray" + ERROR: EACH CASCADE and EACH SET NULL 
actions are only available on EACH foreign keys + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, ftest2 INT[] REFERENCES PKTABLEFORARRAY ON UPDATE EACH CASCADE ); + NOTICE: CREATE TABLE will create implicit sequence "fktableforarray_id_seq" for serial column "fktableforarray.id" + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "fktableforarray_pkey" for table "fktableforarray" + ERROR: EACH CASCADE and EACH SET NULL actions are only available on EACH foreign keys + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, ftest2 INT[] REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ); + NOTICE: CREATE TABLE will create implicit sequence "fktableforarray_id_seq" for serial column "fktableforarray.id" + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "fktableforarray_pkey" for table "fktableforarray" + ERROR: EACH CASCADE and EACH SET NULL actions are only available on EACH foreign keys + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, ftest2 INT[] REFERENCES PKTABLEFORARRAY ON UPDATE EACH SET NULL ); + NOTICE: CREATE TABLE will create implicit sequence "fktableforarray_id_seq" for serial column "fktableforarray.id" + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "fktableforarray_pkey" for table "fktableforarray" + ERROR: EACH CASCADE and EACH SET NULL actions are only available on EACH foreign keys + -- Cleanup + DROP TABLE PKTABLEFORARRAY; + -- --------------------------------------- + -- Multi-column "EACH" foreign key tests + -- --------------------------------------- + -- Create DIM1 table with two-column primary key + CREATE TABLE DIM1 (X INTEGER NOT NULL, Y INTEGER NOT NULL, PRIMARY KEY (X, Y)); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "dim1_pkey" for table "dim1" + -- Populate DIM1 table pairs + INSERT INTO DIM1 SELECT x.t, x.t * y.t + FROM (SELECT generate_series(1, 10) AS t) x, + (SELECT generate_series(0, 10) AS t) y; + -- Test with TABLE declaration of an each foreign key 
constraint (NO ACTION) + CREATE TABLE F1 ( + x INTEGER PRIMARY KEY, y INTEGER[], + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f1_pkey" for table "f1" + -- Insert facts + INSERT INTO F1 VALUES (1, '{0,1,2,3,4,5}'); -- OK + INSERT INTO F1 VALUES (2, '{0,2,4,6}'); -- OK + INSERT INTO F1 VALUES (3, '{0,3,6,9,0,3,6,9,0,0,0,0,9,9}'); -- OK (multiple occurrences) + INSERT INTO F1 VALUES (4, '{0,2,4}'); -- FAILS (2 is not present) + ERROR: insert or update on table "f1" violates foreign key constraint "f1_x_fkey" + DETAIL: Key (x, y)=(4, {0,2,4}) is not present in table "dim1". + INSERT INTO F1 VALUES (4, '{0,NULL,4}'); -- OK + INSERT INTO F1 VALUES (5, '{0,NULL,5}'); -- OK + -- Try updates + UPDATE F1 SET y = '{0,2,4,6}' WHERE x = 2; -- OK + UPDATE F1 SET y = '{0,2,3,4,6}' WHERE x = 2; -- FAILS + ERROR: insert or update on table "f1" violates foreign key constraint "f1_x_fkey" + DETAIL: Key (x, y)=(2, {0,2,3,4,6}) is not present in table "dim1". + UPDATE F1 SET x = 20, y = '{0,2,3,4,6}' WHERE x = 2; -- FAILS (20 does not exist) + ERROR: insert or update on table "f1" violates foreign key constraint "f1_x_fkey" + DETAIL: Key (x, y)=(20, {0,2,3,4,6}) is not present in table "dim1". 
+ UPDATE F1 SET y = '{0,4,8}' WHERE x = 4; -- OK + UPDATE F1 SET y = '{0,5,NULL,10}' WHERE x = 5; -- OK + DROP TABLE F1; + -- Test with FOREIGN KEY after TABLE population + CREATE TABLE F1 ( + x INTEGER PRIMARY KEY, y INTEGER[] + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f1_pkey" for table "f1" + -- Insert facts + INSERT INTO F1 VALUES (1, '{0,1,2,3,4,5}'); -- OK + INSERT INTO F1 VALUES (2, '{0,2,4,6}'); -- OK + INSERT INTO F1 VALUES (3, '{0,3,6,9,0,3,6,9,0,0,0,0,9,9}'); -- OK (multiple occurrences) + INSERT INTO F1 VALUES (4, '{0,2,4}'); -- OK + INSERT INTO F1 VALUES (5, '{0,NULL,5}'); -- OK + -- Try updates + UPDATE F1 SET y = '{0,2,4,6}' WHERE x = 2; -- OK + UPDATE F1 SET y = '{0,2,3,4,6}' WHERE x = 2; -- OK + UPDATE F1 SET x = 20, y = '{0,2,3,4,6}' WHERE x = 2; -- OK + -- Add foreign key (FAILS) + ALTER TABLE F1 ADD FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y); + ERROR: insert or update on table "f1" violates foreign key constraint "f1_x_fkey" + DETAIL: Key (x, y)=(20, {0,2,3,4,6}) is not present in table "dim1". 
+ DROP TABLE F1; + -- Limitation on EACH CASCADE an EACH SET NULL on multi-column keys + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH CASCADE + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON DELETE EACH CASCADE ON UPDATE EACH CASCADE + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f2_pkey" for table "f2" + ERROR: EACH CASCADE and EACH SET NULL actions are only available on single-column EACH foreign keys + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH CASCADE + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON DELETE EACH CASCADE + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f2_pkey" for table "f2" + ERROR: EACH CASCADE and EACH SET NULL actions are only available on single-column EACH foreign keys + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH CASCADE + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON UPDATE EACH CASCADE + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f2_pkey" for table "f2" + ERROR: EACH CASCADE and EACH SET NULL actions are only available on single-column EACH foreign keys + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH SET NULL + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON DELETE EACH SET NULL ON UPDATE EACH SET NULL + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f2_pkey" for table "f2" + ERROR: EACH CASCADE and EACH SET NULL actions are only available on single-column EACH foreign keys + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH SET NULL + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON DELETE EACH SET NULL + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f2_pkey" for table "f2" + ERROR: EACH CASCADE and EACH SET NULL actions 
are only available on single-column EACH foreign keys + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH SET NULL + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON UPDATE EACH SET NULL + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f2_pkey" for table "f2" + ERROR: EACH CASCADE and EACH SET NULL actions are only available on single-column EACH foreign keys + -- Test of ON DELETE CASCADE and ON UPDATE SET NULL with multi-columns + CREATE TABLE F1 ( + x INTEGER PRIMARY KEY, y INTEGER[], + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON DELETE CASCADE ON UPDATE SET NULL + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f1_pkey" for table "f1" + -- Insert dummy dimension record + INSERT INTO DIM1 VALUES (199, 199); + INSERT INTO DIM1 VALUES (199, 200); + INSERT INTO DIM1 VALUES (199, 201); + -- Insert facts + INSERT INTO F1 VALUES (1, '{0,1,2,3,4,5}'); -- OK + INSERT INTO F1 VALUES (2, '{0,2,4,6}'); -- OK + INSERT INTO F1 VALUES (3, '{0,3,6,9,0,3,6,9,0,0,0,0,9,9}'); -- OK (multiple occurrences) + INSERT INTO F1 VALUES (4, '{0,NULL,4}'); -- OK + INSERT INTO F1 VALUES (5, '{0,NULL,5}'); -- OK + INSERT INTO F1 VALUES (199, '{199, 200}'); -- OK + SELECT COUNT(*) FROM F1 WHERE x = 199; -- should return 1 + count + ------- + 1 + (1 row) + + DELETE FROM DIM1 WHERE x = 199 AND y = 200; -- should remove the whole row in F1 + SELECT COUNT(*) FROM F1 WHERE x = 199; -- should return 0 + count + ------- + 0 + (1 row) + + INSERT INTO F1 VALUES (199, '{199, 201}'); -- OK + SELECT COUNT(*) FROM F1 WHERE x = 199 AND y IS NULL; -- should return 0 + count + ------- + 0 + (1 row) + + UPDATE DIM1 SET y = 200 WHERE x = 199 AND y = 201; -- should set the whole row to NULL in F1 + SELECT COUNT(*) FROM F1 WHERE x = 199 AND y IS NULL; -- should return 1 + count + ------- + 1 + (1 row) + + DROP TABLE F1; + -- Test with TABLE declaration of a two-dim each foreign key constraint (NO ACTION) + 
CREATE TABLE F1 ( + x INTEGER[] PRIMARY KEY, y INTEGER[], + FOREIGN KEY (EACH x, EACH y) REFERENCES DIM1(x, y) + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f1_pkey" for table "f1" + -- Insert facts + INSERT INTO F1 VALUES ('{1}', '{0,1,2,3,4,5}'); -- OK + INSERT INTO F1 VALUES ('{1,2}', '{0,1,2,3,4,5}'); -- FAILS + ERROR: insert or update on table "f1" violates foreign key constraint "f1_x_fkey" + DETAIL: Key (x, y)=({1,2}, {0,1,2,3,4,5}) is not present in table "dim1". + INSERT INTO F1 VALUES ('{1,2}', '{0,2,4,6}'); -- OK + INSERT INTO F1 VALUES ('{1,3}', '{0,3,9,12}'); -- FAILS ({1,12} is not present) + ERROR: insert or update on table "f1" violates foreign key constraint "f1_x_fkey" + DETAIL: Key (x, y)=({1,3}, {0,3,9,12}) is not present in table "dim1". + INSERT INTO F1 VALUES ('{1,3}', '{0,3,9, NULL}'); -- OK + -- Try updates + UPDATE F1 SET y = '{0,2,4,6,8}' WHERE x = '{1,2}'; -- OK + UPDATE F1 SET y = '{0,2,3,4,6,8}' WHERE x = '{1,2}'; -- FAILS + ERROR: insert or update on table "f1" violates foreign key constraint "f1_x_fkey" + DETAIL: Key (x, y)=({1,2}, {0,2,3,4,6,8}) is not present in table "dim1". + UPDATE F1 SET x = '{1,20}', y = '{0,2,3,4,6}' WHERE x = '{1,2}'; -- FAILS (20 does not exist) + ERROR: insert or update on table "f1" violates foreign key constraint "f1_x_fkey" + DETAIL: Key (x, y)=({1,20}, {0,2,3,4,6}) is not present in table "dim1". 
+ DROP TABLE F1; + -- Test with two-dim each FOREIGN KEY after TABLE population + CREATE TABLE F1 ( + x INTEGER[] PRIMARY KEY, y INTEGER[] + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f1_pkey" for table "f1" + INSERT INTO F1 VALUES ('{1}', '{0,1,2,3,4,5}'); -- OK + INSERT INTO F1 VALUES ('{1,2}', '{0,2,4,6}'); -- OK + -- Add foreign key (OK) + ALTER TABLE F1 ADD FOREIGN KEY (EACH x, EACH y) REFERENCES DIM1(x, y); + DROP TABLE F1; + CREATE TABLE F1 ( + x INTEGER[] PRIMARY KEY, y INTEGER[] + ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f1_pkey" for table "f1" + INSERT INTO F1 VALUES ('{1,2}', '{0,1,2,3,4,5}'); + -- Add foreign key (FAILS) + ALTER TABLE F1 ADD FOREIGN KEY (EACH x, EACH y) REFERENCES DIM1(x, y); + ERROR: insert or update on table "f1" violates foreign key constraint "f1_x_fkey" + DETAIL: Key (x, y)=({1,2}, {0,1,2,3,4,5}) is not present in table "dim1". + UPDATE F1 SET y = '{0,2,4,6,8,NULL}' WHERE x = '{1,2}'; -- OK + -- Add foreign key (OK) + ALTER TABLE F1 ADD FOREIGN KEY (EACH x, EACH y) REFERENCES DIM1(x, y); + DROP TABLE F1; + -- Cleanup + DROP TABLE DIM1; + -- Check for potential name conflicts (with internal integrity checks) + CREATE TABLE x1(x1 int, x2 int, PRIMARY KEY(x1,x2)); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "x1_pkey" for table "x1" + INSERT INTO x1 VALUES + (1,4), + (1,5), + (2,4), + (2,5), + (3,6), + (3,7) + ; + CREATE TABLE x2(x1 int[], x2 int[], FOREIGN KEY(EACH x1, EACH x2) REFERENCES x1); + INSERT INTO x2 VALUES ('{1,2}','{4,5}'); + INSERT INTO x2 VALUES ('{1,2}','{5,6}'); -- FAILS + ERROR: insert or update on table "x2" violates foreign key constraint "x2_x1_fkey" + DETAIL: Key (x1, x2)=({1,2}, {5,6}) is not present in table "x1". 
+ DROP TABLE x2; + CREATE TABLE x2(x1 int[], x2 int[]); + INSERT INTO x2 VALUES ('{1,2}','{4,5}'); + INSERT INTO x2 VALUES ('{1,2}','{5,6}'); + ALTER TABLE x2 ADD CONSTRAINT fk_const FOREIGN KEY(EACH x1, EACH x2) REFERENCES x1; -- FAILS + ERROR: insert or update on table "x2" violates foreign key constraint "fk_const" + DETAIL: Key (x1, x2)=({1,2}, {5,6}) is not present in table "x1". + DROP TABLE x2; + DROP TABLE x1; + -- --------------------------------------- + -- Multi-dimensional "EACH" foreign key tests + -- --------------------------------------- + -- Create DIM1 table with two-column primary key + CREATE TABLE DIM1 (X INTEGER NOT NULL PRIMARY KEY, + CODE TEXT NOT NULL UNIQUE); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "dim1_pkey" for table "dim1" + NOTICE: CREATE TABLE / UNIQUE will create implicit index "dim1_code_key" for table "dim1" + -- Populate DIM1 table pairs + INSERT INTO DIM1 SELECT t, 'DIM1-' || lpad(t::TEXT, 2, '0') + FROM (SELECT generate_series(1, 10)) x(t); + -- Test with TABLE declaration of an each foreign key constraint (NO ACTION) + CREATE TABLE F1 ( + ID SERIAL PRIMARY KEY, + SLOTS INTEGER[3][3] EACH REFERENCES DIM1 + ); + NOTICE: CREATE TABLE will create implicit sequence "f1_id_seq" for serial column "f1.id" + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f1_pkey" for table "f1" + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 3}, {NULL, NULL, 6}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 11}, {NULL, NULL, 6}}'); -- FAILS + ERROR: insert or update on table "f1" violates foreign key constraint "f1_slots_fkey" + DETAIL: Key (slots)=({{NULL,1,NULL},{NULL,NULL,11},{NULL,NULL,6}}) is not present in table "dim1". 
+ INSERT INTO F1(SLOTS) VALUES ('{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{1, 2, 3, 4, 5, 6, 7, 8, 9}'); -- OK + UPDATE F1 SET SLOTS = '{{NULL, 1, NULL}, {NULL, NULL, 3}, {7, 8, 10}}' WHERE ID = 1; -- OK + UPDATE F1 SET SLOTS = '{{100, 100, 100}, {NULL, NULL, 20}, {7, 8, 10}}' WHERE ID = 1; -- FAILS + ERROR: insert or update on table "f1" violates foreign key constraint "f1_slots_fkey" + DETAIL: Key (slots)=({{100,100,100},{NULL,NULL,20},{7,8,10}}) is not present in table "dim1". + DROP TABLE F1; + -- Test with postponed foreign key + CREATE TABLE F1 ( + ID SERIAL PRIMARY KEY, + SLOTS INTEGER[3][3] + ); + NOTICE: CREATE TABLE will create implicit sequence "f1_id_seq" for serial column "f1.id" + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "f1_pkey" for table "f1" + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 3}, {NULL, NULL, 6}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 11}, {NULL, NULL, 6}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{1, 2, 3, 4, 5, 6, 7, 8, 9}'); -- OK + ALTER TABLE F1 ADD FOREIGN KEY (EACH SLOTS) REFERENCES DIM1; -- FAILS + ERROR: insert or update on table "f1" violates foreign key constraint "f1_slots_fkey" + DETAIL: Key (slots)=({{NULL,1,NULL},{NULL,NULL,11},{NULL,NULL,6}}) is not present in table "dim1". + DELETE FROM F1 WHERE ID = 2; -- REMOVE ISSUE + ALTER TABLE F1 ADD FOREIGN KEY (EACH SLOTS) REFERENCES DIM1; -- NOW OK + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 11}, {NULL, NULL, 6}}'); -- FAILS + ERROR: insert or update on table "f1" violates foreign key constraint "f1_slots_fkey" + DETAIL: Key (slots)=({{NULL,1,NULL},{NULL,NULL,11},{NULL,NULL,6}}) is not present in table "dim1". 
+ DROP TABLE F1; + -- Cleanup + DROP TABLE DIM1; + -- Leave tables in the database + CREATE TABLE PKTABLEFOREACHFK ( ptest1 int PRIMARY KEY, ptest2 text ); + NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "pktableforeachfk_pkey" for table "pktableforeachfk" + CREATE TABLE FKTABLEFOREACHFK ( ftest1 int[] EACH REFERENCES PKTABLEFOREACHFK, ftest2 int ); *** a/src/test/regress/parallel_schedule --- b/src/test/regress/parallel_schedule *************** *** 92,98 **** test: rules # ---------- # Another group of parallel tests # ---------- ! test: select_views portals_p2 foreign_key cluster dependency guc bitmapops combocid tsearch tsdicts foreign_data window xmlmap functional_deps advisory_lock json # ---------- # Another group of parallel tests --- 92,98 ---- # ---------- # Another group of parallel tests # ---------- ! test: select_views portals_p2 foreign_key cluster dependency guc bitmapops combocid tsearch tsdicts foreign_data window xmlmap functional_deps advisory_lock json each_foreign_key # ---------- # Another group of parallel tests *** a/src/test/regress/serial_schedule --- b/src/test/regress/serial_schedule *************** *** 99,104 **** test: rules --- 99,105 ---- test: select_views test: portals_p2 test: foreign_key + test: each_foreign_key test: cluster test: dependency test: guc *** a/src/test/regress/sql/arrays.sql --- b/src/test/regress/sql/arrays.sql *************** *** 430,435 **** select unnest(array[1,2,3,4.5]::float8[]); --- 430,445 ---- select unnest(array[1,2,3,4.5]::numeric[]); select unnest(array[1,2,3,null,4,null,null,5,6]); select unnest(array[1,2,3,null,4,null,null,5,6]::text[]); + select array_remove(array[1,2,2,3], 2); + select array_remove(array[1,2,2,3], 5); + select array_remove(array[1,NULL,NULL,3], NULL); + select array_remove(array['A','C','D','C','R'], 'R'); + select array_remove(array[1,3,4,5,6,10], 5); + select array_remove('{{1,2,2},{1,4,3}}', 2); -- fails + select array_replace(array[1,2,5,4],5,3); + select
array_replace(array[1,2,5,4],5,NULL); + select array_replace(array[1,2,NULL,4,NULL],NULL,5); + select array_replace(array['A','B','D','B'],'B','C'); -- Insert/update on a column that is array of composite *** /dev/null --- b/src/test/regress/sql/each_foreign_key.sql *************** *** 0 **** --- 1,765 ---- + -- EACH FK CONSTRAINTS (FOREIGN KEY ARRAYS) + -- + CREATE TABLE PKTABLEFORARRAY ( ptest1 int PRIMARY KEY, ptest2 text ); + + -- Insert test data into PKTABLEFORARRAY + INSERT INTO PKTABLEFORARRAY VALUES (1, 'Test1'); + INSERT INTO PKTABLEFORARRAY VALUES (2, 'Test2'); + INSERT INTO PKTABLEFORARRAY VALUES (3, 'Test3'); + INSERT INTO PKTABLEFORARRAY VALUES (4, 'Test4'); + INSERT INTO PKTABLEFORARRAY VALUES (5, 'Test5'); + + -- Check alter table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[], ftest2 int ); + ALTER TABLE FKTABLEFORARRAY ADD CONSTRAINT FKARRAY FOREIGN KEY (EACH ftest1) REFERENCES PKTABLEFORARRAY; + DROP TABLE FKTABLEFORARRAY; + + -- Check alter table with rows + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[], ftest2 int ); + INSERT INTO FKTABLEFORARRAY VALUES ('{1}', 1); + ALTER TABLE FKTABLEFORARRAY ADD CONSTRAINT FKARRAY FOREIGN KEY (EACH ftest1) REFERENCES PKTABLEFORARRAY; + DROP TABLE FKTABLEFORARRAY; + + -- Check alter table with failing rows + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[], ftest2 int ); + INSERT INTO FKTABLEFORARRAY VALUES ('{10,1}', 2); + ALTER TABLE FKTABLEFORARRAY ADD CONSTRAINT FKARRAY FOREIGN KEY (EACH ftest1) REFERENCES PKTABLEFORARRAY; + DROP TABLE FKTABLEFORARRAY; + + -- Check create table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY, ftest2 int ); + CREATE TABLE FKTABLEFORARRAYMDIM ( ftest1 int[][] EACH REFERENCES PKTABLEFORARRAY, ftest2 int ); + CREATE TABLE FKTABLEFORARRAYNOTNULL ( ftest1 int[] NOT NULL EACH REFERENCES PKTABLEFORARRAY, ftest2 int ); + + -- Insert successful rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('{1}', 3); + INSERT INTO FKTABLEFORARRAY VALUES ('{2}', 4); 
+ INSERT INTO FKTABLEFORARRAY VALUES ('{1}', 5); + INSERT INTO FKTABLEFORARRAY VALUES ('{3}', 6); + INSERT INTO FKTABLEFORARRAY VALUES ('{1}', 7); + INSERT INTO FKTABLEFORARRAY VALUES ('{4,5}', 8); + INSERT INTO FKTABLEFORARRAY VALUES ('{4,4}', 9); + INSERT INTO FKTABLEFORARRAY VALUES (NULL, 10); + INSERT INTO FKTABLEFORARRAY VALUES ('{}', 11); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,NULL}', 12); + INSERT INTO FKTABLEFORARRAY VALUES ('{NULL}', 13); + INSERT INTO FKTABLEFORARRAYMDIM VALUES ('{{4,5},{1,2},{1,3}}', 14); + INSERT INTO FKTABLEFORARRAYMDIM VALUES ('{{4,5},{NULL,2},{NULL,3}}', 15); + + -- Insert failed rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('{6}', 16); + INSERT INTO FKTABLEFORARRAY VALUES ('{4,6}', 17); + INSERT INTO FKTABLEFORARRAY VALUES ('{6,NULL}', 18); + INSERT INTO FKTABLEFORARRAY VALUES ('{6,NULL,4,NULL}', 19); + INSERT INTO FKTABLEFORARRAYMDIM VALUES ('{{1,2},{6,NULL}}', 20); + INSERT INTO FKTABLEFORARRAYNOTNULL VALUES (NULL, 21); + + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + + -- Delete a row from PK TABLE (must fail due to ON DELETE NO ACTION) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=1; + + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Update a row from PK TABLE (must fail due to ON UPDATE NO ACTION) + UPDATE PKTABLEFORARRAY SET ptest1=7 WHERE ptest1=1; + + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Check UPDATE on FKTABLE + UPDATE FKTABLEFORARRAY SET ftest1=ARRAY[1] WHERE ftest2=4; + + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + + DROP TABLE FKTABLEFORARRAY; + DROP TABLE FKTABLEFORARRAYNOTNULL; + DROP TABLE FKTABLEFORARRAYMDIM; + + -- Allowed references with actions + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE NO ACTION, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY 
ON DELETE NO ACTION ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE SET NULL, ftest2 int ); + INSERT INTO PKTABLEFORARRAY VALUES (100, 'TO BE UPDATED'); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,100}', 100); + SELECT COUNT(*) FROM FKTABLEFORARRAY WHERE ftest1 IS NULL; -- Should return 0 + UPDATE PKTABLEFORARRAY SET ptest1 = 1000, ptest2 = 'TO BE REMOVED' WHERE ptest1 = 100; + SELECT COUNT(*) FROM FKTABLEFORARRAY WHERE ftest1 IS NULL; -- Should return 1 + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE NO ACTION, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY 
ON DELETE RESTRICT ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE NO ACTION, ftest2 int ); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,1000}', 100); + SELECT COUNT(*) FROM FKTABLEFORARRAY; -- Should return 1 + DELETE FROM PKTABLEFORARRAY WHERE ptest1 = 1000; -- From the previous example + SELECT COUNT(*) FROM FKTABLEFORARRAY; -- Should return 0 + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE NO ACTION, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE 
FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE NO ACTION, ftest2 int ); + INSERT INTO PKTABLEFORARRAY VALUES (100, 'TO BE REMOVED'); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,100}', 100); + SELECT COUNT(*) FROM FKTABLEFORARRAY; -- Should return 1 + DELETE FROM PKTABLEFORARRAY WHERE ptest1 = 100; + SELECT COUNT(*) FROM FKTABLEFORARRAY WHERE ftest1 IS NULL; -- Should return 1 + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE NO ACTION, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES 
PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE NO ACTION, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE RESTRICT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE SET DEFAULT, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE EACH CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE EACH SET NULL, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + + -- Not allowed references (ON UPDATE CASCADE) + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE NO ACTION ON UPDATE CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + 
CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE RESTRICT ON UPDATE CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE ON UPDATE CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET DEFAULT ON UPDATE CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE SET NULL ON UPDATE CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ON UPDATE CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE EACH SET NULL ON UPDATE CASCADE, ftest2 int ); + DROP TABLE FKTABLEFORARRAY; + + -- Cleanup + DROP TABLE PKTABLEFORARRAY; + + -- Check reference on empty table + CREATE TABLE PKTABLEFORARRAY (ptest1 int PRIMARY KEY); + CREATE TABLE FKTABLEFORARRAY (ftest1 int[] EACH REFERENCES PKTABLEFORARRAY); + INSERT INTO FKTABLEFORARRAY VALUES ('{}'); + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Repeat a similar test using CHAR(1) keys rather than INTEGER + CREATE TABLE PKTABLEFORARRAY ( ptest1 CHAR(1) PRIMARY KEY, ptest2 text ); + + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES ('A', 'Test A'); + INSERT INTO PKTABLEFORARRAY VALUES ('B', 'Test B'); + INSERT INTO PKTABLEFORARRAY VALUES ('C', 'Test C'); + + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 char(1)[] EACH REFERENCES PKTABLEFORARRAY ON UPDATE RESTRICT ON DELETE RESTRICT, ftest2 int ); + + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('{"A"}', 1); + INSERT INTO FKTABLEFORARRAY VALUES ('{"B"}', 2); + INSERT INTO FKTABLEFORARRAY VALUES ('{"C"}', 3); + 
INSERT INTO FKTABLEFORARRAY VALUES ('{"A","B","C"}', 4); + + -- Insert invalid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('{"D"}', 5); + INSERT INTO FKTABLEFORARRAY VALUES ('{"A","B","D"}', 6); + + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + + -- Delete a row from PK TABLE (must fail due to ON DELETE RESTRICT) + DELETE FROM PKTABLEFORARRAY WHERE ptest1='A'; + + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Update a row from PK TABLE (must fail due to ON UPDATE RESTRICT) + UPDATE PKTABLEFORARRAY SET ptest1='D' WHERE ptest1='B'; + + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Repeat a similar test using FLOAT keys coerced from INTEGER + CREATE TABLE PKTABLEFORARRAY ( ptest1 FLOAT PRIMARY KEY, ptest2 text ); + + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (1.0, 'Test 1'); + INSERT INTO PKTABLEFORARRAY VALUES (2.0, 'Test 2'); + INSERT INTO PKTABLEFORARRAY VALUES (3.0, 'Test 3'); + + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY, ftest2 int ); + + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1], 1); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2], 2); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[3], 3); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,2,3], 4); + + -- Insert invalid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[4], 5); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,2,5], 6); + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Composite primary keys + CREATE TABLE PKTABLEFORARRAY ( id1 CHAR(1), id2 CHAR(1), ptest2 text, PRIMARY KEY (id1, id2) ); + + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES ('A', 'A', 'Test A'); + INSERT INTO PKTABLEFORARRAY VALUES ('A', 'B', 'Test B'); + INSERT INTO PKTABLEFORARRAY 
VALUES ('B', 'C', 'Test B'); + + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( fid1 CHAR(1), fid2 CHAR(1)[], ftest2 text, FOREIGN KEY (fid1, EACH fid2) REFERENCES PKTABLEFORARRAY); + + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('A', ARRAY['A','B'], '1'); + INSERT INTO FKTABLEFORARRAY VALUES ('B', ARRAY['C'], '2'); + + -- Insert invalid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES ('A', ARRAY['A','B', 'C'], '3'); + INSERT INTO FKTABLEFORARRAY VALUES ('B', ARRAY['A'], '4'); + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Test foreign key arrays with composite type + CREATE TYPE INVOICEID AS (year_part INTEGER, progressive_part INTEGER); + CREATE TABLE PKTABLEFORARRAY ( id INVOICEID PRIMARY KEY, ptest2 text); + + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (ROW(2010, 99), 'Last invoice for 2010'); + INSERT INTO PKTABLEFORARRAY VALUES (ROW(2011, 1), 'First invoice for 2011'); + INSERT INTO PKTABLEFORARRAY VALUES (ROW(2011, 2), 'Second invoice for 2011'); + + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, invoice_ids INVOICEID[] EACH REFERENCES PKTABLEFORARRAY ON UPDATE SET NULL ON DELETE SET NULL, ftest2 TEXT ); + + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY(invoice_ids, ftest2) VALUES (ARRAY['(2010,99)']::INVOICEID[], 'Product A'); + INSERT INTO FKTABLEFORARRAY(invoice_ids, ftest2) VALUES (ARRAY['(2011,1)','(2011,2)']::INVOICEID[], 'Product B'); + INSERT INTO FKTABLEFORARRAY(invoice_ids, ftest2) VALUES (ARRAY['(2011,2)']::INVOICEID[], 'Product C'); + + -- Insert invalid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY(invoice_ids, ftest2) VALUES (ARRAY['(2011,99)']::INVOICEID[], 'Product A'); + INSERT INTO FKTABLEFORARRAY(invoice_ids, ftest2) VALUES (ARRAY['(2011,1)','(2010,1)']::INVOICEID[], 'Product B'); + + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + + -- Delete a row from PK TABLE (ON 
DELETE SET NULL) + DELETE FROM PKTABLEFORARRAY WHERE id=ROW(2010,99); + + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Update a row from PK TABLE (ON UPDATE SET NULL) + UPDATE PKTABLEFORARRAY SET id=ROW(2011,99) WHERE id=ROW(2011,1); + + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + DROP TYPE INVOICEID; + + -- Repeat a similar test for SET DEFAULT actions + CREATE TABLE PKTABLEFORARRAY ( ptest1 int PRIMARY KEY, ptest2 int ); + + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (1, 1); + INSERT INTO PKTABLEFORARRAY VALUES (2, 2); + INSERT INTO PKTABLEFORARRAY VALUES (3, 3); + + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] NOT NULL DEFAULT ARRAY[]::int[] EACH REFERENCES PKTABLEFORARRAY ON UPDATE SET DEFAULT ON DELETE SET DEFAULT, ftest2 int ); + + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1], 1); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2], 2); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[3], 3); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2,3], 4); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,2], 5); + + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + + -- Delete a row from PK TABLE (ON DELETE SET DEFAULT) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=1; + + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Update a row from PK TABLE (ON UPDATE SET DEFAULT) + UPDATE PKTABLEFORARRAY SET ptest1=5 WHERE ptest1=3; + + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Repeat a similar test for CASCADE actions + CREATE TABLE PKTABLEFORARRAY ( ptest1 int PRIMARY KEY, ptest2 int ); + + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (1, 1); + INSERT INTO PKTABLEFORARRAY VALUES (2, 2); + 
INSERT INTO PKTABLEFORARRAY VALUES (3, 3); + + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] NOT NULL DEFAULT ARRAY[]::int[] EACH REFERENCES PKTABLEFORARRAY ON DELETE CASCADE, ftest2 int ); + + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1], 1); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2], 2); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[3], 3); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2,3], 4); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,2], 5); + + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + + -- Delete a row from PK TABLE (ON DELETE CASCADE) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=1; + + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Test for EACH SET NULL actions + CREATE TABLE PKTABLEFORARRAY ( ptest1 int PRIMARY KEY, ptest2 int ); + + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (1, 1); + INSERT INTO PKTABLEFORARRAY VALUES (2, 2); + INSERT INTO PKTABLEFORARRAY VALUES (3, 3); + + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON UPDATE EACH SET NULL ON DELETE EACH SET NULL, ftest2 int ); + + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1], 1); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2], 2); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[3], 3); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2,2,1], 4); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,3,3], 5); + + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + + -- Delete a row from PK TABLE (ON DELETE EACH SET NULL) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=1; + + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Update a row from PK TABLE (ON UPDATE EACH SET NULL) + UPDATE PKTABLEFORARRAY SET ptest1=5 WHERE ptest1=3; + + -- Check FKTABLE for update of matched row + SELECT 
* FROM FKTABLEFORARRAY; + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Test for EACH CASCADE actions + CREATE TABLE PKTABLEFORARRAY ( ptest1 int PRIMARY KEY, ptest2 int ); + + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES (1, 1); + INSERT INTO PKTABLEFORARRAY VALUES (2, 2); + INSERT INTO PKTABLEFORARRAY VALUES (3, 3); + + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( ftest1 int[] EACH REFERENCES PKTABLEFORARRAY ON UPDATE EACH CASCADE ON DELETE EACH CASCADE, ftest2 int ); + + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1], 1); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2], 2); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[3], 3); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[2,2,1], 4); + INSERT INTO FKTABLEFORARRAY VALUES (ARRAY[1,3,3], 5); + + -- Check FKTABLE + SELECT * FROM FKTABLEFORARRAY; + + -- Delete a row from PK TABLE (ON DELETE EACH CASCADE) + DELETE FROM PKTABLEFORARRAY WHERE ptest1=1; + + -- Check FKTABLE for removal of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Update a row from PK TABLE (ON UPDATE EACH CASCADE) + UPDATE PKTABLEFORARRAY SET ptest1=5 WHERE ptest1=3; + + -- Check FKTABLE for update of matched row + SELECT * FROM FKTABLEFORARRAY; + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Check EACH CASCADE actions with cast + CREATE TABLE PKTABLEFORARRAY (c smallint PRIMARY KEY); + CREATE TABLE FKTABLEFORARRAY (c int[] EACH REFERENCES PKTABLEFORARRAY + ON UPDATE EACH CASCADE + ON DELETE EACH CASCADE); + INSERT INTO PKTABLEFORARRAY VALUES (1), (2); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,2}'); + UPDATE PKTABLEFORARRAY SET c = 3 WHERE c = 2; + DELETE FROM PKTABLEFORARRAY WHERE c = 1; + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Check EACH SET NULL actions with cast + CREATE TABLE PKTABLEFORARRAY (c smallint PRIMARY KEY); + CREATE TABLE FKTABLEFORARRAY (c int[] EACH 
REFERENCES PKTABLEFORARRAY + ON UPDATE EACH SET NULL + ON DELETE EACH SET NULL); + INSERT INTO PKTABLEFORARRAY VALUES (1), (2); + INSERT INTO FKTABLEFORARRAY VALUES ('{1,2}'); + UPDATE PKTABLEFORARRAY SET c = 3 WHERE c = 2; + DELETE FROM PKTABLEFORARRAY WHERE c = 1; + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Check for an array column referencing another array column (NOT FOREIGN KEY ARRAY) + -- Create primary table with a primary key array + CREATE TABLE PKTABLEFORARRAY ( id INT[] PRIMARY KEY, ptest2 text); + + -- Create the foreign table + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, fids INT[] REFERENCES PKTABLEFORARRAY, ftest2 TEXT ); + + -- Populate the primary table + INSERT INTO PKTABLEFORARRAY VALUES ('{1,1}', 'A'); + INSERT INTO PKTABLEFORARRAY VALUES ('{1,2}', 'B'); + + -- Insert valid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY (fids, ftest2) VALUES ('{1,1}', 'Product A'); + INSERT INTO FKTABLEFORARRAY (fids, ftest2) VALUES ('{1,2}', 'Product B'); + + -- Insert invalid rows into FK TABLE + INSERT INTO FKTABLEFORARRAY (fids, ftest2) VALUES ('{0,1}', 'Product C'); + INSERT INTO FKTABLEFORARRAY (fids, ftest2) VALUES ('{2,1}', 'Product D'); + + -- Cleanup + DROP TABLE FKTABLEFORARRAY; + DROP TABLE PKTABLEFORARRAY; + + -- Check ARRAY actions on normal FK + -- Create primary table with a scalar (non-array) primary key + CREATE TABLE PKTABLEFORARRAY ( id INT PRIMARY KEY, ptest2 text); + + -- Create an each foreign key without an array (ERROR) + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, ftest2 INT EACH REFERENCES PKTABLEFORARRAY ); + + -- Create the foreign table with forbidden actions (ERROR) + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, ftest2 INT[] REFERENCES PKTABLEFORARRAY ON DELETE EACH CASCADE ); + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, ftest2 INT[] REFERENCES PKTABLEFORARRAY ON UPDATE EACH CASCADE ); + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, ftest2 INT[] REFERENCES
PKTABLEFORARRAY ON DELETE EACH SET NULL ); + CREATE TABLE FKTABLEFORARRAY ( id SERIAL PRIMARY KEY, ftest2 INT[] REFERENCES PKTABLEFORARRAY ON UPDATE EACH SET NULL ); + + -- Cleanup + DROP TABLE PKTABLEFORARRAY; + + + -- --------------------------------------- + -- Multi-column "EACH" foreign key tests + -- --------------------------------------- + + -- Create DIM1 table with two-column primary key + CREATE TABLE DIM1 (X INTEGER NOT NULL, Y INTEGER NOT NULL, PRIMARY KEY (X, Y)); + -- Populate DIM1 table pairs + INSERT INTO DIM1 SELECT x.t, x.t * y.t + FROM (SELECT generate_series(1, 10) AS t) x, + (SELECT generate_series(0, 10) AS t) y; + + + -- Test with TABLE declaration of an each foreign key constraint (NO ACTION) + CREATE TABLE F1 ( + x INTEGER PRIMARY KEY, y INTEGER[], + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) + ); + -- Insert facts + INSERT INTO F1 VALUES (1, '{0,1,2,3,4,5}'); -- OK + INSERT INTO F1 VALUES (2, '{0,2,4,6}'); -- OK + INSERT INTO F1 VALUES (3, '{0,3,6,9,0,3,6,9,0,0,0,0,9,9}'); -- OK (multiple occurrences) + INSERT INTO F1 VALUES (4, '{0,2,4}'); -- FAILS (2 is not present) + INSERT INTO F1 VALUES (4, '{0,NULL,4}'); -- OK + INSERT INTO F1 VALUES (5, '{0,NULL,5}'); -- OK + -- Try updates + UPDATE F1 SET y = '{0,2,4,6}' WHERE x = 2; -- OK + UPDATE F1 SET y = '{0,2,3,4,6}' WHERE x = 2; -- FAILS + UPDATE F1 SET x = 20, y = '{0,2,3,4,6}' WHERE x = 2; -- FAILS (20 does not exist) + UPDATE F1 SET y = '{0,4,8}' WHERE x = 4; -- OK + UPDATE F1 SET y = '{0,5,NULL,10}' WHERE x = 5; -- OK + DROP TABLE F1; + + + -- Test with FOREIGN KEY after TABLE population + CREATE TABLE F1 ( + x INTEGER PRIMARY KEY, y INTEGER[] + ); + -- Insert facts + INSERT INTO F1 VALUES (1, '{0,1,2,3,4,5}'); -- OK + INSERT INTO F1 VALUES (2, '{0,2,4,6}'); -- OK + INSERT INTO F1 VALUES (3, '{0,3,6,9,0,3,6,9,0,0,0,0,9,9}'); -- OK (multiple occurrences) + INSERT INTO F1 VALUES (4, '{0,2,4}'); -- OK + INSERT INTO F1 VALUES (5, '{0,NULL,5}'); -- OK + -- Try updates + UPDATE F1 SET y = 
'{0,2,4,6}' WHERE x = 2; -- OK + UPDATE F1 SET y = '{0,2,3,4,6}' WHERE x = 2; -- OK + UPDATE F1 SET x = 20, y = '{0,2,3,4,6}' WHERE x = 2; -- OK + -- Add foreign key (FAILS) + ALTER TABLE F1 ADD FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y); + DROP TABLE F1; + + -- Limitation on EACH CASCADE and EACH SET NULL on multi-column keys + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH CASCADE) + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON DELETE EACH CASCADE ON UPDATE EACH CASCADE + ); + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH CASCADE) + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON DELETE EACH CASCADE + ); + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH CASCADE) + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON UPDATE EACH CASCADE + ); + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH SET NULL) + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON DELETE EACH SET NULL ON UPDATE EACH SET NULL + ); + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH SET NULL) + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON DELETE EACH SET NULL + ); + CREATE TABLE F2 ( + x INTEGER PRIMARY KEY, y INTEGER[], + -- FAILS (multi-column foreign key forbids EACH SET NULL) + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON UPDATE EACH SET NULL + ); + + -- Test of ON DELETE CASCADE and ON UPDATE SET NULL with multi-columns + CREATE TABLE F1 ( + x INTEGER PRIMARY KEY, y INTEGER[], + FOREIGN KEY (x, EACH y) REFERENCES DIM1(x, y) ON DELETE CASCADE ON UPDATE SET NULL + ); + -- Insert dummy dimension records + INSERT INTO DIM1 VALUES (199, 199); + INSERT INTO DIM1 VALUES (199, 200); + INSERT INTO DIM1 VALUES (199, 201); + -- Insert facts + INSERT INTO F1 VALUES (1, '{0,1,2,3,4,5}'); -- OK + INSERT INTO F1
VALUES (2, '{0,2,4,6}'); -- OK + INSERT INTO F1 VALUES (3, '{0,3,6,9,0,3,6,9,0,0,0,0,9,9}'); -- OK (multiple occurrences) + INSERT INTO F1 VALUES (4, '{0,NULL,4}'); -- OK + INSERT INTO F1 VALUES (5, '{0,NULL,5}'); -- OK + INSERT INTO F1 VALUES (199, '{199, 200}'); -- OK + SELECT COUNT(*) FROM F1 WHERE x = 199; -- should return 1 + DELETE FROM DIM1 WHERE x = 199 AND y = 200; -- should remove the whole row in F1 + SELECT COUNT(*) FROM F1 WHERE x = 199; -- should return 0 + INSERT INTO F1 VALUES (199, '{199, 201}'); -- OK + SELECT COUNT(*) FROM F1 WHERE x = 199 AND y IS NULL; -- should return 0 + UPDATE DIM1 SET y = 200 WHERE x = 199 AND y = 201; -- should set the whole row to NULL in F1 + SELECT COUNT(*) FROM F1 WHERE x = 199 AND y IS NULL; -- should return 1 + DROP TABLE F1; + + + -- Test with TABLE declaration of a two-dim each foreign key constraint (NO ACTION) + CREATE TABLE F1 ( + x INTEGER[] PRIMARY KEY, y INTEGER[], + FOREIGN KEY (EACH x, EACH y) REFERENCES DIM1(x, y) + ); + -- Insert facts + INSERT INTO F1 VALUES ('{1}', '{0,1,2,3,4,5}'); -- OK + INSERT INTO F1 VALUES ('{1,2}', '{0,1,2,3,4,5}'); -- FAILS + INSERT INTO F1 VALUES ('{1,2}', '{0,2,4,6}'); -- OK + INSERT INTO F1 VALUES ('{1,3}', '{0,3,9,12}'); -- FAILS ({1,12} is not present) + INSERT INTO F1 VALUES ('{1,3}', '{0,3,9, NULL}'); -- OK + -- Try updates + UPDATE F1 SET y = '{0,2,4,6,8}' WHERE x = '{1,2}'; -- OK + UPDATE F1 SET y = '{0,2,3,4,6,8}' WHERE x = '{1,2}'; -- FAILS + UPDATE F1 SET x = '{1,20}', y = '{0,2,3,4,6}' WHERE x = '{1,2}'; -- FAILS (20 does not exist) + DROP TABLE F1; + + + -- Test with two-dim each FOREIGN KEY after TABLE population + CREATE TABLE F1 ( + x INTEGER[] PRIMARY KEY, y INTEGER[] + ); + INSERT INTO F1 VALUES ('{1}', '{0,1,2,3,4,5}'); -- OK + INSERT INTO F1 VALUES ('{1,2}', '{0,2,4,6}'); -- OK + -- Add foreign key (OK) + ALTER TABLE F1 ADD FOREIGN KEY (EACH x, EACH y) REFERENCES DIM1(x, y); + DROP TABLE F1; + CREATE TABLE F1 ( + x INTEGER[] PRIMARY KEY, y INTEGER[] + ); + 
INSERT INTO F1 VALUES ('{1,2}', '{0,1,2,3,4,5}'); + -- Add foreign key (FAILS) + ALTER TABLE F1 ADD FOREIGN KEY (EACH x, EACH y) REFERENCES DIM1(x, y); + UPDATE F1 SET y = '{0,2,4,6,8,NULL}' WHERE x = '{1,2}'; -- OK + -- Add foreign key (OK) + ALTER TABLE F1 ADD FOREIGN KEY (EACH x, EACH y) REFERENCES DIM1(x, y); + DROP TABLE F1; + + -- Cleanup + DROP TABLE DIM1; + + -- Check for potential name conflicts (with internal integrity checks) + CREATE TABLE x1(x1 int, x2 int, PRIMARY KEY(x1,x2)); + INSERT INTO x1 VALUES + (1,4), + (1,5), + (2,4), + (2,5), + (3,6), + (3,7) + ; + CREATE TABLE x2(x1 int[], x2 int[], FOREIGN KEY(EACH x1, EACH x2) REFERENCES x1); + INSERT INTO x2 VALUES ('{1,2}','{4,5}'); + INSERT INTO x2 VALUES ('{1,2}','{5,6}'); -- FAILS + DROP TABLE x2; + CREATE TABLE x2(x1 int[], x2 int[]); + INSERT INTO x2 VALUES ('{1,2}','{4,5}'); + INSERT INTO x2 VALUES ('{1,2}','{5,6}'); + ALTER TABLE x2 ADD CONSTRAINT fk_const FOREIGN KEY(EACH x1, EACH x2) REFERENCES x1; -- FAILS + DROP TABLE x2; + DROP TABLE x1; + + + -- --------------------------------------- + -- Multi-dimensional "EACH" foreign key tests + -- --------------------------------------- + + -- Create DIM1 table with a single-column primary key and a unique code + CREATE TABLE DIM1 (X INTEGER NOT NULL PRIMARY KEY, + CODE TEXT NOT NULL UNIQUE); + -- Populate DIM1 table with ten (X, CODE) rows + INSERT INTO DIM1 SELECT t, 'DIM1-' || lpad(t::TEXT, 2, '0') + FROM (SELECT generate_series(1, 10)) x(t); + + -- Test with TABLE declaration of an each foreign key constraint (NO ACTION) + CREATE TABLE F1 ( + ID SERIAL PRIMARY KEY, + SLOTS INTEGER[3][3] EACH REFERENCES DIM1 + ); + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 3}, {NULL, NULL, 6}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 11}, {NULL, NULL, 6}}'); -- FAILS + INSERT INTO F1(SLOTS) VALUES ('{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{1, 2, 3, 4, 5, 6, 7, 8, 9}'); -- OK + UPDATE F1 SET SLOTS = '{{NULL, 1, NULL},
{NULL, NULL, 3}, {7, 8, 10}}' WHERE ID = 1; -- OK + UPDATE F1 SET SLOTS = '{{100, 100, 100}, {NULL, NULL, 20}, {7, 8, 10}}' WHERE ID = 1; -- FAILS + DROP TABLE F1; + + -- Test with postponed foreign key + CREATE TABLE F1 ( + ID SERIAL PRIMARY KEY, + SLOTS INTEGER[3][3] + ); + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 3}, {NULL, NULL, 6}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 11}, {NULL, NULL, 6}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{{1, 2, 3}, {4, 5, 6}, {7, 8, 9}}'); -- OK + INSERT INTO F1(SLOTS) VALUES ('{1, 2, 3, 4, 5, 6, 7, 8, 9}'); -- OK + ALTER TABLE F1 ADD FOREIGN KEY (EACH SLOTS) REFERENCES DIM1; -- FAILS + DELETE FROM F1 WHERE ID = 2; -- REMOVE ISSUE + ALTER TABLE F1 ADD FOREIGN KEY (EACH SLOTS) REFERENCES DIM1; -- NOW OK + INSERT INTO F1(SLOTS) VALUES ('{{NULL, 1, NULL}, {NULL, NULL, 11}, {NULL, NULL, 6}}'); -- FAILS + DROP TABLE F1; + + -- Cleanup + DROP TABLE DIM1; + + -- Leave tables in the database + CREATE TABLE PKTABLEFOREACHFK ( ptest1 int PRIMARY KEY, ptest2 text ); + CREATE TABLE FKTABLEFOREACHFK ( ftest1 int[] EACH REFERENCES PKTABLEFOREACHFK, ftest2 int );