リアルタイムクリックストリーム分析
Data Engineering
Streaming
Kafka
Spark
Eコマース行動分析のためのストリーミング分析パイプライン
背景と課題
ビジネス課題:Eコマースでユーザー行動の異常をリアルタイムで検出するには?
Eコマースアナリストとして、ユーザー行動をリアルタイムで観察できることで以下が可能になります:
- 異常を素早く検出(急激なトラフィック減少、チェックアウトの問題)
- 体験を最適化(摩擦点を特定)
- イベントに迅速に対応(プロモーション、技術的インシデント)
このプロジェクトでは、取り込みから可視化までの完全なストリーミング分析パイプラインを実装します。
アーキテクチャ
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ CSVイベント │────▶│ Kafka │────▶│ Spark │
│ (ソース) │ │ (ブローカー) │ │(ストリーミング)│
└─────────────┘ └─────────────┘ └─────────────┘
│
▼
┌─────────────┐ ┌─────────────┐
│ Streamlit │◀────│ Delta Lake │
│(ダッシュボード)│ │ (ストレージ) │
└─────────────┘ └─────────────┘
コンポーネント
| サービス | 役割 | ポート |
|---|---|---|
| Zookeeper | Kafka調整 | 2181 |
| Kafka | メッセージブローカー | 9092 |
| Spark | ストリーム処理 | 4040 |
| Delta Lake | トランザクションストレージ | - |
| Streamlit | 可視化 | 8501 |
データ
ソース
eCommerce Eventsデータセット(Kaggle):
- ボリューム:2.3 GB、5つのCSVファイル
- 期間:2019年10月〜2020年2月
- イベント:view、cart、remove_from_cart、purchase
スキーマ
event_schema = {
'event_time': 'timestamp', # タイムスタンプ
'event_type': 'string', # イベントタイプ
'product_id': 'integer', # 商品ID
'category_id': 'long', # カテゴリ
'category_code': 'string', # カテゴリコード
'brand': 'string', # ブランド
'price': 'float', # 価格
'user_id': 'integer', # ユーザーID
'user_session': 'string' # セッション
}実装
1. Kafkaプロデューサー
# イベントをKafkaに送信
from kafka import KafkaProducer
import json
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
for event in read_csv_stream('events.csv'):
producer.send('clickstream', value=event)2. Spark Structured Streaming
# Kafkaストリームからの読み取り
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, count, sum, col
spark = SparkSession.builder \
.appName("ClickstreamAnalytics") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.getOrCreate()
# Kafkaからのストリーム
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "clickstream") \
.load()
# 時間ウィンドウ集計
aggregated = df \
.withWatermark("event_time", "1 minute") \
.groupBy(
window("event_time", "1 minute"),
"event_type"
) \
.agg(
count("*").alias("event_count"),
countDistinct("user_session").alias("unique_sessions")
)
# Delta Lakeへの書き込み
aggregated.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/tmp/checkpoint") \
.start("/data/aggregated")3. Streamlitダッシュボード
# Delta Lakeからの読み取りとリアルタイム表示
import streamlit as st
from delta import DeltaTable
st.set_page_config(page_title="Clickstream Analytics", layout="wide")
# 設定可能な自動更新(1-30秒)
refresh_interval = st.sidebar.slider("更新間隔", 1, 30, 5)
# リアルタイムメトリクス
col1, col2, col3, col4 = st.columns(4)
col1.metric("アクティブセッション", get_active_sessions())
col2.metric("ユニークユーザー", get_unique_users())
col3.metric("総収益", format_currency(get_total_revenue()))
col4.metric("コンバージョン率", f"{get_conversion_rate():.1%}")
# コンバージョンファネル
st.plotly_chart(create_funnel_chart())
# 人気商品
st.dataframe(get_top_products())リアルタイムメトリクス
集計ウィンドウ
- 1分:迅速な異常検出
- 5分:短期トレンド
- 1時間:概要
計算されるKPI
| メトリクス | 説明 |
|---|---|
| アクティブセッション | 最後のウィンドウでアクティビティのあるセッション |
| ユニークユーザー | 個別のuser_idカウント |
| 総収益 | 「purchase」イベントの価格合計 |
| ファネル | view → cart → purchase |
| 人気商品 | 最も閲覧/購入された商品 |
デモ
ダッシュボードはオンラインにデプロイされていません(Kafka/Sparkインフラが必要)が、Docker Composeを使用してローカルで完全に再現可能です。
前提条件
- Docker & Docker Compose
- 4-5 GB の利用可能なRAM
- 2-3 CPUコア
起動
# インフラを起動
docker-compose up -d
# プロデューサーを起動
python src/producer.py
# Sparkコンシューマーを起動
spark-submit src/consumer.py
# ダッシュボードを起動
streamlit run src/dashboard.pyテクノロジー
| コンポーネント | テクノロジー | バージョン |
|---|---|---|
| メッセージブローカー | Apache Kafka | 2.8 |
| 調整 | Zookeeper | 3.7 |
| ストリーム処理 | PySpark | 3.5.0 |
| ストレージ | Delta Lake | 3.1.0 |
| ダッシュボード | Streamlit | - |
| オーケストレーション | Docker Compose | - |
プロジェクト構造
realtime-clickstream-analytics/
├── docker-compose.yml # インフラ
├── src/
│ ├── producer.py # Kafkaプロデューサー
│ ├── consumer.py # Sparkストリーミング
│ └── dashboard.py # Streamlitダッシュボード
├── data/
│ └── raw/ # ソースCSVファイル
├── config/
│ └── config.yaml # 設定
└── README.md
学び
このプロジェクトで以下をマスターできました:
- ストリーミングアーキテクチャ:リアルタイム処理パターンの理解
- Apache Kafka:大規模なメッセージ取り込みと配信
- Spark Structured Streaming:ウォーターマークとウィンドウを使用した集計
- Delta Lake:ストリーミング用トランザクションストレージ
- Docker Compose:マルチサービスオーケストレーション