Skip to content

Commit 818e1a8

Browse files
committed
fix(rpc): delegate REM pipelines from generic Start/Stop/DeletePipeline to dedicated handlers
StartPipeline/StopPipeline/DeletePipeline did not handle REM pipeline type correctly — REM was falling through to the default CustomPipeline path in the listener's startPipeline(), so the rem process was never actually started. WebUI always calls the generic StartPipeline RPC, which caused REM pipelines created via WebUI to have empty Link and Subscribe fields. Add type checks in StartPipeline, StopPipeline, and DeletePipeline to detect REM pipelines and delegate to StartRem, StopRem, DeleteRem respectively. Also fix two related issues: - Remove stale core.Jobs.AddPipeline() call in StartRem that overwrote runtime Link/Subscribe/Port data from SyncPipeline with the old DB snapshot. - Enhance handleStartRem idempotency check to detect dead REM pipelines (crashed via runtimeErrorHandler) and recreate them instead of just syncing stale state.
1 parent 8b5aeae commit 818e1a8

3 files changed

Lines changed: 39 additions & 3 deletions

File tree

server/listener/listener.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -679,8 +679,15 @@ func (lns *listener) handleStartRem(job *clientpb.Job) error {
679679

680680
// Idempotency: REM already started in this listener process.
681681
if existing := lns.pipelines.Get(pipe.Name); existing != nil {
682-
_, err := lns.Rpc.SyncPipeline(lns.Context(), existing.ToProtobuf())
683-
return err
682+
remPipeline, ok := existing.(*REM)
683+
if ok && remPipeline.Enable {
684+
// Still healthy — just sync its current state.
685+
_, err := lns.Rpc.SyncPipeline(lns.Context(), existing.ToProtobuf())
686+
return err
687+
}
688+
// Dead pipeline (crashed via runtimeErrorHandler) — remove the stale
689+
// entry so we can create a fresh one below.
690+
lns.pipelines.Delete(existing.ID())
684691
}
685692

686693
rem, err := NewRem(lns.Rpc, pipe)

server/rpc/rpc-pipeline.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,14 @@ func (rpc *Server) StartPipeline(ctx context.Context, req *clientpb.CtrlPipeline
111111
if err != nil {
112112
return nil, err
113113
}
114+
115+
// REM pipelines have their own start path; delegate transparently so
116+
// callers (e.g. WebUI) that always use StartPipeline still work.
117+
if pipelineDB.Type == consts.RemPipeline {
118+
req.ListenerId = listenerID
119+
return rpc.StartRem(ctx, req)
120+
}
121+
114122
if pipelineDB.PipelineParams == nil {
115123
pipelineDB.PipelineParams = &implanttypes.PipelineParams{}
116124
}
@@ -169,6 +177,17 @@ func (rpc *Server) StopPipeline(ctx context.Context, req *clientpb.CtrlPipeline)
169177
return nil, err
170178
}
171179

180+
pipelineDB, err := db.FindPipelineByListener(req.Name, listenerID)
181+
if err != nil {
182+
return nil, err
183+
}
184+
185+
// Delegate REM pipelines to their dedicated handler.
186+
if pipelineDB.Type == consts.RemPipeline {
187+
req.ListenerId = listenerID
188+
return rpc.StopRem(ctx, req)
189+
}
190+
172191
lns, err := core.Listeners.Get(listenerID)
173192
if err != nil {
174193
return nil, err
@@ -214,6 +233,13 @@ func (rpc *Server) DeletePipeline(ctx context.Context, req *clientpb.CtrlPipelin
214233
if err != nil {
215234
return nil, err
216235
}
236+
237+
// Delegate REM pipelines to their dedicated handler.
238+
if pipelineDB.Type == consts.RemPipeline {
239+
req.ListenerId = listenerID
240+
return rpc.DeleteRem(ctx, req)
241+
}
242+
217243
lns, err := core.Listeners.Get(listenerID)
218244
if err != nil {
219245
return nil, err

server/rpc/rpc-rem.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,10 @@ func (rpc *Server) StartRem(ctx context.Context, req *clientpb.CtrlPipeline) (*c
101101
if err := db.EnablePipelineByListener(rem.Name, listenerID); err != nil {
102102
return nil, err
103103
}
104-
core.Jobs.AddPipeline(rem)
104+
// Do not call core.Jobs.AddPipeline(rem) here: the listener's
105+
// handleStartRem already invoked SyncPipeline with the runtime-
106+
// populated pipeline (Link, Subscribe, Port). Calling AddPipeline
107+
// again with the stale DB snapshot would overwrite those values.
105108

106109
return &clientpb.Empty{}, nil
107110
}

0 commit comments

Comments
 (0)