This repository has been archived on 2025-07-20. You can view files and clone it, but you cannot make any changes to it's state, such as pushing and creating new issues, pull requests or comments.
an/analyse.py
2024-12-12 16:19:59 +00:00

88 lines
No EOL
2.6 KiB
Python

import sys
import pcapkit
from pcapkit.const.tcp.flags import Flags as TCPFlags
import pcapkit.utilities.warnings
from pathlib import Path
from functools import reduce
import json
from collections import defaultdict
import os
from tqdm import tqdm
import logging
logging.getLogger("pcapkit").setLevel(logging.ERROR)
# libpcap style capture filter:
# host 45.76.35.230 or host 2001:19f0:5001:32f3:5400:3ff:fe07:2a1c
# did the transmission complete properly?
# did it timeout?
# how much data was transferred?
# how long did it take
# speed? though not especially useful
def process_trace(fname: str, results: dict):
# etc/etc/tcpdyn-4039-256K-009.pcap
_, port, _, run_number = (p := Path(fname)).name.removesuffix(p.suffix).split("-")
# print(f"{port=} {run_number=}")
extraction = pcapkit.extract(fin=fname, nofile=True, reassembly=True)
our_ip = extraction.frame[0][pcapkit.IP].src # only useful because we're always the one starting the connection
# print(f"{our_ip=}")
sent_server_data = filter(
lambda x: x[pcapkit.IP].src != our_ip, # was this sent from the server?
extraction.frame,
)
total_data_received = reduce(lambda acc, v: acc + len((tcpp := v[pcapkit.TCP])) - tcpp.length, sent_server_data, 0)
# print(total_data_received)
# print(total_data_received / 256000 * 100)
was_connection_reset = extraction.frame[-1][pcapkit.TCP].connection & TCPFlags.RST != 0
# print(f"{was_connection_reset=}")
connection_reset_by_remote = extraction.frame[-1][pcapkit.IP].src != our_ip
# print(f"{connection_reset_by_remote=}")
time_start = extraction.frame[0].info.time_epoch
time_end = extraction.frame[-1].info.time_epoch
runtime = time_end - time_start
# print(f"{runtime=} {runtime/60=}")
res = {
"run_number": run_number,
"total_data_received": total_data_received,
"was_connection_reset": was_connection_reset,
"reset_by_remote": connection_reset_by_remote,
"runtime": float(runtime),
"time_start": float(time_start),
"time_end": float(time_end),
}
results[port].append(res)
if __name__ == "__main__":
INPUT_DIRECTORY = sys.argv[1]
results = defaultdict(list)
eis = list(os.scandir(INPUT_DIRECTORY))
for entry in tqdm(eis):
if entry.is_dir():
continue
if not entry.name.endswith(".pcap"):
continue
process_trace(entry.path, results)
print(json.dumps(results, indent="\t"))