MQTT Topic and Payload Parsing with Telegraf
By Sam Dillard / Dec 20, 2021
Buckle up, this one isn’t short…but I’m hoping it will be thoroughly informative! This post is about Telegraf as a consumer of MQTT messages in the context of writing them to InfluxDB. If you are interested in and unfamiliar with Telegraf, you can view docs here. Unsure if Telegraf aligns with your needs? I make a case for it in the Optimizing Writes section of this blog post.
It may also help to have an understanding of Line Protocol, InfluxDB’s default accepted format.
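For quick reference, an LP record is a measurement name, optional comma-separated tags, one or more fields, and an optional timestamp. A made-up example:
weather,location=us-midwest temperature=82.0 1640000000000000000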
In this post, I want to impart the efficacy of Telegraf in collecting data over MQTT by a) drilling into a feature of the latest release and b) discussing parsing methodology. As many readers know, MQTT message payloads can be…quite literally whatever you want them to be, so it’s paramount that a subscriber to many topics be able to handle whatever you throw at it.
Telegraf can subscribe to any number of topics while giving its users control over how it shapes incoming data. Controlling data shape allows users to write out data that is not only compatible with the destination data store, but optimized for it as well.
Data shape control in this context comes in two forms, and each will get its own section in this post:
- Topic parsing
- Payload parsing
Housekeeping:
- All Telegraf behavior below assumes a standard [agent]-level configuration with omit_hostname = true (see the snippet after this list)
- Timestamps are omitted in examples
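For reference, that agent setting in the Telegraf configuration file looks like this (all other agent options left at their defaults):
[agent]
  omit_hostname = true   # drop the host tag Telegraf would otherwise add to every metric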
Topic parsing
In Telegraf’s latest release (v1.21), the MQTT Consumer Input Plugin added the ability to parse information out of topics dynamically and append that information to the payload. This means a Telegraf operator can use data from both the MQTT message payload and the incoming topic.
Below is a diagram showing four different MQTT messages being laid out the same way in the InfluxDB data model.
This is possible with the insertion of Telegraf, and the rest of this post explains how.
I'll first explain the problem this solves with examples. Suppose I have a basic sensor emitting a single temperature value to a topic that includes its unique ID; a topic like sensors/00191. The MQTT messages will be sent to this topic with a single raw value; for this example, let's say a sample of float 10.0.
No third-party subscriber to sensors/00191 will know what to do with this temperature value:
- It is not valid LP on its own, so Telegraf will reject it.
- If you help Telegraf with data_format = "value" and data_type = "float", it would generate LP like:
mqtt_consumer,topic=sensors/00191 value=10.0 <timestamp>
(This is the default behavior of input plugins that are given no measurement name.)
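For context, a minimal mqtt_consumer block using the value parser might look like this (the broker address is a placeholder):
[[inputs.mqtt_consumer]]
  servers = ["tcp://localhost:1883"]   # placeholder broker address
  topics = ["sensors/#"]
  data_format = "value"
  data_type = "float"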
Notice that Telegraf adds the whole topic as an LP tag by default. This is Telegraf being helpful so you retain any potentially necessary context. Including the whole topic as a single string, however, would necessitate some regex later to be made useful. The reason it's there is that without it:
- This value field has no meaning to anyone/anything consuming the data.
- Each sensor emitting data like this would overwrite the last sensor's data if written at the same time (tags are needed to differentiate fields like value).
The information needed, in this case, is in the topic. This can happen if a vendor programmed their sensor to emit data this way or if users are working with highly constrained resources and need to keep their payloads as small as possible.
Enter topic parsing! Telegraf solves this problem without needing to store the whole topic as a tag by allowing users to specify which portions of a topic they want to bring in as measurement, tags, or fields. Before this release, this would have been accomplished with the Regex Processor Plugin but the new experience is simpler and more convenient.
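For contrast, the pre-1.21 approach with the Regex Processor Plugin would have looked roughly like this (a sketch only; the pattern and result_key are assumptions for the sensors/00191 example):
[[processors.regex]]
  [[processors.regex.tags]]
    key = "topic"                  # operate on the default topic tag
    pattern = '^sensors/(\d+)$'
    replacement = "${1}"           # keep just the sensor ID
    result_key = "sensor_id"       # store it as a new tag rather than overwriting topic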
A simple configuration change to the mqtt_consumer input plugin will give us what we need:
[[inputs.mqtt_consumer]]
  servers = ["<address:port>"]
  topics = ["sensors/#"]
  data_format = "value"   # the payload is still a single raw float
  data_type = "float"

  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "sensors/+"
    measurement = "measurement/_"
    tags = "_/sensor_id"
The above snippet of Telegraf configuration instantiates a client subscribed to sensors/# (any topic under the sensors parent). If you're familiar with Input Plugins like SNMP or parsers like JSON_v2 (going over this next if not!), you'll recognize the nested TOML table defined by [[inputs.mqtt_consumer.topic_parsing]]. There can be any number of these, but we only need one. In this table, you tell Telegraf which topics to match on, and with the information that follows you tell it how to parse those topics as their messages arrive: what to make a measurement, tag(s), and/or field(s). In this case, we are only pulling out the measurement and a single tag (there isn't much information to be parsed, after all). No field is present…but that won't always be the case.
We use the syntax measurement/_ in a pair of quotes to telegraph (pun intended) to Telegraf which segments of the topic name will be parsed into which LP elements. Keys map to their respective indexed (by segment) positions in the topic, and underscores mark segments that are not relevant to that Line Protocol element (placeholders).
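Lining the example up segment by segment (an illustration, not additional configuration):
topic:          sensors / 00191
measurement  =  "measurement/_"    -> measurement name becomes "sensors"
tags         =  "_/sensor_id"      -> tag sensor_id=00191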
In the example configuration above, we tell Telegraf that the first segment will be our measurement and the second (last) segment will be a tag with key sensor_id. The actual information in that segment will be populated as the value of that tag key. If we now want to take out the default topic tag, we can include a topic_tag = "" parameter in the config and get this output:
sensors,sensor_id=00191 value=10.0 <timestamp>
Modifications like this matter at scale and with more complicated topics and payloads. They also improve the analytics experience downstream…a lot.
Quick note: it's a best practice (though not a requirement) for field names to carry useful information, so "value" is not ideal. To fix this, you can simply use the Pivot Processor (out of scope for this post).
Anyway, speaking of more complicated topics and payloads, let's look at two more quick examples:
Topic: detroit/0f801/42.33/-83.04/front_left/09fse21/rpm
Message: 846
We want our LP output to have all this information in it. The first segment is the city (site) this vehicle belongs to. The second is the vehicle's ID. Third is the latitude at that given sample time. Fourth is the longitude, fifth is the location of the tire on the vehicle, sixth is the tire's ID, and last is the rpm of that tire. All potentially important information!
I won't go into the reasoning for which segments should map to which LP elements; for that, you can refer to the blog post I linked at the top. For this example, we'll be generic with the measurement and let Telegraf name it. Our measurement will be mqtt_consumer (Telegraf can rename this in other plugins if necessary). Every string will be a tag and every numerical element will be a field. Ultimately, we want our resulting LP to look like:
mqtt_consumer,site=detroit,vehicle_id=0f801,orientation=front_left,tire_id=09fse21 lat=42.33,lon=-83.04,rpm=846 <timestamp>
The following relevant [[inputs.mqtt_consumer]] configuration will get us there:
...snipped...
[[inputs.mqtt_consumer.topic_parsing]]
  topic = "+/+/+/+/+/+/+"   # all topics with 7 segments
  tags = "site/vehicle_id/_/_/orientation/tire_id/_"
  fields = "_/_/lat/lon/_/_/rpm"
  [inputs.mqtt_consumer.topic_parsing.types]
    lat = "float"   # topic segments arrive as strings; declare numeric types explicitly
    lon = "float"
    rpm = "int"
There you have it: a concise configuration for pulling topic-encoded information into Line Protocol records from every single topic that follows a 7-segment structure!
Payload parsing!
Parser Plugins are not new to Telegraf, and neither is JSON parsing specifically, but I feel the MQTT consumption topic deserves a drill-down into JSON parsing. First, note that Telegraf offers a list of other parsers as well. Parsers are invoked inside of plugins with the data_format parameter. They are configured slightly differently as their respective formats vary in semantics. For this post, we will focus on JSON parsing.
JSON parsing in Telegraf has been around a long time, but it was revamped in a new version a few releases ago. Just try to throw some JSON at it that it can't parse!
Before I dive in here, if you want a thorough walkthrough of this upgraded parser, this blog post by the Telegraf Product Manager, Samantha Wang, is fantastic. Additionally, if you prefer to parse in the Python dialect, there is another way I wrote about here.
As you may know, MQTT will accept just about any type of payload. JSON is the most common in this context. That said, JSON can also be just about any shape so configurable parsing is paramount!
To bridge the sections, we’ll stick with the last example message but change it up slightly by putting some info that was in the topic into a JSON payload:
Topic: detroit/0f801/09fse21
Message:
{
  "orientation": "front_left",
  "latitude": 42.33,
  "longitude": -83.04,
  "rpm": 846
}
For this, the user will want to parse the topic as it comes just as done before. That configuration will look like:
...snipped...
[[inputs.mqtt_consumer.topic_parsing]]
topic = "+/+/+" # all topics with 3 segments
tags = "site/vehicle_id/tire_id"
For this one, we are keeping the measurement as we had it (Telegraf's default per input) and we are setting no fields. The reason for the latter is that we know we can get our fields from the payload this time. So let's parse it up!
To make JSON parsing flexible enough to handle heavily nested blobs and other complexities, this new JSON_v2 parser uses nested TOML tables and GJSON querying for accessing desired blobs. The above message payload is small and flat so the parser will be heavily underutilized this time.
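To give a sense of what GJSON querying buys you (not needed for our flat payload), here is a hypothetical nested message and the paths that would reach its values:
{ "vehicle": { "id": "0f801", "tires": [ { "rpm": 846 } ] } }

path = "vehicle.id"          -> "0f801"
path = "vehicle.tires.0.rpm" -> 846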
From the combination of the above topic and JSON body, we want the same output that we had in the topic parsing example before it. As a refresher, that LP looked like:
mqtt_consumer,site=detroit,vehicle_id=0f801,orientation=front_left,tire_id=09fse21 lat=42.33,lon=-83.04,rpm=846 <timestamp>
Keeping in mind the shorter topic we just parsed above, here is the accompanying JSON parsing configuration:
...snipped...
data_format = "json_v2" # invokes the parser -- lines following are parser config
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.tag]]
path = "orientation" # GJSON path: JSON is flat -- all keys at root
[[inputs.mqtt_consumer.json_v2.field]]
path = "rpm"
type = "int"
[[inputs.mqtt_consumer.json_v2.field]]
path = "latitude"
rename = "lat"
type = "float"
[[inputs.mqtt_consumer.json_v2.field]]
path = "longitude"
rename = "lon"
type = "float"
That will produce the same result as the example where all the metadata was in the topic itself. Either case is totally within reason.
I can understand if the JSON parsing looks a little verbose for such a simple JSON blob. The verbosity pays off in more complicated cases. For this case, here is an example that produces essentially the same result (field keys keep their JSON names unless renamed) using a different way to parse tags and a different way to parse fields:
...snipped...
data_format = "json_v2" # invokes the parser -- lines following are parser config
[[inputs.mqtt_consumer.json_v2]]
[[inputs.mqtt_consumer.json_v2.object]]
path = "@this"              # GJSON path to the object -- here, the whole (flat) payload
tags = ["orientation"]      # can list all tags -- our case only has one
[inputs.mqtt_consumer.json_v2.object.fields]   # set explicit types; all non-tag keys become fields
rpm = "int"
latitude = "float"
longitude = "float"
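Assuming the same snipped settings as before (including topic_tag = ""), this variant would emit LP along the lines of:
mqtt_consumer,site=detroit,vehicle_id=0f801,tire_id=09fse21,orientation=front_left latitude=42.33,longitude=-83.04,rpm=846 <timestamp>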
That way of defining fields is my favorite part about this parser. So concise! Also notice the object table used. If you just need to define your tags and fields and don't need to do special things with each, these are good methods for keeping your configuration shorter!
Wrapping up
So that's the nuts and bolts. You now know what it looks like to have Telegraf consume from an MQTT broker, subscribe to many topics, and parse data from the topic and payload simultaneously, merging that data into LP records. InfluxDB will love you for this!
If you're at a stage where you're beginning to develop a pipeline of IoT data and MQTT is in the mix, inserting Telegraf as a consumer (or even a producer, if you feel so inclined) to distribute data to your data stores could be a good move. If you're using or planning to use InfluxDB as the time series component of your data store layer, Telegraf is optimally designed for feeding InfluxDB. Regardless of where your data ends up, if you need a relatively simple way to get data of various forms and sources out of an MQTT broker or cluster, Telegraf can do it.