Coverage for seqbank/io.py: 100.00%

61 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-12-02 04:29 +0000

1from pathlib import Path 

2import gzip 

3import bz2 

4from Bio import SeqIO 

5import requests 

6import tempfile 

7 

8 

9def open_path(path: Path | str): 

10 """ 

11 Opens a file, optionally decompressing it based on its extension. 

12 

13 Args: 

14 path (Path | str): The path to the file, which can be a compressed (.gz, .bz2) or uncompressed file. 

15 

16 Returns: 

17 file object: A file object opened for reading. 

18 

19 Raises: 

20 ValueError: If the file has an unsupported compression extension. 

21 """ 

22 path = Path(path) 

23 suffix = path.suffix.lower() 

24 

25 if suffix == ".gz": 

26 return gzip.open(path, "rt") 

27 

28 if suffix == ".bz2": 

29 return bz2.open(path, "rt") 

30 

31 return open(path, "rt") 

32 

33 

34def get_file_format(path: Path | str) -> str: 

35 """ 

36 Determines the sequence file format based on the file extension. 

37 

38 Supported file formats and their extensions: 

39 - FASTA: .fa, .fna, .fasta 

40 - GenBank: .genbank, .gb, .gbk 

41 - EMBL: .embl 

42 - Tab-delimited: .tab, .tsv 

43 - Nexus: .nexus, .nxs 

44 - FASTQ: .fastq, .fq 

45 

46 Args: 

47 path (Path | str): The path to the file. 

48 

49 Returns: 

50 str: The detected file format (e.g., 'fasta', 'genbank', 'embl', etc.). 

51 

52 Raises: 

53 ValueError: If the file format cannot be determined. 

54 """ 

55 path = Path(path) 

56 suffix = path.suffix.lower() 

57 

58 if suffix in [".gz", ".bz2"]: 

59 suffix = path.suffixes[-2].lower() 

60 

61 if suffix in [".fa", ".fna", ".fasta"]: 

62 return "fasta" 

63 

64 if suffix in [".genbank", ".gb", ".gbk"]: 

65 return "genbank" 

66 

67 if suffix in [".embl"]: 

68 return "embl" 

69 

70 if suffix in [".tab", ".tsv"]: 

71 return "tab" 

72 

73 if suffix in [".nexus", ".nxs"]: 

74 return "nexus" 

75 

76 if suffix in [".fastq", ".fq"]: 

77 return "fastq" 

78 

79 raise ValueError(f"Cannot determine file format of {path}.") 

80 

81 

def seq_count(path: Path | str) -> int:
    """
    Counts the sequence records contained in a file.

    The format is detected from the file extension with ``get_file_format`` and
    the file is opened (decompressing if needed) with ``open_path``; records are
    then iterated with ``SeqIO.parse`` and tallied one by one.

    Args:
        path (Path | str): The path to the file.

    Returns:
        int: The total number of sequences in the file.
    """
    detected_format = get_file_format(path)

    n_records = 0
    with open_path(path) as handle:
        # Stream the records; only the count is kept, never the records themselves.
        for _record in SeqIO.parse(handle, detected_format):
            n_records += 1

    return n_records

96 

97 

def download_file(
    url: str,
    local_path: Path,
    timeout: float | tuple[float, float] | None = 60.0,
) -> Path:
    """
    Downloads a file from a URL and saves it to a specified local path.

    The response is streamed to disk in 8 KiB chunks so the whole payload is
    never held in memory.

    Args:
        url (str): The URL of the file to download.
        local_path (Path): The path where the file should be saved.
        timeout (float | tuple[float, float] | None): Passed to ``requests.get``.
            Seconds to wait for the server to connect/send data before giving up
            (or a ``(connect, read)`` tuple). ``None`` waits indefinitely.
            Defaults to 60 seconds.

    Returns:
        Path: The path to the downloaded file.

    Raises:
        requests.exceptions.HTTPError: If the server responds with an error status.
        requests.exceptions.Timeout: If the server does not respond within ``timeout``.
    """
    # Without a timeout, requests will block forever on an unresponsive server.
    with requests.get(url, stream=True, timeout=timeout) as response:
        response.raise_for_status()
        with open(local_path, "wb") as out_file:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:  # skip empty keep-alive chunks
                    out_file.write(chunk)
    return local_path

118 

119 

120class TemporaryDirectory(tempfile.TemporaryDirectory): 

121 """ 

122 A custom TemporaryDirectory class that can optionally create a parent directory if a prefix is provided. 

123 

124 Args: 

125 prefix (str | Path | None): Optional prefix for the temporary directory. 

126 """ 

127 

128 def __init__(self, prefix: str | Path | None = None, *args, **kwargs): 

129 self._created_dirs = [] # Track directories that were created 

130 if isinstance(prefix, Path): 

131 # Resolve path to string and ensure parent directories are created 

132 prefix.mkdir(parents=True, exist_ok=True) 

133 self._created_dirs.append(prefix) # Track created parent directories 

134 

135 # Ensure prefix ends with a "/" 

136 prefix = str(prefix.resolve().absolute()) 

137 if not prefix.endswith("/"): 

138 prefix += "/" 

139 

140 super().__init__(prefix=prefix, *args, **kwargs) 

141 

142 def cleanup(self): 

143 """ 

144 Cleans up the temporary directory and removes any created parent directories if they are empty. 

145 """ 

146 super().cleanup() 

147 for created_dir in self._created_dirs: 

148 if not any(created_dir.iterdir()): # Only remove if the directory is empty 

149 created_dir.rmdir() 

150 

151 def __enter__(self) -> Path: 

152 """ 

153 Enters the context of the temporary directory. 

154 

155 Returns: 

156 Path: The path to the temporary directory. 

157 """ 

158 return Path(super().__enter__())