サイバーセキュリティプログラミング(パケット盗聴)
前回は基本的なTCP通信をPythonで実装しました.今回はrawソケットを使ってパケット盗聴をしてみましょう.
単純なパケット盗聴 (Windows & Linux 対応)
import socket import os # ターゲットホスト host = "192.168.3.7" # パケットを盗聴するために必要なパラメータを指定 if os.name == "nt": socket_protocol = socket.IPPROTO_IP else: socket_protocol = socket.IPPROTO_ICMP # スニッファーを作成 sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol) # 接続待機 sniffer.bind((host, 0)) # キャプチャー結果にIPヘッダを含めるように指定する sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) # Windowsの場合はプロミスキャスモード(全てのパケットを取得する)を有効化する if os.name == "nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) print(sniffer.recvfrom(65565)) # Windowsの場合はプロミスキャスモード(全てのパケットを取得する)を元の無効化に戻す if os.name == "nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
IP層をデコードする
取得したパケットのIP層をデコードしてみましょう.
import socket import os import struct from ctypes import * host = "192.168.3.7" class IP(Structure): _fields_ = [ ("ihl", c_uint8, 4), ("version", c_uint8, 4), ("tos", c_uint8), ("len", c_uint16), ("id", c_uint16), ("offset", c_uint16), ("ttl", c_uint8), ("protocol_num",c_uint8), ("sum", c_uint16), ("src", c_uint32), ("dst", c_uint32) ] def __new__(self, socket_buffer=None): return self.from_buffer_copy(socket_buffer) def __init__(self, socket_buffer=None): self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"} self.src_address = socket.inet_ntoa(struct.pack("<L", self.src)) self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst)) try: self.protocol = self.protocol_map[self.protocol_num] except: self.protocol = str(self.protocol_num) # パケットを盗聴するために必要なパラメータを指定 if os.name == "nt": socket_protocol = socket.IPPROTO_IP else: socket_protocol = socket.IPPROTO_ICMP # スニッファーを作成 sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol) # 接続を待機 sniffer.bind((host, 0)) # キャプチャー結果にIPヘッダーを含めるように指定する sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) if os.name == "nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) try: while True: raw_buffer = sniffer.recvfrom(4096)[0] # バッファーの最初の20バイトを取り出して,IP構造体を作成する ip_header = IP(raw_buffer[0:20]) print("Protocol: {0} {1} -> {2}".format(ip_header.protocol, ip_header.src_address, ip_header.dst_address)) except: if os.name == "nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
ICMPのデコード
IPパケット内のICMPをデコードしてみましょう.
import socket import os import struct from ctypes import * host = "192.168.3.7" class IP(Structure): _fields_ = [ ("ihl", c_uint8, 4), ("version", c_uint8, 4), ("tos", c_uint8), ("len", c_uint16), ("id", c_uint16), ("offset", c_uint16), ("ttl", c_uint8), ("protocol_num", c_uint8), ("sum", c_uint16), ("src", c_uint32), ("dst", c_uint32) ] def __new__(self, socket_buffer=None): return self.from_buffer_copy(socket_buffer) def __init__(self, socket_buffer=None): self.protocol_map = {1: "ICMP", 6: "TCP", 17: "UDP"} self.src_address = socket.inet_ntoa(struct.pack("<L", self.src)) self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst)) try: self.protocol = self.protocol_map[self.protocol_num] except: self.protocol = str(self.protocol_num) class ICMP(Structure): _fields_ = [ ("type", c_uint8), ("code", c_uint8), ("checksum", c_uint16), ("unused", c_uint16), ("next_hop_mtu",c_uint16) ] def __new__(self, socket_buffer): return self.from_buffer_copy(socket_buffer) def __init__(self, socket_buffer): pass if os.name == "nt": socket_protocol = socket.IPPROTO_IP else: socket_protocol = socket.IPPROTO_ICMP sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol) sniffer.bind((host, 0)) sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) if os.name == "nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) try: while True: raw_buffer = sniffer.recvfrom(4096)[0] ip_header = IP(raw_buffer[0:20]) print("Protocol: {0} {1} -> {2}".format(ip_header.protocol, ip_header.src_address, ip_header.dst_address)) if ip_header.protocol == "ICMP": offset = ip_header.ihl * 4 buf = raw_buffer[offset:(offset + sizeof(ICMP))] icmp_header = ICMP(buf) print("ICMP -> Type: {0} Code: {1}".format(icmp_header.type, icmp_header.code)) except: if os.name == "nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
ネットワーク内のホストをスキャンする
最後に,ネットワーク内で稼働しているホストをスキャンしてみましょう.
# UDPデータグラムを標的ネットワークに送信し,それを元に # どのホストが稼働しているかを調査する import socket import os import struct from ctypes import * import threading import time from netaddr import IPNetwork,IPAddress # リッスンするホストのIPアドレス host = "192.168.3.7" # 標的のサブネット subnet = "192.168.3.0/24" # ICMPレスポンスのチェック用マジック文字列 magic_message = "PYTHONRULES!" # UDPデータグラムをサブネット全体に送信 def udp_sender(subnet,magic_message): time.sleep(5) sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) for ip in IPNetwork(subnet): try: sender.sendto(magic_message.encode(),("%s" % ip,65212)) except: pass # IPヘッダー class IP(Structure): _fields_ = [ ("ihl", c_uint8, 4), ("version", c_uint8, 4), ("tos", c_uint8), ("len", c_uint16), ("id", c_uint16), ("offset", c_uint16), ("ttl", c_uint8), ("protocol_num", c_uint8), ("sum", c_uint16), ("src", c_uint32), ("dst", c_uint32) ] def __new__(self, socket_buffer=None): return self.from_buffer_copy(socket_buffer) def __init__(self, socket_buffer=None): # プロトコルの定数値を名称にマッピング self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"} # 可読なIPアドレスの値に変換 self.src_address = socket.inet_ntoa(struct.pack("<L",self.src)) self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst)) # 可読なプロトコル名称に変換 try: self.protocol = self.protocol_map[self.protocol_num] except: self.protocol = str(self.protocol_num) class ICMP(Structure): _fields_ = [ ("type", c_uint8), ("code", c_uint8), ("checksum", c_uint16), ("unused", c_uint16), ("next_hop_mtu", c_uint16) ] def __new__(self, socket_buffer): return self.from_buffer_copy(socket_buffer) def __init__(self, socket_buffer): pass # 前の例と同様の処理 if os.name == "nt": socket_protocol = socket.IPPROTO_IP else: socket_protocol = socket.IPPROTO_ICMP sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol) sniffer.bind((host, 0)) sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) if os.name == "nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) # パケットの送信開始 t = threading.Thread(target=udp_sender,args=(subnet,magic_message)) t.start() try: while True: # パケットの読み込み raw_buffer = sniffer.recvfrom(65565)[0] # バッファーの最初の20バイトからIP構造体を作成 ip_header = IP(raw_buffer[0:20]) # 検出されたプロトコルとホストを出力 #print "Protocol: %s %s -> %s" % (ip_header.protocol, ip_header.src_address, ip_header.dst_address) # ICMPであればそれを処理 if ip_header.protocol == "ICMP": # ICMPパケットの位置を計算 offset = ip_header.ihl * 4 buf = raw_buffer[offset:offset + sizeof(ICMP)] # ICMP構造体を作成 icmp_header = ICMP(buf) #print "ICMP -> Type: %d Code: %d" % (icmp_header.type, icmp_header.code) # コードとタイプが3であるかチェック if icmp_header.code == 3 and icmp_header.type == 3: # 標的サブネットのホストかを確認 if IPAddress(ip_header.src_address) in IPNetwork(subnet): # マジック文字列を含むか確認 if raw_buffer[len(raw_buffer)-len(magic_message):] == magic_message.encode(): print("Host Up: %s" % ip_header.src_address) # Ctrl-Cを処理 except KeyboardInterrupt: # Windowsの場合はプロミスキャスモードを無効化 if os.name == "nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)