Skip to content

Commit bd48640

Browse files
authored
Write the cache of preempted req to storage (#7113)
1 parent 6d0d404 commit bd48640

5 files changed

Lines changed: 16 additions & 7 deletions

File tree

fastdeploy/cache_manager/cache_transfer_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -796,7 +796,7 @@ def read_storage_task(self, task: ReadStorageTask):
         try:
             valid_gpu_block_ids = self._run_read_storage(
                 task.task_id,
-                task.token_ids[: match_block_num * self.block_size],
+                task.token_ids[: match_block_num * self.block_size] if task.token_ids else None,
                 task.start_read_block_idx,
                 k_cache_keys,
                 v_cache_keys,

fastdeploy/cache_manager/prefix_cache_manager.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -872,7 +872,7 @@ def request_match_blocks(self, task: Request, block_size, *args):
             read_storage_task = ReadStorageTask(
                 task_id=req_id,
                 keys=no_match_block_keys,
-                token_ids=input_token_ids,
+                token_ids=input_token_ids if self.kvcache_storage_backend == "attention_store" else None,
                 gpu_block_ids=gpu_recv_storage_block_ids,
                 start_read_block_idx=match_token_num // block_size,
             )
@@ -1111,7 +1111,9 @@ def write_cache_to_storage(self, request: Request):
         if isinstance(token_ids, np.ndarray):
             token_ids = token_ids.tolist()
         if self.config.cache_config.enable_output_caching:
-            token_ids += request.output_token_ids
+            input_token_ids = token_ids + request.output_token_ids
+        else:
+            input_token_ids = token_ids

         req_id = request.request_id
         keys = []
@@ -1128,7 +1130,7 @@ def write_cache_to_storage(self, request: Request):
         write_storage_task = WriteStorageTask(
             task_id=req_id,
             keys=keys,
-            token_ids=token_ids,
+            token_ids=input_token_ids if self.kvcache_storage_backend == "attention_store" else None,
             gpu_block_ids=gpu_block_ids,
         )
         logger.debug(f"issue write storage task: {write_storage_task}")
@@ -2067,7 +2069,7 @@ def recv_data_transfer_result(self):
             event_type = data[0]

             if event_type.value == CacheStatus.STORAGE2GPU.value:
-                logger.info(f"recv_data_transfer_result: {data}")
+                logger.debug(f"recv_data_transfer_result: {data}")
                 task_id, hash_keys, block_ids = data[1:]
                 if task_id not in self.storage_prefetch_block_ids:
                     self.storage_prefetch_block_ids[task_id] = []
@@ -2078,7 +2080,7 @@ def recv_data_transfer_result(self):
                 if task_id in self.task_prefetch_event:
                     self.task_prefetch_event[task_id].set()
             elif event_type.value == CacheStatus.GPU2STORAGE.value:
-                logger.info(f"recv_data_transfer_result: {data}")
+                logger.debug(f"recv_data_transfer_result: {data}")
                 task_id, hash_keys, block_ids = data[1:]
                 if task_id in self.task_write_back_event:
                     self.task_write_back_event[task_id].set()

fastdeploy/engine/common_engine.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -891,7 +891,7 @@ def _fetch_request():
                     self.split_connector.send_splitwise_tasks([task], task.idx)
                 status, msg = self.split_connector.check_decode_allocated(task)
                 if not status:
-                    self.llm_logger.error(
+                    self.llm_logger.warning(
                         f"D failed to allocate resource for request {task.request_id}, try again."
                     )
                     time.sleep(0.05)

fastdeploy/engine/sched/resource_manager_v1.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,9 @@ def _trigger_preempt(self, request, num_new_blocks, preempted_reqs, scheduled_re
             self._free_blocks(preempted_req)
             llm_logger.info(f"Preemption is triggered! Preempted request id: {preempted_req.request_id}")
         else:
+            if envs.FD_SAVE_OUTPUT_CACHE_FOR_PREEMPTED_REQUEST:
+                if self.config.cache_config.kvcache_storage_backend:
+                    self.cache_manager.write_cache_to_storage(preempted_req)
             self._free_blocks(preempted_req)
             preempted_req.num_cached_blocks = 0
             self.to_be_rescheduled_request_id_set.add(preempted_req.request_id)

fastdeploy/envs.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,10 @@
     # When v1 is enabled, the legacy /clear_load_weight and /update_model_weight
     # will adopt this new communication pattern.
     "FD_ENABLE_V1_UPDATE_WEIGHTS": lambda: bool(int(os.getenv("FD_ENABLE_V1_UPDATE_WEIGHTS", "0"))),
+    # Whether to save the cache of output token for preempted request to storage.
+    "FD_SAVE_OUTPUT_CACHE_FOR_PREEMPTED_REQUEST": lambda: bool(
+        int(os.getenv("FD_SAVE_OUTPUT_CACHE_FOR_PREEMPTED_REQUEST", "1"))
+    ),
 }
0 commit comments

Comments (0)