REVEN v2 Python API quick start
With the REVEN v2 Python API, reverse engineers can automate the analysis of a scenario using scripts.
About this document
This document is a quick start guide to the REVEN v2 Python API. The REVEN v2 Python API can be used to automate several aspects of REVEN:
- The recording/replay workflow (Workflow Python API), only available in the Enterprise Edition.
- The analysis of an already replayed scenario (Analysis Python API).
This document focuses solely on the Analysis Python API. It covers the following topics:
- Installation
- Basic usage
- Main concepts
- Overview of the available features
Along the way, this document provides some simple recipes you can use to automate various tasks.
Installation
Please refer to the Installation page for more information on installing the Python API.
You can also use the Jupyter notebook integration to use the API.
Basic usage
Once you've installed the Python API (see the Installation document), you're ready for your first script.
Import the reven2 package:
>>> # Importing the API package
>>> import reven2
Connecting to a server
To use the Python API, you have to connect to a REVEN server started on the scenario you want to analyze. To do this, you must provide the host and port of your REVEN server:
>>> # Connecting to a reven server
>>> hostname = "localhost"
>>> port = 13370
>>> server = reven2.RevenServer(hostname, port)
>>> server
Reven server (localhost:13370) [connected]
If you are using the Python API from the same machine as the REVEN server itself, then the host is "localhost"; otherwise it is the address of your server.
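When a script receives the server location as a single `host:port` string (for instance from a command-line argument), the two parts have to be split before being passed to `reven2.RevenServer`. The following is a minimal sketch of that step. The names `parse_endpoint` and `connect` are hypothetical helpers, not part of the API, and `reven2` is imported lazily so that the parsing part can be used on its own.

```python
def parse_endpoint(endpoint):
    """Split a "host:port" string into a (host, port) tuple.

    Hypothetical helper, not part of the REVEN API.
    """
    host, separator, port = endpoint.rpartition(":")
    if not separator or not host:
        raise ValueError("expected 'host:port', got {!r}".format(endpoint))
    return host, int(port)


def connect(endpoint):
    """Connect to a REVEN server described by a "host:port" string."""
    import reven2  # imported here so that parse_endpoint works without the API

    host, port = parse_endpoint(endpoint)
    return reven2.RevenServer(host, port)
```

With the server used in this guide, `connect("localhost:13370")` would be equivalent to `reven2.RevenServer("localhost", 13370)`.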
To find the port, go to the Analyze page for the scenario you want to connect to; the port number is displayed in the label above the buttons (REVEN running on port xxxx):
Alternatively, you can find the port in the Active sessions list:
Finally, if you have an Axion client connected to your REVEN server, you can find the port in the titlebar of the Axion window:
Connecting to a server from the scenario's name
NOTE: This section only applies to REVEN Enterprise Edition.
You can use a feature of the Workflow API to get a connection to a server from the scenario's name, rather than by specifying a port:
>>> from reven2.preview.project_manager import ProjectManager
>>> pm = ProjectManager("http://localhost:8880") # URL to the REVEN Project Manager
>>> connection = pm.connect("cve-2016-7255") # No need to specify "13370"
>>> server = connection.server
>>> server
Reven server (localhost:13370) [connected]
This is useful, as the server port will typically change at each reopening of the scenario, while the scenario name remains the same.
If no server is open for that particular scenario when executing the ProjectManager.connect method call, then a new one will be started.
Root object of the API, tree of objects
The RevenServer instance serves as the root object of the API, from which you can access all the features of the API.
The following diagram gives a high-level view of the Python API:
For instance, from there you can get the execution trace and ask for the total number of transitions in the trace:
>>> # Getting the trace object
>>> trace = server.trace
>>> # Getting the number of transitions in the trace
>>> trace.transition_count
2847570054
In your Python interactive shell, you can also use the help built-in function to directly access the documentation while coding (see the official Python documentation for more details on this function).
We recommend using a feature-rich shell like ipython or bpython to benefit from e.g. auto-completion while using the Python API.
Main concepts
Getting a point in time
As is visible in Axion, all instructions are identified by a single unique integer, called the transition id. The transition id starts at 0 for the first instruction in the trace, and is incremented by 1 for each consecutive instruction.
NOTE: We use the term Transition rather than Instruction here because, technically, not all Transitions in the trace are Instructions: when an interrupt or a fault occurs, it is also denoted by a Transition that changes the Context, although no Instruction was executed. Similarly, instructions that execute only partially (because they are interrupted by e.g. a page fault) are not considered normal Instructions. You can see a Transition as a generalized Instruction, i.e. something that modifies the context.
Getting a transition
You can get interesting transition numbers from Axion's instruction view.
>>> # Getting a transition
>>> transition = trace.transition(1234)
>>> # Displays the transition as seen in Axion
>>> print(transition)
#1234 jne 0xfffff800c9821dc7 ($+0xac)
>>> # Is this transition an instruction?
>>> transition.instruction is not None
True
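Since not every transition is an instruction, it can be useful to know how many transitions in a given range did not execute one. The following is a minimal sketch that relies only on `trace.transition` and the `instruction` attribute shown above. The helper name `count_non_instructions` is hypothetical, and iterating over a large range may be slow.

```python
def count_non_instructions(trace, first_id, last_id):
    """Count the transitions in [first_id, last_id) that are not instructions.

    Hypothetical helper: `trace` is expected to behave like `server.trace`.
    """
    count = 0
    for transition_id in range(first_id, last_id):
        # Interrupts, faults and partially executed instructions have no instruction
        if trace.transition(transition_id).instruction is None:
            count += 1
    return count
```

For instance, `count_non_instructions(trace, 0, 10000)` would inspect the first 10000 transitions of the trace.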
Getting a context
A Transition represents a change in the trace, while Contexts represent a state in the trace.
From a transition, you can get either the context before the transition was applied, or the context after the transition was applied:
>>> # Comparing rip before and after executing an instruction
>>> ctx_before = transition.context_before()
>>> ctx_after = transition.context_after()
>>> "0x{:x}".format(ctx_before.read(reven2.arch.x64.rip))
'0xfffff800c9821d1b'
>>> "0x{:x}".format(ctx_after.read(reven2.arch.x64.rip))
'0xfffff800c9821d21'
>>> # Directly getting a context from the trace object
>>> trace.context_before(0x1234) == trace.transition(0x1234).context_before()
True
>>> # Getting a transition back from a context
>>> transition.context_before().transition_after() == transition
True
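The before/after comparison above can be wrapped in a small function to observe the effect of a transition on any register. This is a minimal sketch using only `context_before`, `context_after` and `read`; the name `register_change` is a hypothetical helper, not part of the API.

```python
def register_change(transition, register):
    """Return the (before, after) values of `register` around `transition`.

    Hypothetical helper: `register` is expected to be a register object
    such as reven2.arch.x64.rip.
    """
    before = transition.context_before().read(register)
    after = transition.context_after().read(register)
    return before, after
```

Given the values displayed above for transition #1234, `register_change(transition, reven2.arch.x64.rip)` would return the pair `(0xfffff800c9821d1b, 0xfffff800c9821d21)`.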
Reading a context
A common operation on a Context instance is to read the state of the CPU registers as well as memory.
The API provides the read method on Context, which lets you read from a source.
Getting a register or an address
To read from a register source, you can reference elements exposed by the arch package:
>>> import reven2.arch.x64 as regs
>>> ctx = transition.context_before()
>>> ctx.read(regs.rax)
35680
>>> ctx.read(regs.al)
96
>>> # Are we in kernel land?
>>> ctx.read(regs.cs) & 3 == 0
True
To read from a source address, use the address module to construct addresses:
>>> # Comparing the bytes at RIP in memory with the bytes of the instruction
>>> from reven2.address import LogicalAddress, LinearAddress, PhysicalAddress
>>> rip = ctx.read(regs.rip)
>>> instruction = transition.instruction
>>> ctx.read(LogicalAddress(rip, regs.cs), instruction.size) == instruction.raw
True
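The "kernel land" check above works because the two low bits of the `cs` selector hold the current privilege level on x86-64, with 0 meaning ring 0. The following is a minimal sketch that turns the check into a reusable predicate; `is_kernel_mode` is a hypothetical helper name, and the register is passed as a parameter so that the function does not depend on how `reven2.arch.x64` was imported.

```python
def is_kernel_mode(ctx, cs_register):
    """Return True when the context runs at privilege level 0 (ring 0).

    Hypothetical helper: `cs_register` is expected to be reven2.arch.x64.cs.
    """
    privilege_level = ctx.read(cs_register) & 3  # two low bits of the selector
    return privilege_level == 0
```

With the imports above, the call would look like `is_kernel_mode(ctx, regs.cs)`.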
Reading as a type
The types package of the API provides classes and instances dedicated to the representation of data types.
They let you read a register or some memory as a specific data type.
>>> from reven2 import types
>>> # Reading rax as various integer types
>>> ctx.read(regs.rax, types.U8)
96
>>> ctx.read(regs.rax, types.U16)
35680
>>> ctx.read(regs.rax, types.I16)
-29856
>>> # Reading in a different endianness (default is little endian)
>>> ctx.read(regs.rax, types.BigEndian(types.U16))
24715
>>> # Reading some memory as a String
>>> ctx.read(LogicalAddress(0xffffe00041cac2ea), types.CString(encoding=types.Encoding.Utf16, max_character_count=1000))
u'Network Store Interface Service'
>>> # Reading the same memory as a small array of bytes
>>> ctx.read(LogicalAddress(0xffffe00041cac2ea), types.Array(types.U8, 4))
[78, 0, 101, 0]
>>> # Dereferencing rsp + 0x20 in two steps
>>> addr = LogicalAddress(0x20) + ctx.read(regs.rsp, types.USize)
>>> ctx.read(addr, types.U64)
10738
>>> # Dereferencing rsp + 0x20 in one step
>>> ctx.deref(regs.rsp, types.Pointer(types.U64, base_address=LogicalAddress(0x20)))
10738
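To see why the same register yields 96, 35680, -29856 or 24715 depending on the requested type, the interpretation can be reproduced without a REVEN server. The following sketch uses only the standard `struct` module and the value of `rax` shown above (35680, i.e. 0x8b60). It illustrates the integer encodings involved and says nothing about how the API is implemented.

```python
import struct

rax = 35680                      # value of rax in the examples above (0x8b60)
raw = struct.pack("<H", rax)     # the two low bytes in little-endian order: 0x60, 0x8b

as_u8 = struct.unpack("<B", raw[:1])[0]   # lowest byte only
as_u16 = struct.unpack("<H", raw)[0]      # unsigned 16 bits, little endian
as_i16 = struct.unpack("<h", raw)[0]      # same bits, signed (two's complement)
as_u16_be = struct.unpack(">H", raw)[0]   # same bytes, read as big endian

# The first two UTF-16 (little endian) characters of "Network..." as bytes
first_bytes = list(bytearray(u"Ne".encode("utf-16-le")))

print(as_u8, as_u16, as_i16, as_u16_be, first_bytes)
```

The values match the outputs of the `types.U8`, `types.U16`, `types.I16`, `types.BigEndian(types.U16)` and `types.Array(types.U8, 4)` reads above.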
Identifying points of interest
One of the first tasks you need to perform during an analysis is finding an interesting point from which to start. The API provides some tools designed to identify these points of interest.
Getting and using symbol information
A typical starting point for an analysis is to search for points where a specific symbol is executed. In the API, this is done in two steps:
- Identify the symbol in the available symbols of the trace.
- Search for the identified symbol.
For the first step, you need to recover the OS Semantics Information (OSSI) instance tied to your RevenServer instance:
>>> # Recovering the OSSI object
>>> ossi = server.ossi
Note that for the OSSI feature to work in the API, the necessary OSSI resources must have been generated. Otherwise, several of the called methods may fail with an exception. Please refer to the documentation of each method for more information.
From there you can use the methods of the Ossi instance to get the binaries that were executed in the trace, and all the symbols of these binaries.
Note that each of these methods, like all API methods that return several results, returns a Python generator object.
>>> # Getting the first binary named "ntoskrnl.exe" in the list of executed binaries in the trace
>>> ntoskrnl = next(ossi.executed_binaries("ntoskrnl.exe"))
>>> ntoskrnl
Binary(path='c:/windows/system32/ntoskrnl.exe')
>>> # Getting the list of the symbols in "ntoskrnl.exe" containing "NtCreateFile"
>>> nt_create_files = list(ntoskrnl.symbols("NtCreateFile"))
>>> nt_create_files
[Symbol(binary='ntoskrnl', name='NtCreateFile', rva=0x4123b0), Symbol(binary='ntoskrnl', name='VerifierNtCreateFile', rva=0x6cf7bc)]
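As the output above shows, `symbols` matches every symbol whose name contains the pattern, and calling `next` on an empty generator raises `StopIteration`. The following is a minimal sketch of a lookup that returns only an exact match, or `None` when there is none. The helper name `find_exact_symbol` is hypothetical, and the sketch assumes that symbols expose the `name` attribute suggested by their representation.

```python
def find_exact_symbol(binary, name):
    """Return the first symbol of `binary` named exactly `name`, or None.

    Hypothetical helper; assumes each symbol has a `name` attribute.
    """
    matches = (symbol for symbol in binary.symbols(name) if symbol.name == name)
    # A default value avoids StopIteration when nothing matches
    return next(matches, None)
```

With the binary above, `find_exact_symbol(ntoskrnl, "NtCreateFile")` would skip `VerifierNtCreateFile` whatever the order of the results.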
Once you have a symbol or a binary, you can use the search feature to look for contexts whose rip location matches the symbol or binary.
>>> # Getting the first context inside of the first call to `NtCreateFile` in the trace
>>> create_file_ctx = next(trace.search.symbol(nt_create_files[0]))
>>> create_file_ctx
Context(id=14771105)
>>> # Getting the first context executing the `whoami.exe` binary
>>> whoami = next(ossi.executed_binaries("whoami.exe"))
>>> whoami_ctx = next(trace.search.binary(whoami))
>>> whoami_ctx
Context(id=2616590520)
For any context, you can request the current OSSI location and process:
>>> # Checking that the current symbol is NtCreateFile
>>> create_file_ctx.ossi.location()
Location(binary='ntoskrnl', symbol='NtCreateFile', address=0xfffff800c9c133b0, base_address=0xfffff800c9801000, rva=0x4123b0)
>>> # Getting the current process
>>> create_file_ctx.ossi.process()
Process(name='ShellExperienceHost.exe', pid=2412, ppid=616, asid=0x6a09e000)
>>> # When the symbol is unknown it is not displayed and set to None
>>> trace.context_before(1639373926).ossi.location()
Location(binary='sppsvc', address=0x7ff62952c880, base_address=0x7ff629390000, rva=0x19c880)
>>> trace.context_before(1639373926).ossi.location().symbol is None
True
>>> # When the whole location is unknown it is set to None
>>> trace.context_before(2215773766).ossi.location() is None
True
You can also request the location corresponding to a different (currently mapped) cs virtual address:
>>> # Requesting the 'NtCreateFile' symbol location from a context at a different location
>>> ctx.ossi.location()
Location(binary='ntoskrnl', symbol='PoExecutePerfCheck', address=0xfffff800c9821d1b, base_address=0xfffff800c9801000, rva=0x20d1b)
>>> ctx.ossi.location(0xfffff800c9c133b0)
Location(binary='ntoskrnl', symbol='NtCreateFile', address=0xfffff800c9c133b0, base_address=0xfffff800c9801000, rva=0x4123b0)
>>> # Moving a bit changes the rva
>>> hex(ctx.ossi.location(0xfffff800c9c133df).rva)
'0x4123df'
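In the locations displayed above, the `rva` field is the distance between the requested address and the base address of the binary. The following sketch checks this arithmetic with plain integers taken from the outputs above; it is only an illustration of the relationship between the displayed fields.

```python
# Values copied from the Location outputs above
base_address = 0xfffff800c9801000       # base address of ntoskrnl
nt_create_file = 0xfffff800c9c133b0     # address of NtCreateFile

rva = nt_create_file - base_address
moved_rva = 0xfffff800c9c133df - base_address  # a few bytes further in the symbol

print(format(rva, "#x"), format(moved_rva, "#x"))
```

The same subtraction applied to the `sppsvc` location (`0x7ff62952c880 - 0x7ff629390000`) gives `0x19c880`, the `rva` shown for it.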
Searching executed addresses in the trace
If you don't have a symbol attached to your address, you can also search for a specific address using the search function:
>>> # Searching for an executed address we saw in `whoami.exe`
>>> whoami_ctx == next(trace.search.pc(0x7ff72169c730))
True
Searching for strings in the trace
You can use the strings feature to search for points in the trace where strings are first accessed or created:
>>> # Looking for a string containing "Network"
>>> string = next(trace.strings("Network"))
>>> string
String(data='Network Store Interface Service\\0', size=64, address=LinearAddress(offset=0xffffe00041cac2ea), first_access=#40814 movdqu xmm0, xmmword ptr [rdx + rcx], last_access=Transition(id=40828), encoding=<Encoding.Utf16: 1>)
>>> # Getting the list of memory accesses for the string
>>> for access in string.memory_accesses():
...     print(access)
...
[#40814 movdqu xmm0, xmmword ptr [rdx + rcx]]Read access at @phy:0x7cffb2e8 (virtual address: lin:0xffffe00041cac2e8) of size 8
[#40815 movdqu xmm1, xmmword ptr [rdx + rcx + 0x10]]Read access at @phy:0x7cffb2f0 (virtual address: lin:0xffffe00041cac2f0) of size 16
[#40821 movdqu xmm0, xmmword ptr [rdx + rcx]]Read access at @phy:0x7cffb300 (virtual address: lin:0xffffe00041cac300) of size 16
[#40822 movdqu xmm1, xmmword ptr [rdx + rcx + 0x10]]Read access at @phy:0x7cffb310 (virtual address: lin:0xffffe00041cac310) of size 16
[#40828 movdqu xmm0, xmmword ptr [rdx + rcx]]Read access at @phy:0x7cffb320 (virtual address: lin:0xffffe00041cac320) of size 16
Manually iterating in the trace
Another way of searching for interesting points is to iterate over contexts or transitions and inspect each of them. Beware that iterating over a large portion of the trace may take a very long time to complete, so prefer the predefined search APIs, which use optimized indexes, whenever possible.
>>> # Finding first instruction whose mnemonic is swapgs
>>> # Warning: this example may take some time to execute
>>> import builtins # for Python 2/3 cross compatibility
>>> def find_mnemonic(trace, mnemonic, from_transition=None, to_transition=None):
...     for i in builtins.range(from_transition.id if from_transition is not None else 0,
...                             to_transition.id if to_transition is not None else trace.transition_count):
...         t = trace.transition(i)
...         if t.instruction is not None and mnemonic in t.instruction.mnemonic:
...             yield t
...
>>> next(find_mnemonic(trace, "swapgs"))
Transition(id=184230)
Combining the predefined search APIs with manual iteration lets you iterate over a smaller portion of the trace to extract useful information:
>>> # Finding all files that are created in a call to NtCreateFile
>>> def read_filename(ctx):
...     # filename is stored in a UNICODE_STRING structure,
...     # which is stored inside of an object_attribute structure,
...     # a pointer to which is stored as third argument (r8) to the call
...     object_attribute_addr = ctx.read(regs.r8, types.USize)
...     # the pointer to the unicode string is stored as third member at offset 0x10 of object_attribute
...     punicode_addr = object_attribute_addr + 0x10
...     unicode_addr = ctx.read(LogicalAddress(punicode_addr), types.USize)
...     # the length is stored as first member of UNICODE_STRING, at offset 0x0
...     unicode_length = ctx.read(LogicalAddress(unicode_addr) + 0, types.U16)
...     # the buffer is stored as third member of UNICODE_STRING, at offset 0x8
...     buffer_addr = ctx.read(LogicalAddress(unicode_addr) + 8, types.USize)
...     filename = ctx.read(LogicalAddress(buffer_addr),
...                         types.CString(encoding=types.Encoding.Utf16, max_size=unicode_length))
...     return filename
...
>>> for (index, ctx) in enumerate(trace.search.symbol(nt_create_files[0])):
...     if index > 5:
...         break
...     print("{}: {}".format(ctx, read_filename(ctx)))
...
Context before #14771105: \??\C:\Windows\SystemApps\ShellExperienceHost_cw5n1h2txyewy\resources.pri
Context before #14816618: \??\PhysicalDrive0
Context before #16353064: \??\C:\Users\reven\AppData\Local\...\AC\Microsoft
Context before #16446049: \??\C:\Users\reven\AppData\Local\...\AC\Microsoft\Windows
Context before #16698900: \??\C:\Windows\rescache\_merged\2428212390\2218571205.pri
Context before #26715236: \??\C:\Windows\system32\dps.dll
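Because the search methods return generators, the number of results can also be capped with the standard `itertools.islice` function instead of counting indexes by hand. The following is a minimal sketch; `first_n` is a hypothetical helper name, not part of the API.

```python
from itertools import islice


def first_n(results, n):
    """Return a list of at most `n` items taken from a generator.

    Hypothetical helper: no item beyond the n-th is consumed from `results`.
    """
    return list(islice(results, n))
```

The loop above could then be written as `for ctx in first_n(trace.search.symbol(nt_create_files[0]), 6):`, which visits the same six contexts.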
Moving in the trace
Once you have identified point(s) of interest, the next step in the analysis is to navigate by following data from these points.
The API provides several features that can be used to do so.
Using the memory history
The main way to use the memory history in the trace is to use the Trace.memory_accesses method.
This method lets you look for the next access to some memory range, starting from a transition and in a given direction:
>>> # Choosing a memory range to track
>>> address = LogicalAddress(0xffffe00041cac2ea)
>>> # Getting the next access to that memory range from the current point
>>> memhist_transition = trace.transition(40818)
>>> next(trace.memory_accesses(address, size=64, from_transition=memhist_transition))
MemoryAccess(transition=Transition(id=40821), physical_address=PhysicalAddress(offset=0x7cffb300), size=8, operation=MemoryAccessOperation.Read, virtual_address=LinearAddress(offset=0xffffe00041cac300))
>>> # Getting the previous access to that memory range from the current point
>>> next(trace.memory_accesses(address, size=64, from_transition=memhist_transition, is_forward=False))
MemoryAccess(transition=Transition(id=40815), physical_address=PhysicalAddress(offset=0x7cffb2f8), size=8, operation=MemoryAccessOperation.Read, virtual_address=LinearAddress(offset=0xffffe00041cac2f8))
>>> # Getting all accesses to that memory range in the trace
>>> for access in trace.memory_accesses(address, size=64):
...     print(access)
...
[#40814 movdqu xmm0, xmmword ptr [rdx + rcx]]Read access at @phy:0x7cffb2e8 (virtual address: lin:0xffffe00041cac2e8) of size 8
[#40815 movdqu xmm1, xmmword ptr [rdx + rcx + 0x10]]Read access at @phy:0x7cffb2f0 (virtual address: lin:0xffffe00041cac2f0) of size 8
[#40815 movdqu xmm1, xmmword ptr [rdx + rcx + 0x10]]Read access at @phy:0x7cffb2f8 (virtual address: lin:0xffffe00041cac2f8) of size 8
[#40821 movdqu xmm0, xmmword ptr [rdx + rcx]]Read access at @phy:0x7cffb300 (virtual address: lin:0xffffe00041cac300) of size 8
[#40821 movdqu xmm0, xmmword ptr [rdx + rcx]]Read access at @phy:0x7cffb308 (virtual address: lin:0xffffe00041cac308) of size 8
[#40822 movdqu xmm1, xmmword ptr [rdx + rcx + 0x10]]Read access at @phy:0x7cffb310 (virtual address: lin:0xffffe00041cac310) of size 8
[#40822 movdqu xmm1, xmmword ptr [rdx + rcx + 0x10]]Read access at @phy:0x7cffb318 (virtual address: lin:0xffffe00041cac318) of size 8
[#40828 movdqu xmm0, xmmword ptr [rdx + rcx]]Read access at @phy:0x7cffb320 (virtual address: lin:0xffffe00041cac320) of size 8
[#40828 movdqu xmm0, xmmword ptr [rdx + rcx]]Read access at @phy:0x7cffb328 (virtual address: lin:0xffffe00041cac328) of size 8
Note that the memory history works with physical addresses under the hood. Although it accepts virtual addresses as input, the range of virtual addresses is translated to physical ranges before querying the memory history. As a result, the virtual address range needs to be mapped at the context of the translation for the call to succeed.
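In the listing above, a single instruction can appear several times because each access is reported in chunks. The following is a minimal sketch that sums the accessed sizes per transition. The helper name `bytes_accessed_per_transition` is hypothetical, and the sketch assumes that each access exposes the `transition` and `size` attributes suggested by the `MemoryAccess` representation.

```python
from collections import OrderedDict


def bytes_accessed_per_transition(accesses):
    """Sum the sizes of memory accesses per transition id, in first-seen order.

    Hypothetical helper; assumes `access.transition.id` and `access.size` exist.
    """
    totals = OrderedDict()
    for access in accesses:
        transition_id = access.transition.id
        totals[transition_id] = totals.get(transition_id, 0) + access.size
    return totals
```

A call such as `bytes_accessed_per_transition(trace.memory_accesses(address, size=64))` would return one entry per transition instead of one per chunk.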
A secondary method to use is the Transition.memory_accesses method, which provides all the memory accesses that occurred at a given transition.
>>> # Getting all memory that is accessed during a "rep"-prefixed operation
>>> rep_tr = trace.transition(49579) # found with "find_mnemonic"
>>> print(rep_tr)
#49579 rep outsd dx, dword ptr [rsi]
>>> [(access.virtual_address, access.size) for access in rep_tr.memory_accesses()]
[(LinearAddress(offset=0xffffe000427c6fb0), 2), (LinearAddress(offset=0xffffe000427c6fb2), 2), (LinearAddress(offset=0xffffe000427c6fb4), 2), (LinearAddress(offset=0xffffe000427c6fb6), 2), (LinearAddress(offset=0xffffe000427c6fb8), 2), (LinearAddress(offset=0xffffe000427c6fba), 2)]
Using the backtrace
For any context, you can get the associated call stack by reading the Context.stack property:
>>> # Getting the call stack
>>> rep_ctx = rep_tr.context_before()
>>> stack = rep_ctx.stack
>>> stack
Stack(Context=49579)
>>> # Displaying a human-readable backtrace
>>> stack.backtrace
[0] - #49574 - ataport!AtaPortWritePortBufferUshort
[1] - #49409 - atapi+0x23b0
[2] - #49392 - ataport!AtaPortGetParentBusType+0x3ef0
[3] - #49367 - ataport+0x6164
[4] - #49347 - ataport!AtaPortInitialize+0x49e0
[5] - #49323 - ntoskrnl!KeSynchronizeExecution
[6] - #49064 - ataport!AtaPortGetParentBusType+0x3ef0
[7] - #49000 - ataport!AtaPortInitialize+0x45a0
[8] - #48910 - ataport!AtaPortInitialize+0x3370
[9] - #48662 - ataport!AtaPortInitialize+0x2fac
[10] - #48097 - wdf01000+0x368e0
[11] - #47666 - wdf01000+0x19a70
[12] - #47597 - cdrom+0x5e30
[13] - #47529 - cdrom+0x1000
[14] - #46015 - wdf01000+0x368e0
[15] - #45936 - wdf01000+0x23ff8
[16] - #45927 - wdf01000+0x24d60
[17] - #45894 - ntoskrnl!IopProcessWorkItem
[18] - ??? - ntoskrnl!ExpWorkerThread+0x80
From there, you can use the backtrace to navigate in at least two ways:
- By going back to the caller of the current frame.
>>> # Finding back the caller transition if it exists
>>> print(next(stack.frames()).creation_transition)
#49574 call qword ptr [rip + 0x4af2]
- By going back to the previous stack. This allows for instance to switch from kernel land to user land, or to find/skip syscalls when necessary.
>>> stack.prev_stack().backtrace
[0] - #45719 - ntoskrnl!SwapContext
[1] - #45695 - ntoskrnl!KiSwapContext
[2] - #45487 - ntoskrnl!KiSwapThread
[3] - #45431 - ntoskrnl!KiCommitThreadWait
[4] - #45248 - ntoskrnl!KeWaitForMultipleObjects
[5] - ??? - dxgkrnl!DxgkUnreferenceDxgResource+0x2576a
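To jump to the call sites of a whole backtrace rather than only the innermost frame, the frames can be iterated in the same way. The following is a minimal sketch that uses only `frames()` and `creation_transition` as shown above. The helper name `known_callers` is hypothetical, and the sketch assumes that `creation_transition` is `None` for the frames displayed with `???`.

```python
def known_callers(stack):
    """Return the creation transitions of the frames of `stack`.

    Hypothetical helper; assumes that frames whose caller is unknown
    have `creation_transition` set to None, and skips them.
    """
    return [frame.creation_transition
            for frame in stack.frames()
            if frame.creation_transition is not None]
```

For the stack above, `known_callers(stack)` would start with the transition that `next(stack.frames()).creation_transition` returns.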
Feature overview
The following table offers a simple comparison between widgets and features of Axion and Python API methods:
Widget | API |
---|---|
CPU | Context.read |
Instruction view | Transition , Context.ossi.location , Context.ossi.process |
Hex dump | Context.read |
Memory History | Trace.memory_accesses , Transition.memory_accesses |
Search | Trace.search |
Backtrace | Context.stack |
String | Trace.strings |
Taint | Available in preview: preview.taint |
Going further
This concludes the Python API quick start guide. For further information on the Python API, please refer to the following documents:
- Python API analysis examples that are distributed in the Downloads page of the REVEN Project Manager. These scripts demonstrate the possibilities offered by the Python API through more elaborate examples.
- IDA Python API examples that are distributed in the Downloads page of the REVEN Project Manager. These scripts showcase the IDA compatibility of the Python API, that is, the simple capability of using the Python API from IDA:
>>> import reven2 # This works from IDA, too
- The full Python API reference documentation