前言
KV cache 是 vLLM 的基础,KV 值的传递是 PD 分离实现的关键,目前 vLLM 中由 kv_transfer 模块(包括两个版本 V0/V1)完成。
在 KV cache 传递过程中要考虑哪些问题,传输会不会成为系统的瓶颈?本文主要讨论 PD 分离下 KV cache 传递机制,对过程中可能遇到的问题、未来演进方向进行探讨。
1、KV cache传递常见问题
训练/推理过程中,数据在跨设备之间的传递已较为普遍,KV 值在 P/D 之间传递可以认为是一个子类应用,因此在实现方式、带宽、延时、计算通信重叠等方面都会遇到与模型分布式训练/推理类似的问题。
这里讨论几个 KV 传递当前常见的问题:
(1)传递的常见方式有哪些?
根据目前一些框架的形式来看,主要有中心存储和离散式(分布式)两种方式,当然也可以是两者的结合。
所谓中心存储就是建立一个跨设备的 KV store,由它统一管理 KV 值,包括 KV 的增、删、查、传递等工作,推理实例(P/D)连接 KV store 后只需负责往里面增、查数据。
另一种是分布式:P2P 的方式进行数据传递,各个实例分别管理自己的存储,比如一个 P 实例计算完成后,向目标 D 实例建立通信完成 KV 值传递,这种方式没有统一的传递中介。
**两种方式各有利弊:**中心存储能够构建更大的集群管理体系、充分利用各种存储介质、传输通道,计算重用的效能更高,但性能在某些情况下可能会较差(取决于系统设计),维护成本较高;
离散式的优势是架构清晰,性能更好,但扩展性、链路稳定性都不理想。
两种方式可以混用,但目前 Mooncake、Dynamo 等主流推理解决方案更倾向于中心存储的方式,把传递中的复杂度问题内部解决。
Mooncake 的中心存储
这里简单罗列存储介质和传输方式:
(2)配合模型并行传递需要注意的问题?
当加载模型过大时一般需要用到模型并行(TP、PP、EP、SP 等)。
而这会使得模型计算的 KV 值分散在各个 rank 设备中(GPU/NPU),而 P 模型和 D 模型的分布式策略可能还不一样。
比如 P 实例用 TP4、EP1,D 实例用 TP1、PP2、EP4,这样 P 和 D 实例无法直接进行一对一的 rank 传递。
若切分策略相同,可以找副本位置进行传递,这样效率更高,比如直接用 NCCL 的 P2P 通信;
当切分方式不一样时,就需要考虑先聚合后传递再分发的方式,比如让 TP 组中 rank0 进行汇聚后传递,但这样可能不能充分利用传输链路的优势(如 NVLink)。
当然 KV blocks 的尺寸足够细时,不用聚合,但会引发一个新问题,多个 rank 要从相同 rank 拉数据,这样是否会因冲突产生阻塞,导致效率降低?是值得考虑的。
(3) KV cache 传递一定是只有 P 端到 D 端吗?
PD 分离多对多模式,P 和 D 之间任意映射都要能完成。在 vLLM 有 prefix cache 功能,可以利用历史计算数据,存在一种情况:D 端的 block 保存的数据,P 端是否能够利用?
根据 prefix cache 的 block 管理来看(如下图)这完全是可能的。所以 P 可以从 D 拉取已计算数据。
顺便提一下,在 vLLM 的 V1 版本中 D 的调度逻辑可以设计成:KV cache 先本地查找匹配,然后再从远端获取。这样当本地已有匹配值时,避免从 P 实例去拉取重复的 KV 值。
(4)KV 传输与模型计算传输是否会争抢带宽?
这里分两种情况:按模型整体计算传输、按层计算传输。
按模型整体计算传输是指一个请求的 P 处理全部结束再传输 KV 值,通常该情景下能够错开通信链路的使用,不会出现带宽争抢,但也存在一些极端情况,模型计算速度快于传输速度。
出现上一个请求的 KV 传递还未结束下一个请求的计算就需要使用通信链路的情况;
另一个是按层计算(或者按 block)传递:完成一个单位的 KV cache 计算时向下一阶段的 D 实例立刻进行异步传输。
这种方式能够避免串行带来的性能下降(可能),但也需要考虑当模型计算的通信带宽使用较满时,两者会出现带宽争抢。
2、vLLM 的现状分析
vLLM 的 PD 分离在前面文章中有过简单方案分析,提到了两个关键点:
- 目前 vLLM 架构正在调整,所以许多特性有两套方案
- PD 分离方案目前处于原型探索阶段
目前,KV cache 传递机制在 kv transfer 中实现(vllm/vllm/distributed/kv_transfer/),存在 V0 和 V1 两个版本。
V1 的版本迭代速度较快,代码可能会出现较大改动,这里以 0.8.5 版本进行现状分析,主要了解目前架构和接口设计。
(1)V0 版本
KV cache 的传递在 PD 实例之间通过**连接器(Connector)**完成,目前 V0 版本的结构如下图所示,是典型的生产者-消费者模式。
connector 里面包括查询缓冲(Lookup Buffer)、传递管道(pipe),还有生产者(producer,P 实例)、消费者(consumer,D 实例)。
架构上: pipe 负责数据传递、buffer 负责构建一个 FIFO 队列缓冲数据、connector 在上层负责操作的协调并对外提供接口。
它们的一些关键函数如下所示:
Connector 模块
在 connector 的上层有一个 transfer agent 模块,它主要负责对外暴露调用接口,初始化里面调用工厂类(KVConnectorFactory)根据 KVTransferConfig 来决定使用什么 connector。
class KVTransferAgent: def __init__( self, rank: int, local_rank: int, config: "VllmConfig", ): self.config = config if config.kv_transfer_config is None: raise ValueError("KVTransferConfig is not set in the VllmConfig," " cannot initialize KVConnector.") assert self.config.kv_transfer_config.is_kv_transfer_instance, "KV"\ "TransferAgent should only be used when kv_connector is set." # 工厂类的作用 self.connector = KVConnectorFactory.create_connector_v0( rank, local_rank, config)
connector 基类的定义参考:
vllm/vllm/distributed/kv_transfer/kv_connector/base.py
基类函数主要是两个:发送函数(P 调用)、接收函数(D 调用)。
其操作逻辑如下所示:
connector 的具体实现,可以根据不同的存储库来实现,目前 V0 版本中当前已注册的工厂类如下:
KVConnectorFactory.register_connector( "PyNcclConnector", "vllm.distributed.kv_transfer.kv_connector.simple_connector", "SimpleConnector")KVConnectorFactory.register_connector( "MooncakeConnector", "vllm.distributed.kv_transfer.kv_connector.simple_connector", "SimpleConnector")KVConnectorFactory.register_connector( "LMCacheConnector", "vllm.distributed.kv_transfer.kv_connector.lmcache_connector", "LMCacheConnector")KVConnectorFactory.register_connector( "MooncakeStoreConnector", "vllm.distributed.kv_transfer.kv_connector.mooncake_store_connector", "MooncakeStoreConnector")
Lookup Buffer 模块
查找缓冲主要用于构建一个 FIFO 队列的数据缓冲区,它能保证数据传递顺序,控制数据占用的显存量,主要的对外接口是 insert(生产者调用)、drop_select(消费者调用),其大致逻辑如下图所示。
生产者侧会创建一个 buffer(双端队列),insert 函数会在主线程中将 KV 数据添加到队列,同时创建一个后台子线程 drop_select_handler,负责从 buffer 中取出数据并发送给消费者。
生产者与消费者之间的通信有两个步骤,首先,消费者会发送 input_tokens 和 roi,生产者根据这两个信息判断 KV cache 数据是否已计算完成,若已完成则将 key、value、hidden 等数据发送给消费者。
这里用一段代码解释一下 roi 的作用:
取值操作操作:input_tokens = torch.tensor([1, 2, 3, 4, 5])roi = torch.tensor([False, True, False, True, False], dtype=torch.bool)result = input_tokens[roi]print(result) # 输出:tensor([2, 4])POP操作:原始 buffer: [10, 20, 30, 40, 50]input_tokens: tensor([1, 2, 3, 4, 5])roi: tensor([False, True, False, True, False])buffer.pop(input_tokens, roi)处理后的 buffer: [10, 30, 50]
Pipe 模块
pipe 用来进行具体的数据传递,对外接口:
class KVPipeBase(ABC): """ This class provides an interface for sending and receiving tensors, or None, by distributed communications. """ @abstractmethod def send_tensor(self, tensor: Optional[torch.Tensor]) -> None: """Send a tensor, or None, via the pipe. """ raise NotImplementedError @abstractmethod def recv_tensor(self) -> Optional[torch.Tensor]: """Receive a tensor (can be None) from the pipeline. """ raise NotImplementedError @abstractmethod def close(self) -> None: """Close the pipeline and release resources. """ raise NotImplementedError
以 PyNcclPipe 为例,用于在分布式环境中实现张量及其元数据的发送和接收功能。
里面的一些关键步骤如下:
- **配置与设备选择:**保存传入的配置信息
KVTransferConfig
,并根据配置或传入的参数选择设备(CUDA 或 CPU)。 - **分布式连接建立:**使用
StatelessProcessGroup
创建一个分布式进程组,通过barrier
方法确保连接正确初始化。 - **发送和接收实现选择:**根据设备类型选择合适的发送和接收实现,对于 CUDA 设备使用
PyNcclCommunicator
,对于 CPU 设备使用StatelessProcessGroup
的对象发送和接收方法。 - **传输相关变量初始化:**初始化传输线程池、缓冲区大小和缓冲区大小阈值。
(2)V1 版本
V1 版本中 PD 的角色关系已变得比较模糊,P 和 D 既可以是生产者也可以是消费者,而且默认开启 prefix KV cache 特性后 P 可以从 D 实例中获取已计算好的 Block(虽然这个功能暂未实现)。
V1 的 transfer 实现主要架构如下图所示,最大特点是 connector 有两个执行角色(Role):scheduler_connector 和 worker_connector,分别在 scheduler 线程和 worker 线程中执行。
scheduler 负责指挥 worker 进行 KV 数据的传递,两者之间的信息桥梁是元数据(KVConnectorMetadata),worker 通过 metadata 知道哪些 KV 值需要从远端加载。
考虑一个问题**:**KV connector 按理来说只要让 worker 能调用就行,为什么在 scheduler 里面也要运行?
这个问题要从 scheduler 的重复计算功能角度思考,因为 scheduler 可以通过匹配利用历史计算数据(block),在远端的 KV cache 也可以认为是一种计算好的数据。
但这些数据在远端,所以在 PD 分离场景下 scheduler 需要增加逻辑:让 worker 拉取远端 KV、远端 KV 没获取完前 request 请求需要继续等待、本地不需要新计算 block 等。
反过来,worker 获取远端 KV cache 值刷新了本地 block 值需要告知 scheduler。
Scheduler connector
connector 基类里面包括 scheduler 端、worker 端调用的函数接口,其中给 scheduler 用的主要是:
- get_num_new_matched_tokens(): 获取远端已计算的 KV cache 的 token 数量。
- update_state_after_alloc() : block 开辟后,更新 connector 状态。
- build_connector_meta(): 构建元数据,scheduler 告诉 worker 需要保持/加载哪些 KV 数据。
函数在 scheduler 的 schedule() 操作中调用,会改变 num_computed_tokens 的计算,创建 worker 交互的 meta 数据。
但 base 类的操作不改变 scheduler_output,也不改变队列执行顺序(扩展类可改变队列执行逻辑)。
# vllm/vllm/v1/core/sched/scheduler.py
# scheduler中的函数调用位置:# KVConnector: update internal state after allocation. # This information is used to determine if a load is # needed for this request. if self.connector is not None:self.connector.update_state_after_alloc(request,new_computed_blocks + new_blocks, num_external_computed_tokens,)
# ...
# 计算远端已有kv数量, # Get externally-cached tokens if using a KVConnector. num_external_computed_tokens, load_kv_async = ( (0, False) if self.connector is None else self.connector.get_num_new_matched_tokens( request, num_native_computed_tokens)) # Total computed tokens (local + external). num_computed_tokens = (num_native_computed_tokens + num_external_computed_tokens + num_prealloc_computed_tokens)# ... # NOTE(Kuntai): this function is designed for multiple purposes: # 1. Plan the KV cache store # 2. Wrap up all the KV cache load / save ops into an opaque object # 3. Clear the internal states of the connector if self.connector is not None: meta = self.connector.build_connector_meta(scheduler_output) scheduler_output.kv_connector_metadata = meta
Worker connector
worker connector 主要接口是生产者端调用存储KV接口,消费者端调用的加载 KV 接口;
两种角色与之配套的都有一个异步等待操作,支持按层传递,可异步执行:
- bind_connector_metadata: 绑定元数据;
- start_load_kv: 开始加载,消费端调用;
- wait_for_layer_load: 阻塞直到指定层加载结束,消费端调用;
- save_kv_layer: 开始保存层,生产端调用;
- wait_for_save: 阻塞直到所有保存完成,生产端调用;
#vllm/vllm/attention/layer.py# worker connecotr 使用位置(attention部分):# ...# 加载逻辑使用def wait_for_kv_layer_from_connector(layer_name: str): if not has_kv_transfer_group() or not is_v1_kv_transfer_group(): return connector = get_kv_transfer_group() forward_context: ForwardContext = get_forward_context() attn_metadata = forward_context.attn_metadata if attn_metadata is None: return assert isinstance(attn_metadata, dict) connector.wait_for_layer_load(layer_name)# ...# 保存逻辑使用def maybe_save_kv_layer_to_connector( layer_name: str, kv_cache_layer: List[torch.Tensor],): if not has_kv_transfer_group() or not is_v1_kv_transfer_group(): return connector = get_kv_transfer_group() forward_context: ForwardContext = get_forward_context() attn_metadata = forward_context.attn_metadata if attn_metadata is None: return assert isinstance(attn_metadata, dict) connector.save_kv_layer(layer_name, kv_cache_layer, attn_metadata[layer_name])
worker connector 的接口 start/wait 由 runner 调用(ModelRunner 模块),而 attention 调用存/读 KV cache 接口。
相关 PR:KV Connector API V1,大致的时序如下:
03
迭代的其它思考
(1)控制信号的设计
由于 PD 分离中 P 实例和 D 实例处于不同的机器中,P 和 D 之间的执行动作协调肯定需要一个控制信号来处理。
比如 P 实例告诉 D 实例 KV cache 的计算状态、D 实例从 P 实例拉取 KV 值的时机选择、D 实例拉取完 KV 值后告诉 P 实例可以释放本地的 KV 值等。
虽然都是 P 和 D 之间进行数据传递,控制信号和 KV 值数据传递方式可以不共用链路,因为控制信号的信息量较小,一般用 TCP 这种就能满足要求。
而且传递"绕远路"也并不影响性能,目前主要的方式有两种:直连、通过中心调度器传递。
虽然控制信号对速度要求不高,但其可靠性相关问题值得关注。如,控制信号如果丢失可靠性怎么保证?
**更具体的例子:**D 实例让 P 实例释放 KV 值,结果控制信号丢失了,P 实例什么时候释放 KV 值?
方式的利弊**:**控制信号用中心调度"包办"的方式会增加调度器的处理逻辑复杂度,但实现起来比较简单直观,所以现在很多 DIY 版本的 PD 分离用这种方式较多;
直连的方式是把处理逻辑下放到了执行器,执行器之间自己去搭建通信链路,可以避免信息传递的多次转手。
(2)scheduler 逻辑设计
scheduler 在 V1 的迭代中功能得到了简化,逻辑也变得更简单,带来的好处是执行效率更高。
但 connector 需要在 scheduler 中执行,使得本来轻量的 scheduler 因为不同形态的 connector 模块的注入变得越来越沉重。
以 5.13 这个合入为例,scheduler 为了 nixl 的合入引入了非常多的改动,暂时还不会构成质的影响,但是否能够支撑后续的扩展需求?
# scheduler 的修改# 增加了一种状态: # P/D: skip request if still waiting for remote kvs. if request.status == RequestStatus.WAITING_FOR_REMOTE_KVS: is_ready = self._update_waiting_for_remote_kv(request) if is_ready: request.status = RequestStatus.WAITING else: self.waiting.popleft() skipped_waiting_requests.appendleft(request) continue# 增加PD的处理逻辑 ######################################################################## # P/D Related Methods ######################################################################## def get_kv_connector(self) -> Optional[KVConnectorBase_V1]: return self.connector def _connector_finished( self, request: Request) -> tuple[bool, Optional[KVTransferParams]]: """Invoke the KV connector request_finished() method if applicable.""" if self.connector is None: return False, None block_ids = self.kv_cache_manager.get_block_ids(request.request_id) return self.connector.request_finished(request, block_ids) def _update_waiting_for_remote_kv(self, request: Request) -> bool: """ P/D: check if the request_id is finished_recving. The finished_recving_kv_req_ids list is populated on the previous steps()'s update_from_output based on the worker side connector. When the kv transfer is ready, we cache the blocks and the request state will be moved back to WAITING from WAITING_FOR_REMOTE_KV. """ if request.request_id not in self.finished_recving_kv_req_ids: return False # Now that the blocks are ready, actually cache them. block_ids = self.kv_cache_manager.get_block_ids(request.request_id) num_computed_tokens = len(block_ids) * self.block_size if num_computed_tokens == request.num_tokens: num_computed_tokens -= 1 self.kv_cache_manager.single_type_manager.cache_blocks( request, self.kv_cache_manager.req_to_block_hashes[request.request_id], num_computed_tokens, ) # Update the request state for scheduling. request.num_computed_tokens = num_computed_tokens # Return that we are ready. self.finished_recving_kv_req_ids.remove(request.request_id) return True def _update_from_kv_xfer_finished(self, model_runner_output: ModelRunnerOutput): """ P/D: update the scheduler state based on the output. The Worker side connectors add finished_recving and finished_sending reqs to the output. * if finished_sending: free the blocks # if finished_recving: add to state so we can scheduler the request during the next step. """ # P/D: update recv and send status from last step. for req_id in (model_runner_output.finished_recving or ()): logger.debug("Finished recving KV transfer for request %s", req_id) self.finished_recving_kv_req_ids.add(req_id) for req_id in (model_runner_output.finished_sending or ()): logger.debug("Finished sending KV transfer for request %s", req_id) self._free_blocks(self.requests[req_id])
能否让 scheduler 中的 connector 也更轻量,将更多工作交给 worker 完成?
(3)整体的可靠性设计
KV cache 传递过程需要考虑数据中途传递失败的问题,失败后是重传还是丢弃本次计算内容。
对于故障处理这个工作可以留给第三方插件(如 LMcache 或者 Mooncake store)进行。
也可以在 KV connector 内部实现一套传递故障容错机制,保证故障后能够自动重试、重计算等,使得推理正常完成。
除了数据传递的可靠性,就是前面提到的传递控制信号的可靠性,也需要重点考虑。
目前 vLLM 的 PD 分离相关内容还没有在可靠性方面做太多考虑,如果自己实现 PD 分离,开始设计时不妨把数据、控制信号传输的可靠性考虑进去。