Source code for genome_kmers.kmers

import shelve
from pathlib import Path
from typing import Callable, Generator, Union

import h5py
import numba as nb
import numpy as np
from numba import jit
from numba.misc import quicksort

from genome_kmers.sequence_collection import SequenceCollection


[docs] @jit def kmer_filter_keep_all(sba: np.array, sba_strand: str, kmer_sba_start_idx: int): return True
[docs] def gen_kmer_length_filter_func(min_kmer_len: int) -> Callable: """ Generates a filter that passes if kmer is of min_kmer_len, but otherwise fails. Args: min_kmer_len (int): minimum required kmer length Returns: Callable: filter """ @jit def filter(sba: np.array, sba_strand: str, kmer_sba_start_idx: int) -> bool: return kmer_has_required_len(sba, kmer_sba_start_idx, min_kmer_len) return filter
[docs] def gen_kmer_homopolymer_filter_func(max_homopolymer_size: int, kmer_len: int) -> Callable: """ Generate a filter function that passes if there is no homopolym of length greater than max_homopolymer_size. NOTE: this function does not do extra checks to ensure that the kmer itself is valid (i.e. it does overflow over a boundary or beyond the sequence byte array, is made of valid bases, etc). Args: max_homopolymer_size (int): largest allowed homopolymer. Must be >= 1 kmer_len (int): length of kmer Raises: ValueError: max_homopolymer_size must be >= 2 Returns: Callable: filter """ # verify that max_homopolymer is valid if max_homopolymer_size < 1: raise ValueError(f"max_homopolymer_size ({max_homopolymer_size}) must be >= 1") # verify that kmer_len is valid if kmer_len < 1: raise ValueError(f"kmer_len ({kmer_len}) must be >= 1") @jit def filter(sba: np.array, sba_strand: str, kmer_sba_start_idx: int): # verify that a kmer starting at kmer_sba_start_idx and of length kmer_len does not overflow if kmer_sba_start_idx + kmer_len - 1 >= len(sba): raise ValueError( f"The kmer_len ({kmer_len}) requested is too large for kmer_sba_start_idx ({kmer_sba_start_idx})" ) # if the length of the kmer is less than the max allowed homopolymer, we don't need to do # any checks if kmer_len < max_homopolymer_size: return True # iterate base by base counting up the homopolymer size homopolymer_size = 1 for kmer_idx in range(1, kmer_len): sba_idx = kmer_sba_start_idx + kmer_idx base = sba[sba_idx] prev_base = sba[sba_idx - 1] if base == ord("$"): raise ValueError( f"The kmer_len ({kmer_len}) requested is too large for kmer_sba_start_idx ({kmer_sba_start_idx})" ) # if part of a homopolymer, increment homopolymer_size if base == prev_base: homopolymer_size += 1 # if homopolymer size has exceeded the max, return False if homopolymer_size > max_homopolymer_size: return False # otherwise, reset else: homopolymer_size = 1 return True return filter
[docs] def gen_kmer_gc_content_filter_func( min_allowed_gc_frac: float, max_allowed_gc_frac: float, kmer_len: int, ) -> Callable: """ Generate a filter function that returns True if fraction GC content is between min_allowed_gc_frac and max_allowed_gc_frac. NOTE: this function does not do extra checks to ensure that the kmer itself is valid (i.e. it does overflow over a boundary or beyond the sequence byte array, is made of valid bases, etc). Args: min_allowed_gc_frac (float): minimum allowed fraction GC content (inclusive). Must be in the range [0.0, 1.0] and be <= max_allowed_gc_frac. max_allowed_gc_frac (float): maximum allowed fraction GC content (inclusive). Must be in the range [0.0, 1.0] and be >= min_allowed_gc_frac. kmer_len (int): length of kmer Raises: ValueError: min_allowed_gc_frac or max_allowed_gc_frac is invalid Returns: Callable: filter """ # check that min and max_allowed_gc_frac are valid if min_allowed_gc_frac > max_allowed_gc_frac: raise ValueError( f"min_allowed_gc_frac ({min_allowed_gc_frac}) must be <= max_allowed_gc_frac ({max_allowed_gc_frac})" ) if min_allowed_gc_frac < 0.0 or min_allowed_gc_frac > 1.0: raise ValueError( f"min_allowed_gc_frac ({min_allowed_gc_frac}) must be in the range [0.0, 1.0]" ) if max_allowed_gc_frac < 0.0 or max_allowed_gc_frac > 1.0: raise ValueError( f"max_allowed_gc_frac ({max_allowed_gc_frac}) must be in the range [0.0, 1.0]" ) # calculate the min and max GC counts allowed min_allowed_gc_count = int(np.ceil(kmer_len * min_allowed_gc_frac)) max_allowed_gc_count = int(np.floor(kmer_len * max_allowed_gc_frac)) # store values for "G" and "C" g_val = ord("G") c_val = ord("C") @jit def filter(sba: np.array, sba_strand: str, kmer_sba_start_idx: int): # if it's not possible to meet the fraction requirements, return False # # Example of the edge case where two fractions are so close that it's not possible to meet # the requirements # kmer_len = 16 # min_allowed_gc_count = 0.13 # max_allowed_gc_count= 0.18 # # 0.18 * 16 = 2.88 # min_allowed_gc_count = ceil(0.13 * 16) # = ceil(2.08) # = 3 # max_allowed_gc_count = floor(0.18 * 16) # = floor(2.88) # = 2 if max_allowed_gc_count < min_allowed_gc_count: return False # loop through kmer counting the GC bases gc_count = 0 for kmer_idx in range(kmer_len): sba_idx = kmer_sba_start_idx + kmer_idx base = sba[sba_idx] if base == ord("$"): raise ValueError( f"The kmer_len ({kmer_len}) requested is too larger for kmer_sba_start_idx ({kmer_sba_start_idx})" ) if base == g_val or base == c_val: gc_count += 1 # if we have already exceeded the max allowed GC counts, return False if gc_count > max_allowed_gc_count: return False # if the GC count is within the required range, return True if min_allowed_gc_count <= gc_count and gc_count <= max_allowed_gc_count: return True return False return filter
[docs] def gen_no_ambiguous_bases_filter(kmer_len: int) -> Callable: """ Generate a filter that checks that only "A", "T", "G", or "C" are present in the kmer. Args: kmer_len (int): kmer length Raises: ValueError: end of segment or sequence byte array is reached before end of kmer Returns: Callable: no_ambiguous_bases_filter """ @jit def no_ambiguous_bases_filter(sba: np.array, sba_strand: str, kmer_sba_start_idx: int): # check that kmer_len is valid given kmer_sba_start_idx if kmer_sba_start_idx + kmer_len > len(sba): raise ValueError(f"kmer_len ({kmer_len}) is invalid. It extends beyond len(sba)") passes = True for i in range(kmer_len): base = sba[kmer_sba_start_idx + i] # check whether the end of the segment has been reached # ord("$") == 36 if base == 36: raise ValueError(f"end of segment was reached. kmer_len ({kmer_len}) invalid.") # check if it's not any of A, T, G, C # ord("A") == 65, ord("T") == 84, ord("G") == 71, ord("C") == 67 if base != 65 and base != 84 and base != 71 and base != 67: passes = False break return passes return no_ambiguous_bases_filter
[docs] @jit def crispr_ngg_pam_filter(sba: np.array, sba_strand: str, kmer_sba_start_idx: int) -> bool: """ Generate a filter that passes for all 23-mers ending in GG. NOTE: no other checks on kmer validity are carried out. Raises: ValueError: kmer extend beyond the size of the sequence byte array Returns: bool: whether kmer passes filter or not """ # diagram of a SpyCas9 CRISPR guide # 0 19 # | | # --------------------NGG # [ target ]PAM # check that arguments are valid if kmer_sba_start_idx + 23 > len(sba): raise ValueError("The guide defined at this start index extends beyond the sba") # check whether there is an NGG PAM # ord("G") == 71 if sba[kmer_sba_start_idx + 21] == 71 and sba[kmer_sba_start_idx + 22] == 71: return True return False
[docs] @jit def kmer_has_required_len(sba: np.array, sba_start_idx: int, min_kmer_len: int) -> bool: """ Checks whether the kmer is of at least min_kmer_len before reaching the end of the segment. Args: sba (np.array): sequence byte array sba_start_idx (int): sequence byte array start index for the kmer min_kmer_len (int): minimum kmer length Returns: bool: """ for idx in range(sba_start_idx, sba_start_idx + min_kmer_len): # determine whether the kmer has reached the end of a segment idx_is_at_end_of_segment = True if idx >= len(sba) or sba[idx] == ord("$") else False # if end of segment has been reached, the kmer is invalid if idx_is_at_end_of_segment: return False return True
[docs] def get_compare_sba_kmers_func(kmer_len): @jit def compare_sba_kmers_func(sba_a, sba_b, kmer_sba_start_idx_a, kmer_sba_start_idx_b): return compare_sba_kmers_lexicographically( sba_a, sba_b, kmer_sba_start_idx_a, kmer_sba_start_idx_b, max_kmer_len=kmer_len ) return compare_sba_kmers_func
[docs] @jit def compare_sba_kmers_always_less_than( sba_a: np.array, sba_b: np.array, kmer_sba_start_idx_a: int, kmer_sba_start_idx_b: int, max_kmer_len: Union[int, None] = None, ) -> tuple[int, int]: return -1, 0
[docs] @jit def compare_sba_kmers_lexicographically( sba_a: np.array, sba_b: np.array, kmer_sba_start_idx_a: int, kmer_sba_start_idx_b: int, max_kmer_len: Union[int, None] = None, ) -> tuple[int, int]: """ Lexicographically compare two kmers of length kmer_len. If kmer_len is None, the end of the segment defines the longest kmer. NOTE: This function does no validation for kmer_len. It will compare up to max_kmer_len bases if required, but it will return as soon as the comparison result is known. Args: sba_a (np.array): sequence byte array a sba_b (np.array): sequence byte array b kmer_sba_start_idx_a (int): index in sba that defines the start of kmer a kmer_sba_start_idx_b (int): index in sba that defines the start of kmer b kmer_len (Union[int, None], optional): Length of the kmers to compare. If None, the end of the segment defines the longest kmers to compare.. Defaults to None. Raises: AssertionError: there were no valid bases to compare Returns: tuple[int, int]: comparison, last_kmer_index_compared comparison: +1 = kmer_a > kmer_b 0 = kmer_a == kmer_b -1 = kmer_a < kmer_b last_kmer_index_compared: the kmer index of the last valid comparison done between two bases. If a single base was compare, then this value will be 0. """ # Example # # str(sba): ATGGGCTGCAAGCTCGA$AATTTAGCGGCCTAGGCTTA # kmer_a: [--------] # kmer_b: [----] # # max_kmer_len | comparison # 1 | 0 # 2 | 0 # 3 | -1 # None | -1 kmer_idx = 0 comparison = 0 last_kmer_index_compared = None while True: # sba indices to compare idx_a = kmer_sba_start_idx_a + kmer_idx idx_b = kmer_sba_start_idx_b + kmer_idx # is idx_a or idx_b out of bounds? (i.e. equal to "$" or overflowed) idx_a_out_of_bounds = True if idx_a >= len(sba_a) or sba_a[idx_a] == ord("$") else False idx_b_out_of_bounds = True if idx_b >= len(sba_b) or sba_b[idx_b] == ord("$") else False # break if idx_a or idx_b is out of bounds if idx_a_out_of_bounds or idx_b_out_of_bounds: # set last_kmer_index_compared last_kmer_index_compared = kmer_idx - 1 if last_kmer_index_compared < 0: raise AssertionError(f"There were no valid kmer bases to compare") # get comparison if idx_a_out_of_bounds and not idx_b_out_of_bounds: comparison = -1 # kmer_a < kmer_b elif idx_b_out_of_bounds and not idx_a_out_of_bounds: comparison = 1 # kmer_a > kmer_b else: comparison = 0 # kmer_a == kmer_b break # check whether kmer_a < kmer_b (and vice versa) if sba_a[idx_a] < sba_b[idx_b]: comparison = -1 # kmer_a < kmer_b last_kmer_index_compared = kmer_idx break if sba_a[idx_a] > sba_b[idx_b]: comparison = 1 # kmer_a > kmer_b last_kmer_index_compared = kmer_idx break # break if kmer_len has been reached if max_kmer_len is not None and kmer_idx == max_kmer_len - 1: last_kmer_index_compared = kmer_idx break kmer_idx += 1 return comparison, last_kmer_index_compared
[docs] @jit def get_kmer_info_minimal( kmer_num: int, kmer_sba_start_indices: np.array, sba: np.array, kmer_len: Union[int, None], group_size_yielded: int, group_size_total: int, ) -> tuple[int, int, int]: """ Return only basic kmer information without any processing. Used as an input to kmer_info_by_group_generator when only basic information is required. Args: kmer_num (int): kmer number kmer_start_indices (np.array): kmer sequence byte array start indices sba (np.array): sequence byte array kmer_len (Union[int, None]): length of kmer. If None, take the longest possible. group_size_yielded (int): number of kmers in the group that will be yielded group_size_total (int): number of kmers in the group in total Returns: tuple[int, int, int]: kmer_num, group_size_yielded, group_size_total """ return kmer_num, group_size_yielded, group_size_total
[docs] @jit def get_kmer_info_group_size_only( kmer_num: int, kmer_sba_start_indices: np.array, sba: np.array, kmer_len: Union[int, None], group_size_yielded: int, group_size_total: int, ) -> tuple[int, int, int]: """ Return only group_size_total without any processing. Args: kmer_num (int): kmer number kmer_start_indices (np.array): kmer sequence byte array start indices sba (np.array): sequence byte array kmer_len (Union[int, None]): length of kmer. If None, take the longest possible. group_size_yielded (int): number of kmers in the group that will be yielded group_size_total (int): number of kmers in the group in total Returns: int: group_size_total """ return group_size_total
[docs] @jit def get_kmer_group_size_hist( sba: np.array, sba_strand: str, kmer_len: Union[int, None], kmer_start_indices: np.array, kmer_comparison_func: Callable, kmer_filter_func: Callable, min_group_size: int = 1, max_group_size: Union[int, None] = None, max_counts_bin: int = 1000000, ) -> tuple[np.array, int]: """ Build a histogram of group sizes. counts_by_group_size[i] gives the number of groups of size i. Any group sizes > max_counts_bin will be placed in max_counts_bin. The total number of kmers is also calculated. Args: sba (np.array): sequence byte array sba_strand (str): "forward" or "reverse_complement" kmer_len (Union[int, None]): length of kmer. If None, take the longest possible. kmer_start_indices (np.array): kmer sequence byte array start indices kmer_comparison_func (Callable): function that returns the result of a two kmer comparison kmer_filter_func (Callable): function that returns true if a kmer passes all filters min_group_size (int, optional): Smallest allowed group size. Defaults to 1. max_group_size (Union[int, None], optional): Largest allowed group size. If None, then there is no maximum group size. Defaults to None. max_counts_bin (int, optional): largest group_size bin in the return counts_by_group_size array. Group sizes that exceed this value will be placed in this bin. Defaults to 1000000. Raises: ValueError: max_counts_bin is invalid Returns: tuple[np.array, int]: counts_by_group_size, total_kmer_count """ if max_counts_bin <= 0: raise ValueError(f"max_counts_bin ({max_counts_bin}) must be >= 1") # set kmer_info_func and yield_first_n so that the group size is yielded once per group kmer_info_func = get_kmer_info_group_size_only yield_first_n = 1 # initialize the kmer info generator kmer_generator = kmer_info_by_group_generator( sba, sba_strand, kmer_len, kmer_start_indices, kmer_comparison_func, kmer_filter_func, kmer_info_func, min_group_size, max_group_size, yield_first_n, ) # populate the array to hold the number of group counts for each group size. Also count the # total number of kmers counts_by_group_size = np.zeros((max_counts_bin + 1,), dtype=np.int64) total_kmer_count = 0 for group_size_total in kmer_generator: total_kmer_count += group_size_total counts_by_group_size[min(group_size_total, max_counts_bin)] += 1 return counts_by_group_size, total_kmer_count
[docs] @jit(cache=False) def kmer_info_by_group_generator( sba: np.array, sba_strand: str, kmer_len: Union[int, None], kmer_start_indices: np.array, kmer_comparison_func: Callable, kmer_filter_func: Callable, kmer_info_func: Callable, min_group_size: int = 1, max_group_size: Union[int, None] = None, yield_first_n: Union[int, None] = None, ) -> Generator[tuple, None, None]: """ Generator that yields the valid kmer information and total group size for all groups meeting requirements. A valid kmer is one that passes the provided kmer_filter_func. A group is defined as the set of identical kmers as defined by the kmer_comparison_func. The first "yield_first_n" kmers will be yielded if the group meets all provided requirements. It must have a total group size between min_group_size and max_group_size (inclusive). The kmer information that is yielded is customizable and defined by kmer_info_func. Args: sba (np.array): sequence byte array sba_strand (str): "forward" or "reverse_complement" kmer_len (Union[int, None]): length of kmer. If None, take the longest possible. kmer_start_indices (np.array): kmer sequence byte array start indices kmer_comparison_func (Callable): function that returns the result of a two kmer comparison kmer_filter_func (Callable): function that returns true if a kmer passes all filters kmer_info_func (Callable): function that returns a tuple with all the kmer information to yielded. min_group_size (int, optional): Smallest allowed group size. Defaults to 1. max_group_size (Union[int, None], optional): Largest allowed group size. If None, then there is no maximum group size. Defaults to None. yield_first_n (Union[int, None], optional): yield up to this many kmer_nums. Defaults to None. Raises: ValueError: invalide min_group_size, max_group_size, or yield_first_n Yields: Generator[tuple[list[int], int], None, None]: valid_kmer_nums_in_group, group_size """ # check arguments if min_group_size < 1: raise ValueError(f"min_group_size ({min_group_size}) must be >= 1") if max_group_size is not None and max_group_size < min_group_size: raise ValueError( f"if max_group_size ({max_group_size}) is specified, it must be >= min_group_size ({min_group_size})" ) if yield_first_n is not None and yield_first_n < 1: raise ValueError(f"if yield_first_n ({yield_first_n}) is specified, it must be > 0") # iterate through all kmers storing kmers that pass all filters and yielding results when a new # group is reached # NOTE: the empty list is initialized like it is below so that numba can infer its type # https://numba.readthedocs.io/en/stable/user/troubleshoot.html#my-code-has-an-untyped-list-problem valid_kmer_nums_in_group = [i for i in range(0)] group_size = 0 prev_valid_kmer_sba_start_idx = None for kmer_num in range(0, len(kmer_start_indices)): # skip the kmer if it does not pass all filters kmer_sba_start_idx = kmer_start_indices[kmer_num] passes_all_filters = kmer_filter_func(sba, sba_strand, kmer_sba_start_idx) if not passes_all_filters: continue # if this is the first valid kmer, set prev_valid_kmer_sba_start_idx and treat as if it is # in the same group if prev_valid_kmer_sba_start_idx is None: prev_valid_kmer_sba_start_idx = kmer_sba_start_idx in_same_group = True # otherwise, check whether we are in the same group by comparing to the last valid kmer else: comparison, last_kmer_index_compared = kmer_comparison_func( sba, sba, prev_valid_kmer_sba_start_idx, kmer_sba_start_idx ) in_same_group = True if comparison == 0 else False prev_valid_kmer_sba_start_idx = kmer_sba_start_idx # if we are in a the same group, increment group size and add to valid_kmer_nums_in_group if in_same_group: group_size += 1 if yield_first_n is None or len(valid_kmer_nums_in_group) < yield_first_n: valid_kmer_nums_in_group.append(kmer_num) # otherwise, we are in a new group - yield info and start a new group else: # yield the completed group if it meets requirements meets_min_group_size = group_size >= min_group_size meets_max_group_size = max_group_size is None or group_size <= max_group_size if meets_min_group_size and meets_max_group_size: # yield kmer_info for each valid kmer_num group_size_yielded = len(valid_kmer_nums_in_group) for kmer_num_in_group in valid_kmer_nums_in_group: yield kmer_info_func( kmer_num_in_group, kmer_start_indices, sba, kmer_len, group_size_yielded, group_size, ) # reset tracking for the new group group_size = 1 valid_kmer_nums_in_group = [kmer_num] # there is likely one remaining group to yield (unless there were no valid groups) # yield the completed group if it meets requirements meets_min_group_size = group_size >= min_group_size meets_max_group_size = max_group_size is None or group_size <= max_group_size if meets_min_group_size and meets_max_group_size: # yield kmer_info for each valid kmer_num group_size_yielded = len(valid_kmer_nums_in_group) for kmer_num_in_group in valid_kmer_nums_in_group: yield kmer_info_func( kmer_num_in_group, kmer_start_indices, sba, kmer_len, group_size_yielded, group_size, ) return
[docs] class Kmers: """ Defines memory-efficient kmers calculations on a genome. """ def __init__( self, seq_coll: Union[SequenceCollection, None] = None, min_kmer_len: int = 1, max_kmer_len: Union[int, None] = None, source_strand: str = "forward", track_strands_separately: bool = False, method: str = "single_pass", ) -> None: """ Initialize Kmers Args: seq_coll (SequenceCollection): the sequence collection on which kmers are defined min_kmer_len (int, optional): kmers below this size are not considered. Defaults to 1. max_kmer_len (tuple[int, None], optional): kmers above this size are not considered. Defaults to None. source_strand (str, optional): strand(s) on which kmers are defined ("forward", "reverse_complement", "both"). Defaults to "forward". track_strands_separately (bool, optional): if source_strand is set to "both", this specifies whether kmers should be tracked separately by strand (which is required for certain kmer filters) method (str, optional): which method to use for initialization. "single_pass" runs faster, but temporarily uses more RAM (up to 2x). "double_pass" runs slower (~2x) but uses less memory (as much as half). Defaults to "single_pass". Raises: ValueError: invalid arguments NotImplementedError: yet to be implemented functionality """ # check whether arguments required functionality that has not yet been implemented if track_strands_separately: raise NotImplementedError( f"This function has not been implemented for track_strands_separately = '{track_strands_separately}'" ) if source_strand != "forward": raise NotImplementedError( f"This function has not been implemented for source_strand = '{source_strand}'" ) # verify that arguments are valid if source_strand not in ("forward", "reverse_complement", "both"): raise ValueError(f"source_strand ({source_strand}) not recognized") if source_strand != "both" and track_strands_separately: raise ValueError( f"track_strands_separately can only be true if source_strand is 'both', but it is '{source_strand}'" ) if min_kmer_len < 1: raise ValueError(f"min_kmer_len ({min_kmer_len}) must be greater than zero") if max_kmer_len is not None: if max_kmer_len < 1: raise ValueError(f"max_kmer_len ({max_kmer_len}) must be greater than zero") if min_kmer_len is not None and max_kmer_len < min_kmer_len: raise ValueError( f"max_kmer_len ({max_kmer_len}) is less than min_kmer_len ({min_kmer_len})" ) # set member variables based on arguments self.min_kmer_len = min_kmer_len self.max_kmer_len = max_kmer_len self.kmer_source_strand = source_strand self.track_strands_separately = track_strands_separately self._is_initialized = False self._is_set = False self._is_sorted = False self.kmer_sba_start_indices = None # if seq_coll is not provided, return without any further processing if seq_coll is None: return # count number of records and sequence lengths in sequence_collection seq_lengths = [] min_seq_len = None num_records = 0 record_generator = seq_coll.iter_records() for _, sba_seg_start_idx, sba_seg_end_idx in record_generator: seq_length = sba_seg_end_idx - sba_seg_start_idx + 1 seq_lengths.append(seq_length) if min_seq_len is None or seq_length < min_seq_len: min_seq_len = seq_length num_records += 1 # verify that arguments are valid given the sequence_collection if num_records == 0: raise ValueError(f"sequence_collection is empty") if min_kmer_len is not None and min_kmer_len > min_seq_len: raise ValueError( f"min_kmer_len ({min_kmer_len}) must be <= the shortest sequence length ({min_seq_len})" ) if seq_coll.strands_loaded() != source_strand: # for now, require that source_strand matches what is in the SequenceCollection for ease # of implementation raise ValueError( f"source_strand ({source_strand}) does not match sequence_collection loaded strand ({seq_coll.strands_loaded()})" ) # set seq_coll and initialize self.seq_coll = seq_coll self._initialize(method=method) return def _initialize(self, kmer_filters=[], method: str = "single_pass"): """ Initialize Kmers instance and populate internal kmer_sba_start_indices array. Args: kmer_filters (list, optional): _description_. Defaults to []. method (str, optional): which method to use for initialization. "single_pass" runs faster, but temporarily uses more RAM (up to 2x). "double_pass" runs slower (~2x) but uses less memory (as much as half). Defaults to "single_pass". Raises: ValueError: method not recognized """ if kmer_filters != []: raise NotImplementedError("kmer_filters have not been implemented") if method == "double_pass": # TODO: "double_pass" implementation counts the number of kmers first, initializes an # array of the correct size, and then populates it on-the-fly. Requires less memory raise NotImplementedError(f"method '{method}' has not been implemented") elif method == "single_pass": self._initialize_single_pass(kmer_filters=kmer_filters) else: raise ValueError(f"method '{method}' not recognized") self._is_initialized = True def _initialize_single_pass(self, kmer_filters=[]): """ Initialize Kmers in a single pass. This loads all unfiltered kmers into memory, filters them in place, and then creates a new array of length num_filtered_kmers. This runs faster than a "double pass" initialization, but temporarily uses more memory (up to 2x). Args: kmer_filters (list, optional): _description_. Defaults to []. Raises: AssertionError: logic check """ if kmer_filters != []: raise NotImplementedError("kmer_filters have not been implemented") num_kmers = self._get_unfiltered_kmer_count() if num_kmers > 2**32 - 1: msg = "the size of the required kmers array exceeds the limit set by a uint32" raise NotImplementedError(msg) # initialize array large enough to hold all unfiltered kmers self.kmer_sba_start_indices = np.zeros(num_kmers, dtype=np.uint32) # iterate over records initializing kmer start indices record_generator = self.seq_coll.iter_records() last_filled_index = -1 for _, sba_seg_start_idx, sba_seg_end_idx in record_generator: num_kmers_in_record = (sba_seg_end_idx - sba_seg_start_idx + 1) - self.min_kmer_len + 1 kmer_start = last_filled_index + 1 kmer_end = last_filled_index + 1 + num_kmers_in_record sba_start = sba_seg_start_idx sba_end = sba_seg_end_idx + 1 - self.min_kmer_len + 1 self.kmer_sba_start_indices[kmer_start:kmer_end] = np.arange( sba_start, sba_end, dtype=np.uint32 ) last_filled_index += num_kmers_in_record if last_filled_index != num_kmers - 1: raise AssertionError( f"logic error: last_filled_index ({last_filled_index}) != num_kmers - 1 ({num_kmers - 1})" ) # TODO: next step is to filter kmers in place return def _get_unfiltered_kmer_count(self) -> int: """ Calculate the number of unfiltered kmers and the total length of the kmer array for the loaded sequence_collection. Raises: ValueError: empty sequence_collection Returns: int: num_kmers """ # TODO: when SequenceCollection has a method to get num_records, remove the counter below num_kmers = 0 num_records = 0 record_generator = self.seq_coll.iter_records() for _, sba_seg_start_idx, sba_seg_end_idx in record_generator: num_kmers_in_record = (sba_seg_end_idx - sba_seg_start_idx + 1) - self.min_kmer_len + 1 num_kmers += num_kmers_in_record num_records += 1 # TODO: when SequenceCollection has a method to get num_records, move this to top of func if num_records == 0: raise ValueError(f"SequenceCollection does not have any records") return num_kmers def __len__(self): return len(self.kmer_sba_start_indices) def __getitem__(self): pass
[docs] def get_kmers( self, kmer_len: Union[int, None], one_based_seq_index: bool = False, kmer_filter_func: Callable = kmer_filter_keep_all, kmer_info_to_yield: str = "minimum", min_group_size: int = 1, max_group_size: Union[int, None] = None, yield_first_n: Union[int, None] = None, ) -> Generator[tuple, None, None]: """ A customizable generator yielding tuples with all kmer information. Examples: Yield all kmers kmers.get_kmers(yield_first_n=None) Yield only the first occurrence of a kmer kmers.get_kmers(use yield_first_n=1) Yield up to the first 10 occurrences of a kmer kmers.get_kmers(use yield_first_n=10) Yield all kmers that occur exactly once kmers.get_kmers(max_group_size=1) Yield all kmers that are repeated at least 5 times and no more than 10 times kmers.get_kmers(min_group_size=5, max_group_size=10) NOTE: group yielding is not supported if kmers are unsorted. The user cannot provide min_group_size, max_group_size, or yield_first_n in this situation. Args: kmer_len (Union[int, None]): length of kmer. If None, take the longest possible. one_based_seq_index (bool, optional): whether yielded sequence indices should be 1-based. Defaults to False. kmer_filter_func (Callable, optional): function that returns true if a kmer passes all filters. Defaults to kmer_filter_keep_all. kmer_info_to_yield (str): "minimum" or "full". Defaults to "minimum" min_group_size (int, optional): Smallest allowed group size. Defaults to 1. max_group_size (Union[int, None], optional): Largest allowed group size. If None, then there is no maximum group size. Defaults to None. yield_first_n (Union[int, None], optional): yield up to this many kmer_nums. Defaults to None. Raises: NotImplementedError: if kmer_source_strand and seq_coll.strands() loaded are not both "forward" ValueError: kmer_len is invalid ValueError: one or more group params are invalid (min_group_size, max_group_size, yield_first_n) ValueError: kmer_comparison_func is provided when kmers have not been sorted Yields: Generator[tuple, None, None]: output depends on get_kmer_info_func """ condition1 = self.kmer_source_strand != "forward" condition2 = self.seq_coll.strands_loaded() != "forward" if condition1 or condition2: raise NotImplementedError( f"both kmer_source_strand ({self.kmer_source_strand}) and " "sequence_collection.strands_loaded() must be 'forward'" ) # verify that kmer_len is valid if kmer_len is not None and kmer_len < 1: raise ValueError(f"kmer_len ({kmer_len}) must be > 0") # verify that if kmers has not been sorted, that group arguments have not been provided if not self._is_sorted: if min_group_size != 1: msg = "Returning group parameters is not supported when kmers has not been" msg += f" sorted. min_group_size ({min_group_size}) cannot be specified. Did you" msg += " mean to run sort() before getting kmers?" raise ValueError(msg) if max_group_size is not None: msg = "Returning group parameters is not supported when kmers has not been" msg += f" sorted. max_group_size ({max_group_size}) cannot be specified. Did you" msg += " mean to run sort() before getting kmers?" raise ValueError(msg) if yield_first_n is not None: msg = "Returning group parameters is not supported when kmers has not been" msg += f" sorted. yield_first_n ({yield_first_n}) cannot be specified. Did you" msg += " mean to run sort() before getting kmers?" raise ValueError(msg) # set kmer_comparison_func if self._is_sorted: kmer_comparison_func = get_compare_sba_kmers_func(kmer_len) else: kmer_comparison_func = compare_sba_kmers_always_less_than # set get_kmer_info_func if kmer_info_to_yield == "minimum": get_kmer_info_func = get_kmer_info_minimal elif kmer_info_to_yield == "full": get_kmer_info_func = self.generate_get_kmer_info_func(one_based_seq_index) else: raise ValueError(f"kmer_info_to_yield ({kmer_info_to_yield}) not recognized") # collect values to pass to kmer_info_generator sba = self.seq_coll.forward_sba sba_strand = self.seq_coll.strands_loaded() kmer_start_indices = self.kmer_sba_start_indices # initialize the kmer info generator kmer_generator = kmer_info_by_group_generator( sba, sba_strand, kmer_len, kmer_start_indices, kmer_comparison_func, kmer_filter_func, get_kmer_info_func, min_group_size, max_group_size, yield_first_n, ) for kmer_info in kmer_generator: yield kmer_info return
[docs] def get_kmer_count( self, kmer_len: Union[int, None], kmer_filter_func: Callable = kmer_filter_keep_all, min_group_size: int = 1, max_group_size: Union[int, None] = None, ) -> int: """ A customizable function to count the total number of kmers passing filters. Examples: Count kmers that occur exactly once kmers.get_kmer_count(max_group_size=1) Count kmers that are repeated at least 5 times and no more than 10 times kmers.get_kmer_count(min_group_size=5, max_group_size=10) Count 50-mers filter = gen_kmer_length_filter_func(min_kmer_len=50) kmers.get_kmer_count(kmer_filter_func=filter) Args: kmer_len (Union[int, None]): length of kmer. If None, take the longest possible. kmer_filter_func (Callable, optional): function that returns true if a kmer passes all filters. Defaults to kmer_filter_keep_all. min_group_size (int, optional): Smallest allowed group size. Defaults to 1. max_group_size (Union[int, None], optional): Largest allowed group size. If None, then there is no maximum group size. Defaults to None. Raises: NotImplementedError: if kmer_source_strand and seq_coll.strands() loaded are not both "forward" ValueError: kmer_len is invalid ValueError: one or more group params are invalid (min_group_size, max_group_size, yield_first_n) ValueError: kmer_comparison_func is provided when kmers have not been sorted Returns: int: total_kmer_count """ condition1 = self.kmer_source_strand != "forward" condition2 = self.seq_coll.strands_loaded() != "forward" if condition1 or condition2: raise NotImplementedError( f"both kmer_source_strand ({self.kmer_source_strand}) and " "sequence_collection.strands_loaded() must be 'forward'" ) # verify that kmer_len is valid if kmer_len is not None and kmer_len < 1: raise ValueError(f"kmer_len ({kmer_len}) must be > 0") # verify that if kmers has not been sorted, that group arguments have not been provided if not self._is_sorted: if min_group_size != 1: msg = "Returning group parameters is not supported when kmers has not been" msg += f" sorted. min_group_size ({min_group_size}) cannot be specified. Did you" msg += " mean to run sort() before getting kmers?" raise ValueError(msg) if max_group_size is not None: msg = "Returning group parameters is not supported when kmers has not been" msg += f" sorted. max_group_size ({max_group_size}) cannot be specified. Did you" msg += " mean to run sort() before getting kmers?" raise ValueError(msg) # set kmer_comparison_func if self._is_sorted: kmer_comparison_func = get_compare_sba_kmers_func(kmer_len) else: kmer_comparison_func = compare_sba_kmers_always_less_than # collect values to pass to the calculation sba = self.seq_coll.forward_sba sba_strand = self.seq_coll.strands_loaded() kmer_start_indices = self.kmer_sba_start_indices # run calculation _, total_kmer_count = get_kmer_group_size_hist( sba, sba_strand, kmer_len, kmer_start_indices, kmer_comparison_func, kmer_filter_func, min_group_size, max_group_size, ) return total_kmer_count
[docs] def get_kmer_group_counts( self, kmer_len: Union[int, None], kmer_filter_func: Callable = kmer_filter_keep_all, min_group_size: int = 1, max_group_size: Union[int, None] = None, max_counts_bin: int = 1000000, ) -> tuple[np.array, int]: """ A customizable function to build a histogram of group sizes for kmers passing filters. Examples: Get histogram for 50-mers filter = gen_kmer_length_filter_func(min_kmer_len=50) kmers.get_kmer_group_counts(kmer_filter_func=filter) Args: kmer_len (Union[int, None]): length of kmer. If None, take the longest possible. kmer_filter_func (Callable, optional): function that returns true if a kmer passes all filters. Defaults to kmer_filter_keep_all. min_group_size (int, optional): Smallest allowed group size. Defaults to 1. max_group_size (Union[int, None], optional): Largest allowed group size. If None, then there is no maximum group size. Defaults to None. max_counts_bin (int, optional): largest group_size bin in the return counts_by_group_size array. Group sizes that exceed this value will be placed in this bin. Defaults to 1000000. Raises: NotImplementedError: if kmer_source_strand and seq_coll.strands() loaded are not both "forward" ValueError: kmer_len is invalid ValueError: one or more group params are invalid (min_group_size, max_group_size, yield_first_n) ValueError: kmer_comparison_func is provided when kmers have not been sorted Returns: tuple[np.array, int]: counts_by_group_size (np.array): histogram of group sizes counts_by_group_size[i] gives the number of groups of size i. Any group sizes > max_counts_bin will be placed in max_counts_bin. total_kmer_count (int): total number of kmers """ condition1 = self.kmer_source_strand != "forward" condition2 = self.seq_coll.strands_loaded() != "forward" if condition1 or condition2: raise NotImplementedError( f"both kmer_source_strand ({self.kmer_source_strand}) and " "sequence_collection.strands_loaded() must be 'forward'" ) # verify that kmer_len is valid if kmer_len is not None and kmer_len < 1: raise ValueError(f"kmer_len ({kmer_len}) must be > 0") # verify that if kmers has not been sorted, that group arguments have not been provided if not self._is_sorted: if min_group_size != 1: msg = "Returning group parameters is not supported when kmers has not been" msg += f" sorted. min_group_size ({min_group_size}) cannot be specified. Did you" msg += " mean to run sort() before getting kmers?" raise ValueError(msg) if max_group_size is not None: msg = "Returning group parameters is not supported when kmers has not been" msg += f" sorted. max_group_size ({max_group_size}) cannot be specified. Did you" msg += " mean to run sort() before getting kmers?" raise ValueError(msg) # set kmer_comparison_func if self._is_sorted: kmer_comparison_func = get_compare_sba_kmers_func(kmer_len) else: raise AssertionError(f"The kmers must be sorted when calling get_kmer_group_counts") # collect values to pass to the calculation sba = self.seq_coll.forward_sba sba_strand = self.seq_coll.strands_loaded() kmer_start_indices = self.kmer_sba_start_indices # run calculation counts_by_group_size, total_kmer_count = get_kmer_group_size_hist( sba, sba_strand, kmer_len, kmer_start_indices, kmer_comparison_func, kmer_filter_func, min_group_size, max_group_size, max_counts_bin, ) return counts_by_group_size, total_kmer_count
[docs] def generate_get_kmer_info_func(self, one_based_seq_index: bool) -> Callable: """ Generate the get_kmer_info function that is used to get kmer information from a sequence byte array index. Args: one_based_seq_index (bool): whether to return one-based sequence indices Returns: Callable: get_kmer_info """ get_record_info_from_sba_index = self.seq_coll.generate_get_record_info_from_sba_index_func( one_based_seq_index ) @jit def get_kmer_info( kmer_num: int, kmer_sba_start_indices: np.array, sba: np.array, kmer_len: Union[int, None], group_size_yielded: int, group_size_total: int, ) -> tuple[int, str, str, int, int, int, int]: """ Given the kmer_num, return all kmer info. Args: kmer_num (int): kmer number kmer_sba_start_indices (np.array): sequence byte array start indices sba (np.array): sequence byte array kmer_len (Union[int, None]): length of kmer group_size_yielded (int): total number of kmers in the group that will be yielded group_size_total (int): total size of the group (including kmers not yielded) Raises: ValueError: kmer_num is invalid ValueError: kmer_len is invalid Returns: tuple[int, str, str, int, int, int, int]: kmer_num, seq_strand, seq_chrom, seq_start_idx, kmer_len, group_size_yielded, group_size_total, """ # verify that kmer_num is valid if kmer_num < 0: raise ValueError(f"kmer_num ({kmer_num}) cannot be less than zero") if kmer_num >= len(kmer_sba_start_indices): raise ValueError( f"kmer_num ({kmer_num}) is out of bounds (num kmers = {len(kmer_sba_start_indices)})" ) # get record information given the kmer's sequence byte array index sba_idx = kmer_sba_start_indices[kmer_num] seg_num, seg_sba_start_idx, seg_sba_end_idx, seq_strand, seq_chrom, seq_start_idx = ( get_record_info_from_sba_index(sba_idx) ) # if kmer_len is None, set it to the largest possible kmer if kmer_len is None: kmer_len = seg_sba_end_idx - sba_idx + 1 # otherwise, verify that kmer_len is in-bounds else: if sba_idx + kmer_len - 1 > seg_sba_end_idx: raise ValueError( f"kmer_len ({kmer_len}) for kmer_num ({kmer_num}) extends beyond the end of the segment" ) return ( kmer_num, seq_strand, seq_chrom, seq_start_idx, kmer_len, group_size_yielded, group_size_total, ) return get_kmer_info
def __ne__(self, other): return not self.__eq__(other) def __eq__(self, other): if self.min_kmer_len != other.min_kmer_len: return False # max_kmer_len is nullable if self.max_kmer_len is None and other.max_kmer_len is not None: return False elif self.max_kmer_len is not None and other.max_kmer_len is None: return False elif self.max_kmer_len != other.max_kmer_len: return False if self.kmer_source_strand != other.kmer_source_strand: return False if self.track_strands_separately != other.track_strands_separately: return False if self._is_initialized != other._is_initialized: return False if self._is_set != other._is_set: return False if self._is_sorted != other._is_sorted: return False # kmer_sba_start_indices is nullable if self.kmer_sba_start_indices is None and other.kmer_sba_start_indices is not None: return False elif self.kmer_sba_start_indices is not None and other.kmer_sba_start_indices is None: return False elif not np.array_equal(self.kmer_sba_start_indices, other.kmer_sba_start_indices): return False if self.seq_coll != other.seq_coll: return False # if this is reached, then they are equal return True
[docs] def save( self, save_file_path: Path, include_sequence_collection: bool = False, format: str = "hdf5", mode: str = "w", ) -> None: """ Save Kmers object to file. Args: save_file_path (Path): path for saved file include_sequence_collection (bool, optional): whether to include sequence collection. Defaults to False. format (str, optional): "hdf5" or "shelve". Defaults to "hdf5". mode (str, optional): mode with which to open file and save information. "w" for write or "a" for append. Defaults to "w". Raises: ValueError: format not recognized """ if format == "hdf5": self._save_hdf5(save_file_path, include_sequence_collection, mode=mode) elif format == "shelve": self._save_shelve(save_file_path, include_sequence_collection) else: raise ValueError(f"format ({format}) not recognized")
[docs] def load( self, load_file_path: Path, seq_coll: Union[SequenceCollection, None] = None, format: str = "hdf5", ) -> None: """ Load Kmers object from saved file. Args: load_file_path (Path): path to file to load. seq_coll (Union[SequenceCollection, None], optional): If provided, seq_coll will be loaded into the kmers object rather than attempting to load from the saved file. Defaults to None. format (str, optional): "hdf5" or "shelve". Defaults to "hdf5". Raises: ValueError: format not recognized """ if format == "hdf5": self._load_hdf5(load_file_path, seq_coll) elif format == "shelve": self._load_shelve(load_file_path, seq_coll) else: raise ValueError(f"format ({format}) not recognized")
@staticmethod def _set_for_export(value, value_if_none): """ Helper function that converts a None value into the appropriate indicator value if value equals value_if_none. This is used to help deal with the HDF5 format not handling null values. Args: value: value we want to pass to serialize with HDF5 value_if_none: what to pass to HDF5 instead of value if value is None Returns: value_if_none if value is None else value """ if value is None: return value_if_none else: return value @staticmethod def _correct_import(value, value_if_none): """ Helper function that converts a value read from HDF5 to None if it equal to the indicator value value_if_none. This is used to help deal with the HDF5 format not handling null values. Args: value: value we want to pass to serialize with HDF5 value_if_none: what to pass to HDF5 instead of value if value is None Returns: None if value == value_if_none else value """ if isinstance(value, np.ndarray): if value.shape == (0,): return None elif value == value_if_none: return None return value def _save_hdf5( self, save_file_path: Path, include_sequence_collection: bool = False, mode: str = "w" ) -> None: """ Save Kmers object information to HDF5 file format. Args: save_file_path (Path): path for saved file include_sequence_collection (bool, optional): whether to include sequence collection. Defaults to False. mode (str, optional): mode with which to open file and save information. "w" for write or "a" for append. Defaults to "w". """ with h5py.File(save_file_path, mode) as file: grp = file.create_group("kmers") # save all class members to file. hdf5 does not accept None values. Correct them before # exporting. empty_start_indices = np.array([], dtype=np.uint32) grp["min_kmer_len"] = self.min_kmer_len grp["max_kmer_len"] = self._set_for_export(self.max_kmer_len, 0) grp["kmer_source_strand"] = self.kmer_source_strand grp["track_strands_separately"] = self.track_strands_separately grp["_is_initialized"] = self._is_initialized grp["_is_set"] = self._is_set grp["_is_sorted"] = self._is_sorted grp["kmer_sba_start_indices"] = self._set_for_export( self.kmer_sba_start_indices, empty_start_indices ) if include_sequence_collection: self.seq_coll.save(save_file_path, mode="a", format="hdf5") def _load_hdf5( self, load_file_path: Path, seq_coll: Union[SequenceCollection, None] = None ) -> None: """ Load Kmers object information from HDF5 file format. Args: load_file_path (Path): path to file to load. seq_coll (Union[SequenceCollection, None], optional): If provided, seq_coll will be loaded into the kmers object rather than attempting to load from the saved file. Defaults to None. """ with h5py.File(load_file_path, "r") as file: grp = file["kmers"] # read values from file empty_start_indices = np.array([], dtype=np.uint32) self.min_kmer_len = grp["min_kmer_len"][()] self.max_kmer_len = self._correct_import(grp["max_kmer_len"][()], 0) self.kmer_source_strand = grp["kmer_source_strand"][()].decode("utf-8") self.track_strands_separately = grp["track_strands_separately"][()] self._is_initialized = grp["_is_initialized"][()] self._is_set = grp["_is_set"][()] self._is_sorted = grp["_is_sorted"][()] self.kmer_sba_start_indices = self._correct_import( grp["kmer_sba_start_indices"][:], empty_start_indices ) if seq_coll is not None: self.seq_coll = seq_coll else: self.seq_coll = SequenceCollection() self.seq_coll.load(load_file_path, format="hdf5") return def _save_shelve(self, save_file_path: Path, include_sequence_collection: bool = False) -> None: """ Save Kmers object information to shelve file format. Args: save_file_path (Path): path for saved file include_sequence_collection (bool, optional): whether to include sequence collection. Defaults to False. """ with shelve.open(save_file_path) as db: db["min_kmer_len"] = self.min_kmer_len db["max_kmer_len"] = self.max_kmer_len db["kmer_source_strand"] = self.kmer_source_strand db["track_strands_separately"] = self.track_strands_separately db["_is_initialized"] = self._is_initialized db["_is_set"] = self._is_set db["_is_sorted"] = self._is_sorted db["kmer_sba_start_indices"] = self.kmer_sba_start_indices db["_is_initialized"] = self._is_initialized db["kmer_sba_start_indices"] = self.kmer_sba_start_indices if include_sequence_collection: self.seq_coll.save(save_file_path, format="shelve") def _load_shelve( self, load_file_path: Path, seq_coll: Union[SequenceCollection, None] = None ) -> None: """ Load Kmers object information from shelve file format. Args: load_file_path (Path): path to file to load. seq_coll (Union[SequenceCollection, None], optional): If provided, seq_coll will be loaded into the kmers object rather than attempting to load from the saved file. Defaults to None. """ with shelve.open(load_file_path) as db: self.min_kmer_len = db["min_kmer_len"] self.max_kmer_len = db["max_kmer_len"] self.kmer_source_strand = db["kmer_source_strand"] self.track_strands_separately = db["track_strands_separately"] self._is_initialized = db["_is_initialized"] self._is_set = db["_is_set"] self._is_sorted = db["_is_sorted"] self.kmer_sba_start_indices = db["kmer_sba_start_indices"] self._is_initialized = db["_is_initialized"] self.kmer_sba_start_indices = db["kmer_sba_start_indices"] if seq_coll is None: self.seq_coll = SequenceCollection() self.seq_coll.load(load_file_path, format="shelve") else: self.seq_coll = seq_coll
[docs] def get_kmer_str_no_checks(self, kmer_num: int, kmer_strand: str, kmer_len: int) -> str: """ Return a string representation of kmer_num on kmer_strand with kmer_len. No checks to verify that arguments provided are done. Only call this function if it is known that these checks have already been completed (e.g. when yielded get_kmers()). Args: kmer_num (int): kmer number kmer_strand (str): "+" or "-" kmer_len (int): length of the kmer Raises: NotImplementedError: kmer_strand != "+" ValueError: unrecognized kmer_strand Returns: str: kmer_str """ if kmer_strand == "+": sba = self.seq_coll.forward_sba sba_start_idx = self.kmer_sba_start_indices[kmer_num] elif kmer_strand == "-": raise NotImplementedError("Only implemented for kmer_strand='+'") else: raise ValueError(f"kmer_strand ({kmer_strand}) not recognized") return bytearray(sba[sba_start_idx : sba_start_idx + kmer_len]).decode("utf-8")
[docs] def get_kmer_str(self, kmer_num: int, kmer_len: Union[int, None] = None) -> str: """ Get the kmer_num'th kmer of kmer_len. Args: kmer_num (int): which number kmer to return (in range [0, num_kmers - 1]) kmer_len (Union[int, None], optional): length of kmer to return. If kmer_len is None, return the longest possible, which is when the segment ends or the kmer_max_len is reached. Defaults to None. Raises: NotImplementedError: kmer_source_strand and strands_loaded must both be "forward" ValueError: kmer_num is invalid ValueError: kmer_len is invalid Returns: str: kmer """ condition1 = self.kmer_source_strand != "forward" condition2 = self.seq_coll.strands_loaded() != "forward" if condition1 or condition2: raise NotImplementedError( f"both kmer_source_strand ({self.kmer_source_strand}) and " "sequence_collection.strands_loaded() must be 'forward'" ) # verify that kmer_num is valid if kmer_num < 0: raise ValueError(f"kmer_num ({kmer_num}) cannot be less than zero") if kmer_num >= len(self): raise ValueError(f"kmer_num ({kmer_num}) is out of bounds (num kmers = {len(self)})") # verify that kmer_len is valid # TODO: consider allowing user to select a shorter or longer kmer than during sorting if kmer_len is not None and kmer_len < self.min_kmer_len: raise ValueError( f"kmer_len ({kmer_len}) is less than min_kmer_len ({self.min_kmer_len})" ) if self.max_kmer_len is not None and kmer_len > self.max_kmer_len: raise ValueError( f"kmer_len ({kmer_len}) is greater than max_kmer_len ({self.max_kmer_len})" ) sba_start_idx = self.kmer_sba_start_indices[kmer_num] seg_num = self.seq_coll.get_segment_num_from_sba_index(sba_start_idx) _, sba_seg_end_idx = self.seq_coll.get_sba_start_end_indices_for_segment(seg_num) if kmer_len is None: largest_kmer_len = sba_seg_end_idx - sba_start_idx + 1 if self.max_kmer_len is None: kmer_len = largest_kmer_len else: kmer_len = min(self.max_kmer_len, largest_kmer_len) # verify that kmer_num is in-bounds if sba_start_idx + kmer_len - 1 > sba_seg_end_idx: raise ValueError( f"kmer_len ({kmer_len}) for kmer_num ({kmer_num}) extends beyond the end of the segment" ) sba = self.seq_coll.forward_sba return bytearray(sba[sba_start_idx : sba_start_idx + kmer_len]).decode("utf-8")
[docs] def sort(self): """ Sort (in place) the kmer_sba_start_indices array by lexicographically comparing the kmers defined at each index. Raises: NotImplementedError: kmer_source_strand and strands_loaded must both be "forward" """ condition1 = self.kmer_source_strand != "forward" condition2 = self.seq_coll.strands_loaded() != "forward" if condition1 or condition2: raise NotImplementedError( f"both kmer_source_strand ({self.kmer_source_strand}) and " "sequence_collection.strands_loaded() must be 'forward'" ) # build the is_less_than() comparison function to be passed to quicksort is_less_than = self.get_is_less_than_func() # compile the sorting function quicksort_func = quicksort.make_jit_quicksort(lt=is_less_than, is_argsort=False) jit_sort_func = nb.njit(quicksort_func.run_quicksort) # sort jit_sort_func(self.kmer_sba_start_indices) self._is_sorted = True return
[docs] def get_is_less_than_func( self, validate_kmers: bool = True, break_ties: bool = False ) -> Callable: """ Returns a less than function that takes two integers as input and returns whether the kmer defined by the first index is less than the kmer defined by the second index. NOTE: If break_ties is True, then it will return True if the first of two equal kmers has a smaller sba_start_index. This is useful to gauranteeing identical output between different runs. However, it comes at a significant performance cost due to additional swapping required Args: validate_kmers (bool, optional): Explicitly verify that both kmers are at least of min_kmer_len. Defaults to False. break_ties (bool, optional): if two kmers are lexicographically equivalent, break the tie usind the sba_start_index. Defaults to True. Raises: NotImplementedError: kmer_source_strand and strands_loaded must both be "forward" Returns: Callable: is_less_than_func """ condition1 = self.kmer_source_strand != "forward" condition2 = self.seq_coll.strands_loaded() != "forward" if condition1 or condition2: raise NotImplementedError( f"both kmer_source_strand ({self.kmer_source_strand}) and " "sequence_collection.strands_loaded() must be 'forward'" ) # assign to local variables the member variables to which is_less_than() needs access sba = self.seq_coll.forward_sba min_kmer_len = self.min_kmer_len max_kmer_len = self.max_kmer_len def is_less_than(kmer_sba_start_idx_a: int, kmer_sba_start_idx_b: int) -> bool: """ Returns whether kmer_a is less than kmer_b. Args: kmer_sba_start_idx_a (int): start index in the sequence byte array for kmer_a kmer_sba_start_idx_b (int): start index in the sequence byte array for kmer_b Returns: bool: a_lt_b """ # compare kmers comparison, last_kmer_index_compared = compare_sba_kmers_lexicographically( sba, sba, kmer_sba_start_idx_a, kmer_sba_start_idx_b, max_kmer_len=max_kmer_len ) if comparison < 0: a_lt_b = True elif comparison > 0: a_lt_b = False else: if break_ties: a_lt_b = kmer_sba_start_idx_a < kmer_sba_start_idx_b else: a_lt_b = False # verify that kmer_a and kmer_b are at least of length min_kmer_len if validate_kmers: num_bases_to_check = min_kmer_len - (last_kmer_index_compared + 1) kmer_a_is_valid = kmer_has_required_len( sba, kmer_sba_start_idx_a + last_kmer_index_compared + 1, num_bases_to_check ) kmer_b_is_valid = kmer_has_required_len( sba, kmer_sba_start_idx_b + last_kmer_index_compared + 1, num_bases_to_check ) if not kmer_a_is_valid or not kmer_b_is_valid: raise AssertionError( f"kmers compared were less than min_kmer_len ({min_kmer_len}). Was kmer_sba_start_indices initialized correctly?" ) return a_lt_b return is_less_than
[docs] def to_csv(self, kmer_len, output_file_path, fields=["kmer"]): """ Write all kmers to CSV file using a simple function. """ pass