#!/usr/bin/python
"""Given lines of fixed length, mark uncommon chars.

Finds patterns in fixed-length lines: every column is compared across all
input lines and cells that deviate from the most common character are
highlighted with ANSI colors. Use case: analyse captured TCP traces.
"""
# Copyright (C) 2013 Peter Wu
# License: GPLv3+
#
# Provenance: added as common-bytes.py in commit
# 621061e800447bc083a25cca60fe3a8062fe097e (Sat, 7 Dec 2013),
# "common-bytes.py: find patterns in fixed-length lines".

import collections
import fileinput
import sys

CLEAR_COLOR = "\033[m"


def read_lines(files, line_width, lines_limit):
    """Read equally long lines from files and/or stdin.

    files: file names handed to fileinput.input().
    line_width: when positive, every line is truncated to this length and
        must then be exactly that long; otherwise the length of the first
        line becomes the expected width for all following lines.
    lines_limit: when positive, stop after reading this many lines.

    Returns (line_width, lines). On a length mismatch a message is printed
    to stderr and (None, None) is returned. Empty input is reported on
    stderr and yields an empty list.
    """
    # A positive width from the caller is a truncation length; an inferred
    # width must match exactly, so those lines are never truncated.
    truncate = line_width > 0
    width_known = truncate
    lines = []

    # The context manager closes the input on the early return as well.
    with fileinput.input(files=files) as stream:
        for lineno, line in enumerate(stream, 1):
            line = line.rstrip()
            if truncate:
                line = line[:line_width]
            # Measure after truncating; otherwise longer lines could never
            # be accepted and the truncation would be pointless.
            line_len = len(line)

            if not width_known:
                # set expected line width if unavailable (may be 0 when
                # the first line is empty)
                line_width = line_len
                width_known = True
            elif line_len != line_width:
                print("Line {} has a different length {} (expected {})"
                      .format(lineno, line_len, line_width),
                      file=sys.stderr)
                return None, None

            lines.append(line)

            if lines_limit > 0 and lineno >= lines_limit:
                print("Stopped reading after {} lines".format(lineno),
                      file=sys.stderr)
                break

    if not lines:
        print("Input is empty", file=sys.stderr)

    return line_width, lines


def count_chars(lines, line_width):
    """Find the most common character(s) of every column.

    Returns a list with one tuple per column:
    (highest count, number of chars sharing that count, list of those chars).
    With no lines at all, each column yields (-1, 0, []).
    """
    top_counts = []
    for col in range(line_width):
        # Counter keeps first-seen order, so ties are listed in input order.
        col_ctr = collections.Counter(line[col] for line in lines)
        top_count = max(col_ctr.values(), default=-1)
        top_chars = [char for char, count in col_ctr.items()
                     if count == top_count]
        top_counts.append((top_count, len(top_chars), top_chars))

    return top_counts


# Calculate which columns need to get colored.
# 0 = the whole column has the same single char (uncolored)
# 1 = this cell is the most common char (green)
# 2 = this cell is one of the most common chars (yellow)
# 3 = this cell is not most common (red)
CELL_ALLSAME = 0
CELL_COMMON = 1
CELL_COMMON_SHARED = 2
CELL_OUTLIER = 3


def color_cells(lines, line_width, top_counts):
    """Classify every cell as one of the CELL_* states.

    top_counts is the per-column result of count_chars(). Returns one list
    of line_width states per input line.
    """
    lines_count = len(lines)
    colormap = []
    for line in lines:
        line_color = [CELL_ALLSAME] * line_width
        for col in range(line_width):
            max_count, common_count, common_chars = top_counts[col]

            if line[col] not in common_chars:
                # not common at all
                line_color[col] = CELL_OUTLIER
            elif common_count != 1:
                # several chars tie for the highest count
                line_color[col] = CELL_COMMON_SHARED
            elif max_count != lines_count:
                # single most common char, but the column does vary
                line_color[col] = CELL_COMMON
            # else: every line has this char here; stays CELL_ALLSAME
        colormap.append(line_color)

    return colormap


# Find columns that can be skipped
# common at begin:  N..[other]          (more than 5 columns)
# common at end:    [other]..N          (more than 5 columns)
# common in middle: [other]..N..[other] (more than 10 columns)
def mark_skippable(colormap, line_width, edge_threshold=5,
                   middle_threshold=10):
    """Mark runs of unchanged columns that are long enough to be skipped.

    Only the first row of colormap is inspected: CELL_ALLSAME is assigned
    per column, so it is identical in every row.

    Returns a list of line_width ints: the first column of a skippable run
    holds the run length, the remaining columns of that run hold -1, and
    every other column holds 0. A run touching the begin or end of the line
    must be longer than edge_threshold; a run in the middle must be longer
    than middle_threshold. Runs that are too short stay 0 so that they are
    displayed and colored like any other column.
    """
    skip_mask = [0] * line_width
    if not colormap:
        return skip_mask

    first_row = colormap[0]
    col = 0
    while col < line_width:
        if first_row[col] != CELL_ALLSAME:
            col += 1
            continue

        # Consume the whole run of unchanged columns starting here.
        start = col
        while col < line_width and first_row[col] == CELL_ALLSAME:
            col += 1
        run_length = col - start

        at_edge = start == 0 or col == line_width
        threshold = edge_threshold if at_edge else middle_threshold
        if run_length > threshold:
            skip_mask[start] = run_length
            for skipped in range(start + 1, col):
                skip_mask[skipped] = -1

    return skip_mask


def display_changes(lines, line_width, colormap, skip_mask, enable_skip):
    """Print the lines to stdout, coloring the cells that differ.

    colormap comes from color_cells() and skip_mask from mark_skippable().
    With enable_skip set, every skippable run is replaced by a dark
    "..N.." marker, N being the number of columns left out.
    """
    for lineno, line in enumerate(lines):
        pieces = []
        is_colored = False
        # Reset per line: a run skipped at the end of the previous line
        # must not swallow unchanged columns at the start of this one.
        skipping = False
        line_color = colormap[lineno]

        for col in range(line_width):
            mask = skip_mask[col]

            # skip common cells?
            if enable_skip:
                if skipping and mask == -1:
                    # still inside the run that is being skipped
                    continue
                if mask > 0:
                    # N columns can be skipped (N = mask)
                    pieces.append("\033[1;30m..{}..".format(mask))
                    is_colored = True
                    skipping = True
                    continue
                # end of skippable columns
                skipping = False

            # Every cell that is printed gets its color state applied, so
            # that the color of the previous cell cannot bleed into an
            # unchanged one.
            coloring = line_color[col]
            if coloring == CELL_ALLSAME:
                if is_colored:
                    pieces.append(CLEAR_COLOR)
                    is_colored = False
            elif coloring == CELL_COMMON:
                pieces.append("\033[0;32m")  # green
                is_colored = True
            elif coloring == CELL_COMMON_SHARED:
                pieces.append("\033[0;33m")  # yellow
                is_colored = True
            elif coloring == CELL_OUTLIER:
                pieces.append("\033[1;31m")  # red
                is_colored = True
            else:
                raise AssertionError("Invalid coloring state")

            pieces.append(line[col])

        # Any colors must be cleared at the end of the line
        if is_colored:
            pieces.append(CLEAR_COLOR)

        print("".join(pieces))


def main(args):
    """Run the whole pipeline for parsed arguments; returns the exit code.

    args must provide files, line_width, lines_limit and enable_skip.
    Returns 1 when no usable input was read, 0 otherwise.
    """
    # Grab lines from stdin and/ or command line
    line_width, lines = read_lines(args.files, args.line_width,
                                   args.lines_limit)
    if not lines:
        return 1

    # Calculate character frequency per column
    top_counts = count_chars(lines, line_width)

    # calculate coloring
    colormap = color_cells(lines, line_width, top_counts)

    # Shorten lines by cutting out common stuff
    skip_mask = mark_skippable(colormap, line_width)

    # Finally display the input, colored.
    display_changes(lines, line_width, colormap, skip_mask, args.enable_skip)

    return 0


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(
        description="Assume common lines, mark tiny differences")
    parser.add_argument("-c", required=False, type=int, default=0,
                        dest="line_width", metavar="LENGTH",
                        help="Truncate input lines to LENGTH (default 0, unlimited)")
    parser.add_argument("-l", required=False, type=int, default=0,
                        dest="lines_limit", metavar="COUNT",
                        help="Limit the number of lines to COUNT (default 0, unlimited)")
    parser.add_argument("-s", required=False, action="store_true",
                        dest="enable_skip",
                        help="Enable skipping common columns")
    parser.add_argument("files", nargs=argparse.REMAINDER)
    args = parser.parse_args()
    sys.exit(main(args))