概述
上篇博客说了下mahout的phase参数的含义,但是没有涉及到具体的应用,上次也只是说了一下这两个参数,startPhase和endPhase的好处:当运行RecommenderJob时前面的两个phase都运行成功了,但是后面的运行出错,那么是否要继续从第一个phase开始运行呢,其实完全没有必要,可以设置startPhase和endPhase这两个参数,直接跳过前面两个Phase。的确是,这个就是它的用处吧(或许还有其他用处?)下面就针对RecommenderJob来进行分析吧
看下图:
这个图针对RecommenderJob进行了分析,第二行的1,2,。。表示当前的Phase包含的MR数量;第三行是每个Phase运行后产生的文件及文件目录。下面说下我的跳过Phase的想法:由上面的图可以看出,每个Phase的输出目录文件是不一样的,所以我采用的做法是运行一遍RecommenderJob,如果有错,那么就去检测每个Phase的输出目录,如果里面有文件,那么就说明这个Phase是正常运行的,反之亦然;
下面上代码:
PhaseUtils.java:
package org.fansy.date1203.mahout.phase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.common.HadoopUtil;
public class PhaseUtils {
/*
* 常量
*/
public static final String TEMP_PATH="hdfs://fansypc:9000/user/botwave/";
public static final String TEMP=System.currentTimeMillis()+"/cf/";
public static final String CF_TEMP_DIR=TEMP_PATH+TEMP;
/*
* files should be checked in phase one
*/
public static final String PHASE_ONE_1=CF_TEMP_DIR+"/"+"preparePreforenceMatrix/itemIDIndex/part-r-00000";
public static final String PHASE_ONE_2=CF_TEMP_DIR+"/"+"preparePreforenceMatrix/numUsers.bin";
public static final String PHASE_ONE_3=CF_TEMP_DIR+"/"+"preparePreforenceMatrix/ratingMatrix/part-r-00000";
public static final String PHASE_ONE_4=CF_TEMP_DIR+"/"+"preparePreforenceMatrix/userVectors/part-r-00000";
/*
* files should be deleted in phase one
*/
public static final String PHASE_ONE_1_D=CF_TEMP_DIR+"/"+"preparePreforenceMatrix/itemIDIndex/_SUCCESS";
public static final String PHASE_ONE_2_D=CF_TEMP_DIR+"/"+"preparePreforenceMatrix/ratingMatrix/_SUCCESS";
public static final String PHASE_ONE_3_D=CF_TEMP_DIR+"/"+"preparePreforenceMatrix/userVectors/_SUCCESS";
/*
* files should be checked in phase two
*/
public static final String PHASE_TWO_1=CF_TEMP_DIR+"/"+"paiwiseSimilarity/part-r-00000";
public static final String PHASE_TWO_2=CF_TEMP_DIR+"/"+"maxValues.bin";
public static final String PHASE_TWO_3=CF_TEMP_DIR+"/"+"normas.bin";
public static final String PHASE_TWO_4=CF_TEMP_DIR+"/"+"numNonZeroEntries.bin";
public static final String PHASE_TWO_5=CF_TEMP_DIR+"/"+"similarityMatrix/part-r-00000";
public static final String PHASE_TWO_6=CF_TEMP_DIR+"/"+"weights/part-r-00000";
/*
* files should be delete in phase two
*/
public static final String PHASE_TWO_1_D=CF_TEMP_DIR+"/"+"similarityMatrix/_SUCCESS";
public static final String PHASE_TWO_2_D=CF_TEMP_DIR+"/"+"weights/_SUCCESS";
public static final String PHASE_TWO_3_D=CF_TEMP_DIR+"/"+"paiwiseSimilarity/_SUCCESS";
/*
* files should be checked in phase three
*/
public static final String PHASE_THREE_1=CF_TEMP_DIR+"/"+"partialMultiply/part-r-00000";
public static final String PHASE_THREE_2=CF_TEMP_DIR+"/"+"partialMultiply1/part-r-00000";
public static final String PHASE_THREE_3=CF_TEMP_DIR+"/"+"partialMultiply2/part-r-00000";
/*
* files should be deleted in phase three
*/
public static final String PHASE_THREE_1_D=CF_TEMP_DIR+"/"+"partialMultiply/_SUCCESS";
public static final String PHASE_THREE_2_D=CF_TEMP_DIR+"/"+"partialMultiply1/_SUCCESS";
public static final String PHASE_THREE_3_D=CF_TEMP_DIR+"/"+"partialMultiply2/_SUCCESS";
/*
* 查看文件是否存在
*/
public static boolean fileExists(String file){
boolean flag=false;
Configuration conf=new Configuration();
Path path=new Path(file);
FileSystem fs=null;
try {
fs=FileSystem.get(path.toUri(), conf);
} catch (IOException e) {
e.printStackTrace();
}
try {
flag=fs.exists(path);
} catch (IOException e) {
e.printStackTrace();
}
return flag;
}
/*
* 删除文件
*/
public static boolean deleteFile(String file){
boolean flag=true;
try {
HadoopUtil.delete(new Configuration(), new Path(file));
} catch (IOException e) {
// TODO Auto-generated catch block
flag=false;
e.printStackTrace();
}
return flag;
}
/*
* check files in phase one
*/
public static boolean checkFilesInPhaseOne(){
int temp=0;
if(fileExists(PHASE_ONE_1)){
temp++;
}
if(fileExists(PHASE_ONE_2)){
temp++;
}
if(fileExists(PHASE_ONE_3)){
temp++;
}
if(fileExists(PHASE_ONE_4)){
temp++;
}
if(temp==4){
return true;
}else{
return false;
}
}
/*
* delete files in phase one
*/
public static void deleteFilesInPhaseOne(){
deleteFile(PHASE_ONE_1_D);
deleteFile(PHASE_ONE_2_D);
deleteFile(PHASE_ONE_3_D);
}
/*
* check files in phase two
*/
public static boolean checkFilesInPhaseTwo(){
int temp=0;
if(fileExists(PHASE_TWO_1)){
temp++;
}
if(fileExists(PHASE_TWO_2)){
temp++;
}
if(fileExists(PHASE_TWO_3)){
temp++;
}
if(fileExists(PHASE_TWO_4)){
temp++;
}
if(fileExists(PHASE_TWO_5)){
temp++;
}
if(fileExists(PHASE_TWO_6)){
temp++;
}
if(temp==6){
return true;
}else{
return false;
}
}
/*
* delete files in phase two
*/
public static void deleteFilesInPhaseTwo(){
deleteFile(PHASE_TWO_1_D);
deleteFile(PHASE_TWO_2_D);
deleteFile(PHASE_TWO_3_D);
}
/*
* check files in phase three
*/
public static boolean checkFilesInPhaseThree(){
int temp=0;
if(fileExists(PHASE_THREE_1)){
temp++;
}
if(fileExists(PHASE_THREE_2)){
temp++;
}
if(fileExists(PHASE_THREE_3)){
temp++;
}
if(temp==3){
return true;
}else{
return false;
}
}
/*
* delete files in phase three
*/
public static void deleteFilesInPhaseThree(){
deleteFile(PHASE_THREE_1_D);
deleteFile(PHASE_THREE_3_D);
deleteFile(PHASE_THREE_3_D);
}
}
PhaseCheckDriver.java:
package org.fansy.date1203.mahout.phase;
import static org.fansy.date1203.mahout.phase.PhaseUtils.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;
public class PhaseCheckDriver {
/**
* 原地运行任务
* @throws Exception
*/
public static void main(String[] args) throws Exception {
if(args.length!=2){
System.out.print("Usage: <input><output>");
System.exit(-1);
}
String arg="--input "+TEMP_PATH+args[0]+" --output "+TEMP_PATH+args[1]+
" --booleanData false --similarityClassname SIMILARITY_COOCCURRENCE "+
"--startPhase 0 --tempDir "+CF_TEMP_DIR;
String[] tempArg=arg.split(" ");
int temp=ToolRunner.run(new Configuration(), new RecommenderJob(),tempArg);
if(temp==0){
System.exit(0); // 程序正常退出
}
int times=3; // 继续运行任务三次,然后退出
boolean flag_phase1=false;
boolean flag_phase2=false;
boolean flag_phase3=false;
boolean flag_phase4=false;
int startPhase=0;
while(times-->=0){
flag_phase1=PhaseUtils.checkFilesInPhaseOne();
if(flag_phase1){
flag_phase2=PhaseUtils.checkFilesInPhaseTwo();
startPhase++;
}
if(flag_phase2){
flag_phase3=PhaseUtils.checkFilesInPhaseThree();
startPhase++;
}
if(flag_phase3){
flag_phase4=PhaseUtils.fileExists(TEMP_PATH+args[1]);
startPhase++;
}
if(flag_phase4){
startPhase++;
}
if(startPhase>=4){
break; // 说明4个phase运行成功,退出程序
}
// 否则的话,删除_SUCCESS文件,调整参数继续运行
switch(startPhase){
case 1:
PhaseUtils.deleteFilesInPhaseOne();
break;
case 2:
PhaseUtils.deleteFilesInPhaseOne();
PhaseUtils.deleteFilesInPhaseTwo();
break;
case 3:
PhaseUtils.deleteFilesInPhaseOne();
PhaseUtils.deleteFilesInPhaseTwo();
break;
}
//
arg="--input "+TEMP_PATH+args[0]+" --output "+TEMP_PATH+args[1]+
" --booleanData false --similarityClassname SIMILARITY_COOCCURRENCE "+
"--startPhase "+startPhase+" --tempDir "+CF_TEMP_DIR;
temp=ToolRunner.run(new Configuration(), new RecommenderJob(),tempArg);
if(temp==0){
System.exit(0); // 程序正常退出
}
}
}
}
说明:上面的代码中的删除_SUCCESS文件是必须的,这个和我的测试方法有关,先说下我的测试方法吧。
我先运行了一遍RecommenderJob(操作A),然后屏蔽PhaseCheckDriver里面的第一个RecommenderJob,同时删除操作A的phase2产生的文件,然后运行PhaseCheckDriver,看是否有跳过并且是否出来最后的结果;
因为操作A 是成功的,所以在每个目录里会有_SUCCESS文件,当有这个文件的时候,直接使用startPhase=1跳过第phase0会显示出错,所以我在程序里面用代码删除了这个文件。
注: 上面的代码还未测试过;
分享,成长,快乐
最后
以上就是朴实西牛为你收集整理的mahout phase 应用之 RecommenderJob的全部内容,希望文章能够帮你解决mahout phase 应用之 RecommenderJob所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复