-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutil.py
178 lines (149 loc) · 6.04 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
"""
This file contains some util definitions for
testrunner. In particular
- functions to help interpret the output of SMT solvers
- functions to kill processes
"""
import logging
import os
import re
from enum import Enum
from datetime import datetime
import time
import psutil
from typing import (
Callable,
Iterable,
Tuple,
List,
TypeVar,
Union,
)
import itertools
# Generic element type used in the signature of partition().
T = TypeVar('T')
# Our scripts are commonly used for understanding output from SMT solvers
class Satisfiablity(str, Enum):
    """Possible verdicts when interpreting the output of an SMT solver run.

    Members are also plain strings (the class mixes in ``str``), so each
    member compares equal to its own name.
    """

    SAT = "SAT"
    UNSAT = "UNSAT"
    # Actual result unknown
    UNKNOWN = "UNKNOWN"
    # Could not find satisfiability
    UNSURE = "UNSURE"
def satisfiability_of_output(output: str) -> Satisfiablity:
    """Classify solver output by the first keyword found in it.

    The keywords are searched case-sensitively, in this order: 'Unsat',
    'Sat', 'Unknown'. The first one present decides the verdict; if none
    is present the result is Satisfiablity.UNSURE.
    """
    # TODO likely faster to do 'sat' in output.lower() or something
    keyword_verdicts = (
        ('Unsat', Satisfiablity.UNSAT),
        ('Sat', Satisfiablity.SAT),
        ('Unknown', Satisfiablity.UNKNOWN),
    )
    for keyword, verdict in keyword_verdicts:
        if re.search(keyword, output):
            return verdict
    return Satisfiablity.UNSURE
def valid_smt(filename: Union[bytes, str]) -> bool:
    """Return True when `filename` ends with the '.smt2' extension.

    Accepts str, bytes, or os.PathLike values. The path is decoded to str
    first, because calling bytes.endswith() with a str suffix raises
    TypeError.
    """
    return os.fsdecode(filename).endswith('.smt2')
def exclude(bad_values: List[str]) -> Callable[[Union[bytes, str]], bool]:
    """Build a predicate that rejects paths whose final component is excluded.

    The returned function takes a directory path (str, bytes, or
    os.PathLike) and returns False when the last path component is one of
    `bad_values`, True otherwise. Bytes paths are decoded as UTF-8.
    """
    def do_exclusion(dirname: Union[bytes, str]) -> bool:
        path = os.fspath(dirname)
        if isinstance(path, bytes):
            path = path.decode('utf8')
        # os.path.basename() returns '' for a path ending in a separator,
        # so drop every trailing separator (not just one) before taking it.
        separators = os.sep + (os.altsep or '')
        pathname = os.path.basename(path.rstrip(separators))
        return pathname not in bad_values
    return do_exclusion
def partition(pred: Callable[[T], bool], iterable: Iterable[T]) -> Tuple[List[T], List[T]]:
    """Use a predicate to partition entries into true entries and false entries.

    Returns a pair ``(matches, rejects)``: the items for which `pred` is
    truthy, then the items for which it is falsy, each in input order.
    The input is consumed in a single pass and `pred` is called exactly
    once per item.
    """
    # partition(is_odd, range(10)) --> [1, 3, 5, 7, 9] and [0, 2, 4, 6, 8]
    matches = []
    rejects = []
    for item in iterable:
        (matches if pred(item) else rejects).append(item)
    return matches, rejects
def now_string() -> str:
    """Return the current local time formatted as %Y-%m-%d-%H-%M-%S."""
    return f"{datetime.now():%Y-%m-%d-%H-%M-%S}"
def setup_logging_default(filename_suffix:str=""):
    """Configure root logging at INFO level, writing to a timestamped file.

    The log file is named '<timestamp><filename_suffix>-log.txt', where the
    timestamp comes from now_string().
    """
    log_path = f'{now_string()}{filename_suffix}-log.txt'
    logging.basicConfig(filename=log_path, level=logging.INFO)
def setup_logging_debug(filename_suffix:str=""):
    """Configure root logging at DEBUG level, writing to a timestamped file.

    The log file is named '<timestamp><filename_suffix>-log.txt', where the
    timestamp comes from now_string().
    """
    log_path = f'{now_string()}{filename_suffix}-log.txt'
    logging.basicConfig(filename=log_path, level=logging.DEBUG)
def flatten(lst):
    """Flatten a list so that any elements that were lists are replaced by
    the elements of that list, at every nesting depth.

    ex: [1, [[2], 3]] -> [1, 2, 3]

    Only ``list`` instances (including subclasses) are expanded; tuples,
    strings and other iterables are kept as single elements. The traversal
    uses an explicit stack instead of recursion, so deeply nested input
    does not hit the interpreter's recursion limit.
    """
    output = []
    # Each stack entry is an iterator over one list currently being expanded.
    pending = [iter(lst)]
    while pending:
        for item in pending[-1]:
            if isinstance(item, list):
                # Descend into the nested list; the outer iterator keeps its
                # position and is resumed once the nested one is exhausted.
                pending.append(iter(item))
                break
            output.append(item)
        else:
            # Innermost iterator is exhausted: go back to its parent.
            pending.pop()
    return output
# stuff about killing processes
# sometimes we want to kill all child processes and sometimes just Z3
def kill_child_processes():
    """Kill every descendant process of the current process.

    Children are enumerated recursively, logged, and then passed to
    kill_process() one by one. If at least one kill was attempted, the
    function finishes with wait_for_kill() to give the processes time to
    disappear.
    """
    # kill all child processes
    current_process = psutil.Process()
    children = current_process.children(recursive=True)
    children.sort(key=lambda x: x.pid, reverse=True)  # NOTE: Why is this done? Can it be skipped?

    # Look each name up once. A child can exit between being enumerated and
    # being queried, in which case psutil raises NoSuchProcess; such a child
    # needs no killing, so skip it instead of aborting the whole cleanup.
    targets = []
    for child in children:
        try:
            child_name = child.name()
        except (psutil.NoSuchProcess, psutil.ZombieProcess):
            logging.debug("Already gone: " + str(child.pid) + "\n")
            continue
        logging.debug("Need to kill: " + str(child.pid) + " " + child_name + "\n")
        targets.append((child, child_name))

    for child, child_name in targets:
        logging.debug("Trying to kill: " + str(child.pid) + " " + child_name + "\n")
        kill_process(child, child_name)

    if targets:
        wait_for_kill()
def wait_for_kill(seconds=30):
    """Sleep so that recently killed processes have time to go away.

    Args:
        seconds: how long to block, in seconds. Defaults to 30.
    """
    # We have to wait to make sure the processes were really killed.
    # With a 15 second wait there was still a small chance that a Z3 process
    # had not been cleaned up yet, so the default was raised to 30 seconds.
    time.sleep(seconds)
# Kill only lingering solver processes
def kill_solvers():
    """Kill running processes whose name contains 'cvc5' or 'z3'.

    The process table is scanned repeatedly. Every matching process is
    passed to kill_process() and followed by wait_for_kill(). Scanning
    stops after the first full pass in which no matching process is seen.
    """
    solver_seen = True
    while solver_seen:
        # Reset once per scan, not once per process: otherwise only the last
        # process inspected would decide whether another scan is needed.
        solver_seen = False
        for proc in psutil.process_iter():
            try:
                name = proc.name()
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                # every now and then the bit of time between
                # psutil.process_iter() and proc.name() allows a process to
                # die and we get an exception on proc.name()
                # if the process has died, great! and let's move on
                continue
            lowered = name.lower()
            if 'cvc5' in lowered or 'z3' in lowered:
                # We found a z3/cvc5 process
                logging.warning(lowered + ' is still running... killing')
                kill_process(proc, name)
                wait_for_kill()
                solver_seen = True
def check_process_running(process_name):
    """
    Report whether any running process has a name containing `process_name`.
    The comparison is case-insensitive. Processes whose name cannot be read
    (gone, access denied, zombie) are skipped.
    from: https://thispointer.com/python-check-if-a-process-is-running-by-name-and-find-its-process-id-pid/
    """
    needle = process_name.lower()
    # Walk over all running processes and stop at the first name match.
    for candidate in psutil.process_iter():
        try:
            matched = needle in candidate.name().lower()
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            continue
        if matched:
            logging.debug(f"process '{needle}' exists: " + str(candidate.pid))
            return True
    return False
def kill_process(p, name):
    """Kill process `p`, logging the outcome.

    Args:
        p: process handle; its ``kill()`` method and ``pid`` attribute are used.
        name: process name, used in log messages and to check whether a
            process with that name is still running.

    Raises:
        SystemExit: with status 1 when the kill fails with ProcessLookupError
            but check_process_running(name) still finds a matching process.

    Any other exception raised by ``p.kill()`` is logged and swallowed.
    """
    try:
        # occasionally the child process dies by itself and this then
        # causes an exception b/c child.pid does not exist
        # so the try/except is needed
        p.kill()
        # the following seems to have processlookup errors even though process
        # exists
        # os.killpg(p.pid, signal.SIGKILL)
        logging.info("killed: " + name + "\n")
    except ProcessLookupError:
        logging.warning("os.kill could not find: " + str(p.pid) + "\n")
        if check_process_running(name):
            logging.error(f'process {p.pid} was unable to be killed, but is still running!')
            # Raise SystemExit directly: the exit() builtin is installed by
            # the site module for interactive use and may be absent.
            raise SystemExit(1)
    except Exception as ex:
        template = "An exception of type {0} occurred. Arguments:\n{1!r}"
        message = template.format(type(ex).__name__, ex.args)
        logging.exception(message + '\n' + "Did not kill: " + str(p.pid) + " " + name)