1
1
package net .xqj .basex .local ;
2
2
3
+ import com .xqj2 .XQConnection2 ;
4
+ import static net .xqj .basex .BaseXXQInsertOptions .*;
5
+
6
+ import net .xqj .basex .BaseXXQInsertOptions ;
3
7
import org .junit .Test ;
8
+ import static org .junit .Assert .*;
4
9
5
10
import javax .xml .xquery .XQConnection ;
6
11
import javax .xml .xquery .XQException ;
7
12
import javax .xml .xquery .XQExpression ;
13
+ import javax .xml .xquery .XQItem ;
8
14
import javax .xml .xquery .XQResultSequence ;
9
15
import java .util .ArrayList ;
16
+ import java .util .HashMap ;
17
+ import java .util .UUID ;
18
+ import java .util .concurrent .ArrayBlockingQueue ;
19
+ import java .util .concurrent .Future ;
20
+ import java .util .concurrent .ThreadPoolExecutor ;
21
+ import java .util .concurrent .TimeUnit ;
10
22
11
23
/**
12
- * XQJ concurrency test.
24
+ * Test XQJ concurrency, both reads and writes
13
25
*
14
26
* @author Charles Foster
15
27
*/
16
28
public class XQJConcurrencyTest extends XQJBaseTest {
17
- /** Thread count. */
18
- private static final int THREAD_COUNT = 256 ;
19
- /** Numbers of iterations. */
29
+
30
+ /** Number of threads used when executing read only queries */
31
+ private static final int CONCURRENT_READ_THREADS = 256 ;
32
+
33
+ /** Numbers of iterations, when perform a ready query */
20
34
private static final int ITERATE_TO = 1024 ;
21
35
36
+ /** Number of threads used when writing documents **/
37
+ private static final int CONCURRENT_WRITE_THREADS = 12 ;
38
+
39
+ /** Total number of documents to insert when writing **/
40
+ private static final int DOCS_TO_INSERT = CONCURRENT_WRITE_THREADS * 30 ;
41
+
42
+ /** BaseX insert strategy for inserting documents **/
43
+ private static final BaseXXQInsertOptions INSERT_STRATEGY = options (REPLACE );
44
+
22
45
/**
23
- * Runs concurrency test.
46
+ * Runs read concurrency test.
24
47
* @throws Throwable any exception or error
25
48
*/
26
49
@ Test
27
50
public void testConcurrentXQuery1to1024 () throws Throwable {
28
51
final ArrayList <SimpleQueryThread > sqtList = new ArrayList <SimpleQueryThread >();
29
52
30
- for (int i = 0 ; i < THREAD_COUNT ; i ++)
53
+ for (int i = 0 ; i < CONCURRENT_READ_THREADS ; i ++)
31
54
sqtList .add (new SimpleQueryThread ());
32
55
33
56
for (final SimpleQueryThread s : sqtList ) s .start ();
34
57
for (final SimpleQueryThread s : sqtList ) s .join ();
35
58
for (final SimpleQueryThread s : sqtList ) if (s .thrown != null ) throw s .thrown ;
36
59
}
37
60
61
+ /**
62
+ * Runs insert concurrency test.
63
+ */
64
+ @ Test
65
+ public void testConcurrentInsert () throws Exception {
66
+
67
+ XQExpression xqpe = xqc .createExpression ();
68
+
69
+ try
70
+ {
71
+ xqpe .executeCommand ("CREATE DB xqj-concurrent-insert-test" );
72
+ xqpe .executeCommand ("OPEN xqj-concurrent-insert-test" );
73
+ xqpe .executeCommand ("SET DEFAULTDB true" );
74
+
75
+ HashMap <String , XQItem > docs = new HashMap <String , XQItem >();
76
+
77
+ ThreadPoolExecutor tpe =
78
+ new ThreadPoolExecutor (
79
+ CONCURRENT_WRITE_THREADS , CONCURRENT_WRITE_THREADS , 4l ,
80
+ TimeUnit .SECONDS ,
81
+ new ArrayBlockingQueue <Runnable >(CONCURRENT_READ_THREADS ),
82
+ new ThreadPoolExecutor .CallerRunsPolicy ());
83
+
84
+ ArrayList <Future > futures = new ArrayList <Future >();
85
+
86
+ for (int i =0 ;i <DOCS_TO_INSERT ;i ++) {
87
+ String uri = i + "-" + UUID .randomUUID ().toString () + ".xml" ;
88
+ XQItem item = createDocument ("<e>" + uri + "</e>" );
89
+ docs .put (uri , item );
90
+ }
91
+
92
+ for (String uri : docs .keySet ())
93
+ futures .add (tpe .submit (new InsertItemThread (uri , docs .get (uri ))));
94
+
95
+ for (Future future : futures )
96
+ future .get ();
97
+
98
+ for (String uri : docs .keySet ())
99
+ assertTrue (docAvailable (uri ));
100
+ }
101
+ finally {
102
+ xqpe .executeCommand ("DROP DB xqj-concurrent-insert-test" );
103
+ }
104
+ }
105
+
38
106
/**
39
107
* Query Thread.
40
108
*/
@@ -44,6 +112,7 @@ private class SimpleQueryThread extends Thread {
44
112
45
113
@ Override
46
114
public void run () {
115
+
47
116
XQConnection newConnection = null ;
48
117
49
118
try {
@@ -76,6 +145,31 @@ public void run() {
76
145
}
77
146
}
78
147
148
+ private class InsertItemThread extends Thread {
149
+
150
+ /** uri of document being inserted **/
151
+ private final String uri ;
152
+
153
+ /** content of document being inserted **/
154
+ private final XQItem item ;
155
+
156
+ public InsertItemThread (String uri , XQItem item ) {
157
+ this .uri = uri ;
158
+ this .item = item ;
159
+ }
160
+
161
+ @ Override
162
+ public void run () {
163
+ try {
164
+ XQConnection2 xqc2 = (XQConnection2 )xqc ;
165
+ xqc2 .insertItem (uri , item , INSERT_STRATEGY );
166
+ } catch (final Throwable th ) {
167
+ // a JUnit assertion WILL fail later because of this happening.
168
+ th .printStackTrace ();
169
+ }
170
+ }
171
+ }
172
+
79
173
/**
80
174
* Closes a connection.
81
175
* @param conn connection to be closed
0 commit comments