Skip to content

Commit

Permalink
Simplify / Clarify db-fetching methods in SpaceBasedStep
Browse files Browse the repository at this point in the history
- Also some other minor code improvements

Signed-off-by: Benjamin Rögner <[email protected]>
  • Loading branch information
roegi committed Jan 14, 2025
1 parent 5f6e4f7 commit acc996c
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,9 @@ private void checkAsyncExecutionState() {

private void handleAsyncUpdate(ProcessUpdate processUpdate) {
boolean isCompleted = onAsyncUpdate(processUpdate);
if(isSimulation) {
/** In simulations we are handling success callbacks by our own */
if (isSimulation)
//In simulations we are handling success callbacks by our own
return;
}
if (isCompleted)
reportAsyncSuccess();
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@

package com.here.xyz.jobs.steps.execution;

import static com.fasterxml.jackson.annotation.JsonInclude.Include.ALWAYS;
import static com.here.xyz.jobs.steps.execution.LambdaBasedStep.ExecutionMode.SYNC;
import static java.util.regex.Matcher.quoteReplacement;

import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.here.xyz.jobs.steps.Config;
import com.here.xyz.jobs.steps.StepExecution;
import com.here.xyz.jobs.steps.inputs.Input;
Expand Down Expand Up @@ -69,7 +67,6 @@ public class RunEmrJob extends LambdaBasedStep<RunEmrJob> {
private String executionRoleArn;
private String jarUrl;
private List<String> positionalScriptParams = new ArrayList<>();
@JsonInclude(ALWAYS)
private Map<String, String> namedScriptParams = new HashMap<>();
private String sparkParams;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,8 @@ private static List<Database> loadDatabasesForConnector(Connector connector) {
.withName(connector.id)
.withRole(WRITER));

/** Adding a virtual readReplica for local testing (same db but ro user) */
//TODO: Ensure that we always have a reader for all Databases (by using the read Only user or replica_host if present) and then - if there is none - it is not supported for a good reason
//Adding a virtual readReplica for local testing (same db but ro user)
if(connector.id.equals("psql") && (connectorDbSettings.runsLocal())) {
databases.add(new Database(null, null, 128, connectorDbSettingsMap)
.withName(connector.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.here.xyz.events.ContextAwareEvent.SpaceContext;
import com.here.xyz.jobs.steps.Config;
import com.here.xyz.jobs.steps.execution.db.Database;
import com.here.xyz.jobs.steps.execution.db.Database.DatabaseRole;
import com.here.xyz.jobs.steps.execution.db.DatabaseBasedStep;
import com.here.xyz.jobs.steps.impl.transport.CopySpace;
import com.here.xyz.jobs.steps.impl.transport.CopySpacePost;
Expand Down Expand Up @@ -66,12 +67,6 @@ public abstract class SpaceBasedStep<T extends SpaceBasedStep> extends DatabaseB
@JsonView({Internal.class, Static.class})
private String spaceId;

@JsonIgnore
private Database db_writer;

@JsonIgnore
private Database db_reader;

@JsonIgnore
private Map<String, Space> cachedSpaces = new ConcurrentHashMap<>();

Expand Down Expand Up @@ -136,49 +131,69 @@ protected HubWebClient hubWebClient() {
return HubWebClient.getInstance(Config.instance.HUB_ENDPOINT);
}

/**
* Provides the {@link Database} instance for the pre-defined space ID of this step. See: {@link #getSpaceId()}
* The loading calls are cached; that means that later calls will not induce an actual REST request to Hub.
* Also, the Database objects are cached. See: {@link Database#loadDatabase(String, DatabaseRole)}
* @return The Database for the pre-defined space of this step and for the provided role
* @throws WebClientException
*/
protected Database db(DatabaseRole role) throws WebClientException {
return loadDatabase(space().getStorage().getId(), role);
}

/**
* Provides the default {@link Database} instance (WRITER) for the pre-defined space ID of this step. See: {@link #getSpaceId()}
* The loading calls are cached; that means that later calls will not induce an actual REST request to Hub.
* Also, the Database objects are cached. See: {@link Database#loadDatabase(String, DatabaseRole)}
* @return The WRITER Database for the pre-defined space of this step
* @throws WebClientException
*/
protected Database db() throws WebClientException {
return db(WRITER);
}

protected Database dbReaderElseWriter() throws WebClientException {
try{
/**
* Provides the READER {@link Database} instance for the pre-defined space ID of this step. See: {@link #getSpaceId()}
* The loading calls are cached; that means that later calls will not induce an actual REST request to Hub.
* Also, the Database objects are cached. See: {@link Database#loadDatabase(String, DatabaseRole)}
* If no READER Database is found, the writer will be returned.
* @return The READER Database for the pre-defined space of this step, or the WRITER Database if there is no READER
* @throws WebClientException
*/
protected Database dbReader() throws WebClientException {
try {
return db(READER);
}
catch( RuntimeException rt ) {
catch (RuntimeException rt) {
//TODO: Ensure that we always have a reader for all Databases and then - if there is none - it would be for a good reason, so we should not ignore that exception anymore
if (!(rt.getCause() instanceof NoSuchElementException))
throw rt;
}

return db(WRITER);
}

protected Database db(Database.DatabaseRole role) throws WebClientException {
return role.equals(READER) ? db_reader() : db_writer();
}

private Database db_reader() throws WebClientException {
if (db_reader == null) {
logger.info("[{}] Loading database[{}] for space {} ...", getGlobalStepId(), READER.name(), getSpaceId());
db_reader = loadDatabase(space().getStorage().getId(), READER);
}
return db_reader;
}

private Database db_writer() throws WebClientException {
if (db_writer == null) {
logger.info("[{}] Loading database[{}] for space {} ...", getGlobalStepId(), WRITER.name(), getSpaceId());
db_writer = loadDatabase(space().getStorage().getId(), WRITER);
}
return db_writer;
}

/**
* Provides the space instance for the provided space ID.
* The loading calls are cached; that means that later calls will not induce an actual REST request to Hub.
* @param spaceId
* @return
* @throws WebClientException
*/
protected Space space(String spaceId) throws WebClientException {
Space space = cachedSpaces.get(spaceId);
if (space == null)
cachedSpaces.put(spaceId, space = loadSpace(spaceId));
return space;
}

/**
* Provides the space instance for the pre-defined space ID of this step. See: {@link #getSpaceId()}
* The loading calls are cached; that means that later calls will not induce an actual REST request to Hub.
* @return
* @throws WebClientException
*/
protected Space space() throws WebClientException {
return space(getSpaceId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.here.xyz.events.PropertiesQuery;
import com.here.xyz.jobs.steps.execution.StepException;
import com.here.xyz.jobs.steps.execution.db.Database;
import com.here.xyz.jobs.steps.execution.db.Database.DatabaseRole;
import com.here.xyz.jobs.steps.impl.SpaceBasedStep;
import com.here.xyz.jobs.steps.impl.tools.ResourceAndTimeCalculator;
import com.here.xyz.jobs.steps.impl.transport.query.ExportSpace;
Expand All @@ -57,7 +56,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -225,7 +223,7 @@ public List<Load> getNeededResources() {
boolean isRemoteCopy = isRemoteCopy(sourceSpace, targetSpace);

if (isRemoteCopy)
expectedLoads.add(new Load().withResource(dbReaderElseWriter())
expectedLoads.add(new Load().withResource(dbReader())
.withEstimatedVirtualUnits(calculateNeededAcus()));

logger.info("[{}] #getNeededResources() isRemoteCopy={} {} -> {}", getGlobalStepId(), isRemoteCopy, sourceSpace.getStorage().getId(),
Expand Down Expand Up @@ -412,7 +410,7 @@ private SQLQuery buildCopySpaceQuery(Space sourceSpace, Space targetSpace, int t
SQLQuery contentQuery = buildCopyContentQuery(sourceSpace, threadCount, threadId);

if (isRemoteCopy(sourceSpace,targetSpace))
contentQuery = buildCopyQueryRemoteSpace(dbReaderElseWriter(), contentQuery );
contentQuery = buildCopyQueryRemoteSpace(dbReader(), contentQuery );

return new SQLQuery(
/**/
Expand Down Expand Up @@ -489,7 +487,7 @@ private SearchForFeatures getQueryRunner(GetFeaturesByGeometryEvent event) throw

Database db = !isRemoteCopy()
? loadDatabase(sourceSpace.getStorage().getId(), WRITER)
: dbReaderElseWriter();
: dbReader();

SearchForFeatures queryRunner;
if (geometry == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,22 @@

package com.here.xyz.jobs.steps.impl.transport;

import static com.here.xyz.events.ContextAwareEvent.SpaceContext.DEFAULT;
import static com.here.xyz.events.ContextAwareEvent.SpaceContext.SUPER;
import static com.here.xyz.jobs.steps.Step.Visibility.SYSTEM;
import static com.here.xyz.jobs.steps.Step.Visibility.USER;
import static com.here.xyz.jobs.steps.execution.db.Database.DatabaseRole.WRITER;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.JOB_EXECUTOR;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.JOB_VALIDATE;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_EXECUTE;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_ON_ASYNC_SUCCESS;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.buildTemporaryJobTableDropStatement;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.createQueryContext;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.errorLog;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.getTemporaryJobTableName;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.infoLog;
import static com.here.xyz.util.web.XyzWebClient.WebClientException;

import com.fasterxml.jackson.annotation.JsonView;
import com.here.xyz.events.ContextAwareEvent.SpaceContext;
import com.here.xyz.events.PropertiesQuery;
Expand All @@ -27,7 +43,6 @@
import com.here.xyz.jobs.steps.Step;
import com.here.xyz.jobs.steps.StepExecution;
import com.here.xyz.jobs.steps.execution.LambdaBasedStep.LambdaStepRequest.ProcessUpdate;
import com.here.xyz.jobs.steps.execution.db.Database;
import com.here.xyz.jobs.steps.impl.SpaceBasedStep;
import com.here.xyz.jobs.steps.impl.tools.ResourceAndTimeCalculator;
import com.here.xyz.jobs.steps.outputs.DownloadUrl;
Expand All @@ -43,32 +58,15 @@
import com.here.xyz.util.db.SQLQuery;
import com.here.xyz.util.geo.GeoTools;
import com.here.xyz.util.service.BaseHttpServerVerticle.ValidationException;
import org.geotools.api.referencing.FactoryException;
import org.locationtech.jts.geom.Geometry;

import javax.xml.crypto.dsig.TransformException;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

import static com.here.xyz.jobs.steps.execution.db.Database.DatabaseRole.WRITER;
import static com.here.xyz.events.ContextAwareEvent.SpaceContext.DEFAULT;
import static com.here.xyz.events.ContextAwareEvent.SpaceContext.SUPER;
import static com.here.xyz.jobs.steps.Step.Visibility.SYSTEM;
import static com.here.xyz.jobs.steps.Step.Visibility.USER;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.JOB_EXECUTOR;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.JOB_VALIDATE;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_EXECUTE;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.Phase.STEP_ON_ASYNC_SUCCESS;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.buildTemporaryJobTableDropStatement;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.createQueryContext;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.errorLog;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.getTemporaryJobTableName;
import static com.here.xyz.jobs.steps.impl.transport.TransportTools.infoLog;
import static com.here.xyz.util.web.XyzWebClient.WebClientException;
import javax.xml.crypto.dsig.TransformException;
import org.geotools.api.referencing.FactoryException;
import org.locationtech.jts.geom.Geometry;


/**
Expand Down Expand Up @@ -186,7 +184,7 @@ public List<Load> getNeededResources() {
+ statistics.getDataSize().getValue() + " => neededACUs:" + overallNeededAcus);

//TODO: add writer?
return List.of(new Load().withResource(dbReaderElseWriter()).withEstimatedVirtualUnits(overallNeededAcus),
return List.of(new Load().withResource(dbReader()).withEstimatedVirtualUnits(overallNeededAcus),
new Load().withResource(IOResource.getInstance()).withEstimatedVirtualUnits(getUncompressedUploadBytesEstimation()));
}catch (Exception e){
throw new RuntimeException(e);
Expand Down Expand Up @@ -365,7 +363,7 @@ public void execute() throws Exception {

infoLog(STEP_EXECUTE, this,"Start export thread number: " + i );
//TODO: use READER instead. Fix local Problem SQL state: 2F003
runReadQueryAsync(buildExportQuery(schema, i), dbReaderElseWriter(), 0,false);
runReadQueryAsync(buildExportQuery(schema, i), dbReader(), 0,false);
}
}

Expand All @@ -385,7 +383,7 @@ public void resume() throws Exception {
for (int i = 0; i < calculatedThreadCount; i++) {
if(threadList.contains(Integer.valueOf(i))) {
infoLog(STEP_EXECUTE, this, "Start export for thread number: " + i);
runReadQueryAsync(buildExportQuery(schema, i), dbReaderElseWriter(), 0, false);
runReadQueryAsync(buildExportQuery(schema, i), dbReader(), 0, false);
}
}
}
Expand Down Expand Up @@ -470,7 +468,7 @@ protected boolean onAsyncFailure() {

private String generateFilteredExportQuery(int threadNumber) throws WebClientException, TooManyResourcesClaimed, QueryBuildingException {
GetFeaturesByGeometryBuilder queryBuilder = new GetFeaturesByGeometryBuilder()
.withDataSourceProvider(requestResource(dbReaderElseWriter(), 0));
.withDataSourceProvider(requestResource(dbReader(), 0));
if(context == SUPER)
space().switchToSuper(superSpace().getId());

Expand Down

0 comments on commit acc996c

Please sign in to comment.