Pesquisa de site

Como construir um painel de tráfego de rede em tempo real com python e streamlit


Você já quis visualizar o tráfego de sua rede em tempo real? Neste tutorial, você aprenderá a criar um painel de análise de tráfego de rede interativo com o Python e o Streamlit . STRILHTLIT é uma estrutura Python de código aberto que você pode usar para desenvolver aplicativos da Web para análise e processamento de dados.

No final deste tutorial, você saberá como capturar pacotes de rede bruta da NIC (cartão de interface de rede) do seu computador, processar os dados e criar belas visualizações que serão atualizadas em tempo real.

Índice

  • Por que a análise de tráfego de rede é importante?

  • Pré-requisitos

  • Como configurar seu projeto

  • Como construir as funcionalidades principais

  • Como criar as visualizações simplitadas

  • Como capturar os pacotes de rede

  • Juntando tudo

  • Melhorias Futuras

  • Conclusão

Por que a análise de tráfego de rede é importante?

A análise de tráfego de rede é um requisito crítico nas empresas em que as redes formam a espinha dorsal de quase todos os aplicativos e serviços. No centro, temos uma análise de pacotes de rede que envolvem monitorar a rede, capturar todo o tráfego (entrada e saída) e interpretar esses pacotes enquanto eles fluem através de uma rede. Você pode usar essa técnica para identificar padrões de segurança, detectar anomalias e garantir a segurança e a eficiência da rede.

Este projeto de prova de conceito no qual trabalharemos neste tutorial é particularmente útil, pois ajuda a visualizar e analisar a atividade da rede em tempo real. E isso permitirá que você entenda como a solução de problemas, otimizações de desempenho e análise de segurança são feitas em sistemas corporativos.

Pré -requisitos

  • Python 3.8 ou uma versão mais recente instalada em seu sistema.

  • Uma compreensão básica dos conceitos de redes de computadores.

  • Familiaridade com a linguagem de programação Python e suas bibliotecas amplamente usadas.

  • Conhecimento básico de técnicas de visualização de dados e bibliotecas.

Como configurar seu projeto

Para começar, crie a estrutura do projeto e instale as ferramentas necessárias com Pip com os seguintes comandos:

mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly

Estaremos usando Streamlit para as visualizações do painel, Pandas para o processamento de dados, Scapy para captura de pacotes de rede e processamento de pacotes e, finalmente, Plotly para traçar gráficos com nossos dados coletados.

Como construir as funcionalidades principais

Colocaremos todo o código em um único arquivo chamado Dashboard.py . Em primeiro lugar, vamos começar importando todos os elementos que usaremos:

import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket

Agora, vamos configurar o registro configurando uma configuração básica de registro. Isso será usado para rastrear eventos e executar nosso aplicativo no modo de depuração. Atualmente, definimos o nível de log como info , o que significa que os eventos com nível info ou superior serão exibidos. Se você não estiver familiarizado com o login no Python, recomendo conferir esta peça de documentação que fica em profundidade.

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

Em seguida, criaremos nosso processador de pacotes. Implementaremos a funcionalidade de processar nossos pacotes capturados nesta classe.

class PacketProcessor:
    """Process and analyze network packets"""

    def __init__(self):
        self.protocol_map = {
            1: 'ICMP',
            6: 'TCP',
            17: 'UDP'
        }
        self.packet_data = []
        self.start_time = datetime.now()
        self.packet_count = 0
        self.lock = threading.Lock()

    def get_protocol_name(self, protocol_num: int) -> str:
        """Convert protocol number to name"""
        return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')

    def process_packet(self, packet) -> None:
        """Process a single packet and extract relevant information"""
        try:
            if IP in packet:
                with self.lock:
                    packet_info = {
                        'timestamp': datetime.now(),
                        'source': packet[IP].src,
                        'destination': packet[IP].dst,
                        'protocol': self.get_protocol_name(packet[IP].proto),
                        'size': len(packet),
                        'time_relative': (datetime.now() - self.start_time).total_seconds()
                    }

                    # Add TCP-specific information
                    if TCP in packet:
                        packet_info.update({
                            'src_port': packet[TCP].sport,
                            'dst_port': packet[TCP].dport,
                            'tcp_flags': packet[TCP].flags
                        })

                    # Add UDP-specific information
                    elif UDP in packet:
                        packet_info.update({
                            'src_port': packet[UDP].sport,
                            'dst_port': packet[UDP].dport
                        })

                    self.packet_data.append(packet_info)
                    self.packet_count += 1

                    # Keep only last 10000 packets to prevent memory issues
                    if len(self.packet_data) > 10000:
                        self.packet_data.pop(0)

        except Exception as e:
            logger.error(f"Error processing packet: {str(e)}")

    def get_dataframe(self) -> pd.DataFrame:
        """Convert packet data to pandas DataFrame"""
        with self.lock:
            return pd.DataFrame(self.packet_data)

Esta classe construirá nossa funcionalidade principal e possui diversas funções utilitárias que serão usadas para processar os pacotes.

Os pacotes de rede são categorizados em dois no nível de transporte (TCP e UDP) e no protocolo ICMP no nível da rede. Se você não está familiarizado com os conceitos de TCP/IP, recomendo verificar este artigo no freeCodeCamp News.

Nosso construtor manterá o controle de todos os pacotes vistos que estão categorizados nesses buckets do tipo de protocolo TCP/IP que definimos. Também anotaremos o tempo de captura do pacote, os dados capturados e o número de pacotes capturados.

Também aproveitaremos um bloqueio de encadeamento para garantir que apenas um pacote seja processado em um único horário. Isso pode ser estendido ainda mais para permitir que o projeto tenha processamento de pacotes paralelos.

A função Helper Get_Protocol_Name nos ajuda a obter o tipo correto do protocolo com base em seus números de protocolo. Para fornecer alguns antecedentes sobre isso, a Autoridade de Números Atribuídos à Internet (IANA) atribui números padronizados para identificar diferentes protocolos em um pacote de rede. Como e quando vemos esses números no pacote de rede analisado, saberemos que tipo de protocolo está sendo usado no pacote atualmente interceptado. Para o escopo deste projeto, estaremos mapeando apenas para TCP, UDP e ICMP (ping). Se encontrarmos algum outro tipo de pacote, a categorizaremos como Outros () .

A função process_packet lida com nossa funcionalidade principal que processará esses pacotes individuais. Se o pacote contiver uma camada IP, ele anotará os endereços IP de origem e destino, o tipo de protocolo, o tamanho do pacote e o tempo decorrido desde o início da captura do pacote.

Para pacotes com protocolos específicos da camada de transporte (como TCP e UDP), capturaremos as portas de origem e destino junto com sinalizadores TCP para pacotes TCP. Esses detalhes extraídos serão armazenados na memória na lista packet_data. Também acompanharemos o packet_count conforme e quando esses pacotes são processados.

A função get_dataframe nos ajuda a converter a lista packet_data em um data-frame Pandas que será então usado para nossa visualização.

Como criar as visualizações simplitadas

Agora é hora de construir nosso painel interativo de streamlit. Definiremos uma função chamada create_visualization no script Dashboard.py (fora da nossa classe de processamento de pacotes).

def create_visualizations(df: pd.DataFrame):
    """Create all dashboard visualizations"""
    if len(df) > 0:
        # Protocol distribution
        protocol_counts = df['protocol'].value_counts()
        fig_protocol = px.pie(
            values=protocol_counts.values,
            names=protocol_counts.index,
            title="Protocol Distribution"
        )
        st.plotly_chart(fig_protocol, use_container_width=True)

        # Packets timeline
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
        fig_timeline = px.line(
            x=df_grouped.index,
            y=df_grouped.values,
            title="Packets per Second"
        )
        st.plotly_chart(fig_timeline, use_container_width=True)

        # Top source IPs
        top_sources = df['source'].value_counts().head(10)
        fig_sources = px.bar(
            x=top_sources.index,
            y=top_sources.values,
            title="Top Source IP Addresses"
        )
        st.plotly_chart(fig_sources, use_container_width=True)

Esta função tomará o quadro de dados como entrada e nos ajudará a traçar três tabelas/gráficos:

  1. Gráfico de distribuição de protocolo: Este gráfico exibirá a proporção de diferentes protocolos (por exemplo, TCP, UDP, ICMP) no tráfego de pacotes capturados.

  2. Gráfico de linha do tempo de pacotes: este gráfico mostrará o número de pacotes processados por segundo durante um período de tempo.

  3. Gráfico de endereços IP de origem superior: Este gráfico destacará os 10 principais endereços IP que enviaram mais pacotes no tráfego capturado.

O gráfico de distribuição do protocolo é simplesmente um gráfico de pizza da contagem de protocolo para os três tipos diferentes (junto com outros). Utilizamos o STRILHTLIT e Plotly Ferramentas Python para plotar esses gráficos. Como também observamos o registro de data e hora desde o início da captura de pacotes, usaremos esses dados para plotar a tendência de pacotes capturados ao longo do tempo.

Para o segundo gráfico, faremos uma operação groupby nos dados e obteremos o número de pacotes capturados em cada segundo (S significa segundos) e, finalmente, traçar o gráfico.

Finalmente, para o terceiro gráfico, contaremos os IPs de origem distinta observados e o gráfico da pista de IP contam para mostrar os 10 principais IPs.

Como capturar os pacotes de rede

Agora, vamos construir a funcionalidade que nos permitirá capturar dados de pacotes de rede.

def start_packet_capture():
    """Start packet capture in a separate thread"""
    processor = PacketProcessor()

    def capture_packets():
        sniff(prn=processor.process_packet, store=False)

    capture_thread = threading.Thread(target=capture_packets, daemon=True)
    capture_thread.start()

    return processor

Esta é uma função simples que instancia a classe PacketProcessor e então usa a função sniff no módulo scapy para iniciar a captura dos pacotes.

Usamos a rosca aqui para nos permitir capturar pacotes independentemente do fluxo principal do programa. Isso garante que a operação de captura de pacotes não bloqueie outras operações, como atualizar o painel em tempo real. Também devolvemos a instância PacketProcessor criada para que ele possa ser usado em nosso programa principal.

Juntando tudo

Agora vamos juntar todas essas peças com nossa função main que atuará como a função de driver do nosso programa.

def main():
    """Main function to run the dashboard"""
    st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
    st.title("Real-time Network Traffic Analysis")

    # Initialize packet processor in session state
    if 'processor' not in st.session_state:
        st.session_state.processor = start_packet_capture()
        st.session_state.start_time = time.time()

    # Create dashboard layout
    col1, col2 = st.columns(2)

    # Get current data
    df = st.session_state.processor.get_dataframe()

    # Display metrics
    with col1:
        st.metric("Total Packets", len(df))
    with col2:
        duration = time.time() - st.session_state.start_time
        st.metric("Capture Duration", f"{duration:.2f}s")

    # Display visualizations
    create_visualizations(df)

    # Display recent packets
    st.subheader("Recent Packets")
    if len(df) > 0:
        st.dataframe(
            df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
            use_container_width=True
        )

    # Add refresh button
    if st.button('Refresh Data'):
        st.rerun()

    # Auto refresh
    time.sleep(2)
    st.rerun()

Esta função também instanciará o painel STRILHLIT e integrará todos os nossos componentes juntos. Primeiro, definimos o título da página do nosso painel Streamlit e, em seguida, inicializamos nosso PacketProcessor . Utilizamos o estado da sessão em streamlit para garantir que apenas uma instância de captura de pacotes seja criada e o estado seja retido.

Agora, obteremos dinamicamente o quadro de dados do estado da sessão sempre que os dados forem processados e começaremos a exibir as métricas e as visualizações. Também exibiremos os pacotes capturados recentemente, juntamente com informações como o registro de data e hora, IPs de origem e destino, protocolo e tamanho do pacote. Também adicionaremos a capacidade de o usuário atualizar manualmente os dados do painel enquanto também o atualizamos automaticamente a cada dois segundos.

Vamos finalmente executar o programa com o seguinte comando:

sudo streamlit run dashboard.py

Observe que você terá que executar o programa com sudo, pois os recursos de captura de pacotes requerem privilégios administrativos. Se você estiver no Windows, abra seu terminal como Administrador e execute o programa sem o prefixo sudo.

Dê um momento para o programa começar a capturar pacotes. Se tudo correr certo, você deve ver algo assim:

Essas são todas as visualizações que acabamos de implementar em nosso programa de painel Sirlit .

Melhorias Futuras

Com isso, aqui estão algumas ideias de melhorias futuras que você pode usar para estender as funcionalidades do painel:

  1. Adicionar recursos de aprendizado de máquina para detecção de anomalia

  2. Implementar mapeamento geográfico de IP

  3. Crie alertas personalizados com base em padrões de análise de tráfego

  4. Adicionar opções de análise de carga útil de pacotes

Conclusão

Parabéns! Agora você construiu com sucesso um painel de análise de tráfego de rede em tempo real com Python e Streamlit. Este programa fornecerá informações valiosas sobre o comportamento da rede e poderá ser estendido para vários casos de uso, desde monitoramento de segurança até otimização de rede.

Com isso, espero que você tenha aprendido algumas noções básicas sobre análise de tráfego de rede, bem como um pouco de programação Python. Obrigado por ler!