לדלג לתוכן

הוספה לפרויקט:
- טבלת משתמשים + מודל + סכמות
- פונקציות register + login
- יצירת JWT עם expiry
- תלות (dependency) שנקראת get_current_user
- הוספת אותוריזציה לכל פעולות ה־POST


1. עדכון models.py — הוספת טבלת משתמשים

from sqlalchemy import Column, Integer, String, Table, ForeignKey
from sqlalchemy.orm import relationship
from .database import Base

# Users
class User(Base):
    """ORM model mapped to the ``users`` table."""
    __tablename__ = "users"

    # Integer primary key, also indexed.
    id = Column(Integer, primary_key=True, index=True)
    # Login name: required and unique across the table.
    username = Column(String, unique=True, nullable=False)
    # Required password hash column; register_user stores the output of
    # hash_password() here rather than the plaintext password.
    password_hash = Column(String, nullable=False)

2. עדכון schemas.py — סכמות register/login

from pydantic import BaseModel, ConfigDict

class UserBase(BaseModel):
    """Base user schema holding the one field the derived schemas share."""
    # Login name of the user.
    username: str

class UserCreate(UserBase):
    """Registration payload: the inherited username plus a plaintext password."""
    # Plaintext password as submitted; register_user hashes it before storing.
    password: str

class UserRead(UserBase):
    """User representation returned to clients: id and username, no password."""
    # Primary key of the user row.
    id: int
    # from_attributes=True lets Pydantic populate this schema from an object's
    # attributes (such as an ORM instance) instead of requiring a dict.
    model_config = ConfigDict(from_attributes=True)

class UserLogin(BaseModel):
    """Login credentials consumed by login_user."""
    # Login name to look up.
    username: str
    # Plaintext password to verify against the stored hash.
    password: str

3. יצירת security.py — JWT + hashing

קובץ חדש: app/security.py

from datetime import datetime, timedelta
from typing import Optional
from jose import jwt, JWTError
from passlib.context import CryptContext

import os

# Key used to sign and verify JWTs. Read from the SECRET_KEY environment
# variable so the real key never lives in source control. The literal fallback
# only keeps local/demo setups working: it is public, so any deployment that
# relies on it issues tokens anyone can forge.
SECRET_KEY = os.environ.get("SECRET_KEY") or "VERY_SECRET_KEY_CHANGE_ME"
# Signing algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Token lifetime in minutes used when create_access_token gets no expires_delta.
ACCESS_TOKEN_EXPIRE_MINUTES = 60

# passlib context configured with bcrypt; used by hash_password/verify_password.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def hash_password(password: str) -> str:
    """Hash ``password`` with the module-level ``pwd_context`` and return the result."""
    hashed = pwd_context.hash(password)
    return hashed

def verify_password(password: str, hashed: str) -> bool:
    """Check ``password`` against the stored hash ``hashed`` via ``pwd_context``.

    Returns whatever ``pwd_context.verify`` reports for the pair.
    """
    matches = pwd_context.verify(password, hashed)
    return matches

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Build a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is not modified.
        expires_delta: Lifetime of the token. ``None`` means the default of
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes. Any explicit value is
            honoured, including ``timedelta(0)`` (an already-expired token).

    Returns:
        The encoded token produced by ``jwt.encode`` with ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    # Function-scope import: the file-level import only brings in datetime and
    # timedelta.
    from datetime import timezone

    # Compare against None explicitly: timedelta(0) is falsy, so a truthiness
    # test would silently swap a zero lifetime for the 60-minute default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    # Timezone-aware UTC "now"; datetime.utcnow() is deprecated since 3.12.
    expire = datetime.now(timezone.utc) + expires_delta

    # Build a new dict so the caller's ``data`` is left untouched.
    to_encode = {**data, "exp": expire}
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def decode_token(token: str):
    """Decode ``token`` with ``SECRET_KEY`` / ``ALGORITHM``.

    Returns the decoded payload, or ``None`` when ``jwt.decode`` raises
    ``JWTError``.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None
    return claims

4. עדכון controllers.py — register + login + get_current_user

from fastapi import Depends
from fastapi.security import OAuth2PasswordBearer

# NOTE(review): assumes get_db is defined in app/database.py next to Base;
# adjust the import if the session dependency lives elsewhere.
from .database import get_db
from .security import hash_password, verify_password, create_access_token, decode_token

# Bearer-token scheme that get_current_user depends on to obtain the token;
# tokenUrl names the login endpoint.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login")

def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    """FastAPI dependency that resolves the bearer token to a ``models.User``.

    Args:
        token: Bearer token supplied by ``oauth2_scheme``.
        db: Database session. It must come from the real session dependency;
            a placeholder that yields ``None`` makes ``db.query`` fail on
            every request.

    Returns:
        The ``models.User`` whose username equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            token cannot be decoded, carries no ``sub`` claim, or names a user
            that does not exist.
    """
    auth_headers = {"WWW-Authenticate": "Bearer"}

    payload = decode_token(token)
    if not payload:
        raise HTTPException(
            status_code=401,
            detail="Invalid or expired token",
            headers=auth_headers,
        )

    username = payload.get("sub")
    if not username:
        # A token without a subject cannot identify anyone; reject it here
        # instead of querying for username == None.
        raise HTTPException(
            status_code=401,
            detail="Invalid or expired token",
            headers=auth_headers,
        )

    user = db.query(models.User).filter(models.User.username == username).first()
    if not user:
        # The token's subject no longer exists. That is an authentication
        # failure, not a missing resource on the requested route, hence 401.
        raise HTTPException(
            status_code=401,
            detail="User not found",
            headers=auth_headers,
        )

    return user

פונקציות משתמשים

# ---------------------- Users ----------------------
def register_user(user_in: schemas.UserCreate, db: Session):
    """Create a new user with a hashed password.

    Args:
        user_in: Username and plaintext password for the new account.
        db: Database session used for the lookup and the insert.

    Returns:
        The persisted ``models.User``, refreshed after commit.

    Raises:
        HTTPException: 400 when the username is already taken, whether that
            is detected by the up-front lookup or by the database's unique
            constraint at commit time.
    """
    # Function-scope import so the block does not depend on file-level imports
    # that are outside this snippet.
    from sqlalchemy.exc import IntegrityError

    exists = db.query(models.User).filter(models.User.username == user_in.username).first()
    if exists:
        raise HTTPException(status_code=400, detail="Username already exists")

    user = models.User(
        username=user_in.username,
        password_hash=hash_password(user_in.password),
    )
    db.add(user)

    try:
        db.commit()
    except IntegrityError:
        # Two concurrent registrations can both pass the lookup above; the
        # unique constraint on username then rejects the second commit.
        # Roll back so the session stays usable and report the same 400.
        db.rollback()
        raise HTTPException(status_code=400, detail="Username already exists") from None

    db.refresh(user)
    return user


def login_user(user_in: schemas.UserLogin, db: Session):
    """Authenticate ``user_in`` and issue an access token.

    Looks the user up by username and verifies the password against the
    stored hash. Raises HTTPException(401) when the user is missing or the
    password does not verify; otherwise returns a dict with ``access_token``
    and ``token_type``.
    """
    account = db.query(models.User).filter(models.User.username == user_in.username).first()

    # The password is only checked when a matching user row was found.
    authenticated = False
    if account:
        authenticated = verify_password(user_in.password, account.password_hash)

    if not authenticated:
        raise HTTPException(status_code=401, detail="Invalid username or password")

    claims = {"sub": account.username, "user_id": account.id}
    access_token = create_access_token(claims)
    return {"access_token": access_token, "token_type": "bearer"}

5. login + register

from fastapi.security import OAuth2PasswordRequestForm

@app.post("/register", response_model=schemas.UserRead)
def register_route(user_in: schemas.UserCreate, db: Session = Depends(get_db)):
    """POST /register: delegate user creation to ``controllers.register_user``."""
    created_user = controllers.register_user(user_in, db)
    return created_user


@app.post("/login")
def login_route(user: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    """POST /login: exchange form-encoded credentials for an access token.

    Credentials are read as form fields (``username`` / ``password``) rather
    than a JSON body. That is the format sent by clients of
    ``OAuth2PasswordBearer(tokenUrl="/login")`` and by the curl example for
    this endpoint; a JSON-body parameter would reject those requests with 422.

    Args:
        user: Parsed login form.
        db: Database session.

    Returns:
        The dict produced by ``controllers.login_user``.
    """
    # login_user expects a UserLogin schema, so adapt the form fields to it.
    credentials = schemas.UserLogin(username=user.username, password=user.password)
    return controllers.login_user(credentials, db)

6. הוספת אותוריזציה

מוסיפים
current_user: models.User = Depends(controllers.get_current_user)

לכל יצירה של מלון/טיסה/אטרקציה/טיול.

דוגמה (מלונות)

@app.post("/hotels", response_model=schemas.HotelRead, status_code=status.HTTP_201_CREATED)
def create_hotel_route(
    hotel_in: schemas.HotelCreate,
    db: Session = Depends(get_db),
    current_user=Depends(controllers.get_current_user),
):
    """POST /hotels: create a hotel through ``controllers.create_hotel``.

    ``current_user`` is not read in the body; it is declared so that
    ``controllers.get_current_user`` runs for this request.
    """
    new_hotel = controllers.create_hotel(hotel_in, db)
    return new_hotel

עושים אותו הדבר ל־POST:

  • /flights

  • /attractions

  • /trips


7. בדיקת login + protected routes

  1. register:
curl -X POST http://127.0.0.1:8000/register \
  -H "Content-Type: application/json" \
  -d '{"username":"amit","password":"1234"}'
  2. login (מחזיר Token):
curl -X POST http://127.0.0.1:8000/login \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "username=amit&password=1234"
  3. יצירת מלון (דורש Authorization):
curl -X POST http://127.0.0.1:8000/hotels \
  -H "Authorization: Bearer TOKEN_HERE" \
  -H "Content-Type: application/json" \
  -d '{"name":"Hilton","city":"London"}'