Skip to content

Commit e1c52db

Browse files
Merge pull request #34 from yuriy-glotanov/2020.1beta
fix small bugs and optimize algorithms
2 parents 98ef746 + a254096 commit e1c52db

14 files changed

Lines changed: 173 additions & 215 deletions

src/main/java/su/interference/core/DataChunk.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public class DataChunk implements Chunk {
5454
private final static Logger logger = LoggerFactory.getLogger(DataChunk.class);
5555
private final static int INIT_STATE = 1;
5656
private final static int NORMAL_STATE = 2;
57-
private DataObject t;
57+
private Table t;
5858
private Class class_;
5959
private volatile int state;
6060
private volatile RowHeader header;
@@ -171,7 +171,7 @@ public void setUndoChunk(UndoChunk uc) {
171171
this.uc = uc;
172172
}
173173

174-
public DataObject getT() {
174+
public Table getT() {
175175
return t;
176176
}
177177

@@ -336,7 +336,7 @@ public DataChunk (Object o, Session s, RowId r) throws IOException, InvocationTa
336336
this.header = new RowHeader(r, null, getChunk().length, false);
337337
}
338338

339-
public DataChunk (byte[] b, int file, long frame, int hsize, DataObject t, Class c) throws ClassNotFoundException, InstantiationException, IllegalAccessException, InternalException, MalformedURLException {
339+
public DataChunk (byte[] b, int file, long frame, int hsize, Table t, Class c) throws ClassNotFoundException, InstantiationException, IllegalAccessException, InternalException, MalformedURLException {
340340
final ByteString bs = new ByteString(b);
341341
this.t = t;
342342
this.class_= c;
@@ -347,7 +347,7 @@ public DataChunk (byte[] b, int file, long frame, int hsize, DataObject t, Class
347347
}
348348

349349
//constructor for clone method - de-serialize chunk only without header
350-
public DataChunk (byte[] b, DataObject t, RowHeader h, DataChunk source) throws ClassNotFoundException, InstantiationException, IllegalAccessException, InternalException, MalformedURLException {
350+
public DataChunk (byte[] b, Table t, RowHeader h, DataChunk source) throws ClassNotFoundException, InstantiationException, IllegalAccessException, InternalException, MalformedURLException {
351351
this.chunk = b;
352352
this.header = h;
353353
this.state = INIT_STATE;

src/main/java/su/interference/core/DataFrame.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,11 @@ public DataFrame(int file, long pointer, int size, Table t) throws InternalExcep
4545
super (file, pointer, size, t);
4646
}
4747

48-
public DataFrame(FrameData bd, DataObject t) throws InternalException {
48+
public DataFrame(FrameData bd, Table t) throws InternalException {
4949
super (bd, t);
5050
}
5151

52-
public DataFrame(int file, long pointer, int size, FrameData bd, DataObject t, Class c) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException, InternalException {
52+
public DataFrame(int file, long pointer, int size, FrameData bd, Table t, Class c) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException, InternalException {
5353
super(null, file, pointer, size, bd, t, c);
5454

5555
int ptr = FRAME_HEADER_SIZE;
@@ -71,7 +71,7 @@ public DataFrame(int file, long pointer, int size, FrameData bd, DataObject t, C
7171
}
7272

7373
//constructor for replication service - data frames
74-
public DataFrame(byte[] b, int file, long pointer, Map<Long, List<Chunk>> umap, DataObject t) throws Exception {
74+
public DataFrame(byte[] b, int file, long pointer, Map<Long, List<Chunk>> umap, Table t) throws Exception {
7575
super(b, file, pointer, t);
7676

7777
int ptr = FRAME_HEADER_SIZE;
@@ -102,7 +102,7 @@ public DataFrame(byte[] b, int file, long pointer, Map<Long, List<Chunk>> umap,
102102
}
103103

104104
// constructor for replication service - undo frames
105-
public DataFrame(byte[] b, int file, long pointer, HashMap<Long, Long> imap, HashMap<Long, Long> hmap, Map<Long, List<Chunk>> umap, DataObject t, Session s) throws Exception {
105+
public DataFrame(byte[] b, int file, long pointer, HashMap<Long, Long> imap, HashMap<Long, Long> hmap, Map<Long, List<Chunk>> umap, Table t, Session s) throws Exception {
106106
super(b, file, pointer, t);
107107

108108
if (!t.getName().equals("su.interference.persistent.UndoChunk")) {

src/main/java/su/interference/core/DataObject.java

Lines changed: 0 additions & 78 deletions
This file was deleted.

src/main/java/su/interference/core/Frame.java

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public class Frame implements Comparable {
8181
protected byte[] b;
8282

8383
private FrameData frameData;
84-
private DataObject dataObject;
84+
private Table dataObject;
8585
private Class entityClass;
8686
private static final Logger logger = LoggerFactory.getLogger(Frame.class);
8787

@@ -100,15 +100,15 @@ public void setFrameData(FrameData frameData) {
100100
this.frameData = frameData;
101101
}
102102

103-
public DataObject getDataObject() {
103+
public Table getDataObject() {
104104
return dataObject;
105105
}
106106

107107
public Class getEntityClass() {
108108
return entityClass;
109109
}
110110

111-
public Frame(int file, long pointer, int size, DataObject t) throws InternalException {
111+
public Frame(int file, long pointer, int size, Table t) throws InternalException {
112112
this.file = file;
113113
this.pointer = pointer;
114114
this.allocFile = 0;
@@ -124,7 +124,7 @@ public Frame(int file, long pointer, int size, DataObject t) throws InternalExce
124124
}
125125
}
126126

127-
public Frame(FrameData bd, DataObject t) throws InternalException {
127+
public Frame(FrameData bd, Table t) throws InternalException {
128128
this.file = bd.getFile();
129129
this.pointer = bd.getPtr();
130130
this.allocFile = (int)bd.getAllocId()%4096;
@@ -187,7 +187,7 @@ public Frame(byte[] b) throws InternalException {
187187
}
188188

189189
//constructor for replication service
190-
public Frame(byte[] b, int file, long pointer, DataObject t) throws InternalException {
190+
public Frame(byte[] b, int file, long pointer, Table t) throws InternalException {
191191
this.dataObject = t;
192192
if (b.length<FRAME_HEADER_SIZE) {
193193
throw new InvalidFrameHeader();
@@ -232,7 +232,7 @@ public Frame(byte[] b, int file, long pointer, DataObject t) throws InternalExce
232232
}
233233
}
234234

235-
public Frame(byte[] bb, int file, long pointer, int size, FrameData bd, DataObject t, Class c) throws IOException, InternalException, ClassNotFoundException, IllegalAccessException, InstantiationException {
235+
public Frame(byte[] bb, int file, long pointer, int size, FrameData bd, Table t, Class c) throws IOException, InternalException, ClassNotFoundException, IllegalAccessException, InstantiationException {
236236
entityClass = c;
237237
dataObject = t;
238238
if (bd!=null) { //system frame not used dataobject
@@ -484,21 +484,14 @@ public synchronized void removeChunk (int ptr, LLT llt, boolean ignore) {
484484
throw new NullPointerException();
485485
}
486486
} else {
487-
if (chunk == null) {
488-
System.out.println("ops");
489-
}
490487
if (chunk.getHeader().getLltId() < sync) {
491488
final ByteString sc = new ByteString();
492489
sc.append(chunk.getHeader().getHeader());
493490
sc.append(chunk.getChunk());
494-
if (!Config.getConfig().SYNC_LOCK_ENABLE) {
495-
snap.put(llt.getId(), sc.getBytes());
496-
}
491+
if (!Config.getConfig().SYNC_LOCK_ENABLE) { snap.put(llt.getId(), sc.getBytes()); }
497492
chunk.getHeader().setLltId(llt == null ? 0 : llt.getId());
498-
if (llt != null) {
499-
llt.add(this);
500-
}
501493
}
494+
if (llt != null) { llt.add(this); }
502495
data.removeByPtr(ptr);
503496
}
504497
}

src/main/java/su/interference/core/IndexFrame.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,12 @@ public IndexFrame(int file, long pointer, int size, int objectId, Table t) throw
5252
super(file, pointer, size, t);
5353
}
5454

55-
public IndexFrame(FrameData bd, int frameType, DataObject t) throws InternalException {
55+
public IndexFrame(FrameData bd, int frameType, Table t) throws InternalException {
5656
super(bd, t);
5757
this.setType(frameType);
5858
}
5959

60-
public IndexFrame(int file, long pointer, int size, FrameData bd, DataObject t, Class c) throws IOException, InvalidFrameHeader, InvalidFrame, EmptyFrameHeaderFound, ClassNotFoundException, InstantiationException, IllegalAccessException, InternalException {
60+
public IndexFrame(int file, long pointer, int size, FrameData bd, Table t, Class c) throws IOException, InvalidFrameHeader, InvalidFrame, EmptyFrameHeaderFound, ClassNotFoundException, InstantiationException, IllegalAccessException, InternalException {
6161
super(null, file, pointer, size, bd, t, c);
6262
int ptr = FRAME_HEADER_SIZE;
6363
final ByteString bs = new ByteString(this.b);
@@ -84,7 +84,7 @@ public IndexFrame(int file, long pointer, int size, FrameData bd, DataObject t,
8484
}
8585

8686
//constructor for replication service
87-
public IndexFrame(byte[] b, int file, long pointer, HashMap<Long, Long> imap, HashMap<Long, Long> hmap, DataObject t) throws IOException, InvalidFrameHeader, InvalidFrame, EmptyFrameHeaderFound, ClassNotFoundException, InstantiationException, IllegalAccessException, InternalException {
87+
public IndexFrame(byte[] b, int file, long pointer, HashMap<Long, Long> imap, HashMap<Long, Long> hmap, Table t) throws IOException, InvalidFrameHeader, InvalidFrame, EmptyFrameHeaderFound, ClassNotFoundException, InstantiationException, IllegalAccessException, InternalException {
8888
super(b, file, pointer, t);
8989
int ptr = FRAME_HEADER_SIZE;
9090
final ByteString bs = new ByteString(this.b);
@@ -123,7 +123,7 @@ public IndexFrame add (DataChunk e, Table t, Session s, LLT llt) throws Exceptio
123123

124124
if (this.isFill(e)) {
125125

126-
final int nfileId = t.getIndexFileId();
126+
final int nfileId = t.getIndexFileId(this.getFrameData());
127127
res = t.createNewFrame(this.getFrameData(), nfileId, this.getType(), 0, false, false, false, s, llt).getIndexFrame();
128128
res.setParentF(this.getParentF());
129129
res.setParentB(this.getParentB());

src/main/java/su/interference/core/Instance.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public class Instance implements Interference {
6262
private static final int MAX_NODE_ID = 32;
6363
public static final int SESSION_EXPIRE = 7200000; //in ms
6464

65-
public static final int SYSTEM_VERSION = 20200524;
65+
public static final int SYSTEM_VERSION = 20200531;
6666

6767
public static final int SYSTEM_STATE_ONLINE = 1;
6868
public static final int SYSTEM_STATE_UP = 2;

src/main/java/su/interference/core/Storage.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ public DataFile[] getInitDataFiles() throws ClassNotFoundException, InternalExce
274274
return res.toArray(new DataFile[]{});
275275
}
276276

277-
public DataFile[] getInitTempFiles() throws ClassNotFoundException, InternalException, InstantiationException, IllegalAccessException {
277+
public DataFile[] getInitTempFiles() {
278278
final List<DataFile> res = new ArrayList<DataFile>();
279279
if (state == STORAGE_STATE_OPEN) {
280280
for (Map.Entry e : dfs.entrySet()) {
@@ -312,6 +312,16 @@ public DataFile[] getIndexFiles () {
312312
return res.toArray(new DataFile[]{});
313313
}
314314

315+
public DataFile[] getTempFiles () {
316+
final List<DataFile> res = new ArrayList<DataFile>();
317+
for (Map.Entry e : ifs.entrySet()) {
318+
if (((DataFile)e.getValue()).getType()==TEMPFILE_TYPEID) {
319+
res.add((DataFile)e.getValue());
320+
}
321+
}
322+
return res.toArray(new DataFile[]{});
323+
}
324+
315325
public void createSystemDataFiles (Session s, LLT llt) throws NoSuchMethodException, InternalException, IOException, InstantiationException, IllegalAccessException, InvocationTargetException, ClassNotFoundException {
316326
for (Map.Entry e : ifs.entrySet()) {
317327
if (((DataFile)e.getValue()).checkFile()==0) {

src/main/java/su/interference/core/SystemCleanUp.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import org.slf4j.LoggerFactory;
55
import su.interference.metrics.Metrics;
66
import su.interference.persistent.FrameData;
7+
import su.interference.persistent.Table;
78

89
import java.util.Map;
910
import java.util.concurrent.CountDownLatch;
@@ -12,6 +13,10 @@ public class SystemCleanUp implements Runnable, ManagedProcess {
1213
private volatile boolean f = true;
1314
CountDownLatch latch;
1415
private final static Logger logger = LoggerFactory.getLogger(SystemCleanUp.class);
16+
private static final int CLEANUP_TIMEOUT = 3000;
17+
public static final int DATA_RETRIEVED_PRIORITY = 600;
18+
public static final int INDEX_RETRIEVED_PRIORITY = 900;
19+
private static final int CLEANUP_PROTECTION_THR = 1000;
1520

1621
public void run () {
1722
while (f) {
@@ -24,7 +29,7 @@ public void run () {
2429

2530
try {
2631
//final int period = Config.getConfig().SYNC_PERIOD;
27-
Thread.sleep(3000);
32+
Thread.sleep(CLEANUP_TIMEOUT);
2833
} catch (InterruptedException e) {
2934
e.printStackTrace();
3035
}
@@ -40,17 +45,37 @@ public void stop() throws InterruptedException{
4045
}
4146

4247
private void cleanUpFrames() {
48+
4349
Metrics.get("systemCleanUp").start();
4450
int i = 0;
4551
for (Object entry : Instance.getInstance().getFramesMap().entrySet()) {
4652
final FrameData f = (FrameData) ((DataChunk) ((Map.Entry) entry).getValue()).getEntity();
47-
f.decreasePriority();
48-
if (f.isSynced() && f.getObjectId() > 999 && f.getPriority() <= 0) {
49-
f.clearFrame();
53+
final long frameAmount = f.getDataObject().getFrameAmount();
54+
if (f.getDataFile().isData() || f.getDataFile().isIndex()) {
55+
f.decreasePriority();
56+
if (f.isSynced() && f.getObjectId() > 999 && f.getPriority() == 0 && frameAmount > CLEANUP_PROTECTION_THR) {
57+
if (f.clearFrame()) {
58+
i++;
59+
}
60+
}
61+
}
62+
if (f.getDataFile().isTemp()) {
63+
if (f.isSynced() && f.getObjectId() > 999 && frameAmount > CLEANUP_PROTECTION_THR) {
64+
if (f.clearFrame()) {
65+
i++;
66+
}
67+
}
68+
}
69+
if (f.getDataFile().isUndo()) {
70+
if (f.isSynced() && f.getObjectId() > 999) {
71+
if (f.clearFrame()) {
72+
i++;
73+
}
74+
}
5075
}
51-
i++;
5276
}
5377
Metrics.get("сleanUpBlocks").put(i);
5478
Metrics.get("systemCleanUp").stop();
79+
5580
}
5681
}

0 commit comments

Comments
 (0)