Optimizing Queries in InfluxDB 3.0 Using Progressive Evaluation
By
Nga Tran /
Reid Kaufmann /
Developer
Nov 14, 2024
In a previous post, we described the technique that makes “most recent values” queries hundreds of times faster and has benefited many of our customers. The idea behind this technique is to progressively evaluate time-organized files until we reach the most recent values. Since then, we have received questions like “What queries support progressive evaluation?” “How do we verify that a query is progressively evaluated?” “Are there certain file organizations that progressive evaluation won’t help?” This blog post answers those questions.
Queries that support progressive evaluation
Currently, this technique is only available for SQL queries; it is not yet applicable to InfluxQL queries. Your SQL query must include the clause ORDER BY time DESC (or ASC). In addition, the SELECT clause cannot yet contain expressions, aliases (including AT TIME ZONE), or aggregations. In other words, everything in the SELECT clause must be a simple table column.
Examples of supported queries
SELECT host, temperature
FROM machine
WHERE time > now() - interval '1 day' and region = 'US'
ORDER BY time ASC;

SELECT host, temperature
FROM machine
WHERE time > now() - interval '1 day' and region = 'US'
ORDER BY time DESC
LIMIT 10;
Examples of unsupported queries
These queries are not optimized using progressive evaluation yet. We hope to lift the restrictions in a future release.
Query with an expression (temperature + 2)
SELECT host, temperature + 2
FROM machine
WHERE time > now() - interval '1 day' and region = 'US'
ORDER BY time ASC;
Query with an alias (as host_name)
SELECT host as host_name, time
FROM machine
WHERE time > now() - interval '1 day' and region = 'US'
ORDER BY time ASC;
Query that specifies a time zone and then an alias (AT TIME ZONE 'Europe/Oslo' as time)
SELECT host, time AT TIME ZONE 'Europe/Oslo' as time
FROM machine
WHERE time > now() - interval '1 day' and region = 'US'
ORDER BY time DESC
LIMIT 10;
Query with an aggregate (min(temperature))
SELECT min(temperature)
FROM machine
WHERE time > now() - interval '1 day' and region = 'US'
ORDER BY time DESC
LIMIT 10;
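The rules illustrated by the examples above can be approximated with a rough pre-check. This is only a sketch in Python: the function name `looks_eligible` and its inputs (a list of SELECT items and the ORDER BY text) are hypothetical, and the regular expression is a simplification, not the logic InfluxDB's planner actually uses.

```python
import re

# A bare identifier such as "host" or "temperature". Operators, function
# calls, "as" aliases, and AT TIME ZONE all fail this match.
_BARE_COLUMN = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def looks_eligible(select_items, order_by):
    """Rough pre-check of the rules above; NOT InfluxDB's planner logic."""
    ordered_on_time = order_by.strip().lower() in ("time", "time asc", "time desc")
    simple_columns = all(
        _BARE_COLUMN.fullmatch(item.strip()) is not None for item in select_items
    )
    return ordered_on_time and simple_columns


print(looks_eligible(["host", "temperature"], "time DESC"))     # True
print(looks_eligible(["host", "temperature + 2"], "time ASC"))  # False
```

A check like this can only say that a query is not ruled out by its shape. Whether progressive evaluation is actually applied also depends on how the files are organized, which the query plan reveals.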
Signal that a query is evaluated progressively
If ProgressiveEvalExec is in your query plan, the query is optimized using progressive evaluation (see this post for how to get and read the query plan). However, the absence of ProgressiveEvalExec does not mean your query will run slowly. We only apply it when it actually benefits your query.
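If you collect plans programmatically, the check is a simple text search. The sketch below assumes you already have the plan as a string (for example, copied from EXPLAIN output); the helper name `uses_progressive_eval` and the two sample plans, taken from the simplified plans later in this post, are illustrative only.

```python
def uses_progressive_eval(plan_text: str) -> bool:
    """Return True if the plan text mentions the ProgressiveEvalExec operator."""
    return "ProgressiveEvalExec" in plan_text


# Simplified plans in the style used in this post.
PROGRESSIVE_PLAN = """\
ProgressiveEvalExec: fetch=1
  SortExec: TopK(fetch=1), expr=[time DESC], preserve_partitioning=[true]
    ParquetExec: file_groups={2 groups: [F2], [F1]}
"""

MERGE_PLAN = """\
SortExec: TopK(fetch=1), expr=[time DESC], preserve_partitioning=[false]
  DeduplicateExec:
    SortPreservingMergeExec:
      ParquetExec: file_groups={3 groups: [F4], [F3], [F5]}
"""

print(uses_progressive_eval(PROGRESSIVE_PLAN))  # True
print(uses_progressive_eval(MERGE_PLAN))        # False
```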
File organizations that benefit from progressive evaluation
To understand when progressive evaluation benefits your query, we first need to understand data organization, which is one of the most important factors affecting query performance.
Data organization in InfluxDB 3.0
Below is a brief description of how data is organized in InfluxDB 3.0 (see our system architecture and data compaction for the complete data cycle, how data is compacted, and how it benefits query performance).
Because InfluxDB is a time series database, data in the table machine always includes a time column representing the time of an event, such as temperature at 9:30 am UTC. Figure 1 shows three different stages of data organization. Each rectangle in the figure illustrates a chunk of data. C represents data that is not yet persisted and usually includes the most recent values. L denotes the level of a persisted file. L0 is used for files of newly ingested and persisted data. They are usually small and contain recent values. However, L0 files of backfilled data can be as old as desired. L1 files store the results of compacting many small L0 files. We also have L2 files, but they are beyond the scope of this topic and do not change how progressive evaluation works.
Figure 1 (borrowed from the compaction blog post): Three stages of data organization after two rounds of compaction.
In stage 1, all data are in small L0 files. In stage 2, data in stage 1 has been compacted to larger L1 files, while some new data are persisted in a few small L0 files and some are in a not-yet-persisted chunk (C). If you ingest new data most of the time, your data organization mostly looks like stage 2 or stage 3. However, if you backfill data, your data organization can combine stage 1 with stage 2 or stage 3. Thus, depending on how you ingest data and how well the compactor keeps up with your ingest workload, there may be few or many small, overlapping files. Stage 3 is what we call “well-compacted data” and is usually best for query performance. The goal of the compactor is to have most of your data in stage 3. Avoiding frequent backfilling of data also helps keep your data well-compacted.
Application of progressive evaluation in various overlap scenarios
Let’s go over examples of querying different data sets. Figure 2 shows the data organization of the table machine. We use F for the prefix name of the files, which can be either L0 or L1. Files F1, F2, F6, and F7 do not time-overlap with any files. Files F3, F4, and F5 overlap with each other, and file F8 overlaps with F9, which overlaps with chunk C.
Figure 2: Data organization of table machine
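To make "time-overlap" concrete, here is a small Python sketch. The numeric (min_time, max_time) ranges are hypothetical values chosen only to reproduce the overlaps described for Figure 2, with t1 = 20, t2 = 50, and t3 = 70; the real file boundaries are not given in this post.

```python
# Hypothetical (min_time, max_time) ranges that reproduce the overlaps
# described for Figure 2. They are NOT real file boundaries.
RANGES = {
    "F1": (0, 9), "F2": (10, 19),
    "F3": (20, 35), "F4": (25, 45), "F5": (30, 49),
    "F6": (50, 59), "F7": (60, 69),
    "F8": (70, 84), "F9": (80, 94), "C": (90, 99),
}


def time_overlaps(a, b):
    """Two closed time ranges overlap when each starts before the other ends."""
    return a[0] <= b[1] and b[0] <= a[1]


def overlap_partners(ranges):
    """Map each chunk name to the sorted names of the chunks it overlaps."""
    return {
        name: sorted(
            other for other in ranges
            if other != name and time_overlaps(ranges[name], ranges[other])
        )
        for name in ranges
    }


partners = overlap_partners(RANGES)
print(partners["F1"])  # []
print(partners["F4"])  # ['F3', 'F5']
print(partners["F9"])  # ['C', 'F8']
```

Note that F8 and C do not overlap each other directly in this example; they end up being read together only because both overlap F9.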
Reading Non-Overlapped Files Only
Suppose your query asks for the latest data before t1:
SELECT temperature
FROM machine
WHERE time < t1 and region = 'US'
ORDER BY time DESC LIMIT 1;
The query will be optimized with progressive evaluation because the files needed, F1 and F2, do not overlap. The simplified query plan is as follows:
ProgressiveEvalExec: fetch=1
  SortExec: TopK(fetch=1), expr=[time DESC], preserve_partitioning=[true]
    ParquetExec: file_groups={2 groups: [F2], [F1]}
A few essential properties in the query plan are needed for ProgressiveEvalExec to work correctly:
- Files in ParquetExec are sorted on time descending: F2, F1.
- preserve_partitioning=[true] means the data of the 2 file groups, [F2] and [F1], is sorted within each group and won’t be merged. This is important for us to be able to fetch data from F2 before fetching F1.
- fetch=1 means the query will stop running as soon as it gets a row that meets the query filters. In other words, if there is at least one row in file F2 with US as a region, F1 will never be read.
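The early-stop behavior described in the list above can be imitated with lazy generators. This is a minimal Python sketch, not the actual operator: the names `scan`, `progressive_eval`, and `latest` are hypothetical, the rows are made up, and each file's rows are assumed to be already sorted on time descending (the job SortExec does in the real plan).

```python
from itertools import chain, islice


def scan(file_name, rows, region, opened):
    """Lazily yield matching rows; record the file name once reading starts."""
    opened.append(file_name)
    for row in rows:
        if row[1] == region:
            yield row


def progressive_eval(streams, fetch):
    """Drain streams strictly in the given order, stopping after `fetch` rows."""
    return list(islice(chain.from_iterable(streams), fetch))


def latest(files, region, fetch=1):
    """Return (rows, files actually opened) for a time-descending file list."""
    opened = []
    streams = [scan(name, rows, region, opened) for name, rows in files]
    return progressive_eval(streams, fetch), opened


# Made-up (time, region) rows; F2 holds newer data than F1.
FILES = [
    ("F2", [(19, "US"), (15, "EU")]),
    ("F1", [(9, "US"), (4, "US")]),
]
print(latest(FILES, "US"))  # ([(19, 'US')], ['F2'])
print(latest(FILES, "EU"))  # ([(15, 'EU')], ['F2'])
```

Because generators do no work until they are asked for a row, F1 is never opened when F2 already satisfies the limit. If F2 had no matching row, the evaluation would fall through to F1.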
The same query sorted on time ascending will look like this:
ProgressiveEvalExec: fetch=1
  SortExec: TopK(fetch=1), expr=[time ASC], preserve_partitioning=[true]
    ParquetExec: file_groups={2 groups: [F1], [F2]}
You will get a similar query plan if your query reads data between t2 and t3, which includes only non-overlapped files.
Reading Overlapped Files Only
Now let’s look at the query reading data between t1 and t2.
SELECT temperature
FROM machine
WHERE time < t2 and time > t1 and region = 'US'
ORDER BY time DESC LIMIT 1;
Because all three needed files, F3, F4, and F5, overlap, we need to merge their data for deduplication, so they cannot be evaluated one by one progressively. The simplified query plan will look like this:
SortExec: TopK(fetch=1), expr=[time DESC], preserve_partitioning=[false]
  DeduplicateExec:
    SortPreservingMergeExec:
      ParquetExec: file_groups={3 groups: [F4], [F3], [F5]}
Without ProgressiveEvalExec, the files in ParquetExec can be in any order and group because the groups will be read in parallel and merged into one stream for deduplication.
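The sketch below illustrates in Python why overlapped files must be merged into one stream before deduplication. It makes two assumptions that this post does not spell out: rows with the same (time, host) are duplicates, and the row from the file written last wins. The function name and the sample rows are hypothetical.

```python
import heapq
from itertools import groupby


def merge_and_deduplicate(files):
    """Merge time-descending files and keep one row per (time, host) key.

    `files` is ordered oldest-written first. When keys collide, the row from
    the file written last wins (an assumption of this sketch).
    """
    tagged = [
        [(time, host, seq, value) for time, host, value in rows]
        for seq, rows in enumerate(files)
    ]
    # Newest time first; for equal (time, host), the highest seq comes first.
    merged = heapq.merge(*tagged, key=lambda r: (r[0], r[1], r[2]), reverse=True)
    return [
        (time, host, next(group)[3])
        for (time, host), group in groupby(merged, key=lambda r: (r[0], r[1]))
    ]


# Made-up (time, host, temperature) rows, each file sorted on time descending.
F3 = [(35, "a", 70.0), (30, "a", 71.0), (20, "a", 72.0)]
F4 = [(45, "a", 68.0), (30, "a", 99.0), (25, "a", 69.0)]

print(merge_and_deduplicate([F3, F4])[:3])
# [(45, 'a', 68.0), (35, 'a', 70.0), (30, 'a', 99.0)]
```

The row at time 30 appears in both files, so neither file can be read on its own to answer the query correctly. That is the reason overlapped files form a single stream rather than several progressively evaluated ones.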
Similarly, progressive evaluation won’t be applied if your query reads overlapped data after t3.
Reading a Mixture of Non-Overlapped and Overlapped Files
When your query reads a mixture of non-overlapped and overlapped data, progressive evaluation is applied, and the data is split and grouped accordingly. Let’s look at a query that reads data before t2.
SELECT temperature
FROM machine
WHERE time < t2 and region = 'US'
ORDER BY time DESC LIMIT 1;
The simplified query plan will look like this:
ProgressiveEvalExec: fetch=1
  SortExec: TopK(fetch=1), expr=[time DESC], preserve_partitioning=[false]
    DeduplicateExec:
      SortPreservingMergeExec:
        ParquetExec: file_groups={3 groups: [F4], [F3], [F5]}
  SortExec: TopK(fetch=1), expr=[time DESC], preserve_partitioning=[true]
    ParquetExec: file_groups={2 groups: [F2], [F1]}
Even though files F3, F4, and F5 overlap, they do not overlap with F1 and F2 and contain more recent data. Therefore, the subplan of F3, F4, and F5 is progressively evaluated with F2 and F1. Note that the number of input streams into ProgressiveEvalExec is three: one for the merge of F3, F4, and F5, one for F2, and one for F1. These three streams are evaluated progressively in that order.
If the query is sorted ascending, the progressive order will be opposite:
ProgressiveEvalExec: fetch=1
  SortExec: TopK(fetch=1), expr=[time ASC], preserve_partitioning=[true]
    ParquetExec: file_groups={2 groups: [F1], [F2]}
  SortExec: TopK(fetch=1), expr=[time ASC], preserve_partitioning=[false]
    DeduplicateExec:
      SortPreservingMergeExec:
        ParquetExec: file_groups={3 groups: [F4], [F3], [F5]}
Three streams still go into ProgressiveEvalExec but in the opposite order: F1, F2, and the merge of F3, F4, and F5.
Similarly, let’s read all data:
SELECT temperature
FROM machine
WHERE time < now() and region = 'US'
ORDER BY time DESC LIMIT 1;
The query plan is getting more complicated but follows the same rules: subplans of overlapped data will be put in the right order and progressively evaluated with non-overlapped files.
ProgressiveEvalExec: fetch=1
  SortExec: TopK(fetch=1), expr=[time DESC], preserve_partitioning=[false]
    DeduplicateExec:
      SortPreservingMergeExec:
        SortExec:
          RecordBatchExec: {C}
        ParquetExec: file_groups={2 groups: [F8], [F9]}
  SortExec: TopK(fetch=1), expr=[time DESC], preserve_partitioning=[true]
    ParquetExec: file_groups={2 groups: [F7], [F6]}
  SortExec: TopK(fetch=1), expr=[time DESC], preserve_partitioning=[false]
    DeduplicateExec:
      SortPreservingMergeExec:
        ParquetExec: file_groups={3 groups: [F4], [F3], [F5]}
  SortExec: TopK(fetch=1), expr=[time DESC], preserve_partitioning=[true]
    ParquetExec: file_groups={2 groups: [F2], [F1]}
There are six non-overlapped streams of data progressively evaluated by ProgressiveEvalExec in this order:
- The merge of C, F8 and F9
- F7
- F6
- The merge of F3, F4 and F5
- F2
- F1
If your query orders data on time ascending, you will have a similar query plan but in the opposite order.
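The ordering of the streams can be reproduced with a short sweep over the chunks' time ranges. This Python sketch is only an illustration of the idea, not the planner's code: the function name `progressive_streams` is hypothetical, and the numeric ranges are invented to match the overlaps described for Figure 2.

```python
# Hypothetical (min_time, max_time) ranges consistent with Figure 2's overlaps.
RANGES = {
    "F1": (0, 9), "F2": (10, 19),
    "F3": (20, 35), "F4": (25, 45), "F5": (30, 49),
    "F6": (50, 59), "F7": (60, 69),
    "F8": (70, 84), "F9": (80, 94), "C": (90, 99),
}


def progressive_streams(ranges, descending=True):
    """Cluster time-overlapping chunks, then order the clusters on time."""
    clusters = []  # each entry: [cluster_max_time, [chunk names]]
    for name, (lo, hi) in sorted(ranges.items(), key=lambda kv: kv[1][0]):
        if clusters and lo <= clusters[-1][0]:
            # Starts before the current cluster ends: it must be merged in.
            clusters[-1][0] = max(clusters[-1][0], hi)
            clusters[-1][1].append(name)
        else:
            clusters.append([hi, [name]])
    ordered = [names for _, names in clusters]  # ascending on time
    return ordered[::-1] if descending else ordered


for stream in progressive_streams(RANGES, descending=True):
    print(stream)
# ['F8', 'F9', 'C']
# ['F7']
# ['F6']
# ['F3', 'F4', 'F5']
# ['F2']
# ['F1']
```

Each multi-chunk cluster corresponds to a merge-and-deduplicate subplan, and each single-file cluster is read directly. Passing `descending=False` yields the same six streams in the opposite order.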
Cache implications: progressive evaluation may help other queries’ latency
For workloads that depend heavily on cached files for the lowest possible latency but run near the cache limit, progressive evaluation can make a significant performance difference. Not traversing extra Parquet files reduces CPU time for the optimized query. In addition, when the query’s time range covers far more data than the LIMIT needs, the database may bring far fewer files into the cache, so other queries’ files need not be evicted, potentially reducing the latency of other queries on the system.
Summing up
If your query selects only plain table columns and orders data on time, InfluxDB 3.0 will automatically use progressive evaluation to improve your query performance, even if only a subset of the queried data is in non-overlapped files. Progressive evaluation cannot be used when all your data overlaps.