3.4 CQRS ו Event Sourcing הרצאה
CQRS ו-Event Sourcing¶
שני דפוסים מתקדמים שלעתים קרובות הולכים יחד. נלמד כל אחד בנפרד.
CQRS - הפרדת פקודות ושאילתות¶
CQRS הוא ראשי תיבות של Command Query Responsibility Segregation.
הרעיון: פקודה (Command, כתיבה) ושאילתה (Query, קריאה) הן פעולות שונות - תנו להן מודלים שונים.
הבעיה שCQRS פותר¶
בארכיטקטורה רגילה, אותו מודל משרת גם קריאה וגם כתיבה:
# אותו Repository לכל הפעולות
class TaskRepository:
def find_by_id(self, task_id): ... # שאילתה
def find_by_owner(self, owner_id): ... # שאילתה
def find_overdue(self): ... # שאילתה מורכבת עם joins
def find_with_stats(self): ... # שאילתה מאוד מורכבת
def save(self, task): ... # פקודה
def update(self, task): ... # פקודה
def delete(self, task_id): ... # פקודה
הבעיה: שאילתות מורכבות דורשות joins, aggregations ואינדקסים שונים מאלה שכתיבה דורשת. הניסיון לאופטם אותן יחד מקשה על שניהם.
עם CQRS¶
# Command Side - מודל כתיבה
class TaskCommandModel:
def create(self, task: dict) -> int: ...
def update_status(self, task_id: int, status: str): ...
def delete(self, task_id: int): ...
# Query Side - מודל קריאה (יכול להיות DB שונה!)
class TaskQueryModel:
def get_task_with_project_info(self, task_id: int) -> dict: ...
def get_dashboard_summary(self, owner_id: int) -> dict: ...
def get_overdue_with_assignee_details(self) -> list[dict]: ...
def search_full_text(self, query: str) -> list[dict]: ...
CQRS בפועל¶
# Command Handler
class CreateTaskCommandHandler:
def __init__(self, command_repo: TaskCommandModel):
self.repo = command_repo
def handle(self, title: str, owner_id: int) -> int:
if len(title) < 3:
raise ValueError("כותרת קצרה מדי")
return self.repo.create({"title": title, "owner_id": owner_id, "status": "todo"})
# Query Handler
class GetDashboardQueryHandler:
def __init__(self, query_repo: TaskQueryModel):
self.repo = query_repo
def handle(self, owner_id: int) -> dict:
return self.repo.get_dashboard_summary(owner_id)
# Routes
@router.post("/tasks")
def create_task(title: str, owner_id: int):
task_id = create_task_handler.handle(title, owner_id)
return {"id": task_id}
@router.get("/dashboard")
def get_dashboard(owner_id: int):
return dashboard_handler.handle(owner_id)
CQRS עם מסדי נתונים שונים¶
בגרסה המתקדמת, ה-Command side וה-Query side משתמשים במסדי נתונים שונים:
כשמשימה נוצרת:
1. נשמרת ב-PostgreSQL (כתיבה)
2. Event "TaskCreated" מתפרסם
3. Consumer מעדכן את Elasticsearch ו-Redis (קריאה)
Event Sourcing - מקור אירועים¶
הרעיון הבסיסי: במקום לשמור את המצב הנוכחי, שמרו את רצף האירועים שהגיע אליו.
מצב נוכחי לעומת אירועים¶
מסד נתונים רגיל:
tasks: id=1, title="תקן באג", status="done"
Event Sourcing:
events:
1. TaskCreated(id=1, title="תקן באג", owner=5)
2. TaskAssigned(task_id=1, assignee=7)
3. TaskStatusChanged(task_id=1, status="in_progress")
4. CommentAdded(task_id=1, text="עובד על זה")
5. TaskStatusChanged(task_id=1, status="done")
כדי לדעת המצב הנוכחי של משימה - "מנגנים" את האירועים ברצף.
מימוש¶
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class Event:
event_type: str
task_id: int
data: dict
timestamp: datetime = field(default_factory=datetime.now)
class EventStore:
def __init__(self):
self._events: list[Event] = []
def append(self, event: Event):
self._events.append(event)
def get_events_for_task(self, task_id: int) -> list[Event]:
return [e for e in self._events if e.task_id == task_id]
class Task:
def __init__(self):
self.id: int | None = None
self.title: str = ""
self.status: str = ""
self.assignee_id: int | None = None
self.comments: list[str] = []
def apply(self, event: Event):
"""מחיל אירוע אחד על המצב"""
if event.event_type == "TaskCreated":
self.id = event.task_id
self.title = event.data["title"]
self.status = "todo"
elif event.event_type == "TaskAssigned":
self.assignee_id = event.data["assignee_id"]
elif event.event_type == "TaskStatusChanged":
self.status = event.data["status"]
elif event.event_type == "CommentAdded":
self.comments.append(event.data["text"])
@classmethod
def from_events(cls, events: list[Event]) -> "Task":
"""משחזר את המצב מרצף אירועים"""
task = cls()
for event in events:
task.apply(event)
return task
class TaskCommandService:
def __init__(self, event_store: EventStore):
self.store = event_store
def create_task(self, task_id: int, title: str, owner_id: int):
self.store.append(Event("TaskCreated", task_id, {"title": title, "owner_id": owner_id}))
def assign_task(self, task_id: int, assignee_id: int):
self.store.append(Event("TaskAssigned", task_id, {"assignee_id": assignee_id}))
def change_status(self, task_id: int, status: str):
self.store.append(Event("TaskStatusChanged", task_id, {"status": status}))
def get_task(self, task_id: int) -> Task:
events = self.store.get_events_for_task(task_id)
if not events:
raise ValueError(f"משימה {task_id} לא נמצאה")
return Task.from_events(events)
# שימוש
store = EventStore()
service = TaskCommandService(store)
service.create_task(1, "תקן באג", owner_id=5)
service.assign_task(1, assignee_id=7)
service.change_status(1, "in_progress")
service.change_status(1, "done")
task = service.get_task(1)
print(task.status) # "done"
print(task.assignee_id) # 7
# ניתן לדעת מה המצב היה בכל נקודת זמן!
events_until_assignment = store.get_events_for_task(1)[:2]
task_before_assignment = Task.from_events(events_until_assignment)
print(task_before_assignment.assignee_id) # None
יתרונות Event Sourcing¶
Audit Trail מלא: יודעים בדיוק מה קרה, מתי, ומי עשה מה.
Time Travel: אפשר לשחזר את המצב בכל נקודת זמן.
Debug: כשיש באג - יש לכם את כל ההיסטוריה.
Projections: אפשר לבנות views שונים מאותם אירועים.
חסרונות Event Sourcing¶
מורכבות: הרבה יותר קוד מDB רגיל.
performance: שחזור מצב דורש replay של כל האירועים (נפתר עם snapshots).
לא מתאים לכל: מערכות רבות לא צריכות היסטוריה מלאה.
סיכום¶
| דפוס | פותר | מורכבות |
|---|---|---|
| CQRS | קריאה וכתיבה בעלי דרישות שונות | בינונית |
| Event Sourcing | צורך ב-audit trail ומצב היסטורי | גבוהה |
השתמשו בהם כשיש צורך אמיתי - לא כי הם "מגניבים".