From 3bf9549360c9e3c7a83f0c26186db43aba410a09 Mon Sep 17 00:00:00 2001 From: Matthieu Vachon Date: Wed, 27 May 2026 23:59:56 -0400 Subject: [PATCH 1/2] fix(sink): always set requestActiveStartBlock on session init The Response_Session case in (*Sinker).doRequest short-circuited with a bare `break` when the handler implemented SinkerSessionInitHandler, skipping the `s.requestActiveStartBlock = r.Session.ResolvedStartBlock` assignment. That field is consumed by the Response_ModulesProgress case to compute ProgressMessageLastContiguousBlock for production-mode mapper stages, so a custom session-init handler silently left the metric wrong. Remove the `break` entirely: the custom handler (when present) is still invoked, but the default log line and the requestActiveStartBlock assignment now run unconditionally. The case body is extracted into a documented private method (*Sinker).handleSessionInit for testability. # Conflicts: # docs/release-notes/change-log.md --- docs/release-notes/change-log.md | 4 ++ sink/sinker.go | 40 +++++++++----- sink/sinker_test.go | 93 ++++++++++++++++++++++++++++++++ 3 files changed, 125 insertions(+), 12 deletions(-) create mode 100644 sink/sinker_test.go diff --git a/docs/release-notes/change-log.md b/docs/release-notes/change-log.md index aa6854c32..41df26ed1 100644 --- a/docs/release-notes/change-log.md +++ b/docs/release-notes/change-log.md @@ -11,6 +11,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## Unreleased +### Sink + +* Fix `Sinker.requestActiveStartBlock` not being set when the handler implements `SinkerSessionInitHandler`, which previously caused `ProgressMessageLastContiguousBlock` to be incorrect for production-mode mapper stages. + ### Fixed - CI: Docker image login, build and push are now skipped for fork PRs; image is still built (without push) to validate the Dockerfile. diff --git a/sink/sinker.go b/sink/sinker.go index d1808539c..acd09858e 100644 --- a/sink/sinker.go +++ b/sink/sinker.go @@ -784,19 +784,9 @@ func (s *Sinker) doRequest( } case *pbsubstreamsrpc.Response_Session: - if sh, ok := handler.(SinkerSessionInitHandler); ok { - if err := sh.HandleSessionInit(ctx, s.request, r.Session); err != nil { - return activeCursor, receivedDataMessage, fmt.Errorf("handle session init: %w", err) - } - break + if err := s.handleSessionInit(ctx, handler, r.Session); err != nil { + return activeCursor, receivedDataMessage, err } - s.Logger.Info("session initialized with remote endpoint", - zap.Uint64("max_parallel_workers", r.Session.MaxParallelWorkers), - zap.Uint64("linear_handoff_block", r.Session.LinearHandoffBlock), - zap.Uint64("resolved_start_block", r.Session.ResolvedStartBlock), - zap.String("trace_id", r.Session.TraceId), - ) - s.requestActiveStartBlock = r.Session.ResolvedStartBlock default: s.Logger.Info("received unknown type of message", zap.Reflect("message", r)) @@ -805,6 +795,32 @@ func (s *Sinker) doRequest( } } +// handleSessionInit processes a `Response_Session` message received from the Substreams endpoint. +// +// If the registered handler implements [SinkerSessionInitHandler], its [SinkerSessionInitHandler.HandleSessionInit] +// callback is invoked. We do *not* short-circuit afterwards: the default logging and the +// `s.requestActiveStartBlock` assignment are sinker-internal bookkeeping that must run on every +// `Response_Session` message regardless of whether a custom handler is installed. The +// `requestActiveStartBlock` field is later consumed in the `Response_ModulesProgress` case to identify the +// contiguous completed range covering the user's resolved start block, so it must always be kept up to date. +func (s *Sinker) handleSessionInit(ctx context.Context, handler SinkerHandler, session *pbsubstreamsrpc.SessionInit) error { + if sh, ok := handler.(SinkerSessionInitHandler); ok { + if err := sh.HandleSessionInit(ctx, s.request, session); err != nil { + return fmt.Errorf("handle session init: %w", err) + } + } + + s.Logger.Info("session initialized with remote endpoint", + zap.Uint64("max_parallel_workers", session.MaxParallelWorkers), + zap.Uint64("linear_handoff_block", session.LinearHandoffBlock), + zap.Uint64("resolved_start_block", session.ResolvedStartBlock), + zap.String("trace_id", session.TraceId), + ) + s.requestActiveStartBlock = session.ResolvedStartBlock + + return nil +} + func (s *Sinker) processBlockScopedData( ctx context.Context, handler SinkerHandler, diff --git a/sink/sinker_test.go b/sink/sinker_test.go new file mode 100644 index 000000000..d4814181a --- /dev/null +++ b/sink/sinker_test.go @@ -0,0 +1,93 @@ +package sink + +import ( + "context" + "errors" + "testing" + + pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2" + pbsubstreamsrpcv3 "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// noopHandler implements SinkerHandler but NOT SinkerSessionInitHandler. +type noopHandler struct{} + +func (noopHandler) HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *Cursor) error { + return nil +} + +func (noopHandler) HandleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *Cursor) error { + return nil +} + +// sessionInitRecordingHandler implements both SinkerHandler and SinkerSessionInitHandler, recording +// the arguments it was invoked with so the test can assert dispatch happened correctly. +type sessionInitRecordingHandler struct { + noopHandler + + calls int + gotReq *pbsubstreamsrpcv3.Request + gotSession *pbsubstreamsrpc.SessionInit + returnError error +} + +func (h *sessionInitRecordingHandler) HandleSessionInit(ctx context.Context, req *pbsubstreamsrpcv3.Request, sessionInit *pbsubstreamsrpc.SessionInit) error { + h.calls++ + h.gotReq = req + h.gotSession = sessionInit + return h.returnError +} + +func newTestSinker() *Sinker { + return &Sinker{ + SinkerConfig: &SinkerConfig{Logger: zlog}, + request: &pbsubstreamsrpcv3.Request{StartBlockNum: 100}, + } +} + +func TestSinker_handleSessionInit(t *testing.T) { + const resolvedStartBlock = uint64(12345) + + t.Run("no session-init handler still sets requestActiveStartBlock", func(t *testing.T) { + s := newTestSinker() + session := &pbsubstreamsrpc.SessionInit{ResolvedStartBlock: resolvedStartBlock} + + err := s.handleSessionInit(context.Background(), noopHandler{}, session) + require.NoError(t, err) + assert.Equal(t, resolvedStartBlock, s.requestActiveStartBlock) + }) + + t.Run("session-init handler invoked and requestActiveStartBlock still set", func(t *testing.T) { + s := newTestSinker() + handler := &sessionInitRecordingHandler{} + session := &pbsubstreamsrpc.SessionInit{ResolvedStartBlock: resolvedStartBlock} + + err := s.handleSessionInit(context.Background(), handler, session) + require.NoError(t, err) + + // Handler was invoked exactly once with the sinker's request and the received session. + assert.Equal(t, 1, handler.calls) + assert.Same(t, s.request, handler.gotReq) + assert.Same(t, session, handler.gotSession) + + // Regression assertion for the bug: the internal bookkeeping must run even when a custom + // session-init handler is installed (previously a `break` short-circuited this assignment). + assert.Equal(t, resolvedStartBlock, s.requestActiveStartBlock) + }) + + t.Run("session-init handler error is propagated and wrapped", func(t *testing.T) { + s := newTestSinker() + sentinel := errors.New("boom") + handler := &sessionInitRecordingHandler{returnError: sentinel} + session := &pbsubstreamsrpc.SessionInit{ResolvedStartBlock: resolvedStartBlock} + + err := s.handleSessionInit(context.Background(), handler, session) + require.Error(t, err) + assert.ErrorIs(t, err, sentinel) + + // On handler failure we never reach the bookkeeping, so the field stays at its zero value. + assert.Zero(t, s.requestActiveStartBlock) + }) +} From 49c9cc5a79207a151c036374dbd97fbdaf04177f Mon Sep 17 00:00:00 2001 From: Matthieu Vachon Date: Thu, 28 May 2026 00:06:21 -0400 Subject: [PATCH 2/2] docs(sink): clarify SinkerSessionInitHandler is an interceptor Document on the SinkerSessionInitHandler interface that HandleSessionInit behaves as an interceptor and does not short-circuit the Sinker's normal session-init handling: the default "session initialized" log line and the internal resolved-start-block bookkeeping still run after the callback. --- sink/types.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sink/types.go b/sink/types.go index 8712148e9..7823abe55 100644 --- a/sink/types.go +++ b/sink/types.go @@ -181,8 +181,16 @@ type SinkerSessionInitHandler interface { // // The [HandleSessionInit] is optional and can be nil. // + // This callback behaves as an *interceptor*, not as a replacement: implementing it does **not** + // short-circuit the [Sinker]'s normal session-init handling. After your callback returns (with a nil + // error), the [Sinker] still performs its default bookkeeping for every session — it emits the + // default "session initialized with remote endpoint" log line and updates its internal + // resolved-start-block state. Your implementation runs *in addition to* that default behavior, so it + // should layer on whatever extra work you need (extra logging, persistence, etc.) rather than assume + // it has taken over session handling. + // // Your handler must return an error value that can be nil or non-nil. If non-nil, the error is assumed to be a fatal - // error and the [Sinker] will shutdown + // error and the [Sinker] will shutdown. HandleSessionInit(ctx context.Context, req *pbsubstreamsrpcv3.Request, sessionInit *pbsubstreamsrpc.SessionInit) error }