הוספה לפרויקט:
- טבלת משתמשים + מודל + סכמות
- פונקציות register + login
- יצירת JWT עם expiry
- תלות (dependency) שנקראת get_current_user
- הוספת אותוריזציה לכל פעולות ה־POST
1. עדכון models.py — הוספת טבלת משתמשים
from sqlalchemy import Column, Integer, String, Table, ForeignKey
from sqlalchemy.orm import relationship
from .database import Base
# Users
class User(Base):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    # Integer primary key; also indexed.
    id = Column(Integer, primary_key=True, index=True)
    # Login name: unique and NOT NULL.
    username = Column(String, unique=True, nullable=False)
    # Password hash (NOT NULL); register_user stores the output of hash_password here.
    password_hash = Column(String, nullable=False)
2. עדכון schemas.py — סכמות register/login
from pydantic import BaseModel, ConfigDict
# Base schema for users: holds the username field that UserCreate and UserRead inherit.
class UserBase(BaseModel):
    username: str
# Input schema for registration (body of POST /register): the inherited
# username plus the plain-text password supplied by the client.
class UserCreate(UserBase):
    password: str
# Output schema for a user (response_model of POST /register): id and username
# only, so no password data is part of the response.
class UserRead(UserBase):
    id: int
    # from_attributes=True lets Pydantic build this schema from an object's
    # attributes (e.g. the ORM User instance) instead of requiring a dict.
    model_config = ConfigDict(from_attributes=True)
# Credentials schema taken by login_user / POST /login.
class UserLogin(BaseModel):
    username: str
    password: str
3. יצירת security.py — JWT + hashing
קובץ חדש: app/security.py
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

from jose import jwt, JWTError
from passlib.context import CryptContext
# Key used to sign and verify the JWTs. It is read from the SECRET_KEY
# environment variable so the real key does not have to live in source control;
# the literal below is only a fallback placeholder and must be overridden in
# any real deployment.
SECRET_KEY = os.environ.get("SECRET_KEY", "VERY_SECRET_KEY_CHANGE_ME")
# Signing algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Default token lifetime in minutes, used when no explicit expiry is requested.
ACCESS_TOKEN_EXPIRE_MINUTES = 60
# passlib context (bcrypt scheme) used to hash and verify passwords.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str):
    """Return the hash of ``password`` produced by the module's passlib context."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(password: str, hashed: str):
    """Check ``password`` against the stored ``hashed`` value via the passlib context."""
    is_match = pwd_context.verify(password, hashed)
    return is_match
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Build a signed JWT carrying ``data`` plus an ``exp`` (expiry) claim.

    Args:
        data: Claims to embed in the token. The caller's dict is not modified;
            the claims are copied into a new dict before ``exp`` is added.
        expires_delta: Lifetime of the token. When ``None``, the token lives
            for ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded token, as returned by ``jwt.encode``.
    """
    # Compare against None (not truthiness) so an explicitly requested zero
    # lifetime is honoured instead of silently falling back to the default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive datetime that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = {**data, "exp": expire}
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def decode_token(token: str):
    """Decode ``token`` with the module's key and algorithm.

    Returns the decoded payload, or ``None`` when ``jwt.decode`` raises
    ``JWTError``.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None
    else:
        return claims
4. עדכון controllers.py — register + login + get_current_user
from fastapi.security import OAuth2PasswordBearer
from .security import hash_password, verify_password, create_access_token, decode_token
from fastapi import Depends
# Dependency that pulls the bearer token out of the request; "/login" is the
# token URL advertised in the generated API docs.
# NOTE(review): the docs' "Authorize" dialog posts form fields to this URL,
# while login_route is declared with a JSON body (schemas.UserLogin) — confirm
# whether the route should accept OAuth2PasswordRequestForm instead.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login")
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(lambda: None)):
    """Resolve the bearer token of the current request to a ``models.User``.

    Args:
        token: Bearer token extracted by ``oauth2_scheme``.
        db: Database session used to look the user up.

    Returns:
        The ``models.User`` whose username equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, or names a user that does not exist.
    """
    # FIXME(review): ``Depends(lambda: None)`` is a placeholder that injects
    # ``None``, so ``db.query`` below fails with AttributeError on every
    # request. It has to be replaced with the session dependency the routes
    # use (``get_db``); its import location is not visible from this snippet.
    bearer_headers = {"WWW-Authenticate": "Bearer"}

    payload = decode_token(token)
    if not payload:
        raise HTTPException(
            status_code=401,
            detail="Invalid or expired token",
            headers=bearer_headers,
        )

    # A token without a subject cannot identify anyone: reject it up front
    # instead of querying the database for ``username == None``.
    username = payload.get("sub")
    if username is None:
        raise HTTPException(
            status_code=401,
            detail="Invalid or expired token",
            headers=bearer_headers,
        )

    user = db.query(models.User).filter(models.User.username == username).first()
    if not user:
        # An unknown subject is an authentication failure (401), not a missing
        # resource: a 404 here would be reported for the protected endpoint.
        raise HTTPException(
            status_code=401,
            detail="User not found",
            headers=bearer_headers,
        )
    return user
פונקציות משתמשים
# ---------------------- Users ----------------------
def register_user(user_in: schemas.UserCreate, db: Session):
    """Create and persist a new user.

    Raises ``HTTPException`` (400) when a user with the same username already
    exists; otherwise returns the newly stored ``models.User``.
    """
    same_username = models.User.username == user_in.username
    existing = db.query(models.User).filter(same_username).first()
    if existing:
        raise HTTPException(status_code=400, detail="Username already exists")

    # Only the hash of the password is stored, never the plain text.
    new_user = models.User(
        username=user_in.username,
        password_hash=hash_password(user_in.password),
    )
    db.add(new_user)
    db.commit()
    # Reload the row so values assigned by the database are populated.
    db.refresh(new_user)
    return new_user
def login_user(user_in: schemas.UserLogin, db: Session):
    """Verify the submitted credentials and issue an access token.

    Raises ``HTTPException`` (401) when the username is unknown or the password
    does not match; otherwise returns a dict with ``access_token`` and
    ``token_type``.
    """
    account = (
        db.query(models.User)
        .filter(models.User.username == user_in.username)
        .first()
    )
    # Short-circuits: the password is only checked when an account was found.
    credentials_ok = bool(account) and verify_password(
        user_in.password, account.password_hash
    )
    if not credentials_ok:
        raise HTTPException(status_code=401, detail="Invalid username or password")

    claims = {"sub": account.username, "user_id": account.id}
    access_token = create_access_token(claims)
    return {"access_token": access_token, "token_type": "bearer"}
5. login + register
from fastapi.security import OAuth2PasswordRequestForm
@app.post("/register", response_model=schemas.UserRead)
def register_route(user_in: schemas.UserCreate, db: Session = Depends(get_db)):
    # Thin HTTP wrapper: the registration logic lives in controllers.register_user;
    # the result is serialised through schemas.UserRead.
    created_user = controllers.register_user(user_in=user_in, db=db)
    return created_user
@app.post("/login")
def login_route(user: schemas.UserLogin, db: Session = Depends(get_db)):
    # Thin HTTP wrapper: credential checking and token creation live in
    # controllers.login_user, whose result is returned unchanged.
    token_response = controllers.login_user(user_in=user, db=db)
    return token_response
6. הוספת אותוריזציה
מוסיפים
current_user: models.User = Depends(controllers.get_current_user)
לכל יצירה של מלון/טיסה/אטרקציה/טיול.
דוגמה (מלונות)
@app.post("/hotels", response_model=schemas.HotelRead, status_code=status.HTTP_201_CREATED)
def create_hotel_route(
    hotel_in: schemas.HotelCreate,
    db: Session = Depends(get_db),
    current_user = Depends(controllers.get_current_user)
):
    # current_user is not used in the body: declaring the dependency is what
    # makes the request go through get_current_user before the hotel is created.
    created_hotel = controllers.create_hotel(hotel_in, db)
    return created_hotel
עושים אותו הדבר ל־POST:
- /flights
- /attractions
- /trips
7. בדיקת login + protected routes
- register:
curl -X POST http://127.0.0.1:8000/register \
-H "Content-Type: application/json" \
-d '{"username":"amit","password":"1234"}'
- login (מחזיר Token):
curl -X POST http://127.0.0.1:8000/login \
-H "Content-Type: application/json" \
-d '{"username":"amit","password":"1234"}'
- יצירת מלון (דורש Authorization):