This module introduces two functions. The first one is used to get the memory address and the size of the buffer used to receive a network packet. While the second one is used to return a list of memory addresses and sizes of buffers used to send a network packet.
Source
"""
This module introduces two functions. The first one is used to get the memory address and the size of the buffer
used to receive a network packet. While the second one is used to return a list of memory addresses and
sizes of buffers used to send a network packet.
"""
from typing import Iterator as _Iterator, List as _List, Optional as _Optional, Tuple as _Tuple, cast as _cast
import reven2
def get_memory_address_and_size_of_received_network_packet(
ctx: reven2.trace.Context,
) -> _Optional[reven2.MemoryRange[reven2.address._AbstractAddress]]:
"""
This function returns a pair of memory address, size of the received packet
'ctx' must be a context resulting from searching "RxPacketAssemble" symbol in the trace
Information
===========
@param ctx: the context used to retrieve the list of memory address and size of sent packet buffer
"""
# To get the memory address of received packets, we need to dereference multiple times some pointers in memory.
# The first one is rcx, as argument. It points to a huge structure.
# We don't know its type so we can't use the type API.
# at rcx+0x308 is a pointer to a structure, which contains the size at +8
# at rcx+0x328 is a pointer index that points the right structure to get for the buffer
# at rcx+0x328+8 * index is a pointer to the network buffer.
# at rcx+0x308, then deref +0xc is a byte that is tested at 1 and 2. If 0, then the call to RxPacketAssemble is
# the last one of a serie and doesn't contain any buffer to fetch.
# Get a pointer to the huge structure
pHugeStruct = reven2.address.LogicalAddress(ctx.read(reven2.arch.x64.rcx, reven2.types.USize))
# Deref to get a pointer to the structure that contains the size
pSizeStruct: reven2.address.LogicalAddress = reven2.address.LogicalAddress(
ctx.read(pHugeStruct + 0x308, reven2.types.USize)
)
u8Flag: int = ctx.read(pSizeStruct + 0xC, reven2.types.U8)
# Is last packet part?
if (u8Flag & 0x3) == 0:
return None
u32Size: int = ctx.read(pSizeStruct + 0x8, reven2.types.U32)
# Next get the index in the structure
pu32IndexRaw = reven2.address.LogicalAddress(ctx.read(pHugeStruct + 0x328, reven2.types.USize))
# The index is a dword (eax is used)
u32IndexRaw: int = ctx.read(pu32IndexRaw, reven2.types.U32)
# Now, the system perform an operation on this index.
u32Index = (u32IndexRaw + u32IndexRaw * 2) * 2
# Now get a pointer to the buffer
pArray = reven2.address.LogicalAddress(ctx.read(pHugeStruct + 0x328, reven2.types.USize) + 8)
pBuffer: reven2.address.LogicalAddress = reven2.address.LogicalAddress(
ctx.read(pArray + 8 * u32Index + 0x20, reven2.types.USize)
)
return reven2.MemoryRange(pBuffer, u32Size)
def get_memory_addresses_and_sizes_of_sent_network_packet(
reven_server: reven2.RevenServer, ctx: reven2.trace.Context
) -> _List[reven2.MemoryRange[reven2.address._AbstractAddress]]:
"""
This Function returns a list of memory address, size of a sent packet
'ctx' must be a context resulting from searching "E1000SendNetBufferLists" symbol in the trace
Information
===========
@param ctx: the context used to retrieve the memory address and size of received packet buffer
"""
ndis = next(reven_server.ossi.executed_binaries("ndis.sys"))
net_buffer_list_type = _cast(reven2.types.Struct, ndis.exact_type("_NET_BUFFER_LIST"))
net_buffer_list: reven2.types.StructInstance = ctx.deref(
reven2.arch.x64.rdx, reven2.types.Pointer(net_buffer_list_type)
)
net_buffer = net_buffer_list.field("FirstNetBuffer").deref_struct()
mdl = net_buffer.field("CurrentMdl").deref_struct()
mdlOffset = net_buffer.field("CurrentMdlOffset").read_int()
packet_memory_addresses = []
packet_memory_addresses.append(_get_network_packet_address_from_mdl(ctx, mdl, mdlOffset=mdlOffset))
pNextMdl = mdl.field("Next").read_ptr().assert_struct()
while pNextMdl.address.offset != 0:
nextMdl = pNextMdl.deref()
packet_memory_addresses.append(_get_network_packet_address_from_mdl(ctx, nextMdl))
pNextMdl = nextMdl.field("Next").read_ptr().assert_struct()
return packet_memory_addresses
def get_all_send_recv_packet_context(
reven_server: reven2.RevenServer,
) -> _Tuple[_List[_Iterator[reven2.trace.Context]], _List[_Iterator[reven2.trace.Context]]]:
"""
This function return a list of all contexts used to send or receive network packets.
To get these contexts, this function searches the symbol `E1000SendNetBufferLists` to get contexts of
sent network packets. and searches the symbol `RxPacketAssemble` to get contexts of received network packets.
This function requires that the trace has the PDB of `e1g6032e.sys` binary otherwise no context will be found.
'reven_server' is the L{reven2.RevenServer} instance on which to perform the search
Information
===========
@param reven_server: L{reven2.RevenServer} instance on which to search packets
"""
# Get generators of search results
send_queries = [
reven_server.trace.search.symbol(symbol)
for symbol in reven_server.ossi.symbols(pattern="E1000SendNetBufferLists", binary_hint="e1g6032e.sys")
]
recv_queries = [
reven_server.trace.search.symbol(symbol)
for symbol in reven_server.ossi.symbols(pattern="RxPacketAssemble", binary_hint="e1g6032e.sys")
]
if len(send_queries) == 0 and len(recv_queries) == 0:
print(
"No network packets exist in this trace, make sure that this trace is a network trace,"
" and if it is, make sure that the PDB of `e1g6032e.sys` binary is available in the scenario"
)
return send_queries, recv_queries
def _get_network_packet_address_from_mdl(
ctx: reven2.trace.Context, mdl: reven2.types.StructInstance, mdlOffset: int = 0
) -> reven2.MemoryRange[reven2.address._AbstractAddress]:
pBufferStartVa = mdl.field("MappedSystemVa").read_ptr()
u32Size = mdl.field("ByteCount").read_int()
return reven2.MemoryRange(pBufferStartVa.address + mdlOffset, u32Size - mdlOffset)