HBase APIs

Scala or Java client

Dependencies

Maven

<dependency>
   <groupId>org.apache.hbase</groupId>
   <artifactId>hbase-client</artifactId>
   <version>1.3.1</version>
</dependency>

SBT

libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.3.1"

APIs

The example below is written in Scala; the Java version uses exactly the same API calls, so it is easy to adapt.

package com.zq.hbase.learn

import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

object HBaseUtil {
  def main(args: Array[String]): Unit = {
    
    // Create the HBase configuration object
    val conf = HBaseConfiguration.create()
    // ZooKeeper quorum, as configured in hbase-site.xml
    conf.set("hbase.zookeeper.quorum", "server202,server203,server204")
    // ZooKeeper client port
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    /**
     * HDP's HBase additionally requires zookeeper.znode.parent to be set;
     * Cloudera's HBase does not. I am on Cloudera, so it is commented out here. ^_^
     */
    // conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    
    // Create a connection from the configuration
    val connection: Connection = ConnectionFactory.createConnection(conf)
    
    /**
      * Get the Admin object from the connection.
      * Notes:
      * 1. This is a lightweight operation, and the returned Admin is not guaranteed
      *    to be thread-safe; create a new instance per thread when used concurrently.
      * 2. Pooling or caching the returned Admin is not recommended.
      * 3. Remember to call admin.close().
      */
    val admin: Admin = connection.getAdmin

    // If the table exists, drop it and recreate it
    if (admin.tableExists(TableName.valueOf("test_table"))) {
      admin.disableTable(TableName.valueOf("test_table"))
      admin.deleteTable(TableName.valueOf("test_table"))
    }
    // Create the table
    val tableDescriptor: HTableDescriptor = new HTableDescriptor(TableName.valueOf("test_table"))
    val columnDescriptor: HColumnDescriptor = new HColumnDescriptor("column_family".getBytes())
    tableDescriptor.addFamily(columnDescriptor)
    admin.createTable(tableDescriptor)


    // List the tables in HBase
    val tables: Array[HTableDescriptor] = admin.listTables()
    tables.foreach(println)


    // get table
    val table: Table = connection.getTable(TableName.valueOf("test_table"))

    // ======================= Write =======================

    // rowkey
    val put: Put = new Put(Bytes.toBytes("2018/4/27-rowkey"))
    // column family, column qualifier, value
    put.addColumn(Bytes.toBytes("column_family"), Bytes.toBytes("column_qualifier"), Bytes.toBytes("value"))
    // add to HBase table
    table.put(put)

    // ======================= Read =======================

    // rowkey
    val get: Get = new Get(Bytes.toBytes("2018/4/27-rowkey"))
    val result: Result = table.get(get)
    
    val value: Array[Byte] = result.value()
    println(Bytes.toString(value))
    
    // Extract (qualifier, value) pairs for all cells in the column family and print them
    result.rawCells()
      .filter(cell => "column_family" == Bytes.toString(CellUtil.cloneFamily(cell)))
      .map { cell =>
        val qualifier = CellUtil.cloneQualifier(cell)
        val value = CellUtil.cloneValue(cell)
        (qualifier, value)
      }
      .foreach { case (qualifier, value) =>
        println(Bytes.toString(qualifier) + " -> " + Bytes.toString(value))
      }

    // Release resources when done
    table.close()
    admin.close()
    connection.close()
  }
}
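
The walkthrough above covers put and get; rows are often read back with a Scan as well. Below is a minimal sketch (not part of the original example) that assumes the same table handle and column family; the helper name printFamily is made up for illustration.

import org.apache.hadoop.hbase.client.{Result, ResultScanner, Scan, Table}
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical helper: scan one column family and print "rowkey -> value" for every row
def printFamily(table: Table): Unit = {
  val scan = new Scan()
  scan.addFamily(Bytes.toBytes("column_family"))
  val scanner: ResultScanner = table.getScanner(scan)
  try {
    var r: Result = scanner.next()
    while (r != null) {
      val v = r.getValue(Bytes.toBytes("column_family"), Bytes.toBytes("column_qualifier"))
      println(Bytes.toString(r.getRow) + " -> " + Bytes.toString(v))
      r = scanner.next()
    }
  } finally {
    // Always close the scanner to release server-side resources
    scanner.close()
  }
}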

Spark APIs

Dependencies

Maven

<dependency>
   <groupId>org.apache.hbase</groupId>
   <artifactId>hbase-client</artifactId>
   <version>1.3.1</version>
</dependency>
<dependency>
   <groupId>org.apache.hbase</groupId>
   <artifactId>hbase-server</artifactId>
   <version>1.3.1</version>
</dependency>

SBT

libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.3.1"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.3.1"

The Spark cluster needs the following JARs on its classpath:

hbase-client-1.3.1.jar
hbase-server-1.3.1.jar
hbase-common-1.3.1.jar
hbase-protocol-1.3.1.jar
hbase-hadoop2-compat-1.3.1.jar
htrace-core-3.1.0-incubating.jar
metrics-core-2.2.0.jar (com.yammer.metrics:metrics-core)

APIs

Write

Write an RDD to an HBase table.

package com.zq.hbase.learn

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.sql.SparkSession


object Spark_HBase_Write {

  def main(args: Array[String]): Unit = {
  
    val spark = SparkSession.builder().master("local").appName("Spark HBase Write Demo").getOrCreate()
    val sc = spark.sparkContext

    val tableName = "test_table"

    // The write side goes through a JobConf (the old mapred API)
    val hbaseConf = HBaseConfiguration.create()
    val jobConf = new JobConf(hbaseConf, this.getClass)
    // Note: this TableOutputFormat comes from the mapred package
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    jobConf.set("hbase.zookeeper.quorum", "server202,server203,server204")

    // (rowkey, value)
    val pairs = sc.parallelize(Seq("2018/4/27-rowkey" -> "value"))

    def convert(pair: (String, String)) = {
      val (rowkey, value) = pair
      val p = new Put(Bytes.toBytes(rowkey))
      // column family, column qualifier, value
      p.addColumn(Bytes.toBytes("column_family"), Bytes.toBytes("column_qualifier"), Bytes.toBytes(value))
      (new ImmutableBytesWritable, p)
    }

    // Save the data to HBase
    new PairRDDFunctions(pairs.map(convert)).saveAsHadoopDataset(jobConf)
  }
}
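
For comparison, the same write can also be expressed with the TableOutputFormat from the mapreduce package and saveAsNewAPIHadoopDataset. The sketch below reuses the assumptions of the example above (ZooKeeper quorum, table name, and an RDD of (ImmutableBytesWritable, Put) pairs such as pairs.map(convert)); the helper name is made up.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableOutputFormat => NewTableOutputFormat}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD

// Sketch: write an RDD[(ImmutableBytesWritable, Put)] with the mapreduce output format
def saveWithNewApi(pairs: RDD[(ImmutableBytesWritable, Put)], tableName: String): Unit = {
  val hConf = HBaseConfiguration.create()
  hConf.set("hbase.zookeeper.quorum", "server202,server203,server204")
  hConf.set(NewTableOutputFormat.OUTPUT_TABLE, tableName)

  val job = Job.getInstance(hConf)
  job.setOutputFormatClass(classOf[NewTableOutputFormat[ImmutableBytesWritable]])
  job.setOutputKeyClass(classOf[ImmutableBytesWritable])
  job.setOutputValueClass(classOf[Put])

  pairs.saveAsNewAPIHadoopDataset(job.getConfiguration)
}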

Read

Read an HBase table into a Spark RDD.

package com.zq.hbase.learn

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession


object Spark_HBase_Read {

  def main(args: Array[String]): Unit = {
    
    val spark = SparkSession.builder().master("local").appName("Spark HBase Read Demo").getOrCreate()
    val sc = spark.sparkContext

    // The read side only needs a plain HBaseConfiguration
    val tableName = "test_table"
    val hConf = HBaseConfiguration.create()
    hConf.set("hbase.zookeeper.quorum", "server202,server203,server204")
    // Note: this TableInputFormat comes from the mapreduce package
    hConf.set(TableInputFormat.INPUT_TABLE, tableName)

    // Read the whole table into Spark as an RDD
    val resultRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(hConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

    // ============================ round 1 ============================

    resultRDD.foreach(tuple => {
      // rowkey
      println(Bytes.toString(tuple._2.getRow))
      // value at (column family, column qualifier); it was written as a string above
      println(Bytes.toString(tuple._2.getValue("column_family".getBytes, "column_qualifier".getBytes)))
    })

    // ============================ round 2: tuple pattern matching with an explicit match ============================

    resultRDD.foreach(x => x match {
      case (_, result) =>
        // rowkey
        println(Bytes.toString(result.getRow))
        // value at (column family, column qualifier)
        println(Bytes.toString(result.getValue("column_family".getBytes, "column_qualifier".getBytes)))
    })

    // ============================ round 3: tuple pattern matching with a partial function ============================

    resultRDD.foreach {
      case (_, result) =>
        // rowkey
        println(Bytes.toString(result.getRow))
        // value at (column family, column qualifier)
        println(Bytes.toString(result.getValue("column_family".getBytes, "column_qualifier".getBytes)))
    }

  }
  
}
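
TableInputFormat does not have to read the whole table: it also honors scan-related keys such as SCAN_ROW_START, SCAN_ROW_STOP and SCAN_COLUMN_FAMILY. Below is a sketch under the same cluster assumptions as above; the rowkey range values and the helper name are placeholders.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Sketch: read only one column family within a rowkey range instead of the whole table
def readRange(sc: SparkContext, tableName: String): RDD[(ImmutableBytesWritable, Result)] = {
  val hConf = HBaseConfiguration.create()
  hConf.set("hbase.zookeeper.quorum", "server202,server203,server204")
  hConf.set(TableInputFormat.INPUT_TABLE, tableName)
  // Optional scan restrictions understood by TableInputFormat (placeholder values)
  hConf.set(TableInputFormat.SCAN_ROW_START, "2018/4/01")
  hConf.set(TableInputFormat.SCAN_ROW_STOP, "2018/5/01")
  hConf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "column_family")

  sc.newAPIHadoopRDD(hConf, classOf[TableInputFormat],
    classOf[ImmutableBytesWritable], classOf[Result])
}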

HBase MOB

Preparation

To use MOB, make sure the HFile format version is 3 (set in hbase-site.xml):

<property>
  <name>hfile.format.version</name>
  <value>3</value>
</property>

# NAME: the column family name
# IS_MOB: a boolean flag that enables MOB for this column family
# MOB_THRESHOLD: the size, in bytes, above which a value is stored as a MOB.
#                The default is 100 KB; any value larger than this threshold
#                written to HBase is treated as a MOB.
hbase> create 'BIGDATA:mob_test', {NAME => 'mob_family', IS_MOB => true, MOB_THRESHOLD => 102400}
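
On the client side MOB is transparent: cells in a MOB-enabled family are written and read with the ordinary Put/Get API; only the column family definition above differs. Below is a minimal sketch, assuming a Connection built as in the client example and the BIGDATA:mob_test table created above; the rowkey, qualifier, and helper name are made up for illustration.

import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Connection, Get, Put}
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical example: store a payload larger than MOB_THRESHOLD (100 KB here)
// in the MOB-enabled family; the client API is unchanged.
def writeMob(connection: Connection, payload: Array[Byte]): Unit = {
  val table = connection.getTable(TableName.valueOf("BIGDATA:mob_test"))
  try {
    val put = new Put(Bytes.toBytes("mob-rowkey"))
    put.addColumn(Bytes.toBytes("mob_family"), Bytes.toBytes("blob"), payload)
    table.put(put)

    // Read it back with an ordinary Get and print the stored size
    val result = table.get(new Get(Bytes.toBytes("mob-rowkey")))
    println(result.getValue(Bytes.toBytes("mob_family"), Bytes.toBytes("blob")).length)
  } finally {
    table.close()
  }
}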
