Getting Started: Streaming Data into InfluxDB
By Anais Dotis-Georgiou
Aug 09, 2020
This is Part Two of the Getting Started tutorials for InfluxDB v2. If you’re new to InfluxDB v2, I recommend first reading Part One of this Getting Started series, which covers different methods for writing static data to InfluxDB v2 in batches. This is a beginner’s tutorial on how and when to write real-time data to InfluxDB v2 using:
- Telegraf and the Exec Plugin
- Telegraf and the Tail Plugin
The repo for this tutorial is here. For this tutorial, I used the Alpha Vantage Stock and Crypto API to get intraday time series data. If you would like to learn more about how to work with time series data in general, you can check out this article about market data APIs. After you claim your API key, Alpha Vantage offers real-time intraday BTC price and volume data at 5-minute resolution. Here is an example of the output data.
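Both scripts in this tutorial build the request URL by string concatenation. The sketch below builds the same query with Python's standard `urllib.parse.urlencode`, which also percent-encodes any value that needs it. It assumes only the four parameters the scripts use (`function`, `symbol`, `market`, `apikey`). The helper name `build_query_url` is hypothetical.

```python
from urllib.parse import urlencode

BASE_URL = "https://www.alphavantage.co/query?"


def build_query_url(apikey, symbol="BTC", market="USD"):
    """Hypothetical helper: build the DIGITAL_CURRENCY_INTRADAY request URL."""
    params = {
        "function": "DIGITAL_CURRENCY_INTRADAY",
        "symbol": symbol,
        "market": market,
        "apikey": apikey,
    }
    # urlencode keeps the dict's insertion order and percent-encodes the values
    return BASE_URL + urlencode(params)


if __name__ == "__main__":
    # Alpha Vantage's "demo" key, as used in the example URL in the scripts
    print(build_query_url("demo"))
```

If you already use `requests`, you can get the same effect by letting the library encode the parameters through the `params` argument of `requests.get`.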
1. Telegraf and the Exec Plugin
Using Telegraf with the Exec Input Plugin lets you execute commands at a set interval to retrieve metrics and write them to InfluxDB. First, the function data_request() makes a request to Alpha Vantage and converts the most recent data point to line protocol (the data ingest format for InfluxDB) with nanosecond precision. I only gather the latest data point because I don’t want to rewrite 24 hours’ worth of data every 5 minutes.
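The conversion step can be isolated into two small functions. This is a minimal sketch, not the code from the tutorial's repo. `to_unix_ns` and `to_line_protocol` are hypothetical helpers, and the sketch makes two assumptions:

- The API timestamp is UTC.
- No escaping is needed. This holds for simple names like `price` and `type=BTC`, but not for tag values containing spaces or commas.

```python
import datetime


def to_unix_ns(timestamp):
    """Convert a 'YYYY-MM-DD HH:MM:SS' string, assumed UTC, to nanoseconds since the epoch."""
    dt = datetime.datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S")
    dt = dt.replace(tzinfo=datetime.timezone.utc)
    # whole seconds * 10^9 gives a nanosecond-precision timestamp
    return int(dt.timestamp()) * 1_000_000_000


def to_line_protocol(measurement, tags, fields, ts_ns):
    """Format one point as 'measurement,tag=value field=value,... timestamp' (no escaping)."""
    tag_str = "".join(f",{k}={v}" for k, v in tags.items())
    field_str = ",".join(f"{k}={v}" for k, v in fields.items())
    return f"{measurement}{tag_str} {field_str} {ts_ns}"


if __name__ == "__main__":
    # Made-up price and volume, only to show the shape of the output
    print(to_line_protocol("price", {"type": "BTC"},
                           {"price": 9321.5, "volume": 12.25},
                           to_unix_ns("2020-01-01 00:00:00")))
    # price,type=BTC price=9321.5,volume=12.25 1577836800000000000
```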
# exec.py
import datetime

import requests

from alphavantage_auth import key  # local file that holds your Alpha Vantage API key

# Using Alpha Vantage to get BTC prices every 5 minutes
# Get your key here: https://www.alphavantage.co/support/#api-key
apikey = key
url = "https://www.alphavantage.co/query?"
function = "DIGITAL_CURRENCY_INTRADAY"
symbol = "BTC"
market = "USD"

# build target url
target_url = url + "function=" + function + "&symbol=" + symbol + "&market=" + market + "&apikey=" + apikey


def data_request():
    # make request
    data = requests.get(target_url).json()
    # data is returned in the following format: https://www.alphavantage.co/query?function=DIGITAL_CURRENCY_INTRADAY&symbol=BTC&market=EUR&apikey=demo
    series = data['Time Series (Digital Currency Intraday)']
    # we only want the last (most recent) datapoint, which is listed first
    latest = next(iter(series))
    # convert human-readable time to Unix time, interpreting it as UTC
    # (check the time zone in the response's Meta Data); strftime("%s") is
    # platform-dependent and uses local time, so it is avoided here
    t = datetime.datetime.strptime(latest, "%Y-%m-%d %H:%M:%S")
    unix = int(t.replace(tzinfo=datetime.timezone.utc).timestamp())
    # convert timestamp to nanosecond precision
    unix_ns = str(unix) + "000000000"
    fields = series[latest]
    # convert to line protocol: measurement,tag_set field_set timestamp
    line = ("price"
            + ",type=BTC"
            + " "
            + "price=" + str(fields['1a. price (USD)']) + ","
            + "volume=" + str(fields['2. volume'])
            + " " + unix_ns)
    return line


if __name__ == "__main__":
    # The Exec plugin parses the command's stdout, so the line must be printed
    print(data_request())
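The Exec plugin parses whatever the command writes to stdout. Any stray debugging print, such as the "data gathered and converted" message, would therefore be parsed as data too.

The sketch below lets you check a script locally before pointing Telegraf at it. `run_like_exec` and `looks_like_line_protocol` are hypothetical helpers. The format check is deliberately rough: it only looks for the three space-separated parts of a simple line like the BTC one and does not handle escaped spaces.

```python
import subprocess
import sys


def run_like_exec(command, timeout=5):
    """Run a command and return its non-empty stdout lines (hypothetical helper)."""
    result = subprocess.run(command, capture_output=True, text=True,
                            timeout=timeout, check=True)
    return [line for line in result.stdout.splitlines() if line.strip()]


def looks_like_line_protocol(line):
    """Rough check for 'measurement[,tags] fields timestamp' with an integer timestamp."""
    parts = line.split(" ")
    return len(parts) == 3 and "=" in parts[1] and parts[2].isdigit()


if __name__ == "__main__":
    # Stand-in for exec.py: a one-line script that prints a made-up point
    demo = [sys.executable, "-c",
            "print('price,type=BTC price=9321.5,volume=12.25 1577836800000000000')"]
    for line in run_like_exec(demo):
        print(looks_like_line_protocol(line), line)
```

To check the real script, replace the demo command with `[sys.executable, "exec.py"]` and run it from the directory that contains exec.py. Telegraf's `--test` flag, which runs the configured inputs once and prints the resulting metrics, is another way to check.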
Now that we have a Python script that outputs line protocol data, create a Telegraf configuration with the Exec Input plugin and the InfluxDB v2 Output plugin. Please follow the documentation to manually configure Telegraf with the UI. Make sure you make the following changes to the plugins:
- Set the data collection interval to 300s (5 min), since Alpha Vantage only offers data at 5-minute resolution:
interval = "300s"
- Specify the timestamp precision:
precision = "ns"
- Omit the "host" tag:
omit_hostname = true
- Specify your InfluxDB instance:
urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"] # required
- Specify your bucket, token, and organization
- Specify the command for the exec plugin to execute (line 228):
commands = ["python /Users/anaisdotis-georgiou/Desktop/GettingStarted_StreamingData/exec.py" ]
- Since I only have one command, I can comment out the name suffix:
#name_suffix = "_mycollector"
- Specify the data format to parse. Since the script outputs line protocol, choose influx:
data_format = "influx"
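Put together, the settings above land in three sections of the config: `[agent]`, `[[outputs.influxdb_v2]]`, and `[[inputs.exec]]`.

Below is a minimal sketch that writes such a config from Python. Note the following:

- `render_exec_config` is a hypothetical helper.
- The token, org, bucket, and script path are placeholders.
- Placing `interval` under `[agent]` is an assumption; Telegraf also accepts a per-plugin `interval`.
- A config generated through the UI contains many more (mostly commented-out) options than this trimmed version.

```python
def render_exec_config(script_path, url, token, org, bucket):
    """Hypothetical helper: a trimmed Telegraf config for the Exec example."""
    return f"""[agent]
  interval = "300s"   # Alpha Vantage data has 5-minute resolution
  precision = "ns"
  omit_hostname = true

[[outputs.influxdb_v2]]
  urls = ["{url}"]
  token = "{token}"
  organization = "{org}"
  bucket = "{bucket}"

[[inputs.exec]]
  commands = ["python {script_path}"]
  data_format = "influx"
"""


if __name__ == "__main__":
    print(render_exec_config(
        script_path="/path/to/exec.py",  # placeholder path
        url="https://us-west-2-1.aws.cloud2.influxdata.com",
        token="YOUR_TOKEN", org="YOUR_ORG", bucket="YOUR_BUCKET",
    ))
```

Saved as telegraf.conf, the output can be started with `telegraf --config telegraf.conf`.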
2. Telegraf and the Tail Plugin
The Tail Input Plugin tails a log file and parses the metrics it contains, and Telegraf writes the data to InfluxDB. tail.py is similar to exec.py, except that each new point is appended to tail.txt instead of being passed straight to Telegraf through stdout. Also, the run delay is defined within tail.py with time.sleep() instead of by the collection interval in the config, as with the Exec plugin.
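Conceptually, tailing a file with `from_beginning = false` means two things:

- Start at the file's current end.
- On each check, read only the complete lines appended since the last check.

The sketch below illustrates that idea in plain Python. It is not how the Tail plugin is implemented, and `open_at_end` and `read_new_lines` are hypothetical helpers. Holding back a partial line matters because a line that is still being written would otherwise be parsed as an incomplete point.

```python
import os
import tempfile


def open_at_end(path):
    """Open a text file positioned at its current end, skipping existing lines."""
    f = open(path, "r")
    f.seek(0, os.SEEK_END)
    return f


def read_new_lines(f):
    """Return complete lines appended since the last call; leave a partial line for later."""
    lines = []
    while True:
        pos = f.tell()
        line = f.readline()
        if not line:
            break  # nothing new yet
        if not line.endswith("\n"):
            f.seek(pos)  # the writer hasn't finished this line; re-read it next time
            break
        lines.append(line.rstrip("\n"))
    return lines


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        path = os.path.join(d, "tail.txt")
        with open(path, "w") as w:
            w.write("old line that is skipped\n")
        reader = open_at_end(path)
        with open(path, "a") as w:
            w.write("price,type=BTC price=9321.5,volume=12.25 1577836800000000000\n")
        print(read_new_lines(reader))
        reader.close()
```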
# tail.py
import datetime
import time

import requests

from alphavantage_auth import key  # local file that holds your Alpha Vantage API key

# Using Alpha Vantage to get BTC prices every 5 minutes
# Get your key here: https://www.alphavantage.co/support/#api-key
apikey = key
url = "https://www.alphavantage.co/query?"
function = "DIGITAL_CURRENCY_INTRADAY"
symbol = "BTC"
market = "USD"

# build target url
target_url = url + "function=" + function + "&symbol=" + symbol + "&market=" + market + "&apikey=" + apikey

while True:
    # make request
    data = requests.get(target_url).json()
    # data is returned in the following format: https://www.alphavantage.co/query?function=DIGITAL_CURRENCY_INTRADAY&symbol=BTC&market=EUR&apikey=demo
    series = data['Time Series (Digital Currency Intraday)']
    # we only want the last (most recent) datapoint, which is listed first
    latest = next(iter(series))
    # convert to Unix time, interpreting the timestamp as UTC (check the time zone
    # in the response's Meta Data), then to nanosecond precision
    t = datetime.datetime.strptime(latest, "%Y-%m-%d %H:%M:%S")
    unix = int(t.replace(tzinfo=datetime.timezone.utc).timestamp())
    unix_ns = str(unix) + "000000000"
    fields = series[latest]
    # convert to line protocol
    line = ("price"
            + ",type=BTC"
            + " "
            + "price=" + str(fields['1a. price (USD)']) + ","
            + "volume=" + str(fields['2. volume'])
            + " " + unix_ns)
    # append the line and close the file right away, so the write is flushed
    # and Telegraf sees the new point immediately
    with open('Data/tail.txt', 'a') as thefile:
        thefile.write(line + "\n")
    print("line added")
    # Alpha Vantage only adds points every 5 min, so set script to sleep for 5 min as well
    time.sleep(300)
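Because `time.sleep(300)` runs after the request, each loop takes 300 s plus however long the request took. Over time, the script drifts away from the 5-minute marks.

One alternative is to sleep until the next multiple of the interval on the Unix clock. The sketch below assumes Alpha Vantage's points line up with those marks. `seconds_until_next` is a hypothetical helper, and you may want to add a small offset so the request lands just after a new point is published.

```python
import time


def seconds_until_next(interval, now=None):
    """Seconds from `now` (default: current Unix time) to the next multiple of `interval`."""
    if now is None:
        now = time.time()
    return interval - (now % interval)


if __name__ == "__main__":
    # 30 s past a 5-minute mark, so the next mark is 270 s away
    print(seconds_until_next(300, now=1577836830.0))  # 270.0
```

In tail.py, the last line of the loop would then become `time.sleep(seconds_until_next(300))`.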
Now that I have a Python script that appends line protocol data to a txt file, I manually configure Telegraf to run a config that contains the Tail Input plugin and the InfluxDB v2 Output plugin, with these changes:
- Specify the precision of your data (line 64). For this example, our BTC data has been converted to ns precision, so:
precision = "ns"
- Since we're not performing a monitoring task, we don't need a 'host' tag. Set the following so that Telegraf doesn't add one (line 93):
omit_hostname = true
- Navigate to the OUTPUT PLUGIN section.
- Specify your InfluxDB instance (line 111):
urls = ["https://us-west-2-1.aws.cloud2.influxdata.com"] # required
- Specify your bucket, token, and organization (lines 113-120).
- Navigate to the SERVICES INPUT PLUGIN section.
- Specify the absolute path for your line protocol txt file (line 544):
- Make sure that the Tail plugin is only reading new points by setting from_beginning to false (line 546):
from_beginning = false
- Specify the data format to parse (last line):
data_format = "influx"
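The Tail settings above end up in three sections of the config: `[agent]`, `[[outputs.influxdb_v2]]`, and `[[inputs.tail]]`. Below is a trimmed sketch of that config, rendered from Python. `render_tail_config` is a hypothetical helper, and the file path, token, org, and bucket are placeholders for your own values.

```python
def render_tail_config(txt_path, url, token, org, bucket):
    """Hypothetical helper: a trimmed Telegraf config for the Tail example."""
    return f"""[agent]
  precision = "ns"
  omit_hostname = true

[[outputs.influxdb_v2]]
  urls = ["{url}"]
  token = "{token}"
  organization = "{org}"
  bucket = "{bucket}"

[[inputs.tail]]
  files = ["{txt_path}"]
  from_beginning = false
  data_format = "influx"
"""


if __name__ == "__main__":
    print(render_tail_config(
        txt_path="/absolute/path/to/Data/tail.txt",  # placeholder path
        url="https://us-west-2-1.aws.cloud2.influxdata.com",
        token="YOUR_TOKEN", org="YOUR_ORG", bucket="YOUR_BUCKET",
    ))
```

Unlike the Exec config, nothing here sets a 5-minute interval. In this setup the pacing comes from the sleep in tail.py.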
I hope you’re starting to feel more comfortable using Telegraf and InfluxDB. If you have any questions, please post them on the community site or tweet us @InfluxDB. Thanks!