Getting Started with Bytewax and InfluxDB

In this tutorial, we’ll explore how Bytewax integrates with InfluxDB to tackle a common challenge: downsampling. Whether you’re dealing with IoT data, DevOps monitoring, or any other time series metrics, downsampling (or materialized views) is your key to managing your time series data for long-term storage without losing essential trends. Bytewax is an open source Python framework for building highly scalable dataflows to process any data stream. InfluxDB is the leading time series database and platform. By the end of this guide, you’ll have a clear understanding of how to leverage Bytewax to build scalable data pipelines that prepare your data for insightful analysis with InfluxDB Cloud v3. The corresponding repo for this blog post can be found here.

Requirements

To run this example, you’ll need the following:

  • InfluxDB Cloud v3 Account: Sign up for a free trial here.
  • Bytewax: You can simply pip install with pip install bytewax or follow this documentation.

You’ll need to set the following environment variables for InfluxDB (or hard-code them in the scripts):

  • INFLUXDB_TOKEN: Your InfluxDB authentication token.
  • INFLUXDB_DATABASE: The name of your InfluxDB database.
  • INFLUXDB_ORG: Your InfluxDB organization name.
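
For example, on macOS or Linux you can export them in your shell before running the scripts:

export INFLUXDB_TOKEN="your InfluxDB token"
export INFLUXDB_DATABASE="your InfluxDB database"
export INFLUXDB_ORG="your InfluxDB org"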

You’ll also want some real-time data being written to your InfluxDB instance that you can leverage in a Bytewax dataflow. I recommend configuring a Telegraf agent to collect CPU metrics from your machine.
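
If you go that route, a minimal Telegraf configuration sketch might look like the following. The cpu input and influxdb_v2 output are standard Telegraf plugins, InfluxDB Cloud v3 accepts writes through its v2-compatible API, and the host URL below is just an example:

[[inputs.cpu]]
  percpu = true
  totalcpu = true

[[outputs.influxdb_v2]]
  urls = ["https://us-east-1-1.aws.cloud2.influxdata.com"]  # your host URL
  token = "${INFLUXDB_TOKEN}"
  organization = "${INFLUXDB_ORG}"
  bucket = "${INFLUXDB_DATABASE}"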

A basic request

The simplest way to get data from your InfluxDB instance into a Bytewax dataflow is to use the SimplePollingSource class. The script below uses Bytewax to create a data pipeline that periodically polls an InfluxDB database and processes the results in real time. It defines a custom data source class, InfluxDBSource, which connects to the InfluxDB database and executes a query to retrieve data from the last 15 seconds. The retrieved data is then passed into a Bytewax dataflow, where it is processed and output to the console.

import bytewax.operators as op
from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.inputs import SimplePollingSource
from influxdb_client_3 import InfluxDBClient3
import logging
from datetime import timedelta

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class InfluxDBSource(SimplePollingSource):
    def next_item(self):
        # Called on every poll interval; connect and query the last 15 seconds
        client = InfluxDBClient3(host="your host URL, i.e. us-east-1-1.aws.cloud2.influxdata.com",
                                 database="your database",
                                 token="your InfluxDB Token")
        query = "SELECT * FROM cpu WHERE time >= now() - INTERVAL '15 seconds' LIMIT 5"
        # Return the result as a pandas DataFrame; Bytewax emits it downstream
        data = client.query(query=query, mode="pandas")
        return data

# Build the dataflow: poll InfluxDB every 15 seconds and print each batch
flow = Dataflow("a_simple_example")

stream = op.input("input", flow, InfluxDBSource(timedelta(seconds=15)))

op.output("out", stream, StdOutSink())

To run this script, use: python3 -m bytewax.run basic_request.py
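
SimplePollingSource can also align its polls to a fixed epoch, which is handy when your downstream aggregation expects regular window boundaries. As a sketch, assuming the align_to parameter described in the Bytewax inputs documentation:

from datetime import datetime, timezone, timedelta

# Poll every 15 seconds, aligned to a fixed epoch (assumes
# SimplePollingSource accepts align_to, per the Bytewax docs)
source = InfluxDBSource(
    timedelta(seconds=15),
    align_to=datetime(2024, 1, 1, tzinfo=timezone.utc),
)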

Now that you have time series data coming from your InfluxDB instance, you can leverage the wide variety of tools and tutorials that Bytewax offers for time series data.
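
For example, here’s a minimal sketch of inserting a transformation step between the input and output of the script above (it assumes the cpu measurement includes a usage_system column, which Telegraf’s cpu plugin provides):

# Hypothetical map step: reduce each polled DataFrame to its mean
# system CPU usage before it reaches the output sink
def average_usage(df):
    return df["usage_system"].mean()

stream = op.map("avg_usage", stream, average_usage)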

Getting started with materialized views with Bytewax and InfluxDB

You can also use these custom sources and sinks for InfluxDB to perform downsampling (or create materialized views) with Bytewax. This dataflow.py script provides an example that uses a custom source and sink to downsample one minute of data every ten seconds by:

  1. Querying InfluxDB and returning a DataFrame.

  2. Using SQL to aggregate the values.

  3. Writing the downsampled DataFrame back to InfluxDB.

import os
import logging
from datetime import timedelta, datetime, timezone

import pandas as pd
import bytewax.operators as op
from bytewax.dataflow import Dataflow
from influx_connector import InfluxDBSink, InfluxDBSource

TOKEN = os.getenv("INLFUXDB_TOKEN", "your InfluxDB token")
DATABASE = os.getenv("INFLUXDB_DATABASE", "your InfluxDB Database")
ORG = os.getenv("INFLUXDB_ORG", "your org ID")

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Your custom aggregation query
query = """
SELECT
  date_bin(INTERVAL '15 seconds', time, TIMESTAMP '1970-01-01 00:00:00Z') AS time,
  avg("usage_system") AS usage_system_avg
FROM "cpu"
WHERE
  time >= now() - INTERVAL '1 minute'
GROUP BY 1
ORDER BY time DESC
"""

# Dataflow setup for querying and aggregating values
flow = Dataflow("a_simple_query")

# Create InfluxDBSource with the custom query
inp = op.input("inp", flow, InfluxDBSource(

    timedelta(seconds=10),  # Poll every 10 seconds
    "your host URL i.e. https://us-east-1-1.aws.cloud2.influxdata.com",
    DATABASE,
    TOKEN,
    "cpu",  # Measurement name
    ORG,
    datetime.now(timezone.utc) - timedelta(minutes=1),  # Query data from the last minute
    query=query  # Pass the custom query
))

# Inspect the input data
op.inspect("input_query", inp)

# Use the custom sink to write the DataFrame directly back to InfluxDB
op.output("out", inp, InfluxDBSink(
    host="https://us-east-1-1.aws.cloud2.influxdata.com",
    database=DATABASE,
    token=TOKEN,
    org=ORG,
    data_frame_measurement_name="cpu_aggregated",
    # data_frame_tag_columns=['cpu'],  # Specify any columns that are tags, if applicable
    data_frame_timestamp_column='time'  # Specify the column that contains timestamps
))

This script sets up a Bytewax dataflow that queries the cpu measurement in an InfluxDB database every ten seconds. It retrieves data from the last minute, aggregates the average usage_system values into 15-second intervals, and then writes the aggregated data back to a different measurement in InfluxDB (cpu_aggregated). The script reads InfluxDB credentials from environment variables, ensuring secure and flexible usage, and logging is set up to monitor the dataflow’s activity.
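
As before, run this script with: python3 -m bytewax.run dataflow.py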

Using the official Bytewax InfluxDB connectors

I also want to encourage you to use the official source and sink Bytewax connectors for InfluxDB (thank you to Zander for writing this connector and adding InfluxDB to Bytewax’s large collection of source and sink options!). The repo above includes documentation on how to use the connectors as well as installation instructions. You can follow the exact same steps shared in this blog post, but you don’t need the custom source and sink Python script that we used in the example repo for this post. The official Bytewax InfluxDB source and sink also support write types other than DataFrames.

Conclusion

As always, get started with InfluxDB Cloud v3 here. In the next post, we’ll cover how to run the project, dive into the architecture and logic, and discuss some of the pros and cons of the selected stack. If you need help, please reach out on our community site or Slack channel. If you’re working on a data processing project with InfluxDB and Bytewax, I’d love to hear from you! Finally, I also recommend connecting with the Bytewax community; they’re extremely helpful. Thank you, Zander and Laura, for answering all of my questions, helping with this example, and being so welcoming!