Skip to content

Commit bc170bd

Browse files
feat(source): add Tables filter and dynamic publication management (#83)
* feat(source): add Tables filter and dynamic publication management * feat(cmd): integrate Tables filter option in pg2pulsar and agent * test(source): add tests for Tables filter and publication management
1 parent c34d46c commit bc170bd

6 files changed

Lines changed: 396 additions & 9 deletions

File tree

cmd/agent.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,12 +179,17 @@ func (a *Agent) cleanup() error {
179179
}
180180

181181
func (a *Agent) pg2pulsar(params *structpb.Struct) (*pb.AgentConfigResponse, error) {
182-
v, err := extract(params, "PGConnURL", "PGReplURL", "PulsarURL", "PulsarTopic", "DecodePlugin", "?StartLSN", "?PulsarTracker", "?PulsarTrackerInterval", "?PulsarTrackerReplicateState")
182+
v, err := extract(params, "PGConnURL", "PGReplURL", "PulsarURL", "PulsarTopic", "DecodePlugin", "?StartLSN", "?PulsarTracker", "?PulsarTrackerInterval", "?PulsarTrackerReplicateState", "?Tables")
183183
if err != nil {
184184
return nil, err
185185
}
186186

187-
pgSrc := &source.PGXSource{SetupConnStr: v["PGConnURL"].GetStringValue(), ReplConnStr: v["PGReplURL"].GetStringValue(), ReplSlot: trimSlot(v["PulsarTopic"].GetStringValue()), CreateSlot: true, CreatePublication: true, StartLSN: v["StartLSN"].GetStringValue(), DecodePlugin: v["DecodePlugin"].GetStringValue()}
187+
var tables []source.TableIdent
188+
if tablesStr := v["Tables"].GetStringValue(); tablesStr != "" {
189+
tables = source.ParseTableIdents(strings.Split(tablesStr, ",")...)
190+
}
191+
192+
pgSrc := &source.PGXSource{SetupConnStr: v["PGConnURL"].GetStringValue(), ReplConnStr: v["PGReplURL"].GetStringValue(), ReplSlot: trimSlot(v["PulsarTopic"].GetStringValue()), CreateSlot: true, CreatePublication: true, StartLSN: v["StartLSN"].GetStringValue(), DecodePlugin: v["DecodePlugin"].GetStringValue(), Tables: tables}
188193
pulsarSink := &sink.PulsarSink{PulsarOption: pulsar.ClientOptions{URL: v["PulsarURL"].GetStringValue()}, PulsarTopic: v["PulsarTopic"].GetStringValue()}
189194

190195
switch v["PulsarTracker"].GetStringValue() {

cmd/configure.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"log"
66
"strconv"
7+
"strings"
78
"time"
89

910
"github.com/replicase/pgcapture/pkg/decode"
@@ -27,6 +28,7 @@ var (
2728
ConfigPulsarTrackerReplicateState bool
2829
ConfigDecodePlugin string
2930
ConfigBatchTXSize int
31+
ConfigTables []string
3032
)
3133

3234
func init() {
@@ -44,6 +46,7 @@ func init() {
4446
configure.Flags().BoolVarP(&ConfigPulsarTrackerReplicateState, "PulsarTrackerReplicateState", "", false, "the replicate state for the pg2pulsar, optional")
4547
configure.Flags().StringVarP(&ConfigDecodePlugin, "DecodePlugin", "", decode.PGOutputPlugin, "the logical decoding plugin name")
4648
configure.Flags().IntVarP(&ConfigBatchTXSize, "BatchTxSize", "", 100, "the max number of tx in a pipeline")
49+
configure.Flags().StringSliceVar(&ConfigTables, "Tables", nil, "tables to capture (e.g., public.users,public.orders); empty for all tables (pgoutput plugin only)")
4750
configure.MarkFlagRequired("AgentAddr")
4851
configure.MarkFlagRequired("AgentCommand")
4952
configure.MarkFlagRequired("PGConnURL")
@@ -68,6 +71,7 @@ var configure = &cobra.Command{
6871
"PulsarTrackerReplicateState": strconv.FormatBool(ConfigPulsarTrackerReplicateState),
6972
"DecodePlugin": ConfigDecodePlugin,
7073
"BatchTxSize": ConfigBatchTXSize,
74+
"Tables": strings.Join(ConfigTables, ","),
7175
})
7276
if err != nil {
7377
panic(err)

cmd/pg2pulsar.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ var (
1414
SinkPulsarURL string
1515
SinkPulsarTopic string
1616
DecodePlugin string
17+
SourceTables []string
1718
)
1819

1920
func init() {
@@ -23,6 +24,7 @@ func init() {
2324
pg2pulsar.Flags().StringVarP(&SinkPulsarURL, "PulsarURL", "", "", "connection url to sink pulsar cluster")
2425
pg2pulsar.Flags().StringVarP(&SinkPulsarTopic, "PulsarTopic", "", "", "the sink pulsar topic name and as well as the logical replication slot name")
2526
pg2pulsar.Flags().StringVar(&DecodePlugin, "DecodePlugin", decode.PGOutputPlugin, "the logical decoding plugin name")
27+
pg2pulsar.Flags().StringSliceVar(&SourceTables, "Tables", nil, "tables to capture (e.g., public.users,public.orders); empty for all tables (pgoutput plugin only)")
2628
pg2pulsar.MarkFlagRequired("PGConnURL")
2729
pg2pulsar.MarkFlagRequired("PGReplURL")
2830
pg2pulsar.MarkFlagRequired("PulsarURL")
@@ -33,7 +35,7 @@ var pg2pulsar = &cobra.Command{
3335
Use: "pg2pulsar",
3436
Short: "Capture logical replication logs to a Pulsar Topic from a PostgreSQL logical replication slot",
3537
RunE: func(cmd *cobra.Command, args []string) (err error) {
36-
pgSrc := &source.PGXSource{SetupConnStr: SourcePGConnURL, ReplConnStr: SourcePGReplURL, ReplSlot: trimSlot(SinkPulsarTopic), CreateSlot: true, CreatePublication: true, DecodePlugin: DecodePlugin}
38+
pgSrc := &source.PGXSource{SetupConnStr: SourcePGConnURL, ReplConnStr: SourcePGReplURL, ReplSlot: trimSlot(SinkPulsarTopic), CreateSlot: true, CreatePublication: true, DecodePlugin: DecodePlugin, Tables: source.ParseTableIdents(SourceTables...)}
3739
pulsarSink := &sink.PulsarSink{PulsarOption: pulsar.ClientOptions{URL: SinkPulsarURL}, PulsarTopic: SinkPulsarTopic}
3840
return sourceToSink(pgSrc, pulsarSink)
3941
},

pkg/source/postgres.go

Lines changed: 145 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"strings"
78
"sync/atomic"
89
"time"
910

@@ -28,6 +29,7 @@ type PGXSource struct {
2829
CreatePublication bool
2930
StartLSN string
3031
DecodePlugin string
32+
Tables []TableIdent // Tables to capture; empty means all tables
3133

3234
setupConn *pgx.Conn
3335
replConn *pgconn.PgConn
@@ -54,6 +56,7 @@ func (p *PGXSource) Capture(cp cursor.Checkpoint) (changes chan Change, err erro
5456
}()
5557

5658
ctx := context.Background()
59+
p.log = logrus.WithFields(logrus.Fields{"From": "PGXSource"})
5760
p.setupConn, err = pgx.Connect(ctx, p.SetupConnStr)
5861
if err != nil {
5962
return nil, err
@@ -77,11 +80,8 @@ func (p *PGXSource) Capture(cp cursor.Checkpoint) (changes chan Change, err erro
7780
case decode.PGOutputPlugin:
7881
p.decoder = decode.NewPGOutputDecoder(p.schema, p.ReplSlot)
7982
if p.CreatePublication {
80-
if _, err = p.setupConn.Exec(ctx, fmt.Sprintf(sql.CreatePublication, p.ReplSlot)); err != nil {
81-
var pge *pgconn.PgError
82-
if !errors.As(err, &pge) || pge.Code != "42710" {
83-
return nil, err
84-
}
83+
if err = p.ensurePublication(ctx); err != nil {
84+
return nil, err
8585
}
8686
}
8787
default:
@@ -115,7 +115,6 @@ func (p *PGXSource) Capture(cp cursor.Checkpoint) (changes chan Change, err erro
115115
return nil, err
116116
}
117117

118-
p.log = logrus.WithFields(logrus.Fields{"From": "PGXSource"})
119118
p.log.WithFields(logrus.Fields{
120119
"SystemID": ident.SystemID,
121120
"Timeline": ident.Timeline,
@@ -285,3 +284,143 @@ func (p *PGXSource) cleanup() {
285284
p.replConn.Close(ctx)
286285
}
287286
}
287+
288+
func (p *PGXSource) buildPublicationSQL() string {
289+
pubName := pgx.Identifier{p.ReplSlot}.Sanitize()
290+
if len(p.Tables) == 0 {
291+
return fmt.Sprintf(sql.CreatePublication, pubName)
292+
}
293+
return fmt.Sprintf(sql.CreatePublicationForTables, pubName, p.quoteTables())
294+
}
295+
296+
type TableIdent struct {
297+
Schema string
298+
Table string
299+
}
300+
301+
func (t TableIdent) Quoted() string {
302+
return pgx.Identifier{t.Schema, t.Table}.Sanitize()
303+
}
304+
305+
func ParseTableIdents(ss ...string) []TableIdent {
306+
result := make([]TableIdent, len(ss))
307+
for i, s := range ss {
308+
parts := strings.SplitN(s, ".", 2)
309+
if len(parts) == 2 {
310+
result[i] = TableIdent{Schema: parts[0], Table: parts[1]}
311+
} else {
312+
result[i] = TableIdent{Schema: "public", Table: s}
313+
}
314+
}
315+
return result
316+
}
317+
318+
func (p *PGXSource) quoteTables() string {
319+
quoted := make([]string, 0, len(p.Tables)+1)
320+
quoted = append(quoted, TableIdent{Schema: decode.ExtensionSchema, Table: decode.ExtensionDDLLogs}.Quoted())
321+
for _, t := range p.Tables {
322+
quoted = append(quoted, t.Quoted())
323+
}
324+
return strings.Join(quoted, ", ")
325+
}
326+
327+
type publicationAction int
328+
329+
const (
330+
publicationNoChange publicationAction = iota
331+
publicationCreate
332+
publicationRecreate
333+
publicationAlter
334+
)
335+
336+
func (a publicationAction) String() string {
337+
switch a {
338+
case publicationNoChange:
339+
return "no_change"
340+
case publicationCreate:
341+
return "create"
342+
case publicationRecreate:
343+
return "recreate"
344+
case publicationAlter:
345+
return "alter"
346+
default:
347+
return "unknown"
348+
}
349+
}
350+
351+
func (p *PGXSource) getPublicationAction(ctx context.Context) (publicationAction, error) {
352+
var existingAllTables bool
353+
err := p.setupConn.QueryRow(ctx, sql.QueryPublication, p.ReplSlot).Scan(&existingAllTables)
354+
if errors.Is(err, pgx.ErrNoRows) {
355+
return publicationCreate, nil
356+
} else if err != nil {
357+
return publicationNoChange, err
358+
}
359+
360+
wantAllTables := len(p.Tables) == 0
361+
if existingAllTables != wantAllTables {
362+
return publicationRecreate, nil
363+
}
364+
if wantAllTables {
365+
return publicationNoChange, nil
366+
}
367+
368+
// Compare table lists
369+
rows, err := p.setupConn.Query(ctx, sql.QueryPublicationTables, p.ReplSlot)
370+
if err != nil {
371+
return publicationNoChange, err
372+
}
373+
defer rows.Close()
374+
375+
existing := make(map[TableIdent]struct{})
376+
for rows.Next() {
377+
var t TableIdent
378+
if err := rows.Scan(&t.Schema, &t.Table); err != nil {
379+
return publicationNoChange, err
380+
}
381+
existing[t] = struct{}{}
382+
}
383+
if err := rows.Err(); err != nil {
384+
return publicationNoChange, err
385+
}
386+
387+
// +1 for pgcapture.ddl_logs
388+
if len(existing) != len(p.Tables)+1 {
389+
return publicationAlter, nil
390+
}
391+
if _, ok := existing[TableIdent{Schema: decode.ExtensionSchema, Table: decode.ExtensionDDLLogs}]; !ok {
392+
return publicationAlter, nil
393+
}
394+
for _, t := range p.Tables {
395+
if _, ok := existing[t]; !ok {
396+
return publicationAlter, nil
397+
}
398+
}
399+
return publicationNoChange, nil
400+
}
401+
402+
func (p *PGXSource) ensurePublication(ctx context.Context) error {
403+
pubName := pgx.Identifier{p.ReplSlot}.Sanitize()
404+
action, err := p.getPublicationAction(ctx)
405+
if err != nil {
406+
return err
407+
}
408+
409+
p.log.Infof("ensuring publication: %s", action)
410+
switch action {
411+
case publicationNoChange:
412+
// no-op
413+
case publicationCreate:
414+
_, err = p.setupConn.Exec(ctx, p.buildPublicationSQL())
415+
case publicationRecreate:
416+
if _, err = p.setupConn.Exec(ctx, fmt.Sprintf(sql.DropPublication, pubName)); err != nil {
417+
return err
418+
}
419+
_, err = p.setupConn.Exec(ctx, p.buildPublicationSQL())
420+
case publicationAlter:
421+
_, err = p.setupConn.Exec(ctx, fmt.Sprintf(sql.AlterPublicationSetTable, pubName, p.quoteTables()))
422+
default:
423+
err = fmt.Errorf("unhandled publication action: %s", action)
424+
}
425+
return err
426+
}

0 commit comments

Comments
 (0)