from zlib import adler32
import time
from dataclasses import dataclass
from typing import Optional, Union
import urllib.request
import requests
import humanize
import sys
import gzip
from Bio import SeqIO
import re
import progressbar
import h5py
import pandas as pd
import asyncio
import httpx
from appdirs import user_data_dir
from bs4 import BeautifulSoup

from pathlib import Path
from . import tensor


def open_fasta(fasta_path, use_gzip):
    """Opens a FASTA file in text mode, transparently handling gzip compression."""
    if use_gzip:
        return gzip.open(fasta_path, "rt")
    return open(fasta_path, "rt")
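
# Example usage (the path below is illustrative):
#   with open_fasta("data/archaea/archaea.1.1.genomic.fna.gz", use_gzip=True) as fasta:
#       first_header = fasta.readline()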


REFSEQ_CATEGORIES = [
    "archaea",  # prokaryotic
    "bacteria",  # prokaryotic
    "fungi",  # eukaryotic
    "invertebrate",  # eukaryotic
    "mitochondrion",  # organellar
    "plant",  # eukaryotic
    "plasmid",
    "plastid",  # organellar
    "protozoa",  # eukaryotic
    "vertebrate_mammalian",  # eukaryotic
    "vertebrate_other",  # eukaryotic
    "viral",  # viral
]

PROKARYOTIC = (
    "archaea",
    "bacteria",
)
EUKARYOTIC = (
    "fungi",
    "invertebrate",
    "plant",
    "protozoa",
    "vertebrate_mammalian",
    "vertebrate_other",
)
ORGANELLAR = (
    "mitochondrion",
    "plastid",
)
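
# PROKARYOTIC, EUKARYOTIC and ORGANELLAR group the categories listed in
# REFSEQ_CATEGORIES; "plasmid" and "viral" fall outside all three groupings.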


def root_dir():
    """
    Returns the path to the root directory of this project.

    This is useful for finding the data directory.
    """
    return Path(__file__).parent.resolve()


def global_data_dir():
    """Returns the path to the directory that holds all the downloaded data."""
    return root_dir() / "data"


def filesize_readable(path: Union[str, Path]) -> str:
    path = Path(path)
    if not path.exists():
        return f"File {path} does not exist."
    return humanize.naturalsize(path.stat().st_size)


@dataclass
class RefSeqCategory:
    name: str
    max_files: Optional[int] = None
    base_dir: Path = global_data_dir()

    def __getstate__(self):
        # Only return the required elements.
        # Needed because open h5py file handles cannot be pickled.
        return dict(name=self.name, max_files=self.max_files, base_dir=self.base_dir)


    def base_url(self):
        return f"https://ftp.ncbi.nlm.nih.gov/refseq/release/{self.name}"

    def filename(self, index) -> str:
        """The filename in the RefSeq database for this index."""
        if self.max_files:
            assert index < self.max_files

        return f"{self.name}.{index+1}.1.genomic.fna.gz"
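
    # e.g. index 0 for the "bacteria" category yields "bacteria.1.1.genomic.fna.gz"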


    def fasta_url(self, index: int) -> str:
        """The URL for the FASTA file at this index online."""
        return f"{self.base_url()}/{self.filename(index)}"


    def index_html(self):
        index_dir = Path(user_data_dir())
        index_dir.mkdir(exist_ok=True, parents=True)
        index_path = index_dir / f"{self.name}.index.html"
        if not index_path.exists():
            url = self.base_url()
            print("Downloading:", url)
            urllib.request.urlretrieve(url, index_path)
        with open(index_path, 'r') as f:
            contents = f.read()
            soup = BeautifulSoup(contents, 'html.parser')
        return soup


    async def download(self, index: int):
        local_path = self.fasta_path(index, download=False)

        if not local_path.exists():
            limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
            async with httpx.AsyncClient(limits=limits) as client:
                url = self.fasta_url(index)
                print(f"downloading {url}")
                response = await client.get(url)
                with open(local_path, 'wb') as f:
                    f.write(response.content)
                print(f"done {local_path}")
        return local_path


    async def download_all_async(self):
        tasks = []
        max_files = self.max_files_available()
        if self.max_files:
            max_files = min(max_files, self.max_files)
        print(f"max_files = {max_files}")
        for index in range(max_files):
            tasks.append(self.download(index))
        await asyncio.gather(*tasks)
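
    # Example (a sketch; the category name and file limit are illustrative):
    #   asyncio.run(RefSeqCategory(name="fungi", max_files=2).download_all_async())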


    def download_all(self):
        max_files = self.max_files_available()
        if self.max_files:
            max_files = min(max_files, self.max_files)
        print(f"max_files = {max_files}")
        for index in range(max_files):
            print(f"{self.name} - {index}")
            try:
                self.fasta_path(index, download=True)
            except Exception:
                print(f"Failed to download {self.name} file {index}.")


    def fasta_path(self, index: int, download=True) -> Path:
        """
        Returns the local path for the file at this index.

        If the file does not already exist in base_dir then it is downloaded.
        """
        local_path = Path(self.base_dir) / self.name / self.filename(index)
        local_path.parent.mkdir(exist_ok=True, parents=True)
        if download and not local_path.exists():
            url = self.fasta_url(index)
            print("Downloading:", url)
            urllib.request.urlretrieve(url, local_path)
        return local_path


    def h5_path(self):
        base_dir = Path(self.base_dir)
        base_dir.mkdir(exist_ok=True, parents=True)
        return base_dir / f"{self.name}.h5"


    def fasta_seq_count(self, index: int) -> int:
        """Counts the sequences in the gzipped FASTA file at this index."""
        fasta_path = self.fasta_path(index)
        seq_count = 0
        with gzip.open(fasta_path, "rt") as fasta:
            for line in fasta:
                if line.startswith(">"):
                    seq_count += 1
        return seq_count


    def get_seq(self, accession):
        # Lazily open the HDF5 file for reading and cache the handle
        if not hasattr(self, 'read_h5'):
            self.read_h5 = h5py.File(self.h5_path(), "r")
        try:
            return self.read_h5[self.dataset_key(accession)]
        except KeyError as err:
            raise KeyError(f"Failed to read {accession} in {self.name}") from err


    def dataset_key(self, accession):
        # Use adler32 as a fast deterministic hash
        accession_hash = str(adler32(accession.encode('ascii')))
        return f"/{accession_hash[-6:-3]}/{accession_hash[-3:]}/{accession}"
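
    # e.g. a hash string ending in "123456" produces the key
    # "/123/456/<accession>", sharding accessions across nested HDF5 groups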


    def max_files_available(self):
        max_files = 0
        soup = self.index_html()
        for link in soup.find_all("a"):
            m = re.match(self.name + r"\.(\d+)\.1\.genomic\.fna\.gz", link.get("href") or "")
            if m:
                max_files = max(int(m.group(1)), max_files)
        return max_files


    def get_accessions(self):
        accessions = set()

        with h5py.File(self.h5_path(), "a") as h5:
            for key0 in h5.keys():
                for key1 in h5[f"/{key0}"].keys():
                    dir_accessions = h5[f"/{key0}/{key1}"].keys()
                    accessions.update(dir_accessions)
        return accessions


    def write_h5(self, show_bar=True, file_indexes=None):
        result = []
        if not sys.stdout.isatty():
            show_bar = False

        if file_indexes is None or len(file_indexes) == 0:
            max_files = self.max_files_available()
            if self.max_files:
                max_files = min(max_files, self.max_files)

            file_indexes = range(max_files)

        accessions = self.get_accessions()
        print(f"{len(accessions)} sequences already in HDF5 file for {self.name}.")
        with h5py.File(self.h5_path(), "a") as h5:
            for file_index in file_indexes:
                print(f"Preprocessing file {file_index} from {self.name}", flush=True)
                # Try to get the next file
                try:
                    fasta_path = self.fasta_path(file_index)
                    seq_count = self.fasta_seq_count(file_index)
                except Exception:
                    # If it fails, assume the file doesn't exist and skip it
                    print(f"Fasta file at index {file_index} for {self.name} not found.")
                    continue

                result.extend(self.import_fasta(fasta_path, h5, show_bar=show_bar))
                print()

        df = pd.DataFrame(result)
        return df
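
    # The DataFrame returned by write_h5 has one row per imported sequence,
    # with "category" and "accession" columns (see import_fasta below).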


    def h5_filesize(self) -> str:
        return filesize_readable(self.h5_path())

    def fasta_filesize(self, index) -> str:
        return filesize_readable(self.fasta_path(index))

    def total_fasta_filesize(self) -> str:
        max_files = self.max_files or self.max_files_available()
        size = sum(self.fasta_path(i).stat().st_size for i in range(max_files))
        return humanize.naturalsize(size)


    def total_fasta_filesize_server_bytes(self) -> int:
        size = 0
        max_files = self.max_files or self.max_files_available()
        for index in range(max_files):
            url = self.fasta_url(index)
            response = requests.head(url, allow_redirects=True)
            size += int(response.headers.get("content-length", 0))
        return size

    def total_fasta_filesize_server(self) -> str:
        return humanize.naturalsize(self.total_fasta_filesize_server_bytes())


    def add_ncbi_datasets(self, accessions, base_dir):
        results = []
        all_accessions = self.get_accessions()

        with h5py.File(self.h5_path(), "a") as h5:
            for accession in accessions:
                path = base_dir / "ncbi_dataset/data" / accession
                for fasta_path in path.glob("*.fna"):
                    print(fasta_path)
                    results.extend(
                        self.import_fasta(fasta_path, h5, show_bar=True, accessions=all_accessions, use_gzip=False)
                    )

        df = pd.DataFrame(results)
        return df


    def add_individual_accessions(self, accessions, email=None):
        results = []
        all_accessions = self.get_accessions()

        with h5py.File(self.h5_path(), "a") as h5:
            for accession in accessions:
                fasta_path = self.individual_accession_path(accession, email=email)
                if fasta_path:
                    results.extend(self.import_fasta(fasta_path, h5, show_bar=False, accessions=all_accessions))

        df = pd.DataFrame(results)
        return df


    def individual_accession_path(self, accession: str, download: bool = True, email=None) -> Path:
        local_path = Path(self.base_dir) / self.name / "individual" / f"{accession}.fa.gz"
        local_path.parent.mkdir(exist_ok=True, parents=True)
        if download and not local_path.exists():
            from Bio import Entrez

            if email:
                Entrez.email = email
            else:
                raise Exception("no email provided")

            print(f"Trying to download '{accession}'")
            try:
                print("trying nucleotide database")
                net_handle = Entrez.efetch(db="nucleotide", id=accession, rettype="fasta", retmode="text")
            except Exception as err:
                print(f'failed {err}')
                print("trying genome database")
                time.sleep(3)
                try:
                    net_handle = Entrez.efetch(db="genome", id=accession, rettype="fasta", retmode="text")
                except Exception as err:
                    print(f'failed {err}')
                    print("trying nuccore database")
                    try:
                        net_handle = Entrez.efetch(db="nuccore", id=accession, rettype="fasta", retmode="text")
                    except Exception as err:
                        print(f'failed {err}')
                        return None

            with gzip.open(local_path, "wt") as f:
                f.write(net_handle.read())
            net_handle.close()
        return local_path
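
    # Example (illustrative accession; Entrez requires a contact email):
    #   category.individual_accession_path("NC_012920.1", email="you@example.com")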


    def import_fasta(self, fasta_path: Path, h5, show_bar: bool = True, accessions=None, use_gzip=True):
        if accessions is None:
            accessions = self.get_accessions()

        # First pass: count the sequences so the progress bar has a maximum
        seq_count = 0
        with open_fasta(fasta_path, use_gzip) as fasta:
            for line in fasta:
                if line.startswith(">"):
                    seq_count += 1

        is_mitochondria = self.name.lower().startswith("mitochondr")
        is_plastid = self.name.lower().startswith("plastid")

        with open_fasta(fasta_path, use_gzip) as fasta:
            result = []
            bar = progressbar.ProgressBar(max_value=seq_count) if show_bar else None
            seqs = SeqIO.parse(fasta, "fasta")
            for i, seq in enumerate(seqs):
                dataset_key = self.dataset_key(seq.name)
                description = seq.description.lower()

                # Skip organellar sequences unless this category is itself organellar
                if not is_mitochondria and "mitochondr" in description:
                    continue

                if not is_plastid and (
                    "plastid" in description
                    or "chloroplast" in description
                    or "apicoplast" in description
                    or "kinetoplast" in description
                ):
                    continue

                if not is_plastid and not is_mitochondria and "organelle" in description:
                    continue

                # Check if we already have this dataset. If not then add it.
                if seq.name not in accessions:
                    t = tensor.dna_seq_to_numpy(seq.seq)
                    h5.create_dataset(
                        dataset_key,
                        data=t,
                        dtype="u1",
                        compression="gzip",
                        compression_opts=9,
                    )
                    accessions.add(seq.name)

                result.append(dict(category=self.name, accession=seq.name))
                if bar is not None and i % 20 == 0:
                    bar.update(i)
            if bar is not None:
                bar.finish()

        return result
390 return result