How to Create a Telegraf Parser Plugin for Data Stored in Kafka
Session date: Jun 16, 2020 08:00am (Pacific Time)
Is your data in an exotic format stored in Kafka? Let’s write a Telegraf Plugin! This session is a step-by-step presentation on how to develop a Telegraf parser plugin. The focus will be on Apache Avro, a popular data serialization format widely used in Kafka-based data pipelines.
Watch the Webinar
Watch the webinar “How to Create a Telegraf Parser Plugin for Data Stored in Kafka” by filling out the form and clicking on the Watch Webinar button on the right. This will open the recording.
Here is an unedited transcript of the webinar “How to Create a Telegraf Parser Plugin for Data Stored in Kafka”. This is provided for those who prefer to read rather than watch the webinar. Please note that the transcript is raw. We apologize for any transcribing errors.
Speakers:
- Caitlin Croft: Customer Marketing Manager, InfluxData
- Emanuele Falzone: Ph.D. Student at Politecnico di Milano
- Samantha Wang: Product Manager, InfluxData
Caitlin Croft: 00:00:04.000 Hello everyone. Once again, welcome to today’s webinar. Super excited to have all of you join. My name is Caitlin Croft. Today we will be talking about how to create a Telegraf parser plugin for data stored in Kafka. And joining me is Emanuele, who is a PhD student in Italy. And what’s kind of cool is he’s working on his PhD under the advisement of one of our primary Flux trainers, so if you’ve attended our Flux training, you’ll know he’s well versed in InfluxDB. So, without further ado, I will hand it off to Emanuele.
Emanuele Falzone: 00:00:47.000 Okay, thank you, Caitlin. And I’d also like to thank you, InfluxData, for this opportunity. So, today, as Caitlin said, we are here to talk about Telegraf, the magic component that we usually use with InfluxDB. And in particular, we are going to see how we can extend Telegraf to make it handle some strange kinds of data that you probably have stored in Kafka.
Emanuele Falzone: 00:01:21.000 A few words about me. My full name is Emanuele Falzone. As Caitlin said, I’m a PhD student at Politecnico di Milano, and I really love free and open source software. And since Telegraf is open source, that is the main point of this whole webinar: if something is open source, you can, and you are encouraged to, modify it and contribute new features. You can also find my contact details here, so if you want to reach me through a separate channel, you can use my email, my GitHub, my website, or also catch me after this webinar.
Emanuele Falzone: 00:02:07.000 Before going too deep into this webinar, I’d like to give you an outline of what we are going to see. We will start with an introduction to the data life cycle, and we will see how Telegraf and InfluxDB are placed inside this data life cycle. Then we will move to our use case. That is, let’s say, a mock one, but still interesting, and it makes use of Apache Avro, the data format that we are going to use. And we will see how we can extend Telegraf in order to make it parse Apache Avro messages. Toward the end, I will give you some hints about another use case that is related to bank transactions. And at the very end, I will throw [inaudible]. I would also like to add a disclaimer: I will not explain what Kafka and InfluxDB are, and I will assume that all of you have, let’s say, at least a basic knowledge of them.
Emanuele Falzone: 00:03:17.000 So, let’s start from the data life cycle. We can divide our data life cycle into four phases. The first one is ingestion, where the data is produced and probably ingested by some application. Then you have the storage phase, where you want to store your data in a reliable way. Then you would like to exploit such data and do some kind of analysis in order to get some useful information from the data that you store. And at the end, you would like to visualize the results of your analysis in order to make such information available also to non-technical users. Let’s see how InfluxDB is placed inside this data life cycle. Since InfluxDB is really powerful, we can use it for storage, analysis, and visualization, while we usually use a custom application to handle the ingestion phase. The main problem is how you can go from the ingestion phase to the storage phase, so how you can get your data into InfluxDB. And there are two scenarios, one that makes use of Telegraf, and the other that does not. On the left we can see that without Telegraf, what we have to do is to modify our custom application, or maybe create another small custom application, that takes the data and pushes it into InfluxDB. So it will have to use the InfluxDB API and so on in order to take the data from the ingestion phase and get it inside InfluxDB.
Emanuele Falzone: 00:05:16.000 While on the right, if we use Telegraf, things change, because we can use Telegraf as a middleware, and Telegraf will be responsible for pulling the data from our custom application, so in this case we don’t have to modify our custom application, and pushing the data into InfluxDB. So the main advantage of using Telegraf is that, I mean, if your custom application is not so strange, Telegraf will probably be able to pull the data from that custom application, automatically convert it into line protocol, which is the data format of InfluxDB, and push the data into InfluxDB. So if we go back to our data life cycle, Telegraf is the component that allows us to go from the ingestion phase to the storage phase. It will take the data from the custom application and push the data into InfluxDB.
Emanuele Falzone: 00:06:17.000 So we can now move to our use case. It is based on a customer relationship management system that is available on different platforms. So we have different channels that we can use to access this system: web, iOS, and Android applications. And we also have different club statuses, so depending on the club status that they have, customers, or in this case we can also call them users, can have access to different types of features. And we have four levels of club status. The whole code related to this use case is available in one of my repositories on GitHub. Here you can find the link, and I will make the link available also after the presentation.
Emanuele Falzone: 00:07:13.000 The final results that we want to achieve are these plots. So we want to build a dashboard using InfluxDB in order to see the average ratings by channel, by club status, and by gender. And in the meantime, we would also like to see the total number of ratings that we have and the overall rating of our system. And as you can see, our rating is 2.51, so our customer relationship management system is not so good. I’m just kidding. So the main point is that, okay, we know what we want to achieve, but we have to take a look at where the data comes from. We are using Kafka, and we have two main streams. The first one, which you can see in the upper left, is ratings, and that is the stream that really contains the ratings of our customers. So inside this stream we have the rating ID, the user ID, the stars that are the actual rating, so in this case five, which is the best rating that we can give, and the channel. So in this case, the rating was provided through the web platform. And in another database, let’s say a MySQL database, we also have the customer data. And the customer data are mainly the first name, the last name, and most importantly, the club status. So in this case, this is a platinum user.
Emanuele Falzone: 00:09:07.000 And what we want to do is to join these two streams. And in order to join them, we use KSQL, which works really, really well with Kafka, in order to obtain, as a result of the join, a flat message that joins the two streams in a privacy-preserving way. So we take the stars and the rating time from the ratings stream, while we take only the gender and the club status from the customer data. And since KSQL works really well with the Apache Avro format, our final result will be this topic that is called ratings with customer data, which will contain messages that are in Apache Avro format. So the main point that we have to investigate is how Apache Avro works and what it is. Apache Avro is a binary format that is self-describing, meaning that inside the data you have both the schema, for example, in this case, the integer type, and the value, which in this case is 42. And the main advantage of using Avro is that, given a schema, you can automatically generate the encoder and the decoder for the values. So we can now see an example of such a schema and value, which are the ones that we really have in Kafka. On the left we have the schema, which, as you can see, has its own namespace that allows us to identify the schema in a unique way, and it has a list of fields. For example, we have the first one, rating time, that can be of type null or long, and stars, that can be of type null or int. So these fields are optional, and we also provide a default value.
Emanuele Falzone: 00:11:24.000 And on the right we can see an actual instance of such a schema, so a real value that specifies a value for each field that we defined inside the schema. So we have the rating time that is of type long and has a real value in Unix time, a channel that is a string, which is the iOS test version in this case, the stars that is an integer and has this [inaudible] value, and so on. The main advantage of using Avro, as I said, is to automatically generate an encoder and decoder. So if we take a look at this encode and decode process, from the schema and the value that we have, which are JSON objects, we can encode both the schema and the value in a binary format, so we will have this list of bytes. And starting from this list of bytes, we can decode the whole message, getting back to the schema and the value. The main problem is that inside that list of bytes, you have to place both the schema and the value, and there are several ways to place them inside the message. The first one is to use the first N bytes to put, let’s say, the schema inside the message and use the other N bytes for the binary encoding of the value, so the binary encoding of the real message. The problem with that is that if you have such messages in a Kafka topic, you are probably repeating the same schema for each message, so in some way you are wasting space, since the schema inside the Kafka topic will probably be the same for each message.
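For readers who want to try that encode/decode round trip themselves, here is a minimal sketch using the goavro library from LinkedIn, the same library the parser uses later in the talk. The schema is a deliberately simplified version of the ratings record, without the nullable union types, so treat the field list as illustrative rather than as the exact schema from the demo.

```go
package main

import (
	"fmt"

	"github.com/linkedin/goavro/v2"
)

func main() {
	// Simplified ratings schema (illustrative; the real one uses unions
	// such as ["null", "long"] and carries more fields).
	schema := `{
	  "type": "record",
	  "name": "Rating",
	  "fields": [
	    {"name": "channel", "type": "string"},
	    {"name": "stars",   "type": "int"}
	  ]
	}`

	// Given the schema, goavro generates both the encoder and the decoder.
	codec, err := goavro.NewCodec(schema)
	if err != nil {
		panic(err)
	}

	// Encode a native Go value into the Avro binary representation...
	binary, err := codec.BinaryFromNative(nil, map[string]interface{}{
		"channel": "ios",
		"stars":   int32(5), // Avro "int" is a 32-bit integer
	})
	if err != nil {
		panic(err)
	}

	// ...and decode those bytes back into a native Go value.
	native, _, err := codec.NativeFromBinary(binary)
	if err != nil {
		panic(err)
	}
	fmt.Println(native) // map[channel:ios stars:5]
}
```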
Emanuele Falzone: 00:13:33.000 So what Confluent did, which is another company that works with Kafka, was to build this schema registry component, which we can see as a hash table where the key is the ID of the schema and the value is the JSON representation of the schema itself. And this component exposes a REST API that you can invoke using a GET request, passing the ID of the schema in order to get back the original schema. So using the schema registry, the encoding of the Avro messages inside the Kafka topic is different. We have the first byte, which is the magic byte reserved by Confluent for versioning purposes. Then we have four bytes that are the schema ID, and we can use those four bytes to get the real value of the schema ID, check in the schema registry whether there is a schema associated with that schema ID, and get the real schema in order to build a decoder. And the remaining part of the message is still the same: we have N bytes that contain the real data, so the binary encoding of the value.
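As a sketch of the framing just described, and assuming the standard Confluent wire format, splitting such a message could look like the fragment below; the function name is only for illustration.

```go
import (
	"encoding/binary"
	"fmt"
)

// splitConfluentMessage unpacks the framing described above: one magic
// byte (0), a 4-byte big-endian schema ID, then the Avro-encoded payload.
func splitConfluentMessage(buf []byte) (schemaID int, payload []byte, err error) {
	if len(buf) < 5 || buf[0] != 0 {
		return 0, nil, fmt.Errorf("not a Confluent-framed Avro message")
	}
	schemaID = int(binary.BigEndian.Uint32(buf[1:5]))
	return schemaID, buf[5:], nil
}
```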
Emanuele Falzone: 00:15:11.000 So if we take a look at the architecture that we want, we would like to have Telegraf talking with both Kafka and the schema registry. And in particular, what Telegraf has to do is to pull the messages from Kafka, extract the schema ID from each message, then ask the schema registry for the corresponding schema, and once it has the real schema, it can build a decoder and decode the messages that it retrieved from Kafka. And once it has this plain message, let’s say, it can easily convert it into line protocol and send it to InfluxDB.
Emanuele Falzone: 00:16:06.000 So we will now move to talking about Telegraf, and we will analyze in depth what Telegraf is and how it works, and finally we will see how we can extend it in order to make it understand Avro. Telegraf is the most-used tool to feed data into InfluxDB. So there are a lot of people that use Telegraf for this purpose, and probably even if you have a strange scenario, you should check whether Telegraf could be your solution for getting the data that you have into InfluxDB. It is written in Go, so I hope that some of you are familiar with Go, but it’s a language like the others, so it will not be difficult to read. It’s open source, so you can find the sources online, and you can download it, modify it, and do whatever you want. The most interesting point about Telegraf is that it has a plugin architecture, with more than 200 plugins, each one with its own documentation and examples. And it is really easy to use, since they provide [inaudible], and the only thing that you have to do to make Telegraf work is to fill in a configuration file, and that’s it. From now on, we will focus on the plugin architecture, because this is the point that is most interesting for our purpose.
Emanuele Falzone: 00:17:46.000 So we can divide the space in two: the one that we have on the left, which is the outside space, and the one that we have on the right, which is the Telegraf boundary. This is where we are going to manage the data inside Telegraf. The first components that we are going to see are the input plugins, which are responsible for pulling the data from outside, creating this data flow that comes into Telegraf. And the output of such input plugins is bytes or strings. So it must be at least an array of bytes or a simple string. Then we have the parser plugins, which are responsible for converting bytes or strings into Telegraf metrics. And now you are probably wondering, what is a Telegraf metric? We are going to see that. A Telegraf metric is an in-memory object that is closely based on the InfluxDB data model. So we have the measurement name, which is the namespace for the metric, the tags that are usually used to identify the metric, the fields that contain the real values, and a timestamp that is associated with the fields. And since each Telegraf metric is an in-memory object, before transmitting it, we have to convert it into a concrete representation, so we have to serialize such a Telegraf metric.
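To make those four parts concrete, a single metric from the ratings use case, once serialized to line protocol, might look like the line below: measurement, then tags, then fields, then a nanosecond timestamp. The values are hypothetical.

```
ratings,channel=ios,club_status=platinum,gender=F stars=5i 1592290800000000000
```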
Emanuele Falzone: 00:19:42.000 And if we continue with our plugin architecture, we can see that after having all our data converted into Telegraf metrics, we can use a serializer plugin to convert such Telegraf metrics into a concrete representation. That could be, for example, line protocol. And then we can finally use an output plugin to send that data to another system, which could be, for example, InfluxDB. So as I said at the beginning, we really have a lot of plugins, more than 200, already implemented in Telegraf. And we are going to look at this list to, let’s say, describe the most used ones. As input plugins, we can use file, Kafka, Influx, MQTT, HTTP, socket, and also the other services provided by Amazon or Google, and so on. And we can also read data from relational or non-relational databases. The other interesting point is that Telegraf already has a lot of parser plugins. So it is able to understand the JSON format, CSV, form URL-encoded, Influx, and Wavefront. And there are a lot of serializers and output plugins that work more or less in a similar way with respect to the parsers and the input plugins, but firstly, this is not our focus, so we are not going too deep into serializers and output plugins, and secondly, usually people use Telegraf to push data into InfluxDB. So the serializer is the line protocol serializer, the Influx serializer, and the output is the output plugin that sends the data to InfluxDB.
Emanuele Falzone: 00:21:56.000 So once you know what plugins you want to use, you have to fill in your configuration file in order to specify which plugins Telegraf has to use. This is a sample configuration file, and we can see that there are mainly two sections. The first one, which we have on the left, is the one related to the input plugin. In this case we are using the file input plugin, so Telegraf will read from a file that is saved in the temporary folder and is named sample.csv. And inside the input plugin specification, we can specify the parser plugin we want to use. So in this case, we use data format equal CSV to specify that we want to use the CSV parser plugin. And below that data format equal CSV, we have a list of parser-specific settings. So in this case we are specifying that there is a header, what the delimiter is, what the measurement column is, and so on. On the right, instead, we have the configuration of the output, and in this case, we are specifying the URL where we can find our InfluxDB instance, since we are using an InfluxDB output, and we also have to provide the token, organization, and bucket that are specific to that output plugin. And at the end, finally, we can use the serializer plugin, saying that the data that we want to send to InfluxDB should be in the Influx format, so in the line protocol format.
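The configuration being described is roughly of the following shape. This is a sketch reconstructed from the description, with placeholder paths and credentials, using the standard Telegraf file input, CSV parser, and InfluxDB v2 output options.

```toml
[[inputs.file]]
  files = ["/tmp/sample.csv"]

  ## Parser plugin to use for this input
  data_format = "csv"
  ## Parser-specific settings
  csv_header_row_count = 1
  csv_delimiter = ","
  csv_measurement_column = "measurement"

[[outputs.influxdb_v2]]
  urls = ["http://localhost:8086"]
  token = "$INFLUX_TOKEN"
  organization = "my-org"
  bucket = "my-bucket"
```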
Emanuele Falzone: 00:24:01.000 So now, move your mind back to our use case. If we look at the plugin architecture that we want to use for our use case, we would like to use the Kafka consumer as the input plugin, and we would like to use an Avro parser to go from a list of bytes to a Telegraf metric. That is the plugin that is missing, so we are going to extend Telegraf, and then we can finally use the Influx serializer and InfluxDB output to convert the Telegraf metrics that we have into line protocol and send them to InfluxDB. So the main problem that we have is how we can map Avro messages into Telegraf metrics. And since the schema of each Avro message is composed of a list of fields, what we want to do is to specify, for each field, whether that field is the timestamp, or should be a tag, or a field. And finally we also want to provide the measurement that identifies all those metrics. So we want to say, in this case, that the rating time should be interpreted as the timestamp of the metric. The channel should be a tag of the metric. The “STARS” should be a field, and so on. So if we go back to our possible configuration file, what we want is to use an input plugin, the Kafka consumer, that reads messages from that topic and from that broker. And the interesting point is this one. We are here specifying that the data format of each message is Avro, that the measurement is this string that we call ratings, and that the Avro fields named CHANNEL, CLUB_STATUS, and GENDER should be interpreted as tags. And we only have one field, which is stars. And the timestamp is the rating time Avro field, and it is in Unix format.
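A sketch of what that input section might look like in the speaker's fork. The avro_* option names are reconstructed from the description above and are not options in an official Telegraf release; the broker, topic, and registry URL are placeholders.

```toml
[[inputs.kafka_consumer]]
  brokers = ["localhost:9092"]
  topics = ["ratings-with-customer-data"]

  ## Parser plugin from the Avro fork (not an official release)
  data_format = "avro"
  avro_measurement = "ratings"
  avro_tags = ["CHANNEL", "CLUB_STATUS", "GENDER"]
  avro_fields = ["STARS"]
  avro_timestamp = "RATING_TIME"
  avro_timestamp_format = "unix"
  ## Schema registry used to resolve schema IDs (discussed next)
  avro_schema_registry = "http://localhost:8081"
```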
Emanuele Falzone: 00:26:24.000 And last but not least, we also have to specify the schema registry, because as we said before, Telegraf also has to extract the schema ID and then check the schema registry for the corresponding schema. And as you can see, the right part, which is the one related to the output plugin, is the same as before. So we want to use the Influx serializer and push the data into our InfluxDB instance. So how can I make Telegraf understand our messages? That is the main problem. In a few seconds, we will jump into VS Code, and we will see a lot of code. But I’d like to provide an overview of which files we are going to touch and the main role of each file. So we are going to take a look at Gopkg.lock and Gopkg.toml, which are the ones that handle the dependencies. Then we will have a lot of files under the plugins/parsers/avro folder. Here we will have two files, parser.go and schemaregistry.go, which handle the parser logic, so they are the ones that we will use to go from a list of bytes to a Telegraf metric. Then we will have a file for tests, where we will write some tests. And at the end, we have registry.go and config.go, which are the two files that you should, let’s say, modify in order to let people really use your parser from Telegraf.
Emanuele Falzone: 00:28:22.000 So in the first part you have to define your parser logic, but then you also have to modify some files in order to make the whole thing work. And at the end, I will also show the documentation related to our parser plugin. All the code is available in one of my repositories, which is a fork of the Telegraf main code base, under the avro branch. So, as our friend Linus told us: “Talk is cheap. Show me the code.” And now I will show you the code.
Emanuele Falzone: 00:29:09.000 Okay. So I hope that you are familiar with Visual Studio Code and all that kind of stuff. I think that the best point to start is the main logic. So, I don’t see my pointer. Okay, so we can start from parser.go. In this case, we can see that we have our struct that is called Parser, and it has its own attributes, which are the ones that we really want to use. In this case, we have the schema registry, the measurement, the tags, fields, timestamp, and so on. And each of them has a corresponding, let’s say, part inside the configuration file. And the main point about that, sorry, wait a minute, because I can’t see my pointer and I am going crazy. Okay, it’s back, I don’t know why. So we defined this struct that contains the attributes that we need. And then we are going to implement some functions. Those functions are defined by an interface, and the main interface is defined here. So you can see the Parser interface type. Each parser has to implement three methods. The first one is Parse, which takes a buffer of bytes as input and provides an array of Telegraf metrics as output. We also have to implement ParseLine, which is more or less the same, but takes a string as input. And the last one is SetDefaultTags, which is used to set default tags that are coming, let’s say, from outside.
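In code, the interface he points to and a struct along the lines he describes look roughly like this. The interface matches the one in plugins/parsers/registry.go; the attribute names of the Parser struct are reconstructed from the talk and may differ in the actual fork.

```go
// In plugins/parsers/registry.go (package parsers), every parser has to
// satisfy this interface:
type Parser interface {
	Parse(buf []byte) ([]telegraf.Metric, error)
	ParseLine(line string) (telegraf.Metric, error)
	SetDefaultTags(tags map[string]string)
}

// In plugins/parsers/avro/parser.go, a struct along the lines described
// above (field names reconstructed from the talk):
type Parser struct {
	SchemaRegistry  string
	Measurement     string
	Tags            []string
	Fields          []string
	Timestamp       string
	TimestampFormat string
	DefaultTags     map[string]string
}
```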
Emanuele Falzone: 00:31:38.000 So I think that we can start from the Parse method, which is the most interesting one. And it actually does what we discussed before. So it creates a new schema registry object starting from the URL of the schema registry that we provided as an attribute. Then it will compute the schema ID from the bytes from one to five inside the array of bytes that we have as input. Then it will get the schema, so it will ask the schema registry for the schema with the corresponding schema ID. And once it has the schema, it can build the decoder. In order to build the decoder, I’m using an external library, this one, which is provided by LinkedIn. So I’m using this library to build a decoder starting from the schema. I will get my codec object using this function, passing the schema that I want to use as a parameter. And once I have this decoder, I can get a native Go object from the binary encoding of the message. So in this case, as you can see, we are taking from the fifth byte until the end of the buffer, so we are taking the message part, and we are using this method, NativeFromBinary, to get a native object from the binary encoding of the message. And at this point we are happy, because we have this native object and we can use it to create a real metric and return an array of metrics that simply contains only one metric.
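Put together, the Parse method he walks through has roughly the shape below. This is a sketch: newSchemaRegistry and p.createMetric stand in for the helpers in his fork and are not real Telegraf APIs; the goavro and encoding/binary imports from the earlier snippets are assumed.

```go
// Sketch of the Parse flow described above.
func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) {
	registry := newSchemaRegistry(p.SchemaRegistry)

	// Bytes 1 to 5 carry the schema ID (byte 0 is Confluent's magic byte).
	schemaID := int(binary.BigEndian.Uint32(buf[1:5]))

	// Ask the schema registry for the schema with that ID.
	schema, err := registry.getSchema(schemaID)
	if err != nil {
		return nil, err
	}

	// Build the decoder from the schema using LinkedIn's goavro library.
	codec, err := goavro.NewCodec(schema)
	if err != nil {
		return nil, err
	}

	// The rest of the buffer is the Avro-encoded value; decode it into
	// a native Go object.
	native, _, err := codec.NativeFromBinary(buf[5:])
	if err != nil {
		return nil, err
	}

	// Map the decoded object onto a single Telegraf metric.
	m, err := p.createMetric(native.(map[string]interface{}))
	if err != nil {
		return nil, err
	}
	return []telegraf.Metric{m}, nil
}
```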
Emanuele Falzone: 00:33:50.000 So we can move to the other interesting method, which is ParseLine. In this case, the implementation is really simple, since we convert the string that we have as a parameter into a buffer of bytes and use the Parse method that we defined before. And SetDefaultTags is really simple, since one of the attributes that we have is default tags, and it simply makes this assignment. So once you have completed this file, you have your parser logic ready. I will not go too deep into the schema registry file, because it is really simple. It’s simply an HTTP client that makes a GET request, so it is not that interesting. The other interesting part is the one related to the tests. We should define another file inside the same folder that ends with underscore test, so it will be, let’s say, executed during the test phase, and inside this file we can define a series of tests in order to be sure that our parser is working correctly. In this case I will show you only the first simple one, which works in this way. We define a schema. We define a message in a JSON format. Then we build a local schema registry so that we can simulate the real schema registry. We instantiate a parser, and we encode the message using the library provided by LinkedIn. And at the end, we want to be sure that the string representation of the original message is equal to the string representation of the message that is decoded using our parser. So we can use this kind of check to test whether our parser is able to decode the message in the correct way.
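The two remaining interface methods, as described, are short enough to sketch in full; the fork's actual error handling may differ.

```go
// ParseLine reuses Parse on the raw bytes of the line.
func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
	metrics, err := p.Parse([]byte(line))
	if err != nil {
		return nil, err
	}
	if len(metrics) == 0 {
		return nil, fmt.Errorf("no metric parsed from line")
	}
	return metrics[0], nil
}

// SetDefaultTags stores the tags provided from outside the parser.
func (p *Parser) SetDefaultTags(tags map[string]string) {
	p.DefaultTags = tags
}
```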
Emanuele Falzone: 00:36:21.000 And if you are extending Telegraf only for yourself, maybe you just want to skip tests, but if you are planning to make a pull request and get your code accepted into the official code base, there will probably be a lot of users that are going to use your plugin in production, and you want to prevent unexpected behaviors. So you are really encouraged to write a lot of tests in order to cover all the particular situations that you can have. So at this point, we have our parser implemented and tested. What we have to do now is to change some other files in order to let people select our parser using the configuration file that they write when they are using Telegraf. In order to do so, we have to somehow trace how the stuff works from the beginning. We can start from config.go, which has this method, LoadConfig, which is the one that, starting from the configuration file, loads the real configuration. And what it does is load the configuration and parse it in order to get a table, which is, let’s say, a map built from the data. And once it has the table, it can loop over the fields of the table. In this case, the fields are, let’s say, the first children of the TOML that we defined, so there are a few possibilities.
Emanuele Falzone: 00:38:24.000 And one of them, as we saw before, is inputs. If we have an input, as in this case, what it does is add the input to our configuration. So we should go to the definition of the addInput method. And what the addInput method does is simply check the name and some other kinds of stuff. And if you specify a parser, it will build a parser. So this is the interesting point: we are going to extend Telegraf, providing another parser, so we should go to the definition of buildParser and check whether we have to implement some other kind of stuff. Starting from buildParser, what it does is get the parser configuration. And this is really interesting, because if we look at this function, which is the one that we are interested in right now, what it does is create an empty parser config. So what is an empty parser config? The Config struct for parsers is defined inside registry.go, which is inside the parsers folder. And if we look at it, it is this one: it is a struct whose first attribute is the most important. That is the data format; it is the real name of the parser. And this is only a hint, but we know that this data format attribute is associated with the data_format key that we have inside the configuration file.
Emanuele Falzone: 00:40:20.000 So if I write Avro inside the configuration file, I will expect the data format to be equal to Avro. And then we have a list of parser-specific attributes. In this case, the easiest example is the one related to CSV. You can see that we have a lot of custom configuration options for the CSV data type. And what we have to do is to add our custom configuration for the Avro type. So we have to add an attribute for the schema registry, an attribute for the measurement, and so on. So now we know what the parser Config struct is, and we can go back to the config. Once the configuration has created this empty struct, what it does is simply read data from the table, which is the map that we obtained from the configuration file, and parse it in order to fill in the parser configuration attributes. So in this case what we are going to do is extract, for example, the Avro measurement from the fields of the table, check whether it is of the correct type, and finally assign the value that we have to the attribute that we have inside the parser configuration.
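So the additions to the parsers Config struct in plugins/parsers/registry.go are along these lines; the Avro field names are reconstructed from the configuration options above and the existing options are elided.

```go
// Fragment of the parsers.Config struct in plugins/parsers/registry.go,
// with the Avro-specific attributes added (reconstructed names).
type Config struct {
	// DataFormat is the name of the parser, e.g. "json", "csv", "avro".
	DataFormat string

	// ... existing CSV, JSON, and other parser options elided ...

	// Avro parser options
	AvroSchemaRegistry  string
	AvroMeasurement     string
	AvroTags            []string
	AvroFields          []string
	AvroTimestamp       string
	AvroTimestampFormat string
}
```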
Emanuele Falzone: 00:41:59.000 And from the configuration point of view, we are pretty happy, because now we can go from the configuration file in TOML format to a parser Config object that actually contains the values that we specified inside the configuration file. And if we go back to our buildParser function, once it has the configuration, what it does is create a new parser. This function is defined inside the parsers package, so it is defined inside registry.go. And what it does is simply a switch case on the data format that we’ve seen before, and we have all the cases that, let’s say, correspond to the real parser plugins that Telegraf has. And what we have to do is to add a case, case “avro”, in which we let the parser be a new Avro parser built using the attributes that we have inside the configuration object. And what this NewAvroParser function does is simply create a new Avro parser instance and return it.
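And the new case in NewParser, sketched under the assumption that the constructor simply forwards the configuration values; NewAvroParser and its exact parameter list are placeholders for whatever the fork actually defines.

```go
// Fragment of the switch inside NewParser in plugins/parsers/registry.go;
// the "avro" case is the one being added.
case "avro":
	parser, err = NewAvroParser(
		config.AvroSchemaRegistry,
		config.AvroMeasurement,
		config.AvroTags,
		config.AvroFields,
		config.AvroTimestamp,
		config.AvroTimestampFormat,
		config.DefaultTags,
	)
```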
Emanuele Falzone: 00:43:32.000 So this is all you have to do in order to extend Telegraf and create your own parser for your own data format. Now I will go back to the slides, even if I know that I’m late. I would like to spend a few words on open source contribution. Telegraf, as I said, is open source, so you are encouraged to contribute, and I think that we can, let’s say, split people into two groups. The first one is composed of people that have a really custom data format, so they are probably interested in extending Telegraf, but they don’t want to make an open source contribution because no other people will use their extension. The other group is composed of people that are working with data formats that are spreading widely over the internet, and a lot of people are starting to use them. So they should consider making an open source contribution in order to have their extension included in the original code base.
Emanuele Falzone: 00:44:56.000 And briefly, what you have to do is sign the CLA, and you have to open an issue on GitHub in order to discuss the changes that you want to make. This is really interesting, because you can probably find other people trying to solve the same problem, or, if you’re lucky, you will find other people that are already working on the same problem, so you can join forces and get the stuff done faster and better. Then you have, obviously, to make the real changes, and Telegraf has some guidelines that you want to follow in order to do your extension. Unfortunately, guidelines about parser plugins are still missing, but you can easily read the others and do some kind of merge. I think that during the code browsing session, I’ve shown you all the files that you have to extend and implement. So you really are encouraged to make this kind of contribution. At the end you have to be sure to have your unit tests and documentation provided. I skipped the documentation part because it is really simple. Maybe if we have time later I will go back to the documentation part. And finally you can open your pull request and get it accepted.
Emanuele Falzone: 00:46:31.000 The last point is about another possible use case. I provide code for this use case inside this repository, and it is related to bank transactions. In particular, I call this use case: can you help John? John works in a bank, and he wants to visualize some information inside InfluxDB. Such information can be computed starting from a series of transactions. We can see a simple transaction in line protocol: we have the transaction as the measurement, we have sender and receiver as tags, the amount in dollars, which is 28, and the related timestamp. And the point is that, due to security reasons, those messages, before being sent to Kafka, are encrypted using the public key of the bank. So if you look at the content of each topic, you will see a lot of meaningless bytes. And what I want you to try is to extend Telegraf, providing a parser plugin that is able to decrypt such messages using a private key that you provide through the configuration file.
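For reference, the plaintext transaction described here would look something like this in line protocol, with hypothetical sender and receiver names and an arbitrary timestamp:

```
transaction,sender=alice,receiver=bob amount=28 1592290800000000000
```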
Emanuele Falzone: 00:48:04.000 So I hope that some of you will try to implement this use case, and I will now draw my conclusions. During this webinar, we have seen how InfluxDB and Telegraf are placed inside the data life cycle, and we’ve seen two use cases. The first one is related to user ratings of a CRM, and we analyzed this use case deeply; we saw how we can extend Telegraf in order to make it parse Apache Avro messages. For the other one, related to bank transactions, I simply gave you a hint. The point there is that the messages are encrypted using a key, and you can try to build a plugin that is able to decrypt such messages and convert them into line protocol. So if you have any questions, I would be happy to answer them.
Caitlin Croft: 00:49:20.000 Perfect. Thank you, Emanuele. We already have a few questions. Is this code already integrated into the official code base?
Emanuele Falzone: 00:49:31.000 No, not yet, because I still have to implement some more tests; there are a lot of cases that I would like to test. And I still have to open a pull request. I’ve seen in the issues on GitHub that there are a few people that are interested in this case, so they have messages in Apache Avro format and they want to use Telegraf to push them into Influx. But I still have to make the pull request. However, my code is publicly available, and I also have a Docker image on Docker Hub if someone wants to try it on the fly.
Caitlin Croft: 00:50:32.000 Perfect. Is there a template with starter code that I can copy to get started, and can you share how you got started from the original project?
Emanuele Falzone: 00:50:45.000 Okay. No, there is no starter template, at least for now. And I think that it is a really great idea. But if you read the other guidelines, the ones related to inputs and so on, they somehow provide something of a template that you can actually use. They are really helpful even though they are not built for parser plugins. And that’s where I started; I started reading from those guidelines, and the problem was that it was the first time I was using the Go language, so that was another problem that I had to face. And the way I started was to simply clone the repository and try to understand how the folders and the files are organized. I think there is one file that is really interesting, and you should look at it. Sorry, that is this one, under “.circleci”. That is the description of their continuous integration, and from it you can, let’s say, understand how the component is built, starting from the sources to the Docker image. And the other file that I recommend you check out is the “Makefile”, which contains some of the commands that they use inside “.circleci”. So those are the two files that I recommend you start from.
Caitlin Croft: 00:52:55.000 Okay. It appears we are supposed to fork Telegraf itself and put our code into it. Is that correct?
Emanuele Falzone: 00:53:05.000 I would say yes.
Caitlin Croft: 00:53:12.000 Okay. How efficient is it to ask the schema registry for each message when the data is high frequency?
Emanuele Falzone: 00:53:23.000 Okay. You could build some kind of caching system. The implementation that we have right now asks the schema registry for each message, so even if you have a lot of messages with a high throughput, it will ask the schema registry for the corresponding schema every time. I think that a cache extension can be easily implemented, and in that case it would save a lot of network traffic and also a lot of delay, by avoiding the extra round trips over the network.
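A minimal sketch of such a cache, assuming the small getSchema HTTP client shown during the code walkthrough; the names here are illustrative, not from the actual fork.

```go
import "sync"

// cachingRegistry wraps the schema-registry HTTP client with an in-memory
// map so each schema ID is fetched over the network only once.
type cachingRegistry struct {
	mu      sync.Mutex
	client  *schemaRegistry // the HTTP client from schemaregistry.go
	schemas map[int]string  // schema ID -> schema JSON
}

func (c *cachingRegistry) getSchema(id int) (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if schema, ok := c.schemas[id]; ok {
		return schema, nil // cache hit: no network round trip
	}
	schema, err := c.client.getSchema(id) // cache miss: ask the registry
	if err != nil {
		return "", err
	}
	c.schemas[id] = schema
	return schema, nil
}
```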
Caitlin Croft: 00:54:20.000 Right. Another question. Emanuele, thank you so much for your nice presentation. Have you considered other options, like using the Kafka InfluxDB Sink Connector to read messages from Kafka and write them to InfluxDB? If so, what would be the advantages of using Telegraf over the InfluxDB Connector, in your opinion?
Emanuele Falzone: 00:54:46.000 Okay. Well, I considered using other kinds of connectors. I think that you can use the ones that are built for InfluxDB or others. The main advantage of using Telegraf is that you can place and deploy Telegraf wherever you want; you can also deploy it, let’s say, somewhat far from InfluxDB and near to your data, and it will handle all the problems of getting the data from the place where you have it into InfluxDB. The other interesting point is that you can also configure Telegraf to read messages from different sources. So at the same time, you can have, let’s say, two or three Kafka deployments and read from all of them, or maybe have an MQTT system and a Kafka and use them in the same configuration file. And the main advantage of using Telegraf, I think, from my perspective, is that once you have the parser plugin already implemented, you are probably going to get your work done faster and more easily, because you don’t have to care about all the conversion logic; you only have to specify the mapping between your data format and the Telegraf metric format.
Caitlin Croft: 00:56:50.000 Does your parser support derived data types such as arrays, and how do you write arrays to InfluxDB?
Emanuele Falzone: 00:56:59.000 Okay. That’s an interesting question, because, let me explain myself, the implementation that we have now assumes there is only one message in each Kafka record. This parser extension was implemented having in mind how Kafka works. The truth is that in the real world you are probably using Avro but not necessarily Kafka, and once you are using Avro, you can encode messages in different ways. So, for example, if you are using the Avro data format to save messages inside a file, you will probably put the schema at the beginning of the file and then have a list of messages. And in that case you would have to extend the implementation that I’ve done further, in order to let the parser work in a different way. So what I’m planning to do is to add another attribute that specifies how each message is built. For example, I’d like to support the two versions that we discussed during this presentation: the first one, where for each message we have the whole schema and the message, and the other one, which is the one adopted by Confluent. So I think that this [inaudible] in the usage of Apache Avro should be taken into account. But for now I’m not considering it, so thanks for pointing this out.
Caitlin Croft: 00:59:29.000 Great. So I just wanted to let everyone know, we actually have one of our product managers on the call. She’s actually in charge of Telegraf, and as this is a very Telegraf-specific webinar, we thought we’d invite her on. So I think she wanted to provide a little bit more insight. So I’ll hand it off to Samantha Wang.
Samantha Wang: 00:59:51.000 Yeah, Emanuele, thanks. This is great stuff. Just to chime in on the forking question that was asked: for this Avro parser you would need to fork Telegraf, but for other plugins in general you would be able to just use everything that already exists in the latest release; you would just need to use the latest [inaudible] Telegraf. But yeah, and just to mention, too, once the Avro parser gets in as a PR, we’ll definitely be able to take a look at it and get it in quickly. It definitely looks like really cool and useful stuff, so I just wanted to add that little bit.
Emanuele Falzone: 01:00:35.000 Thank you.
Samantha Wang: 01:00:35.000 No problem.
Caitlin Croft: 01:00:37.000 Thank you, Samantha. Looks like we have another question here. Just to clarify, the other Sink Connectors also run in Telegraf, too, right? They just expect the parsed data in well-known ways, right?
Emanuele Falzone: 01:00:52.000 Yes, yes. The truth is that, I should go back to one of the previous slides, okay, take this one. The truth is that when you define an input plugin, the input plugin can also directly convert the data from the format that it reads into Telegraf metrics. So you can also configure Telegraf to skip the parser plugin. And this is really interesting if you have data in a really custom format and you don’t want to use a parser, but simply want to build an input plugin that does the conversion into Telegraf metrics. But anyway, the way in which parser plugins work is the same: you have that interface, so you have to implement the Parse and ParseLine methods, and all the parsers are built in that way.
[silence]
Emanuele Falzone: 01:02:41.000 Okay. Any other questions?
Caitlin Croft: 01:02:48.000 It doesn’t look like it. I’m just throwing in a bunch of links for everyone. We have InfluxDays coming up next week, so feel free to register for it. It’s completely free. We have our community Slack channel, where you can ask questions and answer them, and there are InfluxDB experts from the community as well as from InfluxData. And we also, of course, have our community office hours tomorrow, and I’d love to see you all there as well. I also know that sometimes after these webinars, you think of questions that you would’ve liked to have asked Emanuele and you just forgot. You all should have my email address, so feel free to email me and I’m happy to pass along those questions to Emanuele. And once again, thank you everyone for joining. This webinar has been recorded and it will be available for replay later today. Thank you!
Emanuele Falzone: 01:03:53.000 Thank you.
Caitlin Croft: 01:03:56.000 Bye.
Emanuele Falzone
Ph.D. Student at Politecnico di Milano
Emanuele Falzone has been a Ph.D. student at Politecnico di Milano, at the Department of Electronics, Information and Bioengineering, under the supervision of Prof. Emanuele Della Valle, since November 2019. His research interest is mainly stream processing. He received his M.Sc. degree in Computer Science from Politecnico di Milano in December 2018 and his B.Sc. in Computer Engineering from Politecnico di Milano in September 2016.