import requests
import time
from performance.driver.core.events import Event, TickEvent, ParameterUpdateEvent, TeardownEvent
from performance.driver.core.template import TemplateString, TemplateDict
from performance.driver.core.classes import Channel
from performance.driver.core.reflection import subscribesToHint, publishesHint
from threading import Thread, Lock
###############################
# Events
###############################
[docs]class HTTPRequestStartEvent(Event):
"""
Published before every HTTP request
"""
def __init__(self, verb, url, body, headers, *args, **kwargs):
super().__init__(*args, **kwargs)
#: The HTTP verb that was used (in lower-case). Ex: ``get``
self.verb = verb.lower()
#: The URL requested
self.url = url
#: The request body
self.body = body
#: The request headers
self.headers = headers
[docs]class HTTPFirstRequestStartEvent(HTTPRequestStartEvent):
"""
Published when the first request out of many is started.
This is valid when a ``repeat`` parameter has a value > 1.
"""
[docs]class HTTPLastRequestStartEvent(HTTPRequestStartEvent):
"""
Published when the last request out of many is started.
This is valid when a ``repeat`` parameter has a value > 1.
"""
[docs]class HTTPRequestEndEvent(Event):
"""
Published when the HTTP request has completed and the response is starting
"""
def __init__(self, verb, url, body, headers, *args, **kwargs):
super().__init__(*args, **kwargs)
#: The HTTP verb that was used (in lower-case). Ex: ``get``
self.verb = verb.lower()
#: The URL requested
self.url = url
#: The request body
self.body = body
#: The request headers
self.headers = headers
[docs]class HTTPFirstRequestEndEvent(HTTPRequestEndEvent):
"""
Published when the first request out of many is completed.
This is valid when a ``repeat`` parameter has a value > 1.
"""
[docs]class HTTPLastRequestEndEvent(HTTPRequestEndEvent):
"""
Published when the last request out of many is completed.
This is valid when a ``repeat`` parameter has a value > 1.
"""
[docs]class HTTPResponseStartEvent(Event):
"""
Published when the HTTP response is starting.
"""
def __init__(self, url, *args, **kwargs):
super().__init__(*args, **kwargs)
#: The URL requested
self.url = url
[docs]class HTTPFirstResponseStartEvent(HTTPResponseStartEvent):
"""
Published when the first response out of many is starting.
This is valid when a ``repeat`` parameter has a value > 1.
"""
[docs]class HTTPLastResponseStartEvent(HTTPResponseStartEvent):
"""
Published when the last response out of many is starting.
This is valid when a ``repeat`` parameter has a value > 1.
"""
[docs]class HTTPResponseEndEvent(Event):
"""
Published when the HTTP response has completed
"""
def __init__(self, url, body, headers, *args, **kwargs):
Event.__init__(self, *args, **kwargs)
#: The URL requested
self.url = url
#: The response body (as string)
self.body = body
#: The response headers
self.headers = headers
[docs]class HTTPFirstResponseEndEvent(HTTPResponseEndEvent):
"""
Published when the first response out of many has completed.
This is valid when a ``repeat`` parameter has a value > 1.
"""
[docs]class HTTPLastResponseEndEvent(HTTPResponseEndEvent):
"""
Published when the last response out of many has completed.
This is valid when a ``repeat`` parameter has a value > 1.
"""
[docs]class HTTPErrorEvent(Event):
"""
Published when an exception is raised during an HTTP operation (ex. connection error)
"""
def __init__(self, exception, *args, **kwargs):
Event.__init__(self, *args, **kwargs)
#: The exception that was raised
self.exception = exception
[docs]class HTTPResponseErrorEvent(HTTPResponseEndEvent, HTTPErrorEvent):
"""
Published when an exception was raised while processing an HTTP response.
This is valid when a ``repeat`` parameter has a value = 1.
"""
def __init__(self, url, body, headers, exception, *args, **kwargs):
HTTPResponseEndEvent.__init__(self, url, body, headers, *args, **kwargs)
HTTPErrorEvent.__init__(self, exception)
[docs]class HTTPFirstResponseErrorEvent(HTTPFirstResponseEndEvent, HTTPErrorEvent):
"""
Published when the first response out of many has an error.
This is valid when a ``repeat`` parameter has a value > 1.
"""
def __init__(self, url, body, headers, exception, *args, **kwargs):
HTTPFirstResponseEndEvent.__init__(self, url, body, headers, *args, **kwargs)
HTTPErrorEvent.__init__(self, exception)
[docs]class HTTPLastResponseErrorEvent(HTTPLastResponseEndEvent, HTTPErrorEvent):
"""
Published when the last response out of many has an error.
This is valid when a ``repeat`` parameter has a value > 1.
"""
def __init__(self, url, body, headers, exception, *args, **kwargs):
HTTPLastResponseEndEvent.__init__(self, url, body, headers, *args, **kwargs)
HTTPErrorEvent.__init__(self, exception)
###############################
# Helpers
###############################
def pickFirstLast(current, total, firstEvent, lastEvent, middleEvent):
"""
A helper function to select the correct event classification
as a shorthand.
- Pick `firstEvent` if current = 0
- Pick `lastEvent` if current = total - 1
- Pick `middleEvent` on any other case
"""
if current == total - 1:
return lastEvent
elif current == 0:
return firstEvent
else:
return middleEvent
class HTTPRequestState:
"""
This class keeps track of the request state.
This includes the request information (url, headers, body), the repeat
information (how many times to repeat the request and how many times we have
done this already) and other information used by the HTTPChannel.handleRequest
thread handler to complete the request.
"""
def __init__(self, channel, eventParameters, traceids):
# Render config and definitions
config = channel.getRenderedConfig(eventParameters)
definitions = channel.getDefinitions()
# If we are missing an `Authorization` header but we have a
# `dcos_auth_token` definition, allocate an `Authorization` header now
if not 'headers' in config:
config['headers'] = {}
if not 'Authorization' in config['headers'] \
and 'dcos_auth_token' in definitions:
config['headers']['Authorization'] = 'token={}'.format(
definitions['dcos_auth_token'])
# Get base request parameters
self.url = config['url']
self.body = config.get('body', '')
self.verb = config.get('verb', 'GET')
self.headers = config.get('headers', {})
self.traceids = traceids
self.eventParameters = eventParameters
self.channel = channel
# Extract repeat config
self.repeat = int(config.get('repeat', 1))
self.repeatAfter = config.get('repeatAfter', None)
self.repeatInterval = config.get('repeatInterval', None)
if not self.repeatInterval is None:
self.repeatInterval = float(self.repeatInterval)
# Expose 'i' parameter that should be equal to the current
# run in scase of a repeatable one
# State information
self.activeRequest = None
self.completedCounter = 0
self.lastRequestTs = 0
self.active = True
def getUrl(self):
"""
Dynamically compose URL, by appending some template variables (if any)
"""
# Compile the parameters to request
parameters = {'i': self.completedCounter}
parameters.update(self.eventParameters)
# Render url
return self.channel.getRenderedConfig(parameters).get('url')
def getBody(self):
"""
Dynamically compose body, by applying some template variables (if any)
"""
# Compile the parameters to request
parameters = {'i': self.completedCounter}
parameters.update(self.eventParameters)
# Render body
body = self.channel.getRenderedConfig(parameters).get('body')
# Apply conditionals on body
if type(body) is list:
for case in body:
if not 'if' in case or not eval(case['if'], parameters):
continue
return case['value']
raise ValueError(
'Could not find a matching body case for parameters: {0!r}'.format(
body))
# Render config and get body
return body
###############################
# Entry Point
###############################
[docs]class HTTPChannel(Channel):
"""
The *HTTP Channel* performs an HTTP Requests when a parameter changes.
::
channels:
- class: channel.HTTPChannel
# The URL to send the requests at
url: http://127.0.0.1:8080/v2/apps
# The body of the HTTP request
body: |
{
"cmd": "sleep 1200",
"cpus": 0.1,
"mem": 64,
"disk": 0,
"instances": {{instances}},
"id": "/scale-instances/{{uuid()}}",
"backoffFactor": 1.0,
"backoffSeconds": 0
}
# [Optional] The HTTP Verb to use (Defaults to 'GET')
verb: POST
# [Optional] The HTTP headers to send
headers:
Accept: text/plain
# [Optional] How many times to re-send the request (can be
# a macro value)
repeat: 1234
# [Optional] How long to wait between re-sends (in seconds)
# If missing the next request will be sent as soon as the previous
# has completed
repeatInterval: 1234
# [Optional] For which event to wait before re-sending the request.
repeatAfter: event
When a parameter is changed, a new HTTP request is made. If a ``repeat``
parameter is specified, the same HTTP request will be sent again, that many
times.
Various events are published from this channel, that can be used to synchronise
other components or track metrics.
* When an HTTP request is initiated an ``HTTPRequestStartEvent`` is published.
* When an HTTP request is completed and the response is pending, an
``HTTPFirstRequestEndEvent`` is published.
* When the HTTP response is starting, an ``HTTPFirstResponseStartEvent`` is
published.
* When the HTTP response is completed, an ``HTTPResponseEndEvent`` is
published.
If you are using the ``repeat`` configuration parameter you can also use the
following events:
* When the first HTTP request is started, the ``HTTPFirstRequestStartEvent``
is published.
* When the last HTTP request is started, the ``HTTPLastRequestStartEvent``
is published.
* When the first HTTP request is completed, the ``HTTPFirstRequestEndEvent``
is published.
* When the last HTTP request is completed, the ``HTTPLastRequestEndEvent``
is published.
* When the first HTTP response is started, the ``HTTPFirstResponseStartEvent``
is published.
* When the last HTTP response is started, the ``HTTPLastResponseStartEvent``
is published.
* When the first HTTP response is completed, the ``HTTPFirstResponseEndEvent``
is published.
* When the last HTTP response is completed, the ``HTTPLastResponseEndEvent``
is published.
Therefore it's possble to track the progress of the entire repeat batch, aswell
as the progress of an individual HTTP event.
.. note::
This channel will automatically inject an ``Authorization`` header if
a ``dcos_auth_token`` definition exists, so you don't have to specify
it through the ``headers`` configuration.
Note that a ``dcos_auth_token`` can be dynamically injected via an
authentication task.
"""
@subscribesToHint(ParameterUpdateEvent, TeardownEvent)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.requestStates = []
self.requestStateMutex = Lock()
self.session = requests.Session()
# Increase pool sizes
self.session.mount('https://',
requests.adapters.HTTPAdapter(
pool_connections=100, pool_maxsize=100))
self.session.mount('http://',
requests.adapters.HTTPAdapter(
pool_connections=100, pool_maxsize=100))
# Receive parameter updates and clean-up on teardown
self.eventbus.subscribe(self.handleTeardown, events=(TeardownEvent, ))
@publishesHint(HTTPFirstRequestEndEvent, HTTPLastRequestEndEvent,
HTTPRequestEndEvent, HTTPFirstResponseStartEvent,
HTTPLastResponseStartEvent, HTTPResponseStartEvent,
HTTPFirstRequestStartEvent, HTTPLastRequestStartEvent,
HTTPRequestStartEvent, HTTPFirstResponseEndEvent,
HTTPLastResponseEndEvent, HTTPResponseEndEvent)
def handleRequest(self, req):
if req is None or not req.active:
self.logger.debug('Bailing out of {} request to {} due to termination'.
format(req.verb, req.url))
return
self.logger.info('Performing {} {} request(s) to {}'.format(
req.repeat, req.verb, req.getUrl()))
# Make sure to process loops in a single stack frame
while req.active:
# Render body
renderedBody = req.getBody()
reqUrl = req.getUrl()
# Helper function that handles `repeatAfter` events before scheduling
# a new request
def handle_repeatAfter(event):
self.eventbus.unsubscribe(handle_repeatAfter)
self.handleRequest(req)
# Helper function that handles TickEvents until the designated time
# in the `repeatInterval` parameter has passed
def handle_repeatInterval(event):
deltaMs = event.ts - req.lastRequestTs
if deltaMs >= req.repeatInterval:
self.eventbus.unsubscribe(handle_repeatInterval)
self.handleRequest(req)
# Helper function to notify the message bus when an HTTP response starts
def ack_response(request, *args, **kwargs):
self.eventbus.publish(
pickFirstLast(req.completedCounter, req.repeat,
HTTPFirstRequestEndEvent, HTTPLastRequestEndEvent,
HTTPRequestEndEvent)(
req.verb,
reqUrl,
renderedBody,
req.headers,
traceid=req.traceids))
self.eventbus.publish(
pickFirstLast(req.completedCounter, req.repeat,
HTTPFirstResponseStartEvent,
HTTPLastResponseStartEvent, HTTPResponseStartEvent)(
reqUrl, traceid=req.traceids))
# Place request
self.eventbus.publish(
pickFirstLast(req.completedCounter, req.repeat,
HTTPFirstRequestStartEvent, HTTPLastRequestStartEvent,
HTTPRequestStartEvent)(
req.verb,
reqUrl,
renderedBody,
req.headers,
traceid=req.traceids))
self.logger.debug('Placing a {} request to {}'.format(req.verb, reqUrl))
try:
# Send request (and trap errors)
req.activeRequest = self.session.request(
req.verb,
reqUrl,
verify=False,
data=renderedBody,
headers=req.headers,
hooks=dict(response=ack_response))
# Warn errors
if (req.activeRequest.status_code <
200) or (req.activeRequest.status_code >= 300):
self.logger.warn(
'HTTP {} Request to {} returned status code of {}'.format(
req.verb, reqUrl, req.activeRequest.status_code))
# Process response
self.eventbus.publish(
pickFirstLast(req.completedCounter, req.repeat,
HTTPFirstResponseEndEvent, HTTPLastResponseEndEvent,
HTTPResponseEndEvent)(
reqUrl,
req.activeRequest.text,
req.activeRequest.headers,
traceid=req.traceids))
except requests.exceptions.ConnectionError as e:
self.logger.error('Cannot connect to {}'.format(reqUrl))
# Dispatch error
self.eventbus.publish(
pickFirstLast(req.completedCounter, req.repeat,
HTTPFirstResponseErrorEvent,
HTTPLastResponseErrorEvent, HTTPResponseErrorEvent)(
reqUrl, "", {}, e, traceid=req.traceids))
# Check for repetitions
req.completedCounter += 1
if req.completedCounter < req.repeat:
self.logger.debug("Completed {} out of {} requests".format(req.completedCounter, req.repeat))
# Register an event listener if we have an `repeatAfter` parameter
if not req.repeatAfter is None:
self.eventbus.subscribe(
handle_repeatAfter, events=(req.repeatAfter, ))
break
# Register a timeout if we have a `repeatInterval` parameter
if not req.repeatInterval is None:
req.lastRequestTs = time.time()
self.eventbus.subscribe(handle_repeatInterval, events=(TickEvent, ))
break
# Otherwise let the loop continue in order to re-schedule request
continue
else:
self.logger.info('Completed {} {} request(s) to {}'.format(
req.repeat, req.verb, req.getUrl()))
# Remoe the active reuqest when we are done
with self.requestStateMutex:
try:
self.requestStates.remove(req)
except ValueError:
pass
break
def handleTeardown(self, event):
"""
Stop pending request(s) at teardown
"""
with self.requestStateMutex:
for req in self.requestStates:
req.active = False
if req.activeRequest:
req.activeRequest.raw._fp.close()
self.requestStates = []
def handleParameterUpdate(self, event):
"""
Handle a property update
"""
# Prepare request state and send initial request
state = HTTPRequestState(self, event.parameters, event.traceids)
# Keep track of the requests
with self.requestStateMutex:
self.requestStates.append(state)
# Start request chain
Thread(target=self.handleRequest, daemon=True, args=(state, )).start()