Skip to content

Commit

Permalink
Merge pull request #15522 from cdapio/CDAP-20940
Browse files Browse the repository at this point in the history
[CDAP-20940] Add image version to dataproc provisioner metric
  • Loading branch information
itsankit-google authored Jan 13, 2024
2 parents 489ba81 + 1d29fc9 commit 5b2cbb2
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.cdap.cdap.runtime.spi.common;

import com.google.common.base.Strings;
import io.cdap.cdap.runtime.spi.runtimejob.LaunchMode;
import javax.annotation.Nullable;

Expand All @@ -29,19 +30,34 @@ public class DataprocMetric {
private final Exception exception;
@Nullable
private final LaunchMode launchMode;
@Nullable
private final String imageVersion;

private DataprocMetric(String metricName, String region, @Nullable Exception exception,
@Nullable LaunchMode launchMode) {
@Nullable LaunchMode launchMode, @Nullable String imageVersion) {
this.metricName = metricName;
this.region = region;
this.exception = exception;
this.launchMode = launchMode;
this.imageVersion = imageVersion;
}

public String getMetricName() {
return metricName;
}

@Nullable
public String getImageVersion() {
if (!Strings.isNullOrEmpty(imageVersion)) {
// return major.minor
String[] splits = imageVersion.split("\\.", 3);
if (splits.length > 2) {
return String.format("%s.%s", splits[0], splits[1]);
}
}
return imageVersion;
}

public String getRegion() {
return region;
}
Expand Down Expand Up @@ -72,6 +88,7 @@ public static Builder builder(String metricName) {
public static class Builder {
private final String metricName;
private String region;
private String imageVersion;
@Nullable
private Exception exception;
@Nullable
Expand All @@ -96,6 +113,11 @@ public Builder setLaunchMode(@Nullable LaunchMode launchMode) {
return this;
}

public Builder setImageVersion(String imageVersion) {
this.imageVersion = imageVersion;
return this;
}

/**
* Returns a DataprocMetric.
*
Expand All @@ -106,7 +128,7 @@ public DataprocMetric build() {
// region should always be set unless there is a bug in the code
throw new IllegalStateException("Dataproc metric is missing the region");
}
return new DataprocMetric(metricName, region, exception, launchMode);
return new DataprocMetric(metricName, region, exception, launchMode, imageVersion);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -303,13 +303,14 @@ public static String getSystemProjectId() {
* Emit a dataproc metric.
**/
public static void emitMetric(ProvisionerContext context, String region,
String metricName, @Nullable Exception e) {
String metricName, @Nullable String imageVersion, @Nullable Exception e) {
emitMetric(context,
DataprocMetric.builder(metricName).setRegion(region).setException(e).build());
DataprocMetric.builder(metricName).setRegion(region).setException(e).setImageVersion(imageVersion).build());
}

public static void emitMetric(ProvisionerContext context, String region, String metricName) {
emitMetric(context, region, metricName, null);
public static void emitMetric(ProvisionerContext context, String region,
@Nullable String imageVersion, String metricName) {
emitMetric(context, region, metricName, imageVersion, null);
}

/**
Expand All @@ -335,6 +336,9 @@ public static void emitMetric(ProvisionerContext context, DataprocMetric datapro
if (dataprocMetric.getLaunchMode() != null) {
tags.put("lchmode", dataprocMetric.getLaunchMode().name());
}
if (!Strings.isNullOrEmpty(dataprocMetric.getImageVersion())) {
tags.put("imgVer", dataprocMetric.getImageVersion());
}
ProvisionerMetrics metrics = context.getMetrics(tags.build());
metrics.count(dataprocMetric.getMetricName(), 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public Cluster createCluster(ProvisionerContext context) throws Exception {
try (DataprocClient client = clientFactory.create(conf, sshPublicKey != null)) {
Cluster reused = tryReuseCluster(client, context, conf);
if (reused != null) {
DataprocUtils.emitMetric(context, conf.getRegion(),
DataprocUtils.emitMetric(context, conf.getRegion(), getImageVersion(context, conf),
"provisioner.createCluster.reuse.count");
return reused;
}
Expand Down Expand Up @@ -224,12 +224,12 @@ public Cluster createCluster(ProvisionerContext context) throws Exception {
numWarnings, numWarnings > 1 ? "s" : "",
String.join("\n", createOperationMeta.getWarningsList()));
}
DataprocUtils.emitMetric(context, conf.getRegion(),
DataprocUtils.emitMetric(context, conf.getRegion(), getImageVersion(context, conf),
"provisioner.createCluster.response.count");
return new Cluster(clusterName, ClusterStatus.CREATING, Collections.emptyList(),
Collections.emptyMap());
} catch (Exception e) {
DataprocUtils.emitMetric(context, conf.getRegion(),
DataprocUtils.emitMetric(context, conf.getRegion(), getImageVersion(context, conf),
"provisioner.createCluster.response.count", e);
throw e;
}
Expand Down Expand Up @@ -448,11 +448,11 @@ public ClusterStatus getClusterStatus(ProvisionerContext context, Cluster cluste

try (DataprocClient client = clientFactory.create(conf)) {
status = client.getClusterStatus(clusterName);
DataprocUtils.emitMetric(context, conf.getRegion(),
DataprocUtils.emitMetric(context, conf.getRegion(), getImageVersion(context, conf),
"provisioner.clusterStatus.response.count");
return status;
} catch (Exception e) {
DataprocUtils.emitMetric(context, conf.getRegion(),
DataprocUtils.emitMetric(context, conf.getRegion(), getImageVersion(context, conf),
"provisioner.clusterStatus.response.count", e);
throw e;
}
Expand All @@ -474,11 +474,11 @@ public Cluster getClusterDetail(ProvisionerContext context, Cluster cluster) thr
String clusterName = cluster.getName();
try (DataprocClient client = clientFactory.create(conf, shouldUseSsh(context, conf))) {
Optional<Cluster> existing = client.getCluster(clusterName);
DataprocUtils.emitMetric(context, conf.getRegion(),
DataprocUtils.emitMetric(context, conf.getRegion(), getImageVersion(context, conf),
"provisioner.clusterDetail.response.count");
return existing.orElseGet(() -> new Cluster(cluster, ClusterStatus.NOT_EXISTS));
} catch (Exception e) {
DataprocUtils.emitMetric(context, conf.getRegion(),
DataprocUtils.emitMetric(context, conf.getRegion(), getImageVersion(context, conf),
"provisioner.clusterDetail.response.count", e);
throw e;
}
Expand Down Expand Up @@ -507,10 +507,10 @@ protected void doDeleteCluster(ProvisionerContext context, Cluster cluster, Data
} else {
client.deleteCluster(clusterName);
}
DataprocUtils.emitMetric(context, conf.getRegion(),
DataprocUtils.emitMetric(context, conf.getRegion(), getImageVersion(context, conf),
"provisioner.deleteCluster.response.count");
} catch (Exception e) {
DataprocUtils.emitMetric(context, conf.getRegion(),
DataprocUtils.emitMetric(context, conf.getRegion(), getImageVersion(context, conf),
"provisioner.deleteCluster.response.count", e);
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.runtime.spi.common;

import org.junit.Assert;
import org.junit.Test;

/**
* Unit tests for {@link DataprocMetric}.
*/
public class DataprocMetricTest {

@Test
public void testImageVersion() {
String metricName = "provisioner.createCluster.response.count";
String region = "us-east1";
String imageVersion = "2.1.35-debian11";

DataprocMetric dataprocMetric =
DataprocMetric.builder(metricName)
.setRegion(region).setImageVersion(imageVersion).build();
Assert.assertEquals("2.1", dataprocMetric.getImageVersion());

imageVersion = "2.1";
dataprocMetric = DataprocMetric.builder(metricName)
.setRegion(region).setImageVersion(imageVersion).build();
Assert.assertEquals("2.1", dataprocMetric.getImageVersion());

imageVersion = null;
dataprocMetric = DataprocMetric.builder(metricName)
.setRegion(region).setImageVersion(imageVersion).build();
Assert.assertNull(dataprocMetric.getImageVersion());

imageVersion = "2";
dataprocMetric = DataprocMetric.builder(metricName)
.setRegion(region).setImageVersion(imageVersion).build();
Assert.assertEquals("2", dataprocMetric.getImageVersion());

}
}

0 comments on commit 5b2cbb2

Please sign in to comment.