לדלג לתוכן

3.2 ארכיטקטורת שכבות פתרון

תרגיל 1 - זיהוי הפרות - פתרון

הפרה 1: גישה ישירה ל-sqlite3 בתוך route.
- שייך ל: Presentation Layer
- "דלף" ל: Data Access Layer

הפרה 2: לוגיקה עסקית (בדיקת לא יותר מ-50 משימות) בתוך route.
- שייך ל: Presentation Layer
- "דלף" ל: Business Logic Layer

הפרה 3: שליחת מייל (smtplib) ישירות בתוך route.
- שייך ל: Presentation Layer
- "דלף" ל: Business Logic Layer (ו-Infrastructure Layer)


תרגיל 2 - ארגון מחדש - פתרון

# repositories/task_repository.py
import sqlite3


class TaskRepository:
    def __init__(self, db_path: str = "taskflow.db"):
        self._db_path = db_path

    def count_open_by_owner(self, owner_id: int) -> int:
        db = sqlite3.connect(self._db_path)
        return db.execute(
            "SELECT count(*) FROM tasks WHERE owner_id=? AND status != 'done'",
            (owner_id,)
        ).fetchone()[0]

    def save(self, title: str, owner_id: int) -> dict:
        db = sqlite3.connect(self._db_path)
        cursor = db.execute(
            "INSERT INTO tasks (title, status, owner_id) VALUES (?, 'todo', ?)",
            (title, owner_id)
        )
        db.commit()
        return {"id": cursor.lastrowid, "title": title, "status": "todo", "owner_id": owner_id}


# repositories/user_repository.py
import sqlite3


class UserRepository:
    def __init__(self, db_path: str = "taskflow.db"):
        self._db_path = db_path

    def find_by_id(self, user_id: int) -> dict | None:
        db = sqlite3.connect(self._db_path)
        row = db.execute("SELECT id, username, email FROM users WHERE id=?", (user_id,)).fetchone()
        if not row:
            return None
        return {"id": row[0], "username": row[1], "email": row[2]}


# services/task_service.py
class TaskService:
    MAX_OPEN_TASKS = 50

    def __init__(self):
        self._task_repo = TaskRepository()
        self._user_repo = UserRepository()
        self._email = EmailService()

    def create_task(self, title: str, owner_id: int) -> dict:
        user = self._user_repo.find_by_id(owner_id)
        if not user:
            raise ValueError(f"משתמש {owner_id} לא נמצא")

        open_count = self._task_repo.count_open_by_owner(owner_id)
        if open_count >= self.MAX_OPEN_TASKS:
            raise ValueError(f"לא ניתן ליצור יותר מ-{self.MAX_OPEN_TASKS} משימות פתוחות")

        task = self._task_repo.save(title, owner_id)
        self._email.send(user["email"], f"משימה חדשה: {title}")
        return task


# routes/tasks.py
from fastapi import APIRouter, HTTPException

router = APIRouter()
task_service = TaskService()


@router.post("/tasks", status_code=201)
def create_task(title: str, owner_id: int):
    try:
        return task_service.create_task(title, owner_id)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

תרגיל 3 - הוספת פיצ'ר - פתרון

# repositories/task_repository.py - הוספה
from datetime import date


def find_overdue(self, owner_id: int) -> list[dict]:
    today = date.today().isoformat()
    db = sqlite3.connect(self._db_path)
    rows = db.execute(
        """SELECT id, title, description, status, due_date
           FROM tasks
           WHERE owner_id=? AND status != 'done' AND due_date < ?""",
        (owner_id, today)
    ).fetchall()
    return [
        {"id": r[0], "title": r[1], "description": r[2],
         "status": r[3], "due_date": r[4]}
        for r in rows
    ]


# services/task_service.py - הוספה
def get_overdue_tasks(self, owner_id: int) -> list[dict]:
    user = self._user_repo.find_by_id(owner_id)
    if not user:
        raise ValueError(f"משתמש {owner_id} לא נמצא")
    return self._task_repo.find_overdue(owner_id)


# routes/tasks.py - הוספה
@router.get("/tasks/overdue")
def get_overdue_tasks(owner_id: int):
    try:
        tasks = task_service.get_overdue_tasks(owner_id)
        return [
            {"id": t["id"], "title": t["title"], "status": t["status"]}
            for t in tasks
        ]
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e))