リアルタイムクリックストリーム分析

Data Engineering
Streaming
Kafka
Spark
Eコマース行動分析のためのストリーミング分析パイプライン

背景と課題

ビジネス課題:Eコマースでユーザー行動の異常をリアルタイムで検出するには?

Eコマースアナリストとして、ユーザー行動をリアルタイムで観察できることで以下が可能になります:

  • 異常を素早く検出(急激なトラフィック減少、チェックアウトの問題)
  • 体験を最適化(摩擦点を特定)
  • イベントに迅速に対応(プロモーション、技術的インシデント)

このプロジェクトでは、取り込みから可視化までの完全なストリーミング分析パイプラインを実装します。

アーキテクチャ

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  CSVイベント │────▶│    Kafka    │────▶│    Spark    │
│  (ソース)   │     │  (ブローカー) │     │(ストリーミング)│
└─────────────┘     └─────────────┘     └─────────────┘
                                               │
                                               ▼
                    ┌─────────────┐     ┌─────────────┐
                    │  Streamlit  │◀────│ Delta Lake  │
                    │(ダッシュボード)│    │ (ストレージ) │
                    └─────────────┘     └─────────────┘

コンポーネント

サービス 役割 ポート
Zookeeper Kafka調整 2181
Kafka メッセージブローカー 9092
Spark ストリーム処理 4040
Delta Lake トランザクションストレージ -
Streamlit 可視化 8501

データ

ソース

eCommerce Eventsデータセット(Kaggle):

  • ボリューム:2.3 GB、5つのCSVファイル
  • 期間:2019年10月〜2020年2月
  • イベント:view、cart、remove_from_cart、purchase

スキーマ

event_schema = {
    'event_time': 'timestamp',      # タイムスタンプ
    'event_type': 'string',          # イベントタイプ
    'product_id': 'integer',         # 商品ID
    'category_id': 'long',           # カテゴリ
    'category_code': 'string',       # カテゴリコード
    'brand': 'string',               # ブランド
    'price': 'float',                # 価格
    'user_id': 'integer',            # ユーザーID
    'user_session': 'string'         # セッション
}

実装

1. Kafkaプロデューサー

# イベントをKafkaに送信
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

for event in read_csv_stream('events.csv'):
    producer.send('clickstream', value=event)

2. Spark Structured Streaming

# Kafkaストリームからの読み取り
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, count, sum, col

spark = SparkSession.builder \
    .appName("ClickstreamAnalytics") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .getOrCreate()

# Kafkaからのストリーム
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "clickstream") \
    .load()

# 時間ウィンドウ集計
aggregated = df \
    .withWatermark("event_time", "1 minute") \
    .groupBy(
        window("event_time", "1 minute"),
        "event_type"
    ) \
    .agg(
        count("*").alias("event_count"),
        countDistinct("user_session").alias("unique_sessions")
    )

# Delta Lakeへの書き込み
aggregated.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .start("/data/aggregated")

3. Streamlitダッシュボード

# Delta Lakeからの読み取りとリアルタイム表示
import streamlit as st
from delta import DeltaTable

st.set_page_config(page_title="Clickstream Analytics", layout="wide")

# 設定可能な自動更新(1-30秒)
refresh_interval = st.sidebar.slider("更新間隔", 1, 30, 5)

# リアルタイムメトリクス
col1, col2, col3, col4 = st.columns(4)
col1.metric("アクティブセッション", get_active_sessions())
col2.metric("ユニークユーザー", get_unique_users())
col3.metric("総収益", format_currency(get_total_revenue()))
col4.metric("コンバージョン率", f"{get_conversion_rate():.1%}")

# コンバージョンファネル
st.plotly_chart(create_funnel_chart())

# 人気商品
st.dataframe(get_top_products())

リアルタイムメトリクス

集計ウィンドウ

  • 1分:迅速な異常検出
  • 5分:短期トレンド
  • 1時間:概要

計算されるKPI

メトリクス 説明
アクティブセッション 最後のウィンドウでアクティビティのあるセッション
ユニークユーザー 個別のuser_idカウント
総収益 「purchase」イベントの価格合計
ファネル view → cart → purchase
人気商品 最も閲覧/購入された商品

デモ

ダッシュボードはオンラインにデプロイされていません(Kafka/Sparkインフラが必要)が、Docker Composeを使用してローカルで完全に再現可能です。

前提条件

  • Docker & Docker Compose
  • 4-5 GB の利用可能なRAM
  • 2-3 CPUコア

起動

# インフラを起動
docker-compose up -d

# プロデューサーを起動
python src/producer.py

# Sparkコンシューマーを起動
spark-submit src/consumer.py

# ダッシュボードを起動
streamlit run src/dashboard.py

テクノロジー

コンポーネント テクノロジー バージョン
メッセージブローカー Apache Kafka 2.8
調整 Zookeeper 3.7
ストリーム処理 PySpark 3.5.0
ストレージ Delta Lake 3.1.0
ダッシュボード Streamlit -
オーケストレーション Docker Compose -

プロジェクト構造

realtime-clickstream-analytics/
├── docker-compose.yml       # インフラ
├── src/
│   ├── producer.py          # Kafkaプロデューサー
│   ├── consumer.py          # Sparkストリーミング
│   └── dashboard.py         # Streamlitダッシュボード
├── data/
│   └── raw/                 # ソースCSVファイル
├── config/
│   └── config.yaml          # 設定
└── README.md

学び

このプロジェクトで以下をマスターできました:

  1. ストリーミングアーキテクチャ:リアルタイム処理パターンの理解
  2. Apache Kafka:大規模なメッセージ取り込みと配信
  3. Spark Structured Streaming:ウォーターマークとウィンドウを使用した集計
  4. Delta Lake:ストリーミング用トランザクションストレージ
  5. Docker Compose:マルチサービスオーケストレーション

← ポートフォリオ分析に戻る