141 lines
4.6 KiB
Python
141 lines
4.6 KiB
Python
# Copyright (c) 2023 - present, Anton Anikin <anton@anikin.xyz>
# All rights reserved.
|
|
|
|
import numpy as np
|
|
import platform
|
|
import struct
|
|
import sys
|
|
|
|
from typing import Optional
|
|
|
|
# Platform switch consulted by Ipc: True selects the pywin32 named-pipe
# transport, False selects the file-pair transport.
is_windows = platform.system() == "Windows"

if is_windows:
    # These names are only needed (and pywin32 is only imported) on Windows.
    from time import sleep

    import win32file
    import win32pipe
|
|
|
|
|
|
class Ipc:
    """Blocking, bidirectional byte channel between a server and a client.

    Transport is chosen by the module-level ``is_windows`` flag:

    * Windows: one duplex, byte-mode named pipe called *name*, driven through
      pywin32 and stored in ``self.pipe``.
    * Elsewhere: two files, ``<name>_c`` (client writes, server reads) and
      ``<name>_s`` (server writes, client reads), stored in ``self._reader``
      and ``self._writer``. They are opened with the builtin ``open`` and are
      presumably FIFOs created beforehand by the caller -- TODO confirm.

    Integers and doubles travel in ``struct`` native format (``"i"`` / ``"d"``),
    read back as 4 and 8 bytes respectively. Strings are read as an ``int``
    byte length followed by that many encoded bytes.

    Instances can be used as context managers; leaving the ``with`` block
    calls :meth:`close`.
    """

    # Windows client only: total time to keep retrying the connection and the
    # pause after each failed attempt (both in seconds).
    _CONNECT_TIMEOUT = 10
    _CONNECT_RETRY_DELAY = 0.5

    def __init__(self, name: str, is_server: bool, verbose: int = 0):
        """Open the channel.

        Args:
            name: Pipe name on Windows; path prefix of the ``_c`` / ``_s``
                files on other platforms.
            is_server: Selects which end of the channel this object is.
            verbose: Stored as ``self.verbose``; not otherwise used here.
        """
        self.verbose = verbose

        if is_windows:
            pipe_path = f"\\\\.\\pipe\\{name}"
            if is_server:
                self._create_server_pipe(pipe_path)
            else:
                self._connect_client_pipe(pipe_path)
        elif is_server:
            # Both ends open the "_c" file first and the "_s" file second --
            # presumably so blocking FIFO opens pair up; keep this order.
            self._reader = open(name + "_c", "rb")
            self._writer = open(name + "_s", "wb")
        else:
            self._writer = open(name + "_c", "wb")
            self._reader = open(name + "_s", "rb")

    def __enter__(self):
        """Return the channel itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the channel when the ``with`` block ends."""
        self.close()

    def close(self) -> None:
        """Release the pipe handle or the reader/writer files, if present.

        Attributes that were never set (for example after a failed Windows
        connection) are skipped.
        """
        if is_windows:
            pipe = getattr(self, "pipe", None)
            if pipe is not None:
                pipe.Close()
            return

        writer = getattr(self, "_writer", None)
        reader = getattr(self, "_reader", None)
        try:
            if writer is not None:
                writer.close()
        finally:
            # Still release the reader if closing the writer raised.
            if reader is not None:
                reader.close()

    def _create_server_pipe(self, pipe_path: str) -> None:
        """Create the named pipe and block until a client has connected."""
        # https://mhammond.github.io/pywin32/win32pipe__CreateNamedPipe_meth.html
        self.pipe = win32pipe.CreateNamedPipe(
            pipe_path,                     # pipeName : PyUnicode
            win32pipe.PIPE_ACCESS_DUPLEX,  # openMode : int
            win32pipe.PIPE_TYPE_BYTE,      # pipeMode : int
            1,                             # nMaxInstances : int
            0,                             # nOutBufferSize : int
            0,                             # nInBufferSize : int
            0,                             # nDefaultTimeOut : int
            None,                          # sa : PySECURITY_ATTRIBUTES
        )

        # https://mhammond.github.io/pywin32/win32pipe__ConnectNamedPipe_meth.html
        win32pipe.ConnectNamedPipe(self.pipe)

    def _connect_client_pipe(self, pipe_path: str) -> None:
        """Open the server's pipe, retrying until the timeout budget is spent.

        If every attempt fails the method returns silently and ``self.pipe``
        stays unset; later reads then yield ``None`` and writes raise
        ``AttributeError``.
        """
        waited = 0
        while waited < self._CONNECT_TIMEOUT:
            try:
                # https://mhammond.github.io/pywin32/win32file__CreateFile_meth.html
                self.pipe = win32file.CreateFile(
                    pipe_path,                                       # fileName : PyUnicode
                    win32file.GENERIC_READ | win32file.GENERIC_WRITE,  # desiredAccess : int
                    0,                                               # shareMode : int
                    None,                                            # attributes : PySECURITY_ATTRIBUTES
                    win32file.OPEN_EXISTING,                         # CreationDisposition : int
                    win32file.FILE_ATTRIBUTE_NORMAL,                 # flagsAndAttributes : int
                    None,                                            # hTemplateFile : PyHANDLE
                )
            except Exception:
                # NOTE(review): deliberately broad, as before; any failure is
                # treated as "pipe not available yet" and retried.
                waited += self._CONNECT_RETRY_DELAY
                sleep(self._CONNECT_RETRY_DELAY)
            else:
                return

    def _write(self, data):
        """Send *data* (bytes) to the peer; the file transport flushes at once."""
        if is_windows:
            # https://mhammond.github.io/pywin32/win32file__WriteFile_meth.html
            win32file.WriteFile(self.pipe, data)
        else:
            self._writer.write(data)
            self._writer.flush()

    def _read_pipe(self, size):
        """Read *size* bytes from the Windows pipe, looping over short reads.

        A single ``ReadFile`` on a byte-mode pipe may return fewer bytes than
        requested, so keep reading until the request is satisfied or a read
        comes back empty.
        """
        chunks = []
        missing = size
        while True:
            # https://mhammond.github.io/pywin32/win32file__ReadFile_meth.html
            _err_code, chunk = win32file.ReadFile(self.pipe, missing)
            chunks.append(chunk)
            missing -= len(chunk)
            if missing <= 0 or not chunk:
                break
        return chunks[0] if len(chunks) == 1 else b"".join(chunks)

    def _read(self, len):
        """Read *len* bytes from the peer.

        On Windows any exception while reading is swallowed and ``None`` is
        returned. On other platforms the result of ``self._reader.read`` is
        returned unchanged (an empty ``bytes`` at end of file).

        The parameter name shadows the builtin and is kept only for
        compatibility; the builtin is not needed inside this method.
        """
        if not is_windows:
            return self._reader.read(len)

        try:
            return self._read_pipe(len)
        except Exception:
            return None

    def write_int(self, value: int):
        """Send *value* packed with ``struct`` format ``"i"``."""
        self._write(struct.pack("i", value))

    def write_double(self, value: float):
        """Send *value* packed with ``struct`` format ``"d"``."""
        self._write(struct.pack("d", value))

    def write_array(self, array: np.ndarray):
        """Send the raw bytes of *array* in C order (no shape or dtype header)."""
        self._write(array.tobytes(order="C"))

    def read_int(self) -> int:
        """Read 4 bytes and unpack them with ``struct`` format ``"i"``.

        Raises whatever ``struct.unpack`` raises when the data is missing or
        short; use :meth:`read_int_opt` to get ``None`` instead.
        """
        return struct.unpack("i", self._read(4))[0]

    def read_int_opt(self) -> Optional[int]:
        """Like :meth:`read_int`, but return ``None`` when nothing was read."""
        data = self._read(4)
        if not data:
            return None
        return struct.unpack("i", data)[0]

    def read_double(self) -> float:
        """Read 8 bytes and unpack them with ``struct`` format ``"d"``."""
        return struct.unpack("d", self._read(8))[0]

    def read_array(self, size: int, dtype: np.dtype) -> np.ndarray:
        """Read *size* elements of *dtype* and return them as a 1-D array.

        Args:
            size: Number of elements (not bytes) to read.
            dtype: Element type; anything ``np.dtype()`` accepts, i.e. either
                a scalar type such as ``np.float64`` or a ``np.dtype``
                instance.

        Returns:
            A freshly allocated, writable array.
        """
        element_type = np.dtype(dtype)
        data = self._read(size * element_type.itemsize)
        array = np.frombuffer(data, dtype=element_type)

        # frombuffer over immutable bytes gives a read-only view; copying
        # avoids 'ValueError: assignment destination is read-only'.
        return np.copy(array)

    def read_str(self) -> str:
        """Read an ``int`` byte length, then that many bytes, and decode them.

        Decoding uses ``sys.getdefaultencoding()``.
        """
        length = self.read_int()
        return self._read(length).decode(sys.getdefaultencoding())

    def read_str_opt(self) -> Optional[str]:
        """Like :meth:`read_str`, but tolerant of a missing length prefix.

        Returns ``None`` when the length could not be read and also when the
        length is zero.
        """
        length = self.read_int_opt()
        if not length:
            return None
        return self._read(length).decode(sys.getdefaultencoding())