Kafka rebalancing can trigger multiple pod restarts #3355

@vaibhavtiwari33

Description

Describe the bug

Kafka rebalancing can trigger multiple restarts of the built-in Kafka source vertex (vtx) or MonoVertex (mvtx).

To Reproduce
Steps to reproduce the behavior:

  1. Create a pipeline/MonoVertex with a built-in Kafka source.
  2. Bring up a second pipeline/MonoVertex with a built-in Kafka source in the same consumer group.
  3. Kafka triggers a rebalance.
  4. The consumer sees RebalanceInProgress errors when committing offsets. These are treated as non-retryable and trigger a pod restart.
  5. The restart triggers another rebalance, because the current consumer leaves the group.
  6. While the Kafka consumer restarts, fetching pending messages for the topic can time out because the broker is busy rebalancing.
  7. This fails the start-up of the Kafka consumer, triggering another restart, hence another rebalance, and so on.
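Steps 4 and 6 both come down to how a Kafka error is classified. The following is a minimal sketch of one possible policy, not Numaflow's actual code: KafkaErrorKind and Action are hypothetical types whose variant names are borrowed from the librdkafka errors seen in this report, and the rule simply treats rebalance-related errors as transient rather than fatal.

```rust
/// Hypothetical, simplified error kinds. The names mirror librdkafka errors
/// seen in this issue; this enum is illustrative and is not the
/// numaflow_kafka or rdkafka type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KafkaErrorKind {
    /// Commit rejected because the consumer group is rebalancing.
    RebalanceInProgress,
    /// A local request (e.g. a committed-offsets lookup) timed out.
    OperationTimedOut,
    /// Anything else, e.g. an authentication failure or unknown topic.
    Other,
}

/// What the source should do after an error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Action {
    /// Back off and retry in-process; do not exit the pod.
    Retry,
    /// Surface the error and let the pod restart.
    Fail,
}

/// One possible policy: errors expected while the group is rebalancing
/// are treated as transient instead of fatal.
pub fn classify(kind: KafkaErrorKind) -> Action {
    match kind {
        KafkaErrorKind::RebalanceInProgress | KafkaErrorKind::OperationTimedOut => Action::Retry,
        KafkaErrorKind::Other => Action::Fail,
    }
}
```

Whether retrying a commit is useful after a rebalance depends on the delivery semantics: if the partition has been revoked from this consumer, the commit cannot succeed and the uncommitted messages may be redelivered to the new owner, which is acceptable under at-least-once processing.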

Expected behavior
Ideally, the consumer should restart only once, when it encounters the non-retryable error.
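To meet the expected behavior, transient failures during start-up could be retried in-process instead of exiting. Below is a minimal, std-only sketch of bounded exponential backoff; retry_with_backoff is a hypothetical helper, and the attempt limit and delays are placeholder values, not tuned for Kafka. The blocking thread::sleep keeps the example self-contained; inside the async KafkaActor an async sleep (for example tokio::time::sleep) would be the natural substitute.

```rust
use std::thread;
use std::time::Duration;

/// Calls `op` until it succeeds, returns a non-retryable error, or
/// `max_attempts` total attempts (at least one) have been made.
/// The delay doubles after each failed attempt, capped at `max_delay`.
pub fn retry_with_backoff<T, E, F, R>(
    max_attempts: u32,
    initial_delay: Duration,
    max_delay: Duration,
    mut op: F,
    is_retryable: R,
) -> Result<T, E>
where
    F: FnMut() -> Result<T, E>,
    R: Fn(&E) -> bool,
{
    let mut delay = initial_delay;
    let mut attempt = 1;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Transient error with attempts left: wait, grow the delay, retry.
            Err(err) if attempt < max_attempts && is_retryable(&err) => {
                thread::sleep(delay);
                delay = (delay * 2).min(max_delay);
                attempt += 1;
            }
            // Fatal error, or out of attempts: give up and surface it.
            Err(err) => return Err(err),
        }
    }
}
```

Bounding the attempts still lets a genuinely broken broker surface as a pod restart, while a rebalance that settles within the retry window no longer causes one.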

Logs

Pod status showing multiple restarts after a rebalancing event:
simple-kafka-consumer-daemon-54db7b88cd-4st8l    1/1     Running            0               56m
simple-kafka-consumer-kafka-consumer-0-talva     2/2     Running            4 (80s ago)     50m
simple-kafka-consumer-log-sink-0-ycxti           2/2     Running            0               56m
simple-kafka-consumer2-daemon-64d8c94fcf-dcxzz   1/1     Running            0               3m46s
simple-kafka-consumer2-kafka-consumer-0-7f433    1/2     CrashLoopBackOff   4 (69s ago)     3m46s
simple-kafka-consumer2-log-sink-0-qaron          2/2     Running            0               3m46s
Logs from the restarting pod:
Defaulted container "numa" out of: numa, init (init), monitor (init)
{"timestamp":"2026-04-08T00:53:42.788616Z","level":"INFO","message":"Numaflow runtime details","VERSION_INFO":"Version: latest+672a159.dirty, BuildDate: 2026-04-07T19:27:45Z, GitCommit: 672a1591316960116cfbac9231aea2b58227c8fa, GitTag: , GitTreeState: dirty, RustVersion: rustc 1.93.1 (01f6ddf75 2026-02-11), Platform: linux/x86_64","tokio_worker_threads":1,"target":"numaflow"}
{"timestamp":"2026-04-08T00:53:42.788845Z","level":"INFO","message":"Starting processing pipeline","target":"numaflow"}
{"timestamp":"2026-04-08T00:53:42.789037Z","level":"INFO","message":"Loaded pipeline spec: Vertex { api_version: None, kind: None, metadata: Some(ObjectMeta { annotations: None, creation_timestamp: None, deletion_grace_period_seconds: None, deletion_timestamp: None, finalizers: None, generate_name: None, generation: None, labels: None, managed_fields: None, name: Some(\"simple-kafka-consumer-kafka-consumer\"), namespace: Some(\"numaflow-system\"), owner_references: None, resource_version: None, self_link: None, uid: None }), spec: VertexSpec { affinity: None, automount_service_account_token: None, container_template: None, dns_config: None, dns_policy: None, from_edges: None, image_pull_secrets: None, init_container_template: None, init_containers: None, inter_step_buffer: None, inter_step_buffer_service_name: Some(\"\"), lifecycle: Some(VertexLifecycle { desired_phase: None }), limits: Some(VertexLimits { buffer_max_length: Some(30000), buffer_usage_limit: Some(80), rate_limit: None, read_batch_size: Some(500), read_timeout: Some(1s) }), metadata: None, name: \"kafka-consumer\", node_selector: None, ordered: Some(Ordered { enabled: None }), partitions: None, pipeline_name: \"simple-kafka-consumer\", priority: None, priority_class_name: None, replicas: Some(0), resource_claims: None, runtime_class_name: None, scale: Some(Scale { disabled: None, lookback_seconds: None, max: None, min: None, replicas_per_scale: None, replicas_per_scale_down: None, replicas_per_scale_up: None, scale_down_cooldown_seconds: None, scale_up_cooldown_seconds: None, target_buffer_availability: None, target_processing_seconds: None, zero_replica_sleep_seconds: None }), security_context: None, service_account_name: None, side_inputs: None, side_inputs_container_template: None, sidecars: None, sink: None, source: Some(Source { generator: None, http: None, jetstream: None, kafka: Some(KafkaSource { brokers: Some([\"kafka:9092\"]), config: None, consumer_group: Some(\"my-consumer-group\"), 
kafka_version: None, sasl: None, tls: None, topic: \"orders-json-1,orders-json-2\" }), nats: None, pulsar: None, serving: None, sqs: None, transformer: None, udsource: None }), to_edges: Some([CombinedEdge { conditions: None, from: \"kafka-consumer\", from_vertex_limits: Some(VertexLimits { buffer_max_length: Some(30000), buffer_usage_limit: Some(80), rate_limit: None, read_batch_size: Some(500), read_timeout: Some(1s) }), from_vertex_partition_count: Some(1), from_vertex_type: \"Source\", on_full: None, to: \"log-sink\", to_vertex_limits: Some(VertexLimits { buffer_max_length: Some(30000), buffer_usage_limit: Some(80), rate_limit: None, read_batch_size: Some(500), read_timeout: Some(1s) }), to_vertex_ordered: Some(Ordered { enabled: None }), to_vertex_partition_count: Some(1), to_vertex_type: \"Sink\" }]), tolerations: None, udf: None, update_strategy: Some(UpdateStrategy { rolling_update: None, type: None }), volumes: None, watermark: Some(Watermark { disabled: None, idle_source: None, max_delay: Some(30s) }) }, status: Some(VertexStatus { conditions: None, current_hash: None, desired_replicas: Some(0), last_scaled_at: None, message: None, observed_generation: None, phase: Some(\"\"), ready_replicas: None, reason: None, replicas: Some(0), selector: None, update_hash: None, updated_ready_replicas: None, updated_replicas: None }) }","target":"numaflow_core::config::pipeline"}
{"timestamp":"2026-04-08T00:53:42.789141Z","level":"INFO","message":"Starting pipeline forwarder with config: PipelineConfig {\n    pipeline_name: \"simple-kafka-consumer\",\n    vertex_name: \"kafka-consumer\",\n    replica: 0,\n    batch_size: 500,\n    writer_concurrency: 500,\n    read_timeout: 1s,\n    graceful_shutdown_time: 20s,\n    js_client_config: ClientConfig {\n        url: \"nats://isbsvc-default-js-svc.numaflow-system.svc:4222\",\n        user: Some(\n            \"GpqmQNfG\",\n        ),\n        password: Some(\n            \"[REDACTED]\",\n        ),\n    },\n    from_vertex_config: [],\n    to_vertex_config: [\n        ToVertexConfig {\n            name: \"log-sink\",\n            partitions: 1,\n            writer_config: BufferWriterConfig {\n                streams: [\n                    Stream {\n                        name: \"numaflow-system-simple-kafka-consumer-log-sink-0\",\n                        vertex: \"log-sink\",\n                        partition: 0,\n                    },\n                ],\n                max_length: 30000,\n                usage_limit: 0.8,\n                buffer_full_strategy: RetryUntilSuccess,\n            },\n            conditions: None,\n            to_vertex_type: Sink,\n            ordered_processing_enabled: false,\n        },\n    ],\n    vertex_config: Source(\n        SourceVtxConfig {\n            source_config: SourceConfig {\n                read_ahead: false,\n                source_type: Kafka(\n                    KafkaSourceConfig {\n                        brokers: [\n                            \"kafka:9092\",\n                        ],\n                        topics: [\n                            \"orders-json-1\",\n                            \"orders-json-2\",\n                        ],\n                        consumer_group: \"my-consumer-group\",\n                        auth: None,\n                        tls: None,\n                        kafka_raw_config: {},\n          
          },\n                ),\n            },\n            transformer_config: None,\n        },\n    ),\n    vertex_type: Source,\n    metrics_config: MetricsConfig {\n        metrics_server_listen_port: 2469,\n        lag_check_interval_in_secs: 5,\n        lag_refresh_interval_in_secs: 3,\n        lookback_window_in_secs: 120,\n    },\n    watermark_config: Some(\n        Source(\n            SourceWatermarkConfig {\n                max_delay: 30s,\n                source_bucket_config: BucketConfig {\n                    vertex: \"kafka-consumer\",\n                    partitions: [\n                        0,\n                    ],\n                    ot_bucket: \"numaflow-system-simple-kafka-consumer-kafka-consumer_SOURCE_OT\",\n                    delay: Some(\n                        100ms,\n                    ),\n                },\n                to_vertex_bucket_config: [\n                    BucketConfig {\n                        vertex: \"log-sink\",\n                        partitions: [\n                            0,\n                        ],\n                        ot_bucket: \"numaflow-system-simple-kafka-consumer-kafka-consumer-log-sink_OT\",\n                        delay: Some(\n                            100ms,\n                        ),\n                    },\n                ],\n                idle_config: IdleConfig {\n                    increment_by: 0ns,\n                    step_interval: 100ms,\n                    threshold: 0ns,\n                    init_source_delay: None,\n                },\n            },\n        ),\n    ),\n    callback_config: None,\n    isb_config: None,\n    rate_limit: None,\n    ordered_processing_enabled: false,\n}","target":"numaflow_core"}
{"timestamp":"2026-04-08T00:53:42.789295Z","level":"INFO","message":"Starting source forwarder","target":"numaflow_core::pipeline::forwarder"}
{"timestamp":"2026-04-08T00:53:42.791771Z","level":"INFO","message":"connected successfully","server":"4222","max_payload":"1048576","target":"async_nats::connector"}
{"timestamp":"2026-04-08T00:53:42.791827Z","level":"INFO","message":"event: connected","target":"async_nats"}
{"timestamp":"2026-04-08T00:53:42.801578Z","level":"INFO","message":"Processed messages per second","processed":"0","target":"numaflow_core::tracker"}
{"timestamp":"2026-04-08T00:53:42.801724Z","level":"INFO","message":"Source Watermark Summary: fetched_wm=1775609518549, incoming_offset=N/A, processors={source-kafka-consumer-0(active)[p0:[(wm=1775609518554,off=1775609552245567)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)]], source-kafka-consumer-1(active)[p0:[(wm=1775609518559,off=1775609552247739)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)]], source-kafka-consumer-10(active)[p0:[(wm=1775609518585,off=1775609552244586)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)]], source-kafka-consumer-11(active)[p0:[(wm=1775609518568,off=1775609552244198)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)]], source-kafka-consumer-12(active)[p0:[(wm=1775609518549,off=1775609552247005)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)]], source-kafka-consumer-13(active)[p0:[(wm=1775609518554,off=1775609552247370)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)]], source-kafka-consumer-14(active)[p0:[(wm=1775609518557,off=1775609552245888)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)]], source-kafka-consumer-15(active)[p0:[(wm=1775609534235,off=1775609594246322)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)]], 
source-kafka-consumer-16(active)[p0:[(wm=1775609534255,off=1775609594247989)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)]], source-kafka-consumer-17(active)[p0:[(wm=1775609534262,off=1775609594145537)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)]], source-kafka-consumer-18(active)[p0:[(wm=1775609534235,off=1775609594247254)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)]], source-kafka-consumer-19(active)[p0:[(wm=1775609534235,off=1775609594246953)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)]], source-kafka-consumer-2(active)[p0:[(wm=1775609518554,off=1775609552246300)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)]], source-kafka-consumer-3(active)[p0:[(wm=1775609518574,off=1775609552245050)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)]], source-kafka-consumer-4(active)[p0:[(wm=1775609518582,off=1775609552248043)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)]], source-kafka-consumer-5(active)[p0:[(wm=1775609534180,off=1775609594245606)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)]], 
source-kafka-consumer-6(active)[p0:[(wm=1775609534247,off=1775609594245994)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)]], source-kafka-consumer-7(active)[p0:[(wm=1775609534261,off=1775609594246586)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)]], source-kafka-consumer-8(active)[p0:[(wm=1775609534235,off=1775609594247603)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)]], source-kafka-consumer-9(active)[p0:[(wm=1775609534235,off=1775609594245104)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)->(wm=-1,off=-1)]]}","target":"numaflow_core::watermark::source::source_wm_fetcher"}
{"timestamp":"2026-04-08T00:53:43.801717Z","level":"INFO","message":"Processed messages per second","processed":"0","target":"numaflow_core::tracker"}
{"timestamp":"2026-04-08T00:53:44.802181Z","level":"INFO","message":"Processed messages per second","processed":"0","target":"numaflow_core::tracker"}
{"timestamp":"2026-04-08T00:53:45.801982Z","level":"INFO","message":"Processed messages per second","processed":"0","target":"numaflow_core::tracker"}
{"timestamp":"2026-04-08T00:53:46.801717Z","level":"INFO","message":"Processed messages per second","processed":"0","target":"numaflow_core::tracker"}
{"timestamp":"2026-04-08T00:53:47.802471Z","level":"INFO","message":"Processor status change","processor":"\"source-kafka-consumer-6\"","old_status":"Active","new_status":"InActive","behind_ms":5008,"last_seen":1775609622794,"target":"numaflow_core::watermark::processor::manager"}
{"timestamp":"2026-04-08T00:53:47.802495Z","level":"INFO","message":"Processor status change","processor":"\"source-kafka-consumer-9\"","old_status":"Active","new_status":"InActive","behind_ms":5008,"last_seen":1775609622794,"target":"numaflow_core::watermark::processor::manager"}
{"timestamp":"2026-04-08T00:53:47.802498Z","level":"INFO","message":"Processor status change","processor":"\"source-kafka-consumer-19\"","old_status":"Active","new_status":"InActive","behind_ms":5008,"last_seen":1775609622794,"target":"numaflow_core::watermark::processor::manager"}
{"timestamp":"2026-04-08T00:53:47.802501Z","level":"INFO","message":"Processor status change","processor":"\"source-kafka-consumer-2\"","old_status":"Active","new_status":"InActive","behind_ms":5008,"last_seen":1775609622794,"target":"numaflow_core::watermark::processor::manager"}
{"timestamp":"2026-04-08T00:53:47.802504Z","level":"INFO","message":"Processor status change","processor":"\"source-kafka-consumer-0\"","old_status":"Active","new_status":"InActive","behind_ms":5008,"last_seen":1775609622794,"target":"numaflow_core::watermark::processor::manager"}
{"timestamp":"2026-04-08T00:53:47.802506Z","level":"INFO","message":"Processor status change","processor":"\"source-kafka-consumer-1\"","old_status":"Active","new_status":"InActive","behind_ms":5008,"last_seen":1775609622794,"target":"numaflow_core::watermark::processor::manager"}
{"timestamp":"2026-04-08T00:53:47.802509Z","level":"INFO","message":"Processor status change","processor":"\"source-kafka-consumer-3\"","old_status":"Active","new_status":"InActive","behind_ms":5008,"last_seen":1775609622794,"target":"numaflow_core::watermark::processor::manager"}
{"timestamp":"2026-04-08T00:53:47.802513Z","level":"INFO","message":"Processor status change","processor":"\"source-kafka-consumer-15\"","old_status":"Active","new_status":"InActive","behind_ms":5008,"last_seen":1775609622794,"target":"numaflow_core::watermark::processor::manager"}
{"timestamp":"2026-04-08T00:53:47.802516Z","level":"INFO","message":"Processor status change","processor":"\"source-kafka-consumer-11\"","old_status":"Active","new_status":"InActive","behind_ms":5008,"last_seen":1775609622794,"target":"numaflow_core::watermark::processor::manager"}
{"timestamp":"2026-04-08T00:53:47.802521Z","level":"INFO","message":"Processor status change","processor":"\"source-kafka-consumer-12\"","old_status":"Active","new_status":"InActive","behind_ms":5008,"last_seen":1775609622794,"target":"numaflow_core::watermark::processor::manager"}
{"timestamp":"2026-04-08T00:53:47.802525Z","level":"INFO","message":"Processor status change","processor":"\"source-kafka-consumer-17\"","old_status":"Active","new_status":"InActive","behind_ms":5008,"last_seen":1775609622794,"target":"numaflow_core::watermark::processor::manager"}
{"timestamp":"2026-04-08T00:53:47.802529Z","level":"INFO","message":"Processor status change","processor":"\"source-kafka-consumer-5\"","old_status":"Active","new_status":"InActive","behind_ms":5008,"last_seen":1775609622794,"target":"numaflow_core::watermark::processor::manager"}
{"timestamp":"2026-04-08T00:53:47.802534Z","level":"INFO","message":"Processor status change","processor":"\"source-kafka-consumer-13\"","old_status":"Active","new_status":"InActive","behind_ms":5008,"last_seen":1775609622794,"target":"numaflow_core::watermark::processor::manager"}
{"timestamp":"2026-04-08T00:53:47.802539Z","level":"INFO","message":"Processor status change","processor":"\"source-kafka-consumer-14\"","old_status":"Active","new_status":"InActive","behind_ms":5008,"last_seen":1775609622794,"target":"numaflow_core::watermark::processor::manager"}
{"timestamp":"2026-04-08T00:53:47.802543Z","level":"INFO","message":"Processor status change","processor":"\"source-kafka-consumer-7\"","old_status":"Active","new_status":"InActive","behind_ms":5008,"last_seen":1775609622794,"target":"numaflow_core::watermark::processor::manager"}
{"timestamp":"2026-04-08T00:53:47.802546Z","level":"INFO","message":"Processor status change","processor":"\"source-kafka-consumer-8\"","old_status":"Active","new_status":"InActive","behind_ms":5008,"last_seen":1775609622794,"target":"numaflow_core::watermark::processor::manager"}
{"timestamp":"2026-04-08T00:53:47.802548Z","level":"INFO","message":"Processor status change","processor":"\"source-kafka-consumer-10\"","old_status":"Active","new_status":"InActive","behind_ms":5008,"last_seen":1775609622794,"target":"numaflow_core::watermark::processor::manager"}
{"timestamp":"2026-04-08T00:53:47.802551Z","level":"INFO","message":"Processor status change","processor":"\"source-kafka-consumer-4\"","old_status":"Active","new_status":"InActive","behind_ms":5008,"last_seen":1775609622794,"target":"numaflow_core::watermark::processor::manager"}
{"timestamp":"2026-04-08T00:53:47.802554Z","level":"INFO","message":"Processor status change","processor":"\"source-kafka-consumer-16\"","old_status":"Active","new_status":"InActive","behind_ms":5008,"last_seen":1775609622794,"target":"numaflow_core::watermark::processor::manager"}
{"timestamp":"2026-04-08T00:53:47.802556Z","level":"INFO","message":"Processor status change","processor":"\"source-kafka-consumer-18\"","old_status":"Active","new_status":"InActive","behind_ms":5008,"last_seen":1775609622794,"target":"numaflow_core::watermark::processor::manager"}
{"timestamp":"2026-04-08T00:53:47.802563Z","level":"INFO","message":"Processed messages per second","processed":"0","target":"numaflow_core::tracker"}
{"timestamp":"2026-04-08T00:53:48.802108Z","level":"INFO","message":"Processed messages per second","processed":"0","target":"numaflow_core::tracker"}
{"timestamp":"2026-04-08T00:53:48.806483Z","level":"ERROR","message":"Error fetching pending messages","e":"Kafka(\"Failed to get committed offsets: Meta data fetch error: OperationTimedOut (Local: Timed out)\")","target":"numaflow_kafka::source"}
{"timestamp":"2026-04-08T00:53:48.806538Z","level":"ERROR","message":"Error running pipeline","e":"Source(\"Failed to get pending messages: Kafka(\\\"Failed to get committed offsets: Meta data fetch error: OperationTimedOut (Local: Timed out)\\\")\")","target":"numaflow_core"}
{"timestamp":"2026-04-08T00:53:48.806734Z","level":"INFO","message":"Gracefully Exiting...","target":"numaflow_core"}
{"timestamp":"2026-04-08T00:53:48.806757Z","level":"INFO","message":"Exited.","target":"numaflow"}
{"timestamp":"2026-04-08T00:53:53.810490Z","level":"WARN","message":"librdkafka: REQTMOUT [thrd:GroupCoordinator]: GroupCoordinator/1: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests","log.target":"librdkafka","log.module_path":"rdkafka::client","log.file":"/usr/local/cargo/registry/src/index.crates.io-1949cf8c6b5b557f/rdkafka-0.38.0/src/client.rs","log.line":77,"target":"librdkafka"}
{"timestamp":"2026-04-08T00:53:53.810525Z","level":"ERROR","message":"librdkafka: FAIL [thrd:GroupCoordinator]: GroupCoordinator: 1 request(s) timed out: disconnect (average rtt 0.498ms) (after 10005ms in state UP)","log.target":"librdkafka","log.module_path":"rdkafka::client","log.file":"/usr/local/cargo/registry/src/index.crates.io-1949cf8c6b5b557f/rdkafka-0.38.0/src/client.rs","log.line":74,"target":"librdkafka"}
{"timestamp":"2026-04-08T00:53:53.810531Z","level":"ERROR","message":"librdkafka: Global error: OperationTimedOut (Local: Timed out): GroupCoordinator: 1 request(s) timed out: disconnect (average rtt 0.498ms) (after 10005ms in state UP)","log.target":"rdkafka::client","log.module_path":"rdkafka::client","log.file":"/usr/local/cargo/registry/src/index.crates.io-1949cf8c6b5b557f/rdkafka-0.38.0/src/client.rs","log.line":116,"target":"rdkafka::client"}

Environment

  • Numaflow: main-8119090

Additional context

  • During a restart, the Kafka actor starts up in KafkaActor::start:
    impl KafkaActor {
        async fn start(
            config: KafkaSourceConfig,
            batch_size: usize,
            read_timeout: Duration,
            handler_rx: mpsc::Receiver<KafkaActorMessage>,
            cancel_token: CancellationToken,
        ) -> Result<()> {
  • It then calls pending_messages() and propagates any error, which aborts start-up:
    actor
        .pending_messages()
        .await
        .map_err(|err| Error::Kafka(format!("Failed to get pending messages: {err:?}")))?;
  • The call fails here, in the committed-offsets lookup, with OperationTimedOut while the broker is busy rebalancing:
    let committed = consumer.committed_offsets(tpl, timeout).map_err(|e| {
        Error::Kafka(format!("Failed to get committed offsets: {e}"))
    })?;
  • The error prevents the source from starting, which triggers another pod restart (and another rebalance).
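An alternative for the start-up path above is to treat the pending-message count as best-effort, so a committed-offsets timeout during a rebalance is logged rather than propagated with ?. This sketch assumes the count is only needed for lag reporting and can be temporarily unknown, which the issue does not confirm; PendingState and refresh are hypothetical names, not part of numaflow_kafka.

```rust
use std::fmt::Display;

/// Hypothetical stand-in for the actor's lag state.
#[derive(Debug, Default)]
pub struct PendingState {
    /// Last known pending-message count, or `None` if not yet known.
    pub pending: Option<i64>,
}

impl PendingState {
    /// Best-effort refresh: on failure the error is logged and the last
    /// known value is kept, so start-up can continue instead of exiting.
    pub fn refresh<E: Display>(&mut self, fetch: impl FnOnce() -> Result<i64, E>) {
        match fetch() {
            Ok(count) => self.pending = Some(count),
            Err(err) => {
                eprintln!("pending messages unavailable, will retry on next refresh: {err}");
            }
        }
    }
}
```

At start-up the value stays None after a failure, and a later periodic refresh can fill it in once the rebalance has finished.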

Metadata

Labels

bug (Something isn't working)

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests
