Optimizing Queries in InfluxDB 3.0 Using Progressive Evaluation

In a previous post, we described the technique that makes “most recent values” queries hundreds of times faster and has benefited many of our customers. The idea behind this technique is to progressively evaluate time-organized files until we reach the most recent values. Since then, we have received questions such as “What queries support progressive evaluation?”, “How do we verify that a query is progressively evaluated?”, and “Are there certain file organizations that progressive evaluation won’t help?” This blog post answers those questions.

Queries that support progressive evaluation

Currently, this technique is available only for SQL queries; it is not yet applicable to InfluxQL queries. Your SQL query must include the clause ORDER BY time DESC (or ASC). In addition, the SELECT clause must not contain expressions, aliases (including AT TIME ZONE), or aggregations. In other words, everything in the SELECT clause must be a simple table column.

Examples of supported queries

SELECT host, temperature
FROM   machine
WHERE  time > now() - interval '1 day' and region = 'US'
ORDER BY time ASC;

SELECT host, temperature
FROM   machine
WHERE  time > now() - interval '1 day' and region = 'US'
ORDER BY time DESC
LIMIT  10;

Examples of unsupported queries

These queries are not optimized using progressive evaluation yet. We hope to lift the restrictions in a future release.

Query with an expression (temperature + 2)

SELECT host, temperature + 2
FROM   machine
WHERE  time > now() - interval '1 day' and region = 'US'
ORDER BY time ASC;

Query with an alias (as host_name)

SELECT host as host_name, time
FROM   machine
WHERE  time > now() - interval '1 day' and region = 'US'
ORDER BY time ASC;

Query that specifies a time zone and then an alias (AT TIME ZONE 'Europe/Oslo' as time)

SELECT host, time AT TIME ZONE 'Europe/Oslo' as time
FROM   machine
WHERE  time > now() - interval '1 day' and region = 'US'
ORDER BY time DESC
LIMIT  10;

Query with an aggregate (min(temperature))

SELECT min(temperature)
FROM   machine
WHERE  time > now() - interval '1 day' and region = 'US'
ORDER BY time DESC
LIMIT  10;

Signal that a query is evaluated progressively

If ProgressiveEvalExec is in your query plan, it is optimized using progressive evaluation (see this post for how to get and read the query plan). However, the absence of progressive evaluation does not mean your query will run slowly. We only apply it when it actually benefits your query.
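The check described above can be automated once you have the plan as text. The sketch below is only an illustration: it assumes the plan text has already been obtained (for example, from an EXPLAIN statement) and simply searches it for the operator name. The function name uses_progressive_eval and the two sample plans are hypothetical.

```python
def uses_progressive_eval(plan_text: str) -> bool:
    """Return True if the plan text mentions the ProgressiveEvalExec operator."""
    return "ProgressiveEvalExec" in plan_text


# Sample plan text, simplified in the same way as the plans in this post.
progressive_plan = """\
ProgressiveEvalExec: fetch=1
    SortExec: TopK(fetch=1), expr=[time DESC], preserve_partitioning=[true]
         ParquetExec: file_groups={2 groups: [F2], [F1]}
"""

merged_plan = """\
SortExec: TopK(fetch=1), expr=[time DESC], preserve_partitioning=[false]
  DeduplicateExec:
     SortPreservingMergeExec:
         ParquetExec: file_groups={3 groups: [F4], [F3], [F5]}
"""

print(uses_progressive_eval(progressive_plan))  # True
print(uses_progressive_eval(merged_plan))       # False
```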

File organizations that benefit from progressive evaluation

To understand when progressive evaluation benefits your query, we first need to understand data organization, which is one of the most important factors affecting query performance.

Data organization in InfluxDB 3.0

Below is a brief description of how data is organized in InfluxDB 3.0 (see our system architecture and data compaction for the complete data cycle, how data is compacted, and how it benefits query performance).

Because InfluxDB is a time series database, data in the table machine always includes a time column representing the time of an event, such as the temperature at 9:30 am UTC. Figure 1 shows three different stages of data organization. Each rectangle in the figure illustrates a chunk of data. C represents data that is not yet persisted and usually includes the most recent values. L followed by a number represents the level of a persisted file. L0 is used for files of newly ingested and persisted data. These files are usually small and contain recent values. However, L0 files of backfilled data can be as old as desired. L1 files store the results of compacting many small L0 files. We also have L2 files, but they are beyond the scope of this topic and do not change how progressive evaluation works.

Figure 1 (borrowed from the compaction blog post): Three stages of data organization after two rounds of compaction.

In stage 1, all data is in small L0 files. In stage 2, the data from stage 1 has been compacted into larger L1 files, while some new data is persisted in a few small L0 files and some is in a not-yet-persisted chunk (C). If you ingest new data most of the time, your data organization mostly looks like stage 2 or stage 3. However, if you backfill data, your data organization can combine stage 1 with stage 2 or stage 3. Thus, depending on how you ingest data and how well the compactor keeps up with your ingest workload, there may be few or many small, overlapped files. Stage 3 is what we call “well-compacted data” and is usually best for query performance. The goal of the compactor is to have most of your data in stage 3. Avoiding frequent backfilling of data also helps keep your data well-compacted.

Application of progressive evaluation in various overlap scenarios

Let’s go over examples of querying different data sets. Figure 2 shows the data organization of the table machine. We use F for the prefix name of the files, which can be either L0 or L1. Files F1, F2, F6, and F7 do not time-overlap with any files. Files F3, F4, and F5 overlap with each other, and file F8 overlaps with F9, which overlaps with chunk C.

Figure 2: Data organization of table machine
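To make the notion of time overlap concrete, here is a minimal sketch that groups chunks whose time ranges overlap, directly or through a chain of other chunks. The time ranges are made-up integers chosen to reproduce the layout described above; the Chunk type, the group_overlapping function, and the choice to treat touching boundaries as overlapping are all assumptions for illustration, not how InfluxDB 3.0 implements this.

```python
from typing import NamedTuple


class Chunk(NamedTuple):
    name: str
    min_time: int  # hypothetical inclusive lower time bound
    max_time: int  # hypothetical inclusive upper time bound


def group_overlapping(chunks):
    """Group chunks whose time ranges overlap, directly or transitively."""
    groups = []
    group_end = None  # largest max_time seen in the current group
    for chunk in sorted(chunks, key=lambda c: c.min_time):
        if groups and chunk.min_time <= group_end:
            # Starts before the current group ends: same group.
            groups[-1].append(chunk.name)
            group_end = max(group_end, chunk.max_time)
        else:
            # Starts after everything seen so far: new group.
            groups.append([chunk.name])
            group_end = chunk.max_time
    return groups


# Made-up ranges that mimic the layout of the table machine.
chunks = [
    Chunk("F1", 0, 9), Chunk("F2", 10, 19),
    Chunk("F3", 20, 40), Chunk("F4", 25, 45), Chunk("F5", 30, 49),
    Chunk("F6", 50, 59), Chunk("F7", 60, 69),
    Chunk("F8", 70, 85), Chunk("F9", 80, 95), Chunk("C", 90, 99),
]

print(group_overlapping(chunks))
# [['F1'], ['F2'], ['F3', 'F4', 'F5'], ['F6'], ['F7'], ['F8', 'F9', 'C']]
```

Groups with a single member are the non-overlapped files; groups with several members are the ones that need merging and deduplication.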

Reading non-overlapped files only

Suppose your query asks for the latest data before t1:

SELECT temperature
FROM   machine
WHERE  time < t1 and region = 'US'
ORDER BY time DESC LIMIT  1;

The query will be optimized with progressive evaluation because the files needed, F1 and F2, do not overlap. The simplified query plan is as follows:

ProgressiveEvalExec: fetch=1
    SortExec: TopK(fetch=1), expr=[time DESC], preserve_partitioning=[true]   
         ParquetExec: file_groups={2 groups: [F2], [F1]}

A few essential properties in the query plan are needed for ProgressiveEvalExec to work correctly:

  1. Files in ParquetExec are sorted by time descending: F2, then F1.
  2. preserve_partitioning=[true] means data of 2 file groups, [F2] and [F1], are sorted in their own group and won’t be merged. This is important for us to be able to fetch data from F2 before fetching F1.
  3. fetch=1 means the query will stop running as soon as it gets a row that meets the query filters. In other words, if there is at least one row in file F2 with US as a region, F1 will never be read.
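The three properties above can be illustrated with a small sketch. It is not the ProgressiveEvalExec implementation; it only mimics the behavior described here: streams are opened lazily in the given order, and evaluation stops as soon as fetch rows have been produced. The names progressive_eval and file_stream, the row layout, and the sample data are hypothetical.

```python
files_read = []  # records which files were actually opened


def file_stream(name, rows, region):
    """Return a function that opens the file lazily and yields matching rows,
    sorted by time descending (the role SortExec plays in the plan)."""
    def open_stream():
        files_read.append(name)
        matching = [r for r in rows if r["region"] == region]
        return iter(sorted(matching, key=lambda r: r["time"], reverse=True))
    return open_stream


def progressive_eval(streams, fetch):
    """Drain streams one by one, in order, and stop once `fetch` rows are collected."""
    rows = []
    if fetch <= 0:
        return rows
    for open_stream in streams:
        for row in open_stream():
            rows.append(row)
            if len(rows) == fetch:
                return rows  # later streams are never opened
    return rows


# Made-up contents: F2 holds more recent data than F1.
F1 = [{"time": 1, "region": "US", "temperature": 70},
      {"time": 2, "region": "US", "temperature": 71}]
F2 = [{"time": 11, "region": "EU", "temperature": 72},
      {"time": 12, "region": "US", "temperature": 73}]

# ORDER BY time DESC: the most recent file goes first.
result = progressive_eval(
    [file_stream("F2", F2, "US"), file_stream("F1", F1, "US")], fetch=1
)
print(result)      # [{'time': 12, 'region': 'US', 'temperature': 73}]
print(files_read)  # ['F2']
```

Because F2 contains a row with US as the region, F1 is never opened, which is where the savings come from.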

The same query sorted on time ascending will look like this:

ProgressiveEvalExec: fetch=1
    SortExec: TopK(fetch=1), expr=[time ASC], preserve_partitioning=[true]   
         ParquetExec: file_groups={2 groups: [F1], [F2]}

You will get a similar query plan if your query reads data between t2 and t3, a range that includes only the non-overlapped files F6 and F7.

Reading overlapped files only

Now let’s look at the query reading data between t1 and t2.

SELECT temperature
FROM   machine
WHERE  time < t2 and time > t1 and region = 'US'
ORDER BY time DESC LIMIT  1;

Because all three needed files, F3, F4, and F5, overlap, their data must be merged for deduplication, so the files cannot be evaluated one by one progressively. The simplified query plan will look like this:

SortExec: TopK(fetch=1), expr=[time DESC], preserve_partitioning=[false]
  DeduplicateExec:
     SortPreservingMergeExec:
         ParquetExec: file_groups={3 groups: [F4], [F3], [F5]}

Without ProgressiveEvalExec, the files in ParquetExec can be in any order and group because the groups will be read in parallel and merged into one stream for deduplication.
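The following sketch shows why overlapped files have to be read together. It merges several time-sorted inputs into one stream and keeps a single row per duplicate key before the top row is taken. Two things are assumptions made for illustration and are not stated in this post: rows count as duplicates when they share the same host and time, and the row from the file with the higher sequence number wins. The function name and data are hypothetical.

```python
import heapq
from itertools import groupby, islice


def merge_and_deduplicate(files):
    """files: list of (sequence_number, rows); each rows list is sorted by
    (time, host) descending. Yields deduplicated rows, newest time first."""
    tagged = [
        [(row["time"], row["host"], seq, row) for row in rows]
        for seq, rows in files
    ]
    # Sorted merge of all inputs; among duplicates, the highest sequence comes first.
    merged = heapq.merge(*tagged, key=lambda t: t[:3], reverse=True)
    # Assumed duplicate key: (time, host). Keep only the first row of each group.
    for _, duplicates in groupby(merged, key=lambda t: t[:2]):
        yield next(duplicates)[3]


# Made-up overlapping files; sequence numbers 3, 4, 5 stand in for F3, F4, F5.
files = [
    (3, [{"time": 25, "host": "a", "temperature": 60},
         {"time": 22, "host": "a", "temperature": 61}]),
    (4, [{"time": 28, "host": "a", "temperature": 65},
         {"time": 25, "host": "a", "temperature": 66}]),
    (5, [{"time": 28, "host": "a", "temperature": 67}]),
]

print([r["temperature"] for r in merge_and_deduplicate(files)])  # [67, 66, 61]

# TopK(fetch=1) on top of the deduplicated stream.
top = list(islice(merge_and_deduplicate(files), 1))
print(top)  # [{'time': 28, 'host': 'a', 'temperature': 67}]
```

The most recent row cannot be trusted until every overlapping file has been consulted, since any of them may hold a newer version of the same row.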

Similarly, progressive evaluation won’t be applied if your query reads overlapped data after t3.

Reading a mixture of non-overlapped and overlapped files

When your query reads a mixture of non-overlapped and overlapped data, progressive evaluation is applied, and the data is split and grouped accordingly. Let’s look at a query that reads data before t2.

SELECT temperature
FROM   machine
WHERE  time < t2 and region = 'US'
ORDER BY time DESC LIMIT  1;

The simplified query plan will look like this:

ProgressiveEvalExec: fetch=1
    SortExec: TopK(fetch=1), expr=[time DESC], preserve_partitioning=[false]
       DeduplicateExec:
          SortPreservingMergeExec:
              ParquetExec: file_groups={3 groups: [F4], [F3], [F5]}
    SortExec: TopK(fetch=1), expr=[time DESC], preserve_partitioning=[true]   
         ParquetExec: file_groups={2 groups: [F2], [F1]}

Even though files F3, F4, and F5 overlap, they do not overlap with F1 and F2 and contain more recent data. Therefore, the subplan of F3, F4, and F5 is progressively evaluated with F2 and F1. Note that the number of input streams into ProgressiveEvalExec is three: one for the merge of F3, F4, and F5, one for F2, and one for F1. These three streams are evaluated progressively in that order.

If the query is sorted ascending, the progressive order will be opposite:

ProgressiveEvalExec: fetch=1
    SortExec: TopK(fetch=1), expr=[time ASC], preserve_partitioning=[true]   
         ParquetExec: file_groups={2 groups: [F1], [F2]}
    SortExec: TopK(fetch=1), expr=[time ASC], preserve_partitioning=[false]
       DeduplicateExec:
          SortPreservingMergeExec:
              ParquetExec: file_groups={3 groups: [F4], [F3], [F5]}

Three streams still go into ProgressiveEvalExec but in the opposite order: F1, F2, and the F3, F4, and F5 merge.

Similarly, let’s read all data:

SELECT temperature
FROM   machine
WHERE  time < now() and region = 'US'
ORDER BY time DESC LIMIT  1;

The query plan is getting more complicated but follows the same rules: subplans of overlapped data will be put in the right order and progressively evaluated with non-overlapped files.

ProgressiveEvalExec: fetch=1
    SortExec: TopK(fetch=1), expr=[time DESC], preserve_partitioning=[false]
       DeduplicateExec:
          SortPreservingMergeExec:
              SortExec:
                 RecordBatchExec: {C}
              ParquetExec: file_groups={2 groups: [F8], [F9]}
    SortExec: TopK(fetch=1), expr=[time DESC], preserve_partitioning=[true]   
         ParquetExec: file_groups={2 groups: [F7], [F6]}
    SortExec: TopK(fetch=1), expr=[time DESC], preserve_partitioning=[false]
       DeduplicateExec:
          SortPreservingMergeExec:
              ParquetExec: file_groups={3 groups: [F4], [F3], [F5]}
    SortExec: TopK(fetch=1), expr=[time DESC], preserve_partitioning=[true]   
         ParquetExec: file_groups={2 groups: [F2], [F1]}

There are six non-overlapped streams of data progressively evaluated by ProgressiveEvalExec in this order:

  1. The merge of C, F8 and F9
  2. F7
  3. F6
  4. The merge of F3, F4 and F5
  5. F2
  6. F1
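The ordering above can be reproduced with a short sketch. It assumes the overlap groups are already known and that each group carries a made-up minimum time; because the groups do not overlap one another, sorting on that value is enough. The function name progressive_order and the dictionary layout are hypothetical.

```python
def progressive_order(groups, descending=True):
    """Order non-overlapping groups for progressive evaluation.
    Each group becomes one input stream: a single file, or a merge of several."""
    ordered = sorted(groups, key=lambda g: g["min_time"], reverse=descending)
    return [g["chunks"] for g in ordered]


# Made-up minimum times that mimic the layout of the table machine.
groups = [
    {"chunks": ["F1"], "min_time": 0},
    {"chunks": ["F2"], "min_time": 10},
    {"chunks": ["F3", "F4", "F5"], "min_time": 20},
    {"chunks": ["F6"], "min_time": 50},
    {"chunks": ["F7"], "min_time": 60},
    {"chunks": ["C", "F8", "F9"], "min_time": 70},
]

print(progressive_order(groups, descending=True))
# [['C', 'F8', 'F9'], ['F7'], ['F6'], ['F3', 'F4', 'F5'], ['F2'], ['F1']]

print(progressive_order(groups, descending=False))
# [['F1'], ['F2'], ['F3', 'F4', 'F5'], ['F6'], ['F7'], ['C', 'F8', 'F9']]
```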

If your query orders data on time ascending, you will have a similar query plan but in the opposite order.

Cache implications: progressive evaluation may help other queries’ latency

For workloads that depend heavily on cached files for the lowest possible latency but run near the cache limit, progressive evaluation can make a significant performance difference. Not traversing extra Parquet files reduces CPU time for the optimized query. In addition, when the query’s time range covers far more data than its LIMIT needs, the database may bring far fewer files into the cache, so files used by other queries need not be evicted, which can reduce the latency of other queries on the system.
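A toy simulation can show the eviction effect. It assumes a least-recently-used cache that holds a fixed number of files; the actual cache policy and sizing in InfluxDB 3.0 are not described in this post, so the LruFileCache class, the capacity, and the file names X1 and X2 (files belonging to some other query) are purely hypothetical.

```python
from collections import OrderedDict


class LruFileCache:
    """Toy file cache: holds at most `capacity` files, evicts the least recently used."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.files = OrderedDict()
        self.evicted = []

    def read(self, name):
        if name in self.files:
            self.files.move_to_end(name)  # cache hit: mark as most recently used
            return
        self.files[name] = True           # cache miss: load the file
        if len(self.files) > self.capacity:
            oldest, _ = self.files.popitem(last=False)
            self.evicted.append(oldest)


def run(files_to_read):
    """Warm the cache with another query's files, then read files for our query."""
    cache = LruFileCache(capacity=3)
    for name in ["X1", "X2"]:
        cache.read(name)
    for name in files_to_read:
        cache.read(name)
    return cache.evicted


print(run(["F2", "F1"]))  # ['X1']  both files read: another query's file is evicted
print(run(["F2"]))        # []      only F2 read: nothing is evicted
```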

Summing up

If your query selects only plain table columns and orders data by time, InfluxDB 3.0 automatically uses progressive evaluation to improve query performance, even if only a subset of the queried data is in non-overlapped files. Progressive evaluation cannot be used when all of the queried data overlaps.