| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include <sys/time.h> |
| |
| #include "FacebookBase.h" |
| #include "ServiceTracker.h" |
| #include "concurrency/ThreadManager.h" |
| |
| using namespace std; |
| using namespace facebook::fb303; |
| using namespace apache::thrift::concurrency; |
| |
| |
| uint64_t ServiceTracker::CHECKPOINT_MINIMUM_INTERVAL_SECONDS = 60; |
| int ServiceTracker::LOG_LEVEL = 5; |
| |
| |
| ServiceTracker::ServiceTracker(facebook::fb303::FacebookBase *handler, |
| void (*logMethod)(int, const string &), |
| bool featureCheckpoint, |
| bool featureStatusCheck, |
| bool featureThreadCheck, |
| Stopwatch::Unit stopwatchUnit) |
| : handler_(handler), logMethod_(logMethod), |
| featureCheckpoint_(featureCheckpoint), |
| featureStatusCheck_(featureStatusCheck), |
| featureThreadCheck_(featureThreadCheck), |
| stopwatchUnit_(stopwatchUnit), |
| checkpointServices_(0) |
| { |
| if (featureCheckpoint_) { |
| time_t now = time(NULL); |
| checkpointTime_ = now; |
| } else { |
| checkpointTime_ = 0; |
| } |
| } |
| |
| /** |
| * Registers the beginning of a "service method": basically, any of |
| * the implementations of Thrift remote procedure calls that a |
| * FacebookBase handler is handling. Controls concurrent |
| * services and reports statistics (via log and via fb303 counters). |
| * Throws an exception if the server is not ready to handle service |
| * methods yet. |
| * |
| * note: The relationship between startService() and finishService() |
| * is currently defined so that a call to finishService() should only |
| * be matched to this call to startService() if this method returns |
| * without exception. It wouldn't be a problem to implement things |
| * the other way, so that *every* start needed a finish, but this |
| * convention was chosen to match the way an object's constructor and |
| * destructor work together, i.e. to work well with ServiceMethod |
| * objects. |
| * |
| * @param const ServiceMethod &serviceMethod A reference to the ServiceMethod |
| * object instantiated at the start |
| * of the service method. |
| */ |
| void |
| ServiceTracker::startService(const ServiceMethod &serviceMethod) |
| { |
| // note: serviceMethod.timer_ automatically starts at construction. |
| |
| // log service start |
| logMethod_(5, serviceMethod.signature_); |
| |
| // check handler ready |
| if (featureStatusCheck_ && !serviceMethod.featureLogOnly_) { |
| // note: Throwing exceptions before counting statistics. See note |
| // in method header. |
| // note: A STOPPING server is not accepting new connections, but it |
| // is still handling any already-connected threads -- so from the |
| // service method's point of view, a status of STOPPING is a green |
| // light. |
| facebook::fb303::fb_status status = handler_->getStatus(); |
| if (status != facebook::fb303::ALIVE |
| && status != facebook::fb303::STOPPING) { |
| if (status == facebook::fb303::STARTING) { |
| throw ServiceException("Server starting up; please try again later"); |
| } else { |
| throw ServiceException("Server not alive; please try again later"); |
| } |
| } |
| } |
| |
| // check server threads |
| if (featureThreadCheck_ && !serviceMethod.featureLogOnly_) { |
| // note: Might want to put these messages in reportCheckpoint() if |
| // log is getting spammed. |
| if (threadManager_ != NULL) { |
| size_t idle_count = threadManager_->idleWorkerCount(); |
| if (idle_count == 0) { |
| stringstream message; |
| message << "service " << serviceMethod.signature_ |
| << ": all threads (" << threadManager_->workerCount() |
| << ") in use"; |
| logMethod_(3, message.str()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Logs a significant step in the middle of a "service method"; see |
| * startService. |
| * |
| * @param const ServiceMethod &serviceMethod A reference to the ServiceMethod |
| * object instantiated at the start |
| * of the service method. |
| * @return int64_t Elapsed units (see stopwatchUnit_) since ServiceMethod |
| * instantiation. |
| */ |
| int64_t |
| ServiceTracker::stepService(const ServiceMethod &serviceMethod, |
| const string &stepName) |
| { |
| stringstream message; |
| string elapsed_label; |
| int64_t elapsed = serviceMethod.timer_.elapsedUnits(stopwatchUnit_, |
| &elapsed_label); |
| message << serviceMethod.signature_ |
| << ' ' << stepName |
| << " [" << elapsed_label << ']'; |
| logMethod_(5, message.str()); |
| return elapsed; |
| } |
| |
| /** |
| * Registers the end of a "service method"; see startService(). |
| * |
| * @param const ServiceMethod &serviceMethod A reference to the ServiceMethod |
| * object instantiated at the start |
| * of the service method. |
| */ |
| void |
| ServiceTracker::finishService(const ServiceMethod &serviceMethod) |
| { |
| // log end of service |
| stringstream message; |
| string duration_label; |
| int64_t duration = serviceMethod.timer_.elapsedUnits(stopwatchUnit_, |
| &duration_label); |
| message << serviceMethod.signature_ |
| << " finish [" << duration_label << ']'; |
| logMethod_(5, message.str()); |
| |
| // count, record, and maybe report service statistics |
| if (!serviceMethod.featureLogOnly_) { |
| |
| if (!featureCheckpoint_) { |
| |
| // lifetime counters |
| // (note: No need to lock statisticsMutex_ if not doing checkpoint; |
| // FacebookService::incrementCounter() is already thread-safe.) |
| handler_->incrementCounter("lifetime_services"); |
| |
| } else { |
| |
| statisticsMutex_.lock(); |
| // note: No exceptions expected from this code block. Wrap in a try |
| // just to be safe. |
| try { |
| |
| // lifetime counters |
| // note: Good to synchronize this with the increment of |
| // checkpoint services, even though incrementCounter() is |
| // already thread-safe, for the sake of checkpoint reporting |
| // consistency (i.e. since the last checkpoint, |
| // lifetime_services has incremented by checkpointServices_). |
| handler_->incrementCounter("lifetime_services"); |
| |
| // checkpoint counters |
| checkpointServices_++; |
| checkpointDuration_ += duration; |
| |
| // per-service timing |
| // note kjv: According to my tests it is very slightly faster to |
| // call insert() once (and detect not-found) than calling find() |
| // and then maybe insert (if not-found). However, the difference |
| // is tiny for small maps like this one, and the code for the |
| // faster solution is slightly less readable. Also, I wonder if |
| // the instantiation of the (often unused) pair to insert makes |
| // the first algorithm slower after all. |
| map<string, pair<uint64_t, uint64_t> >::iterator iter; |
| iter = checkpointServiceDuration_.find(serviceMethod.name_); |
| if (iter != checkpointServiceDuration_.end()) { |
| iter->second.first++; |
| iter->second.second += duration; |
| } else { |
| checkpointServiceDuration_.insert(make_pair(serviceMethod.name_, |
| make_pair(1, duration))); |
| } |
| |
| // maybe report checkpoint |
| // note: ...if it's been long enough since the last report. |
| time_t now = time(NULL); |
| uint64_t check_interval = now - checkpointTime_; |
| if (check_interval >= CHECKPOINT_MINIMUM_INTERVAL_SECONDS) { |
| reportCheckpoint(); |
| } |
| |
| } catch (...) { |
| statisticsMutex_.unlock(); |
| throw; |
| } |
| statisticsMutex_.unlock(); |
| |
| } |
| } |
| } |
| |
| /** |
| * Logs some statistics gathered since the last call to this method. |
| * |
| * note: Thread race conditions on this method could cause |
| * misreporting and/or undefined behavior; the caller must protect |
| * uses of the object variables (and calls to this method) with a |
| * mutex. |
| * |
| */ |
| void |
| ServiceTracker::reportCheckpoint() |
| { |
| time_t now = time(NULL); |
| |
| uint64_t check_count = checkpointServices_; |
| uint64_t check_interval = now - checkpointTime_; |
| uint64_t check_duration = checkpointDuration_; |
| |
| // export counters for timing of service methods (by service name) |
| handler_->setCounter("checkpoint_time", check_interval); |
| map<string, pair<uint64_t, uint64_t> >::iterator iter; |
| uint64_t count; |
| for (iter = checkpointServiceDuration_.begin(); |
| iter != checkpointServiceDuration_.end(); |
| iter++) { |
| count = iter->second.first; |
| handler_->setCounter(string("checkpoint_count_") + iter->first, count); |
| if (count == 0) { |
| handler_->setCounter(string("checkpoint_speed_") + iter->first, |
| 0); |
| } else { |
| handler_->setCounter(string("checkpoint_speed_") + iter->first, |
| iter->second.second / count); |
| } |
| } |
| |
| // reset checkpoint variables |
| // note: Clearing the map while other threads are using it might |
| // cause undefined behavior. |
| checkpointServiceDuration_.clear(); |
| checkpointTime_ = now; |
| checkpointServices_ = 0; |
| checkpointDuration_ = 0; |
| |
| // get lifetime variables |
| uint64_t life_count = handler_->getCounter("lifetime_services"); |
| uint64_t life_interval = now - handler_->aliveSince(); |
| |
| // log checkpoint |
| stringstream message; |
| message << "checkpoint_time:" << check_interval |
| << " checkpoint_services:" << check_count |
| << " checkpoint_speed_sum:" << check_duration |
| << " lifetime_time:" << life_interval |
| << " lifetime_services:" << life_count; |
| if (featureThreadCheck_ && threadManager_ != NULL) { |
| size_t worker_count = threadManager_->workerCount(); |
| size_t idle_count = threadManager_->idleWorkerCount(); |
| message << " total_workers:" << worker_count |
| << " active_workers:" << (worker_count - idle_count); |
| } |
| logMethod_(4, message.str()); |
| } |
| |
| /** |
| * Remembers the thread manager used in the server, for monitoring thread |
| * activity. |
| * |
| * @param shared_ptr<ThreadManager> threadManager The server's thread manager. |
| */ |
| void |
| ServiceTracker::setThreadManager(boost::shared_ptr<ThreadManager> |
| threadManager) |
| { |
| threadManager_ = threadManager; |
| } |
| |
| /** |
| * Logs messages to stdout; the passed message will be logged if the |
| * passed level is less than or equal to LOG_LEVEL. |
| * |
| * This is the default logging method used by the ServiceTracker. An |
| * alternate logging method (that accepts the same parameters) may be |
| * specified to the constructor. |
| * |
| * @param int level A level associated with the message: higher levels |
| * are used to indicate higher levels of detail. |
| * @param string message The message to log. |
| */ |
| void |
| ServiceTracker::defaultLogMethod(int level, const string &message) |
| { |
| if (level <= LOG_LEVEL) { |
| string level_string; |
| time_t now = time(NULL); |
| char now_pretty[26]; |
| ctime_r(&now, now_pretty); |
| now_pretty[24] = '\0'; |
| switch (level) { |
| case 1: |
| level_string = "CRITICAL"; |
| break; |
| case 2: |
| level_string = "ERROR"; |
| break; |
| case 3: |
| level_string = "WARNING"; |
| break; |
| case 5: |
| level_string = "DEBUG"; |
| break; |
| case 4: |
| default: |
| level_string = "INFO"; |
| break; |
| } |
| cout << '[' << level_string << "] [" << now_pretty << "] " |
| << message << endl; |
| } |
| } |
| |
| |
| /** |
| * Creates a Stopwatch, which can report the time elapsed since its |
| * creation. |
| * |
| */ |
| Stopwatch::Stopwatch() |
| { |
| gettimeofday(&startTime_, NULL); |
| } |
| |
| void |
| Stopwatch::reset() |
| { |
| gettimeofday(&startTime_, NULL); |
| } |
| |
| uint64_t |
| Stopwatch::elapsedUnits(Stopwatch::Unit unit, string *label) const |
| { |
| timeval now_time; |
| gettimeofday(&now_time, NULL); |
| time_t duration_secs = now_time.tv_sec - startTime_.tv_sec; |
| |
| uint64_t duration_units; |
| switch (unit) { |
| case UNIT_SECONDS: |
| duration_units = duration_secs |
| + (now_time.tv_usec - startTime_.tv_usec + 500000) / 1000000; |
| if (NULL != label) { |
| stringstream ss_label; |
| ss_label << duration_units << " secs"; |
| label->assign(ss_label.str()); |
| } |
| break; |
| case UNIT_MICROSECONDS: |
| duration_units = duration_secs * 1000000 |
| + now_time.tv_usec - startTime_.tv_usec; |
| if (NULL != label) { |
| stringstream ss_label; |
| ss_label << duration_units << " us"; |
| label->assign(ss_label.str()); |
| } |
| break; |
| case UNIT_MILLISECONDS: |
| default: |
| duration_units = duration_secs * 1000 |
| + (now_time.tv_usec - startTime_.tv_usec + 500) / 1000; |
| if (NULL != label) { |
| stringstream ss_label; |
| ss_label << duration_units << " ms"; |
| label->assign(ss_label.str()); |
| } |
| break; |
| } |
| return duration_units; |
| } |
| |
| /** |
| * Creates a ServiceMethod, used for tracking a single service method |
| * invocation (via the ServiceTracker). The passed name of the |
| * ServiceMethod is used to group statistics (e.g. counts and durations) |
| * for similar invocations; the passed signature is used to uniquely |
| * identify the particular invocation in the log. |
| * |
| * note: A version of this constructor is provided that automatically |
| * forms a signature the name and a passed numeric id. Silly, sure, |
| * but commonly used, since it often saves the caller a line or two of |
| * code. |
| * |
| * @param ServiceTracker *tracker The service tracker that will track this |
| * ServiceMethod. |
| * @param const string &name The service method name (usually independent |
| * of service method parameters). |
| * @param const string &signature A signature uniquely identifying the method |
| * invocation (usually name plus parameters). |
| */ |
| ServiceMethod::ServiceMethod(ServiceTracker *tracker, |
| const string &name, |
| const string &signature, |
| bool featureLogOnly) |
| : tracker_(tracker), name_(name), signature_(signature), |
| featureLogOnly_(featureLogOnly) |
| { |
| // note: timer_ automatically starts at construction. |
| |
| // invoke tracker to start service |
| // note: Might throw. If it throws, then this object's destructor |
| // won't be called, which is according to plan: finishService() is |
| // only supposed to be matched to startService() if startService() |
| // returns without error. |
| tracker_->startService(*this); |
| } |
| |
| ServiceMethod::ServiceMethod(ServiceTracker *tracker, |
| const string &name, |
| uint64_t id, |
| bool featureLogOnly) |
| : tracker_(tracker), name_(name), featureLogOnly_(featureLogOnly) |
| { |
| // note: timer_ automatically starts at construction. |
| stringstream ss_signature; |
| ss_signature << name << " (" << id << ')'; |
| signature_ = ss_signature.str(); |
| |
| // invoke tracker to start service |
| // note: Might throw. If it throws, then this object's destructor |
| // won't be called, which is according to plan: finishService() is |
| // only supposed to be matched to startService() if startService() |
| // returns without error. |
| tracker_->startService(*this); |
| } |
| |
| ServiceMethod::~ServiceMethod() |
| { |
| // invoke tracker to finish service |
| // note: Not expecting an exception from this code, but |
| // finishService() might conceivably throw an out-of-memory |
| // exception. |
| try { |
| tracker_->finishService(*this); |
| } catch (...) { |
| // don't throw |
| } |
| } |
| |
| uint64_t |
| ServiceMethod::step(const std::string &stepName) |
| { |
| return tracker_->stepService(*this, stepName); |
| } |