Source code for performance.driver.classes.reporter.raw

import os
import json
import datetime

from performance.driver.core.classes import Reporter
from performance.driver.core.eventfilters import EventFilter
from performance.driver.core.events import StartEvent, ParameterUpdateEvent


class RawReporter(Reporter):
  """
  The **Raw Reporter** creates a raw dump of the results in the results
  folder, in JSON format.

  ::

    reporters:
      - class: reporter.RawReporter

        # Where to dump the results
        filename: "results-raw.json"

        # [Optional] Include event traces
        events:

          # [Optional] Include events that pass through the given expression
          include: FilterExpression

          # [Optional] Exclude events that pass through the given expression
          exclude: FilterExpression

          # [Optional] Group the events to their traces
          traces: yes

  The JSON structure of the data included is the following:

  .. code-block:: js

    {
      // Timing information
      "time": {
        "started": "",
        "completed": ""
      },

      // The configuration used to run this test
      "config": {
        ...
      },

      // The values for the indicators
      "indicators": {
        "indicator": 1.23,
        ...
      },

      // The metadata of the run
      "meta": {
        "test": "1-app-n-instances",
        ...
      },

      // Raw dump of the timeseries for every phase
      "raw": [
        {
          // One or more status flags collected in this phase
          "flags": {
            "status": "OK"
          },

          // The values of all parameter (axes) in this phase
          "parameters": {
            "apps": 1,
            "instances": 1
          },

          // The time-series values for every metric
          "values": {
            "metricName": [

              // Each metric is composed of the timestamp of its
              // sampling time and the value
              [
                1499696193.822527,
                11
              ],
              ...

            ]
          }
        }
      ],

      // Summarised dump of the raw timeseries above, in the same
      // structure
      "sum": [
        {
          // One or more status flags collected in this phase
          "flags": {
            "status": "OK"
          },

          // The values of all parameter (axes) in this phase
          "parameters": {
            "apps": 1,
            "instances": 1
          },

          // The summarised values of each timeseries
          "values": {
            "metricName": {

              // Here are the summarisers you selected in the `metric`
              // configuration parameter.
              "sum": 123.4,
              "mean": 123.4,
              ...

            }
          }
        }
      ]
    }

  """

  def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.timeStarted = datetime.datetime.now().isoformat()

    # Do some delayed initialization when the system is ready
    self.eventbus.subscribe(
        self.handleStartEvent, order=10, events=(StartEvent, ))

    # Event-tracing configuration
    self.includeFilter = None
    self.excludeFilter = None
    self.eventTraces = {}

  def handleStartEvent(self, event):
    """
    Start tracing, if requested
    """

    # Check the config and subscribe
    config = self.getRenderedConfig()
    if 'events' in config:

      # Get events config
      eventsConfig = config.get('events')
      if not type(eventsConfig) is dict:
        eventsConfig = {}

      # Configure include/exclude filter
      includeExpr = eventsConfig.get('include', '*')
      self.logger.info("Events collected: {}".format(includeExpr))
      self.includeFilter = EventFilter(includeExpr).start(
          None, self.handleInclude)
      if 'exclude' in eventsConfig:
        # TODO: When we have negation on the EventFilter fix this
        raise ValueError('Exclude filter is currently not supported')

      # Start subscription to all events
      self.eventbus.subscribe(self.handleEvent, order=10)

  def handleInclude(self, event):
    """
    Handle events passing through the include filter
    """

    # TODO: When we have negation on the EventFilter handle negative matches

    # Locate the tracing bin where to place this event
    for i in event.traceids:
      if i in self.eventTraces:
        if event not in self.eventTraces[i]:
          self.eventTraces[i].add(event)
        return

  def handleEvent(self, event):
    """
    Handle incoming event
    """

    # A ParameterUpdate event starts a new trace
    if type(event) is ParameterUpdateEvent:
      trace = min(filter(lambda x: type(x) is int, event.traceids))
      self.eventTraces[trace] = set([event])

    # Every other event passes through the include filter
    self.includeFilter.handle(event)

  def dump(self, summarizer):
    """
    Dump summarizer values to the results file
    """

    # Get the filename to write into
    config = self.getRenderedConfig()
    filename = config.get('filename', 'results-raw.json')

    # Create missing directories for the file
    os.makedirs(os.path.abspath(os.path.dirname(filename)), exist_ok=True)

    # Prepare results object
    results = {
        'time': {
            'started': self.timeStarted,
            'completed': datetime.datetime.now().isoformat()
        },
        'config': self.getRootConfig().config,
        'raw': summarizer.raw(),
        'sum': summarizer.sum(),
        'indicators': summarizer.indicators(),
        'meta': self.getMeta()
    }

    # Collect event traces
    if self.eventTraces:
      traces = []
      for traceEvents in self.eventTraces.values():
        root = next(
            filter(lambda x: type(x) is ParameterUpdateEvent, traceEvents))

        # Serialize events
        events = []
        for event in traceEvents:
          events.append(event.toDict())

        # Compose trace record
        traces.append({
            'parameters': root.parameters,
            'events': events
        })

      # Put traces on the result
      results['events'] = traces

    # Dump the results
    self.logger.info("Saving raw results to {}".format(filename))
    with open(filename, 'w') as f:
      f.write(json.dumps(results, sort_keys=True, indent=2))
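The file written by ``dump()`` can be post-processed with standard tooling. Below is a minimal sketch, not part of the module, showing how the output might be loaded and inspected; it assumes the default ``results-raw.json`` filename and follows the JSON layout described in the class docstring (the metric and summariser names are placeholders).

  import json

  # Load the raw results dump produced by RawReporter (assumed default filename)
  with open('results-raw.json') as f:
      results = json.load(f)

  # Timing and run metadata
  print(results['time']['started'], '->', results['time']['completed'])
  print(results['meta'])

  # Summarised values: one entry per phase, carrying the phase parameters
  # and a dict of summariser outputs per metric
  for phase in results['sum']:
      params = phase['parameters']
      for metric, summaries in phase['values'].items():
          print(params, metric, summaries.get('mean'))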