You can use the listStatus method to meet the requirement described above.
The signature of the listStatus method is as follows:
/**
 * List the statuses of the files/directories in the given path if the path is
 * a directory.
 *
 * @param f given path
 * @return the statuses of the files/directories in the given path
 * @throws FileNotFoundException when the path does not exist;
 *         IOException see specific implementation
 */
public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException,
                                                       IOException;
As you can see, listStatus only takes a Path parameter and returns an array of FileStatus objects.
FileStatus contains the following information:
/** Interface that represents the client side information for a file. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FileStatus implements Writable, Comparable {

  private Path path;
  private long length;
  private boolean isdir;
  private short block_replication;
  private long blocksize;
  private long modification_time;
  private long access_time;
  private FsPermission permission;
  private String owner;
  private String group;
  private Path symlink;
  ...
It is easy to see that FileStatus carries the file path, length, whether it is a directory, block replication, block size, and other metadata.
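For a single path, the same fields can be read from the FileStatus returned by FileSystem.getFileStatus. The sketch below is illustrative only; it assumes a SparkContext named sc is available and that the path passed in exists:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext

def printStatus(sc: SparkContext, path: String): Unit = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  // getFileStatus returns the FileStatus of a single file or directory
  val status = fs.getFileStatus(new Path(path))
  println(s"path:             ${status.getPath}")
  println(s"length:           ${status.getLen}")
  println(s"isDirectory:      ${status.isDirectory}")
  println(s"replication:      ${status.getReplication}")
  println(s"blockSize:        ${status.getBlockSize}")
  println(s"modificationTime: ${status.getModificationTime}")
}

The following example uses listStatus to walk a whole directory tree: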
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object HdfsOperation {

  val logger = LoggerFactory.getLogger(this.getClass)

  def tree(sc: SparkContext, path: String): Unit = {
    val fs = FileSystem.get(sc.hadoopConfiguration)
    val fsPath = new Path(path)
    val status = fs.listStatus(fsPath)
    for (filestatus: FileStatus <- status) {
      logger.error("getPermission is: {}", filestatus.getPermission)
      logger.error("getOwner is: {}", filestatus.getOwner)
      logger.error("getGroup is: {}", filestatus.getGroup)
      logger.error("getLen is: {}", filestatus.getLen)
      logger.error("getModificationTime is: {}", filestatus.getModificationTime)
      logger.error("getReplication is: {}", filestatus.getReplication)
      logger.error("getBlockSize is: {}", filestatus.getBlockSize)
      if (filestatus.isDirectory) {
        val dirpath = filestatus.getPath.toString
        logger.error("directory name is: {}", dirpath)
        tree(sc, dirpath)
      } else {
        val fullname = filestatus.getPath.toString
        val filename = filestatus.getPath.getName
        logger.error("full file name is: {}", fullname)
        logger.error("file name is: {}", filename)
      }
    }
  }
}
If a FileStatus turns out to be a directory, the tree method is called recursively, so the entire subtree is traversed.
The method above traverses all files and directories. If you only want to iterate over files, you can use the listFiles method.
def findFiles(sc: SparkContext, path: String) = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val fsPath = new Path(path)
  val files = fs.listFiles(fsPath, true)
  while (files.hasNext) {
    val filestatus = files.next()
    val fullname = filestatus.getPath.toString
    val filename = filestatus.getPath.getName
    logger.error("full file name is: {}", fullname)
    logger.error("file name is: {}", filename)
    logger.error("file size is: {}", filestatus.getLen)
  }
}
The relevant part of the listFiles source is:

/**
 * List the statuses and block locations of the files in the given path.
 *
 * If the path is a directory,
 *   if recursive is false, returns files in the directory;
 *   if recursive is true, return files in the subtree rooted at the path.
 * If the path is a file, return the file's status and block locations.
 *
 * @param f is the path
 * @param recursive if the subdirectories need to be traversed recursively
 *
 * @return an iterator that traverses statuses of the files
 *
 * @throws FileNotFoundException when the path does not exist;
 *         IOException see specific implementation
 */
public RemoteIterator<LocatedFileStatus> listFiles(
    final Path f, final boolean recursive)
    throws FileNotFoundException, IOException {
  ...
As the source code shows, listFiles returns an iterable RemoteIterator<LocatedFileStatus>, whereas listStatus returns an array. In addition, listFiles returns all of the files under the given path.
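Because RemoteIterator is a Hadoop type rather than a Scala collection, it can be convenient to adapt it to a Scala Iterator before applying collection operations. This is only a sketch; toScalaIterator and largestFiles are hypothetical helpers, not part of the Hadoop or Spark API:

import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
import org.apache.spark.SparkContext

// Hypothetical helper: adapt Hadoop's RemoteIterator to a Scala Iterator
def toScalaIterator[T](it: RemoteIterator[T]): Iterator[T] = new Iterator[T] {
  override def hasNext: Boolean = it.hasNext
  override def next(): T = it.next()
}

// Hypothetical helper: return the n largest files under a path
def largestFiles(sc: SparkContext, path: String, n: Int): Seq[String] = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val files: RemoteIterator[LocatedFileStatus] = fs.listFiles(new Path(path), true)
  toScalaIterator(files)
    .map(s => (s.getPath.toString, s.getLen))
    .toVector
    .sortBy(-_._2)                // largest first
    .take(n)
    .map { case (name, len) => s"$name ($len bytes)" }
}

Wrapping the iterator keeps the listing lazy, so statuses are only pulled from the NameNode as they are consumed.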
Creating a directory on HDFS goes through the same FileSystem handle, using mkdirs:

def mkdirToHdfs(sc: SparkContext, path: String) = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val result = fs.mkdirs(new Path(path))
  if (result) {
    logger.error("mkdirs succeeded!")
  } else {
    logger.error("mkdirs failed!")
  }
}
Deleting a path (recursively, because the second argument is true) uses delete:

def deleteOnHdfs(sc: SparkContext, path: String) = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val result = fs.delete(new Path(path), true)
  if (result) {
    logger.error("delete succeeded!")
  } else {
    logger.error("delete failed!")
  }
}
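A minimal driver tying the pieces together might look like the sketch below. It assumes all the helper methods above live in the HdfsOperation object shown earlier, and the HDFS paths and the local master setting are hypothetical, chosen only for illustration:

import org.apache.spark.{SparkConf, SparkContext}

object HdfsOperationDemo {
  def main(args: Array[String]): Unit = {
    // Local master only for illustration; a real cluster would be configured differently
    val conf = new SparkConf().setAppName("HdfsOperationDemo").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      HdfsOperation.mkdirToHdfs(sc, "/tmp/hdfs-demo")   // hypothetical path
      HdfsOperation.tree(sc, "/tmp")                    // list directories and files recursively
      HdfsOperation.findFiles(sc, "/tmp")               // iterate over files only
      HdfsOperation.deleteOnHdfs(sc, "/tmp/hdfs-demo")  // recursive delete
    } finally {
      sc.stop()
    }
  }
}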