Skip to content

Commit 0c23210

Browse files
CpaulyzJackieTien97
authored andcommitted
Fix TVF return unexpected result when passing ORIGIN argument
(cherry picked from commit 880c433)
1 parent 9763fdb commit 0c23210

5 files changed

Lines changed: 86 additions & 24 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowTVFIT.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,29 @@ public void testHopFunction() {
130130
expectedHeader,
131131
retArray,
132132
DATABASE_NAME);
133+
expectedHeader = new String[] {"window_start", "window_end", "stock_id", "sum"};
134+
retArray =
135+
new String[] {
136+
"2021-01-01T09:07:00.000Z,2021-01-01T09:08:00.000Z,AAPL,103.0,",
137+
"2021-01-01T09:09:00.000Z,2021-01-01T09:10:00.000Z,AAPL,102.0,",
138+
"2021-01-01T09:07:00.000Z,2021-01-01T09:08:00.000Z,TESL,202.0,",
139+
"2021-01-01T09:15:00.000Z,2021-01-01T09:16:00.000Z,TESL,195.0,",
140+
};
141+
tableResultSetEqualTest(
142+
"SELECT window_start, window_end, stock_id, sum(price) as sum FROM HOP(DATA => bid, TIMECOL => 'time', SLIDE => 1m, SIZE => 1m, ORIGIN => 2021-01-01T09:07:00) GROUP BY window_start, window_end, stock_id ORDER BY stock_id, window_start",
143+
expectedHeader,
144+
retArray,
145+
DATABASE_NAME);
146+
retArray =
147+
new String[] {
148+
"2021-01-01T09:07:00.000Z,2021-01-01T09:08:00.000Z,AAPL,103.0,",
149+
"2021-01-01T09:07:00.000Z,2021-01-01T09:08:00.000Z,TESL,202.0,",
150+
};
151+
tableResultSetEqualTest(
152+
"SELECT window_start, window_end, stock_id, sum(price) as sum FROM HOP(DATA => bid, TIMECOL => 'time', SLIDE => 1h, SIZE => 1m, ORIGIN => 2021-01-01T09:07:00) GROUP BY window_start, window_end, stock_id ORDER BY stock_id, window_start",
153+
expectedHeader,
154+
retArray,
155+
DATABASE_NAME);
133156
tableAssertTestFail(
134157
"SELECT * FROM HOP(DATA => bid, TIMECOL => 'time', SLIDE => -300000, SIZE => 600000) ORDER BY stock_id, time",
135158
"Invalid scalar argument SLIDE, should be a positive value",
@@ -166,7 +189,7 @@ public void testSessionFunction() {
166189
"2021-01-01T09:15:00.000Z,2021-01-01T09:15:00.000Z,TESL,195.0,",
167190
};
168191
tableResultSetEqualTest(
169-
"SELECT window_start, window_end, stock_id, sum(price) as sum FROM SESSION(DATA => bid PARTITION BY stock_id ORDER BY time, TIMECOL => 'time', GAP => 2m) GROUP BY window_start, window_end, stock_id ORDER BY stock_id, window_start",
192+
"SELECT window_start, window_end, stock_id, sum(price) as sum FROM SESSION(DATA => bid PARTITION BY stock_id ORDER BY time, GAP => 2m) GROUP BY window_start, window_end, stock_id ORDER BY stock_id, window_start",
170193
expectedHeader,
171194
retArray,
172195
DATABASE_NAME);
@@ -256,6 +279,19 @@ public void testTumbleFunction() {
256279
retArray,
257280
DATABASE_NAME);
258281

282+
// TUMBLE (10m) + origin
283+
expectedHeader = new String[] {"window_start", "window_end", "time", "stock_id", "price", "s1"};
284+
retArray =
285+
new String[] {
286+
"2021-01-01T09:08:00.000Z,2021-01-01T09:18:00.000Z,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,",
287+
"2021-01-01T09:08:00.000Z,2021-01-01T09:18:00.000Z,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,",
288+
};
289+
tableResultSetEqualTest(
290+
"SELECT * FROM TUMBLE(DATA => bid, TIMECOL => 'time', SIZE => 10m, ORIGIN => 2021-01-01T09:08:00) ORDER BY stock_id, time",
291+
expectedHeader,
292+
retArray,
293+
DATABASE_NAME);
294+
259295
// TUMBLE (10m) + GROUP BY
260296
expectedHeader = new String[] {"window_start", "window_end", "stock_id", "sum"};
261297
retArray =
@@ -309,6 +345,25 @@ public void testCumulateFunction() {
309345
retArray,
310346
DATABASE_NAME);
311347

348+
expectedHeader = new String[] {"window_start", "window_end", "time", "stock_id", "price", "s1"};
349+
retArray =
350+
new String[] {
351+
"2021-01-01T09:06:00.000Z,2021-01-01T09:12:00.000Z,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,",
352+
"2021-01-01T09:06:00.000Z,2021-01-01T09:18:00.000Z,2021-01-01T09:07:00.000Z,AAPL,103.0,101.0,",
353+
"2021-01-01T09:06:00.000Z,2021-01-01T09:12:00.000Z,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,",
354+
"2021-01-01T09:06:00.000Z,2021-01-01T09:18:00.000Z,2021-01-01T09:09:00.000Z,AAPL,102.0,101.0,",
355+
"2021-01-01T09:06:00.000Z,2021-01-01T09:12:00.000Z,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,",
356+
"2021-01-01T09:06:00.000Z,2021-01-01T09:18:00.000Z,2021-01-01T09:06:00.000Z,TESL,200.0,102.0,",
357+
"2021-01-01T09:06:00.000Z,2021-01-01T09:12:00.000Z,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,",
358+
"2021-01-01T09:06:00.000Z,2021-01-01T09:18:00.000Z,2021-01-01T09:07:00.000Z,TESL,202.0,202.0,",
359+
"2021-01-01T09:06:00.000Z,2021-01-01T09:18:00.000Z,2021-01-01T09:15:00.000Z,TESL,195.0,332.0,",
360+
};
361+
tableResultSetEqualTest(
362+
"SELECT * FROM CUMULATE(DATA => bid, TIMECOL => 'time', STEP => 6m, SIZE => 12m, ORIGIN => 2021-01-01T09:06:00) ORDER BY stock_id, time",
363+
expectedHeader,
364+
retArray,
365+
DATABASE_NAME);
366+
312367
expectedHeader = new String[] {"window_start", "window_end", "stock_id", "sum"};
313368
retArray =
314369
new String[] {

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CumulateTableFunction.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -146,13 +146,13 @@ private static class CumulateDataProcessor implements TableFunctionDataProcessor
146146

147147
private final long step;
148148
private final long size;
149-
private final long start;
149+
private final long origin;
150150
private long curIndex = 0;
151151

152152
public CumulateDataProcessor(long startTime, long step, long size) {
153153
this.step = step;
154154
this.size = size;
155-
this.start = startTime;
155+
this.origin = startTime;
156156
}
157157

158158
@Override
@@ -162,13 +162,15 @@ public void process(
162162
ColumnBuilder passThroughIndexBuilder) {
163163
// find the first windows
164164
long timeValue = input.getLong(0);
165-
long window_start = (timeValue - start) / size * size;
166-
for (long steps = (timeValue - window_start + step) / step * step;
167-
steps <= size;
168-
steps += step) {
169-
properColumnBuilders.get(0).writeLong(window_start);
170-
properColumnBuilders.get(1).writeLong(window_start + steps);
171-
passThroughIndexBuilder.writeLong(curIndex);
165+
if (timeValue >= origin) {
166+
long windowStart = origin + (timeValue - origin) / size * size;
167+
for (long steps = (timeValue - windowStart + step) / step * step;
168+
steps <= size;
169+
steps += step) {
170+
properColumnBuilders.get(0).writeLong(windowStart);
171+
properColumnBuilders.get(1).writeLong(windowStart + steps);
172+
passThroughIndexBuilder.writeLong(curIndex);
173+
}
172174
}
173175
curIndex++;
174176
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -140,13 +140,13 @@ private static class HOPDataProcessor implements TableFunctionDataProcessor {
140140

141141
private final long slide;
142142
private final long size;
143-
private final long start;
143+
private final long origin;
144144
private long curIndex = 0;
145145

146146
public HOPDataProcessor(long startTime, long slide, long size) {
147147
this.slide = slide;
148148
this.size = size;
149-
this.start = startTime;
149+
this.origin = startTime;
150150
}
151151

152152
@Override
@@ -157,12 +157,14 @@ public void process(
157157
// find the first windows that satisfy the condition: start + n*slide <= time < start +
158158
// n*slide + size
159159
long timeValue = input.getLong(0);
160-
long window_start = (timeValue - start - size + slide) / slide * slide;
161-
while (window_start <= timeValue && window_start + size > timeValue) {
162-
properColumnBuilders.get(0).writeLong(window_start);
163-
properColumnBuilders.get(1).writeLong(window_start + size);
164-
passThroughIndexBuilder.writeLong(curIndex);
165-
window_start += slide;
160+
if (timeValue >= origin) {
161+
long window_start = origin + (timeValue - origin - size + slide) / slide * slide;
162+
while (window_start <= timeValue && window_start + size > timeValue) {
163+
properColumnBuilders.get(0).writeLong(window_start);
164+
properColumnBuilders.get(1).writeLong(window_start + size);
165+
passThroughIndexBuilder.writeLong(curIndex);
166+
window_start += slide;
167+
}
166168
}
167169
curIndex++;
168170
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public List<ParameterSpecification> getArgumentsSpecifications() {
6060
ScalarParameterSpecification.builder()
6161
.name(TIMECOL_PARAMETER_NAME)
6262
.type(Type.STRING)
63+
.defaultValue("time")
6364
.build(),
6465
ScalarParameterSpecification.builder().name(GAP_PARAMETER_NAME).type(Type.INT64).build());
6566
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/TumbleTableFunction.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -129,12 +129,12 @@ public TableFunctionDataProcessor getDataProcessor() {
129129

130130
private static class TumbleDataProcessor implements TableFunctionDataProcessor {
131131
private final long size;
132-
private final long start;
132+
private final long origin;
133133
private long curIndex = 0;
134134

135135
public TumbleDataProcessor(long startTime, long size) {
136136
this.size = size;
137-
this.start = startTime;
137+
this.origin = startTime;
138138
}
139139

140140
@Override
@@ -144,10 +144,12 @@ public void process(
144144
ColumnBuilder passThroughIndexBuilder) {
145145
// find the proper window
146146
long timeValue = input.getLong(0);
147-
long window_start = (timeValue - start) / size * size;
148-
properColumnBuilders.get(0).writeLong(window_start);
149-
properColumnBuilders.get(1).writeLong(window_start + size);
150-
passThroughIndexBuilder.writeLong(curIndex);
147+
if (timeValue >= origin) {
148+
long windowStart = origin + (timeValue - origin) / size * size;
149+
properColumnBuilders.get(0).writeLong(windowStart);
150+
properColumnBuilders.get(1).writeLong(windowStart + size);
151+
passThroughIndexBuilder.writeLong(curIndex);
152+
}
151153
curIndex++;
152154
}
153155
}

0 commit comments

Comments
 (0)