blob: ac46b97c792cd86a79fcadbf2803a2f8eaa83023 [file] [log] [blame]
#include "TBufferedFileWriter.h"

#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <string>
#include <vector>
11
12using std::string;
13
14namespace facebook { namespace thrift { namespace transport {
15
16TBufferedFileWriter::TBufferedFileWriter(string filename, uint32_t sz) {
17 init(filename, sz, 0, 0);
18}
19
20TBufferedFileWriter::TBufferedFileWriter(string filename, uint32_t sz, int fd, long long offset) {
21 init(filename, sz, fd, offset);
22}
23
24void TBufferedFileWriter::init(string filename, uint32_t sz, int fd, long long offset) {
25 // validate buffer size
26 sz_ = sz;
27 if (sz_ <= 0) {
28 throw TTransportException("invalid input buffer size");
29 }
30
31 // set file-related variables
32 fd_ = 0;
33 resetOutputFile(fd, filename, offset);
34
35 // set default values of flush related params
36 flushMaxBytes_ = 1024 * 100;
37 flushMaxUs_ = 20 * 1000;
38
39 // allocate event buffer
40 buffer_ = new eventInfo[sz_];
41
42 // buffer is initially empty
43 isEmpty_ = true;
44 isFull_ = false;
45
46 // both head and tail are initially at 0
47 headPos_ = 0;
48 tailPos_ = 0;
49
50 // for lack of a better option, set chunk size to 0. Users can change this to whatever they want
51 chunkSize_ = 0;
52
53 // initialize all the condition vars/mutexes
54 pthread_mutex_init(&mutex_, NULL);
55 pthread_cond_init(&notFull_, NULL);
56 pthread_cond_init(&notEmpty_, NULL);
57 pthread_cond_init(&flushed_, NULL);
58
59 // not closing the file during init
60 closing_ = false;
61
62 // spawn writer thread
63 pthread_create(&writer_, NULL, startWriterThread, (void *)this);
64}
65
66void TBufferedFileWriter::resetOutputFile(int fd, string filename, long long offset) {
67 filename_ = filename;
68 offset_ = offset;
69
70 // check if current file is still open
71 if (fd_ > 0) {
72 // TODO: unclear if this should throw an error
73 fprintf(stderr, "error, current file not closed (trying to open %s)\n", filename_.c_str());
74 ::close(fd_);
75 }
76 fd_ = fd;
77}
78
79
80TBufferedFileWriter::~TBufferedFileWriter() {
81 // flush output buffer
82 flush();
83
84 // send a signal to write thread to end
85 closing_ = true;
86 pthread_join(writer_, NULL);
87
88 delete[] buffer_;
89
90 // TODO: should the file be closed here?
91}
92
93
94void TBufferedFileWriter::enqueueEvent(const uint8_t* buf, uint32_t eventLen, bool blockUntilFlush) {
95 // make sure that event size is valid
96 if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) {
97 // ERROR("msg size is greater than max event size: %lu > %u\n", eventLen, maxEventSize_);
98 return;
99 }
100
101 if (eventLen == 0) {
102 ERROR("cannot enqueue an empty event");
103 return;
104 }
105
106 eventInfo toEnqueue;
107 uint8_t* bufCopy = (uint8_t *)malloc(sizeof(uint8_t) * eventLen);
108 toEnqueue.payLoad_ = bufCopy;
109 toEnqueue.eventSize_ = eventLen;
110
111 return enqueueEvent(toEnqueue, blockUntilFlush);
112}
113
114void TBufferedFileWriter::enqueueEvent(const eventInfo& toEnqueue, bool blockUntilFlush) {
115 // Lock mutex
116 pthread_mutex_lock(&mutex_);
117 // Can't enqueue while buffer is full
118 while(isFull_) {
119 pthread_cond_wait(&notFull_, &mutex_);
120 }
121
122 // make a copy and enqueue at tail of buffer
123 buffer_[tailPos_] = toEnqueue;
124 tailPos_ = (tailPos_+1) % sz_;
125
126 // mark the buffer as non-empty
127 isEmpty_ = false;
128
129 // circular buffer has wrapped around (and is full)
130 if(tailPos_ == headPos_) {
131 // DEBUG("queue is full");
132 isFull_ = true;
133 }
134
135 // signal anybody who's waiting for the buffer to be non-empty
136 pthread_cond_signal(&notEmpty_);
137 if(blockUntilFlush) {
138 pthread_cond_wait(&flushed_, &mutex_);
139 }
140
141 // TODO: don't return until flushed to disk
142 // this really should be a loop where it makes sure it got flushed
143 // because condition variables can get triggered by the os for no reason
144 // it is probably a non-factor for the time being
145 pthread_mutex_unlock(&mutex_);
146
147}
148
149eventInfo TBufferedFileWriter::dequeueEvent(long long deadline) {
150 //deadline time struc
151 struct timespec ts;
152 if(deadline) {
153 ts.tv_sec = deadline/(1000*1000);
154 ts.tv_nsec = (deadline%(1000*1000))*1000;
155 }
156
157 // wait for the queue to fill up
158 pthread_mutex_lock(&mutex_);
159 while(isEmpty_) {
160 // do a timed wait on the condition variable
161 if(deadline) {
162 int e = pthread_cond_timedwait(&notEmpty_, &mutex_, &ts);
163 if(e == ETIMEDOUT) {
164 break;
165 }
166 }
167 else {
168 // just wait until the buffer gets an item
169 pthread_cond_wait(&notEmpty_, &mutex_);
170 }
171 }
172
173 string ret;
174 bool doSignal = false;
175
176 // could be empty if we timed out
177 eventInfo retEvent;
178 if(!isEmpty_) {
179 retEvent = buffer_[headPos_];
180 headPos_ = (headPos_+1) % sz_;
181
182 isFull_ = false;
183 doSignal = true;
184
185 // check if this is the last item in the buffer
186 if(headPos_ == tailPos_) {
187 isEmpty_ = true;
188 }
189 }
190
191 // unlock the mutex and signal if required
192 pthread_mutex_unlock(&mutex_);
193 if(doSignal) {
194 pthread_cond_signal(&notFull_);
195 }
196
197 return retEvent;
198}
199
200
201void TBufferedFileWriter::flush()
202{
203 eventInfo flushEvent;
204 flushEvent.payLoad_ = NULL;
205 flushEvent.eventSize_ = 0;
206
207 notFlushed_ = true;
208
209 enqueueEvent(flushEvent, false);
210
211 // wait for flush to take place
212 pthread_mutex_lock(&mutex_);
213
214 while(notFlushed_) {
215 pthread_cond_wait(&flushed_, &mutex_);
216 }
217
218 pthread_mutex_unlock(&mutex_);
219}
220
221void TBufferedFileWriter::openOutputFile() {
222 mode_t mode = S_IRUSR| S_IWUSR| S_IRGRP | S_IROTH;
223 fd_ = ::open(filename_.c_str(), O_WRONLY | O_CREAT | O_APPEND, mode);
224
225 // make sure open call was successful
226 if(fd_ == -1) {
227 char errorMsg[1024];
228 sprintf(errorMsg, "TBufferedFileWriter: Could not open file: %s", filename_.c_str());
229 perror(errorMsg);
230 throw TTransportException(errorMsg);
231 }
232}
233
234uint32_t TBufferedFileWriter::getCurrentTime() {
235 long long ret;
236 struct timeval tv;
237 gettimeofday(&tv, NULL);
238 ret = tv.tv_sec;
239 ret = ret*1000*1000 + tv.tv_usec;
240 return ret;
241}
242
243
244void TBufferedFileWriter::writerThread() {
245 // open file if it is not open
246 if(!fd_) {
247 openOutputFile();
248 }
249
250 // Figure out the next time by which a flush must take place
251 long long nextFlush = getCurrentTime() + flushMaxUs_;
252 uint32_t unflushed = 0;
253
254 while(1) {
255 // this will only be true when the destructor is being invoked
256 if(closing_) {
257 if(-1 == ::close(fd_)) {
258 perror("TBufferedFileWriter: error in close");
259 }
260 throw TTransportException("error in file close");
261 }
262
263 //long long start = now();
264 eventInfo outEvent = dequeueEvent(nextFlush);
265
266 // sanity check on event
267 if ( (maxEventSize_ > 0) && (outEvent.eventSize_ > maxEventSize_)) {
268 ERROR("msg size is greater than max event size: %u > %u\n", outEvent.eventSize_, maxEventSize_);
269 continue;
270 }
271 //long long diff = now()-start;
272 //DEBUG("got a dequeue of size %d after %lld ms\n", (int)s.size(), diff/1000);
273
274 // If chunking is required, then make sure that msg does not cross chunk boundary
275 if( (outEvent.eventSize_ > 0) && (chunkSize_ != 0)) {
276
277 // event size must be less than chunk size
278 if(outEvent.eventSize_ > chunkSize_) {
279 ERROR("TBufferedFileWriter: event size(%u) is greater than chunk size(%u): skipping event",
280 outEvent.eventSize_, chunkSize_);
281 continue;
282 }
283
284 long long chunk1 = offset_/chunkSize_;
285 long long chunk2 = (offset_ + outEvent.eventSize_ - 1)/chunkSize_;
286
287 // if adding this event will cross a chunk boundary, pad the chunk with zeros
288 if(chunk1 != chunk2) {
289 int padding = (int)(chunk2*chunkSize_ - offset_);
290
291 // sanity check
292 if (padding <= 0) {
293 DEBUG("Padding is empty, skipping event");
294 continue;
295 }
296 if (padding > (int32_t)chunkSize_) {
297 DEBUG("padding is larger than chunk size, skipping event");
298 continue;
299 }
300 // DEBUG("padding %d zeros to get to chunk %lld\n", padding, chunk2);
301 uint8_t zeros[padding];
302 bzero(zeros, padding);
303 if(-1 == ::write(fd_, zeros, padding)) {
304 perror("TBufferedFileWriter: error while padding zeros");
305 throw TTransportException("TBufferedFileWriter: error while padding zeros");
306 }
307 unflushed += padding;
308 offset_ += padding;
309 }
310 }
311
312 // write the dequeued event to the file
313 if(outEvent.eventSize_ > 0) {
314 if(-1 == ::write(fd_, outEvent.payLoad_, outEvent.eventSize_)) {
315 perror("TBufferedFileWriter: error while writing event");
316 // TODO: should this trigger an exception or simply continue?
317 throw TTransportException("TBufferedFileWriter: error while writing event");
318 }
319
320 // deallocate payload
321 free(outEvent.payLoad_);
322
323 unflushed += outEvent.eventSize_;
324 offset_ += outEvent.eventSize_;
325 }
326
327 // couple of cases from which a flush could be triggered
328 if((getCurrentTime() >= nextFlush && unflushed > 0) ||
329 unflushed > flushMaxBytes_ ||
330 (outEvent.eventSize_ == 0) ) {
331 //Debug("flushing %d bytes to %s (%d %d, full? %d)", unflushed, filename_.c_str(), headPos_, tailPos_, isFull_);
332
333 // sync (force flush) file to disk
334 fsync(fd_);
335 nextFlush = getCurrentTime() + flushMaxUs_;
336 unflushed = 0;
337
338 // notify anybody(thing?) waiting for flush completion
339 pthread_mutex_lock(&mutex_);
340 notFlushed_ = false;
341 pthread_mutex_unlock(&mutex_);
342 pthread_cond_broadcast(&flushed_);
343 }
344 }
345
346}
347
348}}} // facebook::thrift::transport