InfluxDB Python Client Library: A Deep Dive into the WriteAPI
By
Josh Powers /
Product, Use Cases
Aug 19, 2022
Navigate to:
InfluxDB is an open-source time series database. Built to handle enormous volumes of time-stamped data produced from IoT devices to enterprise applications. As data sources for InfluxDB can exist in many different situations and scenarios, providing different ways to get data into InfluxDB is essential.
The InfluxDB client libraries are language-specific packages that integrate with the InfluxDB v2 API. These libraries give users a powerful method of sending, querying, and managing InfluxDB. Check out this TL;DR for an excellent overview of the client libraries. The libraries are available in many languages, including Python, JavaScript, Go, C#, Java, and many others.
This post will walk users through obtaining the Python client library and API structure and demonstrate how to connect, write, and prepare data with Python! Python has seen immense growth and adoption by developers due to its ease of learning and use.
Getting started
Download
The InfluxDB Python client library is available directly from PyPI for easy installs with pip or as a dependency in a project:
pip install influxdb-client
The InfluxDB Python client library supports InfluxDB Cloud, InfluxDB 2.x, and InfluxDB 1.8. It is built and tested to support Python 3.6 and newer.
Note that the support of InfluxDB 1.8 is limited to a subset of APIs and requires a few differences; these are called out further in this post.
Package extras
The client library is intentionally kept small in size and dependencies. However, there are additional package extras available that users can use to pull in other dependencies and enable some additional features:
-
influxdb-client[ciso]
: makes use of the ciso8601 date time parser. It utilizes C-bindings, which result in faster handling of date time objects at the cost of requiring the use of C-bindings. -
influxdb-client[async]
: as the name implies, this allows for the use and benefit of asynchronous requests with the client library if the user’s tools use the async and await Python commands. -
influxdb-client[extras]
: adds the ability to use Pandas DataFrames. The Pandas library is a commonly used data analysis tool. These additional dependencies are large in size and not always needed; therefore, it was included as a separate extra package.
API & documentation
The client library API and documentation are available on Read the Docs.
Source
If a user wants to build or use the library from the source, it is available on GitHub:
git clone https://github.com/influxdata/influxdb-client-python
API overview
At a high level, the API consists of a client, providing access to various APIs exposed by InfluxDB for a specific instance.
The InfluxDBClient is used to handle authentication parameters and connect to InfluxDB. There are several different ways to specify the parameters, which the following section will demonstrate.
InfluxDB fundamentals
Once connected, there are three APIs that handle fundamental interactions with InfluxDB:
-
WriteApi: write time series data to InfluxDB
-
QueryApi: query InfluxDB using Flux, InfluxDB’s functional data scripting language
-
DeleteApi: delete time series data in InfluxDB
Tasks & Scripts
Users can also use the client library to create tasks, invokable scripts, and labels:
-
TasksApi: Use tasks (scheduled Flux queries) to input a data stream and then analyze, modify, and act on the data accordingly
-
InvokableScriptsApi: Create custom InfluxDB API endpoints that query, process, and shape data. To learn more about how Invokable Scripts can empower a user, check out this TL;DR for more details!
InfluxDB administration
Finally, users can directly administer their instance via the final set of APIs:
-
BucketsApi: Create, manage, and delete buckets
-
OrganizationApi: Create, manage, and delete organizations
-
UsersApi: Create, manage, and delete users
-
LabelsApi: Add visual metadata to dashboards, tasks, and other items in the InfluxDB UI
Also check out the InfluxData Meet the Developer videos for more guided steps to using these APIs!
InfluxDBClient setup
The user first needs to create a client to gain access to the various APIs. The client requires connection information, which is comprised of the following:
-
URL: URL of the InfluxDB instance (e.g.,
http://192.168.100.10:8086
) with the hostname or IP address and port. Also, note that if secure HTTP is set up on the server, the user will need to use thehttps://
protocol. -
Access Token: the access token to authenticate to InfluxDB. If using InfluxDB 1.8, usernames and passwords are used instead of tokens. Set the token parameter using the format
username:password
. -
Org: the org the token has access to. In InfluxDB 1.8, there is no concept of organization. The org parameter is ignored and can be left empty.
The above connection information can be specified via file, the environment, or in code.
Via configuration file
Rather than hard-coding a token in code, users can specify the token with a configuration file and limit what users have access to the configuration file.
The file can use a toml
or ini
format. Examples of both are below:
# TOML-based config
[influx2]
url = "http://localhost:8086"
org = "my-org"
token = "my-token"
; ini-based config
[influx2]
url = http://localhost:8086
org = my-org
token = my-token
Users can also specify additional configuration details like timeout, proxy settings, and global tags to apply to data. Check out the entire configuration settings list, including default tags for new data.
Then in code, the user can load the file and create a client as follows:
from influxdb_client import InfluxDBClient
with InfluxDBClient.from_config_file("config.toml") as client:
# use the client to access the necessary APIs
# for example, write data using the write_api
with client.write_api() as writer:
writer.write(bucket="testing", record="sensor temp=23.3")
Via environment variables
Users can export or set any of the following environment variables:
INFLUXDB_V2_URL="http://localhost:8086"
INFLUXDB_V2_ORG="my-org"
INFLUXDB_V2_TOKEN="my-token"
See the docs for a complete list of recognized environment variables, including setting default tags for new data.
Then in code, the user can create a client as follows:
from influxdb_client import InfluxDBClient
with InfluxDBClient.from_env_properties() as client:
# use the client to access the necessary APIs
# for example, write data using the write_api
with client.write_api() as writer:
writer.write(bucket="testing", record="sensor temp=23.3")
Via code
The client library users can also provide the necessary information in code. This method is discouraged as it results in a hard-coded token that exists in code. While it is easy to get going, having credentials in a configuration file is the preferred option.
from influxdb_client import InfluxDBClient
url = "http://localhost:8086"
token = "my-token"
org = "my-org"
with InfluxDBClient(url, token) as client:
# use the client to access the necessary APIs
# for example, write data using the write_api
with client.write_api() as writer:
writer.write(bucket="testing", org=org, record="sensor temp=23.3")
Note the configurations set using a file or environment variables specified an organization. That organization is the default for the query, write, and delete APIs. Users can also specify a different organization to override a set value when making a query, writing, or deleting.
The docs list out the additional possible parameters when creating the client.
Write data with the WriteApi
Once a client is created, users then have access to use the various APIs. The following will demonstrate the write query API to send data to InfluxDB.
Batches
By default, the client will attempt to send data in batches of 1,000 every second:
from influxdb_client import InfluxDBClient
with InfluxDBClient.from_config_file("config.toml") as client:
with client.write_api() as writer:
writer.write(bucket="testing", record="sensor temp=23.3")
If an error is hit, the client retries after five seconds and uses exponential back-off for additional errors up to 125 seconds between retries. Retries are attempted five times or up to 180 seconds of waiting, whichever happens first.
Users are free to modify any of these settings by setting the write_options value when creating a write_api object. The time-based options are in milliseconds.
from influxdb_client import InfluxDBClient, WriteOptions
options = WriteOptions(
batch_size=500,
flush_interval=10_000,
jitter_interval=2_000,
retry_interval=5_000,
max_retries=5,
max_retry_delay=30_000,
exponential_base=2
)
with InfluxDBClient.from_config_file("config.toml") as client:
with client.write_api(write_options=options) as writer:
writer.write(bucket="testing", record="sensor temp=23.3")
Synchronous
While this is not the default method for writing data, synchronous writes are the suggested method of writing data. This method makes it easier to catch errors and respond to them. Additionally, users can still break up their data into batches either manually or using a library like Rx to get similar behavior to the batch writes.
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
with InfluxDBClient.from_config_file("config.toml") as client:
with client.write_api(write_options=SYNCHRONOUS) as writer:
writer.write(bucket="testing", record="sensor temp=23.3")
Asynchronous
If a user does not want to block their application while data is sent to InfluxDB, then the asynchronous client and write APIs are available. Keep in mind that using the asynchronous requires the additional dependencies included with the influxdb-client[async]
package extra and the special async client with access to a different API as well:
import asyncio
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
async def main():
async with InfluxDBClientAsync(
url="http://localhost:8086", token="my-token", org="my-org"
) as client:
await client.write_api().write(bucket="my-bucket", record="sensor temp=23.3")
if __name__ == "__main__":
asyncio.run(main())
Different methods to prepare your data
InfluxDB uses line protocol format, which is made up of a measurement name and fields, as well as optional tags and timestamps. The client libraries allow for specifying data in several different ways, and users can use whichever option works best for the data format getting imported!
String
The first option is a string containing line protocol format. This demonstrates one option if a user is reading influx line protocol directly from a file or wants to build data strings when using the data with Python.
records = """
cpu,core=0 temp=25.3 1657729063
cpu,core=0 temp=25.4 1657729078
cpu,core=0 temp=25.2 1657729093
"""
The new line character must separate each entry in line protocol. Entries that end up on the same line without a \n
between them will result in an error in parsing the data.
Dictionary
The second option uses a dictionary that specifies the various parts of the line protocol format. This option might be best for users who are parsing a file and building their data points at the same time.
records = [
{
"measurement": "cpu",
"tags": {"core": "0"},
"fields": {"temp": 25.3},
"time": 1657729063
},
{
"measurement": "cpu",
"tags": {"core": "0"},
"fields": {"temp": 25.4},
"time": 1657729078
},
{
"measurement": "cpu",
"tags": {"core": "0"},
"fields": {"temp": 25.2},
"time": 1657729093
},
]
Point Helper Class
The client library has a Point class that allows users to build measurements easily. This class helps users format the data into the various parts of line protocol, ensuring properly serialized data. The tag and field are repeatable, allowing for adding many tags and fields at once.
from influxdb_client import Point
records = [
Point("cpu").tag("core", "0").field("temp", 25.3).time(1657729063),
Point("cpu").tag("core", "0").field("temp", 25.4).time(1657729078),
Point("cpu").tag("core", "0").field("temp", 25.2).time(1657729093),
]
Pandas DataFrame
Users can pass Pandas DataFrames directly in, when the influxdb-client-python[extras] extras package is installed. Users can pass a data frame directly in and specify which columns to use as tags and the measurement name.
import pandas as pd
from influxdb_client import InfluxDBClient
records = pd.DataFrame(
data=[
["0", 25.3, 1657729063],
["0", 25.4, 1657729078],
["0", 25.2, 1657729093],
],
columns=["core", "temp", "timestamp"],
)
with InfluxDBClient.from_config_file("config.toml") as client:
with client.write_api() as writer:
writer.write(
bucket="testing",
record=records,
data_frame_measurement_name="cpu",
data_frame_tag_columns=["core"],
)
Note there are many ways to create Pandas DataFrames, and this is only one example. Consult Pandas DataFrame docs for more examples.
Data Class
Users who take advantage of Python’s Data Classes can pass them directly in and then specify which attributes to use for the tags, fields, and timestamp when passing data. Data Classes were first made available in Python 3.7 via PEP 557.
from dataclasses import dataclass
from influxdb_client import InfluxDBClient
@dataclass
class CPU:
core: str
temp: float
timestamp: int
records = [
CPU("0", 25.3, 1657729063),
CPU("0", 25.4, 1657729078),
CPU("0", 25.2, 1657729093),
]
with InfluxDBClient.from_config_file("config.toml") as client:
with client.write_api() as writer:
writer.write(
bucket="testing",
record=records,
record_measurement_name="cpu",
record_tag_keys=["core"],
record_field_keys=["temp"],
record_time_key="timestamp",
)
Named Tuple
Finally, Named Tuples assign meaning to each position in a tuple allowing for more readable, self-documenting code. Users can pass a named tuple directly in and then specify which tuple field name should be used as tags, fields, and timestamp.
from collections import namedtuple
from influxdb_client import InfluxDBClient
class CPU:
def __init__(self, core, temp, timestamp):
self.core = core
self.temp = temp
self.timestamp = timestamp
record = namedtuple("CPU", ["core", "temp", "timestamp"])
records = [
record("0", 25.3, 1657729063),
record("0", 25.4, 1657729078),
record("0", 25.2, 1657729093),
]
with InfluxDBClient.from_config_file("config.toml") as client:
with client.write_api() as writer:
writer.write(
bucket="testing",
record=records,
record_measurement_name="cpu",
record_tag_keys=["core"],
record_field_keys=["temp"],
record_time_key="timestamp",
)
Check out the Python Client Library today
This post has shown how quick, easy, and flexible the Python InfluxDB client library is to use. While the above only demonstrated the write API, it starts to demonstrate the great power users can have when interacting with InfluxDB. Combined with the other APIs, users have even more options and potential.
Consider where you might be able to use InfluxDB and the client libraries and give them a shot today!