1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
class pagedblocktype(Enum):
    """Which physical pool currently backs a logical KV-cache block."""

    GPU = "gpu"    # backed by a block in the GPU pool
    CPU = "cpu"    # backed by a block in the CPU (offload) pool
    NONE = "none"  # not allocated


# PEP 8 (CapWords) alias; the lowercase name is kept so existing callers keep working.
PagedBlockType = pagedblocktype
class BlockManager:
    """Manage paged KV-cache blocks across a GPU pool and a pinned CPU pool.

    Callers work with *logical* block ids. ``block_mapping[logical_id]`` is
    ``(pool, physical_id)`` for an allocated id, or ``(pagedblocktype.NONE, -1)``
    for a free one. ``swap_out`` / ``swap_in`` move a block's contents between
    the pools while its logical id stays the same.
    """

    def __init__(self,
                 num_gpu_blocks: int,
                 num_cpu_blocks: int,
                 block_size: int,
                 num_layers: int,
                 num_kv_heads: int,
                 head_dim: int,
                 dtype: dtype):
        """Allocate both cache pools and mark every block as free.

        Both pools are zero-filled tensors of shape
        ``(2, num_layers, num_blocks, block_size, num_kv_heads, head_dim)``
        (the leading 2 presumably separates K and V -- TODO confirm).

        Args:
            num_gpu_blocks: number of physical blocks in the CUDA pool.
            num_cpu_blocks: number of physical blocks in the pinned CPU pool.
            block_size: size of the per-block dimension of the cache tensors
                (presumably tokens per block).
            num_layers: size of the layer dimension.
            num_kv_heads: size of the KV-head dimension.
            head_dim: size of the per-head feature dimension.
            dtype: element dtype of both cache tensors.
        """
        # ---------- physical blocks ----------
        self.num_physical_gpu_blocks = num_gpu_blocks
        self.num_physical_cpu_blocks = num_cpu_blocks
        self.num_total_blocks = num_gpu_blocks + num_cpu_blocks
        self.block_size = block_size

        # Free lists start full. deque gives O(1) popleft, unlike list.pop(0).
        # GPU pool
        self.gpu_free_blocks: deque[int] = deque(range(num_gpu_blocks))
        self.gpu_kv_cache = torch.zeros(
            2, num_layers, num_gpu_blocks, block_size, num_kv_heads, head_dim,
            device='cuda', dtype=dtype,
        )
        # CPU pool (offload target)
        self.cpu_free_blocks: deque[int] = deque(range(num_cpu_blocks))
        self.cpu_kv_cache = torch.zeros(
            2, num_layers, num_cpu_blocks, block_size, num_kv_heads, head_dim,
            device='cpu', dtype=dtype,
            pin_memory=True,  # pinned host memory makes CPU<->GPU transfers faster
        )

        # ---------- logical blocks ----------
        # Every allocated logical block is backed by exactly one physical block,
        # so at most num_total_blocks logical ids can be in use at once.
        # Each entry is (pool, physical_id); (NONE, -1) marks a free logical id.
        self.block_mapping: List[tuple[pagedblocktype, int]] = [
            (pagedblocktype.NONE, -1) for _ in range(self.num_total_blocks)
        ]
        # Free logical ids; initially all of them.
        self.logical_free_blocks: deque[int] = deque(range(self.num_total_blocks))

    # ------------------------------------------------------------------
    # Internal validation helpers
    # ------------------------------------------------------------------
    def _lookup(self, block_id: int) -> "tuple[pagedblocktype, int]":
        """Return ``block_mapping[block_id]``, rejecting ids outside the table.

        Plain list indexing would silently wrap a negative id around to the
        end of ``block_mapping``, so the range is checked explicitly.

        Raises:
            IndexError: if ``block_id`` is not in ``[0, len(block_mapping))``.
        """
        if not 0 <= block_id < len(self.block_mapping):
            raise IndexError(f"Logical block id {block_id} is out of range")
        return self.block_mapping[block_id]

    def _checked_entries(
        self,
        block_ids: List[int],
        accepted: "tuple[pagedblocktype, ...]",
        error_template: str,
    ) -> "List[tuple[int, pagedblocktype, int]]":
        """Validate ``block_ids`` up front without modifying any state.

        Every id must be in range, appear only once, and currently be mapped
        to one of the pools in ``accepted``. A repeated id is rejected with the
        same message as a wrong-pool id, because once its first occurrence had
        been processed it would be in the wrong state.

        Returns:
            ``(logical_id, pool, physical_id)`` for each id, in input order.

        Raises:
            IndexError: for an out-of-range id.
            RuntimeError: ``error_template.format(block_id=...)`` for the
                first offending id.
        """
        seen = set()
        entries = []
        for block_id in block_ids:
            block_type, physical_block_id = self._lookup(block_id)
            if block_id in seen or block_type not in accepted:
                raise RuntimeError(error_template.format(block_id=block_id))
            seen.add(block_id)
            entries.append((block_id, block_type, physical_block_id))
        return entries

    # ------------------------------------------------------------------
    # Allocation
    # ------------------------------------------------------------------
    def allocate(self, num_blocks: int = 1) -> List[int]:
        """Allocate ``num_blocks`` logical blocks, filling the GPU pool first.

        Physical blocks come from the GPU pool while it has free blocks and
        from the CPU pool after that.

        Args:
            num_blocks: number of logical blocks to allocate (default 1).

        Returns:
            The newly allocated logical block ids, in allocation order.

        Raises:
            RuntimeError: if fewer than ``num_blocks`` physical blocks are
                free. The check runs before anything is allocated.
        """
        if num_blocks > self.num_free_blocks:
            raise RuntimeError(f"Not enough free blocks to allocate {num_blocks} blocks")
        block_ids: List[int] = []
        for _ in range(num_blocks):
            if self.gpu_free_blocks:
                pool, physical_block_id = pagedblocktype.GPU, self.gpu_free_blocks.popleft()
            elif self.cpu_free_blocks:
                pool, physical_block_id = pagedblocktype.CPU, self.cpu_free_blocks.popleft()
            else:
                # Unreachable while the free lists are consistent (see the
                # capacity check above); kept as a guard against corrupted state.
                raise RuntimeError("No free blocks available")
            logical_block_id = self.logical_free_blocks.popleft()
            self.block_mapping[logical_block_id] = (pool, physical_block_id)
            block_ids.append(logical_block_id)
        return block_ids

    def free(self, block_ids: List[int]) -> None:
        """Release logical blocks and return their physical blocks to their pools.

        All ids are validated before anything is released, so on error the
        manager is left unchanged.

        Raises:
            IndexError: if an id is outside the logical block table.
            RuntimeError: if an id is not currently allocated or is repeated.
        """
        entries = self._checked_entries(
            block_ids,
            (pagedblocktype.GPU, pagedblocktype.CPU),
            "Block {block_id} is not allocated",
        )
        for block_id, block_type, physical_block_id in entries:
            if block_type == pagedblocktype.GPU:
                self.gpu_free_blocks.append(physical_block_id)
            else:
                self.cpu_free_blocks.append(physical_block_id)
            # Return the logical id and clear its mapping.
            self.logical_free_blocks.append(block_id)
            self.block_mapping[block_id] = (pagedblocktype.NONE, -1)

    # ------------------------------------------------------------------
    # Optional CPU offload: swap_out / swap_in
    # ------------------------------------------------------------------
    def swap_out(self, block_ids: List[int]) -> List[int]:
        """Move GPU-resident logical blocks to the CPU pool.

        Each block's contents are copied into a free CPU block, its GPU block
        is returned to the GPU free list, and its mapping is updated. All ids
        and the CPU pool's capacity are checked first, so on error nothing moves.

        Returns:
            The CPU physical block ids now backing ``block_ids``, in order.

        Raises:
            IndexError: if an id is outside the logical block table.
            RuntimeError: if an id is not on the GPU, is repeated, or the CPU
                pool has too few free blocks.
        """
        entries = self._checked_entries(
            block_ids,
            (pagedblocktype.GPU,),
            "Block {block_id} is not on GPU, cannot swap out",
        )
        if len(entries) > len(self.cpu_free_blocks):
            raise RuntimeError(
                f"Not enough free CPU blocks to swap out {len(entries)} blocks"
            )
        cpu_block_ids: List[int] = []
        for block_id, _, gpu_block_id in entries:
            cpu_block_id = self.cpu_free_blocks.popleft()
            # NOTE(review): non_blocking=True makes this device-to-host copy
            # asynchronous w.r.t. the host; host code reading this CPU block must
            # synchronize first. The GPU block is recycled immediately, which is
            # only safe if later writes to it are stream-ordered after this
            # copy -- confirm.
            self.cpu_kv_cache[:, :, cpu_block_id].copy_(
                self.gpu_kv_cache[:, :, gpu_block_id], non_blocking=True
            )
            self.gpu_free_blocks.append(gpu_block_id)
            self.block_mapping[block_id] = (pagedblocktype.CPU, cpu_block_id)
            cpu_block_ids.append(cpu_block_id)
        return cpu_block_ids

    def swap_in(self, block_ids: List[int]) -> List[int]:
        """Move CPU-resident logical blocks back to the GPU pool.

        Each block's contents are copied into a free GPU block, its CPU block
        is returned to the CPU free list, and its mapping is updated. All ids
        and the GPU pool's capacity are checked first, so on error nothing moves.

        Returns:
            The GPU physical block ids now backing ``block_ids``, in order.

        Raises:
            IndexError: if an id is outside the logical block table.
            RuntimeError: if an id is not on the CPU, is repeated, or the GPU
                pool has too few free blocks.
        """
        entries = self._checked_entries(
            block_ids,
            (pagedblocktype.CPU,),
            "Block {block_id} is not on CPU, cannot swap in",
        )
        if len(entries) > len(self.gpu_free_blocks):
            raise RuntimeError(
                f"Not enough free GPU blocks to swap in {len(entries)} blocks"
            )
        gpu_block_ids: List[int] = []
        for block_id, _, cpu_block_id in entries:
            gpu_block_id = self.gpu_free_blocks.popleft()
            # NOTE(review): asynchronous host-to-device copy from pinned memory;
            # the CPU block is recycled immediately, so host-side writes to it
            # must not race with this pending copy -- confirm.
            self.gpu_kv_cache[:, :, gpu_block_id].copy_(
                self.cpu_kv_cache[:, :, cpu_block_id], non_blocking=True
            )
            self.cpu_free_blocks.append(cpu_block_id)
            self.block_mapping[block_id] = (pagedblocktype.GPU, gpu_block_id)
            gpu_block_ids.append(gpu_block_id)
        return gpu_block_ids

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    @property
    def num_free_blocks(self) -> int:
        """Total number of free physical blocks across both pools."""
        return len(self.gpu_free_blocks) + len(self.cpu_free_blocks)

    def can_allocate_gpu(self, num_blocks: int) -> bool:
        """Return True if the GPU pool alone has at least ``num_blocks`` free blocks.

        Intended for the scheduler to decide whether to accept new requests.
        """
        return len(self.gpu_free_blocks) >= num_blocks
|