listStatus 메서드를 사용하면 위의 요구 사항을 충족할 수 있습니다.
listStatus 메소드의 시그니처는 다음과 같습니다
/** * List the statuses of the files/directories in the given path if the path is * a directory. * * @param f given path * @return the statuses of the files/directories in the given patch * @throws FileNotFoundException when the path does not exist; * IOException see specific implementation */ public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException;
listStatus는 Path 매개변수만 전달하면 FileStatus 배열이 반환되는 것을 알 수 있습니다.
FileStatus에는 다음 정보가 포함되어 있습니다
/** Interface that represents the client side information for a file. */ @InterfaceAudience.Public @InterfaceStability.Stable public class FileStatus implements Writable, Comparable { private Path path; private long length; private boolean isdir; private short block_replication; private long blocksize; private long modification_time; private long access_time; private FsPermission permission; private String owner; private String group; private Path symlink; ....
FileStatus에서 파일 경로, 크기, 디렉터리 여부, block_replication, 블록 크기... 및 기타 정보를 보는 것은 어렵지 않습니다.
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext} import org.slf4j.LoggerFactory object HdfsOperation { val logger = LoggerFactory.getLogger(this.getClass) def tree(sc: SparkContext, path: String) : Unit = { val fs = FileSystem.get(sc.hadoopConfiguration) val fsPath = new Path(path) val status = fs.listStatus(fsPath) for(filestatus:FileStatus <- status) { logger.error("getPermission is: {}", filestatus.getPermission) logger.error("getOwner is: {}", filestatus.getOwner) logger.error("getGroup is: {}", filestatus.getGroup) logger.error("getLen is: {}", filestatus.getLen) logger.error("getModificationTime is: {}", filestatus.getModificationTime) logger.error("getReplication is: {}", filestatus.getReplication) logger.error("getBlockSize is: {}", filestatus.getBlockSize) if (filestatus.isDirectory) { val dirpath = filestatus.getPath.toString logger.error("文件夹名字为: {}", dirpath) tree(sc, dirpath) } else { val fullname = filestatus.getPath.toString val filename = filestatus.getPath.getName logger.error("全部文件名为: {}", fullname) logger.error("文件名为: {}", filename) } } } }
fileStatus가 폴더인 것으로 확인되면 모든 항목을 순회하는 목적을 달성하기 위해 tree 메서드가 재귀적으로 호출됩니다.
위 방법은 모든 파일과 폴더를 탐색하는 것입니다. 파일을 반복하고 싶다면 listFiles 메소드를 사용할 수 있습니다.
def findFiles(sc: SparkContext, path: String) = { val fs = FileSystem.get(sc.hadoopConfiguration) val fsPath = new Path(path) val files = fs.listFiles(fsPath, true) while(files.hasNext) { val filestatus = files.next() val fullname = filestatus.getPath.toString val filename = filestatus.getPath.getName logger.error("全部文件名为: {}", fullname) logger.error("文件名为: {}", filename) logger.error("文件大小为: {}", filestatus.getLen) } }
/** * List the statuses and block locations of the files in the given path. * * If the path is a directory, * if recursive is false, returns files in the directory; * if recursive is true, return files in the subtree rooted at the path. * If the path is a file, return the file's status and block locations. * * @param f is the path * @param recursive if the subdirectories need to be traversed recursively * * @return an iterator that traverses statuses of the files * * @throws FileNotFoundException when the path does not exist; * IOException see specific implementation */ public RemoteIterator<LocatedFileStatus> listFiles( final Path f, final boolean recursive) throws FileNotFoundException, IOException { ...
소스 코드에서 볼 수 있듯이 listFiles는 반복 가능한 객체RemoteIterator<LocatedFileStatus>
를 반환하고 listStatus는 배열을 반환합니다. 동시에 listFiles는 모든 파일을 반환합니다.
def mkdirToHdfs(sc: SparkContext, path: String) = { val fs = FileSystem.get(sc.hadoopConfiguration) val result = fs.mkdirs(new Path(path)) if (result) { logger.error("mkdirs already success!") } else { logger.error("mkdirs had failed!") } }
def deleteOnHdfs(sc: SparkContext, path: String) = { val fs = FileSystem.get(sc.hadoopConfiguration) val result = fs.delete(new Path(path), true) if (result) { logger.error("delete already success!") } else { logger.error("delete had failed!") } }
def uploadToHdfs(sc: SparkContext, localPath: String, hdfsPath: String): Unit = { val fs = FileSystem.get(sc.hadoopConfiguration) fs.copyFromLocalFile(new Path(localPath), new Path(hdfsPath)) fs.close() }
def downloadFromHdfs(sc: SparkContext, localPath: String, hdfsPath: String) = { val fs = FileSystem.get(sc.hadoopConfiguration) fs.copyToLocalFile(new Path(hdfsPath), new Path(localPath)) fs.close() }
위 내용은 Java API를 사용하여 HDFS를 작동하는 방법은 무엇입니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!