Source code for performance.driver.classes.tracker.duration

import logging
import time
import queue

from performance.driver.core.classes import Tracker
from performance.driver.core.events import ParameterUpdateEvent, RestartEvent, TeardownEvent, isEventMatching
from performance.driver.core.eventfilters import EventFilter

class DurationTrackerSession:
  """
  A tracking session
  """

  def __init__(self, tracker, traceids):
    self.logger = logging.getLogger('DurationTrackerSession')
    self.startFilter = tracker.startFilter.start(traceids, self.handleStart)
    self.endFilter = tracker.endFilter.start(traceids, self.handleEnd)
    self.tracker = tracker
    self.traceids = set(traceids)
    self.startLookup = {}
    self.endQueue = queue.Queue()
    self.startQueue = queue.Queue()
    self.consumedEvents = set()
    self.fired = False

  def handleStart(self, event):

    # Update traceid-specific lookup table
    for traceid in event.traceids:
      if not traceid in self.startLookup:
        self.startLookup[traceid] = queue.Queue()
      self.startLookup[traceid].put(event)

    # And also track the events in the order they appear,
    # in case the events used do not provide
    self.startQueue.put(event)
    self.checkEvents()

  def handleEnd(self, event):
    self.endQueue.put(event)
    self.checkEvents()

  def checkEvents(self):
    if not self.startLookup:
      return
    if self.endQueue.empty():
      return

    retryEvents = []
    while not self.endQueue.empty():
      endEvent = self.endQueue.get()
      startEvent = None

      # Try to find a most-specific start event, starting from the latest
      # trace ID and advancing to the earliest trace ID
      for traceid in reversed(sorted(endEvent.traceids)):
        if traceid in self.startLookup:
          startEvent = self.startLookup[traceid].get()
          if self.startLookup[traceid].empty():
            del self.startLookup[traceid]
          self.consumedEvents.add(startEvent)
          break

      # If we haven't found a start event, keep this event and retry later
      if startEvent is None:
        retryEvents.append(endEvent)
        continue

      # Track metric
      self.tracker.trackMetric(self.tracker.metric, endEvent.ts - startEvent.ts,
                               self.traceids)

    # Put back all retry events in the queue
    for event in retryEvents:
      self.endQueue.put(event)

  def finalizeLingeringEvents(self):
    """
    This method is called when there were left-over events in the `endQueue`
    after the end of the run. This means that the end events that we received
    do not inherit the traceIDs from the start events.

    In some situations this might be accepted, therefore in this method we are
    going to try and re-construct the sequence of these events and try to
    extract the correct metrics.
    """

    # Drain start events queue in the `startEvents` list
    startEvents = []
    while not self.startQueue.empty():
      event = self.startQueue.get()
      if event in self.consumedEvents:
        continue
      startEvents.append(event)

    # Drain end events queue in the `endEvents` list
    endEvents = []
    while not self.endQueue.empty():
      event = self.endQueue.get()
      endEvents.append(event)

    # Sort both lists by timestamp
    startEvents = sorted(startEvents, key=lambda e: e.ts)
    endEvents = sorted(endEvents, key=lambda e: e.ts)

    # While we have start-end events in the queue, start collecting duration
    # metrics from them.
    while startEvents and endEvents:
      startEvent = startEvents.pop(0)
      endEvent = endEvents.pop(0)

      # Track metric
      self.tracker.trackMetric(self.tracker.metric, endEvent.ts - startEvent.ts,
                               self.traceids)

    # If there are still left-over traces, warn
    if startEvents:
      self.logger.warn('Incomplete duration traces for {} ({} without end)'.format(
          self.tracker.metric, len(startEvents)))
    if endEvents:
      self.logger.warn('Incomplete duration traces for {} ({} without start)'.format(
          self.tracker.metric, len(endEvents)))

  def handle(self, event):
    self.startFilter.handle(event)
    self.endFilter.handle(event)

  def finalize(self):
    self.startFilter.finalize()
    self.endFilter.finalize()
    self.finalizeLingeringEvents()

    # We are done, clear unused structures
    self.consumedEvents = set()
    self.startLookup = {}


[docs]class DurationTracker(Tracker): """ Tracks the duration between a ``start`` and an ``end`` event. :: trackers: - class: tracker.DurationTracker # The metric where to write the measured value to metric: someMetric # The relevant events events: # The event to start counting from # (This can be a filter expression) start: StartEventFilter # The event to stop counting at # (This can be a filter expression) end: EndEventFilter This tracker always operates within a tracking session, initiated by a ``ParameterUpdateEvent`` and terminated by the next ``ParameterUpdateEvent``, or the completion of the test. .. important:: The ``start`` and ``end`` events must contain the trace IDs of the originating ``ParameterUpdateEvent``. Otherwise they won't be measured. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.traces = [] self.traceIndex = {} self.activeTrace = None config = self.getRenderedConfig() self.startFilter = EventFilter(config['events']['start']) self.endFilter = EventFilter(config['events']['end']) self.metric = config['metric'] self.eventbus.subscribe(self.handleEvent, order=100) def handleEvent(self, event): """ Handle an event from the event bus and process tracked events by calculating the time of the first event and the time of the lsat event """ # A terminal event terminates an active trace eventType = type(event) if eventType in (RestartEvent, TeardownEvent): for trace in self.traces: trace.finalize() self.traces = [] self.traceIndex = {} self.activeTrace = None return # Each parameter update initiates a trace of interest if eventType is ParameterUpdateEvent: # Start a new session tracker self.activeTrace = DurationTrackerSession(self, event.traceids) self.traces.append(self.activeTrace) for trace in event.traceids: self.traceIndex[trace] = self.activeTrace # Update relevant traces handled = [] for id in event.traceids: if not id in self.traceIndex: continue trace = self.traceIndex[id] if trace in handled: continue trace.handle(event) handled.append(trace)