Re: libpq compression

From: Konstantin Knizhnik
Subject: Re: libpq compression
Date:
Msg-id: eb8e63d3-bfe5-97f4-d852-13d1be39f040@postgrespro.ru
In reply to: Re: libpq compression  (Andres Freund <andres@anarazel.de>)
List: pgsql-hackers
New version of the patch with the fixes is attached.


On 28.10.2020 22:27, Andres Freund wrote:
> Hi,
>
> On 2020-10-26 19:20:46 +0300, Konstantin Knizhnik wrote:
>> diff --git a/configure b/configure
>> index ace4ed5..deba608 100755
>> --- a/configure
>> +++ b/configure
>> @@ -700,6 +700,7 @@ LD
>>   LDFLAGS_SL
>>   LDFLAGS_EX
>>   with_zlib
>> +with_zstd
>>   with_system_tzdata
>>   with_libxslt
>>   XML2_LIBS
>
> I don't see a corresponding configure.ac change?
>
>
>> +     <varlistentry id="libpq-connect-compression" xreflabel="compression">
>> +      <term><literal>compression</literal></term>
>> +      <listitem>
>> +      <para>
>> +        Request compression of libpq traffic. Client sends to the server list of compression algorithms, supported by client library.
>> +        If server supports one of this algorithms, then it acknowledges use of this algorithm and then all libpq messages send both from client to server and
>> +        visa versa will be compressed. If server is not supporting any of the suggested algorithms, then it replies with 'n' (no compression)
>> +        message and it is up to the client whether to continue work without compression or report error.
>> +        Supported compression algorithms are chosen at configure time. Right now two libraries are supported: zlib (default) and zstd (if Postgres was
>> +        configured with --with-zstd option). In both cases streaming mode is used.
>> +      </para>
>> +      </listitem>
>> +     </varlistentry>
>> +
>
> - there should be a reference to potential security impacts when used in
>    combination with encrypted connections
> - What does " and it is up to the client whether to continue work
>    without compression or report error" actually mean for a libpq parameter?
> - What is the point of the "streaming mode" reference?
>
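
For illustration, requesting compression from a client program is just a
connection option; a minimal example against the patched libpq (assuming the
option keeps its current boolean form, "compression=on"):

#include <stdio.h>
#include <libpq-fe.h>

int
main(void)
{
    /*
     * "compression=on" makes libpq offer every algorithm it was built with;
     * the server either picks one (CompressionAck) or answers 'n'.
     */
    PGconn *conn = PQconnectdb("dbname=postgres compression=on");

    if (PQstatus(conn) != CONNECTION_OK)
        fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
    PQfinish(conn);
    return 0;
}
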
>
>
>> @@ -263,6 +272,21 @@
>>        </varlistentry>
>>   
>>        <varlistentry>
>> +      <term>CompressionAck</term>
>> +      <listitem>
>> +       <para>
>> +         Server acknowledges using compression for client-server communication protocol.
>> +         Compression can be requested by client by including "compression" option in connection string.
>> +         Client sends to the server list of compression algorithms, supported by client library
>> +         (compression algorithm is identified by one letter: <literal>'f'</literal> - Facebook zstd, <literal>'z'</literal> - zlib,...).
>> +         If server supports one of this algorithms, then it acknowledges use of this algorithm and all subsequent libpq messages send both from client to server and
>> +         visa versa will be compressed. If server is not supporting any of the suggested algorithms, then it replies with 'n' (no compression)
>> +         algorithm identifier and it is up to the client whether to continue work without compression or report error.
>> +       </para>
>> +      </listitem>
>> +     </varlistentry>
> Why are compression methods identified by one byte identifiers? That
> seems unnecessarily small, given this is commonly a once-per-connection
> action?
>
>
> The protocol sounds to me like there's no way to enable/disable
> compression in an existing connection. To me it seems better to have an
> explicit, client initiated, request to use a specific method of
> compression (including none). That allows to enable compression for bulk
> work, and disable it in other cases (e.g. for security sensitive
> content, or for unlikely to compress well content).
>
> I think that would also make cross-version handling easier, because a
> newer client driver can send the compression request and handle the
> error, without needing to reconnect or such.
>
> Most importantly, I think such a design is basically a necessity to make
> connection poolers to work in a sensible way.
>
> And lastly, wouldn't it be reasonable to allow to specify things like
> compression levels? All that doesn't have to be supported now, but I
> think the protocol should take that into account.
>
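
To make that concrete, a client-initiated switch could be a small new frontend
message; the layout below is purely hypothetical, just to illustrate what such
a request might carry:

/*
 * Hypothetical "compression control" frontend message (not part of the
 * attached patch; message type and fields invented for illustration):
 *
 *   Byte1('k')   message type
 *   Int32        message length
 *   Byte1        requested algorithm ('z', 'f', or 'n' for none)
 *   Int32        requested compression level (-1 = library default)
 *
 * The server would answer with CompressionAck naming the algorithm it will
 * actually use, or reject the request with an error, so an older server or a
 * connection pooler can simply refuse without breaking the connection.
 */
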
>
>> +<para>
>> +        Used compression algorithm. Right now the following streaming compression algorithms are supported: 'f' - Facebook zstd, 'z' - zlib, 'n' - no compression.
>> +</para>
> I would prefer this just be referenced as zstd or zstandard, not
> facebook zstd. There's an RFC (albeit only "informational"), and it
> doesn't name facebook, except as an employer:
> https://tools.ietf.org/html/rfc8478
>
>
>> +int
>> +pq_configure(Port* port)
>> +{
>> +    char* client_compression_algorithms = port->compression_algorithms;
>> +    /*
>> +     * If client request compression, it sends list of supported compression algorithms.
>> +     * Each compression algorirthm is idetified by one letter ('f' - Facebook zsts, 'z' - xlib)
>> +     */
> s/algorirthm/algorithm/
> s/idetified/identified/
> s/zsts/zstd/
> s/xlib/zlib/
>
> That's, uh, quite the typo density.
>
>
>> +    if (client_compression_algorithms)
>> +    {
>> +        char server_compression_algorithms[ZPQ_MAX_ALGORITHMS];
>> +        char compression_algorithm = ZPQ_NO_COMPRESSION;
>> +        char compression[6] = {'z',0,0,0,5,0}; /* message length = 5 */
>> +        int rc;
> Why is this hand-rolling protocol messages?
>
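
For reference, the same reply could be built with the ordinary backend
message-construction helpers (pqformat.h / libpq.h) rather than a hand-built
byte array; a sketch, leaving aside the question of exactly when compression
of the stream itself starts:

    /* Send CompressionAck ('z') carrying the selected algorithm. */
    StringInfoData buf;

    pq_beginmessage(&buf, 'z');
    pq_sendbyte(&buf, compression_algorithm);
    pq_endmessage(&buf);
    pq_flush();
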
>
>> +        /* Intersect lists */
>> +        while (*client_compression_algorithms != '\0')
>> +        {
>> +            if (strchr(server_compression_algorithms, *client_compression_algorithms))
>> +            {
>> +                compression_algorithm = *client_compression_algorithms;
>> +                break;
>> +            }
>> +            client_compression_algorithms += 1;
>> +        }
> Why isn't this is handled within zpq?
>
>
>> +        /* Send 'z' message to the client with selectde comression algorithm ('n' if match is ont found) */
> s/selectde/selected/
> s/comression/compression/
> s/ont/not/
>
>
>> +        socket_set_nonblocking(false);
>> +        while ((rc = secure_write(MyProcPort, compression, sizeof(compression))) < 0
>> +               && errno == EINTR);
>> +        if ((size_t)rc != sizeof(compression))
>> +            return -1;
> Huh? This all seems like an abstraction violation.
>
>
>> +        /* initialize compression */
>> +        if (zpq_set_algorithm(compression_algorithm))
>> +            PqStream = zpq_create((zpq_tx_func)secure_write, (zpq_rx_func)secure_read, MyProcPort);
>> +    }
>> +    return 0;
>> +}
> Why is zpq a wrapper around secure_write/read? I'm a bit worried this
> will reduce the other places we could use zpq.
>
>
>>   static int
>> -pq_recvbuf(void)
>> +pq_recvbuf(bool nowait)
>>   {
>> +        /* If srteaming compression is enabled then use correpondent comression read function. */
> s/srteaming/streaming/
> s/correpondent/corresponding/
> s/comression/compression/
>
> Could you please try to proof-read the patch a bit? The typo density
> is quite high.
>
>
>> +        r = PqStream
>> +            ? zpq_read(PqStream, PqRecvBuffer + PqRecvLength,
>> +                       PQ_RECV_BUFFER_SIZE - PqRecvLength, &processed)
>> +            : secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
>> +                          PQ_RECV_BUFFER_SIZE - PqRecvLength);
>> +        PqRecvLength += processed;
> ? : doesn't make sense to me in this case. This should be an if/else.
>
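
Spelled out with if/else, that read path becomes:

    /* Same logic as above, without the ternary. */
    ssize_t     r;
    size_t      processed = 0;

    if (PqStream != NULL)
        r = zpq_read(PqStream, PqRecvBuffer + PqRecvLength,
                     PQ_RECV_BUFFER_SIZE - PqRecvLength, &processed);
    else
        r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
                        PQ_RECV_BUFFER_SIZE - PqRecvLength);
    PqRecvLength += processed;
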
>
>>           if (r < 0)
>>           {
>> +            if (r == ZPQ_DECOMPRESS_ERROR)
>> +            {
>> +                char const* msg = zpq_error(PqStream);
>> +                if (msg == NULL)
>> +                    msg = "end of stream";
>> +                ereport(COMMERROR,
>> +                        (errcode_for_socket_access(),
>> +                         errmsg("failed to decompress data: %s", msg)));
>> +                return EOF;
>> +            }
> I don't think we should error out with "failed to decompress data:"
> e.g. when the client closed the connection.
>
>
>
>> @@ -1413,13 +1457,18 @@ internal_flush(void)
>>       char       *bufptr = PqSendBuffer + PqSendStart;
>>       char       *bufend = PqSendBuffer + PqSendPointer;
>>   
>> -    while (bufptr < bufend)
>> +    while (bufptr < bufend || zpq_buffered(PqStream) != 0) /* has more data to flush or unsent data in internal compression buffer */
>>       {
> Overly long line.
>
>
>> -        int            r;
>> -
>> -        r = secure_write(MyProcPort, bufptr, bufend - bufptr);
>> -
>> -        if (r <= 0)
>> +        int        r;
>> +        size_t  processed = 0;
>> +        size_t  available = bufend - bufptr;
>> +        r = PqStream
>> +            ? zpq_write(PqStream, bufptr, available, &processed)
>> +            : secure_write(MyProcPort, bufptr, available);
> Same comment as above, re ternary expression.
>
>
>
>> +/*
>> + * Functions implementing streaming compression algorithm
>> + */
>> +typedef struct
>> +{
>> +    /*
>> +     * Returns letter identifying compression algorithm.
>> +     */
>> +    char    (*name)(void);
>> +
>> +    /*
>> +     * Create compression stream with using rx/tx function for fetching/sending compressed data
>> +     */
>> +    ZpqStream* (*create)(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg);
>> +
>> +    /*
>> +     * Read up to "size" raw (decompressed) bytes.
>> +     * Returns number of decompressed bytes or error code.
>> +     * Error code is either ZPQ_DECOMPRESS_ERROR either error code returned by the rx function.
>> +     */
>> +    ssize_t (*read)(ZpqStream *zs, void *buf, size_t size);
>> +
>> +    /*
>> +     * Write up to "size" raw (decompressed) bytes.
>> +     * Returns number of written raw bytes or error code returned by tx function.
>> +     * In the last case amount of written raw bytes is stored in *processed.
>> +     */
>> +    ssize_t (*write)(ZpqStream *zs, void const *buf, size_t size, size_t *processed);
> This should at least specify how these functions are supposed to handle
> blocking/nonblocking sockets.
>
>
>> +
>> +#define ZSTD_BUFFER_SIZE (8*1024)
>> +#define ZSTD_COMPRESSION_LEVEL 1
> Add some arguments for choosing these parameters.
>
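
The kind of justification asked for could live next to the constants as
comments; the reasoning below is a plausible default, not a benchmark result
from the patch:

    /*
     * ZSTD_BUFFER_SIZE: 8kB matches libpq's own send/receive buffer size
     * (PQ_SEND_BUFFER_SIZE / PQ_RECV_BUFFER_SIZE), so one compressed chunk
     * maps onto one buffer-sized write or read.
     *
     * ZSTD_COMPRESSION_LEVEL: 1 is zstd's cheapest standard level; protocol
     * traffic is latency sensitive, so low CPU cost is preferred over a
     * better compression ratio.
     */
    #define ZSTD_BUFFER_SIZE        (8*1024)
    #define ZSTD_COMPRESSION_LEVEL  1
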
>
>> +
>> +/*
>> + * Array with all supported compression algorithms.
>> + */
>> +static ZpqAlgorithm const zpq_algorithms[] =
>> +{
>> +#if HAVE_LIBZSTD
>> +    {zstd_name, zstd_create, zstd_read, zstd_write, zstd_free, zstd_error, zstd_buffered},
>> +#endif
>> +#if HAVE_LIBZ
>> +    {zlib_name, zlib_create, zlib_read, zlib_write, zlib_free, zlib_error, zlib_buffered},
>> +#endif
>> +    {NULL}
>> +};
> I think it's preferable to use designated initializers.
>
> Do we really need zero terminated lists? Works fine, but brrr.
>
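
With designated initializers (and assuming the remaining members are indeed
named free/error/buffered), the table would read:

    static const ZpqAlgorithm zpq_algorithms[] =
    {
    #if HAVE_LIBZSTD
        {
            .name = zstd_name,
            .create = zstd_create,
            .read = zstd_read,
            .write = zstd_write,
            .free = zstd_free,
            .error = zstd_error,
            .buffered = zstd_buffered,
        },
    #endif
    #if HAVE_LIBZ
        {
            .name = zlib_name,
            .create = zlib_create,
            .read = zlib_read,
            .write = zlib_write,
            .free = zlib_free,
            .error = zlib_error,
            .buffered = zlib_buffered,
        },
    #endif
        {.name = NULL}              /* terminator */
    };
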
>> +/*
>> + * Index of used compression algorithm in zpq_algorithms array.
>> + */
>> +static int zpq_algorithm_impl;
> This is just odd API design imo. Why doesn't the dispatch work based on
> an argument for zpq_create() and the ZpqStream * for the rest?
>
> What if there's two libpq connections in one process? To servers
> supporting different compression algorithms? This isn't going to fly.
>
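
A per-stream pointer instead of the global would address both points; a rough
sketch, assuming the per-algorithm callbacks are changed to operate on their
own private state rather than on ZpqStream itself (malloc/free because zpq is
shared by frontend and backend):

    /* Each stream carries its algorithm; no process-wide state. */
    struct ZpqStream
    {
        const ZpqAlgorithm *algorithm;      /* chosen in zpq_create() */
        void               *private_state;  /* returned by algorithm->create() */
    };

    ZpqStream *
    zpq_create(char algorithm_name, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg)
    {
        for (int i = 0; zpq_algorithms[i].name != NULL; i++)
        {
            if (zpq_algorithms[i].name() == algorithm_name)
            {
                ZpqStream *zs = malloc(sizeof(ZpqStream));

                if (zs == NULL)
                    return NULL;
                zs->algorithm = &zpq_algorithms[i];
                zs->private_state = zpq_algorithms[i].create(tx_func, rx_func, arg);
                if (zs->private_state == NULL)
                {
                    free(zs);
                    return NULL;
                }
                return zs;
            }
        }
        return NULL;                        /* unknown algorithm */
    }

    ssize_t
    zpq_read(ZpqStream *zs, void *buf, size_t size)
    {
        return zs->algorithm->read(zs->private_state, buf, size);
    }
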
>
>> +/*
>> + * Get list of the supported algorithms.
>> + * Each algorithm is identified by one letter: 'f' - Facebook zstd, 'z' - zlib.
>> + * Algorithm identifies are appended to the provided buffer and terminated by '\0'.
>> + */
>> +void
>> +zpq_get_supported_algorithms(char algorithms[ZPQ_MAX_ALGORITHMS])
>> +{
>> +    int i;
>> +    for (i = 0; zpq_algorithms[i].name != NULL; i++)
>> +    {
>> +        Assert(i < ZPQ_MAX_ALGORITHMS);
>> +        algorithms[i] = zpq_algorithms[i].name();
>> +    }
>> +    Assert(i < ZPQ_MAX_ALGORITHMS);
>> +    algorithms[i] = '\0';
>> +}
> Uh, doesn't this bake ZPQ_MAX_ALGORITHMS into the ABI? That seems
> entirely unnecessary?
>
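
One way to keep ZPQ_MAX_ALGORITHMS out of the ABI is to let the function
allocate the list itself (or, alternatively, take the buffer size as an
argument); a sketch of the former:

    /* Return a malloc'ed, '\0'-terminated list of algorithm letters. */
    char *
    zpq_get_supported_algorithms(void)
    {
        size_t  n = 0;
        char   *result;

        while (zpq_algorithms[n].name != NULL)
            n++;

        result = malloc(n + 1);
        if (result == NULL)
            return NULL;

        for (size_t i = 0; i < n; i++)
            result[i] = zpq_algorithms[i].name();
        result[n] = '\0';

        return result;
    }
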
>
>
>> @@ -2180,6 +2257,20 @@ build_startup_packet(const PGconn *conn, char *packet,
>>           ADD_STARTUP_OPTION("replication", conn->replication);
>>       if (conn->pgoptions && conn->pgoptions[0])
>>           ADD_STARTUP_OPTION("options", conn->pgoptions);
>> +    if (conn->compression && conn->compression[0])
>> +    {
>> +        bool enabled;
>> +        /*
>> +         * If compressoin is enabled, then send to the server list of compression algorithms
>> +         * supported by client
>> +         */
> s/compressoin/compression/
>
>> +        if (parse_bool(conn->compression, &enabled))
>> +        {
>> +            char compression_algorithms[ZPQ_MAX_ALGORITHMS];
>> +            zpq_get_supported_algorithms(compression_algorithms);
>> +            ADD_STARTUP_OPTION("compression", compression_algorithms);
>> +        }
>> +    }
> I think this needs to work in a graceful manner across server
> versions. You can make that work with an argument, using the _pq_
> parameter stuff, but as I said earlier, I think it's a mistake to deal
> with this in the startup packet anyway.
>
> Greetings,
>
> Andres Freund

-- 
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

