# Source code for performance.driver.classes.observer.logstax.observer

import re

from performance.driver.core.classes import Observer
from performance.driver.core.events import Event, ParameterUpdateEvent, LogLineEvent, isEventMatching
from performance.driver.core.reflection import subscribesToHint, publishesHint

from .codecs import CodecTypes
from .filters import FilterTypes


class LogStaxMessageEvent(Event):
  """
  Event published for every log message that passed the LogStax filters.

  It exposes the ``tags`` and ``fields`` collected by the codecs and
  filters so trackers (e.g. the ``LogStaxTracker``) can extract metrics.
  """

  def __init__(self, message, **kwargs):
    super().__init__(**kwargs)

    # Extract tags and fields from the processed message
    self.tags = message.tags
    self.fields = message.fields
def createEventFilterFn(eventSpec):
  """
  Creates a function that returns `None` if the passed event does not
  match the specifications, or the log line if it passes.
  """

  def filterFn(event):
    # Reject events whose name does not match the spec
    if not isEventMatching(event, eventSpec['name']):
      return None

    # When one or more kinds are specified, the event must carry a
    # matching `kind` attribute; events without one are rejected
    if 'kind' in eventSpec:
      kinds = eventSpec['kind']
      if type(kinds) is str:
        kinds = [kinds]
      try:
        if event.kind not in kinds:
          return None
      except AttributeError:
        return None

    # Extract the configured field from the event
    if 'field' in eventSpec:
      return getattr(event, eventSpec['field'])

    # Without an explicit field, fall back to `event.field`
    try:
      return event.field
    except AttributeError:
      return None

  return filterFn
class LogStaxObserver(Observer):
  """
  The **Logstax Observer** is logstash-like observer for dcos-perf-test-driver
  that uses some event contents as the line source and a set of rules for
  creating fields for post-processing.

  ::

    observers:
      - class: observer.LogStaxObserver

        # An array of filters to apply on every line
        filters:

          # Grok Pattern Matching
          # ---------------------
          - type: grok

            # Match the given field(s) for GROK expressions
            match:
              message: "^%{IP:httpHost} - (?<user>%{WORD}|-).*"

            # [Optional] Overwrite the specified fields with the values
            # extracted from the grok pattern. By default only new fields
            # are appended.
            overwrite: [message, name]

            # [Optional] Add the given fields to the message
            add_field:
              source: grok

            # [Optional] Remove the given fields from the message
            remove_field: [source]

            # [Optional] Add the given tags in the message
            add_tag: [foo, bar]

            # [Optional] Remove the given tags in the message
            remove_tag: [bar, baz]

        # [Optional] Which event(s) to listen for and which fields to extract
        events:

          # By default it's using the `LogLineEvent`
          - name: LogLineEvent
            field: line

        # [Optional] One or more `codecs` to apply on the incoming lines.
        # These codecs convert one or more log lines into
        codecs:

          # Pass-through every incoming line to a rule matcher
          - type: singleline

          # Group multiple lines into a block and then pass it to the
          # rule matcher as an array of lines. Refer to the `MultilineCodec`
          # for more details.
          - type: multiline
            lines:
              - match: firstLine.*
              - match: secondLine.*

  This observer is trying to reproduce a logstash set-up, using the
  LogLineEvent as the only source. It is first passing the events through
  a `codec` that is going to create a processable messages. Each message
  contains fields and tags.

  By default, the `singleline` codec is populating the `message` field with
  the contents of the line. The `multiline` codec is more elaborate and can
  be used in order to extract multi-line blocks from the incoming stream.

  The messages are then passed to the filters. If a filter matches the
  incoming message it is going to apply the transformations described.
  When the filter process is completed, the observer is going to broadcast
  a ``LogStaxMessageEvent`` that can be processed at a later time by the
  ``LogStaxTracker`` in order to extract useful metrics.
  """

  @subscribesToHint(LogLineEvent, ParameterUpdateEvent)
  def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    config = self.getRenderedConfig()

    # Compose codecs; default to the pass-through `singleline` codec
    self.codecs = []
    for codec in config.get('codecs', [{'type': 'singleline'}]):
      if codec['type'] in CodecTypes:
        codecClass = CodecTypes[codec['type']]
        self.codecs.append(codecClass(codec))
      else:
        self.logger.error('Unknown codec `{}` specified'.format(codec['type']))

    # Compose filters (renamed loop variable to avoid shadowing the
    # `filter` builtin)
    self.filters = []
    for filterConfig in config['filters']:
      if filterConfig['type'] in FilterTypes:
        filterClass = FilterTypes[filterConfig['type']]
        self.filters.append(filterClass(filterConfig))
      else:
        self.logger.error(
            'Unknown filter `{}` specified'.format(filterConfig['type']))

    # Compose event filters; by default extract the `line` field from
    # stdout/stderr LogLineEvents
    self.events = []
    for event in config.get('events', [{
        'name': 'LogLineEvent',
        'kind': ['stdout', 'stderr'],
        'field': 'line'
    }]):
      self.events.append(createEventFilterFn(event))

    # Listen for every event and extract log lines from the matching ones
    self.eventbus.subscribe(self.handleAnyEvent)

    # Keep trace ID of the latest parameter update
    self.traceids = None
    self.eventbus.subscribe(
        self.handleParameterUpdate, events=(ParameterUpdateEvent, ))

  def handleParameterUpdate(self, event):
    """
    Update the trace IDs to use on the published log message events
    """
    self.traceids = event.traceids

  def handleLine(self, line):
    """
    Process the given log line, regardless of its origin
    """

    # Process the given line through the codecs in best-effort order:
    # the first codec that yields messages wins
    messages = []
    for codec in self.codecs:
      msg = codec.handle(line)
      if len(msg) > 0:
        messages = msg
        break

    # Check if no codec could process the given line
    if len(messages) == 0:
      return

    # Filter messages; with no filters configured every message is handled
    for message in messages:
      handled = (len(self.filters) == 0)
      for inst in self.filters:
        res = inst.filter(message)
        if res is not None:
          handled = True
          message = res

      # Broadcast event if it's handled
      if handled:
        self.handleMessage(message)

  @publishesHint(LogStaxMessageEvent)
  def handleMessage(self, message):
    """
    Handle a completed message
    """
    self.eventbus.publish(LogStaxMessageEvent(message, traceid=self.traceids))

  def handleAnyEvent(self, event):
    """
    Extract log from every event
    """
    for applyFilter in self.events:
      line = applyFilter(event)
      if line is None:
        continue

      # Handle line and exit
      self.handleLine(line)
      return