[GENERAL] logical replication API to read WAL file through replication slot
От | Dipesh Dangol |
---|---|
Тема | [GENERAL] logical replication API to read WAL file through replication slot |
Дата | |
Msg-id | CA+=-RVY0anrEoHn+u-4235AWE-ckDzmdhmWLC-BPC3r5mgSXsQ@mail.gmail.com обсуждение исходный текст |
Список | pgsql-general |
hi,
I am trying to implement logical replication stream API of postgresql.I am facing unusual connection breakdown problem. Here is the simple code that I am
using to read WAL file:
String url = "jdbc:postgresql://pcnode2: 5432/benchmarksql";
Properties props = new Properties();
PGProperty.USER.set(props, "benchmarksql");
PGProperty.PASSWORD.set(props, "benchmarksql");
PGProperty.ASSUME_MIN_SERVER_ VERSION.set(props, "9.4");
PGProperty.REPLICATION.set( props, "database");
PGProperty.PREFER_QUERY_MODE. set(props, "simple");
Connection conn = DriverManager.getConnection( url, props);
PGConnection replConnection = conn.unwrap(PGConnection. class);
PGReplicationStream stream = replConnection. getReplicationAPI()
.replicationStream().logical()
.withSlotName("replication_ slot3")
.withSlotOption("include-xids" , true)
.withSlotOption("include- timestamp", "on")
.withSlotOption("skip-empty- xacts", true)
.withStatusInterval(20, TimeUnit.MILLISECONDS).start() ;
while (true) {
ByteBuffer msg = stream.read();
if (msg == null) {
TimeUnit.MILLISECONDS.sleep( 10L);
continue;
}
int offset = msg.arrayOffset();
byte[] source = msg.array();
int length = source.length - offset;
String data = new String(source, offset, length);
System.out.println(data);
stream.setAppliedLSN(stream. getLastReceiveLSN());
stream.setFlushedLSN(stream. getLastReceiveLSN());
}
Properties props = new Properties();
PGProperty.USER.set(props, "benchmarksql");
PGProperty.PASSWORD.set(props, "benchmarksql");
PGProperty.ASSUME_MIN_SERVER_
PGProperty.REPLICATION.set(
PGProperty.PREFER_QUERY_MODE.
Connection conn = DriverManager.getConnection(
PGConnection replConnection = conn.unwrap(PGConnection.
PGReplicationStream stream = replConnection.
.replicationStream().logical()
.withSlotName("replication_
.withSlotOption("include-xids"
.withSlotOption("include-
.withSlotOption("skip-empty-
.withStatusInterval(20, TimeUnit.MILLISECONDS).start()
while (true) {
ByteBuffer msg = stream.read();
if (msg == null) {
TimeUnit.MILLISECONDS.sleep(
continue;
}
int offset = msg.arrayOffset();
byte[] source = msg.array();
int length = source.length - offset;
String data = new String(source, offset, length);
System.out.println(data);
stream.setAppliedLSN(stream.
stream.setFlushedLSN(stream.
}
Even the slightest modification in the code like commenting System.out.println(data);
which is just printing the data in the console, causes connection breakdown problem with
following error msg
org.postgresql.util. PSQLException: Database connection failed when reading from copy
at org.postgresql.core.v3. QueryExecutorImpl. readFromCopy( QueryExecutorImpl.java:1028)
at org.postgresql.core.v3. CopyDualImpl.readFromCopy( CopyDualImpl.java:41)
at org.postgresql.core.v3. replication. V3PGReplicationStream. receiveNextData( V3PGReplicationStream.java: 155)
at org.postgresql.core.v3. replication. V3PGReplicationStream. readInternal( V3PGReplicationStream.java: 124)
at org.postgresql.core.v3. replication. V3PGReplicationStream.read( V3PGReplicationStream.java:70)
at Server.main(Server.java:52)
Caused by: java.net.SocketException: Socket closed
at java.net.SocketInputStream. socketRead0(Native Method)
at java.net.SocketInputStream. socketRead(SocketInputStream. java:116)
at java.net.SocketInputStream. read(SocketInputStream.java: 171)
at java.net.SocketInputStream. read(SocketInputStream.java: 141)
at org.postgresql.core. VisibleBufferedInputStream. readMore( VisibleBufferedInputStream. java:140)
at org.postgresql.core. VisibleBufferedInputStream. ensureBytes( VisibleBufferedInputStream. java:109)
at org.postgresql.core. VisibleBufferedInputStream. read( VisibleBufferedInputStream. java:191)
at org.postgresql.core.PGStream. receive(PGStream.java:495)
at org.postgresql.core.PGStream. receive(PGStream.java:479)
at org.postgresql.core.v3. QueryExecutorImpl. processCopyResults( QueryExecutorImpl.java:1161)
at org.postgresql.core.v3. QueryExecutorImpl. readFromCopy( QueryExecutorImpl.java:1026)
... 5 more
at org.postgresql.core.v3.
at org.postgresql.core.v3.
at org.postgresql.core.v3.
at org.postgresql.core.v3.
at org.postgresql.core.v3.
at Server.main(Server.java:52)
Caused by: java.net.SocketException: Socket closed
at java.net.SocketInputStream.
at java.net.SocketInputStream.
at java.net.SocketInputStream.
at java.net.SocketInputStream.
at org.postgresql.core.
at org.postgresql.core.
at org.postgresql.core.
at org.postgresql.core.PGStream.
at org.postgresql.core.PGStream.
at org.postgresql.core.v3.
at org.postgresql.core.v3.
... 5 more
But due to this unusual behavior I couldn't implement properly.
Can somebody give me some hint how to solve this problem.
Thank you.
Dipesh Dangol
В списке pgsql-general по дате отправления: