forked from scala/scala3
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTransactions.scala
113 lines (96 loc) · 3.05 KB
/
Transactions.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package scala.concurrent1
class AbortException extends RuntimeException
object Transaction {
private var cnt = 0L
def nextId: Long = synchronized {
cnt += 1; cnt
}
// Transaction status constants
val Running = 0
val Committed = 1
val Abortable = 2
val Aborted = 3
val Compound = 4
def atomic[T](b: Transaction => T): Option[T] =
(new Transaction).run(b)
}
class Transaction {
var status: Int = _
var id: Long = _ // only for real transactions
var head: Transaction = this
var next: Transaction = null
def this(hd: Transaction, tl: Transaction) = { this(); this.head = head; this.next = next }
def makeAbort() = synchronized {
while (status != Transaction.Aborted && status != Transaction.Committed) {
status = Transaction.Abortable
wait()
}
}
private def abort() = synchronized { status = Transaction.Aborted; notifyAll() }
private def commit() = synchronized { status = Transaction.Committed; notifyAll() }
def run[T](b: Transaction => T): Option[T] =
try {
status = Transaction.Running
id = Transaction.nextId
val result = Some(b(this))
commit()
result
} catch {
case ex: AbortException => abort(); None
case ex: Throwable => abort(); throw ex
}
}
trait Transactional {
/** create a new snapshot */
def checkPoint(): Unit
/** copy back snapshot */
def rollBack(): Unit
var readers: Transaction
var writer: Transaction
def currentWriter(): Transaction = null
if (writer == null) null
else if (writer.status == Transaction.Running) writer
else {
if (writer.status != Transaction.Committed) rollBack();
writer = null;
null
}
def getter(thisTrans: Transaction): Unit = {
if (writer == thisTrans) return
var r = readers
while (r != null && r.head.status != Transaction.Running) { r = r.next; readers = r }
while (r != null) {
if (r.head == thisTrans) return
val last = r
r = r.next
while (r != null && r.head.status != Transaction.Running) { r = r.next; last.next = r }
}
synchronized {
if (thisTrans.status == Transaction.Abortable) throw new AbortException
val w = currentWriter()
if (w != null)
if (thisTrans.id < w.id) { w.makeAbort(); rollBack(); writer = null }
else throw new AbortException
readers = if (readers == null) thisTrans else new Transaction(thisTrans, readers)
}
}
def setter(thisTrans: Transaction): Unit = {
if (writer == thisTrans) return
synchronized {
val w = currentWriter()
if (w != null)
if (thisTrans.id < w.id) { w.makeAbort(); rollBack() }
else throw new AbortException
var r = readers
while (r != null && r.head.status != Transaction.Running) { r = r.next; readers = r }
while (r != null) {
if (r.id < thisTrans.id) throw new AbortException
else w.makeAbort()
val last = r
r = r.next
while (r != null && r.head.status != Transaction.Running) { r = r.next; last.next = r }
}
checkPoint()
}
}
}