Re: Deadlock detection

Поиск
Список
Период
Сортировка
От Oliver Jowett
Тема Re: Deadlock detection
Дата
Msg-id 4977C8FD.7040204@opencloud.com
обсуждение исходный текст
Ответ на Re: Deadlock detection  (Oliver Jowett <oliver@opencloud.com>)
Ответы Re: Deadlock detection  (Oliver Jowett <oliver@opencloud.com>)
Список pgsql-jdbc
Oliver Jowett wrote:

> I have a bit of time spare today, I might look at putting together that
> OutputStream wrapper.

Try this. I have not tested at all - it compiles but that's as far as I
got - but it should give you an idea of what I had in mind.

-O
? org/postgresql/core/AntiDeadlockStream.java
Index: org/postgresql/core/PGStream.java
===================================================================
RCS file: /cvsroot/jdbc/pgjdbc/org/postgresql/core/PGStream.java,v
retrieving revision 1.22
diff -u -r1.22 PGStream.java
--- org/postgresql/core/PGStream.java    8 Jan 2008 06:56:27 -0000    1.22
+++ org/postgresql/core/PGStream.java    22 Jan 2009 01:10:13 -0000
@@ -34,6 +34,7 @@
 {
     private final String host;
     private final int port;
+    private final boolean antiDeadlock;

     private final byte[] _int4buf;
     private final byte[] _int2buf;
@@ -52,12 +53,14 @@
      *
      * @param host the hostname to connect to
      * @param port the port number that the postmaster is sitting on
+     * @param antiDeadlock true to insert an anti-deadlock outputstream
      * @exception IOException if an IOException occurs below it.
      */
-    public PGStream(String host, int port) throws IOException
+    public PGStream(String host, int port, boolean antiDeadlock) throws IOException
     {
         this.host = host;
         this.port = port;
+        this.antiDeadlock = antiDeadlock;

         changeSocket(new Socket(host, port));
         setEncoding(Encoding.getJVMEncoding("US-ASCII"));
@@ -74,6 +77,10 @@
         return port;
     }

+    public boolean getAntiDeadlock() {
+        return antiDeadlock;
+    }
+
     public Socket getSocket() {
         return connection;
     }
@@ -110,6 +117,8 @@
         // Buffer sizes submitted by Sverre H Huseby <sverrehu@online.no>
         pg_input = new VisibleBufferedInputStream(connection.getInputStream(), 8192);
         pg_output = new BufferedOutputStream(connection.getOutputStream(), 8192);
+        if (antiDeadlock)
+            pg_output = new AntiDeadlockStream(pg_output, 8192, 30000);

         if (encoding != null)
             setEncoding(encoding);
Index: org/postgresql/core/v2/ConnectionFactoryImpl.java
===================================================================
RCS file: /cvsroot/jdbc/pgjdbc/org/postgresql/core/v2/ConnectionFactoryImpl.java,v
retrieving revision 1.17
diff -u -r1.17 ConnectionFactoryImpl.java
--- org/postgresql/core/v2/ConnectionFactoryImpl.java    30 Sep 2008 03:42:48 -0000    1.17
+++ org/postgresql/core/v2/ConnectionFactoryImpl.java    22 Jan 2009 01:10:14 -0000
@@ -59,7 +59,7 @@
         PGStream newStream = null;
         try
         {
-            newStream = new PGStream(host, port);
+            newStream = new PGStream(host, port, Boolean.valueOf(info.getProperty("antiDeadlock")).booleanValue());

             // Construct and send an ssl startup packet if requested.
             if (trySSL)
@@ -147,7 +147,7 @@

             // We have to reconnect to continue.
             pgStream.close();
-            return new PGStream(pgStream.getHost(), pgStream.getPort());
+            return new PGStream(pgStream.getHost(), pgStream.getPort(), pgStream.getAntiDeadlock());

         case 'N':
             if (logger.logDebug())
Index: org/postgresql/core/v2/ProtocolConnectionImpl.java
===================================================================
RCS file: /cvsroot/jdbc/pgjdbc/org/postgresql/core/v2/ProtocolConnectionImpl.java,v
retrieving revision 1.12
diff -u -r1.12 ProtocolConnectionImpl.java
--- org/postgresql/core/v2/ProtocolConnectionImpl.java    1 Apr 2008 07:19:20 -0000    1.12
+++ org/postgresql/core/v2/ProtocolConnectionImpl.java    22 Jan 2009 01:10:14 -0000
@@ -90,7 +90,7 @@
             if (logger.logDebug())
                 logger.debug(" FE=> CancelRequest(pid=" + cancelPid + ",ckey=" + cancelKey + ")");

-            cancelStream = new PGStream(pgStream.getHost(), pgStream.getPort());
+            cancelStream = new PGStream(pgStream.getHost(), pgStream.getPort(), false);
             cancelStream.SendInteger4(16);
             cancelStream.SendInteger2(1234);
             cancelStream.SendInteger2(5678);
Index: org/postgresql/core/v3/ConnectionFactoryImpl.java
===================================================================
RCS file: /cvsroot/jdbc/pgjdbc/org/postgresql/core/v3/ConnectionFactoryImpl.java,v
retrieving revision 1.19
diff -u -r1.19 ConnectionFactoryImpl.java
--- org/postgresql/core/v3/ConnectionFactoryImpl.java    29 Nov 2008 07:40:30 -0000    1.19
+++ org/postgresql/core/v3/ConnectionFactoryImpl.java    22 Jan 2009 01:10:14 -0000
@@ -73,7 +73,7 @@
         PGStream newStream = null;
         try
         {
-            newStream = new PGStream(host, port);
+            newStream = new PGStream(host, port, Boolean.valueOf(info.getProperty("antiDeadlock")).booleanValue());

             // Construct and send an ssl startup packet if requested.
             if (trySSL)
@@ -178,7 +178,7 @@

             // We have to reconnect to continue.
             pgStream.close();
-            return new PGStream(pgStream.getHost(), pgStream.getPort());
+            return new PGStream(pgStream.getHost(), pgStream.getPort(), pgStream.getAntiDeadlock());

         case 'N':
             if (logger.logDebug())
Index: org/postgresql/core/v3/ProtocolConnectionImpl.java
===================================================================
RCS file: /cvsroot/jdbc/pgjdbc/org/postgresql/core/v3/ProtocolConnectionImpl.java,v
retrieving revision 1.13
diff -u -r1.13 ProtocolConnectionImpl.java
--- org/postgresql/core/v3/ProtocolConnectionImpl.java    1 Apr 2008 07:19:20 -0000    1.13
+++ org/postgresql/core/v3/ProtocolConnectionImpl.java    22 Jan 2009 01:10:14 -0000
@@ -90,7 +90,7 @@
             if (logger.logDebug())
                 logger.debug(" FE=> CancelRequest(pid=" + cancelPid + ",ckey=" + cancelKey + ")");

-            cancelStream = new PGStream(pgStream.getHost(), pgStream.getPort());
+            cancelStream = new PGStream(pgStream.getHost(), pgStream.getPort(), false);
             cancelStream.SendInteger4(16);
             cancelStream.SendInteger2(1234);
             cancelStream.SendInteger2(5678);
package org.postgresql.core;

import java.io.*;

/**
 * Temporary hack to try to detect/avoid socket deadlocks caused
 * by blocking on write while we have lots of pending data to read
 * from the server (i.e. the server is also blocked on write).
 *
 * see the thread at http://archives.postgresql.org/pgsql-jdbc/2009-01/msg00045.php
 *
 * <p>Callers never write to the wrapped stream directly. {@link #write(byte[], int, int)}
 * only appends to an in-memory buffer that grows as needed; a dedicated writer
 * thread (this object's {@link #run()} method) drains that buffer into the wrapped
 * stream. {@link #flush()} waits for the writer to catch up, but gives up after
 * <code>flushTimeout</code> milliseconds, prints a warning to <code>System.err</code>
 * and returns so the caller can go on to read from the server.
 *
 * <p>All mutable state is guarded by <code>bufferLock</code>. The writer thread
 * performs the actual (possibly blocking) I/O on the wrapped stream without
 * holding that lock.
 *
 * @author Oliver Jowett <oliver@opencloud.com>
 */
class AntiDeadlockStream extends OutputStream implements Runnable {
    private static final class BufferLock {}
    private final BufferLock bufferLock = new BufferLock();

    private final OutputStream wrapped;
    private final long flushTimeout;

    // Bytes queued by callers and not yet handed to the writer thread.
    private byte[] buffer;
    private int bufferSize;
    // Spare array; exchanged with 'buffer' each time the writer takes a batch.
    private byte[] swapBuffer;

    private boolean closeRequest;
    private boolean flushRequest;
    private boolean closeComplete;
    // True from the moment the writer takes a batch until the wrapped stream
    // has accepted it (and been flushed/closed if that was requested).
    private boolean writeInProgress;

    private IOException failedException;

    /**
     * Wraps a stream and starts the background writer thread.
     *
     * @param wrapped the stream that eventually receives all written bytes
     * @param initialSize initial capacity in bytes of each of the two internal buffers
     * @param flushTimeout maximum time in milliseconds that {@link #flush()} waits
     *                     for buffered data to reach the wrapped stream
     * @throws NullPointerException if <code>wrapped</code> is null
     * @throws IllegalArgumentException if <code>initialSize</code> is negative
     */
    AntiDeadlockStream(OutputStream wrapped, int initialSize, long flushTimeout) {
        if (wrapped == null)
            throw new NullPointerException("wrapped");
        if (initialSize < 0)
            throw new IllegalArgumentException("initialSize must not be negative: " + initialSize);

        this.wrapped = wrapped;
        this.flushTimeout = flushTimeout;
        this.buffer = new byte[initialSize];
        this.swapBuffer = new byte[initialSize];

        Thread writer = new Thread(this, "AntiDeadlock thread");
        // Daemon, so a stream that is never closed cannot keep the JVM alive.
        writer.setDaemon(true);
        writer.start();
    }

    /**
     * Requests that the writer thread write any remaining buffered data and close
     * the wrapped stream, then waits (without timeout) until it has done so.
     *
     * @throws IOException if the writer thread has reported a failure
     * @throws InterruptedIOException if the calling thread is interrupted while waiting
     */
    public void close() throws IOException {
        synchronized (bufferLock) {
            closeRequest = true;
            bufferLock.notifyAll();

            while (!closeComplete) {
                if (failedException != null)
                    throw writerFailure();

                try {
                    bufferLock.wait();
                } catch (InterruptedException ie) {
                    throw interrupted();
                }
            }
        }
    }

    /**
     * Asks the writer thread to push all buffered data to the wrapped stream and
     * flush it, then waits for that to finish. If it has not finished within the
     * flush timeout, a warning and the caller's stack trace are printed to
     * <code>System.err</code> and the method returns normally; the data stays
     * queued and the writer thread keeps trying.
     *
     * @throws IOException if the writer thread has reported a failure, or the
     *                     stream has been closed
     * @throws InterruptedIOException if the calling thread is interrupted while waiting
     */
    public void flush() throws IOException {
        synchronized (bufferLock) {
            long expiry = -1;

            flushRequest = true;
            bufferLock.notifyAll();

            while (true) {
                if (failedException != null)
                    throw writerFailure();
                if (closeRequest)
                    throw new IOException("Stream is closed");
                // Done only once nothing is queued, the flush request has been
                // picked up, and the writer has finished the batch it took.
                if (bufferSize == 0 && !flushRequest && !writeInProgress)
                    return;

                long delay;
                if (expiry == -1) {
                    delay = flushTimeout;
                    expiry = System.currentTimeMillis() + delay;
                } else {
                    delay = expiry - System.currentTimeMillis();
                }

                if (delay <= 0) {
                    System.err.println("Warning: possible socket deadlock detected (timeout=" + flushTimeout
                                       + ", remainingbuffer=" + bufferSize + ")");
                    new Throwable("Deadlock call stack").printStackTrace(System.err);
                    return;
                }

                try {
                    bufferLock.wait(delay);
                } catch (InterruptedException ie) {
                    throw interrupted();
                }
            }
        }
    }

    /**
     * Queues a single byte (the low eight bits of <code>b</code>).
     *
     * @throws IOException if the stream is closed or the writer thread has failed
     */
    public void write(int b) throws IOException {
        write(new byte[] { (byte) b }, 0, 1);
    }

    /**
     * Queues the whole array.
     *
     * @throws IOException if the stream is closed or the writer thread has failed
     */
    public void write(byte[] b) throws IOException {
        write(b, 0, b.length);
    }

    /**
     * Copies <code>len</code> bytes starting at <code>b[off]</code> into the pending
     * buffer, growing it if necessary. Never blocks on the wrapped stream.
     *
     * @throws NullPointerException if <code>b</code> is null
     * @throws IndexOutOfBoundsException if <code>off</code>/<code>len</code> do not
     *                                   describe a range inside <code>b</code>
     * @throws IOException if the stream is closed, the writer thread has failed,
     *                     or the pending data would exceed the maximum array size
     */
    public void write(byte[] b, int off, int len) throws IOException {
        if (b == null)
            throw new NullPointerException();

        // Written as a subtraction so that off + len cannot overflow.
        if (off < 0 || len < 0 || len > b.length - off)
            throw new IndexOutOfBoundsException();

        synchronized (bufferLock) {
            if (closeRequest)
                throw new IOException("Stream is closed");

            if (failedException != null)
                throw writerFailure();

            ensureCapacity(len);

            // The writer only sleeps while the buffer is empty; wake it up.
            if (bufferSize == 0)
                bufferLock.notifyAll();

            System.arraycopy(b, off, buffer, bufferSize, len);
            bufferSize += len;
        }
    }

    // Grows 'buffer' (by doubling) so it can hold 'extra' more bytes.
    // Must be called with bufferLock held.
    private void ensureCapacity(int extra) throws IOException {
        long needed = (long) bufferSize + extra;
        if (needed > Integer.MAX_VALUE)
            throw new IOException("Pending output exceeds the maximum buffer size");
        if (needed <= buffer.length)
            return;

        // Start from at least 1 so a zero-length buffer can still double,
        // and compute in long so doubling cannot wrap around.
        long newSize = Math.max(buffer.length, 1);
        while (newSize < needed)
            newSize *= 2;
        if (newSize > Integer.MAX_VALUE)
            newSize = Integer.MAX_VALUE;

        byte[] newBuffer = new byte[(int) newSize];
        System.arraycopy(buffer, 0, newBuffer, 0, bufferSize);
        buffer = newBuffer;
    }

    // Builds the exception callers see once the writer thread has failed.
    // Must be called with bufferLock held.
    private IOException writerFailure() {
        return (IOException) (new IOException("Write thread reported an error").initCause(failedException));
    }

    // Restores the caller's interrupt status and builds the exception to throw.
    private static InterruptedIOException interrupted() {
        Thread.currentThread().interrupt();
        return new InterruptedIOException();
    }

    //
    // Runnable
    //

    /**
     * Body of the writer thread. Repeatedly takes whatever has been queued,
     * writes it to the wrapped stream, and honours pending flush/close requests.
     * Exits after a close request has been carried out, after an I/O failure
     * (which is recorded for callers to observe), or when interrupted.
     */
    public void run() {
        while (true) {
            byte[] pending;
            int writeLength;
            boolean doFlush;
            boolean doClose;

            synchronized (bufferLock) {
                while (bufferSize == 0 && !closeRequest && !flushRequest) {
                    try {
                        bufferLock.wait();
                    } catch (InterruptedException ie) {
                        failedException = new InterruptedIOException("write thread interrupted");
                        bufferLock.notifyAll();
                        return;
                    }
                }

                // Take the filled buffer and give callers the spare one.
                pending = buffer;
                buffer = swapBuffer;
                swapBuffer = pending;

                writeLength = bufferSize;
                bufferSize = 0;

                // Snapshot the requests; the fields may change once the lock is released.
                doFlush = flushRequest;
                doClose = closeRequest;

                flushRequest = false;
                writeInProgress = true;
            }

            try {
                // Potentially blocking I/O: done without holding bufferLock so
                // callers can keep queueing data (and flush() can time out).
                if (writeLength > 0)
                    wrapped.write(pending, 0, writeLength);
                if (doFlush)
                    wrapped.flush();
                if (doClose)
                    wrapped.close();
            } catch (IOException ioe) {
                synchronized (bufferLock) {
                    failedException = ioe;
                    writeInProgress = false;
                    bufferLock.notifyAll();
                }

                try {
                    wrapped.close();
                } catch (IOException ignored) {
                    // Best effort; the original failure is what callers are told about.
                }
                return;
            }

            synchronized (bufferLock) {
                writeInProgress = false;
                if (doClose)
                    closeComplete = true;
                bufferLock.notifyAll();
            }

            if (doClose)
                return;
        }
    }
}

В списке pgsql-jdbc по дате отправления:

Предыдущее
От: Simon Riggs
Дата:
Сообщение: Re: Deadlock detection
Следующее
От: Oliver Jowett
Дата:
Сообщение: Re: Deadlock detection