nifi-kafka-druid-water-level-data

This demo only runs in the default namespace, as a ServiceAccount will be created. Additionally, we have to use the FQDN service names (including the namespace), so that the used TLS certificates are valid.

This demo will

  • Install the required Stackable operators.

  • Spin up the following data products:

    • Superset: A modern data exploration and visualization platform. This demo utilizes Superset to retrieve data from Druid via SQL queries and build dashboards on top of that data.

    • Kafka: A distributed event streaming platform for high-performance data pipelines, streaming analytics and data integration. This demo uses it as an event streaming platform to stream the data in near real-time.

    • NiFi: An easy-to-use, robust system to process and distribute data. This demo uses it to fetch water-level data from the internet and ingest it into Kafka.

    • Druid: A real-time database to power modern analytics applications. This demo uses it to ingest the near real-time data from Kafka, store it and enable access to the data via SQL.

    • MinIO: An S3-compatible object store. This demo uses it as persistent storage for Druid to store all the data.

  • Ingest water level data from the PEGELONLINE web service into Kafka. The data contains measured water levels of different measuring stations all around Germany. If the web service is unavailable, this demo will not work, as it needs the web service to ingest the data.

    • First, historical data from the last 30 days will be fetched and ingested.

    • Afterwards, the demo will fetch the current measurement of every station approximately every two minutes and ingest it near-real-time into Kafka.

  • Start a Druid ingestion job that ingests the data into the Druid instance.

  • Create Superset dashboards for visualization of the data.

The whole data pipeline will have a very low latency, from putting a record into Kafka to showing up in the dashboard charts. You can see the deployed products and their relationship in the following diagram:

overview

System requirements

To run this demo, your system needs at least:

  • 9 CPU units (core/hyperthread)

  • 30GiB memory

  • 75GiB disk storage

List Deployed Stacklets

To list the installed Stackable services run the following command:

$ stackablectl stacklet list
┌───────────┬─────────────┬───────────┬────────────────────────────────────────────────┐
│ Product   ┆ Name        ┆ Namespace ┆ Endpoints                                      │
╞═══════════╪═════════════╪═══════════╪════════════════════════════════════════════════╡
│ druid     ┆ druid       ┆ default   ┆ broker-metrics        172.18.0.2:31804         │
│           ┆             ┆           ┆ broker-https          https://172.18.0.2:31725 │
│           ┆             ┆           ┆ coordinator-metrics   172.18.0.2:30547         │
│           ┆             ┆           ┆ coordinator-https     https://172.18.0.2:31186 │
│           ┆             ┆           ┆ historical-metrics    172.18.0.2:32024         │
│           ┆             ┆           ┆ historical-https      https://172.18.0.2:31239 │
│           ┆             ┆           ┆ middlemanager-metrics 172.18.0.2:32213         │
│           ┆             ┆           ┆ middlemanager-https   https://172.18.0.2:31641 │
│           ┆             ┆           ┆ router-metrics        172.18.0.2:30950         │
│           ┆             ┆           ┆ router-https          https://172.18.0.2:30175 │
├╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ kafka     ┆ kafka       ┆ default   ┆ metrics               172.18.0.2:30145         │
│           ┆             ┆           ┆ kafka-tls             172.18.0.2:31662         │
├╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ nifi      ┆ nifi        ┆ default   ┆ https                 https://172.18.0.2:30306 │
├╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ superset  ┆ superset    ┆ default   ┆ external-superset     http://172.18.0.2:30963  │
├╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ zookeeper ┆ zookeeper   ┆ default   ┆ zk                    172.18.0.2:32710         │
├╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ minio     ┆ minio-druid ┆ default   ┆ http                  http://172.18.0.2:30337  │
│           ┆             ┆           ┆ console-http          http://172.18.0.2:31775  │
└───────────┴─────────────┴───────────┴────────────────────────────────────────────────┘

When a product instance has not finished starting yet, the service will have no endpoint. Depending on your internet connectivity, creating all the product instances might take considerable time. A warning might be shown if the product is not ready yet.

Inspect Data in Kafka

Kafka is an event streaming platform to stream the data in near real-time. All the messages put in and read from Kafka are structured in dedicated queues called topics. The water level data is put into two topics called stations and measurements. The records are produced (put in) by NiFi and consumed (read) by Druid afterwards, within each partition in the same order they were created.

As Kafka has no web interface, you must use a Kafka client like kafkacat (https://github.com/edenhill/kcat). Kafka uses mutual TLS, so clients wanting to connect to Kafka must present a valid TLS certificate. The easiest way to obtain this is to shell into the kafka-broker-default-0 Pod, as we will do in the following section for demonstration purposes. For a production setup, you should spin up a dedicated Pod provisioned with a certificate acting as a Kafka client instead of shelling into the Kafka Pod.

List Available Topics

You can execute a command on the Kafka broker to list the available topics as follows:

kubectl exec -it kafka-broker-default-0 -c kafka -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls_server_mount/tls.key -X ssl.certificate.location=/stackable/tls_server_mount/tls.crt -X ssl.ca.location=/stackable/tls_server_mount/ca.crt -L"
Metadata for all topics (from broker -1: ssl://localhost:9093/bootstrap):
 1 brokers:
  broker 1001 at 172.18.0.2:31146 (controller)
 2 topics:
  topic "stations" with 8 partitions:
    partition 0, leader 1001, replicas: 1001, isrs: 1001
    partition 1, leader 1001, replicas: 1001, isrs: 1001
    partition 2, leader 1001, replicas: 1001, isrs: 1001
    partition 3, leader 1001, replicas: 1001, isrs: 1001
    partition 4, leader 1001, replicas: 1001, isrs: 1001
    partition 5, leader 1001, replicas: 1001, isrs: 1001
    partition 6, leader 1001, replicas: 1001, isrs: 1001
    partition 7, leader 1001, replicas: 1001, isrs: 1001
  topic "measurements" with 8 partitions:
    partition 0, leader 1001, replicas: 1001, isrs: 1001
    partition 1, leader 1001, replicas: 1001, isrs: 1001
    partition 2, leader 1001, replicas: 1001, isrs: 1001
    partition 3, leader 1001, replicas: 1001, isrs: 1001
    partition 4, leader 1001, replicas: 1001, isrs: 1001
    partition 5, leader 1001, replicas: 1001, isrs: 1001
    partition 6, leader 1001, replicas: 1001, isrs: 1001
    partition 7, leader 1001, replicas: 1001, isrs: 1001

You can see that Kafka consists of one broker, and the topics stations and measurements have been created with eight partitions each.

Show Sample Records

To see some records sent to Kafka, run the following commands. You can change the number of records to print via the -c parameter.

kubectl exec -it kafka-broker-default-0 -c kafka -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls_server_mount/tls.key -X ssl.certificate.location=/stackable/tls_server_mount/tls.crt -X ssl.ca.location=/stackable/tls_server_mount/ca.crt -C -t stations -c 2"
{
  "uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87",
  "number": 48900237,
  "shortname": "EITZE",
  "longname": "EITZE",
  "km": 9.56,
  "agency": "VERDEN",
  "longitude": 9.2767694354,
  "latitude": 52.9040654474,
  "water": {
    "shortname": "ALLER",
    "longname": "ALLER"
  }
}
{
  "uuid": "5aaed954-de4e-4528-8f65-f3f530bc8325",
  "number": 48900204,
  "shortname": "RETHEM",
  "longname": "RETHEM",
  "km": 34.22,
  "agency": "VERDEN",
  "longitude": 9.3828408101,
  "latitude": 52.7890975921,
  "water": {
    "shortname": "ALLER",
    "longname": "ALLER"
  }
}
kubectl exec -it kafka-broker-default-0 -c kafka -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls_server_mount/tls.key -X ssl.certificate.location=/stackable/tls_server_mount/tls.crt -X ssl.ca.location=/stackable/tls_server_mount/ca.crt -C -t measurements -c 3"
{
  "timestamp": 1658151900000,
  "value": 221,
  "station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87"
}
{
  "timestamp": 1658152800000,
  "value": 220,
  "station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87"
}
{
  "timestamp": 1658153700000,
  "value": 220,
  "station_uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87"
}

The records of the two topics only contain the needed data. Each measurement record contains a station_uuid that refers to the measuring station it belongs to. The relationship is illustrated below.

topics

The reason for splitting the data up into two different topics is the improved performance. One more straightforward solution would be to use a single topic and produce records like the following:

{
  "uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87",
  "number": 48900237,
  "shortname": "EITZE",
  "longname": "EITZE",
  "km": 9.56,
  "agency": "VERDEN",
  "longitude": 9.2767694354,
  "latitude": 52.9040654474,
  "water": {
    "shortname": "ALLER",
    "longname": "ALLER"
  },
  "timestamp": 1658151900000,
  "value": 221
}

Notice the last two attributes, which differ from the previously shown stations records. The obvious downside is that every measurement (of which there are multiple millions) has to contain all the data known about the station it was measured at. This leads to transmitting and storing duplicated information, e.g., the longitude of a station, resulting in increased network traffic and storage usage. The solution is to send the station data and the measurement data separately, so that each record only contains the data it needs. This process is called data normalization. The downside is that when analyzing the data, you need to combine the records from multiple tables in Druid (stations and measurements).
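The trade-off can be sketched in a few lines of Python. This is only an illustration using the sample record from above; the function names normalize and denormalize are made up for this sketch and are not part of the demo.

```python
import json

# Combined record as shown above: station attributes plus one measurement.
combined = {
    "uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87",
    "number": 48900237,
    "shortname": "EITZE",
    "longname": "EITZE",
    "km": 9.56,
    "agency": "VERDEN",
    "longitude": 9.2767694354,
    "latitude": 52.9040654474,
    "water": {"shortname": "ALLER", "longname": "ALLER"},
    "timestamp": 1658151900000,
    "value": 221,
}

MEASUREMENT_KEYS = ("timestamp", "value")


def normalize(record):
    """Split a combined record into a station record and a measurement record."""
    station = {k: v for k, v in record.items() if k not in MEASUREMENT_KEYS}
    measurement = {k: record[k] for k in MEASUREMENT_KEYS}
    measurement["station_uuid"] = record["uuid"]  # reference back to the station
    return station, measurement


def denormalize(station, measurement):
    """Join a measurement back onto its station, as a SQL join does later."""
    if measurement["station_uuid"] != station["uuid"]:
        raise ValueError("measurement does not belong to this station")
    joined = dict(station)
    joined["timestamp"] = measurement["timestamp"]
    joined["value"] = measurement["value"]
    return joined


station, measurement = normalize(combined)
# Size of the JSON text per measurement: combined record vs. normalized record.
print(len(json.dumps(combined)), len(json.dumps(measurement)))
```

The normalized measurement record is much smaller than the combined one, at the price of needing the join in denormalize whenever station attributes are required.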

If you are interested in how many records have been produced to the Kafka topic so far, use the following command. It will print the last record produced to each partition of the topic, formatted with the pattern specified in the -f parameter. The given pattern will print some metadata of the record.

$ kubectl exec -it kafka-broker-default-0 -c kafka -- /bin/bash -c "/stackable/kcat -b localhost:9093 -X security.protocol=SSL -X ssl.key.location=/stackable/tls_server_mount/tls.key -X ssl.certificate.location=/stackable/tls_server_mount/tls.crt -X ssl.ca.location=/stackable/tls_server_mount/ca.crt -C -t measurements -o -8 -c 8 -f 'Topic %t / Partition %p / Offset: %o / Timestamp: %T\n'"
Topic measurements / Partition 0 / Offset: 1324098 / Timestamp: 1680606104652
Topic measurements / Partition 1 / Offset: 1346816 / Timestamp: 1680606100462
Topic measurements / Partition 2 / Offset: 1339363 / Timestamp: 1680606100461
Topic measurements / Partition 3 / Offset: 1352787 / Timestamp: 1680606104652
Topic measurements / Partition 4 / Offset: 1330144 / Timestamp: 1680606098368
Topic measurements / Partition 5 / Offset: 1340226 / Timestamp: 1680606104652
Topic measurements / Partition 6 / Offset: 1320125 / Timestamp: 1680606100462
Topic measurements / Partition 7 / Offset: 1317719 / Timestamp: 1680606098368

If you calculate 1,324,098 records * 8 partitions, you end up with roughly 10.6 million records (10,592,784). The output also shows that the last measurement record was produced at the timestamp 1680606104652, translating to Tue 4 Apr 13:01:44 CEST 2023 (using the command date -d @1680606104).
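The same arithmetic can be reproduced in Python. The sketch below copies the offsets and the timestamp from the kcat output above and treats the last offset of a partition as an approximation of its record count; the variable names are chosen for this illustration only.

```python
from datetime import datetime, timedelta, timezone

# Offsets and timestamp copied from the kcat output above.
last_offsets = [1324098, 1346816, 1339363, 1352787,
                1330144, 1340226, 1320125, 1317719]
last_timestamp_ms = 1680606104652

# Shortcut used in the text: first partition's offset times the partition count.
estimate = last_offsets[0] * len(last_offsets)
# Slightly more precise: sum the last offset of every partition.
total = sum(last_offsets)

# CEST is UTC+2; a fixed offset avoids depending on a time zone database.
cest = timezone(timedelta(hours=2), "CEST")
produced_at = datetime.fromtimestamp(last_timestamp_ms // 1000, tz=cest)

print(estimate, total, produced_at.isoformat())
```

Both numbers land at roughly 10.6 million records, so the shortcut is good enough for a quick check.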

NiFi

NiFi fetches water-level data from the internet and ingests it into Kafka in real time. This demo includes a workflow ("process group") that fetches the last 30 days of historical measurements and produces the records in Kafka. It also keeps streaming near-real-time updates for every available measuring station.

View testdata-generation Job

You can look at the ingestion job running in NiFi by opening the endpoint https from your stackablectl stacklet list command output. You have to use the endpoint from your command output. In this case, it is https://172.18.0.3:32440. Open it with your favourite browser. If you get a warning regarding the self-signed certificate generated by the Secret Operator (e.g. Warning: Potential Security Risk Ahead), you must tell your browser to trust the website and continue.

nifi 1

Log in with the username admin and password adminadmin.

nifi 2

As you can see, the NiFi workflow consists of lots of components. It is split into two main components:

  1. On the left is the part bulk-loading all the known stations and the historical data for the last 30 days

  2. On the right is the other part iterating over all stations and emitting the current measurement in an endless loop

You can zoom in by using your mouse and mouse wheel.

nifi 3
nifi 4

The left workflow works as follows:

  1. The Get station list processor fetches the current list of stations as JSON via HTTP from the PEGELONLINE web service.

  2. Produce stations records takes the list and produces a Kafka record for every station into the topic stations.

  3. SplitRecords simultaneously takes the single FlowFile (NiFi record) containing all the stations and creates a new FlowFile for every station.

  4. Extract station_uuid takes every FlowFile representing a station and extracts the attribute station_uuid into the metadata of the FlowFile.

  5. Get historic measurements calls the PEGELONLINE web service for every station and fetches the measurements of the last 30 days. All failures are routed to the LogAttribute processor to inspect them in case any failure occurs.

  6. Add station_uuid adds the attribute station_uuid, which is missing from the JSON list of measurements returned by the PEGELONLINE web service.

  7. PublishKafkaRecord_2_6 finally emits every measurement as a Kafka record to the topic measurements. All failures are routed to the LogAttribute processor to inspect them in case any failures occur.
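The data transformations in steps 3 to 7 can be sketched in plain Python. This is not how NiFi implements them; it is a minimal illustration in which the function names are invented and fetch_measurements is a stand-in for the HTTP call to the web service.

```python
def split_stations(station_list):
    """Step 3: one item per station instead of one list with all stations."""
    return [dict(station) for station in station_list]


def add_station_uuid(station_uuid, measurements):
    """Step 6: tag every measurement with the station it belongs to."""
    return [{**m, "station_uuid": station_uuid} for m in measurements]


def bulk_load(station_list, fetch_measurements):
    """Run steps 3 to 6 for all stations and return the measurement records.

    fetch_measurements is a hypothetical callable that takes a station uuid
    and returns a list of {"timestamp": ..., "value": ...} dictionaries.
    """
    records = []
    for station in split_stations(station_list):
        station_uuid = station["uuid"]                       # step 4
        measurements = fetch_measurements(station_uuid)      # step 5
        records.extend(add_station_uuid(station_uuid, measurements))  # step 6
    return records  # step 7 would publish these to the topic measurements


def fake_fetch(station_uuid):
    """Stub instead of a real web service call, so the sketch runs offline."""
    return [{"timestamp": 1658151900000, "value": 221}]


print(bulk_load([{"uuid": "47174d8f-1b8e-4599-8a59-b580dd55bc87"}], fake_fetch))
```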

The right side works similarly but is executed in an endless loop to stream the data in near real-time. Double-click on the Get station list processor to show the processor details.

nifi 5

Head over to the tab PROPERTIES.

nifi 6

Here, you can see the setting Remote URL, which specifies the download URL from where the JSON file containing the stations is retrieved. Close the processor details popup by clicking OK. You can also have a detailed view of the Produce station records processor by double-clicking it.

nifi 7

This processor specifies the Kafka connection details, such as the broker addresses and the topic name. It uses the JsonTreeReader to parse the downloaded JSON and the JsonRecordSetWriter to split it into individual JSON records before writing it out. Double-click the Get historic measurements processor.

nifi 8

This processor fetches the historical data for every station. Click on the Remote URL property.

nifi 9

The Remote URL contains the ${station_uuid} placeholder, which gets replaced for every station.
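The effect of the placeholder can be imitated with Python's string.Template, which happens to use the same ${name} syntax. The URL pattern below is hypothetical and only serves the illustration; the URL actually configured in the demo is the one shown in the NiFi processor.

```python
from string import Template

# Hypothetical URL pattern, NOT the Remote URL configured in the demo.
REMOTE_URL = Template(
    "https://example.invalid/stations/${station_uuid}/measurements.json"
)


def remote_url_for(station_uuid):
    """Fill in the ${station_uuid} placeholder for a single station."""
    return REMOTE_URL.substitute(station_uuid=station_uuid)


print(remote_url_for("47174d8f-1b8e-4599-8a59-b580dd55bc87"))
```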

Double-click the PublishKafkaRecord_2_6 processor.

nifi 10

You can also see the number of produced records by right-clicking on PublishKafkaRecord_2_6 and selecting View status history.

nifi 11

After selecting Messages Sent (5 mins) in the top right corner, you can see that ~10 million records were produced in ~5 minutes, corresponding to ~30k measurements/s. Keep in mind that the demo uses a single-node NiFi setup. The performance can be increased by using multiple nodes.
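As a quick sanity check of the throughput figure, the rounded numbers read off the status history can be divided directly. Both inputs are approximations, so the result is only a ballpark value.

```python
records_produced = 10_000_000  # approximate, read off the status history
window_seconds = 5 * 60        # approximate duration of the bulk load

rate = records_produced / window_seconds
print(f"{rate:,.0f} measurements/s")
```

This yields a little over 33,000 measurements/s, which matches the ~30k measurements/s stated above.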

Regarding the NiFi resources, use the hamburger menu icon on the top right corner and select Node Status History.

nifi 12

The diagram shows the used heap size of the NiFi node. You can also select other metrics to show in the top right corner.

Druid

Druid is used to ingest the near real-time data from Kafka, store it and enable SQL access. The demo has started two ingestion jobs, one reading from the topic stations and the other from measurements, which save the data into Druid’s deep storage. The Druid deep storage is based on the S3 store provided by MinIO.

View Ingestion Job

You can have a look at the ingestion jobs running in Druid by opening the Druid endpoint router-https from your stackablectl stacklet list command output (https://172.18.0.4:30899 in this case).

druid 1

By clicking on Ingestion at the top you can see the running ingestion jobs.

druid 2

After clicking on the magnifying glass to the right side of the RUNNING supervisor, you can see additional information (here, the supervisor measurements was chosen). On the tab Statistics on the left, you can see the number of processed records as well as the number of errors.

druid 3

The statistics show that Druid is currently ingesting 3597 records/s and has already ingested ~10 million. All records have been ingested successfully, indicated by having no processWithError, thrownAway or unparseable records.

Query the Data Source

The started ingestion jobs have automatically created the Druid data sources stations and measurements. You can see the available data sources by clicking on Datasources at the top.

druid 4

The Avg. row size (bytes) column shows that a typical measurement record has 4 bytes, while a station record has 213, more than 50 times the size. So, by choosing two dedicated topics over a single topic, this demo avoids storing the station data with every measurement, which reduces the storage and computation cost per measurement by a factor of roughly 50.
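A rough back-of-the-envelope estimate makes the saving concrete. The row sizes are the ones from the Datasources view; the station count of 700 is a made-up round number, and the assumption that a combined row would simply be the sum of both row sizes is a simplification.

```python
MEASUREMENT_ROW_BYTES = 4  # from the Datasources view
STATION_ROW_BYTES = 213    # from the Datasources view


def estimated_bytes(num_measurements, num_stations):
    """Return (normalized, denormalized) storage estimates in bytes."""
    normalized = (num_measurements * MEASUREMENT_ROW_BYTES
                  + num_stations * STATION_ROW_BYTES)
    # Simplification: a combined row costs the sum of both row sizes.
    denormalized = num_measurements * (MEASUREMENT_ROW_BYTES + STATION_ROW_BYTES)
    return normalized, denormalized


# ~10 million measurements as in the demo; 700 stations is a hypothetical value.
normalized, denormalized = estimated_bytes(10_000_000, 700)
print(STATION_ROW_BYTES / MEASUREMENT_ROW_BYTES, denormalized / normalized)
```

Under these assumptions the single-topic layout would need about 54 times the storage of the two-topic layout.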

By clicking on the measurements data source, you can see the segments of the data source. In this case, the measurements data source is partitioned by the measurement day, resulting in 33 segments.

druid 5

Druid offers a web-based way of querying the data sources via SQL. To achieve this you first have to click on Query at the top.

druid 6

You can now enter any SQL statement. For example, to list 10 stations, run

select * from stations limit 10
druid 7

To count the measurements per day run

select
  time_format(__time, 'YYYY/MM/dd') as "day",
  count(*) as measurements
from measurements
group by 1
order by 1 desc
druid 8
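The same query can also be sent programmatically, since Druid exposes a SQL endpoint over HTTP at /druid/v2/sql. The sketch below only builds the request and does not send it; the helper name build_sql_request is invented, the base URL is the router-https endpoint from the stacklet list output above, and actually sending the request would additionally require trusting the self-signed certificate and possibly credentials, depending on your setup.

```python
import json
import urllib.request


def build_sql_request(base_url, sql):
    """Build (but do not send) a POST request for Druid's SQL HTTP endpoint."""
    body = json.dumps({"query": sql}).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/druid/v2/sql",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


QUERY = """
select
  time_format(__time, 'YYYY/MM/dd') as "day",
  count(*) as measurements
from measurements
group by 1
order by 1 desc
"""

# Use the router endpoint from your own stackablectl stacklet list output.
request = build_sql_request("https://172.18.0.2:30175", QUERY)
print(request.get_method(), request.full_url)
```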

Superset

Superset provides the ability to execute SQL queries and build dashboards. Open the Superset endpoint external-superset in your browser (http://172.18.0.4:32251 in this case).

superset 1

Log in with the username admin and password adminadmin.

superset 2

View Dashboard

The demo has created a Dashboard to visualize the water level data. To open it, click on the tab Dashboards at the top.

superset 3

Click on the dashboard called Water level data. It might take some time until the dashboard renders all the included charts.

superset 4

View Charts

The dashboard Water level data consists of multiple charts. To list the charts, click on the tab Charts at the top.

superset 5

Click on the Chart Measurements / hour. On the left side, you can modify the chart and click on Run to see the effect.

superset 6

You can see that starting from 2022/08/12 some stations didn’t measure or transmit their data. They started sending measurements again on 2022/08/14.

View the Station Distribution on the World Map

To look at the stations' geographical distribution, you have to click on the tab Charts at the top again. Afterwards, click on the chart Stations distribution.

superset 7

The stations are, of course, placed alongside waterways. They are coloured by the waters they measure, so all stations alongside a body of water have the same colour. You can move and zoom the map with your mouse to explore it interactively. For example, you can take a detailed look at the river Rhein.

superset 8

Execute arbitrary SQL statements

Within Superset, you can create dashboards and run arbitrary SQL statements. On the top, click on the tab SQL Lab > SQL Editor.

superset 9

On the left, select the database druid, the schema druid and set See table schema to stations or measurements.

superset 10

In the right textbox, enter the desired SQL statement. To get interesting results, we need to join the two tables. Run the following query to determine the number of measurements the stations made:

select
  stations.longname as station,
  count(*) as measurements
from measurements inner join stations on stations.uuid = measurements.station_uuid
group by 1
order by 2 desc
superset 11

You can also find the number of measurements for every body of water:

select
  stations.water_longname as water,
  count(*) as measurements
from measurements inner join stations on stations.uuid = measurements.station_uuid
group by 1
order by 2 desc
superset 12

What might also be interesting is the average and current measurement of the stations:

select
  stations.longname as station,
  avg("value") as avg_measurement,
  latest("value") as current_measurement,
  latest("value") - avg("value") as diff
from measurements inner join stations on stations.uuid = measurements.station_uuid
group by 1
order by 2 desc
superset 13
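To make clear what this query computes, here is a plain-Python sketch of the same inner join and aggregation, using the three sample measurement records of the station EITZE shown earlier. The function station_summary is invented for this illustration, and it uses the timestamp field to pick the latest value, whereas Druid uses its internal __time column.

```python
from collections import defaultdict

UUID = "47174d8f-1b8e-4599-8a59-b580dd55bc87"
stations = [{"uuid": UUID, "longname": "EITZE"}]
measurements = [
    {"timestamp": 1658151900000, "value": 221, "station_uuid": UUID},
    {"timestamp": 1658152800000, "value": 220, "station_uuid": UUID},
    {"timestamp": 1658153700000, "value": 220, "station_uuid": UUID},
]


def station_summary(stations, measurements):
    """Average, latest value and their difference per station name."""
    names = {s["uuid"]: s["longname"] for s in stations}
    grouped = defaultdict(list)
    for m in measurements:
        name = names.get(m["station_uuid"])
        if name is not None:  # inner join: skip measurements without a station
            grouped[name].append(m)
    summary = {}
    for name, rows in grouped.items():
        avg = sum(r["value"] for r in rows) / len(rows)
        latest = max(rows, key=lambda r: r["timestamp"])["value"]
        summary[name] = {
            "avg_measurement": avg,
            "current_measurement": latest,
            "diff": latest - avg,
        }
    return summary


print(station_summary(stations, measurements))
```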

MinIO

The S3 MinIO store provides persistent deep storage for Druid to store all the data used. Open the MinIO endpoint console-http retrieved by stackablectl stacklet list in your browser (http://172.18.0.5:32595 in this case).

minio 1

Log in with the username admin and password adminadmin.

minio 2

Click on the blue button Browse on the bucket druid and open the folder data.

minio 3

You can see that Druid has created a folder for both data sources. Go ahead and open the folder measurements.

minio 4

Druid saved 35MB of data within 33 prefixes (folders). One prefix corresponds to one segment, which contains all the measurements of a day. If you don’t see any folders or files, the reason is that Druid still needs to save its data from memory to the deep storage. After waiting a few minutes, the data should have been flushed to S3 and show up.

minio 5

If you open up a prefix for a specific day, you can see that Druid has placed a file containing that day’s data there.

Summary

The demo put station records into the Kafka topic stations. It also streamed ~30,000 measurements/s for a total of ~11 million measurements into the topic measurements. Druid ingested the data near real-time into its data sources and enabled SQL access to it. Superset was used as a web-based frontend to execute SQL statements and build dashboards.

Where to go from here

There are multiple paths to go from here. The following sections give you some ideas on what to explore next. You can find the description of the water level data in the PEGELONLINE REST API documentation (German only).

Execute Arbitrary SQL Statements

Within Superset (or the Druid web interface), you can execute arbitrary SQL statements to explore the water level data.

Create Additional Dashboards

You can also create additional charts and bundle them together in a Dashboard. Have a look at the Superset documentation on how to do that.

Load Additional Data

You can use the NiFi web interface to collect arbitrary data and write it to Kafka (it’s recommended to use new Kafka topics for that). Alternatively, you can use a Kafka client like kafkacat to create new topics and ingest data. Using the Druid web interface, you can start an ingestion job that consumes and stores the data in an internal data source. There is an excellent tutorial from Druid on how to do this. Afterwards, the data source can be analyzed within Druid and Superset, like the water level data.