diff options
26 files changed, 955 insertions, 1459 deletions
diff --git a/doc/README.display_filter b/doc/README.display_filter index 348f88e202..db3ee6eb51 100644 --- a/doc/README.display_filter +++ b/doc/README.display_filter @@ -554,45 +554,51 @@ expression generator. How to add a new test to dfilter-test.py ======================================== -If you need a new packet, create an instance of Packet() -inside dfilter-test.py, and add its hexdump as you see in the -existing Packet instances. +Note: dfilter-test.py requires Python 2.7 -Add a method to run your tests to an existing class which is -a sub-class of Test, or add your own new sub-class of Test. -The 'tests' field in each class must list the test methods that -are to be called. Each test must return the result of: +"tools/dfilter-test.py" is the main test script. It includes +the test from files in tools/dftestlib. You can add a test +to a file in tools/dftestlib, or you can create a new file +in tools/dftestlib. If you do add a new file, you must +import it (and the class it defines) in "tools/dfilter-test.py" - self.DFilterCount(packet_object, dfilter_text, expected_count) +Each new test class must define "trace_file", which names +a capture file in "tools/dftestfiles". All the tests +run in that class will use that one capture file. -If you have added a new sub-class of Test, it must be added -to the global all_tests variable. +There are 2 methods you can use for testing: -Then, simply run "dfilter-test.py". You can run the tests -in a single Test sub-class by naming that sub-class on the -dfilter-test.py command-line, like: +assertDfilter(dfilter_text, expected_count) + + This will run the display filter through tshark, on the + file named by "trace_file", and assert that the + number of resulting packets equals "expected_count". This + also asserts that tshark does not fail; success with zero + matches is not the same as failure to compile the display + filter string. + +assertDFilterFail(dfilter_text) -$ ./tools/dfilter-test.py TVB -Note: Bytes test does not yet test FT_INT64. -Note: Scanner test does not yet test embedded double-quote. -TVB - ck_eq_1 ... OK - ck_slice_1 ... OK - ck_slice_2 ... OK - ck_slice_3 ... OK - ck_contains_1 ... OK - ck_contains_2 ... OK - ck_contains_3 ... OK - ck_contains_4 ... OK - ck_contains_5 ... OK + This will run tshark with the display filter, and + asser that tshark fails. This is useful when expecting + display filter syntax errors to be caught. + +Then, simply run "dfilter-test.py". You can run the tests +in a single Test class by naming that -class on the +dfilter-test.py command-line, or even run a single +test by naming it. E.g., the following are all valid ways +of running dfilter-test.py: +# Run all tests +$ ./tools/dfilter-test.py -Total Tests Run: 9 -Total Tests Succeeded: 9 -Total Tests Failed: 0 +# Run all tests in "testTVB" +$ ./tools/dfilter-test.py testTVB +# Run the the "test_contains_1" test from testTVB +$ ./tools/dfilter-test.py testTVB.test_contains_1 Note that dfilter-test.py should be run from the top of the Wireshark distribution, so it knows where to find the default -text2pcap and tshark executables. +tshark executable. diff --git a/tools/dfilter-test.py b/tools/dfilter-test.py index 0f65c9b19d..cd0f93ce6c 100755 --- a/tools/dfilter-test.py +++ b/tools/dfilter-test.py @@ -6,7 +6,7 @@ Test-suite to test wireshark's dfilter mechanism. # # $Id$ # -# Copyright (C) 2003 by Gilbert Ramirez <gram@alumni.rice.edu> +# Copyright (C) 2003-2013 by Gilbert Ramirez <gram@alumni.rice.edu> # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License @@ -24,1434 +24,25 @@ Test-suite to test wireshark's dfilter mechanism. import os import sys -import atexit -import tempfile import types -import getopt - -# Global variables that can be overridden by user - -REMOVE_TEMP_FILES = 1 -VERBOSE = 0 -TEXT2PCAP = os.path.join(".", "text2pcap") -TSHARK = os.path.join(".", "tshark") - -# Some DLT values. Add more from <net/bpf.h> if you need to. - -DLT_NULL = 0 # no link-layer encapsulation -DLT_EN10MB = 1 # Ethernet (10Mb) -DLT_EN3MB = 2 # Experimental Ethernet (3Mb) -DLT_AX25 = 3 # Amateur Radio AX.25 -DLT_PRONET = 4 # Proteon ProNET Token Ring -DLT_CHAOS = 5 # Chaos -DLT_IEEE802 = 6 # IEEE 802 Networks -DLT_ARCNET = 7 # ARCNET -DLT_SLIP = 8 # Serial Line IP -DLT_PPP = 9 # Point-to-point Protocol -DLT_FDDI = 10 # FDDI -DLT_FRELAY = 107 # Frame Relay - -################################################################################ - -class RunCommandError: - """The exception that run_cmd can produce.""" - pass - -def run_cmd(cmd): - """Run a command. 'cmd' is either a string or - a tuple/array of strings. Returns a tuple of - the output of the command and the return value. - If an error did not occur, the return value is None, not 0. - If an error occured while trying to run the command, - RunCommandError is raised. - Both, or either, the output and the return value, may - be None if RunCommandError is raised..""" - - if type(cmd) == types.TupleType: - cmd = ' '.join(cmd) - - output = None - error = None - - if VERBOSE: - print "Running", cmd - - try: - pipe = os.popen(cmd) - output = pipe.readlines() - error = pipe.close() - - except OSError: - raise RunCommandError - - return (output, error) - - -def remove_file(filename): - """Remove a file. No exceptions are produced even - when the file cannot be removed.""" - try: - os.remove(filename) - except OSError: - pass - - -class Packet: - """Knows how to convert a string representing the - hex-dump of packet into a libpcap file.""" - - def __init__(self, linklayer): - """Linklayer is a DLT value.""" - self.linklayer = linklayer - self.data = None - self.filename = None - self.time_fmt = None - - def Filename(self): - """Returns the filename of the packet trace. - The first time this is called, the libpcap trace - file is created. During subsequent calls, the libpcap - tracee file already exists, so the filename is simply - returned. Care is taken so that the libpcap trace file - is automatically deleted when this Python process - exits.""" - if not self.filename: - # Create the temporary text file. - hex_filename = tempfile.mktemp("-dfilter-test.txt") - - # Tell Python to remove the file when exiting - if REMOVE_TEMP_FILES: - atexit.register(remove_file, hex_filename) - - try: - hex_fh = open(hex_filename, "w") - hex_fh.write(self.data) - hex_fh.write("\n") - hex_fh.close() - except IOError, err: - sys.exit("Could not write to %s: %s" % \ - (hex_filename, err)) - - - # Create the pcap file - self.filename = tempfile.mktemp("-dfilter-test.cap") - - # Tell Python to remove the file when exiting - if REMOVE_TEMP_FILES: - atexit.register(remove_file, self.filename) - - cmd = (TEXT2PCAP, "-q -l", str(self.linklayer)) - - if self.time_fmt: - cmd = cmd + ("-t", "'" + self.time_fmt + "'") - - cmd = cmd + (hex_filename, self.filename) - - try: - (output, error) = run_cmd(cmd) - except RunCommandError: - sys.exit("Could not produce trace file.") - - if error != None: - sys.exit("Could not produce trace file.") - - - if not REMOVE_TEMP_FILES: - print "(", self.filename, ") ...", - - return self.filename - - -OK = 0 -FAILED = 1 - -class Test: - """Base class for test classes.""" - - def Run(self): - """Run the tests listed in self.tests. - Return the score.""" - - num_run = 0 - num_succeeded = 0 - - for test in self.tests: - print "\t", test.__name__ , "...", - retval = test(self) - if retval == OK: - print "OK" - num_succeeded += 1 - else: - print "FAILED" - num_run += 1 - - return (num_run, num_succeeded) - - - def DFilterCount(self, packet, dfilter, num_lines_expected): - """Run a dfilter on a packet file and expect - a certain number of output lines. If num_lines_expected - is None, then the tshark command is expected to fail - with a non-zero return value.""" - - packet_file = packet.Filename() - - cmd = (TSHARK, "-n -r", packet_file, "-R '", dfilter, "'") - - tshark_failed = 0 - - try: - (output, retval) = run_cmd(cmd) - except RunCommandError: - tshark_failed = 1 - -# print "GOT", len(output), "lines:", output, retval - - if retval: - tshark_failed = 1 - - if tshark_failed: - if num_lines_expected == None: - if VERBOSE: - print "\nGot:", output - return OK - else: - print "\nGot:", output - return FAILED - - elif len(output) == num_lines_expected: - if VERBOSE: - print "\nGot:", output - return OK - else: - print "\nGot:", output - return FAILED - - -################################################################################ -# Add packets here -# Watch out for trailing backslashes. If the last character in the line is a -# backslash, the data won't convert properly. Just remove the backslash or -# replace it with another character. I haven't determined if this is due to -# Python's "here-document" parsing, or due to text2pcap. -################################################################################ - -# IPX RIP Response -pkt_ipx_rip = Packet(DLT_EN10MB) -pkt_ipx_rip.data = """ -0000 ff ff ff ff ff ff 00 aa 00 a3 e3 a4 00 28 ff ff ........ .....(.. -0010 00 28 00 01 00 00 00 28 ff ff ff ff ff ff 04 53 .(.....( .......S -0020 00 00 00 28 00 aa 00 a3 e3 a4 04 53 00 02 39 17 ...(.... ...S..9. -0030 29 e2 00 01 00 02 00 00 00 00 00 00 )....... .... -""" - -# IPv6 -pkt_ipv6 = Packet(DLT_EN10MB) -pkt_ipv6.data = """ -0000 33 33 00 00 99 99 00 00 86 05 80 fa 86 dd 60 00 33...... ......`. -0010 00 00 00 20 00 01 fe 80 00 00 00 00 00 00 02 00 ... .... ........ -0020 86 ff fe 05 80 fa ff 05 00 00 00 00 00 00 00 00 ........ ........ -0030 00 00 00 00 99 99 3a 00 01 00 05 02 00 00 83 00 ......:. ........ -0040 44 ed 00 00 00 00 ff 05 00 00 00 00 00 00 00 00 D....... ........ -0050 00 00 00 00 99 99 ...... -""" - -# ARP -pkt_arp = Packet(DLT_FRELAY) -pkt_arp.data = """ -0000 18 41 03 00 80 00 00 00 08 06 00 0f 08 00 02 04 .A...... ........ -0010 00 08 00 00 0a ce 01 02 00 64 00 00 00 00 ........ .d.... -""" - -# NFS -pkt_nfs = Packet(DLT_FDDI) -pkt_nfs.time_fmt = "%Y-%m-%d %H:%M:%S." -pkt_nfs.data = """ -2002-12-31 07:55:31.3 -0000 51 10 00 d4 cd 59 6f 00 07 4a 01 6e 00 aa aa 03 Q....Yo. .J.n.... -0010 00 00 00 08 00 45 00 00 b4 1c cf 40 00 fc 11 a4 .....E.. ...@.... -0020 cd ac 19 64 0e c6 5f e6 14 03 ff 08 01 00 a0 79 ...d.._. .......y -0030 f9 7b 55 8a eb 00 00 00 00 00 00 00 02 00 01 86 .{U..... ........ -0040 a3 00 00 00 03 00 00 00 01 00 00 00 01 00 00 00 ........ ........ -0050 4c 36 db 91 97 00 00 00 0a 61 74 6d 63 6c 69 65 L6...... .atmclie -0060 6e 74 32 00 00 00 00 00 00 00 00 00 01 00 00 00 nt2..... ........ -0070 0b 00 00 00 01 00 00 00 00 00 00 00 02 00 00 00 ........ ........ -0080 03 00 00 00 04 00 00 00 05 00 00 00 06 00 00 00 ........ ........ -0090 07 00 00 00 08 00 00 00 09 00 00 00 0c 00 00 00 ........ ........ -00a0 00 00 00 00 00 00 00 00 20 21 92 13 00 a7 92 59 ........ !.....Y -00b0 07 20 00 00 00 00 02 4a 77 db b5 19 01 19 00 00 . .....J w....... -00c0 00 01 a4 06 00 97 1b 05 00 ........ . - -2002-12-31 07:55:32.0 -0000 51 00 07 4a 01 6e 00 10 00 d4 cd 59 6f aa aa 03 Q..J.n.. ...Yo... -0010 00 00 00 08 00 45 00 00 8c 6d 3c 00 00 40 11 50 .....E.. .m<..@.P -0020 89 c6 5f e6 14 ac 19 64 0e 08 01 03 ff 00 78 1d .._....d ......x. -0030 99 7b 55 8a eb 00 00 00 01 00 00 00 00 00 00 00 .{U..... ........ -0040 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ........ ........ -0050 01 00 00 01 ed 00 00 00 01 00 00 00 00 00 00 00 ........ ........ -0060 1e 00 00 00 00 00 04 07 60 00 00 00 00 00 04 20 ........ `...... -0070 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 ........ ........ -0080 19 00 00 00 00 00 02 4a 77 36 db 94 da 0c 84 5c .......J w6...... -0090 68 32 1e 28 e9 00 00 00 00 32 23 d4 10 0a 21 fe h2.(.... .2#...!. -00a0 80 -""" - -# NTP -pkt_ntp = Packet(DLT_EN10MB) -pkt_ntp.data = """ -0000 08 00 2b 91 e8 3a 08 00 2b e4 c4 43 08 00 45 00 ..+..:.. +..C..E. -0010 00 4c 64 4c 00 00 1e 11 02 47 82 dc 18 3e 82 dc .LdL.... .G...>.. -0020 18 18 00 7b 00 7b 00 38 ee 1c 1b 04 06 f5 00 00 ...{.{.8 ........ -0030 10 0d 00 00 05 57 82 dc 18 18 ba 29 66 36 7d d0 .....W.. ...)f6}. -0040 00 00 ba 29 66 36 7d 58 40 00 ba 29 66 36 7d d0 ...)f6}X @..)f6}. -0050 00 00 ba 29 66 76 7d 50 50 00 ...)fv}P P. -""" - - -# HTTP -pkt_http = Packet(DLT_EN10MB) -pkt_http.time_fmt = "%Y-%m-%d %H:%M:%S." -pkt_http.data = """ -2002-12-31 07:55:31.3 -0000 00 e0 81 00 b0 28 00 09 6b 88 f5 c9 08 00 45 00 .....(.. k.....E. -0010 00 c1 d2 49 40 00 80 06 c8 5b 0a 00 00 05 cf 2e ...I@... .[...... -0020 86 5e 0c c3 00 50 a8 00 76 87 7d e0 14 02 50 18 .^...P.. v.}...P. -0030 fa f0 ad 62 00 00 48 45 41 44 20 2f 76 34 2f 69 ...b..HE AD /v4/i -0040 75 69 64 65 6e 74 2e 63 61 62 3f 30 33 30 37 30 uident.c ab?03070 -0050 31 31 32 30 38 20 48 54 54 50 2f 31 2e 31 0d 0a 11208 HT TP/1.1.. -0060 41 63 63 65 70 74 3a 20 2a 2f 2a 0d 0a 55 73 65 Accept: */*..Use -0070 72 2d 41 67 65 6e 74 3a 20 49 6e 64 75 73 74 72 r-Agent: Industr -0080 79 20 55 70 64 61 74 65 20 43 6f 6e 74 72 6f 6c y Update Control -0090 0d 0a 48 6f 73 74 3a 20 77 69 6e 64 6f 77 73 75 ..Host: windowsu -00a0 70 64 61 74 65 2e 6d 69 63 72 6f 73 6f 66 74 2e pdate.mi crosoft. -00b0 63 6f 6d 0d 0a 43 6f 6e 6e 65 63 74 69 6f 6e 3a com..Con nection: -00c0 20 4b 65 65 70 2d 41 6c 69 76 65 0d 0a 0d 0a Keep-Al ive.... -""" - - -# TFTP -pkt_tftp = Packet(DLT_IEEE802) -pkt_tftp.data = """ -0000 10 40 00 20 35 01 2b 59 00 06 29 17 93 f8 aa aa .@. 5.+Y ..)..... -0010 03 00 00 00 08 00 45 00 00 37 f9 39 00 00 40 11 ......E. .7.9..@. -0020 a6 db c0 a8 2c 7b c0 a8 2c d5 f9 39 00 45 00 23 ....,{.. ,..9.E.# -0030 8d 73 00 01 43 3a 5c 49 42 4d 54 43 50 49 50 5c .s..C:\I BMTCPIP. -0040 6c 63 63 6d 2e 31 00 6f 63 74 65 74 00 lccm.1.o ctet. -""" - - -################################################################################ -# Add tests here -################################################################################ - -class Bytes(Test): - """Tests routines in ftype-bytes.c""" - - def __init__(self): - print "Note: Bytes test does not yet test FT_INT64." - - def ck_eq_1(self): - return self.DFilterCount(pkt_ipx_rip, - "eth.dst == ff:ff:ff:ff:ff:ff", 1) - - def ck_eq_2(self): - return self.DFilterCount(pkt_ipx_rip, - "eth.src == ff:ff:ff:ff:ff:ff", 0) - - def ck_ne_1(self): - return self.DFilterCount(pkt_ipx_rip, - "eth.dst != ff:ff:ff:ff:ff:ff", 0) - - def ck_ne_2(self): - return self.DFilterCount(pkt_ipx_rip, - "eth.src != ff:ff:ff:ff:ff:ff", 1) - - def ck_gt_1(self): - return self.DFilterCount(pkt_ipx_rip, - "eth.src > 00:aa:00:a3:e3:ff", 0) - - def ck_gt_2(self): - return self.DFilterCount(pkt_ipx_rip, - "eth.src > 00:aa:00:a3:e3:a4", 0) - - def ck_gt_3(self): - return self.DFilterCount(pkt_ipx_rip, - "eth.src > 00:aa:00:a3:e3:00", 1) - - def ck_ge_1(self): - return self.DFilterCount(pkt_ipx_rip, - "eth.src >= 00:aa:00:a3:e3:ff", 0) - - def ck_ge_2(self): - return self.DFilterCount(pkt_ipx_rip, - "eth.src >= 00:aa:00:a3:e3:a4", 1) - - def ck_ge_3(self): - return self.DFilterCount(pkt_ipx_rip, - "eth.src >= 00:aa:00:a3:e3:00", 1) - - def ck_lt_1(self): - return self.DFilterCount(pkt_ipx_rip, - "eth.src < 00:aa:00:a3:e3:ff", 1) - - def ck_lt_2(self): - return self.DFilterCount(pkt_ipx_rip, - "eth.src < 00:aa:00:a3:e3:a4", 0) - - def ck_lt_3(self): - return self.DFilterCount(pkt_ipx_rip, - "eth.src < 00:aa:00:a3:e3:00", 0) - - def ck_le_1(self): - return self.DFilterCount(pkt_ipx_rip, - "eth.src <= 00:aa:00:a3:e3:ff", 1) - - def ck_le_2(self): - return self.DFilterCount(pkt_ipx_rip, - "eth.src <= 00:aa:00:a3:e3:a4", 1) - - def ck_le_3(self): - return self.DFilterCount(pkt_ipx_rip, - "eth.src <= 00:aa:00:a3:e3:00", 0) - - def ck_slice_1(self): - return self.DFilterCount(pkt_ipx_rip, - "eth.src[0:3] == 00:aa:00", 1) - - def ck_slice_2(self): - return self.DFilterCount(pkt_ipx_rip, - "eth.src[-3:3] == a3:e3:a4", 1) - - def ck_slice_3(self): - return self.DFilterCount(pkt_ipx_rip, - "eth.src[1:4] == aa:00:a3:e3", 1) - - def ck_slice_4(self): - return self.DFilterCount(pkt_ipx_rip, - "eth.src[0] == 00", 1) - - def ck_ipv6_1(self): - return self.DFilterCount(pkt_ipv6, - "ipv6.dst == ff05::9999", 1) - - def ck_ipv6_2(self): - return self.DFilterCount(pkt_ipv6, - "ipv6.dst == ff05::9990", 0) - - # ck_eq_1 checks FT_ETHER; this checks FT_BYTES - def ck_bytes_1(self): - return self.DFilterCount(pkt_arp, - "arp.dst.hw == 00:64", 1) - - # ck_eq_2 checks FT_ETHER; this checks FT_BYTES - def ck_bytes_2(self): - return self.DFilterCount(pkt_arp, - "arp.dst.hw == 00:00", 0) - - # ck_eq_1 checks FT_ETHER; this checks FT_UINT64 - def ck_uint64_1(self): - return self.DFilterCount(pkt_nfs, - "nfs.fattr3.size == 264032", 1) - - # ck_eq_2 checks FT_ETHER; this checks FT_UINT64 - def ck_uint64_2(self): - return self.DFilterCount(pkt_nfs, - "nfs.fattr3.size == 264000", 0) - - def ck_contains_1(self): - return self.DFilterCount(pkt_ipx_rip, - "ipx.src.node contains a3", 1) - - def ck_contains_2(self): - return self.DFilterCount(pkt_ipx_rip, - "ipx.src.node contains a3:e3", 1) - - def ck_contains_3(self): - return self.DFilterCount(pkt_ipx_rip, - "ipx.src.node contains 00:aa:00:a3:e3:a4", 1) - - def ck_contains_4(self): - return self.DFilterCount(pkt_ipx_rip, - "ipx.src.node contains aa:e3", 0) - - - tests = [ - ck_eq_1, - ck_eq_2, - ck_ne_1, - ck_ne_2, - ck_gt_1, - ck_gt_2, - ck_gt_3, - ck_ge_1, - ck_ge_2, - ck_ge_3, - ck_lt_1, - ck_lt_2, - ck_lt_3, - ck_le_1, - ck_le_2, - ck_le_3, - ck_slice_1, - ck_slice_2, - ck_slice_3, - ck_slice_4, - ck_ipv6_1, - ck_ipv6_2, - ck_bytes_1, - ck_bytes_2, - ck_uint64_1, - ck_uint64_2, - ck_contains_1, - ck_contains_2, - ck_contains_3, - ck_contains_4, - ] - - -class Double(Test): - """Tests routines in ftype-double.c""" - - def ck_eq_1(self): - # This works on ia32/Linux - # http://www.cslab.vt.edu/manuals/glibc-2.2.3/html_node/libc_673.html - return self.DFilterCount(pkt_ntp, - "ntp.rootdelay == 0.0626983642578125", 1) - - def ck_eq_2(self): - return self.DFilterCount(pkt_ntp, - "ntp.rootdelay == 0.0626", 0) - - def ck_gt_1(self): - return self.DFilterCount(pkt_ntp, - "ntp.rootdelay > 1.0626", 0) - - def ck_gt_2(self): - return self.DFilterCount(pkt_ntp, - "ntp.rootdelay > 0.0626983642578125", 0) - - def ck_gt_3(self): - return self.DFilterCount(pkt_ntp, - "ntp.rootdelay > 0.0026", 1) - - def ck_ge_1(self): - return self.DFilterCount(pkt_ntp, - "ntp.rootdelay >= 1.0626", 0) - - def ck_ge_2(self): - return self.DFilterCount(pkt_ntp, - "ntp.rootdelay >= 0.0626983642578125", 1) - - def ck_ge_3(self): - return self.DFilterCount(pkt_ntp, - "ntp.rootdelay > 0.0026", 1) - - def ck_lt_1(self): - return self.DFilterCount(pkt_ntp, - "ntp.rootdelay < 1.0626", 1) - - def ck_lt_2(self): - return self.DFilterCount(pkt_ntp, - "ntp.rootdelay < 0.0626983642578125", 0) - - def ck_lt_3(self): - return self.DFilterCount(pkt_ntp, - "ntp.rootdelay < 0.0026", 0) - - def ck_le_1(self): - return self.DFilterCount(pkt_ntp, - "ntp.rootdelay <= 1.0626", 1) - - def ck_le_2(self): - return self.DFilterCount(pkt_ntp, - "ntp.rootdelay <= 0.0626983642578125", 1) - - def ck_le_3(self): - return self.DFilterCount(pkt_ntp, - "ntp.rootdelay <= 0.0026", 0) - - - tests = [ - ck_eq_1, - ck_eq_2, - ck_gt_1, - ck_gt_2, - ck_gt_3, - ck_ge_1, - ck_ge_2, - ck_ge_3, - ck_lt_1, - ck_lt_2, - ck_lt_3, - ck_le_1, - ck_le_2, - ck_le_3, - ] - -class Integer(Test): - """Tests routines in ftype-integer.c""" - - def ck_eq_1(self): - return self.DFilterCount(pkt_ntp, - "ip.version == 4", 1) - - def ck_eq_2(self): - return self.DFilterCount(pkt_ntp, - "ip.version == 6", 0) - - def ck_ne_1(self): - return self.DFilterCount(pkt_ntp, - "ip.version != 0", 1) - - def ck_ne_2(self): - return self.DFilterCount(pkt_ntp, - "ip.version != 4", 0) - - def ck_u_gt_1(self): - return self.DFilterCount(pkt_ntp, - "ip.version > 3", 1) - - def ck_u_gt_2(self): - return self.DFilterCount(pkt_ntp, - "ip.version > 4", 0) - - def ck_u_gt_3(self): - return self.DFilterCount(pkt_ntp, - "ip.version > 5", 0) - - def ck_u_ge_1(self): - return self.DFilterCount(pkt_ntp, - "ip.version >= 3", 1) - - def ck_u_ge_2(self): - return self.DFilterCount(pkt_ntp, - "ip.version >= 4", 1) - - def ck_u_ge_3(self): - return self.DFilterCount(pkt_ntp, - "ip.version >= 5", 0) - - def ck_u_lt_1(self): - return self.DFilterCount(pkt_ntp, - "ip.version < 3", 0) - - def ck_u_lt_2(self): - return self.DFilterCount(pkt_ntp, - "ip.version < 4", 0) - - def ck_u_lt_3(self): - return self.DFilterCount(pkt_ntp, - "ip.version < 5", 1) - - def ck_u_le_1(self): - return self.DFilterCount(pkt_ntp, - "ip.version <= 3", 0) - - def ck_u_le_2(self): - return self.DFilterCount(pkt_ntp, - "ip.version <= 4", 1) - - def ck_u_le_3(self): - return self.DFilterCount(pkt_ntp, - "ip.version <= 5", 1) - - def ck_s_gt_1(self): - return self.DFilterCount(pkt_ntp, - "ntp.precision > -12", 1) - - def ck_s_gt_2(self): - return self.DFilterCount(pkt_ntp, - "ntp.precision > -11", 0) - - def ck_s_gt_3(self): - return self.DFilterCount(pkt_ntp, - "ntp.precision > -10", 0) - - def ck_s_ge_1(self): - return self.DFilterCount(pkt_ntp, - "ntp.precision >= -12", 1) - - def ck_s_ge_2(self): - return self.DFilterCount(pkt_ntp, - "ntp.precision >= -11", 1) - - def ck_s_ge_3(self): - return self.DFilterCount(pkt_ntp, - "ntp.precision >= -10", 0) - - def ck_s_lt_1(self): - return self.DFilterCount(pkt_ntp, - "ntp.precision < -12", 0) - - def ck_s_lt_2(self): - return self.DFilterCount(pkt_ntp, - "ntp.precision < -11", 0) - - def ck_s_lt_3(self): - return self.DFilterCount(pkt_ntp, - "ntp.precision < -10", 1) - - def ck_s_le_1(self): - return self.DFilterCount(pkt_ntp, - "ntp.precision <= -12", 0) - - def ck_s_le_2(self): - return self.DFilterCount(pkt_ntp, - "ntp.precision <= -11", 1) - - def ck_s_le_3(self): - return self.DFilterCount(pkt_ntp, - "ntp.precision <= -10", 1) - - def ck_bool_eq_1(self): - return self.DFilterCount(pkt_ntp, - "ip.flags.df == 0", 1) - - def ck_bool_eq_2(self): - return self.DFilterCount(pkt_ntp, - "ip.flags.df == 1", 0) - - def ck_bool_ne_1(self): - return self.DFilterCount(pkt_ntp, - "ip.flags.df != 1", 1) - - def ck_bool_ne_2(self): - return self.DFilterCount(pkt_ntp, - "ip.flags.df != 0", 0) - - def ck_ipx_1(self): - return self.DFilterCount(pkt_ipx_rip, - "ipx.src.net == 0x28", 1) - - def ck_ipx_2(self): - return self.DFilterCount(pkt_ipx_rip, - "ipx.src.net == 0x29", 0) - - - tests = [ - ck_eq_1, - ck_eq_2, - ck_ne_1, - ck_ne_2, - ck_u_gt_1, - ck_u_gt_2, - ck_u_gt_3, - ck_u_ge_1, - ck_u_ge_2, - ck_u_ge_3, - ck_u_lt_1, - ck_u_lt_2, - ck_u_lt_3, - ck_u_le_1, - ck_u_le_2, - ck_u_le_3, - ck_s_gt_1, - ck_s_gt_2, - ck_s_gt_3, - ck_s_ge_1, - ck_s_ge_2, - ck_s_ge_3, - ck_s_lt_1, - ck_s_lt_2, - ck_s_lt_3, - ck_s_le_1, - ck_s_le_2, - ck_s_le_3, - ck_bool_eq_1, - ck_bool_eq_2, - ck_bool_ne_1, - ck_bool_ne_2, - ck_ipx_1, - ck_ipx_2, - ] - -class IPv4(Test): - """Tests routines in ftype-ipv4.c""" - - def ck_eq_1(self): - return self.DFilterCount(pkt_nfs, - "ip.src == 172.25.100.14", 1) - - def ck_eq_2(self): - return self.DFilterCount(pkt_nfs, - "ip.src == 255.255.255.255", 0) - - def ck_ne_1(self): - return self.DFilterCount(pkt_nfs, - "ip.src != 172.25.100.14", 1) - - def ck_ne_2(self): - return self.DFilterCount(pkt_nfs, - "ip.src != 255.255.255.255", 2) - - def ck_gt_1(self): - return self.DFilterCount(pkt_nfs, - "ip.dst > 198.95.230.200", 0) - - def ck_gt_2(self): - return self.DFilterCount(pkt_nfs, - "ip.dst > 198.95.230.20", 0) - - def ck_gt_3(self): - return self.DFilterCount(pkt_nfs, - "ip.dst > 198.95.230.10", 1) - - def ck_ge_1(self): - return self.DFilterCount(pkt_nfs, - "ip.dst >= 198.95.230.200", 0) - - def ck_ge_2(self): - return self.DFilterCount(pkt_nfs, - "ip.dst >= 198.95.230.20", 1) - - def ck_ge_3(self): - return self.DFilterCount(pkt_nfs, - "ip.dst >= 198.95.230.10", 1) - - def ck_lt_1(self): - return self.DFilterCount(pkt_nfs, - "ip.src < 172.25.100.140", 1) - - def ck_lt_2(self): - return self.DFilterCount(pkt_nfs, - "ip.src < 172.25.100.14", 0) - - def ck_lt_3(self): - return self.DFilterCount(pkt_nfs, - "ip.src < 172.25.100.10", 0) - - def ck_le_1(self): - return self.DFilterCount(pkt_nfs, - "ip.src <= 172.25.100.140", 1) - - def ck_le_2(self): - return self.DFilterCount(pkt_nfs, - "ip.src <= 172.25.100.14", 1) - - def ck_le_3(self): - return self.DFilterCount(pkt_nfs, - "ip.src <= 172.25.100.10", 0) - - def ck_cidr_eq_1(self): - return self.DFilterCount(pkt_nfs, - "ip.src == 172.25.100.14/32", 1) - - def ck_cidr_eq_2(self): - return self.DFilterCount(pkt_nfs, - "ip.src == 172.25.100.0/24", 1) - - def ck_cidr_eq_3(self): - return self.DFilterCount(pkt_nfs, - "ip.src == 172.25.0.0/16", 1) - - def ck_cidr_eq_4(self): - return self.DFilterCount(pkt_nfs, - "ip.src == 172.0.0.0/8", 1) - - def ck_cidr_ne_1(self): - return self.DFilterCount(pkt_nfs, - "ip.src != 172.25.100.14/32", 1) - - def ck_cidr_ne_2(self): - return self.DFilterCount(pkt_nfs, - "ip.src != 172.25.100.0/24", 1) - - def ck_cidr_ne_3(self): - return self.DFilterCount(pkt_nfs, - "ip.src != 172.25.0.0/16", 1) - - def ck_cidr_ne_4(self): - return self.DFilterCount(pkt_nfs, - "ip.src != 200.0.0.0/8", 2) - - tests = [ - ck_eq_1, - ck_eq_2, - ck_ne_1, - ck_ne_2, - ck_gt_1, - ck_gt_2, - ck_gt_3, - ck_ge_1, - ck_ge_2, - ck_ge_3, - ck_lt_1, - ck_lt_2, - ck_lt_3, - ck_le_1, - ck_le_2, - ck_le_3, - ck_cidr_eq_1, - ck_cidr_eq_2, - ck_cidr_eq_3, - ck_cidr_eq_4, - ck_cidr_ne_1, - ck_cidr_ne_2, - ck_cidr_ne_3, - ck_cidr_ne_4, - ] - -class String(Test): - """Tests routines in ftype-string.c""" - - def ck_eq_1(self): - return self.DFilterCount(pkt_http, - 'http.request.method == "HEAD"', 1) - - def ck_eq_2(self): - return self.DFilterCount(pkt_http, - 'http.request.method == "POST"', 0) - - def ck_gt_1(self): - return self.DFilterCount(pkt_http, - 'http.request.method > "HEAC"', 1) - - def ck_gt_2(self): - return self.DFilterCount(pkt_http, - 'http.request.method > "HEAD"', 0) - - def ck_gt_3(self): - return self.DFilterCount(pkt_http, - 'http.request.method > "HEAE"', 0) - - def ck_ge_1(self): - return self.DFilterCount(pkt_http, - 'http.request.method >= "HEAC"', 1) - - def ck_ge_2(self): - return self.DFilterCount(pkt_http, - 'http.request.method >= "HEAD"', 1) - - def ck_ge_3(self): - return self.DFilterCount(pkt_http, - 'http.request.method >= "HEAE"', 0) - - def ck_lt_1(self): - return self.DFilterCount(pkt_http, - 'http.request.method < "HEAC"', 0) - - def ck_lt_2(self): - return self.DFilterCount(pkt_http, - 'http.request.method < "HEAD"', 0) - - def ck_lt_3(self): - return self.DFilterCount(pkt_http, - 'http.request.method < "HEAE"', 1) - - def ck_le_1(self): - return self.DFilterCount(pkt_http, - 'http.request.method <= "HEAC"', 0) - - def ck_le_2(self): - return self.DFilterCount(pkt_http, - 'http.request.method <= "HEAD"', 1) - - def ck_le_3(self): - return self.DFilterCount(pkt_http, - 'http.request.method <= "HEAE"', 1) - - # XXX - this isn't handled in wireshark yet - def ck_slice_1(self): - return self.DFilterCount(pkt_http, - 'http.request.method[0] == "H"', 1) - - def ck_slice_2(self): - return self.DFilterCount(pkt_http, - 'http.request.method[0] == "P"', 0) - - def ck_slice_3(self): - return self.DFilterCount(pkt_http, - 'http.request.method[0:4] == "HEAD"', 1) - - def ck_slice_4(self): - return self.DFilterCount(pkt_http, - 'http.request.method[0:4] != "HEAD"', 0) - - def ck_slice_5(self): - return self.DFilterCount(pkt_http, - 'http.request.method[1:2] == "EA"', 1) - - def ck_slice_6(self): - return self.DFilterCount(pkt_http, - 'http.request.method[1:2] > "EA"', 0) - - def ck_slice_7(self): - return self.DFilterCount(pkt_http, - 'http.request.method[-1] == "D"', 1) - - def ck_slice_8(self): - return self.DFilterCount(pkt_http, - 'http.request.method[-2] == "D"', 0) - - def ck_stringz_1(self): - return self.DFilterCount(pkt_tftp, - 'tftp.type == "octet"', 1) - - def ck_stringz_2(self): - return self.DFilterCount(pkt_tftp, - 'tftp.type == "junk"', 0) - - def ck_contains_1(self): - return self.DFilterCount(pkt_http, - 'http.request.method contains "E"', 1) - - def ck_contains_2(self): - return self.DFilterCount(pkt_http, - 'http.request.method contains "EA"', 1) - - def ck_contains_3(self): - return self.DFilterCount(pkt_http, - 'http.request.method contains "HEAD"', 1) - - def ck_contains_4(self): - return self.DFilterCount(pkt_http, - 'http.request.method contains "POST"', 0) - - def ck_contains_5(self): - return self.DFilterCount(pkt_http, - 'http.request.method contains 50:4f:53:54"', None) # "POST" - - def ck_contains_6(self): - return self.DFilterCount(pkt_http, - 'http.request.method contains 48:45:41:44"', 1) # "HEAD" - - def ck_contains_fail_0(self): - return self.DFilterCount(pkt_http, - 'http.user_agent contains "update"', 0) - - def ck_contains_fail_1(self): - return self.DFilterCount(pkt_http, - 'http.user_agent contains "UPDATE"', 0) - - def ck_contains_upper_0(self): - return self.DFilterCount(pkt_http, - 'upper(http.user_agent) contains "UPDATE"', 1) - - def ck_contains_upper_1(self): - return self.DFilterCount(pkt_http, - 'upper(http.user_agent) contains "update"', 0) - - def ck_contains_upper_2(self): - return self.DFilterCount(pkt_http, - 'upper(tcp.seq) == 4', None) - - def ck_contains_lower_0(self): - return self.DFilterCount(pkt_http, - 'lower(http.user_agent) contains "UPDATE"', 0) - - def ck_contains_lower_1(self): - return self.DFilterCount(pkt_http, - 'lower(http.user_agent) contains "update"', 1) - - def ck_contains_lower_2(self): - return self.DFilterCount(pkt_http, - 'lower(tcp.seq) == 4', None) - - - tests = [ - ck_eq_1, - ck_eq_2, - ck_gt_1, - ck_gt_2, - ck_gt_3, - ck_ge_1, - ck_ge_2, - ck_ge_3, - ck_lt_1, - ck_lt_2, - ck_lt_3, - ck_le_1, - ck_le_2, - ck_le_3, -# XXX -# ck_slice_1, -# ck_slice_2, -# ck_slice_3, -# ck_slice_4, -# ck_slice_5, -# ck_slice_6, -# ck_slice_7, -# ck_slice_8, - ck_stringz_1, - ck_stringz_2, - ck_contains_1, - ck_contains_2, - ck_contains_3, - ck_contains_4, - ck_contains_5, - ck_contains_fail_0, - ck_contains_fail_1, - ck_contains_upper_0, - ck_contains_upper_1, - ck_contains_upper_2, - ck_contains_lower_0, - ck_contains_lower_1, - ck_contains_lower_2, - ] - - -class Time(Test): - """Tests routines in ftype-time.c""" - - def ck_eq_1(self): - return self.DFilterCount(pkt_http, - 'frame.time == "Dec 31, 2002 07:55:31.3"', 1) - - def ck_eq_2(self): - return self.DFilterCount(pkt_http, - 'frame.time == "Jan 31, 2002 07:55:31.3"', 0) - - def ck_ne_1(self): - return self.DFilterCount(pkt_http, - 'frame.time != "Dec 31, 2002 07:55:31.3"', 0) - - def ck_ne_2(self): - return self.DFilterCount(pkt_http, - 'frame.time != "Jan 31, 2002 07:55:31.3"', 1) - - def ck_gt_1(self): - return self.DFilterCount(pkt_http, - 'frame.time > "Dec 31, 2002 07:54:31.3"', 1) - - def ck_gt_2(self): - return self.DFilterCount(pkt_http, - 'frame.time > "Dec 31, 2002 07:55:31.3"', 0) - - def ck_gt_3(self): - return self.DFilterCount(pkt_http, - 'frame.time > "Dec 31, 2002 07:56:31.3"', 0) - - def ck_ge_1(self): - return self.DFilterCount(pkt_http, - 'frame.time >= "Dec 31, 2002 07:54:31.3"', 1) - - def ck_ge_2(self): - return self.DFilterCount(pkt_http, - 'frame.time >= "Dec 31, 2002 07:55:31.3"', 1) - - def ck_ge_3(self): - return self.DFilterCount(pkt_http, - 'frame.time >= "Dec 31, 2002 07:56:31.3"', 0) - - def ck_lt_1(self): - return self.DFilterCount(pkt_http, - 'frame.time < "Dec 31, 2002 07:54:31.3"', 0) - - def ck_lt_2(self): - return self.DFilterCount(pkt_http, - 'frame.time < "Dec 31, 2002 07:55:31.3"', 0) - - def ck_lt_3(self): - return self.DFilterCount(pkt_http, - 'frame.time < "Dec 31, 2002 07:56:31.3"', 1) - - def ck_le_1(self): - return self.DFilterCount(pkt_http, - 'frame.time <= "Dec 31, 2002 07:54:31.3"', 0) - - def ck_le_2(self): - return self.DFilterCount(pkt_http, - 'frame.time <= "Dec 31, 2002 07:55:31.3"', 1) - - def ck_le_3(self): - return self.DFilterCount(pkt_http, - 'frame.time <= "Dec 31, 2002 07:56:31.3"', 1) - - def ck_relative_time_1(self): - return self.DFilterCount(pkt_nfs, - "frame.time_delta == 0.7", 1) - - def ck_relative_time_2(self): - return self.DFilterCount(pkt_nfs, - "frame.time_delta > 0.7", 0) - - def ck_relative_time_3(self): - return self.DFilterCount(pkt_nfs, - "frame.time_delta < 0.7", 1) - - tests = [ - ck_eq_1, - ck_eq_2, - ck_ne_1, - ck_ne_2, - ck_gt_1, - ck_gt_2, - ck_gt_3, - ck_ge_1, - ck_ge_2, - ck_ge_3, - ck_lt_1, - ck_lt_2, - ck_lt_3, - ck_le_1, - ck_le_2, - ck_le_3, - ck_relative_time_1, - ck_relative_time_2, - ck_relative_time_3, - ] - -class TVB(Test): - """Tests routines in ftype-tvb.c""" - - def ck_eq_1(self): - # We expect 0 because even though this byte - # string matches the 'eth' protocol, protocols cannot - # work in an '==' comparison yet. - return self.DFilterCount(pkt_http, - "eth == 00:e0:81:00:b0:28:00:09:6b:88:f6:c9:08:00", 0) - - def ck_slice_1(self): - return self.DFilterCount(pkt_http, - "ip[0:2] == 45:00", 1) - - def ck_slice_2(self): - return self.DFilterCount(pkt_http, - "ip[0:2] == 00:00", 0) - - def ck_slice_3(self): - return self.DFilterCount(pkt_http, - "ip[2:2] == 00:c1", 1) - - # These don't work yet in Wireshark - def ck_slice_4(self): - return self.DFilterCount(pkt_http, - "ip[-5] == 0x86", 1) - - def ck_slice_5(self): - return self.DFilterCount(pkt_http, - "ip[-1] == 0x86", 0) - - - def ck_contains_1(self): - return self.DFilterCount(pkt_http, - "eth contains 6b", 1) - - def ck_contains_2(self): - return self.DFilterCount(pkt_http, - "eth contains 09:6b:88", 1) - - def ck_contains_3(self): - return self.DFilterCount(pkt_http, - "eth contains 00:e0:81:00:b0:28:00:09:6b:88:f5:c9:08:00", 1) - - def ck_contains_4(self): - return self.DFilterCount(pkt_http, - "eth contains ff:ff:ff", 0) - - def ck_contains_5(self): - return self.DFilterCount(pkt_http, - 'http contains "HEAD"', 1) - - - tests = [ - ck_eq_1, - - ck_slice_1, - ck_slice_2, - ck_slice_3, -# XXX -# ck_slice_4, -# ck_slice_5, - ck_contains_1, - ck_contains_2, - ck_contains_3, - ck_contains_4, - ck_contains_5, - ] - - -class Scanner(Test): - """Tests routines in scanner.l""" - - def __init__(self): - print "Note: Scanner test does not yet test embedded double-quote." - - def ck_dquote_1(self): - return self.DFilterCount(pkt_http, - 'http.request.method == "HEAD"', 1) - - def ck_dquote_2(self): - return self.DFilterCount(pkt_http, - 'http.request.method == "\\x48EAD"', 1) - - def ck_dquote_3(self): - return self.DFilterCount(pkt_http, - 'http.request.method == "\\x58EAD"', 0) - - def ck_dquote_4(self): - return self.DFilterCount(pkt_http, - 'http.request.method == "\\110EAD"', 1) - - def ck_dquote_5(self): - return self.DFilterCount(pkt_http, - 'http.request.method == "\\111EAD"', 0) - - def ck_dquote_6(self): - return self.DFilterCount(pkt_http, - 'http.request.method == "\\HEAD"', 1) - - tests = [ - ck_dquote_1, - ck_dquote_2, - ck_dquote_3, - ck_dquote_4, - ck_dquote_5, - ] - -class Range(Test): - """Tests range routines""" - - # This test uess ipx.src.node with value 00:aa:00:a3:e3:a4 - - def ck_slice_1_pos(self): - return self.DFilterCount(pkt_ipx_rip, - "ipx.src.node[1] == aa", 1) - - def ck_slice_1_neg(self): - return self.DFilterCount(pkt_ipx_rip, - "ipx.src.node[1] == bb", 0) - - def ck_slice_1_hex_pos(self): - return self.DFilterCount(pkt_ipx_rip, - "ipx.src.node[1] == 0xaa", 1) - - def ck_slice_1_hex_neg(self): - return self.DFilterCount(pkt_ipx_rip, - "ipx.src.node[1] == 0xbb", 0) - - def ck_slice_2_pos(self): - return self.DFilterCount(pkt_ipx_rip, - "ipx.src.node[3:2] == a3:e3", 1) - - def ck_slice_2_neg(self): - return self.DFilterCount(pkt_ipx_rip, - "ipx.src.node[3:2] == cc:dd", 0) - - tests = [ - ck_slice_1_pos, - ck_slice_1_neg, - ck_slice_1_hex_pos, - ck_slice_1_hex_neg, - ck_slice_2_pos, - ck_slice_2_neg, - ] - - -################################################################################ - -# These are the test objects to run. -# Keep these in alphabetical order so the help message -# shows them in order. -all_tests = [ - Bytes(), - Double(), - Integer(), - IPv4(), - Range(), - Scanner(), - String(), - Time(), - TVB(), - ] - -def usage(): - print "usage: %s [OPTS] [TEST ...]" % (sys.argv[0],) - print "\t-p PATH : path to find both tshark and text2pcap (DEFAULT: . )" - print "\t-t FILE : location of tshark binary" - print "\t-x FILE : location of text2pcap binary" - print "\t-k : keep temporary files" - print "\t-v : verbose" - print - print "By not mentioning a test name, all tests are run." - print "Available tests are:" - for test in all_tests: - print "\t", test.__class__.__name__ - sys.exit(1) - -def main(): - - global TSHARK - global TEXT2PCAP - global VERBOSE - global REMOVE_TEMP_FILES - - # Parse the command-line options - optstring = "p:t:x:kv" - longopts = [] - - try: - opts, specific_tests = getopt.getopt(sys.argv[1:], optstring, longopts) - except getopt.GetoptError: - usage() - - for opt, arg in opts: - if opt == "-t": - TSHARK = arg - elif opt == "-x": - TEXT2PCAP = arg - elif opt == "-v": - VERBOSE = 1 - elif opt == "-p": - TEXT2PCAP = os.path.join(arg, "text2pcap") - TSHARK = os.path.join(arg, "tshark") - elif opt == "-k": - REMOVE_TEMP_FILES = 0 - else: - print "Un-handled option:", opt - usage() - - # Sanity test - if not os.path.exists(TSHARK): - sys.exit("tshark program '%s' does not exist." % (TSHARK,)) - - if not os.path.exists(TEXT2PCAP): - sys.exit("text2pcap program '%s' does not exist." % (TEXT2PCAP,)) - - - # Determine which tests to run. - tests_to_run = [] - if specific_tests: - # Go through the tests looking for the ones whose names - # match the command-line arguments. - all_ok = 1 - for test_name in specific_tests: - for test in all_tests: - if test_name == test.__class__.__name__: - tests_to_run.append(test) - break - else: - print >> sys.stderr, "%s is unrecognized as a test." % \ - (test_name,) - all_ok = 0 - - if not all_ok: - sys.exit(1) - else: - tests_to_run = all_tests - - # Run the tests and keep score. - tot_run = 0 - tot_succeeded = 0 - for test in tests_to_run: - print test.__class__.__name__ - (run, succeeded) = test.Run() - tot_run += run - tot_succeeded += succeeded - print - - print - print "Total Tests Run:", tot_run - print "Total Tests Succeeded:", tot_succeeded - print "Total Tests Failed:", tot_run - tot_succeeded - - if tot_succeeded == tot_run: - sys.exit(0) - else: - sys.exit(1) +import unittest + +# Import each test class so unittest.main() can find them +from dftestlib.bytes_type import testBytes +from dftestlib.bytes_ether import testBytesEther +from dftestlib.bytes_ipv6 import testBytesIPv6 +from dftestlib.double import testDouble +from dftestlib.integer import testInteger +from dftestlib.integer_1byte import testInteger1Byte +from dftestlib.ipv4 import testIPv4 +from dftestlib.range_method import testRange +from dftestlib.scanner import testScanner +from dftestlib.string_type import testString +from dftestlib.stringz import testStringz +from dftestlib.time_type import testTime +from dftestlib.time_relative import testTimeRelative +from dftestlib.tvb import testTVB +from dftestlib.uint64 import testUINT64 if __name__ == "__main__": - try: - main() - except KeyboardInterrupt: - print "\nInterrupted by user." + unittest.main(verbosity=2) diff --git a/tools/dftestfiles/arp.cap b/tools/dftestfiles/arp.cap Binary files differnew file mode 100644 index 0000000000..f9fca0c1f6 --- /dev/null +++ b/tools/dftestfiles/arp.cap diff --git a/tools/dftestfiles/http.cap b/tools/dftestfiles/http.cap Binary files differnew file mode 100644 index 0000000000..145c2b05dc --- /dev/null +++ b/tools/dftestfiles/http.cap diff --git a/tools/dftestfiles/ipv6.cap b/tools/dftestfiles/ipv6.cap Binary files differnew file mode 100644 index 0000000000..bdfb72ce71 --- /dev/null +++ b/tools/dftestfiles/ipv6.cap diff --git a/tools/dftestfiles/ipx_rip.cap b/tools/dftestfiles/ipx_rip.cap Binary files differnew file mode 100644 index 0000000000..89533cb14d --- /dev/null +++ b/tools/dftestfiles/ipx_rip.cap diff --git a/tools/dftestfiles/nfs.cap b/tools/dftestfiles/nfs.cap Binary files differnew file mode 100644 index 0000000000..e71d80be74 --- /dev/null +++ b/tools/dftestfiles/nfs.cap diff --git a/tools/dftestfiles/ntp.cap b/tools/dftestfiles/ntp.cap Binary files differnew file mode 100644 index 0000000000..815ed719f9 --- /dev/null +++ b/tools/dftestfiles/ntp.cap diff --git a/tools/dftestfiles/tftp.cap b/tools/dftestfiles/tftp.cap Binary files differnew file mode 100644 index 0000000000..f88c22a055 --- /dev/null +++ b/tools/dftestfiles/tftp.cap diff --git a/tools/dftestlib/__init__.py b/tools/dftestlib/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/tools/dftestlib/__init__.py diff --git a/tools/dftestlib/bytes_ether.py b/tools/dftestlib/bytes_ether.py new file mode 100644 index 0000000000..849e6ac209 --- /dev/null +++ b/tools/dftestlib/bytes_ether.py @@ -0,0 +1,104 @@ +# Copyright (c) 2013 by Gilbert Ramirez <gram@alumni.rice.edu> + +from dftestlib import dftest + +class testBytesEther(dftest.DFTest): + trace_file = "ipx_rip.cap" + + ### Note: Bytes test does not yet test FT_INT64. + + def test_eq_1(self): + dfilter = "eth.dst == ff:ff:ff:ff:ff:ff" + self.assertDFilterCount(dfilter, 1) + + def test_eq_2(self): + dfilter = "eth.src == ff:ff:ff:ff:ff:ff" + self.assertDFilterCount(dfilter, 0) + + def test_ne_1(self): + dfilter = "eth.dst != ff:ff:ff:ff:ff:ff" + self.assertDFilterCount(dfilter, 0) + + def test_ne_2(self): + dfilter = "eth.src != ff:ff:ff:ff:ff:ff" + self.assertDFilterCount(dfilter, 1) + + def test_gt_1(self): + dfilter = "eth.src > 00:aa:00:a3:e3:ff" + self.assertDFilterCount(dfilter, 0) + + def test_gt_2(self): + dfilter = "eth.src > 00:aa:00:a3:e3:a4" + self.assertDFilterCount(dfilter, 0) + + def test_gt_3(self): + dfilter = "eth.src > 00:aa:00:a3:e3:00" + self.assertDFilterCount(dfilter, 1) + + def test_ge_1(self): + dfilter = "eth.src >= 00:aa:00:a3:e3:ff" + self.assertDFilterCount(dfilter, 0) + + def test_ge_2(self): + dfilter = "eth.src >= 00:aa:00:a3:e3:a4" + self.assertDFilterCount(dfilter, 1) + + def test_ge_3(self): + dfilter = "eth.src >= 00:aa:00:a3:e3:00" + self.assertDFilterCount(dfilter, 1) + + def test_lt_1(self): + dfilter = "eth.src < 00:aa:00:a3:e3:ff" + self.assertDFilterCount(dfilter, 1) + + def test_lt_2(self): + dfilter = "eth.src < 00:aa:00:a3:e3:a4" + self.assertDFilterCount(dfilter, 0) + + def test_lt_3(self): + dfilter = "eth.src < 00:aa:00:a3:e3:00" + self.assertDFilterCount(dfilter, 0) + + def test_le_1(self): + dfilter = "eth.src <= 00:aa:00:a3:e3:ff" + self.assertDFilterCount(dfilter, 1) + + def test_le_2(self): + dfilter = "eth.src <= 00:aa:00:a3:e3:a4" + self.assertDFilterCount(dfilter, 1) + + def test_le_3(self): + dfilter = "eth.src <= 00:aa:00:a3:e3:00" + self.assertDFilterCount(dfilter, 0) + + def test_slice_1(self): + dfilter = "eth.src[0:3] == 00:aa:00" + self.assertDFilterCount(dfilter, 1) + + def test_slice_2(self): + dfilter = "eth.src[-3:3] == a3:e3:a4" + self.assertDFilterCount(dfilter, 1) + + def test_slice_3(self): + dfilter = "eth.src[1:4] == aa:00:a3:e3" + self.assertDFilterCount(dfilter, 1) + + def test_slice_4(self): + dfilter = "eth.src[0] == 00" + self.assertDFilterCount(dfilter, 1) + + def test_contains_1(self): + dfilter = "ipx.src.node contains a3" + self.assertDFilterCount(dfilter, 1) + + def test_contains_2(self): + dfilter = "ipx.src.node contains a3:e3" + self.assertDFilterCount(dfilter, 1) + + def test_contains_3(self): + dfilter = "ipx.src.node contains 00:aa:00:a3:e3:a4" + self.assertDFilterCount(dfilter, 1) + + def test_contains_4(self): + dfilter = "ipx.src.node contains aa:e3" + self.assertDFilterCount(dfilter, 0) diff --git a/tools/dftestlib/bytes_ipv6.py b/tools/dftestlib/bytes_ipv6.py new file mode 100644 index 0000000000..4736051b9e --- /dev/null +++ b/tools/dftestlib/bytes_ipv6.py @@ -0,0 +1,14 @@ +# Copyright (c) 2013 by Gilbert Ramirez <gram@alumni.rice.edu> + +from dftestlib import dftest + +class testBytesIPv6(dftest.DFTest): + trace_file = "ipv6.cap" + + def test_ipv6_1(self): + dfilter = "ipv6.dst == ff05::9999" + self.assertDFilterCount(dfilter, 1) + + def test_ipv6_2(self): + dfilter = "ipv6.dst == ff05::9990" + self.assertDFilterCount(dfilter, 0) diff --git a/tools/dftestlib/bytes_type.py b/tools/dftestlib/bytes_type.py new file mode 100644 index 0000000000..e17c9a14f0 --- /dev/null +++ b/tools/dftestlib/bytes_type.py @@ -0,0 +1,14 @@ +# Copyright (c) 2013 by Gilbert Ramirez <gram@alumni.rice.edu> + +from dftestlib import dftest + +class testBytes(dftest.DFTest): + trace_file = "arp.cap" + + def test_bytes_1(self): + dfilter = "arp.dst.hw == 00:64" + self.assertDFilterCount(dfilter, 1) + + def test_ipv6_2(self): + dfilter = "arp.dst.hw == 00:00" + self.assertDFilterCount(dfilter, 0) diff --git a/tools/dftestlib/dftest.py b/tools/dftestlib/dftest.py new file mode 100644 index 0000000000..2fa44eec8c --- /dev/null +++ b/tools/dftestlib/dftest.py @@ -0,0 +1,76 @@ +# Copyright (c) 2013 by Gilbert Ramirez <gram@alumni.rice.edu> + +import os +import tempfile +import unittest + +from dftestlib import util + +# The binaries to use. We assume we are running +# from the top of the wireshark distro +TSHARK = os.path.join(".", "tshark") + +class DFTest(unittest.TestCase): + """Base class for all tests in this dfilter-test collection.""" + + # Remove these file when finished (in tearDownClass) + files_to_remove = [] + + @classmethod + def setUpClass(cls): + """Create the trace file to be used in the tests.""" + assert cls.trace_file + + # if the class sets the 'trace_file' field, then it + # names the trace file to use for the tests. It *should* + # reside in dftestfiles + assert not os.path.isabs(cls.trace_file) + cls.trace_file = os.path.join(".", "tools", "dftestfiles", + cls.trace_file) + + @classmethod + def tearDownClass(cls): + """Remove the trace file used in the tests.""" + for filename in cls.files_to_remove: + if os.path.exists(filename): + try: + os.remove(filename) + except OSError: + pass + + + def runDFilter(self, dfilter): + # Create the tshark command + cmdv = [TSHARK, + "-n", # No name resolution + "-r", # Next arg is trace file to read + self.trace_file, + "-Y", # packet display filter (used to be -R) + dfilter] + + (status, output) = util.exec_cmdv(cmdv) + return status, output + + + def assertDFilterCount(self, dfilter, expected_count): + """Run a display filter and expect a certain number of packets.""" + + (status, output) = self.runDFilter(dfilter) + + # tshark must succeed + self.assertEqual(status, util.SUCCESS, output) + + # Split the output (one big string) into lines, removing + # empty lines (extra newline at end of output) + lines = [L for L in output.split("\n") if L != ""] + + msg = "Expected %d, got: %s" % (expected_count, output) + self.assertEqual(len(lines), expected_count, msg) + + def assertDFilterFail(self, dfilter): + """Run a display filter and expect tshark to fail""" + + (status, output) = self.runDFilter(dfilter) + + # tshark must succeed + self.assertNotEqual(status, util.SUCCESS, output) diff --git a/tools/dftestlib/double.py b/tools/dftestlib/double.py new file mode 100644 index 0000000000..9b84a25c79 --- /dev/null +++ b/tools/dftestlib/double.py @@ -0,0 +1,63 @@ +# Copyright (c) 2013 by Gilbert Ramirez <gram@alumni.rice.edu> + +from dftestlib import dftest + +class testDouble(dftest.DFTest): + + trace_file = "ntp.cap" + + def test_eq_1(self): + dfilter = "ntp.rootdelay == 0.0626983642578125" + self.assertDFilterCount(dfilter, 1) + + def test_eq_2(self): + dfilter = "ntp.rootdelay == 0.0626" + self.assertDFilterCount(dfilter, 0) + + def test_gt_1(self): + dfilter = "ntp.rootdelay > 1.0626" + self.assertDFilterCount(dfilter, 0) + + def test_gt_2(self): + dfilter = "ntp.rootdelay > 0.0626983642578125" + self.assertDFilterCount(dfilter, 0) + + def test_gt_3(self): + dfilter = "ntp.rootdelay > 0.0026" + self.assertDFilterCount(dfilter, 1) + + def test_ge_1(self): + dfilter = "ntp.rootdelay >= 1.0026" + self.assertDFilterCount(dfilter, 0) + + def test_ge_2(self): + dfilter = "ntp.rootdelay >= 0.0626983642578125" + self.assertDFilterCount(dfilter, 1) + + def test_ge_3(self): + dfilter = "ntp.rootdelay >= 0.0026" + self.assertDFilterCount(dfilter, 1) + + def test_lt_1(self): + dfilter = "ntp.rootdelay < 1.0026" + self.assertDFilterCount(dfilter, 1) + + def test_lt_2(self): + dfilter = "ntp.rootdelay < 0.0626983642578125" + self.assertDFilterCount(dfilter, 0) + + def test_lt_3(self): + dfilter = "ntp.rootdelay < 0.0026" + self.assertDFilterCount(dfilter, 0) + + def test_le_1(self): + dfilter = "ntp.rootdelay <= 1.0026" + self.assertDFilterCount(dfilter, 1) + + def test_le_2(self): + dfilter = "ntp.rootdelay <= 0.0626983642578125" + self.assertDFilterCount(dfilter, 1) + + def test_le_3(self): + dfilter = "ntp.rootdelay <= 0.0026" + self.assertDFilterCount(dfilter, 0) diff --git a/tools/dftestlib/integer.py b/tools/dftestlib/integer.py new file mode 100644 index 0000000000..327b68daa1 --- /dev/null +++ b/tools/dftestlib/integer.py @@ -0,0 +1,134 @@ +# Copyright (c) 2013 by Gilbert Ramirez <gram@alumni.rice.edu> + +from dftestlib import dftest + +class testInteger(dftest.DFTest): + trace_file = "ntp.cap" + + def test_eq_1(self): + dfilter = "ip.version == 4" + self.assertDFilterCount(dfilter, 1) + + def test_eq_2(self): + dfilter = "ip.version == 6" + self.assertDFilterCount(dfilter, 0) + + def test_ne_1(self): + dfilter = "ip.version != 0" + self.assertDFilterCount(dfilter, 1) + + def test_ne_2(self): + dfilter = "ip.version != 4" + self.assertDFilterCount(dfilter, 0) + + def test_u_gt_1(self): + dfilter = "ip.version > 3" + self.assertDFilterCount(dfilter, 1) + + def test_u_gt_2(self): + dfilter = "ip.version > 4" + self.assertDFilterCount(dfilter, 0) + + def test_u_gt_3(self): + dfilter = "ip.version > 5" + self.assertDFilterCount(dfilter, 0) + + def test_u_ge_1(self): + dfilter = "ip.version >= 3" + self.assertDFilterCount(dfilter, 1) + + def test_u_ge_2(self): + dfilter = "ip.version >= 4" + self.assertDFilterCount(dfilter, 1) + + def test_u_ge_3(self): + dfilter = "ip.version >= 5" + self.assertDFilterCount(dfilter, 0) + + def test_u_lt_1(self): + dfilter = "ip.version < 3" + self.assertDFilterCount(dfilter, 0) + + def test_u_lt_2(self): + dfilter = "ip.version < 4" + self.assertDFilterCount(dfilter, 0) + + def test_u_lt_3(self): + dfilter = "ip.version < 5" + self.assertDFilterCount(dfilter, 1) + + def test_u_le_1(self): + dfilter = "ip.version <= 3" + self.assertDFilterCount(dfilter, 0) + + def test_u_le_2(self): + dfilter = "ip.version <= 4" + self.assertDFilterCount(dfilter, 1) + + def test_u_le_3(self): + dfilter = "ip.version <= 5" + self.assertDFilterCount(dfilter, 1) + + def test_s_gt_1(self): + dfilter = "ntp.precision > -12" + self.assertDFilterCount(dfilter, 1) + + def test_s_gt_2(self): + dfilter = "ntp.precision > -11" + self.assertDFilterCount(dfilter, 0) + + def test_s_gt_3(self): + dfilter = "ntp.precision > -10" + self.assertDFilterCount(dfilter, 0) + + def test_s_ge_1(self): + dfilter = "ntp.precision >= -12" + self.assertDFilterCount(dfilter, 1) + + def test_s_ge_2(self): + dfilter = "ntp.precision >= -11" + self.assertDFilterCount(dfilter, 1) + + def test_s_ge_3(self): + dfilter = "ntp.precision >= -10" + self.assertDFilterCount(dfilter, 0) + + def test_s_lt_1(self): + dfilter = "ntp.precision < -12" + self.assertDFilterCount(dfilter, 0) + + def test_s_lt_2(self): + dfilter = "ntp.precision < -11" + self.assertDFilterCount(dfilter, 0) + + def test_s_lt_3(self): + dfilter = "ntp.precision < -10" + self.assertDFilterCount(dfilter, 1) + + def test_s_le_1(self): + dfilter = "ntp.precision <= -12" + self.assertDFilterCount(dfilter, 0) + + def test_s_le_2(self): + dfilter = "ntp.precision <= -11" + self.assertDFilterCount(dfilter, 1) + + def test_s_le_3(self): + dfilter = "ntp.precision <= -10" + self.assertDFilterCount(dfilter, 1) + + def test_bool_eq_1(self): + dfilter = "ip.flags.df == 0" + self.assertDFilterCount(dfilter, 1) + + def test_bool_eq_2(self): + dfilter = "ip.flags.df == 1" + self.assertDFilterCount(dfilter, 0) + + def test_bool_ne_1(self): + dfilter = "ip.flags.df != 1" + self.assertDFilterCount(dfilter, 1) + + def test_bool_ne_2(self): + dfilter = "ip.flags.df != 0" + self.assertDFilterCount(dfilter, 0) diff --git a/tools/dftestlib/integer_1byte.py b/tools/dftestlib/integer_1byte.py new file mode 100644 index 0000000000..4c869a37af --- /dev/null +++ b/tools/dftestlib/integer_1byte.py @@ -0,0 +1,15 @@ +# Copyright (c) 2013 by Gilbert Ramirez <gram@alumni.rice.edu> + +from dftestlib import dftest + +class testInteger1Byte(dftest.DFTest): + + trace_file = "ipx_rip.cap" + + def test_ipx_1(self): + dfilter = "ipx.src.net == 0x28" + self.assertDFilterCount(dfilter, 1) + + def test_ipx_2(self): + dfilter = "ipx.src.net == 0x29" + self.assertDFilterCount(dfilter, 0) diff --git a/tools/dftestlib/ipv4.py b/tools/dftestlib/ipv4.py new file mode 100644 index 0000000000..dc028a1897 --- /dev/null +++ b/tools/dftestlib/ipv4.py @@ -0,0 +1,108 @@ +# Copyright (c) 2013 by Gilbert Ramirez <gram@alumni.rice.edu> + +from dftestlib import dftest + +class testIPv4(dftest.DFTest): + trace_file = "nfs.cap" + + def test_uint64_1(self): + dfilter = "nfs.fattr3.size == 264032" + self.assertDFilterCount(dfilter, 1) + + def test_eq_1(self): + dfilter = "ip.src == 172.25.100.14" + self.assertDFilterCount(dfilter, 1) + + def test_eq_2(self): + dfilter = "ip.src == 255.255.255.255" + self.assertDFilterCount(dfilter, 0) + + def test_ne_1(self): + dfilter = "ip.src != 172.25.100.14" + self.assertDFilterCount(dfilter, 1) + + def test_ne_2(self): + dfilter = "ip.src != 255.255.255.255" + self.assertDFilterCount(dfilter, 2) + + def test_gt_1(self): + dfilter = "ip.dst > 198.95.230.200" + self.assertDFilterCount(dfilter, 0) + + def test_gt_2(self): + dfilter = "ip.dst > 198.95.230.20" + self.assertDFilterCount(dfilter, 0) + + def test_gt_3(self): + dfilter = "ip.dst > 198.95.230.10" + self.assertDFilterCount(dfilter, 1) + + def test_ge_1(self): + dfilter = "ip.dst >= 198.95.230.200" + self.assertDFilterCount(dfilter, 0) + + def test_ge_2(self): + dfilter = "ip.dst >= 198.95.230.20" + self.assertDFilterCount(dfilter, 1) + + def test_ge_3(self): + dfilter = "ip.dst >= 198.95.230.10" + self.assertDFilterCount(dfilter, 1) + + def test_lt_1(self): + dfilter = "ip.src < 172.25.100.140" + self.assertDFilterCount(dfilter, 1) + + def test_lt_2(self): + dfilter = "ip.src < 172.25.100.14" + self.assertDFilterCount(dfilter, 0) + + def test_lt_3(self): + dfilter = "ip.src < 172.25.100.10" + self.assertDFilterCount(dfilter, 0) + + def test_le_1(self): + dfilter = "ip.src <= 172.25.100.140" + self.assertDFilterCount(dfilter, 1) + + def test_le_2(self): + dfilter = "ip.src <= 172.25.100.14" + self.assertDFilterCount(dfilter, 1) + + def test_le_3(self): + dfilter = "ip.src <= 172.25.100.10" + self.assertDFilterCount(dfilter, 0) + + def test_cidr_eq_1(self): + dfilter = "ip.src == 172.25.100.14/32" + self.assertDFilterCount(dfilter, 1) + + def test_cidr_eq_2(self): + dfilter = "ip.src == 172.25.100.0/24" + self.assertDFilterCount(dfilter, 1) + + def test_cidr_eq_3(self): + dfilter = "ip.src == 172.25.0.0/16" + self.assertDFilterCount(dfilter, 1) + + def test_cidr_eq_4(self): + dfilter = "ip.src == 172.0.0.0/8" + self.assertDFilterCount(dfilter, 1) + + def test_cidr_ne_1(self): + dfilter = "ip.src != 172.25.100.14/32" + self.assertDFilterCount(dfilter, 1) + + def test_cidr_ne_2(self): + dfilter = "ip.src != 172.25.100.0/24" + self.assertDFilterCount(dfilter, 1) + + def test_cidr_ne_3(self): + dfilter = "ip.src != 172.25.0.0/16" + self.assertDFilterCount(dfilter, 1) + + def test_cidr_ne_4(self): + dfilter = "ip.src != 200.0.0.0/8" + self.assertDFilterCount(dfilter, 2) + + diff --git a/tools/dftestlib/range_method.py b/tools/dftestlib/range_method.py new file mode 100644 index 0000000000..06be80b576 --- /dev/null +++ b/tools/dftestlib/range_method.py @@ -0,0 +1,30 @@ +# Copyright (c) 2013 by Gilbert Ramirez <gram@alumni.rice.edu> + +from dftestlib import dftest + +class testRange(dftest.DFTest): + trace_file = "ipx_rip.cap" + + def test_slice_1_pos(self): + dfilter = "ipx.src.node[1] == aa" + self.assertDFilterCount(dfilter, 1) + + def test_slice_1_neg(self): + dfilter = "ipx.src.node[1] == bb" + self.assertDFilterCount(dfilter, 0) + + def test_slice_1_hex_pos(self): + dfilter = "ipx.src.node[1] == 0xaa" + self.assertDFilterCount(dfilter, 1) + + def test_slice_1_hex_neg(self): + dfilter = "ipx.src.node[1] == 0xbb" + self.assertDFilterCount(dfilter, 0) + + def test_slice_2_pos(self): + dfilter = "ipx.src.node[3:2] == a3:e3" + self.assertDFilterCount(dfilter, 1) + + def test_slice_2_neg(self): + dfilter = "ipx.src.node[3:2] == cc:dd" + self.assertDFilterCount(dfilter, 0) diff --git a/tools/dftestlib/scanner.py b/tools/dftestlib/scanner.py new file mode 100644 index 0000000000..bca1f1924e --- /dev/null +++ b/tools/dftestlib/scanner.py @@ -0,0 +1,30 @@ +# Copyright (c) 2013 by Gilbert Ramirez <gram@alumni.rice.edu> + +from dftestlib import dftest + +class testScanner(dftest.DFTest): + trace_file = "http.cap" + + def test_dquote_1(self): + dfilter = 'http.request.method == "HEAD"' + self.assertDFilterCount(dfilter, 1) + + def test_dquote_2(self): + dfilter = 'http.request.method == "\\x48EAD"' + self.assertDFilterCount(dfilter, 1) + + def test_dquote_3(self): + dfilter = 'http.request.method == "\\x58EAD"' + self.assertDFilterCount(dfilter, 0) + + def test_dquote_4(self): + dfilter = 'http.request.method == "\\110EAD"' + self.assertDFilterCount(dfilter, 1) + + def test_dquote_5(self): + dfilter = 'http.request.method == "\\111EAD"' + self.assertDFilterCount(dfilter, 0) + + def test_dquote_6(self): + dfilter = 'http.request.method == "\\HEAD"' + self.assertDFilterCount(dfilter, 1) diff --git a/tools/dftestlib/string_type.py b/tools/dftestlib/string_type.py new file mode 100644 index 0000000000..fcc6e76f9e --- /dev/null +++ b/tools/dftestlib/string_type.py @@ -0,0 +1,159 @@ +# Copyright (c) 2013 by Gilbert Ramirez <gram@alumni.rice.edu> + +from dftestlib import dftest + +class testString(dftest.DFTest): + trace_file = "http.cap" + + def test_eq_1(self): + dfilter = 'http.request.method == "HEAD"' + self.assertDFilterCount(dfilter, 1) + + def test_eq_2(self): + dfilter = 'http.request.method == "POST"' + self.assertDFilterCount(dfilter, 0) + + def test_gt_1(self): + dfilter = 'http.request.method > "HEAC"' + self.assertDFilterCount(dfilter, 1) + + def test_gt_2(self): + dfilter = 'http.request.method > "HEAD"' + self.assertDFilterCount(dfilter, 0) + + def test_gt_3(self): + dfilter = 'http.request.method > "HEAE"' + self.assertDFilterCount(dfilter, 0) + + def test_ge_1(self): + dfilter = 'http.request.method >= "HEAC"' + self.assertDFilterCount(dfilter, 1) + + def test_ge_2(self): + dfilter = 'http.request.method >= "HEAD"' + self.assertDFilterCount(dfilter, 1) + + def test_ge_3(self): + dfilter = 'http.request.method >= "HEAE"' + self.assertDFilterCount(dfilter, 0) + + def test_lt_1(self): + dfilter = 'http.request.method < "HEAC"' + self.assertDFilterCount(dfilter, 0) + + def test_lt_2(self): + dfilter = 'http.request.method < "HEAD"' + self.assertDFilterCount(dfilter, 0) + + def test_lt_3(self): + dfilter = 'http.request.method < "HEAE"' + self.assertDFilterCount(dfilter, 1) + + def test_le_1(self): + dfilter = 'http.request.method <= "HEAC"' + self.assertDFilterCount(dfilter, 0) + + def test_le_2(self): + dfilter = 'http.request.method <= "HEAD"' + self.assertDFilterCount(dfilter, 1) + + def test_le_3(self): + dfilter = 'http.request.method <= "HEAE"' + self.assertDFilterCount(dfilter, 1) + + def test_slice_1(self): + dfilter = 'http.request.method[0] == "H"' + self.assertDFilterCount(dfilter, 1) + + def test_slice_2(self): + dfilter = 'http.request.method[0] == "P"' + self.assertDFilterCount(dfilter, 0) + + def test_slice_3(self): + dfilter = 'http.request.method[0:4] == "HEAD"' + self.assertDFilterCount(dfilter, 1) + + def test_slice_4(self): + dfilter = 'http.request.method[0:4] != "HEAD"' + self.assertDFilterCount(dfilter, 0) + + def test_slice_5(self): + dfilter = 'http.request.method[1:2] == "EA"' + self.assertDFilterCount(dfilter, 1) + + def test_slice_6(self): + dfilter = 'http.request.method[1:2] > "EA"' + self.assertDFilterCount(dfilter, 0) + + def test_slice_7(self): + dfilter = 'http.request.method[-1] == "D"' + self.assertDFilterCount(dfilter, 1) + + def test_slice_8(self): + dfilter = 'http.request.method[-2] == "D"' + self.assertDFilterCount(dfilter, 0) + + def xxxtest_stringz_1(self): + return self.DFilterCount(pkt_tftp, + 'tftp.type == "octet"', 1) + + def xxxtest_stringz_2(self): + return self.DFilterCount(pkt_tftp, + 'tftp.type == "junk"', 0) + + def test_contains_1(self): + dfilter = 'http.request.method contains "E"' + self.assertDFilterCount(dfilter, 1) + + def test_contains_2(self): + dfilter = 'http.request.method contains "EA"' + self.assertDFilterCount(dfilter, 1) + + def test_contains_3(self): + dfilter = 'http.request.method contains "HEAD"' + self.assertDFilterCount(dfilter, 1) + + def test_contains_4(self): + dfilter = 'http.request.method contains "POST"' + self.assertDFilterCount(dfilter, 0) + + def test_contains_5(self): + dfilter = 'http.request.method contains 50:4f:53:54' # "POST" + self.assertDFilterCount(dfilter, 0) + + def test_contains_6(self): + dfilter = 'http.request.method contains 48:45:41:44' # "HEAD" + self.assertDFilterCount(dfilter, 1) + + def test_contains_fail_0(self): + dfilter = 'http.user_agent contains "update"' + self.assertDFilterCount(dfilter, 0) + + def test_contains_fail_1(self): + dfilter = 'http.user_agent contains "UPDATE"' + self.assertDFilterCount(dfilter, 0) + + def test_contains_upper_0(self): + dfilter = 'upper(http.user_agent) contains "UPDATE"' + self.assertDFilterCount(dfilter, 1) + + def test_contains_upper_1(self): + dfilter = 'upper(http.user_agent) contains "update"' + self.assertDFilterCount(dfilter, 0) + + def test_contains_upper_2(self): + dfilter = 'upper(tcp.seq) == 4' + self.assertDFilterFail(dfilter) + + def test_contains_lower_0(self): + dfilter = 'lower(http.user_agent) contains "UPDATE"' + self.assertDFilterCount(dfilter, 0) + + def test_contains_lower_1(self): + dfilter = 'lower(http.user_agent) contains "update"' + self.assertDFilterCount(dfilter, 1) + + def test_contains_lower_2(self): + dfilter = 'lower(tcp.seq) == 4' + self.assertDFilterFail(dfilter) + diff --git a/tools/dftestlib/stringz.py b/tools/dftestlib/stringz.py new file mode 100644 index 0000000000..c3e85ea52a --- /dev/null +++ b/tools/dftestlib/stringz.py @@ -0,0 +1,19 @@ +# Copyright (c) 2013 by Gilbert Ramirez <gram@alumni.rice.edu> + +from dftestlib import dftest + +class testStringz(dftest.DFTest): + trace_file = "tftp.cap" + + def test_stringz_1(self): + dfilter = 'tftp.type == octet' + self.assertDFilterCount(dfilter, 1) + + def test_stringz_2(self): + dfilter = 'tftp.type == "octet"' + self.assertDFilterCount(dfilter, 1) + + def test_stringz_3(self): + dfilter = 'tftp.type == junk' + self.assertDFilterCount(dfilter, 0) + diff --git a/tools/dftestlib/time_relative.py b/tools/dftestlib/time_relative.py new file mode 100644 index 0000000000..68abae511d --- /dev/null +++ b/tools/dftestlib/time_relative.py @@ -0,0 +1,19 @@ +# Copyright (c) 2013 by Gilbert Ramirez <gram@alumni.rice.edu> + +from dftestlib import dftest + +class testTimeRelative(dftest.DFTest): + trace_file = "nfs.cap" + + def test_relative_time_1(self): + dfilter = "frame.time_delta == 0.7" + self.assertDFilterCount(dfilter, 1) + + def test_relative_time_2(self): + dfilter = "frame.time_delta > 0.7" + self.assertDFilterCount(dfilter, 0) + + def test_relative_time_3(self): + dfilter = "frame.time_delta < 0.7" + self.assertDFilterCount(dfilter, 1) + diff --git a/tools/dftestlib/time_type.py b/tools/dftestlib/time_type.py new file mode 100644 index 0000000000..41982cb126 --- /dev/null +++ b/tools/dftestlib/time_type.py @@ -0,0 +1,71 @@ +# Copyright (c) 2013 by Gilbert Ramirez <gram@alumni.rice.edu> + +from dftestlib import dftest + +class testTime(dftest.DFTest): + trace_file = "http.cap" + + def test_eq_1(self): + dfilter = 'frame.time == "Dec 31, 2002 07:55:31.3"' + self.assertDFilterCount(dfilter, 1) + + def test_eq_2(self): + dfilter = 'frame.time == "Jan 31, 2002 07:55:31.3"' + self.assertDFilterCount(dfilter, 0) + + def test_ne_1(self): + dfilter = 'frame.time != "Dec 31, 2002 07:55:31.3"' + self.assertDFilterCount(dfilter, 0) + + def test_ne_2(self): + dfilter = 'frame.time != "Jan 31, 2002 07:55:31.3"' + self.assertDFilterCount(dfilter, 1) + + def test_gt_1(self): + dfilter = 'frame.time > "Dec 31, 2002 07:54:31.3"' + self.assertDFilterCount(dfilter, 1) + + def test_gt_2(self): + dfilter = 'frame.time > "Dec 31, 2002 07:55:31.3"' + self.assertDFilterCount(dfilter, 0) + + def test_gt_3(self): + dfilter = 'frame.time > "Dec 31, 2002 07:56:31.3"' + self.assertDFilterCount(dfilter, 0) + + def test_ge_1(self): + dfilter = 'frame.time >= "Dec 31, 2002 07:54:31.3"' + self.assertDFilterCount(dfilter, 1) + + def test_ge_2(self): + dfilter = 'frame.time >= "Dec 31, 2002 07:55:31.3"' + self.assertDFilterCount(dfilter, 1) + + def test_ge_3(self): + dfilter = 'frame.time >= "Dec 31, 2002 07:56:31.3"' + self.assertDFilterCount(dfilter, 0) + + def test_lt_1(self): + dfilter = 'frame.time < "Dec 31, 2002 07:54:31.3"' + self.assertDFilterCount(dfilter, 0) + + def test_lt_2(self): + dfilter = 'frame.time < "Dec 31, 2002 07:55:31.3"' + self.assertDFilterCount(dfilter, 0) + + def test_lt_3(self): + dfilter = 'frame.time < "Dec 31, 2002 07:56:31.3"' + self.assertDFilterCount(dfilter, 1) + + def test_le_1(self): + dfilter = 'frame.time <= "Dec 31, 2002 07:54:31.3"' + self.assertDFilterCount(dfilter, 0) + + def test_le_2(self): + dfilter = 'frame.time <= "Dec 31, 2002 07:55:31.3"' + self.assertDFilterCount(dfilter, 1) + + def test_le_3(self): + dfilter = 'frame.time <= "Dec 31, 2002 07:56:31.3"' + self.assertDFilterCount(dfilter, 1) + diff --git a/tools/dftestlib/uint64.py b/tools/dftestlib/uint64.py new file mode 100644 index 0000000000..c6cf82b040 --- /dev/null +++ b/tools/dftestlib/uint64.py @@ -0,0 +1,14 @@ +# Copyright (c) 2013 by Gilbert Ramirez <gram@alumni.rice.edu> + +from dftestlib import dftest + +class testUINT64(dftest.DFTest): + trace_file = "nfs.cap" + + def test_uint64_1(self): + dfilter = "nfs.fattr3.size == 264032" + self.assertDFilterCount(dfilter, 1) + + def test_uint64_2(self): + dfilter = "nfs.fattr3.size == 264000" + self.assertDFilterCount(dfilter, 0) diff --git a/tools/dftestlib/util.py b/tools/dftestlib/util.py new file mode 100644 index 0000000000..7c66385907 --- /dev/null +++ b/tools/dftestlib/util.py @@ -0,0 +1,29 @@ +# Copyright (c) 2013 by Gilbert Ramirez <gram@alumni.rice.edu> + +import subprocess + +SUCCESS = 0 +def exec_cmdv(cmdv, cwd=None, stdin=None): + """Run the commands in cmdv, returning (retval, output), + where output is stdout and stderr combined. + If cwd is given, the child process runs in that directory. + If a filehandle is passed as stdin, it is used as stdin. + If there is an OS-level error, None is the retval.""" + + try: + output = subprocess.check_output(cmdv, stderr=subprocess.STDOUT, + cwd=cwd, stdin=stdin) + retval = SUCCESS + + # If file isn't executable + except OSError, e: + output = str(e) + retval = None + + # If process returns non-zero + except subprocess.CalledProcessError, e: + output = e.output + retval = e.returncode + + return (retval, output) + |