"""
Run TopoStats modules.
This provides an entry point for running TopoStats as a command line programme.
"""
from __future__ import annotations
import argparse
import logging
import sys
from functools import partial
from importlib import resources
from multiprocessing import Pool
from pprint import pformat
import pandas as pd
import yaml
from tqdm import tqdm
from topostats.io import (
LoadScans,
dict_to_json,
find_files,
merge_mappings,
read_yaml,
save_folder_grainstats,
write_yaml,
)
from topostats.logs.logs import LOGGER_NAME
from topostats.plotting import toposum
from topostats.processing import (
check_run_steps,
completion_message,
process_scan,
run_disordered_tracing,
run_filters,
run_grains,
run_grainstats,
run_nodestats,
run_ordered_tracing,
run_splining,
)
from topostats.utils import update_config, update_plotting_config
from topostats.validation import DEFAULT_CONFIG_SCHEMA, PLOTTING_SCHEMA, SUMMARY_SCHEMA, validate_config
# We already set up the logger in __init__.py and it is idempotent, so calling it here returns the same object
# as in __init__.py.
# Ref : https://stackoverflow.com/a/57799639/1444043
# LOGGER = setup_logger(LOGGER_NAME)
LOGGER = logging.getLogger(LOGGER_NAME)
# pylint: disable=too-many-branches
# pylint: disable=too-many-locals
# pylint: disable=too-many-statements
# pylint: disable=unnecessary-dict-index-lookup
# pylint: disable=too-many-nested-blocks
def reconcile_config_args(args: argparse.Namespace | None) -> dict:
"""
Reconcile command line arguments with the default configuration.
Command line arguments take precedence over the default configuration. If a partial configuration file is specified
(with '-c' or '--config-file') the defaults are overridden by its values (internally the configuration
dictionary is updated with these values). Any other command line arguments take precedence over both the defaults
and those supplied in a configuration file (again the dictionary is updated).
The final configuration is validated before processing begins.
Parameters
----------
args : argparse.Namespace | None
Command line arguments passed into TopoStats.
Returns
-------
dict
The configuration dictionary.
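Examples
--------
An illustrative sketch of the precedence rules. With no command line arguments the packaged defaults are
returned unchanged.
>>> config = reconcile_config_args(args=None)
A partial configuration file (hypothetical name, not shipped with TopoStats) overrides the defaults for any
values it specifies.
>>> args = argparse.Namespace(config_file="my_config.yaml")
>>> config = reconcile_config_args(args)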
"""
default_config = read_yaml(resources.files(__package__) / "default_config.yaml")
if args is not None:
config_file_arg: str | None = args.config_file
if config_file_arg is not None:
config = read_yaml(config_file_arg)
# Merge the loaded config with the default config to fill in any defaults that are missing
# Make sure to prioritise the loaded config, so it overrides the default
config = merge_mappings(map1=default_config, map2=config)
else:
# If no config file is provided, use the default config
config = default_config
else:
# If no args are provided, use the default config
config = default_config
# Override the config with any command line arguments passed in, e.g. --output_dir ./output/
if args is not None:
config = update_config(config, args)
return config
def _set_logging(log_level: str | None) -> None:
"""
Set the logging level.
Parameters
----------
log_level : str | None
String for the desired log-level; unrecognised values (including None) fall back to 'INFO'.
"""
if log_level == "warning":
LOGGER.setLevel("WARNING")
elif log_level == "error":
LOGGER.setLevel("ERROR")
elif log_level == "debug":
LOGGER.setLevel("DEBUG")
else:
LOGGER.setLevel("INFO")
def _log_setup(config: dict, args: argparse.Namespace | None, img_files: dict) -> None:
"""
Log the current configuration.
Parameters
----------
config : dict
Dictionary of configuration options.
args : argparse.Namespace | None
Arguments function was invoked with.
img_files : dict
Dictionary of image files that have been found.
"""
LOGGER.debug(f"Plotting configuration after update :\n{pformat(config['plotting'], indent=4)}")
LOGGER.info(f"Configuration file loaded from : {args.config_file}")
LOGGER.info(f"Scanning for images in : {config['base_dir']}")
LOGGER.info(f"Output directory : {str(config['output_dir'])}")
LOGGER.info(f"Looking for images with extension : {config['file_ext']}")
LOGGER.info(f"Images with extension {config['file_ext']} in {config['base_dir']} : {len(img_files)}")
if len(img_files) == 0:
LOGGER.error(f"No images with extension {config['file_ext']} in {config['base_dir']}")
LOGGER.error("Please check your configuration and directories.")
sys.exit(1)
LOGGER.info(f"Thresholding method (Filtering) : {config['filter']['threshold_method']}")
LOGGER.info(f"Thresholding method (Grains) : {config['grains']['threshold_method']}")
LOGGER.debug(f"Configuration after update : \n{pformat(config, indent=4)}") # noqa: T203
def _parse_configuration(args: argparse.Namespace | None = None) -> tuple[dict, dict]:
"""
Load configurations, validate and check run steps are consistent.
Parameters
----------
args : argparse.Namespace | None
Command line arguments.
Returns
-------
tuple[dict, dict]
Returns the dictionary of configuration options and a dictionary of image files found on the input path.
"""
# Parse command line options, load config (or default) and update with command line options
config = reconcile_config_args(args=args)
# Validate configuration
validate_config(config, schema=DEFAULT_CONFIG_SCHEMA, config_type="YAML configuration file")
# Set logging level
_set_logging(config["log_level"])
# Create base output directory
config["output_dir"].mkdir(parents=True, exist_ok=True)
# Load plotting_dictionary and validate then update with command line options
plotting_dictionary = (resources.files(__package__) / "plotting_dictionary.yaml").read_text()
config["plotting"]["plot_dict"] = yaml.safe_load(plotting_dictionary)
validate_config(
config["plotting"]["plot_dict"], schema=PLOTTING_SCHEMA, config_type="YAML plotting configuration file"
)
config["plotting"] = update_config(config["plotting"], args)
# Check earlier stages of processing are enabled for later.
check_run_steps(
filter_run=config["filter"]["run"],
grains_run=config["grains"]["run"],
grainstats_run=config["grainstats"]["run"],
disordered_tracing_run=config["disordered_tracing"]["run"],
nodestats_run=config["nodestats"]["run"],
ordered_tracing_run=config["ordered_tracing"]["run"],
splining_run=config["splining"]["run"],
)
# Ensures each image has all plotting options which are passed as **kwargs
config["plotting"] = update_plotting_config(config["plotting"])
img_files = find_files(config["base_dir"], file_ext=config["file_ext"])
_log_setup(config, args, img_files)
return config, img_files
def process(args: argparse.Namespace | None = None) -> None: # noqa: C901
"""
Find and process all files.
Parameters
----------
args : argparse.Namespace | None
Command line arguments.
"""
config, img_files = _parse_configuration(args)
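# Freeze every per-stage configuration argument with functools.partial so that pool workers only need to be
# sent the per-image data when mapping over scans.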
processing_function = partial(
process_scan,
base_dir=config["base_dir"],
filter_config=config["filter"],
grains_config=config["grains"],
grainstats_config=config["grainstats"],
disordered_tracing_config=config["disordered_tracing"],
nodestats_config=config["nodestats"],
ordered_tracing_config=config["ordered_tracing"],
splining_config=config["splining"],
plotting_config=config["plotting"],
output_dir=config["output_dir"],
)
all_scan_data = LoadScans(img_files, **config["loading"])
all_scan_data.get_data()
# Get a dictionary of all the image data dictionaries.
# Keys are the image names
# Values are the individual image data dictionaries
scan_data_dict = all_scan_data.img_dict
with Pool(processes=config["cores"]) as pool:
results = {}
image_stats_all = {}
mols_results = {}
disordered_trace_results = {}
height_profile_all = {}
with tqdm(
total=len(img_files),
desc=f"Processing images from {config['base_dir']}, results are under {config['output_dir']}",
) as pbar:
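# imap_unordered yields each image's results as soon as its worker finishes, so completion order (and the
# progress bar) need not match input order.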
for (
img,
result,
height_profiles,
individual_image_stats_df,
disordered_trace_result,
mols_result,
) in pool.imap_unordered(
processing_function,
scan_data_dict.values(),
):
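# Drop columns that are entirely NaN from each per-image dataframe so the stage-wise concatenations below
# stay clean.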
results[str(img)] = result.dropna(axis=1, how="all")
disordered_trace_results[str(img)] = disordered_trace_result.dropna(axis=1, how="all")
mols_results[str(img)] = mols_result.dropna(axis=1, how="all")
pbar.update()
# Add the dataframe to the results dict
image_stats_all[str(img)] = individual_image_stats_df.dropna(axis=1, how="all")
# Combine all height profiles
height_profile_all[str(img)] = height_profiles
# Display completion message for the image
LOGGER.info(f"[{img.name}] Processing completed.")
LOGGER.info(f"Saving image stats to : {config['output_dir']}/image_stats.csv.")
# Concatenate all the dictionary's values into a dataframe. Ignore the keys since
# the dataframes have the file names in them already.
image_stats_all_df = pd.concat(image_stats_all.values())
image_stats_all_df.to_csv(config["output_dir"] / "image_stats.csv")
try:
results = pd.concat(results.values())
except ValueError as error:
LOGGER.error("No grains found in any images, consider adjusting your thresholds.")
LOGGER.error(error)
try:
disordered_trace_results = pd.concat(disordered_trace_results.values())
except ValueError as error:
LOGGER.error("No skeletons found in any images, consider adjusting disordered tracing parameters.")
LOGGER.error(error)
try:
mols_results = pd.concat(mols_results.values())
except ValueError as error:
LOGGER.error("No mols found in any images, consider adjusting ordered tracing / splining parameters.")
LOGGER.error(error)
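# If any of the concatenations above failed, the corresponding variable is still the dict of per-image
# results; the isinstance(..., pd.DataFrame) checks below guard against that case.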
# If requested save height profiles
if config["grainstats"]["extract_height_profile"]:
LOGGER.info(f"Saving all height profiles to {config['output_dir']}/height_profiles.json")
dict_to_json(data=height_profile_all, output_dir=config["output_dir"], filename="height_profiles.json")
# Summary Statistics and Plots
if config["summary_stats"]["run"]:
# Load and validate the summary plots/statistics configuration. Its location is taken from the command line
# arguments first, then from any config file given; if neither is provided the packaged
# topostats/summary_config.yaml is used.
if args is not None and args.summary_config is not None:
summary_config = read_yaml(args.summary_config)
elif config["summary_stats"]["config"] is not None:
summary_config = read_yaml(config["summary_stats"]["config"])
else:
summary_yaml = (resources.files(__package__) / "summary_config.yaml").read_text()
summary_config = yaml.safe_load(summary_yaml)
# Do not pass command line arguments to toposum as they clash with process command line arguments
summary_config = update_config(summary_config, config["plotting"])
validate_config(summary_config, SUMMARY_SCHEMA, config_type="YAML summarisation config")
# We never want to load data from CSV as we are using the data that has just been processed.
summary_config.pop("csv_file")
# Load variable to label mapping
plotting_yaml = (resources.files(__package__) / "var_to_label.yaml").read_text()
summary_config["var_to_label"] = yaml.safe_load(plotting_yaml)
LOGGER.info("[plotting] Default variable to labels mapping loaded.")
# If we don't have a dataframe, or we do but it is all NaN, there is nothing to plot
if isinstance(results, pd.DataFrame) and not results.isna().values.all():
if results.shape[0] > 1:
# If summary_config["output_dir"] does not match or is not a sub-dir of config["output_dir"] it
# needs creating
summary_config["output_dir"] = config["output_dir"] / "summary_distributions"
summary_config["output_dir"].mkdir(parents=True, exist_ok=True)
LOGGER.info(f"Summary plots and statistics will be saved to : {summary_config['output_dir']}")
# Plot summaries
summary_config["df"] = results.reset_index()
toposum(summary_config)
else:
LOGGER.warning(
"There are fewer than two grains that have been detected, so"
" summary plots cannot be made for this image."
)
else:
LOGGER.warning(
"There are no results to plot, either...\n\n"
"* you have disabled grains/grainstats etc.\n"
"* no grains have been detected across all scans.\n"
"* there have been errors.\n\n"
"If you are not expecting to detect grains please consider disabling"
"grains/grainstats etc/plotting/summary_stats. If you are expecting to detect grains"
" please check log-files for further information."
)
else:
summary_config = None
# Write statistics to CSV if there is data.
if isinstance(results, pd.DataFrame) and not results.isna().values.all():
results.reset_index(inplace=True)
results.set_index(["image", "threshold", "grain_number"], inplace=True)
results.to_csv(config["output_dir"] / "all_statistics.csv", index=True)
save_folder_grainstats(config["output_dir"], config["base_dir"], results, "grain_stats")
results.reset_index(inplace=True) # So we can access unique image names
images_processed = len(results["image"].unique())
else:
images_processed = 0
LOGGER.warning("There are no grainstats statistics to write to CSV.")
if isinstance(disordered_trace_results, pd.DataFrame) and not disordered_trace_results.isna().values.all():
disordered_trace_results.reset_index(inplace=True)
disordered_trace_results.set_index(["image", "threshold", "grain_number"], inplace=True)
disordered_trace_results.to_csv(config["output_dir"] / "all_disordered_segment_statistics.csv", index=True)
save_folder_grainstats(
config["output_dir"], config["base_dir"], disordered_trace_results, "disordered_trace_stats"
)
disordered_trace_results.reset_index(inplace=True) # So we can access unique image names
else:
LOGGER.warning("There are no disordered tracing statistics to write to CSV.")
if isinstance(mols_results, pd.DataFrame) and not mols_results.isna().values.all():
mols_results.reset_index(drop=True, inplace=True)
mols_results.set_index(["image", "threshold", "grain_number"], inplace=True)
mols_results.to_csv(config["output_dir"] / "all_mol_statistics.csv", index=True)
save_folder_grainstats(config["output_dir"], config["base_dir"], mols_results, "mol_stats")
mols_results.reset_index(inplace=True) # So we can access unique image names
else:
LOGGER.warning("There are no molecule tracing statistics to write to CSV.")
# Write config to file
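# The plotting dictionary was injected at runtime (see _parse_configuration), so drop it to keep the saved
# YAML limited to the user-facing configuration.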
config["plotting"].pop("plot_dict")
write_yaml(config, output_dir=config["output_dir"])
LOGGER.debug(f"Images processed : {images_processed}")
# Update config with plotting defaults for printing
completion_message(config, img_files, summary_config, images_processed)
# WIP: arguments will be passed to the run_* functions below once the dictionary keys have been wrangled
# pylint: disable=no-value-for-parameter
# pylint: disable=unused-argument
def filters(args: argparse.Namespace | None = None) -> None:
"""
Load files from disk and run filtering.
Parameters
----------
args : argparse.Namespace | None
Command line arguments.
"""
config, img_files = _parse_configuration(args)
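# WIP: config and img_files are parsed but not yet threaded through to run_filters (see the WIP note above).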
run_filters()
def grains(args: argparse.Namespace | None = None) -> None:
"""
Load files from disk and run grain finding.
Parameters
----------
args : argparse.Namespace | None
Command line arguments.
"""
config, img_files = _parse_configuration(args)
run_grains()
def grainstats(args: argparse.Namespace | None = None) -> None:
"""
Load files from disk and run grainstats.
Parameters
----------
args : argparse.Namespace | None
Command line arguments.
"""
config, img_files = _parse_configuration(args)
run_grainstats()
def disordered_tracing(args: argparse.Namespace | None = None) -> None:
"""
Load files from disk and run disordered tracing.
Parameters
----------
args : argparse.Namespace | None
Command line arguments.
"""
config, img_files = _parse_configuration(args)
run_disordered_tracing()
def nodestats(args: argparse.Namespace | None = None) -> None:
"""
Load files from disk and run nodestats.
Parameters
----------
args : argparse.Namespace | None
Command line arguments.
"""
config, img_files = _parse_configuration(args)
run_nodestats()
def ordered_tracing(args: argparse.Namespace | None = None) -> None:
"""
Load files from disk and run ordered tracing.
Parameters
----------
args : argparse.Namespace | None
Command line arguments.
"""
config, img_files = _parse_configuration(args)
run_ordered_tracing()
def splining(args: argparse.Namespace | None = None) -> None:
"""
Load files from disk and run splining.
Parameters
----------
args : argparse.Namespace | None
Command line arguments.
"""
config, img_files = _parse_configuration(args)
run_splining()