TL;DR InfluxDB Tech Tips: Downsampling with Flight SQL and AWS Lambda
By
Anais Dotis-Georgiou /
Developer, Product
Feb 02, 2023
This tutorial covers how to perform downsampling with the new InfluxDB storage engine, InfluxDB IOx, in InfluxDB Cloud (available on AWS us-east-1 and AWS eu-central-1 starting February 1st) using AWS Lambda. This tutorial describes how to:
- Use the Flight SQL Python Library to query an InfluxDB Cloud instance.
- Use SQL to downsample the data.
- Convert the query output to a Pandas DataFrame.
- Write the data back to InfluxDB with the InfluxDB v2 Python Client Library.
- Create and test an AWS Lambda function.
- Use CloudWatch or EventBridge to trigger the Lambda function on a schedule.
InfluxDB IOx addresses key user needs including (but not limited to):
- Unlimited cardinality.
- The ability to store logs and trace data.
- Interoperability with ML and business analytics tools.
- SQL support and fast analytics.
We achieved these goals by building InfluxDB IOx on the Apache ecosystem (Apache Parquet, Apache DataFusion, Apache Arrow, and Apache Flight SQL).
To learn more about why we chose these technologies and how the decision to implement them helps InfluxDB meet those goals, consider reading this blog post.
Requirements
This tutorial assumes you meet the following requirements:
- An AWS account. Follow this documentation to create one.
- Python 3.6
- InfluxDB v2 Python Client Library. Use pip3 install 'influxdb-client[ciso]' to install it.
- Flight SQL Python Library. Use pip3 install flightsql-dbapi or pip install git+https://github.com/influxdata/flightsql-dbapi.git to install it.
- Pandas Library. Use pip3 install pandas to install it.
You can also install all of these dependencies with the following requirements.txt:
pandas==0.23.4
influxdb_client==1.30.0
flightsql-dbapi==0.0.1
Advantages of Arrow Flight SQL
Arrow Flight SQL is a “new general-purpose client-server framework to simplify high-performance transport of large datasets over network interfaces.” In other words, Arrow Flight SQL is a protocol for accelerating SQL database access for databases that take advantage of Apache Arrow, a framework for defining in-memory columnar data. InfluxDB uses Apache Arrow to define its in-memory data and Flight SQL to transport these large datasets, so InfluxDB users can now easily query large datasets.
The performance benefits associated with Flight SQL and Apache Arrow make tasks like downsampling, data analysis, and data preparation outside of InfluxDB feasible and efficient. Users can reliably and efficiently perform analytics and data management tasks with the tools and languages that they’re most comfortable with, like Python and Pandas (instead of having to use Flux and the InfluxDB Task system).
Flight SQL also supports JDBC/ODBC drivers, which means that InfluxDB users will be able to connect InfluxDB to business analytics and visualization tools like Superset, PowerBI, and Tableau. Look for more blog posts on these topics coming soon!
Python script
The following script is the most basic example of how to downsample data with the new database engine for InfluxDB. This script follows these steps:
- Import our dependencies.
- Gather authentication credentials including:
  - All Access token. Note: If you’re using AWS Lambda to perform your downsampling tasks you probably want to use the AWS Secrets Manager instead of simply storing the token in your script.
  - URL.
  - Organization ID (org ID).
  - Bucket.
- Instantiate the Flight SQL client.
- Execute a SQL query. Here we use the DATE_BIN() function to perform the downsampling. We are converting the raw high-precision data from the last hour into 1 minute sum values.
- Create a reader object to consume the result.
- Read all data into a pyarrow.Table object.
- Convert the data to a Pandas DataFrame. Note: This script uses SQL to perform the downsampling to showcase the simplest example. However, this approach enables developers to take advantage of Pandas to perform any data manipulation and analysis they need after this step.
- Instantiate the InfluxDB v2 Python Client Library and write the Pandas DataFrame back to InfluxDB.
# 1. Import dependencies.
from flightsql import FlightSQLClient
import pandas as pd
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
# 2. Gather authentication credentials.
token = "-_KiZFxxx"
url = "https://us-east-1-1.aws.cloud2.influxdata.com/"
org = "28d1f2f565460a6c"
bucket = "anais-iox"
# 3. Instantiate the FlightSQL Client
client = FlightSQLClient(host='us-east-1-1.aws.cloud2.influxdata.com',
                         token=token,
                         metadata={'bucket-name': 'anais-iox'},
                         features={'metadata-reflection': 'true'})
# 4. Execute a query against InfluxDB's Flight SQL endpoint
query = client.execute("SELECT DATE_BIN(INTERVAL '1 minute', time, '2019-09-18T00:00:00Z'::timestamp) as time, SUM(\"co\") as 'sum_co', SUM(\"temperature\") as 'sum_temp', SUM(\"humidity\") as 'sum_hum' FROM \"airSensors\" WHERE time >= now() - interval '1 hour' GROUP BY time")
# 5. Create reader to consume result
reader = client.do_get(query.endpoints[0].ticket)
# 6. Read all data into a pyarrow.Table
table = reader.read_all()
print(table)
# 7. Convert to Pandas DataFrame
df = table.to_pandas()
df = df.sort_values(by="time")
print(df)
data_frame = df
# 8. Write the Pandas DataFrame back to InfluxDB
# (a separate name avoids shadowing the Flight SQL client created in step 3)
with InfluxDBClient(url=url, token=token, org=org) as write_client:
    write_client.write_api(write_options=SYNCHRONOUS).write(
        bucket=bucket,
        record=data_frame,
        data_frame_measurement_name="downsampled",
        data_frame_timestamp_column="time")
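To make the DATE_BIN() step more concrete, here is a plain-Python sketch of the same bucketing idea: each timestamp is floored to the start of its one-minute bin (counted from an origin), and the values are summed per bin. The function names date_bin and downsample_sum and the sample readings are made up for illustration; this only mimics the behavior and is not how InfluxDB implements the function.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone


def date_bin(stride, ts, origin):
    """Floor ts to the start of its stride-wide bin, counted from origin."""
    bins_elapsed = (ts - origin) // stride  # timedelta // timedelta floors to an int
    return origin + bins_elapsed * stride


def downsample_sum(rows, stride, origin):
    """Sum the values of (timestamp, value) rows per bin, sorted by bin start."""
    sums = defaultdict(float)
    for ts, value in rows:
        sums[date_bin(stride, ts, origin)] += value
    return sorted(sums.items())


# Same origin as the SQL query above; the readings are hypothetical.
origin = datetime(2019, 9, 18, tzinfo=timezone.utc)
rows = [
    (datetime(2023, 2, 2, 12, 0, 10, tzinfo=timezone.utc), 0.5),
    (datetime(2023, 2, 2, 12, 0, 50, tzinfo=timezone.utc), 0.25),
    (datetime(2023, 2, 2, 12, 1, 5, tzinfo=timezone.utc), 1.0),
]
result = downsample_sum(rows, timedelta(minutes=1), origin)
for bin_start, total in result:
    print(bin_start.isoformat(), total)
```

The first two readings fall into the 12:00 bin and the third into the 12:01 bin, which is the same grouping the GROUP BY clause produces in the query.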
Creating an AWS Lambda function
To create an AWS Lambda function, first log in to your AWS console. Next, search for AWS Lambda and select the service. Then, click Create Function.
Now you can name your function and specify the language (runtime) you want to use, the architecture, and any specific permissions. Click Create Function at the bottom right when you’re done.
Now you can include your Python script. Hit Test to make sure that it works as expected. Verify that you are successfully writing your downsampled data to InfluxDB by querying for it. It is also recommended that you use the AWS Secrets Manager instead of simply storing the token in your script.
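Lambda calls a handler function with an event and a context, so the script needs to be wrapped in one. The following is a minimal sketch of such a wrapper that also reads the token from AWS Secrets Manager. The secret name, the INFLUX_SECRET_ID environment variable, the JSON layout of the secret, and the run_downsampling placeholder are all assumptions made for illustration.

```python
import json
import os


def get_influx_token(secret_id, secrets_client=None):
    """Read the InfluxDB token from AWS Secrets Manager.

    Assumes the secret value is JSON shaped like {"token": "..."}; that
    layout is a hypothetical choice, not something InfluxDB requires.
    """
    if secrets_client is None:
        import boto3  # available by default in the Lambda Python runtime
        secrets_client = boto3.client("secretsmanager")
    response = secrets_client.get_secret_value(SecretId=secret_id)
    return json.loads(response["SecretString"])["token"]


def run_downsampling(token):
    """Placeholder for steps 3-8 of the script; should return rows written."""
    raise NotImplementedError("paste the query and write steps here")


def lambda_handler(event, context, secrets_client=None, downsample=run_downsampling):
    """Entry point that Lambda calls as lambda_handler(event, context).

    The two keyword arguments only exist so the handler can be exercised
    locally with stand-ins; Lambda never passes them.
    """
    # Hypothetical environment variable and default secret name.
    secret_id = os.environ.get("INFLUX_SECRET_ID", "influxdb/token")
    token = get_influx_token(secret_id, secrets_client)
    rows_written = downsample(token)
    return {"statusCode": 200, "rows_written": rows_written}
```

Note that pandas, influxdb-client, and flightsql-dbapi are not part of the Lambda runtime, so they would need to be packaged with the function or supplied through a layer, and the function's role would need permission to read the secret.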
Finally, select Deploy to deploy your script. If you’re looking to perform a downsampling task, you’ll need to run your Lambda script on a schedule. You can use CloudWatch or EventBridge to create a rule and target your AWS Lambda function to run on a user-defined schedule.
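If you prefer to set up the schedule from code rather than the console, the following sketch uses boto3 to create an EventBridge rule, allow it to invoke the function, and register the function as the rule's target. The rule name, target ID, and hourly rate are hypothetical; the hourly rate is chosen only because the query above covers the last hour. The client parameters exist so the function can be tried with stand-ins.

```python
def schedule_lambda(function_arn, rule_name="downsample-hourly",
                    schedule="rate(1 hour)",
                    events_client=None, lambda_client=None):
    """Create a scheduled EventBridge rule that invokes the Lambda function."""
    if events_client is None or lambda_client is None:
        import boto3  # only needed when real AWS clients are not supplied
        events_client = events_client or boto3.client("events")
        lambda_client = lambda_client or boto3.client("lambda")

    # 1. Create (or update) the scheduled rule.
    rule_arn = events_client.put_rule(
        Name=rule_name,
        ScheduleExpression=schedule,
        State="ENABLED",
    )["RuleArn"]

    # 2. Allow EventBridge to invoke the function on behalf of this rule.
    lambda_client.add_permission(
        FunctionName=function_arn,
        StatementId="{}-invoke".format(rule_name),
        Action="lambda:InvokeFunction",
        Principal="events.amazonaws.com",
        SourceArn=rule_arn,
    )

    # 3. Point the rule at the function.
    events_client.put_targets(
        Rule=rule_name,
        Targets=[{"Id": "downsample-lambda", "Arn": function_arn}],
    )
    return rule_arn
```

Calling add_permission twice with the same StatementId raises an error, so a real setup script would need to handle re-runs.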
Refer to the AWS documentation for whichever service you prefer, CloudWatch or EventBridge.
After performing the downsampling task with an AWS Lambda function, we verify that our data is being downsampled and written to a new measurement, “downsampled”, in InfluxDB.
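One way to run that check from Python is to query the new measurement over Flight SQL, reusing the same execute, do_get, and read_all calls as the script above. The helper name fetch_downsampled and the row limit are hypothetical; the function expects a FlightSQLClient constructed as in step 3.

```python
def fetch_downsampled(flight_client, limit=10):
    """Return the newest rows of the "downsampled" measurement.

    flight_client is expected to be a FlightSQLClient built as in step 3;
    the result is whatever reader.read_all() returns (a pyarrow.Table).
    """
    sql = 'SELECT * FROM "downsampled" ORDER BY time DESC LIMIT {}'.format(int(limit))
    info = flight_client.execute(sql)
    reader = flight_client.do_get(info.endpoints[0].ticket)
    return reader.read_all()


# Usage, assuming `client` is the Flight SQL client from step 3:
# table = fetch_downsampled(client)
# print(table.to_pandas())
```

If the Lambda function has run at least once, the result should contain the sum_co, sum_temp, and sum_hum columns written by the script.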
Conclusion
To take advantage of all the advancements with InfluxDB IOx, sign up here. If you would like to contact the InfluxDB IOx developers, join the InfluxData Community Slack and look for the #influxdb_iox channel.
I hope this blog post inspires you to explore InfluxDB Cloud and take advantage of Flight SQL to transport large datasets from InfluxDB for data processing with the tools of your choice. If you need any help, please reach out using our community site or Slack channel. I’d love to hear about what you’re trying to achieve and what features you’d like the task system in InfluxDB to have.