Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#2317] feat(server): Introduce server flush benchmark #2318

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ public class ShuffleServerMetrics {

private static MetricsManager metricsManager;
private static boolean isRegister = false;
@VisibleForTesting public static Supplier<Void> callbackBenchmark;

public static synchronized void register(
CollectorRegistry collectorRegistry, String tags, ShuffleServerConf serverConf) {
Expand Down Expand Up @@ -326,6 +327,9 @@ public static void incStorageSuccessCounter(String storageHost) {
counterRemoteStorageSuccessWrite.labels(tags, storageHost).inc();
}
}
if (callbackBenchmark != null) {
callbackBenchmark.get();
}
}

public static void incStorageFailedCounter(String storageHost) {
Expand All @@ -338,6 +342,9 @@ public static void incStorageFailedCounter(String storageHost) {
counterRemoteStorageFailedWrite.labels(tags, storageHost).inc();
}
}
if (callbackBenchmark != null) {
callbackBenchmark.get();
}
}

public static void incHadoopStorageWriteDataSize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.UnitConverter;
import org.apache.uniffle.proto.RssProtos;
import org.apache.uniffle.server.block.ShuffleBlockIdManager;
import org.apache.uniffle.server.block.ShuffleBlockIdManagerFactory;

Expand Down Expand Up @@ -99,6 +100,13 @@ public ShuffleTaskInfo(String appId) {
this.partitionBlockCounters = JavaUtils.newConcurrentMap();
this.latestStageAttemptNumbers = JavaUtils.newConcurrentMap();
this.shuffleDetailInfos = JavaUtils.newConcurrentMap();
ShuffleDataDistributionType shuffleDataDistributionType =
ShuffleDataDistributionType.valueOf(RssProtos.DataDistribution.NORMAL.name());
specification.set(
ShuffleSpecification.builder()
.maxConcurrencyPerPartitionToWrite(1)
.dataDistributionType(shuffleDataDistributionType)
.build());
}

public Long getCurrentTimes() {
Expand Down
160 changes: 160 additions & 0 deletions server/src/main/java/org/apache/uniffle/server/bench/BenchHandle.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.server.bench;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang3.time.DateFormatUtils;

import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.server.web.vo.BenchArgumentVO;

public class BenchHandle implements AutoCloseable {
private final String id;
private final BenchArgumentVO argument;

public final List<String> log = new Vector<>();
private long startTime;
private long endTime;
private LinkedHashMap<String, AutoCloseable> allCloseable = new LinkedHashMap<>();
private BenchTrace benchTrace;
private long generateEventNums;
private long totalEventNum;
private AtomicLong completedEventNumCounter;

public BenchHandle(String id, BenchArgumentVO argument) {
this.id = id;
this.argument = argument;
}

public void startTrace() {
this.startTime = System.currentTimeMillis();
this.benchTrace = new BenchTrace();
benchTrace.start();
}

public void endTrace() {
endTime = System.currentTimeMillis();
benchTrace.end();
}

public BenchTrace getBenchTrace() {
return benchTrace;
}

public String getId() {
return id;
}

public String getStartTime() {
return DateFormatUtils.format(startTime, Constants.DATE_PATTERN);
}

public String getEndTime() {
return DateFormatUtils.format(endTime, Constants.DATE_PATTERN);
}

public long getDuration() {
return endTime - startTime;
}

public List<String> getLog() {
return log;
}

public void addLog(String content) {
log.add(
DateFormatUtils.format(System.currentTimeMillis(), Constants.DATE_PATTERN)
+ ": ["
+ id
+ "] "
+ content);
}

public void setEndTime(long endTime) {
this.endTime = endTime;
}

public BenchArgumentVO getArgument() {
return argument;
}

public void setStartTime(long startTime) {
this.startTime = startTime;
}

public void setBenchTrace(BenchTrace benchTrace) {
this.benchTrace = benchTrace;
}

@Override
public void close() throws Exception {
ArrayList<Map.Entry<String, AutoCloseable>> entries = new ArrayList<>(allCloseable.entrySet());
Collections.reverse(entries);
for (Map.Entry<String, AutoCloseable> entry : entries) {
addLog("Closing " + entry.getKey());
entry.getValue().close();
addLog("Closed " + entry.getKey());
}
addLog("Closed all registered closable. Count is: " + allCloseable.size());
allCloseable.clear();
}

public <T extends AutoCloseable> T registerClosable(String description, T closeable) {
allCloseable.put(description, closeable);
addLog("register closable : " + description);
return closeable;
}

public void setGenerateEventNums(long generateEventNums) {
this.generateEventNums = generateEventNums;
}

public void setTotalEventNum(long num) {
this.totalEventNum = num;
}

public String getGenerateEventInfo() {
double percentage = generateEventNums * 100.0 / totalEventNum;
return String.format("%.2f%% (%d/%d)", percentage, generateEventNums, totalEventNum);
}

public long getTotalEventNum() {
return this.totalEventNum;
}

public void setCompletedEventNumsCounter(AtomicLong completedEventNumCounter) {
this.completedEventNumCounter = completedEventNumCounter;
}

public long getCompletedEventNum() {
return completedEventNumCounter.get();
}

public String getCompletedEventInfo() {
double percentage = completedEventNumCounter.get() * 100.0 / totalEventNum;
return String.format(
"%.2f%% (%d/%d)", percentage, completedEventNumCounter.get(), totalEventNum);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.server.bench;

import org.apache.uniffle.server.ShuffleServerMetrics;

public class BenchTrace {
private double totalWriteTime;
private double totalWriteBlockSize;
private double totalWriteDataSize;
private boolean isEnd = false;

public void start() {
totalWriteTime = ShuffleServerMetrics.counterTotalWriteTime.get();
totalWriteBlockSize = ShuffleServerMetrics.counterTotalWriteBlockSize.get();
totalWriteDataSize = ShuffleServerMetrics.counterTotalWriteDataSize.get();
}

public void end() {
totalWriteTime = ShuffleServerMetrics.counterTotalWriteTime.get() - totalWriteTime;
totalWriteBlockSize =
ShuffleServerMetrics.counterTotalWriteBlockSize.get() - totalWriteBlockSize;
totalWriteDataSize = ShuffleServerMetrics.counterTotalWriteDataSize.get() - totalWriteDataSize;
isEnd = true;
}

public boolean isEnd() {
return isEnd;
}

public void setEnd(boolean end) {
isEnd = end;
}

public double getTotalWriteBlockSize() {
return totalWriteBlockSize;
}

public double getTotalWriteDataSize() {
return totalWriteDataSize;
}

public double getTotalWriteTime() {
return totalWriteTime;
}

public void setTotalWriteBlockSize(double totalWriteBlockSize) {
this.totalWriteBlockSize = totalWriteBlockSize;
}

public void setTotalWriteDataSize(double totalWriteDataSize) {
this.totalWriteDataSize = totalWriteDataSize;
}

public void setTotalWriteTime(double totalWriteTime) {
this.totalWriteTime = totalWriteTime;
}
}
Loading
Loading