DeepSeek DualPipe

Python-утилита для эффективного двунаправленного взаимодействия между процессами, обеспечивающая параллельную обработку данных с минимальными накладными расходами.

DualPipe Architecture Visualization

Ключевые особенности

Двунаправленная связь

Обеспечивает беспрепятственную двустороннюю связь между родительским и дочерним процессами, позволяя эффективно обмениваться данными в обоих направлениях.

Неблокирующий ввод-вывод

Реализует неблокирующие операции ввода-вывода для предотвращения взаимоблокировок и обеспечения плавного потока данных между процессами.

Изоляция процессов

Поддерживает надлежащую изоляцию процессов, предоставляя удобный интерфейс для межпроцессного взаимодействия, повышая стабильность системы.

Обработка ошибок

Надежные механизмы обработки ошибок для корректного управления исключениями и завершением процессов, обеспечивающие надежную работу в производственных средах.

Как работает DualPipe

Визуальное объяснение процесса работы DualPipe и механизма коммуникации.

DualPipe Workflow Diagram

Процесс инициализации

  1. Создание двух каналов: от родителя к ребенку и от ребенка к родителю
  2. Форк процесса для создания дочернего процесса
  3. В дочернем процессе: перенаправление stdin/stdout на каналы
  4. В родительском процессе: настройка неблокирующего ввода-вывода для каналов
  5. Выполнение целевой функции в дочернем процессе

Поток коммуникации

  1. Родитель записывает данные в канал от родителя к ребенку
  2. Ребенок читает данные из stdin (перенаправленного из канала)
  3. Ребенок обрабатывает данные и записывает в stdout
  4. Родитель читает из канала от ребенка к родителю
  5. Неблокирующий ввод-вывод предотвращает взаимоблокировки

Dualpipe: Типичные случаи использования

Практические приложения, где DualPipe может решить сложные задачи межпроцессного взаимодействия.

Параллельная обработка данных

Выгрузка CPU-интенсивных задач обработки данных в дочерние процессы при сохранении интерактивного взаимодействия с родительским процессом.

Example:

Конвейеры обработки изображений, где родитель отправляет данные изображения, а ребенок возвращает обработанные результаты.

Выполнение команд

Выполнение команд в отдельном процессе и захват их вывода в реальном времени с возможностью отправки дополнительных команд на основе предыдущих результатов.

Example:

Интерактивные среды оболочки или инструменты командной строки, которым необходимо поддерживать состояние между командами.

Коммуникация служб

Создание долгоработающих служебных процессов, которые двунаправленно взаимодействуют с процессом-контроллером.

Example:

Фоновые рабочие процессы, которые обрабатывают задачи из очереди и сообщают результаты обратно в основное приложение.

Изолированное выполнение

Запуск ненадежного кода в отдельном процессе с контролируемыми каналами связи.

Example:

Системы оценки кода, которым необходимо безопасно выполнять код, предоставленный пользователем, с одновременным захватом вывода.

Dualpipe: Сравнение с другими методами IPC

Сравнение DualPipe с другими методами межпроцессного взаимодействия в Python.

FeatureDualPipesubprocessmultiprocessingos.popen
Bidirectional Communication✅ Built-in✅ With PIPE✅ With Queue❌ One-way only
Non-blocking I/O✅ Built-in⚠️ Requires setup✅ With Queue❌ Blocking
Process Isolation✅ Complete✅ Complete✅ Complete✅ Complete
API Simplicity✅ Simple⚠️ Complex⚠️ Moderate✅ Simple
Windows Support❌ Unix only✅ Cross-platform✅ Cross-platform✅ Cross-platform

Примечание: Сравнение фокусируется на удобстве использования для двунаправленной связи между процессами.

Документация DualPipe

Исчерпывающая документация по утилите DualPipe, включая установку, примеры использования и справочник по API.

# DualPipe

A Python utility for efficient bidirectional communication between processes.

## DualPipe Overview

DualPipe provides a simple interface for creating child processes and establishing two-way communication with them. It handles the complexities of pipe creation, process forking, and non-blocking I/O to prevent deadlocks.

## DualPipe Key Features

- **Bidirectional Communication**: Seamless two-way communication between parent and child processes
- **Non-blocking I/O**: Prevents deadlocks and ensures smooth data flow
- **Process Isolation**: Maintains proper isolation while providing convenient IPC
- **Error Handling**: Gracefully manages exceptions and process termination

## DualPipe Installation

```bash
# Clone the repository
git clone https://github.com/deepseek-ai/dualpipe.git
cd dualpipe

# Install the package
pip install -e .
```

## DualPipe Usage Example

```python
from dualpipe import DualPipe
import time

def echo_uppercase():
    """Simple function that reads lines and echoes them in uppercase"""
    while True:
        line = input()
        if not line:
            break
        print(line.upper())

# Create a DualPipe with the echo_uppercase function
pipe = DualPipe(echo_uppercase)

# Send some data
pipe.write("Hello, world!\n")
pipe.write("Testing DualPipe\n")

# Read the responses
time.sleep(0.1)  # Give the child process time to respond

print("Response 1:", pipe.readline().decode().strip())
print("Response 2:", pipe.readline().decode().strip())

# Clean up
pipe.close()
```

## How It Works

DualPipe uses Unix pipes and process forking to create a child process and establish communication channels:

1. Two pipes are created: one for parent-to-child communication and one for child-to-parent
2. The process is forked, creating a child process
3. In the child process, stdin/stdout are redirected to the appropriate pipe ends
4. In the parent process, non-blocking I/O is set up for the pipes
5. The parent can then write to and read from the child process

## API Reference

### DualPipe(target_func, *args, **kwargs)

Creates a new DualPipe instance with a target function to run in the child process.

#### Methods

- **write(data)**: Write data to the child process
- **read(size=1024, timeout=None)**: Read data from the child process
- **readline(timeout=None)**: Read a line from the child process
- **close()**: Close the pipes and wait for the child process to terminate

## Limitations

- Only available on Unix-like systems (Linux, macOS, etc.) due to the use of os.fork()
- For Windows compatibility, consider using the multiprocessing or subprocess modules

## License

MIT

Реализация DualPipe

Полная реализация DualPipe с подробными комментариями, объясняющими каждый компонент:

import os
import sys
import select
import fcntl
import errno
import time
from typing import Callable, List, Optional, Tuple, Union

class DualPipe:
    """
    DualPipe: A utility for bidirectional communication with a child process.
    
    This class creates a child process and establishes two-way communication
    with it using pipes. It handles non-blocking I/O to prevent deadlocks.
    """
    
    def __init__(self, target_func: Callable, *args, **kwargs):
        """
        Initialize a DualPipe with a target function to run in the child process.
        
        Args:
            target_func: The function to execute in the child process
            *args, **kwargs: Arguments to pass to the target function
        """
        # Create pipes for parent-to-child and child-to-parent communication
        parent_to_child_r, parent_to_child_w = os.pipe()
        child_to_parent_r, child_to_parent_w = os.pipe()
        
        # Fork the process
        pid = os.fork()
        
        if pid == 0:  # Child process
            # Close unused pipe ends
            os.close(parent_to_child_w)
            os.close(child_to_parent_r)
            
            # Redirect stdin/stdout to the pipes
            os.dup2(parent_to_child_r, sys.stdin.fileno())
            os.close(parent_to_child_r)
            
            os.dup2(child_to_parent_w, sys.stdout.fileno())
            os.close(child_to_parent_w)
            
            # Execute the target function
            try:
                target_func(*args, **kwargs)
            except Exception as e:
                print(f"Error in child process: {e}", file=sys.stderr)
            finally:
                # Ensure clean exit
                os._exit(0)
        else:  # Parent process
            # Close unused pipe ends
            os.close(parent_to_child_r)
            os.close(child_to_parent_w)
            
            # Set non-blocking mode for the pipes
            for fd in (parent_to_child_w, child_to_parent_r):
                flags = fcntl.fcntl(fd, fcntl.F_GETFL)
                fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
            
            # Store the pipe file descriptors and child process ID
            self.parent_to_child_w = parent_to_child_w
            self.child_to_parent_r = child_to_parent_r
            self.child_pid = pid
            self.buffer = b""
    
    def write(self, data: Union[str, bytes]) -> int:
        """
        Write data to the child process.
        
        Args:
            data: The data to write (string or bytes)
            
        Returns:
            Number of bytes written
        """
        if isinstance(data, str):
            data = data.encode('utf-8')
        
        try:
            return os.write(self.parent_to_child_w, data)
        except OSError as e:
            if e.errno == errno.EAGAIN:
                # Would block, try again later
                return 0
            raise
    
    def read(self, size: int = 1024, timeout: Optional[float] = None) -> bytes:
        """
        Read data from the child process.
        
        Args:
            size: Maximum number of bytes to read
            timeout: Maximum time to wait for data (None = non-blocking)
            
        Returns:
            Bytes read from the child process
        """
        if timeout is not None:
            # Wait for data to be available
            r, _, _ = select.select([self.child_to_parent_r], [], [], timeout)
            if not r:
                return b""  # Timeout occurred
        
        try:
            data = os.read(self.child_to_parent_r, size)
            self.buffer += data
            return data
        except OSError as e:
            if e.errno == errno.EAGAIN:
                # Would block, no data available
                return b""
            raise
    
    def readline(self, timeout: Optional[float] = None) -> bytes:
        """
        Read a line from the child process.
        
        Args:
            timeout: Maximum time to wait for a complete line
            
        Returns:
            A line of data (including newline character)
        """
        start_time = time.time()
        
        while b'\n' not in self.buffer:
            if timeout is not None:
                elapsed = time.time() - start_time
                if elapsed >= timeout:
                    break
                remaining = timeout - elapsed
            else:
                remaining = None
                
            data = self.read(1024, remaining)
            if not data:
                break
        
        # Extract a line from the buffer if available
        if b'\n' in self.buffer:
            line, self.buffer = self.buffer.split(b'\n', 1)
            return line + b'\n'
        
        # Return partial line if no newline found
        result = self.buffer
        self.buffer = b""
        return result
    
    def close(self) -> Tuple[int, int]:
        """
        Close the pipes and wait for the child process to terminate.
        
        Returns:
            Tuple of (pid, status) from os.waitpid
        """
        os.close(self.parent_to_child_w)
        os.close(self.child_to_parent_r)
        return os.waitpid(self.child_pid, 0)

# Example usage
if __name__ == "__main__":
    def echo_uppercase():
        """Simple function that reads lines and echoes them in uppercase"""
        while True:
            line = sys.stdin.readline()
            if not line:
                break
            sys.stdout.write(line.upper())
            sys.stdout.flush()
    
    # Create a DualPipe with the echo_uppercase function
    pipe = DualPipe(echo_uppercase)
    
    # Send some data
    pipe.write("Hello, world!\n")
    pipe.write("Testing DualPipe\n")
    
    # Read the responses
    time.sleep(0.1)  # Give the child process time to respond
    
    print("Response 1:", pipe.readline().decode().strip())
    print("Response 2:", pipe.readline().decode().strip())
    
    # Clean up
    pipe.close()

Эта реализация использует os.fork(), который доступен только в Unix-подобных системах (Linux, macOS и т.д.).

Часто задаваемые вопросы - Dualpipe

Для чего используется DualPipe?
DualPipe используется для установления двунаправленной связи между родительским и дочерним процессами в Python. Он особенно полезен для задач параллельной обработки, где вам нужно отправлять команды в подпроцесс и получать результаты обратно, сохраняя при этом изоляцию процессов.
Как DualPipe предотвращает взаимоблокировки?
DualPipe использует неблокирующие операции ввода-вывода, устанавливая флаг O_NONBLOCK на файловые дескрипторы каналов. Это гарантирует, что операции чтения и записи не будут блокироваться на неопределенное время, предотвращая потенциальные взаимоблокировки, когда оба процесса ждут друг друга.
Можно ли использовать DualPipe в Windows?
Показанная здесь реализация использует os.fork(), который доступен только в Unix-подобных системах (Linux, macOS и т.д.). Для совместимости с Windows вам потребуется изменить реализацию, чтобы использовать модуль multiprocessing или модуль subprocess.
В чем разница между DualPipe и модулем subprocess в Python?
Хотя модуль subprocess в Python также позволяет взаимодействовать с дочерними процессами, DualPipe предоставляет более упрощенный интерфейс, специально разработанный для двунаправленной связи с неблокирующим вводом-выводом. Он особенно полезен, когда вам нужно непрерывно обмениваться данными между процессами.