Data Pipelining with InfluxDB
By Anais Dotis-Georgiou / Developer / Nov 21, 2024
In this blog post, we’ll explore how to build a data pipeline using Kafka, Faust, and InfluxDB to effectively ingest, transform, and store data. We’ll start with an overview of Kafka, a high-performance messaging platform, and Faust, a Python library designed for stream processing, now maintained by the community as Faust-streaming. After establishing a foundation in these tools, we’ll demonstrate how to use the Telegraf Kafka Consumer Input Plugin to read data from a Kafka topic and write it to InfluxDB. Finally, I’ll share an example pipeline to tie it all together. Find the repo that accompanies this blog post here. The scratchpad.md contains all of the commands used in this tutorial to make it easier to follow.
A diagram for using InfluxDB with Kafka, Faust, and Telegraf. Data is published to a Kafka topic; Faust reads and transforms the data before publishing it to a new topic; Telegraf tails the new topic and writes the data to InfluxDB.
Note on Faust-streaming:
Faust-streaming, a Python-native stream processing library, integrates well with Kafka and is great for building real-time, event-driven applications. However, it may not match the performance or scalability of Java-based alternatives, and as a community-maintained project, it might lack the robust support and updates seen in more established frameworks.
Requirements
This tutorial assumes that you meet the following requirements:
- InfluxDB Cloud Free Trial or other version of InfluxDB
- Telegraf installed locally
- Docker
- Python package manager like Pipenv
You’ll also need the following InfluxDB resources:
- Bucket
- Token
- Organization ID (The fastest way to find your organization ID is from the URL of your InfluxDB UI, i.e., https://us-east-1-1.aws.cloud2.influxdata.com/orgs/<your org ID is here>)
I recommend using the UI to create these resources because it’s the fastest. You can also use the InfluxDB CLI for resource creation and management, but you have to configure the CLI first.
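If you do opt for the CLI, the one-time configuration looks roughly like this (substitute your own host URL, organization, and token; the UI route skips this step entirely):
influx config create \
  --config-name my-cloud \
  --host-url https://us-east-1-1.aws.cloud2.influxdata.com \
  --org <your org> \
  --token <your token> \
  --active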
To run any of the following examples, you’ll need to follow these steps:
- Clone the repo:
git clone https://github.com/InfluxCommunity/kafka_faust_examples
- Change directories:
cd first
- Spin up a container running Kafka:
docker compose up -d
- Install the dependencies and activate a virtual environment:
pipenv install
pipenv shell
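Before starting any workers, it's worth confirming that the Kafka container actually came up. This quick check isn't part of the original steps, but it can save debugging time later:
docker compose ps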
Hello World example with Kafka and Faust
The “Hello World” example demonstrates the basics of a Faust streaming application. It sets up a simple Faust app that reads events from a Kafka topic and prints each event to the console. This example introduces Faust’s structure, helping you understand how to create, configure, and run a Faust application with minimal setup.
Let’s take a look at the Faust code:
import faust

app = faust.App(
    'hello_world',
    broker='kafka://localhost:9092',
    # Be explicit about using in-memory Table storage
    store='memory://',
    value_serializer='raw',
)

greetings_topic = app.topic('greetings')


@app.agent(greetings_topic)
async def greet(greetings):
    async for greeting in greetings:
        print(greeting)
This Faust application starts by creating an instance named “hello_world.” The application connects to a Kafka broker running locally on port 9092, which is the endpoint for sending and receiving messages. It uses in-memory tables for temporary storage, meaning data won’t persist between sessions, which is helpful for testing or short-lived applications. The messages are handled in their raw form without additional serialization.
The app defines a Kafka topic called “greetings,” which acts as the message channel for this example. An agent, a special kind of coroutine in Faust, is set up to listen to this “greetings” topic continuously. This agent, named “greet,” processes each incoming message asynchronously. Each message received from the “greetings” topic prints to the console.
When this application runs, it connects to Kafka, listens for messages on the specified topic, and processes them as they come in. To see it in action, start the Faust worker for this app:
faust -A hello_world worker -l info
Then send test messages to the “greetings” topic:
faust -A hello_world send greetings "Hello Kafka"
Each test message appears in the console, demonstrating the app’s real-time message processing capability with Kafka and Faust. We can also open an interactive shell inside the Kafka Docker container:
docker exec -it first-kafka-1 /bin/sh
Start a Kafka console consumer to read all messages in the topic from the start, including historical messages:
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic greetings --from-beginning
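From that same shell, you can also publish messages without going through Faust. Assuming the standard Kafka distribution layout inside the container, the console producer works like this:
/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server kafka:9092 --topic greetings
Anything you type at the producer's prompt is published to the greetings topic and appears in the Faust worker's console output.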
Page views example with InfluxDB and Telegraf
The “Page Views” example showcases real-time counting of page views using Faust. It reads page view events from a Kafka topic, aggregates the number of views per page, and stores these counts in an in-memory table. For each page view, the updated count is published to another topic, allowing consumers to access up-to-date view counts for each page in real time. This example highlights Faust’s ability to handle stateful stream processing and perform real-time aggregations. telegraf.conf is configured to consume the JSON messages from the Kafka topic and write them to InfluxDB.
import faust

# Initialize the Faust app
app = faust.App(
    'page_views',
    broker='kafka://localhost:9092',
    topic_partitions=4,
)


# Define the structure of a page view event
class PageView(faust.Record):
    id: str
    user: str


# Topics
page_view_topic = app.topic('page_views', value_type=PageView)
page_view_count_topic = app.topic('page_view_counts')

# Table to maintain the count of views per page
page_views = app.Table('page_views', default=int)


# Agent to consume and process page views
@app.agent(page_view_topic)
async def count_page_views(views):
    async for view in views.group_by(PageView.id):
        # Increment the count for the specific page ID
        page_views[view.id] += 1
        # Create a JSON-like dictionary for the count message
        count_message = {
            "id": view.id,
            "count": page_views[view.id]
        }
        # Send the JSON message to the page_view_count_topic
        await page_view_count_topic.send(value=count_message)


if __name__ == '__main__':
    app.main()
This Faust application, named “page_views,” also connects to a Kafka broker running on localhost:9092 but focuses on tracking and counting user views per page. It defines a PageView record, which represents each event and contains fields for the page ID and user.
Two Kafka topics are created for this app: page_views for incoming view events and page_view_counts for publishing aggregated counts. The application maintains these counts in a temporary in-memory table, where it stores the total view count for each page by ID.
An agent listens to the page_views topic and processes each message by incrementing the count for the specific page ID. Once updated, it constructs a JSON message with the page ID and count, then sends it to the page_view_counts topic asynchronously.
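For reference, each message the agent publishes to page_view_counts is a small JSON document along these lines (values illustrative):
{"id": "foo", "count": 3}
This is exactly the shape Telegraf will parse in the next step.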
Start the Faust worker for this app so it consumes and aggregates page views in real time:
faust -A page_views worker -l info
Then send a test message to the page_views topic in Kafka, which the page_views Faust application is listening to:
faust -A page_views send page_views '{"id": "foo", "user": "bar"}'
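If you'd rather generate a steady stream of events than send them one at a time, a small producer script is an easy option. The sketch below assumes the kafka-python package (not one of this repo's dependencies) and made-up page IDs:
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    # Serialize dicts as JSON so Faust can decode them into PageView records
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

for page_id in ["foo", "bar", "foo", "baz", "foo"]:
    producer.send("page_views", {"id": page_id, "user": "test_user"})

producer.flush()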
Now, we’re ready to start writing aggregated page view counts to InfluxDB with Telegraf. To configure Telegraf, we need to add some authentication credentials to the config and specify how we want to parse the JSON from the topic:
[[inputs.kafka_consumer]]
  brokers = ["localhost:9092"]
  topics = ["page_view_counts"]
  data_format = "json_v2"

  [[inputs.kafka_consumer.json_v2]]
    measurement_name = "view_count"

    [[inputs.kafka_consumer.json_v2.field]]
      path = "id"
      type = "string"

    [[inputs.kafka_consumer.json_v2.field]]
      path = "count"
      type = "int"

[[outputs.influxdb_v2]]
  urls = ["your host url i.e. https://us-east-1-1.aws.cloud2.influxdata.com/"]
  # place your influxdb token and org ID here
  token = "your token"
  organization = "your org ID"
  bucket = "views"
This Telegraf configuration defines a kafka_consumer input plugin to consume JSON messages from the Kafka topic page_view_counts and extract two fields: id as a string and count as an integer, storing them in the InfluxDB measurement view_count. The influxdb_v2 output plugin specifies the destination InfluxDB bucket (views) along with the necessary connection details, including the URL, token, and organization. The full config also writes the line protocol values to stdout for verification.
A diagram of the Telegraf plugins used in the Page Views example.
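The stdout verification mentioned above is typically handled with Telegraf's file output plugin; if your config doesn't already include it, a snippet like this prints each point as line protocol:
[[outputs.file]]
  files = ["stdout"]
  data_format = "influx"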
You can now continue to publish new messages to the topic, watch the count aggregate, and visualize the results in InfluxDB.
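If you're querying from InfluxDB Cloud Serverless (or another v3 flavor with a SQL editor), a query along these lines pulls back the latest counts; the measurement and field names match the Telegraf config above:
SELECT time, id, "count"
FROM view_count
WHERE time >= now() - INTERVAL '1 hour'
ORDER BY time DESC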
Data pipelining for IIoT—is machine learning necessary?
In the Industrial Internet of Things (IIoT), data pipelining is essential for monitoring and controlling complex industrial processes in real-time. A quintessential example is the Continuous Stirred Tank Reactor (CSTR), a chemical reactor widely used across various industries—including chemical manufacturing, pharmaceuticals, food and beverage production, biotechnology, energy, and environmental engineering. CSTRs facilitate continuous reactions where reactants are steadily fed into the reactor, and products are continuously removed, making them vital for processes like polymer production, fermentation, biodiesel synthesis, and wastewater treatment.
In the PID Controllers and InfluxDB Part 1 and Part 2, we create a digital twin of a CSTR integrated with a Proportional-Integral-Derivative (PID) controller. The PID controller serves as a feedback mechanism that continuously adjusts inputs to maintain desired output levels, ensuring stability and precision in the process. We simulate a first-order reaction that converts chemical A into chemical B—akin to real-world reactions like the hydrolysis of esters or ethanol production.
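To give a flavor of what that simulation involves, here is a deliberately stripped-down sketch of a first-order A-to-B conversion integrated with explicit Euler steps. It is not the CSTR/PID digital twin from those posts, just an illustration of the kind of differential-equation model they build on (the rate constant and step size are made up):
# First-order reaction A -> B: dCa/dt = -k * Ca (illustrative constants only)
k = 0.5            # reaction rate constant, 1/min (hypothetical)
dt = 0.1           # integration time step, min
ca, cb = 1.0, 0.0  # initial concentrations of A and B, mol/L

for step in range(101):
    if step % 20 == 0:
        print(f"t={step * dt:5.1f} min  Ca={ca:.3f}  Cb={cb:.3f}")
    dca = -k * ca * dt  # amount of A consumed in this step
    ca += dca
    cb -= dca           # what A loses, B gains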
In practical applications, sensors track these values and adjust them to maintain optimal operating conditions. This example underscores that effective modeling and maintenance in IIoT can often be achieved through differential equations and accurate simulations without always resorting to machine learning or deep learning solutions. Given the well-defined nature of industrial processes, traditional modeling techniques frequently provide the necessary precision and insight, highlighting the enduring value of foundational engineering principles in the age of digital transformation.
Final thoughts and resources
Since Faust-streaming is community-maintained, you might find it lacking in features or prefer a solution with stronger support and security assurances. I recommend Comparison of Stream Processing Frameworks Part 1 and Part 2 for a good overview of various options and their advantages and disadvantages. You might also enjoy the other resources on our blog that highlight how to use InfluxDB with other stream processing tools.
As always, get started with InfluxDB v3 Cloud here. If you need help, please contact us on our community site or Slack channel. If you are also working on a data processing project with InfluxDB, I’d love to hear from you!