Source code for performance.driver.classes.policy.multivariableexplorer

import itertools
import time

from performance.driver.core.events import RunTaskEvent, isEventMatching
from performance.driver.core.classes import PolicyFSM, State
from performance.driver.core.reflection import subscribesToHint, publishesHint


[docs]class MultivariableExplorerPolicy(PolicyFSM): """ The **Multi-Variable Exploration Policy** is running one scale test for every product of the parameters defined in the ``matrix``. :: policies: - class: policy.MultivariableExplorerPolicy # The following rules describe the permutation matrix matrix: # The key is the name of the parameter to control param: ... # A "discreet" parameter can take one of the specified values apps: type: discreet values: [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096] # A "sample" parameter takes any value within a numerical range size: type: sample min: 0 # Default max: 1 # Default step: 1 # Default samples: 10 # Default # A "range" parameter takes all the values within the given range instances: type: range min: 0 max: 1 step: 1 # The event binding configuration events: # Signals are events that define a terminal condition and it's status # # For example, in this case when the `MarathonDeploymentSuccessEvent` # is received, the test will be completed with status `OK` # signal: OK: MarathonDeploymentSuccessEvent FAILED: MarathonDeploymentFailedEvent ... : ... # [Optional] Wait for the given number of signal events before # considering the test complete. # # This parameter is an expression evaluated at run-time, so you # could use anything that can go within a python's `eval` statement # # For example: "discreet + 2" # signalEventCount: 1 # [Optional] Start the tests with this event is received start: EventToWaitUntilReady This policy is first computing all possible combinations of the parameter matrix given and is then running the tests for every one. The policy will start immediately when the test driver is ready unless the ``start`` event is specified. In that case the policy will wait for this event before starting with the first test. The policy continues with the next test only when a *signal* event is received. Such events are defined in the ``signal`` dictionary. Since a test can either complete successfully or fail you are expected to provide the status indication for every signal event. It is also possible to wait for more than one signal event before considering the test complete. To specify the number of events to expect you can use the ``signalEventCount`` parameter. However since the number of events to expect depends on an arbitrary number of factors, it's possible to use an expression instead of a value. For the expression you can use the names of the parameters taking part in the matrix and the special variable ``_i`` that contains the number of the test, starting from 1. For example ``signalEventCount: "apps + size/2"`` """ class Start(State): """ Entry point state """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.startEvent = False def onEnter(self): """ Reset state when entering the `Start` state """ self.parameterNames = [] self.parameterOptions = [] self.parameterValues = None self.progressTotal = 0 self.progressCurrent = 0 self.eventsRemaining = 0 self.startEvent = False # Compose permutation matrix renderdConfig = self.getRenderedConfig() matrix = renderdConfig.get('matrix') for key, config in matrix.items(): self.parameterNames.append(key) v_type = config.get('type', 'sample') # Sample of sample values values if v_type == 'sample': v_step = config.get('step', 1) v_min = config.get('min', 0.0) v_max = config.get('max', 1.0) v_samples = config.get('samples', 10) values = [] for _ in range(0, v_samples): values.append( round(random.uniform(v_min, v_max) / v_step) * v_step) self.parameterOptions.append(values) # Range values if v_type == 'range': v_step = config.get('step', 1) v_min = config.get('min', 0.0) v_max = config.get('max', 1.0) values = range(v_min, v_max + v_step, v_step) self.parameterOptions.append(values) # Discreet values elif v_type == 'discrete': self.parameterOptions.append(config['values']) # Invalid values else: raise ValueError('Unknown matrix value type "{}"'.format(v_type)) # Process the events configuration eventsConfig = renderdConfig.get('events', {}) # Prepare signal events in the correct structure self.signalEvents = {} signalEventConfig = eventsConfig.get('signal', {}) if not signalEventConfig: raise ValueError( '`events.signal` configuration must have at least 1 event defined') for status, eventName in signalEventConfig.items(): if not type(eventName) in (tuple, list): eventName = [eventName] for event in eventName: self.signalEvents[event] = status # Get the event count expression self.signalEventCount = eventsConfig.get('signalEventCount', 1) # If we don't have a startup event, go directly to `Run` # Otherwise, wait for it self.startEvent = eventsConfig.get('start', False) if self.startEvent == False: self.goto(MultivariableExplorerPolicy.Run) return self.logger.info('Waiting until the system is ready') def onEvent(self, event): """ If we have a `startEvent` defined, wait until the event is received before switching into driving the policy """ if self.startEvent == False: return if isEventMatching(event, self.startEvent): self.goto(MultivariableExplorerPolicy.Run) def onRestartEvent(self, event): """ When the tests are re-started, marathon is already running, so only wait for the restart signal before switching to `Run` state. """ self.goto(MultivariableExplorerPolicy.Run) class Run(State): """ Initialize test cases and prepare for deployment """ def onEnter(self): """ Initialize test cases and start deployment """ self.parameterValues = list(itertools.product(*self.parameterOptions)) self.progressTotal = len(self.parameterValues) self.progressCurrent = 0 self.logger.info('Exploring {} variables with {} permutations'.format( len(self.parameterNames), self.progressTotal)) self.goto(MultivariableExplorerPolicy.Deploy) class Deploy(State): """ Deploy a configuration """ def onEnter(self): """ Pick next run value """ # If we ran out of values, go to sink if len(self.parameterValues) == 0: self.goto(MultivariableExplorerPolicy.End) return # Fetch the next case to process self.progressCurrent += 1 values = self.parameterValues.pop(0) parameters = dict(zip(self.parameterNames, list(values))) # If we have to wait as many events as the value, update # `eventsRemaining` accordingly evalVars = dict(parameters) evalVars['_i'] = self.progressCurrent try: self.eventsRemaining = eval(str(self.signalEventCount), {}, evalVars) except Exception as e: self.logger.error( 'Error while parsing the `signalEventcount` expression') raise e # Dispatch the request to update the test parameter. All such updates # are batched together into a single event in the bus at the end of the # stack, but they will all share the same trace ID self.traceid = self.setParameters(parameters) # We will be using the trace ID to find out which events are cascade # children of the initial request self.goto(MultivariableExplorerPolicy.Waiting) self.logger.info('Initiating a test sequence') class Waiting(State): """ Waiting for the test to complete """ @publishesHint(RunTaskEvent) def onEvent(self, event): """ Process all relevant events """ if not event.hasTrace(self.traceid): for ev, status in self.signalEvents.items(): if isEventMatching(event, ev): self.logger.error('Untracked terminal event!!!') return # Check if this event is a success or a failure trigger isHandled = False for ev, status in self.signalEvents.items(): if isEventMatching(event, ev): isHandled = True self.logger.info('Run completed with status: {}'.format(status)) self.setStatus(status) break # If the event is not handled, ignore it if not isHandled: return # Check if we ran out of events that we are waiting for self.eventsRemaining -= 1 if self.eventsRemaining > 0: self.logger.info( 'Waiting for {} more events'.format(self.eventsRemaining)) return # Run the inter-test tasks. Upon completion the # onRunTaskCompleted event will be dispatched self.eventbus.publish(RunTaskEvent('intertest')) @publishesHint(RunTaskEvent) def onStalledEvent(self, event): """ If the FSM doesn't change state within a given thresshold, it's considered stale and it should be reaped cleanly. This handler will mark the status as "Stalled" and go to next test. """ self.logger.warn( 'No activity while waiting for a marathon deployment to succeed') self.logger.debug( 'This means that either marathon failed to deploy the request ' 'on time, or that you haven\'t registered an observer that emmits a ' '`MarathonDeploymentSuccessEvent`.') # Set error status self.setStatus('stalled') # Run the inter-test tasks. Upon completion the # onRunTaskCompleted event will be dispatched self.eventbus.publish(RunTaskEvent('intertest')) def onRunTaskCompletedEvent(self, event): """ This event is received when the `intertest` task completes execution. Since this task might take long time to complete we are waiting for it's completion before switching to next deployment cycle. """ # Ignore all other events if event.task != 'intertest': return # Schedule next deployment self.goto(MultivariableExplorerPolicy.Deploy) class End(State): """ Sink state """