-
Notifications
You must be signed in to change notification settings - Fork 31
CNTRLPLANE-3461: refactor defrag to minimize database lock time #378
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,7 @@ | |
| package backend | ||
|
|
||
| import ( | ||
| "errors" | ||
| "fmt" | ||
| "hash/crc32" | ||
| "io" | ||
|
|
@@ -28,6 +29,7 @@ import ( | |
| "go.uber.org/zap" | ||
|
|
||
| bolt "go.etcd.io/bbolt" | ||
| bolterrors "go.etcd.io/bbolt/errors" | ||
| "go.etcd.io/etcd/client/pkg/v3/verify" | ||
| ) | ||
|
|
||
|
|
@@ -469,20 +471,6 @@ func (b *backend) defrag() error { | |
| isDefragActive.Set(1) | ||
| defer isDefragActive.Set(0) | ||
|
|
||
| // TODO: make this non-blocking? | ||
| // lock batchTx to ensure nobody is using previous tx, and then | ||
| // close previous ongoing tx. | ||
| b.batchTx.LockOutsideApply() | ||
| defer b.batchTx.Unlock() | ||
|
|
||
| // lock database after lock tx to avoid deadlock. | ||
| b.mu.Lock() | ||
| defer b.mu.Unlock() | ||
|
|
||
| // block concurrent read requests while resetting tx | ||
| b.readTx.Lock() | ||
| defer b.readTx.Unlock() | ||
|
|
||
| // Create a temporary file to ensure we start with a clean slate. | ||
| // Snapshotter.cleanupSnapdir cleans up any of these that are found during startup. | ||
| dir := filepath.Dir(b.db.Path()) | ||
|
|
@@ -500,77 +488,156 @@ func (b *backend) defrag() error { | |
| // return nil, fmt.Errorf(defragOpenFileError) | ||
| return temp, nil | ||
| } | ||
| // Don't load tmp db into memory regardless of opening options | ||
| options.Mlock = false | ||
| tdbp := temp.Name() | ||
| tmpdb, err := bolt.Open(tdbp, 0o600, &options) | ||
| if err != nil { | ||
| temp.Close() | ||
| if rmErr := os.Remove(temp.Name()); rmErr != nil { | ||
| b.lg.Error( | ||
| "failed to remove temporary file", | ||
| b.lg.Error("failed to remove temporary file", | ||
| zap.String("path", temp.Name()), | ||
| zap.Error(rmErr), | ||
| ) | ||
| } | ||
|
|
||
| return err | ||
| } | ||
|
|
||
| dbp := b.db.Path() | ||
| size1, sizeInUse1 := b.Size(), b.SizeInUse() | ||
| b.lg.Info( | ||
| "defragmenting", | ||
| b.lg.Info("defragmenting", | ||
| zap.String("path", dbp), | ||
| zap.Int64("current-db-size-bytes", size1), | ||
| zap.String("current-db-size", humanize.Bytes(uint64(size1))), | ||
| zap.Int64("current-db-size-in-use-bytes", sizeInUse1), | ||
| zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))), | ||
| ) | ||
|
|
||
| defer func() { | ||
| // NOTE: We should exit as soon as possible because that tx | ||
| // might be closed. The inflight request might use invalid | ||
| // tx and then panic as well. The real panic reason might be | ||
| // shadowed by new panic. So, we should fatal here with lock. | ||
| if rerr := recover(); rerr != nil { | ||
| b.lg.Fatal("unexpected panic during defrag", zap.Any("panic", rerr)) | ||
| } | ||
| }() | ||
| journal, snapTx, err := b.defragSetupSnapshot() | ||
| if err != nil { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't we call |
||
| tmpdb.Close() | ||
| os.Remove(tdbp) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's do the |
||
| return err | ||
| } | ||
|
|
||
| // Commit/stop and then reset current transactions (including the readTx) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does this comment no longer apply? |
||
| b.batchTx.unsafeCommit(true) | ||
| b.batchTx.tx = nil | ||
| b.lg.Info("defrag: copying data (writes unlocked)") | ||
|
|
||
| // gofail: var defragBeforeCopy struct{} | ||
| err = defragdb(b.db, tmpdb, defragLimit) | ||
| err = defragFromTx(snapTx, tmpdb, defragLimit) | ||
| snapTx.Rollback() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need the journal? Aren't the writes disabled up until this point?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, we need the journal because we're defragging from a read snapshot transaction, so we need something to catch any writes so they can be replayed. This is what allows this to be "less blocking", before, we had to block writes while we copied the entire database; with this, we allow writes while we copy the database by saving the incoming writes and replaying them later. The journal replay and the database switchover are the only times we need to stop both reads and writes, which we do at the same time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what we may need however, is some sort of backpressure. We won't be able to buffer all stuff forever There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. as in, have a hard size limit on the ops to buffer (new config arg) and then have the writelock block when going over the limit
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, trouble is, if a writer is holding the writelock because the journal is full, then we can't replay the journal and swap cause that's the "stop the world" time; since this refactor only tries to get the write lock when it's ready to swap. |
||
|
|
||
| if err != nil { | ||
| b.defragCancelJournal(journal) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so hold on, we're losing all writes so far? that's a disaster that's definitely a place to put a failpoint into
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The snapTx is a read transaction, and writes during the defrag exist both in the old database and in the journal; so nothing should be lost. |
||
| tmpdb.Close() | ||
| if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil { | ||
| b.lg.Error("failed to remove db.tmp after defragmentation completed", zap.Error(rmErr)) | ||
| } | ||
|
|
||
| // restore the bbolt transactions if defragmentation fails | ||
| b.batchTx.tx = b.unsafeBegin(true) | ||
| b.readTx.tx = b.unsafeBegin(false) | ||
|
Comment on lines
-553
to
-555
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what was the puerpose behind this restore? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. etcd does a "group commit", so it will always batch incoming writes for some time and then commit those together to improve throughput. This is just making sure the transaction for that is clean and setup again for the new writes incoming after the mutex unlocked |
||
| os.RemoveAll(tdbp) | ||
| return err | ||
| } | ||
|
|
||
| b.lg.Info("defrag: replaying journal") | ||
| err = b.defragReplayAndSwap(journal, tmpdb, dbp, tdbp) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| err = b.db.Close() | ||
| took := time.Since(now) | ||
| defragSec.Observe(took.Seconds()) | ||
|
|
||
| size2, sizeInUse2 := b.Size(), b.SizeInUse() | ||
| b.lg.Info("finished defragmenting directory", | ||
| zap.String("path", dbp), | ||
| zap.Int64("current-db-size-bytes-diff", size2-size1), | ||
| zap.Int64("current-db-size-bytes", size2), | ||
| zap.String("current-db-size", humanize.Bytes(uint64(size2))), | ||
| zap.Int64("current-db-size-in-use-bytes-diff", sizeInUse2-sizeInUse1), | ||
| zap.Int64("current-db-size-in-use-bytes", sizeInUse2), | ||
| zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse2))), | ||
| zap.Duration("took", took), | ||
| ) | ||
| return nil | ||
| } | ||
|
|
||
| // defragSetupSnapshot commits pending writes, takes a read-only | ||
| // snapshot, and installs a journal to capture writes during the copy. | ||
| func (b *backend) defragSetupSnapshot() (*defragJournal, *bolt.Tx, error) { | ||
| b.batchTx.LockOutsideApply() | ||
| defer b.batchTx.Unlock() | ||
|
|
||
| b.batchTx.commit(false) | ||
|
|
||
| b.mu.RLock() | ||
| snapTx, err := b.db.Begin(false) | ||
| b.mu.RUnlock() | ||
| if err != nil { | ||
| return nil, nil, fmt.Errorf("failed to begin snapshot tx for defrag: %w", err) | ||
| } | ||
|
|
||
| journal := newDefragJournal() | ||
| b.batchTx.defragJournal = journal | ||
| return journal, snapTx, nil | ||
| } | ||
|
|
||
| // defragCancelJournal removes the journal from the batch transaction | ||
| // and closes it. Called when the copy phase fails. | ||
| func (b *backend) defragCancelJournal(journal *defragJournal) { | ||
| b.batchTx.LockOutsideApply() | ||
| defer b.batchTx.Unlock() | ||
| b.batchTx.defragJournal = nil | ||
| journal.close() | ||
| } | ||
|
|
||
| // defragReplayAndSwap drains the journal, replays it into the temp | ||
| // database, then atomically swaps the old database for the new one. | ||
| // batchTx is held for the entire operation to prevent writes between | ||
| // journal drain and database swap. | ||
| func (b *backend) defragReplayAndSwap(journal *defragJournal, tmpdb *bolt.DB, dbp, tdbp string) error { | ||
| b.batchTx.LockOutsideApply() | ||
| defer b.batchTx.Unlock() | ||
|
|
||
| b.batchTx.defragJournal = nil | ||
| journal.close() | ||
| ops := journal.drain() | ||
|
Comment on lines
+595
to
+597
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: we could use defragJournal instead of journal arg |
||
|
|
||
| if len(ops) > 0 { | ||
| b.lg.Info("defrag: replaying journal ops", zap.Int("count", len(ops))) | ||
| } | ||
|
|
||
| if err := replayJournal(tmpdb, ops, defragLimit); err != nil { | ||
| tmpdb.Close() | ||
| os.RemoveAll(tdbp) | ||
| return err | ||
| } | ||
|
|
||
| b.lg.Info("defrag: switching database") | ||
|
|
||
| b.mu.Lock() | ||
| defer b.mu.Unlock() | ||
| b.readTx.Lock() | ||
| defer b.readTx.Unlock() | ||
|
|
||
| // NOTE: We should exit as soon as possible because that tx | ||
| // might be closed. The inflight request might use invalid | ||
| // tx and then panic as well. The real panic reason might be | ||
| // shadowed by new panic. So, we should fatal here with lock. | ||
| defer func() { | ||
| if rerr := recover(); rerr != nil { | ||
| b.lg.Fatal("unexpected panic during defrag", zap.Any("panic", rerr)) | ||
| } | ||
| }() | ||
|
|
||
| b.batchTx.unsafeCommit(true) | ||
| b.batchTx.tx = nil | ||
|
|
||
| if err := b.db.Close(); err != nil { | ||
| b.lg.Fatal("failed to close database", zap.Error(err)) | ||
| } | ||
| err = tmpdb.Close() | ||
| if err != nil { | ||
| if err := tmpdb.Close(); err != nil { | ||
| b.lg.Fatal("failed to close tmp database", zap.Error(err)) | ||
| } | ||
| // gofail: var defragBeforeRename struct{} | ||
| err = os.Rename(tdbp, dbp) | ||
| if err != nil { | ||
| if err := os.Rename(tdbp, dbp); err != nil { | ||
| b.lg.Fatal("failed to rename tmp database", zap.Error(err)) | ||
| } | ||
|
|
||
| var err error | ||
| b.db, err = bolt.Open(dbp, 0o600, b.bopts) | ||
| if err != nil { | ||
| b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err)) | ||
|
|
@@ -585,29 +652,10 @@ func (b *backend) defrag() error { | |
| atomic.StoreInt64(&b.size, size) | ||
| atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize))) | ||
|
|
||
| took := time.Since(now) | ||
| defragSec.Observe(took.Seconds()) | ||
|
|
||
| size2, sizeInUse2 := b.Size(), b.SizeInUse() | ||
| b.lg.Info( | ||
| "finished defragmenting directory", | ||
| zap.String("path", dbp), | ||
| zap.Int64("current-db-size-bytes-diff", size2-size1), | ||
| zap.Int64("current-db-size-bytes", size2), | ||
| zap.String("current-db-size", humanize.Bytes(uint64(size2))), | ||
| zap.Int64("current-db-size-in-use-bytes-diff", sizeInUse2-sizeInUse1), | ||
| zap.Int64("current-db-size-in-use-bytes", sizeInUse2), | ||
| zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse2))), | ||
| zap.Duration("took", took), | ||
| ) | ||
| return nil | ||
| } | ||
|
|
||
| func defragdb(odb, tmpdb *bolt.DB, limit int) error { | ||
| // gofail: var defragdbFail string | ||
| // return fmt.Errorf(defragdbFail) | ||
|
Comment on lines
-607
to
-608
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please bring those back again and maybe also think of adding a few more failure points where they make sense (eg along the locking paths). Please also run the robustness tests at least once |
||
|
|
||
| // open a tx on tmpdb for writes | ||
| func defragFromTx(srcTx *bolt.Tx, tmpdb *bolt.DB, limit int) error { | ||
| tmptx, err := tmpdb.Begin(true) | ||
| if err != nil { | ||
| return err | ||
|
|
@@ -618,18 +666,10 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error { | |
| } | ||
| }() | ||
|
|
||
| // open a tx on old db for read | ||
| tx, err := odb.Begin(false) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| defer tx.Rollback() | ||
|
|
||
| c := tx.Cursor() | ||
|
|
||
| c := srcTx.Cursor() | ||
| count := 0 | ||
| for next, _ := c.First(); next != nil; next, _ = c.Next() { | ||
| b := tx.Bucket(next) | ||
| b := srcTx.Bucket(next) | ||
| if b == nil { | ||
| return fmt.Errorf("backend: cannot defrag bucket %s", next) | ||
| } | ||
|
|
@@ -665,6 +705,69 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error { | |
| return tmptx.Commit() | ||
| } | ||
|
|
||
| func replayJournal(tmpdb *bolt.DB, ops []defragJournalOp, limit int) error { | ||
| if len(ops) == 0 { | ||
| return nil | ||
| } | ||
|
|
||
| tx, err := tmpdb.Begin(true) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| defer func() { | ||
| if err != nil { | ||
| tx.Rollback() | ||
| } | ||
| }() | ||
|
|
||
| count := 0 | ||
| for _, op := range ops { | ||
| count++ | ||
| if count > limit { | ||
| if err = tx.Commit(); err != nil { | ||
| return err | ||
| } | ||
| tx, err = tmpdb.Begin(true) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| count = 0 | ||
| } | ||
|
|
||
| switch op.opType { | ||
| case opCreateBucket: | ||
| if _, err = tx.CreateBucketIfNotExists(op.bucketName); err != nil { | ||
| return fmt.Errorf("replay: create bucket %s: %w", op.bucketName, err) | ||
| } | ||
| case opDeleteBucket: | ||
| if delErr := tx.DeleteBucket(op.bucketName); delErr != nil && !errors.Is(delErr, bolterrors.ErrBucketNotFound) { | ||
| return fmt.Errorf("replay: delete bucket %s: %w", op.bucketName, delErr) | ||
| } | ||
| case opPut: | ||
| b := tx.Bucket(op.bucketName) | ||
| if b == nil { | ||
| return fmt.Errorf("replay: bucket %s not found for put", op.bucketName) | ||
| } | ||
| if op.seq { | ||
| b.FillPercent = 0.9 | ||
| } | ||
|
Comment on lines
+751
to
+753
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. huh? what's that doing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. got it, it's a replica from UnsafePut: I would skip this on the defrag path, I don't think this applies here. |
||
| if err = b.Put(op.key, op.value); err != nil { | ||
| return fmt.Errorf("replay: put in bucket %s: %w", op.bucketName, err) | ||
| } | ||
| case opDelete: | ||
| b := tx.Bucket(op.bucketName) | ||
| if b == nil { | ||
| continue | ||
| } | ||
|
Comment on lines
+759
to
+761
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why does that not error for deletes, but for puts? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. and you may as well pull that out of the switch case, all operations are bucket-based |
||
| if err = b.Delete(op.key); err != nil { | ||
| return fmt.Errorf("replay: delete from bucket %s: %w", op.bucketName, err) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return tx.Commit() | ||
| } | ||
|
|
||
| func (b *backend) begin(write bool) *bolt.Tx { | ||
| b.mu.RLock() | ||
| tx := b.unsafeBegin(write) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please improve the name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, but I also don't have a better name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mirror?