
Streaming/Concurrent Sink with Per-Message Response Streaming #3340

@yhl25

Summary

Currently, the Numaflow sink processes messages batch by batch: a batch is read from the ISB, the entire batch is sent to the sink, and the forwarder waits for all responses before acknowledging any of them. As a result, one slow message delays the ACK of every other message in its batch, and sink concurrency cannot be tuned independently of the ISB read batch size.

This issue proposes:

  1. Concurrent in-flight control in the sink forwarder: maintain a configurable number of messages in flight at all times, decoupled from the ISB read batch size.
  2. A new StreamingSinker SDK interface — instead of returning a slice of responses per batch, the SDK streams back individual responses as messages are processed.

Problem

Batch-Coupled Throughput

Today's pipeline:

```text
ISB Read (batch N) → Send batch to Sink → Wait for all N responses → ACK batch → repeat
```

  • Concurrency is implicitly tied to the ISB readBatchSize.
  • No way to keep the sink saturated with a fixed window of in-flight messages independent of batch size.
  • A slow sink write keeps the entire batch in memory, un-ACKed, until every message in it is done.
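The cost of batch coupling can be seen in a simplified timing model of the loop above. This is a sketch, not Numaflow code. It assumes only one batch is in flight at a time, that read and ACK overhead is zero, and that the per-message latencies are hypothetical unit-less times. `batchAckTimes` is an illustrative name.

```go
package main

// batchAckTimes models the batch-coupled forwarder (illustrative only).
// Messages are read in batches of batchSize, the whole batch is written
// concurrently, and no message in the batch is ACKed until the slowest one
// finishes. The next batch is read only after the previous one is ACKed.
// latencies[i] is a hypothetical write latency for message i; the result
// holds the time at which each message is ACKed.
func batchAckTimes(latencies []int, batchSize int) []int {
	acks := make([]int, len(latencies))
	clock := 0
	for start := 0; start < len(latencies); start += batchSize {
		end := start + batchSize
		if end > len(latencies) {
			end = len(latencies)
		}
		// The batch completes only when its slowest write completes.
		slowest := 0
		for _, l := range latencies[start:end] {
			if l > slowest {
				slowest = l
			}
		}
		clock += slowest
		// Every message in the batch shares the same ACK time.
		for i := start; i < end; i++ {
			acks[i] = clock
		}
	}
	return acks
}
```

With latencies `[1, 1, 10, 1, 1, 1]` and a batch size of 3, the ACK times are `[10, 10, 10, 11, 11, 11]`. Message 0 finished at t=1, but it is not ACKed until t=10 because it shares a batch with the slow message 2.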

Batch Response API

The current SDK interface (Sinker) collects all responses into a Responses slice and returns them at once. As a result:

  • The server cannot start ACKing any messages until the entire batch is finished.
  • Not suitable for sinks that naturally produce results as they go (e.g., HTTP calls, DB writes with partial success).

Proposed Changes

1. Sink Concurrency Setting

Add a new concurrency field to the sink vertex spec (and mono-vertex sink config):

```yaml
sink:
  udsink:
    container:
      image: my-sink:latest
  concurrency: 50  # maintain up to 50 messages in-flight at all times
```

  • concurrency is independent of readBatchSize on the ISB reader.
  • The SinkWriter maintains a semaphore / sliding window of in-flight messages.
  • As responses come back per-message, the window slides forward — new messages are dispatched without waiting for the whole batch.
  • ACKs are issued per-message as responses arrive (subject to ordering constraints from the tracker).

Key invariants to preserve:

  • Watermark progression must remain correct.
  • Fallback sink routing is preserved per-message.
  • Cancellation / error-drain behavior on non-retryable errors is preserved.
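Once several writes are in flight, per-message results complete out of order. One common way to keep a monotonic commit point correct, along with any watermark derived from it, is to advance it only over a contiguous prefix of finished offsets. The sketch below assumes dense, increasing int64 offsets starting at a known value. `ackTracker` is hypothetical, and Numaflow's actual tracker may model this differently.

```go
package main

import "sync"

// ackTracker (hypothetical) lets writes finish in any order but only moves
// the commit point across a gap-free run of finished offsets.
type ackTracker struct {
	mu        sync.Mutex
	next      int64          // lowest offset not yet safe to commit
	completed map[int64]bool // finished offsets waiting behind a gap
}

func newAckTracker(start int64) *ackTracker {
	return &ackTracker{next: start, completed: make(map[int64]bool)}
}

// Done records that offset finished and returns the new lowest uncommitted
// offset: everything strictly below the return value may be committed.
func (t *ackTracker) Done(offset int64) int64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.completed[offset] = true
	// Slide the commit point over every consecutive finished offset.
	for t.completed[t.next] {
		delete(t.completed, t.next)
		t.next++
	}
	return t.next
}
```

For example, finishing offsets 1 and 2 first leaves the commit point at 0. Finishing offset 0 then jumps it to 3 in one step.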

2. New StreamingSinker gRPC Interface

Update sink.proto to add a new service method that streams back individual per-message results rather than a batched array:

```protobuf
service Sink {
  // Existing batch interface (unchanged for backwards compatibility)
  rpc SinkFn(stream SinkRequest) returns (stream SinkResponse);

  // New: responses streamed back individually as each message is processed
  rpc SinkFnStreaming(stream SinkRequest) returns (stream SinkResponseResult);

  rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
}

// A single per-message result streamed back immediately upon processing
message SinkResponseResult {
  string id = 1;
  Status status = 2;
  string err_msg = 3;
  optional bytes serve_response = 4;
  optional SinkResponse.Result.Message on_success_msg = 5;
  optional Handshake handshake = 6;
}
```

At startup, the server uses a capability handshake to detect whether the user-defined sink (UDSink) implements StreamingSinker. It uses SinkFnStreaming if available and falls back to SinkFn otherwise, so the change is fully backwards compatible.

3. Go SDK — New StreamingSinker Interface

Add a new interface alongside the existing Sinker:

```go
type StreamingSinker interface {
	SinkStream(ctx context.Context, datumStreamCh <-chan Datum, responseCh chan<- Response)
}
```

The pattern mirrors MapStreamer — the user writes responses to a channel and closes it when done, rather than returning a collected slice. NewServer will accept both Sinker and StreamingSinker.

Example:

```go
type streamingLogSink struct{}

func (l *streamingLogSink) SinkStream(
	ctx context.Context,
	datumStreamCh <-chan sinksdk.Datum,
	responseCh chan<- sinksdk.Response,
) {
	defer close(responseCh) // must close to signal end of responses
	for d := range datumStreamCh {
		// process d ...
		select {
		case responseCh <- sinksdk.ResponseOK(d.ID()): // stream response immediately
		case <-ctx.Done():
			return // stream cancelled; don't block on a reader that is gone
		}
	}
}
```
