Kinesis and MariaDB Integration

Powerful performance with an easy integration, powered by Telegraf, the open source data connector built by InfluxData.

This is not the recommended configuration for real-time query at scale. For query and compression optimization, high-speed ingest, and high availability, you may want to consider Kinesis and InfluxDB.

Powerful Performance, Limitless Scale

Collect, organize, and act on massive volumes of high-velocity data. Any data is more valuable when you think of it as time series data with InfluxDB, the #1 time series platform built to scale with Telegraf.

Input and output integration overview

The Kinesis plugin enables you to read from Kinesis data streams, supporting various data formats and configurations.

The SQL output plugin writes metrics from Telegraf directly into MariaDB using parameterized SQL INSERT statements, offering a flexible way to store metrics in structured, relational tables.

Integration details

Kinesis

The Kinesis Telegraf plugin reads from Amazon Kinesis data streams, letting users gather metrics in real time. As a service input plugin, it listens for incoming data rather than polling at regular intervals. The configuration specifies the AWS region, stream name, authentication credentials, and data format. The plugin tracks undelivered messages to prevent data loss, and it can use DynamoDB to maintain checkpoints of the last processed records. It is particularly useful for applications that need reliable, scalable stream processing alongside other monitoring.

MariaDB

The SQL output plugin in Telegraf enables direct writing of metrics into SQL-compatible databases like MariaDB by executing parameterized SQL statements. With support for the MySQL driver, the plugin seamlessly integrates with MariaDB for reliable, structured metric storage. This setup is ideal for users who prefer SQL-based analytics or want to store metrics alongside business data for unified querying. MariaDB is a community-developed, enterprise-grade fork of MySQL that emphasizes performance, security, and openness. The plugin supports inserting time series metrics into custom schemas, enabling flexible analytics and integrations with BI tools like Metabase or Grafana using SQL connectors.

Configuration

Kinesis


# Configuration for the AWS Kinesis input.
[[inputs.kinesis_consumer]]
  ## Amazon REGION of kinesis endpoint.
  region = "ap-southeast-2"

  ## Amazon Credentials
  ## Credentials are loaded in the following order
  ## 1) Web identity provider credentials via STS if role_arn and web_identity_token_file are specified
  ## 2) Assumed credentials via STS if role_arn is specified
  ## 3) explicit credentials from 'access_key' and 'secret_key'
  ## 4) shared profile from 'profile'
  ## 5) environment variables
  ## 6) shared credentials file
  ## 7) EC2 Instance Profile
  # access_key = ""
  # secret_key = ""
  # token = ""
  # role_arn = ""
  # web_identity_token_file = ""
  # role_session_name = ""
  # profile = ""
  # shared_credential_file = ""

  ## Endpoint to make request against, the correct endpoint is automatically
  ## determined and this option should only be set if you wish to override the
  ## default.
  ##   ex: endpoint_url = "http://localhost:8000"
  # endpoint_url = ""

  ## Kinesis StreamName must exist prior to starting telegraf.
  streamname = "StreamName"

  ## Shard iterator type (only 'TRIM_HORIZON' and 'LATEST' currently supported)
  # shard_iterator_type = "TRIM_HORIZON"

  ## Max undelivered messages
  ## This plugin uses tracking metrics, which ensure messages are read to
  ## outputs before acknowledging them to the original broker to ensure data
  ## is not lost. This option sets the maximum messages to read from the
  ## broker that have not been written by an output.
  ##
  ## This value needs to be picked with awareness of the agent's
  ## metric_batch_size value as well. Setting max undelivered messages too high
  ## can result in a constant stream of data batches to the output. While
  ## setting it too low may never flush the broker's messages.
  # max_undelivered_messages = 1000

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

  ##
  ## The content encoding of the data from kinesis
  ## If you are processing a cloudwatch logs kinesis stream then set this to "gzip"
  ## as AWS compresses cloudwatch log data before it is sent to kinesis (aws
  ## also base64 encodes the zip byte data before pushing to the stream.  The base64 decoding
  ## is done automatically by the golang sdk, as data is read from kinesis)
  ##
  # content_encoding = "identity"

  ## Optional
  ## Configuration for a dynamodb checkpoint
  [inputs.kinesis_consumer.checkpoint_dynamodb]
    ## unique name for this consumer
    app_name = "default"
    table_name = "default"
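
The comment on max_undelivered_messages ties that option to the agent's metric_batch_size. A minimal sketch of setting the two together follows. The specific values, and the choice to make them equal, are illustrative assumptions rather than recommendations from the plugin documentation.

```toml
# Sketch only: the numbers below are assumptions chosen for illustration.
[agent]
  ## Number of metrics Telegraf sends to each output in one write.
  metric_batch_size = 1000
  ## Metrics buffered per output while writes are failing.
  metric_buffer_limit = 10000

[[inputs.kinesis_consumer]]
  region = "ap-southeast-2"
  streamname = "StreamName"
  ## Start from new records only instead of the oldest retained record.
  shard_iterator_type = "LATEST"
  ## Upper bound on messages read from Kinesis but not yet written by an
  ## output. Set equal to metric_batch_size here as an assumed starting point.
  max_undelivered_messages = 1000
  data_format = "influx"
```

If the output falls behind, the consumer stops reading once 1000 messages are outstanding, so back-pressure reaches Kinesis instead of growing Telegraf's buffer without bound.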

MariaDB

[[outputs.sql]]
  ## Database driver
  ## Valid options: mssql (Microsoft SQL Server), mysql (MySQL), pgx (Postgres),
  ##  sqlite (SQLite3), snowflake (snowflake.com), clickhouse (ClickHouse)
  driver = "mysql"

  ## Data source name
  ## The format of the data source name is different for each database driver.
  ## See the plugin readme for details.
  data_source_name = "username:password@tcp(host:port)/dbname"

  ## Timestamp column name
  timestamp_column = "timestamp"

  ## Table creation template
  ## Available template variables:
  ##  {TABLE} - table name as a quoted identifier
  ##  {TABLELITERAL} - table name as a quoted string literal
  ##  {COLUMNS} - column definitions (list of quoted identifiers and types)
  table_template = "CREATE TABLE {TABLE}({COLUMNS})"

  ## NOTE: The plugin generates its parameterized INSERT statements
  ## automatically from each metric's tags and fields. table_template only
  ## controls how tables are created.

  ## Table existence check template
  ## Available template variables:
  ##  {TABLE} - tablename as a quoted identifier
  table_exists_template = "SELECT 1 FROM {TABLE} LIMIT 1"

  ## Initialization SQL
  init_sql = "SET sql_mode='ANSI_QUOTES';"

  ## Maximum amount of time a connection may be idle. "0s" means connections are
  ## never closed due to idle time.
  connection_max_idle_time = "0s"

  ## Maximum amount of time a connection may be reused. "0s" means connections
  ## are never closed due to age.
  connection_max_lifetime = "0s"

  ## Maximum number of connections in the idle connection pool. 0 means unlimited.
  connection_max_idle = 2

  ## Maximum number of open connections to the database. 0 means unlimited.
  connection_max_open = 0

  ## NOTE: Due to the way TOML is parsed, tables must be at the END of the
  ## plugin definition, otherwise additional config options are read as part of the
  ## table

  ## Metric type to SQL type conversion
  ## The values on the left are the data types Telegraf has and the values on
  ## the right are the data types Telegraf will use when sending to a database.
  ##
  ## The database values used must be data types the destination database
  ## understands. It is up to the user to ensure that the selected data type is
  ## available in the database they are using. Refer to your database
  ## documentation for what data types are available and supported.
  #[outputs.sql.convert]
  #  integer              = "INT"
  #  real                 = "DOUBLE"
  #  text                 = "TEXT"
  #  timestamp            = "TIMESTAMP"
  #  defaultvalue         = "TEXT"
  #  unsigned             = "UNSIGNED"
  #  bool                 = "BOOL"
  #  ## This setting controls the behavior of the unsigned value. By default the
  #  ## setting will take the integer value and append the unsigned value to it. The other
  #  ## option is "literal", which will use the actual value the user provides to
  #  ## the unsigned option. This is useful for a database like ClickHouse where
  #  ## the unsigned value should use a value like "uint64".
  #  # conversion_style = "unsigned_suffix"
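
Putting the two plugins together, a minimal end-to-end configuration might look like the sketch below. The database user `telegraf`, the `MARIADB_PASSWORD` environment variable, the address `127.0.0.1:3306`, and the database name `metrics` are hypothetical placeholders. Mapping integers to BIGINT is an assumption made here because Telegraf integer fields are 64-bit while MariaDB's INT is 32-bit. It is not a default of the plugin.

```toml
# Sketch only: credentials, host, and database name are placeholders.
[[inputs.kinesis_consumer]]
  region = "ap-southeast-2"
  streamname = "StreamName"
  data_format = "influx"

[[outputs.sql]]
  driver = "mysql"
  ## ${MARIADB_PASSWORD} is read from the environment when Telegraf loads
  ## the config, which keeps the password out of the file.
  data_source_name = "telegraf:${MARIADB_PASSWORD}@tcp(127.0.0.1:3306)/metrics"
  timestamp_column = "timestamp"
  table_template = "CREATE TABLE {TABLE}({COLUMNS})"
  table_exists_template = "SELECT 1 FROM {TABLE} LIMIT 1"
  ## The plugin quotes identifiers with double quotes, so ANSI_QUOTES mode
  ## is enabled for each connection.
  init_sql = "SET sql_mode='ANSI_QUOTES';"

  ## Tables go last, as the TOML note above requires.
  [outputs.sql.convert]
    integer      = "BIGINT"
    real         = "DOUBLE"
    text         = "TEXT"
    timestamp    = "TIMESTAMP"
    defaultvalue = "TEXT"
    unsigned     = "UNSIGNED"
    bool         = "BOOL"
    ## With the default style, unsigned fields should map to the integer
    ## type followed by the unsigned value, i.e. "BIGINT UNSIGNED".
    conversion_style = "unsigned_suffix"
```

Saved as telegraf.conf, the file can be loaded with `telegraf --config telegraf.conf`. The Kinesis stream and the MariaDB database must already exist, and the database user needs permission to create tables.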

Input and output integration examples

Kinesis

  1. Real-Time Data Processing with Kinesis: This use case involves integrating the Kinesis plugin with a monitoring dashboard to analyze incoming data metrics in real-time. For instance, an application could consume logs from multiple services and present them visually, allowing operations teams to quickly identify trends and react to anomalies as they occur.

  2. Serverless Log Aggregation: Utilize this plugin in a serverless architecture where Kinesis streams aggregate logs from various microservices. The plugin can create metrics that help detect issues in the system, automating alerting processes through third-party integrations, enabling teams to minimize downtime and improve reliability.

  3. Dynamic Scaling Based on Stream Metrics: Implement a solution where stream metrics consumed by the Kinesis plugin could be used to adjust resources dynamically. For example, if the number of records processed spikes, corresponding scale-up actions could be triggered to handle the increased load, ensuring optimal resource allocation and performance.

  4. Data Pipeline to S3 with Checkpointing: Create a robust data pipeline where Kinesis stream data is processed through the Telegraf Kinesis plugin, with checkpoints stored in DynamoDB. This approach can ensure data consistency and reliability, as it manages the state of processed data, enabling seamless integration with downstream data lakes or storage solutions.

MariaDB

  1. Business Intelligence Integration: Store application performance metrics directly into MariaDB and connect it to BI tools like Metabase or Apache Superset. This setup allows blending of operational data with business KPIs for unified dashboards, enhancing visibility across departments.

  2. Compliance Reporting with Historical Metrics: Use this plugin to log metrics into MariaDB for audit and compliance use cases. The relational model enables precise querying of past performance indicators with timestamped entries, supporting regulatory documentation.

  3. Custom Alerting Based on SQL Logic: Insert metrics into MariaDB and use custom SQL queries to define alert thresholds or conditions. Combined with cron jobs or scheduled scripts, this enables alerting workflows, such as joins against business tables, that are hard to express in metrics platforms without SQL support.

  4. IoT Sensor Metrics Storage: Collect sensor data from IoT devices via Telegraf and store it in MariaDB using a normalized schema. This approach is cost-effective and integrates well with existing SQL-based systems for real-time or historical analysis.
