assign_SOM_wide_unsheared_mpi.py
import os
import sys
import numpy as np
import h5py
import yaml
from sompz import NoiseSOM as ns
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
nprocs = comm.Get_size()
if len(sys.argv) == 1:
    cfgfile = 'y3_sompz.cfg'
else:
    cfgfile = sys.argv[1]
with open(cfgfile, 'r') as fp:
    cfg = yaml.safe_load(fp)
# Some hardcoded variables that need a bit more thought
som_type = 'wide'
shear = 'unsheared'
# Read variables from config file
som_wide = cfg['som_wide']
som_dim = cfg['wide_som_dim']
output_path = cfg['out_dir']
wide_file = cfg['wide_file']
wide_h5_path = cfg['wide_h5_path']
bands = cfg['wide_bands']
bands_label = cfg['wide_bands_label']
bands_err_label = cfg['wide_bands_err_label']
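# For reference, a hypothetical y3_sompz.cfg providing the keys read above.
# All values below are placeholders chosen to be internally consistent with
# how the dataset paths are assembled later (wide_h5_path + shear +
# bands_label + band); they are not values shipped with this repository:
#   som_wide: som_wide_32_32_1e7.npy
#   wide_som_dim: 32
#   out_dir: /path/to/output
#   wide_file: /path/to/wide_catalog.h5
#   wide_h5_path: catalog/metacal/
#   wide_bands: [r, i, z]
#   wide_bands_label: /flux_
#   wide_bands_err_label: /flux_err_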
# Load data
if rank == 0:
    # Root rank reads the catalogue and pre-splits each band into nprocs chunks.
    with h5py.File(wide_file, 'r') as f:
        fluxes = {}
        flux_errors = {}
        if cfgfile == 'y3_sompz.cfg':
            # Default Y3 config: apply the metacal selection index.
            ind_mcal = f['index']['select']
            for i, band in enumerate(bands):
                print(i, band)
                fluxes[band] = np.array_split(
                    f[wide_h5_path + shear + bands_label + band][...][ind_mcal],
                    nprocs
                )
                flux_errors[band] = np.array_split(
                    f[wide_h5_path + shear + bands_err_label + band][...][ind_mcal],
                    nprocs
                )
        else:
            for i, band in enumerate(bands):
                print(i, band)
                fluxes[band] = np.array_split(
                    f[wide_h5_path + shear + bands_label + band][...],
                    nprocs
                )
                flux_errors[band] = np.array_split(
                    f[wide_h5_path + shear + bands_err_label + band][...],
                    nprocs
                )
    os.system(f'mkdir -p {output_path}/{som_type}_{shear}')
else:
    # Non-root ranks receive their chunks via comm.scatter below.
    fluxes = {b: None for b in bands}
    flux_errors = {b: None for b in bands}
# scatter data
for i, band in enumerate(bands):
    fluxes[band] = comm.scatter(fluxes[band], root=0)
    flux_errors[band] = comm.scatter(flux_errors[band], root=0)
# prepare big data matrix
fluxes_d = np.zeros((fluxes[bands[0]].size, len(bands)))
fluxerrs_d = np.zeros((flux_errors[bands[0]].size, len(bands)))
for i, band in enumerate(bands):
    fluxes_d[:, i] = fluxes[band]
    fluxerrs_d[:, i] = flux_errors[band]
nTrain = fluxes_d.shape[0]
# Instead of training a SOM here, load the weights of the previously trained wide SOM:
som_weights = np.load(f'{output_path}/{som_wide}', allow_pickle=True)
hh = ns.hFunc(nTrain, sigma=(30, 1))
metric = ns.AsinhMetric(lnScaleSigma=0.4, lnScaleStep=0.03)
som = ns.NoiseSOM(metric, None, None,
                  learning=hh,
                  shape=(som_dim, som_dim),
                  wrap=False, logF=True,
                  initialize=som_weights,
                  minError=0.02)
nsubsets = 10
inds = np.array_split(np.arange(len(fluxes_d)), nsubsets)
# Run the SOM classifier on one subset, unless its output file already exists.
def assign_som(ind):
    print(f'Running rank {rank}, index {ind}')
    filename = f'{output_path}/{som_type}_{shear}/som_{som_type}_32_32_1e7_assign_{shear}_{rank}_subsample_{ind}.npz'
    if not os.path.exists(filename):
        print('Running')
        cells_test, _ = som.classify(fluxes_d[inds[ind]], fluxerrs_d[inds[ind]])
        np.savez(filename, cells=cells_test)
    else:
        print('File already exists')
for index in range(nsubsets):
    assign_som(index)
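
The script is meant to be launched under MPI, e.g. with something like `mpirun -n 64 python assign_SOM_wide_unsheared_mpi.py y3_sompz.cfg` (the rank count is up to the user). Each rank writes its cell assignments to per-subset .npz files, so a post-processing step is needed to recover a single assignment array. Below is a minimal sketch of that step, not part of the original script; it assumes the run used 64 ranks, the hardcoded nsubsets = 10, and the file-name pattern used above, and the final .npy name is only illustrative.

import numpy as np
import yaml

with open('y3_sompz.cfg', 'r') as fp:
    cfg = yaml.safe_load(fp)

output_path = cfg['out_dir']
som_type, shear = 'wide', 'unsheared'
nprocs = 64    # assumption: number of MPI ranks used for the assignment run
nsubsets = 10  # matches the value hardcoded in assign_SOM_wide_unsheared_mpi.py

# np.array_split preserves order, so concatenating rank-major and then
# subset-major restores the original (post-selection) catalogue order.
cells = []
for rank in range(nprocs):
    for ind in range(nsubsets):
        fname = (f'{output_path}/{som_type}_{shear}/'
                 f'som_{som_type}_32_32_1e7_assign_{shear}_{rank}_subsample_{ind}.npz')
        with np.load(fname) as data:
            cells.append(data['cells'])
cells = np.concatenate(cells)
np.save(f'{output_path}/som_{som_type}_32_32_1e7_assign_{shear}.npy', cells)  # illustrative output name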