DeepSeek DualPipe

プロセス間の効率的な双方向通信を実現するPythonユーティリティで、最小限のオーバーヘッドで並列データ処理を可能にします。

DualPipe Architecture Visualization

主な機能

双方向通信

親プロセスと子プロセス間のシームレスな双方向通信を実現し、両方向での効率的なデータ交換を可能にします。

ノンブロッキングI/O

デッドロックを防止し、プロセス間のデータフローをスムーズにするノンブロッキング入出力操作を実装します。

プロセス分離

プロセス間通信の便利なインターフェースを提供しながら、適切なプロセス分離を維持し、システムの安定性を向上させます。

エラー処理

例外とプロセス終了を適切に管理する堅牢なエラー処理メカニズムにより、本番環境での信頼性の高い運用を確保します。

DualPipeの仕組み

DualPipeのプロセスフローと通信メカニズムの視覚的な説明。

DualPipe Workflow Diagram

初期化プロセス

  1. 2つのパイプを作成:親から子へ、子から親へ
  2. プロセスをフォークして子プロセスを作成
  3. 子プロセスで:標準入出力をパイプにリダイレクト
  4. 親プロセスで:パイプにノンブロッキングI/Oを設定
  5. 子プロセスでターゲット関数を実行

通信フロー

  1. 親プロセスが親から子へのパイプにデータを書き込む
  2. 子プロセスが標準入力(パイプからリダイレクト)からデータを読み取る
  3. 子プロセスがデータを処理し、標準出力に書き込む
  4. 親プロセスが子から親へのパイプから読み取る
  5. ノンブロッキングI/Oがデッドロックを防止

Dualpipe: 一般的なユースケース

DualPipeが複雑なプロセス間通信の課題を解決できる実用的なアプリケーション。

並列データ処理

CPU集約型のデータ処理タスクを子プロセスにオフロードしながら、親プロセスとのインタラクティブな通信を維持します。

Example:

親プロセスが画像データを送信し、子プロセスが処理結果を返す画像処理パイプライン。

コマンド実行

別のプロセスでコマンドを実行し、その出力をリアルタイムで取得し、前の結果に基づいて追加のコマンドを送信する機能。

Example:

コマンド間で状態を維持する必要があるインタラクティブシェル環境やコマンドラインツール。

サービス通信

コントローラープロセスと双方向に通信する長時間実行サービスプロセスを作成します。

Example:

キューからタスクを処理し、結果をメインアプリケーションに報告するバックグラウンドワーカー。

サンドボックス実行

制御された通信チャネルを持つ別のプロセスで信頼されていないコードを実行します。

Example:

ユーザー提出コードを安全に実行しながら出力をキャプチャする必要があるコード評価システム。

Dualpipe: 他のIPC方法との比較

DualPipeと他のPythonプロセス間通信方法の比較。

FeatureDualPipesubprocessmultiprocessingos.popen
Bidirectional Communication✅ Built-in✅ With PIPE✅ With Queue❌ One-way only
Non-blocking I/O✅ Built-in⚠️ Requires setup✅ With Queue❌ Blocking
Process Isolation✅ Complete✅ Complete✅ Complete✅ Complete
API Simplicity✅ Simple⚠️ Complex⚠️ Moderate✅ Simple
Windows Support❌ Unix only✅ Cross-platform✅ Cross-platform✅ Cross-platform

注:比較はプロセス間の双方向通信の使いやすさに焦点を当てています。

DualPipeドキュメント

インストール、使用例、APIリファレンスを含むDualPipeユーティリティの包括的なドキュメント。

# DualPipe

A Python utility for efficient bidirectional communication between processes.

## DualPipe Overview

DualPipe provides a simple interface for creating child processes and establishing two-way communication with them. It handles the complexities of pipe creation, process forking, and non-blocking I/O to prevent deadlocks.

## DualPipe Key Features

- **Bidirectional Communication**: Seamless two-way communication between parent and child processes
- **Non-blocking I/O**: Prevents deadlocks and ensures smooth data flow
- **Process Isolation**: Maintains proper isolation while providing convenient IPC
- **Error Handling**: Gracefully manages exceptions and process termination

## DualPipe Installation

```bash
# Clone the repository
git clone https://github.com/deepseek-ai/dualpipe.git
cd dualpipe

# Install the package
pip install -e .
```

## DualPipe Usage Example

```python
from dualpipe import DualPipe
import time

def echo_uppercase():
    """Simple function that reads lines and echoes them in uppercase"""
    while True:
        line = input()
        if not line:
            break
        print(line.upper())

# Create a DualPipe with the echo_uppercase function
pipe = DualPipe(echo_uppercase)

# Send some data
pipe.write("Hello, world!\n")
pipe.write("Testing DualPipe\n")

# Read the responses
time.sleep(0.1)  # Give the child process time to respond

print("Response 1:", pipe.readline().decode().strip())
print("Response 2:", pipe.readline().decode().strip())

# Clean up
pipe.close()
```

## How It Works

DualPipe uses Unix pipes and process forking to create a child process and establish communication channels:

1. Two pipes are created: one for parent-to-child communication and one for child-to-parent
2. The process is forked, creating a child process
3. In the child process, stdin/stdout are redirected to the appropriate pipe ends
4. In the parent process, non-blocking I/O is set up for the pipes
5. The parent can then write to and read from the child process

## API Reference

### DualPipe(target_func, *args, **kwargs)

Creates a new DualPipe instance with a target function to run in the child process.

#### Methods

- **write(data)**: Write data to the child process
- **read(size=1024, timeout=None)**: Read data from the child process
- **readline(timeout=None)**: Read a line from the child process
- **close()**: Close the pipes and wait for the child process to terminate

## Limitations

- Only available on Unix-like systems (Linux, macOS, etc.) due to the use of os.fork()
- For Windows compatibility, consider using the multiprocessing or subprocess modules

## License

MIT

DualPipe実装

各コンポーネントを説明する詳細なコメント付きのDualPipeの完全な実装:

import os
import sys
import select
import fcntl
import errno
import time
from typing import Callable, List, Optional, Tuple, Union

class DualPipe:
    """
    DualPipe: A utility for bidirectional communication with a child process.
    
    This class creates a child process and establishes two-way communication
    with it using pipes. It handles non-blocking I/O to prevent deadlocks.
    """
    
    def __init__(self, target_func: Callable, *args, **kwargs):
        """
        Initialize a DualPipe with a target function to run in the child process.
        
        Args:
            target_func: The function to execute in the child process
            *args, **kwargs: Arguments to pass to the target function
        """
        # Create pipes for parent-to-child and child-to-parent communication
        parent_to_child_r, parent_to_child_w = os.pipe()
        child_to_parent_r, child_to_parent_w = os.pipe()
        
        # Fork the process
        pid = os.fork()
        
        if pid == 0:  # Child process
            # Close unused pipe ends
            os.close(parent_to_child_w)
            os.close(child_to_parent_r)
            
            # Redirect stdin/stdout to the pipes
            os.dup2(parent_to_child_r, sys.stdin.fileno())
            os.close(parent_to_child_r)
            
            os.dup2(child_to_parent_w, sys.stdout.fileno())
            os.close(child_to_parent_w)
            
            # Execute the target function
            try:
                target_func(*args, **kwargs)
            except Exception as e:
                print(f"Error in child process: {e}", file=sys.stderr)
            finally:
                # Ensure clean exit
                os._exit(0)
        else:  # Parent process
            # Close unused pipe ends
            os.close(parent_to_child_r)
            os.close(child_to_parent_w)
            
            # Set non-blocking mode for the pipes
            for fd in (parent_to_child_w, child_to_parent_r):
                flags = fcntl.fcntl(fd, fcntl.F_GETFL)
                fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
            
            # Store the pipe file descriptors and child process ID
            self.parent_to_child_w = parent_to_child_w
            self.child_to_parent_r = child_to_parent_r
            self.child_pid = pid
            self.buffer = b""
    
    def write(self, data: Union[str, bytes]) -> int:
        """
        Write data to the child process.
        
        Args:
            data: The data to write (string or bytes)
            
        Returns:
            Number of bytes written
        """
        if isinstance(data, str):
            data = data.encode('utf-8')
        
        try:
            return os.write(self.parent_to_child_w, data)
        except OSError as e:
            if e.errno == errno.EAGAIN:
                # Would block, try again later
                return 0
            raise
    
    def read(self, size: int = 1024, timeout: Optional[float] = None) -> bytes:
        """
        Read data from the child process.
        
        Args:
            size: Maximum number of bytes to read
            timeout: Maximum time to wait for data (None = non-blocking)
            
        Returns:
            Bytes read from the child process
        """
        if timeout is not None:
            # Wait for data to be available
            r, _, _ = select.select([self.child_to_parent_r], [], [], timeout)
            if not r:
                return b""  # Timeout occurred
        
        try:
            data = os.read(self.child_to_parent_r, size)
            self.buffer += data
            return data
        except OSError as e:
            if e.errno == errno.EAGAIN:
                # Would block, no data available
                return b""
            raise
    
    def readline(self, timeout: Optional[float] = None) -> bytes:
        """
        Read a line from the child process.
        
        Args:
            timeout: Maximum time to wait for a complete line
            
        Returns:
            A line of data (including newline character)
        """
        start_time = time.time()
        
        while b'\n' not in self.buffer:
            if timeout is not None:
                elapsed = time.time() - start_time
                if elapsed >= timeout:
                    break
                remaining = timeout - elapsed
            else:
                remaining = None
                
            data = self.read(1024, remaining)
            if not data:
                break
        
        # Extract a line from the buffer if available
        if b'\n' in self.buffer:
            line, self.buffer = self.buffer.split(b'\n', 1)
            return line + b'\n'
        
        # Return partial line if no newline found
        result = self.buffer
        self.buffer = b""
        return result
    
    def close(self) -> Tuple[int, int]:
        """
        Close the pipes and wait for the child process to terminate.
        
        Returns:
            Tuple of (pid, status) from os.waitpid
        """
        os.close(self.parent_to_child_w)
        os.close(self.child_to_parent_r)
        return os.waitpid(self.child_pid, 0)

# Example usage
if __name__ == "__main__":
    def echo_uppercase():
        """Simple function that reads lines and echoes them in uppercase"""
        while True:
            line = sys.stdin.readline()
            if not line:
                break
            sys.stdout.write(line.upper())
            sys.stdout.flush()
    
    # Create a DualPipe with the echo_uppercase function
    pipe = DualPipe(echo_uppercase)
    
    # Send some data
    pipe.write("Hello, world!\n")
    pipe.write("Testing DualPipe\n")
    
    # Read the responses
    time.sleep(0.1)  # Give the child process time to respond
    
    print("Response 1:", pipe.readline().decode().strip())
    print("Response 2:", pipe.readline().decode().strip())
    
    # Clean up
    pipe.close()

この実装はos.fork()を使用しており、Unix系システム(Linux、macOSなど)でのみ利用可能です。

よくある質問 - Dualpipe

DualPipeは何に使用されますか?
DualPipeはPythonで親プロセスと子プロセス間の双方向通信を確立するために使用されます。特に、サブプロセスにコマンドを送信し、結果を受け取る必要がある並列処理タスクで、プロセス分離を維持しながら役立ちます。
DualPipeはどのようにデッドロックを防止しますか?
DualPipeはパイプファイル記述子にO_NONBLOCKフラグを設定することでノンブロッキングI/O操作を使用します。これにより、読み書き操作が無期限にブロックされないようにし、両方のプロセスが互いに待機する可能性のあるデッドロックを防止します。
DualPipeはWindowsで使用できますか?
ここで示されている実装はos.fork()を使用しており、Unix系システム(Linux、macOSなど)でのみ利用可能です。Windows互換性のためには、multiprocessingモジュールまたはsubprocessモジュールを使用するように実装を変更する必要があります。
DualPipeとPythonのsubprocessモジュールの違いは何ですか?
Pythonのsubprocessモジュールも子プロセスとの通信を可能にしますが、DualPipeはノンブロッキングI/Oによる双方向通信のために特別に設計されたよりシンプルなインターフェースを提供します。プロセス間で継続的にデータを交換する必要がある場合に特に役立ちます。