kopia lustrzana https://github.com/corrscope/corrscope
shmems
rodzic
43dc6f25d0
commit
6b776d12f7
|
@ -6,10 +6,11 @@ from concurrent.futures import ProcessPoolExecutor, Future
|
|||
from contextlib import ExitStack, contextmanager
|
||||
from enum import unique
|
||||
from fractions import Fraction
|
||||
from multiprocessing.shared_memory import SharedMemory
|
||||
from pathlib import Path
|
||||
from queue import Queue, Empty
|
||||
from threading import Thread
|
||||
from typing import Iterator, Optional, List, Callable, Tuple
|
||||
from typing import Iterator, Optional, List, Callable, Tuple, Dict, Union
|
||||
|
||||
import attr
|
||||
|
||||
|
@ -64,6 +65,7 @@ def named_threads():
|
|||
|
||||
named_threads()
|
||||
|
||||
|
||||
# Placing Enum before any other superclass results in errors.
|
||||
# Placing DumpEnumAsStr before IntEnum or (int, Enum) results in errors on Python 3.6:
|
||||
# - TypeError: object.__new__(BenchmarkMode) is not safe, use int.__new__()
|
||||
|
@ -389,22 +391,61 @@ class CorrScope:
|
|||
|
||||
# Multiprocess
|
||||
def play_parallel():
|
||||
ncores = len(os.sched_getaffinity(0))
|
||||
framebuffer_nbyte = len(renderer.get_frame())
|
||||
|
||||
# determine thread count
|
||||
def _nthread():
|
||||
import psutil
|
||||
|
||||
ncores = len(os.sched_getaffinity(0))
|
||||
stats = psutil.virtual_memory() # returns a named tuple
|
||||
nbyte_available = getattr(stats, "available")
|
||||
|
||||
# Don't create more threads than can take up 1/3 of remaining free
|
||||
# memory in shmem framebuffers. (matplotlib will take up as much
|
||||
# memory internally, and ffmpeg/other apps will take up more still.)
|
||||
max_safe_threads = nbyte_available // framebuffer_nbyte // 3
|
||||
return min(ncores, max_safe_threads)
|
||||
|
||||
nthread = _nthread()
|
||||
|
||||
# setup threading
|
||||
abort_from_thread = threading.Event()
|
||||
# self.arg.is_aborted() from GUI, abort_from_thread.is_set() from thread
|
||||
is_aborted = lambda: self.arg.is_aborted() or abort_from_thread.is_set()
|
||||
|
||||
@attr.dataclass
|
||||
class RenderToOutput:
|
||||
frame_idx: int
|
||||
shmem: SharedMemory
|
||||
completion: "Future[None]"
|
||||
|
||||
# Same size as ProcessPoolExecutor, so threads won't starve if they all
|
||||
# finish a job at the same time.
|
||||
render_to_output: "Queue[Tuple[int, Future[ByteBuffer]] | None]" = Queue(
|
||||
ncores
|
||||
)
|
||||
render_to_output: "Queue[RenderToOutput | None]" = Queue(nthread)
|
||||
|
||||
def worker_create_renderer(renderer: RendererFrontend):
|
||||
# Release all shmems after finishing rendering.
|
||||
all_shmems: List[SharedMemory] = [
|
||||
SharedMemory(create=True, size=framebuffer_nbyte)
|
||||
for _ in range(nthread)
|
||||
]
|
||||
|
||||
# Only send unused shmems to a worker process, and wait for it to be
|
||||
# returned before reusing.
|
||||
avail_shmems: "Queue[SharedMemory]" = Queue()
|
||||
for shmem in all_shmems:
|
||||
avail_shmems.put(shmem)
|
||||
|
||||
def worker_create_renderer(
|
||||
renderer: RendererFrontend, shmem_names: List[str]
|
||||
):
|
||||
global WORKER_RENDERER
|
||||
global SHMEMS
|
||||
# TODO del self.renderer and recreate Renderer if it can't be pickled?
|
||||
WORKER_RENDERER = renderer
|
||||
SHMEMS = {
|
||||
name: SharedMemory(name) for name in shmem_names
|
||||
} # type: Dict[str, SharedMemory]
|
||||
|
||||
# TODO https://stackoverflow.com/questions/2829329/catch-a-threads-exception-in-the-caller-thread
|
||||
def render_thread():
|
||||
|
@ -455,12 +496,20 @@ class CorrScope:
|
|||
if not should_render:
|
||||
continue
|
||||
|
||||
# blocks until frames get rendered and shmem is returned by
|
||||
# output_thread().
|
||||
shmem = avail_shmems.get()
|
||||
|
||||
# blocking
|
||||
render_to_output.put(
|
||||
(
|
||||
RenderToOutput(
|
||||
frame,
|
||||
shmem,
|
||||
pool.submit(
|
||||
worker_render_frame, render_inputs, trigger_samples
|
||||
worker_render_frame,
|
||||
render_inputs,
|
||||
trigger_samples,
|
||||
shmem.name,
|
||||
),
|
||||
)
|
||||
)
|
||||
|
@ -471,13 +520,17 @@ class CorrScope:
|
|||
global worker_render_frame # hack to allow pickling function
|
||||
|
||||
def worker_render_frame(
|
||||
render_inputs: List[RenderInput], trigger_samples: List[int]
|
||||
) -> ByteBuffer:
|
||||
global WORKER_RENDERER
|
||||
render_inputs: List[RenderInput],
|
||||
trigger_samples: List[int],
|
||||
shmem_name: str,
|
||||
):
|
||||
global WORKER_RENDERER, SHMEMS
|
||||
renderer = WORKER_RENDERER
|
||||
renderer.update_main_lines(render_inputs, trigger_samples)
|
||||
frame_data = renderer.get_frame()
|
||||
return bytes(frame_data)
|
||||
|
||||
shmem = SHMEMS[shmem_name]
|
||||
shmem.buf[:] = frame_data
|
||||
|
||||
def output_thread():
|
||||
while True:
|
||||
|
@ -486,11 +539,15 @@ class CorrScope:
|
|||
output.terminate()
|
||||
break
|
||||
|
||||
msg = render_to_output.get() # blocking
|
||||
if msg is None:
|
||||
render_msg: Union[
|
||||
RenderToOutput, None
|
||||
] = render_to_output.get() # blocking
|
||||
if render_msg is None:
|
||||
break
|
||||
frame, render_future = msg
|
||||
frame_data: ByteBuffer = render_future.result()
|
||||
|
||||
# Wait for shmem to be filled with data.
|
||||
render_msg.completion.result()
|
||||
frame_data = render_msg.shmem.buf
|
||||
|
||||
if not_benchmarking or benchmark_mode == BenchmarkMode.OUTPUT:
|
||||
# Output frame
|
||||
|
@ -500,9 +557,11 @@ class CorrScope:
|
|||
break
|
||||
if is_aborted():
|
||||
# Outputting frame happens after most computation finished.
|
||||
thread_shared.end_frame = frame + 1
|
||||
thread_shared.end_frame = render_msg.frame_idx + 1
|
||||
break
|
||||
|
||||
avail_shmems.put(render_msg.shmem)
|
||||
|
||||
if is_aborted():
|
||||
# If is_aborted() is True but render_thread() is blocked on
|
||||
# render_to_output.put(), then we need to clear the queue so
|
||||
|
@ -510,14 +569,21 @@ class CorrScope:
|
|||
# = True and terminate.
|
||||
while True:
|
||||
try:
|
||||
render_to_output.get(block=False)
|
||||
render_msg = render_to_output.get(block=False)
|
||||
if render_msg is None:
|
||||
continue # probably empty?
|
||||
|
||||
avail_shmems.put(render_msg.shmem)
|
||||
except Empty:
|
||||
break
|
||||
|
||||
print("exit output")
|
||||
|
||||
shmem_names: List[str] = [shmem.name for shmem in all_shmems]
|
||||
with ProcessPoolExecutor(
|
||||
ncores, initializer=worker_create_renderer, initargs=(renderer,)
|
||||
nthread,
|
||||
initializer=worker_create_renderer,
|
||||
initargs=(renderer, shmem_names),
|
||||
) as pool:
|
||||
render_handle = Thread(target=render_thread, name="render_thread")
|
||||
output_handle = Thread(target=output_thread, name="output_thread")
|
||||
|
@ -528,6 +594,16 @@ class CorrScope:
|
|||
render_handle.join()
|
||||
output_handle.join()
|
||||
|
||||
# TODO is it a problem that ProcessPoolExecutor's
|
||||
# worker_create_renderer() creates SharedMemory handles, which are
|
||||
# never closed when the process terminates?
|
||||
#
|
||||
# Do we have to construct a new SharedMemory on every
|
||||
# worker_render_frame()? Does this thrash the page tables?
|
||||
|
||||
for shmem in all_shmems:
|
||||
shmem.unlink()
|
||||
|
||||
with self._load_outputs():
|
||||
if self.arg.parallel:
|
||||
play_parallel()
|
||||
|
|
|
@ -843,6 +843,34 @@ files = [
|
|||
dev = ["pre-commit", "tox"]
|
||||
testing = ["pytest", "pytest-benchmark"]
|
||||
|
||||
[[package]]
|
||||
name = "psutil"
|
||||
version = "5.9.6"
|
||||
description = "Cross-platform lib for process and system monitoring in Python."
|
||||
optional = false
|
||||
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*"
|
||||
files = [
|
||||
{file = "psutil-5.9.6-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:fb8a697f11b0f5994550555fcfe3e69799e5b060c8ecf9e2f75c69302cc35c0d"},
|
||||
{file = "psutil-5.9.6-cp27-cp27m-manylinux2010_i686.whl", hash = "sha256:91ecd2d9c00db9817a4b4192107cf6954addb5d9d67a969a4f436dbc9200f88c"},
|
||||
{file = "psutil-5.9.6-cp27-cp27m-manylinux2010_x86_64.whl", hash = "sha256:10e8c17b4f898d64b121149afb136c53ea8b68c7531155147867b7b1ac9e7e28"},
|
||||
{file = "psutil-5.9.6-cp27-cp27mu-manylinux2010_i686.whl", hash = "sha256:18cd22c5db486f33998f37e2bb054cc62fd06646995285e02a51b1e08da97017"},
|
||||
{file = "psutil-5.9.6-cp27-cp27mu-manylinux2010_x86_64.whl", hash = "sha256:ca2780f5e038379e520281e4c032dddd086906ddff9ef0d1b9dcf00710e5071c"},
|
||||
{file = "psutil-5.9.6-cp27-none-win32.whl", hash = "sha256:70cb3beb98bc3fd5ac9ac617a327af7e7f826373ee64c80efd4eb2856e5051e9"},
|
||||
{file = "psutil-5.9.6-cp27-none-win_amd64.whl", hash = "sha256:51dc3d54607c73148f63732c727856f5febec1c7c336f8f41fcbd6315cce76ac"},
|
||||
{file = "psutil-5.9.6-cp36-abi3-macosx_10_9_x86_64.whl", hash = "sha256:c69596f9fc2f8acd574a12d5f8b7b1ba3765a641ea5d60fb4736bf3c08a8214a"},
|
||||
{file = "psutil-5.9.6-cp36-abi3-manylinux_2_12_i686.manylinux2010_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:92e0cc43c524834af53e9d3369245e6cc3b130e78e26100d1f63cdb0abeb3d3c"},
|
||||
{file = "psutil-5.9.6-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:748c9dd2583ed86347ed65d0035f45fa8c851e8d90354c122ab72319b5f366f4"},
|
||||
{file = "psutil-5.9.6-cp36-cp36m-win32.whl", hash = "sha256:3ebf2158c16cc69db777e3c7decb3c0f43a7af94a60d72e87b2823aebac3d602"},
|
||||
{file = "psutil-5.9.6-cp36-cp36m-win_amd64.whl", hash = "sha256:ff18b8d1a784b810df0b0fff3bcb50ab941c3b8e2c8de5726f9c71c601c611aa"},
|
||||
{file = "psutil-5.9.6-cp37-abi3-win32.whl", hash = "sha256:a6f01f03bf1843280f4ad16f4bde26b817847b4c1a0db59bf6419807bc5ce05c"},
|
||||
{file = "psutil-5.9.6-cp37-abi3-win_amd64.whl", hash = "sha256:6e5fb8dc711a514da83098bc5234264e551ad980cec5f85dabf4d38ed6f15e9a"},
|
||||
{file = "psutil-5.9.6-cp38-abi3-macosx_11_0_arm64.whl", hash = "sha256:daecbcbd29b289aac14ece28eca6a3e60aa361754cf6da3dfb20d4d32b6c7f57"},
|
||||
{file = "psutil-5.9.6.tar.gz", hash = "sha256:e4b92ddcd7dd4cdd3f900180ea1e104932c7bce234fb88976e2a3b296441225a"},
|
||||
]
|
||||
|
||||
[package.extras]
|
||||
test = ["enum34", "ipaddress", "mock", "pywin32", "wmi"]
|
||||
|
||||
[[package]]
|
||||
name = "pyinstaller"
|
||||
version = "4.10"
|
||||
|
@ -1332,4 +1360,4 @@ qt6 = ["PyQt6"]
|
|||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = "^3.8"
|
||||
content-hash = "4921d9fe6844483b707d820b80ad69afd347b79f42efc1a92607f6cf0a0e703d"
|
||||
content-hash = "cfc25dce73509c95474a7522c2b9cae11b26e0bb8b5b1eb2669ea45f4d7df3db"
|
||||
|
|
|
@ -32,6 +32,7 @@ QtPy = "^2.0.1"
|
|||
PyQt5 = {version = "^5.15", optional = true}
|
||||
PyQt6 = {version = "^6.2", optional = true}
|
||||
appnope = "^0.1.3"
|
||||
psutil = "^5.9.6"
|
||||
|
||||
[tool.poetry.extras]
|
||||
qt5 = ["PyQt5"]
|
||||
|
|
Ładowanie…
Reference in New Issue