Getting Started with Kafka, Telegraf, and InfluxDB v3

In the world of smart gardening, keeping track of environmental conditions like humidity, temperature, wind, and soil moisture is key to ensuring your plants thrive. But how do you bring all this data together in an efficient and scalable way? Enter the powerful trio of Kafka, Telegraf, and InfluxDB Cloud v3. In this guide, we’ll walk you through setting up a seamless pipeline that collects real-time data from garden sensors, streams it through Kafka, and uses Telegraf to write it to InfluxDB for monitoring and analysis. Whether you’re new to these tools or looking to expand your IoT toolkit, this example will show you how to get started. The corresponding repo for this tutorial can be found here.

Requirements and Run

Before we dive into the setup, there are a few requirements to have in place. First, you’ll need Docker and Docker Compose installed on your system, as the example relies on containerized services to simplify deployment. You should also have an InfluxDB Cloud v3 account, with your URL, token, organization, and bucket information readily available. These details will be crucial for configuring Telegraf to write garden sensor data to InfluxDB. Additionally, ensure you have Python installed, as the garden sensor gateway script relies on Python’s Kafka package to simulate and send sensor data. Finally, familiarity with the basic concepts of Kafka, Telegraf, and InfluxDB will help you follow along more easily.

To run this example, follow these steps:

  1. Clone the project and navigate to the directory.
  2. Open the resources/mytelegraf.conf file and insert your InfluxDB Cloud v3 URL, token, organization, and bucket name. You can also use environment files if you desire instead.
  3. Start the containers by changing into the resources directory and running docker-compose up --build -d (the full sequence of commands is sketched after this list).
  4. Wait approximately 30 seconds for Telegraf to initialize and begin writing metrics.
  5. Once everything is up and running, the garden sensor gateway will start generating random humidity, temperature, wind, and soil data, sending it through Kafka, and storing it in your InfluxDB Cloud v3 instance for monitoring and analysis.
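
If you’d like to see the whole sequence in one place, it looks roughly like this. The repository URL and directory name below are placeholders; use the link to the repo above.

# Clone the project (substitute the actual repository URL from the link above)
git clone <repository-url>
cd <repository-directory>/resources

# Fill in your InfluxDB Cloud v3 URL, token, organization, and bucket in mytelegraf.conf,
# then build and start Kafka, Zookeeper, Telegraf, and the garden sensor gateway
docker-compose up --build -d

# Optional: follow Telegraf's logs (assuming the service is named telegraf in docker-compose.yml)
docker-compose logs -f telegraf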

Code Explained

In this section, we’ll break down the example’s components and explain how each piece fits together to create a seamless data pipeline for monitoring garden sensor data using Kafka, Telegraf, and InfluxDB Cloud v3.

1. app/Dockerfile

The Dockerfile in the app directory is responsible for creating a containerized environment to run the garden_sensor_gateway.py script.
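
The repo’s actual Dockerfile may differ, but a minimal sketch of such a file could look like this; the Python base image tag and the kafka-python dependency line are assumptions, not the repo’s exact choices.

# Minimal sketch of app/Dockerfile; base image and dependency handling are illustrative
FROM python:3.11-slim

WORKDIR /app

# kafka-python provides the KafkaProducer used by the script
RUN pip install --no-cache-dir kafka-python

COPY garden_sensor_gateway.py .

CMD ["python", "garden_sensor_gateway.py"]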

2. app/garden_sensor_gateway.py

import time
import json
import random

from kafka import KafkaProducer

def random_temp_cels():
    return round(random.uniform(-10, 50), 1)

def random_humidity():
    return round(random.uniform(0, 100), 1)

def random_wind():
    return round(random.uniform(0, 10), 1)

def random_soil():
    return round(random.uniform(0, 100), 1)

def get_json_data():
    data = {}

    data["temperature"] = random_temp_cels()
    data["humidity"] = random_humidity()
    data["wind"] = random_wind()
    data["soil"] = random_soil()

    return json.dumps(data) 

def main():
    # Connect to the Kafka broker running in the kafka container
    producer = KafkaProducer(bootstrap_servers=['kafka:9092'])

    # Generate a reading, publish it to the garden_sensor_data topic, then wait 5 seconds
    for _ in range(20000):
        json_data = get_json_data()
        producer.send('garden_sensor_data', bytes(f'{json_data}','UTF-8'))
        print(f"Sensor data is sent: {json_data}")
        time.sleep(5)

if __name__ == "__main__":
    main()

This Python script simulates garden sensor data and sends it to a Kafka topic. Let’s look at how it works:

  • Importing Libraries: The script imports necessary libraries like time, json, random, and KafkaProducer from the kafka-python package.
  • Data Generation Functions: Functions like random_temp_cels(), random_humidity(), random_wind(), and random_soil() generate random values for temperature, humidity, wind, and soil moisture, respectively. These values are rounded to one decimal place to simulate realistic sensor readings.
  • Data Formatting: The get_json_data() function collects these generated values into a dictionary and converts it into a JSON string using json.dumps(data).
  • Kafka Producer: The main() function initializes a Kafka producer with KafkaProducer(bootstrap_servers=['kafka:9092']), pointing it to the Kafka broker running in the container. It then enters a loop where it generates sensor data, sends it to the Kafka topic garden_sensor_data, and prints the data to the console. The loop runs 20,000 times, with a 5-second delay between each iteration. A quick way to confirm these messages are landing on the topic is sketched after this list.
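
If you want to verify that the messages are actually arriving on the topic, a small kafka-python consumer run from your host machine works well. This is a sketch, not part of the repo, and it assumes Kafka’s listener is reachable from the host on localhost:9092, which depends on how the compose file exposes the broker.

from kafka import KafkaConsumer

# Assumes the broker is reachable from the host on localhost:9092; adjust to match
# the listener configuration in docker-compose.yml
consumer = KafkaConsumer(
    'garden_sensor_data',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',                   # start from messages already on the topic
    value_deserializer=lambda m: m.decode('utf-8'),
)

for message in consumer:
    print(message.value)                            # e.g. {"temperature": 23.4, "humidity": 61.2, ...}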

3. resources/docker-compose.yml

The docker-compose.yml file in the resources directory defines the services required for the project, orchestrating the containers for Kafka, Zookeeper, Telegraf, and the garden sensor gateway. Here’s what each service does (an illustrative fragment of the file follows the list):

  • Kafka and Zookeeper: These services set up the Kafka broker and Zookeeper, which Kafka relies on for distributed coordination. Kafka is exposed on port 9092, and Zookeeper on port 2181.
  • Garden Sensor Gateway: This service builds the container for the garden_sensor_gateway.py script using the Dockerfile in the app directory. It depends on Kafka to ensure that Kafka is up and healthy before the script starts running.
  • Telegraf: The Telegraf service is configured to consume messages from the Kafka topic garden_sensor_data and write them to InfluxDB Cloud v3. The Telegraf configuration file, mytelegraf.conf, is mounted into the container to provide the necessary settings.
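
The repo’s docker-compose.yml contains the full definitions (images, ports, environment variables, and healthchecks), but an illustrative fragment of the gateway and Telegraf services might look like this; the build path and mount target are assumptions based on the layout described above.

# Illustrative fragment only; the kafka and zookeeper services are defined alongside these
services:
  garden_sensor_gateway:
    build: ../app                                  # builds the Dockerfile in the app directory
    depends_on:
      kafka:
        condition: service_healthy                 # wait for the kafka service's healthcheck

  telegraf:
    image: telegraf
    volumes:
      # Mount the Telegraf configuration into the container's default config path
      - ./mytelegraf.conf:/etc/telegraf/telegraf.conf:ro
    depends_on:
      - kafka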

4. resources/mytelegraf.conf

[[inputs.kafka_consumer]]
  ## Kafka brokers.
  brokers = ["kafka:9092"]
  ## Topics to consume.
  topics = ["garden_sensor_data"]
  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "json"

This configuration file is where Telegraf is set up to process the garden sensor data:

  • InfluxDB Output: The [[outputs.influxdb_v2]] section configures Telegraf to write data to InfluxDB Cloud v3. You must replace the placeholders with your InfluxDB URL, token, organization, and bucket details (a sketch of this section follows the list).
  • Kafka Consumer Input: The [[inputs.kafka_consumer]] section configures Telegraf to subscribe to the garden_sensor_data topic on Kafka. It consumes the JSON-formatted sensor data, which is sent to InfluxDB for storage and analysis.
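
That output section isn’t shown in the snippet above, but with Telegraf’s influxdb_v2 output plugin it typically looks like the following sketch; the placeholder values are the details you filled in during setup.

[[outputs.influxdb_v2]]
  ## The URL of your InfluxDB Cloud v3 instance.
  urls = ["https://your-influxdb-cloud-url"]
  ## API token used to authenticate.
  token = "your-influxdb-token"
  ## Organization to write to.
  organization = "your-organization"
  ## Bucket to write to.
  bucket = "your-bucket"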

Together, these components create a robust pipeline where garden sensor data is generated, sent to Kafka, processed by Telegraf, and stored in InfluxDB Cloud v3, allowing you to monitor your garden’s environment in real time.

Conclusion

This blog post describes how to start using InfluxDB, Kafka, and Telegraf together: a Python script generates garden sensor data and sends it to a Kafka topic, and Telegraf reads the data from that topic and writes it to InfluxDB. As always, get started with InfluxDB Cloud v3 here. In the next post, we’ll dive deeper into the architecture and logic and discuss some of the pros and cons of the selected stack. If you need help, please contact us on our community site or Slack channel. If you are also working on a data pipelining project with InfluxDB, I’d love to hear from you!