光环大数据 Hadoop之HDFS原理及文件上传下载源码分析_光环大数据培训
光环大数据光环大数据-大数据培训知名品牌大数据培训知名品牌http:/hadoop.aura.cn 光环大数据光环大数据 http:/hadoop.aura.cn光环大数据光环大数据 HadoopHadoop 之之 HDFSHDFS 原理及文件上传下载源码分析原理及文件上传下载源码分析_ _光环大数据培训光环大数据培训光环大数据培训光环大数据培训认为,将继续介绍 hdfs 文件上传、下载源解析。文件上传文件上传先上文件上传的方法调用过程时序图:其主要执行过程:1.FileSystem 初始化,Client 拿到 NameNodeRpcServer 代理对象,建立与 NameNode的 RPC 通信(楼主上篇已经介绍过了)2.调用 FileSystem 的 create()方法,由于实现类为 DistributedFileSystem,所有是调用该类中的 create()方法3.DistributedFileSystem 持有 DFSClient 的引用,继续调用 DFSClient 中的 create()方法4.DFSOutputStream 提供的静态 newStreamForCreate()方法中调用NameNodeRpcServer 服务端的 create()方法并创建 DFSOutputStream 输出流对象返回5.通过 hadoop 提供的 IOUtil 工具类将输出流输出到本地下面我们来看下源码:首先初始化文件系统,建立与服务端的 RPC 通信1 HDFSDemo.java2 OutputStream os = fs.create(new Path(“/test.log“);调用 FileSystem 的 create()方法,由于 FileSystem 是一个抽象类,这里实际上是调用的该类的子类 create()方法1 /FileSystem.java2 public abstract FSDataOutputStream create(Path f,3 FsPermission permission,4 boolean overwrite,5 int bufferSize,6 short replication,7 long blockSize,8 Progressable progress) throws IOException;光环大数据光环大数据-大数据培训知名品牌大数据培训知名品牌http:/hadoop.aura.cn 光环大数据光环大数据 http:/hadoop.aura.cn前面我们已经说过 FileSystem.get()返回的是 DistributedFileSystem 对象,所以这里我们直接进入 DistributedFileSystem:1 /DistributedFileSystem.java 2 Override 3 public FSDataOutputStream create(final Path f, final FsPermission permission, 4 final EnumSet cflags, final int bufferSize, 5 final short replication, final long blockSize, final Progressable progress, 6 final ChecksumOpt checksumOpt) throws IOException 7 statistics.incrementWriteOps(1); 8 Path absF = fixRelativePart(f); 9 return new FileSystemLinkResolver() 10 Override11 public FSDataOutputStream doCall(final Path p)12 throws IOException, UnresolvedLinkException 13 final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,14 cflags, replication, blockSize, progress, bufferSize,15 checksumOpt);16 /dfs为 DistributedFileSystem 所持有的 DFSClient 对象,这里调用 DFSClient 中的 create()方法 17 return dfs.createWrappedOutputStream(dfsos, statistics);18 19 Override20 public FSDataOutputStream next(final FileSystem fs, final Path p)21 throws IOException 22 return fs.create(p, permission, cflags, bufferSize,23 replication, blockSize, progress, checksumOpt);24 25 .resolve(this, absF);26 DFSClient 的 create()返回一个 DFSOutputStream 对象:1 /DFSClient.java 2 public DFSOutputStream create(String src, 3 FsPermission permission, 4 EnumSet flag, 5 boolean createParent, 6 short replication, 7 long blockSize, 8 Progressable progress, 9 int buffersize,10 ChecksumOpt checksumOpt,11 InetSocketAddress favoredNodes) throws IOException 12 checkOpen();13 if (permission = null) 14 permission = FsPermission.getFileDefault();15 16 FsPermission masked = permission.applyUMask(dfsClientConf.uMask);17 if(LOG.isDebugEnabled() 18 LOG.debug(src + “: masked=“ + masked);19 20 /调用 DFSOutputStream 的静态方法 newStreamForCreate,光环大数据光环大数据-大数据培训知名品牌大数据培训知名品牌http:/hadoop.aura.cn 光环大数据光环大数据 http:/hadoop.aura.cn返回输出流 21 final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,22 src, masked, flag, createParent, replication, blockSize, progress,23 buffersize, dfsClientConf.createChecksum(checksumOpt),24 getFavoredNodesStr(favoredNodes);25 beginFileLease(result.getFileId(), result);26 return result;27 我们继续看下 newStreamForCreate()中的业务逻辑:1 /DFSOutputStream.java 2 static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src, 3 FsPermission masked, EnumSet flag, boolean createParent, 4 short replication, long blockSize, Progressable progress, int buffersize, 5 DataChecksum checksum, String favoredNodes) throws IOException 6 TraceScope scope = 7 dfsClient.getPathTraceScope(“newStreamForCreate“, src); 8 try 9 HdfsFileStatus stat = null;10 boolean shouldRetry = true;11 int retryCount = CREATE_RETRY_COUNT;12 while (shouldRetry) 13 shouldRetry = false;14 try 15 /这里通过 dfsClient 的 NameNode 代理对象调用 NameNodeRpcServer 中实现的 create()方法 16 stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,17 new EnumSetWritable(flag), createParent, replication,18 blockSize, SUPPORTED_CRYPTO_VERSIONS);19 break;20 catch (RemoteException re) 21 IOException e = re.unwrapRemoteException(22 AccessControlException.class,23 DSQuotaExceededException.class,24 FileAlreadyExistsException.class,25 FileNotFoundException.class,26 ParentNotDirectoryException.class,27 NSQuotaExceededException.class,28 RetryStartFileException.class,29 SafeModeException.class,30 UnresolvedPathException.class,31 SnapshotAccessControlException.class,32 UnknownCryptoProtocolVersionExcept