Downsampling to InfluxDB Cloud Dedicated with Java Flight SQL Client
By Subashini Sukumar / Product / Jun 09, 2023
InfluxDB Cloud Dedicated is a hosted and managed InfluxDB Cloud cluster dedicated to a single tenant. The InfluxDB time series platform is designed to handle high write and query loads, so you can use InfluxDB Cloud Dedicated for your specific time series use case. In this tutorial, we walk through reading data from InfluxDB Cloud Dedicated with the Java Flight SQL client, downsampling it with a query, and writing the result back to a target database. The Java Flight SQL client is part of Apache Arrow Flight, a framework for building high-performance data services. It transmits large datasets efficiently over gRPC, a modern high-performance RPC framework. Try querying InfluxDB Cloud Dedicated with the Java Flight SQL client for yourself with this repo.
Requirements and setup
This tutorial assumes that you already have an InfluxDB Cloud Dedicated account. It also assumes that you have Docker available.
Please keep your cluster URL handy.
You will need to create the following:
- A source database
- A target database
- A source token with Read/Write permissions
- A target token with Read/Write permissions
During the initial setup, you need to load the data for downsampling. For the purpose of this tutorial, I have used the NOAA air sensor dataset. However, other data sources such as Telegraf can be used. Here is a simple Telegraf configuration to load the data. Check out the following documentation on writing data to InfluxDB Cloud Dedicated for other methods of writing data to InfluxDB Cloud.
Code walkthrough
- Import required classes: We start by importing the required classes from Apache Arrow Flight and other necessary libraries. The excerpt below shows only part of the import list; the full script also imports the Flight and InfluxDB client classes used in the later steps.
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.types.pojo.ArrowType;
    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.time.ZoneOffset;
    import java.time.temporal.TemporalField;
    import java.util.concurrent.CountDownLatch;
- Define the main class: We define a CloudReadWriteExample class with a main method where our code will execute.
    public class CloudReadWriteExample {
        public static void main(String[] args) {
- Set up the connection configurations: The script expects the following environment variables to be set:
    - Source/target host: The source host is the Cloud Dedicated cluster URL without the protocol (“https://”) and the target host is the cluster URL with the protocol (“https://”).
    - Source/target database: In this example, the source database stores the air sensor data before downsampling and the target database stores the downsampled data.
    - Source/target token: Tokens for the source and target databases, respectively.
    - Query: In this example, the query variable contains the downsampling query. You can replace it with any query that fits your requirements.
    // Read configs
    private static final String SOURCE_HOST = System.getenv("SOURCE_HOST");
    private static final String READ_TOKEN = System.getenv("READ_TOKEN");
    private static final String SOURCE_DATABASE_NAME = System.getenv("SOURCE_DATABASE_NAME");
    private static final String DOWNSAMPLE_QUERY = System.getenv("DOWNSAMPLE_QUERY");
    // Write configs
    private static final String TARGET_CLUSTER_URL = System.getenv("TARGET_CLUSTER_URL");
    private static final String WRITE_TOKEN = System.getenv("WRITE_TOKEN");
    private static final String TARGET_DATABASE_NAME = System.getenv("TARGET_DATABASE_NAME");
    private static final String TARGET_TABLE_NAME = System.getenv("TARGET_TABLE_NAME");
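Because every setting comes from the environment, a missing variable only shows up later as a null value. The following is a minimal sketch of a fail-fast check. The EnvCheck class and its missing method are hypothetical helpers, not part of the original script, and the sketch assumes the variable names SOURCE_HOST and TARGET_CLUSTER_URL for the two hosts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the original script.
final class EnvCheck {
    // Variable names assumed from the configuration step above.
    static final List<String> REQUIRED = List.of(
            "SOURCE_HOST", "READ_TOKEN", "SOURCE_DATABASE_NAME", "DOWNSAMPLE_QUERY",
            "TARGET_CLUSTER_URL", "WRITE_TOKEN", "TARGET_DATABASE_NAME", "TARGET_TABLE_NAME");

    // Returns the required names that are absent or blank in the given environment.
    static List<String> missing(Map<String, String> env) {
        List<String> result = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.isBlank()) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> missing = missing(System.getenv());
        if (!missing.isEmpty()) {
            System.err.println("Missing environment variables: " + missing);
            System.exit(1);
        }
        System.out.println("All required environment variables are set.");
    }
}
```

Taking the environment as a Map keeps the check easy to exercise without touching the real process environment.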
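- Create an interceptor that injects header metadata, in this case the database name, into every request. The FlightClientMiddleware interface also declares onHeadersReceived and onCallCompleted, which are left empty here.
    FlightClientMiddleware.Factory f = info -> new FlightClientMiddleware() {
        @Override
        public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) {
            outgoingHeaders.insert("database", SOURCE_DATABASE_NAME);
        }
        @Override
        public void onHeadersReceived(CallHeaders incomingHeaders) { }
        @Override
        public void onCallCompleted(CallStatus status) { }
    };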
- Create a Location object using the forGrpcTls method, which sets up the connection with gRPC and Transport Layer Security (TLS) encryption.
    Location location = Location.forGrpcTls(SOURCE_HOST, 443);
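Since the source host must be the cluster URL without the protocol, it can help to normalize whatever value was supplied before building the Location. This is only a sketch: HostNames and bareHost are hypothetical names, and cluster.example.com is a placeholder, not a real cluster URL.

```java
import java.net.URI;

// Hypothetical helper, not part of the original script.
final class HostNames {
    // Accepts "https://host", "https://host:443/" or a bare "host" and returns only the host name.
    static String bareHost(String clusterUrl) {
        String trimmed = clusterUrl.trim();
        if (!trimmed.contains("://")) {
            // Add a scheme so that java.net.URI parses the value as a full URL.
            trimmed = "https://" + trimmed;
        }
        String host = URI.create(trimmed).getHost();
        if (host == null) {
            throw new IllegalArgumentException("Cannot extract a host name from: " + clusterUrl);
        }
        return host;
    }

    public static void main(String[] args) {
        // Placeholder URL used for illustration only.
        System.out.println(bareHost("https://cluster.example.com"));
    }
}
```

The returned value is what would be passed as the first argument to Location.forGrpcTls.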
- Authentication: We set up the authentication using the BearerCredentialWriter and CredentialCallOption classes. Replace 'READ_TOKEN' with your actual Cloud Dedicated (i.e., source database) authentication token.
    CredentialCallOption auth = new CredentialCallOption(new BearerCredentialWriter(READ_TOKEN));
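- Execute the query: After successful authentication, execute returns a FlightInfo object that contains metadata and a list of endpoints. Each endpoint contains the following:
    - A list of addresses where you can retrieve the data.
    - A ticket value that identifies the data to retrieve.
    // Client wiring is not shown in the original excerpt; this is the usual Arrow Flight setup.
    FlightClient client = FlightClient.builder(new RootAllocator(Long.MAX_VALUE), location).intercept(f).build();
    FlightSqlClient sqlClient = new FlightSqlClient(client);
    FlightInfo flightInfo = sqlClient.execute(DOWNSAMPLE_QUERY, auth);
    // Extract the Flight ticket from the response.
    Ticket ticket = flightInfo.getEndpoints().get(0).getTicket();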
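The post does not show the value of DOWNSAMPLE_QUERY. As an illustration only, the sketch below builds an hourly-average query with DATE_BIN. The table name airSensors and the field name temperature are assumptions about the sample dataset, DownsampleQuery is a hypothetical helper, and the SQL should be checked against the InfluxDB SQL reference before use.

```java
// Hypothetical helper, not part of the original script.
final class DownsampleQuery {
    // Builds a query that averages one field into one-hour bins.
    // The identifiers are concatenated as-is, so pass only trusted names.
    static String hourlyAverage(String table, String field) {
        return "SELECT DATE_BIN(INTERVAL '1 hour', time, '1970-01-01T00:00:00Z'::TIMESTAMP) AS time, "
                + "AVG(\"" + field + "\") AS \"" + field + "\" "
                + "FROM \"" + table + "\" "
                + "GROUP BY 1 ORDER BY 1";
    }

    public static void main(String[] args) {
        // Assumed table and field names from the air sensor sample data.
        System.out.println(hourlyAverage("airSensors", "temperature"));
    }
}
```

The binned timestamp is aliased as time so that the write step can recognize it as the point timestamp.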
- Retrieve the stream data of the query from the endpoint.
    // Pass the ticket to request the Arrow stream data from the endpoint.
    final FlightStream stream = sqlClient.getStream(ticket, auth);
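- Create the InfluxDB WriteApi using the following code. Here batchSize is the number of points collected per batch and flushInterval is the time in milliseconds before a batch is written.
    WriteApi writeApi = influxDBClient.makeWriteApi(WriteOptions.builder()
        .batchSize(5000)
        .flushInterval(1000)
        // ...remaining options as shown in the next step
        .build());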
- The code below iterates over the stream data, prepares a point for each row, and writes the prepared points into the target database.
    while (stream.next()) {
        try {
            // Get the current vector data from the stream.
            final VectorSchemaRoot root = stream.getRoot();
            System.out.println(root.contentToTSVString());
            InfluxDBClient influxDBClient = InfluxDBClientFactory.create(TARGET_CLUSTER_URL,
                    WRITE_TOKEN.toCharArray(), "", TARGET_DATABASE_NAME);
            CountDownLatch countDownLatch = new CountDownLatch(1);
            try (WriteApi writeApi = influxDBClient.makeWriteApi(WriteOptions.builder()
                    .batchSize(5000)
                    .flushInterval(1000)
                    .backpressureStrategy(BackpressureOverflowStrategy.DROP_OLDEST)
                    .bufferLimit(10000)
                    .jitterInterval(1000)
                    .retryInterval(5000)
                    .build())) {
                writeApi.listenEvents(WriteSuccessEvent.class, (value) -> countDownLatch.countDown());
                writeAsPoints(writeApi, root);
                writeApi.flush();
            }
        } catch (Exception e) {
            // Handle exceptions.
            System.out.println("Error executing FlightSqlClient: " + e.getMessage());
        }
    }
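In the loop above, the CountDownLatch is counted down when a write succeeds, but the excerpt does not show a matching await. The following sketch uses only the JDK to show how a caller could wait for such a signal with a timeout. LatchSketch and writeAndWait are hypothetical names, and the Runnable stands in for the asynchronous write and its success callback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the original script.
final class LatchSketch {
    // Runs asyncWrite on a background thread and waits up to timeoutMillis for it to finish.
    // Returns true only if the work completed without throwing before the timeout.
    static boolean writeAndWait(Runnable asyncWrite, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.submit(() -> {
                asyncWrite.run();
                // Stands in for the success callback registered with listenEvents.
                done.countDown();
            });
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("write confirmed: " + writeAndWait(() -> { }, 1000));
    }
}
```

Waiting with a timeout avoids blocking forever if the success event never arrives, for example when a write fails.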
- The code below is a utility method that prepares a point from each row of the Arrow stream data.
    private static void writeAsPoints(WriteApi writeApi, VectorSchemaRoot root) {
        int fields = root.getSchema().getFields().size();
        int rowCount = root.getRowCount();
        for (int i = 0; i < rowCount; i++) {
            Point point = Point.measurement(TARGET_TABLE_NAME);
            for (int j = 0; j < fields; j++) {
                String fieldName = root.getSchema().getFields().get(j).getName();
                ArrowType fieldType = root.getSchema().getFields().get(j).getType();
                if (fieldName.equalsIgnoreCase("time") && fieldType instanceof ArrowType.Timestamp) {
                    // Assumes the timezone-less timestamp holds a UTC wall-clock value.
                    point.time(((LocalDateTime) root.getFieldVectors().get(j).getObject(i))
                            .toInstant(ZoneOffset.UTC), WritePrecision.NS);
                } else {
                    point.addField(fieldName, root.getFieldVectors().get(j).getObject(i).toString());
                }
            }
            writeApi.writePoint(TARGET_DATABASE_NAME, TARGET_TABLE_NAME, point);
        }
    }
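Converting a LocalDateTime to an Instant requires choosing a zone, and the choice changes the timestamp that gets written. The sketch below, which uses only java.time, compares interpreting the same value as UTC and in a named zone. It assumes the query results carry UTC wall-clock values; TimestampSketch and its methods are hypothetical names.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Hypothetical helper, not part of the original script.
final class TimestampSketch {
    // Interprets the wall-clock value as UTC.
    static Instant asUtc(LocalDateTime value) {
        return value.toInstant(ZoneOffset.UTC);
    }

    // Interprets the wall-clock value in the given zone instead.
    static Instant inZone(LocalDateTime value, ZoneId zone) {
        return value.atZone(zone).toInstant();
    }

    public static void main(String[] args) {
        LocalDateTime sample = LocalDateTime.of(2023, 6, 9, 12, 0);
        System.out.println("UTC:            " + asUtc(sample));
        System.out.println("System default: " + inZone(sample, ZoneId.systemDefault()));
    }
}
```

On a machine whose default zone is not UTC, the two results differ by the zone offset, which would shift every downsampled point by that amount.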
You can find the full script here.
Query, perform downsampling, and write data back to InfluxDB Cloud Dedicated with Java Flight SQL
To run the example in the corresponding repo, follow these steps:
- Clone the repo and cd into it.
- Run docker build -t myimage . (the trailing dot is the build context)
- Run docker run myimage
Visualizing the data with Grafana
Use Grafana to query and visualize data stored in an InfluxDB Cloud Dedicated database. InfluxDB Cloud Dedicated supports both SQL and InfluxQL query languages. Install the Grafana FlightSQL plugin to query InfluxDB with SQL using the Flight SQL protocol.
This Grafana installation link provides instructions on how to install Grafana and visualize the data.
Querying the data with Grafana
You can use the Flight SQL connection data source to query the data in a Cloud Dedicated database.
Resources and conclusion
Take a look at the following documentation. It helped me build this example, and it can help you on your journey with querying InfluxDB Cloud Dedicated:
- Reference documentation for Arrow Flight.
- Reference documentation for Arrow Flight SQL for the Java Client.
- InfluxDB Cloud documentation for Querying data with Arrow Flight SQL in Python.
- A blog post on InfluxDB, Flight SQL, Pandas, and Jupyter Notebooks Tutorial.
- A blog post on Downsampling with Flight SQL and AWS Lambda.
I hope this blog post inspires you to explore InfluxDB Cloud Dedicated and take advantage of Flight SQL to transport large datasets from InfluxDB for data processing with the tools of your choice. If you need any help, please reach out using our community site or Slack channel. I’d love to hear about what you’re trying to achieve and what features you’d like InfluxDB to have.