mqtt2graphite.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. #!/usr/bin/env python2.7
  2. __author__ = "Jan-Piet Mens"
  3. __copyright__ = "Copyright (C) 2013 by Jan-Piet Mens"
  4. import mosquitto
  5. import ssl
  6. import os, sys
  7. import logging
  8. import time
  9. import socket
  10. import json
  11. import signal
  # --- Runtime configuration ---------------------------------------------
  # The broker host can be overridden through the MQTT_HOST environment
  # variable; the Carbon endpoint is fixed here.
  MQTT_HOST = os.environ.get('MQTT_HOST', 'localhost')
  CARBON_SERVER = '127.0.0.1'
  CARBON_PORT = 2003

  LOGFORMAT = '%(asctime)-15s %(message)s'

  # A true value selects DEBUG-level logging, a false one INFO-level.
  DEBUG = 1
  logging.basicConfig(
      level=logging.DEBUG if DEBUG else logging.INFO,
      format=LOGFORMAT,
  )

  # MQTT client identifier built from this process' pid and the host's FQDN.
  client_id = "MQTT2Graphite_%d-%s" % (os.getpid(), socket.getfqdn())
  def cleanup(signum, frame):
      """Signal handler: announce "Offline", disconnect and exit.

      Uses the module-level ``mqttc`` client and ``client_id``. The
      process exits with the number of the received signal as its status.
      """
      status_topic = "/clients/" + client_id
      mqttc.publish(status_topic, "Offline")
      mqttc.disconnect()
      logging.info("Disconnected from broker; exiting on signal %d", signum)
      sys.exit(signum)
  def is_number(s):
      """Return True if *s* can be converted to a float, False otherwise.

      Strings may carry leading/trailing white-space. Values that float()
      rejects outright (None, lists, dicts, ...) or cannot represent (ints
      too large for a float) yield False instead of raising, so callers
      can use this as a plain predicate on arbitrary decoded JSON values.
      """
      try:
          float(s)
      except (TypeError, ValueError, OverflowError):
          return False
      return True
  def on_connect(mosq, userdata, rc):
      """Connection callback: announce presence and subscribe to mapped topics.

      mosq     -- client object handed to the callback; used for the
                  publish and subscribe calls
      userdata -- dict whose 'map' entry is keyed by the topics to
                  subscribe to
      rc       -- connection result code; 0 means the broker accepted the
                  connection, any other value is treated as a refusal

      On a refused connection nothing is published or subscribed; the
      refusal is only logged.
      """
      if rc != 0:
          logging.warning("Connection to broker at %s refused (rc %s)",
                          MQTT_HOST, rc)
          return

      logging.info("Connected to broker at %s as %s", MQTT_HOST, client_id)
      mosq.publish("/clients/" + client_id, "Online")

      # Subscriptions are (re)issued on every successful connect.
      for topic in userdata['map']:
          logging.debug("Subscribing to topic %s", topic)
          mosq.subscribe(topic, 0)
  def on_message(mosq, userdata, msg):
      """Message callback: turn an MQTT message into Carbon lines and send them.

      userdata supplies:
        'sock'          -- UDP socket used to reach Carbon
        'carbon_server' -- Carbon host
        'carbon_port'   -- Carbon port
        'map'           -- dict: subscription pattern -> (type, remap)

      Every map entry whose pattern matches msg.topic contributes lines:
        type 'n' -- payload is a single number:
                    "<key> <value> <timestamp>"
        type 'j' -- payload is a JSON object; each member whose value
                    converts to float gives
                    "<key>.<member> <value> <timestamp>"
      <key> is the remap name when one is configured, otherwise the
      message topic, with '/' replaced by '.'.

      An unparsable payload or an unknown type is logged and the whole
      message is dropped. Nothing is sent when no line was produced.
      """
      sock = userdata['sock']
      carbon_addr = (userdata['carbon_server'], userdata['carbon_port'])
      topic_map = userdata['map']

      now = int(time.time())
      lines = []

      for pattern in topic_map:
          # topic_matches_sub() takes the subscription pattern first and
          # the concrete topic second; with the arguments swapped a
          # wildcard pattern from the map can never match.
          if not mosquitto.topic_matches_sub(pattern, msg.topic):
              continue

          kind, remap = topic_map[pattern]

          # Carbon separates path components with periods, MQTT with slashes.
          source = msg.topic if remap is None else remap
          carbonkey = source.replace('/', '.')
          logging.debug("CARBONKEY is [%s]", carbonkey)

          if kind == 'n':
              # Number: obtain a float from the payload.
              try:
                  number = float(msg.payload)
              except (TypeError, ValueError):
                  logging.info("Topic %s contains non-numeric payload [%s]",
                               msg.topic, msg.payload)
                  return
              lines.append("%s %f %d" % (carbonkey, number, now))

          elif kind == 'j':
              # JSON: decode the payload and pass its numeric members on.
              try:
                  members = json.loads(msg.payload)
              except (TypeError, ValueError):
                  members = None
              if not isinstance(members, dict):
                  logging.info("Topic %s contains non-JSON payload [%s]",
                               msg.topic, msg.payload)
                  return
              for name, raw in members.items():
                  try:
                      value = float(raw)
                  except (TypeError, ValueError, OverflowError):
                      # Non-numeric member (string, null, nested value).
                      continue
                  lines.append("%s.%s %f %d" % (carbonkey, name, value, now))

          else:
              logging.info("Unknown mapping key [%s]", kind)
              return

      if not lines:
          # Avoid sending a datagram that holds only a newline.
          return

      message = '\n'.join(lines) + '\n'
      logging.debug("%s", message)
      sock.sendto(message, carbon_addr)
  def on_subscribe(mosq, userdata, mid, granted_qos):
      """Subscription callback; intentionally a no-op."""
      return None
  def on_disconnect(mosq, userdata, rc):
      """Disconnect callback: log the event and pause after an unexpected drop.

      rc == 0 is logged as a clean disconnection. Any other value is
      logged as unexpected and the callback then sleeps for 5 seconds
      before returning.
      """
      if rc != 0:
          logging.info("Unexpected disconnect (rc %s); reconnecting in 5 seconds" % rc)
          time.sleep(5)
          return
      logging.info("Clean disconnection")
  def load_map(path):
      """Read the topic map file at *path* and return it as a dict.

      Each non-blank line that does not start with '#' has two or three
      white-space separated fields:

          <type> <topic> [<remap>]

      The result maps <topic> to the tuple (<type>, <remap>), with
      <remap> set to None when the third field is absent. Lines with any
      other number of fields are reported with a warning and skipped.
      """
      topic_map = {}
      with open(path) as map_file:
          for lineno, raw in enumerate(map_file, 1):
              line = raw.strip()
              if not line or line.startswith('#'):
                  continue
              fields = line.split()
              if len(fields) == 2:
                  kind, topic = fields
                  remap = None
              elif len(fields) == 3:
                  kind, topic, remap = fields
              else:
                  logging.warning("Ignoring malformed line %d in %s: [%s]",
                                  lineno, path, line)
                  continue
              topic_map[topic] = (kind, remap)
      return topic_map


  if __name__ == '__main__':
      logging.info("Starting %s", client_id)
      logging.info("INFO MODE")
      logging.debug("DEBUG MODE")

      # The map file is looked up relative to the current working directory.
      topic_map = load_map('map')

      try:
          sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
      except socket.error:
          sys.stderr.write("Can't create UDP socket\n")
          sys.exit(1)

      # Shared state handed to every MQTT callback.
      userdata = {
          'sock': sock,
          'carbon_server': CARBON_SERVER,
          'carbon_port': CARBON_PORT,
          'map': topic_map,
      }

      # mqttc must remain a module-level name: the signal handler uses it.
      mqttc = mosquitto.Mosquitto(client_id, clean_session=True,
                                  userdata=userdata)
      mqttc.on_message = on_message
      mqttc.on_connect = on_connect
      mqttc.on_disconnect = on_disconnect
      mqttc.on_subscribe = on_subscribe

      # NOTE(review): the will goes to "clients/<id>" (no leading slash)
      # whereas the Online/Offline status messages in this file are
      # published to "/clients/<id>" -- confirm which topic is intended.
      mqttc.will_set("clients/" + client_id, payload="Adios!", qos=0,
                     retain=False)

      mqttc.connect(MQTT_HOST, 1883, 60)

      signal.signal(signal.SIGTERM, cleanup)
      signal.signal(signal.SIGINT, cleanup)

      mqttc.loop_forever()