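This module introduces two functions that locate network packet buffers in memory. The first one returns the memory address and the size of the buffer used to receive a network packet, while the second one returns a list of memory addresses and sizes of the buffers used to send a network packet. A third function, `get_all_send_recv_packet_context`, searches the trace for the contexts to pass to these two functions.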

Source

"""
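This module introduces two functions that locate network packet buffers in memory. The first one returns the
memory address and the size of the buffer used to receive a network packet, while the second one returns a list of
memory addresses and sizes of the buffers used to send a network packet.

A third function, `get_all_send_recv_packet_context`, searches the trace for the contexts to pass to these two
functions.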

"""

from typing import Iterator as _Iterator, List as _List, Optional as _Optional, Tuple as _Tuple, cast as _cast

import reven2


def get_memory_address_and_size_of_received_network_packet(
    ctx: reven2.trace.Context,
) -> _Optional[reven2.MemoryRange[reven2.address._AbstractAddress]]:
    """
    This function returns the memory range (address and size) of the buffer of a received packet,
    or C{None} when the call does not carry any buffer.

    'ctx' must be a context resulting from searching "RxPacketAssemble" symbol in the trace

    Information
    ===========

    @param ctx: the context used to retrieve the memory address and size of the received packet buffer

    @return: the L{reven2.MemoryRange} of the received packet buffer, or C{None} when neither bit 0 nor bit 1
             of the flag byte is set (last call of a series, no buffer to fetch)
    """

    # To get the memory address of received packets, we need to dereference multiple times some pointers in memory.
    # The first one is rcx, as argument. It points to a huge structure.
    # We don't know its type so we can't use the type API.
    #
    # [rcx+0x308] is a pointer to a structure that contains:
    #   - the size of the buffer (dword) at +0x8
    #   - a flag byte at +0xc, tested against 1 and 2. If 0, then the call to RxPacketAssemble is
    #     the last one of a series and doesn't contain any buffer to fetch.
    # [rcx+0x328] is a pointer to a dword index, followed 8 bytes later by the entries from which the pointer
    # to the network buffer is read.

    # Get a pointer to the huge structure
    huge_struct = reven2.address.LogicalAddress(ctx.read(reven2.arch.x64.rcx, reven2.types.USize))

    # Deref to get a pointer to the structure that contains the size and the flag
    size_struct = reven2.address.LogicalAddress(ctx.read(huge_struct + 0x308, reven2.types.USize))

    flag: int = ctx.read(size_struct + 0xC, reven2.types.U8)

    # Is last packet part? Then there is nothing to return.
    if (flag & 0x3) == 0:
        return None

    size: int = ctx.read(size_struct + 0x8, reven2.types.U32)

    # Pointer stored at +0x328: it designates the raw index, and the entries start 8 bytes after it.
    # It is read only once from the trace, the context being a fixed point in time.
    index_ptr = reven2.address.LogicalAddress(ctx.read(huge_struct + 0x328, reven2.types.USize))

    # The index is a dword (eax is used)
    raw_index: int = ctx.read(index_ptr, reven2.types.U32)

    # The system scales the raw index before using it (kept in the same form as the operation it mirrors).
    index = (raw_index + raw_index * 2) * 2

    # Now get a pointer to the buffer: entries are addressed in units of 8 bytes, and the buffer pointer is
    # located 0x20 bytes into the selected entry.
    entries = index_ptr + 8
    buffer_ptr = reven2.address.LogicalAddress(ctx.read(entries + 8 * index + 0x20, reven2.types.USize))

    return reven2.MemoryRange(buffer_ptr, size)


def get_memory_addresses_and_sizes_of_sent_network_packet(
    reven_server: reven2.RevenServer, ctx: reven2.trace.Context
) -> _List[reven2.MemoryRange[reven2.address._AbstractAddress]]:
    """
    This function returns the list of memory ranges (address and size) of the buffers of a sent packet.

    'ctx' must be a context resulting from searching "E1000SendNetBufferLists" symbol in the trace

    The `_NET_BUFFER_LIST` pointed to by rdx is read, then the MDL chain of its first net buffer is followed:
    one memory range is produced per MDL, the first one being adjusted by the current MDL offset.

    Note that the first binary returned for "ndis.sys" is taken with `next()` and no default value, so
    a C{StopIteration} is raised if there is none.

    Information
    ===========

    @param reven_server: L{reven2.RevenServer} instance used to find the `_NET_BUFFER_LIST` type in `ndis.sys`
    @param ctx: the context used to retrieve the list of memory addresses and sizes of the sent packet buffers

    @return: the list of L{reven2.MemoryRange}, in the order of the MDL chain
    """
    # The structure layout is taken from the type information of ndis.sys
    ndis_binary = next(reven_server.ossi.executed_binaries("ndis.sys"))
    nbl_type = _cast(reven2.types.Struct, ndis_binary.exact_type("_NET_BUFFER_LIST"))

    # rdx is interpreted as a pointer to the _NET_BUFFER_LIST
    nbl: reven2.types.StructInstance = ctx.deref(reven2.arch.x64.rdx, reven2.types.Pointer(nbl_type))
    first_net_buffer = nbl.field("FirstNetBuffer").deref_struct()

    current_mdl = first_net_buffer.field("CurrentMdl").deref_struct()
    current_offset = first_net_buffer.field("CurrentMdlOffset").read_int()

    # Only the first MDL of the chain is affected by the offset
    ranges = [_get_network_packet_address_from_mdl(ctx, current_mdl, mdlOffset=current_offset)]

    # Follow the `Next` links until a null pointer is met
    link = current_mdl.field("Next").read_ptr().assert_struct()
    while link.address.offset:
        chained_mdl = link.deref()
        ranges.append(_get_network_packet_address_from_mdl(ctx, chained_mdl))
        link = chained_mdl.field("Next").read_ptr().assert_struct()

    return ranges


def get_all_send_recv_packet_context(
    reven_server: reven2.RevenServer,
) -> _Tuple[_List[_Iterator[reven2.trace.Context]], _List[_Iterator[reven2.trace.Context]]]:
    """
    This function returns the search results giving all contexts used to send or receive network packets.

    To get these contexts, this function searches the symbol `E1000SendNetBufferLists` to get contexts of
    sent network packets, and searches the symbol `RxPacketAssemble` to get contexts of received network packets.

    This function requires that the trace has the PDB of `e1g6032e.sys` binary otherwise no context will be found.
    When no matching symbol is found for both sending and receiving, a message is printed and two empty lists
    are returned.

    Information
    ===========

    @param reven_server: L{reven2.RevenServer} instance on which to search packets

    @return: a pair `(send, recv)` of lists, each containing one search result per matching symbol
    """

    def searches_for(symbol_name: str) -> _List[_Iterator[reven2.trace.Context]]:
        # One trace search per symbol of the driver matching `symbol_name`
        return [
            reven_server.trace.search.symbol(matching_symbol)
            for matching_symbol in reven_server.ossi.symbols(pattern=symbol_name, binary_hint="e1g6032e.sys")
        ]

    sent_searches = searches_for("E1000SendNetBufferLists")
    received_searches = searches_for("RxPacketAssemble")

    # Warn only when nothing at all was found, neither for sending nor for receiving
    if not (sent_searches or received_searches):
        print(
            "No network packets exist in this trace, make sure that this trace is a network trace,"
            " and if it is, make sure that the PDB of `e1g6032e.sys` binary is available in the scenario"
        )

    return sent_searches, received_searches


def _get_network_packet_address_from_mdl(
    ctx: reven2.trace.Context, mdl: reven2.types.StructInstance, mdlOffset: int = 0
) -> reven2.MemoryRange[reven2.address._AbstractAddress]:
    """
    This function returns the memory range described by an MDL structure instance.

    The range starts at the address held by the `MappedSystemVa` field plus 'mdlOffset', and its size is
    the value of the `ByteCount` field minus 'mdlOffset'.

    Information
    ===========

    @param ctx: the context of the packet (not used by the current implementation)
    @param mdl: the MDL structure instance to read the fields from
    @param mdlOffset: number of bytes to skip at the start of the MDL buffer
    """
    mapped_va = mdl.field("MappedSystemVa").read_ptr()
    byte_count = mdl.field("ByteCount").read_int()

    # Skipping `mdlOffset` bytes moves the start forward and shrinks the size by the same amount
    range_start = mapped_va.address + mdlOffset
    range_size = byte_count - mdlOffset

    return reven2.MemoryRange(range_start, range_size)