Ключевые особенности
- Двунаправленная связь
Обеспечивает беспрепятственную двустороннюю связь между родительским и дочерним процессами, позволяя эффективно обмениваться данными в обоих направлениях.
- Неблокирующий ввод-вывод
Реализует неблокирующие операции ввода-вывода для предотвращения взаимоблокировок и обеспечения плавного потока данных между процессами.
- Изоляция процессов
Поддерживает надлежащую изоляцию процессов, предоставляя удобный интерфейс для межпроцессного взаимодействия, повышая стабильность системы.
- Обработка ошибок
Надежные механизмы обработки ошибок для корректного управления исключениями и завершением процессов, обеспечивающие надежную работу в производственных средах.
Как работает DualPipe
Визуальное объяснение процесса работы DualPipe и механизма коммуникации.
Процесс инициализации
- Создание двух каналов: от родителя к ребенку и от ребенка к родителю
- Форк процесса для создания дочернего процесса
- В дочернем процессе: перенаправление stdin/stdout на каналы
- В родительском процессе: настройка неблокирующего ввода-вывода для каналов
- Выполнение целевой функции в дочернем процессе
Поток коммуникации
- Родитель записывает данные в канал от родителя к ребенку
- Ребенок читает данные из stdin (перенаправленного из канала)
- Ребенок обрабатывает данные и записывает в stdout
- Родитель читает из канала от ребенка к родителю
- Неблокирующий ввод-вывод предотвращает взаимоблокировки
Dualpipe: Типичные случаи использования
Практические приложения, где DualPipe может решить сложные задачи межпроцессного взаимодействия.
Параллельная обработка данных
Выгрузка CPU-интенсивных задач обработки данных в дочерние процессы при сохранении интерактивного взаимодействия с родительским процессом.
Example:
Конвейеры обработки изображений, где родитель отправляет данные изображения, а ребенок возвращает обработанные результаты.
Выполнение команд
Выполнение команд в отдельном процессе и захват их вывода в реальном времени с возможностью отправки дополнительных команд на основе предыдущих результатов.
Example:
Интерактивные среды оболочки или инструменты командной строки, которым необходимо поддерживать состояние между командами.
Коммуникация служб
Создание долгоработающих служебных процессов, которые двунаправленно взаимодействуют с процессом-контроллером.
Example:
Фоновые рабочие процессы, которые обрабатывают задачи из очереди и сообщают результаты обратно в основное приложение.
Изолированное выполнение
Запуск ненадежного кода в отдельном процессе с контролируемыми каналами связи.
Example:
Системы оценки кода, которым необходимо безопасно выполнять код, предоставленный пользователем, с одновременным захватом вывода.
Dualpipe: Сравнение с другими методами IPC
Сравнение DualPipe с другими методами межпроцессного взаимодействия в Python.
Feature | DualPipe | subprocess | multiprocessing | os.popen |
---|---|---|---|---|
Bidirectional Communication | ✅ Built-in | ✅ With PIPE | ✅ With Queue | ❌ One-way only |
Non-blocking I/O | ✅ Built-in | ⚠️ Requires setup | ✅ With Queue | ❌ Blocking |
Process Isolation | ✅ Complete | ✅ Complete | ✅ Complete | ✅ Complete |
API Simplicity | ✅ Simple | ⚠️ Complex | ⚠️ Moderate | ✅ Simple |
Windows Support | ❌ Unix only | ✅ Cross-platform | ✅ Cross-platform | ✅ Cross-platform |
Примечание: Сравнение фокусируется на удобстве использования для двунаправленной связи между процессами.
Документация DualPipe
Исчерпывающая документация по утилите DualPipe, включая установку, примеры использования и справочник по API.
# DualPipe A Python utility for efficient bidirectional communication between processes. ## DualPipe Overview DualPipe provides a simple interface for creating child processes and establishing two-way communication with them. It handles the complexities of pipe creation, process forking, and non-blocking I/O to prevent deadlocks. ## DualPipe Key Features - **Bidirectional Communication**: Seamless two-way communication between parent and child processes - **Non-blocking I/O**: Prevents deadlocks and ensures smooth data flow - **Process Isolation**: Maintains proper isolation while providing convenient IPC - **Error Handling**: Gracefully manages exceptions and process termination ## DualPipe Installation ```bash # Clone the repository git clone https://github.com/deepseek-ai/dualpipe.git cd dualpipe # Install the package pip install -e . ``` ## DualPipe Usage Example ```python from dualpipe import DualPipe import time def echo_uppercase(): """Simple function that reads lines and echoes them in uppercase""" while True: line = input() if not line: break print(line.upper()) # Create a DualPipe with the echo_uppercase function pipe = DualPipe(echo_uppercase) # Send some data pipe.write("Hello, world!\n") pipe.write("Testing DualPipe\n") # Read the responses time.sleep(0.1) # Give the child process time to respond print("Response 1:", pipe.readline().decode().strip()) print("Response 2:", pipe.readline().decode().strip()) # Clean up pipe.close() ``` ## How It Works DualPipe uses Unix pipes and process forking to create a child process and establish communication channels: 1. Two pipes are created: one for parent-to-child communication and one for child-to-parent 2. The process is forked, creating a child process 3. In the child process, stdin/stdout are redirected to the appropriate pipe ends 4. In the parent process, non-blocking I/O is set up for the pipes 5. The parent can then write to and read from the child process ## API Reference ### DualPipe(target_func, *args, **kwargs) Creates a new DualPipe instance with a target function to run in the child process. #### Methods - **write(data)**: Write data to the child process - **read(size=1024, timeout=None)**: Read data from the child process - **readline(timeout=None)**: Read a line from the child process - **close()**: Close the pipes and wait for the child process to terminate ## Limitations - Only available on Unix-like systems (Linux, macOS, etc.) due to the use of os.fork() - For Windows compatibility, consider using the multiprocessing or subprocess modules ## License MIT
Реализация DualPipe
Полная реализация DualPipe с подробными комментариями, объясняющими каждый компонент:
import os
import sys
import select
import fcntl
import errno
import time
from typing import Callable, List, Optional, Tuple, Union
class DualPipe:
"""
DualPipe: A utility for bidirectional communication with a child process.
This class creates a child process and establishes two-way communication
with it using pipes. It handles non-blocking I/O to prevent deadlocks.
"""
def __init__(self, target_func: Callable, *args, **kwargs):
"""
Initialize a DualPipe with a target function to run in the child process.
Args:
target_func: The function to execute in the child process
*args, **kwargs: Arguments to pass to the target function
"""
# Create pipes for parent-to-child and child-to-parent communication
parent_to_child_r, parent_to_child_w = os.pipe()
child_to_parent_r, child_to_parent_w = os.pipe()
# Fork the process
pid = os.fork()
if pid == 0: # Child process
# Close unused pipe ends
os.close(parent_to_child_w)
os.close(child_to_parent_r)
# Redirect stdin/stdout to the pipes
os.dup2(parent_to_child_r, sys.stdin.fileno())
os.close(parent_to_child_r)
os.dup2(child_to_parent_w, sys.stdout.fileno())
os.close(child_to_parent_w)
# Execute the target function
try:
target_func(*args, **kwargs)
except Exception as e:
print(f"Error in child process: {e}", file=sys.stderr)
finally:
# Ensure clean exit
os._exit(0)
else: # Parent process
# Close unused pipe ends
os.close(parent_to_child_r)
os.close(child_to_parent_w)
# Set non-blocking mode for the pipes
for fd in (parent_to_child_w, child_to_parent_r):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
# Store the pipe file descriptors and child process ID
self.parent_to_child_w = parent_to_child_w
self.child_to_parent_r = child_to_parent_r
self.child_pid = pid
self.buffer = b""
def write(self, data: Union[str, bytes]) -> int:
"""
Write data to the child process.
Args:
data: The data to write (string or bytes)
Returns:
Number of bytes written
"""
if isinstance(data, str):
data = data.encode('utf-8')
try:
return os.write(self.parent_to_child_w, data)
except OSError as e:
if e.errno == errno.EAGAIN:
# Would block, try again later
return 0
raise
def read(self, size: int = 1024, timeout: Optional[float] = None) -> bytes:
"""
Read data from the child process.
Args:
size: Maximum number of bytes to read
timeout: Maximum time to wait for data (None = non-blocking)
Returns:
Bytes read from the child process
"""
if timeout is not None:
# Wait for data to be available
r, _, _ = select.select([self.child_to_parent_r], [], [], timeout)
if not r:
return b"" # Timeout occurred
try:
data = os.read(self.child_to_parent_r, size)
self.buffer += data
return data
except OSError as e:
if e.errno == errno.EAGAIN:
# Would block, no data available
return b""
raise
def readline(self, timeout: Optional[float] = None) -> bytes:
"""
Read a line from the child process.
Args:
timeout: Maximum time to wait for a complete line
Returns:
A line of data (including newline character)
"""
start_time = time.time()
while b'\n' not in self.buffer:
if timeout is not None:
elapsed = time.time() - start_time
if elapsed >= timeout:
break
remaining = timeout - elapsed
else:
remaining = None
data = self.read(1024, remaining)
if not data:
break
# Extract a line from the buffer if available
if b'\n' in self.buffer:
line, self.buffer = self.buffer.split(b'\n', 1)
return line + b'\n'
# Return partial line if no newline found
result = self.buffer
self.buffer = b""
return result
def close(self) -> Tuple[int, int]:
"""
Close the pipes and wait for the child process to terminate.
Returns:
Tuple of (pid, status) from os.waitpid
"""
os.close(self.parent_to_child_w)
os.close(self.child_to_parent_r)
return os.waitpid(self.child_pid, 0)
# Example usage
if __name__ == "__main__":
def echo_uppercase():
"""Simple function that reads lines and echoes them in uppercase"""
while True:
line = sys.stdin.readline()
if not line:
break
sys.stdout.write(line.upper())
sys.stdout.flush()
# Create a DualPipe with the echo_uppercase function
pipe = DualPipe(echo_uppercase)
# Send some data
pipe.write("Hello, world!\n")
pipe.write("Testing DualPipe\n")
# Read the responses
time.sleep(0.1) # Give the child process time to respond
print("Response 1:", pipe.readline().decode().strip())
print("Response 2:", pipe.readline().decode().strip())
# Clean up
pipe.close()
Эта реализация использует os.fork(), который доступен только в Unix-подобных системах (Linux, macOS и т.д.).
Часто задаваемые вопросы - Dualpipe
- Для чего используется DualPipe?
- DualPipe используется для установления двунаправленной связи между родительским и дочерним процессами в Python. Он особенно полезен для задач параллельной обработки, где вам нужно отправлять команды в подпроцесс и получать результаты обратно, сохраняя при этом изоляцию процессов.
- Как DualPipe предотвращает взаимоблокировки?
- DualPipe использует неблокирующие операции ввода-вывода, устанавливая флаг O_NONBLOCK на файловые дескрипторы каналов. Это гарантирует, что операции чтения и записи не будут блокироваться на неопределенное время, предотвращая потенциальные взаимоблокировки, когда оба процесса ждут друг друга.
- Можно ли использовать DualPipe в Windows?
- Показанная здесь реализация использует os.fork(), который доступен только в Unix-подобных системах (Linux, macOS и т.д.). Для совместимости с Windows вам потребуется изменить реализацию, чтобы использовать модуль multiprocessing или модуль subprocess.
- В чем разница между DualPipe и модулем subprocess в Python?
- Хотя модуль subprocess в Python также позволяет взаимодействовать с дочерними процессами, DualPipe предоставляет более упрощенный интерфейс, специально разработанный для двунаправленной связи с неблокирующим вводом-выводом. Он особенно полезен, когда вам нужно непрерывно обмениваться данными между процессами.