/**
 * Handle a "close file" request that finishes a write.
 *
 * Flow, as visible in this function:
 *   1. Ask data_management_ to close the write (commits the written data and
 *      reports the written size through write_file_size).
 *   2. Look up the BlockInfo of the target block.
 *   3. If the request mode is not CLOSE_FILE_SLAVER (i.e. this server acts as
 *      the master/primary for the request): record write statistics, forward
 *      the message to the servers in message->get_ds_list() with the mode
 *      switched to CLOSE_FILE_SLAVER, report write completion through
 *      ds_requester_, optionally write a sync log, and reply to the caller.
 *   4. Otherwise (slave): adopt the sequence number carried in the message's
 *      block info and reply STATUS_MESSAGE_OK.
 *
 * @param message  the close-file request; also used to send the reply.
 * @return the result of MessageFactory::send_error_message() on the early
 *         failure paths; TFS_SUCCESS otherwise. Note that TFS_SUCCESS is also
 *         returned when the master replies STATUS_MESSAGE_ERROR because the
 *         write-complete report or the sync log failed.
 */
int DataService::close_write_file(CloseFileMessage* message)
{
  CloseFileInfo close_file_info = message->get_close_file_info();
  int32_t lease_id = message->get_lease_id();
  uint64_t peer_id = message->get_connection()->getPeerId();

  TBSYS_LOG(DEBUG,
      "close write file, blockid: %u, fileid: %" PRI64_PREFIX "u, filenumber: %" PRI64_PREFIX "u, leaseid: %u, from: %s\n",
      close_file_info.block_id_, close_file_info.file_id_, close_file_info.file_number_, lease_id,
      tbsys::CNetUtil::addrToString(peer_id).c_str());

  int32_t write_file_size = 0;
  // Original author's note (translated): this step is fairly involved; roughly,
  // it checks whether the lease has expired, reads the data back from the
  // temporary file / temporary buffer and writes it to its real location in
  // the block.
  int ret = data_management_.close_write_file(close_file_info, write_file_size);
  if (TFS_SUCCESS != ret)
  {
    if (EXIT_DATAFILE_EXPIRE_ERROR == ret)
    {
      return MessageFactory::send_error_message(message, TBSYS_LOG_LEVEL(ERROR), data_server_info_.id_,
          "datafile is null(maybe expired). blockid: %u, fileid: %" PRI64_PREFIX "u, filenumber: %" PRI64_PREFIX "u, ret: %d",
          close_file_info.block_id_, close_file_info.file_id_, close_file_info.file_number_, ret);
    }
    else if (EXIT_NO_LOGICBLOCK_ERROR == ret)
    {
      return MessageFactory::send_error_message(message, TBSYS_LOG_LEVEL(ERROR), data_server_info_.id_,
          "close write file failed. block is not exist. blockid: %u, fileid: %" PRI64_PREFIX "u, ret: %d",
          close_file_info.block_id_, close_file_info.file_id_, ret);
    }
    else
    {
      // Any other failure: schedule a repair for the block and, when acting as
      // master, report the failed write so the lease is not left pending.
      try_add_repair_task(close_file_info.block_id_, ret);
      if (CLOSE_FILE_SLAVER != close_file_info.mode_)
      {
        ds_requester_.req_block_write_complete(close_file_info.block_id_, lease_id, ret);
      }
      return MessageFactory::send_error_message(message, TBSYS_LOG_LEVEL(ERROR), data_server_info_.id_,
          "close write file error. blockid: %u, fileid : %" PRI64_PREFIX "u, filenumber: %" PRI64_PREFIX "u. ret: %d",
          close_file_info.block_id_, close_file_info.file_id_, close_file_info.file_number_, ret);
    }
  }

  BlockInfo* blk = NULL;
  int32_t visit_count = 0;
  ret = data_management_.get_block_info(close_file_info.block_id_, blk, visit_count);
  if (TFS_SUCCESS != ret)
  {
    // NOTE(review): unlike the generic close failure above, this path does not
    // call req_block_write_complete() -- confirm that is intended.
    return MessageFactory::send_error_message(message, TBSYS_LOG_LEVEL(ERROR), data_server_info_.id_,
        "close write file failed. block is not exist. blockid: %u, fileid: %" PRI64_PREFIX "u, ret: %d",
        close_file_info.block_id_, close_file_info.file_id_, ret);
  }

  // If this is the master DS, forward the close message to the other (slave)
  // DS so that they perform the same data-commit step as above.
  if (CLOSE_FILE_SLAVER != close_file_info.mode_)
  {
    do_stat(peer_id, write_file_size, write_file_size, 0, AccessStat::WRITE_BYTES);

    message->set_mode(CLOSE_FILE_SLAVER);
    message->set_block(blk);

    // Original author's note (translated): the communication is asynchronous;
    // a condition wait (cond_.wait()) is used so this call returns only after
    // the replies from all DS have arrived.
    int send_ret = send_message_to_slave_ds(message, message->get_ds_list());
    if (TFS_SUCCESS != send_ret)
    {
      // other ds failed, release lease
      ds_requester_.req_block_write_complete(close_file_info.block_id_, lease_id, TFS_ERROR);
      return MessageFactory::send_error_message(message, TBSYS_LOG_LEVEL(ERROR), data_server_info_.id_,
          "close write file to other dataserver fail, blockid: %u, fileid: %" PRI64_PREFIX "u, send_ret: %d",
          close_file_info.block_id_, close_file_info.file_id_, send_ret);
    }
    else
    {
      // commit
      // Original author's note (translated): send the write-commit message to
      // the NS; the NS updates its metadata and acknowledges to the primary DS,
      // which then answers the client, ending the write flow.
      // req_block_write_complete() is the call that sends that commit message.
      int ret_code = ds_requester_.req_block_write_complete(close_file_info.block_id_, lease_id, TFS_SUCCESS);
      if (TFS_SUCCESS == ret_code)
      {
        // sync to mirror, unless the request opted out with TFS_FILE_NO_SYNC_LOG
        int option_flag = message->get_option_flag();
        if (0 == (option_flag & TFS_FILE_NO_SYNC_LOG))
        {
          TBSYS_LOG(INFO, " write sync log, blockid: %u, fileid: %" PRI64_PREFIX "u",
              close_file_info.block_id_, close_file_info.file_id_);
          // Open question left by the original author (translated): why is the
          // log written only after the data has already been written?
          ret_code = sync_mirror_->write_sync_log(OPLOG_INSERT, close_file_info.block_id_,
              close_file_info.file_id_);
        }
      }

      // The primary DS answers the client with the final outcome.
      if (TFS_SUCCESS == ret_code)
      {
        message->reply_message(new StatusMessage(STATUS_MESSAGE_OK));
        TBSYS_LOG(INFO, "write successful. blockid: %u, fileid: %" PRI64_PREFIX "u\n",
            close_file_info.block_id_, close_file_info.file_id_);
      }
      else
      {
        TBSYS_LOG(ERROR,
            "rep block write complete or write sync log fail, blockid: %u, fileid: %" PRI64_PREFIX "u, ret: %d",
            close_file_info.block_id_, close_file_info.file_id_, ret_code);
        message->reply_message(new StatusMessage(STATUS_MESSAGE_ERROR));
      }
    }
  }
  // A non-primary DS reports the successful write back to the primary DS.
  else
  {
    TBSYS_LOG(INFO, "slave write successful. blockid: %u, fileid: %" PRI64_PREFIX "u\n",
        close_file_info.block_id_, close_file_info.file_id_);
    // slave will save seqno to prevent from the conflict when this block change to master block
    BlockInfo* copyblk = message->get_block();
    // Guard blk as well: get_block_info() reported success, but nothing here
    // guarantees it filled in the pointer.
    if (NULL != copyblk && NULL != blk)
    {
      blk->seq_no_ = copyblk->seq_no_;
    }
    message->reply_message(new StatusMessage(STATUS_MESSAGE_OK));
  }

  return TFS_SUCCESS;
}
发表评论 取消回复