summary | refs | log | tree | commit | diff
path: root/common-bytes.py
diff options
context:
space:
mode:
author: Peter Wu <lekensteyn@gmail.com> 2013-12-07 17:28:34 +0100
committer: Peter Wu <lekensteyn@gmail.com> 2013-12-07 17:28:34 +0100
commit: 621061e800447bc083a25cca60fe3a8062fe097e (patch)
tree: a207ec9f5e57c15ec6225e40f2269ea84409cd5e /common-bytes.py
parent: 1c00e4233347159f207151e76c4f48db962678a2 (diff)
download: scripts-621061e800447bc083a25cca60fe3a8062fe097e.tar.gz
common-bytes.py: find patterns in fixed-length lines
Use case: analyse captured TCP traces.
Diffstat (limited to 'common-bytes.py')
-rwxr-xr-x common-bytes.py 217
1 files changed, 217 insertions, 0 deletions
diff --git a/common-bytes.py b/common-bytes.py
new file mode 100755
index 0000000..1210ec0
--- /dev/null
+++ b/common-bytes.py
@@ -0,0 +1,217 @@
+#!/usr/bin/python
+# Given lines of fixed length, mark uncommon chars
+# Copyright (C) 2013 Peter Wu <lekensteyn@gmail.com>
+# License: GPLv3+
+
+import sys
+import fileinput
+
# Escape sequence appended after colored output to clear the color again.
CLEAR_COLOR = "\033[m"


def read_lines(files, line_width, lines_limit):
    """Read equally long lines from the given files and/or stdin.

    Trailing whitespace (including the newline) is stripped from every line.

    files       -- file names handed to fileinput.input().
    line_width  -- when positive, every line is truncated to this many
                   characters and must be exactly that long afterwards;
                   otherwise the length of the first line becomes the
                   expected length of all following lines.
    lines_limit -- when positive, stop after reading this many lines.

    Returns a tuple (line_width, lines). On a length mismatch a message is
    printed to stderr and (None, None) is returned. For empty input a
    message is printed to stderr and the returned list is empty.
    """
    # A positive width on entry is an explicit truncation request.
    truncate = line_width > 0
    # None means "not known yet"; a sentinel is needed because an empty
    # first line legitimately fixes the expected width at 0.
    expected_width = line_width if truncate else None
    lines = []

    # The context manager closes the fileinput state on every exit path, so
    # an early return or break does not leave the module-global input
    # active.
    with fileinput.input(files=files) as stream:
        for lineno, line in enumerate(stream, start=1):
            line = line.rstrip()
            if truncate:
                line = line[:line_width]
            # Measured after truncation: longer lines are cut, not rejected.
            line_len = len(line)

            if expected_width is None:
                # set expected line width if unavailable
                expected_width = line_len
            elif line_len != expected_width:
                print("Line {} has a different length {} (expected {})"
                      .format(lineno, line_len, expected_width),
                      file=sys.stderr)
                return None, None

            lines.append(line)

            if lines_limit > 0 and lineno >= lines_limit:
                print("Stopped reading after {} lines".format(lineno),
                      file=sys.stderr)
                break

    if not lines:
        print("Input is empty", file=sys.stderr)

    if expected_width is None:
        # Nothing was read; hand back the width the caller passed in.
        expected_width = line_width
    return expected_width, lines
+
def count_chars(lines, line_width):
    """Find the most common character(s) in every column.

    lines      -- sequence of strings, each at least line_width long.
    line_width -- number of columns to examine.

    Returns a list with one tuple per column:
    (highest count, number of chars sharing that count, list of those chars).
    The chars are listed in order of first appearance in the column. When
    lines is empty every column yields (-1, 0, []).
    """
    top_counts = []

    for col in range(line_width):
        # Map each char seen in this column to its number of occurrences.
        col_ctr = {}
        for line in lines:
            char = line[col]
            col_ctr[char] = col_ctr.get(char, 0) + 1

        # -1 marks a column without any data (no input lines).
        top_count = max(col_ctr.values(), default=-1)
        top_chars = [char for char, count in col_ctr.items()
                     if count == top_count]
        top_counts.append((top_count, len(top_chars), top_chars))

    return top_counts
+
# Coloring states of a single cell:
# 0 = the whole column has the same single char (uncolored)
# 1 = this cell holds the single most common char of its column (green)
# 2 = this cell holds one of several equally common chars (yellow)
# 3 = this cell does not hold a most common char (red)
CELL_ALLSAME = 0
CELL_COMMON = 1
CELL_COMMON_SHARED = 2
CELL_OUTLIER = 3


def color_cells(lines, line_width, top_counts):
    """Classify every cell of the input by how common its char is.

    lines      -- sequence of strings, each at least line_width long.
    line_width -- number of columns to classify.
    top_counts -- per column a tuple (highest count, number of most common
                  chars, list of most common chars).

    Returns one list per line holding a CELL_* state for every column.
    """
    total_lines = len(lines)
    colormap = []

    for line in lines:
        row = []
        for col in range(line_width):
            max_count, common_count, common_chars = top_counts[col]

            if line[col] not in common_chars:
                # not common at all
                state = CELL_OUTLIER
            elif common_count != 1:
                # several chars tie for the highest count
                state = CELL_COMMON_SHARED
            elif max_count == total_lines:
                # every line has this char here
                state = CELL_ALLSAME
            else:
                state = CELL_COMMON
            row.append(state)
        colormap.append(row)

    return colormap
+
def mark_skippable(colormap, line_width, edge_threshold=5,
                   middle_threshold=10):
    """Find runs of columns that can be left out of the output.

    A column is a skip candidate when its cell in the first row of colormap
    is 0. A run of candidates is collapsed only when it is longer than:
      run at the begin of the line: N..[other]          edge_threshold
      run at the end of the line:   [other]..N          edge_threshold
      run in the middle:            [other]..N..[other] middle_threshold

    colormap         -- list of rows of cell states; only row 0 is read.
    line_width       -- number of columns.
    edge_threshold   -- minimum run length (exclusive) at begin/end.
    middle_threshold -- minimum run length (exclusive) in the middle.

    Returns a list with one entry per column:
      0  -- the column is not a skip candidate
      -1 -- the column is a skip candidate
      N  -- first column of a collapsible run of N candidates (N > 0)
    Candidates in runs that are too short keep the value -1.
    """
    skip_mask = [0] * line_width
    if not colormap:
        # No rows to inspect, so nothing can be marked.
        return skip_mask

    first_row = colormap[0]
    run_length = 0
    # Columns are walked right to left, so the first run seen is the one at
    # the end of the line.
    threshold = edge_threshold
    for col in reversed(range(line_width)):
        if first_row[col] == 0:
            # this column can maybe be skipped
            run_length += 1
            skip_mask[col] = -1
        else:
            # this column cannot be skipped; the run (if any) started at
            # the column right after it
            if run_length > threshold:
                skip_mask[col + 1] = run_length
            # Any run found from here on has a kept column on its right;
            # a run reaching column 0 is judged separately below.
            threshold = middle_threshold
            run_length = 0

    # at begin, are there any cells to skip?
    if run_length > edge_threshold:
        skip_mask[0] = run_length

    return skip_mask
+
def display_changes(lines, line_width, colormap, skip_mask, enable_skip):
    """Print every line with its cells colored according to colormap.

    lines       -- sequence of strings, each at least line_width long.
    line_width  -- number of columns to print.
    colormap    -- per line a list of CELL_* states, one per column.
    skip_mask   -- per column: N > 0 at the first column of a collapsible
                   run of N columns, -1 for a skip candidate, 0 otherwise.
    enable_skip -- when true, collapsible runs are replaced by a "..N.."
                   marker instead of being printed.

    Raises AssertionError when colormap holds an unknown state.
    """
    for lineno, line in enumerate(lines):
        pieces = []
        is_colored = False
        # Reset per line: a line that ends inside a collapsed run must not
        # make the next line swallow a short uncollapsed run at its start.
        skipping = False
        line_color = colormap[lineno]

        for col in range(line_width):
            # skip common cells?
            if enable_skip:
                if skipping and skip_mask[col] == -1:
                    # still inside the collapsed run
                    continue
                if skip_mask[col] > 0:
                    # N columns can be skipped (N = skip_mask[col])
                    pieces.append("\033[1;30m..{}..".format(skip_mask[col]))
                    is_colored = True
                    skipping = True
                    continue
                # end of skippable columns
                skipping = False

            # Every printed cell goes through the color logic, including
            # cells of short uncollapsed runs, so that a color set by the
            # previous cell is cleared and does not bleed onto them.
            coloring = line_color[col]
            if coloring == CELL_ALLSAME:
                if is_colored:
                    pieces.append(CLEAR_COLOR)
                    is_colored = False
            elif coloring == CELL_COMMON:
                pieces.append("\033[0;32m")  # green
                is_colored = True
            elif coloring == CELL_COMMON_SHARED:
                pieces.append("\033[0;33m")  # yellow
                is_colored = True
            elif coloring == CELL_OUTLIER:
                pieces.append("\033[1;31m")  # red
                is_colored = True
            else:
                raise AssertionError("Invalid coloring state")

            pieces.append(line[col])

        # Any colors must be cleared at the end of the line
        if is_colored:
            pieces.append(CLEAR_COLOR)

        print("".join(pieces))
+
def main(args):
    """Run the whole pipeline: read, count, classify, shorten and print.

    args -- object with the attributes files, line_width, lines_limit and
            enable_skip.

    Returns 1 when no lines could be read, 0 otherwise.
    """
    # Grab lines from stdin and/or the files named on the command line
    line_width, lines = read_lines(args.files, args.line_width,
                                   args.lines_limit)
    if not lines:
        return 1

    # Character frequency per column decides the color of every cell
    colormap = color_cells(lines, line_width,
                           count_chars(lines, line_width))

    # Shorten lines by cutting out common stuff, then show the colored input
    skip_mask = mark_skippable(colormap, line_width)
    display_changes(lines, line_width, colormap, skip_mask, args.enable_skip)
    return 0
+
if __name__ == "__main__":
    # Command line entry point: parse the options and exit with the status
    # returned by main().
    import argparse

    parser = argparse.ArgumentParser(
        description="Assume common lines, mark tiny differences")
    parser.add_argument(
        "-c", dest="line_width", metavar="LENGTH", type=int, default=0,
        help="Truncate input lines to LENGTH (default 0, unlimited)")
    parser.add_argument(
        "-l", dest="lines_limit", metavar="COUNT", type=int, default=0,
        help="Limit the number of lines to COUNT (default 0, unlimited)")
    parser.add_argument(
        "-s", dest="enable_skip", action="store_true",
        help="Enable skipping common columns")
    # Everything left on the command line is taken as input file names.
    parser.add_argument("files", nargs=argparse.REMAINDER)

    args = parser.parse_args()
    sys.exit(main(args))