Building Your First Python Plugin for the InfluxDB 3 Processing Engine
By Anais Dotis-Georgiou / Developer
Mar 04, 2025
**Note:** This blog uses the InfluxDB 3 Core CLI, not Enterprise.
One of the most compelling features of InfluxDB 3 is its built-in Python Processing Engine, a versatile component that adds powerful, real-time processing capabilities to InfluxDB 3 Core. For those familiar with Kapacitor in InfluxDB 1.x or Flux Tasks in 2.x, the Processing Engine represents a more streamlined, integrated, and scalable approach to acting on data. With the ability to run Python code directly within the database, you no longer need external servers or complex data pipelines to process incoming information.
The Processing Engine can trigger actions as data arrives, on-demand, or on a schedule, making it ideal for real-time transformation, normalization, alerting, downsampling, and edge data replication. In this blog, we’ll build a Python plugin that standardizes IoT data from diverse sources. Standardization is critical because IoT devices often produce data in inconsistent formats—different units, structures, or naming conventions—complicating analysis and decision-making. By normalizing this data at the point of ingestion, you simplify downstream queries, ensure consistency across datasets, and improve the reliability of your analytics.
Requirements
To follow this tutorial, you’ll need:
- Docker installed on your machine.
- A code editor like Visual Studio Code (VS Code) or another integrated development environment (IDE) of your choice.
Note: Using Docker for this tutorial ensures you can easily spin up a compatible environment without complex setup steps, allowing you to focus on building and testing your Python plugin. We’ll walk through the process of creating the plugin from scratch with Docker, but you can also run this tutorial locally without the Docker commands.
This tutorial also assumes you have some familiarity with Docker fundamentals, such as running containers, managing images, and using Docker Compose. Additionally, it helps to have a basic understanding of InfluxDB, including the line protocol ingest format and the InfluxDB 3 Core CLI. If you’re new to any of these concepts, we recommend reviewing the InfluxDB documentation or Docker’s getting-started guide before proceeding.
Process overview
Inconsistencies in units, field names, and measurement structures are common when dealing with IoT data from various devices. Different sensors might report temperature in Fahrenheit or Celsius, pressure in pascals or kilopascals, and inconsistent naming conventions like humidity_percent vs. humidity. This variability makes querying, analyzing, and correlating data unnecessarily complex, leading to errors in reporting, delayed insights, and increased maintenance overhead.
Standardizing units and field names during data ingestion ensures a consistent and reliable dataset for downstream analysis. Beyond simplifying analytics, consistent naming conventions are essential for maintaining compliance with industry standards, regulatory requirements, and internal data governance policies. With standardized, well-structured data, teams can more confidently generate reports, audit historical records, and integrate with other systems that rely on clear, predictable data formats.
Our plugin will solve this by standardizing units and names as the data is ingested, ensuring a consistent and reliable dataset for downstream analysis.
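To see the transformation in isolation before wiring it into InfluxDB, here is a minimal standalone sketch of the kind of normalization we’re after. The names FIELD_ALIASES, UNIT_CONVERSIONS, and normalize_record are illustrative only, not part of any InfluxDB API:

# Standalone sketch of field-name and unit normalization for IoT records.
# FIELD_ALIASES, UNIT_CONVERSIONS, and normalize_record are illustrative
# names, not part of the InfluxDB API.
FIELD_ALIASES = {
    "humidity_percent": "humidity",  # naming-convention fix
    "pressure_kpa": "pressure_pa",   # unit fix: kilopascals -> pascals
}

UNIT_CONVERSIONS = {
    "pressure_kpa": lambda value: value * 1000,
}

def normalize_record(record: dict) -> dict:
    """Return a copy of record with canonical field names and units."""
    normalized = {}
    for key, value in record.items():
        if key in UNIT_CONVERSIONS:
            value = UNIT_CONVERSIONS[key](value)
        normalized[FIELD_ALIASES.get(key, key)] = value
    return normalized

print(normalize_record({"humidity_percent": 40, "pressure_kpa": 101.3}))
# -> {'humidity': 40, 'pressure_pa': 101300.0}

The plugin we build below applies this same pattern, with the Processing Engine supplying the incoming rows.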
Here’s a high-level look at the process:
- Create a Plugin Directory: Set up a plugin directory in your local InfluxDB 3 environment and give it read/write permissions.
- Start InfluxDB 3 with Docker: Pull the InfluxDB 3 Core Docker image and launch a container, mounting the plugins directory.
- Write the Python Processing Script: Create a Python script to convert measurements into standardized units and naming conventions.
- Create Source and Destination Databases: Use the CLI to create databases for raw and standardized data.
- Test and Enable the Plugin: Test the script with sample data, then enable the plugin.
- Write Data and Verify the Transformation: Ingest sample IoT data with inconsistent formats and query the destination database to confirm successful standardization.
Building your first plugin
As mentioned in the overview, start by creating a plugin directory at:
mkdir -p ~/influxdb3/plugins
Ensure the directory has the necessary read and write permissions:
chmod 755 ~/influxdb3/plugins
Next, pull the latest InfluxDB 3 Core Docker image:
docker pull quay.io/influxdb/influxdb3-core:latest
Refer to the official documentation for details on running the Enterprise or Core editions, depending on your setup. Now, we’re ready to start the InfluxDB 3 container with the following command:
docker run -it \
-v ~/influxdb3/data:/var/lib/influxdb3 \
-v ~/influxdb3/plugins:/plugins \
-p 8181:8181 \
--user root \
quay.io/influxdb/influxdb3-core:latest serve \
--node-id my_host \
--object-store file \
--data-dir /var/lib/influxdb3 \
--plugin-dir /plugins
Let’s break down this command:
- -v ~/influxdb3/data:/var/lib/influxdb3: Mounts the local data directory as the database’s persistent storage location.
- -v ~/influxdb3/plugins:/plugins: Mounts the plugins directory where our Python plugin will live, making it accessible to the Processing Engine.
- -p 8181:8181: Maps port 8181 from the container to the host, allowing access to the InfluxDB 3 API.
- --user root: Ensures the container runs with root privileges, which are required for the Processing Engine to access the plugins.
- serve: Starts the InfluxDB 3 server.
- --node-id my_host: Assigns a unique node ID, which can be customized based on your environment.
- --object-store file: Configures the database to use the local filesystem for object storage.
- --data-dir /var/lib/influxdb3: Points to the directory where InfluxDB will persist its data.
- --plugin-dir /plugins: Instructs InfluxDB to load any available plugins from the mounted plugins directory.
Note: If you’re running InfluxDB 3 Core or Enterprise locally, you’ll want to serve your InfluxDB 3 instance and set the plugin directory with the following command (learn more about the serve command options with the CLI Operations for InfluxDB 3 Core and Enterprise or the docs):
influxdb3 serve --object-store file --data-dir ~/.influxdb3/data --node-id my_host --plugin-dir ~/influxdb3/plugins
With this setup, the Processing Engine will have access to your Python plugins and be ready to apply transformations to incoming data.
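Before moving on, it’s worth confirming that the server is reachable on the mapped port. Here is a quick sketch using only the Python standard library, assuming the /health endpoint is exposed on port 8181 as in recent InfluxDB 3 builds:

# Quick reachability check against the local InfluxDB 3 instance.
# Assumes the server started above is listening on the mapped port 8181.
import urllib.request

try:
    with urllib.request.urlopen("http://localhost:8181/health", timeout=5) as resp:
        print(f"InfluxDB 3 is up (HTTP {resp.status})")
except OSError as err:
    print(f"Server not reachable: {err}")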
Now, we’re ready to write the Python script to handle our data standardization. We’ll name our script hello-world.py. It contains a function called process_writes that performs the core transformation logic. This function processes incoming batches of sensor data, standardizing field names, tags, and units to ensure consistency across datasets. It iterates through each table batch, logs key information, and skips tables that match a predefined exclusion rule. For each row, it converts sensor names and locations to lowercase and replaces spaces with underscores to maintain uniform naming conventions. It also enriches the data by adding a timestamp field indicating when the record was processed. Finally, the function writes the transformed data to a new InfluxDB 3 database called unified_sensor_data, ensuring all sensor records share a consistent structure for easier querying, analysis, and compliance with data standards.
import datetime

# Note: LineBuilder is provided by the Processing Engine at runtime,
# so the plugin does not need to import it.
def process_writes(influxdb3_local, table_batches, args=None):
    # Log the provided arguments
    if args:
        for key, value in args.items():
            influxdb3_local.info(f"{key}: {value}")

    # Process each table batch
    for table_batch in table_batches:
        table_name = table_batch["table_name"]
        influxdb3_local.info(f"Processing table: {table_name}")

        # Skip processing a specific table if needed
        if table_name == "exclude_table":
            continue

        # Analyze each row
        for row in table_batch["rows"]:
            influxdb3_local.info(f"Row: {row}")

            # Standardize sensor names (lowercase, no spaces)
            sensor_name = row.get("sensor", "unknown").lower().replace(" ", "_")
            influxdb3_local.info(f"Standardized sensor name: {sensor_name}")

            # Standardize location and other tags by replacing spaces with underscores
            location = row.get("location", "unknown").lower().replace(" ", "_")

            # Add enriched field (e.g., timestamp)
            line = LineBuilder(table_name)
            line.tag("sensor", sensor_name)
            line.tag("location", location)
            line.float64_field("temperature_c", row.get("temperature", 0))
            line.string_field("processed_at", datetime.datetime.utcnow().isoformat())

            # Write the enriched data to a different database
            influxdb3_local.write_to_db("unified_sensor_data", line)

    influxdb3_local.info("Processing completed")
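Note that this version keeps the unit handling simple: it assumes incoming temperature values are already in Celsius and just renames the field to temperature_c. If some of your devices report Fahrenheit instead, you could normalize the value first with a small helper along these lines (a sketch; the temperature_f field name is hypothetical, not a fixed schema):

# Sketch: normalize temperature to Celsius before building the line.
# Assumes devices report either "temperature" (already Celsius) or
# "temperature_f" (Fahrenheit); both field names are illustrative.
def temperature_celsius(row: dict) -> float:
    if "temperature_f" in row:
        return (row["temperature_f"] - 32) * 5 / 9
    return row.get("temperature", 0.0)

# Inside process_writes, the field write would then become:
#     line.float64_field("temperature_c", temperature_celsius(row))
print(temperature_celsius({"temperature_f": 98.6}))  # -> 37.0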
Create the source and destination databases with the influxdb3 create database command:
docker exec {container id} influxdb3 create database my_database
docker exec {container id} influxdb3 create database unified_sensor_data
Testing your plugin
You can test your plugin on a target database with the influxdb3 test command:
docker exec {container id} influxdb3 test wal_plugin \
-d my_database \
--lp="sensor_data,location=living\\ room temperature=22.5 123456789" \
/plugins/hello-world.py
The command outputs a JSON object with logging information, database writes, and errors. It shows us that the trigger is parsing data from the sensor_data
measurement, standardizing the data, and writing the transformed line protocol to the unified_sensor_data
database without errors:
{
  "log_lines": [
    "INFO: Processing table: sensor_data",
    "INFO: Row: {'location': 'living room', 'temperature': 22.5, 'time': 123456789}",
    "INFO: Processing completed"
  ],
  "database_writes": {
    "my_database": [],
    "unified_sensor_data": [
      "sensor_data,sensor=unknown,location=living_room temperature_c=22.5,processed_at=\"2025-02-13T21:33:44.117195\""
    ]
  },
  "errors": []
}
Create and enable your trigger
Now that the test passes successfully, let’s create a trigger and enable it to run our Python plugin. The following command runs the influxdb3 create trigger command. The -d option specifies the database where the trigger will be applied. The --plugin-filename="/plugins/hello-world.py" option points to the plugin script that will be executed when the trigger is activated. The --trigger-spec="all_tables" option indicates that the trigger should apply to all tables within the specified database. Finally, hello_world_trigger is the name assigned to the trigger.
docker exec {container id} influxdb3 create trigger \
-d my_database \
--plugin-filename="/plugins/hello-world.py" \
--trigger-spec="all_tables" \
hello_world_trigger
Now we can enable it with the influxdb3 enable trigger command:
docker exec {container id} influxdb3 enable trigger \
--database my_database \
hello_world_trigger
Verify your trigger and plugin are working
To verify that our trigger is enabled correctly and that our plugin works as expected, we can write a line to the my_database source database and then query the unified_sensor_data database to confirm the standardization. Use the influxdb3 write command to accomplish the former:
docker exec {container id} influxdb3 write \
--database my_database \
"sensor_data,sensor=TempSensor1,location=living\\ room temperature=22.5 123456789"
Finally, we verify that our data has been transformed to our standard with the influxdb3 query command:
docker exec {container id} influxdb3 query \
--database unified_sensor_data \
"SELECT * FROM sensor_data"
The output confirms that the spaces in our location tag value were correctly replaced with underscores, a processed_at field was added, our sensor tag values were converted to lowercase, and our temperature field key now contains the temperature unit.
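For completeness, the same write-and-verify round trip can be done over the HTTP API that the container exposes on port 8181. This is a minimal sketch assuming the v3 endpoints /api/v3/write_lp and /api/v3/query_sql are available in your build; if your instance enforces authentication, add an Authorization: Bearer <token> header to each request:

# Sketch: the same write-and-verify round trip over the HTTP API instead of
# docker exec. Assumes /api/v3/write_lp and /api/v3/query_sql are exposed on
# port 8181; if your instance enforces authentication, add an
# "Authorization: Bearer <token>" header to each request.
import json
import urllib.request

BASE = "http://localhost:8181"

# Write one inconsistently formatted point to the source database.
lp = "sensor_data,sensor=TempSensor1,location=living\\ room temperature=22.5 123456789"
write_req = urllib.request.Request(
    f"{BASE}/api/v3/write_lp?db=my_database", data=lp.encode(), method="POST"
)
urllib.request.urlopen(write_req)

# Query the destination database to confirm the standardized copy arrived.
query = {"db": "unified_sensor_data", "q": "SELECT * FROM sensor_data", "format": "json"}
query_req = urllib.request.Request(
    f"{BASE}/api/v3/query_sql",
    data=json.dumps(query).encode(),
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(query_req) as resp:
    print(json.loads(resp.read()))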
Final thoughts
I hope this tutorial helps you start creating Python plugins and enabling triggers in InfluxDB 3 Core and Enterprise with Docker. I encourage you to look at the InfluxData/influxdb3_plugins repository as we add examples and plugins there, and I invite you to contribute any plugins you create! Get started by downloading Core or Enterprise. Share your feedback with our development team on Discord in the #influxdb3_core channel, Slack in the #influxdb3_core channel, or our Community Forums.