詳解Spark中將物件序列化儲存到hdfs
這篇文章主要介紹了java 中Spark中將物件序列化儲存到hdfs的相關資料,需要的朋友可以參考下
java 中Spark中將物件序列化儲存到hdfs
摘要: Spark應用程式中經常會遇到這樣一個需求: 需要將JAVA物件序列化並儲存到HDFS, 尤其是利用MLlib計算出來的一些模型, 儲存到hdfs以便模型可以重複利用. 下面的範例示範了Spark環境下從Hbase讀取資料, 產生一個word2vec模型, 儲存到hdfs.
#廢話不多說, 直接貼程式碼了. spark1.4 + hbase0.98
import org.apache.spark.storage.StorageLevel import scala.collection.JavaConverters._ import java.io.File import java.io.FileInputStream import java.io.FileOutputStream import java.io.ObjectInputStream import java.io.ObjectOutputStream import java.net.URI import java.util.Date import org.ansj.library.UserDefineLibrary import org.ansj.splitWord.analysis.NlpAnalysis import org.ansj.splitWord.analysis.ToAnalysis import org.apache.hadoop.fs.FSDataInputStream import org.apache.hadoop.fs.FSDataOutputStream import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.FileUtil import org.apache.hadoop.fs.Path import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName} import org.apache.hadoop.hbase.filter.FilterList import org.apache.hadoop.hbase.filter.PageFilter import org.apache.hadoop.hbase.filter.RegexStringComparator import org.apache.hadoop.hbase.filter.SingleColumnValueFilter import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.protobuf.ProtobufUtil import org.apache.hadoop.hbase.util.{Base64, Bytes} import com.feheadline.fespark.db.Neo4jManager import com.feheadline.fespark.util.Env import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.rdd._ import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel} import scala.math.log import scala.io.Source object Word2VecDemo { def convertScanToString(scan: Scan) = { val proto = ProtobufUtil.toScan(scan) Base64.encodeBytes(proto.toByteArray) } def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("Word2Vec Demo") sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") sparkConf.set("spark.kryoserializer.buffer", "256m") sparkConf.set("spark.kryoserializer.buffer.max","2046m") sparkConf.set("spark.akka.frameSize", "500") sparkConf.set("spark.rpc.askTimeout", "30") val sc = new SparkContext(sparkConf) val hbaseConf = HBaseConfiguration.create() hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper") hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled") val scan = new Scan() val filterList:FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL) val comp:RegexStringComparator = new RegexStringComparator(""".{1500,}""") val articleFilter:SingleColumnValueFilter = new SingleColumnValueFilter( "data".getBytes, "article".getBytes, CompareOp.EQUAL, comp ) filterList.addFilter(articleFilter) filterList.addFilter(new PageFilter(100)) scan.setFilter(filterList) scan.setCaching(50) scan.setCacheBlocks(false) hbaseConf.set(TableInputFormat.SCAN,convertScanToString(scan)) val crawledRDD = sc.newAPIHadoopRDD( hbaseConf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result] ) val articlesRDD = crawledRDD.filter{ case (_,result) => { val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes)) content != null } } val wordsInDoc = articlesRDD.map{ case (_,result) => { val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes)) if(content!=null)ToAnalysis.parse(content).asScala.map(_.getName).toSeq else Seq("") } } val fitleredWordsInDoc = wordsInDoc.filter(_.nonEmpty) val word2vec = new Word2Vec() val model = word2vec.fit(fitleredWordsInDoc) //---------------------------------------重点看这里------------------------------------------------------------- //将上面的模型存储到hdfs val hadoopConf = sc.hadoopConfiguration hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/") val fileSystem = FileSystem.get(hadoopConf) val path = new Path("/user/hadoop/data/mllib/word2vec-object") val oos = new ObjectOutputStream(new FSDataOutputStream(fileSystem.create(path))) oos.writeObject(model) oos.close //这里示例另外一个程序直接从hdfs读取序列化对象使用模型 val ois = new ObjectInputStream(new FSDataInputStream(fileSystem.open(path))) val sample_model = ois.readObject.asInstanceOf[Word2VecModel] /* * //你还可以将序列化文件从hdfs放到本地, scala程序使用模型 * import java.io._ * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel} * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object")) * val sample_model = ois.readObject.asInstanceOf[Word2VecModel] * ois.close */ //-------------------------------------------------------------------------------------------------------------- } }
以上是詳解Spark中將物件序列化儲存到hdfs的詳細內容。更多資訊請關注PHP中文網其他相關文章!

熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

AI Hentai Generator
免費產生 AI 無盡。

熱門文章

熱工具

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用

禪工作室 13.0.1
強大的PHP整合開發環境

Dreamweaver CS6
視覺化網頁開發工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

熱門話題

Windows作業系統是全球最受歡迎的作業系統之一,其新版本Win11備受矚目。在Win11系統中,管理員權限的取得是一個重要的操作,管理員權限可以讓使用者對系統進行更多的操作和設定。本文將詳細介紹在Win11系統中如何取得管理員權限,以及如何有效地管理權限。在Win11系統中,管理員權限分為本機管理員和網域管理員兩種。本機管理員是指具有對本機電腦的完全管理權限

OracleSQL中的除法運算詳解在OracleSQL中,除法運算是一種常見且重要的數學運算運算,用來計算兩個數相除的結果。除法在資料庫查詢中經常用到,因此了解OracleSQL中的除法運算及其用法是資料庫開發人員必備的技能之一。本文將詳細討論OracleSQL中除法運算的相關知識,並提供具體的程式碼範例供讀者參考。一、OracleSQL中的除法運算

PHP中的模運算子(%)是用來取得兩個數值相除的餘數的。在本文中,我們將詳細討論模運算子的作用及用法,並提供具體的程式碼範例來幫助讀者更好地理解。 1.模運算子的作用在數學中,當我們將一個整數除以另一個整數時,就會得到一個商和一個餘數。例如,當我們將10除以3時,商數為3,餘數為1。模運算子就是用來取得這個餘數的。 2.模運算子的用法在PHP中,使用%符號來表示模

將MySQL查詢結果陣列轉換為物件的方法如下:建立一個空物件陣列。循環結果數組並為每一行建立一個新的物件。使用foreach迴圈將每一行的鍵值對賦給新物件的對應屬性。將新物件加入到物件數組中。關閉資料庫連線。

PHP中,數組是有序序列,以索引存取元素;物件是具有屬性和方法的實體,透過new關鍵字建立。數組存取透過索引,物件存取通過屬性/方法。數組值傳遞,物件參考傳遞。

PHP中的Request物件是用來處理客戶端傳送到伺服器的HTTP請求的物件。透過Request對象,我們可以取得客戶端的請求訊息,例如請求方法、請求頭資訊、請求參數等,從而實現對請求的處理和回應。在PHP中,可以使用$_REQUEST、$_GET、$_POST等全域變數來取得要求的信息,但是這些變數並不是對象,而是陣列。為了更靈活和方便地處理請求訊息,可

序列化对Java性能的影响:序列化过程依赖于反射,会显著影响性能。序列化需要创建字节流存储对象数据,导致内存分配和处理成本。序列化大对象会消耗大量内存和时间。序列化后的对象在网络上传输时会增加负载量。

C++函式庫序列化和反序列化指南序列化:建立輸出流並將其轉換為存檔格式。將物件序列化到存檔中。反序列化:建立輸入流並將其從存檔格式還原。從存檔中反序列化物件。實戰範例:序列化:建立輸出流。建立存檔物件。建立物件並將其序列化到存檔中。反序列化:建立輸入流。建立存檔物件。建立物件並從存檔中反序列化。
