# eval.py
from argparse import ArgumentParser
import json
import numpy as np
import pandas as pd
from pathlib import Path
import pickle
from PIL import Image
from pycocotools import mask
import torch.nn.functional as F
from tqdm import tqdm
from eval_constants import LOCALIZATION_TASKS
from utils import CPU_Unpickler
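
# Example invocation (a sketch; the json file names below are placeholders --
# adjust them to your data layout):
#   python eval.py --metric iou --gt_path gt_segmentations.json \
#       --pred_path pred_segmentations.json --save_dir results/ --true_pos_only True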


def calculate_iou(pred_mask, gt_mask, true_pos_only):
    """
    Calculate IoU score between two segmentation masks.

    Args:
        pred_mask (np.array): binary predicted segmentation mask
        gt_mask (np.array): binary ground-truth segmentation mask
        true_pos_only (bool): if true, return NaN when either mask is empty;
            if false, return NaN only when both masks are empty

    Returns:
        iou_score (np.float64)
    """
intersection = np.logical_and(pred_mask, gt_mask)
union = np.logical_or(pred_mask, gt_mask)
if true_pos_only:
if np.sum(pred_mask) == 0 or np.sum(gt_mask) == 0:
iou_score = np.nan
else:
iou_score = np.sum(intersection) / (np.sum(union))
else:
if np.sum(union) == 0:
iou_score = np.nan
else:
iou_score = np.sum(intersection) / (np.sum(union))
return iou_score
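
# For example, two 2x2 masks that overlap in one of three foreground pixels
# give an IoU of 1/3:
#   calculate_iou(np.array([[1, 1], [0, 0]]), np.array([[0, 1], [1, 0]]), True)  # ~0.333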


def get_ious(gt_path, pred_path, true_pos_only):
"""
Returns IoU scores for each combination of CXR and pathology in gt_path and pred_path.
Args:
gt_path (str): path to ground-truth segmentation json file (encoded)
pred_path (str): path to predicted segmentation json file (encoded)
true_pos_only (bool): if true, run evaluation only on the true positive
slice of the dataset (CXRs that contain predicted
and ground-truth segmentations); if false, also
include CXRs with a predicted segmentation but
without a ground-truth segmentation, and include
CXRs with a ground-truth segmentation but without
a predicted segmentation.
Returns:
ious (dict): dict with 10 keys, one for each pathology (task). Values
are lists of all CXR IoU scores for the pathology key.
cxr_ids (list): list of all CXR ids (e.g. 'patient64541_study1_view1_frontal').
"""
with open(gt_path) as f:
gt_dict = json.load(f)
with open(pred_path) as f:
pred_dict = json.load(f)
ious = {}
tasks = sorted(LOCALIZATION_TASKS)
for task in tasks:
cxr_ids = sorted(gt_dict.keys())
print(f'Evaluating {task}')
ious[task] = []
for cxr_id in cxr_ids:
# get ground-truth segmentation mask
gt_item = gt_dict[cxr_id][task]
gt_mask = mask.decode(gt_item)
# get predicted segmentation mask
if cxr_id not in pred_dict:
pred_mask = np.zeros(gt_item['size'])
else:
pred_item = pred_dict[cxr_id][task]
pred_mask = mask.decode(pred_item)
assert gt_mask.shape == pred_mask.shape
iou_score = calculate_iou(pred_mask, gt_mask, true_pos_only)
ious[task].append(iou_score)
# if true_pos_only is false, include cxrs that do not have ground-truth
# segmentations but that have predicted segmentations
if not true_pos_only:
for cxr_id in sorted(pred_dict.keys()):
if cxr_id not in gt_dict:
pred_item = pred_dict[cxr_id][task]
pred_mask = mask.decode(pred_item)
gt_mask = np.zeros(pred_item['size'])
assert gt_mask.shape == pred_mask.shape
iou_score = calculate_iou(pred_mask, gt_mask, true_pos_only)
ious[task].append(iou_score)
cxr_ids.append(cxr_id)
else:
assert len(ious[task]) == len(gt_dict.keys())
return ious, cxr_ids
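
# Example (hypothetical paths), computing mIoU per pathology over the true
# positive slice:
#   ious, cxr_ids = get_ious('gt_segmentations.json', 'pred_segmentations.json', True)
#   miou_per_task = {task: np.nanmean(scores) for task, scores in ious.items()}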


def bootstrap_metric(df, num_replicates):
"""Create dataframe of bootstrap samples."""
def single_replicate_performances():
sample_ids = np.random.choice(len(df), size=len(df), replace=True)
replicate_performances = {}
df_replicate = df.iloc[sample_ids]
for task in df[LOCALIZATION_TASKS].columns:
performance = df_replicate[task].mean()
replicate_performances[task] = performance
return replicate_performances
all_performances = []
for _ in range(num_replicates):
replicate_performances = single_replicate_performances()
all_performances.append(replicate_performances)
df_performances = pd.DataFrame.from_records(all_performances)
return df_performances


def compute_cis(series, confidence_level):
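    """
    Compute a bootstrap confidence interval from a series of replicate
    performances. `confidence_level` is used as the significance level:
    0.05 yields a 95% interval from the 2.5th and 97.5th percentiles.
    """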
sorted_perfs = series.sort_values()
lower_index = int(confidence_level/2 * len(sorted_perfs)) - 1
upper_index = int((1 - confidence_level/2) * len(sorted_perfs)) - 1
lower = sorted_perfs.iloc[lower_index].round(3)
upper = sorted_perfs.iloc[upper_index].round(3)
mean = round(sorted_perfs.mean(),3)
return lower, mean, upper
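
# For example, on 1000 bootstrap replicates taking the values 0..999,
# compute_cis(pd.Series(range(1000)), 0.05) returns roughly (24, 499.5, 974),
# i.e. the 2.5th percentile, the mean, and the 97.5th percentile.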


def create_ci_record(perfs, task):
lower, mean, upper = compute_cis(perfs, confidence_level = 0.05)
record = {"name": task,
"lower": lower,
"mean": mean,
"upper": upper}
return record


def get_map(pkl_path):
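    """
    Load a pickled saliency map and upsample it to the original CXR size.

    The pickle appears to hold a dict with a 4D torch tensor under 'map' and
    the original image dimensions under 'cxr_dims' (stored as (width, height),
    hence the reversed order passed to F.interpolate, which expects (H, W)).
    """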
info = CPU_Unpickler(open(pkl_path, 'rb')).load()
saliency_map = info['map']
img_dims = info['cxr_dims']
map_resized = F.interpolate(saliency_map, size=(img_dims[1],img_dims[0]),
mode='bilinear', align_corners=False)
saliency_map = map_resized.squeeze().squeeze().detach().cpu().numpy()
return saliency_map


def get_hitrates(gt_path, pred_path):
    """
    Calculate hit/miss for each CXR and pathology: a hit (1) means the most
    salient point of the heat map falls inside the ground-truth segmentation,
    a miss (0) means it falls outside, and NaN means the ground-truth mask for
    that pathology is empty.

    Args:
        gt_path (str): path to ground-truth segmentation json file (encoded)
        pred_path (str): directory with pickle files containing heat maps
    """
with open(gt_path) as f:
gt_dict = json.load(f)
all_paths = sorted(list(Path(pred_path).rglob("*_map.pkl")))
results = {}
for pkl_path in tqdm(all_paths):
# break down path to image name and task
path = str(pkl_path).split('/')
task = path[-1].split('_')[-2]
img_id = '_'.join(path[-1].split('_')[:-2])
if task not in LOCALIZATION_TASKS:
print(f"Invalid task {task}")
continue
if img_id in results:
if task in results[img_id]:
print(f'Check for duplicates for {task} for {img_id}')
break
else:
results[img_id][task] = 0
else:
# get ground truth binary mask
if img_id not in gt_dict:
continue
else:
results[img_id] = {}
results[img_id][task] = 0
gt_item = gt_dict[img_id][task]
gt_mask = mask.decode(gt_item)
# get saliency heatmap
sal_map = get_map(pkl_path)
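        # hit criterion: the single most salient pixel must fall inside the
        # ground-truth mask; CXRs with an empty ground-truth mask get NaN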
        x, y = np.unravel_index(np.argmax(sal_map, axis=None), sal_map.shape)
assert (gt_mask.shape == sal_map.shape)
if (gt_mask[x][y]==1):
results[img_id][task] = 1
elif (np.sum(gt_mask)==0):
results[img_id][task] = np.nan
all_ids = sorted(gt_dict.keys())
results_df = pd.DataFrame.from_dict(results, orient='index')
return results_df, all_ids


def get_hb_hitrates(gt_path, pred_path):
    """
    Calculate hit/miss for each CXR and pathology using the human benchmark
    annotations: a hit (1) means an annotated salient point falls inside the
    ground-truth segmentation, and NaN means the CXR has no ground-truth
    segmentation or no human annotation for that pathology.

    Args:
        gt_path (str): path to ground-truth segmentation json file (encoded)
        pred_path (str): path to json file with human annotations for the most
            representative points
    """
with open(pred_path) as f:
hb_salient_pts = json.load(f)
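    # The annotation json appears to map image ids to tasks to a list of
    # [x, y] points, e.g. {"<cxr_id>": {"Cardiomegaly": [[x1, y1], ...]}}.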
with open(gt_path) as f:
gt_dict = json.load(f)
# evaluate hit
results = {}
all_ids = sorted(gt_dict.keys())
for task in sorted(LOCALIZATION_TASKS):
print(f'Evaluating {task}')
results[task] = []
for img_id in all_ids:
hit = np.nan
gt_item = gt_dict[img_id][task]
gt_mask = mask.decode(gt_item)
if np.sum(gt_mask) !=0:
if img_id in hb_salient_pts and task in hb_salient_pts[img_id]:
salient_pts = hb_salient_pts[img_id][task]
                    hit = 0
                    for pt in salient_pts:
                        # hit if any annotated point falls inside the gt mask;
                        # points are stored as (x, y), masks indexed as [row, col]
                        if gt_mask[int(pt[1]), int(pt[0])]:
                            hit = 1
                            break
results[task].append(hit)
results['cxr_id'] = all_ids
results_df = pd.DataFrame.from_dict(results)
results_df = results_df.set_index('cxr_id')
return results_df, all_ids


def evaluate(gt_path, pred_path, save_dir, metric, true_pos_only, if_human_benchmark):
    """
    Generates and saves three csv files, where `{metric}` is `iou` or `hitmiss`
    and a `humanbenchmark_` token is inserted after the metric name when
    `if_human_benchmark` is true:
        -- `{metric}_results_per_cxr.csv`: IoU or hit/miss results for each
           CXR and each pathology.
        -- `{metric}_bootstrap_results_per_cxr.csv`: 1000 bootstrap samples of
           IoU or hit/miss for each pathology.
        -- `{metric}_summary_results.csv`: mIoU or hit rate 95% bootstrap
           confidence intervals for each pathology.
    """
# create save_dir if it does not already exist
Path(save_dir).mkdir(exist_ok=True, parents=True)
if metric == 'iou':
ious, cxr_ids = get_ious(gt_path, pred_path, true_pos_only)
metric_df = pd.DataFrame.from_dict(ious)
elif metric == 'hitmiss' and if_human_benchmark == False:
metric_df, cxr_ids = get_hitrates(gt_path, pred_path)
elif metric == 'hitmiss' and if_human_benchmark == True:
metric_df, cxr_ids = get_hb_hitrates(gt_path, pred_path)
else:
raise ValueError('`metric` must be either `iou` or `hitmiss`')
hb = 'humanbenchmark_' if if_human_benchmark else ''
metric_df['img_id'] = cxr_ids
metric_df = metric_df.sort_values(by='img_id')
metric_df.to_csv(f'{save_dir}/{metric}_{hb}results_per_cxr.csv', index=False)
bs_df = bootstrap_metric(metric_df, 1000)
bs_df.to_csv(f'{save_dir}/{metric}_{hb}bootstrap_results_per_cxr.csv', index=False)
# get confidence intervals
records = []
for task in bs_df.columns:
records.append(create_ci_record(bs_df[task], task))
summary_df = pd.DataFrame.from_records(records).sort_values(by='name')
print(summary_df)
summary_df.to_csv(f'{save_dir}/{metric}_{hb}summary_results.csv', index=False)


if __name__ == '__main__':
parser = ArgumentParser()
parser.add_argument('--metric', type=str,
help='options are: iou or hitmiss')
    parser.add_argument('--gt_path', type=str,
                        help='path to json file where ground-truth \
                              segmentations are saved (encoded)')
parser.add_argument('--pred_path', type=str,
help='json path where predicted segmentations are saved \
(if metric = iou) or directory with pickle files \
containing heat maps (if metric = hitmiss and \
if_human_benchmark = false) or json path with \
human annotations for most representative points \
(if metric = hitmiss and if_human_benchmark = \
true)')
parser.add_argument('--true_pos_only', type=str, default='True',
help='if true, run evaluation only on the true positive \
slice of the dataset (CXRs that contain predicted and \
ground-truth segmentations); if false, also include cxrs \
with a predicted segmentation but without a ground-truth \
segmentation, and include cxrs with a ground-truth\
segmentation but without a predicted segmentation.')
parser.add_argument('--save_dir', default='.',
help='where to save evaluation results')
    parser.add_argument('--if_human_benchmark', type=str, default='False',
                        help='if true, script expects human benchmark inputs')
parser.add_argument('--seed', type=int, default=0,
help='random seed to fix')
args = parser.parse_args()
assert args.metric in ['iou', 'hitmiss'], \
"`metric` flag must be either `iou` or `hitmiss`"
    assert args.if_human_benchmark in ['True', 'False'], \
        "`if_human_benchmark` flag must be either `True` or `False`"
    assert args.true_pos_only in ['True', 'False'], \
        "`true_pos_only` flag must be either `True` or `False`"
    np.random.seed(args.seed)

    # parse boolean string flags explicitly instead of calling eval() on them
    evaluate(args.gt_path, args.pred_path, args.save_dir, args.metric,
             args.true_pos_only == 'True', args.if_human_benchmark == 'True')