
CDC Pipeline

Change Data Capture (CDC) is the core of Data Stream's real-time capabilities.

Overview

CDC captures database changes at the source (PostgreSQL WAL) and streams them through the pipeline:

PostgreSQL → Debezium → NATS JetStream → Consumer → Redis → Clients

Components

PostgreSQL (Source)

Configured for logical replication:

-- wal_level must be 'logical'
SHOW wal_level;
-- logical

-- Replication slot for Debezium
SELECT * FROM pg_replication_slots;

Debezium (CDC Engine)

Captures WAL changes and publishes to NATS:

# debezium/application.properties
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=datastream
debezium.source.database.password=datastream
debezium.source.database.dbname=datastream
debezium.source.topic.prefix=datastream
debezium.source.table.include.list=public.rounds
debezium.source.plugin.name=pgoutput

debezium.sink.type=nats-jetstream
debezium.sink.nats-jetstream.url=nats://nats:4222
debezium.sink.nats-jetstream.stream=DebeziumStream

NATS JetStream (Message Broker)

Durable message streaming:

Streams:
├── DebeziumStream                  # Raw CDC events
│   └── datastream.public.rounds
└── ROUNDS                          # Segmented by game
    ├── rounds.game.crash
    ├── rounds.game.double
    └── rounds.type.multiplier
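The subject layout of the ROUNDS stream can be sketched as a small routing helper. This is a minimal illustration, assuming each round is republished once under its game slug and once under its game type; the `Round` type, its field names, and `SubjectsFor` are hypothetical and not taken from the consumer's code.

```go
package main

import "fmt"

// Round carries only the fields needed for routing.
// GameSlug and GameType are assumed field names.
type Round struct {
	ID       int64
	GameSlug string // e.g. "crash"
	GameType string // e.g. "multiplier"
}

// SubjectsFor returns the ROUNDS subjects a round would be published to:
// one per game and one per game type.
func SubjectsFor(r Round) []string {
	return []string{
		fmt.Sprintf("rounds.game.%s", r.GameSlug),
		fmt.Sprintf("rounds.type.%s", r.GameType),
	}
}
```

For a crash round of type multiplier, `SubjectsFor` yields `rounds.game.crash` and `rounds.type.multiplier`, so a subscriber can follow either a single game or a whole game type.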

Go Consumer

Processes CDC events and publishes to Redis:

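// consumer/internal/adapters/nats/consumer.go

import (
	"encoding/json"
	"fmt"

	"github.com/nats-io/nats.go"
)

func (c *CDCConsumer) ProcessEvent(msg *nats.Msg) error {
	var event CDCEvent
	if err := json.Unmarshal(msg.Data, &event); err != nil {
		return fmt.Errorf("decode CDC event: %w", err)
	}

	// Skip non-insert events ("c" = create)
	if event.Payload.Op != "c" {
		return nil
	}

	round := c.mapToRound(event.Payload.After)

	// Deduplication
	if c.isDuplicate(round.ID) {
		return nil
	}

	// Publish to Redis; surface failures to the caller instead of dropping them
	// (assumes Publish returns an error)
	if err := c.redisPublisher.Publish(round); err != nil {
		return fmt.Errorf("publish round %d: %w", round.ID, err)
	}

	return nil
}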

Redis (Read Model)

Stores processed data and provides Pub/Sub:

Keys:
├── latest:crash # Last round (STRING)
├── history:crash # History (ZSET)
├── history:type:multiplier # By type (ZSET)
├── stream:crash # Pub/Sub channel
└── processed:12345 # Dedup (STRING, TTL 5min)

Data Flow

1. INSERT in PostgreSQL

INSERT INTO rounds (game_id, status, extras, finished_at)
VALUES (1, 'finished', '{"point": "5.32"}', NOW());

2. WAL Entry Created

PostgreSQL writes the change to its Write-Ahead Log (WAL).

3. Debezium Captures Change

Debezium reads the WAL via logical replication and emits a change event:

{
  "schema": {...},
  "payload": {
    "before": null,
    "after": {
      "id": 513,
      "game_id": 1,
      "status": "finished",
      "extras": "{\"point\": \"5.32\"}",
      "finished_at": 1768621542735266
    },
    "source": {...},
    "op": "c",
    "ts_ms": 1768621542740
  }
}
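The envelope above can be decoded into typed Go structs. The following is a sketch only: the type and method names (`CDCEvent`, `RoundRow`, `DecodeEvent`, `IsInsert`, `Point`, `FinishedTime`) are illustrative and may not match the consumer's actual types. It assumes `finished_at` is microseconds since the Unix epoch, which is consistent with the `ts_ms` value in the sample, and that `extras` is a JSON document encoded as a string.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// RoundRow is the row image of public.rounds as it appears in "before"/"after".
type RoundRow struct {
	ID         int64  `json:"id"`
	GameID     int64  `json:"game_id"`
	Status     string `json:"status"`
	Extras     string `json:"extras"`      // JSON document encoded as a string
	FinishedAt int64  `json:"finished_at"` // assumed: microseconds since the Unix epoch
}

// CDCEvent mirrors the parts of the Debezium envelope used by the consumer.
type CDCEvent struct {
	Payload struct {
		Before *RoundRow `json:"before"`
		After  *RoundRow `json:"after"`
		Op     string    `json:"op"`
		TsMs   int64     `json:"ts_ms"`
	} `json:"payload"`
}

// DecodeEvent parses a raw message body into a CDCEvent.
func DecodeEvent(data []byte) (CDCEvent, error) {
	var ev CDCEvent
	if err := json.Unmarshal(data, &ev); err != nil {
		return CDCEvent{}, fmt.Errorf("decode CDC event: %w", err)
	}
	return ev, nil
}

// IsInsert reports whether the event is a create ("c") with a row image.
func (e CDCEvent) IsInsert() bool {
	return e.Payload.Op == "c" && e.Payload.After != nil
}

// FinishedTime converts the microsecond timestamp to a UTC time.Time.
func (r RoundRow) FinishedTime() time.Time {
	return time.UnixMicro(r.FinishedAt).UTC()
}

// Point extracts the "point" field from the nested extras JSON string.
func (r RoundRow) Point() (string, error) {
	var extras struct {
		Point string `json:"point"`
	}
	if err := json.Unmarshal([]byte(r.Extras), &extras); err != nil {
		return "", fmt.Errorf("decode extras: %w", err)
	}
	return extras.Point, nil
}
```

Note that `extras` needs a second decoding pass because it arrives as a string, not as a nested object.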

4. Published to NATS

Debezium publishes the event to the datastream.public.rounds subject.

5. Consumer Processes

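// Enrich with game metadata
game := c.gameCache.Get(event.GameID)
round.GameSlug = game.Slug
round.GameType = game.Type

// Serialize once: go-redis rejects a plain struct unless it implements encoding.BinaryMarshaler
data, err := json.Marshal(round)
if err != nil {
	return err
}

// Store in Redis (expiration 0 = no TTL)
if err := c.redis.Set(ctx, fmt.Sprintf("latest:%s", round.GameSlug), data, 0).Err(); err != nil {
	return err
}
if err := c.redis.ZAdd(ctx, fmt.Sprintf("history:%s", round.GameSlug), redis.Z{
	Score:  float64(round.Timestamp),
	Member: data,
}).Err(); err != nil {
	return err
}

// Publish to Pub/Sub
if err := c.redis.Publish(ctx, fmt.Sprintf("stream:%s", round.GameSlug), data).Err(); err != nil {
	return err
}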
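The enrichment step relies on a game lookup by ID. A minimal sketch of such a cache follows, assuming games are held in memory and keyed by their numeric ID; `Game`, `GameCache`, `NewGameCache`, and `Put` are hypothetical names, and unlike the single-value `Get` used above, this `Get` also returns a found flag so that an unknown game ID can be handled explicitly.

```go
package main

import "sync"

// Game holds the metadata used to enrich a round.
type Game struct {
	ID   int64
	Slug string // e.g. "crash"
	Type string // e.g. "multiplier"
}

// GameCache is a concurrency-safe, in-memory lookup table keyed by game ID.
type GameCache struct {
	mu    sync.RWMutex
	games map[int64]Game
}

// NewGameCache builds a cache preloaded with the given games.
func NewGameCache(games ...Game) *GameCache {
	c := &GameCache{games: make(map[int64]Game, len(games))}
	for _, g := range games {
		c.games[g.ID] = g
	}
	return c
}

// Get returns the game for id and whether it was found.
func (c *GameCache) Get(id int64) (Game, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	g, ok := c.games[id]
	return g, ok
}

// Put adds or replaces a game, for example after a refresh from the database.
func (c *GameCache) Put(g Game) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.games[g.ID] = g
}
```

A read-write mutex fits here because lookups happen on every event while updates are rare.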

6. Clients Receive

Clients connected via WebSocket/SSE receive the update.

Latency Breakdown

Stage               Latency
PostgreSQL INSERT   1-5ms
WAL write           ~1ms
Debezium capture    ~1ms
NATS publish        ~1ms
Consumer process    ~1ms
Redis publish       ~1ms
Total               < 10ms
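The total can be checked by summing the per-stage figures. The sketch below treats each "~1ms" entry as exactly 1 ms, which is a simplification; `Stage`, `Stages`, and `Budget` are illustrative names.

```go
package main

import "time"

// Stage is one hop in the pipeline with its best-case and worst-case latency.
type Stage struct {
	Name     string
	Min, Max time.Duration
}

// Stages copies the latency table, reading each "~1ms" entry as exactly 1 ms.
var Stages = []Stage{
	{"PostgreSQL INSERT", 1 * time.Millisecond, 5 * time.Millisecond},
	{"WAL write", time.Millisecond, time.Millisecond},
	{"Debezium capture", time.Millisecond, time.Millisecond},
	{"NATS publish", time.Millisecond, time.Millisecond},
	{"Consumer process", time.Millisecond, time.Millisecond},
	{"Redis publish", time.Millisecond, time.Millisecond},
}

// Budget sums the best-case and worst-case latencies across all stages.
func Budget(stages []Stage) (best, worst time.Duration) {
	for _, s := range stages {
		best += s.Min
		worst += s.Max
	}
	return best, worst
}
```

Under that simplification, `Budget(Stages)` returns 6ms and 10ms, so the stated total corresponds to the upper end of the INSERT range.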

Deduplication

Prevents processing the same event twice:

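func (c *CDCConsumer) isDuplicate(roundID int64) bool {
	key := fmt.Sprintf("processed:%d", roundID)
	// SetNX stores the key only if it does not exist yet; the 5-minute TTL bounds the dedup window
	created, err := c.redis.SetNX(context.Background(), key, "1", 5*time.Minute).Result()
	if err != nil {
		// Redis error: treat the event as new rather than silently dropping it
		return false
	}
	return !created // false means the key already existed
}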
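The set-if-absent-with-TTL semantics can be illustrated without Redis. The following in-process stand-in is a sketch for tests or local experiments, not the consumer's implementation; `MemoryDedup` and its methods are hypothetical names.

```go
package main

import (
	"sync"
	"time"
)

// MemoryDedup is an in-process stand-in for Redis SET NX with a TTL.
// Expired entries are overwritten on the next sighting but never purged;
// Redis handles expiry on its own.
type MemoryDedup struct {
	mu      sync.Mutex
	ttl     time.Duration
	expires map[int64]time.Time // round ID -> expiry time
}

// NewMemoryDedup creates a dedup table whose entries live for ttl.
func NewMemoryDedup(ttl time.Duration) *MemoryDedup {
	return &MemoryDedup{ttl: ttl, expires: make(map[int64]time.Time)}
}

// IsDuplicate reports whether roundID was already seen within the TTL.
// A first sighting records the ID and returns false, like a successful SET NX.
func (d *MemoryDedup) IsDuplicate(roundID int64) bool {
	d.mu.Lock()
	defer d.mu.Unlock()

	now := time.Now()
	if exp, ok := d.expires[roundID]; ok && now.Before(exp) {
		return true
	}
	d.expires[roundID] = now.Add(d.ttl)
	return false
}
```

With a 5-minute TTL, the first call for a given round ID returns false and any repeat within the window returns true. Checking and recording happen under one lock, which mirrors the atomicity that a single SET NX command provides.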

Error Handling

Transient Errors

NATS JetStream provides at-least-once delivery:

sub.Consume(func(msg jetstream.Msg) {
	if err := processEvent(msg); err != nil {
		// Negative-ack so JetStream redelivers the message
		msg.Nak()
		return
	}
	msg.Ack()
})

Poison Messages

Skip after max retries:

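// The delivery count is part of the JetStream message metadata
meta, err := msg.Metadata()
if err == nil && meta.NumDelivered > 10 {
	log.Error("Max retries exceeded, skipping", "stream_seq", meta.Sequence.Stream)
	msg.Term()
	return
}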
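The retry and poison-message rules can be folded into one decision function that is easy to unit test without a NATS server. This is a sketch under the thresholds shown above; `Action`, `MaxDeliveries`, and `Decide` are hypothetical names, and the caller is expected to translate the result into `msg.Ack()`, `msg.Nak()`, or `msg.Term()`.

```go
package main

// Action is what the consumer should tell JetStream about a message.
type Action int

const (
	Ack  Action = iota // processed successfully
	Nak                // failed, ask for redelivery
	Term               // give up, stop redelivery
)

// MaxDeliveries mirrors the threshold of 10 used above.
const MaxDeliveries = 10

// Decide applies the poison-message check first and only then runs process.
// numDelivered is the delivery count reported by the message metadata.
func Decide(numDelivered uint64, process func() error) Action {
	if numDelivered > MaxDeliveries {
		return Term // poison message: skip without processing
	}
	if err := process(); err != nil {
		return Nak // transient failure: let JetStream redeliver
	}
	return Ack
}
```

Because the delivery check runs before `process`, a message that has already been delivered more than ten times is terminated without another processing attempt.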

Monitoring

Debezium Health

curl http://localhost:8080/q/health

NATS Streams

# streams=true is required for /jsz to include per-account stream details
curl -s 'http://localhost:8222/jsz?streams=true' | jq '.account_details[0].stream_detail'

Consumer Lag

# Check consumer position
nats consumer info ROUNDS consumer-name