[GENERAL] logical replication API to read WAL file through replication slot

Поиск
Список
Период
Сортировка
От Dipesh Dangol
Тема [GENERAL] logical replication API to read WAL file through replication slot
Дата
Msg-id CA+=-RVY0anrEoHn+u-4235AWE-ckDzmdhmWLC-BPC3r5mgSXsQ@mail.gmail.com
обсуждение исходный текст
Список pgsql-general
hi,

I am trying to implement logical replication stream API of postgresql.
I am facing unusual connection breakdown problem. Here is the simple code that I am
using to read WAL file:

String url = "jdbc:postgresql://pcnode2:5432/benchmarksql";
            Properties props = new Properties();
            PGProperty.USER.set(props, "benchmarksql");
            PGProperty.PASSWORD.set(props, "benchmarksql");
            PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
            PGProperty.REPLICATION.set(props, "database");
            PGProperty.PREFER_QUERY_MODE.set(props, "simple");

            Connection conn = DriverManager.getConnection(url, props);
            PGConnection replConnection = conn.unwrap(PGConnection.class);

            PGReplicationStream stream = replConnection.getReplicationAPI()
                    .replicationStream().logical()
                    .withSlotName("replication_slot3")
                    .withSlotOption("include-xids", true)
                    .withSlotOption("include-timestamp", "on")
                    .withSlotOption("skip-empty-xacts", true)
                    .withStatusInterval(20, TimeUnit.MILLISECONDS).start();
            while (true) {

                ByteBuffer msg = stream.read();

                if (msg == null) {
                    TimeUnit.MILLISECONDS.sleep(10L);
                    continue;
                }

                int offset = msg.arrayOffset();
                byte[] source = msg.array();
                int length = source.length - offset;
                String data = new String(source, offset, length);
                System.out.println(data);

                stream.setAppliedLSN(stream.getLastReceiveLSN());
                stream.setFlushedLSN(stream.getLastReceiveLSN());

            }

Even the slightest modification in the code like commenting System.out.println(data);
which is just printing the data in the console, causes connection breakdown problem with
following error msg

org.postgresql.util.PSQLException: Database connection failed when reading from copy
    at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1028)
    at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:41)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:155)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:124)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.read(V3PGReplicationStream.java:70)
    at Server.main(Server.java:52)
Caused by: java.net.SocketException: Socket closed
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:140)
    at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:109)
    at org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:191)
    at org.postgresql.core.PGStream.receive(PGStream.java:495)
    at org.postgresql.core.PGStream.receive(PGStream.java:479)
    at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1161)
    at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1026)
    ... 5 more

I am trying to implement some logic like filtering out the unrelated table after reading log.
But due to this unusual behavior I couldn't implement properly.
Can somebody give me some hint how to solve this problem.

Thank you.

Dipesh Dangol

В списке pgsql-general по дате отправления:

Предыдущее
От: Nico Williams
Дата:
Сообщение: Re: [GENERAL] Schema/table replication
Следующее
От: Jeff Janes
Дата:
Сообщение: Re: [GENERAL] Confused about max_standby_streaming_delay