# Source code for performance.driver.core.eventfilters

import re
import logging
from threading import Timer

from import isEventMatching
from performance.driver.core.utils import parseTimeExpr

# Matches one event selector of the filter DSL. Capture groups:
#   1. the event name, or `*`
#   2. the optional attribute list found between `[` and `]`
#   3. the optional run of `:flag` / `:flag(args)` suffixes (leading colon
#      included)
DSL_TOKENS = re.compile(r'(\*|\w+)(?:\[(.*?)\])?(\:(?:\w[\:\w\(\),]*))?')
# Matches one attribute test inside the `[...]` list. Capture groups:
#   1. the attribute path (word characters, dots and single quotes)
#   2. the operator, built from the characters `=`, `~`, `!`, `>` and `<`
#   3. the value: everything up to the next comma
DSL_ATTRIB = re.compile(r'(?:^|,)([\w\.\']+)([=~!><]+)([^,]+)')
# Matches a single `:flag` token; group 1 is the text after the colon, up to
# (but excluding) the next colon.
DSL_FLAGS = re.compile(r'\:([^\:]+)')

# Matches a `.'key'` accessor; group 1 is the text between the single quotes.
REPLACE_DICT = re.compile(r"\.\'(.*?)\'")

# Module-wide record of the events matched through a `:single` selector, keyed
# by the selector's event id (`eid`). Being module-level, it is shared by all
# EventFilterSession instances.
global_single_events = {}

def tokenizeExpression(expression):
  """
  Tokenize an event filter DSL expression.

  The expression consists of one or more selectors of the form
  ``EventName[attrib<op>value,...]:flag1:flag2(args)``. The whole expression
  may be wrapped in ``:not(...)`` to negate it.

  :param expression: The DSL expression string to tokenize
  :returns: A list with one tuple per selector found, shaped as
            ``(event, attrib, flags, flagParameters, eid, testTrue)`` where:

            * ``event`` is the event name (or ``*``)
            * ``attrib`` is a list of callables, each receiving an event and
              returning the outcome of one attribute test
            * ``flags`` is the list of lower-cased flag names
            * ``flagParameters`` maps a flag name to the raw text found
              between its parentheses
            * ``eid`` is a string identifying the selector
            * ``testTrue`` is ``False`` if the expression was negated with
              ``:not(...)``, ``True`` otherwise
  :raises ValueError: If the expression is not a valid filter, or if it
                      contains mismatched parentheses
  """

  # The boolean value all test conditions should be tested against
  # (This helps the consumers implement the negation operation)
  testTrue = True

  # In case the expression is a negation, unwrap it now
  if expression.startswith(':not('):
    if not expression.endswith(')'):
      raise ValueError(
          'Unable to find closing parenthesis on "{}"'.format(expression))
    testTrue = False
    expression = expression[5:-1]

  # Find all the events to match against
  matches = DSL_TOKENS.findall(expression)
  if not matches:
    raise ValueError(
        'The given expression "{}" is not a valid event filter DSL'.format(
            expression))

  # Process event matches
  events = []
  for (event, exprAttrib, flags) in matches:

    # Calculate event id (uses the raw, not yet processed, selector parts)
    eid = event + ':' + exprAttrib + ':' + flags

    # Process sub-tokens. Note that the flag parameters are lower-cased along
    # with the flag names, since they are still part of the same token here.
    flags = [flag.lower() for flag in DSL_FLAGS.findall(str(flags))]

    # Split `name(params)` flags into the bare name and its parameters
    flagParameters = {}
    for i, flag in enumerate(flags):
      if '(' in flag:
        (name, _, params) = flag.partition('(')
        if '(' in params or not params.endswith(')'):
          raise ValueError(
              'Mismatched closing parenthesis in flag {}'.format(flag))
        flags[i] = name
        flagParameters[name] = params[:-1]

    # Compile attribute selectors
    #
    # SECURITY NOTE: the attribute checks are compiled with `eval` on text
    # taken verbatim from the expression. Expressions must therefore only
    # come from trusted sources (e.g. the test configuration).
    attrib = []
    if exprAttrib:
      for (left, op, right) in DSL_ATTRIB.findall(exprAttrib):

        # Expand .'xx' to ["xx"]
        left = REPLACE_DICT.sub(lambda x: '["{}"]'.format(x.group(1)), left)

        # Shorthand some ops
        if op == "=":
          op = "=="

        # Handle loose regex match (pattern may match anywhere in the value)
        if op == "~=":
          attrib.append(
              eval('lambda event: not regex.search(str(event.{})) is None'.
                   format(left), {'regex': re.compile(right)}))

        # Handle exact regex match (pattern must match from the beginning)
        elif op == "~==":
          attrib.append(
              eval('lambda event: not regex.match(str(event.{})) is None'.
                   format(left), {'regex': re.compile(right)}))

        # Handle `in` operator. The current `left` and `right` are bound as
        # default arguments: a plain closure would look them up when called
        # and always see the values of the last attribute processed.
        elif op == "<~":
          attrib.append(
              lambda event, left=left, right=right:
              right in list(getattr(event, left)))

        # Handle operator match
        else:
          # Anything that is neither a number nor already quoted is compared
          # as a string literal
          if not right.isnumeric() and not right[0] in ('"', "'"):
            right = '"{}"'.format(right.replace('"', '\\"'))
          attrib.append(
              eval('lambda event: event.{} {} {}'.format(left, op, right)))

    # Collect flags
    events.append((event, attrib, flags, flagParameters, eid, testTrue))

  # Return events
  return events

class EventFilterSession:
  # Tracks the state of one tracking session for a filter: the event found so
  # far, an optional `:after` timer and the per-group `:nth` hit counters.
  #
  # NOTE(review): several statements in this class are visibly truncated (a
  # `for ... in` with no iterable, `if` headers with no body, an `except` with
  # no `try`, unterminated calls) and the docstring text below has lost its
  # quotes. This looks like an extraction artifact; restore the class from the
  # original file before relying on it.
  An event filter session

  # Stores the filter, the trace IDs and the callback, and resets the session
  # state. The logger is named after the filter's expression.
  def __init__(self, filter, traceids, callback):
    self.foundEvent = None
    self.triggerAtExit = False
    self.filter = filter
    self.traceids = traceids
    self.callback = callback
    self.timer = None
    self.counterGroups = {}
    self.logger = logging.getLogger('EventFilter<{}>'.format(filter.expression))

    # Immediately call-back to matched filters
    # NOTE(review): the iterable of this loop and the body of the innermost
    # `if` are missing. The unpacked tuple has the same shape as the tuples
    # built by tokenizeExpression; presumably the loop walks the filter's
    # tokenized selectors. TODO confirm.
    for (eventSpec, attribChecks, flags, flagParameters,
         eid, testTrue) in
      if 'single' in flags:
        if eid in global_single_events:

  # Timer target used by the `:after(x)` handling in `handle`; clears the
  # reference to the expired timer.
  # NOTE(review): nothing else is visible in this body; confirm whether the
  # callback is meant to be fired from here.
  def afterTimerCallback(self):
    Callback for the :after(x) selector
    self.timer = None

  # Checks the incoming event against every selector: event name, attribute
  # checks, trace ID, and then the ordering flags (`first`, `last`, `single`,
  # `after`, `nth`).
  # NOTE(review): the loop iterable and the bodies of the three rejection
  # tests below are missing, so the control flow (skip / stop / fire the
  # callback) cannot be determined from this text.
  def handle(self, event):
    Handle the incoming event
    for (eventSpec, attribChecks, flags, flagParameters,
         eid, TRUE) in

      # (NOTE: `TRUE` is True when the filter is positive, or False if it's
      # negated using a :not(..) expression)

      # Handle all events or matching events
      if TRUE == (eventSpec != "*" and not isEventMatching(event, eventSpec)):

      # Handle attributes
      # A check that returns a falsy value, or raises KeyError, marks the
      # attribute test as failed.
      # NOTE(review): the `try:` matching the `except` below is missing.
      attribCheckFailed = False
      for attribCheckFn in attribChecks:
          if not attribCheckFn(event):
            attribCheckFailed = True
        except KeyError:
          attribCheckFailed = True
      if TRUE == attribCheckFailed:

      # Handle trace ID
      # The trace test only applies when trace IDs were given and the selector
      # does not carry the `notrace` flag.
      # NOTE(review): the end of this condition is missing.
      if TRUE == (not self.traceids is None and not 'notrace' in flags and not

      # Handle order
      # `first`: remember the event only if none has been remembered yet.
      if 'first' in flags:
        if self.foundEvent is None:
          self.foundEvent = event
      # `last`: keep overwriting the remembered event and request that it is
      # submitted when the session is finalized.
      if 'last' in flags:
        self.foundEvent = event
        self.triggerAtExit = True
      # `single`: record the event in the module-wide registry under its eid.
      # NOTE(review): the body of the `if eid in ...` test is missing.
      if 'single' in flags:
        if eid in global_single_events:
        self.foundEvent = event
        global_single_events[eid] = event
      # `after`: parse the delay given as the flag parameter; a None result is
      # treated as an invalid time expression.
      if 'after' in flags:
        time = parseTimeExpr(flagParameters['after'])
        if time is None:
          raise ValueError(
              'Event selector `:after({})` contains an invalid time expression'.

        # Restart timer to call the callback after the given delay
        # NOTE(review): the body of `if self.timer:` is missing and no call
        # that starts the new Timer is visible; confirm against the original.
        if self.timer:
        self.logger.debug('(Re)Starting timer after {} sec'.format(time))
        self.foundEvent = event
        self.timer = Timer(time, self.afterTimerCallback)
      # `nth`: the parameter is `n` or `n,group`. Hits are counted per group
      # (the selector's eid when no group is given) and the event is
      # remembered once the counter equals `n`.
      if 'nth' in flags:
        parts = flagParameters['nth'].split(',')
        nth = int(parts[0])
        grp = eid
        if len(parts) > 1:
          grp = parts[1]
        if not grp in self.counterGroups:
          self.counterGroups[grp] = 0
        self.counterGroups[grp] += 1
        self.logger.debug('Found {} hits on group {} ({} requested)'.format(
          self.counterGroups[grp], grp, nth))
        if nth == self.counterGroups[grp]:
          self.logger.debug('Found all {} hits found'.format(nth))
          self.foundEvent = event

      # Fire callback
      # NOTE(review): the statement announced by the comment above is missing.

  # NOTE(review): the bodies of both `if` statements below are missing.
  def finalize(self):
    Called when a tracking session is finalised

    # If we have an active :after() timer, flush it now
    if self.timer:

    # Submit the last event
    if self.triggerAtExit and self.foundEvent:

  # NOTE(review): the second `format` argument and the closing parenthesis are
  # missing in both methods below.
  def __str__(self):
    return '<Session[{}], traceid={}>'.format(self.filter.expression,

  def __repr__(self):
    return '<Session[{}], traceid={}>'.format(self.filter.expression,

class EventFilter:
  """
  Various trackers in *DC/OS Performance Test Driver* are operating purely on
  events. Therefore it's sometimes needed to use a more elaborate selector in
  order to filter the correct events.

  The following filter expression syntax is currently supported and is closely
  modeled around the CSS syntax:

  .. code-block:: css

    EventName[attrib1=value,attrib2=value,...]:selector1:selector2:...

  Where:

  * _Event Name_ is the name of the event or ``*`` if you want to match any
    event.

  * _Attributes_ is a comma-separated list of ``<attrib> <operator> <value>``
    values. For example: ``method==post``. The following table summarises the
    different operators you can use for the attributes.

    +-----------------+----------------------------------------------------+
    | Operator        | Description                                        |
    +=================+====================================================+
    | ``=`` or ``==`` | Equal (case sensitive for strings)                 |
    +-----------------+----------------------------------------------------+
    | ``!=``          | Not equal                                          |
    +-----------------+----------------------------------------------------+
    | ``>``, ``>=``   | Greater than / Greater than or equal               |
    +-----------------+----------------------------------------------------+
    | ``<``, ``<=``   | Less than / Less than or equal                     |
    +-----------------+----------------------------------------------------+
    | ``~=``          | Partial regular expression match                   |
    +-----------------+----------------------------------------------------+
    | ``~==``         | Exact regular expression match                     |
    +-----------------+----------------------------------------------------+
    | ``<~``          | Value in list or key in dictionary (like ``in``)   |
    +-----------------+----------------------------------------------------+

  * _Selector_ specifies which event out of many similar to choose. Valid
    selectors are:

    +-----------------+----------------------------------------------------+
    | Selector        | Description                                        |
    +=================+====================================================+
    | ``:first``      | Match the first event in the tracking session      |
    +-----------------+----------------------------------------------------+
    | ``:last``       | Match the last event in the tracking session       |
    +-----------------+----------------------------------------------------+
    | ``:nth(n)``     | Match the n-th event in the tracking session. If   |
    | ``:nth(n,grp)`` | a ``grp`` parameter is specified, the counter will |
    |                 | be grouped with the given indicator.               |
    +-----------------+----------------------------------------------------+
    | ``:single``     | Match a single event, globally. After the first    |
    |                 | match all other usages accept by default.          |
    +-----------------+----------------------------------------------------+
    | ``:after(Xs)``  | Trigger after X seconds after the last event       |
    +-----------------+----------------------------------------------------+
    | ``:notrace``    | Ignore the trace ID matching and accept any event, |
    |                 | even if they do not belong in the trace session.   |
    +-----------------+----------------------------------------------------+

  The whole expression can be negated by wrapping it in ``:not(...)``.

  For example, to match every ``HTTPRequestEvent``:

  .. code-block:: css

    HTTPRequestEvent

  Or, to match every POST ``HTTPRequestEvent``:

  .. code-block:: css

    HTTPRequestEvent[method=post]

  Or, to match the last ``HTTPResponseEndEvent``

  .. code-block:: css

    HTTPResponseEndEvent:last

  Or, to match the ``HTTPResponseEndEvent`` that contains the string "foo":

  .. code-block:: css

    HTTPResponseEndEvent[body~=foo]

  Or match any first event:

  .. code-block:: css

    *:first

  """

  def __init__(self, expression):
    """
    Tokenize and store the given filter expression.

    :param expression: The filter expression string, in the syntax described
                       in the class documentation
    :raises ValueError: Propagated from ``tokenizeExpression`` when the
                        expression is not valid
    """
    # Keep the raw expression string: it is what `__str__`/`__repr__` print
    # and what sessions use to name their logger.
    self.expression = expression
    # The tokenized selectors are stored separately so they do not overwrite
    # the string above.
    # NOTE(review): the attribute name `events` is an assumption; confirm it
    # matches what EventFilterSession iterates over.
    self.events = tokenizeExpression(expression)

  def start(self, traceids, callback):
    """
    Start a tracking session with the given trace ID

    :param traceids: A single trace ID, a list/set/tuple of trace IDs, or
                     ``None``
    :param callback: The callable handed over to the session
    :returns: A new ``EventFilterSession`` bound to this filter
    """

    # Make sure trace IDs is always a collection; a single ID is wrapped in a
    # set, while `None` is passed through untouched
    if traceids is not None and not isinstance(traceids, (list, set, tuple)):
      traceids = {traceids}

    return EventFilterSession(self, traceids, callback)

  def __str__(self):
    return '<Filter[{}]>'.format(self.expression)

  def __repr__(self):
    return '<Filter[{}]>'.format(self.expression)