diff -brcN postgresql-jdbc-8.2-505.src.orig/build.xml postgresql-jdbc-8.2-505.src.copy/build.xml
*** postgresql-jdbc-8.2-505.src.orig/build.xml 2006-11-29 06:00:15.000000000 +0200
--- postgresql-jdbc-8.2-505.src.copy/build.xml 2007-06-11 17:58:32.000000000 +0300
***************
*** 84,89 ****
--- 84,97 ----
+
+
+
+
+
+
+
+
***************
*** 116,121 ****
--- 124,130 ----
+
***************
*** 393,398 ****
--- 402,408 ----
+
***************
*** 428,433 ****
--- 438,444 ----
+
diff -brcN postgresql-jdbc-8.2-505.src.orig/doc/pgjdbc.xml postgresql-jdbc-8.2-505.src.copy/doc/pgjdbc.xml
*** postgresql-jdbc-8.2-505.src.orig/doc/pgjdbc.xml 2007-02-19 08:04:48.000000000 +0200
--- postgresql-jdbc-8.2-505.src.copy/doc/pgjdbc.xml 2007-06-11 17:58:33.000000000 +0300
***************
*** 2481,2486 ****
--- 2481,2526 ----
+
+ Copy
+
+ Bulk data transfer with INSERT or SELECT
+ can be quite slow for large amounts of data.
+ PostgreSQL provides a special SQL
+ statement COPY for this purpose.
+ It can be used either to exchange data between a file and a table, which requires
+ superuser privileges to access the file on database server, or between the client and
+ a table, which requires special copy subprotocol support from the database driver.
+ Latter is available as an extension of this JDBC driver.
+
+
+
+ You should pay attention to always end your copy operation explicitly with
+ close (end reading), done (end writing)
+ or fail (cancel writing). Otherwise your database connection
+ will not return from copy subprotocol to normal operation.
+
+
+
+ // get hold of the extension interface
+ import org.postgresql.copy.CopyManager;
+ CopyManager copier = ((org.postgresql.PGConnection)conn).getCopyAPI();
+
+ // write some data to a fictional (varchar,int) table
+ CopyManager.CopyIntoDB copywriter = copier.copyIntoDB("COPY mytable FROM STDIN");
+ copywriter.start();
+ copywriter.write( new byte[][]{{"row\t123\n".getBytes()}}, 0, 1 );
+ copywriter.done();
+
+ // read data from a fictional table
+ CopyManager.CopyOutOfDB copyreader = copier.copyOutOfDB("COPY mytable TO STDOUT");
+ byte[][] result = new byte[1][];
+ copyreader.read( result, 0, 1 );
+ copyreader.close();
+
+
+
+
diff -brcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/PGConnection.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/PGConnection.java
*** postgresql-jdbc-8.2-505.src.orig/org/postgresql/PGConnection.java 2005-04-20 03:10:58.000000000 +0300
--- postgresql-jdbc-8.2-505.src.copy/org/postgresql/PGConnection.java 2007-06-11 17:58:32.000000000 +0300
***************
*** 11,16 ****
--- 11,17 ----
import java.sql.*;
import org.postgresql.core.Encoding;
+ import org.postgresql.copy.CopyManager;
import org.postgresql.fastpath.Fastpath;
import org.postgresql.largeobject.LargeObjectManager;
***************
*** 30,35 ****
--- 31,43 ----
public PGNotification[] getNotifications() throws SQLException;
/**
+ * This returns the COPY API for the current connection.
+ * @since 8.2
+ * @since 7.5
+ */
+ public CopyManager getCopyAPI() throws SQLException;
+
+ /**
* This returns the LargeObject API for the current connection.
* @since 7.3
*/
diff -brcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopyManager.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopyManager.java
*** postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopyManager.java 1970-01-01 02:00:00.000000000 +0200
--- postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopyManager.java 2007-06-11 22:40:25.000000000 +0300
***************
*** 0 ****
--- 1,249 ----
+ package org.postgresql.copy;
+
+ import java.io.IOException;
+ import java.sql.SQLException;
+ import java.sql.SQLWarning;
+ import java.util.Vector;
+
+ import org.postgresql.core.*;
+ import org.postgresql.util.*;
+ import org.postgresql.core.v3.QueryExecutorImpl; // COPY is only implemented for protocol v3
+
+ /**
+ * API for PostgreSQL protocol version 3 COPY block data transfer
+ * @author kato@iki.fi
+ * @since 8.2
+ */
+ public class CopyManager {
+ private final QueryExecutorImpl queryExecutor;
+ private final Logger logger;
+
+ public CopyManager(BaseConnection connection) {
+ this.queryExecutor = (QueryExecutorImpl)connection.getQueryExecutor();
+ this.logger = connection.getLogger();
+ }
+
+ /**
+ * Interface common to both copiers
+ */
+
+ public interface Copier {
+ /**
+ * Issue the statement to start copying
+ * @throws SQLException if starting fails
+ */
+ void start() throws SQLException;
+
+ /**
+ * @return current copy mode; see QueryExecutor.CopyState
+ */
+ int getState();
+ }
+
+ /**
+ * Reader returned from CopyManager.copyOutOfDB() for a COPY TO STDOUT statement.
+ * You may discard of it once you have read all the input.
+ * @author kato@iki.fi
+ * @since 8.2
+ */
+ public interface CopyOutOfDB extends Copier {
+
+ /**
+ * Read data rows from database statement COPY TO STDOUT.
+ * @param copydataRows reusable array for storing received data rows
+ * @param rowOffset index to start filling input array at; usually zero
+ * @param maxRows index to stop filling input array at; usually copydataRows.length
+ * @return Number of rows now in input array; copy is completed when it is less than maxRows.
+ * @throws IOException from low level database connection
+ * @throws SQLException on miscommunication with the database server
+ */
+ int read(byte[][] copydataRows, int rowOffset, int maxRows) throws IOException, SQLException;
+
+ /**
+ * Close explicitly when you don't want to read any more data.
+ * @throws SQLException on miscommunication with the database server
+ * @throws IOException from low level database connection
+ */
+ void close() throws IOException, SQLException;
+ }
+
+ /**
+ * Writer returned from CopyManager.copyIntoDB() for a COPY FROM STDIN statement.
+ * You must call one of the finishing methods to return to normal connection.
+ * @author kato@iki.fi
+ * @since 8.2
+ */
+ public interface CopyIntoDB extends Copier {
+
+ /**
+ * Write data for database statement COPY FROM STDIN.
+ * @param copydataRows reusable array of chunks of data
+ * @param rowOffset index at output array to start feeding from; usually zero
+ * @param maxRows index at output array to end feeding at; usually copydataRows.length
+ * @throws IOException from low level database connection
+ * @throws SQLException on miscommunication with the database server or running out of space
+ */
+ void write(byte[][] copydataRows, int rowOffset, int maxRows) throws IOException, SQLException;
+
+ /**
+ * Finish feeding data and save it to the table.
+ * @throws SQLException on miscommunication with the database server or running out of space
+ * @throws IOException from low level database connection
+ */
+ void done() throws IOException, SQLException;
+
+ /**
+ * Cancel feeding data and discard changes.
+ * @throws SQLException on miscommunication with the database server
+ * @throws IOException from low level database connection
+ */
+ void fail(byte[] logErrorMessage) throws IOException, SQLException;
+
+ /**
+ * Only returns valid number after succesful completion with PostgreSQL server versions 8.2 and above.
+ * @return number of rows updated
+ */
+ int getUpdateCount();
+ }
+
+ /**
+ * Prepares a copy from database.
+ * @param sql the COPY TO STDOUT command
+ * @return reader of copy data
+ * @throws SQLException if execution of the initial statement fails
+ */
+ public CopyOutOfDB copyOutOfDB(String sql) throws SQLException {
+ return new CopyOutOfDBHandler(sql);
+ }
+
+ /**
+ * Prepares a copy into database.
+ * @param sql the COPY FROM STDIN command.
+ * @return writer of copy data
+ * @throws SQLException if execution of the initial statement fails
+ */
+ public CopyIntoDB copyIntoDB(String sql) throws SQLException {
+ return new CopyIntoDBHandler(sql);
+ }
+
+ //
+ // Internals
+ //
+
+ private abstract class CopierImpl {
+ String sql;
+ CopyResultHandler handler;
+
+ public void start() throws SQLException {
+ if(queryExecutor.getCopyState() != CopyState.NONE)
+ throw new PSQLException(GT.tr("Tried to initiate simultaneous copy operations over single connection"), PSQLState.PROTOCOL_VIOLATION);
+
+ // queryExecutor.execute(queryExecutor.createSimpleQuery(sql), null, handler, 0, 0, QueryExecutor.QUERY_NO_RESULTS | QueryExecutor.QUERY_ONESHOT);
+ try {
+ queryExecutor.startCopy(handler, sql);
+ } catch(IOException ioe) {
+ throw new PSQLException(GT.tr("Connection failed during copy start"), PSQLState.CONNECTION_FAILURE_DURING_TRANSACTION, ioe);
+ }
+ }
+
+ abstract class CopyResultHandler implements ResultHandler {
+
+ public void handleError(SQLException error) {
+ throw new RuntimeException(error);
+ }
+
+ public void handleResultRows(Query fromQuery, Field[] fields, Vector tuples, ResultCursor cursor) {
+ throw new RuntimeException("Normal result rows handed to copy handler");
+ }
+
+ public void handleWarning(SQLWarning warning) {
+ logger.info("Copy handler received an unexpected warning", warning);
+ }
+ }
+
+ public int getState() {
+ return queryExecutor.getCopyState();
+ }
+ }
+
+ private class CopyIntoDBHandler extends CopierImpl implements CopyIntoDB {
+ int updateCount = 0;
+
+ private class WriteHandler extends CopierImpl.CopyResultHandler {
+
+ public void handleCommandStatus(String status, int rowCount, long insertOID) {
+ if(getState() == CopyState.NONE && status.startsWith("COPY IN ")) {
+ // here we could check the format returned by server, but it's really up to the user to conform to the format he specified
+ } else if(getState() == CopyState.DONE && status.startsWith("COPY")) {
+ if(rowCount>0) // only with server 8.2+
+ updateCount += rowCount;
+ } else {
+ logger.info("Unexpected command status in copywriter state " + Integer.toString(getState()) + ": " + status);
+ }
+ }
+
+ public void handleCompletion() throws SQLException {
+ if(getState() != CopyState.DONE )
+ logger.info("Unexpected completion of copywriting in state " + Integer.toString(getState()));
+ }
+
+ public void handleError(SQLException error) {
+ if(getState() != CopyState.FAIL) // It is normal to get an error response as a result to CopyFail
+ super.handleError(error);
+ }
+ }
+
+ protected CopyIntoDBHandler(String sql) throws SQLException {
+ this.sql = sql;
+ handler = new WriteHandler();
+ }
+
+ public void done() throws IOException, SQLException {
+ queryExecutor.endCopy(handler);
+ }
+
+ public void fail(byte[] logErrorMessage) throws IOException, SQLException {
+ queryExecutor.failCopy(handler, logErrorMessage);
+ }
+
+ public void write(byte[][] copydataRows, int rowOffset, int maxRows) throws IOException, SQLException {
+ queryExecutor.sendCopydata(handler, copydataRows, rowOffset, maxRows);
+ }
+
+ public int getUpdateCount() {
+ return updateCount;
+ }
+ }
+
+ private class CopyOutOfDBHandler extends CopierImpl implements CopyOutOfDB {
+
+ private class ReadHandler extends CopierImpl.CopyResultHandler {
+ public void handleCommandStatus(String status, int updateCount, long insertOID) {
+ if(getState() == CopyState.NONE && status.startsWith("COPY OUT ")) {
+ // here we could check the format returned by server, but it's really up to the user to conform to the format he specified
+ } else if(getState() == CopyState.DONE && status.startsWith("COPY")) {
+ // ok, nothing to do
+ } else {
+ logger.info("Unexpected command status in copyreader state " + Integer.toString(getState()) + ": " + status);
+ }
+ }
+
+ public void handleCompletion() throws SQLException { // called upon CopyDone, ie. all rows received
+ // well... it's perfectly ok!
+ }
+ }
+
+ protected CopyOutOfDBHandler(String sql) throws SQLException {
+ this.sql = sql;
+ handler = new ReadHandler();
+ }
+
+ public void close() throws IOException, SQLException {
+ queryExecutor.skipCopy(handler);
+ }
+
+ public int read(byte[][] copydataRows, int rowOffset, int maxRows) throws IOException, SQLException {
+ return queryExecutor.receiveCopydata(handler, copydataRows, rowOffset, maxRows);
+ }
+ }
+ }
diff -brcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopyState.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopyState.java
*** postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopyState.java 1970-01-01 02:00:00.000000000 +0200
--- postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopyState.java 2007-06-11 17:58:32.000000000 +0300
***************
*** 0 ****
--- 1,13 ----
+ package org.postgresql.copy;
+
+ /**
+ * Actually just an ENUM: state of the copy machine
+ */
+ public class CopyState {
+ public static final int NONE = 0;
+ public static final int INTODB = 1;
+ public static final int OUTOFDB = 2;
+ public static final int DONE = 3;
+ public static final int FAIL = 4;
+ }
+
diff -brcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/core/v3/QueryExecutorImpl.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/core/v3/QueryExecutorImpl.java
*** postgresql-jdbc-8.2-505.src.orig/org/postgresql/core/v3/QueryExecutorImpl.java 2006-12-01 10:53:45.000000000 +0200
--- postgresql-jdbc-8.2-505.src.copy/org/postgresql/core/v3/QueryExecutorImpl.java 2007-06-11 17:58:32.000000000 +0300
***************
*** 27,32 ****
--- 27,33 ----
import org.postgresql.util.PSQLState;
import org.postgresql.util.ServerErrorMessage;
import org.postgresql.util.GT;
+ import org.postgresql.copy.CopyState;
/**
* QueryExecutor implementation for the V3 protocol.
***************
*** 630,635 ****
--- 631,892 ----
return returnValue;
}
+ //
+ // Copy subprotocol implementation
+ //
+
+ private static int currentCopyState = CopyState.NONE;
+
+ public int getCopyState() {
+ return currentCopyState;
+ }
+
+ /**
+ * startCopy starts a raw query (presumably COPY ...)
+ * @param handler associated result handler
+ * @param sql the statement to send
+ * @throws IOException passed from lower level
+ * @throws SQLException on communications mismatch with server
+ */
+ public void startCopy(ResultHandler handler, String sql) throws IOException, SQLException {
+ byte message[] = sql.getBytes();
+ int messageSize = 4 + message.length + 1;
+ pgStream.SendChar('Q');
+ pgStream.SendInteger4(messageSize);
+ pgStream.Send(message);
+ pgStream.SendChar(0);
+ pgStream.flush();
+ processCopyResults(handler, null, -1, 0);
+ }
+
+ /**
+ * receiveCopydata is called to read copy data from server
+ * @param handler the ResultHandler overseeing this operation, notably handleCompletion()
+ * @param copydataRows buffer for rows to read
+ * @param rowOffset index to start inserting data at in buffer (0 = beginning)
+ * @param maxRows return after reading up to this index in buffer (copydataRows.length = fill it up)
+ * @return int rowOffset; if equal to maxRows, there is more data to read; otherwise end of copy or error.
+ * @throws IOException passed from low-level connection
+ * @throws SQLException passed from handler.handleCompletion()
+ */
+ public int receiveCopydata(ResultHandler handler, byte[][] copydataRows, int rowOffset, int maxRows) throws IOException, SQLException {
+ if(currentCopyState != CopyState.OUTOFDB)
+ throw new PSQLException(GT.tr("Attempted to receive copydata while not in COPY TO STDIN"), PSQLState.PROTOCOL_VIOLATION);
+ return processCopyResults(handler, copydataRows, rowOffset, maxRows);
+ }
+
+ /**
+ * sendCopydata sends raw copydata to backend
+ * @param handler ResultHandler for state changes (currently unused)
+ * @param copydataRows array of copydata rows as bytearrays to send
+ * @param rowOffset how many rows to skip from beginning of copydataRows (0 to send all from beginning)
+ * @param maxRows index of first row not to send (copydataRows.length to send all up to end)
+ * @throws IOException upon connection failure
+ */
+ public void sendCopydata(ResultHandler handler, byte[][] copydataRows, int rowOffset, int maxRows) throws IOException, SQLException {
+ if(currentCopyState != CopyState.INTODB)
+ throw new PSQLException(GT.tr("Attempted to send copydata while not in COPY FROM STDOUT"), PSQLState.PROTOCOL_VIOLATION);
+ while(rowOffset CopyDone");
+
+ pgStream.SendChar('c'); // CopyDone
+ pgStream.SendInteger4(4); // No message in specification
+ pgStream.flush();
+
+ currentCopyState = CopyState.DONE;
+ processCopyResults(handler, null, 0, 0);
+ }
+
+ /**
+ * failCopy is used to cancel copydata mode discarding sent data.
+ * @param handler the ResultHandler especially handling the ErrorMessage as expected correct response
+ * @param logErrorMessage optional message for backend to log
+ * @throws IOException passed from low-level connection
+ * @throws SQLException passed from handler
+ */
+ public void failCopy(ResultHandler handler, byte[] logErrorMessage) throws IOException, SQLException {
+ // internal state not checked to ensure that a copy can always be cancelled.
+ pgStream.SendChar('f');
+ pgStream.SendInteger4(4+logErrorMessage.length+1);
+ if(logErrorMessage.length > 0)
+ pgStream.Send(logErrorMessage);
+ pgStream.SendChar('\0');
+ pgStream.flush();
+
+ currentCopyState = CopyState.FAIL;
+
+ processCopyResults(handler, null, 0, 0);
+ }
+
+
+ /**
+ * Skips rest of copydata from server
+ * @param handler the ResultHandler overseeing this operation
+ * @throws IOException passed from low-level connection
+ * @throws SQLException upon internal or communications inconsistency
+ */
+ public void skipCopy(ResultHandler handler) throws IOException, SQLException {
+ if(currentCopyState == CopyState.OUTOFDB) {
+ currentCopyState = CopyState.FAIL;
+ processCopyResults(handler, null, 0, 0);
+ }
+ }
+
+ /**
+ * processCopyResults handles copy subprotocol messages from server.
+ * @param handler the ResultHandler overseeing this operation, especially errors or completion.
+ * @param copydataRows array of bytearrays of data read from server, null when not expecting input
+ * @param rowOffset offset in copydataRows to start inserting data
+ * @param maxRows end processing when rowOffset reaches this value (presumably copydataRows.length)
+ * @throws IOException from low-level stream
+ * @throws SQLException from handler
+ * @return int rowOffset current number of rows in copydataRows
+ */
+ private int processCopyResults(ResultHandler handler, byte[][] copydataRows, int rowOffset, int maxRows) throws IOException, SQLException {
+ // do not check internal state to ensure we can always resolve from copy mode.
+
+ if(copydataRows!=null && maxRows<=rowOffset) // otherwise this state would drop a row
+ throw new PSQLException(GT.tr("Requested less than one row of copydata"), PSQLState.INVALID_PARAMETER_VALUE);
+
+ boolean endReceiving = false;
+ int l_len;
+ while(!endReceiving) {
+ int c = pgStream.ReceiveChar();
+ switch(c) {
+ case 'A': // Asynchronous Notify
+ receiveAsyncNotify();
+ break;
+
+ case 'C': // Command Complete
+ String status = receiveCommandStatus();
+
+ if(currentCopyState != CopyState.DONE)
+ throw new PSQLException(GT.tr("Unexpected command completion while copying"), PSQLState.PROTOCOL_VIOLATION);
+
+ int count = status.startsWith("COPY ") ? Integer.parseInt(status.substring(1 + status.lastIndexOf(' '))) : -1;
+ handler.handleCommandStatus( status, count, 0 );
+ break;
+
+ case 'N': // Notice Response
+ handler.handleWarning(receiveNoticeResponse());
+ break;
+
+ case 'E': // ErrorMessage (expected response to CopyFail)
+ handler.handleError(receiveErrorResponse());
+ break;
+
+ case 'G': // CopyInResponse
+ currentCopyState = CopyState.INTODB;
+ receiveCopyHeader(handler); // discarded as unnecessary: application is responsible of stream structure
+ handler.handleCommandStatus( "COPY IN", -1, 0 );
+ endReceiving = true;
+ break;
+
+ case 'H': // CopyOutResponse
+ currentCopyState = CopyState.OUTOFDB;
+ receiveCopyHeader(handler); // discarded as unnecessary: application is responsible of stream structure
+ handler.handleCommandStatus( "COPY OUT", -1, 0 );
+ endReceiving = true;
+ break;
+
+ case 'd': // CopyData
+ if (logger.logDebug())
+ logger.debug(" <=BE CopyData");
+
+ l_len = pgStream.ReceiveIntegerR(4) - 4;
+ byte[] buf = new byte[l_len];
+ pgStream.Receive(buf, 0, l_len);
+ if(copydataRows != null) {
+ copydataRows[rowOffset++] = buf;
+ endReceiving = rowOffset >= maxRows;
+ } else if(currentCopyState == CopyState.FAIL) {
+ // discard (after fail or skip)
+ } else {
+ throw new PSQLException(GT.tr("Unexpected copydata package"), PSQLState.PROTOCOL_VIOLATION);
+ }
+ break;
+
+ case 'c': // CopyDone (expected after all copydata received)
+ currentCopyState = CopyState.DONE; // copy done, waiting for completion message
+ l_len = pgStream.ReceiveIntegerR(4) - 4;
+ if(l_len > 0)
+ pgStream.Receive(l_len);
+ if (logger.logDebug())
+ logger.debug(" <=BE CopyDone");
+
+ handler.handleCompletion();
+ break;
+
+ case 'Z': // ReadyForQuery: After FE:CopyDone => BE:CommandComplete
+ receiveRFQ();
+
+ if(currentCopyState != CopyState.DONE && currentCopyState != CopyState.FAIL)
+ throw new PSQLException(GT.tr("Server unexpectedly reset copying"), PSQLState.PROTOCOL_VIOLATION);
+
+ currentCopyState = CopyState.NONE; // copy completed
+ endReceiving = true;
+ break;
+
+ default:
+ currentCopyState = CopyState.NONE; // presumably we've returned from the copy subprotocol state.
+ throw new IOException("Unexpected packet type: " + c);
+ }
+ }
+
+ return rowOffset;
+ }
+
+ /**
+ * Used upon Copy in/out start signals from server to resolve copy format header
+ * @param handler the ResultHandler handling current query
+ * @return int[] with overall format followed by format of each field
+ * @throws IOException if reading from stream fails
+ */
+ private int[] receiveCopyHeader(ResultHandler handler) throws IOException {
+ int l_len = pgStream.ReceiveIntegerR(4) - (4+1+2);
+
+ if(l_len<0) // assert sanity
+ handler.handleError(new PSQLException(GT.tr("Too short copy header"), PSQLState.PROTOCOL_VIOLATION));
+
+ int copyFormat = pgStream.ReceiveIntegerR(1);
+ int numColumns = pgStream.ReceiveIntegerR(2);
+
+ if(l_len!=numColumns*2) // assert consistency
+ handler.handleError(new PSQLException(GT.tr("Inconsistent copy header length: " + Integer.toString(l_len) + " != " + Integer.toString(numColumns)), PSQLState.PROTOCOL_VIOLATION));
+
+ PSQLException err = null; // avoid repeated calls
+
+ int[] result = new int[numColumns+1];
+ result[0] = copyFormat;
+ for(int i=1; i<=numColumns; i++)
+ result[i] = pgStream.ReceiveIntegerR(2);
+
+ return result;
+ }
+
/*
* Send a query to the backend.
*/
***************
*** 1419,1425 ****
case 'd': // CopyData
{
// COPY FROM STDIN / COPY TO STDOUT, neither of which are currently
! // supported.
// CopyInResponse can only occur in response to an Execute we sent.
// Every Execute we send is followed by either a Bind or a ClosePortal,
--- 1676,1682 ----
case 'd': // CopyData
{
// COPY FROM STDIN / COPY TO STDOUT, neither of which are currently
! // supported by normal result processing.
// CopyInResponse can only occur in response to an Execute we sent.
// Every Execute we send is followed by either a Bind or a ClosePortal,
***************
*** 1430,1436 ****
/* discard */
pgStream.Receive(l_len);
! handler.handleError(new PSQLException(GT.tr("The driver currently does not support COPY operations."), PSQLState.NOT_IMPLEMENTED));
}
break;
--- 1687,1693 ----
/* discard */
pgStream.Receive(l_len);
! handler.handleError(new PSQLException(GT.tr("Use Copy extension for COPY operations."), PSQLState.NOT_IMPLEMENTED));
}
break;
***************
*** 1491,1496 ****
--- 1748,1754 ----
handler.handleCompletion();
}
+
/*
* Receive the field descriptions from the back end.
*/
diff -brcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/jdbc2/AbstractJdbc2Connection.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/jdbc2/AbstractJdbc2Connection.java
*** postgresql-jdbc-8.2-505.src.orig/org/postgresql/jdbc2/AbstractJdbc2Connection.java 2007-04-16 21:31:44.000000000 +0300
--- postgresql-jdbc-8.2-505.src.copy/org/postgresql/jdbc2/AbstractJdbc2Connection.java 2007-06-11 17:58:32.000000000 +0300
***************
*** 23,28 ****
--- 23,29 ----
import org.postgresql.util.PGobject;
import org.postgresql.util.PSQLException;
import org.postgresql.util.GT;
+ import org.postgresql.copy.CopyManager;
/**
* This class defines methods of the jdbc2 specification.
***************
*** 1090,1093 ****
--- 1091,1102 ----
{
return bindStringAsVarchar;
}
+
+ private CopyManager copyManager = null;
+ public CopyManager getCopyAPI() throws SQLException
+ {
+ if (copyManager == null)
+ copyManager = new CopyManager(this);
+ return copyManager;
+ }
}
diff -brcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/test/copy/CopyTest.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/test/copy/CopyTest.java
*** postgresql-jdbc-8.2-505.src.orig/org/postgresql/test/copy/CopyTest.java 1970-01-01 02:00:00.000000000 +0200
--- postgresql-jdbc-8.2-505.src.copy/org/postgresql/test/copy/CopyTest.java 2007-06-11 17:58:32.000000000 +0300
***************
*** 0 ****
--- 1,152 ----
+ /**
+ *
+ */
+ package org.postgresql.test.copy;
+
+ import java.sql.Connection;
+ import java.sql.ResultSet;
+ import java.sql.SQLException;
+ import java.sql.Statement;
+
+ import junit.framework.TestCase;
+
+ import org.postgresql.PGConnection;
+ import org.postgresql.copy.CopyManager;
+ import org.postgresql.test.TestUtil;
+
+ /**
+ * @author kato@iki.fi
+ *
+ */
+ public class CopyTest extends TestCase {
+
+ private Connection con;
+ private Statement stmt;
+ private CopyManager copyAPI;
+ private CopyManager.CopyIntoDB writer;
+ private CopyManager.CopyOutOfDB reader;
+ private byte[][] data = {
+ "First Row\t1\t1.10\n".getBytes(), // 0 required to match DB output for numeric(5,2)
+ "Second Row\t2\t2.20\n".getBytes() // 0 required to match DB output for numeric(5,2)
+ };
+
+ public CopyTest(String name) {
+ super(name);
+ }
+
+ /* (non-Javadoc)
+ * @see junit.framework.TestCase#setUp()
+ */
+ protected void setUp() throws Exception {
+ con = TestUtil.openDB();
+ stmt = con.createStatement();
+
+ // Drop the test table if it already exists for some
+ // reason. It is not an error if it doesn't exist.
+ try {
+ stmt.executeUpdate("DROP TABLE copytest");
+ } catch (SQLException e) {
+ // Intentionally ignore. We cannot distinguish
+ // "table does not exist" from other errors, since
+ // PostgreSQL doesn't support error codes yet.
+ }
+
+ stmt.executeUpdate(
+ "CREATE TABLE copytest(stringvalue text, intvalue int, numvalue numeric(5,2))");
+
+ copyAPI = ((PGConnection)con).getCopyAPI();
+ }
+
+ /* (non-Javadoc)
+ * @see junit.framework.TestCase#tearDown()
+ */
+ protected void tearDown() throws Exception {
+ con.setAutoCommit(true);
+ if (stmt != null) {
+ stmt.executeUpdate("DROP TABLE copytest");
+ stmt.close();
+ }
+ if (con != null) {
+ TestUtil.closeDB(con);
+ }
+ }
+
+ private int getCount() throws SQLException {
+ if(stmt==null)
+ stmt = con.createStatement();
+ ResultSet rs = stmt.executeQuery("SELECT count(*) FROM copytest");
+ rs.next();
+ int result = rs.getInt(1);
+ rs.close();
+ return result;
+ }
+
+ public void testCopyIntoDBfail() {
+ String sql = "COPY copytest FROM STDIN";
+ String at = "init";
+ int rowCount = -1;
+ try {
+ writer = copyAPI.copyIntoDB(sql);
+ at = "start";
+ writer.start();
+ at = "write";
+ writer.write( data, 0, data.length);
+ at = "fail";
+ writer.fail("Test copy failing".getBytes());
+ at = "using connection after writing copy";
+ rowCount = getCount();
+ } catch(Exception e) {
+ fail("copyIntoDB at " + at + ": " + e.toString());
+ }
+ assertEquals(0, rowCount);
+ }
+
+ public void testCopyIntoDBsucceed() {
+ String sql = "COPY copytest FROM STDIN";
+ String at = "init";
+ int rowCount = -1;
+ try {
+ writer = copyAPI.copyIntoDB(sql);
+ at = "start";
+ writer.start();
+ at = "write";
+ writer.write( data, 0, data.length);
+ at = "done";
+ writer.done();
+ at = "using connection after writing copy";
+ rowCount = getCount();
+ } catch(Exception e) {
+ fail("copyIntoDB at " + at + ": " + e.toString());
+ }
+ assertEquals(data.length, rowCount);
+ }
+
+ public void testCopyOutOfDB() {
+ String sql = "COPY copytest TO STDOUT";
+ byte[][] copydata = new byte[data.length][];
+ String at = "init";
+
+ try {
+ if(getCount()==0)
+ testCopyIntoDBsucceed(); // ensure we have some data.
+ reader = copyAPI.copyOutOfDB(sql);
+ at = "start";
+ reader.start();
+ at = "read";
+ reader.read( copydata, 0, copydata.length);
+ at = "close";
+ reader.close();
+ at = "using connection after reading copy";
+ getCount();
+ // deep comparison of data written and read
+ at = "comparison of written and read data";
+ for(int i=0; i