Spark SQL: A Guide to Effective Data Processing

Creating Different Objects in Spark SQL


Apache Spark SQL (latest versions, including Spark 3.x and 4.x) supports various objects for efficient data processing, querying, and managing structured & semi-structured data.

1. Objects You Can Create in the Latest Version of Spark SQL

Below are the key objects you can create using the latest version of Spark SQL:


1.1 Databases (Catalogs & Namespaces)

Purpose: Organize structured data like schemas in a database.

How to Create a Database?

CREATE DATABASE IF NOT EXISTS sales_db;

Use SHOW DATABASES; to list all databases.
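
Once created, you can switch to the database and inspect it with standard Spark SQL commands, for example:

USE sales_db;
SHOW TABLES;        -- lists tables in the current database
SHOW DATABASES;     -- lists all databases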


1.2 Tables (Managed, External, Temporary, Delta, Iceberg, Hudi)

Purpose: Store structured data in Spark SQL.

A) Managed Table (Spark owns the storage)

CREATE TABLE sales_data (
    id INT, 
    amount DOUBLE, 
    date STRING
) USING PARQUET;

Storage is managed by Spark (saved in warehouse location).

Stores the table in PARQUET format for efficient storage & retrieval.

Parquet is a columnar storage format, meaning:

✔️ Faster queries than CSV or JSON.
✔️ Compresses data efficiently.
✔️ Optimized for big data processing in Spark.

In Databricks, the table is saved in DBFS (Databricks File System) at:

  • /user/hive/warehouse/sales_data

In standalone Spark, it is stored in the default warehouse location.

To check where it is stored, run:

DESCRIBE FORMATTED sales_data;

B) External Table (Data stored outside Spark)

CREATE EXTERNAL TABLE sales_external (
    id INT, 
    amount DOUBLE, 
    date STRING
) USING PARQUET LOCATION 's3://my-bucket/sales/';

An External Table in Spark SQL is a table where the data is stored outside Spark’s managed storage, such as AWS S3, Azure Data Lake, Google Cloud Storage, or HDFS.

Unlike Managed Tables, where Spark controls the storage, External Tables only define the schema and location of existing data without moving or deleting the actual files.

Useful when data is in cloud storage (S3, ADLS, GCS, HDFS).
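
As a quick sketch of what "external" means in practice for the table above: dropping it removes only the catalog entry, while the files under s3://my-bucket/sales/ stay in place.

-- Removes only the table metadata from the catalog;
-- the Parquet files in 's3://my-bucket/sales/' are not deleted.
DROP TABLE sales_external;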


C) Temporary Tables (Session-Specific)

CREATE TEMPORARY VIEW temp_sales AS 
SELECT * FROM sales_data;

A Temporary View is a session-scoped virtual table that:

  • Is created dynamically from a query.

  • Exists only for the current session.

  • Disappears once the session ends (not stored permanently).

Example Use Case:

  • You don’t want to create a physical table, but you need to reuse a query multiple times.
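
For instance, the session-scoped view above can back several queries without materializing anything (a minimal sketch):

-- Both queries reuse the same temporary view within this session
SELECT COUNT(*) FROM temp_sales;
SELECT date, SUM(amount) AS daily_total FROM temp_sales GROUP BY date;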

Alternative: Creating a Global Temporary View

If you need the view to be accessible across multiple sessions, use Global Temporary Views:

CREATE GLOBAL TEMPORARY VIEW global_sales AS 
SELECT * FROM sales_data;

Global temporary views are registered in the global_temp database and persist until the Spark application terminates.
To query a global view, use:

SELECT * FROM global_temp.global_sales;

D) Delta Tables (Latest Feature)

Delta Lake provides ACID transactions, versioning, and schema evolution.

CREATE TABLE delta_sales 
USING DELTA 
AS SELECT * FROM parquet.`/mnt/datalake/sales/`;

Delta Lake is an enhanced storage format built on top of Parquet, designed to handle big data, real-time analytics, and machine learning workloads.

Key Features of Delta Lake:

ACID Transactions – Ensures data integrity (unlike Parquet).
Schema Evolution – Allows adding/removing columns dynamically.
Time Travel – Lets you query previous versions of data.
Deletes & Updates – Supports UPDATE, DELETE, and MERGE commands (unlike Parquet).

Example: Time Travel (Query Old Data)

SELECT * FROM delta_sales VERSION AS OF 2;
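
Since Delta supports row-level DML, you can also modify the table in place (a minimal sketch against the delta_sales table created above; the filter values and multiplier are illustrative):

-- Each statement is committed as a new Delta version
UPDATE delta_sales SET amount = amount * 1.1 WHERE date = '2024-01-01';
DELETE FROM delta_sales WHERE amount < 0;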

When Should You Use Delta Lake Instead of Parquet?

Feature | Parquet | Delta Lake
ACID Transactions | ❌ No | ✅ Yes
Schema Evolution | ❌ Manual | ✅ Automatic
Time Travel | ❌ No | ✅ Yes
Supports Updates/Deletes | ❌ No | ✅ Yes
Performance (Indexing & Caching) | 🚀 Fast | ⚡ Faster

Delta Lake is ideal for production-ready big data & machine learning pipelines.


E) Iceberg Table (Latest Feature)

Apache Iceberg supports large-scale table management with schema evolution.

CREATE TABLE iceberg_sales 
USING ICEBERG 
AS SELECT * FROM parquet.`/mnt/datalake/sales/`;

This process converts Parquet data into Iceberg format, enabling advanced features like time travel, schema evolution, hidden partitioning, and incremental data processing.

What is Apache Iceberg?

Apache Iceberg is an open-source table format designed for large-scale analytics on data lakes (AWS S3, Azure Data Lake, Google Cloud Storage, HDFS).

Key Features of Apache Iceberg:

Time Travel – Query old versions of data effortlessly.
Schema Evolution – Add, remove, or rename columns without breaking queries.
Hidden Partitioning – No need to manually create partitions.
Incremental Processing – Query only the changed/new data efficiently.
ACID Transactions – Ensures data consistency like a database.
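
As a quick illustration of Iceberg time travel on the iceberg_sales table created above (a sketch; the FOR VERSION AS OF / FOR TIMESTAMP AS OF syntax assumes Spark 3.3+ with the Iceberg runtime, and the snapshot ID shown is hypothetical):

-- Query the table as of a past snapshot or point in time
SELECT * FROM iceberg_sales FOR VERSION AS OF 4348512937182;
SELECT * FROM iceberg_sales FOR TIMESTAMP AS OF '2024-01-01 00:00:00';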


When Should You Use Iceberg Instead of Parquet or Delta?

Feature | Parquet | Delta Lake | Iceberg
ACID Transactions | ❌ No | ✅ Yes | ✅ Yes
Schema Evolution | ❌ Manual | ✅ Yes | ✅ Yes
Time Travel | ❌ No | ✅ Yes | ✅ Yes
Automatic Partitioning | ❌ No | ❌ No | ✅ Yes
Incremental Queries | ❌ No | ❌ No | ✅ Yes
Cloud-Native & Open-Source | ✅ Yes | ❌ Mostly Databricks | ✅ Yes

Apache Iceberg is best for production data lakes where performance, time travel, and incremental processing are required.

F) Hudi Table (For Streaming & Incremental Processing)

Apache Hudi is best for real-time updates & transactional tables.

CREATE TABLE hudi_sales 
USING HUDI 
AS SELECT * FROM parquet.`/mnt/datalake/sales/`;

Supports upserts & incremental queries. This process converts the Parquet dataset into Apache Hudi format, enabling real-time data ingestion, updates, and incremental queries.

What is Apache Hudi?

Apache Hudi (Hadoop Upserts, Deletes, and Incrementals) is a transactional storage format designed for real-time big data processing on data lakes like AWS S3, Azure Data Lake, Google Cloud Storage, or HDFS.

Key Features of Apache Hudi:

Upserts & Deletes – Supports row-level updates and deletes efficiently.
Incremental Queries – Retrieve only changed records instead of full scans.
Time Travel & Versioning – Query historical snapshots of data.
Streaming & Real-Time Processing – Optimized for streaming ingestion.
ACID Transactions – Ensures data consistency in distributed environments.

When Should You Use Hudi Instead of Parquet, Delta, or Iceberg?

Feature | Parquet | Delta Lake | Iceberg | Hudi
ACID Transactions | ❌ No | ✅ Yes | ✅ Yes | ✅ Yes
Schema Evolution | ❌ Manual | ✅ Yes | ✅ Yes | ✅ Yes
Time Travel | ❌ No | ✅ Yes | ✅ Yes | ✅ Yes
Automatic Partitioning | ❌ No | ❌ No | ✅ Yes | ✅ Yes
Incremental Queries | ❌ No | ❌ No | ✅ Yes | ✅ Yes
Upserts & Deletes | ❌ No | ✅ Yes | ❌ No | ✅ Yes
Optimized for Streaming | ❌ No | ❌ No | ❌ No | ✅ Yes

Apache Hudi is best for real-time streaming data and transactional updates.


1.3 Views (Logical Tables)

Purpose: Save complex queries as reusable objects.

A) Permanent View

CREATE VIEW sales_view AS 
SELECT id, amount, date FROM sales_data WHERE amount > 1000;

🔹 Acts like a saved query.


B) Temporary View (Session-Specific)

CREATE TEMP VIEW temp_view AS 
SELECT * FROM sales_data WHERE date > '2024-01-01';

🔹 Exists only during session runtime.


1.4 Functions (User-Defined Functions - UDFs)

Purpose: Create custom functions for advanced data processing. UDFs extend Spark SQL's built-in functions, making complex operations easier.

A) Scalar UDF (Returns Single Value)

Think of Scalar UDFs like a function in Excel that applies a formula to each individual cell separately.

CREATE FUNCTION to_upper AS 'org.apache.spark.sql.functions.upper';

Intended to convert text to uppercase (the class name after AS must point to a UDF implementation available on the classpath).
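
Once registered, the function is called like any built-in function (a minimal sketch; the string literal is just for illustration):

SELECT to_upper('spark sql') AS upper_text;   -- returns 'SPARK SQL'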

B) Aggregate UDF (Returns Aggregated Values)

Think of Aggregate UDFs like SUM or AVG in Excel, where instead of modifying individual cells, they calculate a single output for a group of values.

CREATE FUNCTION custom_sum AS 'org.apache.spark.sql.functions.sum';

🔹 Performs custom aggregation operations.
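
Usage mirrors the built-in aggregates (a minimal sketch against the sales_data table defined earlier, assuming the registration above succeeded):

SELECT custom_sum(amount) AS total_amount FROM sales_data;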

C) Table UDFs

Creating a Table UDF in SQL

The reference to com.example.spark.udtf.SplitString in Spark SQL suggests a User-Defined Table Function (UDTF) that splits a comma-separated string into multiple rows.

Since Spark does not natively support UDTFs in SQL (like Hive does), you must implement it in Java or Scala and register it in Spark before using it.

CREATE FUNCTION split_string_udtf AS 'com.example.spark.udtf.SplitString';

This function takes a string and returns multiple rows.

Step 1: Implementing SplitString UDTF in Java (or Scala)

This class implements the UDTF logic in Java.

package com.example.spark.udtf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.util.ArrayList;

public class SplitString extends GenericUDTF {
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentLengthException("SplitString() takes exactly one argument.");
        }
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0, "Only string type arguments are allowed.");
        }

        ArrayList<String> fieldNames = new ArrayList<>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<>();
        fieldNames.add("word");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        if (args[0] == null) return;
        String input = args[0].toString();
        String[] words = input.split(",");
        for (String word : words) {
            forward(new Object[]{word.trim()});
        }
    }

    @Override
    public void close() throws HiveException { }
}

What This Code Does:

  • Takes a comma-separated string as input.

  • Splits the string into individual words.

  • Emits each word as a separate row.


Step 2: Registering the UDTF in Spark

After compiling and adding the JAR file to Spark, register the UDTF:

CREATE FUNCTION split_string_udtf AS 'com.example.spark.udtf.SplitString';

Step 3: Using the UDTF in SQL

Now, you can call the function in Spark SQL:

SELECT split_string_udtf('apple,banana,orange');

Expected Output:

word
apple
banana
orange

Alternative Approach Without UDTFs

If UDTFs are not available in your Spark setup, you can use the built-in EXPLODE() function:

SELECT EXPLODE(SPLIT('apple,banana,orange', ',')) AS fruit;


D) Pandas UDFs (Vectorized UDFs)

Creating Pandas UDFs in Spark SQL

Pandas UDFs (also called Vectorized UDFs) apply operations on entire columns at once, making them much faster than row-based UDFs.

Example: Creating a Pandas UDF in SQL

Use Case: Multiply a column's values by 2 for better performance.

Register Pandas UDF in Python

Since Pandas UDFs require Python, first define the function in a PySpark notebook.

from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf("double")  # Define return type
def multiply_by_2(s: pd.Series) -> pd.Series:
    return s * 2

# Register the function in Spark SQL
spark.udf.register("multiply_by_2", multiply_by_2)

This function is now available in Spark SQL.


Using the Pandas UDF in SQL

SELECT multiply_by_2(amount) AS doubled_amount FROM sales_data;

Expected Output:

amount | doubled_amount
100 | 200
250 | 500
75 | 150

Pandas UDFs execute much faster than row-based UDFs!

Summary: Table UDFs vs. Pandas UDFs in SQL

Feature | Table UDF (UDTF) | Pandas UDF
Returns | Multiple rows per input | One transformed column
Example | Split "apple,banana" into the rows apple and banana | Multiply column values by 2
Performance | Medium | Fastest (vectorized processing)
Use Case | Splitting strings, flattening arrays | Math transformations, scaling data

1.5 Indexes

Purpose: Improve performance for frequent queries.

Traditional Indexing in Spark SQL (Basic Index)

This is similar to SQL indexing, but Spark does not support direct CREATE INDEX commands like traditional RDBMS (MySQL, PostgreSQL).

Instead, indexing is done using sorting and bucketing.

A) Bucketing (Indexing Alternative)

  • Bucketing groups similar data into fixed-size partitions for faster joins and aggregations.

  • Works like indexing but divides data into pre-defined "buckets".

🔹 Creating a Bucketed Table in Spark

CREATE TABLE sales_bucketed (
    id INT, 
    amount DOUBLE
) USING PARQUET 
CLUSTERED BY (id) INTO 4 BUCKETS;

This clusters data by id into 4 buckets, improving join and lookup performance.

CREATE TABLE sales_bucketed (...)

Creates a new table named sales_bucketed.

id INT, amount DOUBLE

Defines two columns:

  • id → Unique identifier (Integer).

  • amount → Sales amount (Decimal/Double).

USING PARQUET

Stores the table in Parquet format, which is a compressed and optimized file format for big data.

CLUSTERED BY (id) INTO 4 BUCKETS

Organizes the data into 4 buckets based on the id column.

  • All rows with the same id are assigned to the same bucket.

  • Spark pre-assigns the data to buckets, making joins and searches much faster.
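
To verify how the table was bucketed, inspect its metadata; DESCRIBE FORMATTED (already used earlier for managed tables) reports the bucketing spec, typically as Num Buckets and Bucket Columns fields:

DESCRIBE FORMATTED sales_bucketed;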


B) Indexing in Delta Lake (OPTIMIZE + ZORDER)

Delta Lake improves query performance by using indexing mechanisms like Z-Ordering, Data Skipping, and Bloom Filters.

1️⃣ Z-Ordering Index (Multi-Dimensional Indexing)

  • Best for large datasets with multiple filters (e.g., filtering by date and amount).

  • Helps Spark read fewer files instead of scanning all data.

🔹 Creating a Z-Ordering Index

OPTIMIZE delta_sales ZORDER BY (date, amount);

This improves query performance when filtering by date and amount.

Example Query That Runs Faster After Z-Ordering:

SELECT * FROM delta_sales WHERE date > '2024-01-01' AND amount > 5000;

Only relevant files are scanned, making the query faster.

2️⃣ Data Skipping (Automatic Indexing in Delta)

Delta Lake tracks metadata like min and max values in each file, so Spark skips irrelevant files during queries.

How It Works?

  • Each Delta file contains statistics (e.g., min/max values for columns).

  • When running a query, Delta Lake only reads the files that contain relevant data.

Example Query That Benefits From Data Skipping

SELECT * FROM delta_sales WHERE amount > 1000;

Delta Lake automatically avoids scanning files where amount is always below 1000.


3️⃣ Bloom Filter Index (Fast Exact Match Lookup)

Best for finding specific values in large datasets.

  • Works like a search index → quickly tells if a value exists in a file.

  • Used for fast lookups in high-cardinality columns (e.g., unique customer IDs).

Enabling Bloom Indexing in Delta

ALTER TABLE delta_sales 
SET TBLPROPERTIES ('delta.bloomFilter.columns' = 'id');

This enables a Bloom filter index on the id column.

Query That Runs Faster After Bloom Indexing

SELECT * FROM delta_sales WHERE id = 1001;

Instead of scanning all files, Spark directly finds the relevant files.

4️⃣ Indexing Using Partitioning

Partitioning is a type of indexing where data is split into folders based on a column.

  • Best for columns with low cardinality (e.g., year, country).

Creating a Partitioned Delta Table

CREATE TABLE delta_sales (
    id INT,
    amount DOUBLE,
    date STRING
) USING DELTA 
PARTITIONED BY (date);

Example Query That Benefits From Partitioning

SELECT * FROM delta_sales WHERE date = '2024-02-01';

Spark reads only the date=2024-02-01 partition, making it much faster.

Summary: Indexing Methods in Delta Lake

Index Type | Best For | How to Enable?
Z-Ordering | Multi-column filtering | OPTIMIZE ZORDER BY (date, amount);
Data Skipping | Auto-optimized query performance | Enabled by default in Delta
Bloom Filter Index | Fast lookups on unique values | ALTER TABLE SET TBLPROPERTIES ('delta.bloomFilter.columns' = 'id');
Partitioning | Filtering based on low-cardinality columns | PARTITIONED BY (date);

C) Apache Iceberg Indexing (Hidden Partitioning & Sort Order)

Iceberg provides automatic partitioning and indexing, reducing query scan time.

1️⃣ Hidden Partition Indexing (Automatic Partitioning in Iceberg)

Traditional partitioning requires managing folders manually, but Iceberg automatically tracks partitions without needing to create directories.

Why Use Hidden Partitioning?

  • Partition values are derived from column transforms (e.g., days(date)) declared once when the table is created and tracked in Iceberg metadata, so no separate partition columns are needed.

  • Queries filter on the original column and don’t need to reference partitions explicitly.

Creating an Iceberg Table with Hidden Partitioning

CREATE TABLE iceberg_sales (
    id INT,
    amount DOUBLE,
    date DATE
) USING ICEBERG
PARTITIONED BY (days(date));

Iceberg derives daily partitions from the date column behind the scenes.

Query That Benefits From Hidden Partitioning

SELECT * FROM iceberg_sales WHERE date > '2024-01-01';

Spark reads only relevant partitions instead of scanning the entire dataset.


2️⃣ Sorting-Based Indexing (Sort Order)

Iceberg allows sorting at the file level, improving query efficiency for ordered queries.

Why Use Sort Order Indexing?

  • Helps with range-based queries (BETWEEN, ORDER BY).

  • Stores data physically in sorted order, reducing scan time.

Enabling Sort Order Indexing

ALTER TABLE iceberg_sales WRITE ORDERED BY (date, amount);

Example Query That Runs Faster With Sorting

SELECT * FROM iceberg_sales WHERE amount BETWEEN 500 AND 1000 ORDER BY date;

Sorted storage reduces the number of files scanned.


3️⃣ Data Skipping Indexing (Manifest & Metadata Tracking)

Iceberg automatically tracks column statistics (min, max, count, etc.) so Spark skips unnecessary files when executing queries.

How Does It Work?

  • Each data file contains metadata about min/max values for indexed columns.

  • When querying, Iceberg only scans files that contain relevant values.

Example Query That Benefits From Data Skipping

SELECT * FROM iceberg_sales WHERE amount > 5000;

Instead of scanning all files, Iceberg reads only files where amount might be >5000.

4️⃣ Bloom Filter Indexing (Fast Lookup Indexing)

Bloom Filters help with fast, exact-match lookups on high-cardinality columns (like id).

Why Use Bloom Filters?

  • Helps fast lookup queries (WHERE id = 1001).

  • Avoids scanning unnecessary data files.

🔹 Enabling Bloom Filters in Iceberg

ALTER TABLE iceberg_sales SET TBLPROPERTIES (
    'write.metadata.bloom-filter-columns'='id'
);

Example Query That Runs Faster With Bloom Indexing

SELECT * FROM iceberg_sales WHERE id = 1001;

Spark directly finds the relevant file instead of scanning all records.


5️⃣ Secondary Indexing (Apache Iceberg + Apache Lucene)

Apache Iceberg can integrate with Apache Lucene for advanced secondary indexing (experimental feature).

Why Use Secondary Indexing?

  • Helps speed up text searches and non-primary key lookups.

Example Query That Runs Faster With Secondary Indexing

SELECT * FROM iceberg_sales WHERE customer_name LIKE 'John%';

If Lucene indexing is enabled, text-based lookups become much faster.


Summary: Different Indexing Methods in Apache Iceberg

Index Type | Best For | How to Enable?
Hidden Partitioning | Fast partition-based filtering | Declare partition transforms at table creation (e.g., PARTITIONED BY (days(date)))
Sorting-Based Indexing | Range queries, ORDER BY | WRITE ORDERED BY (column)
Data Skipping (Manifest Indexing) | Auto-optimized queries | Enabled by default in Iceberg
Bloom Filter Indexing | Fast lookups on unique values | ALTER TABLE SET TBLPROPERTIES ('write.metadata.bloom-filter-columns'='id');
Secondary Indexing (Lucene) | Full-text search, complex lookups | Experimental (Lucene-based indexing)

D) Apache Hudi Indexing (Bloom Index, Global Index)

Apache Hudi (Hadoop Upserts, Deletes, and Incrementals) provides multiple types of indexing mechanisms to speed up queries and optimize data processing.

Hudi is designed for real-time, incremental, and streaming data workloads, so its indexing methods focus on fast updates, upserts, and lookups.

1️⃣ Bloom Filter Indexing (Default Index in Hudi)

Best for searching records by primary key (id) without scanning all files.

Hudi maintains a Bloom Filter index at the file level, helping quickly check whether a record exists in a file before scanning it.

How to Enable Bloom Filter Indexing in Hudi

CREATE TABLE hudi_sales (
    id INT,
    amount DOUBLE,
    date STRING
) USING HUDI 
TBLPROPERTIES ('hoodie.index.type' = 'BLOOM');

Creates a Bloom Filter index on the id column.

Example Query That Runs Faster With Bloom Indexing

SELECT * FROM hudi_sales WHERE id = 1001;

Instead of scanning all files, Hudi quickly finds the relevant file using Bloom Filters.

2️⃣ Simple Index

Best for fast updates and deletes on transactional datasets.

  • Hudi maintains a mapping of record keys to file locations (faster than scanning all files).

  • Works well for small to medium datasets.

How to Enable Simple Indexing

ALTER TABLE hudi_sales SET TBLPROPERTIES ('hoodie.index.type' = 'SIMPLE');

Example Query That Runs Faster

MERGE INTO hudi_sales AS target
USING updates AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET target.amount = source.amount;

Performs fast upserts using the simple index.

3️⃣ Bucket Indexing (For Large-Scale Data)

Best for large-scale data where primary keys are evenly distributed.
✔️ Divides data into fixed-size buckets, reducing lookup time.

How to Enable Bucket Indexing

ALTER TABLE hudi_sales SET TBLPROPERTIES (
    'hoodie.index.type' = 'BUCKET',
    'hoodie.bucket.index.numBuckets' = '4'
);

Example Query That Runs Faster

SELECT * FROM hudi_sales WHERE id = 5000;

Hudi searches only in the relevant bucket instead of scanning all files.


4️⃣ Global Index (For Multi-Partition Lookups)

Best for searching across multiple partitions in large datasets.
Unlike other indexes, Global Index does not restrict searches to a single partition.

How to Enable Global Indexing

ALTER TABLE hudi_sales SET TBLPROPERTIES ('hoodie.index.type' = 'GLOBAL_BLOOM');

Example Query That Runs Faster

SELECT * FROM hudi_sales WHERE id = 7500;

Finds id = 7500 across multiple partitions without scanning all files.

5️⃣ HBase Indexing (For Large-Scale Upserts)

Best for OLTP-like workloads where upserts are frequent.
✔️ Maintains a mapping of record keys in Apache HBase for ultra-fast lookups.

How to Enable HBase Indexing

ALTER TABLE hudi_sales SET TBLPROPERTIES ('hoodie.index.type' = 'HBASE');

Example Query That Runs Faster

SELECT * FROM hudi_sales WHERE id = 9999;

Uses HBase for fast upserts and searches instead of scanning all files.


Summary: Apache Hudi Indexing Types

Index Type | Best For | How to Enable?
Bloom Filter Indexing (Default) | Fast lookups on primary keys | 'hoodie.index.type' = 'BLOOM'
Simple Indexing | Fast updates and deletes | 'hoodie.index.type' = 'SIMPLE'
Bucket Indexing | Large-scale data with evenly distributed keys | 'hoodie.index.type' = 'BUCKET' + 'hoodie.bucket.index.numBuckets' = '4'
Global Indexing | Searching across multiple partitions | 'hoodie.index.type' = 'GLOBAL_BLOOM'
HBase Indexing | Ultra-fast upserts with external storage | 'hoodie.index.type' = 'HBASE'

E) Materialized Views (Query Indexing Alternative)

Materialized Views precompute results for faster retrieval.

Creating a Materialized View

CREATE MATERIALIZED VIEW mv_sales 
AS SELECT id, SUM(amount) AS total_sales FROM sales_data GROUP BY id;

Querying this view is faster than running the aggregation repeatedly.
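
Subsequent queries then read the precomputed aggregates instead of rescanning sales_data (a minimal sketch; the id value is arbitrary):

SELECT total_sales FROM mv_sales WHERE id = 42;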


Final Summary: Different Indexing Methods in Spark SQL

Index Type | Best For | How to Use in Spark SQL?
Bucketing (Partitioning Alternative) | Faster joins & aggregations | CLUSTERED BY (id) INTO 4 BUCKETS;
Z-Ordering (Delta Lake) | Multi-column filtering | OPTIMIZE ZORDER BY (date, amount);
Hidden Partitioning (Iceberg) | Auto-partitioning & indexing | No explicit indexing required
Bloom Index (Hudi) | Fast record lookups | TBLPROPERTIES ('hoodie.index.type' = 'BLOOM');
Materialized Views | Precomputed query results | CREATE MATERIALIZED VIEW ...

Choose the right indexing strategy based on your dataset and query patterns.


1.6 Partitions (For Large Tables)

Partitioning in Spark SQL improves query performance by logically dividing large datasets into smaller, manageable pieces. Instead of scanning the entire dataset, Spark only reads the relevant partitions, making queries much faster.

A) Single-Column Partitioning

CREATE TABLE sales_partitioned (
    id INT,
    amount DOUBLE,
    date STRING
) USING PARQUET 
PARTITIONED BY (date);

This creates a table partitioned by the date column.
Each unique date will create a separate partition folder.

Query That Runs Faster With Partitioning

SELECT * FROM sales_partitioned WHERE date = '2024-02-01';

Spark reads only the date=2024-02-01 partition instead of scanning all data.

B) Multi-Column Partitioning

For large datasets, multi-column partitioning helps with complex queries.

CREATE TABLE sales_multi_partitioned (
    id INT,
    amount DOUBLE,
    country STRING,
    year INT
) USING PARQUET 
PARTITIONED BY (year, country);

This creates partitions based on year and country.
Data is stored in directories like:

/data/sales/year=2023/country=USA/part-0001.parquet  
/data/sales/year=2023/country=India/part-0002.parquet

Query That Runs Faster With Multi-Column Partitioning

SELECT * FROM sales_multi_partitioned WHERE year = 2023 AND country = 'USA';

Spark scans only /year=2023/country=USA/ instead of the entire table.

3️⃣ Dynamic vs. Static Partitioning

Spark supports both Static and Dynamic Partitioning when inserting data.

A) Static Partitioning

You specify partition values manually when inserting data.

INSERT INTO sales_partitioned PARTITION(date='2024-02-01') 
SELECT id, amount FROM sales_data WHERE date = '2024-02-01';

Data is inserted only into the date=2024-02-01 partition.

B) Dynamic Partitioning

  • Spark automatically detects partition values while inserting data.

  • Useful for bulk data loading.

SET spark.sql.sources.partitionOverwriteMode = DYNAMIC;
INSERT INTO sales_partitioned 
SELECT id, amount, date FROM sales_data;

Spark automatically creates partitions based on date values.

4️⃣ Optimizing Partitioned Queries

Partitioning is effective only when queries use filters on partitioned columns.

A) Enabling Partition Pruning

Partition pruning ensures Spark only scans relevant partitions.

SELECT * FROM sales_partitioned WHERE date >= '2024-01-01';

Spark reads only partitions where date >= 2024-01-01 instead of full table scans.

B) Optimizing Query Performance With Partition Indexing

For Delta Tables, use ZORDER BY for better performance.

OPTIMIZE sales_partitioned ZORDER BY (amount);

Z-Ordering clusters similar data together inside partitions for faster queries.

Summary: Creating & Using Partitions in Spark SQL

Feature | What It Does | Example Query
Single-Column Partitioning | Creates partitions by one column | PARTITIONED BY (date);
Multi-Column Partitioning | Creates partitions based on multiple columns | PARTITIONED BY (year, country);
Static Partitioning | Manually insert data into a specific partition | PARTITION (date='2024-02-01')
Dynamic Partitioning | Automatically detects partitions when inserting data | INSERT INTO sales_partitioned SELECT ...
Partition Pruning | Skips unnecessary partitions for faster queries | WHERE date = '2024-02-01'
Z-Ordering (Delta Optimization) | Optimizes query performance inside partitions | OPTIMIZE sales_partitioned ZORDER BY (amount);

1.7 Buckets (Cluster Data for Performance)

Purpose: Improve query speed by distributing data into equal-sized buckets.

Types of Bucketing in Spark SQL

1️⃣ Hash-Based Bucketing (Default)

  • Uses a hash function to distribute data across buckets.

  • Ensures even distribution of data across all buckets.

  • Best for: Faster joins, lookups, and aggregations on high-cardinality columns.

Example: Creating a Hash-Based Bucketed Table

CREATE TABLE sales_bucketed (
    id INT,
    amount DOUBLE
) USING PARQUET 
CLUSTERED BY (id) INTO 4 BUCKETS;

Spark creates 4 buckets based on the hash of id.

Query That Benefits From Hash Bucketing

SELECT * FROM sales_bucketed WHERE id = 1001;

Spark directly looks in the relevant bucket instead of scanning all data.

2️⃣ Range-Based Bucketing

  • Divides data into buckets based on a range of values (instead of hashing).

  • Ensures data locality, improving query performance.

  • Best for: Queries involving range filters (BETWEEN, ORDER BY).

Example: Creating a Range-Based Bucketed Table

CREATE TABLE salary_bucketed (
    employee_id INT,
    salary DOUBLE
) USING PARQUET 
CLUSTERED BY (salary) INTO 5 BUCKETS;

Spark distributes salaries into 5 buckets based on defined salary ranges.

Query That Benefits From Range Bucketing

SELECT * FROM salary_bucketed WHERE salary BETWEEN 5000 AND 10000;

Spark only scans the relevant buckets instead of all rows.

3️⃣ Multi-Column Bucketing (Composite Buckets)

  • Uses multiple columns to distribute data into buckets.

  • Best for: Multi-key joins, filtering, and grouped queries.

Example: Creating a Multi-Column Bucketed Table

CREATE TABLE customer_bucketed (
    customer_id INT,
    region STRING,
    amount DOUBLE
) USING PARQUET 
CLUSTERED BY (customer_id, region) INTO 6 BUCKETS;

Data is bucketed first by customer_id, then by region.

Query That Benefits From Multi-Column Bucketing

SELECT * FROM customer_bucketed WHERE customer_id = 101 AND region = 'US';

Spark quickly finds the bucket with both conditions, reducing scan time.

4️⃣ Bucketed Tables with Sorted Data (Hybrid Bucketing)

  • Combines bucketing and sorting for further optimization.

  • Best for: Faster aggregation & range queries (GROUP BY, ORDER BY).

Example: Creating a Bucketed Table With Sorted Data

CREATE TABLE sales_sorted_bucketed (
    id INT,
    amount DOUBLE,
    date STRING
) USING PARQUET 
CLUSTERED BY (id) SORTED BY (date) INTO 4 BUCKETS;

Buckets data by id and sorts within each bucket by date.

Query That Benefits From Sorted Bucketing

SELECT * FROM sales_sorted_bucketed WHERE id = 500 ORDER BY date;

Spark avoids scanning unnecessary buckets and retrieves sorted data quickly.

Optimizing Queries on Bucketed Tables

Make sure bucketed table support is enabled (it is on by default) so Spark can prune buckets and avoid unnecessary scans

SET spark.sql.sources.bucketing.enabled = true;

Join two bucketed tables efficiently

SELECT * 
FROM sales_bucketed s 
JOIN customers_bucketed c 
ON s.id = c.customer_id;

When both tables are bucketed on the join keys with the same number of buckets, Spark can avoid shuffle operations, making the join faster.
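
To check that the shuffle is actually avoided, inspect the physical plan; if the bucketing lines up, no Exchange (shuffle) step should appear around the join (EXPLAIN is standard Spark SQL):

EXPLAIN FORMATTED
SELECT * 
FROM sales_bucketed s 
JOIN customers_bucketed c 
ON s.id = c.customer_id;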

Summary: Types of Bucketing in Spark SQL

Bucket Type | Best For | Example
Hash-Based Bucketing | Joins & exact match lookups | CLUSTERED BY (id) INTO 4 BUCKETS;
Range-Based Bucketing | Range queries (BETWEEN, ORDER BY) | CLUSTERED BY (salary) INTO 5 BUCKETS;
Multi-Column Bucketing | Optimized multi-key lookups | CLUSTERED BY (customer_id, region) INTO 6 BUCKETS;
Sorted Bucketing | Faster aggregations & range queries | CLUSTERED BY (id) SORTED BY (date) INTO 4 BUCKETS;

In Delta & Iceberg, indexing helps Spark find data faster without scanning the entire table.


1.8 Indexing in Delta Lake

Delta Lake supports multiple types of indexing, including Z-Ordering, Bloom Filters, and Secondary Indexing.

A) Z-Ordering (Multi-Column Sorting for Faster Queries)

🔹 Best for range queries & multi-column filtering
🔹 Example Use Case: Finding transactions by date and amount quickly.

Creating a Z-Ordered Index in Delta

OPTIMIZE delta_sales ZORDER BY (date, amount);

✔️ Reorders data inside Delta files to make queries like this faster:

SELECT * FROM delta_sales WHERE date > '2024-01-01' AND amount > 5000;

Instead of scanning all files, Spark reads only the relevant ones.

B) Bloom Filter Indexing (Fast Exact Match Lookups)

🔹 Best for finding unique values in large datasets (e.g., customer_id, email).
🔹 Example Use Case: Checking if a customer exists in a billion-record table.

Enabling Bloom Filter Indexing in Delta

ALTER TABLE delta_sales 
SET TBLPROPERTIES ('delta.bloomFilter.columns' = 'customer_id');

✔️ Optimized for queries like:

SELECT * FROM delta_sales WHERE customer_id = 'CUST1234';

Delta skips unnecessary file scans & finds the exact match faster.

C) Secondary Indexing in Delta Lake

🔹 Best for creating indexes on frequently searched columns.
🔹 Example Use Case: Speeding up product_category searches in e-commerce data.

Enabling Secondary Indexing in Delta
1️⃣ Install Hyperspace for Spark (An Open-Source Secondary Indexing Tool)

from hyperspace import Hyperspace
hs = Hyperspace(spark)

2️⃣ Create a Secondary Index on product_category

hs.createIndex(
    deltaTable=df, 
    indexName="product_category_index",
    indexedColumns=["product_category"]
)

✔️ Query That Runs Faster With Secondary Index

SELECT * FROM delta_sales WHERE product_category = 'Electronics';

Instead of scanning the entire Delta table, Spark reads the prebuilt index.

1.9 Indexing in Apache Iceberg

Iceberg automatically indexes partitions & supports secondary indexing for complex queries.

A) Hidden Partition Indexing (Auto-Partitioning)

🔹 Best for partitioned tables (e.g., date or region).
🔹 Example Use Case: Speeding up queries on year partitions.

Creating an Iceberg Table With Hidden Partitioning

CREATE TABLE iceberg_sales (
    id INT,
    amount DOUBLE,
    year INT
) USING ICEBERG
PARTITIONED BY (year);

✔️ Query That Runs Faster With Auto-Partition Indexing

SELECT * FROM iceberg_sales WHERE year = 2024;

Only scans the year=2024 partition instead of all data.

B) Bloom Filter Indexing in Iceberg

🔹 Best for finding records in large datasets (e.g., searching for a specific email in user logs).
🔹 Example Use Case: Quickly checking if a transaction exists by transaction_id.

Enabling Bloom Filters in Iceberg

ALTER TABLE iceberg_sales SET TBLPROPERTIES (
    'write.metadata.bloom-filter-columns'='transaction_id'
);

Query That Runs Faster

SELECT * FROM iceberg_sales WHERE transaction_id = 'TXN789123';

Quickly finds transactions instead of scanning all records.

C) Lucene-Based Secondary Indexing (For Text & Complex Queries)

🔹 Best for full-text search (e.g., finding product_description containing "wireless").
🔹 Example Use Case: Searching large Iceberg tables for keyword matches.

Using Lucene Indexing in Iceberg

CREATE INDEX product_desc_index ON iceberg_sales (product_description)
USING LUCENE;

Optimized for queries like:

SELECT * FROM iceberg_sales WHERE product_description LIKE '%wireless%';

Instead of scanning billions of rows, Lucene finds matches instantly.

Summary: Delta vs. Iceberg Indexing

Index Type | Delta Lake | Apache Iceberg | Best For
Z-Ordering | ✅ Supported (OPTIMIZE ZORDER BY) | ❌ Not available | Multi-column filtering
Hidden Partition Indexing | ❌ Not automatic (must PARTITION BY) | ✅ Built-in (partition transforms) | Partitioned queries
Bloom Filter Indexing | delta.bloomFilter.columns | write.metadata.bloom-filter-columns | Fast exact-match lookups
Secondary Indexing | Hyperspace (external tool) | Lucene (experimental) | Non-primary key lookups
Text Search Indexing | ❌ Not built-in (external tools needed) | Lucene (full-text search) | Searching large text fields

Choose Delta Lake if you need ACID transactions & Spark-native optimizations.
Choose Iceberg if you need automatic partitioning & text-based indexing.

Real-World Use Cases

Use Case | Best Indexing Type | Table Format
Speeding up customer_id lookups | Bloom Filter Indexing | Delta & Iceberg
Faster filtering on date & amount | Z-Ordering | Delta
Searching for exact product_id | Secondary Index | Delta & Iceberg
Text search in product_description | Lucene Index | Iceberg

1.10 Temporal Tables & Streaming Tables in Spark SQL

Temporal Tables and Streaming Tables are used in real-time analytics to track changes, analyze historical data, and process continuous data streams.


1️⃣ Temporal Tables in Spark SQL

Temporal Tables store historical versions of data, allowing time travel (querying past data) and slowly changing dimensions (SCDs).

A) Delta Lake Time Travel (Version-Based Temporal Tables)

Best for: Auditing, historical analysis, undo operations.
Automatically stores past versions of data.
Uses VERSION AS OF or TIMESTAMP AS OF.

Creating a Delta Table With Time Travel

CREATE TABLE delta_sales (
    id INT,
    amount DOUBLE,
    date STRING
) USING DELTA;

Querying a Previous Version

SELECT * FROM delta_sales VERSION AS OF 5;

Querying Data at a Specific Timestamp

SELECT * FROM delta_sales TIMESTAMP AS OF '2024-02-01 10:00:00';

This helps analyze how data looked at a past point in time.


B) Slowly Changing Dimensions (SCD) Type 2 Temporal Table

Best for: Tracking changes in customer, product, or employee records over time.
Each change creates a new row with start_date & end_date.

Creating an SCD Type 2 Table

CREATE TABLE customer_history (
    customer_id INT,
    name STRING,
    status STRING,
    start_date TIMESTAMP,
    end_date TIMESTAMP
) USING DELTA;

Inserting a New Record

INSERT INTO customer_history 
VALUES (1, 'John Doe', 'Active', '2024-02-01', NULL);

Updating a Record (Closing Old Version & Adding New)

UPDATE customer_history 
SET end_date = '2024-03-01' 
WHERE customer_id = 1 AND end_date IS NULL;

INSERT INTO customer_history 
VALUES (1, 'John Doe', 'Inactive', '2024-03-01', NULL);

Now, the table maintains a full history of changes.
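
With this layout, the current version of each customer is simply the row whose end_date is still NULL (a minimal sketch against the customer_history table above):

-- Latest record per customer
SELECT * FROM customer_history WHERE end_date IS NULL;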

2️⃣ Streaming Tables in Spark SQL (Real-Time Data Processing)

Streaming Tables process continuous data streams from sources like Kafka, Event Hubs, or Cloud Storage.


A) Creating a Streaming Table From Kafka

Best for: Processing event streams, logs, user activity tracking.
Reads data continuously without stopping.

Creating a Streaming Table (Kafka Source)

CREATE STREAMING TABLE kafka_sales_stream (
    id INT,
    amount DOUBLE,
    event_time TIMESTAMP
) USING KAFKA 
OPTIONS (
    "kafka.bootstrap.servers" = "localhost:9092",
    "subscribe" = "sales_topic"
);

Querying the Streaming Table in Real-Time

SELECT * FROM kafka_sales_stream;

This continuously pulls new records as they arrive in Kafka.

B) Creating a Streaming Table From Delta Lake

Best for: Processing real-time changes in a Delta Table.
Useful for CDC (Change Data Capture) & Live Dashboards.

Creating a Streaming Table From a Delta Source

CREATE STREAMING TABLE live_sales_stream
USING DELTA
LOCATION '/mnt/delta/sales';

Querying the Streaming Delta Table

SELECT * FROM live_sales_stream;

Updates as soon as new data is written to /mnt/delta/sales.

C) Creating a Streaming Table in Iceberg

Best for: Incremental processing in Iceberg tables.

Creating a Streaming Table From Iceberg

CREATE STREAMING TABLE iceberg_sales_stream
USING ICEBERG
LOCATION 's3://my-bucket/iceberg_sales';

Querying Iceberg Streaming Data

SELECT * FROM iceberg_sales_stream;

Supports automatic snapshot-based incremental queries.

Summary: Temporal vs. Streaming Tables in Spark SQL

Feature | Delta Lake Temporal Tables | Streaming Tables (Delta, Kafka, Iceberg)
Best For | Historical data tracking, SCD, time travel | Real-time processing, event-driven data
Time Travel Query | VERSION AS OF, TIMESTAMP AS OF | ❌ Not supported
Slowly Changing Dimensions | ✅ Yes (start_date, end_date) | ❌ No
Real-Time Updates | ❌ No | ✅ Yes
Example Source | Historical customer records | Kafka, Delta, Iceberg, streaming APIs

Use Temporal Tables for historical analysis & tracking changes.
Use Streaming Tables for real-time data ingestion & event processing.

1.11 Materialized Views

Purpose: Cache precomputed results for fast queries.

A) Creating a Materialized View

CREATE MATERIALIZED VIEW mv_sales 
AS SELECT id, SUM(amount) AS total_sales FROM sales_data GROUP BY id;

🔹 Improves performance for frequent aggregations.

1.12 Schema & Metadata Objects

Purpose: Manage structured data definitions.

A) Show Schema

DESCRIBE TABLE sales_data;

B) Alter Schema

ALTER TABLE sales_data ADD COLUMNS (category STRING);

Summary: Latest Objects You Can Create in Spark SQL

This table summarizes all Spark SQL object types, their creation methods, and their purpose based on the latest features.

🔹 1️⃣ Databases (Catalogs & Namespaces)

Object Type | Example | Purpose
Database | CREATE DATABASE sales_db; | Organize structured data like schemas in a database.

🔹 2️⃣ Tables (Managed, External, Temporary, Delta, Iceberg, Hudi)

Table Type | Example | Purpose
Managed Table | CREATE TABLE sales_data (id INT, amount DOUBLE) USING PARQUET; | Spark manages storage and metadata.
External Table | CREATE EXTERNAL TABLE sales_external (id INT, amount DOUBLE) USING PARQUET LOCATION 's3://my-bucket/sales/'; | Stores schema in Spark but data in external storage (S3, ADLS, HDFS).
Temporary Table | CREATE TEMP VIEW temp_sales AS SELECT * FROM sales_data; | Exists only in session memory, disappears after session ends.
Global Temporary View | CREATE GLOBAL TEMP VIEW global_sales AS SELECT * FROM sales_data; | Accessible across sessions until the Spark application terminates.
Delta Table | CREATE TABLE delta_sales USING DELTA AS SELECT * FROM parquet.`/mnt/datalake/sales/`; | Provides ACID transactions, schema evolution, time travel.
Iceberg Table | CREATE TABLE iceberg_sales USING ICEBERG AS SELECT * FROM parquet.`/mnt/datalake/sales/`; | Supports hidden partitioning, time travel, and schema evolution.
Hudi Table | CREATE TABLE hudi_sales USING HUDI AS SELECT * FROM parquet.`/mnt/datalake/sales/`; | Supports upserts, incremental queries, and real-time ingestion.

🔹 3️⃣ Views (Logical Tables)

View Type | Example | Purpose
Permanent View | CREATE VIEW sales_view AS SELECT id, amount FROM sales_data WHERE amount > 1000; | Saves a query definition for reuse across sessions.
Temporary View | CREATE TEMP VIEW temp_view AS SELECT * FROM sales_data WHERE date > '2024-01-01'; | Exists only within the current session.

🔹 4️⃣ Functions (User-Defined Functions - UDFs)

UDF Type | Example | Purpose
Scalar UDF | CREATE FUNCTION to_upper AS 'org.apache.spark.sql.functions.upper'; | Applies a transformation to single row values (e.g., converts text to uppercase).
Aggregate UDF | CREATE FUNCTION custom_sum AS 'org.apache.spark.sql.functions.sum'; | Aggregates multiple rows into a single value (e.g., SUM, AVG).
Table UDF (UDTF) | CREATE FUNCTION split_string_udtf AS 'com.example.spark.udtf.SplitString'; | Converts one row into multiple rows (e.g., splitting strings into rows).
Pandas UDF (Vectorized UDF) | @pandas_udf("double") | Applies batch-based transformations using Pandas for faster execution.

🔹 5️⃣ Indexes (Query Optimization)

Index Type | Example | Purpose
Bucketing (Indexing Alternative) | CREATE TABLE sales_bucketed CLUSTERED BY (id) INTO 4 BUCKETS; | Groups data into fixed buckets for faster joins & lookups.
Z-Ordering (Delta Lake) | OPTIMIZE delta_sales ZORDER BY (date, amount); | Optimizes multi-column filtering by minimizing file scanning.
Hidden Partitioning (Iceberg) | CREATE TABLE iceberg_sales USING ICEBERG; | Auto-indexing without explicit partition management.
Bloom Filter Indexing (Hudi) | ALTER TABLE hudi_sales SET TBLPROPERTIES ('hoodie.index.type' = 'BLOOM'); | Fast lookups on high-cardinality columns like id.
Materialized Views (Query Indexing Alternative) | CREATE MATERIALIZED VIEW mv_sales AS SELECT id, SUM(amount) FROM sales_data GROUP BY id; | Precomputes results for faster querying.

🔹 6️⃣ Partitions (Large Table Optimization)

Partition Type | Example | Purpose
Single-Column Partitioning | CREATE TABLE sales_partitioned ... PARTITIONED BY (date); | Splits data into folders based on a single column.
Multi-Column Partitioning | CREATE TABLE sales_multi_partitioned ... PARTITIONED BY (year, country); | Partitions by multiple columns for complex queries.
Static Partitioning | INSERT INTO sales_partitioned PARTITION(date='2024-02-01') SELECT id, amount FROM sales_data; | Manually assigns partition values when inserting data.
Dynamic Partitioning | INSERT INTO sales_partitioned SELECT id, amount, date FROM sales_data; | Automatically detects partitions while inserting data.

🔹 7️⃣ Buckets (Cluster Data for Performance)

Bucket Type | Example | Purpose
Hash-Based Bucketing | CREATE TABLE sales_bucketed ... CLUSTERED BY (id) INTO 4 BUCKETS; | Speeds up joins & lookups by hashing id.
Range-Based Bucketing | CREATE TABLE salary_bucketed ... CLUSTERED BY (salary) INTO 5 BUCKETS; | Optimizes range queries (BETWEEN, ORDER BY).
Multi-Column Bucketing | CREATE TABLE customer_bucketed ... CLUSTERED BY (customer_id, region) INTO 6 BUCKETS; | Best for multi-key joins & grouped queries.
Sorted Bucketing | CREATE TABLE sales_sorted_bucketed ... CLUSTERED BY (id) SORTED BY (date) INTO 4 BUCKETS; | Optimizes range queries & aggregations.

🔹 8️⃣ Materialized Views (Precomputed Query Caching)

Materialized View Type | Example | Purpose
Standard Materialized View | CREATE MATERIALIZED VIEW mv_sales AS SELECT id, SUM(amount) FROM sales_data GROUP BY id; | Stores precomputed query results for faster performance.
Incremental Refresh Materialized View | CREATE MATERIALIZED VIEW mv_incremental_sales USING DELTA LIVE TABLE AS SELECT id, SUM(amount) FROM live_delta_sales; | Automatically updates as new data arrives.
Partitioned Materialized View | CREATE MATERIALIZED VIEW mv_partitioned_sales PARTITIONED BY (year, region) AS SELECT year, region, SUM(amount) FROM sales_data GROUP BY year, region; | Optimized for filtering by partitioned columns.
Auto-Refresh Materialized View (Databricks) | ALTER MATERIALIZED VIEW mv_sales SET TBLPROPERTIES ('delta.autoOptimize.autoCompact' = 'true'); | Automatically refreshes on a schedule.
Auto-Refresh Materialized View (Databricks)ALTER MATERIALIZED VIEW mv_sales SET TBLPROPERTIES ('delta.autoOptimize.autoCompact' = 'true');Automatically refreshes on a schedule.