from zlib import adler32
import time
from dataclasses import dataclass
from typing import Optional, Union
import urllib.request
import requests
import humanize
import sys
import gzip
from Bio import SeqIO
import re
import progressbar
import h5py
import pandas as pd
import asyncio
import httpx
from appdirs import user_data_dir
from bs4 import BeautifulSoup
from pathlib import Path

from . import tensor


def open_fasta(fasta_path, use_gzip):
    """Opens a FASTA file in text mode, decompressing it if use_gzip is set."""
    if use_gzip:
        return gzip.open(fasta_path, "rt")
    return open(fasta_path, "rt")


REFSEQ_CATEGORIES = [
    "archaea",  # prokaryotic
    "bacteria",  # prokaryotic
    "fungi",  # eukaryotic
    "invertebrate",  # eukaryotic
    "mitochondrion",  # organellar
    "plant",  # eukaryotic
    "plasmid",
    "plastid",  # organellar
    "protozoa",  # eukaryotic
    "vertebrate_mammalian",  # eukaryotic
    "vertebrate_other",  # eukaryotic
    "viral",  # viral
]

PROKARYOTIC = (
    "archaea",
    "bacteria",
)
EUKARYOTIC = (
    "fungi",
    "invertebrate",
    "plant",
    "protozoa",
    "vertebrate_mammalian",
    "vertebrate_other",
)
ORGANELLAR = (
    "mitochondrion",
    "plastid",
)


def root_dir():
    """
    Returns the path to the root directory of this project.

    This is useful for finding the data directory.
    """
    return Path(__file__).parent.resolve()


def global_data_dir():
    """Returns the path to the directory that holds all the downloaded data."""
    return root_dir() / "data"


def filesize_readable(path: Union[str, Path]) -> str:
    """Returns the size of the file at this path as a human-readable string."""
    path = Path(path)
    if not path.exists():
        return f"File {path} does not exist."
    return humanize.naturalsize(path.stat().st_size)


@dataclass
class RefSeqCategory:
    name: str
    max_files: Optional[int] = None
    base_dir: Path = global_data_dir()

    def __getstate__(self):
        # Only return the required elements.
        # Needed because h5py file handles cannot be pickled.
        return dict(name=self.name, max_files=self.max_files, base_dir=self.base_dir)

    def base_url(self):
        return f"https://ftp.ncbi.nlm.nih.gov/refseq/release/{self.name}"

    def filename(self, index) -> str:
        """The filename in the RefSeq database for this index."""
        if self.max_files:
            assert index < self.max_files
        return f"{self.name}.{index+1}.1.genomic.fna.gz"

    def fasta_url(self, index: int) -> str:
        """The URL for the fasta file for this index online."""
        return f"{self.base_url()}/{self.filename(index)}"
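
    # For example, following the release layout assumed by base_url and
    # filename above, RefSeqCategory("viral").fasta_url(0) resolves to:
    #   https://ftp.ncbi.nlm.nih.gov/refseq/release/viral/viral.1.1.genomic.fna.gz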

    def index_html(self):
        """Returns the parsed HTML index page of this category's release directory, downloading it on first use."""
        index_path = Path(user_data_dir()) / f"{self.name}.index.html"
        if not index_path.exists():
            url = self.base_url()
            print("Downloading:", url)
            urllib.request.urlretrieve(url, index_path)
        with open(index_path, "r") as f:
            contents = f.read()
        soup = BeautifulSoup(contents, "html.parser")
        return soup

    async def download(self, index: int):
        local_path = self.fasta_path(index, download=False)

        if not local_path.exists():
            limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
            async with httpx.AsyncClient(limits=limits) as client:
                url = self.fasta_url(index)
                print(f"downloading {url}")
                response = await client.get(url)
                with open(local_path, "wb") as f:
                    f.write(response.content)
                print(f"done {local_path}")
        return local_path

    async def download_all_async(self):
        tasks = []
        max_files = self.max_files_available()
        if self.max_files:
            max_files = min(max_files, self.max_files)
        print(f"max_files = {max_files}")
        for index in range(max_files):
            tasks.append(self.download(index))
        await asyncio.gather(*tasks)
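
    # A minimal usage sketch from synchronous code (the category name and
    # max_files value here are illustrative assumptions):
    #
    #   viral = RefSeqCategory("viral", max_files=3)
    #   asyncio.run(viral.download_all_async())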

    def download_all(self):
        max_files = self.max_files_available()
        if self.max_files:
            max_files = min(max_files, self.max_files)
        print(f"max_files = {max_files}")
        for index in range(max_files):
            print(f"{self.name} - {index}")
            try:
                self.fasta_path(index, download=True)
            except Exception:
                print(f"Failed to download {self.name} file {index}.")

    def fasta_path(self, index: int, download=True) -> Path:
        """
        Returns the local path for the file at this index.

        If the file does not already exist in the base_dir then it is downloaded.
        """
        local_path = Path(self.base_dir) / self.name / self.filename(index)
        local_path.parent.mkdir(exist_ok=True, parents=True)
        if download and not local_path.exists():
            url = self.fasta_url(index)
            print("Downloading:", url)
            urllib.request.urlretrieve(url, local_path)
        return local_path

    def h5_path(self):
        base_dir = Path(self.base_dir)
        base_dir.mkdir(exist_ok=True, parents=True)
        return base_dir / f"{self.name}.h5"

    def fasta_seq_count(self, index: int) -> int:
        """Counts the sequences in the gzipped fasta file at this index."""
        fasta_path = self.fasta_path(index)
        seq_count = 0
        with gzip.open(fasta_path, "rt") as fasta:
            for line in fasta:
                if line.startswith(">"):
                    seq_count += 1
        return seq_count

    def get_seq(self, accession):
        if not hasattr(self, "read_h5"):
            self.read_h5 = h5py.File(self.h5_path(), "r")
        try:
            return self.read_h5[self.dataset_key(accession)]
        except Exception as err:
            raise Exception(f"Failed to read {accession} in {self.name}") from err

    def dataset_key(self, accession):
        # Use adler32 as a fast deterministic hash to shard accessions
        # across two levels of HDF5 groups.
        accession_hash = str(adler32(accession.encode("ascii")))
        return f"/{accession_hash[-6:-3]}/{accession_hash[-3:]}/{accession}"
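
    # Worked example: adler32(b"NC_000913.3") is 273875583, so the key becomes
    # "/875/583/NC_000913.3" -- the last six digits of the decimal hash split
    # into two three-digit group names, followed by the accession itself.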

    def max_files_available(self):
        """Scrapes this category's index page to find the highest numbered fasta file in the release."""
        max_files = 0
        soup = self.index_html()
        for link in soup.find_all("a"):
            m = re.match(self.name + r"\.(\d+)\.1\.genomic\.fna\.gz", link.get("href"))
            if m:
                max_files = max(int(m.group(1)), max_files)
        return max_files

    def get_accessions(self):
        accessions = set()

        with h5py.File(self.h5_path(), "a") as h5:
            for key0 in h5.keys():
                for key1 in h5[f"/{key0}"].keys():
                    accessions.update(h5[f"/{key0}/{key1}"].keys())

        return accessions

    def write_h5(self, show_bar=True, file_indexes=None):
        result = []
        if not sys.stdout.isatty():
            show_bar = False

        if file_indexes is None or len(file_indexes) == 0:
            max_files = self.max_files_available()
            if self.max_files:
                max_files = min(max_files, self.max_files)
            file_indexes = range(max_files)

        accessions = self.get_accessions()
        print(f"{len(accessions)} sequences already in HDF5 file for {self.name}.")
        with h5py.File(self.h5_path(), "a") as h5:
            for file_index in file_indexes:
                print(f"Preprocessing file {file_index} from {self.name}", flush=True)
                # Try to get the next file
                try:
                    fasta_path = self.fasta_path(file_index)
                    seq_count = self.fasta_seq_count(file_index)
                except Exception:
                    # If it fails, assume the file doesn't exist and skip it
                    print(f"Fasta file at index {file_index} for {self.name} not found.")
                    continue

                result.extend(self.import_fasta(fasta_path, h5, show_bar=show_bar))
                print()

        df = pd.DataFrame(result)
        return df
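
    # A minimal usage sketch (the category name and max_files value are
    # illustrative assumptions; files are fetched on demand by fasta_path):
    #
    #   viral = RefSeqCategory("viral", max_files=3)
    #   df = viral.write_h5()  # DataFrame of accessions imported in this run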

    def h5_filesize(self) -> str:
        return filesize_readable(self.h5_path())

    def fasta_filesize(self, index) -> str:
        return filesize_readable(self.fasta_path(index))

    def total_fasta_filesize(self) -> str:
        # Note: assumes max_files is set for this category.
        size = sum(self.fasta_path(i).stat().st_size for i in range(self.max_files))
        return humanize.naturalsize(size)

    def total_fasta_filesize_server_bytes(self) -> int:
        size = 0
        for index in range(self.max_files):
            url = self.fasta_url(index)
            response = requests.head(url, allow_redirects=True)
            size += int(response.headers.get("content-length", 0))
        return size

    def total_fasta_filesize_server(self) -> str:
        return humanize.naturalsize(self.total_fasta_filesize_server_bytes())

    def add_ncbi_datasets(self, accessions, base_dir):
        results = []
        all_accessions = self.get_accessions()

        with h5py.File(self.h5_path(), "a") as h5:
            for accession in accessions:
                path = base_dir / "ncbi_dataset/data" / accession
                for fasta_path in path.glob("*.fna"):
                    print(fasta_path)
                    results.extend(
                        self.import_fasta(fasta_path, h5, show_bar=True, accessions=all_accessions, use_gzip=False)
                    )

        df = pd.DataFrame(results)
        return df

    def add_individual_accessions(self, accessions, email=None):
        results = []
        all_accessions = self.get_accessions()

        with h5py.File(self.h5_path(), "a") as h5:
            for accession in accessions:
                fasta_path = self.individual_accession_path(accession, email=email)
                if fasta_path:
                    results.extend(self.import_fasta(fasta_path, h5, show_bar=False, accessions=all_accessions))

        df = pd.DataFrame(results)
        return df

    def individual_accession_path(self, accession: str, download: bool = True, email=None) -> Optional[Path]:
        local_path = Path(self.base_dir) / self.name / "individual" / f"{accession}.fa.gz"
        local_path.parent.mkdir(exist_ok=True, parents=True)
        if download and not local_path.exists():
            from Bio import Entrez

            if email:
                Entrez.email = email
            else:
                raise Exception("no email provided")

            print(f"Trying to download '{accession}'")
            try:
                print("trying nucleotide database")
                net_handle = Entrez.efetch(db="nucleotide", id=accession, rettype="fasta", retmode="text")
            except Exception as err:
                print(f"failed {err}")
                print("trying genome database")
                time.sleep(3)
                try:
                    net_handle = Entrez.efetch(db="genome", id=accession, rettype="fasta", retmode="text")
                except Exception as err:
                    print(f"failed {err}")
                    print("trying nuccore database")
                    try:
                        net_handle = Entrez.efetch(db="nuccore", id=accession, rettype="fasta", retmode="text")
                    except Exception as err:
                        print(f"failed {err}")
                        return None

            with gzip.open(local_path, "wt") as f:
                f.write(net_handle.read())
            net_handle.close()
        return local_path

    def import_fasta(self, fasta_path: Path, h5, show_bar: bool = True, accessions=None, use_gzip=True):
        accessions = accessions or self.get_accessions()

        # First pass: count the sequences so the progress bar has a maximum.
        seq_count = 0
        with open_fasta(fasta_path, use_gzip) as fasta:
            for line in fasta:
                if line.startswith(">"):
                    seq_count += 1

        is_mitochondria = self.name.lower().startswith("mitochondr")
        is_plastid = self.name.lower().startswith("plastid")

        with open_fasta(fasta_path, use_gzip) as fasta:
            result = []
            show_bar = show_bar and seq_count > 0
            if show_bar:
                bar = progressbar.ProgressBar(max_value=seq_count - 1)
            seqs = SeqIO.parse(fasta, "fasta")
            for i, seq in enumerate(seqs):
                dataset_key = self.dataset_key(seq.name)
                description = seq.description.lower()

                # Skip mitochondrial sequences unless this is the mitochondrion category.
                if not is_mitochondria and "mitochondr" in description:
                    continue

                # Skip plastid sequences unless this is the plastid category.
                if not is_plastid and (
                    "plastid" in description
                    or "chloroplast" in description
                    or "apicoplast" in description
                    or "kinetoplast" in description
                ):
                    continue

                if not is_plastid and not is_mitochondria and "organelle" in description:
                    continue

                # Check if we already have this dataset. If not, then add it.
                if seq.name not in accessions:
                    t = tensor.dna_seq_to_numpy(seq.seq)
                    h5.create_dataset(
                        dataset_key,
                        data=t,
                        dtype="u1",
                        compression="gzip",
                        compression_opts=9,
                    )
                    accessions.add(seq.name)

                result.append(dict(category=self.name, accession=seq.name))
                if show_bar and i % 20 == 0:
                    bar.update(i)
            if show_bar:
                bar.update(i)

        return result
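

if __name__ == "__main__":
    # A minimal end-to-end sketch, not part of the original module's public
    # interface. Assumptions: run with `python -m` so the relative `tensor`
    # import resolves, network access to the NCBI FTP site, and the "viral"
    # category limited to two release files for speed.
    category = RefSeqCategory("viral", max_files=2)
    df = category.write_h5()  # downloads the fasta files and packs them into HDF5
    print(df)
    accessions = category.get_accessions()
    if accessions:
        example = next(iter(accessions))
        print(example, category.get_seq(example)[:10])  # first ten encoded bases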