Flux: The Key to Edge Data Replication with InfluxDB
By Anais Dotis-Georgiou | Oct 06, 2022
This article was originally published in The New Stack and is reposted here with permission.
Flux is the data scripting and query language for the InfluxDB time series database platform, enabling useful features such as Edge Data Replication (EDR).
EDR allows developers to automatically replicate data from a bucket in an open source instance of InfluxDB to an instance of InfluxDB Cloud. This feature changes the way IoT developers think about building IoT applications on top of InfluxDB because it creates a durable and reliable data transfer between these different elements of a given system. EDR enables developers to leverage the full capabilities of InfluxDB at the edge, for the edge. At the same time, it allows developers to use that same data, or a subset of that data in the cloud, for entirely different purposes.
EDR can enable tasks such as gathering high-fidelity data from edge sources, cleaning and transforming data at the edge, consolidating edge data in the cloud to generate a global view of the edge as a whole, and performing time series analytics and alerting on that consolidated data in the cloud.
The Flux advantage
As a fully functional scripting and query language, Flux has a lot of power. It allows you to query time series data, setting the stage for creating alerts, managing time series data life cycles, and cleaning, analyzing, and transforming data.
Flux has the advantage of executing these queries and capabilities server-side, which increases performance. In fact, some Flux functions push queries down into the InfluxDB storage layer, executing them there to optimize for speed and memory.
You can learn more about pushdowns in InfluxDB here.
Querying with Flux
A simple Flux query looks like this:
from(bucket: "bucket1")
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "measurement1")
|> filter(fn: (r) => r.tag1 == "tagvalue1")
|> filter(fn: (r) => r._field == "field1")
You use the from() function to select the bucket you want to query data from. The range() function specifies the time window, or interval, for your query. Finally, filter() functions let you specify which subsets of your data you want to query.
InfluxDB uses measurements to group or structure large subsets of data together, tags to store metadata about your time series data, and fields to store the actual values of your time series data.
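To make that structure concrete, here is a sketch of a single point in InfluxDB line protocol that matches the weather example below (the measurement, tag, and field names are illustrative; note that the space in the measurement name must be escaped):

```
United\ States,location=Austin temperature=78.0 1664812800000000000
```

The measurement ("United States") groups the data, the tag (location=Austin) stores metadata, the field (temperature=78.0) stores the actual value, and the trailing integer is a nanosecond-precision timestamp.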
For example, to query weather data, a Flux query might look like this:
from(bucket: "weather")
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "United States")
|> filter(fn: (r) => r.location == "Austin")
|> filter(fn: (r) => r._field == "temperature")
To learn more about writing Flux queries, there are plenty of resources available to get you started.
- Beginner Flux and InfluxDB Essentials: These free InfluxDB University courses are a great place to start your Flux journey.
- TL;DR InfluxDB Tech Tips: Converting InfluxQL Queries to Flux Queries: If you already have experience with InfluxQL, this post is a great resource for transitioning to Flux.
- Introduction to Flux: This section of “Time to Awesome,” a book for IoT application developers looking to build on top of InfluxDB, introduces Flux to developers.
- Flux Documentation: The Flux docs should be a go-to resource for anyone writing Flux code.
Managing your time series data life cycle
Flux allows you to transform and aggregate time series data in a variety of ways. A common use of Flux is to create tasks to downsample data for materialized views of your data. Downsampling transforms high-resolution data to a lower-resolution summary. Among other things, downsampling reduces the overall disk size of your InfluxDB instance while retaining the overall shape of your data.
A downsampling task in Flux might look like this:
// Task Options
option task = {name: "downsample task example", every: 1w}
// Defines a data source
data = from(bucket: "weather")
// queries data for 1w based off of the task configuration options above
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "United States")
|> filter(fn: (r) => r.location == "Austin")
|> filter(fn: (r) => r._field == "temperature")
data
// Windows and aggregates the data into 1h averages
|> aggregateWindow(fn: mean, every: 1h)
// Stores the aggregated data in a new bucket
|> to(bucket: "data-downsampled")
This task takes high-precision temperature data, computes one-hour averages, and then writes that data to a new bucket using the to() function.
Alert on your data
Given the recent introduction of Edge Data Replication, we want to dive deeper into how Flux can be used to enable data transformation from the edge to the cloud. Flux offers a wide variety of ways to check your data to see if it meets certain conditions and to then alert on it. You can use Flux to set alerts based on data you use or collect at the edge as well as data consolidated in the Cloud.
There are several different ways to create alerts in Flux. You can build threshold or ‘deadman’ checks through the InfluxDB Cloud UI. This method automatically generates a Flux script for each alert you create and uses the InfluxDB checks and notifications system. Additionally, you can write your own custom check and notification tasks. How you check and alert on your data is up to you; Flux offers a lot of flexibility when it comes to processing your data. With Flux, you can send alerts to a variety of notification endpoints, including Slack, Microsoft Teams, PagerDuty, and many others.
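As a minimal sketch of a custom alert task, the following Flux sends a Slack message whenever a temperature reading crosses a threshold. The bucket, measurement, threshold, and webhook URL are all placeholders, not values from a real system:

```flux
import "slack"

option task = {name: "high temperature alert", every: 5m}

from(bucket: "weather")
    |> range(start: -task.every)
    |> filter(fn: (r) => r._measurement == "United States")
    |> filter(fn: (r) => r._field == "temperature")
    // Keep only readings above a hypothetical threshold
    |> filter(fn: (r) => r._value > 100.0)
    // Send one Slack message per offending row; the webhook URL is a placeholder
    |> map(
        fn: (r) => ({r with sent:
            slack.message(
                url: "https://hooks.slack.com/services/EXAMPLE",
                token: "",
                channel: "",
                text: "High temperature in ${r.location}: ${string(v: r._value)}",
                color: "danger",
            ),
        }),
    )
```

Because the threshold filter runs first, slack.message() is only invoked for rows that actually breach the limit.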
To learn more about creating checks and notifications, take a look at the following resources:
- An Alert Task: This section of “Time to Awesome” describes how to write an alert task that checks your data and sends an alert to Slack.
- Checks and Notifications: This section of “Time to Awesome” provides a detailed description of how the checks and notifications system in InfluxDB works.
- Create a check in the InfluxDB UI: This documentation describes several different ways to create a check with InfluxDB and Flux.
- Custom Checks Documentation: The InfluxDB docs are always a great resource for all things Flux and InfluxDB.
Analyze, clean, and transform your data
Transforming and applying statistical analysis to your time series data allows you to gain valuable insights from it. You also might need to clean and prepare your data before using the Edge Data Replication feature and consolidating it in the cloud.
Flux has many different functions for analyzing and preparing time series data. There are transformation functions for statistical time series analysis, transformations for dynamic statistical and fundamental time series analysis, and technical momentum indicators for financial analysis. There are also additional packages, including math functions and tools for working with geotemporal data.
Flux can also transform your data. For example, joins (video), unions and pivots (video) change the shape of your data.
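For instance, a pivot turns the vertical _field/_value layout of raw query results into one column per field, which is often a prerequisite for working with several fields at once (bucket and time range here are placeholders):

```flux
from(bucket: "weather")
    |> range(start: -1h)
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
```

After the pivot, each field (for example, temperature) appears as its own column keyed by timestamp.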
Another critical component of successfully using time series data is the ability to manipulate timestamps. A set of functions allows you to manipulate your timestamps and execute a variety of time series transformations.
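As a small sketch of timestamp manipulation (bucket and durations are placeholders):

```flux
from(bucket: "weather")
    |> range(start: -1h)
    // Truncate each timestamp down to the start of its minute
    |> truncateTimeColumn(unit: 1m)
    // Shift all timestamps forward by 12 hours
    |> timeShift(duration: 12h)
```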
You can also use Flux to write custom algorithms. For example, the following links demonstrate two different algorithms written in Flux for anomaly detection:
- Anomaly Detection with Median Absolute Deviation
- A Deep Dive into Machine Learning in Flux: Naive Bayes Classification
The power of edge data replication in InfluxDB
Recently, InfluxDB released a new feature called Edge Data Replication. EDR allows you to configure a bucket in an open source instance of InfluxDB to automatically replicate data from that bucket to a bucket in a Cloud instance of InfluxDB.
For example, using EDR you can run multiple OSS instances on the edge or in the fog and collect high-precision data there. You can then create tasks to downsample, process, or transform that data before automatically replicating it to an instance of InfluxDB Cloud. You can use this feature to build hybrid architectures and save costs by minimizing the amount of data you transfer to the cloud, or to clean your data before it reaches your cloud datastore.
Setting up Edge Data Replication only requires a few steps:
- Create a remote connection with the CLI with the influx remote command like so:
influx remote create \
  --name example-remote-name \
  --remote-url https://us-west-2-1.aws.cloud2.influxdata.com \
  --remote-api-token mYsuP3r5Ecr37t0k3n \
  --remote-org-id 00xoXXoxXX00
- Add any desired downsampling or transformation tasks and write the data to a new bucket in OSS.
- Create a replication stream from your OSS bucket to a bucket in InfluxDB Cloud with the influx replication command like so:
influx replication create \
  --name example-replication-stream-name \
  --remote-id 00xoXXXo0X0x \
  --local-bucket-id Xxxo00Xx000o \
  --remote-bucket-id 0xXXx00oooXx
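Once the stream is created, you can confirm it exists from the CLI (this assumes the influx CLI is already configured against your local OSS instance):

```
influx replication list
```

This lists each configured replication stream so you can verify the local and remote bucket IDs before data starts flowing.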
The bottom line
I hope this article inspires you to explore InfluxDB, Flux, and Edge Data Replication. To learn more about these topics, check out our upcoming talk at the Data + AI Summit conference, where we discuss them in detail, or connect with me on our community Slack channel.