-
Notifications
You must be signed in to change notification settings - Fork 344
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CDAP-21096] Remove in-memory launching queue from RunRecordMonitorService #15800
base: develop
Are you sure you want to change the base?
[CDAP-21096] Remove in-memory launching queue from RunRecordMonitorService #15800
Conversation
ef61989
to
00add6d
Compare
Quality Gate passedIssues Measures |
dc068c3
to
0ea30c6
Compare
0ea30c6
to
abcd4c5
Compare
* @return Counter with total number of launching and running program runs. | ||
*/ | ||
public Counter getCount() { | ||
return getFlowControlMetrics(true, true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's not use term metrics as it's pretty strongly associated with metrics service. Use counts.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed with Counter
.
} | ||
|
||
int launchingCount = addRequest(programRunId, programOptions, programDescriptor); | ||
int runningCount = getFlowControlMetrics(false, true).getRunningCount(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there value in having getFlowControlMetrics
method? in addRequestAndGetCount
it still ends up creating 2 separate database transactions. I'd consider moving transaction into addRequestAndGetCount/getCount and removing/splitting getFlowControlMetrics into 2 methods without transaction handling.
As of emitFlowControlMetrics, as far as I can see in all but edge case all metrics are emitted, so I'd just make it emit both metrics universaly.
if (runRecordDetail.getStatus() == ProgramRunStatus.PENDING) { | ||
runRecordMonitorService.addRequest(runRecordDetail.getProgramRunId()); | ||
flowControlMonitorService.addRequest(runRecordDetail.getProgramRunId(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this is needed if the run is already in the database.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is required for emitting flow control metrics.
Replaces addRequest
() with emitFlowControlMetrics()
call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, originally we did it to recreate the in-memory store from database store. Now we read a database store. I don't see a reason to imeddiately write back. Do I miss something?
throws IOException { | ||
long startTs = RunIds.getTime(programRunId.getRun(), TimeUnit.SECONDS); | ||
if (startTs == -1L) { | ||
LOG.error( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would fail it. Since we are in the handler, we can fail and pass it to user. We could not do it in the subscriber
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
Throwing IllegalArgumentException
for this case.
"Ignoring unexpected request to record rejected state for program run {} that has an existing " | ||
+ "run record in run state {} and cluster state {}.", | ||
programRunId, existing.getStatus(), existing.getCluster().getStatus()); | ||
"Ignoring unexpected request to record rejected state for program run {} that has no existing run record.", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why? I don't know in which case it may happen, but I'd still leave a trace. Just skip delete
.
*/ | ||
public int getLaunchingCount(Set<ProgramType> programTypes, @Nullable Integer limit) throws IOException { | ||
AtomicInteger count = new AtomicInteger(0); | ||
try (CloseableIterator<RunRecordDetail> iterator = queryProgramRuns( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is heavy. Can we use io.cdap.cdap.spi.data.StructuredTable#count? If needed, we can even add a field when we write the record to filter efficiently
abcd4c5
to
343101b
Compare
Context
Multiple instances of
RunRecordMonitoringService
cannot run as distributed services as the in-memory cache of the launching queue will result in inconsistencies.Removed the in-memory launching queue from the RunRecordMonitoringService and used
AppMetadataStore
APIs.For more context, see: #15773 (comment).
Note:
RunRecordMonitoringService
is renamed asFlowControlMonitoringService
.Testing