vllm.v1.worker.gpu.buffer_utils
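
This module provides utilities for moving small, frequently updated metadata between the host and the GPU. UvaBuffer pairs a pinned CPU tensor with a CUDA view of the same memory (unified virtual addressing); UvaBufferPool round-robins over several such buffers so that back-to-back transfers do not clobber one another; UvaBackedTensor keeps a CPU-side source of truth that is published through a pool; and StagedWriteTensor batches many small row writes into a single Triton kernel launch.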

StagedWriteTensor

Source code in vllm/v1/worker/gpu/buffer_utils.py
class StagedWriteTensor:
    def __init__(
        self,
        size: int | Sequence[int],
        dtype: torch.dtype,
        device: torch.device,
        max_concurrency: int = 2,
        uva_instead_of_gpu: bool = False,
    ):
        supported_dtypes = [torch.int32, torch.int64, torch.float32]
        if dtype not in supported_dtypes:
            raise ValueError(
                f"Unsupported dtype {dtype}: should be one of {supported_dtypes}"
            )
        self.num_rows = size if isinstance(size, int) else size[0]
        self.dtype = dtype
        self.max_concurrency = max_concurrency

        if not uva_instead_of_gpu:
            # Create a GPU tensor (default)
            self.gpu = torch.zeros(size, dtype=dtype, device=device)
        else:
            # For a large but infrequently accessed tensor, we can use UVA
            # (pinned host memory) instead of GPU memory.
            self._uva_buf = UvaBuffer(size, dtype)
            self.gpu = self._uva_buf.uva

        self._staged_write_indices: list[int] = []
        self._staged_write_starts: list[int] = []
        self._staged_write_contents: list[int | float] = []
        self._staged_write_cu_lens: list[int] = []

        self.write_indices = UvaBufferPool(
            self.num_rows, dtype=torch.int32, max_concurrency=max_concurrency
        )
        self.write_starts = UvaBufferPool(
            self.num_rows, dtype=torch.int32, max_concurrency=max_concurrency
        )
        init_size = next_power_of_2(self.num_rows)
        self.write_contents = UvaBufferPool(
            init_size, dtype=dtype, max_concurrency=max_concurrency
        )
        self.write_cu_lens = UvaBufferPool(
            self.num_rows, dtype=torch.int32, max_concurrency=max_concurrency
        )

    def stage_write(
        self,
        index: int,
        start: int,
        x: Iterable[int] | Iterable[float],
    ) -> None:
        assert index >= 0
        assert start >= 0
        if not x:
            return
        self._staged_write_indices.append(index)
        self._staged_write_starts.append(start)
        self._staged_write_contents.extend(x)
        self._staged_write_cu_lens.append(len(self._staged_write_contents))

    def stage_write_elem(self, index: int, x: int) -> None:
        assert index >= 0
        self._staged_write_indices.append(index)
        self._staged_write_starts.append(0)
        self._staged_write_contents.append(x)
        self._staged_write_cu_lens.append(len(self._staged_write_contents))

    def apply_write(self) -> None:
        n = len(self._staged_write_indices)
        if n == 0:
            return

        indices_uva = self.write_indices.copy_to_uva(self._staged_write_indices)
        starts_uva = self.write_starts.copy_to_uva(self._staged_write_starts)
        cu_lens_uva = self.write_cu_lens.copy_to_uva(self._staged_write_cu_lens)

        # Special handling for write_contents
        diff_len = len(self._staged_write_contents)
        assert isinstance(self.write_contents.size, int)
        if diff_len > self.write_contents.size:
            # Re-allocate a larger buffer for the write_contents
            new_size = next_power_of_2(diff_len)
            self.write_contents = UvaBufferPool(
                new_size, dtype=self.dtype, max_concurrency=self.max_concurrency
            )
            # NOTE(woosuk): Since the previous write_contents buffer is released,
            # we perform a synchronization here to ensure that all data transfers
            # involving the old buffer have finished before allocating a new one.
            # This prevents potential race conditions. The slight overhead is
            # negligible because the reallocations are infrequent in practice.
            torch.cuda.synchronize()
        contents_uva = self.write_contents.copy_to_uva(self._staged_write_contents)

        # Write diffs to the GPU buffer
        _apply_write_kernel[(n,)](
            self.gpu,
            self.gpu.stride(0),
            indices_uva,
            starts_uva,
            contents_uva,
            cu_lens_uva,
            BLOCK_SIZE=1024,
        )
        # Clear the staged writes
        self.clear_staged_writes()

    def clear_staged_writes(self) -> None:
        self._staged_write_indices.clear()
        self._staged_write_starts.clear()
        self._staged_write_contents.clear()
        self._staged_write_cu_lens.clear()
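
A minimal usage sketch (hypothetical shape, values, and variable names; assumes a CUDA device, and UVA availability if uva_instead_of_gpu=True is used):

import torch

# Hypothetical example: a (64, 128) int32 table, e.g. a per-request block table.
table = StagedWriteTensor(
    size=(64, 128), dtype=torch.int32, device=torch.device("cuda")
)

# Stage writes on the CPU side; the GPU tensor is untouched until apply_write().
table.stage_write(index=0, start=0, x=[3, 7, 11])  # row 0, columns 0..2
table.stage_write_elem(index=5, x=42)              # row 5, column 0

# One kernel launch scatters all staged writes into table.gpu.
table.apply_write()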

_staged_write_contents instance-attribute

_staged_write_contents: list[int | float] = []

_staged_write_cu_lens instance-attribute

_staged_write_cu_lens: list[int] = []

_staged_write_indices instance-attribute

_staged_write_indices: list[int] = []

_staged_write_starts instance-attribute

_staged_write_starts: list[int] = []

_uva_buf instance-attribute

_uva_buf = UvaBuffer(size, dtype)

dtype instance-attribute

dtype = dtype

gpu instance-attribute

gpu = torch.zeros(size, dtype=dtype, device=device)

max_concurrency instance-attribute

max_concurrency = max_concurrency

num_rows instance-attribute

num_rows = size if isinstance(size, int) else size[0]

write_contents instance-attribute

write_contents = UvaBufferPool(
    init_size, dtype=dtype, max_concurrency=max_concurrency
)

write_cu_lens instance-attribute

write_cu_lens = UvaBufferPool(
    num_rows, dtype=torch.int32, max_concurrency=max_concurrency
)

write_indices instance-attribute

write_indices = UvaBufferPool(
    num_rows, dtype=torch.int32, max_concurrency=max_concurrency
)

write_starts instance-attribute

write_starts = UvaBufferPool(
    num_rows, dtype=torch.int32, max_concurrency=max_concurrency
)

__init__

__init__(
    size: int | Sequence[int],
    dtype: torch.dtype,
    device: torch.device,
    max_concurrency: int = 2,
    uva_instead_of_gpu: bool = False,
)
Source code in vllm/v1/worker/gpu/buffer_utils.py
def __init__(
    self,
    size: int | Sequence[int],
    dtype: torch.dtype,
    device: torch.device,
    max_concurrency: int = 2,
    uva_instead_of_gpu: bool = False,
):
    supported_dtypes = [torch.int32, torch.int64, torch.float32]
    if dtype not in supported_dtypes:
        raise ValueError(
            f"Unsupported dtype {dtype}: should be one of {supported_dtypes}"
        )
    self.num_rows = size if isinstance(size, int) else size[0]
    self.dtype = dtype
    self.max_concurrency = max_concurrency

    if not uva_instead_of_gpu:
        # Create a GPU tensor (default)
        self.gpu = torch.zeros(size, dtype=dtype, device=device)
    else:
        # For a large but infrequently accessed tensor, we can use UVA
        # (pinned host memory) instead of GPU memory.
        self._uva_buf = UvaBuffer(size, dtype)
        self.gpu = self._uva_buf.uva

    self._staged_write_indices: list[int] = []
    self._staged_write_starts: list[int] = []
    self._staged_write_contents: list[int | float] = []
    self._staged_write_cu_lens: list[int] = []

    self.write_indices = UvaBufferPool(
        self.num_rows, dtype=torch.int32, max_concurrency=max_concurrency
    )
    self.write_starts = UvaBufferPool(
        self.num_rows, dtype=torch.int32, max_concurrency=max_concurrency
    )
    init_size = next_power_of_2(self.num_rows)
    self.write_contents = UvaBufferPool(
        init_size, dtype=dtype, max_concurrency=max_concurrency
    )
    self.write_cu_lens = UvaBufferPool(
        self.num_rows, dtype=torch.int32, max_concurrency=max_concurrency
    )
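
Note the sizing policy: write_indices, write_starts, and write_cu_lens each hold at most num_rows entries, which caps the number of writes that can be staged per apply_write() call, while write_contents holds the flattened payloads, starts at next_power_of_2(num_rows), and is regrown on demand in apply_write().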

apply_write

apply_write() -> None
Source code in vllm/v1/worker/gpu/buffer_utils.py
def apply_write(self) -> None:
    n = len(self._staged_write_indices)
    if n == 0:
        return

    indices_uva = self.write_indices.copy_to_uva(self._staged_write_indices)
    starts_uva = self.write_starts.copy_to_uva(self._staged_write_starts)
    cu_lens_uva = self.write_cu_lens.copy_to_uva(self._staged_write_cu_lens)

    # Special handling for write_contents
    diff_len = len(self._staged_write_contents)
    assert isinstance(self.write_contents.size, int)
    if diff_len > self.write_contents.size:
        # Re-allocate a larger buffer for the write_contents
        new_size = next_power_of_2(diff_len)
        self.write_contents = UvaBufferPool(
            new_size, dtype=self.dtype, max_concurrency=self.max_concurrency
        )
        # NOTE(woosuk): Since the previous write_contents buffer is released,
        # we perform a synchronization here to ensure that all data transfers
        # involving the old buffer have finished before allocating a new one.
        # This prevents potential race conditions. The slight overhead is
        # negligible because the reallocations are infrequent in practice.
        torch.cuda.synchronize()
    contents_uva = self.write_contents.copy_to_uva(self._staged_write_contents)

    # Write diffs to the GPU buffer
    _apply_write_kernel[(n,)](
        self.gpu,
        self.gpu.stride(0),
        indices_uva,
        starts_uva,
        contents_uva,
        cu_lens_uva,
        BLOCK_SIZE=1024,
    )
    # Clear the staged writes
    self.clear_staged_writes()

clear_staged_writes

clear_staged_writes() -> None
Source code in vllm/v1/worker/gpu/buffer_utils.py
def clear_staged_writes(self) -> None:
    self._staged_write_indices.clear()
    self._staged_write_starts.clear()
    self._staged_write_contents.clear()
    self._staged_write_cu_lens.clear()

stage_write

stage_write(
    index: int,
    start: int,
    x: Iterable[int] | Iterable[float],
) -> None
Source code in vllm/v1/worker/gpu/buffer_utils.py
def stage_write(
    self,
    index: int,
    start: int,
    x: Iterable[int] | Iterable[float],
) -> None:
    assert index >= 0
    assert start >= 0
    if not x:
        return
    self._staged_write_indices.append(index)
    self._staged_write_starts.append(start)
    self._staged_write_contents.extend(x)
    self._staged_write_cu_lens.append(len(self._staged_write_contents))
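
The payloads of all staged writes are flattened into one contents list, and _staged_write_cu_lens records the cumulative length after each write so the kernel can recover per-write slices. For example (hypothetical values), stage_write(2, 0, [9, 9]) followed by stage_write(4, 3, [1]) leaves indices=[2, 4], starts=[0, 3], contents=[9, 9, 1], cu_lens=[2, 3]; write i covers contents[cu_lens[i-1]:cu_lens[i]], with the lower bound taken as 0 when i == 0.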

stage_write_elem

stage_write_elem(index: int, x: int) -> None
Source code in vllm/v1/worker/gpu/buffer_utils.py
def stage_write_elem(self, index: int, x: int) -> None:
    assert index >= 0
    self._staged_write_indices.append(index)
    self._staged_write_starts.append(0)
    self._staged_write_contents.append(x)
    self._staged_write_cu_lens.append(len(self._staged_write_contents))

UvaBackedTensor

Source code in vllm/v1/worker/gpu/buffer_utils.py
class UvaBackedTensor:
    def __init__(
        self,
        size: int | Sequence[int],
        dtype: torch.dtype,
        max_concurrency: int = 2,
    ):
        self.dtype = dtype
        self.max_concurrency = max_concurrency

        # Source of truth
        self.cpu = torch.zeros(size, dtype=dtype, device="cpu", pin_memory=False)
        self.np = self.cpu.numpy()

        # Buffers for concurrency
        self.pool = UvaBufferPool(size, dtype, max_concurrency)
        self.gpu = self.pool.copy_to_uva(self.np)

    def copy_to_uva(self, n: int | None = None) -> torch.Tensor:
        # CPU-to-CPU copy
        self.gpu = self.pool.copy_to_uva(self.np[:n] if n is not None else self.np)
        return self.gpu
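
A minimal sketch of the intended pattern (hypothetical names and values): mutate the CPU-side source of truth in place, then publish a prefix of it through the pool.

import torch

seq_lens = UvaBackedTensor(size=1024, dtype=torch.int32)

# Update the source of truth through the NumPy view.
seq_lens.np[:4] = [17, 5, 9, 2]

# Stage the first 4 elements into the next UVA buffer; the returned tensor
# is a CUDA view of pinned host memory that kernels can read directly.
gpu_view = seq_lens.copy_to_uva(4)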

cpu instance-attribute

cpu = torch.zeros(
    size, dtype=dtype, device="cpu", pin_memory=False
)

dtype instance-attribute

dtype = dtype

gpu instance-attribute

gpu = pool.copy_to_uva(np)

max_concurrency instance-attribute

max_concurrency = max_concurrency

np instance-attribute

np = cpu.numpy()

pool instance-attribute

pool = UvaBufferPool(size, dtype, max_concurrency)

__init__

__init__(
    size: int | Sequence[int],
    dtype: torch.dtype,
    max_concurrency: int = 2,
)
Source code in vllm/v1/worker/gpu/buffer_utils.py
def __init__(
    self,
    size: int | Sequence[int],
    dtype: torch.dtype,
    max_concurrency: int = 2,
):
    self.dtype = dtype
    self.max_concurrency = max_concurrency

    # Source of truth
    self.cpu = torch.zeros(size, dtype=dtype, device="cpu", pin_memory=False)
    self.np = self.cpu.numpy()

    # Buffers for concurrency
    self.pool = UvaBufferPool(size, dtype, max_concurrency)
    self.gpu = self.pool.copy_to_uva(self.np)

copy_to_uva

copy_to_uva(n: int | None = None) -> Tensor
Source code in vllm/v1/worker/gpu/buffer_utils.py
def copy_to_uva(self, n: int | None = None) -> torch.Tensor:
    # CPU-to-CPU copy
    self.gpu = self.pool.copy_to_uva(self.np[:n] if n is not None else self.np)
    return self.gpu

UvaBuffer

Source code in vllm/v1/worker/gpu/buffer_utils.py
class UvaBuffer:
    def __init__(self, size: int | Sequence[int], dtype: torch.dtype):
        if not is_uva_available():
            raise RuntimeError("UVA is not available")
        self.cpu = torch.zeros(size, dtype=dtype, device="cpu", pin_memory=True)
        self.np = self.cpu.numpy()
        self.uva = get_cuda_view_from_cpu_tensor(self.cpu)
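
All three attributes alias one pinned allocation: cpu is the host tensor, np is a NumPy view of it, and uva is a CUDA view of the same bytes, so host writes become visible to kernels without an explicit host-to-device copy (subject to the usual stream-ordering care). A minimal sketch, assuming UVA is available:

import torch

buf = UvaBuffer(size=8, dtype=torch.int32)
buf.np[:3] = [1, 2, 3]  # plain host write through the NumPy view
print(buf.uva[:3])      # the CUDA view reads the same memory over UVA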

cpu instance-attribute

cpu = torch.zeros(
    size, dtype=dtype, device="cpu", pin_memory=True
)

np instance-attribute

np = cpu.numpy()

uva instance-attribute

uva = get_cuda_view_from_cpu_tensor(cpu)

__init__

__init__(size: int | Sequence[int], dtype: torch.dtype)
Source code in vllm/v1/worker/gpu/buffer_utils.py
def __init__(self, size: int | Sequence[int], dtype: torch.dtype):
    if not is_uva_available():
        raise RuntimeError("UVA is not available")
    self.cpu = torch.zeros(size, dtype=dtype, device="cpu", pin_memory=True)
    self.np = self.cpu.numpy()
    self.uva = get_cuda_view_from_cpu_tensor(self.cpu)

UvaBufferPool

Source code in vllm/v1/worker/gpu/buffer_utils.py
class UvaBufferPool:
    def __init__(
        self,
        size: int | Sequence[int],
        dtype: torch.dtype,
        max_concurrency: int = 2,
    ):
        self.size = size
        self.dtype = dtype
        self.max_concurrency = max_concurrency

        # UVA buffers for concurrency
        self._uva_bufs = [UvaBuffer(size, dtype) for _ in range(max_concurrency)]
        # Current buffer index
        self._curr = 0

    def copy_to_uva(self, x: torch.Tensor | np.ndarray | list) -> torch.Tensor:
        # Round robin to the next buffer.
        self._curr = (self._curr + 1) % self.max_concurrency
        buf = self._uva_bufs[self._curr]
        # CPU-to-CPU copy
        dst = buf.cpu if isinstance(x, torch.Tensor) else buf.np
        n = len(x)
        dst[:n] = x
        return buf.uva[:n]

    def copy_to_gpu(
        self,
        x: torch.Tensor | np.ndarray,
        out: torch.Tensor | None = None,
    ) -> torch.Tensor:
        uva = self.copy_to_uva(x)
        if out is None:
            # CPU-to-GPU copy
            return uva.clone()
        # CPU-to-GPU copy
        return out.copy_(uva, non_blocking=True)
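
A minimal sketch (hypothetical sizes and names). The pool cycles through max_concurrency pinned buffers so that a transfer still in flight from one buffer is not overwritten by the next host-side copy; with max_concurrency=2, a buffer is reused after a single intervening call, so callers must bound how many transfers they keep pending.

import numpy as np
import torch

pool = UvaBufferPool(size=1024, dtype=torch.int32, max_concurrency=2)

# Stage into a UVA buffer and issue an async H2D copy into `out`.
out = torch.empty(3, dtype=torch.int32, device="cuda")
pool.copy_to_gpu(np.array([4, 5, 6], dtype=np.int32), out=out)

# The next call round-robins to the other pinned buffer.
uva = pool.copy_to_uva([7, 8])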

_curr instance-attribute

_curr = 0

_uva_bufs instance-attribute

_uva_bufs = [
    UvaBuffer(size, dtype) for _ in range(max_concurrency)
]

dtype instance-attribute

dtype = dtype

max_concurrency instance-attribute

max_concurrency = max_concurrency

size instance-attribute

size = size

__init__

__init__(
    size: int | Sequence[int],
    dtype: torch.dtype,
    max_concurrency: int = 2,
)
Source code in vllm/v1/worker/gpu/buffer_utils.py
def __init__(
    self,
    size: int | Sequence[int],
    dtype: torch.dtype,
    max_concurrency: int = 2,
):
    self.size = size
    self.dtype = dtype
    self.max_concurrency = max_concurrency

    # UVA buffers for concurrency
    self._uva_bufs = [UvaBuffer(size, dtype) for _ in range(max_concurrency)]
    # Current buffer index
    self._curr = 0

copy_to_gpu

copy_to_gpu(
    x: Tensor | ndarray, out: Tensor | None = None
) -> Tensor
Source code in vllm/v1/worker/gpu/buffer_utils.py
def copy_to_gpu(
    self,
    x: torch.Tensor | np.ndarray,
    out: torch.Tensor | None = None,
) -> torch.Tensor:
    uva = self.copy_to_uva(x)
    if out is None:
        # CPU-to-GPU copy
        return uva.clone()
    # CPU-to-GPU copy
    return out.copy_(uva, non_blocking=True)
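
Both branches perform the host-to-device transfer on the current CUDA stream without synchronizing the host: uva.clone() materializes a fresh GPU tensor from the CUDA view, while out.copy_(uva, non_blocking=True) reuses a caller-provided destination and returns it.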

copy_to_uva

copy_to_uva(x: Tensor | ndarray | list) -> Tensor
Source code in vllm/v1/worker/gpu/buffer_utils.py
def copy_to_uva(self, x: torch.Tensor | np.ndarray | list) -> torch.Tensor:
    # Round robin to the next buffer.
    self._curr = (self._curr + 1) % self.max_concurrency
    buf = self._uva_bufs[self._curr]
    # CPU-to-CPU copy
    dst = buf.cpu if isinstance(x, torch.Tensor) else buf.np
    n = len(x)
    dst[:n] = x
    return buf.uva[:n]

_apply_write_kernel

_apply_write_kernel(
    output_ptr,
    output_stride,
    write_indices_ptr,
    write_starts_ptr,
    write_contents_ptr,
    write_cu_lens_ptr,
    BLOCK_SIZE: constexpr,
)
Source code in vllm/v1/worker/gpu/buffer_utils.py
@triton.jit
def _apply_write_kernel(
    output_ptr,
    output_stride,
    write_indices_ptr,
    write_starts_ptr,
    write_contents_ptr,
    write_cu_lens_ptr,
    BLOCK_SIZE: tl.constexpr,
):
    pid = tl.program_id(0)
    row_idx = tl.load(write_indices_ptr + pid)
    start_idx = tl.load(write_starts_ptr + pid)

    cu_start = tl.load(write_cu_lens_ptr + pid - 1) if pid > 0 else 0
    cu_end = tl.load(write_cu_lens_ptr + pid)
    content_len = cu_end - cu_start

    for i in range(0, content_len, BLOCK_SIZE):
        block = i + tl.arange(0, BLOCK_SIZE)
        mask = block < content_len
        content = tl.load(write_contents_ptr + cu_start + block, mask=mask)
        tl.store(
            output_ptr + row_idx * output_stride + start_idx + block, content, mask=mask
        )
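
For reference, a NumPy sketch of what the kernel computes, under the same flattened layout that apply_write() prepares (the BLOCK_SIZE loop in the Triton version merely tiles the innermost copy):

import numpy as np

def apply_write_reference(output, indices, starts, contents, cu_lens):
    # Equivalent of launching _apply_write_kernel with grid (len(indices),):
    # program pid writes contents[cu_lens[pid - 1]:cu_lens[pid]] into
    # output[indices[pid], starts[pid]:...].
    for pid in range(len(indices)):
        lo = cu_lens[pid - 1] if pid > 0 else 0
        hi = cu_lens[pid]
        row, col = indices[pid], starts[pid]
        output[row, col : col + (hi - lo)] = contents[lo:hi]

# Reusing the worked example from stage_write above:
out = np.zeros((8, 4), dtype=np.int32)
apply_write_reference(out, indices=[2, 4], starts=[0, 3], contents=[9, 9, 1], cu_lens=[2, 3])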