Source code for performance.driver.classes.observer.marathonpoller

import requests
import json
import time
import threading

from .marathonevents import *

from performance.driver.core.classes import Observer
from performance.driver.core.events import TickEvent, TeardownEvent, StartEvent
from performance.driver.core.reflection import subscribesToHint, publishesHint
from performance.driver.core.utils import dictDiff
from performance.driver.core.eventfilters import EventFilter

from performance.driver.classes.channel.marathon import MarathonDeploymentStartedEvent

DIFF_REASON_REMOVED = 0
DIFF_REASON_CREATED = 1
DIFF_REASON_MODIFIED = 2

EMPTY_GROUP = {"id": "/", "apps": [], "groups": [], "pods": []}


[docs]class MarathonPollerObserver(Observer): """ The *Marathon Poller Observer* is a polling-based fallback observer that can fully replace the ``MarathonEventsObserver`` when the SSE event bus is not available. :: observers: - class: observer.MarathonPollerObserver # The URL to the marathon base url: "{{marathon_url}}" # [Optional] Additional headers to send headers: Accept: test/plain # [Optional] How long to wait between consecutive polls (seconds) interval: 0.5 # [Optional] How long to wait before considering the deployment "Failed" # If set to 0 the deployment will never fail. failureTimeout: 0 # [Optional] How many times to re-try polling the endpoint before # considering the connection closed retries: 3 # [Optional] Event binding events: # [Optional] Which event to wait to start polling start: StartEvent # [Optional] Which event to wait to stop polling stop: TeardownEvent This observer is polling the ``/groups`` endpoint as fast as possible and it calculates deferences from the previously observed state. Any differences are propagated as virtual deployment events as: * ``MarathonDeploymentSuccessEvent`` * ``MarathonDeploymentFailedEvent`` If requested, the poller is going to look for ``MarathonDeploymentStartedEvent`` events and is going to wait for it to be completed in a given time. If the time is passed, a synthetic failure event will be generated: * ``MarathonDeploymentFailedEvent`` .. note:: This observer will automatically inject an ``Authorization`` header if a ``dcos_auth_token`` definition exists, so you don't have to specify it through the ``headers`` configuration. Note that a ``dcos_auth_token`` can be dynamically injected via an authentication task. """ @subscribesToHint(MarathonDeploymentStartedEvent) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Load config config = self.getRenderedConfig() self.url = config['url'] self.headers = config.get('headers', {}) self.pollInterval = config.get('interval', 0.5) self.failureTimeout = config.get('failureTimeout', 0) self.retries = config.get('retries', 3) eventsConfig = config.get('events', {}) self.startEventSession = EventFilter( eventsConfig.get('start', 'StartEvent')).start(None, self.handleStartEvent) self.stopEventSession = EventFilter( eventsConfig.get('stop', 'TeardownEvent')).start( None, self.handleStopEvent) self.retriesLeft = self.retries self.requestTraceIDs = {} self.requestedDeployments = set() self.requestedDeploymentTimeout = {} self.pollDelta = 0 self.connected = False self.lastGroup = {} self.reset() # Keep track of outgoing deployment requests self.eventbus.subscribe( self.handleRequest, events=(MarathonDeploymentStartedEvent, )) self.eventbus.subscribe(self.handleEvent) # Start thread self.thread = None self.active = False def handleEvent(self, event): """ Pass down event to start/stop sessions """ self.startEventSession.handle(event) self.stopEventSession.handle(event) def handleStartEvent(self, event): """ Handle request to start polling """ if not self.thread is None: return # Start polling thread self.active = True self.thread = threading.Thread(target=self.pollerThread, name="marathonpoller-poller") self.thread.start() def handleStopEvent(self, event): """ Handle request to stop polling """ if self.thread is None: return # Stop polling thread self.active = False self.thread.join() def pollerThread(self): """ The poller thread polls the marathon endpoint at fixed intervals """ while self.active: self.pollGroupsEndpoint() time.sleep(self.pollInterval) def reset(self): """ Reset the local state """ self.retriesLeft = self.retries self.connected = False self.lastGroup = {"id": "/", "apps": [], "groups": [], "pods": []} def cleanupInstanceDeployment(self, inst): """ Remove records associated to the given instance """ if inst in self.requestedDeployments: self.requestedDeployments.remove(inst) if inst in self.requestedDeploymentTimeout: del self.requestedDeploymentTimeout[inst] if inst in self.requestTraceIDs: del self.requestTraceIDs[inst] @publishesHint(MarathonDeploymentFailedEvent) def failRequestedDeployment(self, inst, reason="due to timeout"): """ Fail the specified requested deployment """ self.logger.warn('Failing deployment {} {}'.format(inst, reason)) self.eventbus.publish( MarathonDeploymentFailedEvent( None, inst, traceid=self.requestTraceIDs.get(inst, None))) self.cleanupInstanceDeployment(inst) def failAllPendingRequests(self): """ Fail all the requested deployments """ # Copy this list in order to be able to iterate on it's items # while removing items from `self.requestedDeployments` immutableList = list(self.requestedDeployments) for inst in immutableList: self.failRequestedDeployment(inst, "due to connection interrupt") def failExpiredPendingRequests(self): """ Fail all the requests that passed it's grace timeout """ ts = time.time() expire_ids = [] for inst, timeout in self.requestedDeploymentTimeout.items(): if ts >= timeout: expire_ids.append(inst) for inst in expire_ids: self.failRequestedDeployment(inst) def handleRequest(self, event): """ Keep track of the requested deployments """ self.requestTraceIDs[event.instance] = event.traceids self.requestedDeployments.add(event.instance) # Set the deployment failure timeout ts = time.time() if self.failureTimeout > 0: self.requestedDeploymentTimeout[ event.instance] = ts + self.failureTimeout @publishesHint(MarathonStartedEvent, MarathonUnavailableEvent, MarathonDeploymentSuccessEvent, MarathonGroupChangeSuccessEvent) def pollGroupsEndpoint(self): """ Poll the groups endpoint """ definitions = self.getDefinitions() # If we are missing an `Authorization` header but we have a # `dcos_auth_token` definition, allocate an `Authorization` header now # # Note: We are putting this within the loop because the `dcos_auth_token` # might appear at a later time if an authentication task is already # in progress. # headers = dict(self.headers) if not 'Authorization' in headers \ and 'dcos_auth_token' in definitions: headers['Authorization'] = 'token={}'.format( definitions['dcos_auth_token']) # Poll the endpoint group = None try: url = '{}/v2/groups?embed=group.groups&embed=group.apps&embed=group.pods&embed=group.apps.deployments'.format( self.url) self.logger.debug('Requesting {}'.format(url)) res = requests.get(url, headers=headers, verify=False) # Handle HTTP response if res.status_code < 200 or res.status_code >= 300: self.logger.warn( 'Unexpected HTTP response HTTP/{}'.format(res.status_code)) if self.connected: self.logger.debug('We are connected, ignoring for {} more tries'. format(self.retriesLeft)) self.retriesLeft -= 1 if self.retriesLeft > 0: self.logger.debug('Not taking an action') return # Don't take any action, wait for next tick else: self.retriesLeft = self.retries self.logger.debug('Resetting retries to {}'.format(self.retriesLeft)) group = res.json() except Exception as e: self.logger.error( 'Unexpected exception {}: {}'.format(type(e).__name__, str(e))) if self.connected: self.logger.debug('We are connected, ignoring for {} more tries'. format(self.retriesLeft)) self.retriesLeft -= 1 if self.retriesLeft > 0: self.logger.debug('Not taking an action') return # Don't take any action, wait for next tick # Handle connected state toggle if not self.connected and group: self.logger.info('Marathon is responding') self.connected = True self.lastGroup = group self.eventbus.publish(MarathonStartedEvent()) elif self.connected and not group: self.logger.warn('Marathon became unresponsive') self.failAllPendingRequests() self.reset() self.eventbus.publish(MarathonUnavailableEvent()) elif self.connected: (diff_instances, diff_groups) = diffRootGroups(self.lastGroup, group) self.lastGroup = group # Create one virtual deployments for every affected instance for inst in diff_instances: self.eventbus.publish( MarathonDeploymentSuccessEvent( None, [inst], traceid=self.requestTraceIDs.get(inst, None))) self.cleanupInstanceDeployment(inst) # Create virtual group deployments for grp in diff_groups: self.eventbus.publish( MarathonGroupChangeSuccessEvent( None, grp, traceid=self.requestTraceIDs.get(grp, None))) self.cleanupInstanceDeployment(grp) # Fail expired requests self.failExpiredPendingRequests()
def diffRootGroups(group_a, group_b): """ Calculate the differences in apps, pods and groups of the given two groups """ diff_instances = set() diff_groups = set() # Get app IDs from two groups apps_a = {} for app_a in group_a['apps']: apps_a[app_a['id']] = app_a apps_b = {} for app_b in group_b['apps']: apps_b[app_b['id']] = app_b # Check for changes in apps for iid in apps_a.keys(): if not iid in apps_b: diff_instances.add(iid) # Removed for iid in apps_b.keys(): if not iid in apps_a: if len(apps_b[iid].get('deployments', [])) == 0: diff_instances.add(iid) # Added & No remaining deployments for iid, app_a in apps_a.items(): if iid in apps_b: if dictDiff(app_a, apps_b[iid]): if len(apps_b[iid].get('deployments', [])) == 0: diff_instances.add(iid) # Added & No remaining deployments # Get pod IDs from two groups pods_a = {} for pod_a in group_a['pods']: pods_a[pod_a['id']] = pod_a pods_b = {} for pod_b in group_b['pods']: pods_b[pod_b['id']] = pod_b # Check for changes in pods for iid in pods_a.keys(): if not iid in pods_b: diff_instances.add(iid) for iid in pods_b.keys(): if not iid in pods_a: if len(pods_b[iid].get('deployments', [])) == 0: diff_instances.add(iid) # Added & No remaining deployments for iid, pod_a in pods_a.items(): if iid in pods_b: if dictDiff(pod_a, pods_b[iid]): if len(pods_b[iid].get('deployments', [])) == 0: diff_instances.add(iid) # Added & No remaining deployments # Get IDs from two groups groups_a = {} for group_a in group_a['groups']: groups_a[group_a['id']] = group_a groups_b = {} for group_b in group_b['groups']: groups_b[group_b['id']] = group_b # Check for changes in pods for gid in groups_a.keys(): if not gid in groups_b: diff_groups.add(gid) for gid in groups_b.keys(): if not gid in groups_a: diff_groups.add(gid) for gid, pgroup_a in groups_a.items(): if gid in groups_b: if dictDiff(pgroup_a, groups_b[gid]): diff_groups.add(gid) # For every changed group, deep into details base_groups_immutable = set(diff_groups) for group in base_groups_immutable: empty_group = {"id": group, "apps": [], "pods": [], "groups": []} if group in groups_a: if group in groups_b: (child_diff_instances, child_diff_groups) = diffRootGroups( groups_a[group], groups_b[group]) diff_instances.update(child_diff_instances) diff_groups.update(child_diff_groups) else: (child_diff_instances, child_diff_groups) = diffRootGroups( groups_a[group], empty_group) diff_instances.update(child_diff_instances) diff_groups.update(child_diff_groups) else: (child_diff_instances, child_diff_groups) = diffRootGroups( empty_group, groups_b[group]) diff_instances.update(child_diff_instances) diff_groups.update(child_diff_groups) # Return instance and group diffs return (diff_instances, diff_groups)