Skip to content

Commit 8f7dafe

Browse files
authored
Add round robin segment assignment strategies (#18270)
1 parent 78e38eb commit 8f7dafe

9 files changed

Lines changed: 580 additions & 17 deletions

File tree

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,13 +133,20 @@ public static List<String> assignSegmentWithReplicaGroup(Map<String, Map<String,
133133
}
134134

135135
// Mirror the assignment to all replica-groups
136+
return getOneInstanceFromEachReplicaGroup(instancePartitions, partitionId, instanceIdWithLeastSegmentsAssigned);
137+
}
138+
139+
/// Picks one instance from every replica-group of the given partition, using the same index in each replica-group.
///
/// The given `instanceId` is first reduced modulo the number of instances in replica-group 0 of the partition, then
/// the instance at that index is taken from each replica-group's instance list. The returned list holds one entry
/// per replica-group, ordered by replica-group id.
///
/// NOTE(review): the index is bounded by the size of replica-group 0 only; this presumably relies on all
/// replica-groups of a partition having the same number of instances - confirm against the callers.
public static List<String> getOneInstanceFromEachReplicaGroup(InstancePartitions instancePartitions,
140+
int partitionId, int instanceId) {
136141
int numReplicaGroups = instancePartitions.getNumReplicaGroups();
137-
List<String> instancesAssigned = new ArrayList<>(numReplicaGroups);
142+
int numInstancesPerReplicaGroup = instancePartitions.getInstances(partitionId, 0).size();
143+
// Wrap the requested index into the range of replica-group 0
instanceId %= numInstancesPerReplicaGroup;
144+
List<String> instances = new ArrayList<>(numReplicaGroups);
138145
for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
139-
instancesAssigned.add(
140-
instancePartitions.getInstances(partitionId, replicaGroupId).get(instanceIdWithLeastSegmentsAssigned));
146+
List<String> instancesInReplicaGroup = instancePartitions.getInstances(partitionId, replicaGroupId);
147+
instances.add(instancesInReplicaGroup.get(instanceId));
141148
}
142-
return instancesAssigned;
149+
return instances;
143150
}
144151

145152
/// Rebalances the table with non-replica-group based segment assignment strategy by uniformly spraying segment

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/BalancedNumSegmentAssignmentStrategy.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,14 @@
3939
public class BalancedNumSegmentAssignmentStrategy implements SegmentAssignmentStrategy {
4040
private static final Logger LOGGER = LoggerFactory.getLogger(BalancedNumSegmentAssignmentStrategy.class);
4141

42-
private int _replication;
42+
protected int _replication;
4343

4444
@Override
4545
public void init(HelixManager helixManager, TableConfig tableConfig) {
4646
SegmentsValidationAndRetentionConfig validationAndRetentionConfig = tableConfig.getValidationConfig();
4747
Preconditions.checkState(validationAndRetentionConfig != null, "segmentsConfig is null");
4848
_replication = tableConfig.getReplication();
49-
LOGGER.info("Initialized BalancedNumSegmentAssignmentStrategy for table: {} with replication: {}",
49+
LOGGER.info("Initialized {} for table: {} with replication: {}", this.getClass().getSimpleName(),
5050
tableConfig.getTableName(), _replication);
5151
}
5252

@@ -66,7 +66,7 @@ public Map<String, Map<String, String>> reassignSegments(Map<String, Map<String,
6666
return SegmentAssignmentUtils.rebalanceNonReplicaGroupBasedTable(currentAssignment, instances, _replication);
6767
}
6868

69-
private void validateSegmentAssignmentStrategy(InstancePartitions instancePartitions) {
69+
protected void validateSegmentAssignmentStrategy(InstancePartitions instancePartitions) {
7070
int numReplicaGroups = instancePartitions.getNumReplicaGroups();
7171
int numPartitions = instancePartitions.getNumPartitions();
7272
// Non-replica-group based assignment should have numReplicaGroups and numPartitions = 1

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/ReplicaGroupSegmentAssignmentStrategy.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,13 @@
3737
import org.slf4j.LoggerFactory;
3838

3939

40-
class ReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentStrategy {
40+
public class ReplicaGroupSegmentAssignmentStrategy implements SegmentAssignmentStrategy {
4141
private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaGroupSegmentAssignmentStrategy.class);
4242

43-
private HelixManager _helixManager;
44-
private String _tableName;
45-
private String _partitionColumn;
46-
private int _replication;
43+
protected HelixManager _helixManager;
44+
protected String _tableName;
45+
protected String _partitionColumn;
46+
protected int _replication;
4747

4848
@Override
4949
public void init(HelixManager helixManager, TableConfig tableConfig) {
@@ -54,16 +54,19 @@ public void init(HelixManager helixManager, TableConfig tableConfig) {
5454
_replication = tableConfig.getReplication();
5555
_partitionColumn = TableConfigUtils.getPartitionColumn(tableConfig);
5656
if (_partitionColumn == null) {
57-
LOGGER.info("Initialized ReplicaGroupSegmentAssignmentStrategy "
58-
+ "with replication: {} without partition column for table: {} ", _replication, _tableName);
57+
LOGGER.info("Initialized {} "
58+
+ "with replication: {} without partition column for table: {} ", this.getClass().getSimpleName(),
59+
_replication, _tableName);
5960
} else {
60-
LOGGER.info("Initialized ReplicaGroupSegmentAssignmentStrategy "
61-
+ "with replication: {} and partition column: {} for table: {}", _replication, _partitionColumn, _tableName);
61+
LOGGER.info("Initialized {} "
62+
+ "with replication: {} and partition column: {} for table: {}", this.getClass().getSimpleName(),
63+
_replication, _partitionColumn, _tableName);
6264
}
6365
}
6466

6567
/**
6668
* Assigns the segment for the replica-group based segment assignment strategy and returns the assigned instances.
69+
* Assigns the segment to the instance with the least number of segments in each replica-group.
6770
*/
6871
@Override
6972
public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
@@ -128,7 +131,7 @@ public Map<String, Map<String, String>> reassignSegments(Map<String, Map<String,
128131
* mismatch can happen when table is not configured correctly (table replication and numReplicaGroups does not match
129132
* or replication changed without reassigning instances).
130133
*/
131-
private static void checkReplication(InstancePartitions instancePartitions, int replication, String tableName) {
134+
protected static void checkReplication(InstancePartitions instancePartitions, int replication, String tableName) {
132135
int numReplicaGroups = instancePartitions.getNumReplicaGroups();
133136
if (numReplicaGroups != replication) {
134137
LOGGER.warn(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.controller.helix.core.assignment.segment.strategy;
20+
21+
import java.util.List;
22+
import java.util.Map;
23+
import java.util.Random;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
import org.apache.commons.lang3.tuple.Pair;
26+
import org.apache.pinot.common.assignment.InstancePartitions;
27+
import org.apache.pinot.common.utils.SegmentUtils;
28+
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
29+
30+
31+
/**
32+
* Segment assignment strategy class where segments are assigned in a round-robin fashion across available instances.
33+
* It can be used in the same scenarios as {@link ReplicaGroupSegmentAssignmentStrategy}, except the new segment will
34+
* not be assigned to the instance with the least number of segments, but instead assigned to the next instance in a
35+
* round-robin fashion within each replica-group. Rebalance with this strategy is identical to
36+
* {@link ReplicaGroupSegmentAssignmentStrategy}.
37+
*
38+
* <p>When the number of instances in each replica group increases, this is useful when we don't care about the
39+
* existing segments as they might be removed later by retention, or there are significantly unbalanced demands on
40+
* newer segments. This strategy works better than {@link ReplicaGroupSegmentAssignmentStrategy} since that strategy
41+
* may require a bootstrap rebalance in this case.
42+
*
43+
* <p>The round-robin counter is maintained per table and partition in memory and does not sync across controller
44+
* instances, which means segment assignment may not be strictly round-robin after controller restarts or when
45+
* multiple controllers assign segments concurrently.
46+
*
47+
*/
48+
public class RoundRobinReplicaGroupSegmentAssignmentStrategy extends ReplicaGroupSegmentAssignmentStrategy {
49+
private static final Map<Pair<String, Integer>, Integer> TABLE_ROUND_ROBIN_COUNTER = new ConcurrentHashMap<>();
50+
private static final Random RANDOM = new Random();
51+
52+
/**
53+
* Assign the segment to the replica groups of its partition. The instance in each replica group is picked in a
54+
* round-robin manner, and the counter increases regardless whether the segment is actually added to the ideal
55+
* state or not.
56+
*/
57+
@Override
58+
public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
59+
InstancePartitions instancePartitions) {
60+
checkReplication(instancePartitions, _replication, _tableName);
61+
int numPartitions = instancePartitions.getNumPartitions();
62+
int partitionId;
63+
if (numPartitions == 1) {
64+
partitionId = 0;
65+
} else {
66+
partitionId =
67+
SegmentUtils.getSegmentPartitionIdOrDefault(segmentName, _tableName, _helixManager, _partitionColumn)
68+
% numPartitions;
69+
}
70+
int numInstancesPerReplicaGroup = instancePartitions.getInstances(partitionId, 0).size();
71+
72+
Pair<String, Integer> counterKey = Pair.of(_tableName, partitionId);
73+
int instanceId = TABLE_ROUND_ROBIN_COUNTER.compute(counterKey,
74+
(key, value) -> value == null ? RANDOM.nextInt(numInstancesPerReplicaGroup)
75+
: (value + 1) % numInstancesPerReplicaGroup);
76+
return SegmentAssignmentUtils.getOneInstanceFromEachReplicaGroup(instancePartitions, partitionId, instanceId);
77+
}
78+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.controller.helix.core.assignment.segment.strategy;
20+
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.Random;
25+
import java.util.concurrent.ConcurrentHashMap;
26+
import org.apache.helix.HelixManager;
27+
import org.apache.pinot.common.assignment.InstancePartitions;
28+
import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
29+
import org.apache.pinot.spi.config.table.TableConfig;
30+
31+
32+
/**
33+
* Segment assignment strategy class where segments are assigned in a round-robin fashion across available instances.
34+
* It can be used in the same scenarios as {@link BalancedNumSegmentAssignmentStrategy}, except the new segment will
35+
* not be assigned to the instance with the least number of segments, but instead assigned to the next instance in a
36+
* round-robin fashion. Rebalance with this strategy is identical to {@link BalancedNumSegmentAssignmentStrategy}.
37+
*
38+
* <p>When the number of instances increases, this is useful when we don't care about the existing segments as they
39+
* might be removed later by retention, or there are significantly unbalanced demands on newer segments. This strategy
40+
* works better than {@link BalancedNumSegmentAssignmentStrategy} since that strategy may require a bootstrap rebalance
41+
* in this case. Note that bootstrap rebalances with this strategy are non-deterministic in ordering because the
42+
* round-robin counter is not persisted.
43+
*
44+
* <p>The round-robin counter is maintained per table in memory and does not sync across controller instances, which
45+
* means segment assignment may not be strictly round-robin after controller restarts or when multiple controllers
46+
* assign segments concurrently.
47+
*/
48+
public class RoundRobinSegmentAssignmentStrategy extends BalancedNumSegmentAssignmentStrategy {
49+
private static final Map<String, Integer> TABLE_ROUND_ROBIN_COUNTER = new ConcurrentHashMap<>();
50+
private static final Random RANDOM = new Random();
51+
private String _tableName;
52+
53+
@Override
54+
public void init(HelixManager helixManager, TableConfig tableConfig) {
55+
super.init(helixManager, tableConfig);
56+
_tableName = tableConfig.getTableName();
57+
}
58+
59+
/**
60+
* Assign the segment to the next n_replica instances from the current round-robin counter, and the counter increases
61+
* regardless whether the segment is actually added to the ideal state or not.
62+
*/
63+
@Override
64+
public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
65+
InstancePartitions instancePartitions) {
66+
validateSegmentAssignmentStrategy(instancePartitions);
67+
List<String> instances =
68+
SegmentAssignmentUtils.getInstancesForNonReplicaGroupBasedAssignment(instancePartitions, _replication);
69+
List<String> assignedInstances = new ArrayList<>();
70+
int instanceId = TABLE_ROUND_ROBIN_COUNTER.compute(_tableName,
71+
(key, value) -> value == null ? RANDOM.nextInt(instances.size()) : (value + _replication) % instances.size());
72+
for (int i = 0; i < _replication; i++) {
73+
assignedInstances.add(instances.get((instanceId - i + instances.size()) % instances.size()));
74+
}
75+
return assignedInstances;
76+
}
77+
}

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/strategy/SegmentAssignmentStrategyFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@ public static SegmentAssignmentStrategy getSegmentAssignmentStrategy(HelixManage
8686
} else {
8787
// Set segment assignment strategy depending on strategy set in table config
8888
switch (assignmentStrategy) {
89+
case AssignmentStrategy.ROUND_ROBIN_SEGMENT_ASSIGNMENT_STRATEGY:
90+
segmentAssignmentStrategy = new RoundRobinSegmentAssignmentStrategy();
91+
break;
92+
case AssignmentStrategy.ROUND_ROBIN_REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY:
93+
segmentAssignmentStrategy = new RoundRobinReplicaGroupSegmentAssignmentStrategy();
94+
break;
8995
case AssignmentStrategy.REPLICA_GROUP_SEGMENT_ASSIGNMENT_STRATEGY:
9096
segmentAssignmentStrategy = new ReplicaGroupSegmentAssignmentStrategy();
9197
break;

0 commit comments

Comments
 (0)