"""Summarise TCP transfer captures as JSON.

Usage: pass a directory as the only argument; every ``*.pcap`` file directly
inside it is analysed and a JSON document keyed by port is printed to stdout.

Trace files are expected to be named ``<prefix>-<port>-<size>-<run>.pcap``,
e.g. ``tcpdyn-4039-256K-009.pcap``.
"""

import json
import logging
import os
import sys
from collections import defaultdict
from functools import reduce
from pathlib import Path

import pcapkit
import pcapkit.utilities.warnings
from pcapkit.const.tcp.flags import Flags as TCPFlags
from tqdm import tqdm

logging.getLogger("pcapkit").setLevel(logging.ERROR)

# libpcap style capture filter used when recording the traces:
#   host 45.76.35.230 or host 2001:19f0:5001:32f3:5400:3ff:fe07:2a1c
#
# Questions this script is meant to answer for each trace:
#   - did the transmission complete properly?
#   - did it time out?
#   - how much data was transferred?
#   - how long did it take?
#   - speed? (though not especially useful)


class UnusableTraceError(ValueError):
    """Raised when a trace file cannot be summarised (bad name or no frames)."""


def _tcp_payload_size(frame) -> int:
    """Return ``len(tcp) - tcp.length`` for the TCP layer of *frame*.

    NOTE(review): presumably this is the full segment size minus the TCP
    header length, i.e. the payload byte count - confirm against pcapkit.
    """
    segment = frame[pcapkit.TCP]
    return len(segment) - segment.length


def process_trace(fname: str, results: dict) -> None:
    """Analyse one capture and append its summary to ``results[port]``.

    Args:
        fname: Path to a capture named ``<prefix>-<port>-<size>-<run>.pcap``.
        results: Mapping from port (str) to a list of per-run summary dicts.
            It must create missing keys on access (e.g. ``defaultdict(list)``)
            or already contain the port. It is mutated in place.

    Raises:
        UnusableTraceError: If the file name does not have four ``-``
            separated fields, or the capture contains no frames. Nothing is
            appended to *results* in that case.
    """
    # etc/etc/tcpdyn-4039-256K-009.pcap -> port "4039", run number "009"
    fields = Path(fname).stem.split("-")
    if len(fields) != 4:
        raise UnusableTraceError(
            f"{fname}: expected a name like <prefix>-<port>-<size>-<run>.pcap"
        )
    _, port, _, run_number = fields

    extraction = pcapkit.extract(fin=fname, nofile=True, reassembly=True)
    frames = extraction.frame
    if not frames:
        raise UnusableTraceError(f"{fname}: capture contains no frames")
    first_frame, last_frame = frames[0], frames[-1]

    # Only valid because we are always the side that opens the connection,
    # so the first captured frame is ours.
    our_ip = first_frame[pcapkit.IP].src

    # Sum over every frame that was sent by the server (i.e. not by us).
    total_data_received = sum(
        _tcp_payload_size(frame)
        for frame in frames
        if frame[pcapkit.IP].src != our_ip
    )

    was_connection_reset = (last_frame[pcapkit.TCP].connection & TCPFlags.RST) != 0
    # This only says who sent the *last* frame; it is meaningful as "reset by
    # remote" only when was_connection_reset is true.
    connection_reset_by_remote = last_frame[pcapkit.IP].src != our_ip

    time_start = first_frame.info.time_epoch
    time_end = last_frame.info.time_epoch
    runtime = time_end - time_start

    results[port].append(
        {
            "run_number": run_number,
            "total_data_received": total_data_received,
            "was_connection_reset": was_connection_reset,
            "reset_by_remote": connection_reset_by_remote,
            "runtime": float(runtime),
            "time_start": float(time_start),
            "time_end": float(time_end),
        }
    )


def main(argv=None) -> int:
    """Process every ``.pcap`` in the given directory and print JSON results.

    Args:
        argv: Command-line arguments without the program name; defaults to
            ``sys.argv[1:]``. The first argument is the input directory.

    Returns:
        Process exit status: 0 on success, 2 when the directory is missing
        from the command line.
    """
    args = sys.argv[1:] if argv is None else argv
    if not args:
        print(f"usage: {sys.argv[0]} INPUT_DIRECTORY", file=sys.stderr)
        return 2
    input_directory = args[0]

    # Sort by name so the per-port lists come out in a reproducible order;
    # os.scandir yields entries in arbitrary order.
    with os.scandir(input_directory) as entries:
        traces = sorted(
            (
                entry
                for entry in entries
                if not entry.is_dir() and entry.name.endswith(".pcap")
            ),
            key=lambda entry: entry.name,
        )

    results = defaultdict(list)
    for entry in tqdm(traces):
        try:
            process_trace(entry.path, results)
        except UnusableTraceError as err:
            # Keep going: one unusable capture should not discard the rest.
            # tqdm.write keeps the message from garbling the progress bar.
            tqdm.write(f"skipping trace: {err}", file=sys.stderr)

    print(json.dumps(results, indent="\t"))
    return 0


if __name__ == "__main__":
    sys.exit(main())