REVEN API Cookbook

Welcome to the REVEN API Cookbook.

This book is a collection of examples that demonstrate good practices to accomplish common tasks using the REVEN API.

Pre-requisites

This book assumes that you already read the Python API quick start guide.

Furthermore, all the examples of this book assume that you are in an environment where you can import reven2 successfully.
If this is not the case, please refer to the installation documentation of the Python API before starting this guide.

64-bit examples

This book assumes that you are analyzing scenarios in a 64-bit context. If analyzing a scenario in a 32-bit context, then you should read from the 32-bit variants of registers or using 32-bit addresses.

So, if an example use regs.rax, the register should be replaced by regs.eax, like the following example:
ctx.read(regs.rax, U64) # 64-bit
ctx.read(regs.eax, U32) # 32-bit

Common abbreviations and variable names

REVEN scripts tend to re-use the same abbreviations and variable names for common objects.

AbbreviationDescription
serverA reven2.RevenServer instance representing a connection to a REVEN server
pmA reven2.preview.project_manager.ProjectManager instance representing a connection to the Project Manager
traceA reven2.trace.Trace instance, usually obtained from server.trace
trA reven2.trace.Transition instance, usually obtained from trace.transition(tr_id)
ctxA reven2.trace.Context instance, usually obtained from e.g. trace.context_before(tr_id)

Connecting to a server

Server variable

The other examples of this book assume that your Python environment contains a server variable that represents a connection to a REVEN server.

From its host and port

REVEN v2.0.0
# Connecting to a reven server
hostname = "localhost"
port = 13370
server = reven2.RevenServer(hostname, port)

From the scenario's name

REVEN v2.3.0
Edition Enterprise
API preview

You can use a feature of the Workflow API to get a connection to a server from the scenario's name, rather than by specifying a port.

From the CLI:

from reven2.preview.project_manager import ProjectManager
pm = ProjectManager("http://localhost:8880")  # URL to the REVEN Project Manager
connection = pm.connect("cve-2016-7255")  # No need to specify "13370"
server = connection.server

From a script:

with pm.connect("cve-2016-7255") as server:
    # TODO: use the server
    pass

This is useful, as the server port will typically change at each reopening of the scenario, while the scenario name remains the same.

If no server is open for that particular scenario when executing the ProjectManager.connect method call, then a new one will be started.

Manipulating transitions and contexts

Getting a transition or context from a transition id

REVEN v2.2.0
tr = server.trace.transition(1234)
ctx_before = server.trace.context_before(1234)
ctx_after = server.trace.context_after(1234)

Context <-> Transition

Transition -> Context

REVEN v2.2.0
ctx_before = tr.context_before()
ctx_after = tr.context_after()

Context -> Transition

REVEN v2.6.0
if ctx != server.trace.first_context:
    tr_before = ctx.transition_before()
if ctx != server.trace.last_context:
    tr_after = ctx.transition_after()
REVEN v2.2.0
if ctx != server.trace.context_before(0):
    tr_before = ctx.transition_before()
if ctx != server.trace.context_after(server.trace.transition_count - 1):
    tr_after = ctx.transition_after()
There are not always transitions around a context

While transition.context_before/context_after() always works, one must handle the case where a context is the first/last of the trace, in which case no transition before/after it can be accessed.

Trying to access the transition before the first context/after the last, will trigger an IndexError.

Getting the next/previous context and transition

next_tr = tr + 1
prev_tr = tr - 1
next_ctx = ctx + 1
prev_ctx = ctx - 1
next_next_tr = tr + 2
# ...
There is not always a next/previous transition/context

Make sure that the resulting transition/context is in range when adding/subtracting an offset to generate a new transition/context.

Trying to access a transition/context out-of-range will trigger an IndexError.

Iterating on a range of transitions/contexts

REVEN v2.2.0
for tr in server.trace.transitions(0, 1000):
    print(tr)

for ctx in server.trace.contexts():
    print(ctx)

Getting the first/last context/transition in the trace

REVEN v2.6.0
first_tr = server.trace.first_transition
last_tr = server.trace.last_transition
first_ctx = server.trace.first_context
last_ctx = server.trace.last_context
REVEN v2.2.0
first_tr = server.trace.transition(0)
last_tr = server.trace.transition(server.trace.transition_count - 1)
first_ctx = server.trace.context_before(0)
last_ctx = server.trace.context_after(server.trace.transition_count - 1)

Reading values or structs from registers or memory

REVEN v2.2.0

Common imports for easy access

from reven2.address import LinearAddress, LogicalAddress, LogicalAddressSegmentIndex, PhysicalAddress
from reven2.arch import x64 as regs
from reven2.types import *

Getting the current ring

def current_ring(ctx):
    return ctx.read(regs.cs) & 3

Reading as a type

Integer types

ctx.read(regs.rax, U8)
ctx.read(regs.rax, U16)
ctx.read(regs.rax, I16)
ctx.read(regs.rax, BigEndian(U16))

Sample output:

96
35680
-29856
24715

String

ctx.read(LogicalAddress(0xffffe00041cac2ea), CString(encoding=Encoding.Utf16,
                                                     max_character_count=1000))

Sample output:

u'Network Store Interface Service'

Array

list(ctx.read(LogicalAddress(0xffffe00041cac2ea), Array(U8, 4)))

Sample output:

[78, 0, 101, 0]

Dereferencing pointers, reading the stack

Reading [rsp+0x20] manually:

addr = LogicalAddress(0x20) + ctx.read(regs.rsp, USize)
ctx.read(addr, U64)

Reading [rsp+0x20] using deref:

ctx.deref(regs.rsp, Pointer(U64, base_address=LogicalAddress(0x20)))

Sample output:

10738

Reading struct values

REVEN v2.11.0
OS Windows 64-bit
OS Windows 32-bit

Getting a struct type from a binary:

object_attributes_ty = next(server.ossi.executed_binaries("ntoskrnl")).exact_type("_OBJECT_ATTRIBUTES")

Sample output of printing object_attributes_ty:

StructKind.Struct _OBJECT_ATTRIBUTES /* 0x30 */ {
    /* 0x0 */ Length : U32,
    /* 0x8 */ RootDirectory : void*,
    /* 0x10 */ ObjectName : _UNICODE_STRING*,
    /* 0x18 */ Attributes : U32,
    /* 0x20 */ SecurityDescriptor : void*,
    /* 0x28 */ SecurityQualityOfService : void*,
}

Reading a struct instance from a source (register, memory, etc):

# Reading structure from pointer stored in r8
object_attributes = ctx.read(reven2.address.LogicalAddress(0xffffe00041cac2ea), object_attributes_ty)

# Read one field according to declared type
object_attributes.field("Length").read()

Sample output:

48

You can also:

# Read one field, forcing its type to int-like. Useful for IDE type hints
object_attributes.field("Attributes").read_int()

# Get whole struct content as bytes
object_attributes.as_bytes()

# Dereferences a pointer-like field, read it as an inner struct, display its type
print(object_attributes.field("ObjectName").deref_struct().type)

Sample output:

0
b'0\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x98\xd1]\x06\x00\x00\x00\x00B\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
StructKind.Struct _UNICODE_STRING /* 0x10 */ {
    /* 0x0 */ Length : U16,
    /* 0x2 */ MaximumLength : U16,
    /* 0x8 */ Buffer : U16*,
}

Parsing a raw buffer as a type

U16.parse(b"\x10\x20")
BigEndian(U16).parse(b"\x10\x20")
Array(U8, 2).parse(b"\x10\x20")

Sample output:

8208
4128
[16, 32]

Moving to the beginning of a function

REVEN v2.9.0
from reven2.trace import Transition
from typing import Tuple, Optional

def call_bounds(tr: Transition) -> Tuple[Optional[Transition], Optional[Transition]]:
    """
    Given a transition anywhere inside of a function, this function returns
    the transition at the beginning of the function call and the transition at
    the end of the function call.
    """
    return (tr.step_out(is_forward=False), tr.step_out(is_forward=True))

Searching for function calls

Looking for an exact symbol name

REVEN v2.2.0
symbol_name = "CreateProcessW"
binary_hint = r"kernelbase\.dll"
try:
    symbol = next(server.ossi.symbols(f"^{symbol_name}$",
                                      binary_hint=binary_hint))
except StopIteration:
    raise RuntimeError(f"Could not find symbol '{symbol_name}' with binary hint '{binary_hint}'")
for ctx in server.trace.search.symbol(symbol):
    print(ctx)
Handling of the case where the symbol is not in the trace

When looking for an exact symbol, you need to prepare for the case where the symbol is not available in the trace. Note that a symbol can be available even if it is never called.

In the example, it manifests with the iterator having no element, which raises a StopIteration, that we catch and then convert to a more specific error.

Symbol pattern is a regular expression

When looking for a single, exact symbol, you need to enclose the symbol's name in the pattern with ^$ because the pattern is actually a regular expression. Failure to do that could result in several matching symbols, with the risk that the rest of your code actually chooses the wrong one.

Looking for multiple symbols

REVEN v2.2.0

The example provides an iterator of the tuples where the first element is the context of the call, and the second the name of the called symbol.

from itertools import repeat

def first_symbol(symbol_name):
    return next(server.ossi.symbols(f"^{symbol_name}$", binary_hint=binary))


binary = "c:/windows/system32/ntoskrnl.exe"
symbols = ["NtCreateFile", "NtOpenFile", "NtOpenDirectoryObject"]

symbols_name = [(first_symbol(symbol), symbol) for symbol in symbols]

symbols_name = [zip(server.trace.search.symbol(symbol[0]),
                    repeat(symbol[1])) for symbol in symbols_name]

for ctx_name in reven2.util.collate(symbols_name, lambda ctx_name: ctx_name[0]):
    print(f"{ctx_name[1]}: {ctx_name[0]}")

Sample output:

NtCreateFile: Context before #4468509
NtCreateFile: Context before #4479526
NtCreateFile: Context before #6451786
NtCreateFile: Context before #6852400
NtCreateFile: Context before #7666717
NtCreateFile: Context before #8067013
NtCreateFile: Context before #8298671
NtCreateFile: Context before #8648240
NtOpenFile: Context before #26656294
NtCreateFile: Context before #35251786
NtOpenFile: Context before #36420358
NtOpenFile: Context before #43268534
NtOpenDirectoryObject: Context before #43420816
NtOpenFile: Context before #43450170

Looking for a crash

System crashes

REVEN v2.2.0
OS Windows 64-bit

Look for the symbol KeBugCheckEx in ntoskrnl:

crash_symbol = next(server.ossi.symbols("^KeBugCheckEx$",
                                        binary_hint="ntoskrnl"))
for ctx in server.trace.search.symbol(crash_symbol):
    print(f"System crash at {ctx}")

Process crashes

REVEN v2.3.0
OS Windows 64-bit

Look for the symbol KiUserExceptionDispatch in ntdll:

crash_symbol = next(server.ossi.symbols("^KiUserExceptionDispatch$",
                                        binary_hint="ntdll"))
for ctx in server.trace.search.symbol(crash_symbol):
    process = ctx.ossi.process()
    print(f"{process.name} crashed at {ctx}")

Finding out when a memory location is accessed

Getting all the memory accesses of a range of addresses

REVEN v2.2.0
for access in server.trace.memory_accesses(0xffff88007fc03000, 4096):
    print(access)

Sample output:

[#39 call 0xffffffff81611fe0 ($+0x165133)]Write access at @phy:0x7fc03ec8 (virtual address:
        lin:0xffff88007fc03ec8) of size 8
[#41 call qword ptr [0xffffffff81c24448]]Write access at @phy:0x7fc03ec0 (virtual address:
        lin:0xffff88007fc03ec0) of size 8
[#42 push rdx]Write access at @phy:0x7fc03eb8 (virtual address: lin:0xffff88007fc03eb8) of size 8
[#48 pop rdx]Read access at @phy:0x7fc03eb8 (virtual address: lin:0xffff88007fc03eb8) of size 8
[#49 ret ]Read access at @phy:0x7fc03ec0 (virtual address: lin:0xffff88007fc03ec0) of size 8
[#51 push rdi]Write access at @phy:0x7fc03ec0 (virtual address: lin:0xffff88007fc03ec0) of size 8
[#52 popfq ]Read access at @phy:0x7fc03ec0 (virtual address: lin:0xffff88007fc03ec0) of size 8
[#54 ret ]Read access at @phy:0x7fc03ec8 (virtual address: lin:0xffff88007fc03ec8) of size 8
[#60 call 0xffffffff814abe30 ($-0x108f)]Write access at @phy:0x7fc03ec8 (virtual address:
        lin:0xffff88007fc03ec8) of size 8
[#62 push r14]Write access at @phy:0x7fc03ec0 (virtual address: lin:0xffff88007fc03ec0) of size 8

Getting all the memory accesses on a range of transitions

REVEN v2.9.0
for access in server.trace.memory_accesses(from_transition=server.trace.transition(1000),
                                    to_transition=server.trace.transition(1010)):
    print(access)

Sample output:

[#1005 mov qword ptr [rsp+0x40], r14]Write access at @phy:0x6645a9b0
        (virtual address: lin:0xfffffe0ff31db9b0) of size 8
[#1007 mov qword ptr [rsp+0x98], r14]Write access at @phy:0x6645aa08
        (virtual address: lin:0xfffffe0ff31dba08) of size 8
[#1008 or dword ptr [rsp+0x90], 0x2]Read access at @phy:0x6645aa00
        (virtual address: lin:0xfffffe0ff31dba00) of size 4
[#1008 or dword ptr [rsp+0x90], 0x2]Write access at @phy:0x6645aa00
        (virtual address: lin:0xfffffe0ff31dba00) of size 4

Finding the memory accesses at a transition

REVEN v2.2.0
for access in tr.memory_accesses():
    print(access)

Sample output:

[MemoryAccess(transition=Transition(id=42), physical_address=PhysicalAddress(offset=0x7fc03eb8), size=8,
    operation=MemoryAccessOperation.Write, virtual_address=LinearAddress(offset=0xffff88007fc03eb8))]

Displaying the value of arguments and return value of a call

REVEN v2.10.0
API preview
OS Windows 64-bit
OS Linux 64-bit

When the prototype is known

Windows 64-bit

Use the Ms64 calling convention.

call_tr = tr.step_out(is_forward=False)
import reven2.preview.prototypes
prototypes = reven2.preview.prototypes.RevenPrototypes(server)
call_conv = prototypes.calling_conventions.Ms64
prototype = "char * __cdecl OaGetEnv(char const *);"
f = prototypes.parse_one_function(prototype, call_conv)
call = f.call_site_values(call_tr)
call.arg_n(0)
call.ret()

Sample output:

'OACACHEPARAMS'
0

Linux 64-bit

Use the Sysv64 calling convention.

call_tr = tr.step_out(is_forward=False)
import reven2.preview.prototypes
prototypes = reven2.preview.prototypes.RevenPrototypes(server)
call_conv = prototypes.calling_conventions.Sysv64
prototype = "struct FILE; FILE* fopen64(const char *filename, const char *mode);"
f = prototypes.parse_one_function(prototype, call_conv)
call = f.call_site_values(call_tr)
call.args()
call.ret()

Sample output:

{'filename': '/proc/spl/kstat/zfs/arcstats', 'mode': 'r'}
0

Using a default prototype

Example with 5 parameters.

call_tr = tr.step_out(is_forward=False)
import reven2.preview.prototypes
prototypes = reven2.preview.prototypes.RevenPrototypes(server)
call_conv = prototypes.calling_conventions.Ms64
prototype = "void* f(void* p0, void* p1, void* p2, void* p3, void* p4);"
f = prototypes.parse_one_function(prototype, call_conv)
call = f.call_site_values(call_tr)
call.args()
call.ret()

Sample output:

{'p0': 18446735287469384880,
 'p1': 0,
 'p2': 18446735287473289024,
 'p3': 0,
 'p4': 0}
1364968393473
Limitations apply

Using a default prototype works as long as the replaced parameters behave as void*.

This is notably not the case for structs passed by value that are larger than a pointer (in some calling conventions) and for floating-point arguments (in Sysv64).

Searching for values in memory

Searching for values in a range of memory at a single context

If the value you are looking for might be in a range of memory at a specific point in the trace, you can search it with the following:

pattern = bytearray([0x77, 0x43])
for match_address in ctx.search_in_memory(pattern,
                                          ctx.read(regs.rsp), 0x1000000):
     print(f"'{pattern}' is found starting at @{match_address}")

Searching for string-like values accessed in the trace

REVEN v2.2.0

If the value you are looking for looks like a string indexed by the strings resource (defaults to strings of printable characters of length 5-128, possibly UTF-16 encoded), then you can use the strings search API, which will be the fastest way:

from_tr = server.trace.first_transition
to_tr = server.trace.last_transition

for string in server.trace.strings("Network"):
    for access in string.memory_accesses(from_tr, to_tr):
        print(access)

Sample output:

[#4653465 mov word ptr [rcx], ax]Write access at @phy:0x18e8252c (virtual address: lin:0x3a252c) of size 2
[#4653475 mov word ptr [rcx], ax]Write access at @phy:0x18e8252e (virtual address: lin:0x3a252e) of size 2
[#4653485 mov word ptr [rcx], ax]Write access at @phy:0x18e82530 (virtual address: lin:0x3a2530) of size 2
[#4653495 mov word ptr [rcx], ax]Write access at @phy:0x18e82532 (virtual address: lin:0x3a2532) of size 2
[#4653505 mov word ptr [rcx], ax]Write access at @phy:0x18e82534 (virtual address: lin:0x3a2534) of size 2
[#4653515 mov word ptr [rcx], ax]Write access at @phy:0x18e82536 (virtual address: lin:0x3a2536) of size 2
[#4653525 mov word ptr [rcx], ax]Write access at @phy:0x18e82538 (virtual address: lin:0x3a2538) of size 2
[#4653535 mov word ptr [rcx], ax]Write access at @phy:0x18e8253a (virtual address: lin:0x3a253a) of size 2
[#4653545 mov word ptr [rcx], ax]Write access at @phy:0x18e8253c (virtual address: lin:0x3a253c) of size 2
[#4653555 mov word ptr [rcx], ax]Write access at @phy:0x18e8253e (virtual address: lin:0x3a253e) of size 2
[#4653565 mov word ptr [rcx], ax]Write access at @phy:0x18e82540 (virtual address: lin:0x3a2540) of size 2
[#4653575 mov word ptr [rcx], ax]Write access at @phy:0x18e82542 (virtual address: lin:0x3a2542) of size 2
[#4653585 mov word ptr [rcx], ax]Write access at @phy:0x18e82544 (virtual address: lin:0x3a2544) of size 2
[#4653595 mov word ptr [rcx], ax]Write access at @phy:0x18e82546 (virtual address: lin:0x3a2546) of size 2
[#4653605 mov word ptr [rcx], ax]Write access at @phy:0x18e82548 (virtual address: lin:0x3a2548) of size 2
[#4653615 mov word ptr [rcx], ax]Write access at @phy:0x18e8254a (virtual address: lin:0x3a254a) of size 2
[#4653625 mov word ptr [rcx], ax]Write access at @phy:0x18e8254c (virtual address: lin:0x3a254c) of size 2
Unaccessed values are not found by this method

This method only finds values in memory that are accessed (read from, written to) during the portion of the trace where the search takes place.

If you don't know if your value is accessed in the trace, you can associate this method with a search in memory at a context.

Searching for other kinds of values accessed in the trace

REVEN v2.6.0
from_ctx = server.trace.first_context
to_ctx = server.trace.last_context

for match in server.trace.search.memory(b"\xc0\xfc\x75\x02", from_ctx, to_ctx).matches():
    print(match)

Sample output:

id: 0 | @lin:0xab600 (mapped at Context before #5665813) | [Context before #5665813 - Context after #16899165] | 1 access(es)
id: 1 | @lin:0xbadf0 (mapped at Context before #5666826) | [Context before #5666826 - Context after #16899165] | 1 access(es)
id: 2 | @lin:0xac810 (mapped at Context before #5666966) | [Context before #5666966 - Context before #6141251] | 1 access(es)
id: 3 | @lin:0xb0935 (mapped at Context before #6140550) | [Context before #6140550 - Context after #16899165] | 1 access(es)
id: 4 | @lin:0xab790 (mapped at Context before #6143172) | [Context before #6143172 - Context after #16899165] | 1 access(es)
id: 5 | @lin:0xffffe0016aec9800 (mapped at Context before #6144279) | [Context before #6144279 - Context before #6379441] | 2 access(es)
id: 6 | @lin:0xffffe0016a7accc0 (mapped at Context before #6152909) | [Context before #6152909 - Context before #6372050] | 1 access(es)
id: 7 | @lin:0xc7e605 (mapped at Context before #6218204) | [Context before #6218204 - Context after #16899165] | 15 access(es)
id: 8 | @lin:0xc67780 (mapped at Context before #6235773) | [Context before #6235773 - Context after #16899165] | 10 access(es)
id: 9 | @lin:0x29feb26 (mapped at Context before #6360085) | [Context before #6360085 - Context before #6360107] | 2 access(es)
id: 10 | @lin:0x29feb26 (mapped at Context before #6360108) | [Context before #6360108 - Context before #6360254] | 3 access(es)
id: 11 | @lin:0x29feb26 (mapped at Context before #6360255) | [Context before #6360255 - Context after #16899165] | 2 access(es)
id: 12 | @lin:0x29feb06 (mapped at Context before #6360265) | [Context before #6360265 - Context before #6360264] | 0 access(es)
id: 13 | @lin:0x29feb06 (mapped at Context before #6360265) | [Context before #6360265 - Context after #16899165] | 2 access(es)
id: 14 | @lin:0x29fe946 (mapped at Context before #6360440) | [Context before #6360440 - Context before #6360439] | 0 access(es)
id: 15 | @lin:0x29fe946 (mapped at Context before #6360440) | [Context before #6360440 - Context before #6360458] | 2 access(es)
Unaccessed values are not found by this method

This method only finds values in memory that are accessed (read from, written to) during the portion of the trace where the search takes place.

If you don't know if your value is accessed in the trace, you can associate this method with a search in memory at a context.

Listing the processes, binaries in the trace

Listing the processes executed in the trace

REVEN v2.10.0
OS Windows 64-bit
for process in server.ossi.executed_processes():
    print(process)

Sample output:

cmd.exe (2320)
chat_client.exe (2832)
conhost.exe (2704)
cmd.exe (2716)
ShellExperienceHost.exe (2044)
svchost.exe (876)
conhost.exe (2596)
...

Listing the binaries executed in the trace

REVEN v2.2.0
for binary in server.ossi.executed_binaries():
   print(binary)

Sample output:

c:/windows/system32/wevtsvc.dll
c:/windows/system32/oleacc.dll
c:/windows/system32/inputswitch.dll
c:/windows/explorer.exe
c:/windows/system32/ci.dll
c:/windows/system32/drivers/pciidex.sys
c:/windows/system32/drivers/intelide.sys

Finding a single binary in the trace

REVEN v2.2.0
def find_one_binary(binary_path):
    """
    Return the binary corresponding to the passed portion of its path if any,
    None if there isn't one, and throws if there would be two matches or more.
    """
    query = server.ossi.executed_binaries(binary_path)
    try:
        first = next(query)
    except StopIteration:
        return None
    try:
        second = next(query)
        raise ValueError(f"Found multiple binaries '{first}' and '{second}' for query '{binary_path}'")
    except StopIteration:
        return first

Finding the base address where a binary has been loaded

Because a binary can be loaded multiple times at different addresses in a trace, we recover the base address from a Context where the binary is executed.

Finding the base address of the first instance of a binary in the trace

REVEN v2.2.0
def find_first_base_address(binary: reven2.ossi.ossi.Binary):
    for ctx in server.trace.search.binary(binary):
        return ctx.ossi.location().base_address

Finding all the base addresses of a binary in a specified process

REVEN v2.10.0
def find_base_address_in_process(binary: reven2.ossi.ossi.Binary, process: reven2.ossi.process.Process):
    for ctx_range in server.trace.filter(processes=(process,)):
        for ctx in server.trace.search.binary(binary, ctx_range.begin, ctx_range.end):
            return ctx.ossi.location().base_address

Focusing on a portion of the trace

Filtering on processes/ring

REVEN v2.10.0
processes = list(server.ossi.executed_processes("chat"))
for ctx_range in server.trace.filter(processes, reven2.filter.RingPolicy.R3Only):
    first_context = next(iter(ctx_range))
    print(f"{ctx_range}: {first_context.ossi.process()}\t| {first_context.ossi.location()}")

Sample output:

[Context before #5662909, Context before #5669296]: chat_client.exe (2816)	| ntdll!ZwDeviceIoControlFile+0x14
[Context before #5670173, Context before #5671124]: chat_client.exe (2816)	| ntdll!NtSetIoCompletion+0x14
[Context before #5672523, Context before #5673027]: chat_client.exe (2816)	| ntdll!NtRemoveIoCompletionEx+0x14
[Context before #5678502, Context before #5678522]: chat_client.exe (2816)	| chat_client!<tokio_timer::clock::clock::Clock as tokio_timer::timer::now::Now>::now+0x24
[Context before #5678723, Context before #5679392]: chat_client.exe (2816)	| ntdll!ZwQueryPerformanceCounter+0x14
...

Following the dataflow

Finding out from where data comes

The below displays all processes and functions that some tainted buffer goes through:

import reven2.preview.taint as tainting

tainter = tainting.Tainter(server.trace)

taint = tainter.simple_taint("rax", to_context=tr.context_before(), is_forward=False)

last_symbol = None
last_process = None
for access in taint.accesses().all():
    ctx_after = access.transition.context_after()
    new_process = ctx_after.ossi.process()
    new_symbol = ctx_after.ossi.location().symbol
    if new_symbol is None:
        if last_symbol is not None:
            print(f"{new_process}: ???")
        last_symbol = None
        last_process = new_process
        continue
    if (last_symbol is not None and last_process is not None
        and new_symbol == last_symbol
        and new_process.pid == last_process.pid
       ):
        continue

    last_symbol = new_symbol
    last_process = new_process

    print(f"{new_process}: {new_symbol}")

Sample output:

conhost.exe (2704): win32kfull!memcpy
conhost.exe (2704): msvcrt!memcpy
conhost.exe (2704): conhostv2!WriteCharsLegacy
conhost.exe (2704): conhostv2!WriteBuffer::Print
conhost.exe (2704): conhostv2!Microsoft::Console::VirtualTerminal::AdaptDispatch::Print
conhost.exe (2704): conhostv2!WriteChars
conhost.exe (2704): msvcrt!memcpy
conhost.exe (2704): conhostv2!WriteCharsLegacy
conhost.exe (2704): conhostv2!WriteBuffer::Print
conhost.exe (2704): conhostv2!Microsoft::Console::VirtualTerminal::AdaptDispatch::Print
conhost.exe (2704): conhostv2!WriteChars
conhost.exe (2704): msvcrt!memcpy
conhost.exe (2704): conhostv2!WriteCharsLegacy
conhost.exe (2704): conhostv2!WriteBuffer::Print
conhost.exe (2704): conhostv2!Microsoft::Console::VirtualTerminal::AdaptDispatch::Print
conhost.exe (2704): conhostv2!WriteChars
conhost.exe (2704): msvcrt!memcpy
conhost.exe (2704): conhostv2!WriteCharsLegacy
conhost.exe (2704): conhostv2!WriteBuffer::Print
conhost.exe (2704): conhostv2!Microsoft::Console::VirtualTerminal::AdaptDispatch::Print
conhost.exe (2704): conhostv2!WriteChars
conhost.exe (2704): msvcrt!memcpy
conhost.exe (2704): conhostv2!WriteCharsLegacy
conhost.exe (2704): conhostv2!WriteBuffer::Print
conhost.exe (2704): conhostv2!Microsoft::Console::VirtualTerminal::AdaptDispatch::Print
conhost.exe (2704): conhostv2!WriteChars
conhost.exe (2704): ???
chat_client.exe (2832): ntdll!memcpy
chat_client.exe (2832): chat_client!<alloc::vec::Vec<T> as alloc::vec::SpecExtend<T, I>>::from_iter
chat_client.exe (2832): ntdll!RtlpReAllocateHeapInternal
chat_client.exe (2832): ntdll!memcpy
chat_client.exe (2832): ntdll!RtlpReAllocateHeapInternal
chat_client.exe (2832): chat_client!<alloc::vec::Vec<T> as alloc::vec::SpecExtend<T, I>>::from_iter
chat_client.exe (2832): ntdll!RtlpAllocateHeapInternal
chat_client.exe (2832): chat_client!<alloc::vec::Vec<T> as alloc::vec::SpecExtend<T, I>>::from_iter
chat_client.exe (2832): msvcrt!memcpy
chat_client.exe (2832): netio!memcpy
chat_server.exe (648): netio!memcpy
chat_server.exe (648): ???
chat_server.exe (648): msvcrt!memcpy
chat_server.exe (648): chat_server!bytes::buf::buf_mut::BufMut::put
chat_server.exe (648): msvcrt!memcpy
chat_server.exe (648): chat_server!bytes::buf::buf_mut::BufMut::put
chat_server.exe (648): chat_server!<&'a str as bytes::buf::into_buf::IntoBuf>::into_buf
chat_server.exe (648): chat_server!<chat_server::Peer as futures::future::Future>::poll
chat_server.exe (648): chat_server!bytes::bytes::Inner::reserve
chat_server.exe (648): chat_server!<chat_server::Peer as futures::future::Future>::poll
chat_server.exe (648): chat_server!<futures::sync::mpsc::UnboundedReceiver<T> as futures::stream::Stream>::poll
chat_server.exe (648): chat_server!<futures::sync::mpsc::Receiver<T>>::next_message
chat_server.exe (648): chat_server!<futures::sync::mpsc::queue::Queue<T>>::pop
chat_server.exe (648): chat_server!<futures::sync::mpsc::Sender<T>>::queue_push_and_signal
chat_server.exe (648): chat_server!<chat_server::Peer as futures::future::Future>::poll
chat_server.exe (648): chat_server!<bytes::bytes::Bytes as core::clone::Clone>::clone
chat_server.exe (648): chat_server!<chat_server::Peer as futures::future::Future>::poll
chat_server.exe (648): chat_server!<bytes::bytes::BytesMut as bytes::buf::buf_mut::BufMut>::put_slice
chat_server.exe (648): chat_server!bytes::bytes::BytesMut::extend_from_slice
chat_server.exe (648): chat_server!bytes::bytes::Inner::reserve
chat_server.exe (648): chat_server!bytes::bytes::BytesMut::extend_from_slice
chat_server.exe (648): chat_server!<chat_server::Peer as futures::future::Future>::poll
chat_server.exe (648): chat_server!<bytes::bytes::BytesMut as bytes::buf::buf_mut::BufMut>::put_slice
chat_server.exe (648): chat_server!bytes::bytes::BytesMut::extend_from_slice
chat_server.exe (648): chat_server!bytes::bytes::Inner::reserve
chat_server.exe (648): chat_server!bytes::bytes::BytesMut::extend_from_slice
chat_server.exe (648): chat_server!<chat_server::Peer as futures::future::Future>::poll
chat_server.exe (648): chat_server!<bytes::bytes::BytesMut as bytes::buf::buf_mut::BufMut>::put_slice
chat_server.exe (648): msvcrt!memcpy
chat_server.exe (648): chat_server!<bytes::bytes::BytesMut as bytes::buf::buf_mut::BufMut>::put_slice
chat_server.exe (648): chat_server!bytes::bytes::BytesMut::extend_from_slice
chat_server.exe (648): chat_server!bytes::bytes::Inner::reserve
chat_server.exe (648): chat_server!bytes::bytes::BytesMut::extend_from_slice
chat_server.exe (648): chat_server!<chat_server::Peer as futures::future::Future>::poll
chat_server.exe (648): chat_server!<bytes::bytes::BytesMut as core::clone::Clone>::clone
chat_server.exe (648): msvcrt!memcpy
chat_server.exe (648): chat_server!<bytes::bytes::BytesMut as core::clone::Clone>::clone
chat_server.exe (648): chat_server!<chat_server::Peer as futures::future::Future>::poll
chat_server.exe (648): chat_server!<chat_server::Lines as futures::stream::Stream>::poll
chat_server.exe (648): chat_server!bytes::bytes::BytesMut::split_off
chat_server.exe (648): chat_server!bytes::bytes::Inner::set_start
chat_server.exe (648): chat_server!bytes::bytes::BytesMut::split_off
chat_server.exe (648): chat_server!<chat_server::Lines as futures::stream::Stream>::poll
chat_server.exe (648): chat_server!bytes::bytes::BytesMut::split_to
chat_server.exe (648): chat_server!bytes::bytes::Inner::shallow_clone_sync
chat_server.exe (648): chat_server!bytes::bytes::BytesMut::split_to
chat_server.exe (648): chat_server!<chat_server::Lines as futures::stream::Stream>::poll
chat_server.exe (648): chat_server!<tokio_tcp::stream::TcpStream as tokio_io::async_read::AsyncRead>::read_buf
chat_server.exe (648): chat_server!mio::sys::windows::tcp::TcpStream::readv
chat_server.exe (648): chat_server!<&'a std::net::tcp::TcpStream as std::io::Read>::read
chat_server.exe (648): ws2_32!recv
chat_server.exe (648): mswsock!WSPRecv
chat_server.exe (648): ntoskrnl!IopXxxControlFile
chat_server.exe (648): ???
chat_server.exe (648): ntoskrnl!IopXxxControlFile
chat_server.exe (648): ntoskrnl!NtDeviceIoControlFile
chat_server.exe (648): ntoskrnl!KiSystemCall64
chat_server.exe (648): mswsock!WSPRecv
chat_server.exe (648): ws2_32!recv
chat_server.exe (648): chat_server!mio::sys::windows::tcp::TcpStream::readv
chat_client.exe (2816): ???
chat_client.exe (2816): netio!RtlCopyMdlToBuffer
chat_client.exe (2816): ???
chat_client.exe (2816): tcpip!TcpIndicateData
chat_client.exe (2816): ndis!NdisAdvanceNetBufferDataStart
chat_client.exe (2816): tcpip!IppPrevalidateLoopbackReceive
chat_client.exe (2816): tcpip!IpNlpFastContinueSendDatagram
chat_client.exe (2816): tcpip!IppSendDatagramsCommon
chat_client.exe (2816): tcpip!IppPreparePacketChecksum
chat_client.exe (2816): tcpip!IppSendDatagramsCommon
chat_client.exe (2816): tcpip!TcpTcbSend
chat_client.exe (2816): netio!NetioExtendNetBuffer
chat_client.exe (2816): tcpip!TcpSegmentTcbSend
chat_client.exe (2816): tcpip!TcpBeginTcbSend
chat_client.exe (2816): netio!NetioAllocateAndReferenceNetBufferListNetBufferMdlAndData
chat_client.exe (2816): ndis!NdisAllocateNetBufferList
chat_client.exe (2816): netio!NetioAllocateAndReferenceNetBufferListNetBufferMdlAndData
chat_client.exe (2816): tcpip!TcpBeginTcbSend
chat_client.exe (2816): tcpip!TcpTcbSend
chat_client.exe (2816): tcpip!TcpEnqueueTcbSend
chat_client.exe (2816): ???
chat_client.exe (2816): chat_client!<std::net::tcp::TcpStream as miow::net::TcpStreamExt>::write_overlapped
chat_client.exe (2816): chat_client!mio::sys::windows::tcp::StreamImp::schedule_write
chat_client.exe (2816): chat_client!mio::poll::SetReadiness::set_readiness
chat_client.exe (2816): chat_client!mio::sys::windows::tcp::StreamImp::schedule_write
chat_client.exe (2816): chat_client!mio::sys::windows::tcp::TcpStream::writev
chat_client.exe (2816): chat_client!<iovec::IoVec as core::ops::deref::Deref>::deref
chat_client.exe (2816): chat_client!mio::sys::windows::tcp::TcpStream::writev
chat_client.exe (2816): chat_client!mio::sys::windows::selector::ReadyBinding::get_buffer
chat_client.exe (2816): chat_client!<mio::net::tcp::TcpStream as std::io::Write>::write
chat_client.exe (2816): chat_client!iovec::IoVec::from_bytes
chat_client.exe (2816): chat_client!<mio::net::tcp::TcpStream as std::io::Write>::write
chat_client.exe (2816): chat_client!<tokio_reactor::poll_evented::PollEvented<E> as std::io::Write>::write
chat_client.exe (2816): chat_client!<tokio_reactor::poll_evented::PollEvented<E>>::poll_write_ready
chat_client.exe (2816): chat_client!<tokio_reactor::poll_evented::PollEvented<E> as std::io::Write>::write
chat_client.exe (2816): chat_client!_ZN99_$LT$tokio_io.._tokio_codec..framed_write..FramedWrite2$LT$T$GT$$u20$as$u20$futures..sink..Sink$GT$13poll_complete17h99e4d
chat_client.exe (2816): chat_client!bytes::buf::buf_mut::BufMut::put
chat_client.exe (2816): chat_client!<&'a str as bytes::buf::into_buf::IntoBuf>::into_buf
chat_client.exe (2816): chat_client!_ZN99_$LT$tokio_io.._tokio_codec..framed_write..FramedWrite2$LT$T$GT$$u20$as$u20$futures..sink..Sink$GT$10start_send17h590abca8
chat_client.exe (2816): chat_client!<futures::stream::split::SplitSink<S> as futures::sink::Sink>::start_send
chat_client.exe (2816): chat_client!<futures::stream::forward::Forward<T, U>>::try_start_send
chat_client.exe (2816): chat_client!<futures::stream::forward::Forward<T, U> as futures::future::Future>::poll
chat_client.exe (2816): chat_client!<futures::stream::map_err::MapErr<S, F> as futures::stream::Stream>::poll
chat_client.exe (2816): chat_client!<futures::sync::mpsc::Receiver<T> as futures::stream::Stream>::poll
chat_client.exe (2816): chat_client!<futures::sync::mpsc::Receiver<T>>::next_message
chat_client.exe (2816): chat_client!<futures::sync::mpsc::queue::Queue<T>>::pop
chat_client.exe (2816): chat_client!<futures::sync::mpsc::Sender<T>>::do_send
chat_client.exe (2816): chat_client!<futures::sink::send::Send<S> as futures::future::Future>::poll
chat_client.exe (2816): chat_client!futures::future::Future::wait
chat_client.exe (2816): chat_client!chat_client::read_stdin
chat_client.exe (2816): chat_client!<std::io::stdio::Stdin as std::io::Read>::read
chat_client.exe (2816): chat_client!<std::io::buffered::BufReader<R> as std::io::Read>::read
chat_client.exe (2816): chat_client!<std::io::buffered::BufReader<R> as std::io::BufRead>::fill_buf
chat_client.exe (2816): chat_client!std::sys::windows::stdio::Stdin::read
chat_client.exe (2816): chat_client!<std::io::cursor::Cursor<T> as std::io::Read>::read
chat_client.exe (2816): chat_client!std::sys::windows::stdio::Stdin::read
chat_client.exe (2816): ntdll!RtlpFreeHeap
chat_client.exe (2816): chat_client!std::sys::windows::stdio::Stdin::read
chat_client.exe (2816): chat_client!alloc::string::String::from_utf16
chat_client.exe (2816): chat_client!alloc::string::String::push
chat_client.exe (2816): ntdll!RtlpReAllocateHeapInternal
chat_client.exe (2816): chat_client!alloc::string::String::push
chat_client.exe (2816): ntdll!RtlpReAllocateHeapInternal
chat_client.exe (2816): chat_client!alloc::string::String::push
chat_client.exe (2816): ntdll!RtlpReAllocateHeapInternal
chat_client.exe (2816): chat_client!alloc::string::String::push
chat_client.exe (2816): ntdll!RtlpAllocateHeapInternal
chat_client.exe (2816): chat_client!alloc::string::String::from_utf16
chat_client.exe (2816): kernelbase!StrRetToStrW

Displaying data nicely in Jupyter notebooks

Detecting if the script is executed in a notebook or not

def in_notebook():
    """
    Detect if we are currently running a Jupyter notebook.

    This is used to display rendered results inline in Jupyter when we are
    executing in the context of a Jupyter notebook, or to display raw results
    on the standard output when we are executing in the context of a script.
    """
    try:
        from IPython import get_ipython  # type: ignore

        if get_ipython() is None or ("IPKernelApp" not in get_ipython().config):
            return False
    except ImportError:
        return False
    return True
REVEN v2.5.0
Environment Jupyter
display(tr)
display(address)
display(bookmark)

Sample output:

Synchronization links

Displaying in tables

# helper functions to output html tables
def table_line(cells):
    line = ""
    for cell in cells:
        line += "<td>{}</td>".format(cell)
    return "<tr>{}</tr>".format(line)

def display_table(title, headers, html_lines):
    header_line = ""
    for header in headers:
        header_line += "<th>{}</th>".format(header)
    header_line = "<tr>{}</tr>".format(header_line)
    display(HTML("""<h2>{}</h2><table>{} {}</table>""".format(title, header_line, html_lines)))

Pretty printing

Basics

REVEN v2.7.0
Environment Jupyter
from IPython.display import HTML, display
display(HTML(f"<strong>{tr.format_as_html()}</strong>"))

Sample output:

Transition displayed strong

Transition

REVEN v2.5.0
Environment Jupyter
import reven2
from IPython.display import HTML, display
import re
import itertools
import html

all_regs = {}
for reg in reven2.arch.helpers.x64_registers():
    all_regs[reg.name] = reg

def tokenize_string(string):
    return re.split(" |dword|ptr|\\[|\\]|\\+|\\*|,", string)

def tokenize_instruction(transition):
    if transition.instruction is None:
        return []
    return tokenize_string(str(transition.instruction))

def get_pretty_print_tr(tr, show_context=False, show_symbol=False):
    output = ""
    if show_symbol:
        output += "<span>{}</span><br/>".format(html.escape(str(tr.context_before().ossi.location())))

    output += tr._repr_html_()
    output += " <code>{}</code>".format(str(tr).split(" ", 1)[1])
    output += "<br/>"

    if not show_context:
        return '<p style="font-family:monospace" class="tex2jax_ignore">' + output + "</p>"

    instr_elements = tokenize_instruction(tr)
    done_already = []
    print_data = []
    for elem in instr_elements:
        if elem in done_already:
            continue
        done_already.append(elem)
        if elem in all_regs:
            before = tr.context_before().read(all_regs[elem])
            after = tr.context_after().read(all_regs[elem])
            if before == after or elem in ["rip"]:
                print_data.append("{} = {:x}".format(elem, before))
            else:
                print_data.append("{} = {:x} to {:x}".format(elem, before, after))

    output += ", ".join(print_data) + "<br/>"
    print_data = []

    max_items = 4
    accesses = list(itertools.islice(tr.memory_accesses(), max_items))
    for acc in accesses:
        elem = "{}[{:#x}]:{}".format("R" if acc.operation == reven2.memhist.MemoryAccessOperation.Read else "W",
                                     acc.virtual_address.offset,
                                     acc.size)
        try:
            before = tr.context_before().read(acc.virtual_address, acc.size)
            after = tr.context_after().read(acc.virtual_address, acc.size)
            if before == after:
                print_data.append("{} = {:x}".format(elem, before))
            else:
                print_data.append("{} = {:x} to {:x}".format(elem, before, after))
        except:
            print_data.append(elem + " = ?")

    if len(accesses) > max_items:
        print_data.append("...")
    output += ", ".join(print_data)

    return '<p style="font-family:monospace" class="tex2jax_ignore">' + output + "</p>"

def pretty_print_tr(tr, show_context=False, show_symbol=False):
    display(HTML(get_pretty_print_tr(tr, show_context=show_context, show_symbol=show_symbol)))

Sample output:

Pretty transitions

Address / Hexdump

REVEN v2.5.0
Environment Jupyter

Pretty print the content of a buffer at a specified address, in 0x10 increments. If passed a register, it will be dereferenced.

import reven2
from reven2.address import LinearAddress  # shortcut when reading addresses
from IPython.display import HTML, display

def print_buffer(b, addr=0, col=16, highlights = []):
    style_highlight = '<span style="background-color:yellow">'
    style_highlight_off = '</span>'
    output = '<code style="background-color:white">\n'

    prev_all_zeros = 0
    for i in range(int(len(b) / col)):

        total_sum = sum(b[i*col:(i+1)*col])
        for h in highlights:
            if h in range(i*col,col):
                total_sum = 0

        total_sum_next = sum(b[(i+1)*col:(i+2)*col])
        if total_sum == 0:
            prev_all_zeros += 1
        else:
            prev_all_zeros == 0
        if prev_all_zeros > 0 and total_sum_next == 0 and len(b) >= (i+2)*col:
            if prev_all_zeros == 1:
                output += "...\n"
            continue

        output += "{:016x}:".format(i*col + addr)

        for j in range(col):
            offset = j + i*col

            if offset >= len(b):
                break
            if j % 8 == 0:
                output += " "

            total_sum += b[offset]
            if offset in highlights:
                output += style_highlight
            output += "{:02x}".format(b[offset])
            if offset in highlights:
                output += style_highlight_off
            output += " "

        output += "- "
        for j in range(col):
            offset = j + i*col
            if offset >= len(b):
                break
            c = b[offset]

            if offset in highlights:
                output += style_highlight

            if c >= 32 and c <= 126:
                output += "{}".format(chr(c))
            else:
                output += "."

            if offset in highlights:
                output += style_highlight_off
        output += "\n"
    output += "</code>"
    display(HTML(output))

def pretty_print_addr(ctx, address, size):
    if isinstance(address, reven2.arch.register.Register):
        address = ctx.read(address)
    if isinstance(address, int):
        address = LinearAddress(address)
    print_buffer(ctx.read(address, size), address.offset)

Sample output:

Pretty addresses

Exporting tabular data to work with reporting tools

Pre-requisites: install pandas

REVEN v2.9.0
Environment Jupyter

This example requires the pandas Python package:

# From a code cell of a Jupyter notebook
try:
    import pandas
    print("pandas already installed")
except ImportError:
    print("Could not find pandas, attempting to install it from pip")
    import sys

    output = !{sys.executable} -m pip install pandas; echo $?  # noqa
    success = output[-1]

    for line in output[0:-1]:
        print(line)

    if int(success) != 0:
        raise RuntimeError("Error installing pandas")
    import pandas
    print("Successfully installed pandas")

Building tabular data from filters

REVEN v2.10.0
Dependency pandas
res = []

header = ["Context range", "Process", "Location"]

processes = list(server.ossi.executed_processes("chat"))
for ctx_range in server.trace.filter(processes, reven2.filter.RingPolicy.R3Only):
    first_context = next(iter(ctx_range))
    res.append((ctx_range,
                first_context.ossi.process(),
                first_context.ossi.location()))

df = pandas.DataFrame.from_records(res, columns=header)

Displaying tabular data in a Jupyter notebook

REVEN v2.9.0
Environment Jupyter
Dependency pandas
res = []

header = ["Context range", "Process", "Location"]

processes = list(server.ossi.executed_processes("chat"))
for ctx_range in server.trace.filter(processes, reven2.filter.RingPolicy.R3Only):
    first_context = next(iter(ctx_range))
    res.append((ctx_range,
                first_context.ossi.process(),
                first_context.ossi.location()))

df = pandas.DataFrame.from_records(res, columns=header)

# display table truncated in the middle
display(df)

Sample output:

Jupyter rendered table

Exporting tabular data to csv

REVEN v2.10.0
Dependency pandas
df.to_csv("data.csv")

This can then be opened in e.g. spreadsheet software:

Spreadsheet

Managing bookmarks

Working with bookmarks starting with a specific "tag"

Let's consider only bookmarks whose description starts with "auto".

Adding

for i in range(0, 100):
    server.bookmarks.add(server.trace.transition(i), f"auto: {i}")

Listing

for bookmark in server.bookmarks.all():
    if not bookmark.description.startswith("auto"):
        continue
    print(bookmark)

Removing

server.bookmarks.remove_if(lambda bookmark: bookmark.description.startswith("auto:"))

Finding more about the semantics of an instruction

Pre-requisites: install and setup Capstone

Environment Jupyter

This example requires the capstone Python package:

# From a code cell of a Jupyter notebook
try:
    import capstone
    print("capstone already installed")
except ImportError:
    print("Could not find capstone, attempting to install it from pip")
    import sys
    import subprocess

    command = [f"{sys.executable}", "-m", "pip", "install", "capstone"]
    p = subprocess.run(command)

    if int(p.returncode) != 0:
        raise RuntimeError("Error installing capstone")

    import capstone  # noqa
    print("Successfully installed capstone")

Using Capstone

Since we need to maintain several objects that we would need to pass to the various functions, the easiest way is to use classes:

class DisassembledInstruction:
    def __init__(self, _tr : reven2.trace.Transition, _cs_insn):
        self._tr = _tr
        self._cs_insn = _cs_insn

    def _read_transition_reg(self, reg: reven2.arch.register.Register):
        """
        Read the value of a register during computations performed by the instruction.

        For PC, it is the value after the instruction.
        For other registers, it is the value before the instruction.
        """
        if reg in [reven2.arch.x64.rip, reven2.arch.x64.eip]:
            return self._tr.pc + self._cs_insn.size
        else:
            return self._tr.context_before().read(reg)

    def dereferenced_address(self, op_index: int):
        from reven2.arch.register import Register

        cs_op = self._cs_insn.operands[op_index]

        if cs_op.type != capstone.CS_OP_MEM:
            raise IndexError("The selected operand is not a memory operand")

        dereferenced_address = 0

        if cs_op.value.mem.base != 0:
            base_reg = Register.from_name(self._cs_insn.reg_name(cs_op.value.mem.base))
            dereferenced_address += self._read_transition_reg(base_reg)

        if cs_op.value.mem.index != 0:
            index_reg = Register.from_name(self._cs_insn.reg_name(cs_op.value.mem.index))
            index = self._read_transition_reg(index_reg)
            dereferenced_address += (cs_op.value.mem.scale * index)

        dereferenced_address += cs_op.value.mem.disp

        # mask instruction depending on mode
        mask = 0xFFFF_FFFF_FFFF_FFFF if self._tr.mode == reven2.trace.Mode.X86_64 else 0xFFFF_FFFF

        return dereferenced_address & mask

    @property
    def capstone_instruction(self):
        return self._cs_insn

    @property
    def transition(self):
        return self._tr


class Disassembler:
    def __init__(self):
        self._md_64 = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_64)
        self._md_64.detail = True

        self._md_32 = capstone.Cs(capstone.CS_ARCH_X86, capstone.CS_MODE_32)
        self._md_32.detail = True

    def disassemble(self, tr: reven2.trace.Trace):
        """
        Attempt to disassemble with Capstone the instruction associated to the passed transition.

        Returns None if there is no instruction to disassemble.
        """
        if tr.instruction is not None:
            instruction = tr.instruction
        elif tr.exception is not None and tr.exception.related_instruction is not None:
           instruction = tr.exception.related_instruction
        else:
            return None

        if tr.mode == reven2.trace.Mode.X86_64:
            md = self._md_64
        elif tr.mode == reven2.trace.Mode.X86_32:
            md = self._md_32
        else:
            raise ValueError("Unsupported mode '{tr.mode}'")
        cs_insn = next(md.disasm(instruction.raw, instruction.size))

        return DisassembledInstruction(tr, cs_insn)

Disassembling REVEN instructions

REVEN v2.10.0
Dependency capstone
dsm = Disassembler()
insn = dsm.disassemble(tr)
# Access the capstone instruction
insn.capstone_instruction

Depending on your use-case you may want to skip disassembling instructions related to exceptions, as these are not always (fully) executed.

Compute dereferenced address

REVEN v2.2.0
Dependency capstone
hex(insn.dereferenced_address(0))

Sample output:

'0xfffff8024cfacfb0'

Convert capstone flags to REVEN register flags

REVEN v2.2.0
Dependency capstone
test_eflags = {
    capstone.x86.X86_EFLAGS_TEST_OF: reven2.arch.x64.of,
    capstone.x86.X86_EFLAGS_TEST_SF: reven2.arch.x64.sf,
    capstone.x86.X86_EFLAGS_TEST_ZF: reven2.arch.x64.zf,
    capstone.x86.X86_EFLAGS_TEST_PF: reven2.arch.x64.pf,
    capstone.x86.X86_EFLAGS_TEST_CF: reven2.arch.x64.cf,
    capstone.x86.X86_EFLAGS_TEST_NT: reven2.arch.x64.nt,
    capstone.x86.X86_EFLAGS_TEST_DF: reven2.arch.x64.df,
    capstone.x86.X86_EFLAGS_TEST_RF: reven2.arch.x64.rf,
    capstone.x86.X86_EFLAGS_TEST_IF: reven2.arch.x64.if_,
    capstone.x86.X86_EFLAGS_TEST_TF: reven2.arch.x64.tf,
    capstone.x86.X86_EFLAGS_TEST_AF: reven2.arch.x64.af,
}

for flag, reg in test_eflags.items():
    if not insn.capstone_instruction.eflags & flag:
        # register not present, skip
        continue
    print(f"{reg} is affected by the instruction")

Accessing handles

REVEN v2.11.0
OS Windows 64-bit

Listing the file handles owned by the process

# Focus on process handles, ignore the rest
for h in ctx.handles(kernel_handles = False, special_handles = False):
    try:
        # Request FileObject handles only, otherwise raise exception
        o = h.object(win.FileObject)
    except ValueError:
        continue
    print(f"{h}: {o.filename_with_device}")

Sample output:

Handle 0x4 (object: lin:0xffffc8016fc2c760): \Device\ConDrv\Reference
Handle 0x44 (object: lin:0xffffc8016fa92810): \Device\HarddiskVolume2\reven
Handle 0x48 (object: lin:0xffffc8016fa94a70): \Device\ConDrv\Connect
Handle 0x50 (object: lin:0xffffc8016fa921d0): \Device\ConDrv\Input
Handle 0x54 (object: lin:0xffffc8016fa91eb0): \Device\ConDrv\Output
Handle 0x58 (object: lin:0xffffc8016fa91eb0): \Device\ConDrv\Output
Handle 0xa4 (object: lin:0xffffc8016f8e3b00): \Device\HarddiskVolume2\reven\output

Getting a handle by value

h = ctx.handle(0xa4)
print(h.object())

Sample output:

File object "\Device\HarddiskVolume2\reven\output" (lin:0xffffc8016f8e3b00)

Managing ranges of memory

REVEN v2.11.0

A range of memory is a very common concept. Storing, updating, merging or iterating on such ranges are operations that are likely to be necessary to users of the REVEN API.

REVEN provides a "canonical" implementation in the API that spares users from having to deal with multiple reimplementations.

Creating & updating memory ranges

import reven2.memory_range as mr

# First range
r1 = mr.MemoryRange(reven2.address.LogicalAddress(0x1000), 0x1000)
# This range will overlap with previous one
r2 = mr.MemoryRange(reven2.address.LogicalAddress(0x1500), 0x1000)
# This range does not overlap
r3 = mr.MemoryRange(reven2.address.LogicalAddress(0x5000), 0x1000)

print(r1)
# Union is non-empty
print(r1.union(r2))
# Union is empty
print(r1.union(r2).union(r3))

Sample output:

[ds:0x1000; 4096]
[ds:0x1000; 5376]
None

Translating a range of virtual addresses

Users might need to access the physical addresses of a buffer. However, a range of memory may be mapped onto non-contiguous physical pages, rendering the conversion error-prone.

The MemoryRange object offers a translation method that is guaranteed to be correct.

virtual_range = mr.MemoryRange(reven2.address.LogicalAddress(0x10000), 0x2000)
for phy in virtual_range.translate(ctx): print(phy)

Sample output:

[phy:0x59ce000; 4096]
[phy:0x764f000; 4096]
This is the only recommended path

As mentioned above, it is easy to get the translation wrong: while translating a virtual address into physical address is valid, translating a whole buffer is more complex. Please use the MemoryRange object for this purpose.

Working with mulitple ranges

The API provides object to manage mutiple ranges, depending on the situation

Set of ranges

# First range
r1 = mr.MemoryRange(reven2.address.LogicalAddress(0x1000), 0x1000)
# This range will overlap with previous one
r2 = mr.MemoryRange(reven2.address.LogicalAddress(0x1500), 0x1000)
# This range does not overlap
r3 = mr.MemoryRange(reven2.address.LogicalAddress(0x5000), 0x1000)

# Range that overlap are merged
s = mr.MemoryRangeSet([r1, r2, r3])
for r in s: print(r)

Sample output:

[ds:0x1000; 5376]
[ds:0x5000; 4096]

Map of ranges

Maps of ranges are useful to associate data with these memory ranges. For example, you might want to associate buffers with a particular named source.

mem_map = mr.MemoryRangeMap(items=((r1, "from r1"), (r2, "from r2"), (r3, "from r3")),
                            # Do not handle subtraction:
                            subtract=lambda a: None,        
                            # Concatenate strings on merge:
                            merge=lambda a, b: a + ", " + b)
for r, source in mem_map: print(r, source)

Sample output:

[ds:0x1000; 1280] from r1
[ds:0x1500; 2816] from r1, from r2
[ds:0x2000; 1280] from r2
[ds:0x5000; 4096] from r3