| import os |
| import sys |
| import json |
| import time |
| import zlib |
| import array |
| import pprint |
| import logging |
| import threading |
| import traceback |
| import subprocess |
| import collections |
| |
| |
# ``Pool`` is a thread-pool helper presumably provided by the agent's plugin
# environment (it is not a stdlib module), hence the bare import and the
# type-checker suppression.
import Pool  # type: ignore
| |
| |
| mod_name = "sensors" |
| __version__ = (0, 1) |
| |
| |
| logger = logging.getLogger("agent.sensors") |
| SensorsMap = {} |
| |
| |
class Sensor(object):
    """Base class for all sensors; subclasses override the no-op hooks below."""
| def __init__(self, params, allowed=None, disallowed=None): |
| self.params = params |
| self.allowed = allowed |
| self.disallowed = disallowed |
| self.allowed_names = set() |
| |
| def add_data(self, device, name, value): |
| pass |
| |
| def collect(self): |
| pass |
| |
| def get_updates(self): |
| pass |
| |
| @classmethod |
| def unpack_results(cls, device, metrics, data, typecode): |
| pass |
| |
| def init(self): |
| pass |
| |
| def stop(self): |
| pass |
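# A minimal usage sketch (illustrative only, not part of the module API): the
# agent drives each sensor through init() -> repeated collect() -> periodic
# get_updates() -> stop(), exactly as sensors_bg_thread below does.
#
#     sensor = SystemRAMSensor(params={})
#     sensor.init()
#     sensor.collect()
#     updates = sensor.get_updates()   # {(device, metric): (typecode, bytes)}
#     sensor.stop()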
| |
| |
| class ArraysSensor(Sensor): |
| typecode = 'L' |
| |
| def __init__(self, params, allowed=None, disallowed=None): |
| Sensor.__init__(self, params, allowed, disallowed) |
| self.data = collections.defaultdict(lambda: array.array(self.typecode)) |
| self.prev_vals = {} |
| |
| def add_data(self, device, name, value): |
| self.data[(device, name)].append(value) |
| |
| def add_relative(self, device, name, value): |
| key = (device, name) |
| pval = self.prev_vals.get(key) |
| if pval is not None: |
| self.data[key].append(value - pval) |
| self.prev_vals[key] = value |
| |
    def get_updates(self):
        res = self.data
        self.data = collections.defaultdict(lambda: array.array(self.typecode))
        if sys.version_info >= (3, 0, 0):
            return {key: (arr.typecode, arr.tobytes()) for key, arr in res.items()}
        return {key: (arr.typecode, arr.tostring()) for key, arr in res.items()}
| |
| @classmethod |
| def unpack_results(cls, device, metrics, packed, typecode): |
| arr = array.array(typecode) |
| if sys.version_info >= (3, 0, 0): |
| arr.frombytes(packed) |
| else: |
| arr.fromstring(packed) |
| return arr |
| |
    def is_dev_accepted(self, name):
        """Check a device name against the disallowed/allowed prefix lists."""
| dev_ok = True |
| |
| if self.disallowed is not None: |
| dev_ok = all(not name.startswith(prefix) for prefix in self.disallowed) |
| |
| if dev_ok and self.allowed is not None: |
| dev_ok = any(name.startswith(prefix) for prefix in self.allowed) |
| |
| return dev_ok |
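    # Example: with allowed=('sd',) and disallowed=('sdb',), 'sda' is
    # accepted, 'sdb1' is rejected by the disallow list, and 'loop0' is
    # rejected because it matches no allowed prefix.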
| |
| |
| time_array_typechar = ArraysSensor.typecode |
| |
| |
| def provides(name): |
| def closure(cls): |
| SensorsMap[name] = cls |
| return cls |
| return closure |
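# Registration example: the @provides("block-io") decorator below makes
# BlockIOSensor reachable as SensorsMap["block-io"], which is how
# sensors_bg_thread instantiates sensors from a config dict.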
| |
| |
| def get_pid_list(disallowed_prefixes, allowed_prefixes): |
| """Return pid list from list of pids and names""" |
| # exceptions |
| disallowed = disallowed_prefixes if disallowed_prefixes is not None else [] |
| if allowed_prefixes is None: |
| # if nothing setted - all ps will be returned except setted |
| result = [pid for pid in os.listdir('/proc') |
| if pid.isdigit() and pid not in disallowed] |
| else: |
| result = [] |
| for pid in os.listdir('/proc'): |
| if pid.isdigit() and pid not in disallowed: |
| name = get_pid_name(pid) |
                # accept the pid if it is listed explicitly
                # or its name matches an allowed prefix
                if pid in allowed_prefixes or any(name.startswith(val) for val in allowed_prefixes):
                    result.append(pid)
| return result |
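# Example (hypothetical process names): get_pid_list(None, ['nginx']) returns
# the pids whose cmdline basename starts with "nginx", while
# get_pid_list(None, None) returns every numeric entry under /proc.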
| |
| |
| def get_pid_name(pid): |
| """Return name by pid""" |
| try: |
| with open(os.path.join('/proc/', pid, 'cmdline'), 'r') as pidfile: |
| try: |
| cmd = pidfile.readline().split()[0] |
| return os.path.basename(cmd).rstrip('\x00') |
| except IndexError: |
| # no cmd returned |
| return "<NO NAME>" |
| except IOError: |
        # callers expect some string even if the /proc entry could not be read
| return "no_such_process" |
| |
| |
| @provides("block-io") |
| class BlockIOSensor(ArraysSensor): |
| # 1 - major number |
    # 2 - minor number
| # 3 - device name |
| # 4 - reads completed successfully |
| # 5 - reads merged |
| # 6 - sectors read |
| # 7 - time spent reading (ms) |
| # 8 - writes completed |
| # 9 - writes merged |
| # 10 - sectors written |
| # 11 - time spent writing (ms) |
| # 12 - I/Os currently in progress |
| # 13 - time spent doing I/Os (ms) |
| # 14 - weighted time spent doing I/Os (ms) |
| |
| io_values_pos = [ |
| (3, 'reads_completed', True), |
| (5, 'sectors_read', True), |
| (6, 'rtime', True), |
| (7, 'writes_completed', True), |
| (9, 'sectors_written', True), |
| (10, 'wtime', True), |
| (11, 'io_queue', False), |
| (13, 'io_time', True) |
| ] |
| |
| def __init__(self, *args, **kwargs): |
| ArraysSensor.__init__(self, *args, **kwargs) |
| |
| if self.disallowed is None: |
| self.disallowed = ('ram', 'loop') |
| |
| for line in open('/proc/diskstats'): |
| vals = line.split() |
| dev_name = vals[2] |
| if self.is_dev_accepted(dev_name) and not dev_name[-1].isdigit(): |
| self.allowed_names.add(dev_name) |
| |
| self.collect(init_rel=True) |
| |
| def collect(self, init_rel=False): |
| for line in open('/proc/diskstats'): |
| vals = line.split() |
| dev_name = vals[2] |
| |
| if dev_name not in self.allowed_names: |
| continue |
| |
| for pos, name, aggregated in self.io_values_pos: |
| vl = int(vals[pos]) |
| if aggregated: |
| self.add_relative(dev_name, name, vl) |
| elif not init_rel: |
                    self.add_data(dev_name, name, vl)
| |
| |
| @provides("net-io") |
| class NetIOSensor(ArraysSensor): |
    # /proc/net/dev fields per interface (after splitting off the "iface:" prefix):
    # 0 - received bytes
    # 1 - received packets
    # 2 - receive errors
    # 3 - receive drops
    # 4 - receive fifo errors
    # 5 - receive frame errors
    # 6 - received compressed packets
    # 7 - received multicast packets
    # 8 - transmitted bytes
    # 9 - transmitted packets
    # 10 - transmit errors
    # 11 - transmit drops
    # 12 - transmit fifo errors
    # 13 - collisions
    # 14 - carrier errors
    # 15 - transmitted compressed packets
| |
| net_values_pos = [ |
| (0, 'recv_bytes', True), |
| (1, 'recv_packets', True), |
| (8, 'send_bytes', True), |
| (9, 'send_packets', True), |
| ] |
| |
| def __init__(self, *args, **kwargs): |
| ArraysSensor.__init__(self, *args, **kwargs) |
| |
| if self.disallowed is None: |
| self.disallowed = ('docker', 'lo') |
| |
| if self.allowed is None: |
| self.allowed = ('eth',) |
| |
| for _, _, aggregated in self.net_values_pos: |
            assert aggregated, "Non-aggregated values are not supported in net sensor"
| |
| for line in open('/proc/net/dev').readlines()[2:]: |
| dev_name, stats = line.split(":", 1) |
| dev_name = dev_name.strip() |
| if self.is_dev_accepted(dev_name): |
| self.allowed_names.add(dev_name) |
| |
| self.collect(init_rel=True) |
| |
| def collect(self, init_rel=False): |
| for line in open('/proc/net/dev').readlines()[2:]: |
| dev_name, stats = line.split(":", 1) |
| dev_name = dev_name.strip() |
| if dev_name in self.allowed_names: |
| vals = stats.split() |
| for pos, name, _ in self.net_values_pos: |
| vl = int(vals[pos]) |
                    self.add_relative(dev_name, name, vl)
| |
| |
def pid_stat(pid):
    """Return total CPU time (utime + stime, in clock ticks) used by a process"""
    with open(os.path.join('/proc/', pid, 'stat'), 'r') as pidfile:
        proctimes = pidfile.readline().split()
        # utime is field 14 of /proc/<pid>/stat (1-based), stime is field 15
        utime = proctimes[13]
        stime = proctimes[14]
        # total time the process has spent on CPU
        return float(int(utime) + int(stime))
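

# The stat counters are expressed in clock ticks. A sketch of converting them
# to seconds (not used by the sensors themselves, which ship raw ticks):
#
#     ticks_per_sec = os.sysconf('SC_CLK_TCK')   # usually 100 on Linux
#     cpu_seconds = pid_stat(pid) / ticks_per_sec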
| |
| |
| @provides("perprocess-cpu") |
| class ProcCpuSensor(ArraysSensor): |
| def collect(self): |
| # TODO(koder): fixed list of PID's must be given |
| for pid in get_pid_list(self.disallowed, self.allowed): |
| try: |
| self.add_data(get_pid_name(pid), pid, pid_stat(pid)) |
| except IOError: |
| # probably proc has already terminated, skip it |
| continue |
| |
| |
| def get_mem_stats(pid): |
| """Return memory data of pid in format (private, shared)""" |
| |
| fname = '/proc/{0}/{1}'.format(pid, "smaps") |
| lines = open(fname).readlines() |
| |
| shared = 0 |
| private = 0 |
| pss = 0 |
| |
    # smaps sizes are truncated to whole KiB, so add 0.5 KiB per mapping
    # to compensate for the average truncation error
    pss_adjust = 0.5
| |
| for line in lines: |
| if line.startswith("Shared"): |
| shared += int(line.split()[1]) |
| |
| if line.startswith("Private"): |
| private += int(line.split()[1]) |
| |
| if line.startswith("Pss"): |
| pss += float(line.split()[1]) + pss_adjust |
| |
    # Note: Shared + Private = Rss (the Rss reported in smaps
    # also includes video card memory etc.)
| |
| if pss != 0: |
| shared = int(pss - private) |
| |
| return (private, shared) |
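

# Illustrative /proc/<pid>/smaps fragment (sizes made up) - get_mem_stats
# sums the Shared_*/Private_* counters and the per-mapping Pss values:
#
#     Rss:                 412 kB
#     Pss:                 180 kB
#     Shared_Clean:        296 kB
#     Shared_Dirty:          0 kB
#     Private_Clean:        28 kB
#     Private_Dirty:        88 kB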
| |
| |
| def get_ram_size(): |
| """Return RAM size in Kb""" |
| with open("/proc/meminfo") as proc: |
| mem_total = proc.readline().split() |
| return int(mem_total[1]) |
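

# The first /proc/meminfo line looks like "MemTotal:       16384256 kB", so
# get_ram_size() would return 16384256 on a 16 GiB machine.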
| |
| |
| @provides("perprocess-ram") |
| class ProcRamSensor(ArraysSensor): |
| def collect(self): |
        # TODO(koder): fixed list of PID's must be given
| for pid in get_pid_list(self.disallowed, self.allowed): |
| try: |
| dev_name = get_pid_name(pid) |
| |
| private, shared = get_mem_stats(pid) |
| total = private + shared |
| sys_total = get_ram_size() |
| usage = float(total) / sys_total |
| |
| sensor_name = "{0}({1})".format(dev_name, pid) |
| |
| self.add_data(sensor_name, "private_mem", private) |
| self.add_data(sensor_name, "shared_mem", shared), |
| self.add_data(sensor_name, "used_mem", total), |
| self.add_data(sensor_name, "mem_usage_percent", int(usage * 100)) |
| except IOError: |
| # permission denied or proc die |
| continue |
| |
| |
| @provides("system-cpu") |
| class SystemCPUSensor(ArraysSensor): |
| # 0 - cpu name |
| # 1 - user: normal processes executing in user mode |
| # 2 - nice: niced processes executing in user mode |
| # 3 - system: processes executing in kernel mode |
| # 4 - idle: twiddling thumbs |
| # 5 - iowait: waiting for I/O to complete |
| # 6 - irq: servicing interrupts |
| # 7 - softirq: servicing softirqs |
| |
| cpu_values_pos = [ |
| (1, 'user_processes', True), |
| (2, 'nice_processes', True), |
| (3, 'system_processes', True), |
| (4, 'idle_time', True), |
| ] |
| |
| def collect(self): |
| # calculate core count |
| core_count = 0 |
| |
| for line in open('/proc/stat'): |
| vals = line.split() |
| dev_name = vals[0] |
| |
| if dev_name == 'cpu': |
| for pos, name, _ in self.cpu_values_pos: |
| self.add_data(dev_name, name, int(vals[pos])) |
| elif dev_name == 'procs_blocked': |
| self.add_data("cpu", "procs_blocked", int(vals[1])) |
| elif dev_name.startswith('cpu'): |
| core_count += 1 |
| |
| # procs in queue |
| TASKSPOS = 3 |
| vals = open('/proc/loadavg').read().split() |
| ready_procs = vals[TASKSPOS].partition('/')[0] |
| |
        # subtract one to exclude the current process
| procs_queue = (float(ready_procs) - 1) / core_count |
| self.add_data("cpu", "procs_queue_x10", int(procs_queue * 10)) |
| |
| |
| @provides("system-ram") |
| class SystemRAMSensor(ArraysSensor): |
    # fields reported by default, unless overridden via the allowed list
| ram_fields = ['MemTotal', 'MemFree', 'Buffers', 'Cached', 'SwapCached', |
| 'Dirty', 'Writeback', 'SwapTotal', 'SwapFree'] |
| |
| def __init__(self, *args, **kwargs): |
| ArraysSensor.__init__(self, *args, **kwargs) |
| |
| if self.allowed is None: |
| self.allowed = self.ram_fields |
| |
| self.allowed_fields = set() |
| for line in open('/proc/meminfo'): |
| field_name = line.split()[0].rstrip(":") |
| if self.is_dev_accepted(field_name): |
| self.allowed_fields.add(field_name) |
| |
| def collect(self): |
| for line in open('/proc/meminfo'): |
| vals = line.split() |
| field = vals[0].rstrip(":") |
| if field in self.allowed_fields: |
| self.add_data("ram", field, int(vals[1])) |
| |
| |
| try: |
| from ceph_daemon import admin_socket |
| except ImportError: |
| admin_socket = None |
| |
| |
| @provides("ceph") |
| class CephSensor(ArraysSensor): |
| |
| historic_duration = 2 |
| historic_size = 200 |
| |
| def run_ceph_daemon_cmd(self, osd_id, *args): |
| asok = "/var/run/ceph/{}-osd.{}.asok".format(self.cluster, osd_id) |
| if admin_socket: |
| res = admin_socket(asok, args) |
| else: |
| res = subprocess.check_output("ceph daemon {} {}".format(asok, " ".join(args)), shell=True) |
| |
| return res |
| |
| def collect(self): |
        def get_val(dct, path):
            # walk a '/'-separated path into nested dicts
            if '/' in path:
                root, rest = path.split('/', 1)
                return get_val(dct[root], rest)
            return dct[path]
| |
| for osd_id in self.params['osds']: |
| data = json.loads(self.run_ceph_daemon_cmd(osd_id, 'perf', 'dump')) |
| for key_name in self.params['counters']: |
| self.add_data("osd{}".format(osd_id), key_name.replace("/", "."), get_val(data, key_name)) |
| |
| if 'historic' in self.params.get('sources', {}): |
| self.historic.setdefault(osd_id, []).append(self.run_ceph_daemon_cmd(osd_id, "dump_historic_ops")) |
| |
| if 'in_flight' in self.params.get('sources', {}): |
| self.in_flight.setdefault(osd_id, []).append(self.run_ceph_daemon_cmd(osd_id, "dump_ops_in_flight")) |
| |
| def set_osd_historic(self, duration, keep, osd_id): |
| data = json.loads(self.run_ceph_daemon_cmd(osd_id, "dump_historic_ops")) |
| self.run_ceph_daemon_cmd(osd_id, "config set osd_op_history_duration {}".format(duration)) |
| self.run_ceph_daemon_cmd(osd_id, "config set osd_op_history_size {}".format(keep)) |
| return (data["duration to keep"], data["num to keep"]) |
| |
| def init(self): |
| self.cluster = self.params['cluster'] |
| self.prev_vals = {} |
| self.historic = {} |
| self.in_flight = {} |
| |
| if 'historic' in self.params.get('sources', {}): |
| for osd_id in self.params['osds']: |
| self.prev_vals[osd_id] = self.set_osd_historic(self.historic_duration, self.historic_size, osd_id) |
| |
| def stop(self): |
| for osd_id, (duration, keep) in self.prev_vals.items(): |
| self.prev_vals[osd_id] = self.set_osd_historic(duration, keep, osd_id) |
| |
| def get_updates(self): |
        res = super(CephSensor, self).get_updates()
| |
| for osd_id, data in self.historic.items(): |
| res[("osd{}".format(osd_id), "historic")] = (None, data) |
| |
| self.historic = {} |
| |
| for osd_id, data in self.in_flight.items(): |
| res[("osd{}".format(osd_id), "in_flight")] = (None, data) |
| |
| self.in_flight = {} |
| |
| return res |
| |
| @classmethod |
| def unpack_results(cls, device, metrics, packed, typecode): |
| if metrics in ('historic', 'in_flight'): |
| assert typecode is None |
| return packed |
| |
| arr = array.array(typecode) |
| if sys.version_info >= (3, 0, 0): |
| arr.frombytes(packed) |
| else: |
| arr.fromstring(packed) |
| |
| return arr |
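

# A hypothetical params block for CephSensor (keys as consumed by init() and
# collect() above; the counter paths and osd ids are made up):
#
#     {'cluster': 'ceph',
#      'osds': [0, 1, 2],
#      'counters': ['osd/op_r', 'osd/op_w'],
#      'sources': ['historic', 'in_flight']}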
| |
| |
| class SensorsData(object): |
| def __init__(self): |
| self.cond = threading.Condition() |
| self.collected_at = array.array(time_array_typechar) |
| self.stop = False |
| self.sensors = {} |
| self.data_fd = None # temporary file to store results |
| self.exception = None |
| |
| |
| def collect(sensors_config): |
| curr = {} |
    for name, config in sensors_config.items():
        params = {'params': config}

        if "allow" in config:
            params["allowed"] = config["allow"]

        if "disallow" in config:
            params["disallowed"] = config["disallow"]

        curr[name] = SensorsMap[name](**params)
| return curr |
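

# Hypothetical use of collect() - build sensor objects once from a config
# mapping, then poll them:
#
#     sensors = collect({"system-ram": {}, "block-io": {"disallow": ["loop"]}})
#     for sensor in sensors.values():
#         sensor.collect()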
| |
| |
| def sensors_bg_thread(sensors_config, sdata): |
| try: |
| next_collect_at = time.time() |
| if "pool_sz" in sensors_config: |
| sensors_config = sensors_config.copy() |
| pool_sz = sensors_config.pop("pool_sz") |
| else: |
| pool_sz = 32 |
| |
| if pool_sz != 0: |
            pool = Pool(pool_sz)
| else: |
| pool = None |
| |
| # prepare sensor classes |
| with sdata.cond: |
| sdata.sensors = {} |
| for name, config in sensors_config.items(): |
| params = {'params': config} |
| logger.debug("Start sensor %r with config %r", name, config) |
| |
| if "allow" in config: |
| params["allowed_prefixes"] = config["allow"] |
| |
| if "disallow" in config: |
| params["disallowed_prefixes"] = config["disallow"] |
| |
| sdata.sensors[name] = SensorsMap[name](**params) |
| sdata.sensors[name].init() |
| |
| logger.debug("sensors.config = %s", pprint.pformat(sensors_config)) |
| logger.debug("Sensors map keys %s", ", ".join(sdata.sensors.keys())) |
| |
| # TODO: handle exceptions here |
| # main loop |
| while not sdata.stop: |
| dtime = next_collect_at - time.time() |
| if dtime > 0: |
| with sdata.cond: |
| sdata.cond.wait(dtime) |
| |
| next_collect_at += 1.0 |
| |
| if sdata.stop: |
| break |
| |
| ctm = time.time() |
| with sdata.cond: |
| sdata.collected_at.append(int(ctm * 1000000)) |
                if pool is not None:
                    # the agent's pool.map yields (ok, value) pairs;
                    # re-raise the first failed collection
                    caller = lambda x: x()
                    for ok, val in pool.map(caller, [sensor.collect for sensor in sdata.sensors.values()]):
| if not ok: |
| raise val |
| else: |
| for sensor in sdata.sensors.values(): |
| sensor.collect() |
| etm = time.time() |
| sdata.collected_at.append(int(etm * 1000000)) |
| logger.debug("Add data to collected_at - %s, %s", ctm, etm) |
| |
| if etm - ctm > 0.1: |
                # TODO(koder): need to signal that something is not really ok with sensor collecting
| pass |
| |
| except Exception: |
| logger.exception("In sensor BG thread") |
| sdata.exception = traceback.format_exc() |
| finally: |
| for sensor in sdata.sensors.values(): |
| sensor.stop() |
| |
| |
| sensors_thread = None |
| sdata = None # type: SensorsData |
| |
| |
| def rpc_start(sensors_config): |
| global sensors_thread |
| global sdata |
| |
| if array.array('L').itemsize != 8: |
| message = "Python array.array('L') items should be 8 bytes in size, not {}." + \ |
| " Can't provide sensors on this platform. Disable sensors in config and retry" |
| raise ValueError(message.format(array.array('L').itemsize)) |
| |
| if sensors_thread is not None: |
| raise ValueError("Thread already running") |
| |
| sdata = SensorsData() |
| sensors_thread = threading.Thread(target=sensors_bg_thread, args=(sensors_config, sdata)) |
| sensors_thread.daemon = True |
| sensors_thread.start() |
| |
| |
| def unpack_rpc_updates(res_tuple): |
| offset_map, compressed_blob, compressed_collected_at_b = res_tuple |
| blob = zlib.decompress(compressed_blob) |
| collected_at_b = zlib.decompress(compressed_collected_at_b) |
| collected_at = array.array(time_array_typechar) |
| collected_at.frombytes(collected_at_b) |
| yield 'collected_at', collected_at |
| |
| # TODO: data is unpacked/repacked here with no reason |
| for sensor_path, (offset, size, typecode) in offset_map.items(): |
| sensor_path = sensor_path.decode("utf8") |
| sensor_name, device, metric = sensor_path.split('.', 2) |
| sensor_data = SensorsMap[sensor_name].unpack_results(device, |
| metric, |
| blob[offset:offset + size], |
| typecode.decode("ascii")) |
| yield sensor_path, sensor_data |
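

# Hypothetical controller-side round trip: rpc_start/rpc_get_updates/rpc_stop
# execute on the agent, the result tuple crosses the RPC transport (where keys
# and typecodes arrive as bytes, hence the .decode() calls above), and
# unpack_rpc_updates expands it back into per-metric arrays:
#
#     rpc_start({"system-ram": {}})
#     time.sleep(5.0)
#     for path, values in unpack_rpc_updates(rpc_get_updates()):
#         print(path, values)
#     rpc_stop()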
| |
| |
| def rpc_get_updates(): |
| if sdata is None: |
| raise ValueError("No sensor thread running") |
| |
| offset_map = collected_at = None |
| blob = "" |
| |
| with sdata.cond: |
| if sdata.exception: |
| raise Exception(sdata.exception) |
| |
| offset_map = {} |
| for sensor_name, sensor in sdata.sensors.items(): |
| for (device, metrics), (typecode, val) in sensor.get_updates().items(): |
| offset_map["{}.{}.{}".format(sensor_name, device, metrics)] = (len(blob), len(val), typecode) |
| blob += val |
| |
| collected_at = sdata.collected_at |
| sdata.collected_at = array.array(sdata.collected_at.typecode) |
| |
    logger.debug(str(collected_at))
    if sys.version_info >= (3, 0, 0):
        collected_at_b = collected_at.tobytes()
    else:
        collected_at_b = collected_at.tostring()
    return offset_map, zlib.compress(blob), zlib.compress(collected_at_b)
| |
| |
| def rpc_stop(): |
| global sensors_thread |
| global sdata |
| |
| if sensors_thread is None: |
| raise ValueError("No sensor thread running") |
| |
| sdata.stop = True |
| with sdata.cond: |
| sdata.cond.notify_all() |
| |
| sensors_thread.join() |
| |
| if sdata.exception: |
| raise Exception(sdata.exception) |
| |
| res = rpc_get_updates() |
| |
| sensors_thread = None |
| sdata = None |
| |
| return res |