3.2 ארכיטקטורת שכבות פתרון
תרגיל 1 - זיהוי הפרות - פתרון¶
הפרה 1: גישה ישירה ל-sqlite3 בתוך route.
- שייך ל: Presentation Layer
- "דלף" ל: Data Access Layer
הפרה 2: לוגיקה עסקית (בדיקת לא יותר מ-50 משימות) בתוך route.
- שייך ל: Presentation Layer
- "דלף" ל: Business Logic Layer
הפרה 3: שליחת מייל (smtplib) ישירות בתוך route.
- שייך ל: Presentation Layer
- "דלף" ל: Business Logic Layer (ו-Infrastructure Layer)
תרגיל 2 - ארגון מחדש - פתרון¶
# repositories/task_repository.py
import sqlite3
class TaskRepository:
def __init__(self, db_path: str = "taskflow.db"):
self._db_path = db_path
def count_open_by_owner(self, owner_id: int) -> int:
db = sqlite3.connect(self._db_path)
return db.execute(
"SELECT count(*) FROM tasks WHERE owner_id=? AND status != 'done'",
(owner_id,)
).fetchone()[0]
def save(self, title: str, owner_id: int) -> dict:
db = sqlite3.connect(self._db_path)
cursor = db.execute(
"INSERT INTO tasks (title, status, owner_id) VALUES (?, 'todo', ?)",
(title, owner_id)
)
db.commit()
return {"id": cursor.lastrowid, "title": title, "status": "todo", "owner_id": owner_id}
# repositories/user_repository.py
import sqlite3
class UserRepository:
def __init__(self, db_path: str = "taskflow.db"):
self._db_path = db_path
def find_by_id(self, user_id: int) -> dict | None:
db = sqlite3.connect(self._db_path)
row = db.execute("SELECT id, username, email FROM users WHERE id=?", (user_id,)).fetchone()
if not row:
return None
return {"id": row[0], "username": row[1], "email": row[2]}
# services/task_service.py
class TaskService:
MAX_OPEN_TASKS = 50
def __init__(self):
self._task_repo = TaskRepository()
self._user_repo = UserRepository()
self._email = EmailService()
def create_task(self, title: str, owner_id: int) -> dict:
user = self._user_repo.find_by_id(owner_id)
if not user:
raise ValueError(f"משתמש {owner_id} לא נמצא")
open_count = self._task_repo.count_open_by_owner(owner_id)
if open_count >= self.MAX_OPEN_TASKS:
raise ValueError(f"לא ניתן ליצור יותר מ-{self.MAX_OPEN_TASKS} משימות פתוחות")
task = self._task_repo.save(title, owner_id)
self._email.send(user["email"], f"משימה חדשה: {title}")
return task
# routes/tasks.py
from fastapi import APIRouter, HTTPException
router = APIRouter()
task_service = TaskService()
@router.post("/tasks", status_code=201)
def create_task(title: str, owner_id: int):
try:
return task_service.create_task(title, owner_id)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
תרגיל 3 - הוספת פיצ'ר - פתרון¶
# repositories/task_repository.py - הוספה
from datetime import date
def find_overdue(self, owner_id: int) -> list[dict]:
today = date.today().isoformat()
db = sqlite3.connect(self._db_path)
rows = db.execute(
"""SELECT id, title, description, status, due_date
FROM tasks
WHERE owner_id=? AND status != 'done' AND due_date < ?""",
(owner_id, today)
).fetchall()
return [
{"id": r[0], "title": r[1], "description": r[2],
"status": r[3], "due_date": r[4]}
for r in rows
]
# services/task_service.py - הוספה
def get_overdue_tasks(self, owner_id: int) -> list[dict]:
user = self._user_repo.find_by_id(owner_id)
if not user:
raise ValueError(f"משתמש {owner_id} לא נמצא")
return self._task_repo.find_overdue(owner_id)
# routes/tasks.py - הוספה
@router.get("/tasks/overdue")
def get_overdue_tasks(owner_id: int):
try:
tasks = task_service.get_overdue_tasks(owner_id)
return [
{"id": t["id"], "title": t["title"], "status": t["status"]}
for t in tasks
]
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))