Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ type EtcdServer struct {
kv mvcc.WatchableKV
lessor lease.Lessor
bemu sync.RWMutex
defragMu sync.Mutex
be backend.Backend
beHooks *serverstorage.BackendHooks
authStore auth.AuthStore
Expand Down Expand Up @@ -975,8 +976,10 @@ func (s *EtcdServer) Cleanup() {
}

func (s *EtcdServer) Defragment() error {
s.bemu.Lock()
defer s.bemu.Unlock()
s.defragMu.Lock()
defer s.defragMu.Unlock()
s.bemu.RLock()
defer s.bemu.RUnlock()
return s.be.Defrag()
}

Expand Down
253 changes: 178 additions & 75 deletions server/storage/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package backend

import (
"errors"
"fmt"
"hash/crc32"
"io"
Expand All @@ -28,6 +29,7 @@ import (
"go.uber.org/zap"

bolt "go.etcd.io/bbolt"
bolterrors "go.etcd.io/bbolt/errors"
"go.etcd.io/etcd/client/pkg/v3/verify"
)

Expand Down Expand Up @@ -469,20 +471,6 @@ func (b *backend) defrag() error {
isDefragActive.Set(1)
defer isDefragActive.Set(0)

// TODO: make this non-blocking?
// lock batchTx to ensure nobody is using previous tx, and then
// close previous ongoing tx.
b.batchTx.LockOutsideApply()
defer b.batchTx.Unlock()

// lock database after lock tx to avoid deadlock.
b.mu.Lock()
defer b.mu.Unlock()

// block concurrent read requests while resetting tx
b.readTx.Lock()
defer b.readTx.Unlock()

// Create a temporary file to ensure we start with a clean slate.
// Snapshotter.cleanupSnapdir cleans up any of these that are found during startup.
dir := filepath.Dir(b.db.Path())
Expand All @@ -500,77 +488,156 @@ func (b *backend) defrag() error {
// return nil, fmt.Errorf(defragOpenFileError)
return temp, nil
}
// Don't load tmp db into memory regardless of opening options
options.Mlock = false
tdbp := temp.Name()
tmpdb, err := bolt.Open(tdbp, 0o600, &options)
if err != nil {
temp.Close()
if rmErr := os.Remove(temp.Name()); rmErr != nil {
b.lg.Error(
"failed to remove temporary file",
b.lg.Error("failed to remove temporary file",
zap.String("path", temp.Name()),
zap.Error(rmErr),
)
}

return err
}

dbp := b.db.Path()
size1, sizeInUse1 := b.Size(), b.SizeInUse()
b.lg.Info(
"defragmenting",
b.lg.Info("defragmenting",
zap.String("path", dbp),
zap.Int64("current-db-size-bytes", size1),
zap.String("current-db-size", humanize.Bytes(uint64(size1))),
zap.Int64("current-db-size-in-use-bytes", sizeInUse1),
zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse1))),
)

defer func() {
// NOTE: We should exit as soon as possible because that tx
// might be closed. The inflight request might use invalid
// tx and then panic as well. The real panic reason might be
// shadowed by new panic. So, we should fatal here with lock.
if rerr := recover(); rerr != nil {
b.lg.Fatal("unexpected panic during defrag", zap.Any("panic", rerr))
}
}()
journal, snapTx, err := b.defragSetupSnapshot()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please improve the name?

  • snapshot already has a meaning in etcd
  • this only initializes the transaction, no? the real data is read in defragFromTx?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but I also don't have a better name

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mirror?

if err != nil {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we call defragCancelJournal for completeness if we fail here? Or in defer?

tmpdb.Close()
os.Remove(tdbp)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's do the os.Remove error handling in a consistent way as above

return err
}

// Commit/stop and then reset current transactions (including the readTx)

@atiratree atiratree Jun 17, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this comment no longer apply?

b.batchTx.unsafeCommit(true)
b.batchTx.tx = nil
b.lg.Info("defrag: copying data (writes unlocked)")

// gofail: var defragBeforeCopy struct{}
err = defragdb(b.db, tmpdb, defragLimit)
err = defragFromTx(snapTx, tmpdb, defragLimit)
snapTx.Rollback()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need the journal? Aren't the writes disabled up until this point?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we need the journal because we're defragging from a read snapshot transaction, so we need something to catch any writes so they can be replayed. This is what allows this to be "less blocking", before, we had to block writes while we copied the entire database; with this, we allow writes while we copy the database by saving the incoming writes and replaying them later.

The journal replay and the database switchover are the only times we need to stop both reads and writes, which we do at the same time.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what we may need however, is some sort of backpressure. We won't be able to buffer all stuff forever

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as in, have a hard size limit on the ops to buffer (new config arg) and then have the writelock block when going over the limit

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, trouble is, if a writer is holding the writelock because the journal is full, then we can't replay the journal and swap cause that's the "stop the world" time; since this refactor only tries to get the write lock when it's ready to swap.


if err != nil {
b.defragCancelJournal(journal)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so hold on, we're losing all writes so far? that's a disaster

that's definitely a place to put a failpoint into

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The snapTx is a read transaction, and writes during the defrag exist both in the old database and in the journal; so nothing should be lost.

tmpdb.Close()
if rmErr := os.RemoveAll(tmpdb.Path()); rmErr != nil {
b.lg.Error("failed to remove db.tmp after defragmentation completed", zap.Error(rmErr))
}

// restore the bbolt transactions if defragmentation fails
b.batchTx.tx = b.unsafeBegin(true)
b.readTx.tx = b.unsafeBegin(false)
Comment on lines -553 to -555

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what was the puerpose behind this restore?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

etcd does a "group commit", so it will always batch incoming writes for some time and then commit those together to improve throughput.

This is just making sure the transaction for that is clean and setup again for the new writes incoming after the mutex unlocked

os.RemoveAll(tdbp)
return err
}

b.lg.Info("defrag: replaying journal")
err = b.defragReplayAndSwap(journal, tmpdb, dbp, tdbp)
if err != nil {
return err
}

err = b.db.Close()
took := time.Since(now)
defragSec.Observe(took.Seconds())

size2, sizeInUse2 := b.Size(), b.SizeInUse()
b.lg.Info("finished defragmenting directory",
zap.String("path", dbp),
zap.Int64("current-db-size-bytes-diff", size2-size1),
zap.Int64("current-db-size-bytes", size2),
zap.String("current-db-size", humanize.Bytes(uint64(size2))),
zap.Int64("current-db-size-in-use-bytes-diff", sizeInUse2-sizeInUse1),
zap.Int64("current-db-size-in-use-bytes", sizeInUse2),
zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse2))),
zap.Duration("took", took),
)
return nil
}

// defragSetupSnapshot commits pending writes, takes a read-only
// snapshot, and installs a journal to capture writes during the copy.
func (b *backend) defragSetupSnapshot() (*defragJournal, *bolt.Tx, error) {
b.batchTx.LockOutsideApply()
defer b.batchTx.Unlock()

b.batchTx.commit(false)

b.mu.RLock()
snapTx, err := b.db.Begin(false)
b.mu.RUnlock()
if err != nil {
return nil, nil, fmt.Errorf("failed to begin snapshot tx for defrag: %w", err)
}

journal := newDefragJournal()
b.batchTx.defragJournal = journal
return journal, snapTx, nil
}

// defragCancelJournal removes the journal from the batch transaction
// and closes it. Called when the copy phase fails.
func (b *backend) defragCancelJournal(journal *defragJournal) {
b.batchTx.LockOutsideApply()
defer b.batchTx.Unlock()
b.batchTx.defragJournal = nil
journal.close()
}

// defragReplayAndSwap drains the journal, replays it into the temp
// database, then atomically swaps the old database for the new one.
// batchTx is held for the entire operation to prevent writes between
// journal drain and database swap.
func (b *backend) defragReplayAndSwap(journal *defragJournal, tmpdb *bolt.DB, dbp, tdbp string) error {
b.batchTx.LockOutsideApply()
defer b.batchTx.Unlock()

b.batchTx.defragJournal = nil
journal.close()
ops := journal.drain()
Comment on lines +595 to +597

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we could use defragJournal instead of journal arg


if len(ops) > 0 {
b.lg.Info("defrag: replaying journal ops", zap.Int("count", len(ops)))
}

if err := replayJournal(tmpdb, ops, defragLimit); err != nil {
tmpdb.Close()
os.RemoveAll(tdbp)
return err
}

b.lg.Info("defrag: switching database")

b.mu.Lock()
defer b.mu.Unlock()
b.readTx.Lock()
defer b.readTx.Unlock()

// NOTE: We should exit as soon as possible because that tx
// might be closed. The inflight request might use invalid
// tx and then panic as well. The real panic reason might be
// shadowed by new panic. So, we should fatal here with lock.
defer func() {
if rerr := recover(); rerr != nil {
b.lg.Fatal("unexpected panic during defrag", zap.Any("panic", rerr))
}
}()

b.batchTx.unsafeCommit(true)
b.batchTx.tx = nil

if err := b.db.Close(); err != nil {
b.lg.Fatal("failed to close database", zap.Error(err))
}
err = tmpdb.Close()
if err != nil {
if err := tmpdb.Close(); err != nil {
b.lg.Fatal("failed to close tmp database", zap.Error(err))
}
// gofail: var defragBeforeRename struct{}
err = os.Rename(tdbp, dbp)
if err != nil {
if err := os.Rename(tdbp, dbp); err != nil {
b.lg.Fatal("failed to rename tmp database", zap.Error(err))
}

var err error
b.db, err = bolt.Open(dbp, 0o600, b.bopts)
if err != nil {
b.lg.Fatal("failed to open database", zap.String("path", dbp), zap.Error(err))
Expand All @@ -585,29 +652,10 @@ func (b *backend) defrag() error {
atomic.StoreInt64(&b.size, size)
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))

took := time.Since(now)
defragSec.Observe(took.Seconds())

size2, sizeInUse2 := b.Size(), b.SizeInUse()
b.lg.Info(
"finished defragmenting directory",
zap.String("path", dbp),
zap.Int64("current-db-size-bytes-diff", size2-size1),
zap.Int64("current-db-size-bytes", size2),
zap.String("current-db-size", humanize.Bytes(uint64(size2))),
zap.Int64("current-db-size-in-use-bytes-diff", sizeInUse2-sizeInUse1),
zap.Int64("current-db-size-in-use-bytes", sizeInUse2),
zap.String("current-db-size-in-use", humanize.Bytes(uint64(sizeInUse2))),
zap.Duration("took", took),
)
return nil
}

func defragdb(odb, tmpdb *bolt.DB, limit int) error {
// gofail: var defragdbFail string
// return fmt.Errorf(defragdbFail)
Comment on lines -607 to -608

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please bring those back again and maybe also think of adding a few more failure points where they make sense (eg along the locking paths).

Please also run the robustness tests at least once


// open a tx on tmpdb for writes
func defragFromTx(srcTx *bolt.Tx, tmpdb *bolt.DB, limit int) error {
tmptx, err := tmpdb.Begin(true)
if err != nil {
return err
Expand All @@ -618,18 +666,10 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
}
}()

// open a tx on old db for read
tx, err := odb.Begin(false)
if err != nil {
return err
}
defer tx.Rollback()

c := tx.Cursor()

c := srcTx.Cursor()
count := 0
for next, _ := c.First(); next != nil; next, _ = c.Next() {
b := tx.Bucket(next)
b := srcTx.Bucket(next)
if b == nil {
return fmt.Errorf("backend: cannot defrag bucket %s", next)
}
Expand Down Expand Up @@ -665,6 +705,69 @@ func defragdb(odb, tmpdb *bolt.DB, limit int) error {
return tmptx.Commit()
}

func replayJournal(tmpdb *bolt.DB, ops []defragJournalOp, limit int) error {
if len(ops) == 0 {
return nil
}

tx, err := tmpdb.Begin(true)
if err != nil {
return err
}
defer func() {
if err != nil {
tx.Rollback()
}
}()

count := 0
for _, op := range ops {
count++
if count > limit {
if err = tx.Commit(); err != nil {
return err
}
tx, err = tmpdb.Begin(true)
if err != nil {
return err
}
count = 0
}

switch op.opType {
case opCreateBucket:
if _, err = tx.CreateBucketIfNotExists(op.bucketName); err != nil {
return fmt.Errorf("replay: create bucket %s: %w", op.bucketName, err)
}
case opDeleteBucket:
if delErr := tx.DeleteBucket(op.bucketName); delErr != nil && !errors.Is(delErr, bolterrors.ErrBucketNotFound) {
return fmt.Errorf("replay: delete bucket %s: %w", op.bucketName, delErr)
}
case opPut:
b := tx.Bucket(op.bucketName)
if b == nil {
return fmt.Errorf("replay: bucket %s not found for put", op.bucketName)
}
if op.seq {
b.FillPercent = 0.9
}
Comment on lines +751 to +753

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

huh? what's that doing

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it, it's a replica from UnsafePut:

	if seq {
		// it is useful to increase fill percent when the workloads are mostly append-only.
		// this can delay the page split and reduce space usage.
		bucket.FillPercent = 0.9
	}

I would skip this on the defrag path, I don't think this applies here.

if err = b.Put(op.key, op.value); err != nil {
return fmt.Errorf("replay: put in bucket %s: %w", op.bucketName, err)
}
case opDelete:
b := tx.Bucket(op.bucketName)
if b == nil {
continue
}
Comment on lines +759 to +761

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does that not error for deletes, but for puts?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and you may as well pull that out of the switch case, all operations are bucket-based

if err = b.Delete(op.key); err != nil {
return fmt.Errorf("replay: delete from bucket %s: %w", op.bucketName, err)
}
}
}

return tx.Commit()
}

func (b *backend) begin(write bool) *bolt.Tx {
b.mu.RLock()
tx := b.unsafeBegin(write)
Expand Down
Loading