Home Java javaTutorial Detailed explanation of serializing objects and storing them in HDFS in Spark

Detailed explanation of serializing objects and storing them in HDFS in Spark

Jun 17, 2017 am 11:42 AM
spark object Serialization Detailed explanation

This article mainly introduces the relevant information of object serialization stored in hdfs in Spark in java. Friends in need can refer to

Spark in java. Object serialization and storage to HDFS

Abstract: Spark applications often encounter such a requirement: JAVA objects need to be serialized and stored in HDFS, especially using MLlib calculations Some of the models that come out are stored in hdfs so that the models can be used repeatedly. The following example demonstrates reading data from Hbase in the Spark environment, generating a word2vec model, and storing it in hdfs.

No more nonsense, just post the code. spark1.4 + hbase0.98


import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._
import java.io.File
import java.io.FileInputStream
import java.io.FileOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.net.URI
import java.util.Date
import org.ansj.library.UserDefineLibrary
import org.ansj.splitWord.analysis.NlpAnalysis
import org.ansj.splitWord.analysis.ToAnalysis
import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.fs.FSDataOutputStream
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.filter.FilterList
import org.apache.hadoop.hbase.filter.PageFilter
import org.apache.hadoop.hbase.filter.RegexStringComparator
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import com.feheadline.fespark.db.Neo4jManager
import com.feheadline.fespark.util.Env
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd._
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
import scala.math.log
import scala.io.Source

object Word2VecDemo {

 def convertScanToString(scan: Scan) = {
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray)
 }

 def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setAppName("Word2Vec Demo")
  sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  sparkConf.set("spark.kryoserializer.buffer", "256m")
  sparkConf.set("spark.kryoserializer.buffer.max","2046m")
  sparkConf.set("spark.akka.frameSize", "500")
  sparkConf.set("spark.rpc.askTimeout", "30")
  

  val sc = new SparkContext(sparkConf)
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", "myzookeeper")

  hbaseConf.set(TableInputFormat.INPUT_TABLE, "crawled")

  val scan = new Scan()
  val filterList:FilterList = new FilterList(FilterList.Operator.MUST_PASS_ALL)
  
  val comp:RegexStringComparator = new RegexStringComparator(""".{1500,}""")
  
  val articleFilter:SingleColumnValueFilter = new SingleColumnValueFilter(
  "data".getBytes,
  "article".getBytes,
  CompareOp.EQUAL,
  comp
  )
  
  filterList.addFilter(articleFilter)
  filterList.addFilter(new PageFilter(100))
  
  scan.setFilter(filterList)
  scan.setCaching(50)
  scan.setCacheBlocks(false)
  hbaseConf.set(TableInputFormat.SCAN,convertScanToString(scan))

  val crawledRDD = sc.newAPIHadoopRDD(
   hbaseConf,
   classOf[TableInputFormat],
   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
   classOf[org.apache.hadoop.hbase.client.Result]
  )
 
  val articlesRDD = crawledRDD.filter{
   case (_,result) => {
     val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
     content != null
   }
  }

  val wordsInDoc = articlesRDD.map{
   case (_,result) => {
     val content = Bytes.toString(result.getValue("data".getBytes,"article".getBytes))
     if(content!=null)ToAnalysis.parse(content).asScala.map(_.getName).toSeq
     else Seq("")
   }
  }
  
  val fitleredWordsInDoc = wordsInDoc.filter(_.nonEmpty)
  
  val word2vec = new Word2Vec()
  val model = word2vec.fit(fitleredWordsInDoc)
  
  //---------------------------------------重点看这里-------------------------------------------------------------
  //将上面的模型存储到hdfs
  val hadoopConf = sc.hadoopConfiguration
  hadoopConf.set("fs.defaultFS", "hdfs://myhadoop:9000/")
  val fileSystem = FileSystem.get(hadoopConf)
  val path = new Path("/user/hadoop/data/mllib/word2vec-object")
  val oos = new ObjectOutputStream(new FSDataOutputStream(fileSystem.create(path)))
  oos.writeObject(model)
  oos.close
  
  //这里示例另外一个程序直接从hdfs读取序列化对象使用模型
  val ois = new ObjectInputStream(new FSDataInputStream(fileSystem.open(path)))
  val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
  
  /*
  * //你还可以将序列化文件从hdfs放到本地, scala程序使用模型
  * import java.io._
  * import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
  * val ois = new ObjectInputStream(new FileInputStream("/home/cherokee/tmp/word2vec-object"))
  * val sample_model = ois.readObject.asInstanceOf[Word2VecModel]
  * ois.close
  */
  //--------------------------------------------------------------------------------------------------------------
 }
}
Copy after login

The above is the detailed content of Detailed explanation of serializing objects and storing them in HDFS in Spark. For more information, please follow other related articles on the PHP Chinese website!

Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn

Hot AI Tools

Undresser.AI Undress

Undresser.AI Undress

AI-powered app for creating realistic nude photos

AI Clothes Remover

AI Clothes Remover

Online AI tool for removing clothes from photos.

Undress AI Tool

Undress AI Tool

Undress images for free

Clothoff.io

Clothoff.io

AI clothes remover

AI Hentai Generator

AI Hentai Generator

Generate AI Hentai for free.

Hot Article

R.E.P.O. Energy Crystals Explained and What They Do (Yellow Crystal)
1 months ago By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Best Graphic Settings
1 months ago By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. How to Fix Audio if You Can't Hear Anyone
1 months ago By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Chat Commands and How to Use Them
1 months ago By 尊渡假赌尊渡假赌尊渡假赌

Hot Tools

Notepad++7.3.1

Notepad++7.3.1

Easy-to-use and free code editor

SublimeText3 Chinese version

SublimeText3 Chinese version

Chinese version, very easy to use

Zend Studio 13.0.1

Zend Studio 13.0.1

Powerful PHP integrated development environment

Dreamweaver CS6

Dreamweaver CS6

Visual web development tools

SublimeText3 Mac version

SublimeText3 Mac version

God-level code editing software (SublimeText3)

Detailed explanation of obtaining administrator rights in Win11 Detailed explanation of obtaining administrator rights in Win11 Mar 08, 2024 pm 03:06 PM

Windows operating system is one of the most popular operating systems in the world, and its new version Win11 has attracted much attention. In the Win11 system, obtaining administrator rights is an important operation. Administrator rights allow users to perform more operations and settings on the system. This article will introduce in detail how to obtain administrator permissions in Win11 system and how to effectively manage permissions. In the Win11 system, administrator rights are divided into two types: local administrator and domain administrator. A local administrator has full administrative rights to the local computer

Detailed explanation of division operation in Oracle SQL Detailed explanation of division operation in Oracle SQL Mar 10, 2024 am 09:51 AM

Detailed explanation of division operation in OracleSQL In OracleSQL, division operation is a common and important mathematical operation, used to calculate the result of dividing two numbers. Division is often used in database queries, so understanding the division operation and its usage in OracleSQL is one of the essential skills for database developers. This article will discuss the relevant knowledge of division operations in OracleSQL in detail and provide specific code examples for readers' reference. 1. Division operation in OracleSQL

Detailed explanation of the role and usage of PHP modulo operator Detailed explanation of the role and usage of PHP modulo operator Mar 19, 2024 pm 04:33 PM

The modulo operator (%) in PHP is used to obtain the remainder of the division of two numbers. In this article, we will discuss the role and usage of the modulo operator in detail, and provide specific code examples to help readers better understand. 1. The role of the modulo operator In mathematics, when we divide an integer by another integer, we get a quotient and a remainder. For example, when we divide 10 by 3, the quotient is 3 and the remainder is 1. The modulo operator is used to obtain this remainder. 2. Usage of the modulo operator In PHP, use the % symbol to represent the modulus

How to convert MySQL query result array to object? How to convert MySQL query result array to object? Apr 29, 2024 pm 01:09 PM

Here's how to convert a MySQL query result array into an object: Create an empty object array. Loop through the resulting array and create a new object for each row. Use a foreach loop to assign the key-value pairs of each row to the corresponding properties of the new object. Adds a new object to the object array. Close the database connection.

What is the difference between arrays and objects in PHP? What is the difference between arrays and objects in PHP? Apr 29, 2024 pm 02:39 PM

In PHP, an array is an ordered sequence, and elements are accessed by index; an object is an entity with properties and methods, created through the new keyword. Array access is via index, object access is via properties/methods. Array values ​​are passed and object references are passed.

How does Java serialization affect performance? How does Java serialization affect performance? Apr 16, 2024 pm 06:36 PM

The impact of serialization on Java performance: The serialization process relies on reflection, which will significantly affect performance. Serialization requires the creation of a byte stream to store object data, resulting in memory allocation and processing costs. Serializing large objects consumes a lot of memory and time. Serialized objects increase load when transmitted over the network.

What should I pay attention to when a C++ function returns an object? What should I pay attention to when a C++ function returns an object? Apr 19, 2024 pm 12:15 PM

In C++, there are three points to note when a function returns an object: The life cycle of the object is managed by the caller to prevent memory leaks. Avoid dangling pointers and ensure the object remains valid after the function returns by dynamically allocating memory or returning the object itself. The compiler may optimize copy generation of the returned object to improve performance, but if the object is passed by value semantics, no copy generation is required.

What is the Request object in PHP? What is the Request object in PHP? Feb 27, 2024 pm 09:06 PM

The Request object in PHP is an object used to handle HTTP requests sent by the client to the server. Through the Request object, we can obtain the client's request information, such as request method, request header information, request parameters, etc., so as to process and respond to the request. In PHP, you can use global variables such as $_REQUEST, $_GET, $_POST, etc. to obtain requested information, but these variables are not objects, but arrays. In order to process request information more flexibly and conveniently, you can

See all articles