Adam Learning 9: Computing Statistics on the GRCh38 .fna File

Overview


Methods 3 and 2 run to completion and report the following statistics for GRCh38 (GCA_000001405.15_GRCh38_full_analysis_set.fna):

fq0.count:45850077                                                              
Method 3=> Length:321202 sum3:3.209457928E9 time:334096ms                       
Method 2=> Length:321202 sum2:3209457928 time:153120ms     
GCA_000001405.15_GRCh38_full_analysis_set.fna contains:

45850077 lines

321202 records (parquet)

3209457928 bases in total
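
As a quick sanity check on these figures, the total base count divided by the record count should come out just under 10000, the value the code passes as the second argument to loadFasta (assumed here to be the maximum fragment length), since the last fragment of each contig is usually shorter. A minimal sketch in plain Scala; the object and member names are illustrative and not part of the original program:

```scala
object FragmentSanityCheck {
  // Figures reported by the run above.
  val totalBases: Long = 3209457928L
  val recordCount: Long = 321202L

  // Mean number of bases per record, as a floating-point value.
  def meanFragmentLength: Double = totalBases.toDouble / recordCount

  def main(args: Array[String]): Unit = {
    // Integer division gives 9992, slightly below the assumed fragment length of 10000.
    println("bases per record (integer division): " + (totalBases / recordCount))
    println(f"bases per record (mean): $meanFragmentLength%.2f")
  }
}
```

A mean of roughly 9992 bases per record is consistent with fragments capped at 10000 bases.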


The first method fails with a Java heap space OutOfMemoryError.

Script:

hadoop@Master:~/cloud/testByXubo/spark/GRCH38$ cat cluster.sh 
#!/usr/bin/env bash
spark-submit --name readFileFromHs38DHGRCH38All \
  --class com.adam.code.hs38DH.readFileFromHs38DHGRCH38All \
  --master spark://Master:7077 \
  --jars /home/hadoop/cloud/adam/lib/adam-apis_2.10-0.18.3-SNAPSHOT.jar,/home/hadoop/cloud/adam/lib/adam-cli_2.10-0.18.3-SNAPSHOT.jar,/home/hadoop/cloud/adam/lib/adam-core_2.10-0.18.3-SNAPSHOT.jar \
  --executor-memory 4096M \
  --total-executor-cores 20 readFileFromHs38DHGRCH38All.jar



Execution result:


hadoop@Master:~/cloud/testByXubo/spark/GRCH38$ ./cluster.sh 
fq0.count:45850077                                                              
Method 3=> Length:321202 sum3:3.209457928E9 time:334096ms                       
Method 2=> Length:321202 sum2:3209457928 time:153120ms                          
[Stage 7:=========>                                               (4 + 14) / 25]16/04/17 14:10:39 ERROR TaskSchedulerImpl: Lost executor 2 on Master: Executor heartbeat timed out after 129978 ms
[Stage 7:=========>                                               (4 + 14) / 25]16/04/17 14:11:54 ERROR Utils: Uncaught exception in thread task-result-getter-0
java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:65)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
	at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:62)
	at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:92)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:79)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Exception in thread "task-result-getter-0" 16/04/17 14:11:54 ERROR ContextCleaner: Error cleaning broadcast 7
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
	at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:229)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:225)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:242)
	at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:136)
	at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228)
	at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
	at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:67)
	at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:214)
	at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:170)
	at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:161)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:161)
	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136)
	at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154)
	at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
	at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
	at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
	at scala.concurrent.Await$.result(package.scala:107)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:241)
	... 12 more
java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.scheduler.DirectTaskResult$$anonfun$readExternal$1.apply$mcV$sp(TaskResult.scala:65)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
	at org.apache.spark.scheduler.DirectTaskResult.readExternal(TaskResult.scala:62)
	at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:92)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:79)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
[Stage 7:========================>                               (11 + 12) / 25]16/04/17 14:12:44 ERROR TaskSetManager: Total size of serialized results of 14 tasks (1033.2 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
16/04/17 14:12:44 ERROR TaskSetManager: Total size of serialized results of 15 tasks (1037.8 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
16/04/17 14:12:44 ERROR TaskSetManager: Total size of serialized results of 16 tasks (1086.3 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
16/04/17 14:12:45 ERROR TaskSetManager: Total size of serialized results of 17 tasks (1089.1 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 14 tasks (1033.2 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1921)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:909)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:908)
	at com.adam.code.hs38DH.readFileFromHs38DHGRCH38All$.main(readFileFromHs38DHGRCH38All.scala:66)
	at com.adam.code.hs38DH.readFileFromHs38DHGRCH38All.main(readFileFromHs38DHGRCH38All.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
16/04/17 14:13:01 WARN QueuedThreadPool: 6 threads could not be stopped
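
The trace above shows the job aborting inside RDD.collect once the serialized task results exceed spark.driver.maxResultSize (1024.0 MB). One possible way around this, sketched below under assumptions, is to measure each sequence on the executors and aggregate there, so that only a pair of Long values travels back to the driver. The example uses a tiny in-memory RDD of strings as a stand-in for the fragment sequences; FragmentStats and countAndSum are hypothetical names, and this is not the code the original job ran.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object FragmentStats {
  // Returns (number of sequences, total number of bases) without
  // collecting the sequences themselves to the driver.
  def countAndSum(sequences: RDD[String]): (Long, Long) =
    sequences.aggregate((0L, 0L))(
      // Within a partition: add one record and its length to the running pair.
      (acc, seq) => (acc._1 + 1L, acc._2 + seq.length.toLong),
      // Across partitions: add the partial pairs together.
      (a, b) => (a._1 + b._1, a._2 + b._2)
    )

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FragmentStats").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      // Stand-in data; in the real job this would be the RDD of fragment sequences.
      val sequences = sc.parallelize(Seq("ACGT", "AC", "GGGTTT"))
      val (count, total) = countAndSum(sequences)
      println("Length:" + count + " sum:" + total)
    } finally {
      sc.stop()
    }
  }
}
```

Raising spark.driver.maxResultSize and the driver memory is the other obvious lever, but aggregating on the cluster avoids moving several gigabytes of sequence text in the first place.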

Code:

/**
 * @author xubo
 * Local-mode version: runs out of memory
 */
package com.adam.code.hs38DH

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.bdgenomics.adam.rdd.ADAMContext

object readFileFromGRCH38 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("ReadFile").setMaster("local")
    val sc = new SparkContext(conf)

    val ac = new ADAMContext(sc)
    //    val file1 = "hdfs://Master:9000/xubo/data/GRCH38/bwa/GCA_000001405.15_GRCh38/GCA_000001405.15_GRCh38_full_analysis_set.fna"
    val file1 = "file/adam/hs38DH/hs38DH.fa"
    //load by SparkContext textFile
    val fq0 = sc.textFile(file1)
    //    fq0.foreach(println)
    println("fq0.count:" + fq0.count);

    //load Fasta
    val fq1 = ac.loadFasta(file1, 10000)
    //    println("fq1.partitions:" + fq1.partitions);
    //    println("fq1.partitions length:" + fq1.partitions.length);
    //    println("fq1.count:" + fq1.count);
    //    fq1.foreach(println)

    //method 1: collects every fragment sequence to the driver, then sums the string lengths
    var startTime = System.currentTimeMillis();
    var fq1Sequence = fq1.map(_.getFragmentSequence()).collect
    val fq1Length = fq1Sequence.length
    //    println(fq1Count);
    var sum = 0L;
    for (i <- 0 until fq1Length) {
      val a = fq1Sequence(i).length()
      sum = sum + a
      //      println(sum + ":" + a);
    }
    var endTime = System.currentTimeMillis();
    println("Method 1=> Length:" + fq1Length + " sum:" + sum + " time:" + (endTime - startTime) + "ms");

    //method 2
    startTime = System.currentTimeMillis();
    var fq2Sequence = fq1.map(_.getFragmentLength()).collect
    val fq2Length = fq2Sequence.length
    //    println("fq2Sequence.count:" + fq2Length);
    var sum2 = 0L;
    //    fq2Sequence.foreach(println)
    for (i <- 0 until fq2Length) {
      sum2 = sum2 + fq2Sequence(i)
      //      println(sum2 + ":" + fq2Sequence(i));
    }
    //    println(sum2);
    endTime = System.currentTimeMillis();
    println("Method 2=> Length:" + fq2Length + " sum2:" + sum2 + " time:" + (endTime - startTime) + "ms");

    //method 3: same as method 2, but sums the lengths as Double
    startTime = System.currentTimeMillis();
    var fq3Sequence = fq1.map(_.getFragmentLength()).collect
    val fq3Length = fq3Sequence.length
    var sum3 = fq3Sequence.map(a => a.toDouble).sum;

    //    for (i <- 0 until fq3Length) {
    //      sum2 = sum2 + fq3Sequence(i)
    //    }
    endTime = System.currentTimeMillis();
    println("Method 3=> Length:" + fq3Length + " sum3:" + sum3 + " time:" + (endTime - startTime) + "ms");

  }
}
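
Method 3 converts each length to Double before summing, which is why its total is printed as 3.209457928E9 while method 2 prints 3209457928. The sketch below illustrates the difference in plain Scala; the object name, function names, and input values are made up for illustration (the two values were chosen only so that they add up to the reported total).

```scala
object SumFormatting {
  // Sum the lengths as Double, as method 3 does.
  def sumAsDouble(lengths: Seq[Long]): Double = lengths.map(_.toDouble).sum

  // Sum the lengths as Long, which keeps plain integer formatting.
  def sumAsLong(lengths: Seq[Long]): Long = lengths.sum

  def main(args: Array[String]): Unit = {
    // Made-up values that add up to the total reported in the post.
    val lengths = Seq(3000000000L, 209457928L)
    println("sum as Double: " + sumAsDouble(lengths)) // prints 3.209457928E9
    println("sum as Long:   " + sumAsLong(lengths))   // prints 3209457928
  }
}
```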


On the second run, executors were lost:

hadoop@Master:~/cloud/testByXubo/spark/GRCH38/test2$ ./cluster.sh 
fq0.count:45850077                                                              
Method 3=> Length:321202 sum3:3.209457928E9 time:236360ms                       
[Stage 5:=============================>                          (13 + 12) / 25]16/04/17 15:11:17 ERROR TaskSchedulerImpl: Lost executor 2 on Master: Executor heartbeat timed out after 144594 ms
[Stage 5:=============================>                             (2 + 2) / 4]16/04/17 15:16:42 ERROR TaskSchedulerImpl: Lost executor 5 on Mcnode6: remote Rpc client disassociated
16/04/17 15:16:43 ERROR TaskSchedulerImpl: Lost executor 3 on Mcnode3: remote Rpc client disassociated
16/04/17 15:16:43 ERROR TaskSchedulerImpl: Lost executor 1 on Mcnode1: remote Rpc client disassociated
16/04/17 15:16:43 ERROR TaskSchedulerImpl: Lost executor 0 on Mcnode2: remote Rpc client disassociated
16/04/17 15:16:51 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: Master removed our application: KILLED


Running only methods 2 and 3:

hadoop@Master:~/cloud/testByXubo/spark/GRCH38/test2$ ./cluster.sh 
fq0.count:45850077                                                              
Method 3=> Length:321202 sum3:3.209457928E9 time:214426ms                       
Method 2=> Length:321202 sum2:3209457928 time:132446ms                          
16/04/19 14:35:27 WARN QueuedThreadPool: 4 threads could not be stopped

hadoop@Master:~/cloud/testByXubo/spark/GRCH38/test2$ ./cluster.sh 
fq0.count:45850077                                                              
Method 3=> Length:321202 sum3:3.209457928E9 time:209010ms                       
Method 2=> Length:321202 sum2:3209457928 time:132265ms                          
16/04/19 14:47:11 WARN QueuedThreadPool: 6 threads could not be stopped
16/04/19 14:47:13 WARN QueuedThreadPool: 1 threads could not be stopped
hadoop@Master:~/cloud/testByXubo/spark/GRCH38/test2$ ./cluster.sh 
fq0.count:45850077                                                              
Method 3=> Length:321202 sum3:3.209457928E9 time:220026ms                       
Method 2=> Length:321202 sum2:3209457928 time:96890ms 

