
Hexagonal Architecture

Data Stream follows Hexagonal Architecture (Ports & Adapters): business logic lives in a core that depends only on interfaces, and infrastructure plugs in through adapters.

Overview

[Diagram: Hexagonal Architecture]

Core Principles

1. Domain at the Center

The domain layer contains pure business logic with no external dependencies:

// internal/core/domain/round.go
package domain

import (
	"encoding/json"
	"time"
)

type Round struct {
	ID         int64
	GameID     int
	GameSlug   string
	GameType   string
	FinishedAt time.Time
	Extras     json.RawMessage
	Timestamp  int64
}

// IsValid is pure business logic: it needs nothing beyond the standard library.
func (r *Round) IsValid() bool {
	return r.ID > 0 && r.GameSlug != ""
}

2. Ports Define Contracts

Ports are interfaces that define how the domain interacts with the outside world:

// internal/core/ports/repositories.go
package ports

type RoundRepository interface {
	GetLatest(ctx context.Context, gameSlug string) (*domain.Round, error)
	GetHistory(ctx context.Context, gameSlug string, limit int) ([]domain.Round, error)
}

type StreamSubscriber interface {
	Subscribe(ctx context.Context, channel string) (<-chan domain.Round, error)
}

3. Adapters Implement Ports

Adapters connect the domain to external systems:

// internal/adapters/outbound/redis/round_repository.go
package redis

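type RoundRepository struct {
	client *redis.Client
}

func (r *RoundRepository) GetLatest(ctx context.Context, gameSlug string) (*domain.Round, error) {
	key := fmt.Sprintf("latest:%s", gameSlug)
	data, err := r.client.Get(ctx, key).Result()
	if errors.Is(err, redis.Nil) {
		// Missing key: translate to the domain's sentinel error.
		return nil, domain.ErrRoundNotFound
	}
	if err != nil {
		// Connection or protocol failure: do not report it as "not found".
		return nil, fmt.Errorf("get %s: %w", key, err)
	}
	// Parse data and return...
}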

Project Structure

backend/internal/
├── core/                      # Business logic
│   ├── domain/                # Entities
│   │   ├── game.go            # Game entity
│   │   ├── round.go           # Round entity
│   │   └── errors.go          # Domain errors
│   │
│   ├── ports/                 # Interfaces
│   │   ├── repositories.go    # Data access
│   │   ├── messaging.go       # Pub/Sub
│   │   └── services.go        # Service ports
│   │
│   └── services/              # Use cases
│       └── round_service.go   # Query service
│
└── adapters/
    ├── inbound/               # Driving (primary)
    │   ├── http/              # REST handlers
    │   ├── websocket/         # WS handlers
    │   ├── sse/               # SSE handlers
    │   └── webtransport/      # WT server
    │
    └── outbound/              # Driven (secondary)
        └── redis/             # Redis adapter

Benefits

Testability

Mock ports for unit testing:

type MockRoundRepo struct {
	rounds map[string]*domain.Round
}

func (m *MockRoundRepo) GetLatest(ctx context.Context, slug string) (*domain.Round, error) {
	if r, ok := m.rounds[slug]; ok {
		return r, nil
	}
	return nil, domain.ErrRoundNotFound
}

// GetHistory is not exercised here; it exists so the mock satisfies ports.RoundRepository.
func (m *MockRoundRepo) GetHistory(ctx context.Context, slug string, limit int) ([]domain.Round, error) {
	return nil, nil
}

func TestRoundService(t *testing.T) {
	mock := &MockRoundRepo{
		rounds: map[string]*domain.Round{
			"crash": {ID: 1, GameSlug: "crash"},
		},
	}
	service := services.NewRoundQueryService(mock)
	// Test service...
}

Flexibility

Swap adapters without changing business logic:

// Easy to switch from Redis to PostgreSQL
var repo ports.RoundRepository

if useRedis {
	repo = redis.NewRoundRepository(redisClient)
} else {
	repo = postgres.NewRoundRepository(db)
}

service := services.NewRoundQueryService(repo)

Maintainability

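Dependencies point inward: adapters depend on ports, and the core depends on nothing outside itself. A change to Redis or HTTP handling therefore stays inside its adapter, which makes the code easier to understand and modify.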

Wire Dependency Injection

DataStream uses Google Wire for compile-time dependency injection.

Why Wire?

Manual (main.go)      | Wire (di/)
----------------------|------------------------
Errors at runtime     | Errors at compile-time
Manual ordering       | Automatic ordering
Hard to refactor      | Easy to add deps
Zero reflection       | Zero reflection

Structure

di/
├── wire.go # Provider definitions (you write)
└── wire_gen.go # Generated code (wire generates)

Provider Sets

//go:build wireinject

package di

import "github.com/google/wire"

// PostgresSet provides database adapters
var PostgresSet = wire.NewSet(
	postgres.NewGameRepository,
	wire.Bind(new(ports.GameRepository), new(*postgres.GameRepository)),
)

// RedisSet provides cache adapters
var RedisSet = wire.NewSet(
	redis.NewRoundRepository,
	redis.NewStreamPublisher,
	wire.Bind(new(ports.RoundRepository), new(*redis.RoundRepository)),
	wire.Bind(new(ports.StreamPublisher), new(*redis.StreamPublisher)),
)

// NATSSet provides messaging adapters
var NATSSet = wire.NewSet(
	natsadapter.NewCDCConsumer,
	wire.Bind(new(ports.CDCConsumer), new(*natsadapter.CDCConsumer)),
)

// ApplicationSet provides use cases
var ApplicationSet = wire.NewSet(
	services.NewRoundService,
	services.NewGameCacheService,
)

// InitializeApp wires everything together
func InitializeApp(
	ctx context.Context,
	db *sql.DB,
	redisClient *redis.Client,
	nc *nats.Conn,
) (*App, error) {
	wire.Build(
		PostgresSet,
		RedisSet,
		NATSSet,
		ApplicationSet,
		ProvideApp,
	)
	return nil, nil
}
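
For intuition, the generated injector is plain Go that calls each provider in dependency order. The sketch below writes such an injector by hand under simplified assumptions: DB stands in for *sql.DB, and the types, the Source method, and the provider signatures are hypothetical and not taken from the project. It shows roughly the shape to expect in wire_gen.go, not its actual contents.

```go
package main

import "fmt"

// DB stands in for *sql.DB so the sketch has no external dependencies.
type DB struct{ DSN string }

// GameRepository plays the role of a port.
type GameRepository interface {
	Source() string
}

// PostgresGameRepository plays the role of an outbound adapter.
type PostgresGameRepository struct{ db *DB }

func NewGameRepository(db *DB) *PostgresGameRepository {
	return &PostgresGameRepository{db: db}
}

func (r *PostgresGameRepository) Source() string { return "postgres://" + r.db.DSN }

// GameCacheService plays the role of a use case that depends on the port.
type GameCacheService struct{ games GameRepository }

func NewGameCacheService(games GameRepository) *GameCacheService {
	return &GameCacheService{games: games}
}

// App is the fully wired application.
type App struct{ Cache *GameCacheService }

func ProvideApp(cache *GameCacheService) *App { return &App{Cache: cache} }

// InitializeApp is written by hand here. A generated injector has the same
// shape: one call per provider, ordered so inputs exist before they are used.
func InitializeApp(db *DB) (*App, error) {
	gameRepository := NewGameRepository(db)
	// The concrete adapter is passed where the port is expected; in Wire,
	// wire.Bind is what declares this pairing.
	gameCacheService := NewGameCacheService(gameRepository)
	app := ProvideApp(gameCacheService)
	return app, nil
}

func main() {
	app, err := InitializeApp(&DB{DSN: "localhost/games"})
	if err != nil {
		panic(err)
	}
	fmt.Println(app.Cache.games.Source())
}
```

If a provider for some input is missing, hand-written wiring like this fails to compile, and Wire reports the same class of problem when it generates the code.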

Usage

# Generate wire code
task wire

# Check if regeneration needed
task wire:check

Domain Errors

Typed errors for consistent error handling:

// internal/core/domain/errors.go
package domain

import (
	"errors"
	"fmt"
)

// Sentinel errors
var (
	ErrGameNotFound    = errors.New("game not found")
	ErrRoundNotFound   = errors.New("round not found")
	ErrInvalidGameSlug = errors.New("invalid game slug")
)

// Typed errors
type NotFoundError struct {
	Resource string
	ID       string
}

func (e *NotFoundError) Error() string {
	return fmt.Sprintf("%s '%s' not found", e.Resource, e.ID)
}

type ValidationError struct {
	Field   string
	Message string
}

func (e *ValidationError) Error() string {
	return fmt.Sprintf("validation error on %s: %s", e.Field, e.Message)
}

// Helper functions
func IsNotFoundError(err error) bool {
	var nfe *NotFoundError
	return errors.As(err, &nfe) || errors.Is(err, ErrGameNotFound) || errors.Is(err, ErrRoundNotFound)
}

func IsValidationError(err error) bool {
	var ve *ValidationError
	return errors.As(err, &ve)
}

func NewNotFoundError(resource, id string) error {
	return &NotFoundError{Resource: resource, ID: id}
}

Error Handling in Adapters

// HTTP adapter error handling
func errorHandler(c *fiber.Ctx, err error) error {
	if domain.IsNotFoundError(err) {
		return c.Status(404).JSON(fiber.Map{
			"error":   err.Error(),
			"success": false,
		})
	}
	if domain.IsValidationError(err) {
		return c.Status(400).JSON(fiber.Map{
			"error":   err.Error(),
			"success": false,
		})
	}
	return c.Status(500).JSON(fiber.Map{
		"error":   "internal server error",
		"success": false,
	})
}

Production Middlewares

Panic Recovery

Captures panics and prevents server crashes:

app.Use(recover.New(recover.Config{
	EnableStackTrace: true,
}))
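
The underlying mechanism is a deferred recover around the next handler. The sketch below shows that mechanism with the standard library's net/http instead of Fiber; recoverMiddleware, panicky, healthy, and statusOf are hypothetical names, and this is not Fiber's implementation.

```go
package main

import (
	"fmt"
	"log"
	"net/http"
	"net/http/httptest"
	"runtime/debug"
)

// recoverMiddleware (hypothetical) turns a panic in next into a 500 response.
func recoverMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		defer func() {
			if rec := recover(); rec != nil {
				// Log the panic value and stack trace, then answer the client.
				log.Printf("panic: %v\n%s", rec, debug.Stack())
				http.Error(w, "internal server error", http.StatusInternalServerError)
			}
		}()
		next.ServeHTTP(w, r)
	})
}

// panicky returns a handler that always panics.
func panicky() http.Handler {
	return http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
		panic("boom")
	})
}

// healthy returns a handler that always answers 200.
func healthy() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
}

// statusOf sends one GET request through h and returns the status code.
func statusOf(h http.Handler) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/", nil))
	return rec.Code
}

func main() {
	fmt.Println(statusOf(recoverMiddleware(panicky())))
	fmt.Println(statusOf(recoverMiddleware(healthy())))
}
```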

Rate Limiter

Limits requests per IP:

api := app.Group("/api", limiter.New(limiter.Config{
	Max:        100,             // 100 requests
	Expiration: 1 * time.Minute, // per minute
}))

Response headers:

X-Ratelimit-Limit: 100
X-Ratelimit-Remaining: 97
X-Ratelimit-Reset: 53
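
To show how these three values relate, here is a minimal fixed-window counter for a single client. It is an illustration under stated assumptions, not Fiber's implementation: window, allow, and run are hypothetical names, and the window length is fixed at one minute.

```go
package main

import (
	"fmt"
	"time"
)

// window (hypothetical) counts requests for one client in a fixed window.
type window struct {
	max    int
	period time.Duration
	start  time.Time
	hits   int
}

// allow records a request at time now and reports whether it is allowed,
// how many requests remain, and how many seconds until the window resets.
func (w *window) allow(now time.Time) (bool, int, int) {
	if now.Sub(w.start) >= w.period {
		w.start, w.hits = now, 0 // start a new window
	}
	reset := int(w.start.Add(w.period).Sub(now).Seconds())
	if w.hits >= w.max {
		return false, 0, reset
	}
	w.hits++
	return true, w.max - w.hits, reset
}

// run replays requests at the given second offsets and returns the
// result of the last one.
func run(max int, offsets ...int) (ok bool, remaining, reset int) {
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	w := &window{max: max, period: time.Minute}
	for _, s := range offsets {
		ok, remaining, reset = w.allow(t0.Add(time.Duration(s) * time.Second))
	}
	return ok, remaining, reset
}

func main() {
	// Three requests at 0s, 3s and 7s with a limit of 100 per minute.
	fmt.Println(run(100, 0, 3, 7)) // true 97 53
}
```

With a limit of 100, the third request inside a window leaves 97 remaining, and a request 7 seconds into the window resets in 53 seconds, which matches the sample headers.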

Custom Error Handler

Maps domain errors to HTTP responses:

app := fiber.New(fiber.Config{
	ErrorHandler: func(c *fiber.Ctx, err error) error {
		if domain.IsNotFoundError(err) {
			return c.Status(404).JSON(fiber.Map{
				"error":   err.Error(),
				"success": false,
			})
		}
		if domain.IsValidationError(err) {
			return c.Status(400).JSON(fiber.Map{
				"error":   err.Error(),
				"success": false,
			})
		}
		return c.Status(500).JSON(fiber.Map{
			"error":   "internal server error",
			"success": false,
		})
	},
})