The hadoop put flow, traced through the code

Overview

hadoop fs -put xxxx
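Before diving into the shell internals, here is a minimal sketch of the programmatic equivalent of the command above, using the public FileSystem API (the local and HDFS paths are placeholders, and core-site.xml/hdfs-site.xml are assumed to be on the classpath):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Minimal sketch: the programmatic equivalent of `hadoop fs -put`.
public class PutExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();          // picks up the cluster config from the classpath
    try (FileSystem fs = FileSystem.get(conf)) {
      fs.copyFromLocalFile(new Path("/tmp/local.txt"),        // local source (placeholder)
          new Path("/user/demo/remote.txt"));                 // HDFS destination (placeholder)
    }
  }
}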

Parsing the command

Entry is the main method of the FsShell class.
It creates an instance: FsShell shell = newShellInstance();
and then calls ToolRunner.run(shell, argv),
which leads into FsShell's run method.

run first calls init(), whose main job is commandFactory = new CommandFactory(getConf()).

Command instance = commandFactory.getInstance(cmd);
instance.run(argv)
The command name decides which Command subclass is used, and the Command base class then drives it via the Template Method pattern.
For -put, the resolved class is CopyCommands.Put.
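The Command / CopyCommands.Put relationship follows the Template Method pattern just mentioned; here is a stripped-down illustration with hypothetical classes (not the real org.apache.hadoop.fs.shell code):

import java.util.Arrays;
import java.util.LinkedList;

// Hypothetical classes illustrating the Template Method structure of Command/Put.
abstract class ShellCommandSketch {
  // The template method: a fixed skeleton whose variable steps are delegated to subclasses.
  public final int run(String... argv) {
    LinkedList<String> args = new LinkedList<>(Arrays.asList(argv));
    processOptions(args);      // hook 1: strip option flags such as -f, -p, -l, -d
    processArguments(args);    // hook 2: act on the remaining paths
    return 0;
  }

  protected abstract void processOptions(LinkedList<String> args);
  protected abstract void processArguments(LinkedList<String> args);
}

class PutSketch extends ShellCommandSketch {
  @Override
  protected void processOptions(LinkedList<String> args) {
    args.removeIf(a -> a.startsWith("-"));       // simplified option handling
  }

  @Override
  protected void processArguments(LinkedList<String> args) {
    System.out.println("would copy " + args);    // the real Put copies each source to the destination
  }
}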

Command's run method mainly calls two methods:

 public int run(String...argv) {
   LinkedList<String> args = new LinkedList<String>(Arrays.asList(argv));
   try {
     if (isDeprecated()) {
       displayWarning(
           "DEPRECATED: Please use '" + getReplacementCommand() + "' instead.");
     }
     // pre-process the arguments: the option flags are stripped out of args here
     processOptions(args);
     processRawArguments(args);
   } catch (CommandInterruptException e) {
     displayError("Interrupted");
     return 130;
   } catch (IOException e) {
     displayError(e);
   }
   return (numErrors == 0) ? exitCode : exitCodeForError();
 }

processOptions is a hook that each command overrides; the version that runs here is the one in CopyCommands.Put:

protected void processOptions(LinkedList<String> args) throws IOException {
  CommandFormat cf =
      new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d");
  cf.parse(args);
  setOverwrite(cf.getOpt("f"));
  setPreserve(cf.getOpt("p"));
  setLazyPersist(cf.getOpt("l"));
  setDirectWrite(cf.getOpt("d"));
  getRemoteDestination(args);
  // should have a -r option
  setRecursive(true);
}
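For intuition, here is a small sketch of what this option parsing does to a concrete argument list, reusing the CommandFormat calls quoted above (the paths are placeholders):

import java.util.Arrays;
import java.util.LinkedList;
import org.apache.hadoop.fs.shell.CommandFormat;

// Sketch: how the arguments of `hadoop fs -put -f -d local.txt /dst` are parsed.
public class PutOptionsSketch {
  public static void main(String[] args) {
    LinkedList<String> a = new LinkedList<>(Arrays.asList("-f", "-d", "local.txt", "/dst"));
    CommandFormat cf = new CommandFormat(1, Integer.MAX_VALUE, "f", "p", "l", "d");
    cf.parse(a);                        // leading option flags are removed from the list
    System.out.println(cf.getOpt("f")); // true  -> overwrite
    System.out.println(cf.getOpt("p")); // false
    System.out.println(a);              // [local.txt, /dst]; getRemoteDestination(args) then removes /dst as the destination
  }
}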

CopyCommands.Put overrides processRawArguments, which in turn calls processArguments.
After a few more rounds of calls this reaches processPath in the CommandWithDestination class:


protected void processPath(PathData src, PathData dst) throws IOException {
  if (src.stat.isSymlink()) {
    // TODO: remove when FileContext is supported, this needs to either
    // copy the symlink or deref the symlink
    throw new PathOperationException(src.toString());
  } else if (src.stat.isFile()) {
    copyFileToTarget(src, dst);
  } else if (src.stat.isDirectory() && !isRecursive()) {
    throw new PathIsDirectoryException(src.toString());
  }
}

copyFileToTarget(src, dst) is the core of the upload.

The NameNode checks whether the target path exists, creates the file, and generates the block metadata

copyFileToTarget ends up in the writeStreamToFile method of the TargetFileSystem class.
Through a chain of create calls that passes through the DistributedFileSystem class, it finally reaches DFSOutputStream's newStreamForCreate method:

stat = dfsClient.namenode.create(src, masked,
    dfsClient.clientName,
    new EnumSetWritable<>(flag),
    createParent,
    replication,
    blockSize,
    SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);

This is an RPC to the remote server: it invokes the create method of the NameNodeRpcServer class.
Its core is:


try {
  PermissionStatus perm = new PermissionStatus(getRemoteUser()
      .getShortUserName(), null, masked);
  status = namesystem.startFile(src, perm, clientName, clientMachine,
      flag.get(), createParent, replication, blockSize, supportedVersions,
      ecPolicyName, cacheEntry != null);
} finally {
  RetryCache.setState(cacheEntry, status != null, status);
}

Execution enters FSNamesystem's startFileInt method,
which in turn calls FSDirWriteFileOp.startFile, addFile and newINodeFile -> (FSDirectory) fsd.addINode -> addLastINode.
After FSDirWriteFileOp.startFile, fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry) records the operation in the edit log.

Creating the output stream

Back in DFSOutputStream's newStreamForCreate method: once the create RPC has returned, the output stream is constructed:

 if (stat.getErasureCodingPolicy() != null) {
   out = new DFSStripedOutputStream(dfsClient, src, stat,
       flag, progress, checksum, favoredNodes);
 } else {
   out = new DFSOutputStream(dfsClient, src, stat,
       flag, progress, checksum, favoredNodes, true);
 }
 out.start();

The DFSOutputStream constructor calls the computePacketChunkSize method.
out.start() starts the DataStreamer thread's run loop.
At this point the dataQueue is still empty, so the thread simply waits.
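To make that waiting step concrete, here is a minimal sketch of the wait/notify loop (simplified, with hypothetical names; the real DataStreamer.run additionally handles timeouts, pipeline errors and block boundaries):

import java.util.LinkedList;

// Hypothetical, simplified model of the DataStreamer consumer loop.
class StreamerLoopSketch implements Runnable {
  private final LinkedList<Object> dataQueue = new LinkedList<>();
  private volatile boolean running = true;

  @Override
  public void run() {
    while (running) {
      Object packet;
      synchronized (dataQueue) {
        while (running && dataQueue.isEmpty()) {
          try {
            dataQueue.wait(1000);        // parked here until queuePacket() calls notifyAll()
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
          }
        }
        if (dataQueue.isEmpty()) {
          return;                        // shutting down with nothing left to send
        }
        packet = dataQueue.getFirst();   // peek; removed only once the pipeline is ready
      }
      // ... set up the pipeline with setPipeline(nextBlockOutputStream()) and send the packet ...
    }
  }
}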

Sending the data stream to the DataNodes

Unwinding from create layer by layer, we are back in the TargetFileSystem class:


void writeStreamToFile(InputStream in, PathData target,
    boolean lazyPersist, boolean direct)
    throws IOException {
  FSDataOutputStream out = null;
  try {
    out = create(target, lazyPersist, direct);
    IOUtils.copyBytes(in, out, getConf(), true);
  } finally {
    IOUtils.closeStream(out); // just in case copyBytes didn't
  }
}
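The same create-then-copyBytes pattern can be reproduced against the public FileSystem API; a hedged sketch with placeholder paths (not the shell-internal TargetFileSystem):

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Sketch: create an FSDataOutputStream, then stream a local file into it.
public class StreamPutSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    try (FileSystem fs = FileSystem.get(conf)) {
      InputStream in = Files.newInputStream(Paths.get("/tmp/local.txt"));      // placeholder source
      FSDataOutputStream out = fs.create(new Path("/user/demo/remote.txt"), true); // placeholder target
      IOUtils.copyBytes(in, out, conf, true);   // the final `true` closes both streams
    }
  }
}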

IOUtils.copyBytes does the actual writing; once it finishes, the temporary file (written with the ._COPYING_ suffix) is renamed to the final file name.
The write path is: FSOutputSummer.write -> FSOutputSummer.flushBuffer() -> FSOutputSummer.writeChecksumChunks -> DFSOutputStream.writeChunk:


protected synchronized void writeChunk(byte[] b, int offset, int len,
    byte[] checksum, int ckoff, int cklen) throws IOException {
  writeChunkPrepare(len, ckoff, cklen);
  currentPacket.writeChecksum(checksum, ckoff, cklen);
  currentPacket.writeData(b, offset, len);
  currentPacket.incNumChunks();
  getStreamer().incBytesCurBlock(len);
  // If packet is full, enqueue it for transmission
  if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
      getStreamer().getBytesCurBlock() == blockSize) {
    enqueueCurrentPacketFull();
  }
}

The enqueueCurrentPacketFull() method:


synchronized void enqueueCurrentPacketFull() throws IOException {
  LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
      + " appendChunk={}, {}", currentPacket, src, getStreamer()
      .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
      getStreamer());
  enqueueCurrentPacket();
  adjustChunkBoundary();
  endBlock();
}

enqueueCurrentPacket calls DataStreamer's waitAndQueuePacket method.
If the queue is not full, waitAndQueuePacket calls queuePacket:


void queuePacket(DFSPacket packet) {
  synchronized (dataQueue) {
    if (packet == null) return;
    packet.addTraceParent(Tracer.getCurrentSpanId());
    dataQueue.addLast(packet);
    lastQueuedSeqno = packet.getSeqno();
    LOG.debug("Queued {}, {}", packet, this);
    dataQueue.notifyAll();
  }
}

The packet is appended to the queue and the waiters are notified.
The DataStreamer run loop wakes up at this point:
it calls dataQueue.getFirst() to take the first packet, then builds the write pipeline via setPipeline(nextBlockOutputStream()).
nextBlockOutputStream obtains the block information:

1. It calls the locateFollowingBlock method -> DFSOutputStream.addBlock, which talks to the server via dfsClient.namenode.addBlock. On the server side this runs namesystem.getAdditionalBlock -> FSDirWriteFileOp.chooseTargetForNewBlock -> blockplacement.chooseTarget -> BlockPlacementPolicy.chooseTarget -> BlockPlacementPolicyDefault.chooseTarget -> chooseTargetInOrder, which performs rack-aware placement of the replicas (by default: first replica on the writer's node if possible, second on a different rack, third on the same rack as the second).
2. It then calls the createBlockOutputStream method:


OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
InputStream unbufIn = NetUtils.getInputStream(s, readTimeout);
IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
    unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
    DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
blockReplyStream = new DataInputStream(unbufIn);

//
// Xmit header info to datanode
//
BlockConstructionStage bcs = recoveryFlag ?
    stage.getRecoveryStage() : stage;

// We cannot change the block length in 'block' as it counts the number
// of bytes ack'ed.
ExtendedBlock blockCopy = block.getCurrentBlock();
blockCopy.setNumBytes(stat.getBlockSize());

boolean[] targetPinnings = getPinnings(nodes);

// send the request
new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
    dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
    nodes.length, block.getNumBytes(), bytesSent, newGS,
    checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
    (targetPinnings != null && targetPinnings[0]), targetPinnings,
    nodeStorageIDs[0], nodeStorageIDs);

// receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
    PBHelperClient.vintPrefixed(blockReplyStream));
Status pipelineStatus = resp.getStatus();
firstBadLink = resp.getFirstBadLink();

This sets up the input and output streams and sends the request over the socket:
send(out, Op.WRITE_BLOCK, proto.build());
If this fails, the client calls
dfsClient.namenode.abandonBlock.

The DataNode receives the data stream

The main logic is in the run method of the DataXceiverServer class.
It first spawns a thread:
new Daemon(datanode.threadGroup,
    DataXceiver.create(peer, datanode, this))
    .start();
The thread then works out which operation to perform (here WRITE_BLOCK):
op = readOp();
processOp(op);

case WRITE_BLOCK:
  opWriteBlock(in);
  break;
In the opWriteBlock method, the writeBlock method of the DataXceiver class is called.
1. Write the block on the first machine:

setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,
    peer.getRemoteAddressString(), peer.getLocalAddressString(),
    stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
    clientname, srcDataNode, datanode, requestedChecksum,
    cachingStrategy, allowLazyPersist, pinning, storageId));

getBlockReceiver constructs a new BlockReceiver object.
In the PIPELINE_SETUP_CREATE case (the pipeline-setup stage) it calls
replicaHandler = datanode.data.createRbw(storageType, storageId,
    block, allowLazyPersist);
which creates the replica file on disk; the implementation lives in the FsDatasetImpl class (createRbw).
2. If targets.length > 0, the block is forwarded to the next machine in the pipeline and the write continues there.

Back on the client side, the DataStreamer run loop:
1. Collects the acknowledgements by starting a ResponseProcessor thread via initDataStreaming():
response = new ResponseProcessor(nodes);
response.start();
In ResponseProcessor's run method, each incoming ack removes the packet from the ack queue:
ackQueue.removeFirst();
packetSendTime.remove(seqno);
dataQueue.notifyAll();
2. Before sending, moves the packet from the data queue to the ack queue:
dataQueue.removeFirst();
ackQueue.addLast(one);
packetSendTime.put(one.getSeqno(), Time.monotonicNow());
dataQueue.notifyAll();
3. Finally writes the packet (a DFSPacket) to the pipeline:
one.writeTo(blockStream);
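A minimal, self-contained model of this dataQueue/ackQueue bookkeeping (hypothetical Packet class; not the real DFSPacket/DataStreamer code):

import java.util.ArrayDeque;
import java.util.Deque;

// Sketch of the two-queue handoff: packets wait in dataQueue, then sit in ackQueue until acknowledged.
class TwoQueueSketch {
  static final class Packet {
    final long seqno;
    Packet(long seqno) { this.seqno = seqno; }
  }

  private final Deque<Packet> dataQueue = new ArrayDeque<>(); // packets waiting to be sent
  private final Deque<Packet> ackQueue  = new ArrayDeque<>(); // packets sent, awaiting ack

  // Sender side: take the next packet but remember it until it is acknowledged.
  synchronized Packet sendNext() {
    Packet one = dataQueue.pollFirst();
    if (one != null) {
      ackQueue.addLast(one);          // kept for possible resend on pipeline failure
    }
    return one;                       // the caller would now write it to the pipeline
  }

  // Ack side: an acknowledged packet can finally be forgotten.
  synchronized void onAck(long seqno) {
    Packet first = ackQueue.peekFirst();
    if (first != null && first.seqno == seqno) {
      ackQueue.removeFirst();
    }
    notifyAll();                      // wake writers blocked on a full data queue
  }

  synchronized void queue(Packet p) {
    dataQueue.addLast(p);
    notifyAll();                      // wake the sender thread
  }
}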

A block is the largest unit: it is the granularity at which data is ultimately stored on the DataNodes, controlled by dfs.block.size (dfs.blocksize in current releases). The default was 64 MB in Hadoop 1.x and is 128 MB in 2.x and later. Note that this setting is taken from the client's configuration.

A packet is the middle unit: it is the granularity at which data flows from the DFSClient to the DataNode, with dfs.write.packet.size (default 64 KB) as a reference value. It is only a reference because a packet has a fixed structure; the actual size is adjusted so that the packet holds a whole number of its members, and so that the current block does not exceed the configured block size once the packet reaches the DataNode.

A chunk is the smallest unit: it is the granularity at which data is checksummed on its way from the DFSClient to the DataNode, controlled by io.bytes.per.checksum (dfs.bytes-per-checksum in current releases), default 512 bytes. Each chunk additionally carries a 4-byte checksum, so a chunk occupies 516 bytes inside a packet. The data-to-checksum ratio is 128:1, so a 128 MB block has roughly 1 MB of checksum data associated with it.
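A quick back-of-the-envelope check of these numbers (plain arithmetic, assuming the 512 B chunk, 64 KB packet and 128 MB block figures above):

// Back-of-the-envelope check of the chunk/packet/block arithmetic above.
public class SizeMath {
  public static void main(String[] args) {
    int bytesPerChecksum = 512;                  // data bytes covered by one checksum
    int checksumSize = 4;                        // one CRC checksum is 4 bytes
    int chunkOnWire = bytesPerChecksum + checksumSize;                // 516 B per chunk in a packet

    long blockSize = 128L * 1024 * 1024;         // 128 MiB block
    long checksumBytesPerBlock = blockSize / bytesPerChecksum * checksumSize;  // 1 MiB of checksums

    int packetSize = 64 * 1024;                  // 64 KiB packet (before header adjustment)
    int chunksPerPacket = packetSize / chunkOnWire;  // ~127; the real client also reserves header space

    System.out.println("chunk on wire: " + chunkOnWire + " B");
    System.out.println("checksum bytes per 128 MiB block: " + (checksumBytesPerBlock / 1024) + " KiB");
    System.out.println("approx. chunks per 64 KiB packet: " + chunksPerPacket);
  }
}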
