Búsqueda de sitios web

Cómo construir un tablero de tráfico de red en tiempo real con Python y Strewlit


¿Alguna vez ha querido visualizar el tráfico de su red en tiempo real? En este tutorial, aprenderá a construir un panel de análisis de análisis de tráfico de red interactivo con Python y Streamlit . Streamlit es un marco de Python de código abierto que puede usar para desarrollar aplicaciones web para análisis de datos y procesamiento de datos.

Al final de este tutorial, sabrá cómo capturar paquetes de red sin procesar de la NIC (tarjeta de interfaz de red) de su computadora, procesar los datos y crear hermosas visualizaciones que se actualizarán en tiempo real.

Tabla de contenido

  • ¿Por qué es importante el análisis de tráfico de red?

  • Requisitos previos

  • Cómo configurar su proyecto

  • Cómo construir las funcionalidades centrales

  • Cómo crear las visualizaciones de transmisión

  • Cómo capturar los paquetes de red

  • Poniendo todo junto

  • Mejoras futuras

  • Conclusión

¿Por qué es importante el análisis de tráfico de red?

El análisis del tráfico de red es un requisito crítico en las empresas donde las redes forman la columna vertebral de casi todas las aplicaciones y servicios. En esencia, tenemos el análisis de paquetes de red que implica monitorear la red, capturar todo el tráfico (entrada y salida) e interpretar estos paquetes a medida que fluyen a través de una red. Puede utilizar esta técnica para identificar patrones de seguridad, detectar anomalías y garantizar la seguridad y eficiencia de la red.

Este proyecto de prueba de concepto en el que trabajaremos en este tutorial es particularmente útil ya que le ayuda a visualizar y analizar la actividad de la red en tiempo real. Y esto le permitirá comprender cómo se realizan la resolución de problemas, las optimizaciones del rendimiento y el análisis de seguridad en los sistemas empresariales.

Requisitos previos

  • Python 3.8 o una versión más nueva instalada en su sistema.

  • Una comprensión básica de los conceptos de redes informáticas.

  • Familiaridad con el lenguaje de programación de Python y sus bibliotecas ampliamente utilizadas.

  • Conocimiento básico de las técnicas y bibliotecas de visualización de datos.

Cómo configurar su proyecto

Para comenzar, cree la estructura del proyecto e instale las herramientas necesarias con PIP con los siguientes comandos:

mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly

Usaremos Streamlit para las visualizaciones del tablero, pandas para el procesamiento de datos, scapy para captura de paquetes de red y procesamiento de paquetes, y finalmente Plotly para trazar gráficos con nuestros datos recopilados.

Cómo construir las funcionalidades principales

Pondremos todo el código en un solo archivo llamado dashboard.py. En primer lugar, comencemos importando todos los elementos que usaremos:

import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket

Ahora configuremos el registro estableciendo una configuración de registro básica. Esto se utilizará para rastrear eventos y ejecutar nuestra aplicación en modo de depuración. Actualmente hemos configurado el nivel de registro en INFO, lo que significa que se mostrarán eventos con nivel INFO o superior. Si no está familiarizado con el inicio de sesión en Python, le recomiendo que consulte esta documentación que profundiza.

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

A continuación, construiremos nuestro procesador de paquetes. Implementaremos la funcionalidad de procesar nuestros paquetes capturados en esta clase.

class PacketProcessor:
    """Process and analyze network packets"""

    def __init__(self):
        self.protocol_map = {
            1: 'ICMP',
            6: 'TCP',
            17: 'UDP'
        }
        self.packet_data = []
        self.start_time = datetime.now()
        self.packet_count = 0
        self.lock = threading.Lock()

    def get_protocol_name(self, protocol_num: int) -> str:
        """Convert protocol number to name"""
        return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')

    def process_packet(self, packet) -> None:
        """Process a single packet and extract relevant information"""
        try:
            if IP in packet:
                with self.lock:
                    packet_info = {
                        'timestamp': datetime.now(),
                        'source': packet[IP].src,
                        'destination': packet[IP].dst,
                        'protocol': self.get_protocol_name(packet[IP].proto),
                        'size': len(packet),
                        'time_relative': (datetime.now() - self.start_time).total_seconds()
                    }

                    # Add TCP-specific information
                    if TCP in packet:
                        packet_info.update({
                            'src_port': packet[TCP].sport,
                            'dst_port': packet[TCP].dport,
                            'tcp_flags': packet[TCP].flags
                        })

                    # Add UDP-specific information
                    elif UDP in packet:
                        packet_info.update({
                            'src_port': packet[UDP].sport,
                            'dst_port': packet[UDP].dport
                        })

                    self.packet_data.append(packet_info)
                    self.packet_count += 1

                    # Keep only last 10000 packets to prevent memory issues
                    if len(self.packet_data) > 10000:
                        self.packet_data.pop(0)

        except Exception as e:
            logger.error(f"Error processing packet: {str(e)}")

    def get_dataframe(self) -> pd.DataFrame:
        """Convert packet data to pandas DataFrame"""
        with self.lock:
            return pd.DataFrame(self.packet_data)

Esta clase construirá nuestra funcionalidad principal y tiene varias funciones de utilidad que se utilizarán para procesar los paquetes.

Los paquetes de red se clasifican en dos en el nivel de transporte (TCP y UDP) y el protocolo ICMP en el nivel de red. Si no está familiarizado con los conceptos de TCP/IP, le recomiendo que consulte este artículo sobre FreeCodeCamp News.

Nuestro constructor realizará un seguimiento de todos los paquetes vistos que se clasifican en estos cubos de tipo de protocolo TCP/IP que definimos. También tomaremos nota del tiempo de captura de paquetes, los datos capturados y la cantidad de paquetes capturados.

También aprovecharemos un bloqueo de subprocesos para garantizar que solo se procese un paquete a la vez. Esto se puede ampliar aún más para permitir que el proyecto tenga procesamiento de paquetes paralelo.

La función auxiliar get_protocol_name nos ayuda a obtener el tipo correcto de protocolo en función de sus números de protocolo. Para brindar algunos antecedentes sobre esto, la Autoridad de Números Asignados de Internet (IANA) asigna números estandarizados para identificar diferentes protocolos en un paquete de red. A medida que veamos estos números en el paquete de red analizado, sabremos qué tipo de protocolo se está utilizando en el paquete actualmente interceptado. Para el alcance de este proyecto, asignaremos solo TCP, UDP e ICMP (Ping). Si encontramos cualquier otro tipo de paquete, lo categorizaremos como OTHER().

La función process_packet maneja nuestra funcionalidad principal que procesará estos paquetes individuales. Si el paquete contiene una capa IP, tomará nota de las direcciones IP de origen y destino, el tipo de protocolo, el tamaño del paquete y el tiempo transcurrido desde el inicio de la captura del paquete.

Para paquetes con protocolos específicos de capa de transporte (como TCP y UDP), capturaremos los puertos de origen y destino junto con los indicadores TCP para paquetes TCP. Estos detalles extraídos se almacenarán en la memoria en la lista Packet_data . También realizaremos un seguimiento del Packet_count cuando se procesan estos paquetes.

La función get_dataframe nos ayuda a convertir la lista paquete_data en una pandas data-marco que luego se utilizará para nuestra visualización.

Cómo crear las visualizaciones de transmisión

Ahora es el momento de que construamos nuestro panel de racionalización interactivo. Definiremos una función llamada create_visualization en el script dashboard.py (fuera de nuestra clase de procesamiento de paquetes).

def create_visualizations(df: pd.DataFrame):
    """Create all dashboard visualizations"""
    if len(df) > 0:
        # Protocol distribution
        protocol_counts = df['protocol'].value_counts()
        fig_protocol = px.pie(
            values=protocol_counts.values,
            names=protocol_counts.index,
            title="Protocol Distribution"
        )
        st.plotly_chart(fig_protocol, use_container_width=True)

        # Packets timeline
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
        fig_timeline = px.line(
            x=df_grouped.index,
            y=df_grouped.values,
            title="Packets per Second"
        )
        st.plotly_chart(fig_timeline, use_container_width=True)

        # Top source IPs
        top_sources = df['source'].value_counts().head(10)
        fig_sources = px.bar(
            x=top_sources.index,
            y=top_sources.values,
            title="Top Source IP Addresses"
        )
        st.plotly_chart(fig_sources, use_container_width=True)

Esta función tomará el marco de datos como entrada y nos ayudará a trazar tres gráficos/gráficos:

  1. Gráfico de distribución de protocolos: este gráfico mostrará la proporción de diferentes protocolos (por ejemplo, TCP, UDP, ICMP) en el tráfico de paquetes capturado.

  2. Gráfico de línea de tiempo de paquetes: este gráfico mostrará la cantidad de paquetes procesados por segundo durante un período de tiempo.

  3. Gráfico de direcciones IP de origen principal: este gráfico resaltará las 10 direcciones IP principales que enviaron la mayor cantidad de paquetes en el tráfico capturado.

El gráfico de distribución de protocolos es simplemente un gráfico circular de los recuentos de protocolos para los tres tipos diferentes (junto con OTROS). Usamos las herramientas Python Streamlit y Plotly para trazar estos gráficos. Dado que también anotamos la marca de tiempo desde que comenzó la captura de paquetes, usaremos estos datos para trazar la tendencia de los paquetes capturados a lo largo del tiempo.

Para el segundo gráfico, haremos una operación groupby en los datos y obtendremos el número de paquetes capturados en cada segundo (S significa segundos), y finalmente trazar la gráfica.

Finalmente, para el tercer gráfico, contaremos las IP de fuente distintas observadas y la gráfica de un gráfico de la IP cuenta para mostrar los 10 IP principales.

Cómo capturar los paquetes de red

Ahora, creemos la funcionalidad para permitirnos capturar datos de paquetes de red.

def start_packet_capture():
    """Start packet capture in a separate thread"""
    processor = PacketProcessor()

    def capture_packets():
        sniff(prn=processor.process_packet, store=False)

    capture_thread = threading.Thread(target=capture_packets, daemon=True)
    capture_thread.start()

    return processor

Esta es una función simple que crea una instancia de la clase PacketProcessor y luego usa la función sniff en el módulo scapy para comenzar a capturar los paquetes.

Utilizamos hilo aquí para permitirnos capturar paquetes de forma independiente del flujo principal del programa. Esto asegura que la operación de captura de paquetes no bloquee otras operaciones como actualizar el tablero en tiempo real. También devolvemos la instancia creada PacketProcessor para que pueda usarse en nuestro programa principal.

Poniendo todo junto

Ahora unamos todas estas piezas con nuestra función main que actuará como función controladora de nuestro programa.

def main():
    """Main function to run the dashboard"""
    st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
    st.title("Real-time Network Traffic Analysis")

    # Initialize packet processor in session state
    if 'processor' not in st.session_state:
        st.session_state.processor = start_packet_capture()
        st.session_state.start_time = time.time()

    # Create dashboard layout
    col1, col2 = st.columns(2)

    # Get current data
    df = st.session_state.processor.get_dataframe()

    # Display metrics
    with col1:
        st.metric("Total Packets", len(df))
    with col2:
        duration = time.time() - st.session_state.start_time
        st.metric("Capture Duration", f"{duration:.2f}s")

    # Display visualizations
    create_visualizations(df)

    # Display recent packets
    st.subheader("Recent Packets")
    if len(df) > 0:
        st.dataframe(
            df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
            use_container_width=True
        )

    # Add refresh button
    if st.button('Refresh Data'):
        st.rerun()

    # Auto refresh
    time.sleep(2)
    st.rerun()

Esta función también instanciará el tablero Streamlit e integrará todos nuestros componentes juntos. Primero establecemos el título de la página de nuestro tablero Streamlit e inicializamos nuestro PacketProcessor . Usamos el estado de sesión en Streamlit para garantizar que solo se cree una instancia de captura de paquetes y se retiene el estado de este.

Ahora, obtendremos dinámicamente el marco de datos del estado de la sesión cada vez que se procesen los datos y comenzaremos a mostrar las métricas y las visualizaciones. También mostraremos los paquetes capturados recientemente junto con información como la marca de tiempo, las IP de origen y destino, el protocolo y el tamaño del paquete. También agregaremos la capacidad para que el usuario actualice manualmente los datos desde el panel mientras los actualizamos automáticamente cada dos segundos.

Finalmente ejecutemos el programa con el siguiente comando:

sudo streamlit run dashboard.py

Tenga en cuenta que deberá ejecutar el programa con sudo ya que las capacidades de captura de paquetes requieren privilegios administrativos. Si está en Windows, abra su terminal como administrador y luego ejecute el programa sin el prefijo sudo .

Espere un momento para que el programa comience a capturar paquetes. Si todo va bien, deberías ver algo como esto:

Estas son todas las visualizaciones que acabamos de implementar en nuestro programa de panel Streamlit.

Mejoras futuras

Con eso, aquí hay algunas ideas futuras de mejora que puede usar para extender las funcionalidades del tablero:

  1. Agregue capacidades de aprendizaje automático para la detección de anomalías

  2. Implementar la asignación geográfica de IP

  3. Cree alertas personalizadas basadas en patrones de análisis de tráfico

  4. Agregar opciones de análisis de carga útil de paquetes

Conclusión

¡Felicidades! Ahora ha creado con éxito un tablero de análisis de tráfico de red en tiempo real con Python y Streamlit . Este programa proporcionará información valiosa sobre el comportamiento de la red y puede extenderse para varios casos de uso, desde el monitoreo de seguridad hasta la optimización de la red.

Con eso, espero que hayas aprendido algunos conceptos básicos sobre el análisis del tráfico de red, así como un poco de programación en Python. ¡Gracias por leer!