Source code for performance.driver.classes.observer.marathonmetrics

import requests
import json
import time
import threading

from .events.marathon import MarathonMetricUpdateEvent

from performance.driver.core.classes import Observer
from performance.driver.core.events import Event, MetricUpdateEvent, \
                              TeardownEvent, ParameterUpdateEvent, StartEvent
from performance.driver.core.reflection import subscribesToHint, publishesHint
from performance.driver.core.utils import dictDiff


[docs]class MarathonMetricsObserver(Observer): """ The *Marathon Metrics Observer* is observing for changes in the marathon `/stats` endpoint and is emitting events according to it's configuration :: observers: - class: observer.MarathonMetricsObserver # The URL to the marathon metrics endpoint url: "{{marathon_url}}/metrics" # [Optional] Additional headers to send headers: Accept: test/plain This observer is polling the ``/metrics`` endpoint 2 times per second and for every value that is changed, a ``MetricUpdateEvent`` event is published. .. note:: The name of the parameter is always the flattened name in the JSON response. For example, a parameter change in the following path: .. highlight:: json :: { "foo": { "bar.baz": { "bax": 1 } } } Will be broadcasted as a change in the following path: .. highlight:: yaml :: foo.bar.baz.bax: 1 .. note:: This observer will automatically inject an ``Authorization`` header if a ``dcos_auth_token`` definition exists, so you don't have to specify it through the ``headers`` configuration. Note that a ``dcos_auth_token`` can be dynamically injected via an authentication task. """ @subscribesToHint(TeardownEvent, ParameterUpdateEvent, StartEvent) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.eventbus.subscribe(self.handleTeardownEvent, events=(TeardownEvent, )) self.eventbus.subscribe( self.handleParameterUpdateEvent, events=(ParameterUpdateEvent, )) self.eventbus.subscribe(self.handleStart, events=(StartEvent, )) self.previous = {} self.forceUpdate = False self.pollingActive = False self.pollingThread = None def handleStart(self, event): """ Start main thread at start """ self.pollingActive = True self.pollingThread = threading.Thread(target=self.pollingThreadTarget, name="marathonmetrics-poller") self.pollingThread.start() def handleParameterUpdateEvent(self, event): """ Force parameter update when we have a test restart """ self.forceUpdate = True def pollingThreadTarget(self): """ The main polling thread """ while self.pollingActive: self.checkMetrics() time.sleep(0.5) def handleTeardownEvent(self, event): """ Stop thread at teardown """ self.pollingActive = False self.pollingThread.join() @publishesHint(MetricUpdateEvent) def checkMetrics(self): """ Check for the state of the metrics """ definitions = self.getDefinitions() config = self.getRenderedConfig() url = config.get('url') headers = config.get('headers', {}) # If we are missing an `Authorization` header but we have a # `dcos_auth_token` definition, allocate an `Authorization` header now if not 'Authorization' in headers \ and 'dcos_auth_token' in definitions: headers['Authorization'] = 'token={}'.format( definitions['dcos_auth_token']) # Fetch metrics try: res = requests.get(url, headers=headers, verify=False) if res.status_code != 200: self.logger.debug( 'Metrics marathon endpoint not accessible (Received {} HTTP status code)'. format(res.status_code)) return # Get previous value and reset previous if we have a force update prevValue = self.previous if self.forceUpdate: self.forceUpdate = False prevValue = {} # Emit one event for every parameter value value = res.json() for path, vprev, vnext in dictDiff(prevValue, value): pathstr = '.'.join(path) self.logger.debug('Metric {} changed to {}'.format(pathstr, vnext)) self.eventbus.publish(MarathonMetricUpdateEvent(pathstr, vnext)) self.previous = value except requests.exceptions.ConnectionError as e: self.logger.debug('Metrics marathon endpoint not accessible')