mqtt2graphite.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. #!/usr/bin/env python2.7
  2. __author__ = "Jan-Piet Mens"
  3. __copyright__ = "Copyright (C) 2013 by Jan-Piet Mens"
  4. import mosquitto
  5. import ssl
  6. import os, sys
  7. import logging
  8. import time
  9. import socket
  10. import json
  11. import signal
  # MQTT broker to connect to; override with the MQTT_HOST environment variable.
  MQTT_HOST = os.environ.get('MQTT_HOST', 'localhost')

  # Carbon (Graphite) endpoint that receives the metric lines.  Overridable
  # from the environment in the same way as MQTT_HOST; the defaults are the
  # previously hard-coded values.
  CARBON_SERVER = os.environ.get('CARBON_SERVER', '127.0.0.1')
  CARBON_PORT = int(os.environ.get('CARBON_PORT', 2003))

  LOGFORMAT = '%(asctime)-15s %(message)s'

  # Any true value logs at DEBUG level; set to 0 to log at INFO level instead.
  DEBUG = 1
  logging.basicConfig(level=logging.DEBUG if DEBUG else logging.INFO,
                      format=LOGFORMAT)

  # The client id embeds the process id and the fully qualified host name.
  client_id = "MQTT2Graphite_%d-%s" % (os.getpid(), socket.getfqdn())
  def cleanup(signum, frame):
      """Signal handler: announce "Offline", disconnect from the broker and exit.

      Publishes "Offline" on this client's "/clients/<client_id>" topic,
      disconnects, logs the signal number and terminates the process with the
      signal number as its exit status.
      """
      status_topic = "/clients/" + client_id
      mqttc.publish(status_topic, "Offline")
      mqttc.disconnect()
      logging.info("Disconnected from broker; exiting on signal %d", signum)
      raise SystemExit(signum)
  def is_number(s):
      """Return True if *s* can be converted to a float, otherwise False.

      Strings may carry leading/trailing white-space.  Values that float()
      rejects with a TypeError (None, lists, dicts, ...) are reported as
      non-numeric instead of letting the exception escape to the caller.
      """
      try:
          float(s)
      except (TypeError, ValueError):
          return False
      return True
  def on_connect(mosq, userdata, rc):
      """Connection callback: announce "Online" and subscribe to the mapped topics.

      mosq     -- client instance that invoked the callback
      userdata -- dict given to the client at creation; the keys of its 'map'
                  entry are the topics to subscribe to
      rc       -- connection result reported by the broker; 0 means accepted

      A non-zero rc is logged as an error and nothing is published or
      subscribed, instead of reporting a refused connection as a success.
      """
      if rc != 0:
          logging.error("Connection refused by broker (rc %s)", rc)
          return

      logging.info("Connected to broker at %s as %s", MQTT_HOST, client_id)
      # Use the client handed to the callback rather than the module global.
      mosq.publish("/clients/" + client_id, "Online")

      for topic in userdata['map']:
          logging.debug("Subscribing to topic %s", topic)
          mosq.subscribe(topic, 0)
  def on_message(mosq, userdata, msg):
      """Message callback: turn an MQTT message into Carbon lines and send them.

      userdata supplies the socket ('sock'), the Carbon address
      ('carbon_server', 'carbon_port') and the topic map ('map').  The map is
      keyed by subscription pattern and holds (type, remap) tuples:

        'n' -- the payload is a single number
        'j' -- the payload is a JSON object; every numeric member is sent as
               "<carbonkey>.<member>"

      When remap is not None it replaces the message topic as the Carbon key.
      In both cases MQTT slashes (/) are replaced by Carbon periods (.).

      A payload that cannot be parsed for one mapping is logged and skipped.
      All lines produced for the message are sent in a single datagram, and
      nothing is sent when no line was produced.  An unknown mapping type
      writes an error to stderr and exits with status 2.
      """
      topic_map = userdata['map']
      now = int(time.time())
      lines = []

      for sub, (kind, remap) in topic_map.items():
          # topic_matches_sub() expects the subscription pattern first and the
          # concrete topic second; the other way round wildcards never match.
          if not mosquitto.topic_matches_sub(sub, msg.topic):
              continue

          source = msg.topic if remap is None else remap
          carbonkey = source.replace('/', '.')
          logging.debug("CARBONKEY is [%s]" % carbonkey)

          if kind == 'n':
              # Number: obtain a float from the payload.
              try:
                  number = float(msg.payload)
              except (TypeError, ValueError):
                  logging.info("Topic %s contains non-numeric payload [%s]",
                               msg.topic, msg.payload)
                  continue
              lines.append("%s %f %d" % (carbonkey, number, now))

          elif kind == 'j':
              # JSON: load the payload and pass its numeric members to Carbon.
              try:
                  members = json.loads(msg.payload)
              except (TypeError, ValueError):
                  logging.info("Topic %s contains non-JSON payload [%s]",
                               msg.topic, msg.payload)
                  continue
              if not isinstance(members, dict):
                  logging.info("Topic %s: JSON payload is not an object [%s]",
                               msg.topic, msg.payload)
                  continue
              for key, value in members.items():
                  try:
                      number = float(value)
                  except (TypeError, ValueError):
                      # Non-numeric member (text, null, nested value): skip it
                      # but keep the remaining members.
                      continue
                  lines.append("%s.%s %f %d" % (carbonkey, key, number, now))

          else:
              sys.stderr.write("Unknown mapping key [%s]\n" % kind)
              sys.exit(2)

      if not lines:
          return

      message = '\n'.join(lines) + '\n'
      logging.debug("%s", message)
      userdata['sock'].sendto(
          message, (userdata['carbon_server'], userdata['carbon_port']))
  def on_subscribe(mosq, userdata, mid, granted_qos):
      """Subscription callback: intentionally does nothing."""
      return None
  def on_disconnect(mosq, userdata, rc):
      """Disconnect callback: log the outcome and pause after an unexpected drop.

      An rc of 0 is logged as a clean disconnection.  Any other value is logged
      as unexpected and followed by a 5 second sleep.
      """
      if rc != 0:
          logging.info("Unexpected disconnect (rc %s); reconnecting in 5 seconds" % rc)
          time.sleep(5)
          return
      logging.info("Clean disconnection")
  def load_map(path='map'):
      """Parse the topic map file at *path* into {topic: (type, remap)}.

      Blank lines and lines whose first non-blank character is '#' are
      ignored.  Every other line holds two or three white-space separated
      fields, "<type> <topic> [<remap>]"; remap is None when omitted.

      Raises ValueError naming the file and line number when a line has a
      different number of fields.  Errors from open() are not caught here.
      """
      topic_map = {}
      with open(path) as f:
          for lineno, raw in enumerate(f, 1):
              line = raw.strip()
              if not line or line.startswith('#'):
                  continue
              fields = line.split()
              if len(fields) not in (2, 3):
                  raise ValueError("%s:%d: expected 'type topic [remap]', got %r"
                                   % (path, lineno, line))
              remap = fields[2] if len(fields) == 3 else None
              topic_map[fields[1]] = (fields[0], remap)
      return topic_map


  if __name__ == '__main__':
      logging.info("Starting %s" % client_id)
      logging.info("INFO MODE")
      logging.debug("DEBUG MODE")

      try:
          topic_map = load_map('map')
      except (IOError, ValueError) as e:
          sys.stderr.write("Can't load map file: %s\n" % e)
          sys.exit(1)

      try:
          sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
      except socket.error:
          sys.stderr.write("Can't create UDP socket\n")
          sys.exit(1)

      # Handed to every callback by the client library.
      userdata = {
          'sock': sock,
          'carbon_server': CARBON_SERVER,
          'carbon_port': CARBON_PORT,
          'map': topic_map,
      }

      # mqttc stays a module-level name: cleanup() refers to it.
      mqttc = mosquitto.Mosquitto(client_id, clean_session=True, userdata=userdata)
      mqttc.on_message = on_message
      mqttc.on_connect = on_connect
      mqttc.on_disconnect = on_disconnect
      mqttc.on_subscribe = on_subscribe

      # NOTE(review): the will is published on "clients/<id>" while the
      # Online/Offline status uses "/clients/<id>" (leading slash) -- confirm
      # which topic subscribers expect before aligning them.
      mqttc.will_set("clients/" + client_id, payload="Adios!", qos=0, retain=False)

      mqttc.connect(MQTT_HOST, 1883, 60)

      signal.signal(signal.SIGTERM, cleanup)
      signal.signal(signal.SIGINT, cleanup)

      mqttc.loop_forever()