Search in memory

Purpose

Search the memory at a specific context for a string or for an array of bytes.

The memory range to search in is defined by a starting address and a search_size.

All unmapped addresses are ignored during the search.

How to use

usage: search_in_memory.py [-h] --host HOST -p PORT --transition TRANSITION
                           --address ADDRESS --pattern PATTERN
                           [--search-size SEARCH_SIZE] [--backward]

optional arguments:
  -h, --help            show this help message and exit
  --host HOST           Reven host, as a string (default: "localhost")
  -p PORT, --port PORT  Reven port, as an int (default: 13370)
  --transition TRANSITION
                        transition id. the context before this id will be
                        searched
  --address ADDRESS     The start address of the memory area to search in. It
                        can be a hex offset as 0xfff123 (same as ds:0xfff123),
                        hex offset prefixed by segment register as
                        gs:0xfff123, hex offset prefixed by hex segment index
                        as 0x20:0xfff123, hex offset prefixed by 'lin' for
                        linear address, or offset prefixed by 'phy' for
                        physical address.
  --pattern PATTERN     pattern that will be searched. It can be a normal
                        string as 'test', or a string of bytes as
                        '\x01\x02\x03\x04'.Maximum accepted length is 4096
  --search-size SEARCH_SIZE
                        The size of memory area to search in. accepted value
                        can take a suffix, like 1000, 10kb or 10mb.Default
                        value is 1000mb
  --backward            If present the search will go in backward direction.

Known limitations

  • Currently, this script cannot handle logical addresses that are not aligned on a memory page (4K) with their corresponding physical address. In 64 bits, this can happen mainly for the gs and fs segment registers. If you encounter this limitation, you can manually translate your virtual address using its translate method, and then restart the search on the resulting physical address (limiting the search range to 4K, so as to remain in the boundaries of the virtual page).

  • Pattern length must be less than or equal to the page size (4k).

Supported versions

REVEN 2.6+.

Supported perimeter

Any REVEN scenario.

Dependencies

None.

Source

import argparse
import sys
from copy import copy

import reven2
import reven2.address as _address
import reven2.arch.x64 as x64_regs


"""
# Search in memory

## Purpose

Search the memory at a specific context for a string or for an array of bytes.

The memory range to search in is defined by a starting address and a search_size.

All unmapped addresses are ignored during the search.

## How to use

```bash
usage: search_in_memory.py [-h] --host HOST -p PORT --transition TRANSITION
                           --address ADDRESS --pattern PATTERN
                           [--search-size SEARCH_SIZE] [--backward]

optional arguments:
  -h, --help            show this help message and exit
  --host HOST           Reven host, as a string (default: "localhost")
  -p PORT, --port PORT  Reven port, as an int (default: 13370)
  --transition TRANSITION
                        transition id. the context before this id will be
                        searched
  --address ADDRESS     The start address of the memory area to search in. It
                        can be a hex offset as 0xfff123 (same as ds:0xfff123),
                        hex offset prefixed by segment register as
                        gs:0xfff123, hex offset prefixed by hex segment index
                        as 0x20:0xfff123, hex offset prefixed by 'lin' for
                        linear address, or offset prefixed by 'phy' for
                        physical address.
  --pattern PATTERN     pattern that will be searched. It can be a normal
                        string as 'test', or a string of bytes as
                        '\x01\x02\x03\x04'.Maximum accepted length is 4096
  --search-size SEARCH_SIZE
                        The size of memory area to search in. accepted value
                        can take a suffix, like 1000, 10kb or 10mb.Default
                        value is 1000mb
  --backward            If present the search will go in backward direction.
```

## Known limitations

- Currently, this script cannot handle logical addresses that are not aligned on a memory page (4K)
with  their corresponding physical address. In 64 bits, this can happen mainly for
the `gs` and `fs` segment registers.
  If you encounter this limitation, you can manually translate your virtual address
  using its `translate` method, and then restart the search on the resulting physical address
  (limiting the search range to 4K, so as to remain in the boundaries of the virtual page).

- Pattern length must be less than or equal to the page size (4k).

## Supported versions

REVEN 2.6+.

## Supported perimeter

Any REVEN scenario.

## Dependencies

None.
"""


class MemoryFinder(object):
    r"""
    This class is a helper class to search the memory at a specific context for a string or for an array of bytes.
    The memory range to search in is defined by a starting address and a search_size.

    The matching addresses are returned.

    Known limitation
    ================

    Currently, this class cannot handle logical addresses that are not aligned on a memory page (4K)
    with  their corresponding physical address. In 64 bits, this can happen mainly for
    the `gs` and `fs` segment registers.

    If you encounter this limitation, you can manually translate your virtual address
    using its `translate` method, and then restart the search on the resulting physical address
    (limiting the search range to 4K, so as to remain in the boundaries of the virtual page).

    Pattern length must be less than or equal to page size (4k).

    Examples
    ========

    >>> # Search the first context starting from the address ds:0xfffff123123 for the string 'string'
    >>> # Search_size default value is 1000MB.
    >>> # Memory range to search in is: [ds:0xfffff123123, ds:0xfffff123123 + 1000MB]
    >>> for address, progress in MemoryFinder(context, 0xfffff123123).query('string'):
    ...     sys.stderr.write("progress: %d%s\r" % (int(progress / finder.search_size * 100), '%'))
    ...     if address:
    ...         print("found match at {}".format(address))
    found match at ds:0xfffff123444
    ...

    >>> # Search the first context starting from the address lin:0xfffff123123 for the
    >>> # array of bytes '\\x35\\xfe\\x0e\\x4a'
    >>> # Search size default value is 1000MB
    >>> # Memory range to search in is: [lin:0xfffff123123, lin:0xfffff123123 + 1000MB]
    >>> address = reven2.address.LinearAddress(0xfffff123123)
    >>> for address, progress in MemoryFinder(context, address).query('\\x35\\xfe\\x0e\\x4a'):
    ...     sys.stderr.write("progress: %d%s\r" % (int(progress / finder.search_size * 100), '%'))
    ...     if address:
    ...         print("found match at {}".format(address))
    found match at ds:0xfffff125229
    ...

    >>> # Search the first context starting from the address gs:0x180 for the string 'string'
    >>> # Search size value is 100MB
    >>> # Memory range to search in is: [gs:0x180, ds:0x180 + 100MB]
    >>> address = reven2.address.LogicalAddress(0x180, reven2.arch.x64.gs)
    >>> for address, progress in MemoryFinder(context, address, 100*1024*1024).query('string'):
    ...     sys.stderr.write("progress: %d%s\r" % (int(progress / finder.search_size * 100), '%'))
    ...     if address:
    ...         print("found match at {}".format(address))
    found match at ds:0xfffff123444
    ...

    >>> # Search the first context starting from the address ds:0xfffff123123 for the string 'string'
    >>> # in backward direction.
    >>> # Search_size default value is 1000MB.
    >>> # Memory range to search in is: [ds:0xfffff123123, ds:0xfffff123123 + 1000MB]
    >>> for address, progress in MemoryFinder(context, 0xfffff123123).query('string', False):
    ...     sys.stderr.write("progress: %d%s\r" % (int(progress / finder.search_size * 100), '%'))
    ...     if address:
    ...         print("found match at {}".format(address))
    found match at ds:0xfffff123004
    ...
    """
    page_size = 0x1000
    progress_step = 0x10000

    def __init__(self, context, address, search_size=1000 * 1024**2):
        r"""
        Initialize a C{MemoryFinder} from context and address

        Information
        ===========

        @param context: C{reven2.trace.Context} where searching will be done.
        @param address: a class from C{reven2.address} the address where the search will be started.
        @param search_size: an C{Integer} representing the size, in bytes, of the search range.


        @raises TypeError: if context is not a C{reven2.trace.Context} or address is not a C{Integer} or
                           one of the address classes on C{reven2.address}.
        @raises RunTimeError: If the address is a virtual address that is not aligned to its
                              corresponding physical address.
        """
        if not isinstance(context, reven2.trace.Context):
            raise TypeError("context must be an instance of reven2.trace.Context class")
        self._context = context

        search_addr = copy(address)
        if not isinstance(search_addr, _address._AbstractAddress):
            try:
                # if address is of type int make it a logical address with ds as segment register
                search_addr = _address.LogicalAddress(address)
            except TypeError:
                raise TypeError(
                    "address must be an instance of a class from reven2.address " "module or an integer value."
                )

        self._search_size = search_size
        self._start = search_addr

    @property
    def search_size(self):
        return self._search_size

    def query(self, pattern, is_forward=True):
        r"""
        Iterate the search range looking for the specified pattern.

        This method returns a generator of tuples, of the form C{(A, processed_bytes)}, such that:

        - C{processed_bytes} indicates the number of bytes already processed in the search range.
        - C{A} is either an address of the same type as the input address, or C{None}.
          If an address is returned, it corresponds to an address matching the searched pattern.
          C{None} is returned every 40KB of the search range, as a means of indicating progress.

        Information
        ===========

        @param pattern: A C{str} or C{bytearray}. The pattern to look for in memory.
                        Note: C{str} pattern is converted to bytearray using ascci encoding.

        @param is_forward: C{bool}, C{True} to search in forward direction and C{False}
                           to search in backward direction

        @returns: a generator of tuples, where the tuples are either:
                - C{(None, processed_bytes)} every  40KB of the search range,
                - C{(matching_address, processed_bytes)} each time a matching_address is found.
        """

        # pattern is a byte array or a string
        search_pattern = copy(pattern)
        if not isinstance(search_pattern, bytearray):
            if isinstance(search_pattern, str):
                search_pattern = bytearray(str.encode(pattern))
            else:
                raise RuntimeError("Cannot parse pattern, bad format.")

        if len(search_pattern) > self.page_size:
            raise RuntimeError("Maximum length of pattern must be less than or equal to %d." % self.page_size)

        return self._search(search_pattern, is_forward)

    def _search(self, pattern, is_forward):
        def loop_condition(curr, end):
            return curr < end if is_forward else curr > end

        cross_page_addition = len(pattern) - 1

        iteration_step = self.page_size if is_forward else -self.page_size
        curr = self._start
        end = curr + self._search_size if is_forward else curr - self._search_size
        prev = None
        progress = 0

        # first loop detects the first mapped address, then test if it aligned
        # this step is only applied for logical address
        if not isinstance(curr, reven2.address.PhysicalAddress):
            while loop_condition(curr, end):
                phy = curr.translate(self._context)
                if phy is None:
                    curr += iteration_step
                    progress += self.page_size
                    if progress % self.progress_step == 0:
                        yield None, progress
                    continue
                # linear -> physical alignment is guaranteed on 4k boundary:
                # If linear is 0xxxxx123, physical will be 0xyyyy123
                # logical -> linear alignment is not guaranteed because segment offset goes down to the byte
                # (or at least down to less than 4k): logical gs:0x123 could be linear 0xzzzzz456
                # Problem is: gs:0x0 might not be at start of page, 0x0:0x1000 might span on two pages
                # instead of one. To solve: we need to translate logical -> physical for start address,
                # and take note of offset to use that to compute actual start of page
                # currently, we don't treat the case where logical -> linear alignment isn't valid.
                if curr.offset % self.page_size != phy.offset % self.page_size:
                    raise RuntimeError(
                        "The provided address is not aligned on a memory page (4K)"
                        "with  their corresponding physical address. Only aligned "
                        "addresses can be handled."
                    )
                break

        # second loop starts the search
        while loop_condition(curr, end):
            # get offset between current address and the start of the page
            # This offset is zero except in the first iteration may be different to zero
            offset = curr.offset % self.page_size
            # compute the length of the buffer to read.
            # This buffer length equals the page size except in the first iteration may be different
            buffer_length = self.page_size if offset == 0 else (self.page_size - offset if is_forward else offset)
            # the iteration step to go forward or backward
            iteration_step = buffer_length if is_forward else -buffer_length
            # compute the address to read it.
            # in forward this address is the current address,
            # in backward we have to read until the current address so it is current - buffer length
            read_address = curr if is_forward else curr - buffer_length
            # if the read buffer will exceed the search range adjust it
            if is_forward and read_address + buffer_length > end:
                buffer_length = end.offset - read_address.offset
            elif not is_forward and read_address < end:
                read_address = end
                buffer_length = curr.offset - read_address.offset

            try:
                buffer = self._context.read(read_address, buffer_length, raw=True)
            except Exception:
                curr += iteration_step
                progress += self.page_size
                prev = None
                if progress % self.progress_step == 0:
                    yield None, progress
                continue

            # Add necessary bytes from previous page to allow cross-page matches
            addr_offset = 0
            if prev is not None:
                if is_forward:
                    prev_buf_len = -len(prev) if cross_page_addition > len(prev) else -cross_page_addition
                    buffer = prev[prev_buf_len:] + buffer if prev_buf_len < 0 else buffer
                    addr_offset = prev_buf_len
                else:
                    prev_buf_len = len(prev) if cross_page_addition > len(prev) else cross_page_addition
                    buffer = buffer + prev[:prev_buf_len]

            index = 0
            addr_res = []
            while True:
                index = buffer.find(pattern, index)
                if index == -1:
                    break
                addr_res.append(read_address + index + addr_offset)
                index += 1

            for addr in addr_res if is_forward else reversed(addr_res):
                yield addr, progress

            progress += self.page_size
            prev = buffer
            curr += iteration_step


def parse_address(string_address):
    segments = [x64_regs.ds, x64_regs.cs, x64_regs.es, x64_regs.ss, x64_regs.gs, x64_regs.fs]

    def _str_to_seg(str_reg):
        for segment in segments:
            if str_reg == segment.name:
                return segment
        return None

    try:
        # Try to parse address as offset only as 0xfff123.
        return _address.LogicalAddress(int(string_address, base=16))
    except ValueError:
        pass
    # Try to parse address as prefex:offset as 0x32:0xfff123, gs:0xfff123, lin:0xfff123 or phy:0xff123.
    res = string_address.split(":")
    if len(res) != 2:
        raise RuntimeError("Cannot parse address, bad format")

    try:
        offset = int(res[1].strip(), base=16)
    except ValueError:
        raise RuntimeError("Cannot parse address, bad format")

    try:
        # Try to parse it as 0x32:0xfff123.
        segment_index = int(res[0].strip(), base=16)
        return _address.LogicalAddressSegmentIndex(segment_index, offset)
    except ValueError:
        pass

    lower_res0 = res[0].lower().strip()
    # Try parse it as ds:0xfff123, cs::0xfff123, es::0xfff123, ss::0xfff123, gs::0xfff123 or fs::0xfff123.
    sreg = _str_to_seg(lower_res0)
    if sreg:
        return _address.LogicalAddress(offset, sreg)
    elif lower_res0 == "lin":
        # Try parse it as lin:0xfff123.
        return _address.LinearAddress(offset)
    elif lower_res0 == "phy":
        # Try parse it as phy:0xfff123.
        return _address.PhysicalAddress(offset)
    else:
        raise RuntimeError("Cannot parse address, bad format")


def parse_search_size(string_size):
    try:
        # try to convert it to int
        return int(string_size)
    except ValueError:
        pass

    # try to convert it to int without the two last char
    lower_string = string_size.lower()
    ssize = lower_string[:-2]
    try:
        size = int(ssize)
    except ValueError:
        raise RuntimeError("Cannot parse search size, bad format")
    # convert it according to its suffix
    if lower_string.endswith("kb"):
        return size * 1024
    elif lower_string.endswith("mb"):
        return size * 1024 * 1024
    else:
        raise RuntimeError("Cannot parse search size, bad format")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--host", type=str, default="localhost", required=True, help='Reven host, as a string (default: "localhost")'
    )
    parser.add_argument(
        "-p", "--port", type=int, default="13370", required=True, help="Reven port, as an int (default: 13370)"
    )
    parser.add_argument(
        "--transition", type=int, required=True, help="transition id. the context before this id will be searched"
    )
    parser.add_argument(
        "--address",
        type=str,
        required=True,
        help="The start address of the memory area to search in. "
        "It can be a hex offset as 0xfff123 (same as ds:0xfff123), "
        "hex offset prefixed by segment register as gs:0xfff123, "
        "hex offset prefixed by hex segment index as 0x20:0xfff123, "
        "hex offset prefixed by 'lin' for linear address, "
        "or offset prefixed by 'phy' for physical address.",
    )
    parser.add_argument(
        "--pattern",
        type=str,
        required=True,
        help="pattern that will be searched. "
        "It can be a normal string as 'test', "
        "or a string of bytes as '\\x01\\x02\\x03\\x04'."
        "Maximum accepted length is 4096",
    )
    parser.add_argument(
        "--search-size",
        type=str,
        default="1000mb",
        help="The size of memory area to search in. "
        "accepted value can take a suffix, like 1000, 10kb or 10mb."
        "Default value is 1000mb",
    )
    parser.add_argument(
        "--backward", default=False, action="store_true", help="If present the search will go in backward direction."
    )

    args = parser.parse_args()

    try:
        pattern = bytearray(map(ord, bytearray(map(ord, args.pattern.strip())).decode("unicode_escape")))
    except Exception as e:
        raise RuntimeError("Cannot parse pattern, bad format(%s)" % str(e))

    address = parse_address(args.address.strip())

    reven_server = reven2.RevenServer(args.host, args.port)
    context = reven_server.trace.context_before(args.transition)

    finder = MemoryFinder(context, address, parse_search_size(args.search_size.strip()))

    for address, progress in finder.query(pattern, not args.backward):
        sys.stderr.write("progress: %d%s\r" % (int(progress / finder.search_size * 100), "%"))
        if address:
            print("found match at {}".format(address))