hadoop fs -put xxxx
Parsing the command

Entry is the main method of the FsShell class.
It creates an instance: FsShell shell = newShellInstance();
then calls ToolRunner.run(shell, argv),
which enters FsShell's run method.
run first calls init, whose main job is commandFactory = new CommandFactory(getConf()); then:
Command instance = commandFactory.getInstance(cmd);
instance.run(argv)
getInstance decides which Command implementation class to use based on the command string (the template method pattern).
For put, the resolved class is CopyCommands.Put.
Command's run method mainly calls two methods:
```java
public int run(String... argv) {
  LinkedList<String> args = new LinkedList<String>(Arrays.asList(argv));
  try {
    if (isDeprecated()) {
      displayWarning(
          "DEPRECATED: Please use '" + getReplacementCommand() + "' instead.");
    }
    // Pre-process the arguments: option flags are stripped out of the list here
    processOptions(args);
    processRawArguments(args);
  } catch (CommandInterruptException e) {
    displayError("Interrupted");
    return 130;
  } catch (IOException e) {
    displayError(e);
  }
  return (numErrors == 0) ? exitCode : exitCodeForError();
}
```
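The factory-plus-template-method structure described above, where a fixed run() skeleton defers processOptions and the argument processing to subclasses, can be sketched minimally. All class and method names below are illustrative stand-ins, not Hadoop's real API:

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the template-method dispatch: the abstract class fixes
// the run() skeleton, subclasses fill in the variable steps.
abstract class MiniCommand {
    int run(String... argv) {
        Deque<String> args = new ArrayDeque<>(Arrays.asList(argv));
        processOptions(args);          // strip -f style flags (subclass-defined)
        return processArguments(args); // do the actual work (subclass-defined)
    }
    protected abstract void processOptions(Deque<String> args);
    protected abstract int processArguments(Deque<String> args);
}

class MiniPut extends MiniCommand {
    boolean overwrite;
    String src, dst;

    @Override
    protected void processOptions(Deque<String> args) {
        // peel leading options off the argument list, like CommandFormat.parse
        while (!args.isEmpty() && args.peekFirst().startsWith("-")) {
            if (args.removeFirst().equals("-f")) overwrite = true;
        }
    }

    @Override
    protected int processArguments(Deque<String> args) {
        src = args.removeFirst();
        dst = args.removeFirst();
        return 0; // a real Put would copy src into dst here
    }
}

// Stand-in for CommandFactory: maps the command string to a Command instance.
class MiniCommandFactory {
    private final Map<String, MiniCommand> commands = new HashMap<>();
    MiniCommandFactory() { commands.put("-put", new MiniPut()); }
    MiniCommand getInstance(String cmd) { return commands.get(cmd); }
}
```

The point of the pattern is that FsShell only ever sees the abstract run(); every subcommand plugs its behavior into the two protected hooks.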
processOptions is an abstract method; the implementation used here is the one in the CopyCommands.Put class:
```java
protected void processOptions(LinkedList<String> args) throws IOException {
  CommandFormat cf =
      new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d");
  cf.parse(args);
  setOverwrite(cf.getOpt("f"));
  setPreserve(cf.getOpt("p"));
  setLazyPersist(cf.getOpt("l"));
  setDirectWrite(cf.getOpt("d"));
  getRemoteDestination(args);
  // should have a -r option
  setRecursive(true);
}
```
CopyCommands.Put overrides processRawArguments, which in turn calls processArguments.
After a few more hops the call reaches processPath in the CommandWithDestination class:
```java
protected void processPath(PathData src, PathData dst) throws IOException {
  if (src.stat.isSymlink()) {
    // TODO: remove when FileContext is supported, this needs to either
    // copy the symlink or deref the symlink
    throw new PathOperationException(src.toString());
  } else if (src.stat.isFile()) {
    copyFileToTarget(src, dst);
  } else if (src.stat.isDirectory() && !isRecursive()) {
    throw new PathIsDirectoryException(src.toString());
  }
}
```
copyFileToTarget(src, dst) is the core of the upload: the NameNode checks whether the target directory exists, creates it if needed, and generates the block metadata.
This leads to writeStreamToFile in the TargetFileSystem class.
A chain of create calls follows, passing through the DistributedFileSystem class and ending in DFSOutputStream's newStreamForCreate method:
```java
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
    new EnumSetWritable<>(flag), createParent, replication, blockSize,
    SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
```
This invokes the create method of the NameNodeRpcServer class, calling the remote server side over RPC. Its core is:
```java
try {
  PermissionStatus perm = new PermissionStatus(getRemoteUser()
      .getShortUserName(), null, masked);
  status = namesystem.startFile(src, perm, clientName, clientMachine,
      flag.get(), createParent, replication, blockSize, supportedVersions,
      ecPolicyName, cacheEntry != null);
} finally {
  RetryCache.setState(cacheEntry, status != null, status);
}
```
This enters the startFileInt method of the FSNamesystem class,
which calls, in order, FSDirWriteFileOp.startFile, addFile, newINodeFile -> (FSDirectory) fsd.addINode -> addLastINode.
After FSDirWriteFileOp.startFile, fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry) writes the edit log (FSEditLog).
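The pattern here is a write-ahead journal: startFile first mutates the in-memory namespace (the INode tree), then logs the operation so the namespace can be rebuilt after a restart. A toy sketch, with all names and the log format invented for illustration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the namespace + edit-log pairing: every mutation updates the
// in-memory map and appends a replayable record to the journal.
class MiniNamespace {
    final Map<String, Boolean> inodes = new HashMap<>(); // path -> underConstruction
    final List<String> editLog = new ArrayList<>();      // journal of OP_ADD records

    void startFile(String src) {
        inodes.put(src, true);        // 1. in-memory INode, under construction
        editLog.add("OP_ADD " + src); // 2. journal the create (like logOpenFile)
    }

    // Replay: rebuild the namespace from the journal alone, as on restart.
    static MiniNamespace replay(List<String> log) {
        MiniNamespace ns = new MiniNamespace();
        for (String op : log) {
            if (op.startsWith("OP_ADD ")) ns.inodes.put(op.substring(7), true);
        }
        return ns;
    }
}
```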
Creating the output stream

Back in DFSOutputStream's newStreamForCreate method: once create has returned, the output stream is created:
```java
if (stat.getErasureCodingPolicy() != null) {
  out = new DFSStripedOutputStream(dfsClient, src, stat, flag, progress,
      checksum, favoredNodes);
} else {
  out = new DFSOutputStream(dfsClient, src, stat, flag, progress,
      checksum, favoredNodes, true);
}
out.start();
```
The DFSOutputStream constructor calls the computePacketChunkSize method.
out.start() runs a DataStreamer thread.
At first the thread just waits on an empty dataQueue.

Sending the data stream to the DataNode

Unwinding layer by layer out of create, we are back in the TargetFileSystem class:
```java
void writeStreamToFile(InputStream in, PathData target,
    boolean lazyPersist, boolean direct) throws IOException {
  FSDataOutputStream out = null;
  try {
    out = create(target, lazyPersist, direct);
    IOUtils.copyBytes(in, out, getConf(), true);
  } finally {
    IOUtils.closeStream(out); // just in case copyBytes didn't
  }
}
```
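At its core, IOUtils.copyBytes pumps the local input stream into the HDFS output stream in buffer-sized reads. A self-contained sketch of that loop (the class name and signature here are invented, not Hadoop's):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Sketch of what IOUtils.copyBytes does at its core: read the source in
// buffer-sized chunks and write each chunk to the destination stream.
class StreamPump {
    static long copyBytes(InputStream in, OutputStream out, int bufSize)
            throws IOException {
        byte[] buf = new byte[bufSize];
        long total = 0;
        int n;
        while ((n = in.read(buf)) > 0) { // read() returns -1 at end of stream
            out.write(buf, 0, n);
            total += n;
        }
        return total;
    }
}
```

Every write on the FSDataOutputStream side then flows down the FSOutputSummer path described next.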
IOUtils.copyBytes does the actual writing; once it finishes, the temporary ._COPYING_ file is renamed to the normal target name.
The write path then goes: FSOutputSummer.write -> FSOutputSummer.flushBuffer() -> FSOutputSummer.writeChecksumChunks -> DFSOutputStream.writeChunk:
```java
protected synchronized void writeChunk(byte[] b, int offset, int len,
    byte[] checksum, int ckoff, int cklen) throws IOException {
  writeChunkPrepare(len, ckoff, cklen);
  currentPacket.writeChecksum(checksum, ckoff, cklen);
  currentPacket.writeData(b, offset, len);
  currentPacket.incNumChunks();
  getStreamer().incBytesCurBlock(len);
  // If packet is full, enqueue it for transmission
  if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
      getStreamer().getBytesCurBlock() == blockSize) {
    enqueueCurrentPacketFull();
  }
}
```
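The accumulation logic of writeChunk, appending checksummed chunks to the current packet and queuing the packet once it is full, can be reduced to a toy model (class names and the chunk limit are illustrative; real HDFS uses 512-byte chunks and roughly 64 KB packets):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy packet: only tracks how many chunks it holds.
class MiniPacket {
    final int maxChunks;
    int numChunks;
    MiniPacket(int maxChunks) { this.maxChunks = maxChunks; }
}

// Toy output stream showing writeChunk's fill-then-enqueue behavior.
class MiniOutputStream {
    static final int MAX_CHUNKS_PER_PACKET = 4;
    final Deque<MiniPacket> dataQueue = new ArrayDeque<>();
    MiniPacket currentPacket = new MiniPacket(MAX_CHUNKS_PER_PACKET);

    void writeChunk(byte[] chunk, byte[] checksum) {
        currentPacket.numChunks++;            // data + checksum appended in reality
        if (currentPacket.numChunks == currentPacket.maxChunks) {
            dataQueue.addLast(currentPacket); // like enqueueCurrentPacketFull()
            currentPacket = new MiniPacket(MAX_CHUNKS_PER_PACKET);
        }
    }
}
```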
The enqueueCurrentPacketFull() method:
```java
synchronized void enqueueCurrentPacketFull() throws IOException {
  LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
      + " appendChunk={}, {}", currentPacket, src, getStreamer()
      .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
      getStreamer());
  enqueueCurrentPacket();
  adjustChunkBoundary();
  endBlock();
}
```
enqueueCurrentPacket calls DataStreamer's waitAndQueuePacket method;
if the queue is not full, that calls queuePacket:
```java
void queuePacket(DFSPacket packet) {
  synchronized (dataQueue) {
    if (packet == null) return;
    packet.addTraceParent(Tracer.getCurrentSpanId());
    dataQueue.addLast(packet);
    lastQueuedSeqno = packet.getSeqno();
    LOG.debug("Queued {}, {}", packet, this);
    dataQueue.notifyAll();
  }
}
```
The packet is added to the queue and all waiters are notified.
The DataStreamer thread's run loop then wakes up,
calls dataQueue.getFirst() to take the first packet, and sets up the pipeline via setPipeline(nextBlockOutputStream()).
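The producer/consumer hand-off around dataQueue, writeChunk producing, DataStreamer consuming, uses the classic synchronized/wait()/notifyAll() idiom. A self-contained sketch (class names are illustrative, not Hadoop's):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the dataQueue hand-off: the writer blocks when the queue is full,
// the streamer blocks when it is empty, and each side wakes the other.
class MiniDataQueue {
    private final Deque<String> dataQueue = new ArrayDeque<>();
    private final int capacity;
    MiniDataQueue(int capacity) { this.capacity = capacity; }

    // Like waitAndQueuePacket: block while full, then enqueue and notify.
    void queuePacket(String packet) throws InterruptedException {
        synchronized (dataQueue) {
            while (dataQueue.size() >= capacity) dataQueue.wait();
            dataQueue.addLast(packet);
            dataQueue.notifyAll(); // wake the streamer waiting on an empty queue
        }
    }

    // DataStreamer side: block while empty, then take the first packet.
    String takePacket() throws InterruptedException {
        synchronized (dataQueue) {
            while (dataQueue.isEmpty()) dataQueue.wait();
            String p = dataQueue.removeFirst();
            dataQueue.notifyAll(); // wake a producer waiting on a full queue
            return p;
        }
    }
}
```

The `while` (not `if`) around each wait() matters: a woken thread must re-check the condition, since another thread may have changed the queue first.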
nextBlockOutputStream obtains the block information:
1. It calls locateFollowingBlock ->
DFSOutputStream.addBlock,
which talks to the server side via dfsClient.namenode.addBlock.
On the server this runs through namesystem.getAdditionalBlock -> FSDirWriteFileOp.chooseTargetForNewBlock -> blockplacement.chooseTarget -> BlockPlacementPolicy.chooseTarget -> BlockPlacementPolicyDefault.chooseTarget -> chooseTargetInOrder, which performs rack-aware placement and picks the racks.
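The default placement order that chooseTargetInOrder implements is: first replica on the writer's node, second replica on a node in a different rack, third replica on the same rack as the second but on a different node. A toy sketch of that selection (data model and names invented; the real policy also weighs load, space, and randomization):

```java
import java.util.ArrayList;
import java.util.List;

// Toy cluster node: just a name and the rack it sits in.
class Node {
    final String name, rack;
    Node(String name, String rack) { this.name = name; this.rack = rack; }
}

// Sketch of the default replica placement order for 3 replicas.
class MiniPlacement {
    static List<Node> chooseTarget(Node writer, List<Node> cluster) {
        List<Node> targets = new ArrayList<>();
        targets.add(writer); // first replica: the writer's local node
        Node second = null;
        for (Node n : cluster) { // second replica: any node on a remote rack
            if (!n.rack.equals(writer.rack)) { second = n; break; }
        }
        if (second == null) return targets;
        targets.add(second);
        for (Node n : cluster) { // third replica: same rack as second, other node
            if (n.rack.equals(second.rack) && n != second) {
                targets.add(n);
                break;
            }
        }
        return targets;
    }
}
```

This layout survives the loss of a whole rack while keeping two of the three replica transfers within one rack's switches.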
2. It calls the createBlockOutputStream method:
```java
OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(s, readTimeout);
IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
    unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
    DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
blockReplyStream = new DataInputStream(unbufIn);

//
// Xmit header info to datanode
//
BlockConstructionStage bcs = recoveryFlag ?
    stage.getRecoveryStage() : stage;

// We cannot change the block length in 'block' as it counts the number
// of bytes ack'ed.
ExtendedBlock blockCopy = block.getCurrentBlock();
blockCopy.setNumBytes(stat.getBlockSize());

boolean[] targetPinnings = getPinnings(nodes);
// send the request
new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
    dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
    nodes.length, block.getNumBytes(), bytesSent, newGS,
    checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
    (targetPinnings != null && targetPinnings[0]), targetPinnings,
    nodeStorageIDs[0], nodeStorageIDs);

// receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
    PBHelperClient.vintPrefixed(blockReplyStream));
Status pipelineStatus = resp.getStatus();
firstBadLink = resp.getFirstBadLink();
```
This creates the input and output streams and sends the data over a socket:
send(out, Op.WRITE_BLOCK, proto.build());
On failure, the client calls
dfsClient.namenode.abandonBlock.
The DataNode receiving the data stream

The main logic is in the run method of the DataXceiverServer class.
It first spawns a thread:

```java
new Daemon(datanode.threadGroup,
    DataXceiver.create(peer, datanode, this))
  .start();
```

The thread then decides which operation to handle; for WRITE_BLOCK:

```java
op = readOp();
processOp(op);
```

```java
case WRITE_BLOCK:
  opWriteBlock(in);
  break;
```

The opWriteBlock method calls the writeBlock method of the DataXceiver class.
1. Writing the block on the first machine:

```java
setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,
    peer.getRemoteAddressString(), peer.getLocalAddressString(),
    stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
    clientname, srcDataNode, datanode, requestedChecksum,
    cachingStrategy, allowLazyPersist, pinning, storageId));
```

getBlockReceiver constructs a new BlockReceiver object; in the pipeline-setup stage:

```java
case PIPELINE_SETUP_CREATE:
  replicaHandler = datanode.data.createRbw(storageType, storageId,
      block, allowLazyPersist);
```

The file is written to disk; the implementing class is FsDatasetImpl (see its createRbw and recoverRbw methods).
2. If targets.length > 0, the next machine in the pipeline is found
and writing continues there.
Once a packet is sent, back in the DataStreamer run loop:
1. Receiving the acknowledgement: initDataStreaming() starts a response thread:

```java
response = new ResponseProcessor(nodes);
response.start();
```

In ResponseProcessor's run method, on a successful ack:

```java
ackQueue.removeFirst();
packetSendTime.remove(seqno);
dataQueue.notifyAll();
```

2. On the sending side, each sent packet is moved from the data queue to the ack queue:

```java
dataQueue.removeFirst();
ackQueue.addLast(one);
packetSendTime.put(one.getSeqno(), Time.monotonicNow());
dataQueue.notifyAll();
```
3. Finally the data is written out:

```java
one.writeTo(blockStream); // one is a DFSPacket
```
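The two-queue hand-off above, where the streamer moves each sent packet from dataQueue to ackQueue and the ResponseProcessor drops it once the pipeline acknowledges it, can be sketched in a simplified, single-threaded form (class name and sequence handling are illustrative):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the dataQueue/ackQueue protocol: packets survive in ackQueue
// until acknowledged, so they can be resent if the pipeline fails.
class MiniStreamer {
    final Deque<Long> dataQueue = new ArrayDeque<>(); // packets waiting to send
    final Deque<Long> ackQueue = new ArrayDeque<>();  // sent, awaiting pipeline ack

    void sendOne() {
        Long one = dataQueue.removeFirst(); // like dataQueue.removeFirst()
        ackQueue.addLast(one);              // like ackQueue.addLast(one)
        // ... one.writeTo(blockStream) would go over the socket here ...
    }

    void ackReceived(long seqno) {
        if (!ackQueue.isEmpty() && ackQueue.peekFirst() == seqno) {
            ackQueue.removeFirst();         // ResponseProcessor: ack matches head
        } else {
            throw new IllegalStateException("out-of-order ack " + seqno);
        }
    }
}
```

Keeping unacknowledged packets around is what lets the client rebuild the pipeline and replay them after a DataNode failure.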
The block is the largest unit: it is the granularity at which data is ultimately stored on a DataNode, controlled by the dfs.block.size parameter, 64 MB by default. Note: this parameter is determined by the client-side configuration.

The packet is the middle unit: it is the granularity at which data flows from the DFSClient to the DataNode, with dfs.write.packet.size as a reference value, 64 KB by default. Note: it is only a reference value because the actual transfer adjusts around it; a packet has a specific structure, and the size is tuned so the packet exactly holds all of its members while also ensuring the current block does not exceed the configured block size once written to the DataNode.

The chunk is the smallest unit: it is the granularity at which data is checksummed between DFSClient and DataNode, controlled by io.bytes.per.checksum, 512 B by default. Note: each chunk also carries a 4 B checksum, so a chunk occupies 516 B when written into a packet; the data-to-checksum ratio is 128:1, so a 128 MB block has a 1 MB checksum file associated with it.
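The arithmetic behind those sizes can be checked directly: a 512-byte chunk carries a 4-byte CRC, so checksum data is 1/128 of the payload, and a 128 MB block needs 1 MB of checksums. A sketch using the defaults quoted above (the real on-disk .meta file also has a small header, ignored here):

```java
// Checksum size arithmetic for the HDFS defaults quoted in the text.
class ChecksumMath {
    static final int BYTES_PER_CHECKSUM = 512; // io.bytes.per.checksum default
    static final int CHECKSUM_SIZE = 4;        // CRC stored per chunk

    // Total checksum bytes needed for a block of the given size.
    static long checksumFileSize(long blockSize) {
        long chunks = (blockSize + BYTES_PER_CHECKSUM - 1) / BYTES_PER_CHECKSUM;
        return chunks * CHECKSUM_SIZE;
    }

    // Bytes one chunk occupies inside a packet: payload plus its checksum.
    static int chunkOnWire() {
        return BYTES_PER_CHECKSUM + CHECKSUM_SIZE; // 516 bytes
    }
}
```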