Skip to content

Commit e6d6c0d

Browse files
improve stability of internode operations
1 parent 6414624 commit e6d6c0d

7 files changed

Lines changed: 85 additions & 29 deletions

File tree

interference-2020.1.jar

751 Bytes
Binary file not shown.

interference-2020.1.jar.md5

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
c790f85b766e557a220bfbc57ba76047 *interference-2020.1.jar
1+
dda5a3ba8ed161e2bbab852457b37b5d *interference-2020.1.jar

src/main/java/su/interference/core/LLT.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ this software and associated documentation files (the "Software"), to deal in
2424

2525
package su.interference.core;
2626

27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
2730
import java.util.concurrent.atomic.AtomicLong;
2831
import java.util.concurrent.locks.ReentrantLock;
2932
import java.util.concurrent.ConcurrentHashMap;
@@ -41,32 +44,55 @@ public class LLT {
4144
private static final ReentrantLock rlck = new ReentrantLock();
4245
private static final ConcurrentHashMap<Long, LLT> pool = new ConcurrentHashMap<Long, LLT>();
4346
private static final ConcurrentHashMap<Long, Frame> frames = new ConcurrentHashMap<Long, Frame>();
47+
private final static Logger logger = LoggerFactory.getLogger(LLT.class);
4448
private final boolean lock;
4549
private final long id;
50+
private final StackTraceElement[] trace;
51+
52+
// WARNING!!!
53+
// changing the 'debug' value to true decreases overall performance
54+
// dev & QA engineers may change this constant
55+
private static final boolean debug = false;
4656

47-
private LLT(boolean lock) {
48-
id = cntr.incrementAndGet();
57+
private LLT(long id, boolean lock) {
58+
this.id = id;
4959
this.lock = lock;
60+
this.trace = debug ? Thread.currentThread().getStackTrace() : null;
5061
}
5162

5263
public static long getSyncId() {
5364
return sync.get();
5465
}
5566

5667
public static LLT getLLT() throws InterruptedException {
68+
final long id_ = Thread.currentThread().getId();
69+
if (pool.get(id_) != null) {
70+
if (debug) {
71+
for (StackTraceElement e : pool.get(id_).getTrace()) {
72+
logger.info(e.toString());
73+
}
74+
}
75+
logger.error("an unexpected attempt to get llt with id = "+id_+" which already exists");
76+
throw new RuntimeException("an unexpected attempt to get llt with id = "+id_+" which already exists");
77+
}
5778
rlck.lock();
58-
final LLT llt = new LLT(false);
79+
final LLT llt = new LLT(id_, false);
5980
pool.put(llt.getId(), llt);
6081
rlck.unlock();
6182
return llt;
6283
}
6384

6485
public static LLT getLLTAndLock() throws InterruptedException {
86+
final long id_ = Thread.currentThread().getId();
87+
if (pool.get(id_) != null) {
88+
logger.error("an unexpected attempt to get llt with id = "+id_+" which already exists");
89+
throw new RuntimeException("an unexpected attempt to get llt with id = "+id_+" which already exists");
90+
}
6591
if (Config.getConfig().SYNC_LOCK_ENABLE) {
6692
rlck.lock();
6793
}
6894
while(poolNotEmpty()) { }
69-
final LLT llt = new LLT(true);
95+
final LLT llt = new LLT(id_, true);
7096
sync.compareAndSet(0, llt.getId());
7197
pool.put(llt.getId(), llt);
7298
return llt;
@@ -103,4 +129,7 @@ public long getId() {
103129
return id;
104130
}
105131

132+
public StackTraceElement[] getTrace() {
133+
return trace;
134+
}
106135
}

src/main/java/su/interference/core/SyncFrame.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,18 +64,20 @@ public class SyncFrame implements Comparable, Serializable, AllowRPredicate {
6464
public SyncFrame(Frame frame, Session s, FreeFrame fb) throws Exception {
6565
final Table t = Instance.getInstance().getTableById(frame.getObjectId());
6666
final FrameData bd = Instance.getInstance().getFrameById(frame.getPtr());
67-
if (bd == null) {
67+
allowR = frame.isLocal() ? !t.isNoTran() || t.getName().equals("su.interference.persistent.UndoChunk") : false;
68+
69+
if (bd == null && allowR) {
6870
final FreeFrame fframe = Instance.getInstance().getFreeFrameById(frame.getPtr());
6971
if (fframe == null) {
7072
logger.error(frame.getClass().getSimpleName()+" does not match any system objects");
7173
throw new InternalException();
74+
} else {
75+
fframe.setPassed(1);
76+
fb = fframe;
7277
}
73-
fframe.setPassed(1);
74-
fb = fframe;
7578
//throw new MissingSyncFrameException();
7679
}
7780

78-
allowR = frame.isLocal() ? !t.isNoTran() || t.getName().equals("su.interference.persistent.UndoChunk") : false;
7981
className = bd == null ? null : t.getName();
8082
rtran = frame.getLiveTransactions();
8183
tframes = frame.getLiveTransFrames();

src/main/java/su/interference/persistent/Table.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,12 +1091,13 @@ protected DataChunk persist (final Object o, final Session s, final LLT extllt)
10911091
bd.release();
10921092
Metrics.get("persistInsertChunk").stop();
10931093

1094+
if (extllt == null) { llt.commit(); }
1095+
10941096
Metrics.get("persistInsertIndex").start();
1095-
this.persistIndexes(nc, s, llt);
1097+
//do not pass the external llt here, to prevent a deadlock
1098+
this.persistIndexes(nc, s, null);
10961099
Metrics.get("persistInsertIndex").stop();
10971100

1098-
if (extllt == null) { llt.commit(); }
1099-
11001101
return nc;
11011102

11021103
} else {
@@ -1608,7 +1609,7 @@ private byte[] append(byte[] b, byte[] toAdd){
16081609
/****************** persistent indexes *******************/
16091610

16101611
//rowid used in DataChunk constructor for build standalone indexes
1611-
private synchronized void add (RowId rowid, Object o, Session s, LLT llt) throws Exception {
1612+
private synchronized void add (RowId rowid, Object o, Session s, LLT extllt) throws Exception {
16121613

16131614
final DataChunk dc = new DataChunk(o, s, rowid);
16141615
final int len = dc.getBytesAmount();
@@ -1618,6 +1619,8 @@ private synchronized void add (RowId rowid, Object o, Session s, LLT llt) throws
16181619
boolean cnue = true;
16191620
FrameData target = Instance.getInstance().getFrameById(this.getFileStart()+this.getFrameStart());
16201621

1622+
final LLT llt = extllt==null?LLT.getLLT():extllt;
1623+
16211624
while (cnue) {
16221625
if (target.getIndexFrame().getType()== IndexFrame.INDEX_FRAME_LEAF) {
16231626
cnue = false;
@@ -1629,7 +1632,11 @@ private synchronized void add (RowId rowid, Object o, Session s, LLT llt) throws
16291632
target = Instance.getInstance().getFrameById(cc.getHeader().getFramePtr());
16301633
target.getIndexFrame().setMv(cc.getDcs()); //set non-persitent maxvalue
16311634
} else {
1632-
target = Instance.getInstance().getFrameById(target.getIndexFrame().getLcF()+target.getIndexFrame().getLcB()); //get by last child
1635+
final long lcId = target.getIndexFrame().getLcF()+target.getIndexFrame().getLcB();
1636+
target = Instance.getInstance().getFrameById(lcId); //get by last child
1637+
if (target == null) {
1638+
logger.info("null target returned for frame id "+lcId);
1639+
}
16331640
}
16341641
target.getIndexFrame().setParentF(parentF);
16351642
target.getIndexFrame().setParentB(parentB);
@@ -1670,6 +1677,9 @@ private synchronized void add (RowId rowid, Object o, Session s, LLT llt) throws
16701677
}
16711678
}
16721679
}
1680+
1681+
if (extllt == null) { llt.commit(); }
1682+
16731683
if (!isNoTran()) {
16741684
//todo ((EntityContainer)o).setTransId(dc.getHeader().getTran());
16751685
//todo ((EntityContainer)o).setRowId(dc.getHeader().getRowID());
@@ -1707,7 +1717,9 @@ public synchronized void storeFrames(List<SyncFrame> frames, int sourceNodeId, L
17071717
ixstartfs.put(sourceNodeId, b.getBd().getFrameId());
17081718
b.getBd().setStarted(sourceNodeId);
17091719
}
1720+
//final LLT llt_ = LLT.getLLT(); //df access reordering prevent deadlock
17101721
b.getDf().writeFrame(b.getBd(), b.getBd().getPtr(), b.getBd().getFrame().getFrame(), llt, s);
1722+
//llt_.commit();
17111723
}
17121724
}
17131725

@@ -1892,7 +1904,11 @@ private synchronized DataChunk getLocalObjectByKey (long start, ValueSet key) th
18921904
if (cc!=null) {
18931905
target = Instance.getInstance().getFrameById(cc.getHeader().getFramePtr()).getIndexFrame();
18941906
} else {
1895-
target = Instance.getInstance().getFrameById(target.getLcF()+target.getLcB()).getIndexFrame(); //get by last child
1907+
final long lcId = target.getLcF()+target.getLcB();
1908+
target = Instance.getInstance().getFrameById(lcId).getIndexFrame(); //get by last child
1909+
if (target == null) {
1910+
logger.info("null target returned for frame id "+lcId);
1911+
}
18961912
}
18971913
}
18981914
}

src/main/java/su/interference/transport/SyncFrameEvent.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
7070
Session s = Session.getDntmSession();
7171
HashMap<Long, Long> hmap = new HashMap<Long, Long>();
7272
HashMap<Long, Long> hmap2 = new HashMap<Long, Long>();
73-
LLT llt = LLT.getLLT();
73+
// LLT llt = LLT.getLLT();
74+
final LLT llt = null;
7475
for (SyncFrame b : sb) {
7576
if (b.isAllowR()) {
7677
updateTransactions(b.getRtran(), s);
@@ -89,16 +90,18 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
8990
for (DataFile f : dfs) {
9091
final int order = (f.getFileId() % Storage.MAX_NODES) % Config.getConfig().FILES_AMOUNT;
9192
if (order == allocOrder) {
92-
bd = t.createNewFrame(null, f.getFileId(), b.getFrameType(), b.getAllocId(), false, false, true, s, llt);
93+
final LLT llt_ = LLT.getLLT(); //df access reordering prevent deadlock
94+
bd = t.createNewFrame(null, f.getFileId(), b.getFrameType(), b.getAllocId(), false, false, true, s, llt_);
95+
llt_.commit();
9396
bd.setFrame(null);
9497
b.setDf(f);
9598
}
9699
}
97-
logger.info("create replicated frame with allocId " + b.getAllocId() + " ptr " + bd.getFrameId());
100+
logger.debug("create replicated frame with allocId " + b.getAllocId() + " ptr " + bd.getFrameId());
98101
} else {
99102
if (t.getObjectId() == bd.getObjectId()) {
100103
b.setDf(Instance.getInstance().getDataFileById(bd.getFile()));
101-
logger.info("rframe bd found with allocId=" + b.getAllocId());
104+
logger.debug("rframe bd found with allocId=" + b.getAllocId());
102105
} else {
103106
final FrameData bd_ = new FrameData(bd, t);
104107
s.delete(bd);
@@ -137,9 +140,11 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
137140
frame.setRes02(nextF);
138141
frame.setRes06(prevB);
139142
frame.setRes07(nextB);
140-
b.getDf().writeFrame(b.getBd(), b.getBd().getPtr(), frame.getFrame(), llt, s);
143+
final LLT llt_ = LLT.getLLT(); //df access reordering prevent deadlock
144+
b.getDf().writeFrame(b.getBd(), b.getBd().getPtr(), frame.getFrame(), llt_, s);
145+
llt_.commit();
141146
b.getBd().setFrame(null);
142-
logger.info("write undo frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getFrameId());
147+
logger.debug("write undo frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getFrameId());
143148
}
144149
}
145150
}
@@ -168,9 +173,11 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
168173
frame.setRes02(nextF);
169174
frame.setRes06(prevB);
170175
frame.setRes07(nextB);
171-
b.getDf().writeFrame(b.getBd(), b.getBd().getPtr(), frame.getFrame(), llt, s);
176+
final LLT llt_ = LLT.getLLT(); //df access reordering prevent deadlock
177+
b.getDf().writeFrame(b.getBd(), b.getBd().getPtr(), frame.getFrame(), llt_, s);
178+
llt_.commit();
172179
b.getBd().setFrame(frame);
173-
logger.info("write data frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getFrameId());
180+
logger.debug("write data frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getFrameId());
174181
}
175182
}
176183
}
@@ -207,7 +214,7 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
207214
storemap.put(t.getObjectId(), new ArrayList<>());
208215
}
209216
storemap.get(t.getObjectId()).add(b);
210-
logger.info("write index frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getFrameId());
217+
logger.debug("write index frame with allocId "+b.getAllocId()+" ptr "+b.getBd().getFrameId());
211218
}
212219
}
213220
}
@@ -222,10 +229,12 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
222229
if (this.getCallbackNodeId() == 0) {
223230
throw new RuntimeException("wrong callback node id");
224231
}
225-
t.storeFrames(entry.getValue(), this.getCallbackNodeId(), llt, s);
232+
final LLT llt_ = LLT.getLLT();
233+
t.storeFrames(entry.getValue(), this.getCallbackNodeId(), llt_, s);
234+
llt_.commit();
226235
}
227236

228-
llt.commit();
237+
// llt.commit();
229238

230239
final Map<Integer, List<FrameApi>> frames_ = new HashMap<>();
231240
for (SyncFrame f : sb) {
@@ -239,7 +248,7 @@ public synchronized int rframe2(SyncFrame[] sb) throws Exception {
239248
SQLCursor.addStreamFrame(new ContainerFrame(entry.getKey(), entry.getValue()));
240249
}
241250

242-
logger.debug(sb.length + " frame(s) were received and synced");
251+
logger.info(sb.length + " frame(s) were received and synced");
243252

244253
return 0;
245254

src/main/java/su/interference/transport/TransportSyncTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public void run () {
130130
if (persist) {
131131
FrameSync bs = new FrameSync(b.getAllocId(), channel.getChannelId(), b.getFrameId());
132132
s.persist(bs);
133-
logger.info("persist framesync with allocId = "+b.getAllocId()+" for channel " + channel.getChannelId());
133+
logger.debug("persist framesync with allocId = "+b.getAllocId()+" for channel " + channel.getChannelId());
134134
}
135135
Metrics.get("syncQueue").put(1);
136136
}
@@ -150,7 +150,7 @@ public void run () {
150150
if (persist) {
151151
FrameSync bs = new FrameSync(b.getAllocId(), channel.getChannelId(), b.getFrameId());
152152
s.persist(bs);
153-
logger.info("persist framesync with allocId = "+b.getAllocId()+" for channel " + channel.getChannelId());
153+
logger.debug("persist framesync with allocId = "+b.getAllocId()+" for channel " + channel.getChannelId());
154154
}
155155
}
156156
logger.info(sb.length+" frame(s) were not sync due to node unavailable (channel id = " + channel.getChannelId() + ")");

0 commit comments

Comments
 (0)