PID Controllers and InfluxDB: Part 2 - Digital Twin

In a previous post, we described a CSTR and a PID controller. This post covers the code and architecture of the digital twin from this project repo. The project leverages Kafka for data streaming, Faust for data processing, InfluxDB for storing the time series data, and Telegraf for writing data from the Kafka topic to InfluxDB. We’ll also cover the advantages and disadvantages of this stack. The corresponding repo for this blog post can be found here.

Architecture drawing of the project described in this blog. A digital twin of a CSTR produces data that is collected by Kafka. Faust acts as a PID controller. Telegraf subscribes to a Kafka topic to send sensor and control data to InfluxDB. The reactor and cooling jacket are monitored by Grafana.

Digital twin logic architecture

Here we’ll focus on the logic in the cstr_kafka_influxdb directory. This directory contains an example of how to model a CSTR using InfluxDB, Kafka, Telegraf, and Python. Two scripts, cstr_model.py and pid_controller.py, are responsible for simulating the CSTR and the PID controller, respectively. The high-level architecture looks like this:

The result is a CSTR digital twin that mimics executing a reaction under the provided temperature setpoint conditions. We produce data that looks like this:

The result of the CSTR digital twin. The top graph shows the concentration of reactant A (Ca) over time, the middle graph shows the temperature of the reactor (T) and the set point (setpoint), and the bottom graph shows the temperature of the cooling jacket (u).

cstr_model.py explained

This script simulates a Continuous Stirred-Tank Reactor (CSTR) and processes incoming control signals from a PID controller. The script contains the following parts:

Imports and App Configuration:

import faust
import numpy as np
from scipy.integrate import odeint
app = faust.App(
    'cstr_model',
    broker='kafka://localhost:9092',
    store='memory://',
    value_serializer='json',
    web_port=6066
)
cstr_topic = app.topic('cstr')
pid_control_topic = app.topic('pid_control')

Two Kafka topics are defined: cstr (for sending CSTR state) and pid_control (for receiving control signals).

Initial Values and Constants:

# process_count and max_iterations limit the number of times values are produced and consumed from the respective topics.
process_count = 0
max_iterations = 300
# Tf is the temperature of the feed. This is assumed to be constant.
Tf = 350
# Caf is the concentration of reactant A in the feed. This is assumed to be constant.
Caf = 1
# ts represents the time duration between points. It's assumed that all time steps are regular here.
ts = [0, 0.03333]
# initial_Ca and initial_T are the initial values of the concentration of reactant A and the temperature inside the reactor, respectively.
initial_Ca = 0.87725294608097
initial_T = 324.475443431599

CSTR Model Function:

Defines the differential equations that model the CSTR.

def cstr_model_func(x, t, u, Tf, Caf):
    Ca, T = x
    q = 100        # volumetric flow rate
    V = 100        # reactor volume
    rho = 1000     # density
    Cp = 0.239     # heat capacity
    mdelH = 5e4    # (negative) heat of reaction
    EoverR = 8750  # activation energy divided by the gas constant
    k0 = 7.2e10    # Arrhenius pre-exponential factor
    UA = 5e4       # overall heat transfer coefficient times area
    # Arrhenius reaction rate
    rA = k0 * np.exp(-EoverR / T) * Ca
    # Mole balance on reactant A and energy balance on the reactor
    dCadt = q / V * (Caf - Ca) - rA
    dTdt = q / V * (Tf - T) + mdelH / (rho * Cp) * rA + UA / V / rho / Cp * (u - T)
    return [dCadt, dTdt]
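
For reference, the two equations implemented above are the standard mole balance on reactant A and the energy balance on the reactor with an Arrhenius rate law (mdelH in the code plays the role of the negative heat of reaction, -ΔHr):

\frac{dC_A}{dt} = \frac{q}{V}\left(C_{Af} - C_A\right) - k_0 e^{-E/(RT)} C_A

\frac{dT}{dt} = \frac{q}{V}\left(T_f - T\right) + \frac{-\Delta H_r}{\rho C_p} k_0 e^{-E/(RT)} C_A + \frac{UA}{V \rho C_p}\left(u - T\right)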

Simulation Function:

This function takes the u value consumed from the pid_control topic and uses odeint to integrate the differential equations, simulating the CSTR over the next time step. The resulting new_Ca and new_T values are sent to the cstr topic so that the PID controller can evaluate the next u value.

def simulate_cstr(Ca, T, ts, u, Tf, Caf):
    x0 = [Ca, T]
    y = odeint(cstr_model_func, x0, ts, args=(u, Tf, Caf))
    new_Ca = y[-1][0]
    new_T = y[-1][1]
    return new_Ca, new_T
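
As a quick sanity check, you can call simulate_cstr directly with the initial conditions defined above and an arbitrary cooling jacket temperature (u = 300 below is just a hypothetical input value, not something from the project):

# Standalone check: integrate one time step ahead of the initial state (u = 300 is hypothetical)
new_Ca, new_T = simulate_cstr(initial_Ca, initial_T, ts, 300, Tf, Caf)
print(f"Ca: {new_Ca}, T: {new_T}")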

Defining Faust Agents:

Next, we define the agents that listen for values from the topics and send the new_Ca and new_T values, as well as our initial values:

@app.agent(cstr_topic)
async def cstr(cstr):
    global process_count
    async for Ca_T_values in cstr:
        Ca = Ca_T_values.get('Ca')
        T = Ca_T_values.get('T')
        print(f"[model] cstr func")
        print(f"[model] Received Ca cstr: {Ca}, T: {T}")

@app.agent(pid_control_topic)
async def consume_u(events):
    global process_count
    print("[model] Starting PID control loop")
    initial_values = {
        'Ca': initial_Ca,
        'T': initial_T
    }
    print(f"[model] Sending initial values: Ca: {initial_Ca}, T: {initial_T}")
    await cstr_topic.send(value=initial_values)

Finally, we process incoming events from the pid_control Kafka topic to get the new value for u (temperature of the cooling jacket), solve the ODE’s to get the next Ca and T values, and send the results to the cstr Kafka topic.

    async for event in events:
        print(f"[model] Received event: {event}")
        u = event.get('u')
        Ca = event.get('Ca')
        T = event.get('T')
        print(f"[model] Into simulate_cstr u: {u}, Into simulate_cstr Ca: {Ca}, Into simulate_cstr T: {T}")
        if u is not None and Ca is not None and T is not None:
            new_Ca, new_T = simulate_cstr(Ca, T, ts, u, Tf, Caf)
            new_values = {
                'Ca': new_Ca,
                'T': new_T,
            }
            print(f"[model] consume sent")
            print(f"[model] Received u: {u}, Computed new Ca: {new_Ca}, new T: {new_T}")
            await cstr_topic.send(value=new_values)
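
Note that the excerpts above declare process_count and max_iterations, but the stopping check isn't shown. A minimal sketch of how the loop could terminate, assuming the counter is incremented once per processed event (the actual repo may handle this differently):

        # Hypothetical stopping logic at the end of the async for loop body
        process_count += 1
        if process_count >= max_iterations:
            print(f"[model] Reached {max_iterations} iterations, stopping")
            break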

pid_controller.py explained

From a Kafka perspective, this script largely works the same as the cstr_model.py script, except that it listens for values from the cstr topic and calculates the u value, which it sends to the pid_control topic.

The only real difference is the logic for the PID controller which is provided by this function:

def pid_control(T_ss, u_ss, ts, Tf, Caf, Ca, T, sp, ie_previous):
    """Compute the u value based on PID control."""
    delta_t = ts[1] - ts[0]
    # Error between the setpoint and the current reactor temperature
    e = sp - T
    # Accumulate the integral of error after the first iteration
    if process_count >= 1:
        ie = ie_previous + e * delta_t
    else:
        ie = 0.0
    # Proportional and integral contributions (Kc and tauI are tuning constants defined elsewhere in the script)
    P = Kc * e
    I = Kc / tauI * ie
    print(f"[pid] delta_t: {delta_t}, e: {e}, ie_previous: {ie_previous}, ie: {ie}")  # Debugging print
    op = u_ss + P + I
    # Upper and lower limits on OP, with anti-reset windup on the integral term
    op_hi = 350.0
    op_lo = 250.0
    if op > op_hi:
        op = op_hi
        ie = ie - e * delta_t
    if op < op_lo:
        op = op_lo
        ie = ie - e * delta_t
    u = op
    return u, ie

This function returns the cooling jacket temperature (u) as well as the integral of error (ie) term. The integral of error is a key component of a PID controller: it drives the integral control action in the PID algorithm, which helps eliminate steady-state error and improve system accuracy over time.
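
Although the full agent isn't reproduced here, the PID side mirrors the model side: an agent subscribes to the cstr topic, calls pid_control, and publishes the result to the pid_control topic. A minimal sketch of what that agent might look like (the names Kc, tauI, u_ss, T_ss, setpoint, and the ie bookkeeping are assumptions; see pid_controller.py in the repo for the actual details):

@app.agent(cstr_topic)
async def consume_cstr(events):
    global process_count, ie
    async for event in events:
        Ca = event.get('Ca')
        T = event.get('T')
        if Ca is None or T is None:
            continue
        # Compute the next cooling jacket temperature and the updated integral of error
        u, ie = pid_control(T_ss, u_ss, ts, Tf, Caf, Ca, T, setpoint, ie)
        process_count += 1
        # Send everything Telegraf expects on the pid_control topic (Ca, T, u, setpoint, ie)
        await pid_control_topic.send(value={'Ca': Ca, 'T': T, 'u': u, 'setpoint': setpoint, 'ie': ie})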

Writing the data to InfluxDB v3 with Telegraf

The Kafka Consumer Input Plugin is used to get data from the correct topics and send it to InfluxDB. The configuration file looks like this:

[agent]
  interval = "10s"
  round_interval = true
  debug = true
[[inputs.kafka_consumer]]
  brokers = ["kafka:9092"]
  topics = ["pid_control"]
  data_format = "json_v2"
  [[inputs.kafka_consumer.json_v2]]
    measurement_name = "CSTR_data"
    [[inputs.kafka_consumer.json_v2.field]]
      path = "Ca"
      type = "float"
    [[inputs.kafka_consumer.json_v2.field]]
      path = "T"
      type = "float"
    [[inputs.kafka_consumer.json_v2.field]]
      path = "u"
      type = "float"
    [[inputs.kafka_consumer.json_v2.field]]
      path = "setpoint"
      type = "float"
    [[inputs.kafka_consumer.json_v2.field]]
      path = "ie"
      type = "float"
[[outputs.file]]
  files = ["stdout"]
[[outputs.influxdb_v2]]
  urls = ["https://us-east-1-1.aws.cloud2.influxdata.com/"]
  token = "xxx"
  organization = "xxx"
  bucket = "CSTR"

Essentially, we’re just subscribing to the pid_control topic and gathering all of the calculated values written there.

Monitoring a chemical reactor with Grafana

Now that our Digital Twin is writing reactor temperature, concentration, cooling jacket temperature, and setpoint data to InfluxDB, we can use Grafana to build a dashboard and alerts to monitor our CSTR. You can use the Grafana InfluxDB v3 Data Source to make visualizations like this:

A dashboard of the CSTR and cooling jacket data in Grafana.

The bottom right graph of the setpoint and reactor temperature shows that the cooling jacket and controller successfully keep the reactor temperature close to, and eventually at, the setpoints.

Why use Kafka?

In my current project, I’ve chosen Kafka as the backbone for real-time data streaming and processing between the CSTR and PID controller. Despite only utilizing a single partition per topic (cstr and pid_control), Kafka offers a highly reliable and scalable architecture that can grow with the project’s needs. Kafka’s robust message broker capabilities ensure that real-time data flows seamlessly from the CSTR to the PID controller, facilitating continuous monitoring and adjustment of the reactor conditions.

One of the primary reasons for selecting Kafka is its fault tolerance and durability, which guarantees that no data is lost during the streaming process. This reliability is crucial for maintaining optimal reactor conditions and ensuring the integrity of the process. Additionally, Kafka’s ability to handle high-throughput, low-latency data streams makes it ideal for real-time applications like this, where timely data processing is essential.

Looking ahead, if the project needs to scale, Kafka’s architecture allows for easy partitioning of topics. By increasing the number of partitions, we can distribute the data load across multiple consumers, enhancing the system’s throughput and fault tolerance. This scalability means that as the complexity of the process grows—perhaps by adding more CSTRs or PID controllers—the Kafka-based infrastructure can seamlessly adapt to handle the increased data volume and processing demands.

Pros and cons of Faust

Faust is a stream processing library that offers significant advantages for developers working with real-time data. I chose to use it because its Pythonic API makes it particularly appealing. I didn’t have to use Java or Scala, which are typically used with Kafka Streams. Additionally, Faust’s integration with Kafka enables the creation of robust, scalable, and fault-tolerant data pipelines that can handle high-throughput data streams efficiently. Faust also supports asynchronous programming, which enhances its capability to process data concurrently, making it suitable for applications requiring real-time analytics, monitoring, and event-driven processing. Faust focuses on simplicity and ease of use, which enables rapid development and deployment of streaming applications.

However, Faust also has some limitations. One of the main drawbacks is that Faust is less efficient than its Java or Scala counterparts, such as Kafka Streams, in environments requiring high concurrency. Additionally, Faust is entirely community supported, resulting in less extensive documentation, fewer community resources, and potentially slower development of new features and bug fixes.

Considering other data processing tools

While I chose Faust because it is easy to use, several other well-known data processing tools, including Spark, Kafka Streams, Bytewax, and Quix.io, are also worth considering.

  • Kafka Streams, written in Java and Scala, is highly optimized for performance and offers tight integration with Kafka, making it a robust choice for enterprise-level streaming applications but at the cost of requiring expertise in Java/Scala.
  • Apache Spark, although primarily known for batch processing, also supports stream processing through Structured Streaming and provides a powerful, unified engine that can handle both batch and stream processing with high performance and scalability.
  • Bytewax, similar to Faust, is designed for Python developers and leverages timely dataflow for real-time analytics, offering a more modern and Python-native approach to stream processing.
  • Quix.io stands out with its focus on high-frequency data streams, providing a comprehensive platform that includes data ingestion, processing, and visualization tools. It also contains quick start templates and plugins for easy integration with InfluxDB.

Each tool has its own strengths and trade-offs, so the choice is dependent on specific project requirements, existing infrastructure, and the team’s expertise.

Run the example

To run the example yourself, follow these steps:

  1. Create an InfluxDB Cloud v3 account here
  2. Create a bucket called “CSTR”
  3. Create a token
  4. Edit the telegraf.conf to include your InfluxDB credentials
  5. Run docker-compose up -d

Now, you should be able to visualize your CSTR data in InfluxDB. In the screenshot below, we use SQL to query the Ca and T data.
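
If you'd rather pull the data programmatically, a minimal sketch with the influxdb3-python client might look like the following (the host, token, and org values are placeholders; adjust them for your account):

from influxdb_client_3 import InfluxDBClient3

# Placeholders: replace host, token, and org with your InfluxDB Cloud v3 credentials
client = InfluxDBClient3(
    host="us-east-1-1.aws.cloud2.influxdata.com",
    token="xxx",
    org="xxx",
    database="CSTR",
)

# Query the Ca and T fields written by Telegraf into the CSTR_data measurement
query = """
SELECT time, "Ca", "T"
FROM "CSTR_data"
WHERE time >= now() - interval '1 hour'
"""
table = client.query(query=query, language="sql")
print(table.to_pandas())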

Future

This digital twin models a perfect CSTR and makes a lot of assumptions to do so, including:

  • Perfect mixing: The contents of the reactor are perfectly mixed, meaning that the concentration and temperature are uniform throughout the reactor.
  • Constant volume of the reactor.
  • Steady-state flow rates: The feed and concentration rates are constant.
  • First-order chemical reaction kinetics.
  • Instantaneous heat transfer between the reactor and the cooling jacket.
  • Plus, many more conditions I won’t list.

These assumptions allow us to create ODEs that we can actually solve. Unfortunately, in the real world, they are not always valid. Specifically, the simulate_cstr function in cstr_model.py uses the received control input u and the current values of Ca and T to predict the next values. Then, odeint integrates the differential equations over a small time step to compute the new concentration and temperature. In a real-world system, we might not have the luxury of applying those simplifying assumptions and easily predicting the next value with a series of ODEs. In that case, other forecasting models can be used to predict the next value.

In an upcoming blog post, I’ll highlight how to leverage some forecasting models to predict Ca and T values and incorporate those forecasts to create a better model for the CSTR and PID controller.

Conclusion

This blog post describes how we make a digital twin of a CSTR using Kafka and Faust as a PID controller while storing temperature data in InfluxDB. We use Telegraf to get data from the Kafka topic and Grafana to visualize the results. As always, get started with InfluxDB v3 Cloud here. In the next post, we'll cover how to leverage forecasting models to predict Ca and T values and build an even better model of the CSTR and PID controller. If you need help, please contact us on our community site or Slack channel. If you are also working on a data processing project with InfluxDB, I'd love to hear from you!