Flux Aggregation in InfluxDB: Now or Later
By Michael Hall / Product, Use Cases, Developer / Sep 10, 2021
Aggregations are a powerful tool when processing large amounts of time series data. In fact, most of the time you’re going to care more about the `min`, `max`, `mean`, `count`, or `last` values of your dataset than you will about the raw values you’re collecting.
Knowing this, InfluxDB and the Flux language make it as easy as possible to run these aggregations whenever and wherever you need to, and sometimes that leads people to run them in ways that aren’t as efficient as they could be. Here are some ways to ensure your aggregation query runs as fast as possible.
Don't aggregate too early
As great as aggregate functions are and as much as you want to use them, be careful not to use them too soon. Oftentimes, we’ll see somebody with a query like this:
from(bucket: "myBucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "cpu")
|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
|> filter(fn: (r) => r["host"] == "myHost")
|> filter(fn: (r) => r["cpu"] == "cpu-total")
|> yield(name: "mean")
While this query will return the exact results you want, in this case the average total CPU for a specific host, it’s also doing a lot more work than it needs to.
Because the `aggregateWindow` function is being called before the `filter` calls on host and cpu, InfluxDB ends up calculating the average for every host and cpu value first, and then dropping everything but the series you asked for after it’s done all that hard work.
Instead, perform all the filtering you can before using an aggregate function. This reduces the total amount of data going into those calculations, which will give your query a big speed boost, especially on large data sets.
from(bucket: "myBucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "cpu")
|> filter(fn: (r) => r["host"] == "myHost")
|> filter(fn: (r) => r["cpu"] == "cpu-total")
|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
|> yield(name: "mean")
You can still filter data after calling an aggregate, which is useful when you want to filter on the results of the aggregation. But for filters that only check the raw data in your bucket, it’s always best to apply them first. In fact, that’s what the InfluxDB UI’s query builder does automatically!
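As an example of filtering on aggregated results, here’s a sketch that keeps only the windows whose average CPU usage crosses a threshold (the 80.0 value is just an illustration, and the yield name is arbitrary):

from(bucket: "myBucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "cpu")
|> filter(fn: (r) => r["host"] == "myHost")
|> filter(fn: (r) => r["cpu"] == "cpu-total")
|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
// this filter checks the aggregated values, so it has to come after aggregateWindow()
|> filter(fn: (r) => r["_value"] > 80.0)
|> yield(name: "high_mean")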
Don't aggregate too late either
While you don’t want to get ahead of your data when it comes to aggregation, it’s also possible to call an aggregate function too late in your query, resulting in a slower response time. For example:
from(bucket: "myBucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "cpu")
|> filter(fn: (r) => r["host"] == "myHost")
|> filter(fn: (r) => r["cpu"] == "cpu-total")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> last(column: "_time")
Again, this will do what you want: give you the last value of every field in the matching series, presented as a single row with fields as columns. However, this query has InfluxDB doing the `pivot` on the entire data set, only to once again throw away that hard work immediately afterwards.
Here it’s better to call the `last` aggregate function before pivoting the data, reducing the amount of data that has to be transformed down to just the data you want in the end.
from(bucket: "myBucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "cpu")
|> filter(fn: (r) => r["host"] == "myHost")
|> filter(fn: (r) => r["cpu"] == "cpu-total")
|> last(column: "_time")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
Pushdown opportunities
When your Flux query is executed, the first thing it does is read your measurement data from the storage layer of InfluxDB before performing your calculations and transformations on that data in memory. Some of those steps can be performed by the storage layer instead, meaning less data being read into memory to begin with.
When this happens, we call it “pushing down” those steps to the layer below Flux (the storage engine layer), and query patterns that can do this are called pushdown patterns. Not only do these result in less data being read into memory, they also reduce the amount of work the Flux engine has to do.
The first and most basic pushdown pattern is the common `from() |> range() |> filter()` chain that most Flux queries use. If you put your aggregate function immediately after this pattern, some aggregates can also be pushed down to the storage layer. Let’s take a look at our last query:
from(bucket: "myBucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "cpu")
|> filter(fn: (r) => r["host"] == "myHost")
|> filter(fn: (r) => r["cpu"] == "cpu-total")
|> last(column: "_time")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
Because we put the `last` function immediately after a `from() |> range() |> filter()` pattern, it will also be pushed down to the storage layer. This means the only data that will actually be read into the Flux runtime is the last values of our cpu measurement for the selected series, and the only step executed in memory is the `pivot` step at the end. If we had left `last` after the `pivot`, not only would all of that extra data have to be read into memory, but both the `pivot` and `last` functions would have been executed in the Flux layer.
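If you want to confirm whether your own queries are hitting these patterns, Flux ships a profiler package you can enable. Here’s a minimal sketch; when a pushdown happens, the read and the aggregate show up as a single storage-side operation in the operator profile rather than separate Flux steps (the exact operator names you’ll see depend on your InfluxDB version):

import "profiler"

// ask the Flux runtime to report per-query and per-operator statistics
option profiler.enabledProfilers = ["query", "operator"]

from(bucket: "myBucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "cpu")
|> filter(fn: (r) => r["host"] == "myHost")
|> filter(fn: (r) => r["cpu"] == "cpu-total")
|> last(column: "_time")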
Putting it all together
So to recap: aggregation is great, and Flux is really good at it. You can make it work even better for you by doing as much filtering as possible before applying the aggregation, waiting to perform any transformations of the data until after the aggregation, and keeping an eye out for opportunities to use pushdown patterns that let the storage layer do more of the heavy lifting.
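Applied to the example from earlier, each guideline maps to a spot in the query (the comments mark where each step ends up running):

from(bucket: "myBucket")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
// raw-data filters first: less data flows into everything below
|> filter(fn: (r) => r["_measurement"] == "cpu")
|> filter(fn: (r) => r["host"] == "myHost")
|> filter(fn: (r) => r["cpu"] == "cpu-total")
// aggregate directly after from() |> range() |> filter(): eligible for pushdown to storage
|> last(column: "_time")
// transformations last: only the final rows get pivoted in memory
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")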
Further reading
- Top 5 Hurdles for Flux Beginners and Resources for Learning to Use Flux: This post describes common hurdles for Flux beginners and how to tackle them by using the InfluxDB UI, understanding Annotated CSV, and more.
- Top 5 Hurdles for Intermediate Flux Users and Resources for Optimizing Flux: This post describes common hurdles for intermediate and advanced Flux users while providing more detail on pushdown patterns, how the Flux engine works, and more.
- TL;DR InfluxDB Tech Tips – Aggregating across Tags or Fields and Ungrouping: This post describes how grouping affects your table stream and how to aggregate across tags and fields.