Source code for rtcog.utils.shared_memory_manager

from typing import Optional
from multiprocessing.shared_memory import SharedMemory
from rtcog.utils.log import get_logger

log = get_logger()

[docs] class SharedMemoryManager: """ Manager for SharedMemory with open and cleanup logic. This class wraps multiprocessing.shared_memory.SharedMemory to provide automatic cleanup. """ def __init__(self, name: str, create: bool = False, size: Optional[int] = None): """ Initialize the SharedMemoryManager. Parameters ---------- name : str Name of the shared memory segment. create : bool, optional If True, create a new shared memory segment. If False, attach to existing. size : int, optional Size in bytes for creation (required if create=True). """ self.name = name self.shm = None self._owner = create self._size = size if create and size is None: raise ValueError("size is required when create=True") log.debug(f'Initialized SharedMemoryManager for "{self.name}", create={self._owner}, size={self._size}')
[docs] def open(self) -> SharedMemory: """Create/attach to shared memory.""" try: if self._owner: try: self.shm = SharedMemory(create=True, size=self._size, name=self.name) except FileExistsError: # If already exists, clean up and retry log.debug(f"Shared memory '{self.name}' already exists, cleaning up and retrying") old = SharedMemory(name=self.name) old.close() old.unlink() self.shm = SharedMemory(create=True, size=self._size, name=self.name) log.debug(f"Created new shared memory '{self.name}' with size {self._size}") else: self.shm = SharedMemory(name=self.name) except Exception as e: log.error(f"Failed to create/attach shared memory '{self.name}': {e}") raise return self.shm
[docs] def cleanup(self): """Clean up the shared memory segment.""" if self.shm is not None: try: self.shm.close() if self._owner: self.shm.unlink() log.debug(f"Unlinked shared memory '{self.name}'") except Exception as e: log.warning(f"Error during shared memory cleanup for '{self.name}': {e}") finally: self.shm = None