diff --git a/docs/release-notes/change-log.md b/docs/release-notes/change-log.md index 7546dcdf9..7e8b36d55 100644 --- a/docs/release-notes/change-log.md +++ b/docs/release-notes/change-log.md @@ -11,6 +11,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## Unreleased +### Sink + +* Fix `Sinker.requestActiveStartBlock` not being set when the handler implements `SinkerSessionInitHandler`, which previously caused `ProgressMessageLastContiguousBlock` to be incorrect for production-mode mapper stages. + ### Fixed - Server: per-block execution timeouts (`--substreams-block-execution-timeout`) are no longer silently swallowed when a WASM host-function panic (e.g. wasmtime) coincides with the deadline. Previously, `recoverExecutionPanic` would return `nil` instead of `CodeDeadlineExceeded`, causing the offending block to be skipped and the stream to complete successfully. diff --git a/sink/sinker.go b/sink/sinker.go index d1808539c..acd09858e 100644 --- a/sink/sinker.go +++ b/sink/sinker.go @@ -784,19 +784,9 @@ func (s *Sinker) doRequest( } case *pbsubstreamsrpc.Response_Session: - if sh, ok := handler.(SinkerSessionInitHandler); ok { - if err := sh.HandleSessionInit(ctx, s.request, r.Session); err != nil { - return activeCursor, receivedDataMessage, fmt.Errorf("handle session init: %w", err) - } - break + if err := s.handleSessionInit(ctx, handler, r.Session); err != nil { + return activeCursor, receivedDataMessage, err } - s.Logger.Info("session initialized with remote endpoint", - zap.Uint64("max_parallel_workers", r.Session.MaxParallelWorkers), - zap.Uint64("linear_handoff_block", r.Session.LinearHandoffBlock), - zap.Uint64("resolved_start_block", r.Session.ResolvedStartBlock), - zap.String("trace_id", r.Session.TraceId), - ) - s.requestActiveStartBlock = r.Session.ResolvedStartBlock default: s.Logger.Info("received unknown type of message", zap.Reflect("message", r)) @@ -805,6 +795,32 @@ func (s *Sinker) doRequest( } } +// handleSessionInit processes a `Response_Session` message received from the Substreams endpoint. +// +// If the registered handler implements [SinkerSessionInitHandler], its [SinkerSessionInitHandler.HandleSessionInit] +// callback is invoked. We do *not* short-circuit afterwards: the default logging and the +// `s.requestActiveStartBlock` assignment are sinker-internal bookkeeping that must run on every +// `Response_Session` message regardless of whether a custom handler is installed. The +// `requestActiveStartBlock` field is later consumed in the `Response_ModulesProgress` case to identify the +// contiguous completed range covering the user's resolved start block, so it must always be kept up to date. +func (s *Sinker) handleSessionInit(ctx context.Context, handler SinkerHandler, session *pbsubstreamsrpc.SessionInit) error { + if sh, ok := handler.(SinkerSessionInitHandler); ok { + if err := sh.HandleSessionInit(ctx, s.request, session); err != nil { + return fmt.Errorf("handle session init: %w", err) + } + } + + s.Logger.Info("session initialized with remote endpoint", + zap.Uint64("max_parallel_workers", session.MaxParallelWorkers), + zap.Uint64("linear_handoff_block", session.LinearHandoffBlock), + zap.Uint64("resolved_start_block", session.ResolvedStartBlock), + zap.String("trace_id", session.TraceId), + ) + s.requestActiveStartBlock = session.ResolvedStartBlock + + return nil +} + func (s *Sinker) processBlockScopedData( ctx context.Context, handler SinkerHandler, diff --git a/sink/sinker_test.go b/sink/sinker_test.go new file mode 100644 index 000000000..d4814181a --- /dev/null +++ b/sink/sinker_test.go @@ -0,0 +1,93 @@ +package sink + +import ( + "context" + "errors" + "testing" + + pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2" + pbsubstreamsrpcv3 "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// noopHandler implements SinkerHandler but NOT SinkerSessionInitHandler. +type noopHandler struct{} + +func (noopHandler) HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *Cursor) error { + return nil +} + +func (noopHandler) HandleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *Cursor) error { + return nil +} + +// sessionInitRecordingHandler implements both SinkerHandler and SinkerSessionInitHandler, recording +// the arguments it was invoked with so the test can assert dispatch happened correctly. +type sessionInitRecordingHandler struct { + noopHandler + + calls int + gotReq *pbsubstreamsrpcv3.Request + gotSession *pbsubstreamsrpc.SessionInit + returnError error +} + +func (h *sessionInitRecordingHandler) HandleSessionInit(ctx context.Context, req *pbsubstreamsrpcv3.Request, sessionInit *pbsubstreamsrpc.SessionInit) error { + h.calls++ + h.gotReq = req + h.gotSession = sessionInit + return h.returnError +} + +func newTestSinker() *Sinker { + return &Sinker{ + SinkerConfig: &SinkerConfig{Logger: zlog}, + request: &pbsubstreamsrpcv3.Request{StartBlockNum: 100}, + } +} + +func TestSinker_handleSessionInit(t *testing.T) { + const resolvedStartBlock = uint64(12345) + + t.Run("no session-init handler still sets requestActiveStartBlock", func(t *testing.T) { + s := newTestSinker() + session := &pbsubstreamsrpc.SessionInit{ResolvedStartBlock: resolvedStartBlock} + + err := s.handleSessionInit(context.Background(), noopHandler{}, session) + require.NoError(t, err) + assert.Equal(t, resolvedStartBlock, s.requestActiveStartBlock) + }) + + t.Run("session-init handler invoked and requestActiveStartBlock still set", func(t *testing.T) { + s := newTestSinker() + handler := &sessionInitRecordingHandler{} + session := &pbsubstreamsrpc.SessionInit{ResolvedStartBlock: resolvedStartBlock} + + err := s.handleSessionInit(context.Background(), handler, session) + require.NoError(t, err) + + // Handler was invoked exactly once with the sinker's request and the received session. + assert.Equal(t, 1, handler.calls) + assert.Same(t, s.request, handler.gotReq) + assert.Same(t, session, handler.gotSession) + + // Regression assertion for the bug: the internal bookkeeping must run even when a custom + // session-init handler is installed (previously a `break` short-circuited this assignment). + assert.Equal(t, resolvedStartBlock, s.requestActiveStartBlock) + }) + + t.Run("session-init handler error is propagated and wrapped", func(t *testing.T) { + s := newTestSinker() + sentinel := errors.New("boom") + handler := &sessionInitRecordingHandler{returnError: sentinel} + session := &pbsubstreamsrpc.SessionInit{ResolvedStartBlock: resolvedStartBlock} + + err := s.handleSessionInit(context.Background(), handler, session) + require.Error(t, err) + assert.ErrorIs(t, err, sentinel) + + // On handler failure we never reach the bookkeeping, so the field stays at its zero value. + assert.Zero(t, s.requestActiveStartBlock) + }) +} diff --git a/sink/types.go b/sink/types.go index 8712148e9..7823abe55 100644 --- a/sink/types.go +++ b/sink/types.go @@ -181,8 +181,16 @@ type SinkerSessionInitHandler interface { // // The [HandleSessionInit] is optional and can be nil. // + // This callback behaves as an *interceptor*, not as a replacement: implementing it does **not** + // short-circuit the [Sinker]'s normal session-init handling. After your callback returns (with a nil + // error), the [Sinker] still performs its default bookkeeping for every session — it emits the + // default "session initialized with remote endpoint" log line and updates its internal + // resolved-start-block state. Your implementation runs *in addition to* that default behavior, so it + // should layer on whatever extra work you need (extra logging, persistence, etc.) rather than assume + // it has taken over session handling. + // // Your handler must return an error value that can be nil or non-nil. If non-nil, the error is assumed to be a fatal - // error and the [Sinker] will shutdown + // error and the [Sinker] will shutdown. HandleSessionInit(ctx context.Context, req *pbsubstreamsrpcv3.Request, sessionInit *pbsubstreamsrpc.SessionInit) error }