首頁 Java java教程 詳解Spark中將物件序列化儲存到hdfs

詳解Spark中將物件序列化儲存到hdfs

Jun 17, 2017 am 11:42 AM
spark 物件 序列化 詳解

這篇文章主要介紹了java 中Spark中將物件序列化儲存到hdfs的相關資料,需要的朋友可以參考下

java 中Spark中將物件序列化儲存到hdfs

摘要: Spark應用程式中經常會遇到這樣一個需求: 需要將JAVA物件序列化並儲存到HDFS, 尤其是利用MLlib計算出來的一些模型, 儲存到hdfs以便模型可以重複利用. 下面的範例示範了Spark環境下從Hbase讀取資料, 產生一個word2vec模型, 儲存到hdfs.

#廢話不多說, 直接貼程式碼了. spark1.4 + hbase0.98


#
import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.net.URI
import java.util.Date
import org.ansj.library.UserDefineLibrary
import org.ansj.splitWord.analysis.NlpAnalysis
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.PageFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import com.feheadline.fespark.db.Neo4jManager
import com.feheadline.fespark.util.Env
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import scala.math.log
import scala.io.Source

object Word2VecDemo {

 def convertScanToString(scan: Scan) = {
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
 }

 def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setAppName("Word2Vec Demo")
  sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  sparkConf.set("spark.kryoserializer.buffer", "256m")
  sparkConf.set("spark.kryoserializer.buffer.max","2046m")
  sparkConf.set("spark.akka.frameSize", "500")
  sparkConf.set("spark.rpc.askTimeout", "30")
  

  val sc = new SparkContext(sparkConf)
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")

  hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")

  val scan = new Scan()
  val filterList:FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
  
  val comp:RegexStringComparator = new RegexStringComparator(""".{1500,}""")
  
  val articleFilter:SingleColumnValueFilter = new SingleColumnValueFilter(
  "data".getBytes,
  "article".getBytes,
  CompareOp.EQUAL,
  comp
  )
  
  filterList.addFilter(articleFilter)
  filterList.addFilter(new PageFilter(100))
  
  scan.setFilter(filterList)
  scan.setCaching(50)
  scan.setCacheBlocks(false)
  hbaseConf.set(TableInputFormat.SCAN,convertScanToString(scan))

  val crawledRDD = sc.newAPIHadoopRDD(
   hbaseConf,
   classOf[TableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result]
  )
 
  val articlesRDD = crawledRDD.filter{
   case (_,result) => {
     val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
     content != null
   }
  }

  val wordsInDoc = articlesRDD.map{
   case (_,result) => {
     val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
     if(content!=null)ToAnalysis.parse(content).asScala.map(_.getName).toSeq
     else Seq("")
   }
  }
  
  val fitleredWordsInDoc = wordsInDoc.filter(_.nonEmpty)
  
  val word2vec = new Word2Vec()
  val model = word2vec.fit(fitleredWordsInDoc)
  
  //---------------------------------------重点看这里-------------------------------------------------------------
  //将上面的模型存储到hdfs
  val hadoopConf = sc.hadoopConfiguration
  hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
  val fileSystem = FileSystem.get(hadoopConf)
  val path = new Path("/user/hadoop/data/mllib/word2vec-object")
  val oos = new ObjectOutputStream(new FSDataOutputStream(fileSystem.create(path)))
  oos.writeObject(model)
  oos.close
  
  //这里示例另外一个程序直接从hdfs读取序列化对象使用模型
  val ois = new ObjectInputStream(new FSDataInputStream(fileSystem.open(path)))
  val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
  
  /*
  * //你还可以将序列化文件从hdfs放到本地, scala程序使用模型
  * import java.io._
  * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
  * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
  * val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
  * ois.close
  */
  //--------------------------------------------------------------------------------------------------------------
 }
}
登入後複製

以上是詳解Spark中將物件序列化儲存到hdfs的詳細內容。更多資訊請關注PHP中文網其他相關文章!

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

AI Hentai Generator

AI Hentai Generator

免費產生 AI 無盡。

熱門文章

R.E.P.O.能量晶體解釋及其做什麼(黃色晶體)
3 週前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳圖形設置
3 週前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您聽不到任何人,如何修復音頻
3 週前 By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解鎖Myrise中的所有內容
4 週前 By 尊渡假赌尊渡假赌尊渡假赌

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

Win11管理員權限取得詳解 Win11管理員權限取得詳解 Mar 08, 2024 pm 03:06 PM

Windows作業系統是全球最受歡迎的作業系統之一,其新版本Win11備受矚目。在Win11系統中,管理員權限的取得是一個重要的操作,管理員權限可以讓使用者對系統進行更多的操作和設定。本文將詳細介紹在Win11系統中如何取得管理員權限,以及如何有效地管理權限。在Win11系統中,管理員權限分為本機管理員和網域管理員兩種。本機管理員是指具有對本機電腦的完全管理權限

Oracle SQL中的除法運算詳解 Oracle SQL中的除法運算詳解 Mar 10, 2024 am 09:51 AM

OracleSQL中的除法運算詳解在OracleSQL中,除法運算是一種常見且重要的數學運算運算,用來計算兩個數相除的結果。除法在資料庫查詢中經常用到,因此了解OracleSQL中的除法運算及其用法是資料庫開發人員必備的技能之一。本文將詳細討論OracleSQL中除法運算的相關知識,並提供具體的程式碼範例供讀者參考。一、OracleSQL中的除法運算

PHP模運算子的作用及用法詳解 PHP模運算子的作用及用法詳解 Mar 19, 2024 pm 04:33 PM

PHP中的模運算子(%)是用來取得兩個數值相除的餘數的。在本文中,我們將詳細討論模運算子的作用及用法,並提供具體的程式碼範例來幫助讀者更好地理解。 1.模運算子的作用在數學中,當我們將一個整數除以另一個整數時,就會得到一個商和一個餘數。例如,當我們將10除以3時,商數為3,餘數為1。模運算子就是用來取得這個餘數的。 2.模運算子的用法在PHP中,使用%符號來表示模

如何將 MySQL 查詢結果陣列轉換為物件? 如何將 MySQL 查詢結果陣列轉換為物件? Apr 29, 2024 pm 01:09 PM

將MySQL查詢結果陣列轉換為物件的方法如下:建立一個空物件陣列。循環結果數組並為每一行建立一個新的物件。使用foreach迴圈將每一行的鍵值對賦給新物件的對應屬性。將新物件加入到物件數組中。關閉資料庫連線。

數組和物件在 PHP 中的差異是什麼? 數組和物件在 PHP 中的差異是什麼? Apr 29, 2024 pm 02:39 PM

PHP中,數組是有序序列,以索引存取元素;物件是具有屬性和方法的實體,透過new關鍵字建立。數組存取透過索引,物件存取通過屬性/方法。數組值傳遞,物件參考傳遞。

PHP中的Request物件是什麼? PHP中的Request物件是什麼? Feb 27, 2024 pm 09:06 PM

PHP中的Request物件是用來處理客戶端傳送到伺服器的HTTP請求的物件。透過Request對象,我們可以取得客戶端的請求訊息,例如請求方法、請求頭資訊、請求參數等,從而實現對請求的處理和回應。在PHP中,可以使用$_REQUEST、$_GET、$_POST等全域變數來取得要求的信息,但是這些變數並不是對象,而是陣列。為了更靈活和方便地處理請求訊息,可

Java序列化如何影響效能? Java序列化如何影響效能? Apr 16, 2024 pm 06:36 PM

序列化对Java性能的影响:序列化过程依赖于反射,会显著影响性能。序列化需要创建字节流存储对象数据,导致内存分配和处理成本。序列化大对象会消耗大量内存和时间。序列化后的对象在网络上传输时会增加负载量。

C++ 函式庫如何進行序列化與反序列化? C++ 函式庫如何進行序列化與反序列化? Apr 18, 2024 am 10:06 AM

C++函式庫序列化和反序列化指南序列化:建立輸出流並將其轉換為存檔格式。將物件序列化到存檔中。反序列化:建立輸入流並將其從存檔格式還原。從存檔中反序列化物件。實戰範例:序列化:建立輸出流。建立存檔物件。建立物件並將其序列化到存檔中。反序列化:建立輸入流。建立存檔物件。建立物件並從存檔中反序列化。

See all articles