Summary
Currently, the Numaflow sink processes messages batch-by-batch: a batch is read from the ISB, the entire batch is sent to the sink, and all responses are awaited before acknowledging. This limits throughput and ties concurrency to the ISB read batch size.
This issue proposes:
- Concurrent inflight control in the sink forwarder — maintain a configurable number of messages in-flight at all times, decoupled from the ISB read batch size.
- A new StreamingSinker SDK interface — instead of returning a slice of responses per batch, the SDK streams back individual responses as messages are processed.
Problem
Batch-Coupled Throughput
Today's pipeline:
ISB Read (batch N) → Send batch to Sink → Wait for all N responses → ACK batch → repeat
- Concurrency is implicitly tied to the ISB readBatchSize.
- No way to keep the sink saturated with a fixed window of in-flight messages independent of batch size.
- A slow sink holds the entire batch in memory until all messages are done.
Batch Response API
The current SDK interface (Sinker) collects all responses into a Responses array and returns them all at once:
- The server cannot start ACKing any messages until the entire batch is finished.
- Not suitable for sinks that naturally produce results as they go (e.g., HTTP calls, DB writes with partial success).
Proposed Changes
1. Sink Concurrency Setting
Add a new concurrency field to the sink vertex spec (and mono-vertex sink config):
sink:
  udsink:
    container:
      image: my-sink:latest
  concurrency: 50   # maintain up to 50 messages in-flight at all times
- concurrency is independent of readBatchSize on the ISB reader.
- The SinkWriter maintains a semaphore / sliding window of in-flight messages.
- As responses come back per-message, the window slides forward — new messages are dispatched without waiting for the whole batch.
- ACKs are issued per-message as responses arrive (subject to ordering constraints from the tracker).
Key invariants to preserve:
- Watermark progression must remain correct.
- Fallback sink routing is preserved per-message.
- Cancellation / error-drain behavior on non-retryable errors is preserved.
2. New StreamingSinker gRPC Interface
Update sink.proto to add a new service method that streams back individual per-message results rather than a batched array:
service Sink {
  // Existing batch interface (unchanged for backwards compatibility)
  rpc SinkFn(stream SinkRequest) returns (stream SinkResponse);
  // New: responses streamed back individually as each message is processed
  rpc SinkFnStreaming(stream SinkRequest) returns (stream SinkResponseResult);
  rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
}
// A single per-message result streamed back immediately upon processing
message SinkResponseResult {
  string id = 1;
  Status status = 2;
  string err_msg = 3;
  optional bytes serve_response = 4;
  optional SinkResponse.Result.Message on_success_msg = 5;
  optional Handshake handshake = 6;
}
The server detects whether the UDS implements StreamingSinker via capability handshake at startup and uses SinkFnStreaming if available, falling back to SinkFn otherwise. Fully backwards compatible.
3. Go SDK — New StreamingSinker Interface
Add a new interface alongside the existing Sinker:
type StreamingSinker interface {
	SinkStream(ctx context.Context, datumStreamCh <-chan Datum, responseCh chan<- Response)
}
The pattern mirrors MapStreamer — the user writes responses to a channel and closes it when done, rather than returning a collected slice. NewServer will accept both Sinker and StreamingSinker.
Example:
func (l *streamingLogSink) SinkStream(
	ctx context.Context,
	datumStreamCh <-chan sinksdk.Datum,
	responseCh chan<- sinksdk.Response,
) {
	defer close(responseCh) // must close to signal end of responses
	for d := range datumStreamCh {
		// process d ...
		responseCh <- sinksdk.ResponseOK(d.ID()) // stream response immediately
	}
}