File: threads/mutex.py
This file provides a framework-agnostic implementation of mutexes, allowing
the use of different locking mechanisms depending on the threading framework
(e.g., standard Python threading or PyQt6).
Classes:
-
IMutex
–
Abstract base class for mutex implementations.
-
ThreadingMutex
–
Lock implementation using Python's threading.RLock.
-
QtMutex
–
Lock implementation using PyQt6's QMutex.
-
MutexFactory
–
Factory class for creating mutex instances based on the selected implementation.
IMutex
Bases: ABC
Abstract base class for mutex implementations.
Methods:
Source code in kern_comm_lib/base/threads/mutex.py
| class IMutex(ABC):
"""Abstract base class for mutex implementations."""
@abstractmethod
def acquire(
self, blocking: bool = True, timeout: float = -1
) -> AStatusOrElse[bool]:
"""Acquires the mutex.
Args:
blocking (default: True): Whether to block until the mutex is acquired.
timeout (default: -1): Maximum time to wait for the mutex in seconds.
Returns:
True if the mutex was acquired, False otherwise, if an error occurred
a Status object is returned.
"""
pass
@abstractmethod
def release(self) -> Status:
"""Releases the mutex."""
pass
def __enter__(self):
"""Context manager entry method. Acquires the mutex."""
self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit method. Releases the mutex."""
self.release()
|
acquire
abstractmethod
acquire(blocking: bool = True, timeout: float = -1) -> AStatusOrElse[bool]
Acquires the mutex.
Parameters:
-
blocking
(default
, default:
True
)
–
True): Whether to block until the mutex is acquired.
-
timeout
(default
, default:
-1
)
–
-1): Maximum time to wait for the mutex in seconds.
Returns:
-
AStatusOrElse[bool]
–
True if the mutex was acquired, False otherwise, if an error occurred
-
AStatusOrElse[bool]
–
a Status object is returned.
Source code in kern_comm_lib/base/threads/mutex.py
| @abstractmethod
def acquire(
self, blocking: bool = True, timeout: float = -1
) -> AStatusOrElse[bool]:
"""Acquires the mutex.
Args:
blocking (default: True): Whether to block until the mutex is acquired.
timeout (default: -1): Maximum time to wait for the mutex in seconds.
Returns:
True if the mutex was acquired, False otherwise, if an error occurred
a Status object is returned.
"""
pass
|
release
abstractmethod
Releases the mutex.
Source code in kern_comm_lib/base/threads/mutex.py
| @abstractmethod
def release(self) -> Status:
"""Releases the mutex."""
pass
|
__enter__
Context manager entry method. Acquires the mutex.
Source code in kern_comm_lib/base/threads/mutex.py
| def __enter__(self):
"""Context manager entry method. Acquires the mutex."""
self.acquire()
return self
|
__exit__
__exit__(exc_type, exc_val, exc_tb)
Context manager exit method. Releases the mutex.
Source code in kern_comm_lib/base/threads/mutex.py
| def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit method. Releases the mutex."""
self.release()
|
ThreadingMutex
Bases: IMutex
Lock implementation using Python's threading.RLock.
Attributes:
-
_mutex
(RLock
)
–
The underlying reentrant lock.
Methods:
Source code in kern_comm_lib/base/threads/mutex.py
| class ThreadingMutex(IMutex):
"""Lock implementation using Python's threading.RLock.
Attributes:
_mutex: The underlying reentrant lock.
"""
def __init__(self) -> None:
"""Initializes a new ThreadingMutex instance."""
self._mutex: threading.RLock = threading.RLock()
def acquire(
self, blocking: bool = True, timeout: float = -1
) -> AStatusOrElse[bool]:
"""Acquires the mutex.
Args:
blocking (default: True): Whether to block until the mutex is acquired.
timeout (default: -1): Maximum time to wait for the mutex in seconds.
Returns:
True if the mutex was acquired, False otherwise, if an error occurred
a Status object is returned.
"""
# <editor-fold desc="Checks">
check.DCHECK_NOT_NONE(blocking)
check.DCHECK_NOT_NONE(timeout)
check.DCHECK_GREATER_THAN(timeout, 0)
# </editor-fold>
try:
return self._mutex.acquire(blocking=blocking, timeout=timeout)
except Exception as e:
return Status.from_exception(e)
def release(self) -> Status:
"""Releases the mutex.
Returns:
A Status object indicating success or failure of the operation.
"""
try:
self._mutex.release()
return Status()
except RuntimeError:
return Status.from_status_code(
StatusCode.FAILED_PRECONDITION,
"Failed precondition: The mutex was not acquired.",
)
|
__init__
Initializes a new ThreadingMutex instance.
Source code in kern_comm_lib/base/threads/mutex.py
| def __init__(self) -> None:
"""Initializes a new ThreadingMutex instance."""
self._mutex: threading.RLock = threading.RLock()
|
acquire
acquire(blocking: bool = True, timeout: float = -1) -> AStatusOrElse[bool]
Acquires the mutex.
Parameters:
-
blocking
(default
, default:
True
)
–
True): Whether to block until the mutex is acquired.
-
timeout
(default
, default:
-1
)
–
-1): Maximum time to wait for the mutex in seconds.
Returns:
-
AStatusOrElse[bool]
–
True if the mutex was acquired, False otherwise, if an error occurred
-
AStatusOrElse[bool]
–
a Status object is returned.
Source code in kern_comm_lib/base/threads/mutex.py
| def acquire(
self, blocking: bool = True, timeout: float = -1
) -> AStatusOrElse[bool]:
"""Acquires the mutex.
Args:
blocking (default: True): Whether to block until the mutex is acquired.
timeout (default: -1): Maximum time to wait for the mutex in seconds.
Returns:
True if the mutex was acquired, False otherwise, if an error occurred
a Status object is returned.
"""
# <editor-fold desc="Checks">
check.DCHECK_NOT_NONE(blocking)
check.DCHECK_NOT_NONE(timeout)
check.DCHECK_GREATER_THAN(timeout, 0)
# </editor-fold>
try:
return self._mutex.acquire(blocking=blocking, timeout=timeout)
except Exception as e:
return Status.from_exception(e)
|
release
Releases the mutex.
Returns:
-
Status
–
A Status object indicating success or failure of the operation.
Source code in kern_comm_lib/base/threads/mutex.py
| def release(self) -> Status:
"""Releases the mutex.
Returns:
A Status object indicating success or failure of the operation.
"""
try:
self._mutex.release()
return Status()
except RuntimeError:
return Status.from_status_code(
StatusCode.FAILED_PRECONDITION,
"Failed precondition: The mutex was not acquired.",
)
|
QtMutex
Bases: IMutex
Lock implementation using PyQt6's QMutex.
Attributes:
-
_mutex
(QMutex
)
–
The underlying PyQt6 mutex.
Methods:
Source code in kern_comm_lib/base/threads/mutex.py
| class QtMutex(IMutex):
"""Lock implementation using PyQt6's QMutex.
Attributes:
_mutex: The underlying PyQt6 mutex.
"""
def __init__(self) -> None:
"""Initializes a new QtMutex instance."""
from PyQt6.QtCore import QMutex
self._mutex: QMutex = QMutex()
def acquire(
self, blocking: bool = True, timeout: float = -1
) -> AStatusOrElse[bool]:
"""Acquires the mutex.
Args:
blocking (default: True): Whether to block until the mutex is acquired.
timeout (default: -1): Maximum time to wait for the mutex in seconds.
Returns:
True if the mutex was acquired, False otherwise, if an error occurred
a Status object is returned.
"""
# <editor-fold desc="Checks">
check.DCHECK_NOT_NONE(blocking)
check.DCHECK_NOT_NONE(timeout)
check.DCHECK_GREATER_THAN(timeout, 0)
# </editor-fold>
try:
if timeout > 0:
return self._mutex.tryLock(int(timeout * 1000))
elif blocking:
try:
self._mutex.lock()
return True
except Exception:
return Status.from_status_code(
StatusCode.UNKNOWN,
"Unknown error occurred while acquiring the mutex while blocking=True.",
)
else:
return self._mutex.tryLock()
except Exception as e:
return Status.from_exception(e)
def release(self) -> Status:
"""Releases the mutex.
Returns:
A Status object indicating success or failure of the operation.
"""
try:
self._mutex.unlock()
return Status()
except RuntimeError:
return Status.from_status_code(
StatusCode.FAILED_PRECONDITION,
"Failed precondition: The mutex was not acquired.",
)
|
__init__
Initializes a new QtMutex instance.
Source code in kern_comm_lib/base/threads/mutex.py
| def __init__(self) -> None:
"""Initializes a new QtMutex instance."""
from PyQt6.QtCore import QMutex
self._mutex: QMutex = QMutex()
|
acquire
acquire(blocking: bool = True, timeout: float = -1) -> AStatusOrElse[bool]
Acquires the mutex.
Parameters:
-
blocking
(default
, default:
True
)
–
True): Whether to block until the mutex is acquired.
-
timeout
(default
, default:
-1
)
–
-1): Maximum time to wait for the mutex in seconds.
Returns:
-
AStatusOrElse[bool]
–
True if the mutex was acquired, False otherwise, if an error occurred
-
AStatusOrElse[bool]
–
a Status object is returned.
Source code in kern_comm_lib/base/threads/mutex.py
| def acquire(
self, blocking: bool = True, timeout: float = -1
) -> AStatusOrElse[bool]:
"""Acquires the mutex.
Args:
blocking (default: True): Whether to block until the mutex is acquired.
timeout (default: -1): Maximum time to wait for the mutex in seconds.
Returns:
True if the mutex was acquired, False otherwise, if an error occurred
a Status object is returned.
"""
# <editor-fold desc="Checks">
check.DCHECK_NOT_NONE(blocking)
check.DCHECK_NOT_NONE(timeout)
check.DCHECK_GREATER_THAN(timeout, 0)
# </editor-fold>
try:
if timeout > 0:
return self._mutex.tryLock(int(timeout * 1000))
elif blocking:
try:
self._mutex.lock()
return True
except Exception:
return Status.from_status_code(
StatusCode.UNKNOWN,
"Unknown error occurred while acquiring the mutex while blocking=True.",
)
else:
return self._mutex.tryLock()
except Exception as e:
return Status.from_exception(e)
|
release
Releases the mutex.
Returns:
-
Status
–
A Status object indicating success or failure of the operation.
Source code in kern_comm_lib/base/threads/mutex.py
| def release(self) -> Status:
"""Releases the mutex.
Returns:
A Status object indicating success or failure of the operation.
"""
try:
self._mutex.unlock()
return Status()
except RuntimeError:
return Status.from_status_code(
StatusCode.FAILED_PRECONDITION,
"Failed precondition: The mutex was not acquired.",
)
|
MutexFactory
Factory class for creating mutex instances based on the selected implementation.
Attributes:
-
_mutex_class
(type[IMutex]
)
–
The class to use for creating mutex instances.
Methods:
Source code in kern_comm_lib/base/threads/mutex.py
| class MutexFactory:
"""Factory class for creating mutex instances based on the selected implementation.
Attributes:
_mutex_class: The class to use for creating mutex instances.
"""
_mutex_class: type[IMutex] = ThreadingMutex
@classmethod
def set_mutex_implementation(cls, lock_class: type[IMutex]) -> None:
"""Sets the mutex implementation to use.
Args:
lock_class: The mutex class to use for creating instances.
"""
cls._mutex_class = lock_class
@classmethod
def create_mutex(cls) -> IMutex:
"""Creates a new mutex instance using the selected implementation.
Returns:
A new mutex instance.
"""
return cls._mutex_class()
|
set_mutex_implementation
classmethod
set_mutex_implementation(lock_class: type[IMutex]) -> None
Sets the mutex implementation to use.
Parameters:
-
lock_class
(type[IMutex]
)
–
The mutex class to use for creating instances.
Source code in kern_comm_lib/base/threads/mutex.py
| @classmethod
def set_mutex_implementation(cls, lock_class: type[IMutex]) -> None:
"""Sets the mutex implementation to use.
Args:
lock_class: The mutex class to use for creating instances.
"""
cls._mutex_class = lock_class
|
create_mutex
classmethod
Creates a new mutex instance using the selected implementation.
Returns:
Source code in kern_comm_lib/base/threads/mutex.py
| @classmethod
def create_mutex(cls) -> IMutex:
"""Creates a new mutex instance using the selected implementation.
Returns:
A new mutex instance.
"""
return cls._mutex_class()
|