سیستم تشخیص نفوذ (IDS) مانند یک دوربین امنیتی برای شبکه شما است. همانطور که دوربین‌های امنیتی به شناسایی فعالیت‌های مشکوک در دنیای فیزیکی کمک می‌کنند، یک IDS نیز شبکه شما را کنترل می‌کند تا به شناسایی هرگونه حمله سایبری بالقوه و نقض امنیت کمک کند.

در پایان این آموزش، روش عملکرد یک IDS را می‌دانید و می‌توانید با استفاده از پایتون، سیستم نظارت بر شبکه خود را در زمان واقعی بسازید.

فهرست مطالب

  • آشنایی با انواع IDS

  • چگونه محیط توسعه خود را راه اندازی کنید

  • ساخت مؤلفه های Core IDS

    • ساخت موتور ضبط بسته ها

    • ساخت ماژول تحلیل ترافیک

    • ساخت موتور تشخیص

    • ساختن سیستم هشدار

    • قرار دادن آن همه با هم

  • ایده هایی برای گسترش IDS

  • ملاحظات امنیتی

  • تست IDS روی داده های ساختگی

  • بسته بندی

آشنایی با انواع IDS

قبل از اینکه وارد بخش کدنویسی شویم، بیایید انواع IDS را درک کنیم:

  1. IDS مبتنی بر شبکه (NIDS): این سیستم ترافیک شبکه را برای فعالیت مشکوک رصد می کند.

  2. IDS مبتنی بر میزبان (HIDS): این سیستم لاگ های سیستم و تغییرات فایل ها را رصد می کند روی میزبان فردی است و مستقیماً در شبکه مستقر نیست.

  3. IDS مبتنی بر امضا: این سیستم یا در شبکه است یا روی را host و الگوهای حمله را بر اساس شناسایی می کند روی الگوهای شناخته شده

  4. IDS مبتنی بر ناهنجاری: این سیستم رفتار غیرعادی را با استفاده از الگوریتم های اکتشافی و پیش بینی که آموزش دیده اند شناسایی می کند. روی الگوهای حمله قبلا دیده شده است.

برای این آموزش، شما یک سیستم ترکیبی می سازید که سیستم های تشخیص مبتنی بر امضا و مبتنی بر ناهنجاری را برای نظارت بر ترافیک شبکه ترکیب می کند.

چگونه محیط توسعه خود را راه اندازی کنید

بیایید با تنظیم محیط پایتون (من از پایتون 3 استفاده می کنم) و نصب پیش نیازهای زیر شروع کنیم:

pip install scapy
pip install python-nmap
pip install numpy
pip install sklearn

ساخت مؤلفه های Core IDS

IDS ما از چهار جزء اصلی تشکیل شده است:

  1. یک سیستم ضبط بسته

  2. ماژول آنالیز ترافیک

  3. یک موتور تشخیص

  4. یک سیستم هشدار

ساخت موتور ضبط بسته ها

بیایید با موتور ضبط بسته شروع کنیم. برای این کار از Scapy استفاده می کنیم. Scapy یک کتابخانه شبکه ای است که به ما اجازه می دهد تا عملیات مربوط به شبکه و شبکه را با استفاده از پایتون انجام دهیم.

ابتدا ما خودمان را تعریف می کنیم PacketCapture کلاسی که به عنوان پایه IDS ما عمل خواهد کرد.

from scapy.all import sniff, IP, TCP
from collections import defaultdict
import threading
import queue

class PacketCapture:
    def __init__(self):
        self.packet_queue = queue.Queue()
        self.stop_capture = threading.Event()

    def packet_callback(self, packet):
        if IP in packet and TCP in packet:
            self.packet_queue.put(packet)

    def start_capture(self, interface="eth0"):
        def capture_thread():
            sniff(iface=interface,
                  prn=self.packet_callback,
                  store=0,
                  stop_filter=lambda _: self.stop_capture.is_set())

        self.capture_thread = threading.Thread(target=capture_thread)
        self.capture_thread.start()

    def stop(self):
        self.stop_capture.set()
        self.capture_thread.join()

بیایید به سرعت کد را مرور کنیم و بفهمیم که این توابع چه کاری انجام می دهند. برای این کار، از threading و صف‌ها برای کارآمدی استفاده می‌کنید process و بسته های شبکه را ضبط کنید.

این init متد کلاس را با ایجاد a مقداردهی اولیه می کند queue.Queue برای ذخیره بسته‌های ضبط‌شده و یک رویداد رشته‌ای برای کنترل زمان توقف گرفتن بسته. این packet_callback متد به عنوان یک کنترل کننده برای هر بسته ضبط شده عمل می کند و بررسی می کند که آیا بسته دارای هر دو لایه IP و TCP است. اگر چنین است، آن را برای پردازش بیشتر به صف اضافه می کند.

این start_capture روش شروع به گرفتن بسته ها می کند روی یک رابط مشخص (پیش‌فرض به eth0 برای گرفتن بسته ها از رابط اترنت). اجرا کنید ifconfig برای درک رابط های موجود و انتخاب رابط مناسب از لیست.

این تابع یک رشته مجزا برای اجرای تابع sniff Scapy ایجاد می کند که به طور مداوم رابط را برای بسته ها نظارت می کند. این stop_filter پارامتر تضمین می کند که ضبط زمانی که stop_capture رویداد تحریک می شود.

این stop متد با تنظیم کردن، ضبط را متوقف می کند stop_capture رویداد و منتظر می ماند تا thread اجرا شود و اطمینان حاصل شود process تمیز خاتمه می یابد این طراحی امکان گرفتن بدون درز بسته های بلادرنگ را بدون مسدود کردن رشته اصلی فراهم می کند.

ساخت ماژول تحلیل ترافیک

حالا بیایید ماژول تحلیل ترافیک را بنویسیم. این ماژول خواهد شد process بسته های ضبط شده و استخراج ویژگی های مربوطه.

class TrafficAnalyzer:
    def __init__(self):
        self.connections = defaultdict(list)
        self.flow_stats = defaultdict(lambda: {
            'packet_count': 0,
            'byte_count': 0,
            'start_time': None,
            'last_time': None
        })

    def analyze_packet(self, packet):
        if IP in packet and TCP in packet:
            ip_src = packet[IP].src
            ip_dst = packet[IP].dst
            port_src = packet[TCP].sport
            port_dst = packet[TCP].dport

            flow_key = (ip_src, ip_dst, port_src, port_dst)

            # Update flow statistics
            stats = self.flow_stats[flow_key]
            stats['packet_count'] += 1
            stats['byte_count'] += len(packet)
            current_time = packet.time

            if not stats['start_time']:
                stats['start_time'] = current_time
            stats['last_time'] = current_time

            return self.extract_features(packet, stats)

    def extract_features(self, packet, stats):
        return {
            'packet_size': len(packet),
            'flow_duration': stats['last_time'] - stats['start_time'],
            'packet_rate': stats['packet_count'] / (stats['last_time'] - stats['start_time']),
            'byte_rate': stats['byte_count'] / (stats['last_time'] - stats['start_time']),
            'tcp_flags': packet[TCP].flags,
            'window_size': packet[TCP].window
        }

در این قسمت کد را تعریف می کنیم TrafficAnalyzer کلاس برای تجزیه و تحلیل ترافیک شبکه در اینجا ما جریان های اتصال را دنبال می کنیم و آمار بسته ها را در زمان واقعی محاسبه می کنیم. ما استفاده می کنیم defaultdict ساختار داده در پایتون برای مدیریت اتصالات و آمار جریان با سازماندهی داده ها بر اساس جریان های منحصر به فرد.

پیشنهاد می‌کنیم بخوانید:  پیشوند 'b' در زبان پایتون String Literals

این __init__ متد دو ویژگی را مقداردهی اولیه می کند: connections، که لیستی از بسته های مرتبط را برای هر جریان ذخیره می کند و flow_stats، که آمارهای جمع آوری شده را برای هر جریان ذخیره می کند، مانند تعداد بسته ها، تعداد بایت ها، زمان شروع و زمان آخرین بسته.

این analyze_packet روش هر بسته را پردازش می کند. اگر بسته حاوی لایه‌های IP و TCP باشد، آی‌پی‌ها و پورت‌های مبدا و مقصد را استخراج می‌کند و یک یک منحصربه‌فرد را تشکیل می‌دهد. flow_key برای شناسایی جریان با افزایش تعداد بسته ها، اضافه کردن اندازه بسته به تعداد بایت ها و تنظیم یا به روز رسانی زمان شروع و آخرین جریان، آمار جریان را به روز می کند. در نهایت تماس می گیرد extract_features برای محاسبه و برگرداندن معیارهای اضافی.

این extract_features این روش مشخصات دقیق جریان و بسته جاری را محاسبه می کند. اینها شامل اندازه بسته، مدت زمان جریان، نرخ بسته، نرخ بایت، پرچم های TCP و اندازه پنجره TCP است. این معیارها برای شناسایی الگوها، ناهنجاری ها یا تهدیدهای بالقوه در ترافیک شبکه کاملاً مفید هستند.

ساخت موتور تشخیص

اکنون موتور تشخیص خود را تعریف می‌کنیم که هم مکانیسم‌های تشخیص مبتنی بر ناهنجاری و هم امضا را اجرا می‌کند:

from sklearn.ensemble import IsolationForest
import numpy as np

class DetectionEngine:
    def __init__(self):
        self.anomaly_detector = IsolationForest(
            contamination=0.1,
            random_state=42
        )
        self.signature_rules = self.load_signature_rules()
        self.training_data = []

    def load_signature_rules(self):
        return {
            'syn_flood': {
                'condition': lambda features: (
                    features['tcp_flags'] == 2 and  # SYN flag
                    features['packet_rate'] > 100
                )
            },
            'port_scan': {
                'condition': lambda features: (
                    features['packet_size'] < 100 and
                    features['packet_rate'] > 50
                )
            }
        }

    def train_anomaly_detector(self, normal_traffic_data):
        self.anomaly_detector.fit(normal_traffic_data)

    def detect_threats(self, features):
        threats = []

        # Signature-based detection
        for rule_name, rule in self.signature_rules.items():
            if rule['condition'](features):
                threats.append({
                    'type': 'signature',
                    'rule': rule_name,
                    'confidence': 1.0
                })

        # Anomaly-based detection
        feature_vector = np.array([[
            features['packet_size'],
            features['packet_rate'],
            features['byte_rate']
        ]])

        anomaly_score = self.anomaly_detector.score_samples(feature_vector)[0]
        if anomaly_score < -0.5:  # Threshold for anomaly detection
            threats.append({
                'type': 'anomaly',
                'score': anomaly_score,
                'confidence': min(1.0, abs(anomaly_score))
            })

        return threats

این کد یک سیستم ترکیبی را تعریف می کند که روش های تشخیص مبتنی بر امضا و مبتنی بر ناهنجاری را ترکیب می کند. ما از مدل Isolation Forest برای تشخیص ناهنجاری ها و همچنین از قوانین از پیش تعریف شده برای شناسایی الگوهای حمله خاص استفاده می کنیم. اگر می خواهید در مورد روش عملکرد مدل جنگل ایزوله بیشتر بدانید، این مقاله را بررسی کنید.

در این قطعه کد، train_anomaly_detector روش، مدل جنگل ایزوله را با استفاده از مجموعه داده ای از ویژگی های ترافیک عادی آموزش می دهد. این مدل را قادر می سازد تا الگوهای ترافیک معمولی را از ناهنجاری ها متمایز کند.

این detect_threats این روش ویژگی های ترافیک شبکه را برای تهدیدات احتمالی با استفاده از دو رویکرد ارزیابی می کند:

  1. تشخیص مبتنی بر امضا: به طور مکرر از هر یک از قوانین از پیش تعریف شده عبور می کند و شرایط قانون را برای ویژگی های ترافیک اعمال می کند. اگر یک قانون مطابقت داشته باشد، یک تهدید مبتنی بر امضا با اطمینان بالا ثبت می شود.

  2. تشخیص مبتنی بر ناهنجاری: بردار ویژگی (اندازه بسته، نرخ بسته و نرخ بایت) را از طریق مدل Isolation Forest پردازش می کند تا امتیاز ناهنجاری را محاسبه کند. اگر امتیاز نشان دهنده رفتار غیرعادی باشد، موتور تشخیص آن را به عنوان یک ناهنجاری فعال می کند و یک امتیاز اطمینان متناسب با شدت ناهنجاری ایجاد می کند.

در نهایت، فهرست انبوهی از تهدیدات شناسایی‌شده را با حاشیه‌نویسی مربوطه (اعم از امضا یا ناهنجاری)، قانون یا امتیازی که باعث ایجاد ناهنجاری شده است، و یک امتیاز اطمینان که نشان می‌دهد چقدر احتمال دارد که الگوی شناسایی‌شده یک تهدید باشد، برمی‌گردانیم.

ساختن سیستم هشدار

حالا بیایید آخرین جزء IDS خود را بسازیم که سیستم هشدار است. خواهد شد process و تهدیدهای شناسایی شده را به روشی ساختاریافته ثبت کنید. شما همچنین می توانید سیستم را گسترش دهید تا مکانیسم های اعلان اضافی مانند بلیط های Slack، Jira و غیره را شامل شود. روی

import logging
import json
from datetime import datetime

class AlertSystem:
    def __init__(self, log_file="ids_alerts.log"):
        self.logger = logging.getLogger("IDS_Alerts")
        self.logger.setLevel(logging.INFO)

        handler = logging.FileHandler(log_file)
        formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        )
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

    def generate_alert(self, threat, packet_info):
        alert = {
            'timestamp': datetime.now().isoformat(),
            'threat_type': threat['type'],
            'source_ip': packet_info.get('source_ip'),
            'destination_ip': packet_info.get('destination_ip'),
            'confidence': threat.get('confidence', 0.0),
            'details': threat
        }

        self.logger.warning(json.dumps(alert))

        if threat['confidence'] > 0.8:
            self.logger.critical(
                f"High confidence threat detected: {json.dumps(alert)}"
            )
            # Implement additional notification methods here
            # (e.g., email, Slack, SIEM integration)

این init متد یک لاگر به نام تنظیم می کند IDS_Alerts با یک INFO سطح ورود به سیستم برای گرفتن اطلاعات هشدار. گزارش‌ها را در یک فایل مشخص می‌نویسد، ids_alerts.log به طور پیش فرض الف FileHandler سیاهههای مربوط را به فایل هدایت می کند، در حالی که Formatter اطمینان حاصل می کند که گزارش ها از یک قالب ثابت پیروی می کنند.

این generate_alert متد مسئول ایجاد ورودی های هشدار ساختاریافته است. هر هشدار شامل اطلاعات کلیدی مانند مهر زمانی شناسایی، نوع تهدید، IP های مبدا و مقصد درگیر، سطح اطمینان شناسایی و جزئیات اضافی مربوط به تهدید است. این هشدارها به عنوان ثبت می شوند WARNING پیام های سطح با فرمت JSON.

اگر سطح اطمینان یک تهدید شناسایی شده بالا باشد (بیشتر از 0.8)، هشدار افزایش یافته و به عنوان یک ثبت می شود. CRITICAL پیام سطح توجه داشته باشید که این روش به گونه‌ای طراحی شده است که قابل توسعه باشد و مکانیسم‌های اعلان اضافی مانند ارسال هشدار از طریق ایمیل یا ادغام با سیستم‌های شخص ثالث مانند راه‌حل‌های Slack یا SIEM را امکان‌پذیر می‌کند.

پیشنهاد می‌کنیم بخوانید:  راهنمای React ComponentA احتمالاً مهمترین مفهومی است که در React باید درک کنید. این یکی از بلوک‌های اصلی React است که به ما امکان می‌دهد یک UI را به قطعات مستقل و قابل استفاده مجدد تقسیم کنیم و کار ساخت UI را بسیار آسان‌تر می‌کند. سپس همه این اجزای مستقل با هم ترکیب می شوند ...

قرار دادن آن همه با هم

اکنون بیایید همه اجزا را با هم در راه حل IDS کاملاً کاربردی خود ادغام کنیم:

class IntrusionDetectionSystem:
    def __init__(self, interface="eth0"):
        self.packet_capture = PacketCapture()
        self.traffic_analyzer = TrafficAnalyzer()
        self.detection_engine = DetectionEngine()
        self.alert_system = AlertSystem()

        self.interface = interface

    def start(self):
        print(f"Starting IDS روی interface {self.interface}")
        self.packet_capture.start_capture(self.interface)

        while True:
            try:
                packet = self.packet_capture.packet_queue.get(timeout=1)
                features = self.traffic_analyzer.analyze_packet(packet)

                if features:
                    threats = self.detection_engine.detect_threats(features)

                    for threat in threats:
                        packet_info = {
                            'source_ip': packet[IP].src,
                            'destination_ip': packet[IP].dst,
                            'source_port': packet[TCP].sport,
                            'destination_port': packet[TCP].dport
                        }
                        self.alert_system.generate_alert(threat, packet_info)

            except queue.Empty:
                continue
            except KeyboardInterrupt:
                print("Stopping IDS...")
                self.packet_capture.stop()
                break

if __name__ == "__main__":
    ids = IntrusionDetectionSystem()
    ids.start()

در این کد، IntrusionDetectionSystem کلاس اجزای اصلی خود را تنظیم می کند: PacketCapture برای گرفتن بسته ها از یک رابط شبکه، TrafficAnalyzer برای استخراج و تجزیه و تحلیل ویژگی های بسته، DetectionEngine برای شناسایی تهدیدها با استفاده از هر دو روش مبتنی بر امضا و مبتنی بر ناهنجاری، و AlertSystem برای ثبت و تشدید تهدیدات شناسایی شده. پارامتر اینترفیس، رابط شبکه را برای نظارت مشخص می‌کند، به‌طور پیش‌فرض eth0 (واسط اترنت که معمولاً نامگذاری می شود روی اکثر سیستم ها).

این start تابع IDS را راه اندازی می کند. با شروع ضبط بسته شروع می شود روی رابط مشخص شده و به طور پیوسته وارد یک حلقه می شود process بسته های دریافتی برای هر بسته ضبط شده، سیستم ویژگی های خود را با استفاده از آن استخراج می کند TrafficAnalyzer و آنها را برای تهدیدات احتمالی با استفاده از DetectionEngine. اگر تهدیدی شناسایی شود، سیستم هشدارهای دقیقی را از طریق آن ایجاد می کند AlertSystem.

سیستم در یک حلقه اجرا می شود تا زمانی که توسط یکی از دو استثنای کلیدی قطع شود: queue.Empty، که در صورتی رخ می دهد که هیچ بسته ای برای پردازش در دسترس نباشد، و KeyboardInterrupt، که با توقف ضبط بسته و خروج از حلقه، IDS را به خوبی متوقف می کند.

ایده هایی برای گسترش IDS

برای تقویت یا گسترش IDS، می‌توانید ویژگی‌ها/بهبودهای زیر را طراحی یا پیاده‌سازی کنید:

  1. پیشرفت های یادگیری ماشین: می‌توانید قابلیت‌های IDS را با ترکیب مدل‌های یادگیری عمیق مانند رمزگذارهای خودکار برای تشخیص ناهنجاری و استفاده از RNN برای تجزیه و تحلیل الگوی متوالی افزایش دهید. این توانایی سیستم را برای شناسایی تهدیدهای پیچیده و در حال تحول با استفاده از مهندسی ویژگی های پیشرفته بهبود می بخشد.

  2. بهینه سازی عملکرد: می‌توانید IDS را با استفاده از PyPy برای اجرای سریع‌تر، نمونه‌برداری از بسته‌ها برای مدیریت شبکه‌های پرترافیک و پردازش موازی برای مقیاس‌بندی کارآمد سیستم بهینه کنید.

  3. قابلیت های یکپارچه سازی: می‌توانید IDS را با در نظر گرفتن پشتیبانی از یک REST API برای نظارت از راه دور گسترش دهید، که تعامل یکپارچه با سیستم‌های خارجی را ممکن می‌سازد.

ملاحظات امنیتی

هنگام استقرار IDS، توجه داشته باشید که این سیستم یک اثبات مفهوم است و برای موارد استفاده تولیدی در نظر گرفته نشده است. همچنین موارد زیر را در نظر داشته باشید:

  • سیستم را با مجوزهای مناسب اجرا کنید (root/admin برای گرفتن بسته مورد نیاز است)

  • سیاهههای مربوط به هشدار را ایمن کنید و چرخش گزارش مناسب را اجرا کنید

  • به طور منظم قوانین امضا را به روز کنید و مدل های تشخیص ناهنجاری را دوباره آموزش دهید

  • نظارت بر استفاده از منابع سیستم، به ویژه در محیط های پر ترافیک

  • کنترل های دسترسی مناسب را برای پیکربندی IDS و هشدارها اجرا کنید

تست IDS روی داده های ساختگی

برای تأیید عملکرد IDS خود، می توانید آن را با استفاده از داده های ساختگی که ترافیک شبکه دنیای واقعی را شبیه سازی می کند، آزمایش کنید. این به شما امکان می دهد تا مشاهده کنید که سیستم چگونه بسته ها را پردازش می کند، ترافیک را تجزیه و تحلیل می کند و هشدارها را بدون نیاز به محیط شبکه زنده ایجاد می کند.

برای تست IDS از تابع زیر استفاده کنید:

from scapy.all import IP, TCP

def test_ids():
    # Create test packets to simulate various scenarios
    test_packets = [
        # Normal traffic
        IP(src="192.168.1.1", dst="192.168.1.2") / TCP(sport=1234, dport=80, flags="A"),
        IP(src="192.168.1.3", dst="192.168.1.4") / TCP(sport=1235, dport=443, flags="P"),

        # SYN flood simulation
        IP(src="10.0.0.1", dst="192.168.1.2") / TCP(sport=5678, dport=80, flags="S"),
        IP(src="10.0.0.2", dst="192.168.1.2") / TCP(sport=5679, dport=80, flags="S"),
        IP(src="10.0.0.3", dst="192.168.1.2") / TCP(sport=5680, dport=80, flags="S"),

        # Port scan simulation
        IP(src="192.168.1.100", dst="192.168.1.2") / TCP(sport=4321, dport=22, flags="S"),
        IP(src="192.168.1.100", dst="192.168.1.2") / TCP(sport=4321, dport=23, flags="S"),
        IP(src="192.168.1.100", dst="192.168.1.2") / TCP(sport=4321, dport=25, flags="S"),
    ]

    ids = IntrusionDetectionSystem()

    # Simulate packet processing and threat detection
    print("Starting IDS Test...")
    for i, packet in enumerate(test_packets, 1):
        print(f"\nProcessing packet {i}: {packet.summary()}")

        # Analyze the packet
        features = ids.traffic_analyzer.analyze_packet(packet)

        if features:
            # Detect threats based روی features
            threats = ids.detection_engine.detect_threats(features)

            if threats:
                print(f"Detected threats: {threats}")
            else:
                print("No threats detected.")
        else:
            print("Packet does not contain IP/TCP layers or is ignored.")

    print("\nIDS Test Completed.")

if __name__ == "__main__":
    test_ids()

این سیستم را در برابر انواع حملات مانند سیل SYN و اسکن پورت آزمایش می کند.

بسته بندی

اکنون می دانید که چگونه با پایتون و چند کتابخانه منبع باز یک سیستم تشخیص نفوذ اولیه بسازید! این IDS برخی از مفاهیم اصلی امنیت شبکه و تشخیص تهدید در زمان واقعی را نشان می دهد.

به خاطر داشته باشید که این آموزش فقط برای اهداف آموزشی است. سیستم‌های حرفه‌ای در سطح سازمانی مانند Snort و Suricata طراحی شده‌اند که می‌توانند تهدیدات پیشرفته و استقرار در مقیاس بزرگ را مدیریت کنند.

امیدوارم در مورد اصول امنیت شبکه اطلاعاتی کسب کرده باشید و یاد گرفته باشید که چگونه می توان از پایتون برای ساخت راه حل های امنیتی عملی استفاده کرد.