آیا تا به حال خواسته اید ترافیک شبکه خود را در زمان واقعی تجسم کنید؟ در این آموزش، روش ساخت داشبورد تحلیل ترافیک شبکه تعاملی با پایتون و Streamlit. Streamlit یک چارچوب پایتون منبع باز است که می توانید از آن برای توسعه برنامه های کاربردی وب برای تجزیه و تحلیل داده ها و پردازش داده ها استفاده کنید.

در پایان این آموزش، می‌دانید که چگونه بسته‌های شبکه خام را از NIC (کارت رابط شبکه) رایانه خود ضبط کنید. process داده ها و ایجاد تجسم های زیبا که در زمان واقعی به روز می شوند.

فهرست مطالب

  • چرا تجزیه و تحلیل ترافیک شبکه مهم است؟

  • پیش نیازها

  • چگونه پروژه خود را راه اندازی کنیم

  • چگونه عملکردهای اصلی را بسازیم

  • روش ایجاد تجسم های Streamlit

  • روش گرفتن بسته های شبکه

  • قرار دادن همه چیز با هم

  • پیشرفت های آینده

  • نتیجه گیری

چرا تجزیه و تحلیل ترافیک شبکه مهم است؟

تجزیه و تحلیل ترافیک شبکه یک نیاز حیاتی در شرکت هایی است که شبکه ها ستون فقرات تقریباً هر برنامه و سرویس را تشکیل می دهند. در هسته آن، ما تجزیه و تحلیل بسته های شبکه را داریم که شامل نظارت بر شبکه، گرفتن تمام ترافیک (ورودی و خروجی) و تفسیر این بسته ها در جریان جریان در یک شبکه است. شما می توانید از این تکنیک برای شناسایی الگوهای امنیتی، تشخیص ناهنجاری ها و اطمینان از امنیت و کارایی شبکه استفاده کنید.

این پروژه اثبات مفهومی که ما کار خواهیم کرد روی در این آموزش بسیار مفید است زیرا به شما کمک می کند فعالیت شبکه را در زمان واقعی تجسم و تجزیه و تحلیل کنید. و این به شما امکان می دهد تا درک کنید که چگونه عیب یابی، بهینه سازی عملکرد و تجزیه و تحلیل امنیتی در سیستم های سازمانی انجام می شود.

پیش نیازها

  • پایتون 3.8 یا نسخه جدیدتر نصب شده است روی سیستم شما

  • درک اولیه مفاهیم شبکه های کامپیوتری

  • آشنایی با زبان برنامه نویسی پایتون و کتابخانه های پرکاربرد آن.

  • دانش اولیه تکنیک های تجسم داده ها و کتابخانه ها.

چگونه پروژه خود را راه اندازی کنیم

برای شروع، ساختار پروژه را ایجاد کنید و ابزارهای لازم را با Pip با دستورات زیر نصب کنید:

mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly

استفاده خواهیم کرد Streamlit برای تجسم داشبورد، Pandas برای پردازش داده ها، Scapy برای گرفتن بسته های شبکه و پردازش بسته ها و در نهایت Plotly برای رسم نمودارها با داده های جمع آوری شده ما.

چگونه عملکردهای اصلی را بسازیم

ما تمام کدها را در یک فایل واحد به نام قرار خواهیم داد dashboard.py. ابتدا، بیایید با وارد کردن تمام عناصری که استفاده خواهیم کرد شروع کنیم:

import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket

حال اجازه دهید ورود به سیستم را با تنظیم یک پیکربندی اولیه ورود به سیستم پیکربندی کنیم. این برای ردیابی رویدادها و اجرای برنامه ما در حالت اشکال زدایی استفاده می شود. ما در حال حاضر سطح ورود به سیستم را به صورت تنظیم کرده ایم INFO، یعنی رویدادهای با سطح INFO یا بالاتر نمایش داده خواهد شد. اگر با ورود به پایتون آشنایی ندارید، توصیه می‌کنم این قطعه مستند را که عمیق است بررسی کنید.

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

بعد، ما پردازنده بسته خود را می سازیم. ما عملکرد پردازش بسته های ضبط شده خود را در این کلاس پیاده سازی خواهیم کرد.

class PacketProcessor:
    """Process and analyze network packets"""

    def __init__(self):
        self.protocol_map = {
            1: 'ICMP',
            6: 'TCP',
            17: 'UDP'
        }
        self.packet_data = []
        self.start_time = datetime.now()
        self.packet_count = 0
        self.lock = threading.Lock()

    def get_protocol_name(self, protocol_num: int) -> str:
        """Convert protocol number to name"""
        return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')

    def process_packet(self, packet) -> None:
        """Process a single packet and extract relevant information"""
        try:
            if IP in packet:
                with self.lock:
                    packet_info = {
                        'timestamp': datetime.now(),
                        'source': packet[IP].src,
                        'destination': packet[IP].dst,
                        'protocol': self.get_protocol_name(packet[IP].proto),
                        'size': len(packet),
                        'time_relative': (datetime.now() - self.start_time).total_seconds()
                    }

                    # Add TCP-specific information
                    if TCP in packet:
                        packet_info.update({
                            'src_port': packet[TCP].sport,
                            'dst_port': packet[TCP].dport,
                            'tcp_flags': packet[TCP].flags
                        })

                    # Add UDP-specific information
                    elif UDP in packet:
                        packet_info.update({
                            'src_port': packet[UDP].sport,
                            'dst_port': packet[UDP].dport
                        })

                    self.packet_data.append(packet_info)
                    self.packet_count += 1

                    # Keep only last 10000 packets to prevent memory issues
                    if len(self.packet_data) > 10000:
                        self.packet_data.pop(0)

        except Exception as e:
            logger.error(f"Error processing packet: {str(e)}")

    def get_dataframe(self) -> pd.DataFrame:
        """Convert packet data to pandas DataFrame"""
        with self.lock:
            return pd.DataFrame(self.packet_data)

این کلاس عملکرد اصلی ما را می سازد و دارای چندین توابع کاربردی است که برای پردازش بسته ها استفاده می شود.

پیشنهاد می‌کنیم بخوانید:  "متغیر محلی ارجاع شده قبل از انتساب" را در پایتون برطرف کنید

بسته های شبکه در سطح انتقال به دو دسته (TCP و UDP) و پروتکل ICMP در سطح شبکه طبقه بندی می شوند. اگر با مفاهیم TCP/IP آشنا نیستید، توصیه می کنم این مقاله را بررسی کنید روی FreeCodeCamp News.

سازنده ما تمام بسته‌های مشاهده شده را که در این سطل‌های پروتکل TCP/IP طبقه‌بندی شده‌اند که ما تعریف کرده‌ایم، پیگیری می‌کند. ما همچنین به زمان ضبط بسته، داده های گرفته شده و تعداد بسته های ضبط شده توجه خواهیم کرد.

ما همچنین از قفل نخ استفاده خواهیم کرد تا اطمینان حاصل کنیم که تنها یک بسته در یک زمان واحد پردازش می شود. این را می توان بیشتر گسترش داد تا پروژه بتواند پردازش بسته موازی داشته باشد.

را get_protocol_name تابع helper به ما کمک می کند تا نوع صحیح پروتکل را دریافت کنیم روی شماره پروتکل آنها برای دادن پیشینه روی به این ترتیب، Internet Assigned Numbers Authority (IANA) اعداد استاندارد شده ای را برای شناسایی پروتکل های مختلف در یک بسته شبکه اختصاص می دهد. همانطور که و هنگامی که ما این اعداد را در بسته شبکه تجزیه شده می بینیم، می دانیم که چه نوع پروتکلی در بسته ای که در حال حاضر رهگیری می شود استفاده می شود. برای محدوده این پروژه، ما فقط به TCP، UDP و ICMP (Ping) نگاشت خواهیم داشت. اگر با هر نوع بسته دیگری برخورد کنیم، آن را به عنوان دسته بندی می کنیم OTHER(<protocol_num>).

را process_packet تابع عملکرد اصلی ما را کنترل می کند process این بسته های فردی اگر بسته حاوی یک لایه IP باشد، آدرس های IP مبدا و مقصد، نوع پروتکل، اندازه بسته و زمان سپری شده از شروع ضبط بسته را یادداشت می کند.

برای بسته هایی با پروتکل های لایه انتقال خاص (مانند TCP و UDP)، پورت های مبدا و مقصد را به همراه پرچم های TCP برای بسته های TCP می گیریم. این جزئیات استخراج شده در حافظه ذخیره می شود packet_data فهرست ما همچنین پیگیری خواهیم کرد packet_count زمانی که این بسته ها پردازش می شوند.

را get_dataframe تابع به ما کمک می کند تا آن را تبدیل کنیم packet_data فهرست به الف Pandas چارچوب داده که سپس برای تجسم ما استفاده خواهد شد.

روش ایجاد تجسم های Streamlit

اکنون زمان آن است که داشبورد تعاملی Streamlit خود را بسازیم. تابعی به نام تعریف می کنیم create_visualization در dashboard.py اسکریپت (خارج از کلاس پردازش بسته ما).

def create_visualizations(df: pd.DataFrame):
    """Create all dashboard visualizations"""
    if len(df) > 0:
        # Protocol distribution
        protocol_counts = df['protocol'].value_counts()
        fig_protocol = px.pie(
            values=protocol_counts.values,
            names=protocol_counts.index,
            title="Protocol Distribution"
        )
        st.plotly_chart(fig_protocol, use_container_width=True)

        # Packets timeline
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
        fig_timeline = px.line(
            x=df_grouped.index,
            y=df_grouped.values,
            title="Packets per Second"
        )
        st.plotly_chart(fig_timeline, use_container_width=True)

        # Top source IPs
        top_sources = df['source'].value_counts().head(10)
        fig_sources = px.bar(
            x=top_sources.index,
            y=top_sources.values,
            title="Top Source IP Addresses"
        )
        st.plotly_chart(fig_sources, use_container_width=True)

این تابع چارچوب داده را به عنوان ورودی می گیرد و به ما کمک می کند سه نمودار / نمودار را ترسیم کنیم:

  1. نمودار توزیع پروتکل: این نمودار نسبت پروتکل های مختلف (به عنوان مثال، TCP، UDP، ICMP) را در ترافیک بسته های ضبط شده نمایش می دهد.

  2. نمودار جدول زمانی بسته ها: این نمودار تعداد بسته های پردازش شده در هر ثانیه را در یک دوره زمانی نشان می دهد.

  3. بالا Source نمودار آدرس های IP: این نمودار 10 آدرس IP برتر را که بیشترین بسته ها را در ترافیک ضبط شده ارسال کرده اند، مشخص می کند.

نمودار توزیع پروتکل به سادگی یک نمودار دایره ای از تعداد پروتکل ها برای سه نوع مختلف (همراه با OTHER) است. ما استفاده می کنیم Streamlit و Plotly ابزار پایتون برای رسم این نمودارها. از آنجایی که از زمان شروع ضبط بسته، مهر زمانی را نیز یادداشت کردیم، از این داده ها برای ترسیم روند بسته های ضبط شده در طول زمان استفاده خواهیم کرد.

پیشنهاد می‌کنیم بخوانید:  پیمایش لیست به ترتیب معکوس در پایتون

برای نمودار دوم، یک را انجام می دهیم groupby عملیات روی داده ها و دریافت تعداد بسته های ثبت شده در هر ثانیه (S مخفف ثانیه است)، و سپس در نهایت نمودار را رسم می کنیم.

در نهایت، برای نمودار سوم، IP های منبع متمایز مشاهده شده را شمارش می کنیم و نموداری از IP شمارش می کنیم تا 10 IP برتر را نشان دهیم.

روش گرفتن بسته های شبکه

اکنون، اجازه دهید عملکردی را ایجاد کنیم که به ما امکان می دهد داده های بسته شبکه را ضبط کنیم.

def start_packet_capture():
    """Start packet capture in a separate thread"""
    processor = PacketProcessor()

    def capture_packets():
        sniff(prn=processor.process_packet, store=False)

    capture_thread = threading.Thread(target=capture_packets, daemon=True)
    capture_thread.start()

    return processor

این یک تابع ساده است که نمونه را نشان می دهد PacketProcessor کلاس و سپس استفاده می کند sniff عملکرد در scapy ماژول برای شروع گرفتن بسته ها.

ما در اینجا از threading استفاده می کنیم تا بتوانیم بسته ها را مستقل از جریان برنامه اصلی بگیریم. این تضمین می‌کند که عملیات ضبط بسته، سایر عملیات‌ها مانند به‌روزرسانی داشبورد را در زمان واقعی مسدود نمی‌کند. ایجاد شده را نیز برمی گردانیم PacketProcessor به عنوان مثال تا بتوان از آن در برنامه اصلی ما استفاده کرد.

قرار دادن همه چیز با هم

حالا بیایید همه این قطعات را با ما به هم بدوزیم main تابعی که به عنوان تابع درایور برنامه ما عمل می کند.

def main():
    """Main function to run the dashboard"""
    st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
    st.title("Real-time Network Traffic Analysis")

    # Initialize packet processor in session state
    if 'processor' not in st.session_state:
        st.session_state.processor = start_packet_capture()
        st.session_state.start_time = time.time()

    # Create dashboard layout
    col1, col2 = st.columns(2)

    # Get current data
    df = st.session_state.processor.get_dataframe()

    # Display metrics
    with col1:
        st.metric("Total Packets", len(df))
    with col2:
        duration = time.time() - st.session_state.start_time
        st.metric("Capture Duration", f"{duration:.2f}s")

    # Display visualizations
    create_visualizations(df)

    # Display recent packets
    st.subheader("Recent Packets")
    if len(df) > 0:
        st.dataframe(
            df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
            use_container_width=True
        )

    # Add refresh button
    if st.button('Refresh Data'):
        st.rerun()

    # Auto refresh
    time.sleep(2)
    st.rerun()

این تابع نیز نمونه ای از Streamlit داشبورد، و همه اجزای خود را با هم ادغام کنید. ما ابتدا تنظیم کردیم page عنوان ما Streamlit داشبورد و سپس ما را مقداردهی اولیه کنید PacketProcessor. ما از حالت جلسه در استفاده می کنیم Streamlit برای اطمینان از اینکه تنها یک نمونه از گرفتن بسته ایجاد شده و وضعیت آن حفظ می شود.

اکنون، هر بار که داده‌ها پردازش می‌شوند، به صورت پویا چارچوب داده را از حالت جلسه دریافت می‌کنیم و شروع به نمایش معیارها و تجسم‌ها می‌کنیم. ما همچنین بسته های اخیراً گرفته شده را به همراه اطلاعاتی مانند مهر زمانی، IP های مبدا و مقصد، پروتکل و اندازه بسته نمایش خواهیم داد. همچنین این قابلیت را برای کاربر اضافه می کنیم که به صورت دستی داده ها را از داشبورد بازخوانی کند در حالی که ما نیز به طور خودکار هر دو ثانیه آن را بازخوانی می کنیم.

در نهایت برنامه را با دستور زیر اجرا می کنیم:

sudo streamlit run dashboard.py

توجه داشته باشید که باید برنامه را با آن اجرا کنید sudo از آنجایی که قابلیت های ضبط بسته نیاز به امتیازات اداری دارد. اگر شما هستید روی ویندوزت رو باز کن terminal به عنوان Administrator و سپس برنامه را بدون برنامه اجرا کنید sudo پیشوند

یک لحظه فرصت دهید تا برنامه شروع به گرفتن بسته ها کند. اگر همه چیز درست پیش برود، باید چیزی شبیه به این را ببینید:

داشبورد تجزیه و تحلیل ترافیک شبکه یک نمودار دایره ای را با توزیع پروتکل نشان می دهد: TCP (48.7%)، UDP (47.5%) و ICMP (3.8%). در زیر یک نمودار خطی وجود دارد که بسته ها را در هر ثانیه در طول زمان با چندین پیک قابل توجه نمایش می دهد. مجموع بسته ها 6743 و مدت زمان ضبط 118.63 ثانیه است.

داشبوردی با تم تیره که نمودار میله‌ای از آدرس‌های IP منبع اصلی و جدولی از بسته‌های اخیر با جزئیاتی مانند مهر زمان، منبع، مقصد، پروتکل و اندازه را نشان می‌دهد.

اینها همه تجسم هایی هستند که ما به تازگی در خود پیاده سازی کرده ایم Streamlit برنامه داشبورد

پیشرفت های آینده

با آن، در اینجا برخی از ایده های بهبود آینده وجود دارد که می توانید از آنها برای گسترش عملکردهای داشبورد استفاده کنید:

  1. قابلیت‌های یادگیری ماشینی را برای تشخیص ناهنجاری اضافه کنید

  2. پیاده سازی نقشه IP جغرافیایی

  3. ایجاد هشدارهای سفارشی بر اساس روی الگوهای تحلیل ترافیک

  4. گزینه های تحلیل محموله بسته را اضافه کنید

نتیجه گیری

تبریک می گویم! شما اکنون با موفقیت یک داشبورد تجزیه و تحلیل ترافیک شبکه بلادرنگ را با پایتون و Streamlit. این برنامه بینش های ارزشمندی در مورد رفتار شبکه ارائه می دهد و می تواند برای موارد استفاده مختلف، از نظارت بر امنیت گرفته تا بهینه سازی شبکه، گسترش یابد.

با آن، امیدوارم که برخی از اصول اولیه در مورد تجزیه و تحلیل ترافیک شبکه و همچنین کمی برنامه نویسی پایتون را یاد گرفته باشید. با تشکر برای خواندن!