Skip to content

Commit

Permalink
集群互踢 && acl不设置无法publish
Browse files Browse the repository at this point in the history
  • Loading branch information
1ssqq1lxr committed Feb 23, 2023
1 parent 5ac4df0 commit 5b193c4
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 20 deletions.
2 changes: 1 addition & 1 deletion smqtt-bootstrap/src/test/java/ClusterNode1.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static void main(String[] args) throws InterruptedException {
BootstrapConfig.
ClusterConfig
.builder()
.enable(false)
.enable(true)
.namespace("smqtt")
.node("node-1")
.port(7773)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
@Data
public class ClusterMessage {

public ClusterMessage(){}

private ClusterEvent clusterEvent;

private Object message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,29 +75,32 @@ private void loadAclCache() {

@Override
public boolean check(MqttChannel mqttChannel, String source, AclAction action) {
try {
boolean isCheckAcl = Optional.ofNullable(filterAclTopicActions.get(action.name()))
.map(objects -> objects.stream().anyMatch(topic -> BuiltInFunctions.keyMatch2(source, topic)))
.orElse(false);
if (isCheckAcl) {
String subject = String.format(REQUEST_SUBJECT_TEMPLATE, mqttChannel.getClientIdentifier()
, mqttChannel.getAddress().split(":")[0]);
return Optional.ofNullable(enforcer)
.map(ef -> ef.enforce(subject, source, action.name()))
.orElse(true);
}
if (isOpen) {
try {

boolean isCheckAcl = Optional.ofNullable(filterAclTopicActions.get(action.name()))
.map(objects -> objects.stream().anyMatch(topic -> BuiltInFunctions.keyMatch2(source, topic)))
.orElse(false);
if (isCheckAcl) {
String subject = String.format(REQUEST_SUBJECT_TEMPLATE, mqttChannel.getClientIdentifier()
, mqttChannel.getAddress().split(":")[0]);
return Optional.ofNullable(enforcer)
.map(ef -> ef.enforce(subject, source, action.name()))
.orElse(true);
}

} catch (Exception e) {
log.error("acl check error", e);
} catch (Exception e) {
log.error("acl check error", e);
}
}
return isOpen;
return true;
}

@Override
public boolean add(String sub, String source, AclAction action, AclType type) {
return isOpen?Optional.ofNullable(enforcer)
return isOpen ? Optional.ofNullable(enforcer)
.map(ef -> enforcer.addNamedPolicy("p", sub, source, action.name(), type.getDesc()))
.orElse(true):false;
.orElse(true) : false;
}

@Override
Expand All @@ -109,7 +112,7 @@ public boolean delete(String sub, String source, AclAction action, AclType type)

@Override
public List<List<String>> get(PolicyModel policyModel) {
if(!isOpen){
if (!isOpen) {
return Collections.emptyList();
}
return Optional.ofNullable(enforcer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void registry() {
else {
CloseMqttMessage closeMqttMessage =(CloseMqttMessage) clusterMessage.getMessage();
Optional.ofNullable(mqttReceiveContext.getChannelRegistry().get(closeMqttMessage.getClientIdentifier()))
.ifPresent(MqttChannel::disposableClose);
.ifPresent(mqttChannel -> mqttChannel.close().subscribe());

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ public Mono<Void> parseProtocol(SmqttMessage<MqttConnectMessage> smqttMessage, M
CloseMqttMessage closeMqttMessage = new CloseMqttMessage();
closeMqttMessage.setClientIdentifier(clientIdentifier);
ClusterMessage clusterMessage = new ClusterMessage(closeMqttMessage);
mqttReceiveContext.getClusterRegistry().spreadPublishMessage(clusterMessage);
mqttReceiveContext.getClusterRegistry().spreadPublishMessage(clusterMessage)
.subscribe();

/*registry will message send */
mqttChannel.registryClose(channel -> Optional.ofNullable(mqttChannel.getWill())
Expand Down

0 comments on commit 5b193c4

Please sign in to comment.