Detect data race
This notebook checks for possible data races that may occur when using critical sections as the synchronization primitive.
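The kind of bug it looks for: two threads access the same memory location, at least one of the accesses is a write, and no critical section is held in common by the two threads at the time of the accesses. Below is a minimal sketch of such a pattern, transposed to Python's `threading` module purely for illustration (the notebook itself analyzes native Windows code that synchronizes with `RtlEnterCriticalSection`/`RtlLeaveCriticalSection`):

```python
import threading

counter = 0
lock = threading.Lock()  # stands in for a CRITICAL_SECTION

def protected_increment() -> None:
    global counter
    with lock:        # this access is protected by the lock
        counter += 1

def unprotected_increment() -> None:
    global counter
    counter += 1      # no common lock with the other thread: racy access

threads = [threading.Thread(target=protected_increment),
           threading.Thread(target=unprotected_increment)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```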
Prerequisites
- Supported versions:
    - REVEN 2.9+
- This notebook should be run in a Jupyter notebook server equipped with a REVEN 2 Python kernel.
  REVEN comes with a Jupyter notebook server accessible with the `Open Python` button in the `Analyze`
  page of any scenario.
- The following resources are needed for the analyzed scenario:
    - Trace
    - OSSI
    - Memory History
    - Backtrace
    - Fast Search
Perimeter:
- Windows 10 64-bit
Limits
- Only supports `RtlEnterCriticalSection` and `RtlLeaveCriticalSection` as the lock (resp. unlock) primitives
Running
Fill out the parameters in the Parameters cell below, then run all the cells of this notebook.
Source
# ---
# jupyter:
# jupytext:
# formats: ipynb,py:percent
# text_representation:
# extension: .py
# format_name: percent
# format_version: '1.3'
# jupytext_version: 1.11.2
# kernelspec:
# display_name: reven
# language: python
# name: reven-python3
# ---
# %% [markdown]
# # Detect data race
# This notebook checks for possible data races that may occur when using critical sections as the synchronization
# primitive.
#
# ## Prerequisites
# - Supported versions:
# - REVEN 2.9+
# - This notebook should be run in a jupyter notebook server equipped with a REVEN 2 Python kernel.
# REVEN comes with a Jupyter notebook server accessible with the `Open Python` button in the `Analyze`
# page of any scenario.
# - The following resources are needed for the analyzed scenario:
# - Trace
# - OSSI
# - Memory History
# - Backtrace
# - Fast Search
#
# ## Perimeter:
# - Windows 10 64-bit
#
# ## Limits
# - Only supports `RtlEnterCriticalSection` and `RtlLeaveCriticalSection` as the lock (resp. unlock) primitives
#
# ## Running
# Fill out the parameters in the [Parameters cell](#Parameters) below, then run all the cells of this notebook.
# %%
# For Python's type annotation
from typing import Dict, Iterator, List, Optional, Set, Tuple
# Reven specific
import reven2
from reven2.address import LogicalAddress
from reven2.arch import x64
from reven2.memhist import MemoryAccessOperation
from reven2.trace import Context, Trace, Transition
from reven2.util import collate
# %% [markdown]
# # Parameters
# %%
# Host and port of the REVEN server running the scenario
host = '127.0.0.1'
port = 42897
# The PID and/or the name of the binary of interest: if the binary name is given, only locks and unlocks called
# directly from the binary are counted.
pid = 2820
binary = None
begin_trans_id = None
end_trans_id = None
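# Example (hypothetical values): to restrict the detection to locks/unlocks called directly from one binary of
# the scenario and to a sub-range of the trace, one could set for instance:
#   binary = 'mybinary.exe'
#   begin_trans_id = 1_000_000
#   end_trans_id = 2_000_000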
# %%
# Helper class which wraps Reven's runtime objects and provides methods to get information about calls to
# RtlEnterCriticalSection and RtlLeaveCriticalSection
class RuntimeHelper:
def __init__(self, host: str, port: int):
try:
server = reven2.RevenServer(host, port)
except RuntimeError:
raise RuntimeError(f'Cannot connect to the scenario at {host}:{port}')
self.trace = server.trace
self.ossi = server.ossi
self.search_symbol = self.trace.search.symbol
self.search_binary = self.trace.search.binary
try:
ntoskrnl = next(self.ossi.executed_binaries('^c:/windows/system32/ntoskrnl.exe$'))
except StopIteration:
raise RuntimeError('ntoskrnl.exe not found')
try:
self.swap_context = next(ntoskrnl.symbols('^SwapContext$'))
except StopIteration:
raise RuntimeError('SwapContext symbol not found')
try:
ntdll = next(self.ossi.executed_binaries('^c:/windows/system32/ntdll.dll$'))
except StopIteration:
raise RuntimeError('ntdll.dll not found')
try:
self.rtl_enter_critical_section = next(ntdll.symbols("^RtlEnterCriticalSection$"))
self.rtl_leave_critical_section = next(ntdll.symbols("^RtlLeaveCriticalSection$"))
except StopIteration:
raise RuntimeError('Rtl(Enter|Leave)CriticalSection symbol not found')
try:
self.ldrp_initialize_thread = next(ntdll.symbols("^LdrpInitializeThread$"))
except StopIteration:
self.ldrp_initialize_thread = None
try:
self.ldr_shutdown_thread = next(ntdll.symbols("^LdrShutdownThread$"))
except StopIteration:
self.ldr_shutdown_thread = None
def get_swap_contexts(self, from_context: Context, to_context: Context) -> Iterator[Context]:
if from_context >= to_context:
            return iter([])
# Workaround for a current bug in searching symbol where to_context is the last context
if to_context == self.trace.last_context:
to_context = None
return self.search_symbol(self.swap_context, from_context, to_context)
def get_critical_section_locks(self, from_context: Context, to_context: Context) -> Iterator[Context]:
# For ease, if the from_context is the first context of the trace and also the beginning of a lock,
# then omit it since the caller is unknown
if from_context == self.trace.first_context:
from_context = from_context + 1
if from_context >= to_context:
return
# Workaround for a current bug in searching symbol where to_context is the last context
if to_context == self.trace.last_context:
to_context = None
for ctxt in self.search_symbol(self.rtl_enter_critical_section, from_context, to_context):
            # Any context corresponds to the entry of RtlEnterCriticalSection; since the first context
            # does not count, decreasing by 1 is safe.
yield ctxt - 1
def get_critical_section_unlocks(self, from_context: Context, to_context: Context) -> Iterator[Context]:
# For ease, if the from_context is the first context of the trace and also the beginning of an unlock,
# then omit it since the caller is unknown
if from_context == self.trace.first_context:
from_context = from_context + 1
if from_context >= to_context:
return
# Workaround for a current bug in searching symbol where to_context is the last context
if to_context == self.trace.last_context:
to_context = None
for ctxt in self.search_symbol(self.rtl_leave_critical_section, from_context, to_context):
            # Any context corresponds to the entry of RtlLeaveCriticalSection; since the first context
            # does not count, decreasing by 1 is safe.
yield ctxt - 1
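    # Enumerate the memory accesses between two contexts, split into single-byte accesses and deduplicated per
    # address and per operation (each byte address is yielded at most once for reads and once for writes)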
def get_memory_accesses(self, from_context: Context, to_context: Context) \
-> Iterator[Tuple[int, Transition, MemoryAccessOperation]]:
try:
from_trans = from_context.transition_after()
to_trans = to_context.transition_before()
except IndexError:
return
mem_accs = self.trace.memory_accesses(from_transition=from_trans, to_transition=to_trans)
mem_read_addrs = set()
mem_write_addrs = set()
for ma in mem_accs:
if ma.virtual_address is None:
continue
if ma.transition.instruction is None:
continue
if ma.operation == MemoryAccessOperation.Read:
for i in range(ma.size):
mem_read_addr = ma.virtual_address.offset + i
if mem_read_addr in mem_read_addrs:
continue
mem_read_addrs.add(mem_read_addr)
yield (mem_read_addr, ma.transition, MemoryAccessOperation.Read)
else:
# MemoryAccessOperation.Write
for i in range(ma.size):
mem_write_addr = ma.virtual_address.offset + i
if mem_write_addr in mem_write_addrs:
continue
mem_write_addrs.add(mem_write_addr)
yield (mem_write_addr, ma.transition, MemoryAccessOperation.Write)
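    # Same as get_memory_accesses, but skip accesses made by `lock`-prefixed instructions (first raw byte
    # 0xf0): such read-modify-write accesses are atomic and are not relevant for data race detection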
def get_free_memory_accesses(self, from_context: Context, to_context: Context) \
-> Iterator[Tuple[int, Transition, MemoryAccessOperation]]:
mem_accesses = self.get_memory_accesses(from_context, to_context)
for (addr, trans, rw) in mem_accesses:
ins_bytes = trans.instruction.raw
# Skip the access of `lock` prefixed instruction
if ins_bytes[0] == 0xf0:
continue
yield (addr, trans, rw)
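    # In the x64 Microsoft calling convention the first argument is passed in rcx, so at the call context of
    # RtlEnterCriticalSection/RtlLeaveCriticalSection, rcx holds the address of the CRITICAL_SECTION object,
    # which is used below as the handle identifying the lock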
def get_critical_section_handle(ctxt: Context) -> int:
return ctxt.read(x64.rcx)
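    # In x64 Windows user mode, gs points to the TEB; offset 0x48 is TEB.ClientId.UniqueThread, i.e. the
    # id of the current thread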
def thread_id(ctxt: Context) -> int:
return ctxt.read(LogicalAddress(0x48, x64.gs), 4)
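    # The low 2 bits of cs hold the current privilege level (CPL); CPL 0 means kernel mode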
def is_kernel_mode(ctxt: Context) -> bool:
return ctxt.read(x64.cs) & 0x3 == 0
# %%
# Look for a possible first execution context of a binary
# Find the lower/upper bound of contexts on which the data race detection runs
def find_begin_end_context(sco: RuntimeHelper, pid: int, binary: Optional[str],
begin_id: Optional[int], end_id: Optional[int]) -> Tuple[Context, Context]:
begin_ctxt = None
if begin_id is not None:
try:
begin_trans = sco.trace.transition(begin_id)
begin_ctxt = begin_trans.context_after()
except IndexError:
begin_ctxt = None
if begin_ctxt is None:
if binary is not None:
for name in sco.ossi.executed_binaries(binary):
for ctxt in sco.search_binary(name):
if ctxt.ossi.process().pid == pid:
begin_ctxt = ctxt
break
if begin_ctxt is not None:
break
if begin_ctxt is None:
begin_ctxt = sco.trace.first_context
end_ctxt = None
if end_id is not None:
try:
end_trans = sco.trace.transition(end_id)
end_ctxt = end_trans.context_before()
except IndexError:
end_ctxt = None
if end_ctxt is None:
end_ctxt = sco.trace.last_context
if (end_ctxt <= begin_ctxt):
raise RuntimeError("The begin transition must be smaller than the end.")
return (begin_ctxt, end_ctxt)
# Get all execution contexts of a given process
def find_process_ranges(sco: RuntimeHelper, pid: int, first_context: Context, last_context: Context) \
-> Iterator[Tuple[Context, Context]]:
if last_context <= first_context:
return
ctxt_low = first_context
for ctxt in sco.get_swap_contexts(first_context, last_context):
if ctxt_low.ossi.process().pid == pid:
yield (ctxt_low, ctxt)
ctxt_low = ctxt
if ctxt_low < last_context:
if ctxt_low.ossi.process().pid == pid:
yield (ctxt_low, last_context)
# Starting from a transition, return the first transition that is an instruction, or None if there isn't one.
def ignore_non_instructions(trans: Transition, trace: Trace) -> Optional[Transition]:
while trans.instruction is None:
if trans == trace.last_transition:
return None
trans = trans + 1
return trans
# Extract user mode only context ranges from a context range (which may also include kernel mode ranges)
def find_usermode_ranges(sco: RuntimeHelper, ctxt_low: Context, ctxt_high: Context) \
-> Iterator[Tuple[Context, Context]]:
trans = ignore_non_instructions(ctxt_low.transition_after(), sco.trace)
if trans is None:
return
ctxt_current = trans.context_before()
while ctxt_current < ctxt_high:
ctxt_next = ctxt_current.find_register_change(x64.cs, is_forward=True)
if not RuntimeHelper.is_kernel_mode(ctxt_current):
if ctxt_next is None or ctxt_next > ctxt_high:
yield (ctxt_current, ctxt_high)
break
else:
# It's safe to decrease ctxt_next by 1 because it was obtained from a forward find_register_change
yield (ctxt_current, ctxt_next - 1)
if ctxt_next is None:
break
ctxt_current = ctxt_next
# Get user mode only execution contexts of a given process
def find_process_usermode_ranges(trace: RuntimeHelper, pid: int, first_ctxt: Context, last_ctxt: Context) \
-> Iterator[Tuple[Context, Context]]:
for (ctxt_low, ctxt_high) in find_process_ranges(trace, pid, first_ctxt, last_ctxt):
usermode_ranges = find_usermode_ranges(trace, ctxt_low, ctxt_high)
for usermode_range in usermode_ranges:
yield usermode_range
# Check if the context comes from a transition executed by the binary of interest
def context_is_in_binary(ctxt: Optional[Context], binary: Optional[str]) -> bool:
if ctxt is None:
return False
if binary is None:
return True
ctxt_loc = ctxt.ossi.location()
if ctxt_loc is None:
return False
ctxt_binary = ctxt_loc.binary
if ctxt_binary is not None:
return (binary in [ctxt_binary.name, ctxt_binary.filename, ctxt_binary.path])
return False
# Get lock (i.e. RtlEnterCriticalSection) call contexts
def get_locks(sco: RuntimeHelper, ctxt_low: Context, ctxt_high: Context, binary: Optional[str]) \
-> Iterator[Context]:
for ctxt in sco.get_critical_section_locks(ctxt_low, ctxt_high):
if context_is_in_binary(ctxt, binary):
yield ctxt
# Get unlock (i.e. RtlLeaveCriticalSection) call contexts
def get_unlocks(sco: RuntimeHelper, ctxt_low: Context, ctxt_high: Context, binary: Optional[str]) \
-> Iterator[Context]:
for ctxt in sco.get_critical_section_unlocks(ctxt_low, ctxt_high):
if context_is_in_binary(ctxt, binary):
yield ctxt
# Sort lock and unlock contexts (found in a range of contexts) in the correct order.
# Return a generator of pairs (bool, Context): True for lock, False for unlock.
def get_locks_unlocks(sco: RuntimeHelper, ctxt_low: Context, ctxt_high: Context, binary: Optional[str]) \
-> Iterator[Tuple[bool, Context]]:
def generate_locks():
for ctxt in get_locks(sco, ctxt_low, ctxt_high, binary):
yield (True, ctxt)
def generate_unlocks():
for ctxt in get_unlocks(sco, ctxt_low, ctxt_high, binary):
yield (False, ctxt)
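    # Both generators yield their events in trace order; `collate` merges the two sorted streams into a
    # single stream ordered by context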
return collate([generate_locks(), generate_unlocks()], key=lambda bool_context: bool_context[1])
# Generate all locks and unlocks of context ranges of a thread
def get_thread_usermode_locks_unlocks(sco: RuntimeHelper, ranges: List[Tuple[Context, Context]],
binary: Optional[str]) -> Iterator[Tuple[bool, Context]]:
for (ctxt_low, ctxt_high) in ranges:
for lock_unlock in get_locks_unlocks(sco, ctxt_low, ctxt_high, binary):
yield lock_unlock
# Get the locks which are still held when reaching a given transition
def get_transition_live_locks(sco: RuntimeHelper,
locks_unlocks: List[Tuple[bool, Context]], trans: Transition) -> List[Context]:
protection_locks: List[Context] = []
trans_ctxt = trans.context_before()
for (is_lock, ctxt) in locks_unlocks:
if ctxt > trans_ctxt:
return protection_locks
if is_lock:
protection_locks.append(ctxt)
continue
unlock_handle = RuntimeHelper.get_critical_section_handle(ctxt)
        # look for the latest corresponding lock
for (idx, lock_ctxt) in reversed(list(enumerate(protection_locks))):
lock_handle = RuntimeHelper.get_critical_section_handle(lock_ctxt)
if lock_handle == unlock_handle:
del protection_locks[idx]
break
return protection_locks
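# For instance, given the ordered lock/unlock events `locks_unlocks` of a thread, the critical sections still
# held just before a transition `t` and their handles could be obtained with (hypothetical usage):
#   live_locks = get_transition_live_locks(sco, locks_unlocks, t)
#   handles = {RuntimeHelper.get_critical_section_handle(ctxt) for ctxt in live_locks}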
# %%
# Get the common memory accesses of two threads (of the same process): an access of multiple bytes is split into
# multiple accesses of a single byte.
# Return a generator of (address, (transition, read|write), (transition, read|write)).
def get_memory_accesses_intersection(first_mem_accesses: List[Tuple[int, Transition, MemoryAccessOperation]],
second_mem_accesses: List[Tuple[int, Transition, MemoryAccessOperation]]) \
-> Iterator[Tuple[int, Tuple[Transition, MemoryAccessOperation], Tuple[Transition, MemoryAccessOperation]]]:
def split_memory_accesses(mem_accesses):
mem_reads = []
mem_writes = []
for (addr, trans, rw) in mem_accesses:
if rw == MemoryAccessOperation.Read:
mem_reads.append((addr, trans))
else:
mem_writes.append((addr, trans))
return (mem_reads, mem_writes)
(first_mem_read_accesses, first_mem_write_accesses) = split_memory_accesses(first_mem_accesses)
(second_mem_read_accesses, second_mem_write_accesses) = split_memory_accesses(second_mem_accesses)
for (first_addr, first_trans) in first_mem_read_accesses:
for (second_addr, second_trans) in second_mem_write_accesses:
if first_addr == second_addr:
yield (first_addr,
(first_trans, MemoryAccessOperation.Read), (second_trans, MemoryAccessOperation.Write))
for (first_addr, first_trans) in first_mem_write_accesses:
for (second_addr, second_trans) in second_mem_read_accesses:
if first_addr == second_addr:
yield (first_addr,
(first_trans, MemoryAccessOperation.Write), (second_trans, MemoryAccessOperation.Read))
# Restore (i.e. unsplit) memory accesses by regrouping pairs having the same transitions.
# Return a generator of (address, size, (transition, read|write)*)
def get_grouped_by_transition_memory_accesses_intersection(
first_mem_accesses: List[Tuple[int, Transition, MemoryAccessOperation]],
second_mem_accesses: List[Tuple[int, Transition, MemoryAccessOperation]]) \
-> Iterator[Tuple[int, int,
Tuple[Transition, MemoryAccessOperation], Tuple[Transition, MemoryAccessOperation]]]:
intersected_mem_accesses = get_memory_accesses_intersection(first_mem_accesses, second_mem_accesses)
mem_accesses_groups: Dict[Tuple[int, MemoryAccessOperation, int, MemoryAccessOperation], Set[int]] = {}
trans: Dict[int, Transition] = {}
for (addr, (first_trans, first_rw), (second_trans, second_rw)) in intersected_mem_accesses:
first_trans_id = first_trans.id
if first_trans_id not in trans:
trans[first_trans_id] = first_trans
second_trans_id = second_trans.id
if second_trans_id not in trans:
trans[second_trans_id] = second_trans
if (first_trans_id, first_rw, second_trans_id, second_rw) not in mem_accesses_groups:
mem_accesses_groups[(first_trans_id, first_rw, second_trans_id, second_rw)] = set()
mem_accesses_groups[(first_trans_id, first_rw, second_trans_id, second_rw)].add(addr)
for (first_trans_id, first_rw, second_trans_id, second_rw), mem_addrs in mem_accesses_groups.items():
while len(mem_addrs) > 0:
min_addr = min(mem_addrs)
current_addr = min_addr
while current_addr in mem_addrs:
mem_addrs.remove(current_addr)
current_addr = current_addr + 1
yield (min_addr, current_addr - min_addr,
(trans[first_trans_id], first_rw), (trans[second_trans_id], second_rw))
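# For example, if a pair of transitions conflicts on byte addresses {0x1000, 0x1001, 0x1002, 0x1008}, the loop
# above yields two accesses for that pair: one at 0x1000 of size 3 and one at 0x1008 of size 1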
# Group memory accesses having the same address and size.
# Return a map: (mem_addr, size) -> list((first_trans, first_rw)), list((second_trans, second_rw)).
def get_grouped_by_address_memory_accesses_intersection(
mem_accesses: Iterator[Tuple[int, int,
Tuple[Transition, MemoryAccessOperation],
Tuple[Transition, MemoryAccessOperation]]]) \
-> Dict[Tuple[int, int],
Tuple[List[Tuple[Transition, MemoryAccessOperation]],
List[Tuple[Transition, MemoryAccessOperation]]]]:
# Dict: (mem_addr, size) -> list((first_trans, rw)), list((second_trans, rw))
mem_accesses_groups: Dict[Tuple[int, int],
Tuple[List[Tuple[Transition, MemoryAccessOperation]],
List[Tuple[Transition, MemoryAccessOperation]]]] = {}
for (addr, access_len, (first_trans, first_rw), (second_trans, second_rw)) in mem_accesses:
if (addr, access_len) not in mem_accesses_groups:
mem_accesses_groups[(addr, access_len)] = ([], [])
(first_accesses, second_accesses) = mem_accesses_groups[(addr, access_len)]
if (first_trans, first_rw) not in first_accesses:
first_accesses.append((first_trans, first_rw))
if (second_trans, second_rw) not in second_accesses:
second_accesses.append((second_trans, second_rw))
# Sort accesses using transition id and memory access operation
def acc_id(a):
return (a[0].id, 0 if a[1] == MemoryAccessOperation.Read else 1)
for (first_accesses, second_accesses) in mem_accesses_groups.values():
first_accesses.sort(key=acc_id)
second_accesses.sort(key=acc_id)
return mem_accesses_groups
# Heuristics to avoid irrelevant memory accesses.
def filter_access_by_heuristics(sco: RuntimeHelper, trans: Transition, blacklist: Set[int]) -> bool:
if trans.id in blacklist:
return True
# Memory accesses of the first transition are considered non-protected
try:
trans_ctxt = trans.context_before()
except IndexError:
return False
trans_location = trans_ctxt.ossi.location()
if trans_location is None:
return False
if trans_location.binary is None:
return False
if trans_location.binary.path != 'c:/windows/system32/ntdll.dll':
return False
trans_frames = trans_ctxt.stack.frames()
# Accesses of thread initialization/finalization are not counted
if sco.ldrp_initialize_thread is not None and sco.ldr_shutdown_thread is not None:
for frame in trans_frames:
frame_ctxt = frame.first_context
frame_location = frame_ctxt.ossi.location()
if frame_location is None:
continue
if frame_location.binary is None:
continue
if frame_location.binary.path != 'c:/windows/system32/ntdll.dll':
continue
if frame_location.symbol is None:
continue
if frame_location.symbol in [sco.ldr_shutdown_thread, sco.ldrp_initialize_thread]:
blacklist.add(trans.id)
return True
if trans_location.symbol is None:
return False
# Accesses of the synchronization APIs themselves are not counted.
if trans_location.symbol in [sco.rtl_enter_critical_section, sco.rtl_leave_critical_section]:
blacklist.add(trans.id)
return True
return False
# Get all lock (i.e. RtlEnterCriticalSection) and unlock (i.e. RtlLeaveCriticalSection) calls of each thread
# given a list of context ranges.
# Return a map: thread_id -> list((is_lock, context))
def get_threads_locks_unlocks(process_ranges: List[Tuple[Context, Context]]) \
-> Dict[int, List[Tuple[bool, Context]]]:
threads_ranges: Dict[int, List[Tuple[Context, Context]]] = {}
for (ctxt_lo, ctxt_hi) in process_ranges:
tid = RuntimeHelper.thread_id(ctxt_lo)
if tid not in threads_ranges:
threads_ranges[tid] = []
threads_ranges[tid].append((ctxt_lo, ctxt_hi))
threads_locks_unlocks: Dict[int, List[Tuple[bool, Context]]] = {}
for tid, ranges in threads_ranges.items():
threads_locks_unlocks[tid] = list(get_thread_usermode_locks_unlocks(trace, ranges, binary))
return threads_locks_unlocks
# Get all memory accesses of each thread given a list of context ranges.
# Return a map: thread_id -> list((mem_addr, trans, read_or_write))
def get_threads_memory_accesses(process_ranges: List[Tuple[Context, Context]]) \
-> Dict[int, List[Tuple[int, Transition, MemoryAccessOperation]]]:
# tid -> list((mem_addr, trans, mem_access))
threads_memory_accesses: Dict[int, List[Tuple[int, Transition, MemoryAccessOperation]]] = {}
for (ctxt_lo, ctxt_hi) in process_ranges:
tid = RuntimeHelper.thread_id(ctxt_lo)
if tid not in threads_memory_accesses:
threads_memory_accesses[tid] = []
mem_accesses = trace.get_free_memory_accesses(ctxt_lo, ctxt_hi)
threads_memory_accesses[tid].extend(mem_accesses)
return threads_memory_accesses
def detect_data_race(trace: RuntimeHelper, pid: int, binary: Optional[str],
                     begin_trans_id: Optional[int], end_trans_id: Optional[int]):
def get_lock_handles(sco: RuntimeHelper, locks: List[Context]) -> Set[int]:
lock_handles = set()
for ctxt in locks:
lock_handles.add(RuntimeHelper.get_critical_section_handle(ctxt))
return lock_handles
(first_context, last_context) = find_begin_end_context(trace, pid, binary, begin_trans_id, end_trans_id)
process_ranges = list(find_process_usermode_ranges(trace, pid, first_context, last_context))
thread_memory_accesses = get_threads_memory_accesses(process_ranges)
threads_locks_unlocks = get_threads_locks_unlocks(process_ranges)
# set(thread_id)
tids = set()
# map: (thread_id, transition_id) -> set(lock_handle)
cached_live_locks_handles: Dict[Tuple[int, int], Set[int]] = {}
# map: transition_id -> pc
cached_pcs: Dict[int, int] = {}
# set(transition_id)
filtered_trans_ids: Set[int] = set()
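    # A pair of accesses from two different threads is reported as a possible data race when both touch the
    # same address, the operations differ (one read, one write), no critical section handle is held in common
    # by the two threads at those accesses, and the pair is not filtered out by the heuristics above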
for first_tid, first_mem_accesses in thread_memory_accesses.items():
tids.add(first_tid)
for second_tid, second_mem_accesses in thread_memory_accesses.items():
if second_tid in tids:
continue
mem_accesses_intersected = \
get_grouped_by_transition_memory_accesses_intersection(first_mem_accesses, second_mem_accesses)
mem_accesses_grouped = get_grouped_by_address_memory_accesses_intersection(mem_accesses_intersected)
for ((mem_addr, mem_size), (first_accesses, second_accesses)) in mem_accesses_grouped.items():
race_pcs: Set[Tuple[int, int]] = set()
for (first_trans, first_rw) in first_accesses:
if (first_tid, first_trans.id) in cached_live_locks_handles:
first_lock_handles = cached_live_locks_handles[(first_tid, first_trans.id)]
else:
first_locks = get_transition_live_locks(trace, threads_locks_unlocks[first_tid], first_trans)
                        first_lock_handles = get_lock_handles(trace, first_locks)
                        cached_live_locks_handles[(first_tid, first_trans.id)] = first_lock_handles
if first_trans.id in cached_pcs:
first_pc = cached_pcs[first_trans.id]
else:
                        first_pc = first_trans.context_before().read(x64.rip)
                        cached_pcs[first_trans.id] = first_pc
for (second_trans, second_rw) in second_accesses:
if (second_tid, second_trans.id) in cached_live_locks_handles:
second_lock_handles = cached_live_locks_handles[(second_tid, second_trans.id)]
else:
second_locks = get_transition_live_locks(
trace, threads_locks_unlocks[second_tid], second_trans
)
second_lock_handles = get_lock_handles(trace, second_locks)
cached_live_locks_handles[(second_tid, second_trans.id)] = second_lock_handles
if second_trans.id in cached_pcs:
second_pc = cached_pcs[second_trans.id]
else:
second_pc = second_trans.context_before().read(x64.rip)
cached_pcs[second_trans.id] = second_pc
# Skip if both are the same operation (i.e. read/read or write/write)
if first_rw == second_rw:
continue
                        # There is a common critical section protecting both accesses
if not first_lock_handles.isdisjoint(second_lock_handles):
continue
# Duplicated
if (first_pc, second_pc) in race_pcs:
continue
if filter_access_by_heuristics(trace, first_trans, filtered_trans_ids) and \
filter_access_by_heuristics(trace, second_trans, filtered_trans_ids):
continue
first_is_write = first_rw == MemoryAccessOperation.Write
second_is_write = second_rw == MemoryAccessOperation.Write
if first_trans < second_trans:
before_mem_opr_str = 'write' if first_is_write else 'read'
after_mem_opr_str = 'write' if second_is_write else 'read'
before_trans = first_trans
after_trans = second_trans
before_tid = first_tid
after_tid = second_tid
else:
before_mem_opr_str = 'write' if second_is_write else 'read'
after_mem_opr_str = 'write' if first_is_write else 'read'
before_trans = second_trans
after_trans = first_trans
before_tid = second_tid
after_tid = first_tid
print(f'Possible data race during {after_mem_opr_str} \
(transition #{after_trans.id}) of size {mem_size} at 0x{mem_addr:x} by thread {after_tid}.')
print(f'This conflicts with a previous {before_mem_opr_str} \
(transition #{before_trans.id}) by thread {before_tid}.\n')
race_pcs.add((first_pc, second_pc))
# %%
trace = RuntimeHelper(host, port)
detect_data_race(trace, pid, binary, begin_trans_id, end_trans_id)