4242 SpeculativeConfig ,
4343 StructuredOutputsConfig ,
4444)
45+ from fastdeploy .engine .request import RequestType
4546from fastdeploy .eplb .async_expert_loader import (
4647 MODEL_MAIN_NAME ,
4748 REARRANGE_EXPERT_MAGIC_NUM ,
@@ -415,8 +416,7 @@ def event_loop_normal(self) -> None:
415416 tp_size = self .parallel_config .tensor_parallel_size
416417 # Currently, only support single node
417418 self .nnode = int ((tp_size + 7 ) // 8 )
418- req_ids = []
419- num_running_requests = 0
419+ max_occupied_batch_index = 0
420420 tp_rank = self .local_rank % tp_size
421421
422422 # TODO: Unify status variables model_weights_status (shared memory) and model_weights_signal (numpy array) to one
@@ -492,17 +492,22 @@ def event_loop_normal(self) -> None:
492492
493493 req_dicts = []
494494 for req_dict , bsz in tasks :
495- num_running_requests = int (bsz )
495+ max_occupied_batch_index = int (bsz )
496496 req_dicts .extend (req_dict )
497497
498- req_ids = [req .request_id for req in req_dicts ]
498+ # Count prefill requests in current batch
499+ num_prefill_requests = sum (1 for req in req_dicts if req .task_type == RequestType .PREFILL )
500+ num_scheduled_requests = len (req_dicts )
501+ scheduled_request_ids = [req .request_id for req in req_dicts ]
499502 logger .info (
500- f"Rank: { self .local_rank } , num_running_requests: { num_running_requests } , "
501- f"num_insert_requests: { len (req_dicts )} , req_ids: { req_ids } "
503+ f"Rank: { self .local_rank } , num_prefill_requests: { num_prefill_requests } , "
504+ f"max_occupied_batch_index: { max_occupied_batch_index } , "
505+ f"num_scheduled_requests: { num_scheduled_requests } , "
506+ f"scheduled_request_ids: { scheduled_request_ids } "
502507 )
503508
504509 # Process prefill inputs
505- self .worker .preprocess_new_task (req_dicts , num_running_requests )
510+ self .worker .preprocess_new_task (req_dicts , max_occupied_batch_index )
506511
507512 if (not self .parallel_config .use_ep ) and (not self .worker .model_runner .not_need_stop ()):
508513 if self .ranks > 1 :
@@ -514,7 +519,7 @@ def event_loop_normal(self) -> None:
514519 # Execute model to generate token. The generated token will be written to the buffer.
515520 # These generated tokens can be obtained through get_output op.
516521 start_execute_time = time .time ()
517- self .worker .execute_model (req_dicts , num_running_requests )
522+ self .worker .execute_model (req_dicts , max_occupied_batch_index )
518523 self .exist_prefill_task_signal .value [0 ] = self .worker .exist_prefill ()
519524 logger .debug (f"execute model cost: { time .time ()- start_execute_time :.5f} s" )
520525
0 commit comments