


Detailed explanation of serializing objects and storing them in HDFS in Spark
This article mainly introduces the relevant information of object serialization stored in hdfs in Spark in java. Friends in need can refer to
Spark in java. Object serialization and storage to HDFS
Abstract: Spark applications often encounter such a requirement: JAVA objects need to be serialized and stored in HDFS, especially using MLlib calculations Some of the models that come out are stored in hdfs so that the models can be used repeatedly. The following example demonstrates reading data from Hbase in the Spark environment, generating a word2vec model, and storing it in hdfs.
No more nonsense, just post the code. spark1.4 + hbase0.98
import org.apache.spark.storage.StorageLevel import scala.collection.JavaConverters._ import java.io.File import java.io.FileInputStream import java.io.FileOutputStream import java.io.ObjectInputStream import java.io.ObjectOutputStream import java.net.URI import java.util.Date import org.ansj.library.UserDefineLibrary import org.ansj.splitWord.analysis.NlpAnalysis import org.ansj.splitWord.analysis.ToAnalysis import org.apache.hadoop.fs.FSDataInputStream import org.apache.hadoop.fs.FSDataOutputStream import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.FileUtil import org.apache.hadoop.fs.Path import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName} import org.apache.hadoop.hbase.filter.FilterList import org.apache.hadoop.hbase.filter.PageFilter import org.apache.hadoop.hbase.filter.RegexStringComparator import org.apache.hadoop.hbase.filter.SingleColumnValueFilter import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.protobuf.ProtobufUtil import org.apache.hadoop.hbase.util.{Base64, Bytes} import com.feheadline.fespark.db.Neo4jManager import com.feheadline.fespark.util.Env import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.rdd._ import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel} import scala.math.log import scala.io.Source object Word2VecDemo { def convertScanToString(scan: Scan) = { val proto = ProtobufUtil.toScan(scan) Base64.encodeBytes(proto.toByteArray) } def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("Word2Vec Demo") sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") sparkConf.set("spark.kryoserializer.buffer", "256m") sparkConf.set("spark.kryoserializer.buffer.max","2046m") sparkConf.set("spark.akka.frameSize", "500") sparkConf.set("spark.rpc.askTimeout", "30") val sc = new SparkContext(sparkConf) val hbaseConf = HBaseConfiguration.create() hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper") hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled") val scan = new Scan() val filterList:FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL) val comp:RegexStringComparator = new RegexStringComparator(""".{1500,}""") val articleFilter:SingleColumnValueFilter = new SingleColumnValueFilter( "data".getBytes, "article".getBytes, CompareOp.EQUAL, comp ) filterList.addFilter(articleFilter) filterList.addFilter(new PageFilter(100)) scan.setFilter(filterList) scan.setCaching(50) scan.setCacheBlocks(false) hbaseConf.set(TableInputFormat.SCAN,convertScanToString(scan)) val crawledRDD = sc.newAPIHadoopRDD( hbaseConf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result] ) val articlesRDD = crawledRDD.filter{ case (_,result) => { val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes)) content != null } } val wordsInDoc = articlesRDD.map{ case (_,result) => { val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes)) if(content!=null)ToAnalysis.parse(content).asScala.map(_.getName).toSeq else Seq("") } } val fitleredWordsInDoc = wordsInDoc.filter(_.nonEmpty) val word2vec = new Word2Vec() val model = word2vec.fit(fitleredWordsInDoc) //---------------------------------------重点看这里------------------------------------------------------------- //将上面的模型存储到hdfs val hadoopConf = sc.hadoopConfiguration hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/") val fileSystem = FileSystem.get(hadoopConf) val path = new Path("/user/hadoop/data/mllib/word2vec-object") val oos = new ObjectOutputStream(new FSDataOutputStream(fileSystem.create(path))) oos.writeObject(model) oos.close //这里示例另外一个程序直接从hdfs读取序列化对象使用模型 val ois = new ObjectInputStream(new FSDataInputStream(fileSystem.open(path))) val sample_model = ois.readObject.asInstanceOf[Word2VecModel] /* * //你还可以将序列化文件从hdfs放到本地, scala程序使用模型 * import java.io._ * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel} * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object")) * val sample_model = ois.readObject.asInstanceOf[Word2VecModel] * ois.close */ //-------------------------------------------------------------------------------------------------------------- } }
The above is the detailed content of Detailed explanation of serializing objects and storing them in HDFS in Spark. For more information, please follow other related articles on the PHP Chinese website!

Hot AI Tools

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Undress AI Tool
Undress images for free

Clothoff.io
AI clothes remover

AI Hentai Generator
Generate AI Hentai for free.

Hot Article

Hot Tools

Notepad++7.3.1
Easy-to-use and free code editor

SublimeText3 Chinese version
Chinese version, very easy to use

Zend Studio 13.0.1
Powerful PHP integrated development environment

Dreamweaver CS6
Visual web development tools

SublimeText3 Mac version
God-level code editing software (SublimeText3)

Hot Topics



Windows operating system is one of the most popular operating systems in the world, and its new version Win11 has attracted much attention. In the Win11 system, obtaining administrator rights is an important operation. Administrator rights allow users to perform more operations and settings on the system. This article will introduce in detail how to obtain administrator permissions in Win11 system and how to effectively manage permissions. In the Win11 system, administrator rights are divided into two types: local administrator and domain administrator. A local administrator has full administrative rights to the local computer

Detailed explanation of division operation in OracleSQL In OracleSQL, division operation is a common and important mathematical operation, used to calculate the result of dividing two numbers. Division is often used in database queries, so understanding the division operation and its usage in OracleSQL is one of the essential skills for database developers. This article will discuss the relevant knowledge of division operations in OracleSQL in detail and provide specific code examples for readers' reference. 1. Division operation in OracleSQL

The modulo operator (%) in PHP is used to obtain the remainder of the division of two numbers. In this article, we will discuss the role and usage of the modulo operator in detail, and provide specific code examples to help readers better understand. 1. The role of the modulo operator In mathematics, when we divide an integer by another integer, we get a quotient and a remainder. For example, when we divide 10 by 3, the quotient is 3 and the remainder is 1. The modulo operator is used to obtain this remainder. 2. Usage of the modulo operator In PHP, use the % symbol to represent the modulus

Here's how to convert a MySQL query result array into an object: Create an empty object array. Loop through the resulting array and create a new object for each row. Use a foreach loop to assign the key-value pairs of each row to the corresponding properties of the new object. Adds a new object to the object array. Close the database connection.

In PHP, an array is an ordered sequence, and elements are accessed by index; an object is an entity with properties and methods, created through the new keyword. Array access is via index, object access is via properties/methods. Array values are passed and object references are passed.

The impact of serialization on Java performance: The serialization process relies on reflection, which will significantly affect performance. Serialization requires the creation of a byte stream to store object data, resulting in memory allocation and processing costs. Serializing large objects consumes a lot of memory and time. Serialized objects increase load when transmitted over the network.

In C++, there are three points to note when a function returns an object: The life cycle of the object is managed by the caller to prevent memory leaks. Avoid dangling pointers and ensure the object remains valid after the function returns by dynamically allocating memory or returning the object itself. The compiler may optimize copy generation of the returned object to improve performance, but if the object is passed by value semantics, no copy generation is required.

The Request object in PHP is an object used to handle HTTP requests sent by the client to the server. Through the Request object, we can obtain the client's request information, such as request method, request header information, request parameters, etc., so as to process and respond to the request. In PHP, you can use global variables such as $_REQUEST, $_GET, $_POST, etc. to obtain requested information, but these variables are not objects, but arrays. In order to process request information more flexibly and conveniently, you can
