Pico-vLLM 开发日志 #5 Paged Attention

终于到这一步了。刚开始开发Pico-vLLM这个推理框架的时候还觉得这是一件遥不可及的事情,但最终还是完成了。目前调用的forward过程里的Kernel仍然是伪Batching——内部是一个简陋的python for循环,不过整个Prefill、Decode、Engine、Scheduler、Forward的全流程都已经以完整的Paged Attention+Continuous Batching流程跑通了,只差Triton Kernel或者CUDA Kernel接入——我个人计划写Triton Kernel为主:毕竟时间成本有限,把性能提升到可以接受的程度是目前的目标。不论如何,这个里程碑还是挺有意义的。

和之前的博客中所提到的一样,Paged Attention就是专门为了解决内存碎片问题而生的。在naive kv cache的情况下,设计者必然面临两个决策中的一个:要么选择在一开始分配较短的kv cache空间,代价是随着seq_len长度超出原本预计的空间需要不断的进行完整的拷贝和重新分配;要么选择在一开始分配绝对够用的kv cache空间,代价是请求还没有产生几个,显存就迅速爆炸,而绝大部分大概率还没开始用。这一动机任何读者都应该相当了解,因此就不再冗余赘述了。

核心数据结构和设计模式

这次的实现规划的比较完整且一开始就预留了可扩展性,在最初就计划了双pool设计。也就是说,有可以防止硬OOM的GPU + CPU offload的双cache池,加上随之必须产生的逻辑block_id和物理block_id的分离。任何一个请求都会携带一个kv cache表,里面记录的是其持有的逻辑cache块的索引。而Block Manager则同时维护一个映射表:每个逻辑cache块对应的物理cache块的位置,以及物理块到底是哪种类型(CPU的还是GPU的还是未分配、非法的)的cache块。这种双重映射提供的分层抽象在可预见的未来甚至可以扩展到SSD——如果文件系统的性能允许的话。据作者本人所知,这种offload在工业界是大趋势,因此特意将其完整实现了出来。

核心代码块如下。值得注意的是,Block Manager本身不决定换入和换出的策略,仅仅是提供方法:这是思考后的结果,我个人认为这部分操作更适合分离开来,交给某个专门的Scheduler去执行,写在Block Manager内会让模式变得过度耦合且重量化。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
class pagedblocktype(Enum):
    GPU = "gpu"
    CPU = "cpu"
    NONE = "none"  # 表示未分配
class BlockManager:
    def __init__(self, 
                 num_gpu_blocks: int, 
                 num_cpu_blocks: int, 
                 block_size: int, 
                 num_layers: int,
                 num_kv_heads: int, 
                 head_dim: int, 
                 dtype: dtype):
        ########## 物理块索引 ##########
        self.num_physical_gpu_blocks = num_gpu_blocks
        self.num_physical_cpu_blocks = num_cpu_blocks
        self.num_total_blocks = num_gpu_blocks + num_cpu_blocks
        self.block_size = block_size
        # 空闲块列表,初始全部空闲
        # 用 deque 比 list 的 pop(0) 快
        # GPU pool
        self.gpu_free_blocks: deque[int] = deque(range(num_gpu_blocks))
        self.gpu_kv_cache = torch.zeros(
            2, num_layers, num_gpu_blocks, block_size, num_kv_heads, head_dim,
            device='cuda', dtype=dtype
        )
        
        # CPU pool(offload 目标)
        self.cpu_free_blocks: deque[int] = deque(range(num_cpu_blocks))
        self.cpu_kv_cache = torch.zeros(
            2, num_layers, num_cpu_blocks, block_size, num_kv_heads, head_dim,
            device='cpu', dtype=dtype,
            pin_memory=True  # ← 关键:pin_memory 让 CPU→GPU 传输更快
        )
        
        ########## 逻辑块索引 ##########
        # 逻辑块表,记录每个逻辑块对应的物理块和类型
        # 最多逻辑块不会超过 num_total_blocks,因为每个逻辑块至少占一个物理块
        # 第i个初始逻辑块初始化为[NONE, -1],表示未分配物理块
        self.block_mapping: List[tuple[pagedblocktype, int]] = [
            (pagedblocktype.NONE, -1) for i in range(self.num_total_blocks)
        ]
        self.logical_free_blocks: deque[int] = deque(range(self.num_total_blocks))  # 逻辑块索引,初始全部空闲
    
    def allocate(self, num_blocks: int = 1) -> List[int]:
        # 分配 num_blocks 个物理块
        block_ids = []
        if num_blocks > self.num_free_blocks:
            raise RuntimeError(f"Not enough free blocks to allocate {num_blocks} blocks")
        for _ in range(num_blocks):
            if self.gpu_free_blocks:
                physical_block_id = self.gpu_free_blocks.popleft()
                logical_block_id = self.logical_free_blocks.popleft()
                self.block_mapping[logical_block_id] = (pagedblocktype.GPU, physical_block_id)
                block_ids.append(logical_block_id)
            elif self.cpu_free_blocks:
                physical_block_id = self.cpu_free_blocks.popleft()
                logical_block_id = self.logical_free_blocks.popleft()
                self.block_mapping[logical_block_id] = (pagedblocktype.CPU, physical_block_id)
                block_ids.append(logical_block_id)
            else:
                raise RuntimeError("No free blocks available")

        return block_ids

    def free(self, block_ids: List[int]) -> None:
        # 回收物理块,加回 free_blocks
        # 回收逻辑块
        for block_id in block_ids:
            block_type, physical_block_id = self.block_mapping[block_id]
            if block_type == pagedblocktype.GPU:
                self.gpu_free_blocks.append(physical_block_id)
            elif block_type == pagedblocktype.CPU:
                self.cpu_free_blocks.append(physical_block_id)
            else:
                raise RuntimeError(f"Block {block_id} is not allocated")
            # 更新 block_mapping
            self.logical_free_blocks.append(block_id)
            self.block_mapping[block_id] = (pagedblocktype.NONE, -1)

    #####################################################
    # 非必须的方法,用于offload到CPU,swap_in/swap_out()
    #####################################################
    def swap_out(self, block_ids: List[int]) -> None:
        """
        把 GPU 上的 block 换出到 CPU
        """
        cpu_block_ids = []
        for block_id in block_ids:
            block_type, physical_block_id = self.block_mapping[block_id]
            if block_type != pagedblocktype.GPU:
                raise RuntimeError(f"Block {block_id} is not on GPU, cannot swap out")
            cpu_block_id = self.cpu_free_blocks.popleft()
            self.cpu_kv_cache[:,:,cpu_block_id].copy_(
                self.gpu_kv_cache[:,:,physical_block_id], non_blocking=True
            )
            self.gpu_free_blocks.append(physical_block_id)  # GPU 块释放
            self.block_mapping[block_id] = (pagedblocktype.CPU, cpu_block_id)  # 更新映射
            cpu_block_ids.append(cpu_block_id)

    def swap_in(self, block_ids: List[int]) -> None:
        """
        把 CPU 上的 block 换回 GPU
        """
        gpu_block_ids = []
        for block_id in block_ids:
            block_type, physical_block_id = self.block_mapping[block_id]
            if block_type != pagedblocktype.CPU:
                raise RuntimeError(f"Block {block_id} is not on CPU, cannot swap in")
            gpu_block_id = self.gpu_free_blocks.popleft()
            self.gpu_kv_cache[:,:,gpu_block_id].copy_(
                self.cpu_kv_cache[:,:,physical_block_id], non_blocking=True
            )
            self.cpu_free_blocks.append(physical_block_id)  # CPU 块释放
            self.block_mapping[block_id] = (pagedblocktype.GPU, gpu_block_id)  # 更新映射
            gpu_block_ids.append(gpu_block_id)

    @property
    def num_free_blocks(self) -> int:
        return len(self.gpu_free_blocks) + len(self.cpu_free_blocks)

    def can_allocate_gpu(self, num_blocks: int) -> bool:
        # 查询是否有足够的GPU空闲块
        # scheduler 用这个决定是否接受新请求
        return len(self.gpu_free_blocks) >= num_blocks
    

而从这个Block Manager里取回相应的物理块的index组成的block_table的方法则如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
    def get_block_table(self) -> torch.Tensor:
        """
        读出当前所有已写入的 k/v
        return: 存储的所有对应index的物理index
        """
        physical_ids = []
        for lid in self.cache_block_index[:self.allocated_cache_block_num]:
            _, pid = self.block_manager.block_mapping[lid]
            physical_ids.append(pid)
        return torch.tensor(physical_ids, dtype=torch.int32, device=self.device)

现在它是变长的,但随后的Triton Kernel当中,它可能会变成定长的——取决于到时候的性能调试了。

此外特别需要注意的是自回归过程中,关于真正的“序列长度”的相关处理。虽然seq_len不包括自己正在生成的这一个token——它的Logits还不存在呢,但是在每一层layer当中,其实k和v已经完整存在了,是按照这个token对应的k、v存在去处理的。因此在做相关操作的时候要格外小心,在适当的地方进行+1的修正,并且提前做好kv cache块的预申请和预分配,以免发生问题。

再此外,传参时候的类型提示真的很重要。虽然python不是静态类型的强类型语言,但适当的类型提示对于编程的帮助极其极其大——最大的之一就是你可以随时随地的查看这个变量是什么,以及通过自动补全查看和迅速编写相应的方法,避免无穷无尽的来回翻找。嗯,果然还是Java/C/C++好啊。

吐槽

作者本人用的卡是5070,著名的nano-vllm项目在写的过程中偷奸耍滑,直接用了flash-attn库——作者本人原本计划同样用flash-attn的算子接入跑通全流程之后作为对比的benchmark,测试自己的算子的信念,但尝试在wsl的venv环境中以各种方法安装flash-attn库均失败。通过github和ai查询原因,得到的结论是,5070是Blackwell系列的消费卡,而目前flash-attn目前似乎是只支持hopper以及之前的架构——直接没法用了(至少截止2026年3月28日不行。此结论有时效性,读者请注意)。这进一步倒逼作者去真的写一个Triton Kernel,不过总归是要写的,最后都一样嘛。

大家如果想使用这些算子库,切记查看好仓库发布页的目前支持什么cuda版本、pytorch版本、显卡系列的提示,避免做无用功。

性能数据

令人意外的是实现了如此多冗余拷贝的naive paged Attention之后性能居然不降反升(3%),这似乎是得益于forward过程没了cat(),某种程度上反而同时减少了拷贝。虽然之前说baseline是不适用kv cache的彻底naive实现,但那太离谱了,这次的才算真正的有比较意义的baseline。在接下来一段时间里,就主要尝试对这个指标做提升了,尝试在有限的时间内逐步优化到vLLM的60%水平,然后再迈进到多机阶段。

具体数据直接看图⬇️

profile_paged