از طریق منوی جستجو مطلب مورد نظر خود در وبلاگ را به سرعت پیدا کنید
روش ساخت یک سیستم تشخیص نفوذ بلادرنگ با پایتون و Open-Source کتابخانه ها
سرفصلهای مطلب
سیستم تشخیص نفوذ (IDS) مانند یک دوربین امنیتی برای شبکه شما است. همانطور که دوربینهای امنیتی به شناسایی فعالیتهای مشکوک در دنیای فیزیکی کمک میکنند، یک IDS نیز شبکه شما را کنترل میکند تا به شناسایی هرگونه حمله سایبری بالقوه و نقض امنیت کمک کند.
در پایان این آموزش، روش عملکرد یک IDS را میدانید و میتوانید با استفاده از پایتون، سیستم نظارت بر شبکه خود را در زمان واقعی بسازید.
فهرست مطالب
-
آشنایی با انواع IDS
-
چگونه محیط توسعه خود را راه اندازی کنید
-
ساخت مؤلفه های Core IDS
-
ساخت موتور ضبط بسته ها
-
ساخت ماژول تحلیل ترافیک
-
ساخت موتور تشخیص
-
ساختن سیستم هشدار
-
قرار دادن آن همه با هم
-
-
ایده هایی برای گسترش IDS
-
ملاحظات امنیتی
-
تست IDS روی داده های ساختگی
-
بسته بندی
آشنایی با انواع IDS
قبل از اینکه وارد بخش کدنویسی شویم، بیایید انواع IDS را درک کنیم:
-
IDS مبتنی بر شبکه (NIDS): این سیستم ترافیک شبکه را برای فعالیت مشکوک رصد می کند.
-
IDS مبتنی بر میزبان (HIDS): این سیستم لاگ های سیستم و تغییرات فایل ها را رصد می کند روی میزبان فردی است و مستقیماً در شبکه مستقر نیست.
-
IDS مبتنی بر امضا: این سیستم یا در شبکه است یا روی را host و الگوهای حمله را بر اساس شناسایی می کند روی الگوهای شناخته شده
-
IDS مبتنی بر ناهنجاری: این سیستم رفتار غیرعادی را با استفاده از الگوریتم های اکتشافی و پیش بینی که آموزش دیده اند شناسایی می کند. روی الگوهای حمله قبلا دیده شده است.
برای این آموزش، شما یک سیستم ترکیبی می سازید که سیستم های تشخیص مبتنی بر امضا و مبتنی بر ناهنجاری را برای نظارت بر ترافیک شبکه ترکیب می کند.
چگونه محیط توسعه خود را راه اندازی کنید
بیایید با تنظیم محیط پایتون (من از پایتون 3 استفاده می کنم) و نصب پیش نیازهای زیر شروع کنیم:
pip install scapy
pip install python-nmap
pip install numpy
pip install sklearn
ساخت مؤلفه های Core IDS
IDS ما از چهار جزء اصلی تشکیل شده است:
-
یک سیستم ضبط بسته
-
ماژول آنالیز ترافیک
-
یک موتور تشخیص
-
یک سیستم هشدار
ساخت موتور ضبط بسته ها
بیایید با موتور ضبط بسته شروع کنیم. برای این کار از Scapy استفاده می کنیم. Scapy یک کتابخانه شبکه ای است که به ما اجازه می دهد تا عملیات مربوط به شبکه و شبکه را با استفاده از پایتون انجام دهیم.
ابتدا ما خودمان را تعریف می کنیم PacketCapture
کلاسی که به عنوان پایه IDS ما عمل خواهد کرد.
from scapy.all import sniff, IP, TCP
from collections import defaultdict
import threading
import queue
class PacketCapture:
def __init__(self):
self.packet_queue = queue.Queue()
self.stop_capture = threading.Event()
def packet_callback(self, packet):
if IP in packet and TCP in packet:
self.packet_queue.put(packet)
def start_capture(self, interface="eth0"):
def capture_thread():
sniff(iface=interface,
prn=self.packet_callback,
store=0,
stop_filter=lambda _: self.stop_capture.is_set())
self.capture_thread = threading.Thread(target=capture_thread)
self.capture_thread.start()
def stop(self):
self.stop_capture.set()
self.capture_thread.join()
بیایید به سرعت کد را مرور کنیم و بفهمیم که این توابع چه کاری انجام می دهند. برای این کار، از threading و صفها برای کارآمدی استفاده میکنید process و بسته های شبکه را ضبط کنید.
این init
متد کلاس را با ایجاد a مقداردهی اولیه می کند queue.Queue
برای ذخیره بستههای ضبطشده و یک رویداد رشتهای برای کنترل زمان توقف گرفتن بسته. این packet_callback
متد به عنوان یک کنترل کننده برای هر بسته ضبط شده عمل می کند و بررسی می کند که آیا بسته دارای هر دو لایه IP و TCP است. اگر چنین است، آن را برای پردازش بیشتر به صف اضافه می کند.
این start_capture
روش شروع به گرفتن بسته ها می کند روی یک رابط مشخص (پیشفرض به eth0
برای گرفتن بسته ها از رابط اترنت). اجرا کنید ifconfig
برای درک رابط های موجود و انتخاب رابط مناسب از لیست.
این تابع یک رشته مجزا برای اجرای تابع sniff Scapy ایجاد می کند که به طور مداوم رابط را برای بسته ها نظارت می کند. این stop_filter
پارامتر تضمین می کند که ضبط زمانی که stop_capture
رویداد تحریک می شود.
این stop
متد با تنظیم کردن، ضبط را متوقف می کند stop_capture
رویداد و منتظر می ماند تا thread اجرا شود و اطمینان حاصل شود process تمیز خاتمه می یابد این طراحی امکان گرفتن بدون درز بسته های بلادرنگ را بدون مسدود کردن رشته اصلی فراهم می کند.
ساخت ماژول تحلیل ترافیک
حالا بیایید ماژول تحلیل ترافیک را بنویسیم. این ماژول خواهد شد process بسته های ضبط شده و استخراج ویژگی های مربوطه.
class TrafficAnalyzer:
def __init__(self):
self.connections = defaultdict(list)
self.flow_stats = defaultdict(lambda: {
'packet_count': 0,
'byte_count': 0,
'start_time': None,
'last_time': None
})
def analyze_packet(self, packet):
if IP in packet and TCP in packet:
ip_src = packet[IP].src
ip_dst = packet[IP].dst
port_src = packet[TCP].sport
port_dst = packet[TCP].dport
flow_key = (ip_src, ip_dst, port_src, port_dst)
# Update flow statistics
stats = self.flow_stats[flow_key]
stats['packet_count'] += 1
stats['byte_count'] += len(packet)
current_time = packet.time
if not stats['start_time']:
stats['start_time'] = current_time
stats['last_time'] = current_time
return self.extract_features(packet, stats)
def extract_features(self, packet, stats):
return {
'packet_size': len(packet),
'flow_duration': stats['last_time'] - stats['start_time'],
'packet_rate': stats['packet_count'] / (stats['last_time'] - stats['start_time']),
'byte_rate': stats['byte_count'] / (stats['last_time'] - stats['start_time']),
'tcp_flags': packet[TCP].flags,
'window_size': packet[TCP].window
}
در این قسمت کد را تعریف می کنیم TrafficAnalyzer
کلاس برای تجزیه و تحلیل ترافیک شبکه در اینجا ما جریان های اتصال را دنبال می کنیم و آمار بسته ها را در زمان واقعی محاسبه می کنیم. ما استفاده می کنیم defaultdict
ساختار داده در پایتون برای مدیریت اتصالات و آمار جریان با سازماندهی داده ها بر اساس جریان های منحصر به فرد.
این __init__
متد دو ویژگی را مقداردهی اولیه می کند: connections
، که لیستی از بسته های مرتبط را برای هر جریان ذخیره می کند و flow_stats
، که آمارهای جمع آوری شده را برای هر جریان ذخیره می کند، مانند تعداد بسته ها، تعداد بایت ها، زمان شروع و زمان آخرین بسته.
این analyze_packet
روش هر بسته را پردازش می کند. اگر بسته حاوی لایههای IP و TCP باشد، آیپیها و پورتهای مبدا و مقصد را استخراج میکند و یک یک منحصربهفرد را تشکیل میدهد. flow_key
برای شناسایی جریان با افزایش تعداد بسته ها، اضافه کردن اندازه بسته به تعداد بایت ها و تنظیم یا به روز رسانی زمان شروع و آخرین جریان، آمار جریان را به روز می کند. در نهایت تماس می گیرد extract_features
برای محاسبه و برگرداندن معیارهای اضافی.
این extract_features
این روش مشخصات دقیق جریان و بسته جاری را محاسبه می کند. اینها شامل اندازه بسته، مدت زمان جریان، نرخ بسته، نرخ بایت، پرچم های TCP و اندازه پنجره TCP است. این معیارها برای شناسایی الگوها، ناهنجاری ها یا تهدیدهای بالقوه در ترافیک شبکه کاملاً مفید هستند.
ساخت موتور تشخیص
اکنون موتور تشخیص خود را تعریف میکنیم که هم مکانیسمهای تشخیص مبتنی بر ناهنجاری و هم امضا را اجرا میکند:
from sklearn.ensemble import IsolationForest
import numpy as np
class DetectionEngine:
def __init__(self):
self.anomaly_detector = IsolationForest(
contamination=0.1,
random_state=42
)
self.signature_rules = self.load_signature_rules()
self.training_data = []
def load_signature_rules(self):
return {
'syn_flood': {
'condition': lambda features: (
features['tcp_flags'] == 2 and # SYN flag
features['packet_rate'] > 100
)
},
'port_scan': {
'condition': lambda features: (
features['packet_size'] < 100 and
features['packet_rate'] > 50
)
}
}
def train_anomaly_detector(self, normal_traffic_data):
self.anomaly_detector.fit(normal_traffic_data)
def detect_threats(self, features):
threats = []
# Signature-based detection
for rule_name, rule in self.signature_rules.items():
if rule['condition'](features):
threats.append({
'type': 'signature',
'rule': rule_name,
'confidence': 1.0
})
# Anomaly-based detection
feature_vector = np.array([[
features['packet_size'],
features['packet_rate'],
features['byte_rate']
]])
anomaly_score = self.anomaly_detector.score_samples(feature_vector)[0]
if anomaly_score < -0.5: # Threshold for anomaly detection
threats.append({
'type': 'anomaly',
'score': anomaly_score,
'confidence': min(1.0, abs(anomaly_score))
})
return threats
این کد یک سیستم ترکیبی را تعریف می کند که روش های تشخیص مبتنی بر امضا و مبتنی بر ناهنجاری را ترکیب می کند. ما از مدل Isolation Forest برای تشخیص ناهنجاری ها و همچنین از قوانین از پیش تعریف شده برای شناسایی الگوهای حمله خاص استفاده می کنیم. اگر می خواهید در مورد روش عملکرد مدل جنگل ایزوله بیشتر بدانید، این مقاله را بررسی کنید.
در این قطعه کد، train_anomaly_detector
روش، مدل جنگل ایزوله را با استفاده از مجموعه داده ای از ویژگی های ترافیک عادی آموزش می دهد. این مدل را قادر می سازد تا الگوهای ترافیک معمولی را از ناهنجاری ها متمایز کند.
این detect_threats
این روش ویژگی های ترافیک شبکه را برای تهدیدات احتمالی با استفاده از دو رویکرد ارزیابی می کند:
-
تشخیص مبتنی بر امضا: به طور مکرر از هر یک از قوانین از پیش تعریف شده عبور می کند و شرایط قانون را برای ویژگی های ترافیک اعمال می کند. اگر یک قانون مطابقت داشته باشد، یک تهدید مبتنی بر امضا با اطمینان بالا ثبت می شود.
-
تشخیص مبتنی بر ناهنجاری: بردار ویژگی (اندازه بسته، نرخ بسته و نرخ بایت) را از طریق مدل Isolation Forest پردازش می کند تا امتیاز ناهنجاری را محاسبه کند. اگر امتیاز نشان دهنده رفتار غیرعادی باشد، موتور تشخیص آن را به عنوان یک ناهنجاری فعال می کند و یک امتیاز اطمینان متناسب با شدت ناهنجاری ایجاد می کند.
در نهایت، فهرست انبوهی از تهدیدات شناساییشده را با حاشیهنویسی مربوطه (اعم از امضا یا ناهنجاری)، قانون یا امتیازی که باعث ایجاد ناهنجاری شده است، و یک امتیاز اطمینان که نشان میدهد چقدر احتمال دارد که الگوی شناساییشده یک تهدید باشد، برمیگردانیم.
ساختن سیستم هشدار
حالا بیایید آخرین جزء IDS خود را بسازیم که سیستم هشدار است. خواهد شد process و تهدیدهای شناسایی شده را به روشی ساختاریافته ثبت کنید. شما همچنین می توانید سیستم را گسترش دهید تا مکانیسم های اعلان اضافی مانند بلیط های Slack، Jira و غیره را شامل شود. روی
import logging
import json
from datetime import datetime
class AlertSystem:
def __init__(self, log_file="ids_alerts.log"):
self.logger = logging.getLogger("IDS_Alerts")
self.logger.setLevel(logging.INFO)
handler = logging.FileHandler(log_file)
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
def generate_alert(self, threat, packet_info):
alert = {
'timestamp': datetime.now().isoformat(),
'threat_type': threat['type'],
'source_ip': packet_info.get('source_ip'),
'destination_ip': packet_info.get('destination_ip'),
'confidence': threat.get('confidence', 0.0),
'details': threat
}
self.logger.warning(json.dumps(alert))
if threat['confidence'] > 0.8:
self.logger.critical(
f"High confidence threat detected: {json.dumps(alert)}"
)
# Implement additional notification methods here
# (e.g., email, Slack, SIEM integration)
این init
متد یک لاگر به نام تنظیم می کند IDS_Alerts
با یک INFO
سطح ورود به سیستم برای گرفتن اطلاعات هشدار. گزارشها را در یک فایل مشخص مینویسد، ids_alerts.log
به طور پیش فرض الف FileHandler
سیاهههای مربوط را به فایل هدایت می کند، در حالی که Formatter
اطمینان حاصل می کند که گزارش ها از یک قالب ثابت پیروی می کنند.
این generate_alert
متد مسئول ایجاد ورودی های هشدار ساختاریافته است. هر هشدار شامل اطلاعات کلیدی مانند مهر زمانی شناسایی، نوع تهدید، IP های مبدا و مقصد درگیر، سطح اطمینان شناسایی و جزئیات اضافی مربوط به تهدید است. این هشدارها به عنوان ثبت می شوند WARNING
پیام های سطح با فرمت JSON.
اگر سطح اطمینان یک تهدید شناسایی شده بالا باشد (بیشتر از 0.8)، هشدار افزایش یافته و به عنوان یک ثبت می شود. CRITICAL
پیام سطح توجه داشته باشید که این روش به گونهای طراحی شده است که قابل توسعه باشد و مکانیسمهای اعلان اضافی مانند ارسال هشدار از طریق ایمیل یا ادغام با سیستمهای شخص ثالث مانند راهحلهای Slack یا SIEM را امکانپذیر میکند.
قرار دادن آن همه با هم
اکنون بیایید همه اجزا را با هم در راه حل IDS کاملاً کاربردی خود ادغام کنیم:
class IntrusionDetectionSystem:
def __init__(self, interface="eth0"):
self.packet_capture = PacketCapture()
self.traffic_analyzer = TrafficAnalyzer()
self.detection_engine = DetectionEngine()
self.alert_system = AlertSystem()
self.interface = interface
def start(self):
print(f"Starting IDS روی interface {self.interface}")
self.packet_capture.start_capture(self.interface)
while True:
try:
packet = self.packet_capture.packet_queue.get(timeout=1)
features = self.traffic_analyzer.analyze_packet(packet)
if features:
threats = self.detection_engine.detect_threats(features)
for threat in threats:
packet_info = {
'source_ip': packet[IP].src,
'destination_ip': packet[IP].dst,
'source_port': packet[TCP].sport,
'destination_port': packet[TCP].dport
}
self.alert_system.generate_alert(threat, packet_info)
except queue.Empty:
continue
except KeyboardInterrupt:
print("Stopping IDS...")
self.packet_capture.stop()
break
if __name__ == "__main__":
ids = IntrusionDetectionSystem()
ids.start()
در این کد، IntrusionDetectionSystem
کلاس اجزای اصلی خود را تنظیم می کند: PacketCapture
برای گرفتن بسته ها از یک رابط شبکه، TrafficAnalyzer
برای استخراج و تجزیه و تحلیل ویژگی های بسته، DetectionEngine
برای شناسایی تهدیدها با استفاده از هر دو روش مبتنی بر امضا و مبتنی بر ناهنجاری، و AlertSystem
برای ثبت و تشدید تهدیدات شناسایی شده. پارامتر اینترفیس، رابط شبکه را برای نظارت مشخص میکند، بهطور پیشفرض eth0
(واسط اترنت که معمولاً نامگذاری می شود روی اکثر سیستم ها).
این start
تابع IDS را راه اندازی می کند. با شروع ضبط بسته شروع می شود روی رابط مشخص شده و به طور پیوسته وارد یک حلقه می شود process بسته های دریافتی برای هر بسته ضبط شده، سیستم ویژگی های خود را با استفاده از آن استخراج می کند TrafficAnalyzer
و آنها را برای تهدیدات احتمالی با استفاده از DetectionEngine
. اگر تهدیدی شناسایی شود، سیستم هشدارهای دقیقی را از طریق آن ایجاد می کند AlertSystem
.
سیستم در یک حلقه اجرا می شود تا زمانی که توسط یکی از دو استثنای کلیدی قطع شود: queue.Empty
، که در صورتی رخ می دهد که هیچ بسته ای برای پردازش در دسترس نباشد، و KeyboardInterrupt
، که با توقف ضبط بسته و خروج از حلقه، IDS را به خوبی متوقف می کند.
ایده هایی برای گسترش IDS
برای تقویت یا گسترش IDS، میتوانید ویژگیها/بهبودهای زیر را طراحی یا پیادهسازی کنید:
-
پیشرفت های یادگیری ماشین: میتوانید قابلیتهای IDS را با ترکیب مدلهای یادگیری عمیق مانند رمزگذارهای خودکار برای تشخیص ناهنجاری و استفاده از RNN برای تجزیه و تحلیل الگوی متوالی افزایش دهید. این توانایی سیستم را برای شناسایی تهدیدهای پیچیده و در حال تحول با استفاده از مهندسی ویژگی های پیشرفته بهبود می بخشد.
-
بهینه سازی عملکرد: میتوانید IDS را با استفاده از PyPy برای اجرای سریعتر، نمونهبرداری از بستهها برای مدیریت شبکههای پرترافیک و پردازش موازی برای مقیاسبندی کارآمد سیستم بهینه کنید.
-
قابلیت های یکپارچه سازی: میتوانید IDS را با در نظر گرفتن پشتیبانی از یک REST API برای نظارت از راه دور گسترش دهید، که تعامل یکپارچه با سیستمهای خارجی را ممکن میسازد.
ملاحظات امنیتی
هنگام استقرار IDS، توجه داشته باشید که این سیستم یک اثبات مفهوم است و برای موارد استفاده تولیدی در نظر گرفته نشده است. همچنین موارد زیر را در نظر داشته باشید:
-
سیستم را با مجوزهای مناسب اجرا کنید (root/admin برای گرفتن بسته مورد نیاز است)
-
سیاهههای مربوط به هشدار را ایمن کنید و چرخش گزارش مناسب را اجرا کنید
-
به طور منظم قوانین امضا را به روز کنید و مدل های تشخیص ناهنجاری را دوباره آموزش دهید
-
نظارت بر استفاده از منابع سیستم، به ویژه در محیط های پر ترافیک
-
کنترل های دسترسی مناسب را برای پیکربندی IDS و هشدارها اجرا کنید
تست IDS روی داده های ساختگی
برای تأیید عملکرد IDS خود، می توانید آن را با استفاده از داده های ساختگی که ترافیک شبکه دنیای واقعی را شبیه سازی می کند، آزمایش کنید. این به شما امکان می دهد تا مشاهده کنید که سیستم چگونه بسته ها را پردازش می کند، ترافیک را تجزیه و تحلیل می کند و هشدارها را بدون نیاز به محیط شبکه زنده ایجاد می کند.
برای تست IDS از تابع زیر استفاده کنید:
from scapy.all import IP, TCP
def test_ids():
# Create test packets to simulate various scenarios
test_packets = [
# Normal traffic
IP(src="192.168.1.1", dst="192.168.1.2") / TCP(sport=1234, dport=80, flags="A"),
IP(src="192.168.1.3", dst="192.168.1.4") / TCP(sport=1235, dport=443, flags="P"),
# SYN flood simulation
IP(src="10.0.0.1", dst="192.168.1.2") / TCP(sport=5678, dport=80, flags="S"),
IP(src="10.0.0.2", dst="192.168.1.2") / TCP(sport=5679, dport=80, flags="S"),
IP(src="10.0.0.3", dst="192.168.1.2") / TCP(sport=5680, dport=80, flags="S"),
# Port scan simulation
IP(src="192.168.1.100", dst="192.168.1.2") / TCP(sport=4321, dport=22, flags="S"),
IP(src="192.168.1.100", dst="192.168.1.2") / TCP(sport=4321, dport=23, flags="S"),
IP(src="192.168.1.100", dst="192.168.1.2") / TCP(sport=4321, dport=25, flags="S"),
]
ids = IntrusionDetectionSystem()
# Simulate packet processing and threat detection
print("Starting IDS Test...")
for i, packet in enumerate(test_packets, 1):
print(f"\nProcessing packet {i}: {packet.summary()}")
# Analyze the packet
features = ids.traffic_analyzer.analyze_packet(packet)
if features:
# Detect threats based روی features
threats = ids.detection_engine.detect_threats(features)
if threats:
print(f"Detected threats: {threats}")
else:
print("No threats detected.")
else:
print("Packet does not contain IP/TCP layers or is ignored.")
print("\nIDS Test Completed.")
if __name__ == "__main__":
test_ids()
این سیستم را در برابر انواع حملات مانند سیل SYN و اسکن پورت آزمایش می کند.
بسته بندی
اکنون می دانید که چگونه با پایتون و چند کتابخانه منبع باز یک سیستم تشخیص نفوذ اولیه بسازید! این IDS برخی از مفاهیم اصلی امنیت شبکه و تشخیص تهدید در زمان واقعی را نشان می دهد.
به خاطر داشته باشید که این آموزش فقط برای اهداف آموزشی است. سیستمهای حرفهای در سطح سازمانی مانند Snort و Suricata طراحی شدهاند که میتوانند تهدیدات پیشرفته و استقرار در مقیاس بزرگ را مدیریت کنند.
امیدوارم در مورد اصول امنیت شبکه اطلاعاتی کسب کرده باشید و یاد گرفته باشید که چگونه می توان از پایتون برای ساخت راه حل های امنیتی عملی استفاده کرد.
منتشر شده در 1404-01-21 20:39:09