Обсуждение: pipe chunks protocol

Поиск
Список
Период
Сортировка

pipe chunks protocol

От
Andrew Dunstan
Дата:
This patch implements the protocol Tom suggested for writing to the
syslogger pipe. It seems to pass my tests (basically "make installcheck"
against a server with stderr redirection turned on and log_statement set
to 'all').

The effect of this should be to prevent two problems:
. partial messages get written to the log file, which messes with
rotation, and
. messages from various backends get interleaved, causing garbled logs.

Please review ASAP. I want to get this applied soon so that a) it gets
wider testing and b) I can use it as the basis for the adapted CSV log
patch. If this is acceptable I intend to backpatch this all the way to
wherever we started using the syslogger pipe (was that 8.0?).

cheers

andrew


Re: pipe chunks protocol

От
Andrew Dunstan
Дата:
and here's the patch


Andrew Dunstan wrote:
>
> This patch implements the protocol Tom suggested for writing to the
> syslogger pipe. It seems to pass my tests (basically "make
> installcheck" against a server with stderr redirection turned on and
> log_statement set to 'all').
>
> The effect of this should be to prevent two problems:
> . partial messages get written to the log file, which messes with
> rotation, and
> . messages from various backends get interleaved, causing garbled logs.
>
> Please review ASAP. I want to get this applied soon so that a) it gets
> wider testing and b) I can use it as the basis for the adapted CSV log
> patch. If this is acceptable I intend to backpatch this all the way to
> wherever we started using the syslogger pipe (was that 8.0?).
>
> cheers
>
> andrew
>
>
Index: src/backend/postmaster/syslogger.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/postmaster/syslogger.c,v
retrieving revision 1.31
diff -c -r1.31 syslogger.c
*** src/backend/postmaster/syslogger.c    4 Jun 2007 22:21:42 -0000    1.31
--- src/backend/postmaster/syslogger.c    13 Jun 2007 15:38:07 -0000
***************
*** 42,47 ****
--- 42,48 ----
  #include "utils/guc.h"
  #include "utils/ps_status.h"
  #include "utils/timestamp.h"
+ #include "lib/stringinfo.h"

  /*
   * We really want line-buffered mode for logfile output, but Windows does
***************
*** 54,59 ****
--- 55,77 ----
  #define LBF_MODE    _IOLBF
  #endif

+ /* try not to break chunked messages into multiple reads */
+ #if PIPE_BUF > 1024
+ #define READ_SIZE PIPE_BUF
+ #else
+ #define READ_SIZE 1024
+ #endif
+
+ /*
+  * we use a buffer twice as big as a read so that if there is a fragment left
+  * after processing what is read we can save it and copy it back before the
+  * next read.
+  */
+ #define READ_BUF_SIZE 2 * READ_SIZE
+
+ /* buffer to keep any partial chunks read between calls to read()/ReadFile() */
+ static char * read_fragment[READ_SIZE];
+ static int read_fragment_len = 0;

  /*
   * GUC parameters.    Redirect_stderr cannot be changed after postmaster
***************
*** 75,89 ****
   * Private state
   */
  static pg_time_t next_rotation_time;
-
  static bool redirection_done = false;
-
  static bool pipe_eof_seen = false;
-
  static FILE *syslogFile = NULL;
-
  static char *last_file_name = NULL;

  /* These must be exported for EXEC_BACKEND case ... annoying */
  #ifndef WIN32
  int            syslogPipe[2] = {-1, -1};
--- 93,117 ----
   * Private state
   */
  static pg_time_t next_rotation_time;
  static bool redirection_done = false;
  static bool pipe_eof_seen = false;
  static FILE *syslogFile = NULL;
  static char *last_file_name = NULL;

+ /*
+  * buffers for saving partial messages from different backends. We don't expect
+  * that there will be very many outstanding at one time, so 20 seems plenty of
+  * leeway. If this array gets full we won't lose messages, but we will lose
+  * the protocol protection against them being partially written or interleaved.
+  */
+ typedef struct
+ {
+     pid_t pid;
+     StringInfoData data;
+ } save_buffer;
+ #define CHUNK_SLOTS 20
+ static save_buffer saved_chunks[CHUNK_SLOTS];
+
  /* These must be exported for EXEC_BACKEND case ... annoying */
  #ifndef WIN32
  int            syslogPipe[2] = {-1, -1};
***************
*** 117,123 ****
  static void set_next_rotation_time(void);
  static void sigHupHandler(SIGNAL_ARGS);
  static void sigUsr1Handler(SIGNAL_ARGS);
!

  /*
   * Main entry point for syslogger process
--- 145,151 ----
  static void set_next_rotation_time(void);
  static void sigHupHandler(SIGNAL_ARGS);
  static void sigUsr1Handler(SIGNAL_ARGS);
! static void write_chunk(const char * buffer, int count);

  /*
   * Main entry point for syslogger process
***************
*** 244,250 ****
          bool        time_based_rotation = false;

  #ifndef WIN32
!         char        logbuffer[1024];
          int            bytesRead;
          int            rc;
          fd_set        rfds;
--- 272,278 ----
          bool        time_based_rotation = false;

  #ifndef WIN32
!         char        logbuffer[READ_BUF_SIZE];
          int            bytesRead;
          int            rc;
          fd_set        rfds;
***************
*** 325,332 ****
          }
          else if (rc > 0 && FD_ISSET(syslogPipe[0], &rfds))
          {
              bytesRead = piperead(syslogPipe[0],
!                                  logbuffer, sizeof(logbuffer));

              if (bytesRead < 0)
              {
--- 353,363 ----
          }
          else if (rc > 0 && FD_ISSET(syslogPipe[0], &rfds))
          {
+             Assert (read_fragment_len <= READ_SIZE);
+
+             memcpy(logbuffer, read_fragment, read_fragment_len);
              bytesRead = piperead(syslogPipe[0],
!                                  logbuffer + read_fragment_len, READ_SIZE);

              if (bytesRead < 0)
              {
***************
*** 337,343 ****
              }
              else if (bytesRead > 0)
              {
!                 write_syslogger_file(logbuffer, bytesRead);
                  continue;
              }
              else
--- 368,374 ----
              }
              else if (bytesRead > 0)
              {
!                 write_syslogger_file(logbuffer, bytesRead + read_fragment_len);
                  continue;
              }
              else
***************
*** 349,354 ****
--- 380,389 ----
                   * and all backends are shut down, and we are done.
                   */
                  pipe_eof_seen = true;
+
+                 /* if there's a fragment left then force it out now */
+                 if (read_fragment_len)
+                     write_chunk(read_fragment, read_fragment_len);
              }
          }
  #else                            /* WIN32 */
***************
*** 622,631 ****
--- 657,821 ----
   * This is exported so that elog.c can call it when am_syslogger is true.
   * This allows the syslogger process to record elog messages of its own,
   * even though its stderr does not point at the syslog pipe.
+  *
+  * This routine processes the log pipe protocol which sends log messages as
+  * chunks - such chunks are detected and reassembled here.
+  * The protocol has a header that starts with two nul bytes, then has a 16 bit
+  * length, the pid of the sending process, and a flag to indicate if it is
+  * the last chunk in a message. Incomplete chunks are saved until we read some
+  * more, and non-final chunks are accumulated until we get the final chunk.
+  *
+  * All of this is to avoid 2 problems:
+  * . partial messages being written to logfiles, (messes rotation) and
+  * . messages from different backends being interleaved (messages garbled).
+  *
+  * Any non-protocol messages are written out directly. These should only come
+  * from non-PostgreSQL sources, however (e.g. third party libraries writing to
+  * stderr). This won't matter for CSV output, which will be a separate
+  * reporting channel.
   */
  void
  write_syslogger_file(const char *buffer, int count)
  {
+     char *cursor = (char *) buffer;
+     int  chunklen;
+     PipeProto p;
+
+     /* the buffer has any fragment we had saved, so reset the length */
+     read_fragment_len = 0;
+
+
+     while (count > 0)
+     {
+         /* not enough data even for a header? save it until we get more */
+         if (count < sizeof(PipeProto))
+         {
+             memcpy(read_fragment, cursor, count);
+             read_fragment_len = count;
+             return;
+         }
+         /* process protocol chunks */
+         if ( cursor[0] == '\0' && cursor[1] == '\0' )
+         {
+             memcpy(&p,cursor,sizeof(PipeProto));
+             /* save a partial chunk in the fragment buffer */
+             if (p.len + PIPE_DATA_OFFSET > count)
+             {
+                 Assert(count <= READ_SIZE);
+                 memcpy(read_fragment, cursor, count);
+                 read_fragment_len = count;
+                 return;
+             }
+             /*
+              * save a complete non-final chunk in the per-pid buffer
+              * if possible - if not just write it out.
+              */
+             else if ( p.is_last != 't')
+             {
+                 int free_slot = -1, existing_slot = -1;
+                 int i;
+                 StringInfo str;
+
+                 Assert (p.is_last == 'f');
+
+                 for (i = 0; i < CHUNK_SLOTS; i++)
+                 {
+                     if (saved_chunks[i].pid == 0 && free_slot < 0)
+                         free_slot = i;
+                     if (saved_chunks[i].pid == p.pid)
+                     {
+                         existing_slot = i;
+                         break;
+                     }
+                 }
+                 if (existing_slot > -1)
+                 {
+                     str = &(saved_chunks[existing_slot].data);
+                     appendBinaryStringInfo(str, cursor + PIPE_DATA_OFFSET,
+                                            p.len);
+                 }
+                 else if (free_slot > -1)
+                 {
+                     saved_chunks[free_slot].pid = p.pid;
+                     str = &(saved_chunks[free_slot].data);
+                     initStringInfo(str);
+                     appendBinaryStringInfo(str, cursor + PIPE_DATA_OFFSET,
+                                            p.len);
+                 }
+                 else
+                 {
+                     /*
+                      * if there is no exisiting or free slot we'll just have to
+                      * take our chances and write out a part message and hope
+                      * that it's not followed by something from another pid.
+                      */
+                     write_chunk(cursor + PIPE_DATA_OFFSET, p.len);
+                 }
+             }
+             /*
+              * add a final chunk to anything saved for that pid, and either way
+              * write the whole thing out.
+              */
+             else
+             {
+                 int existing_slot = -1;
+                 int i;
+                 for (i = 0; i < CHUNK_SLOTS; i++)
+                 {
+                     if (saved_chunks[i].pid == p.pid)
+                     {
+                         existing_slot = i;
+                         break;
+                     }
+                 }
+                 if (existing_slot > -1)
+                 {
+                     appendBinaryStringInfo(&(saved_chunks[existing_slot].data),
+                                            cursor + PIPE_DATA_OFFSET, p.len);
+                     write_chunk(saved_chunks[existing_slot].data.data,
+                                 saved_chunks[existing_slot].data.len);
+                     saved_chunks[existing_slot].pid = 0;
+                     pfree(saved_chunks[existing_slot].data.data);
+                 }
+                 else
+                 {
+                     /* the whole message was one chunk, probably. */
+                     write_chunk(cursor + PIPE_DATA_OFFSET, p.len);
+                 }
+             }
+
+             count -= PIPE_DATA_OFFSET + p.len;
+             cursor += PIPE_DATA_OFFSET + p.len;
+         }
+         /* process non-protocol chunks */
+         else
+         {
+             /* look for the start of a protocol header */
+             for(chunklen = 1; chunklen + 1 < count; chunklen++)
+             {
+                 if (cursor[chunklen] == '\0' && cursor[chunklen + 1] == '\0')
+                 {
+                     write_chunk(cursor, chunklen);
+                     cursor += chunklen;
+                     count -= chunklen;
+                     break;
+                 }
+             }
+             /* if no protocol header, write out the whole remaining buffer */
+             if (chunklen + 1 >= count)
+             {
+                 write_chunk(cursor, count);
+                 read_fragment_len = 0;
+                 return;
+             }
+         }
+     }
+
+ }
+
+ void
+ write_chunk(const char *buffer, int count)
+ {
      int            rc;

  #ifndef WIN32
***************
*** 654,664 ****
  pipeThread(void *arg)
  {
      DWORD        bytesRead;
!     char        logbuffer[1024];

      for (;;)
      {
!         if (!ReadFile(syslogPipe[0], logbuffer, sizeof(logbuffer),
                        &bytesRead, 0))
          {
              DWORD        error = GetLastError();
--- 844,856 ----
  pipeThread(void *arg)
  {
      DWORD        bytesRead;
!     char        logbuffer[READ_BUF_SIZE];

      for (;;)
      {
!         Assert (read_fragment_len <= READ_SIZE);
!         memcpy(logbuffer, read_buffer, read_fragment_len);
!         if (!ReadFile(syslogPipe[0], logbuffer + read_fragment_len, READ_SIZE,
                        &bytesRead, 0))
          {
              DWORD        error = GetLastError();
***************
*** 672,682 ****
                       errmsg("could not read from logger pipe: %m")));
          }
          else if (bytesRead > 0)
!             write_syslogger_file(logbuffer, bytesRead);
      }

      /* We exit the above loop only upon detecting pipe EOF */
      pipe_eof_seen = true;
      _endthread();
      return 0;
  }
--- 864,879 ----
                       errmsg("could not read from logger pipe: %m")));
          }
          else if (bytesRead > 0)
!             write_syslogger_file(logbuffer, bytesRead + read_fragment_len);
      }

      /* We exit the above loop only upon detecting pipe EOF */
      pipe_eof_seen = true;
+
+     /* if there's a fragment left then force it out now */
+     if (read_fragment_len)
+         write_chunk(read_fragment, read_fragment_len);
+
      _endthread();
      return 0;
  }
Index: src/backend/utils/error/elog.c
===================================================================
RCS file: /cvsroot/pgsql/src/backend/utils/error/elog.c,v
retrieving revision 1.186
diff -c -r1.186 elog.c
*** src/backend/utils/error/elog.c    7 Jun 2007 21:45:59 -0000    1.186
--- src/backend/utils/error/elog.c    13 Jun 2007 15:38:08 -0000
***************
*** 56,61 ****
--- 56,62 ----
  #ifdef HAVE_SYSLOG
  #include <syslog.h>
  #endif
+ #include <limits.h>

  #include "access/transam.h"
  #include "access/xact.h"
***************
*** 71,76 ****
--- 72,78 ----
  #include "utils/ps_status.h"


+
  /* Global variables */
  ErrorContextCallback *error_context_stack = NULL;

***************
*** 124,129 ****
--- 126,135 ----
  static const char *error_severity(int elevel);
  static void append_with_tabs(StringInfo buf, const char *str);
  static bool is_log_level_output(int elevel, int log_min_level);
+ static void write_pipe_chunks(int fd, char * data, int len);
+
+ /* allow space for preamble plus a little head room */
+ #define MAX_CHUNK (sizeof(PipeChunk) - sizeof(PipeProto))


  /*
***************
*** 1783,1789 ****
              write_eventlog(edata->elevel, buf.data);
          else
  #endif
!             fprintf(stderr, "%s", buf.data);
      }

      /* If in the syslogger process, try to write messages direct to file */
--- 1789,1798 ----
              write_eventlog(edata->elevel, buf.data);
          else
  #endif
!         if (Redirect_stderr)
!             write_pipe_chunks(fileno(stderr),buf.data, buf.len);
!         else
!             write(fileno(stderr), buf.data, buf.len);
      }

      /* If in the syslogger process, try to write messages direct to file */
***************
*** 1794,1799 ****
--- 1803,1836 ----
  }


+ static void
+ write_pipe_chunks(int fd, char * data, int len)
+ {
+     PipeChunk p;
+
+     Assert(len > 0);
+
+     p.proto.nuls[0] = p.proto.nuls[1] = '\0';
+     p.proto.pid = MyProcPid;
+     p.proto.is_last = 'f';
+     p.proto.len = MAX_CHUNK;
+
+     /* write all but the last chunk */
+     while (len > MAX_CHUNK)
+     {
+         memcpy(p.proto.data, data, MAX_CHUNK);
+         write(fd, &p, PIPE_DATA_OFFSET + MAX_CHUNK );
+         data += MAX_CHUNK;
+         len -= MAX_CHUNK;
+     }
+
+     /* write the last chunk */
+     p.proto.is_last = 't';
+     p.proto.len = len;
+     memcpy(p.proto.data, data, len);
+     write(fd, &p, PIPE_DATA_OFFSET + len);
+ }
+
  /*
   * Write error report to client
   */
Index: src/include/postmaster/syslogger.h
===================================================================
RCS file: /cvsroot/pgsql/src/include/postmaster/syslogger.h,v
retrieving revision 1.8
diff -c -r1.8 syslogger.h
*** src/include/postmaster/syslogger.h    5 Jan 2007 22:19:57 -0000    1.8
--- src/include/postmaster/syslogger.h    13 Jun 2007 15:38:09 -0000
***************
*** 37,40 ****
--- 37,64 ----
  extern void SysLoggerMain(int argc, char *argv[]);
  #endif

+ /*
+  * primitive protocol structure for writing to syslogger pipe(s).
+  *
+  * we use 't' or 'f' instead of a bool to make the protocol a tiny bit
+  * more robust against finding a false double nul byte prologue.
+  * But we still might find it in the len and/or pid bytes unless we're careful.
+  */
+ typedef struct
+ {
+     char      nuls[2];    /* always \0\0 */
+     uint16    len;        /* size of this chunk */
+     pid_t     pid;        /* our pid */
+     char      is_last;    /* is this the last chunk? 't' or 'f' */
+     char      data[1];
+ } PipeProto;
+
+ typedef union
+ {
+     PipeProto    proto;
+     char         data[PIPE_BUF];
+ }  PipeChunk;
+
+ #define PIPE_DATA_OFFSET offsetof(PipeProto, data) /* 9 usually */
+
  #endif   /* _SYSLOGGER_H */

Re: pipe chunks protocol

От
Tom Lane
Дата:
Andrew Dunstan <andrew@dunslane.net> writes:
>> This patch implements the protocol Tom suggested for writing to the
>> syslogger pipe. It seems to pass my tests (basically "make
>> installcheck" against a server with stderr redirection turned on and
>> log_statement set to 'all').

I didn't like this patch much --- it broke the API of
write_syslogger_file, which is supposed to just write what it's given
(and it is used from outside syslogger.c with that expectation).  Also
the way it slung unconsumed data back and forth between two buffers
seemed both confusing and inefficient.  Here's a revised version.

In my testing, I found that a standard "make installcheck" run produces
only one message large enough to be split (the "infinite_recurse" thing
in errors.sql), so this is definitely not a Good Enough test.  Maybe
we could get Ed L. or one of the other complainants to try it.  (The
patch seems to need some adjustment to apply against 8.2, though.)

            regards, tom lane


Вложения

Re: pipe chunks protocol

От
Andrew Dunstan
Дата:

Tom Lane wrote:
> Andrew Dunstan <andrew@dunslane.net> writes:
>
>>> This patch implements the protocol Tom suggested for writing to the
>>> syslogger pipe. It seems to pass my tests (basically "make
>>> installcheck" against a server with stderr redirection turned on and
>>> log_statement set to 'all').
>>>
>
> I didn't like this patch much --- it broke the API of
> write_syslogger_file, which is supposed to just write what it's given
> (and it is used from outside syslogger.c with that expectation).  Also
> the way it slung unconsumed data back and forth between two buffers
> seemed both confusing and inefficient.  Here's a revised version.
>

Well. it was like the curate's egg :-) Anyway, thanks for the cleanup.

> In my testing, I found that a standard "make installcheck" run produces
> only one message large enough to be split (the "infinite_recurse" thing
> in errors.sql), so this is definitely not a Good Enough test.  Maybe
> we could get Ed L. or one of the other complainants to try it.

Yeah, what I did was to wind back the chunk size - try 128 and you'll
see plenty of chunked messages :-) But we really need to do this with
installcheck-parallel to exercise it properly.

>   (The
> patch seems to need some adjustment to apply against 8.2, though.)
>
>
>

I know we normally try not to do this, but I'd be happy to wait for the
back branches in this case.

cheers

andrew




Re: pipe chunks protocol

От
Tom Lane
Дата:
Andrew Dunstan <andrew@dunslane.net> writes:
> Yeah, what I did was to wind back the chunk size - try 128 and you'll
> see plenty of chunked messages :-) But we really need to do this with
> installcheck-parallel to exercise it properly.

Doh, of course.  I ran installcheck-parallel with log_statement = all
and the chunk size reduced to 64, and saw no apparent problems.
So that gives me at least enough confidence to apply it to HEAD.

>> (The
>> patch seems to need some adjustment to apply against 8.2, though.)

> I know we normally try not to do this, but I'd be happy to wait for the
> back branches in this case.

[confused...] How do you envision proceeding exactly?

            regards, tom lane

Re: pipe chunks protocol

От
Andrew Dunstan
Дата:

Tom Lane wrote:
> Andrew Dunstan <andrew@dunslane.net> writes:
>
>> Yeah, what I did was to wind back the chunk size - try 128 and you'll
>> see plenty of chunked messages :-) But we really need to do this with
>> installcheck-parallel to exercise it properly.
>>
>
> Doh, of course.  I ran installcheck-parallel with log_statement = all
> and the chunk size reduced to 64, and saw no apparent problems.
> So that gives me at least enough confidence to apply it to HEAD.
>
>
>>> (The
>>> patch seems to need some adjustment to apply against 8.2, though.)
>>>
>
>
>> I know we normally try not to do this, but I'd be happy to wait for the
>> back branches in this case.
>>
>
> [confused...] How do you envision proceeding exactly?
>
>
>

Never mind, if you're happy adapting and applying this right away to
back branches then I'm happy too. I just didn't want to have to wait
much before I start work on the CSVlog adaptation.

cheers

andrew

Re: pipe chunks protocol

От
Tom Lane
Дата:
Andrew Dunstan <andrew@dunslane.net> writes:
> Tom Lane wrote:
>> [confused...] How do you envision proceeding exactly?

> Never mind, if you're happy adapting and applying this right away to
> back branches then I'm happy too. I just didn't want to have to wait
> much before I start work on the CSVlog adaptation.

Actually, I was hoping you'd adapt/apply to the back branches ;-)

            regards, tom lane

Re: pipe chunks protocol

От
Andrew Dunstan
Дата:

Tom Lane wrote:
>
> Actually, I was hoping you'd adapt/apply to the back branches ;-)
>
>
>

curses! foiled again!

OK, will do.

cheers

andrew