blob: e5c0e3e35beb80ed04062f5f4dc8bdd7349176f0 [file] [log] [blame]
#!/usr/bin/env python
import errno
import hashlib
import io
import logging
import os
import pwd
import signal
import subprocess
import sys
import time
try:
import dns.resolver
import dns.reversename
except ImportError:
pass
def get_uid_gid(username):
    """Look up the numeric user id and group id of a system account.

    Args:
        username: Login name to resolve through the password database.

    Returns:
        A ``(uid, gid)`` tuple taken from the account's passwd entry.

    Raises:
        KeyError: Propagated from ``pwd.getpwnam`` when the account
            does not exist.
    """
    # Resolve the account once so both ids come from the same passwd entry.
    entry = pwd.getpwnam(username)
    return entry.pw_uid, entry.pw_gid
def set_logger(level=logging.DEBUG):
    """Configure logging output and return this module's logger.

    ``logging.basicConfig`` is called with a timestamped format
    (millisecond precision), then the logger named after this module is
    fetched and set to ``level``.

    Args:
        level: Logging threshold applied to the returned logger.
            Defaults to ``logging.DEBUG``.

    Returns:
        The ``logging.Logger`` registered under ``__name__``.
    """
    log_datefmt = "%Y-%m-%d %H:%M:%S"
    log_format = "%(asctime)s.%(msecs)03d - %(levelname)s - %(message)s"
    logging.basicConfig(format=log_format, datefmt=log_datefmt)
    logger = logging.getLogger(__name__)
    # Honour the caller's level; it used to be ignored in favour of DEBUG.
    logger.setLevel(level)
    return logger
def send_signal(pid, sig):
    """Log and deliver a signal to a process.

    Args:
        pid: Numeric id of the target process.
        sig: Signal number to deliver (e.g. ``signal.SIGHUP``).
    """
    LOG.debug('Sending signal %d to pid %d', sig, pid)
    os.kill(pid, sig)
def demote(demote_uid, demote_gid):
    """Build a zero-argument callback that switches process ids.

    The returned function is suitable as a ``preexec_fn`` for
    ``subprocess.Popen``: when invoked it sets the group id and then the
    user id of the calling process.

    Args:
        demote_uid: User id the callback will switch to.
        demote_gid: Group id the callback will switch to.

    Returns:
        A callable taking no arguments and returning ``None``.
    """
    def set_ids():
        # The group id is changed first; presumably because changing it
        # after the user id has been dropped may no longer be permitted.
        os.setgid(demote_gid)
        os.setuid(demote_uid)

    return set_ids
def execute_cmd(cmd, directory, uid, gid):
    """Start a command as a child process running under the given ids.

    Args:
        cmd: Command passed straight to ``subprocess.Popen``.
        directory: Working directory for the child process.
        uid: User id the child switches to before executing ``cmd``.
        gid: Group id the child switches to before executing ``cmd``.

    Returns:
        The ``subprocess.Popen`` handle of the started process.
    """
    LOG.debug("Executing cmd '%s' with uid/gid %d/%d ", cmd, uid, gid)
    # Runs in the child between fork and exec to change its ids.
    switch_ids = demote(uid, gid)
    return subprocess.Popen(cmd, cwd=directory, preexec_fn=switch_ids)
def dns_query(address, record_type, retries=10, interval=5):
    """Resolve a DNS record, retrying on failure.

    Args:
        address: Name to look up.
        record_type: DNS record type, e.g. ``'PTR'``.
        retries: Maximum number of attempts.
        interval: Seconds to sleep before every attempt (including the
            first one).

    Returns:
        The answer object returned by ``dns.resolver.query``.

    Raises:
        Exception: If no non-empty answer was obtained within ``retries``
            attempts.
    """
    dns_response = None
    LOG.debug("DNS query for address '%s' record type '%s'", address, record_type)
    while retries > 0:
        time.sleep(interval)
        try:
            # NOTE(review): dnspython 2.x deprecates resolver.query() in
            # favour of resolver.resolve() -- confirm the installed version.
            dns_response = dns.resolver.query(address, record_type)
        except Exception:
            # Narrowed from a bare ``except:`` so KeyboardInterrupt and
            # SystemExit are no longer swallowed; the traceback is logged
            # so the cause of the failure is visible.
            LOG.error('Fail to get DNS records for autodiscovery retries %d',
                      retries, exc_info=True)
            retries -= 1
        else:
            break
    if dns_response:
        LOG.debug('Got dns response %s', dns_response.__dict__)
        return dns_response
    else:
        raise Exception('Fail to get DNS response')
def get_data_dir(node_ip, retries=10, interval=5):
    """Derive the relative data directory from the node's reverse DNS name.

    A PTR lookup is performed for ``node_ip`` and the second dot-separated
    label of the first answer is used as the directory name.

    Args:
        node_ip: IP address to reverse-resolve.
        retries: Maximum number of DNS attempts, forwarded to ``dns_query``.
        interval: Seconds between DNS attempts, forwarded to ``dns_query``.

    Returns:
        A relative path of the form ``'data/<label>/'``.

    Raises:
        Exception: If the lookup fails or the PTR name has no usable
            second label.
    """
    address = dns.reversename.from_address(node_ip)
    ptr = dns_query(address, 'PTR', retries, interval)
    if ptr:
        # Example PTR target:
        # monitoring_server.2.c81t242vtv4qobpv75mckqwob.monitoring_monitoring.
        labels = str(ptr[0]).split('.')
        # labels[0] = service name, monitoring_server
        # labels[1] = replica number, 2
        # labels[2] = docker id, c81t242vtv4qobpv75mckqwob
        # labels[3] = network name, monitoring_monitoring
        # Guard the index so a name with a single label ends in the
        # documented failure below instead of a bare IndexError.
        if len(labels) > 1 and labels[1]:
            return 'data/{}/'.format(labels[1])
    LOG.error('Fail to discover data dir')
    raise Exception('Fail to discover data dir')
def create_data_dir(data_dir_prefix, data_dir, uid, gid):
    """Ensure the data directory exists and hand it to the given owner.

    Args:
        data_dir_prefix: Base path under which the directory is created.
        data_dir: Directory path relative to ``data_dir_prefix``.
        uid: User id that should own the directory.
        gid: Group id that should own the directory.

    Returns:
        The joined path ``data_dir_prefix/data_dir``.

    Raises:
        Exception: If the directory cannot be created for any reason
            other than already existing.
    """
    target = os.path.join(data_dir_prefix, data_dir)
    try:
        os.makedirs(target)
    except OSError as err:
        # An already existing path is fine; anything else is fatal.
        already_there = err.errno == errno.EEXIST
        if not already_there:
            LOG.error('Fail to create data dir, exiting')
            raise Exception('Fail to create data dir')
    os.chown(target, uid, gid)
    return target
def calculate_hash(directory):
    """Return a SHA-256 hex digest over the contents of a directory's files.

    The entries directly inside ``directory`` are read in sorted name
    order and their bytes are fed into a single hash. File names are not
    part of the digest. Entries that cannot be read are logged and
    skipped; if the directory cannot be listed the error is logged and
    the digest of whatever was hashed so far is returned.

    Args:
        directory: Path of the directory to fingerprint.

    Returns:
        The hexadecimal SHA-256 digest as a string.
    """
    cur_hash = hashlib.sha256()
    try:
        # os.listdir() returns entries in arbitrary order; sort them so
        # identical content always produces the same digest.
        for filename in sorted(os.listdir(directory)):
            try:
                with open(os.path.join(directory, filename), "rb") as f:
                    # Stream in buffer-sized chunks to avoid loading
                    # whole files into memory.
                    for chunk in iter(lambda: f.read(io.DEFAULT_BUFFER_SIZE), b""):
                        cur_hash.update(chunk)
            except IOError as e:
                LOG.error('Fail to read file: %s', e)
    except OSError as e:
        LOG.error('Fail to calculate hash: %s', e)
    return cur_hash.hexdigest()
def wait_for_process_watch_config(proc, config_dir, interval=10):
    """Supervise a process and signal it when its config directory changes.

    While the process is running, the hash of ``config_dir`` is recomputed
    every ``interval`` seconds; whenever it differs from the last seen
    value the process is sent ``SIGHUP``. Once the process has terminated
    this function exits the interpreter with the process's return code.

    Args:
        proc: ``subprocess.Popen``-like handle of the supervised process.
        config_dir: Directory whose contents are watched for changes.
        interval: Seconds to sleep between checks.
    """
    last_seen = calculate_hash(config_dir)
    while True:
        if proc.poll() is not None:
            break
        current = calculate_hash(config_dir)
        if current != last_seen:
            LOG.info('Reloading service, new hash %s', current)
            last_seen = current
            send_signal(proc.pid, signal.SIGHUP)
        time.sleep(interval)
    exit_code = proc.poll()
    LOG.info("Process exited with code %d", exit_code)
    sys.exit(exit_code)
def wait_for_process(proc):
    """Block until the process ends, then exit with its return code.

    Args:
        proc: ``subprocess.Popen``-like handle of the process to wait for.
    """
    exit_code = proc.wait()
    LOG.info("Process exited with code %d", exit_code)
    sys.exit(exit_code)
def run_service(cmd, cwd, uid, gid):
    """Launch the service command and report its handle and pid.

    Args:
        cmd: Command to execute.
        cwd: Working directory for the command.
        uid: User id the command runs under.
        gid: Group id the command runs under.

    Returns:
        A ``(process, pid)`` tuple for the started process.
    """
    child = execute_cmd(cmd, cwd, uid, gid)
    return child, child.pid
# Module-level logger referenced by the functions in this file; it is
# created (and logging configured) as a side effect of importing the module.
LOG = set_logger()