از طریق منوی جستجو مطلب مورد نظر خود در وبلاگ را به سرعت پیدا کنید
روش ساخت داشبورد ترافیک شبکه بلادرنگ با پایتون و استریم لیت
سرفصلهای مطلب
آیا تا به حال خواسته اید ترافیک شبکه خود را در زمان واقعی تجسم کنید؟ در این آموزش، روش ساخت داشبورد تحلیل ترافیک شبکه تعاملی با پایتون و Streamlit
. Streamlit
یک چارچوب پایتون منبع باز است که می توانید از آن برای توسعه برنامه های کاربردی وب برای تجزیه و تحلیل داده ها و پردازش داده ها استفاده کنید.
در پایان این آموزش، میدانید که چگونه بستههای شبکه خام را از NIC (کارت رابط شبکه) رایانه خود ضبط کنید. process داده ها و ایجاد تجسم های زیبا که در زمان واقعی به روز می شوند.
فهرست مطالب
-
چرا تجزیه و تحلیل ترافیک شبکه مهم است؟
-
پیش نیازها
-
چگونه پروژه خود را راه اندازی کنیم
-
چگونه عملکردهای اصلی را بسازیم
-
روش ایجاد تجسم های Streamlit
-
روش گرفتن بسته های شبکه
-
قرار دادن همه چیز با هم
-
پیشرفت های آینده
-
نتیجه گیری
چرا تجزیه و تحلیل ترافیک شبکه مهم است؟
تجزیه و تحلیل ترافیک شبکه یک نیاز حیاتی در شرکت هایی است که شبکه ها ستون فقرات تقریباً هر برنامه و سرویس را تشکیل می دهند. در هسته آن، ما تجزیه و تحلیل بسته های شبکه را داریم که شامل نظارت بر شبکه، گرفتن تمام ترافیک (ورودی و خروجی) و تفسیر این بسته ها در جریان جریان در یک شبکه است. شما می توانید از این تکنیک برای شناسایی الگوهای امنیتی، تشخیص ناهنجاری ها و اطمینان از امنیت و کارایی شبکه استفاده کنید.
این پروژه اثبات مفهومی که ما کار خواهیم کرد روی در این آموزش بسیار مفید است زیرا به شما کمک می کند فعالیت شبکه را در زمان واقعی تجسم و تجزیه و تحلیل کنید. و این به شما امکان می دهد تا درک کنید که چگونه عیب یابی، بهینه سازی عملکرد و تجزیه و تحلیل امنیتی در سیستم های سازمانی انجام می شود.
پیش نیازها
-
پایتون 3.8 یا نسخه جدیدتر نصب شده است روی سیستم شما
-
درک اولیه مفاهیم شبکه های کامپیوتری
-
آشنایی با زبان برنامه نویسی پایتون و کتابخانه های پرکاربرد آن.
-
دانش اولیه تکنیک های تجسم داده ها و کتابخانه ها.
چگونه پروژه خود را راه اندازی کنیم
برای شروع، ساختار پروژه را ایجاد کنید و ابزارهای لازم را با Pip با دستورات زیر نصب کنید:
mkdir network-dashboard
cd network-dashboard
pip install streamlit pandas scapy plotly
استفاده خواهیم کرد Streamlit
برای تجسم داشبورد، Pandas
برای پردازش داده ها، Scapy
برای گرفتن بسته های شبکه و پردازش بسته ها و در نهایت Plotly
برای رسم نمودارها با داده های جمع آوری شده ما.
چگونه عملکردهای اصلی را بسازیم
ما تمام کدها را در یک فایل واحد به نام قرار خواهیم داد dashboard.py
. ابتدا، بیایید با وارد کردن تمام عناصری که استفاده خواهیم کرد شروع کنیم:
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scapy.all import *
from collections import defaultdict
import time
from datetime import datetime
import threading
import warnings
import logging
from typing import Dict, List, Optional
import socket
حال اجازه دهید ورود به سیستم را با تنظیم یک پیکربندی اولیه ورود به سیستم پیکربندی کنیم. این برای ردیابی رویدادها و اجرای برنامه ما در حالت اشکال زدایی استفاده می شود. ما در حال حاضر سطح ورود به سیستم را به صورت تنظیم کرده ایم INFO
، یعنی رویدادهای با سطح INFO
یا بالاتر نمایش داده خواهد شد. اگر با ورود به پایتون آشنایی ندارید، توصیه میکنم این قطعه مستند را که عمیق است بررسی کنید.
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
بعد، ما پردازنده بسته خود را می سازیم. ما عملکرد پردازش بسته های ضبط شده خود را در این کلاس پیاده سازی خواهیم کرد.
class PacketProcessor:
"""Process and analyze network packets"""
def __init__(self):
self.protocol_map = {
1: 'ICMP',
6: 'TCP',
17: 'UDP'
}
self.packet_data = []
self.start_time = datetime.now()
self.packet_count = 0
self.lock = threading.Lock()
def get_protocol_name(self, protocol_num: int) -> str:
"""Convert protocol number to name"""
return self.protocol_map.get(protocol_num, f'OTHER({protocol_num})')
def process_packet(self, packet) -> None:
"""Process a single packet and extract relevant information"""
try:
if IP in packet:
with self.lock:
packet_info = {
'timestamp': datetime.now(),
'source': packet[IP].src,
'destination': packet[IP].dst,
'protocol': self.get_protocol_name(packet[IP].proto),
'size': len(packet),
'time_relative': (datetime.now() - self.start_time).total_seconds()
}
# Add TCP-specific information
if TCP in packet:
packet_info.update({
'src_port': packet[TCP].sport,
'dst_port': packet[TCP].dport,
'tcp_flags': packet[TCP].flags
})
# Add UDP-specific information
elif UDP in packet:
packet_info.update({
'src_port': packet[UDP].sport,
'dst_port': packet[UDP].dport
})
self.packet_data.append(packet_info)
self.packet_count += 1
# Keep only last 10000 packets to prevent memory issues
if len(self.packet_data) > 10000:
self.packet_data.pop(0)
except Exception as e:
logger.error(f"Error processing packet: {str(e)}")
def get_dataframe(self) -> pd.DataFrame:
"""Convert packet data to pandas DataFrame"""
with self.lock:
return pd.DataFrame(self.packet_data)
این کلاس عملکرد اصلی ما را می سازد و دارای چندین توابع کاربردی است که برای پردازش بسته ها استفاده می شود.
بسته های شبکه در سطح انتقال به دو دسته (TCP و UDP) و پروتکل ICMP در سطح شبکه طبقه بندی می شوند. اگر با مفاهیم TCP/IP آشنا نیستید، توصیه می کنم این مقاله را بررسی کنید روی FreeCodeCamp News.
سازنده ما تمام بستههای مشاهده شده را که در این سطلهای پروتکل TCP/IP طبقهبندی شدهاند که ما تعریف کردهایم، پیگیری میکند. ما همچنین به زمان ضبط بسته، داده های گرفته شده و تعداد بسته های ضبط شده توجه خواهیم کرد.
ما همچنین از قفل نخ استفاده خواهیم کرد تا اطمینان حاصل کنیم که تنها یک بسته در یک زمان واحد پردازش می شود. این را می توان بیشتر گسترش داد تا پروژه بتواند پردازش بسته موازی داشته باشد.
را get_protocol_name
تابع helper به ما کمک می کند تا نوع صحیح پروتکل را دریافت کنیم روی شماره پروتکل آنها برای دادن پیشینه روی به این ترتیب، Internet Assigned Numbers Authority (IANA) اعداد استاندارد شده ای را برای شناسایی پروتکل های مختلف در یک بسته شبکه اختصاص می دهد. همانطور که و هنگامی که ما این اعداد را در بسته شبکه تجزیه شده می بینیم، می دانیم که چه نوع پروتکلی در بسته ای که در حال حاضر رهگیری می شود استفاده می شود. برای محدوده این پروژه، ما فقط به TCP، UDP و ICMP (Ping) نگاشت خواهیم داشت. اگر با هر نوع بسته دیگری برخورد کنیم، آن را به عنوان دسته بندی می کنیم OTHER(<protocol_num>)
.
را process_packet
تابع عملکرد اصلی ما را کنترل می کند process این بسته های فردی اگر بسته حاوی یک لایه IP باشد، آدرس های IP مبدا و مقصد، نوع پروتکل، اندازه بسته و زمان سپری شده از شروع ضبط بسته را یادداشت می کند.
برای بسته هایی با پروتکل های لایه انتقال خاص (مانند TCP و UDP)، پورت های مبدا و مقصد را به همراه پرچم های TCP برای بسته های TCP می گیریم. این جزئیات استخراج شده در حافظه ذخیره می شود packet_data
فهرست ما همچنین پیگیری خواهیم کرد packet_count
زمانی که این بسته ها پردازش می شوند.
را get_dataframe
تابع به ما کمک می کند تا آن را تبدیل کنیم packet_data
فهرست به الف Pandas
چارچوب داده که سپس برای تجسم ما استفاده خواهد شد.
روش ایجاد تجسم های Streamlit
اکنون زمان آن است که داشبورد تعاملی Streamlit خود را بسازیم. تابعی به نام تعریف می کنیم create_visualization
در dashboard.py
اسکریپت (خارج از کلاس پردازش بسته ما).
def create_visualizations(df: pd.DataFrame):
"""Create all dashboard visualizations"""
if len(df) > 0:
# Protocol distribution
protocol_counts = df['protocol'].value_counts()
fig_protocol = px.pie(
values=protocol_counts.values,
names=protocol_counts.index,
title="Protocol Distribution"
)
st.plotly_chart(fig_protocol, use_container_width=True)
# Packets timeline
df['timestamp'] = pd.to_datetime(df['timestamp'])
df_grouped = df.groupby(df['timestamp'].dt.floor('S')).size()
fig_timeline = px.line(
x=df_grouped.index,
y=df_grouped.values,
title="Packets per Second"
)
st.plotly_chart(fig_timeline, use_container_width=True)
# Top source IPs
top_sources = df['source'].value_counts().head(10)
fig_sources = px.bar(
x=top_sources.index,
y=top_sources.values,
title="Top Source IP Addresses"
)
st.plotly_chart(fig_sources, use_container_width=True)
این تابع چارچوب داده را به عنوان ورودی می گیرد و به ما کمک می کند سه نمودار / نمودار را ترسیم کنیم:
-
نمودار توزیع پروتکل: این نمودار نسبت پروتکل های مختلف (به عنوان مثال، TCP، UDP، ICMP) را در ترافیک بسته های ضبط شده نمایش می دهد.
-
نمودار جدول زمانی بسته ها: این نمودار تعداد بسته های پردازش شده در هر ثانیه را در یک دوره زمانی نشان می دهد.
-
بالا Source نمودار آدرس های IP: این نمودار 10 آدرس IP برتر را که بیشترین بسته ها را در ترافیک ضبط شده ارسال کرده اند، مشخص می کند.
نمودار توزیع پروتکل به سادگی یک نمودار دایره ای از تعداد پروتکل ها برای سه نوع مختلف (همراه با OTHER) است. ما استفاده می کنیم Streamlit
و Plotly
ابزار پایتون برای رسم این نمودارها. از آنجایی که از زمان شروع ضبط بسته، مهر زمانی را نیز یادداشت کردیم، از این داده ها برای ترسیم روند بسته های ضبط شده در طول زمان استفاده خواهیم کرد.
برای نمودار دوم، یک را انجام می دهیم groupby
عملیات روی داده ها و دریافت تعداد بسته های ثبت شده در هر ثانیه (S
مخفف ثانیه است)، و سپس در نهایت نمودار را رسم می کنیم.
در نهایت، برای نمودار سوم، IP های منبع متمایز مشاهده شده را شمارش می کنیم و نموداری از IP شمارش می کنیم تا 10 IP برتر را نشان دهیم.
روش گرفتن بسته های شبکه
اکنون، اجازه دهید عملکردی را ایجاد کنیم که به ما امکان می دهد داده های بسته شبکه را ضبط کنیم.
def start_packet_capture():
"""Start packet capture in a separate thread"""
processor = PacketProcessor()
def capture_packets():
sniff(prn=processor.process_packet, store=False)
capture_thread = threading.Thread(target=capture_packets, daemon=True)
capture_thread.start()
return processor
این یک تابع ساده است که نمونه را نشان می دهد PacketProcessor
کلاس و سپس استفاده می کند sniff
عملکرد در scapy
ماژول برای شروع گرفتن بسته ها.
ما در اینجا از threading استفاده می کنیم تا بتوانیم بسته ها را مستقل از جریان برنامه اصلی بگیریم. این تضمین میکند که عملیات ضبط بسته، سایر عملیاتها مانند بهروزرسانی داشبورد را در زمان واقعی مسدود نمیکند. ایجاد شده را نیز برمی گردانیم PacketProcessor
به عنوان مثال تا بتوان از آن در برنامه اصلی ما استفاده کرد.
قرار دادن همه چیز با هم
حالا بیایید همه این قطعات را با ما به هم بدوزیم main
تابعی که به عنوان تابع درایور برنامه ما عمل می کند.
def main():
"""Main function to run the dashboard"""
st.set_page_config(page_title="Network Traffic Analysis", layout="wide")
st.title("Real-time Network Traffic Analysis")
# Initialize packet processor in session state
if 'processor' not in st.session_state:
st.session_state.processor = start_packet_capture()
st.session_state.start_time = time.time()
# Create dashboard layout
col1, col2 = st.columns(2)
# Get current data
df = st.session_state.processor.get_dataframe()
# Display metrics
with col1:
st.metric("Total Packets", len(df))
with col2:
duration = time.time() - st.session_state.start_time
st.metric("Capture Duration", f"{duration:.2f}s")
# Display visualizations
create_visualizations(df)
# Display recent packets
st.subheader("Recent Packets")
if len(df) > 0:
st.dataframe(
df.tail(10)[['timestamp', 'source', 'destination', 'protocol', 'size']],
use_container_width=True
)
# Add refresh button
if st.button('Refresh Data'):
st.rerun()
# Auto refresh
time.sleep(2)
st.rerun()
این تابع نیز نمونه ای از Streamlit
داشبورد، و همه اجزای خود را با هم ادغام کنید. ما ابتدا تنظیم کردیم page عنوان ما Streamlit
داشبورد و سپس ما را مقداردهی اولیه کنید PacketProcessor
. ما از حالت جلسه در استفاده می کنیم Streamlit
برای اطمینان از اینکه تنها یک نمونه از گرفتن بسته ایجاد شده و وضعیت آن حفظ می شود.
اکنون، هر بار که دادهها پردازش میشوند، به صورت پویا چارچوب داده را از حالت جلسه دریافت میکنیم و شروع به نمایش معیارها و تجسمها میکنیم. ما همچنین بسته های اخیراً گرفته شده را به همراه اطلاعاتی مانند مهر زمانی، IP های مبدا و مقصد، پروتکل و اندازه بسته نمایش خواهیم داد. همچنین این قابلیت را برای کاربر اضافه می کنیم که به صورت دستی داده ها را از داشبورد بازخوانی کند در حالی که ما نیز به طور خودکار هر دو ثانیه آن را بازخوانی می کنیم.
در نهایت برنامه را با دستور زیر اجرا می کنیم:
sudo streamlit run dashboard.py
توجه داشته باشید که باید برنامه را با آن اجرا کنید sudo
از آنجایی که قابلیت های ضبط بسته نیاز به امتیازات اداری دارد. اگر شما هستید روی ویندوزت رو باز کن terminal به عنوان Administrator و سپس برنامه را بدون برنامه اجرا کنید sudo
پیشوند
یک لحظه فرصت دهید تا برنامه شروع به گرفتن بسته ها کند. اگر همه چیز درست پیش برود، باید چیزی شبیه به این را ببینید:
اینها همه تجسم هایی هستند که ما به تازگی در خود پیاده سازی کرده ایم Streamlit
برنامه داشبورد
پیشرفت های آینده
با آن، در اینجا برخی از ایده های بهبود آینده وجود دارد که می توانید از آنها برای گسترش عملکردهای داشبورد استفاده کنید:
-
قابلیتهای یادگیری ماشینی را برای تشخیص ناهنجاری اضافه کنید
-
پیاده سازی نقشه IP جغرافیایی
-
ایجاد هشدارهای سفارشی بر اساس روی الگوهای تحلیل ترافیک
-
گزینه های تحلیل محموله بسته را اضافه کنید
نتیجه گیری
تبریک می گویم! شما اکنون با موفقیت یک داشبورد تجزیه و تحلیل ترافیک شبکه بلادرنگ را با پایتون و Streamlit
. این برنامه بینش های ارزشمندی در مورد رفتار شبکه ارائه می دهد و می تواند برای موارد استفاده مختلف، از نظارت بر امنیت گرفته تا بهینه سازی شبکه، گسترش یابد.
با آن، امیدوارم که برخی از اصول اولیه در مورد تجزیه و تحلیل ترافیک شبکه و همچنین کمی برنامه نویسی پایتون را یاد گرفته باشید. با تشکر برای خواندن!
منتشر شده در 1404-01-04 12:12:10