mqtt2graphite.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. #!/usr/bin/env python2.7
  2. __author__ = "Jan-Piet Mens"
  3. __copyright__ = "Copyright (C) 2013 by Jan-Piet Mens"
  4. import paho.mqtt.client as paho
  5. import ssl
  6. import os, sys
  7. import logging
  8. import time
  9. import socket
  10. import json
  11. import signal
  # Broker and Carbon endpoints. Each value can be overridden through an
  # environment variable of the same name.
  MQTT_HOST = os.environ.get('MQTT_HOST', '127.0.0.1')
  MQTT_PORT = int(os.environ.get('MQTT_PORT', 1883))
  CARBON_SERVER = os.environ.get('CARBON_SERVER', '127.0.0.1')
  CARBON_PORT = int(os.environ.get('CARBON_PORT', 2003))

  LOGFORMAT = '%(asctime)-15s %(message)s'


  def env_flag(name, default=False):
      """Read a boolean switch from the environment.

      Returns *default* when the variable is unset. Otherwise the value is
      false for '', '0', 'false', 'no' and 'off' (case-insensitive, surrounding
      white-space ignored) and true for anything else.
      """
      raw = os.environ.get(name)
      if raw is None:
          return default
      return raw.strip().lower() not in ('', '0', 'false', 'no', 'off')


  # A plain os.environ.get('DEBUG', True) treats every non-empty string as
  # true, so DEBUG=0 or DEBUG=false could never switch debugging off.
  DEBUG = env_flag('DEBUG', True)

  logging.basicConfig(level=logging.DEBUG if DEBUG else logging.INFO,
                      format=LOGFORMAT)

  # MQTT client identifier: process id plus fully-qualified host name.
  client_id = "MQTT2Graphite_%d-%s" % (os.getpid(), socket.getfqdn())
  def cleanup(signum, frame):
      """Signal handler: publish "Offline", disconnect and terminate.

      The process exits with the number of the received signal as its
      status. *frame* is part of the handler signature and is not used.
      """
      status_topic = "/clients/" + client_id
      mqttc.publish(status_topic, "Offline")
      mqttc.disconnect()
      logging.info("Disconnected from broker; exiting on signal %d", signum)
      raise SystemExit(signum)
  def is_number(s):
      """Return True if *s* can be converted to a float, False otherwise.

      Strings may carry leading/trailing white-space. Values that float()
      rejects with a TypeError (None, lists, dicts, ...) also yield False
      instead of raising, so callers can probe arbitrary decoded JSON values.
      """
      try:
          float(s)
      except (TypeError, ValueError):
          return False
      return True
  def on_connect(mosq, userdata, flags, rc):
      """Connect callback: announce "Online" and subscribe to mapped topics.

      mosq     -- the client instance invoking the callback
      userdata -- dict whose 'map' entry is keyed by the topics to subscribe to
      flags    -- unused
      rc       -- connection result; anything other than 0 is treated as a
                  failed connection, in which case nothing is published or
                  subscribed
      """
      if rc != 0:
          logging.info("Connection to broker at %s failed (rc %s)",
                       MQTT_HOST, rc)
          return

      logging.info("Connected to broker at %s as %s" % (MQTT_HOST, client_id))

      # Use the client handed to the callback rather than the module global.
      mosq.publish("/clients/" + client_id, "Online")

      for topic in userdata['map']:
          logging.debug("Subscribing to topic %s" % topic)
          mosq.subscribe(topic, 0)
  def _carbon_key(topic, remap):
      """Build the Carbon metric key for an MQTT topic.

      MQTT slashes (/) become Carbon periods (.). When a remap template is
      given it is used instead of the topic; a '#' inside the template stands
      for the complete dotted topic.
      """
      dotted = topic.replace('/', '.')
      if remap is None:
          return dotted
      return remap.replace('#', dotted).replace('/', '.')


  def _metric_lines(kind, carbonkey, topic, payload, now):
      """Return the Carbon plaintext lines for one payload (possibly none).

      kind 'n' -- the payload is a single number
      kind 'j' -- the payload is a JSON object; every member whose value
                  converts to a float becomes "<carbonkey>.<member>"
      Problems are logged and produce an empty list instead of raising.
      """
      if kind == 'n':
          try:
              number = float(payload)
          except (TypeError, ValueError):
              logging.info("Topic %s contains non-numeric payload [%s]" %
                           (topic, payload))
              return []
          return ["%s %f %d" % (carbonkey, number, now)]

      if kind == 'j':
          # Only decoding errors are caught here: a bare except would also
          # swallow the SystemExit raised by the signal handler.
          try:
              doc = json.loads(payload)
          except (TypeError, ValueError):
              logging.info("Topic %s contains non-JSON payload [%s]" %
                           (topic, payload))
              return []
          if not isinstance(doc, dict):
              logging.info("Topic %s contains JSON payload that is not an "
                           "object [%s]" % (topic, payload))
              return []

          lines = []
          for key, value in doc.items():
              try:
                  number = float(value)
              except (TypeError, ValueError):
                  # Strings, nulls and nested structures are skipped without
                  # discarding the numeric members of the same object.
                  continue
              lines.append("%s.%s %f %d" % (carbonkey, key, number, now))
          return lines

      logging.info("Unknown mapping key [%s]", kind)
      return []


  def on_message(mosq, userdata, msg):
      """Message callback: forward a received MQTT message to Carbon via UDP.

      userdata['map'] maps subscription patterns to (kind, remap) tuples and
      userdata['sock'] is the UDP socket used for sending. Every pattern that
      matches the message topic contributes its metric lines; a pattern whose
      payload cannot be interpreted is logged and skipped without discarding
      the lines of other matching patterns. Nothing is sent when no metric
      line was produced.
      """
      sock = userdata['sock']
      topic_map = userdata['map']
      now = int(time.time())

      lines = []
      for pattern in topic_map:
          if not paho.topic_matches_sub(pattern, msg.topic):
              continue
          kind, remap = topic_map[pattern]
          carbonkey = _carbon_key(msg.topic, remap)
          logging.debug("CARBONKEY is [%s]" % carbonkey)
          lines.extend(
              _metric_lines(kind, carbonkey, msg.topic, msg.payload, now))

      if not lines:
          # Avoid sending a datagram that holds nothing but a newline.
          return

      message = '\n'.join(lines) + '\n'
      logging.debug("%s", message)

      # sendto() needs bytes on Python 3; on Python 2 a str already is bytes.
      if not isinstance(message, bytes):
          message = message.encode('utf-8')
      try:
          sock.sendto(message, (CARBON_SERVER, CARBON_PORT))
      except socket.error as err:
          # A send failure must not take the whole bridge down.
          logging.info("Cannot send to Carbon at %s:%d: %s" %
                       (CARBON_SERVER, CARBON_PORT, err))
  def on_subscribe(mosq, userdata, mid, granted_qos):
      """Subscribe callback; does nothing and returns None."""
      return None
  def on_disconnect(mosq, userdata, rc):
      """Disconnect callback: log how the connection ended.

      An rc of 0 is logged as a clean disconnection. Any other value is
      logged as unexpected, after which the callback blocks for five seconds
      before returning.
      """
      if rc != 0:
          logging.info(
              "Unexpected disconnect (rc %s); reconnecting in 5 seconds", rc)
          time.sleep(5)
          return
      logging.info("Clean disconnection")
  def _load_map(map_file):
      """Parse the topic map file into a {topic: (kind, remap)} dict.

      Each significant line reads "<kind> <topic> [<remap>]"; remap is None
      when the third field is absent. Blank lines and lines starting with '#'
      are ignored. Lines with any other number of fields are logged and
      skipped.
      """
      topic_map = {}
      with open(map_file) as handle:
          for lineno, raw in enumerate(handle, 1):
              line = raw.strip()
              if not line or line.startswith('#'):
                  continue
              fields = line.split()
              if len(fields) == 3:
                  kind, topic, remap = fields
              elif len(fields) == 2:
                  kind, topic = fields
                  remap = None
              else:
                  logging.warning("%s:%d: ignoring malformed map line [%s]",
                                  map_file, lineno, line)
                  continue
              topic_map[topic] = (kind, remap)
      return topic_map


  def main():
      """Load the topic map, connect to the broker and run the network loop.

      The map file is taken from the first command-line argument and defaults
      to 'map' in the current directory. Exits with status 1 when the map file
      cannot be read or the UDP socket cannot be created.
      """
      # Other functions in this file reach the client through this global.
      global mqttc

      logging.info("Starting %s" % client_id)
      logging.info("INFO MODE")
      logging.debug("DEBUG MODE")

      map_file = sys.argv[1] if len(sys.argv) > 1 else 'map'
      try:
          topic_map = _load_map(map_file)
      except IOError as err:
          sys.stderr.write("Can't read map file %s: %s\n" % (map_file, err))
          sys.exit(1)

      try:
          sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
      except socket.error:
          sys.stderr.write("Can't create UDP socket\n")
          sys.exit(1)

      userdata = {
          'sock': sock,
          'map': topic_map,
      }

      mqttc = paho.Client(client_id, clean_session=True, userdata=userdata)
      mqttc.on_message = on_message
      mqttc.on_connect = on_connect
      mqttc.on_disconnect = on_disconnect
      mqttc.on_subscribe = on_subscribe

      # The last will goes to the same topic that carries the Online/Offline
      # status messages (note the leading slash).
      mqttc.will_set("/clients/" + client_id, payload="Adios!", qos=0,
                     retain=False)

      mqttc.connect(MQTT_HOST, MQTT_PORT, 60)

      signal.signal(signal.SIGTERM, cleanup)
      signal.signal(signal.SIGINT, cleanup)

      mqttc.loop_forever()
  # Start the bridge only when this file is executed as a script.
  if __name__ == "__main__":
      main()