マーケティングアナリティクスパイプライン(GCP + dbt)
Data Engineering
Marketing Analytics
GCP
マーケティングKPI追跡のためのモダンなデータエンジニアリングアーキテクチャ
背景と課題
ビジネス課題:マーケティングKPIをリアルタイムで追跡するためのモダンなインフラをどのように構築するか?
マーケティングチームは、情報に基づいた意思決定を行うために、パフォーマンス指標に素早くアクセスする必要があります。このプロジェクトでは、モダンなデータエンジニアリングのベストプラクティスを使用した完全なETLパイプラインの構築を実演します。
アーキテクチャ
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ ソース │────▶│ 抽出 │────▶│ BigQuery │────▶│ dbt │
│ (CSV) │ │ (Python) │ │ (Raw) │ │ (変換) │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
│
▼
┌─────────────┐ ┌─────────────┐
│ Streamlit │◀────│ BigQuery │
│(ダッシュボード)│ │ (Analytics) │
└─────────────┘ └─────────────┘
コンポーネント
- 抽出:現実的なシミュレートされたマーケティングデータを生成するPythonスクリプト
-
ロード:Google BigQueryへのロード(データセット
marketing_raw) -
変換:KPIを計算するdbt(データセット
marketing_analytics) - 可視化:インタラクティブなStreamlitダッシュボード
実装
1. データ抽出
# src/extract_data.py
import pandas as pd
import numpy as np
def generate_marketing_data(start_date, end_date, seed=42):
"""現実的なシミュレートされたマーケティングデータを生成。"""
np.random.seed(seed)
channels = {
'Google Ads': {'sessions': 1500, 'conv_rate': 0.05, 'spend': 3000},
'Facebook Ads': {'sessions': 1200, 'conv_rate': 0.04, 'spend': 2500},
'LinkedIn': {'sessions': 400, 'conv_rate': 0.06, 'spend': 1500},
'Email': {'sessions': 800, 'conv_rate': 0.08, 'spend': 500},
'Direct': {'sessions': 2000, 'conv_rate': 0.03, 'spend': 0}
}
# 現実的な変動を持つ生成
# - セッションにポアソン分布
# - 週末効果(-30%)
# - 負の値なし
return df2. dbt変換
-- dbt_marketing/models/staging/stg_marketing_daily.sql
WITH source AS (
SELECT * FROM {{ source('marketing_raw', 'daily_performance') }}
),
transformed AS (
SELECT
date,
DATE_TRUNC(date, WEEK) AS week_start_date,
DATE_TRUNC(date, MONTH) AS month_start_date,
source AS marketing_source,
sessions,
conversions,
revenue,
spend,
-- 計算されたKPI
SAFE_DIVIDE(revenue, spend) AS roas,
SAFE_DIVIDE(spend, conversions) AS cost_per_conversion,
SAFE_DIVIDE(conversions, sessions) AS conversion_rate,
SAFE_DIVIDE(revenue, conversions) AS revenue_per_conversion,
SAFE_DIVIDE(revenue, sessions) AS revenue_per_session,
-- フラグ
spend = 0 AS is_organic_channel,
EXTRACT(DAYOFWEEK FROM date) IN (1, 7) AS is_weekend
FROM source
)
SELECT * FROM transformed3. Streamlitダッシュボード
ダッシュボードは以下を表示します:
- グローバルKPI:総支出、収益、コンバージョン、ROAS
- 時系列トレンド:日別の支出 vs 収益
- チャネル別パフォーマンス:マーケティングソースの比較
- ソース別ROAS:広告費用対効果
- コンバージョンファネル:セッション → コンバージョン
結果
計算されるKPI
| メトリクス | 計算式 | 用途 |
|---|---|---|
| ROAS | 収益 / 支出 | 全体的な効果 |
| CPA | 支出 / コンバージョン | 獲得コスト |
| コンバージョン率 | コンバージョン / セッション | トラフィック品質 |
| セッション収益 | 収益 / セッション | トラフィック価値 |
生成データ
- 期間:84日(2024年9月〜11月)
- チャネル:5つのマーケティングソース
- ボリューム:実行ごとに約420行
デモ
ダッシュボードでは以下が可能です:
- 日付範囲でフィルター
- 表示するチャネルを選択
- トレンドを可視化
- パフォーマンスを比較
テクノロジー
| コンポーネント | テクノロジー | バージョン |
|---|---|---|
| 言語 | Python | 3.13+ |
| データウェアハウス | Google BigQuery | Cloud |
| 変換 | dbt-core + dbt-bigquery | 1.10.3 |
| ダッシュボード | Streamlit | 1.51.0 |
| 可視化 | Plotly + Altair | - |
| 依存関係管理 | Poetry | - |
プロジェクト構造
marketing-data-pipeline/
├── src/
│ ├── extract_data.py # データ生成
│ ├── load_bigquery.py # BigQueryロード
│ └── dashboard.py # Streamlitダッシュボード
├── dbt_marketing/
│ ├── dbt_project.yml # dbt設定
│ └── models/
│ └── staging/
│ ├── sources.yml
│ └── stg_marketing_daily.sql
├── data/
│ └── raw/
│ └── marketing_data.csv
├── pyproject.toml # Poetry依存関係
└── README.md
実行
# 1. 抽出
python src/extract_data.py
# 2. BigQueryロード
python src/load_bigquery.py
# 3. dbt変換
cd dbt_marketing && dbt run
# 4. ダッシュボード
streamlit run src/dashboard.py学び
このプロジェクトはモダンデータスタックのベストプラクティスを示しています:
- 関心の分離:抽出、ロード、変換を分離
- Infrastructure as Code:バージョン管理された変換のためのdbt
- 再現性:シミュレートデータの固定シード
- ドキュメント化:dbtソースとモデルのドキュメント化