This example demonstrates how to use the Python API to find instructions in a given process that were generated at runtime (execution-after-write).
See the script's documentation below for more information.
Note that this script only iterates over points that belong to a given process, not over the whole trace. For more details, look at the implementation of the `get_process_point_ranges` and `get_process_sequence_points` functions.
"""
This script shows the runtime generated instructions of a given process

It is similar in spirit to the `executions-after-write` inspector, but done at a process level rather than on the
whole trace, and directly from python after the trace execution (the `executions-after-write` inspector is
not required for this script to work).

Give the following arguments:
 - host:port of the REVEN's project (default: localhost:13370)
"""


def check_process(project, cr3):
    """
    Check if a process is running in the whole trace.

    @param project: Object whose processes() yields entries exposing a `cr3` attribute.
    @param cr3: The CR3 register's value associated to the process to check.
    @raise ValueError: If no process reported by the project has this CR3 value.
    """
    for process in project.processes():
        if process.cr3 == cr3:
            # NOTE(review): the statement under this `if` was lost from the listing;
            # an early exit is the only reading consistent with the unconditional
            # raise below - confirm against the original script.
            return
    raise ValueError('Unknown process with cr3 = {}'.format(hex(cr3)))
def get_cr3_from_pid(project, pid):
    """
    Get the CR3 register's value associated to a running process.

    @param project: Object whose processes() yields entries exposing `pid` and `cr3` attributes.
    @param pid: The PID of the process.
    @return: The `cr3` attribute of the first process whose `pid` matches.
    @raise ValueError: If no process reported by the project has this PID.
    """
    for process in project.processes():
        if process.pid == pid:
            # NOTE(review): this return was lost from the listing and is restored
            # from the documented contract - confirm against the original script.
            return process.cr3
    raise ValueError('Unknown process with pid = {}'.format(pid))
def get_last_point(trace):
    """
    Get the last point of the given trace.

    @param trace: Object exposing `sequence_count`, `sequence_length(index)` and
                  `point(sequence_index, instruction_index)`.
    @return: Whatever `trace.point` returns for the last instruction of the last sequence.
    """
    # Both indices are zero-based, hence the "- 1" on the count and on the length.
    last_sequence = trace.sequence_count - 1
    last_instruction = trace.sequence_length(last_sequence) - 1
    return trace.point(last_sequence, last_instruction)
49 def get_process_point_ranges(project, cr3):
51 Get the ranges of main trace (e.g. 'Execution run') points where the process is running. 52 @param cr3: The CR3 register's value. 53 @return: A generator of tuples composed of the start and end point([start, end]). 55 main_trace = project.traces()[0]
56 first_point = main_trace.point(0, 0)
57 start = first_point
if first_point.cpu().read_register(
'cr3') == cr3
else None 58 for switch
in project.process_switches():
59 if main_trace.name != switch.point.trace.name:
63 start = switch.point.next_sequence()
65 yield (start, switch.point)
69 yield (start, get_last_point(main_trace))
def get_process_sequence_points(project, cr3):
    """
    Get the first points of main trace (e.g. 'Execution run') sequences where the process is running in the main trace.

    @param project: Forwarded unchanged to get_process_point_ranges.
    @param cr3: The CR3 register's value.
    @return: A generator of points belonging to the main trace.
    """
    for first, last in get_process_point_ranges(project, cr3):
        # NOTE(review): the initialisation and the yield were lost from the listing;
        # they are restored from the surviving loop condition and the documented
        # generator contract - confirm against the original script.
        point = first
        # next_sequence() may return None, which ends the walk of the current range.
        while point is not None and point <= last:
            yield point
            point = point.next_sequence()
class EmptySequencePoints(RuntimeError):
    """
    Raised by check_smc when the sequence points it was given turned out to be empty.

    main() catches it and reports that the process is not running in the main trace.
    """
86 def check_smc(sequence_points):
87 accessed_memory_type = {}
89 has_sequence_points =
False 95 def update_memory_type(address, mem_type):
96 if (address
in accessed_memory_type)
and \
97 (accessed_memory_type[address] ==
'W')
and \
99 smc_addresses.add(address)
100 raise TypeError(hex(address))
102 accessed_memory_type[address] = mem_type
104 for sequence_point
in sequence_points:
105 has_sequence_points =
True 106 point = sequence_point
107 basic_block = point.basic_block
108 for instruction
in basic_block:
109 ins_addr = instruction.address
110 ins_len = instruction.size
111 for addr
in range(ins_addr, ins_addr + ins_len):
113 update_memory_type(addr,
'X')
114 except TypeError
as e:
115 print(
'Self-modifying code detected: {} (X ==> W)').format(e)
117 mem_accesses = point.memory_accesses()
118 for mem_access
in mem_accesses:
119 if mem_access.is_write:
120 for addr
in range(mem_access.address, mem_access.address + mem_access.size):
121 update_memory_type(addr,
'W')
125 if not has_sequence_points:
126 raise EmptySequencePoints()
130 def main(project, cr3, pid):
132 check_process(project, cr3)
134 cr3 = get_cr3_from_pid(project, pid)
135 sequence_points = get_process_sequence_points(project, cr3)
138 smc_addresses = check_smc(sequence_points)
139 except EmptySequencePoints:
140 print(
'The process is not running in the main trace.')
143 if not smc_addresses:
144 print(
"Self-modifying code not found")
# Command-line entry point: parse the connection settings and the process
# identifier (either a CR3 value or a PID), then run the analysis.
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", type=str, help='The reven server host. (default = "localhost")',
                        default="localhost")
    parser.add_argument("--port", type=int, help="The reven server port. (default = 13370)", default=13370)
    subparsers = parser.add_subparsers(dest="subparser", help="Process identification")
    cr3_parser = subparsers.add_parser('cr3', help="Use the CR3 register to identify a process (recommended).")
    cr3_parser.add_argument("value", type=auto_int, help="The CR3 register value")
    pid_parser = subparsers.add_parser('pid', help="Use the PID to identify a process.")
    pid_parser.add_argument("value", type=int, help="The PID value")
    args = parser.parse_args()

    # At most one of cr3 / pid is set, depending on which sub-command was used.
    cr3 = args.value if args.subparser == 'cr3' else None
    pid = args.value if args.subparser == 'pid' else None

    try:
        project = reven.Project(args.host, args.port)
        main(project, cr3, pid)
    except (RuntimeError, ValueError) as e:
        # Format the message *before* printing: calling .format on the result of
        # print(...) fails on Python 3, where print returns None.
        print('Error: {}'.format(e))
    except Exception:
        # NOTE(review): the header of this handler was lost from the listing; a
        # catch-all is implied by the sys.exc_info() report - confirm the original.
        print('Unknown error: {}'.format(sys.exc_info()))