Spark Log level
About logging: http://www.lxway.com/898006.htm
In $SPARK_HOME/conf/log4j.properties:
# Set everything to be logged to the console
log4j.rootCategory=INFO, console, FILE
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third-party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO

# Also log everything at DEBUG and above to a daily rolling file
log4j.appender.FILE=org.apache.log4j.DailyRollingFileAppender
log4j.appender.FILE.Threshold=DEBUG
log4j.appender.FILE.file=/usr/local/src/spark-1.5.1-bin-hadoop2.6/spark.log
log4j.appender.FILE.DatePattern='.'yyyy-MM-dd
log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
log4j.appender.FILE.layout.ConversionPattern=[%-5p] [%d{yyyy-MM-dd HH:mm:ss}] [%C{1}:%M:%L] %m%n

# Spark's own log level
log4j.logger.org.apache.spark=DEBUG
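For quick experiments the level can also be changed at runtime from the driver, without editing log4j.properties. A minimal sketch using the standard SparkContext.setLogLevel API (Spark 1.4+); the app name is made up:

import org.apache.spark.{SparkConf, SparkContext}

object LogLevelDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("log-level-demo")) // hypothetical app name
    // Overrides the log4j root level for this application only.
    // Valid levels: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN.
    sc.setLogLevel("WARN")
    sc.parallelize(1 to 10).count() // now runs with a much quieter console
    sc.stop()
  }
}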
http://blog.csdn.net/u012684933/article/details/50378725
val attemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
attemptId here is the latest attempt ID of the task's stage, or -1 if the stage is no longer tracked.
After the last task of a Spark job's ResultStage completes successfully, DAGScheduler.handleTaskCompletion posts a SparkListenerJobEnd event (see DAGScheduler.scala).
TaskMetrics: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
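All of these events are posted on the driver's listener bus, so the easiest way to watch SparkListenerJobEnd (or any event below) is to register a custom SparkListener. A minimal sketch; the listener class and app name are made up, while SparkListener and SparkContext.addSparkListener are the standard APIs:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// Hypothetical listener: print each job's result as soon as DAGScheduler posts JobEnd.
class JobEndListener extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    println(s"job ${jobEnd.jobId} ended with ${jobEnd.jobResult}")
  }
}

object JobEndDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("job-end-demo"))
    sc.addSparkListener(new JobEndListener)
    sc.parallelize(1 to 100).count() // triggers exactly one job, so one JobEnd event
    sc.stop()
  }
}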
"Event":"SparkListenerLogStart"
"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"192.168.1.101","Port":38019},"Maximum Memory":560497950,"Timestamp":1457544469272}
In the application_1457516913091_0012_1 event log file, the following event types appear (counts shown where greater than one):
"Event":"SparkListenerEnvironmentUpdate"
"Event":"SparkListenerApplicationStart"
"Event":"SparkListenerExecutorAdded" × 2
"Event":"SparkListenerBlockManagerAdded" × 3
"Event":"SparkListenerJobStart"
"Event":"SparkListenerStageSubmitted"
"Event":"SparkListenerTaskStart" × 10
"Event":"SparkListenerTaskEnd" × 10
"Event":"SparkListenerStageCompleted"
"Event":"SparkListenerJobEnd"
"Event":"SparkListenerApplicationEnd"
In SparkListener.scala, a total of 17 event classes extend SparkListenerEvent:
SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerBlockManagerAdded, SparkListenerBlockManagerRemoved, SparkListenerBlockUpdated, SparkListenerEnvironmentUpdate, SparkListenerExecutorAdded, SparkListenerExecutorMetricsUpdate, SparkListenerExecutorRemoved, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerStageSubmitted, SparkListenerTaskEnd, SparkListenerTaskGettingResult, SparkListenerTaskStart, SparkListenerUnpersistRDD
Missing from this log file: SparkListenerBlockManagerRemoved, SparkListenerBlockUpdated, SparkListenerExecutorMetricsUpdate, SparkListenerExecutorRemoved.
1. SparkListenerBlockManagerAdded
Source code:
@DeveloperApi
case class SparkListenerBlockManagerAdded(time: Long, blockManagerId: BlockManagerId, maxMem: Long)
  extends SparkListenerEvent
BlockManagerId.scala: https://github.com/apache/spark/blob/v1.6.1/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
Example: {"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"192.168.1.101","Port":38019},"Maximum Memory":560497950,"Timestamp":1457544469272}
In BlockManagerId, the fields are: execId (ID of the executor), host (host name of the block manager), port (port of the block manager).
What is a BlockManager: the component that runs on the driver and on each executor and manages the storage of data blocks (cached RDD partitions, shuffle output, broadcast variables) in memory and on disk.
2. SparkListenerLogStart
Source code:
private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
An internal event written at the head of an event log to record the Spark version; it is not meant to be posted to listeners downstream.
3. SparkListenerStageSubmitted
Source code:
@DeveloperApi
case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
  extends SparkListenerEvent
{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"count at SlowNode.scala:34","Number of Tasks":10,"RDD Info":[{"RDD ID":2,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"2\",\"name\":\"map\"}","Parent IDs":[1],"Storage Level":{"Use Disk":false,"Use Memory":false,"Use ExternalBlockStore":false,"Deserialized":false,"Replication":1},"Number of Partitions":10,"Number of Cached Partitions":0,"Memory Size":0,"ExternalBlockStore Size":0,"Disk Size":0},{"RDD ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"1\",\"name\":\"flatMap\"}","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Use ExternalBlockStore":false,"Deserialized":false,"Replication":1},"Number of Partitions":10,"Number of Cached Partitions":0,"Memory Size":0,"ExternalBlockStore Size":0,"Disk Size":0},{"RDD ID":0,"Name":"ParallelCollectionRDD","Scope":"{\"id\":\"0\",\"name\":\"parallelize\"}","Parent IDs":[],"Storage Level":{"Use Disk":false,"Use Memory":false,"Use ExternalBlockStore":false,"Deserialized":false,"Replication":1},"Number of Partitions":10,"Number of Cached Partitions":0,"Memory Size":0,"ExternalBlockStore Size":0,"Disk Size":0}],"Parent IDs":[],"Details":"org.apache.spark.rdd.RDD.count(RDD.scala:1121)\ncom.tolon.spark.SlowNode$.main(SlowNode.scala:34)\ncom.tolon.spark.SlowNode.main(SlowNode.scala)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:497)\norg.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)","Accumulables":[]},"Properties":{}}
Stage Info: built in DAGScheduler as
new StageInfo(
  stage.id,
  attemptId,
  stage.name,
  numTasks.getOrElse(stage.numTasks),
  rddInfos,
  stage.parents.map(_.id),
  stage.details,
  taskLocalityPreferences)
Properties: the scheduling properties of the submitting job (for example the fair-scheduler pool); empty in the example above.
4. SparkListenerStageCompleted
Source code:
case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent
Example:{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"count at SlowNode.scala:34","Number of Tasks":10,"RDD Info":[{"RDD ID":2,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"2\",\"name\":\"map\"}","Parent IDs":[1],"Storage Level":{"Use Disk":false,"Use Memory":false,"Use ExternalBlockStore":false,"Deserialized":false,"Replication":1},"Number of Partitions":10,"Number of Cached Partitions":0,"Memory Size":0,"ExternalBlockStore Size":0,"Disk Size":0},{"RDD ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"1\",\"name\":\"flatMap\"}","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Use ExternalBlockStore":false,"Deserialized":false,"Replication":1},"Number of Partitions":10,"Number of Cached Partitions":0,"Memory Size":0,"ExternalBlockStore Size":0,"Disk Size":0},{"RDD ID":0,"Name":"ParallelCollectionRDD","Scope":"{\"id\":\"0\",\"name\":\"parallelize\"}","Parent IDs":[],"Storage Level":{"Use Disk":false,"Use Memory":false,"Use ExternalBlockStore":false,"Deserialized":false,"Replication":1},"Number of Partitions":10,"Number of Cached Partitions":0,"Memory Size":0,"ExternalBlockStore Size":0,"Disk Size":0}],"Parent IDs":[],"Details":"org.apache.spark.rdd.RDD.count(RDD.scala:1121)\ncom.tolon.spark.SlowNode$.main(SlowNode.scala:34)\ncom.tolon.spark.SlowNode.main(SlowNode.scala)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl**.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:497)\norg.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)","Submission Time":1457544477629,"Completion Time":1457544549144,"Accumulables":[]}}
Stage Info: the same StageInfo as in SparkListenerStageSubmitted; on completion it additionally carries Submission Time and Completion Time, as in the example above.
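Because StageInfo carries both timestamps, a listener can compute a stage's wall-clock time directly. A minimal sketch (the listener class is made up; stageId, name, numTasks, submissionTime and completionTime are real StageInfo fields):

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Hypothetical listener: report each stage's duration when it completes.
class StageTimingListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val si = stageCompleted.stageInfo
    // submissionTime/completionTime are Option[Long]; both are set once the stage is done.
    for (start <- si.submissionTime; end <- si.completionTime) {
      println(s"stage ${si.stageId} '${si.name}' (${si.numTasks} tasks) took ${end - start} ms")
    }
  }
}

Register it with sc.addSparkListener, as in the JobEnd sketch above.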
5. SparkListenerTaskStart
Source code:
case class SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)
  extends SparkListenerEvent
"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1457544477628,"Executor ID":"1","Host":"Node1.Bing","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Accumulables":[]}}
Task Info:
class TaskInfo(
    val taskId: Long,
    val index: Int,
    val attemptNumber: Int,
    val launchTime: Long,
    val executorId: String,
    val host: String,
    val taskLocality: TaskLocality.TaskLocality,
    val speculative: Boolean) {

  // The time when the task started remotely getting the result. Not set if the task result was
  // sent immediately when the task finished (as opposed to sending an IndirectTaskResult and
  // later fetching the result from the block manager).
  var gettingResultTime: Long = 0

  // Intermediate updates to accumulables during this task. The same accumulable may be updated
  // multiple times in a single task, and two accumulables with the same name but different IDs
  // may exist in a task.
  val accumulables = ListBuffer[AccumulableInfo]()

  // The time when the task completed successfully (including time to remotely fetch results).
  var finishTime: Long = 0
  var failed = false

  private[spark] def markGettingResult(time: Long = System.currentTimeMillis) { gettingResultTime = time }
  private[spark] def markSuccessful(time: Long = System.currentTimeMillis) { finishTime = time }
  private[spark] def markFailed(time: Long = System.currentTimeMillis) {
    finishTime = time
    failed = true
  }

  def gettingResult: Boolean = gettingResultTime != 0
  def finished: Boolean = finishTime != 0
  def successful: Boolean = finished && !failed
  def running: Boolean = !finished

  def status: String = {
    if (running) { if (gettingResult) "GET RESULT" else "RUNNING" }
    else if (failed) "FAILED"
    else if (successful) "SUCCESS"
    else "UNKNOWN"
  }

  @deprecated("Use attemptNumber", "1.6.0")
  def attempt: Int = attemptNumber

  def id: String = s"$index.$attemptNumber"

  def duration: Long = {
    if (!finished) throw new UnsupportedOperationException("duration() called on unfinished task")
    else finishTime - launchTime
  }

  private[spark] def timeRunning(currentTime: Long): Long = currentTime - launchTime
}
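The derived members above (status, duration) make TaskInfo convenient to consume from a listener. A minimal sketch; the listener class is made up:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical listener: log each finished task using the TaskInfo helpers shown above.
class TaskTimingListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val info = taskEnd.taskInfo
    // status is "SUCCESS"/"FAILED"/..., duration = finishTime - launchTime (task is finished here).
    println(s"task ${info.id} on ${info.host} [${info.taskLocality}]: ${info.status} in ${info.duration} ms")
  }
}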
6. SparkListenerTaskGettingResult
Source code:
@DeveloperApi
case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent
7. SparkListenerJobStart
Source code:
case class SparkListenerJobStart(
    jobId: Int,
    time: Long,
    stageInfos: Seq[StageInfo],
    properties: Properties = null) extends SparkListenerEvent {
  // Note: this is here for backwards-compatibility with older versions of this event which
  // only stored stageIds and not StageInfos:
  val stageIds: Seq[Int] = stageInfos.map(_.stageId)
}
"Event":"SparkListenerJobStart","Job ID":0,"Submission Time":1457544476853,"Stage Infos":[{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"count at SlowNode.scala:34","Number of Tasks":10,"RDD Info":[{"RDD ID":2,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"2\",\"name\":\"map\"}","Parent IDs":[1],"Storage Level":{"Use Disk":false,"Use Memory":false,"Use ExternalBlockStore":false,"Deserialized":false,"Replication":1},"Number of Partitions":10,"Number of Cached Partitions":0,"Memory Size":0,"ExternalBlockStore Size":0,"Disk Size":0},{"RDD ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"1\",\"name\":\"flatMap\"}","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Use ExternalBlockStore":false,"Deserialized":false,"Replication":1},"Number of Partitions":10,"Number of Cached Partitions":0,"Memory Size":0,"ExternalBlockStore Size":0,"Disk Size":0},{"RDD ID":0,"Name":"ParallelCollectionRDD","Scope":"{\"id\":\"0\",\"name\":\"parallelize\"}","Parent IDs":[],"Storage Level":{"Use Disk":false,"Use Memory":false,"Use ExternalBlockStore":false,"Deserialized":false,"Replication":1},"Number of Partitions":10,"Number of Cached Partitions":0,"Memory Size":0,"ExternalBlockStore Size":0,"Disk Size":0}],"Parent IDs":[],"Details":"org.apache.spark.rdd.RDD.count(RDD.scala:1121)\ncom.tolon.spark.SlowNode$.main(SlowNode.scala:34)\ncom.tolon.spark.SlowNode.main(SlowNode.scala)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\nsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\nsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\njava.lang.reflect.Method.invoke(Method.java:497)\norg.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525)","Accumulables":[]}],"Stage IDs":[0],"Properties":{}}
8. SparkListenerJobEnd
Source code:
case class SparkListenerJobEnd(
    jobId: Int,
    time: Long,
    jobResult: JobResult) extends SparkListenerEvent
Example {"Event":"SparkListenerJobEnd","Job ID":0,"Completion Time":1457544549163,"Job Result":{"Result":"JobSucceeded"}}
9. SparkListenerEnvironmentUpdate
Source code:
@DeveloperApi
case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(String, String)]])
  extends SparkListenerEvent
Example:
{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/usr/local/src/jdk1.8.0_65/jre","Java Version":"1.8.0_65 (Oracle Corporation)","Scala Version":"version 2.10.4"},"Spark Properties":{"spark.serializer":"org.apache.spark.serializer.KryoSerializer","spark.executor.extraJavaOptions":"-XX:+PrintGCDetails -Dkey=value -Dnumbers=\"one two three\"","spark.driver.host":"192.168.1.101","spark.eventLog.enabled":"true","spark.ui.port":"0","spark.driver.port":"43786","spark.yarn.app.id":"application_1457516913091_0012","spark.app.name":"My Spark Check App!","spark.scheduler.mode":"FIFO","spark.driver.memory":"1g","spark.executor.instances":"2","spark.executor.id":"driver","spark.yarn.app.attemptId":"1","spark.yarn.app.container.log.dir":"/usr/local/src/hadoop......
10. SparkListenerBlockManagerRemoved
Source code:
@DeveloperApi
case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockManagerId)
  extends SparkListenerEvent
11. SparkListenerUnpersistRDD
Source code:
@DeveloperApi
case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
12. SparkListenerExecutorAdded
Source code:
@DeveloperApi
case class SparkListenerExecutorAdded(time: Long, executorId: String, executorInfo: ExecutorInfo)
  extends SparkListenerEvent
Example: {"Event":"SparkListenerExecutorAdded","Timestamp":1457544475740,"Executor ID":"1","Executor Info":{"Host":"Node1.Bing","Total Cores":1,"Log Urls":{"stdout":"http://Node1.Bing:8042/node/containerlogs/container_1457516913091_0012_01_000002/root/stdout?start=-4096","stderr":"http://Node1.Bing:8042/node/containerlogs/container_1457516913091_0012_01_000002/root/stderr?start=-4096"}}}
13. SparkListenerExecutorRemoved
Source code:
@DeveloperApi
case class SparkListenerExecutorRemoved(time: Long, executorId: String, reason: String)
  extends SparkListenerEvent
14. SparkListenerBlockUpdated
Source code:
@DeveloperApi
case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends SparkListenerEvent
15. SparkListenerExecutorMetricsUpdate
Source code:
/**
 * Periodic updates from executors.
 * @param execId executor id
 * @param taskMetrics sequence of (task id, stage id, stage attempt, metrics)
 */
@DeveloperApi
case class SparkListenerExecutorMetricsUpdate(
    execId: String,
    taskMetrics: Seq[(Long, Int, Int, TaskMetrics)]) extends SparkListenerEvent
16. SparkListenerApplicationStart
Source code:
case class SparkListenerApplicationStart(
    appName: String,
    appId: Option[String],
    time: Long,
    sparkUser: String,
    appAttemptId: Option[String],
    driverLogs: Option[Map[String, String]] = None) extends SparkListenerEvent
Example: {"Event":"SparkListenerApplicationStart","App Name":"My Spark Check App!","App ID":"application_1457516913091_0012","Timestamp":1457544467153,"User":"root","App Attempt ID":"1","Driver Logs":{"stderr":"http://Node1.Bing:8042/node/containerlogs/container_1457516913091_0012_01_000001/root/stderr?start=-4096","stdout":"http://Node1.Bing:8042/node/containerlogs/container_1457516913091_0012_01_000001/root/stdout?start=-4096"}}
17. SparkListenerApplicationEnd
Source code:
@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
Example: {"Event":"SparkListenerApplicationEnd","Timestamp":1457544549187}