30
30
import java .util .Collections ;
31
31
import java .util .List ;
32
32
import java .util .Map ;
33
+ import java .util .concurrent .Callable ;
33
34
import java .util .concurrent .ConcurrentHashMap ;
35
+ import java .util .concurrent .ExecutionException ;
36
+ import java .util .concurrent .TimeUnit ;
34
37
import java .util .concurrent .atomic .AtomicInteger ;
35
38
36
39
import org .apache .commons .logging .Log ;
37
40
import org .apache .commons .logging .LogFactory ;
38
41
import org .apache .hadoop .classification .InterfaceAudience ;
39
42
import org .apache .hadoop .conf .Configuration ;
43
+ import org .apache .hadoop .hbase .client .HTableFactory ;
40
44
import org .apache .hadoop .hbase .client .HTableInterface ;
41
45
import org .apache .hadoop .hbase .client .HTablePool ;
42
46
import org .apache .hadoop .hbase .client .ResultScanner ;
43
- import org .apache .hadoop .hbase .client . RowMutations ;
47
+ import org .apache .hadoop .hbase .security . UserProvider ;
44
48
import org .apache .hadoop .hbase .thrift .ThriftMetrics ;
45
49
import org .apache .hadoop .hbase .thrift2 .generated .*;
50
+ import org .apache .hadoop .hbase .util .Bytes ;
51
+ import org .apache .hadoop .hbase .util .ConnectionCache ;
46
52
import org .apache .thrift .TException ;
47
53
54
+ import com .google .common .cache .Cache ;
55
+ import com .google .common .cache .CacheBuilder ;
56
+
48
57
/**
49
58
* This class is a glue object that connects Thrift RPC calls to the HBase client API primarily
50
59
* defined in the HTableInterface.
51
60
*/
52
61
@ InterfaceAudience .Private
62
+ @ SuppressWarnings ("deprecation" )
53
63
public class ThriftHBaseServiceHandler implements THBaseService .Iface {
54
64
55
65
// TODO: Size of pool configuraple
56
- private final HTablePool htablePool ;
66
+ private final Cache <String , HTablePool > htablePools ;
67
+ private final Callable <? extends HTablePool > htablePoolCreater ;
57
68
private static final Log LOG = LogFactory .getLog (ThriftHBaseServiceHandler .class );
58
69
59
70
// nextScannerId and scannerMap are used to manage scanner state
@@ -62,8 +73,15 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
62
73
private final Map <Integer , ResultScanner > scannerMap =
63
74
new ConcurrentHashMap <Integer , ResultScanner >();
64
75
65
- public static THBaseService .Iface newInstance (Configuration conf , ThriftMetrics metrics ) {
66
- THBaseService .Iface handler = new ThriftHBaseServiceHandler (conf );
76
+ private final ConnectionCache connectionCache ;
77
+ private final HTableFactory tableFactory ;
78
+ private final int maxPoolSize ;
79
+
80
+ static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval" ;
81
+ static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime" ;
82
+
83
+ public static THBaseService .Iface newInstance (
84
+ THBaseService .Iface handler , ThriftMetrics metrics ) {
67
85
return (THBaseService .Iface ) Proxy .newProxyInstance (handler .getClass ().getClassLoader (),
68
86
new Class [] { THBaseService .Iface .class }, new THBaseServiceMetricsProxy (handler , metrics ));
69
87
}
@@ -98,13 +116,41 @@ private static long now() {
98
116
return System .nanoTime ();
99
117
}
100
118
101
- ThriftHBaseServiceHandler (Configuration conf ) {
102
- int maxPoolSize = conf .getInt ("hbase.thrift.htablepool.size.max" , 1000 );
103
- htablePool = new HTablePool (conf , maxPoolSize );
119
+ ThriftHBaseServiceHandler (final Configuration conf ,
120
+ final UserProvider userProvider ) throws IOException {
121
+ int cleanInterval = conf .getInt (CLEANUP_INTERVAL , 10 * 1000 );
122
+ int maxIdleTime = conf .getInt (MAX_IDLETIME , 10 * 60 * 1000 );
123
+ connectionCache = new ConnectionCache (
124
+ conf , userProvider , cleanInterval , maxIdleTime );
125
+ tableFactory = new HTableFactory () {
126
+ @ Override
127
+ public HTableInterface createHTableInterface (Configuration config ,
128
+ byte [] tableName ) {
129
+ try {
130
+ return connectionCache .getTable (Bytes .toString (tableName ));
131
+ } catch (IOException ioe ) {
132
+ throw new RuntimeException (ioe );
133
+ }
134
+ }
135
+ };
136
+ htablePools = CacheBuilder .newBuilder ().expireAfterAccess (
137
+ maxIdleTime , TimeUnit .MILLISECONDS ).softValues ().concurrencyLevel (4 ).build ();
138
+ maxPoolSize = conf .getInt ("hbase.thrift.htablepool.size.max" , 1000 );
139
+ htablePoolCreater = new Callable <HTablePool >() {
140
+ public HTablePool call () {
141
+ return new HTablePool (conf , maxPoolSize , tableFactory );
142
+ }
143
+ };
104
144
}
105
145
106
146
private HTableInterface getTable (ByteBuffer tableName ) {
107
- return htablePool .getTable (byteBufferToByteArray (tableName ));
147
+ String currentUser = connectionCache .getEffectiveUser ();
148
+ try {
149
+ HTablePool htablePool = htablePools .get (currentUser , htablePoolCreater );
150
+ return htablePool .getTable (byteBufferToByteArray (tableName ));
151
+ } catch (ExecutionException ee ) {
152
+ throw new RuntimeException (ee );
153
+ }
108
154
}
109
155
110
156
private void closeTable (HTableInterface table ) throws TIOError {
@@ -141,6 +187,10 @@ private ResultScanner getScanner(int id) {
141
187
return scannerMap .get (id );
142
188
}
143
189
190
+ void setEffectiveUser (String effectiveUser ) {
191
+ connectionCache .setEffectiveUser (effectiveUser );
192
+ }
193
+
144
194
/**
145
195
* Removes the scanner associated with the specified ID from the internal HashMap.
146
196
* @param id of the Scanner to remove
0 commit comments