1502 lines · 55,242 bytes · MIT License · Zero dependencies
#!/usr/bin/env python3
"""
╔═══════════════════════════════════════════════════════════════════╗
║ CryptoAnnihilator — Behavioral Crypto Mining Detector & Killer ║
║ By Rav-n-Vic — Open Source Crypto Defense ║
║ License: MIT ║
╚═══════════════════════════════════════════════════════════════════╝
Detects and kills crypto miners by their BEHAVIOR, not their signature.
Works against known, unknown, custom-compiled, and obfuscated miners.
Why this works when ClamAV doesn't:
ClamAV recognizes virus DNA. Hackers just change the DNA.
CryptoAnnihilator recognizes sick behavior. You can't mine without it.
Detection Layers:
1. NETWORK — Stratum protocol pattern matching (mining.submit, etc.)
2. BEHAVIOR — Sustained high CPU + active outbound connections
3. SIGNATURES — Known miner names + mining pool DNS blocking
4. FINGERPRINT — Connection behavioral fingerprinting (catches evasive miners)
Zero pip dependencies. Python 3.6+ stdlib only. Single file deployment.
Usage:
# Scan once and report (dry run):
sudo python3 crypto_annihilator.py
# Scan and kill detected miners:
sudo python3 crypto_annihilator.py --kill
# Run as a continuous daemon:
sudo python3 crypto_annihilator.py --kill --daemon
# Install network-level firewall rules:
sudo python3 crypto_annihilator.py --install-firewall
# Install DNS-level mining pool blocks:
sudo python3 crypto_annihilator.py --install-dns-block
# Full protection (all layers, kill mode, daemon):
sudo python3 crypto_annihilator.py --kill --daemon --install-firewall --install-dns-block
# Event integration (external SIEM/alerting):
sudo python3 crypto_annihilator.py --kill --daemon --event-dir /var/run/event/events
"""
import argparse
import glob
import json
import logging
import os
import re
import signal
import socket
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
__version__ = "1.0.0"
__author__ = "Rav-n-Vic"
__license__ = "MIT"
# ─── CONFIGURATION ──────────────────────────────────────────────────
# Known Stratum protocol patterns (case-insensitive matching)
STRATUM_PATTERNS = [
b"mining.submit",
b"mining.subscribe",
b"mining.authorize",
b"mining.notify",
b"mining.set_difficulty",
b"mining.set_target",
b"mining.set_extranonce",
b"eth_submitWork",
b"eth_getWork",
b"eth_submitHashrate",
b"login", # XMR Stratum uses generic "login" with specific params
]
# Known mining pool ports
MINING_PORTS = {
3333, 3334, 3335, 3336, # Common Stratum
4444, 4443, # Common Stratum SSL
5555, 5556, # Monero pools
6666, 6667, # Alt pools
7777, 7778, # Alt pools
8332, 8333, # Bitcoin RPC/P2P
8899, # Solana
9332, 9333, # Litecoin
10034, # MetaMiner
14433, 14444, # Stratum SSL
20535, 20536, # GRIN
24443, # NiceHash SSL
30301, 30303, # Ethereum P2P
42000, 42069, # Various pools
45560, 45700, # Monero pools
}
# Mining pool domain patterns (regex)
MINING_DOMAIN_PATTERNS = [
r".*pool\.(com|org|net|io|cc|dev|xyz)$",
r".*mining\.(com|org|net|io|cc|dev)$",
r".*stratum.*",
r".*\.nicehash\.com$",
r".*\.nanopool\.org$",
r".*\.2miners\.com$",
r".*\.ethermine\.org$",
r".*\.f2pool\.(com|io)$",
r".*\.antpool\.com$",
r".*\.viabtc\.com$",
r".*\.poolin\.(com|me)$",
r".*\.slushpool\.com$",
r".*\.braiins\.(com|os)$",
r".*\.flexpool\.(io|org)$",
r".*\.hiveon\.(com|net)$",
r".*\.minergate\.com$",
r".*\.hashvault\.pro$",
r".*\.supportxmr\.com$",
r".*\.moneroocean\.stream$",
r".*\.herominers\.com$",
r".*\.unmineable\.com$",
r".*\.crazypool\.org$",
r".*\.woolypooly\.com$",
r".*\.ravenminer\.com$",
r".*\.zergpool\.com$",
r".*\.zpool\.(ca|io)$",
r".*\.prohashing\.com$",
r".*\.miningpoolhub\.com$",
r".*\.hashrate\.to$",
r".*\.litecoinpool\.org$",
r".*\.dwarfpool\.com$",
r".*\.coinfoundry\.org$",
]
# Known miner process names (supplementary — behavior detection is primary)
KNOWN_MINER_NAMES = {
"xmrig", "xmr-stak", "cpuminer", "cgminer", "bfgminer",
"minerd", "minergate", "ethminer", "t-rex", "teamredminer",
"phoenixminer", "lolminer", "nbminer", "gminer", "nanominer",
"srbminer", "wildrig", "ccminer", "bminer", "claymore",
"ewbf", "dstm", "excavator", "nicehash", "kryptex",
"xmrig-notls", "xmrig-cuda", "randomx", "cryptonight",
}
# Process names that legitimately use high CPU (whitelist)
DEFAULT_WHITELIST = {
"gcc", "g++", "cc1", "cc1plus", "make", "cmake", "ninja",
"rustc", "cargo", "javac", "python3", "python", "node", "npm",
"ffmpeg", "ffprobe", "x264", "x265", "HandBrakeCLI",
"blender", "render", "povray",
"postgres", "mysqld", "mariadbd", "mongod", "redis-server",
"stress", "stress-ng", "sysbench", # Legitimate stress testing
"7z", "zip", "gzip", "bzip2", "xz", "zstd", "pigz",
"tar", "rsync", "rclone",
"apt", "apt-get", "dpkg", "yum", "dnf", "pacman",
"pip", "pip3", "npm", "yarn", "composer",
"caddy", "nginx", "apache2", "httpd",
"next-server", "next", "vite",
"systemd", "journald", "cron", "sshd",
"dockerd", "containerd", "docker",
}
# ─── LOGGING ────────────────────────────────────────────────────────
log = logging.getLogger("CryptoAnnihilator")
def setup_logging(verbose: bool = False):
level = logging.DEBUG if verbose else logging.INFO
fmt = "%(asctime)s [ANNIHILATOR] %(levelname)s: %(message)s"
logging.basicConfig(level=level, format=fmt, datefmt="%Y-%m-%d %H:%M:%S")
# ─── PROCESS INSPECTION ────────────────────────────────────────────
def get_all_processes() -> list:
"""Read all processes from /proc with their stats."""
procs = []
for pid_dir in glob.glob("/proc/[0-9]*"):
pid = int(os.path.basename(pid_dir))
try:
with open(f"/proc/{pid}/comm") as f:
name = f.read().strip()
with open(f"/proc/{pid}/stat") as f:
stat = f.read().split()
with open(f"/proc/{pid}/cmdline", "rb") as f:
cmdline = f.read().replace(b"\x00", b" ").decode("utf-8", errors="replace").strip()
utime = int(stat[13]) # User CPU time
stime = int(stat[14]) # System CPU time
starttime = int(stat[21]) # Process start time
procs.append({
"pid": pid,
"name": name,
"cmdline": cmdline,
"cpu_ticks": utime + stime,
"starttime": starttime,
})
except (FileNotFoundError, PermissionError, IndexError, ValueError):
continue
return procs
def get_clock_ticks() -> int:
"""Get system clock ticks per second."""
try:
return os.sysconf("SC_CLK_TCK")
except (ValueError, OSError):
return 100 # Default on most Linux systems
def measure_cpu_usage(duration: float = 5.0) -> dict:
"""Measure CPU usage over a time window. Returns {pid: cpu_percent}."""
ticks = get_clock_ticks()
snap1 = {p["pid"]: p for p in get_all_processes()}
time.sleep(duration)
snap2 = {p["pid"]: p for p in get_all_processes()}
usage = {}
for pid, p2 in snap2.items():
if pid in snap1:
p1 = snap1[pid]
delta_ticks = p2["cpu_ticks"] - p1["cpu_ticks"]
cpu_pct = (delta_ticks / (duration * ticks)) * 100
usage[pid] = {
"cpu_percent": round(cpu_pct, 1),
"name": p2["name"],
"cmdline": p2["cmdline"],
}
return usage
# ─── NETWORK INSPECTION ────────────────────────────────────────────
def get_connections() -> list:
"""Get all TCP connections with their owning PIDs."""
conns = []
try:
result = subprocess.run(
["ss", "-tnp", "-o", "state", "established"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=10
)
for line in result.stdout.strip().split("\n")[1:]: # Skip header
parts = line.split()
if len(parts) < 5:
continue
local = parts[3] if len(parts) > 3 else ""
remote = parts[4] if len(parts) > 4 else ""
# Extract PID from users:(("name",pid=NNN,fd=N))
pid_match = re.search(r"pid=(\d+)", line)
name_match = re.search(r'users:\(\("([^"]+)"', line)
remote_ip, remote_port = "", 0
if ":" in remote:
parts_r = remote.rsplit(":", 1)
remote_ip = parts_r[0].strip("[]")
try:
remote_port = int(parts_r[1])
except ValueError:
pass
conns.append({
"local": local,
"remote_ip": remote_ip,
"remote_port": remote_port,
"pid": int(pid_match.group(1)) if pid_match else None,
"name": name_match.group(1) if name_match else "",
"raw": line,
})
except (subprocess.TimeoutExpired, FileNotFoundError):
log.warning("Could not run 'ss' command — network detection limited")
return conns
def reverse_dns(ip: str) -> str:
"""Best-effort reverse DNS lookup."""
try:
return socket.gethostbyaddr(ip)[0]
except (socket.herror, socket.gaierror, OSError):
return ""
def check_stratum_traffic() -> list:
"""Check iptables log for Stratum protocol detections."""
detections = []
try:
# Check kernel log for our iptables LOG entries
result = subprocess.run(
["journalctl", "-k", "--no-pager", "-n", "100", "--output=cat"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=10
)
for line in result.stdout.split("\n"):
if "CRYPTO_ANNIHILATOR" in line:
detections.append(line)
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return detections
# ─── DETECTION ENGINE ───────────────────────────────────────────────
class Detection:
"""Represents a detected mining activity."""
def __init__(self, pid: int, name: str, cmdline: str, layer: str,
reason: str, confidence: str, evidence: dict):
self.pid = pid
self.name = name
self.cmdline = cmdline
self.layer = layer
self.reason = reason
self.confidence = confidence # "HIGH", "MEDIUM", "LOW"
self.evidence = evidence
self.timestamp = datetime.now(timezone.utc).isoformat()
def to_dict(self) -> dict:
return {
"pid": self.pid,
"name": self.name,
"cmdline": self.cmdline[:200],
"layer": self.layer,
"reason": self.reason,
"confidence": self.confidence,
"evidence": self.evidence,
"timestamp": self.timestamp,
}
def __str__(self):
return (f"[{self.confidence}] PID {self.pid} ({self.name}): "
f"{self.reason} [Layer: {self.layer}]")
def detect_layer1_network(connections: list) -> list:
"""Layer 1: Detect connections to known mining ports/domains."""
detections = []
for conn in connections:
if not conn["pid"]:
continue
reasons = []
evidence = {}
# Check mining ports
if conn["remote_port"] in MINING_PORTS:
reasons.append(f"Connected to mining port {conn['remote_port']}")
evidence["remote_port"] = conn["remote_port"]
# Check mining domains via reverse DNS
if conn["remote_ip"]:
hostname = reverse_dns(conn["remote_ip"])
if hostname:
for pattern in MINING_DOMAIN_PATTERNS:
if re.match(pattern, hostname, re.IGNORECASE):
reasons.append(f"Connected to mining domain: {hostname}")
evidence["hostname"] = hostname
break
evidence["remote_ip"] = conn["remote_ip"]
evidence["remote_port"] = conn["remote_port"]
if reasons:
detections.append(Detection(
pid=conn["pid"],
name=conn["name"],
cmdline="", # Will be filled later
layer="NETWORK",
reason="; ".join(reasons),
confidence="HIGH" if len(reasons) > 1 else "MEDIUM",
evidence=evidence,
))
return detections
def detect_layer2_behavior(cpu_usage: dict, connections: list,
threshold: float = 80.0,
whitelist: set = None) -> list:
"""Layer 2: Detect sustained high CPU + network activity."""
if whitelist is None:
whitelist = DEFAULT_WHITELIST
detections = []
# Build a set of PIDs with active outbound connections
networked_pids = {c["pid"] for c in connections if c["pid"]}
for pid, info in cpu_usage.items():
name = info["name"]
cpu = info["cpu_percent"]
# Skip whitelisted processes
if name.lower() in whitelist:
continue
# Skip low CPU processes
if cpu < threshold:
continue
# High CPU process — check if it also has network connections
if pid in networked_pids:
# Find what it's connected to
pid_conns = [c for c in connections if c["pid"] == pid]
remote_targets = [f"{c['remote_ip']}:{c['remote_port']}" for c in pid_conns]
detections.append(Detection(
pid=pid,
name=name,
cmdline=info["cmdline"],
layer="BEHAVIOR",
reason=f"Sustained {cpu}% CPU with {len(pid_conns)} outbound connection(s)",
confidence="HIGH" if cpu > 95 else "MEDIUM",
evidence={
"cpu_percent": cpu,
"connections": remote_targets[:5],
"connection_count": len(pid_conns),
},
))
return detections
def detect_layer3_known_names(processes: list) -> list:
"""Layer 3: Detect known miner process names (supplementary)."""
detections = []
for proc in processes:
name_lower = proc["name"].lower()
cmd_lower = proc["cmdline"].lower()
# Direct name match
if name_lower in KNOWN_MINER_NAMES:
detections.append(Detection(
pid=proc["pid"],
name=proc["name"],
cmdline=proc["cmdline"],
layer="SIGNATURE",
reason=f"Known miner process name: {proc['name']}",
confidence="HIGH",
evidence={"matched_name": name_lower},
))
continue
# Cmdline keyword match
for miner_name in KNOWN_MINER_NAMES:
if miner_name in cmd_lower:
detections.append(Detection(
pid=proc["pid"],
name=proc["name"],
cmdline=proc["cmdline"],
layer="SIGNATURE",
reason=f"Miner keyword '{miner_name}' found in command line",
confidence="MEDIUM",
evidence={"matched_keyword": miner_name},
))
break
return detections
def detect_layer4_connection_fingerprint(cpu_usage: dict, connections: list,
whitelist: set) -> list:
"""Layer 4: Connection behavioral fingerprinting.
Catches evasive miners that defeat Layers 1-3 by combining:
- CPU throttling (below Layer 2 threshold)
- TLS encryption (hides Stratum from Layer 1)
- Renamed binary (hides from Layer 3)
Detection: correlates even moderate CPU (>20%) with long-lived
outbound connections to non-standard ports or unknown IPs.
A legitimate process rarely sustains both simultaneously.
"""
detections = []
SOFT_CPU_THRESHOLD = 20.0 # Much lower than Layer 2
# Mining pool port ranges (even over TLS, the port is visible)
MINING_PORTS = {
3333, 3334, 4444, 5555, 7777, 8888, 9999,
14433, 14444, 24444, 33333, # TLS variants
443, # Some pools use 443 to look like HTTPS
}
# Build map of PIDs with outbound connections
pid_connections = {}
for conn in connections:
pid = conn.get("pid", 0)
if pid <= 0:
continue
remote_port = conn.get("remote_port", 0)
if pid not in pid_connections:
pid_connections[pid] = []
pid_connections[pid].append(conn)
for pid, cpu_pct in cpu_usage.items():
if cpu_pct < SOFT_CPU_THRESHOLD:
continue
# Get process info
try:
with open(f"/proc/{pid}/comm") as f:
name = f.read().strip()
except (OSError, IOError):
continue
if name in whitelist:
continue
# Check if this PID has suspicious outbound connections
conns = pid_connections.get(pid, [])
if not conns:
continue
# Score connection suspicion
suspicious_conns = 0
for conn in conns:
remote_port = conn.get("remote_port", 0)
remote_ip = conn.get("remote_ip", "")
# Skip local/loopback
if remote_ip.startswith("127.") or remote_ip == "0.0.0.0":
continue
# Known mining ports (even over TLS)
if remote_port in MINING_PORTS:
suspicious_conns += 2
# Non-standard high ports (>10000) are suspicious
elif remote_port > 10000:
suspicious_conns += 1
if suspicious_conns >= 2:
detections.append(Detection(
pid=pid,
name=name,
cpu=cpu_pct,
connections=len(conns),
confidence="MEDIUM",
layer=4,
reason=f"Behavioral fingerprint: {cpu_pct:.1f}% CPU + {len(conns)} outbound connection(s) to mining-associated ports",
evidence={"cpu": cpu_pct, "conn_score": suspicious_conns, "conns": len(conns)},
))
return detections
# ─── ACTIONS ────────────────────────────────────────────────────────
def kill_process(pid: int, name: str) -> bool:
"""Kill a process and its children."""
# Safety: never kill init/systemd or ourselves
if pid <= 1:
log.warning(f"Refusing to kill PID {pid} (system process)")
return False
if pid == os.getpid():
log.warning(f"Refusing to kill own PID {pid}")
return False
try:
# First, kill children
try:
result = subprocess.run(
["pgrep", "-P", str(pid)],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=5
)
for child_pid in result.stdout.strip().split("\n"):
if child_pid.strip():
try:
os.kill(int(child_pid.strip()), signal.SIGKILL)
log.info(f" ↳ Killed child PID {child_pid.strip()}")
except ProcessLookupError:
pass
except subprocess.TimeoutExpired:
pass
# Kill the main process
os.kill(pid, signal.SIGKILL)
log.info(f"💀 KILLED PID {pid} ({name})")
return True
except ProcessLookupError:
log.warning(f"PID {pid} already dead")
return False
except PermissionError:
log.error(f"Permission denied killing PID {pid} — run as root!")
return False
def emit_event(detection: Detection, action: str, event_dir: str):
"""Write a JSON event file for external SIEM/alerting integration."""
ts = int(time.time_ns())
fname = os.path.join(event_dir, f"{ts}_crypto_annihilator.json")
event = {
"sensor": "crypto_annihilator",
"severity": "CRITICAL" if detection.confidence == "HIGH" else "HIGH",
"title": f"Crypto Miner Detected: {detection.name} (PID {detection.pid})",
"details": (f"Layer: {detection.layer} | {detection.reason} | "
f"Action: {action} | Confidence: {detection.confidence}"),
"suggestion": "Review process and connections. If legitimate, add to whitelist.",
"key": f"miner_{detection.pid}_{detection.name}",
"ts": detection.timestamp,
"detection": detection.to_dict(),
}
try:
with open(fname, "w") as f:
json.dump(event, f)
log.debug(f"Event file written: {fname}")
except OSError as e:
log.error(f"Failed to write event file: {e}")
# ─── FIREWALL RULES ────────────────────────────────────────────────
IPTABLES_RULES = [
# Block Stratum protocol string patterns
("-A OUTPUT -p tcp -m string --string 'mining.submit' "
"--algo bm -j LOG --log-prefix 'CRYPTO_ANNIHILATOR: '"),
("-A OUTPUT -p tcp -m string --string 'mining.submit' "
"--algo bm -j REJECT"),
("-A OUTPUT -p tcp -m string --string 'mining.subscribe' "
"--algo bm -j REJECT"),
("-A OUTPUT -p tcp -m string --string 'mining.authorize' "
"--algo bm -j REJECT"),
("-A OUTPUT -p tcp -m string --string 'eth_submitWork' "
"--algo bm -j REJECT"),
("-A OUTPUT -p tcp -m string --string 'eth_getWork' "
"--algo bm -j REJECT"),
# Block common mining pool ports (outbound only)
"-A OUTPUT -p tcp --dport 3333 -j LOG --log-prefix 'CRYPTO_ANNIHILATOR_PORT: '",
"-A OUTPUT -p tcp --dport 3333 -j REJECT",
"-A OUTPUT -p tcp --dport 4444 -j REJECT",
"-A OUTPUT -p tcp --dport 5555 -j REJECT",
"-A OUTPUT -p tcp --dport 14444 -j REJECT",
"-A OUTPUT -p tcp --dport 14433 -j REJECT",
"-A OUTPUT -p tcp --dport 45700 -j REJECT",
]
def install_firewall_rules():
"""Install iptables rules to block Stratum protocol at the kernel level."""
if os.geteuid() != 0:
log.error("Must be root to install firewall rules")
return False
log.info("Installing iptables rules for crypto mining detection...")
# Create a custom chain
subprocess.run(["iptables", "-N", "CRYPTO_ANNIHILATOR"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
for rule in IPTABLES_RULES:
parts = rule.split()
# Replace -A OUTPUT with -A CRYPTO_ANNIHILATOR for chain rules
cmd = ["iptables"] + parts
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode == 0:
log.info(f" ✓ {rule[:60]}...")
else:
# Rule may already exist
log.debug(f" ⚠ {result.stderr.strip()}")
log.info("✅ Firewall rules installed. Stratum protocol will be blocked at kernel level.")
return True
# ─── DNS BLOCKING ───────────────────────────────────────────────────
MINING_DOMAINS_BLOCK = [
# Major pools
"pool.minergate.com", "minergate.com",
"xmr.nanopool.org", "eth.nanopool.org",
"xmr.2miners.com", "eth.2miners.com", "rvn.2miners.com",
"us1.ethermine.org", "eu1.ethermine.org", "asia1.ethermine.org",
"xmr-us-east1.nanopool.org", "xmr-eu1.nanopool.org",
"pool.supportxmr.com", "supportxmr.com",
"gulf.moneroocean.stream", "moneroocean.stream",
"de.moneroocean.stream", "us.moneroocean.stream",
"pool.hashvault.pro", "hashvault.pro",
"rx.unmineable.com", "unmineable.com",
"stratum.slushpool.com", "slushpool.com",
"ss.antpool.com", "antpool.com",
"stratum.f2pool.com", "f2pool.com",
"btc.viabtc.com", "viabtc.com",
"stratum.braiins.com", "braiins.com",
"pool.herominers.com", "herominers.com",
"mining.pool.com",
# NiceHash
"stratum.nicehash.com", "nhmp.nicehash.com", "nicehash.com",
# Hiveon
"pool.hiveon.com", "hiveon.com",
# FlexPool
"xmr-us.flexpool.io", "eth-us.flexpool.io", "flexpool.io",
# Woolypooly
"pool.woolypooly.com", "woolypooly.com",
# CrazyPool
"xmr.crazypool.org", "crazypool.org",
]
def install_dns_block():
"""Add mining pool domains to /etc/hosts to poison DNS."""
if os.geteuid() != 0:
log.error("Must be root to modify /etc/hosts")
return False
hosts_path = "/etc/hosts"
marker_start = "# === CRYPTO_ANNIHILATOR START ==="
marker_end = "# === CRYPTO_ANNIHILATOR END ==="
try:
# Read existing hosts
with open(hosts_path) as f:
content = f.read()
# Remove old block if present
if marker_start in content:
before = content[:content.index(marker_start)]
after = content[content.index(marker_end) + len(marker_end):]
content = before + after
# Build new block
block_lines = [marker_start]
block_lines.append("# Crypto mining pool domains — blocked by CryptoAnnihilator")
block_lines.append(f"# Generated: {datetime.now(timezone.utc).isoformat()}")
for domain in sorted(set(MINING_DOMAINS_BLOCK)):
block_lines.append(f"127.0.0.1 {domain}")
block_lines.append(marker_end)
block_lines.append("")
content = content.rstrip() + "\n\n" + "\n".join(block_lines)
with open(hosts_path, "w") as f:
f.write(content)
log.info(f"✅ DNS block installed: {len(MINING_DOMAINS_BLOCK)} mining domains → 127.0.0.1")
return True
except PermissionError:
log.error("Permission denied writing /etc/hosts — need root")
return False
except OSError as e:
log.error(f"Failed to modify /etc/hosts: {e}")
return False
# ─── MAIN SCAN ──────────────────────────────────────────────────────
def scan(args) -> list:
"""Run all detection layers and return findings."""
all_detections = []
# Layer 1: Network connections
log.info("Layer 1: Scanning network connections...")
connections = get_connections()
l1 = detect_layer1_network(connections)
if l1:
log.warning(f" ⚠ Layer 1 found {len(l1)} suspicious connection(s)")
else:
log.info(" ✓ No suspicious network connections")
all_detections.extend(l1)
# Layer 2: Behavioral (CPU + Network)
log.info(f"Layer 2: Measuring CPU over {args.cpu_window}s window...")
cpu_usage = measure_cpu_usage(duration=args.cpu_window)
whitelist = set(DEFAULT_WHITELIST)
if args.whitelist:
whitelist.update(args.whitelist.split(","))
l2 = detect_layer2_behavior(cpu_usage, connections,
threshold=args.cpu_threshold,
whitelist=whitelist)
if l2:
log.warning(f" ⚠ Layer 2 found {len(l2)} suspicious process(es)")
else:
log.info(" ✓ No suspicious CPU+network patterns")
all_detections.extend(l2)
# Layer 3: Known miner names (supplementary)
log.info("Layer 3: Checking for known miner process names...")
processes = get_all_processes()
l3 = detect_layer3_known_names(processes)
if l3:
log.warning(f" ⚠ Layer 3 found {len(l3)} known miner(s)")
else:
log.info(" ✓ No known miner processes")
all_detections.extend(l3)
# Layer 4: Connection behavioral fingerprinting
log.info("Layer 4: Analyzing connection behavioral fingerprints...")
l4 = detect_layer4_connection_fingerprint(cpu_usage, connections, whitelist)
if l4:
log.warning(f" ⚠ Layer 4 found {len(l4)} suspicious fingerprint(s)")
else:
log.info(" ✓ No suspicious connection fingerprints")
all_detections.extend(l4)
# Fill in cmdlines for network detections
proc_map = {p["pid"]: p for p in processes}
for d in all_detections:
if not d.cmdline and d.pid in proc_map:
d.cmdline = proc_map[d.pid]["cmdline"]
# Deduplicate by PID (keep highest confidence)
seen = {}
confidence_rank = {"HIGH": 3, "MEDIUM": 2, "LOW": 1}
for d in all_detections:
if d.pid not in seen or confidence_rank.get(d.confidence, 0) > confidence_rank.get(seen[d.pid].confidence, 0):
seen[d.pid] = d
all_detections = list(seen.values())
# Check Stratum firewall log if firewall is installed
stratum_hits = check_stratum_traffic()
if stratum_hits:
log.warning(f" ⚠ {len(stratum_hits)} Stratum protocol attempts blocked by firewall")
return all_detections
def run_scan_cycle(args):
"""Run one scan cycle and handle detections."""
detections = scan(args)
if not detections:
log.info("✅ System clean — no crypto miners detected")
return 0
# Report findings
log.warning(f"🚨 DETECTED {len(detections)} CRYPTO MINER(S):")
for d in detections:
log.warning(f" {d}")
# Take action
killed = 0
for d in detections:
action = "ALERT_ONLY"
# Extract wallet before killing (process must be alive)
wallet_info = extract_wallet(d.pid, d.name)
if args.kill:
if kill_process(d.pid, d.name):
action = "KILLED"
killed += 1
# Report wallet after successful kill
if wallet_info.get('wallet'):
report_wallet(wallet_info, d.layer if hasattr(d, 'layer') else 0)
else:
action = "KILL_FAILED"
elif wallet_info.get('wallet'):
# Even in dry run, log the wallet
report_wallet(wallet_info, d.layer if hasattr(d, 'layer') else 0)
# Emit detection event
if args.event_dir:
emit_event(d, action, args.event_dir)
if args.kill:
log.info(f"💀 Killed {killed}/{len(detections)} miner process(es)")
else:
log.info("⚠ Dry run — use --kill to terminate detected miners")
return len(detections)
# ─── ENTRY POINT ────────────────────────────────────────────────────
# ═══════════════════════════════════════════════════════════════
# SELF-HEALING PERSISTENCE — same tricks miners use, but for defense
# A hacker must disable ALL layers simultaneously or it resurrects.
# ═══════════════════════════════════════════════════════════════
SERVICE_NAME = "scannersend"
INSTALL_PATH = Path("/usr/local/bin/crypto_annihilator.py")
SERVICE_PATH = Path("/etc/systemd/system/scannersend.service")
CRON_TAG = "# scannersend-watchdog"
RC_LOCAL = Path("/etc/rc.local")
SERVICE_TEMPLATE = """[Unit]
Description=ScannerSend — CryptoAnnihilator Daemon
After=network.target
[Service]
Type=simple
ExecStart=/usr/bin/python3 /usr/local/bin/crypto_annihilator.py --kill --daemon --interval 30 --fortify
Restart=always
RestartSec=1
StartLimitIntervalSec=0
WatchdogSec=120
StandardOutput=journal
StandardError=journal
SyslogIdentifier=scannersend
[Install]
WantedBy=multi-user.target
"""
CRON_LINE = "* * * * * /usr/bin/systemctl is-active --quiet scannersend || /usr/bin/systemctl start scannersend 2>/dev/null " + CRON_TAG
def fortify():
"""Verify and restore all persistence layers. Called every scan cycle."""
if os.geteuid() != 0:
return # Can't fortify without root
repaired = []
# Layer 1: Ensure binary exists and is immutable
if INSTALL_PATH.exists():
try:
subprocess.run(["chattr", "+i", str(INSTALL_PATH)],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=5)
except FileNotFoundError:
pass # chattr not available on all systems
else:
# Binary was deleted! Copy ourselves back
try:
import shutil
shutil.copy2(sys.argv[0] if os.path.isabs(sys.argv[0])
else os.path.abspath(sys.argv[0]), str(INSTALL_PATH))
os.chmod(str(INSTALL_PATH), 0o755)
repaired.append("binary")
except Exception:
pass
# Layer 2: Ensure systemd service exists and is enabled
if not SERVICE_PATH.exists():
try:
# Remove immutable flag if set on parent dir
SERVICE_PATH.write_text(SERVICE_TEMPLATE)
subprocess.run(["systemctl", "daemon-reload"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=10)
subprocess.run(["systemctl", "enable", SERVICE_NAME],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=10)
repaired.append("systemd-service")
except Exception:
pass
else:
# Service exists, make sure it's enabled
r = subprocess.run(["systemctl", "is-enabled", SERVICE_NAME],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=5)
if r.stdout.strip() != "enabled":
subprocess.run(["systemctl", "enable", SERVICE_NAME],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=10)
repaired.append("systemd-enable")
# Layer 3: Ensure cron watchdog exists
try:
r = subprocess.run(["crontab", "-l"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=5)
if CRON_TAG not in r.stdout:
existing = r.stdout.strip()
new_crontab = existing + "\n" + CRON_LINE + "\n" if existing else CRON_LINE + "\n"
subprocess.run(["crontab", "-"], input=new_crontab,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=5)
repaired.append("cron-watchdog")
except Exception:
pass
# Layer 4: Ensure rc.local backup
try:
if not RC_LOCAL.exists() or SERVICE_NAME not in RC_LOCAL.read_text():
rc_content = RC_LOCAL.read_text() if RC_LOCAL.exists() else ""
with open(RC_LOCAL, "a") as f:
if not rc_content.strip():
f.write("#!/bin/bash\n")
f.write(f"systemctl start {SERVICE_NAME} 2>/dev/null {CRON_TAG}\n")
os.chmod(str(RC_LOCAL), 0o755)
repaired.append("rc.local")
except Exception:
pass
# Layer 5: Protect service file with immutable flag
if SERVICE_PATH.exists():
try:
subprocess.run(["chattr", "+i", str(SERVICE_PATH)],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=5)
except FileNotFoundError:
pass
if repaired:
log.warning(f"⚠ PERSISTENCE REPAIRED: {', '.join(repaired)} — possible tampering detected")
def uninstall():
"""Cleanly remove all persistence layers. The escape hatch."""
if os.geteuid() != 0:
print("Must be root to uninstall. Use sudo.")
sys.exit(1)
print("\n\033[1m\033[91m" + "=" * 55)
print(" CryptoAnnihilator — Clean Uninstall")
print("=" * 55 + "\033[0m\n")
steps = []
# 1. Remove immutable flags
for f in [INSTALL_PATH, SERVICE_PATH]:
try:
subprocess.run(["chattr", "-i", str(f)], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=5)
steps.append(f" \033[92m✓\033[0m Removed immutable flag: {f}")
except Exception:
pass
# 2. Stop and disable service
subprocess.run(["systemctl", "stop", SERVICE_NAME], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=10)
subprocess.run(["systemctl", "disable", SERVICE_NAME], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=10)
steps.append(f" \033[92m✓\033[0m Stopped and disabled {SERVICE_NAME} service")
# 3. Remove service file
if SERVICE_PATH.exists():
SERVICE_PATH.unlink()
subprocess.run(["systemctl", "daemon-reload"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=10)
steps.append(f" \033[92m✓\033[0m Removed {SERVICE_PATH}")
# 4. Remove cron watchdog
try:
r = subprocess.run(["crontab", "-l"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=5)
if CRON_TAG in r.stdout:
cleaned = "\n".join(l for l in r.stdout.splitlines() if CRON_TAG not in l)
subprocess.run(["crontab", "-"], input=cleaned + "\n",
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=5)
steps.append(" \033[92m✓\033[0m Removed cron watchdog")
except Exception:
pass
# 5. Remove rc.local entry (don't delete the file, just remove our line)
try:
if RC_LOCAL.exists():
lines = RC_LOCAL.read_text().splitlines()
cleaned = [l for l in lines if SERVICE_NAME not in l]
RC_LOCAL.write_text("\n".join(cleaned) + "\n")
steps.append(" \033[92m✓\033[0m Cleaned rc.local")
except Exception:
pass
# 6. Remove binary
if INSTALL_PATH.exists():
INSTALL_PATH.unlink()
steps.append(f" \033[92m✓\033[0m Removed {INSTALL_PATH}")
# 7. Remove wrapper script
wrapper = Path("/usr/local/bin/scannersend")
if wrapper.exists():
try:
subprocess.run(["chattr", "-i", str(wrapper)], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=5)
wrapper.unlink()
steps.append(f" \033[92m✓\033[0m Removed {wrapper}")
except Exception:
pass
for s in steps:
print(s)
print(f"\n \033[93mLogs preserved at:\033[0m /var/log/crypto_annihilator/")
print(f" \033[93mTo remove logs too:\033[0m rm -rf /var/log/crypto_annihilator/")
print(f"\n\033[92m✓ CryptoAnnihilator completely removed.\033[0m")
print(" Thanks for using ScannerSend. Stay safe. — Rav-n-Vic\n")
def show_status():
"""Show current protection status."""
print("\n\033[1m" + "=" * 55)
print(" CryptoAnnihilator — Status")
print("=" * 55 + "\033[0m\n")
# Service status
r = subprocess.run(["systemctl", "is-active", SERVICE_NAME],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=5)
active = r.stdout.strip() == "active"
icon = "\033[92m●\033[0m" if active else "\033[91m●\033[0m"
print(f" {icon} Service: {"RUNNING" if active else "STOPPED"}")
# Uptime
if active:
r2 = subprocess.run(["systemctl", "show", SERVICE_NAME, "--property=ActiveEnterTimestamp"],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=5)
ts = r2.stdout.strip().replace("ActiveEnterTimestamp=", "")
if ts:
print(f" \033[90m Since: {ts}\033[0m")
# Binary
exists = INSTALL_PATH.exists()
immutable = False
if exists:
r3 = subprocess.run(["lsattr", str(INSTALL_PATH)],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=5)
immutable = "i" in r3.stdout.split()[0] if r3.stdout.strip() else False
icon = "\033[92m✓\033[0m" if exists else "\033[91m✗\033[0m"
extra = " (immutable \033[92m✓\033[0m)" if immutable else ""
print(f" {icon} Binary: {INSTALL_PATH}{extra}")
# Cron watchdog
try:
r4 = subprocess.run(["crontab", "-l"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=5)
cron_ok = CRON_TAG in r4.stdout
except Exception:
cron_ok = False
icon = "\033[92m✓\033[0m" if cron_ok else "\033[91m✗\033[0m"
print(f" {icon} Cron watchdog: {"active" if cron_ok else "missing"}")
# rc.local
try:
rc_ok = RC_LOCAL.exists() and SERVICE_NAME in RC_LOCAL.read_text()
except Exception:
rc_ok = False
icon = "\033[92m✓\033[0m" if rc_ok else "\033[91m✗\033[0m"
print(f" {icon} Boot fallback: {"active" if rc_ok else "missing"}")
# Kill count
log_dir = Path("/var/log/crypto_annihilator")
kills = 0
last_kill = None
kill_log = log_dir / "kills.jsonl"
if kill_log.exists():
try:
lines = kill_log.read_text().strip().splitlines()
kills = len(lines)
if lines:
last = json.loads(lines[-1])
last_kill = last.get("timestamp", "")
except Exception:
pass
print(f"\n \033[1mMiners killed:\033[0m {kills}")
if last_kill:
print(f" \033[90mLast kill: {last_kill}\033[0m")
print()
# ═══════════════════════════════════════════════════════════════════════════
# LAYER 5: WALLET FORENSICS
# ═══════════════════════════════════════════════════════════════════════════
import re as _re
# Wallet address patterns by coin type
WALLET_PATTERNS = {
'XMR': _re.compile(r'(4[0-9AB][1-9A-HJ-NP-Za-km-z]{93})'),
'XMR_INT': _re.compile(r'(4[0-9AB][1-9A-HJ-NP-Za-km-z]{104})'),
'BTC': _re.compile(r'((?:1[1-9A-HJ-NP-Za-km-z]{25,34})|(?:3[1-9A-HJ-NP-Za-km-z]{25,34})|(?:bc1[a-zA-HJ-NP-Z0-9]{25,89}))'),
'ETH': _re.compile(r'(0x[0-9a-fA-F]{40})'),
'LTC': _re.compile(r'((?:L[a-km-zA-HJ-NP-Z1-9]{26,33})|(?:M[a-km-zA-HJ-NP-Z1-9]{26,33})|(?:ltc1[a-zA-HJ-NP-Z0-9]{25,89}))'),
}
# Pool user flags commonly used in miner cmdlines
_POOL_USER_FLAGS = ['-u', '--user', '-user', '--wallet', '-wallet', '--coin.wallet']
_POOL_HOST_FLAGS = ['-o', '--url', '-url', '--pool', '-pool', '-stratum']
_ENV_WALLET_KEYS = ['WALLET', 'POOL_USER', 'XMRIG_USER', 'MINER_USER', 'POOL_WALLET']
def classify_wallet(address):
"""Identify the coin type of a wallet address. Returns coin type string or None."""
for coin, pattern in WALLET_PATTERNS.items():
if pattern.fullmatch(address):
return 'XMR' if coin == 'XMR_INT' else coin
return None
def _scan_cmdline(pid):
"""Extract wallet and pool from /proc/PID/cmdline."""
result = {'wallet': None, 'pool_host': None, 'pool_port': None}
try:
with open(f'/proc/{pid}/cmdline', 'rb') as f:
raw = f.read()
args = raw.decode('utf-8', errors='replace').split('\x00')
args = [a for a in args if a] # Remove empty strings
# Look for pool user flag (-u WALLET)
for i, arg in enumerate(args):
if arg.lower() in _POOL_USER_FLAGS and i + 1 < len(args):
candidate = args[i + 1].split('.')[0] # Strip worker suffix
coin = classify_wallet(candidate)
if coin:
result['wallet'] = candidate
result['wallet_type'] = coin
result['source'] = 'cmdline'
# Look for pool host (-o stratum+tcp://pool:port)
if arg.lower() in _POOL_HOST_FLAGS and i + 1 < len(args):
pool_url = args[i + 1]
pool_url = _re.sub(r'^stratum\+tcp://', '', pool_url)
pool_url = _re.sub(r'^stratum\+ssl://', '', pool_url)
if ':' in pool_url:
host, port = pool_url.rsplit(':', 1)
result['pool_host'] = host[:128]
try:
result['pool_port'] = int(port)
except ValueError:
pass
else:
result['pool_host'] = pool_url[:128]
# If no wallet found via flags, scan all args for wallet patterns
if not result['wallet']:
full_cmdline = ' '.join(args)
for coin, pattern in WALLET_PATTERNS.items():
m = pattern.search(full_cmdline)
if m:
result['wallet'] = m.group(1)
result['wallet_type'] = 'XMR' if coin == 'XMR_INT' else coin
result['source'] = 'cmdline_scan'
break
except (FileNotFoundError, PermissionError, ProcessLookupError):
pass
return result
def _scan_environ(pid):
"""Extract wallet from /proc/PID/environ."""
result = {'wallet': None}
try:
with open(f'/proc/{pid}/environ', 'rb') as f:
raw = f.read()
env_pairs = raw.decode('utf-8', errors='replace').split('\x00')
for pair in env_pairs:
if '=' not in pair:
continue
key, _, value = pair.partition('=')
if key.upper() in _ENV_WALLET_KEYS:
candidate = value.strip().split('.')[0]
coin = classify_wallet(candidate)
if coin:
result['wallet'] = candidate
result['wallet_type'] = coin
result['source'] = 'environ'
return result
# Fallback: scan all env values for wallet patterns
full_env = ' '.join(env_pairs)
for coin, pattern in WALLET_PATTERNS.items():
m = pattern.search(full_env)
if m:
result['wallet'] = m.group(1)
result['wallet_type'] = 'XMR' if coin == 'XMR_INT' else coin
result['source'] = 'environ_scan'
return result
except (FileNotFoundError, PermissionError, ProcessLookupError):
pass
return result
def _scan_config_files(pid):
"""Scan config files in process CWD for wallet addresses."""
result = {'wallet': None}
try:
cwd = os.readlink(f'/proc/{pid}/cwd')
config_names = ['config.json', 'config.txt', 'pools.txt', 'miner.conf']
for fname in config_names:
fpath = os.path.join(cwd, fname)
if os.path.exists(fpath):
try:
with open(fpath, 'r', errors='replace') as f:
content = f.read(4096) # Only read first 4KB
for coin, pattern in WALLET_PATTERNS.items():
m = pattern.search(content)
if m:
result['wallet'] = m.group(1)
result['wallet_type'] = 'XMR' if coin == 'XMR_INT' else coin
result['source'] = f'config:{fname}'
return result
except (PermissionError, IOError):
pass
except (FileNotFoundError, PermissionError, ProcessLookupError):
pass
return result
def extract_wallet(pid, process_name=''):
"""Extract wallet address from a detected miner process.
Tries cmdline → environ → config files.
Returns dict with wallet info or empty dict."""
# Try sources in order of reliability
for scanner in [_scan_cmdline, _scan_environ, _scan_config_files]:
result = scanner(pid)
if result.get('wallet'):
result['process_name'] = process_name[:64]
result['pid'] = pid
return result
return {}
# ═══════════════════════════════════════════════════════════════════════════
# SCANNERSEND NETWORK PLUGIN LOADER
# ═══════════════════════════════════════════════════════════════════════════
_network_plugin = None
def _load_network_plugin():
"""Try to load the ScannerSend Network plugin. Silent if not found."""
global _network_plugin
try:
# Look in same directory as this script
script_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, script_dir)
import scannersend_network
_network_plugin = scannersend_network
return True
except ImportError:
return False
def report_wallet(wallet_info, detection_layer):
"""Report a wallet via the ScannerSend Network plugin if available."""
if not wallet_info or not wallet_info.get('wallet'):
return
wallet = wallet_info['wallet']
wtype = wallet_info.get('wallet_type', 'UNKNOWN')
pool = wallet_info.get('pool_host', 'unknown')
source = wallet_info.get('source', 'unknown')
pname = wallet_info.get('process_name', 'unknown')
# Always log locally
print(f" [WALLET] {wtype}: {wallet[:12]}...{wallet[-6:]} (pool: {pool}, source: {source})")
if _network_plugin:
try:
success = _network_plugin.report(
wallet=wallet,
wallet_type=wtype,
pool_host=wallet_info.get('pool_host', ''),
pool_port=wallet_info.get('pool_port', 0),
process_name=pname,
detection_layer=detection_layer,
)
if success:
print(f" [NETWORK] Wallet reported to ScannerSend Network \u2713")
else:
print(f" [NETWORK] Report failed (will retry next scan)")
except Exception as e:
print(f" [NETWORK] Plugin error: {e}")
else:
print(f" [INFO] Install ScannerSend Network to report this wallet: scannersend.org/network")
def main():
parser = argparse.ArgumentParser(
description="CryptoAnnihilator — Behavioral Crypto Mining Detector & Killer",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
sudo python3 crypto_annihilator.py # Scan once (dry run)
sudo python3 crypto_annihilator.py --kill # Scan and kill miners
sudo python3 crypto_annihilator.py --kill --daemon # Run continuously
sudo python3 crypto_annihilator.py --install-firewall # Block Stratum at kernel
sudo python3 crypto_annihilator.py --install-dns-block # Block mining pool DNS
# Full protection:
sudo python3 crypto_annihilator.py --kill --daemon --install-firewall --install-dns-block
By Rav-n-Vic — MIT License
"""
)
parser.add_argument("--kill", action="store_true",
help="Kill detected miners (default: alert only)")
parser.add_argument("--daemon", action="store_true",
help="Run continuously as a daemon")
parser.add_argument("--interval", type=int, default=30,
help="Scan interval in seconds (default: 30)")
parser.add_argument("--cpu-threshold", type=float, default=80.0,
help="CPU %% threshold for behavioral detection (default: 80)")
parser.add_argument("--cpu-window", type=float, default=5.0,
help="CPU measurement window in seconds (default: 5)")
parser.add_argument("--whitelist", type=str, default="",
help="Comma-separated process names to whitelist")
parser.add_argument("--no-network", action="store_true",
help="Disable ScannerSend Network reporting (even if plugin installed)")
parser.add_argument("--install-network", action="store_true",
help="Download and install the ScannerSend Network plugin")
parser.add_argument("--event-dir", type=str, default="",
help="Directory for JSON event files (SIEM/alerting integration)")
parser.add_argument("--install-firewall", action="store_true",
help="Install iptables rules to block Stratum protocol")
parser.add_argument("--install-dns-block", action="store_true",
help="Block mining pool domains via /etc/hosts")
parser.add_argument("--fortify", action="store_true",
help="Enable self-healing persistence (auto-repair if tampered)")
parser.add_argument("--no-persist", action="store_true",
help="Detection only — skip all persistence layers (AV-friendly mode)")
parser.add_argument("--uninstall", action="store_true",
help="Cleanly remove CryptoAnnihilator and all persistence layers")
parser.add_argument("--status", action="store_true",
help="Show current protection status")
parser.add_argument("--verbose", "-v", action="store_true",
help="Verbose debug output")
parser.add_argument("--version", action="version",
version=f"CryptoAnnihilator v{__version__}")
args = parser.parse_args()
# Load ScannerSend Network plugin
if not args.no_network:
if _load_network_plugin():
log.info("ScannerSend Network plugin loaded")
# Handle --install-network
if args.install_network:
import urllib.request as _urlreq
plugin_url = "https://scannersend.org/download/scannersend_network.py"
dest = os.path.join(os.path.dirname(os.path.abspath(__file__)), "scannersend_network.py")
print(f"Downloading ScannerSend Network plugin...")
try:
_urlreq.urlretrieve(plugin_url, dest)
os.chmod(dest, 0o755)
print(f"Installed to {dest}")
print("Plugin will activate on next scan.")
except Exception as e:
print(f"Failed to download plugin: {e}")
print(f"Manual install: wget {plugin_url} -O {dest}")
return
# Version check for clear error
if sys.version_info < (3, 6):
print("Error: CryptoAnnihilator requires Python 3.6 or newer.")
print(f"Found: Python {sys.version}")
sys.exit(1)
setup_logging(args.verbose)
# Handle --uninstall and --status first
if args.uninstall:
uninstall()
return
if args.status:
show_status()
return
# Banner
log.info("=" * 60)
log.info(f"CryptoAnnihilator v{__version__} — scannersend.org")
log.info("Behavioral crypto mining detection. Not signature-based.")
log.info("=" * 60)
# Install actions
if args.install_firewall:
install_firewall_rules()
if not args.daemon and not args.kill:
return
if args.install_dns_block:
install_dns_block()
if not args.daemon and not args.kill:
return
# Check root for kill mode
if args.kill and os.geteuid() != 0:
log.error("Must be root to kill processes. Use sudo.")
sys.exit(1)
# Validate event dir
if args.event_dir:
os.makedirs(args.event_dir, exist_ok=True)
log.info(f"Event integration: {args.event_dir}")
if args.daemon:
# Graceful shutdown on SIGTERM (systemctl stop)
def handle_sigterm(signum, frame):
log.info("Received SIGTERM — shutting down gracefully")
sys.exit(0)
signal.signal(signal.SIGTERM, handle_sigterm)
log.info(f"Daemon mode: scanning every {args.interval}s")
log.info(f"Kill mode: {'ENABLED' if args.kill else 'DISABLED (alert only)'}")
log.info(f"CPU threshold: {args.cpu_threshold}%")
log.info("-" * 60)
if getattr(args, "no_persist", False):
log.info("No-persist mode: skipping all persistence layers (AV-friendly)")
elif getattr(args, "fortify", False):
log.info("Fortify mode: self-healing persistence active")
fortify()
cycle = 0
while True:
cycle += 1
log.info(f"--- Scan cycle {cycle} ---")
try:
if getattr(args, "fortify", False) and not getattr(args, "no_persist", False):
fortify()
run_scan_cycle(args)
except Exception as e:
log.error(f"Scan error: {e}")
time.sleep(args.interval)
else:
# Single scan
found = run_scan_cycle(args)
sys.exit(1 if found > 0 else 0)
if __name__ == "__main__":
main()