详解Spark中将对象序列化存储到hdfs
这篇文章主要介绍了java 中Spark中将对象序列化存储到hdfs的相关资料,需要的朋友可以参考下
java 中Spark中将对象序列化存储到hdfs
摘要: Spark应用中经常会遇到这样一个需求: 需要将JAVA对象序列化并存储到HDFS, 尤其是利用MLlib计算出来的一些模型, 存储到hdfs以便模型可以反复利用. 下面的例子演示了Spark环境下从Hbase读取数据, 生成一个word2vec模型, 存储到hdfs.
废话不多说, 直接贴代码了. spark1.4 + hbase0.98
import org.apache.spark.storage.StorageLevel import scala.collection.JavaConverters._ import java.io.File import java.io.FileInputStream import java.io.FileOutputStream import java.io.ObjectInputStream import java.io.ObjectOutputStream import java.net.URI import java.util.Date import org.ansj.library.UserDefineLibrary import org.ansj.splitWord.analysis.NlpAnalysis import org.ansj.splitWord.analysis.ToAnalysis import org.apache.hadoop.fs.FSDataInputStream import org.apache.hadoop.fs.FSDataOutputStream import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.FileUtil import org.apache.hadoop.fs.Path import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName} import org.apache.hadoop.hbase.filter.FilterList import org.apache.hadoop.hbase.filter.PageFilter import org.apache.hadoop.hbase.filter.RegexStringComparator import org.apache.hadoop.hbase.filter.SingleColumnValueFilter import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.protobuf.ProtobufUtil import org.apache.hadoop.hbase.util.{Base64, Bytes} import com.feheadline.fespark.db.Neo4jManager import com.feheadline.fespark.util.Env import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.rdd._ import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel} import scala.math.log import scala.io.Source object Word2VecDemo { def convertScanToString(scan: Scan) = { val proto = ProtobufUtil.toScan(scan) Base64.encodeBytes(proto.toByteArray) } def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("Word2Vec Demo") sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") sparkConf.set("spark.kryoserializer.buffer", "256m") sparkConf.set("spark.kryoserializer.buffer.max","2046m") sparkConf.set("spark.akka.frameSize", "500") sparkConf.set("spark.rpc.askTimeout", "30") val sc = new SparkContext(sparkConf) val hbaseConf = HBaseConfiguration.create() hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper") hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled") val scan = new Scan() val filterList:FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL) val comp:RegexStringComparator = new RegexStringComparator(""".{1500,}""") val articleFilter:SingleColumnValueFilter = new SingleColumnValueFilter( "data".getBytes, "article".getBytes, CompareOp.EQUAL, comp ) filterList.addFilter(articleFilter) filterList.addFilter(new PageFilter(100)) scan.setFilter(filterList) scan.setCaching(50) scan.setCacheBlocks(false) hbaseConf.set(TableInputFormat.SCAN,convertScanToString(scan)) val crawledRDD = sc.newAPIHadoopRDD( hbaseConf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result] ) val articlesRDD = crawledRDD.filter{ case (_,result) => { val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes)) content != null } } val wordsInDoc = articlesRDD.map{ case (_,result) => { val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes)) if(content!=null)ToAnalysis.parse(content).asScala.map(_.getName).toSeq else Seq("") } } val fitleredWordsInDoc = wordsInDoc.filter(_.nonEmpty) val word2vec = new Word2Vec() val model = word2vec.fit(fitleredWordsInDoc) //---------------------------------------重点看这里------------------------------------------------------------- //将上面的模型存储到hdfs val hadoopConf = sc.hadoopConfiguration hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/") val fileSystem = FileSystem.get(hadoopConf) val path = new Path("/user/hadoop/data/mllib/word2vec-object") val oos = new ObjectOutputStream(new FSDataOutputStream(fileSystem.create(path))) oos.writeObject(model) oos.close //这里示例另外一个程序直接从hdfs读取序列化对象使用模型 val ois = new ObjectInputStream(new FSDataInputStream(fileSystem.open(path))) val sample_model = ois.readObject.asInstanceOf[Word2VecModel] /* * //你还可以将序列化文件从hdfs放到本地, scala程序使用模型 * import java.io._ * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel} * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object")) * val sample_model = ois.readObject.asInstanceOf[Word2VecModel] * ois.close */ //-------------------------------------------------------------------------------------------------------------- } }
Atas ialah kandungan terperinci 详解Spark中将对象序列化存储到hdfs. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Alat AI Hot

Undresser.AI Undress
Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover
Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Undress AI Tool
Gambar buka pakaian secara percuma

Clothoff.io
Penyingkiran pakaian AI

AI Hentai Generator
Menjana ai hentai secara percuma.

Artikel Panas

Alat panas

Notepad++7.3.1
Editor kod yang mudah digunakan dan percuma

SublimeText3 versi Cina
Versi Cina, sangat mudah digunakan

Hantar Studio 13.0.1
Persekitaran pembangunan bersepadu PHP yang berkuasa

Dreamweaver CS6
Alat pembangunan web visual

SublimeText3 versi Mac
Perisian penyuntingan kod peringkat Tuhan (SublimeText3)

Topik panas



Sistem pengendalian Windows ialah salah satu sistem pengendalian yang paling popular di dunia, dan versi baharunya Win11 telah menarik perhatian ramai. Dalam sistem Win11, mendapatkan hak pentadbir adalah operasi penting Hak pentadbir membolehkan pengguna melakukan lebih banyak operasi dan tetapan pada sistem. Artikel ini akan memperkenalkan secara terperinci cara mendapatkan kebenaran pentadbir dalam sistem Win11 dan cara mengurus kebenaran dengan berkesan. Dalam sistem Win11, hak pentadbir dibahagikan kepada dua jenis: pentadbir tempatan dan pentadbir domain. Pentadbir tempatan mempunyai hak pentadbiran penuh ke atas komputer tempatan

Penjelasan terperinci tentang operasi bahagi dalam OracleSQL Dalam OracleSQL, operasi bahagi ialah operasi matematik yang biasa dan penting, digunakan untuk mengira hasil pembahagian dua nombor. Bahagian sering digunakan dalam pertanyaan pangkalan data, jadi memahami operasi bahagian dan penggunaannya dalam OracleSQL adalah salah satu kemahiran penting untuk pembangun pangkalan data. Artikel ini akan membincangkan pengetahuan berkaitan operasi bahagian dalam OracleSQL secara terperinci dan menyediakan contoh kod khusus untuk rujukan pembaca. 1. Operasi bahagian dalam OracleSQL

Operator modulo (%) dalam PHP digunakan untuk mendapatkan baki pembahagian dua nombor. Dalam artikel ini, kami akan membincangkan peranan dan penggunaan pengendali modulo secara terperinci, dan memberikan contoh kod khusus untuk membantu pembaca memahami dengan lebih baik. 1. Peranan pengendali modulo Dalam matematik, apabila kita membahagi integer dengan integer lain, kita mendapat hasil bagi dan baki. Sebagai contoh, apabila kita membahagi 10 dengan 3, hasil bahagi ialah 3 dan selebihnya ialah 1. Operator modulo digunakan untuk mendapatkan baki ini. 2. Penggunaan operator modulo Dalam PHP, gunakan simbol % untuk mewakili modulus

Begini cara untuk menukar tatasusunan hasil pertanyaan MySQL kepada objek: Cipta tatasusunan objek kosong. Gelung melalui tatasusunan yang terhasil dan buat objek baharu untuk setiap baris. Gunakan gelung foreach untuk menetapkan pasangan nilai kunci setiap baris kepada sifat yang sepadan bagi objek baharu. Menambah objek baharu pada tatasusunan objek. Tutup sambungan pangkalan data.

Dalam PHP, tatasusunan ialah urutan tersusun, dan elemen diakses mengikut indeks; Akses tatasusunan adalah melalui indeks, akses objek adalah melalui sifat/kaedah. Nilai tatasusunan diluluskan dan rujukan objek diluluskan.

Objek Permintaan dalam PHP ialah objek yang digunakan untuk mengendalikan permintaan HTTP yang dihantar oleh klien ke pelayan. Melalui objek Permintaan, kami boleh mendapatkan maklumat permintaan pelanggan, seperti kaedah permintaan, maklumat pengepala permintaan, parameter permintaan, dsb., untuk memproses dan membalas permintaan tersebut. Dalam PHP, anda boleh menggunakan pembolehubah global seperti $_REQUEST, $_GET, $_POST, dll. untuk mendapatkan maklumat yang diminta, tetapi pembolehubah ini bukan objek, tetapi tatasusunan. Untuk memproses maklumat permintaan dengan lebih fleksibel dan mudah, anda boleh

Kesan siri pada prestasi Java: Proses siri bergantung pada refleksi, yang akan menjejaskan prestasi dengan ketara. Serialisasi memerlukan penciptaan aliran bait untuk menyimpan data objek, mengakibatkan peruntukan memori dan kos pemprosesan. Mensiri objek besar menggunakan banyak memori dan masa. Objek bersiri meningkatkan beban apabila dihantar melalui rangkaian.

Panduan Pensirilan dan Penyahserikatan Perpustakaan C++ Pensirian: Mencipta aliran output dan menukarnya kepada format arkib. Mensiri objek ke dalam arkib. Penyahserialisasian: Mencipta aliran input dan memulihkannya daripada format arkib. Nyahserialisasi objek daripada arkib. Contoh praktikal: Serialisasi: Mencipta aliran keluaran. Buat objek arkib. Cipta dan sirikan objek ke dalam arkib. Penyahserikatan: Buat aliran input. Buat objek arkib. Cipta objek dan deserialisasikannya daripada arkib.
