data-lakehouse-iceberg-trino-spark
This demo shows a data workload with real-world data volumes and uses significant resources to ensure acceptable response times. It will likely not run on your workstation. There is also the smaller trino-iceberg demo focusing on the abilities a lakehouse using Apache
Iceberg offers. The |
This demo only runs in the |
This demo will
-
Install the required Stackable operators.
-
Spin up the following data products:
-
Trino: A fast distributed SQL query engine for big data analytics that helps you explore your data universe. This demo uses it to enable SQL access to the data.
-
Apache Spark: A multi-language engine for executing data engineering, data science, and machine learning. This demo uses it to stream data from Kafka into the lakehouse.
-
MinIO: S3 compatible object store. This demo uses it as persistent storage to store all the data used.
-
Apache Kafka: A distributed event streaming platform for high-performance data pipelines, streaming analytics and data integration. This demo uses it as an event streaming platform to stream the data in near real-time.
-
Apache NiFi: An easy-to-use, robust system to process and distribute data. This demo uses it to fetch multiple online real-time data sources and ingest it into Kafka.
-
Apache Hive metastore: A service that stores metadata related to Apache Hive and other services. This demo uses it as metadata storage for Trino and Spark.
-
Open Policy Agent (OPA): An open-source, general-purpose policy engine unifying policy enforcement across the stack. This demo uses it as the authorizer for Trino, which decides which user can query which data.
-
Apache Superset: A modern data exploration and visualization platform. This demo utilizes Superset to retrieve data from Trino via SQL queries and build dashboards on top of that data.
-
Copy multiple data sources in CSV and Parquet format into the S3 staging area.
-
Let Trino copy the data from the staging area into the lakehouse area. During the copy, transformations such as validating, casting, parsing timestamps and enriching the data by joining lookup tables are done.
-
Simultaneously, start a NiFi workflow, which fetches datasets in real-time via the internet and ingests the data as JSON records into Kafka.
-
Start a Spark Structured Streaming job, which streams the data out of Kafka into the lakehouse.
-
Create Superset dashboards for visualization of the different datasets.
You can see the deployed products and their relationship in the following diagram:
System Requirements
The demo was developed and tested on a Kubernetes cluster with 10 nodes (4 cores (8 threads), 20GB RAM and 30GB HDD). Instance types that loosely correspond to this on the hyperscalers are:
-
Google:
e2-standard-8
-
Azure:
Standard_D4_v2
-
AWS:
m5.2xlarge
In addition to these nodes the operators will request multiple persistent volumes with a total capacity of about 1TB.
Apache Iceberg
As Apache Iceberg states on their website:
Apache Iceberg is an open table format for huge analytic datasets. Iceberg adds tables to compute engines including Spark, Trino, PrestoDB, Flink, Hive and Impala using a high-performance table format that works just like a SQL table.
This demo uses Iceberg because it plays nicely with object storage and has integrations for Trino and Spark. Compared to putting Apache Parquet files directly into S3 using the Hive connector, it provides, among other things, the following benefits:
-
Standardized table storage: Using this standardized specification, multiple tools such as Trino, Spark and Flink can read and write Iceberg tables.
-
Versioned tables with snapshots, time travel and rollback mechanisms
-
Row level updates and deletes: Deletes will be written as separate files for best performance and compacted with the below-mentioned mechanism.
-
Built-in compaction: Running table maintenance functions such as compacting smaller files (including delete files) into larger files for best query performance is recommended. Iceberg offers out-of-the-box tools for this.
-
Hidden partitioning: Imagine you have a table sales (day varchar, ts timestamp) partitioned by day. Lots of times, users would run a statement such as select count(*) from sales where ts > now() - interval '1' day, resulting in a full table scan as the partition column day was not filtered in the query. Iceberg resolves this problem by using hidden partitions. In Iceberg, your table would look like sales (ts timestamp) with (partitioning = ARRAY['day(ts)']). The column day is not needed anymore, and the query select count(*) from sales where ts > now() - interval '1' day would use partition pruning as expected to read only the partitions from today and yesterday.
-
Branching and tagging: Iceberg enables git-like semantics on your lakehouse. You can create tags pointing to a specific snapshot of your data and branches. For details, please read this excellent blog post. Currently, this is only supported in Spark. Trino is working on support.
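To make the hidden partitioning benefit from the list above more concrete, here is a minimal plain-Python sketch of the idea. It is not Iceberg code: the helper names day_transform and partitions_to_scan as well as the sample dates are made up for illustration. It only shows how a filter on ts can be translated into a filter on day partitions without the user ever mentioning a day column.

```python
from datetime import date, datetime, timedelta


def day_transform(ts: datetime) -> date:
    """Hypothetical stand-in for the idea behind Iceberg's day(ts) transform."""
    return ts.date()


def partitions_to_scan(partitions: list[date], ts_lower_bound: datetime) -> list[date]:
    """Keep only the day partitions that can contain rows with ts > ts_lower_bound."""
    first_relevant_day = day_transform(ts_lower_bound)
    return [p for p in partitions if p >= first_relevant_day]


# Ten daily partitions (illustrative data only)
all_partitions = [date(2024, 5, d) for d in range(1, 11)]
now = datetime(2024, 5, 10, 14, 30)

# Equivalent of "where ts > now() - interval '1' day"
pruned = partitions_to_scan(all_partitions, now - timedelta(days=1))
print(pruned)
```

With these sample values only the partitions for yesterday and today remain, so eight of the ten partitions would never be read.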
If you want to read more about the motivation and the working principles of Iceberg, please have a read on their website or GitHub repository.
Listing Deployed Stacklets
To list the installed Stackable services, run the following command:
$ stackablectl stacklet list
PRODUCT    NAME          NAMESPACE  ENDPOINTS                                          EXTRA INFOS
hive       hive          default    hive                212.227.224.138:31022
                                    metrics             212.227.224.138:30459
hive       hive-iceberg  default    hive                212.227.233.131:31511
                                    metrics             212.227.233.131:30003
kafka      kafka         default    metrics             217.160.118.190:32160
                                    kafka               217.160.118.190:31736
nifi       nifi          default    https               https://217.160.120.117:31499  Admin user: admin, password: adminadmin
opa        opa           default    http                http://217.160.222.211:31767
superset   superset      default    external-superset   http://212.227.233.47:32393    Admin user: admin, password: adminadmin
trino      trino         default    coordinator-metrics 212.227.224.138:30610
                                    coordinator-https   https://212.227.224.138:30876
zookeeper  zookeeper     default    zk                  212.227.224.138:32321
minio      minio         default    http                http://217.160.222.211:32031   Third party service
                                    console-http        http://217.160.222.211:31429   Admin user: admin, password: adminadmin
When a product instance has not finished starting yet, the service will have no endpoint. Depending on your internet connectivity, creating all the product instances might take considerable time. A warning might be shown if the product is not ready yet.
MinIO
Listing Buckets
The S3 provided by MinIO is used as persistent storage to store all the data used. Open the minio
endpoint
console-http
retrieved by the stackablectl stacklet list
command in your browser (http://217.160.222.211:31429 in
this case).
Log in with the username admin
and password adminadmin
.
Here, you can see the two buckets contained in the S3:
-
staging
: The demo loads static datasets into this area. The data is stored in different formats, such as CSV and Parquet. It contains actual data tables as well as lookup tables. -
lakehouse
: This bucket is where the cleaned and aggregated data resides. The data is stored in the Apache Iceberg table format.
Inspecting Lakehouse
Click on the blue button Browse
on the bucket lakehouse
.
Multiple folders (called prefixes in S3), each containing a different dataset, are displayed. First, select the folder
house-sales
, then the folder starting with house-sales-*
, and lastly, the folder named data
.
As you can see, the table house-sales
is partitioned by year. Go ahead and click on any folder.
You can see that Trino has placed a single file into the selected folder containing all the house sales of that particular year.
NiFi
NiFi is used to fetch multiple data sources from the internet and ingest them into Kafka in near real-time. Some of these sources are statically downloaded (e.g. as CSV), and others are fetched dynamically via API endpoints, including:
-
Water level measurements in Germany (real-time)
-
Shared bikes in Germany (real-time)
-
House sales in the UK (static)
-
Registered earthquakes worldwide (static)
-
E-charging stations in Germany (static)
-
New York taxi data (static)
View Ingestion Jobs
You can have a look at the ingestion job running in NiFi by opening the NiFi endpoint https
from your
stackablectl stacklet list
command output (https://217.160.120.117:31499 in this case).
If you get a warning regarding the self-signed certificate generated by the Secret Operator (e.g. Warning: Potential Security Risk Ahead), you must tell your browser to trust the website and continue.
Log in with the username admin
and password adminadmin
.
As you can see, the NiFi workflow consists of lots of components. You can zoom in by using your mouse and mouse wheel. On the left side are two strands that
-
Fetch the list of known water-level stations and ingest them into Kafka.
-
Continuously run a loop fetching the measurements of the last 30 for every measuring station and ingesting them into Kafka.
On the right side are three strands that
-
Fetch the current shared bike station information
-
Fetch the current shared bike station status
-
Fetch the current shared bike bike status
For details on the NiFi workflow ingesting water-level data, please read the nifi-kafka-druid-water-level-data documentation on NiFi.
Spark
Spark Structured Streaming is used to stream data from Kafka into the lakehouse.
Accessing the Web Interface
To have access to the Spark web interface you need to run the following command to forward port 4040 to your local machine.
kubectl port-forward $(kubectl get pod -o name | grep 'spark-ingest-into-lakehouse-.*-driver') 4040
Afterwards you can access the web interface on http://localhost:4040.
Listing Running Streaming Jobs
The UI displays the last job runs. Each running Structured Streaming job creates lots of Spark jobs internally. Click on
the Structured Streaming
tab to see the running streaming jobs.
Five streaming jobs are currently running. You can also click on a streaming job to get more details. For the job
ingest smart_city shared_bikes_station_status
click on the Run ID
highlighted in blue to open it up.
How the Streaming Jobs Work
The demo has started all the running streaming jobs. Look at the demo code to see the actual code
submitted to Spark. This document will explain one specific ingestion job - ingest water_level measurements
.
The streaming job is written in Python using pyspark
. First off, the schema used to parse the JSON coming from Kafka
is defined. Nested structures or arrays are supported as well. The schema differs from job to job.
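from pyspark.sql.types import StructType, StructField, StringType, TimestampType, FloatType

# Schema of the JSON payload of a single water level measurement
schema = StructType([
    StructField("station_uuid", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("value", FloatType(), True),
])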
Afterwards, a streaming read from Kafka is started. It reads from our Kafka at kafka:9092 with the topic water_levels_measurements. When starting up, the job will read all the existing messages in Kafka (read from earliest) and will process at most 50000000 records in a single batch. As Kafka has retention set up, Kafka records might age out of the topic before Spark has read them, which can happen when the Spark application was not running for too long or kept crashing. In the case of this demo, the streaming job should not error out. For a production job, failOnDataLoss should be set to true so that missing data does not go unnoticed. In that case, the Kafka offsets need to be adjusted manually and the missing data needs to be loaded afterwards.
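The trade-off behind failOnDataLoss can be sketched in a few lines of plain Python. This is a simplified model and not how Spark implements it: the function resolve_start_offset, the exception DataLossError and the offsets used are hypothetical and only illustrate the decision between failing loudly and silently skipping records that have already been removed by Kafka retention.

```python
class DataLossError(Exception):
    """Raised when records were removed from the topic before they were read."""


def resolve_start_offset(checkpoint_offset: int, earliest_available: int,
                         fail_on_data_loss: bool) -> int:
    """Decide where to continue reading after a restart (simplified model)."""
    if checkpoint_offset >= earliest_available:
        # Nothing was lost: continue exactly where the last batch stopped.
        return checkpoint_offset

    missing = earliest_available - checkpoint_offset
    if fail_on_data_loss:
        # Production behaviour: stop so that the gap does not go unnoticed.
        raise DataLossError(f"{missing} records aged out before being read")

    # Demo behaviour: skip the gap and continue with the oldest record left.
    return earliest_available


print(resolve_start_offset(100, 50, fail_on_data_loss=True))
print(resolve_start_offset(100, 250, fail_on_data_loss=False))
```

In the second call 150 records are skipped without any error, which is acceptable for a demo but is exactly the situation a production job would want to be alerted about.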
Note: The following Python snippets belong to a single Python statement but are split into separate blocks for better explanation.
spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "water_levels_measurements") \
.option("startingOffsets", "earliest") \
.option("maxOffsetsPerTrigger", 50000000) \
.option("failOnDataLoss", "false") \
.load() \
So far we have a readStream
reading from Kafka. Records on Kafka are simply a byte-stream, so they must be converted
to strings, and the JSON needs to be parsed.
.selectExpr("cast(key as string)", "cast(value as string)") \
.withColumn("json", from_json(col("value"), schema)) \
Afterwards, we only select the needed fields (coming from JSON). We are not interested in all the other fields, such as
key
, value
, topic
or offset
. The metadata of the Kafka records, such as topic
, timestamp
, partition
and
offset
, is also available. For details, please have a look at the Spark streaming documentation on Kafka.
.select("json.station_uuid", "json.timestamp", "json.value") \
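To make the parsing and selection steps more tangible outside of Spark, the following plain-Python sketch mimics them for a single Kafka record. It is only a rough analogue: the function name parse_record and the sample payload are hypothetical, and returning None for unparsable input is a simplification rather than a description of Spark's behaviour.

```python
import json
from datetime import datetime


def parse_record(value: bytes):
    """Decode the raw bytes, parse the JSON and keep only the three needed fields."""
    try:
        # Counterpart of cast(value as string) followed by from_json(...)
        doc = json.loads(value.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    if not isinstance(doc, dict):
        return None

    ts = doc.get("timestamp")
    measurement = doc.get("value")
    # Counterpart of select("json.station_uuid", "json.timestamp", "json.value")
    return {
        "station_uuid": doc.get("station_uuid"),
        "timestamp": datetime.fromisoformat(ts) if isinstance(ts, str) else None,
        "value": float(measurement) if isinstance(measurement, (int, float)) else None,
    }


# Illustrative payload, not taken from the real data source
sample = b'{"station_uuid": "hypothetical-uuid", "timestamp": "2024-05-10T14:30:00+02:00", "value": 251, "unit": "cm"}'
record = parse_record(sample)
print(record)
```

Fields that are not part of the schema (unit in this sample) are dropped, just as they are not selected in the streaming job.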
After all these transformations, we need to specify the sink of the stream, in this case, the Iceberg lakehouse. We are
writing in the iceberg
format using the update
mode rather than the "normal" append
mode. Spark will aim for a
micro-batch every 2 minutes and save its checkpoints (its current offsets on the Kafka topic) in the specified S3
location. Afterwards, the streaming job will be started by calling .start()
.
.writeStream \
.queryName("ingest water_level measurements") \
.format("iceberg") \
.foreachBatch(upsertWaterLevelsMeasurements) \
.outputMode("update") \
.trigger(processingTime='2 minutes') \
.option("checkpointLocation", "s3a://lakehouse/water-levels/checkpoints/measurements") \
.start()
Deduplication Mechanism
One important part was skipped during the walkthrough:
.foreachBatch(upsertWaterLevelsMeasurements) \
upsertWaterLevelsMeasurements
is a Python function that describes how to insert the records from Kafka into the lakehouse
table. This specific streaming job removes all duplicate records that can occur because of how the PegelOnline API works
and gets called. As we don’t want duplicate rows in our lakehouse tables, we need to filter the duplicates out as
follows.
def upsertWaterLevelsMeasurements(microBatchOutputDF, batchId):
    # Register the micro-batch as a temporary view so that Spark SQL can access it
    microBatchOutputDF.createOrReplaceTempView("waterLevelsMeasurementsUpserts")
    # Insert only those records that are not yet present in the target table
    microBatchOutputDF._jdf.sparkSession().sql("""
    MERGE INTO lakehouse.water_levels.measurements as t
    USING (SELECT DISTINCT * FROM waterLevelsMeasurementsUpserts) as u
    ON u.station_uuid = t.station_uuid AND u.timestamp = t.timestamp
    WHEN NOT MATCHED THEN INSERT *
    """)
First, the data frame containing the upserts (records from Kafka) is registered as a temporary view so that it can be accessed via Spark SQL. Afterwards, the MERGE INTO statement adds the new records to the lakehouse table.
The incoming records are first de-duplicated (using SELECT DISTINCT * FROM waterLevelsMeasurementsUpserts) so that the data from Kafka does not contain duplicates. Afterwards, the now duplicate-free records are added to lakehouse.water_levels.measurements, but only if they are not already present.
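The semantics of this MERGE INTO statement can be mimicked in plain Python, which may help when reasoning about edge cases. The sketch below is an assumption-laden illustration, not the demo's code: the table is a simple list of dictionaries, and the function name merge_insert_if_absent and the sample rows are made up.

```python
def merge_insert_if_absent(table: list[dict], batch: list[dict]) -> int:
    """Insert batch rows whose (station_uuid, timestamp) is not yet in the table."""
    # Keys already present in the target table (the ON condition of the MERGE)
    existing = {(row["station_uuid"], row["timestamp"]) for row in table}

    seen = set()
    inserted = 0
    for row in batch:
        # SELECT DISTINCT *: skip rows that are fully identical within the batch
        fingerprint = tuple(sorted(row.items()))
        if fingerprint in seen:
            continue
        seen.add(fingerprint)

        # WHEN NOT MATCHED THEN INSERT *
        if (row["station_uuid"], row["timestamp"]) not in existing:
            table.append(row)
            inserted += 1
    return inserted


# Illustrative data only
table = [{"station_uuid": "a", "timestamp": "2024-05-10 14:00:00", "value": 1.0}]
batch = [
    {"station_uuid": "a", "timestamp": "2024-05-10 14:00:00", "value": 1.0},  # already stored
    {"station_uuid": "b", "timestamp": "2024-05-10 14:00:00", "value": 2.0},  # new
    {"station_uuid": "b", "timestamp": "2024-05-10 14:00:00", "value": 2.0},  # duplicate in batch
]
first_run = merge_insert_if_absent(table, batch)
second_run = merge_insert_if_absent(table, batch)
print(first_run, second_run, len(table))
```

Replaying the same batch a second time inserts nothing, which is what makes the ingestion safe to repeat after a restart.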
Upsert Mechanism
The MERGE INTO
statement can be used not only for de-duplicating data but also for updating existing rows in the lakehouse table. The
ingest water_level stations
streaming job uses the following MERGE INTO
statement:
MERGE INTO lakehouse.water_levels.stations as t
USING
(
SELECT station_uuid, number, short_name, long_name, km, agency, latitude, longitude, water_short_name, water_long_name
FROM waterLevelsStationInformationUpserts
WHERE (station_uuid, kafka_timestamp) IN (SELECT station_uuid, max(kafka_timestamp) FROM waterLevelsStationInformationUpserts GROUP BY station_uuid)
) as u
ON u.station_uuid = t.station_uuid
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
First, the data within a batch is de-duplicated as well. The record containing the station update with the highest Kafka timestamp is the newest and will be used during the upsert.
If a record for a station (detected by the same station_uuid
) already exists, its contents will be updated. If the
station is yet to be discovered, it will be inserted. The MERGE INTO
also supports updating subsets of fields and more
complex calculations, e.g. incrementing a counter. For details, have a look at the
Iceberg MERGE INTO documentation.
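The upsert logic described above can be sketched in plain Python as follows. This is an illustration under simplifying assumptions, not the demo's implementation: the table is modelled as a dictionary keyed by station_uuid, the function name upsert_stations and the sample records are hypothetical, and ties on kafka_timestamp are resolved by keeping the first record seen.

```python
def upsert_stations(table: dict, batch: list[dict]) -> None:
    """Apply the newest record per station: update if known, insert otherwise."""
    # Step 1: per station, keep only the record with the highest Kafka timestamp
    latest = {}
    for record in batch:
        uuid = record["station_uuid"]
        if uuid not in latest or record["kafka_timestamp"] > latest[uuid]["kafka_timestamp"]:
            latest[uuid] = record

    # Step 2: WHEN MATCHED THEN UPDATE SET * / WHEN NOT MATCHED THEN INSERT *
    for uuid, record in latest.items():
        table[uuid] = {k: v for k, v in record.items() if k != "kafka_timestamp"}


# Illustrative data only
table = {"s1": {"station_uuid": "s1", "long_name": "OLD NAME"}}
batch = [
    {"station_uuid": "s1", "long_name": "NEW NAME", "kafka_timestamp": 2},
    {"station_uuid": "s1", "long_name": "STALE NAME", "kafka_timestamp": 1},
    {"station_uuid": "s2", "long_name": "FRESH STATION", "kafka_timestamp": 1},
]
upsert_stations(table, batch)
print(table)
```

The existing station s1 receives the newest name, the stale update is ignored, and the previously unknown station s2 is inserted.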
Delete Mechanism
The MERGE INTO
statement can also be used to delete existing rows from the lakehouse table, in addition to de-duplicating data and updating rows. For details have a look at
the Iceberg MERGE INTO documentation.
Table Maintenance
As mentioned, Iceberg supports out-of-the-box table maintenance such as compaction.
This demo executes some maintenance functions in a rudimentary Python loop with timeouts in between. When running in production, the maintenance can be scheduled using Kubernetes CronJobs or Apache Airflow, which the Stackable Data Platform also supports.
import time
from datetime import datetime, timedelta

# `spark` is assumed to be an already existing SparkSession

# key: table name
# value: compaction strategy
tables_to_compact = {
    "lakehouse.water_levels.stations": "",
    "lakehouse.water_levels.measurements": ", strategy => 'sort', sort_order => 'timestamp DESC NULLS LAST,station_uuid ASC NULLS LAST'",
    "lakehouse.smart_city.shared_bikes_station_information": "",
    "lakehouse.smart_city.shared_bikes_station_status": ", strategy => 'sort', sort_order => 'last_reported DESC NULLS LAST,station_id ASC NULLS LAST'",
    "lakehouse.smart_city.shared_bikes_bike_status": "",
}

while True:
    expire_before = (datetime.now() - timedelta(hours=12)).strftime("%Y-%m-%d %H:%M:%S")
    for table, table_compaction_strategy in tables_to_compact.items():
        print(f"[{table}] Expiring snapshots older than 12 hours ({expire_before})")
        spark.sql(f"CALL lakehouse.system.expire_snapshots(table => '{table}', older_than => TIMESTAMP '{expire_before}', retain_last => 50, stream_results => true)")

        print(f"[{table}] Removing orphaned files")
        spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => '{table}')")

        print(f"[{table}] Starting compaction")
        spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => '{table}'{table_compaction_strategy})")
        print(f"[{table}] Finished compaction")

    print("All tables compacted. Waiting 25min before scheduling next run...")
    time.sleep(25 * 60)  # Assuming compaction takes 5 min, this runs every 30 minutes
The script has a dictionary of all the tables to run maintenance on. The following procedures are run:
expire_snapshots
Each write/update/delete/upsert/compaction in Iceberg produces a new snapshot while keeping the old data and metadata around for snapshot isolation and time travel. The expire_snapshots procedure can be used to remove older snapshots and their files which are no longer needed.
remove_orphan_files
Used to remove files which are not referenced in any metadata files of an Iceberg table and can thus be considered "orphaned".
rewrite_data_files
Iceberg tracks each data file in a table. More data files lead to more metadata stored in manifest files, and small data files cause an unnecessary amount of metadata and less efficient queries due to file open costs. Iceberg can compact data files in parallel using Spark with the rewriteDataFiles action. This will combine small files into larger files to reduce metadata overhead and runtime file open cost.
Some tables will also be sorted during the rewrite. For details, please have a look at the documentation on rewrite_data_files.
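When moving the maintenance from an endless loop to a scheduler such as a Kubernetes CronJob or Apache Airflow, it can help to separate building the statements from executing them. The following sketch shows one possible way to do this. The function name maintenance_statements is hypothetical, and the CALL statements are copied from the loop of the demo. Executing them would still require a SparkSession, which is deliberately left out here.

```python
from datetime import datetime, timedelta


def maintenance_statements(table: str, rewrite_options: str, now: datetime) -> list[str]:
    """Build the three maintenance statements for one table, in execution order."""
    expire_before = (now - timedelta(hours=12)).strftime("%Y-%m-%d %H:%M:%S")
    return [
        f"CALL lakehouse.system.expire_snapshots(table => '{table}', "
        f"older_than => TIMESTAMP '{expire_before}', retain_last => 50, stream_results => true)",
        f"CALL lakehouse.system.remove_orphan_files(table => '{table}')",
        f"CALL lakehouse.system.rewrite_data_files(table => '{table}'{rewrite_options})",
    ]


statements = maintenance_statements(
    "lakehouse.water_levels.stations", "", datetime(2024, 5, 10, 12, 0, 0)
)
for statement in statements:
    print(statement)  # a scheduled job would pass each statement to spark.sql(...)
```

Passing the current time as a parameter keeps the function deterministic, so a single scheduled run can be tested without waiting or sleeping.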
Trino
Trino is used to enable SQL access to the data.
Accessing the Web Interface
Open up the Trino endpoint coordinator-https
from your stackablectl stacklet list
command output
(https://212.227.224.138:30876 in this case).
Log in with the username admin
and password adminadmin
.
Connect to Trino
Please have a look at the trino-operator documentation on how to connect to Trino. This demo recommends using DBeaver, as Trino consists of many schemas and tables you can explore.
Here you can see all the available Trino catalogs.
-
staging
: The staging area containing raw data in various data formats such as CSV or Parquet -
system
: Internal catalog to retrieve Trino internals -
tpcds
: TPCDS connector providing a set of schemas to support the TPC Benchmark™ DS -
tpch
: TPCH connector providing a set of schemas to support the TPC Benchmark™ H -
lakehouse
: The lakehouse area containing the enriched data, stored so that it can be queried efficiently
Superset
Superset provides the ability to execute SQL queries and build dashboards. Open the Superset endpoint
external-superset
in your browser (http://212.227.233.47:32393 in this case).
Log in with the username admin
and password adminadmin
.
Viewing the Dashboard
The demo has created dashboards to visualize the different data sources. Select the Dashboards
tab at the top to view
these dashboards.
Click on the dashboard called House sales
. It might take some time until the dashboard renders all the included
charts.
Another dashboard to look at is Earthquakes
.
Another dashboard to look at is Taxi trips
.
There are multiple other dashboards you can explore on your own.
Viewing Charts
The dashboards consist of multiple charts. To list the charts, select the Charts
tab at the top.
Executing Arbitrary SQL Statements
Within Superset, you can create dashboards and run arbitrary SQL statements. On the top click on the tab SQL Lab
→
SQL Editor
.
On the left, select the database Trino lakehouse
, the schema house_sales
, and set See table schema
to
house_sales
.
In the right textbox, you can enter the desired SQL statement. If you want to avoid making one up, use the following:
select city, sum(price) as sales
from house_sales
group by 1
order by 2 desc
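If the positional references in group by 1 and order by 2 desc are unfamiliar, the following self-contained sketch runs the same statement against a tiny in-memory SQLite table. SQLite is only used as a stand-in so that the example runs without a Trino cluster, and the rows are made up for illustration. The numbers refer to the position in the select list, so 1 is city and 2 is the summed price.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE house_sales (city TEXT, price INTEGER)")
conn.executemany(
    "INSERT INTO house_sales VALUES (?, ?)",
    [("LONDON", 500000), ("LEEDS", 200000), ("LONDON", 750000), ("YORK", 300000)],
)

# Same statement as above: group by the first, order by the second output column
rows = conn.execute(
    """
    select city, sum(price) as sales
    from house_sales
    group by 1
    order by 2 desc
    """
).fetchall()
print(rows)
conn.close()
```

The city with the highest total sales volume comes first in the result.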