#!/usr/bin/python
# Given lines of fixed length, mark uncommon chars
# Copyright (C) 2013 Peter Wu
# License: GPLv3+

import collections
import fileinput
import os
import sys

CLEAR_COLOR = "\033[m"


# Reads lines from files and/or stdin
def read_lines(files, line_width, lines_limit):
    """Read equally long lines from the given files (or stdin if none).

    files: file names handed to fileinput; an empty list means stdin.
    line_width: if > 0, every line is truncated to this many characters and
        must then be exactly that long. Otherwise nothing is truncated and
        the length of the first line becomes the expected width.
    lines_limit: if > 0, stop reading after this many lines.

    Returns (width, lines). On a length mismatch an error is printed to
    stderr and (None, None) is returned. On empty input a message is printed
    and (line_width, []) is returned.
    """
    lines = []
    # Width every line must have. It is only known up front when the caller
    # requested truncation; otherwise the first line read defines it. Keeping
    # this separate from line_width ensures that lines are never truncated
    # unless the caller asked for it, so longer lines are reported instead of
    # being silently cut to the length of the first line.
    expected_width = line_width if line_width > 0 else None

    # The context manager closes the input even when reading stops early.
    # Without it fileinput's module-level state stays active and a later
    # fileinput.input() call raises RuntimeError.
    with fileinput.input(files=files) as stream:
        for lineno, line in enumerate(stream, start=1):
            line = line.rstrip()
            if line_width > 0:
                line = line[:line_width]
            line_len = len(line)

            if expected_width is None:
                # first line: set expected line width
                expected_width = line_len
            elif line_len != expected_width:
                print("Line {} has a different length {} (expected {})"
                      .format(lineno, line_len, expected_width),
                      file=sys.stderr)
                return None, None
            lines.append(line)

            if lines_limit > 0 and lineno >= lines_limit:
                print("Stopped reading after {} lines".format(lineno),
                      file=sys.stderr)
                break

    if not lines:
        print("Input is empty", file=sys.stderr)
        return line_width, lines
    return expected_width, lines


def count_chars(lines, line_width):
    """Find the most common character(s) of every column.

    Returns a list with one tuple per column:
    (highest count, number of chars sharing that count, those chars).
    The chars are listed in order of first appearance in the column.
    """
    top_counts = []
    for col in range(line_width):
        # map counting the occurrences of each char in this column
        col_ctr = collections.Counter(line[col] for line in lines)
        # -1 mirrors the "nothing counted" marker for a column without rows
        top_count = max(col_ctr.values(), default=-1)
        top_chars = [char for char, count in col_ctr.items()
                     if count == top_count]
        top_counts.append((top_count, len(top_chars), top_chars))
    return top_counts


# Calculate which columns need to get colored.
# 0 = the whole column has the same single char (uncolored)
# 1 = this cell is the most common char (green)
# 2 = this cell is one of the most common chars (yellow)
# 3 = this cell is not most common (red)
CELL_ALLSAME = 0
CELL_COMMON = 1
CELL_COMMON_SHARED = 2
CELL_OUTLIER = 3


def color_cells(lines, line_width, top_counts):
    """Classify every cell as one of the CELL_* values.

    top_counts is the per-column result of count_chars(). Returns a list
    with one list of CELL_* values per line.
    """
    lines_count = len(lines)
    colormap = []
    for line in lines:
        line_color = [CELL_ALLSAME] * line_width
        for col in range(line_width):
            max_count, common_count, common_chars = top_counts[col]
            if line[col] not in common_chars:
                # not common at all
                line_color[col] = CELL_OUTLIER
            elif common_count != 1:
                # several chars share the highest count
                line_color[col] = CELL_COMMON_SHARED
            elif max_count != lines_count:
                # single most common char, but the column has changes
                line_color[col] = CELL_COMMON
            # otherwise the whole column is identical: keep CELL_ALLSAME
        colormap.append(line_color)
    return colormap


# Find columns that can be skipped
# common at begin:  N..[other] (threshold: 5)
# common at end:    [other]..N (threshold: 5)
# common in middle: [other]..N..[other] (threshold: 9)
def mark_skippable(colormap, line_width, context):
    """Build a per-column mask describing which columns may be hidden.

    Only the first row of colormap is inspected: a column that is
    CELL_ALLSAME there is CELL_ALLSAME in every row.

    Mask values: 0 = show the column, N > 0 = a run of N hidden columns
    starts here, -1 = column belongs to a run of identical columns (hidden
    only when it follows an N marker). context is the number of identical
    columns kept visible next to a differing column.
    """
    skip_mask = [0] * line_width
    if not colormap:
        # nothing to inspect, so nothing can be skipped
        return skip_mask
    first_row = colormap[0]

    edge_threshold = 5
    middle_threshold = 9

    skip_cols = 0
    threshold = edge_threshold
    is_right = True
    for col in reversed(range(line_width)):
        if first_row[col] == CELL_ALLSAME:
            # this column can maybe be skipped
            skip_cols += 1
            # If on the right, then there is no context to show. Otherwise,
            # reserve enough columns for the context.
            if is_right or skip_cols > context:
                skip_mask[col] = -1
            continue

        # This column cannot be skipped. A run in the middle needs context
        # on both of its sides, a run on the right only on its left side.
        skip_cols -= context if is_right else 2 * context

        # Insert skip marker if possible, otherwise ignore
        if skip_cols > threshold:
            # Display these columns...
            for i in range(1, context + 1):
                skip_mask[col + i] = 0
            # ... and insert the skip count after the context
            skip_mask[col + 1 + context] = skip_cols

        # Must be the middle now.
        is_right = False
        threshold = middle_threshold
        skip_cols = 0

    # At the begin, are there any cells to skip?
    # Keep context if there were differences on the right (i.e. if some
    # columns on the right were not skipped, but shown)
    if not is_right:
        skip_cols -= context
    if skip_cols > edge_threshold:
        skip_mask[0] = skip_cols
    return skip_mask


colors = {
    CELL_ALLSAME: CLEAR_COLOR,
    CELL_COMMON: "\033[0;32m",  # green
    CELL_COMMON_SHARED: "\033[0;33m",  # yellow
    CELL_OUTLIER: "\033[1;31m",  # red
}


# Display input, coloring changes
def display_changes(lines, line_width, colormap, skip_mask, enable_skip):
    """Print every line with ANSI colors taken from colormap.

    When enable_skip is true, runs marked in skip_mask are replaced by a
    "..N.." placeholder instead of being printed.
    """
    for lineno, line in enumerate(lines):
        pieces = []
        prev_color = CLEAR_COLOR
        line_color = colormap[lineno]
        skipping = False
        for col in range(line_width):
            # skip common cells?
            if enable_skip:
                mask = skip_mask[col]
                if skipping and mask == -1:
                    # still inside the run that is being skipped
                    continue
                if mask > 0:
                    # N columns can be skipped (N = skip_mask[col])
                    prev_color = "\033[1;30m"
                    pieces.append(prev_color + "..{}..".format(mask))
                    skipping = True
                    continue
                # end of skippable columns
                skipping = False

            # apply coloring, only emitting an escape code on a change
            new_color = colors[line_color[col]]
            if prev_color != new_color:
                pieces.append(new_color)
                prev_color = new_color
            pieces.append(line[col])

        # Any colors must be cleared at the end of the line
        if prev_color != CLEAR_COLOR:
            pieces.append(CLEAR_COLOR)
        print("".join(pieces))


def main(args):
    """Run the tool with parsed arguments; return the process exit status.

    args must provide: files, line_width, lines_limit, context, enable_skip.
    Returns 1 when no usable input was read, 0 otherwise.
    """
    # Grab lines from stdin and/or command line
    line_width, lines = read_lines(args.files, args.line_width,
                                   args.lines_limit)
    if not lines:
        return 1

    # Calculate character frequency per column
    top_counts = count_chars(lines, line_width)

    # Calculate coloring
    colormap = color_cells(lines, line_width, top_counts)

    # Shorten lines by cutting out common stuff
    skip_mask = mark_skippable(colormap, line_width, args.context)

    # Finally display the input, colored.
    display_changes(lines, line_width, colormap, skip_mask, args.enable_skip)
    return 0


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(
        description="Assume common lines, mark tiny differences")
    parser.add_argument("-C", required=False, type=int, default=0,
                        dest="context", metavar="NUM",
                        help="Number of columns to keep as context")
    parser.add_argument("-c", required=False, type=int, default=0,
                        dest="line_width", metavar="LENGTH",
                        help="Truncate input lines to LENGTH "
                             "(default 0, unlimited)")
    parser.add_argument("-l", required=False, type=int, default=0,
                        dest="lines_limit", metavar="COUNT",
                        help="Limit the number of lines to COUNT "
                             "(default 0, unlimited)")
    parser.add_argument("-s", required=False, action="store_true",
                        dest="enable_skip",
                        help="Enable skipping common columns")
    parser.add_argument("files", nargs=argparse.REMAINDER)
    args = parser.parse_args()
    if args.context < 0:
        # a negative context would corrupt the index math in mark_skippable
        parser.error("context must not be negative")

    try:
        status = main(args)
        # Flush inside the try block so that a closed pipe is detected here
        # rather than by the interpreter's implicit flush at shutdown.
        sys.stdout.flush()
    except BrokenPipeError:
        # ignore EPIPE for pipes like: thisScript.py | head
        # Point stdout at devnull so the flush done at interpreter exit
        # does not raise BrokenPipeError a second time.
        devnull = os.open(os.devnull, os.O_WRONLY)
        os.dup2(devnull, sys.stdout.fileno())
        status = 0
    sys.exit(status)