Re: Performance Improvement by reducing WAL for Update Operation

Поиск
Список
Период
Сортировка
От Kyotaro HORIGUCHI
Тема Re: Performance Improvement by reducing WAL for Update Operation
Дата
Msg-id 20121228.170748.90887322.horiguchi.kyotaro@lab.ntt.co.jp
обсуждение исходный текст
Ответ на Re: Performance Improvement by reducing WAL for Update Operation  (Amit Kapila <amit.kapila@huawei.com>)
Ответы Re: Performance Improvement by reducing WAL for Update Operation
Re: Performance Improvement by reducing WAL for Update Operation
Список pgsql-hackers
Hello, I saw this patch and confirmed that
- Coding style looks good.- Appliable onto HEAD.- Some mis-codings are fixed.

And took the performance figures for 4 types of modification
versus 2 benchmarks.

I've see small performace gain (4-8% for execution, and 6-12% for
recovery) and 16% WAL shrink for modified pgbench enhances the
benefit of this patch.

On the other hand I've found no significant loss of performance
for execution and 4% reduction of WAL for original pgbench, but
there might be 4-8% performance loss for recovery.

Attached patches are listed below.

wal_update_changes_lz_v5.patch
 Rather straight implement of wal compression using existing pg_lz compress format.

wal_update_changes_mod_lz_v6_2.patch
 Modify pg_lz to have bulk literal segment format which is available only for WAL compression. Misplaced comment
fixed.



The detail of performance follows.
=====
I've tested involving the mod patch and 'modified' mod
patch.

CentOS6.3/Core i7
wal_level = archive, checkpoint_segments = 30 / 5min

wal_update_changes_mod_lz_v6+ is the version in which memcpy for
segment shorter than 16 bytes to be copied by while(*s)
*d++=*s++.

     postgres                    pgbench
A.    HEAD                        Original
B. wal_update_changes_lz_v5       Original
C. wal_update_changes_mod_lz_v6   Original
D. wal_update_changes_mod_lz_v6+  Original
E.    HEAD                        attached with this patch
F. wal_update_changes_lz_v5       attached with this patch
G. wal_update_changes_mod_lz_v6   attached with this patch
H. wal_update_changes_mod_lz_v6+  attached with this patch

Running doing pgbench -s 10 -i, pgbench -c 10 -j 10 -T 1200
  #trans/s                    WAL MB  WAL kB/tran
1A      346                      760     1.87
1B      347                      730     1.80   (96% of A)
1C      346                      729     1.80   (96% of A)
1D      347                      730     1.80   (96% of A)

1E      192                     2790     6.20
1F      200 (4% faster than E)  2431     5.19   (84% of D)
1G      207 (8% faster than E)  2563     5.28   (85% of D)
1H      199 (4% faster than E)  2421     5.19   (84% of D)

Recovery time
   Recv sec   us/trans
2A      26       62.6
2B      27       64.8  (4% slower than A)
2C      28       67.4  (8% slower than A)
2D      26       62.4  (same as A)

2E     130      629
2F     149      579    ( 8% faster than E)
2G     128      592    ( 6% faster than E)
2H     130      553    (12% faster than E)

For vanilla pgbench, WAL size shrinks slightly and performance
seems same as unpatched postgres(1A vs. 1B, 1C, 1D). For modified
pgbench, WAL size shrinks by about 17% and performance seems to
have a gain by several percent.

Recovery performance looks to have the same tendency. It looks to
produce very small loss outside of the effective range (2A
vs. 2B, 2C) and significant gain within (2E vs. 2F, 2G, 2H).

As a whole, this patch brings very large gain in its effective
range - e.g. updates of relatively small portions in a tuple, but
negligible loss of performance is observed outside of its
effective range on the test machine. I suppose the losses will be
emphasized by the more higher performance of seq write of WAL
devices


regards,

-- 
Kyotaro Horiguchi
NTT Open Source Software Center
*** a/src/backend/access/common/heaptuple.c
--- b/src/backend/access/common/heaptuple.c
***************
*** 60,65 ****
--- 60,66 ---- #include "access/sysattr.h" #include "access/tuptoaster.h" #include "executor/tuptable.h"
+ #include "utils/datum.h"   /* Does att's datatype allow packing into the 1-byte-header varlena format? */
***************
*** 297,308 **** heap_attisnull(HeapTuple tup, int attnum) }  /* ----------------
!  *        nocachegetattr  *
!  *        This only gets called from fastgetattr() macro, in cases where  *        we can't use a cacheoffset and the
valueis not null.  *
 
!  *        This caches attribute offsets in the attribute descriptor.  *  *        An alternative way to speed things
upwould be to cache offsets  *        with the tuple, but that seems more difficult unless you take
 
--- 298,310 ---- }  /* ----------------
!  *        nocachegetattr_with_len  *
!  *        This only gets called in cases where  *        we can't use a cacheoffset and the value is not null.  *
!  *        This caches attribute offsets in the attribute descriptor and
!  *        outputs the length of the attribute value.  *  *        An alternative way to speed things up would be to
cacheoffsets  *        with the tuple, but that seems more difficult unless you take
 
***************
*** 320,328 **** heap_attisnull(HeapTuple tup, int attnum)  * ----------------  */ Datum
! nocachegetattr(HeapTuple tuple,
!                int attnum,
!                TupleDesc tupleDesc) {     HeapTupleHeader tup = tuple->t_data;     Form_pg_attribute *att =
tupleDesc->attrs;
--- 322,331 ----  * ----------------  */ Datum
! nocachegetattr_with_len(HeapTuple tuple,
!                         int attnum,
!                         TupleDesc tupleDesc,
!                         Size *len) {     HeapTupleHeader tup = tuple->t_data;     Form_pg_attribute *att =
tupleDesc->attrs;
***************
*** 381,386 **** nocachegetattr(HeapTuple tuple,
--- 384,392 ----          */         if (att[attnum]->attcacheoff >= 0)         {
+             if (len)
+                 *len = att_getlength(att[attnum]->attlen,
+                                      tp + att[attnum]->attcacheoff);             return fetchatt(att[attnum],
                   tp + att[attnum]->attcacheoff);         }
 
***************
*** 507,515 **** nocachegetattr(HeapTuple tuple,
--- 513,534 ----         }     } 
+     if (len)
+         *len = att_getlength(att[attnum]->attlen, tp + off);     return fetchatt(att[attnum], tp + off); } 
+ /*
+  *    nocachegetattr
+  */
+ Datum
+ nocachegetattr(HeapTuple tuple,
+                int attnum,
+                TupleDesc tupleDesc)
+ {
+     return nocachegetattr_with_len(tuple, attnum, tupleDesc, NULL);
+ }
+  /* ----------------  *        heap_getsysattr  *
***************
*** 618,623 **** heap_copytuple_with_tuple(HeapTuple src, HeapTuple dest)
--- 637,1015 ---- }  /*
+  * Check if the specified attribute's value is same in both given tuples.
+  * and outputs the length of the given attribute in both tuples.
+  */
+ bool
+ heap_attr_get_length_and_check_equals(TupleDesc tupdesc, int attrnum,
+                                       HeapTuple tup1, HeapTuple tup2,
+                                     Size *tup1_attr_len, Size *tup2_attr_len)
+ {
+     Datum        value1,
+                 value2;
+     bool        isnull1,
+                 isnull2;
+     Form_pg_attribute att;
+ 
+     *tup1_attr_len = 0;
+     *tup2_attr_len = 0;
+ 
+     /*
+      * If it's a whole-tuple reference, say "not equal".  It's not really
+      * worth supporting this case, since it could only succeed after a no-op
+      * update, which is hardly a case worth optimizing for.
+      */
+     if (attrnum == 0)
+         return false;
+ 
+     /*
+      * Likewise, automatically say "not equal" for any system attribute other
+      * than OID and tableOID; we cannot expect these to be consistent in a HOT
+      * chain, or even to be set correctly yet in the new tuple.
+      */
+     if (attrnum < 0)
+     {
+         if (attrnum != ObjectIdAttributeNumber &&
+             attrnum != TableOidAttributeNumber)
+             return false;
+     }
+ 
+     /*
+      * Extract the corresponding values.  XXX this is pretty inefficient if
+      * there are many indexed columns.    Should HeapSatisfiesHOTUpdate do a
+      * single heap_deform_tuple call on each tuple, instead?  But that doesn't
+      * work for system columns ...
+      */
+     value1 = heap_getattr_with_len(tup1, attrnum, tupdesc, &isnull1, tup1_attr_len);
+     value2 = heap_getattr_with_len(tup2, attrnum, tupdesc, &isnull2, tup2_attr_len);
+ 
+     /*
+      * If one value is NULL and other is not, then they are certainly not
+      * equal
+      */
+     if (isnull1 != isnull2)
+         return false;
+ 
+     /*
+      * If both are NULL, they can be considered equal.
+      */
+     if (isnull1)
+         return true;
+ 
+     /*
+      * We do simple binary comparison of the two datums.  This may be overly
+      * strict because there can be multiple binary representations for the
+      * same logical value.    But we should be OK as long as there are no false
+      * positives.  Using a type-specific equality operator is messy because
+      * there could be multiple notions of equality in different operator
+      * classes; furthermore, we cannot safely invoke user-defined functions
+      * while holding exclusive buffer lock.
+      */
+     if (attrnum <= 0)
+     {
+         /* The only allowed system columns are OIDs, so do this */
+         return (DatumGetObjectId(value1) == DatumGetObjectId(value2));
+     }
+     else
+     {
+         Assert(attrnum <= tupdesc->natts);
+         att = tupdesc->attrs[attrnum - 1];
+         return datumIsEqual(value1, value2, att->attbyval, att->attlen);
+     }
+ }
+ 
+ /* ----------------
+  * heap_delta_encode
+  *        Forms an encoded data from old and new tuple with the modified columns
+  *        using an algorithm similar to LZ algorithm.
+  *
+  *        tupleDesc - Tuple descriptor.
+  *        oldtup - pointer to the old/history tuple.
+  *        newtup - pointer to the new tuple.
+  *        encdata - pointer to the encoded data using lz algorithm.
+  *
+  *        Encode the bitmap [+padding] [+oid] as a new data. And loop for all
+  *        attributes to find any modifications in the attributes.
+  *
+  *        The unmodified data is encoded as a history tag to the output and the
+  *        modifed data is encoded as new data to the output.
+  *
+  *        If the encoded output data is less than 75% of original data,
+  *        The output data is considered as encoded and proceed further.
+  * ----------------
+  */
+ bool
+ heap_delta_encode(TupleDesc tupleDesc, HeapTuple oldtup, HeapTuple newtup,
+                   PGLZ_Header *encdata)
+ {
+     Form_pg_attribute *att = tupleDesc->attrs;
+     int            numberOfAttributes;
+     int32        new_tup_off = 0,
+                 old_tup_off = 0,
+                 temp_off = 0,
+                 match_off = 0,
+                 change_off = 0;
+     int            attnum;
+     int32        data_len,
+                 old_tup_pad_len,
+                 new_tup_pad_len;
+     Size        old_tup_attr_len,
+                 new_tup_attr_len;
+     bool        is_attr_equals = true;
+     unsigned char *bp = (unsigned char *) encdata + sizeof(PGLZ_Header);
+     unsigned char *bstart = bp;
+     char       *dp = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
+     char       *dstart = dp;
+     char       *history;
+     unsigned char ctrl_dummy = 0;
+     unsigned char *ctrlp = &ctrl_dummy;
+     unsigned char ctrlb = 0;
+     unsigned char ctrl = 0;
+     int32        len,
+                 old_tup_bitmaplen,
+                 new_tup_bitmaplen,
+                 new_tup_len;
+     int32        result_size;
+     int32        result_max;
+ 
+     /* Include the bitmap header in the lz encoded data. */
+     history = (char *) oldtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
+     old_tup_bitmaplen = oldtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits);
+     new_tup_bitmaplen = newtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits);
+     new_tup_len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
+ 
+     /*
+      * The maximum encoded data is of 75% of total size. The max tuple size is
+      * already validated as it cannot be more than MaxHeapTupleSize.
+      */
+     result_max = (new_tup_len * 75) / 100;
+     encdata->rawsize = new_tup_len;
+ 
+     /*
+      * Check for output buffer is reached the result_max by advancing the
+      * buffer by the calculated aproximate length for the corresponding
+      * operation.
+      */
+     if ((bp + (2 * new_tup_bitmaplen)) - bstart >= result_max)
+         return false;
+ 
+     /* Copy the bitmap data from new tuple to the encoded data buffer */
+     pglz_out_add(ctrlp, ctrlb, ctrl, bp, new_tup_bitmaplen, dp);
+     dstart = dp;
+ 
+     numberOfAttributes = HeapTupleHeaderGetNatts(newtup->t_data);
+     for (attnum = 1; attnum <= numberOfAttributes; attnum++)
+     {
+         /*
+          * If the attribute is modified by the update operation, store the
+          * appropiate offsets in the WAL record, otherwise skip to the next
+          * attribute.
+          */
+         if (!heap_attr_get_length_and_check_equals(tupleDesc, attnum, oldtup,
+                                newtup, &old_tup_attr_len, &new_tup_attr_len))
+         {
+             is_attr_equals = false;
+             data_len = old_tup_off - match_off;
+ 
+             /*
+              * Check for output buffer is reached the result_max by advancing
+              * the buffer by the calculated aproximate length for the
+              * corresponding operation.
+              */
+             len = PGLZ_GET_HIST_CTRL_BIT_LEN(data_len);
+             if ((bp + len) - bstart >= result_max)
+                 return false;
+ 
+             /*
+              * The match_off value is calculated w.r.t to the tuple t_hoff
+              * value, the bit map len needs to be added to match_off to get
+              * the actual start offfset from the old/history tuple.
+              */
+             match_off += old_tup_bitmaplen;
+ 
+             /*
+              * If any unchanged data presents in the old and new tuples then
+              * encode the data as it needs to copy from history tuple with len
+              * and offset.
+              */
+             pglz_out_tag(ctrlp, ctrlb, ctrl, bp, data_len, match_off, history);
+ 
+             /*
+              * Recalculate the old and new tuple offsets based on padding
+              * present in the tuples
+              */
+             if (!HeapTupleHasNulls(oldtup)
+                 || !att_isnull((attnum - 1), oldtup->t_data->t_bits))
+             {
+                 old_tup_off = att_align_pointer(old_tup_off,
+                                                 att[attnum - 1]->attalign,
+                                                 att[attnum - 1]->attlen,
+                                                 (char *) oldtup->t_data + oldtup->t_data->t_hoff + old_tup_off);
+             }
+ 
+             if (!HeapTupleHasNulls(newtup)
+                 || !att_isnull((attnum - 1), newtup->t_data->t_bits))
+             {
+                 new_tup_off = att_align_pointer(new_tup_off,
+                                                 att[attnum - 1]->attalign,
+                                                 att[attnum - 1]->attlen,
+                                                 (char *) newtup->t_data + newtup->t_data->t_hoff + new_tup_off);
+             }
+ 
+             old_tup_off += old_tup_attr_len;
+             new_tup_off += new_tup_attr_len;
+ 
+             match_off = old_tup_off;
+         }
+         else
+         {
+             /*
+              * Check for output buffer is reached the result_max by advancing
+              * the buffer by the calculated aproximate length for the
+              * corresponding operation.
+              */
+             data_len = new_tup_off - change_off;
+             if ((bp + (2 * data_len)) - bstart >= result_max)
+                 return false;
+ 
+             /* Copy the modified column data to the output buffer if present */
+             pglz_out_add(ctrlp, ctrlb, ctrl, bp, data_len, dp);
+ 
+             /*
+              * calculate the old tuple field start position, required to
+              * ignore if any alignmet is present.
+              */
+             if (!HeapTupleHasNulls(oldtup)
+                 || !att_isnull((attnum - 1), oldtup->t_data->t_bits))
+             {
+                 temp_off = old_tup_off;
+                 old_tup_off = att_align_pointer(old_tup_off,
+                                                 att[attnum - 1]->attalign,
+                                                 att[attnum - 1]->attlen,
+                                                 (char *) oldtup->t_data + oldtup->t_data->t_hoff + old_tup_off);
+ 
+                 old_tup_pad_len = old_tup_off - temp_off;
+ 
+                 /*
+                  * calculate the new tuple field start position to check
+                  * whether any padding is required or not because field
+                  * alignment.
+                  */
+                 temp_off = new_tup_off;
+                 new_tup_off = att_align_pointer(new_tup_off,
+                                                 att[attnum - 1]->attalign,
+                                                 att[attnum - 1]->attlen,
+                                                 (char *) newtup->t_data + newtup->t_data->t_hoff + new_tup_off);
+                 new_tup_pad_len = new_tup_off - temp_off;
+ 
+                 /*
+                  * Checking for that is there any alignment difference between
+                  * old and new tuple attributes.
+                  */
+                 if (old_tup_pad_len != new_tup_pad_len)
+                 {
+                     /*
+                      * If the alignment difference is found between old and
+                      * new tuples and the last attribute value of the new
+                      * tuple is same as old tuple then write the encode as
+                      * history data until the current match.
+                      *
+                      * If the last attribute value of new tuple is not same as
+                      * old tuple then the matched data marking as history is
+                      * already taken care.
+                      */
+                     if (is_attr_equals)
+                     {
+                         /*
+                          * Check for output buffer is reached the result_max
+                          * by advancing the buffer by the calculated
+                          * aproximate length for the corresponding operation.
+                          */
+                         data_len = old_tup_off - old_tup_pad_len - match_off;
+                         len = PGLZ_GET_HIST_CTRL_BIT_LEN(data_len);
+                         if ((bp + len) - bstart >= result_max)
+                             return false;
+ 
+                         match_off += old_tup_bitmaplen;
+                         pglz_out_tag(ctrlp, ctrlb, ctrl, bp, data_len, match_off, history);
+                     }
+ 
+                     match_off = old_tup_off;
+ 
+                     /* Alignment data */
+                     if ((bp + (2 * new_tup_pad_len)) - bstart >= result_max)
+                         return false;
+ 
+                     pglz_out_add(ctrlp, ctrlb, ctrl, bp, new_tup_pad_len, dp);
+                 }
+             }
+ 
+             old_tup_off += old_tup_attr_len;
+             new_tup_off += new_tup_attr_len;
+ 
+             change_off = new_tup_off;
+ 
+             /*
+              * Recalculate the destination pointer with the new offset which
+              * is used while copying the modified data.
+              */
+             dp = dstart + new_tup_off;
+             is_attr_equals = true;
+         }
+     }
+ 
+     /* If any modified column data presents then copy it. */
+     data_len = new_tup_off - change_off;
+     if ((bp + (2 * data_len)) - bstart >= result_max)
+         return false;
+ 
+     pglz_out_add(ctrlp, ctrlb, ctrl, bp, data_len, dp);
+ 
+     /* If any left out old tuple data presents then copy it as history */
+     data_len = old_tup_off - match_off;
+     len = PGLZ_GET_HIST_CTRL_BIT_LEN(data_len);
+     if ((bp + len) - bstart >= result_max)
+         return false;
+ 
+     match_off += old_tup_bitmaplen;
+     pglz_out_tag(ctrlp, ctrlb, ctrl, bp, data_len, match_off, history);
+ 
+     /*
+      * Write out the last control byte and check that we haven't overrun the
+      * output size allowed by the strategy.
+      */
+     *ctrlp = ctrlb;
+ 
+     result_size = bp - bstart;
+     if (result_size >= result_max)
+         return false;
+ 
+     /*
+      * Success - need only fill in the actual length of the compressed datum.
+      */
+     SET_VARSIZE_COMPRESSED(encdata, result_size + sizeof(PGLZ_Header));
+     return true;
+ }
+ 
+ /* ----------------
+  * heap_delta_decode
+  *        Decodes the encoded data to dest tuple with the help of history.
+  *
+  *        encdata - Pointer to the encoded data.
+  *        oldtup - pointer to the history tuple.
+  *        newtup - pointer to the destination tuple.
+  * ----------------
+  */
+ void
+ heap_delta_decode(PGLZ_Header *encdata, HeapTuple oldtup, HeapTuple newtup)
+ {
+     return pglz_decompress_with_history((char *) encdata,
+              (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits),
+                                         &newtup->t_len,
+             (char *) oldtup->t_data + offsetof(HeapTupleHeaderData, t_bits));
+ }
+ 
+ /*  * heap_form_tuple  *        construct a tuple from the given values[] and isnull[] arrays,  *        which are of
thelength indicated by tupleDescriptor->natts
 
*** a/src/backend/access/heap/heapam.c
--- b/src/backend/access/heap/heapam.c
***************
*** 85,90 **** static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
--- 85,91 ----                     TransactionId xid, CommandId cid, int options); static XLogRecPtr
log_heap_update(Relationreln, Buffer oldbuf,                 ItemPointerData from, Buffer newbuf, HeapTuple newtup,
 
+                 HeapTuple oldtup,                 bool all_visible_cleared, bool new_all_visible_cleared); static
boolHeapSatisfiesHOTUpdate(Relation relation, Bitmapset *hot_attrs,                        HeapTuple oldtup, HeapTuple
newtup);
***************
*** 844,849 **** heapgettup_pagemode(HeapScanDesc scan,
--- 845,898 ----  * definition in access/htup.h is maintained.  */ Datum
+ fastgetattr_with_len(HeapTuple tup, int attnum, TupleDesc tupleDesc,
+                      bool *isnull, int32 *len)
+ {
+     return (
+             (attnum) > 0 ?
+             (
+              (*(isnull) = false),
+              HeapTupleNoNulls(tup) ?
+              (
+               (tupleDesc)->attrs[(attnum) - 1]->attcacheoff >= 0 ?
+               (
+             (*(len) = att_getlength((tupleDesc)->attrs[(attnum - 1)]->attlen,
+                              (char *) (tup)->t_data + (tup)->t_data->t_hoff +
+                              (tupleDesc)->attrs[(attnum) - 1]->attcacheoff)),
+                fetchatt((tupleDesc)->attrs[(attnum) - 1],
+                         (char *) (tup)->t_data + (tup)->t_data->t_hoff +
+                         (tupleDesc)->attrs[(attnum) - 1]->attcacheoff)
+                )
+               :
+               (
+                nocachegetattr_with_len(tup), (attnum), (tupleDesc), (len))
+               )
+              :
+              (
+               att_isnull((attnum) - 1, (tup)->t_data->t_bits) ?
+               (
+                (*(isnull) = true),
+                (*(len) = 0),
+                (Datum) NULL
+                )
+               :
+               (
+                nocachegetattr_with_len((tup), (attnum), (tupleDesc), (len))
+                )
+               )
+              )
+             :
+             (
+              (Datum) NULL
+              )
+         );
+ }
+ 
+ /*
+  * This is formatted so oddly so that the correspondence to the macro
+  * definition in access/htup.h is maintained.
+  */
+ Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc,             bool *isnull) {
***************
*** 860,866 **** fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc,
(tupleDesc)->attrs[(attnum)- 1]->attcacheoff)                )               :
 
!               nocachegetattr((tup), (attnum), (tupleDesc))               )              :              (
--- 909,916 ----                         (tupleDesc)->attrs[(attnum) - 1]->attcacheoff)                )
:
!               (
!                nocachegetattr(tup), (attnum), (tupleDesc))               )              :              (
***************
*** 2383,2389 **** simple_heap_insert(Relation relation, HeapTuple tup) HTSU_Result heap_delete(Relation relation,
ItemPointertid,             CommandId cid, Snapshot crosscheck, bool wait,
 
!             HeapUpdateFailureData *hufd) {     HTSU_Result result;     TransactionId xid =
GetCurrentTransactionId();
--- 2433,2439 ---- HTSU_Result heap_delete(Relation relation, ItemPointer tid,             CommandId cid, Snapshot
crosscheck,bool wait,
 
!             HeapUpdateFailureData * hufd) {     HTSU_Result result;     TransactionId xid =
GetCurrentTransactionId();
***************
*** 3212,3221 **** l2:     /* XLOG stuff */     if (RelationNeedsWAL(relation))     {
!         XLogRecPtr    recptr = log_heap_update(relation, buffer, oldtup.t_self,
!                                              newbuf, heaptup,
!                                              all_visible_cleared,
!                                              all_visible_cleared_new);          if (newbuf != buffer)         {
--- 3262,3273 ----     /* XLOG stuff */     if (RelationNeedsWAL(relation))     {
!         XLogRecPtr    recptr;
! 
!         recptr = log_heap_update(relation, buffer, oldtup.t_self,
!                                  newbuf, heaptup, &oldtup,
!                                  all_visible_cleared,
!                                  all_visible_cleared_new);          if (newbuf != buffer)         {
***************
*** 3282,3355 **** static bool heap_tuple_attr_equals(TupleDesc tupdesc, int attrnum,                        HeapTuple
tup1,HeapTuple tup2) {
 
!     Datum        value1,
!                 value2;
!     bool        isnull1,
!                 isnull2;
!     Form_pg_attribute att;
! 
!     /*
!      * If it's a whole-tuple reference, say "not equal".  It's not really
!      * worth supporting this case, since it could only succeed after a no-op
!      * update, which is hardly a case worth optimizing for.
!      */
!     if (attrnum == 0)
!         return false;
! 
!     /*
!      * Likewise, automatically say "not equal" for any system attribute other
!      * than OID and tableOID; we cannot expect these to be consistent in a HOT
!      * chain, or even to be set correctly yet in the new tuple.
!      */
!     if (attrnum < 0)
!     {
!         if (attrnum != ObjectIdAttributeNumber &&
!             attrnum != TableOidAttributeNumber)
!             return false;
!     } 
!     /*
!      * Extract the corresponding values.  XXX this is pretty inefficient if
!      * there are many indexed columns.    Should HeapSatisfiesHOTUpdate do a
!      * single heap_deform_tuple call on each tuple, instead?  But that doesn't
!      * work for system columns ...
!      */
!     value1 = heap_getattr(tup1, attrnum, tupdesc, &isnull1);
!     value2 = heap_getattr(tup2, attrnum, tupdesc, &isnull2);
! 
!     /*
!      * If one value is NULL and other is not, then they are certainly not
!      * equal
!      */
!     if (isnull1 != isnull2)
!         return false;
! 
!     /*
!      * If both are NULL, they can be considered equal.
!      */
!     if (isnull1)
!         return true;
! 
!     /*
!      * We do simple binary comparison of the two datums.  This may be overly
!      * strict because there can be multiple binary representations for the
!      * same logical value.    But we should be OK as long as there are no false
!      * positives.  Using a type-specific equality operator is messy because
!      * there could be multiple notions of equality in different operator
!      * classes; furthermore, we cannot safely invoke user-defined functions
!      * while holding exclusive buffer lock.
!      */
!     if (attrnum <= 0)
!     {
!         /* The only allowed system columns are OIDs, so do this */
!         return (DatumGetObjectId(value1) == DatumGetObjectId(value2));
!     }
!     else
!     {
!         Assert(attrnum <= tupdesc->natts);
!         att = tupdesc->attrs[attrnum - 1];
!         return datumIsEqual(value1, value2, att->attbyval, att->attlen);
!     } }  /*
--- 3334,3344 ---- heap_tuple_attr_equals(TupleDesc tupdesc, int attrnum,                        HeapTuple tup1,
HeapTupletup2) {
 
!     Size        tup1_attr_len,
!                 tup2_attr_len; 
!     return heap_attr_get_length_and_check_equals(tupdesc, attrnum, tup1, tup2,
!                                              &tup1_attr_len, &tup2_attr_len); }  /*
***************
*** 4447,4453 **** log_heap_visible(RelFileNode rnode, BlockNumber block, Buffer vm_buffer,  */ static XLogRecPtr
log_heap_update(Relationreln, Buffer oldbuf, ItemPointerData from,
 
!                 Buffer newbuf, HeapTuple newtup,                 bool all_visible_cleared, bool
new_all_visible_cleared){     xl_heap_update xlrec;
 
--- 4436,4442 ----  */ static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
!                 Buffer newbuf, HeapTuple newtup, HeapTuple oldtup,                 bool all_visible_cleared, bool
new_all_visible_cleared){     xl_heap_update xlrec;
 
***************
*** 4456,4461 **** log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
--- 4445,4461 ----     XLogRecPtr    recptr;     XLogRecData rdata[4];     Page        page = BufferGetPage(newbuf);
+     char       *newtupdata;
+     int            newtuplen;
+     int            oldtuplen;
+     bool        compressed = false;
+ 
+     /* Structure which holds max output possible from the LZ algorithm */
+     struct
+     {
+         PGLZ_Header pglzheader;
+         char        buf[MaxHeapTupleSize];
+     }            buf;      /* Caller should not call me on a non-WAL-logged relation */
Assert(RelationNeedsWAL(reln));
***************
*** 4465,4475 **** log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,     else         info =
XLOG_HEAP_UPDATE;     xlrec.target.node = reln->rd_node;     xlrec.target.tid = from;
 
!     xlrec.all_visible_cleared = all_visible_cleared;     xlrec.newtid = newtup->t_self;
!     xlrec.new_all_visible_cleared = new_all_visible_cleared;      rdata[0].data = (char *) &xlrec;     rdata[0].len =
SizeOfHeapUpdate;
--- 4465,4505 ----     else         info = XLOG_HEAP_UPDATE; 
+     newtupdata = ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits);
+     newtuplen = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
+     oldtuplen = oldtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
+ 
+     /* Is the update is going to the same page? */
+     if (oldbuf == newbuf)
+     {
+         /*
+          * LZ algorithm can hold only history offset in the range of 1 - 4095.
+          * so the delta encode is restricted for the tuples with length more
+          * than PGLZ_HISTORY_SIZE.
+          */
+         if (oldtuplen < PGLZ_HISTORY_SIZE)
+         {
+             /* Delta-encode the new tuple using the old tuple */
+             if (heap_delta_encode(reln->rd_att, oldtup, newtup,
+                                   &buf.pglzheader))
+             {
+                 compressed = true;
+                 newtupdata = (char *) &buf.pglzheader;
+                 newtuplen = VARSIZE(&buf.pglzheader);
+             }
+         }
+     }
+ 
+     xlrec.flags = 0;     xlrec.target.node = reln->rd_node;     xlrec.target.tid = from;
!     if (all_visible_cleared)
!         xlrec.flags |= XL_HEAP_UPDATE_ALL_VISIBLE_CLEARED;     xlrec.newtid = newtup->t_self;
!     if (new_all_visible_cleared)
!         xlrec.flags |= XL_HEAP_UPDATE_NEW_ALL_VISIBLE_CLEARED;
!     if (compressed)
!         xlrec.flags |= XL_HEAP_UPDATE_DELTA_ENCODED;      rdata[0].data = (char *) &xlrec;     rdata[0].len =
SizeOfHeapUpdate;
***************
*** 4496,4504 **** log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,     rdata[2].buffer_std = true;
  rdata[2].next = &(rdata[3]); 
 
!     /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
!     rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
!     rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);     rdata[3].buffer = newbuf;
rdata[3].buffer_std= true;     rdata[3].next = NULL;
 
--- 4526,4537 ----     rdata[2].buffer_std = true;     rdata[2].next = &(rdata[3]); 
!     /*
!      * PG73FORMAT: write bitmap [+ padding] [+ oid] + data follows .........
!      * OR PG93FORMAT [If encoded]: LZ header + Encoded data follows
!      */
!     rdata[3].data = newtupdata;
!     rdata[3].len = newtuplen;     rdata[3].buffer = newbuf;     rdata[3].buffer_std = true;     rdata[3].next =
NULL;
***************
*** 5274,5280 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
--- 5307,5316 ----     Page        page;     OffsetNumber offnum;     ItemId        lp = NULL;
+     HeapTupleData newtup;
+     HeapTupleData oldtup;     HeapTupleHeader htup;
+     HeapTupleHeader oldtupdata = NULL;     struct     {         HeapTupleHeaderData hdr;
***************
*** 5289,5295 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)      * The visibility map may
needto be fixed even if the heap page is      * already up-to-date.      */
 
!     if (xlrec->all_visible_cleared)     {         Relation    reln = CreateFakeRelcacheEntry(xlrec->target.node);
   BlockNumber block = ItemPointerGetBlockNumber(&xlrec->target.tid);
 
--- 5325,5331 ----      * The visibility map may need to be fixed even if the heap page is      * already up-to-date.
  */
 
!     if (xlrec->flags & XL_HEAP_UPDATE_ALL_VISIBLE_CLEARED)     {         Relation    reln =
CreateFakeRelcacheEntry(xlrec->target.node);        BlockNumber block = ItemPointerGetBlockNumber(&xlrec->target.tid);
 
***************
*** 5349,5355 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)     if
(PageGetMaxOffsetNumber(page)< offnum || !ItemIdIsNormal(lp))         elog(PANIC, "heap_update_redo: invalid lp"); 
 
!     htup = (HeapTupleHeader) PageGetItem(page, lp);      htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
        HEAP_XMAX_INVALID |
 
--- 5385,5391 ----     if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp))         elog(PANIC,
"heap_update_redo:invalid lp"); 
 
!     oldtupdata = htup = (HeapTupleHeader) PageGetItem(page, lp);      htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
                     HEAP_XMAX_INVALID |
 
***************
*** 5368,5374 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)     /* Mark the page as a
candidatefor pruning */     PageSetPrunable(page, record->xl_xid); 
 
!     if (xlrec->all_visible_cleared)         PageClearAllVisible(page);      /*
--- 5404,5410 ----     /* Mark the page as a candidate for pruning */     PageSetPrunable(page, record->xl_xid); 
!     if (xlrec->flags & XL_HEAP_UPDATE_ALL_VISIBLE_CLEARED)         PageClearAllVisible(page);      /*
***************
*** 5393,5399 **** newt:;      * The visibility map may need to be fixed even if the heap page is      * already
up-to-date.     */
 
!     if (xlrec->new_all_visible_cleared)     {         Relation    reln = CreateFakeRelcacheEntry(xlrec->target.node);
       BlockNumber block = ItemPointerGetBlockNumber(&xlrec->newtid);
 
--- 5429,5435 ----      * The visibility map may need to be fixed even if the heap page is      * already up-to-date.
  */
 
!     if (xlrec->flags & XL_HEAP_UPDATE_NEW_ALL_VISIBLE_CLEARED)     {         Relation    reln =
CreateFakeRelcacheEntry(xlrec->target.node);        BlockNumber block = ItemPointerGetBlockNumber(&xlrec->newtid);
 
***************
*** 5456,5465 **** newsame:;            SizeOfHeapHeader);     htup = &tbuf.hdr;     MemSet((char *) htup, 0,
sizeof(HeapTupleHeaderData));
!     /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
!     memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
!            (char *) xlrec + hsize,
!            newlen);     newlen += offsetof(HeapTupleHeaderData, t_bits);     htup->t_infomask2 = xlhdr.t_infomask2;
 htup->t_infomask = xlhdr.t_infomask;
 
--- 5492,5520 ----            SizeOfHeapHeader);     htup = &tbuf.hdr;     MemSet((char *) htup, 0,
sizeof(HeapTupleHeaderData));
! 
!     /*
!      * If the new tuple was delta-encoded, decode it.
!      */
!     if (xlrec->flags & XL_HEAP_UPDATE_DELTA_ENCODED)
!     {
!         /* PG93FORMAT: LZ header + Encoded data */
!         PGLZ_Header *encoded_data = (PGLZ_Header *) (((char *) xlrec) + hsize);
! 
!         oldtup.t_data = oldtupdata;
!         newtup.t_data = htup;
! 
!         heap_delta_decode(encoded_data, &oldtup, &newtup);
!         newlen = newtup.t_len;
!     }
!     else
!     {
!         /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
!         memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
!                (char *) xlrec + hsize,
!                newlen);
!     }
!      newlen += offsetof(HeapTupleHeaderData, t_bits);     htup->t_infomask2 = xlhdr.t_infomask2;     htup->t_infomask
=xlhdr.t_infomask;
 
***************
*** 5474,5480 **** newsame:;     if (offnum == InvalidOffsetNumber)         elog(PANIC, "heap_update_redo: failed to
addtuple"); 
 
!     if (xlrec->new_all_visible_cleared)         PageClearAllVisible(page);      freespace =
PageGetHeapFreeSpace(page);       /* needed to update FSM below */
 
--- 5529,5535 ----     if (offnum == InvalidOffsetNumber)         elog(PANIC, "heap_update_redo: failed to add tuple");

!     if (xlrec->flags & XL_HEAP_UPDATE_NEW_ALL_VISIBLE_CLEARED)         PageClearAllVisible(page);      freespace =
PageGetHeapFreeSpace(page);       /* needed to update FSM below */
 
*** a/src/backend/utils/adt/pg_lzcompress.c
--- b/src/backend/utils/adt/pg_lzcompress.c
***************
*** 182,190 ****  */ #define PGLZ_HISTORY_LISTS        8192    /* must be power of 2 */ #define PGLZ_HISTORY_MASK
(PGLZ_HISTORY_LISTS - 1)
 
- #define PGLZ_HISTORY_SIZE        4096
- #define PGLZ_MAX_MATCH            273
-   /* ----------  * PGLZ_HistEntry -
--- 182,187 ----
***************
*** 302,368 **** do {                                    \             }
               \ } while (0) 
 
- 
- /* ----------
-  * pglz_out_ctrl -
-  *
-  *        Outputs the last and allocates a new control byte if needed.
-  * ----------
-  */
- #define pglz_out_ctrl(__ctrlp,__ctrlb,__ctrl,__buf) \
- do { \
-     if ((__ctrl & 0xff) == 0)                                                \
-     {                                                                        \
-         *(__ctrlp) = __ctrlb;                                                \
-         __ctrlp = (__buf)++;                                                \
-         __ctrlb = 0;                                                        \
-         __ctrl = 1;                                                            \
-     }                                                                        \
- } while (0)
- 
- 
- /* ----------
-  * pglz_out_literal -
-  *
-  *        Outputs a literal byte to the destination buffer including the
-  *        appropriate control bit.
-  * ----------
-  */
- #define pglz_out_literal(_ctrlp,_ctrlb,_ctrl,_buf,_byte) \
- do { \
-     pglz_out_ctrl(_ctrlp,_ctrlb,_ctrl,_buf);                                \
-     *(_buf)++ = (unsigned char)(_byte);                                        \
-     _ctrl <<= 1;                                                            \
- } while (0)
- 
- 
- /* ----------
-  * pglz_out_tag -
-  *
-  *        Outputs a backward reference tag of 2-4 bytes (depending on
-  *        offset and length) to the destination buffer including the
-  *        appropriate control bit.
-  * ----------
-  */
- #define pglz_out_tag(_ctrlp,_ctrlb,_ctrl,_buf,_len,_off) \
- do { \
-     pglz_out_ctrl(_ctrlp,_ctrlb,_ctrl,_buf);                                \
-     _ctrlb |= _ctrl;                                                        \
-     _ctrl <<= 1;                                                            \
-     if (_len > 17)                                                            \
-     {                                                                        \
-         (_buf)[0] = (unsigned char)((((_off) & 0xf00) >> 4) | 0x0f);        \
-         (_buf)[1] = (unsigned char)(((_off) & 0xff));                        \
-         (_buf)[2] = (unsigned char)((_len) - 18);                            \
-         (_buf) += 3;                                                        \
-     } else {                                                                \
-         (_buf)[0] = (unsigned char)((((_off) & 0xf00) >> 4) | ((_len) - 3)); \
-         (_buf)[1] = (unsigned char)((_off) & 0xff);                            \
-         (_buf) += 2;                                                        \
-     }                                                                        \
- } while (0)
- 
-  /* ----------  * pglz_find_match -  *
--- 299,304 ----
***************
*** 595,601 **** pglz_compress(const char *source, int32 slen, PGLZ_Header *dest,              * Create the tag and add
historyentries for all matched              * characters.              */
 
!             pglz_out_tag(ctrlp, ctrlb, ctrl, bp, match_len, match_off);             while (match_len--)             {
               pglz_hist_add(hist_start, hist_entries,
 
--- 531,537 ----              * Create the tag and add history entries for all matched              * characters.
      */
 
!             pglz_out_tag(ctrlp, ctrlb, ctrl, bp, match_len, match_off, dp);             while (match_len--)
 {                 pglz_hist_add(hist_start, hist_entries,
 
***************
*** 647,661 **** pglz_compress(const char *source, int32 slen, PGLZ_Header *dest, void pglz_decompress(const
PGLZ_Header*source, char *dest) {     const unsigned char *sp;     const unsigned char *srcend;     unsigned char *dp;
  unsigned char *destend;      sp = ((const unsigned char *) source) + sizeof(PGLZ_Header);
 
!     srcend = ((const unsigned char *) source) + VARSIZE(source);     dp = (unsigned char *) dest;
!     destend = dp + source->rawsize;      while (sp < srcend && dp < destend)     {
--- 583,620 ---- void pglz_decompress(const PGLZ_Header *source, char *dest) {
+     pglz_decompress_with_history((char *) source, dest, NULL, NULL);
+ }
+ 
+ /* ----------
+  * pglz_decompress_with_history -
+  *
+  *        Decompresses source into dest.
+  *        To decompress, it uses history if provided.
+  * ----------
+  */
+ void
+ pglz_decompress_with_history(const char *source, char *dest, uint32 *destlen,
+                              const char *history)
+ {
+     PGLZ_Header src;     const unsigned char *sp;     const unsigned char *srcend;     unsigned char *dp;
unsignedchar *destend; 
 
+     /* To avoid the unaligned access of PGLZ_Header */
+     memcpy((char *) &src, source, sizeof(PGLZ_Header));
+      sp = ((const unsigned char *) source) + sizeof(PGLZ_Header);
!     srcend = ((const unsigned char *) source) + VARSIZE(&src);     dp = (unsigned char *) dest;
!     destend = dp + src.rawsize;
! 
!     if (destlen)
!     {
!         *destlen = src.rawsize;
!     }      while (sp < srcend && dp < destend)     {
***************
*** 699,714 **** pglz_decompress(const PGLZ_Header *source, char *dest)                     break;                 } 
!                 /*
!                  * Now we copy the bytes specified by the tag from OUTPUT to
!                  * OUTPUT. It is dangerous and platform dependent to use
!                  * memcpy() here, because the copied areas could overlap
!                  * extremely!
!                  */
!                 while (len--)                 {
!                     *dp = dp[-off];
!                     dp++;                 }             }             else
--- 658,685 ----                     break;                 } 
!                 if (history)
!                 {
!                     /*
!                      * Now we copy the bytes specified by the tag from history to
!                      * OUTPUT.
!                      */
!                     memcpy(dp, history + off, len);
!                     dp += len;
!                 }
!                 else                 {
!                     /*
!                      * Now we copy the bytes specified by the tag from OUTPUT to
!                      * OUTPUT. It is dangerous and platform dependent to use
!                      * memcpy() here, because the copied areas could overlap
!                      * extremely!
!                      */
!                     while (len--)
!                     {
!                         *dp = dp[-off];
!                         dp++;
!                     }                 }             }             else
*** a/src/include/access/heapam_xlog.h
--- b/src/include/access/heapam_xlog.h
***************
*** 142,153 **** typedef struct xl_heap_update {     xl_heaptid    target;            /* deleted tuple id */
ItemPointerDatanewtid;        /* new inserted tuple id */
 
!     bool        all_visible_cleared;    /* PD_ALL_VISIBLE was cleared */
!     bool        new_all_visible_cleared;        /* same for the page of newtid */     /* NEW TUPLE xl_heap_header AND
TUPLEDATA FOLLOWS AT END OF STRUCT */ } xl_heap_update; 
 
! #define SizeOfHeapUpdate    (offsetof(xl_heap_update, new_all_visible_cleared) + sizeof(bool))  /*  * This is what we
needto know about vacuum page cleanup/redirect
 
--- 142,161 ---- {     xl_heaptid    target;            /* deleted tuple id */     ItemPointerData newtid;        /*
newinserted tuple id */
 
!     char        flags;            /* flag bits, see below */
!      /* NEW TUPLE xl_heap_header AND TUPLE DATA FOLLOWS AT END OF STRUCT */ } xl_heap_update; 
! 
! #define XL_HEAP_UPDATE_ALL_VISIBLE_CLEARED        0x01 /* Indicates as old page's
!                                                 all visible bit is cleared */
! #define XL_HEAP_UPDATE_NEW_ALL_VISIBLE_CLEARED    0x02 /* Indicates as new page's
!                                                 all visible bit is cleared */
! #define XL_HEAP_UPDATE_DELTA_ENCODED            0x04 /* Indicates as the update
!                                                 operation is delta encoded */
! 
! #define SizeOfHeapUpdate    (offsetof(xl_heap_update, flags) + sizeof(char))  /*  * This is what we need to know
aboutvacuum page cleanup/redirect
 
*** a/src/include/access/htup_details.h
--- b/src/include/access/htup_details.h
***************
*** 18,23 ****
--- 18,24 ---- #include "access/tupdesc.h" #include "access/tupmacs.h" #include "storage/bufpage.h"
+ #include "utils/pg_lzcompress.h"  /*  * MaxTupleAttributeNumber limits the number of (user) columns in a tuple.
***************
*** 528,533 **** struct MinimalTupleData
--- 529,535 ----         HeapTupleHeaderSetOid((tuple)->t_data, (oid))  
+ #if !defined(DISABLE_COMPLEX_MACRO) /* ----------------  *        fastgetattr  *
***************
*** 542,550 **** struct MinimalTupleData  *        lookups, and call nocachegetattr() for the rest.  * ----------------
*/
 
- 
- #if !defined(DISABLE_COMPLEX_MACRO)
-  #define fastgetattr(tup, attnum, tupleDesc, isnull)                    \ (
                        \     AssertMacro((attnum) > 0),                                        \
 
--- 544,549 ----
***************
*** 572,585 **** struct MinimalTupleData             nocachegetattr((tup), (attnum), (tupleDesc))            \
)                                                           \     )
          \ )
 
- #else                            /* defined(DISABLE_COMPLEX_MACRO) */  extern Datum fastgetattr(HeapTuple tup, int
attnum,TupleDesc tupleDesc,             bool *isnull); #endif   /* defined(DISABLE_COMPLEX_MACRO) */ 
 
-  /* ----------------  *        heap_getattr  *
--- 571,626 ----             nocachegetattr((tup), (attnum), (tupleDesc))            \         )
                                   \     )                                                                \
 
+ )                                                                    \
+ 
+ /* ----------------
+  *        fastgetattr_with_len
+  *
+  *        Similar to fastgetattr and fetches the length of the given attribute
+  *        also.
+  * ----------------
+  */
+ #define fastgetattr_with_len(tup, attnum, tupleDesc, isnull, len)    \
+ (                                                                    \
+     AssertMacro((attnum) > 0),                                        \
+     (*(isnull) = false),                                            \
+     HeapTupleNoNulls(tup) ?                                         \
+     (                                                                \
+         (tupleDesc)->attrs[(attnum)-1]->attcacheoff >= 0 ?            \
+         (                                                            \
+             (*(len) = att_getlength(                                \
+                     (tupleDesc)->attrs[(attnum)-1]->attlen,         \
+                     (char *) (tup)->t_data + (tup)->t_data->t_hoff +\
+                     (tupleDesc)->attrs[(attnum)-1]->attcacheoff)),    \
+             fetchatt((tupleDesc)->attrs[(attnum)-1],                \
+                 (char *) (tup)->t_data + (tup)->t_data->t_hoff +    \
+                     (tupleDesc)->attrs[(attnum)-1]->attcacheoff)    \
+         )                                                            \
+         :                                                            \
+             nocachegetattr_with_len((tup), (attnum), (tupleDesc), (len))\
+     )                                                                \
+     :                                                                \
+     (                                                                \
+         att_isnull((attnum)-1, (tup)->t_data->t_bits) ?             \
+         (                                                            \
+             (*(isnull) = true),                                     \
+             (*(len) = 0),                                            \
+             (Datum)NULL                                             \
+         )                                                            \
+         :                                                            \
+         (                                                            \
+             nocachegetattr_with_len((tup), (attnum), (tupleDesc), (len))\
+         )                                                            \
+     )                                                                \ ) 
+ #else                            /* defined(DISABLE_COMPLEX_MACRO) */ extern Datum fastgetattr(HeapTuple tup, int
attnum,TupleDesc tupleDesc,             bool *isnull);
 
+ extern Datum fastgetattr_with_len(HeapTuple tup, int attnum,
+             TupleDesc tupleDesc, bool *isnull, int32 *len); #endif   /* defined(DISABLE_COMPLEX_MACRO) */  /*
---------------- *        heap_getattr  *
 
***************
*** 596,616 **** extern Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc,  * ----------------  */
#defineheap_getattr(tup, attnum, tupleDesc, isnull) \     ( \
 
!         ((attnum) > 0) ? \         ( \
!             ((attnum) > (int) HeapTupleHeaderGetNatts((tup)->t_data)) ? \
!             ( \
!                 (*(isnull) = true), \
!                 (Datum)NULL \
!             ) \
!             : \
!                 fastgetattr((tup), (attnum), (tupleDesc), (isnull)) \         ) \         : \
!             heap_getsysattr((tup), (attnum), (tupleDesc), (isnull)) \
!     )   /* prototypes for functions in common/heaptuple.c */ extern Size heap_compute_data_size(TupleDesc tupleDesc,
--- 637,679 ----  * ----------------  */ #define heap_getattr(tup, attnum, tupleDesc, isnull) \
+ ( \
+     ((attnum) > 0) ? \     ( \
!         ((attnum) > (int) HeapTupleHeaderGetNatts((tup)->t_data)) ? \         ( \
!             (*(isnull) = true), \
!             (Datum)NULL \         ) \         : \
!             fastgetattr((tup), (attnum), (tupleDesc), (isnull)) \
!     ) \
!     : \
!         heap_getsysattr((tup), (attnum), (tupleDesc), (isnull)) \
! ) 
+ /* ----------------
+  *        heap_getattr_with_len
+  *
+  *        Similar to heap_getattr and outputs the length of the given attribute.
+  * ----------------
+  */
+ #define heap_getattr_with_len(tup, attnum, tupleDesc, isnull, len) \
+ ( \
+     ((attnum) > 0) ? \
+     ( \
+         ((attnum) > (int) HeapTupleHeaderGetNatts((tup)->t_data)) ? \
+         ( \
+             (*(isnull) = true), \
+             (*(len) = 0), \
+             (Datum)NULL \
+         ) \
+         : \
+             fastgetattr_with_len((tup), (attnum), (tupleDesc), (isnull), (len)) \
+     ) \
+     : \
+         heap_getsysattr((tup), (attnum), (tupleDesc), (isnull)) \
+ )  /* prototypes for functions in common/heaptuple.c */ extern Size heap_compute_data_size(TupleDesc tupleDesc,
***************
*** 620,625 **** extern void heap_fill_tuple(TupleDesc tupleDesc,
--- 683,690 ----                 char *data, Size data_size,                 uint16 *infomask, bits8 *bit); extern bool
heap_attisnull(HeapTupletup, int attnum);
 
+ extern Datum nocachegetattr_with_len(HeapTuple tup, int attnum,
+                TupleDesc att, Size *len); extern Datum nocachegetattr(HeapTuple tup, int attnum,
TupleDescatt); extern Datum heap_getsysattr(HeapTuple tup, int attnum, TupleDesc tupleDesc,
 
***************
*** 636,641 **** extern HeapTuple heap_modify_tuple(HeapTuple tuple,
--- 701,714 ---- extern void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc,                   Datum *values,
bool*isnull); 
 
+ extern bool heap_attr_get_length_and_check_equals(TupleDesc tupdesc,
+                         int attrnum, HeapTuple tup1, HeapTuple tup2,
+                         Size *tup1_attr_len, Size *tup2_attr_len);
+ extern bool heap_delta_encode(TupleDesc tupleDesc, HeapTuple oldtup,
+                 HeapTuple newtup, PGLZ_Header *encdata);
+ extern void heap_delta_decode (PGLZ_Header *encdata, HeapTuple oldtup,
+                 HeapTuple newtup);
+  /* these three are deprecated versions of the three above: */ extern HeapTuple heap_formtuple(TupleDesc
tupleDescriptor,               Datum *values, char *nulls);
 
*** a/src/include/access/tupmacs.h
--- b/src/include/access/tupmacs.h
***************
*** 187,192 ****
--- 187,214 ---- )  /*
+  * att_getlength -
+  *            Gets the length of the attribute.
+  */
+ #define att_getlength(attlen, attptr) \
+ ( \
+     ((attlen) > 0) ? \
+     ( \
+         (attlen) \
+     ) \
+     : (((attlen) == -1) ? \
+     ( \
+         VARSIZE_ANY(attptr) \
+     ) \
+     : \
+     ( \
+         AssertMacro((attlen) == -2), \
+         (strlen((char *) (attptr)) + 1) \
+     )) \
+ )
+ 
+ 
+ /*  * store_att_byval is a partial inverse of fetch_att: store a given Datum  * value into a tuple data area at the
specifiedaddress.  However, it only  * handles the byval case, because in typical usage the caller needs to
 
*** a/src/include/utils/pg_lzcompress.h
--- b/src/include/utils/pg_lzcompress.h
***************
*** 23,28 **** typedef struct PGLZ_Header
--- 23,30 ----     int32        rawsize; } PGLZ_Header; 
+ #define PGLZ_HISTORY_SIZE        4096
+ #define PGLZ_MAX_MATCH            273  /* ----------  * PGLZ_MAX_OUTPUT -
***************
*** 86,91 **** typedef struct PGLZ_Strategy
--- 88,198 ----     int32        match_size_drop; } PGLZ_Strategy; 
+ /*
+  * calculate the approximate length required for history encode tag for the
+  * given length
+  */
+ #define PGLZ_GET_HIST_CTRL_BIT_LEN(_len) \
+ ( \
+     ((_len) < 17) ?    (3) : (4 * (1 + ((_len) / PGLZ_MAX_MATCH)))    \
+ )
+ 
+ /* ----------
+  * pglz_out_ctrl -
+  *
+  *        Outputs the last and allocates a new control byte if needed.
+  * ----------
+  */
+ #define pglz_out_ctrl(__ctrlp,__ctrlb,__ctrl,__buf) \
+ do { \
+     if ((__ctrl & 0xff) == 0)                                                \
+     {                                                                        \
+         *(__ctrlp) = __ctrlb;                                                \
+         __ctrlp = (__buf)++;                                                \
+         __ctrlb = 0;                                                        \
+         __ctrl = 1;                                                         \
+     }                                                                        \
+ } while (0)
+ 
+ /* ----------
+  * pglz_out_literal -
+  *
+  *        Outputs a literal byte to the destination buffer including the
+  *        appropriate control bit.
+  * ----------
+  */
+ #define pglz_out_literal(_ctrlp,_ctrlb,_ctrl,_buf,_byte) \
+ do { \
+     pglz_out_ctrl(_ctrlp,_ctrlb,_ctrl,_buf);                                \
+     *(_buf)++ = (unsigned char)(_byte);                                     \
+     _ctrl <<= 1;                                                            \
+ } while (0)
+ 
+ /* ----------
+  * pglz_out_tag -
+  *
+  *        Outputs a backward/history reference tag of 2-4 bytes (depending on
+  *        offset and length) to the destination buffer including the
+  *        appropriate control bit.
+  *
+  *        Split the process of backward/history reference as different chunks,
+  *        if the given lenght is more than max match and repeats the process
+  *        until the given length is processed.
+  *
+  *        If the matched history length is less than 3 bytes then add it as a
+  *        new data only during encoding instead of history reference. This occurs
+  *        only while framing delta record for wal update operation.
+  * ----------
+  */
+ #define pglz_out_tag(_ctrlp,_ctrlb,_ctrl,_buf,_len,_off,_byte) \
+ do { \
+     int _mlen;                                                                    \
+     int _total_len = (_len);                                                    \
+     while (_total_len > 0)                                                        \
+     {                                                                            \
+         _mlen = _total_len > PGLZ_MAX_MATCH ? PGLZ_MAX_MATCH : _total_len;        \
+         if (_mlen < 3)                                                            \
+         {                                                                        \
+             (_byte) = (char *)(_byte) + (_off);                                    \
+             pglz_out_add(_ctrlp,_ctrlb,_ctrl,_buf,_mlen,(_byte));                \
+             break;                                                                \
+         }                                                                        \
+         pglz_out_ctrl(_ctrlp,_ctrlb,_ctrl,_buf);                                \
+         _ctrlb |= _ctrl;                                                        \
+         _ctrl <<= 1;                                                            \
+         if (_mlen > 17)                                                            \
+         {                                                                        \
+             (_buf)[0] = (unsigned char)((((_off) & 0xf00) >> 4) | 0x0f);        \
+             (_buf)[1] = (unsigned char)(((_off) & 0xff));                        \
+             (_buf)[2] = (unsigned char)((_mlen) - 18);                            \
+             (_buf) += 3;                                                        \
+         } else {                                                                \
+             (_buf)[0] = (unsigned char)((((_off) & 0xf00) >> 4) | ((_mlen) - 3)); \
+             (_buf)[1] = (unsigned char)((_off) & 0xff);                            \
+             (_buf) += 2;                                                        \
+         }                                                                        \
+         _total_len -= _mlen;                                                    \
+         (_off) += _mlen;                                                        \
+     }                                                                            \
+ } while (0)
+ 
+ /* ----------
+  * pglz_out_add -
+  *
+  *        Outputs a literal byte to the destination buffer including the
+  *        appropriate control bit until the given input length.
+  * ----------
+  */
+ #define pglz_out_add(_ctrlp,_ctrlb,_ctrl,_buf,_len,_byte) \
+ do { \
+     int32 _total_len = (_len);                                            \
+     while (_total_len-- > 0)                                            \
+     {                                                                    \
+         pglz_out_literal(_ctrlp, _ctrlb, _ctrl, _buf, *(_byte));        \
+         (_byte) = (char *)(_byte) + 1;                                    \
+     }                                                                    \
+ } while (0)
+   /* ----------  * The standard strategies
***************
*** 108,112 **** extern const PGLZ_Strategy *const PGLZ_strategy_always; extern bool pglz_compress(const char *source,
int32slen, PGLZ_Header *dest,               const PGLZ_Strategy *strategy); extern void pglz_decompress(const
PGLZ_Header*source, char *dest);
 
!  #endif   /* _PG_LZCOMPRESS_H_ */
--- 215,220 ---- extern bool pglz_compress(const char *source, int32 slen, PGLZ_Header *dest,               const
PGLZ_Strategy*strategy); extern void pglz_decompress(const PGLZ_Header *source, char *dest);
 
! extern void pglz_decompress_with_history(const char *source, char *dest,
!                 uint32 *destlen, const char *history); #endif   /* _PG_LZCOMPRESS_H_ */
diff --git a/src/backend/access/common/heaptuple.c b/src/backend/access/common/heaptuple.c
index 034dfe5..83bd03d 100644
--- a/src/backend/access/common/heaptuple.c
+++ b/src/backend/access/common/heaptuple.c
@@ -60,6 +60,7 @@#include "access/sysattr.h"#include "access/tuptoaster.h"#include "executor/tuptable.h"
+#include "utils/datum.h"/* Does att's datatype allow packing into the 1-byte-header varlena format? */
@@ -297,12 +298,13 @@ heap_attisnull(HeapTuple tup, int attnum)}/* ----------------
- *        nocachegetattr
+ *        nocachegetattr_with_len *
- *        This only gets called from fastgetattr() macro, in cases where
+ *        This only gets called in cases where *        we can't use a cacheoffset and the value is not null. *
- *        This caches attribute offsets in the attribute descriptor.
+ *        This caches attribute offsets in the attribute descriptor and
+ *        outputs the length of the attribute value. * *        An alternative way to speed things up would be to
cacheoffsets *        with the tuple, but that seems more difficult unless you take
 
@@ -320,9 +322,10 @@ heap_attisnull(HeapTuple tup, int attnum) * ---------------- */Datum
-nocachegetattr(HeapTuple tuple,
-               int attnum,
-               TupleDesc tupleDesc)
+nocachegetattr_with_len(HeapTuple tuple,
+                        int attnum,
+                        TupleDesc tupleDesc,
+                        Size *len){    HeapTupleHeader tup = tuple->t_data;    Form_pg_attribute *att =
tupleDesc->attrs;
@@ -381,6 +384,9 @@ nocachegetattr(HeapTuple tuple,         */        if (att[attnum]->attcacheoff >= 0)        {
+            if (len)
+                *len = att_getlength(att[attnum]->attlen,
+                                     tp + att[attnum]->attcacheoff);            return fetchatt(att[attnum],
                tp + att[attnum]->attcacheoff);        }
 
@@ -507,9 +513,22 @@ nocachegetattr(HeapTuple tuple,        }    }
+    if (len)
+        *len = att_getlength(att[attnum]->attlen, tp + off);    return fetchatt(att[attnum], tp + off);}
+/*
+ *    nocachegetattr
+ */
+Datum
+nocachegetattr(HeapTuple tuple,
+               int attnum,
+               TupleDesc tupleDesc)
+{
+    return nocachegetattr_with_len(tuple, attnum, tupleDesc, NULL);
+}
+/* ---------------- *        heap_getsysattr *
@@ -618,6 +637,379 @@ heap_copytuple_with_tuple(HeapTuple src, HeapTuple dest)}/*
+ * Check if the specified attribute's value is same in both given tuples.
+ * and outputs the length of the given attribute in both tuples.
+ */
+bool
+heap_attr_get_length_and_check_equals(TupleDesc tupdesc, int attrnum,
+                                      HeapTuple tup1, HeapTuple tup2,
+                                    Size *tup1_attr_len, Size *tup2_attr_len)
+{
+    Datum        value1,
+                value2;
+    bool        isnull1,
+                isnull2;
+    Form_pg_attribute att;
+
+    *tup1_attr_len = 0;
+    *tup2_attr_len = 0;
+
+    /*
+     * If it's a whole-tuple reference, say "not equal".  It's not really
+     * worth supporting this case, since it could only succeed after a no-op
+     * update, which is hardly a case worth optimizing for.
+     */
+    if (attrnum == 0)
+        return false;
+
+    /*
+     * Likewise, automatically say "not equal" for any system attribute other
+     * than OID and tableOID; we cannot expect these to be consistent in a HOT
+     * chain, or even to be set correctly yet in the new tuple.
+     */
+    if (attrnum < 0)
+    {
+        if (attrnum != ObjectIdAttributeNumber &&
+            attrnum != TableOidAttributeNumber)
+            return false;
+    }
+
+    /*
+     * Extract the corresponding values.  XXX this is pretty inefficient if
+     * there are many indexed columns.    Should HeapSatisfiesHOTUpdate do a
+     * single heap_deform_tuple call on each tuple, instead?  But that doesn't
+     * work for system columns ...
+     */
+    value1 = heap_getattr_with_len(tup1, attrnum, tupdesc, &isnull1, tup1_attr_len);
+    value2 = heap_getattr_with_len(tup2, attrnum, tupdesc, &isnull2, tup2_attr_len);
+
+    /*
+     * If one value is NULL and other is not, then they are certainly not
+     * equal
+     */
+    if (isnull1 != isnull2)
+        return false;
+
+    /*
+     * If both are NULL, they can be considered equal.
+     */
+    if (isnull1)
+        return true;
+
+    /*
+     * We do simple binary comparison of the two datums.  This may be overly
+     * strict because there can be multiple binary representations for the
+     * same logical value.    But we should be OK as long as there are no false
+     * positives.  Using a type-specific equality operator is messy because
+     * there could be multiple notions of equality in different operator
+     * classes; furthermore, we cannot safely invoke user-defined functions
+     * while holding exclusive buffer lock.
+     */
+    if (attrnum <= 0)
+    {
+        /* The only allowed system columns are OIDs, so do this */
+        return (DatumGetObjectId(value1) == DatumGetObjectId(value2));
+    }
+    else
+    {
+        Assert(attrnum <= tupdesc->natts);
+        att = tupdesc->attrs[attrnum - 1];
+        return datumIsEqual(value1, value2, att->attbyval, att->attlen);
+    }
+}
+
+/* ----------------
+ * heap_delta_encode
+ *        Forms an encoded data from old and new tuple with the modified columns
+ *        using an algorithm similar to LZ algorithm.
+ *
+ *        tupleDesc - Tuple descriptor.
+ *        oldtup - pointer to the old/history tuple.
+ *        newtup - pointer to the new tuple.
+ *        encdata - pointer to the encoded data using lz algorithm.
+ *
+ *        Encode the bitmap [+padding] [+oid] as a new data. And loop for all
+ *        attributes to find any modifications in the attributes.
+ *
+ *        The unmodified data is encoded as a history tag to the output and the
+ *        modifed data is encoded as new data to the output.
+ *
+ *        If the encoded output data is less than 75% of original data,
+ *        The output data is considered as encoded and proceed further.
+ * ----------------
+ */
+bool
+heap_delta_encode(TupleDesc tupleDesc, HeapTuple oldtup, HeapTuple newtup,
+                  PGLZ_Header *encdata)
+{
+    Form_pg_attribute *att = tupleDesc->attrs;
+    int            numberOfAttributes;
+    int32        new_tup_off = 0,
+                old_tup_off = 0,
+                temp_off = 0,
+                match_off = 0,
+                change_off = 0;
+    int            attnum;
+    int32        data_len,
+                old_tup_pad_len,
+                new_tup_pad_len;
+    Size        old_tup_attr_len,
+                new_tup_attr_len;
+    bool        is_attr_equals = true;
+    unsigned char *bp = (unsigned char *) encdata + sizeof(PGLZ_Header);
+    unsigned char *bstart = bp;
+    char       *dp = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
+    char       *dstart = dp;
+    char       *history;
+    unsigned char ctrl_dummy = 0;
+    unsigned char *ctrlp = &ctrl_dummy;
+    unsigned char ctrlb = 0;
+    unsigned char ctrl = 0;
+    int32        len,
+                old_tup_bitmaplen,
+                new_tup_bitmaplen,
+                new_tup_len;
+    int32        result_size;
+    int32        result_max;
+
+    /* Include the bitmap header in the lz encoded data. */
+    history = (char *) oldtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
+    old_tup_bitmaplen = oldtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits);
+    new_tup_bitmaplen = newtup->t_data->t_hoff - offsetof(HeapTupleHeaderData, t_bits);
+    new_tup_len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
+
+    /*
+     * The maximum encoded data is of 75% of total size. The max tuple size is
+     * already validated as it cannot be more than MaxHeapTupleSize.
+     */
+    result_max = (new_tup_len * 75) / 100;
+    encdata->rawsize = new_tup_len;
+
+    /*
+     * Check for output buffer is reached the result_max by advancing the
+     * buffer by the calculated aproximate length for the corresponding
+     * operation.
+     */
+    if ((bp + (2 * new_tup_bitmaplen)) - bstart >= result_max)
+        return false;
+
+    /* Copy the bitmap data from new tuple to the encoded data buffer */
+    pglz_out_add(ctrlp, ctrlb, ctrl, bp, new_tup_bitmaplen, dp);
+    dstart = dp;
+
+    numberOfAttributes = HeapTupleHeaderGetNatts(newtup->t_data);
+    for (attnum = 1; attnum <= numberOfAttributes; attnum++)
+    {
+        /*
+         * If the attribute is modified by the update operation, store the
+         * appropiate offsets in the WAL record, otherwise skip to the next
+         * attribute.
+         */
+        if (!heap_attr_get_length_and_check_equals(tupleDesc, attnum, oldtup,
+                               newtup, &old_tup_attr_len, &new_tup_attr_len))
+        {
+            is_attr_equals = false;
+            data_len = old_tup_off - match_off;
+
+            /*
+             * Check for output buffer is reached the result_max by advancing
+             * the buffer by the calculated aproximate length for the
+             * corresponding operation.
+             */
+            len = PGLZ_GET_HIST_CTRL_BIT_LEN(data_len);
+            if ((bp + len) - bstart >= result_max)
+                return false;
+
+            /*
+             * The match_off value is calculated w.r.t to the tuple t_hoff
+             * value, the bit map len needs to be added to match_off to get
+             * the actual start offfset from the old/history tuple.
+             */
+            match_off += old_tup_bitmaplen;
+
+            /*
+             * If any unchanged data presents in the old and new tuples then
+             * encode the data as it needs to copy from history tuple with len
+             * and offset.
+             */
+            pglz_out_tag(ctrlp, ctrlb, ctrl, bp, data_len, match_off, history);
+
+            /*
+             * Recalculate the old and new tuple offsets based on padding
+             * present in the tuples
+             */
+            if (!HeapTupleHasNulls(oldtup)
+                || !att_isnull((attnum - 1), oldtup->t_data->t_bits))
+            {
+                old_tup_off = att_align_pointer(old_tup_off,
+                                                att[attnum - 1]->attalign,
+                                                att[attnum - 1]->attlen,
+                                                (char *) oldtup->t_data + oldtup->t_data->t_hoff + old_tup_off);
+            }
+
+            if (!HeapTupleHasNulls(newtup)
+                || !att_isnull((attnum - 1), newtup->t_data->t_bits))
+            {
+                new_tup_off = att_align_pointer(new_tup_off,
+                                                att[attnum - 1]->attalign,
+                                                att[attnum - 1]->attlen,
+                                                (char *) newtup->t_data + newtup->t_data->t_hoff + new_tup_off);
+            }
+
+            old_tup_off += old_tup_attr_len;
+            new_tup_off += new_tup_attr_len;
+
+            match_off = old_tup_off;
+        }
+        else
+        {
+            /*
+             * Check for output buffer is reached the result_max by advancing
+             * the buffer by the calculated aproximate length for the
+             * corresponding operation.
+             */
+            data_len = new_tup_off - change_off;
+            if ((bp + (2 * data_len)) - bstart >= result_max)
+                return false;
+
+            /* Copy the modified column data to the output buffer if present */
+            pglz_out_add(ctrlp, ctrlb, ctrl, bp, data_len, dp);
+
+            /*
+             * calculate the old tuple field start position, required to
+             * ignore if any alignmet is present.
+             */
+            if (!HeapTupleHasNulls(oldtup)
+                || !att_isnull((attnum - 1), oldtup->t_data->t_bits))
+            {
+                temp_off = old_tup_off;
+                old_tup_off = att_align_pointer(old_tup_off,
+                                                att[attnum - 1]->attalign,
+                                                att[attnum - 1]->attlen,
+                                                (char *) oldtup->t_data + oldtup->t_data->t_hoff + old_tup_off);
+
+                old_tup_pad_len = old_tup_off - temp_off;
+
+                /*
+                 * calculate the new tuple field start position to check
+                 * whether any padding is required or not because field
+                 * alignment.
+                 */
+                temp_off = new_tup_off;
+                new_tup_off = att_align_pointer(new_tup_off,
+                                                att[attnum - 1]->attalign,
+                                                att[attnum - 1]->attlen,
+                                                (char *) newtup->t_data + newtup->t_data->t_hoff + new_tup_off);
+                new_tup_pad_len = new_tup_off - temp_off;
+
+                /*
+                 * Checking for that is there any alignment difference between
+                 * old and new tuple attributes.
+                 */
+                if (old_tup_pad_len != new_tup_pad_len)
+                {
+                    /*
+                     * If the alignment difference is found between old and
+                     * new tuples and the last attribute value of the new
+                     * tuple is same as old tuple then write the encode as
+                     * history data until the current match.
+                     *
+                     * If the last attribute value of new tuple is not same as
+                     * old tuple then the matched data marking as history is
+                     * already taken care.
+                     */
+                    if (is_attr_equals)
+                    {
+                        /*
+                         * Check for output buffer is reached the result_max
+                         * by advancing the buffer by the calculated
+                         * aproximate length for the corresponding operation.
+                         */
+                        data_len = old_tup_off - old_tup_pad_len - match_off;
+                        len = PGLZ_GET_HIST_CTRL_BIT_LEN(data_len);
+                        if ((bp + len) - bstart >= result_max)
+                            return false;
+
+                        match_off += old_tup_bitmaplen;
+                        pglz_out_tag(ctrlp, ctrlb, ctrl, bp, data_len, match_off, history);
+                    }
+
+                    match_off = old_tup_off;
+
+                    /* Alignment data */
+                    if ((bp + (2 * new_tup_pad_len)) - bstart >= result_max)
+                        return false;
+
+                    pglz_out_add(ctrlp, ctrlb, ctrl, bp, new_tup_pad_len, dp);
+                }
+            }
+
+            old_tup_off += old_tup_attr_len;
+            new_tup_off += new_tup_attr_len;
+
+            change_off = new_tup_off;
+
+            /*
+             * Recalculate the destination pointer with the new offset which
+             * is used while copying the modified data.
+             */
+            dp = dstart + new_tup_off;
+            is_attr_equals = true;
+        }
+    }
+
+    /* If any modified column data presents then copy it. */
+    data_len = new_tup_off - change_off;
+    if ((bp + (2 * data_len)) - bstart >= result_max)
+        return false;
+
+    pglz_out_add(ctrlp, ctrlb, ctrl, bp, data_len, dp);
+
+    /* If any left out old tuple data presents then copy it as history */
+    data_len = old_tup_off - match_off;
+    len = PGLZ_GET_HIST_CTRL_BIT_LEN(data_len);
+    if ((bp + len) - bstart >= result_max)
+        return false;
+
+    match_off += old_tup_bitmaplen;
+    pglz_out_tag(ctrlp, ctrlb, ctrl, bp, data_len, match_off, history);
+
+    /*
+     * Write out the last control byte and check that we haven't overrun the
+     * output size allowed by the strategy.
+     */
+    *ctrlp = ctrlb;
+
+    result_size = bp - bstart;
+    if (result_size >= result_max)
+        return false;
+
+    /*
+     * Success - need only fill in the actual length of the compressed datum.
+     */
+    SET_VARSIZE_COMPRESSED(encdata, result_size + sizeof(PGLZ_Header));
+    return true;
+}
+
+/* ----------------
+ * heap_delta_decode
+ *        Decodes the encoded data to dest tuple with the help of history.
+ *
+ *        encdata - Pointer to the encoded data.
+ *        oldtup - pointer to the history tuple.
+ *        newtup - pointer to the destination tuple.
+ * ----------------
+ */
+void
+heap_delta_decode(PGLZ_Header *encdata, HeapTuple oldtup, HeapTuple newtup)
+{
+    return pglz_decompress_with_history((char *) encdata,
+             (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits),
+                                        &newtup->t_len,
+            (char *) oldtup->t_data + offsetof(HeapTupleHeaderData, t_bits));
+}
+
+/* * heap_form_tuple *        construct a tuple from the given values[] and isnull[] arrays, *        which are of the
lengthindicated by tupleDescriptor->natts
 
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 186fb87..46a0d26 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -85,6 +85,7 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
TransactionIdxid, CommandId cid, int options);static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
    ItemPointerData from, Buffer newbuf, HeapTuple newtup,
 
+                HeapTuple oldtup,                bool all_visible_cleared, bool new_all_visible_cleared);static bool
HeapSatisfiesHOTUpdate(Relationrelation, Bitmapset *hot_attrs,                       HeapTuple oldtup, HeapTuple
newtup);
@@ -857,6 +858,54 @@ heapgettup_pagemode(HeapScanDesc scan, * definition in access/htup.h is maintained. */Datum
+fastgetattr_with_len(HeapTuple tup, int attnum, TupleDesc tupleDesc,
+                     bool *isnull, int32 *len)
+{
+    return (
+            (attnum) > 0 ?
+            (
+             (*(isnull) = false),
+             HeapTupleNoNulls(tup) ?
+             (
+              (tupleDesc)->attrs[(attnum) - 1]->attcacheoff >= 0 ?
+              (
+            (*(len) = att_getlength((tupleDesc)->attrs[(attnum - 1)]->attlen,
+                             (char *) (tup)->t_data + (tup)->t_data->t_hoff +
+                             (tupleDesc)->attrs[(attnum) - 1]->attcacheoff)),
+               fetchatt((tupleDesc)->attrs[(attnum) - 1],
+                        (char *) (tup)->t_data + (tup)->t_data->t_hoff +
+                        (tupleDesc)->attrs[(attnum) - 1]->attcacheoff)
+               )
+              :
+              (
+               nocachegetattr_with_len(tup), (attnum), (tupleDesc), (len))
+              )
+             :
+             (
+              att_isnull((attnum) - 1, (tup)->t_data->t_bits) ?
+              (
+               (*(isnull) = true),
+               (*(len) = 0),
+               (Datum) NULL
+               )
+              :
+              (
+               nocachegetattr_with_len((tup), (attnum), (tupleDesc), (len))
+               )
+              )
+             )
+            :
+            (
+             (Datum) NULL
+             )
+        );
+}
+
+/*
+ * This is formatted so oddly so that the correspondence to the macro
+ * definition in access/htup.h is maintained.
+ */
+Datumfastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc,            bool *isnull){
@@ -873,7 +922,8 @@ fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc,
(tupleDesc)->attrs[(attnum)- 1]->attcacheoff)               )              :
 
-              nocachegetattr((tup), (attnum), (tupleDesc))
+              (
+               nocachegetattr(tup), (attnum), (tupleDesc))              )             :             (
@@ -2400,7 +2450,7 @@ simple_heap_insert(Relation relation, HeapTuple tup)HTSU_Resultheap_delete(Relation relation,
ItemPointertid,            CommandId cid, Snapshot crosscheck, bool wait,
 
-            HeapUpdateFailureData *hufd)
+            HeapUpdateFailureData * hufd){    HTSU_Result result;    TransactionId xid = GetCurrentTransactionId();
@@ -2702,7 +2752,7 @@ simple_heap_delete(Relation relation, ItemPointer tid)    result = heap_delete(relation, tid,
                   GetCurrentCommandId(true), InvalidSnapshot,
 
-                         true /* wait for commit */,
+                         true /* wait for commit */ ,                         &hufd);    switch (result)    {
@@ -2759,7 +2809,7 @@ simple_heap_delete(Relation relation, ItemPointer tid)HTSU_Resultheap_update(Relation relation,
ItemPointerotid, HeapTuple newtup,            CommandId cid, Snapshot crosscheck, bool wait,
 
-            HeapUpdateFailureData *hufd)
+            HeapUpdateFailureData * hufd){    HTSU_Result result;    TransactionId xid = GetCurrentTransactionId();
@@ -3229,10 +3279,12 @@ l2:    /* XLOG stuff */    if (RelationNeedsWAL(relation))    {
-        XLogRecPtr    recptr = log_heap_update(relation, buffer, oldtup.t_self,
-                                             newbuf, heaptup,
-                                             all_visible_cleared,
-                                             all_visible_cleared_new);
+        XLogRecPtr    recptr;
+
+        recptr = log_heap_update(relation, buffer, oldtup.t_self,
+                                 newbuf, heaptup, &oldtup,
+                                 all_visible_cleared,
+                                 all_visible_cleared_new);        if (newbuf != buffer)        {
@@ -3299,74 +3351,11 @@ static boolheap_tuple_attr_equals(TupleDesc tupdesc, int attrnum,
HeapTupletup1, HeapTuple tup2){
 
-    Datum        value1,
-                value2;
-    bool        isnull1,
-                isnull2;
-    Form_pg_attribute att;
+    Size        tup1_attr_len,
+                tup2_attr_len;
-    /*
-     * If it's a whole-tuple reference, say "not equal".  It's not really
-     * worth supporting this case, since it could only succeed after a no-op
-     * update, which is hardly a case worth optimizing for.
-     */
-    if (attrnum == 0)
-        return false;
-
-    /*
-     * Likewise, automatically say "not equal" for any system attribute other
-     * than OID and tableOID; we cannot expect these to be consistent in a HOT
-     * chain, or even to be set correctly yet in the new tuple.
-     */
-    if (attrnum < 0)
-    {
-        if (attrnum != ObjectIdAttributeNumber &&
-            attrnum != TableOidAttributeNumber)
-            return false;
-    }
-
-    /*
-     * Extract the corresponding values.  XXX this is pretty inefficient if
-     * there are many indexed columns.    Should HeapSatisfiesHOTUpdate do a
-     * single heap_deform_tuple call on each tuple, instead?  But that doesn't
-     * work for system columns ...
-     */
-    value1 = heap_getattr(tup1, attrnum, tupdesc, &isnull1);
-    value2 = heap_getattr(tup2, attrnum, tupdesc, &isnull2);
-
-    /*
-     * If one value is NULL and other is not, then they are certainly not
-     * equal
-     */
-    if (isnull1 != isnull2)
-        return false;
-
-    /*
-     * If both are NULL, they can be considered equal.
-     */
-    if (isnull1)
-        return true;
-
-    /*
-     * We do simple binary comparison of the two datums.  This may be overly
-     * strict because there can be multiple binary representations for the
-     * same logical value.    But we should be OK as long as there are no false
-     * positives.  Using a type-specific equality operator is messy because
-     * there could be multiple notions of equality in different operator
-     * classes; furthermore, we cannot safely invoke user-defined functions
-     * while holding exclusive buffer lock.
-     */
-    if (attrnum <= 0)
-    {
-        /* The only allowed system columns are OIDs, so do this */
-        return (DatumGetObjectId(value1) == DatumGetObjectId(value2));
-    }
-    else
-    {
-        Assert(attrnum <= tupdesc->natts);
-        att = tupdesc->attrs[attrnum - 1];
-        return datumIsEqual(value1, value2, att->attbyval, att->attlen);
-    }
+    return heap_attr_get_length_and_check_equals(tupdesc, attrnum, tup1, tup2,
+                                             &tup1_attr_len, &tup2_attr_len);}/*
@@ -3417,7 +3406,7 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)    result =
heap_update(relation,otid, tup,                         GetCurrentCommandId(true), InvalidSnapshot,
 
-                         true /* wait for commit */,
+                         true /* wait for commit */ ,                         &hufd);    switch (result)    {
@@ -3504,7 +3493,7 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple
tup)HTSU_Resultheap_lock_tuple(Relationrelation, HeapTuple tuple,                CommandId cid, LockTupleMode mode,
boolnowait,
 
-                Buffer *buffer, HeapUpdateFailureData *hufd)
+                Buffer *buffer, HeapUpdateFailureData * hufd){    HTSU_Result result;    ItemPointer tid =
&(tuple->t_self);
@@ -4464,7 +4453,7 @@ log_heap_visible(RelFileNode rnode, BlockNumber block, Buffer vm_buffer, */static
XLogRecPtrlog_heap_update(Relationreln, Buffer oldbuf, ItemPointerData from,
 
-                Buffer newbuf, HeapTuple newtup,
+                Buffer newbuf, HeapTuple newtup, HeapTuple oldtup,                bool all_visible_cleared, bool
new_all_visible_cleared){   xl_heap_update xlrec;
 
@@ -4473,6 +4462,17 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,    XLogRecPtr    recptr;
XLogRecDatardata[4];    Page        page = BufferGetPage(newbuf);
 
+    char       *newtupdata;
+    int            newtuplen;
+    int            oldtuplen;
+    bool        compressed = false;
+
+    /* Structure which holds max output possible from the LZ algorithm */
+    struct
+    {
+        PGLZ_Header pglzheader;
+        char        buf[MaxHeapTupleSize];
+    }            buf;    /* Caller should not call me on a non-WAL-logged relation */
Assert(RelationNeedsWAL(reln));
@@ -4482,11 +4482,41 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,    else        info =
XLOG_HEAP_UPDATE;
+    newtupdata = ((char *) newtup->t_data) + offsetof(HeapTupleHeaderData, t_bits);
+    newtuplen = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
+    oldtuplen = oldtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
+
+    /* Is the update is going to the same page? */
+    if (oldbuf == newbuf)
+    {
+        /*
+         * LZ algorithm can hold only history offset in the range of 1 - 4095.
+         * so the delta encode is restricted for the tuples with length more
+         * than PGLZ_HISTORY_SIZE.
+         */
+        if (oldtuplen < PGLZ_HISTORY_SIZE)
+        {
+            /* Delta-encode the new tuple using the old tuple */
+            if (heap_delta_encode(reln->rd_att, oldtup, newtup,
+                                  &buf.pglzheader))
+            {
+                compressed = true;
+                newtupdata = (char *) &buf.pglzheader;
+                newtuplen = VARSIZE(&buf.pglzheader);
+            }
+        }
+    }
+
+    xlrec.flags = 0;    xlrec.target.node = reln->rd_node;    xlrec.target.tid = from;
-    xlrec.all_visible_cleared = all_visible_cleared;
+    if (all_visible_cleared)
+        xlrec.flags |= XL_HEAP_UPDATE_ALL_VISIBLE_CLEARED;    xlrec.newtid = newtup->t_self;
-    xlrec.new_all_visible_cleared = new_all_visible_cleared;
+    if (new_all_visible_cleared)
+        xlrec.flags |= XL_HEAP_UPDATE_NEW_ALL_VISIBLE_CLEARED;
+    if (compressed)
+        xlrec.flags |= XL_HEAP_UPDATE_DELTA_ENCODED;    rdata[0].data = (char *) &xlrec;    rdata[0].len =
SizeOfHeapUpdate;
@@ -4513,9 +4543,12 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,    rdata[2].buffer_std =
true;   rdata[2].next = &(rdata[3]);
 
-    /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
-    rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, t_bits);
-    rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
+    /*
+     * PG73FORMAT: write bitmap [+ padding] [+ oid] + data follows .........
+     * OR PG93FORMAT [If encoded]: LZ header + Encoded data follows
+     */
+    rdata[3].data = newtupdata;
+    rdata[3].len = newtuplen;    rdata[3].buffer = newbuf;    rdata[3].buffer_std = true;    rdata[3].next = NULL;
@@ -5291,7 +5324,10 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)    Page        page;
OffsetNumberoffnum;    ItemId        lp = NULL;
 
+    HeapTupleData newtup;
+    HeapTupleData oldtup;    HeapTupleHeader htup;
+    HeapTupleHeader oldtupdata = NULL;    struct    {        HeapTupleHeaderData hdr;
@@ -5306,7 +5342,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)     * The visibility map
mayneed to be fixed even if the heap page is     * already up-to-date.     */
 
-    if (xlrec->all_visible_cleared)
+    if (xlrec->flags & XL_HEAP_UPDATE_ALL_VISIBLE_CLEARED)    {        Relation    reln =
CreateFakeRelcacheEntry(xlrec->target.node);       BlockNumber block = ItemPointerGetBlockNumber(&xlrec->target.tid);
 
@@ -5366,7 +5402,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)    if
(PageGetMaxOffsetNumber(page)< offnum || !ItemIdIsNormal(lp))        elog(PANIC, "heap_update_redo: invalid lp");
 
-    htup = (HeapTupleHeader) PageGetItem(page, lp);
+    oldtupdata = htup = (HeapTupleHeader) PageGetItem(page, lp);    htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
                 HEAP_XMAX_INVALID |
 
@@ -5385,7 +5421,7 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)    /* Mark the page as a
candidatefor pruning */    PageSetPrunable(page, record->xl_xid);
 
-    if (xlrec->all_visible_cleared)
+    if (xlrec->flags & XL_HEAP_UPDATE_ALL_VISIBLE_CLEARED)        PageClearAllVisible(page);    /*
@@ -5410,7 +5446,7 @@ newt:;     * The visibility map may need to be fixed even if the heap page is     * already
up-to-date.    */
 
-    if (xlrec->new_all_visible_cleared)
+    if (xlrec->flags & XL_HEAP_UPDATE_NEW_ALL_VISIBLE_CLEARED)    {        Relation    reln =
CreateFakeRelcacheEntry(xlrec->target.node);       BlockNumber block = ItemPointerGetBlockNumber(&xlrec->newtid);
 
@@ -5473,10 +5509,29 @@ newsame:;           SizeOfHeapHeader);    htup = &tbuf.hdr;    MemSet((char *) htup, 0,
sizeof(HeapTupleHeaderData));
-    /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
-    memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
-           (char *) xlrec + hsize,
-           newlen);
+
+    /*
+     * If the new tuple was delta-encoded, decode it.
+     */
+    if (xlrec->flags & XL_HEAP_UPDATE_DELTA_ENCODED)
+    {
+        /* PG93FORMAT: LZ header + Encoded data */
+        PGLZ_Header *encoded_data = (PGLZ_Header *) (((char *) xlrec) + hsize);
+
+        oldtup.t_data = oldtupdata;
+        newtup.t_data = htup;
+
+        heap_delta_decode(encoded_data, &oldtup, &newtup);
+        newlen = newtup.t_len;
+    }
+    else
+    {
+        /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
+        memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
+               (char *) xlrec + hsize,
+               newlen);
+    }
+    newlen += offsetof(HeapTupleHeaderData, t_bits);    htup->t_infomask2 = xlhdr.t_infomask2;    htup->t_infomask =
xlhdr.t_infomask;
@@ -5491,7 +5546,7 @@ newsame:;    if (offnum == InvalidOffsetNumber)        elog(PANIC, "heap_update_redo: failed to
addtuple");
 
-    if (xlrec->new_all_visible_cleared)
+    if (xlrec->flags & XL_HEAP_UPDATE_NEW_ALL_VISIBLE_CLEARED)        PageClearAllVisible(page);    freespace =
PageGetHeapFreeSpace(page);       /* needed to update FSM below */
 
diff --git a/src/backend/utils/adt/pg_lzcompress.c b/src/backend/utils/adt/pg_lzcompress.c
index 466982e..d836b51 100644
--- a/src/backend/utils/adt/pg_lzcompress.c
+++ b/src/backend/utils/adt/pg_lzcompress.c
@@ -182,9 +182,6 @@ */#define PGLZ_HISTORY_LISTS        8192    /* must be power of 2 */#define PGLZ_HISTORY_MASK
(PGLZ_HISTORY_LISTS - 1)
 
-#define PGLZ_HISTORY_SIZE        4096
-#define PGLZ_MAX_MATCH            273
-/* ---------- * PGLZ_HistEntry -
@@ -302,67 +299,6 @@ do {                                    \            }
                  \} while (0)
 
-
-/* ----------
- * pglz_out_ctrl -
- *
- *        Outputs the last and allocates a new control byte if needed.
- * ----------
- */
-#define pglz_out_ctrl(__ctrlp,__ctrlb,__ctrl,__buf) \
-do { \
-    if ((__ctrl & 0xff) == 0)                                                \
-    {                                                                        \
-        *(__ctrlp) = __ctrlb;                                                \
-        __ctrlp = (__buf)++;                                                \
-        __ctrlb = 0;                                                        \
-        __ctrl = 1;                                                            \
-    }                                                                        \
-} while (0)
-
-
-/* ----------
- * pglz_out_literal -
- *
- *        Outputs a literal byte to the destination buffer including the
- *        appropriate control bit.
- * ----------
- */
-#define pglz_out_literal(_ctrlp,_ctrlb,_ctrl,_buf,_byte) \
-do { \
-    pglz_out_ctrl(_ctrlp,_ctrlb,_ctrl,_buf);                                \
-    *(_buf)++ = (unsigned char)(_byte);                                        \
-    _ctrl <<= 1;                                                            \
-} while (0)
-
-
-/* ----------
- * pglz_out_tag -
- *
- *        Outputs a backward reference tag of 2-4 bytes (depending on
- *        offset and length) to the destination buffer including the
- *        appropriate control bit.
- * ----------
- */
-#define pglz_out_tag(_ctrlp,_ctrlb,_ctrl,_buf,_len,_off) \
-do { \
-    pglz_out_ctrl(_ctrlp,_ctrlb,_ctrl,_buf);                                \
-    _ctrlb |= _ctrl;                                                        \
-    _ctrl <<= 1;                                                            \
-    if (_len > 17)                                                            \
-    {                                                                        \
-        (_buf)[0] = (unsigned char)((((_off) & 0xf00) >> 4) | 0x0f);        \
-        (_buf)[1] = (unsigned char)(((_off) & 0xff));                        \
-        (_buf)[2] = (unsigned char)((_len) - 18);                            \
-        (_buf) += 3;                                                        \
-    } else {                                                                \
-        (_buf)[0] = (unsigned char)((((_off) & 0xf00) >> 4) | ((_len) - 3)); \
-        (_buf)[1] = (unsigned char)((_off) & 0xff);                            \
-        (_buf) += 2;                                                        \
-    }                                                                        \
-} while (0)
-
-/* ---------- * pglz_find_match - *
@@ -595,7 +531,7 @@ pglz_compress(const char *source, int32 slen, PGLZ_Header *dest,             * Create the tag and
addhistory entries for all matched             * characters.             */
 
-            pglz_out_tag(ctrlp, ctrlb, ctrl, bp, match_len, match_off);
+            pglz_out_tag(ctrlp, ctrlb, ctrl, bp, match_len, match_off, dp);            while (match_len--)
{               pglz_hist_add(hist_start, hist_entries,
 
@@ -647,15 +583,38 @@ pglz_compress(const char *source, int32 slen, PGLZ_Header *dest,voidpglz_decompress(const
PGLZ_Header*source, char *dest){
 
+    pglz_decompress_with_history((char *) source, dest, NULL, NULL);
+}
+
+/* ----------
+ * pglz_decompress_with_history -
+ *
+ *        Decompresses source into dest.
+ *        To decompress, it uses history if provided.
+ * ----------
+ */
+void
+pglz_decompress_with_history(const char *source, char *dest, uint32 *destlen,
+                             const char *history)
+{
+    PGLZ_Header src;    const unsigned char *sp;    const unsigned char *srcend;    unsigned char *dp;    unsigned
char*destend;
 
+    /* To avoid the unaligned access of PGLZ_Header */
+    memcpy((char *) &src, source, sizeof(PGLZ_Header));
+    sp = ((const unsigned char *) source) + sizeof(PGLZ_Header);
-    srcend = ((const unsigned char *) source) + VARSIZE(source);
+    srcend = ((const unsigned char *) source) + VARSIZE(&src);    dp = (unsigned char *) dest;
-    destend = dp + source->rawsize;
+    destend = dp + src.rawsize;
+
+    if (destlen)
+    {
+        *destlen = src.rawsize;
+    }    while (sp < srcend && dp < destend)    {
@@ -699,28 +658,76 @@ pglz_decompress(const PGLZ_Header *source, char *dest)                    break;
}
-                /*
-                 * Now we copy the bytes specified by the tag from OUTPUT to
-                 * OUTPUT. It is dangerous and platform dependent to use
-                 * memcpy() here, because the copied areas could overlap
-                 * extremely!
-                 */
-                while (len--)
+                if (history)
+                {
+                    /*
+                     * Now we copy the bytes specified by the tag from history to
+                     * OUTPUT.
+                     */
+                    memcpy(dp, history + off, len);
+                    dp += len;
+                }
+                else                {
-                    *dp = dp[-off];
-                    dp++;
+                    /*
+                     * Now we copy the bytes specified by the tag from OUTPUT to
+                     * OUTPUT. It is dangerous and platform dependent to use
+                     * memcpy() here, because the copied areas could overlap
+                     * extremely!
+                     */
+                    while (len--)
+                    {
+                        *dp = dp[-off];
+                        dp++;
+                    }                }            }            else            {
-                /*
-                 * An unset control bit means LITERAL BYTE. So we just copy
-                 * one from INPUT to OUTPUT.
-                 */
-                if (dp >= destend)        /* check for buffer overrun */
-                    break;        /* do not clobber memory */
-
-                *dp++ = *sp++;
+                if (history)
+                {
+                    /*
+                     * The byte at current offset in the source is the length
+                     * of this literal segment. See pglz_out_add for encoding
+                     * side.
+                     */
+                    int32        len;
+
+                    len = sp[0];
+                    sp += 1;
+
+                    /*
+                     * Check for output buffer overrun, to ensure we don't clobber
+                     * memory in case of corrupt input.  Note: we must advance dp
+                     * here to ensure the error is detected below the loop.  We
+                     * don't simply put the elog inside the loop since that will
+                     * probably interfere with optimization.
+                     */
+                    if (dp + len > destend)
+                    {
+                        dp += len;
+                        break;
+                    }
+
+                    /*
+                     * Now we copy the bytes specified by the tag from Source to
+                     * OUTPUT.
+                     */
+                    memcpy(dp, sp, len);
+                    dp += len;
+                    sp += len;
+                }
+                else
+                {
+                    /*
+                     * An unset control bit means LITERAL BYTE. So we just copy
+                     * one from INPUT to OUTPUT.
+                     */
+                    if (dp >= destend)        /* check for buffer overrun */
+                        break;        /* do not clobber memory */
+
+                    *dp++ = *sp++;
+                }            }            /*
diff --git a/src/include/access/heapam_xlog.h b/src/include/access/heapam_xlog.h
index 8ec710e..3e4001f 100644
--- a/src/include/access/heapam_xlog.h
+++ b/src/include/access/heapam_xlog.h
@@ -142,12 +142,20 @@ typedef struct xl_heap_update{    xl_heaptid    target;            /* deleted tuple id */
ItemPointerDatanewtid;        /* new inserted tuple id */
 
-    bool        all_visible_cleared;    /* PD_ALL_VISIBLE was cleared */
-    bool        new_all_visible_cleared;        /* same for the page of newtid */
+    char        flags;            /* flag bits, see below */
+    /* NEW TUPLE xl_heap_header AND TUPLE DATA FOLLOWS AT END OF STRUCT */} xl_heap_update;
-#define SizeOfHeapUpdate    (offsetof(xl_heap_update, new_all_visible_cleared) + sizeof(bool))
+
+#define XL_HEAP_UPDATE_ALL_VISIBLE_CLEARED        0x01 /* Indicates as old page's
+                                                all visible bit is cleared */
+#define XL_HEAP_UPDATE_NEW_ALL_VISIBLE_CLEARED    0x02 /* Indicates as new page's
+                                                all visible bit is cleared */
+#define XL_HEAP_UPDATE_DELTA_ENCODED            0x04 /* Indicates as the update
+                                                operation is delta encoded */
+
+#define SizeOfHeapUpdate    (offsetof(xl_heap_update, flags) + sizeof(char))/* * This is what we need to know about
vacuumpage cleanup/redirect
 
diff --git a/src/include/access/htup_details.h b/src/include/access/htup_details.h
index 7abe3e6..4419fc4 100644
--- a/src/include/access/htup_details.h
+++ b/src/include/access/htup_details.h
@@ -18,6 +18,7 @@#include "access/tupdesc.h"#include "access/tupmacs.h"#include "storage/bufpage.h"
+#include "utils/pg_lzcompress.h"/* * MaxTupleAttributeNumber limits the number of (user) columns in a tuple.
@@ -528,6 +529,7 @@ struct MinimalTupleData        HeapTupleHeaderSetOid((tuple)->t_data, (oid))
+#if !defined(DISABLE_COMPLEX_MACRO)/* ---------------- *        fastgetattr *
@@ -542,9 +544,6 @@ struct MinimalTupleData *        lookups, and call nocachegetattr() for the rest. *
----------------*/
 
-
-#if !defined(DISABLE_COMPLEX_MACRO)
-#define fastgetattr(tup, attnum, tupleDesc, isnull)                    \(
                     \    AssertMacro((attnum) > 0),                                        \
 
@@ -572,14 +571,56 @@ struct MinimalTupleData            nocachegetattr((tup), (attnum), (tupleDesc))            \
 )                                                            \    )
           \
 
+)                                                                    \
+
+/* ----------------
+ *        fastgetattr_with_len
+ *
+ *        Similar to fastgetattr and fetches the length of the given attribute
+ *        also.
+ * ----------------
+ */
+#define fastgetattr_with_len(tup, attnum, tupleDesc, isnull, len)    \
+(                                                                    \
+    AssertMacro((attnum) > 0),                                        \
+    (*(isnull) = false),                                            \
+    HeapTupleNoNulls(tup) ?                                         \
+    (                                                                \
+        (tupleDesc)->attrs[(attnum)-1]->attcacheoff >= 0 ?            \
+        (                                                            \
+            (*(len) = att_getlength(                                \
+                    (tupleDesc)->attrs[(attnum)-1]->attlen,         \
+                    (char *) (tup)->t_data + (tup)->t_data->t_hoff +\
+                    (tupleDesc)->attrs[(attnum)-1]->attcacheoff)),    \
+            fetchatt((tupleDesc)->attrs[(attnum)-1],                \
+                (char *) (tup)->t_data + (tup)->t_data->t_hoff +    \
+                    (tupleDesc)->attrs[(attnum)-1]->attcacheoff)    \
+        )                                                            \
+        :                                                            \
+            nocachegetattr_with_len((tup), (attnum), (tupleDesc), (len))\
+    )                                                                \
+    :                                                                \
+    (                                                                \
+        att_isnull((attnum)-1, (tup)->t_data->t_bits) ?             \
+        (                                                            \
+            (*(isnull) = true),                                     \
+            (*(len) = 0),                                            \
+            (Datum)NULL                                             \
+        )                                                            \
+        :                                                            \
+        (                                                            \
+            nocachegetattr_with_len((tup), (attnum), (tupleDesc), (len))\
+        )                                                            \
+    )                                                                \)
-#else                            /* defined(DISABLE_COMPLEX_MACRO) */
+#else                            /* defined(DISABLE_COMPLEX_MACRO) */extern Datum fastgetattr(HeapTuple tup, int
attnum,TupleDesc tupleDesc,            bool *isnull);
 
+extern Datum fastgetattr_with_len(HeapTuple tup, int attnum,
+            TupleDesc tupleDesc, bool *isnull, int32 *len);#endif   /* defined(DISABLE_COMPLEX_MACRO) */
-/* ---------------- *        heap_getattr *
@@ -596,21 +637,43 @@ extern Datum fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, * ----------------
*/#defineheap_getattr(tup, attnum, tupleDesc, isnull) \
 
+( \
+    ((attnum) > 0) ? \    ( \
-        ((attnum) > 0) ? \
+        ((attnum) > (int) HeapTupleHeaderGetNatts((tup)->t_data)) ? \        ( \
-            ((attnum) > (int) HeapTupleHeaderGetNatts((tup)->t_data)) ? \
-            ( \
-                (*(isnull) = true), \
-                (Datum)NULL \
-            ) \
-            : \
-                fastgetattr((tup), (attnum), (tupleDesc), (isnull)) \
+            (*(isnull) = true), \
+            (Datum)NULL \        ) \        : \
-            heap_getsysattr((tup), (attnum), (tupleDesc), (isnull)) \
-    )
+            fastgetattr((tup), (attnum), (tupleDesc), (isnull)) \
+    ) \
+    : \
+        heap_getsysattr((tup), (attnum), (tupleDesc), (isnull)) \
+)
+/* ----------------
+ *        heap_getattr_with_len
+ *
+ *        Similar to heap_getattr and outputs the length of the given attribute.
+ * ----------------
+ */
+#define heap_getattr_with_len(tup, attnum, tupleDesc, isnull, len) \
+( \
+    ((attnum) > 0) ? \
+    ( \
+        ((attnum) > (int) HeapTupleHeaderGetNatts((tup)->t_data)) ? \
+        ( \
+            (*(isnull) = true), \
+            (*(len) = 0), \
+            (Datum)NULL \
+        ) \
+        : \
+            fastgetattr_with_len((tup), (attnum), (tupleDesc), (isnull), (len)) \
+    ) \
+    : \
+        heap_getsysattr((tup), (attnum), (tupleDesc), (isnull)) \
+)/* prototypes for functions in common/heaptuple.c */extern Size heap_compute_data_size(TupleDesc tupleDesc,
@@ -620,6 +683,8 @@ extern void heap_fill_tuple(TupleDesc tupleDesc,                char *data, Size data_size,
      uint16 *infomask, bits8 *bit);extern bool heap_attisnull(HeapTuple tup, int attnum);
 
+extern Datum nocachegetattr_with_len(HeapTuple tup, int attnum,
+               TupleDesc att, Size *len);extern Datum nocachegetattr(HeapTuple tup, int attnum,
TupleDescatt);extern Datum heap_getsysattr(HeapTuple tup, int attnum, TupleDesc tupleDesc,
 
@@ -636,6 +701,14 @@ extern HeapTuple heap_modify_tuple(HeapTuple tuple,extern void heap_deform_tuple(HeapTuple tuple,
TupleDesctupleDesc,                  Datum *values, bool *isnull);
 
+extern bool heap_attr_get_length_and_check_equals(TupleDesc tupdesc,
+                        int attrnum, HeapTuple tup1, HeapTuple tup2,
+                        Size *tup1_attr_len, Size *tup2_attr_len);
+extern bool heap_delta_encode(TupleDesc tupleDesc, HeapTuple oldtup,
+                HeapTuple newtup, PGLZ_Header *encdata);
+extern void heap_delta_decode (PGLZ_Header *encdata, HeapTuple oldtup,
+                HeapTuple newtup);
+/* these three are deprecated versions of the three above: */extern HeapTuple heap_formtuple(TupleDesc
tupleDescriptor,              Datum *values, char *nulls);
 
diff --git a/src/include/access/tupmacs.h b/src/include/access/tupmacs.h
index 984a049..c1a27f7 100644
--- a/src/include/access/tupmacs.h
+++ b/src/include/access/tupmacs.h
@@ -187,6 +187,28 @@)/*
+ * att_getlength -
+ *            Gets the length of the attribute.
+ */
+#define att_getlength(attlen, attptr) \
+( \
+    ((attlen) > 0) ? \
+    ( \
+        (attlen) \
+    ) \
+    : (((attlen) == -1) ? \
+    ( \
+        VARSIZE_ANY(attptr) \
+    ) \
+    : \
+    ( \
+        AssertMacro((attlen) == -2), \
+        (strlen((char *) (attptr)) + 1) \
+    )) \
+)
+
+
+/* * store_att_byval is a partial inverse of fetch_att: store a given Datum * value into a tuple data area at the
specifiedaddress.  However, it only * handles the byval case, because in typical usage the caller needs to
 
diff --git a/src/include/utils/pg_lzcompress.h b/src/include/utils/pg_lzcompress.h
index 4af24a3..7b9d588 100644
--- a/src/include/utils/pg_lzcompress.h
+++ b/src/include/utils/pg_lzcompress.h
@@ -23,6 +23,8 @@ typedef struct PGLZ_Header    int32        rawsize;} PGLZ_Header;
+#define PGLZ_HISTORY_SIZE        4096
+#define PGLZ_MAX_MATCH            273/* ---------- * PGLZ_MAX_OUTPUT -
@@ -86,6 +88,119 @@ typedef struct PGLZ_Strategy    int32        match_size_drop;} PGLZ_Strategy;
+/*
+ * calculate the approximate length required for history encode tag for the
+ * given length
+ */
+#define PGLZ_GET_HIST_CTRL_BIT_LEN(_len) \
+( \
+    ((_len) < 17) ?    (3) : (4 * (1 + ((_len) / PGLZ_MAX_MATCH)))    \
+)
+
+/* ----------
+ * pglz_out_ctrl -
+ *
+ *        Outputs the last and allocates a new control byte if needed.
+ * ----------
+ */
+#define pglz_out_ctrl(__ctrlp,__ctrlb,__ctrl,__buf) \
+do { \
+    if ((__ctrl & 0xff) == 0)                                                \
+    {                                                                        \
+        *(__ctrlp) = __ctrlb;                                                \
+        __ctrlp = (__buf)++;                                                \
+        __ctrlb = 0;                                                        \
+        __ctrl = 1;                                                         \
+    }                                                                        \
+} while (0)
+
+/* ----------
+ * pglz_out_literal -
+ *
+ *        Outputs a literal byte to the destination buffer including the
+ *        appropriate control bit.
+ * ----------
+ */
+#define pglz_out_literal(_ctrlp,_ctrlb,_ctrl,_buf,_byte) \
+do { \
+    pglz_out_ctrl(_ctrlp,_ctrlb,_ctrl,_buf);                                \
+    *(_buf)++ = (unsigned char)(_byte);                                     \
+    _ctrl <<= 1;                                                            \
+} while (0)
+
+/* ----------
+ * pglz_out_tag -
+ *
+ *        Outputs a backward/history reference tag of 2-4 bytes (depending on
+ *        offset and length) to the destination buffer including the
+ *        appropriate control bit.
+ *
+ *        Split the process of backward/history reference as different chunks,
+ *        if the given lenght is more than max match and repeats the process
+ *        until the given length is processed.
+ *
+ *        If the matched history length is less than 3 bytes then add it as a
+ *        new data only during encoding instead of history reference. This occurs
+ *        only while framing delta record for wal update operation.
+ * ----------
+ */
+#define pglz_out_tag(_ctrlp,_ctrlb,_ctrl,_buf,_len,_off,_byte) \
+do { \
+    int _mlen;                                                                    \
+    int _total_len = (_len);                                                    \
+    while (_total_len > 0)                                                        \
+    {                                                                            \
+        _mlen = _total_len > PGLZ_MAX_MATCH ? PGLZ_MAX_MATCH : _total_len;        \
+        if (_mlen < 3)                                                            \
+        {                                                                        \
+            (_byte) = (char *)(_byte) + (_off);                                    \
+            pglz_out_add(_ctrlp,_ctrlb,_ctrl,_buf,_mlen,(_byte));                \
+            break;                                                                \
+        }                                                                        \
+        pglz_out_ctrl(_ctrlp,_ctrlb,_ctrl,_buf);                                \
+        _ctrlb |= _ctrl;                                                        \
+        _ctrl <<= 1;                                                            \
+        if (_mlen > 17)                                                            \
+        {                                                                        \
+            (_buf)[0] = (unsigned char)((((_off) & 0xf00) >> 4) | 0x0f);        \
+            (_buf)[1] = (unsigned char)(((_off) & 0xff));                        \
+            (_buf)[2] = (unsigned char)((_mlen) - 18);                            \
+            (_buf) += 3;                                                        \
+        } else {                                                                \
+            (_buf)[0] = (unsigned char)((((_off) & 0xf00) >> 4) | ((_mlen) - 3)); \
+            (_buf)[1] = (unsigned char)((_off) & 0xff);                            \
+            (_buf) += 2;                                                        \
+        }                                                                        \
+        _total_len -= _mlen;                                                    \
+        (_off) += _mlen;                                                        \
+    }                                                                            \
+} while (0)
+
+/* ----------
+ * pglz_out_add -
+ *
+ *        Outputs a reference tag of 1 byte with length and the new data
+ *        to the destination buffer, including the appropriate control bit.
+ * ----------
+ */
+#define pglz_out_add(_ctrlp,_ctrlb,_ctrl,_buf,_len,_byte) \
+do { \
+    int32 _mlen;                                                                \
+    int32 _total_len = (_len);                                                            \
+    while (_total_len > 0)                                                        \
+    {                                                                        \
+        _mlen = _total_len > 255 ? 255 : _total_len;                                \
+        pglz_out_ctrl(_ctrlp,_ctrlb,_ctrl,_buf);                            \
+        _ctrl <<= 1;                                                        \
+        (_buf)[0] = (unsigned char)(_mlen);                                    \
+        (_buf) += 1;                                                        \
+        memcpy((_buf), (_byte), _mlen);                                        \
+        (_buf) += _mlen;                                                    \
+        (_byte) += _mlen;                                                    \
+        _total_len -= _mlen;                                                    \
+    }                                                                        \
+} while (0)
+/* ---------- * The standard strategies
@@ -108,5 +223,6 @@ extern const PGLZ_Strategy *const PGLZ_strategy_always;extern bool pglz_compress(const char
*source,int32 slen, PGLZ_Header *dest,              const PGLZ_Strategy *strategy);extern void pglz_decompress(const
PGLZ_Header*source, char *dest);
 
-
+extern void pglz_decompress_with_history(const char *source, char *dest,
+                uint32 *destlen, const char *history);#endif   /* _PG_LZCOMPRESS_H_ */

В списке pgsql-hackers по дате отправления:

Предыдущее
От: Peter Eisentraut
Дата:
Сообщение: SPI API and exceptions
Следующее
От: Peter Eisentraut
Дата:
Сообщение: Re: multiple CREATE FUNCTION AS items for PLs