Spark SQL Job Query API
1. Requirement
Query the execution history of all Spark SQL statements run by the current user.
Technical approach:
Step 1: From the YARN apps, find the current user's app (name = "Thrift JDBC/ODBC Server", user = current user) and take its tracking URL, which points to the Spark UI or the Spark History UI.
Step 2: Fetch the jobs from that UI's REST API.
2. Web UI
YARN apps:
Running app, via the Tracking UI: http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/
Finished app, via the history server: http://slave5.cluster.local:18081/history/application_1541409376608_0044/jobs/
The two UIs are essentially the same.
3. YARN API
Query apps:
curl "http://slave5.cluster.local:8088/ws/v1/cluster/apps?user=hdfs&name=Thrift JDBC/ODBC Server"
The name parameter is not supported; the query parameters YARN does support are:
* state [deprecated] - state of the application
* states - applications matching the given application states, specified as a comma-separated list
* finalStatus - the final status of the application, reported by the application itself
* user - user name
* queue - queue name
* limit - total number of app objects to be returned
* startedTimeBegin - applications with start time beginning with this time, specified in ms since epoch
* startedTimeEnd - applications with start time ending with this time, specified in ms since epoch
* finishedTimeBegin - applications with finish time beginning with this time, specified in ms since epoch
* finishedTimeEnd - applications with finish time ending with this time, specified in ms since epoch
* applicationTypes - applications matching the given application types, specified as a comma-separated list
* applicationTags - applications matching any of the given application tags, specified as a comma-separated list
* deSelects - generic fields which will be skipped in the result
The application type, meanwhile, is hard-coded in Spark's source (https://github.com/apache/spark/blob/v2.4.0/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala):
def createApplicationSubmissionContext(
    newApp: YarnClientApplication,
    containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
  val appContext = newApp.getApplicationSubmissionContext
  appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
  appContext.setQueue(sparkConf.get(QUEUE_NAME))
  appContext.setAMContainerSpec(containerContext)
  appContext.setApplicationType("SPARK")
So the only option is to fetch the list first and then filter it by name = "Thrift JDBC/ODBC Server" on the client side.
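Since YARN will not accept name as a query parameter, the client has to do the filtering itself. A minimal sketch in Python; the sample payload below is abbreviated and hand-built to match the shape of YARN's documented apps response, and `thrift_server_apps` is a hypothetical helper name, not part of any API:

```python
import json

# Abbreviated, hand-built sample of a YARN ResourceManager response
# (GET /ws/v1/cluster/apps?user=hdfs); field names follow the YARN REST API.
response = json.loads("""
{"apps": {"app": [
  {"id": "application_1541409376608_0041", "name": "livy-session-16-rslkzght",
   "user": "hdfs", "applicationType": "SPARK",
   "trackingUrl": "http://slave5.cluster.local:8088/proxy/application_1541409376608_0041/"},
  {"id": "application_1541409376608_0046", "name": "Thrift JDBC/ODBC Server",
   "user": "hdfs", "applicationType": "SPARK",
   "trackingUrl": "http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/"}
]}}
""")

def thrift_server_apps(payload, user):
    """Client-side filter: keep the given user's Thrift JDBC/ODBC Server apps."""
    apps = payload.get("apps") or {}
    return [a for a in (apps.get("app") or [])
            if a["user"] == user and a["name"] == "Thrift JDBC/ODBC Server"]

matches = thrift_server_apps(response, "hdfs")
print(matches[0]["trackingUrl"])
```

The trackingUrl taken from each match is what step 2 of the approach uses to reach the Spark REST API.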
An entry from the returned app list (the original table was flattened by extraction; only the recoverable fields are kept): progress 100.0, tracking UI "History" at http://slave5.cluster.local:8088/proxy/application_1541409376608_0041/, applicationType SPARK, name livy-session-16-rslkzght, container logs at http://slave2:8042/node/containerlogs/container_e82_1541409376608_0041_01_000001/hdfs, node slave2:8042, address 10.221.129.26:0.
4. Spark API
Take the trackingUrl attribute from the YARN app and use it to query the Spark SQL jobs.
4.1 Querying app info
Finished apps, from the history server:
curl http://slave5.cluster.local:18081/api/v1/applications
[
{
"id":"local-1543304247490",
"name":"Thrift JDBC/ODBC Server",
"attempts": [
{
"startTime":"2018-11-27T07:37:25.240GMT",
"endTime":"2018-11-27T07:37:30.094GMT",
"lastUpdated":"2018-11-27T07:37:30.886GMT",
"duration": 4854,
"sparkUser":"hdfs",
"completed":true,
"appSparkVersion":"2.3.2",
"startTimeEpoch":1543304245240,
"endTimeEpoch":1543304250094,
"lastUpdatedEpoch":1543304250886
}
]
  }
]
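Per the requirement in section 1, this list can be narrowed to the current user's entries via the sparkUser field of each attempt. A small sketch, using (abbreviated) data from the response above; the helper name `apps_of_user` is mine, not part of the Spark API:

```python
import json

# Abbreviated form of the history-server /api/v1/applications response above.
applications = json.loads("""
[{"id": "local-1543304247490", "name": "Thrift JDBC/ODBC Server",
  "attempts": [{"startTime": "2018-11-27T07:37:25.240GMT",
                "endTime": "2018-11-27T07:37:30.094GMT",
                "duration": 4854, "sparkUser": "hdfs", "completed": true}]}]
""")

def apps_of_user(apps, user):
    """Keep applications whose most recent attempt ran as the given user."""
    return [a for a in apps if a["attempts"] and a["attempts"][0]["sparkUser"] == user]

for app in apps_of_user(applications, "hdfs"):
    print(app["id"], app["attempts"][0]["duration"])
```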
The currently running app, via the YARN proxy:
curl http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/api/v1/applications
[ {
"id" :"application_1541409376608_0046",
"name" :"Thrift JDBC/ODBC Server",
"attempts" : [ {
"startTime" :"2018-11-27T07:40:28.712GMT",
"endTime" :"1969-12-31T23:59:59.999GMT",
"lastUpdated" :"2018-11-27T07:40:28.712GMT",
"duration" : 0,
"sparkUser" :"spark",
"completed" :false,
"appSparkVersion": "2.3.1.3.0.0.0-1634",
"lastUpdatedEpoch": 1543304428712,
"startTimeEpoch": 1543304428712,
"endTimeEpoch" :-1
} ]
} ]
Note: we go through the YARN proxy port to reach the Spark UI's REST API, which hides the backend Spark UI address from clients.
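This URL pattern can be captured in a small helper that derives REST endpoints from the YARN trackingUrl; the function name and its normalization are mine, not something Spark or YARN provide:

```python
def spark_rest_url(tracking_url, app_id, *parts):
    """Build a Spark REST API URL behind the YARN proxy, e.g.
    <trackingUrl>/api/v1/applications/<appId>/jobs."""
    base = tracking_url.rstrip("/")
    return "/".join([base, "api/v1/applications", app_id, *parts])

url = spark_rest_url(
    "http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/",
    "application_1541409376608_0046", "jobs")
print(url)
```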
4.2 Querying SQL jobs
Job list:
curl http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/api/v1/applications/application_1541409376608_0046/jobs
[ {
"jobId" : 0,
"name" :"run at AccessController.java:0",
"submissionTime": "2018-11-27T07:41:50.904GMT",
"completionTime": "2018-11-27T07:42:31.746GMT",
"stageIds" : [ 0],
"jobGroup" :"afd3e9b1-8810-4df8-a25c-12f0fd5e6163",
"status" :"SUCCEEDED",
"numTasks" : 2,
"numActiveTasks": 0,
"numCompletedTasks": 2,
"numSkippedTasks": 0,
"numFailedTasks": 0,
"numKilledTasks": 0,
"numCompletedIndices": 2,
"numActiveStages": 0,
"numCompletedStages": 1,
"numSkippedStages": 0,
"numFailedStages": 0,
"killedTasksSummary": { }
} ]
A single job:
curl http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/api/v1/applications/application_1541409376608_0046/jobs/0
{
"jobId" : 0,
"name" :"run at AccessController.java:0",
"submissionTime": "2018-11-27T07:41:50.904GMT",
"completionTime": "2018-11-27T07:42:31.746GMT",
"stageIds" : [ 0],
"jobGroup" :"afd3e9b1-8810-4df8-a25c-12f0fd5e6163",
"status" :"SUCCEEDED",
"numTasks" : 2,
"numActiveTasks": 0,
"numCompletedTasks": 2,
"numSkippedTasks": 0,
"numFailedTasks": 0,
"numKilledTasks": 0,
"numCompletedIndices": 2,
"numActiveStages": 0,
"numCompletedStages": 1,
"numSkippedStages": 0,
"numFailedStages": 0,
"killedTasksSummary": { }
}
The job response contains neither the executed SQL text nor detailed timing, so you have to drill down into the stage data for each entry in stageIds. A job may contain multiple stages, and their execution times need to be aggregated:
curl http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/api/v1/applications/application_1541409376608_0046/stages/0
[
{
"status":"COMPLETE",
"stageId": 0,
"attemptId": 0,
"numTasks": 2,
"numActiveTasks":0,
"numCompleteTasks":2,
"numFailedTasks":0,
"numKilledTasks":0,
"numCompletedIndices":2,
"executorRunTime":23209,
"executorCpuTime":3730752861,
"submissionTime":"2018-11-27T07:41:51.030GMT",
"firstTaskLaunchedTime":"2018-11-27T07:42:07.414GMT",
"completionTime":"2018-11-27T07:42:31.741GMT",
"inputBytes":8718,
"inputRecords":500,
"outputBytes": 0,
"outputRecords":0,
"shuffleReadBytes":0,
"shuffleReadRecords":0,
"shuffleWriteBytes":0,
"shuffleWriteRecords":0,
"memoryBytesSpilled":0,
"diskBytesSpilled":0,
"name": "runat AccessController.java:0",
"description":"select * from spokes",
"details": "org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:171) java.security.AccessController.doPrivileged(NativeMethod) javax.security.auth.Subject.doAs(Subject.java:422) org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1688) org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1.run(SparkExecuteStatementOperation.scala:185) java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) java.util.concurrent.FutureTask.run(FutureTask.java:266) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745)",
"schedulingPool":"default",
"rddIds": [
4,
3,
2,
0,
1
],
"accumulatorUpdates":[
],
"tasks": {
"0": {
"taskId": 0,
"index": 0,
"attempt": 0,
"launchTime":"2018-11-27T07:42:07.414GMT",
"duration": 23071,
"executorId":"1",
"host":"slave3",
"status":"SUCCESS",
"taskLocality":"NODE_LOCAL",
"speculative":false,
"accumulatorUpdates":[
],
"taskMetrics": {
"executorDeserializeTime":4366,
"executorDeserializeCpuTime":426890376,
"executorRunTime":16589,
"executorCpuTime":1993215589,
"resultSize":3787,
"jvmGcTime":1626,
"resultSerializationTime":3,
"memoryBytesSpilled":0,
"diskBytesSpilled":0,
"peakExecutionMemory":0,
"inputMetrics": {
"bytesRead":5812,
"recordsRead":251
},
"outputMetrics":{
"bytesWritten":0,
"recordsWritten":0
},
"shuffleReadMetrics":{
"remoteBlocksFetched":0,
"localBlocksFetched":0,
"fetchWaitTime":0,
"remoteBytesRead":0,
"remoteBytesReadToDisk":0,
"localBytesRead":0,
"recordsRead": 0
},
"shuffleWriteMetrics":{
"bytesWritten":0,
"writeTime": 0,
"recordsWritten":0
}
}
},
"1": {
"taskId": 1,
"index": 1,
"attempt": 0,
"launchTime":"2018-11-27T07:42:22.386GMT",
"duration": 9350,
"executorId":"2",
"host":"slave3",
"status":"SUCCESS",
"taskLocality":"NODE_LOCAL",
"speculative":false,
"accumulatorUpdates":[
],
"taskMetrics": {
"executorDeserializeTime":1449,
"executorDeserializeCpuTime":369738428,
"executorRunTime":6620,
"executorCpuTime":1737537272,
"resultSize":3770,
"jvmGcTime":1133,
"resultSerializationTime":1,
"memoryBytesSpilled":0,
"diskBytesSpilled":0,
"peakExecutionMemory":0,
"inputMetrics": {
"bytesRead":2906,
"recordsRead":249
},
"outputMetrics":{
"bytesWritten":0,
"recordsWritten":0
},
"shuffleReadMetrics":{
"remoteBlocksFetched":0,
"localBlocksFetched":0,
"fetchWaitTime":0,
"remoteBytesRead":0,
"remoteBytesReadToDisk":0,
"localBytesRead":0,
"recordsRead": 0
},
"shuffleWriteMetrics":{
"bytesWritten":0,
"writeTime": 0,
"recordsWritten":0
}
}
}
},
"executorSummary":{
"1": {
"taskTime":23071,
"failedTasks": 0,
"succeededTasks":1,
"killedTasks": 0,
"inputBytes":5812,
"inputRecords":251,
"outputBytes": 0,
"outputRecords":0,
"shuffleRead": 0,
"shuffleReadRecords":0,
"shuffleWrite":0,
"shuffleWriteRecords":0,
"memoryBytesSpilled":0,
"diskBytesSpilled":0
},
"2": {
"taskTime": 9350,
"failedTasks": 0,
"succeededTasks":1,
"killedTasks": 0,
"inputBytes":2906,
"inputRecords":249,
"outputBytes": 0,
"outputRecords":0,
"shuffleRead": 0,
"shuffleReadRecords":0,
"shuffleWrite":0,
"shuffleWriteRecords":0,
"memoryBytesSpilled":0,
"diskBytesSpilled":0
}
},
"killedTasksSummary":{
}
}
]
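Putting section 4.2 together: for each job, fetch every stage in stageIds, take the SQL text from the stage's description field, and aggregate the timing across stages. A sketch using stub data in place of the HTTP calls (field names match the responses above; summing executorRunTime and wall-clock spans per stage is one reasonable aggregation choice, not something the API mandates, and `summarize_job` is a hypothetical helper):

```python
from datetime import datetime, timedelta

# Stub stage data keyed by stageId, shaped like the /stages/<id> response above.
stages = {
    0: [{"status": "COMPLETE", "stageId": 0,
         "submissionTime": "2018-11-27T07:41:51.030GMT",
         "completionTime": "2018-11-27T07:42:31.741GMT",
         "executorRunTime": 23209,
         "description": "select * from spokes"}],
}

def _ts(s):
    # Spark's REST API emits timestamps like "2018-11-27T07:41:51.030GMT".
    return datetime.strptime(s.replace("GMT", "+0000"), "%Y-%m-%dT%H:%M:%S.%f%z")

def summarize_job(job):
    """Collect the SQL text and aggregate run time of a job's stages."""
    attempts = [a for sid in job["stageIds"] for a in stages.get(sid, [])]
    sql = next((a["description"] for a in attempts if a.get("description")), None)
    run_ms = sum(a["executorRunTime"] for a in attempts)
    wall_ms = sum(
        (_ts(a["completionTime"]) - _ts(a["submissionTime"])) // timedelta(milliseconds=1)
        for a in attempts if "completionTime" in a)
    return {"sql": sql, "executorRunTimeMs": run_ms, "wallClockMs": wall_ms}

# A job record as returned by /jobs above (abbreviated).
job = {"jobId": 0, "stageIds": [0]}
print(summarize_job(job))
```

In a real client, the stub `stages` lookup would be replaced by a GET against the /stages/<stageId> endpoint shown earlier, behind the same YARN proxy.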
Author: Jia Dexing
Title: Senior Architect, Cloud Computing Product Center, Cloud Services Group
Field: Big data
About the author: System software architect with more than twenty years of hands-on software development experience. Led the development of Kingsoft's big-data platform product Yunhai InsightHD, focusing on the research, application, and development of Hadoop, Spark, stream computing, machine learning, and deep learning components. Participated in drafting more than twenty national information-technology standards, twelve of which have been officially published, and holds nine granted national patents.