Skip to content

Mutex


File: threads/mutex.py

This file provides a framework-agnostic implementation of mutexes, allowing the use of different locking mechanisms depending on the threading framework (e.g., standard Python threading or PyQt6).

Classes:

  • IMutex

    Abstract base class for mutex implementations.

  • ThreadingMutex

    Lock implementation using Python's threading.RLock.

  • QtMutex

    Lock implementation using PyQt6's QMutex.

  • MutexFactory

    Factory class for creating mutex instances based on the selected implementation.

IMutex

Bases: ABC

Abstract base class for mutex implementations.

Methods:

  • acquire

    Acquires the mutex.

  • release

    Releases the mutex.

  • __enter__

    Context manager entry method. Acquires the mutex.

  • __exit__

    Context manager exit method. Releases the mutex.

Source code in kern_comm_lib/base/threads/mutex.py
class IMutex(ABC):
  """Abstract base class for mutex implementations."""

  @abstractmethod
  def acquire(
      self, blocking: bool = True, timeout: float = -1
  ) -> AStatusOrElse[bool]:
    """Acquires the mutex.

    Args:
      blocking (default: True): Whether to block until the mutex is acquired.
      timeout (default: -1): Maximum time to wait for the mutex in seconds.

    Returns:
      True if the mutex was acquired, False otherwise, if an error occurred
      a Status object is returned.
    """
    pass

  @abstractmethod
  def release(self) -> Status:
    """Releases the mutex."""
    pass

  def __enter__(self):
    """Context manager entry method. Acquires the mutex."""
    self.acquire()
    return self

  def __exit__(self, exc_type, exc_val, exc_tb):
    """Context manager exit method. Releases the mutex."""
    self.release()

acquire abstractmethod

acquire(blocking: bool = True, timeout: float = -1) -> AStatusOrElse[bool]

Acquires the mutex.

Parameters:

  • blocking (default, default: True ) –

    True): Whether to block until the mutex is acquired.

  • timeout (default, default: -1 ) –

    -1): Maximum time to wait for the mutex in seconds.

Returns:

  • AStatusOrElse[bool]

    True if the mutex was acquired, False otherwise, if an error occurred

  • AStatusOrElse[bool]

    a Status object is returned.

Source code in kern_comm_lib/base/threads/mutex.py
@abstractmethod
def acquire(
    self, blocking: bool = True, timeout: float = -1
) -> AStatusOrElse[bool]:
  """Acquires the mutex.

  Args:
    blocking (default: True): Whether to block until the mutex is acquired.
    timeout (default: -1): Maximum time to wait for the mutex in seconds.

  Returns:
    True if the mutex was acquired, False otherwise, if an error occurred
    a Status object is returned.
  """
  pass

release abstractmethod

release() -> Status

Releases the mutex.

Source code in kern_comm_lib/base/threads/mutex.py
@abstractmethod
def release(self) -> Status:
  """Releases the mutex."""
  pass

__enter__

__enter__()

Context manager entry method. Acquires the mutex.

Source code in kern_comm_lib/base/threads/mutex.py
def __enter__(self):
  """Context manager entry method. Acquires the mutex."""
  self.acquire()
  return self

__exit__

__exit__(exc_type, exc_val, exc_tb)

Context manager exit method. Releases the mutex.

Source code in kern_comm_lib/base/threads/mutex.py
def __exit__(self, exc_type, exc_val, exc_tb):
  """Context manager exit method. Releases the mutex."""
  self.release()

ThreadingMutex

Bases: IMutex

Lock implementation using Python's threading.RLock.

Attributes:

  • _mutex (RLock) –

    The underlying reentrant lock.

Methods:

  • __init__

    Initializes a new ThreadingMutex instance.

  • acquire

    Acquires the mutex.

  • release

    Releases the mutex.

Source code in kern_comm_lib/base/threads/mutex.py
class ThreadingMutex(IMutex):
  """Lock implementation using Python's threading.RLock.

  Attributes:
    _mutex: The underlying reentrant lock.
  """

  def __init__(self) -> None:
    """Initializes a new ThreadingMutex instance."""
    self._mutex: threading.RLock = threading.RLock()

  def acquire(
      self, blocking: bool = True, timeout: float = -1
  ) -> AStatusOrElse[bool]:
    """Acquires the mutex.

    Args:
      blocking (default: True): Whether to block until the mutex is acquired.
      timeout (default: -1): Maximum time to wait for the mutex in seconds.

    Returns:
      True if the mutex was acquired, False otherwise, if an error occurred
      a Status object is returned.
    """
    # <editor-fold desc="Checks">
    check.DCHECK_NOT_NONE(blocking)
    check.DCHECK_NOT_NONE(timeout)
    check.DCHECK_GREATER_THAN(timeout, 0)
    # </editor-fold>
    try:
      return self._mutex.acquire(blocking=blocking, timeout=timeout)
    except Exception as e:
      return Status.from_exception(e)

  def release(self) -> Status:
    """Releases the mutex.

    Returns:
      A Status object indicating success or failure of the operation.
    """
    try:
      self._mutex.release()
      return Status()
    except RuntimeError:
      return Status.from_status_code(
          StatusCode.FAILED_PRECONDITION,
          "Failed precondition: The mutex was not acquired.",
      )

__init__

__init__() -> None

Initializes a new ThreadingMutex instance.

Source code in kern_comm_lib/base/threads/mutex.py
def __init__(self) -> None:
  """Initializes a new ThreadingMutex instance."""
  self._mutex: threading.RLock = threading.RLock()

acquire

acquire(blocking: bool = True, timeout: float = -1) -> AStatusOrElse[bool]

Acquires the mutex.

Parameters:

  • blocking (default, default: True ) –

    True): Whether to block until the mutex is acquired.

  • timeout (default, default: -1 ) –

    -1): Maximum time to wait for the mutex in seconds.

Returns:

  • AStatusOrElse[bool]

    True if the mutex was acquired, False otherwise, if an error occurred

  • AStatusOrElse[bool]

    a Status object is returned.

Source code in kern_comm_lib/base/threads/mutex.py
def acquire(
    self, blocking: bool = True, timeout: float = -1
) -> AStatusOrElse[bool]:
  """Acquires the mutex.

  Args:
    blocking (default: True): Whether to block until the mutex is acquired.
    timeout (default: -1): Maximum time to wait for the mutex in seconds.

  Returns:
    True if the mutex was acquired, False otherwise, if an error occurred
    a Status object is returned.
  """
  # <editor-fold desc="Checks">
  check.DCHECK_NOT_NONE(blocking)
  check.DCHECK_NOT_NONE(timeout)
  check.DCHECK_GREATER_THAN(timeout, 0)
  # </editor-fold>
  try:
    return self._mutex.acquire(blocking=blocking, timeout=timeout)
  except Exception as e:
    return Status.from_exception(e)

release

release() -> Status

Releases the mutex.

Returns:

  • Status

    A Status object indicating success or failure of the operation.

Source code in kern_comm_lib/base/threads/mutex.py
def release(self) -> Status:
  """Releases the mutex.

  Returns:
    A Status object indicating success or failure of the operation.
  """
  try:
    self._mutex.release()
    return Status()
  except RuntimeError:
    return Status.from_status_code(
        StatusCode.FAILED_PRECONDITION,
        "Failed precondition: The mutex was not acquired.",
    )

QtMutex

Bases: IMutex

Lock implementation using PyQt6's QMutex.

Attributes:

  • _mutex (QMutex) –

    The underlying PyQt6 mutex.

Methods:

  • __init__

    Initializes a new QtMutex instance.

  • acquire

    Acquires the mutex.

  • release

    Releases the mutex.

Source code in kern_comm_lib/base/threads/mutex.py
class QtMutex(IMutex):
  """Lock implementation using PyQt6's QMutex.

  Attributes:
    _mutex: The underlying PyQt6 mutex.
  """

  def __init__(self) -> None:
    """Initializes a new QtMutex instance."""
    from PyQt6.QtCore import QMutex

    self._mutex: QMutex = QMutex()

  def acquire(
      self, blocking: bool = True, timeout: float = -1
  ) -> AStatusOrElse[bool]:
    """Acquires the mutex.

    Args:
      blocking (default: True): Whether to block until the mutex is acquired.
      timeout (default: -1): Maximum time to wait for the mutex in seconds.

    Returns:
      True if the mutex was acquired, False otherwise, if an error occurred
      a Status object is returned.
    """
    # <editor-fold desc="Checks">
    check.DCHECK_NOT_NONE(blocking)
    check.DCHECK_NOT_NONE(timeout)
    check.DCHECK_GREATER_THAN(timeout, 0)
    # </editor-fold>
    try:
      if timeout > 0:
        return self._mutex.tryLock(int(timeout * 1000))
      elif blocking:
        try:
          self._mutex.lock()
          return True
        except Exception:
          return Status.from_status_code(
              StatusCode.UNKNOWN,
              "Unknown error occurred while acquiring the mutex while blocking=True.",
          )
      else:
        return self._mutex.tryLock()
    except Exception as e:
      return Status.from_exception(e)

  def release(self) -> Status:
    """Releases the mutex.

    Returns:
      A Status object indicating success or failure of the operation.
    """
    try:
      self._mutex.unlock()
      return Status()
    except RuntimeError:
      return Status.from_status_code(
          StatusCode.FAILED_PRECONDITION,
          "Failed precondition: The mutex was not acquired.",
      )

__init__

__init__() -> None

Initializes a new QtMutex instance.

Source code in kern_comm_lib/base/threads/mutex.py
def __init__(self) -> None:
  """Initializes a new QtMutex instance."""
  from PyQt6.QtCore import QMutex

  self._mutex: QMutex = QMutex()

acquire

acquire(blocking: bool = True, timeout: float = -1) -> AStatusOrElse[bool]

Acquires the mutex.

Parameters:

  • blocking (default, default: True ) –

    True): Whether to block until the mutex is acquired.

  • timeout (default, default: -1 ) –

    -1): Maximum time to wait for the mutex in seconds.

Returns:

  • AStatusOrElse[bool]

    True if the mutex was acquired, False otherwise, if an error occurred

  • AStatusOrElse[bool]

    a Status object is returned.

Source code in kern_comm_lib/base/threads/mutex.py
def acquire(
    self, blocking: bool = True, timeout: float = -1
) -> AStatusOrElse[bool]:
  """Acquires the mutex.

  Args:
    blocking (default: True): Whether to block until the mutex is acquired.
    timeout (default: -1): Maximum time to wait for the mutex in seconds.

  Returns:
    True if the mutex was acquired, False otherwise, if an error occurred
    a Status object is returned.
  """
  # <editor-fold desc="Checks">
  check.DCHECK_NOT_NONE(blocking)
  check.DCHECK_NOT_NONE(timeout)
  check.DCHECK_GREATER_THAN(timeout, 0)
  # </editor-fold>
  try:
    if timeout > 0:
      return self._mutex.tryLock(int(timeout * 1000))
    elif blocking:
      try:
        self._mutex.lock()
        return True
      except Exception:
        return Status.from_status_code(
            StatusCode.UNKNOWN,
            "Unknown error occurred while acquiring the mutex while blocking=True.",
        )
    else:
      return self._mutex.tryLock()
  except Exception as e:
    return Status.from_exception(e)

release

release() -> Status

Releases the mutex.

Returns:

  • Status

    A Status object indicating success or failure of the operation.

Source code in kern_comm_lib/base/threads/mutex.py
def release(self) -> Status:
  """Releases the mutex.

  Returns:
    A Status object indicating success or failure of the operation.
  """
  try:
    self._mutex.unlock()
    return Status()
  except RuntimeError:
    return Status.from_status_code(
        StatusCode.FAILED_PRECONDITION,
        "Failed precondition: The mutex was not acquired.",
    )

MutexFactory

Factory class for creating mutex instances based on the selected implementation.

Attributes:

  • _mutex_class (type[IMutex]) –

    The class to use for creating mutex instances.

Methods:

Source code in kern_comm_lib/base/threads/mutex.py
class MutexFactory:
  """Factory class for creating mutex instances based on the selected implementation.

  Attributes:
    _mutex_class: The class to use for creating mutex instances.
  """

  _mutex_class: type[IMutex] = ThreadingMutex

  @classmethod
  def set_mutex_implementation(cls, lock_class: type[IMutex]) -> None:
    """Sets the mutex implementation to use.

    Args:
      lock_class: The mutex class to use for creating instances.
    """
    cls._mutex_class = lock_class

  @classmethod
  def create_mutex(cls) -> IMutex:
    """Creates a new mutex instance using the selected implementation.

    Returns:
      A new mutex instance.
    """
    return cls._mutex_class()

set_mutex_implementation classmethod

set_mutex_implementation(lock_class: type[IMutex]) -> None

Sets the mutex implementation to use.

Parameters:

  • lock_class (type[IMutex]) –

    The mutex class to use for creating instances.

Source code in kern_comm_lib/base/threads/mutex.py
@classmethod
def set_mutex_implementation(cls, lock_class: type[IMutex]) -> None:
  """Sets the mutex implementation to use.

  Args:
    lock_class: The mutex class to use for creating instances.
  """
  cls._mutex_class = lock_class

create_mutex classmethod

create_mutex() -> IMutex

Creates a new mutex instance using the selected implementation.

Returns:

  • IMutex

    A new mutex instance.

Source code in kern_comm_lib/base/threads/mutex.py
@classmethod
def create_mutex(cls) -> IMutex:
  """Creates a new mutex instance using the selected implementation.

  Returns:
    A new mutex instance.
  """
  return cls._mutex_class()