# Source code for performance.driver.classes.reporter.datadog

import time
import socket

from performance.driver.core.classes import Reporter
from performance.driver.core.events import MetricUpdateEvent, TeardownEvent
from performance.driver.core.reflection import subscribesToHint
from performance.driver.core.utils import parseTimeExpr

# NOTE: The following block is needed only when sphinx is parsing this file
#       in order to generate the documentation. It's not really useful for
#       the logic of the file itself.
try:
  from datadog import initialize, api
except ImportError:
  import logging
  # FIX: the two string literals concatenated without a separating space,
  # producing "were notinstalled" in the log output.
  logging.error('One or more libraries required by DataDogReporter were not '
                'installed. The reporter will not work.')


class DataDogMetricReporter(Reporter):
  """
  The **DataDog Metric Reporter** uploads the raw metric values to DataDog
  the moment they are collected.

  ::

    reporters:
      - class: reporter.DataDogMetricReporter

        # The API Key to use
        api_key: 1234567890abcdef

        # [Optional] The hostname to use as the agent name in datadog
        # If missing the network name of the machine will be used
        hostname: test.host

        # [Optional] Prefix of the metrics (Defaults to `dcos.perf.`)
        prefix: "dcos.perf."

        # [Optional] How frequently to flush the metrics to DataDog
        flushInterval: 5s

        # [Optional] Report configuration. If missing, the default behavior
        # is to report the summarized metrics at the end of the test.
        report:

          # [Optional] The string `all` indicates that all the metrics should
          # be submitted to DataDog
          metrics: all

          # [Optional] OR a list of metric names can be provided
          metrics:
            - metricA
            - metricB

          # [Optional] OR you can use a dictionary to provide an alias
          metrics:
            metricA: aliasedMetricA
            metricB: aliasedMetricB

          # [Optional] Set to `yes` to submit the raw values the moment they
          # are collected.
          raw: yes

          # [Optional] Set to `yes` to submit summarized values of the metrics
          # at the end of the test run.
          summarized: yes

          # [Optional] The string `all` indicates that all the indicators
          # should be submitted to DataDog
          indicators: all

          # [Optional] OR a list of indicator names can be provided
          indicators:
            - indicatorA
            - indicatorB

          # [Optional] OR you can use a dictionary to provide an alias
          indicators:
            indicatorA: aliasedIndicatorA
            indicatorB: aliasedIndicatorB

  The DataDog reporter is using the DataDog API to submit the values of
  the test metrics to DataDog in real-time.
  """

  @subscribesToHint(MetricUpdateEvent)
  def start(self):
    """
    Initialize when the tests are starting.

    Reads the rendered configuration, initializes the DataDog API client
    and subscribes to the event bus for live metric updates (if `report.raw`
    is enabled) and for the teardown event (to flush pending points).
    """
    config = self.getRenderedConfig()

    # Batched data points, flushed every `flushInterval` seconds
    self.series = []

    # Initialize DataDog API
    initialize(
        api_key=config.get('api_key', None),
        hostname=config.get('hostname', socket.gethostname()))

    # Get some configuration options
    reportConfig = config.get('report', {})
    self.prefix = config.get('prefix', 'dcos.perf.')
    self.flushInterval = parseTimeExpr(config.get('flushInterval', '5s'))
    self.metrics = reportConfig.get('metrics', None)
    self.submitSummarized = reportConfig.get('summarized', True)
    self.indicators = reportConfig.get('indicators', None)
    # FIX: `is not None` instead of `!= None` (identity check for None)
    self.submitIndicators = self.indicators is not None

    # Handle cases of literal 'all' (None means "no filter" internally)
    if self.metrics == 'all':
      self.metrics = None
    if self.indicators == 'all':
      self.indicators = None

    # Handle invalid cases — any other string is a configuration mistake.
    # FIX: the messages contained a `{}` placeholder but never called
    # .format(), so the offending value was never shown.
    if type(self.metrics) is str:
      raise ValueError(
          'Unexpected string `{}` for the config `report.metrics`'.format(
              self.metrics))
    if type(self.indicators) is str:
      raise ValueError(
          'Unexpected string `{}` for the config `report.indicators`'.format(
              self.indicators))

    # Push metrics as they are produced, if we have requested a live
    # metric trace
    if reportConfig.get('raw', False):
      self.eventbus.subscribe(
          self.handleMetricUpdate, order=10, events=(MetricUpdateEvent, ))

    # Flush final metrics when the tests are completed
    self.lastFlush = time.time()
    self.flushing = False
    self.eventbus.subscribe(
        self.handleTeardown, order=10, events=(TeardownEvent, ))

  def handleMetricUpdate(self, event):
    """
    Handle a metric change.

    Collects the (timestamp, value) point into the local batch and flushes
    the batch to DataDog when `flushInterval` has elapsed since the last
    flush.
    """
    # Check if this metric is ignored
    metricName = event.name
    if self.metrics is not None and metricName not in self.metrics:
      return

    # Check if the user has provided an alias map for the metric
    if type(self.metrics) is dict:
      metricName = self.metrics[metricName]

    self.logger.debug("Submitting to datadog {}{}={}".format(
      self.prefix, metricName, event.value
    ))

    # Instead of submitting frequent changes, we are batching them and
    # sending them at a fixed interval
    self.series.append({
      "metric": '{}{}'.format(self.prefix, metricName),
      "points": (event.ts, event.value),
      "tags": list(map(
          lambda v: "{}:{}".format(v[0], str(v[1])),
          self.getMeta().items()
        ))
    })

    # Check if we have reached the flush interval
    if (time.time() - self.lastFlush) >= self.flushInterval:
      self.flushMetrics()

  def dump(self, results):
    """
    Dump summarized metrics to DataDog.

    Submits one data point per (metric, summarizer) pair and one per
    indicator, then flushes everything that was collected.
    """
    # Tags shared by every data point: all the run metadata as `key:value`
    commonTags = list(map(
        lambda v: "{}:{}".format(v[0], str(v[1])),
        self.getMeta().items()
      ))

    # Report the metrics to DataDog (if enabled)
    if self.submitSummarized:
      for case in results.sum():
        for metric, _summ in case['values'].items():
          if self.metrics is not None and metric not in self.metrics:
            continue

          # Check if the user has provided an alias map for the metric.
          # FIX: resolve the alias ONCE into a separate name. The original
          # re-assigned `metric` inside the per-summarizer loop, so with a
          # dict alias map the second summarizer iteration looked up the
          # alias itself and raised KeyError.
          metricName = metric
          if type(self.metrics) is dict:
            metricName = self.metrics[metric]

          # Each metric can have one or more summarizers. Submit the values
          # from all of them to DataDog
          for summarizer, value in _summ.items():

            # If we have an array, pick only the first item in the value set
            if type(value) in (list, tuple):
              value = value[0]

            self.logger.debug("Submitting to datadog {}{}.{}={}".format(
              self.prefix, metricName, summarizer, value
            ))

            # Collect the data point; the per-case axis parameters are
            # appended to the common metadata tags
            self.series.append({
              "metric": '{}{}.{}'.format(self.prefix, metricName, summarizer),
              "points": (time.time(), value),
              "tags": commonTags + list(map(
                  lambda v: "{}:{}".format(v[0], str(v[1])),
                  case['parameters'].items()
                ))
            })

    # Report the indicators to DataDog (if enabled)
    if self.submitIndicators:
      # FIX: indicators come from the `results` object (same contract the
      # sibling DataDogReporter uses); the original read
      # `summarizer.indicators()` where `summarizer` was a stale inner-loop
      # variable holding a summarizer *name* string.
      for indicator, value in results.indicators().items():
        if self.indicators is not None and indicator not in self.indicators:
          continue

        # Check if the user has provided an alias map for the indicator
        if type(self.indicators) is dict:
          indicator = self.indicators[indicator]

        self.logger.debug("Submitting to datadog {}{}={}".format(
          self.prefix, indicator, value
        ))

        # Submit the data point
        self.series.append({
          "metric": '{}{}'.format(self.prefix, indicator),
          "points": (time.time(), value),
          "tags": commonTags
        })

    # Flush the metrics we collected so far
    self.flushMetrics()

  def handleTeardown(self, event):
    """
    Flush metrics when we are tearing down
    """
    self.flushMetrics()

  def flushMetrics(self):
    """
    Flush the batched metrics to DataDog.
    """
    # A flush operation can take long time to complete. Do not trigger
    # a new flush until the previous one is completed
    if self.flushing:
      # FIX: `logger.warn` is a deprecated alias of `logger.warning`
      self.logger.warning("Very slow flushing to DataDog ({} sec)".format(
        time.time() - self.lastFlush))
      return
    self.lastFlush = time.time()

    # Pop and reset the timeseries
    series = self.series
    self.series = []

    # Ignore if there were no metrics to flush
    if len(series) == 0:
      return

    # Send the metrics.
    # FIX: use try/finally so a failed send cannot leave `self.flushing`
    # stuck at True, which would silently block every future flush.
    self.flushing = True
    try:
      self.logger.info("Flushing {} points to DataDog".format(len(series)))
      api.Metric.send(series)
    finally:
      self.flushing = False
class DataDogReporter(Reporter):
  """
  The **DataDog Reporter** is uploading the indicators into DataDog for
  archiving and alerting usage.

  ::

    reporters:
      - class: reporter.DataDogReporter

        # The API Key to use
        api_key: 1234567890abcdef

        # The App Key to use
        app_key: 1234567890abcdef

        # The data points to submit
        points:

          # The name of the metric to submit to DataDog and the
          # indicator to read the data from
          - name: memory
            indicator: meanMemoryUsage

        # [Optional] The hostname to use as the agent name in datadog
        # If missing the network name of the machine will be used
        hostname: test.host

        # [Optional] Prefix of the metrics (Defaults to `dcos.perf.`)
        prefix: "dcos.perf."

  The DataDog reporter is using the DataDog API to submit one or more
  indicator values as data points.

  .. note::
     This reporter is **only** collecting the ``indicators``. Metric values
     or summaries cannot be reported to DataDog. Use the
     ``reporter.DataDogMetricReporter`` instead.
  """

  def dump(self, summarizer):
    """
    Dump summarizer values to DataDog.

    Raises TypeError when a configured point references an indicator that
    the summarizer did not produce.
    """
    config = self.getRenderedConfig()

    # Initialize DataDog API
    initialize(
        api_key=config.get('api_key', None),
        app_key=config.get('app_key', None),
        hostname=config.get('hostname', socket.gethostname()))

    # Get some configuration options
    # (FIX: removed the unused `metrics` local that was read from the
    # config but never referenced)
    prefix = config.get('prefix', 'dcos.perf.')

    # Calculate indicators
    indicatorValues = summarizer.indicators()

    # Compose datadog series
    series = []
    for point in config.get('points', []):

      # Make sure we have this indicator
      if point['indicator'] not in indicatorValues:
        raise TypeError('Unknown indicator `{}` in datadog summarizer'.format(
            point['indicator']))

      # Submit metrics and add all metadata as tags
      series.append({
        "metric": '{}{}'.format(prefix, point['name']),
        "points": indicatorValues[point['indicator']],
        "tags": list(
          map(lambda v: "{}:{}".format(v[0], str(v[1])),
              self.getMeta().items()))
      })

    # Send all series in one batch
    self.logger.info("Submitting series to datadog: {}".format(series))
    api.Metric.send(series)