Cómo construir un Honeypot en Python: una guía práctica para el engaño de seguridad
En ciberseguridad, un honeypot es un sistema señuelo diseñado para atraer y luego detectar atacantes potenciales que intentan comprometer el sistema. Al igual que un tarro de miel al aire libre atraería moscas.
Piense en estos honeypots como cámaras de seguridad para su sistema. Así como una cámara de seguridad nos ayuda a comprender quién está tratando de entrar en un edificio y cómo lo están haciendo, estos honeypots lo ayudarán a comprender quién está tratando de atacar su sistema y qué técnicas están usando.
Al final de este tutorial, podrá escribir un honeypot de demostración en Python y comprender cómo funcionan los honeypots.
Tabla de contenido
Comprender los tipos de Honeypots
-
Cómo configurar su entorno de desarrollo
Cómo construir el honeypot central
Implementar los oyentes de la red
Ejecute el Honeypot
Escribe el simulador de ataque Honeypot
Cómo analizar los datos de honeypot
Consideraciones de seguridad
Conclusión
Comprender los tipos de Honeypots
Antes de empezar a diseñar nuestro propio honeypot, comprendamos rápidamente sus diferentes tipos:
Honeypots de producción: este tipo de honeypots se colocan en un entorno de producción real y se utilizan para detectar ataques de seguridad reales. Por lo general, son de diseño simple, fáciles de mantener e implementar, y ofrecen una interacción limitada para reducir el riesgo.
-
Honeypots de investigación: son sistemas más complejos configurados por investigadores de seguridad para estudiar patrones de ataque, realizar análisis empíricos de estos patrones, recopilar muestras de malware y comprender nuevas técnicas de ataque que no se hayan descubierto previamente. A menudo emulan redes o sistemas operativos completos en lugar de comportarse como una aplicación en el entorno de producción.
Para este tutorial, construiremos un honeypot de interacción media que registre los intentos de conexión y el comportamiento básico del atacante.
Cómo configurar su entorno de desarrollo
Comencemos configurando su entorno de desarrollo en Python. Ejecute los siguientes comandos:
import socket
import sys
import datetime
import json
import threading
from pathlib import Path
# Configure logging directory
LOG_DIR = Path("honeypot_logs")
LOG_DIR.mkdir(exist_ok=True)
Nos apegaremos a las bibliotecas incorporadas, por lo que no necesitaremos instalar ninguna dependencia externa. Almacenaremos nuestros registros en el directorio honeypot_logs
.
Cómo construir el Honeypot central
Nuestro honeypot básico estará compuesto por tres componentes:
Un oyente de red que acepta conexiones
Un sistema de registro para registrar actividades.
Un servicio de emulación básico para interactuar con los atacantes
Ahora comencemos por inicializar la clase Core Honeypot:
class Honeypot:
def __init__(self, bind_ip="0.0.0.0", ports=None):
self.bind_ip = bind_ip
self.ports = ports or [21, 22, 80, 443] # Default ports to monitor
self.active_connections = {}
self.log_file = LOG_DIR / f"honeypot_{datetime.datetime.now().strftime('%Y%m%d')}.json"
def log_activity(self, port, remote_ip, data):
"""Log suspicious activity with timestamp and details"""
activity = {
"timestamp": datetime.datetime.now().isoformat(),
"remote_ip": remote_ip,
"port": port,
"data": data.decode('utf-8', errors='ignore')
}
with open(self.log_file, 'a') as f:
json.dump(activity, f)
f.write('\n')
def handle_connection(self, client_socket, remote_ip, port):
"""Handle individual connections and emulate services"""
service_banners = {
21: "220 FTP server ready\r\n",
22: "SSH-2.0-OpenSSH_8.2p1 Ubuntu-4ubuntu0.1\r\n",
80: "HTTP/1.1 200 OK\r\nServer: Apache/2.4.41 (Ubuntu)\r\n\r\n",
443: "HTTP/1.1 200 OK\r\nServer: Apache/2.4.41 (Ubuntu)\r\n\r\n"
}
try:
# Send appropriate banner for the service
if port in service_banners:
client_socket.send(service_banners[port].encode())
# Receive data from attacker
while True:
data = client_socket.recv(1024)
if not data:
break
self.log_activity(port, remote_ip, data)
# Send fake response
client_socket.send(b"Command not recognized.\r\n")
except Exception as e:
print(f"Error handling connection: {e}")
finally:
client_socket.close()
Esta clase contiene mucha información importante, así que repasemos cada función una por una.
La función __init__
registra la IP y los números de puerto en los que alojaremos el honeypot, así como la ruta/nombre del archivo de registro. También mantendremos un registro del número total de conexiones activas que tenemos al honeypot.
La función log_activity
recibirá la información sobre la IP, los datos y el puerto al que la IP intentó una conexión. Luego agregaremos esta información a nuestro archivo de registro de JSON formated.
La función handle_connection
imitará estos servicios que se ejecutarán en los diferentes puertos que tenemos. Tendremos el honeypot ejecutándose en los puertos 21, 22, 80 y 443. Estos servicios son para FTP, SSH, HTTP y el protocolo HTTPS, respectivamente. Por lo tanto, cualquier atacante que intente interactuar con el honeypot debería esperar estos servicios en estos puertos.
Para imitar el comportamiento de estos servicios, utilizaremos los pancartas de servicio que usan en realidad. Esta función primero enviará el banner apropiado cuando el atacante se conecta, y luego recibirá los datos y lo registrará. El Honeypot también enviará una respuesta falsa " comando no reconocido " al atacante.
Implementar los oyentes de la red
Ahora implementemos los oyentes de red que manejarán las conexiones entrantes. Para esto, usaremos programación de sockets simple. Si no sabes cómo funciona la programación de sockets, consulta este artículo que explica algunos conceptos relacionados.
def start_listener(self, port):
"""Start a listener on specified port"""
try:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((self.bind_ip, port))
server.listen(5)
print(f"[*] Listening on {self.bind_ip}:{port}")
while True:
client, addr = server.accept()
print(f"[*] Accepted connection from {addr[0]}:{addr[1]}")
# Handle connection in separate thread
client_handler = threading.Thread(
target=self.handle_connection,
args=(client, addr[0], port)
)
client_handler.start()
except Exception as e:
print(f"Error starting listener on port {port}: {e}")
La función start_listener
iniciará el servidor y escuchará en el puerto proporcionado. El bind_ip
para nosotros será 0.0.0.0
que indica que el servidor escuchará en todas las interfaces de red.
Ahora, manejaremos cada nueva conexión en un hilo separado, ya que podría haber casos en los que varios atacantes intenten interactuar con el honeypot o un script o herramienta de ataque esté escaneando el honeypot. Si no sabe cómo funcionan los subprocesos, puede consultar este artículo que explica los subprocesos y la concurrencia en Python.
Además, asegúrese de colocar esta función en la clase principal Honeypot
.
Ejecute el Honeypot
Ahora creemos la función main
que iniciará nuestro honeypot.
def main():
honeypot = Honeypot()
# Start listeners for each port in separate threads
for port in honeypot.ports:
listener_thread = threading.Thread(
target=honeypot.start_listener,
args=(port,)
)
listener_thread.daemon = True
listener_thread.start()
try:
# Keep main thread alive
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n[*] Shutting down honeypot...")
sys.exit(0)
if __name__ == "__main__":
main()
Esta función instancia la clase honeypot
e inicia a los oyentes para cada uno de nuestros puertos definidos (21,22,80,443) como un hilo separado. Ahora, mantendremos nuestro hilo principal que ejecuta nuestro programa real vivo al ponerlo en un bucle infinito. Ponga todo esto en un guión y ejecutarlo.
Escribe el simulador de ataque Honeypot
Ahora intentemos simular algunos escenarios de ataque y apuntemos a nuestro honeypot para que podamos recopilar algunos datos en nuestro archivo de registro JSON.
Este simulador nos ayudará a demostrar algunos aspectos importantes sobre los honeypots:
Patrones de ataque realistas: el simulador simulará patrones de ataque comunes como escaneo de puertos, intentos de fuerza bruta y exploits específicos del servicio.
Intensidad variable: El simulador ajustará la intensidad de la simulación para probar cómo su honeypot maneja diferentes cargas.
Varios tipos de ataque: demostrará diferentes tipos de ataques que los atacantes reales podrían intentar, ayudándole a comprender cómo su honeypot responde a cada uno.
Conexiones simultáneas: el simulador utilizará subprocesos para probar cómo su honeypot maneja múltiples conexiones simultáneas.
# honeypot_simulator.py
import socket
import time
import random
import threading
from concurrent.futures import ThreadPoolExecutor
import argparse
class HoneypotSimulator:
"""
A class to simulate different types of connections and attacks against our honeypot.
This helps in testing the honeypot's logging and response capabilities.
"""
def __init__(self, target_ip="127.0.0.1", intensity="medium"):
# Configuration for the simulator
self.target_ip = target_ip
self.intensity = intensity
# Common ports that attackers often probe
self.target_ports = [21, 22, 23, 25, 80, 443, 3306, 5432]
# Dictionary of common commands used by attackers for different services
self.attack_patterns = {
21: [ # FTP commands
"USER admin\r\n",
"PASS admin123\r\n",
"LIST\r\n",
"STOR malware.exe\r\n"
],
22: [ # SSH attempts
"SSH-2.0-OpenSSH_7.9\r\n",
"admin:password123\n",
"root:toor\n"
],
80: [ # HTTP requests
"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n",
"POST /admin HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
"GET /wp-admin HTTP/1.1\r\nHost: localhost\r\n\r\n"
]
}
# Intensity settings affect the frequency and volume of simulated attacks
self.intensity_settings = {
"low": {"max_threads": 2, "delay_range": (1, 3)},
"medium": {"max_threads": 5, "delay_range": (0.5, 1.5)},
"high": {"max_threads": 10, "delay_range": (0.1, 0.5)}
}
def simulate_connection(self, port):
"""
Simulates a connection attempt to a specific port with realistic attack patterns
"""
try:
# Create a new socket connection
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3)
print(f"[*] Attempting connection to {self.target_ip}:{port}")
sock.connect((self.target_ip, port))
# Get banner if any
banner = sock.recv(1024)
print(f"[+] Received banner from port {port}: {banner.decode('utf-8', 'ignore').strip()}")
# Send attack patterns based on the port
if port in self.attack_patterns:
for command in self.attack_patterns[port]:
print(f"[*] Sending command to port {port}: {command.strip()}")
sock.send(command.encode())
# Wait for response
try:
response = sock.recv(1024)
print(f"[+] Received response: {response.decode('utf-8', 'ignore').strip()}")
except socket.timeout:
print(f"[-] No response received from port {port}")
# Add realistic delay between commands
time.sleep(random.uniform(*self.intensity_settings[self.intensity]["delay_range"]))
sock.close()
except ConnectionRefusedError:
print(f"[-] Connection refused on port {port}")
except socket.timeout:
print(f"[-] Connection timeout on port {port}")
except Exception as e:
print(f"[-] Error connecting to port {port}: {e}")
def simulate_port_scan(self):
"""
Simulates a basic port scan across common ports
"""
print(f"\n[*] Starting port scan simulation against {self.target_ip}")
for port in self.target_ports:
self.simulate_connection(port)
time.sleep(random.uniform(0.1, 0.3))
def simulate_brute_force(self, port):
"""
Simulates a brute force attack against a specific service
"""
common_usernames = ["admin", "root", "user", "test"]
common_passwords = ["password123", "admin123", "123456", "root"]
print(f"\n[*] Starting brute force simulation against port {port}")
for username in common_usernames:
for password in common_passwords:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
sock.connect((self.target_ip, port))
if port == 21: # FTP
sock.send(f"USER {username}\r\n".encode())
sock.recv(1024)
sock.send(f"PASS {password}\r\n".encode())
elif port == 22: # SSH
sock.send(f"{username}:{password}\n".encode())
sock.close()
time.sleep(random.uniform(0.1, 0.3))
except Exception as e:
print(f"[-] Error in brute force attempt: {e}")
def run_continuous_simulation(self, duration=300):
"""
Runs a continuous simulation for a specified duration
"""
print(f"\n[*] Starting continuous simulation for {duration} seconds")
print(f"[*] Intensity level: {self.intensity}")
end_time = time.time() + duration
with ThreadPoolExecutor(
max_workers=self.intensity_settings[self.intensity]["max_threads"]
) as executor:
while time.time() < end_time:
# Mix of different attack patterns
simulation_choices = [
lambda: self.simulate_port_scan(),
lambda: self.simulate_brute_force(21),
lambda: self.simulate_brute_force(22),
lambda: self.simulate_connection(80)
]
# Randomly choose and execute an attack pattern
executor.submit(random.choice(simulation_choices))
time.sleep(random.uniform(*self.intensity_settings[self.intensity]["delay_range"]))
def main():
"""
Main function to run the honeypot simulator with command-line arguments
"""
parser = argparse.ArgumentParser(description="Honeypot Attack Simulator")
parser.add_argument("--target", default="127.0.0.1", help="Target IP address")
parser.add_argument(
"--intensity",
choices=["low", "medium", "high"],
default="medium",
help="Simulation intensity level"
)
parser.add_argument(
"--duration",
type=int,
default=300,
help="Simulation duration in seconds"
)
args = parser.parse_args()
simulator = HoneypotSimulator(args.target, args.intensity)
try:
simulator.run_continuous_simulation(args.duration)
except KeyboardInterrupt:
print("\n[*] Simulation interrupted by user")
except Exception as e:
print(f"[-] Simulation error: {e}")
finally:
print("\n[*] Simulation complete")
if __name__ == "__main__":
main()
Tenemos muchas cosas que hacer en este guión de simulación, así que analicémoslas una por una. También agregué comentarios para cada función y operación para que esto sea un poco más legible en el código.
Primero tenemos nuestra clase de utilidad llamada honeypotsimulator
. En esta clase, tenemos la función __ init __
que establece la configuración básica para nuestro simulador. Se necesitan dos parámetros: una dirección IP de destino (predeterminada a localhost) y un nivel de intensidad (por defecto a "medio").
También definimos tres componentes importantes: los puertos de destino para sondear (servicios comunes como FTP, SSH, HTTP), patrones de ataque específicos para cada servicio (como intentos de inicio de sesión y comandos) y configuraciones de intensidad que controlan cuán agresiva será nuestra simulación a través de hilo. conteos y retrasos en el tiempo.
La función simulate_connection
maneja intentos de conexión individuales a un puerto específico. Crea una conexión de socket, intenta obtener cualquier banner de servicio (como información de la versión SSH) y luego envía los comandos de ataque apropiados según el tipo de servicio. Agregamos manejo de errores para problemas comunes de red y también agregamos retrasos realistas entre comandos para imitar la interacción humana.
Nuestra función simulate_port_scan
actúa como una herramienta de reconocimiento, que cambiará sistemáticamente cada puerto en nuestra lista de destino. Es similar a cómo las herramientas como nmap
funcionan, pasando por los puertos uno por uno para ver qué servicios están disponibles. Para cada puerto, llama a la función simulate_connection
y agrega pequeños retrasos aleatorios para que el patrón de escaneo se vea más natural.
La función simulate_brute_force
mantiene listas de nombres de usuario y contraseñas comunes, intentando diferentes combinaciones contra servicios como FTP y SSH. Para cada intento, crea una nueva conexión, envía las credenciales de inicio de sesión en el formato correcto para ese servicio y luego cierra la conexión. Esto nos ayuda a probar qué tan bien el honeypot detecta y registra ataques de relleno de credenciales.
La función run_continuous_simulation
se ejecuta para una duración especificada, eligiendo aleatoriamente entre diferentes tipos de ataque como escaneo de puertos, fuerza bruta o ataques de servicio específicos. Utiliza el
Finalmente, tenemos la función main
que proporciona la interfaz de línea de comandos para el simulador. Utiliza argparse
para manejar argumentos de línea de comandos, lo que permite a los usuarios especificar la IP de destino, el nivel de intensidad y la duración de la simulación. Crea una instancia de la clase HoneypotSimulator
y gestiona la ejecución general, incluido el manejo adecuado de las interrupciones y errores del usuario.
Después de poner el código del simulador en un script separado, ejecutarlo con el siguiente comando:
# Run with default settings (medium intensity, localhost, 5 minutes)
python honeypot_simulator.py
# Run with custom settings
python honeypot_simulator.py --target 192.168.1.100 --intensity high --duration 600
Dado que estamos ejecutando el honeypot y el simulador en la misma máquina localmente, el objetivo será localhost
. Pero puede ser otra cosa en un escenario real o si está ejecutando el honeypot en una VM o en una máquina diferente, así que asegúrese de confirmar la IP antes de ejecutar el simulador.
Cómo analizar los datos de honeypot
Vamos a escribir rápidamente una función auxiliar que nos permitirá analizar todos los datos recopilados por el honeypot. Como hemos almacenado esto en un archivo de registro JSON, podemos analizarlo convenientemente utilizando el paquete JSON incorporado.
import datetime
import json
def analyze_logs(log_file):
"""Enhanced honeypot log analysis with temporal and behavioral patterns"""
ip_analysis = {}
port_analysis = {}
hourly_attacks = {}
data_patterns = {}
# Track session patterns
ip_sessions = {}
attack_timeline = []
with open(log_file, 'r') as f:
for line in f:
try:
activity = json.loads(line)
timestamp = datetime.datetime.fromisoformat(activity['timestamp'])
ip = activity['remote_ip']
port = activity['port']
data = activity['data']
# Initialize IP tracking if new
if ip not in ip_analysis:
ip_analysis[ip] = {
'total_attempts': 0,
'first_seen': timestamp,
'last_seen': timestamp,
'targeted_ports': set(),
'unique_payloads': set(),
'session_count': 0
}
# Update IP statistics
ip_analysis[ip]['total_attempts'] += 1
ip_analysis[ip]['last_seen'] = timestamp
ip_analysis[ip]['targeted_ports'].add(port)
ip_analysis[ip]['unique_payloads'].add(data.strip())
# Track hourly patterns
hour = timestamp.hour
hourly_attacks[hour] = hourly_attacks.get(hour, 0) + 1
# Analyze port targeting patterns
if port not in port_analysis:
port_analysis[port] = {
'total_attempts': 0,
'unique_ips': set(),
'unique_payloads': set()
}
port_analysis[port]['total_attempts'] += 1
port_analysis[port]['unique_ips'].add(ip)
port_analysis[port]['unique_payloads'].add(data.strip())
# Track payload patterns
if data.strip():
data_patterns[data.strip()] = data_patterns.get(data.strip(), 0) + 1
# Track attack timeline
attack_timeline.append({
'timestamp': timestamp,
'ip': ip,
'port': port
})
except (json.JSONDecodeError, KeyError) as e:
continue
# Analysis Report Generation
print("\n=== Honeypot Analysis Report ===")
# 1. IP-based Analysis
print("\nTop 10 Most Active IPs:")
sorted_ips = sorted(ip_analysis.items(),
key=lambda x: x[1]['total_attempts'],
reverse=True)[:10]
for ip, stats in sorted_ips:
duration = stats['last_seen'] - stats['first_seen']
print(f"\nIP: {ip}")
print(f"Total Attempts: {stats['total_attempts']}")
print(f"Active Duration: {duration}")
print(f"Unique Ports Targeted: {len(stats['targeted_ports'])}")
print(f"Unique Payloads: {len(stats['unique_payloads'])}")
# 2. Port Analysis
print("\nPort Targeting Analysis:")
sorted_ports = sorted(port_analysis.items(),
key=lambda x: x[1]['total_attempts'],
reverse=True)
for port, stats in sorted_ports:
print(f"\nPort {port}:")
print(f"Total Attempts: {stats['total_attempts']}")
print(f"Unique Attackers: {len(stats['unique_ips'])}")
print(f"Unique Payloads: {len(stats['unique_payloads'])}")
# 3. Temporal Analysis
print("\nHourly Attack Distribution:")
for hour in sorted(hourly_attacks.keys()):
print(f"Hour {hour:02d}: {hourly_attacks[hour]} attempts")
# 4. Attack Sophistication Analysis
print("\nAttacker Sophistication Analysis:")
for ip, stats in sorted_ips:
sophistication_score = (
len(stats['targeted_ports']) * 0.4 + # Port diversity
len(stats['unique_payloads']) * 0.6 # Payload diversity
)
print(f"IP {ip}: Sophistication Score {sophistication_score:.2f}")
# 5. Common Payload Patterns
print("\nTop 10 Most Common Payloads:")
sorted_payloads = sorted(data_patterns.items(),
key=lambda x: x[1],
reverse=True)[:10]
for payload, count in sorted_payloads:
if len(payload) > 50: # Truncate long payloads
payload = payload[:50] + "..."
print(f"Count {count}: {payload}")
Puede colocar esto en un archivo de script separado y llamar a la función en los registros JSON. Esta función nos proporcionará información completa del archivo JSON basada en los datos recopilados.
Nuestro análisis comienza agrupando los datos en varias categorías, como estadísticas basadas en IP, patrones de orientación de puertos, distribuciones de ataques por hora y características de carga útil. Para cada IP, realizamos un seguimiento del total de intentos, la primera y la última vez que se vio, los puertos de destino y las cargas útiles únicas. Esto nos ayudará a crear perfiles únicos para los atacantes.
También examinamos aquí los patrones de ataque basados en puertos que monitorean los puertos atacados con mayor frecuencia y cuántos atacantes únicos. También realizamos un análisis de sofisticación de los ataques que nos ayuda a identificar a los atacantes específicos, considerando factores como los puertos objetivo y las cargas útiles únicas utilizadas. Este análisis se utiliza para separar actividades de escaneo simples y ataques sofisticados.
El análisis temporal nos ayuda a identificar patrones en los intentos de ataque cada hora, lo que revela patrones en el momento del ataque y posibles campañas de orientación automatizadas. Finalmente, publicamos cargas útiles que se ven comúnmente para identificar cadenas o comandos de ataque que se ven comúnmente.
Consideraciones de seguridad
Al implementar este honeypot, asegúrese de considerar las siguientes medidas de seguridad:
Ejecute su honeypot en un entorno aislado. Normalmente, dentro de una máquina virtual o en su máquina local que está detrás de una NAT y un firewall.
Ejecute el honeypot con privilegios mínimos del sistema (generalmente no como raíz) para reducir el riesgo si se compromete.
Tenga cuidado con los datos recopilados si planea implementarlos como un honeypot de grado de producción o investigar el honeypot, ya que puede contener información de malware o información confidencial.
Implemente mecanismos de monitoreo sólidos para detectar intentos de salir del entorno de honeypot.
Conclusión
Con esto hemos construido nuestro honeypot, escrito un simulador para simular ataques para nuestro honeypot y analizamos los datos de nuestros registros de honeypot para hacer algunas inferencias simples. Es una excelente manera de comprender los conceptos de seguridad ofensivos y defensivos. Puede considerar construir esto para crear sistemas de detección más complejos y pensar en agregar características como:
Emulación de servicios dinámicos basada en el comportamiento de ataque
Integración con sistemas de inteligencia de amenazas que realizarán un mejor análisis de inferencia de estos registros de honeypot recopilados
Recopile registros incluso integrales más allá de los datos de IP, puerto y red a través de mecanismos avanzados de registro
Agregar capacidades de aprendizaje automático para detectar patrones de ataque
Recuerde que, aunque los honeypots son poderosas herramientas de seguridad, deben ser parte de una estrategia de seguridad defensiva integral, no la única línea de defensa.
¡Espero que hayas aprendido cómo funcionan los honeypots, cuál es su propósito, así como un poco de programación de Python también!