THRIFT-377. java: TFileTransport port in Java
This patch adds TFileTransport to the java library. This transport is not a general-purpose file transport; instead, it is more of a way to execute one-way RPC via an offline file process.
Patch: Joydeep Sen Sarma
git-svn-id: 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/org/apache/thrift/transport/ b/lib/java/src/org/apache/thrift/transport/
new file mode 100644
index 0000000..7a58eea
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/transport/
@@ -0,0 +1,130 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+ * FileProcessor: helps in processing files generated by TFileTransport.
+ * Port of original cpp implementation
+ *
+ * @author Joydeep Sen Sarma <>
+ */
+public class TFileProcessor {
+ private TProcessor processor_;
+ private TProtocolFactory inputProtocolFactory_;
+ private TProtocolFactory outputProtocolFactory_;
+ private TFileTransport inputTransport_;
+ private TTransport outputTransport_;
+ public TFileProcessor(TProcessor processor, TProtocolFactory protocolFactory,
+ TFileTransport inputTransport,
+ TTransport outputTransport) {
+ processor_ = processor;
+ inputProtocolFactory_ = outputProtocolFactory_ = protocolFactory;
+ inputTransport_ = inputTransport;
+ outputTransport_ = outputTransport;
+ }
+ public TFileProcessor(TProcessor processor,
+ TProtocolFactory inputProtocolFactory,
+ TProtocolFactory outputProtocolFactory,
+ TFileTransport inputTransport,
+ TTransport outputTransport) {
+ processor_ = processor;
+ inputProtocolFactory_ = inputProtocolFactory;
+ outputProtocolFactory_ = outputProtocolFactory;
+ inputTransport_ = inputTransport;
+ outputTransport_ = outputTransport;
+ }
+ private void processUntil(int lastChunk) throws TException {
+ TProtocol ip = inputProtocolFactory_.getProtocol(inputTransport_);
+ TProtocol op = outputProtocolFactory_.getProtocol(outputTransport_);
+ int curChunk = inputTransport_.getCurChunk();
+ try {
+ while (lastChunk >= curChunk) {
+ processor_.process(ip, op);
+ int newChunk = inputTransport_.getCurChunk();
+ curChunk = newChunk;
+ }
+ } catch (TTransportException e) {
+ // if we are processing the last chunk - we could have just hit EOF
+ // on EOF - trap the error and stop processing.
+ if(e.getType() != TTransportException.END_OF_FILE)
+ throw e;
+ else {
+ return;
+ }
+ }
+ }
+ /**
+ * Process from start to last chunk both inclusive where chunks begin from 0
+ * @param startChunkNum first chunk to be processed
+ * @param lastChunkNum last chunk to be processed
+ */
+ public void processChunk(int startChunkNum, int endChunkNum) throws TException {
+ int numChunks = inputTransport_.getNumChunks();
+ if(endChunkNum < 0)
+ endChunkNum += numChunks;
+ if(startChunkNum < 0)
+ startChunkNum += numChunks;
+ if(endChunkNum < startChunkNum)
+ throw new TException("endChunkNum " + endChunkNum + " is less than " + startChunkNum);
+ inputTransport_.seekToChunk(startChunkNum);
+ processUntil(endChunkNum);
+ }
+ /**
+ * Process a single chunk
+ *
+ * @param chunkNum chunk to be processed
+ */
+ public void processChunk(int chunkNum) throws TException {
+ processChunk(chunkNum, chunkNum);
+ }
+ /**
+ * Process a current chunk
+ */
+ public void processChunk() throws TException {
+ processChunk(inputTransport_.getCurChunk());
+ }
diff --git a/lib/java/src/org/apache/thrift/transport/ b/lib/java/src/org/apache/thrift/transport/
new file mode 100644
index 0000000..630fb99
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/transport/
@@ -0,0 +1,628 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+import java.util.Random;
+ * FileTransport implementation of the TTransport interface.
+ * Currently this is a straightforward port of the cpp implementation
+ *
+ * It may make better sense to provide a basic stream access on top of the framed file format
+ * The FileTransport can then be a user of this framed file format with some additional logic
+ * for chunking.
+ *
+ * @author Joydeep Sen Sarma <>
+ */
+public class TFileTransport extends TTransport {
+ public static class truncableBufferedInputStream extends BufferedInputStream {
+ public void trunc() {
+ pos = count = 0;
+ }
+ public truncableBufferedInputStream(InputStream in) {
+ super(in);
+ }
+ public truncableBufferedInputStream(InputStream in, int size) {
+ super(in, size);
+ }
+ }
+ public static class Event {
+ private byte[] buf_;
+ private int nread_;
+ private int navailable_;
+ /**
+ * Initialize an event. Initially, it has no valid contents
+ *
+ * @param buf byte array buffer to store event
+ */
+ public Event(byte[] buf) {
+ buf_ = buf;
+ nread_ = navailable_ = 0;
+ }
+ public byte[] getBuf() { return buf_;}
+ public int getSize() { return buf_.length; }
+ public void setAvailable(int sz) { nread_ = 0; navailable_=sz;}
+ public int getRemaining() { return (navailable_ - nread_); }
+ public int emit(byte[] buf, int offset, int ndesired) {
+ if((ndesired == 0) || (ndesired > getRemaining()))
+ ndesired = getRemaining();
+ if(ndesired <= 0)
+ return (ndesired);
+ System.arraycopy(buf_, nread_, buf, offset, ndesired);
+ nread_ += ndesired;
+ return(ndesired);
+ }
+ };
+ public static class chunkState {
+ /**
+ * Chunk Size. Must be same across all implementations
+ */
+ public static final int DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
+ private int chunk_size_ = DEFAULT_CHUNK_SIZE;
+ private long offset_ = 0;
+ public chunkState() {}
+ public chunkState(int chunk_size) { chunk_size_ = chunk_size; }
+ public void skip(int size) {offset_ += size; }
+ public void seek(long offset) {offset_ = offset;}
+ public int getChunkSize() { return chunk_size_;}
+ public int getChunkNum() { return ((int)(offset_/chunk_size_));}
+ public int getRemaining() { return (chunk_size_ - ((int)(offset_ % chunk_size_)));}
+ public long getOffset() { return (offset_);}
+ }
+ public static enum tailPolicy {
+ NOWAIT(0, 0),
+ WAIT_FOREVER(500, -1);
+ /**
+ * Time in milliseconds to sleep before next read
+ * If 0, no sleep
+ */
+ public final int timeout_;
+ /**
+ * Number of retries before giving up
+ * if 0, no retries
+ * if -1, retry forever
+ */
+ public final int retries_;
+ /**
+ * ctor for policy
+ *
+ * @param timeout sleep time for this particular policy
+ * @param retries number of retries
+ */
+ tailPolicy(int timeout, int retries) {
+ timeout_ = timeout;
+ retries_ = retries;
+ }
+ }
+ /**
+ * Current tailing policy
+ */
+ tailPolicy currentPolicy_ = tailPolicy.NOWAIT;
+ /**
+ * Underlying file being read
+ */
+ protected TSeekableFile inputFile_ = null;
+ /**
+ * Underlying outputStream
+ */
+ protected OutputStream outputStream_ = null;
+ /**
+ * Event currently read in
+ */
+ Event currentEvent_ = null;
+ /**
+ * InputStream currently being used for reading
+ */
+ InputStream inputStream_ = null;
+ /**
+ * current Chunk state
+ */
+ chunkState cs = null;
+ /**
+ * Read timeout
+ */
+ private int readTimeout_ = 0;
+ /**
+ * is read only?
+ */
+ private boolean readOnly_ = false;
+ /**
+ * Get File Tailing Policy
+ *
+ * @return current read policy
+ */
+ public tailPolicy getTailPolicy() {
+ return (currentPolicy_);
+ }
+ /**
+ * Set file Tailing Policy
+ *
+ * @param policy New policy to set
+ * @return Old policy
+ */
+ public tailPolicy setTailPolicy(tailPolicy policy) {
+ tailPolicy old = currentPolicy_;
+ currentPolicy_ = policy;
+ return (old);
+ }
+ /**
+ * Initialize read input stream
+ *
+ * @return input stream to read from file
+ */
+ private InputStream createInputStream() throws TTransportException {
+ InputStream is;
+ try {
+ if(inputStream_ != null) {
+ ((truncableBufferedInputStream)inputStream_).trunc();
+ is = inputStream_;
+ } else {
+ is = new truncableBufferedInputStream(inputFile_.getInputStream());
+ }
+ } catch (IOException iox) {
+ System.err.println("createInputStream: "+iox.getMessage());
+ throw new TTransportException(iox.getMessage(), iox);
+ }
+ return(is);
+ }
+ /**
+ * Read (potentially tailing) an input stream
+ *
+ * @param is InputStream to read from
+ * @param buf Buffer to read into
+ * @param off Offset in buffer to read into
+ * @param len Number of bytes to read
+ * @param tp policy to use if we hit EOF
+ *
+ * @return number of bytes read
+ */
+ private int tailRead(InputStream is, byte[] buf,
+ int off, int len, tailPolicy tp) throws TTransportException {
+ int orig_len = len;
+ try {
+ int retries = 0;
+ while(len > 0) {
+ int cnt =, off, len);
+ if(cnt > 0) {
+ off += cnt;
+ len -= cnt;
+ retries = 0;
+ cs.skip(cnt); // remember that we read so many bytes
+ } else if (cnt == -1) {
+ // EOF
+ retries++;
+ if((tp.retries_ != -1) && tp.retries_ < retries)
+ return (orig_len - len);
+ if(tp.timeout_ > 0) {
+ try {Thread.sleep(tp.timeout_);} catch(InterruptedException e) {}
+ }
+ } else {
+ // either non-zero or -1 is what the contract says!
+ throw new
+ TTransportException("Unexpected return from = "
+ + cnt);
+ }
+ }
+ } catch (IOException iox) {
+ throw new TTransportException(iox.getMessage(), iox);
+ }
+ return(orig_len - len);
+ }
+ /**
+ * Event is corrupted. Do recovery
+ *
+ * @return true if recovery could be performed and we can read more data
+ * false is returned only when nothing more can be read
+ */
+ private boolean performRecovery() throws TTransportException {
+ int numChunks = getNumChunks();
+ int curChunk = cs.getChunkNum();
+ if(curChunk >= (numChunks-1)) {
+ return false;
+ }
+ seekToChunk(curChunk+1);
+ return true;
+ }
+ /**
+ * Read event from underlying file
+ *
+ * @return true if event could be read, false otherwise (on EOF)
+ */
+ private boolean readEvent() throws TTransportException {
+ byte[] ebytes = new byte[4];
+ int esize;
+ int nread;
+ int nrequested;
+ retry:
+ do {
+ // corner case. read to end of chunk
+ nrequested = cs.getRemaining();
+ if(nrequested < 4) {
+ nread = tailRead(inputStream_, ebytes, 0, nrequested, currentPolicy_);
+ if(nread != nrequested) {
+ return(false);
+ }
+ }
+ // assuming serialized on little endian machine
+ nread = tailRead(inputStream_, ebytes, 0, 4, currentPolicy_);
+ if(nread != 4) {
+ return(false);
+ }
+ esize=0;
+ for(int i=3; i>=0; i--) {
+ int val = (0x000000ff & (int)ebytes[i]);
+ esize |= (val << (i*8));
+ }
+ // check if event is corrupted and do recovery as required
+ if(esize > cs.getRemaining()) {
+ throw new TTransportException("FileTransport error: bad event size");
+ /*
+ if(performRecovery()) {
+ esize=0;
+ } else {
+ return false;
+ }
+ */
+ }
+ } while (esize == 0);
+ // reset existing event or get a larger one
+ if(currentEvent_.getSize() < esize)
+ currentEvent_ = new Event(new byte [esize]);
+ // populate the event
+ byte[] buf = currentEvent_.getBuf();
+ nread = tailRead(inputStream_, buf, 0, esize, currentPolicy_);
+ if(nread != esize) {
+ return(false);
+ }
+ currentEvent_.setAvailable(esize);
+ return(true);
+ }
+ /**
+ * open if both input/output open unless readonly
+ *
+ * @return true
+ */
+ public boolean isOpen() {
+ return ((inputStream_ != null) && (readOnly_ || (outputStream_ != null)));
+ }
+ /**
+ * Diverging from the cpp model and sticking to the TSocket model
+ * Files are not opened in ctor - but in explicit open call
+ */
+ public void open() throws TTransportException {
+ if (isOpen())
+ throw new TTransportException(TTransportException.ALREADY_OPEN);
+ try {
+ inputStream_ = createInputStream();
+ cs = new chunkState();
+ currentEvent_ = new Event(new byte [256]);
+ if(!readOnly_)
+ outputStream_ = new BufferedOutputStream(inputFile_.getOutputStream(), 8192);
+ } catch (IOException iox) {
+ throw new TTransportException(TTransportException.NOT_OPEN, iox);
+ }
+ }
+ /**
+ * Closes the transport.
+ */
+ public void close() {
+ if (inputFile_ != null) {
+ try {
+ inputFile_.close();
+ } catch (IOException iox) {
+ System.err.println("WARNING: Error closing input file: " +
+ iox.getMessage());
+ }
+ inputFile_ = null;
+ }
+ if (outputStream_ != null) {
+ try {
+ outputStream_.close();
+ } catch (IOException iox) {
+ System.err.println("WARNING: Error closing output stream: " +
+ iox.getMessage());
+ }
+ outputStream_ = null;
+ }
+ }
+ /**
+ * File Transport ctor
+ *
+ * @param path File path to read and write from
+ * @param readOnly Whether this is a read-only transport
+ */
+ public TFileTransport(final String path, boolean readOnly) throws IOException {
+ inputFile_ = new TStandardFile(path);
+ readOnly_ = readOnly;
+ }
+ /**
+ * File Transport ctor
+ *
+ * @param inputFile_ open TSeekableFile to read/write from
+ * @param readOnly Whether this is a read-only transport
+ */
+ public TFileTransport(TSeekableFile inputFile, boolean readOnly) {
+ inputFile_ = inputFile;
+ readOnly_ = readOnly;
+ }
+ /**
+ * Cloned from Only difference is throwing an EOF exception
+ * where one is detected.
+ */
+ public int readAll(byte[] buf, int off, int len)
+ throws TTransportException {
+ int got = 0;
+ int ret = 0;
+ while (got < len) {
+ ret = read(buf, off+got, len-got);
+ if (ret < 0) {
+ throw new TTransportException("Error in reading from file");
+ }
+ if(ret == 0) {
+ throw new TTransportException(TTransportException.END_OF_FILE,
+ "End of File reached");
+ }
+ got += ret;
+ }
+ return got;
+ }
+ /**
+ * Reads up to len bytes into buffer buf, starting att offset off.
+ *
+ * @param buf Array to read into
+ * @param off Index to start reading at
+ * @param len Maximum number of bytes to read
+ * @return The number of bytes actually read
+ * @throws TTransportException if there was an error reading data
+ */
+ public int read(byte[] buf, int off, int len) throws TTransportException {
+ if(!isOpen())
+ throw new TTransportException(TTransportException.NOT_OPEN,
+ "Must open before reading");
+ if(currentEvent_.getRemaining() == 0) {
+ if(!readEvent())
+ return(0);
+ }
+ int nread = currentEvent_.emit(buf, off, len);
+ return nread;
+ }
+ public int getNumChunks() throws TTransportException {
+ if(!isOpen())
+ throw new TTransportException(TTransportException.NOT_OPEN,
+ "Must open before getNumChunks");
+ try {
+ long len = inputFile_.length();
+ if(len == 0)
+ return 0;
+ else
+ return (((int)(len/cs.getChunkSize())) + 1);
+ } catch (IOException iox) {
+ throw new TTransportException(iox.getMessage(), iox);
+ }
+ }
+ public int getCurChunk() throws TTransportException {
+ if(!isOpen())
+ throw new TTransportException(TTransportException.NOT_OPEN,
+ "Must open before getCurChunk");
+ return (cs.getChunkNum());
+ }
+ public void seekToChunk(int chunk) throws TTransportException {
+ if(!isOpen())
+ throw new TTransportException(TTransportException.NOT_OPEN,
+ "Must open before seeking");
+ int numChunks = getNumChunks();
+ // file is empty, seeking to chunk is pointless
+ if (numChunks == 0) {
+ return;
+ }
+ // negative indicates reverse seek (from the end)
+ if (chunk < 0) {
+ chunk += numChunks;
+ }
+ // too large a value for reverse seek, just seek to beginnin
+ if (chunk < 0) {
+ chunk = 0;
+ }
+ long eofOffset=0;
+ boolean seekToEnd = (chunk >= numChunks);
+ if(seekToEnd) {
+ chunk = chunk - 1;
+ try { eofOffset = inputFile_.length(); }
+ catch (IOException iox) {throw new TTransportException(iox.getMessage(),
+ iox);}
+ }
+ if(chunk*cs.getChunkSize() != cs.getOffset()) {
+ try {*cs.getChunkSize()); }
+ catch (IOException iox) {
+ System.err.println("createInputStream: "+iox.getMessage());
+ throw new TTransportException("Seek to chunk " +
+ chunk + " " +iox.getMessage(), iox);
+ }
+ currentEvent_.setAvailable(0);
+ inputStream_ = createInputStream();
+ }
+ if(seekToEnd) {
+ // waiting forever here - otherwise we can hit EOF and end up
+ // having consumed partial data from the data stream.
+ tailPolicy old = setTailPolicy(tailPolicy.WAIT_FOREVER);
+ while(cs.getOffset() < eofOffset) { readEvent(); }
+ currentEvent_.setAvailable(0);
+ setTailPolicy(old);
+ }
+ }
+ public void seekToEnd() throws TTransportException {
+ if(!isOpen())
+ throw new TTransportException(TTransportException.NOT_OPEN,
+ "Must open before seeking");
+ seekToChunk(getNumChunks());
+ }
+ /**
+ * Writes up to len bytes from the buffer.
+ *
+ * @param buf The output data buffer
+ * @param off The offset to start writing from
+ * @param len The number of bytes to write
+ * @throws TTransportException if there was an error writing data
+ */
+ public void write(byte[] buf, int off, int len) throws TTransportException {
+ throw new TTransportException("Not Supported");
+ }
+ /**
+ * Flush any pending data out of a transport buffer.
+ *
+ * @throws TTransportException if there was an error writing out data.
+ */
+ public void flush() throws TTransportException {
+ throw new TTransportException("Not Supported");
+ }
+ /**
+ * test program
+ *
+ */
+ public static void main(String[] args) throws Exception {
+ int num_chunks = 10;
+ if((args.length < 1) || args[0].equals("--help")
+ || args[0].equals("-h") || args[0].equals("-?")) {
+ printUsage();
+ }
+ if(args.length > 1) {
+ try {
+ num_chunks = Integer.parseInt(args[1]);
+ } catch (Exception e) {
+ System.err.println("Cannot parse " + args[1]);
+ printUsage();
+ }
+ }
+ TFileTransport t = new TFileTransport(args[0], true);
+ System.out.println("NumChunks="+t.getNumChunks());
+ Random r = new Random();
+ for(int j=0; j<num_chunks; j++) {
+ byte[] buf = new byte[4096];
+ int cnum = r.nextInt(t.getNumChunks()-1);
+ System.out.println("Reading chunk "+cnum);
+ t.seekToChunk(cnum);
+ for(int i=0; i<4096; i++) {
+, 0, 4096);
+ }
+ }
+ }
+ private static void printUsage() {
+ System.err.println("Usage: TFileTransport <filename> [num_chunks]");
+ System.err.println(" (Opens and reads num_chunks chunks from file randomly)");
+ System.exit(1);
+ }
diff --git a/lib/java/src/org/apache/thrift/transport/ b/lib/java/src/org/apache/thrift/transport/
new file mode 100644
index 0000000..e02d36f
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/transport/
@@ -0,0 +1,33 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+public interface TSeekableFile {
+ public InputStream getInputStream() throws IOException;
+ public OutputStream getOutputStream() throws IOException;
+ public void close() throws IOException;
+ public long length() throws IOException;
+ public void seek(long pos) throws IOException;
diff --git a/lib/java/src/org/apache/thrift/transport/ b/lib/java/src/org/apache/thrift/transport/
new file mode 100644
index 0000000..7a33af8
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/transport/
@@ -0,0 +1,60 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+public class TStandardFile implements TSeekableFile {
+ protected String path_ = null;
+ protected RandomAccessFile inputFile_ = null;
+ public TStandardFile(String path) throws IOException {
+ path_ = path;
+ inputFile_ = new RandomAccessFile(path_, "r");
+ }
+ public InputStream getInputStream() throws IOException {
+ return new FileInputStream(inputFile_.getFD());
+ }
+ public OutputStream getOutputStream() throws IOException {
+ return new FileOutputStream(path_);
+ }
+ public void close() throws IOException {
+ if(inputFile_ != null) {
+ inputFile_.close();
+ }
+ }
+ public long length() throws IOException {
+ return inputFile_.length();
+ }
+ public void seek(long pos) throws IOException {
+ }