Skip to content

Commit 2b00dda

Browse files
fix rsync bugs =unstable=
1 parent 209770b commit 2b00dda

6 files changed

Lines changed: 75 additions & 100 deletions

File tree

src/main/java/su/interference/core/DataChunk.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -527,9 +527,11 @@ public Object getEntity () {
527527
final DataChunk dc = (DataChunk) Instance.getInstance().getChunkByPointer(this.getHeader().getFramePtr(), this.getHeader().getFramePtrRowId().getRowPointer());
528528
((IndexChunk)o).setDataChunk(dc);
529529
if (dc == null) {
530-
final long allocId = Instance.getInstance().getFrameById(this.header.getRowID().getFileId()+this.header.getRowID().getFramePointer()).getAllocId();
531-
final long allocId2 = Instance.getInstance().getFrameById(this.getHeader().getFramePtr()).getAllocId();
532-
logger.error("null datachunk found for indexframe allocId = " + allocId + " indexptr allocId = " + allocId2);
530+
// todo during rframe.IndexFrame.init system directory not yet contains replicated FrameData objects
531+
// final long allocId = Instance.getInstance().getFrameById(this.header.getRowID().getFileId()+this.header.getRowID().getFramePointer()).getAllocId();
532+
// final long allocId2 = Instance.getInstance().getFrameById(this.getHeader().getFramePtr()).getAllocId();
533+
// logger.error("null datachunk found for indexframe allocId = " + allocId + " indexptr allocId = " + allocId2);
534+
logger.error("null datachunk found");
533535
}
534536
}
535537
if (!t.isNoTran()) {

src/main/java/su/interference/core/SyncFrame.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public class SyncFrame implements Comparable, Serializable, AllowRPredicate {
5656
private final boolean started;
5757
private FrameData bd;
5858
private DataFile df;
59+
private Frame rFrame;
5960
private final HashMap<Long, Long> imap;
6061
private final HashMap<Long, Transaction> rtran;
6162
private final ArrayList<TransFrame> tframes;
@@ -128,6 +129,14 @@ public SyncFrame(Frame frame, Session s, FreeFrame fb) throws Exception {
128129
objectId = bd == null ? 0 : frame.getObjectId();
129130
}
130131

132+
public Frame getRFrame() {
133+
return rFrame;
134+
}
135+
136+
public void setRFrame(Frame rFrame) {
137+
this.rFrame = rFrame;
138+
}
139+
131140
public byte[] getBytes() {
132141
return b;
133142
}

src/main/java/su/interference/persistent/FrameData.java

Lines changed: 21 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,7 @@ public class FrameData implements Serializable, Comparable, FrameApi, FilePartit
9696
@Column
9797
private AtomicInteger current;
9898
@Column
99-
@IndexColumn
10099
private volatile int started;
101-
@Column
102-
@IndexColumn
103-
private AtomicInteger allocated;
104100
@Id
105101
@MapColumn
106102
@Transient
@@ -156,19 +152,6 @@ public void clearCurrent() {
156152
}
157153
}
158154

159-
public void allocate() {
160-
if (this.allocated==null) {
161-
this.allocated = new AtomicInteger(0);
162-
}
163-
this.allocated.compareAndSet(0, 1);
164-
}
165-
166-
public void deallocate() {
167-
if (this.allocated!=null) {
168-
this.allocated.compareAndSet(1, 0);
169-
}
170-
}
171-
172155
public DataObject getDataObject() {
173156
if (dataObject==null) {
174157
dataObject = Instance.getInstance().getTableById(this.objectId);
@@ -227,10 +210,6 @@ public void setFrame(Frame b) {
227210
this.frame = b;
228211
}
229212

230-
public FrameData() {
231-
this.allocated = new AtomicInteger();
232-
}
233-
234213
public long getFrameId() {
235214
return this.ptr+this.file;
236215
}
@@ -322,14 +301,34 @@ public int hasRemoteTransactions() throws InternalException {
322301
return nodeId;
323302
}
324303

304+
public FrameData() {
305+
306+
}
307+
325308
public FrameData(int file, long ptr, int size, DataObject tt) {
326309
this.dataObject = tt;
327310
this.objectId = tt.getObjectId();
328311
this.allocId = file+ptr;
329312
this.file = file;
330313
this.ptr = ptr;
331314
this.size = size;
332-
this.allocated = new AtomicInteger();
315+
}
316+
317+
public FrameData(FrameData bd, DataObject tt) {
318+
this.dataObject = tt;
319+
this.objectId = tt.getObjectId();
320+
this.allocId = bd.getFile()+bd.getPtr();
321+
this.file = bd.getFile();
322+
this.ptr = bd.getPtr();
323+
this.size = bd.getSize();
324+
this.allocId = bd.getAllocId();
325+
this.started = bd.getStarted();
326+
this.current = bd.getCurrent();
327+
this.used = bd.getUsed();
328+
this.prevFrame = bd.getPrevFrame();
329+
this.prevFile = bd.getPrevFile();
330+
this.nextFrame = bd.getNextFrame();
331+
this.nextFile = bd.getNextFile();
333332
}
334333

335334
public int compareTo(Object obj) {
@@ -371,10 +370,6 @@ public int getObjectId() {
371370
return objectId;
372371
}
373372

374-
public void setObjectId(int objectId) {
375-
this.objectId = objectId;
376-
}
377-
378373
public int getFile() {
379374
return file;
380375
}
@@ -490,12 +485,4 @@ public int getStarted() {
490485
public void setStarted(int started) {
491486
this.started = started;
492487
}
493-
494-
public AtomicInteger getAllocated() {
495-
return allocated;
496-
}
497-
498-
public void setAllocated(AtomicInteger allocated) {
499-
this.allocated = allocated;
500-
}
501488
}

src/main/java/su/interference/persistent/Source.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ this software and associated documentation files (the "Software"), to deal in
3434
*/
3535

3636
@Entity
37-
@SystemEntity
3837
public class Source {
3938
@Column
4039
@Id

src/main/java/su/interference/persistent/Table.java

Lines changed: 30 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,7 @@ public Table (DataChunk chunk, IndexList ixl) throws IllegalAccessException, Cla
656656
this.genericClass = Instance.getUCL().loadClass(name);
657657
final SystemEntity sa = (SystemEntity)this.genericClass.getAnnotation(SystemEntity.class);
658658
final IndexEntity xa = (IndexEntity)this.genericClass.getAnnotation(IndexEntity.class);
659+
final ResultSetEntity rsa = (ResultSetEntity)this.genericClass.getAnnotation(ResultSetEntity.class);
659660
this.notran = sa!=null;
660661
this.index = xa!=null;
661662
this.idfield = getTableIdField();
@@ -681,39 +682,32 @@ public Table (DataChunk chunk, IndexList ixl) throws IllegalAccessException, Cla
681682
}
682683
getMapFieldByColumn("frameId").setMap(ixlb);
683684
getMapFieldByColumn("allocId").setMap(ixla);
684-
getIndexFieldByColumn("started").setIndex(ixls);
685685
} else if (this.name.equals("su.interference.persistent.UndoChunk")) {
686686
//none
687687
} else {
688-
Class cc = this.getTableClass();
689688
try {
690-
java.lang.reflect.Field fc = null;
691-
Constructor ccc = null;
692-
try {
693-
fc = cc.getField("CLASS_ID");
694-
} catch (NoSuchFieldException e) { logger.info("NoSuchFieldException: CLASS_ID during Table construct");; }
695-
try {
696-
ccc = cc.getConstructor(DataChunk.class);
697-
} catch (NoSuchMethodException e) { logger.info("NoSuchMethodException: <init>(DataChunk) during Table:"+cc.getName()+" construct"); }
698-
if (fc != null && ccc != null) {
699-
final List<Object> bds = ixl.getObjectsByKey(fc.getInt(this));
700-
for (Object b : bds) {
701-
final FrameData bd = (FrameData)((DataChunk)b).getEntity();
702-
bd.setDataObject(this); //todo must be refactored, FrameData->new DataFrame->new DataChunk->t.getFields() possibly may be simply
703-
DataFrame db = null;
689+
final List<Object> bds = ixl.getObjectsByKey(this.getObjectId());
690+
for (Object b : bds) {
691+
final FrameData bd = (FrameData)((DataChunk)b).getEntity();
692+
bd.setDataObject(this); //todo must be refactored, FrameData->new DataFrame->new DataChunk->t.getFields() possibly may be simply
693+
694+
if (sa != null && rsa == null && this.objectId != Table.CLASS_ID) {
695+
Frame db = null;
704696
try {
705-
db = bd.getDataFrame();
697+
db = bd.getFrame();
706698
} catch (Exception e) {
699+
logger.error("internal Table.<init>");
707700
}
708-
for (Chunk ck : db.getChunks()) {
709-
if (ck.getHeader().getState()==Header.RECORD_NORMAL_STATE) { //miss deleted or archived records
710-
//Object to;
711-
//to = ccc.newInstance((DataChunk)ck);
712-
//this.addIndexValue(to);
713-
this.addIndexValue((DataChunk)ck);
701+
for (Chunk ck : ((DataFrame)db).getChunks()) {
702+
if (ck.getHeader().getState() == Header.RECORD_NORMAL_STATE) { //miss deleted or archived records
703+
this.addIndexValue((DataChunk) ck);
714704
}
715705
}
716706
}
707+
//set start frames for remote indexes
708+
if (bd.getStarted() > 0) {
709+
ixstartfs.put(bd.getStarted(), bd.getFrameId());
710+
}
717711
}
718712
} catch (Exception e) {
719713
e.printStackTrace();
@@ -1355,12 +1349,8 @@ public Boolean call() throws Exception {
13551349
synchronized (t) {
13561350
List<Long> startframes = new ArrayList<>();
13571351
startframes.add(t.fileStart + t.frameStart);
1358-
List<FrameData> bb = Instance.getInstance().getTableById(getObjectId()).getFrames();
1359-
1360-
for (FrameData b : bb) {
1361-
if (b.getStarted() > 0) {
1362-
startframes.add(b.getFrameId());
1363-
}
1352+
for (Map.Entry<Integer, Long> entry : ixstartfs.entrySet()) {
1353+
startframes.add(entry.getValue());
13641354
}
13651355

13661356
//todo need to implement merge algorithm for multinode indexes
@@ -1712,6 +1702,7 @@ public synchronized void remove (ValueSet key, Object o, Session s, LLT llt) thr
17121702
public synchronized void storeFrames(List<SyncFrame> frames, int sourceNodeId, LLT llt, Session s) throws Exception {
17131703
for (SyncFrame b : frames) {
17141704
b.getBd().setStarted(0);
1705+
b.getBd().setFrame(b.getRFrame());
17151706
if (b.isStarted()) {
17161707
ixstartfs.put(sourceNodeId, b.getBd().getFrameId());
17171708
b.getBd().setStarted(sourceNodeId);
@@ -1724,12 +1715,8 @@ public synchronized void storeFrames(List<SyncFrame> frames, int sourceNodeId, L
17241715
public synchronized List<Chunk> getContent(Session s) throws IOException, InternalException, NoSuchMethodException, InvocationTargetException, EmptyFrameHeaderFound, ClassNotFoundException, InstantiationException, IllegalAccessException {
17251716
ArrayList<Chunk> res = new ArrayList<Chunk>();
17261717
res.addAll(getLocalContent(this.fileStart+this.frameStart, s));
1727-
//todo need performance optimizing
1728-
List<FrameData> bb = Instance.getInstance().getTableById(this.getObjectId()).getFrames();
1729-
for (FrameData b : bb) {
1730-
if (b.getStarted() > 0) {
1731-
res.addAll(getLocalContent(b.getFrameId(), s));
1732-
}
1718+
for (Map.Entry<Integer, Long> entry : ixstartfs.entrySet()) {
1719+
res.addAll(getLocalContent(entry.getValue(), s));
17331720
}
17341721
return res;
17351722
}
@@ -1782,12 +1769,8 @@ private synchronized List<Chunk> getLocalContent(long start, Session s) throws I
17821769
private synchronized ArrayList<FrameData> getLeafFrames (Session s) throws IOException, InternalException, NoSuchMethodException, InvocationTargetException, EmptyFrameHeaderFound, ClassNotFoundException, InstantiationException, IllegalAccessException {
17831770
ArrayList<FrameData> res = new ArrayList<FrameData>();
17841771
res.addAll(getLocalLeafFrames(this.fileStart+this.frameStart, s));
1785-
//todo need performance optimizing
1786-
List<FrameData> bb = Instance.getInstance().getTableById(this.getObjectId()).getFrames();
1787-
for (FrameData b : bb) {
1788-
if (b.getStarted() > 0) {
1789-
res.addAll(getLocalLeafFrames(b.getFrameId(), s));
1790-
}
1772+
for (Map.Entry<Integer, Long> entry : ixstartfs.entrySet()) {
1773+
res.addAll(getLocalLeafFrames(entry.getValue(), s));
17911774
}
17921775
return res;
17931776
}
@@ -1878,14 +1861,10 @@ public synchronized DataChunk getObjectByKey (ValueSet key) throws IOException,
18781861
if (dc != null) {
18791862
return dc;
18801863
} else {
1881-
//todo need performance optimizing
1882-
final List<FrameData> bb = Instance.getInstance().getTableById(this.getObjectId()).getFrames();
1883-
for (FrameData b : bb) {
1884-
if (b.getStarted() > 0) {
1885-
final DataChunk dc_ = getLocalObjectByKey(b.getFrameId(), key);
1886-
if (dc_ != null) {
1887-
return dc_;
1888-
}
1864+
for (Map.Entry<Integer, Long> entry : ixstartfs.entrySet()) {
1865+
final DataChunk dc_ = getLocalObjectByKey(entry.getValue(), key);
1866+
if (dc_ != null) {
1867+
return dc_;
18891868
}
18901869
}
18911870
}
@@ -1895,12 +1874,8 @@ public synchronized DataChunk getObjectByKey (ValueSet key) throws IOException,
18951874
public synchronized List<DataChunk> getObjectsByKey (ValueSet key) throws IOException, InternalException, NoSuchMethodException, InvocationTargetException, EmptyFrameHeaderFound, ClassNotFoundException, InstantiationException, IllegalAccessException {
18961875
final long start = this.fileStart+this.frameStart;
18971876
final List<DataChunk> r = getLocalObjectsByKey(start, key);
1898-
//todo need performance optimizing
1899-
final List<FrameData> bb = Instance.getInstance().getTableById(this.getObjectId()).getFrames();
1900-
for (FrameData b : bb) {
1901-
if (b.getStarted() > 0) {
1902-
r.addAll(getLocalObjectsByKey(b.getFrameId(), key));
1903-
}
1877+
for (Map.Entry<Integer, Long> entry : ixstartfs.entrySet()) {
1878+
r.addAll(getLocalObjectsByKey(entry.getValue(), key));
19041879
}
19051880
return r;
19061881
}

src/main/java/su/interference/transport/SyncFrameEvent.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,11 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
100100
b.setDf(Instance.getInstance().getDataFileById(bd.getFile()));
101101
s.delete(bd);
102102
} else {
103-
bd.setObjectId(b.getObjectId());
104-
b.setDf(Instance.getInstance().getDataFileById(bd.getFile()));
103+
Table t_ = Instance.getInstance().getTableById(b.getObjectId());
104+
FrameData bd_ = new FrameData(bd, t_);
105+
s.delete(bd);
106+
s.persist(bd_);
107+
b.setDf(Instance.getInstance().getDataFileById(bd_.getFile()));
105108
}
106109
}
107110
}
@@ -200,14 +203,14 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
200203
frame.setRes05(lcF);
201204
frame.setRes06(parentB);
202205
frame.setRes07(lcB);
203-
b.getBd().setFrame(frame);
206+
b.setRFrame(frame);
207+
if (storemap.get(t.getObjectId()) == null) {
208+
storemap.put(t.getObjectId(), new ArrayList<>());
209+
}
210+
storemap.get(t.getObjectId()).add(b);
204211
logger.info("write index frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getPtr());
205212
}
206213
}
207-
if (storemap.get(t.getObjectId()) == null) {
208-
storemap.put(t.getObjectId(), new ArrayList<>());
209-
}
210-
storemap.get(t.getObjectId()).add(b);
211214
}
212215
} catch (Exception e) {
213216
e.printStackTrace();

0 commit comments

Comments
 (0)