From 60ccb63722b58fc5cef7fd2d913236f9af1345bd Mon Sep 17 00:00:00 2001 From: v-kkhuang <420895376@qq.com> Date: Fri, 28 Nov 2025 15:41:42 +0800 Subject: [PATCH] Fix and update metrics information exception bug --- .../engine/DefaultEngineCreateService.scala | 35 +++++++++++------ .../engine/DefaultEngineReuseService.scala | 38 ++++++++++++------- 2 files changed, 49 insertions(+), 24 deletions(-) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala index 27a961b852..ac05d4e96b 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala @@ -289,17 +289,30 @@ class DefaultEngineCreateService s"Failed to update engineNode: ${t.getMessage}" ) } - if (Configuration.METRICS_INCREMENTAL_UPDATE_ENABLE.getValue) { - val emInstance = engineNode.getServiceInstance.getInstance - val ecmInstance = engineNode.getEMNode.getServiceInstance.getInstance - // 8. Update job history metrics after successful engine creation - 异步执行 - AMUtils.updateMetricsAsync( - taskId, - resourceTicketId, - emInstance, - ecmInstance, - null, - isReuse = false + Utils.tryCatch { + if (Configuration.METRICS_INCREMENTAL_UPDATE_ENABLE.getValue) { + val emInstance = engineNode.getServiceInstance.getInstance + val ecmInstance = engineNode.getEMNode.getServiceInstance.getInstance + if ((null != emInstance) && (null != ecmInstance)) { + // 8. Update job history metrics after successful engine creation - 异步执行 + AMUtils.updateMetricsAsync( + taskId, + resourceTicketId, + emInstance, + ecmInstance, + null, + isReuse = false + ) + } else { + logger.info( + s"CreateEngine:Failed to update metrics for emInstance: $emInstance, ecmInstance: $ecmInstance" + ) + } + } + } { case e: Exception => + logger.error( + s"Failed to update metrics for taskId: $taskId", + e ) } // 9. Add the Label of EngineConn, and add the Alias of engineConn diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala index fb79c9e062..eca36c9b41 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineReuseService.scala @@ -383,19 +383,31 @@ class DefaultEngineReuseService extends AbstractEngineService with EngineReuseSe .toJson(engine) + " from engineLabelMap : " + AMUtils.GSON.toJson(instances) ) } - if (Configuration.METRICS_INCREMENTAL_UPDATE_ENABLE.getValue) { - val engineNode = - ecResourceInfoService.getECResourceInfoRecordByInstance( - engine.getServiceInstance.getInstance - ) - // 异步更新 metrics - AMUtils.updateMetricsAsync( - taskId, - engineNode.getTicketId, - engineNode.getServiceInstance, - engineNode.getEcmInstance, - engineNode.getLogDirSuffix, - isReuse = true + Utils.tryCatch { + if (Configuration.METRICS_INCREMENTAL_UPDATE_ENABLE.getValue) { + val engineNode = + ecResourceInfoService.getECResourceInfoRecordByInstance( + engine.getServiceInstance.getInstance + ) + if (null != engineNode) { + // 异步更新 metrics + AMUtils.updateMetricsAsync( + taskId, + engineNode.getTicketId, + engineNode.getServiceInstance, + engineNode.getEcmInstance, + engineNode.getLogDirSuffix, + isReuse = true + ) + } else { + logger.info(s"ReuseEngine:Failed to update metrics for engineNode: $engineNode") + } + + } + } { case e: Exception => + logger.error( + s"Failed to update metrics for taskId: $taskId", + e ) } engine