主な機能
- 双方向通信
親プロセスと子プロセス間のシームレスな双方向通信を実現し、両方向での効率的なデータ交換を可能にします。
- ノンブロッキングI/O
デッドロックを防止し、プロセス間のデータフローをスムーズにするノンブロッキング入出力操作を実装します。
- プロセス分離
プロセス間通信の便利なインターフェースを提供しながら、適切なプロセス分離を維持し、システムの安定性を向上させます。
- エラー処理
例外とプロセス終了を適切に管理する堅牢なエラー処理メカニズムにより、本番環境での信頼性の高い運用を確保します。
DualPipeの仕組み
DualPipeのプロセスフローと通信メカニズムの視覚的な説明。
初期化プロセス
- 2つのパイプを作成:親から子へ、子から親へ
- プロセスをフォークして子プロセスを作成
- 子プロセスで:標準入出力をパイプにリダイレクト
- 親プロセスで:パイプにノンブロッキングI/Oを設定
- 子プロセスでターゲット関数を実行
通信フロー
- 親プロセスが親から子へのパイプにデータを書き込む
- 子プロセスが標準入力(パイプからリダイレクト)からデータを読み取る
- 子プロセスがデータを処理し、標準出力に書き込む
- 親プロセスが子から親へのパイプから読み取る
- ノンブロッキングI/Oがデッドロックを防止
Dualpipe: 一般的なユースケース
DualPipeが複雑なプロセス間通信の課題を解決できる実用的なアプリケーション。
並列データ処理
CPU集約型のデータ処理タスクを子プロセスにオフロードしながら、親プロセスとのインタラクティブな通信を維持します。
Example:
親プロセスが画像データを送信し、子プロセスが処理結果を返す画像処理パイプライン。
コマンド実行
別のプロセスでコマンドを実行し、その出力をリアルタイムで取得し、前の結果に基づいて追加のコマンドを送信する機能。
Example:
コマンド間で状態を維持する必要があるインタラクティブシェル環境やコマンドラインツール。
サービス通信
コントローラープロセスと双方向に通信する長時間実行サービスプロセスを作成します。
Example:
キューからタスクを処理し、結果をメインアプリケーションに報告するバックグラウンドワーカー。
サンドボックス実行
制御された通信チャネルを持つ別のプロセスで信頼されていないコードを実行します。
Example:
ユーザー提出コードを安全に実行しながら出力をキャプチャする必要があるコード評価システム。
Dualpipe: 他のIPC方法との比較
DualPipeと他のPythonプロセス間通信方法の比較。
Feature | DualPipe | subprocess | multiprocessing | os.popen |
---|---|---|---|---|
Bidirectional Communication | ✅ Built-in | ✅ With PIPE | ✅ With Queue | ❌ One-way only |
Non-blocking I/O | ✅ Built-in | ⚠️ Requires setup | ✅ With Queue | ❌ Blocking |
Process Isolation | ✅ Complete | ✅ Complete | ✅ Complete | ✅ Complete |
API Simplicity | ✅ Simple | ⚠️ Complex | ⚠️ Moderate | ✅ Simple |
Windows Support | ❌ Unix only | ✅ Cross-platform | ✅ Cross-platform | ✅ Cross-platform |
注:比較はプロセス間の双方向通信の使いやすさに焦点を当てています。
DualPipeドキュメント
インストール、使用例、APIリファレンスを含むDualPipeユーティリティの包括的なドキュメント。
# DualPipe A Python utility for efficient bidirectional communication between processes. ## DualPipe Overview DualPipe provides a simple interface for creating child processes and establishing two-way communication with them. It handles the complexities of pipe creation, process forking, and non-blocking I/O to prevent deadlocks. ## DualPipe Key Features - **Bidirectional Communication**: Seamless two-way communication between parent and child processes - **Non-blocking I/O**: Prevents deadlocks and ensures smooth data flow - **Process Isolation**: Maintains proper isolation while providing convenient IPC - **Error Handling**: Gracefully manages exceptions and process termination ## DualPipe Installation ```bash # Clone the repository git clone https://github.com/deepseek-ai/dualpipe.git cd dualpipe # Install the package pip install -e . ``` ## DualPipe Usage Example ```python from dualpipe import DualPipe import time def echo_uppercase(): """Simple function that reads lines and echoes them in uppercase""" while True: line = input() if not line: break print(line.upper()) # Create a DualPipe with the echo_uppercase function pipe = DualPipe(echo_uppercase) # Send some data pipe.write("Hello, world!\n") pipe.write("Testing DualPipe\n") # Read the responses time.sleep(0.1) # Give the child process time to respond print("Response 1:", pipe.readline().decode().strip()) print("Response 2:", pipe.readline().decode().strip()) # Clean up pipe.close() ``` ## How It Works DualPipe uses Unix pipes and process forking to create a child process and establish communication channels: 1. Two pipes are created: one for parent-to-child communication and one for child-to-parent 2. The process is forked, creating a child process 3. In the child process, stdin/stdout are redirected to the appropriate pipe ends 4. In the parent process, non-blocking I/O is set up for the pipes 5. The parent can then write to and read from the child process ## API Reference ### DualPipe(target_func, *args, **kwargs) Creates a new DualPipe instance with a target function to run in the child process. #### Methods - **write(data)**: Write data to the child process - **read(size=1024, timeout=None)**: Read data from the child process - **readline(timeout=None)**: Read a line from the child process - **close()**: Close the pipes and wait for the child process to terminate ## Limitations - Only available on Unix-like systems (Linux, macOS, etc.) due to the use of os.fork() - For Windows compatibility, consider using the multiprocessing or subprocess modules ## License MIT
DualPipe実装
各コンポーネントを説明する詳細なコメント付きのDualPipeの完全な実装:
import os
import sys
import select
import fcntl
import errno
import time
from typing import Callable, List, Optional, Tuple, Union
class DualPipe:
"""
DualPipe: A utility for bidirectional communication with a child process.
This class creates a child process and establishes two-way communication
with it using pipes. It handles non-blocking I/O to prevent deadlocks.
"""
def __init__(self, target_func: Callable, *args, **kwargs):
"""
Initialize a DualPipe with a target function to run in the child process.
Args:
target_func: The function to execute in the child process
*args, **kwargs: Arguments to pass to the target function
"""
# Create pipes for parent-to-child and child-to-parent communication
parent_to_child_r, parent_to_child_w = os.pipe()
child_to_parent_r, child_to_parent_w = os.pipe()
# Fork the process
pid = os.fork()
if pid == 0: # Child process
# Close unused pipe ends
os.close(parent_to_child_w)
os.close(child_to_parent_r)
# Redirect stdin/stdout to the pipes
os.dup2(parent_to_child_r, sys.stdin.fileno())
os.close(parent_to_child_r)
os.dup2(child_to_parent_w, sys.stdout.fileno())
os.close(child_to_parent_w)
# Execute the target function
try:
target_func(*args, **kwargs)
except Exception as e:
print(f"Error in child process: {e}", file=sys.stderr)
finally:
# Ensure clean exit
os._exit(0)
else: # Parent process
# Close unused pipe ends
os.close(parent_to_child_r)
os.close(child_to_parent_w)
# Set non-blocking mode for the pipes
for fd in (parent_to_child_w, child_to_parent_r):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
# Store the pipe file descriptors and child process ID
self.parent_to_child_w = parent_to_child_w
self.child_to_parent_r = child_to_parent_r
self.child_pid = pid
self.buffer = b""
def write(self, data: Union[str, bytes]) -> int:
"""
Write data to the child process.
Args:
data: The data to write (string or bytes)
Returns:
Number of bytes written
"""
if isinstance(data, str):
data = data.encode('utf-8')
try:
return os.write(self.parent_to_child_w, data)
except OSError as e:
if e.errno == errno.EAGAIN:
# Would block, try again later
return 0
raise
def read(self, size: int = 1024, timeout: Optional[float] = None) -> bytes:
"""
Read data from the child process.
Args:
size: Maximum number of bytes to read
timeout: Maximum time to wait for data (None = non-blocking)
Returns:
Bytes read from the child process
"""
if timeout is not None:
# Wait for data to be available
r, _, _ = select.select([self.child_to_parent_r], [], [], timeout)
if not r:
return b"" # Timeout occurred
try:
data = os.read(self.child_to_parent_r, size)
self.buffer += data
return data
except OSError as e:
if e.errno == errno.EAGAIN:
# Would block, no data available
return b""
raise
def readline(self, timeout: Optional[float] = None) -> bytes:
"""
Read a line from the child process.
Args:
timeout: Maximum time to wait for a complete line
Returns:
A line of data (including newline character)
"""
start_time = time.time()
while b'\n' not in self.buffer:
if timeout is not None:
elapsed = time.time() - start_time
if elapsed >= timeout:
break
remaining = timeout - elapsed
else:
remaining = None
data = self.read(1024, remaining)
if not data:
break
# Extract a line from the buffer if available
if b'\n' in self.buffer:
line, self.buffer = self.buffer.split(b'\n', 1)
return line + b'\n'
# Return partial line if no newline found
result = self.buffer
self.buffer = b""
return result
def close(self) -> Tuple[int, int]:
"""
Close the pipes and wait for the child process to terminate.
Returns:
Tuple of (pid, status) from os.waitpid
"""
os.close(self.parent_to_child_w)
os.close(self.child_to_parent_r)
return os.waitpid(self.child_pid, 0)
# Example usage
if __name__ == "__main__":
def echo_uppercase():
"""Simple function that reads lines and echoes them in uppercase"""
while True:
line = sys.stdin.readline()
if not line:
break
sys.stdout.write(line.upper())
sys.stdout.flush()
# Create a DualPipe with the echo_uppercase function
pipe = DualPipe(echo_uppercase)
# Send some data
pipe.write("Hello, world!\n")
pipe.write("Testing DualPipe\n")
# Read the responses
time.sleep(0.1) # Give the child process time to respond
print("Response 1:", pipe.readline().decode().strip())
print("Response 2:", pipe.readline().decode().strip())
# Clean up
pipe.close()
この実装はos.fork()を使用しており、Unix系システム(Linux、macOSなど)でのみ利用可能です。
よくある質問 - Dualpipe
- DualPipeは何に使用されますか?
- DualPipeはPythonで親プロセスと子プロセス間の双方向通信を確立するために使用されます。特に、サブプロセスにコマンドを送信し、結果を受け取る必要がある並列処理タスクで、プロセス分離を維持しながら役立ちます。
- DualPipeはどのようにデッドロックを防止しますか?
- DualPipeはパイプファイル記述子にO_NONBLOCKフラグを設定することでノンブロッキングI/O操作を使用します。これにより、読み書き操作が無期限にブロックされないようにし、両方のプロセスが互いに待機する可能性のあるデッドロックを防止します。
- DualPipeはWindowsで使用できますか?
- ここで示されている実装はos.fork()を使用しており、Unix系システム(Linux、macOSなど)でのみ利用可能です。Windows互換性のためには、multiprocessingモジュールまたはsubprocessモジュールを使用するように実装を変更する必要があります。
- DualPipeとPythonのsubprocessモジュールの違いは何ですか?
- Pythonのsubprocessモジュールも子プロセスとの通信を可能にしますが、DualPipeはノンブロッキングI/Oによる双方向通信のために特別に設計されたよりシンプルなインターフェースを提供します。プロセス間で継続的にデータを交換する必要がある場合に特に役立ちます。