TL;DR InfluxDB Tech Tips: Converting InfluxQL Queries to Flux Queries
By
Anais Dotis-Georgiou /
Mar 31, 2022
If you’re a 1.x user of InfluxDB, you’re most likely more familiar with InfluxQL than you are with Flux. To gain a deep understanding of Flux, it’s important to understand:
- The basics of the language
- Annotated CSV, the output format of Flux queries
However, you can still use Flux without studying those topics. In this TL;DR, we’ll convert common InfluxQL queries into Flux and identify patterns between the two languages to help you get started with Flux more easily if you come from an InfluxQL or SQL background.
The fundamentals
Almost every user will start by writing an InfluxQL query like:
SELECT "fieldKey1","fieldKey2" FROM "db"."measurement1" WHERE time >= '2022-02-22T20:22:02Z' AND time < now() GROUP BY "tagKey1"
This is the equivalent Flux query:
from(bucket: "db")
  // Flux time literals are written without quotes
  |> range(start: 2022-02-22T20:22:02Z, stop: now())
  |> filter(fn: (r) => r._measurement == "measurement1")
  |> filter(fn: (r) => r._field == "fieldKey1" or r._field == "fieldKey2")
  |> yield(name: "my first flux query")
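Here’s how the clauses of the two queries map to each other:
- FROM "db" → from(bucket: "db")
- WHERE time >= ... AND time < now() → range(start: ..., stop: now())
- "measurement1" → filter() on r._measurement
- SELECT "fieldKey1","fieldKey2" → filter() on r._field
- GROUP BY tag → nothing to add, because Flux groups by tags automatically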
If you’re new to Flux, one of the first things you’ll notice is the pipe-forward operator, |>. Flux is a functional data scripting language designed to transform data by pipe-forwarding it between functions, where each function transforms the data in order. You can see the code starts with a from() function and then pipe-forwards the results of that from() function to the range() function. You specify your time range with the range() function instead of the WHERE clause. Finally, you don’t need to group by tags because tags are automatically grouped in Flux. However, you could still add a group() function to group by that tag explicitly.
The yield() function is what you use to specify what parts of your Flux query should return data and where you assign a name to those returned results. If you’re only querying for one stream of data as we are above, then including a yield() function is optional. The result will be named “_result” by default if no yield() function is applied. If you wanted to query for data from multiple buckets simultaneously, you would have to use two yield functions and name each result something different. This is similar to using the AS clause. For example:
from(bucket: "db1")
  // specify start: 0 to query from all time. Equivalent to SELECT * from db1. Use just as cautiously.
  |> range(start: 0)
  |> yield(name: "db1 results")

from(bucket: "db2")
  |> range(start: 0)
  |> yield(name: "db2 results")
SELECT statement → from() and filter() functions
To select fields, tags, and measurements with Flux, use the from() and filter() functions instead of the SELECT statement. The from() function is where you specify which bucket you want to pull data from. In Flux, a bucket corresponds to a 1.x database and retention policy combined. (Fully qualified InfluxQL names take the form "db"."retention_policy"."measurement"; the InfluxQL examples in this post shorten that to "db"."measurement1" for readability.) You select fields, tags, and measurements with the filter() function. You can either use an and statement to combine conditions on different fields, tags, and measurements, or you can use multiple filter() functions. The following two Flux queries are equivalent:
- |> filter(fn: (r) => r._measurement == "measurement1") |> filter(fn: (r) => r.tagKey1 == "tagValue1") |> filter(fn: (r) => r._field == "fieldKey1")
- |> filter(fn: (r) => r._measurement == "measurement1" and r.tagKey1 == "tagValue1" and r._field == "fieldKey1")
Make sure not to accidentally use an and statement where you need an or statement. The following query asks for fields that are named both “fieldKey1” and “fieldKey2” at the same time. This is impossible, so you’ll get no results. Don’t do the following:
|> filter(fn: (r) => r._field == "fieldKey1" and r._field == "fieldKey2")
Do use an or statement to select multiple measurements, tags, or fields like so:
|> filter(fn: (r) => r._field == "fieldKey1" or r._field == "fieldKey2")
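To see the difference without touching a real bucket, here is a minimal sketch that swaps from() for array.from() from the Flux standard library. The sample rows and the yield names are made up for illustration.

```flux
import "array"

// Hypothetical sample rows standing in for the output of from() |> range().
data = array.from(
  rows: [
    {_field: "fieldKey1", _value: 1.0},
    {_field: "fieldKey2", _value: 2.0},
    {_field: "fieldKey3", _value: 3.0}
  ]
)

// No single row can carry two different _field values, so nothing passes.
data
  |> filter(fn: (r) => r._field == "fieldKey1" and r._field == "fieldKey2")
  |> yield(name: "and_filter")

// Rows whose _field matches either name pass the predicate.
data
  |> filter(fn: (r) => r._field == "fieldKey1" or r._field == "fieldKey2")
  |> yield(name: "or_filter")
```

The "and_filter" result comes back with no rows, while "or_filter" keeps the fieldKey1 and fieldKey2 rows and drops fieldKey3.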
SELECT multiple aggregations → multiple yield() functions
If you’re comfortable with InfluxQL, then you’ve probably selected multiple aggregations before. Maybe you want to select the max and min of different fields like so:
SELECT max("fieldKey1") AS "max", min("fieldKey2") AS "min" FROM "db"."measurement1" WHERE time >= '2022-02-22T20:22:02Z' AND time < now() GROUP BY "tagKey1"
You can do the same with Flux, but it requires more lines of code:
// querying your common data once and referencing it with a variable
data = from(bucket: "db")
  |> range(start: 2022-02-22T20:22:02Z, stop: now())
  |> filter(fn: (r) => r._measurement == "measurement1")

data
  |> filter(fn: (r) => r._field == "fieldKey1")
  |> max()
  |> yield(name: "max")

data
  |> filter(fn: (r) => r._field == "fieldKey2")
  |> min()
  |> yield(name: "min")
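Here’s how these two queries map to each other:
- FROM "db"."measurement1" and the WHERE time condition → the shared base query stored in the data variable
- max("fieldKey1") AS "max" → data filtered to fieldKey1, then max() and yield(name: "max")
- min("fieldKey2") AS "min" → data filtered to fieldKey2, then min() and yield(name: "min")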
An aside: One of the first things InfluxQL users who are learning Flux tell me is that Flux is unnecessarily complicated. I hear things like “what used to be one line of InfluxQL code is now 10 lines of Flux”. I felt that same pain point when I started learning Flux, but now I far prefer Flux over InfluxQL. “Simple” InfluxQL queries might appear more complicated in Flux, but the degree of that complexity pales in comparison to the degree of data transformation and analytics power that Flux provides. You can build custom functions, pivot data, join data, write custom flapping algorithms, map across data, fetch and work with JSON, and so much more.
When writing Flux, it’s important to start thinking about data structure, hierarchy, and shape in your head. This is probably the biggest shift in thinking when moving from InfluxQL to Flux. For example, in the query above, you need to think about the common denominator in your query. In this case that’s querying for data from “db” and “measurement1”. Store this data in a variable and reference the variable. Now you can filter further for the specific fields that you want to find the max and min of (“fieldKey1” and “fieldKey2”, respectively). Finally, use the yield() function to name each result, as you might have done with the AS clause in InfluxQL, to yield two results simultaneously: both the max(“fieldKey1”) and the min(“fieldKey2”).
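The variable-plus-yield pattern can be tried on a handful of in-memory rows. This is only a sketch: the rows built with array.from() are hypothetical and stand in for the data that the base query would return for “measurement1”.

```flux
import "array"

// Hypothetical base data: two points per field.
data = array.from(
  rows: [
    {_time: 2022-02-22T20:30:00Z, _field: "fieldKey1", _value: 3.0},
    {_time: 2022-02-22T20:40:00Z, _field: "fieldKey1", _value: 7.0},
    {_time: 2022-02-22T20:30:00Z, _field: "fieldKey2", _value: 5.0},
    {_time: 2022-02-22T20:40:00Z, _field: "fieldKey2", _value: 2.0}
  ]
)

// max() is a selector: it returns the fieldKey1 row with _value 7.0.
data
  |> filter(fn: (r) => r._field == "fieldKey1")
  |> max()
  |> yield(name: "max")

// min() returns the fieldKey2 row with _value 2.0.
data
  |> filter(fn: (r) => r._field == "fieldKey2")
  |> min()
  |> yield(name: "min")
```

Each yield() produces its own named result, which plays the role of the AS aliases in the InfluxQL version.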
You could have also executed the following query:
// querying your data multiple times
from(bucket: "db")
  |> range(start: 2022-02-22T20:22:02Z, stop: now())
  |> filter(fn: (r) => r._measurement == "measurement1")
  |> filter(fn: (r) => r._field == "fieldKey1")
  |> max()
  |> yield(name: "max")

from(bucket: "db")
  |> range(start: 2022-02-22T20:22:02Z, stop: now())
  |> filter(fn: (r) => r._measurement == "measurement1")
  |> filter(fn: (r) => r._field == "fieldKey2")
  |> min()
  |> yield(name: "min")
You would get the same results with this latter query, but it could be less efficient, depending on the shape of your data. Storing your base query in a variable and then referencing that variable avoids redundant queries, that is, querying the same data multiple times as the latter query does. This works especially well when the base query filters on tags that narrow the returned data to a particular subset. However, if your measurement has a lot of fields, the variable approach could be the less efficient one: filtering on the measurement alone pulls in every field, possibly hundreds of them, when you only need two. The best practice is to store your base query in a variable when that base query returns only a subset of your data, which you then filter further.
An aside: Using reduce() to return multiple aggregations simultaneously
You can also use the reduce() function to return multiple aggregations simultaneously. Using reduce() is more of an advanced topic, but it’s worth mentioning this solution here:
from(bucket: "db")
  |> range(start: 2022-02-22T20:22:02Z, stop: now())
  |> filter(fn: (r) => r._measurement == "measurement1")
  |> filter(fn: (r) => r._field == "fieldKey1" or r._field == "fieldKey2")
  |> reduce(
    identity: {count: 0.0, sum: 0.0, min: 0.0, max: 0.0, mean: 0.0},
    fn: (r, accumulator) => ({
      count: accumulator.count + 1.0,
      sum: r._value + accumulator.sum,
      min: if accumulator.count == 0.0 then r._value else if r._value < accumulator.min then r._value else accumulator.min,
      max: if accumulator.count == 0.0 then r._value else if r._value > accumulator.max then r._value else accumulator.max,
      mean: (r._value + accumulator.sum) / (accumulator.count + 1.0)
    })
  )
  |> yield(name: "count_sum_min_max_mean_reduce")
This will produce five additional columns in your data with the count, sum, min, max, and mean values for each of your fields. The only disadvantage to this approach is that the InfluxDB UI can only visualize one column at a time. If you want to visualize these aggregates simultaneously, you’ll need to use the approach discussed above.
WHERE clause → range() or filter() functions
You’ve probably already noticed some overlap between the WHERE clause and the range() and filter() functions. You use the range() function to describe the time range that you want to select your data from. The filter() function supports conditional statements on your tags, measurements, fields, or field values.
Maybe you want to select for field values greater than a certain value like so:
SELECT "fieldKey1" FROM "db"."measurement1" WHERE "fieldKey1" > 8 AND time >= '2022-02-22T20:22:02Z' AND time < now()
You can do the same with Flux, but use the range() and filter() functions instead for the time and field value conditions, respectively:
from(bucket: "db")
  |> range(start: 2022-02-22T20:22:02Z, stop: now())
  |> filter(fn: (r) => r._measurement == "measurement1")
  |> filter(fn: (r) => r._field == "fieldKey1")
  |> filter(fn: (r) => r._value > 8)
GROUP BY clause → group() or aggregateWindow() function
Tags are automatically grouped with Flux so there usually isn’t a need to GROUP BY tag as with InfluxQL unless you’ve ungrouped your data. In other words, the following InfluxQL query:
SELECT "fieldKey1" FROM "db"."measurement1"
WHERE ("tagKey1" = 'tagValue1' OR "tagKey1" = 'tagValue2') AND time >= '2022-02-22T20:22:02Z' AND time < now() GROUP BY "tagKey1"
…is equivalent to the following Flux Query:
from(bucket: "db")
  |> range(start: 2022-02-22T20:22:02Z, stop: now())
  |> filter(fn: (r) => r._measurement == "measurement1")
  |> filter(fn: (r) => r.tagKey1 == "tagValue1" or r.tagKey1 == "tagValue2")
  |> filter(fn: (r) => r._field == "fieldKey1")
Alternatively, if you don’t use the GROUP BY clause in InfluxQL, you’ll have to ungroup your data in Flux by using an empty group() function. The following InfluxQL query:
SELECT "fieldKey1" FROM "db"."measurement1"
WHERE ("tagKey1" = 'tagValue1' OR "tagKey1" = 'tagValue2') AND time >= '2022-02-22T20:22:02Z' AND time < now()
…is equivalent to the following Flux Query:
from(bucket: "db")
  |> range(start: 2022-02-22T20:22:02Z, stop: now())
  |> filter(fn: (r) => r._measurement == "measurement1")
  |> filter(fn: (r) => r.tagKey1 == "tagValue1" or r.tagKey1 == "tagValue2")
  |> filter(fn: (r) => r._field == "fieldKey1")
  |> group()
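The effect of grouping is easiest to see by counting rows. The following sketch uses hypothetical in-memory rows from array.from() instead of a real bucket, and compares grouping by the tag with an empty group().

```flux
import "array"

// Hypothetical rows: two points tagged tagValue1, one tagged tagValue2.
data = array.from(
  rows: [
    {tagKey1: "tagValue1", _value: 1.0},
    {tagKey1: "tagValue1", _value: 2.0},
    {tagKey1: "tagValue2", _value: 3.0}
  ]
)

// Like GROUP BY "tagKey1": one table per tag value,
// so count() returns 2 for tagValue1 and 1 for tagValue2.
data
  |> group(columns: ["tagKey1"])
  |> count()
  |> yield(name: "grouped")

// Empty group(): every row lands in a single table, so count() returns 3.
data
  |> group()
  |> count()
  |> yield(name: "ungrouped")
```

Aggregates and selectors run once per table, which is why the group key decides how many rows you get back.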
Grouping by time
When grouping by time in InfluxQL you’re almost always applying an aggregate or selector function to your data. Most likely you’ll be performing something like:
SELECT max("fieldKey1") FROM "db"."measurement1"
WHERE ("tagKey1" = 'tagValue1' OR "tagKey1" = 'tagValue2') AND time >= '2022-02-22T20:22:02Z' AND time < now() GROUP BY time(1d)
You can do the same with Flux, but with the aggregateWindow() function instead:
from(bucket: "db")
  |> range(start: 2022-02-22T20:22:02Z, stop: now())
  |> filter(fn: (r) => r._measurement == "measurement1")
  |> filter(fn: (r) => r.tagKey1 == "tagValue1" or r.tagKey1 == "tagValue2")
  |> filter(fn: (r) => r._field == "fieldKey1")
  |> aggregateWindow(every: 1d, fn: max)
The aggregateWindow() function lets you apply a selector (like min, max, first, or last) or an aggregate function (mean, median, count, sum, etc.) to your data after grouping it into time windows of the duration specified with the every parameter. You can also construct any custom function you desire and pass it into the fn parameter. Take a look at Top 5 Hurdles for Intermediate Flux Users and Resources for Optimizing Flux to learn more about creating your own custom utility function with Flux and using it with the aggregateWindow() function.
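As a rough sketch of passing a custom function to fn, the example below defines a 90th-percentile helper and applies it per day. The helper name p90 and the sample rows are assumptions made for illustration; array.from() stands in for a real bucket, and range() supplies the window bounds that aggregateWindow() needs.

```flux
import "array"

// Hypothetical custom aggregate. aggregateWindow() calls fn with a
// piped-in stream of tables and the name of the column to aggregate.
p90 = (tables=<-, column) => tables |> quantile(q: 0.9, column: column)

array.from(
  rows: [
    {_time: 2022-02-22T06:00:00Z, _value: 1.0},
    {_time: 2022-02-22T18:00:00Z, _value: 9.0},
    {_time: 2022-02-23T06:00:00Z, _value: 4.0},
    {_time: 2022-02-23T18:00:00Z, _value: 6.0}
  ]
)
  |> range(start: 2022-02-22T00:00:00Z, stop: 2022-02-24T00:00:00Z)
  |> aggregateWindow(every: 1d, fn: p90)
  |> yield(name: "daily_p90")
```

Any function with this shape, a piped-in stream plus a column parameter, can be passed to fn in the same way.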
INTO clause → to() function
The following InfluxQL query writes the results of a query into a measurement in another database. Note that the INTO clause comes before the FROM clause:
SELECT "fieldKey1" INTO "destination_db"."measurement1" FROM "db"."measurement1"
WHERE "fieldKey1" > 8 AND time >= '2022-02-22T20:22:02Z' AND time < now()
You can do the same with Flux, but with the to() function instead:
from(bucket: "db")
  |> range(start: 2022-02-22T20:22:02Z, stop: now())
  |> filter(fn: (r) => r._measurement == "measurement1")
  |> filter(fn: (r) => r._field == "fieldKey1")
  |> filter(fn: (r) => r._value > 8)
  |> to(bucket: "destination_db")
Keep in mind that if you want to move large amounts of data, you need to make sure to do so with sequential writes as you would with InfluxQL. If you want to rename measurements before writing the data to another database, you would do the following in InfluxQL:
SELECT * INTO "destination_db"."new_measurement1" FROM "db"."measurement1"
WHERE "fieldKey1" > 8 AND time >= '2022-02-22T20:22:02Z' AND time < now()
You can do the same in Flux with the set() function:
from(bucket: "db")
  |> range(start: 2022-02-22T20:22:02Z, stop: now())
  |> filter(fn: (r) => r._measurement == "measurement1")
  |> filter(fn: (r) => r._field == "fieldKey1")
  |> filter(fn: (r) => r._value > 8)
  |> set(key: "_measurement", value: "new_measurement1")
  |> to(bucket: "destination_db")
You can’t rename tags with InfluxQL, but you can with Flux. If you wanted to rename your tags or field keys, you could use the set() function multiple times or you could use the map() function with conditional logic to conditionally transform your data:
from(bucket: "db")
  |> range(start: 2022-02-22T20:22:02Z, stop: now())
  |> filter(fn: (r) => r._measurement == "measurement1")
  |> filter(fn: (r) => r._field == "fieldKey1")
  // keep rows with either tag value; two chained equality filters would match nothing
  |> filter(fn: (r) => r.tagKey1 == "tagValue1" or r.tagKey1 == "tagValue2")
  |> filter(fn: (r) => r._value > 8)
  |> map(
    fn: (r) => ({r with
      _measurement: "new_measurement1",
      tagKey1: if r.tagKey1 == "tagValue1" then "new_tagValue1" else "new_tagValue2"
    })
  )
  |> rename(columns: {tagKey1: "new_tagKey1"})
  |> to(bucket: "destination_db")
Here we are using the map() function to rename the measurement from “measurement1” to “new_measurement1” and the tag values from “tagValue1” and “tagValue2” to “new_tagValue1” and “new_tagValue2”. Finally, we use the rename() function to rename the tag key from “tagKey1” to “new_tagKey1”. The map() function is one of the most powerful tools Flux offers over InfluxQL. It transforms each record of the data piped into it by applying a function to that record.
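Because map() runs a function against every record, it can also add new columns. The sketch below is an illustration only: the level column, its labels, and the sample rows are assumptions, and the threshold of 8 simply mirrors the value used in the filters above.

```flux
import "array"

// Hypothetical rows standing in for filtered "measurement1" data.
array.from(
  rows: [
    {_field: "fieldKey1", _value: 6.5},
    {_field: "fieldKey1", _value: 9.0},
    {_field: "fieldKey1", _value: 12.0}
  ]
)
  // "r with" keeps the existing columns and adds or overwrites the ones listed.
  |> map(fn: (r) => ({r with level: if r._value > 8.0 then "high" else "normal"}))
  |> yield(name: "with_level")
```

The first row is labeled "normal" and the other two are labeled "high", while _field and _value pass through unchanged.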
A collection of InfluxQL translations
Sometimes the best way to learn something new is simply through repetition and examples. This section is just a list of InfluxQL to Flux query translations to help you understand the similarities and differences between the two languages.
InfluxQL query:
SELECT LAST(fieldKey1) FROM measurement1 WHERE time < '2021-08-17'
Flux query:
from(bucket: "my-bucket")
  |> range(start: 0, stop: 2021-08-17T00:00:00Z)
  |> filter(fn: (r) => r._measurement == "measurement1")
  |> filter(fn: (r) => r._field == "fieldKey1")
  |> last()

InfluxQL query:
SELECT last("fieldKey1")*60+last("fieldKey2") FROM "measurement1" WHERE time > now() - 5m GROUP BY time(1h) fill(null)
Flux query:
import "influxdata/influxdb/schema"

from(bucket: "my-bucket")
  |> range(start: -5m, stop: now())
  |> filter(fn: (r) => r._measurement == "measurement1")
  |> aggregateWindow(every: 1h, fn: last)
  |> schema.fieldsAsCols()
  |> map(fn: (r) => ({r with _value: r.fieldKey1 * 60 + r.fieldKey2}))

InfluxQL query:
SELECT last("fieldKey1") - first("fieldKey2") FROM "measurement1" WHERE time > now() - 1d AND "tagKey1"='tagValue1'
Flux query, with the assumption that the data is always increasing:
from(bucket: "my-bucket")
  |> range(start: -1d)
  |> filter(fn: (r) => r["_measurement"] == "measurement1")
  |> filter(fn: (r) => r["tagKey1"] == "tagValue1")
  |> filter(fn: (r) => r["_field"] == "fieldKey1" or r["_field"] == "fieldKey2")
  |> spread()
Alternatively:
data = from(bucket: "my-bucket")
  |> range(start: -1d)
  |> filter(fn: (r) => r["_measurement"] == "measurement1")
  |> filter(fn: (r) => r["tagKey1"] == "tagValue1")
  |> filter(fn: (r) => r["_field"] == "fieldKey1" or r["_field"] == "fieldKey2")

// named lastPoints and firstPoints so they don't collide with the built-in last() and first()
lastPoints = data |> last()
firstPoints = data |> first()

union(tables: [lastPoints, firstPoints])
  |> sort(columns: ["_time"], desc: true)
  |> difference()
While I won’t go into detail about all of the functions used in the Flux column from the table above, I hope that you can naturally draw some conclusions about how the Flux scripts might be working. I also encourage you to try the functions out for yourself by setting up a free tier account and writing and querying some sample data to and from your InfluxDB Cloud instance.
Pro tip: Use multiple yield() statements between lines of the queries above to see how each line of Flux is transforming your data.
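Here is a small sketch of that tip. Since yield() passes its input through unchanged, it can sit between steps of one pipeline. The sample rows and the result names are made up for illustration, with array.from() standing in for a real bucket.

```flux
import "array"

// Hypothetical rows standing in for the output of from() |> range().
array.from(
  rows: [
    {_field: "fieldKey1", _value: 3.0},
    {_field: "fieldKey1", _value: 7.0},
    {_field: "fieldKey2", _value: 5.0}
  ]
)
  |> yield(name: "1_raw")
  |> filter(fn: (r) => r._field == "fieldKey1")
  |> yield(name: "2_after_filter")
  |> max()
  |> yield(name: "3_after_max")
```

The three results show all three rows, then the two fieldKey1 rows, then the single row with the value 7.0.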
Conclusion: A cost benefit analysis
Of course you’ve noticed that Flux queries are generally longer than InfluxQL queries. They might even seem more complicated at first – although I hope this post helps you overcome the barrier to Flux adoption. However, Flux is vastly more powerful than InfluxQL. With Flux you can:
- Perform joins and unions
- Manipulate timestamps
- Filter with conditional logic
- Conditionally transform data
- Perform math across measurements
- Write median absolute deviation anomaly detection algorithms or Naive Bayes classifiers
- Have more flexibility and functionality than TICK scripts with a far improved developer experience
- Transform data on a schedule with tasks
- Create alerts and notifications
- And much more
If you want to learn more about Flux, I encourage you to read the following two sections from the Time to Awesome book: Introduction to Flux and Querying and Data Transformations. They will guide you through some of the most important and useful functions and concepts in the Flux language. I also invite you to look at the free courses offered as a part of InfluxDB University.
If you are using Flux and need help, please ask for it on our community site or in our Slack channel. If you’re developing a cool IoT application on top of InfluxDB, we’d love to hear about it, so make sure to share your story! Additionally, please share your thoughts, concerns, or questions in the comments section. We’d love to get your feedback and help you with any problems you run into!