Source code for performance.driver.classes.reporter.rawevents

import os
import json
import queue

from threading import Thread
from requests.structures import CaseInsensitiveDict
from performance.driver.core.classes import Reporter
from performance.driver.core.events import TickEvent, TeardownEvent


class JSONNormalizerEncoder(json.JSONEncoder):
  """
  Normalize stuff that cannot be serialized
  """

  def default(self, obj):

    # CaseInsensitiveDict needs to become a dict
    if type(obj) is set:
      obj = list(obj)
    elif type(obj) is CaseInsensitiveDict:
      obj = dict(obj.items())

    return json.JSONEncoder.encode(self, obj)


[docs]class RawEventsReporter(Reporter): """ The **Raw Events Reporter** is dumping every event in the eventBus to a file that can be used for offline event processing. You can also use this reporter for debugging the performance driver internals. :: reporters: - class: reporter.RawEventsReporter # [Optional] Set to `yes` to track TickEvents # Note that including tick events might introduce a lot of noise to # your data and/or increase the reporting impact. tickEvents: no # Where to dump the events filename: "events.dump" The log file is encoded with the following rules:: 1. The events are encoded in plain-text 2. Each event is separated with a new line 3. Each line contains two columns separated with semicolon 4. The first column contains the unix timestamp of the event 5. The second column contains the name of the event 6. The third column contains the field values for the event encoded as a JSON string. For example: .. code-block:: js // Timestamp // Name // Properties // 1500891843.976068;SomeEventName;{"prop":"value", ...} ... This format allows for simple grepping and more elaborate parsing. For example .. code-block:: bash cat event.dump | grep ';TickEvent;' | wc -l """ def __init__(self, *args): super().__init__(*args) config = self.getRenderedConfig() self.filename = config.get('filename', 'events.dump') # Create missing directory for the files os.makedirs(os.path.abspath(os.path.dirname(self.filename)), exist_ok=True) self.tickEvents = config.get('tickEvents', False) self.queue = queue.Queue() self.thread = Thread(target=self.reportingThread, name="rawevents-reporter") self.active = True # Start reporter thread self.logger.debug("Starting reporting thread") self.thread.start() # Subscribe to all events, as the last subscriber self.eventbus.subscribe(self.handleEvent, order=10) def reportingThread(self): """ A dedicated thread that writes down the events to the file """ with open(self.filename, 'w') as file: while self.active: # Pop next event from the queue event = self.queue.get() # None event exits the thread if event is None: break # Write down the event to file file.write("{:f};{};{}\n".format( event.ts, type(event).__name__, json.dumps(event.__dict__, cls=JSONNormalizerEncoder))) self.logger.debug("Reporting thread exited") def handleEvent(self, event): """ Serialize and dump event """ # Ignore tick events if not explicitly configured if type(event) is TickEvent and not self.tickEvents: return # Otherwise put the event in the queue for processing # by the thread. self.queue.put(event) def dump(self, summarizer): """ Implementation requirement from the Reporter base class. This method is called when the tests have finished. """ # (We have been reporting all this time, now it's time to stop) # Stop thread self.logger.info("Waiting for event reporting thread to complete") self.active = False self.queue.put(None) # Join self.thread.join() self.thread = None