mqtt2graphite.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. #!/usr/bin/env python2.7
  2. __author__ = "Jan-Piet Mens"
  3. __copyright__ = "Copyright (C) 2013 by Jan-Piet Mens"
  4. import paho.mqtt.client as paho
  5. import ssl
  6. import os, sys
  7. import logging
  8. import time
  9. import socket
  10. import json
  11. import signal
  12. MQTT_HOST = os.environ.get('MQTT_HOST', '192.168.0.4')
  13. MQTT_PORT = int(os.environ.get('MQTT_PORT', 1883))
  14. CARBON_SERVER = os.environ.get('CARBON_SERVER', '192.168.0.4')
  15. CARBON_PORT = int(os.environ.get('CARBON_PORT', 2003))
  16. LOGFORMAT = '%(asctime)-15s %(message)s'
  17. DEBUG = os.environ.get('DEBUG', True)
  18. if DEBUG:
  19. logging.basicConfig(level=logging.DEBUG, format=LOGFORMAT)
  20. else:
  21. logging.basicConfig(level=logging.INFO, format=LOGFORMAT)
  22. client_id = "MQTT2Graphite_%d-%s" % (os.getpid(), socket.getfqdn())
  23. def cleanup(signum, frame):
  24. '''Disconnect cleanly on SIGTERM or SIGINT'''
  25. mqttc.publish("/clients/" + client_id, "Offline")
  26. mqttc.disconnect()
  27. logging.info("Disconnected from broker; exiting on signal %d", signum)
  28. sys.exit(signum)
  29. def is_number(s):
  30. '''Test whether string contains a number (leading/traling white-space is ok)'''
  31. try:
  32. float(s)
  33. return True
  34. except ValueError:
  35. return False
  36. def on_connect(mosq, userdata, flags, rc):
  37. logging.info("Connected to broker at %s as %s" % (MQTT_HOST, client_id))
  38. mqttc.publish("/clients/" + client_id, "Online")
  39. map = userdata['map']
  40. for topic in map:
  41. logging.debug("Subscribing to topic %s" % topic)
  42. mqttc.subscribe(topic, 0)
  43. def on_message(mosq, userdata, msg):
  44. sock = userdata['sock']
  45. lines = []
  46. now = int(time.time())
  47. map = userdata['map']
  48. # Find out how to handle the topic in this message: slurp through
  49. # our map
  50. for t in map:
  51. if paho.topic_matches_sub(t, msg.topic):
  52. # print "%s matches MAP(%s) => %s" % (msg.topic, t, map[t])
  53. # Must we rename the received msg topic into a different
  54. # name for Carbon? In any case, replace MQTT slashes (/)
  55. # by Carbon periods (.)
  56. (type, remap) = map[t]
  57. if remap is None:
  58. carbonkey = msg.topic.replace('/', '.')
  59. else:
  60. if '#' in remap:
  61. remap = remap.replace('#', msg.topic.replace('/', '.'))
  62. carbonkey = remap.replace('/', '.')
  63. logging.debug("CARBONKEY is [%s]" % carbonkey)
  64. if type == 'n':
  65. '''Number: obtain a float from the payload'''
  66. try:
  67. number = float(msg.payload)
  68. lines.append("%s %f %d" % (carbonkey, number, now))
  69. except ValueError:
  70. logging.info("Topic %s contains non-numeric payload [%s]" %
  71. (msg.topic, msg.payload))
  72. return
  73. elif type == 'j':
  74. '''JSON: try and load the JSON string from payload and use
  75. subkeys to pass to Carbon'''
  76. try:
  77. st = json.loads(msg.payload)
  78. for k in st:
  79. if is_number(st[k]):
  80. lines.append("%s.%s %f %d" % (carbonkey, k, float(st[k]), now))
  81. except:
  82. logging.info("Topic %s contains non-JSON payload [%s]" %
  83. (msg.topic, msg.payload))
  84. return
  85. elif type == 's':
  86. '''JSON: try and load the JSON string from payload and use
  87. subkeys to pass to Carbon'''
  88. try:
  89. st = json.loads(msg.payload)
  90. print st['AM2301']
  91. for k in st['AM2301']:
  92. if is_number(st['AM2301'][k]):
  93. lines.append("%s.%s %f %d" % (carbonkey, k, float(st['AM2301'][k]), now))
  94. except:
  95. logging.info("Topic %s contains non-JSON payload [%s]" %
  96. (msg.topic, msg.payload))
  97. return
  98. elif type == 't':
  99. '''JSON: try and load the JSON string from payload and use
  100. subkeys to pass to Carbon'''
  101. try:
  102. st = json.loads(msg.payload)
  103. print st['DHT11']
  104. for k in st['DHT11']:
  105. if is_number(st['DHT11'][k]):
  106. lines.append("%s.%s %f %d" % (carbonkey, k, float(st['DHT11'][k]), now))
  107. except:
  108. logging.info("Topic %s contains non-JSON payload [%s]" %
  109. (msg.topic, msg.payload))
  110. return
  111. elif type == 'u':
  112. '''JSON: try and load the JSON string from payload and use
  113. subkeys to pass to Carbon'''
  114. try:
  115. st = json.loads(msg.payload)
  116. print st['SHT3X-0x45']
  117. for k in st['SHT3X-0x45']:
  118. if is_number(st['SHT3X-0x45'][k]):
  119. lines.append("%s.%s %f %d" % (carbonkey, k, float(st['SHT3X-0x45'][k]), now))
  120. except:
  121. logging.info("Topic %s contains non-JSON payload [%s]" %
  122. (msg.topic, msg.payload))
  123. return
  124. else:
  125. logging.info("Unknown mapping key [%s]", type)
  126. return
  127. message = '\n'.join(lines) + '\n'
  128. logging.debug("%s", message)
  129. sock.sendto(message, (CARBON_SERVER, CARBON_PORT))
  130. def on_subscribe(mosq, userdata, mid, granted_qos):
  131. pass
  132. def on_disconnect(mosq, userdata, rc):
  133. if rc == 0:
  134. logging.info("Clean disconnection")
  135. else:
  136. logging.info("Unexpected disconnect (rc %s); reconnecting in 5 seconds" % rc)
  137. time.sleep(5)
  138. def main():
  139. logging.info("Starting %s" % client_id)
  140. logging.info("INFO MODE")
  141. logging.debug("DEBUG MODE")
  142. map = {}
  143. if len(sys.argv) > 1:
  144. map_file = sys.argv[1]
  145. else:
  146. map_file = 'map'
  147. f = open(map_file)
  148. for line in f.readlines():
  149. line = line.rstrip()
  150. if len(line) == 0 or line[0] == '#':
  151. continue
  152. remap = None
  153. try:
  154. type, topic, remap = line.split()
  155. except ValueError:
  156. type, topic = line.split()
  157. map[topic] = (type, remap)
  158. try:
  159. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  160. except:
  161. sys.stderr.write("Can't create UDP socket\n")
  162. sys.exit(1)
  163. userdata = {
  164. 'sock' : sock,
  165. 'map' : map,
  166. }
  167. global mqttc
  168. mqttc = paho.Client(client_id, clean_session=True, userdata=userdata)
  169. mqttc.on_message = on_message
  170. mqttc.on_connect = on_connect
  171. mqttc.on_disconnect = on_disconnect
  172. mqttc.on_subscribe = on_subscribe
  173. mqttc.will_set("clients/" + client_id, payload="Adios!", qos=0, retain=False)
  174. mqttc.connect(MQTT_HOST, MQTT_PORT, 60)
  175. signal.signal(signal.SIGTERM, cleanup)
  176. signal.signal(signal.SIGINT, cleanup)
  177. mqttc.loop_forever()
  178. if __name__ == '__main__':
  179. main()