Transform Data with the New Python Processing Engine in InfluxDB 3

In early January, we announced the launch of InfluxDB 3 Core and InfluxDB 3 Enterprise in public alpha. One of the newest features is the InfluxDB 3 Processing Engine: a Python-based VM built to enable data transformation, enrichment, downsampling, alerting, and more, all from within the database itself. One month later, we’re excited to deliver a big update that enables new ways to interact with and transform your data.

Why we built the Processing Engine

At InfluxData, we focus on three key goals: collecting, organizing, and acting on time series data. Ensuring all three work together seamlessly is critical for delivering actionable insights at scale.

Over the years, we have developed tools like Kapacitor and Flux Tasks to help users transform and act on their data. InfluxDB 3 builds on these foundations by embedding a lightweight Python VM directly into the database—our Processing Engine—making it easier than ever to automate workflows, enrich data, and create custom processing logic within the database.

We expect users to use this new engine for crucial tasks, such as real-time transformation when data is ingested, running scheduled analyses, and building custom alerting modules.

Where and when to use the Processing Engine

The Processing Engine was built to solve real-world problems. We’ve already seen early users leverage it for anomaly detection, sending Slack alerts when thresholds are reached. One user applied it to enrich data with live weather information at the coordinates of their in-motion devices. We also see use cases for predictive analytics, API integrations, reporting, and more.

We chose Python for its incredible power and simplicity. You can set a weekly trigger to pull your data, leverage Pandas for analysis, craft a chart with Plotly, export to PDF, upload to S3, and send a Slack notification with a link to the report. This all happens within the database, and you can tweak it easily whenever adjustments are needed.

We believe in the power of LLMs, especially with Python code. Our hope is that they continue to empower users to quickly build plugins to solve custom problems, allowing them to focus on higher-order problems every day.

So, how does it work?

The Processing Engine consists of plugins and triggers. Plugins are custom Python scripts with access to the entire array of Python libraries. To run a plugin, it must be attached to one of four pre-defined event triggers:

  • WAL Flush: Executes whenever the Write-Ahead Log (WAL) is flushed to object storage (once per second by default). Use this to immediately assess and transform data as it’s written without needing to query it.
  • Scheduled Tasks: Runs on a defined schedule (specified using cron syntax). Think about using this trigger for use cases like periodic aggregations, data cleanup, automated reporting, etc.
  • On Request: Executes when a GET or POST request is made to a custom endpoint under /api/v3/engine. You can use this for webhook-based actions, external triggers, and user-initiated computations.
  • Parquet Persistence (Coming Soon): Triggers when data is persisted in Parquet format, enabling storage optimizations, batch processing, and more. This trigger isn’t yet available, but it will be released in the coming weeks.

We’ve designed each trigger type to integrate with your existing workflows, giving you a flexible and scalable tool for different workloads (e.g., real-time vs. batch processing).
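
As a rough sketch of how a plugin gets attached to a trigger, the influxdb3 CLI provides a create trigger command. The database name, plugin filenames, and trigger names below are illustrative, and the exact flags and trigger-spec strings may differ by version, so check the documentation for the current syntax.

# Fire a WAL-flush plugin whenever writes to the "weather" table are flushed
influxdb3 create trigger \
  --database weather_db \
  --plugin-filename "wind_alert.py" \
  --trigger-spec "table:weather" \
  wind_alert_trigger

# Or run a scheduled plugin every 10 minutes
influxdb3 create trigger \
  --database weather_db \
  --plugin-filename "downsample.py" \
  --trigger-spec "every:10m" \
  downsample_trigger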

Using the Processing Engine

For those interested, we’re going to get a bit more into the weeds here (we’ll have expanded guidance in our documentation). To start using the engine, specify a directory for storing plugins by providing the --plugin-dir argument when starting the server. You can reference and contribute to example plugins in our public GitHub repository. As more plugins are developed, we’ll create a longer-term space for discovering and leveraging InfluxData- and community-built plugins.
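
As a minimal sketch, startup looks something like the following. Only --plugin-dir is the relevant addition here; the paths are placeholders, and the remaining startup flags follow the standard getting-started guide and may vary by version.

# Point the server at a directory containing your plugin .py files.
# Other startup flags (node/writer ID, auth, etc.) are omitted here.
influxdb3 serve \
  --object-store file \
  --data-dir ~/.influxdb3/data \
  --plugin-dir ~/.influxdb3/plugins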

All plugin types have access to a shared API that allows for seamless interaction with the database. This API provides the essential tools to read, transform, and write data efficiently with a simple user experience. Let’s break it down and then show how it can all be put back together.

Writing and Querying Data

Let’s walk through an example with IoT sensor data for monitoring wind speed, where we’ll ingest, understand, and take action. Data transformation is only useful if you can write enriched/processed data back into the system. That’s where LineBuilder comes in, allowing you to construct Line Protocol that can be written back into the database.

line = (
    LineBuilder("weather")
    .tag("location", "us-midwest")
    .float64_field("wind_speed", 22.5)
    .time_ns(1627680000000000000)
)
influxdb3_local.write(line)

This snippet creates a new measurement called weather, tags it with a location, and records a wind speed value at a specific timestamp. The write function commits the data to the database.

Sometimes, you need to pull in historical data to make a decision or generate a new metric. The query function makes this easy by letting you execute SQL queries from within your plugin. Plus, because these queries are often analytical in nature, InfluxDB’s columnar storage keeps them fast, so results come back to the Processing Engine quickly.

results = influxdb3_local.query("SELECT wind_speed FROM weather WHERE time > now() - INTERVAL '10 minutes'")

# Log the results to "info"
for row in results:
    influxdb3_local.info(f"Sensor reading: {row}")

This script fetches all wind_speed readings from the weather measurement that fall within the last 10 minutes. The results are then logged for further processing. Whether you need to filter out anomalies, compute rolling averages, or flag unusual patterns, this function makes it happen.
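
Putting the two halves together, here’s a small illustrative sketch that averages the last ten minutes of readings and writes the result back as a downsampled point. The weather_10m measurement and avg_wind_speed field names are our own for this example, not part of the API.

import time

# Query the last 10 minutes of wind speed readings
results = influxdb3_local.query(
    "SELECT wind_speed FROM weather WHERE time > now() - INTERVAL '10 minutes'"
)

speeds = [row["wind_speed"] for row in results if row.get("wind_speed") is not None]
if speeds:
    avg_speed = sum(speeds) / len(speeds)

    # Write the average back as a new downsampled point
    line = (
        LineBuilder("weather_10m")
        .tag("location", "us-midwest")
        .float64_field("avg_wind_speed", avg_speed)
        .time_ns(time.time_ns())
    )
    influxdb3_local.write(line)
    influxdb3_local.info(f"Wrote 10-minute average wind speed: {avg_speed:.1f} mph")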

Logging Messages

Good observability requires understanding what’s happening in real-time. The Processing Engine provides built-in logging functions (info, warn, error) to help with this.

influxdb3_local.info("Wind speed normal")
influxdb3_local.warn("Potential issue detected in weather data")
influxdb3_local.error("Critical failure: unknown sensor issue")

These logs are also non-ephemeral: they’re stored long-term in system tables and in your server logs. Use them to understand when and why things go wrong in your scripts, as well as for standard logging to track state over time.

Dynamic Arguments

Each plugin type can receive arguments from the trigger configuration. These arguments let you make your plugins more dynamic; this is built to simplify setting thresholds, defining filters, providing external API credentials, etc. In the below example, you can see the flexibility for variable situations, like wind speed tracking in a storm.

def process_writes(influxdb3_local, table_batches, args=None):
    threshold = float(args["wind_threshold"])

    for table_batch in table_batches:
        for row in table_batch["rows"]:
            wind_speed = row.get("wind_speed")
            if wind_speed is not None and wind_speed > threshold:
                influxdb3_local.warn(f"High wind speed detected: {wind_speed} mph at {row['time']}")

With this approach, you can adjust thresholds dynamically without modifying code. Want to raise the threshold to 50 mph because you know a storm is incoming and don’t want constant logging? Just update the trigger configuration; there’s no need to redeploy your script or restart the database.
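
As a sketch of what that configuration looks like, arguments can be supplied when the trigger is set up. The --trigger-arguments flag and names below are illustrative; consult the CLI documentation for the exact syntax in your version.

# Pass the threshold to the plugin through the trigger configuration
influxdb3 create trigger \
  --database weather_db \
  --plugin-filename "wind_alert.py" \
  --trigger-spec "table:weather" \
  --trigger-arguments wind_threshold=50 \
  wind_alert_trigger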

How to get started and share feedback

We’re still just at the beginning and are highly invested in developing quality plugins for many different use cases. Over the next few months, expect more built-in functions, new triggers, and an easier way to find and share plugins. The Processing Engine is currently available in the InfluxDB 3 Core and InfluxDB 3 Enterprise public alpha for both native installations (new!) and Docker environments. Learn more about getting started in our documentation.

Also, we’d love to hear your feedback! Join the discussion on our Discord today, or hop into our Slack workspace.