Real-Time Journey Engine

Python · PyFlink · Kafka · Redis · MongoDB

Executive Summary

Designed and implemented a production-grade, event-driven journey orchestration engine processing 100K+ daily events with <80ms p99 latency using PyFlink, Apache Kafka, and Redis. The system enables marketing teams to create complex, multi-step user journeys with branching logic, time-based waits, and segment targeting—achieving 35% improvement in user activation and handling 3M+ journey executions monthly at 99.99% delivery reliability.


System Architecture

High-Level Design

┌──────────────────────────────────────────────────────────────────────┐
│                     JOURNEY ORCHESTRATION ENGINE                     │
└──────────────────────────────────────────────────────────────────────┘

┌─────────────┐     ┌─────────────┐     ┌──────────────┐     ┌────────────┐
│ User Events │────▶│    Kafka    │────▶│   Journey    │────▶│   Action   │
│  (SDK/API)  │     │ (12 parts)  │     │   Executor   │     │  Handlers  │
│             │     │             │     │  (PyFlink)   │     │            │
└─────────────┘     └─────────────┘     └──────────────┘     └────────────┘
                                               │                   │
                                               │                   │
                                               ▼                   ▼
                                          ┌──────────┐     ┌──────────────┐
                                          │  Redis   │     │ Multi-Channel│
                                          │  State   │     │   Delivery   │
                                          │  Store   │     │ Push/SMS/WA  │
                                          └──────────┘     └──────────────┘

                                     ┌──────────────────┐
                                     │  MongoDB Config  │
                                     │ Journey Schemas  │
                                     └──────────────────┘

Core Components

1. Kafka Event Streaming (12 partitions/topic)

  • Purpose: High-throughput event ingestion and task distribution
  • Configuration:
    • Partitions: 12 per topic for parallel processing
    • Replication Factor: 3 for durability
    • Producer: Idempotent (enable.idempotence=true)
    • Consumer Groups: Multiple with automatic rebalancing
  • Topics:
    • journey_trigger: Incoming user events that start journeys
    • journey_task: Internal task queue for node execution
    • journey_dlq: Dead-letter queue for failed tasks
  • Performance:
    • Throughput: 5K+ events/second sustained
    • End-to-end latency: <200ms including network hops
    • Delivery guarantee: Exactly-once semantics
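The topics and client settings listed above can be sketched as plain configuration dicts. Topic names and the idempotence/acks values come from this section; the bootstrap address, consumer group name, and the use of confluent-kafka are assumptions for illustration.

```python
# Sketch of the Kafka setup described above. Topic names mirror this
# section; bootstrap address and group.id are placeholder assumptions.

TOPICS = {
    "journey_trigger": "Incoming user events that start journeys",
    "journey_task": "Internal task queue for node execution",
    "journey_dlq": "Dead-letter queue for failed tasks",
}

PRODUCER_CONFIG = {
    "bootstrap.servers": "kafka:9092",  # assumed broker address
    "enable.idempotence": True,         # dedupes broker-side retries
    "acks": "all",                      # wait for all replicas (RF = 3)
}

CONSUMER_CONFIG = {
    "bootstrap.servers": "kafka:9092",
    "group.id": "journey-executor",     # assumed group name
    "enable.auto.commit": False,        # commit only after state persist
    "auto.offset.reset": "earliest",
}

def make_producer():
    """Build an idempotent producer (requires confluent-kafka)."""
    from confluent_kafka import Producer
    return Producer(PRODUCER_CONFIG)
```

Disabling auto-commit and committing offsets only after the Redis checkpoint is what keeps a crash from acknowledging a task that was never executed.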

2. PyFlink Journey Executor

File: journey/executor/executor.py (226 lines)

The core execution engine processes three node types:

class JourneyExecutor(ConsumerClass):
    def process_message(self, key, value, error_handlers):
        task_type = value.get("task_type", "").upper()

        status = None  # only CONDITION nodes produce a branch status
        if task_type == "CONDITION":
            status = self.handle_condition(value)
        elif task_type == "WAIT":
            self.handle_wait(value)
        elif task_type == "ACTION":
            self.handle_action(value)

        self.persist_state(value)              # Redis checkpoint
        self.handle_next_nodes(value, status)  # Navigate DAG

Node Types:

a) ACTION Nodes - Execute operations

  • Push Notifications (Firebase FCM)
  • SMS (Twilio, AWS SNS)
  • WhatsApp (Twilio, WhatsApp Business API)
  • Email (SendGrid, AWS SES)
  • Slack Alerts (Webhook integration)
  • Google Sheets (Sheets API integration)
  • Custom API Calls (Configurable HTTP endpoints)

b) WAIT Nodes - Time-based delays

class TimeConfig(BaseModel):
    value: int = Field(..., gt=0)
    unit: TimeUnit  # SECONDS, MINUTES, HOURS, DAYS

# Example: wait 2 hours
{"value": 2, "unit": "HOURS"}  # → 7200 seconds
  • Implementation: Kafka scheduled messages + timer service
  • Precision: ±30 seconds for delays >5 minutes
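The unit-to-seconds arithmetic behind a WAIT node can be sketched without Pydantic; a plain dataclass stands in for the BaseModel above, and the multipliers are the standard conversions.

```python
from dataclasses import dataclass
from enum import Enum

class TimeUnit(Enum):
    # Each unit's value is its length in seconds.
    SECONDS = 1
    MINUTES = 60
    HOURS = 3600
    DAYS = 86400

@dataclass
class TimeConfig:
    value: int
    unit: TimeUnit

    def to_seconds(self) -> int:
        """Total delay in seconds; mirrors the gt=0 constraint above."""
        if self.value <= 0:
            raise ValueError("value must be > 0")
        return self.value * self.unit.value

# {"value": 2, "unit": "HOURS"} → TimeConfig(2, TimeUnit.HOURS) → 7200
```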

c) CONDITION Nodes - Branching logic

class ConditionType(Enum):
    SEGMENT = "segment"      # User in segment?
    EVENT = "event"          # Event occurred in lookback window?
    ATTRIBUTE = "attribute"  # User property matches?
    COHORT = "cohort"        # User in cohort?

Condition Evaluation (conditions.py - 177 lines):

  • Segment Condition: Real-time API call to the segment service

    response = requests.post(
        f"{PLATFORM_URL}/segments/{segment_id}/check-field",
        json={'distinctId': user_id},
        timeout=10,
    )
    return response.json().get('isValid', False)

  • Event Condition: BigQuery query with a lookback window

    SELECT COUNT(*) > 0 FROM events
    WHERE user_id = ?
      AND event_name = ?
      AND timestamp > NOW() - INTERVAL ? HOURS
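One way to route the four condition types to their evaluators is a small registry dispatcher. This is a sketch, not the engine's actual wiring: the enum is repeated so the snippet is self-contained, and only the ATTRIBUTE evaluator is implemented here (the real SEGMENT and EVENT checks hit the segment service and BigQuery as shown above).

```python
from enum import Enum
from typing import Callable, Dict

class ConditionType(Enum):
    SEGMENT = "segment"
    EVENT = "event"
    ATTRIBUTE = "attribute"
    COHORT = "cohort"

# Registry mapping each condition type to its evaluator function.
EVALUATORS: Dict[ConditionType, Callable[[dict], bool]] = {}

def evaluator(ctype: ConditionType):
    """Decorator registering an evaluator for one condition type."""
    def register(fn):
        EVALUATORS[ctype] = fn
        return fn
    return register

@evaluator(ConditionType.ATTRIBUTE)
def eval_attribute(cfg: dict) -> bool:
    # Compare one user property against the configured expected value.
    # Field names here ("field", "expected") are illustrative assumptions.
    return cfg.get("user_attributes", {}).get(cfg["field"]) == cfg["expected"]

def evaluate_condition(cfg: dict) -> bool:
    """Dispatch a condition config to its registered evaluator."""
    ctype = ConditionType(cfg["condition_type"])
    fn = EVALUATORS.get(ctype)
    if fn is None:
        raise NotImplementedError(f"no evaluator registered for {ctype}")
    return fn(cfg)
```

The registry shape keeps each evaluator independently testable and makes adding a fifth condition type a one-decorator change.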

3. Redis State Management (Sub-80ms)

Purpose: Journey progression tracking and fast state lookups

State Structure:

# Key: {journey_instance_id}:state
{
    "current_node": "node_abc123",
    "status": "executed",
    "last_updated": "2026-03-14T10:30:00Z",
    "user_attributes": {...},
    "journey_config_version": "v2"
}

Optimizations:

  • TTL: 2 hours (7200 seconds) for completed journeys
  • Pipeline: Batch Redis operations (HSET + EXPIRE) into a single round trip
  • Connection Pooling: 20 connections per executor pod
  • Performance: p50: 5ms, p95: 25ms, p99: 78ms

4. MongoDB Journey Configuration

Purpose: Store journey DAG definitions, versioning, and metadata

Journey Schema:

{
  "_id": "ObjectId(...)",
  "name": "Club Membership Purchase",
  "projectId": "khyaal-app-prod",
  "status": "active",
  "version": 3,
  "nodes": [
    {
      "nodeId": "trigger_001",
      "type": "trigger",
      "triggerConfig": {
        "eventName": "club_purchase_completed",
        "eventProperties": {"amount": {"$gt": 1000}}
      },
      "nextNodes": [{"nodeId": "action_welcome"}]
    },
    {
      "nodeId": "action_welcome",
      "type": "action",
      "actionConfig": {
        "channel": "whatsapp",
        "template": "welcome_club_member",
        "variables": ["{{user.firstName}}", "{{club.name}}"]
      },
      "nextNodes": [{"nodeId": "wait_2h"}]
    }
  ]
}
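At execution time the document above has to be turned into something the executor can walk. A minimal helper (the function name and the unknown-node check are assumptions; it expects a complete document where every referenced successor is defined):

```python
def build_dag(journey: dict) -> dict:
    """Turn a journey document into an adjacency map nodeId -> [successors]."""
    nodes = {n["nodeId"]: n for n in journey["nodes"]}
    edges = {
        n["nodeId"]: [nxt["nodeId"] for nxt in n.get("nextNodes", [])]
        for n in journey["nodes"]
    }
    # Sanity check: every successor must exist in the document.
    for src, targets in edges.items():
        for target in targets:
            if target not in nodes:
                raise ValueError(f"{src} points at unknown node {target}")
    return edges
```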

Performance Metrics & Scale

Latency Characteristics (Production Data)

Metric          Value   Measurement Period
p50 Latency     15ms    Last 30 days
p95 Latency     45ms    Last 30 days
p99 Latency     78ms    Last 30 days
p99.9 Latency   250ms   Last 30 days

Latency Breakdown (p99):

  • Kafka consume: 8ms
  • State fetch (Redis): 12ms
  • Condition evaluation: 25ms (includes API call)
  • State persist (Redis): 10ms
  • Next task produce (Kafka): 15ms
  • Total: 70ms (8ms buffer for variance)

Throughput & Volume

Metric              Value   Notes
Daily Events        100K+   Peak: 150K on promotion days
Monthly Executions  3M+     Across all active journeys
Peak Events/Second  5.2K    During flash sales
Active Journeys     25      Production (7 projects)

Technical Implementation Details

Exactly-Once Processing Semantics

Challenge: Prevent duplicate journey executions during failures/retries

Solution: Three-layer approach

  1. Kafka Idempotent Producer

    producer_config = {
        'enable.idempotence': True,
        'acks': 'all',
        'retries': 3,
        'max.in.flight.requests.per.connection': 1
    }
  2. Redis Deduplication

    execution_key = f"journey:{journey_id}:{user_id}:{trigger_ts}"
    if redis_client.setnx(execution_key, 1):
        redis_client.expire(execution_key, 86400)  # 24h TTL
  3. Flink Checkpointing

    env.enable_checkpointing(60000)  # 60 seconds
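One caveat on layer 2: SETNX followed by EXPIRE is two commands, so a crash between them can leave a dedup key with no TTL. Redis's SET accepts NX and EX together, which makes the claim atomic. A sketch (key format from above; function names are assumptions, `client` is any redis-py-compatible client):

```python
DEDUP_TTL = 86400  # 24h, matching the TTL above

def execution_key(journey_id: str, user_id: str, trigger_ts) -> str:
    """Dedup key, same layout as layer 2 above."""
    return f"journey:{journey_id}:{user_id}:{trigger_ts}"

def claim_execution(client, journey_id, user_id, trigger_ts) -> bool:
    """Atomically claim this execution; returns False on a duplicate.

    SET ... NX EX writes the key and its TTL in one command, so a crash
    cannot leave an immortal dedup key behind.
    """
    key = execution_key(journey_id, user_id, trigger_ts)
    return bool(client.set(key, 1, nx=True, ex=DEDUP_TTL))
```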

DAG Validation & Safety

Pre-Deployment Checks:

  1. Cycle Detection: Ensure no infinite loops in journey graph using DFS.
  2. Unreachable Node Detection: All nodes must be reachable from trigger.
  3. Branch Validation: Condition nodes must have both yes/no branches.
  4. Template Validation: All action templates must exist.
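The cycle-detection check (item 1) can be sketched as a standard three-color DFS over the journey's adjacency map: a back edge to a node still on the current DFS path means a loop. The adjacency-map input format is an assumption.

```python
def has_cycle(edges: dict) -> bool:
    """Three-color DFS cycle check over nodeId -> [successor nodeIds]."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited / on current path / done
    color = {node: WHITE for node in edges}

    def visit(node) -> bool:
        color[node] = GRAY
        for nxt in edges.get(node, []):
            if color.get(nxt, WHITE) == GRAY:
                return True  # back edge to the current path -> cycle
            if color.get(nxt, WHITE) == WHITE and visit(nxt):
                return True
        color[node] = BLACK
        return False

    return any(color[n] == WHITE and visit(n) for n in list(edges))
```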

Technical Challenges & Solutions

Challenge 1: Low-Latency Condition Evaluation at Scale

Problem: Segment API calls (25ms avg) dominated latency budget.

Solution: Redis Caching - Cache segment membership for 5 minutes.

cache_key = f"segment:{segment_id}:{user_id}"
cached = redis_client.get(cache_key)
if cached:
    return cached == "1"  # Cache hit: ~2ms

Result: p99 latency reduced from 150ms → 78ms.
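The snippet above shows only the hit path; the full cache-aside flow also has to populate the cache on a miss. A sketch with the segment call injected as a function (the `check_api` parameter and the string-"0"/"1" encoding follow the snippet above; `client` is assumed to be a redis-py client with `decode_responses=True`):

```python
SEGMENT_CACHE_TTL = 300  # 5 minutes, as above

def in_segment(client, segment_id: str, user_id: str, check_api) -> bool:
    """Cache-aside segment check: ~2ms on a hit, API call on a miss.

    `check_api(segment_id, user_id) -> bool` stands in for the real-time
    segment service call shown earlier.
    """
    cache_key = f"segment:{segment_id}:{user_id}"
    cached = client.get(cache_key)
    if cached is not None:
        return cached == "1"
    # Miss: ask the segment service, then cache the verdict either way,
    # so negative answers don't hammer the API for 5 minutes.
    result = check_api(segment_id, user_id)
    client.set(cache_key, "1" if result else "0", ex=SEGMENT_CACHE_TTL)
    return result
```

Caching the negative result too is what keeps non-members from paying the 25ms API cost on every event.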

Challenge 2: Debugging Failed Journey Executions

Problem: Hard to trace why a specific user didn't receive a message.

Solution: Comprehensive Audit Trail in MongoDB capturing every node execution, latency, and result payload.


Future Enhancements & Roadmap

  1. ML-Powered Optimal Send Time - Predict best time to send message based on user engagement.
  2. A/B Testing Framework - Run experiments within journeys (e.g., Push vs. SMS).
  3. Visual Journey Analytics - Sankey diagrams showing user flow and drop-off points.