Skip to content

Commit

Permalink
Merge pull request #21 from ispras/Q-attack
Browse files Browse the repository at this point in the history
Q attack
  • Loading branch information
LukyanovKirillML authored Oct 14, 2024
2 parents 704a10a + 220dc27 commit afa3b24
Show file tree
Hide file tree
Showing 5 changed files with 415 additions and 3 deletions.
110 changes: 109 additions & 1 deletion experiments/attack_defense_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,116 @@ def test_nettack_evasion():
print(f"info_before_evasion_attack: {info_before_evasion_attack}")
print(f"info_after_evasion_attack: {info_after_evasion_attack}")

def test_qattack():
from attacks.QAttack import qattack
my_device = device('cpu')

# Load dataset
# full_name = ("single-graph", "Planetoid", 'Cora')
full_name = ('single-graph', 'pytorch-geometric-other', 'KarateClub')
dataset, data, results_dataset_path = DatasetManager.get_by_full_name(
full_name=full_name,
dataset_ver_ind=0
)

# Train model on original dataset and remember the model metric and node predicted probability
gcn_gcn = model_configs_zoo(dataset=dataset, model_name='gcn_gcn')

manager_config = ConfigPattern(
_config_class="ModelManagerConfig",
_config_kwargs={
"mask_features": [],
"optimizer": {
"_class_name": "Adam",
"_config_kwargs": {},
}
}
)

gnn_model_manager = FrameworkGNNModelManager(
gnn=gcn_gcn,
dataset_path=results_dataset_path,
manager_config=manager_config,
modification=ModelModificationConfig(model_ver_ind=0, epochs=0)
)

gnn_model_manager.gnn.to(my_device)

num_steps = 100
gnn_model_manager.train_model(gen_dataset=dataset,
steps=num_steps,
save_model_flag=False)

evasion_attack_config = ConfigPattern(
_class_name="QAttack",
_import_path=EVASION_ATTACK_PARAMETERS_PATH,
_config_class="EvasionAttackConfig",
_config_kwargs={
}
)

gnn_model_manager.set_evasion_attacker(evasion_attack_config=evasion_attack_config)

# Evaluate model

# acc_train = gnn_model_manager.evaluate_model(gen_dataset=dataset,
# metrics=[Metric("Accuracy", mask='train')])['train']['Accuracy']


acc_test = gnn_model_manager.evaluate_model(gen_dataset=dataset,
metrics=[Metric("Accuracy", mask='test')])['test']['Accuracy']
# print(f"Accuracy on train: {acc_train}. Accuracy on test: {acc_test}")
print(f"Accuracy on test: {acc_test}")

# Node for attack
# node_idx = 0
#
# # Model prediction on a node before an evasion attack on it
# gnn_model_manager.gnn.eval()
# with torch.no_grad():
# probabilities = torch.exp(gnn_model_manager.gnn(dataset.data.x, dataset.data.edge_index))
#
# predicted_class = probabilities[node_idx].argmax().item()
# predicted_probability = probabilities[node_idx][predicted_class].item()
# real_class = dataset.data.y[node_idx].item()

# info_before_evasion_attack = {"node_idx": node_idx,
# "predicted_class": predicted_class,
# "predicted_probability": predicted_probability,
# "real_class": real_class}

# Attack config


#dataset = gnn_model_manager.evasion_attacker.attack(gnn_model_manager, dataset, None)

# Attack
# gnn_model_manager.evaluate_model(gen_dataset=dataset, metrics=[Metric("F1", mask='test', average='macro')])
#
# acc_test = gnn_model_manager.evaluate_model(gen_dataset=dataset,
# metrics=[Metric("Accuracy", mask='test')])['test']['Accuracy']
# print(f"Accuracy on test after attack: {acc_test}")

# # Model prediction on a node after an evasion attack on it
# with torch.no_grad():
# probabilities = torch.exp(gnn_model_manager.gnn(gnn_model_manager.evasion_attacker.attack_diff.data.x,
# gnn_model_manager.evasion_attacker.attack_diff.data.edge_index))
#
# predicted_class = probabilities[node_idx].argmax().item()
# predicted_probability = probabilities[node_idx][predicted_class].item()
# real_class = dataset.data.y[node_idx].item()
#
# info_after_evasion_attack = {"node_idx": node_idx,
# "predicted_class": predicted_class,
# "predicted_probability": predicted_probability,
# "real_class": real_class}
#
# print(f"info_before_evasion_attack: {info_before_evasion_attack}")
# print(f"info_after_evasion_attack: {info_after_evasion_attack}")


if __name__ == '__main__':
#test_attack_defense()
torch.manual_seed(5000)
test_meta()
#test_meta()
test_qattack()
9 changes: 8 additions & 1 deletion metainfo/evasion_attack_parameters.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
"perturb_structure": ["perturb_structure", "bool", true, {}, "Indicates whether the structure can be changed"],
"direct": ["direct", "bool", true, {}, "Indicates whether to directly modify edges/features of the node attacked or only those of influencers"],
"n_influencers": ["n_influencers", "int", 0, {"min": 0, "step": 1}, "Number of influencing nodes. Will be ignored if direct is True"]
}
},
"QAttack": {
"population_size": ["Population size", "int", 50, {"min": 1, "step": 1}, "Number of genes in population"],
"individual_size": ["Individual size", "int", 30, {"min": 1, "step": 1}, "Number of rewiring operations within one gene"],
"generations" : ["Generations", "int", 50, {"min": 0, "step": 1}, "Number of generations for genetic algorithm"],
"prob_cross": ["Probability for crossover", "float", 0.5, {"min": 0, "max": 1, "step": 0.01}, "Probability of crossover between two genes"],
"prob_mutate": ["Probability for mutation", "float", 0.02, {"min": 0, "max": 1, "step": 0.01}, "Probability of gene mutation"]
}
}

239 changes: 239 additions & 0 deletions src/attacks/QAttack/qattack.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
import copy
import math
import numpy as np
import random

from tqdm import tqdm
from attacks.evasion_attacks import EvasionAttacker
from attacks.QAttack.utils import get_adj_list, from_adj_list, adj_list_oriented_to_non_oriented

class QAttacker(EvasionAttacker):
name = "QAttack"

def __init__(self, population_size, individual_size, generations, prob_cross, prob_mutate, **kwargs):
super().__init__(**kwargs)
self.population_size = population_size
self.individual_size = individual_size
self.generations = generations
self.prob_cross = prob_cross
self.prob_mutate = prob_mutate

def init(self, gen_dataset):
"""
Init first population:
gen_dataset - graph-dataset
population_size - size of population
individual_size - amount of rewiring actions in one gene/individual
"""
self.population = []

self.adj_list = get_adj_list(gen_dataset)

for i in tqdm(range(self.population_size), desc='Init first population:'):
non_isolated_nodes = set(gen_dataset.dataset.edge_index[0].tolist()).union(
set(gen_dataset.dataset.edge_index[1].tolist()))
selected_nodes = np.random.choice(list(self.adj_list.keys()), size=self.individual_size, replace=False)
gene = {}
for n in selected_nodes:
connected_nodes = set(self.adj_list[n])
connected_nodes.add(n)
addition_nodes = non_isolated_nodes.difference(connected_nodes)
gene[n] = {'add': np.random.choice(list(addition_nodes), size=1),
'del': np.random.choice(list(self.adj_list[n]), size=1)}
self.population.append(gene)

def fitness(self, model, gen_dataset):
"""
Calculate fitness function with node classification
"""

fit_scores = []
for i in range(self.population_size):
# Get rewired dataset
dataset = copy.deepcopy(gen_dataset.dataset)
rewiring = self.population[i]
adj_list = get_adj_list(dataset)
for n in rewiring.keys():
adj_list[n] = list(set(adj_list[n]).union({int(rewiring[n]['add'])}).difference({int(rewiring[n]['del'])}))
dataset.edge_index = from_adj_list(adj_list)

# Get labels from black-box
labels = model.gnn.get_answer(dataset.x, dataset.edge_index)
labeled_nodes = {n: labels.tolist()[n-1] for n in adj_list.keys()} # FIXME check order for labels and node id consistency

# Calculate modularity
Q = self.modularity(adj_list, labeled_nodes)
fit_scores.append(1 / math.exp(Q))
return fit_scores

def fitness_individual(self, model, gen_dataset, gene):
dataset = copy.deepcopy(gen_dataset.dataset)
rewiring = gene
adj_list = get_adj_list(dataset)
for n in rewiring.keys():
adj_list[n] = list(set(adj_list[n]).union(set(rewiring[n]['add'])).difference(set(rewiring[n]['del'])))
dataset.edge_index = from_adj_list(adj_list)

# Get labels from black-box
labels = model.gnn.get_answer(dataset.x, dataset.edge_index)
labeled_nodes = {n: labels.tolist()[n-1] for n in adj_list.keys()} # FIXME check order for labels and node id consistency

# Calculate modularity
Q = self.modularity(adj_list, labeled_nodes)
return 1 / math.exp(Q)

@staticmethod
def modularity(adj_list, labeled_nodes):
"""
Calculation of graph modularity with specified node partition on communities
"""
# TODO implement oriented-modularity

inc = dict([])
deg = dict([])

links = 0
non_oriented_adj_list = adj_list_oriented_to_non_oriented(adj_list)
for k, v in non_oriented_adj_list.items():
links += len(v)
if links == 0:
raise ValueError("A graph without link has an undefined modularity")
links //= 2

for node, edges in non_oriented_adj_list.items():
com = labeled_nodes[node]
deg[com] = deg.get(com, 0.) + len(non_oriented_adj_list[node])
for neighbor in edges:
edge_weight = 1 # TODO weighted graph to be implemented
if labeled_nodes[neighbor] == com:
if neighbor == node:
inc[com] = inc.get(com, 0.) + float(edge_weight)
else:
inc[com] = inc.get(com, 0.) + float(edge_weight) / 2.

res = 0.
for com in set(labeled_nodes.values()):
res += (inc.get(com, 0.) / links) - \
(deg.get(com, 0.) / (2. * links)) ** 2
return res

def selection(self, model_manager, gen_dataset):
fit_scores = self.fitness(model_manager, gen_dataset)
probs = [i / sum(fit_scores) for i in fit_scores]
selected_population = copy.deepcopy(self.population)
for i in range(self.population_size):
selected_population[i] = copy.deepcopy(self.population[np.random.choice(
self.population_size, 1, False, probs)[0]])
self.population = selected_population

def crossover(self):
for i in range(0, self.population_size // 2, 2):
parent_1 = self.population[i]
parent_2 = self.population[i + 1]
crossover_prob = np.random.random()
if crossover_prob <= self.prob_cross:
self.population[i * 2], self.population[i * 2 + 1] = self.gene_crossover(parent_1, parent_2)
else:
self.population[i * 2], self.population[i * 2 + 1] = (copy.deepcopy(self.population[i * 2]),
copy.deepcopy(self.population[i * 2 + 1]))

def gene_crossover(self, parent_1, parent_2):
parent_1_set = set(parent_1.keys())
parent_2_set = set(parent_2.keys())

parent_1_unique = parent_1_set.difference(parent_2_set)
parent_2_unique = parent_2_set.difference(parent_1_set)

parent_1_cross = list(parent_1_unique)
parent_2_cross = list(parent_2_unique)

assert len(parent_1_cross) == len(parent_2_cross)
if len(parent_1_cross) == 0:
return parent_1, parent_2
n = np.random.randint(1, len(parent_1_cross) + 1)
parent_1_cross = random.sample(parent_1_cross, n)
parent_2_cross = random.sample(parent_2_cross, n)

parent_1_set.difference_update(parent_1_cross)
parent_2_set.difference_update(parent_2_cross)

parent_1_set.update(parent_2_cross)
parent_2_set.update(parent_1_cross)

child_1 = {}
child_2 = {}
for n in parent_1_set:
if n in parent_1.keys():
child_1[n] = parent_1[n]
else:
child_1[n] = parent_2[n]
for n in parent_2_set:
if n in parent_2.keys():
child_2[n] = parent_2[n]
else:
child_2[n] = parent_1[n]

return child_1,child_2

def mutation(self, gen_dataset):
for i in range(self.population_size):
keys = self.population[i].keys()
for n in list(keys):
mutation_prob = np.random.random()
if mutation_prob <= self.prob_mutate:
mut_type = np.random.randint(3)
dataset = copy.deepcopy(gen_dataset.dataset)
rewiring = self.population[i]
adj_list = get_adj_list(dataset)
for n in rewiring.keys():
adj_list[n] = list(
set(adj_list[n]).union(set([int(rewiring[n]['add'])])).difference(set([int(rewiring[n]['del'])])))
dataset.edge_index = from_adj_list(adj_list)
non_isolated_nodes = set(gen_dataset.dataset.edge_index[0].tolist()).union(
set(gen_dataset.dataset.edge_index[1].tolist()))
if mut_type == 0:
# add mutation
connected_nodes = set(self.adj_list[n])
connected_nodes.add(n)
addition_nodes = non_isolated_nodes.difference(connected_nodes)
self.population[i][n]['add'] = np.random.choice(list(addition_nodes), 1)
elif mut_type == 1:
# del mutation
self.population[i][n]['del'] = np.random.choice(list(adj_list[n]), 1)
else:
selected_nodes = set(self.population[i].keys())
non_selected_nodes = non_isolated_nodes.difference(selected_nodes)
new_node = np.random.choice(list(non_selected_nodes), size=1, replace=False)[0]
self.population[i].pop(n)
addition_nodes = non_isolated_nodes.difference(set(self.adj_list[new_node]))
self.population[i][new_node] = {}
self.population[i][new_node]['add'] = np.random.choice(list(addition_nodes), 1)
self.population[i][new_node]['del'] = np.random.choice(list(adj_list[new_node]), 1)

def elitism(self, model, gen_dataset):
fit_scores = list(enumerate(self.fitness(model, gen_dataset)))
fit_scores = sorted(fit_scores, key=lambda x: x[1])
sort_order = [x[0] for x in fit_scores]
self.population = [self.population[i] for i in sort_order]
elitism_size = int(0.1 * self.population_size)
self.population[:elitism_size] = self.population[-elitism_size:]
return self.population[-1]


def attack(self, model_manager, gen_dataset, mask_tensor):
self.init(gen_dataset)

for i in tqdm(range(self.generations), desc='Attack iterations:', position=0, leave=True):
self.selection(model_manager, gen_dataset)
self.crossover()
self.mutation(gen_dataset)
best_offspring = self.elitism(model_manager, gen_dataset)

rewiring = best_offspring
adj_list = get_adj_list(gen_dataset)
for n in rewiring.keys():
adj_list[n] = list(
set(adj_list[n]).union(set([int(rewiring[n]['add'])])).difference(set([int(rewiring[n]['del'])])))

gen_dataset.dataset.data.edge_index = from_adj_list(adj_list)
return gen_dataset
Loading

0 comments on commit afa3b24

Please sign in to comment.