Source code for genome_kmers.sequence_collection

# from bisect import bisect_right
import pickle
import shelve
from collections import Counter
from pathlib import Path
from typing import Callable, List, Union

import h5py
import numpy as np
from numba import jit
from numba.core import types
from numba.typed import Dict


[docs] @jit def bisect_right(a, x): """ NOTE: this is a minor modification to the copy and paste from the python standard library. It is not possible to use the python standard library version with the @jit decorator, which is required since functions that call this function need to use the @jit decorator. Return the index where to insert item x in list a, assuming a is sorted. The return value i is such that all e in a[:i] have e <= x, and all e in a[i:] have e > x. So if x already appears in the list, a.insert(i, x) will insert just after the rightmost x already there. Optional args lo (default 0) and hi (default len(a)) bound the slice of a to be searched. """ lo = 0 hi = len(a) while lo < hi: mid = (lo + hi) // 2 if x < a[mid]: hi = mid else: lo = mid + 1 return lo
[docs] @jit def reverse_complement_sba( sba: np.array, complement_mapping_arr: np.array, inplace=False ) -> np.array: """ Reverse complement sequence byte array (sba) using the uint8 to uint8 mapping array (complement_mapping_arr). This function uses numba.jit for performance. Args: sba (np.array): sequence byte array (dtype=uint8) complement_mapping_arr (np.array): maps from sequence byte array value (uint8) to complement sequence byte array value (uint8) inplace (bool, optional): whether to perform in place or return a newly created array. Defaults to False. Returns: np.array: reverse complemented sequence byte array """ if inplace: for idx in range((len(sba) + 1) // 2): rc_idx = len(sba) - 1 - idx front_byte = sba[idx] back_byte = sba[rc_idx] sba[idx] = complement_mapping_arr[back_byte] sba[rc_idx] = complement_mapping_arr[front_byte] reverse_complement_arr = sba else: reverse_complement_arr = np.zeros(len(sba), dtype=np.uint8) for idx in range(len(sba)): rc_idx = rc_idx = len(sba) - 1 - idx reverse_complement_arr[rc_idx] = complement_mapping_arr[sba[idx]] return reverse_complement_arr
[docs] @jit def get_segment_num_from_sba_index(sba_idx: int, sba_strand: str, sba_seg_starts: np.array) -> int: """ Get the segment number from the sequence byte array index. Args: sba_idx (int): sequence byte array index sba_strand (str): "forward" or "reverse_complement" sba_seg_starts (np.array): sba indices for each segment start (dtype=np.unit32) Returns: int: segment_num """ # NOTE: if this is too slow in profiling, an alternative implementation is to generate a # look-up table that takes O(1) average look-up time if the distribution of sequence lengths # isn't too wide. Define an array of length (sba_len / N) and populate each index # (sba_idx // N) with the sba_idx. Choose N to be small enough to ensure O(1) lookup time. # Will have bad memory usage in worst case (e.g. 1 length 1e7, 1e7 of length 1) # use a @jit decorated copy of Python's bisect function to do O(log(N)) search for the correct # record number using return bisect_right(sba_seg_starts, sba_idx) - 1
[docs] @jit def get_forward_seq_idx( sba_idx: int, sba_strand: str, seg_sba_start_idx: int, seg_sba_end_idx: int, one_based: bool = False, ) -> int: """ Get the forward sequence index given a sequence byte array index and segment start and and indices. Optionally returns a one-based sequence index. Args: sba_idx (int): sequence byte array index sba_strand (str, optional): sequence byte array strand. If strands_loaded is "both", then it must be specified as forward" or "reverse_complement". Otherwise it is inferred. Defaults to None. seg_sba_start_idx (int): sequence byte array index for the start of the segment seg_sba_end_idx (int): sequence byte array index for the of the segment one_based (bool, optional): whether seq index return be one-based. Defaults to False. Raises: ValueError: if sba_idx, seg_sba_start_idx, or seg_sba_end_idx is not valid ValueError: if sba_strand is not recognized Returns: int: sequence forward strand index """ # check that sba_idx, seg_sba_start_idx, and seg_sba_end_idx are valid if sba_idx < seg_sba_start_idx: raise ValueError(f"sba_idx ({sba_idx}) must be >= seg_sba_start_idx ({seg_sba_start_idx})") if sba_idx > seg_sba_end_idx: raise ValueError(f"sba_idx ({sba_idx}) must be <= seg_end_start_idx ({seg_sba_end_idx})") if seg_sba_start_idx > seg_sba_end_idx: raise ValueError( f"seg_sba_start_idx ({seg_sba_start_idx}) must be <= seg_sba_end_idx ({seg_sba_end_idx})" ) if seg_sba_start_idx < 0: raise ValueError(f"seg_sba_start_idx ({seg_sba_start_idx}) must be > 0") # calculate seq_idx if sba_strand == "forward": seq_idx = sba_idx - seg_sba_start_idx elif sba_strand == "reverse_complement": seq_idx = seg_sba_end_idx - sba_idx else: raise ValueError(f"sba_strand ({sba_strand}) not recognized") # add one if one_based if one_based: seq_idx += 1 return seq_idx
[docs] @jit def get_sba_start_end_indices_for_segment( segment_num: int, sba_strand: str, sba_seg_starts: np.array, len_sba: int ) -> tuple[int, int]: """ Given a segment number (and optionally sba_strand), return the first sba index and last sba index of the segment. Args: segment_num (int): segment number sba_strand (str, optional): sequence byte array strand ("forward", "reverse_complement", or "both"). Defaults to None. Raises: ValueError: segment_num out of bounds Returns: tuple[int, int]: sba_start_index, sba_end_index """ # verify segment_num is valid if segment_num < 0: raise ValueError(f"segment_num ({segment_num}) is out of bounds") elif segment_num >= len(sba_seg_starts): raise ValueError(f"segment_num ({segment_num}) is out of bounds") # get start_index sba_start_index = sba_seg_starts[segment_num] if segment_num == len(sba_seg_starts) - 1: sba_end_index = len_sba - 1 else: sba_end_index = sba_seg_starts[segment_num + 1] - 2 return sba_start_index, sba_end_index
[docs] class SequenceCollection: """ Holds all the information contained within a fasta file in a format conducive to kmer sorting. Terminology ----------- .. code-block:: sba: sequence byte array revcomp: reverse complement record: each header and its corresponding sequence is called a record. record_num is based on the order that records are read in. record_num does not change when reverse complemented segment: is the same as a record except that segment_num always starts leftmost. i.e. the sba end index for segment N is always less than the sba end index for segment M > N forward_sba_idx: index in forward sequence byte array revcomp_sba_idx: index in reverse complement sequence byte array forward_seq_idx: 0-based index for a sequence on the forward strand revcomp_seq_idx: 0-based index for a sequence on the reverse complement strand Forward strand -------------- record_num 0 1 2 segment_num 0 1 2 forward_record_name A B C forward_sba [-------]$[------------]$[--------------------] | | | | | | forward_sba_seg_starts 0 | 10 | 25 | forward_sba_seg_ends 8 23 46 Reverse complement strand ------------------------- revcomp_sba_seg_ends 21 36 46 revcomp_sba_seg_starts 0 | 23 | 38 | | | | | | | revcomp_sba [====================]$[============]$[=======] revcomp_record_name C B A segment_num 0 1 2 record_num 2 1 0 Notes ----- .. code-block:: - a "$" is placed between each sequence so that you can determine if you've reached the end of a sequence without referencing the seg_starts / seg_ends array. - the collection must contain at least one sequence - all sequences in the collection must have a length > 0 - duplicate record_names are not allowed Members ------- .. code-block:: forward_sba (np.array): forward sba (dtype=uint8) _forward_sba_seg_starts (np.array): value at index i gives the sba start index for the ith segment on the forward strand. (dtype=uint32) revcomp_sba (np.array): reverse complement sba (dtype=uint8) _revcomp_sba_seg_starts (np.array): value at index i gives the sba start index for the ith segment on the reverse complement strand. (dtype=uint32) """ def __init__( self, fasta_file_path: Union[Path, None] = None, sequence_list: Union[list[tuple[str, str]], None] = None, strands_to_load: str = "forward", ) -> None: """ Initializes a SequenceCollection object. Args: fasta_file_path (Path, optional): The path to the fasta formatted file to be read. Defaults to None. Must be specified if sequence_list is not. sequence_list (list[tuple[str, str]], optional): List of (seq_id, seq) tuples defining a sequence collection. seq_id is the sequences id (i.e. the header in a fasta file, such as "chr1"). seq is the string sequence (e.g. "ACTG"). Defaults to None. Must be specified if fasta_file_path is not. strands_to_load (str, optional): which strand(s) to load into memory. One of "forward", "reverse_complement", "both". Defaults to "forward". """ # set default parameters based on user arguments self.forward_sba = None self._forward_sba_seg_starts = None self.forward_record_names = None self.revcomp_sba = None self._revcomp_sba_seg_starts = None self.revcomp_record_names = None self._strands_loaded = None self._fasta_file_path = None self._initialize_mapping_arrays() # if fasta_file_path and sequence_list are both None, do not do any additional initialization if fasta_file_path is None and sequence_list is None: return # check provided arguments if fasta_file_path is not None and sequence_list is not None: raise ValueError("Only one of fasta_file_path and sequence_list can be specified") if strands_to_load not in ("forward", "reverse_complement", "both"): raise ValueError(f"strands_to_load unrecognized ({strands_to_load})") # load sequence if fasta_file_path is not None: self._fasta_file_path = fasta_file_path self._initialize_from_fasta(fasta_file_path, strands_to_load) else: self._initialize_from_sequence_list(sequence_list, strands_to_load) return def __len__(self) -> int: """ Count of sequence records in the collection """ if self._strands_loaded == "forward" or self._strands_loaded == "both": return len(self._forward_sba_seg_starts) elif self._strands_loaded == "reverse_complement": return len(self._revcomp_sba_seg_starts) else: raise AssertionError(f"strands_loaded ({self._strands_loaded}) not recognized") def __str__(self) -> str: """ Return fasta-formatted sequence. If _strands_loaded == "forward" or "both", return the forward sequence. Otherwise, return the reverse complement, but keep the record order the same as for the forward sequence. """ sba_strand = ( "reverse_complement" if self._strands_loaded == "reverse_complement" else "forward" ) if sba_strand == "forward": sba = self.forward_sba elif sba_strand == "reverse_complement": sba = self.revcomp_sba lines = [] for record_name, sba_seg_start_idx, sba_seg_end_idx in self.iter_records(sba_strand): seq = bytearray(sba[sba_seg_start_idx : sba_seg_end_idx + 1]).decode() lines.append(f">{record_name}") lines.append(seq) return "\n".join(lines)
[docs] def sequence_length(self, record_num=None, record_name=None): """ Return: If record_num is specified, then the length of record_num. If record_name is specified, then the length of record_num corresponding to record_name Otherwise, the total length of all sequences """ if record_name is not None and record_num is not None: raise ValueError( f"record_num ({record_num}) and record_name ({record_name}) cannot both be specified" ) # TODO: implement this function after carefully thinking through the functionality desired. # record_num vs segment_num needs to be considered. This also likely needs to have # strand specified (if using segment_num) raise NotImplementedError()
[docs] def iter_records(self, sba_strand: str = None): """ Yield (record_name, record_sba_start_idx, record_sba_end_idx) for all sequence records in order of increasing **record_num**. i.e. if records are ordered as "chr1", "chr2", "chr3", they will be yielded in that same order regardless of whether the strand is "forward" or "reverse_complement". Args: sba_strand (str, optional): for which strand to yield records. If self._strands_loaded == "both", it must be specified. Otherwise it is determined from the loaded strand ("forward" or "reverse_complement"). Defaults to None. Yield: (record_name, record_sba_start_idx, record_sba_end_idx) """ # decide which strand to use based on user parameter sba_strand = self._get_sba_strand_to_use(sba_strand) if sba_strand == "forward": for segment_num in range(len(self)): record_name = self.forward_record_names[segment_num] sba_seg_start_idx, sba_seg_end_idx = get_sba_start_end_indices_for_segment( segment_num, sba_strand, self._forward_sba_seg_starts, len(self.forward_sba) ) yield (record_name, sba_seg_start_idx, sba_seg_end_idx) elif sba_strand == "reverse_complement": # iterate in opposite order of segments to maintain correct record_num ordering for segment_num in range(len(self) - 1, -1, -1): record_name = self.revcomp_record_names[segment_num] sba_seg_start_idx, sba_seg_end_idx = get_sba_start_end_indices_for_segment( segment_num, sba_strand, self._revcomp_sba_seg_starts, len(self.revcomp_sba) ) yield (record_name, sba_seg_start_idx, sba_seg_end_idx) else: raise ValueError(f"sba_strand ({sba_strand}) must be 'forward' or 'reverse_complement'") return
[docs] def strands_loaded(self) -> str: """ Returns which strands are loaded. Returns: str: which strands are loaded "forward", "reverse_complement", "both" """ return self._strands_loaded
@staticmethod def _get_complement_mapping_array(): """ Initialize the reverse_complement byte mapping array """ # https://www.bioinformatics.org/sms/iupac # '$' is a special character that is used to mark the boundary between records in the # sequence byte array complement_mapping_dict = { "A": "T", "C": "G", "G": "C", "T": "A", "R": "Y", "Y": "R", "S": "S", "W": "W", "K": "M", "M": "K", "B": "V", "D": "H", "H": "D", "V": "B", "N": "N", "$": "$", } # build array mapping complement_mapping_arr = np.zeros(256, dtype=np.uint8) for key, val in complement_mapping_dict.items(): complement_mapping_arr[ord(key)] = ord(val) return complement_mapping_arr def _initialize_mapping_arrays(self): """ Initialize mappings between uint8 value (the dtype stored in the sequence byte arrays) and the u1 value (i.e. unicode char of length 1) """ # https://www.bioinformatics.org/sms/iupac self._allowed_bases = { "A", "C", "G", "T", "R", "Y", "S", "W", "K", "M", "B", "D", "H", "V", "N", "$", } self._allowed_uint8 = {ord(base) for base in self._allowed_bases} # initialize arrays to map from from uint8 to character and vice versa self._complement_mapping_arr = SequenceCollection._get_complement_mapping_array() self._uint8_to_u1_mapping = np.zeros(256, dtype="U1") self._u1_to_uint8_mapping = dict() self._numba_unicode_to_uint8_mapping = Dict.empty( key_type=types.unicode_type, value_type=types.uint8 ) for i in range(256): self._u1_to_uint8_mapping[chr(i)] = i self._numba_unicode_to_uint8_mapping[chr(i)] = types.uint8(i) self._uint8_to_u1_mapping[i] = chr(i) return @staticmethod def _get_fasta_stats(fasta_file_path: Path) -> tuple[int, int]: """ Read fasta file to determine number of records and total sequence length. Args: fasta_file_path (Path): fasta file Returns: tuple[int, int]: num_records, total_seq_len """ num_records = 0 total_seq_len = 0 with open(fasta_file_path, "r") as input_file: for line in input_file: if line.startswith(">"): num_records += 1 else: total_seq_len += len(line.strip()) return num_records, total_seq_len @staticmethod def _get_fasta_record_name(line: str) -> str: """ Get the fasta record name from a line starting with ">". Use the same method as Bowtie, which reads all characters following ">" until the end of the line or whitespace. Args: line (str): fasta file line Raises: ValueError: if line does not start with ">" Returns: str: record_name """ if not line.startswith(">"): raise ValueError("line does not start with '>'") record_name = line[1:].strip().split()[0] return record_name def _load_forward_sba_from_fasta( self, fasta_file_path: Path, num_records: int, total_seq_len: int ) -> tuple[np.array, np.array]: """ Load forward sequence byte array from fasta file. Args: fasta_file_path (Path): fasta file to read num_records (int): number of records in the file total_seq_len (int): total length of all sequences contained in the fasta file Returns: tuple[np.array, np.array]: _description_ """ sba_len = total_seq_len + num_records - 1 sba_seg_starts = np.zeros(num_records, dtype=np.uint32) sba = np.zeros(sba_len, dtype=np.uint8) record_names = [] first_empty_idx = 0 record_num = -1 with open(fasta_file_path, "r") as input_file: for line in input_file: if line.startswith(">"): record_num += 1 # if there is previous sequence, need to add a "$" to sba if first_empty_idx != 0: sba[first_empty_idx] = ord("$") first_empty_idx += 1 # set the segment start index sba_seg_starts[record_num] = first_empty_idx # collect the record_name record_name = SequenceCollection._get_fasta_record_name(line) record_names.append(record_name) else: sba_chunk = bytearray(line.strip().upper(), "utf-8") sba[first_empty_idx : first_empty_idx + len(sba_chunk)] = sba_chunk first_empty_idx += len(sba_chunk) if first_empty_idx != sba_len: raise AssertionError("After parsing the fasta file, we expect sba to be full") # check for any sequences with length zero empty_seq_found = (np.diff(sba_seg_starts) < 2).any() if empty_seq_found: raise ValueError( f"At least one empty sequence was found in the input file ({fasta_file_path})" ) SequenceCollection._verify_record_names_are_unique(record_names) # verify that there are no unrecognized values in the sba values_in_sba = set(np.unique(sba)) values_not_allowed = values_in_sba - self._allowed_uint8 if values_not_allowed != set(): raise ValueError(f"Sequence contains non-allowed characters! ({values_not_allowed})") return sba, sba_seg_starts, record_names def _initialize_from_fasta(self, fasta_file_path: Path, strands_to_load: str) -> None: """ Initialize from SequenceCollection object from a fasta file. Args: fasta_file_path (Path): fasta file strands_to_load (str): which strand(s) to load into sequence byte arrays ("forward", "reverse_complement", or "both") Raises: ValueError: if strand is not recognized """ if strands_to_load not in ("forward", "reverse_complement", "both"): raise ValueError(f"strands_to_load not recognized ({strands_to_load})") self.forward_sba = None self._forward_sba_seg_starts = None self.revcomp_sba = None self._revcomp_sba_seg_starts = None self.forward_record_names = None self.revcomp_record_names = None # determine the full size of the fasta file num_records, total_seq_len = self._get_fasta_stats(fasta_file_path) if strands_to_load == "forward" or strands_to_load == "both": self.forward_sba, self._forward_sba_seg_starts, self.forward_record_names = ( self._load_forward_sba_from_fasta(fasta_file_path, num_records, total_seq_len) ) if strands_to_load == "both": # take advantage of having forward_sba and _forward_sba_seg_starts already loaded # into memory. We need only copy the array and reverse_complement. self.revcomp_sba = np.copy(self.forward_sba) reverse_complement_sba(self.revcomp_sba, self._complement_mapping_arr, inplace=True) self._revcomp_sba_seg_starts = self._get_opposite_strand_sba_start_indices( self._forward_sba_seg_starts, len(self.revcomp_sba), ) self.revcomp_record_names = self.forward_record_names.copy() self.revcomp_record_names.reverse() elif strands_to_load == "reverse_complement": # load forward strand information and then reverse complement in place self.forward_sba, self._forward_sba_seg_starts, self.forward_record_names = ( self._load_forward_sba_from_fasta(fasta_file_path, num_records, total_seq_len) ) self._strands_loaded = "forward" self.reverse_complement() self._strands_loaded = strands_to_load return @staticmethod def _get_required_sba_length_from_sequence_list(sequence_list: list[tuple[str, str]]) -> int: """ Calculate the size of the sequence byte array to allocate. Note that a '$' is placed between each sequence, which accounts for the length beyond just sequence. Args: sequence_list (list[tuple[str, str]]): List of (seq_id, seq) tuples defining a sequence collection. seq_id is the sequences id (i.e. the header in a fasta file, such as "chr1"). seq is the string sequence (e.g. "ACTG"). Defaults to None. Must be specified if fasta_file_path is not. Raises: ValueError: raised if there is a sequence in sequence_list with length 0 Returns: sba_length (int): length required for sequence byte array """ total_seq_len = 0 for record_name, seq in sequence_list: if len(seq) == 0: raise ValueError( f"Each sequence in the collection must have length > 0. Record '{record_name}' has a sequence lengt of 0" ) total_seq_len += len(seq) sba_length = total_seq_len + len(sequence_list) - 1 return sba_length def _get_sba_from_sequence_list(self, sequence_list: list[tuple[str, str]]) -> np.array: """ Generate a sequence byte array from a sequence list. Args: sequence_list (list[tuple[str, str]]): List of (seq_id, seq) tuples defining a sequence collection. seq_id is the sequences id (i.e. the header in a fasta file, such as "chr1"). seq is the string sequence (e.g. "ACTG"). Defaults to None. Must be specified if fasta_file_path is not. Raises: ValueError: raised when sequence contains non-allowed values Returns: sba (np.array): sequence byte array """ sba_length = SequenceCollection._get_required_sba_length_from_sequence_list(sequence_list) sba = np.zeros(sba_length, dtype=np.uint8) last_filled_idx = -1 for i, (_, seq) in enumerate(sequence_list): start_idx = last_filled_idx + 1 sba[start_idx : start_idx + len(seq)] = bytearray(seq, "utf-8") last_filled_idx = start_idx + len(seq) - 1 # place a '$' between loaded sequences if i != len(sequence_list) - 1: last_filled_idx += 1 sba[last_filled_idx] = ord("$") # verify that there are no unrecognized values in the sba values_in_sba = set(np.unique(sba)) values_not_allowed = values_in_sba - self._allowed_uint8 if values_not_allowed != set(): raise ValueError(f"Sequence contains non-allowed characters! ({values_not_allowed})") return sba @staticmethod def _get_sba_starts_from_sequence_list(sequence_list: list[tuple[str, str]]) -> np.array: """ Generate an array of sequence start indices within the sequence byte array from sequence_list. Args: sequence_list (list[tuple[str, str]]): List of (seq_id, seq) tuples defining a sequence collection. seq_id is the sequences id (i.e. the header in a fasta file, such as "chr1"). seq is the string sequence (e.g. "ACTG"). Defaults to None. Must be specified if fasta_file_path is not. Returns: sba_starts (np.array): array with sba index for the start of the sequence (dtype=uint32) """ sba_seq_starts = np.zeros(len(sequence_list), dtype=np.uint32) last_filled_idx = -1 for i, (_, seq) in enumerate(sequence_list): start_idx = last_filled_idx + 1 sba_seq_starts[i] = start_idx last_filled_idx = start_idx + len(seq) - 1 # place a '$' between loaded sequences if i != len(sequence_list) - 1: last_filled_idx += 1 return sba_seq_starts @staticmethod def _verify_record_names_are_unique(record_names): # verify that there are no repeated record names that would lead to ambiguity record_names_counter = Counter(record_names) if len(record_names) != len(record_names_counter): num_record_names_repeated = len( [1 for count in record_names_counter.values() if count > 1] ) raise ValueError( f"sequence_list contains {num_record_names_repeated} repeated record_names" ) return @staticmethod def _get_record_names_from_sequence_list(sequence_list: list[tuple[str, str]]) -> List[str]: """ Get the list of record names from sequence_list. Verify that there are no duplicates. Args: sequence_list (list[tuple[str, str]]): List of (seq_id, seq) tuples defining a sequence collection. seq_id is the sequences id (i.e. the header in a fasta file, such as "chr1"). seq is the string sequence (e.g. "ACTG"). Defaults to None. Must be specified if fasta_file_path is not. Returns: """ record_names = [record_name for record_name, _ in sequence_list] SequenceCollection._verify_record_names_are_unique(record_names) return record_names def _initialize_from_sequence_list( self, sequence_list: list[tuple[str, str]], strands_to_load: str ): """ Loads the sequence records from a list of (seq_id, seq) tuples. Args: sequence_list (list[tuple[str, str]]): List of (seq_id, seq) tuples defining a sequence collection. seq_id is the sequences id (i.e. the header in a fasta file, such as "chr1"). seq is the string sequence (e.g. "ACTG"). Defaults to None. Must be specified if fasta_file_path is not. strands_to_load (str, optional): which strand(s) to load into memory. One of "forward", "reverse_complement", "both". Defaults to "forward". """ if strands_to_load not in ("forward", "reverse_complement", "both"): raise ValueError(f"strands_to_load not recognized ({strands_to_load})") self.forward_sba = None self._forward_sba_seg_starts = None self.revcomp_sba = None self._revcomp_sba_seg_starts = None self.forward_record_names = None self.revcomp_record_names = None if strands_to_load == "forward" or strands_to_load == "both": self.forward_sba = self._get_sba_from_sequence_list(sequence_list) self._forward_sba_seg_starts = self._get_sba_starts_from_sequence_list(sequence_list) self.forward_record_names = self._get_record_names_from_sequence_list(sequence_list) if strands_to_load == "both": # take advantage of having forward_sba and _forward_sba_seg_starts already loaded # into memory. We need only copy the array and reverse_complement. self.revcomp_sba = np.copy(self.forward_sba) reverse_complement_sba(self.revcomp_sba, self._complement_mapping_arr, inplace=True) self._revcomp_sba_seg_starts = self._get_opposite_strand_sba_start_indices( self._forward_sba_seg_starts, len(self.revcomp_sba), ) self.revcomp_record_names = self.forward_record_names.copy() self.revcomp_record_names.reverse() elif strands_to_load == "reverse_complement": # load forward strand information and then reverse complement in place self.revcomp_sba = self._get_sba_from_sequence_list(sequence_list) reverse_complement_sba(self.revcomp_sba, self._complement_mapping_arr, inplace=True) self._revcomp_sba_seg_starts = self._get_sba_starts_from_sequence_list(sequence_list) self._revcomp_sba_seg_starts = self._get_opposite_strand_sba_start_indices( self._revcomp_sba_seg_starts, len(self.revcomp_sba), ) self.revcomp_record_names = self._get_record_names_from_sequence_list(sequence_list) self.revcomp_record_names.reverse() self._strands_loaded = strands_to_load return
[docs] def reverse_complement(self) -> np.array: """ Reverse complement the sequence byte array. Only valid if a single strand is loaded. """ if self._strands_loaded == "both": raise ValueError(f"self._strands_loaded ({self._strands_loaded}) cannot be 'both'") if self._strands_loaded == "forward": # sba self.revcomp_sba = self.forward_sba self.forward_sba = None reverse_complement_sba(self.revcomp_sba, self._complement_mapping_arr, inplace=True) # sba segment starts self._revcomp_sba_seg_starts = self._forward_sba_seg_starts self._forward_sba_seg_starts = None self._revcomp_sba_seg_starts = self._get_opposite_strand_sba_start_indices( self._revcomp_sba_seg_starts, len(self.revcomp_sba), ) # record names self.revcomp_record_names = self.forward_record_names self.revcomp_record_names.reverse() self.forward_record_names = None self._strands_loaded = "reverse_complement" elif self._strands_loaded == "reverse_complement": # sba self.forward_sba = self.revcomp_sba self.revcomp_sba = None reverse_complement_sba(self.forward_sba, self._complement_mapping_arr, inplace=True) # sba segment starts self._forward_sba_seg_starts = self._revcomp_sba_seg_starts self._revcomp_sba_seg_starts = None self._forward_sba_seg_starts = self._get_opposite_strand_sba_start_indices( self._forward_sba_seg_starts, len(self.forward_sba), ) # record names self.forward_record_names = self.revcomp_record_names self.forward_record_names.reverse() self.revcomp_record_names = None self._strands_loaded = "forward" return
@staticmethod def _get_opposite_strand_sba_index(sba_idx: int, sba_len: int) -> int: """ Get the mapped sequence byte array index for the opposite strand. Args: sba_idx (int): sequence byte array index sba_len (int): sequence byte array length Returns: opposite_strand_sba_idx (int): the converted index """ if sba_idx < 0 or sba_idx >= sba_len: raise ValueError(f"sba_idx ({sba_idx}) is out of bounds") return sba_len - 1 - sba_idx @staticmethod def _get_opposite_strand_sba_indices(sba_indices: np.array, sba_len: int) -> np.array: """ Get the mapped sequence byte array indices for the opposite strand. Args: sba_indices (np.array): an array of sequence byte array indices sba_len (int): sequence byte arrray length Returns: opposite_strand_sba_indices: the converted indices """ if (sba_indices < 0).any() or (sba_indices >= sba_len).any(): raise ValueError("There is at least one sba index that is out of bounds") return sba_len - 1 - sba_indices @staticmethod def _get_opposite_strand_sba_start_indices(sba_starts: np.array, sba_len: int) -> np.array: """ Get the sba_start_indices for the opposite strand. A sba_start_index is defined as the leftmost inclusive start index of the corresponding sequence. The sba_start_indices are ordered from smallest to largest. Args: sba_starts (np.array): sequence byte array start indices dtype=np.unit32 sba_len (int): total length of the sequence byte array Returns: opposite_strand_start_indices (np.array): sequence byte array start indices for the reverse complement. """ # NOTE: The start of each sequence on the opposite strand is the current end of the # sequence. Also, the order will need to be reversed to keep it in ascending order sba_end_indices = np.copy(sba_starts) if len(sba_end_indices) > 1: sba_end_indices[:-1] = sba_end_indices[1:] - 2 sba_end_indices[-1] = sba_len - 1 opposite_strand_start_indices = SequenceCollection._get_opposite_strand_sba_indices( np.flip(sba_end_indices), sba_len ) return opposite_strand_start_indices
[docs] def get_record_loc_from_sba_index( self, sba_idx: int, sba_strand: str = None, one_based: bool = False ) -> tuple[str, str, int]: """ Get the sequence location (strand, record_name, seq_idx) from the sequence byte array index Args: sba_idx (int): sequence byte array index sba_strand (str, optional): sequence byte array strand. If strands_loaded is "both", then it must be specified as forward" or "reverse_complement". Otherwise it is inferred. Defaults to None. one_based (bool, optional): whether seq index return be one-based. Defaults to False. Raises: ValueError: _description_ Returns: tuple[str, str, int]: _description_ """ # decide which strand to use based on user parameter sba_strand = self._get_sba_strand_to_use(sba_strand) # get all loc info if sba_strand == "forward": segment_num = get_segment_num_from_sba_index( sba_idx, sba_strand, self._forward_sba_seg_starts ) record_name = self.forward_record_names[segment_num] seg_sba_start_idx, seg_sba_end_idx = get_sba_start_end_indices_for_segment( segment_num, sba_strand, self._forward_sba_seg_starts, len(self.forward_sba) ) elif sba_strand == "reverse_complement": segment_num = get_segment_num_from_sba_index( sba_idx, sba_strand, self._revcomp_sba_seg_starts ) record_name = self.revcomp_record_names[segment_num] seg_sba_start_idx, seg_sba_end_idx = get_sba_start_end_indices_for_segment( segment_num, sba_strand, self._revcomp_sba_seg_starts, len(self.revcomp_sba) ) else: raise ValueError(f"sba_strand ({sba_strand}) not recognized") seq_idx = get_forward_seq_idx( sba_idx, sba_strand, seg_sba_start_idx, seg_sba_end_idx, one_based=one_based ) strand = "+" if sba_strand == "forward" else "-" return (strand, record_name, seq_idx)
[docs] def get_record_name_from_sba_index(self, sba_idx: int, sba_strand: str = None) -> str: """ Get the sequence record number from the sequence byte array index. Args: sba_idx (int): sequence byte array index sba_strand (str, optional): for which strand is the sba_idx defined ("forward" or "reverse_complement"). Must be defined when SequenceCollection has both strands loaded. If specified when only a single strand has been loaded, it will verify that it matches what is expected. If set to None, it will automatically detect the strand that was loaded. Defaults to None. Returns: record_name (str): """ # decide which strand to use based on user parameter sba_strand = self._get_sba_strand_to_use(sba_strand) if sba_strand == "forward": segment_num = get_segment_num_from_sba_index( sba_idx, sba_strand, self._forward_sba_seg_starts ) record_name = self.forward_record_names[segment_num] elif sba_strand == "reverse_complement": segment_num = get_segment_num_from_sba_index( sba_idx, sba_strand, self._revcomp_sba_seg_starts ) record_name = self.revcomp_record_names[segment_num] else: raise ValueError(f"sba_strand ({sba_strand}) not recognized") return record_name
def _get_sba_strand_to_use(self, sba_strand: str) -> str: # sba_strand only needs to be specified for self._strands_loaded == "both". If provided # for "forward" or "reverse_complement", it is verified to match if sba_strand is not None: if sba_strand == "forward": if self._strands_loaded == "reverse_complement": raise ValueError( f"sba_strand ({sba_strand}) does not match _strands_loaded ({self._strands_loaded})" ) elif sba_strand == "reverse_complement": if self._strands_loaded == "forward": raise ValueError( f"sba_strand ({sba_strand}) does not match _strands_loaded ({self._strands_loaded})" ) else: raise ValueError(f"sba_strand ({sba_strand}) not recognized") if self._strands_loaded == "both" and sba_strand is None: raise ValueError("sba_strand must be specified when both strands are loaded") sba_strand_to_use = self._strands_loaded if self._strands_loaded != "both" else sba_strand return sba_strand_to_use
[docs] def get_segment_num_from_sba_index(self, sba_idx: int, sba_strand: str = None) -> int: """ Get the segment number from the sequence byte array index defined on sba_strand (attempt to automatically detect the strand if not specified) Args: sba_idx (int): sequence byte array index sba_strand (str, optional): for which strand is the sba_idx defined ("forward" or "reverse_complement"). Must be defined when SequenceCollection has both strands loaded. If specified when only a single strand has been loaded, it will verify that it matches what is expected. If set to None, it will automatically detect the strand that was loaded. Defaults to None. Returns: segment_num (int): """ # decide which strand to use based on user parameter sba_strand = self._get_sba_strand_to_use(sba_strand) # get the segment number if sba_strand == "forward": if sba_idx < 0 or sba_idx >= len(self.forward_sba): raise IndexError(f"sba_idx ({sba_idx}) is out of bounds") segment_num = get_segment_num_from_sba_index( sba_idx, sba_strand, self._forward_sba_seg_starts ) elif sba_strand == "reverse_complement": if sba_idx < 0 or sba_idx >= len(self.revcomp_sba): raise IndexError(f"sba_idx ({sba_idx}) is out of bounds") segment_num = get_segment_num_from_sba_index( sba_idx, sba_strand, self._revcomp_sba_seg_starts ) return segment_num
[docs] def get_sba_start_end_indices_for_segment( self, segment_num: int, sba_strand: str = None ) -> tuple[int, int]: """ Given a segment number (and optionally sba_strand), return the first sba index and last sba index of the segment. Args: segment_num (int): segment number sba_strand (str, optional): sequence byte array strand ("forward", "reverse_complement", or "both"). Defaults to None. Raises: ValueError: segment_num out of bounds Returns: tuple[int, int]: sba_start_index, sba_end_index """ # set sba_strand to either "forward" or "reverse_complement" based provided argument sba_strand = self._get_sba_strand_to_use(sba_strand) if sba_strand == "forward": sba_seg_starts = self._forward_sba_seg_starts sba = self.forward_sba elif sba_strand == "reverse_complement": sba_seg_starts = self._revcomp_sba_seg_starts sba = self.revcomp_sba # verify segment_num is allowed if segment_num < 0: raise ValueError(f"segment_num ({segment_num}) is out of bounds") elif segment_num >= len(sba_seg_starts): raise ValueError(f"segment_num ({segment_num}) is out of bounds") # get start_index sba_start_index = sba_seg_starts[segment_num] if segment_num == len(sba_seg_starts) - 1: sba_end_index = len(sba) - 1 else: sba_end_index = sba_seg_starts[segment_num + 1] - 2 return sba_start_index, sba_end_index
[docs] def generate_get_record_info_from_sba_index_func(self, one_based: bool = False) -> Callable: """ Generate a function that returns record info when provided a sequence byte array index Args: one_based (bool, optional): Should sequence indices by one based. Defaults to False. Raises: ValueError: sba_strand not recognized Returns: Callable: get_record_info_from_sba_index """ # NOTE: below, record_names is caste into a tuple to avoid the following numba error # Exception has occurred: NumbaNotImplementedError (note: full exception trace is shown but execution is paused at: <module>) # Failed in nopython mode pipeline (step: native lowering) # <numba.core.base.OverloadSelector object at 0x7f4e9cf54880>, (List(unicode_type, True),) sba_strand = self._get_sba_strand_to_use(self.strands_loaded()) # set record_names, sba_seg_starts, seq_strand if sba_strand == "forward": record_names = tuple(self.forward_record_names) sba_seg_starts = self._forward_sba_seg_starts seq_strand = "+" len_sba = len(self.forward_sba) elif sba_strand == "reverse_complement": record_names = tuple(self.revcomp_record_names) sba_seg_starts = self._revcomp_sba_seg_starts seq_strand = "-" len_sba = len(self.revcomp_sba) else: raise ValueError(f"sba_strand ({sba_strand}) not recognized") @jit def get_record_info_from_sba_index(sba_idx: int) -> tuple[int, int, str, str, int]: """ Given a sequence byte array index, return record info. Args: sba_idx (int): sequence byte array index Raises: ValueError: _description_ Returns: tuple[int, int, str, str, int]: seg_sba_start_idx, seg_sba_end_idx, seq_strand, seq_record_name, seq_start_idx """ # get the segment number that contains sba_idx seg_num = get_segment_num_from_sba_index(sba_idx, sba_strand, sba_seg_starts) # get the segment start and end indices in the sequence byte array seg_sba_start_idx, seg_sba_end_idx = get_sba_start_end_indices_for_segment( seg_num, sba_strand, sba_seg_starts, len_sba ) # get the sequence index (on the forward strand) seq_start_idx = get_forward_seq_idx( sba_idx, sba_strand, seg_sba_start_idx, seg_sba_end_idx, one_based=one_based ) # get record name seq_record_name = record_names[seg_num] return ( seg_num, seg_sba_start_idx, seg_sba_end_idx, seq_strand, seq_record_name, seq_start_idx, ) return get_record_info_from_sba_index
def __ne__(self, other): return not self.__eq__(other) def __eq__(self, other): # NOTE: _fasta_file_path is not required to match for SequenceCollection objects to be equal # forward_sba if self.forward_sba is None and other.forward_sba is not None: return False elif self.forward_sba is not None and other.forward_sba is None: return False elif not np.array_equal(self.forward_sba, other.forward_sba): return False # _forward_sba_seg_starts if self._forward_sba_seg_starts is None and other._forward_sba_seg_starts is not None: return False elif self._forward_sba_seg_starts is not None and other._forward_sba_seg_starts is None: return False elif not np.array_equal(self._forward_sba_seg_starts, other._forward_sba_seg_starts): return False # forward_record_names if self.forward_record_names is None and other.forward_record_names is not None: return False elif self.forward_record_names is not None and other.forward_record_names is None: return False elif self.forward_record_names != other.forward_record_names: return False # revcomp_sba if self.revcomp_sba is None and other.revcomp_sba is not None: return False elif self.revcomp_sba is not None and other.revcomp_sba is None: return False elif not np.array_equal(self.revcomp_sba, other.revcomp_sba): return False # _revcomp_sba_seg_starts if self._revcomp_sba_seg_starts is None and other._revcomp_sba_seg_starts is not None: return False elif self._revcomp_sba_seg_starts is not None and other._revcomp_sba_seg_starts is None: return False elif not np.array_equal(self._revcomp_sba_seg_starts, other._revcomp_sba_seg_starts): return False # revcomp_record_names if self.revcomp_record_names is None and other.revcomp_record_names is not None: return False elif self.revcomp_record_names is not None and other.revcomp_record_names is None: return False elif self.revcomp_record_names != other.revcomp_record_names: return False # _strands_loaded if self._strands_loaded is None and other._strands_loaded is not None: return False elif self._strands_loaded is not None and other._strands_loaded is None: return False elif self._strands_loaded != other._strands_loaded: return False # if this is reached, then they are equal return True @staticmethod def _set_for_export(value, value_if_none): """ Helper function that converts a None value into the appropriate indicator value if value equals value_if_none. This is used to help deal with the HDF5 format not handling null values. Args: value: value we want to pass to serialize with HDF5 value_if_none: what to pass to HDF5 instead of value if value is None Returns: value_if_none if value is None else value """ if value is None: return value_if_none else: return value @staticmethod def _correct_import(value, value_if_none): """ Helper function that converts a value read from HDF5 to None if it equal to the indicator value value_if_none. This is used to help deal with the HDF5 format not handling null values. Args: value: value we want to pass to serialize with HDF5 value_if_none: what to pass to HDF5 instead of value if value is None Returns: None if value == value_if_none else value """ if isinstance(value, np.ndarray): if value.shape == (0,): return None elif value == value_if_none: return None return value
[docs] def save(self, save_file_path: Path, mode: str = "a", format: str = "hdf5") -> None: """ Save SequenceCollection object to file. Args: save_file_path (Path): path for saved file format (str, optional): "hdf5" or "shelve". Defaults to "hdf5". mode (str, optional): mode with which to open file and save information. "w" for write or "a" for append. Defaults to "a". Raises: ValueError: format not recognized """ if format == "hdf5": self._save_hdf5(save_file_path, mode=mode) elif format == "shelve": self._save_shelve(save_file_path) else: raise ValueError(f"format ({format}) not recognized")
[docs] def load(self, load_file_path: Path, format: str = "hdf5"): """ Load SequenceCollection object from saved file. Args: load_file_path (Path): path to file to load. format (str, optional): "hdf5" or "shelve". Defaults to "hdf5". Raises: ValueError: format not recognized """ if format == "hdf5": self._load_h5py(load_file_path) elif format == "shelve": self._load_shelve(load_file_path) else: raise ValueError(f"format ({format}) not recognized")
def _save_hdf5(self, save_file_path: Path, mode: str = "a") -> None: """ Save SequenceCollection object information to HDF5 file format. Args: save_file_path (Path): path for saved file mode (str, optional): mode with which to open file and save information. "w" for write or "a" for append. Defaults to "a". """ with h5py.File(save_file_path, mode) as file: grp = file.create_group("seq_coll") # hdf5 does not accept None values. Correct them before exporting. forward_sba = self._set_for_export(self.forward_sba, np.array([], dtype=np.uint8)) _forward_sba_seg_starts = self._set_for_export(self._forward_sba_seg_starts, []) forward_record_names = self._set_for_export(self.forward_record_names, []) revcomp_sba = self._set_for_export(self.revcomp_sba, np.array([], dtype=np.uint8)) _revcomp_sba_seg_starts = self._set_for_export(self._revcomp_sba_seg_starts, []) revcomp_record_names = self._set_for_export(self.revcomp_record_names, []) _strands_loaded = self._set_for_export(self._strands_loaded, "") _fasta_file_path = self._set_for_export(self._fasta_file_path, "") # save them to file grp["forward_sba"] = forward_sba grp["_forward_sba_seg_starts"] = _forward_sba_seg_starts grp["forward_record_names"] = forward_record_names grp["revcomp_sba"] = revcomp_sba grp["_revcomp_sba_seg_starts"] = _revcomp_sba_seg_starts grp["revcomp_record_names"] = revcomp_record_names grp["_strands_loaded"] = _strands_loaded grp["_fasta_file_path"] = str(_fasta_file_path) # do not save the members set by _initialize_mapping_arrays() return def _load_h5py(self, load_file_path: Path): """ Load SequenceCollection object information from HDF5 file format. Args: load_file_path (Path): path to file to load. """ with h5py.File(load_file_path, "r") as file: grp = file["seq_coll"] empty_sba = np.array([], dtype=np.uint8) # read values from file self.forward_sba = self._correct_import(grp["forward_sba"][:], empty_sba) self._forward_sba_seg_starts = self._correct_import( grp["_forward_sba_seg_starts"][:], [] ) self.forward_record_names = [val.decode("utf-8") for val in grp["forward_record_names"]] self.forward_record_names = self._correct_import(self.forward_record_names, []) self.revcomp_sba = self._correct_import(grp["revcomp_sba"][:], empty_sba) self._revcomp_sba_seg_starts = self._correct_import( grp["_revcomp_sba_seg_starts"][:], [] ) self.revcomp_record_names = [val.decode("utf-8") for val in grp["revcomp_record_names"]] self.revcomp_record_names = self._correct_import(self.revcomp_record_names, []) self._strands_loaded = self._correct_import( grp["_strands_loaded"][()].decode("utf-8"), "" ) self._fasta_file_path = self._correct_import( grp["_fasta_file_path"][()].decode("utf-8"), "" ) if self._fasta_file_path is not None: self._fasta_file_path = Path(self._fasta_file_path) # initialize mapping arrays self._initialize_mapping_arrays() return def _save_shelve(self, save_file_path: Path) -> None: """ Save SequenceCollection object information to shelve file format. Args: save_file_path (Path): path for saved file """ with shelve.open(save_file_path, protocol=pickle.DEFAULT_PROTOCOL) as db: db["seq_coll.forward_sba"] = self.forward_sba db["seq_coll._forward_sba_seg_starts"] = self._forward_sba_seg_starts db["seq_coll.forward_record_names"] = self.forward_record_names db["seq_coll.revcomp_sba"] = self.revcomp_sba db["seq_coll._revcomp_sba_seg_starts"] = self._revcomp_sba_seg_starts db["seq_coll.revcomp_record_names"] = self.revcomp_record_names db["seq_coll._strands_loaded"] = self._strands_loaded db["seq_coll._fasta_file_path"] = self._fasta_file_path # do not save the members set by _initialize_mapping_arrays() return def _load_shelve(self, load_file_path: Path): """ Load SequenceCollection object information from shelve file format. Args: load_file_path (Path): path to file to load. """ with shelve.open(load_file_path) as db: self.forward_sba = db["seq_coll.forward_sba"] self._forward_sba_seg_starts = db["seq_coll._forward_sba_seg_starts"] self.forward_record_names = db["seq_coll.forward_record_names"] self.revcomp_sba = db["seq_coll.revcomp_sba"] self._revcomp_sba_seg_starts = db["seq_coll._revcomp_sba_seg_starts"] self.revcomp_record_names = db["seq_coll.revcomp_record_names"] self._strands_loaded = db["seq_coll._strands_loaded"] self._fasta_file_path = db["seq_coll._fasta_file_path"] self._initialize_mapping_arrays() return