diff -brcN postgresql-jdbc-8.2-505.src.orig/build.xml postgresql-jdbc-8.2-505.src.copy/build.xml *** postgresql-jdbc-8.2-505.src.orig/build.xml 2006-11-29 06:00:15.000000000 +0200 --- postgresql-jdbc-8.2-505.src.copy/build.xml 2007-06-11 17:58:32.000000000 +0300 *************** *** 84,89 **** --- 84,97 ---- + + + + + + + + *************** *** 116,121 **** --- 124,130 ---- + *************** *** 393,398 **** --- 402,408 ---- + *************** *** 428,433 **** --- 438,444 ---- + diff -brcN postgresql-jdbc-8.2-505.src.orig/doc/pgjdbc.xml postgresql-jdbc-8.2-505.src.copy/doc/pgjdbc.xml *** postgresql-jdbc-8.2-505.src.orig/doc/pgjdbc.xml 2007-02-19 08:04:48.000000000 +0200 --- postgresql-jdbc-8.2-505.src.copy/doc/pgjdbc.xml 2007-06-11 17:58:33.000000000 +0300 *************** *** 2481,2486 **** --- 2481,2526 ---- + + Copy + + Bulk data transfer with INSERT or SELECT + can be quite slow for large amounts of data. + PostgreSQL provides a special SQL + statement COPY for this purpose. + It can be used either to exchange data between a file and a table, which requires + superuser privileges to access the file on database server, or between the client and + a table, which requires special copy subprotocol support from the database driver. + Latter is available as an extension of this JDBC driver. + + + + You should pay attention to always end your copy operation explicitly with + close (end reading), done (end writing) + or fail (cancel writing). Otherwise your database connection + will not return from copy subprotocol to normal operation. + + + + // get hold of the extension interface + import org.postgresql.copy.CopyManager; + CopyManager copier = ((org.postgresql.PGConnection)conn).getCopyAPI(); + + // write some data to a fictional (varchar,int) table + CopyManager.CopyIntoDB copywriter = copier.copyIntoDB("COPY mytable FROM STDIN"); + copywriter.start(); + copywriter.write( new byte[][]{{"row\t123\n".getBytes()}}, 0, 1 ); + copywriter.done(); + + // read data from a fictional table + CopyManager.CopyOutOfDB copyreader = copier.copyOutOfDB("COPY mytable TO STDOUT"); + byte[][] result = new byte[1][]; + copyreader.read( result, 0, 1 ); + copyreader.close(); + + + + diff -brcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/PGConnection.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/PGConnection.java *** postgresql-jdbc-8.2-505.src.orig/org/postgresql/PGConnection.java 2005-04-20 03:10:58.000000000 +0300 --- postgresql-jdbc-8.2-505.src.copy/org/postgresql/PGConnection.java 2007-06-11 17:58:32.000000000 +0300 *************** *** 11,16 **** --- 11,17 ---- import java.sql.*; import org.postgresql.core.Encoding; + import org.postgresql.copy.CopyManager; import org.postgresql.fastpath.Fastpath; import org.postgresql.largeobject.LargeObjectManager; *************** *** 30,35 **** --- 31,43 ---- public PGNotification[] getNotifications() throws SQLException; /** + * This returns the COPY API for the current connection. + * @since 8.2 + * @since 7.5 + */ + public CopyManager getCopyAPI() throws SQLException; + + /** * This returns the LargeObject API for the current connection. * @since 7.3 */ diff -brcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopyManager.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopyManager.java *** postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopyManager.java 1970-01-01 02:00:00.000000000 +0200 --- postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopyManager.java 2007-06-11 22:40:25.000000000 +0300 *************** *** 0 **** --- 1,249 ---- + package org.postgresql.copy; + + import java.io.IOException; + import java.sql.SQLException; + import java.sql.SQLWarning; + import java.util.Vector; + + import org.postgresql.core.*; + import org.postgresql.util.*; + import org.postgresql.core.v3.QueryExecutorImpl; // COPY is only implemented for protocol v3 + + /** + * API for PostgreSQL protocol version 3 COPY block data transfer + * @author kato@iki.fi + * @since 8.2 + */ + public class CopyManager { + private final QueryExecutorImpl queryExecutor; + private final Logger logger; + + public CopyManager(BaseConnection connection) { + this.queryExecutor = (QueryExecutorImpl)connection.getQueryExecutor(); + this.logger = connection.getLogger(); + } + + /** + * Interface common to both copiers + */ + + public interface Copier { + /** + * Issue the statement to start copying + * @throws SQLException if starting fails + */ + void start() throws SQLException; + + /** + * @return current copy mode; see QueryExecutor.CopyState + */ + int getState(); + } + + /** + * Reader returned from CopyManager.copyOutOfDB() for a COPY TO STDOUT statement. + * You may discard of it once you have read all the input. + * @author kato@iki.fi + * @since 8.2 + */ + public interface CopyOutOfDB extends Copier { + + /** + * Read data rows from database statement COPY TO STDOUT. + * @param copydataRows reusable array for storing received data rows + * @param rowOffset index to start filling input array at; usually zero + * @param maxRows index to stop filling input array at; usually copydataRows.length + * @return Number of rows now in input array; copy is completed when it is less than maxRows. + * @throws IOException from low level database connection + * @throws SQLException on miscommunication with the database server + */ + int read(byte[][] copydataRows, int rowOffset, int maxRows) throws IOException, SQLException; + + /** + * Close explicitly when you don't want to read any more data. + * @throws SQLException on miscommunication with the database server + * @throws IOException from low level database connection + */ + void close() throws IOException, SQLException; + } + + /** + * Writer returned from CopyManager.copyIntoDB() for a COPY FROM STDIN statement. + * You must call one of the finishing methods to return to normal connection. + * @author kato@iki.fi + * @since 8.2 + */ + public interface CopyIntoDB extends Copier { + + /** + * Write data for database statement COPY FROM STDIN. + * @param copydataRows reusable array of chunks of data + * @param rowOffset index at output array to start feeding from; usually zero + * @param maxRows index at output array to end feeding at; usually copydataRows.length + * @throws IOException from low level database connection + * @throws SQLException on miscommunication with the database server or running out of space + */ + void write(byte[][] copydataRows, int rowOffset, int maxRows) throws IOException, SQLException; + + /** + * Finish feeding data and save it to the table. + * @throws SQLException on miscommunication with the database server or running out of space + * @throws IOException from low level database connection + */ + void done() throws IOException, SQLException; + + /** + * Cancel feeding data and discard changes. + * @throws SQLException on miscommunication with the database server + * @throws IOException from low level database connection + */ + void fail(byte[] logErrorMessage) throws IOException, SQLException; + + /** + * Only returns valid number after succesful completion with PostgreSQL server versions 8.2 and above. + * @return number of rows updated + */ + int getUpdateCount(); + } + + /** + * Prepares a copy from database. + * @param sql the COPY TO STDOUT command + * @return reader of copy data + * @throws SQLException if execution of the initial statement fails + */ + public CopyOutOfDB copyOutOfDB(String sql) throws SQLException { + return new CopyOutOfDBHandler(sql); + } + + /** + * Prepares a copy into database. + * @param sql the COPY FROM STDIN command. + * @return writer of copy data + * @throws SQLException if execution of the initial statement fails + */ + public CopyIntoDB copyIntoDB(String sql) throws SQLException { + return new CopyIntoDBHandler(sql); + } + + // + // Internals + // + + private abstract class CopierImpl { + String sql; + CopyResultHandler handler; + + public void start() throws SQLException { + if(queryExecutor.getCopyState() != CopyState.NONE) + throw new PSQLException(GT.tr("Tried to initiate simultaneous copy operations over single connection"), PSQLState.PROTOCOL_VIOLATION); + + // queryExecutor.execute(queryExecutor.createSimpleQuery(sql), null, handler, 0, 0, QueryExecutor.QUERY_NO_RESULTS | QueryExecutor.QUERY_ONESHOT); + try { + queryExecutor.startCopy(handler, sql); + } catch(IOException ioe) { + throw new PSQLException(GT.tr("Connection failed during copy start"), PSQLState.CONNECTION_FAILURE_DURING_TRANSACTION, ioe); + } + } + + abstract class CopyResultHandler implements ResultHandler { + + public void handleError(SQLException error) { + throw new RuntimeException(error); + } + + public void handleResultRows(Query fromQuery, Field[] fields, Vector tuples, ResultCursor cursor) { + throw new RuntimeException("Normal result rows handed to copy handler"); + } + + public void handleWarning(SQLWarning warning) { + logger.info("Copy handler received an unexpected warning", warning); + } + } + + public int getState() { + return queryExecutor.getCopyState(); + } + } + + private class CopyIntoDBHandler extends CopierImpl implements CopyIntoDB { + int updateCount = 0; + + private class WriteHandler extends CopierImpl.CopyResultHandler { + + public void handleCommandStatus(String status, int rowCount, long insertOID) { + if(getState() == CopyState.NONE && status.startsWith("COPY IN ")) { + // here we could check the format returned by server, but it's really up to the user to conform to the format he specified + } else if(getState() == CopyState.DONE && status.startsWith("COPY")) { + if(rowCount>0) // only with server 8.2+ + updateCount += rowCount; + } else { + logger.info("Unexpected command status in copywriter state " + Integer.toString(getState()) + ": " + status); + } + } + + public void handleCompletion() throws SQLException { + if(getState() != CopyState.DONE ) + logger.info("Unexpected completion of copywriting in state " + Integer.toString(getState())); + } + + public void handleError(SQLException error) { + if(getState() != CopyState.FAIL) // It is normal to get an error response as a result to CopyFail + super.handleError(error); + } + } + + protected CopyIntoDBHandler(String sql) throws SQLException { + this.sql = sql; + handler = new WriteHandler(); + } + + public void done() throws IOException, SQLException { + queryExecutor.endCopy(handler); + } + + public void fail(byte[] logErrorMessage) throws IOException, SQLException { + queryExecutor.failCopy(handler, logErrorMessage); + } + + public void write(byte[][] copydataRows, int rowOffset, int maxRows) throws IOException, SQLException { + queryExecutor.sendCopydata(handler, copydataRows, rowOffset, maxRows); + } + + public int getUpdateCount() { + return updateCount; + } + } + + private class CopyOutOfDBHandler extends CopierImpl implements CopyOutOfDB { + + private class ReadHandler extends CopierImpl.CopyResultHandler { + public void handleCommandStatus(String status, int updateCount, long insertOID) { + if(getState() == CopyState.NONE && status.startsWith("COPY OUT ")) { + // here we could check the format returned by server, but it's really up to the user to conform to the format he specified + } else if(getState() == CopyState.DONE && status.startsWith("COPY")) { + // ok, nothing to do + } else { + logger.info("Unexpected command status in copyreader state " + Integer.toString(getState()) + ": " + status); + } + } + + public void handleCompletion() throws SQLException { // called upon CopyDone, ie. all rows received + // well... it's perfectly ok! + } + } + + protected CopyOutOfDBHandler(String sql) throws SQLException { + this.sql = sql; + handler = new ReadHandler(); + } + + public void close() throws IOException, SQLException { + queryExecutor.skipCopy(handler); + } + + public int read(byte[][] copydataRows, int rowOffset, int maxRows) throws IOException, SQLException { + return queryExecutor.receiveCopydata(handler, copydataRows, rowOffset, maxRows); + } + } + } diff -brcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopyState.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopyState.java *** postgresql-jdbc-8.2-505.src.orig/org/postgresql/copy/CopyState.java 1970-01-01 02:00:00.000000000 +0200 --- postgresql-jdbc-8.2-505.src.copy/org/postgresql/copy/CopyState.java 2007-06-11 17:58:32.000000000 +0300 *************** *** 0 **** --- 1,13 ---- + package org.postgresql.copy; + + /** + * Actually just an ENUM: state of the copy machine + */ + public class CopyState { + public static final int NONE = 0; + public static final int INTODB = 1; + public static final int OUTOFDB = 2; + public static final int DONE = 3; + public static final int FAIL = 4; + } + diff -brcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/core/v3/QueryExecutorImpl.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/core/v3/QueryExecutorImpl.java *** postgresql-jdbc-8.2-505.src.orig/org/postgresql/core/v3/QueryExecutorImpl.java 2006-12-01 10:53:45.000000000 +0200 --- postgresql-jdbc-8.2-505.src.copy/org/postgresql/core/v3/QueryExecutorImpl.java 2007-06-11 17:58:32.000000000 +0300 *************** *** 27,32 **** --- 27,33 ---- import org.postgresql.util.PSQLState; import org.postgresql.util.ServerErrorMessage; import org.postgresql.util.GT; + import org.postgresql.copy.CopyState; /** * QueryExecutor implementation for the V3 protocol. *************** *** 630,635 **** --- 631,892 ---- return returnValue; } + // + // Copy subprotocol implementation + // + + private static int currentCopyState = CopyState.NONE; + + public int getCopyState() { + return currentCopyState; + } + + /** + * startCopy starts a raw query (presumably COPY ...) + * @param handler associated result handler + * @param sql the statement to send + * @throws IOException passed from lower level + * @throws SQLException on communications mismatch with server + */ + public void startCopy(ResultHandler handler, String sql) throws IOException, SQLException { + byte message[] = sql.getBytes(); + int messageSize = 4 + message.length + 1; + pgStream.SendChar('Q'); + pgStream.SendInteger4(messageSize); + pgStream.Send(message); + pgStream.SendChar(0); + pgStream.flush(); + processCopyResults(handler, null, -1, 0); + } + + /** + * receiveCopydata is called to read copy data from server + * @param handler the ResultHandler overseeing this operation, notably handleCompletion() + * @param copydataRows buffer for rows to read + * @param rowOffset index to start inserting data at in buffer (0 = beginning) + * @param maxRows return after reading up to this index in buffer (copydataRows.length = fill it up) + * @return int rowOffset; if equal to maxRows, there is more data to read; otherwise end of copy or error. + * @throws IOException passed from low-level connection + * @throws SQLException passed from handler.handleCompletion() + */ + public int receiveCopydata(ResultHandler handler, byte[][] copydataRows, int rowOffset, int maxRows) throws IOException, SQLException { + if(currentCopyState != CopyState.OUTOFDB) + throw new PSQLException(GT.tr("Attempted to receive copydata while not in COPY TO STDIN"), PSQLState.PROTOCOL_VIOLATION); + return processCopyResults(handler, copydataRows, rowOffset, maxRows); + } + + /** + * sendCopydata sends raw copydata to backend + * @param handler ResultHandler for state changes (currently unused) + * @param copydataRows array of copydata rows as bytearrays to send + * @param rowOffset how many rows to skip from beginning of copydataRows (0 to send all from beginning) + * @param maxRows index of first row not to send (copydataRows.length to send all up to end) + * @throws IOException upon connection failure + */ + public void sendCopydata(ResultHandler handler, byte[][] copydataRows, int rowOffset, int maxRows) throws IOException, SQLException { + if(currentCopyState != CopyState.INTODB) + throw new PSQLException(GT.tr("Attempted to send copydata while not in COPY FROM STDOUT"), PSQLState.PROTOCOL_VIOLATION); + while(rowOffset CopyDone"); + + pgStream.SendChar('c'); // CopyDone + pgStream.SendInteger4(4); // No message in specification + pgStream.flush(); + + currentCopyState = CopyState.DONE; + processCopyResults(handler, null, 0, 0); + } + + /** + * failCopy is used to cancel copydata mode discarding sent data. + * @param handler the ResultHandler especially handling the ErrorMessage as expected correct response + * @param logErrorMessage optional message for backend to log + * @throws IOException passed from low-level connection + * @throws SQLException passed from handler + */ + public void failCopy(ResultHandler handler, byte[] logErrorMessage) throws IOException, SQLException { + // internal state not checked to ensure that a copy can always be cancelled. + pgStream.SendChar('f'); + pgStream.SendInteger4(4+logErrorMessage.length+1); + if(logErrorMessage.length > 0) + pgStream.Send(logErrorMessage); + pgStream.SendChar('\0'); + pgStream.flush(); + + currentCopyState = CopyState.FAIL; + + processCopyResults(handler, null, 0, 0); + } + + + /** + * Skips rest of copydata from server + * @param handler the ResultHandler overseeing this operation + * @throws IOException passed from low-level connection + * @throws SQLException upon internal or communications inconsistency + */ + public void skipCopy(ResultHandler handler) throws IOException, SQLException { + if(currentCopyState == CopyState.OUTOFDB) { + currentCopyState = CopyState.FAIL; + processCopyResults(handler, null, 0, 0); + } + } + + /** + * processCopyResults handles copy subprotocol messages from server. + * @param handler the ResultHandler overseeing this operation, especially errors or completion. + * @param copydataRows array of bytearrays of data read from server, null when not expecting input + * @param rowOffset offset in copydataRows to start inserting data + * @param maxRows end processing when rowOffset reaches this value (presumably copydataRows.length) + * @throws IOException from low-level stream + * @throws SQLException from handler + * @return int rowOffset current number of rows in copydataRows + */ + private int processCopyResults(ResultHandler handler, byte[][] copydataRows, int rowOffset, int maxRows) throws IOException, SQLException { + // do not check internal state to ensure we can always resolve from copy mode. + + if(copydataRows!=null && maxRows<=rowOffset) // otherwise this state would drop a row + throw new PSQLException(GT.tr("Requested less than one row of copydata"), PSQLState.INVALID_PARAMETER_VALUE); + + boolean endReceiving = false; + int l_len; + while(!endReceiving) { + int c = pgStream.ReceiveChar(); + switch(c) { + case 'A': // Asynchronous Notify + receiveAsyncNotify(); + break; + + case 'C': // Command Complete + String status = receiveCommandStatus(); + + if(currentCopyState != CopyState.DONE) + throw new PSQLException(GT.tr("Unexpected command completion while copying"), PSQLState.PROTOCOL_VIOLATION); + + int count = status.startsWith("COPY ") ? Integer.parseInt(status.substring(1 + status.lastIndexOf(' '))) : -1; + handler.handleCommandStatus( status, count, 0 ); + break; + + case 'N': // Notice Response + handler.handleWarning(receiveNoticeResponse()); + break; + + case 'E': // ErrorMessage (expected response to CopyFail) + handler.handleError(receiveErrorResponse()); + break; + + case 'G': // CopyInResponse + currentCopyState = CopyState.INTODB; + receiveCopyHeader(handler); // discarded as unnecessary: application is responsible of stream structure + handler.handleCommandStatus( "COPY IN", -1, 0 ); + endReceiving = true; + break; + + case 'H': // CopyOutResponse + currentCopyState = CopyState.OUTOFDB; + receiveCopyHeader(handler); // discarded as unnecessary: application is responsible of stream structure + handler.handleCommandStatus( "COPY OUT", -1, 0 ); + endReceiving = true; + break; + + case 'd': // CopyData + if (logger.logDebug()) + logger.debug(" <=BE CopyData"); + + l_len = pgStream.ReceiveIntegerR(4) - 4; + byte[] buf = new byte[l_len]; + pgStream.Receive(buf, 0, l_len); + if(copydataRows != null) { + copydataRows[rowOffset++] = buf; + endReceiving = rowOffset >= maxRows; + } else if(currentCopyState == CopyState.FAIL) { + // discard (after fail or skip) + } else { + throw new PSQLException(GT.tr("Unexpected copydata package"), PSQLState.PROTOCOL_VIOLATION); + } + break; + + case 'c': // CopyDone (expected after all copydata received) + currentCopyState = CopyState.DONE; // copy done, waiting for completion message + l_len = pgStream.ReceiveIntegerR(4) - 4; + if(l_len > 0) + pgStream.Receive(l_len); + if (logger.logDebug()) + logger.debug(" <=BE CopyDone"); + + handler.handleCompletion(); + break; + + case 'Z': // ReadyForQuery: After FE:CopyDone => BE:CommandComplete + receiveRFQ(); + + if(currentCopyState != CopyState.DONE && currentCopyState != CopyState.FAIL) + throw new PSQLException(GT.tr("Server unexpectedly reset copying"), PSQLState.PROTOCOL_VIOLATION); + + currentCopyState = CopyState.NONE; // copy completed + endReceiving = true; + break; + + default: + currentCopyState = CopyState.NONE; // presumably we've returned from the copy subprotocol state. + throw new IOException("Unexpected packet type: " + c); + } + } + + return rowOffset; + } + + /** + * Used upon Copy in/out start signals from server to resolve copy format header + * @param handler the ResultHandler handling current query + * @return int[] with overall format followed by format of each field + * @throws IOException if reading from stream fails + */ + private int[] receiveCopyHeader(ResultHandler handler) throws IOException { + int l_len = pgStream.ReceiveIntegerR(4) - (4+1+2); + + if(l_len<0) // assert sanity + handler.handleError(new PSQLException(GT.tr("Too short copy header"), PSQLState.PROTOCOL_VIOLATION)); + + int copyFormat = pgStream.ReceiveIntegerR(1); + int numColumns = pgStream.ReceiveIntegerR(2); + + if(l_len!=numColumns*2) // assert consistency + handler.handleError(new PSQLException(GT.tr("Inconsistent copy header length: " + Integer.toString(l_len) + " != " + Integer.toString(numColumns)), PSQLState.PROTOCOL_VIOLATION)); + + PSQLException err = null; // avoid repeated calls + + int[] result = new int[numColumns+1]; + result[0] = copyFormat; + for(int i=1; i<=numColumns; i++) + result[i] = pgStream.ReceiveIntegerR(2); + + return result; + } + /* * Send a query to the backend. */ *************** *** 1419,1425 **** case 'd': // CopyData { // COPY FROM STDIN / COPY TO STDOUT, neither of which are currently ! // supported. // CopyInResponse can only occur in response to an Execute we sent. // Every Execute we send is followed by either a Bind or a ClosePortal, --- 1676,1682 ---- case 'd': // CopyData { // COPY FROM STDIN / COPY TO STDOUT, neither of which are currently ! // supported by normal result processing. // CopyInResponse can only occur in response to an Execute we sent. // Every Execute we send is followed by either a Bind or a ClosePortal, *************** *** 1430,1436 **** /* discard */ pgStream.Receive(l_len); ! handler.handleError(new PSQLException(GT.tr("The driver currently does not support COPY operations."), PSQLState.NOT_IMPLEMENTED)); } break; --- 1687,1693 ---- /* discard */ pgStream.Receive(l_len); ! handler.handleError(new PSQLException(GT.tr("Use Copy extension for COPY operations."), PSQLState.NOT_IMPLEMENTED)); } break; *************** *** 1491,1496 **** --- 1748,1754 ---- handler.handleCompletion(); } + /* * Receive the field descriptions from the back end. */ diff -brcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/jdbc2/AbstractJdbc2Connection.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/jdbc2/AbstractJdbc2Connection.java *** postgresql-jdbc-8.2-505.src.orig/org/postgresql/jdbc2/AbstractJdbc2Connection.java 2007-04-16 21:31:44.000000000 +0300 --- postgresql-jdbc-8.2-505.src.copy/org/postgresql/jdbc2/AbstractJdbc2Connection.java 2007-06-11 17:58:32.000000000 +0300 *************** *** 23,28 **** --- 23,29 ---- import org.postgresql.util.PGobject; import org.postgresql.util.PSQLException; import org.postgresql.util.GT; + import org.postgresql.copy.CopyManager; /** * This class defines methods of the jdbc2 specification. *************** *** 1090,1093 **** --- 1091,1102 ---- { return bindStringAsVarchar; } + + private CopyManager copyManager = null; + public CopyManager getCopyAPI() throws SQLException + { + if (copyManager == null) + copyManager = new CopyManager(this); + return copyManager; + } } diff -brcN postgresql-jdbc-8.2-505.src.orig/org/postgresql/test/copy/CopyTest.java postgresql-jdbc-8.2-505.src.copy/org/postgresql/test/copy/CopyTest.java *** postgresql-jdbc-8.2-505.src.orig/org/postgresql/test/copy/CopyTest.java 1970-01-01 02:00:00.000000000 +0200 --- postgresql-jdbc-8.2-505.src.copy/org/postgresql/test/copy/CopyTest.java 2007-06-11 17:58:32.000000000 +0300 *************** *** 0 **** --- 1,152 ---- + /** + * + */ + package org.postgresql.test.copy; + + import java.sql.Connection; + import java.sql.ResultSet; + import java.sql.SQLException; + import java.sql.Statement; + + import junit.framework.TestCase; + + import org.postgresql.PGConnection; + import org.postgresql.copy.CopyManager; + import org.postgresql.test.TestUtil; + + /** + * @author kato@iki.fi + * + */ + public class CopyTest extends TestCase { + + private Connection con; + private Statement stmt; + private CopyManager copyAPI; + private CopyManager.CopyIntoDB writer; + private CopyManager.CopyOutOfDB reader; + private byte[][] data = { + "First Row\t1\t1.10\n".getBytes(), // 0 required to match DB output for numeric(5,2) + "Second Row\t2\t2.20\n".getBytes() // 0 required to match DB output for numeric(5,2) + }; + + public CopyTest(String name) { + super(name); + } + + /* (non-Javadoc) + * @see junit.framework.TestCase#setUp() + */ + protected void setUp() throws Exception { + con = TestUtil.openDB(); + stmt = con.createStatement(); + + // Drop the test table if it already exists for some + // reason. It is not an error if it doesn't exist. + try { + stmt.executeUpdate("DROP TABLE copytest"); + } catch (SQLException e) { + // Intentionally ignore. We cannot distinguish + // "table does not exist" from other errors, since + // PostgreSQL doesn't support error codes yet. + } + + stmt.executeUpdate( + "CREATE TABLE copytest(stringvalue text, intvalue int, numvalue numeric(5,2))"); + + copyAPI = ((PGConnection)con).getCopyAPI(); + } + + /* (non-Javadoc) + * @see junit.framework.TestCase#tearDown() + */ + protected void tearDown() throws Exception { + con.setAutoCommit(true); + if (stmt != null) { + stmt.executeUpdate("DROP TABLE copytest"); + stmt.close(); + } + if (con != null) { + TestUtil.closeDB(con); + } + } + + private int getCount() throws SQLException { + if(stmt==null) + stmt = con.createStatement(); + ResultSet rs = stmt.executeQuery("SELECT count(*) FROM copytest"); + rs.next(); + int result = rs.getInt(1); + rs.close(); + return result; + } + + public void testCopyIntoDBfail() { + String sql = "COPY copytest FROM STDIN"; + String at = "init"; + int rowCount = -1; + try { + writer = copyAPI.copyIntoDB(sql); + at = "start"; + writer.start(); + at = "write"; + writer.write( data, 0, data.length); + at = "fail"; + writer.fail("Test copy failing".getBytes()); + at = "using connection after writing copy"; + rowCount = getCount(); + } catch(Exception e) { + fail("copyIntoDB at " + at + ": " + e.toString()); + } + assertEquals(0, rowCount); + } + + public void testCopyIntoDBsucceed() { + String sql = "COPY copytest FROM STDIN"; + String at = "init"; + int rowCount = -1; + try { + writer = copyAPI.copyIntoDB(sql); + at = "start"; + writer.start(); + at = "write"; + writer.write( data, 0, data.length); + at = "done"; + writer.done(); + at = "using connection after writing copy"; + rowCount = getCount(); + } catch(Exception e) { + fail("copyIntoDB at " + at + ": " + e.toString()); + } + assertEquals(data.length, rowCount); + } + + public void testCopyOutOfDB() { + String sql = "COPY copytest TO STDOUT"; + byte[][] copydata = new byte[data.length][]; + String at = "init"; + + try { + if(getCount()==0) + testCopyIntoDBsucceed(); // ensure we have some data. + reader = copyAPI.copyOutOfDB(sql); + at = "start"; + reader.start(); + at = "read"; + reader.read( copydata, 0, copydata.length); + at = "close"; + reader.close(); + at = "using connection after reading copy"; + getCount(); + // deep comparison of data written and read + at = "comparison of written and read data"; + for(int i=0; i