Getting Started with MQTT, Telegraf, and InfluxDB Cloud v3


If you’re looking to dive into the world of IoT data collection, you’ve probably come across MQTT, a lightweight publish/subscribe messaging protocol designed for efficiently transmitting data between devices. In this post, we’ll walk through how to use Telegraf, a plugin-driven agent for collecting, processing, and writing metrics, to gather data from an MQTT broker (we’ll be using Mosquitto, a popular open-source MQTT broker) and send it to InfluxDB Cloud v3, a time series database built for storing and analyzing real-time data. Our example uses simulated emergency generator data (temperature, load, power, and fuel level). Whether you’re tracking smart home devices or monitoring anything else that talks MQTT, we’ve got you covered. Let’s get started! The corresponding repo for this blog post can be found here.

A screenshot of fuel values for three generators from the InfluxDB Cloud Serverless UI, collected by Telegraf with the MQTT Consumer Plugin.

Requirements and running the simulator

There are multiple ways to run the examples in this repository, but the recommended and most efficient method is using Docker. Docker lets you easily spin up the simulator without worrying about manual dependency installation. To use Docker, follow these steps:

  1. Clone the repository and change into it:

git clone https://github.com/InfluxCommunity/MQTT_Simulators.git
cd MQTT_Simulators

  2. Add your InfluxDB credentials (URL, token, and bucket) to telegraf/telegraf.conf.
  3. Build the Docker image:

docker build emergency_generator/. -t emergency-generator:latest

  4. Finally, deploy the simulator with Docker Compose:

docker-compose up -d

Those who prefer to run the simulator locally without Docker must manually install the Mosquitto MQTT broker and set up the Python environment.

  1. Install Mosquitto on your machine (these commands use the Mosquitto PPA on Ubuntu):
    sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppa
    sudo apt-get update
    sudo apt-get install mosquitto

  2. Start the Mosquitto broker:
    sudo systemctl enable mosquitto
    sudo systemctl start mosquitto

  3. Clone the repository and navigate to the simulator:

git clone https://github.com/InfluxCommunity/MQTT_Simulators.git
cd MQTT_Simulators/emergency_generator

  4. Install the required Python packages:

python3 -m pip install --no-cache-dir -r requirements.txt

  5. Set your environment variables (GENERATORS is the number of simulated generators; BROKER is the broker’s hostname):

export GENERATORS=3
export BROKER=localhost

  6. Run the simulator:

python3 src/emergency_generator.py

While both methods will get you up and running, Docker simplifies the process, avoiding potential configuration issues and ensuring a consistent environment.
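Whichever method you choose, it can help to confirm that generator messages are reaching the broker before you wire up Telegraf. Here is a minimal subscriber sketch, assuming the paho-mqtt package (2.x callback API) is installed and Mosquitto is reachable at localhost:1883 (true for the local setup; with Docker, only if the compose file publishes that port). The decode_reading helper is our own illustration and not part of the repository.

```python
import json


def decode_reading(payload: bytes) -> dict:
    """Decode one generator message from JSON and check it has an ID."""
    reading = json.loads(payload.decode("utf-8"))
    if not isinstance(reading, dict) or "generatorID" not in reading:
        raise ValueError("payload is not a generator reading")
    return reading


def main(host: str = "localhost", port: int = 1883) -> None:
    # Imported here so decode_reading works even without paho-mqtt installed.
    import paho.mqtt.client as mqtt

    def on_connect(client, userdata, flags, reason_code, properties):
        # Subscribe to every topic, like the Telegraf config (topics = ["#"]).
        client.subscribe("#")

    def on_message(client, userdata, msg):
        try:
            reading = decode_reading(msg.payload)
        except ValueError as err:  # also covers bad JSON and bad UTF-8
            print(f"{msg.topic}: skipped ({err})")
            return
        print(msg.topic, reading["generatorID"], reading.get("fuel"))

    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(host, port)
    client.loop_forever()


if __name__ == "__main__":
    main()
```

Run it with python3 while the simulator is running and you should see one line per message with the topic, generator ID, and fuel level; press Ctrl+C to stop.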

Code walkthrough

The repo contains a couple of examples, but we’ll focus on the emergency_generator example. We’ll break down the core components of the simulator, the MQTT publisher, the Telegraf configuration, and the Mosquitto configuration to give you a clear understanding of how everything fits together. By the end of this walkthrough, you’ll have a solid grasp of how the code works and how to adapt it to your use case.

  1. emergency_generator/src/emergency_generator.py

This script simulates emergency power generators, generating data such as temperature, load, power, and fuel level. The key components are:

  • emergency_generator class: Represents a single generator. It initializes the generator with an ID, a location (random U.S. coordinates generated with Faker), and a base fuel level, then calculates the load and derives temperature, power, and fuel usage from it. Each generator reports its performance through the returnTemperature, returnPower, and returnFuelLevel methods.
  • runEmergencyGenerator function: This function creates an instance of the emergency_generator class, connects to an MQTT broker, and continuously publishes generator health data to the broker at a random interval (between 5 and 15 seconds). The data is published under the emergency_generator topic, with the generator ID appended by publish_to_topic (described below).
  • Threading: The script can run multiple generator instances concurrently using Python’s threading module. The GENERATORS environment variable controls the number of generators, and each thread runs a separate generator, continuously publishing its status.
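To make the structure above concrete, here is a simplified, hypothetical re-creation of the simulator loop. The class name EmergencyGenerator, the formulas tying temperature, power, and fuel burn to load, the base fuel level of 1000, and the default of three generators are all assumptions for illustration. It also swaps Faker for a rough U.S. bounding box drawn with random, runs a fixed number of iterations instead of forever, and takes a publish callback (here print) instead of an MQTT connection, so it runs without a broker.

```python
import os
import random
import threading
import time
from typing import Callable, Optional


class EmergencyGenerator:
    """Hypothetical, simplified stand-in for the repo's emergency_generator class."""

    def __init__(self, generator_id: str, rng: random.Random) -> None:
        self.generator_id = generator_id
        self.rng = rng
        # Assumption: rough continental U.S. bounding box instead of Faker.
        self.lat = round(rng.uniform(25.0, 49.0), 5)
        self.lon = round(rng.uniform(-124.0, -67.0), 5)
        self.fuel = 1000  # assumed base fuel level
        self.load = 0

    def step(self) -> dict:
        """Pick a new load, then derive temperature, power, and fuel from it."""
        self.load = self.rng.randint(0, 10)
        # Assumed formulas: higher load means hotter, more power, more fuel burned.
        temperature = 150 + 5 * self.load
        power = 100 + 20 * self.load
        self.fuel = max(0, self.fuel - self.load)
        return {
            "generatorID": self.generator_id,
            "lat": self.lat,
            "lon": self.lon,
            "temperature": temperature,
            "power": power,
            "load": self.load,
            "fuel": self.fuel,
        }


def run_generator(generator_id: str, publish: Callable[[dict], None],
                  iterations: int, seed: Optional[int] = None,
                  sleep: Callable[[float], None] = time.sleep) -> None:
    """Publish readings at a random 5 to 15 second interval."""
    rng = random.Random(seed)
    generator = EmergencyGenerator(generator_id, rng)
    for _ in range(iterations):
        publish(generator.step())
        sleep(rng.uniform(5, 15))


def main() -> None:
    # One thread per generator; GENERATORS controls how many run.
    count = int(os.environ.get("GENERATORS", "3"))
    threads = [
        threading.Thread(target=run_generator, args=(f"generator{i}", print, 3))
        for i in range(1, count + 1)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


if __name__ == "__main__":
    main()
```

Injecting publish and sleep keeps the generator logic separate from transport and timing, which is the same split the repo makes between the simulator and mqtt_producer.py.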

  2. emergency_generator/src/mqtt_producer.py

This script handles publishing data to the MQTT broker:

  • mqtt_publisher class: This class connects to an MQTT broker and provides methods for sending (publishing) data to MQTT topics. It can connect to the broker using a standard or secure connection (with username and password).
  • publish_to_topic method: This method formats and publishes the generator data to a specified MQTT topic, appending the generator ID to the topic name. The message is serialized to JSON before being published.
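Here is a minimal sketch of the publish_to_topic idea. It assumes the generator ID is joined to the base topic with a "/" (the repo’s exact topic layout may differ), and the class and parameter names are hypothetical. The MQTT client is passed in, so any object with a paho-mqtt style publish(topic, payload, qos=...) method works, which also makes the logic easy to test without a broker.

```python
import json
from typing import Any, Protocol


class Publisher(Protocol):
    """Anything with a paho-mqtt style publish method."""

    def publish(self, topic: str, payload: str, qos: int = 0) -> Any: ...


class MqttPublisher:
    """Hypothetical wrapper mirroring the repo's mqtt_publisher class."""

    def __init__(self, client: Publisher, base_topic: str = "emergency_generator") -> None:
        self.client = client
        self.base_topic = base_topic

    def publish_to_topic(self, data: dict, qos: int = 0) -> tuple[str, str]:
        # Assumed layout: <base_topic>/<generatorID>, e.g. emergency_generator/generator1
        topic = f"{self.base_topic}/{data['generatorID']}"
        payload = json.dumps(data)  # serialize the reading to JSON
        self.client.publish(topic, payload, qos=qos)
        return topic, payload
```

With paho-mqtt, you would create a client, optionally call username_pw_set() and tls_set() for an authenticated, encrypted connection, connect it, and hand it to MqttPublisher.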

  3. telegraf/telegraf.conf

This configuration file tells Telegraf, a metrics collection agent, to subscribe to the MQTT broker and write the data it receives to InfluxDB.

  • [[inputs.mqtt_consumer]]: This input plugin subscribes to MQTT topics (in this case, #, which subscribes to all topics). The data is expected in JSON format, and it processes incoming MQTT messages with the json_v2 parser to extract generator data (e.g., generatorID).
  • [[outputs.influxdb_v2]]: This output plugin configures Telegraf to send the collected data to an InfluxDB instance, specifying the URL, authentication token, and destination bucket where the data will be stored.
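For a sense of what the influxdb_v2 output does with those settings: it sends line protocol in HTTP POST requests to InfluxDB’s /api/v2/write endpoint, with the token in an Authorization header and the organization and bucket as query parameters. The sketch below builds (but does not send) an equivalent request using only Python’s standard library; the URL, token, org, and bucket values are placeholders, and Telegraf itself adds details such as batching and compression.

```python
import urllib.parse
import urllib.request


def build_write_request(url: str, token: str, org: str, bucket: str,
                        lines: list[str]) -> urllib.request.Request:
    """Build (but do not send) a v2-style line protocol write request."""
    query = urllib.parse.urlencode({"org": org, "bucket": bucket, "precision": "ns"})
    endpoint = f"{url.rstrip('/')}/api/v2/write?{query}"
    body = "\n".join(lines).encode("utf-8")  # one point per line
    return urllib.request.Request(
        endpoint,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Token {token}",
            "Content-Type": "text/plain; charset=utf-8",
        },
    )
```

Passing the request to urllib.request.urlopen() would send it; a successful write returns HTTP 204 No Content.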

  4. mosquitto/mosquitto.conf

This is the configuration file for the Mosquitto MQTT broker.

  • allow_anonymous true: Allows anonymous connections to the broker, meaning clients can publish and subscribe without authentication.
  • listener 1883: Configures the broker to listen on port 1883 for MQTT connections. Comments in the file note that during initial setup, you can use this unencrypted listener to generate certificates for secure communication; once you have the certificates, you can disable the unencrypted listener.

Leveraging Telegraf

Telegraf is an excellent tool for collecting IoT data with MQTT because it’s lightweight, highly customizable, and supports real-time data ingestion. With its native mqtt_consumer plugin, Telegraf can easily subscribe to MQTT topics, process messages in various formats (like JSON), and directly send the data to InfluxDB or other databases. Its ability to handle large volumes of data efficiently, combined with minimal resource overhead, makes it ideal for IoT environments where low latency, scalability, and flexibility are crucial. Additionally, Telegraf’s vast library of plugins allows seamless integration with other systems, enhancing IoT data monitoring and analysis.

Telegraf plays a crucial role in ingesting data from MQTT brokers and transforming it into a format that can be written to InfluxDB. Let’s break down how you can leverage Telegraf to handle generator data in JSON format using the following payload:

{
  "generatorID": "generator1",
  "lat": 40.68066,
  "lon": -73.47429,
  "temperature": 186,
  "power": 186,
  "load": 2,
  "fuel": 277
}

To capture this data, we configure the Telegraf mqtt_consumer plugin. This plugin subscribes to topics on an MQTT broker and listens for incoming messages. Here’s the configuration that handles the payload and extracts key fields:

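[[inputs.mqtt_consumer]]
  ## Address of the MQTT broker
  servers = ["tcp://mosquitto:1883"]
  ## "#" subscribes to every topic on the broker
  topics = ["#"]
  qos = 2
  connection_timeout = "30s"
  data_format = "json_v2"

  [[inputs.mqtt_consumer.json_v2]]
    ## Every point is written to the genData measurement
    measurement_name = "genData"
    [[inputs.mqtt_consumer.json_v2.object]]
      ## "@this" selects the whole JSON payload
      path = "@this"
      disable_prepend_keys = true
      ## Store generatorID as a tag; the other keys become fields
      tags = ["generatorID"]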

In this configuration:

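  • servers defines the address of your MQTT broker. Here, mosquitto is the broker’s hostname inside the Docker Compose setup; if you run Mosquitto and Telegraf outside Docker, point this at tcp://localhost:1883 instead.
  • topics is set to #, meaning we subscribe to all topics. You can specify more precise topics if needed.
  • qos = 2 requests MQTT’s highest quality-of-service level (exactly-once delivery), and connection_timeout sets how long Telegraf waits when connecting to the broker.
  • data_format is set to json_v2, which tells Telegraf to parse each payload with its json_v2 JSON parser.
  • Inside the JSON parsing section, we define measurement_name as “genData,” meaning all data from this input will be written to InfluxDB under that measurement.
  • The json_v2.object section parses the entire JSON object as is: path = "@this" selects the root of the document, disable_prepend_keys = true keeps field names from being prefixed with a parent key, and tags = ["generatorID"] stores generatorID as a tag for better filtering and querying in InfluxDB. The remaining keys (lat, lon, temperature, power, load, and fuel) become fields.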
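Because topics = ["#"] matches everything, it helps to know how MQTT topic filters behave when you narrow the subscription: + matches exactly one topic level, # matches the remaining levels (including the parent level) and must come last, and filters that start with a wildcard do not match topics beginning with $, such as the broker’s $SYS statistics. The function below is a minimal matcher written for illustration; the broker does this matching for you, the function assumes the filter is already well formed, and the emergency_generator/... topic names are assumptions about the repo’s layout.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic name matches a (well-formed) subscription filter."""
    # A filter starting with a wildcard never matches $-prefixed topics (e.g. $SYS/...).
    if topic.startswith("$") and topic_filter[:1] in ("#", "+"):
        return False
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches this level and everything below it,
            # including the parent itself (so "a/#" matches "a").
            return True
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # Without "#", the filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)
```

For example, a filter of emergency_generator/+ would match one level of generator IDs under emergency_generator, while # also picks up any unrelated topics on the broker.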

Using this configuration, Telegraf efficiently ingests real-time generator data, transforming it into a structured format ready for storage and analysis in InfluxDB.
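To make that structured format concrete: each message becomes one point in the genData measurement, with generatorID as a tag and the remaining keys as fields. The function below is a simplified approximation of that mapping, written as InfluxDB line protocol; it is not Telegraf’s serializer, so field order, numeric types (this sketch writes every number as a float), and timestamps in real output may differ.

```python
import time
from typing import Optional


def _esc(text: str, chars: str) -> str:
    """Backslash-escape each character in chars (line protocol escaping)."""
    for ch in chars:
        text = text.replace(ch, "\\" + ch)
    return text


def _field_value(value) -> str:
    """Format a field value; this sketch writes every number as a float."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "true" if value else "false"
    if isinstance(value, (int, float)):
        return repr(float(value))
    text = str(value).replace("\\", "\\\\").replace('"', '\\"')
    return f'"{text}"'


def to_line_protocol(payload: dict, measurement: str, tag_keys: list,
                     timestamp_ns: Optional[int] = None) -> str:
    """Map one JSON payload to a line protocol point (simplified)."""
    # Tag keys/values and field keys escape commas, spaces, and equals signs.
    tags = ",".join(
        f"{_esc(key, ', =')}={_esc(str(payload[key]), ', =')}"
        for key in sorted(tag_keys) if key in payload
    )
    fields = ",".join(
        f"{_esc(key, ', =')}={_field_value(value)}"
        for key, value in sorted(payload.items()) if key not in tag_keys
    )
    if not fields:
        raise ValueError("a point needs at least one field")
    head = _esc(measurement, ", ")  # measurement escapes commas and spaces
    if tags:
        head += "," + tags
    ts = time.time_ns() if timestamp_ns is None else timestamp_ns
    return f"{head} {fields} {ts}"
```

For the sample payload above with the tag key generatorID, this yields genData,generatorID=generator1 fuel=277.0,lat=40.68066,load=2.0,lon=-73.47429,power=186.0,temperature=186.0 followed by the timestamp.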

Final thoughts

As always, you can get started with InfluxDB Cloud v3 and Telegraf. If you need help, please contact us on our community site or Slack channel. If you are also working on an IoT project with MQTT, Telegraf, or InfluxDB, we’d love to hear from you. I encourage you to share your use case with us and get a hoodie!