Skip to content

Commit 3a4e37a

Browse files
aoiasd and claude authored
fix: bind file resource refCnt to collection lifecycle to prevent panic (#48893)
Related to: #48612

## Summary

- Increment `fileResourceRefCnt` during `validateSchema` instead of in the async ack callback's `AddCollection`, closing the TOCTOU race where `RemoveFileResource` could delete a resource between validation and `AddCollection`.
- On failure before `Broadcast`, refCnt is decremented immediately; on restart, refCnt for pending broadcast tasks is recovered from etcd before rootcoord becomes Healthy.
- Remove the refCnt++ from `addCollectionMeta`, since the increment is now done at validation time (the reload path is unchanged).

---------

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent f531b40 commit 3a4e37a

10 files changed

Lines changed: 278 additions & 6 deletions

File tree

internal/mocks/streamingcoord/server/mock_broadcaster/mock_Broadcaster.go

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/rootcoord/create_collection_task.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,19 @@ type createCollectionTask struct {
5151
header *message.CreateCollectionMessageHeader
5252
body *message.CreateCollectionRequest
5353
preserveFieldID bool
54+
55+
// heldFileResourceIds tracks file resources whose refCnt was incremented
56+
// during validation, to be released if the task fails before Broadcast.
57+
heldFileResourceIds []int64
58+
}
59+
60+
// releaseFileResources decrements refCnt for file resources that were
61+
// incremented during validation. Called when the task fails before Broadcast.
62+
func (t *createCollectionTask) releaseFileResources() {
63+
if len(t.heldFileResourceIds) > 0 {
64+
t.meta.DecFileResourceRefCnt(t.heldFileResourceIds)
65+
t.heldFileResourceIds = nil
66+
}
5467
}
5568

5669
func (t *createCollectionTask) validate(ctx context.Context) error {
@@ -246,6 +259,15 @@ func (t *createCollectionTask) validateSchema(ctx context.Context, schema *schem
246259
return err
247260
}
248261
schema.FileResourceIds = resp.GetResourceIds()
262+
263+
// Bind file resources to collection lifecycle: refCnt++ now, refCnt-- on
264+
// drop. Under ddLock, atomic with RemoveFileResource. See #48612.
265+
if len(schema.FileResourceIds) > 0 {
266+
if err := t.meta.IncFileResourceRefCnt(schema.FileResourceIds); err != nil {
267+
return err
268+
}
269+
t.heldFileResourceIds = schema.FileResourceIds
270+
}
249271
}
250272

251273
return validateFieldDataType(schema.GetFields())

internal/rootcoord/create_collection_task_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1468,6 +1468,9 @@ func Test_createCollectionTask_prepareSchema(t *testing.T) {
14681468
ResourceIds: []int64{1, 2, 3},
14691469
}, nil)
14701470

1471+
meta := mockrootcoord.NewIMetaTable(t)
1472+
meta.EXPECT().IncFileResourceRefCnt(mock.Anything).Return(nil)
1473+
14711474
collectionName := funcutil.GenRandomStr()
14721475
field1 := funcutil.GenRandomStr()
14731476
field2 := funcutil.GenRandomStr()
@@ -1498,7 +1501,7 @@ func Test_createCollectionTask_prepareSchema(t *testing.T) {
14981501
assert.NoError(t, err)
14991502

15001503
task := createCollectionTask{
1501-
Core: newTestCore(withMixCoord(mixcoord)),
1504+
Core: newTestCore(withMixCoord(mixcoord), withMeta(meta)),
15021505
Req: &milvuspb.CreateCollectionRequest{
15031506
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection},
15041507
CollectionName: collectionName,
@@ -1522,6 +1525,9 @@ func Test_createCollectionTask_prepareSchema(t *testing.T) {
15221525
ResourceIds: []int64{1, 2, 3},
15231526
}, nil)
15241527

1528+
meta := mockrootcoord.NewIMetaTable(t)
1529+
meta.EXPECT().IncFileResourceRefCnt(mock.Anything).Return(nil)
1530+
15251531
collectionName := funcutil.GenRandomStr()
15261532
field1 := funcutil.GenRandomStr()
15271533
field2 := funcutil.GenRandomStr()
@@ -1552,7 +1558,7 @@ func Test_createCollectionTask_prepareSchema(t *testing.T) {
15521558
assert.NoError(t, err)
15531559

15541560
task := createCollectionTask{
1555-
Core: newTestCore(withMixCoord(mixcoord)),
1561+
Core: newTestCore(withMixCoord(mixcoord), withMeta(meta)),
15561562
Req: &milvuspb.CreateCollectionRequest{
15571563
Base: &commonpb.MsgBase{MsgType: commonpb.MsgType_CreateCollection},
15581564
CollectionName: collectionName,

internal/rootcoord/ddl_callbacks_create_collection.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ func (c *Core) broadcastCreateCollectionV1(ctx context.Context, req *milvuspb.Cr
8080
preserveFieldID: preserveFieldID == "true",
8181
}
8282
if err := createCollectionTask.Prepare(ctx); err != nil {
83+
createCollectionTask.releaseFileResources()
8384
return err
8485
}
8586

@@ -95,6 +96,9 @@ func (c *Core) broadcastCreateCollectionV1(ctx context.Context, req *milvuspb.Cr
9596
WithBroadcast(broadcastChannel).
9697
MustBuildBroadcast()
9798
if _, err := broadcaster.Broadcast(ctx, msg); err != nil {
99+
// Do NOT release file resources here: the broadcast task is already in the
100+
// scheduler and will retry until success. refCnt will be decremented when
101+
// the collection is eventually dropped.
98102
return err
99103
}
100104
return nil

internal/rootcoord/meta_table.go

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,9 @@ type IMetaTable interface {
152152
AddFileResource(ctx context.Context, resource *internalpb.FileResourceInfo) error
153153
RemoveFileResource(ctx context.Context, name string) (error, bool)
154154
ListFileResource(ctx context.Context) ([]*internalpb.FileResourceInfo, uint64)
155+
IncFileResourceRefCnt(ids []int64) error
156+
DecFileResourceRefCnt(ids []int64)
157+
RecoverFileResourceRefCnt(pendingCollections map[int64][]int64)
155158
}
156159

157160
// MetaTable is a persistent meta set of all databases, collections and partitions.
@@ -557,9 +560,6 @@ func (mt *MetaTable) AddCollection(ctx context.Context, coll *model.Collection)
557560

558561
mt.collID2Meta[coll.CollectionID] = coll.Clone()
559562
mt.names.insert(coll.DBName, coll.Name, coll.CollectionID)
560-
for _, fileResourceID := range coll.FileResourceIds {
561-
mt.fileResourceRefCnt[fileResourceID]++
562-
}
563563

564564
pn := coll.GetPartitionNum(true)
565565
mt.generalCnt += pn * int(coll.ShardsNum)
@@ -598,7 +598,12 @@ func (mt *MetaTable) DropCollection(ctx context.Context, collectionID UniqueID,
598598
}
599599
mt.collID2Meta[collectionID] = clone
600600
for _, fileResourceID := range coll.FileResourceIds {
601-
mt.fileResourceRefCnt[fileResourceID]--
601+
if mt.fileResourceRefCnt[fileResourceID] > 0 {
602+
mt.fileResourceRefCnt[fileResourceID]--
603+
} else {
604+
log.Warn("DropCollection: file resource refCnt underflow",
605+
zap.Int64("collectionID", collectionID), zap.Int64("fileResourceID", fileResourceID))
606+
}
602607
}
603608

604609
log.Ctx(ctx).Info("update coll state to dropping",
@@ -2326,3 +2331,55 @@ func (mt *MetaTable) ListFileResource(ctx context.Context) ([]*internalpb.FileRe
23262331

23272332
return lo.Values(mt.fileResourceID2Meta), mt.fileResourceVersion
23282333
}
2334+
2335+
// IncFileResourceRefCnt increments refCnt for file resources, binding them to a
2336+
// collection being created. Under ddLock, atomic with RemoveFileResource.
2337+
// Returns error if any resource ID does not exist.
2338+
func (mt *MetaTable) IncFileResourceRefCnt(ids []int64) error {
2339+
mt.ddLock.Lock()
2340+
defer mt.ddLock.Unlock()
2341+
for _, id := range ids {
2342+
if _, ok := mt.fileResourceID2Meta[id]; !ok {
2343+
return merr.WrapErrParameterInvalidMsg("file resource %d not found", id)
2344+
}
2345+
}
2346+
for _, id := range ids {
2347+
mt.fileResourceRefCnt[id]++
2348+
}
2349+
return nil
2350+
}
2351+
2352+
// DecFileResourceRefCnt decrements refCnt. Used for early release when
2353+
// CreateCollection fails after validation.
2354+
func (mt *MetaTable) DecFileResourceRefCnt(ids []int64) {
2355+
mt.ddLock.Lock()
2356+
defer mt.ddLock.Unlock()
2357+
for _, id := range ids {
2358+
if mt.fileResourceRefCnt[id] > 0 {
2359+
mt.fileResourceRefCnt[id]--
2360+
} else {
2361+
log.Warn("DecFileResourceRefCnt underflow", zap.Int64("id", id))
2362+
}
2363+
}
2364+
}
2365+
2366+
// RecoverFileResourceRefCnt re-increments refCnt for file resources referenced by
2367+
// pending CreateCollection broadcast tasks whose collections have not yet been
2368+
// persisted. Called during startup before rootcoord becomes Healthy.
2369+
func (mt *MetaTable) RecoverFileResourceRefCnt(pendingCollections map[int64][]int64) {
2370+
mt.ddLock.Lock()
2371+
defer mt.ddLock.Unlock()
2372+
for collID, resourceIds := range pendingCollections {
2373+
if _, exists := mt.collID2Meta[collID]; exists {
2374+
continue // collection already persisted, reload already counted it
2375+
}
2376+
for _, id := range resourceIds {
2377+
if _, ok := mt.fileResourceID2Meta[id]; ok {
2378+
mt.fileResourceRefCnt[id]++
2379+
} else {
2380+
log.Warn("RecoverFileResourceRefCnt: pending task references missing file resource",
2381+
zap.Int64("collectionID", collID), zap.Int64("resourceID", id))
2382+
}
2383+
}
2384+
}
2385+
}

internal/rootcoord/mocks/meta_table.go

Lines changed: 112 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/rootcoord/root_coord.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
"github.com/milvus-io/milvus/internal/rootcoord/tombstone"
4848
"github.com/milvus-io/milvus/internal/storage"
4949
streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server"
50+
"github.com/milvus-io/milvus/internal/streamingcoord/server/broadcaster/broadcast"
5051
tso2 "github.com/milvus-io/milvus/internal/tso"
5152
"github.com/milvus-io/milvus/internal/types"
5253
"github.com/milvus-io/milvus/internal/util/dependency"
@@ -560,6 +561,14 @@ func (c *Core) Init() error {
560561

561562
c.initOnce.Do(func() {
562563
initError = c.initInternal()
564+
// Recover file resource refCnt for pending CreateCollection broadcast tasks
565+
// before registering DDL callbacks, so ack callbacks won't race with recovery.
566+
// See #48612.
567+
pending := broadcast.GetPendingCreateCollectionResources()
568+
if len(pending) > 0 {
569+
c.meta.RecoverFileResourceRefCnt(pending)
570+
log.Info("recovered file resource refCnt from pending broadcast tasks", zap.Int("count", len(pending)))
571+
}
563572
RegisterDDLCallbacks(c)
564573
})
565574
log.Info("RootCoord init successfully")

internal/streamingcoord/server/broadcaster/broadcast/singleton.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,15 @@ func StartBroadcastWithSecondaryClusterResourceKey(ctx context.Context) (broadca
6262
return broadcaster.WithSecondaryClusterResourceKey(ctx)
6363
}
6464

65+
// GetPendingCreateCollectionResources returns pending CreateCollection file resource
66+
// IDs from the broadcaster. Must be called after Register.
67+
func GetPendingCreateCollectionResources() map[int64][]int64 {
68+
if !singleton.Ready() {
69+
return nil
70+
}
71+
return singleton.Get().GetPendingCreateCollectionResources()
72+
}
73+
6574
// Release releases the broadcaster.
6675
func Release() {
6776
if !singleton.Ready() {

0 commit comments

Comments
 (0)