Skip to content

Commit 1c7bef1

Browse files
benjamin99claude
andauthored
feat: add configurable consumer name for PulsarConsumerSource (#78)
- Add ConsumerName field to PulsarConsumerSource struct - Extract getConsumerName() method to handle consumer name logic - Falls back to os.Hostname() when ConsumerName is not set - Add unit test to validate consumer name behavior 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude <noreply@anthropic.com>
1 parent ae7ef5f commit 1c7bef1

2 files changed

Lines changed: 45 additions & 5 deletions

File tree

pkg/source/pulsar.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ type PulsarConsumerSource struct {
165165
PulsarSubscription string
166166
PulsarReplicateState bool
167167
PulsarMaxReconnect *uint
168+
ConsumerName string
168169

169170
client pulsar.Client
170171
consumer pulsar.Consumer
@@ -173,19 +174,25 @@ type PulsarConsumerSource struct {
173174
ackTrackers *ackTrackers
174175
}
175176

176-
func (p *PulsarConsumerSource) Capture(cp cursor.Checkpoint) (changes chan Change, err error) {
177-
host, err := os.Hostname()
177+
func (p *PulsarConsumerSource) getConsumerName() (string, error) {
178+
if p.ConsumerName != "" {
179+
return p.ConsumerName, nil
180+
}
181+
hostname, err := os.Hostname()
178182
if err != nil {
179-
return nil, err
183+
return "", err
180184
}
185+
return hostname, nil
186+
}
181187

182-
p.client, err = pulsar.NewClient(p.PulsarOption)
188+
func (p *PulsarConsumerSource) Capture(cp cursor.Checkpoint) (changes chan Change, err error) {
189+
consumerName, err := p.getConsumerName()
183190
if err != nil {
184191
return nil, err
185192
}
186193

187194
p.consumer, err = p.client.Subscribe(pulsar.ConsumerOptions{
188-
Name: host,
195+
Name: consumerName,
189196
Topic: p.PulsarTopic,
190197
SubscriptionName: p.PulsarSubscription,
191198
ReplicateSubscriptionState: p.PulsarReplicateState,

pkg/source/pulsar_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package source
22

33
import (
44
"context"
5+
"os"
56
"strconv"
67
"testing"
78
"time"
@@ -250,3 +251,35 @@ func TestPulsarConsumerSource(t *testing.T) {
250251
}
251252
src.Stop()
252253
}
254+
255+
func TestPulsarConsumerSourceGetConsumerName(t *testing.T) {
256+
// test with custom consumer name set
257+
src := &PulsarConsumerSource{
258+
ConsumerName: "custom-test-consumer",
259+
}
260+
261+
name, err := src.getConsumerName()
262+
if err != nil {
263+
t.Fatalf("unexpected error: %v", err)
264+
}
265+
if name != "custom-test-consumer" {
266+
t.Fatalf("expected consumer name 'custom-test-consumer', got '%s'", name)
267+
}
268+
269+
// test with empty consumer name (should fall back to hostname)
270+
src2 := &PulsarConsumerSource{}
271+
272+
name2, err := src2.getConsumerName()
273+
if err != nil {
274+
t.Fatalf("unexpected error: %v", err)
275+
}
276+
if name2 == "" {
277+
t.Fatal("expected non-empty hostname, got empty string")
278+
}
279+
280+
// verify that hostname fallback returns a non-empty string
281+
hostname, _ := os.Hostname()
282+
if name2 != hostname {
283+
t.Fatalf("expected hostname '%s', got '%s'", hostname, name2)
284+
}
285+
}

0 commit comments

Comments
 (0)