diff --git a/sensorhub-core/src/main/java/org/sensorhub/impl/datastore/mem/InMemoryCommandStatusStore.java b/sensorhub-core/src/main/java/org/sensorhub/impl/datastore/mem/InMemoryCommandStatusStore.java index 90e028e156..24c3b5ab7d 100644 --- a/sensorhub-core/src/main/java/org/sensorhub/impl/datastore/mem/InMemoryCommandStatusStore.java +++ b/sensorhub-core/src/main/java/org/sensorhub/impl/datastore/mem/InMemoryCommandStatusStore.java @@ -190,9 +190,15 @@ public Stream> selectEntries(CommandStatusFilter fi .flatMap(entry -> getStatusByCommand(entry.getKey())); } + // create comparator for time sort + Comparator> comparator = Comparator.comparing(e -> e.getKey().reportTime); + if (filter.getReportTime() != null && filter.getReportTime().isDescendingOrder()) + comparator = comparator.reversed(); + // filter with predicate and apply limit resultStream = resultStream .filter(e -> filter.test(e.getValue())) + .sorted(comparator) .limit(filter.getLimit()); // casting is ok since keys are subtypes of BigId diff --git a/sensorhub-core/src/main/java/org/sensorhub/impl/datastore/mem/InMemoryCommandStore.java b/sensorhub-core/src/main/java/org/sensorhub/impl/datastore/mem/InMemoryCommandStore.java index eeaed01997..fb7063ba2d 100644 --- a/sensorhub-core/src/main/java/org/sensorhub/impl/datastore/mem/InMemoryCommandStore.java +++ b/sensorhub-core/src/main/java/org/sensorhub/impl/datastore/mem/InMemoryCommandStore.java @@ -243,10 +243,16 @@ else if (filter.getCommandStreamFilter() == null) .anyMatch(status -> filter.getStatusFilter().test(status.getValue())); }); } + + // create comparator for time sort + Comparator> comparator = Comparator.comparing(e -> e.getKey().issueTime); + if (filter.getIssueTime() != null && filter.getIssueTime().isDescendingOrder()) + comparator = comparator.reversed(); - // filter with predicate and apply limit + // filter with predicate, sort by time and apply limit resultStream = resultStream .filter(e -> filter.test(e.getValue())) + .sorted(comparator) .limit(filter.getLimit()); // casting is ok since keys are subtypes of BigId diff --git a/sensorhub-core/src/main/java/org/sensorhub/impl/datastore/mem/InMemoryObsStore.java b/sensorhub-core/src/main/java/org/sensorhub/impl/datastore/mem/InMemoryObsStore.java index 7520ef0de2..5d0a6e29bf 100644 --- a/sensorhub-core/src/main/java/org/sensorhub/impl/datastore/mem/InMemoryObsStore.java +++ b/sensorhub-core/src/main/java/org/sensorhub/impl/datastore/mem/InMemoryObsStore.java @@ -226,7 +226,7 @@ Stream> getObsByDataStreamAndFoi(BigId dataStreamID, Big public Stream> selectEntries(ObsFilter filter, Set fields) { Stream> resultStream = null; - + // fetch obs directly in case of filtering by internal IDs if (filter.getInternalIDs() != null) { @@ -272,19 +272,31 @@ else if (filter.getFoiFilter() != null && filter.getDataStreamFilter() == null) if (dataStreamIDs.isEmpty()) return Stream.empty(); + // create set of selected FOIs + Set foiIDs = DataStoreUtils.selectFeatureIDs(foiStore, filter.getFoiFilter()) + .collect(Collectors.toSet()); + if (foiIDs.isEmpty()) + return Stream.empty(); + // cross product between list of datastream IDs and foiIDs resultStream = dataStreamIDs.stream() .flatMap(dsID -> { - return DataStoreUtils.selectFeatureIDs(foiStore, filter.getFoiFilter()) + return foiIDs.stream() .flatMap(foiID -> { return getObsByDataStreamAndFoi(dsID, foiID); }); }); } + + // create comparator for time sort + Comparator> comparator = Comparator.comparing(e -> e.getKey().phenomenonTime); + if (filter.getPhenomenonTime() != null && filter.getPhenomenonTime().isDescendingOrder()) + comparator = comparator.reversed(); - // filter with predicate and apply limit + // filter with predicate, sort by time and apply limit resultStream = resultStream .filter(e -> filter.test(e.getValue())) + .sorted(comparator) .limit(filter.getLimit()); // casting is ok since keys are subtypes of BigId diff --git a/sensorhub-core/src/testFixtures/java/org/sensorhub/impl/datastore/AbstractTestObsStore.java b/sensorhub-core/src/testFixtures/java/org/sensorhub/impl/datastore/AbstractTestObsStore.java index 4b0675c41f..8cca79fc4e 100644 --- a/sensorhub-core/src/testFixtures/java/org/sensorhub/impl/datastore/AbstractTestObsStore.java +++ b/sensorhub-core/src/testFixtures/java/org/sensorhub/impl/datastore/AbstractTestObsStore.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -758,6 +759,69 @@ public void testSelectObsByDataStreamFilter() throws Exception } + @Test + public void testSelectObsDescendingOrder() throws Exception + { + Stream> resultStream; + Map expectedResults; + ObsFilter filter; + + var ds1 = addSimpleDataStream(bigId(1), "out1"); + Instant startProc1Batch1 = Instant.parse("2015-06-23T18:24:15.233Z"); + Map proc1Batch1 = addSimpleObsWithoutResultTime(ds1, bigId(23), startProc1Batch1, 10, 30*24*3600*1000L); + Instant startProc1Batch2 = Instant.parse("2018-02-11T08:12:06.897Z"); + Map proc1Batch2 = addSimpleObsWithoutResultTime(ds1, bigId(46), startProc1Batch2, 3, 100*24*3600*1000L); + Instant startProc1Batch3 = Instant.parse("2025-06-23T18:24:15.233Z"); + Map proc1Batch3 = addSimpleObsWithoutResultTime(ds1, BigId.NONE, startProc1Batch3, 10, 30*24*3600*1000L); + + forceReadBackFromStorage(); + + // ds1 and all times + filter = new ObsFilter.Builder() + .withDataStreams(ds1) + .withPhenomenonTime() + .descendingOrder(true) + .done() + .build(); + resultStream = obsStore.selectEntries(filter); + expectedResults = new LinkedHashMap<>(); + expectedResults.putAll(proc1Batch1); + expectedResults.putAll(proc1Batch2); + expectedResults.putAll(proc1Batch3); + checkSelectedEntries(resultStream, expectedResults, filter); + + // check order is descending + var obsList = obsStore.selectEntries(filter) + .map(e -> e.getValue()) + .collect(Collectors.toList()); + var sortedObsList = new ArrayList<>(obsList); + sortedObsList.sort(Comparator.comparing(obs -> obs.getPhenomenonTime())); + Collections.reverse(sortedObsList); + assertEquals(sortedObsList, obsList); + + // ds1 and single FOI + filter = new ObsFilter.Builder() + .withDataStreams(ds1) + .withFois(bigId(23)) + .withPhenomenonTime() + .descendingOrder(true) + .done() + .build(); + resultStream = obsStore.selectEntries(filter); + expectedResults = new LinkedHashMap<>(); + expectedResults.putAll(proc1Batch1); + checkSelectedEntries(resultStream, expectedResults, filter); + + obsList = obsStore.selectEntries(filter) + .map(e -> e.getValue()) + .collect(Collectors.toList()); + sortedObsList = new ArrayList<>(obsList); + sortedObsList.sort(Comparator.comparing(obs -> obs.getPhenomenonTime())); + Collections.reverse(sortedObsList); + assertEquals(sortedObsList, obsList); + } + + @Test(expected = IllegalStateException.class) public void testErrorWithFoiFilterJoin() throws Exception { diff --git a/sensorhub-datastore-h2/src/main/java/org/h2/mvstore/DescendingCursor.java b/sensorhub-datastore-h2/src/main/java/org/h2/mvstore/DescendingCursor.java new file mode 100644 index 0000000000..0b352df47a --- /dev/null +++ b/sensorhub-datastore-h2/src/main/java/org/h2/mvstore/DescendingCursor.java @@ -0,0 +1,177 @@ +/***************************** BEGIN LICENSE BLOCK *************************** + +The contents of this file are subject to the Mozilla Public License, v. 2.0. +If a copy of the MPL was not distributed with this file, You can obtain one +at http://mozilla.org/MPL/2.0/. + +Software distributed under the License is distributed on an "AS IS" basis, +WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License +for the specific language governing rights and limitations under the License. + +Copyright (C) 2026 GeoRobotix Inc. All Rights Reserved. + +******************************* END LICENSE BLOCK ***************************/ + +package org.h2.mvstore; + +import java.util.Iterator; +import java.util.NoSuchElementException; + + +/** + * A cursor to iterate over elements in ascending order. + * + * @param the key type + * @param the value type + */ +public class DescendingCursor implements Iterator { + private final K to; + private CursorPos cursorPos; + private CursorPos keeper; + private K current; + private K last; + private V lastValue; + private Page lastPage; + + public DescendingCursor(Page root, K from) { + this(root, from, null); + } + + public DescendingCursor(Page root, K from, K to) { + this.cursorPos = traverseDown(root, from); + this.to = to; + } + + @Override + @SuppressWarnings("unchecked") + public boolean hasNext() { + if (cursorPos != null) { + while (current == null) { + Page page = cursorPos.page; + int index = cursorPos.index; + if (index < 0) { + CursorPos tmp = cursorPos; + cursorPos = cursorPos.parent; + tmp.parent = keeper; + keeper = tmp; + if(cursorPos == null) + { + return false; + } + } else { + while (!page.isLeaf()) { + page = page.getChildPage(index); + int numKeys = page.isLeaf() ? page.getKeyCount() : page.map.getChildPageCount(page); + if (keeper == null) { + cursorPos = new CursorPos(page, numKeys-1, cursorPos); + } else { + CursorPos tmp = keeper; + keeper = keeper.parent; + tmp.parent = cursorPos; + tmp.page = page; + tmp.index = numKeys-1; + cursorPos = tmp; + } + index = numKeys-1; + } + if (index >= 0) { + K key = (K) page.getKey(index); + if (to != null && page.map.getKeyType().compare(key, to) < 0) { + return false; + } + current = last = key; + lastValue = (V) page.getValue(index); + lastPage = page; + } + } + --cursorPos.index; + } + } + return current != null; + } + + @Override + public K next() { + if(!hasNext()) { + throw new NoSuchElementException(); + } + current = null; + return last; + } + + /** + * Get the last read key if there was one. + * + * @return the key or null + */ + public K getKey() { + return last; + } + + /** + * Get the last read value if there was one. + * + * @return the value or null + */ + public V getValue() { + return lastValue; + } + + /** + * Get the page where last retrieved key is located. + * + * @return the page + */ + Page getPage() { + return lastPage; + } + + /** + * Skip over that many entries. This method is relatively fast (for this map + * implementation) even if many entries need to be skipped. + * + * @param n the number of entries to skip + */ + public void skip(long n) { + if (n < 10) { + while (n-- > 0 && hasNext()) { + next(); + } + } else if(hasNext()) { + assert cursorPos != null; + CursorPos cp = cursorPos; + CursorPos parent; + while ((parent = cp.parent) != null) cp = parent; + Page root = cp.page; + @SuppressWarnings("unchecked") + MVMap map = (MVMap) root.map; + long index = map.getKeyIndex(next()); + last = map.getKey(index + n); + this.cursorPos = traverseDown(root, last); + } + } + + @Override + public void remove() { + throw DataUtils.newUnsupportedOperationException( + "Removal is not supported"); + } + + /** + * Fetch the next entry that is equal or larger than the given key, starting + * from the given page. This method retains the stack. + * + * @param p the page to start from + * @param key the key to search, null means search for the first key + */ + private static CursorPos traverseDown(Page p, Object key) { + CursorPos cursorPos = key == null ? p.getPrependCursorPos(null) : CursorPos.traverseDown(p, key); + if (cursorPos.index < 0) { + cursorPos.index = -cursorPos.index - 1; + // select previous instead of next + cursorPos.index--; + } + return cursorPos; + } +} + diff --git a/sensorhub-datastore-h2/src/main/java/org/h2/mvstore/RangeCursor.java b/sensorhub-datastore-h2/src/main/java/org/h2/mvstore/RangeCursor.java index d99b10d339..5ac45156d3 100644 --- a/sensorhub-datastore-h2/src/main/java/org/h2/mvstore/RangeCursor.java +++ b/sensorhub-datastore-h2/src/main/java/org/h2/mvstore/RangeCursor.java @@ -21,12 +21,11 @@ import java.util.Spliterators; import java.util.stream.Stream; import java.util.stream.StreamSupport; -import org.sensorhub.impl.datastore.IteratorWrapper; /** *

- * Custom MVMap cursor adding support for an end key to stop iteration. + * Wrapper for native H2 Cursor and DescendingCursor *

* * @author Alex Robin @@ -34,10 +33,11 @@ * @param Value Type * @since Oct 25, 2016 */ -public class RangeCursor extends IteratorWrapper +public class RangeCursor implements Iterator { - MVMap map; - K to; + final Iterator cursor; + final MVMap map; + final boolean descending; public RangeCursor(MVMap map, K from) @@ -48,38 +48,49 @@ public RangeCursor(MVMap map, K from) public RangeCursor(MVMap map, K from, K to) { - // TODO: Update to use reverse-order cursor in newer H2 version - super(map.cursor(from)); + this(map, from, to, false); + } + + + public RangeCursor(MVMap map, K from, K to, boolean descending) + { + this.cursor = descending ? + new DescendingCursor(map.getRootPage(), from, to) : + new Cursor(map.getRootPage(), from, to); this.map = map; - this.to = to; + this.descending = descending; } @Override - protected void preloadNext() + public boolean hasNext() { - next = null; - - if (it.hasNext()) - { - next = it.next(); - if (to != null && map.getKeyType().compare(next, to) > 0) - next = null; - } + return cursor.hasNext(); + } + + + @Override + public K next() + { + return cursor.next(); } @SuppressWarnings("unchecked") public K getKey() { - return ((Cursor)it).getKey(); + return descending ? + ((DescendingCursor)cursor).getKey() : + ((Cursor)cursor).getKey(); } @SuppressWarnings("unchecked") public V getValue() { - return ((Cursor)it).getValue(); + return descending ? + ((DescendingCursor)cursor).getValue() : + ((Cursor)cursor).getValue(); } @@ -143,12 +154,4 @@ public Stream> entryStream() { return StreamSupport.stream(entryIterator(), false); } - - - @Override - protected K process(K elt) - { - return elt; - } - } diff --git a/sensorhub-datastore-h2/src/main/java/org/sensorhub/impl/datastore/h2/MVCommandStatusStoreImpl.java b/sensorhub-datastore-h2/src/main/java/org/sensorhub/impl/datastore/h2/MVCommandStatusStoreImpl.java index 8d5239d3c5..99ca08d7f8 100644 --- a/sensorhub-datastore-h2/src/main/java/org/sensorhub/impl/datastore/h2/MVCommandStatusStoreImpl.java +++ b/sensorhub-datastore-h2/src/main/java/org/sensorhub/impl/datastore/h2/MVCommandStatusStoreImpl.java @@ -65,6 +65,8 @@ static class TimeParams { Range reportTimeRange; Range execTimeRange; + boolean descendingReportTime; + boolean descendingExecTime; boolean currentTimeOnly; boolean latestResultOnly; @@ -72,24 +74,29 @@ static class TimeParams TimeParams(CommandStatusFilter filter) { // get report time range - reportTimeRange = filter.getReportTime() != null ? - filter.getReportTime().getRange() : H2Utils.ALL_TIMES_RANGE; + reportTimeRange = H2Utils.ALL_TIMES_RANGE; + if (filter.getReportTime() != null) { + reportTimeRange = filter.getReportTime().getRange(); + latestResultOnly = filter.getReportTime().isLatestTime(); + descendingReportTime = filter.getReportTime().isDescendingOrder(); + } // get execution time range - execTimeRange = filter.getExecutionTime() != null ? - filter.getExecutionTime().getRange() : H2Utils.ALL_TIMES_RANGE; - - // try to derive execution time range from report time range - // so we can use the time index - if (filter.getReportTime() == null && execTimeRange != null && execTimeRange != H2Utils.ALL_TIMES_RANGE) - { - var begin = execTimeRange.lowerEndpoint().minus(1, ChronoUnit.DAYS); - var end = execTimeRange.upperEndpoint(); - reportTimeRange = Range.closed(begin, end); + execTimeRange = H2Utils.ALL_TIMES_RANGE; + if (filter.getExecutionTime() != null) { + execTimeRange = filter.getExecutionTime().getRange(); + currentTimeOnly = filter.getExecutionTime().isCurrentTime(); + descendingExecTime = filter.getExecutionTime().isDescendingOrder(); + + // try to derive report time range from execution time range + // so we can use the time index + if (filter.getReportTime() == null) + { + var begin = execTimeRange.lowerEndpoint().minus(1, ChronoUnit.DAYS); + var end = execTimeRange.upperEndpoint(); + reportTimeRange = Range.closed(begin, end); + } } - - latestResultOnly = filter.getReportTime() != null && filter.getReportTime().isLatestTime(); - currentTimeOnly = filter.getExecutionTime() != null && filter.getExecutionTime().isCurrentTime(); } } @@ -144,11 +151,13 @@ MVCommandStatusKey toInternalKey(Object keyObj) } - RangeCursor getStatusCursor(BigId cmdID, Range reportTimeRange) + RangeCursor getStatusCursor(BigId cmdID, Range reportTimeRange, boolean descending) { MVCommandStatusKey first = new MVCommandStatusKey(cmdID, reportTimeRange.lowerEndpoint()); MVCommandStatusKey last = new MVCommandStatusKey(cmdID, reportTimeRange.upperEndpoint()); - return new RangeCursor<>(statusIndex, first, last); + return descending ? + new RangeCursor<>(statusIndex, last, first, descending) : + new RangeCursor<>(statusIndex, first, last, descending); } @@ -166,8 +175,13 @@ Stream> getStatusStream(BigId cmdID, T } // scan using a cursor on main command index - var timeRange = reportTimeFilter != null ? reportTimeFilter.getRange() : H2Utils.ALL_TIMES_RANGE; - RangeCursor cursor = getStatusCursor(cmdID, timeRange); + var descendingTime = false; + var timeRange = H2Utils.ALL_TIMES_RANGE; + if (reportTimeFilter != null) { + timeRange = reportTimeFilter.getRange(); + descendingTime = reportTimeFilter.isDescendingOrder(); + } + RangeCursor cursor = getStatusCursor(cmdID, timeRange, descendingTime); return cursor.entryStream(); } diff --git a/sensorhub-datastore-h2/src/main/java/org/sensorhub/impl/datastore/h2/MVCommandStoreImpl.java b/sensorhub-datastore-h2/src/main/java/org/sensorhub/impl/datastore/h2/MVCommandStoreImpl.java index 4fa30ac748..1435fe8c7a 100644 --- a/sensorhub-datastore-h2/src/main/java/org/sensorhub/impl/datastore/h2/MVCommandStoreImpl.java +++ b/sensorhub-datastore-h2/src/main/java/org/sensorhub/impl/datastore/h2/MVCommandStoreImpl.java @@ -75,6 +75,7 @@ public class MVCommandStoreImpl implements ICommandStore static class TimeParams { Range issueTimeRange; + boolean descendingIssueTime; boolean currentTimeOnly; boolean latestResultOnly; @@ -82,11 +83,13 @@ static class TimeParams TimeParams(CommandFilter filter) { // get issue time range - issueTimeRange = filter.getIssueTime() != null ? - filter.getIssueTime().getRange() : H2Utils.ALL_TIMES_RANGE; - - latestResultOnly = filter.getIssueTime() != null && filter.getIssueTime().isLatestTime(); - currentTimeOnly = filter.getIssueTime() != null && filter.getIssueTime().isCurrentTime(); + issueTimeRange = H2Utils.ALL_TIMES_RANGE; + if (filter.getIssueTime() != null) { + issueTimeRange = filter.getIssueTime().getRange(); + descendingIssueTime = filter.getIssueTime().isDescendingOrder(); + latestResultOnly = filter.getIssueTime() != null && filter.getIssueTime().isLatestTime(); + currentTimeOnly = filter.getIssueTime() != null && filter.getIssueTime().isCurrentTime(); + } } } @@ -188,17 +191,20 @@ Stream getCommandSeriesByDataStream(long cmdStreamID) } - RangeCursor getCommandCursor(long seriesID, Range issueTimeRange) + RangeCursor getCommandCursor(long seriesID, Range issueTimeRange, boolean descending) { MVTimeSeriesRecordKey first = new MVTimeSeriesRecordKey(seriesID, issueTimeRange.lowerEndpoint()); MVTimeSeriesRecordKey last = new MVTimeSeriesRecordKey(seriesID, issueTimeRange.upperEndpoint()); - return new RangeCursor<>(cmdRecordsIndex, first, last); + return descending ? + new RangeCursor<>(cmdRecordsIndex, last, first, descending) : + new RangeCursor<>(cmdRecordsIndex, first, last, descending); } - Stream> getCommandStream(MVTimeSeriesInfo series, Range issueTimeRange, boolean latestOnly) + Stream> getCommandStream(MVTimeSeriesInfo series, TimeParams timeParams) { // if request if for latest only, get only the latest command in series + var latestOnly = timeParams.currentTimeOnly || timeParams.latestResultOnly; if (latestOnly) { MVTimeSeriesRecordKey maxKey = new MVTimeSeriesRecordKey(series.id, Instant.MAX); @@ -211,7 +217,7 @@ Stream> getCommandStream(MVTimeSeries // scan using a cursor on main command index // recreating full entries in the process - RangeCursor cursor = getCommandCursor(series.id, issueTimeRange); + RangeCursor cursor = getCommandCursor(series.id, timeParams.issueTimeRange, timeParams.descendingIssueTime); return cursor.entryStream(); } @@ -290,8 +296,7 @@ public Stream> selectEntries(CommandFilter filter, Se throw new IllegalStateException("Too many command streams or command receivers selected. Please refine your filter"); }) .flatMap(series -> { - var cmdStream = getCommandStream(series, timeParams.issueTimeRange, - timeParams.currentTimeOnly || timeParams.latestResultOnly); + var cmdStream = getCommandStream(series, timeParams); return getPostFilteredResultStream(cmdStream, filter); })); @@ -551,7 +556,7 @@ public Set> entrySet() public Iterator> iterator() { return getAllCommandSeries() .flatMap(series -> { - RangeCursor cursor = getCommandCursor(series.id, H2Utils.ALL_TIMES_RANGE); + RangeCursor cursor = getCommandCursor(series.id, H2Utils.ALL_TIMES_RANGE, false); // casting is ok since set is read-only and keys are subtypes of BigId @SuppressWarnings({ "unchecked" }) @@ -581,7 +586,7 @@ public Set keySet() public Iterator iterator() { return getAllCommandSeries() .flatMap(series -> { - RangeCursor cursor = getCommandCursor(series.id, H2Utils.ALL_TIMES_RANGE); + RangeCursor cursor = getCommandCursor(series.id, H2Utils.ALL_TIMES_RANGE, false); // casting is ok since set is read-only and keys are subtypes of BigId @SuppressWarnings({ "unchecked" }) @@ -675,9 +680,7 @@ public synchronized long removeEntries(CommandFilter filter) var timeParams = new TimeParams(filter); return selectCommandSeries(filter) .mapToLong(series -> { - var cmdStream = getCommandStream(series, - timeParams.issueTimeRange, - timeParams.latestResultOnly); + var cmdStream = getCommandStream(series, timeParams); // delete all matching record in series var numRemoved = getPostFilteredResultStream(cmdStream, filter) diff --git a/sensorhub-datastore-h2/src/main/java/org/sensorhub/impl/datastore/h2/MVObsStoreImpl.java b/sensorhub-datastore-h2/src/main/java/org/sensorhub/impl/datastore/h2/MVObsStoreImpl.java index 5a45832fd3..6b9b2aa8e1 100644 --- a/sensorhub-datastore-h2/src/main/java/org/sensorhub/impl/datastore/h2/MVObsStoreImpl.java +++ b/sensorhub-datastore-h2/src/main/java/org/sensorhub/impl/datastore/h2/MVObsStoreImpl.java @@ -85,6 +85,8 @@ static class TimeParams { Range phenomenonTimeRange; Range resultTimeRange; + boolean descendingPhenomenonTime; + boolean descendingResultTime; boolean currentTimeOnly; boolean latestResultOnly; @@ -92,15 +94,20 @@ static class TimeParams TimeParams(ObsFilter filter) { // get phenomenon time range - phenomenonTimeRange = filter.getPhenomenonTime() != null ? - filter.getPhenomenonTime().getRange() : H2Utils.ALL_TIMES_RANGE; + phenomenonTimeRange = H2Utils.ALL_TIMES_RANGE; + if (filter.getPhenomenonTime() != null) { + phenomenonTimeRange = filter.getPhenomenonTime().getRange(); + descendingPhenomenonTime = filter.getPhenomenonTime().isDescendingOrder(); + currentTimeOnly = filter.getPhenomenonTime().isCurrentTime(); + } // get result time range - resultTimeRange = filter.getResultTime() != null ? - filter.getResultTime().getRange() : H2Utils.ALL_TIMES_RANGE; - - latestResultOnly = filter.getResultTime() != null && filter.getResultTime().isLatestTime(); - currentTimeOnly = filter.getPhenomenonTime() != null && filter.getPhenomenonTime().isCurrentTime(); + resultTimeRange = H2Utils.ALL_TIMES_RANGE; + if (filter.getResultTime() != null) { + resultTimeRange = filter.getResultTime().getRange(); + descendingResultTime = filter.getResultTime().isDescendingOrder(); + latestResultOnly = filter.getResultTime().isLatestTime(); + } } } @@ -266,22 +273,26 @@ Stream getObsSeriesByFoi(long foiID, Range resultTime } - RangeCursor getObsCursor(long seriesID, Range phenomenonTimeRange) + RangeCursor getObsCursor(long seriesID, Range phenomenonTimeRange, boolean descending) { MVTimeSeriesRecordKey first = new MVTimeSeriesRecordKey(seriesID, phenomenonTimeRange.lowerEndpoint()); MVTimeSeriesRecordKey last = new MVTimeSeriesRecordKey(seriesID, phenomenonTimeRange.upperEndpoint()); - return new RangeCursor<>(obsRecordsIndex, first, last); + return descending ? + new RangeCursor<>(obsRecordsIndex, last, first, descending) : + new RangeCursor<>(obsRecordsIndex, first, last, descending); } - Stream> getObsStream(MVTimeSeriesInfo series, Range resultTimeRange, Range phenomenonTimeRange, boolean currentTimeOnly, boolean latestResultOnly) + Stream> getObsStream(MVTimeSeriesInfo series, TimeParams timeParams) { + var phenomenonTimeRange = timeParams.phenomenonTimeRange; + // if series is a special case where all obs have resultTime = phenomenonTime if (series.key.resultTime == Instant.MIN) { // if request is for current time only, get only the obs with // phenomenon time right before current time - if (currentTimeOnly) + if (timeParams.currentTimeOnly) { MVTimeSeriesRecordKey maxKey = new MVTimeSeriesRecordKey(series.id, Instant.now()); Entry e = obsRecordsIndex.floorEntry(maxKey); @@ -292,7 +303,7 @@ Stream> getObsStream(MVTimeSeriesInfo ser } // if request if for latest result only, get only the latest obs in series - if (latestResultOnly) + if (timeParams.latestResultOnly) { MVTimeSeriesRecordKey maxKey = new MVTimeSeriesRecordKey(series.id, Instant.MAX); Entry e = obsRecordsIndex.floorEntry(maxKey); @@ -303,12 +314,12 @@ Stream> getObsStream(MVTimeSeriesInfo ser } // else further restrict the requested time range using result time filter - phenomenonTimeRange = resultTimeRange.intersection(phenomenonTimeRange); + phenomenonTimeRange = timeParams.resultTimeRange.intersection(phenomenonTimeRange); } // scan using a cursor on main obs index // recreating full entries in the process - RangeCursor cursor = getObsCursor(series.id, phenomenonTimeRange); + RangeCursor cursor = getObsCursor(series.id, phenomenonTimeRange, timeParams.descendingPhenomenonTime); return cursor.entryStream(); } @@ -426,11 +437,7 @@ public Stream> selectEntries(ObsFilter filter, Set { return selectObsSeries(filter, timeParams) .flatMap(series -> { - var obsStream = getObsStream(series, - timeParams.resultTimeRange, - timeParams.phenomenonTimeRange, - timeParams.currentTimeOnly, - timeParams.latestResultOnly); + var obsStream = getObsStream(series, timeParams); return getPostFilteredResultStream(obsStream, filter).skip(i).limit(1); }); }) @@ -446,22 +453,30 @@ public Stream> selectEntries(ObsFilter filter, Set { - var obsStream = getObsStream(series, - timeParams.resultTimeRange, - timeParams.phenomenonTimeRange, - timeParams.currentTimeOnly, - timeParams.latestResultOnly); - obsStreams.add(getPostFilteredResultStream(obsStream, filter)); - }); + var obsStream = getObsStream(series, timeParams); + obsStreams.add(getPostFilteredResultStream(obsStream, filter)); + }); if (obsStreams.isEmpty()) return Stream.empty(); - // TODO group by result time when series with different result times are selected - - Comparator> comparator = Comparator.comparing(e -> e.getValue().getPhenomenonTime()); - if (filter.getPhenomenonTime() != null && filter.getPhenomenonTime().isDescendingOrder()) + // build comparator + // current order is by result time, then phenomenon time, then FOI, then datastream ID + Comparator> comparator = (e1, e2) -> { + var obs1 = e1.getValue(); + var obs2 = e2.getValue(); + var comp = obs1.getResultTime().compareTo(obs2.getResultTime()); + if (comp == 0) + comp = obs1.getPhenomenonTime().compareTo(obs2.getPhenomenonTime()); + if (comp == 0) + comp = obs1.getFoiID().compareTo(obs2.getFoiID()); + if (comp == 0) + comp = obs1.getDataStreamID().compareTo(obs2.getDataStreamID()); + return comp; + }; + if (filter.getPhenomenonTime() != null && filter.getPhenomenonTime().isDescendingOrder()) { comparator = comparator.reversed(); + } // stream and merge obs from all selected datastreams and time periods var mergeSortIt = new MergeSortSpliterator>(obsStreams, comparator); @@ -820,7 +835,7 @@ public Set> entrySet() public Iterator> iterator() { return getAllObsSeries(H2Utils.ALL_TIMES_RANGE) .flatMap(series -> { - var cursor = getObsCursor(series.id, H2Utils.ALL_TIMES_RANGE); + var cursor = getObsCursor(series.id, H2Utils.ALL_TIMES_RANGE, false); // casting is ok since set is read-only and keys are subtypes of BigId @SuppressWarnings({ "unchecked" }) @@ -850,7 +865,7 @@ public Set keySet() public Iterator iterator() { return getAllObsSeries(H2Utils.ALL_TIMES_RANGE) .flatMap(series -> { - RangeCursor cursor = getObsCursor(series.id, H2Utils.ALL_TIMES_RANGE); + RangeCursor cursor = getObsCursor(series.id, H2Utils.ALL_TIMES_RANGE, false); // casting is ok since set is read-only and keys are subtypes of BigId @SuppressWarnings({ "unchecked" }) @@ -949,11 +964,7 @@ public synchronized long removeEntries(ObsFilter filter) var timeParams = new TimeParams(filter); return selectObsSeries(filter, timeParams) .mapToLong(series -> { - var obsStream = getObsStream(series, - timeParams.resultTimeRange, - timeParams.phenomenonTimeRange, - timeParams.currentTimeOnly, - timeParams.latestResultOnly); + var obsStream = getObsStream(series, timeParams); // delete all matching record in series var numRemoved = getPostFilteredResultStream(obsStream, filter) diff --git a/sensorhub-webui-core/src/main/java/org/sensorhub/ui/table/LazyLoadingObsContainer.java b/sensorhub-webui-core/src/main/java/org/sensorhub/ui/table/LazyLoadingObsContainer.java index 8d52a017c7..e230a1e40f 100644 --- a/sensorhub-webui-core/src/main/java/org/sensorhub/ui/table/LazyLoadingObsContainer.java +++ b/sensorhub-webui-core/src/main/java/org/sensorhub/ui/table/LazyLoadingObsContainer.java @@ -14,9 +14,12 @@ package org.sensorhub.ui.table; +import java.time.Instant; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import org.sensorhub.api.common.BigId; import org.sensorhub.api.common.IdEncoder; import org.sensorhub.api.database.IObsSystemDatabase; @@ -36,8 +39,11 @@ public class LazyLoadingObsContainer extends IndexedContainer final List indexers; final int pageSize; int startIndexCache = -1; + int prevStartIndex = 0; int size = -1; TimeExtent timeRange; + Instant firstObsTime, lastObsTime; + BigId firstObsId, lastObsId; public LazyLoadingObsContainer(IObsSystemDatabase db, IdEncoder foiIdEncoder, BigId dataStreamID, Set foiIDs, List indexers, int pageSize) @@ -70,42 +76,113 @@ public List getItemIds(int startIndex, int numberOfIds) { if (timeRange != null && startIndexCache != startIndex) { - startIndexCache = startIndex; + TimeExtent pageTimeRange; + long limit = 0; + long skipCount = 0; + BigId nextKey; + boolean descending = false; + + // next page + if (startIndex == prevStartIndex+pageSize && lastObsTime != null && lastObsId != null) { + pageTimeRange = TimeExtent.period(lastObsTime, timeRange.end()); + skipCount = 1; + limit = pageSize*100; + nextKey = lastObsId; + } + + // previous page + else if (startIndex == prevStartIndex-pageSize && firstObsTime != null && firstObsId != null) { + pageTimeRange = TimeExtent.period(timeRange.begin(), firstObsTime); + skipCount = 1; + limit = pageSize*100; + nextKey = firstObsId; + descending = true; + } + + // last page + else if (startIndex >= size()-10) { + pageTimeRange = TimeExtent.period(timeRange.begin(), timeRange.end()); + skipCount = 0; + limit = size() % pageSize; + if (limit == 0) limit = 10; + nextKey = null; + descending = true; + } + + // any other page, use skip (inefficient for large page number) + else { + pageTimeRange = timeRange; + skipCount = startIndex; + limit = pageSize + startIndex; + nextKey = null; + } + + startIndexCache = prevStartIndex = startIndex; //System.out.println("Loading from " + startIndex + ", count=" + numberOfIds); var filter = new ObsFilter.Builder() .withDataStreams(dataStreamID) - .withPhenomenonTime().fromTimeExtent(timeRange).done(); + .withPhenomenonTime() + .fromTimeExtent(pageTimeRange) + .descendingOrder(descending) + .done() + .withLimit(limit); if (!foiIDs.isEmpty()) filter.withFois(foiIDs); // prefetch range from DB + // we seek by time using the filter but we also need to go to the exact key + // in case there are multiple FOIs with the same timestamp + AtomicInteger count = new AtomicInteger(0); + var obsStream = db.getObservationStore().selectEntries(filter.build()); + var obsPage = obsStream + .dropWhile(e -> nextKey != null && !e.getKey().equals(nextKey)) + .skip(skipCount) + .limit(pageSize) + .collect(Collectors.toList()); + obsStream.close(); + + // reverse order if data was collected in descending order + if (descending) + Collections.reverse(obsPage); + + // add all obs items to container removeAllItems(); - AtomicInteger count = new AtomicInteger(startIndex); - db.getObservationStore().select(filter.build()) - .skip(startIndex) - .limit(10) - .forEach(obs -> { - //System.out.println(obs.getResultTime()); - var dataBlk = obs.getResult(); - Item item = addItem(count.getAndIncrement()); - if (item != null) + obsPage.forEach(e -> { + var obs = e.getValue(); + + if (count.get() == 0) { + firstObsTime = obs.getPhenomenonTime(); + firstObsId = e.getKey(); + //System.out.println("First: " + firstObsId + " -> " + firstObsTime); + } + else if (count.get() == pageSize-1) { + lastObsTime = obs.getPhenomenonTime(); + lastObsId = e.getKey(); + //System.out.println("Last: " + lastObsId + " -> " + lastObsTime); + } + + //System.out.println("Adding " + e.getKey() + " -> " + obs.getResultTime()); + var dataBlk = obs.getResult(); + var itemId = count.getAndIncrement(); + Item item = addItem(itemId); + if (item != null) + { + int i = -1; + for (Object colId: getContainerPropertyIds()) { - int i = -1; - for (Object colId: getContainerPropertyIds()) - { - String value; - - if (i < 0) - value = foiIdEncoder.encodeID(obs.getFoiID()); - else - value = indexers.get(i).getStringValue(dataBlk); - - item.getItemProperty(colId).setValue(value); - i++; - } + String value; + + if (i < 0) + value = foiIdEncoder.encodeID(obs.getFoiID()); + else + value = indexers.get(i).getStringValue(dataBlk); + + item.getItemProperty(colId).setValue(value); + i++; } - }); + } + }); } return (List)super.getItemIds(); diff --git a/sensorhub-webui-core/src/main/java/org/sensorhub/ui/table/PagedTable.java b/sensorhub-webui-core/src/main/java/org/sensorhub/ui/table/PagedTable.java index 00a7952bf1..c25e7cff71 100644 --- a/sensorhub-webui-core/src/main/java/org/sensorhub/ui/table/PagedTable.java +++ b/sensorhub-webui-core/src/main/java/org/sensorhub/ui/table/PagedTable.java @@ -4,6 +4,7 @@ import com.vaadin.ui.Button.ClickEvent; import com.vaadin.ui.Button.ClickListener; import com.vaadin.v7.data.Container; +import com.vaadin.v7.data.Property; import com.vaadin.v7.ui.Table; import java.util.ArrayList; import java.util.Collection; @@ -99,6 +100,15 @@ private void setPageFirstIndex(int firstIndex) { firstIndex = pages * getPageLength(); } + // cleanup old listeners + for (var id: container.getItemIds()) { + var item = container.getItem(id); + for (var propId: item.getItemPropertyIds()) { + var prop = item.getItemProperty(propId); + ((Property.ValueChangeNotifier)prop).removeValueChangeListener(this); + } + } + container.setStartIndex(firstIndex); // this was causing redundant call of getItemIds(start, end) // and it is not needed because this method is called by fired page changed event @@ -166,8 +176,10 @@ public void setCurrentPage(int page) { if (newIndex < 0) { newIndex = 0; } - if (newIndex >= 0 && newIndex != container.getStartIndex()) { + if (newIndex != container.getStartIndex()) { setPageFirstIndex(newIndex); + } else { + firePagedChangedEvent(); } } diff --git a/sensorhub-webui-core/src/main/java/org/sensorhub/ui/table/PagedTableControls.java b/sensorhub-webui-core/src/main/java/org/sensorhub/ui/table/PagedTableControls.java index 7e42ff651a..280c303417 100644 --- a/sensorhub-webui-core/src/main/java/org/sensorhub/ui/table/PagedTableControls.java +++ b/sensorhub-webui-core/src/main/java/org/sensorhub/ui/table/PagedTableControls.java @@ -8,10 +8,7 @@ import com.vaadin.ui.Label; import com.vaadin.v7.data.Property; import com.vaadin.v7.data.Property.ValueChangeEvent; -import com.vaadin.v7.data.Property.ValueChangeListener; -import com.vaadin.v7.data.validator.IntegerRangeValidator; import com.vaadin.v7.ui.ComboBox; -import com.vaadin.v7.ui.TextField; public class PagedTableControls extends HorizontalLayout { @@ -23,7 +20,9 @@ public class PagedTableControls extends HorizontalLayout { private Button btnPrevious = new Button("<"); private Button btnNext = new Button(">"); private Button btnLast = new Button(">>"); - private TextField currentPageTextField = new TextField(); + //private TextField currentPageTextField = new TextField(); + private Label currentPageLabel; + private Label totalPagesLabel; @SuppressWarnings("deprecation") public PagedTableControls(final PagedTable table) { @@ -40,19 +39,25 @@ public PagedTableControls(final PagedTable table) { itemsPerPageSelect.setEnabled(false); // disable for now since it's not working correctly itemsPerPageLabel.addStyleName(UIConstants.STYLE_SMALL); - currentPageTextField.setValue(String.valueOf(table.getCurrentPage())); + /*currentPageTextField.setValue(String.valueOf(table.getCurrentPage())); currentPageTextField.setConverter(Integer.class); final IntegerRangeValidator validator = new IntegerRangeValidator("Wrong page number", 1, table.getTotalAmountOfPages()); currentPageTextField.addValidator(validator); currentPageTextField.setWidth(50, Unit.PIXELS); currentPageTextField.addStyleName(UIConstants.STYLE_SMALL); - currentPageTextField.setImmediate(true); + currentPageTextField.setImmediate(true);*/ + + currentPageLabel = new Label( + String.valueOf(table.getCurrentPage()), ContentMode.HTML); + currentPageLabel.setWidth(null); + currentPageLabel.addStyleName(UIConstants.STYLE_SMALL); Label separatorLabel = new Label(" / ", ContentMode.HTML); - final Label totalPagesLabel = new Label( - String.valueOf(table.getTotalAmountOfPages()), ContentMode.HTML); separatorLabel.setWidth(null); separatorLabel.addStyleName(UIConstants.STYLE_SMALL); + + totalPagesLabel = new Label( + String.valueOf(table.getTotalAmountOfPages()), ContentMode.HTML); totalPagesLabel.setWidth(null); totalPagesLabel.addStyleName(UIConstants.STYLE_SMALL); @@ -75,18 +80,18 @@ public void valueChange(ValueChangeEvent event) { } }); - currentPageTextField.addValueChangeListener(new ValueChangeListener() { - private static final long serialVersionUID = -1301464754009535498L; - - public void valueChange(ValueChangeEvent event) { - if (currentPageTextField.isValid() - && currentPageTextField.getValue() != null) { - int page = Integer.valueOf(String - .valueOf(currentPageTextField.getValue())); - table.setCurrentPage(page); - } - } - }); +// currentPageTextField.addValueChangeListener(new ValueChangeListener() { +// private static final long serialVersionUID = -1301464754009535498L; +// +// public void valueChange(ValueChangeEvent event) { +// if (currentPageTextField.isValid() +// && currentPageTextField.getValue() != null) { +// int page = Integer.valueOf(String +// .valueOf(currentPageTextField.getValue())); +// table.setCurrentPage(page); +// } +// } +// }); btnFirst.addClickListener(new Button.ClickListener() { private static final long serialVersionUID = -355520120491283992L; @@ -128,7 +133,7 @@ public void buttonClick(Button.ClickEvent event) { pageManagement.addComponent(btnFirst); pageManagement.addComponent(btnPrevious); //pageManagement.addComponent(pageLabel); - pageManagement.addComponent(currentPageTextField); + pageManagement.addComponent(currentPageLabel); pageManagement.addComponent(separatorLabel); pageManagement.addComponent(totalPagesLabel); pageManagement.addComponent(btnNext); @@ -136,7 +141,7 @@ public void buttonClick(Button.ClickEvent event) { pageManagement.setComponentAlignment(btnFirst, Alignment.MIDDLE_LEFT); pageManagement.setComponentAlignment(btnPrevious, Alignment.MIDDLE_LEFT); //pageManagement.setComponentAlignment(pageLabel, Alignment.MIDDLE_LEFT); - pageManagement.setComponentAlignment(currentPageTextField, Alignment.MIDDLE_LEFT); + pageManagement.setComponentAlignment(currentPageLabel, Alignment.MIDDLE_LEFT); pageManagement.setComponentAlignment(separatorLabel, Alignment.MIDDLE_LEFT); pageManagement.setComponentAlignment(totalPagesLabel, Alignment.MIDDLE_LEFT); pageManagement.setComponentAlignment(btnNext, Alignment.MIDDLE_LEFT); @@ -159,12 +164,9 @@ public void pageChanged(PagedTable.PagedTableChangeEvent event) { int pageLength = table.getPageLength(); btnNext.setEnabled(startIndex < containerDataSource.getRealSize() - pageLength); btnLast.setEnabled(startIndex < containerDataSource.getRealSize() - pageLength); - int currentPage = table.getCurrentPage(); - currentPageTextField.setValue(String.valueOf(currentPage)); - int totalAmountOfPages = table.getTotalAmountOfPages(); - totalPagesLabel.setValue(String.valueOf(totalAmountOfPages)); + currentPageLabel.setValue(String.valueOf(table.getCurrentPage())); + totalPagesLabel.setValue(String.valueOf(table.getTotalAmountOfPages())); itemsPerPageSelect.setValue(String.valueOf(pageLength)); - validator.setMaxValue(totalAmountOfPages); } }); } @@ -196,8 +198,4 @@ public Button getBtnNext() { public Button getBtnLast() { return btnLast; } - - public TextField getCurrentPageTextField() { - return currentPageTextField; - } }