from scapy.all import rdpcap, TCP, UDP, DNS, DNSQR, ICMP, Raw, IP
from scapy.layers.http import HTTPRequest, HTTPResponse
from langchain.schema import Document
from collections import defaultdict
import re
class pcapanalyze:
    def __init__(self, path):
        self.path = path
    # Load and parse the PCAP file, focusing on TCP and UDP packets
    def parse_pcap(self):
        packets = rdpcap(self.path)
        packets = reassemble_ip_fragments(packets)
        parsed_data = []
        for pkt in packets:
            protocol = None
            src_ip, dst_ip, src_port, dst_port, data = None, None, None, None, None
            # IP layer information
            if pkt.haslayer(IP):
                src_ip = pkt[IP].src
                dst_ip = pkt[IP].dst
            # TCP and UDP ports
            if pkt.haslayer(TCP) or pkt.haslayer(UDP):
                src_port = pkt.sport
                dst_port = pkt.dport
            # HTTP protocol (detected by looking for "HTTP" in the payload text)
            if pkt.haslayer(Raw):
                try:
                    raw_data = extract_text_data(pkt[Raw])
                    if "HTTP" in raw_data:
                        protocol = "HTTP"
                        data = raw_data
                except Exception:
                    data = str(pkt[Raw].load)
            # DNS protocol
            if pkt.haslayer(DNS):
                protocol = "DNS"
                dns_query = pkt[DNSQR].qname.decode("utf-8", errors="ignore") if pkt[DNS].qd else ""
                # Collect the answer records, decoding rdata where possible
                if pkt[DNS].an:
                    dns_answer = []
                    for i in range(pkt[DNS].ancount):
                        rr = pkt[DNS].an[i]
                        if isinstance(rr.rdata, bytes):
                            try:
                                dns_answer.append(rr.rdata.decode("utf-8", errors="ignore"))
                            except Exception:
                                dns_answer.append(str(rr.rdata))
                        else:
                            dns_answer.append(str(rr.rdata))
                    dns_answer = ", ".join(dns_answer)
                else:
                    dns_answer = ""
                data = f"Query: {dns_query}, Answer: {dns_answer}"
            # FTP protocol (control channel typically uses port 21)
            if pkt.haslayer(TCP) and (pkt[TCP].dport == 21 or pkt[TCP].sport == 21):
                protocol = "FTP"
                # Guard on the Raw layer: control packets such as SYN/ACK
                # carry no payload and would otherwise raise IndexError
                if pkt.haslayer(Raw):
                    try:
                        data = extract_text_data(pkt[Raw])
                    except Exception:
                        data = str(pkt[Raw].load)
            # OCSP protocol (typically carried over HTTP on port 80)
            if pkt.haslayer(TCP) and (pkt[TCP].dport == 80 or pkt[TCP].sport == 80):
                if pkt.haslayer(Raw):
                    try:
                        raw_data = extract_text_data(pkt[Raw])
                        if "OCSP" in raw_data:
                            protocol = "OCSP"
                            data = raw_data
                    except Exception:
                        data = str(pkt[Raw].load)
            # Generic TCP/UDP fallback
            if protocol is None:
                protocol = "TCP" if pkt.haslayer(TCP) else "UDP"
                data = extract_text_data(pkt[Raw]) if pkt.haslayer(Raw) else ""
            # ICMP protocol
            if pkt.haslayer(ICMP):
                protocol = "ICMP"
                icmp_type = pkt[ICMP].type
                icmp_code = pkt[ICMP].code
                data = f"Type: {icmp_type}, Code: {icmp_code}"
            # Append the parsed packet data to the list; data may still be
            # None (e.g., FTP with no payload), so fall back to ""
            if protocol:
                parsed_data.append({
                    "src_ip": src_ip,
                    "dst_ip": dst_ip,
                    "protocol": protocol,
                    "src_port": src_port,
                    "dst_port": dst_port,
                    "data": remove_ansi_escape_sequences(data or "")
                })
        # Reassemble TCP streams and extract HTTP payloads from them
        tcp_streams = reassemble_tcp_streams(packets)
        http_payloads = extract_http_payloads(tcp_streams)
        return parsed_data, http_payloads
    def prepare_documents(self, parsed_data, http_data):
        documents = []
        for packet in parsed_data:
            content = (f"Source IP: {packet['src_ip']}, Destination IP: {packet['dst_ip']}, "
                       f"Protocol: {packet['protocol']}, Source Port: {packet['src_port']}, "
                       f"Destination Port: {packet['dst_port']}, Data: {packet['data']}")
            documents.append(Document(page_content=content))
        for stream_id, payload in http_data.items():
            content = (f"Source IP: {stream_id[0]}, Source Port: {stream_id[1]}, "
                       f"Protocol: http, Destination IP: {stream_id[2]}, Destination Port: {stream_id[3]}, "
                       f"Data: {remove_ansi_escape_sequences(str(payload))}")
            documents.append(Document(page_content=content))
        return documents
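# Note (hypothetical usage, not part of this file): the Documents returned by
# prepare_documents are ready to be indexed by a LangChain vector store, e.g.
#   from langchain.vectorstores import FAISS
#   from langchain.embeddings import OpenAIEmbeddings
#   store = FAISS.from_documents(documents, OpenAIEmbeddings())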
def remove_ansi_escape_sequences(input_string):
    # Match ANSI escape sequences (CSI and related control sequences)
    ansi_escape_pattern = r'\x1B(?:[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]'
    cleaned_string = re.sub(ansi_escape_pattern, '', input_string)
    # Drop any remaining non-ASCII characters
    cleaned_string = cleaned_string.encode("ascii", errors='ignore')
    cleaned_string = cleaned_string.decode('ascii', errors='ignore')
    return cleaned_string
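# Example: remove_ansi_escape_sequences("\x1b[31mUSER anonymous\x1b[0m")
# returns "USER anonymous" -- color codes such as "\x1b[31m" match the CSI
# pattern above and are stripped before the text is indexed.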
# Utility function to check whether a byte payload decodes as UTF-8 text
def is_printable(s):
    try:
        s.decode('utf-8')
    except (UnicodeDecodeError, AttributeError):
        return False
    return True
# Extract and decode text data from a Raw layer
def extract_text_data(raw_layer):
    try:
        data = raw_layer.load.decode("utf-8", errors="ignore")
        return data if is_printable(data.encode("utf-8")) else ""
    except Exception:
        return ""
# Reassemble TCP streams, keyed by (src ip, src port, dst ip, dst port)
def reassemble_tcp_streams(packets):
    streams = defaultdict(lambda: b"")
    tcp_streams = defaultdict(lambda: defaultdict(list))
    # Collect payload segments per stream, grouped by sequence number
    for pkt in packets:
        if TCP in pkt and IP in pkt:
            ip_src = pkt[IP].src
            ip_dst = pkt[IP].dst
            sport = pkt[TCP].sport
            dport = pkt[TCP].dport
            stream_id = (ip_src, sport, ip_dst, dport)
            if pkt[TCP].payload:
                tcp_streams[stream_id][pkt[TCP].seq].append(pkt[TCP].payload)
    # Concatenate the segments of each stream in sequence-number order
    for stream_id, segments in tcp_streams.items():
        full_data = b""
        for _, segs in sorted(segments.items()):
            for seg in segs:
                full_data += bytes(seg)
        streams[stream_id] = full_data
    return streams
def safe_decode(data):
    try:
        return data.decode("utf-8", errors="ignore")
    except UnicodeDecodeError:
        return "<non-decodable binary data>"
# Extract and decode HTTP payloads from reassembled TCP streams
def extract_http_payloads(streams):
    http_payloads = {}
    for stream_id, data in streams.items():
        try:
            # Try to parse the stream as an HTTP request or response
            # (only GET/POST requests are recognized here)
            if b"HTTP" in data:
                if data.startswith(b"GET") or data.startswith(b"POST"):
                    req = HTTPRequest(data)
                    http_payloads[stream_id] = safe_decode(req.load)
                elif b"HTTP/" in data:
                    res = HTTPResponse(data)
                    http_payloads[stream_id] = safe_decode(res.load)
        except Exception as e:
            print(f"Error parsing HTTP data: {e}")
    return http_payloads
# Reassemble fragmented IP packets; non-fragments pass through unchanged
def reassemble_ip_fragments(packets):
    reassembled_packets = []
    ip_fragments = defaultdict(list)
    for pkt in packets:
        # A packet is a fragment if the 'MF' (More Fragments) flag is set
        # or the fragment offset is non-zero
        if IP in pkt and (pkt[IP].flags.MF or pkt[IP].frag > 0):
            ip_id = (pkt[IP].src, pkt[IP].dst, pkt[IP].id)
            ip_fragments[ip_id].append(pkt)
            # The last fragment has a non-zero offset but 'MF' cleared;
            # once it arrives, stitch the payloads together in offset order
            if not pkt[IP].flags.MF:
                full_payload = b''.join(
                    bytes(fragment[IP].payload)
                    for fragment in sorted(ip_fragments[ip_id], key=lambda x: x[IP].frag)
                )
                # Rebuild one packet from the first fragment's IP header,
                # clearing fragmentation fields and recomputing len/chksum
                reassembled_packet = ip_fragments[ip_id][0][IP].copy()
                reassembled_packet.remove_payload()
                reassembled_packet.flags = 0
                reassembled_packet.frag = 0
                del reassembled_packet.len
                del reassembled_packet.chksum
                reassembled_packet.add_payload(full_payload)
                reassembled_packets.append(reassembled_packet)
                del ip_fragments[ip_id]  # Clear the stored fragments
        else:
            reassembled_packets.append(pkt)
    return reassembled_packets
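# Minimal usage sketch (hypothetical): "capture.pcap" is a placeholder path,
# not a file shipped with this repo.
if __name__ == "__main__":
    analyzer = pcapanalyze("capture.pcap")
    parsed_data, http_payloads = analyzer.parse_pcap()
    documents = analyzer.prepare_documents(parsed_data, http_payloads)
    print(f"Parsed {len(parsed_data)} packets into {len(documents)} documents")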