AMQP and OpenSearch Integration
Powerful performance with an easy integration, powered by Telegraf, the open source data connector built by InfluxData.
5B+
Telegraf downloads
#1
Time series database
Source: DB Engines
1B+
Downloads of InfluxDB
2,800+
Contributors
Table of Contents
Powerful Performance, Limitless Scale
Collect, organize, and act on massive volumes of high-velocity data. Any data is more valuable when you think of it as time series data. with InfluxDB, the #1 time series platform built to scale with Telegraf.
See Ways to Get Started
Input and output integration overview
The AMQP Consumer Input Plugin allows you to ingest data from an AMQP 0-9-1 compliant message broker, such as RabbitMQ, enabling seamless data collection for monitoring and analytics purposes.
The OpenSearch Output Plugin allows users to send metrics directly to an OpenSearch instance using HTTP, thus facilitating effective data management and analytics within the OpenSearch ecosystem.
Integration details
AMQP
This plugin provides a consumer for use with AMQP 0-9-1, a prominent implementation of which is RabbitMQ. AMQP, or Advanced Message Queuing Protocol, was originally developed to enable reliable, interoperable messaging between diverse systems in a network. The plugin reads metrics from a topic exchange using a configured queue and binding key, delivering a flexible and efficient means of collecting data from AMQP-compliant messaging systems. This enables users to leverage existing RabbitMQ implementations to monitor their applications effectively by capturing detailed metrics for analysis and alerting.
OpenSearch
The OpenSearch Telegraf Plugin integrates with the OpenSearch database via HTTP, allowing for the streamlined collection and storage of metrics. As a powerful tool designed specifically for OpenSearch releases from 2.x, the plugin provides robust features while offering compatibility with 1.x through the original Elasticsearch plugin. This plugin facilitates the creation and management of indexes in OpenSearch, automatically managing templates and ensuring that data is structured efficiently for analysis. The plugin supports various configuration options such as index names, authentication, health checks, and value handling, allowing it to be tailored to diverse operational requirements. Its capabilities make it essential for organizations looking to harness the power of OpenSearch for metrics storage and querying.
Configuration
AMQP
[[inputs.amqp_consumer]]
## Brokers to consume from. If multiple brokers are specified a random broker
## will be selected anytime a connection is established. This can be
## helpful for load balancing when not using a dedicated load balancer.
brokers = ["amqp://localhost:5672/influxdb"]
## Authentication credentials for the PLAIN auth_method.
# username = ""
# password = ""
## Name of the exchange to declare. If unset, no exchange will be declared.
exchange = "telegraf"
## Exchange type; common types are "direct", "fanout", "topic", "header", "x-consistent-hash".
# exchange_type = "topic"
## If true, exchange will be passively declared.
# exchange_passive = false
## Exchange durability can be either "transient" or "durable".
# exchange_durability = "durable"
## Additional exchange arguments.
# exchange_arguments = { }
# exchange_arguments = {"hash_property" = "timestamp"}
## AMQP queue name.
queue = "telegraf"
## AMQP queue durability can be "transient" or "durable".
queue_durability = "durable"
## If true, queue will be passively declared.
# queue_passive = false
## Additional arguments when consuming from Queue
# queue_consume_arguments = { }
# queue_consume_arguments = {"x-stream-offset" = "first"}
## A binding between the exchange and queue using this binding key is
## created. If unset, no binding is created.
binding_key = "#"
## Maximum number of messages server should give to the worker.
# prefetch_count = 50
## Max undelivered messages
## This plugin uses tracking metrics, which ensure messages are read to
## outputs before acknowledging them to the original broker to ensure data
## is not lost. This option sets the maximum messages to read from the
## broker that have not been written by an output.
##
## This value needs to be picked with awareness of the agent's
## metric_batch_size value as well. Setting max undelivered messages too high
## can result in a constant stream of data batches to the output. While
## setting it too low may never flush the broker's messages.
# max_undelivered_messages = 1000
## Timeout for establishing the connection to a broker
# timeout = "30s"
## Auth method. PLAIN and EXTERNAL are supported
## Using EXTERNAL requires enabling the rabbitmq_auth_mechanism_ssl plugin as
## described here: https://www.rabbitmq.com/plugins.html
# auth_method = "PLAIN"
## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Content encoding for message payloads, can be set to
## "gzip", "identity" or "auto"
## - Use "gzip" to decode gzip
## - Use "identity" to apply no encoding
## - Use "auto" determine the encoding using the ContentEncoding header
# content_encoding = "identity"
## Maximum size of decoded message.
## Acceptable units are B, KiB, KB, MiB, MB...
## Without quotes and units, interpreted as size in bytes.
# max_decompression_size = "500MB"
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
OpenSearch
[[outputs.opensearch]]
## URLs
## The full HTTP endpoint URL for your OpenSearch instance. Multiple URLs can
## be specified as part of the same cluster, but only one URLs is used to
## write during each interval.
urls = ["http://node1.os.example.com:9200"]
## Index Name
## Target index name for metrics (OpenSearch will create if it not exists).
## This is a Golang template (see https://pkg.go.dev/text/template)
## You can also specify
## metric name (`{{.Name}}`), tag value (`{{.Tag "tag_name"}}`), field value (`{{.Field "field_name"}}`)
## If the tag does not exist, the default tag value will be empty string "".
## the timestamp (`{{.Time.Format "xxxxxxxxx"}}`).
## For example: "telegraf-{{.Time.Format \"2006-01-02\"}}-{{.Tag \"host\"}}" would set it to telegraf-2023-07-27-HostName
index_name = ""
## Timeout
## OpenSearch client timeout
# timeout = "5s"
## Sniffer
## Set to true to ask OpenSearch a list of all cluster nodes,
## thus it is not necessary to list all nodes in the urls config option
# enable_sniffer = false
## GZIP Compression
## Set to true to enable gzip compression
# enable_gzip = false
## Health Check Interval
## Set the interval to check if the OpenSearch nodes are available
## Setting to "0s" will disable the health check (not recommended in production)
# health_check_interval = "10s"
## Set the timeout for periodic health checks.
# health_check_timeout = "1s"
## HTTP basic authentication details.
# username = ""
# password = ""
## HTTP bearer token authentication details
# auth_bearer_token = ""
## Optional TLS Config
## Set to true/false to enforce TLS being enabled/disabled. If not set,
## enable TLS only if any of the other options are specified.
# tls_enable =
## Trusted root certificates for server
# tls_ca = "/path/to/cafile"
## Used for TLS client certificate authentication
# tls_cert = "/path/to/certfile"
## Used for TLS client certificate authentication
# tls_key = "/path/to/keyfile"
## Send the specified TLS server name via SNI
# tls_server_name = "kubernetes.example.com"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false
## Template Config
## Manage templates
## Set to true if you want telegraf to manage its index template.
## If enabled it will create a recommended index template for telegraf indexes
# manage_template = true
## Template Name
## The template name used for telegraf indexes
# template_name = "telegraf"
## Overwrite Templates
## Set to true if you want telegraf to overwrite an existing template
# overwrite_template = false
## Document ID
## If set to true a unique ID hash will be sent as
## sha256(concat(timestamp,measurement,series-hash)) string. It will enable
## data resend and update metric points avoiding duplicated metrics with
## different id's
# force_document_id = false
## Value Handling
## Specifies the handling of NaN and Inf values.
## This option can have the following values:
## none -- do not modify field-values (default); will produce an error
## if NaNs or infs are encountered
## drop -- drop fields containing NaNs or infs
## replace -- replace with the value in "float_replacement_value" (default: 0.0)
## NaNs and inf will be replaced with the given number, -inf with the negative of that number
# float_handling = "none"
# float_replacement_value = 0.0
## Pipeline Config
## To use a ingest pipeline, set this to the name of the pipeline you want to use.
# use_pipeline = "my_pipeline"
## Pipeline Name
## Additionally, you can specify a tag name using the notation (`{{.Tag "tag_name"}}`)
## which will be used as the pipeline name (e.g. "{{.Tag \"os_pipeline\"}}").
## If the tag does not exist, the default pipeline will be used as the pipeline.
## If no default pipeline is set, no pipeline is used for the metric.
# default_pipeline = ""
Input and output integration examples
AMQP
-
Integrating Application Metrics with AMQP: Use the AMQP Consumer plugin to gather application metrics that are published to a RabbitMQ exchange. By configuring the plugin to listen to specific queues, teams can gain insights into application performance, track request rates, error counts, and latency metrics, all in real-time. This setup not only aids in anomaly detection but also provides valuable data for capacity planning and system optimization.
-
Event-Driven Monitoring: Configure the AMQP Consumer to trigger specific monitoring events whenever certain conditions are met within an application. For instance, if a message indicating a high error rate is received, the plugin can feed this data into monitoring tools, generating alerts or scaling events. This integration can improve responsiveness to issues and automate parts of the operations workflow.
-
Cross-Platform Data Aggregation: Leverage the AMQP Consumer plugin to consolidate metrics from various applications distributed across different platforms. By utilizing RabbitMQ as a centralized message broker, organizations can unify their monitoring data, allowing for comprehensive analysis and dashboarding through Telegraf, thus maintaining visibility across heterogeneous environments.
-
Real-Time Log Processing: Extend the use of the AMQP Consumer to capture log data sent to a RabbitMQ exchange, processing logs in real time for monitoring and alerting purposes. This application ensures that operational issues are detected and addressed swiftly by analyzing log patterns, trends, and anomalies as they occur.
OpenSearch
-
Dynamic Indexing for Time-Series Data: Utilize the OpenSearch Telegraf plugin to dynamically create indexes for time-series metrics, ensuring that data is stored in an organized manner conducive to time-based queries. By defining index patterns using Go templates, users can leverage the plugin to create daily or monthly indexes, which can greatly simplify data management and retrieval over time, thus enhancing analytical performance.
-
Centralized Logging for Multi-Tenant Applications: Implement the OpenSearch plugin in a multi-tenant application where each tenant’s logs are sent to separate indexes. This enables targeted analysis and monitoring for each tenant while maintaining data isolation. By utilizing the index name templating feature, users can automatically create tenant-specific indexes, which not only streamlines the process but also enhances security and accessibility for tenant data.
-
Integration with Machine Learning for Anomaly Detection: Leverage the OpenSearch plugin alongside machine learning tools to automatically detect anomalies in metrics data. By configuring the plugin to send real-time metrics to OpenSearch, users can apply machine learning models on the incoming data streams to identify outliers or unusual patterns, facilitating proactive monitoring and swift remedial actions.
-
Enhanced Monitoring Dashboards with OpenSearch: Use the metrics collected from OpenSearch to create real-time dashboards that provide insights into system performance. By feeding metrics into OpenSearch, organizations can utilize OpenSearch Dashboards to visualize key performance indicators, allowing operations teams to quickly assess health and performance, and making data-driven decisions.
Feedback
Thank you for being part of our community! If you have any general feedback or found any bugs on these pages, we welcome and encourage your input. Please submit your feedback in the InfluxDB community Slack.
Powerful Performance, Limitless Scale
Collect, organize, and act on massive volumes of high-velocity data. Any data is more valuable when you think of it as time series data. with InfluxDB, the #1 time series platform built to scale with Telegraf.
See Ways to Get Started
Related Integrations
Related Integrations
HTTP and InfluxDB Integration
The HTTP plugin collects metrics from one or more HTTP(S) endpoints. It supports various authentication methods and configuration options for data formats.
View IntegrationKafka and InfluxDB Integration
This plugin reads messages from Kafka and allows the creation of metrics based on those messages. It supports various configurations including different Kafka settings and message processing options.
View IntegrationKinesis and InfluxDB Integration
The Kinesis plugin allows for reading metrics from AWS Kinesis streams. It supports multiple input data formats and offers checkpointing features with DynamoDB for reliable message processing.
View Integration