from collections import defaultdict
import re

from scapy.all import rdpcap, TCP, UDP, DNS, DNSQR, ICMP, Raw, IP
from scapy.layers.http import HTTPRequest, HTTPResponse
from langchain.schema import Document
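
# Helpers for turning a PCAP capture into LangChain documents: packets are
# parsed per protocol (HTTP, DNS, FTP, OCSP, ICMP, generic TCP/UDP),
# fragmented IP packets and TCP streams are reassembled, and HTTP payloads
# are extracted from the reassembled streams.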


class pcapanalyze:
    def __init__(self, path):
        self.path = path

    # Load and parse the PCAP file, focusing on TCP and UDP packets
    def parse_pcap(self):
        packets = rdpcap(self.path)
        packets = reassemble_ip_fragments(packets)
        parsed_data = []

        for pkt in packets:
            protocol = None
            src_ip, dst_ip, src_port, dst_port, data = None, None, None, None, None

            # IP layer information
            if pkt.haslayer(IP):
                src_ip = pkt[IP].src
                dst_ip = pkt[IP].dst

            # TCP and UDP protocols
            if pkt.haslayer(TCP) or pkt.haslayer(UDP):
                src_port = pkt.sport
                dst_port = pkt.dport

                # HTTP protocol: look for an HTTP marker in the decoded payload
                if pkt.haslayer(Raw):
                    raw_data = extract_text_data(pkt[Raw])
                    if "HTTP" in raw_data:
                        protocol = "HTTP"
                        data = raw_data

                # DNS protocol
                if pkt.haslayer(DNS):
                    protocol = "DNS"
                    dns_query = pkt[DNSQR].qname.decode("utf-8", errors="ignore") if pkt[DNS].qd else ""
                    # Collect the DNS answer section, if present
                    if pkt[DNS].an:
                        dns_answer = []
                        for i in range(pkt[DNS].ancount):
                            rr = pkt[DNS].an[i]
                            # rdata may be bytes (e.g. names) or another type (e.g. addresses)
                            if isinstance(rr.rdata, bytes):
                                dns_answer.append(rr.rdata.decode("utf-8", errors="ignore"))
                            else:
                                dns_answer.append(str(rr.rdata))
                        dns_answer = ", ".join(dns_answer)
                    else:
                        dns_answer = ""
                    data = f"Query: {dns_query}, Answer: {dns_answer}"

                # FTP protocol (the control channel typically uses port 21)
                if pkt.haslayer(TCP) and 21 in (pkt[TCP].sport, pkt[TCP].dport):
                    protocol = "FTP"
                    if pkt.haslayer(Raw):
                        data = extract_text_data(pkt[Raw])

                # OCSP protocol (typically carried over HTTP on port 80)
                if pkt.haslayer(TCP) and 80 in (pkt[TCP].sport, pkt[TCP].dport) and pkt.haslayer(Raw):
                    raw_data = extract_text_data(pkt[Raw])
                    if "OCSP" in raw_data:
                        protocol = "OCSP"
                        data = raw_data

                # Generic TCP/UDP traffic that matched none of the protocols above
                if protocol is None:
                    protocol = "TCP" if pkt.haslayer(TCP) else "UDP"
                    data = extract_text_data(pkt[Raw]) if pkt.haslayer(Raw) else ""

            # ICMP protocol
            if pkt.haslayer(ICMP):
                protocol = "ICMP"
                data = f"Type: {pkt[ICMP].type}, Code: {pkt[ICMP].code}"

            # Append the parsed packet data to the list
            if protocol:
                parsed_data.append({
                    "src_ip": src_ip,
                    "dst_ip": dst_ip,
                    "protocol": protocol,
                    "src_port": src_port,
                    "dst_port": dst_port,
                    "data": remove_ansi_escape_sequences(data) if data else "",
                })

        # Reassemble TCP streams and extract any HTTP payloads they carry
        tcp_streams = reassemble_tcp_streams(packets)
        http_payloads = extract_http_payloads(tcp_streams)
        return parsed_data, http_payloads

    # Convert parsed packets and reassembled HTTP payloads into LangChain documents
    def prepare_documents(self, parsed_data, http_data):
        documents = []
        for packet in parsed_data:
            content = (f"Source IP: {packet['src_ip']}, Destination IP: {packet['dst_ip']}, "
                       f"Protocol: {packet['protocol']}, Source Port: {packet['src_port']}, "
                       f"Destination Port: {packet['dst_port']}, Data: {packet['data']}")
            documents.append(Document(page_content=content))
        for stream_id, payload in http_data.items():
            content = (f"Source IP: {stream_id[0]}, Source Port: {stream_id[1]}, "
                       f"Protocol: http, Destination IP: {stream_id[2]}, Destination Port: {stream_id[3]}, "
                       f"Data: {remove_ansi_escape_sequences(str(payload))}")
            documents.append(Document(page_content=content))
        return documents


# Remove ANSI escape sequences and non-ASCII characters from a string
def remove_ansi_escape_sequences(input_string):
    # Regular expression pattern matching ANSI escape sequences
    ansi_escape_pattern = r'\x1B(?:[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]'
    # Replace ANSI escape sequences with an empty string
    cleaned_string = re.sub(ansi_escape_pattern, '', input_string)
    # Round-trip through ASCII to drop any remaining non-ASCII bytes
    cleaned_string = cleaned_string.encode("ascii", errors='ignore')
    cleaned_string = cleaned_string.decode('utf-8', errors='ignore')
    return cleaned_string


# Utility function to check whether a payload decodes cleanly as UTF-8 text
def is_printable(s):
    try:
        s.decode('utf-8')
    except (UnicodeDecodeError, AttributeError):
        return False
    return True


# Extract and decode text data from a Raw layer; binary payloads yield ""
def extract_text_data(raw_layer):
    try:
        payload = raw_layer.load
        # Test the original bytes, not an already-decoded string, so that
        # binary payloads are actually filtered out
        return payload.decode("utf-8", errors="ignore") if is_printable(payload) else ""
    except Exception:
        return ""


# Function to reassemble TCP streams by ordering segments on sequence numbers
def reassemble_tcp_streams(packets):
    streams = defaultdict(bytes)
    tcp_streams = defaultdict(lambda: defaultdict(list))

    # Collect payload-bearing segments per (src ip, src port, dst ip, dst port)
    for pkt in packets:
        if TCP in pkt and IP in pkt:
            ip_src = pkt[IP].src
            ip_dst = pkt[IP].dst
            sport = pkt[TCP].sport
            dport = pkt[TCP].dport
            stream_id = (ip_src, sport, ip_dst, dport)
            if pkt[TCP].payload:
                tcp_streams[stream_id][pkt[TCP].seq].append(pkt[TCP].payload)

    # Reassemble the streams based on sequence numbers
    for stream_id, segments in tcp_streams.items():
        full_data = b""
        for _, segs in sorted(segments.items()):
            for seg in segs:
                full_data += bytes(seg)
        streams[stream_id] = full_data
    return streams


# Decode bytes as UTF-8, falling back to a placeholder for non-bytes data
def safe_decode(data):
    try:
        return data.decode("utf-8", errors="ignore")
    except (UnicodeDecodeError, AttributeError):
        return "<non-decodable binary data>"


# Function to extract and decode HTTP payloads from reassembled TCP streams
def extract_http_payloads(streams):
    http_payloads = {}
    for stream_id, data in streams.items():
        try:
            # Try to parse the stream as an HTTP request or response
            if b"HTTP" in data:
                if data.startswith(b"GET") or data.startswith(b"POST"):
                    req = HTTPRequest(data)
                    http_payloads[stream_id] = safe_decode(req.load)
                elif b"HTTP/" in data:
                    res = HTTPResponse(data)
                    http_payloads[stream_id] = safe_decode(res.load)
        except Exception as e:
            print(f"Error parsing HTTP data: {e}")
    return http_payloads


# Function to reassemble fragmented IP packets
def reassemble_ip_fragments(packets):
    reassembled_packets = []
    ip_fragments = defaultdict(list)
    for pkt in packets:
        # A fragment has the 'MF' (More Fragments) flag set or a non-zero offset
        if IP in pkt and (pkt[IP].flags.MF or pkt[IP].frag > 0):
            ip_id = (pkt[IP].src, pkt[IP].dst, pkt[IP].id)
            ip_fragments[ip_id].append(pkt)
            # The final fragment has MF cleared; reassemble once it arrives
            if not pkt[IP].flags.MF:
                ordered = sorted(ip_fragments[ip_id], key=lambda x: x[IP].frag)
                full_payload = b''.join(bytes(fragment[IP].payload) for fragment in ordered)
                # Reuse the first fragment's header and attach the full payload
                reassembled_packet = IP(bytes(ordered[0][IP]))
                reassembled_packet[IP].remove_payload()
                reassembled_packet[IP].add_payload(full_payload)
                reassembled_packets.append(reassembled_packet)
                del ip_fragments[ip_id]  # Clear the stored fragments
        else:
            reassembled_packets.append(pkt)
    return reassembled_packets
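

# Minimal usage sketch (an illustration, not part of the original module):
# "capture.pcap" is a placeholder path, and the resulting Document objects
# would typically feed a downstream LangChain indexing pipeline.
if __name__ == "__main__":
    analyzer = pcapanalyze("capture.pcap")
    parsed_data, http_payloads = analyzer.parse_pcap()
    documents = analyzer.prepare_documents(parsed_data, http_payloads)
    for doc in documents[:5]:  # Preview the first few documents
        print(doc.page_content)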