REVEN-Axion 2018v1.4.4
cfg_from_trace.py

This example demonstrates how to use the Python API to build the control flow graph (CFG) from the trace of a given process.

See the script's documentation below for more information.

Note that this script only iterates over the points that belong to the given process, not over the whole trace. For more details, see the implementation of the get_process_point_ranges and get_process_sequence_points functions.

1 """
2 Purpose:
3  This script builds the control flow graph from the trace of a given process.
4 
5 Usage:
6  Give the following arguments:
7  - Host of the REVEN's project. (default: "localhost")
8  - Port of the REVEN's project. (default = 13370)
9  - Process CR3 or PID.
10  - Output filename.
11 
12  For example:
13  python cfg_from_trace.py --host localhost --port 13370 cr3 0x1e7e3000 output_dot_filename
14 
15 Remark: The self-modifying/overlapping code is not supported
16 """
17 
18 import reven
19 import pygraphviz
20 import sys
21 import argparse
22 
def check_process(project, cr3):
    """
    Check that the project knows a process with the given CR3 value.

    @param project: The REVEN project; its processes() are searched and each
                    process is expected to expose a `cr3` attribute.
    @param cr3: The CR3 register's value associated to the process to check.
    @raise ValueError: If no process of the project has this CR3 value.
    """
    if any(process.cr3 == cr3 for process in project.processes()):
        return
    raise ValueError('Unknown process with cr3 = {}'.format(hex(cr3)))
32 
def get_cr3_from_pid(project, pid):
    """
    Get the CR3 register's value associated to a process, given its PID.

    @param project: The REVEN project; its processes() are searched and each
                    process is expected to expose `pid` and `cr3` attributes.
    @param pid: The PID of the process.
    @return: The `cr3` of the first process whose `pid` matches.
    @raise ValueError: If no process of the project has this PID.
    """
    for process in project.processes():
        if process.pid == pid:
            return process.cr3
    raise ValueError('Unknown process with pid = {}'.format(pid))
42 
def get_last_point(trace):
    """
    Get the last point of the given trace.

    @param trace: A trace exposing `sequence_count`, `sequence_length(index)`
                  and `point(sequence_index, instruction_index)`.
    @return: The point located at the last instruction of the last sequence.
    """
    last_sequence = trace.sequence_count - 1
    last_instruction = trace.sequence_length(last_sequence) - 1
    return trace.point(last_sequence, last_instruction)
50 
def get_process_point_ranges(project, cr3):
    """
    Get the ranges of main trace (e.g. 'Execution run') points where the process is running.

    The main trace is the first trace returned by project.traces(). Process
    switches whose point belongs to another trace (compared by trace name)
    are ignored.

    @param project: The REVEN project providing traces() and process_switches().
    @param cr3: The CR3 register's value.
    @return: A generator of tuples composed of the start and end point([start, end]).
    """
    main_trace = project.traces()[0]  # Execution run
    first_point = main_trace.point(0, 0)

    # The process may already be running at the very first point of the trace.
    start = first_point if first_point.cpu().read_register('cr3') == cr3 else None

    for switch in project.process_switches():
        if main_trace.name != switch.point.trace.name:
            continue
        if start is None:
            # Not inside a range: a switch to our process opens one at the
            # sequence that follows the switch.
            if cr3 == switch.cr3:
                start = switch.point.next_sequence()
        else:
            # Inside a range: the next switch in the main trace closes it.
            yield (start, switch.point)
            start = None

    # Compare against None explicitly instead of relying on the truthiness
    # of a point object.
    if start is not None:
        # special case where the trace finishes while the process is running.
        yield (start, get_last_point(main_trace))
72 
def get_process_number_of_sequences(project, cr3):
    """
    Count the main trace sequences spanned by the ranges where the process is running.

    Each range returned by get_process_point_ranges contributes the number of
    sequence indexes it covers, both ends included.

    @param project: The REVEN project.
    @param cr3: The CR3 register's value.
    @return: The total number of sequences over all ranges (0 if there is none).
    """
    return sum(last_point.sequence_index - first_point.sequence_index + 1
               for first_point, last_point in get_process_point_ranges(project, cr3))
77 
def get_process_sequence_points(project, cr3):
    """
    Get the first points of main trace (e.g. 'Execution run') sequences where the process is running in the main trace.

    Every range returned by get_process_point_ranges is walked sequence by
    sequence with next_sequence(), starting from the range's first point.

    @param project: The REVEN project.
    @param cr3: The CR3 register's value.
    @return: A generator of points belonging to the main trace.
    """
    for first, last in get_process_point_ranges(project, cr3):
        point = first
        # Stop at the end of the range, or when next_sequence() gives no
        # further point.
        while point is not None and point <= last:
            yield point
            point = point.next_sequence()
89 
class EmptySequencePoints(RuntimeError):
    """Raised when there is no sequence point to build the control flow from."""
92 
def construct_control_flow(head_points, number_of_head_points):
    """
    Build the basic blocks and the control flow between them from executed sequences.

    @param head_points: An iterable of points, one per executed sequence, in
                        execution order. Each point must expose
                        `instruction.address` and an iterable `basic_block`
                        of instructions having an `address`.
    @param number_of_head_points: The number of points in head_points (only
                                  used for the emptiness check and the
                                  progress display).
    @return: A tuple (instructions, entry_point, leaders, basic_blocks,
             control_flow) where:
             - instructions maps an address to the first instruction object seen there;
             - entry_point is the address of the first head point;
             - leaders is the set of addresses starting a basic block;
             - basic_blocks maps a leader to the ascending list of addresses of its block;
             - control_flow is a set of (source leader, destination leader) edges.
    @raise EmptySequencePoints: If number_of_head_points is 0.
    """
    if number_of_head_points == 0:
        raise EmptySequencePoints('The process is not running in the main trace.')

    # Progress is reported about once per percent; `//` keeps the integer
    # step on both Python 2 and Python 3.
    percentage_step_length = number_of_head_points // 100
    percentage_step = 0
    examined_head_point_number = 0

    print('Number of basic blocks in the trace: {}'.format(number_of_head_points))
    print('Building control flow graph, please wait')

    # CFG building algorithm (c.f. EAC2e, c.5.3.4)
    distinguished_instructions = {}
    leaders = set()

    entry_point = None
    previous_follower = None
    followers = set()
    control_flow = set()

    # first pass: detect leaders
    for point in head_points:
        leader = point.instruction.address

        if previous_follower is not None:
            # The previous sequence ended by transferring control here.
            control_flow.add((previous_follower, leader))
        else:
            entry_point = leader

        leaders.add(leader)

        for instruction in point.basic_block:
            if instruction.address not in distinguished_instructions:
                distinguished_instructions[instruction.address] = instruction

        # The last instruction of the executed block is the one control
        # leaves from.
        previous_follower = instruction.address
        followers.add(previous_follower)

        examined_head_point_number += 1
        percentage_step += 1
        if percentage_step >= percentage_step_length:
            percentage_step = 0
            # Written without a newline so the progress stays on one line.
            sys.stdout.write('...{:0.3f}% '.format(
                examined_head_point_number * 100.0 / number_of_head_points))
            sys.stdout.flush()

    print('...completed')

    # second pass: build basic blocks by walking the addresses in ascending order
    basic_blocks = {}
    address_iter = iter(sorted(distinguished_instructions))
    try:
        address = next(address_iter)
        while True:
            if address not in leaders:
                # Not the start of a block: skip it.
                address = next(address_iter)
            else:
                leader = address
                basic_blocks[leader] = []

                while True:
                    prev_address = address
                    basic_blocks[leader].append(prev_address)
                    address = next(address_iter)

                    # The block ends after a follower or just before a leader.
                    if (prev_address in followers) or (address in leaders):
                        if prev_address not in followers:  # then address must be a leader
                            # The block was split by a leader: record the
                            # fall-through edge to the next block.
                            followers.add(prev_address)
                            control_flow.add((prev_address, address))
                        break
    except StopIteration:
        # Every address has been consumed.
        pass

    # different from the original algorithm which works always given static form
    # of the program, we need a third pass to build control flow since we cannot
    # be sure that basic blocks are consecutive
    # Each address belongs to at most one block, so the last address of a
    # block identifies that block.
    leader_of_last_address = dict(
        (addresses[-1], leader) for leader, addresses in basic_blocks.items())
    basic_block_control_flow = set(
        (leader_of_last_address[head], tail)
        for head, tail in control_flow
        if head in leader_of_last_address)

    return (distinguished_instructions, entry_point, leaders, basic_blocks, basic_block_control_flow)
179 
180 
def build_control_flow_graph(instructions, entry_point, leaders, basic_blocks, control_flow):
    """
    Build a pygraphviz graph with one node per basic block and one edge per control flow transfer.

    @param instructions: A dict mapping an address to its instruction object
                         (exposing `address`, `mnemonic` and `operands()`).
    @param entry_point: The leader of the block to highlight as entry point.
    @param leaders: The addresses starting a basic block; one node is created for each.
    @param basic_blocks: A dict mapping a leader to the list of addresses of its block.
    @param control_flow: An iterable of (head, tail) leader pairs, one per edge.
    @return: The pygraphviz.AGraph describing the control flow graph.
    """
    def instruction_to_string(reven_ins):
        ins_mnemonic = reven_ins.mnemonic
        ins_operands = ', '.join(reven_ins.operands())
        return ('0x{:<10x}{:<8}{}'.format(reven_ins.address, ins_mnemonic, ins_operands)).lower()

    def build_node_label(leader):
        basic_block = [instructions[address] for address in basic_blocks[leader]]
        # '\l' is the Graphviz left-justified line break; the backslash is
        # escaped so the literal is valid on every Python version.
        return ('\\l'.join([instruction_to_string(ins) for ins in basic_block])) + '\\l'

    cfg = pygraphviz.AGraph(strict=False, directed=True, name='Control Flow Graph')
    cfg.node_attr['shape'] = 'box'
    cfg.node_attr['style'] = 'rounded'
    cfg.node_attr['fontname'] = 'Liberation Mono'

    # Nodes are named after the hexadecimal address of their leader.
    for leader in leaders:
        if leader == entry_point:
            cfg.add_node(hex(leader),
                         label='entry point\n' + build_node_label(leader),
                         style='filled, rounded',
                         fillcolor='cornflowerblue')
        else:
            cfg.add_node(hex(leader),
                         label=build_node_label(leader))

    for head, tail in control_flow:
        cfg.add_edge(hex(head), hex(tail))

    return cfg
210 
211 
def main(reven_project, cr3, pid, output_graph_filename):
    """
    Build the control flow graph of a process and write it as a dot file.

    @param reven_project: The REVEN project to analyse.
    @param cr3: The CR3 value of the process, or None to identify it by PID.
    @param pid: The PID of the process; only used when cr3 is None.
    @param output_graph_filename: The path of the dot file to write.
    @raise ValueError: If the process is unknown to the project.
    @raise EmptySequencePoints: If the process has no sequence in the main trace.
    """
    if cr3 is not None:
        check_process(reven_project, cr3)
    else:
        cr3 = get_cr3_from_pid(reven_project, pid)

    head_points = get_process_sequence_points(reven_project, cr3)
    number_of_head_points = get_process_number_of_sequences(reven_project, cr3)

    instructions, entry_point, leaders, basic_blocks, control_flow = construct_control_flow(
        head_points, number_of_head_points)

    # The whole string is formatted before printing, so this works both as a
    # Python 2 print statement and as a Python 3 print() call.
    print('Entry point: 0x{:x}'.format(entry_point))
    print('Instructions: {:d}'.format(len(instructions)))
    print('Leaders: {:d}'.format(len(leaders)))
    print('Control flow: {:d}'.format(len(control_flow)))

    control_flow_graph = build_control_flow_graph(instructions, entry_point, leaders, basic_blocks, control_flow)
    control_flow_graph.draw(path=output_graph_filename, format='dot', prog='dot')
    print('Control flow graph built, please check the output dot file: {}'.format(output_graph_filename))
231 
def auto_int(x):
    """
    Convert a string to an integer, guessing the base from its prefix.

    @param x: The string to convert (e.g. '42' or '0x1e7e3000').
    @return: The integer value, as parsed by int(x, 0).
    @raise ValueError: If x is not a valid integer literal.
    """
    return int(x, 0)
234 
if __name__ == '__main__':
    # Command line: [--host HOST] [--port PORT] (cr3 VALUE | pid VALUE) OUT
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", type=str, help='The reven server host. (default = "localhost")', default="localhost")
    parser.add_argument("--port", type=int, help="The reven server port. (default = 13370)", default=13370)
    subparsers = parser.add_subparsers(dest="subparser", help="Process identification")
    # Python 3 makes sub-commands optional by default; require one so that
    # args.value is always defined.
    subparsers.required = True
    cr3_parser = subparsers.add_parser('cr3', help="Use the CR3 register to identify a process (recommended).")
    cr3_parser.add_argument("value", type=auto_int, help="The CR3 register value")
    pid_parser = subparsers.add_parser('pid', help="Use the PID to identify a process.")
    pid_parser.add_argument("value", type=int, help="The PID value")
    parser.add_argument("out", type=str, help="The output filename (without extension).")
    args = parser.parse_args()

    cr3 = args.value if args.subparser == 'cr3' else None
    pid = args.value if args.subparser == 'pid' else None

    try:
        project = reven.Project(args.host, args.port)
        main(project, cr3, pid, args.out + '.dot')
    except (RuntimeError, ValueError, EmptySequencePoints) as e:
        print('Error: {}'.format(e))
    except Exception:
        # Any other failure is reported; KeyboardInterrupt and SystemExit
        # are deliberately left to propagate.
        print('Unknown error: {}'.format(sys.exc_info()))
248  pid = args.value if args.subparser == 'pid' else None
249 
250  try:
251  project = reven.Project(args.host, args.port)
252  main(project, cr3, pid, args.out + '.dot')
253  except (RuntimeError, ValueError, EmptySequencePoints) as e:
254  print('Error: {}').format(e)
255  except:
256  print('Unknown error: {}').format(sys.exc_info())