Code & Configuration

Configuration (YAML)

Le fichier de1_project_config.yml centralise les paramètres du pipeline.

de1_project_config.yml
# Project Configuration
paths:
  raw_csv_glob: "data/US_Accidents_March23.csv"
  bronze: "outputs/project/bronze"
  silver: "outputs/project/silver"
  gold: "outputs/project/gold"
  proof: "proof"

layout:
  # Utilisé dans la cellule 3 et 5 pour le partitionnement
  partition_by: ["State"]

Notebook Jupyter: DE1_Project_Notebook_EN

Chargement du fichier de configuration et lancement de spark/p>

0. Load Config

import yaml, pathlib, datetime
from pyspark.sql import SparkSession, functions as F, types as T
import os

# Force Spark à utiliser l'adresse locale (localhost) pour éviter les erreurs réseaux
os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
os.environ["OBJC_DISABLE_INITIALIZE_FORK_SAFETY"] = "YES"

with open("de1_project_config.yml") as f:
    CFG = yaml.safe_load(f)

spark = SparkSession.builder \
    .appName("DE1-Project-Lakehouse") \
    .config("spark.driver.host", "127.0.0.1") \
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic") \
    .getOrCreate()

CFG 

proof = CFG["paths"]["proof"]
# Fonction pour sauvegarder le Plan Physique (La preuve technique)
def save_execution_plan(df, filename):
    # On récupère le plan "Expliqué" complet
    # mode="extended" donne le Parsed, Analyzed, Optimized et Physical plan
    plan = df._jdf.queryExecution().toString() 
    
    filepath = f"{proof}/{filename}.txt"
    with open(filepath, "w") as f:
        f.write(plan)
    print(f"Plan d'exécution sauvegardé dans : {filepath}")

print(f"Config loaded")

Ingestion brute CSV. Ajout des colonnes d'audit (ingested_at). Stockage Parquet non partitionné pour la vitesse.

1. Bronze — landing raw data

print("--- Démarrage Bronze ---")

raw_glob = CFG["paths"]["raw_csv_glob"]
bronze = CFG["paths"]["bronze"]

df_raw = (spark.read.option("header","true").csv(raw_glob))
df_raw.write.mode("overwrite").csv(bronze)  # keep raw as CSV copy
print("Bronze written:", bronze)

# 1. Enrichissement : Ajout des colonnes d'audit (Timestamp + Source)
# On réutilise 'df_raw' qui est déjà en mémoire
df_bronze_enhanced = df_raw \
    .withColumn("_ingested_at", F.current_timestamp()) \
    .withColumn("_source_file", F.input_file_name())

# --- PREUVE ---
# Cela prouve que Spark effectue un "FileScan csv"
save_execution_plan(df_bronze_enhanced, "bronze_ingestion_plan")

# 2. Sauvegarde en Parquet (Plus rapide pour l'étape Silver)
# On définit un nouveau chemin pour ne pas mélanger avec le CSV
bronze_parquet = f"{bronze}_parquet"

df_bronze_enhanced.write.mode("overwrite").parquet(bronze_parquet)

print(f"Version enrichie (Parquet) écrite dans : {bronze_parquet}")
print(f"Nombre de lignes ingérées : {df_bronze_enhanced.count()}")

Casting des types (Severity INT, Date Timestamp). Partitionnement initial par State, puis optimisé par Year.

2. Silver — cleaning and typing

from pyspark.sql import functions as F, types as T

print("--- Démarrage Silver ---")

# 1. Configuration des chemins
# On lit le Bronze (version Parquet optimisée de l'étape précédente)
bronze_path = f"{CFG['paths']['bronze']}_parquet"
silver = CFG["paths"]["silver"]

# 2. Lecture
df_bronze = spark.read.parquet(bronze_path)

# 3. Transformation & Typage 
# On convertit les String en types réels (Timestamp, Int, Double)
df_silver = (df_bronze
    .select(
        F.col("ID").alias("accident_id"),
        F.col("Severity").cast("int"),
        F.col("Start_Time").cast("timestamp").alias("event_time"),
        F.col("State"),
        F.col("City"),
        F.col("Temperature(F)").cast("double").alias("temp_f"),
        F.col("Weather_Condition"),
        F.col("_ingested_at"), # On garde la traçabilité
        F.col("_source_file")
    )
    # 4. Nettoyage : On supprime les lignes sans date ou sans état
    .dropna(subset=["event_time", "State", "accident_id"])
)

# --- PREUVE ---
# Sauvegarde le plan d'exécution avant l'écriture
save_execution_plan(df_silver, "silver_transformation_plan")

# 4. Écriture avec Partitionnement
# On récupère la colonne de partition depuis la config (ex: ["State"])
partition_cols = CFG['layout']['partition_by']

print(f"Écriture dans {silver} (Partitionné par {partition_cols})...")
df_silver.write.mode("overwrite").parquet(silver)
print("Silver written:", silver)

print(f"Nombre de lignes ingérées : {df_silver.count()}")

Agrégats métier : Accidents par État et tendances mensuelles pour le reporting.

3. Gold — analytics tables

print("--- Démarrage Gold ---")

silver_path = CFG["paths"]["silver"]
gold_path = CFG["paths"]["gold"]

# 1. Lecture de la couche Silver
df_silver = spark.read.parquet(silver_path)

# --- KPI 1 : Statistiques par État (Gravité moyenne & Total) ---
df_gold_state = (df_silver
    .groupBy("State")
    .agg(
        F.count("accident_id").alias("total_accidents"),
        F.round(F.avg("Severity"), 2).alias("avg_severity")
    )
    .orderBy(F.col("total_accidents").desc())
)

# --- KPI 2 : Analyse Temporelle (Par mois) ---
# On extrait le mois depuis event_time
df_gold_monthly = (df_silver
    .withColumn("Month", F.date_format("event_time", "yyyy-MM"))
    .groupBy("Month")
    .count()
    .orderBy("Month")
)

# --- PREUVE ---
# Référence avant optimisation
save_execution_plan(df_gold_state, "gold_q1_baseline_plan")

# 3. Écriture des résultats
print("Écriture des tables Gold...")
df_gold_state.write.mode("overwrite").parquet(f"{gold_path}/accidents_by_state")
df_gold_monthly.write.mode("overwrite").parquet(f"{gold_path}/accidents_by_month")

print(f"Résultats dans : {gold_path}")
print(f"Nombre de lignes de statistiques par Etat ingérées : {df_gold_state.count()}")
print(f"Nombre de lignes temporelle ingérées : {df_gold_monthly.count()}")

Métrique du minimum à trouver

4. Baseline plans and metrics

print("--- Démarrage Baseline Metrics ---")

# 1. Configuration
silver_path = CFG["paths"]["silver"]
gold_path = CFG["paths"]["gold"]

# Lecture de la table Silver (non-optimisée pour cette requête)
df_silver = spark.read.parquet(silver_path)

# 2. Définition de la Requête Q1 (Baseline)
# "Quels sont les états avec le plus d'accidents graves ?"
# C'est une requête lourde car elle doit scanner toute la table et grouper.
df_gold_q1_baseline = (df_silver
    .groupBy("State")
    .agg(
        F.count("accident_id").alias("total_accidents"),
        F.avg("Severity").alias("avg_severity")
    )
    .orderBy(F.col("total_accidents").desc())
)

# 3. PREUVE 1 : Sauvegarde du Plan Baseline
# Ce fichier montrera que Spark fait un gros scan (Scan Parquet)
save_execution_plan(df_gold_q1_baseline, "q1_baseline_plan")

# 4. Exécution pour Mesure (Baseline Time)
print("Lancement de l'exécution Baseline (Q1)...")

# On écrit le résultat dans un dossier spécifique "baseline"
# Le mode "overwrite" assure qu'on refait le calcul à chaque fois
(df_gold_q1_baseline
    .write
    .mode("overwrite")
    .parquet(f"{gold_path}/q1_baseline_results")
)

print("Baseline terminée.")
print("ACTION REQUISE :")
print("1. Il faut aller sur http://localhost:4040 -> Onglet 'SQL'")
print("2. Repèrer la dernière requête (Duration) puis noter ce temps.")

Comparaison Baseline et l'optimisé

5. Optimization — layout and joins

print("--- Démarrage Optimization ---")

silver_path = CFG["paths"]["silver"]
# Chemin pour la table optimisée
optimized_path = f"{CFG['paths']['silver']}_optimized_by_year"

# 1. Lecture de la table Silver existante (Partitionnée par State)
df_silver = spark.read.parquet(silver_path)

# --- PRÉPARATION DE L'OPTIMISATION ---
# On crée une nouvelle structure physique partitionnée par ANNÉE
# Cela prend un peu de temps à écrire, mais rendra la lecture future instantanée.
print("Création de la table optimisée (Partitionnée par Année)...")

df_optimized = df_silver.withColumn("year_part", F.year("event_time"))

(df_optimized
    .write
    .mode("overwrite")
    .partitionBy("year_part") # <--- L'optimisation est ici
    .parquet(optimized_path)
)

# --- LE DUEL : SCÉNARIO A vs SCÉNARIO B ---
# Requête : "Compter les accidents survenus en 2021"

# CAS A : Sur la table rangée par ÉTAT (Silver standard)
print("\n--- TEST A : Table Standard (Partition=State) ---")
print("Spark doit scanner tous les dossiers d'états pour trouver 2021.")

# On vide le cache pour être équitable
spark.catalog.clearCache()

# On lance la requête
df_silver.filter(F.year("event_time") == 2021).count()
print("ACTION REQUISE :")
print("1. Il faut aller sur http://localhost:4040 -> Onglet 'SQL'")
print("2. Repèrer l'avant dernière requête (Duration) puis noter ce temps.")


# CAS B : Sur la table rangée par ANNÉE (Optimisée)
print("\n--- TEST B : Table Optimisée (Partition=Year) ---")
print("Spark doit lire UNIQUEMENT le dossier year_part=2021.")

spark.catalog.clearCache()
df_opt_read = spark.read.parquet(optimized_path)

# On lance la même requête
df_opt_read.filter(F.col("year_part") == 2021).count()
print("Optimisation terminée.")
print("ACTION REQUISE :")
print("1. Il faut aller sur http://localhost:4040 -> Onglet 'SQL'")
print("2. Repèrer la dernière requête (Duration) puis noter ce temps.")

Arret de la session Spark

6. Cleanup

spark.stop()
print("Spark session stopped.")