Detect data race

This notebook checks for possible data races that may occur when using critical sections as the synchronization primitive.

Prerequisites

  • Supported versions:
  • REVEN 2.9+
  • This notebook should be run in a jupyter notebook server equipped with a REVEN 2 Python kernel. REVEN comes with a Jupyter notebook server accessible with the Open Python button in the Analyze page of any scenario.
  • The following resources are needed for the analyzed scenario:
  • Trace
  • OSSI
  • Memory History
  • Backtrace
  • Fast Search

Perimeter:

  • Windows 10 64-bit

Limits

  • Only supports RtlEnterCriticalSection and RtlLeaveCriticalSection as the lock (resp. unlock) primitives

Running

Fill out the parameters in the Parameters cell below, then run all the cells of this notebook.

Source

# ---
# jupyter:
#   jupytext:
#     formats: ipynb,py:percent
#     text_representation:
#       extension: .py
#       format_name: percent
#       format_version: '1.3'
#       jupytext_version: 1.11.2
#   kernelspec:
#     display_name: reven
#     language: python
#     name: reven-python3
# ---

# %% [markdown]
# # Detect data race
# This notebook checks for possible data races that may occur when using critical sections as the synchronization
# primitive.
#
# ## Prerequisites
#  - Supported versions:
#     - REVEN 2.9+
#  - This notebook should be run in a jupyter notebook server equipped with a REVEN 2 Python kernel.
# REVEN comes with a Jupyter notebook server accessible with the `Open Python` button in the `Analyze`
# page of any scenario.
#  - The following resources are needed for the analyzed scenario:
#     - Trace
#     - OSSI
#     - Memory History
#     - Backtrace
#     - Fast Search
#
# ## Perimeter:
#  - Windows 10 64-bit
#
# ## Limits
#  - Only supports `RtlEnterCriticalSection` and `RtlLeaveCriticalSection` as the lock (resp. unlock) primitives
#
# ## Running
# Fill out the parameters in the [Parameters cell](#Parameters) below, then run all the cells of this notebook.

# %%
# For Python's type annotation
from typing import Dict, Iterator, List, Optional, Set, Tuple

# Reven specific
import reven2
from reven2.address import LogicalAddress
from reven2.arch import x64
from reven2.memhist import MemoryAccessOperation
from reven2.trace import Context, Trace, Transition
from reven2.util import collate

# %% [markdown]
# # Parameters

# %%
# Host and port of the REVEN server running the scenario
host = '127.0.0.1'
port = 42897

# The PID and (or) the name of the binary of interest: if the binary name is given, only locks and unlocks called
# directly from the binary are counted.
pid = 2820
binary = None

# Optional ids of the transitions delimiting the analyzed part of the trace.
# When `begin_trans_id` is None, the analysis starts at the first context found executing `binary` in the process
# `pid` (if `binary` is given), else at the first context of the trace.
# When `end_trans_id` is None, the analysis stops at the last context of the trace.
begin_trans_id = None
end_trans_id = None


# %%
# Helper class which wraps REVEN's runtime objects and provides methods to retrieve information about calls to
# RtlEnterCriticalSection and RtlLeaveCriticalSection, context switches and memory accesses.
class RuntimeHelper:
    # Connect to the REVEN server at host:port and resolve the symbols used by the analysis.
    # Raise RuntimeError if the connection fails or if a mandatory binary/symbol cannot be found.
    def __init__(self, host: str, port: int):
        try:
            server = reven2.RevenServer(host, port)
        except RuntimeError as error:
            # Chain the original error so that its details are not lost
            raise RuntimeError(f'Cannot connect to the scenario at {host}:{port}') from error

        self.trace = server.trace
        self.ossi = server.ossi
        self.search_symbol = self.trace.search.symbol
        self.search_binary = self.trace.search.binary

        try:
            ntoskrnl = next(self.ossi.executed_binaries('^c:/windows/system32/ntoskrnl.exe$'))
        except StopIteration:
            raise RuntimeError('ntoskrnl.exe not found') from None

        try:
            self.swap_context = next(ntoskrnl.symbols('^SwapContext$'))
        except StopIteration:
            raise RuntimeError('SwapContext symbol not found') from None

        try:
            ntdll = next(self.ossi.executed_binaries('^c:/windows/system32/ntdll.dll$'))
        except StopIteration:
            raise RuntimeError('ntdll.dll not found') from None

        try:
            self.rtl_enter_critical_section = next(ntdll.symbols("^RtlEnterCriticalSection$"))
            self.rtl_leave_critical_section = next(ntdll.symbols("^RtlLeaveCriticalSection$"))
        except StopIteration:
            raise RuntimeError('Rtl(Enter|Leave)CriticalSection symbol not found') from None

        # The two symbols below are optional: they are only used by heuristics, so a missing symbol is not an error
        try:
            self.ldrp_initialize_thread = next(ntdll.symbols("^LdrpInitializeThread$"))
        except StopIteration:
            self.ldrp_initialize_thread = None

        try:
            self.ldr_shutdown_thread = next(ntdll.symbols("^LdrShutdownThread$"))
        except StopIteration:
            self.ldr_shutdown_thread = None

    # Return an iterator over the contexts where SwapContext is executed between from_context and to_context.
    def get_swap_contexts(self, from_context: Context, to_context: Context) -> Iterator[Context]:
        if from_context >= to_context:
            # Empty range: return an empty iterator (and not None) so that the result can always be iterated
            return iter(())

        # Workaround for a current bug in searching symbol where to_context is the last context
        if to_context == self.trace.last_context:
            to_context = None

        return self.search_symbol(self.swap_context, from_context, to_context)

    # Generate, for each execution of `symbol` found between from_context and to_context, the context located
    # just before the one where the symbol is reached.
    def _get_symbol_call_contexts(self, symbol, from_context: Context, to_context: Context) -> Iterator[Context]:
        # For ease, if the from_context is the first context of the trace and also the beginning of a call,
        # then omit it since the caller is unknown
        if from_context == self.trace.first_context:
            from_context = from_context + 1

        if from_context >= to_context:
            return

        # Workaround for a current bug in searching symbol where to_context is the last context
        if to_context == self.trace.last_context:
            to_context = None

        for ctxt in self.search_symbol(symbol, from_context, to_context):
            # Any context corresponds to the entry of the symbol; since the first context
            # does not count, decreasing by 1 is safe.
            yield ctxt - 1

    # Generate the contexts preceding each entry of RtlEnterCriticalSection between from_context and to_context.
    def get_critical_section_locks(self, from_context: Context, to_context: Context) -> Iterator[Context]:
        return self._get_symbol_call_contexts(self.rtl_enter_critical_section, from_context, to_context)

    # Generate the contexts preceding each entry of RtlLeaveCriticalSection between from_context and to_context.
    def get_critical_section_unlocks(self, from_context: Context, to_context: Context) -> Iterator[Context]:
        return self._get_symbol_call_contexts(self.rtl_leave_critical_section, from_context, to_context)

    # Generate the memory accesses occurring between from_context and to_context, split byte by byte.
    # Each generated item is (byte_address, transition, Read|Write); a given byte address is reported at most
    # once as read and at most once as written (by the first transition accessing it that way).
    # Accesses without virtual address, or performed by a transition which is not an instruction, are skipped.
    def get_memory_accesses(self, from_context: Context, to_context: Context) \
            -> Iterator[Tuple[int, Transition, MemoryAccessOperation]]:
        try:
            from_trans = from_context.transition_after()
            to_trans = to_context.transition_before()
        except IndexError:
            # No transition after (resp. before) the bounding contexts: nothing to report
            return

        read_addrs = set()
        write_addrs = set()
        for ma in self.trace.memory_accesses(from_transition=from_trans, to_transition=to_trans):
            if ma.virtual_address is None:
                continue

            if ma.transition.instruction is None:
                continue

            # Any operation which is not a read is handled as a write
            if ma.operation == MemoryAccessOperation.Read:
                (operation, seen_addrs) = (MemoryAccessOperation.Read, read_addrs)
            else:
                (operation, seen_addrs) = (MemoryAccessOperation.Write, write_addrs)

            base_addr = ma.virtual_address.offset
            for byte_addr in range(base_addr, base_addr + ma.size):
                if byte_addr in seen_addrs:
                    continue

                seen_addrs.add(byte_addr)
                yield (byte_addr, ma.transition, operation)

    # Same as get_memory_accesses, without the accesses of instructions whose first byte is the `lock` prefix.
    def get_free_memory_accesses(self, from_context: Context, to_context: Context) \
            -> Iterator[Tuple[int, Transition, MemoryAccessOperation]]:
        for (addr, trans, rw) in self.get_memory_accesses(from_context, to_context):
            # Skip the access of `lock` prefixed instruction
            # NOTE(review): only the first byte of the instruction is checked, so a `lock` prefix preceded by
            # another prefix would not be detected - confirm whether this matters.
            if trans.instruction.raw[0] == 0xf0:
                continue
            yield (addr, trans, rw)

    # Read rcx at the given context; callers use it as the critical section handle of a lock/unlock call.
    @staticmethod
    def get_critical_section_handle(ctxt: Context) -> int:
        return ctxt.read(x64.rcx)

    # Read the 4 bytes at gs:0x48 at the given context; callers use it as the thread id.
    # (presumably the thread id stored in the TEB of the current thread - TODO confirm)
    @staticmethod
    def thread_id(ctxt: Context) -> int:
        return ctxt.read(LogicalAddress(0x48, x64.gs), 4)

    # Tell whether the two lowest bits of cs are 0 at the given context.
    @staticmethod
    def is_kernel_mode(ctxt: Context) -> bool:
        return (ctxt.read(x64.cs) & 0x3) == 0


# %%
# Find the lower/upper bound of contexts between which the data race detection operates.
# Lower bound: the context after the transition `begin_id` if it exists; else the first context found executing
# `binary` in the process `pid` (when `binary` is given); else the first context of the trace.
# Upper bound: the context before the transition `end_id` if it exists; else the last context of the trace.
# Raise RuntimeError if the upper bound is not strictly after the lower bound.
def find_begin_end_context(sco: RuntimeHelper, pid: int, binary: Optional[str],
                           begin_id: Optional[int], end_id: Optional[int]) -> Tuple[Context, Context]:
    lower_ctxt = None
    if begin_id is not None:
        try:
            lower_ctxt = sco.trace.transition(begin_id).context_after()
        except IndexError:
            # Unknown transition: fall back to the other strategies
            lower_ctxt = None

    if lower_ctxt is None and binary is not None:
        for executed_binary in sco.ossi.executed_binaries(binary):
            # First context executing this binary in the process of interest, if any
            lower_ctxt = next(
                (ctxt for ctxt in sco.search_binary(executed_binary) if ctxt.ossi.process().pid == pid),
                None
            )
            if lower_ctxt is not None:
                break

    if lower_ctxt is None:
        lower_ctxt = sco.trace.first_context

    upper_ctxt = None
    if end_id is not None:
        try:
            upper_ctxt = sco.trace.transition(end_id).context_before()
        except IndexError:
            upper_ctxt = None

    if upper_ctxt is None:
        upper_ctxt = sco.trace.last_context

    if upper_ctxt <= lower_ctxt:
        raise RuntimeError("The begin transition must be smaller than the end.")

    return (lower_ctxt, upper_ctxt)


# Generate the context ranges, delimited by the SwapContext executions found between first_context and
# last_context, whose first context belongs to the process `pid`.
def find_process_ranges(sco: RuntimeHelper, pid: int, first_context: Context, last_context: Context) \
        -> Iterator[Tuple[Context, Context]]:
    if last_context <= first_context:
        return

    def starts_in_process(ctxt):
        return ctxt.ossi.process().pid == pid

    range_start = first_context
    for swap_ctxt in sco.get_swap_contexts(first_context, last_context):
        if starts_in_process(range_start):
            yield (range_start, swap_ctxt)
        range_start = swap_ctxt

    # Remaining range between the last context switch and the end of the perimeter
    if range_start < last_context and starts_in_process(range_start):
        yield (range_start, last_context)


# Walk forward from `trans` and return the first transition which has an instruction, or None when the last
# transition of the trace is reached without finding one.
def ignore_non_instructions(trans: Transition, trace: Trace) -> Optional[Transition]:
    candidate = trans
    while True:
        if candidate.instruction is not None:
            return candidate

        if candidate == trace.last_transition:
            return None

        candidate = candidate + 1


# Split the context range [ctxt_low, ctxt_high] at each change of the cs register and generate the sub-ranges
# which are not in kernel mode (the input range may also include kernel mode parts).
def find_usermode_ranges(sco: RuntimeHelper, ctxt_low: Context, ctxt_high: Context) \
        -> Iterator[Tuple[Context, Context]]:
    first_instruction = ignore_non_instructions(ctxt_low.transition_after(), sco.trace)
    if first_instruction is None:
        return

    cursor = first_instruction.context_before()
    while cursor < ctxt_high:
        cs_change = cursor.find_register_change(x64.cs, is_forward=True)
        in_user_mode = not RuntimeHelper.is_kernel_mode(cursor)

        if in_user_mode and (cs_change is None or cs_change > ctxt_high):
            # No more change of cs inside the range: the user mode part runs until the end of the range
            yield (cursor, ctxt_high)
            return

        if in_user_mode:
            # It's safe to decrease cs_change by 1 because it was obtained from a forward find_register_change
            yield (cursor, cs_change - 1)

        if cs_change is None:
            return

        cursor = cs_change


# Generate the user mode only context ranges of the process `pid` between first_ctxt and last_ctxt: each range
# of the process is further restricted to its user mode parts.
def find_process_usermode_ranges(trace: RuntimeHelper, pid: int, first_ctxt: Context, last_ctxt: Context) \
        -> Iterator[Tuple[Context, Context]]:
    for (range_begin, range_end) in find_process_ranges(trace, pid, first_ctxt, last_ctxt):
        yield from find_usermode_ranges(trace, range_begin, range_end)


# Check whether the location of a context is in the binary of interest.
# A missing context never matches; when no binary is given, any existing context matches. Otherwise `binary` is
# compared with the name, the filename and the path of the binary located at the context.
def context_is_in_binary(ctxt: Optional[Context], binary: Optional[str]) -> bool:
    if ctxt is None:
        return False

    if binary is None:
        return True

    location = ctxt.ossi.location()
    located_binary = None if location is None else location.binary
    if located_binary is None:
        return False

    return binary in (located_binary.name, located_binary.filename, located_binary.path)


# Generate the lock (i.e. RtlEnterCriticalSection) call contexts of a context range, keeping only the calls
# located in the binary of interest (all of them when no binary is given).
def get_locks(sco: RuntimeHelper, ctxt_low: Context, ctxt_high: Context, binary: Optional[str]) \
        -> Iterator[Context]:
    yield from (
        lock_ctxt
        for lock_ctxt in sco.get_critical_section_locks(ctxt_low, ctxt_high)
        if context_is_in_binary(lock_ctxt, binary)
    )


# Generate the unlock (i.e. RtlLeaveCriticalSection) call contexts of a context range, keeping only the calls
# located in the binary of interest (all of them when no binary is given).
def get_unlocks(sco: RuntimeHelper, ctxt_low: Context, ctxt_high: Context, binary: Optional[str]) \
        -> Iterator[Context]:
    yield from (
        unlock_ctxt
        for unlock_ctxt in sco.get_critical_section_unlocks(ctxt_low, ctxt_high)
        if context_is_in_binary(unlock_ctxt, binary)
    )


# Merge the lock and unlock contexts called in a range of contexts into a single sequence ordered by context.
# Return a generator of pairs (bool, Context): True for lock, False for unlock.
def get_locks_unlocks(sco: RuntimeHelper, ctxt_low: Context, ctxt_high: Context, binary: Optional[str]) \
        -> Iterator[Tuple[bool, Context]]:
    tagged_locks = ((True, ctxt) for ctxt in get_locks(sco, ctxt_low, ctxt_high, binary))
    tagged_unlocks = ((False, ctxt) for ctxt in get_unlocks(sco, ctxt_low, ctxt_high, binary))

    # Both sequences are collated using the context (second element of each pair) as the ordering key
    return collate([tagged_locks, tagged_unlocks], key=lambda tagged_ctxt: tagged_ctxt[1])


# Generate the locks and unlocks of all context ranges of a thread, range after range.
# Each generated item is a pair (bool, Context): True for lock, False for unlock.
def get_thread_usermode_locks_unlocks(sco: RuntimeHelper, ranges: List[Tuple[Context, Context]],
                                      binary: Optional[str]) -> Iterator[Tuple[bool, Context]]:
    for (range_begin, range_end) in ranges:
        yield from get_locks_unlocks(sco, range_begin, range_end, binary)


# Replay the locks and unlocks of a thread up to a transition and return the lock contexts which have not been
# released yet when the transition is executed.
def get_transition_live_locks(sco: RuntimeHelper,
                              locks_unlocks: List[Tuple[bool, Context]], trans: Transition) -> List[Context]:
    held_locks: List[Context] = []
    limit_ctxt = trans.context_before()

    for (is_lock, ctxt) in locks_unlocks:
        if ctxt > limit_ctxt:
            # Events located after the transition do not protect it
            break

        if is_lock:
            held_locks.append(ctxt)
            continue

        released_handle = RuntimeHelper.get_critical_section_handle(ctxt)
        # Release the latest lock taken on the same critical section, if any
        for idx in range(len(held_locks) - 1, -1, -1):
            if RuntimeHelper.get_critical_section_handle(held_locks[idx]) == released_handle:
                del held_locks[idx]
                break

    return held_locks


# %%
# Get the common memory accesses of two threads (of the same process): an access of multiple bytes is split into
# multiple accesses of a single byte.
# Only conflicting pairs are generated: first the (read by first, write by second) pairs, then the
# (write by first, read by second) pairs.
# Return a generator of (address, (transition, read|write), (transition, read|write)).
def get_memory_accesses_intersection(first_mem_accesses: List[Tuple[int, Transition, MemoryAccessOperation]],
                                     second_mem_accesses: List[Tuple[int, Transition, MemoryAccessOperation]]) \
        -> Iterator[Tuple[int, Tuple[Transition, MemoryAccessOperation], Tuple[Transition, MemoryAccessOperation]]]:

    # Separate reads from the other accesses (handled as writes), keeping the original order
    def split_memory_accesses(mem_accesses):
        mem_reads = []
        mem_writes = []
        for (addr, trans, rw) in mem_accesses:
            if rw == MemoryAccessOperation.Read:
                mem_reads.append((addr, trans))
            else:
                mem_writes.append((addr, trans))
        return (mem_reads, mem_writes)

    # Map each address to the transitions accessing it, keeping the original order
    def index_by_address(accesses):
        transitions_by_addr = {}
        for (addr, trans) in accesses:
            transitions_by_addr.setdefault(addr, []).append(trans)
        return transitions_by_addr

    (first_mem_read_accesses, first_mem_write_accesses) = split_memory_accesses(first_mem_accesses)
    (second_mem_read_accesses, second_mem_write_accesses) = split_memory_accesses(second_mem_accesses)

    # Looking up the accesses of the second thread by address avoids comparing every pair of accesses, while
    # generating the pairs in the same order as a nested scan would.
    second_writes_by_addr = index_by_address(second_mem_write_accesses)
    for (addr, first_trans) in first_mem_read_accesses:
        for second_trans in second_writes_by_addr.get(addr, ()):
            yield (addr,
                   (first_trans, MemoryAccessOperation.Read), (second_trans, MemoryAccessOperation.Write))

    second_reads_by_addr = index_by_address(second_mem_read_accesses)
    for (addr, first_trans) in first_mem_write_accesses:
        for second_trans in second_reads_by_addr.get(addr, ()):
            yield (addr,
                   (first_trans, MemoryAccessOperation.Write), (second_trans, MemoryAccessOperation.Read))


# Restore (i.e. unsplit) the byte-level common memory accesses of two threads: bytes accessed by the same pair
# of (transition, operation) are regrouped into runs of contiguous addresses.
# Return a generator of (address, size, (transition, read|write), (transition, read|write)).
def get_grouped_by_transition_memory_accesses_intersection(
    first_mem_accesses: List[Tuple[int, Transition, MemoryAccessOperation]],
    second_mem_accesses: List[Tuple[int, Transition, MemoryAccessOperation]]) \
        -> Iterator[Tuple[int, int,
                          Tuple[Transition, MemoryAccessOperation], Tuple[Transition, MemoryAccessOperation]]]:

    # (first_trans_id, first_rw, second_trans_id, second_rw) -> set(byte address)
    addrs_by_pair: Dict[Tuple[int, MemoryAccessOperation, int, MemoryAccessOperation], Set[int]] = {}
    # transition_id -> transition
    transitions_by_id: Dict[int, Transition] = {}

    for (addr, (first_trans, first_rw), (second_trans, second_rw)) in \
            get_memory_accesses_intersection(first_mem_accesses, second_mem_accesses):
        first_id = first_trans.id
        second_id = second_trans.id
        transitions_by_id.setdefault(first_id, first_trans)
        transitions_by_id.setdefault(second_id, second_trans)
        addrs_by_pair.setdefault((first_id, first_rw, second_id, second_rw), set()).add(addr)

    for ((first_id, first_rw, second_id, second_rw), addrs) in addrs_by_pair.items():
        ordered_addrs = sorted(addrs)
        if not ordered_addrs:
            continue

        first_access = (transitions_by_id[first_id], first_rw)
        second_access = (transitions_by_id[second_id], second_rw)

        # Walk the sorted addresses and emit a run each time a gap is met
        run_start = ordered_addrs[0]
        previous_addr = run_start
        for addr in ordered_addrs[1:]:
            if addr != previous_addr + 1:
                yield (run_start, previous_addr + 1 - run_start, first_access, second_access)
                run_start = addr
            previous_addr = addr

        yield (run_start, previous_addr + 1 - run_start, first_access, second_access)


# Group the common memory accesses having the same address and size.
# For each group, the accesses of each thread are listed without duplicates, sorted by transition id then by
# operation (read before write).
# Return a map: (mem_addr, size) -> list((first_trans, first_rw)), list((second_trans, second_rw)).
def get_grouped_by_address_memory_accesses_intersection(
    mem_accesses: Iterator[Tuple[int, int,
                                 Tuple[Transition, MemoryAccessOperation],
                                 Tuple[Transition, MemoryAccessOperation]]]) \
        -> Dict[Tuple[int, int],
                Tuple[List[Tuple[Transition, MemoryAccessOperation]],
                      List[Tuple[Transition, MemoryAccessOperation]]]]:

    # Dict: (mem_addr, size) -> list((first_trans, rw)), list((second_trans, rw))
    groups: Dict[Tuple[int, int],
                 Tuple[List[Tuple[Transition, MemoryAccessOperation]],
                       List[Tuple[Transition, MemoryAccessOperation]]]] = {}

    for (addr, access_len, first_access, second_access) in mem_accesses:
        (first_trans, first_rw) = first_access
        (second_trans, second_rw) = second_access

        (first_list, second_list) = groups.setdefault((addr, access_len), ([], []))
        if (first_trans, first_rw) not in first_list:
            first_list.append((first_trans, first_rw))

        if (second_trans, second_rw) not in second_list:
            second_list.append((second_trans, second_rw))

    # Sort accesses using transition id and memory access operation
    def access_order(access):
        (trans, rw) = access
        return (trans.id, 0 if rw == MemoryAccessOperation.Read else 1)

    for (first_list, second_list) in groups.values():
        first_list.sort(key=access_order)
        second_list.sort(key=access_order)

    return groups


# Heuristics to avoid irrelevant memory accesses.
# Return True when the access performed by `trans` should be ignored: the transition is already blacklisted, or
# it executes in ntdll.dll either below LdrpInitializeThread/LdrShutdownThread in the call stack or directly in
# RtlEnterCriticalSection/RtlLeaveCriticalSection. Newly ignored transition ids are added to `blacklist`.
def filter_access_by_heuristics(sco: RuntimeHelper, trans: Transition, blacklist: Set[int]) -> bool:
    ntdll_path = 'c:/windows/system32/ntdll.dll'

    if trans.id in blacklist:
        return True

    # Memory accesses of the first transition are considered non-protected
    try:
        ctxt = trans.context_before()
    except IndexError:
        return False

    location = ctxt.ossi.location()
    if location is None or location.binary is None or location.binary.path != ntdll_path:
        return False

    frames = ctxt.stack.frames()

    # Accesses of thread initialization/finalization are not counted
    if sco.ldrp_initialize_thread is not None and sco.ldr_shutdown_thread is not None:
        thread_lifecycle_symbols = [sco.ldr_shutdown_thread, sco.ldrp_initialize_thread]
        for frame in frames:
            frame_location = frame.first_context.ossi.location()

            # Only frames with a known symbol of ntdll.dll are considered
            if frame_location is None or frame_location.binary is None:
                continue

            if frame_location.binary.path != ntdll_path or frame_location.symbol is None:
                continue

            if frame_location.symbol in thread_lifecycle_symbols:
                blacklist.add(trans.id)
                return True

    if location.symbol is None:
        return False

    # Accesses of the synchronization APIs themselves are not counted.
    if location.symbol in [sco.rtl_enter_critical_section, sco.rtl_leave_critical_section]:
        blacklist.add(trans.id)
        return True

    return False


# Group context ranges by the thread id read at the first context of each range.
# Return a map: thread_id -> list((first_context, last_context))
def _group_ranges_by_thread(process_ranges: List[Tuple[Context, Context]]) \
        -> Dict[int, List[Tuple[Context, Context]]]:
    threads_ranges: Dict[int, List[Tuple[Context, Context]]] = {}
    for (ctxt_lo, ctxt_hi) in process_ranges:
        threads_ranges.setdefault(RuntimeHelper.thread_id(ctxt_lo), []).append((ctxt_lo, ctxt_hi))
    return threads_ranges


# Get all locks (i.e. RtlEnterCriticalSection calls) and unlocks (i.e. RtlLeaveCriticalSection) calls of each thread
# given a list of context ranges.
# `sco` is the helper used to search the calls and `binary_name` the optional binary filter. When `sco` is
# omitted (legacy call style), the notebook-level globals `trace` and `binary` are used instead.
# Return a map: thread_id -> list((is_lock, context))
def get_threads_locks_unlocks(process_ranges: List[Tuple[Context, Context]],
                              sco: Optional[RuntimeHelper] = None,
                              binary_name: Optional[str] = None) \
        -> Dict[int, List[Tuple[bool, Context]]]:
    if sco is None:
        # Legacy call style: fall back to the module-level globals
        sco = trace
        binary_name = binary

    return {
        tid: list(get_thread_usermode_locks_unlocks(sco, ranges, binary_name))
        for (tid, ranges) in _group_ranges_by_thread(process_ranges).items()
    }


# Get all memory accesses of each thread given a list of context ranges.
# `sco` is the helper used to retrieve the accesses. When it is omitted (legacy call style), the notebook-level
# global `trace` is used instead.
# Return a map: thread_id -> list((mem_addr, trans, read_or_write))
def get_threads_memory_accesses(process_ranges: List[Tuple[Context, Context]],
                                sco: Optional[RuntimeHelper] = None) \
        -> Dict[int, List[Tuple[int, Transition, MemoryAccessOperation]]]:
    if sco is None:
        # Legacy call style: fall back to the module-level global
        sco = trace

    # tid -> list((mem_addr, trans, mem_access))
    threads_memory_accesses: Dict[int, List[Tuple[int, Transition, MemoryAccessOperation]]] = {}
    for (ctxt_lo, ctxt_hi) in process_ranges:
        tid = RuntimeHelper.thread_id(ctxt_lo)
        threads_memory_accesses.setdefault(tid, []).extend(sco.get_free_memory_accesses(ctxt_lo, ctxt_hi))

    return threads_memory_accesses


# Detect and print the possible data races of the process `pid` between the transitions `begin_trans_id` and
# `end_trans_id` (see find_begin_end_context for the bounds used when they are None).
# For each pair of threads, the memory locations accessed by both threads are considered. A read/write pair of
# accesses is reported when no critical section is held by both threads at the time of the accesses, unless
# both accesses are ignored by heuristics. For a given memory location, a pair of program counters is reported
# only once.
def detect_data_race(trace: RuntimeHelper, pid: int, binary: Optional[str],
                     begin_trans_id: Optional[int], end_trans_id: Optional[int]):

    (first_context, last_context) = find_begin_end_context(trace, pid, binary, begin_trans_id, end_trans_id)
    process_ranges = list(find_process_usermode_ranges(trace, pid, first_context, last_context))

    # The helper and the binary filter received as arguments are passed explicitly, so that the analysis does
    # not depend on the notebook-level globals.
    thread_memory_accesses = get_threads_memory_accesses(process_ranges, trace)
    threads_locks_unlocks = get_threads_locks_unlocks(process_ranges, trace, binary)

    # set(thread_id)
    tids = set()

    # map: (thread_id, transition_id) -> set(lock_handle)
    cached_live_locks_handles: Dict[Tuple[int, int], Set[int]] = {}

    # map: transition_id -> pc
    cached_pcs: Dict[int, int] = {}

    # set(transition_id)
    filtered_trans_ids: Set[int] = set()

    # Handles of the critical sections held by the thread `tid` when `trans` is executed (cached)
    def live_lock_handles(tid: int, trans: Transition) -> Set[int]:
        key = (tid, trans.id)
        handles = cached_live_locks_handles.get(key)
        if handles is None:
            live_locks = get_transition_live_locks(trace, threads_locks_unlocks[tid], trans)
            handles = {RuntimeHelper.get_critical_section_handle(ctxt) for ctxt in live_locks}
            cached_live_locks_handles[key] = handles
        return handles

    # Value of rip just before `trans` is executed (cached)
    def program_counter(trans: Transition) -> int:
        pc = cached_pcs.get(trans.id)
        if pc is None:
            pc = trans.context_before().read(x64.rip)
            cached_pcs[trans.id] = pc
        return pc

    def operation_name(rw: MemoryAccessOperation) -> str:
        return 'write' if rw == MemoryAccessOperation.Write else 'read'

    for first_tid, first_mem_accesses in thread_memory_accesses.items():
        tids.add(first_tid)
        for second_tid, second_mem_accesses in thread_memory_accesses.items():
            # Each unordered pair of distinct threads is examined once
            if second_tid in tids:
                continue

            mem_accesses_intersected = \
                get_grouped_by_transition_memory_accesses_intersection(first_mem_accesses, second_mem_accesses)
            mem_accesses_grouped = get_grouped_by_address_memory_accesses_intersection(mem_accesses_intersected)

            for ((mem_addr, mem_size), (first_accesses, second_accesses)) in mem_accesses_grouped.items():
                race_pcs: Set[Tuple[int, int]] = set()

                for (first_trans, first_rw) in first_accesses:
                    first_lock_handles = live_lock_handles(first_tid, first_trans)
                    first_pc = program_counter(first_trans)

                    for (second_trans, second_rw) in second_accesses:
                        # Skip if both are the same operation (i.e. read/read or write/write)
                        if first_rw == second_rw:
                            continue

                        # There is a common critical section protecting the accesses
                        if not first_lock_handles.isdisjoint(live_lock_handles(second_tid, second_trans)):
                            continue

                        # Duplicated
                        second_pc = program_counter(second_trans)
                        if (first_pc, second_pc) in race_pcs:
                            continue

                        # The pair is ignored only when both accesses are ignored by the heuristics
                        if filter_access_by_heuristics(trace, first_trans, filtered_trans_ids) and \
                                filter_access_by_heuristics(trace, second_trans, filtered_trans_ids):
                            continue

                        # Order the two accesses by transition to report them chronologically
                        if first_trans < second_trans:
                            (before_tid, before_trans, before_rw) = (first_tid, first_trans, first_rw)
                            (after_tid, after_trans, after_rw) = (second_tid, second_trans, second_rw)
                        else:
                            (before_tid, before_trans, before_rw) = (second_tid, second_trans, second_rw)
                            (after_tid, after_trans, after_rw) = (first_tid, first_trans, first_rw)

                        print(f'Possible data race during {operation_name(after_rw)} '
                              f'(transition #{after_trans.id}) of size {mem_size} at 0x{mem_addr:x} '
                              f'by thread {after_tid}.')
                        print(f'This conflicts with a previous {operation_name(before_rw)} '
                              f'(transition #{before_trans.id}) by thread {before_tid}.\n')

                        race_pcs.add((first_pc, second_pc))


# %%
# Connect to the REVEN server and run the detection with the values set in the Parameters cell.
# Keep the name `trace` for the helper: functions defined above can look it up as a module-level global.
trace = RuntimeHelper(host, port)
detect_data_race(trace, pid, binary, begin_trans_id, end_trans_id)