Understanding Databricks: The Unified Data Platform for Modern Analytics
What is Databricks, and Why Do We Need It?
Databricks is a unified analytics platform designed to simplify big data processing, AI/ML workloads, and real-time analytics. Built on Apache Spark, it bridges the gap between Data Lakes and Data Warehouses, enabling organizations to store, process, and analyze massive datasets seamlessly.
Why Do We Need Databricks?
- Scalability: Processes massive data volumes efficiently.
- Unified Platform: Combines batch, streaming, and ML workflows.
- Cost-Effective: Optimizes cloud compute and storage.
- Performance: Uses Photon Engine for faster execution.
- Collaboration: Provides interactive notebooks for data teams.
- Governance: Ensures security and compliance with Unity Catalog.
Who Are the Main Users of Databricks?
- Data Engineers — Build ETL pipelines and process big data.
- Data Scientists — Train and deploy ML models.
- Data Analysts — Query and visualize data using SQL.
- Business Analysts — Generate reports and insights.
- DevOps & IT Teams — Manage infrastructure and governance.
Databricks Architecture: Control Plane & Data Plane
Databricks operates on a two-layered architecture to separate compute and storage, improving security and scalability.
1. Control Plane
- Managed by Databricks.
- Handles UI, job scheduling, cluster management, and notebooks.
- Stores metadata, logs, and query history.
- No customer data is stored here.
2. Data Plane
- Managed by your cloud provider (AWS, Azure, GCP).
- Stores and processes actual data in cloud storage (S3, ADLS, GCS).
- Runs compute clusters for Spark jobs and ML models.
How They Work Together:
- The Control Plane orchestrates workloads, while the Data Plane executes them.
- This separation ensures security (since Databricks never stores raw data) and scalability (as compute and storage are independently managed).
Core Components of Databricks
1. Workspace
- A collaborative environment where teams organize notebooks, jobs, and data assets.
- Supports version control, folder structure, and user management.
2. Notebook
- Interactive coding environment supporting Python, SQL, Scala, and R.
- Used for ETL, ML, analytics, and visualization.
3. Compute (Clusters)
Clusters are the backbone of Databricks for running Spark jobs and ML workloads.
Types of Clusters:
- All-Purpose Clusters — Interactive use by multiple users.
- Job Clusters — Temporary clusters for scheduled jobs.
- SQL Warehouses — Optimized for SQL workloads and BI tools.
4. Access Modes
- Single User: Dedicated to one user.
- Shared: Multiple users can run workloads.
- No Isolation: Full cluster access without user separation.
5. Node Count & Autoscaling
- Node Count: Defines the number of virtual machines (VMs) in the cluster.
- Autoscaling: Dynamically adjusts node count based on workload demand.
- Auto Shutdown: Shuts down inactive clusters to save costs.
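As a rough sketch, the settings above map to fields in a cluster definition. The dictionary below follows the style of the Databricks Clusters API; the runtime version and node type are illustrative placeholders, so adjust them to your cloud and workspace.
# Minimal sketch of a cluster definition (Clusters API style).
# spark_version and node_type_id are placeholder values.
cluster_config = {
    "cluster_name": "etl-autoscaling-cluster",
    "spark_version": "14.3.x-scala2.12",   # Databricks Runtime (example)
    "node_type_id": "i3.xlarge",           # VM type, cloud-specific
    "autoscale": {                          # autoscaling between 2 and 8 workers
        "min_workers": 2,
        "max_workers": 8,
    },
    "autotermination_minutes": 30,          # auto shutdown after 30 idle minutes
}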
Governance & Access Management in Databricks
1. Unity Catalog
- Centralized data governance for managing metadata, access, and lineage.
- Provides fine-grained access control across workspaces.
- Supports multi-cloud governance.
2. Metastore
- Stores metadata for databases, tables, and views.
- Integrated with Unity Catalog for enhanced governance.
3. Catalog
- Logical structure to organize schemas (databases) and tables.
- Provides a hierarchical organization of data assets.
4. Role-Based Access Control (RBAC)
- Ensures secure access management with predefined roles.
- Assigns permissions at catalog, schema, table, or column levels.
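To make fine-grained access concrete, here is a small sketch of Unity Catalog grants run from a notebook; the catalog (main), schema (sales), table (orders), and group (data_analysts) names are placeholders for this example.
# Grant a hypothetical analyst group read access down the catalog hierarchy.
spark.sql("GRANT USE CATALOG ON CATALOG main TO `data_analysts`")
spark.sql("GRANT USE SCHEMA ON SCHEMA main.sales TO `data_analysts`")
spark.sql("GRANT SELECT ON TABLE main.sales.orders TO `data_analysts`")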
Job Scheduling & Workflow Management
1. Jobs in Databricks
- Automates ETL, ML, and reporting tasks.
- Can run notebooks, Python scripts, JARs, or SQL queries.
2. Scheduling Types
- On-Demand — Triggered manually.
- Scheduled — Runs at fixed intervals.
- Triggered — Starts based on external events.
3. Execution Modes
- Single Task Job — Runs one script or notebook.
- Multi-Task Job — Runs multiple tasks sequentially or in parallel.
4. Workflows in Databricks
- Allows orchestrating complex pipelines using Jobs.
- Supports dependencies between tasks.
- Can integrate with external schedulers like Airflow.
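To make the multi-task idea concrete, here is a minimal sketch of a job definition in the style of the Jobs API, with a second task that depends on the first; the notebook paths, cluster id, and cron expression are illustrative placeholders.
# Sketch of a multi-task job payload (Jobs API style); values are placeholders.
job_definition = {
    "name": "daily_sales_pipeline",
    "tasks": [
        {
            "task_key": "ingest",
            "notebook_task": {"notebook_path": "/Repos/etl/ingest"},
            "existing_cluster_id": "<cluster-id>",
        },
        {
            "task_key": "transform",
            "depends_on": [{"task_key": "ingest"}],  # runs after "ingest" succeeds
            "notebook_task": {"notebook_path": "/Repos/etl/transform"},
            "existing_cluster_id": "<cluster-id>",
        },
    ],
    "schedule": {
        "quartz_cron_expression": "0 0 2 * * ?",  # every day at 02:00
        "timezone_id": "UTC",
    },
}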
Apache Spark Architecture in Databricks
Databricks is powered by Apache Spark, a distributed computing framework designed for large-scale data processing.
1. Basic Components
- Driver: The central coordinator that schedules tasks.
- Executor Nodes: Distributed workers executing tasks in parallel.
- Cluster Manager: Allocates resources and manages nodes (Databricks manages this internally).
2. Spark Job Execution Flow
- The Driver breaks the job into stages and tasks.
- The Cluster Manager assigns resources to the Executor Nodes.
- Each Executor processes tasks in parallel across RDDs (Resilient Distributed Datasets).
- Results are collected and sent back to the Driver.
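The toy job below illustrates this flow: the lambda inside map() runs in parallel on the executors, and collect() is the action that brings the results back to the driver.
# map() is distributed across executors; collect() returns results to the driver.
rdd = spark.sparkContext.parallelize(range(1, 9), numSlices=4)  # 4 partitions
squared = rdd.map(lambda x: x * x)   # transformation, nothing runs yet
print(squared.collect())             # action: [1, 4, 9, 16, 25, 36, 49, 64]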
3. Why Spark Works Well in Databricks
- Optimized Cluster Management: Databricks automates provisioning and scaling.
- Photon Engine: Enhances Spark’s performance with advanced query execution.
- Delta Lake Integration: Ensures reliability and ACID transactions.
- Easy Debugging & Monitoring: UI provides job execution tracking and logs.
Common Topics in Databricks
1. Data Sources
Databricks supports structured, semi-structured, and unstructured data sources.
Example: Connecting to an S3 Parquet File
df = spark.read.format("parquet").load("s3://your-bucket/path/file.parquet")
df.show()
3. DataFrame vs RDD in PySpark
A DataFrame is a higher-level, schema-aware API that benefits from Spark's query optimizer, while an RDD is the lower-level distributed collection underneath; you can access a DataFrame's underlying RDD via the .rdd attribute.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()
df = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["id", "name"])
rdd = df.rdd
df.show()
print(rdd.collect())
3. Lazy Evaluation & Execution
Spark operations are not executed immediately. They are lazy, meaning they wait until an action (e.g., count(), collect()) is triggered.
Example of Lazy Execution:
df = spark.read.csv("data.csv", header=True)
df.select("column1") # This transformation is not executed until an action occurs.
df.show() # Action triggers execution.
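To confirm that nothing has actually run, you can ask Spark for the plan it has built so far; explain() prints the query plan without reading any data.
df_selected = df.select("column1")  # still lazy
df_selected.explain()               # shows the plan; no computation is triggered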
4. Transformations (Narrow & Wide)
- Narrow Transformations: Each partition depends on a single parent (e.g., map, filter). Spark does not need to move data between partitions; these execute in memory without expensive network operations.
rdd = spark.sparkContext.parallelize([1, 2, 3, 4])
mapped_rdd = rdd.map(lambda x: x * 2) # Each element is transformed independently
print(mapped_rdd.collect()) # Output: [2, 4, 6, 8]
- Wide Transformations: The output partition depends on multiple input partitions. Data is redistributed across partitions over the network, which requires a shuffle (repartitioning & sorting). Example operations: groupByKey(), reduceByKey(), join(), distinct().
rdd = spark.sparkContext.parallelize([("a", 1), ("b", 2), ("a", 3)])
grouped_rdd = rdd.groupByKey()
print([(k, list(v)) for k, v in grouped_rdd.collect()])
# Output: [('a', [1, 3]), ('b', [2])]
These are some of the most commonly used transformation functions in Databricks.
5. Actions
In Spark, Actions are operations that trigger execution and return results to the driver or write output to external storage. Unlike Transformations (which are lazy), Actions force computation of the RDD or DataFrame.
Example: Actions vs. Transformations
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
# Transformation (Lazy)
mapped_rdd = rdd.map(lambda x: x * 2) # No computation yet
# Action (Triggers Execution)
result = mapped_rdd.collect() # Now Spark executes map()
print(result) # Output: [2, 4, 6, 8, 10]
6. Caching & Persistence
In Apache Spark, cache() and persist() help optimize repeated computations by storing RDDs or DataFrames in memory (or on disk) for faster access.
- Cache (cache()): Stores data in memory for faster access. Data is cached only when an action is triggered. Uses the default storage level (MEMORY_ONLY for RDDs), which keeps data in RAM and recomputes it if lost.
df = spark.range(1, 1000000) # Creating a DataFrame
df_cached = df.cache() # Cache the DataFrame in memory
df_cached.count() # Triggers caching
df_cached.show() # Faster execution since data is now cached
- Persist (persist(level)): More flexible than cache(). Allows different storage levels (memory, disk, or both); data is stored in memory and/or on disk according to the chosen storage level.
from pyspark import StorageLevel
df_persisted = df.persist(StorageLevel.MEMORY_AND_DISK) # Stores in both memory and disk
df_persisted.count() # Triggers persistence
df_persisted.show() # Faster access, even if memory is full
7. User-Defined Functions (UDFs)
UDFs allow you to extend Spark’s functionality by defining custom functions in Python and applying them to DataFrames.
1️⃣ Simple Python Function
def square(x):
    return x * x
2️⃣ Convert to a Spark UDF
Use udf() to register a function as a Spark UDF.
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
square_udf = udf(square, IntegerType()) # Define UDF with return type
3️⃣ Apply UDF to a DataFrame
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("UDF Example").getOrCreate()
# Sample DataFrame
df = spark.createDataFrame([(1,), (2,), (3,)], ["num"])
# Apply UDF
df.withColumn("square", square_udf(df.num)).show()
+---+------+
|num|square|
+---+------+
| 1| 1|
| 2| 4|
| 3| 9|
+---+------+
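UDFs created with udf() are used through the DataFrame API; if you also want to call the function from SQL, register it by name. A small sketch reusing the square function above (the SQL name and temp view are just for this example).
# Register the same Python function for SQL and call it on a temp view.
spark.udf.register("square_sql", square, IntegerType())
df.createOrReplaceTempView("numbers")
spark.sql("SELECT num, square_sql(num) AS square FROM numbers").show()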
8. Delta Table
A Delta Table is a structured table format built on Apache Parquet, with ACID transactions and schema enforcement.
✅ Key Features:
- ACID Transactions (ensures consistency)
- Schema Enforcement & Evolution
- Time Travel (query historical data)
- Data Versioning
- Efficient Upserts, Deletes (MERGE INTO)
Creating a Delta Table
CREATE TABLE my_delta_table (
id INT,
name STRING
) USING DELTA;
Writing Data to a Delta Table in PySpark
df.write.format("delta").mode("overwrite").saveAsTable("my_delta_table")
9. Delta Live Table (DLT)
Delta Live Tables automate data pipeline management using declarative definitions.
✅ Key Features:
- Automated ETL with Quality Checks
- Streaming + Batch Processing in One Table
- Data Quality Rules (Expectations)
- Automated Schema Evolution & Pipeline Orchestration
Creating a Delta Live Table
import dlt
from pyspark.sql.functions import col
@dlt.table(
    comment="This is an automatically managed Delta Live Table."
)
def bronze_table():
    return spark.readStream.format("json").load("/mnt/raw_data")
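Building on the bronze table above, a downstream DLT table can attach a data-quality expectation. This is a sketch that assumes the raw JSON contains user_id and event_type columns; adjust the columns and rule to your data.
# Silver DLT table with a quality rule: rows with a NULL user_id are dropped.
@dlt.table(comment="Cleaned events with a basic quality check.")
@dlt.expect_or_drop("valid_user_id", "user_id IS NOT NULL")
def silver_table():
    return dlt.read_stream("bronze_table").select("user_id", "event_type")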
📌 DLT vs. Delta Table?
🔹 Delta Table → You manage it manually
🔹 DLT → Databricks automates pipeline management
10. Managed vs. Unmanaged Tables
Managed and Unmanaged Tables define how Databricks stores data.
Managed Table Example
CREATE TABLE managed_table (id INT, name STRING) USING DELTA;
Data is stored in Databricks’ default location.
Unmanaged (External) Table Example
CREATE TABLE unmanaged_table
USING DELTA
LOCATION 's3://my-bucket/delta-table/';
Data remains in S3, even if the table is dropped!
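One quick way to see the difference is to inspect each table's metadata; DESCRIBE EXTENDED reports whether the table is MANAGED or EXTERNAL and where its files live.
# Check the "Type" and "Location" rows for each table.
spark.sql("DESCRIBE EXTENDED managed_table").show(truncate=False)
spark.sql("DESCRIBE EXTENDED unmanaged_table").show(truncate=False)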
11. Bronze, Silver, and Gold Tables
Databricks organizes Delta Tables into three layers: Bronze (raw data), Silver (cleaned and validated data), and Gold (aggregated, business-ready data).
Example Workflow:
bronze_df = spark.read.json("s3://raw-data/")
silver_df = bronze_df.filter("status IS NOT NULL")
gold_df = silver_df.groupBy("category").count()
gold_df.write.format("delta").saveAsTable("gold_aggregates")
Delta Lake: Overview & Architecture
Delta Lake is an open-source storage layer built on top of Apache Spark that provides ACID transactions, schema enforcement, and time travel for big data workloads. It is designed to work seamlessly with batch, streaming, and change data capture (CDC) pipelines.
Storage Components
1. Data Files: Stored in Parquet format (.parquet files).
2. Transaction Log (_delta_log/):
- Contains JSON and checkpoint files to maintain versioning.
- Helps track metadata, schema changes, and operations.
3. Checkpoints (.parquet files inside _delta_log/):
- Periodically written for fast recovery.
Example Delta Table Structure in Storage
/mnt/delta_table/
├── part-00000-abc123.snappy.parquet
├── part-00001-def456.snappy.parquet
└── _delta_log/
    ├── 00000000000000000001.json
    ├── 00000000000000000002.json
    ├── 00000000000000000003.json
    └── 00000000000000000003.checkpoint.parquet
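Because every commit is recorded in _delta_log/, you can list a table's version history directly. A minimal sketch, assuming a Delta table at /mnt/delta_table/:
from delta.tables import DeltaTable

# Each row is one commit recorded in _delta_log/ (version, timestamp, operation, ...).
delta_table = DeltaTable.forPath(spark, "/mnt/delta_table/")
delta_table.history().select("version", "timestamp", "operation").show()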
Key Features of Delta Lake
- ACID transactions on data stored in cloud object storage.
- Schema enforcement and schema evolution.
- Time travel and data versioning (query historical versions).
- Unified batch and streaming processing.
- Efficient upserts and deletes with MERGE.
Sources & Targets for Delta Lake
Delta Lake integrates with various sources and targets.
Sources
- Batch: CSV, JSON, Parquet, ORC
- Streaming: Kafka, Kinesis, Event Hub
- CDC (Change Data Capture): MySQL binlog, Debezium, SQL Server CDC
Targets
- Data Lakes: S3, ADLS, HDFS
- Data Warehouses: Snowflake, Redshift, BigQuery
- BI Tools: Power BI, Tableau
End-to-End Example: Batch, Streaming, and CDC in Delta Lake
Let’s implement a Delta Lake pipeline handling batch ingestion, streaming data, and CDC updates.
1️⃣ Batch Data Ingestion
We start by loading batch data into a Delta Table.
from pyspark.sql import SparkSession
# Initialize SparkSession with Delta support
spark = SparkSession.builder \
.appName("DeltaLakeExample") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Load batch data from CSV
df_batch = spark.read.format("csv").option("header", True).load("/mnt/batch_data.csv")
# Write to Delta Table
df_batch.write.format("delta").mode("overwrite").save("/mnt/delta/batch_table")
2️⃣ Stream Processing in Delta Lake
Now, we consume a Kafka stream and write it to Delta Lake.
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Define Schema for Kafka Data
schema = StructType([
StructField("user_id", IntegerType(), True),
StructField("event_type", StringType(), True),
StructField("timestamp", StringType(), True)
])
# Read Kafka Stream
df_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "user_events") \
.load()
# Parse JSON Data
df_parsed = df_stream.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")
# Write to Delta Table (Append Mode)
df_parsed.writeStream \
.format("delta") \
.option("checkpointLocation", "/mnt/delta/checkpoints/") \
.outputMode("append") \
.start("/mnt/delta/stream_table")
3️⃣ Change Data Capture (CDC)
We apply changes captured from MySQL (exposed here as a CDC table) to the Delta table.
Step 1: Read CDC Data from MySQL
df_cdc = spark.read.format("jdbc") \
.option("url", "jdbc:mysql://mysql_server/db") \
.option("dbtable", "cdc_table") \
.option("user", "root") \
.option("password", "password") \
.load()
Step 2: Merge CDC Changes into Delta Table
from delta.tables import DeltaTable
delta_table = DeltaTable.forPath(spark, "/mnt/delta/batch_table")
# Perform Merge (update matched rows, insert new rows)
delta_table.alias("target").merge(
df_cdc.alias("source"),
"target.user_id = source.user_id"
).whenMatchedUpdate(set={"event_type": "source.event_type"}) \
.whenNotMatchedInsert(values={"user_id": "source.user_id", "event_type": "source.event_type"}) \
.execute()
Additional Features & Best Practices
🛠 1. Time Travel (Historical Queries)
# Retrieve previous version
df_old = spark.read.format("delta").option("versionAsOf", 3).load("/mnt/delta/batch_table")
df_old.show()
🛠 2. Data Compaction (Optimize)
from delta.tables import *
deltaTable = DeltaTable.forPath(spark, "/mnt/delta/batch_table")
deltaTable.optimize().executeCompaction()
🛠 3. Enabling Schema Evolution
df_new = spark.read.format("json").load("/mnt/new_data.json")
df_new.write.format("delta").mode("append").option("mergeSchema", "true").save("/mnt/delta/batch_table")
Summary
- Delta Lake enhances Parquet with ACID transactions, schema enforcement, and streaming+batch support.
- Storage: Data stored in Parquet, metadata in _delta_log.
- Batch & Streaming: Unified processing for structured and semi-structured data.
- CDC: Handles inserts, updates, and deletes using MERGE.
- Time Travel allows querying historical versions.
- Optimize & Compaction improves query speed.
Lakehouse Architecture: Overview
A Lakehouse combines the best of Data Lakes (scalability, flexibility) and Data Warehouses (ACID transactions, governance, performance).
Key Features
- ACID transactions and schema enforcement on open, low-cost data lake storage.
- A single platform for BI, SQL analytics, and machine learning.
- Separation of storage and compute so each scales independently.
- Support for batch, streaming, and CDC workloads.
Storage in a Lakehouse
Lakehouses store data in open formats on cloud storage.
Storage Layers
1. Bronze (Raw Data Layer)
- Stores raw, unprocessed data.
- Sources: Logs, APIs, IoT, Databases.
- Format: JSON, CSV, Avro, Parquet.
2. Silver (Cleaned & Normalized Data)
- Schema-enforced, deduplicated, validated data.
- Ready for analytics & reporting.
- Format: Parquet, Delta, Iceberg.
3. Gold (Aggregated & Curated Data)
- Optimized for BI dashboards & ML.
- Pre-aggregated, indexed, query-optimized data.
- Format: Delta, Iceberg, Materialized Views.
Lakehouse Architecture: Sources & Targets
Sources
- Structured: Relational Databases (MySQL, SQL Server, PostgreSQL).
- Semi-Structured: JSON, XML, CSV, API data, Logs.
- Unstructured: Images, Videos, PDFs, IoT data, Sensor data.
- Streaming Sources: Kafka, Kinesis, Event Hub.
Targets
- BI & Reporting: Power BI, Tableau, Looker.
- Machine Learning: Databricks ML, TensorFlow, Spark MLlib.
- Data Warehouses: Snowflake, Redshift, BigQuery.
- Operational Systems: API-driven applications.
Lakehouse Architecture Diagram
┌──────────────────────────────┐
│ Data Sources │
│ (DBs, APIs, Logs, Streams) │
└───────────┬──────────────────┘
│
┌─────────────────────────────────────────┐
│ Ingestion Layer │
│ (Batch: Spark, Glue, ADF; Stream: Kafka) │
└───────────┬────────────────────────────┘
│
┌──────────────────────────────────────────────────────────────┐
│ Storage Layer (Data Lake) │
│ (S3, ADLS, GCS, HDFS - Parquet, Delta) │
│ │
│ Bronze (Raw) ──▶ Silver (Cleaned) ──▶ Gold (Aggregated) │
└───────────┬─────────────────────────────────────────────────┘
│
┌────────────────────────────────────────────────────────┐
│ Processing & Compute Layer │
│ (Databricks, Spark, Snowflake, BigQuery) │
└───────────┬──────────────────────────────────────────┘
│
┌───────────────────────────────────────────────────────────┐
│ Consumption Layer │
│ (BI: Power BI, Tableau, Looker | ML: Databricks ML, MLOps) │
└───────────────────────────────────────────────────────────┘
End-to-End Example of a Lakehouse
1️⃣ Ingest Raw Data (Bronze Layer)
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("LakehousePipeline") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.getOrCreate()
# Load raw JSON data (Batch Ingestion)
df_raw = spark.read.json("/mnt/data/raw/sales_data.json")
# Write raw data to Bronze Delta Table
df_raw.write.format("delta").mode("overwrite").save("/mnt/delta/bronze_sales")
2️⃣ Clean & Normalize Data (Silver Layer)
from delta.tables import DeltaTable
from pyspark.sql.functions import col, to_date
# Read Bronze Layer
df_bronze = spark.read.format("delta").load("/mnt/delta/bronze_sales")
# Clean & Transform Data
df_silver = df_bronze.select(
col("order_id").cast("int"),
col("customer_id").cast("int"),
col("order_date").cast("date"),
col("amount").cast("double")
).dropDuplicates()
# Write to Silver Layer
df_silver.write.format("delta").mode("overwrite").save("/mnt/delta/silver_sales")
3️⃣ Aggregate & Optimize for BI (Gold Layer)
from pyspark.sql.functions import date_format

# Read Silver Layer
df_silver = spark.read.format("delta").load("/mnt/delta/silver_sales")
# Aggregate Sales by Month (truncate order_date to year-month)
df_gold = df_silver.groupBy(date_format("order_date", "yyyy-MM").alias("month")).sum("amount")
# Write to Gold Layer
df_gold.write.format("delta").mode("overwrite").save("/mnt/delta/gold_sales")
Summary
- Lakehouse blends Data Lake + Warehouse with ACID, schema enforcement, and time travel.
- Storage follows Bronze (raw) → Silver (cleaned) → Gold (aggregated) layers.
- Processing integrates batch, streaming, and CDC workloads.
- Architecture separates storage (S3, ADLS) & compute (Databricks, Snowflake, Spark).
Comparison: Lakehouse vs. Delta Lake in Databricks
Delta Lake is the storage layer (Parquet data files plus the _delta_log transaction log) that adds ACID transactions, schema enforcement, and time travel on top of cloud object storage. The Lakehouse is the broader architecture built on that layer: it organizes data into Bronze, Silver, and Gold tables and adds ingestion, governance, compute, and BI/ML consumption on a single platform.
Databricks Auto Loader 🚀
Auto Loader is a feature in Databricks that enables incremental data ingestion from cloud storage (S3, ADLS, GCS) into Delta Lake tables. It automatically detects new files and processes them efficiently, making it ideal for batch and streaming ingestion.
🔹 Key Features
- Incremental ingestion: new files are detected and processed exactly once, tracked via checkpoints.
- Schema inference and schema evolution (cloudFiles.schemaLocation, mergeSchema).
- Works in streaming pipelines and in batch-style runs (Trigger.AvailableNow).
- Scales to large numbers of files landing in cloud storage.
🔹 Sources & Targets
- Sources: files arriving in cloud object storage (S3, ADLS, GCS) in formats such as JSON, CSV, Parquet, or Avro.
- Targets: Delta Lake tables, typically the Bronze layer of a Bronze/Silver/Gold pipeline.
🔹 Example: Ingest Data into Delta Table
📌 Step 1: Configure Auto Loader for Streaming
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Define source path (S3, ADLS, GCS)
source_path = "s3://your-bucket/raw-data/"
# Read streaming data
df = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json") # Change format if needed
.option("cloudFiles.schemaLocation", "s3://your-bucket/checkpoints/") # Stores schema
.load(source_path)
)
# Write to Delta Table in Append mode
(df.writeStream
.format("delta")
.option("checkpointLocation", "s3://your-bucket/checkpoints/")
.outputMode("append")
.toTable("bronze_table")
)
📌 Step 2: Run Auto Loader in Batch Style (Trigger Available Now)
Auto Loader is built on Structured Streaming, so a "batch" run is a stream that processes all pending files and then stops:
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("header", "true")
    .option("cloudFiles.schemaLocation", "s3://your-bucket/checkpoints/")
    .load(source_path)
)
(df.writeStream
    .format("delta")
    .option("checkpointLocation", "s3://your-bucket/checkpoints/")
    .trigger(availableNow=True)  # process all available files, then stop
    .toTable("bronze_table")
)
🔹 Schema Evolution
Enable schema evolution to automatically add new columns:
(df.writeStream
.format("delta")
.option("mergeSchema", "true") # Enable schema evolution
.option("checkpointLocation", "s3://your-bucket/checkpoints/")
.toTable("bronze_table")
)
Auto Loader tracks which files it has already ingested, so it handles ongoing changes efficiently:
- New files → automatically detected and processed
- Schema changes → Auto Loader can evolve the table schema
- Deleted source files → tracked via file metadata (already-ingested data is not removed)
- Updates → merge logic can be implemented with Delta Lake, as in the sketch below
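A common pattern for that last point is to combine Auto Loader with foreachBatch and merge each micro-batch into the target Delta table. This is a sketch that assumes a user_id key column and reuses the placeholder paths above.
from delta.tables import DeltaTable

def upsert_to_delta(micro_batch_df, batch_id):
    # Merge one Auto Loader micro-batch into the target table.
    # "user_id" is an assumed key column; adjust to your schema.
    target = DeltaTable.forName(spark, "bronze_table")
    (target.alias("t")
        .merge(micro_batch_df.alias("s"), "t.user_id = s.user_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "s3://your-bucket/checkpoints/")
    .load("s3://your-bucket/raw-data/")
    .writeStream
    .foreachBatch(upsert_to_delta)
    .option("checkpointLocation", "s3://your-bucket/merge-checkpoints/")
    .start())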