首页 Java java教程 详解Spark中将对象序列化存储到hdfs

详解Spark中将对象序列化存储到hdfs

Jun 17, 2017 am 11:42 AM
spark 对象 序列化 详解

这篇文章主要介绍了java 中Spark中将对象序列化存储到hdfs的相关资料,需要的朋友可以参考下

java 中Spark中将对象序列化存储到hdfs

摘要: Spark应用中经常会遇到这样一个需求: 需要将JAVA对象序列化并存储到HDFS, 尤其是利用MLlib计算出来的一些模型, 存储到hdfs以便模型可以反复利用. 下面的例子演示了Spark环境下从Hbase读取数据, 生成一个word2vec模型, 存储到hdfs.

废话不多说, 直接贴代码了. spark1.4 + hbase0.98


import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.net.URI
import java.util.Date
import org.ansj.library.UserDefineLibrary
import org.ansj.splitWord.analysis.NlpAnalysis
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.PageFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import com.feheadline.fespark.db.Neo4jManager
import com.feheadline.fespark.util.Env
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import scala.math.log
import scala.io.Source

object Word2VecDemo {

 def convertScanToString(scan: Scan) = {
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
 }

 def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setAppName("Word2Vec Demo")
  sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  sparkConf.set("spark.kryoserializer.buffer", "256m")
  sparkConf.set("spark.kryoserializer.buffer.max","2046m")
  sparkConf.set("spark.akka.frameSize", "500")
  sparkConf.set("spark.rpc.askTimeout", "30")
  

  val sc = new SparkContext(sparkConf)
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")

  hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")

  val scan = new Scan()
  val filterList:FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
  
  val comp:RegexStringComparator = new RegexStringComparator(""".{1500,}""")
  
  val articleFilter:SingleColumnValueFilter = new SingleColumnValueFilter(
  "data".getBytes,
  "article".getBytes,
  CompareOp.EQUAL,
  comp
  )
  
  filterList.addFilter(articleFilter)
  filterList.addFilter(new PageFilter(100))
  
  scan.setFilter(filterList)
  scan.setCaching(50)
  scan.setCacheBlocks(false)
  hbaseConf.set(TableInputFormat.SCAN,convertScanToString(scan))

  val crawledRDD = sc.newAPIHadoopRDD(
   hbaseConf,
   classOf[TableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result]
  )
 
  val articlesRDD = crawledRDD.filter{
   case (_,result) => {
     val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
     content != null
   }
  }

  val wordsInDoc = articlesRDD.map{
   case (_,result) => {
     val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
     if(content!=null)ToAnalysis.parse(content).asScala.map(_.getName).toSeq
     else Seq("")
   }
  }
  
  val fitleredWordsInDoc = wordsInDoc.filter(_.nonEmpty)
  
  val word2vec = new Word2Vec()
  val model = word2vec.fit(fitleredWordsInDoc)
  
  //---------------------------------------重点看这里-------------------------------------------------------------
  //将上面的模型存储到hdfs
  val hadoopConf = sc.hadoopConfiguration
  hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
  val fileSystem = FileSystem.get(hadoopConf)
  val path = new Path("/user/hadoop/data/mllib/word2vec-object")
  val oos = new ObjectOutputStream(new FSDataOutputStream(fileSystem.create(path)))
  oos.writeObject(model)
  oos.close
  
  //这里示例另外一个程序直接从hdfs读取序列化对象使用模型
  val ois = new ObjectInputStream(new FSDataInputStream(fileSystem.open(path)))
  val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
  
  /*
  * //你还可以将序列化文件从hdfs放到本地, scala程序使用模型
  * import java.io._
  * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
  * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
  * val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
  * ois.close
  */
  //--------------------------------------------------------------------------------------------------------------
 }
}
登录后复制

以上是详解Spark中将对象序列化存储到hdfs的详细内容。更多信息请关注PHP中文网其他相关文章!

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
3 周前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳图形设置
3 周前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您听不到任何人,如何修复音频
3 周前 By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解锁Myrise中的所有内容
4 周前 By 尊渡假赌尊渡假赌尊渡假赌

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

Win11管理员权限获取详解 Win11管理员权限获取详解 Mar 08, 2024 pm 03:06 PM

Windows操作系统是全球最流行的操作系统之一,其新版本Win11备受瞩目。在Win11系统中,管理员权限的获取是一个重要的操作,管理员权限可以让用户对系统进行更多的操作和设置。本文将详细介绍在Win11系统中如何获取管理员权限,以及如何有效地管理权限。在Win11系统中,管理员权限分为本地管理员和域管理员两种。本地管理员是指具有对本地计算机的完全管理权限

Oracle SQL中的除法运算详解 Oracle SQL中的除法运算详解 Mar 10, 2024 am 09:51 AM

OracleSQL中的除法运算详解在OracleSQL中,除法运算是一种常见且重要的数学运算操作,用于计算两个数相除的结果。除法在数据库查询中经常用到,因此了解OracleSQL中的除法运算及其用法是数据库开发人员必备的技能之一。本文将详细讨论OracleSQL中除法运算的相关知识,并提供具体的代码示例供读者参考。一、OracleSQL中的除法运算

PHP模运算符的作用及用法详解 PHP模运算符的作用及用法详解 Mar 19, 2024 pm 04:33 PM

PHP中的模运算符(%)是用来获取两个数值相除的余数的。在本文中,我们将详细讨论模运算符的作用及用法,并提供具体的代码示例来帮助读者更好地理解。1.模运算符的作用在数学中,当我们将一个整数除以另一个整数时,会得到一个商和一个余数。例如,当我们将10除以3时,商为3,余数为1。模运算符就是用来获取这个余数的。2.模运算符的用法在PHP中,使用%符号来表示模

如何将 MySQL 查询结果数组转换为对象? 如何将 MySQL 查询结果数组转换为对象? Apr 29, 2024 pm 01:09 PM

将MySQL查询结果数组转换为对象的方法如下:创建一个空对象数组。循环结果数组并为每一行创建一个新的对象。使用foreach循环将每一行的键值对赋给新对象的相应属性。将新对象添加到对象数组中。关闭数据库连接。

数组和对象在 PHP 中的区别是什么? 数组和对象在 PHP 中的区别是什么? Apr 29, 2024 pm 02:39 PM

PHP中,数组是有序序列,以索引访问元素;对象是具有属性和方法的实体,通过new关键字创建。数组访问通过索引,对象访问通过属性/方法。数组值传递,对象引用传递。

PHP中的Request对象是什么? PHP中的Request对象是什么? Feb 27, 2024 pm 09:06 PM

PHP中的Request对象是用于处理客户端发送到服务器的HTTP请求的对象。通过Request对象,我们可以获取客户端的请求信息,比如请求方法、请求头信息、请求参数等,从而实现对请求的处理和响应。在PHP中,可以使用$_REQUEST、$_GET、$_POST等全局变量来获取请求的信息,但是这些变量并不是对象,而是数组。为了更加灵活和方便地处理请求信息,可

Java序列化如何影响性能? Java序列化如何影响性能? Apr 16, 2024 pm 06:36 PM

序列化对Java性能的影响:序列化过程依赖于反射,会显着影响性能。序列化需要创建字节流存储对象数据,导致内存分配和处理成本。序列化大对象会消耗大量内存和时间。序列化后的对象在网络上传输时会增加负载量。

C++ 函数库如何进行序列化和反序列化? C++ 函数库如何进行序列化和反序列化? Apr 18, 2024 am 10:06 AM

C++函数库序列化和反序列化指南序列化:创建输出流并将其转换为存档格式。将对象序列化到存档中。反序列化:创建输入流并将其从存档格式恢复。从存档中反序列化对象。实战示例:序列化:创建输出流。创建存档对象。创建对象并将其序列化到存档中。反序列化:创建输入流。创建存档对象。创建对象并从存档中反序列化。

See all articles