Como construir um Honeypot em Python: um guia prático para enganar a segurança
Na segurança cibernética, um honeypot é um sistema de chamariz projetado para atrair e, em seguida, detectar possíveis atacantes que tentam comprometer o sistema. Assim como uma panela de mel sentada ao ar livre atrairia moscas.
Pense nesses honeypots como câmeras de segurança para o seu sistema. Assim como uma câmera de segurança nos ajuda a entender quem está tentando entrar em um prédio e como eles estão fazendo isso, esses honeypots o ajudarão a entender quem está tentando atacar seu sistema e quais técnicas estão usando.
Ao final deste tutorial, você será capaz de escrever um honeypot de demonstração em Python e entender como funcionam os honeypots.
Índice
Compreendendo os tipos de honeypots
-
Como configurar seu ambiente de desenvolvimento
Como construir o Honeypot principal
Implementar os ouvintes de rede
Execute o Honeypot
Escreva o simulador de ataque do honeypot
Como analisar dados do Honeypot
Considerações de segurança
Conclusão
Compreendendo os tipos de honeypots
Antes de começarmos a projetar nosso próprio honeypot, vamos entender rapidamente seus diferentes tipos:
Honeypots de produção: esses tipos de honeypots são colocados em um ambiente de produção real e usados para detectar ataques de segurança reais. Eles normalmente têm design simples, são fáceis de manter e implantar e oferecem interação limitada para reduzir riscos.
-
Pesquisa HoneyPots: esses são sistemas mais complexos criados por pesquisadores de segurança para estudar padrões de ataque, realizar análises empíricas sobre esses padrões, coletar amostras de malware e entender novas técnicas de ataque que não são descobertas anteriormente. Eles geralmente imitam sistemas ou redes operacionais inteiros, em vez de se comportar como um aplicativo no ambiente de produção.
Para este tutorial, criaremos um honeypot de interação média que registra tentativas de conexão e comportamento básico do invasor.
Como configurar seu ambiente de desenvolvimento
Vamos começar configurando seu ambiente de desenvolvimento em Python. Execute os seguintes comandos:
import socket
import sys
import datetime
import json
import threading
from pathlib import Path
# Configure logging directory
LOG_DIR = Path("honeypot_logs")
LOG_DIR.mkdir(exist_ok=True)
Estaremos nos atendo às bibliotecas integradas, portanto não precisaremos instalar nenhuma dependência externa. Armazenaremos nossos logs no diretório honeypot_logs
.
Como construir o honeypot principal
Nosso honeypot básico será composto por três componentes:
Um ouvinte de rede que aceita conexões
Um sistema de registro para registrar atividades
Um serviço básico de emulação para interagir com os atacantes
Agora vamos começar inicializando a classe principal do Honeypot:
class Honeypot:
def __init__(self, bind_ip="0.0.0.0", ports=None):
self.bind_ip = bind_ip
self.ports = ports or [21, 22, 80, 443] # Default ports to monitor
self.active_connections = {}
self.log_file = LOG_DIR / f"honeypot_{datetime.datetime.now().strftime('%Y%m%d')}.json"
def log_activity(self, port, remote_ip, data):
"""Log suspicious activity with timestamp and details"""
activity = {
"timestamp": datetime.datetime.now().isoformat(),
"remote_ip": remote_ip,
"port": port,
"data": data.decode('utf-8', errors='ignore')
}
with open(self.log_file, 'a') as f:
json.dump(activity, f)
f.write('\n')
def handle_connection(self, client_socket, remote_ip, port):
"""Handle individual connections and emulate services"""
service_banners = {
21: "220 FTP server ready\r\n",
22: "SSH-2.0-OpenSSH_8.2p1 Ubuntu-4ubuntu0.1\r\n",
80: "HTTP/1.1 200 OK\r\nServer: Apache/2.4.41 (Ubuntu)\r\n\r\n",
443: "HTTP/1.1 200 OK\r\nServer: Apache/2.4.41 (Ubuntu)\r\n\r\n"
}
try:
# Send appropriate banner for the service
if port in service_banners:
client_socket.send(service_banners[port].encode())
# Receive data from attacker
while True:
data = client_socket.recv(1024)
if not data:
break
self.log_activity(port, remote_ip, data)
# Send fake response
client_socket.send(b"Command not recognized.\r\n")
except Exception as e:
print(f"Error handling connection: {e}")
finally:
client_socket.close()
Esta classe contém muitas informações importantes, então vamos examinar cada função uma por uma.
A função __ init __
registra os números de IP e porta nos quais hospedaremos o Honeypot, bem como o nome/nome do arquivo do arquivo de log. Também manteremos um registro do número total de conexões ativas que temos no Honeypot.
A função log_activity
receberá as informações sobre o IP, os dados e a porta à qual o IP tentou uma conexão. Em seguida, anexaremos essas informações ao nosso arquivo de log formatado JSON.
A função handle_connection
irá imitar esses serviços que estarão rodando nas diferentes portas que temos. Teremos o honeypot rodando nas portas 21, 22, 80 e 443. Esses serviços são para FTP, SSH, HTTP e protocolo HTTPS, respectivamente. Portanto, qualquer invasor que tente interagir com o honeypot deve esperar esses serviços nessas portas.
Para imitar o comportamento desses serviços, usaremos os banners de serviço que eles usam na realidade. Esta função enviará primeiro o banner apropriado quando o invasor se conectar e, em seguida, receberá os dados e os registrará. O honeypot também enviará uma resposta falsa “Comando não reconhecido” de volta ao invasor.
Implementar os ouvintes da rede
Agora, vamos implementar os ouvintes de rede que lidarão com as conexões recebidas. Para isso, usaremos a programação simples de soquete. Se você não conhece como funciona a programação do soquete, confira este artigo que explica alguns conceitos relacionados a ele.
def start_listener(self, port):
"""Start a listener on specified port"""
try:
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((self.bind_ip, port))
server.listen(5)
print(f"[*] Listening on {self.bind_ip}:{port}")
while True:
client, addr = server.accept()
print(f"[*] Accepted connection from {addr[0]}:{addr[1]}")
# Handle connection in separate thread
client_handler = threading.Thread(
target=self.handle_connection,
args=(client, addr[0], port)
)
client_handler.start()
except Exception as e:
print(f"Error starting listener on port {port}: {e}")
A função start_listener
iniciará o servidor e ouvirá na porta fornecida. O bind_ip
para nós será 0.0.0.0
, o que indica que o servidor estará ouvindo em todas as interfaces de rede.
Agora, cuidaremos de cada nova conexão em um encadeamento separado, pois pode haver casos em que vários invasores tentam interagir com o honeypot ou um script ou ferramenta de ataque está digitalizando o honeypot. Se você não estiver ciente de como o threading funciona, pode conferir este artigo que explica a rosqueamento e a simultaneidade no Python.
Além disso, certifique -se de colocar essa função na classe Core honeypot
.
Execute o Honeypot
Agora vamos criar a função main
que iniciará nosso honeypot.
def main():
honeypot = Honeypot()
# Start listeners for each port in separate threads
for port in honeypot.ports:
listener_thread = threading.Thread(
target=honeypot.start_listener,
args=(port,)
)
listener_thread.daemon = True
listener_thread.start()
try:
# Keep main thread alive
while True:
time.sleep(1)
except KeyboardInterrupt:
print("\n[*] Shutting down honeypot...")
sys.exit(0)
if __name__ == "__main__":
main()
Esta função instancia a classe Honeypot
e inicia os ouvintes para cada uma de nossas portas definidas (21,22,80,443) como um thread separado. Agora, manteremos vivo nosso thread principal que está executando nosso programa real, colocando-o em um loop infinito. Junte tudo isso em um script e execute-o.
Escreva o Simulador de Ataque Honeypot
Agora vamos tentar simular alguns cenários de ataque e direcionar nosso honeypot para que possamos coletar alguns dados em nosso arquivo de log JSON.
Este simulador nos ajudará a demonstrar alguns aspectos importantes sobre HoneyPots:
Padrões de ataque realistas: o simulador simulará padrões de ataque comuns, como varredura de portas, tentativas de força bruta e explorações específicas de serviços.
Intensidade variável: o simulador ajustará a intensidade da simulação para testar como o honeypot lida com cargas diferentes.
Vários tipos de ataque: ele demonstrará diferentes tipos de ataques que os invasores reais podem tentar, ajudando você a entender como seu honeypot responde a cada um.
Conexões simultâneas: o simulador usará threading para testar como seu honeypot lida com múltiplas conexões simultâneas.
# honeypot_simulator.py
import socket
import time
import random
import threading
from concurrent.futures import ThreadPoolExecutor
import argparse
class HoneypotSimulator:
"""
A class to simulate different types of connections and attacks against our honeypot.
This helps in testing the honeypot's logging and response capabilities.
"""
def __init__(self, target_ip="127.0.0.1", intensity="medium"):
# Configuration for the simulator
self.target_ip = target_ip
self.intensity = intensity
# Common ports that attackers often probe
self.target_ports = [21, 22, 23, 25, 80, 443, 3306, 5432]
# Dictionary of common commands used by attackers for different services
self.attack_patterns = {
21: [ # FTP commands
"USER admin\r\n",
"PASS admin123\r\n",
"LIST\r\n",
"STOR malware.exe\r\n"
],
22: [ # SSH attempts
"SSH-2.0-OpenSSH_7.9\r\n",
"admin:password123\n",
"root:toor\n"
],
80: [ # HTTP requests
"GET / HTTP/1.1\r\nHost: localhost\r\n\r\n",
"POST /admin HTTP/1.1\r\nHost: localhost\r\nContent-Length: 0\r\n\r\n",
"GET /wp-admin HTTP/1.1\r\nHost: localhost\r\n\r\n"
]
}
# Intensity settings affect the frequency and volume of simulated attacks
self.intensity_settings = {
"low": {"max_threads": 2, "delay_range": (1, 3)},
"medium": {"max_threads": 5, "delay_range": (0.5, 1.5)},
"high": {"max_threads": 10, "delay_range": (0.1, 0.5)}
}
def simulate_connection(self, port):
"""
Simulates a connection attempt to a specific port with realistic attack patterns
"""
try:
# Create a new socket connection
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3)
print(f"[*] Attempting connection to {self.target_ip}:{port}")
sock.connect((self.target_ip, port))
# Get banner if any
banner = sock.recv(1024)
print(f"[+] Received banner from port {port}: {banner.decode('utf-8', 'ignore').strip()}")
# Send attack patterns based on the port
if port in self.attack_patterns:
for command in self.attack_patterns[port]:
print(f"[*] Sending command to port {port}: {command.strip()}")
sock.send(command.encode())
# Wait for response
try:
response = sock.recv(1024)
print(f"[+] Received response: {response.decode('utf-8', 'ignore').strip()}")
except socket.timeout:
print(f"[-] No response received from port {port}")
# Add realistic delay between commands
time.sleep(random.uniform(*self.intensity_settings[self.intensity]["delay_range"]))
sock.close()
except ConnectionRefusedError:
print(f"[-] Connection refused on port {port}")
except socket.timeout:
print(f"[-] Connection timeout on port {port}")
except Exception as e:
print(f"[-] Error connecting to port {port}: {e}")
def simulate_port_scan(self):
"""
Simulates a basic port scan across common ports
"""
print(f"\n[*] Starting port scan simulation against {self.target_ip}")
for port in self.target_ports:
self.simulate_connection(port)
time.sleep(random.uniform(0.1, 0.3))
def simulate_brute_force(self, port):
"""
Simulates a brute force attack against a specific service
"""
common_usernames = ["admin", "root", "user", "test"]
common_passwords = ["password123", "admin123", "123456", "root"]
print(f"\n[*] Starting brute force simulation against port {port}")
for username in common_usernames:
for password in common_passwords:
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(2)
sock.connect((self.target_ip, port))
if port == 21: # FTP
sock.send(f"USER {username}\r\n".encode())
sock.recv(1024)
sock.send(f"PASS {password}\r\n".encode())
elif port == 22: # SSH
sock.send(f"{username}:{password}\n".encode())
sock.close()
time.sleep(random.uniform(0.1, 0.3))
except Exception as e:
print(f"[-] Error in brute force attempt: {e}")
def run_continuous_simulation(self, duration=300):
"""
Runs a continuous simulation for a specified duration
"""
print(f"\n[*] Starting continuous simulation for {duration} seconds")
print(f"[*] Intensity level: {self.intensity}")
end_time = time.time() + duration
with ThreadPoolExecutor(
max_workers=self.intensity_settings[self.intensity]["max_threads"]
) as executor:
while time.time() < end_time:
# Mix of different attack patterns
simulation_choices = [
lambda: self.simulate_port_scan(),
lambda: self.simulate_brute_force(21),
lambda: self.simulate_brute_force(22),
lambda: self.simulate_connection(80)
]
# Randomly choose and execute an attack pattern
executor.submit(random.choice(simulation_choices))
time.sleep(random.uniform(*self.intensity_settings[self.intensity]["delay_range"]))
def main():
"""
Main function to run the honeypot simulator with command-line arguments
"""
parser = argparse.ArgumentParser(description="Honeypot Attack Simulator")
parser.add_argument("--target", default="127.0.0.1", help="Target IP address")
parser.add_argument(
"--intensity",
choices=["low", "medium", "high"],
default="medium",
help="Simulation intensity level"
)
parser.add_argument(
"--duration",
type=int,
default=300,
help="Simulation duration in seconds"
)
args = parser.parse_args()
simulator = HoneypotSimulator(args.target, args.intensity)
try:
simulator.run_continuous_simulation(args.duration)
except KeyboardInterrupt:
print("\n[*] Simulation interrupted by user")
except Exception as e:
print(f"[-] Simulation error: {e}")
finally:
print("\n[*] Simulation complete")
if __name__ == "__main__":
main()
Temos muita coisa acontecendo neste script de simulação, então vamos dividi -lo um por um. Também adicionei comentários para todas as funções e operação para tornar isso um pouco mais legível no código.
Primeiro temos nossa classe de utilitário chamada HoneypotSimulator
. Nesta classe temos a função __init__
que define a configuração básica do nosso simulador. São necessários dois parâmetros: um endereço IP de destino (padrão para localhost) e um nível de intensidade (padrão para "médio").
Também definimos três componentes importantes: as portas de destino para investigar (serviços comuns como FTP, SSH, HTTP), padrões de ataque específicos para cada serviço (como tentativas e comandos de login) e configurações de intensidade que controlam como nossa simulação será agressiva através do Thread Thread contagens e atrasos no tempo.
A função simulate_connection
lida com tentativas de conexão individuais a uma porta específica. Ele cria uma conexão de soquete, tenta obter qualquer banner de serviço (como informações da versão SSH) e envia comandos de ataque apropriados com base no tipo de serviço. Adicionamos o tratamento de erros para problemas comuns de rede e também adicionamos atrasos realistas entre os comandos para imitar a interação humana.
Nossa função simulate_port_scan
atua como uma ferramenta de reconhecimento, que irá verificar sistematicamente cada porta em nossa lista de alvos. É semelhante ao modo como ferramentas como nmap
funcionam – passando pelas portas uma por uma para ver quais serviços estão disponíveis. Para cada porta, ele chama a função simulate_connection
e adiciona pequenos atrasos aleatórios para fazer o padrão de varredura parecer mais natural.
A função simulate_brute_force
mantém listas de nomes de usuário e senhas comuns, tentando combinações diferentes contra serviços como FTP e SSH. Para cada tentativa, cria uma nova conexão, envia as credenciais de login no formato correto para esse serviço e fecha a conexão. Isso nos ajuda a testar o quão bem o honeypot detecta e registra ataques de enchimento de credenciais.
A função run_continuous_simulation
é executada por um período especificado, escolhendo aleatoriamente entre diferentes tipos de ataque, como varredura de porta, força bruta ou ataques de serviço específicos. Ele usa o ThreadPoolExecutor
do Python para executar vários ataques simultaneamente com base no nível de intensidade especificado.
Por fim, temos a função principal
que fornece a interface da linha de comando para o simulador. Ele usa argparse
para lidar com argumentos da linha de comando, permitindo que os usuários especifiquem o IP de destino, o nível de intensidade e a duração da simulação. Ele cria uma instância da classe honeypotsimulator
e gerencia a execução geral, incluindo o manuseio adequado das interrupções e erros do usuário.
Depois de colocar o código do simulador em um script separado, execute -o com o seguinte comando:
# Run with default settings (medium intensity, localhost, 5 minutes)
python honeypot_simulator.py
# Run with custom settings
python honeypot_simulator.py --target 192.168.1.100 --intensity high --duration 600
Como estamos executando o honeypot e também o simulador localmente na mesma máquina, o destino será localhost
. Mas pode ser outra coisa em um cenário real ou se você estiver executando o honeypot em uma VM ou em uma máquina diferente – portanto, certifique-se de confirmar o IP antes de executar o simulador.
Como analisar dados do Honeypot
Vamos escrever rapidamente uma função auxiliar que nos permitirá analisar todos os dados coletados pelo Honeypot. Como armazenamos isso em um arquivo de log JSON, podemos analisá-lo convenientemente usando o pacote JSON integrado.
import datetime
import json
def analyze_logs(log_file):
"""Enhanced honeypot log analysis with temporal and behavioral patterns"""
ip_analysis = {}
port_analysis = {}
hourly_attacks = {}
data_patterns = {}
# Track session patterns
ip_sessions = {}
attack_timeline = []
with open(log_file, 'r') as f:
for line in f:
try:
activity = json.loads(line)
timestamp = datetime.datetime.fromisoformat(activity['timestamp'])
ip = activity['remote_ip']
port = activity['port']
data = activity['data']
# Initialize IP tracking if new
if ip not in ip_analysis:
ip_analysis[ip] = {
'total_attempts': 0,
'first_seen': timestamp,
'last_seen': timestamp,
'targeted_ports': set(),
'unique_payloads': set(),
'session_count': 0
}
# Update IP statistics
ip_analysis[ip]['total_attempts'] += 1
ip_analysis[ip]['last_seen'] = timestamp
ip_analysis[ip]['targeted_ports'].add(port)
ip_analysis[ip]['unique_payloads'].add(data.strip())
# Track hourly patterns
hour = timestamp.hour
hourly_attacks[hour] = hourly_attacks.get(hour, 0) + 1
# Analyze port targeting patterns
if port not in port_analysis:
port_analysis[port] = {
'total_attempts': 0,
'unique_ips': set(),
'unique_payloads': set()
}
port_analysis[port]['total_attempts'] += 1
port_analysis[port]['unique_ips'].add(ip)
port_analysis[port]['unique_payloads'].add(data.strip())
# Track payload patterns
if data.strip():
data_patterns[data.strip()] = data_patterns.get(data.strip(), 0) + 1
# Track attack timeline
attack_timeline.append({
'timestamp': timestamp,
'ip': ip,
'port': port
})
except (json.JSONDecodeError, KeyError) as e:
continue
# Analysis Report Generation
print("\n=== Honeypot Analysis Report ===")
# 1. IP-based Analysis
print("\nTop 10 Most Active IPs:")
sorted_ips = sorted(ip_analysis.items(),
key=lambda x: x[1]['total_attempts'],
reverse=True)[:10]
for ip, stats in sorted_ips:
duration = stats['last_seen'] - stats['first_seen']
print(f"\nIP: {ip}")
print(f"Total Attempts: {stats['total_attempts']}")
print(f"Active Duration: {duration}")
print(f"Unique Ports Targeted: {len(stats['targeted_ports'])}")
print(f"Unique Payloads: {len(stats['unique_payloads'])}")
# 2. Port Analysis
print("\nPort Targeting Analysis:")
sorted_ports = sorted(port_analysis.items(),
key=lambda x: x[1]['total_attempts'],
reverse=True)
for port, stats in sorted_ports:
print(f"\nPort {port}:")
print(f"Total Attempts: {stats['total_attempts']}")
print(f"Unique Attackers: {len(stats['unique_ips'])}")
print(f"Unique Payloads: {len(stats['unique_payloads'])}")
# 3. Temporal Analysis
print("\nHourly Attack Distribution:")
for hour in sorted(hourly_attacks.keys()):
print(f"Hour {hour:02d}: {hourly_attacks[hour]} attempts")
# 4. Attack Sophistication Analysis
print("\nAttacker Sophistication Analysis:")
for ip, stats in sorted_ips:
sophistication_score = (
len(stats['targeted_ports']) * 0.4 + # Port diversity
len(stats['unique_payloads']) * 0.6 # Payload diversity
)
print(f"IP {ip}: Sophistication Score {sophistication_score:.2f}")
# 5. Common Payload Patterns
print("\nTop 10 Most Common Payloads:")
sorted_payloads = sorted(data_patterns.items(),
key=lambda x: x[1],
reverse=True)[:10]
for payload, count in sorted_payloads:
if len(payload) > 50: # Truncate long payloads
payload = payload[:50] + "..."
print(f"Count {count}: {payload}")
Você pode colocar isso em um arquivo de script separado e chamar a função nos logs JSON. Esta função nos fornecerá insights abrangentes do arquivo JSON com base nos dados coletados.
Nossa análise começa agrupando os dados em várias categorias, como estatísticas baseadas em IP, padrões de segmentação por porta, distribuições de ataques por hora e características de carga útil. Para cada IP, estamos rastreando tentativas totais, primeiro e último tempos vistos, portas direcionadas e cargas úteis exclusivas. Isso nos ajudará a criar perfis exclusivos para os atacantes.
Também examinamos aqui os padrões de ataque baseados em portas que monitoram as portas visadas com mais frequência e por quantos invasores únicos. Também realizamos uma análise de sofisticação de ataque que nos ajuda a identificar invasores direcionados, considerando fatores como portas direcionadas e cargas úteis exclusivas usadas. Esta análise é usada para separar atividades simples de varredura e ataques sofisticados.
A análise temporal nos ajuda a identificar padrões em tentativas de ataque de hora em hora, revelando padrões no tempo de ataque e possíveis campanhas de segmentação automatizadas. Por fim, publicamos cargas comumente vistas para identificar strings ou comandos de ataque comumente vistos.
Considerações de segurança
Ao implantar este honeypot, considere as seguintes medidas de segurança:
Execute seu honeypot em um ambiente isolado. Normalmente dentro de uma VM ou em sua máquina local protegida por NAT e firewall.
Execute o honeypot com privilégios mínimos de sistema (normalmente não como root) para reduzir o risco se estiver comprometido.
Seja cauteloso com os dados coletados, se você planeja implantá-los como um honeypot de grau de produção ou pesquisa, pois pode conter malware ou informações confidenciais.
Implemente mecanismos robustos de monitoramento para detectar tentativas de sair do ambiente honeypot.
Conclusão
Com isso construímos nosso honeypot, escrevemos um simulador para simular ataques ao nosso honeypot e analisamos os dados de nossos logs de honeypot para fazer algumas inferências simples. É uma excelente maneira de compreender conceitos de segurança tanto ofensivos quanto defensivos. Você pode considerar desenvolver isso para criar sistemas de detecção mais complexos e pensar em adicionar recursos como:
Emulação de serviço dinâmico com base no comportamento do ataque
Integração com sistemas de inteligência de ameaças que realizarão uma melhor análise de inferência desses logs de honeypot coletados
Reúna registros ainda mais abrangentes além dos dados de IP, porta e rede por meio de mecanismos avançados de registro
Adicione recursos de aprendizado de máquina para detectar padrões de ataque
Lembre -se de que, embora os honeypots sejam poderosos ferramentas de segurança, eles devem fazer parte de uma estratégia de segurança defensiva abrangente, não a única linha de defesa.
Espero que você tenha aprendido sobre como os honeypots funcionam, qual é o seu propósito e também um pouco de programação do Python!