| #include "TBufferedFileWriter.h" |
| |
| #include <pthread.h> |
| #include <cassert> |
| #include <cstdlib> |
| #include <string> |
| #include <sys/time.h> |
| #include <sys/types.h> |
| #include <fcntl.h> |
| #include <errno.h> |
| |
| using std::string; |
| |
| namespace facebook { namespace thrift { namespace transport { |
| |
| TBufferedFileWriter::TBufferedFileWriter(string filename, uint32_t sz) { |
| init(filename, sz, 0, 0); |
| } |
| |
| TBufferedFileWriter::TBufferedFileWriter(string filename, uint32_t sz, int fd, long long offset) { |
| init(filename, sz, fd, offset); |
| } |
| |
| void TBufferedFileWriter::init(string filename, uint32_t sz, int fd, long long offset) { |
| // validate buffer size |
| sz_ = sz; |
| if (sz_ <= 0) { |
| throw TTransportException("invalid input buffer size"); |
| } |
| |
| // set file-related variables |
| fd_ = 0; |
| resetOutputFile(fd, filename, offset); |
| |
| // set default values of flush related params |
| flushMaxBytes_ = 1024 * 100; |
| flushMaxUs_ = 20 * 1000; |
| |
| // allocate event buffer |
| buffer_ = new eventInfo[sz_]; |
| |
| // buffer is initially empty |
| isEmpty_ = true; |
| isFull_ = false; |
| |
| // both head and tail are initially at 0 |
| headPos_ = 0; |
| tailPos_ = 0; |
| |
| // for lack of a better option, set chunk size to 0. Users can change this to whatever they want |
| chunkSize_ = 0; |
| |
| // initialize all the condition vars/mutexes |
| pthread_mutex_init(&mutex_, NULL); |
| pthread_cond_init(¬Full_, NULL); |
| pthread_cond_init(¬Empty_, NULL); |
| pthread_cond_init(&flushed_, NULL); |
| |
| // not closing the file during init |
| closing_ = false; |
| |
| // spawn writer thread |
| pthread_create(&writer_, NULL, startWriterThread, (void *)this); |
| } |
| |
| void TBufferedFileWriter::resetOutputFile(int fd, string filename, long long offset) { |
| filename_ = filename; |
| offset_ = offset; |
| |
| // check if current file is still open |
| if (fd_ > 0) { |
| // TODO: unclear if this should throw an error |
| fprintf(stderr, "error, current file not closed (trying to open %s)\n", filename_.c_str()); |
| ::close(fd_); |
| } |
| fd_ = fd; |
| } |
| |
| |
| TBufferedFileWriter::~TBufferedFileWriter() { |
| // flush output buffer |
| flush(); |
| |
| // send a signal to write thread to end |
| closing_ = true; |
| pthread_join(writer_, NULL); |
| |
| delete[] buffer_; |
| |
| // TODO: should the file be closed here? |
| } |
| |
| |
| void TBufferedFileWriter::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) { |
| // make sure that event size is valid |
| if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) { |
| // ERROR("msg size is greater than max event size: %lu > %u\n", eventLen, maxEventSize_); |
| return; |
| } |
| |
| if (eventLen == 0) { |
| ERROR("cannot enqueue an empty event"); |
| return; |
| } |
| |
| eventInfo toEnqueue; |
| uint8_t* bufCopy = (uint8_t *)malloc(sizeof(uint8_t) * eventLen); |
| toEnqueue.payLoad_ = bufCopy; |
| toEnqueue.eventSize_ = eventLen; |
| |
| return enqueueEvent(toEnqueue, blockUntilFlush); |
| } |
| |
| void TBufferedFileWriter::enqueueEvent(const eventInfo& toEnqueue, bool blockUntilFlush) { |
| // Lock mutex |
| pthread_mutex_lock(&mutex_); |
| // Can't enqueue while buffer is full |
| while(isFull_) { |
| pthread_cond_wait(¬Full_, &mutex_); |
| } |
| |
| // make a copy and enqueue at tail of buffer |
| buffer_[tailPos_] = toEnqueue; |
| tailPos_ = (tailPos_+1) % sz_; |
| |
| // mark the buffer as non-empty |
| isEmpty_ = false; |
| |
| // circular buffer has wrapped around (and is full) |
| if(tailPos_ == headPos_) { |
| // DEBUG("queue is full"); |
| isFull_ = true; |
| } |
| |
| // signal anybody who's waiting for the buffer to be non-empty |
| pthread_cond_signal(¬Empty_); |
| if(blockUntilFlush) { |
| pthread_cond_wait(&flushed_, &mutex_); |
| } |
| |
| // TODO: don't return until flushed to disk |
| // this really should be a loop where it makes sure it got flushed |
| // because condition variables can get triggered by the os for no reason |
| // it is probably a non-factor for the time being |
| pthread_mutex_unlock(&mutex_); |
| |
| } |
| |
| eventInfo TBufferedFileWriter::dequeueEvent(long long deadline) { |
| //deadline time struc |
| struct timespec ts; |
| if(deadline) { |
| ts.tv_sec = deadline/(1000*1000); |
| ts.tv_nsec = (deadline%(1000*1000))*1000; |
| } |
| |
| // wait for the queue to fill up |
| pthread_mutex_lock(&mutex_); |
| while(isEmpty_) { |
| // do a timed wait on the condition variable |
| if(deadline) { |
| int e = pthread_cond_timedwait(¬Empty_, &mutex_, &ts); |
| if(e == ETIMEDOUT) { |
| break; |
| } |
| } |
| else { |
| // just wait until the buffer gets an item |
| pthread_cond_wait(¬Empty_, &mutex_); |
| } |
| } |
| |
| string ret; |
| bool doSignal = false; |
| |
| // could be empty if we timed out |
| eventInfo retEvent; |
| if(!isEmpty_) { |
| retEvent = buffer_[headPos_]; |
| headPos_ = (headPos_+1) % sz_; |
| |
| isFull_ = false; |
| doSignal = true; |
| |
| // check if this is the last item in the buffer |
| if(headPos_ == tailPos_) { |
| isEmpty_ = true; |
| } |
| } |
| |
| // unlock the mutex and signal if required |
| pthread_mutex_unlock(&mutex_); |
| if(doSignal) { |
| pthread_cond_signal(¬Full_); |
| } |
| |
| return retEvent; |
| } |
| |
| |
| void TBufferedFileWriter::flush() |
| { |
| eventInfo flushEvent; |
| flushEvent.payLoad_ = NULL; |
| flushEvent.eventSize_ = 0; |
| |
| notFlushed_ = true; |
| |
| enqueueEvent(flushEvent, false); |
| |
| // wait for flush to take place |
| pthread_mutex_lock(&mutex_); |
| |
| while(notFlushed_) { |
| pthread_cond_wait(&flushed_, &mutex_); |
| } |
| |
| pthread_mutex_unlock(&mutex_); |
| } |
| |
| void TBufferedFileWriter::openOutputFile() { |
| mode_t mode = S_IRUSR| S_IWUSR| S_IRGRP | S_IROTH; |
| fd_ = ::open(filename_.c_str(), O_WRONLY | O_CREAT | O_APPEND, mode); |
| |
| // make sure open call was successful |
| if(fd_ == -1) { |
| char errorMsg[1024]; |
| sprintf(errorMsg, "TBufferedFileWriter: Could not open file: %s", filename_.c_str()); |
| perror(errorMsg); |
| throw TTransportException(errorMsg); |
| } |
| } |
| |
| uint32_t TBufferedFileWriter::getCurrentTime() { |
| long long ret; |
| struct timeval tv; |
| gettimeofday(&tv, NULL); |
| ret = tv.tv_sec; |
| ret = ret*1000*1000 + tv.tv_usec; |
| return ret; |
| } |
| |
| |
| void TBufferedFileWriter::writerThread() { |
| // open file if it is not open |
| if(!fd_) { |
| openOutputFile(); |
| } |
| |
| // Figure out the next time by which a flush must take place |
| long long nextFlush = getCurrentTime() + flushMaxUs_; |
| uint32_t unflushed = 0; |
| |
| while(1) { |
| // this will only be true when the destructor is being invoked |
| if(closing_) { |
| if(-1 == ::close(fd_)) { |
| perror("TBufferedFileWriter: error in close"); |
| } |
| throw TTransportException("error in file close"); |
| } |
| |
| //long long start = now(); |
| eventInfo outEvent = dequeueEvent(nextFlush); |
| |
| // sanity check on event |
| if ( (maxEventSize_ > 0) && (outEvent.eventSize_ > maxEventSize_)) { |
| ERROR("msg size is greater than max event size: %u > %u\n", outEvent.eventSize_, maxEventSize_); |
| continue; |
| } |
| //long long diff = now()-start; |
| //DEBUG("got a dequeue of size %d after %lld ms\n", (int)s.size(), diff/1000); |
| |
| // If chunking is required, then make sure that msg does not cross chunk boundary |
| if( (outEvent.eventSize_ > 0) && (chunkSize_ != 0)) { |
| |
| // event size must be less than chunk size |
| if(outEvent.eventSize_ > chunkSize_) { |
| ERROR("TBufferedFileWriter: event size(%u) is greater than chunk size(%u): skipping event", |
| outEvent.eventSize_, chunkSize_); |
| continue; |
| } |
| |
| long long chunk1 = offset_/chunkSize_; |
| long long chunk2 = (offset_ + outEvent.eventSize_ - 1)/chunkSize_; |
| |
| // if adding this event will cross a chunk boundary, pad the chunk with zeros |
| if(chunk1 != chunk2) { |
| int padding = (int)(chunk2*chunkSize_ - offset_); |
| |
| // sanity check |
| if (padding <= 0) { |
| DEBUG("Padding is empty, skipping event"); |
| continue; |
| } |
| if (padding > (int32_t)chunkSize_) { |
| DEBUG("padding is larger than chunk size, skipping event"); |
| continue; |
| } |
| // DEBUG("padding %d zeros to get to chunk %lld\n", padding, chunk2); |
| uint8_t zeros[padding]; |
| bzero(zeros, padding); |
| if(-1 == ::write(fd_, zeros, padding)) { |
| perror("TBufferedFileWriter: error while padding zeros"); |
| throw TTransportException("TBufferedFileWriter: error while padding zeros"); |
| } |
| unflushed += padding; |
| offset_ += padding; |
| } |
| } |
| |
| // write the dequeued event to the file |
| if(outEvent.eventSize_ > 0) { |
| if(-1 == ::write(fd_, outEvent.payLoad_, outEvent.eventSize_)) { |
| perror("TBufferedFileWriter: error while writing event"); |
| // TODO: should this trigger an exception or simply continue? |
| throw TTransportException("TBufferedFileWriter: error while writing event"); |
| } |
| |
| // deallocate payload |
| free(outEvent.payLoad_); |
| |
| unflushed += outEvent.eventSize_; |
| offset_ += outEvent.eventSize_; |
| } |
| |
| // couple of cases from which a flush could be triggered |
| if((getCurrentTime() >= nextFlush && unflushed > 0) || |
| unflushed > flushMaxBytes_ || |
| (outEvent.eventSize_ == 0) ) { |
| //Debug("flushing %d bytes to %s (%d %d, full? %d)", unflushed, filename_.c_str(), headPos_, tailPos_, isFull_); |
| |
| // sync (force flush) file to disk |
| fsync(fd_); |
| nextFlush = getCurrentTime() + flushMaxUs_; |
| unflushed = 0; |
| |
| // notify anybody(thing?) waiting for flush completion |
| pthread_mutex_lock(&mutex_); |
| notFlushed_ = false; |
| pthread_mutex_unlock(&mutex_); |
| pthread_cond_broadcast(&flushed_); |
| } |
| } |
| |
| } |
| |
| }}} // facebook::thrift::transport |