


Published: 2020-01-16 12:01:03

  [Technical Practice] Spark SQL Job Query API


  1. Requirement


  Query the execution history of all Spark SQL statements run by the current user.


  Approach:


  Step 1: Query YARN for the current user's application (name = the Spark Thrift JDBC/ODBC Server, user = current user) and take its tracking URL, which points to the Spark UI or the Spark History Server UI.


  Step 2: Fetch the jobs from that UI's REST API.


  2. Web UI


  YARN app:


  Running app, Tracking UI: http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/


  Finished app, History UI: http://slave5.cluster.local:18081/history/application_1541409376608_0044/jobs/


  The two UIs are essentially the same.


  3. YARN API


  Listing the apps:


  curl 'http://slave5.cluster.local:8088/ws/v1/cluster/apps?user=hdfs&name=Thrift JDBC/ODBC Server'


  The name parameter, however, is not supported. The query parameters YARN accepts are:


  * state [deprecated] - state of the application

  * states - applications matching the given application states, specified as a comma-separated list

  * finalStatus - the final status of the application, as reported by the application itself

  * user - user name

  * queue - queue name

  * limit - total number of app objects to be returned

  * startedTimeBegin - applications with start time beginning with this time, specified in ms since epoch

  * startedTimeEnd - applications with start time ending with this time, specified in ms since epoch

  * finishedTimeBegin - applications with finish time beginning with this time, specified in ms since epoch

  * finishedTimeEnd - applications with finish time ending with this time, specified in ms since epoch

  * applicationTypes - applications matching the given application types, specified as a comma-separated list

  * applicationTags - applications matching any of the given application tags, specified as a comma-separated list

  * deSelects - generic fields to be skipped in the result


  The application type, moreover, is hard-coded in Spark's source (https://github.com/apache/spark/blob/v2.4.0/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala):


  def createApplicationSubmissionContext(
      newApp: YarnClientApplication,
      containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
    val appContext = newApp.getApplicationSubmissionContext
    appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
    appContext.setQueue(sparkConf.get(QUEUE_NAME))
    appContext.setAMContainerSpec(containerContext)
    appContext.setApplicationType("SPARK")


  So the only option is to fetch the list first and then filter it client-side by name = "Thrift JDBC/ODBC Server", as the following sketch shows.
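  A minimal sketch of this client-side filtering (Python, standard library only; the ResourceManager address and target app name are taken from the examples above, and the helper name is ours):

  import json
  import urllib.request

  RM = "http://slave5.cluster.local:8088"      # ResourceManager, per the examples above
  TARGET_NAME = "Thrift JDBC/ODBC Server"      # app name of the Spark Thrift Server

  def list_user_thrift_apps(user):
      # YARN supports filtering by user and applicationTypes, but not by name,
      # so the name filter runs locally on the returned list.
      url = f"{RM}/ws/v1/cluster/apps?user={user}&applicationTypes=SPARK"
      with urllib.request.urlopen(url) as resp:
          apps = json.load(resp).get("apps") or {}
      return [a for a in apps.get("app", []) if a.get("name") == TARGET_NAME]

  for app in list_user_thrift_apps("hdfs"):
      # trackingUrl points at the Spark UI (running) or the history UI (finished)
      print(app["id"], app["state"], app["trackingUrl"])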


  An app list returned by the query (XML):


  


  


  <apps>
    <app>
      <id>application_1541409376608_0041</id>
      <user>hdfs</user>
      <name>livy-session-16</name>
      <queue>default</queue>
      <state>FINISHED</state>
      <finalStatus>SUCCEEDED</finalStatus>
      <progress>100.0</progress>
      <trackingUI>History</trackingUI>
      <trackingUrl>http://slave5.cluster.local:8088/proxy/application_1541409376608_0041/</trackingUrl>
      <diagnostics/>
      <clusterId>1541409376608</clusterId>
      <applicationType>SPARK</applicationType>
      <applicationTags>livy-session-16-rslkzght</applicationTags>
      <priority>0</priority>
      <startedTime>1543111595930</startedTime>
      <finishedTime>1543112016116</finishedTime>
      <elapsedTime>420186</elapsedTime>
      <amContainerLogs>http://slave2:8042/node/containerlogs/container_e82_1541409376608_0041_01_000001/hdfs</amContainerLogs>
      <amHostHttpAddress>slave2:8042</amHostHttpAddress>
      <amRPCAddress>10.221.129.26:0</amRPCAddress>
      <masterNodeId>slave2:45454</masterNodeId>
      <allocatedMB>-1</allocatedMB>
      <allocatedVCores>-1</allocatedVCores>
      <reservedMB>-1</reservedMB>
      <reservedVCores>-1</reservedVCores>
      <runningContainers>-1</runningContainers>
      <memorySeconds>2442054</memorySeconds>
      <vcoreSeconds>1190</vcoreSeconds>
      <queueUsagePercentage>0.0</queueUsagePercentage>
      <clusterUsagePercentage>0.0</clusterUsagePercentage>
      <resourceSecondsMap>
        <entry>
          <key>memory-mb</key>
          <value>2442054</value>
        </entry>
        <entry>
          <key>vcores</key>
          <value>1190</value>
        </entry>
      </resourceSecondsMap>
      <preemptedResourceMB>0</preemptedResourceMB>
      <preemptedResourceVCores>0</preemptedResourceVCores>
      <numNonAMContainerPreempted>0</numNonAMContainerPreempted>
      <numAMContainerPreempted>0</numAMContainerPreempted>
      <preemptedMemorySeconds>0</preemptedMemorySeconds>
      <preemptedVcoreSeconds>0</preemptedVcoreSeconds>
      <logAggregationStatus>SUCCEEDED</logAggregationStatus>
      <unmanagedApplication>false</unmanagedApplication>
      <timeouts>
        <timeout>
          <type>LIFETIME</type>
          <expiryTime>UNLIMITED</expiryTime>
          <remainingTimeInSeconds>-1</remainingTimeInSeconds>
        </timeout>
      </timeouts>
    </app>
  </apps>


  


  


  4. Spark API


  From the YARN app, take the trackingUrl attribute and query the Spark SQL jobs through it.


  4.1 Viewing application info


  Applications known to the history server:


  curl http://slave5.cluster.local:18081/api/v1/applications

  [ {
    "id" : "local-1543304247490",
    "name" : "Thrift JDBC/ODBC Server",
    "attempts" : [ {
      "startTime" : "2018-11-27T07:37:25.240GMT",
      "endTime" : "2018-11-27T07:37:30.094GMT",
      "lastUpdated" : "2018-11-27T07:37:30.886GMT",
      "duration" : 4854,
      "sparkUser" : "hdfs",
      "completed" : true,
      "appSparkVersion" : "2.3.2",
      "startTimeEpoch" : 1543304245240,
      "endTimeEpoch" : 1543304250094,
      "lastUpdatedEpoch" : 1543304250886
    } ]
  } ]


  The same query for the currently running app, through the YARN proxy:


  curl http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/api/v1/applications

  [ {
    "id" : "application_1541409376608_0046",
    "name" : "Thrift JDBC/ODBC Server",
    "attempts" : [ {
      "startTime" : "2018-11-27T07:40:28.712GMT",
      "endTime" : "1969-12-31T23:59:59.999GMT",
      "lastUpdated" : "2018-11-27T07:40:28.712GMT",
      "duration" : 0,
      "sparkUser" : "spark",
      "completed" : false,
      "appSparkVersion" : "2.3.1.3.0.0.0-1634",
      "lastUpdatedEpoch" : 1543304428712,
      "startTimeEpoch" : 1543304428712,
      "endTimeEpoch" : -1
    } ]
  } ]


  Note: we access the Spark UI's REST API through the YARN proxy port, which hides the backend Spark UI address from callers.
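  For illustration, a small helper that builds the REST base URL, going through the proxy for running apps and through the history server for finished ones (a sketch; the hosts and ports are the ones from the examples above, and the function name is ours):

  def spark_api_base(app_id, running,
                     rm="http://slave5.cluster.local:8088",
                     history="http://slave5.cluster.local:18081"):
      # Running apps are reached via the ResourceManager proxy, so the backend
      # Spark UI address stays hidden; finished apps are served by the Spark
      # History Server's own REST endpoint.
      if running:
          return f"{rm}/proxy/{app_id}/api/v1"
      return f"{history}/api/v1"

  # e.g. spark_api_base("application_1541409376608_0046", running=True)
  #      + "/applications"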


  4.2 Viewing SQL jobs


  The job list:


  curl http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/api/v1/applications/application_1541409376608_0046/jobs

  [ {
    "jobId" : 0,
    "name" : "run at AccessController.java:0",
    "submissionTime" : "2018-11-27T07:41:50.904GMT",
    "completionTime" : "2018-11-27T07:42:31.746GMT",
    "stageIds" : [ 0 ],
    "jobGroup" : "afd3e9b1-8810-4df8-a25c-12f0fd5e6163",
    "status" : "SUCCEEDED",
    "numTasks" : 2,
    "numActiveTasks" : 0,
    "numCompletedTasks" : 2,
    "numSkippedTasks" : 0,
    "numFailedTasks" : 0,
    "numKilledTasks" : 0,
    "numCompletedIndices" : 2,
    "numActiveStages" : 0,
    "numCompletedStages" : 1,
    "numSkippedStages" : 0,
    "numFailedStages" : 0,
    "killedTasksSummary" : { }
  } ]
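  If only certain jobs are of interest, the jobs endpoint also accepts a status filter (per Spark's monitoring REST API docs: running, succeeded, failed, or unknown). A quick sketch:

  import json
  import urllib.request

  app_id = "application_1541409376608_0046"
  base = f"http://slave5.cluster.local:8088/proxy/{app_id}/api/v1"

  # List only the succeeded jobs; jobGroup ties each job back to the
  # statement submitted through the Thrift server.
  with urllib.request.urlopen(f"{base}/applications/{app_id}/jobs?status=succeeded") as resp:
      for job in json.load(resp):
          print(job["jobId"], job.get("jobGroup"), job["submissionTime"])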


  A single job:


  curl http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/api/v1/applications/application_1541409376608_0046/jobs/0


  {
    "jobId" : 0,
    "name" : "run at AccessController.java:0",
    "submissionTime" : "2018-11-27T07:41:50.904GMT",
    "completionTime" : "2018-11-27T07:42:31.746GMT",
    "stageIds" : [ 0 ],
    "jobGroup" : "afd3e9b1-8810-4df8-a25c-12f0fd5e6163",
    "status" : "SUCCEEDED",
    "numTasks" : 2,
    "numActiveTasks" : 0,
    "numCompletedTasks" : 2,
    "numSkippedTasks" : 0,
    "numFailedTasks" : 0,
    "numKilledTasks" : 0,
    "numCompletedIndices" : 2,
    "numActiveStages" : 0,
    "numCompletedStages" : 1,
    "numSkippedStages" : 0,
    "numFailedStages" : 0,
    "killedTasksSummary" : { }
  }


  The job response contains neither the executed SQL text nor its execution time, so the stage data referenced by stageIds has to be queried as well. A job may contain multiple stages, and their execution times need to be summed (a sketch follows the sample output below):


  curl http://slave5.cluster.local:8088/proxy/application_1541409376608_0046/api/v1/applications/application_1541409376608_0046/stages/0


  [ {
    "status" : "COMPLETE",
    "stageId" : 0,
    "attemptId" : 0,
    "numTasks" : 2,
    "numActiveTasks" : 0,
    "numCompleteTasks" : 2,
    "numFailedTasks" : 0,
    "numKilledTasks" : 0,
    "numCompletedIndices" : 2,
    "executorRunTime" : 23209,
    "executorCpuTime" : 3730752861,
    "submissionTime" : "2018-11-27T07:41:51.030GMT",
    "firstTaskLaunchedTime" : "2018-11-27T07:42:07.414GMT",
    "completionTime" : "2018-11-27T07:42:31.741GMT",
    "inputBytes" : 8718,
    "inputRecords" : 500,
    "outputBytes" : 0,
    "outputRecords" : 0,
    "shuffleReadBytes" : 0,
    "shuffleReadRecords" : 0,
    "shuffleWriteBytes" : 0,
    "shuffleWriteRecords" : 0,
    "memoryBytesSpilled" : 0,
    "diskBytesSpilled" : 0,
    "name" : "run at AccessController.java:0",
    "description" : "select * from spokes",
    "details" : "org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1$$anon$2.run(SparkExecuteStatementOperation.scala:171) java.security.AccessController.doPrivileged(Native Method) javax.security.auth.Subject.doAs(Subject.java:422) org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1688) org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation$$anon$1.run(SparkExecuteStatementOperation.scala:185) java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) java.util.concurrent.FutureTask.run(FutureTask.java:266) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745)",
    "schedulingPool" : "default",
    "rddIds" : [ 4, 3, 2, 0, 1 ],
    "accumulatorUpdates" : [ ],
    "tasks" : {
      "0" : {
        "taskId" : 0,
        "index" : 0,
        "attempt" : 0,
        "launchTime" : "2018-11-27T07:42:07.414GMT",
        "duration" : 23071,
        "executorId" : "1",
        "host" : "slave3",
        "status" : "SUCCESS",
        "taskLocality" : "NODE_LOCAL",
        "speculative" : false,
        "accumulatorUpdates" : [ ],
        "taskMetrics" : {
          "executorDeserializeTime" : 4366,
          "executorDeserializeCpuTime" : 426890376,
          "executorRunTime" : 16589,
          "executorCpuTime" : 1993215589,
          "resultSize" : 3787,
          "jvmGcTime" : 1626,
          "resultSerializationTime" : 3,
          "memoryBytesSpilled" : 0,
          "diskBytesSpilled" : 0,
          "peakExecutionMemory" : 0,
          "inputMetrics" : {
            "bytesRead" : 5812,
            "recordsRead" : 251
          },
          "outputMetrics" : {
            "bytesWritten" : 0,
            "recordsWritten" : 0
          },
          "shuffleReadMetrics" : {
            "remoteBlocksFetched" : 0,
            "localBlocksFetched" : 0,
            "fetchWaitTime" : 0,
            "remoteBytesRead" : 0,
            "remoteBytesReadToDisk" : 0,
            "localBytesRead" : 0,
            "recordsRead" : 0
          },
          "shuffleWriteMetrics" : {
            "bytesWritten" : 0,
            "writeTime" : 0,
            "recordsWritten" : 0
          }
        }
      },
      "1" : {
        "taskId" : 1,
        "index" : 1,
        "attempt" : 0,
        "launchTime" : "2018-11-27T07:42:22.386GMT",
        "duration" : 9350,
        "executorId" : "2",
        "host" : "slave3",
        "status" : "SUCCESS",
        "taskLocality" : "NODE_LOCAL",
        "speculative" : false,
        "accumulatorUpdates" : [ ],
        "taskMetrics" : {
          "executorDeserializeTime" : 1449,
          "executorDeserializeCpuTime" : 369738428,
          "executorRunTime" : 6620,
          "executorCpuTime" : 1737537272,
          "resultSize" : 3770,
          "jvmGcTime" : 1133,
          "resultSerializationTime" : 1,
          "memoryBytesSpilled" : 0,
          "diskBytesSpilled" : 0,
          "peakExecutionMemory" : 0,
          "inputMetrics" : {
            "bytesRead" : 2906,
            "recordsRead" : 249
          },
          "outputMetrics" : {
            "bytesWritten" : 0,
            "recordsWritten" : 0
          },
          "shuffleReadMetrics" : {
            "remoteBlocksFetched" : 0,
            "localBlocksFetched" : 0,
            "fetchWaitTime" : 0,
            "remoteBytesRead" : 0,
            "remoteBytesReadToDisk" : 0,
            "localBytesRead" : 0,
            "recordsRead" : 0
          },
          "shuffleWriteMetrics" : {
            "bytesWritten" : 0,
            "writeTime" : 0,
            "recordsWritten" : 0
          }
        }
      }
    },
    "executorSummary" : {
      "1" : {
        "taskTime" : 23071,
        "failedTasks" : 0,
        "succeededTasks" : 1,
        "killedTasks" : 0,
        "inputBytes" : 5812,
        "inputRecords" : 251,
        "outputBytes" : 0,
        "outputRecords" : 0,
        "shuffleRead" : 0,
        "shuffleReadRecords" : 0,
        "shuffleWrite" : 0,
        "shuffleWriteRecords" : 0,
        "memoryBytesSpilled" : 0,
        "diskBytesSpilled" : 0
      },
      "2" : {
        "taskTime" : 9350,
        "failedTasks" : 0,
        "succeededTasks" : 1,
        "killedTasks" : 0,
        "inputBytes" : 2906,
        "inputRecords" : 249,
        "outputBytes" : 0,
        "outputRecords" : 0,
        "shuffleRead" : 0,
        "shuffleReadRecords" : 0,
        "shuffleWrite" : 0,
        "shuffleWriteRecords" : 0,
        "memoryBytesSpilled" : 0,
        "diskBytesSpilled" : 0
      }
    },
    "killedTasksSummary" : { }
  } ]
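  Putting it together: a minimal sketch that, for one job, fetches every stage listed in stageIds, sums their executorRunTime, and collects the executed SQL from the stage description field (Python, standard library; base is a REST base URL as built above, and the helper names are ours):

  import json
  import urllib.request

  def fetch(url):
      with urllib.request.urlopen(url) as resp:
          return json.load(resp)

  def summarize_job(base, app_id, job_id):
      # base: e.g. "http://slave5.cluster.local:8088/proxy/<app-id>/api/v1"
      job = fetch(f"{base}/applications/{app_id}/jobs/{job_id}")
      total_run_time_ms = 0
      sql_texts = set()
      for stage_id in job["stageIds"]:
          # The stages endpoint returns a list with one element per attempt.
          for attempt in fetch(f"{base}/applications/{app_id}/stages/{stage_id}"):
              total_run_time_ms += attempt.get("executorRunTime", 0)
              if attempt.get("description"):       # the submitted SQL statement
                  sql_texts.add(attempt["description"])
      return {
          "jobId": job["jobId"],
          "status": job["status"],
          "submissionTime": job.get("submissionTime"),
          "completionTime": job.get("completionTime"),
          "executorRunTimeMs": total_run_time_ms,
          "sql": sorted(sql_texts),
      }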


Author: 贾德星


Title: Senior Architect, Cloud Computing Product Center, Cloud Services Group


Specialty: Big data


About the author: A system software architect with more than twenty years of hands-on software development experience. He led the development of Kingsoft's big data platform product InsightHD and focuses on the research, application, and development of big data components such as Hadoop, Spark, stream computing, machine learning, and deep learning. He has participated in drafting more than twenty national information technology standards, twelve of which have been officially published, and holds nine granted national patents.
