首页 » NoSQL » Spark整合HBase(自定义HBase DataSource)

Spark整合HBase(自定义HBase DataSource)

原文 http://blog.csdn.net/UUfFO/article/details/79243644

2018-02-04 02:01:10阅读(417)

背景

Spark支持多种数据源,但是Spark对HBase 的读写都没有相对优雅的api,但spark和HBase整合的场景又比较多,故通过spark的DataSource API自己实现了一套比较方便操作HBase的API。

写 HBase

写HBase会根据Dataframe的schema写入对应数据类型的数据到Hbase,先上使用示例:

import spark.implicits._
import org.apache.hack.spark._
val df = spark.createDataset(Seq(("ufo",  "play"), ("yy",  ""))).toDF("name", "like")
// 方式一
val options = Map(
            "rowkey.filed" -> "name",
            "startKey" -> "aaaaa",
            "endKey" -> "zzzzz",
            "numReg" -> "12",
            "bulkload.enable" -> "false"
        )
df.saveToHbase("hbase_table", Some("XXX:2181"), options)
// 方式二
df1.write.format("org.apache.spark.sql.execution.datasources.hbase")
            .options(Map(
                "rowkey.filed" -> "name",
                "outputTableName" -> "hbase_table",
                "hbase.zookeeper.quorum" -> "XXX:2181",
                "startKey" -> "aaaaa",
                "endKey" -> "zzzzz",
                "numReg" -> "12",
                "bulkload.enable" -> "false"
            )).save()

上面两种方式实现的效果是一样的,下面解释一下每个参数的含义:

rowkey.field:指定dataframe的哪个字段作为HBase表的rowkey,默认使用第一个字段 outputTableName:HBase表名 hbase.zookeeper.quorum:zookeeper地址,也可写成 zk startKey,endKey,numReg:这三个配置是一家的,当对应的HBase表不存在时会先创建表,默认只有一个分区,通过这三个配置可以进行预分区,从字面意思可知分别对应,分区的起始key,结束key,分区数,另外列族名通过family设定,默认是 info。 bulkload.enable:写HBase可走HBase写入,也可BulkLoad直接生成HBase需要的Hfile,在数据量大的情况下会快很多,默认是使用BulkLoad 读 HBase

示例代码如下:

// 方式一
import org.apache.hack.spark._
 val options = Map(
    "spark.table.schema" -> "appid:String,appstoreid:int,firm:String",
    "hbase.table.schema" -> ":rowkey,info:appStoreId,info:firm"
)
spark.hbaseTableAsDataFrame("hbase_table", Some("XXX:2181")).show(false)
// 方式二
spark.read.format("org.apache.spark.sql.execution.datasources.hbase").
            options(Map(
            "spark.table.schema" -> "appid:String,appstoreid:int,firm:String",
            "hbase.table.schema" -> ":rowkey,info:appStoreId,info:firm",
            "hbase.zookeeper.quorum" -> "XXX:2181",
            "inputTableName" -> "hbase_table"
        )).load.show(false)  

spark和hbase表的schema映射关系指定不是必须的,默认会生成rowkey和content两个字段,content是由所有字段组成的json字符串,可通过field.type.fieldname对单个字段设置数据类型,默认都是StringType。这样映射出来还得通过spark程序转一下才是你想要的样子,而且所有字段都会去扫描,相对来说不是特别高效。

故我们可自定义schema映射来获取数据:

hbase.table.schema:hbase表的数据结构,:rowkey,cm:fieldname1,cm:fieldname2,一看就明白 spark.table.schema:spark DataFrame对应的schema,name_x:fieldType,name_y:fieldType,name_z:fieldType
注意这两个schema是一一对应的,Hbase只会扫描hbase.table.schema对应的列。

源码在我的 GitHub,以后在GitHub上跟新,欢迎star

最新发布

CentOS专题

关于本站

5ibc.net旗下博客站精品博文小部分原创、大部分从互联网收集整理。尊重作者版权、传播精品博文,让更多编程爱好者知晓!

小提示

按 Ctrl+D 键,
把本文加入收藏夹