From 572c564eb8a34256ea86b5a5e5e29b9c7c48c683 Mon Sep 17 00:00:00 2001 From: baoloongmao Date: Mon, 18 Nov 2024 20:04:45 +0800 Subject: [PATCH] Introduce banned id manager and checker --- .../uniffle/coordinator/AccessManager.java | 6 + .../uniffle/coordinator/BannedManager.java | 53 ++++++++ .../uniffle/coordinator/CoordinatorConf.java | 10 ++ .../access/checker/AccessBannedChecker.java | 83 +++++++++++++ .../metric/CoordinatorMetrics.java | 3 + .../coordinator/web/resource/APIResource.java | 5 + .../web/resource/BannedResource.java | 72 +++++++++++ .../checker/AccessBannedCheckerTest.java | 114 ++++++++++++++++++ 8 files changed, 346 insertions(+) create mode 100644 coordinator/src/main/java/org/apache/uniffle/coordinator/BannedManager.java create mode 100644 coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessBannedChecker.java create mode 100644 coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/BannedResource.java create mode 100644 coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessBannedCheckerTest.java diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java index c6d34b5b41..2feebcf6e3 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java @@ -40,6 +40,7 @@ public class AccessManager { private final CoordinatorConf coordinatorConf; private final ClusterManager clusterManager; private final QuotaManager quotaManager; + private final BannedManager bannedManager; private final Configuration hadoopConf; private List accessCheckers = Lists.newArrayList(); @@ -53,6 +54,7 @@ public AccessManager( this.clusterManager = clusterManager; this.hadoopConf = hadoopConf; this.quotaManager = quotaManager; + this.bannedManager = new BannedManager(coordinatorConf); init(); } @@ -103,6 +105,10 @@ public QuotaManager getQuotaManager() { return quotaManager; } + public BannedManager getBannedManager() { + return bannedManager; + } + public void close() throws IOException { for (AccessChecker checker : accessCheckers) { checker.close(); diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/BannedManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/BannedManager.java new file mode 100644 index 0000000000..a0ffcd0cfb --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/BannedManager.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.util.JavaUtils; + +/** BannedManager is a manager for ban the abnormal app. */ +public class BannedManager { + private static final Logger LOG = LoggerFactory.getLogger(BannedManager.class); + // versionId -> bannedIds + private volatile Pair> bannedIdsFromRest = + Pair.of("0", Collections.emptySet()); + private final Map bannedIdsFromServer = JavaUtils.newConcurrentMap(); + + public BannedManager(CoordinatorConf conf) { + LOG.info("BannedManager initialized successfully."); + } + + public boolean checkBanned(String id) { + return bannedIdsFromRest.getValue().contains(id) || bannedIdsFromServer.containsKey(id); + } + + public void reloadBannedIdsFromRest(Pair> newBannedIds) { + bannedIdsFromRest = newBannedIds; + } + + public String getBannedIdsFromRestVersion() { + return bannedIdsFromRest.getKey(); + } +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java index 5b1f64ff16..933b968e5a 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java @@ -256,6 +256,16 @@ public class CoordinatorConf extends RssBaseConf { .asList() .defaultValues("appHeartbeat", "heartbeat") .withDescription("Exclude record rpc audit operation list, separated by ','"); + public static final ConfigOption COORDINATOR_ACCESS_BANNED_ID_PROVIDER = + ConfigOptions.key("rss.coordinator.access.bannedIdProvider") + .stringType() + .noDefaultValue() + .withDescription("Get the banned id from Access banned id provider "); + public static final ConfigOption COORDINATOR_ACCESS_BANNED_ID_PROVIDER_REG_PATTERN = + ConfigOptions.key("rss.coordinator.access.bannedIdProviderPattern") + .stringType() + .defaultValue("(.*)") + .withDescription("The regular banned id pattern to extract"); public CoordinatorConf() {} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessBannedChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessBannedChecker.java new file mode 100644 index 0000000000..60d39a8add --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessBannedChecker.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator.access.checker; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.util.Constants; +import org.apache.uniffle.coordinator.AccessManager; +import org.apache.uniffle.coordinator.CoordinatorConf; +import org.apache.uniffle.coordinator.access.AccessCheckResult; +import org.apache.uniffle.coordinator.access.AccessInfo; +import org.apache.uniffle.coordinator.metric.CoordinatorMetrics; + +/** + * AccessBannedChecker maintain a list of banned id and update it periodically, it checks the banned + * id in the access request and reject if the id is in the banned list. + */ +public class AccessBannedChecker extends AbstractAccessChecker { + private static final Logger LOG = LoggerFactory.getLogger(AccessBannedChecker.class); + private final AccessManager accessManager; + private final String bannedIdProviderKey; + private final Pattern bannedIdProviderPattern; + + public AccessBannedChecker(AccessManager accessManager) throws Exception { + super(accessManager); + this.accessManager = accessManager; + CoordinatorConf conf = accessManager.getCoordinatorConf(); + bannedIdProviderKey = conf.get(CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER); + String bannedIdProviderRegex = + conf.get(CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER_REG_PATTERN); + bannedIdProviderPattern = Pattern.compile(bannedIdProviderRegex); + + LOG.info( + "Construct BannedChecker. BannedIdProviderKey is {}, pattern is {}", + bannedIdProviderKey, + bannedIdProviderRegex); + } + + @Override + public AccessCheckResult check(AccessInfo accessInfo) { + if (accessInfo.getExtraProperties() != null + && accessInfo.getExtraProperties().containsKey(bannedIdProviderKey)) { + String bannedIdPropertyValue = accessInfo.getExtraProperties().get(bannedIdProviderKey); + Matcher matcher = bannedIdProviderPattern.matcher(bannedIdPropertyValue); + if (matcher.find()) { + String bannedId = matcher.group(1); + if (accessManager.getBannedManager() != null + && accessManager.getBannedManager().checkBanned(bannedId)) { + String msg = String.format("Denied by BannedChecker, accessInfo[%s].", accessInfo); + if (LOG.isDebugEnabled()) { + LOG.debug("BannedIdPropertyValue is {}, {}", bannedIdPropertyValue, msg); + } + CoordinatorMetrics.counterTotalBannedDeniedRequest.inc(); + return new AccessCheckResult(false, msg); + } + } + } + + return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE); + } + + @Override + public void close() {} +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java index a97892526e..19fd226c8f 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java @@ -43,6 +43,7 @@ public class CoordinatorMetrics { private static final String TOTAL_CANDIDATES_DENIED_REQUEST = "total_candidates_denied_request"; private static final String TOTAL_LOAD_DENIED_REQUEST = "total_load_denied_request"; private static final String TOTAL_QUOTA_DENIED_REQUEST = "total_quota_denied_request"; + private static final String TOTAL_BANNED_DENIED_REQUEST = "total_banned_denied_request"; public static final String REMOTE_STORAGE_IN_USED_PREFIX = "remote_storage_in_used_"; public static final String APP_NUM_TO_USER = "app_num"; public static final String USER_LABEL = "user_name"; @@ -57,6 +58,7 @@ public class CoordinatorMetrics { public static Counter counterTotalCandidatesDeniedRequest; public static Counter counterTotalQuotaDeniedRequest; public static Counter counterTotalLoadDeniedRequest; + public static Counter counterTotalBannedDeniedRequest; public static final Map GAUGE_USED_REMOTE_STORAGE = JavaUtils.newConcurrentMap(); private static MetricsManager metricsManager; @@ -118,5 +120,6 @@ private static void setUpMetrics() { metricsManager.addCounter(TOTAL_CANDIDATES_DENIED_REQUEST); counterTotalQuotaDeniedRequest = metricsManager.addCounter(TOTAL_QUOTA_DENIED_REQUEST); counterTotalLoadDeniedRequest = metricsManager.addCounter(TOTAL_LOAD_DENIED_REQUEST); + counterTotalBannedDeniedRequest = metricsManager.addCounter(TOTAL_BANNED_DENIED_REQUEST); } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/APIResource.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/APIResource.java index 324820e657..6cf9ba980e 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/APIResource.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/APIResource.java @@ -43,4 +43,9 @@ public Class getCoordinatorServerResource() { public Class getApplicationResource() { return ApplicationResource.class; } + + @Path("banned") + public Class getBannedResource() { + return BannedResource.class; + } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/BannedResource.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/BannedResource.java new file mode 100644 index 0000000000..5c97da5703 --- /dev/null +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/web/resource/BannedResource.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator.web.resource; + +import java.util.Set; +import javax.servlet.ServletContext; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hbase.thirdparty.javax.ws.rs.GET; +import org.apache.hbase.thirdparty.javax.ws.rs.POST; +import org.apache.hbase.thirdparty.javax.ws.rs.Path; +import org.apache.hbase.thirdparty.javax.ws.rs.Produces; +import org.apache.hbase.thirdparty.javax.ws.rs.core.Context; +import org.apache.hbase.thirdparty.javax.ws.rs.core.MediaType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.uniffle.common.web.resource.BaseResource; +import org.apache.uniffle.common.web.resource.Response; +import org.apache.uniffle.coordinator.AccessManager; +import org.apache.uniffle.coordinator.BannedManager; + +@Produces({MediaType.APPLICATION_JSON}) +public class BannedResource extends BaseResource { + private static final Logger LOG = LoggerFactory.getLogger(BannedResource.class); + @Context protected ServletContext servletContext; + + @POST + @Path("/reload") + public Response reload(String versionId, Set bannedIds) { + BannedManager bannedManager = getAccessManager().getBannedManager(); + if (bannedManager != null && bannedIds != null) { + bannedManager.reloadBannedIdsFromRest(Pair.of(versionId, bannedIds)); + LOG.info("reload {} banned ids.", bannedIds.size()); + return Response.success("success"); + } else { + return Response.fail("bannedManager is not initialized or bannedIds is null."); + } + } + + @GET + @Path("/version") + public Response version() { + BannedManager bannedManager = getAccessManager().getBannedManager(); + if (bannedManager != null) { + String version = bannedManager.getBannedIdsFromRestVersion(); + LOG.info("Get version of banned ids is {}.", version); + return Response.success(version); + } else { + return Response.fail("bannedManager is not initialized."); + } + } + + private AccessManager getAccessManager() { + return (AccessManager) servletContext.getAttribute(AccessManager.class.getCanonicalName()); + } +} diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessBannedCheckerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessBannedCheckerTest.java new file mode 100644 index 0000000000..d026feaa22 --- /dev/null +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/checker/AccessBannedCheckerTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.coordinator.checker; + +import java.io.File; +import java.util.Collections; + +import com.google.common.collect.Sets; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import org.apache.uniffle.coordinator.AccessManager; +import org.apache.uniffle.coordinator.BannedManager; +import org.apache.uniffle.coordinator.CoordinatorConf; +import org.apache.uniffle.coordinator.access.AccessInfo; +import org.apache.uniffle.coordinator.access.checker.AccessBannedChecker; +import org.apache.uniffle.coordinator.metric.CoordinatorMetrics; + +import static java.lang.Thread.sleep; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class AccessBannedCheckerTest { + + @BeforeEach + public void setUp() { + CoordinatorMetrics.register(); + } + + @AfterEach + public void clear() { + CoordinatorMetrics.clear(); + } + + @Test + public void test(@TempDir File tempDir) throws Exception { + CoordinatorConf conf = new CoordinatorConf(); + conf.set(CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER, "test.key"); + conf.set(CoordinatorConf.COORDINATOR_ACCESS_BANNED_ID_PROVIDER_REG_PATTERN, "(.*)_.*"); + String checkerClassName = AccessBannedChecker.class.getName(); + conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), checkerClassName); + AccessManager accessManager = new AccessManager(conf, null, null, new Configuration()); + BannedManager bannedManager = accessManager.getBannedManager(); + bannedManager.reloadBannedIdsFromRest(Pair.of("version1", Sets.newHashSet("2", "9527", "135"))); + AccessBannedChecker checker = (AccessBannedChecker) accessManager.getAccessCheckers().get(0); + sleep(1200); + assertFalse( + checker + .check( + new AccessInfo( + "DummyAccessId", + Sets.newHashSet(), + Collections.singletonMap("test.key", "9527_1234"), + "")) + .isSuccess()); + assertFalse( + checker + .check( + new AccessInfo( + "DummyAccessId", + Sets.newHashSet(), + Collections.singletonMap("test.key", "135_1234"), + "")) + .isSuccess()); + assertTrue( + checker + .check( + new AccessInfo( + "DummyAccessId", + Sets.newHashSet(), + Collections.singletonMap("test.key", "2"), + "")) + .isSuccess()); + assertTrue( + checker + .check( + new AccessInfo( + "DummyAccessId", + Sets.newHashSet(), + Collections.singletonMap("test.key", "1"), + "")) + .isSuccess()); + assertTrue( + checker + .check( + new AccessInfo( + "DummyAccessId", + Sets.newHashSet(), + Collections.singletonMap("test.key", "1_2"), + "")) + .isSuccess()); + + checker.close(); + } +}