# Copyright (c) 2023 - present, Anton Anikin # All rights reserved. import numpy as np import platform import struct import sys from typing import Optional is_windows = (platform.system() == "Windows") if is_windows: import win32file import win32pipe from time import sleep class Ipc: def __init__(self, name: str, is_server: bool, verbose: int = 0): self.verbose = verbose if is_windows: full_name = f"\\\\.\\pipe\\{name}" if is_server: # https://mhammond.github.io/pywin32/win32pipe__CreateNamedPipe_meth.html self.pipe = win32pipe.CreateNamedPipe( full_name, # pipeName : PyUnicode win32pipe.PIPE_ACCESS_DUPLEX, # openMode : int win32pipe.PIPE_TYPE_BYTE, # pipeMode : int 1, # nMaxInstances : int 0, # nOutBufferSize : int 0, # nInBufferSize : int 0, # nDefaultTimeOut : int None # sa : PySECURITY_ATTRIBUTES ) # https://mhammond.github.io/pywin32/win32pipe__ConnectNamedPipe_meth.html win32pipe.ConnectNamedPipe(self.pipe) else: time_step = 0.5 time_current = 0 while time_current < 10: try: # https://mhammond.github.io/pywin32/win32file__CreateFile_meth.html self.pipe = win32file.CreateFile( full_name, # fileName : PyUnicode win32file.GENERIC_READ | win32file.GENERIC_WRITE, # desiredAccess : int 0, # shareMode : int None, # attributes : PySECURITY_ATTRIBUTES win32file.OPEN_EXISTING, # CreationDisposition : int win32file.FILE_ATTRIBUTE_NORMAL, # flagsAndAttributes : int None # hTemplateFile : PyHANDLE ) break except Exception: time_current += time_step sleep(time_step) else: # unix-like systems ------------------------------------------------ if is_server: self._reader = open(name + "_c", "rb") self._writer = open(name + "_s", "wb") else: self._writer = open(name + "_c", "wb") self._reader = open(name + "_s", "rb") def _write(self, data): if is_windows: # https://mhammond.github.io/pywin32/win32file__WriteFile_meth.html errCode, nBytesWritten = win32file.WriteFile(self.pipe, data) else: self._writer.write(data) self._writer.flush() def _read(self, len): if is_windows: try: # https://mhammond.github.io/pywin32/win32file__ReadFile_meth.html errCode, data = win32file.ReadFile(self.pipe, len) except Exception: data = None return data else: return self._reader.read(len) def write_int(self, value: int): data = struct.pack("i", value) self._write(data) def write_double(self, value: float): data = struct.pack("d", value) self._write(data) def write_array(self, array: np.ndarray): data = array.tobytes(order="C") self._write(data) def read_int(self) -> int: data = self._read(4) return struct.unpack("i", data)[0] def read_int_opt(self) -> Optional[int]: data = self._read(4) if data: return struct.unpack("i", data)[0] else: return None def read_double(self) -> float: data = self._read(8) return struct.unpack("d", data)[0] def read_array(self, size: int, dtype: np.dtype) -> np.ndarray: data = self._read(size * dtype(0).itemsize) array = np.frombuffer(data, dtype=dtype) # fix for 'ValueError: assignment destination is read-only' on windows OS array = np.copy(array) return array def read_str(self) -> str: len = self.read_int() data = self._read(len) return data.decode(sys.getdefaultencoding()) def read_str_opt(self) -> Optional[str]: len = self.read_int_opt() if len: data = self._read(len) return data.decode(sys.getdefaultencoding()) else: return None