trifusion.data.resources.background_tasks module

How to use background tasks

Tasks that are too time-consuming to be executed in TriFusion’s main thread are defined here. These functions should be called from TriFusionApp in a threading.Thread object, and they must rely on multiprocessing.Namespace and Queue.Queue objects to transfer data between the worker and main threads.
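
The data flow described above can be sketched outside TriFusion as follows. This is only an illustration: count_items is a hypothetical task, the Namespace is instantiated directly from multiprocessing.managers, and the Python 3 module name queue is used in place of Python 2's Queue.

```python
import threading
from multiprocessing.managers import Namespace
from queue import Queue  # this module is named Queue in Python 2


def count_items(items, ns, queue):
    """Hypothetical task: reports progress via ns, returns its result via queue."""
    total = 0
    for i, item in enumerate(items, start=1):
        total += item
        ns.progress = i  # the main thread may read this attribute at any time
    queue.put(total)     # hand the final result over to the main thread


shared_ns = Namespace(progress=0)
result_queue = Queue()

worker = threading.Thread(target=count_items,
                          args=([1, 2, 3], shared_ns, result_queue))
worker.start()
worker.join()

result = result_queue.get()
```

The Namespace carries small status values that change while the task runs, whereas the Queue carries the finished object.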

Generic creation of background tasks in TriFusionApp

Any method in TriFusionApp can start a task in a worker thread, provided it follows a few guidelines described below. However, for most cases, there is already a run_in_background() method that greatly facilitates this process:

def run_in_background(self, func, second_func, args1, args2=None,
                      no_arg2=False, msg="Crunching data...",
                      cancel=True):

Suppose you have defined a function (my_func) in background_tasks that is meant to run in the background. This function may take any number of arguments, but in this example it takes only one. To execute this function in the background, simply call run_in_background() like this:

run_in_background(my_func, None, args1=[my_arg])

This is the simplest example. In many cases my_func may return something, and we may want to feed that something into another callback in the main thread to update application structures or to perform any other task. This can be easily accomplished with this method using the second_func and args2 arguments:

run_in_background(my_func, my_callback, args1=[my_arg], args2=[other_arg])

In this case, the object returned by my_func will be passed directly to my_callback. Since we also defined arguments for my_callback, the two argument lists will be merged before my_callback is called. Note that my_callback is called from the main thread, which means that it can change application structures, but it can also freeze the application window if it is too intensive.
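
The hand-off between my_func and my_callback can be pictured with a simplified stand-in. The sketch below is not the TriFusionApp method: run_in_background_sketch is a hypothetical name, it blocks on join() where a GUI would poll instead, and the merge order (returned object first, then args2) is an assumption.

```python
import threading


def run_in_background_sketch(func, second_func, args1, args2=None):
    """Simplified, blocking stand-in for run_in_background()."""
    result = {}

    def worker():
        # Run the task in the worker thread and keep what it returns
        result["val"] = func(*args1)

    t = threading.Thread(target=worker)
    t.start()
    t.join()  # a GUI application would poll the thread instead of blocking

    if second_func is None:
        return None
    # Assumed merge order: returned object first, followed by args2
    merged = [result["val"]] + list(args2 or [])
    return second_func(*merged)


def my_func(x):
    return x * 2


def my_callback(value, label):
    return "{}: {}".format(label, value)


out = run_in_background_sketch(my_func, my_callback,
                               args1=[21], args2=["answer"])
```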

Custom creation of background tasks

A TriFusionApp method that needs to execute some function in the background must follow some guidelines to ensure that it starts and ends properly.

Use a Thread object

A worker thread can be initiated like:

p = threading.Thread(target=background_process,
                     args=(func, shared_ns, args1))
p.start()

The background task is provided in the target argument, and any potential arguments in the args argument.

Create and launch a waiting dialog

While the background task is being executed, a dialog of some sort should be created in the main thread and ideally show some progress. Any custom dialog can be created, but a general CrunchData dialog is already available:

content = CrunchData()
# Add a custom message
content.ids.msg.text = msg
# Create popup with waiting dialog
self.show_popup(title="", content=content, size=size,
                separator_color=(0, 0, 0, 0),
                border_color=tm.c_popup_border,
                auto_dissmiss=False)

Schedule function that checks the worker thread’s pulse

In order to check the pulse of the worker thread and/or receive information from it while it’s busy, a function can be scheduled to be called at regular intervals using kivy’s Clock object:

# Schedule function that checks the process' pulse
check_func = partial(check_process_status, p, shared_ns)
Clock.schedule_interval(check_func, .1)

The check_process_status function may execute anything, like checking the Namespace or Queue objects of the worker thread to update the progress. The most important thing, however, is to check if the worker thread is alive, and if not, unschedule itself, close the waiting popup, join the thread, close any connections and relevant objects:

# Kivy's Clock passes the elapsed time (dt) as the last argument
def check_process_status(p, shared_ns, dt):

    if not p.is_alive():
        Clock.unschedule(check_func)
        self.dismiss_popup()
        p.join()

If the function in the worker thread returns some object, this would be the place to get that object and pass it to another callback:

def check_process_status(p, shared_ns, dt):

    if not p.is_alive():

        obj = queue.get()
        self.my_callback(obj)

        Clock.unschedule(check_func)
        self.dismiss_popup()
        p.join()
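
The pulse check can also be tried without Kivy. In the sketch below a plain loop stands in for Clock.schedule_interval, and slow_task and check_pulse are hypothetical names; only the order of the checks (is the thread alive, join it, fetch the result) follows the text above.

```python
import threading
import time
from queue import Queue  # this module is named Queue in Python 2


def slow_task(queue):
    """Hypothetical worker that takes a moment and returns a value."""
    time.sleep(0.05)
    queue.put("done")


def check_pulse(p, queue):
    """Return (finished, result) for the worker thread p."""
    if p.is_alive():
        return False, None
    p.join()                  # the thread has ended, so this returns at once
    return True, queue.get()  # the result was queued before the thread ended


queue = Queue()
p = threading.Thread(target=slow_task, args=(queue,))
p.start()

# Plain loop standing in for Clock.schedule_interval(check_func, .1)
finished, result = False, None
while not finished:
    finished, result = check_pulse(p, queue)
    time.sleep(0.01)
```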

Add a kill switch

Whenever possible, it’s desirable to add a kill switch to the check_process_status function, which changes the Namespace.stop attribute to True, signaling the worker thread to stop:

def check_process_status(p, shared_ns, dt):

    if self.terminate_background:
        shared_ns.stop = True
        time.sleep(1)
        self.dismiss_popup()
        Clock.unschedule(check_func)
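
The kill switch only works if the worker cooperates by checking the flag. A minimal sketch of the worker side, assuming a hypothetical cancellable_task and the attribute names cancelled and step (only stop comes from the text above):

```python
import threading
import time
from multiprocessing.managers import Namespace


def cancellable_task(ns, n_steps):
    """Hypothetical worker that checks ns.stop before every step."""
    for step in range(n_steps):
        if ns.stop:
            ns.cancelled = True  # report back that the task ended early
            return
        ns.step = step
        time.sleep(0.01)         # placeholder for real work


ns = Namespace(stop=False, cancelled=False, step=-1)
p = threading.Thread(target=cancellable_task, args=(ns, 1000))
p.start()

time.sleep(0.05)
ns.stop = True      # what the kill switch does in the main thread
p.join(timeout=2)
```

The finer the granularity of the checks in the worker, the faster it reacts to the kill switch.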

trifusion.data.resources.background_tasks.background_export_groups(f, nm, a)

Specific callback for exporting Orthology groups.

Parameters:

f : function

Callback function.

nm : multiprocessing.Namespace

Namespace object that allows communication between main and worker threads.

a : list

List of arguments provided to the f function.

trifusion.data.resources.background_tasks.background_process(f, ns, a)

General execution of a background process.

Allows a generic function to be executed in the background with or without arguments, provided via the a argument.

Parameters:

f : function

Callback function.

ns : multiprocessing.Namespace

Namespace object that allows communication between main and worker threads.

a : list

List of arguments provided to the f function. Can be None.
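
A wrapper of this kind could look roughly like the sketch below. It is not the TriFusion implementation: the name background_process_sketch and the ns.val and ns.exception attributes are assumptions made for illustration.

```python
from multiprocessing.managers import Namespace


def background_process_sketch(f, ns, a):
    """Run f with the arguments in a (if any) and store the outcome in ns."""
    try:
        # a may be None or empty, in which case f is called without arguments
        ns.val = f(*a) if a else f()
    except Exception as e:
        # Assumed convention: hand the error to the main thread via ns
        ns.exception = e


ns_with_args = Namespace()
background_process_sketch(sum, ns_with_args, [[1, 2, 3]])

ns_no_args = Namespace()
background_process_sketch(list, ns_no_args, None)

ns_error = Namespace()
background_process_sketch(int, ns_error, ["not a number"])
```

Storing the exception instead of letting it escape keeps a failing task from ending the worker thread silently.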

trifusion.data.resources.background_tasks.get_active_group(ortho_groups, old_active_group, active_group_name)

Task that retrieves the active GroupLight object.

Parameters:

ortho_groups : trifusion.ortho.OrthomclToolbox.MultiGroupsLight

MultiGroupsLight object.

old_active_group : str

Previous active GroupLight object.

active_group_name : str

Name of the GroupLight object that will be active.

Returns:

active_group : trifusion.ortho.OrthomclToolbox.GroupLight

GroupLight object.

trifusion.data.resources.background_tasks.get_orto_data(active_group, plt_idx, filt, exclude_taxa)

Creates plot data for orthology

Given a GroupLight object, this function will execute the method that corresponds to plt_idx to generate data.

Parameters:

active_group : trifusion.ortho.OrthomclToolbox.GroupLight

GroupLight object.

plt_idx : str

Identifier of the plot type. It must correspond to a key in the method dictionary defined inside this function.

filt : list

List with orthology filters.

exclude_taxa : list

List of taxa to be excluded.
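
The lookup by plt_idx is a dictionary dispatch. The sketch below shows the pattern with made-up names: FakeGroup stands in for a GroupLight object, and neither its method names nor the plot identifiers are taken from TriFusion.

```python
class FakeGroup(object):
    """Hypothetical stand-in for a GroupLight object."""

    def species_distribution(self, filt=None, exclude_taxa=None):
        return {"plot": "species", "filt": filt, "excluded": exclude_taxa}

    def gene_copy_distribution(self, filt=None, exclude_taxa=None):
        return {"plot": "gene copies", "filt": filt, "excluded": exclude_taxa}


def get_plot_data_sketch(active_group, plt_idx, filt, exclude_taxa):
    # Maps plot identifiers to bound methods (identifiers are made up)
    methods = {
        "species": active_group.species_distribution,
        "gene_copies": active_group.gene_copy_distribution,
    }
    if plt_idx not in methods:
        raise KeyError("Unknown plot identifier: {}".format(plt_idx))
    return methods[plt_idx](filt=filt, exclude_taxa=exclude_taxa)


data = get_plot_data_sketch(FakeGroup(), "species", [1, 3], ["taxonA"])
```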

trifusion.data.resources.background_tasks.get_stats_data(aln_obj, stats_idx, active_file_set, active_taxa_set, additional_args, ns=None)

Task that retrieves the plot data from AlignmentList plot methods.

Given an aln_obj, this function will execute the corresponding method to generate plot data.

Parameters:

aln_obj : trifusion.process.sequence.AlignmentList

AlignmentList object.

stats_idx : str

Identifier of the method type. It must correspond to a key in the methods dictionary defined inside this function.

active_file_set : str

Name of the active file set.

active_taxa_set : str

Name of the active taxa set.

additional_args : dict

Dictionary with keyword arguments that can be provided when the plot data method is called.

ns : multiprocessing.Namespace

Namespace object that allows communication between main and worker threads.

trifusion.data.resources.background_tasks.get_stats_summary(dest, aln_list, active_file_set, active_taxa_set, ns)

Calculates the summary statistics for the Statistics module.

Executes the get_summary_stats method in the background and writes the output in pickle files.

Parameters:

dest : str

Path to directory where the pickle objects with the results will be created.

aln_list : trifusion.process.sequence.AlignmentList

AlignmentList object.

active_file_set : list

List with the active alignments via their Alignment.name attribute.

active_taxa_set : list

List with the active taxa.

ns : multiprocessing.Namespace

Namespace object that allows communication between main and worker threads.
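
Writing results to pickle files lets the main thread read them later from dest. A minimal sketch of that hand-off, assuming a hypothetical file name (summary.pickle) and made-up statistics:

```python
import os
import pickle
import tempfile


def write_summary_sketch(dest, stats, name="summary.pickle"):
    """Serialize stats into dest and return the path of the pickle file."""
    path = os.path.join(dest, name)
    with open(path, "wb") as fh:
        pickle.dump(stats, fh)
    return path


# Worker side: write the results into a destination directory
dest = tempfile.mkdtemp()
path = write_summary_sketch(dest, {"genes": 10, "taxa": 4})

# Main thread side: read the results back once the worker has finished
with open(path, "rb") as fh:
    loaded = pickle.load(fh)
```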

trifusion.data.resources.background_tasks.load_group_files(group_files, temp_dir, ns=None)

Task that loads orthology group files into TriFusion.

Parameters:

group_files : list

List of paths to group files.

temp_dir : str

Temporary directory where sqlite database will be created.

ns : multiprocessing.Namespace

Namespace object that allows communication between main and worker threads.

Returns:

og : trifusion.ortho.OrthomclToolbox.MultiGroupsLight

MultiGroupsLight object.

og.filters : list

List of filters for the MultiGroupsLight object.

trifusion.data.resources.background_tasks.load_proc(aln_list, file_list, nm, queue)

Task that loads alignment files into TriFusion.

Loads alignment files provided via the file_list argument into the AlignmentList object provided via aln_list.

Parameters:

aln_list : trifusion.process.sequence.AlignmentList

AlignmentList object.

file_list : list

List of paths to alignment files.

nm : multiprocessing.Namespace

Namespace object that allows communication between main and worker threads.

queue : Queue.Queue

Queue object used to transfer the AlignmentList to the main thread.

trifusion.data.resources.background_tasks.orto_execution(nm, temp_dir, proteome_files, protein_min_len, protein_max_stop, usearch_file, usearch_evalue, usearch_threads, usearch_output, mcl_file, mcl_inflation, ortholog_prefix, group_prefix, orto_max_gene, orto_min_sp, sqldb, ortho_dir, usearch_db)

Execution of the orthology search pipeline.

Executes all pipeline subprocesses sequentially and updates the Progress dialog label.

Parameters:

nm : multiprocessing.Namespace

Namespace object that allows communication between main and worker threads.

temp_dir : str

Path to TriFusion’s temporary directory.

proteome_files : list

List of paths to proteome files.

protein_min_len : int

Minimum length of protein sequences.

protein_max_stop : int

Maximum percentage of stop codons allowed.

usearch_file : str

Path to the usearch executable.

usearch_evalue : int or float

E-value for the usearch execution.

usearch_threads : int

Number of threads used by usearch execution.

usearch_output : str

Name of usearch’s output file.

mcl_file : str

Path to the mcl executable.

mcl_inflation : list

List of inflation values (int) to use at the end of the orthology search.

ortholog_prefix : str

Prefix for the name of the orthologs.

group_prefix : str

Prefix for the name of the group files.

orto_max_gene : int

Maximum number of gene copies allowed when filtering the search results.

orto_min_sp : int

Minimum number of taxa representation when filtering the search results.

sqldb : str

Path to the sqlite database.

ortho_dir : str

Path to the directory where the results will be generated.

usearch_db : str

Name of the file used as database for usearch.
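
Running the steps one after another while publishing progress through the Namespace follows a simple pattern. The sketch below illustrates it with placeholder steps; run_pipeline_sketch, the step labels and the ns.task and ns.progress attribute names are assumptions, not the names used by TriFusion.

```python
from multiprocessing.managers import Namespace


def run_pipeline_sketch(ns, steps):
    """Run (label, callable) steps in order, publishing progress in ns."""
    for i, (label, step) in enumerate(steps, start=1):
        if getattr(ns, "stop", False):
            return False     # the main thread asked the worker to stop
        ns.task = label      # text that a progress dialog could display
        ns.progress = i
        step()
    return True


ns = Namespace(stop=False)
log = []
steps = [
    ("Filtering proteomes", lambda: log.append("filter")),
    ("Running search", lambda: log.append("search")),
    ("Clustering", lambda: log.append("cluster")),
]
completed = run_pipeline_sketch(ns, steps)

# A pipeline whose stop flag is already set does not run any step
stopped_ns = Namespace(stop=True)
stopped = run_pipeline_sketch(stopped_ns, [("Never runs", lambda: log.append("x"))])
```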

trifusion.data.resources.background_tasks.orto_update_filters(ortho_groups, gn_filter, sp_filter, excluded_taxa, group_names=None, default=False)

Task that updates the filters of a MultiGroupsLight object.

Parameters:

ortho_groups : trifusion.ortho.OrthomclToolbox.MultiGroupsLight

MultiGroupsLight object.

gn_filter : int

Filter for maximum gene copies.

sp_filter : int

Filter for minimum taxa representation.

excluded_taxa : list

List of taxa to be excluded.

group_names : list

List with name of group files.

default : bool

If True, the default filters will be used.

Returns:

orto_groups : trifusion.ortho.OrthomclToolbox.MultiGroupsLight

MultiGroupsLight object.

trifusion.data.resources.background_tasks.process_execution(aln_list, file_set_name, file_list, file_groups, filename_map, taxa_set_name, active_taxa_list, ns, taxa_groups, hap_prefix, secondary_operations, secondary_options, missing_filter_settings, taxa_filter_settings, codon_filter_settings, variation_filter_settings, output_file, rev_infile, main_operations, zorro_suffix, partitions_file, output_formats, create_partfile, use_nexus_partitions, use_nexus_models, phylip_truncate_name, output_dir, use_app_partitions, consensus_type, ld_hat, ima2_params, conversion_suffix)

Execution of the Process operations.

Parameters:

aln_list : trifusion.process.sequence.AlignmentList

AlignmentList object.

file_set_name : str

Name of the active file group.

file_list : list

List of alignment files loaded into TriFusion.

file_groups : dict

Maps the name of custom file groups to a list of alignment files.

filename_map : dict

Maps the basename of alignment files to their full path.

taxa_set_name : str

Name of the active taxa group.

active_taxa_list : list

List of active taxa.

ns : multiprocessing.Namespace

Namespace object that allows communication between main and worker threads.

taxa_groups : dict

Maps the name of custom taxa groups to a list of taxon names.

hap_prefix : str

See hap_prefix attribute.

secondary_operations : dict

See secondary_operations attribute.

secondary_options : dict

See secondary_options attribute.

missing_filter_settings : list

See missing_filter_settings attribute.

taxa_filter_settings : list

See taxa_filter_settings attribute.

codon_filter_settings : list

See codon_filter_settings attribute.

variation_filter_settings : list

See variation_filter_settings attribute.

output_file : str

Name of the output file.

rev_infile : str

See rev_infile attribute.

main_operations : dict

See main_operations attribute.

zorro_suffix : str

See zorro_suffix attribute.

partitions_file : str

See partitions_file attribute.

output_formats : list

See output_formats attribute.

create_partfile : bool

See create_partfile attribute.

use_nexus_partitions : bool

See use_nexus_partitions attribute.

use_nexus_models : bool

See use_nexus_models attribute.

phylip_truncate_name : bool

See phylip_truncate_name attribute.

output_dir : str

Path to directory where the output file(s) will be generated.

use_app_partitions : bool

See use_app_partitions attribute.

consensus_type : str

Mode of consensus variation handling.

ld_hat : bool

See ld_hat attribute.

ima2_params :

See ima2_options attribute.

conversion_suffix : str

See conversion_suffix attribute.

trifusion.data.resources.background_tasks.remove_tmp(temp_dir, sql_con)

Removes TriFusion’s temporary directory and closes sqlite connection.

Removes the temporary directory and all its contents and closes the connection to the sqlite database.

Parameters:

temp_dir : str

Path to the temporary directory.

sql_con : sqlite3.Connection

Sqlite3 connection object.
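
A cleanup of this kind can be sketched with the standard library alone. The function name remove_tmp_sketch and the database file name are hypothetical, and closing the connection before deleting the directory is a choice made here (an open database file cannot be deleted on some platforms), not a statement about the order used by TriFusion.

```python
import os
import shutil
import sqlite3
import tempfile


def remove_tmp_sketch(temp_dir, sql_con):
    """Close the sqlite connection, then delete temp_dir and its contents."""
    sql_con.close()
    shutil.rmtree(temp_dir, ignore_errors=True)


# Build a throwaway directory holding a small sqlite database
temp_dir = tempfile.mkdtemp()
con = sqlite3.connect(os.path.join(temp_dir, "example.db"))
con.execute("CREATE TABLE t (x INTEGER)")
con.commit()

remove_tmp_sketch(temp_dir, con)
```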

trifusion.data.resources.background_tasks.update_active_fileset(aln_obj, set_name, file_list, file_groups, filename_map)

Updates the active files of an AlignmentList object.

This function is similar in purpose to AlignmentList.update_active_alignments, but it can convert the set name of the active group defined in TriFusion into an actual list of files.

Parameters:

aln_obj : trifusion.process.sequence.AlignmentList

AlignmentList object.

set_name : str

Name of the active file group.

file_list : list

List of alignment files loaded into TriFusion.

file_groups : dict

Maps the name of custom file groups to a list of alignment files.

filename_map : dict

Maps the basename of alignment files to their full path.
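
The conversion from a set name to a list of files can be illustrated as follows. This is a sketch under several assumptions: custom groups are taken to store basenames, file_list is taken to store full paths, any name that is not a custom group is treated as "all loaded files", and resolve_file_set_sketch is a hypothetical helper.

```python
def resolve_file_set_sketch(set_name, file_list, file_groups, filename_map):
    """Translate the name of a file set into a list of full paths."""
    if set_name in file_groups:
        # Custom group: map each basename to its full path
        return [filename_map[name] for name in file_groups[set_name]]
    # Assumption: any other name refers to every loaded file
    return list(file_list)


filename_map = {"a.fas": "/data/a.fas", "b.fas": "/data/b.fas"}
file_list = ["/data/a.fas", "/data/b.fas"]
file_groups = {"subset": ["b.fas"]}

subset = resolve_file_set_sketch("subset", file_list,
                                 file_groups, filename_map)
everything = resolve_file_set_sketch("All files", file_list,
                                     file_groups, filename_map)
```

The resulting list is what would then be passed on to the AlignmentList object.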

trifusion.data.resources.background_tasks.update_active_taxaset(aln_obj, set_name, active_taxa_list, taxa_groups)

Updates the active taxa of an AlignmentList object.

This function is similar in purpose to AlignmentList.update_taxa_names, but it can convert the set name of the active group defined in TriFusion into an actual list of taxa.

Parameters:

aln_obj : trifusion.process.sequence.AlignmentList

AlignmentList object.

set_name : str

Name of the active taxa group.

active_taxa_list : list

List of active taxa.

taxa_groups : dict

Maps the name of custom taxa groups to a list of taxon names.