
How to Profile Spark Jobs After Completion: The Complete Guide to Collecting Metrics on Databricks, EMR, and Dataproc

· 22 min read
Cazpian Engineering
Platform Engineering Team


Your Spark job finished. It ran for 2 hours. It used 50 executors. The bill arrived. Now what?

Most teams have no systematic way to answer the obvious follow-up questions: Was 50 executors the right number? Did we waste memory? Was there data skew? Should we use different instance types? They either overprovision "just in case" — burning 2-5x the needed budget — or reactively debug when jobs fail with OOM errors.

The root cause is not a lack of tooling. It is a data collection problem. Spark generates extraordinarily detailed metrics at every level — task, stage, executor, application — but those metrics vanish when the application terminates. If you did not capture them during or immediately after execution, they are gone.

This post solves that problem. We cover every method for collecting Spark profiling metrics, show exactly how to set them up on Databricks, EMR, and Dataproc, and then show you how to turn those metrics into right-sizing decisions with concrete formulas and thresholds.

The Five Metrics That Tell the Full Story

Before diving into collection methods, you need to know which numbers actually matter. Out of the hundreds of metrics Spark tracks, five tell you almost everything about whether your job is right-sized.

1. GC Time Ratio

gc_ratio = jvmGcTime / executorRunTime

| Range | Diagnosis |
| --- | --- |
| Under 5% | Healthy. Memory is sufficient. |
| 5-10% | Warning. Memory pressure building. Monitor trend. |
| Over 10% | Critical. Tasks spend significant time garbage collecting instead of computing. Increase spark.executor.memory or reduce spark.executor.cores. |

This is the single most important metric for memory sizing. When GC ratio exceeds 10%, every task is effectively paused for garbage collection multiple times — your job is running with one hand tied behind its back.

2. Peak Executor Memory

peak_memory = max(JVMHeapMemory) across all executors and stages

Compare this against spark.executor.memory. If peak usage is under 40% of allocated, you are over-provisioned. If over 90%, you are at OOM risk.

| Utilization | Diagnosis |
| --- | --- |
| Under 40% | Over-provisioned. Drop to a smaller instance type or reduce spark.executor.memory. |
| 40-85% | Healthy range. |
| Over 85% | Tight. Add headroom or risk OOM under data volume spikes. |

3. Task Duration Distribution (Skew Ratio)

skew_ratio = max(task_duration) / median(task_duration)  -- per stage

| Ratio | Diagnosis |
| --- | --- |
| Under 2x | No significant skew. |
| 2-5x | Moderate skew. Investigate partition keys. |
| Over 5x | Severe skew. One partition is orders of magnitude larger. All executors wait for the straggler. Fix with salting, spark.sql.adaptive.skewJoin.enabled, or repartitioning. |

4. Disk Spill Bytes

total_disk_spill = sum(diskBytesSpilled) across all tasks

Any non-zero value means Spark ran out of execution memory for a sort, join, or aggregation and fell back to disk. This is a severe performance signal — disk I/O is 10-100x slower than memory.

| Value | Diagnosis |
| --- | --- |
| 0 bytes | Healthy. |
| Over 0 | Increase spark.executor.memory, reduce spark.executor.cores (fewer concurrent tasks per executor = more memory per task), or repartition to reduce per-partition data size. |

5. CPU Utilization Ratio

cpu_ratio = (executorCpuTime / 1e6) / executorRunTime

| Ratio | Diagnosis |
| --- | --- |
| Over 0.70 | CPU-bound. Healthy — compute is the bottleneck, not I/O. |
| 0.30-0.70 | Mixed. Some I/O wait or GC overhead. |
| Under 0.30 | I/O or GC bound. Tasks spend most of their time waiting — for shuffle data, GC pauses, or external I/O. |
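These five checks are mechanical enough to script against whatever store holds your parsed metrics. A minimal sketch, assuming you have already aggregated the raw numbers per job (parameter names here are illustrative, not a Spark API):

```python
def diagnose(run_time_ms, cpu_time_ns, gc_time_ms, peak_heap_bytes,
             allocated_bytes, max_task_ms, median_task_ms, spilled_bytes):
    """Apply the five thresholds above to one job's aggregated metrics."""
    findings = []
    # 1. GC ratio: time spent collecting garbage vs running tasks
    gc_ratio = gc_time_ms / run_time_ms
    if gc_ratio > 0.10:
        findings.append(f"CRITICAL: GC ratio {gc_ratio:.0%} - add memory or cut cores")
    elif gc_ratio > 0.05:
        findings.append(f"WARN: GC ratio {gc_ratio:.0%} - memory pressure building")
    # 2. Peak heap vs allocation
    mem_util = peak_heap_bytes / allocated_bytes
    if mem_util < 0.40:
        findings.append(f"Over-provisioned: peak heap at {mem_util:.0%} of allocation")
    elif mem_util > 0.85:
        findings.append(f"Tight: peak heap at {mem_util:.0%} of allocation (OOM risk)")
    # 3. Skew: slowest task vs median task
    skew = max_task_ms / median_task_ms
    if skew > 5:
        findings.append(f"Severe skew: {skew:.1f}x - salt keys or enable AQE skew join")
    elif skew > 2:
        findings.append(f"Moderate skew: {skew:.1f}x - inspect partition keys")
    # 4. Disk spill: any value at all is a signal
    if spilled_bytes > 0:
        findings.append(f"Disk spill: {spilled_bytes} bytes - raise memory per task")
    # 5. CPU ratio: cpuTime is ns, runTime is ms, so divide by 1e6
    cpu_ratio = (cpu_time_ns / 1e6) / run_time_ms
    if cpu_ratio < 0.30:
        findings.append(f"I/O or GC bound: CPU ratio {cpu_ratio:.2f}")
    return findings or ["Healthy"]
```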

Where These Metrics Live in Spark

Spark tracks metrics at four levels. Understanding the hierarchy is essential because different collection methods give you access to different levels.

Application (total wall clock, configuration)
└── Job (duration, result)
    └── Stage (aggregated task metrics via accumulables)
        └── Task (per-task: executorRunTime, jvmGcTime, shuffleReadBytes,
                  peakExecutionMemory, diskBytesSpilled, cpuTime)
            └── Executor Snapshot (JVMHeapMemory, GC counts, memory pools)

Task-level metrics are the most granular and the most valuable for profiling. They come from SparkListenerTaskEnd events in the event log. Stage-level metrics are aggregated sums across tasks. Application-level metrics are what you see in the Spark UI summary.

The key insight: most platform-managed monitoring (CloudWatch, Cloud Monitoring, Databricks system tables) only captures application-level or YARN-level metrics. For task-level detail — the metrics that actually diagnose problems — you need Spark event logs.

Collection Method 1: Spark Event Logs

This is the universal method that works on every platform. Spark event logs are newline-delimited JSON files containing every SparkListenerEvent — task completions, stage completions, executor metrics, SQL plans, everything.

Enabling Event Logs

# Core settings (add to spark-defaults.conf or spark-submit --conf)
spark.eventLog.enabled true
spark.eventLog.dir s3a://my-bucket/spark-event-logs/
spark.eventLog.compress true
spark.eventLog.compression.codec zstd

# CRITICAL: Enable per-stage peak executor metrics (off by default!)
spark.eventLog.logStageExecutorMetrics true

# Rolling logs (default in Spark 3.0+)
spark.eventLog.rolling.enabled true
spark.eventLog.rolling.maxFileSize 128m

The logStageExecutorMetrics setting is the most commonly missed configuration. Without it, you lose the SparkListenerStageExecutorMetrics events that record peak JVM heap, GC counts, and memory pool usage per executor per stage — the data you need for memory right-sizing.

What Is Inside an Event Log

Each line is a JSON object with an Event field. The most valuable events for profiling:

| Event | Profiling Use |
| --- | --- |
| SparkListenerTaskEnd | Per-task: executorRunTime, jvmGcTime, peakExecutionMemory, diskBytesSpilled, shuffleReadMetrics, shuffleWriteMetrics, inputMetrics |
| SparkListenerStageCompleted | Stage-level aggregated accumulables: total run time, total GC, total spill, total shuffle |
| SparkListenerStageExecutorMetrics | Peak JVMHeapMemory, OnHeapExecutionMemory, MinorGCTime, MajorGCTime per executor per stage |
| SparkListenerExecutorMetricsUpdate | Heartbeat snapshots (~10s) of executor memory — builds time-series |
| SparkListenerEnvironmentUpdate | The full Spark configuration used (executor memory, cores, instances) |
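A quick way to see what a given log actually contains — and to verify that logStageExecutorMetrics was really on — is to tally event types. A plain-Python sketch, assuming a decompressed event log file (the path is yours to supply):

```python
import json
from collections import Counter

def event_counts(path):
    """Count SparkListener event types in one newline-delimited JSON log."""
    counts = Counter()
    with open(path) as f:
        for line in f:
            try:
                counts[json.loads(line).get("Event", "unknown")] += 1
            except json.JSONDecodeError:
                continue  # skip truncated trailing lines
    return counts
```

If `event_counts(path)["SparkListenerStageExecutorMetrics"]` is zero for a log you know ran multiple stages, the flag was off when the job ran.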

Parsing Event Logs with PySpark

You can analyze event logs from completed jobs using Spark itself — ideal for batch profiling across many applications:

from pyspark.sql.functions import *

# Read all event logs (works with rolling directories on S3/GCS/ADLS)
events = spark.read.text("s3a://my-bucket/spark-event-logs/application_*")

# Extract task-level metrics
tasks = events.filter(
    get_json_object("value", "$.Event") == "SparkListenerTaskEnd"
).select(
    get_json_object("value", "$.Stage ID").cast("int").alias("stage_id"),
    get_json_object("value", "$.Task Info.Executor ID").alias("executor_id"),
    get_json_object("value", "$.Task Metrics.Executor Run Time").cast("long").alias("run_time_ms"),
    get_json_object("value", "$.Task Metrics.Executor CPU Time").cast("long").alias("cpu_time_ns"),
    get_json_object("value", "$.Task Metrics.JVM GC Time").cast("long").alias("gc_time_ms"),
    get_json_object("value", "$.Task Metrics.Peak Execution Memory").cast("long").alias("peak_memory"),
    get_json_object("value", "$.Task Metrics.Memory Bytes Spilled").cast("long").alias("memory_spilled"),
    get_json_object("value", "$.Task Metrics.Disk Bytes Spilled").cast("long").alias("disk_spilled"),
    get_json_object("value",
        "$.Task Metrics.Shuffle Read Metrics.Remote Bytes Read").cast("long").alias("shuffle_remote_read"),
    get_json_object("value",
        "$.Task Metrics.Shuffle Read Metrics.Local Bytes Read").cast("long").alias("shuffle_local_read"),
    get_json_object("value",
        "$.Task Metrics.Shuffle Write Metrics.Shuffle Bytes Written").cast("long").alias("shuffle_write"),
    get_json_object("value",
        "$.Task Metrics.Input Metrics.Bytes Read").cast("long").alias("input_bytes"),
)

# Per-stage profiling summary
tasks.groupBy("stage_id").agg(
    count("*").alias("tasks"),
    expr("percentile_approx(run_time_ms, 0.5)").alias("p50_runtime_ms"),
    expr("percentile_approx(run_time_ms, 0.95)").alias("p95_runtime_ms"),
    max("run_time_ms").alias("max_runtime_ms"),
    avg("gc_time_ms").alias("avg_gc_ms"),
    expr("avg(gc_time_ms / run_time_ms)").alias("avg_gc_ratio"),
    sum("disk_spilled").alias("total_disk_spill"),
    max("peak_memory").alias("max_peak_memory"),
    sum("shuffle_remote_read").alias("total_shuffle_read"),
    sum("shuffle_write").alias("total_shuffle_write"),
).orderBy("stage_id").show(truncate=False)
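The skew ratio from metric 3 falls straight out of this summary: max over median runtime per stage. A plain-Python sketch, assuming the summary rows have been collected into dicts keyed by the column aliases above:

```python
def skewed_stages(rows, threshold=5.0):
    """Return (stage_id, skew_ratio) for stages at or above the threshold.

    rows: dicts with stage_id, p50_runtime_ms, max_runtime_ms
    (e.g. collected from the per-stage summary DataFrame).
    """
    flagged = []
    for r in rows:
        if r["p50_runtime_ms"] > 0:
            ratio = r["max_runtime_ms"] / r["p50_runtime_ms"]
            if ratio >= threshold:
                flagged.append((r["stage_id"], round(ratio, 1)))
    return flagged
```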

Storage and Retention

Store event logs in cloud object storage with lifecycle policies:

Hot  (0-14 days):    Standard storage — active profiling
Warm (14-90 days):   Infrequent Access / Nearline — ad-hoc investigation
Cold (90-365 days):  Glacier / Coldline / Archive — compliance
Delete after 365 days

Size estimation: A medium ETL pipeline (5,000 tasks) produces ~100 MB uncompressed, ~10 MB with zstd. A large pipeline (50,000 tasks) produces ~1 GB uncompressed, ~100 MB compressed.

Collection Method 2: Spark History Server REST API

The History Server replays event logs and exposes structured profiling data through a REST API. This is the easiest method for automated profiling of completed jobs.

Key Endpoints

| Endpoint | Returns |
| --- | --- |
| GET /api/v1/applications | List of all applications with start/end times |
| GET /api/v1/applications/{id}/stages | All stages with duration, task counts, I/O |
| GET /api/v1/applications/{id}/stages/{stageId}/taskSummary | Percentile distributions (P5, P25, P50, P75, P95) for runtime, GC, shuffle — gold for skew detection |
| GET /api/v1/applications/{id}/allexecutors | All executors (including dead ones) with memory, GC, shuffle totals |
| GET /api/v1/applications/{id}/stages/{stageId}/taskList | Individual task details — sort by runtime to find stragglers |

Automated Profiling Script

HISTORY_SERVER="http://localhost:18080"
APP_ID="app-20260319120000-0001"
BASE="${HISTORY_SERVER}/api/v1/applications/${APP_ID}"

# Dump all stages
curl -s "${BASE}/stages" | python3 -m json.tool > stages.json

# Get task summary with quantiles for each stage (skew detection)
for STAGE_ID in $(python3 -c "
import json
stages = json.load(open('stages.json'))
for s in stages:
    print(s['stageId'])
"); do
  curl -s "${BASE}/stages/${STAGE_ID}/0/taskSummary?quantiles=0.05,0.25,0.5,0.75,0.95,0.99" \
    > "stage_${STAGE_ID}_summary.json"
done

# Get all executors (includes peak memory metrics)
curl -s "${BASE}/allexecutors" | python3 -m json.tool > executors.json

The taskSummary endpoint is the most powerful profiling tool in the REST API. It returns percentile distributions for executorRunTime, jvmGcTime, gettingResultTime, schedulerDelay, shuffleReadSize, shuffleWriteSize, memoryBytesSpilled, and diskBytesSpilled — in a single call, without fetching every individual task.
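Turning a taskSummary response into skew and GC signals takes only a few lines. A sketch assuming the response has been parsed into a dict of per-metric arrays aligned with the quantiles array, as the History Server returns them (verify the exact field names against your Spark version):

```python
def summarize(task_summary):
    """Derive skew and GC signals from a parsed taskSummary payload.

    Expects the quantile list to include 0.5 and 0.99 (as requested in
    the curl loop above).
    """
    q = task_summary["quantiles"]
    run = task_summary["executorRunTime"]
    gc = task_summary["jvmGcTime"]
    median = run[q.index(0.5)]
    return {
        # p99/median runtime: a cheap stand-in for the skew ratio
        "p99_over_median": run[q.index(0.99)] / median if median else None,
        # GC time at the median task, as a fraction of its runtime
        "median_gc_ratio": gc[q.index(0.5)] / median if median else None,
    }
```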

Collection Method 3: Spark Metrics System (Prometheus)

Spark's built-in metrics system exports executor, driver, and JVM metrics through configurable sinks. Since Spark 3.0, a Prometheus endpoint is available.

Enabling Prometheus

# spark-defaults.conf or --conf
spark.ui.prometheus.enabled true
spark.metrics.conf.*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
spark.metrics.conf.*.sink.prometheusServlet.path=/metrics/prometheus
spark.metrics.conf.*.source.jvm.class=org.apache.spark.metrics.source.JvmSource

This exposes two endpoints while the application runs:

  • http://driver:4040/metrics/prometheus/ — Driver-side Dropwizard metrics
  • http://driver:4040/metrics/executors/prometheus/ — Per-executor metrics

The Ephemeral Problem and How to Solve It

These endpoints vanish when the application terminates. For post-job analysis, you must persist the data during runtime:

Option A: Prometheus scraping — Configure Prometheus to scrape the endpoints every 15s. After the job ends, query historical data with PromQL:

# prometheus.yml
scrape_configs:
  - job_name: 'spark-driver'
    metrics_path: '/metrics/prometheus/'
    scrape_interval: 15s
    static_configs:
      - targets: ['driver-host:4040']
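After the job ends, the scraped series can be pulled back over Prometheus's standard range-query HTTP API. A sketch that just builds the query_range request URL — the host and the PromQL expression are placeholders you supply; check your own metrics namespace for the actual series names Spark exports:

```python
from urllib.parse import urlencode

def range_query_url(prom_url, promql, start_iso, end_iso, step="15s"):
    """Build a Prometheus /api/v1/query_range URL for a finished job's window."""
    params = {
        "query": promql,       # e.g. a metric filtered to your app's labels
        "start": start_iso,    # job start, RFC 3339
        "end": end_iso,        # job end, RFC 3339
        "step": step,          # match your scrape_interval
    }
    return "%s/api/v1/query_range?%s" % (prom_url.rstrip("/"), urlencode(params))
```

Fetch the URL with any HTTP client; the response is JSON time-series you can feed into the same threshold checks used for event-log data.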

Option B: CSV Sink — Write metrics to local files for offline analysis:

*.sink.csv.class=org.apache.spark.metrics.sink.CSVSink
*.sink.csv.period=10
*.sink.csv.unit=seconds
*.sink.csv.directory=/tmp/spark-metrics

Option C: Graphite/StatsD Sink — Push to a time-series database:

*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=graphite.example.com
*.sink.graphite.port=2003
*.sink.graphite.period=10
*.sink.graphite.unit=seconds
*.sink.graphite.prefix=spark.${spark.app.name}

Limitation

The metrics system provides executor-level aggregates (cumulative counters like succeededTasks, shuffleBytesWritten), not per-task breakdowns. For task-level detail, you need event logs or a custom SparkListener.

Collection Method 4: Custom SparkListener

For teams that want to push specific metrics to a custom store (Postgres, Delta Lake, Iceberg, ClickHouse), a SparkListener implementation gives full control.

Key Callbacks

class ProfilingListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    val info = taskEnd.taskInfo
    // Access: metrics.executorRunTime, metrics.jvmGCTime,
    //   metrics.peakExecutionMemory, metrics.diskBytesSpilled,
    //   metrics.shuffleReadMetrics.totalBytesRead,
    //   metrics.shuffleWriteMetrics.bytesWritten,
    //   metrics.inputMetrics.bytesRead
    // Write to your metrics store
  }

  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stageInfo
    // Access: info.taskMetrics (aggregated), info.submissionTime,
    //   info.completionTime, info.numTasks, info.failureReason
  }
}

Register it via:

spark.extraListeners=com.example.ProfilingListener

The spark-measure library (open-source, by Luca Canali at CERN) provides a production-ready SparkListener that collects task and stage metrics into DataFrames — no custom code needed:

// Add spark-measure dependency, then:
val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.runAndMeasure {
  // your Spark code here
}
stageMetrics.printReport()

Platform-Specific: Databricks

Databricks provides the richest native profiling ecosystem through Unity Catalog system tables. However, there is a critical gap: system tables do not expose Spark task-level metrics (GC time per task, shuffle per executor, disk spill per stage). For that granularity, you still need Spark event logs.

What System Tables Provide

| System Table | Metrics | Granularity |
| --- | --- | --- |
| system.compute.node_timeline | CPU utilization %, memory utilization %, network bytes | Per node, per minute |
| system.lakeflow.job_run_timeline | setup, queue, execution, cleanup durations, result state | Per job run |
| system.lakeflow.job_task_run_timeline | Same durations at task level | Per task run |
| system.billing.usage | DBU consumption per job, cluster, user | Hourly |
| system.billing.list_prices | Historical SKU pricing | Per price change |
| system.query.history | Query duration, rows read/produced, bytes read, spill, shuffle, pruned files, I/O cache % | Per SQL query (SQL warehouses + serverless only) |

Identify Under-Utilized Clusters

SELECT
  nt.cluster_id,
  c.cluster_name,
  c.worker_node_type,
  ROUND(AVG(nt.cpu_utilization_percent), 1) AS avg_cpu_pct,
  ROUND(MAX(nt.cpu_utilization_percent), 1) AS peak_cpu_pct,
  ROUND(AVG(nt.memory_utilization_percent), 1) AS avg_mem_pct
FROM system.compute.node_timeline nt
JOIN system.compute.clusters c
  ON nt.cluster_id = c.cluster_id
WHERE nt.timestamp >= current_date() - INTERVAL 14 DAYS
GROUP BY nt.cluster_id, c.cluster_name, c.worker_node_type
HAVING avg_cpu_pct < 20
ORDER BY avg_cpu_pct;

If average CPU is under 20%, the cluster is massively over-provisioned — either too many nodes or too large an instance type.

Cost Per Job with Dollar Amounts

SELECT
  u.usage_metadata.job_id,
  u.usage_metadata.job_name,
  SUM(u.usage_quantity) AS total_dbus,
  SUM(u.usage_quantity * p.pricing.effective_list.default) AS dollar_cost
FROM system.billing.usage u
LEFT JOIN system.billing.list_prices p
  ON u.sku_name = p.sku_name AND u.cloud = p.cloud
  AND u.usage_start_time >= p.price_start_time
  AND (p.price_end_time IS NULL OR u.usage_start_time < p.price_end_time)
WHERE u.usage_metadata.job_id IS NOT NULL
  AND u.usage_date >= current_date() - INTERVAL 30 DAYS
GROUP BY u.usage_metadata.job_id, u.usage_metadata.job_name
ORDER BY dollar_cost DESC;

Cost of Failed Jobs (Waste)

SELECT
  j.name AS job_name,
  COUNT(*) AS failed_runs,
  SUM(jrt.run_duration_seconds) / 3600 AS total_wasted_hours,
  SUM(b.dollar_cost) AS total_wasted_dollars
FROM system.lakeflow.job_run_timeline jrt
JOIN system.lakeflow.jobs j ON jrt.job_id = j.job_id
LEFT JOIN (
  SELECT usage_metadata.job_run_id AS run_id,
         SUM(usage_quantity * p.pricing.effective_list.default) AS dollar_cost
  FROM system.billing.usage u
  LEFT JOIN system.billing.list_prices p
    ON u.sku_name = p.sku_name AND u.cloud = p.cloud
    AND u.usage_start_time >= p.price_start_time
    AND (p.price_end_time IS NULL OR u.usage_start_time < p.price_end_time)
  GROUP BY usage_metadata.job_run_id
) b ON jrt.run_id = b.run_id
WHERE jrt.result_state = 'FAILED'
  AND jrt.period_start_time >= current_date() - INTERVAL 30 DAYS
GROUP BY j.name
ORDER BY total_wasted_dollars DESC;

Databricks Ganglia Replacement

Since DBR 13.3, Ganglia has been replaced by native compute metrics visible in the Metrics tab of the cluster page. This provides CPU, memory, network, and Spark task/shuffle metrics at minute granularity — available for up to 30 days.

The Spark Task-Level Gap

System tables give you job-level and node-level metrics. For task-level diagnosis (GC per task, shuffle per executor, spill per stage, skew detection), you need:

  1. Spark event logs — Enable spark.eventLog.enabled=true with cluster log delivery or a dedicated event log directory
  2. Overwatch (Databricks Labs, open-source) — Parses event logs into queryable Delta tables with stage and task-level metrics. In maintenance mode since system tables launched, but still the only way to get this data natively on Databricks

Platform-Specific: AWS EMR

Persistent Spark History Server

EMR provides a managed persistent UI that survives cluster termination — but it only works when event logs are in HDFS (the default). If you set spark.eventLog.dir to S3, the managed UI will not function.

Recommended: Write event logs to S3 and run a dedicated History Server:

[{
  "Classification": "spark-defaults",
  "Properties": {
    "spark.eventLog.enabled": "true",
    "spark.eventLog.dir": "s3a://my-bucket/spark-event-logs/",
    "spark.eventLog.compress": "true",
    "spark.eventLog.compression.codec": "zstd",
    "spark.eventLog.logStageExecutorMetrics": "true"
  }
}]

Then run a single-node EMR cluster as a persistent History Server:

aws emr create-cluster \
  --name "Persistent Spark History Server" \
  --release-label emr-7.1.0 \
  --applications Name=Spark \
  --instance-type m5.large --instance-count 1 \
  --configurations '[{
    "Classification": "spark-defaults",
    "Properties": {
      "spark.history.fs.logDirectory": "s3a://my-bucket/spark-event-logs/",
      "spark.history.retainedApplications": "200",
      "spark.history.fs.cleaner.enabled": "true",
      "spark.history.fs.cleaner.maxAge": "30d"
    }
  }]' \
  --use-default-roles

CloudWatch Metrics (Cluster-Level)

EMR publishes YARN metrics to CloudWatch under the AWS/ElasticMapReduce namespace. These are cluster-level only:

| CloudWatch Metric | What It Tells You |
| --- | --- |
| MemoryAllocatedMB / MemoryTotalMB | Memory utilization. Low ratio = over-provisioned. |
| ContainerPending | Non-zero = resource starvation. Scale up. |
| YARNMemoryAvailablePercentage | Inverse of utilization. Consistently over 60% = waste. |
| IsIdle | 1 = no tasks running. Pay attention to idle time between steps. |
| AppsCompleted / AppsFailed | Job success rate tracking. |

aws cloudwatch get-metric-statistics \
  --namespace AWS/ElasticMapReduce \
  --metric-name YARNMemoryAvailablePercentage \
  --dimensions Name=JobFlowId,Value=j-XXXXXXXXXXXXX \
  --start-time 2026-03-18T00:00:00Z \
  --end-time 2026-03-19T00:00:00Z \
  --period 300 --statistics Average,Minimum

Capture YARN Metrics Before Termination

The YARN ResourceManager API (http://master:8088/ws/v1/cluster/apps) provides rich per-application metrics (memorySeconds, vcoreSeconds, elapsedTime) but is only available while the cluster runs. Capture it via a shutdown action:

#!/bin/bash
# bootstrap-shutdown-capture.sh -- runs on master before termination
CLUSTER_ID=$(python3 -c "import json; print(json.load(open('/mnt/var/lib/info/job-flow.json'))['jobFlowId'])")
mkdir -p /mnt/var/lib/instance-controller/public/shutdown-actions/
# Unquoted heredoc so ${CLUSTER_ID} is expanded now, while it is still set;
# a quoted heredoc would write the literal, unset variable into capture.sh
cat > /mnt/var/lib/instance-controller/public/shutdown-actions/capture.sh << SCRIPT
curl -s "http://localhost:8088/ws/v1/cluster/apps?applicationTypes=SPARK" | aws s3 cp - s3://my-bucket/yarn-snapshots/${CLUSTER_ID}/apps.json
curl -s http://localhost:8088/ws/v1/cluster/metrics | aws s3 cp - s3://my-bucket/yarn-snapshots/${CLUSTER_ID}/metrics.json
SCRIPT
chmod +x /mnt/var/lib/instance-controller/public/shutdown-actions/capture.sh

EMR Log Structure in S3

When using --log-uri s3://my-logs/, EMR archives:

s3://my-logs/<cluster-id>/
├── steps/<step-id>/stderr.gz           # Spark driver output
├── containers/<app-id>/
│   └── <container-id>/stderr.gz        # Executor logs (GC, OOM)
└── node/<instance-id>/applications/    # Daemon logs

Platform-Specific: GCP Dataproc

Persistent History Server on GCS

Create a dedicated single-node cluster as a persistent History Server:

# Configure ephemeral clusters to write event logs to GCS
gcloud dataproc clusters create my-job-cluster \
  --region=us-central1 \
  --num-workers=4 \
  --properties="\
spark:spark.eventLog.enabled=true,\
spark:spark.eventLog.dir=gs://my-spark-logs/event-logs/,\
spark:spark.eventLog.logStageExecutorMetrics=true" \
  --enable-component-gateway

# Create a persistent History Server reading from the same GCS bucket
gcloud dataproc clusters create phs-cluster \
  --region=us-central1 --single-node \
  --properties="spark:spark.history.fs.logDirectory=gs://my-spark-logs/event-logs/" \
  --enable-component-gateway

The PHS cluster stays running and provides the Spark History UI through the Component Gateway — accessible after ephemeral clusters are deleted.

Cloud Monitoring Metrics

Dataproc publishes YARN metrics under dataproc.googleapis.com/cluster/:

| Metric | Description |
| --- | --- |
| yarn/memory_fraction | YARN memory utilization (0.0-1.0) |
| yarn/container_count (state=pending) | Resource starvation indicator |
| yarn/apps (status=completed/failed) | Job tracking |
| job/yarn_memory_seconds | Memory-seconds per job — primary cost metric |
| job/yarn_vcore_seconds | vCore-seconds per job |

gcloud monitoring time-series list \
  --filter='metric.type="dataproc.googleapis.com/cluster/yarn/memory_fraction"
    AND resource.labels.cluster_name="my-cluster"' \
  --interval-start-time="2026-03-18T00:00:00Z" \
  --interval-end-time="2026-03-19T00:00:00Z"

Dataproc Jobs API (Survives Cluster Deletion)

Unlike YARN ResourceManager, the Dataproc Jobs API retains metadata indefinitely:

gcloud dataproc jobs describe JOB_ID --region=us-central1 --format=json

This returns yarnApplications[].memoryMbSeconds and vcoreSeconds — the primary cost metrics — plus full status history with timestamps for PENDING, SETUP_DONE, RUNNING, and DONE transitions.
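Those two counters are enough for a rough cost model per job. A sketch with placeholder rates — the defaults below are illustrative, not Dataproc pricing; substitute your actual per-vCPU-hour and per-GB-hour costs:

```python
def job_cost(memory_mb_seconds, vcore_seconds,
             usd_per_vcpu_hour=0.04, usd_per_gb_hour=0.005):
    """Approximate dollar cost from Jobs API counters (rates are placeholders)."""
    vcpu_hours = vcore_seconds / 3600
    gb_hours = memory_mb_seconds / 1024 / 3600
    return round(vcpu_hours * usd_per_vcpu_hour + gb_hours * usd_per_gb_hour, 2)
```

Run it over the jobs returned by `gcloud dataproc jobs list` and sort descending to find your most expensive pipelines.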

BigQuery Audit Logs

Export Dataproc audit logs to BigQuery for historical SQL analysis:

gcloud logging sinks create dataproc-to-bq \
"bigquery.googleapis.com/projects/my-project/datasets/dataproc_logs" \
--log-filter='resource.type="cloud_dataproc_job"' \
--use-partitioned-tables

Then query job history with SQL:

SELECT
  JSON_EXTRACT_SCALAR(protopayload_auditlog.response,
    '$.job.reference.jobId') AS job_id,
  CAST(JSON_EXTRACT_SCALAR(protopayload_auditlog.response,
    '$.job.yarnApplications[0].memoryMbSeconds') AS INT64) AS memory_mb_seconds,
  CAST(JSON_EXTRACT_SCALAR(protopayload_auditlog.response,
    '$.job.yarnApplications[0].vcoreSeconds') AS INT64) AS vcore_seconds
FROM `my-project.dataproc_logs.cloudaudit_googleapis_com_activity`
WHERE resource.type = 'cloud_dataproc_job'
  AND DATE(timestamp) >= '2026-03-01'
ORDER BY memory_mb_seconds DESC;

Right-Sizing from Actual Metrics

Once you have the data, here is how to turn it into configuration changes.

Memory Right-Sizing Formula

recommended_executor_memory = peak_JVM_heap × 1.15
container_memory = recommended_executor_memory + max(384 MB, 0.1 × recommended_executor_memory)

Worked example:

Observed: peak JVM heap = 5.8 GB across all executors
Current: spark.executor.memory = 16g

Recommended: 5.8 × 1.15 = 6.67 → round to 7g
Overhead: max(384 MB, 0.1 × 7g) = 0.7g
Container: 7.7g total

Savings per executor: 16g - 7g = 9g → enables smaller instance type

If your current instance has 64 GB and you are using 16g per executor (3 executors per node + overhead), dropping to 7g per executor might let you switch from r5.2xlarge ($0.504/hr) to m5.2xlarge ($0.384/hr) — 24% cost reduction.
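The formula and rounding above can be wrapped in a helper (a sketch; memory in GB, rounded up to whole gigabytes as in the worked example):

```python
import math

def rightsize_memory_gb(peak_heap_gb):
    """Return (executor_memory_gb, container_memory_gb) from observed peak heap.

    15% headroom over peak, plus the standard max(384 MB, 10%) overhead.
    """
    executor_gb = math.ceil(peak_heap_gb * 1.15)
    overhead_gb = max(0.384, 0.10 * executor_gb)
    return executor_gb, round(executor_gb + overhead_gb, 1)
```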

Executor Count Right-Sizing

executor_utilization = total_task_run_time / (num_executors × cores × wall_clock_time)
optimal_executors = ceil(total_task_run_time / (cores × target_wall_clock))

If utilization is under 20%, you are wasting over 80% of compute capacity. Enable dynamic allocation:

spark.dynamicAllocation.enabled              true
spark.dynamicAllocation.minExecutors         2
spark.dynamicAllocation.maxExecutors         <optimal from formula>
spark.dynamicAllocation.executorIdleTimeout  60s
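Both formulas as a helper (a sketch; all times in seconds):

```python
import math

def executor_plan(total_task_time_s, num_executors, cores, wall_clock_s,
                  target_wall_clock_s):
    """Return (utilization, optimal_executor_count) per the formulas above."""
    utilization = total_task_time_s / (num_executors * cores * wall_clock_s)
    optimal = math.ceil(total_task_time_s / (cores * target_wall_clock_s))
    return round(utilization, 2), optimal
```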

Skew Remediation Decision

| Skew Ratio (max/median) | Action |
| --- | --- |
| Under 3x | No action needed. |
| 3-10x | Enable AQE skew join: spark.sql.adaptive.skewJoin.enabled=true with skewedPartitionThresholdInBytes=256m and skewedPartitionFactor=5. |
| Over 10x | AQE alone may not suffice. Investigate the partition key. Consider salting, two-phase aggregation, or broadcast join for the skewed table. |

Post-Job Profiling Thresholds Cheat Sheet

| Metric | How to Compute | Healthy | Warning | Action Required |
| --- | --- | --- | --- | --- |
| GC Ratio | jvmGcTime / executorRunTime | Under 5% | 5-10% | Over 10%: increase memory or reduce cores |
| CPU Ratio | cpuTime / (runTime × 1e6) | Over 0.70 | 0.30-0.70 | Under 0.30: I/O or GC bound |
| Skew Ratio | max(taskDuration) / median(taskDuration) per stage | Under 2x | 2-5x | Over 5x: salt, AQE, or repartition |
| Memory Util | peakHeap / allocatedMemory | 40-85% | Under 40%: waste | Over 90%: OOM risk |
| Disk Spill | sum(diskBytesSpilled) | 0 | Any value | Over 1 GB: urgent — increase memory |
| Shuffle Locality | localBytes / (local + remote) | Over 50% | 20-50% | Under 20%: network bottleneck |
| Scheduling Delay | wallClock - runTime - deserialize - serialize | Under 100ms | 100ms-1s | Over 1s: resource contention |
| Executor Util | totalTaskTime / (executors × cores × wallClock) | Over 50% | 20-50% | Under 20%: massive over-provisioning |

Platform Comparison: What You Get Where

| Metric Level | Spark Event Logs | Databricks System Tables | EMR CloudWatch | Dataproc Cloud Monitoring |
| --- | --- | --- | --- | --- |
| Task-level (GC, shuffle, spill per task) | Yes | No | No | No |
| Stage-level (aggregated metrics) | Yes | No | No | No |
| Executor-level (peak memory, GC counts) | Yes | No | No | No |
| Job-level (duration, result) | Yes | Yes (lakeflow) | Yes (steps) | Yes (jobs API) |
| Node-level (CPU, memory %) | No | Yes (node_timeline) | Yes (CloudWatch) | Yes (Cloud Monitoring) |
| Cost (dollars per job) | No | Yes (billing.usage) | No (manual calc) | No (manual calc) |
| SQL query metrics (rows, bytes, spill) | Via SQL events | Yes (query.history) | No | No |
| Post-termination availability | Indefinite (object storage) | 365 days | 63 days (5-min), 15 months (1-hr) | ~6 weeks (1-min), ~6 months (10-min) |

The takeaway: Spark event logs are the only universal source of task-level profiling data. Platform monitoring gives you node utilization and cost, but cannot tell you why a stage was slow or which executor had GC problems. You need both.

Building the Profiling Pipeline

For teams managing dozens or hundreds of daily jobs, manual profiling does not scale. Here is the architecture for automated profiling:

Spark Jobs (Databricks / EMR / Dataproc)
│
├── Event Logs → S3/GCS/ADLS
│         │
│         ▼
│   Event Log Parser
│   (Spark job or Python)
│         │
│         ▼
│   Metrics Store
│   (Delta / Iceberg / ClickHouse)
│
├── Platform Metrics → System Tables / CloudWatch / Cloud Monitoring
│         │
│         ▼
│   Join on job_id / cluster_id
│
└── Alerts
    ├── GC ratio > 10%      → Slack notification
    ├── Disk spill > 1 GB   → Slack notification
    ├── Skew ratio > 5x     → Slack notification
    ├── Memory util < 40%   → Right-sizing recommendation
    └── Executor util < 20% → Over-provisioning alert

The parser runs on a schedule (daily), reads new event logs, extracts the five key metrics per stage, and writes them to a metrics table. A dashboard shows trends over time. Alerts fire when thresholds are breached.

How Cazpian Handles This

The profiling workflow described above requires stitching together event logs, platform APIs, History Servers, and custom parsers — across every cluster, every platform, every job. Most teams never build this pipeline. They either guess at configurations or overprovision by default.

Cazpian eliminates this entirely.

Immediate Metrics After Every Job and Query

When a Spark job or SQL query completes on Cazpian, profiling metrics are immediately available — no event log parsing, no History Server setup, no custom SparkListener. Every metric covered in this post is collected automatically:

  • GC ratio per stage and per executor
  • Peak memory utilization against allocated memory
  • Task duration distribution with skew ratio detection
  • Disk spill totals per stage
  • CPU utilization ratio (compute vs I/O wait)
  • Shuffle metrics — bytes read/written, local vs remote ratio
  • Cost — actual dollar spend per job

In the Cazpian SQL editor, compute metrics appear alongside query results. For batch pipelines, the job dashboard shows per-stage breakdowns immediately after completion. No digging through Spark UI. No waiting for event log ingestion.

AI-Powered Recommendations

Raw metrics require expertise to interpret. Cazpian's AI analyzes the collected metrics and provides actionable recommendations:

  • Bottleneck identification — pinpoints whether a job is memory-bound, I/O-bound, skew-bound, or compute-bound
  • Right-sizing suggestions — specific executor memory and core count changes based on actual peak usage, not guesswork
  • Skew remediation — identifies skewed stages and recommends salting, AQE tuning, or repartitioning strategies
  • Spill elimination — traces disk spill to the responsible stage and suggests the minimum memory increase to eliminate it
  • Cost optimization — correlates resource utilization with spend and recommends instance type changes with projected savings

Instead of a senior engineer spending 30 minutes interpreting event log data and computing right-sizing formulas manually, the AI delivers the diagnosis and fix in seconds.

The Visibility Gap on Other Platforms

On Databricks, task-level metrics require parsing event logs or running Overwatch. On EMR, you need a self-hosted History Server and shutdown scripts to capture YARN data. On Dataproc, you piece together Cloud Monitoring, Jobs API, and audit logs. On all three, correlating metrics with cost requires joining billing data manually.

Cazpian treats profiling as a first-class feature, not an afterthought. Every metric is collected, correlated with cost, and surfaced with AI-driven recommendations — making right-sizing a routine part of every job, not a quarterly optimization project.

What Profiling Cannot Tell You

Profiling completed jobs tells you what happened. It does not tell you:

  • What the optimal partition count is — you need to experiment with spark.sql.shuffle.partitions and re-measure
  • Whether a broadcast join would help — you need to check table sizes and try spark.sql.autoBroadcastJoinThreshold
  • Whether your data format is optimal — Parquet vs ORC, compression codec, row group size require A/B testing
  • Whether your query plan is efficient — use EXPLAIN FORMATTED before the job runs, not after

Profiling is the diagnostic step. It tells you where to look. The fix still requires engineering judgment and experimentation.

Next Steps

  1. Enable event logging with spark.eventLog.logStageExecutorMetrics=true on every cluster. This is the single highest-leverage change.
  2. Set up a persistent History Server (self-hosted on S3/GCS) if you use EMR or Dataproc. On Databricks, enable cluster log delivery.
  3. Run the PySpark event log parser on your last 7 days of jobs. Identify the top 3 jobs by GC ratio and disk spill.
  4. Apply right-sizing formulas to your top 5 most expensive jobs. Start with memory right-sizing — it is the safest change with the highest ROI.
  5. Automate with a daily profiling pipeline that parses event logs and alerts on threshold breaches.

The difference between a well-profiled Spark deployment and an unmonitored one is typically 2-5x in compute cost. The metrics are already there. You just need to collect them.