mqtt2graphite.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. #!/usr/bin/env python2.7
  2. __author__ = "Jan-Piet Mens"
  3. __copyright__ = "Copyright (C) 2013 by Jan-Piet Mens"
  4. import paho.mqtt.client as paho
  5. import ssl
  6. import os, sys
  7. import logging
  8. import time
  9. import socket
  10. import json
  11. import signal
  12. MQTT_HOST = os.environ.get('MQTT_HOST', 'localhost')
  13. CARBON_SERVER = os.environ.get('CARBON_SERVER', '127.0.0.1')
  14. CARBON_PORT = 2003
  15. LOGFORMAT = '%(asctime)-15s %(message)s'
  16. DEBUG = os.environ.get('DEBUG', True)
  17. if DEBUG:
  18. logging.basicConfig(level=logging.DEBUG, format=LOGFORMAT)
  19. else:
  20. logging.basicConfig(level=logging.INFO, format=LOGFORMAT)
  21. client_id = "MQTT2Graphite_%d-%s" % (os.getpid(), socket.getfqdn())
  22. def cleanup(signum, frame):
  23. '''Disconnect cleanly on SIGTERM or SIGINT'''
  24. mqttc.publish("/clients/" + client_id, "Offline")
  25. mqttc.disconnect()
  26. logging.info("Disconnected from broker; exiting on signal %d", signum)
  27. sys.exit(signum)
  28. def is_number(s):
  29. '''Test whether string contains a number (leading/traling white-space is ok)'''
  30. try:
  31. float(s)
  32. return True
  33. except ValueError:
  34. return False
  35. def on_connect(mosq, userdata, rc):
  36. logging.info("Connected to broker at %s as %s" % (MQTT_HOST, client_id))
  37. mqttc.publish("/clients/" + client_id, "Online")
  38. map = userdata['map']
  39. for topic in map:
  40. logging.debug("Subscribing to topic %s" % topic)
  41. mqttc.subscribe(topic, 0)
  42. def on_message(mosq, userdata, msg):
  43. sock = userdata['sock']
  44. host = userdata['carbon_server']
  45. port = userdata['carbon_port']
  46. lines = []
  47. now = int(time.time())
  48. map = userdata['map']
  49. # Find out how to handle the topic in this message: slurp through
  50. # our map
  51. for t in map:
  52. if paho.topic_matches_sub(t, msg.topic):
  53. # print "%s matches MAP(%s) => %s" % (msg.topic, t, map[t])
  54. # Must we rename the received msg topic into a different
  55. # name for Carbon? In any case, replace MQTT slashes (/)
  56. # by Carbon periods (.)
  57. (type, remap) = map[t]
  58. if remap is None:
  59. carbonkey = msg.topic.replace('/', '.')
  60. else:
  61. carbonkey = remap.replace('/', '.')
  62. logging.debug("CARBONKEY is [%s]" % carbonkey)
  63. if type == 'n':
  64. '''Number: obtain a float from the payload'''
  65. try:
  66. number = float(msg.payload)
  67. lines.append("%s %f %d" % (carbonkey, number, now))
  68. except ValueError:
  69. logging.info("Topic %s contains non-numeric payload [%s]" %
  70. (msg.topic, msg.payload))
  71. return
  72. elif type == 'j':
  73. '''JSON: try and load the JSON string from payload and use
  74. subkeys to pass to Carbon'''
  75. try:
  76. st = json.loads(msg.payload)
  77. for k in st:
  78. if is_number(st[k]):
  79. lines.append("%s.%s %f %d" % (carbonkey, k, float(st[k]), now))
  80. except:
  81. logging.info("Topic %s contains non-JSON payload [%s]" %
  82. (msg.topic, msg.payload))
  83. return
  84. else:
  85. logging.info("Unknown mapping key [%s]", type)
  86. return
  87. message = '\n'.join(lines) + '\n'
  88. logging.debug("%s", message)
  89. sock.sendto(message, (host, port))
  90. def on_subscribe(mosq, userdata, mid, granted_qos):
  91. pass
  92. def on_disconnect(mosq, userdata, rc):
  93. if rc == 0:
  94. logging.info("Clean disconnection")
  95. else:
  96. logging.info("Unexpected disconnect (rc %s); reconnecting in 5 seconds" % rc)
  97. time.sleep(5)
  98. def main():
  99. logging.info("Starting %s" % client_id)
  100. logging.info("INFO MODE")
  101. logging.debug("DEBUG MODE")
  102. map = {}
  103. if len(sys.argv) > 1:
  104. map_file = sys.argv[1]
  105. else:
  106. map_file = 'map'
  107. f = open(map_file)
  108. for line in f.readlines():
  109. line = line.rstrip()
  110. if len(line) == 0 or line[0] == '#':
  111. continue
  112. remap = None
  113. try:
  114. type, topic, remap = line.split()
  115. except ValueError:
  116. type, topic = line.split()
  117. map[topic] = (type, remap)
  118. try:
  119. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  120. except:
  121. sys.stderr.write("Can't create UDP socket\n")
  122. sys.exit(1)
  123. userdata = {
  124. 'sock' : sock,
  125. 'carbon_server' : CARBON_SERVER,
  126. 'carbon_port' : CARBON_PORT,
  127. 'map' : map,
  128. }
  129. mqttc = paho.Client(client_id, clean_session=True, userdata=userdata)
  130. global mqttc
  131. mqttc.on_message = on_message
  132. mqttc.on_connect = on_connect
  133. mqttc.on_disconnect = on_disconnect
  134. mqttc.on_subscribe = on_subscribe
  135. mqttc.will_set("clients/" + client_id, payload="Adios!", qos=0, retain=False)
  136. mqttc.connect(MQTT_HOST, 1883, 60)
  137. signal.signal(signal.SIGTERM, cleanup)
  138. signal.signal(signal.SIGINT, cleanup)
  139. mqttc.loop_forever()
  140. if __name__ == '__main__':
  141. main()