blob: c327aabdaaf162c84eb1196f75ab39800aa56826 [file] [log] [blame]
#ifndef _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_
#define _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_ 1
#include "TTransport.h"
#include "Thrift.h"
#include <string>
#include <boost/shared_ptr.hpp>
namespace facebook { namespace thrift { namespace transport {
using namespace boost;
using std::string;
/**
 * Data pertaining to a single event: a pointer to the event's payload and
 * the size recorded for it. A default-constructed eventInfo is empty
 * (NULL payload, size 0).
 */
struct eventInfo {
  // Start of the event's payload; NULL when no payload is attached.
  uint8_t* payLoad_;
  // Size recorded for the payload; 0 for an empty event.
  uint32_t eventSize_;

  // Creates an empty event.
  eventInfo()
    : payLoad_(NULL),
      eventSize_(0) {
  }
};
/**
 * Class that stores a circular in-memory event/message buffer and writes
 * elements to disk when the buffer becomes full or a flush is triggered.
 *
 * Events are handed in through enqueueEvent(), or through write(), which
 * forwards to enqueueEvent() without blocking until flush. writerThread() is
 * the writer-thread routine; startWriterThread() is a static trampoline with
 * a void*(void*) signature that invokes it on a given instance.
 *
 * NOTE(review): the class holds raw resources (buffer pointer, pthread
 * primitives, file descriptor) and declares a destructor but no copy
 * constructor / assignment operator. Copying an instance is presumably
 * unsafe -- confirm against the implementation before relying on it.
 *
 * @author Aditya Agarwal <aditya@facebook.com>
 */
class TBufferedFileWriter : public TTransport {
 public:
  // --- Tunables -----------------------------------------------------------
  // Plain accessors for the flush/chunk/event-size limits. The getters only
  // read state and are therefore const.

  // Max number of microseconds that can pass without flushing.
  void setFlushMaxUs(uint32_t flushMaxUs) {
    flushMaxUs_ = flushMaxUs;
  }
  uint32_t getFlushMaxUs() const {
    return flushMaxUs_;
  }

  // Max number of bytes that can be written without flushing.
  void setFlushMaxBytes(uint32_t flushMaxBytes) {
    flushMaxBytes_ = flushMaxBytes;
  }
  uint32_t getFlushMaxBytes() const {
    return flushMaxBytes_;
  }

  // Size of the chunks that the file will be split up into.
  void setChunkSize(uint32_t chunkSize) {
    chunkSize_ = chunkSize;
  }
  uint32_t getChunkSize() const {
    return chunkSize_;
  }

  // Max event size.
  void setMaxEventSize(uint32_t maxEventSize) {
    maxEventSize_ = maxEventSize;
  }
  uint32_t getMaxEventSize() const {
    return maxEventSize_;
  }

  // --- Construction / teardown --------------------------------------------

  // filename: output file name; sz: size of the circular buffer.
  TBufferedFileWriter(string filename, uint32_t sz);
  // As above, additionally taking a file descriptor and an offset within
  // the file.
  TBufferedFileWriter(string filename, uint32_t sz, int fd, long long offset);
  // Takes the same arguments as the four-argument constructor.
  // NOTE(review): presumably the shared initialisation routine used by both
  // constructors -- confirm in the .cpp.
  void init(string filename, uint32_t sz, int fd, long long offset);
  ~TBufferedFileWriter();

  // Points the writer at a different output file. Note the argument order
  // (fd first) differs from the constructor's.
  void resetOutputFile(int fd, string filename, long long offset);

  // --- Producer side ------------------------------------------------------

  // Queues eventLen bytes starting at buf.
  // NOTE(review): blockUntilFlush presumably makes the call wait until the
  // event has been flushed -- confirm in the .cpp.
  void enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush);
  // Same, for an event already packaged as an eventInfo.
  void enqueueEvent(const eventInfo& toEnqueue, bool blockUntilFlush);

  // Forwards to enqueueEvent() with blockUntilFlush == false.
  void write(const uint8_t* buf, uint32_t len) {
    enqueueEvent(buf, len, false);
  }

  // --- Consumer side ------------------------------------------------------

  // Removes and returns one event from the buffer.
  // NOTE(review): units and meaning of `deadline`, and what is returned when
  // nothing is available, are defined in the .cpp -- confirm there.
  eventInfo dequeueEvent(long long deadline);
  void flush();

  // control for writer thread
  // Static trampoline with a void*(void*) signature: ptr must point to the
  // TBufferedFileWriter whose writerThread() should run. Always returns 0.
  static void* startWriterThread(void* ptr) {
    static_cast<TBufferedFileWriter*>(ptr)->writerThread();
    return 0;
  }
  void writerThread();

 private:
  // circular buffer to hold data in before it is flushed. This is an array
  // of eventInfo; each element stores a msg that needs to be written to the
  // file
  eventInfo* buffer_;

  // size of the circular buffer
  // NOTE(review): presumably the capacity in events -- confirm in the .cpp.
  uint32_t sz_;

  // size of chunks that file will be split up into
  uint32_t chunkSize_;

  // max number of microseconds that can pass without flushing
  uint32_t flushMaxUs_;

  // max number of bytes that can be written without flushing
  uint32_t flushMaxBytes_;

  // max event size
  uint32_t maxEventSize_;

  // writer thread id
  pthread_t writer_;

  // variables that determine position of head/tail of circular buffer
  int headPos_, tailPos_;

  // variables indicating whether the buffer is full or empty
  bool isFull_, isEmpty_;
  pthread_cond_t notFull_, notEmpty_;
  bool closing_;

  // To keep track of whether the buffer has been flushed
  pthread_cond_t flushed_;
  bool notFlushed_;

  // Mutex that is grabbed when enqueueing, dequeueing and flushing
  // from the circular buffer
  pthread_mutex_t mutex_;

  // File information
  string filename_;
  int fd_;

  // Offset within the file
  long long offset_;

  void openOutputFile();
  uint32_t getCurrentTime();
};
}}}
#endif // _THRIFT_TRANSPORT_TBUFFEREDFILEWRITER_H_