Skip to content

Commit

Permalink
Merge pull request #2095 from emilymassahud/calculate-min-dist-betwee…
Browse files Browse the repository at this point in the history
…n-control-points

Calculate min dist between control points in RingExtraction
  • Loading branch information
kif authored Mar 25, 2024
2 parents 2c3084a + 3e8b773 commit ddfa542
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 166 deletions.
167 changes: 88 additions & 79 deletions src/pyFAI/ring_extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@


import logging
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Optional

import numpy as np
import numpy
from silx.image import marchingsquares

from .control_points import ControlPoints
Expand All @@ -58,11 +59,7 @@
class RingExtraction:
"""Class to perform extraction of control points from a calibration image."""

def __init__(
self,
single_geometry: SingleGeometry,
massif: Optional[Massif] = None,
):
def __init__(self, single_geometry: SingleGeometry, massif: Optional[Massif] = None):
"""
Parameters
----------
Expand Down Expand Up @@ -91,9 +88,7 @@ def __init__(
self.two_theta_values = self._get_unique_two_theta_values_in_image()

def extract_control_points(
self,
max_number_of_rings: Optional[int] = None,
points_per_degree: float = 1,
self, max_number_of_rings: Optional[int] = None, points_per_degree: float = 1
) -> ControlPoints:
"""
Primary method of RingExtraction class. Runs extract_control_points_in_one_ring for all
Expand All @@ -120,13 +115,9 @@ def extract_control_points(

tasks = {}
with ThreadPoolExecutor() as executor:
for ring_index in range(
min(max_number_of_rings, self.two_theta_values.size)
):
for ring_index in range(min(max_number_of_rings, self.two_theta_values.size)):
future = executor.submit(
self.extract_list_of_peaks_in_one_ring,
ring_index,
points_per_degree,
self.extract_list_of_peaks_in_one_ring, ring_index, points_per_degree
)
tasks[future] = ring_index

Expand All @@ -139,9 +130,7 @@ def extract_control_points(
return control_points

def extract_list_of_peaks_in_one_ring(
self,
ring_index: int,
points_per_degree: float = 1.0
self, ring_index: int, points_per_degree: float = 1.0
) -> Optional[list[tuple[float, float]]]:
"""
Using massif.peaks_from_area, get all pixel coordinates inside a mask of pixels around a
Expand All @@ -158,48 +147,39 @@ def extract_list_of_peaks_in_one_ring(
Returns
-------
Optional[List[Tuple[float]]]
Optional[list[tuple[float, float]]]
peaks at given ring index
"""
marching_squares_algorithm = marchingsquares.MarchingSquaresMergeImpl(
self.two_theta_array,
mask=self.detector.mask,
use_minmax_cache=True,
self.two_theta_array, mask=self.detector.mask, use_minmax_cache=True
)

initial_mask = self._create_mask_around_ring(ring_index)

mask_size = initial_mask.sum(dtype=int)
if mask_size > 0:
final_mask, upper_limit = self._remove_low_intensity_pixels_from_mask(
initial_mask
)
final_mask, upper_limit = self._remove_low_intensity_pixels_from_mask(initial_mask)

pixels_at_two_theta_level = marching_squares_algorithm.find_pixels(
self.two_theta_values[ring_index]
)
seeds = set(
(i[0], i[1])
for i in pixels_at_two_theta_level
if final_mask[i[0], i[1]]
)
seeds = set((i[0], i[1]) for i in pixels_at_two_theta_level if final_mask[i[0], i[1]])

num_points_to_keep = self._calculate_num_of_points_to_keep(
pixels_at_two_theta_level,
points_per_degree,
pixels_at_two_theta_level, points_per_degree
)
if num_points_to_keep > 0:
min_distance_between_control_points = (
len(seeds) / 2.0 / num_points_to_keep
self._calculate_min_distance_between_control_points(
pixels_at_two_theta_level.tolist(), points_per_degree
)
)
# original code has a comment here which seems outdates, but I also didn't
# understand where this calculation comes from, so I just left it as is

logger.info(
"Extracting datapoints for ring %s (2theta = %.2f deg); searching for %i pts"
" out of %i with I>%.1f, dmin=%.1f",
ring_index,
np.degrees(self.two_theta_values[ring_index]),
numpy.degrees(self.two_theta_values[ring_index]),
num_points_to_keep,
final_mask.sum(dtype=int),
upper_limit,
Expand All @@ -216,29 +196,25 @@ def extract_list_of_peaks_in_one_ring(
return None
return None

def _get_unique_two_theta_values_in_image(self) -> np.ndarray:
def _get_unique_two_theta_values_in_image(self) -> numpy.ndarray:
"""
Calculates all two theta values covered by the image with the current detector and geometry
Returns
-------
np.ndarray
numpy.ndarray
array containing all two theta values for calibrant at present wavelength
"""
two_theta_values = np.unique(
np.array([i for i in self.calibrant.get_2th() if i is not None])
two_theta_values = numpy.unique(
numpy.array([i for i in self.calibrant.get_2th() if i is not None])
)
largest_two_theta_in_image = self.two_theta_array.max()
two_theta_values_in_image = np.array(
[
two_theta
for two_theta in two_theta_values
if two_theta <= largest_two_theta_in_image
]
two_theta_values_in_image = numpy.array(
[two_theta for two_theta in two_theta_values if two_theta <= largest_two_theta_in_image]
)
return two_theta_values_in_image

def _create_mask_around_ring(self, ring_index: int) -> np.ndarray:
def _create_mask_around_ring(self, ring_index: int) -> numpy.ndarray:
"""
Creates a mask of valid pixels around each ring, of thickness equal to 1/2 the distance
between the centre of two adjacent rings.
Expand All @@ -251,18 +227,14 @@ def _create_mask_around_ring(self, ring_index: int) -> np.ndarray:
Returns
-------
np.ndarray
numpy.ndarray
Mask of valid pixels around each ring
"""
two_theta_min_max = self._get_two_theta_min_max(ring_index)
two_theta_min, two_theta_max = (
two_theta_min_max["min"],
two_theta_min_max["max"],
)
two_theta_min, two_theta_max = (two_theta_min_max["min"], two_theta_min_max["max"])

initial_mask = np.logical_and(
self.two_theta_array >= two_theta_min,
self.two_theta_array < two_theta_max,
initial_mask = numpy.logical_and(
self.two_theta_array >= two_theta_min, self.two_theta_array < two_theta_max
)
if self.detector.mask is not None:
detector_mask_bool = self.detector.mask.astype(bool)
Expand All @@ -282,15 +254,14 @@ def _get_two_theta_min_max(self, ring_index: int) -> dict[str, float]:
ring number
Returns
-------
dict[str, np.ndarray]
dict[str, float]
dictionary containing minimum and maximum values for two theta
"""
if ring_index == 0:
delta_two_theta = (self.two_theta_values[1] - self.two_theta_values[0]) / 4
else:
delta_two_theta = (
self.two_theta_values[ring_index]
- self.two_theta_values[ring_index - 1]
self.two_theta_values[ring_index] - self.two_theta_values[ring_index - 1]
) / 4

two_theta_min = self.two_theta_values[ring_index] - delta_two_theta
Expand All @@ -299,46 +270,44 @@ def _get_two_theta_min_max(self, ring_index: int) -> dict[str, float]:
return {"min": two_theta_min, "max": two_theta_max}

def _remove_low_intensity_pixels_from_mask(
self, mask: np.ndarray
) -> tuple[np.ndarray, float]:
self, mask: numpy.ndarray
) -> tuple[numpy.ndarray, float]:
"""
Creates a final mask of valid pixels to be used in peak extraction, by removing low
intensity pixels from the initial mask
Parameters
----------
mask : np.ndarray
mask : numpy.ndarray
mask of valid pixels
Returns
-------
tuple[np.ndarray, float]
tuple[numpy.ndarray, float]
final mask of valid pixels, upper limit of intensity to mask
"""
mean, std = self._calculate_mean_and_std_of_intensities_in_mask(mask)
upper_limit = mean + std
high_pixel_intensity_coords = self.image > upper_limit
final_mask = np.logical_and(high_pixel_intensity_coords, mask)
final_mask = numpy.logical_and(high_pixel_intensity_coords, mask)
final_mask_size = final_mask.sum(dtype=int)
minimum_mask_size = (
1000 # copied this from original method, don't know why this number
)
minimum_mask_size = 1000 # copied this from original method, don't know why this number
if final_mask_size < minimum_mask_size:
upper_limit = mean
final_mask = np.logical_and(self.image > upper_limit, mask)
final_mask = numpy.logical_and(self.image > upper_limit, mask)
final_mask_size = final_mask.sum()
return final_mask, upper_limit

def _calculate_mean_and_std_of_intensities_in_mask(
self, mask: np.ndarray
self, mask: numpy.ndarray
) -> tuple[float, float]:
"""
Calculates mean and standard deviation of pixel intensities of image which are emcompassed
by mask.
Parameters
----------
mask : np.ndarray
mask : numpy.ndarray
mask of valid pixels
Returns
Expand All @@ -348,23 +317,21 @@ def _calculate_mean_and_std_of_intensities_in_mask(
"""
flattened_mask = mask.flatten()
flattened_image = self.image.flatten()
pixel_intensities_in_mask = flattened_image[np.where(flattened_mask)]
pixel_intensities_in_mask = flattened_image[numpy.where(flattened_mask)]
mean = pixel_intensities_in_mask.mean()
std = pixel_intensities_in_mask.std()

return mean, std

def _calculate_num_of_points_to_keep(
self,
pixels_at_two_theta_level: np.ndarray,
points_per_degree: float,
self, pixels_at_two_theta_level: numpy.ndarray, points_per_degree: float
) -> int:
"""
Calculate number of azimuthal degrees in ring, then multiply by self.points_per_degree
Parameters
----------
pixels_at_two_theta_level : np.ndarray
pixels_at_two_theta_level : numpy.ndarray
Array of pixels in the image located in the ring at the current two theta value
points_per_degree : float
number of control points per azimuthal degree (increase for better precision)
Expand All @@ -375,14 +342,56 @@ def _calculate_num_of_points_to_keep(
Number of points to keep as control points
"""
image_shape = self.image.shape
azimuthal_angles_array = self.single_geometry.geometry_refinement.chiArray(
image_shape
)
azimuthal_angles_array = self.single_geometry.geometry_refinement.chiArray(image_shape)
azimuthal_degrees_array_in_ring = azimuthal_angles_array[
pixels_at_two_theta_level[:, 0].clip(0, image_shape[0]),
pixels_at_two_theta_level[:, 1].clip(0, image_shape[1]),
]
number_of_azimuthal_degrees = np.unique(
np.rad2deg(azimuthal_degrees_array_in_ring).round()
number_of_azimuthal_degrees = numpy.unique(
numpy.rad2deg(azimuthal_degrees_array_in_ring).round()
).size
return int(number_of_azimuthal_degrees * points_per_degree)

def _calculate_min_distance_between_control_points(
self, pixel_list_at_two_theta_level: list[list[int]], points_per_degree: float
) -> float:
"""
Get a random value from the ring and subtract from beam_centre_coords to calculate ring
radius; grabbing any value from pixel_list_at_two_theta_level will lead to the same radius.
then calculate the distance between 2 points separated by 1 degree in ring circumference:
ring_radius * 0.0174533 (equals a degree in radians)
only good for shapes which are close to circular.
Parameters
----------
pixel_list_at_two_theta_level : list[list[int]]
List of pixels in the image located in the ring at the current two theta value
Returns
-------
float
Minimum accepted distance between 2 control points
"""

# TODO implementation for elliptical rings needs semi-minor axis to ensure the min value for
# arc, and ellipse centre, which differs from beam centre in elliptical projections
beam_centre_coords = self._get_beam_centre_coords()
random_point_in_ring = numpy.array(random.sample(pixel_list_at_two_theta_level, 1)[0])
ring_radius_px = numpy.sqrt(sum(x**2 for x in (random_point_in_ring - beam_centre_coords)))
one_degree = 1
ring_dist_in_one_deg = ring_radius_px * numpy.radians(one_degree)
return ring_dist_in_one_deg / points_per_degree

def _get_beam_centre_coords(self) -> numpy.ndarray:
"""
Use AzimuthalIntegrator's method `getFit2D` to get the pixel coordinates of the beam centre
Returns
-------
numpy.ndarray
beam centre coordinates [y,x], to match pyFAI order.
"""
fit_2d = self.single_geometry.get_ai().getFit2D()
beam_centre_x = fit_2d["centerX"]
beam_centre_y = fit_2d["centerY"]
return numpy.array([beam_centre_y, beam_centre_x])
Loading

0 comments on commit ddfa542

Please sign in to comment.