sprockets-statsd

Asynchronously send metrics to a statsd instance.


This library provides connectors to send metrics to a statsd instance using either TCP or UDP.

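import asyncio
import os
import time

import sprockets_statsd.statsd


async def do_stuff(statsd):
    start = time.monotonic()  # monotonic clock is immune to wall-clock jumps
    response = make_some_http_call()  # placeholder for your own code
    # timing() adds the "timers." prefix itself, so leave it off the path
    statsd.timing(f'http.something.{response.code}',
                  time.monotonic() - start)


async def main():
    # Create the connector inside the running event loop so that its
    # internal asyncio objects are bound to the correct loop.
    statsd = sprockets_statsd.statsd.Connector(
        host=os.environ.get('STATSD_HOST', '127.0.0.1'))
    await statsd.start()  # starts the background sender task
    try:
        await do_stuff(statsd)
    finally:
        await statsd.stop()  # flushes queued metrics before returning


if __name__ == '__main__':
    asyncio.run(main())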

The Connector instance maintains a resilient connection to the target StatsD instance, formats the metric data into payloads, and sends them to the StatsD target. It defaults to using TCP as the transport but will use UDP if the ip_protocol keyword is set to socket.IPPROTO_UDP. The Connector.start method starts a background asyncio.Task that is responsible for maintaining the connection. The timing method enqueues a timing metric to send, and the task consumes the internal queue whenever it is connected.
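The enqueue-then-send design described above can be sketched with plain asyncio. This is not the library's implementation: QueueingSender, its send callback, and the payload layout are hypothetical stand-ins used only to show why timing() can be called without await while a background task does the I/O.

```python
import asyncio
from typing import Awaitable, Callable, List


class QueueingSender:
    """Hypothetical sketch: synchronous enqueue, asynchronous delivery."""

    def __init__(self, send: Callable[[bytes], Awaitable[None]]) -> None:
        self._send = send
        self._queue: 'asyncio.Queue[bytes]' = asyncio.Queue()
        self._task = None

    async def start(self) -> None:
        # The background task owns all network I/O.
        self._task = asyncio.ensure_future(self._consume())

    def timing(self, path: str, seconds: float) -> None:
        # Not a coroutine: callers never wait on the network.
        self._queue.put_nowait(f'timers.{path}:{seconds * 1000.0}|ms'.encode())

    async def _consume(self) -> None:
        while True:
            payload = await self._queue.get()
            try:
                await self._send(payload)
            finally:
                self._queue.task_done()

    async def stop(self) -> None:
        await self._queue.join()  # wait until every payload has been sent
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass


async def demo() -> List[bytes]:
    sent: List[bytes] = []

    async def fake_send(payload: bytes) -> None:
        sent.append(payload)  # stand-in for a socket write

    sender = QueueingSender(fake_send)
    await sender.start()
    sender.timing('http.something.200', 0.25)
    sender.timing('http.something.500', 1.5)
    await sender.stop()
    return sent


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

Keeping the producer side synchronous means a slow or unreachable StatsD server never adds latency to the code being measured; the cost is that metrics sit in memory until the task can deliver them.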

The following convenience methods are available. You can also call inject_metric for complete control over the payload.

incr

Increment a counter metric

decr

Decrement a counter metric

gauge

Adjust or set a gauge metric

timing

Append a duration to a timer metric

Tornado helpers

The sprockets_statsd.tornado module contains mix-in classes that make reporting metrics from your Tornado web application simple. You will need to install the sprockets_statsd[tornado] extra to ensure that the Tornado requirements for this library are met.

import asyncio
import logging
import os

from tornado import ioloop, web

import sprockets_statsd.tornado


class MyHandler(sprockets_statsd.tornado.RequestHandler,
                web.RequestHandler):
    async def get(self):
        with self.execution_timer('some-operation'):
            await self.do_something()
        self.set_status(204)

    async def do_something(self):
        await asyncio.sleep(1)


class Application(sprockets_statsd.tornado.Application, web.Application):
    def __init__(self, **settings):
        settings['statsd'] = {
            'host': os.environ['STATSD_HOST'],
            'prefix': 'applications.my-service',
        }
        super().__init__([web.url('/', MyHandler)], **settings)

    async def on_start(self):
        await self.start_statsd()

    async def on_stop(self):
        await self.stop_statsd()


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    app = Application()
    app.listen(8888)
    iol = ioloop.IOLoop.current()
    try:
        iol.add_callback(app.on_start)
        iol.start()
    except KeyboardInterrupt:
        iol.add_future(asyncio.ensure_future(app.on_stop()),
                       lambda f: iol.stop())
        iol.start()

This application will emit two timing metrics each time that the endpoint is invoked:

applications.my-service.timers.some-operation:1001.3449192047119|ms
applications.my-service.timers.MyHandler.GET.204:1002.4960041046143|ms
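Each line above follows the plain-text StatsD layout <path>:<value>|<type code>. When checking what an application emits (for example in a test that captures raw payloads), a small parser like the hypothetical parse_metric helper below can split a line back into its parts. It is a sketch and ignores StatsD extensions such as sample rates.

```python
from typing import NamedTuple


class Metric(NamedTuple):
    path: str
    value: float
    type_code: str


def parse_metric(line: str) -> Metric:
    """Split '<path>:<value>|<type code>' into its components."""
    path, _, rest = line.strip().rpartition(':')
    value, _, type_code = rest.partition('|')
    if not path or not type_code:
        raise ValueError(f'not a statsd metric line: {line!r}')
    return Metric(path, float(value), type_code)


if __name__ == '__main__':
    m = parse_metric(
        'applications.my-service.timers.MyHandler.GET.204:1002.4960041046143|ms')
    print(m.path, m.value, m.type_code)
```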

You will need to set the $STATSD_HOST environment variable to enable the statsd processing inside of the application. The RequestHandler class exposes methods that send counter and timing metrics to a statsd server. The connection is managed by the Application provided that you call the start_statsd method during application startup.

Metrics are sent by an asyncio.Task that is started by start_statsd. The request handler methods insert the metric data onto an asyncio.Queue that the task reads from. Metric data remains on the queue when the task is not connected to the server and will be sent in the order received when the task establishes the server connection.

Integration with sprockets.http

If you use sprockets.http in your application stack, then the Tornado integration will detect it and install the initialization and shutdown hooks for you. The application will work as long as the $STATSD_HOST and $STATSD_PREFIX environment variables are set appropriately. The following snippet produces metrics equivalent to the Tornado example without setting the prefix at all; the prefix is derived from the service and environment settings instead:

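import importlib.metadata
import os

import sprockets.http
import sprockets.http.app
from tornado import web

import sprockets_statsd.tornado


# MyHandler is the request handler defined in the Tornado example above.
class Application(sprockets_statsd.tornado.Application,
                  sprockets.http.app.Application):
    def __init__(self, **settings):
        statsd = settings.setdefault('statsd', {})
        statsd.setdefault('host', os.environ['STATSD_HOST'])
        statsd.setdefault('protocol', 'tcp')
        settings.update({
            'service': 'my-service',
            'environment': os.environ.get('ENVIRONMENT', 'development'),
            'statsd': statsd,
            # Assumes the installed distribution is named "my-service".
            'version': importlib.metadata.version('my-service'),
        })
        super().__init__([web.url('/', MyHandler)], **settings)


if __name__ == '__main__':
    sprockets.http.run(Application, log_config=...)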

Defining the service and environment in settings as above will result in the prefix being set to:

applications.{self.settings["service"]}.{self.settings["environment"]}

The recommended usage is to:

  1. define service, environment, and version in the settings

  2. explicitly set the host and protocol settings in self.settings["statsd"]

Tornado configuration

The Tornado statsd connection is configured by the statsd application settings key. The default values can be set by the following environment variables.

STATSD_HOST

The host or IP address of the StatsD server to send metrics to.

STATSD_PORT

The port number (TCP or UDP) that the StatsD server is listening on. This defaults to 8125 if it is not configured.

STATSD_PREFIX

Optional prefix to use for metric paths. See the documentation for Application for additional notes on setting the path prefix when using the Tornado helpers.

STATSD_PROTOCOL

The IP protocol to use when connecting to the StatsD server. You can specify either “tcp” or “udp”. The default is “tcp” if it is not configured.

STATSD_ENABLED

Define this variable and set it to a falsy value to disable the Tornado integration. If you omit this variable, then the connector is enabled. The following values are considered truthy:

  • non-zero integer

  • case-insensitive match of yes, true, t, or on

All other values are considered falsy. You only need to define this environment variable when you want to explicitly disable an otherwise installed and configured connection.
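The truthiness rule above can be expressed as a short function. This is a sketch that follows the documented rule, not the library's own code; parse_enabled and statsd_enabled are hypothetical names, and treating an unset variable as enabled mirrors the "If you omit this variable" sentence.

```python
import os
from typing import Mapping, Optional


def parse_enabled(value: Optional[str]) -> bool:
    """Apply the documented STATSD_ENABLED truthiness rule."""
    if value is None:
        return True  # variable omitted: the connector stays enabled
    try:
        return int(value) != 0  # any non-zero integer is truthy
    except ValueError:
        # case-insensitive match of yes, true, t, or on; all else is falsy
        return value.lower() in {'yes', 'true', 't', 'on'}


def statsd_enabled(environ: Mapping[str, str] = os.environ) -> bool:
    return parse_enabled(environ.get('STATSD_ENABLED'))
```

Note that defining the variable as an empty string disables the connector, because an empty string is neither a non-zero integer nor one of the listed words.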

If you are using the Tornado helper classes, then you can fine tune the metric payloads and the connector by setting additional values in the statsd key of tornado.web.Application.settings. See the sprockets_statsd.tornado.Application class documentation for a description of the supported settings.

Reference

class sprockets_statsd.statsd.Connector(host, port=8125, *, prefix='', **kwargs)

Sends metrics to a statsd server.

Parameters
  • host (str) – statsd server to send metrics to

  • port (int) – socket port that the server is listening on

  • ip_protocol – IP protocol to use for the underlying socket – either socket.IPPROTO_TCP for TCP or socket.IPPROTO_UDP for UDP sockets.

  • prefix – optional string to prepend to metric paths

  • kwargs (Any) – additional keyword parameters are passed to the Processor initializer

This class maintains a connection to a statsd server and sends metric lines to it asynchronously. You must call the start() method when your application is starting. It creates a Task that manages the connection to the statsd server. You must also call stop() before terminating to ensure that all metrics are flushed to the statsd server.

Metrics are optionally prefixed with prefix before the metric type prefix. This should be used to prevent metrics from being overwritten when multiple applications share a StatsD instance. Each metric type is also prefixed by one of the following strings based on the metric type:

Method call   Prefix       Type code
incr()        counters.    c
decr()        counters.    c
gauge()       gauges.      g
timing()      timers.      ms
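The table can be read as a formatting rule: optional prefix, then the type prefix, then the caller's path, followed by :<value>|<type code>. The sketch below assumes that timing() converts seconds to milliseconds (consistent with the ms type code and with the roughly 1000 ms timers emitted for a one-second sleep in the Tornado example) and that decr() negates the value as described under AbstractConnector.decr; format_metric and TYPE_PREFIXES are hypothetical names, not part of the library.

```python
from typing import Optional, Union

# Method name -> (path prefix, type code), transcribed from the table above.
TYPE_PREFIXES = {
    'incr': ('counters.', 'c'),
    'decr': ('counters.', 'c'),
    'gauge': ('gauges.', 'g'),
    'timing': ('timers.', 'ms'),
}


def format_metric(method: str, path: str, value: Union[int, float],
                  prefix: Optional[str] = None) -> str:
    type_prefix, type_code = TYPE_PREFIXES[method]
    if method == 'decr':
        value = -value          # decr(path, n) behaves like incr(path, -n)
    elif method == 'timing':
        value = value * 1000.0  # seconds in, milliseconds on the wire
    full_path = type_prefix + path
    if prefix:
        full_path = f'{prefix}.{full_path}'
    return f'{full_path}:{value}|{type_code}'
```

For example, format_metric('timing', 'some-operation', 1.5, prefix='applications.my-service') yields applications.my-service.timers.some-operation:1500.0|ms.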

Metric payloads are sent by calling the inject_metric() method. The payloads are stored in an internal queue that is consumed whenever the connection to the server is active.

prefix: str

String to prefix to all metrics before the metric type prefix.

processor: Processor

The statsd processor that maintains the connection and sends the metric payloads.

inject_metric(path, value, type_code)

Send a metric to the statsd server.

Parameters
  • path (str) – formatted metric name

  • value (str) – formatted metric value

  • type_code (str) – type of the metric to send

This method formats the payload and inserts it on the internal queue for future processing.

Return type

None

async start()

Start the processor in the background.

This is a blocking method and does not return until the processor task is actually running.

Return type

None

async stop()

Stop the background processor.

Items that are currently in the queue will be flushed to the statsd server if possible. This is a blocking method and does not return until the background processor has stopped.

Return type

None

Tornado helpers

class sprockets_statsd.tornado.Application(*args, **settings)

Mix this into your application to add a statsd connection.

statsd_connector: sprockets_statsd.statsd.AbstractConnector

Connection to the StatsD server that is set between calls to start_statsd() and stop_statsd().

This mix-in is configured by the statsd settings key. The value is a dictionary with the following keys.

enabled

should the statsd connector be enabled?

host

the statsd host to send metrics to

port

port number that statsd is listening on

prefix

segment to prefix to metrics

protocol

“tcp” or “udp”

reconnect_timeout

number of seconds to sleep after a statsd connection attempt fails

wait_timeout

number of seconds to wait for a metric to arrive on the queue before verifying the connection

enabled defaults to the STATSD_ENABLED environment variable coerced to a bool. If this variable is not set, then the statsd connector WILL BE enabled. Set this to a falsy value to disable the connector. The following values are considered truthy: a non-zero integer or a case-insensitive match of “on”, “t”, “true”, or “yes”. All other values are considered falsy.

host defaults to the STATSD_HOST environment variable. If this value is not set, then the statsd connector WILL NOT be enabled.

port defaults to the STATSD_PORT environment variable with a fallback default of 8125 if the environment variable is not set.

prefix is prefixed to all metric paths. This provides a namespace for metrics so that each application's metrics are maintained in separate buckets. The default is to use the STATSD_PREFIX environment variable. If it is unset and the service and environment keys are set in settings, then the default is applications.<service>.<environment>. This is a convenient way to maintain consistent metric paths when you are managing a large number of services.

Warning

If you want to run without a prefix, then you are required to explicitly set statsd.prefix to None. This prevents accidentally polluting the metric namespace with unqualified paths.
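The prefix rules above can be summarized as a small resolution function. This is a sketch of the documented precedence only: the name resolve_prefix is hypothetical, treating an empty STATSD_PREFIX as unset is a choice made here, and raising RuntimeError when no prefix can be determined is an assumption standing in for whatever error the library reports when the prefix is neither set nor explicitly None.

```python
import os
from typing import Any, Mapping, Optional


def resolve_prefix(settings: Mapping[str, Any],
                   environ: Mapping[str, str] = os.environ) -> Optional[str]:
    statsd_settings = settings.get('statsd', {})
    if 'prefix' in statsd_settings:
        # An explicit value wins, including None for "no prefix".
        return statsd_settings['prefix']
    env_prefix = environ.get('STATSD_PREFIX')
    if env_prefix:  # unset or empty: fall through to the derived prefix
        return env_prefix
    if 'service' in settings and 'environment' in settings:
        return f"applications.{settings['service']}.{settings['environment']}"
    # Assumption: refuse to run with an unqualified metric namespace.
    raise RuntimeError('statsd prefix is not configured; set STATSD_PREFIX,'
                       ' settings["statsd"]["prefix"], or both "service"'
                       ' and "environment"')
```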

protocol defaults to the STATSD_PROTOCOL environment variable with a fallback default of “tcp” if the environment variable is not set.

reconnect_timeout defaults to 1.0 seconds which limits the aggressiveness of creating new TCP connections.

wait_timeout defaults to 0.1 seconds which ensures that the processor quickly responds to connection faults.
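Taken together, the defaults above amount to a resolution order of explicit settings first, then environment variables, then built-in fallbacks. The sketch below uses a hypothetical resolve_statsd_settings helper and maps the protocol string onto the socket.IPPROTO_TCP / socket.IPPROTO_UDP constants that Connector accepts as ip_protocol; it leaves out enabled and prefix, which follow the rules described earlier.

```python
import os
import socket
from typing import Any, Dict, Mapping

_PROTOCOLS = {'tcp': socket.IPPROTO_TCP, 'udp': socket.IPPROTO_UDP}


def resolve_statsd_settings(statsd: Mapping[str, Any],
                            environ: Mapping[str, str] = os.environ
                            ) -> Dict[str, Any]:
    resolved = {
        'host': statsd.get('host', environ.get('STATSD_HOST')),
        'port': int(statsd.get('port', environ.get('STATSD_PORT', '8125'))),
        'protocol': statsd.get('protocol',
                               environ.get('STATSD_PROTOCOL', 'tcp')),
        'reconnect_timeout': float(statsd.get('reconnect_timeout', 1.0)),
        'wait_timeout': float(statsd.get('wait_timeout', 0.1)),
    }
    try:
        resolved['ip_protocol'] = _PROTOCOLS[resolved['protocol']]
    except KeyError:
        raise ValueError(f"protocol must be 'tcp' or 'udp', "
                         f"not {resolved['protocol']!r}") from None
    return resolved
```

A host of None here corresponds to the "WILL NOT be enabled" case described for host.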

async start_statsd(*_)

Start the connector during startup.

Call this method during application startup to enable the statsd connection. A new Connector instance will be created and started. This method does not return until the connector is running.

Return type

None

async stop_statsd(*_)

Stop the connector during shutdown.

If the connector was started, then this method will gracefully terminate it. The method does not return until after the connector is stopped.

Return type

None

class sprockets_statsd.tornado.RequestHandler(application, request, **kwargs)

Mix this into your handler to send metrics to a statsd server.

execution_timer(*path)

Record the execution duration of a block of code.

Parameters

path (Any) – path to record the duration as

Return type

Generator[None, None, None]
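A context manager with this shape can be written with contextlib.contextmanager, which is consistent with the Generator[None, None, None] return type listed above. The sketch records the elapsed time even when the block raises and joins the path components with dots; the record_timing callback parameter and the dot-joining are assumptions for illustration, not the library's code.

```python
import contextlib
import time
from typing import Any, Callable, Generator


@contextlib.contextmanager
def execution_timer(record_timing: Callable[[float, str], None],
                    *path: Any) -> Generator[None, None, None]:
    start = time.monotonic()
    try:
        yield
    finally:
        # Record the duration whether or not the block raised.
        record_timing(time.monotonic() - start, '.'.join(str(p) for p in path))


if __name__ == '__main__':
    recorded = []
    with execution_timer(lambda secs, p: recorded.append((p, secs)),
                         'db', 'query'):
        time.sleep(0.01)
    print(recorded)
```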

increase_counter(*path, amount=1)

Adjust a counter.

Parameters
  • path (Any) – path of the counter to adjust

  • amount (int) – amount to adjust the counter by. Defaults to 1 and can be negative

Return type

None

on_finish()

Extended to record the request time as a duration.

This method extends tornado.web.RequestHandler.on_finish() to record self.request.request_time as a timing metric.

Return type

None

record_timing(secs, *path)

Record the duration.

Parameters
  • secs (float) – number of seconds to record

  • path (Any) – path to record the duration under

Return type

None

Internals

class sprockets_statsd.statsd.AbstractConnector

StatsD connector that does not send metrics or connect.

Use this connector when you want to maintain the application interface without doing any real work.

decr(path, value=1)

Decrement a counter metric.

Parameters
  • path (str) – counter to decrement

  • value (int) – amount to decrement the counter by

This is equivalent to self.incr(path, -value).

Return type

None

gauge(path, value, delta=False)

Manipulate a gauge metric.

Parameters
  • path (str) – gauge to adjust

  • value (int) – value to send

  • delta (bool) – is this an adjustment of the gauge?

If the delta parameter is False (or omitted), then value is the new value to set the gauge to. Otherwise, value is an adjustment for the current gauge.

Return type

None
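In the StatsD line protocol a gauge value with a leading sign is treated as an adjustment, while an unsigned value replaces the gauge. The sketch below shows one way to encode the delta flag under that convention; gauge_payload is hypothetical, the library's exact formatting is not confirmed here, and rejecting negative absolute values is a design choice of this sketch.

```python
def gauge_payload(path: str, value: int, delta: bool = False) -> str:
    if delta:
        # Always emit a sign so the server applies an adjustment: +5 or -3.
        return f'gauges.{path}:{value:+d}|g'
    if value < 0:
        # A bare "-3" would be read as a decrement, not as "set to -3".
        raise ValueError('cannot set a gauge to a negative value in one line')
    return f'gauges.{path}:{value}|g'
```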

incr(path, value=1)

Increment a counter metric.

Parameters
  • path (str) – counter to increment

  • value (int) – amount to increment the counter by

Return type

None

timing(path, seconds)

Send a timer metric.

Parameters
  • path (str) – timer to append a value to

  • seconds (float) – number of seconds to record

Return type

None

class sprockets_statsd.statsd.Processor(*, host, port=8125, ip_protocol=6, max_queue_size=1000, reconnect_sleep=1.0, wait_timeout=0.1)

Maintains the statsd connection and sends metric payloads.

Parameters
  • host (str) – statsd server to send metrics to

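  • port (int) – TCP or UDP port that the server is listening on

  • ip_protocol (int) – IP protocol to use for the underlying socket: socket.IPPROTO_TCP (the default, 6) or socket.IPPROTO_UDP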

  • max_queue_size (int) – only allow this many elements to be stored in the queue before discarding metrics

  • reconnect_sleep (float) – number of seconds to sleep after socket error occurs when connecting

  • wait_timeout (float) – number of seconds to wait for a message to arrive on the queue

This class implements Protocol for the statsd TCP connection. The run() method is run as a background Task that consumes payloads from an internal queue, connects to the TCP server as required, and sends the already formatted payloads.
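The max_queue_size behavior described above (discarding metrics once the queue is full) can be illustrated with a bounded asyncio.Queue and put_nowait(). This sketch drops the newest payload and counts the drops; whether the library discards the newest or the oldest payload is not stated here, so treat that choice, and the DroppingQueue name, as assumptions.

```python
import asyncio


class DroppingQueue:
    """Hypothetical bounded buffer that discards payloads when full."""

    def __init__(self, max_queue_size: int = 1000) -> None:
        self.queue: 'asyncio.Queue[bytes]' = asyncio.Queue(maxsize=max_queue_size)
        self.dropped = 0

    def enqueue(self, payload: bytes) -> bool:
        try:
            self.queue.put_nowait(payload)
        except asyncio.QueueFull:
            self.dropped += 1  # never block the caller; just lose the metric
            return False
        return True
```

Bounding the queue keeps memory use predictable during a long outage, at the cost of losing metrics once the limit is reached.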

host: str

IP address or DNS name for the statsd server to send metrics to

port: int

Port number that the statsd server is listening on

should_terminate: bool

Flag that controls whether the background task is active or not. This flag is set to False when the task is started. Setting it to True will cause the task to shut down in an orderly fashion.

queue: asyncio.Queue

Formatted metric payloads to send to the statsd server. Enqueue payloads to send them to the server.

running: asyncio.Event

Is the background task currently running? This is the event that run() sets when it starts and it remains set until the task exits.

stopped: asyncio.Event

Is the background task currently stopped? This is the event that run() sets when it exits and that stop() blocks on until the task stops.
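The three attributes above suggest a loop shape like the one sketched here: run() sets running, waits at most wait_timeout seconds for each payload so that it notices should_terminate promptly, drains the queue on the way out, and sets stopped when it exits. Everything here is a simplified, hypothetical MiniProcessor: it has no connection handling, and send is a plain callback.

```python
import asyncio
from typing import Callable, List


class MiniProcessor:
    def __init__(self, send: Callable[[bytes], None],
                 wait_timeout: float = 0.1) -> None:
        self.send = send
        self.wait_timeout = wait_timeout
        self.should_terminate = False
        self.queue: 'asyncio.Queue[bytes]' = asyncio.Queue()
        self.running = asyncio.Event()
        self.stopped = asyncio.Event()

    async def run(self) -> None:
        self.should_terminate = False
        self.stopped.clear()
        self.running.set()
        try:
            while not self.should_terminate:
                try:
                    payload = await asyncio.wait_for(self.queue.get(),
                                                     self.wait_timeout)
                except asyncio.TimeoutError:
                    continue  # wake up to re-check should_terminate
                self.send(payload)
            while not self.queue.empty():  # flush what is left
                self.send(self.queue.get_nowait())
        finally:
            self.running.clear()
            self.stopped.set()

    async def stop(self) -> None:
        self.should_terminate = True
        await self.stopped.wait()


async def demo() -> List[bytes]:
    sent: List[bytes] = []
    processor = MiniProcessor(sent.append, wait_timeout=0.05)
    task = asyncio.ensure_future(processor.run())
    await processor.running.wait()
    for n in range(3):
        processor.queue.put_nowait(f'counters.hits:{n}|c'.encode())
    await processor.stop()
    await task
    return sent
```

The short wait_timeout is what keeps stop() responsive: without it, an idle task would block in queue.get() indefinitely and never re-check the flag.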

property connected

Is the processor connected?

Return type

bool

async run()

Maintains the connection and processes metric payloads.

Return type

None

async stop()

Stop the processor.

This is an asynchronous but blocking method. It does not return until enqueued metrics are flushed and the processor connection is closed.

Return type

None

class sprockets_statsd.statsd.StatsdProtocol

Common interface for backend protocols/transports.

UDP and TCP transports have different interfaces (sendto vs write) so this class adapts them to a common protocol that our code can depend on.

buffered_data: bytes

Bytes that are buffered due to low-level transport failures. Since protocols & transports are created anew with each connect attempt, the Processor instance ensures that data buffered on a transport is copied over to the new transport when creating a connection.

connected: asyncio.Event

Is the protocol currently connected?

connection_lost(exc)

Clear the connected event.

Return type

None

connection_made(transport)

Capture the new transport and set the connected event.

Return type

None

send(metric)

Send a metric payload over the transport.

Return type

None

async shutdown()

Shut down the transport and wait for it to close.

Return type

None

class sprockets_statsd.statsd.TCPProtocol

StatsdProtocol implementation over a TCP/IP connection.

eof_received()

Called when the other end calls write_eof() or equivalent.

If this returns a false value (including None), the transport will close itself. If it returns a true value, closing the transport is up to the protocol.

Return type

None

send(metric)

Send metric to the server.

If sending the metric fails, it will be saved in self.buffered_data. The processor will save and restore the buffered data if it needs to create a new protocol object.

Return type

None

async shutdown()

Close the transport after flushing any outstanding data.

Return type

None
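The StatsdProtocol and TCPProtocol descriptions above can be illustrated with the standard asyncio.Protocol and asyncio.DatagramProtocol base classes: both subclasses expose the same send() method, but one calls the transport's write() and the other its sendto(). The class names, the newline delimiter for TCP lines, and the buffering rule (keep bytes that could not be handed to the transport in buffered_data) are a simplified reading of the description, not the library's source.

```python
import asyncio
from typing import Optional


class BaseStatsdProtocol:
    """Hypothetical common interface over TCP and UDP transports."""

    def __init__(self) -> None:
        self.buffered_data = b''
        self.connected = asyncio.Event()
        self.transport: Optional[asyncio.BaseTransport] = None

    def connection_made(self, transport) -> None:
        self.transport = transport
        self.connected.set()

    def connection_lost(self, exc) -> None:
        self.connected.clear()

    def send(self, metric: bytes) -> None:
        raise NotImplementedError


class SketchTCPProtocol(BaseStatsdProtocol, asyncio.Protocol):
    def send(self, metric: bytes) -> None:
        data = self.buffered_data + metric + b'\n'  # TCP lines need a delimiter
        try:
            self.transport.write(data)
            self.buffered_data = b''
        except Exception:
            self.buffered_data = data  # kept for the next connection


class SketchUDPProtocol(BaseStatsdProtocol, asyncio.DatagramProtocol):
    def send(self, metric: bytes) -> None:
        self.transport.sendto(metric)  # one metric per datagram
```

Because a new protocol object is created for each connection attempt, whatever owns the reconnect loop (the Processor, per the description above) has to copy buffered_data from the old instance to the new one.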

Release history

0.1.0 (10-May-2021)

  • Added STATSD_ENABLED environment variable to disable the Tornado integration

  • Tornado application mixin automatically installs start/stop hooks if the application quacks like a sprockets.http.app.Application.

  • Limit logging when disconnected from statsd

0.0.1 (08-Apr-2021)

  • Simple support for sending counters & timers to statsd over a TCP or UDP socket