# How to add modules
## Basic Functionality
This should reside in `topostats/` or optionally `topostats/<sub_dir>`. Unit tests should be written for each function, class and method, along with integration tests to ensure the overall functionality is as expected, both now and in the future should refactoring be undertaken.
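As a minimal sketch of what a unit test might look like (the module `topostats.mymodule` and the function `normalise_heights()` are hypothetical names used purely for illustration):

```python
# tests/test_mymodule.py : a minimal sketch; ``topostats.mymodule`` and
# ``normalise_heights`` are hypothetical names used for illustration.
import numpy as np
import pytest

from topostats.mymodule import normalise_heights


def test_normalise_heights() -> None:
    """Heights should be rescaled to the range [0, 1]."""
    image = np.array([[0.0, 2.0], [4.0, 8.0]])
    result = normalise_heights(image)
    assert result.min() == pytest.approx(0.0)
    assert result.max() == pytest.approx(1.0)
```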
## Adding configuration
Configuration options live in `topostats/default_config.yaml`; you should create a nested object for the options corresponding to your module. The configuration nomenclature should match exactly the options of your module's class/function, as this allows `**kwargs` to be used to pass the options from the loaded dictionary to the function without having to explicitly map them to the class/function arguments. This might seem fickle or excessive, but it saves you and others work in the long run.
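For example, with a hypothetical `mymodule` section in the configuration whose keys mirror the function signature, the loaded dictionary can be unpacked straight into the call (the module, function and option names here are illustrative):

```python
# A minimal sketch: ``mymodule`` and its options are hypothetical.
import yaml

config = yaml.safe_load(
    """
    mymodule:
      run: true
      smoothing_sigma: 1.5
      window_size: 11
    """
)


def run_mymodule(image, smoothing_sigma: float, window_size: int) -> None:
    """Hypothetical function whose arguments exactly match the configuration keys."""


mymodule_config = config["mymodule"]
mymodule_config.pop("run")  # 'run' toggles the stage; it is not a function argument
run_mymodule(image=None, **mymodule_config)  # keys map directly onto arguments
```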
## Modularity
TopoStats is a command line program (for now at least!) and the main command `topostats` has sub-commands which allow individual steps of the processing to be undertaken, making it faster to test out the impact of different options. For example, once images have been loaded and saved as `.topostats` files, the filter stage can be re-run to improve flattening. Once flattening is complete, the configuration can be changed to detect different grains or features using the already flattened images.
Once you have a module that works it should ideally be included in this pipeline. Here we describe how to include it.
## Processing
Each module needs a processing function. This is defined in the `topostats/processing.py` module and should be called from the `process_all()` function to include it in the end-to-end pipeline.
### process_all
All of these modules are run in order to process individual images in parallel, and this is achieved via the `process_all()` function, which takes a single image and the configuration dictionary as parameters and calls each module in turn. This is run in parallel using `functools.partial()` and `Pool`. The former takes the `process_all()` function as an argument along with all other configuration options. This is combined with a list of images to be processed and run through the `Pool` class (see examples below).
To add your functionality to this pipeline you should add the calls to your class and its methods to the `process_all()` function, as sketched below.
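A minimal sketch of where such a call goes (the function `run_mymodule()` and the dictionary keys are hypothetical; the real `process_all()` lives in `topostats/processing.py`):

```python
# A minimal sketch: ``run_mymodule`` and the dictionary keys are hypothetical;
# the real ``process_all()`` lives in ``topostats/processing.py``.
def run_mymodule(image, mymodule_config: dict):
    """Hypothetical processing function for your module."""
    return image


def process_all(topostats_object: dict, config: dict) -> dict:
    """Call each stage's processing function in turn for a single image."""
    # ... existing stages (filtering, grain finding, etc.) run first ...
    # Add the call to your module here, passing its section of the configuration
    topostats_object["mymodule_output"] = run_mymodule(
        image=topostats_object["image_flattened"],
        mymodule_config=config["mymodule"],
    )
    return topostats_object
```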
## Modular Analyses
It is, however, useful to be able to call the function in isolation; this means we can add sub-commands to `topostats` so that we can experiment with different configurations without having to run the whole pipeline for all images in one big batch job.
As an example we will look at the `Filters` class and how it is added as a sub-command. The `topostats.processing.run_filters()` function is the processing command that takes in the various options, instantiates an instance of the `Filters()` class and runs the various methods required to process the image. There are other classes, such as `Grains()`, `Grainstats()` and `Tracing()`, which form part of the pipeline.
## Adding Sub-Command Arguments
You need to add a sub-parser for your module to run it in isolation. To do this, entries need to be made in `topostats/entry_point.py`, which sets up `argparse`. At the top level the `create_parser()` function defines the parser with the main options shown by `topostats --help`. Each sub-command has its own parser; the most commonly used is `process_parser`, which defines the arguments to, and the help shown by, `topostats process --help`. A `filter_parser` subparser is added which defines the arguments and help shown by `topostats filter`, and this has various arguments added to it with `.add_argument()`. Note that it is important to ensure that the `dest` for each added argument matches the name used in `default_config.yaml`, as these values are used to update the loaded dictionary. If the names don't match, the values do not get updated (and/or an error will occur).
```python
# Filter
filter_parser = subparsers.add_parser(
    "filter",
    description="Load and filter images, saving as .topostats files for subsequent processing.",
    help="Load and filter images, saving as .topostats files for subsequent processing.",
)
filter_parser.add_argument(
    "--row-alignment-quantile",
    dest="row_alignment_quantile",
    type=float,
    required=False,
    help="Lower values may improve flattening of larger features.",
)
filter_parser.add_argument(
    "--threshold-method",
    dest="threshold_method",
    type=str,
    required=False,
    help="Method for thresholding Filtering. Options are otsu, std_dev, absolute.",
)
filter_parser.add_argument(
    "--otsu-threshold-multiplier",
    dest="otsu_threshold_multiplier",
    type=float,
    required=False,
    help="Factor for scaling the Otsu threshold during Filtering.",
)
filter_parser.add_argument(
    "--threshold-std-dev-below",
    dest="threshold_std_dev_below",
    type=float,
    required=False,
    help="Threshold for data below the image background for std dev method during Filtering.",
)
...
filter_parser.add_argument(
    "--scars-max-scar-length",
    dest="scars_max_scar_length",
    type=int,
    required=False,
    help="Maximum length of scars in pixels",
)
# Run the relevant function with the arguments
filter_parser.set_defaults(func=run_modules.filters)
```
1. Create a subparser with `<module_name>_parser = subparsers.add_parser(...)`.
2. Add arguments to the subparser with `<module_name>_parser.add_argument(...)`, including:
   - Single letter flag (optional).
   - Longer flag (required).
   - `dest`: the destination to which the variable will be saved. This should match the corresponding value in the `default_config.yaml`, which makes updating the configuration options straightforward and will occur automatically (the code is in place, you don't need to add anything, just ensure the names match).
   - `type`: should ideally be included as this helps define the type of the stored variable, particularly useful if, for example, paths are provided which should be stored as `Path` pathlib objects.
   - `default`: a sensible default value, typically `None` so that the value in `default_config.yaml` is used.
   - `help`: a useful description of what the configuration option changes along with possible options.
3. Set the default function that will be called with `<module_name>_parser.set_defaults(func=run_modules.<function>)`, which will call the function you have defined in `topostats/run_modules.py` which runs your module.

**NB** In the above substitute `<module_name>` for the meaningful name you have created for your module.
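Putting these steps together, a minimal sketch for a hypothetical module named `mymodule` might look like the following (the flag, option and `run_modules.mymodule` names are all illustrative):

```python
# A minimal sketch for a hypothetical module named ``mymodule``; flag and
# option names are illustrative.
mymodule_parser = subparsers.add_parser(
    "mymodule",
    description="Run the mymodule stage on .topostats files.",
    help="Run the mymodule stage on .topostats files.",
)
mymodule_parser.add_argument(
    "-s",  # single letter flag (optional)
    "--smoothing-sigma",  # longer flag
    dest="smoothing_sigma",  # must match the name used in default_config.yaml
    type=float,
    default=None,  # None so that the value in default_config.yaml is used
    required=False,
    help="Standard deviation of the hypothetical Gaussian smoothing kernel.",
)
# Run the relevant function with the arguments
mymodule_parser.set_defaults(func=run_modules.mymodule)
```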
## Running in Parallel
One of the main advantages of TopoStats is the ability to batch process images. As modern computers typically have multiple cores, the processing can be done in parallel, making it faster. To achieve this we use wrapper functions in `topostats/run_modules.py` that leverage the `Pool` multiprocessing class.

Your function should first load the `.topostats` files you wish to process, which are found from the user-specified path (the default being the current directory `./`).
You then need to define a `partial()` function with your `topostats/processing.py` processing function (`<your_module_processing_function>`) as the first argument and the remaining configuration options. These will typically be a subset from `default_config.yaml`, where the configuration options use the exact same names as the arguments of the function you defined in `topostats/processing.py`. As mentioned above, keeping configuration names consistent between configuration files and functions means `**kwargs` can be used when passing options to functions.
Continuing with our example, let's look at the `topostats.processing.run_filters()` function.
```python
def run_filters(
    unprocessed_image: npt.NDArray,
    pixel_to_nm_scaling: float,
    filename: str,
    filter_out_path: Path,
    core_out_path: Path,
    filter_config: dict,
    plotting_config: dict,
) -> npt.NDArray | None:
    """
    Filter and flatten an image. Optionally plots the results, returning the flattened image.

    Parameters
    ----------
    unprocessed_image : npt.NDArray
        Image to be flattened.
    pixel_to_nm_scaling : float
        Scaling factor for converting pixel length scales to nanometres.
        ie the number of pixels per nanometre.
    filename : str
        File name for the image.
    filter_out_path : Path
        Output directory for step-by-step flattening plots.
    core_out_path : Path
        General output directory for outputs such as the flattened image.
    filter_config : dict
        Dictionary of configuration for the Filters class to use when initialised.
    plotting_config : dict
        Dictionary of configuration for plotting output images.

    Returns
    -------
    npt.NDArray | None
        Either a numpy array of the flattened image, or None if an error occurs or
        flattening is disabled in the configuration.
    """
    if filter_config["run"]:
        filter_config.pop("run")
        LOGGER.debug(f"[{filename}] Image dimensions: {unprocessed_image.shape}")
        LOGGER.info(f"[{filename}] : *** Filtering ***")
        filters = Filters(
            image=unprocessed_image,
            filename=filename,
            pixel_to_nm_scaling=pixel_to_nm_scaling,
            **filter_config,
        )
        filters.filter_image()
        # Optionally plot filter stage
        if plotting_config["run"]:
            plotting_config.pop("run")
            LOGGER.info(f"[{filename}] : Plotting Filtering Images")
            if (
                "all" in plotting_config["image_set"]
                or "filters" in plotting_config["image_set"]
            ):
                filter_out_path.mkdir(parents=True, exist_ok=True)
                LOGGER.debug(
                    f"[{filename}] : Target filter directory created : {filter_out_path}"
                )
                # Generate plots
                for plot_name, array in filters.images.items():
                    if plot_name not in ["scan_raw"]:
                        if plot_name == "extracted_channel":
                            array = np.flipud(array.pixels)
                        plotting_config["plot_dict"][plot_name]["output_dir"] = (
                            core_out_path
                            if plotting_config["plot_dict"][plot_name]["core_set"]
                            else filter_out_path
                        )
                        try:
                            Images(
                                array, **plotting_config["plot_dict"][plot_name]
                            ).plot_and_save()
                            Images(
                                array, **plotting_config["plot_dict"][plot_name]
                            ).plot_histogram_and_save()
                        except AttributeError:
                            LOGGER.info(
                                f"[{filename}] Unable to generate plot : {plot_name}"
                            )
            plotting_config["run"] = True
            # Always want the 'z_threshed' plot (aka "Height Thresholded") but in the core_out_path
            plot_name = "z_threshed"
            plotting_config["plot_dict"][plot_name]["output_dir"] = core_out_path
            Images(
                filters.images["gaussian_filtered"],
                filename=filename,
                **plotting_config["plot_dict"][plot_name],
            ).plot_and_save()
        LOGGER.info(f"[{filename}] : Filters stage completed successfully.")
        return filters.images["gaussian_filtered"]
    # Otherwise, return None and warn that initial processing is disabled.
    LOGGER.error(
        "You have not included running the initial filter stage. This is required for all subsequent "
        "stages of processing. Please check your configuration file."
    )
    return None
```
This instantiates (creates) the object `filters` of the class `Filters` with the supplied options and then runs the `filter_image()` method to perform the filtering. The rest of the code determines which images to plot based on the configuration. At the end the `gaussian_filtered` image is returned.
This function only processes a single image. As mentioned, we use the `functools.partial()` function to define a function with the command we want to run, in this case `process_filters()` from `topostats/processing.py`, as the first argument, followed by all of the parameters we would normally pass to it as the remaining arguments. This is then used with the `Pool` class, which is given a list of the images to be processed, and the `partial`ly defined function is run on each image. In our example, `run_modules.filters()`, this looks like the following.
```python
def filters(args: argparse.Namespace | None = None) -> None:
    """
    Load files from disk and run filtering.

    Parameters
    ----------
    args : argparse.Namespace | None
        Arguments.
    """
    config, img_files = _parse_configuration(args)
    # If loading existing .topostats files the images need filtering again so we need to extract the raw image
    if config["file_ext"] == ".topostats":
        config["loading"]["extract"] = "raw"
    all_scan_data = LoadScans(img_files, **config["loading"])
    all_scan_data.get_data()
    processing_function = partial(
        process_filters,
        base_dir=config["base_dir"],
        filter_config=config["filter"],
        plotting_config=config["plotting"],
        output_dir=config["output_dir"],
    )
    with Pool(processes=config["cores"]) as pool:
        results = defaultdict()
        with tqdm(
            total=len(img_files),
            desc=f"Processing images from {config['base_dir']}, results are under {config['output_dir']}",
        ) as pbar:
            for img, result in pool.imap_unordered(
                processing_function,
                all_scan_data.img_dict.values(),
            ):
                results[str(img)] = result
                pbar.update()
                # Display completion message for the image
                LOGGER.info(f"[{img}] Filtering completed.")
    # Write config to file
    config["plotting"].pop("plot_dict")
    write_yaml(config, output_dir=config["output_dir"])
    LOGGER.debug(f"Images processed : {len(results)}")
    # Update config with plotting defaults for printing
    completion_message(
        config, img_files, summary_config=None, images_processed=sum(results.values())
    )
```
The files to be processed are first loaded to give the list of images that need processing, and the `partial()` function is defined as `processing_function` with all of the arguments before running in parallel using `Pool`. The `tqdm` package is leveraged to give a progress bar and, after completion, the configuration is written to file before a completion message is displayed.
## Results
Before we start the parallel processing we create a dictionary, `results = defaultdict()`, to which the results of processing are added. Some steps in processing return more than one object; for example `grainstats` returns statistics for the image along with height profiles. In such cases we need to instantiate a dictionary to hold each set of results across images and include a holder in the `for ... in pool.imap_unordered(...):` loop to store the results before adding them to the dictionaries. For `run_modules.grainstats()` this looks like the code below. We create two dictionaries, `results` and `height_profile_all`, and in our `for` loop we have `img` (a string representing the image name), `result` (the returned Pandas DataFrame of grain statistics for the given image) and `height_profiles` (the height profile dictionary for the grains in that image).
```python
with Pool(processes=config["cores"]) as pool:
    results = defaultdict()
    height_profile_all = defaultdict()
    with tqdm(
        total=len(img_files),
        desc=f"Processing images from {config['base_dir']}, results are under {config['output_dir']}",
    ) as pbar:
        for img, result, height_profiles in pool.imap_unordered(
            processing_function,
            all_scan_data.img_dict.values(),
        ):
            results[str(img)] = result
            height_profile_all[str(img)] = height_profiles
            pbar.update()
            # Display completion message for the image
            LOGGER.info(
                f"[{img}] Grainstats completed (NB - Filtering was *not* re-run)."
            )
```
Note that your `processing.process_<stage>` function, which is used in the call to `processing_function`, should return a tuple. The first item should be the `topostats_object["filename"]` (which will be stored in `img` and used as the dictionary key); the remaining items are the results that you expect to be returned.
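For illustration, a minimal sketch of such a function (the name `process_mymodule` and its placeholder results are hypothetical):

```python
# A minimal sketch; ``process_mymodule`` and its results are hypothetical.
def process_mymodule(topostats_object: dict, mymodule_config: dict) -> tuple:
    """Process a single image, returning its filename followed by the results."""
    statistics = {"example_metric": 1.0}  # placeholder first set of results
    height_profiles = {}  # placeholder second set of results
    # The first item must be the filename: it is unpacked as ``img`` in the
    # ``pool.imap_unordered(...)`` loop and used as the dictionary key.
    return topostats_object["filename"], statistics, height_profiles
```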
## Conclusion
Adding functionality is useful, but it has to integrate into the workflow and ideally be accessible as a standalone step in the process. Hopefully the above helps demystify the steps required to achieve this.