Browse Source

Initial import

Jan-Piet Mens 12 years ago
commit
58bc8585f6
3 changed files with 256 additions and 0 deletions
  1. 46 0
      README.md
  2. 42 0
      map
  3. 168 0
      mqtt2graphite.py

+ 46 - 0
README.md

@@ -0,0 +1,46 @@
+# mqtt2graphite
+
+This program subscribes to any number of MQTT topics, extracts a value from the
+messages' payload and sends that off to [Graphite][1] via [Carbon][2] over a UDP
+socket. 
+
+Values in the payload can be simple numbers (`92`, `12.7`) or [JSON][3] strings.
+In the latter case, all JSON names/keys are extracted and if their values are 
+numeric, these are then sent off to Carbon (see example below)
+
+## Requirements
+
+* [mosquitto.py](http://mosquitto.org/documentation/python/)
+* A running Carbon/Graphite server with UDP-enabled reception
+* Access to an MQTT broker (I use [Mosquitto](http://mosquitto.org/))
+
+## Running
+
+* Set the environment variable `MQTT_HOST` to the name/IP of your MQTT broker. (`localhost` is default.)
+* Edit the `map` file
+* Run `./mqtt2graphite.py`
+
+## Handling JSON payloads
+
+```
+mosquitto_pub  -t test/jp/j2 -m '{ "size":69,"temp": 89.3, "gas": " 88", "name": "JP Mens" }'
+```
+
+produces the following Carbon keys
+
+```
+test.jp.j2.gas 88.000000 1363169282
+test.jp.j2.temp 89.300000 1363169282
+test.jp.j2.size 69.000000 1363169282
+```
+
+## Todo
+
+A lot. 
+
+* I'm not experienced enough with high volume of messages, so this should maybe
+  transmit to Carbon via StatsD?
+
+  [1]: http://graphite.wikidot.com/
+  [2]: http://graphite.wikidot.com/getting-your-data-into-graphite
+  [3]: http://json.org

+ 42 - 0
map

@@ -0,0 +1,42 @@
+#(@)mqtt2graphite map file
+
+# Each line has two OR three values in it. Values MUST no contain white
+# space. 
+
+# First value:
+#	n 	payload contains a number (int or float) use that
+#	j	payload is JSON. Extract all keys with numeric values
+
+# Second value:
+#	Subscribe to this channel. MQTT wildcards (#) are allowed.
+
+# Third (optional) value:
+#	The MQTT topic is to be mapped to this key in Carbon (Graphite)
+#	Use periods as separators, just like Carbon expects them. If 
+#	this value is not specified, the MQTT topic name will be used,
+#	with slashes (/) converted to dots (.)
+
+n	test/mosquitto/messages/load/received
+n	test/mosquitto/messages/load/#
+
+n	test/jp/j1
+
+n	$SYS/broker/load/messages/received/1min		test.mosquitto.messages.load.received
+n	$SYS/broker/load/messages/sent/1min		test.mosquitto.messages.load.sent
+
+j	test/jp/j2
+
+j	test/jp/j3					test.jp.json
+
+# The last line above means, subscribe to the MQTT topic of "test/jp/jp3",
+# extract JSON, and translate the topic to a the "test.jp.json.___" key.
+# Submitting an MQTT message payload of
+#
+#	{ "size":69,"temp": 89.3, "gas": " 88", "name": "JP Mens" }
+#
+# will produce the following Carbon entries:
+#
+#	test.jp.json.gas 88.000000 1363169729
+#	test.jp.json.temp 89.300000 1363169729
+#	test.jp.json.size 69.000000 1363169729
+

+ 168 - 0
mqtt2graphite.py

@@ -0,0 +1,168 @@
+#!/usr/bin/env python2.7
+
+__author__ = "Jan-Piet Mens"
+__copyright__ = "Copyright (C) 2013 by Jan-Piet Mens"
+
+import mosquitto
+import ssl
+import os, sys
+import logging
+import time
+import socket
+import json
+import signal
+
+MQTT_HOST = os.environ.get('MQTT_HOST', 'localhost')
+CARBON_SERVER = '127.0.0.1'
+CARBON_PORT = 2003
+
+LOGFORMAT = '%(asctime)-15s %(message)s'
+
+DEBUG = 1
+if DEBUG:
+    logging.basicConfig(level=logging.DEBUG, format=LOGFORMAT)
+else:
+    logging.basicConfig(level=logging.INFO, format=LOGFORMAT)
+
+client_id = "MQTT2Graphite_%d-%s" % (os.getpid(), socket.getfqdn())
+
+def cleanup(signum, frame):
+    '''Disconnect cleanly on SIGTERM or SIGINT'''
+
+    mqttc.publish("/clients/" + client_id, "Offline")
+    mqttc.disconnect()
+    logging.info("Disconnected from broker; exiting on signal %d", signum)
+    sys.exit(signum)
+
+
+def is_number(s):
+    '''Test whether string contains a number (leading/traling white-space is ok)'''
+
+    try:
+        float(s)
+        return True
+    except ValueError:
+        return False
+
+
+def on_connect(mosq, userdata, rc):
+    logging.info("Connected to broker at %s as %s" % (MQTT_HOST, client_id))
+
+    mqttc.publish("/clients/" + client_id, "Online")
+
+    map = userdata['map']
+    for topic in map:
+        logging.debug("Subscribing to topic %s" % topic)
+        mqttc.subscribe(topic, 0)
+
+def on_message(mosq, userdata, msg):
+
+    sock = userdata['sock']
+    host = userdata['carbon_server']
+    port = userdata['carbon_port']
+    lines = []
+    now = int(time.time())
+
+    map = userdata['map']
+    # Find out how to handle the topic in this message: slurp through
+    # our map 
+    for t in map:
+        if mosquitto.topic_matches_sub(msg.topic, t):
+            # print "%s matches MAP(%s) => %s" % (msg.topic, t, map[t])
+
+            # Must we rename the received msg topic into a different
+            # name for Carbon? In any case, replace MQTT slashes (/)
+            # by Carbon periods (.)
+            (type, remap) = map[t]
+            if remap is None:
+                carbonkey = msg.topic.replace('/', '.')
+            else:
+                carbonkey = remap.replace('/', '.')
+            logging.debug("CARBONKEY is [%s]" % carbonkey)
+
+            if type == 'n':
+                '''Number: obtain a float from the payload'''
+                try:
+                    number = float(msg.payload)
+                    lines.append("%s %f %d" % (carbonkey, number, now))
+                except ValueError:
+                    return
+
+            elif type == 'j':
+                '''JSON: try and load the JSON string from payload and use
+                   subkeys to pass to Carbon'''
+                try:
+                    st = json.loads(msg.payload)
+                    for k in st:
+                        if is_number(st[k]):
+                            lines.append("%s.%s %f %d" % (carbonkey, k, float(st[k]), now))
+                except:
+                    return
+
+            else:
+                sys.stderr.write("Unknown mapping key [%s]\n", type)
+                sys.exit(2)
+
+            message = '\n'.join(lines) + '\n'
+            logging.debug("%s", message)
+
+            sock.sendto(message, (host, port))
+  
+def on_subscribe(mosq, userdata, mid, granted_qos):
+    pass
+
+def on_disconnect(mosq, userdata, rc):
+    if rc == 0:
+        logging.info("Clean disconnection")
+    else:
+        logging.info("Unexpected disconnect (rc %s); reconnecting in 5 seconds" % rc)
+        time.sleep(5)
+
+if __name__ == '__main__':
+    
+
+    logging.info("Starting %s" % client_id)
+    logging.info("INFO MODE")
+    logging.debug("DEBUG MODE")
+
+    map = {}
+    f = open('map')
+    for line in f.readlines():
+        line = line.rstrip()
+        if len(line) == 0 or line[0] == '#':
+            continue
+        remap = None
+        try:
+            type, topic, remap = line.split()
+        except ValueError:
+            type, topic = line.split()
+
+        map[topic] = (type, remap)
+
+    try:
+        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    except:
+        sys.stderr.write("Can't create UDP socket\n")
+        sys.exit(1)
+
+    userdata = {
+        'sock'      : sock,
+        'carbon_server' : CARBON_SERVER,
+        'carbon_port'   : CARBON_PORT,
+        'map'       : map,
+    }
+    mqttc = mosquitto.Mosquitto(client_id, clean_session=True, userdata=userdata)
+    mqttc.on_message = on_message
+    mqttc.on_connect = on_connect
+    mqttc.on_disconnect = on_disconnect
+    mqttc.on_subscribe = on_subscribe
+
+    mqttc.will_set("clients/" + client_id, payload="Adios!", qos=0, retain=False)
+
+    mqttc.connect(MQTT_HOST, 1883, 60)
+
+    signal.signal(signal.SIGTERM, cleanup)
+    signal.signal(signal.SIGINT, cleanup)
+
+    mqttc.loop_forever()
+