לדלג לתוכן

3.4 CQRS ו Event Sourcing הרצאה

CQRS ו-Event Sourcing

שני דפוסים מתקדמים שלעתים קרובות הולכים יחד. נלמד כל אחד בנפרד.


CQRS - הפרדת פקודות ושאילתות

CQRS הוא ראשי תיבות של Command Query Responsibility Segregation.

הרעיון: פקודה (Command, כתיבה) ושאילתה (Query, קריאה) הן פעולות שונות - תנו להן מודלים שונים.

הבעיה שCQRS פותר

בארכיטקטורה רגילה, אותו מודל משרת גם קריאה וגם כתיבה:

# אותו Repository לכל הפעולות
class TaskRepository:
    def find_by_id(self, task_id): ...       # שאילתה
    def find_by_owner(self, owner_id): ...   # שאילתה
    def find_overdue(self): ...              # שאילתה מורכבת עם joins
    def find_with_stats(self): ...           # שאילתה מאוד מורכבת
    def save(self, task): ...                # פקודה
    def update(self, task): ...              # פקודה
    def delete(self, task_id): ...           # פקודה

הבעיה: שאילתות מורכבות דורשות joins, aggregations ואינדקסים שונים מאלה שכתיבה דורשת. הניסיון לאופטם אותן יחד מקשה על שניהם.

עם CQRS

# Command Side - מודל כתיבה
class TaskCommandModel:
    def create(self, task: dict) -> int: ...
    def update_status(self, task_id: int, status: str): ...
    def delete(self, task_id: int): ...


# Query Side - מודל קריאה (יכול להיות DB שונה!)
class TaskQueryModel:
    def get_task_with_project_info(self, task_id: int) -> dict: ...
    def get_dashboard_summary(self, owner_id: int) -> dict: ...
    def get_overdue_with_assignee_details(self) -> list[dict]: ...
    def search_full_text(self, query: str) -> list[dict]: ...

CQRS בפועל

# Command Handler
class CreateTaskCommandHandler:
    def __init__(self, command_repo: TaskCommandModel):
        self.repo = command_repo

    def handle(self, title: str, owner_id: int) -> int:
        if len(title) < 3:
            raise ValueError("כותרת קצרה מדי")
        return self.repo.create({"title": title, "owner_id": owner_id, "status": "todo"})


# Query Handler
class GetDashboardQueryHandler:
    def __init__(self, query_repo: TaskQueryModel):
        self.repo = query_repo

    def handle(self, owner_id: int) -> dict:
        return self.repo.get_dashboard_summary(owner_id)


# Routes
@router.post("/tasks")
def create_task(title: str, owner_id: int):
    task_id = create_task_handler.handle(title, owner_id)
    return {"id": task_id}


@router.get("/dashboard")
def get_dashboard(owner_id: int):
    return dashboard_handler.handle(owner_id)

CQRS עם מסדי נתונים שונים

בגרסה המתקדמת, ה-Command side וה-Query side משתמשים במסדי נתונים שונים:

כתיבה → PostgreSQL (מותאם לעסקאות ועקביות)
קריאה → Elasticsearch (מותאם לחיפוש מלל) + Redis (cache)

כשמשימה נוצרת:
1. נשמרת ב-PostgreSQL (כתיבה)
2. Event "TaskCreated" מתפרסם
3. Consumer מעדכן את Elasticsearch ו-Redis (קריאה)


Event Sourcing - מקור אירועים

הרעיון הבסיסי: במקום לשמור את המצב הנוכחי, שמרו את רצף האירועים שהגיע אליו.

מצב נוכחי לעומת אירועים

מסד נתונים רגיל:
tasks: id=1, title="תקן באג", status="done"

Event Sourcing:
events:
  1. TaskCreated(id=1, title="תקן באג", owner=5)
  2. TaskAssigned(task_id=1, assignee=7)
  3. TaskStatusChanged(task_id=1, status="in_progress")
  4. CommentAdded(task_id=1, text="עובד על זה")
  5. TaskStatusChanged(task_id=1, status="done")

כדי לדעת המצב הנוכחי של משימה - "מנגנים" את האירועים ברצף.

מימוש

from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class Event:
    event_type: str
    task_id: int
    data: dict
    timestamp: datetime = field(default_factory=datetime.now)


class EventStore:
    def __init__(self):
        self._events: list[Event] = []

    def append(self, event: Event):
        self._events.append(event)

    def get_events_for_task(self, task_id: int) -> list[Event]:
        return [e for e in self._events if e.task_id == task_id]


class Task:
    def __init__(self):
        self.id: int | None = None
        self.title: str = ""
        self.status: str = ""
        self.assignee_id: int | None = None
        self.comments: list[str] = []

    def apply(self, event: Event):
        """מחיל אירוע אחד על המצב"""
        if event.event_type == "TaskCreated":
            self.id = event.task_id
            self.title = event.data["title"]
            self.status = "todo"
        elif event.event_type == "TaskAssigned":
            self.assignee_id = event.data["assignee_id"]
        elif event.event_type == "TaskStatusChanged":
            self.status = event.data["status"]
        elif event.event_type == "CommentAdded":
            self.comments.append(event.data["text"])

    @classmethod
    def from_events(cls, events: list[Event]) -> "Task":
        """משחזר את המצב מרצף אירועים"""
        task = cls()
        for event in events:
            task.apply(event)
        return task


class TaskCommandService:
    def __init__(self, event_store: EventStore):
        self.store = event_store

    def create_task(self, task_id: int, title: str, owner_id: int):
        self.store.append(Event("TaskCreated", task_id, {"title": title, "owner_id": owner_id}))

    def assign_task(self, task_id: int, assignee_id: int):
        self.store.append(Event("TaskAssigned", task_id, {"assignee_id": assignee_id}))

    def change_status(self, task_id: int, status: str):
        self.store.append(Event("TaskStatusChanged", task_id, {"status": status}))

    def get_task(self, task_id: int) -> Task:
        events = self.store.get_events_for_task(task_id)
        if not events:
            raise ValueError(f"משימה {task_id} לא נמצאה")
        return Task.from_events(events)


# שימוש
store = EventStore()
service = TaskCommandService(store)

service.create_task(1, "תקן באג", owner_id=5)
service.assign_task(1, assignee_id=7)
service.change_status(1, "in_progress")
service.change_status(1, "done")

task = service.get_task(1)
print(task.status)  # "done"
print(task.assignee_id)  # 7

# ניתן לדעת מה המצב היה בכל נקודת זמן!
events_until_assignment = store.get_events_for_task(1)[:2]
task_before_assignment = Task.from_events(events_until_assignment)
print(task_before_assignment.assignee_id)  # None

יתרונות Event Sourcing

Audit Trail מלא: יודעים בדיוק מה קרה, מתי, ומי עשה מה.

Time Travel: אפשר לשחזר את המצב בכל נקודת זמן.

Debug: כשיש באג - יש לכם את כל ההיסטוריה.

Projections: אפשר לבנות views שונים מאותם אירועים.

חסרונות Event Sourcing

מורכבות: הרבה יותר קוד מDB רגיל.

performance: שחזור מצב דורש replay של כל האירועים (נפתר עם snapshots).

לא מתאים לכל: מערכות רבות לא צריכות היסטוריה מלאה.

סיכום

דפוס פותר מורכבות
CQRS קריאה וכתיבה בעלי דרישות שונות בינונית
Event Sourcing צורך ב-audit trail ומצב היסטורי גבוהה

השתמשו בהם כשיש צורך אמיתי - לא כי הם "מגניבים".