Merge "Introduce an asynchronous http_check plugin"
diff --git a/collectd/files/collectd_http_check.conf b/collectd/files/collectd_http_check.conf
index 8c3673a..d14af9f 100644
--- a/collectd/files/collectd_http_check.conf
+++ b/collectd/files/collectd_http_check.conf
@@ -1,9 +1,10 @@
{%- if plugin.get('url', {})|length > 0 %}
-Import "http_check"
+Import "collectd_http_check"
-<Module "http_check">
+<Module "collectd_http_check">
MaxRetries "3"
Timeout "1"
+ PollingInterval "{{ plugin.polling_interval|default("10") }}"
{%- for name, params in plugin.url.iteritems() %}
ExpectedCode "{{ name }}" "{{ params.expected_code }}"
Url "{{ name }}" "{{ params.url }}"
diff --git a/collectd/files/plugin/http_check.py b/collectd/files/plugin/collectd_http_check.py
similarity index 64%
rename from collectd/files/plugin/http_check.py
rename to collectd/files/plugin/collectd_http_check.py
index 1002928..7747dc7 100644
--- a/collectd/files/plugin/http_check.py
+++ b/collectd/files/plugin/collectd_http_check.py
@@ -29,19 +29,26 @@
def __init__(self, *args, **kwargs):
super(HTTPCheckPlugin, self).__init__(*args, **kwargs)
self.plugin = NAME
- self.session = requests.Session()
- self.session.mount(
- 'http://',
- requests.adapters.HTTPAdapter(max_retries=self.max_retries)
- )
- self.session.mount(
- 'https://',
- requests.adapters.HTTPAdapter(max_retries=self.max_retries)
- )
self.urls = {}
self.expected_codes = {}
self.expected_contents = {}
+ self.timeout = 3  # passed as timeout= to every session.get() call
+ self.max_retries = 2  # passed as max_retries= to each HTTPAdapter that gets mounted
+
+ self.interval = base.INTERVAL
+ self.polling_interval = base.INTERVAL  # handed to each AsyncPoller when it is created
+
+ self.sessions = {}  # check name -> requests.Session used for that check
+ self._threads = {}  # URL -> AsyncPoller started for that URL
+
+ def shutdown_callback(self):  # stop and join the poller threads held in self._threads
+ for tid, t in self._threads.items():  # tid is the URL the thread was registered under
+ if t.is_alive():
+ self.logger.info('Waiting for {} thread to finish'.format(tid))
+ t.stop()  # stop() is provided by the poller class (not shown here) - presumably asks its loop to exit
+ t.join()  # presumably threading.Thread.join, i.e. blocks until the thread ends - confirm against base.AsyncPoller
+
def config_callback(self, config):
super(HTTPCheckPlugin, self).config_callback(config)
for node in config.children:
@@ -52,10 +59,24 @@
elif node.key == 'ExpectedContent':
self.expected_contents[node.values[0]] = node.values[1]
- def itermetrics(self):
for name, url in self.urls.items():
+ session = requests.Session()
+ session.mount(
+ 'http://',
+ requests.adapters.HTTPAdapter(max_retries=self.max_retries)
+ )
+ if url.startswith('https'):
+ session.mount(
+ 'https://',
+ requests.adapters.HTTPAdapter(max_retries=self.max_retries)
+ )
+ self.sessions[name] = session
+
+ def check_url(self, name, url):
+
+ def get():
try:
- r = self.session.get(url, timeout=self.timeout)
+ r = self.sessions[name].get(url, timeout=self.timeout)
except Exception as e:
self.logger.warning("Got exception for '{}': {}".format(
url, e)
@@ -83,8 +104,30 @@
'Content "{}" does not match "{}"'.format(
r.content[0:30], expected_content
))
+ return [status]
- yield {'type_instance': name, 'values': status }
+ if url not in self._threads:
+ t = base.AsyncPoller(self.collectd,
+ get,
+ self.polling_interval,
+ url)
+ t.start()
+ self._threads[url] = t
+
+ t = self._threads[url]
+ if not t.is_alive():
+ self.logger.warning("Unexpected end of the thread {}".format(
+ t.name))
+ del self._threads[url]
+ return []
+
+ return t.results
+
+ def itermetrics(self):  # generator: one metric dict per configured URL that has a result
+ for name, url in self.urls.items():
+ r = self.check_url(name, url)  # [] when the poller thread was found dead, otherwise the thread's results
+ if r:  # nothing is emitted for a check whose result is empty/falsy
+ yield {'type_instance': name, 'values': r}
plugin = HTTPCheckPlugin(collectd, disable_check_metric=True)
@@ -105,9 +148,15 @@
if __name__ == '__main__':
plugin.urls['google_ok'] = 'https://www.google.com'
plugin.urls['google_fail'] = 'https://www.google.com/not_found'
+ plugin.urls['no_network'] = 'https://127.0.0.2:999'
plugin.expected_codes['google_ok'] = 200
- plugin.expected_codes['github_fail'] = 200
+ plugin.expected_codes['google_fail'] = 200
+ collectd.load_configuration(plugin)
plugin.read_callback()
+ import time
+ time.sleep(base.INTERVAL)
+ plugin.read_callback()
+ plugin.shutdown_callback()
else:
collectd.register_config(config_callback)
collectd.register_notification(notification_callback)