Repository
Munin (contrib)
Last change
2018-08-02
Graph Categories
Family
manual
Capabilities
Keywords
Language
Python (2.x)
License
GPL-2.0-only
Authors

celery_tasks_states

Name

celery_tasks_states - Munin plugin to monitor the number of Celery tasks in each state.

Requirements

- Python
- celery (http://celeryproject.org/)
- celerymon (http://github.com/ask/celerymon)

Note: don’t forget to enable sending of the events on the celery daemon - run it with the –events option

Configuration

Default configuration:

[celery_tasks_states]
       env.api_url http://localhost:8989
       env.workers all

If workers variable is not set or set to “all”, task number for all the workers is monitored.

You can optionally set the workers variable to the string of hostnames you want to monitor separated by a comma.

For example:

[celery_tasks]
       env.workers localhost,foo.bar.net,bar.foo.net

This would only monitor the number of tasks for the workers with the hostnames “localhost”, “foo.bar.net” and “bar.foo.net”

Magic Markers

#%# family=manual
#%# capabilities=autoconf

Author

Tomaz Muraus (http://github.com/Kami/munin-celery)

License

GPLv2

#!/usr/bin/env python
"""=cut
=head1 NAME

celery_tasks_states - Munin plugin to monitor the number of Celery tasks in each state.

=head1 REQUIREMENTS

 - Python
 - celery (http://celeryproject.org/)
 - celerymon (http://github.com/ask/celerymon)

Note: don't forget to enable sending of the events on the celery daemon - run it with the --events option

=head1 CONFIGURATION

Default configuration:

  [celery_tasks_states]
	 env.api_url http://localhost:8989
	 env.workers all

If workers variable is not set or set to "all", task number for all the workers is monitored.

You can optionally set the workers variable to the string of hostnames you want to monitor separated by a comma.

For example:

  [celery_tasks]
	 env.workers localhost,foo.bar.net,bar.foo.net

This would only monitor the number of tasks for the workers with the hostnames "localhost", "foo.bar.net" and "bar.foo.net"

=head1 MAGIC MARKERS

  #%# family=manual
  #%# capabilities=autoconf

=head1 AUTHOR

Tomaz Muraus (http://github.com/Kami/munin-celery)

=head1 LICENSE

GPLv2

=cut"""

import os
import sys
import urllib

try:
	import json
except:
	import simplejson as json

API_URL = 'http://localhost:8989'
URL_ENDPOINTS = {
		'workers': '/api/worker/',
		'worker_tasks': '/api/worker/%s/tasks',
		'tasks': '/api/task/',
		'task_names': '/api/task/name/',
		'task_details': '/api/task/name/%s',
}
TASK_STATES = (
			'PENDING',
			'RECEIVED',
			'STARTED',
			'SUCCESS',
			'FAILURE',
			'REVOKED',
			'RETRY'
)

def get_data(what, api_url, *args):
	try:
		request = urllib.urlopen('%s%s' % (api_url, \
										   URL_ENDPOINTS[what] % (args)))
		response = request.read()
		return json.loads(response)
	except IOError:
		print 'Could not connect to the celerymon webserver'
		sys.exit(-1)

def check_web_server_status(api_url):
	try:
		request = urllib.urlopen(api_url)
		response = request.read()
	except IOError:
		print 'Could not connect to the celerymon webserver'
		sys.exit(-1)

def clean_state_name(state_name):
	return state_name.lower()

# Config
def print_config(workers = None):
	if workers:
		print 'graph_title Celery tasks in each state [workers = %s]' % (', ' . join(workers))
	else:
		print 'graph_title Celery tasks in each state'
	print 'graph_args --lower-limit 0'
	print 'graph_scale no'
	print 'graph_vlabel tasks per ${graph_period}'
	print 'graph_category cloud'

	for name in TASK_STATES:
		name = clean_state_name(name)
		print '%s.label %s' % (name, name)
		print '%s.type DERIVE' % (name)
		print '%s.min 0' % (name)
		print '%s.info number of %s tasks' % (name, name)

# Values
def print_values(workers = None, api_url = None):
	data = get_data('tasks', api_url)

	counters = dict([(key, 0) for key in TASK_STATES])
	for task_name, task_data in data:
		state = task_data['state']
		hostname = task_data['worker']['hostname']

		if workers and hostname not in workers:
			continue

		counters[state] += 1

	for name in TASK_STATES:
		name_cleaned = clean_state_name(name)
		value = counters[name]
		print '%s.value %d' % (name_cleaned, value)

if __name__ == '__main__':
	workers = os.environ.get('workers', 'all')
	api_url = os.environ.get('api_url', API_URL)

	check_web_server_status(api_url)

	if workers in [None, '', 'all']:
		workers = None
	else:
		workers = workers.split(',')

	if len(sys.argv) > 1:
		if sys.argv[1] == 'config':
			print_config(workers)
		elif sys.argv[1] == 'autoconf':
			print 'yes'
	else:
		print_values(workers, api_url)