Tutorial: How to build an InfluxDB query using Flux?

Frederik Van Leeckwyck

An advanced use case for the new Flux query language

Process historians are used to gather high-resolution sensor data from industrial controllers like PLCs and SCADA systems. Once the data is collected, it is available for further processing and query building. This blog post describes the use of Flux, a new language for querying InfluxDB, the time series database behind Factry Historian.

It is recommended for advanced readers with profound technical knowledge.

Ever since it was first announced in June 2018, we have been eagerly following the development of the Flux query language. We started experimenting with Flux’s capabilities pretty early on and are now confident that it has matured enough for it to be really useful in a production or manufacturing context.

However, we think it can be difficult to translate the theoretical documentation into an approach that solves a real-life challenge.

In this blog post, we will take a stepwise approach to building the necessary queries in Flux-InfluxDB, while displaying the intermediate results. Our use case is situated in the manufacturing industry. We hope this tutorial can help the community to form a better idea of what is possible and how it can be achieved.

So, what about that challenge? Read on!

The case: how to query InfluxDB using Flux

Imagine you’re a manufacturer of sweet delicious candy. The marketing department has spotted a new opportunity for a new type of candy: a bubble gum core coated with fancy colors and wacky flavours. After launching a marketing campaign, demand has soared!

Now it’s up to you to satisfy that demand without buying more machines. Thanks a lot, marketing dep… but how are we going to do all that? Luckily, the bubble gum candy is actually a pretty simple product: it’s made of a core of bubble gum - 3 different tastes are possible - and a unique flavor coating. After flavoring, the bubble gum candy is immediately packaged on the box filler and ready for shipping.

Factory layout

Our factory layout looks like this: we own 3 bubble gum machines that are each built to make one type of bubble gum. The bubble gums they produce are then transported over a product router, so that each coater and box filler can draw bubble gum from a specific bubble gum machine. Think of all the options!

Process diagram of our bubble gum factory, showing bubble gum producers, product router, coaters and packaging machines. Icons made by [xnimrodx](https://www.flaticon.com/authors/xnimrodx), [freepik](https://www.flaticon.com/authors/freepik) and [Icongeek26](https://www.flaticon.com/authors/icongeek26) from https://www.flaticon.com.

The problem

To smooth out production speeds and run our production plant optimally, we need to constantly balance the output speed of our 3 bubble gum machines with the processing speed of our 11 flavoring and packaging stations, depending on which flavoring machine draws product from which bubble gum machine.

If we run our bubble gum machines too quickly, we will create an overload of bubble gum cores because our flavoring machines can’t process them quickly enough. If we run them too slowly, however, we underutilize our capacity, which results in frequent starts and stops of our flavoring machines and hurts productivity. Keeping supply and demand matched in this way is what we call a mass balance.

The expected outcome

Keeping the mass balance between supply (the bubble gum machines) and demand (the flavoring machines and packaging stations) is key to keeping our production manager happy. To help her, we will calculate the total kg / h that the flavoring machines draw from each bubble gum machine. She can then use this output to take appropriate action if needed, for example by changing the speed of the bubble gum machines. Flux to the rescue!

The dataset

To make this possible, we need the following data from Factry Historian:

  • Mass flow for all producers (3 in this case). These measurements are named BGX_FLOW_KG/H_ACT, with X = {1, 2, 3} and are stored as kilograms per hour (kg/h).
  • Mass flow for all consumers (11 in this case). This data is not directly available, but we can compute it by multiplying the number of boxes per minute by the mass of each box of bubble gum (in g). Both values are available as machine setpoints. The measurements are named CYY_BM_SPEED_BPM_SP and CYY_BM_MW_WEIGHT_TARGET_G_SP (with YY = {01 - 11}).
  • The routing map (3 * 11 = 33 in this case). These booleans indicate which consumer (coater and box filler) is drawing product from which producer (bubble gum maker). These are named CYY_ROUTING_BGX. For example, a measurement C03_ROUTING_BG1 with field value_num = 1.0 would mean that coater/packager 03 is drawing bubble gums from bubble gum maker 1.

This data is being gathered from the machine PLCs e.g. via OPC-UA, with InfluxDB as time-series database.
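The consumer mass flow calculation described in the list above can be sketched on a single in-memory row. This is only an illustration: the setpoint values and the column names machine, bpm_sp and weight_g_sp are made up, and it assumes a Flux version that ships the array package.

```flux
import "array"

// Hypothetical setpoints for one coater/packager (not taken from the dataset)
array.from(rows: [{machine: "C03", bpm_sp: 30.0, weight_g_sp: 200.0}])
  |> map(fn: (r) => ({
    machine: r.machine,
    // boxes/min * g/box = g/min; * 60 = g/h; / 1000 = kg/h
    _value: r.bpm_sp * r.weight_g_sp * 60.0 / 1000.0
  }))
```

With these made-up numbers, 30 boxes per minute at 200 g per box corresponds to 360 kg/h.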

Writing the Flux queries

We typically write our Flux queries in Chronograf because it ties in pretty well with Flux’s capabilities. Once the queries are finished, we switch to Grafana to build the dashboard that is used in production.

For this example, we will focus on getting a view on which coaters/packagers are drawing product from the second bubble gum machine.

1. Obtaining the downstream consumers for a specific producer

We’ll start by obtaining a view on the downstream consumers i.e. the coaters/packagers that are drawing product from producer 2. To do so, we’ll retrieve the last 30 days of data and filter on the routing measurements. We’re only interested in the current routing, so our query becomes:

from(bucket: "historian")
  |> range(start:-30d)
  |> filter(fn: (r) => r._measurement =~ /C[0-9]+_ROUTING_BG2/ and r._field == "value_num" and r.status == "Good")
  |> last()

The resulting dataset looks like this:

|#datatype|string |long |dateTime:RFC3339             |dateTime:RFC3339             |dateTime:RFC3339        |double|string   |string         |string|
|---------|-------|-----|-----------------------------|-----------------------------|------------------------|------|---------|---------------|------|
|#group   |false  |false|true                         |true                         |false                   |false |true     |true           |true  |
|#default |_result|     |                             |                             |                        |      |         |               |      |
|         |result |table|_start                       |_stop                        |_time                   |_value|_field   |_measurement   |status|
|         |       |0    |2019-08-21T15:29:51.26712694Z|2019-09-20T15:29:51.26712694Z|2019-09-14T14:27:55.38Z |0     |value_num|C01_ROUTING_BG2|Good  |
|         |       |1    |2019-08-21T15:29:51.26712694Z|2019-09-20T15:29:51.26712694Z|2019-09-20T00:38:24.41Z |1     |value_num|C02_ROUTING_BG2|Good  |
|         |       |2    |2019-08-21T15:29:51.26712694Z|2019-09-20T15:29:51.26712694Z|2019-09-19T18:45:40.008Z|0     |value_num|C03_ROUTING_BG2|Good  |
|         |       |3    |2019-08-21T15:29:51.26712694Z|2019-09-20T15:29:51.26712694Z|2019-09-17T15:31:27.902Z|0     |value_num|C04_ROUTING_BG2|Good  |
|         |       |4    |2019-08-21T15:29:51.26712694Z|2019-09-20T15:29:51.26712694Z|2019-09-17T15:14:44.817Z|0     |value_num|C05_ROUTING_BG2|Good  |
|         |       |5    |2019-08-21T15:29:51.26712694Z|2019-09-20T15:29:51.26712694Z|2019-09-15T16:40:50.605Z|1     |value_num|C06_ROUTING_BG2|Good  |
|         |       |6    |2019-08-21T15:29:51.26712694Z|2019-09-20T15:29:51.26712694Z|2019-09-17T17:34:16.932Z|1     |value_num|C07_ROUTING_BG2|Good  |
|         |       |7    |2019-08-21T15:29:51.26712694Z|2019-09-20T15:29:51.26712694Z|2019-09-14T14:27:55.38Z |0     |value_num|C08_ROUTING_BG2|Good  |
|         |       |8    |2019-08-21T15:29:51.26712694Z|2019-09-20T15:29:51.26712694Z|2019-09-14T14:27:55.38Z |0     |value_num|C09_ROUTING_BG2|Good  |
|         |       |9    |2019-08-21T15:29:51.26712694Z|2019-09-20T15:29:51.26712694Z|2019-09-15T16:44:09.646Z|1     |value_num|C10_ROUTING_BG2|Good  |
|         |       |10   |2019-08-21T15:29:51.26712694Z|2019-09-20T15:29:51.26712694Z|2019-09-15T16:44:01.614Z|1     |value_num|C11_ROUTING_BG2|Good  |
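The query returns one row per routing measurement because from() delivers each series as its own table and last() is applied to every table separately. A minimal sketch of that behaviour on in-memory rows, assuming a Flux version that ships the array package. The rows are made up for illustration, and group() stands in here for the per-series grouping that from() provides out of the box:

```flux
import "array"

// Made-up routing history: two points for each of two consumers, in chronological order
array.from(rows: [
    {_time: 2019-09-14T14:27:55Z, _measurement: "C01_ROUTING_BG2", _value: 1.0},
    {_time: 2019-09-17T10:00:00Z, _measurement: "C01_ROUTING_BG2", _value: 0.0},
    {_time: 2019-09-15T16:40:50Z, _measurement: "C06_ROUTING_BG2", _value: 0.0},
    {_time: 2019-09-18T09:00:00Z, _measurement: "C06_ROUTING_BG2", _value: 1.0}
])
  // One table per measurement, as from() would return them
  |> group(columns: ["_measurement"])
  // Keep only the last row of each table
  |> last()
```

Only the final row of each measurement should remain, so each consumer is represented by its current routing state.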

We are only interested in getting the product flow from machines that are actively drawing product from the upstream producer. So, we add the next operation, namely a filter() statement that keeps only the values equal to 1. Because Flux only allows comparing values of the same type and value_num is stored as a double, we check for equality to 1.0.

Note the range starting at -30d. We can reasonably expect consumers to change which producer they draw product from at least once a month. This rather large range does not have a negative impact on the query speed because the data is very sparse as you can see from the _time column.

from(bucket: "historian")
  |> range(start:-30d)
  |> filter(fn: (r) => r._measurement =~ /C[0-9]+_ROUTING_BG2/ and r._field == "value_num" and r.status == "Good")
  |> last()
  |> filter(fn: (r) => r._value == 1.0)

And the result now looks like this:

|#datatype|string |long |dateTime:RFC3339              |dateTime:RFC3339              |dateTime:RFC3339        |double|string   |string         |string|
|---------|-------|-----|------------------------------|------------------------------|------------------------|------|---------|---------------|------|
|#group   |false  |false|false                         |false                         |false                   |false |false    |true           |false |
|#default |_result|     |                              |                              |                        |      |         |               |      |
|         |result |table|_start                        |_stop                         |_time                   |_value|_field   |_measurement   |status|
|         |       |0    |2019-08-21T15:39:39.040855914Z|2019-09-20T15:39:39.040855914Z|2019-09-20T00:38:24.41Z |1     |value_num|C02_ROUTING_BG2|Good  |
|         |       |1    |2019-08-21T15:39:39.040855914Z|2019-09-20T15:39:39.040855914Z|2019-09-15T16:40:50.605Z|1     |value_num|C06_ROUTING_BG2|Good  |
|         |       |2    |2019-08-21T15:39:39.040855914Z|2019-09-20T15:39:39.040855914Z|2019-09-17T17:34:16.932Z|1     |value_num|C07_ROUTING_BG2|Good  |
|         |       |3    |2019-08-21T15:39:39.040855914Z|2019-09-20T15:39:39.040855914Z|2019-09-15T16:44:09.646Z|1     |value_num|C10_ROUTING_BG2|Good  |
|         |       |4    |2019-08-21T15:39:39.040855914Z|2019-09-20T15:39:39.040855914Z|2019-09-15T16:44:01.614Z|1     |value_num|C11_ROUTING_BG2|Good  |

As you can see from the table above, we end up with the typical case in industrial settings where we have really sparse data. If you look at the _time column, you can see that the value_num routing bits were written to InfluxDB over a span of 5 days. This differs from many of the DevOps use cases we encounter regularly, such as memory and CPU usage being written e.g. every 10 seconds.

Before we can proceed to request the throughput from the consumers, we need to prepare our data a little more. We will need to join our data later on, but as you can expect from the sparse data, we will not be joining on _time. We will join on the _measurement column or, more precisely, on multiple _measurement columns. To achieve that, we will import the strings package and use its substring function to take the first 3 characters of each routing measurement (the machine prefix, e.g. C02) and prepend them to the names of the other measurements. Our query now becomes:

import "strings"

from(bucket: "historian")
  |> range(start:-30d)
  |> filter(fn: (r) => r._measurement =~ /C[0-9]+_ROUTING_BG2/ and r._field == "value_num" and r.status == "Good")
  |> last()
  |> filter(fn: (r) => r._value == 1.0)
  |> map(fn: (r) => ({
    _measurement_targetgsp: strings.substring(v: r._measurement, start: 0, end: 3) + "_BM_MW_WEIGHT_TARGET_G_SP",
    _measurement_targetbpmsp: strings.substring(v: r._measurement, start: 0, end: 3) + "_BM_SPEED_BPM_SP",
    _value: r._value
  }))

Now, our output looks like this:

|#datatype|string |long |string                  |string                      |double|
|---------|-------|-----|------------------------|----------------------------|------|
|#group   |false  |false|false                   |true                        |false |
|#default |_result|     |                        |                            |      |
|         |result |table|_measurement_targetbpmsp|_measurement_targetgsp      |_value|
|         |       |0    |C02_BM_SPEED_BPM_SP     |C02_BM_MW_WEIGHT_TARGET_G_SP|1     |
|         |       |1    |C06_BM_SPEED_BPM_SP     |C06_BM_MW_WEIGHT_TARGET_G_SP|1     |
|         |       |2    |C07_BM_SPEED_BPM_SP     |C07_BM_MW_WEIGHT_TARGET_G_SP|1     |
|         |       |3    |C10_BM_SPEED_BPM_SP     |C10_BM_MW_WEIGHT_TARGET_G_SP|1     |
|         |       |4    |C11_BM_SPEED_BPM_SP     |C11_BM_MW_WEIGHT_TARGET_G_SP|1     |

We find that coaters/packagers 2, 6, 7, 10 and 11 are drawing from our producer, bubble gum machine 2.
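To see the prefix extraction in isolation, here is a small sketch on in-memory rows. It assumes a Flux version that ships the array package, and the output column names machine and weight_measurement are hypothetical. Note that start is inclusive and end is exclusive, so start: 0, end: 3 yields the first three characters:

```flux
import "array"
import "strings"

array.from(rows: [{_measurement: "C02_ROUTING_BG2"}, {_measurement: "C10_ROUTING_BG2"}])
  |> map(fn: (r) => ({
    // "C02_ROUTING_BG2" -> "C02"
    machine: strings.substring(v: r._measurement, start: 0, end: 3),
    // "C02" + suffix -> the name of the matching weight setpoint measurement
    weight_measurement: strings.substring(v: r._measurement, start: 0, end: 3) + "_BM_MW_WEIGHT_TARGET_G_SP"
  }))
```

This approach relies on every machine prefix being exactly three characters long, which holds for the C01 to C11 naming used here.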

2. Obtaining the consumer’s throughput

The consumer’s output should be presented in kg/h because the producers are also measured in kg/h. As mentioned above, we need to multiply two measurements to achieve this, namely the boxes per minute (BPM) and the weight per box (G_SP, set point in g).

Let’s start with retrieving the weight setpoint in grams per box. To prepare the data for joining later, we’ll rename the _measurement column to _measurement_targetgsp.

MACHINES_GSP = from(bucket: "historian")
  |> range(start:-30d)
  |> filter(fn: (r) => r._measurement =~ /C[0-9]+_BM_MW_WEIGHT_TARGET_G_SP/ and r._field == "value" and r.status == "Good")
  |> last()
  |> rename(columns: {_measurement: "_measurement_targetgsp"})
  |> yield()

The resulting output looks like this:

| #datatype | string  | long  | dateTime:RFC3339     | dateTime:RFC3339     | dateTime:RFC3339     | double | string | string                       | string |
|-----------|---------|-------|----------------------|----------------------|----------------------|--------|--------|------------------------------|--------|
| #group    | false   | false | true                 | true                 | false                | false  | true   | true                         | true   |
| #default  | _result |       |                      |                      |                      |        |        |                              |        |
|           | result  | table | _start               | _stop                | _time                | _value | _field | _measurement_targetgsp       | status |
|           |         | 0     | 2019-08-21T15:29:51Z | 2019-09-20T15:29:51Z | 2019-09-20T15:29:00Z | 362    | value  | C01_BM_MW_WEIGHT_TARGET_G_SP | Good   |
|           |         | 1     | 2019-08-21T15:29:51Z | 2019-09-20T15:29:51Z | 2019-09-20T15:29:00Z | 154    | value  | C02_BM_MW_WEIGHT_TARGET_G_SP | Good   |
|           |         | 2     | 2019-08-21T15:29:51Z | 2019-09-20T15:29:51Z | 2019-09-20T15:29:00Z | 755    | value  | C03_BM_MW_WEIGHT_TARGET_G_SP | Good   |
|           |         | 3     | 2019-08-21T15:29:51Z | 2019-09-20T15:29:51Z | 2019-09-20T15:29:00Z | 433    | value  | C04_BM_MW_WEIGHT_TARGET_G_SP | Good   |
|           |         | 4     | 2019-08-21T15:29:51Z | 2019-09-20T15:29:51Z | 2019-09-20T15:29:00Z | 601    | value  | C05_BM_MW_WEIGHT_TARGET_G_SP | Good   |
|           |         | 5     | 2019-08-21T15:29:51Z | 2019-09-20T15:29:51Z | 2019-09-20T15:29:00Z | 723    | value  | C06_BM_MW_WEIGHT_TARGET_G_SP | Good   |
|           |         | 6     | 2019-08-21T15:29:51Z | 2019-09-20T15:29:51Z | 2019-09-20T15:29:00Z | 234    | value  | C07_BM_MW_WEIGHT_TARGET_G_SP | Good   |
|           |         | 7     | 2019-08-21T15:29:51Z | 2019-09-20T15:29:51Z | 2019-09-20T15:29:00Z | 352    | value  | C10_BM_MW_WEIGHT_TARGET_G_SP | Good   |
|           |         | 8     | 2019-08-21T15:29:51Z | 2019-09-20T15:29:51Z | 2019-09-20T15:29:00Z | 112    | value  | C11_BM_MW_WEIGHT_TARGET_G_SP | Good   |

Now, let’s also retrieve the boxes per minute as follows:

MACHINES_BPMSP = from(bucket: "historian")
  |> range(start:-30d)
  |> filter(fn: (r) => r._measurement =~ /C[0-9]+_BM_SPEED_BPM_SP/ and r._field == "value" and r.status == "Good")
  |> last()
  |> rename(columns: {_measurement: "_measurement_targetbpmsp", _value: "_bpm_sp"})
  |> yield()

And our data looks like this:

| #datatype | string  | long  | dateTime:RFC3339     | dateTime:RFC3339     | dateTime:RFC3339     | double  | string | string                   | string |
|-----------|---------|-------|----------------------|----------------------|----------------------|---------|--------|--------------------------|--------|
| #group    | false   | false | true                 | true                 | false                | false   | true   | true                     | true   |
| #default  | _result |       |                      |                      |                      |         |        |                          |        |
|           | result  | table | _start               | _stop                | _time                | _bpm_sp | _field | _measurement_targetbpmsp | status |
|           |         | 0     | 2019-08-21T15:29:51Z | 2019-09-20T15:29:51Z | 2019-09-20T15:29:00Z | 15      | value  | C01_BM_SPEED_BPM_SP      | Good   |
|           |         | 1     | 2019-08-21T15:29:51Z | 2019-09-20T15:29:51Z | 2019-09-20T15:29:00Z | 50      | value  | C02_BM_SPEED_BPM_SP      | Good   |
|           |         | 2     | 2019-08-21T15:29:51Z | 2019-09-20T15:29:51Z | 2019-09-20T15:29:00Z | 50      | value  | C03_BM_SPEED_BPM_SP      | Good   |
|           |         | 3     | 2019-08-21T15:29:51Z | 2019-09-20T15:29:51Z | 2019-09-20T15:29:00Z | 48      | value  | C04_BM_SPEED_BPM_SP      | Good   |
|           |         | 4     | 2019-08-21T15:29:51Z | 2019-09-20T15:29:51Z | 2019-09-20T15:29:00Z | 40      | value  | C05_BM_SPEED_BPM_SP      | Good   |
|           |         | 5     | 2019-08-21T15:29:51Z | 2019-09-20T15:29:51Z | 2019-09-20T15:29:00Z | 20      | value  | C06_BM_SPEED_BPM_SP      | Good   |
|           |         | 6     | 2019-08-21T15:29:51Z | 2019-09-20T15:29:51Z | 2019-09-20T15:29:00Z | 40      | value  | C07_BM_SPEED_BPM_SP      | Good   |
|           |         | 7     | 2019-08-21T15:29:51Z | 2019-09-20T15:29:51Z | 2019-09-20T15:29:00Z | 50      | value  | C10_BM_SPEED_BPM_SP      | Good   |
|           |         | 8     | 2019-08-21T15:29:51Z | 2019-09-20T15:29:51Z | 2019-09-20T15:29:00Z | 50      | value  | C11_BM_SPEED_BPM_SP      | Good   |

So now that we have retrieved the routing table and the flow of product, let’s combine all of this to retrieve the total product draw from each bubble gum machine.

import "strings"

BG = from(bucket: "historian")
  |> range(start:-30d)
  |> filter(fn: (r) => r._measurement =~ /C[0-9]+_ROUTING_BG2/ and r._field == "value_num" and r.status == "Good")
  |> last()
  |> filter(fn: (r) => r._value == 1.0)
  |> map(fn: (r) => ({
    _measurement_targetgsp: strings.substring(v: r._measurement, start: 0, end: 3) + "_BM_MW_WEIGHT_TARGET_G_SP",
    _measurement_targetbpmsp: strings.substring(v: r._measurement, start: 0, end: 3) + "_BM_SPEED_BPM_SP",
    _value: r._value
  }))

MACHINES_GSP = from(bucket: "historian")
  |> range(start:-30d)
  |> filter(fn: (r) => r._measurement =~ /C[0-9]+_BM_MW_WEIGHT_TARGET_G_SP/ and r._field == "value" and r.status == "Good")
  |> last()
  |> rename(columns: {_measurement: "_measurement_targetgsp"})

MACHINES_BPMSP = from(bucket: "historian")
  |> range(start:-30d)
  |> filter(fn: (r) => r._measurement =~ /C[0-9]+_BM_SPEED_BPM_SP/ and r._field == "value" and r.status == "Good")
  |> last()
  |> rename(columns: {_measurement: "_measurement_targetbpmsp", _value: "_bpm_sp"})

BG_GSP = join(tables: {g_sp: MACHINES_GSP, bg: BG}, on: ["_measurement_targetgsp"], method: "inner")
BG_FLOW = join(tables: {bggsp: BG_GSP, bpm_sp: MACHINES_BPMSP}, on: ["_measurement_targetbpmsp"], method: "inner")
  |> map(fn: (r) => ({
    // Keep the machine name for clarity & debugging
    _machine: strings.substring(v: r._measurement_targetbpmsp, start: 0, end: 3),
    // Multiply by 60 and divide by 1000 to go from g / min to kg / h
    _value: r._value_g_sp * r._bpm_sp * 60.0 / 1000.0
  }))
  |> yield()

And our data looks like this:

| #datatype | string  | long  | double    | string   |
|-----------|---------|-------|-----------|----------|
| #group    | false   | false | false     | false    |
| #default  | _result |       |           |          |
|           | result  | table | _value    | _machine |
|           |         | 0     | 154       | C02      |
|           |         | 0     | 723       | C06      |
|           |         | 0     | 234       | C07      |
|           |         | 0     | 352       | C10      |
|           |         | 0     | 112       | C11      |
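One detail in the query above deserves a note: the column r._value_g_sp is never defined explicitly. When both inputs of join() contain a column with the same name that is not part of the on list, the output carries one copy per input, suffixed with the key used in the tables record. The sketch below reproduces this on made-up in-memory rows that mimic the shape of BG and MACHINES_GSP; it assumes a Flux version that ships the array package, and the column names weight_g and routed are hypothetical.

```flux
import "array"

// Made-up rows mimicking the routing table and the weight setpoints
bg = array.from(rows: [{_measurement_targetgsp: "C02_BM_MW_WEIGHT_TARGET_G_SP", _value: 1.0}])
g_sp = array.from(rows: [{_measurement_targetgsp: "C02_BM_MW_WEIGHT_TARGET_G_SP", _value: 154.0}])

// _value exists on both sides, so the result carries _value_g_sp and _value_bg
join(tables: {g_sp: g_sp, bg: bg}, on: ["_measurement_targetgsp"], method: "inner")
  |> map(fn: (r) => ({r with weight_g: r._value_g_sp, routed: r._value_bg}))
```

The same mechanism explains why the boxes-per-minute column was renamed to _bpm_sp up front: with a unique name it passes through the second join without a suffix.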

Now we just add a final |> sum() statement before the |> yield() at the end to retrieve the final draw from bubble gum machine 2 in kg / h:

| #datatype | string  | long  | double |
|-----------|---------|-------|--------|
| #group    | false   | false | false  |
| #default  | _result |       |        |
|           | result  | table | _value |
|           |         | 0     | 1575   |

We have found our total product draw in kg / h from bubble gum machine 2. Great!
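As a quick check of the aggregation step, the per-consumer values from the intermediate table can be fed through sum() on their own. This sketch copies those values into in-memory rows and assumes a Flux version that ships the array package; the yield name total_draw is hypothetical.

```flux
import "array"

// Per-consumer values copied from the intermediate result table
array.from(rows: [
    {_machine: "C02", _value: 154.0},
    {_machine: "C06", _value: 723.0},
    {_machine: "C07", _value: 234.0},
    {_machine: "C10", _value: 352.0},
    {_machine: "C11", _value: 112.0}
])
  // sum() aggregates the _value column by default and drops _machine
  |> sum()
  |> yield(name: "total_draw")
```

The single remaining row holds 1575, matching the total above.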

3. Obtaining the producer mass flow

We will write a query that retrieves the last 30 days of data from the historian database and filters on the flow measurement of bubble gum machine 2 (for example). We are only interested in the last or current values, so the query becomes:

from(bucket: "historian")
  |> range(start:-30d)
  |> filter(fn: (r) => r._measurement == "BG2_FLOW_KG/H_ACT" and r._field == "value" and r.status == "Good")
  |> last()

This returns the following output:

| #datatype | string  | long  | dateTime:RFC3339     | dateTime:RFC3339     | dateTime:RFC3339     | double             | string | string                 | string |
|-----------|---------|-------|----------------------|----------------------|----------------------|--------------------|--------|------------------------|--------|
| #group    | false   | false | true                 | true                 | false                | false              | true   | true                   | true   |
| #default  | _result |       |                      |                      |                      |                    |        |                        |        |
|           | result  | table | _start               | _stop                | _time                | _value             | _field | _measurement           | status |
|           |         | 0     | 2019-08-21T15:29:51Z | 2019-09-20T15:29:51Z | 2019-09-20T15:29:50Z | 890.3575439453125  | value  | BG2_FLOW_KG/H_ACT      | Good   |

As simple as that! We find that bubble gum machine 2 is producing bubble gum cores at 890 kg/h.
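The same query works for the other producers by swapping the machine number. One possible way to avoid copy-pasting it is to wrap it in a function. The sketch below is an assumption on our side rather than part of the original setup: the helper name producerFlow and the yield names are hypothetical, while the bucket, measurement naming and filters are taken from the query above.

```flux
// Hypothetical helper: current mass flow (kg/h) of one bubble gum machine
producerFlow = (producer) => from(bucket: "historian")
  |> range(start: -30d)
  |> filter(fn: (r) => r._measurement == "BG" + producer + "_FLOW_KG/H_ACT" and r._field == "value" and r.status == "Good")
  |> last()

// Each result needs its own yield name when several are returned from one script
producerFlow(producer: "1") |> yield(name: "BG1")
producerFlow(producer: "2") |> yield(name: "BG2")
producerFlow(producer: "3") |> yield(name: "BG3")
```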

4. Now what?

We have found that our bubble gum machine 2 is only producing bubble gum at 890 kg / h while our coaters and packagers are set to handle 1575 kg / h. I guess we’ll have to inform our production manager to:

  • increase the speed of the bubble gum machine to match the product draw or buy a new bubble gum machine because we are not running at full capacity
  • instruct our coaters/packagers to draw product from other bubble gum machines
  • lower the setpoints of our coaters/packagers so their combined draw equals 890 kg / h
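To put a number on the gap, the two results can be compared directly. The sketch below uses the rounded figures from this post as in-memory values and assumes a Flux version that ships the array package; the column names produced, drawn, balance and coverage are hypothetical.

```flux
import "array"

// Rounded results from the queries in this post, in kg/h
array.from(rows: [{producer: "BG2", produced: 890.0, drawn: 1575.0}])
  |> map(fn: (r) => ({r with
    // Negative balance: the consumers request more than the producer supplies
    balance: r.produced - r.drawn,
    // Share of the requested draw that the producer currently covers
    coverage: r.produced / r.drawn
  }))
```

The balance comes out at -685 kg/h, so bubble gum machine 2 covers a bit more than half of what its consumers are set to draw.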

In conclusion

We have shown a real-life use case of Flux to retrieve data in InfluxDB from multiple measurements and use it to solve a real business challenge encountered in a manufacturing or production context.

We’re curious about your InfluxDB/Flux use cases as well! If you can share them or if you have any questions, please do not hesitate to reach out through frederik@factry.io
