1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
#!/usr/bin/env python
"""
Restore conntrack entries from XML.
Usage:
sudo conntrack -L -o xml > ct.xml
# interface down / up: nmcli c up ConnectionNameHere
~/scripts/conntrack-restore ct.xml | sudo sh -x
Arch Linux package dependencies:
python-lxml conntrack-tools
"""
import lxml.etree as ET
import sys
import shlex
# When True, every registered option uses its long name (e.g. --orig-src)
# even if a short name (e.g. -s) is available.
VERBOSE = False

# Registry of option specifications, appended to by O() in call order.
options = []

# Single-letter direction codes accepted in the first component of an O()
# path, mapped to the value of the <meta direction="..."> attribute.
_DIRECTIONS = {
    "O": "original",
    "R": "reply",
    "I": "independent",
}


def O(short_option, long_option, path, attr=None, text=None):
    """Register how one conntrack command-line option is read from the XML.

    The specification is appended to the module-level ``options`` list as a
    dict with the keys ``option``, ``path``, ``attr`` and ``text``.

    Args:
        short_option: Short option name (e.g. ``"-s"``), or None if the
            option only has a long form.
        long_option: Long option name (e.g. ``"--orig-src"``). Used when
            VERBOSE is true or when there is no short name.
        path: ``"<codes>/<subpath>"``. ``<codes>`` is one or more of the
            letters O, R, I (see ``_DIRECTIONS``); several letters are OR-ed
            together. ``<subpath>`` is appended unchanged to the generated
            ``meta[...]`` XPath step.
        attr: Stored as-is under the ``attr`` key.
        text: Stored as-is under the ``text`` key.

    Raises:
        ValueError: If ``path`` has no ``/`` or its direction prefix is empty.
        KeyError: If the direction prefix contains an unknown letter.
    """
    codes, subpath = path.split("/", 1)
    if not codes:
        # An empty prefix would yield the invalid XPath "meta[]/...".
        raise ValueError("path %r has no direction prefix" % (path,))
    predicate = " or ".join(
        "@direction='%s'" % _DIRECTIONS[code] for code in codes
    )
    options.append({
        # The name is fixed at registration time from the current VERBOSE.
        "option": long_option if VERBOSE or not short_option else short_option,
        "path": "meta[%s]/%s" % (predicate, subpath),
        "attr": attr,
        "text": text,
    })
# Table of (short option, long option, XML path, extra keyword arguments).
# Rows are passed to O() from top to bottom, so the table order is the order
# of the entries in ``options``.
_OPTION_SPECS = (
    # Filter parameters
    ("-s", "--orig-src", "O/layer3/src", {}),
    ("-d", "--orig-dst", "O/layer3/dst", {}),
    ("-r", "--reply-src", "R/layer3/src", {}),
    ("-q", "--reply-dst", "R/layer3/dst", {}),
    ("-p", "--proto", "OR/layer4", {"attr": "protoname"}),
    ("-t", "--timeout", "I/timeout", {}),
    ("-m", "--mark", "I/mark", {}),
    # TODO -l --label
    ("-c", "--secmark", "I/secmark", {}),
    ("-u", "--status", "I/assured", {"text": "ASSURED"}),
    # NOTE(review): an <unreplied/> element presumably marks a flow that has
    # NOT seen a reply, yet it is mapped to SEEN_REPLY here -- confirm against
    # conntrack's XML output.
    ("-u", "--status", "I/unreplied", {"text": "SEEN_REPLY"}),
    # Protocol filter parameters
    (None, "--state", "I/state", {}),
    (None, "--sport", "O/layer4/sport", {}),
    (None, "--dport", "O/layer4/dport", {}),
    (None, "--reply-port-src", "R/layer4/sport", {}),
    (None, "--reply-port-dst", "R/layer4/dport", {}),
)
for _short, _long, _xml_path, _extra in _OPTION_SPECS:
    O(_short, _long, _xml_path, **_extra)
def _option_value(option, element):
    """Return the command-line value for *option* read from *element*.

    A fixed ``text`` in the option spec wins; otherwise the attribute named
    by ``attr`` is read; otherwise the element's text content is used.
    Returns None when the attribute is absent or the element has no text.
    """
    if option["text"]:
        return option["text"]
    if option["attr"]:
        return element.get(option["attr"])
    return element.text


def _flow_arguments(flow):
    """Build the ``conntrack`` argument list that inserts one <flow> element.

    Options are emitted in the order they appear in ``options``; an option
    whose XPath matches nothing in *flow* is left out.
    """
    arguments = ["conntrack", "--insert" if VERBOSE else "-I"]
    for option in options:
        matches = flow.xpath(option["path"])
        if not matches:
            continue
        # Only the first matching element is consulted.
        value = _option_value(option, matches[0])
        if value is None:
            # shlex.quote() cannot quote None; skip the option instead of
            # aborting. stdout may be piped into a shell, so warn on stderr.
            print(
                "warning: no value for %s at %s, option skipped"
                % (option["option"], option["path"]),
                file=sys.stderr,
            )
            continue
        arguments.extend((option["option"], value))
    return arguments


if len(sys.argv) == 2:
    ct_root = ET.parse(sys.argv[1]).getroot()
else:
    # The XML carries an encoding declaration, so lxml expects bytes; read
    # them straight from the binary stream instead of decoding and
    # re-encoding the text.
    ct_root = ET.fromstring(sys.stdin.buffer.read())

# One shell-quoted command line per flow.
for flow in ct_root.iter("flow"):
    print(" ".join(shlex.quote(arg) for arg in _flow_arguments(flow)))
|