Skip to content

Commit

Permalink
Introduce banned id manager and checker
Browse files Browse the repository at this point in the history
  • Loading branch information
maobaolong committed Nov 18, 2024
1 parent 3cf82d7 commit 572c564
Show file tree
Hide file tree
Showing 8 changed files with 346 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class AccessManager {
private final CoordinatorConf coordinatorConf;
private final ClusterManager clusterManager;
private final QuotaManager quotaManager;
private final BannedManager bannedManager;
private final Configuration hadoopConf;
private List<AccessChecker> accessCheckers = Lists.newArrayList();

Expand All @@ -53,6 +54,7 @@ public AccessManager(
this.clusterManager = clusterManager;
this.hadoopConf = hadoopConf;
this.quotaManager = quotaManager;
this.bannedManager = new BannedManager(coordinatorConf);
init();
}

Expand Down Expand Up @@ -103,6 +105,10 @@ public QuotaManager getQuotaManager() {
return quotaManager;
}

public BannedManager getBannedManager() {
return bannedManager;
}

public void close() throws IOException {
for (AccessChecker checker : accessCheckers) {
checker.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.coordinator;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.util.JavaUtils;

/** BannedManager is a manager for ban the abnormal app. */
public class BannedManager {
private static final Logger LOG = LoggerFactory.getLogger(BannedManager.class);
// versionId -> bannedIds
private volatile Pair<String, Set<String>> bannedIdsFromRest =
Pair.of("0", Collections.emptySet());
private final Map<String, String> bannedIdsFromServer = JavaUtils.newConcurrentMap();

public BannedManager(CoordinatorConf conf) {
LOG.info("BannedManager initialized successfully.");
}

public boolean checkBanned(String id) {
return bannedIdsFromRest.getValue().contains(id) || bannedIdsFromServer.containsKey(id);
}

public void reloadBannedIdsFromRest(Pair<String, Set<String>> newBannedIds) {
bannedIdsFromRest = newBannedIds;
}

public String getBannedIdsFromRestVersion() {
return bannedIdsFromRest.getKey();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,16 @@ public class CoordinatorConf extends RssBaseConf {
.asList()
.defaultValues("appHeartbeat", "heartbeat")
.withDescription("Exclude record rpc audit operation list, separated by ','");
public static final ConfigOption<String> COORDINATOR_ACCESS_BANNED_ID_PROVIDER =
ConfigOptions.key("rss.coordinator.access.bannedIdProvider")
.stringType()
.noDefaultValue()
.withDescription("Get the banned id from Access banned id provider ");
public static final ConfigOption<String> COORDINATOR_ACCESS_BANNED_ID_PROVIDER_REG_PATTERN =
ConfigOptions.key("rss.coordinator.access.bannedIdProviderPattern")
.stringType()
.defaultValue("(.*)")
.withDescription("The regular banned id pattern to extract");

public CoordinatorConf() {}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.coordinator.access.checker;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.AccessManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.access.AccessCheckResult;
import org.apache.uniffle.coordinator.access.AccessInfo;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;

/**
* AccessBannedChecker maintain a list of banned id and update it periodically, it checks the banned
* id in the access request and reject if the id is in the banned list.
*/
public class AccessBannedChecker extends AbstractAccessChecker {
private static final Logger LOG = LoggerFactory.getLogger(AccessBannedChecker.class);
private final AccessManager accessManager;
private final String bannedIdProviderKey;
private final Pattern bannedIdProviderPattern;

public AccessBannedChecker(AccessManager accessManager) throws Exception {
super(accessManager);
this.accessManager = accessManager;
CoordinatorConf conf = accessManager.getCoordinatorConf();
bannedIdProviderKey = conf.get(CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER);
String bannedIdProviderRegex =
conf.get(CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER_REG_PATTERN);
bannedIdProviderPattern = Pattern.compile(bannedIdProviderRegex);

LOG.info(
"Construct BannedChecker. BannedIdProviderKey is {}, pattern is {}",
bannedIdProviderKey,
bannedIdProviderRegex);
}

@Override
public AccessCheckResult check(AccessInfo accessInfo) {
if (accessInfo.getExtraProperties() != null
&& accessInfo.getExtraProperties().containsKey(bannedIdProviderKey)) {
String bannedIdPropertyValue = accessInfo.getExtraProperties().get(bannedIdProviderKey);
Matcher matcher = bannedIdProviderPattern.matcher(bannedIdPropertyValue);
if (matcher.find()) {
String bannedId = matcher.group(1);
if (accessManager.getBannedManager() != null
&& accessManager.getBannedManager().checkBanned(bannedId)) {
String msg = String.format("Denied by BannedChecker, accessInfo[%s].", accessInfo);
if (LOG.isDebugEnabled()) {
LOG.debug("BannedIdPropertyValue is {}, {}", bannedIdPropertyValue, msg);
}
CoordinatorMetrics.counterTotalBannedDeniedRequest.inc();
return new AccessCheckResult(false, msg);
}
}
}

return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE);
}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class CoordinatorMetrics {
private static final String TOTAL_CANDIDATES_DENIED_REQUEST = "total_candidates_denied_request";
private static final String TOTAL_LOAD_DENIED_REQUEST = "total_load_denied_request";
private static final String TOTAL_QUOTA_DENIED_REQUEST = "total_quota_denied_request";
private static final String TOTAL_BANNED_DENIED_REQUEST = "total_banned_denied_request";
public static final String REMOTE_STORAGE_IN_USED_PREFIX = "remote_storage_in_used_";
public static final String APP_NUM_TO_USER = "app_num";
public static final String USER_LABEL = "user_name";
Expand All @@ -57,6 +58,7 @@ public class CoordinatorMetrics {
public static Counter counterTotalCandidatesDeniedRequest;
public static Counter counterTotalQuotaDeniedRequest;
public static Counter counterTotalLoadDeniedRequest;
public static Counter counterTotalBannedDeniedRequest;
public static final Map<String, Gauge> GAUGE_USED_REMOTE_STORAGE = JavaUtils.newConcurrentMap();

private static MetricsManager metricsManager;
Expand Down Expand Up @@ -118,5 +120,6 @@ private static void setUpMetrics() {
metricsManager.addCounter(TOTAL_CANDIDATES_DENIED_REQUEST);
counterTotalQuotaDeniedRequest = metricsManager.addCounter(TOTAL_QUOTA_DENIED_REQUEST);
counterTotalLoadDeniedRequest = metricsManager.addCounter(TOTAL_LOAD_DENIED_REQUEST);
counterTotalBannedDeniedRequest = metricsManager.addCounter(TOTAL_BANNED_DENIED_REQUEST);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,9 @@ public Class<CoordinatorServerResource> getCoordinatorServerResource() {
public Class<ApplicationResource> getApplicationResource() {
return ApplicationResource.class;
}

@Path("banned")
public Class<BannedResource> getBannedResource() {
return BannedResource.class;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.coordinator.web.resource;

import java.util.Set;
import javax.servlet.ServletContext;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hbase.thirdparty.javax.ws.rs.GET;
import org.apache.hbase.thirdparty.javax.ws.rs.POST;
import org.apache.hbase.thirdparty.javax.ws.rs.Path;
import org.apache.hbase.thirdparty.javax.ws.rs.Produces;
import org.apache.hbase.thirdparty.javax.ws.rs.core.Context;
import org.apache.hbase.thirdparty.javax.ws.rs.core.MediaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.web.resource.BaseResource;
import org.apache.uniffle.common.web.resource.Response;
import org.apache.uniffle.coordinator.AccessManager;
import org.apache.uniffle.coordinator.BannedManager;

@Produces({MediaType.APPLICATION_JSON})
public class BannedResource extends BaseResource {
private static final Logger LOG = LoggerFactory.getLogger(BannedResource.class);
@Context protected ServletContext servletContext;

@POST
@Path("/reload")
public Response<String> reload(String versionId, Set<String> bannedIds) {
BannedManager bannedManager = getAccessManager().getBannedManager();
if (bannedManager != null && bannedIds != null) {
bannedManager.reloadBannedIdsFromRest(Pair.of(versionId, bannedIds));
LOG.info("reload {} banned ids.", bannedIds.size());
return Response.success("success");
} else {
return Response.fail("bannedManager is not initialized or bannedIds is null.");
}
}

@GET
@Path("/version")
public Response<String> version() {
BannedManager bannedManager = getAccessManager().getBannedManager();
if (bannedManager != null) {
String version = bannedManager.getBannedIdsFromRestVersion();
LOG.info("Get version of banned ids is {}.", version);
return Response.success(version);
} else {
return Response.fail("bannedManager is not initialized.");
}
}

private AccessManager getAccessManager() {
return (AccessManager) servletContext.getAttribute(AccessManager.class.getCanonicalName());
}
}
Loading

0 comments on commit 572c564

Please sign in to comment.