Export MQTT Topics to InfluxDB
Intro
The goal of this blog series is to collect the sensor data of different IoT devices via MQTT, store them in InfluxDB 2.6 and visualize them with Grafana.
The previous blogs from this series:
- Grafana with podman kube
- Setup MQTT Broker, IoT Devices and Security
- InfluxDB v2 as Datasource for Grafana
In this post I will describe how to collect the sensor data via a MQTT Broker and store them in InfluxDB 2.6 with MQTT Exporter. Like all the other services, MQTT Exporter will run containerized managed with podman kube on openSUSE MicroOS.
The example configuration is for Shelly Plug S, Shelly H&T and Shelly Plus H&T devices. As explained in Setup MQTT Broker, IoT Devices and Security, I use custom MQTT prefixes with topics in the form of shellies/shelly-plug-sX
, shellies/shelly-ht-XX
and shelly-ht/shelly-plus-ht-XX
.
Connect to MQTT Broker
The configuration file for MQTT Broker is divided in three parts: how to connect with the MQTT Broker, how to connect with the InfluxDB and the mapping between the topics and how to store.
For security reasons we will only use TLS encrypted connections, so that the password to authenticate against MQTT will not transfered in plaintext. The part for MQTT looks like:
mqtt:
broker: mqtt.example.com
# port: 8883
protocol: mqtts
user: <username>
password: <password>
client_id: mqtt-exporter-shellies
topic_paths:
- shellies/#
- shelly-ht/#
device_id_regex: ".*?/(?P<deviceid>.*?)/.*"
metric_per_topic_regex: ".*/(?P<metricname>.*)"
qos: 0
The variables have the following meaning:
broker
defines the hostname or IP address under which the MQTT broker is reachable.port
andprotocol
define on which port with which protocol the MQTT broker is listening. The default port1883
is unecrypted and impliesmqtt
as protocol, port8883
is the default port for TLS encrypted connections and usesmqtts
as protocol. Same the other way around:mqtt
implies port1883
if not otherwise specified andmqtts
implies port8883
if not otherwise specified.user
is the username for authentication with the MQTT broker. It’s also possible to specify it via the environment variable MQTT_BROKER.password
is the password for authentication with the MQTT broker. For security reason it is adviced to not write that password in this configuration file, but specify it as environment variable MQTT_PASSWORD.client_id
is the unique client ID. If not specified, hostname-PID will be used.topic_paths
defines the tpic paths to subscribe to. This can be more than one, but you can only specify onedevice_id_regex
andmetric_per_topic_regex
. This must fit for all subscribed topics. If this is not possible, an own MQTT exporter instance needs to be started for every topic.device_id_regex
is a regular expression to extract the device ID from the topic path. The default regular expression, assumes that the last “element” of the topic_path is the device id. The regular expression must contain a named capture group with the name deviceid. For example the expression for tasamota based sensors is “tasmota/discovery/(?P.)/.”.metric_per_topic_regex
is the regular expression used to extract the metric name from the topic. Must contain a named group formetricname
.qos
defines the MQTT QoS level, default is0
, which is good enough for this use case.
Connect to InfluxDB
MQTT Exporter works with InfluxDB >= 1.8 and InfluxDB 2.x. If InfluxDB 2.x is used and the token has permissions to create new buckets, MQTT Exporter can create the bucket if it does not exist.
influxdb:
server: influxdb.example.com
database: shellies
organization: my-org
#token: <token>
server
is the hostname or IP address of the InfluxDB.database
is the database or bucket, into which MQTT Exporter should write the dataorganization
is only used with InfluxDB 2 and specifies where the bucket can be found or should be created.token
is the authentication token to connect with InfluxDB. For InfluxDB 1.x it is username:password, for InfluxDB 2.x this is a token generated in the UI or commandline tool and specifies which access rights the user has. For security reasons, to avoid that the token is accidently commited to a public git repo or something similar, the token can also be specified via the environment variableINFLUXDB_TOKEN
. This should be preferred method.
Metrics mapping
This section defines which MQTT metrics/topics will be exported, in which format and what the key in the database will be. Only metrics listed here will be exported. The first few examples assume that the MQTT message is just one single value. The case where a MQTT message is a json struct is handled below.
Sinle value MQTT message
metrics:
- mqtt_name: power
name: power
unit: Watt
type: float
mqtt_name
and type
are required fields. name
is optional and will be set to mqtt_name
if not specified. unit
is optional and is stored as tag["unit"]
.
mqtt_name
will be compared with the metricname
of metric_per_topic_regex
. If they match, this entry is used to store the value of the MQTT topic in InfluxDB in the format specified by type
. Valid types are float, int and string. name
specifies the key under which the value is stored.
Mapping of strings to integers
Sometimes it’s easier to compare simple integer values instead of strings in the query language, especially if you have e.g. a switch, which can only have the values “on” and “off”. In this case you can specify a mapping of strings to integers with string_value_mapping
instead of specifing type
. Beside the map you can specify an error value, which is used if the value of the MQTT topic doesn’t match any string in the mapping.
- mqtt_name: 0
name: switch
string_value_mapping:
map:
off: 0
low: 1
on: 2
error_value: -1
MQTT json message
In the case the clients send arbitrary JSON messages on the topic we need a way to specify the topic name and the “way” inside the json struct to the value.
As example:
shelly-ht/shelly-plus-ht-01/events/rpc {
"src": "shellyplusht-08b61fce63c4",
"dst": "shelly-ht/shelly-plus-ht-01/events",
"method": "NotifyFullStatus",
"params": {
"ts": 1674406876.85,
"ble": {},
"cloud": {
"connected": false
},
"devicepower:0": {
"id": 0,
"battery": {
"V": 6.16,
"percent": 100
},
"external": {
"present": false
}
},
"ht_ui": {},
"humidity:0": {
"id": 0,
"rh": 56.9
},
"mqtt": {
"connected": true
},
"sys": {
"mac": "08B61FCE63C4",
"restart_required": false,
"time": null,
"unixtime": null,
"uptime": 1,
"ram_size": 235504,
"ram_free": 165344,
"fs_size": 458752,
"fs_free": 131072,
"cfg_rev": 16,
"kvs_rev": 0,
"webhook_rev": 0,
"available_updates": {},
"wakeup_reason": {
"boot": "deepsleep_wake",
"cause": "periodic"
},
"wakeup_period": 7200
},
"temperature:0": {
"id": 0,
"tC": 19.9,
"tF": 67.7
},
"wifi": {
"sta_ip": "172.17.0.80",
"status": "got ip",
"ssid": "my-wifi",
"rssi": -67
},
"ws": {
"connected": false
}
}
}
Assume we want now the temperature in Celsius from the above struct, this means we have a MQTT topic shelly-ht/shelly-plus-ht-01/events/rpc
which leads to the metricname rpc
.
Inside JSON struct we want to have the value of the key tC
of temperature:0
which can be found in params
.
MQTT Exporter uses gojsonq to find the key, please look at that examples for more informations how to construct the search. In short: the different parts of the json path are seperated with a dot: .
The mqtt_name
has now the form of: metricname.json.path
.
In our concrete example, this would be rpc
for the metricname and params.temperature:0.tC
. The metricname and json path are also seperated by a dot. The resulting mqtt_name
for this example would be: rpc.params.temperature:0.tC
- mqtt_name: rpc.params.temperature:0.tC
name: temperature
unit: C
type: float
Additional tags
A list of additional tags can be specified in the form “key: value”. They will always be added if the metricname matches this rule.
- mqtt_name: rpc.params.devicepower:0.battery.percent
name: battery_voltage
unit: "%"
type: float
const_tags:
device: "Shelly H&T"
battery: "Type C"
Full example configuration file
Configuration for Shelly devices
This is the full configuration file I use for my Shelly devices:
mqtt:
broker: mqtt.example.com
# port: 8883
protocol: mqtts
user: <username>
password: <password>
client_id: mqtt-exporter-shellies
topic_paths:
- shellies/#
- shelly-ht/#
device_id_regex: ".*?/(?P<deviceid>.*?)/.*"
metric_per_topic_regex: ".*/(?P<metricname>.*)"
qos: 0
influxdb:
server: influxdb.example.com
database: shellies
organization: my-org
#token: <token>
metrics:
# The first metrics are for the Shelly Plug S
- mqtt_name: temperature
name: temperature
unit: C
type: float
- mqtt_name: power
name: power
unit: Watt
type: float
- mqtt_name: energy
name: energy
unit: Watt/Minute
type: int
- mqtt_name: 0
name: switch
string_value_mapping:
map:
off: 0
low: 1
on: 2
error_value: -1
- mqtt_name: info.update.has_update
name: firmware_update
unit: Boolean
string_value_mapping:
map:
true: 1
false: 0
error_value: -1
- mqtt_name: info.update.old_version
name: current_firmware_version
unit: Version
type: string
- mqtt_name: info.wifi_sta.ip
name: ipaddress
type: string
# the following metrics are for the Shelly Plus H&T
- mqtt_name: rpc.params.temperature:0.tC
name: temperature
unit: C
type: float
- mqtt_name: rpc.params.humidity:0.rh
name: humidity
unit: rh
type: float
- mqtt_name: rpc.params.devicepower:0.battery.V
name: battery_voltage
unit: Volt
type: float
- mqtt_name: rpc.params.devicepower:0.battery.percent
name: battery_voltage
unit: "%"
type: float
- mqtt_name: rpc.params.wifi.sta_ip
name: ip_address
type: string
- mqtt_name: humidity
name: humidity
unit: rh
type: float
- mqtt_name: battery
name: battery
unit: "%"
type: int
health_check: ":8080"
Configuration for Sonoff SNZB-02 devices
This is an example configuration file for Sonoff SNZB-02 devices connected via Zigbee2MQTT. I changed the device name to something readable following the scheme SONOFF-SNZB-02-X
, where X is a consecutive number. The metric name for the measurement variable in InfluxDB is SONOFF-SNZB-02
, without the consecutive number.
The MQTT topic used by the Sonoff devices is zigbee2mqtt/SONOFF-SNZB-02-X
, an example message looks like:
zigbee2mqtt/SONOFF-SNZB-02-2 {
"battery": 100,
"humidity": 64.87,
"linkquality": 244,
"temperature": 6.6,
"voltage": 3000
}
The configuration file:
mqtt:
broker: broker.example.com
port: 8883
topic_paths:
- zigbee2mqtt/#
device_id_regex: "zigbee2mqtt/(?P<deviceid>.*)"
metric_per_topic_regex: ".*/(?P<metricname>.*)-[0-9]"
qos: 0
influxdb:
server: influxdb.example.com
database: zigbee
organization: my-org
metrics:
- mqtt_name: SONOFF-SNZB-02.temperature
name: temperature
type: float
- mqtt_name: SONOFF-SNZB-02.humidity
name: humidity
type: float
- mqtt_name: SONOFF-SNZB-02.battery
name: battery
type: float
- mqtt_name: SONOFF-SNZB-02.linkquality
name: linkquality
type: int
- mqtt_name: SONOFF-SNZB-02.voltage
name: voltage
type: int
Health Check
Liveness and readiness health checks are needed if the service runs in Kubernetes. The livness probe tells kubernetes that the application is alive, if the service does not answer, the service will be restarted. The readiness probe tells kubernetes, when the container is ready to serve traffic.
The endpoints are:
- IP:Port/healthz for the liveness probe
- IP:Port/readyz for the readiness probe
The IP:Port will be defined with the health_check
option in the configuration file. If this config variable is not set, the health check stay disabled.
Example:
health_check: ":8080"