Real-time Clickstream Analytics

Data Engineering
Streaming
Kafka
Spark
Pipeline de streaming analytics pour l’analyse comportementale e-commerce

Contexte & Problématique

Question business : Comment détecter en temps réel les anomalies dans le comportement utilisateur e-commerce ?

En tant qu’analyste e-commerce, pouvoir observer en temps réel le comportement des utilisateurs permet de :

  • Détecter rapidement les anomalies (baisse soudaine du trafic, problème de checkout)
  • Optimiser l’expérience en identifiant les points de friction
  • Réagir vite aux événements (promotions, incidents techniques)

Ce projet implémente un pipeline de streaming analytics complet, de l’ingestion à la visualisation.

Architecture

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  CSV Events │────▶│    Kafka    │────▶│    Spark    │
│  (Source)   │     │  (Broker)   │     │ (Streaming) │
└─────────────┘     └─────────────┘     └─────────────┘
                                               │
                                               ▼
                    ┌─────────────┐     ┌─────────────┐
                    │  Streamlit  │◀────│ Delta Lake  │
                    │ (Dashboard) │     │  (Storage)  │
                    └─────────────┘     └─────────────┘

Composants

Service Rôle Port
Zookeeper Coordination Kafka 2181
Kafka Message broker 9092
Spark Stream processing 4040
Delta Lake Stockage transactionnel -
Streamlit Visualisation 8501

Données

Source

Dataset eCommerce Events (Kaggle) :

  • Volume : 2.3 Go, 5 fichiers CSV
  • Période : Octobre 2019 - Février 2020
  • Événements : view, cart, remove_from_cart, purchase

Schéma

event_schema = {
    'event_time': 'timestamp',      # Horodatage
    'event_type': 'string',          # Type d'événement
    'product_id': 'integer',         # ID produit
    'category_id': 'long',           # Catégorie
    'category_code': 'string',       # Code catégorie
    'brand': 'string',               # Marque
    'price': 'float',                # Prix
    'user_id': 'integer',            # ID utilisateur
    'user_session': 'string'         # Session
}

Implémentation

1. Producer Kafka

# Envoi des événements vers Kafka
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

for event in read_csv_stream('events.csv'):
    producer.send('clickstream', value=event)

2. Spark Structured Streaming

# Lecture du stream Kafka
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, count, sum, col

spark = SparkSession.builder \
    .appName("ClickstreamAnalytics") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .getOrCreate()

# Stream depuis Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "clickstream") \
    .load()

# Agrégations par fenêtre temporelle
aggregated = df \
    .withWatermark("event_time", "1 minute") \
    .groupBy(
        window("event_time", "1 minute"),
        "event_type"
    ) \
    .agg(
        count("*").alias("event_count"),
        countDistinct("user_session").alias("unique_sessions")
    )

# Écriture vers Delta Lake
aggregated.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .start("/data/aggregated")

3. Dashboard Streamlit

# Lecture depuis Delta Lake et affichage temps réel
import streamlit as st
from delta import DeltaTable

st.set_page_config(page_title="Clickstream Analytics", layout="wide")

# Auto-refresh configurable (1-30 secondes)
refresh_interval = st.sidebar.slider("Refresh interval", 1, 30, 5)

# Métriques temps réel
col1, col2, col3, col4 = st.columns(4)
col1.metric("Sessions actives", get_active_sessions())
col2.metric("Utilisateurs uniques", get_unique_users())
col3.metric("Revenue total", format_currency(get_total_revenue()))
col4.metric("Taux de conversion", f"{get_conversion_rate():.1%}")

# Funnel de conversion
st.plotly_chart(create_funnel_chart())

# Top produits
st.dataframe(get_top_products())

Métriques Temps Réel

Fenêtres d’agrégation

  • 1 minute : Détection d’anomalies rapide
  • 5 minutes : Tendances court terme
  • 1 heure : Vue d’ensemble

KPIs calculés

Métrique Description
Sessions actives Sessions avec activité dans la dernière fenêtre
Utilisateurs uniques Nombre de user_id distincts
Revenue total Somme des prix des événements “purchase”
Funnel view → cart → purchase
Top produits Produits les plus consultés/achetés

Démonstration

Le dashboard n’est pas déployé en ligne (nécessite l’infrastructure Kafka/Spark), mais le projet est entièrement reproductible en local avec Docker Compose.

Prérequis

  • Docker & Docker Compose
  • 4-5 Go RAM disponibles
  • 2-3 CPU cores

Lancement

# Démarrer l'infrastructure
docker-compose up -d

# Lancer le producer
python src/producer.py

# Lancer le consumer Spark
spark-submit src/consumer.py

# Lancer le dashboard
streamlit run src/dashboard.py

Technologies

Composant Technologie Version
Message Broker Apache Kafka 2.8
Coordination Zookeeper 3.7
Stream Processing PySpark 3.5.0
Stockage Delta Lake 3.1.0
Dashboard Streamlit -
Orchestration Docker Compose -

Structure du Projet

realtime-clickstream-analytics/
├── docker-compose.yml       # Infrastructure
├── src/
│   ├── producer.py          # Kafka producer
│   ├── consumer.py          # Spark streaming
│   └── dashboard.py         # Streamlit dashboard
├── data/
│   └── raw/                 # Fichiers CSV source
├── config/
│   └── config.yaml          # Configuration
└── README.md

Enseignements

Ce projet m’a permis de maîtriser :

  1. Architecture streaming : Comprendre les patterns de traitement en temps réel
  2. Apache Kafka : Ingestion et distribution de messages à grande échelle
  3. Spark Structured Streaming : Agrégations avec watermarks et fenêtres
  4. Delta Lake : Stockage transactionnel pour le streaming
  5. Docker Compose : Orchestration de services multiples

← Retour au Portfolio Analysis