Skip to content

vllm.model_executor.layers.pooler

Modules:

Name Description
abstract
activations
common
seqwise

Poolers that produce an output aggregating all tokens in the sequence.

special
tokwise

Poolers that produce an output for each token in the sequence.

ActivationFn module-attribute

ActivationFn = Callable[[_T], _T]

ClassifierFn module-attribute

ClassifierFn = Callable[[Tensor], Tensor]

ProjectorFn module-attribute

ProjectorFn = Callable[[Tensor], Tensor]

DispatchPooler

Bases: Pooler

Dispatches calls to a sub-pooler based on the pooling task.

Source code in vllm/model_executor/layers/pooler/special.py
class DispatchPooler(Pooler):
    """Dispatches calls to a sub-pooler based on the pooling task.

    The dispatcher holds a mapping from task name to the pooler that serves
    it. ``forward`` walks the batch, groups consecutive requests that share a
    task, and hands each group to the matching sub-pooler.
    """

    @classmethod
    def for_embedding(cls, pooler_config: PoolerConfig):
        """Build a dispatcher serving the ``"token_embed"`` and ``"embed"`` tasks.

        Args:
            pooler_config: Forwarded to both sub-pooler factories.
        """
        return cls(
            {
                "token_embed": pooler_for_token_embed(pooler_config),
                "embed": pooler_for_embed(pooler_config),
            },
        )

    @classmethod
    def for_seq_cls(
        cls,
        pooler_config: PoolerConfig,
        *,
        pooling: SequencePoolingMethod | SequencePoolingFn | None = None,
        classifier: ClassifierFn | None = None,
    ):
        """Build a dispatcher for ``"token_classify"``, ``"classify"`` and ``"score"``.

        Args:
            pooler_config: Forwarded to every sub-pooler factory.
            pooling: Forwarded to the ``"classify"`` and ``"score"`` poolers.
                The ``"token_classify"`` pooler always receives ``AllPool()``.
            classifier: Forwarded to all three sub-poolers.
        """
        return cls(
            {
                "token_classify": pooler_for_token_classify(
                    pooler_config,
                    pooling=AllPool(),
                    classifier=classifier,
                ),
                "classify": pooler_for_classify(
                    pooler_config,
                    pooling=pooling,
                    classifier=classifier,
                    act_fn="classify",
                ),
                "score": pooler_for_classify(
                    pooler_config,
                    pooling=pooling,
                    classifier=classifier,
                    act_fn="score",
                ),
            }
        )

    def __init__(self, poolers_by_task: Mapping[PoolingTask, Pooler]) -> None:
        """Store the task-to-pooler mapping after validating it.

        Args:
            poolers_by_task: The pooler to use for each task.

        Raises:
            ValueError: If a pooler does not list the task it is registered
                under among its own supported tasks.
        """
        super().__init__()

        for task, pooler in poolers_by_task.items():
            if task not in pooler.get_supported_tasks():
                raise ValueError(
                    f"{pooler=} does not support {task=}. "
                    f"Supported tasks: {pooler.get_supported_tasks()}"
                )

        self.poolers_by_task = poolers_by_task

    def get_supported_tasks(self) -> Set[PoolingTask]:
        """Return the set of tasks that have a registered sub-pooler."""
        return set(self.poolers_by_task)

    def get_pooling_updates(self, task: PoolingTask) -> PoolingParamsUpdate:
        """Delegate to the sub-pooler registered for ``task``.

        Raises:
            KeyError: If no pooler is registered for ``task``.
        """
        return self.poolers_by_task[task].get_pooling_updates(task)

    def forward(
        self,
        hidden_states: torch.Tensor,
        pooling_metadata: PoolingMetadata,
    ) -> PoolerOutput:
        """Run each run of same-task requests through its sub-pooler.

        Args:
            hidden_states: Passed unchanged to every sub-pooler call.
            pooling_metadata: Its ``tasks`` sequence drives the grouping; the
                matching slice of the metadata is passed to each sub-pooler.

        Returns:
            The sub-poolers' outputs, concatenated in the order the groups
            appear in ``pooling_metadata.tasks``.

        Raises:
            ValueError: If a task has no registered sub-pooler.
        """
        poolers_by_task = self.poolers_by_task

        outputs: list[torch.Tensor | None] = []
        offset = 0
        # groupby only merges *consecutive* equal tasks, so batch order is kept.
        for task, group in groupby(pooling_metadata.tasks):
            pooler = poolers_by_task.get(task)
            # Compare against None explicitly: a module that defines
            # ``__len__`` can be falsy while still being a valid pooler.
            if pooler is None:
                raise ValueError(
                    f"Unsupported task: {task!r} "
                    f"Supported tasks: {self.get_supported_tasks()}"
                )

            num_items = len(list(group))
            group_output: PoolerOutput = pooler(
                hidden_states,
                pooling_metadata[offset : offset + num_items],
            )

            outputs.extend(group_output)
            offset += num_items

        return outputs

    def extra_repr(self) -> str:
        """Return the supported tasks for inclusion in the module's repr."""
        s = f"supported_task={self.get_supported_tasks()}"
        return s

poolers_by_task instance-attribute

poolers_by_task = poolers_by_task

__init__

__init__(
    poolers_by_task: Mapping[PoolingTask, Pooler],
) -> None
Source code in vllm/model_executor/layers/pooler/special.py
def __init__(self, poolers_by_task: Mapping[PoolingTask, Pooler]) -> None:
    """Validate and store the task-to-pooler mapping.

    Raises:
        ValueError: If a pooler does not report the task it is registered
            under as one of its supported tasks.
    """
    super().__init__()

    for task, pooler in poolers_by_task.items():
        if task in pooler.get_supported_tasks():
            continue
        raise ValueError(
            f"{pooler=} does not support {task=}. "
            f"Supported tasks: {pooler.get_supported_tasks()}"
        )

    self.poolers_by_task = poolers_by_task

extra_repr

extra_repr() -> str
Source code in vllm/model_executor/layers/pooler/special.py
def extra_repr(self) -> str:
    """Return the supported tasks, formatted for the module's repr."""
    return f"supported_task={self.get_supported_tasks()}"

for_embedding classmethod

for_embedding(pooler_config: PoolerConfig)
Source code in vllm/model_executor/layers/pooler/special.py
@classmethod
def for_embedding(cls, pooler_config: PoolerConfig):
    """Build a dispatcher serving the ``"token_embed"`` and ``"embed"`` tasks."""
    token_embed_pooler = pooler_for_token_embed(pooler_config)
    embed_pooler = pooler_for_embed(pooler_config)
    return cls({"token_embed": token_embed_pooler, "embed": embed_pooler})

for_seq_cls classmethod

for_seq_cls(
    pooler_config: PoolerConfig,
    *,
    pooling: SequencePoolingMethod
    | SequencePoolingFn
    | None = None,
    classifier: ClassifierFn | None = None,
)
Source code in vllm/model_executor/layers/pooler/special.py
@classmethod
def for_seq_cls(
    cls,
    pooler_config: PoolerConfig,
    *,
    pooling: SequencePoolingMethod | SequencePoolingFn | None = None,
    classifier: ClassifierFn | None = None,
):
    """Build a dispatcher for ``"token_classify"``, ``"classify"`` and ``"score"``.

    ``pooling`` is only forwarded to the ``"classify"`` and ``"score"``
    poolers; the ``"token_classify"`` pooler always receives ``AllPool()``.
    ``classifier`` is forwarded to all three.
    """
    poolers = {
        "token_classify": pooler_for_token_classify(
            pooler_config,
            pooling=AllPool(),
            classifier=classifier,
        ),
    }
    # Both sequence-level tasks use the same factory; the activation
    # selector passed as ``act_fn`` is the task name itself.
    for task in ("classify", "score"):
        poolers[task] = pooler_for_classify(
            pooler_config,
            pooling=pooling,
            classifier=classifier,
            act_fn=task,
        )
    return cls(poolers)

forward

forward(
    hidden_states: Tensor, pooling_metadata: PoolingMetadata
) -> PoolerOutput
Source code in vllm/model_executor/layers/pooler/special.py
def forward(
    self,
    hidden_states: torch.Tensor,
    pooling_metadata: PoolingMetadata,
) -> PoolerOutput:
    """Run each run of same-task requests through its sub-pooler.

    Args:
        hidden_states: Passed unchanged to every sub-pooler call.
        pooling_metadata: Its ``tasks`` sequence drives the grouping; the
            matching slice of the metadata is passed to each sub-pooler.

    Returns:
        The sub-poolers' outputs, concatenated in the order the groups
        appear in ``pooling_metadata.tasks``.

    Raises:
        ValueError: If a task has no registered sub-pooler.
    """
    poolers_by_task = self.poolers_by_task

    outputs: list[torch.Tensor | None] = []
    offset = 0
    # groupby only merges *consecutive* equal tasks, so batch order is kept.
    for task, group in groupby(pooling_metadata.tasks):
        pooler = poolers_by_task.get(task)
        # Compare against None explicitly: a module that defines ``__len__``
        # can be falsy while still being a valid pooler.
        if pooler is None:
            raise ValueError(
                f"Unsupported task: {task!r} "
                f"Supported tasks: {self.get_supported_tasks()}"
            )

        num_items = len(list(group))
        group_output: PoolerOutput = pooler(
            hidden_states,
            pooling_metadata[offset : offset + num_items],
        )

        outputs.extend(group_output)
        offset += num_items

    return outputs

get_pooling_updates

get_pooling_updates(
    task: PoolingTask,
) -> PoolingParamsUpdate
Source code in vllm/model_executor/layers/pooler/special.py
def get_pooling_updates(self, task: PoolingTask) -> PoolingParamsUpdate:
    """Delegate to the sub-pooler registered for ``task``."""
    pooler = self.poolers_by_task[task]
    return pooler.get_pooling_updates(task)

get_supported_tasks

get_supported_tasks() -> Set[PoolingTask]
Source code in vllm/model_executor/layers/pooler/special.py
def get_supported_tasks(self) -> Set[PoolingTask]:
    """Return the set of tasks that have a registered sub-pooler."""
    registered = set(self.poolers_by_task)
    return registered

IdentityPooler

Bases: Pooler

Source code in vllm/model_executor/layers/pooler/special.py
class IdentityPooler(Pooler):
    """Pooler that hands the hidden states back unchanged."""

    def forward(
        self,
        hidden_states: torch.Tensor,
        pooling_metadata: PoolingMetadata,
    ) -> PoolerOutput:
        """Return ``hidden_states`` as-is; ``pooling_metadata`` is not read."""
        return hidden_states

    def get_supported_tasks(self) -> Set[PoolingTask]:
        """Report the ``"plugin"`` and ``"score"`` tasks as supported."""
        supported: Set[PoolingTask] = {"plugin", "score"}
        return supported

forward

forward(
    hidden_states: Tensor, pooling_metadata: PoolingMetadata
) -> PoolerOutput
Source code in vllm/model_executor/layers/pooler/special.py
def forward(
    self, hidden_states: torch.Tensor, pooling_metadata: PoolingMetadata
) -> PoolerOutput:
    """Return ``hidden_states`` as-is; ``pooling_metadata`` is not read."""
    return hidden_states

get_supported_tasks

get_supported_tasks() -> Set[PoolingTask]
Source code in vllm/model_executor/layers/pooler/special.py
def get_supported_tasks(self) -> Set[PoolingTask]:
    """Report the ``"plugin"`` and ``"score"`` tasks as supported."""
    supported: Set[PoolingTask] = {"plugin", "score"}
    return supported

Pooler

Bases: Module, ABC

The interface required for all poolers used in pooling models in vLLM.

Source code in vllm/model_executor/layers/pooler/abstract.py
class Pooler(nn.Module, ABC):
    """The interface required for all poolers used in pooling models in vLLM.

    Subclasses must implement ``get_supported_tasks`` and ``forward``;
    ``get_pooling_updates`` has a default implementation.
    """

    @abstractmethod
    def get_supported_tasks(self) -> Set[PoolingTask]:
        """Determine which pooling tasks are supported."""
        raise NotImplementedError

    @abstractmethod
    def forward(
        self,
        hidden_states: torch.Tensor,
        pooling_metadata: PoolingMetadata,
    ) -> PoolerOutput:
        """Pool ``hidden_states`` using ``pooling_metadata`` (abstract)."""
        raise NotImplementedError

    def get_pooling_updates(self, task: PoolingTask) -> PoolingParamsUpdate:
        """Construct the updated pooling parameters to use for a supported task.

        The base implementation returns a default-constructed
        ``PoolingParamsUpdate`` regardless of ``task``.
        """
        return PoolingParamsUpdate()

forward abstractmethod

forward(
    hidden_states: Tensor, pooling_metadata: PoolingMetadata
) -> PoolerOutput
Source code in vllm/model_executor/layers/pooler/abstract.py
@abstractmethod
def forward(
    self, hidden_states: torch.Tensor, pooling_metadata: PoolingMetadata
) -> PoolerOutput:
    """Pool ``hidden_states`` using ``pooling_metadata`` (abstract)."""
    raise NotImplementedError

get_pooling_updates

get_pooling_updates(
    task: PoolingTask,
) -> PoolingParamsUpdate

Construct the updated pooling parameters to use for a supported task.

Source code in vllm/model_executor/layers/pooler/abstract.py
def get_pooling_updates(self, task: PoolingTask) -> PoolingParamsUpdate:
    """Construct the updated pooling parameters to use for a supported task.

    This implementation returns a default-constructed
    ``PoolingParamsUpdate`` regardless of ``task``.
    """
    default_update = PoolingParamsUpdate()
    return default_update

get_supported_tasks abstractmethod

get_supported_tasks() -> Set[PoolingTask]

Determine which pooling tasks are supported.

Source code in vllm/model_executor/layers/pooler/abstract.py
@abstractmethod
def get_supported_tasks(self) -> Set[PoolingTask]:
    """Determine which pooling tasks are supported (abstract)."""
    raise NotImplementedError

PoolingParamsUpdate dataclass

Source code in vllm/model_executor/layers/pooler/common.py
@dataclass(frozen=True)
class PoolingParamsUpdate:
    """Immutable set of flags a pooler wants applied to pooling parameters."""

    requires_token_ids: bool = False
    """Set this flag to enable `get_prompt_token_ids` for your pooler."""

    def __or__(self, other: "PoolingParamsUpdate") -> "PoolingParamsUpdate":
        """Merge two updates; the flag is set if either operand sets it."""
        needs_token_ids = self.requires_token_ids or other.requires_token_ids
        return PoolingParamsUpdate(requires_token_ids=needs_token_ids)

    def apply(self, params: PoolingParams) -> None:
        """Overwrite ``params.requires_token_ids`` with this update's value."""
        setattr(params, "requires_token_ids", self.requires_token_ids)

requires_token_ids class-attribute instance-attribute

requires_token_ids: bool = False

Set this flag to enable `get_prompt_token_ids` for your pooler.

__init__

__init__(requires_token_ids: bool = False) -> None

__or__

Source code in vllm/model_executor/layers/pooler/common.py
def __or__(self, other: "PoolingParamsUpdate") -> "PoolingParamsUpdate":
    """Merge two updates; the flag is set if either operand sets it."""
    needs_token_ids = self.requires_token_ids or other.requires_token_ids
    return PoolingParamsUpdate(requires_token_ids=needs_token_ids)

apply

apply(params: PoolingParams) -> None
Source code in vllm/model_executor/layers/pooler/common.py
def apply(self, params: PoolingParams) -> None:
    """Overwrite ``params.requires_token_ids`` with this update's value."""
    setattr(params, "requires_token_ids", self.requires_token_ids)