# Source code for performance.driver.core.events

import re
import time
import uuid

# Regex to strip ANSI sequences from the log lines. It matches an ESC
# character (\x1b) followed by everything up to and including the next `m`.
ANSI_SEQUENCE = re.compile(r'\x1b[^m]*m')

# Event matching cache for speed-up: maps an event type to a dictionary of
# `{eventCheck: bool}` answers previously computed by `isEventMatching`.
MATCHER_CACHE = {}

# Monotonically increasing trace ID: holds the last ID handed out by
# `allocateEventId`, which increments it before returning, so the first
# allocated ID is 1.
LAST_TRACE_ID = 0


def isEventClassMatchingName(eventClass, className):
  """
  Tell whether `eventClass`, or any class it inherits from, is named
  `className`.

  The hierarchy is walked recursively through `__bases__`, so the name of any
  direct or indirect ancestor counts as a match.
  """
  if eventClass.__name__ != className:
    # No match at this level: try every direct base class in turn and stop at
    # the first one whose own hierarchy contains the name
    for parentClass in eventClass.__bases__:
      if isEventClassMatchingName(parentClass, className):
        return True
    return False

  return True


def isEventMatching(eventInstance, eventCheck):
  """
  Decide whether `eventInstance` satisfies `eventCheck`.

  When `eventCheck` is a string it is compared against the class name of the
  event and of all its ancestors; otherwise it is handed to `isinstance`.
  Answers are memoized in `MATCHER_CACHE`, keyed by the event's type and then
  by `eventCheck`.
  """
  eventType = type(eventInstance)

  # Serve the answer from the cache when this (type, check) pair was seen
  knownAnswers = MATCHER_CACHE.get(eventType)
  if knownAnswers is not None and eventCheck in knownAnswers:
    return knownAnswers[eventCheck]

  # Anything that is not exactly a `str` is treated as a class (or tuple of
  # classes) for `isinstance`; strings are matched by class name
  if type(eventCheck) is not str:
    matched = isinstance(eventInstance, eventCheck)
  else:
    matched = isEventClassMatchingName(eventType, eventCheck)

  # Remember the answer for the next call
  MATCHER_CACHE.setdefault(eventType, {})[eventCheck] = matched

  return matched


def allocateEventId():
  """
  Hand out the next integer trace ID.

  Integer IDs are used in place of string IDs, which are noted as being less
  performant on resolving. Every call bumps the module-level `LAST_TRACE_ID`
  by one and returns the new value.
  """
  global LAST_TRACE_ID

  # Compute the next ID, publish it as the new high-water mark and return it
  nextId = LAST_TRACE_ID + 1
  LAST_TRACE_ID = nextId
  return nextId


class Event:
  """
  Base event

  The `traceid` parameter is a unique string or object that is carried along
  related events and is used to group them together to the same operation.
  """

  def __init__(self, traceid=None, ts=None):
    """
    Create the event and build its set of trace IDs.

    :param traceid: Optional trace ID to attach to the event. A tuple, list,
                    set or frozenset (or a subclass of those) is treated as a
                    collection of trace IDs and merged in; any other
                    non-`None` value is added as a single ID.
    :param ts: Optional timestamp of the event. Defaults to `time.time()`.
    """
    # NOTE(review): `_cachedMatchers` is not read anywhere in the visible part
    # of this module; it is kept because `toDict` explicitly strips it.
    self._cachedMatchers = {}
    self.event = type(self).__name__
    self.ts = time.time() if ts is None else ts

    # Allocate a unique trace ID for this event
    self.traceids = {allocateEventId()}

    # Enrich with the given trace IDs
    if isinstance(traceid, (tuple, list, set, frozenset)):
      self.traceids.update(traceid)
    elif traceid is not None:
      self.traceids.add(traceid)

  def hasTrace(self, traceid):
    """
    Check if the event was emitted from the given ID
    """
    return traceid in self.traceids

  def hasTraces(self, traceids):
    """
    Check if at least one of the given trace ids is in the event's traceids

    An empty `traceids` iterable yields `False`.
    """
    return any(trace in self.traceids for trace in traceids)

  def toDict(self):
    """
    Return dict representation of the event

    The result is a shallow copy of the instance attributes, without the
    private `_cachedMatchers` entry and with `traceids` converted from a set
    to a list (whose order is therefore unspecified).
    """
    inst = dict(self.__dict__)

    # Remove private keys; tolerate instances on which it was never set
    inst.pop('_cachedMatchers', None)

    # Map sets to lists
    inst['traceids'] = list(inst['traceids'])

    return inst

  def __str__(self):
    """
    Return a string representation of the event
    """
    return '{}[trace={}]'.format(self.event, ','.join(map(str, self.traceids)))
class StartEvent(Event):
  """
  Dispatched once the test configuration is loaded and the environment is
  ready, in order to start the policies.
  """
class RestartEvent(Event):
  """
  Dispatched in place of `StartEvent` when more than one test loop has to be
  executed.
  """
class TeardownEvent(Event):
  """
  Dispatched when all policies are completed and the system is about to be
  torn down.
  """
class InterruptEvent(Event):
  """
  Dispatched when a critical exception has occurred, or when the user has
  instructed to interrupt the tests via a keystroke.
  """
class StalledEvent(Event):
  """
  Dispatched from the session manager when an FSM has been stuck in a
  non-terminal state for longer than the expected time.
  """
class RunTaskEvent(Event):
  """
  Dispatched when a policy requires the session to execute a task.
  """

  # NOTE(review): unlike the other events in this module, this constructor
  # does not accept or forward `traceid` / `ts` to the base class - confirm
  # that this is intentional.
  def __init__(self, task):
    """
    :param task: The task to execute; stored unchanged on `self.task`.
    """
    super().__init__()
    self.task = task
class RunTaskCompletedEvent(Event):
  """
  Dispatched when a task is completed. This is useful if you want to keep
  track of a lengthy event.
  """

  def __init__(self, previousEvent, exception=None):
    """
    :param previousEvent: The event this one completes. Its `traceids` are
                          carried over to this event and its `task` attribute
                          is copied.
    :param exception: Optional exception to record on the event; stored
                      unchanged on `self.exception`.
    """
    # Inherit the trace IDs of the originating event so both stay grouped
    super().__init__(traceid=previousEvent.traceids)
    self.task = previousEvent.task
    self.exception = exception
class TickEvent(Event):
  """
  A clock event, dispatched periodically by the event bus.
  """

  def __init__(self, count, delta, *args, **kwargs):
    """
    :param count: Stored unchanged on `self.count`.
    :param delta: Stored unchanged on `self.delta`.

    Any further arguments are forwarded to the base event constructor.
    """
    super().__init__(*args, **kwargs)
    self.count = count
    self.delta = delta
class ParameterUpdateEvent(Event):
  """
  A parameter change request.
  """

  def __init__(self, newParameters, oldParameters, changes, *args, **kwargs):
    """
    :param newParameters: Stored on `self.parameters`.
    :param oldParameters: Stored on `self.oldParameters`.
    :param changes: Stored on `self.changes`.

    Any further arguments are forwarded to the base event constructor.
    """
    super().__init__(*args, **kwargs)
    self.parameters = newParameters
    self.oldParameters = oldParameters
    self.changes = changes
# NOTE(review): a second class named `MetricUpdateEvent` is defined further
# down in this module (taking only `name` and `value`). Since that later
# definition rebinds the name, this variant with `axis` is shadowed once the
# module has finished loading - confirm which one is intended and drop or
# rename the other.
class MetricUpdateEvent(Event):
  """
  A metric has changed value.
  """

  def __init__(self, name, value, axis, *args, **kwargs):
    """
    :param name: Stored unchanged on `self.name`.
    :param value: Stored unchanged on `self.value`.
    :param axis: Stored unchanged on `self.axis`.

    Any further arguments are forwarded to the base event constructor.
    """
    super().__init__(*args, **kwargs)
    self.name = name
    self.value = value
    self.axis = axis
class FlagUpdateEvent(Event):
  """
  A flag has changed for this run.
  """

  def __init__(self, name, value, *args, **kwargs):
    """
    :param name: Stored unchanged on `self.name`.
    :param value: Stored unchanged on `self.value`.

    Any further arguments are forwarded to the base event constructor.
    """
    super().__init__(*args, **kwargs)
    self.name = name
    self.value = value
# NOTE(review): this module defines `MetricUpdateEvent` twice. This later
# definition is the one the name ends up bound to; the earlier one also takes
# an `axis` argument. With this signature a third positional argument is
# forwarded to the base constructor, i.e. it becomes `traceid` - confirm that
# no caller still passes an axis there.
class MetricUpdateEvent(Event):
  """
  A metric has changed.
  """

  def __init__(self, name, value, *args, **kwargs):
    """
    :param name: Stored unchanged on `self.name`.
    :param value: Stored unchanged on `self.value`.

    Any further arguments are forwarded to the base event constructor.
    """
    super().__init__(*args, **kwargs)
    self.name = name
    self.value = value
class ObserverEvent(Event):
  """
  A metric change is observed.
  """

  def __init__(self, metric, *args, **kwargs):
    """
    :param metric: Stored unchanged on `self.metric`.

    Any further arguments are forwarded to the base event constructor.
    """
    super().__init__(*args, **kwargs)
    self.metric = metric
class ObserverValueEvent(ObserverEvent):
  """
  A metric has changed to a new value.
  """

  def __init__(self, metric, value, *args, **kwargs):
    """
    :param metric: Passed on to `ObserverEvent`, which stores it.
    :param value: Stored unchanged on `self.value`.

    Any further arguments are forwarded to the parent constructor.
    """
    super().__init__(metric, *args, **kwargs)
    self.value = value
class LogLineEvent(Event):
  """
  A log line from an observer.
  """

  def __init__(self, line, source, kind=None, *args, **kwargs):
    """
    :param line: The log line; everything matched by `ANSI_SEQUENCE` is
                 removed before it is stored on `self.line`.
    :param source: Stored unchanged on `self.source`.
    :param kind: Optional; stored unchanged on `self.kind`.

    Any further arguments are forwarded to the base event constructor.
    """
    super().__init__(*args, **kwargs)

    # Keep only the plain text of the line
    cleanLine = ANSI_SEQUENCE.sub('', line)
    self.line = cleanLine
    self.source = source
    self.kind = kind

  def __str__(self):
    """
    Return the base event representation followed by the log line in `<>`
    """
    baseText = super().__str__()
    return '{}<{}>'.format(baseText, self.line)