Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## Unreleased

### Sink

* Fix `Sinker.requestActiveStartBlock` not being set when the handler implements `SinkerSessionInitHandler`, which previously caused `ProgressMessageLastContiguousBlock` to be incorrect for production-mode mapper stages.

### Fixed

- Server: per-block execution timeouts (`--substreams-block-execution-timeout`) are no longer silently swallowed when a WASM host-function panic (e.g. wasmtime) coincides with the deadline. Previously, `recoverExecutionPanic` would return `nil` instead of `CodeDeadlineExceeded`, causing the offending block to be skipped and the stream to complete successfully.
Expand Down
40 changes: 28 additions & 12 deletions sink/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,19 +784,9 @@ func (s *Sinker) doRequest(
}

case *pbsubstreamsrpc.Response_Session:
if sh, ok := handler.(SinkerSessionInitHandler); ok {
if err := sh.HandleSessionInit(ctx, s.request, r.Session); err != nil {
return activeCursor, receivedDataMessage, fmt.Errorf("handle session init: %w", err)
}
break
if err := s.handleSessionInit(ctx, handler, r.Session); err != nil {
return activeCursor, receivedDataMessage, err
}
s.Logger.Info("session initialized with remote endpoint",
zap.Uint64("max_parallel_workers", r.Session.MaxParallelWorkers),
zap.Uint64("linear_handoff_block", r.Session.LinearHandoffBlock),
zap.Uint64("resolved_start_block", r.Session.ResolvedStartBlock),
zap.String("trace_id", r.Session.TraceId),
)
s.requestActiveStartBlock = r.Session.ResolvedStartBlock

default:
s.Logger.Info("received unknown type of message", zap.Reflect("message", r))
Expand All @@ -805,6 +795,32 @@ func (s *Sinker) doRequest(
}
}

// handleSessionInit processes a `Response_Session` message received from the Substreams endpoint.
//
// If the registered handler implements [SinkerSessionInitHandler], its [SinkerSessionInitHandler.HandleSessionInit]
// callback is invoked. We do *not* short-circuit afterwards: the default logging and the
// `s.requestActiveStartBlock` assignment are sinker-internal bookkeeping that must run on every
// `Response_Session` message regardless of whether a custom handler is installed. The
// `requestActiveStartBlock` field is later consumed in the `Response_ModulesProgress` case to identify the
// contiguous completed range covering the user's resolved start block, so it must always be kept up to date.
func (s *Sinker) handleSessionInit(ctx context.Context, handler SinkerHandler, session *pbsubstreamsrpc.SessionInit) error {
if sh, ok := handler.(SinkerSessionInitHandler); ok {
if err := sh.HandleSessionInit(ctx, s.request, session); err != nil {
return fmt.Errorf("handle session init: %w", err)
}
}

s.Logger.Info("session initialized with remote endpoint",
zap.Uint64("max_parallel_workers", session.MaxParallelWorkers),
zap.Uint64("linear_handoff_block", session.LinearHandoffBlock),
zap.Uint64("resolved_start_block", session.ResolvedStartBlock),
zap.String("trace_id", session.TraceId),
)
s.requestActiveStartBlock = session.ResolvedStartBlock

return nil
}

func (s *Sinker) processBlockScopedData(
ctx context.Context,
handler SinkerHandler,
Expand Down
93 changes: 93 additions & 0 deletions sink/sinker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package sink

import (
"context"
"errors"
"testing"

pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
pbsubstreamsrpcv3 "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// noopHandler implements SinkerHandler but NOT SinkerSessionInitHandler.
type noopHandler struct{}

func (noopHandler) HandleBlockScopedData(ctx context.Context, data *pbsubstreamsrpc.BlockScopedData, isLive *bool, cursor *Cursor) error {
return nil
}

func (noopHandler) HandleBlockUndoSignal(ctx context.Context, undoSignal *pbsubstreamsrpc.BlockUndoSignal, cursor *Cursor) error {
return nil
}

// sessionInitRecordingHandler implements both SinkerHandler and SinkerSessionInitHandler, recording
// the arguments it was invoked with so the test can assert dispatch happened correctly.
type sessionInitRecordingHandler struct {
noopHandler

calls int
gotReq *pbsubstreamsrpcv3.Request
gotSession *pbsubstreamsrpc.SessionInit
returnError error
}

func (h *sessionInitRecordingHandler) HandleSessionInit(ctx context.Context, req *pbsubstreamsrpcv3.Request, sessionInit *pbsubstreamsrpc.SessionInit) error {
h.calls++
h.gotReq = req
h.gotSession = sessionInit
return h.returnError
}

func newTestSinker() *Sinker {
return &Sinker{
SinkerConfig: &SinkerConfig{Logger: zlog},
request: &pbsubstreamsrpcv3.Request{StartBlockNum: 100},
}
}

func TestSinker_handleSessionInit(t *testing.T) {
const resolvedStartBlock = uint64(12345)

t.Run("no session-init handler still sets requestActiveStartBlock", func(t *testing.T) {
s := newTestSinker()
session := &pbsubstreamsrpc.SessionInit{ResolvedStartBlock: resolvedStartBlock}

err := s.handleSessionInit(context.Background(), noopHandler{}, session)
require.NoError(t, err)
assert.Equal(t, resolvedStartBlock, s.requestActiveStartBlock)
})

t.Run("session-init handler invoked and requestActiveStartBlock still set", func(t *testing.T) {
s := newTestSinker()
handler := &sessionInitRecordingHandler{}
session := &pbsubstreamsrpc.SessionInit{ResolvedStartBlock: resolvedStartBlock}

err := s.handleSessionInit(context.Background(), handler, session)
require.NoError(t, err)

// Handler was invoked exactly once with the sinker's request and the received session.
assert.Equal(t, 1, handler.calls)
assert.Same(t, s.request, handler.gotReq)
assert.Same(t, session, handler.gotSession)

// Regression assertion for the bug: the internal bookkeeping must run even when a custom
// session-init handler is installed (previously a `break` short-circuited this assignment).
assert.Equal(t, resolvedStartBlock, s.requestActiveStartBlock)
})

t.Run("session-init handler error is propagated and wrapped", func(t *testing.T) {
s := newTestSinker()
sentinel := errors.New("boom")
handler := &sessionInitRecordingHandler{returnError: sentinel}
session := &pbsubstreamsrpc.SessionInit{ResolvedStartBlock: resolvedStartBlock}

err := s.handleSessionInit(context.Background(), handler, session)
require.Error(t, err)
assert.ErrorIs(t, err, sentinel)

// On handler failure we never reach the bookkeeping, so the field stays at its zero value.
assert.Zero(t, s.requestActiveStartBlock)
})
}
10 changes: 9 additions & 1 deletion sink/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,16 @@ type SinkerSessionInitHandler interface {
//
// The [HandleSessionInit] is optional and can be nil.
//
// This callback behaves as an *interceptor*, not as a replacement: implementing it does **not**
// short-circuit the [Sinker]'s normal session-init handling. After your callback returns (with a nil
// error), the [Sinker] still performs its default bookkeeping for every session — it emits the
// default "session initialized with remote endpoint" log line and updates its internal
// resolved-start-block state. Your implementation runs *in addition to* that default behavior, so it
// should layer on whatever extra work you need (extra logging, persistence, etc.) rather than assume
// it has taken over session handling.
//
// Your handler must return an error value that can be nil or non-nil. If non-nil, the error is assumed to be a fatal
// error and the [Sinker] will shutdown
// error and the [Sinker] will shutdown.
HandleSessionInit(ctx context.Context, req *pbsubstreamsrpcv3.Request, sessionInit *pbsubstreamsrpc.SessionInit) error
}

Expand Down
Loading