Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce mixed language module linkismange #4463 #4509

Conversation

CharlieYan24
Copy link
Contributor

What is the purpose of the change

Reduce mixlanguage module linkismange. Translate linkis-application-manager and linkis-manager-common module from Scala to Java.

Related issues/PRs

Related issues: #4463
Related pr: #4463

Brief change log

  • Translate linkis-application-manager and linkis-manager-common module from Scala to Java.
  • Test execution of Shell and Spark engine Job after translate .

Checklist

  • I have read the Contributing Guidelines on pull requests.
  • I have explained the need for this PR and the problem it solves
  • I have explained the changes or the new features added to this PR
  • I have added tests corresponding to this change
  • I have updated the documentation to reflect this change
  • I have verified that this change is backward compatible (If not, please discuss on the Linkis mailing list first)
  • If this is a code change: I have written unit tests to fully verify the new behavior.

@CharlieYan24 CharlieYan24 changed the base branch from master to dev-1.4.0 April 30, 2023 08:36
@CharlieYan24 CharlieYan24 changed the title Dev 1.4.0 reduce mixlanguage module linkismange Reduce mixed language module linkismange #4463 Apr 30, 2023
@CharlieYan24
Copy link
Contributor Author

Test result of Spark sql engine job is following, job launch log :
image

Job result:
image

@CharlieYan24
Copy link
Contributor Author

Test result of Shell engine job is following, job launch log :
image

Job result:
image

@CharlieYan24
Copy link
Contributor Author

CharlieYan24 commented Apr 30, 2023

Sorry for failed the spotless test, Already fixed it.

@CharlieYan24 CharlieYan24 force-pushed the dev-1.4.0-reduce-mixlanguage-module-linkismange branch from abc6e56 to 4a1a413 Compare May 2, 2023 15:03
@CharlieYan24
Copy link
Contributor Author

Modified code and passed maven build with parameters: -Pspark-2.4 -Phadoop-2.7.

int fromLine = getAs(parameters, "fromLine", 1);
boolean enableTail = getAs(parameters, "enableTail", false);
if (lastRows > EngineConnLogOperator.MAX_LOG_FETCH_SIZE.getValue()) {
throw new WarnException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use ECMErrorException

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use ECMErrorException instead of WarnException.Thanks for your good advice.

skippedLine += 1;
} else {
if (rowIgnore) {
Matcher matcher = linePattern.matcher(line);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line pattern needs to judge null

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified line pattern to avoid NullPointException. Thanks for your good advice.

return resultMap;
} catch (IOException e) {
// ing
throw new RuntimeException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need use ECMException

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used ECMErrorException instead of RuntimeException. Thanks for your good advice.

line = randomAndReversedReadLine(randomReader, reversedReader);
}

IOUtils.closeQuietly(randomReader);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already remove repeat code, Thanks .


File logPath = new File(engineConnLogDir, logType);
if (!logPath.exists() || !logPath.isFile()) {
throw new WarnException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use ECMErrorException

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use ECMErrorException instead of WarnException.Thanks for your good advice.


import java.util.List;

public class RequestExpectedResourceAndWait {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already remove RequestExpectedResourceAndWait class.


import java.util.List;

public class RequestExpectedResource {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already remove RequestExpectedResource class,Thanks.


import java.util.List;

public class RequestExpectedResource {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already remove RequestExpectedResource class.

@@ -15,6 +15,6 @@
* limitations under the License.
*/

package org.apache.linkis.manager.common.protocol
package org.apache.linkis.manager.common.protocol;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already remove ServiceState class,Thanks.


package org.apache.linkis.manager.common.protocol;

public abstract class ServiceHealthReport {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already remove ServiceHealthReport class.

RMLabelContainer rMLabelContainer = labelResourceService.enrichLabels(engineNode.getLabels());

PersistenceResource persistenceResource =
labelResourceService.getPersistenceResource(rMLabelContainer.getEngineInstanceLabel());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ticketID can be obtained through ecResourceInfo.getTicketId, check the resource if it is only null

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because ecResourceInfo is already been null in above line, so that I get Ticket id from labelResourceService ,please correct me if I was wrong ,thanks.
image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I misread


String engineAskAsyncId = getAsyncId();
CompletableFuture<EngineNode> createNodeThread =
CompletableFuture.supplyAsync(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to add Executors (thread pools)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already add Executors (thread pools), Thanks.

"%s ticketID:%s Failed to initialize engine, reason: %s ",
engineNode.getServiceInstance(), resourceTicketId, errorInfo.getKey()));
}
throw new LinkisRetryException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should to use AMErrorException

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already used AMErrorException instead of LinkisRetryException ,Thanks for your good advice.

} catch (Throwable t) {
logger.info(
"Failed to reuse engineConn time taken " + (System.currentTimeMillis() - startTime), t);
throw new RuntimeException(t);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to use linkisRunitmeException

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already use AMErrorException instead ,thanks for your time.

stopEngine(engineStopRequest, Sender.getSender(Sender.getThisServiceInstance())),
String.format("async stop engineFailed %s", engineStopRequest),
logger);
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add executor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already add add executor, Thanks for your excellent advice.

String.format("asyncStopEngineWithUpdateMetrics with error: %s", e.getMessage()),
e);
}
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add executor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already add add executor, Thanks for your excellent advice.

engineNodes.add(node);
}
}
dealECNodes(engineNodes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to try catch

Copy link
Contributor Author

@CharlieYan24 CharlieYan24 May 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add try catch for dealECNodes(engineNodes) method, thanks .

*/
@Override
public void run() {
logger.info("Start to check the health of the node");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

run need to try catch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add try catch for dealECNodes(engineNodes) method, thanks .

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Utils {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is recommended to move to common and modify it to LinkisUtils

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already modify it to LinkisUtils, Thanks for your advice.

EngineStopRequest stopEngineRequest =
new EngineStopRequest(engineNode.getServiceInstance(), ManagerUtils.getAdminUser());
engineStopService.asyncStopEngine(stopEngineRequest);
throw new RuntimeException(t);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should LinkisRuntimeException

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

used LinkisRuntimie instead of RuntimeException, thanks.

combinedLabelBuilder.build("", Lists.newArrayList(userCreatorLabel, engineTypeLabel));
} catch (LabelErrorException e) {
logger.warn("getAllUserResource failed", e);
throw new RuntimeException(e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use LinkisRuntimie

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

used LinkisRuntimie instead of RuntimeException, thanks.

}

@Override
public void run() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should add tryandwarn

Copy link
Contributor Author

@CharlieYan24 CharlieYan24 May 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already added try catch inside of method run().

() -> {
Pair<YarnResource, YarnResource> yarnResource =
getResources(rmWebAddress, realQueueName, queueName);
getResources(rmWebAddress, realQueueName, queueName);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems duplicate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed duplicate code ,thanks for your good advice.

return nodeResource;
},
t -> {
throw new RuntimeException(t);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use linkis exception ?

Copy link
Contributor Author

@CharlieYan24 CharlieYan24 May 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah , already used RMErrorException instead of RuntimeException. Thanks for your time.

if (queueValue.isPresent()) {
JsonNode jsonNode = queueValue.get();
double absoluteCapacity = jsonNode.path("absoluteCapacity").asDouble();
double effectiveResource = absoluteCapacity;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can use absoluteCapacity directly?

Copy link
Contributor Author

@CharlieYan24 CharlieYan24 May 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, already deleted line double effectiveResource = absoluteCapacity, and assign absoluteCapacity = jsonNode.path("absoluteCapacity").asDouble() directly.

try {
JsonNode metrics = getResponseByUrl("metrics", rmWebAddress);
long totalMemory = metrics.path("clusterMetrics").path("totalMB").asLong();
long totalCores = metrics.path("clusterMetrics").path("totalVirtualCores").asLong();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can parse clusterMetrics once and used for totalMemory and totalCores ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah , thanks for your good advice. I have modified it .

}

public static JsonNode getChildQueues(JsonNode resp) {
JsonNode queues = resp.get("childQueues").get("queue");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is queue sub-attribute of childQueues ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method is translate from pre scala version like bellow:
image

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can double confirm with actual json values

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

already modified getChildQueues method,thanks.

String schedulerType = resp.path("scheduler").path("schedulerInfo").path("type").asText();
if ("capacityScheduler".equals(schedulerType)) {
realQueueName = queueName;
JsonNode childQueues = getChildQueuesOfCapacity(resp.path("scheduler").path("schedulerInfo"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we parse resp.path("scheduler").path("schedulerInfo") once and used latter ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah , thanks for your good advice. I have modified it .

String id = app.get("id").asText();
String user = app.get("user").asText();
String applicationType = app.get("applicationType").asText();
Optional<YarnResource> yarnResource = getYarnResource(Optional.ofNullable(app), queueName);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this getYarnResource work ?

Copy link
Contributor Author

@CharlieYan24 CharlieYan24 May 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method getYarnResource works ,I trace this method return using arthas when I submited spark task.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yarn01
app not memory and cores?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for my misunderstanding ,I added method getAllocatedYarnResource to fix above problem.
image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks

}
JsonNode jsonNode = null;
try {
jsonNode = objectMapper.readTree(entityString);
Copy link
Member

@GuoPhilipse GuoPhilipse May 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

did you test with three level child queue , not sure if it works on fairscheduler

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't tested three level child queue yet, I would tested in next week and reply the result ,thanks for your time.

Copy link
Contributor Author

@CharlieYan24 CharlieYan24 May 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already tested with three level child queue with following unit test and fairscheduler config
unit test:
image

fairscheduler config:
image

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can try with multi child queue, for examples, you can define root.production.productionchild1,root.production.productionchild2,root.production.productionchild3, and test whether all this child queues parsed correctly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already passed three level child queue parse test.

  1. the following is fair-scheduler.xml
    image

  2. here is the submit parameters
    image

  3. the following is child queues parsing result
    image

}
return Optional.empty();
} catch (Exception e) {
// e.printStackTrace();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to print log

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already added print log.

String id = app.get("id").asText();
String user = app.get("user").asText();
String applicationType = app.get("applicationType").asText();
Optional<YarnResource> yarnResource = getYarnResource(Optional.ofNullable(app), queueName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yarn01
app not memory and cores?

httpResponse = response;
}

ObjectMapper objectMapper = new ObjectMapper();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use static var

Copy link
Contributor Author

@CharlieYan24 CharlieYan24 May 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already use static var instead.

} catch (Throwable t) {
if (t instanceof FatalException) {
logger.error("Fatal error, system exit...", t);
throw (FatalException) t;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to add System.exit(fatal.getErrCode)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to add
···
case e: VirtualMachineError =>
logger.error("Fatal error, system exit...", e)
System.exit(-1)
throw e
case exp
if (null != exp.getCause && (exp.getCause.isInstanceOf[FatalException] || exp.getCause
.isInstanceOf[VirtualMachineError])) =>
logger.error("Caused by fatal error, system exit...", exp)
System.exit(-1)
throw exp
case er: Error =>
logger.error("Throw error", er)
throw er
case t => catchOp(t)
···

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to add
···
case e: VirtualMachineError =>
logger.error("Fatal error, system exit...", e)
System.exit(-1)
throw e
case exp
if (null != exp.getCause && (exp.getCause.isInstanceOf[FatalException] || exp.getCause
.isInstanceOf[VirtualMachineError])) =>
logger.error("Caused by fatal error, system exit...", exp)
System.exit(-1)
throw exp
case er: Error =>
logger.error("Throw error", er)
throw er
case t => catchOp(t)
···

Copy link
Contributor Author

@CharlieYan24 CharlieYan24 May 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already added the above exception.

t -> {
if (t instanceof ErrorException) {
ErrorException error = (ErrorException) t;
log.warn(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should use log.error ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use log.error instead ,thanks for your time.

} catch (WarnException t) {
WarnException warn = (WarnException) t;
log.warn(
"Warning code(警告码): {}, Warning message(警告信息): {}.", warn.getErrCode(), warn.getDesc());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this method the corresponding org.apache.linkis.common.utils.Utils#tryAndErrorMsg?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this method correspond to org.apache.linkis.common.utils.Utils#tryAndErrorMsg.

<module>python</module>
<module>openlookeng</module>
<module>io_file</module>
<!-- <module>python</module>-->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to remove

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uncommented, thanks for your good advice.

Copy link
Contributor

@peacewong peacewong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

@peacewong peacewong merged commit 5b7fd7b into apache:dev-1.4.0 May 11, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants