NSQ and OpenSearch Integration

Powerful performance with an easy integration, powered by Telegraf, the open source data connector built by InfluxData.

info

This is not the recommended configuration for real-time query at scale. For query and compression optimization, high-speed ingest, and high availability, you may want to consider NSQ and InfluxDB.

5B+

Telegraf downloads

#1

Time series database
Source: DB Engines

1B+

Downloads of InfluxDB

2,800+

Contributors

Table of Contents

Powerful Performance, Limitless Scale

Collect, organize, and act on massive volumes of high-velocity data. Any data is more valuable when you think of it as time series data. with InfluxDB, the #1 time series platform built to scale with Telegraf.

See Ways to Get Started

Input and output integration overview

The NSQ Telegraf plugin reads metrics from the NSQD messaging system, allowing for real-time data processing and monitoring.

The OpenSearch Output Plugin allows users to send metrics directly to an OpenSearch instance using HTTP, thus facilitating effective data management and analytics within the OpenSearch ecosystem.

Integration details

NSQ

The NSQ plugin interfaces with NSQ, a real-time messaging platform, enabling the reading of messages from NSQD. This plugin is categorized as a service plugin, meaning it actively listens for metrics and events rather than polling them at regular intervals. With an emphasis on reliability, it prevents data loss by tracking undelivered messages until they are acknowledged by outputs. The plugin allows for configurations such as specifying NSQLookupd endpoints, topics, and channels, and it supports multiple data formats for flexibility in data handling.

OpenSearch

The OpenSearch Telegraf Plugin integrates with the OpenSearch database via HTTP, allowing for the streamlined collection and storage of metrics. As a powerful tool designed specifically for OpenSearch releases from 2.x, the plugin provides robust features while offering compatibility with 1.x through the original Elasticsearch plugin. This plugin facilitates the creation and management of indexes in OpenSearch, automatically managing templates and ensuring that data is structured efficiently for analysis. The plugin supports various configuration options such as index names, authentication, health checks, and value handling, allowing it to be tailored to diverse operational requirements. Its capabilities make it essential for organizations looking to harness the power of OpenSearch for metrics storage and querying.

Configuration

NSQ

# Read metrics from NSQD topic(s)
[[inputs.nsq_consumer]]
  ## Server option still works but is deprecated, we just prepend it to the nsqd array.
  # server = "localhost:4150"

  ## An array representing the NSQD TCP HTTP Endpoints
  nsqd = ["localhost:4150"]

  ## An array representing the NSQLookupd HTTP Endpoints
  nsqlookupd = ["localhost:4161"]
  topic = "telegraf"
  channel = "consumer"
  max_in_flight = 100

  ## Max undelivered messages
  ## This plugin uses tracking metrics, which ensure messages are read to
  ## outputs before acknowledging them to the original broker to ensure data
  ## is not lost. This option sets the maximum messages to read from the
  ## broker that have not been written by an output.
  ##
  ## This value needs to be picked with awareness of the agent's
  ## metric_batch_size value as well. Setting max undelivered messages too high
  ## can result in a constant stream of data batches to the output. While
  ## setting it too low may never flush the broker's messages.
  # max_undelivered_messages = 1000

  ## Data format to consume.
  ## Each data format has its own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

OpenSearch

[[outputs.opensearch]]
  ## URLs
  ## The full HTTP endpoint URL for your OpenSearch instance. Multiple URLs can
  ## be specified as part of the same cluster, but only one URLs is used to
  ## write during each interval.
  urls = ["http://node1.os.example.com:9200"]

  ## Index Name
  ## Target index name for metrics (OpenSearch will create if it not exists).
  ## This is a Golang template (see https://pkg.go.dev/text/template)
  ## You can also specify
  ## metric name (`{{.Name}}`), tag value (`{{.Tag "tag_name"}}`), field value (`{{.Field "field_name"}}`)
  ## If the tag does not exist, the default tag value will be empty string "".
  ## the timestamp (`{{.Time.Format "xxxxxxxxx"}}`).
  ## For example: "telegraf-{{.Time.Format \"2006-01-02\"}}-{{.Tag \"host\"}}" would set it to telegraf-2023-07-27-HostName
  index_name = ""

  ## Timeout
  ## OpenSearch client timeout
  # timeout = "5s"

  ## Sniffer
  ## Set to true to ask OpenSearch a list of all cluster nodes,
  ## thus it is not necessary to list all nodes in the urls config option
  # enable_sniffer = false

  ## GZIP Compression
  ## Set to true to enable gzip compression
  # enable_gzip = false

  ## Health Check Interval
  ## Set the interval to check if the OpenSearch nodes are available
  ## Setting to "0s" will disable the health check (not recommended in production)
  # health_check_interval = "10s"

  ## Set the timeout for periodic health checks.
  # health_check_timeout = "1s"
  ## HTTP basic authentication details.
  # username = ""
  # password = ""
  ## HTTP bearer token authentication details
  # auth_bearer_token = ""

  ## Optional TLS Config
  ## Set to true/false to enforce TLS being enabled/disabled. If not set,
  ## enable TLS only if any of the other options are specified.
  # tls_enable =
  ## Trusted root certificates for server
  # tls_ca = "/path/to/cafile"
  ## Used for TLS client certificate authentication
  # tls_cert = "/path/to/certfile"
  ## Used for TLS client certificate authentication
  # tls_key = "/path/to/keyfile"
  ## Send the specified TLS server name via SNI
  # tls_server_name = "kubernetes.example.com"
  ## Use TLS but skip chain & host verification
  # insecure_skip_verify = false

  ## Template Config
  ## Manage templates
  ## Set to true if you want telegraf to manage its index template.
  ## If enabled it will create a recommended index template for telegraf indexes
  # manage_template = true

  ## Template Name
  ## The template name used for telegraf indexes
  # template_name = "telegraf"

  ## Overwrite Templates
  ## Set to true if you want telegraf to overwrite an existing template
  # overwrite_template = false

  ## Document ID
  ## If set to true a unique ID hash will be sent as
  ## sha256(concat(timestamp,measurement,series-hash)) string. It will enable
  ## data resend and update metric points avoiding duplicated metrics with
  ## different id's
  # force_document_id = false

  ## Value Handling
  ## Specifies the handling of NaN and Inf values.
  ## This option can have the following values:
  ##    none    -- do not modify field-values (default); will produce an error
  ##               if NaNs or infs are encountered
  ##    drop    -- drop fields containing NaNs or infs
  ##    replace -- replace with the value in "float_replacement_value" (default: 0.0)
  ##               NaNs and inf will be replaced with the given number, -inf with the negative of that number
  # float_handling = "none"
  # float_replacement_value = 0.0

  ## Pipeline Config
  ## To use a ingest pipeline, set this to the name of the pipeline you want to use.
  # use_pipeline = "my_pipeline"

  ## Pipeline Name
  ## Additionally, you can specify a tag name using the notation (`{{.Tag "tag_name"}}`)
  ## which will be used as the pipeline name (e.g. "{{.Tag \"os_pipeline\"}}").
  ## If the tag does not exist, the default pipeline will be used as the pipeline.
  ## If no default pipeline is set, no pipeline is used for the metric.
  # default_pipeline = ""

Input and output integration examples

NSQ

  1. Real-Time Analytics Dashboard: Integrate this plugin with a visualization tool to create a dashboard that displays real-time metrics from various topics in NSQ. By subscribing to specific topics, users can monitor system health and application performance dynamically, allowing for immediate insights and timely responses to any anomalies.

  2. Event-Driven Automation: Combine NSQ with a serverless architecture to trigger automated workflows based on incoming messages. This use case could involve processing data for machine learning models or responding to user actions in applications, thus streamlining operations and enhancing user experience through rapid processing.

  3. Multi-Service Communication Hub: Use the NSQ plugin to act as a centralized messaging hub among different microservices in a distributed architecture. By enabling services to communicate through NSQ, developers can ensure reliable message delivery while maintaining decoupled service interactions, significantly improving scalability and resilience.

  4. Metrics Aggregation for Enhanced Monitoring: Implement the NSQ plugin to aggregate metrics from multiple sources before sending them to an analytics tool. This setup enables businesses to consolidate data from various applications and services, creating a unified view for better decision-making and strategic planning.

OpenSearch

  1. Dynamic Indexing for Time-Series Data: Utilize the OpenSearch Telegraf plugin to dynamically create indexes for time-series metrics, ensuring that data is stored in an organized manner conducive to time-based queries. By defining index patterns using Go templates, users can leverage the plugin to create daily or monthly indexes, which can greatly simplify data management and retrieval over time, thus enhancing analytical performance.

  2. Centralized Logging for Multi-Tenant Applications: Implement the OpenSearch plugin in a multi-tenant application where each tenant’s logs are sent to separate indexes. This enables targeted analysis and monitoring for each tenant while maintaining data isolation. By utilizing the index name templating feature, users can automatically create tenant-specific indexes, which not only streamlines the process but also enhances security and accessibility for tenant data.

  3. Integration with Machine Learning for Anomaly Detection: Leverage the OpenSearch plugin alongside machine learning tools to automatically detect anomalies in metrics data. By configuring the plugin to send real-time metrics to OpenSearch, users can apply machine learning models on the incoming data streams to identify outliers or unusual patterns, facilitating proactive monitoring and swift remedial actions.

  4. Enhanced Monitoring Dashboards with OpenSearch: Use the metrics collected from OpenSearch to create real-time dashboards that provide insights into system performance. By feeding metrics into OpenSearch, organizations can utilize OpenSearch Dashboards to visualize key performance indicators, allowing operations teams to quickly assess health and performance, and making data-driven decisions.

Feedback

Thank you for being part of our community! If you have any general feedback or found any bugs on these pages, we welcome and encourage your input. Please submit your feedback in the InfluxDB community Slack.

Powerful Performance, Limitless Scale

Collect, organize, and act on massive volumes of high-velocity data. Any data is more valuable when you think of it as time series data. with InfluxDB, the #1 time series platform built to scale with Telegraf.

See Ways to Get Started

Related Integrations

HTTP and InfluxDB Integration

The HTTP plugin collects metrics from one or more HTTP(S) endpoints. It supports various authentication methods and configuration options for data formats.

View Integration

Kafka and InfluxDB Integration

This plugin reads messages from Kafka and allows the creation of metrics based on those messages. It supports various configurations including different Kafka settings and message processing options.

View Integration

Kinesis and InfluxDB Integration

The Kinesis plugin allows for reading metrics from AWS Kinesis streams. It supports multiple input data formats and offers checkpointing features with DynamoDB for reliable message processing.

View Integration