Re: [HACKERS] Domains and arrays and composites, oh my

From Tom Lane
Subject Re: [HACKERS] Domains and arrays and composites, oh my
Date
Msg-id 29279.1508446013@sss.pgh.pa.us
In reply to Re: [HACKERS] Domains and arrays and composites, oh my  (Tom Lane <tgl@sss.pgh.pa.us>)
Responses Re: [HACKERS] Domains and arrays and composites, oh my  (Tom Lane <tgl@sss.pgh.pa.us>)
List pgsql-hackers
I wrote:
> Andrew Dunstan <andrew.dunstan@2ndquadrant.com> writes:
>> On 09/28/2017 01:02 PM, Tom Lane wrote:
>>>> I do think that treating a function returning a domain-over-composite
>>>> differently from one returning a base composite is a POLA violation. We'd be very
>>>> hard put to explain the reasons for it to an end user.

>>> Do you have any thoughts about how we ought to resolve that?

>> Not offhand. Maybe we need to revisit the decision not to modify the
>> executor at all.

> I think it's more of a parse analysis change: the issue is whether to
> smash a function's result type to base when determining whether it emits
> columns.  Maybe we could just do that in that context, and otherwise leave
> domains alone.

After fooling with that for a while, I concluded that the only reasonable
path forward is to go ahead and modify the behavior of
get_expr_result_type and sibling routines.  While this fixes the parser
behavior to be pretty much what I think we want, it means that we've got
holes to fill in a lot of other places.  Most of them will manifest as
unexpected "domaintypename is not a composite type" errors, but there
are definitely places where the net effect is to silently fail to enforce
domain constraints against a constructed row value :-(.  In the attached
still-WIP patch, I think that I've got most of the core code fixed, but
there are at least these holes remaining to fill:

* json_populate_record and sibling routines won't enforce domain
constraints; depending on how they're called, you might or might not
get a "not a composite type" error.  This is because they use two
different methods for getting the target type OID depending on whether
the input prototype record is NULL.  Maybe that was a bad idea.
(I'm disinclined to try to fix this code right now since there are
pending bug fixes nearby; better to wait till that dust settles.)

* Ditto for hstore's populate_record, which is pretty much the same logic.

* plpgsql mostly seems to work, but not quite 100%: RETURN QUERY will
fail to enforce domain constraints if the return type is a domain over
composite.  It also still needs feature extension to handle d-over-c
variables more fully (e.g. allow field assignment).

* I haven't looked at the other PLs much; I believe they will mostly
fail safe with "not a composite type" errors, but I wouldn't swear
that all code paths will.
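
The two kinds of hole listed above (a fail-safe "not a composite type"
error versus a silently skipped domain check) can be illustrated with a
toy model.  This is only a sketch: ToyType, build_row_strict,
build_row_lax and build_row_checked are hypothetical names invented for
illustration, and none of this is PostgreSQL code or part of the
attached patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy stand-in for a pg_type entry (hypothetical, not the real catalog). */
typedef enum { TOY_BASE, TOY_COMPOSITE, TOY_DOMAIN } ToyTypType;

typedef struct ToyType
{
    ToyTypType  typtype;
    const struct ToyType *basetype;     /* set for domains only */
    bool        (*check) (int a, int b);    /* domain constraint, or NULL */
} ToyType;

typedef enum { ROW_OK, ROW_NOT_COMPOSITE, ROW_CHECK_VIOLATION } RowStatus;

/* Look through any number of domain layers to the underlying type. */
static const ToyType *
toy_base_type(const ToyType *t)
{
    while (t->typtype == TOY_DOMAIN)
    {
        assert(t->basetype != NULL);
        t = t->basetype;
    }
    return t;
}

/* Unrepaired caller, variant 1: insists on a plain composite (fails safe). */
static RowStatus
build_row_strict(const ToyType *target, int a, int b)
{
    (void) a;
    (void) b;
    if (target->typtype != TOY_COMPOSITE)
        return ROW_NOT_COMPOSITE;
    return ROW_OK;
}

/* Unrepaired caller, variant 2: looks through the domain, never checks it. */
static RowStatus
build_row_lax(const ToyType *target, int a, int b)
{
    (void) a;
    (void) b;
    if (toy_base_type(target)->typtype != TOY_COMPOSITE)
        return ROW_NOT_COMPOSITE;
    return ROW_OK;              /* constraint silently skipped */
}

/* Repaired caller: look through the domain, then enforce every layer. */
static RowStatus
build_row_checked(const ToyType *target, int a, int b)
{
    const ToyType *t;

    if (toy_base_type(target)->typtype != TOY_COMPOSITE)
        return ROW_NOT_COMPOSITE;
    for (t = target; t->typtype == TOY_DOMAIN; t = t->basetype)
    {
        if (t->check != NULL && !t->check(a, b))
            return ROW_CHECK_VIOLATION;
    }
    return ROW_OK;
}

/* Example constraint: first field must not exceed the second. */
static bool
a_le_b(int a, int b)
{
    return a <= b;
}

static const ToyType toy_pair = {TOY_COMPOSITE, NULL, NULL};
static const ToyType toy_ordered_pair = {TOY_DOMAIN, &toy_pair, a_le_b};
```

In this model, build_row_lax(&toy_ordered_pair, 2, 1) reports ROW_OK even
though the constraint a <= b is violated, which is the less benign kind
of hole; build_row_strict rejects the domain outright, and
build_row_checked reports the violation.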

It seems like this is probably the way forward, but I'm slightly
discouraged by the fact that the patch footprint is getting bigger
and there are paths where we can get domain-enforcement omissions
rather than something more benign.  Still, we had lots of
domain-enforcement omissions in the early days of the existing
domain feature, if memory serves.  Maybe we should just accept
that working through that will be a process.

>> One thought I had was that we could invent a new return
>> type of TYPEFUNC_DOMAIN_COMPOSITE so there would be less danger of a PL
>> just treating it as an unconstrained base type as it might do if it saw
>> TYPEFUNC_COMPOSITE.

> Hmm.  That would be a way of forcing the issue, no doubt ...

I did that, but it turns out not to help much: a lot of the
broken code is doing stuff on the basis of type_is_rowtype(), which
this patch allows to return true for domains over composite.  Maybe
we should undo that and invent a separate type_is_rowtype_or_domain()
function to be used only by repaired code, but that seems pretty ugly :-(
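
To make the trade-off concrete, here is a toy sketch of why a separate
result class does not protect code that branches on a widened row-type
predicate.  All names (ToyType, toy_func_class, toy_is_rowtype_strict,
toy_is_rowtype_or_domain) are hypothetical stand-ins chosen for
illustration; the model ignores RECORD and is not the patch's
implementation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy stand-in for a pg_type entry (hypothetical, not the real catalog). */
typedef enum { TOY_BASE, TOY_COMPOSITE, TOY_DOMAIN } ToyTypType;

typedef struct ToyType
{
    ToyTypType  typtype;
    const struct ToyType *basetype;     /* set for domains only */
} ToyType;

/* Toy analogue of a function-result classification. */
typedef enum
{
    TOY_FUNC_SCALAR,
    TOY_FUNC_COMPOSITE,
    TOY_FUNC_COMPOSITE_DOMAIN
} ToyFuncClass;

/* Look through any number of domain layers to the underlying type. */
static const ToyType *
toy_base_type(const ToyType *t)
{
    while (t->typtype == TOY_DOMAIN)
    {
        assert(t->basetype != NULL);
        t = t->basetype;
    }
    return t;
}

/* Classifier that keeps domain-over-composite distinct from composite. */
static ToyFuncClass
toy_func_class(const ToyType *t)
{
    if (t->typtype == TOY_COMPOSITE)
        return TOY_FUNC_COMPOSITE;
    if (t->typtype == TOY_DOMAIN &&
        toy_base_type(t)->typtype == TOY_COMPOSITE)
        return TOY_FUNC_COMPOSITE_DOMAIN;
    return TOY_FUNC_SCALAR;
}

/* Narrow predicate: true only for a plain composite type. */
static bool
toy_is_rowtype_strict(const ToyType *t)
{
    return t->typtype == TOY_COMPOSITE;
}

/* Widened predicate: also true for a domain over a composite type. */
static bool
toy_is_rowtype_or_domain(const ToyType *t)
{
    return toy_base_type(t)->typtype == TOY_COMPOSITE;
}

static const ToyType toy_int = {TOY_BASE, NULL};
static const ToyType toy_posint = {TOY_DOMAIN, &toy_int};
static const ToyType toy_pair = {TOY_COMPOSITE, NULL};
static const ToyType toy_ordered_pair = {TOY_DOMAIN, &toy_pair};
```

A caller that only asks the widened predicate sees toy_pair and
toy_ordered_pair as identical, so the extra classification value never
reaches it; only the narrow predicate (or the classifier) tells the two
apart.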

Anyway, PFA an updated patch that also fixes some conflicts with the
already-committed arrays-of-domains patch.

            regards, tom lane

diff --git a/src/backend/catalog/pg_inherits.c b/src/backend/catalog/pg_inherits.c
index 245a374..1bd8a58 100644
*** a/src/backend/catalog/pg_inherits.c
--- b/src/backend/catalog/pg_inherits.c
*************** has_superclass(Oid relationId)
*** 301,306 ****
--- 301,311 ----
  /*
   * Given two type OIDs, determine whether the first is a complex type
   * (class type) that inherits from the second.
+  *
+  * This essentially asks whether the first type is guaranteed to be coercible
+  * to the second.  Therefore, we allow the first type to be a domain over a
+  * complex type that inherits from the second; that creates no difficulties.
+  * But the second type cannot be a domain.
   */
  bool
  typeInheritsFrom(Oid subclassTypeId, Oid superclassTypeId)
*************** typeInheritsFrom(Oid subclassTypeId, Oid
*** 314,322 ****
      ListCell   *queue_item;

      /* We need to work with the associated relation OIDs */
!     subclassRelid = typeidTypeRelid(subclassTypeId);
      if (subclassRelid == InvalidOid)
!         return false;            /* not a complex type */
      superclassRelid = typeidTypeRelid(superclassTypeId);
      if (superclassRelid == InvalidOid)
          return false;            /* not a complex type */
--- 319,327 ----
      ListCell   *queue_item;

      /* We need to work with the associated relation OIDs */
!     subclassRelid = typeOrDomainTypeRelid(subclassTypeId);
      if (subclassRelid == InvalidOid)
!         return false;            /* not a complex type or domain over one */
      superclassRelid = typeidTypeRelid(superclassTypeId);
      if (superclassRelid == InvalidOid)
          return false;            /* not a complex type */
diff --git a/src/backend/catalog/pg_proc.c b/src/backend/catalog/pg_proc.c
index 571856e..47916cf 100644
*** a/src/backend/catalog/pg_proc.c
--- b/src/backend/catalog/pg_proc.c
*************** ProcedureCreate(const char *procedureNam
*** 262,268 ****
       */
      if (parameterCount == 1 &&
          OidIsValid(parameterTypes->values[0]) &&
!         (relid = typeidTypeRelid(parameterTypes->values[0])) != InvalidOid &&
          get_attnum(relid, procedureName) != InvalidAttrNumber)
          ereport(ERROR,
                  (errcode(ERRCODE_DUPLICATE_COLUMN),
--- 262,268 ----
       */
      if (parameterCount == 1 &&
          OidIsValid(parameterTypes->values[0]) &&
!         (relid = typeOrDomainTypeRelid(parameterTypes->values[0])) != InvalidOid &&
          get_attnum(relid, procedureName) != InvalidAttrNumber)
          ereport(ERROR,
                  (errcode(ERRCODE_DUPLICATE_COLUMN),
diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c
index c1b87e0..7df942b 100644
*** a/src/backend/commands/typecmds.c
--- b/src/backend/commands/typecmds.c
*************** DefineDomain(CreateDomainStmt *stmt)
*** 798,810 ****
      basetypeoid = HeapTupleGetOid(typeTup);

      /*
!      * Base type must be a plain base type, another domain, an enum or a range
!      * type. Domains over pseudotypes would create a security hole.  Domains
!      * over composite types might be made to work in the future, but not
!      * today.
       */
      typtype = baseType->typtype;
      if (typtype != TYPTYPE_BASE &&
          typtype != TYPTYPE_DOMAIN &&
          typtype != TYPTYPE_ENUM &&
          typtype != TYPTYPE_RANGE)
--- 798,813 ----
      basetypeoid = HeapTupleGetOid(typeTup);

      /*
!      * Base type must be a plain base type, a composite type, another domain,
!      * an enum or a range type.  Domains over pseudotypes would create a
!      * security hole.  (It would be shorter to code this to just check for
!      * pseudotypes; but it seems safer to call out the specific typtypes that
!      * are supported, rather than assume that all future typtypes would be
!      * automatically supported.)
       */
      typtype = baseType->typtype;
      if (typtype != TYPTYPE_BASE &&
+         typtype != TYPTYPE_COMPOSITE &&
          typtype != TYPTYPE_DOMAIN &&
          typtype != TYPTYPE_ENUM &&
          typtype != TYPTYPE_RANGE)
diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c
index c5e97ef..a0f537b 100644
*** a/src/backend/executor/execExprInterp.c
--- b/src/backend/executor/execExprInterp.c
*************** ExecEvalWholeRowVar(ExprState *state, Ex
*** 3469,3476 ****
               * generates an INT4 NULL regardless of the dropped column type).
               * If we find a dropped column and cannot verify that case (1)
               * holds, we have to use the slow path to check (2) for each row.
               */
!             var_tupdesc = lookup_rowtype_tupdesc(variable->vartype, -1);

              slot_tupdesc = slot->tts_tupleDescriptor;

--- 3469,3480 ----
               * generates an INT4 NULL regardless of the dropped column type).
               * If we find a dropped column and cannot verify that case (1)
               * holds, we have to use the slow path to check (2) for each row.
+              *
+              * If vartype is a domain over composite, just look through that
+              * to the base composite type.
               */
!             var_tupdesc = lookup_rowtype_tupdesc_domain(variable->vartype,
!                                                         -1, false);

              slot_tupdesc = slot->tts_tupleDescriptor;

diff --git a/src/backend/executor/execSRF.c b/src/backend/executor/execSRF.c
index cce771d..aabe26c 100644
*** a/src/backend/executor/execSRF.c
--- b/src/backend/executor/execSRF.c
*************** init_sexpr(Oid foid, Oid input_collation
*** 734,740 ****
          /* Must save tupdesc in sexpr's context */
          oldcontext = MemoryContextSwitchTo(sexprCxt);

!         if (functypclass == TYPEFUNC_COMPOSITE)
          {
              /* Composite data type, e.g. a table's row type */
              Assert(tupdesc);
--- 734,741 ----
          /* Must save tupdesc in sexpr's context */
          oldcontext = MemoryContextSwitchTo(sexprCxt);

!         if (functypclass == TYPEFUNC_COMPOSITE ||
!             functypclass == TYPEFUNC_COMPOSITE_DOMAIN)
          {
              /* Composite data type, e.g. a table's row type */
              Assert(tupdesc);
diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c
index 42a4ca9..e734bd9 100644
*** a/src/backend/executor/functions.c
--- b/src/backend/executor/functions.c
*************** check_sql_fn_retval(Oid func_id, Oid ret
*** 1711,1717 ****
              }
          }

!         /* Is the rowtype fixed, or determined only at runtime? */
          if (get_func_result_type(func_id, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
          {
              /*
--- 1711,1725 ----
              }
          }

!         /*
!          * Is the rowtype fixed, or determined only at runtime?
!          *
!          * Note: you might expect that TYPEFUNC_COMPOSITE_DOMAIN should be
!          * treated like TYPEFUNC_COMPOSITE, but we intentionally don't.  This
!          * is because SQL functions don't provide any implicit casting to the
!          * result type, so there is no way to produce a domain-over-composite
!          * result except by computing it as an explicit single-column result.
!          */
          if (get_func_result_type(func_id, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
          {
              /*
diff --git a/src/backend/executor/nodeFunctionscan.c b/src/backend/executor/nodeFunctionscan.c
index 9f87a7e..de476ac 100644
*** a/src/backend/executor/nodeFunctionscan.c
--- b/src/backend/executor/nodeFunctionscan.c
*************** ExecInitFunctionScan(FunctionScan *node,
*** 383,389 ****
                                              &funcrettype,
                                              &tupdesc);

!         if (functypclass == TYPEFUNC_COMPOSITE)
          {
              /* Composite data type, e.g. a table's row type */
              Assert(tupdesc);
--- 383,390 ----
                                              &funcrettype,
                                              &tupdesc);

!         if (functypclass == TYPEFUNC_COMPOSITE ||
!             functypclass == TYPEFUNC_COMPOSITE_DOMAIN)
          {
              /* Composite data type, e.g. a table's row type */
              Assert(tupdesc);
diff --git a/src/backend/nodes/makefuncs.c b/src/backend/nodes/makefuncs.c
index b58eb0f..7a67653 100644
*** a/src/backend/nodes/makefuncs.c
--- b/src/backend/nodes/makefuncs.c
*************** makeVarFromTargetEntry(Index varno,
*** 120,127 ****
   * table entry, and varattno == 0 to signal that it references the whole
   * tuple.  (Use of zero here is unclean, since it could easily be confused
   * with error cases, but it's not worth changing now.)  The vartype indicates
!  * a rowtype; either a named composite type, or RECORD.  This function
!  * encapsulates the logic for determining the correct rowtype OID to use.
   *
   * If allowScalar is true, then for the case where the RTE is a single function
   * returning a non-composite result type, we produce a normal Var referencing
--- 120,129 ----
   * table entry, and varattno == 0 to signal that it references the whole
   * tuple.  (Use of zero here is unclean, since it could easily be confused
   * with error cases, but it's not worth changing now.)  The vartype indicates
!  * a rowtype; either a named composite type, or a domain over a named
!  * composite type (only possible if the RTE is a function returning that),
!  * or RECORD.  This function encapsulates the logic for determining the
!  * correct rowtype OID to use.
   *
   * If allowScalar is true, then for the case where the RTE is a single function
   * returning a non-composite result type, we produce a normal Var referencing
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 7961362..1a82749 100644
*** a/src/backend/optimizer/util/clauses.c
--- b/src/backend/optimizer/util/clauses.c
*************** CommuteRowCompareExpr(RowCompareExpr *cl
*** 2356,2361 ****
--- 2356,2365 ----
   * is still what it was when the expression was parsed.  This is needed to
   * guard against improper simplification after ALTER COLUMN TYPE.  (XXX we
   * may well need to make similar checks elsewhere?)
+  *
+  * rowtypeid may come from a whole-row Var, and therefore it can be a domain
+  * over composite, but for this purpose we only care about checking the type
+  * of a contained field.
   */
  static bool
  rowtype_field_matches(Oid rowtypeid, int fieldnum,
*************** rowtype_field_matches(Oid rowtypeid, int
*** 2368,2374 ****
      /* No issue for RECORD, since there is no way to ALTER such a type */
      if (rowtypeid == RECORDOID)
          return true;
!     tupdesc = lookup_rowtype_tupdesc(rowtypeid, -1);
      if (fieldnum <= 0 || fieldnum > tupdesc->natts)
      {
          ReleaseTupleDesc(tupdesc);
--- 2372,2378 ----
      /* No issue for RECORD, since there is no way to ALTER such a type */
      if (rowtypeid == RECORDOID)
          return true;
!     tupdesc = lookup_rowtype_tupdesc_domain(rowtypeid, -1, false);
      if (fieldnum <= 0 || fieldnum > tupdesc->natts)
      {
          ReleaseTupleDesc(tupdesc);
diff --git a/src/backend/parser/parse_coerce.c b/src/backend/parser/parse_coerce.c
index 53457dc..def41b3 100644
*** a/src/backend/parser/parse_coerce.c
--- b/src/backend/parser/parse_coerce.c
*************** coerce_type(ParseState *pstate, Node *no
*** 499,507 ****
--- 499,524 ----
           * Input class type is a subclass of target, so generate an
           * appropriate runtime conversion (removing unneeded columns and
           * possibly rearranging the ones that are wanted).
+          *
+          * We will also get here when the input is a domain over a subclass of
+          * the target type.  To keep life simple for the executor, we define
+          * ConvertRowtypeExpr as only working between regular composite types;
+          * therefore, in such cases insert a RelabelType to smash the input
+          * expression down to its base type.
           */
+         Oid            baseTypeId = getBaseType(inputTypeId);
          ConvertRowtypeExpr *r = makeNode(ConvertRowtypeExpr);

+         if (baseTypeId != inputTypeId)
+         {
+             RelabelType *rt = makeRelabelType((Expr *) node,
+                                               baseTypeId, -1,
+                                               InvalidOid,
+                                               COERCE_IMPLICIT_CAST);
+
+             rt->location = location;
+             node = (Node *) rt;
+         }
          r->arg = (Expr *) node;
          r->resulttype = targetTypeId;
          r->convertformat = cformat;
*************** coerce_record_to_complex(ParseState *pst
*** 966,971 ****
--- 983,990 ----
                           int location)
  {
      RowExpr    *rowexpr;
+     Oid            baseTypeId;
+     int32        baseTypeMod = -1;
      TupleDesc    tupdesc;
      List       *args = NIL;
      List       *newargs;
*************** coerce_record_to_complex(ParseState *pst
*** 1001,1007 ****
                          format_type_be(targetTypeId)),
                   parser_coercion_errposition(pstate, location, node)));

!     tupdesc = lookup_rowtype_tupdesc(targetTypeId, -1);
      newargs = NIL;
      ucolno = 1;
      arg = list_head(args);
--- 1020,1033 ----
                          format_type_be(targetTypeId)),
                   parser_coercion_errposition(pstate, location, node)));

!     /*
!      * Look up the composite type, accounting for possibility that what we are
!      * given is a domain over composite.
!      */
!     baseTypeId = getBaseTypeAndTypmod(targetTypeId, &baseTypeMod);
!     tupdesc = lookup_rowtype_tupdesc(baseTypeId, baseTypeMod);
!
!     /* Process the fields */
      newargs = NIL;
      ucolno = 1;
      arg = list_head(args);
*************** coerce_record_to_complex(ParseState *pst
*** 1070,1079 ****

      rowexpr = makeNode(RowExpr);
      rowexpr->args = newargs;
!     rowexpr->row_typeid = targetTypeId;
      rowexpr->row_format = cformat;
      rowexpr->colnames = NIL;    /* not needed for named target type */
      rowexpr->location = location;
      return (Node *) rowexpr;
  }

--- 1096,1117 ----

      rowexpr = makeNode(RowExpr);
      rowexpr->args = newargs;
!     rowexpr->row_typeid = baseTypeId;
      rowexpr->row_format = cformat;
      rowexpr->colnames = NIL;    /* not needed for named target type */
      rowexpr->location = location;
+
+     /* If target is a domain, apply constraints */
+     if (baseTypeId != targetTypeId)
+     {
+         rowexpr->row_format = COERCE_IMPLICIT_CAST;
+         return coerce_to_domain((Node *) rowexpr,
+                                 baseTypeId, baseTypeMod,
+                                 targetTypeId,
+                                 ccontext, cformat, location,
+                                 false);
+     }
+
      return (Node *) rowexpr;
  }

*************** is_complex_array(Oid typid)
*** 2401,2413 ****

  /*
   * Check whether reltypeId is the row type of a typed table of type
!  * reloftypeId.  (This is conceptually similar to the subtype
!  * relationship checked by typeInheritsFrom().)
   */
  static bool
  typeIsOfTypedTable(Oid reltypeId, Oid reloftypeId)
  {
!     Oid            relid = typeidTypeRelid(reltypeId);
      bool        result = false;

      if (relid)
--- 2439,2451 ----

  /*
   * Check whether reltypeId is the row type of a typed table of type
!  * reloftypeId, or is a domain over such a row type.  (This is conceptually
!  * similar to the subtype relationship checked by typeInheritsFrom().)
   */
  static bool
  typeIsOfTypedTable(Oid reltypeId, Oid reloftypeId)
  {
!     Oid            relid = typeOrDomainTypeRelid(reltypeId);
      bool        result = false;

      if (relid)
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index 2f2f2c7..fc0d6bc 100644
*** a/src/backend/parser/parse_func.c
--- b/src/backend/parser/parse_func.c
*************** ParseComplexProjection(ParseState *pstat
*** 1819,1836 ****
      }

      /*
!      * Else do it the hard way with get_expr_result_type().
       *
       * If it's a Var of type RECORD, we have to work even harder: we have to
!      * find what the Var refers to, and pass that to get_expr_result_type.
       * That task is handled by expandRecordVariable().
       */
      if (IsA(first_arg, Var) &&
          ((Var *) first_arg)->vartype == RECORDOID)
          tupdesc = expandRecordVariable(pstate, (Var *) first_arg, 0);
!     else if (get_expr_result_type(first_arg, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
          return NULL;            /* unresolvable RECORD type */
-     Assert(tupdesc);

      for (i = 0; i < tupdesc->natts; i++)
      {
--- 1819,1837 ----
      }

      /*
!      * Else do it the hard way with get_expr_result_tupdesc().
       *
       * If it's a Var of type RECORD, we have to work even harder: we have to
!      * find what the Var refers to, and pass that to get_expr_result_tupdesc.
       * That task is handled by expandRecordVariable().
       */
      if (IsA(first_arg, Var) &&
          ((Var *) first_arg)->vartype == RECORDOID)
          tupdesc = expandRecordVariable(pstate, (Var *) first_arg, 0);
!     else
!         tupdesc = get_expr_result_tupdesc(first_arg, true);
!     if (!tupdesc)
          return NULL;            /* unresolvable RECORD type */

      for (i = 0; i < tupdesc->natts; i++)
      {
diff --git a/src/backend/parser/parse_relation.c b/src/backend/parser/parse_relation.c
index a9273af..ca32a37 100644
*** a/src/backend/parser/parse_relation.c
--- b/src/backend/parser/parse_relation.c
*************** addRangeTableEntryForFunction(ParseState
*** 1496,1502 ****
                           parser_errposition(pstate, exprLocation(funcexpr))));
          }

!         if (functypclass == TYPEFUNC_COMPOSITE)
          {
              /* Composite data type, e.g. a table's row type */
              Assert(tupdesc);
--- 1496,1503 ----
                           parser_errposition(pstate, exprLocation(funcexpr))));
          }

!         if (functypclass == TYPEFUNC_COMPOSITE ||
!             functypclass == TYPEFUNC_COMPOSITE_DOMAIN)
          {
              /* Composite data type, e.g. a table's row type */
              Assert(tupdesc);
*************** expandRTE(RangeTblEntry *rte, int rtinde
*** 2245,2251 ****
                      functypclass = get_expr_result_type(rtfunc->funcexpr,
                                                          &funcrettype,
                                                          &tupdesc);
!                     if (functypclass == TYPEFUNC_COMPOSITE)
                      {
                          /* Composite data type, e.g. a table's row type */
                          Assert(tupdesc);
--- 2246,2253 ----
                      functypclass = get_expr_result_type(rtfunc->funcexpr,
                                                          &funcrettype,
                                                          &tupdesc);
!                     if (functypclass == TYPEFUNC_COMPOSITE ||
!                         functypclass == TYPEFUNC_COMPOSITE_DOMAIN)
                      {
                          /* Composite data type, e.g. a table's row type */
                          Assert(tupdesc);
*************** get_rte_attribute_type(RangeTblEntry *rt
*** 2765,2771 ****
                                                              &funcrettype,
                                                              &tupdesc);

!                         if (functypclass == TYPEFUNC_COMPOSITE)
                          {
                              /* Composite data type, e.g. a table's row type */
                              Form_pg_attribute att_tup;
--- 2767,2774 ----
                                                              &funcrettype,
                                                              &tupdesc);

!                         if (functypclass == TYPEFUNC_COMPOSITE ||
!                             functypclass == TYPEFUNC_COMPOSITE_DOMAIN)
                          {
                              /* Composite data type, e.g. a table's row type */
                              Form_pg_attribute att_tup;
*************** get_rte_attribute_is_dropped(RangeTblEnt
*** 2966,2979 ****
                      if (attnum > atts_done &&
                          attnum <= atts_done + rtfunc->funccolcount)
                      {
-                         TypeFuncClass functypclass;
-                         Oid            funcrettype;
                          TupleDesc    tupdesc;

!                         functypclass = get_expr_result_type(rtfunc->funcexpr,
!                                                             &funcrettype,
!                                                             &tupdesc);
!                         if (functypclass == TYPEFUNC_COMPOSITE)
                          {
                              /* Composite data type, e.g. a table's row type */
                              Form_pg_attribute att_tup;
--- 2969,2979 ----
                      if (attnum > atts_done &&
                          attnum <= atts_done + rtfunc->funccolcount)
                      {
                          TupleDesc    tupdesc;

!                         tupdesc = get_expr_result_tupdesc(rtfunc->funcexpr,
!                                                           true);
!                         if (tupdesc)
                          {
                              /* Composite data type, e.g. a table's row type */
                              Form_pg_attribute att_tup;
diff --git a/src/backend/parser/parse_target.c b/src/backend/parser/parse_target.c
index 2547524..16d3dba 100644
*** a/src/backend/parser/parse_target.c
--- b/src/backend/parser/parse_target.c
*************** transformAssignmentIndirection(ParseStat
*** 725,730 ****
--- 725,732 ----
          else
          {
              FieldStore *fstore;
+             Oid            baseTypeId;
+             int32        baseTypeMod;
              Oid            typrelid;
              AttrNumber    attnum;
              Oid            fieldTypeId;
*************** transformAssignmentIndirection(ParseStat
*** 752,758 ****

              /* No subscripts, so can process field selection here */

!             typrelid = typeidTypeRelid(targetTypeId);
              if (!typrelid)
                  ereport(ERROR,
                          (errcode(ERRCODE_DATATYPE_MISMATCH),
--- 754,767 ----

              /* No subscripts, so can process field selection here */

!             /*
!              * Look up the composite type, accounting for possibility that
!              * what we are given is a domain over composite.
!              */
!             baseTypeMod = targetTypMod;
!             baseTypeId = getBaseTypeAndTypmod(targetTypeId, &baseTypeMod);
!
!             typrelid = typeidTypeRelid(baseTypeId);
              if (!typrelid)
                  ereport(ERROR,
                          (errcode(ERRCODE_DATATYPE_MISMATCH),
*************** transformAssignmentIndirection(ParseStat
*** 796,802 ****
              fstore->arg = (Expr *) basenode;
              fstore->newvals = list_make1(rhs);
              fstore->fieldnums = list_make1_int(attnum);
!             fstore->resulttype = targetTypeId;

              return (Node *) fstore;
          }
--- 805,821 ----
              fstore->arg = (Expr *) basenode;
              fstore->newvals = list_make1(rhs);
              fstore->fieldnums = list_make1_int(attnum);
!             fstore->resulttype = baseTypeId;
!
!             /* If target is a domain, apply constraints */
!             if (baseTypeId != targetTypeId)
!                 return coerce_to_domain((Node *) fstore,
!                                         baseTypeId, baseTypeMod,
!                                         targetTypeId,
!                                         COERCION_IMPLICIT,
!                                         COERCE_IMPLICIT_CAST,
!                                         location,
!                                         false);

              return (Node *) fstore;
          }
*************** ExpandRowReference(ParseState *pstate, N
*** 1387,1408 ****
       * (This can be pretty inefficient if the expression involves nontrivial
       * computation :-(.)
       *
!      * Verify it's a composite type, and get the tupdesc.  We use
!      * get_expr_result_type() because that can handle references to functions
!      * returning anonymous record types.  If that fails, use
!      * lookup_rowtype_tupdesc(), which will almost certainly fail as well, but
!      * it will give an appropriate error message.
       *
       * If it's a Var of type RECORD, we have to work even harder: we have to
!      * find what the Var refers to, and pass that to get_expr_result_type.
       * That task is handled by expandRecordVariable().
       */
      if (IsA(expr, Var) &&
          ((Var *) expr)->vartype == RECORDOID)
          tupleDesc = expandRecordVariable(pstate, (Var *) expr, 0);
!     else if (get_expr_result_type(expr, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE)
!         tupleDesc = lookup_rowtype_tupdesc_copy(exprType(expr),
!                                                 exprTypmod(expr));
      Assert(tupleDesc);

      /* Generate a list of references to the individual fields */
--- 1406,1423 ----
       * (This can be pretty inefficient if the expression involves nontrivial
       * computation :-(.)
       *
!      * Verify it's a composite type, and get the tupdesc.
!      * get_expr_result_tupdesc() handles this conveniently.
       *
       * If it's a Var of type RECORD, we have to work even harder: we have to
!      * find what the Var refers to, and pass that to get_expr_result_tupdesc.
       * That task is handled by expandRecordVariable().
       */
      if (IsA(expr, Var) &&
          ((Var *) expr)->vartype == RECORDOID)
          tupleDesc = expandRecordVariable(pstate, (Var *) expr, 0);
!     else
!         tupleDesc = get_expr_result_tupdesc(expr, false);
      Assert(tupleDesc);

      /* Generate a list of references to the individual fields */
*************** expandRecordVariable(ParseState *pstate,
*** 1610,1624 ****

      /*
       * We now have an expression we can't expand any more, so see if
!      * get_expr_result_type() can do anything with it.  If not, pass to
!      * lookup_rowtype_tupdesc() which will probably fail, but will give an
!      * appropriate error message while failing.
       */
!     if (get_expr_result_type(expr, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE)
!         tupleDesc = lookup_rowtype_tupdesc_copy(exprType(expr),
!                                                 exprTypmod(expr));
!
!     return tupleDesc;
  }


--- 1625,1633 ----

      /*
       * We now have an expression we can't expand any more, so see if
!      * get_expr_result_tupdesc() can do anything with it.
       */
!     return get_expr_result_tupdesc(expr, false);
  }


diff --git a/src/backend/parser/parse_type.c b/src/backend/parser/parse_type.c
index d0b3fbe..b032651 100644
*** a/src/backend/parser/parse_type.c
--- b/src/backend/parser/parse_type.c
*************** stringTypeDatum(Type tp, char *string, i
*** 641,647 ****
      return OidInputFunctionCall(typinput, string, typioparam, atttypmod);
  }

! /* given a typeid, return the type's typrelid (associated relation, if any) */
  Oid
  typeidTypeRelid(Oid type_id)
  {
--- 641,650 ----
      return OidInputFunctionCall(typinput, string, typioparam, atttypmod);
  }

! /*
!  * Given a typeid, return the type's typrelid (associated relation), if any.
!  * Returns InvalidOid if type is not a composite type.
!  */
  Oid
  typeidTypeRelid(Oid type_id)
  {
*************** typeidTypeRelid(Oid type_id)
*** 652,658 ****
      typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type_id));
      if (!HeapTupleIsValid(typeTuple))
          elog(ERROR, "cache lookup failed for type %u", type_id);
-
      type = (Form_pg_type) GETSTRUCT(typeTuple);
      result = type->typrelid;
      ReleaseSysCache(typeTuple);
--- 655,660 ----
*************** typeidTypeRelid(Oid type_id)
*** 660,665 ****
--- 662,699 ----
  }

  /*
+  * Given a typeid, return the type's typrelid (associated relation), if any.
+  * Returns InvalidOid if type is not a composite type or a domain over one.
+  * This is the same as typeidTypeRelid(getBaseType(type_id)), but faster.
+  */
+ Oid
+ typeOrDomainTypeRelid(Oid type_id)
+ {
+     HeapTuple    typeTuple;
+     Form_pg_type type;
+     Oid            result;
+
+     for (;;)
+     {
+         typeTuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type_id));
+         if (!HeapTupleIsValid(typeTuple))
+             elog(ERROR, "cache lookup failed for type %u", type_id);
+         type = (Form_pg_type) GETSTRUCT(typeTuple);
+         if (type->typtype != TYPTYPE_DOMAIN)
+         {
+             /* Not a domain, so done looking through domains */
+             break;
+         }
+         /* It is a domain, so examine the base type instead */
+         type_id = type->typbasetype;
+         ReleaseSysCache(typeTuple);
+     }
+     result = type->typrelid;
+     ReleaseSysCache(typeTuple);
+     return result;
+ }
+
+ /*
   * error context callback for parse failure during parseTypeString()
   */
  static void
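
As a rough illustration only (not part of the patch), the look-through-domains loop in typeOrDomainTypeRelid() can be modelled outside the backend. Everything named toy_* or ToyType below is hypothetical, and a small static table stands in for the pg_type syscache. The point is just that the loop keeps replacing the type by its base type until it reaches a non-domain, then reports that type's typrelid.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for a few pg_type columns; not the real catalog API. */
typedef unsigned int ToyOid;
#define TOY_INVALID_OID ((ToyOid) 0)

typedef struct ToyType
{
    ToyOid      oid;            /* type OID */
    char        typtype;        /* 'b' base, 'c' composite, 'd' domain */
    ToyOid      typbasetype;    /* base type if domain, else invalid */
    ToyOid      typrelid;       /* relation if composite, else invalid */
} ToyType;

/* Made-up catalog contents, chosen only to exercise each case. */
static const ToyType toy_catalog[] = {
    {23, 'b', TOY_INVALID_OID, TOY_INVALID_OID},    /* scalar base type */
    {100, 'c', TOY_INVALID_OID, 5000},              /* composite type */
    {101, 'd', 100, TOY_INVALID_OID},               /* domain over composite */
    {102, 'd', 101, TOY_INVALID_OID},               /* domain over that domain */
    {103, 'd', 23, TOY_INVALID_OID},                /* domain over scalar */
};

/* Linear search standing in for SearchSysCache1(TYPEOID, ...). */
const ToyType *
toy_lookup(ToyOid type_id)
{
    size_t      n = sizeof(toy_catalog) / sizeof(toy_catalog[0]);

    for (size_t i = 0; i < n; i++)
    {
        if (toy_catalog[i].oid == type_id)
            return &toy_catalog[i];
    }
    return NULL;
}

/* Same control flow as the patch's loop: strip domains, then read typrelid. */
ToyOid
toy_type_or_domain_relid(ToyOid type_id)
{
    for (;;)
    {
        const ToyType *t = toy_lookup(type_id);

        assert(t != NULL);      /* the real code reports "cache lookup failed" */
        if (t->typtype != 'd')
            return t->typrelid; /* not a domain, so done looking */
        type_id = t->typbasetype;   /* it is a domain: examine its base type */
    }
}
```

With this toy table, a domain over a domain over a composite resolves to the composite's relation, while a domain over a scalar yields the invalid OID, which is the distinction ISCOMPLEX() relies on.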
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 84759b6..b1e70a0 100644
*** a/src/backend/utils/adt/ruleutils.c
--- b/src/backend/utils/adt/ruleutils.c
*************** get_name_for_var_field(Var *var, int fie
*** 6731,6747 ****

      /*
       * If it's a Var of type RECORD, we have to find what the Var refers to;
!      * if not, we can use get_expr_result_type. If that fails, we try
!      * lookup_rowtype_tupdesc, which will probably fail too, but will ereport
!      * an acceptable message.
       */
      if (!IsA(var, Var) ||
          var->vartype != RECORDOID)
      {
!         if (get_expr_result_type((Node *) var, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE)
!             tupleDesc = lookup_rowtype_tupdesc_copy(exprType((Node *) var),
!                                                     exprTypmod((Node *) var));
!         Assert(tupleDesc);
          /* Got the tupdesc, so we can extract the field name */
          Assert(fieldno >= 1 && fieldno <= tupleDesc->natts);
          return NameStr(TupleDescAttr(tupleDesc, fieldno - 1)->attname);
--- 6731,6742 ----

      /*
       * If it's a Var of type RECORD, we have to find what the Var refers to;
!      * if not, we can use get_expr_result_tupdesc().
       */
      if (!IsA(var, Var) ||
          var->vartype != RECORDOID)
      {
!         tupleDesc = get_expr_result_tupdesc((Node *) var, false);
          /* Got the tupdesc, so we can extract the field name */
          Assert(fieldno >= 1 && fieldno <= tupleDesc->natts);
          return NameStr(TupleDescAttr(tupleDesc, fieldno - 1)->attname);
*************** get_name_for_var_field(Var *var, int fie
*** 7044,7057 ****

      /*
       * We now have an expression we can't expand any more, so see if
!      * get_expr_result_type() can do anything with it.  If not, pass to
!      * lookup_rowtype_tupdesc() which will probably fail, but will give an
!      * appropriate error message while failing.
       */
!     if (get_expr_result_type(expr, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE)
!         tupleDesc = lookup_rowtype_tupdesc_copy(exprType(expr),
!                                                 exprTypmod(expr));
!     Assert(tupleDesc);
      /* Got the tupdesc, so we can extract the field name */
      Assert(fieldno >= 1 && fieldno <= tupleDesc->natts);
      return NameStr(TupleDescAttr(tupleDesc, fieldno - 1)->attname);
--- 7039,7047 ----

      /*
       * We now have an expression we can't expand any more, so see if
!      * get_expr_result_tupdesc() can do anything with it.
       */
!     tupleDesc = get_expr_result_tupdesc(expr, false);
      /* Got the tupdesc, so we can extract the field name */
      Assert(fieldno >= 1 && fieldno <= tupleDesc->natts);
      return NameStr(TupleDescAttr(tupleDesc, fieldno - 1)->attname);
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index b7a14dc..48961e3 100644
*** a/src/backend/utils/cache/lsyscache.c
--- b/src/backend/utils/cache/lsyscache.c
*************** get_typtype(Oid typid)
*** 2398,2409 ****
   * type_is_rowtype
   *
   *        Convenience function to determine whether a type OID represents
!  *        a "rowtype" type --- either RECORD or a named composite type.
   */
  bool
  type_is_rowtype(Oid typid)
  {
!     return (typid == RECORDOID || get_typtype(typid) == TYPTYPE_COMPOSITE);
  }

  /*
--- 2398,2423 ----
   * type_is_rowtype
   *
   *        Convenience function to determine whether a type OID represents
!  *        a "rowtype" type --- either RECORD or a named composite type
!  *        (including a domain over a named composite type).
   */
  bool
  type_is_rowtype(Oid typid)
  {
!     if (typid == RECORDOID)
!         return true;            /* easy case */
!     switch (get_typtype(typid))
!     {
!         case TYPTYPE_COMPOSITE:
!             return true;
!         case TYPTYPE_DOMAIN:
!             if (get_typtype(getBaseType(typid)) == TYPTYPE_COMPOSITE)
!                 return true;
!             break;
!         default:
!             break;
!     }
!     return false;
  }

  /*
diff --git a/src/backend/utils/cache/typcache.c b/src/backend/utils/cache/typcache.c
index 16c52c5..197b9e5 100644
*** a/src/backend/utils/cache/typcache.c
--- b/src/backend/utils/cache/typcache.c
*************** lookup_type_cache(Oid type_id, int flags
*** 717,723 ****
      /*
       * If requested, get information about a domain type
       */
!     if ((flags & TYPECACHE_DOMAIN_INFO) &&
          (typentry->flags & TCFLAGS_CHECKED_DOMAIN_CONSTRAINTS) == 0 &&
          typentry->typtype == TYPTYPE_DOMAIN)
      {
--- 717,731 ----
      /*
       * If requested, get information about a domain type
       */
!     if ((flags & TYPECACHE_DOMAIN_BASE_INFO) &&
!         typentry->typtype == TYPTYPE_DOMAIN &&
!         typentry->domainBaseType == InvalidOid)
!     {
!         typentry->domainBaseTypmod = -1;
!         typentry->domainBaseType =
!             getBaseTypeAndTypmod(type_id, &typentry->domainBaseTypmod);
!     }
!     if ((flags & TYPECACHE_DOMAIN_CONSTR_INFO) &&
          (typentry->flags & TCFLAGS_CHECKED_DOMAIN_CONSTRAINTS) == 0 &&
          typentry->typtype == TYPTYPE_DOMAIN)
      {
*************** InitDomainConstraintRef(Oid type_id, Dom
*** 1136,1142 ****
                          MemoryContext refctx, bool need_exprstate)
  {
      /* Look up the typcache entry --- we assume it survives indefinitely */
!     ref->tcache = lookup_type_cache(type_id, TYPECACHE_DOMAIN_INFO);
      ref->need_exprstate = need_exprstate;
      /* For safety, establish the callback before acquiring a refcount */
      ref->refctx = refctx;
--- 1144,1150 ----
                          MemoryContext refctx, bool need_exprstate)
  {
      /* Look up the typcache entry --- we assume it survives indefinitely */
!     ref->tcache = lookup_type_cache(type_id, TYPECACHE_DOMAIN_CONSTR_INFO);
      ref->need_exprstate = need_exprstate;
      /* For safety, establish the callback before acquiring a refcount */
      ref->refctx = refctx;
*************** DomainHasConstraints(Oid type_id)
*** 1227,1233 ****
       * Note: a side effect is to cause the typcache's domain data to become
       * valid.  This is fine since we'll likely need it soon if there is any.
       */
!     typentry = lookup_type_cache(type_id, TYPECACHE_DOMAIN_INFO);

      return (typentry->domainData != NULL);
  }
--- 1235,1241 ----
       * Note: a side effect is to cause the typcache's domain data to become
       * valid.  This is fine since we'll likely need it soon if there is any.
       */
!     typentry = lookup_type_cache(type_id, TYPECACHE_DOMAIN_CONSTR_INFO);

      return (typentry->domainData != NULL);
  }
*************** lookup_rowtype_tupdesc_copy(Oid type_id,
*** 1526,1531 ****
--- 1534,1586 ----
  }

  /*
+  * lookup_rowtype_tupdesc_domain
+  *
+  * Same as lookup_rowtype_tupdesc_noerror(), except that the type can also be
+  * a domain over a named composite type; so this is effectively equivalent to
+  * lookup_rowtype_tupdesc_noerror(getBaseType(type_id), typmod, noError)
+  * except for being a tad faster.
+  *
+  * Note: the reason we don't fold the look-through-domain behavior into plain
+  * lookup_rowtype_tupdesc() is that we want callers to know they might be
+  * dealing with a domain.  Otherwise they might construct a tuple that should
+  * be of the domain type, but not apply domain constraints.
+  */
+ TupleDesc
+ lookup_rowtype_tupdesc_domain(Oid type_id, int32 typmod, bool noError)
+ {
+     TupleDesc    tupDesc;
+
+     if (type_id != RECORDOID)
+     {
+         /*
+          * Check for domain or named composite type.  We might as well load
+          * whichever data is needed.
+          */
+         TypeCacheEntry *typentry;
+
+         typentry = lookup_type_cache(type_id,
+                                      TYPECACHE_TUPDESC |
+                                      TYPECACHE_DOMAIN_BASE_INFO);
+         if (typentry->typtype == TYPTYPE_DOMAIN)
+             return lookup_rowtype_tupdesc_noerror(typentry->domainBaseType,
+                                                   typentry->domainBaseTypmod,
+                                                   noError);
+         if (typentry->tupDesc == NULL && !noError)
+             ereport(ERROR,
+                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                      errmsg("type %s is not composite",
+                             format_type_be(type_id))));
+         tupDesc = typentry->tupDesc;
+     }
+     else
+         tupDesc = lookup_rowtype_tupdesc_internal(type_id, typmod, noError);
+     if (tupDesc != NULL)
+         PinTupleDesc(tupDesc);
+     return tupDesc;
+ }
+
+ /*
   * Hash function for the hash table of RecordCacheEntry.
   */
  static uint32
diff --git a/src/backend/utils/fmgr/funcapi.c b/src/backend/utils/fmgr/funcapi.c
index 9c3f451..5caa876 100644
*** a/src/backend/utils/fmgr/funcapi.c
--- b/src/backend/utils/fmgr/funcapi.c
*************** static TypeFuncClass internal_get_result
*** 39,45 ****
  static bool resolve_polymorphic_tupdesc(TupleDesc tupdesc,
                              oidvector *declared_args,
                              Node *call_expr);
! static TypeFuncClass get_type_func_class(Oid typid);


  /*
--- 39,45 ----
  static bool resolve_polymorphic_tupdesc(TupleDesc tupdesc,
                              oidvector *declared_args,
                              Node *call_expr);
! static TypeFuncClass get_type_func_class(Oid typid, Oid *base_typeid);


  /*
*************** get_expr_result_type(Node *expr,
*** 246,259 ****
      {
          /* handle as a generic expression; no chance to resolve RECORD */
          Oid            typid = exprType(expr);

          if (resultTypeId)
              *resultTypeId = typid;
          if (resultTupleDesc)
              *resultTupleDesc = NULL;
!         result = get_type_func_class(typid);
!         if (result == TYPEFUNC_COMPOSITE && resultTupleDesc)
!             *resultTupleDesc = lookup_rowtype_tupdesc_copy(typid, -1);
      }

      return result;
--- 246,262 ----
      {
          /* handle as a generic expression; no chance to resolve RECORD */
          Oid            typid = exprType(expr);
+         Oid            base_typid;

          if (resultTypeId)
              *resultTypeId = typid;
          if (resultTupleDesc)
              *resultTupleDesc = NULL;
!         result = get_type_func_class(typid, &base_typid);
!         if ((result == TYPEFUNC_COMPOSITE ||
!              result == TYPEFUNC_COMPOSITE_DOMAIN) &&
!             resultTupleDesc)
!             *resultTupleDesc = lookup_rowtype_tupdesc_copy(base_typid, -1);
      }

      return result;
*************** internal_get_result_type(Oid funcid,
*** 296,301 ****
--- 299,305 ----
      HeapTuple    tp;
      Form_pg_proc procform;
      Oid            rettype;
+     Oid            base_rettype;
      TupleDesc    tupdesc;

      /* First fetch the function's pg_proc row to inspect its rettype */
*************** internal_get_result_type(Oid funcid,
*** 363,374 ****
          *resultTupleDesc = NULL;    /* default result */

      /* Classify the result type */
!     result = get_type_func_class(rettype);
      switch (result)
      {
          case TYPEFUNC_COMPOSITE:
              if (resultTupleDesc)
!                 *resultTupleDesc = lookup_rowtype_tupdesc_copy(rettype, -1);
              /* Named composite types can't have any polymorphic columns */
              break;
          case TYPEFUNC_SCALAR:
--- 367,379 ----
          *resultTupleDesc = NULL;    /* default result */

      /* Classify the result type */
!     result = get_type_func_class(rettype, &base_rettype);
      switch (result)
      {
          case TYPEFUNC_COMPOSITE:
+         case TYPEFUNC_COMPOSITE_DOMAIN:
              if (resultTupleDesc)
!                 *resultTupleDesc = lookup_rowtype_tupdesc_copy(base_rettype, -1);
              /* Named composite types can't have any polymorphic columns */
              break;
          case TYPEFUNC_SCALAR:
*************** internal_get_result_type(Oid funcid,
*** 394,399 ****
--- 399,444 ----
  }

  /*
+  * get_expr_result_tupdesc
+  *        Get a tupdesc describing the result of a composite-valued expression
+  *
+  * If expression is not composite or rowtype can't be determined, returns NULL
+  * if noError is true, else throws error.
+  *
+  * This is a simpler version of get_expr_result_type() for use when the caller
+  * is only interested in determinate rowtype results.
+  */
+ TupleDesc
+ get_expr_result_tupdesc(Node *expr, bool noError)
+ {
+     TupleDesc    tupleDesc;
+     TypeFuncClass functypclass;
+
+     functypclass = get_expr_result_type(expr, NULL, &tupleDesc);
+
+     if (functypclass == TYPEFUNC_COMPOSITE ||
+         functypclass == TYPEFUNC_COMPOSITE_DOMAIN)
+         return tupleDesc;
+
+     if (!noError)
+     {
+         Oid            exprTypeId = exprType(expr);
+
+         if (exprTypeId != RECORDOID)
+             ereport(ERROR,
+                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                      errmsg("type %s is not composite",
+                             format_type_be(exprTypeId))));
+         else
+             ereport(ERROR,
+                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                      errmsg("record type has not been registered")));
+     }
+
+     return NULL;
+ }
+
+ /*
   * Given the result tuple descriptor for a function with OUT parameters,
   * replace any polymorphic columns (ANYELEMENT etc) with correct data types
   * deduced from the input arguments. Returns TRUE if able to deduce all types,
*************** resolve_polymorphic_argtypes(int numargs
*** 741,763 ****
  /*
   * get_type_func_class
   *        Given the type OID, obtain its TYPEFUNC classification.
   *
   * This is intended to centralize a bunch of formerly ad-hoc code for
   * classifying types.  The categories used here are useful for deciding
   * how to handle functions returning the datatype.
   */
  static TypeFuncClass
! get_type_func_class(Oid typid)
  {
      switch (get_typtype(typid))
      {
          case TYPTYPE_COMPOSITE:
              return TYPEFUNC_COMPOSITE;
          case TYPTYPE_BASE:
-         case TYPTYPE_DOMAIN:
          case TYPTYPE_ENUM:
          case TYPTYPE_RANGE:
              return TYPEFUNC_SCALAR;
          case TYPTYPE_PSEUDO:
              if (typid == RECORDOID)
                  return TYPEFUNC_RECORD;
--- 786,816 ----
  /*
   * get_type_func_class
   *        Given the type OID, obtain its TYPEFUNC classification.
+  *        Also, if it's a domain, return the base type OID.
   *
   * This is intended to centralize a bunch of formerly ad-hoc code for
   * classifying types.  The categories used here are useful for deciding
   * how to handle functions returning the datatype.
   */
  static TypeFuncClass
! get_type_func_class(Oid typid, Oid *base_typeid)
  {
+     *base_typeid = typid;
+
      switch (get_typtype(typid))
      {
          case TYPTYPE_COMPOSITE:
              return TYPEFUNC_COMPOSITE;
          case TYPTYPE_BASE:
          case TYPTYPE_ENUM:
          case TYPTYPE_RANGE:
              return TYPEFUNC_SCALAR;
+         case TYPTYPE_DOMAIN:
+             *base_typeid = typid = getBaseType(typid);
+             if (get_typtype(typid) == TYPTYPE_COMPOSITE)
+                 return TYPEFUNC_COMPOSITE_DOMAIN;
+             else                /* domain base type can't be a pseudotype */
+                 return TYPEFUNC_SCALAR;
          case TYPTYPE_PSEUDO:
              if (typid == RECORDOID)
                  return TYPEFUNC_RECORD;
*************** RelationNameGetTupleDesc(const char *rel
*** 1320,1335 ****
  TupleDesc
  TypeGetTupleDesc(Oid typeoid, List *colaliases)
  {
!     TypeFuncClass functypclass = get_type_func_class(typeoid);
      TupleDesc    tupdesc = NULL;

      /*
       * Build a suitable tupledesc representing the output rows
       */
!     if (functypclass == TYPEFUNC_COMPOSITE)
      {
          /* Composite data type, e.g. a table's row type */
!         tupdesc = lookup_rowtype_tupdesc_copy(typeoid, -1);

          if (colaliases != NIL)
          {
--- 1373,1390 ----
  TupleDesc
  TypeGetTupleDesc(Oid typeoid, List *colaliases)
  {
!     Oid            base_typeoid;
!     TypeFuncClass functypclass = get_type_func_class(typeoid, &base_typeoid);
      TupleDesc    tupdesc = NULL;

      /*
       * Build a suitable tupledesc representing the output rows
       */
!     if (functypclass == TYPEFUNC_COMPOSITE ||
!         functypclass == TYPEFUNC_COMPOSITE_DOMAIN)
      {
          /* Composite data type, e.g. a table's row type */
!         tupdesc = lookup_rowtype_tupdesc_copy(base_typeoid, -1);

          if (colaliases != NIL)
          {
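
A simplified model of the revised classification may help when reading the funcapi.c changes above. It is a sketch under stated assumptions, not the backend code: the Demo* names and the catalog table are invented, and pseudotype handling is reduced to "RECORD or other". What it tries to show is the new contract of get_type_func_class(): the caller gets both a class and the base type OID, and a domain is classified by what its base type turns out to be.

```c
#include <assert.h>
#include <stddef.h>

typedef unsigned int DemoOid;
#define DEMO_RECORDOID 2249u    /* OID of the "record" pseudotype */

typedef enum DemoFuncClass
{
    DEMO_SCALAR,
    DEMO_COMPOSITE,
    DEMO_COMPOSITE_DOMAIN,      /* the class added by the patch */
    DEMO_RECORD,
    DEMO_OTHER
} DemoFuncClass;

typedef struct DemoType
{
    DemoOid     oid;
    char        typtype;        /* 'b', 'c', 'd' or 'p' in this sketch */
    DemoOid     basetype;       /* only meaningful for domains */
} DemoType;

/* Invented catalog rows, one per interesting case. */
static const DemoType demo_catalog[] = {
    {23, 'b', 0},               /* scalar */
    {DEMO_RECORDOID, 'p', 0},   /* record */
    {100, 'c', 0},              /* composite */
    {101, 'd', 100},            /* domain over composite */
    {103, 'd', 23},             /* domain over scalar */
    {104, 'd', 101},            /* domain over domain over composite */
};

const DemoType *
demo_lookup(DemoOid typid)
{
    size_t      n = sizeof(demo_catalog) / sizeof(demo_catalog[0]);

    for (size_t i = 0; i < n; i++)
    {
        if (demo_catalog[i].oid == typid)
            return &demo_catalog[i];
    }
    return NULL;
}

/* Plays the role of getBaseType(): strip every domain layer. */
DemoOid
demo_base_type(DemoOid typid)
{
    const DemoType *t = demo_lookup(typid);

    assert(t != NULL);
    while (t->typtype == 'd')
    {
        typid = t->basetype;
        t = demo_lookup(typid);
        assert(t != NULL);
    }
    return typid;
}

/* Classify typid and report its base type through *base_typeid. */
DemoFuncClass
demo_classify(DemoOid typid, DemoOid *base_typeid)
{
    const DemoType *t = demo_lookup(typid);

    assert(t != NULL);
    *base_typeid = typid;       /* default: the type is its own base */

    switch (t->typtype)
    {
        case 'c':
            return DEMO_COMPOSITE;
        case 'b':
            return DEMO_SCALAR;
        case 'd':
            *base_typeid = typid = demo_base_type(typid);
            if (demo_lookup(typid)->typtype == 'c')
                return DEMO_COMPOSITE_DOMAIN;
            return DEMO_SCALAR;
        case 'p':
            return (typid == DEMO_RECORDOID) ? DEMO_RECORD : DEMO_OTHER;
        default:
            return DEMO_OTHER;
    }
}
```

Callers that only want a tuple descriptor can treat the two composite classes alike and look up the descriptor of the reported base type, which is what the patched call sites do. Callers that build values still need to know about the domain so that constraints get applied.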
diff --git a/src/include/access/htup_details.h b/src/include/access/htup_details.h
index fa04a63..b0d4c54 100644
*** a/src/include/access/htup_details.h
--- b/src/include/access/htup_details.h
*************** typedef struct DatumTupleFields
*** 134,139 ****
--- 134,144 ----
      Oid            datum_typeid;    /* composite type OID, or RECORDOID */

      /*
+      * datum_typeid cannot be a domain over composite, only plain composite,
+      * even if the datum is meant as a value of a domain-over-composite type.
+      * This is in line with the general principle that CoerceToDomain does not
+      * change the physical representation of the base type value.
+      *
       * Note: field ordering is chosen with thought that Oid might someday
       * widen to 64 bits.
       */
diff --git a/src/include/access/tupdesc.h b/src/include/access/tupdesc.h
index c15610e..2be5af1 100644
*** a/src/include/access/tupdesc.h
--- b/src/include/access/tupdesc.h
*************** typedef struct tupleConstr
*** 60,65 ****
--- 60,71 ----
   * row type, or a value >= 0 to allow the rowtype to be looked up in the
   * typcache.c type cache.
   *
+  * Note that tdtypeid is never the OID of a domain over composite, even if
+  * we are dealing with values that are known (at some higher level) to be of
+  * a domain-over-composite type.  This is because tdtypeid/tdtypmod need to
+  * match up with the type labeling of composite Datums, and those are never
+  * explicitly marked as being of a domain type, either.
+  *
   * Tuple descriptors that live in caches (relcache or typcache, at present)
   * are reference-counted: they can be deleted when their reference count goes
   * to zero.  Tuple descriptors created by the executor need no reference
diff --git a/src/include/funcapi.h b/src/include/funcapi.h
index 951af2a..7c52de9 100644
*** a/src/include/funcapi.h
--- b/src/include/funcapi.h
*************** typedef struct FuncCallContext
*** 143,148 ****
--- 143,152 ----
   *        get_call_result_type.  Note: the cases in which rowtypes cannot be
   *        determined are different from the cases for get_call_result_type.
   *        Do *not* use this if you can use one of the others.
+  *
+  * See also get_expr_result_tupdesc(), which is a convenient wrapper around
+  * get_expr_result_type() for use when the caller only cares about
+  * determinable-rowtype cases.
   *----------
   */

*************** typedef enum TypeFuncClass
*** 151,156 ****
--- 155,161 ----
  {
      TYPEFUNC_SCALAR,            /* scalar result type */
      TYPEFUNC_COMPOSITE,            /* determinable rowtype result */
+     TYPEFUNC_COMPOSITE_DOMAIN,    /* domain over determinable rowtype result */
      TYPEFUNC_RECORD,            /* indeterminate rowtype result */
      TYPEFUNC_OTHER                /* bogus type, eg pseudotype */
  } TypeFuncClass;
*************** extern TypeFuncClass get_func_result_typ
*** 165,170 ****
--- 170,177 ----
                       Oid *resultTypeId,
                       TupleDesc *resultTupleDesc);

+ extern TupleDesc get_expr_result_tupdesc(Node *expr, bool noError);
+
  extern bool resolve_polymorphic_argtypes(int numargs, Oid *argtypes,
                               char *argmodes,
                               Node *call_expr);
diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h
index ccb5123..c2929ac 100644
*** a/src/include/nodes/primnodes.h
--- b/src/include/nodes/primnodes.h
*************** typedef struct Var
*** 166,172 ****
      Index        varno;            /* index of this var's relation in the range
                                   * table, or INNER_VAR/OUTER_VAR/INDEX_VAR */
      AttrNumber    varattno;        /* attribute number of this var, or zero for
!                                  * all */
      Oid            vartype;        /* pg_type OID for the type of this var */
      int32        vartypmod;        /* pg_attribute typmod value */
      Oid            varcollid;        /* OID of collation, or InvalidOid if none */
--- 166,172 ----
      Index        varno;            /* index of this var's relation in the range
                                   * table, or INNER_VAR/OUTER_VAR/INDEX_VAR */
      AttrNumber    varattno;        /* attribute number of this var, or zero for
!                                  * all attrs ("whole-row Var") */
      Oid            vartype;        /* pg_type OID for the type of this var */
      int32        vartypmod;        /* pg_attribute typmod value */
      Oid            varcollid;        /* OID of collation, or InvalidOid if none */
*************** typedef struct FieldSelect
*** 755,760 ****
--- 755,763 ----
   * the assign case of ArrayRef, this is used to implement UPDATE of a
   * portion of a column.
   *
+  * resulttype is always a named composite type (not a domain).  To update
+  * a composite domain value, apply CoerceToDomain to the FieldStore.
+  *
   * A single FieldStore can actually represent updates of several different
   * fields.  The parser only generates FieldStores with single-element lists,
   * but the planner will collapse multiple updates of the same base column
*************** typedef struct ArrayCoerceExpr
*** 849,855 ****
   * needed for the destination type plus possibly others; the columns need not
   * be in the same positions, but are matched up by name.  This is primarily
   * used to convert a whole-row value of an inheritance child table into a
!  * valid whole-row value of its parent table's rowtype.
   * ----------------
   */

--- 852,859 ----
   * needed for the destination type plus possibly others; the columns need not
   * be in the same positions, but are matched up by name.  This is primarily
   * used to convert a whole-row value of an inheritance child table into a
!  * valid whole-row value of its parent table's rowtype.  Both resulttype
!  * and the exposed type of "arg" must be named composite types (not domains).
   * ----------------
   */

*************** typedef struct RowExpr
*** 987,992 ****
--- 991,999 ----
      Oid            row_typeid;        /* RECORDOID or a composite type's ID */

      /*
+      * row_typeid cannot be a domain over composite, only plain composite.  To
+      * create a composite domain value, apply CoerceToDomain to the RowExpr.
+      *
       * Note: we deliberately do NOT store a typmod.  Although a typmod will be
       * associated with specific RECORD types at runtime, it will differ for
       * different backends, and so cannot safely be stored in stored
diff --git a/src/include/parser/parse_type.h b/src/include/parser/parse_type.h
index 7b843d0..af1e314 100644
*** a/src/include/parser/parse_type.h
--- b/src/include/parser/parse_type.h
*************** extern Oid    typeTypeCollation(Type typ);
*** 46,55 ****
  extern Datum stringTypeDatum(Type tp, char *string, int32 atttypmod);

  extern Oid    typeidTypeRelid(Oid type_id);

  extern TypeName *typeStringToTypeName(const char *str);
  extern void parseTypeString(const char *str, Oid *typeid_p, int32 *typmod_p, bool missing_ok);

! #define ISCOMPLEX(typeid) (typeidTypeRelid(typeid) != InvalidOid)

  #endif                            /* PARSE_TYPE_H */
--- 46,57 ----
  extern Datum stringTypeDatum(Type tp, char *string, int32 atttypmod);

  extern Oid    typeidTypeRelid(Oid type_id);
+ extern Oid    typeOrDomainTypeRelid(Oid type_id);

  extern TypeName *typeStringToTypeName(const char *str);
  extern void parseTypeString(const char *str, Oid *typeid_p, int32 *typmod_p, bool missing_ok);

! /* true if typeid is composite, or domain over composite, but not RECORD */
! #define ISCOMPLEX(typeid) (typeOrDomainTypeRelid(typeid) != InvalidOid)

  #endif                            /* PARSE_TYPE_H */
diff --git a/src/include/utils/typcache.h b/src/include/utils/typcache.h
index 41b645a..ea799a8 100644
*** a/src/include/utils/typcache.h
--- b/src/include/utils/typcache.h
*************** typedef struct TypeCacheEntry
*** 92,97 ****
--- 92,104 ----
      FmgrInfo    rng_subdiff_finfo;    /* difference function, if any */

      /*
+      * Domain's base type and typmod if it's a domain type.  Zeroes if not
+      * domain, or if information hasn't been requested.
+      */
+     Oid            domainBaseType;
+     int32        domainBaseTypmod;
+
+     /*
       * Domain constraint data if it's a domain type.  NULL if not domain, or
       * if domain has no constraints, or if information hasn't been requested.
       */
*************** typedef struct TypeCacheEntry
*** 123,131 ****
  #define TYPECACHE_BTREE_OPFAMILY    0x0200
  #define TYPECACHE_HASH_OPFAMILY        0x0400
  #define TYPECACHE_RANGE_INFO        0x0800
! #define TYPECACHE_DOMAIN_INFO        0x1000
! #define TYPECACHE_HASH_EXTENDED_PROC        0x2000
! #define TYPECACHE_HASH_EXTENDED_PROC_FINFO    0x4000

  /*
   * Callers wishing to maintain a long-lived reference to a domain's constraint
--- 130,139 ----
  #define TYPECACHE_BTREE_OPFAMILY    0x0200
  #define TYPECACHE_HASH_OPFAMILY        0x0400
  #define TYPECACHE_RANGE_INFO        0x0800
! #define TYPECACHE_DOMAIN_BASE_INFO            0x1000
! #define TYPECACHE_DOMAIN_CONSTR_INFO        0x2000
! #define TYPECACHE_HASH_EXTENDED_PROC        0x4000
! #define TYPECACHE_HASH_EXTENDED_PROC_FINFO    0x8000

  /*
   * Callers wishing to maintain a long-lived reference to a domain's constraint
*************** extern TupleDesc lookup_rowtype_tupdesc_
*** 163,168 ****
--- 171,179 ----

  extern TupleDesc lookup_rowtype_tupdesc_copy(Oid type_id, int32 typmod);

+ extern TupleDesc lookup_rowtype_tupdesc_domain(Oid type_id, int32 typmod,
+                               bool noError);
+
  extern void assign_record_type_typmod(TupleDesc tupDesc);

  extern int    compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2);
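
To make the effect of splitting TYPECACHE_DOMAIN_INFO into two request flags concrete, here is a toy model of the lazy-fill logic. It is only a sketch: the Sketch* names, the load counters, and the hard-wired base type are assumptions for illustration, and nothing here touches the real typcache. It shows that a caller asking only for base-type information does not trigger the (more expensive) constraint load, and that repeated requests do not reload anything.

```c
#include <assert.h>
#include <stdbool.h>

/* Flag values mirror the patched header, but the names are local to this sketch. */
#define SKETCH_DOMAIN_BASE_INFO     0x1000
#define SKETCH_DOMAIN_CONSTR_INFO   0x2000

typedef struct SketchEntry
{
    bool        is_domain;
    unsigned int base_type;         /* 0 until base info has been requested */
    int         base_typmod;
    bool        constraints_checked;
    int         base_loads;         /* counters, only to observe laziness */
    int         constr_loads;
} SketchEntry;

/* Fill in whichever pieces of domain information the flags ask for. */
void
sketch_lookup(SketchEntry *entry, int flags)
{
    if ((flags & SKETCH_DOMAIN_BASE_INFO) &&
        entry->is_domain &&
        entry->base_type == 0)
    {
        entry->base_typmod = -1;
        entry->base_type = 100;     /* pretend result of the catalog walk */
        entry->base_loads++;
    }
    if ((flags & SKETCH_DOMAIN_CONSTR_INFO) &&
        !entry->constraints_checked &&
        entry->is_domain)
    {
        entry->constraints_checked = true;  /* pretend constraints were loaded */
        entry->constr_loads++;
    }
}
```

A typical use would be to call sketch_lookup() with SKETCH_DOMAIN_BASE_INFO alone on the tuple-descriptor path, and with SKETCH_DOMAIN_CONSTR_INFO only where constraints are about to be checked.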
diff --git a/src/test/regress/expected/domain.out b/src/test/regress/expected/domain.out
index 1e62c57..f7f3948 100644
*** a/src/test/regress/expected/domain.out
--- b/src/test/regress/expected/domain.out
*************** select pg_typeof('{1,2,3}'::dia || 42);
*** 198,203 ****
--- 198,291 ----
  (1 row)

  drop domain dia;
+ -- Test domains over composites
+ create type comptype as (r float8, i float8);
+ create domain dcomptype as comptype;
+ create table dcomptable (d1 dcomptype unique);
+ insert into dcomptable values (row(1,2)::dcomptype);
+ insert into dcomptable values (row(3,4)::comptype);
+ insert into dcomptable values (row(1,2)::dcomptype);  -- fail on uniqueness
+ ERROR:  duplicate key value violates unique constraint "dcomptable_d1_key"
+ DETAIL:  Key (d1)=((1,2)) already exists.
+ insert into dcomptable (d1.r) values(11);
+ select * from dcomptable;
+   d1
+ -------
+  (1,2)
+  (3,4)
+  (11,)
+ (3 rows)
+
+ select (d1).r, (d1).i, (d1).* from dcomptable;
+  r  | i | r  | i
+ ----+---+----+---
+   1 | 2 |  1 | 2
+   3 | 4 |  3 | 4
+  11 |   | 11 |
+ (3 rows)
+
+ update dcomptable set d1.r = (d1).r + 1 where (d1).i > 0;
+ select * from dcomptable;
+   d1
+ -------
+  (11,)
+  (2,2)
+  (4,4)
+ (3 rows)
+
+ alter domain dcomptype add constraint c1 check ((value).r <= (value).i);
+ alter domain dcomptype add constraint c2 check ((value).r > (value).i);  -- fail
+ ERROR:  column "d1" of table "dcomptable" contains values that violate the new constraint
+ select row(2,1)::dcomptype;  -- fail
+ ERROR:  value for domain dcomptype violates check constraint "c1"
+ insert into dcomptable values (row(1,2)::comptype);
+ insert into dcomptable values (row(2,1)::comptype);  -- fail
+ ERROR:  value for domain dcomptype violates check constraint "c1"
+ insert into dcomptable (d1.r) values(99);
+ insert into dcomptable (d1.r, d1.i) values(99, 100);
+ insert into dcomptable (d1.r, d1.i) values(100, 99);  -- fail
+ ERROR:  value for domain dcomptype violates check constraint "c1"
+ update dcomptable set d1.r = (d1).r + 1 where (d1).i > 0;  -- fail
+ ERROR:  value for domain dcomptype violates check constraint "c1"
+ update dcomptable set d1.r = (d1).r - 1, d1.i = (d1).i + 1 where (d1).i > 0;
+ select * from dcomptable;
+     d1
+ ----------
+  (11,)
+  (99,)
+  (1,3)
+  (3,5)
+  (0,3)
+  (98,101)
+ (6 rows)
+
+ explain (verbose, costs off)
+   update dcomptable set d1.r = (d1).r - 1, d1.i = (d1).i + 1 where (d1).i > 0;
+                                           QUERY PLAN
+ -----------------------------------------------------------------------------------------------
+  Update on public.dcomptable
+    ->  Seq Scan on public.dcomptable
+          Output: ROW(((d1).r - '1'::double precision), ((d1).i + '1'::double precision)), ctid
+          Filter: ((dcomptable.d1).i > '0'::double precision)
+ (4 rows)
+
+ create rule silly as on delete to dcomptable do instead
+   update dcomptable set d1.r = (d1).r - 1, d1.i = (d1).i + 1 where (d1).i > 0;
+ \d+ dcomptable
+                                   Table "public.dcomptable"
+  Column |   Type    | Collation | Nullable | Default | Storage  | Stats target | Description
+ --------+-----------+-----------+----------+---------+----------+--------------+-------------
+  d1     | dcomptype |           |          |         | extended |              |
+ Indexes:
+     "dcomptable_d1_key" UNIQUE CONSTRAINT, btree (d1)
+ Rules:
+     silly AS
+     ON DELETE TO dcomptable DO INSTEAD  UPDATE dcomptable SET d1.r = (dcomptable.d1).r - 1::double precision, d1.i = (dcomptable.d1).i + 1::double precision
+   WHERE (dcomptable.d1).i > 0::double precision
+
+ drop table dcomptable;
+ drop type comptype cascade;
+ NOTICE:  drop cascades to type dcomptype
  -- Test domains over arrays of composite
  create type comptype as (r float8, i float8);
  create domain dcomptypea as comptype[];
*************** insert into ddtest2 values('{(-1)}');
*** 762,767 ****
--- 850,863 ----
  alter domain posint add constraint c1 check(value >= 0);
  ERROR:  cannot alter type "posint" because column "ddtest2.f1" uses it
  drop table ddtest2;
+ -- Likewise for domains within domains over composite
+ create domain ddtest1d as ddtest1;
+ create table ddtest2(f1 ddtest1d);
+ insert into ddtest2 values('(-1)');
+ alter domain posint add constraint c1 check(value >= 0);
+ ERROR:  cannot alter type "posint" because column "ddtest2.f1" uses it
+ drop table ddtest2;
+ drop domain ddtest1d;
  -- Likewise for domains within domains over array of composite
  create domain ddtest1d as ddtest1[];
  create table ddtest2(f1 ddtest1d);
diff --git a/src/test/regress/sql/domain.sql b/src/test/regress/sql/domain.sql
index 8fb3e20..5201f00 100644
*** a/src/test/regress/sql/domain.sql
--- b/src/test/regress/sql/domain.sql
*************** select pg_typeof('{1,2,3}'::dia || 42);
*** 120,125 ****
--- 120,164 ----
  drop domain dia;


+ -- Test domains over composites
+
+ create type comptype as (r float8, i float8);
+ create domain dcomptype as comptype;
+ create table dcomptable (d1 dcomptype unique);
+
+ insert into dcomptable values (row(1,2)::dcomptype);
+ insert into dcomptable values (row(3,4)::comptype);
+ insert into dcomptable values (row(1,2)::dcomptype);  -- fail on uniqueness
+ insert into dcomptable (d1.r) values(11);
+
+ select * from dcomptable;
+ select (d1).r, (d1).i, (d1).* from dcomptable;
+ update dcomptable set d1.r = (d1).r + 1 where (d1).i > 0;
+ select * from dcomptable;
+
+ alter domain dcomptype add constraint c1 check ((value).r <= (value).i);
+ alter domain dcomptype add constraint c2 check ((value).r > (value).i);  -- fail
+
+ select row(2,1)::dcomptype;  -- fail
+ insert into dcomptable values (row(1,2)::comptype);
+ insert into dcomptable values (row(2,1)::comptype);  -- fail
+ insert into dcomptable (d1.r) values(99);
+ insert into dcomptable (d1.r, d1.i) values(99, 100);
+ insert into dcomptable (d1.r, d1.i) values(100, 99);  -- fail
+ update dcomptable set d1.r = (d1).r + 1 where (d1).i > 0;  -- fail
+ update dcomptable set d1.r = (d1).r - 1, d1.i = (d1).i + 1 where (d1).i > 0;
+ select * from dcomptable;
+
+ explain (verbose, costs off)
+   update dcomptable set d1.r = (d1).r - 1, d1.i = (d1).i + 1 where (d1).i > 0;
+ create rule silly as on delete to dcomptable do instead
+   update dcomptable set d1.r = (d1).r - 1, d1.i = (d1).i + 1 where (d1).i > 0;
+ \d+ dcomptable
+
+ drop table dcomptable;
+ drop type comptype cascade;
+
+
  -- Test domains over arrays of composite

  create type comptype as (r float8, i float8);
*************** insert into ddtest2 values('{(-1)}');
*** 500,505 ****
--- 539,552 ----
  alter domain posint add constraint c1 check(value >= 0);
  drop table ddtest2;

+ -- Likewise for domains within domains over composite
+ create domain ddtest1d as ddtest1;
+ create table ddtest2(f1 ddtest1d);
+ insert into ddtest2 values('(-1)');
+ alter domain posint add constraint c1 check(value >= 0);
+ drop table ddtest2;
+ drop domain ddtest1d;
+
  -- Likewise for domains within domains over array of composite
  create domain ddtest1d as ddtest1[];
  create table ddtest2(f1 ddtest1d);

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
