前言

KV cache 是 vLLM 的基础,KV 值的传递是 PD 分离实现的关键,目前 vLLM 中由 kv_transfer 模块(包括两个版本 V0/V1)完成。

在 KV cache 传递过程中要考虑哪些问题,传输会不会成为系统的瓶颈?本文主要讨论 PD 分离下 KV cache 传递机制,对过程中可能遇到的问题、未来演进方向进行探讨。

1、KV cache传递常见问题

训练/推理过程中,数据在跨设备之间的传递已较为普遍,KV 值在 P/D 之间传递可以认为是一个子类应用,因此在实现方式带宽、延时、计算通信重叠等方面都会遇到与模型分布式训练/推理类似的问题。

这里讨论几个 KV 传递当前常见的问题:

(1)传递的常见方式有哪些?

根据目前一些框架的形式来看,主要有中心存储离散式(分布式)两种方式,当然也可以是两者的结合。

所谓中心存储就是建立一个跨设备的 KV store,由它统一管理 KV 值,包括 KV 的增、删、查、传递等工作,推理实例(P/D)连接 KV store 后只需负责往里面增、查数据。

另一种是分布式:P2P 的方式进行数据传递,各个实例分别管理自己的存储,比如一个 P 实例计算完成后,向目标 D 实例建立通信完成 KV 值传递,这种方式没有统一的传递中介。

【AI大模型】PD分离架构下:KV Cache如何传递?看完这一篇你就知道了!!_大模型学习

**两种方式各有利弊:**中心存储能够构建更大的集群管理体系、充分利用各种存储介质、传输通道,计算重用的效能更高,但性能在某些情况下可能会较差(取决于系统设计),维护成本较高;

离散式的优势是架构清晰,性能更好,但扩展性、链路稳定性都不理想。

两种方式可以混用,但目前 Mooncake、Dynamo 等主流推理解决方案更倾向于中心存储的方式,把传递中的复杂度问题内部解决。

【AI大模型】PD分离架构下:KV Cache如何传递?看完这一篇你就知道了!!_大模型学习_02

Mooncake 的中心存储

这里简单罗列存储介质和传输方式:

【AI大模型】PD分离架构下:KV Cache如何传递?看完这一篇你就知道了!!_大模型入门_03

(2)配合模型并行传递需要注意的问题?

当加载模型过大时一般需要用到模型并行(TP、PP、EP、SP 等)。

而这会使得模型计算的 KV 值分散在各个 rank 设备中(GPU/NPU),而 P 模型和 D 模型的分布式策略可能还不一样。

比如 P 实例用 TP4、EP1,D 实例用 TP1、PP2、EP4,这样 P 和 D 实例无法直接进行一对一的 rank 传递。

【AI大模型】PD分离架构下:KV Cache如何传递?看完这一篇你就知道了!!_人工智能_04

若切分策略相同,可以找副本位置进行传递,这样效率更高,比如直接用 NCCL 的 P2P 通信;

当切分方式不一样时,就需要考虑先聚合后传递再分发的方式,比如让 TP 组中 rank0 进行汇聚后传递,但这样可能不能充分利用传输链路的优势(如 NVLink)。

当然 KV blocks 的尺寸足够细时,不用聚合,但会引发一个新问题,多个 rank 要从相同 rank 拉数据,这样是否会因冲突产生阻塞,导致效率降低?是值得考虑的。

(3) KV cache 传递一定是只有 P 端到 D 端吗?

PD 分离多对多模式,P 和 D 之间任意映射都要能完成。在 vLLM 有 prefix cache 功能,可以利用历史计算数据,存在一种情况:D 端的 block 保存的数据,P 端是否能够利用?

根据 prefix cache 的 block 管理来看(如下图)这完全是可能的。所以 P 可以从 D 拉取已计算数据。

【AI大模型】PD分离架构下:KV Cache如何传递?看完这一篇你就知道了!!_大模型入门_05

顺便提一下,在 vLLM 的 V1 版本中 D 的调度逻辑可以设计成:KV cache 先本地查找匹配,然后再从远端获取。这样当本地已有匹配值时,避免从 P 实例去拉取重复的 KV 值。

(4)KV 传输与模型计算传输是否会争抢带宽?

这里分两种情况:按模型整体计算传输按层计算传输

按模型整体计算传输是指一个请求的 P 处理全部结束再传输 KV 值,通常该情景下能够错开通信链路的使用,不会出现带宽争抢,但也存在一些极端情况,模型计算速度快于传输速度。

出现上一个请求的 KV 传递还未结束下一个请求的计算就需要使用通信链路的情况;

另一个是按层计算(或者按 block)传递:完成一个单位的 KV cache 计算时向下一阶段的 D 实例立刻进行异步传输。

这种方式能够避免串行带来的性能下降(可能),但也需要考虑当模型计算的通信带宽使用较满时,两者会出现带宽争抢。

【AI大模型】PD分离架构下:KV Cache如何传递?看完这一篇你就知道了!!_人工智能_06

2、vLLM 的现状分析

vLLM 的 PD 分离在前面文章中有过简单方案分析,提到了两个关键点:

  • 目前 vLLM 架构正在调整,所以许多特性有两套方案
  • PD 分离方案目前处于原型探索阶段

目前,KV cache 传递机制在 kv transfer 中实现(vllm/vllm/distributed/kv_transfer/),存在 V0 和 V1 两个版本。

V1 的版本迭代速度较快,代码可能会出现较大改动,这里以 0.8.5 版本进行现状分析,主要了解目前架构和接口设计。

(1)V0 版本

KV cache 的传递在 PD 实例之间通过**连接器(Connector)**完成,目前 V0 版本的结构如下图所示,是典型的生产者-消费者模式。

connector 里面包括查询缓冲(Lookup Buffer)、传递管道(pipe),还有生产者(producer,P 实例)、消费者(consumer,D 实例)。

【AI大模型】PD分离架构下:KV Cache如何传递?看完这一篇你就知道了!!_大模型入门_07

架构上: pipe 负责数据传递、buffer 负责构建一个 FIFO 队列缓冲数据、connector 在上层负责操作的协调并对外提供接口。

它们的一些关键函数如下所示:

【AI大模型】PD分离架构下:KV Cache如何传递?看完这一篇你就知道了!!_AI大模型_08

Connector 模块

在 connector 的上层有一个 transfer agent 模块,它主要负责对外暴露调用接口,初始化里面调用工厂类(KVConnectorFactory)根据 KVTransferConfig 来决定使用什么 connector。

class KVTransferAgent:    def __init__(        self,        rank: int,        local_rank: int,        config: "VllmConfig",    ):        self.config = config        if config.kv_transfer_config is None:            raise ValueError("KVTransferConfig is not set in the VllmConfig,"                             " cannot initialize KVConnector.")        assert self.config.kv_transfer_config.is_kv_transfer_instance, "KV"\            "TransferAgent should only be used when kv_connector is set."        # 工厂类的作用        self.connector = KVConnectorFactory.create_connector_v0(            rank, local_rank, config)

connector 基类的定义参考:

vllm/vllm/distributed/kv_transfer/kv_connector/base.py

基类函数主要是两个:发送函数(P 调用)、接收函数(D 调用)。

其操作逻辑如下所示:

【AI大模型】PD分离架构下:KV Cache如何传递?看完这一篇你就知道了!!_AI大模型_09

connector 的具体实现,可以根据不同的存储库来实现,目前 V0 版本中当前已注册的工厂类如下:

KVConnectorFactory.register_connector(    "PyNcclConnector",    "vllm.distributed.kv_transfer.kv_connector.simple_connector",    "SimpleConnector")KVConnectorFactory.register_connector(    "MooncakeConnector",    "vllm.distributed.kv_transfer.kv_connector.simple_connector",    "SimpleConnector")KVConnectorFactory.register_connector(    "LMCacheConnector",    "vllm.distributed.kv_transfer.kv_connector.lmcache_connector",    "LMCacheConnector")KVConnectorFactory.register_connector(    "MooncakeStoreConnector",    "vllm.distributed.kv_transfer.kv_connector.mooncake_store_connector",    "MooncakeStoreConnector")

Lookup Buffer 模块

查找缓冲主要用于构建一个 FIFO 队列的数据缓冲区,它能保证数据传递顺序,控制数据占用的显存量,主要的对外接口是 insert(生产者调用)、drop_select(消费者调用),其大致逻辑如下图所示。

生产者侧会创建一个 buffer(双端队列),insert 函数会在主线程中将 KV 数据添加到队列,同时创建一个后台子线程 drop_select_handler,负责从 buffer 中取出数据并发送给消费者。

生产者与消费者之间的通信有两个步骤,首先,消费者会发送 input_tokens 和 roi,生产者根据这两个信息判断 KV cache 数据是否已计算完成,若已完成则将 key、value、hidden 等数据发送给消费者。

【AI大模型】PD分离架构下:KV Cache如何传递?看完这一篇你就知道了!!_大模型_10

这里用一段代码解释一下 roi 的作用:

取值操作操作:input_tokens = torch.tensor([1, 2, 3, 4, 5])roi = torch.tensor([False, True, False, True, False], dtype=torch.bool)result = input_tokens[roi]print(result)  # 输出:tensor([2, 4])POP操作:原始 buffer: [10, 20, 30, 40, 50]input_tokens: tensor([1, 2, 3, 4, 5])roi: tensor([False,  True, False,  True, False])buffer.pop(input_tokens, roi)处理后的 buffer: [10, 30, 50]

Pipe 模块

pipe 用来进行具体的数据传递,对外接口:

class KVPipeBase(ABC):    """    This class provides an interface for sending and receiving tensors, or    None, by distributed communications.    """    @abstractmethod    def send_tensor(self, tensor: Optional[torch.Tensor]) -> None:        """Send a tensor, or None, via the pipe.        """        raise NotImplementedError    @abstractmethod    def recv_tensor(self) -> Optional[torch.Tensor]:        """Receive a tensor (can be None) from the pipeline.        """        raise NotImplementedError    @abstractmethod    def close(self) -> None:        """Close the pipeline and release resources.        """        raise NotImplementedError

以 PyNcclPipe 为例,用于在分布式环境中实现张量及其元数据的发送和接收功能。

里面的一些关键步骤如下:

  • **配置与设备选择:**保存传入的配置信息 KVTransferConfig,并根据配置或传入的参数选择设备(CUDA 或 CPU)。
  • **分布式连接建立:**使用 StatelessProcessGroup 创建一个分布式进程组,通过 barrier 方法确保连接正确初始化。
  • **发送和接收实现选择:**根据设备类型选择合适的发送和接收实现,对于 CUDA 设备使用 PyNcclCommunicator,对于 CPU 设备使用 StatelessProcessGroup 的对象发送和接收方法。
  • **传输相关变量初始化:**初始化传输线程池、缓冲区大小和缓冲区大小阈值。

【AI大模型】PD分离架构下:KV Cache如何传递?看完这一篇你就知道了!!_大模型_11

(2)V1 版本

V1 版本中 PD 的角色关系已变得比较模糊,P 和 D 既可以是生产者也可以是消费者,而且默认开启 prefix KV cache 特性后 P 可以从 D 实例中获取已计算好的 Block(虽然这个功能暂未实现)。

V1 的 transfer 实现主要架构如下图所示,最大特点是 connector 有两个执行角色(Role):scheduler_connector 和 worker_connector,分别在 scheduler 线程和 worker 线程中执行。

scheduler 负责指挥 worker 进行 KV 数据的传递,两者之间的信息桥梁是元数据(KVConnectorMetadata),worker 通过 metadata 知道哪些 KV 值需要从远端加载。

【AI大模型】PD分离架构下:KV Cache如何传递?看完这一篇你就知道了!!_人工智能_12

考虑一个问题**:**KV connector 按理来说只要让 worker 能调用就行,为什么在 scheduler 里面也要运行?

这个问题要从 scheduler 的重复计算功能角度思考,因为 scheduler 可以通过匹配利用历史计算数据(block),在远端的 KV cache 也可以认为是一种计算好的数据。

但这些数据在远端,所以在 PD 分离场景下 scheduler 需要增加逻辑:让 worker 拉取远端 KV、远端 KV 没获取完前 request 请求需要继续等待、本地不需要新计算 block 等。

反过来,worker 获取远端 KV cache 值刷新了本地 block 值需要告知 scheduler。

Scheduler connector

connector 基类里面包括 scheduler 端、worker 端调用的函数接口,其中给 scheduler 用的主要是:

  • get_num_new_matched_tokens(): 获取远端已计算的 KV cache 的 token 数量。
  • update_state_after_alloc() : block 开辟后,更新 connector 状态。
  • build_connector_meta(): 构建元数据,scheduler 告诉 worker 需要保持/加载哪些 KV 数据。

函数在 scheduler 的 schedule() 操作中调用,会改变 num_computed_tokens 的计算,创建 worker 交互的 meta 数据。

但 base 类的操作不改变 scheduler_output,也不改变队列执行顺序(扩展类可改变队列执行逻辑)。

# vllm/vllm/v1/core/sched/scheduler.py
# scheduler中的函数调用位置:# KVConnector: update internal state after allocation.                # This information is used to determine if a load is                # needed for this request. if self.connector is not None:self.connector.update_state_after_alloc(request,new_computed_blocks + new_blocks,                        num_external_computed_tokens,)
# ...
# 计算远端已有kv数量,                # Get externally-cached tokens if using a KVConnector.                num_external_computed_tokens, load_kv_async = (                    (0, False) if self.connector is None else                    self.connector.get_num_new_matched_tokens(                        request, num_native_computed_tokens))                # Total computed tokens (local + external).                num_computed_tokens = (num_native_computed_tokens +                                       num_external_computed_tokens +                                       num_prealloc_computed_tokens)# ...        # NOTE(Kuntai): this function is designed for multiple purposes:        # 1. Plan the KV cache store        # 2. Wrap up all the KV cache load / save ops into an opaque object        # 3. Clear the internal states of the connector        if self.connector is not None:            meta = self.connector.build_connector_meta(scheduler_output)            scheduler_output.kv_connector_metadata = meta

Worker connector

worker connector 主要接口是生产者端调用存储KV接口,消费者端调用的加载 KV 接口;

两种角色与之配套的都有一个异步等待操作,支持按层传递,可异步执行:

  • bind_connector_metadata: 绑定元数据;
  • start_load_kv: 开始加载,消费端调用;
  • wait_for_layer_load: 阻塞直到指定层加载结束,消费端调用;
  • save_kv_layer: 开始保存层,生产端调用;
  • wait_for_save: 阻塞直到所有保存完成,生产端调用;
#vllm/vllm/attention/layer.py# worker connecotr 使用位置(attention部分):# ...# 加载逻辑使用def wait_for_kv_layer_from_connector(layer_name: str):    if not has_kv_transfer_group() or not is_v1_kv_transfer_group():        return    connector = get_kv_transfer_group()    forward_context: ForwardContext = get_forward_context()    attn_metadata = forward_context.attn_metadata    if attn_metadata is None:        return    assert isinstance(attn_metadata, dict)    connector.wait_for_layer_load(layer_name)# ...# 保存逻辑使用def maybe_save_kv_layer_to_connector(    layer_name: str,    kv_cache_layer: List[torch.Tensor],):    if not has_kv_transfer_group() or not is_v1_kv_transfer_group():        return    connector = get_kv_transfer_group()    forward_context: ForwardContext = get_forward_context()    attn_metadata = forward_context.attn_metadata    if attn_metadata is None:        return    assert isinstance(attn_metadata, dict)    connector.save_kv_layer(layer_name, kv_cache_layer,                            attn_metadata[layer_name])

worker connector 的接口 start/wait 由 runner 调用(ModelRunner 模块),而 attention 调用存/读 KV cache 接口。

相关 PR:KV Connector API V1,大致的时序如下:

【AI大模型】PD分离架构下:KV Cache如何传递?看完这一篇你就知道了!!_人工智能_13

03

迭代的其它思考

(1)控制信号的设计

由于 PD 分离中 P 实例和 D 实例处于不同的机器中,P 和 D 之间的执行动作协调肯定需要一个控制信号来处理。

比如 P 实例告诉 D 实例 KV cache 的计算状态、D 实例从 P 实例拉取 KV 值的时机选择、D 实例拉取完 KV 值后告诉 P 实例可以释放本地的 KV 值等。

虽然都是 P 和 D 之间进行数据传递,控制信号和 KV 值数据传递方式可以不共用链路,因为控制信号的信息量较小,一般用 TCP 这种就能满足要求。

而且传递"绕远路"也并不影响性能,目前主要的方式有两种:直连、通过中心调度器传递

【AI大模型】PD分离架构下:KV Cache如何传递?看完这一篇你就知道了!!_人工智能_14

虽然控制信号对速度要求不高,但其可靠性相关问题值得关注。如,控制信号如果丢失可靠性怎么保证?

**更具体的例子:**D 实例让 P 实例释放 KV 值,结果控制信号丢失了,P 实例什么时候释放 KV 值?

方式的利弊**:**控制信号用中心调度"包办"的方式会增加调度器的处理逻辑复杂度,但实现起来比较简单直观,所以现在很多 DIY 版本的 PD 分离用这种方式较多;

直连的方式是把处理逻辑下放到了执行器,执行器之间自己去搭建通信链路,可以避免信息传递的多次转手。

(2)scheduler 逻辑设计

scheduler 在 V1 的迭代中功能得到了简化,逻辑也变得更简单,带来的好处是执行效率更高。

但 connector 需要在 scheduler 中执行,使得本来轻量的 scheduler 因为不同形态的 connector 模块的注入变得越来越沉重。

【AI大模型】PD分离架构下:KV Cache如何传递?看完这一篇你就知道了!!_人工智能_15

以 5.13 这个合入为例,scheduler 为了 nixl 的合入引入了非常多的改动,暂时还不会构成质的影响,但是否能够支撑后续的扩展需求?

# scheduler 的修改# 增加了一种状态:                # P/D: skip request if still waiting for remote kvs.                if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS:                    is_ready = self._update_waiting_for_remote_kv(request)                    if is_ready:                        request.status = RequestStatus.WAITING                    else:                        self.waiting.popleft()                        skipped_waiting_requests.appendleft(request)                        continue# 增加PD的处理逻辑    ########################################################################    # P/D Related Methods    ########################################################################    def get_kv_connector(self) -> Optional[KVConnectorBase_V1]:        return self.connector    def _connector_finished(            self, request: Request) -> tuple[bool, Optional[KVTransferParams]]:        """Invoke the KV connector request_finished() method if applicable."""        if self.connector is None:            return False, None        block_ids = self.kv_cache_manager.get_block_ids(request.request_id)        return self.connector.request_finished(request, block_ids)    def _update_waiting_for_remote_kv(self, request: Request) -> bool:        """        P/D: check if the request_id is finished_recving.        The finished_recving_kv_req_ids list is populated        on the previous steps()'s update_from_output based        on the worker side connector.        When the kv transfer is ready, we cache the blocks        and the request state will be moved back to WAITING from        WAITING_FOR_REMOTE_KV.        """        if request.request_id not in self.finished_recving_kv_req_ids:            return False        # Now that the blocks are ready, actually cache them.        block_ids = self.kv_cache_manager.get_block_ids(request.request_id)        num_computed_tokens = len(block_ids) * self.block_size        if num_computed_tokens == request.num_tokens:            num_computed_tokens -= 1        self.kv_cache_manager.single_type_manager.cache_blocks(            request,            self.kv_cache_manager.req_to_block_hashes[request.request_id],            num_computed_tokens,        )        # Update the request state for scheduling.        request.num_computed_tokens = num_computed_tokens        # Return that we are ready.        self.finished_recving_kv_req_ids.remove(request.request_id)        return True    def _update_from_kv_xfer_finished(self,                                      model_runner_output: ModelRunnerOutput):        """        P/D: update the scheduler state based on the output.        The Worker side connectors add finished_recving and        finished_sending reqs to the output.        * if finished_sending: free the blocks        # if finished_recving: add to state so we can            scheduler the request during the next step.        """        # P/D: update recv and send status from last step.        for req_id in (model_runner_output.finished_recving or ()):            logger.debug("Finished recving KV transfer for request %s", req_id)            self.finished_recving_kv_req_ids.add(req_id)        for req_id in (model_runner_output.finished_sending or ()):            logger.debug("Finished sending KV transfer for request %s", req_id)            self._free_blocks(self.requests[req_id])

能否让 scheduler 中的 connector 也更轻量,将更多工作交给 worker 完成?

(3)整体的可靠性设计

KV cache 传递过程需要考虑数据中途传递失败的问题,失败后是重传还是丢弃本次计算内容

对于故障处理这个工作可以留给第三方插件(如 LMcache 或者 Mooncake store)进行。

也可以在 KV connector 内部实现一套传递故障容错机制,保证故障后能够自动重试、重计算等,使得推理正常完成。

除了数据传递的可靠性,就是前面提到的传递控制信号的可靠性,也需要重点考虑。

目前 vLLM 的 PD 分离相关内容还没有在可靠性方面做太多考虑,如果自己实现 PD 分离,开始设计时不妨把数据、控制信号传输的可靠性考虑进去。