Skip to content

Commit

Permalink
Add counter for tasks created on worker
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk authored and wendigo committed Jan 8, 2025
1 parent 509508a commit b27dd3b
Showing 1 changed file with 27 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public class SqlTaskManager

private final long queryMaxMemoryPerNode;

private final CounterStat createdTasks = new CounterStat();
private final CounterStat failedTasks = new CounterStat();
private final Optional<StuckSplitTasksInterrupter> stuckSplitTasksInterrupter;
private final LanguageFunctionProvider languageFunctionProvider;
Expand Down Expand Up @@ -230,22 +231,25 @@ public SqlTaskManager(
queryId -> createQueryContext(queryId, localMemoryManager, localSpillManager, gcMonitor, maxQueryMemoryPerNode, maxQuerySpillPerNode)));

tasks = buildNonEvictableCache(CacheBuilder.newBuilder(), CacheLoader.from(
taskId -> createSqlTask(
taskId,
locationFactory.createLocalTaskLocation(taskId),
nodeInfo.getNodeId(),
queryContexts.getUnchecked(taskId.getQueryId()),
tracer,
sqlTaskExecutionFactory,
taskNotificationExecutor,
sqlTask -> {
languageFunctionProvider.unregisterTask(taskId);
finishedTaskStats.merge(sqlTask.getIoStats());
},
maxBufferSize,
maxBroadcastBufferSize,
requireNonNull(exchangeManagerRegistry, "exchangeManagerRegistry is null"),
failedTasks)));
taskId -> {
createdTasks.update(1);
return createSqlTask(
taskId,
locationFactory.createLocalTaskLocation(taskId),
nodeInfo.getNodeId(),
queryContexts.getUnchecked(taskId.getQueryId()),
tracer,
sqlTaskExecutionFactory,
taskNotificationExecutor,
sqlTask -> {
languageFunctionProvider.unregisterTask(taskId);
finishedTaskStats.merge(sqlTask.getIoStats());
},
maxBufferSize,
maxBroadcastBufferSize,
requireNonNull(exchangeManagerRegistry, "exchangeManagerRegistry is null"),
failedTasks);
}));

stuckSplitTasksInterrupter = createStuckSplitTasksInterrupter(
config.isInterruptStuckSplitTasksEnabled(),
Expand Down Expand Up @@ -355,6 +359,13 @@ public ThreadPoolExecutorMBean getTaskNotificationExecutor()
return taskNotificationExecutorMBean;
}

@Managed(description = "Created tasks counter")
@Nested
public CounterStat getCreatedTasks()
{
return createdTasks;
}

@Managed(description = "Failed tasks counter")
@Nested
public CounterStat getFailedTasks()
Expand Down

0 comments on commit b27dd3b

Please sign in to comment.