From 45e7d98b4a2f8e93abcc603d6b4c809a00d20355 Mon Sep 17 00:00:00 2001
From: cod3SK
@@ -17,6 +17,7 @@ ADNS is an end-to-end demo of a modern network anomaly detection platform. It in
| Frontend dashboard | `frontend/adns-frontend/` | Vite/React UI with anomaly charts, severity donut, and attack simulation buttons. |
| ML lab | `ml/` | Preprocessing scripts (`preprocess/`), meta-model notebooks, and `train_flow_detector.py` for the live scorer. |
| Model artifacts | `api/model_artifacts/` | `meta_model_combined.joblib` (ExtraTrees+XGBoost) + `flow_detector.joblib` (sklearn pipeline). |
+| Attack generator | `core/attack_generator.py` | Stdlib-only CLI; generates synthetic attack flows and POSTs them to `/ingest` for live demo runs. |
| Ops | `deployment/`, `worker/`, `assets/` | Systemd units, scripts, and misc assets. Research docs live in `docs/`. |
Generated datasets live under `data/`, and derived artifacts (clean CSVs, model outputs) live under `outputs/`; both are gitignored to keep the repo lean.
@@ -183,13 +184,25 @@ Copy the resulting artifacts (both `flow_detector.joblib` and `meta_model_combin
## Demo Tips
-- Use the **Attack Simulation Controls** at the top of the dashboard to trigger Attack, Scanning, DoS, DDoS, or Injection scenarios. They call `/api/simulate`, inject synthetic flows, and immediately refresh the charts/donut.
- The **Threat Timeline** and **Severity Mix** donut help narrate how the model responds as traffic changes.
-- `POST /api/simulate` can also be driven via scripts/cURL for automation:
+- To inject synthetic attack traffic, use the CLI tool in `core/attack_generator.py` (requires only stdlib — no Flask deps):
```bash
-curl -X POST http://localhost:5000/simulate -H 'Content-Type: application/json' \
- -d '{"type":"ddos","count":80}'
+# One-shot batch of 80 DDoS flows
+python core/attack_generator.py --type ddos --count 80
+
+# Stream injection flows for 2 minutes
+python core/attack_generator.py --type injection --duration 120 --interval 1
+
+# Supported types: attack, scanning, dos, ddos, injection
+```
+
+- `POST /ingest` (which the generator targets) can also be called directly via cURL:
+
+```bash
+# Minimal single-flow ingest
+curl -X POST http://localhost:5000/ingest -H 'Content-Type: application/json' \
+ -d '[{"src_ip":"10.0.0.1","dst_ip":"8.8.8.8","proto":"TCP","bytes":50000}]'
```
## Testing
diff --git a/api/app.py b/api/app.py
index 07a65c8..2c09278 100644
--- a/api/app.py
+++ b/api/app.py
@@ -3,12 +3,13 @@
import random
import shutil
import subprocess
+import sys
import threading
import time
from datetime import datetime, timedelta, timezone
from functools import wraps
-from flask import Flask, jsonify, request
+from flask import Flask, jsonify, request, send_from_directory
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import inspect, text
@@ -37,6 +38,10 @@
# token is unset the endpoints stay disabled — fail closed rather than open.
ADMIN_TOKEN = os.environ.get("ADNS_ADMIN_TOKEN", "").strip()
+# Prevents multiple concurrent streaming threads from accumulating (OOM risk)
+_STREAM_LOCK = threading.Lock()
+_stream_active = False
+
def _extract_request_token() -> str:
auth = request.headers.get("Authorization", "")
@@ -251,18 +256,41 @@ def ensure_killswitch_rules_enabled(enabled: bool) -> None:
_run_cmd(["iptables", "-D", "INPUT", "-i", iface, "-j", "DROP"])
-def block_ip_os(ip: str, allow: bool = False) -> tuple[bool, str]:
- """
- Apply or remove a DROP rule for the given source IP. Best effort; requires NET_ADMIN.
- """
+def _block_ip_windows(ip: str, allow: bool) -> tuple[bool, str]:
+ """Windows Firewall via netsh advfirewall. Requires an elevated (Administrator) process."""
+ rule_in = f"ADNS Block {ip} in"
+ rule_out = f"ADNS Block {ip} out"
+ all_ok = True
+
+ for rule_name, direction in ((rule_in, "in"), (rule_out, "out")):
+ check_cmd = ["netsh", "advfirewall", "firewall", "show", "rule", f"name={rule_name}"]
+ exists, _ = _run_cmd(check_cmd)
+ if allow:
+ if exists:
+ ok, msg = _run_cmd(["netsh", "advfirewall", "firewall", "delete", "rule", f"name={rule_name}"])
+ all_ok = all_ok and ok
+ else:
+ if not exists:
+ ok, msg = _run_cmd([
+ "netsh", "advfirewall", "firewall", "add", "rule",
+ f"name={rule_name}",
+ f"dir={direction}",
+ "action=block",
+ f"remoteip={ip}",
+ ])
+ all_ok = all_ok and ok
+
+ return all_ok, "unblocked" if allow else "blocked"
+
+
+def _block_ip_iptables(ip: str, allow: bool) -> tuple[bool, str]:
+ """iptables-based blocking for Linux. Requires NET_ADMIN / root."""
rules = [
- # Drop anything coming from the IP
(
["iptables", "-C", "INPUT", "-s", ip, "-j", "DROP"],
["iptables", "-I", "INPUT", "-s", ip, "-j", "DROP"],
["iptables", "-D", "INPUT", "-s", ip, "-j", "DROP"],
),
- # Drop anything going to the IP
(
["iptables", "-C", "OUTPUT", "-d", ip, "-j", "DROP"],
["iptables", "-I", "OUTPUT", "-d", ip, "-j", "DROP"],
@@ -282,17 +310,22 @@ def block_ip_os(ip: str, allow: bool = False) -> tuple[bool, str]:
if msg:
messages.append(msg)
continue
-
if exists:
continue
-
ok, msg = _run_cmd(add_cmd)
all_ok = all_ok and ok
if msg:
messages.append(msg)
detail = "; ".join(messages) if messages else ""
- return all_ok, detail or ("allow" if allow else "blocked")
+ return all_ok, detail or ("unblocked" if allow else "blocked")
+
+
+def block_ip_os(ip: str, allow: bool = False) -> tuple[bool, str]:
+ """Apply or remove a firewall rule for the given IP. Best effort; requires elevated privileges."""
+ if sys.platform == "win32":
+ return _block_ip_windows(ip, allow)
+ return _block_ip_iptables(ip, allow)
simulation_detector = DetectionEngine()
@@ -791,57 +824,71 @@ def simulate_attack():
interval_seconds = max(0.5, min(interval_seconds, 5.0))
if duration_seconds > 0:
+ global _stream_active
+ with _STREAM_LOCK:
+ if _stream_active:
+ return (
+ jsonify({"error": "a streaming simulation is already running; wait for it to finish"}),
+ 409,
+ )
+ _stream_active = True
batch_size = max(5, min(count, 200))
def _stream_simulation() -> None:
- deadline = time.time() + duration_seconds
- with app.app_context():
- total_generated = 0
- while time.time() < deadline:
- flows = generate_attack_flows(attack_type, batch_size)
- for flow in flows:
- db.session.add(flow)
- db.session.flush()
-
- for flow in flows:
- pred = simulation_detector.predict(db.session, flow)
- if isinstance(pred, (list, tuple)) and len(pred) == 3:
- score, label, attack_label = pred
- else:
- score, label = pred
- attack_label = None
- base_labels = {"normal", "watch", "anomaly"}
- candidate_attack = None
- if label and label.lower() not in base_labels:
- candidate_attack = label
- elif attack_label and label and label.lower() != "normal":
- candidate_attack = attack_label
- elif label and label.lower() in {"normal", "watch"}:
- candidate_attack = _infer_scanning(flow)
- extras = dict(flow.extra or {})
- if candidate_attack and candidate_attack.lower() not in base_labels:
- extras["attack_type"] = candidate_attack
- else:
- extras.pop("attack_type", None)
- flow.extra = extras
- db.session.add(
- Prediction(
- flow_id=flow.id,
- score=score,
- label=label,
- created_at=datetime.now(timezone.utc),
+ global _stream_active
+ try:
+ deadline = time.time() + duration_seconds
+ with app.app_context():
+ total_generated = 0
+ while time.time() < deadline:
+ flows = generate_attack_flows(attack_type, batch_size)
+ for flow in flows:
+ db.session.add(flow)
+ db.session.flush()
+
+ for flow in flows:
+ pred = simulation_detector.predict(db.session, flow)
+ if isinstance(pred, (list, tuple)) and len(pred) == 3:
+ score, label, attack_label = pred
+ else:
+ score, label = pred
+ attack_label = None
+ base_labels = {"normal", "watch", "anomaly"}
+ candidate_attack = None
+ if label and label.lower() not in base_labels:
+ candidate_attack = label
+ elif attack_label and label and label.lower() != "normal":
+ candidate_attack = attack_label
+ elif label and label.lower() in {"normal", "watch"}:
+ candidate_attack = _infer_scanning(flow)
+ extras = dict(flow.extra or {})
+ if candidate_attack and candidate_attack.lower() not in base_labels:
+ extras["attack_type"] = candidate_attack
+ else:
+ extras.pop("attack_type", None)
+ flow.extra = extras
+ db.session.add(
+ Prediction(
+ flow_id=flow.id,
+ score=score,
+ label=label,
+ created_at=datetime.now(timezone.utc),
+ )
)
- )
-
- db.session.commit()
- enforce_flow_retention()
- total_generated += len(flows)
- time.sleep(interval_seconds)
- app.logger.info(
- "completed streaming simulate: %s flows over %s seconds",
- total_generated,
- duration_seconds,
- )
+
+ db.session.commit()
+ db.session.expunge_all() # free identity-map refs between batches
+ enforce_flow_retention()
+ total_generated += len(flows)
+ time.sleep(interval_seconds)
+ app.logger.info(
+ "completed streaming simulate: %s flows over %s seconds",
+ total_generated,
+ duration_seconds,
+ )
+ finally:
+ with _STREAM_LOCK:
+ _stream_active = False
threading.Thread(target=_stream_simulation, daemon=True).start()
return jsonify(
@@ -993,7 +1040,6 @@ def anomalies():
@app.post("/block_ip")
-@require_admin_token
def block_ip():
payload = request.get_json(silent=True) or {}
ip = str(payload.get("ip") or "").strip()
@@ -1009,8 +1055,13 @@ def block_ip():
db.session.add(BlockedIP(ip=ip, active=True, created_at=now))
db.session.commit()
- ok, msg = block_ip_os(ip, allow=False)
- return jsonify({"status": "blocked", "ip": ip, "os_action": "ok" if ok else "failed", "detail": msg})
+ # OS-level iptables block — only attempted when admin token is configured and caller provides it
+ os_status = "not_configured"
+ provided = _extract_request_token()
+ if ADMIN_TOKEN and provided and hmac.compare_digest(provided, ADMIN_TOKEN):
+ ok, msg = block_ip_os(ip, allow=False)
+ os_status = "ok" if ok else "failed"
+ return jsonify({"status": "blocked", "ip": ip, "os_action": os_status})
@app.get("/blocked_ips")
@@ -1021,7 +1072,6 @@ def blocked_ips():
@app.post("/unblock_ip")
-@require_admin_token
def unblock_ip():
payload = request.get_json(silent=True) or {}
ip = str(payload.get("ip") or "").strip()
@@ -1032,8 +1082,13 @@ def unblock_ip():
if record:
record.active = False
db.session.commit()
- ok, msg = block_ip_os(ip, allow=True)
- return jsonify({"status": "unblocked", "ip": ip, "os_action": "ok" if ok else "failed", "detail": msg})
+
+ os_status = "not_configured"
+ provided = _extract_request_token()
+ if ADMIN_TOKEN and provided and hmac.compare_digest(provided, ADMIN_TOKEN):
+ ok, msg = block_ip_os(ip, allow=True)
+ os_status = "ok" if ok else "failed"
+ return jsonify({"status": "unblocked", "ip": ip, "os_action": os_status})
@app.route("/killswitch", methods=["GET", "POST"])
@@ -1052,6 +1107,23 @@ def killswitch():
return jsonify({"enabled": bool(KILL_SWITCH_STATE.get("enabled", False))})
+# ---------------------------------------------------------------
+# Frontend static file serving (desktop / self-contained mode)
+# Set ADNS_FRONTEND_DIST to the React dist/ directory to enable.
+# In dev mode the Vite dev server handles this; this route is a no-op.
+# ---------------------------------------------------------------
+@app.route("/", defaults={"path": ""})
+@app.route("/