REVEN-Axion 2018v1.4.4
ida_basicblock_simple.py

This is an example IDA script that uses the REVEN Python API. It must be run from within IDA.

See the script's documentation below for more information:

1 """
2 Purpose:
3  This script is a proof-of-concept for simple program profiling: each basic
4  block in a program will be colorized depending on its execution frequency;
5  one can also observe basic block frequencies in the output window of IDA.
6 
7 Usage:
8  Use IDA to load the binary, and give the following arguments before
9  executing the script:
10  host = your REVEN server name, and
11  port = your REVEN's project port on this host.
12 
13 Remark:
14  For each basic block obtained from IDA's static analysis, the script counts
15  its occurrence frequency in the trace given by REVEN. The following limits
16  are known:
17  - the name of the binary must be obtained from REVEN (that is usually the
18  case if "dump process" is executed properly), and be case-insensitively
19  identical to the one analyzed in IDA (i.e. we do not rename the binary)
20 
21  - the binary must be mapped at a unique virtual base address in the REVEN's
22  project trace
23 
24  - very poor results for packed (or self-modifying) binaries (due to the
25  limits of IDA's static analysis)
26 """
27 
import bisect
import os

import idaapi
import idautils
import idc

import reven
33 
34 
35 
def main(host, port):
    """
    Connect to the REVEN project at host:port and colorize the basic blocks of
    the binary currently loaded in IDA according to their execution frequency.

    If the binary's runtime base address cannot be determined, a message is
    printed and nothing is colorized.
    """
    project = reven.Project(host, port)

    runtime_base = get_base_address(project)
    if runtime_base is None:
        # Nothing to do without a unique runtime base address.
        print('cannot find the binary {:s} in the REVEN\'s trace or it is mapped at different base addresses.'.format(os.path.basename(idc.GetInputFilePath())))
        return

    static_base = idaapi.get_imagebase()

    print('base addresses:')
    print(' static: 0x{:x}'.format(static_base))
    print(' runtime: 0x{:x}'.format(runtime_base))

    # Difference between where the binary runs in the trace and where IDA loaded it.
    relocation = runtime_base - static_base

    ida_blocks = collect_basic_blocks()
    trace_points = get_binary_trace(project)
    split_blocks = calculate_dynamic_basic_blocks(ida_blocks, trace_points, relocation)

    frequencies = calculate_dynamic_basic_block_frequency(split_blocks, trace_points, relocation)
    colorize_dynamic_basic_blocks(split_blocks, frequencies)

    print('colorizing basic blocks done.')
60 
def get_base_address(reven_project):
    """
    Find the base address at which the binary opened in IDA is mapped in the
    REVEN trace.

    Binaries are matched on their lower-cased file name only; directories are
    ignored. Returns the base address when every mapping of every matching
    binary agrees on it, and None when there is no match or the mappings
    disagree.
    """
    wanted_name = os.path.basename(idc.GetInputFilePath()).lower()
    binaries = reven_project.binaries()

    found_base = None
    for path in binaries:
        if os.path.basename(path).lower() != wanted_name:
            continue

        # REVEN records every base address the binary was mapped at during the
        # trace; several processes may have loaded it.
        for mapping in binaries[path].mappings.values():
            candidate = mapping.base_address
            if found_base is None:
                found_base = candidate
            elif candidate != found_base:
                # Mapped at different addresses: there is no way to pick one.
                return None

    return found_base
85 
86 
def get_binary_trace(reven_project):
    """
    Return the points of the 'Execution run' trace that match the binary
    currently opened in IDA.

    The search result is materialized into a list before being returned.
    """
    main_trace = reven_project.trace('Execution run')

    # Only the file name matters; the path on the analyst's machine is irrelevant.
    file_name = os.path.basename(idc.GetInputFilePath())

    # According to the original author's note, this criterion performs a
    # "contains" match by default, so the result may include more than the
    # binary we are interested in (see the REVEN documentation).
    name_criterion = reven.BinaryCriterion(pattern=file_name,
                                           case_sensitive=False)
    matching_points = main_trace.search_point([name_criterion])
    return list(matching_points)
104 
105 
def calculate_dynamic_basic_block_frequency(dynamic_bbs, reven_trace, relocation_offset):
    """
    Count how many times the first address of each dynamic basic block is
    executed in the trace.

    dynamic_bbs is an iterable of (start, end) pairs in static addresses,
    reven_trace an iterable of points whose `basic_block` yields instructions
    with an `address`, and relocation_offset the value added to a static
    address to obtain the runtime one.

    Returns a dict mapping each block's static start address to its hit count.
    """
    # Counters are keyed by runtime address so trace addresses can be looked up directly.
    hits = {}
    for (start, _end) in dynamic_bbs:
        hits[start + relocation_offset] = 0

    for point in reven_trace:
        for instruction in point.basic_block:
            runtime_addr = instruction.address
            if runtime_addr in hits:
                hits[runtime_addr] += 1

    # Translate the keys back to static addresses for the caller.
    return dict((runtime_addr - relocation_offset, count) for (runtime_addr, count) in hits.items())
118 
119 
def calculate_frequency(ida_bbs, reven_trace, relocation_offset):
    """
    Count how many times the start address of each IDA basic block is executed
    in the trace.

    ida_bbs is an iterable of objects exposing `startEA`, reven_trace an
    iterable of points whose `basic_block` yields instructions with an
    `address`, and relocation_offset the value added to a static address to
    obtain the runtime one.

    Returns a dict mapping each block's static start address to its hit count.
    """
    counters = dict.fromkeys((block.startEA + relocation_offset for block in ida_bbs), 0)

    # Only instruction addresses are needed, nothing tied to a specific instant
    # of the trace, so walk the instructions of each point's basic block
    # (through the block's default iterator) rather than the points themselves.
    for point in reven_trace:
        executed = [instruction.address for instruction in point.basic_block]
        for runtime_addr in executed:
            if runtime_addr in counters:
                counters[runtime_addr] += 1

    static_counters = {}
    for runtime_addr in counters:
        static_counters[runtime_addr - relocation_offset] = counters[runtime_addr]
    return static_counters
135 
136 
def collect_basic_blocks():
    """
    Gather the basic blocks of every function known to IDA.

    Returns a set of the block objects produced by idaapi.FlowChart, leaving
    out blocks whose startEA equals their endEA.
    """
    blocks = set()
    for function_ea in idautils.Functions():
        flowchart = idaapi.FlowChart(idaapi.get_func(function_ea))
        # IDA's analysis sometimes yields degenerate blocks whose start and end
        # addresses are identical; those are ignored.
        blocks.update(block for block in flowchart if block.startEA != block.endEA)

    return blocks
148 
149 
def calculate_dynamic_basic_blocks(ida_bbs, reven_trace, relocation_offset):
    """
    Refine IDA's static basic blocks using the block heads observed in the trace.

    Every static block [startEA, endEA) is cut at each trace head address that
    falls strictly inside it, so that each address at which the trace enters
    the block becomes the start of its own block.

    Arguments:
      ida_bbs: iterable of objects exposing `startEA` and `endEA` (static addresses).
      reven_trace: iterable of points exposing `instruction.address` (runtime addresses).
      relocation_offset: value added to a static address to obtain the runtime one.

    Returns a set of (start, end) tuples expressed in static addresses.
    """
    # Work in the static address space: translate each head once, and drop the
    # duplicates (a head executed many times only defines one cut).
    cut_points = sorted({point.instruction.address - relocation_offset for point in reven_trace})

    dynamic_basic_blocks = set()
    for bb in ida_bbs:
        start, end = bb.startEA, bb.endEA

        # Cuts strictly inside the block: start < cut < end.
        first = bisect.bisect_right(cut_points, start)
        last = bisect.bisect_left(cut_points, end)

        piece_start = start
        for cut in cut_points[first:last]:
            dynamic_basic_blocks.add((piece_start, cut))
            piece_start = cut
        dynamic_basic_blocks.add((piece_start, end))

    return dynamic_basic_blocks
172 
173 
def color_gradient(bb_freq):
    """
    Map each basic block's frequency to the green component of its color.

    The least frequent block gets 0xff and the most frequent one gets 0; the
    others are spread linearly in between. When all frequencies are equal,
    every block gets 0xff.

    Arguments:
      bb_freq: dict mapping a block address to its execution count.

    Returns a dict mapping the same addresses to an integer in [0, 0xff];
    an empty dict is returned for an empty input.
    """
    if not bb_freq:
        # No block at all: nothing to scale.
        return {}

    min_freq = min(bb_freq.values())
    max_freq = max(bb_freq.values())

    max_color = 0xff
    if max_freq == min_freq:
        # A flat profile would make the step below divide by zero.
        return dict.fromkeys(bb_freq, max_color)

    color_step = float(max_color) / (max_freq - min_freq)
    return {
        bb_addr: max_color - int(round((freq - min_freq) * color_step))
        for (bb_addr, freq) in bb_freq.items()
    }
195 
196 
def colorize_dynamic_basic_block(bb, color):
    """
    Apply `color` to the items of the block `bb`, given as a (start, end) pair:
    starting at `start`, each address reached through idc.NextHead is colored
    as long as it is lower than `end`.
    """
    current_ea, end_ea = bb
    while current_ea < end_ea:
        idaapi.set_item_color(current_ea, color)
        current_ea = idc.NextHead(current_ea)
202 
203 
def colorize_dynamic_basic_blocks(dynamic_bbs, bb_freq):
    """
    Colorize every executed dynamic basic block and print its frequency.

    dynamic_bbs is an iterable of (start, end) pairs and bb_freq a dict mapping
    each block's start address to its execution count. Blocks with a zero
    count are left untouched.
    """
    green_by_addr = color_gradient(bb_freq)

    block_by_start = {start: (start, end) for (start, end) in dynamic_bbs}

    print('{:s}\t{:s}'.format('BAddr', 'Freq'))

    for start in block_by_start:
        hit_count = bb_freq[start]
        if hit_count <= 0:
            continue

        # BBGGRR color format: BB = 0xb0, GG = green_by_addr[start], RR = 0xff
        color = 0xb0 * 0x10000 + green_by_addr[start] * 0x100 + 0xff
        colorize_dynamic_basic_block(block_by_start[start], color)

        # NOTE(review): the listing this was recovered from lost its indentation;
        # this print is assumed to apply to executed blocks only - confirm.
        print('0x{:x}\t{:d}'.format(start, hit_count))
218 
219 
if __name__ == '__main__':
    # Ask for the REVEN project address as "host:port"; the result is None
    # when no string is provided, in which case nothing is done.
    host_port_str = idc.AskStr('localhost:13370', "REVEN's project address")
    if host_port_str is not None:
        try:
            # Only the parsing is guarded here, so that a ValueError raised by
            # the analysis itself is not misreported as a malformed address.
            host, port_str = host_port_str.split(':')
            port = int(port_str)
        except ValueError:
            print("please give a correct REVEN's project address, e.g. localhost:13370")
        else:
            print("REVEN's project: {}:{}".format(host, port))
            try:
                main(host, port)
            except RuntimeError as e:
                print('{}'.format(e))
            except Exception as e:
                # Keep the script from raising into IDA, but say what went wrong.
                print('Unknown error: {!r}'.format(e))