如何使用Spark从HBase读取

马丹兰

下面的代码将从hbase中读取,然后将其转换为json结构并转换为schemaRDD,但是问题是我using List要存储json字符串,然后传递给javaRDD,对于大约100 GB的数据,主数据库将被加载内存中的数据。什么是从hbase加载数据然后执行操作,然后转换为JavaRDD的正确方法。

package hbase_reader;


import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;

import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;

import com.google.common.collect.Lists;

public class hbase_reader {

    public static void main(String[] args) throws IOException, ParseException {

        List<String> jars = Lists.newArrayList("");

        SparkConf spconf = new SparkConf();
        spconf.setMaster("local[2]");
        spconf.setAppName("HBase");
        //spconf.setSparkHome("/opt/human/opt/spark-0.9.0-hdp1");
        spconf.setJars(jars.toArray(new String[jars.size()]));
        JavaSparkContext sc = new JavaSparkContext(spconf);
        //spconf.set("spark.executor.memory", "1g");

        JavaSQLContext jsql = new JavaSQLContext(sc);


        HBaseConfiguration conf = new HBaseConfiguration();
        String tableName = "HBase.CounData1_Raw_Min1";
        HTable table = new HTable(conf,tableName);
        try {

            ResultScanner scanner = table.getScanner(new Scan());
            List<String> jsonList = new ArrayList<String>();

            String json = null;

            for(Result rowResult:scanner) {
                json = "";
                String rowKey  = Bytes.toString(rowResult.getRow());
                for(byte[] s1:rowResult.getMap().keySet()) {
                    String s1_str = Bytes.toString(s1);

                    String jsonSame = "";
                    for(byte[] s2:rowResult.getMap().get(s1).keySet()) {
                        String s2_str = Bytes.toString(s2);
                        for(long s3:rowResult.getMap().get(s1).get(s2).keySet()) {
                            String s3_str = new String(rowResult.getMap().get(s1).get(s2).get(s3));
                            jsonSame += "\""+s2_str+"\":"+s3_str+",";
                        }
                    }
                    jsonSame = jsonSame.substring(0,jsonSame.length()-1);
                    json += "\""+s1_str+"\""+":{"+jsonSame+"}"+",";
                }
                json = json.substring(0,json.length()-1);
                json = "{\"RowKey\":\""+rowKey+"\","+json+"}";
                jsonList.add(json);
            }

            JavaRDD<String> jsonRDD = sc.parallelize(jsonList);

            JavaSchemaRDD schemaRDD = jsql.jsonRDD(jsonRDD);




            System.out.println(schemaRDD.take(2));

        } finally {
            table.close();
        }

    }

}
穆尔塔扎·坎奇瓦拉

一个使用Spark(Scala)读取HBase数据的基本示例,您也可以在Java中编写以下代码:

import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor }
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.spark._

object HBaseRead {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    val tableName = "table1"

    System.setProperty("user.name", "hdfs")
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    conf.set("hbase.master", "localhost:60000")
    conf.setInt("timeout", 120000)
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      admin.createTable(tableDesc)
    }

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    println("Number of Records found : " + hBaseRDD.count())
    sc.stop()
  }
}

更新-2016

从Spark 1.0.x +开始,现在您还可以使用Spark-HBase连接器:

Maven依赖关系包括:

<dependency>
  <groupId>it.nerdammer.bigdata</groupId>
  <artifactId>spark-hbase-connector_2.10</artifactId>
  <version>1.0.3</version> // Version can be changed as per your Spark version, I am using Spark 1.6.x
</dependency>

并找到以下相同的示例代码:

import org.apache.spark._
import it.nerdammer.spark.hbase._

object HBaseRead extends App {
    val sparkConf = new SparkConf().setAppName("Spark-HBase").setMaster("local[4]")
    sparkConf.set("spark.hbase.host", "<YourHostnameOnly>") //e.g. 192.168.1.1 or localhost or your hostanme
    val sc = new SparkContext(sparkConf)

    // For Example If you have an HBase Table as 'Document' with ColumnFamily 'SMPL' and qualifier as 'DocID, Title' then:

    val docRdd = sc.hbaseTable[(Option[String], Option[String])]("Document")
    .select("DocID", "Title").inColumnFamily("SMPL")

    println("Number of Records found : " + docRdd .count())
}

更新-2017

从Spark 1.6.x +开始,现在您还可以使用SHC Connector(Hortonworks或HDP用户):

Maven依赖关系包括:

    <dependency>
        <groupId>com.hortonworks</groupId>
        <artifactId>shc</artifactId>
        <version>1.0.0-2.0-s_2.11</version> // Version depends on the Spark version and is supported upto Spark 2.x
    </dependency>

使用此连接器的主要优点是,它在Schema定义中具有灵活性,并且不需要像nerdammer / spark-hbase-connector中那样的硬编码参数。还要记住,它支持Spark 2.x,因此该连接器非常灵活,并在Issues和PR中提供了端到端支持。

在以下存储库路径中找到最新的自述文件和示例:

Hortonworks Spark HBase连接器

您还可以将此RDD转换为DataFrames并在其上运行SQL,也可以将这些Dataset或DataFrames映射到用户定义的Java Pojo或Case类。它运作出色。

如果您还有其他需要,请在下面发表评论。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

使用Spark从HBase读取特定的列数据

来自分类Dev

如何使用Python连接HBase和Spark?

来自分类Dev

通过Spark Streaming从HBase读取数据

来自分类Dev

如何使用Spark处理一系列HBase行?

来自分类Dev

使用HBASE的Spark与使用HDFS的Spark

来自分类Dev

HBase如何使用ZooKeper?

来自分类Dev

HBase如何使用ZooKeper?

来自分类Dev

如何读取从HBase导出的序列文件

来自分类Dev

如何读取从HBase导出的序列文件

来自分类Dev

如何从 Geomesa HBase 表中读取数据?

来自分类Dev

如何使用wholeTextFiles在Spark中读取gz文件

来自分类Dev

如何使用Spark快速从map()中的HDFS中读取文件

来自分类Dev

如何使用Java从Spark中的kafka读取流嵌套的JSON

来自分类Dev

如何使用wholeTextFiles在Spark中读取gz文件

来自分类Dev

如何使用Spark Streaming从序列文件中读取数据

来自分类Dev

如何使用Spark快速从map()中的HDFS中读取文件

来自分类Dev

使用spark读取csv.file时如何省略标题?

来自分类Dev

如何在Spark中配置HBase?

来自分类Dev

如何使用Scala在Spark中联接两个Hbase表

来自分类Dev

使用Spark删除HBase单元格

来自分类Dev

无法使用Spark Scala递增ColumnValue HBASE

来自分类Dev

使用spark-submit设置Spark Job的HBase属性

来自分类Dev

无法使用Spark脚本将Spark数据集写入HBase

来自分类Dev

如何从HBase结果中读取字符串?

来自分类Dev

如何从Hive读取HBase当前和先前版本的数据

来自分类Dev

如何将最新的100行从Hbase加载到Spark

来自分类Dev

Spark-HBASE错误java.lang.IllegalStateException:未读取的块数据

来自分类Dev

从pyspark读取hbase

来自分类Dev

使用Spark流在Hbase / HDFS中保存protobuf

Related 相关文章

热门标签

归档