vllm.v1.core.kv_cache_manager

logger module-attribute

logger = init_logger(__name__)

KVCacheBlocks dataclass

The allocation result of KVCacheManager. It works as the interface between the Scheduler and the KVCacheManager, hiding KVCacheManager's internal data structure from the Scheduler.

Source code in vllm/v1/core/kv_cache_manager.py
@dataclass
class KVCacheBlocks:
    """
    The allocation result of KVCacheManager, work as the interface between
    Scheduler and KVCacheManager, to hide KVCacheManager's internal data
    structure from the Scheduler.
    """

    blocks: tuple[Sequence[KVCacheBlock], ...]
    """
    `blocks[i][j]` refers to the i-th kv_cache_group
    and the j-th block of tokens.We don't use block of
    tokens as the outer dimension because it assumes all
    kv_cache_groups have the same number of blocks, which is true for now but
    will be broken if we want to give different block_size to different
    kv_cache_groups in the future.

    Each single type KVCacheBlocks could be represented as:
    - list[KVCacheBlock] for more than one KVCacheBlock
    - an empty tuple for requests without KVCacheBlock
      (a precomputed KVCacheBlocks is in KVCacheManager to avoid GC overhead)
    """

    def __add__(self, other: "KVCacheBlocks") -> "KVCacheBlocks":
        """Adds two KVCacheBlocks instances."""
        return KVCacheBlocks(
            tuple(
                list(itertools.chain(blk1, blk2))
                for blk1, blk2 in zip(self.blocks, other.blocks)
            )
        )

    @overload
    def get_block_ids(
        self,
        allow_none: Literal[False] = False,
    ) -> tuple[list[int], ...]: ...

    @overload
    def get_block_ids(
        self,
        allow_none: Literal[True] = True,
    ) -> tuple[list[int], ...] | None: ...

    def get_block_ids(
        self,
        allow_none: bool = False,
    ) -> tuple[list[int], ...] | None:
        """
        Converts the KVCacheBlocks instance to block_ids.

        Returns:
            tuple[list[int], ...]: A tuple of lists where:
                - the outer tuple corresponds to KV cache groups
                - each inner list contains the block_ids of the blocks in that
                  group
        """
        if allow_none and all(len(group) == 0 for group in self.blocks):
            return None
        return tuple([blk.block_id for blk in group] for group in self.blocks)

    def get_unhashed_block_ids(self) -> list[int]:
        """Get block_ids of unhashed blocks from KVCacheBlocks instance."""
        assert len(self.blocks) == 1, "Only one group is supported"
        return [block.block_id for block in self.blocks[0] if block.block_hash is None]

    def new_empty(self) -> "KVCacheBlocks":
        """
        Creates a new KVCacheBlocks instance with no blocks.
        """
        return KVCacheBlocks(tuple(() for _ in range(len(self.blocks))))

blocks instance-attribute

blocks: tuple[Sequence[KVCacheBlock], ...]

blocks[i][j] refers to the j-th block of the i-th kv_cache_group. We don't use the block dimension as the outer dimension because that would assume all kv_cache_groups have the same number of blocks, which is true for now but would break if we gave different block_sizes to different kv_cache_groups in the future.

The per-group sequence blocks[i] can be either:

- a list[KVCacheBlock] when the request has blocks in that group
- an empty tuple when the request has no KVCacheBlock (a precomputed empty KVCacheBlocks is kept in KVCacheManager to avoid GC overhead)
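
To make the nested layout concrete, here is a minimal, self-contained sketch. The Block class and the block IDs below are illustrative stand-ins, not vLLM's real KVCacheBlock:

from dataclasses import dataclass

@dataclass
class Block:  # hypothetical stand-in for vLLM's KVCacheBlock
    block_id: int
    block_hash: int | None = None

# Two KV cache groups; group 0 holds three blocks, group 1 holds two.
blocks = (
    [Block(10), Block(11), Block(12)],  # blocks[0][j]: j-th block of group 0
    [Block(20), Block(21)],             # blocks[1][j]: j-th block of group 1
)

# blocks[i][j] -> the j-th block of the i-th kv_cache_group.
assert blocks[0][2].block_id == 12

# Equivalent of KVCacheBlocks.get_block_ids(): one list of ids per group.
block_ids = tuple([blk.block_id for blk in group] for group in blocks)
assert block_ids == ([10, 11, 12], [20, 21])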

__add__

__add__(other: KVCacheBlocks) -> KVCacheBlocks

Adds two KVCacheBlocks instances.

Source code in vllm/v1/core/kv_cache_manager.py
def __add__(self, other: "KVCacheBlocks") -> "KVCacheBlocks":
    """Adds two KVCacheBlocks instances."""
    return KVCacheBlocks(
        tuple(
            list(itertools.chain(blk1, blk2))
            for blk1, blk2 in zip(self.blocks, other.blocks)
        )
    )
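
The addition is per group: group i of the result is the concatenation of group i from both operands. A small illustration of the zip/chain combination above, using strings as stand-ins for block objects:

import itertools

a = (["b0", "b1"], ["c0"])      # two groups from one KVCacheBlocks-like value
b = (["b2"], ["c1", "c2"])      # the same two groups from another

combined = tuple(
    list(itertools.chain(g1, g2)) for g1, g2 in zip(a, b)
)
assert combined == (["b0", "b1", "b2"], ["c0", "c1", "c2"])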

__init__

__init__(
    blocks: tuple[Sequence[KVCacheBlock], ...],
) -> None

get_block_ids

get_block_ids(
    allow_none: Literal[False] = False,
) -> tuple[list[int], ...]
get_block_ids(
    allow_none: Literal[True] = True,
) -> tuple[list[int], ...] | None
get_block_ids(
    allow_none: bool = False,
) -> tuple[list[int], ...] | None

Converts the KVCacheBlocks instance to block_ids.

Returns:

Type Description
tuple[list[int], ...] | None

tuple[list[int], ...]: A tuple of lists where:

- the outer tuple corresponds to KV cache groups
- each inner list contains the block_ids of the blocks in that group

Source code in vllm/v1/core/kv_cache_manager.py
def get_block_ids(
    self,
    allow_none: bool = False,
) -> tuple[list[int], ...] | None:
    """
    Converts the KVCacheBlocks instance to block_ids.

    Returns:
        tuple[list[int], ...]: A tuple of lists where:
            - the outer tuple corresponds to KV cache groups
            - each inner list contains the block_ids of the blocks in that
              group
    """
    if allow_none and all(len(group) == 0 for group in self.blocks):
        return None
    return tuple([blk.block_id for blk in group] for group in self.blocks)
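
A small sketch of the allow_none overloads (the helper below mirrors the logic above for illustration only): when every group is empty and allow_none=True, the result is None rather than a tuple of empty lists.

def block_ids_or_none(blocks, allow_none=False):
    # Simplified mirror of KVCacheBlocks.get_block_ids.
    if allow_none and all(len(group) == 0 for group in blocks):
        return None
    return tuple(list(group) for group in blocks)

empty = ((), ())  # two KV cache groups, no blocks allocated
assert block_ids_or_none(empty) == ([], [])
assert block_ids_or_none(empty, allow_none=True) is None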

get_unhashed_block_ids

get_unhashed_block_ids() -> list[int]

Get block_ids of unhashed blocks from KVCacheBlocks instance.

Source code in vllm/v1/core/kv_cache_manager.py
def get_unhashed_block_ids(self) -> list[int]:
    """Get block_ids of unhashed blocks from KVCacheBlocks instance."""
    assert len(self.blocks) == 1, "Only one group is supported"
    return [block.block_id for block in self.blocks[0] if block.block_hash is None]

new_empty

new_empty() -> KVCacheBlocks

Creates a new KVCacheBlocks instance with no blocks.

Source code in vllm/v1/core/kv_cache_manager.py
def new_empty(self) -> "KVCacheBlocks":
    """
    Creates a new KVCacheBlocks instance with no blocks.
    """
    return KVCacheBlocks(tuple(() for _ in range(len(self.blocks))))

KVCacheManager

Source code in vllm/v1/core/kv_cache_manager.py
class KVCacheManager:
    def __init__(
        self,
        kv_cache_config: KVCacheConfig,
        max_model_len: int,
        hash_block_size: int,
        enable_caching: bool = True,
        use_eagle: bool = False,
        log_stats: bool = False,
        enable_kv_cache_events: bool = False,
        dcp_world_size: int = 1,
        pcp_world_size: int = 1,
        metrics_collector: KVCacheMetricsCollector | None = None,
    ) -> None:
        self.max_model_len = max_model_len

        self.enable_caching = enable_caching
        self.use_eagle = use_eagle
        self.log_stats = log_stats
        self.metrics_collector = metrics_collector
        # FIXME: make prefix cache stats conditional on log_stats. We still need
        # this comment because when the log stats is enabled there are still
        # potential configs we could expose in the future.
        self.prefix_cache_stats = PrefixCacheStats() if log_stats else None

        self.coordinator = get_kv_cache_coordinator(
            kv_cache_config=kv_cache_config,
            max_model_len=self.max_model_len,
            use_eagle=self.use_eagle,
            enable_caching=self.enable_caching,
            enable_kv_cache_events=enable_kv_cache_events,
            dcp_world_size=dcp_world_size,
            pcp_world_size=pcp_world_size,
            hash_block_size=hash_block_size,
            metrics_collector=self.metrics_collector,
        )
        self.num_kv_cache_groups = len(kv_cache_config.kv_cache_groups)
        self.block_pool = self.coordinator.block_pool
        self.kv_cache_config = kv_cache_config

        # Pre-constructed KVCacheBlocks with no blocks, callers should use this
        # via create_kv_cache_blocks instead of creating new ones to avoid GC
        # overhead.
        #
        # We use nested tuples to ensure the empty KVCacheBlocks is immutable.
        self.empty_kv_cache_blocks = KVCacheBlocks(
            tuple(() for _ in range(self.num_kv_cache_groups))
        )

    @property
    def usage(self) -> float:
        """Get the KV cache usage.

        Returns:
            The KV cache usage (between 0.0 and 1.0).
        """
        return self.block_pool.get_usage()

    def make_prefix_cache_stats(self) -> PrefixCacheStats | None:
        """Get (and reset) the prefix cache stats.

        Returns:
            The current prefix caching stats, or None if logging is disabled.
        """
        if not self.log_stats:
            return None
        stats = self.prefix_cache_stats
        self.prefix_cache_stats = PrefixCacheStats()
        return stats

    def get_computed_blocks(self, request: Request) -> tuple[KVCacheBlocks, int]:
        """Get the computed (cached) blocks for the request.
        Note that the computed blocks must be full.

        Args:
            request: The request to get the computed blocks.

        Returns:
            A tuple containing:
                - A list of blocks that are computed for the request.
                - The number of computed tokens.
        """
        # We skip finding the prefix cache hit when prefix caching is
        # disabled or the request is marked as skipping kv cache read
        # (which happens when the request requires prompt logprobs
        # or calls a pooling model with all pooling).
        if not self.enable_caching or request.skip_reading_prefix_cache:
            return self.empty_kv_cache_blocks, 0

        # NOTE: When all tokens hit the cache, we must recompute the last token
        # to obtain logits. Thus, set max_cache_hit_length to prompt_length - 1.
        # This can trigger recomputation of an entire block, rather than just
        # the single last token, because allocate_slots() requires
        # num_computed_tokens to be block-size aligned. Removing this limitation
        # could slightly improve performance in the future.
        max_cache_hit_length = request.num_tokens - 1
        computed_blocks, num_new_computed_tokens = (
            self.coordinator.find_longest_cache_hit(
                request.block_hashes, max_cache_hit_length
            )
        )

        if self.log_stats:
            assert self.prefix_cache_stats is not None
            self.prefix_cache_stats.record(
                num_tokens=request.num_tokens,
                num_hits=num_new_computed_tokens,
                preempted=request.num_preemptions > 0,
            )

        return self.create_kv_cache_blocks(computed_blocks), num_new_computed_tokens

    def allocate_slots(
        self,
        request: Request,
        num_new_tokens: int,
        num_new_computed_tokens: int = 0,
        new_computed_blocks: KVCacheBlocks | None = None,
        num_lookahead_tokens: int = 0,
        num_external_computed_tokens: int = 0,
        delay_cache_blocks: bool = False,
        num_encoder_tokens: int = 0,
    ) -> KVCacheBlocks | None:
        """Add slots for a request with new tokens to append.

        Args:
            request: The request to allocate slots.
            num_new_tokens: The number of new tokens to be allocated and computed.
            num_new_computed_tokens: The number of new computed tokens that
                just hit the prefix cache, excluding external tokens.
            new_computed_blocks: The cached blocks for the above new computed
                tokens, grouped as a tuple by kv cache groups.
            num_lookahead_tokens: The number of speculative tokens to allocate.
                This is used by spec decode proposers with kv-cache such
                as eagle.
            num_external_computed_tokens: The number of tokens whose
                KV caches are not cached by vLLM but are cached by the connector.
            delay_cache_blocks: Whether to skip caching the blocks. This is
                used by P/D when allocating blocks used in a KV transfer
                which will complete in a future step.
            num_encoder_tokens: The number of encoder tokens to allocate for
                cross-attention in encoder-decoder models (e.g., Whisper).
                For decoder-only models, this should be 0.

        Blocks layout:
        ```
        ----------------------------------------------------------------------
        | < comp > | < new_comp > | < ext_comp >  | < new >  | < lookahead > |
        ----------------------------------------------------------------------
                                                  |   < to be computed >     |
        ----------------------------------------------------------------------
                                  |            < to be allocated >           |
        ----------------------------------------------------------------------
                                  | < to be cached (roughly, |
                                  | details below)>          |
        ----------------------------------------------------------------------
        | Prefix-cached tokens from either vLLM   |
        | or connector. Can be safely removed if  |
        | they are outside sliding window.        |
        ----------------------------------------------------------------------
        |   < cached by vLLM >    | not cached by |
                                  | vLLM, but     |
        | ref_cnt  | ref_cnt not  | cached by     |
        | increased| increased yet| connector     |
        ----------------------------------------------------------------------
        ```

        Abbreviations:

        ```
        comp      = request.num_computed_tokens
        new_comp  = num_new_computed_tokens
                  = len(new_computed_blocks) * block_size
        ext_comp  = num_external_computed_tokens, cached by the connector
        new       = num_new_tokens, including unverified draft tokens
        lookahead = num_lookahead_tokens
        ```

        NOTE: for new tokens which include both verified and unverified draft
        tokens, we only cache the verified tokens (by capping the number at
        `request.num_tokens`).

        The allocation has three stages:
        - Free unnecessary blocks in `comp` and check
           if we have sufficient free blocks (return None if not).
        - Handle prefix tokens (`comp + new_comp + ext_comp`):
            - Free unnecessary blocks (e.g. outside sliding window)
            - Allocate new blocks for `ext_comp` tokens inside
              sliding window
        - Allocate new blocks for tokens to be computed (`new + lookahead`)

        Returns:
            The newly allocated blocks, or None if allocation fails.
        """
        # When loading KV data asynchronously, we may have zero new tokens to
        # compute while still allocating slots for externally computed tokens.
        if num_new_tokens == 0 and num_external_computed_tokens == 0:
            raise ValueError(
                "num_new_tokens must be greater than 0 when there are no "
                "external computed tokens"
            )

        if new_computed_blocks is not None:
            new_computed_block_list = new_computed_blocks.blocks
        else:
            new_computed_block_list = self.empty_kv_cache_blocks.blocks

        # The number of locally computed tokens is the number of tokens already
        # computed for the request plus the new prefix cache hits.
        num_local_computed_tokens = (
            request.num_computed_tokens + num_new_computed_tokens
        )
        total_computed_tokens = min(
            num_local_computed_tokens + num_external_computed_tokens,
            self.max_model_len,
        )
        num_tokens_need_slot = min(
            total_computed_tokens + num_new_tokens + num_lookahead_tokens,
            self.max_model_len,
        )

        # Free the blocks that are skipped during the attention computation
        # (e.g., tokens outside the sliding window).
        # We can do this even if we cannot schedule this request due to
        # insufficient free blocks.
        # Should call this function before allocating new blocks to reduce
        # the number of evicted blocks.
        self.coordinator.remove_skipped_blocks(
            request.request_id, total_computed_tokens
        )

        num_blocks_to_allocate = self.coordinator.get_num_blocks_to_allocate(
            request_id=request.request_id,
            num_tokens=num_tokens_need_slot,
            new_computed_blocks=new_computed_block_list,
            num_encoder_tokens=num_encoder_tokens,
            total_computed_tokens=num_local_computed_tokens
            + num_external_computed_tokens,
        )

        if num_blocks_to_allocate > self.block_pool.get_num_free_blocks():
            # Cannot allocate new blocks
            return None

        if (
            new_computed_block_list is not self.empty_kv_cache_blocks.blocks
            or num_external_computed_tokens > 0
        ):
            # Append the new computed blocks to the request blocks until now to
            # avoid the case where the new blocks cannot be allocated.
            self.coordinator.allocate_new_computed_blocks(
                request_id=request.request_id,
                new_computed_blocks=new_computed_block_list,
                num_local_computed_tokens=num_local_computed_tokens,
                num_external_computed_tokens=num_external_computed_tokens,
            )

        new_blocks = self.coordinator.allocate_new_blocks(
            request.request_id, num_tokens_need_slot, num_encoder_tokens
        )

        # P/D: delay caching blocks if we have to recv from
        # remote. Update state for locally cached blocks.
        if not self.enable_caching or delay_cache_blocks:
            return self.create_kv_cache_blocks(new_blocks)

        # NOTE(woosuk): We want to commit (cache) up to num_local_computed_tokens
        # + num_external_computed_tokens + num_new_tokens, but must exclude
        # "non-committable" tokens (e.g., draft tokens that could be rejected).
        # Therefore, we cap the number at `request.num_tokens`, ensuring only
        # "finalized" tokens are cached.
        num_tokens_to_cache = min(
            total_computed_tokens + num_new_tokens,
            request.num_tokens,
        )
        self.coordinator.cache_blocks(request, num_tokens_to_cache)

        return self.create_kv_cache_blocks(new_blocks)

    def free(self, request: Request) -> None:
        """Free the blocks allocated for the request.
        We free the blocks in reverse order so that the tail blocks are evicted
        first when caching is enabled.

        Args:
            request: The request to free the blocks.
        """
        self.coordinator.free(request.request_id)

    def remove_skipped_blocks(
        self, request_id: str, total_computed_tokens: int
    ) -> None:
        """Remove the blocks that are no longer needed from `blocks` and replace
        the removed blocks with null_block.

        Args:
            request_id: The request ID.
            total_computed_tokens: The total number of computed tokens, including
                local computed tokens and external computed tokens.
        """
        self.coordinator.remove_skipped_blocks(request_id, total_computed_tokens)

    def evict_blocks(self, block_ids: set[int]) -> None:
        """evict blocks from the prefix cache by their block IDs.

        Args:
            block_ids: Set of block IDs to evict from cache.
        """
        self.block_pool.evict_blocks(block_ids)

    def reset_prefix_cache(self) -> bool:
        """Reset prefix cache. This function may be used in RLHF
        flows to invalidate prefix caching after the weights are updated,
        or used for resetting prefix caching status for benchmarking.

        Returns:
            bool: True if the prefix cache is successfully reset,
            False otherwise.
        """
        if not self.block_pool.reset_prefix_cache():
            return False
        if self.log_stats:
            assert self.prefix_cache_stats is not None
            self.prefix_cache_stats.reset = True
        return True

    def get_num_common_prefix_blocks(self, running_request_id: str) -> list[int]:
        """Calculate the number of common prefix blocks for each kv cache group.

        The function selects a running request and iterates through its blocks.
        A block is considered a common prefix block if ALL requests with
        allocated KV cache share it (i.e., ref_cnt equals the number of entries
        in req_to_blocks).

        NOTE(woosuk): The number of requests with allocated KV cache is **greater
        than or equal to** the number of requests scheduled in the current step.
        This is because having allocated KV cache only indicates that:
        1. The request has not yet finished, and
        2. The request holds its blocks unfreed.

        While all scheduled requests must have allocated KV cache, the inverse
        is not necessarily true. There may be requests with allocated KV cache
        that are not scheduled in the current step.

        This can result in an edge case where the number of common prefix blocks
        is 0, even though all scheduled requests share a common prefix. This
        occurs because there may be unscheduled requests that do not share the
        common prefix. Currently, this case cannot be easily detected, so the
        function returns 0 in such cases.

        Args:
            running_request_id: The request ID of any running request, used to
                identify the common prefix blocks.

        Returns:
            list[int]: The number of common prefix blocks for each kv cache
            group.
        """
        return self.coordinator.get_num_common_prefix_blocks(running_request_id)

    def take_events(self) -> list[KVCacheEvent]:
        """Take the KV cache events from the block pool.

        Returns:
            A list of KV cache events.
        """
        return self.block_pool.take_events()

    def get_blocks(self, request_id: str) -> KVCacheBlocks:
        """Get the blocks of a request."""
        return self.create_kv_cache_blocks(self.coordinator.get_blocks(request_id))

    def get_block_ids(self, request_id: str) -> tuple[list[int], ...]:
        """Get the block ids of a request."""
        return self.get_blocks(request_id).get_block_ids()

    def cache_blocks(self, request: Request, num_computed_tokens: int) -> None:
        """Cache the blocks for the request, if enabled.

        Args:
            request: The request to cache the blocks.
            num_computed_tokens: The number of computed tokens, including tokens
                that are already cached and tokens to be cached.
        """
        if self.enable_caching:
            self.coordinator.cache_blocks(request, num_computed_tokens)

    def create_kv_cache_blocks(
        self, blocks: tuple[list[KVCacheBlock], ...]
    ) -> KVCacheBlocks:
        # Only create new KVCacheBlocks for non-empty blocks
        return KVCacheBlocks(blocks) if any(blocks) else self.empty_kv_cache_blocks

block_pool instance-attribute

block_pool = block_pool

coordinator instance-attribute

coordinator = get_kv_cache_coordinator(
    kv_cache_config=kv_cache_config,
    max_model_len=max_model_len,
    use_eagle=use_eagle,
    enable_caching=enable_caching,
    enable_kv_cache_events=enable_kv_cache_events,
    dcp_world_size=dcp_world_size,
    pcp_world_size=pcp_world_size,
    hash_block_size=hash_block_size,
    metrics_collector=metrics_collector,
)

empty_kv_cache_blocks instance-attribute

empty_kv_cache_blocks = KVCacheBlocks(
    tuple(() for _ in (range(num_kv_cache_groups)))
)

enable_caching instance-attribute

enable_caching = enable_caching

kv_cache_config instance-attribute

kv_cache_config = kv_cache_config

log_stats instance-attribute

log_stats = log_stats

max_model_len instance-attribute

max_model_len = max_model_len

metrics_collector instance-attribute

metrics_collector = metrics_collector

num_kv_cache_groups instance-attribute

num_kv_cache_groups = len(kv_cache_groups)

prefix_cache_stats instance-attribute

prefix_cache_stats = (
    PrefixCacheStats() if log_stats else None
)

usage property

usage: float

Get the KV cache usage.

Returns:

Type Description
float

The KV cache usage (between 0.0 and 1.0).

use_eagle instance-attribute

use_eagle = use_eagle

__init__

__init__(
    kv_cache_config: KVCacheConfig,
    max_model_len: int,
    hash_block_size: int,
    enable_caching: bool = True,
    use_eagle: bool = False,
    log_stats: bool = False,
    enable_kv_cache_events: bool = False,
    dcp_world_size: int = 1,
    pcp_world_size: int = 1,
    metrics_collector: KVCacheMetricsCollector
    | None = None,
) -> None
Source code in vllm/v1/core/kv_cache_manager.py
def __init__(
    self,
    kv_cache_config: KVCacheConfig,
    max_model_len: int,
    hash_block_size: int,
    enable_caching: bool = True,
    use_eagle: bool = False,
    log_stats: bool = False,
    enable_kv_cache_events: bool = False,
    dcp_world_size: int = 1,
    pcp_world_size: int = 1,
    metrics_collector: KVCacheMetricsCollector | None = None,
) -> None:
    self.max_model_len = max_model_len

    self.enable_caching = enable_caching
    self.use_eagle = use_eagle
    self.log_stats = log_stats
    self.metrics_collector = metrics_collector
    # FIXME: make prefix cache stats conditional on log_stats. We still need
    # this comment because when the log stats is enabled there are still
    # potential configs we could expose in the future.
    self.prefix_cache_stats = PrefixCacheStats() if log_stats else None

    self.coordinator = get_kv_cache_coordinator(
        kv_cache_config=kv_cache_config,
        max_model_len=self.max_model_len,
        use_eagle=self.use_eagle,
        enable_caching=self.enable_caching,
        enable_kv_cache_events=enable_kv_cache_events,
        dcp_world_size=dcp_world_size,
        pcp_world_size=pcp_world_size,
        hash_block_size=hash_block_size,
        metrics_collector=self.metrics_collector,
    )
    self.num_kv_cache_groups = len(kv_cache_config.kv_cache_groups)
    self.block_pool = self.coordinator.block_pool
    self.kv_cache_config = kv_cache_config

    # Pre-constructed KVCacheBlocks with no blocks, callers should use this
    # via create_kv_cache_blocks instead of creating new ones to avoid GC
    # overhead.
    #
    # We use nested tuples to ensure the empty KVCacheBlocks is immutable.
    self.empty_kv_cache_blocks = KVCacheBlocks(
        tuple(() for _ in range(self.num_kv_cache_groups))
    )

allocate_slots

allocate_slots(
    request: Request,
    num_new_tokens: int,
    num_new_computed_tokens: int = 0,
    new_computed_blocks: KVCacheBlocks | None = None,
    num_lookahead_tokens: int = 0,
    num_external_computed_tokens: int = 0,
    delay_cache_blocks: bool = False,
    num_encoder_tokens: int = 0,
) -> KVCacheBlocks | None

Add slots for a request with new tokens to append.

Parameters:

Name Type Description Default
request Request

The request to allocate slots.

required
num_new_tokens int

The number of new tokens to be allocated and computed.

required
num_new_computed_tokens int

The number of new computed tokens that just hit the prefix cache, excluding external tokens.

0
new_computed_blocks KVCacheBlocks | None

The cached blocks for the above new computed tokens, grouped as a tuple by kv cache groups.

None
num_lookahead_tokens int

The number of speculative tokens to allocate. This is used by spec decode proposers with kv-cache such as eagle.

0
num_external_computed_tokens int

The number of tokens whose KV caches are not cached by vLLM but are cached by the connector.

0
delay_cache_blocks bool

Whether to skip caching the blocks. This is used by P/D when allocating blocks used in a KV transfer which will complete in a future step.

False
num_encoder_tokens int

The number of encoder tokens to allocate for cross-attention in encoder-decoder models (e.g., Whisper). For decoder-only models, this should be 0.

0

Blocks layout:

----------------------------------------------------------------------
| < comp > | < new_comp > | < ext_comp >  | < new >  | < lookahead > |
----------------------------------------------------------------------
                                          |   < to be computed >     |
----------------------------------------------------------------------
                          |            < to be allocated >           |
----------------------------------------------------------------------
                          | < to be cached (roughly, |
                          | details below)>          |
----------------------------------------------------------------------
| Prefix-cached tokens from either vLLM   |
| or connector. Can be safely removed if  |
| they are outside sliding window.        |
----------------------------------------------------------------------
|   < cached by vLLM >    | not cached by |
                          | vLLM, but     |
| ref_cnt  | ref_cnt not  | cached by     |
| increased| increased yet| connector     |
----------------------------------------------------------------------

Abbreviations:

comp      = request.num_computed_tokens
new_comp  = num_new_computed_tokens
          = len(new_computed_blocks) * block_size
ext_comp  = num_external_computed_tokens, cached by the connector
new       = num_new_tokens, including unverified draft tokens
lookahead = num_lookahead_tokens

NOTE: for new tokens which include both verified and unverified draft tokens, we only cache the verified tokens (by capping the number at request.num_tokens).

The allocation has three stages:

- Free unnecessary blocks in comp and check if we have sufficient free blocks (return None if not).
- Handle prefix tokens (comp + new_comp + ext_comp):
    - Free unnecessary blocks (e.g. outside the sliding window)
    - Allocate new blocks for ext_comp tokens inside the sliding window
- Allocate new blocks for tokens to be computed (new + lookahead)

Returns:

Type Description
KVCacheBlocks | None

The newly allocated blocks, or None if allocation fails.
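
To make the token accounting concrete, here is a small worked example. The numbers and variable names below are illustrative assumptions, not vLLM defaults; they follow the same arithmetic as the source below:

MAX_MODEL_LEN = 4096

# Hypothetical state of one request when allocate_slots() is called.
num_computed_tokens = 32           # comp: computed in earlier steps
num_new_computed_tokens = 16       # new_comp: fresh prefix-cache hits
num_external_computed_tokens = 16  # ext_comp: cached by the connector
num_new_tokens = 24                # new: includes unverified draft tokens
num_lookahead_tokens = 4           # lookahead: speculative slots
request_num_tokens = 80            # request.num_tokens: finalized tokens only

num_local_computed_tokens = num_computed_tokens + num_new_computed_tokens
total_computed_tokens = min(
    num_local_computed_tokens + num_external_computed_tokens, MAX_MODEL_LEN
)
num_tokens_need_slot = min(
    total_computed_tokens + num_new_tokens + num_lookahead_tokens, MAX_MODEL_LEN
)
# Only finalized tokens are cached: draft tokens beyond request.num_tokens
# are excluded by capping at request_num_tokens.
num_tokens_to_cache = min(
    total_computed_tokens + num_new_tokens, request_num_tokens
)

assert total_computed_tokens == 64   # 32 + 16 + 16
assert num_tokens_need_slot == 92    # 64 + 24 + 4
assert num_tokens_to_cache == 80     # 88 capped at request_num_tokens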

Source code in vllm/v1/core/kv_cache_manager.py
def allocate_slots(
    self,
    request: Request,
    num_new_tokens: int,
    num_new_computed_tokens: int = 0,
    new_computed_blocks: KVCacheBlocks | None = None,
    num_lookahead_tokens: int = 0,
    num_external_computed_tokens: int = 0,
    delay_cache_blocks: bool = False,
    num_encoder_tokens: int = 0,
) -> KVCacheBlocks | None:
    """Add slots for a request with new tokens to append.

    Args:
        request: The request to allocate slots.
        num_new_tokens: The number of new tokens to be allocated and computed.
        num_new_computed_tokens: The number of new computed tokens that
            just hit the prefix cache, excluding external tokens.
        new_computed_blocks: The cached blocks for the above new computed
            tokens, grouped as a tuple by kv cache groups.
        num_lookahead_tokens: The number of speculative tokens to allocate.
            This is used by spec decode proposers with kv-cache such
            as eagle.
        num_external_computed_tokens: The number of tokens whose
            KV caches are not cached by vLLM but are cached by the connector.
        delay_cache_blocks: Whether to skip caching the blocks. This is
            used by P/D when allocating blocks used in a KV transfer
            which will complete in a future step.
        num_encoder_tokens: The number of encoder tokens to allocate for
            cross-attention in encoder-decoder models (e.g., Whisper).
            For decoder-only models, this should be 0.

    Blocks layout:
    ```
    ----------------------------------------------------------------------
    | < comp > | < new_comp > | < ext_comp >  | < new >  | < lookahead > |
    ----------------------------------------------------------------------
                                              |   < to be computed >     |
    ----------------------------------------------------------------------
                              |            < to be allocated >           |
    ----------------------------------------------------------------------
                              | < to be cached (roughly, |
                              | details below)>          |
    ----------------------------------------------------------------------
    | Prefix-cached tokens from either vLLM   |
    | or connector. Can be safely removed if  |
    | they are outside sliding window.        |
    ----------------------------------------------------------------------
    |   < cached by vLLM >    | not cached by |
                              | vLLM, but     |
    | ref_cnt  | ref_cnt not  | cached by     |
    | increased| increased yet| connector     |
    ----------------------------------------------------------------------
    ```

    Abbreviations:

    ```
    comp      = request.num_computed_tokens
    new_comp  = num_new_computed_tokens
              = len(new_computed_blocks) * block_size
    ext_comp  = num_external_computed_tokens, cached by the connector
    new       = num_new_tokens, including unverified draft tokens
    lookahead = num_lookahead_tokens
    ```

    NOTE: for new tokens which include both verified and unverified draft
    tokens, we only cache the verified tokens (by capping the number at
    `request.num_tokens`).

    The allocation has three stages:
    - Free unnecessary blocks in `comp` and check
       if we have sufficient free blocks (return None if not).
    - Handle prefix tokens (`comp + new_comp + ext_comp`):
        - Free unnecessary blocks (e.g. outside sliding window)
        - Allocate new blocks for `ext_comp` tokens inside
          sliding window
    - Allocate new blocks for tokens to be computed (`new + lookahead`)

    Returns:
        The newly allocated blocks, or None if allocation fails.
    """
    # When loading KV data asynchronously, we may have zero new tokens to
    # compute while still allocating slots for externally computed tokens.
    if num_new_tokens == 0 and num_external_computed_tokens == 0:
        raise ValueError(
            "num_new_tokens must be greater than 0 when there are no "
            "external computed tokens"
        )

    if new_computed_blocks is not None:
        new_computed_block_list = new_computed_blocks.blocks
    else:
        new_computed_block_list = self.empty_kv_cache_blocks.blocks

    # The number of locally computed tokens is the number of tokens already
    # computed for the request plus the new prefix cache hits.
    num_local_computed_tokens = (
        request.num_computed_tokens + num_new_computed_tokens
    )
    total_computed_tokens = min(
        num_local_computed_tokens + num_external_computed_tokens,
        self.max_model_len,
    )
    num_tokens_need_slot = min(
        total_computed_tokens + num_new_tokens + num_lookahead_tokens,
        self.max_model_len,
    )

    # Free the blocks that are skipped during the attention computation
    # (e.g., tokens outside the sliding window).
    # We can do this even if we cannot schedule this request due to
    # insufficient free blocks.
    # Should call this function before allocating new blocks to reduce
    # the number of evicted blocks.
    self.coordinator.remove_skipped_blocks(
        request.request_id, total_computed_tokens
    )

    num_blocks_to_allocate = self.coordinator.get_num_blocks_to_allocate(
        request_id=request.request_id,
        num_tokens=num_tokens_need_slot,
        new_computed_blocks=new_computed_block_list,
        num_encoder_tokens=num_encoder_tokens,
        total_computed_tokens=num_local_computed_tokens
        + num_external_computed_tokens,
    )

    if num_blocks_to_allocate > self.block_pool.get_num_free_blocks():
        # Cannot allocate new blocks
        return None

    if (
        new_computed_block_list is not self.empty_kv_cache_blocks.blocks
        or num_external_computed_tokens > 0
    ):
        # Append the new computed blocks to the request blocks until now to
        # avoid the case where the new blocks cannot be allocated.
        self.coordinator.allocate_new_computed_blocks(
            request_id=request.request_id,
            new_computed_blocks=new_computed_block_list,
            num_local_computed_tokens=num_local_computed_tokens,
            num_external_computed_tokens=num_external_computed_tokens,
        )

    new_blocks = self.coordinator.allocate_new_blocks(
        request.request_id, num_tokens_need_slot, num_encoder_tokens
    )

    # P/D: delay caching blocks if we have to recv from
    # remote. Update state for locally cached blocks.
    if not self.enable_caching or delay_cache_blocks:
        return self.create_kv_cache_blocks(new_blocks)

    # NOTE(woosuk): We want to commit (cache) up to num_local_computed_tokens
    # + num_external_computed_tokens + num_new_tokens, but must exclude
    # "non-committable" tokens (e.g., draft tokens that could be rejected).
    # Therefore, we cap the number at `request.num_tokens`, ensuring only
    # "finalized" tokens are cached.
    num_tokens_to_cache = min(
        total_computed_tokens + num_new_tokens,
        request.num_tokens,
    )
    self.coordinator.cache_blocks(request, num_tokens_to_cache)

    return self.create_kv_cache_blocks(new_blocks)

cache_blocks

cache_blocks(
    request: Request, num_computed_tokens: int
) -> None

Cache the blocks for the request, if enabled.

Parameters:

Name Type Description Default
request Request

The request to cache the blocks.

required
num_computed_tokens int

The number of computed tokens, including tokens that are already cached and tokens to be cached.

required
Source code in vllm/v1/core/kv_cache_manager.py
def cache_blocks(self, request: Request, num_computed_tokens: int) -> None:
    """Cache the blocks for the request, if enabled.

    Args:
        request: The request to cache the blocks.
        num_computed_tokens: The number of computed tokens, including tokens
            that are already cached and tokens to be cached.
    """
    if self.enable_caching:
        self.coordinator.cache_blocks(request, num_computed_tokens)

create_kv_cache_blocks

create_kv_cache_blocks(
    blocks: tuple[list[KVCacheBlock], ...],
) -> KVCacheBlocks
Source code in vllm/v1/core/kv_cache_manager.py
def create_kv_cache_blocks(
    self, blocks: tuple[list[KVCacheBlock], ...]
) -> KVCacheBlocks:
    # Only create new KVCacheBlocks for non-empty blocks
    return KVCacheBlocks(blocks) if any(blocks) else self.empty_kv_cache_blocks

evict_blocks

evict_blocks(block_ids: set[int]) -> None

Evict blocks from the prefix cache by their block IDs.

Parameters:

Name Type Description Default
block_ids set[int]

Set of block IDs to evict from cache.

required
Source code in vllm/v1/core/kv_cache_manager.py
def evict_blocks(self, block_ids: set[int]) -> None:
    """evict blocks from the prefix cache by their block IDs.

    Args:
        block_ids: Set of block IDs to evict from cache.
    """
    self.block_pool.evict_blocks(block_ids)

free

free(request: Request) -> None

Free the blocks allocated for the request. We free the blocks in reverse order so that the tail blocks are evicted first when caching is enabled.

Parameters:

Name Type Description Default
request Request

The request to free the blocks.

required
Source code in vllm/v1/core/kv_cache_manager.py
def free(self, request: Request) -> None:
    """Free the blocks allocated for the request.
    We free the blocks in reverse order so that the tail blocks are evicted
    first when caching is enabled.

    Args:
        request: The request to free the blocks.
    """
    self.coordinator.free(request.request_id)

get_block_ids

get_block_ids(request_id: str) -> tuple[list[int], ...]

Get the block ids of a request.

Source code in vllm/v1/core/kv_cache_manager.py
def get_block_ids(self, request_id: str) -> tuple[list[int], ...]:
    """Get the block ids of a request."""
    return self.get_blocks(request_id).get_block_ids()

get_blocks

get_blocks(request_id: str) -> KVCacheBlocks

Get the blocks of a request.

Source code in vllm/v1/core/kv_cache_manager.py
def get_blocks(self, request_id: str) -> KVCacheBlocks:
    """Get the blocks of a request."""
    return self.create_kv_cache_blocks(self.coordinator.get_blocks(request_id))

get_computed_blocks

get_computed_blocks(
    request: Request,
) -> tuple[KVCacheBlocks, int]

Get the computed (cached) blocks for the request. Note that the computed blocks must be full.

Parameters:

Name Type Description Default
request Request

The request to get the computed blocks.

required

Returns:

Type Description
tuple[KVCacheBlocks, int]

A tuple containing:

- A list of blocks that are computed for the request.
- The number of computed tokens.

Source code in vllm/v1/core/kv_cache_manager.py
def get_computed_blocks(self, request: Request) -> tuple[KVCacheBlocks, int]:
    """Get the computed (cached) blocks for the request.
    Note that the computed blocks must be full.

    Args:
        request: The request to get the computed blocks.

    Returns:
        A tuple containing:
            - A list of blocks that are computed for the request.
            - The number of computed tokens.
    """
    # We skip finding the prefix cache hit when prefix caching is
    # disabled or the request is marked as skipping kv cache read
    # (which happens when the request requires prompt logprobs
    # or calls a pooling model with all pooling).
    if not self.enable_caching or request.skip_reading_prefix_cache:
        return self.empty_kv_cache_blocks, 0

    # NOTE: When all tokens hit the cache, we must recompute the last token
    # to obtain logits. Thus, set max_cache_hit_length to prompt_length - 1.
    # This can trigger recomputation of an entire block, rather than just
    # the single last token, because allocate_slots() requires
    # num_computed_tokens to be block-size aligned. Removing this limitation
    # could slightly improve performance in the future.
    max_cache_hit_length = request.num_tokens - 1
    computed_blocks, num_new_computed_tokens = (
        self.coordinator.find_longest_cache_hit(
            request.block_hashes, max_cache_hit_length
        )
    )

    if self.log_stats:
        assert self.prefix_cache_stats is not None
        self.prefix_cache_stats.record(
            num_tokens=request.num_tokens,
            num_hits=num_new_computed_tokens,
            preempted=request.num_preemptions > 0,
        )

    return self.create_kv_cache_blocks(computed_blocks), num_new_computed_tokens
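
As a usage sketch of the typical call pattern (not vLLM's actual scheduler code; the function name and the num_new_tokens bookkeeping below are simplified assumptions): look up the cached prefix first, then ask allocate_slots for the remaining tokens, and treat a None result as "not enough free blocks":

def try_schedule(kv_cache_manager, request, num_scheduled_tokens):
    # Simplified scheduler-side flow, for illustration only.
    computed_blocks, num_computed_tokens = kv_cache_manager.get_computed_blocks(
        request
    )

    # Tokens that still need to be computed in this step.
    num_new_tokens = num_scheduled_tokens - num_computed_tokens
    new_blocks = kv_cache_manager.allocate_slots(
        request,
        num_new_tokens,
        num_new_computed_tokens=num_computed_tokens,
        new_computed_blocks=computed_blocks,
    )
    if new_blocks is None:
        return None  # insufficient free blocks; caller may queue or preempt

    # Block IDs per KV cache group, e.g. to build the model-runner input.
    return (computed_blocks + new_blocks).get_block_ids()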

get_num_common_prefix_blocks

get_num_common_prefix_blocks(
    running_request_id: str,
) -> list[int]

Calculate the number of common prefix blocks for each kv cache group.

The function selects a running request and iterates through its blocks. A block is considered a common prefix block if ALL requests with allocated KV cache share it (i.e., ref_cnt equals the number of entries in req_to_blocks).

NOTE(woosuk): The number of requests with allocated KV cache is greater than or equal to the number of requests scheduled in the current step. This is because having allocated KV cache only indicates that: 1. The request has not yet finished, and 2. The request holds its blocks unfreed.

While all scheduled requests must have allocated KV cache, the inverse is not necessarily true. There may be requests with allocated KV cache that are not scheduled in the current step.

This can result in an edge case where the number of common prefix blocks is 0, even though all scheduled requests share a common prefix. This occurs because there may be unscheduled requests that do not share the common prefix. Currently, this case cannot be easily detected, so the function returns 0 in such cases.

Parameters:

Name Type Description Default
running_request_id str

The request ID of any running request, used to identify the common prefix blocks.

required

Returns:

Type Description
list[int]

The number of common prefix blocks for each kv cache group.
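
A minimal sketch of the ref_cnt criterion described above (the BlockState stand-in and the counts are illustrative, not vLLM's data structures): walk one running request's blocks from the front and stop at the first block that is not shared by every request holding KV cache.

from dataclasses import dataclass

@dataclass
class BlockState:  # hypothetical stand-in carrying only the reference count
    ref_cnt: int

def count_common_prefix_blocks(request_blocks, num_requests_with_kv):
    # A block counts as a common prefix block only if every request that
    # currently holds KV cache references it (ref_cnt == number of requests).
    count = 0
    for block in request_blocks:
        if block.ref_cnt != num_requests_with_kv:
            break
        count += 1
    return count

# Three requests hold KV cache; only the first two blocks are shared by all.
blocks = [BlockState(3), BlockState(3), BlockState(2), BlockState(1)]
assert count_common_prefix_blocks(blocks, num_requests_with_kv=3) == 2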

Source code in vllm/v1/core/kv_cache_manager.py
def get_num_common_prefix_blocks(self, running_request_id: str) -> list[int]:
    """Calculate the number of common prefix blocks for each kv cache group.

    The function selects a running request and iterates through its blocks.
    A block is considered a common prefix block if ALL requests with
    allocated KV cache share it (i.e., ref_cnt equals the number of entries
    in req_to_blocks).

    NOTE(woosuk): The number of requests with allocated KV cache is **greater
    than or equal to** the number of requests scheduled in the current step.
    This is because having allocated KV cache only indicates that:
    1. The request has not yet finished, and
    2. The request holds its blocks unfreed.

    While all scheduled requests must have allocated KV cache, the inverse
    is not necessarily true. There may be requests with allocated KV cache
    that are not scheduled in the current step.

    This can result in an edge case where the number of common prefix blocks
    is 0, even though all scheduled requests share a common prefix. This
    occurs because there may be unscheduled requests that do not share the
    common prefix. Currently, this case cannot be easily detected, so the
    function returns 0 in such cases.

    Args:
        running_request_id: The request ID of any running request, used to
            identify the common prefix blocks.

    Returns:
        list[int]: The number of common prefix blocks for each kv cache
        group.
    """
    return self.coordinator.get_num_common_prefix_blocks(running_request_id)

make_prefix_cache_stats

make_prefix_cache_stats() -> PrefixCacheStats | None

Get (and reset) the prefix cache stats.

Returns:

Type Description
PrefixCacheStats | None

The current prefix caching stats, or None if logging is disabled.

Source code in vllm/v1/core/kv_cache_manager.py
def make_prefix_cache_stats(self) -> PrefixCacheStats | None:
    """Get (and reset) the prefix cache stats.

    Returns:
        The current prefix caching stats, or None if logging is disabled.
    """
    if not self.log_stats:
        return None
    stats = self.prefix_cache_stats
    self.prefix_cache_stats = PrefixCacheStats()
    return stats

remove_skipped_blocks

remove_skipped_blocks(
    request_id: str, total_computed_tokens: int
) -> None

Remove the blocks that are no longer needed from blocks and replace the removed blocks with null_block.

Parameters:

Name Type Description Default
request_id str

The request ID.

required
total_computed_tokens int

The total number of computed tokens, including local computed tokens and external computed tokens.

required
Source code in vllm/v1/core/kv_cache_manager.py
def remove_skipped_blocks(
    self, request_id: str, total_computed_tokens: int
) -> None:
    """Remove the blocks that are no longer needed from `blocks` and replace
    the removed blocks with null_block.

    Args:
        request_id: The request ID.
        total_computed_tokens: The total number of computed tokens, including
            local computed tokens and external computed tokens.
    """
    self.coordinator.remove_skipped_blocks(request_id, total_computed_tokens)

reset_prefix_cache

reset_prefix_cache() -> bool

Reset prefix cache. This function may be used in RLHF flows to invalidate prefix caching after the weights are updated, or used for resetting prefix caching status for benchmarking.

Returns:

Name Type Description
bool bool

True if the prefix cache is successfully reset, False otherwise.

Source code in vllm/v1/core/kv_cache_manager.py
def reset_prefix_cache(self) -> bool:
    """Reset prefix cache. This function may be used in RLHF
    flows to invalidate prefix caching after the weights are updated,
    or used for resetting prefix caching status for benchmarking.

    Returns:
        bool: True if the prefix cache is successfully reset,
        False otherwise.
    """
    if not self.block_pool.reset_prefix_cache():
        return False
    if self.log_stats:
        assert self.prefix_cache_stats is not None
        self.prefix_cache_stats.reset = True
    return True

take_events

take_events() -> list[KVCacheEvent]

Take the KV cache events from the block pool.

Returns:

Type Description
list[KVCacheEvent]

A list of KV cache events.

Source code in vllm/v1/core/kv_cache_manager.py
def take_events(self) -> list[KVCacheEvent]:
    """Take the KV cache events from the block pool.

    Returns:
        A list of KV cache events.
    """
    return self.block_pool.take_events()