We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
2 parents 1c7bef1 + aee7aa0 commit 0dc7134Copy full SHA for 0dc7134
1 file changed
pkg/source/pulsar.go
@@ -178,6 +178,7 @@ func (p *PulsarConsumerSource) getConsumerName() (string, error) {
178
if p.ConsumerName != "" {
179
return p.ConsumerName, nil
180
}
181
+
182
hostname, err := os.Hostname()
183
if err != nil {
184
return "", err
@@ -191,6 +192,11 @@ func (p *PulsarConsumerSource) Capture(cp cursor.Checkpoint) (changes chan Chang
191
192
return nil, err
193
194
195
+ p.client, err = pulsar.NewClient(p.PulsarOption)
196
+ if err != nil {
197
+ return nil, err
198
+ }
199
200
p.consumer, err = p.client.Subscribe(pulsar.ConsumerOptions{
201
Name: consumerName,
202
Topic: p.PulsarTopic,
0 commit comments