Skip to content

HbaseApi是一个使用scala编写的调用hbase API的简单API库,提供简单的hbase管理、get、scan、put等接口。

Notifications You must be signed in to change notification settings

CWX21/HbaseApi-cwx

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

1 Commit
 
 
 
 
 
 
 
 

Repository files navigation

#hapi

###简介 HbaseApi是一个使用scala编写的调用hbase API的简单API库,提供简单的hbase管理、get、scan、put等接口。

###使用

####HbaseTool 负责初始化hbase的连接等操作 #####API

1、def apply() = {}
2、def apply(cfgs: MuMap[String, String]) = {}
3、def apply(fileName: String) = {}
4、def apply(cfgs: MuMap[String, String], executorAmount: Int) = {}
5、def apply(fileName: String, executorAmount: Int) = {}
6、def getHbaseConnection: HBConnection = {}
7、def getConnection: Connection = {}
8、def getAdmin: Admin = {}
9、def getExecutors: Option[ExecutorService] = {}
10、def closeConnect() = {}

#####例子

import hapi.HbaseTool

/* 利用kv键值对配置hbase的连接属性 */
HbaseTool.apply("./DycConfigs/HClusterConfig")

/* 如果成功, 可以获取admin、connection等 */
val admin = HbaseTool.getAdmin

####HManager 负责hbase的管理操作 #####API

1、def tableExist(admin: Admin, tb: String): Boolean = {}
2、def createTable(admin: Admin, tb: String, hFamilys: TraversableOnce[HFamily]): Boolean = {}
3、def createTable(admin: Admin, tb: String, hFamily: HFamily): Boolean = {}
4、def createTableForce(admin: Admin, tb: String, hFamilys: TraversableOnce[HFamily]): Boolean = {}
5、def createTableForce(admin: Admin, tb: String, hFamily: HFamily): Boolean = {}
6、def dropTable(admin: Admin, tb: String): Boolean = {}
7、def addFamily(admin: Admin, tb: String, hFamily: HFamily): Boolean = {}
8、def addFamilys(admin: Admin, tb: String, hFamilys: TraversableOnce[HFamily]): Boolean = {}
9、def deleteFamily(admin: Admin, tb: String, hFamily: HFamily): Boolean = {
10、def deleteFamilys(admin: Admin, tb: String, hFamilys: TraversableOnce[HFamily]): Boolean = {}
11、def familyExist(admin: Admin, tb: String, familyName: String): Boolean = {}
12、def updateFamilyArrts(admin: Admin, tb: String, hFamily: HFamily): Boolean = {}
13、def updateFamilyArrts(admin: Admin, tb: String, hFamilys: TraversableOnce[HFamily]): Boolean = {}

#####例子

import hapi.HbaseTool
import hapi.HManager

HbaseTool.apply("./DycConfigs/HClusterConfig")
val admin = HbaseTool.getAdmin

val hf = new HFamily("diu1")
hf.setMaxVersions(100)

if (!HManager.tableExist(admin, "testapi")) {
  val status = HManager.createTable(admin, "testapi", hf)
  if (!status) println(HBError.getAndCleanErrors())
  else println("创建表成功")
}

/* 加入列簇 */
val map = MuMap[String, String]()

/* 设置列簇属性 */
map += ("keepdeletecells" -> "true")  //true or false, string
map += ("maxversions" -> "100")     //num
map += ("minversions" -> "1")       //num
map += ("ttl" -> "10000000")        //num
map += ("blocksize" -> (1024 * 128).toString) //1k ~ 16M, num
map += ("inmem" -> "true")          //true or false
map += ("bloomfilter" -> "row")     //row or rowcol or don't set it defaule is None
map += ("blockcache" -> "true")     //true or false
map += ("scope" -> "1")             //0 or 1, num

val hf1 = new HFamily("diu2")
hf1.setMaxVersions(100)

val hf2 = new HFamily("diu3")
hf2.setHColumnDescriptorAttrs(map)

val arr = ArrayBuffer[HFamily]()
arr += (hf, hf1, hf2)

val status = HManager.addFamilys(admin, "testapi", arr)
if (!status) println(HBError.getAndCleanErrors())
else println("加入列簇成功")


/* 删除列簇 */
val hf = new HFamily("diu1")
val hf1 = new HFamily("diu2")
val hf2 = new HFamily("diu4")

val arr = ArrayBuffer[HFamily]()
arr += (hf, hf1, hf2)

if (!HManager.tableExist(admin, "testapi")) {
  val status = HManager.createTable(admin, "testapi", hf)
  println(status)
}

val status = HManager.deleteFamilys(admin, "testapi", arr)
if (!status) println(HBError.getAndCleanErrors())
else println("删除列簇成功")
/* 其他例子请参考 test的代码 */

####HWriter 负责写入数据 #####API

1、def putOne(admin: Admin, cnt: Connection, tb: String, columnrow: HBColumnRow): Boolean = {}
2、def putRows(puter: HBRowPuter): Boolean = {}
3、def putRowsAtFamily(puter: HBFamilyPuter): Boolean = {}

#####例子

import hapi.HBColumn.{HBRowCell, HBColumnFamily, HBColumnRow}
import hapi.HWriter

/* 写入一个 */
HbaseTool.apply("./DycConfigs/HClusterConfig")
val admin = HbaseTool.getAdmin
val connection = HbaseTool.getConnection
val hbWriter = new HWriter()
val hbRow = new HBColumnRow("10")  //创建一行
val hbFamily = new HBColumnFamily("diu2") //加入一个family

hbFamily.addColumnCell("col2_1", "test1") //加入列值
hbFamily.addColumnCell("col2_2", "test2")
hbRow.addFamily(hbFamily)

val status = hbWriter.putOne(admin, connection, "testapi", hbRow)
if (!status) {
  println("put 失败")
  println(HBError.getAndCleanErrors())
}else {
  println("put 成功")
}

/* 写入多个 */
HbaseTool.apply("./DycConfigs/HClusterConfig", 4)
val cnt = HbaseTool.getHbaseConnection
val puter = new HBRowPuter(cnt, "testapi")
val hbWriter = new HWriter()
val rows = ArrayBuffer[HBColumnRow]()

for (i <- 1 to 100000) {
  val hbRow = new HBColumnRow(i.toString)

  val hbFamily = new HBColumnFamily("diu1")
  hbFamily.addColumnCell("col1", "testtesttesttesttestesttesttesttesttesttesttesttesttesttesttestestt" + i.toString)
  hbFamily.addColumnCell("col2", "testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttetest" + i.toString)
  hbFamily.addColumnCell("col3", "testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttestest" + i.toString)
  hbRow.addFamily(hbFamily)

  val hbFamily1 = new HBColumnFamily("diu2")
  hbFamily1.addColumnCell("col2_1", "testtesttesttesttestesttesttesttesttesttesttesttesttesttesttestestt" + i.toString)
  hbFamily1.addColumnCell("col2_2", "testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttetest" + i.toString)
  hbFamily1.addColumnCell("col2_3", "testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttestest" + i.toString)
  hbRow.addFamily(hbFamily1)

  rows += hbRow
}

puter.data = rows
puter.bufSize = 1024 * 1024 * 3
puter.durability = Durability.USE_DEFAULT

println("start")
val s = System.currentTimeMillis
/**
  * ASYNC_WAL : 当数据变动时,异步写WAL日志
  * SYNC_WAL : 当数据变动时,同步写WAL日志
  * FSYNC_WAL : 当数据变动时,同步写WAL日志,并且,强制将数据写入磁盘
  * SKIP_WAL : 不写WAL日志
  * USE_DEFAULT : 使用HBase全局默认的WAL写入级别,即 SYNC_WAL
  */
val status = hbWriter.putRows(puter)
val e = System.currentTimeMillis
if (!status) {
  println(HBError.getAndCleanErrors())
  println("批量插入失败")
}else {
  println("批量插入成功")
}
println(e - s + " ms")
println((e - s) / 1000 + " s")

####HReader 负责读取hbase行、列的内容 #####API

1、def resolveResult(result: Result) = {}
2、def resolveResults(results: Array[Result]) = {}
3、def resolveResult2Json(result: Result): String = {}
4、def resolveResults2Json(results: Array[Result]): String = {}
5、def getRow(hbGeter: HBGeter, rowKey: String): Option[Result] = {}
6、def getRows(hbGeter: HBGeter, rowKeys: Traversable[String]): Option[Array[Result]] = {}
7、def getRowByQualifier(hbGeter: HBQualifierGeter, rowKey: String, qualifier: String): Option[Result] = {}
8、def getRowsByQualifier(hbGeter: HBQualifierGeter, rowKeys: Traversable[String],  qualifier: String): Option[Array[Result]] = {}
9、def getRowByQualifiers(hbGeter: HBQualifierGeter, rowKey: String, qualifiers: Traversable[String]): Option[Result] = {}
10、def getRowsByQualifiers(hbGeter: HBQualifierGeter, rowCells: MuMap[String, Traversable[String]]): Option[Array[Result]] = {}
11、def getRowByFamily(hbGeter: HBGeter, family: String, row: String): Option[Result] = {}
12、def getRowsByFamily(hbGeter: HBGeter, family: String, rows: Traversable[String]): Option[Array[Result]] = {}
13、def getRowByFamilys(hbGeter: HBGeter, familys: Traversable[String], row: String): Option[Result] = {}
14、def getRowsByFamilys(hbGeter: HBGeter, familys: Traversable[String], rows: Traversable[String]): Option[Array[Result]] = {}

####例子

import hapi.Geter.{HBGeter, HBQualifierGeter}
import hapi.Puter.{HBRowPuter, HBFamilyPuter}
import hapi.HReader
import hapi.HBColumn.{HBRowCell, HBColumnFamily, HBColumnRow}
import scala.collection.mutable.ArrayBuffer

/* 读一个 */
HbaseTool.apply("./DycConfigs/HClusterConfig", 4)
val cnt = HbaseTool.getHbaseConnection
val geter = new HBGeter(cnt, "testapi")
val reader = new HReader()
val keys = ArrayBuffer[String]()
for (i <- 1 to 30) {
  keys += i.toString
}
geter.maxVersion = 2
println("start")
val s = System.currentTimeMillis
val hh = reader.getRows(geter, keys)
val e = System.currentTimeMillis
hh match {
  case None =>
    println(HBError.getAndCleanErrors())
  case Some(rs) =>
    reader.resolveResults(rs)
    val ret = reader.resolveResults2Json(rs)
    println(ret)
}
println(e - s + " ms")
println((e - s) / 1000 + " s")

/* 读多个 */
HbaseTool.apply("./DycConfigs/HClusterConfig")
val cnt = HbaseTool.getHbaseConnection
val reader = new HReader()
val rowsQul = MuMap[String, ArrayBuffer[String]]()
val geter = new HBQualifierGeter(cnt, "testapi", "diu1")
val quls = ArrayBuffer[String]()
quls += "col1"
quls += "col2"
quls += "col3"

for (i <- 11 to 13) {
  rowsQul += (i.toString -> quls)
}

val hh = reader.getRowsByQualifiers(geter, rowsQul)

hh match {
  case None =>
    println(HBError.getAndCleanErrors())
  case Some(rs) =>
    reader.resolveResults(rs)
}
/* 其他例子请参考 test的代码*/

####HObtainer 是一个scan的封装 #####API

1、def closeScanner(scanResult: ResultScanner): Boolean = {}
2、def resolveScanResult(scanResult: ResultScanner, hbcs: Array[HBFamilyQualifier], func: (Iterator[Result], Array[HBFamilyQualifier]) => Any = printScanResult): Option[Any] = {}
3、def scan(hbScanner: HBScanner): Option[ResultScanner] = {}

#####例子

import hapi.Geter.{HBScanner, HBGeter, HBQualifierGeter}
import hapi.Puter.{HBRowPuter, HBFamilyPuter}
import hapi.HBScanner
import scala.collection.mutable.ArrayBuffer

HbaseTool.apply("./DycConfigs/HClusterConfig")
val cnt = HbaseTool.getHbaseConnection
val hbScanner = new HBScanner(cnt, "testapi")
hbScanner.setStartRow("1".getBytes())
hbScanner.setStopRow("11".getBytes())
val hObt = new HObtainer()
val rs = hObt.scan(hbScanner)

rs match {
  case None =>
    println(HBError.getAndCleanErrors())
    println("scan 数据失败")
  case Some(result) =>
    hObt.resolveScanResult(result, Array(new HBFamilyQualifier("diu1".getBytes(), "col1".getBytes())))
    hObt.closeScanner(result)
}
//    val geter = new HBGeter(cnt, "testapi")
//    val scanner = new HObtainer()
//    scanner.scan(geter)

About

HbaseApi是一个使用scala编写的调用hbase API的简单API库,提供简单的hbase管理、get、scan、put等接口。

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages