Source code for performance.driver.classes.tracker.count

import logging
import time

from performance.driver.core.classes import Tracker
from performance.driver.core.events import ParameterUpdateEvent, RestartEvent, TeardownEvent, isEventMatching
from performance.driver.core.eventfilters import EventFilter
from queue import Queue, Empty
from threading import Lock


class CountTrackerSession:
  """
  A tracking session
  """

  def __init__(self, tracker, event, stepExpression):
    self.queue = Queue()
    self.logger = logging.getLogger('CountTrackerSession')
    self.eventFilter = tracker.eventFilter.start(event.traceids, self.handleEvent)
    self.tracker = tracker
    self.traceids = set(event.traceids)
    self.stepExpression = stepExpression
    self.eventParameters = event.parameters

    self.counter = 0
    self.mutex = Lock()

  def handleEvent(self, event):
    env = {'event': event}
    env.update(self.tracker.getDefinitions())
    env.update(self.eventParameters)

    with self.mutex:
      self.counter += eval(self.stepExpression, env)

  def handle(self, event):
    self.eventFilter.handle(event)

  def finalize(self):
    self.eventFilter.finalize()

    # Track metric
    self.tracker.trackMetric(self.tracker.metric, self.counter, self.traceids)


[docs]class CountTracker(Tracker): """ Tracks the occurrences of an ``event`` within the tracking session. :: trackers: - class: tracker.CountTracker # The metric where to write the measured value to metric: someMetric # The event to count # (This can be a filter expression) events: SomeEvent # [Optional] The increment step (this can be a python expression) step: 1 This tracker always operates within a tracking session, initiated by a ``ParameterUpdateEvent`` and terminated by the next ``ParameterUpdateEvent``, or the completion of the test. .. important:: The ``event`` must contain the trace IDs of the originating ``ParameterUpdateEvent``, otherwise the events won't be measured. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.traces = [] self.traceIndex = {} self.activeTrace = None config = self.getRenderedConfig() self.eventFilter = EventFilter(config['events']) self.metric = config['metric'] self.stepExpression = config.get('step', '1') self.eventbus.subscribe(self.handleEvent) def handleEvent(self, event): """ Handle an event from the event bus and process tracked events by calculating the time of the first event and the time of the lsat event """ # A terminal events terminates an active trace eventType = type(event) if eventType in (RestartEvent, TeardownEvent): for trace in self.traces: trace.finalize() self.traces = [] self.traceIndex = {} self.activeTrace = None return # Each parameter update initiates a trace of interest if type(event) is ParameterUpdateEvent: # Start a new session tracker self.activeTrace = CountTrackerSession(self, event, self.stepExpression) self.traces.append(self.activeTrace) for trace in event.traceids: self.traceIndex[trace] = self.activeTrace # Update relevant traces handled = [] for id in event.traceids: if not id in self.traceIndex: continue trace = self.traceIndex[id] if trace in handled: continue trace.handle(event) handled.append(trace)