Code & Configuration
Configuration (YAML)
Le fichier de1_project_config.yml centralise les paramètres du pipeline.
de1_project_config.yml
# Project Configuration
paths:
raw_csv_glob: "data/US_Accidents_March23.csv"
bronze: "outputs/project/bronze"
silver: "outputs/project/silver"
gold: "outputs/project/gold"
proof: "proof"
layout:
# Utilisé dans la cellule 3 et 5 pour le partitionnement
partition_by: ["State"]
Notebook Jupyter: DE1_Project_Notebook_EN
Chargement du fichier de configuration et lancement de spark/p>
0. Load Config
import yaml, pathlib, datetime
from pyspark.sql import SparkSession, functions as F, types as T
import os
# Force Spark à utiliser l'adresse locale (localhost) pour éviter les erreurs réseaux
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"
with open("de1_project_config.yml") as f:
CFG = yaml.safe_load(f)
spark = SparkSession.builder \
.appName("DE1-Project-Lakehouse") \
.config("spark.driver.host", "127.0.0.1") \
.config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
.getOrCreate()
CFG
proof = CFG["paths"]["proof"]
# Fonction pour sauvegarder le Plan Physique (La preuve technique)
def save_execution_plan(df, filename):
# On récupère le plan "Expliqué" complet
# mode="extended" donne le Parsed, Analyzed, Optimized et Physical plan
plan = df._jdf.queryExecution().toString()
filepath = f"{proof}/{filename}.txt"
with open(filepath, "w") as f:
f.write(plan)
print(f"Plan d'exécution sauvegardé dans : {filepath}")
print(f"Config loaded")
Ingestion brute CSV. Ajout des colonnes d'audit (ingested_at). Stockage Parquet non partitionné pour la vitesse.
1. Bronze — landing raw data
print("--- Démarrage Bronze ---")
raw_glob = CFG["paths"]["raw_csv_glob"]
bronze = CFG["paths"]["bronze"]
df_raw = (spark.read.option("header","true").csv(raw_glob))
df_raw.write.mode("overwrite").csv(bronze) # keep raw as CSV copy
print("Bronze written:", bronze)
# 1. Enrichissement : Ajout des colonnes d'audit (Timestamp + Source)
# On réutilise 'df_raw' qui est déjà en mémoire
df_bronze_enhanced = df_raw \
.withColumn("_ingested_at", F.current_timestamp()) \
.withColumn("_source_file", F.input_file_name())
# --- PREUVE ---
# Cela prouve que Spark effectue un "FileScan csv"
save_execution_plan(df_bronze_enhanced, "bronze_ingestion_plan")
# 2. Sauvegarde en Parquet (Plus rapide pour l'étape Silver)
# On définit un nouveau chemin pour ne pas mélanger avec le CSV
bronze_parquet = f"{bronze}_parquet"
df_bronze_enhanced.write.mode("overwrite").parquet(bronze_parquet)
print(f"Version enrichie (Parquet) écrite dans : {bronze_parquet}")
print(f"Nombre de lignes ingérées : {df_bronze_enhanced.count()}")
Casting des types (Severity INT, Date Timestamp). Partitionnement initial par State, puis optimisé par Year.
2. Silver — cleaning and typing
from pyspark.sql import functions as F, types as T
print("--- Démarrage Silver ---")
# 1. Configuration des chemins
# On lit le Bronze (version Parquet optimisée de l'étape précédente)
bronze_path = f"{CFG['paths']['bronze']}_parquet"
silver = CFG["paths"]["silver"]
# 2. Lecture
df_bronze = spark.read.parquet(bronze_path)
# 3. Transformation & Typage
# On convertit les String en types réels (Timestamp, Int, Double)
df_silver = (df_bronze
.select(
F.col("ID").alias("accident_id"),
F.col("Severity").cast("int"),
F.col("Start_Time").cast("timestamp").alias("event_time"),
F.col("State"),
F.col("City"),
F.col("Temperature(F)").cast("double").alias("temp_f"),
F.col("Weather_Condition"),
F.col("_ingested_at"), # On garde la traçabilité
F.col("_source_file")
)
# 4. Nettoyage : On supprime les lignes sans date ou sans état
.dropna(subset=["event_time", "State", "accident_id"])
)
# --- PREUVE ---
# Sauvegarde le plan d'exécution avant l'écriture
save_execution_plan(df_silver, "silver_transformation_plan")
# 4. Écriture avec Partitionnement
# On récupère la colonne de partition depuis la config (ex: ["State"])
partition_cols = CFG['layout']['partition_by']
print(f"Écriture dans {silver} (Partitionné par {partition_cols})...")
df_silver.write.mode("overwrite").parquet(silver)
print("Silver written:", silver)
print(f"Nombre de lignes ingérées : {df_silver.count()}")
Agrégats métier : Accidents par État et tendances mensuelles pour le reporting.
3. Gold — analytics tables
print("--- Démarrage Gold ---")
silver_path = CFG["paths"]["silver"]
gold_path = CFG["paths"]["gold"]
# 1. Lecture de la couche Silver
df_silver = spark.read.parquet(silver_path)
# --- KPI 1 : Statistiques par État (Gravité moyenne & Total) ---
df_gold_state = (df_silver
.groupBy("State")
.agg(
F.count("accident_id").alias("total_accidents"),
F.round(F.avg("Severity"), 2).alias("avg_severity")
)
.orderBy(F.col("total_accidents").desc())
)
# --- KPI 2 : Analyse Temporelle (Par mois) ---
# On extrait le mois depuis event_time
df_gold_monthly = (df_silver
.withColumn("Month", F.date_format("event_time", "yyyy-MM"))
.groupBy("Month")
.count()
.orderBy("Month")
)
# --- PREUVE ---
# Référence avant optimisation
save_execution_plan(df_gold_state, "gold_q1_baseline_plan")
# 3. Écriture des résultats
print("Écriture des tables Gold...")
df_gold_state.write.mode("overwrite").parquet(f"{gold_path}/accidents_by_state")
df_gold_monthly.write.mode("overwrite").parquet(f"{gold_path}/accidents_by_month")
print(f"Résultats dans : {gold_path}")
print(f"Nombre de lignes de statistiques par Etat ingérées : {df_gold_state.count()}")
print(f"Nombre de lignes temporelle ingérées : {df_gold_monthly.count()}")
Métrique du minimum à trouver
4. Baseline plans and metrics
print("--- Démarrage Baseline Metrics ---")
# 1. Configuration
silver_path = CFG["paths"]["silver"]
gold_path = CFG["paths"]["gold"]
# Lecture de la table Silver (non-optimisée pour cette requête)
df_silver = spark.read.parquet(silver_path)
# 2. Définition de la Requête Q1 (Baseline)
# "Quels sont les états avec le plus d'accidents graves ?"
# C'est une requête lourde car elle doit scanner toute la table et grouper.
df_gold_q1_baseline = (df_silver
.groupBy("State")
.agg(
F.count("accident_id").alias("total_accidents"),
F.avg("Severity").alias("avg_severity")
)
.orderBy(F.col("total_accidents").desc())
)
# 3. PREUVE 1 : Sauvegarde du Plan Baseline
# Ce fichier montrera que Spark fait un gros scan (Scan Parquet)
save_execution_plan(df_gold_q1_baseline, "q1_baseline_plan")
# 4. Exécution pour Mesure (Baseline Time)
print("Lancement de l'exécution Baseline (Q1)...")
# On écrit le résultat dans un dossier spécifique "baseline"
# Le mode "overwrite" assure qu'on refait le calcul à chaque fois
(df_gold_q1_baseline
.write
.mode("overwrite")
.parquet(f"{gold_path}/q1_baseline_results")
)
print("Baseline terminée.")
print("ACTION REQUISE :")
print("1. Il faut aller sur http://localhost:4040 -> Onglet 'SQL'")
print("2. Repèrer la dernière requête (Duration) puis noter ce temps.")
Comparaison Baseline et l'optimisé
5. Optimization — layout and joins
print("--- Démarrage Optimization ---")
silver_path = CFG["paths"]["silver"]
# Chemin pour la table optimisée
optimized_path = f"{CFG['paths']['silver']}_optimized_by_year"
# 1. Lecture de la table Silver existante (Partitionnée par State)
df_silver = spark.read.parquet(silver_path)
# --- PRÉPARATION DE L'OPTIMISATION ---
# On crée une nouvelle structure physique partitionnée par ANNÉE
# Cela prend un peu de temps à écrire, mais rendra la lecture future instantanée.
print("Création de la table optimisée (Partitionnée par Année)...")
df_optimized = df_silver.withColumn("year_part", F.year("event_time"))
(df_optimized
.write
.mode("overwrite")
.partitionBy("year_part") # <--- L'optimisation est ici
.parquet(optimized_path)
)
# --- LE DUEL : SCÉNARIO A vs SCÉNARIO B ---
# Requête : "Compter les accidents survenus en 2021"
# CAS A : Sur la table rangée par ÉTAT (Silver standard)
print("\n--- TEST A : Table Standard (Partition=State) ---")
print("Spark doit scanner tous les dossiers d'états pour trouver 2021.")
# On vide le cache pour être équitable
spark.catalog.clearCache()
# On lance la requête
df_silver.filter(F.year("event_time") == 2021).count()
print("ACTION REQUISE :")
print("1. Il faut aller sur http://localhost:4040 -> Onglet 'SQL'")
print("2. Repèrer l'avant dernière requête (Duration) puis noter ce temps.")
# CAS B : Sur la table rangée par ANNÉE (Optimisée)
print("\n--- TEST B : Table Optimisée (Partition=Year) ---")
print("Spark doit lire UNIQUEMENT le dossier year_part=2021.")
spark.catalog.clearCache()
df_opt_read = spark.read.parquet(optimized_path)
# On lance la même requête
df_opt_read.filter(F.col("year_part") == 2021).count()
print("Optimisation terminée.")
print("ACTION REQUISE :")
print("1. Il faut aller sur http://localhost:4040 -> Onglet 'SQL'")
print("2. Repèrer la dernière requête (Duration) puis noter ce temps.")
Arret de la session Spark
6. Cleanup
spark.stop()
print("Spark session stopped.")