Skip to content

Commit 8782662

Browse files
authored
Add UT for new operators introduced by window function optimization (#17209)
1 parent 1fdd4b3 commit 8782662

4 files changed

Lines changed: 1035 additions & 4 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/GroupedTopNRowNumberBuilder.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public class GroupedTopNRowNumberBuilder implements GroupedTopNBuilder {
4949
private final GroupedTopNRowNumberAccumulator groupedTopNRowNumberAccumulator;
5050
private final TsBlockWithPositionComparator comparator;
5151

52+
private int effectiveGroupCount = 0;
53+
5254
public GroupedTopNRowNumberBuilder(
5355
List<TSDataType> sourceTypes,
5456
TsBlockWithPositionComparator comparator,
@@ -77,10 +79,18 @@ public GroupedTopNRowNumberBuilder(
7779

7880
@Override
7981
public void addTsBlock(TsBlock tsBlock) {
80-
int[] groupIds = groupByHash.getGroupIds(tsBlock.getColumns(groupByChannels));
81-
int groupCount = groupByHash.getGroupCount();
82+
int[] groupIds;
83+
if (groupByChannels.length == 0) {
84+
groupIds = new int[tsBlock.getPositionCount()];
85+
if (tsBlock.getPositionCount() > 0) {
86+
effectiveGroupCount = 1;
87+
}
88+
} else {
89+
groupIds = groupByHash.getGroupIds(tsBlock.getColumns(groupByChannels));
90+
effectiveGroupCount = groupByHash.getGroupCount();
91+
}
8292

83-
processTsBlock(tsBlock, groupCount, groupIds);
93+
processTsBlock(tsBlock, effectiveGroupCount, groupIds);
8494
}
8595

8696
@Override
@@ -120,7 +130,7 @@ private void processTsBlock(TsBlock newTsBlock, int groupCount, int[] groupIds)
120130

121131
private class ResultIterator extends AbstractIterator<TsBlock> {
122132
private final TsBlockBuilder tsBlockBuilder;
123-
private final int groupIdCount = groupByHash.getGroupCount();
133+
private final int groupIdCount = effectiveGroupCount;
124134
private int currentGroupId = -1;
125135
private final LongBigArray rowIdOutput = new LongBigArray();
126136
private long currentGroupSize;
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.queryengine.execution.operator.process;
21+
22+
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
23+
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
24+
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
25+
import org.apache.iotdb.db.queryengine.common.QueryId;
26+
import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
27+
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
28+
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
29+
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
30+
31+
import com.google.common.collect.ImmutableList;
32+
import org.apache.tsfile.block.column.ColumnBuilder;
33+
import org.apache.tsfile.enums.TSDataType;
34+
import org.apache.tsfile.read.common.block.TsBlock;
35+
import org.apache.tsfile.read.common.block.TsBlockBuilder;
36+
import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
37+
import org.junit.Test;
38+
39+
import java.util.Arrays;
40+
import java.util.Collections;
41+
import java.util.concurrent.ExecutorService;
42+
43+
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
44+
import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
45+
import static org.junit.Assert.assertEquals;
46+
import static org.junit.Assert.assertFalse;
47+
import static org.junit.Assert.assertNotNull;
48+
import static org.junit.Assert.assertNull;
49+
import static org.junit.Assert.assertTrue;
50+
import static org.junit.Assert.fail;
51+
52+
public class ValuesOperatorTest {
53+
private static final ExecutorService instanceNotificationExecutor =
54+
IoTDBThreadPoolFactory.newFixedThreadPool(1, "valuesOperator-test-instance-notification");
55+
56+
@Test
57+
public void testEmptyValues() {
58+
try (ValuesOperator operator = genValuesOperator(ImmutableList.of())) {
59+
assertTrue(operator.isFinished());
60+
assertFalse(operator.hasNext());
61+
assertNull(operator.next());
62+
assertEquals(0, operator.calculateMaxPeekMemory());
63+
assertEquals(0, operator.calculateMaxReturnSize());
64+
assertEquals(0, operator.calculateRetainedSizeAfterCallingNext());
65+
} catch (Exception e) {
66+
e.printStackTrace();
67+
fail(e.getMessage());
68+
}
69+
}
70+
71+
@Test
72+
public void testSingleTsBlock() {
73+
int[] values = {10, 20, 30};
74+
TsBlock tsBlock = createIntTsBlock(values);
75+
76+
try (ValuesOperator operator = genValuesOperator(Collections.singletonList(tsBlock))) {
77+
assertFalse(operator.isFinished());
78+
assertTrue(operator.hasNext());
79+
80+
TsBlock result = operator.next();
81+
assertNotNull(result);
82+
assertEquals(3, result.getPositionCount());
83+
for (int i = 0; i < values.length; i++) {
84+
assertEquals(values[i], result.getColumn(0).getInt(i));
85+
}
86+
87+
assertTrue(operator.isFinished());
88+
assertFalse(operator.hasNext());
89+
} catch (Exception e) {
90+
e.printStackTrace();
91+
fail(e.getMessage());
92+
}
93+
}
94+
95+
@Test
96+
public void testMultipleTsBlocks() {
97+
int[] values1 = {1, 2, 3};
98+
int[] values2 = {4, 5};
99+
int[] values3 = {6, 7, 8, 9};
100+
101+
TsBlock block1 = createIntTsBlock(values1);
102+
TsBlock block2 = createIntTsBlock(values2);
103+
TsBlock block3 = createIntTsBlock(values3);
104+
105+
try (ValuesOperator operator = genValuesOperator(Arrays.asList(block1, block2, block3))) {
106+
assertFalse(operator.isFinished());
107+
assertTrue(operator.hasNext());
108+
109+
// First block
110+
TsBlock result1 = operator.next();
111+
assertNotNull(result1);
112+
assertEquals(3, result1.getPositionCount());
113+
for (int i = 0; i < values1.length; i++) {
114+
assertEquals(values1[i], result1.getColumn(0).getInt(i));
115+
}
116+
117+
// Second block
118+
assertFalse(operator.isFinished());
119+
assertTrue(operator.hasNext());
120+
TsBlock result2 = operator.next();
121+
assertNotNull(result2);
122+
assertEquals(2, result2.getPositionCount());
123+
for (int i = 0; i < values2.length; i++) {
124+
assertEquals(values2[i], result2.getColumn(0).getInt(i));
125+
}
126+
127+
// Third block
128+
assertFalse(operator.isFinished());
129+
assertTrue(operator.hasNext());
130+
TsBlock result3 = operator.next();
131+
assertNotNull(result3);
132+
assertEquals(4, result3.getPositionCount());
133+
for (int i = 0; i < values3.length; i++) {
134+
assertEquals(values3[i], result3.getColumn(0).getInt(i));
135+
}
136+
137+
assertTrue(operator.isFinished());
138+
assertFalse(operator.hasNext());
139+
} catch (Exception e) {
140+
e.printStackTrace();
141+
fail(e.getMessage());
142+
}
143+
}
144+
145+
@Test
146+
public void testRetainedSizeDecreases() {
147+
int[] values1 = {1, 2, 3};
148+
int[] values2 = {4, 5, 6};
149+
150+
TsBlock block1 = createIntTsBlock(values1);
151+
TsBlock block2 = createIntTsBlock(values2);
152+
153+
try (ValuesOperator operator = genValuesOperator(Arrays.asList(block1, block2))) {
154+
long initialRetained = operator.calculateRetainedSizeAfterCallingNext();
155+
156+
operator.next();
157+
long afterFirstRetained = operator.calculateRetainedSizeAfterCallingNext();
158+
assertTrue(
159+
"Retained size should decrease after consuming a block",
160+
afterFirstRetained < initialRetained);
161+
162+
operator.next();
163+
long afterSecondRetained = operator.calculateRetainedSizeAfterCallingNext();
164+
assertEquals(0, afterSecondRetained);
165+
} catch (Exception e) {
166+
e.printStackTrace();
167+
fail(e.getMessage());
168+
}
169+
}
170+
171+
@Test
172+
public void testIsBlockedReturnsNotBlocked() {
173+
try (ValuesOperator operator = genValuesOperator(ImmutableList.of())) {
174+
assertTrue(operator.isBlocked().isDone());
175+
} catch (Exception e) {
176+
e.printStackTrace();
177+
fail(e.getMessage());
178+
}
179+
}
180+
181+
private TsBlock createIntTsBlock(int[] values) {
182+
TsBlockBuilder builder =
183+
new TsBlockBuilder(values.length, Collections.singletonList(TSDataType.INT32));
184+
ColumnBuilder columnBuilder = builder.getColumnBuilder(0);
185+
for (int value : values) {
186+
columnBuilder.writeInt(value);
187+
}
188+
builder.declarePositions(values.length);
189+
return builder.build(
190+
new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, builder.getPositionCount()));
191+
}
192+
193+
private ValuesOperator genValuesOperator(java.util.List<TsBlock> tsBlocks) {
194+
QueryId queryId = new QueryId("stub_query");
195+
FragmentInstanceId instanceId =
196+
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
197+
FragmentInstanceStateMachine stateMachine =
198+
new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
199+
FragmentInstanceContext fragmentInstanceContext =
200+
createFragmentInstanceContext(instanceId, stateMachine);
201+
DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
202+
PlanNodeId planNode = new PlanNodeId("1");
203+
driverContext.addOperatorContext(1, planNode, TreeLinearFillOperator.class.getSimpleName());
204+
205+
return new ValuesOperator(driverContext.getOperatorContexts().get(0), tsBlocks);
206+
}
207+
}

0 commit comments

Comments
 (0)