"""Functions for processing data."""
from __future__ import annotations
import logging
from collections import defaultdict
from pathlib import Path
import numpy as np
import numpy.typing as npt
import pandas as pd
from art import tprint
from topostats import __version__
from topostats.filters import Filters
from topostats.grains import Grains
from topostats.grainstats import GrainStats
from topostats.io import get_out_path, save_topostats_file
from topostats.logs.logs import LOGGER_NAME
from topostats.plotting import plot_crossing_linetrace_halfmax
from topostats.plottingfuncs import Images, add_pixel_to_nm_to_plotting_config
from topostats.statistics import image_statistics
from topostats.tracing.disordered_tracing import trace_image_disordered
from topostats.tracing.nodestats import nodestats_image
from topostats.tracing.ordered_tracing import ordered_tracing_image
from topostats.tracing.splining import splining_image
from topostats.utils import create_empty_dataframe
# pylint: disable=broad-except
# pylint: disable=line-too-long
# pylint: disable=too-many-arguments
# pylint: disable=too-many-branches
# pylint: disable=too-many-lines
# pylint: disable=too-many-locals
# pylint: disable=too-many-positional-arguments
# pylint: disable=too-many-statements
# pylint: disable=too-many-nested-blocks
# pylint: disable=unnecessary-dict-index-lookup

# Module-level logger used by every processing stage in this file; its name is taken from
# ``topostats.logs.logs.LOGGER_NAME``.
LOGGER = logging.getLogger(LOGGER_NAME)
[docs]
def run_filters(
    unprocessed_image: npt.NDArray,
    pixel_to_nm_scaling: float,
    filename: str,
    filter_out_path: Path,
    core_out_path: Path,
    filter_config: dict,
    plotting_config: dict,
) -> npt.NDArray | None:
    """
    Filter and flatten an image. Optionally plots the results, returning the flattened image.

    Parameters
    ----------
    unprocessed_image : npt.NDArray
        Image to be flattened.
    pixel_to_nm_scaling : float
        Scaling factor for converting pixel length scales to nanometres.
    filename : str
        File name for the image.
    filter_out_path : Path
        Output directory for step-by-step flattening plots.
    core_out_path : Path
        General output directory for outputs such as the flattened image.
    filter_config : dict
        Dictionary of configuration for the Filters class to use when initialised. Must contain a ``run`` key; every
        other key is passed to ``Filters`` as a keyword argument. The dictionary itself is not modified.
    plotting_config : dict
        Dictionary of configuration for plotting output images. The ``output_dir`` entry of each plotted item in
        ``plotting_config["plot_dict"]`` is set by this function.

    Returns
    -------
    npt.NDArray | None
        The ``gaussian_filtered`` image produced by ``Filters``, or None if filtering is disabled in the
        configuration.
    """
    if not filter_config["run"]:
        # Filtering is a prerequisite for every later stage, hence this is logged as an error.
        LOGGER.error(
            "You have not included running the initial filter stage. This is required for all subsequent "
            "stages of processing. Please check your configuration file."
        )
        return None

    # Strip "run" into a copy rather than popping it, so the caller's dictionary can be reused for another image.
    filter_kwargs = {key: value for key, value in filter_config.items() if key != "run"}
    LOGGER.debug(f"[{filename}] Image dimensions: {unprocessed_image.shape}")
    LOGGER.info(f"[{filename}] : *** Filtering ***")
    filters = Filters(
        image=unprocessed_image,
        filename=filename,
        pixel_to_nm_scaling=pixel_to_nm_scaling,
        **filter_kwargs,
    )
    filters.filter_image()

    plot_dict = plotting_config["plot_dict"]
    # Optionally plot filter stage. Only the per-plot dictionaries are handed to Images, so the "run" flag is
    # left untouched (no pop/restore that could be skipped by an exception).
    if plotting_config["run"]:
        LOGGER.info(f"[{filename}] : Plotting Filtering Images")
        if plotting_config["image_set"] == "all":
            filter_out_path.mkdir(parents=True, exist_ok=True)
            LOGGER.debug(f"[{filename}] : Target filter directory created : {filter_out_path}")
        # Generate plots
        for plot_name, array in filters.images.items():
            if plot_name == "scan_raw":
                continue
            if plot_name == "extracted_channel":
                array = np.flipud(array.pixels)
            # Core plots live alongside the main outputs, diagnostic ones in the filter directory.
            plot_dict[plot_name]["output_dir"] = core_out_path if plot_dict[plot_name]["core_set"] else filter_out_path
            try:
                Images(array, **plot_dict[plot_name]).plot_and_save()
                Images(array, **plot_dict[plot_name]).plot_histogram_and_save()
            except AttributeError:
                LOGGER.info(f"[{filename}] Unable to generate plot : {plot_name}")

    # Always want the 'z_threshed' plot (aka "Height Thresholded") but in the core_out_path
    plot_dict["z_threshed"]["output_dir"] = core_out_path
    Images(
        filters.images["gaussian_filtered"],
        filename=filename,
        **plot_dict["z_threshed"],
    ).plot_and_save()
    LOGGER.info(f"[{filename}] : Filters stage completed successfully.")
    return filters.images["gaussian_filtered"]
[docs]
def run_grains(  # noqa: C901
    image: npt.NDArray,
    pixel_to_nm_scaling: float,
    filename: str,
    grain_out_path: Path,
    core_out_path: Path,
    plotting_config: dict,
    grains_config: dict,
) -> dict | None:
    """
    Identify grains (molecules) and optionally plots the results.

    Parameters
    ----------
    image : npt.NDArray
        2d numpy array image to find grains in.
    pixel_to_nm_scaling : float
        Scaling factor for converting pixel length scales to nanometres.
    filename : str
        Name of file being processed (used in logging).
    grain_out_path : Path
        Output path for step-by-step grain finding plots.
    core_out_path : Path
        General output directory for outputs such as the flattened image with grain masks overlaid.
    plotting_config : dict
        Dictionary of configuration for plotting images. The ``output_dir`` entry of each plotted item in
        ``plotting_config["plot_dict"]`` is set by this function.
    grains_config : dict
        Dictionary of configuration for the Grains class to use when initialised. Must contain a ``run`` key; every
        other key is passed to ``Grains`` as a keyword argument. The dictionary itself is not modified.

    Returns
    -------
    dict | None
        None if grain finding is disabled or raised an error, otherwise a dictionary keyed by direction (the keys
        of ``Grains.directions``) whose values are the ``labelled_regions_02`` arrays for that direction.
    """
    if not grains_config["run"]:
        LOGGER.info(f"[{filename}] Detection of grains disabled, GrainStats will not be run.")
        return None

    # Strip "run" into a copy rather than popping it, so the caller's dictionary can be reused for another image.
    grains_kwargs = {key: value for key, value in grains_config.items() if key != "run"}
    try:
        LOGGER.info(f"[{filename}] : *** Grain Finding ***")
        grains = Grains(
            image=image,
            filename=filename,
            pixel_to_nm_scaling=pixel_to_nm_scaling,
            **grains_kwargs,
        )
        grains.find_grains()
        for direction, region_props in grains.region_properties.items():
            LOGGER.info(f"[{filename}] : Grains found for direction {direction} : {len(region_props)}")
            if len(region_props) == 0:
                LOGGER.warning(f"[{filename}] : No grains found for direction {direction}")
    except Exception as e:
        LOGGER.error(f"[{filename}] : An error occurred during grain finding, skipping following steps.", exc_info=e)
        # Return straight away so the failure is not followed by a misleading "disabled" message.
        return None

    # Optionally plot grain finding stage. Errors raised while plotting are deliberately not caught here.
    if plotting_config["run"]:
        plot_dict = plotting_config["plot_dict"]
        LOGGER.info(f"[{filename}] : Plotting Grain Finding Images")
        for direction, image_arrays in grains.directions.items():
            LOGGER.debug(f"[{filename}] : Plotting {direction} Grain Finding Images")
            grain_out_path_direction = grain_out_path / f"{direction}"
            if plotting_config["image_set"] == "all":
                grain_out_path_direction.mkdir(parents=True, exist_ok=True)
                LOGGER.debug(f"[{filename}] : Target grain directory created : {grain_out_path_direction}")
            for plot_name, array in image_arrays.items():
                if len(array.shape) == 3:
                    # Use the DNA class mask from the tensor. Hardcoded to 1 as this implementation is not yet
                    # generalised.
                    array = array[:, :, 1]
                LOGGER.debug(f"[{filename}] : Plotting {plot_name} image")
                plot_dict[plot_name]["output_dir"] = grain_out_path_direction
                Images(data=np.zeros_like(array), masked_array=array, **plot_dict[plot_name]).plot_and_save()
            region_properties = grains.region_properties[direction]
            # Make a plot of coloured regions with bounding boxes
            plot_dict["bounding_boxes"]["output_dir"] = grain_out_path_direction
            Images(
                image_arrays["coloured_regions"],
                **plot_dict["bounding_boxes"],
                region_properties=region_properties,
            ).plot_and_save()
            plot_dict["coloured_boxes"]["output_dir"] = grain_out_path_direction
            # hard code to class index 1, as this implementation is not yet generalised.
            labelled_dna_mask = image_arrays["labelled_regions_02"][:, :, 1]
            Images(
                data=np.zeros_like(labelled_dna_mask),
                masked_array=labelled_dna_mask,
                **plot_dict["coloured_boxes"],
                region_properties=region_properties,
            ).plot_and_save()
            # Always want mask_overlay (aka "Height Thresholded with Mask") but in core_out_path
            plot_dict["mask_overlay"]["output_dir"] = core_out_path
            # hard code to class index 1, as this implementation is not yet generalised.
            Images(
                image,
                filename=f"{filename}_{direction}_masked",
                masked_array=image_arrays["removed_small_objects"][:, :, 1].astype(bool),
                **plot_dict["mask_overlay"],
                region_properties=region_properties,
            ).plot_and_save()
    else:
        LOGGER.info(f"[{filename}] : Plotting disabled for Grain Finding Images")

    grain_masks = {direction: arrays["labelled_regions_02"] for direction, arrays in grains.directions.items()}
    LOGGER.info(f"[{filename}] : Grain Finding stage completed successfully.")
    return grain_masks
[docs]
def run_grainstats(
    image: npt.NDArray,
    pixel_to_nm_scaling: float,
    grain_masks: dict,
    filename: str,
    basename: Path,
    grainstats_config: dict,
    plotting_config: dict,
    grain_out_path: Path,
) -> tuple[pd.DataFrame, dict]:
    """
    Calculate grain statistics for an image and optionally plots the results.

    Parameters
    ----------
    image : npt.NDArray
        2D numpy array image for grain statistics calculations.
    pixel_to_nm_scaling : float
        Scaling factor for converting pixel length scales to nanometres.
    grain_masks : dict
        Dictionary of grain masks keyed by direction ("above" and/or "below"). Each value must be a 3D array; the
        slice ``[:, :, 1]`` is used as the labelled grain mask.
    filename : str
        Name of the image.
    basename : Path
        Path of the image; its ``parent`` is stored in the ``basename`` column of the returned dataframe.
    grainstats_config : dict
        Dictionary of configuration for the GrainStats class to be used when initialised. Must contain a ``run``
        key; every other key is passed to ``GrainStats``. The dictionary itself is not modified.
    plotting_config : dict
        Dictionary of configuration for plotting images.
    grain_out_path : Path
        Directory to save optional grain statistics visual information to.

    Returns
    -------
    tuple[pd.DataFrame, dict]
        A pandas DataFrame containing the statistics for each grain, and a dictionary of height profiles keyed by
        direction. If grainstats is disabled or an error occurs the dataframe is empty; the height profiles
        dictionary then holds whatever had been collected before the failure (empty when disabled).
    """
    if not grainstats_config["run"]:
        LOGGER.info(
            f"[{filename}] : Calculation of grainstats disabled, returning empty dataframe and empty height_profiles."
        )
        return create_empty_dataframe(column_set="grainstats", index_col="grain_number"), {}

    # Strip "run" into a copy rather than popping it, so the caller's dictionary can be reused for another image.
    grainstats_kwargs = {key: value for key, value in grainstats_config.items() if key != "run"}
    # Created before the try block so the except handler can always return it.
    height_profiles_dict = {}
    try:
        LOGGER.info(f"[{filename}] : *** Grain Statistics ***")
        grain_plot_dict = {
            key: value
            for key, value in plotting_config["plot_dict"].items()
            if key in ["grain_image", "grain_mask", "grain_mask_image"]
        }
        grainstats_dict = {}
        # There are two layers to process those above the given threshold and those below
        for direction, full_mask in grain_masks.items():
            # Get the DNA class mask from the tensor
            LOGGER.debug(f"[{filename}] : Full Mask dimensions: {full_mask.shape}")
            assert len(full_mask.shape) == 3, "Grain masks should be 3D tensors"
            dna_class_mask = full_mask[:, :, 1]
            LOGGER.debug(f"[{filename}] : DNA Mask dimensions: {dna_class_mask.shape}")
            # Check if there are grains
            if np.max(dna_class_mask) == 0:
                LOGGER.warning(
                    f"[{filename}] : No grains exist for the {direction} direction. Skipping grainstats for {direction}."
                )
                grainstats_dict[direction] = create_empty_dataframe(column_set="grainstats", index_col="grain_number")
                continue

            grainstats_calculator = GrainStats(
                data=image,
                labelled_data=dna_class_mask,
                pixel_to_nanometre_scaling=pixel_to_nm_scaling,
                direction=direction,
                base_output_dir=grain_out_path,
                image_name=filename,
                plot_opts=grain_plot_dict,
                **grainstats_kwargs,
            )
            grainstats_dict[direction], grains_plot_data, height_profiles_dict[direction] = (
                grainstats_calculator.calculate_stats()
            )
            grainstats_dict[direction]["threshold"] = direction
            # Plot grains if required
            if plotting_config["image_set"] == "all":
                LOGGER.info(f"[{filename}] : Plotting grain images for direction: {direction}.")
                for plot_data in grains_plot_data:
                    LOGGER.debug(
                        f"[{filename}] : Plotting grain image {plot_data['filename']} for direction: {direction}."
                    )
                    Images(
                        data=plot_data["data"],
                        output_dir=plot_data["output_dir"],
                        filename=plot_data["filename"],
                        **plotting_config["plot_dict"][plot_data["name"]],
                    ).plot_and_save()

        # Create results dataframe from above and below results
        if "above" in grainstats_dict and "below" in grainstats_dict:
            grainstats_df = pd.concat([grainstats_dict["below"], grainstats_dict["above"]])
        elif "above" in grainstats_dict:
            grainstats_df = grainstats_dict["above"]
        elif "below" in grainstats_dict:
            grainstats_df = grainstats_dict["below"]
        else:
            raise ValueError("grainstats dictionary has neither 'above' nor 'below' keys. This should be impossible.")
        grainstats_df["basename"] = basename.parent
        LOGGER.info(f"[{filename}] : Calculated grainstats for {len(grainstats_df)} grains.")
        LOGGER.info(f"[{filename}] : Grainstats stage completed successfully.")
        return grainstats_df, height_profiles_dict
    except Exception as e:
        # Keep processing other stages/images, but record the traceback so the failure can be diagnosed.
        LOGGER.error(
            f"[{filename}] : Errors occurred whilst calculating grain statistics. Returning empty dataframe.",
            exc_info=e,
        )
        return create_empty_dataframe(column_set="grainstats", index_col="grain_number"), height_profiles_dict
[docs]
def run_disordered_tracing(
    image: npt.NDArray,
    grain_masks: dict,
    pixel_to_nm_scaling: float,
    filename: str,
    basename: Path,
    core_out_path: Path,
    tracing_out_path: Path,
    disordered_tracing_config: dict,
    plotting_config: dict,
    grainstats_df: pd.DataFrame | None = None,
) -> tuple[dict | None, pd.DataFrame | None, pd.DataFrame]:
    """
    Skeletonise and prune grains, adding results to statistics data frames and optionally plot results.

    Parameters
    ----------
    image : npt.NDArray
        Image containing the grains to pass to the tracing function.
    grain_masks : dict
        Dictionary of grain masks keyed by direction ("above" and/or "below"). Each value must be a 3D array; the
        slice ``[:, :, 1]`` is used as the grain mask.
    pixel_to_nm_scaling : float
        Scaling factor for converting pixel length scales to nanometres.
    filename : str
        Name of the image.
    basename : Path
        Path of the image; its ``parent`` is stored in the ``basename`` column of the tracing statistics.
    core_out_path : Path
        Path to save the core disordered trace image to.
    tracing_out_path : Path
        Path to save the optional, diagnostic disordered trace images to.
    disordered_tracing_config : dict
        Dictionary configuration for obtaining a disordered trace representation of the grains. Must contain a
        ``run`` key; every other key is passed to ``trace_image_disordered``. The dictionary itself is not modified.
    plotting_config : dict
        Dictionary configuration for plotting images.
    grainstats_df : pd.DataFrame | None
        The grain statistics dataframe to be added to. This optional argument defaults to `None` in which case an
        empty grainstats dataframe is created.

    Returns
    -------
    tuple[dict | None, pd.DataFrame | None, pd.DataFrame]
        The disordered trace data keyed by direction, the grain statistics merged with the tracing statistics, and
        the per-image disordered tracing statistics. When the stage is disabled the first element is None and
        ``grainstats_df`` is returned as passed in. When tracing fails (including when any direction has no
        grains, which stops the whole stage) the traces collected so far, the unmerged ``grainstats_df`` and an
        empty statistics dataframe are returned.
    """
    if not disordered_tracing_config["run"]:
        LOGGER.info(f"[{filename}] Calculation of Disordered Tracing disabled, returning empty dictionary.")
        return (
            None,
            grainstats_df,
            create_empty_dataframe(column_set="disordered_tracing_statistics", index_col="index"),
        )

    # Strip "run" into a copy rather than popping it, so the caller's dictionary can be reused for another image.
    tracing_kwargs = {key: value for key, value in disordered_tracing_config.items() if key != "run"}
    LOGGER.info(f"[{filename}] : *** Disordered Tracing ***")
    if grainstats_df is None:
        grainstats_df = create_empty_dataframe(column_set="grainstats", index_col="grain_number")
    disordered_traces = defaultdict()
    disordered_trace_grainstats = pd.DataFrame()
    disordered_tracing_stats_image = pd.DataFrame()
    try:
        # run image using directional grain masks
        for direction, full_mask in grain_masks.items():
            # Check if there are grains
            assert len(full_mask.shape) == 3, "Grain masks should be 3D tensors"
            dna_class_mask = full_mask[:, :, 1]
            if np.max(dna_class_mask) == 0:
                LOGGER.warning(
                    f"[{filename}] : No grains exist for the {direction} direction. Skipping disordered_tracing for {direction}."
                )
                raise ValueError(f"No grains exist for the {direction} direction")
            # if grains are found
            (
                disordered_traces_cropped_data,
                _disordered_trace_grainstats,
                disordered_tracing_images,
                disordered_tracing_stats,
            ) = trace_image_disordered(
                image=image,
                grains_mask=dna_class_mask,
                filename=filename,
                pixel_to_nm_scaling=pixel_to_nm_scaling,
                **tracing_kwargs,
            )
            # save per image new grainstats stats
            _disordered_trace_grainstats["threshold"] = direction
            disordered_trace_grainstats = pd.concat([disordered_trace_grainstats, _disordered_trace_grainstats])
            disordered_tracing_stats["threshold"] = direction
            disordered_tracing_stats["basename"] = basename.parent
            disordered_tracing_stats_image = pd.concat([disordered_tracing_stats_image, disordered_tracing_stats])
            # append direction results to dict
            disordered_traces[direction] = disordered_traces_cropped_data
            # save plots: the pruned skeleton is the core image, everything left over is diagnostic
            Images(
                image,
                masked_array=disordered_tracing_images.pop("pruned_skeleton"),
                output_dir=core_out_path,
                filename=f"{filename}_{direction}_disordered_trace",
                **plotting_config["plot_dict"]["pruned_skeleton"],
            ).plot_and_save()
            for plot_name, image_value in disordered_tracing_images.items():
                Images(
                    image,
                    masked_array=image_value,
                    output_dir=tracing_out_path / direction,
                    **plotting_config["plot_dict"][plot_name],
                ).plot_and_save()
        # merge grainstats data with other dataframe (grainstats_df is never None here, it was defaulted above)
        resultant_grainstats = pd.merge(
            grainstats_df, disordered_trace_grainstats, on=["image", "threshold", "grain_number"]
        )
        LOGGER.info(f"[{filename}] : Disordered Tracing stage completed successfully.")
        return disordered_traces, resultant_grainstats, disordered_tracing_stats_image
    except ValueError as e:
        LOGGER.info(f"[{filename}] : Disordered tracing failed with ValueError {e}")
    except Exception as e:
        LOGGER.info(
            f"[{filename}] : Disordered tracing failed - skipping. Consider raising an issue on GitHub. Error: ",
            exc_info=e,
        )
    # Reached only after a handled failure: hand back whatever was traced plus the untouched grainstats.
    return (
        disordered_traces,
        grainstats_df,
        create_empty_dataframe(column_set="disordered_tracing_statistics", index_col="index"),
    )
[docs]
def run_nodestats(  # noqa: C901
    image: npt.NDArray,
    disordered_tracing_data: dict,
    pixel_to_nm_scaling: float,
    filename: str,
    core_out_path: Path,
    tracing_out_path: Path,
    nodestats_config: dict,
    plotting_config: dict,
    grainstats_df: pd.DataFrame | None = None,
) -> tuple[dict | None, pd.DataFrame | None]:
    """
    Analyse crossing points in grains adding results to statistics data frames and optionally plot results.

    Parameters
    ----------
    image : npt.NDArray
        Image containing the DNA to pass to the tracing function.
    disordered_tracing_data : dict
        Dictionary of skeletonised and pruned grain masks. Result from "run_disordered_tracing".
    pixel_to_nm_scaling : float
        Scaling factor for converting pixel length scales to nanometres.
    filename : str
        Name of the image.
    core_out_path : Path
        Path to save the core NodeStats image to.
    tracing_out_path : Path
        Path to save optional, diagnostic NodeStats images to.
    nodestats_config : dict
        Dictionary configuration for analysing the crossing points. Must contain a ``run`` key; every other key is
        passed to ``nodestats_image``. The dictionary itself is not modified.
    plotting_config : dict
        Dictionary configuration for plotting images.
    grainstats_df : pd.DataFrame | None
        The grain statistics dataframe to be added to. This optional argument defaults to `None` in which case an
        empty grainstats dataframe is created.

    Returns
    -------
    tuple[dict | None, pd.DataFrame | None]
        A NodeStats analysis dictionary keyed by direction (each entry holding ``stats`` and ``images``) and the
        grainstats dataframe merged with the NodeStats metrics. When the stage is disabled the dictionary is None
        and ``grainstats_df`` is returned as passed in; when the analysis fails the partially filled dictionary
        and the unmerged ``grainstats_df`` are returned.
    """
    if not nodestats_config["run"]:
        LOGGER.info(f"[{filename}] : Calculation of nodestats disabled, returning empty dataframe.")
        return None, grainstats_df

    # Strip "run" into a copy rather than popping it, so the caller's dictionary can be reused for another image.
    nodestats_kwargs = {key: value for key, value in nodestats_config.items() if key != "run"}
    LOGGER.info(f"[{filename}] : *** Nodestats ***")
    if grainstats_df is None:
        grainstats_df = create_empty_dataframe(column_set="grainstats", index_col="grain_number")
    nodestats_whole_data = defaultdict()
    nodestats_grainstats = pd.DataFrame()
    try:
        # run image using directional grain masks
        for direction, disordered_tracing_direction_data in disordered_tracing_data.items():
            (
                nodestats_data,
                _nodestats_grainstats,
                nodestats_full_images,
                nodestats_branch_images,
            ) = nodestats_image(
                image=image,
                disordered_tracing_direction_data=disordered_tracing_direction_data,
                filename=filename,
                pixel_to_nm_scaling=pixel_to_nm_scaling,
                **nodestats_kwargs,
            )
            # save per image new grainstats stats
            _nodestats_grainstats["threshold"] = direction
            nodestats_grainstats = pd.concat([nodestats_grainstats, _nodestats_grainstats])
            # append direction results to dict
            nodestats_whole_data[direction] = {"stats": nodestats_data, "images": nodestats_branch_images}
            # save whole image plots: connected nodes is the core image, everything left over is diagnostic
            Images(
                filename=f"{filename}_{direction}_nodes",
                data=image,
                masked_array=nodestats_full_images.pop("connected_nodes"),
                output_dir=core_out_path,
                **plotting_config["plot_dict"]["connected_nodes"],
            ).plot_and_save()
            for plot_name, image_value in nodestats_full_images.items():
                Images(
                    image,
                    masked_array=image_value,
                    output_dir=tracing_out_path / direction,
                    **plotting_config["plot_dict"][plot_name],
                ).plot_and_save()
            # plot single node images
            nodes_out_path = tracing_out_path / direction / "nodes"
            for mol_no, mol_stats in nodestats_data.items():
                if mol_stats is None:
                    continue
                for node_no, single_node_stats in mol_stats.items():
                    # plot the node and branch_mask images
                    node_images = nodestats_branch_images[mol_no]["nodes"][node_no]
                    for cropped_image_type, cropped_image in node_images.items():
                        Images(
                            nodestats_branch_images[mol_no]["grain"]["grain_image"],
                            masked_array=cropped_image,
                            output_dir=nodes_out_path,
                            filename=f"{mol_no}_{node_no}_{cropped_image_type}",
                            **plotting_config["plot_dict"][cropped_image_type],
                        ).plot_and_save()
                    # plot crossing height linetrace
                    if plotting_config["image_set"] == "all" and not single_node_stats["error"]:
                        line_trace_config = plotting_config["plot_dict"]["node_line_trace"]
                        fig, _ = plot_crossing_linetrace_halfmax(
                            branch_stats_dict=single_node_stats["branch_stats"],
                            mask_cmap=line_trace_config["mask_cmap"],
                            # Use the configured title; previously the colormap name was passed here by mistake.
                            # Fall back to that old value if no title is configured so nothing new can fail.
                            title=line_trace_config.get("title", line_trace_config["mask_cmap"]),
                        )
                        fig.savefig(
                            nodes_out_path / f"{mol_no}_{node_no}_linetrace_halfmax.svg",
                            format="svg",
                        )
        # merge grainstats data with other dataframe (grainstats_df is never None here, it was defaulted above)
        resultant_grainstats = pd.merge(grainstats_df, nodestats_grainstats, on=["image", "threshold", "grain_number"])
        LOGGER.info(f"[{filename}] : NodeStats stage completed successfully.")
        # merge all image dictionaries
        return nodestats_whole_data, resultant_grainstats
    except UnboundLocalError as e:
        LOGGER.info(
            f"[{filename}] : NodeStats failed with UnboundLocalError {e} - all skeletons pruned in the Disordered Tracing step."
        )
    except KeyError as e:
        LOGGER.info(
            f"[{filename}] : NodeStats failed with KeyError {e} - no skeletons found from the Disordered Tracing step."
        )
    except Exception as e:
        LOGGER.info(
            f"[{filename}] : NodeStats failed - skipping. Consider raising an issue on GitHub. Error: ", exc_info=e
        )
    # Reached only after a handled failure: hand back whatever was analysed plus the untouched grainstats.
    return nodestats_whole_data, grainstats_df
# need to add in the molstats here
[docs]
def run_ordered_tracing(
    image: npt.NDArray,
    disordered_tracing_data: dict,
    nodestats_data: dict,
    filename: str,
    basename: Path,
    core_out_path: Path,
    tracing_out_path: Path,
    ordered_tracing_config: dict,
    plotting_config: dict,
    grainstats_df: pd.DataFrame | None = None,
) -> tuple[dict | None, pd.DataFrame | None, pd.DataFrame]:
    """
    Order coordinates of traces, adding results to statistics data frames and optionally plot results.

    Parameters
    ----------
    image : npt.NDArray
        Image containing the DNA to pass to the tracing function.
    disordered_tracing_data : dict
        Dictionary of skeletonised and pruned grain masks. Result from "run_disordered_tracing".
    nodestats_data : dict
        Dictionary of images and statistics from the NodeStats analysis. Result from "run_nodestats".
    filename : str
        Name of the image.
    basename : Path
        Path of the image; its ``parent`` is stored in the ``basename`` column of the molecule statistics.
    core_out_path : Path
        Path to save the core ordered tracing image to.
    tracing_out_path : Path
        Path to save optional, diagnostic ordered trace images to.
    ordered_tracing_config : dict
        Dictionary configuration for obtaining an ordered trace representation of the skeletons. Must contain a
        ``run`` key; every other key is passed to ``ordered_tracing_image``. The dictionary itself is not modified.
    plotting_config : dict
        Dictionary configuration for plotting images. ``plot_dict["ordered_traces"]["core_set"]`` is forced to
        True by this function.
    grainstats_df : pd.DataFrame | None
        The grain statistics dataframe to be added to. This optional argument defaults to `None` in which case an
        empty grainstats dataframe is created.

    Returns
    -------
    tuple[dict | None, pd.DataFrame | None, pd.DataFrame]
        The ordered tracing data keyed by direction, the grainstats dataframe merged with the ordered tracing
        metrics, and the molecule statistics dataframe. When the stage is disabled the dictionary is None and
        ``grainstats_df`` is returned as passed in. When tracing fails (including when any direction has no
        skeletons, which stops the whole stage) the partially filled dictionary, the unmerged ``grainstats_df``
        and an empty molecule statistics dataframe are returned.
    """
    if not ordered_tracing_config["run"]:
        LOGGER.info(f"[{filename}] : Calculation of ordered tracing disabled.")
        return None, grainstats_df, create_empty_dataframe(column_set="mol_statistics", index_col="molecule_number")

    # Strip "run" into a copy rather than popping it, so the caller's dictionary can be reused for another image.
    ordered_tracing_kwargs = {key: value for key, value in ordered_tracing_config.items() if key != "run"}
    LOGGER.info(f"[{filename}] : *** Ordered Tracing ***")
    if grainstats_df is None:
        grainstats_df = create_empty_dataframe(column_set="grainstats", index_col="grain_number")
    ordered_tracing_image_data = defaultdict()
    ordered_tracing_molstats = pd.DataFrame()
    ordered_tracing_grainstats = pd.DataFrame()
    try:
        # run image using directional grain masks
        for direction, disordered_tracing_direction_data in disordered_tracing_data.items():
            # Check if there are grains
            if not disordered_tracing_direction_data:
                LOGGER.warning(
                    f"[{filename}] : No skeletons exist for the {direction} direction. Skipping ordered_tracing for {direction}."
                )
                raise ValueError(f"No skeletons exist for the {direction} direction")
            # if grains are found
            (
                ordered_tracing_data,
                _ordered_tracing_grainstats,
                _ordered_tracing_molstats,
                ordered_tracing_full_images,
            ) = ordered_tracing_image(
                image=image,
                disordered_tracing_direction_data=disordered_tracing_direction_data,
                nodestats_direction_data=nodestats_data[direction],
                filename=filename,
                **ordered_tracing_kwargs,
            )
            # save per image new grainstats stats
            _ordered_tracing_grainstats["threshold"] = direction
            ordered_tracing_grainstats = pd.concat([ordered_tracing_grainstats, _ordered_tracing_grainstats])
            _ordered_tracing_molstats["threshold"] = direction
            ordered_tracing_molstats = pd.concat([ordered_tracing_molstats, _ordered_tracing_molstats])
            # append direction results to dict
            ordered_tracing_image_data[direction] = ordered_tracing_data
            # save whole image plots
            plotting_config["plot_dict"]["ordered_traces"]["core_set"] = True  # fudge around core having own cmap
            Images(
                filename=f"{filename}_{direction}_ordered_traces",
                data=image,
                masked_array=ordered_tracing_full_images.pop("ordered_traces"),
                output_dir=core_out_path,
                **plotting_config["plot_dict"]["ordered_traces"],
            ).plot_and_save()
            # save optional diagnostic plots (those with core_set = False)
            for plot_name, image_value in ordered_tracing_full_images.items():
                Images(
                    image,
                    masked_array=image_value,
                    output_dir=tracing_out_path / direction,
                    **plotting_config["plot_dict"][plot_name],
                ).plot_and_save()
        # merge grainstats data with other dataframe (grainstats_df is never None here, it was defaulted above)
        resultant_grainstats = pd.merge(
            grainstats_df, ordered_tracing_grainstats, on=["image", "threshold", "grain_number"]
        )
        ordered_tracing_molstats["basename"] = basename.parent
        LOGGER.info(f"[{filename}] : Ordered Tracing stage completed successfully.")
        # merge all image dictionaries
        return ordered_tracing_image_data, resultant_grainstats, ordered_tracing_molstats
    except ValueError as e:
        # Report only the exception text: the loop variable may not be bound here (e.g. a ValueError raised with
        # no directions to iterate), and not every ValueError means that skeletons are missing.
        LOGGER.info(f"[{filename}] : Ordered Tracing failed with ValueError {e}")
    except KeyError as e:
        LOGGER.info(
            f"[{filename}] : Ordered Tracing failed with KeyError {e} - no skeletons found from the Disordered Tracing step."
        )
    except Exception as e:
        LOGGER.info(
            f"[{filename}] : Ordered Tracing failed - skipping. Consider raising an issue on GitHub. Error: ",
            exc_info=e,
        )
    # Reached only after a handled failure: hand back whatever was traced plus the untouched grainstats.
    return (
        ordered_tracing_image_data,
        grainstats_df,
        create_empty_dataframe(column_set="mol_statistics", index_col="molecule_number"),
    )
[docs]
def run_splining(
    image: npt.NDArray,
    ordered_tracing_data: dict,
    pixel_to_nm_scaling: float,
    filename: str,
    core_out_path: Path,
    splining_config: dict,
    plotting_config: dict,
    grainstats_df: pd.DataFrame | None = None,
    molstats_df: pd.DataFrame | None = None,
) -> tuple[dict | None, pd.DataFrame | None, pd.DataFrame | None]:
    """
    Smooth the ordered trace coordinates, adding results to statistics data frames and optionally plot results.

    Parameters
    ----------
    image : npt.NDArray
        Image containing the DNA to pass to the tracing function.
    ordered_tracing_data : dict
        Dictionary of ordered coordinates. Result from "run_ordered_tracing".
    pixel_to_nm_scaling : float
        Scaling factor for converting pixel length scales to nanometres.
    filename : str
        Name of the image.
    core_out_path : Path
        Path to save the core splined trace image to.
    splining_config : dict
        Dictionary configuration for smoothing the ordered traces. Must contain a ``run`` key; every other key is
        passed to ``splining_image``. The dictionary itself is not modified.
    plotting_config : dict
        Dictionary configuration for plotting images.
    grainstats_df : pd.DataFrame | None
        The grain statistics dataframe to be added to. This optional argument defaults to `None` in which case an
        empty grainstats dataframe is created.
    molstats_df : pd.DataFrame | None
        The molecule statistics dataframe to be added to. This optional argument defaults to `None` in which case
        an empty molecule statistics dataframe is created.

    Returns
    -------
    tuple[dict | None, pd.DataFrame | None, pd.DataFrame | None]
        The splined data keyed by direction, the grainstats dataframe merged with the splining metrics, and the
        molecule statistics dataframe merged with the splining metrics. When the stage is disabled the dictionary
        is None and both dataframes are returned as passed in. When splining fails the partially filled
        dictionary and the unmerged ``grainstats_df`` are returned, together with an empty molecule statistics
        dataframe (``KeyError``) or the splining molecule statistics gathered so far (any other error).
    """
    if not splining_config["run"]:
        LOGGER.info(f"[{filename}] : Calculation of splining disabled.")
        return None, grainstats_df, molstats_df

    # Strip "run" into a copy rather than popping it, so the caller's dictionary can be reused for another image.
    splining_kwargs = {key: value for key, value in splining_config.items() if key != "run"}
    LOGGER.info(f"[{filename}] : *** Splining ***")
    if grainstats_df is None:
        grainstats_df = create_empty_dataframe(column_set="grainstats", index_col="grain_number")
    if molstats_df is None:
        molstats_df = create_empty_dataframe(column_set="mol_statistics", index_col="molecule_number")
    splined_image_data = defaultdict()
    splining_grainstats = pd.DataFrame()
    splining_molstats = pd.DataFrame()
    try:
        # run image using directional grain masks
        for direction, ordered_tracing_direction_data in ordered_tracing_data.items():
            if not ordered_tracing_direction_data:
                LOGGER.warning(
                    f"[{filename}] : No grains exist for the {direction} direction. Skipping splining for {direction}."
                )
                # The generic handler below returns splining_molstats, so give it the expected empty layout.
                splining_molstats = create_empty_dataframe(column_set="mol_statistics", index_col="molecule_number")
                raise ValueError(f"No grains exist for the {direction} direction")
            # if grains are found
            splined_data, _splining_grainstats, _splining_molstats = splining_image(
                image=image,
                ordered_tracing_direction_data=ordered_tracing_direction_data,
                filename=filename,
                pixel_to_nm_scaling=pixel_to_nm_scaling,
                **splining_kwargs,
            )
            # save per image new grainstats stats
            _splining_grainstats["threshold"] = direction
            splining_grainstats = pd.concat([splining_grainstats, _splining_grainstats])
            _splining_molstats["threshold"] = direction
            splining_molstats = pd.concat([splining_molstats, _splining_molstats])
            # append direction results to dict
            splined_image_data[direction] = splined_data
            # Plot all splines of this direction on the full image; each spline is offset by the first two
            # entries of its molecule's "bbox".
            all_splines = [
                mol_dict["spline_coords"] + mol_dict["bbox"][:2]
                for grain_dict in splined_data.values()
                for mol_dict in grain_dict.values()
            ]
            Images(
                data=image,
                output_dir=core_out_path,
                filename=f"{filename}_{direction}_all_splines",
                plot_coords=all_splines,
                **plotting_config["plot_dict"]["splined_trace"],
            ).plot_and_save()
        # merge grainstats data with other dataframe (grainstats_df is never None here, it was defaulted above)
        resultant_grainstats = pd.merge(grainstats_df, splining_grainstats, on=["image", "threshold", "grain_number"])
        # merge molstats data with other dataframe (molstats_df is never None here, it was defaulted above)
        resultant_molstats = pd.merge(
            molstats_df, splining_molstats, on=["image", "threshold", "grain_number", "molecule_number"]
        )
        LOGGER.info(f"[{filename}] : Splining stage completed successfully.")
        # merge all image dictionaries
        return splined_image_data, resultant_grainstats, resultant_molstats
    except KeyError as e:
        LOGGER.info(
            f"[{filename}] : Splining failed with KeyError {e} - no ordered traces found from the Ordered Tracing step."
        )
        return (
            splined_image_data,
            grainstats_df,
            create_empty_dataframe(column_set="mol_statistics", index_col="molecule_number"),
        )
    except Exception as e:
        LOGGER.error(
            f"[{filename}] : Splining failed - skipping. Consider raising an issue on GitHub. Error: ", exc_info=e
        )
        return splined_image_data, grainstats_df, splining_molstats
[docs]
def get_out_paths(image_path: Path, base_dir: Path, output_dir: Path, filename: str, plotting_config: dict):
    """
    Work out (and where needed create) the output directories used for a single image.

    The core output directory is always created. The per-image sub-directories are only created on disk when
    ``plotting_config["image_set"]`` equals ``"all"``; their paths are returned regardless.

    Parameters
    ----------
    image_path : Path
        Path of the image being processed.
    base_dir : Path
        Path of the data folder.
    output_dir : Path
        Base output directory for output data.
    filename : str
        Name of the image being processed.
    plotting_config : dict
        Dictionary of configuration for plotting images. Only the ``image_set`` key is read here.

    Returns
    -------
    tuple
        Four paths, in order: the core output path for general file outputs, the filter output path for
        flattening related files, the grain output path for grain finding related files and the tracing
        output path for tracing related files.
    """
    LOGGER.info(f"Processing : {filename}")

    core_out_path = get_out_path(image_path, base_dir, output_dir).parent / "processed"
    core_out_path.mkdir(parents=True, exist_ok=True)

    # Everything specific to this image lives under <core_out_path>/<filename>/
    image_out_path = core_out_path / filename
    filter_out_path = image_out_path / "filters"
    grain_out_path = image_out_path / "grains"
    tracing_out_path = image_out_path / "dnatracing"

    if plotting_config["image_set"] == "all":
        thresholds = ("above", "below")
        # Same directories, in the same creation order, as writing each mkdir call out by hand.
        directories = [filter_out_path]
        directories.extend(grain_out_path / threshold for threshold in thresholds)
        directories.extend(tracing_out_path / threshold for threshold in thresholds)
        directories.extend(tracing_out_path / threshold / "nodes" for threshold in thresholds)
        for directory in directories:
            directory.mkdir(parents=True, exist_ok=True)

    return core_out_path, filter_out_path, grain_out_path, tracing_out_path
[docs]
def process_scan(
    topostats_object: dict,
    base_dir: str | Path,
    filter_config: dict,
    grains_config: dict,
    grainstats_config: dict,
    disordered_tracing_config: dict,
    nodestats_config: dict,
    ordered_tracing_config: dict,
    splining_config: dict,
    plotting_config: dict,
    output_dir: str | Path = "output",
) -> tuple[Path, pd.DataFrame, dict, dict, pd.DataFrame, pd.DataFrame]:
    """
    Process a single image, filtering, finding grains and calculating their statistics.

    The stages are run in a fixed order: filters, grains, grain statistics, disordered tracing, NodeStats,
    ordered tracing and splining. Results of each stage are written back into ``topostats_object`` which is
    saved to a ``.topostats`` file before returning.

    Parameters
    ----------
    topostats_object : dict
        Dictionary describing the image. The keys ``img_path``, ``filename``, ``image_original``,
        ``pixel_to_nm_scaling`` and ``grain_masks`` are read. The keys ``image_flattened``, ``grain_masks``,
        ``height_profiles``, ``disordered_traces``, ``ordered_traces``, ``nodestats`` and ``splining`` are
        written (the last five only when grain masks are available). The dictionary is modified in place.
    base_dir : str | Path
        Base directory of the input data; forwarded to ``get_out_paths`` to derive the output locations.
    filter_config : dict
        Dictionary of configuration options for running the Filter stage.
    grains_config : dict
        Dictionary of configuration options for running the Grain detection stage.
    grainstats_config : dict
        Dictionary of configuration options for running the Grain Statistics stage.
    disordered_tracing_config : dict
        Dictionary configuration for obtaining a disordered trace representation of the grains.
    nodestats_config : dict
        Dictionary of configuration options for running the NodeStats stage.
    ordered_tracing_config : dict
        Dictionary configuration for obtaining an ordered trace representation of the skeletons.
    splining_config : dict
        Dictionary of configuration options for running the splining stage.
    plotting_config : dict
        Dictionary of configuration options for plotting figures.
    output_dir : str | Path
        Directory to save output to, it will be created if it does not exist. If it already exists then it is
        possible that output will be over-written.

    Returns
    -------
    tuple[Path, pd.DataFrame, dict, dict, pd.DataFrame, pd.DataFrame]
        In order: the image path (``topostats_object["img_path"]``), grain statistics, height profiles, image
        statistics, disordered tracing statistics and molecule statistics.
    """
    filename = topostats_object["filename"]
    pixel_to_nm_scaling = topostats_object["pixel_to_nm_scaling"]

    core_out_path, filter_out_path, grain_out_path, tracing_out_path = get_out_paths(
        image_path=topostats_object["img_path"],
        base_dir=base_dir,
        output_dir=output_dir,
        filename=filename,
        plotting_config=plotting_config,
    )

    plotting_config = add_pixel_to_nm_to_plotting_config(plotting_config, pixel_to_nm_scaling)

    # Flatten Image
    image_flattened = run_filters(
        unprocessed_image=topostats_object["image_original"],
        pixel_to_nm_scaling=pixel_to_nm_scaling,
        filename=filename,
        filter_out_path=filter_out_path,
        core_out_path=core_out_path,
        filter_config=filter_config,
        plotting_config=plotting_config,
    )
    # Use flattened image if one is returned, else use original image
    topostats_object["image_flattened"] = (
        image_flattened if image_flattened is not None else topostats_object["image_original"]
    )

    # Find Grains :
    grain_masks = run_grains(
        image=topostats_object["image_flattened"],
        pixel_to_nm_scaling=pixel_to_nm_scaling,
        filename=filename,
        grain_out_path=grain_out_path,
        core_out_path=core_out_path,
        plotting_config=plotting_config,
        grains_config=grains_config,
    )
    # Update grain masks if new grain masks are returned. Else keep old grain masks. Topostats object's
    # "grain_masks" defaults to an empty dictionary so this is safe.
    if grain_masks is not None:
        topostats_object["grain_masks"] = grain_masks

    if "above" in topostats_object["grain_masks"] or "below" in topostats_object["grain_masks"]:
        # Grainstats :
        grainstats_df, height_profiles = run_grainstats(
            image=topostats_object["image_flattened"],
            pixel_to_nm_scaling=pixel_to_nm_scaling,
            grain_masks=topostats_object["grain_masks"],
            filename=filename,
            basename=topostats_object["img_path"],
            grainstats_config=grainstats_config,
            plotting_config=plotting_config,
            grain_out_path=grain_out_path,
        )
        topostats_object["height_profiles"] = height_profiles

        # Disordered Tracing
        disordered_traces_data, grainstats_df, disordered_tracing_stats = run_disordered_tracing(
            image=topostats_object["image_flattened"],
            grain_masks=topostats_object["grain_masks"],
            pixel_to_nm_scaling=pixel_to_nm_scaling,
            filename=filename,
            basename=topostats_object["img_path"],
            core_out_path=core_out_path,
            tracing_out_path=tracing_out_path,
            disordered_tracing_config=disordered_tracing_config,
            grainstats_df=grainstats_df,
            plotting_config=plotting_config,
        )
        topostats_object["disordered_traces"] = disordered_traces_data

        # Nodestats
        nodestats, grainstats_df = run_nodestats(
            image=topostats_object["image_flattened"],
            disordered_tracing_data=topostats_object["disordered_traces"],
            pixel_to_nm_scaling=pixel_to_nm_scaling,
            filename=filename,
            core_out_path=core_out_path,
            tracing_out_path=tracing_out_path,
            plotting_config=plotting_config,
            nodestats_config=nodestats_config,
            grainstats_df=grainstats_df,
        )

        # Ordered Tracing
        ordered_tracing, grainstats_df, molstats_df = run_ordered_tracing(
            image=topostats_object["image_flattened"],
            disordered_tracing_data=topostats_object["disordered_traces"],
            nodestats_data=nodestats,
            filename=filename,
            basename=topostats_object["img_path"],
            core_out_path=core_out_path,
            tracing_out_path=tracing_out_path,
            ordered_tracing_config=ordered_tracing_config,
            plotting_config=plotting_config,
            grainstats_df=grainstats_df,
        )
        topostats_object["ordered_traces"] = ordered_tracing
        # Stored only after ordered tracing has run: looks weird but ordered adds an extra field
        topostats_object["nodestats"] = nodestats

        # Splining
        splined_data, grainstats_df, molstats_df = run_splining(
            image=topostats_object["image_flattened"],
            ordered_tracing_data=topostats_object["ordered_traces"],
            pixel_to_nm_scaling=pixel_to_nm_scaling,
            filename=filename,
            core_out_path=core_out_path,
            plotting_config=plotting_config,
            splining_config=splining_config,
            grainstats_df=grainstats_df,
            molstats_df=molstats_df,
        )
        # Add grain trace data to topostats object
        topostats_object["splining"] = splined_data
    else:
        # No grain masks in either direction: return empty results so the return shape stays the same.
        grainstats_df = create_empty_dataframe(column_set="grainstats", index_col="grain_number")
        molstats_df = create_empty_dataframe(column_set="mol_statistics", index_col="molecule_number")
        disordered_tracing_stats = create_empty_dataframe(column_set="disordered_tracing_statistics", index_col="index")
        height_profiles = {}

    # Get image statistics
    LOGGER.info(f"[{filename}] : *** Image Statistics ***")
    # "image_flattened" already holds the original image when flattening returned nothing (see above), so no
    # further fallback is needed here. Calculating image statistics returns a dictionary.
    image_stats = image_statistics(
        image=topostats_object["image_flattened"],
        filename=filename,
        results_df=grainstats_df,
        pixel_to_nm_scaling=pixel_to_nm_scaling,
    )

    # Save the topostats dictionary object to .topostats file.
    save_topostats_file(output_dir=core_out_path, filename=str(filename), topostats_object=topostats_object)

    return (
        topostats_object["img_path"],
        grainstats_df,
        height_profiles,
        image_stats,
        disordered_tracing_stats,
        molstats_df,
    )
[docs]
def check_run_steps(  # noqa: C901
    filter_run: bool,
    grains_run: bool,
    grainstats_run: bool,
    disordered_tracing_run: bool,
    nodestats_run: bool,
    ordered_tracing_run: bool,
    splining_run: bool,
) -> None:
    """
    Check options for running steps (Filter, Grain, Grainstats and tracing stages) are logically consistent.

    The latest enabled stage is identified and its prerequisites are checked from the nearest to the furthest;
    the first prerequisite that is explicitly ``False`` is reported via ``LOGGER.error``. Nothing is raised, the
    outcome is only logged.

    When Splining is enabled two extra, independent checks are made: a disabled Ordered Tracing stage is
    reported as an inconsistency, while a disabled NodeStats stage is reported but treated as advisory because
    the message itself states tracing falls back to the 'old' method.

    The "consistent" message is only logged when no inconsistency has been reported.

    Parameters
    ----------
    filter_run : bool
        Flag for running Filtering.
    grains_run : bool
        Flag for running Grains.
    grainstats_run : bool
        Flag for running GrainStats.
    disordered_tracing_run : bool
        Flag for running Disordered Tracing.
    nodestats_run : bool
        Flag for running NodeStats.
    ordered_tracing_run : bool
        Flag for running Ordered Tracing.
    splining_run : bool
        Flag for running Splining.
    """
    LOGGER.debug(f"{filter_run=}")
    LOGGER.debug(f"{grains_run=}")
    LOGGER.debug(f"{grainstats_run=}")
    LOGGER.debug(f"{disordered_tracing_run=}")
    LOGGER.debug(f"{nodestats_run=}")
    LOGGER.debug(f"{ordered_tracing_run=}")
    LOGGER.debug(f"{splining_run=}")

    # Prerequisites shared by all stages, nearest first (i.e. reverse pipeline order).
    core_chain = (
        ("Disordered Tracing", disordered_tracing_run),
        ("Grainstats", grainstats_run),
        ("Grains", grains_run),
        ("Filters", filter_run),
    )

    # Pick the latest enabled stage and the slice of the chain that must be enabled for it to run.
    if splining_run:
        stage, prerequisites = "Splining", core_chain
    elif ordered_tracing_run:
        stage, prerequisites = "Ordered Tracing", core_chain
    elif nodestats_run:
        stage, prerequisites = "NodeStats", core_chain
    elif disordered_tracing_run:
        stage, prerequisites = "Disordered Tracing", core_chain[1:]
    elif grainstats_run:
        stage, prerequisites = "Grainstats", core_chain[2:]
    elif grains_run:
        stage, prerequisites = "Grains", core_chain[3:]
    else:
        stage, prerequisites = None, ()

    inconsistent = False

    if splining_run:
        if ordered_tracing_run is False:
            LOGGER.error("Splining enabled but Ordered Tracing disabled. Please check your configuration file.")
            inconsistent = True
        if nodestats_run is False:
            # Advisory only: a fallback exists, so this does not mark the configuration as inconsistent.
            LOGGER.error("Splining enabled but NodeStats disabled. Tracing will use the 'old' method.")

    # Only the nearest disabled prerequisite is reported. Flags are compared with ``is False`` so that only an
    # explicit ``False`` counts as disabled.
    missing = next((name for name, enabled in prerequisites if enabled is False), None)
    if missing is not None:
        LOGGER.error(f"{stage} enabled but {missing} disabled. Please check your configuration file.")
        inconsistent = True

    if not inconsistent:
        LOGGER.info("Configuration run options are consistent, processing can proceed.")
[docs]
def completion_message(config: dict, img_files: list, summary_config: dict, images_processed: int) -> None:
    """
    Print a completion message summarising images processed.

    A banner is printed to standard output and the summary itself is emitted through ``LOGGER.info``.

    Parameters
    ----------
    config : dict
        Configuration dictionary. The keys ``base_dir``, ``file_ext`` and ``output_dir`` are read.
    img_files : list
        List of found image paths. May be empty, in which case the percentage processed is reported as 0.0.
    summary_config : dict
        Configuration for plotting summary statistics. If ``None`` the distribution plots are reported as
        disabled, otherwise its ``output_dir`` is reported.
    images_processed : int
        Number of images successfully processed.
    """
    if summary_config is not None:
        distribution_plots_message = str(summary_config["output_dir"])
    else:
        distribution_plots_message = "Disabled. Enable in config 'summary_stats/run' if needed."

    # Guard against a ZeroDivisionError when no image files were found.
    files_found = len(img_files)
    percent_processed = (images_processed * 100) / files_found if files_found else 0.0

    print(
        "\n\n~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n\n"
    )
    tprint("TopoStats", font="twisted")
    LOGGER.info(
        f"\n\n~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ COMPLETE ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n\n"
        f" TopoStats Version : {__version__}\n"
        f" Base Directory : {config['base_dir']}\n"
        f" File Extension : {config['file_ext']}\n"
        f" Files Found : {files_found}\n"
        f" Successfully Processed^1 : {images_processed} ({percent_processed}%)\n"
        f" All statistics : {str(config['output_dir'])}/all_statistics.csv\n"
        f" Distribution Plots : {distribution_plots_message}\n\n"
        f" Configuration : {config['output_dir']}/config.yaml\n\n"
        f" Email : topostats@sheffield.ac.uk\n"
        f" Documentation : https://afm-spm.github.io/topostats/\n"
        f" Source Code : https://github.com/AFM-SPM/TopoStats/\n"
        f" Bug Reports/Feature Request : https://github.com/AFM-SPM/TopoStats/issues/new/choose\n"
        f" Citation File Format : https://github.com/AFM-SPM/TopoStats/blob/main/CITATION.cff\n\n"
        f" ^1 Successful processing of an image is detection of grains and calculation of at least\n"
        f" grain statistics. If these have been disabled the percentage will be 0.\n\n"
        f" If you encounter bugs/issues or have feature requests please report them at the above URL\n"
        f" or email us.\n\n"
        f" If you have found TopoStats useful please consider citing it. A Citation File Format is\n"
        f" linked above and available from the Source Code page.\n"
        f"~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n\n"
    )