Process-Safe Port Manager Design¶
Author: LittlebullGit
Date: October 2024
Status: Design Document
Component: lightning.fabric.utilities.port_manager
Executive Summary¶
This document describes the design and implementation of a process-safe port allocation manager for PyTorch Lightning. The port manager prevents EADDRINUSE errors in distributed training tests by coordinating port allocation across multiple concurrent processes using file-based locking.
Problem Statement¶
Current Limitations¶
The original PortManager implementation is thread-safe but not process-safe:
- Thread-safe only: Uses threading.Lock(), which only protects within a single Python process
- In-memory state: Port allocations are stored in process-local memory (set[int])
- Global singleton per process: Each process has its own instance with no inter-process communication 
- Race conditions in CI: When GPU tests run in batches (e.g., 5 concurrent pytest workers), multiple processes may allocate the same port 
Failure Scenario¶
Process A (pytest-xdist worker 0):
  - Allocates port 12345
  - Stores in local memory
Process B (pytest-xdist worker 1):
  - Unaware of Process A's allocation
  - Allocates same port 12345
  - Stores in local memory
Both processes attempt to bind → EADDRINUSE error
Requirements¶
- Process-safe: Coordinate port allocation across multiple concurrent processes 
- Platform-neutral: Support Linux, macOS, and Windows 
- Backward compatible: Existing API must continue to work unchanged 
- Test-focused: Optimized for test suite usage (up to 1-hour ML training tests) 
- Performance: Minimal overhead (<10ms per allocation) 
- Robust cleanup: Handle process crashes, stale locks, and orphaned ports 
- Configurable: Support isolated test runs via environment variables 
Architecture Overview¶
Components¶
┌─────────────────────────────────────────────────────────┐
│                     PortManager                          │
│  - Public API (allocate_port, release_port)             │
│  - Context manager support (__enter__, __exit__)        │
│  - In-memory cache for performance                      │
└─────────────────────┬───────────────────────────────────┘
                      │
        ┌─────────────┴──────────────┐
        │                            │
┌───────▼─────────┐         ┌────────▼────────┐
│   File Lock     │         │   State Store   │
│  (Platform      │         │   (JSON file)   │
│   specific)     │         └─────────────────┘
└─────────────────┘
        │
        ├─ UnixFileLock (fcntl.flock)
        └─ WindowsFileLock (msvcrt.locking)
File-Based Coordination¶
Lock File: lightning_port_manager.lock
- Platform-specific file locking mechanism 
- Ensures atomic read-modify-write operations 
- 30-second acquisition timeout with deadlock detection 
State File: lightning_port_manager_state.json
- JSON-formatted shared state 
- Atomic writes (temp file + rename) 
- PID-based port ownership tracking 
Default Location: System temp directory (from tempfile.gettempdir())
Override: Set LIGHTNING_PORT_LOCK_DIR environment variable
Detailed Design¶
1. Platform Abstraction Layer¶
FileLock Interface¶
from abc import ABC, abstractmethod

class FileLock(ABC):
    """Abstract base class for platform-specific file locking."""
    @abstractmethod
    def acquire(self, timeout: float = 30.0) -> bool:
        """Acquire the lock, blocking up to timeout seconds.
        Args:
            timeout: Maximum seconds to wait for lock
        Returns:
            True if lock acquired, False on timeout
        """
    @abstractmethod
    def release(self) -> None:
        """Release the lock."""
    def __enter__(self):
        if not self.acquire():
            raise TimeoutError("Failed to acquire lock")
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()
        return False
Unix Implementation (fcntl)¶
class UnixFileLock(FileLock):
    """File locking using fcntl.flock (Linux, macOS)."""
    def acquire(self, timeout: float = 30.0) -> bool:
        import fcntl
        import time
        start = time.time()
        while time.time() - start < timeout:
            try:
                fcntl.flock(self._fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
                return True
            except (OSError, IOError):
                time.sleep(0.1)
        return False
Windows Implementation (msvcrt)¶
class WindowsFileLock(FileLock):
    """File locking using msvcrt.locking (Windows)."""
    def acquire(self, timeout: float = 30.0) -> bool:
        import msvcrt
        import time
        start = time.time()
        while time.time() - start < timeout:
            try:
                msvcrt.locking(self._fd, msvcrt.LK_NBLCK, 1)
                return True
            except OSError:
                time.sleep(0.1)
        return False
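The excerpts above omit how the lock-file descriptor is opened, how the lock is released, and how the platform-specific class is selected. A minimal sketch of those omitted pieces, building on the FileLock classes above (the _create_file_lock helper is illustrative, not existing API):
import os
import sys
from pathlib import Path

class UnixFileLock(FileLock):
    """Sketch of the pieces omitted above: opening and releasing the lock file."""
    def __init__(self, lock_file: Path) -> None:
        self._lock_file = lock_file
        # Keep the descriptor open for the lifetime of the lock; flock/locking operate on it
        self._fd = os.open(str(lock_file), os.O_RDWR | os.O_CREAT, 0o644)
    def release(self) -> None:
        import fcntl
        fcntl.flock(self._fd, fcntl.LOCK_UN)

class WindowsFileLock(FileLock):
    """Windows counterpart; unlocks the same single byte locked in acquire()."""
    def __init__(self, lock_file: Path) -> None:
        self._lock_file = lock_file
        self._fd = os.open(str(lock_file), os.O_RDWR | os.O_CREAT, 0o644)
    def release(self) -> None:
        import msvcrt
        msvcrt.locking(self._fd, msvcrt.LK_UNLCK, 1)

def _create_file_lock(lock_file: Path) -> FileLock:
    """Pick the implementation for the current platform (hypothetical factory)."""
    return WindowsFileLock(lock_file) if sys.platform == "win32" else UnixFileLock(lock_file)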
2. State Management¶
State Schema¶
{
  "version": "1.0",
  "allocated_ports": {
    "12345": {
      "pid": 54321,
      "allocated_at": 1729774800.123,
      "process_name": "pytest-xdist-worker-0"
    },
    "12346": {
      "pid": 54322,
      "allocated_at": 1729774801.456,
      "process_name": "pytest-xdist-worker-1"
    }
  },
  "recently_released": [
    {
      "port": 12340,
      "released_at": 1729774700.789,
      "pid": 54320
    }
  ]
}
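PortState is used throughout this document but not defined; below is a minimal sketch implied by the schema above. Field and method names mirror how the class is used in the surrounding snippets and are assumptions, not a finalized API (cleanup_stale_entries is sketched in the Cleanup Strategy section below):
import time
from dataclasses import dataclass, field
from typing import Any

@dataclass
class PortState:
    """In-memory mirror of the JSON state file (sketch)."""
    version: str = "1.0"
    # Keyed by port number; each entry records pid, allocated_at, process_name
    allocated_ports: dict[int, dict[str, Any]] = field(default_factory=dict)
    recently_released: list[dict[str, Any]] = field(default_factory=list)
    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "PortState":
        return cls(
            version=data.get("version", "1.0"),
            # JSON object keys are strings; convert back to integers
            allocated_ports={int(p): info for p, info in data.get("allocated_ports", {}).items()},
            recently_released=data.get("recently_released", []),
        )
    def to_dict(self) -> dict[str, Any]:
        return {
            "version": self.version,
            "allocated_ports": {str(p): info for p, info in self.allocated_ports.items()},
            "recently_released": self.recently_released,
        }
    def allocate_port(self, port: int, pid: int, process_name: str = "") -> None:
        self.allocated_ports[port] = {"pid": pid, "allocated_at": time.time(), "process_name": process_name}
    def release_port(self, port: int) -> None:
        info = self.allocated_ports.pop(port, None)
        if info is not None:
            self.recently_released.append({"port": port, "released_at": time.time(), "pid": info["pid"]})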
State Operations¶
Design Pattern: Low-Level Primitives
Both _read_state() and _write_state() are low-level primitives that do not manage locking. The caller (high-level operations like allocate_port(), release_port()) is responsible for holding the file lock during the entire read-modify-write cycle. This ensures:
- Atomicity: Lock held across entire operation 
- Symmetry: Both primitives follow same pattern 
- Clarity: Clear separation between low-level and high-level operations 
Read State (Low-Level):
def _read_state(self) -> PortState:
    """Read state from file, cleaning stale entries.
    Low-level primitive - does NOT acquire lock.
    IMPORTANT: Caller must hold self._file_lock before calling.
    """
    if not self._state_file.exists():
        return PortState()  # Empty state
    try:
        with open(self._state_file, 'r') as f:
            data = json.load(f)
        state = PortState.from_dict(data)
        state.cleanup_stale_entries()  # Remove dead PIDs
        return state
    except (json.JSONDecodeError, OSError):
        # Corrupted state, start fresh
        log.warning("Corrupted state file detected, starting with clean state")
        return PortState()
Write State (Low-Level):
def _write_state(self, state: PortState) -> None:
    """Atomically write state to file.
    Low-level primitive - does NOT acquire lock.
    IMPORTANT: Caller must hold self._file_lock before calling.
    Uses atomic write pattern: write to temp file, then rename.
    """
    temp_file = self._state_file.with_suffix('.tmp')
    try:
        with open(temp_file, 'w') as f:
            json.dump(state.to_dict(), f, indent=2)
        # Atomic rename (platform-safe)
        temp_file.replace(self._state_file)
    finally:
        # Clean up temp file if it still exists
        temp_file.unlink(missing_ok=True)
Runtime Safety Checks (Optional):
To prevent misuse, we can add runtime assertions:
def _read_state(self) -> PortState:
    """Read state from file."""
    if not self._file_lock.is_locked():
        raise RuntimeError("_read_state called without holding lock")
    # ... rest of implementation
def _write_state(self, state: PortState) -> None:
    """Write state to file."""
    if not self._file_lock.is_locked():
        raise RuntimeError("_write_state called without holding lock")
    # ... rest of implementation
High-Level Operations (Manage Locking):
High-level public methods manage the file lock for entire operations:
def release_port(self, port: int) -> None:
    """Release a port (high-level operation).
    Manages locking internally - calls low-level primitives.
    """
    with self._file_lock:  # <-- Acquire lock
        state = self._read_state()       # Low-level read
        state.release_port(port)          # Modify state
        self._write_state(state)          # Low-level write
    # <-- Release lock
    # Update in-memory cache (outside lock)
    if port in self._allocated_ports:
        self._allocated_ports.remove(port)
        self._recently_released.append(port)
Pattern Summary:
Low-Level (_read_state, _write_state):
  - Do NOT acquire lock
  - Assume lock is held
  - Private methods (underscore prefix)
  - Called only by high-level operations
High-Level (allocate_port, release_port, cleanup_stale_entries):
  - Acquire lock using `with self._file_lock:`
  - Call low-level primitives inside critical section
  - Public API methods
  - Hold lock for entire read-modify-write cycle
3. Port Allocation Algorithm¶
def allocate_port(self, preferred_port: Optional[int] = None,
                  max_attempts: int = 1000) -> int:
    """Allocate a free port with process-safe coordination.
    Algorithm:
    1. Acquire file lock
    2. Read current state from file
    3. Clean up stale entries (dead PIDs, old timestamps)
    4. Check if preferred port is available
    5. Otherwise, find free port via OS
    6. Verify port not in allocated or recently_released
    7. Add to allocated_ports with current PID
    8. Write updated state to file
    9. Release file lock
    10. Update in-memory cache
    """
    with self._file_lock:
        state = self._read_state()
        # Try preferred port
        if preferred_port and self._is_port_available(preferred_port, state):
            port = preferred_port
        else:
            # Find free port
            for _ in range(max_attempts):
                port = self._find_free_port()
                if self._is_port_available(port, state):
                    break
            else:
                raise RuntimeError(f"Failed to allocate port after {max_attempts} attempts")
        # Allocate in state
        state.allocate_port(port, pid=os.getpid())
        self._write_state(state)
        # Update in-memory cache
        self._allocated_ports.add(port)
        return port
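The helpers _find_free_port() and _is_port_available() are referenced above but not shown; a plausible sketch as PortManager methods follows (the 60-second recently-released window is an assumption, not a decided value):
import socket
import time

def _find_free_port(self) -> int:
    """Ask the OS for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("127.0.0.1", 0))
        return sock.getsockname()[1]

def _is_port_available(self, port: int, state: "PortState") -> bool:
    """Available means: not currently allocated and not released too recently."""
    if port in state.allocated_ports:
        return False
    cutoff = time.time() - 60  # skip very recent releases to avoid TIME_WAIT collisions (illustrative window)
    return not any(
        entry["port"] == port and entry["released_at"] > cutoff
        for entry in state.recently_released
    )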
4. Cleanup Strategy¶
Three-Tier Cleanup¶
1. Normal Cleanup (atexit)
def release_all(self) -> None:
    """Release all ports allocated by this process."""
    with self._file_lock:
        state = self._read_state()
        current_pid = os.getpid()
        # Release ports owned by this PID
        ports_to_release = [
            port for port, info in state.allocated_ports.items()
            if info['pid'] == current_pid
        ]
        for port in ports_to_release:
            state.release_port(port)
        self._write_state(state)
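The atexit registration itself is not shown in this document; presumably it happens when the per-process singleton is first created, along these lines (sketch):
import atexit
from typing import Optional

_manager: Optional[PortManager] = None

def get_port_manager() -> PortManager:
    """Return the process-local singleton, registering cleanup on first use (sketch)."""
    global _manager
    if _manager is None:
        _manager = PortManager()
        # Release this process's ports even if a test never calls release_port()
        atexit.register(_manager.release_all)
    return _manager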
2. Stale Entry Cleanup
def cleanup_stale_entries(self) -> int:
    """Remove ports from dead processes."""
    with self._file_lock:
        state = self._read_state()
        stale_count = 0
        for port, info in list(state.allocated_ports.items()):
            if not self._is_pid_alive(info['pid']):
                state.release_port(port)
                stale_count += 1
        # Remove old recently_released entries (>2 hours)
        cutoff = time.time() - 7200  # 2 hours
        state.recently_released = [
            entry for entry in state.recently_released
            if entry['released_at'] > cutoff
        ]
        self._write_state(state)
        return stale_count
3. Time-Based Cleanup
- Ports allocated >2 hours ago are considered stale 
- Automatically cleaned on next allocation 
- Prevents leaked ports from hung tests 
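A sketch of how the time-based rule above could be folded into PortState.cleanup_stale_entries() alongside the dead-PID check (the 2-hour threshold comes from the bullets above; the helper placement is an assumption):
import os
import time

PORT_ALLOCATION_MAX_AGE = 2 * 60 * 60  # seconds; matches the 2-hour staleness rule

def cleanup_stale_entries(self) -> None:
    """PortState method sketch: drop allocations whose owner died or that exceeded the maximum age."""
    now = time.time()
    for port, info in list(self.allocated_ports.items()):
        too_old = now - info["allocated_at"] > PORT_ALLOCATION_MAX_AGE
        if too_old or not _is_pid_alive(info["pid"]):
            self.release_port(port)

def _is_pid_alive(pid: int) -> bool:
    """Signal-0 existence check (see Dead PID Detection below)."""
    try:
        os.kill(pid, 0)
        return True
    except PermissionError:
        return True  # process exists but is owned by another user
    except OSError:
        return False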
5. Context Manager Support¶
class PortManager:
    def __enter__(self):
        """Enter context manager."""
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager - cleanup ports from this process."""
        self.release_all()
        return False  # Don't suppress exceptions
Usage Patterns:
# Pattern 1: Explicit management (backward compatible)
manager = get_port_manager()
port = manager.allocate_port()
try:
    ...  # use port
finally:
    manager.release_port(port)
# Pattern 2: Single port context manager (existing)
with get_port_manager().allocated_port() as port:
    ...  # use port
# Pattern 3: Manager-level context manager (NEW)
with get_port_manager() as manager:
    port1 = manager.allocate_port()
    port2 = manager.allocate_port()
    ...  # use ports
# Automatic cleanup
6. Configuration¶
Environment Variables¶
LIGHTNING_PORT_LOCK_DIR
- Override default lock file location 
- Default: tempfile.gettempdir()
- Use case: Isolate parallel CI jobs 
# Example: Parallel CI jobs on same machine
export LIGHTNING_PORT_LOCK_DIR=/tmp/lightning_ci_job_1
pytest tests/
# Job 2
export LIGHTNING_PORT_LOCK_DIR=/tmp/lightning_ci_job_2
pytest tests/
File Paths¶
def _get_lock_dir() -> Path:
    """Get directory for lock files, creating if needed."""
    lock_dir = os.getenv("LIGHTNING_PORT_LOCK_DIR", tempfile.gettempdir())
    lock_path = Path(lock_dir)
    lock_path.mkdir(parents=True, exist_ok=True)
    return lock_path
def _get_lock_file() -> Path:
    return _get_lock_dir() / "lightning_port_manager.lock"
def _get_state_file() -> Path:
    return _get_lock_dir() / "lightning_port_manager_state.json"
7. Pytest Integration¶
Session Hooks¶
# In tests/tests_fabric/conftest.py and tests/tests_pytorch/conftest.py
def pytest_sessionstart(session):
    """Clean stale port state at session start."""
    from lightning.fabric.utilities.port_manager import get_port_manager
    manager = get_port_manager()
    stale_count = manager.cleanup_stale_entries()
    if stale_count > 0:
        print(f"Cleaned up {stale_count} stale port(s) from previous runs")
def pytest_sessionfinish(session, exitstatus):
    """Final cleanup at session end."""
    from lightning.fabric.utilities.port_manager import get_port_manager
    manager = get_port_manager()
    manager.cleanup_stale_entries()
Test-Level Cleanup (Enhanced)¶
Existing retry logic in pytest_runtest_makereport is enhanced to:
- Release ports before retry 
- Clean up stale entries 
- Wait for OS TIME_WAIT state 
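A hedged sketch of what the enhanced hook might look like; the detection string and sleep duration are illustrative, and the real retry logic already lives in the conftest files:
import time
import pytest

@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_makereport(item, call):
    outcome = yield
    report = outcome.get_result()
    # On a failed call phase that looks like a port collision, release this process's ports,
    # purge stale entries, and give the OS a moment to drain TIME_WAIT before the retry
    if report.when == "call" and report.failed and "EADDRINUSE" in str(report.longrepr):
        from lightning.fabric.utilities.port_manager import get_port_manager
        manager = get_port_manager()
        manager.release_all()
        manager.cleanup_stale_entries()
        time.sleep(1.0)  # crude TIME_WAIT wait; duration is illustrative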
Performance Considerations¶
Optimization Strategies¶
1. In-Memory Cache
- Keep process-local cache of allocated ports 
- Only consult file state on allocation/release 
- Reduces file I/O by ~90% 
2. Lazy Cleanup
- Stale entry cleanup on allocation, not on every read 
- Batch cleanup operations 
- Amortize cleanup cost 
3. Lock Minimization
- Hold file lock only during critical section 
- Release immediately after state write 
- Typical lock hold time: <5ms 
4. Non-Blocking Fast Path
- Try non-blocking lock first 
- Fall back to blocking with timeout 
- Reduces contention in common case 
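A sketch of the fast-path acquisition described above, assuming a non-blocking try helper on the platform lock (_try_acquire_nonblocking is a hypothetical name wrapping LOCK_EX | LOCK_NB or LK_NBLCK):
import time

def acquire(self, timeout: float = 30.0) -> bool:
    """Fast path: one non-blocking attempt; slow path: poll with backoff until the deadline."""
    if self._try_acquire_nonblocking():  # hypothetical helper
        return True
    deadline = time.monotonic() + timeout
    wait = 0.01
    while time.monotonic() < deadline:
        if self._try_acquire_nonblocking():
            return True
        time.sleep(wait)
        wait = min(wait * 2, 0.1)  # back off to reduce contention
    return False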
Performance Targets¶
| Operation | Target | Notes | 
|---|---|---|
| Port allocation | <10ms | Including file lock + I/O | 
| Port release | <5ms | Simple state update | 
| Stale cleanup | <50ms | May scan 100+ entries | 
| Lock contention | <1% | Processes rarely overlap | 
Error Handling¶
Lock Acquisition Failure¶
try:
    with self._file_lock:
        ...  # allocate port
except TimeoutError as e:
    # Fail fast to prevent state divergence
    log.error("Failed to acquire file lock for port allocation")
    raise RuntimeError(
        "Unable to acquire file lock for port allocation. "
        "This prevents process-safe coordination. "
        "Check if another process is holding the lock or if the lock file is inaccessible."
    ) from e
Rationale: We fail fast on lock timeout instead of falling back to OS allocation. Fallback would bypass the shared state, allowing multiple processes to allocate the same port, defeating the purpose of process-safe coordination. By raising an error, we force the caller to handle the exceptional case explicitly rather than silently accepting a race condition.
Corrupted State File¶
try:
    state = json.load(f)
except json.JSONDecodeError:
    log.warning("Corrupted state file, starting fresh")
    return PortState()  # Empty state
Dead PID Detection¶
def _is_pid_alive(self, pid: int) -> bool:
    """Check if process is still running."""
    try:
        os.kill(pid, 0)  # Signal 0 = existence check, does not deliver a signal
        return True
    except PermissionError:
        return True  # Process exists but is owned by another user
    except OSError:  # includes ProcessLookupError
        return False
Security Considerations¶
File Permissions¶
- Lock and state files created with default umask 
- No sensitive data stored (only port numbers and PIDs) 
- Consider restrictive permissions in multi-user environments 
Race Conditions¶
- Time-of-check-to-time-of-use: Mitigated by holding lock during entire allocation 
- Stale lock detection: Verify PID before breaking lock 
- Atomic writes: Use temp file + rename pattern 
Testing Strategy¶
Unit Tests¶
- File locking: Test acquire/release on each platform 
- State serialization: JSON encode/decode 
- PID validation: Alive/dead detection 
- Stale cleanup: Remove dead process ports 
- Context manager: Enter/exit behavior 
Integration Tests¶
- Multi-process allocation: Spawn 5+ processes, verify unique ports (see the sketch after this list) 
- Process crash recovery: Kill process mid-allocation, verify cleanup 
- Lock timeout: Simulate deadlock, verify recovery 
- Stress test: 1000+ allocations across processes 
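A sketch of the multi-process allocation test, using multiprocessing in place of real pytest-xdist workers (test and helper names are illustrative):
import multiprocessing as mp

def _allocate_one(queue) -> None:
    from lightning.fabric.utilities.port_manager import get_port_manager
    queue.put(get_port_manager().allocate_port())

def test_multiprocess_allocation_is_unique():
    """Five concurrent processes must receive five distinct ports."""
    ctx = mp.get_context("spawn")
    queue = ctx.Queue()
    workers = [ctx.Process(target=_allocate_one, args=(queue,)) for _ in range(5)]
    for w in workers:
        w.start()
    for w in workers:
        w.join(timeout=30)
    ports = [queue.get(timeout=5) for _ in range(5)]
    assert len(set(ports)) == 5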
Platform-Specific Tests¶
- Run full suite on Linux, macOS, Windows 
- Verify file locking behavior on each platform 
- Test in CI with pytest-xdist (-n 5)
Rollback Plan¶
If critical issues arise:
- Revert the commit that introduced process-safe port manager (all changes are self-contained in the new files) 
- Remove any leftover lock/state files from the temp directory: rm -f /tmp/lightning_port_manager* (or from the custom LIGHTNING_PORT_LOCK_DIR location)
- The implementation maintains full backward compatibility; all existing tests pass without modification 
Note: State files are self-contained JSON files with no schema migrations required. Stale entries will be automatically cleaned up on next session start.
Monitoring and Metrics¶
Logging Events¶
DEBUG Level:
- Port allocation/release 
- Lock acquisition 
WARNING Level:
- Lock contention (wait >1s) 
- Stale lock detection 
- Corrupted state recovery 
- High queue utilization (>80%) 
ERROR Level:
- Lock timeout 
- File I/O failures 
- Allocation failures after max retries 
Example Log Output¶
DEBUG: PortManager initialized with lock_dir=/tmp, pid=12345
DEBUG: Allocated port 12345 for pid=12345 in 3.2ms
WARNING: Lock contention detected, waited 1.5s for acquisition
WARNING: Cleaned up 3 stale ports from dead processes
ERROR: Failed to allocate port after 1000 attempts (allocated=50, queue=1020/1024)
Future Enhancements¶
Possible Improvements¶
- Port pool pre-allocation: Reserve block of ports upfront 
- Distributed coordination: Support multi-machine coordination (Redis/etcd) 
- Port affinity: Prefer certain port ranges per process 
- Metrics collection: Track allocation patterns, contention rates 
- Web UI: Visualize port allocation state (debug tool) 
Not Planned¶
- Cross-network coordination (out of scope) 
- Port forwarding/tunneling (different concern) 
- Permanent port reservations (tests only) 
Appendix¶
A. File Format Examples¶
Empty State:
{
  "version": "1.0",
  "allocated_ports": {},
  "recently_released": []
}
Active Allocations:
{
  "version": "1.0",
  "allocated_ports": {
    "12345": {
      "pid": 54321,
      "allocated_at": 1729774800.123,
      "process_name": "pytest-xdist-worker-0"
    }
  },
  "recently_released": [
    {
      "port": 12340,
      "released_at": 1729774700.789,
      "pid": 54320
    }
  ]
}
B. Platform Compatibility¶
| Platform | Lock Mechanism | Tested Versions | 
|---|---|---|
| Linux | fcntl.flock | Ubuntu 20.04, 22.04 | 
| macOS | fcntl.flock | macOS 13, 14 | 
| Windows | msvcrt.locking | Windows Server 2022 | 
C. FAQ¶
Q: Why file-based instead of shared memory?
A: File-based is more portable, survives process crashes better, and works well with pytest-xdist's process model.
Q: What happens if the state file is deleted mid-run?
A: Next allocation will create a fresh state file. Some ports may be double-allocated until processes resync, but retry logic will recover.
Q: How do I debug port allocation issues?
A: Check {tempdir}/lightning_port_manager_state.json for current allocations and use LIGHTNING_PORT_LOCK_DIR for isolated debugging.
Q: Does this work with Docker/containers?
A: Yes, as long as containers share the same filesystem (via volume mount) and use the same LIGHTNING_PORT_LOCK_DIR.
Q: Why don’t _read_state() and _write_state() acquire locks themselves?
A: This is a deliberate design choice for consistency and correctness:
- Atomicity: The lock must be held across the entire read-modify-write cycle to prevent race conditions 
- Symmetry: Both low-level primitives follow the same pattern (no locking), making the code easier to understand 
- Clarity: High-level operations (public API) manage locking, low-level primitives (private) assume lock is held 
- Flexibility: Allows high-level operations to hold lock across multiple read/write operations efficiently 
If each primitive acquired its own lock, there would be a race condition between reading state and writing it back, allowing two processes to allocate the same port.
Document Version: 1.0
Last Updated: October 2024
Maintainer: LittlebullGit