Re: libpq compression

From Andres Freund
Subject Re: libpq compression
Date
Msg-id 20201028192712.xj2qneyb3fvmjm6c@alap3.anarazel.de
In response to Re: libpq compression  (Konstantin Knizhnik <k.knizhnik@postgrespro.ru>)
Responses Re: libpq compression  (Daniil Zakhlystov <usernamedt@yandex-team.ru>)
Re: libpq compression  (Konstantin Knizhnik <k.knizhnik@postgrespro.ru>)
Re: libpq compression  (Konstantin Knizhnik <k.knizhnik@postgrespro.ru>)
List pgsql-hackers
Hi,

On 2020-10-26 19:20:46 +0300, Konstantin Knizhnik wrote:
> diff --git a/configure b/configure
> index ace4ed5..deba608 100755
> --- a/configure
> +++ b/configure
> @@ -700,6 +700,7 @@ LD
>  LDFLAGS_SL
>  LDFLAGS_EX
>  with_zlib
> +with_zstd
>  with_system_tzdata
>  with_libxslt
>  XML2_LIBS


I don't see a corresponding configure.ac change?


> +     <varlistentry id="libpq-connect-compression" xreflabel="compression">
> +      <term><literal>compression</literal></term>
> +      <listitem>
> +      <para>
> +        Request compression of libpq traffic. Client sends to the server list of compression algorithms, supported by client library.
> +        If server supports one of this algorithms, then it acknowledges use of this algorithm and then all libpq messages send both from client to server and
> +        visa versa will be compressed. If server is not supporting any of the suggested algorithms, then it replies with 'n' (no compression)
> +        message and it is up to the client whether to continue work without compression or report error.
> +        Supported compression algorithms are chosen at configure time. Right now two libraries are supported: zlib (default) and zstd (if Postgres was
> +        configured with --with-zstd option). In both cases streaming mode is used.
> +      </para>
> +      </listitem>
> +     </varlistentry>
> +


- there should be a reference to potential security impacts when used in
  combination with encrypted connections
- What does " and it is up to the client whether to continue work
  without compression or report error" actually mean for a libpq parameter?
- What is the point of the "streaming mode" reference?



> @@ -263,6 +272,21 @@
>       </varlistentry>
>  
>       <varlistentry>
> +      <term>CompressionAck</term>
> +      <listitem>
> +       <para>
> +         Server acknowledges using compression for client-server communication protocol.
> +         Compression can be requested by client by including "compression" option in connection string.
> +         Client sends to the server list of compression algorithms, supported by client library
> +         (compression algorithm is identified by one letter: <literal>'f'</literal> - Facebook zstd, <literal>'z'</literal> - zlib,...).
> +         If server supports one of this algorithms, then it acknowledges use of this algorithm and all subsequent libpq messages send both from client to server and
> +         visa versa will be compressed. If server is not supporting any of the suggested algorithms, then it replies with 'n' (no compression)
> +         algorithm identifier and it is up to the client whether to continue work without compression or report error.
> +       </para>
> +      </listitem>
> +     </varlistentry>

Why are compression methods identified by one byte identifiers? That
seems unnecessarily small, given this is commonly a once-per-connection
action?


The protocol sounds to me like there's no way to enable/disable
compression in an existing connection. To me it seems better to have an
explicit, client initiated, request to use a specific method of
compression (including none). That would allow enabling compression for
bulk work, and disabling it in other cases (e.g. for security sensitive
content, or for content that is unlikely to compress well).

I think that would also make cross-version handling easier, because a
newer client driver can send the compression request and handle the
error, without needing to reconnect or such.

Most importantly, I think such a design is basically a necessity to make
connection poolers work in a sensible way.

And lastly, wouldn't it be reasonable to allow specifying things like
compression levels? All that doesn't have to be supported now, but I
think the protocol should take that into account.
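
As a rough illustration of a request that names the method and optionally
carries a level: the "name:level" syntax, the CompressionRequest struct and
parse_compression_request() below are all made up for this sketch, nothing
like them exists in the patch.

```c
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#define METHOD_NAME_MAX 32

/* Hypothetical parsed form of a client's compression request. */
typedef struct
{
    char        name[METHOD_NAME_MAX];  /* e.g. "zstd", "zlib", "none" */
    int         level;                  /* -1: use the method's default */
} CompressionRequest;

/*
 * Parse "name" or "name:level" (assumed syntax).
 * Returns false on malformed input.
 */
static bool
parse_compression_request(const char *spec, CompressionRequest *req)
{
    const char *colon = strchr(spec, ':');
    size_t      namelen = colon ? (size_t) (colon - spec) : strlen(spec);

    if (namelen == 0 || namelen >= METHOD_NAME_MAX)
        return false;

    memcpy(req->name, spec, namelen);
    req->name[namelen] = '\0';
    req->level = -1;

    if (colon)
    {
        char       *end;
        long        level = strtol(colon + 1, &end, 10);

        /* reject a missing number, trailing garbage, or a silly range */
        if (end == colon + 1 || *end != '\0' || level < 0 || level > 99)
            return false;
        req->level = (int) level;
    }
    return true;
}
```

With a name and room for parameters, "none" is just another request, so
switching compression off mid-connection needs no special case.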


> +<para>
> +        Used compression algorithm. Right now the following streaming compression algorithms are supported: 'f' - Facebook zstd, 'z' - zlib, 'n' - no compression.
> +</para>

I would prefer this just be referenced as zstd or zstandard, not
Facebook zstd. There's an RFC (albeit only "informational"), and it
doesn't name Facebook, except as an employer:
https://tools.ietf.org/html/rfc8478


> +int
> +pq_configure(Port* port)
> +{
> +    char* client_compression_algorithms = port->compression_algorithms;
> +    /*
> +     * If client request compression, it sends list of supported compression algorithms.
> +     * Each compression algorirthm is idetified by one letter ('f' - Facebook zsts, 'z' - xlib)
> +     */

s/algorirthm/algorithm/
s/idetified/identified/
s/zsts/zstd/
s/xlib/zlib/

That's, uh, quite the typo density.


> +    if (client_compression_algorithms)
> +    {
> +        char server_compression_algorithms[ZPQ_MAX_ALGORITHMS];
> +        char compression_algorithm = ZPQ_NO_COMPRESSION;
> +        char compression[6] = {'z',0,0,0,5,0}; /* message length = 5 */
> +        int rc;

Why is this hand-rolling protocol messages?


> +        /* Intersect lists */
> +        while (*client_compression_algorithms != '\0')
> +        {
> +            if (strchr(server_compression_algorithms, *client_compression_algorithms))
> +            {
> +                compression_algorithm = *client_compression_algorithms;
> +                break;
> +            }
> +            client_compression_algorithms += 1;
> +        }

Why isn't this handled within zpq?


> +        /* Send 'z' message to the client with selectde comression algorithm ('n' if match is ont found) */

s/selectde/selected/
s/comression/compression/
s/ont/not/


> +        socket_set_nonblocking(false);
> +        while ((rc = secure_write(MyProcPort, compression, sizeof(compression))) < 0
> +               && errno == EINTR);
> +        if ((size_t)rc != sizeof(compression))
> +            return -1;

Huh? This all seems like an abstraction violation.


> +        /* initialize compression */
> +        if (zpq_set_algorithm(compression_algorithm))
> +            PqStream = zpq_create((zpq_tx_func)secure_write, (zpq_rx_func)secure_read, MyProcPort);
> +    }
> +    return 0;
> +}

Why is zpq a wrapper around secure_write/read? I'm a bit worried this
will reduce the other places we could use zpq.


>  static int
> -pq_recvbuf(void)
> +pq_recvbuf(bool nowait)
>  {

> +        /* If srteaming compression is enabled then use correpondent comression read function. */

s/srteaming/streaming/
s/correpondent/corresponding/
s/comression/compression/

Could you please try to proof-read the patch a bit? The typo density
is quite high.


> +        r = PqStream
> +            ? zpq_read(PqStream, PqRecvBuffer + PqRecvLength,
> +                       PQ_RECV_BUFFER_SIZE - PqRecvLength, &processed)
> +            : secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
> +                          PQ_RECV_BUFFER_SIZE - PqRecvLength);
> +        PqRecvLength += processed;

? : doesn't make sense to me in this case. This should be an if/else.
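
Roughly the shape meant here, as a standalone sketch. fake_zpq_read() and
fake_secure_read() are stand-ins invented so this compiles on its own; they
are not the real zpq_read()/secure_read(), and recv_into() is a made-up name.

```c
#include <stddef.h>
#include <string.h>
#include <sys/types.h>

/* Stand-in for the compressed read path: fills buf and reports progress. */
static ssize_t
fake_zpq_read(void *stream, char *buf, size_t size, size_t *processed)
{
    (void) stream;
    memset(buf, 'z', size);
    *processed = size;
    return (ssize_t) size;
}

/* Stand-in for the plain read path: has no "processed" output at all. */
static ssize_t
fake_secure_read(char *buf, size_t size)
{
    memset(buf, 's', size);
    return (ssize_t) size;
}

/*
 * The two calls take different arguments and only one of them touches
 * *processed, which is easier to see in an if/else than in a ternary.
 */
static ssize_t
recv_into(void *stream, char *buf, size_t size, size_t *processed)
{
    ssize_t     r;

    *processed = 0;
    if (stream != NULL)
        r = fake_zpq_read(stream, buf, size, processed);
    else
        r = fake_secure_read(buf, size);

    return r;
}
```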


>          if (r < 0)
>          {
> +            if (r == ZPQ_DECOMPRESS_ERROR)
> +            {
> +                char const* msg = zpq_error(PqStream);
> +                if (msg == NULL)
> +                    msg = "end of stream";
> +                ereport(COMMERROR,
> +                        (errcode_for_socket_access(),
> +                         errmsg("failed to decompress data: %s", msg)));
> +                return EOF;
> +            }

I don't think we should error out with "failed to decompress data:"
e.g. when the client closed the connection.



> @@ -1413,13 +1457,18 @@ internal_flush(void)
>      char       *bufptr = PqSendBuffer + PqSendStart;
>      char       *bufend = PqSendBuffer + PqSendPointer;
>  
> -    while (bufptr < bufend)
> +    while (bufptr < bufend || zpq_buffered(PqStream) != 0) /* has more data to flush or unsent data in internal compression buffer */
>      {

Overly long line.


> -        int            r;
> -
> -        r = secure_write(MyProcPort, bufptr, bufend - bufptr);
> -
> -        if (r <= 0)
> +        int        r;
> +        size_t  processed = 0;
> +        size_t  available = bufend - bufptr;
> +        r = PqStream
> +            ? zpq_write(PqStream, bufptr, available, &processed)
> +            : secure_write(MyProcPort, bufptr, available);

Same comment as above, re ternary expression.



> +/*
> + * Functions implementing streaming compression algorithm
> + */
> +typedef struct
> +{
> +    /*
> +     * Returns letter identifying compression algorithm.
> +     */
> +    char    (*name)(void);
> +
> +    /*
> +     * Create compression stream with using rx/tx function for fetching/sending compressed data
> +     */
> +    ZpqStream* (*create)(zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg);
> +
> +    /*
> +     * Read up to "size" raw (decompressed) bytes.
> +     * Returns number of decompressed bytes or error code.
> +     * Error code is either ZPQ_DECOMPRESS_ERROR either error code returned by the rx function.
> +     */
> +    ssize_t (*read)(ZpqStream *zs, void *buf, size_t size);
> +
> +    /*
> +     * Write up to "size" raw (decompressed) bytes.
> +     * Returns number of written raw bytes or error code returned by tx function.
> +     * In the last case amount of written raw bytes is stored in *processed.
> +     */
> +    ssize_t (*write)(ZpqStream *zs, void const *buf, size_t size, size_t *processed);

This should at least specify how these functions are supposed to handle
blocking/nonblocking sockets.


> +
> +#define ZSTD_BUFFER_SIZE (8*1024)
> +#define ZSTD_COMPRESSION_LEVEL 1

Add some arguments for choosing these parameters.


> +
> +/*
> + * Array with all supported compression algorithms.
> + */
> +static ZpqAlgorithm const zpq_algorithms[] =
> +{
> +#if HAVE_LIBZSTD
> +    {zstd_name, zstd_create, zstd_read, zstd_write, zstd_free, zstd_error, zstd_buffered},
> +#endif
> +#if HAVE_LIBZ
> +    {zlib_name, zlib_create, zlib_read, zlib_write, zlib_free, zlib_error, zlib_buffered},
> +#endif
> +    {NULL}
> +};

I think it's preferable to use designated initializers.

Do we really need zero terminated lists? Works fine, but brrr.
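
A minimal sketch of both points, using a cut-down struct with two members
instead of the patch's seven. The member set, the string names and
zpq_find_algorithm() are assumptions for illustration only.

```c
#include <stddef.h>
#include <string.h>

/* Cut-down, hypothetical version of the patch's ZpqAlgorithm. */
typedef struct
{
    const char *(*name) (void);
    size_t      (*buffered) (void *stream);
} ZpqAlgorithm;

static const char *zstd_name(void) { return "zstd"; }
static size_t zstd_buffered(void *stream) { (void) stream; return 0; }
static const char *zlib_name(void) { return "zlib"; }
static size_t zlib_buffered(void *stream) { (void) stream; return 0; }

/* Designated initializers: member order in the struct no longer matters. */
static const ZpqAlgorithm zpq_algorithms[] = {
    {.name = zstd_name, .buffered = zstd_buffered},
    {.name = zlib_name, .buffered = zlib_buffered},
};

/* The array length is known at compile time, so no {NULL} terminator. */
#define ZPQ_N_ALGORITHMS (sizeof(zpq_algorithms) / sizeof(zpq_algorithms[0]))

static const ZpqAlgorithm *
zpq_find_algorithm(const char *name)
{
    for (size_t i = 0; i < ZPQ_N_ALGORITHMS; i++)
    {
        if (strcmp(zpq_algorithms[i].name(), name) == 0)
            return &zpq_algorithms[i];
    }
    return NULL;
}
```

One wrinkle this sketch ignores: with the #if guards from the patch the
array could end up empty, which standard C doesn't allow, so the real thing
would need some way around that.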

> +/*
> + * Index of used compression algorithm in zpq_algorithms array.
> + */
> +static int zpq_algorithm_impl;

This is just odd API design imo. Why doesn't the dispatch work based on
an argument for zpq_create() and the ZpqStream * for the rest?

What if there's two libpq connections in one process? To servers
supporting different compression algorithms? This isn't going to fly.
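
Something along these lines, as a sketch only. The signatures here differ
from the patch's zpq_create()/zpq_write(), and the two toy "algorithms"
don't compress anything; they exist only to show two streams in one process
using different implementations.

```c
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

typedef struct ZpqStream ZpqStream;

/* Hypothetical, cut-down algorithm descriptor. */
typedef struct
{
    const char *name;
    /* transform len bytes from src into dst, return bytes produced */
    size_t      (*write) (ZpqStream *zs, const char *src, size_t len, char *dst);
} ZpqAlgorithm;

struct ZpqStream
{
    const ZpqAlgorithm *algorithm;  /* chosen per stream, not per process */
    size_t      total_in;           /* raw bytes seen so far */
};

static size_t
copy_write(ZpqStream *zs, const char *src, size_t len, char *dst)
{
    memcpy(dst, src, len);
    zs->total_in += len;
    return len;
}

static size_t
invert_write(ZpqStream *zs, const char *src, size_t len, char *dst)
{
    for (size_t i = 0; i < len; i++)
        dst[i] = (char) (src[i] ^ 0xFF);
    zs->total_in += len;
    return len;
}

static const ZpqAlgorithm toy_copy = {.name = "copy", .write = copy_write};
static const ZpqAlgorithm toy_invert = {.name = "invert", .write = invert_write};

/* The algorithm is an argument here, not a static variable. */
static ZpqStream *
zpq_create(const ZpqAlgorithm *algorithm)
{
    ZpqStream  *zs = malloc(sizeof(ZpqStream));

    if (zs == NULL)
        return NULL;
    zs->algorithm = algorithm;
    zs->total_in = 0;
    return zs;
}

/* Everything after creation dispatches through the stream itself. */
static size_t
zpq_write(ZpqStream *zs, const char *src, size_t len, char *dst)
{
    return zs->algorithm->write(zs, src, len, dst);
}

static void
zpq_free(ZpqStream *zs)
{
    free(zs);
}
```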


> +/*
> + * Get list of the supported algorithms.
> + * Each algorithm is identified by one letter: 'f' - Facebook zstd, 'z' - zlib.
> + * Algorithm identifies are appended to the provided buffer and terminated by '\0'.
> + */
> +void
> +zpq_get_supported_algorithms(char algorithms[ZPQ_MAX_ALGORITHMS])
> +{
> +    int i;
> +    for (i = 0; zpq_algorithms[i].name != NULL; i++)
> +    {
> +        Assert(i < ZPQ_MAX_ALGORITHMS);
> +        algorithms[i] = zpq_algorithms[i].name();
> +    }
> +    Assert(i < ZPQ_MAX_ALGORITHMS);
> +    algorithms[i] = '\0';
> +}

Uh, doesn't this bake ZPQ_MAX_ALGORITHMS into the ABI? That seems
entirely unnecessary?
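
One way to avoid that, sketched with an snprintf-style interface: the caller
passes the buffer size and gets back the length needed. The signature, the
comma-separated names and the hard-coded list are assumptions for this
example, not what the patch does.

```c
#include <stddef.h>
#include <string.h>

/* Hard-coded for the sketch; the real list would depend on the build. */
static const char *const supported[] = {"zstd", "zlib"};

/*
 * Write a comma-separated list of algorithm names into buf, never writing
 * more than buflen bytes including the terminator. Only whole names are
 * written. Returns the length the full list needs, excluding the
 * terminator, so a result >= buflen means the output was truncated.
 */
static size_t
zpq_get_supported_algorithms(char *buf, size_t buflen)
{
    size_t      needed = 0;
    size_t      used = 0;

    if (buflen > 0)
        buf[0] = '\0';

    for (size_t i = 0; i < sizeof(supported) / sizeof(supported[0]); i++)
    {
        const char *sep = (i > 0) ? "," : "";
        size_t      seplen = strlen(sep);
        size_t      namelen = strlen(supported[i]);

        needed += seplen + namelen;
        if (needed < buflen)    /* item and terminator both still fit */
        {
            memcpy(buf + used, sep, seplen);
            memcpy(buf + used + seplen, supported[i], namelen);
            used += seplen + namelen;
            buf[used] = '\0';
        }
    }
    return needed;
}
```

The caller can size the buffer however it likes, and adding an algorithm
later doesn't change anything the caller was compiled against.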



> @@ -2180,6 +2257,20 @@ build_startup_packet(const PGconn *conn, char *packet,
>          ADD_STARTUP_OPTION("replication", conn->replication);
>      if (conn->pgoptions && conn->pgoptions[0])
>          ADD_STARTUP_OPTION("options", conn->pgoptions);
> +    if (conn->compression && conn->compression[0])
> +    {
> +        bool enabled;
> +        /*
> +         * If compressoin is enabled, then send to the server list of compression algorithms
> +         * supported by client
> +         */

s/compressoin/compression/

> +        if (parse_bool(conn->compression, &enabled))
> +        {
> +            char compression_algorithms[ZPQ_MAX_ALGORITHMS];
> +            zpq_get_supported_algorithms(compression_algorithms);
> +            ADD_STARTUP_OPTION("compression", compression_algorithms);
> +        }
> +    }

I think this needs to work in a graceful manner across server
versions. You can make that work with an argument, using the _pq_
parameter stuff, but as I said earlier, I think it's a mistake to deal
with this in the startup packet anyway.

Greetings,

Andres Freund


