mqtt2graphite.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. #!/usr/bin/env python2.7
  2. __author__ = "Jan-Piet Mens"
  3. __copyright__ = "Copyright (C) 2013 by Jan-Piet Mens"
  4. import paho.mqtt.client as paho
  5. import ssl
  6. import os, sys
  7. import logging
  8. import time
  9. import socket
  10. import json
  11. import signal
  # Connection settings; each one can be overridden through the environment.
  MQTT_HOST = os.environ.get('MQTT_HOST', '127.0.0.1')
  MQTT_PORT = int(os.environ.get('MQTT_PORT', 1883))
  CARBON_SERVER = os.environ.get('CARBON_SERVER', '127.0.0.1')
  CARBON_PORT = int(os.environ.get('CARBON_PORT', 2003))

  LOGFORMAT = '%(asctime)-15s %(message)s'

  # Environment values are strings, so a plain truthiness test would treat
  # DEBUG=0 or DEBUG=false as "enabled". Parse the value as a flag instead;
  # debug logging stays on when the variable is not set at all.
  DEBUG = str(os.environ.get('DEBUG', True)).strip().lower() not in (
      '', '0', 'false', 'no', 'off')

  logging.basicConfig(level=logging.DEBUG if DEBUG else logging.INFO,
                      format=LOGFORMAT)

  # MQTT client identifier, built from this process's pid and the host's FQDN.
  client_id = "MQTT2Graphite_%d-%s" % (os.getpid(), socket.getfqdn())
  def cleanup(signum, frame):
      """Signal handler: publish "Offline", disconnect from the broker, exit.

      Installed for SIGTERM and SIGINT. The process exit status is the
      number of the signal that was received.
      """
      status_topic = "/clients/" + client_id
      mqttc.publish(status_topic, "Offline")
      mqttc.disconnect()
      logging.info("Disconnected from broker; exiting on signal %d", signum)
      sys.exit(signum)
  def is_number(s):
      """Return True if s can be converted to a float, False otherwise.

      Strings with leading/trailing white-space are accepted. Values that
      float() rejects outright (None, lists, dicts, ...) yield False rather
      than raising TypeError.
      """
      try:
          float(s)
      except (TypeError, ValueError):
          return False
      return True
  def on_connect(mosq, userdata, flags, rc):
      """Connect callback: announce "Online" and subscribe to the mapped topics.

      userdata['map'] is keyed by the topic filters to subscribe to. A
      non-zero rc means the broker did not accept the connection, so nothing
      is published or subscribed in that case.
      """
      if rc != 0:
          logging.error("Connection to broker refused (rc %s)", rc)
          return

      logging.info("Connected to broker at %s as %s" % (MQTT_HOST, client_id))
      mqttc.publish("/clients/" + client_id, "Online")

      topic_map = userdata['map']
      for topic in topic_map:
          logging.debug("Subscribing to topic %s" % topic)
          mqttc.subscribe(topic, 0)
  def on_message(mosq, userdata, msg):
      """Message callback: convert an MQTT message into Carbon lines and send them.

      userdata['map'] maps topic filters to (type, remap) pairs and
      userdata['sock'] is the socket used to reach Carbon. For every filter
      that matches msg.topic:

      * type 'n': the payload is a single number -> one "key value time" line.
      * type 'j': the payload is a JSON object -> one "key.subkey value time"
        line per numeric member; non-numeric members are skipped.

      The Carbon key is the remap name when one is configured, otherwise the
      MQTT topic, with '/' replaced by '.'. An unparsable payload or an
      unknown mapping type abandons the whole message, as before.
      """
      sock = userdata['sock']
      topic_map = userdata['map']
      now = int(time.time())
      lines = []

      for pattern in topic_map:
          if not paho.topic_matches_sub(pattern, msg.topic):
              continue

          (kind, remap) = topic_map[pattern]
          source = msg.topic if remap is None else remap
          carbonkey = source.replace('/', '.')
          logging.debug("CARBONKEY is [%s]", carbonkey)

          if kind == 'n':
              # Number: obtain a float from the payload.
              try:
                  number = float(msg.payload)
              except ValueError:
                  logging.info("Topic %s contains non-numeric payload [%s]",
                               msg.topic, msg.payload)
                  return
              lines.append("%s %f %d" % (carbonkey, number, now))
          elif kind == 'j':
              # JSON: decode the payload and emit one line per numeric member.
              try:
                  decoded = json.loads(msg.payload)
              except (TypeError, ValueError):
                  decoded = None
              # Only a JSON object has sub-keys to report; anything else is
              # rejected the same way as undecodable input.
              if not isinstance(decoded, dict):
                  logging.info("Topic %s contains non-JSON payload [%s]",
                               msg.topic, msg.payload)
                  return
              for subkey, value in decoded.items():
                  try:
                      number = float(value)
                  except (TypeError, ValueError):
                      # null, nested containers and non-numeric strings must
                      # not discard the numeric members of the same object.
                      continue
                  lines.append("%s.%s %f %d" % (carbonkey, subkey, number, now))
          else:
              logging.info("Unknown mapping key [%s]", kind)
              return

      if not lines:
          # Nothing matched or nothing numeric: don't send an empty datagram.
          return

      message = '\n'.join(lines) + '\n'
      logging.debug("%s", message)
      # sendto() needs bytes on Python 3; on Python 2 a str is already bytes.
      if not isinstance(message, bytes):
          message = message.encode('utf-8')
      sock.sendto(message, (CARBON_SERVER, CARBON_PORT))
  def on_subscribe(mosq, userdata, mid, granted_qos):
      """Subscribe callback; does nothing."""
      return None
  def on_disconnect(mosq, userdata, rc):
      """Disconnect callback: log the reason and pause after an unexpected drop.

      rc == 0 is logged as a clean disconnection. Any other value is logged
      as unexpected and followed by a 5 second sleep before returning.
      """
      if rc != 0:
          logging.info("Unexpected disconnect (rc %s); reconnecting in 5 seconds" % rc)
          time.sleep(5)
          return
      logging.info("Clean disconnection")
  def _load_map(path):
      """Parse the map file at path into {topic: (type, remap_or_None)}.

      Each non-blank, non-comment line holds "type topic" or
      "type topic remap", separated by white-space. Lines with any other
      number of fields are reported and skipped instead of aborting start-up.
      """
      topic_map = {}
      with open(path) as handle:
          for lineno, raw in enumerate(handle, 1):
              line = raw.strip()
              if not line or line.startswith('#'):
                  continue
              fields = line.split()
              if len(fields) == 2:
                  kind, topic = fields
                  remap = None
              elif len(fields) == 3:
                  kind, topic, remap = fields
              else:
                  logging.warning("Ignoring malformed line %d in %s: %r",
                                  lineno, path, line)
                  continue
              topic_map[topic] = (kind, remap)
      return topic_map


  def main():
      """Load the topic map, connect to the MQTT broker and relay messages forever.

      The map file is taken from the first command-line argument and defaults
      to 'map' in the current directory. Exits with status 1 if the UDP
      socket cannot be created.
      """
      logging.info("Starting %s" % client_id)
      logging.info("INFO MODE")
      logging.debug("DEBUG MODE")

      map_file = sys.argv[1] if len(sys.argv) > 1 else 'map'
      topic_map = _load_map(map_file)

      try:
          sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
      except socket.error:
          sys.stderr.write("Can't create UDP socket\n")
          sys.exit(1)

      userdata = {
          'sock': sock,
          'map': topic_map,
      }

      # The callbacks and the signal handler reach the client through this
      # module-level name.
      global mqttc
      mqttc = paho.Client(client_id, clean_session=True, userdata=userdata)
      mqttc.on_message = on_message
      mqttc.on_connect = on_connect
      mqttc.on_disconnect = on_disconnect
      mqttc.on_subscribe = on_subscribe

      # Last will goes to the same "/clients/<id>" topic that carries the
      # Online/Offline status, so one subscription sees all three states.
      mqttc.will_set("/clients/" + client_id, payload="Adios!", qos=0, retain=False)

      mqttc.connect(MQTT_HOST, MQTT_PORT, 60)

      signal.signal(signal.SIGTERM, cleanup)
      signal.signal(signal.SIGINT, cleanup)

      mqttc.loop_forever()


  if __name__ == '__main__':
      main()