我是靠谱客的博主 动人蜻蜓,最近开发中收集的这篇文章主要介绍本地文件上传到HDFS,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

 目录

一、文件上传需求

二、具体代码实现

1.1 hdfsETL.json文件

1.2 定义UploadHDFSProject

1.3 定义UploadHdfsETLJson类

1.4 定义RegexLocalPathFilter类

1.5  定义DataCollect主类


一、文件上传需求

按照配置文件的内容解析要上传文件的路径,HDFS的目标路径,文件名格式匹配,按照不同的文件名上传到不同的HDFS目录,HDFS按照业务分目录存储文件

二、具体代码实现

1.1 hdfsETL.json文件

{
  "hdfsProjectList":[
    {
      "projectName":"data_collection",
      "localSrcPath":"E:/hadoop/software/apache-flume-1.9.0-bin/400Data_tmp/",
      "hdfsTargetPath":"hdfs:///hadoop/org/400backup/hour/%Y/%m/%d",
      "fileRegex":"^.*summary_sixty.*txt$"
    },
    {
      "projectName":"ETL_collection",
      "localSrcPath":"E:/hadoop/software/apache-flume-1.9.0-bin/400Data_tmp/",
      "hdfsTargetPath":"hdfs:///hadoop/org/400backup/day/%Y/%m/%d",
      "fileRegex":"^.*summary_day.*txt$"
    }
  ]
}

1.2 定义UploadHDFSProject

将JSON数组中的元数据信息定义为类

package com.kangna.bean;

/*******************************************************************
 *@auther kangll
 *@date 2021/6/26 14:50                                  
 *@desc 定义上传文件信息,因为会存在多个路不同项目本地文件的上传
 ******************************************************************/
public class UploadHDFSProject {

    /* 上传文件的业务名 */
    private String projectName;

    /* 本地文件路径 */
    private String localSrcPath;

    /* HDFS路径 */
    private String hdfsTargetPath;

    /* 文件名 */
    private String fileRegex;

    public String getProjectName() {
        return projectName;
    }

    public void setProjectName(String projectName) {
        this.projectName = projectName;
    }

    public String getLocalSrcPath() {
        return localSrcPath;
    }

    public void setLocalSrcPath(String localSrcPath) {
        this.localSrcPath = localSrcPath;
    }

    public String getHdfsTargetPath() {
        return hdfsTargetPath;
    }

    public void setHdfsTargetPath(String hdfsTargetPath) {
        this.hdfsTargetPath = hdfsTargetPath;
    }

    public String getFileRegex() {
        return fileRegex;
    }

    public void setFileRegex(String fileRegex) {
        this.fileRegex = fileRegex;
    }


    @Override
    public String toString() {
        return "UploadHDFSProject{" +
                "projectName='" + projectName + ''' +
                ", localSrcPath='" + localSrcPath + ''' +
                ", hdfsTargetPath='" + hdfsTargetPath + ''' +
                ", fileRegex='" + fileRegex + ''' +
                '}';
    }
}

1.3 定义UploadHdfsETLJson类

将hdfsETL.json配置文件解析出来的JSON数组定义为List

package com.kangna.bean;

import java.util.List;

/*********************************************************************
 *@auther kangll
 *@date 2021/6/26 17:06                                  
 *@desc 解析的 hdfsETL.json 封装到 List中
 ********************************************************************/
public class UploadHdfsETLJson {

    /* 解析的 hdfsETL.json 封装到 List中 */
    private List<UploadHDFSProject> hdfsProjectList;

    public List<UploadHDFSProject> getHdfsProjectList() {
        return hdfsProjectList;
    }

    public void setHdfsProjectList(List<UploadHDFSProject> hdfsProjectList) {
        this.hdfsProjectList = hdfsProjectList;
    }

    @Override
    public String toString() {
        return "UploadHdfsETLJson{" +
                "hdfsProjectList=" + hdfsProjectList +
                '}';
    }
}

1.4 定义RegexLocalPathFilter类

定义RegexLocalPathFilter类实现FileFilter接口实现文件过滤

package com.kangna.hdfs;

import java.io.File;
import java.io.FileFilter;

/*********************************************************************
 *@auther kangll
 *@date 2021/6/26 17:59                                  
 *@desc 本地文件过滤
 ********************************************************************/
public class RegexLocalPathFilter implements FileFilter {

    private final String regex;

    public RegexLocalPathFilter(String regex) {
        this.regex = regex;
    }

    /**
     * 正则过滤要上传到HDFS的本地文件
     * @param pathname
     * @return
     */
    public boolean accept(File pathname) {
        String name = pathname.getName();
        return !name.endsWith(".tmp") && name.toLowerCase().matches(regex);
    }
}

1.5  定义DataCollect主类

​
package com.kangna.hdfs;

import com.alibaba.fastjson.JSON;
import com.kangna.bean.UploadHDFSProject;
import com.kangna.bean.UploadHdfsETLJson;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/*********************************************************************
 *@auther kangll
 *@date 2021/6/26 17:15                                  
 *@desc 本地数据上传到 HDFS
 ********************************************************************/
public class DataCollectionFailoverUpload {

    // 配置信息
    static String CONFIG_FILE = "hdfsETL.JSON";

    // hdfsETL.JSON 中的HDFS日期格式化
    static String TAR_REGEX = "%(\w)";
    static final Pattern tagPattern = Pattern.compile(TAR_REGEX);

    /**
     * 解析JSON配置文件
     *
     * @return
     */
    public static UploadHdfsETLJson parseJSON() {
        UploadHdfsETLJson hdfsETLJson = null;
        try {
            InputStream fileInputStream = ClassLoader.getSystemResourceAsStream(CONFIG_FILE);
            String text = IOUtils.toString(fileInputStream, "utf8");
            hdfsETLJson = JSON.parseObject(text, UploadHdfsETLJson.class);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return hdfsETLJson;
    }

    /**
     * 配置文件中的日期格式化
     *
     * @param hdfsTragetPathStr HDFS路径
     * @return
     */
    public static String formatDate(String hdfsTragetPathStr) {
        Matcher matcher = tagPattern.matcher(hdfsTragetPathStr);
        Date date = new Date();
        while (matcher.find()) {
            char c = matcher.group(1).charAt(0);
            String formatString = "";
            switch (c) {
                case 'd':
                    formatString = "dd";
                    break;
                case 'H':
                    formatString = "HH";
                    break;
                case 'm':
                    formatString = "MM";
                    break;
                case 'Y':
                    formatString = "yyyy";
                    break;
                case 'M':
                    formatString = "mm";
                    break;
                case 'S':
                    formatString = "ss";
                    break;
                default:
                    break;
            }
            SimpleDateFormat dateFormat = new SimpleDateFormat(formatString);
            String format = dateFormat.format(date);
            // DHFS 路径中 “%” 替换
            hdfsTragetPathStr = hdfsTragetPathStr.replaceAll("%" + c, format);
        }
        return hdfsTragetPathStr;
    }

    /**
     * 本地文件过滤
     *
     * @param localSrcPath
     * @param regex
     * @return
     */
    public static List<Path> localListStatus(String localSrcPath, RegexLocalPathFilter regex) {
        List<Path> files = new ArrayList<>();
        File file = new File(localSrcPath);
        if (!file.exists()) {
            System.out.println("源文件夹中没有新增文件!");
            System.exit(0);
        }
        File[] listFiles = file.listFiles(regex);
        for (int i = 0; i < listFiles.length - 1; i++) {
            files.add(new Path(listFiles[i].getPath()));
        }
        return files;
    }

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // 获取 hdfsETL.json配置
        UploadHdfsETLJson uploadHdfsETLJson = parseJSON();
        List<UploadHDFSProject> hdfsProjectList = uploadHdfsETLJson.getHdfsProjectList();
        for (UploadHDFSProject hp : hdfsProjectList) {
            hp.setHdfsTargetPath(formatDate(hp.getHdfsTargetPath()));
            // 获取指定目录下的指定文件
            RegexLocalPathFilter regexLocalPathFilter = new RegexLocalPathFilter(hp.getFileRegex().toLowerCase());
            // 获取文件
            final List<Path> paths = localListStatus(hp.getLocalSrcPath(), regexLocalPathFilter);
            // 日期文件夹不存在则创建
            if (!fs.exists(new Path(hp.getHdfsTargetPath()))) {
                fs.mkdirs(new Path(hp.getHdfsTargetPath()));
            }

            for (Path localFile : paths) {
                fs.copyFromLocalFile(localFile, new Path(hp.getHdfsTargetPath()));
            }
        }
    }
}

测试结果

 

最后

以上就是动人蜻蜓为你收集整理的本地文件上传到HDFS的全部内容,希望文章能够帮你解决本地文件上传到HDFS所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(47)

评论列表共有 0 条评论

立即
投稿
返回
顶部