I have been on a journey to explore Azure IoT and push the thousands of events that flow through my local MQTT broker (Mosquitto) into Azure IoT, from a direct connection to Azure IoT Hub (via MQTT and SAS tokens) through to Azure IoT Edge running locally with MQTT. I have achieved my goals with varying levels of success, but I have a few concerns about the approaches I have tried so far.
- Direct connection. Connecting devices straight to Azure IoT Hub introduces latency to the cloud.
- Authentication. From SAS tokens to X.509 certificates, it's not anonymous, and some of my tiny devices (Tasmota) don't cope well with it.
- Topic structure. It is defined (devices/{DeviceID}/messages/events/) and not free-form. That means reconfiguration, which isn't hard but adds a lot of friction.
Channeling my inner Steve Ballmer (developers, developers, developers), it's time to build. My goals for building a solution:
- No reconfiguration of any of my MQTT devices (Home Assistant, PLC, Arduino Mega 2560, ~75 Tasmota devices).
- Bridge my existing MQTT broker (Mosquitto) into Azure IoT.
- Must run on aarch64 architecture.
Pretty lofty goals. You may even say I am being lazy, but the reality is I want a low-friction way to derive operational intelligence from the many thousands of events each day (read below, it's over 10K per day!).
And for that we need to get our hands dirty, write some code and use SDKs.
What we are going to build
To overcome the limitations described above, we are going to build an application in Python using the Python MQTT library Paho and the Azure IoT SDK for Python. Let's quickly talk about both of these.
Paho MQTT
Paho is a Python client library that enables applications to connect to an MQTT broker to publish messages, subscribe to topics and receive published messages. It also provides some helper functions to make publishing one-off messages to an MQTT server very straightforward. This is what we will use to listen to messages on our Mosquitto broker. For examples and more information about the Paho MQTT module, see the pypi.org website.
To install Paho, you can use PIP.
pip install paho-mqtt
and then import it with
import paho.mqtt.client as mqtt
Azure IoT SDK for Python
Once the messages have been read by Paho from Mosquitto, we need to get them into Azure IoT. The Azure IoT SDK for Python enables us to do away with handling MQTT ourselves on the Azure side and speak to the service directly in Python. The SDK takes care of ‘Authentication’, ‘Send device-to-cloud message’, ‘Receive cloud-to-device messages’, ‘Device twins’, ‘Direct methods’, ‘Connection status and error reporting’, ‘Retry policies’ and ‘Upload file to blob’.
A lot of the heavy lifting I need is performed by this SDK. To install the Azure IoT SDK for Python you can use PIP. For code examples and more information about this device module, see the pypi.org website.
pip install azure-iot-device
and then import the device client with
from azure.iot.device import IoTHubDeviceClient
Let's write some code.
Code Summary
See the steps below as I tease out this solution, or my GitHub repo for the full Python script. To give you a better understanding of how this works, I will break it down into the logical steps required to receive messages from Mosquitto over MQTT using Paho and then re-publish them into Azure IoT Hub using the Azure IoT SDK for Python.
Step 1 – Import Modules
We need to import a few modules, mainly Paho and the Azure IoT SDK, to provide additional functionality.
import paho.mqtt.client as mqtt
import os
import asyncio
import uuid
from azure.iot.device.aio import IoTHubDeviceClient
from azure.iot.device import Message
from datetime import datetime
Step 2 – Connect To Mosquitto & Subscribe To MQTT Topics
After importing our modules we need to connect to our MQTT broker. We will do this with a callback function (on_connect), which must be defined before it is assigned to the client.
def on_connect(client, userdata, flags, rc): # The callback for when the client connects to the broker
    print(str(datetime.now()) + " | Connecting to MQTT Broker : " + MQTTBrokerIP)
    print(str(datetime.now()) + " | Connected with result code {0}".format(str(rc)))
    print(str(datetime.now()) + " | We are connected!")
    print()
    print(str(datetime.now()) + " | Subscribing to MQTT Topics")
    print(str(datetime.now()) + " | Subscribing to " + MQTTTopicSubscribe)
    client.subscribe(MQTTTopicSubscribe)
    print()

client = mqtt.Client(MQTTClientName)
client.on_connect = on_connect
After connecting, we need to tell our MQTT broker which topics we want to subscribe to. This way we can be more precise about what we replicate into Azure. We can use MQTT topic filters to do this. Initially I started with a # but decided to use the single-level wildcard + instead.
MQTTTopicSubscribe = "stat/+/POWER" #MQTT Topic Filter
Plus sign (+): It is a single level wildcard that matches any name for a specific topic level. We can use this wildcard instead of specifying a name for any topic level in the topic filter.
Understanding wildcards | MQTT Essentials – A Lightweight IoT Protocol (packtpub.com)
Hash (#): It is a multi level wildcard that we can use only at the end of the topic filter, as the last level and matches any topic whose first levels are the same as the topic levels specified at the left-hand side of the # symbol.
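To make the two wildcards concrete, here is a minimal sketch of how a filter is compared against a topic, level by level. The function name topic_matches is my own for illustration, and it deliberately ignores the special handling of topics that start with $. Paho also ships its own helper for this check (topic_matches_sub in paho.mqtt.client), so treat this purely as an explanation of the rules rather than something the bridge needs.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic matches a subscription filter (simplified)."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last level of the filter,
            # where it matches the parent level and everything beneath it.
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[index]:
            # A literal level that differs; "+" accepts any single level.
            return False
    # Without a trailing "#", both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("stat/+/POWER", "stat/kitchen/POWER"))   # True
print(topic_matches("stat/+/POWER", "stat/kitchen/RESULT"))  # False
print(topic_matches("stat/#", "stat/kitchen/POWER"))         # True
```

With the filter stat/+/POWER used in this post, only the POWER state of each device is forwarded, which is why the message volume stays manageable compared with subscribing to #.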
Step 3 – Listen For Messages
We have now subscribed to MQTT topics and we need to listen for and act on incoming messages. I am taking the MQTT topic and MQTT payload and passing these into my Python function named azure (not an Azure Function ;)), which will push the payload into Azure.
def on_message(client, userdata, msg): # The callback for when a PUBLISH message is received from the server.
    global mqtt_topic
    mqtt_topic = msg.topic
    global mqtt_payload
    mqtt_payload = str(msg.payload) # Gives "b'...'"; the wrapper is sliced off before use
    print(str(datetime.now()) + " | Message received")
    print(str(datetime.now()) + " | MQTT Topic and Payload: " + msg.topic + " " + str(msg.payload)[2:][:-1]) # Print a received msg
    asyncio.run(azure())

client.on_message = on_message
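A side note on the payload handling: msg.payload arrives as bytes, and str() on bytes produces the b'...' representation, which is why the code slices off the first two characters and the last one. That works for plain ASCII payloads such as ON and OFF. A possible alternative, sketched below with a helper name (decode_payload) I made up, is to decode the bytes instead, which also copes with non-ASCII characters.

```python
def decode_payload(payload: bytes) -> str:
    """Turn raw MQTT payload bytes into text, replacing any invalid UTF-8."""
    return payload.decode("utf-8", errors="replace")


# For simple ASCII payloads both approaches give the same text.
print(decode_payload(b"ON"))
print(str(b"ON")[2:][:-1])

# For non-ASCII payloads the slicing trick leaves escape sequences behind,
# while decoding returns the original characters.
temperature = "22.5°C".encode("utf-8")
print(decode_payload(temperature))
print(str(temperature)[2:][:-1])
```

The last line prints the escaped form (22.5\xc2\xb0C) rather than the degree symbol, which is the kind of surprise decoding avoids.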
Step 4 – Send Messages To Azure
With the MQTT topic and payload we can now push these messages into Azure. I am sending these as a JSON object. I have had to massage the MQTT payload as my PLC is adding a few extra values I don't need.
async def azure():
    # Create instance of the device client using the connection string
    device_client = IoTHubDeviceClient.create_from_connection_string(AzureIOTHub_conn_str)
    # Connect the device client.
    await device_client.connect()
    print(str(datetime.now()) + " | Async connection established to Azure IOT")
    # Send a single message
    print(str(datetime.now()) + " | Sending message to Azure IOT Hub")
    msg = Message("{ \"DateTime\": \"" + str(datetime.now()) + "\", \"MQTT Topic\": \"" + mqtt_topic + "\", \"Payload\": \"" + mqtt_payload[2:][:-1] + "\" }")
    msg.message_id = uuid.uuid4()
    msg.content_encoding = "utf-8"
    msg.content_type = "application/json"
    await device_client.send_message(msg)
    print(str(datetime.now()) + " | Message sent, tearing down Azure IOT Hub connection")
    print()
    # Finally, shut down the client
    await device_client.shutdown()
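The JSON body above is assembled by string concatenation, which is fine for payloads like ON and OFF but would produce invalid JSON if a payload ever contained a double quote (for example a device that publishes JSON itself). A minimal sketch of an alternative is to let the standard json module do the escaping. The helper name build_body and its timestamp parameter are my own assumptions for illustration; the keys match the ones used above.

```python
import json
from datetime import datetime


def build_body(topic, payload, timestamp=None):
    """Build the JSON message body, letting json.dumps handle all escaping."""
    timestamp = timestamp or datetime.now()
    return json.dumps({
        "DateTime": str(timestamp),
        "MQTT Topic": topic,
        "Payload": payload,
    })


# A payload containing quotes still round-trips as valid JSON.
body = build_body("stat/plc/RESULT", '{"POWER":"ON"}')
print(body)
print(json.loads(body)["Payload"])
```

The resulting string could be passed to Message(...) in place of the hand-built one.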
Pulling It All Together
Here is a complete copy of the above, plus a bit more. It assumes you have installed Paho and the Azure IoT SDK via PIP. You can cut and paste the below or clone my GitHub repository.
import paho.mqtt.client as mqtt
import os
import asyncio
import uuid
from azure.iot.device.aio import IoTHubDeviceClient
from azure.iot.device import Message
from datetime import datetime
# -----------------------------------------------------------------------
# EDIT BELOW THIS LINE
ScriptVersion = "1.0"
ModifiedDate = "Monday 15, November 2021"
MQTTBrokerIP = "10.0.0.200" #IP Address of your MQTT Broker
MQTTTopicSubscribe = "stat/+/POWER" #MQTT Topic Filter
MQTTClientName = "RaspiPI4" #Used to identify the device to your MQTT Broker
AzureIOTHub_conn_str = "********************************************************" #Azure IOT Hub Connection String
# EDIT ABOVE THIS LINE
# -----------------------------------------------------------------------
def on_connect(client, userdata, flags, rc): # The callback for when the client connects to the broker
    print(str(datetime.now()) + " | Connecting to MQTT Broker : " + MQTTBrokerIP)
    print(str(datetime.now()) + " | Connected with result code {0}".format(str(rc)))
    print(str(datetime.now()) + " | We are connected!")
    print()
    print(str(datetime.now()) + " | Subscribing to MQTT Topics")
    print(str(datetime.now()) + " | Subscribing to " + MQTTTopicSubscribe)
    client.subscribe(MQTTTopicSubscribe)
    print()
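def on_message(client, userdata, msg): # The callback for when a PUBLISH message is received from the server.
    global mqtt_topic
    mqtt_topic = msg.topic
    global mqtt_payload
    mqtt_payload = str(msg.payload) # Gives "b'...'"; the wrapper is sliced off before use
    print(str(datetime.now()) + " | Message received")
    print(str(datetime.now()) + " | MQTT Topic and Payload: " + msg.topic + " " + str(msg.payload)[2:][:-1]) # Print a received msg
    try:
        asyncio.run(azure())
    except Exception as error: # Continue on errors so a connectivity blip does not stop the bridge
        print(str(datetime.now()) + " | Failed to send message to Azure IOT Hub: " + str(error))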
async def azure():
    # Create instance of the device client using the connection string
    device_client = IoTHubDeviceClient.create_from_connection_string(AzureIOTHub_conn_str)
    # Connect the device client.
    await device_client.connect()
    print(str(datetime.now()) + " | Async connection established to Azure IOT")
    # Send a single message
    print(str(datetime.now()) + " | Sending message to Azure IOT Hub")
    msg = Message("{ \"DateTime\": \"" + str(datetime.now()) + "\", \"MQTT Topic\": \"" + mqtt_topic + "\", \"Payload\": \"" + mqtt_payload[2:][:-1] + "\" }")
    msg.message_id = uuid.uuid4()
    msg.content_encoding = "utf-8"
    msg.content_type = "application/json"
    await device_client.send_message(msg)
    print(str(datetime.now()) + " | Message sent, tearing down Azure IOT Hub connection")
    print()
    # Finally, shut down the client
    await device_client.shutdown()
print("*********************************************************************")
print("* *")
print("* *")
print("* MQTT --> Azure IOT Bridge *")
print("* *")
print("* *")
print("* *")
print("* shane@baldacchino.net *")
print(f"* Version : {ScriptVersion} *")
print(f"* Modified Date : {ModifiedDate} *")
print("* *")
print("*********************************************************************")
client = mqtt.Client(MQTTClientName)
client.on_connect = on_connect
print(str(datetime.now()) + " | Listening for messages")
print()
client.on_message = on_message
client.connect(MQTTBrokerIP, 1883, 60) # Connect to (broker, port, keepalive-time)
client.loop_forever() # Start networking daemon
# Note: loop_forever() blocks, so the lines below are only reached once it returns.
try:
    asyncio.run(azure())
except Exception:
    pass # Continue on errors - used to solve internet connectivity issues.
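One thing worth noting about the script above: azure() opens a new IoT Hub connection and shuts it down again for every single message. With thousands of events a day, a possible alternative is to have on_message hand each message to a queue and let one asyncio worker forward them over a single long-lived connection. The sketch below shows only that hand-off pattern using the standard library. fake_send is a hypothetical stand-in for device_client.send_message, the producer thread stands in for Paho's network thread, and forward_messages, make_enqueue and demo are names I made up for illustration.

```python
import asyncio
import threading


async def forward_messages(queue, send):
    """Forward queued items with one sender until a None sentinel arrives."""
    sent = 0
    while True:
        item = await queue.get()
        if item is None:
            break
        try:
            await send(item)
            sent += 1
        except Exception as error:
            # Keep the bridge alive on transient failures; the message is dropped.
            print(f"Send failed, dropping message: {error}")
    return sent


def make_enqueue(loop, queue):
    """Return a function that is safe to call from another thread."""
    def enqueue(item):
        loop.call_soon_threadsafe(queue.put_nowait, item)
    return enqueue


async def demo():
    queue = asyncio.Queue()
    delivered = []

    async def fake_send(item):
        # Hypothetical stand-in for the real send to Azure IoT Hub.
        delivered.append(item)

    enqueue = make_enqueue(asyncio.get_running_loop(), queue)

    def producer():
        # Simulates an MQTT callback thread delivering two messages, then stopping.
        enqueue(("stat/kitchen/POWER", "ON"))
        enqueue(("stat/lounge/POWER", "OFF"))
        enqueue(None)

    thread = threading.Thread(target=producer)
    thread.start()
    sent = await forward_messages(queue, fake_send)
    thread.join()
    return sent, delivered


print(asyncio.run(demo()))
```

In a real bridge the enqueue function would be called from on_message, with Paho's network loop running in its own thread (for example via client.loop_start()), and the worker would connect the device client once before draining the queue. I have not benchmarked this against the per-message approach, so treat it as a direction to explore rather than a proven improvement.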
Seeing This In Action
Let's drop to a video to see this working end-to-end. To validate that messages are flowing into Azure IoT Hub, I can use the Azure CLI (az) to monitor the output.
az iot hub monitor-events --device-id devicename --hub-name hubname --output json
For the purpose of this demo, I have left a handful of messages at QoS level 2 and set LWT (Last Will and Testament) to true.
After 24 hours of running, we can see I have published 10.52K messages into Azure IoT Hub, and there are certain ebbs and flows that occur in my house.
Conclusion
There are many ways to skin this cat. My requirement was to publish messages into Azure, and we have been able to achieve this in different ways (I am sure there are more). Automation is a journey; which path will you take?
We illustrated a transparent side-car approach that listens to an existing broker on the topics you choose and pushes these into Azure IoT, all without making any configuration changes to the devices (the most important thing for my implementation).
Are there any drawbacks? Sure there are. Right now this is one-way (simplex): it allows me to push messages into Azure IoT but not receive messages back. Azure IoT Edge or direct MQTT publishing to Azure IoT Hub would be duplex communication. The Azure IoT SDK for Python is capable of duplex communication (receive cloud-to-device messages), but I have yet to implement it. Will I? I am unsure, but it's nice to know I can.
Personally, I like the SDK approach. It's my code and my choices on what I do, but I do understand this is not for everyone.
We now have my messages, my events, in Azure, and now it's time to make some friends and learn how to derive operational intelligence, from visualisations through to machine learning and beyond.
Think big and happy building
Shane
Hello, thank you for this article. I have followed the instructions and got this error.
2022-03-16 01:11:47.109691 | Listening for messages
Traceback (most recent call last):
  File "MQTT_IoT.py", line 78, in <module>
    client.connect(MQTTBrokerIP, 1883, 60) # Connect to (broker, port, keepalive-time)
  File "/home/codingtron/.local/lib/python3.6/site-packages/paho/mqtt/client.py", line 914, in connect
    return self.reconnect()
  File "/home/codingtron/.local/lib/python3.6/site-packages/paho/mqtt/client.py", line 1044, in reconnect
    sock = self._create_socket_connection()
  File "/home/codingtron/.local/lib/python3.6/site-packages/paho/mqtt/client.py", line 3685, in _create_socket_connection
    return socket.create_connection(addr, timeout=self._connect_timeout, source_address=source)
  File "/usr/lib/python3.6/socket.py", line 724, in create_connection
    raise err
  File "/usr/lib/python3.6/socket.py", line 713, in create_connection
    sock.connect(sa)
ConnectionRefusedError: [Errno 111] Connection refused
Can you please tell me what could be the issue?
Thanks. Also, can you explain how it will connect over mqtt_websockerts_only please?
Thanks
Thanks for the post, and apologies for the delay in getting back to you. Is your local broker online and able to be connected to? I am using “10.0.0.200” in my example. Is your broker IP updated, and can it be reached on 1883 (the MQTT TCP port)?
This does not use MQTT over websockets, just MQTT.
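A "Connection refused" error usually means nothing is listening on that address and port, so a quick way to rule out the script itself is to test the TCP port directly. Here is a minimal sketch using only the standard library; broker_reachable is a helper name made up for illustration, and the IP address in the usage comment is just the one from my example.

```python
import socket


def broker_reachable(host, port=1883, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts and unreachable hosts.
        return False


# Example usage (replace with your own broker address):
# print(broker_reachable("10.0.0.200"))
```

If this returns False, the problem sits with the broker address, port or firewall rather than with Paho or the Azure side.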
No, it wasn’t connecting to the MQTT broker. I changed the config and now I get this error:
^CTraceback (most recent call last):
  File "MQTT_IoT.py", line 79, in <module>
    client.loop_forever() # Start networking daemon
  File "/home/codingtron/.local/lib/python3.6/site-packages/paho/mqtt/client.py", line 1756, in loop_forever
    rc = self._loop(timeout)
  File "/home/codingtron/.local/lib/python3.6/site-packages/paho/mqtt/client.py", line 1150, in _loop
    socklist = select.select(rlist, wlist, [], timeout)
KeyboardInterrupt
Actually, at this step it got stuck and I had to press Ctrl+C.
Hmm, I am unsure what is going on here, but I would try a very simple Paho MQTT test before moving forward.
http://www.steves-internet-guide.com/into-mqtt-python-client/ has an example snippet which I used to craft this script.
Thanks
Shane