import time
import requests
from performance.driver.core.classes import Observer
from performance.driver.core.events import Event, TeardownEvent, StartEvent, ParameterUpdateEvent
from performance.driver.core.reflection import subscribesToHint, publishesHint
from threading import Thread
[docs]class HTTPTimingResultEvent(Event):
"""
The results of a timing event, initiated by a ``HTTPTimingObserver``
"""
def __init__(self, url, verb, statusCode, requestTime, responseTime,
totalTime, contentLength, *args, **kwargs):
super().__init__(*args, **kwargs)
#: The URL requested
self.url = url
#: The HTTP verb used to request this resource
self.verb = verb
#: The HTTP response code
self.statusCode = statusCode
#: The time the HTTP request took to complete
self.requestTime = requestTime
#: The time the HTTP response took to complete
self.responseTime = responseTime
#: The overall time from the beginning of the request, till the end of the
#: response
self.totalTime = totalTime
#: The length of the response body
self.contentLength = contentLength
[docs]class HTTPTimingObserver(Observer):
"""
The *HTTP Timing Observer* is performing HTTP requests to the given endpoint
and is measuring the request and response times.
::
observers:
- class: observer.HTTPTimingObserver
# The URL to send the requests at
url: http://127.0.0.1:8080/v2/apps
# [Optional] The interval of the reqeusts (seconds)
interval: 1
# [Optional] The body of the HTTP request
body: |
{
"cmd": "sleep 1200",
"cpus": 0.1,
"mem": 64,
"disk": 0,
"instances": {{instances}},
"id": "/scale-instances/{{uuid()}}",
"backoffFactor": 1.0,
"backoffSeconds": 0
}
# [Optional] The HTTP Verb to use (Defaults to 'GET')
verb: POST
# [Optional] The HTTP headers to send
headers:
Accept: text/plain
This observer is publishing a ``HTTPTimingResultEvent`` every time a sample
is taken. Refer to the event documentation for more details.
"""
@subscribesToHint(TeardownEvent, StartEvent, ParameterUpdateEvent)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.url = self.getConfig('url')
self.interval = float(self.getConfig('interval', '1'))
self.clockThread = None
self.active = False
self.traceids = None
# Register to the Start / Teardown events
self.eventbus.subscribe(self.handleTeardownEvent, events=(TeardownEvent, ))
self.eventbus.subscribe(self.handleStartEvent, events=(StartEvent, ))
self.eventbus.subscribe(
self.handleParameterUpdate, events=(ParameterUpdateEvent, ))
def handleParameterUpdate(self, event):
"""
Update trace IDs
"""
self.traceids = event.traceids
def handleStartEvent(self, event):
"""
Start polling timer
"""
self.logger.debug('Starting polling timer')
self.active = True
self.clockThread = Thread(target=self.pollThreadHandler, name="http-timing")
self.clockThread.start()
def handleTeardownEvent(self, event):
"""
Interrupt polling timer
"""
self.logger.debug('Stopping polling timer')
self.active = False
self.clockThread.join()
self.clockThread = None
@publishesHint(HTTPTimingResultEvent)
def pollThreadHandler(self):
"""
A thread that keeps polling the given url until it responds
"""
# Render config and definitions
config = self.getRenderedConfig()
definitions = self.getDefinitions()
# If we are missing an `Authorization` header but we have a
# `dcos_auth_token` definition, allocate an `Authorization` header now
if not 'headers' in config:
config['headers'] = {}
if not 'Authorization' in config['headers'] \
and 'dcos_auth_token' in definitions:
config['headers']['Authorization'] = 'token={}'.format(
definitions['dcos_auth_token'])
# Extract useful info
url = config['url']
body = config.get('body', None)
headers = config['headers']
verb = config.get('verb', 'get')
# While running, start
while self.active:
self.logger.debug('Checking the endpoint')
try:
# Reset timer values
times = [0, 0, 0]
# Acknowledge response
def ack_response(request, *args, **kwargs):
times[1] = time.time()
# Send request (and catch errors)
times[0] = time.time()
self.logger.debug('Performing HTTP {} to {}'.format(verb, url))
res = requests.request(
verb,
url,
verify=False,
data=body,
headers=headers,
hooks=dict(response=ack_response))
times[2] = time.time()
# Log error status codes
self.logger.debug('Completed with HTTP {}'.format(res.status_code))
if res.status_code != 200:
self.logger.warn('Endpoint at {} responded with HTTP {}'.format(
url, res.status_code))
# Broadcast status
self.logger.debug(
'Measurement completed: request={}, response={}, total={}'.format(
times[1] - times[0], times[2] - times[1], times[2] - times[0]))
self.eventbus.publish(
HTTPTimingResultEvent(
url,
verb,
res.status_code,
times[1] - times[0],
times[2] - times[1],
times[2] - times[0],
len(res.text),
traceid=self.traceids))
except requests.exceptions.ConnectionError as e:
self.logger.error('Unable to connect to {}'.format(url))
except Exception as e:
self.logger.error(
'An unhandled urllib exception occurred: {}'.format(e))
# Wait for next tick
time.sleep(self.interval)