This example demonstrates how to use the Python API to build the control flow graph (CFG) from the trace of a given process.
See the script's documentation below for more information.
Note that this script only iterates over the points that belong to a given process, not over the whole trace. For more details, look at the implementation of the get_process_point_ranges and get_process_sequence_points functions.
"""
This script builds the control flow graph from the trace of a given process.

Give the following arguments:
  - Host of the REVEN's project. (default: "localhost")
  - Port of the REVEN's project. (default = 13370)

Example:
    python cfg_from_trace.py --host localhost --port 13370 cr3 0x1e7e3000 output_dot_filename

Remark: The self-modifying/overlapping code is not supported
"""


def check_process(project, cr3):
    """
    Check if a process is running in the whole trace.

    @param project: Object whose processes() iterates over entries exposing a
                    cr3 attribute.
    @param cr3: The CR3 register's value associated to the process to check.
    @raise ValueError: If no process of the project has this CR3 value.
    """
    for process in project.processes():
        if process.cr3 == cr3:
            # Found: the process exists, nothing to report.
            return

    raise ValueError('Unknown process with cr3 = {}'.format(hex(cr3)))
def get_cr3_from_pid(project, pid):
    """
    Get the CR3 register's value associated to a running process.

    @param project: Object whose processes() iterates over entries exposing
                    pid and cr3 attributes.
    @param pid: The PID of the process.
    @return: The cr3 attribute of the first process whose pid matches.
    @raise ValueError: If no process of the project has this PID.
    """
    for process in project.processes():
        if process.pid == pid:
            return process.cr3

    raise ValueError('Unknown process with pid = {}'.format(pid))
def get_last_point(trace):
    """
    Get the last point of the given trace.

    @param trace: Object exposing sequence_count, sequence_length(index) and
                  point(sequence_index, instruction_index).
    @return: The point at the last instruction of the last sequence.
    """
    # Both indices are zero-based, hence the "- 1" on each count.
    last_sequence = trace.sequence_count - 1
    last_instruction = trace.sequence_length(last_sequence) - 1
    return trace.point(last_sequence, last_instruction)
# get_process_point_ranges: generator of (start, end) point pairs of the main
# trace (project.traces()[0]) during which the process identified by cr3 runs.
# `start` is seeded with point (0, 0) when that point's 'cr3' register equals
# cr3, otherwise with None. The loop walks project.process_switches() and
# compares each switch's trace name with the main trace's name.
# NOTE(review): this text is an extraction of the original file; the embedded
# line numbers jump 61 -> 65 -> 67 -> 71, so the statements guarding the
# `start = ...` assignment and both `yield`s are not visible here. Presumably
# switches of other traces are skipped, and the final yield only happens when
# a range is still open -- confirm against the original script.
51 def get_process_point_ranges(project, cr3):
53 Get the ranges of main trace (e.g. 'Execution run') points where the process is running. 54 @param cr3: The CR3 register's value. 55 @return: A generator of tuples composed of the start and end point([start, end]). 57 main_trace = project.traces()[0]
58 first_point = main_trace.point(0, 0)
59 start = first_point
if first_point.cpu().read_register(
'cr3') == cr3
else None 60 for switch
in project.process_switches():
61 if main_trace.name != switch.point.trace.name:
65 start = switch.point.next_sequence()
67 yield (start, switch.point)
71 yield (start, get_last_point(main_trace))
def get_process_number_of_sequences(project, cr3):
    """
    Count the main trace sequences in which the process is running.

    @param project: The project handed to get_process_point_ranges.
    @param cr3: The CR3 register's value.
    @return: The sum, over every (first, last) range yielded by
             get_process_point_ranges, of the inclusive distance between the
             two points' sequence_index values.
    """
    return sum(
        last_point.sequence_index - first_point.sequence_index + 1
        for first_point, last_point in get_process_point_ranges(project, cr3)
    )
def get_process_sequence_points(project, cr3):
    """
    Get the first points of main trace (e.g. 'Execution run') sequences where
    the process is running in the main trace.

    @param project: The project handed to get_process_point_ranges.
    @param cr3: The CR3 register's value.
    @return: A generator of points belonging to the main trace.
    """
    for first, last in get_process_point_ranges(project, cr3):
        point = first
        # next_sequence() may return None, so guard before comparing to last.
        while point is not None and point <= last:
            yield point
            point = point.next_sequence()
class EmptySequencePoints(RuntimeError):
    """Raised by construct_control_flow when there is no head point to examine."""
# construct_control_flow: derive the control flow data from the head points of
# the process' sequences.
#   - Raises EmptySequencePoints when number_of_head_points is 0.
#   - For every head point: the address of point.instruction is the leader; an
#     edge (previous_follower, leader) is added when a previous follower
#     exists; every instruction of point.basic_block is recorded by address in
#     distinguished_instructions (first occurrence wins); the last instruction
#     address of the block becomes previous_follower and joins `followers`.
#   - Progress is printed while iterating.
#   - Returns (distinguished_instructions, entry_point, leaders, basic_blocks,
#     basic_block_control_flow).
# NOTE(review): this text is an extraction of the original file and many lines
# are absent (embedded numbers 98, 106-108, 110-113, 120-124, 135, 137, 139,
# 146-148, 150-152, 154-156, 164, 166-170, 175). The initialisation of
# `leaders`, `followers`, `control_flow`, `entry_point`, `basic_blocks` and
# `percentage_step` is therefore not visible here -- restore from the original
# script before running.
# NOTE(review): `print(...).format(...)` and the trailing-comma print only work
# with the Python 2 print statement, and dict.iterkeys() is Python 2 only.
93 def construct_control_flow(head_points, number_of_head_points):
94 if number_of_head_points == 0:
95 raise EmptySequencePoints(
'The process is not running in the main trace.')
97 percentage_step_length = number_of_head_points / 100
99 examined_head_point_number = 0
101 print(
'Number of basic blocks in the trace: {}').format(number_of_head_points)
102 print(
'Building control flow graph, please wait')
105 distinguished_instructions = {}
109 previous_follower =
None 114 for point
in head_points:
115 has_head_point =
True 116 leader = point.instruction.address
118 if previous_follower
is not None:
119 control_flow.add((previous_follower, leader))
125 reven_basic_block = point.basic_block
127 for ins
in reven_basic_block:
128 if ins.address
not in distinguished_instructions:
129 distinguished_instructions[ins.address] = ins
131 previous_follower = ins.address
132 followers.add(previous_follower)
134 examined_head_point_number += 1
136 if percentage_step >= percentage_step_length:
138 print(
'...{:0.3f}%').format(examined_head_point_number * 100.0 / number_of_head_points),
141 print(
'...completed')
# Second pass: walk the instruction addresses in ascending order and group
# them into basic_blocks[leader] lists; the loop scaffolding is not visible.
145 ins_iter = iter(sorted(distinguished_instructions.iterkeys()))
149 if ins
not in leaders:
153 basic_blocks[leader] = []
157 basic_blocks[leader].append(prev_ins)
160 if (prev_ins
in followers)
or (ins
in leaders):
161 if prev_ins
not in followers:
162 followers.add(prev_ins)
163 control_flow.add((prev_ins, ins))
165 except StopIteration:
# Third pass: emit (leader, tail) edges, taking the last address of each block
# as its follower. NOTE(review): the condition selecting which control_flow
# edges apply (embedded line 175) is not visible -- presumably head == follower.
171 basic_block_control_flow = set()
172 for leader
in basic_blocks:
173 follower = basic_blocks[leader][-1]
174 for head, tail
in control_flow:
176 basic_block_control_flow.add((leader, tail))
178 return (distinguished_instructions, entry_point, leaders, basic_blocks, basic_block_control_flow)
def build_control_flow_graph(instructions, entry_point, leaders, basic_blocks, control_flow):
    """
    Build the graphviz representation of the control flow graph.

    @param instructions: Mapping from an address to an instruction exposing
                         address, mnemonic and operands().
    @param entry_point: The leader address whose node is highlighted.
    @param leaders: Iterable of leader addresses; one node is created for each.
    @param basic_blocks: Mapping from a leader address to the list of
                         instruction addresses of its basic block.
    @param control_flow: Iterable of (head, tail) address pairs; one edge is
                         created for each.
    @return: The pygraphviz.AGraph holding the nodes and edges.
    """
    def instruction_to_string(reven_ins):
        ins_mnemonic = reven_ins.mnemonic
        ins_operands = ', '.join(reven_ins.operands())
        return ('0x{:<10x}{:<8}{}'.format(reven_ins.address, ins_mnemonic, ins_operands)).lower()

    def build_node_label(leader):
        basic_block = [instructions[address] for address in basic_blocks[leader]]
        # '\\l' is the two-character graphviz escape "\l" (left-justified line
        # break); it is spelled with an escaped backslash so Python does not
        # warn about an invalid escape sequence.
        return '\\l'.join(instruction_to_string(ins) for ins in basic_block) + '\\l'

    cfg = pygraphviz.AGraph(strict=False, directed=True, name='Control Flow Graph')
    cfg.node_attr['shape'] = 'box'
    cfg.node_attr['style'] = 'rounded'
    cfg.node_attr['fontname'] = 'Liberation Mono'

    for leader in leaders:
        if leader == entry_point:
            # The entry point gets a caption and a filled background.
            cfg.add_node(hex(leader),
                         label='entry point\n' + build_node_label(leader),
                         style='filled, rounded',
                         fillcolor='cornflowerblue')
        else:
            cfg.add_node(hex(leader),
                         label=build_node_label(leader))

    for head, tail in control_flow:
        cfg.add_edge(hex(head), hex(tail))

    return cfg
def main(reven_project, cr3, pid, output_graph_filename):
    """
    Build the control flow graph of a process and write it as a dot file.

    @param reven_project: The project to read the trace from.
    @param cr3: The CR3 register's value of the process, or None to look the
                process up by PID instead.
    @param pid: The PID of the process; only used when cr3 is None.
    @param output_graph_filename: Path of the dot file to write.
    @raise ValueError: If the process cannot be found (see check_process and
                       get_cr3_from_pid).
    @raise EmptySequencePoints: If construct_control_flow has no head point.
    """
    if cr3 is not None:
        check_process(reven_project, cr3)
    else:
        cr3 = get_cr3_from_pid(reven_project, pid)

    head_points = get_process_sequence_points(reven_project, cr3)
    number_of_head_points = get_process_number_of_sequences(reven_project, cr3)

    instructions, entry_point, leaders, basic_blocks, control_flow = construct_control_flow(
        head_points, number_of_head_points)

    # format() is applied to the string, not to print's result, so these lines
    # behave the same under the print statement and the print function.
    print('Entry point: 0x{:x}'.format(entry_point))
    print('Instructions: {:d}'.format(len(instructions)))
    print('Leaders: {:d}'.format(len(leaders)))
    print('Control flow: {:d}'.format(len(control_flow)))

    control_flow_graph = build_control_flow_graph(instructions, entry_point, leaders, basic_blocks, control_flow)
    control_flow_graph.draw(path=output_graph_filename, format='dot', prog='dot')
    print('Control flow graph built, please check the output dot file: {}'.format(output_graph_filename))
if __name__ == '__main__':
    # Command line: [--host HOST] [--port PORT] (cr3 VALUE | pid VALUE) OUT
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", type=str, help='The reven server host. (default = "localhost")', default="localhost")
    parser.add_argument("--port", type=int, help="The reven server port. (default = 13370)", default=13370)
    subparsers = parser.add_subparsers(dest="subparser", help="Process identification")
    # Make the sub-command mandatory on every Python version; otherwise
    # args.value below may not exist.
    subparsers.required = True
    cr3_parser = subparsers.add_parser('cr3', help="Use the CR3 register to identify a process (recommended).")
    # NOTE(review): auto_int is defined outside this view; presumably it
    # accepts prefixed literals such as 0x1e7e3000 -- confirm.
    cr3_parser.add_argument("value", type=auto_int, help="The CR3 register value")
    pid_parser = subparsers.add_parser('pid', help="Use the PID to identify a process.")
    pid_parser.add_argument("value", type=int, help="The PID value")
    parser.add_argument("out", type=str, help="The output filename (without extension).")
    args = parser.parse_args()

    # Exactly one of cr3 / pid is set, depending on the chosen sub-command.
    cr3 = args.value if args.subparser == 'cr3' else None
    pid = args.value if args.subparser == 'pid' else None

    try:
        project = reven.Project(args.host, args.port)
        main(project, cr3, pid, args.out + '.dot')
    except (RuntimeError, ValueError, EmptySequencePoints) as e:
        print('Error: {}'.format(e))
    except Exception:
        # Last-resort report; KeyboardInterrupt/SystemExit are left alone.
        print('Unknown error: {}'.format(sys.exc_info()))