详解Spark中将对象序列化存储到hdfs
这篇文章主要介绍了java 中Spark中将对象序列化存储到hdfs的相关资料,需要的朋友可以参考下
java 中Spark中将对象序列化存储到hdfs
摘要: Spark应用中经常会遇到这样一个需求: 需要将JAVA对象序列化并存储到HDFS, 尤其是利用MLlib计算出来的一些模型, 存储到hdfs以便模型可以反复利用. 下面的例子演示了Spark环境下从Hbase读取数据, 生成一个word2vec模型, 存储到hdfs.
废话不多说, 直接贴代码了. spark1.4 + hbase0.98
import org.apache.spark.storage.StorageLevel import scala.collection.JavaConverters._ import java.io.File import java.io.FileInputStream import java.io.FileOutputStream import java.io.ObjectInputStream import java.io.ObjectOutputStream import java.net.URI import java.util.Date import org.ansj.library.UserDefineLibrary import org.ansj.splitWord.analysis.NlpAnalysis import org.ansj.splitWord.analysis.ToAnalysis import org.apache.hadoop.fs.FSDataInputStream import org.apache.hadoop.fs.FSDataOutputStream import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.FileUtil import org.apache.hadoop.fs.Path import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName} import org.apache.hadoop.hbase.filter.FilterList import org.apache.hadoop.hbase.filter.PageFilter import org.apache.hadoop.hbase.filter.RegexStringComparator import org.apache.hadoop.hbase.filter.SingleColumnValueFilter import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.protobuf.ProtobufUtil import org.apache.hadoop.hbase.util.{Base64, Bytes} import com.feheadline.fespark.db.Neo4jManager import com.feheadline.fespark.util.Env import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.rdd._ import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel} import scala.math.log import scala.io.Source object Word2VecDemo { def convertScanToString(scan: Scan) = { val proto = ProtobufUtil.toScan(scan) Base64.encodeBytes(proto.toByteArray) } def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("Word2Vec Demo") sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") sparkConf.set("spark.kryoserializer.buffer", "256m") sparkConf.set("spark.kryoserializer.buffer.max","2046m") sparkConf.set("spark.akka.frameSize", "500") sparkConf.set("spark.rpc.askTimeout", "30") val sc = new SparkContext(sparkConf) val hbaseConf = HBaseConfiguration.create() hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper") hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled") val scan = new Scan() val filterList:FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL) val comp:RegexStringComparator = new RegexStringComparator(""".{1500,}""") val articleFilter:SingleColumnValueFilter = new SingleColumnValueFilter( "data".getBytes, "article".getBytes, CompareOp.EQUAL, comp ) filterList.addFilter(articleFilter) filterList.addFilter(new PageFilter(100)) scan.setFilter(filterList) scan.setCaching(50) scan.setCacheBlocks(false) hbaseConf.set(TableInputFormat.SCAN,convertScanToString(scan)) val crawledRDD = sc.newAPIHadoopRDD( hbaseConf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result] ) val articlesRDD = crawledRDD.filter{ case (_,result) => { val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes)) content != null } } val wordsInDoc = articlesRDD.map{ case (_,result) => { val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes)) if(content!=null)ToAnalysis.parse(content).asScala.map(_.getName).toSeq else Seq("") } } val fitleredWordsInDoc = wordsInDoc.filter(_.nonEmpty) val word2vec = new Word2Vec() val model = word2vec.fit(fitleredWordsInDoc) //---------------------------------------重点看这里------------------------------------------------------------- //将上面的模型存储到hdfs val hadoopConf = sc.hadoopConfiguration hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/") val fileSystem = FileSystem.get(hadoopConf) val path = new Path("/user/hadoop/data/mllib/word2vec-object") val oos = new ObjectOutputStream(new FSDataOutputStream(fileSystem.create(path))) oos.writeObject(model) oos.close //这里示例另外一个程序直接从hdfs读取序列化对象使用模型 val ois = new ObjectInputStream(new FSDataInputStream(fileSystem.open(path))) val sample_model = ois.readObject.asInstanceOf[Word2VecModel] /* * //你还可以将序列化文件从hdfs放到本地, scala程序使用模型 * import java.io._ * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel} * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object")) * val sample_model = ois.readObject.asInstanceOf[Word2VecModel] * ois.close */ //-------------------------------------------------------------------------------------------------------------- } }
以上是详解Spark中将对象序列化存储到hdfs的详细内容。更多信息请关注PHP中文网其他相关文章!

热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

AI Hentai Generator
免费生成ai无尽的。

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

SublimeText3汉化版
中文版,非常好用

禅工作室 13.0.1
功能强大的PHP集成开发环境

Dreamweaver CS6
视觉化网页开发工具

SublimeText3 Mac版
神级代码编辑软件(SublimeText3)

热门话题

Windows操作系统是全球最流行的操作系统之一,其新版本Win11备受瞩目。在Win11系统中,管理员权限的获取是一个重要的操作,管理员权限可以让用户对系统进行更多的操作和设置。本文将详细介绍在Win11系统中如何获取管理员权限,以及如何有效地管理权限。在Win11系统中,管理员权限分为本地管理员和域管理员两种。本地管理员是指具有对本地计算机的完全管理权限

OracleSQL中的除法运算详解在OracleSQL中,除法运算是一种常见且重要的数学运算操作,用于计算两个数相除的结果。除法在数据库查询中经常用到,因此了解OracleSQL中的除法运算及其用法是数据库开发人员必备的技能之一。本文将详细讨论OracleSQL中除法运算的相关知识,并提供具体的代码示例供读者参考。一、OracleSQL中的除法运算

PHP中的模运算符(%)是用来获取两个数值相除的余数的。在本文中,我们将详细讨论模运算符的作用及用法,并提供具体的代码示例来帮助读者更好地理解。1.模运算符的作用在数学中,当我们将一个整数除以另一个整数时,会得到一个商和一个余数。例如,当我们将10除以3时,商为3,余数为1。模运算符就是用来获取这个余数的。2.模运算符的用法在PHP中,使用%符号来表示模

将MySQL查询结果数组转换为对象的方法如下:创建一个空对象数组。循环结果数组并为每一行创建一个新的对象。使用foreach循环将每一行的键值对赋给新对象的相应属性。将新对象添加到对象数组中。关闭数据库连接。

PHP中,数组是有序序列,以索引访问元素;对象是具有属性和方法的实体,通过new关键字创建。数组访问通过索引,对象访问通过属性/方法。数组值传递,对象引用传递。

PHP中的Request对象是用于处理客户端发送到服务器的HTTP请求的对象。通过Request对象,我们可以获取客户端的请求信息,比如请求方法、请求头信息、请求参数等,从而实现对请求的处理和响应。在PHP中,可以使用$_REQUEST、$_GET、$_POST等全局变量来获取请求的信息,但是这些变量并不是对象,而是数组。为了更加灵活和方便地处理请求信息,可

序列化对Java性能的影响:序列化过程依赖于反射,会显着影响性能。序列化需要创建字节流存储对象数据,导致内存分配和处理成本。序列化大对象会消耗大量内存和时间。序列化后的对象在网络上传输时会增加负载量。

C++函数库序列化和反序列化指南序列化:创建输出流并将其转换为存档格式。将对象序列化到存档中。反序列化:创建输入流并将其从存档格式恢复。从存档中反序列化对象。实战示例:序列化:创建输出流。创建存档对象。创建对象并将其序列化到存档中。反序列化:创建输入流。创建存档对象。创建对象并从存档中反序列化。
