Skip to content

Commit 6414624

Browse files
fix rsync bugs =unstable=
1 parent 2b00dda commit 6414624

2 files changed

Lines changed: 33 additions & 36 deletions

File tree

src/main/java/su/interference/core/SyncFrame.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ public class SyncFrame implements Comparable, Serializable, AllowRPredicate {
4646
private final long allocId;
4747
private final int fileType;
4848
private final int frameType;
49-
private final int objectId;
5049
private final String className;
5150
private long prevId;
5251
private long nextId;
@@ -77,7 +76,7 @@ public SyncFrame(Frame frame, Session s, FreeFrame fb) throws Exception {
7776
}
7877

7978
allowR = frame.isLocal() ? !t.isNoTran() || t.getName().equals("su.interference.persistent.UndoChunk") : false;
80-
className = t.getName();
79+
className = bd == null ? null : t.getName();
8180
rtran = frame.getLiveTransactions();
8281
tframes = frame.getLiveTransFrames();
8382
if (frame.getClass().getName().equals("su.interference.core.DataFrame")) {
@@ -126,7 +125,6 @@ public SyncFrame(Frame frame, Session s, FreeFrame fb) throws Exception {
126125
b = frame.getFrame();
127126
frameId = frame.getPtr();
128127
this.allocId = frame.getAllocFile()+ frame.getAllocPointer();
129-
objectId = bd == null ? 0 : frame.getObjectId();
130128
}
131129

132130
public Frame getRFrame() {
@@ -157,10 +155,6 @@ public int getFrameType() {
157155
return frameType;
158156
}
159157

160-
public int getObjectId() {
161-
return objectId;
162-
}
163-
164158
public int getFile() {
165159
return (int)frameId%4096;
166160
}

src/main/java/su/interference/transport/SyncFrameEvent.java

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -75,33 +75,32 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
7575
if (b.isAllowR()) {
7676
updateTransactions(b.getRtran(), s);
7777
FrameData bd = Instance.getInstance().getFrameByAllocId(b.getAllocId());
78-
if (bd == null) {
79-
final Table t = Instance.getInstance().getTableByName(b.getClassName());
80-
final int allocFileId = (int) b.getAllocId() % 4096;
81-
final int allocOrder = (allocFileId % Storage.MAX_NODES) % Config.getConfig().FILES_AMOUNT;
82-
ArrayList<DataFile> dfs = Instance.getInstance().getDataFilesByType(b.getFileType());
83-
for (DataFile f : dfs) {
84-
final int order = (f.getFileId() % Storage.MAX_NODES) % Config.getConfig().FILES_AMOUNT;
85-
if (order == allocOrder) {
86-
bd = t.createNewFrame(null, f.getFileId(), b.getFrameType(), b.getAllocId(), false, false, true, s, llt);
87-
bd.setFrame(null);
88-
b.setDf(f);
89-
}
90-
}
91-
logger.info("create replicated frame with allocId "+b.getAllocId()+" ptr "+bd.getPtr());
78+
final Table t = Instance.getInstance().getTableByName(b.getClassName());
79+
if (t == null) {
80+
final FreeFrame fb = new FreeFrame(0, bd.getFrameId(), bd.getSize());
81+
s.persist(fb, llt);
82+
b.setDf(Instance.getInstance().getDataFileById(bd.getFile()));
83+
s.delete(bd);
9284
} else {
93-
if (b.getObjectId() == bd.getObjectId()) {
94-
b.setDf(Instance.getInstance().getDataFileById(bd.getFile()));
95-
logger.info("rframe bd found with allocId=" + b.getAllocId());
85+
if (bd == null) {
86+
final int allocFileId = (int) b.getAllocId() % 4096;
87+
final int allocOrder = (allocFileId % Storage.MAX_NODES) % Config.getConfig().FILES_AMOUNT;
88+
ArrayList<DataFile> dfs = Instance.getInstance().getDataFilesByType(b.getFileType());
89+
for (DataFile f : dfs) {
90+
final int order = (f.getFileId() % Storage.MAX_NODES) % Config.getConfig().FILES_AMOUNT;
91+
if (order == allocOrder) {
92+
bd = t.createNewFrame(null, f.getFileId(), b.getFrameType(), b.getAllocId(), false, false, true, s, llt);
93+
bd.setFrame(null);
94+
b.setDf(f);
95+
}
96+
}
97+
logger.info("create replicated frame with allocId " + b.getAllocId() + " ptr " + bd.getFrameId());
9698
} else {
97-
if (b.getObjectId() == 0) {
98-
final FreeFrame fb = new FreeFrame(0, bd.getFrameId(), bd.getSize());
99-
s.persist(fb, llt);
99+
if (t.getObjectId() == bd.getObjectId()) {
100100
b.setDf(Instance.getInstance().getDataFileById(bd.getFile()));
101-
s.delete(bd);
101+
logger.info("rframe bd found with allocId=" + b.getAllocId());
102102
} else {
103-
Table t_ = Instance.getInstance().getTableById(b.getObjectId());
104-
FrameData bd_ = new FrameData(bd, t_);
103+
final FrameData bd_ = new FrameData(bd, t);
105104
s.delete(bd);
106105
s.persist(bd_);
107106
b.setDf(Instance.getInstance().getDataFileById(bd_.getFile()));
@@ -140,7 +139,7 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
140139
frame.setRes07(nextB);
141140
b.getDf().writeFrame(b.getBd(), b.getBd().getPtr(), frame.getFrame(), llt, s);
142141
b.getBd().setFrame(null);
143-
logger.info("write undo frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getPtr());
142+
logger.info("write undo frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getFrameId());
144143
}
145144
}
146145
}
@@ -171,7 +170,7 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
171170
frame.setRes07(nextB);
172171
b.getDf().writeFrame(b.getBd(), b.getBd().getPtr(), frame.getFrame(), llt, s);
173172
b.getBd().setFrame(frame);
174-
logger.info("write data frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getPtr());
173+
logger.info("write data frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getFrameId());
175174
}
176175
}
177176
}
@@ -208,7 +207,7 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
208207
storemap.put(t.getObjectId(), new ArrayList<>());
209208
}
210209
storemap.get(t.getObjectId()).add(b);
211-
logger.info("write index frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getPtr());
210+
logger.info("write index frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getFrameId());
212211
}
213212
}
214213
}
@@ -220,17 +219,21 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
220219

221220
for (Map.Entry<Integer, List<SyncFrame>> entry : storemap.entrySet()) {
222221
final Table t = Instance.getInstance().getTableById(entry.getKey());
222+
if (this.getCallbackNodeId() == 0) {
223+
throw new RuntimeException("wrong callback node id");
224+
}
223225
t.storeFrames(entry.getValue(), this.getCallbackNodeId(), llt, s);
224226
}
225227

226228
llt.commit();
227229

228230
final Map<Integer, List<FrameApi>> frames_ = new HashMap<>();
229231
for (SyncFrame f : sb) {
230-
if (frames_.get(f.getObjectId()) == null) {
231-
frames_.put(f.getObjectId(), new ArrayList<>());
232+
final Table t = Instance.getInstance().getTableByName(f.getClassName());
233+
if (frames_.get(t.getObjectId()) == null) {
234+
frames_.put(t.getObjectId(), new ArrayList<>());
232235
}
233-
frames_.get(f.getObjectId()).add(f.getBd());
236+
frames_.get(t.getObjectId()).add(f.getBd());
234237
}
235238
for (Map.Entry<Integer, List<FrameApi>> entry: frames_.entrySet()) {
236239
SQLCursor.addStreamFrame(new ContainerFrame(entry.getKey(), entry.getValue()));

0 commit comments

Comments
 (0)