Coverage for crunch/client/run.py: 99.28%
139 statements
coverage.py v6.5.0, created at 2023-10-01 13:43 +0000
from typing import Dict, Union
from datetime import datetime
import requests
from functools import cached_property
from pathlib import Path
import traceback
import subprocess
import json
import shutil
from django.core.files.storage import DefaultStorage

from crunch.django.app.enums import Stage, State
from crunch.django.app import storages

from rich.console import Console

console = Console()

from . import utils
from .connections import Connection
from .enums import WorkflowType, RunResult

STAGE_STYLE = "bold red"


class Run():
    """
    An object to manage processing a crunch dataset.
    """
    def __init__(
        self,
        connection: Connection,
        dataset_slug: str,
        storage_settings: Union[Dict, Path],
        working_directory: Path,
        workflow_type: WorkflowType,
        workflow_path: Path = None,
        download_from_storage: bool = True,
        upload_to_storage: bool = True,
        cleanup: bool = False,
        cores: str = "1",
    ):
        self.connection = connection
        self.dataset_slug = dataset_slug

        if not self.dataset_slug:
            raise ValueError("Please specify dataset.")

        self.dataset_data = connection.get_json_response(f"/api/datasets/{dataset_slug}/")
        self.workflow_type = workflow_type
        self.workflow_path = workflow_path
        self.cores = cores
        self.storage_settings = storage_settings
        self.setup_md5_checksums = dict()
        self.download_from_storage = download_from_storage
        self.upload_to_storage = upload_to_storage
        self.cleanup = cleanup

        # TODO raise exception
        assert self.dataset_data["slug"] == dataset_slug
        self.project = self.dataset_data["parent"]
        self.dataset_id = self.dataset_data["id"]
        self.base_file_path = self.dataset_data["base_file_path"]

        my_working_directory = Path(working_directory, self.dataset_data["slug"].replace(":", "--"))
        my_working_directory.mkdir(exist_ok=True, parents=True)
        self.working_directory = my_working_directory

    def send_status(self, state, note: str = "") -> requests.Response:
        """ Sends a status update about the processing of this dataset. """
        return self.connection.send_status(
            self.dataset_id,
            stage=self.current_stage,
            state=state,
            note=note,
        )

    @cached_property
    def crunch_subdir(self) -> Path:
        """ Returns the path to the .crunch subdirectory in the working directory for this dataset. """
        crunch_subdir = self.working_directory / ".crunch"
        crunch_subdir.mkdir(exist_ok=True, parents=True)
        return crunch_subdir

    @cached_property
    def storage(self) -> DefaultStorage:
        """ Gets the default storage object. """
        return storages.get_storage_with_settings(self.storage_settings)

    def setup(self) -> RunResult:
        """
        Sets up this dataset for processing.

        This involves:

        - Copying the initial data from storage
        - Saving the MD5 checksums for all the initial data in ``.crunch/setup_md5_checksums.json``
        - Saving the metadata for the dataset in ``.crunch/dataset.json``
        - Saving the metadata for the project in ``.crunch/project.json``
        - Creating the script to run the workflow (either a bash script or a Snakefile for Snakemake)

        Returns:
            RunResult: Whether or not this stage was successful.
        """
        self.current_stage = Stage.SETUP
        console.print(f"Setup stage {self.dataset_slug}", style=STAGE_STYLE)
        try:
            self.send_status(State.START)

            # Pull data from storage
            if self.download_from_storage:
                storages.copy_recursive_from_storage(
                    self.base_file_path, self.working_directory, storage=self.storage
                )
            self.setup_md5_checksums = utils.md5_checksums(self.working_directory)
            with open(self.crunch_subdir / "setup_md5_checksums.json", "w", encoding="utf-8") as f:
                json.dump(self.setup_md5_checksums, f, ensure_ascii=False, indent=4)

            # TODO check to see if dataset.json already exists
            with open(self.crunch_subdir / "dataset.json", "w", encoding="utf-8") as f:
                json.dump(self.dataset_data, f, ensure_ascii=False, indent=4)

            # get project details
            project_data = self.connection.get_json_response(f"/api/projects/{self.project}/")
            # TODO raise exception
            assert project_data["slug"] == self.project

            # TODO check to see if project.json already exists
            with open(self.crunch_subdir / "project.json", "w", encoding="utf-8") as f:
                json.dump(project_data, f, ensure_ascii=False, indent=4)

            # get snakefile or script
            if not self.workflow_path:
                self.workflow_path = utils.write_workflow(
                    project_data["workflow"],
                    working_directory=self.crunch_subdir,
                    workflow_type=self.workflow_type,
                )

            self.send_status(State.SUCCESS)
            console.print(f"Setup success {self.dataset_slug}", style=STAGE_STYLE)
        except Exception as e:
            console.print(f"Setup failed {self.dataset_slug}: {e}", style=STAGE_STYLE)
            self.send_status(State.FAIL, note=str(e))
            return RunResult.FAIL

        return RunResult.SUCCESS
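
    # After a successful ``setup`` the working directory contains a ``.crunch``
    # subdirectory with (names taken from the code above; the workflow file name
    # depends on ``utils.write_workflow``):
    #
    #   .crunch/setup_md5_checksums.json   MD5 checksums of the initial files
    #   .crunch/dataset.json               metadata for this dataset
    #   .crunch/project.json               metadata for the parent project
    #   .crunch/<workflow file>            bash script or Snakefile, unless workflow_path was supplied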

    def workflow(self) -> RunResult:
        """
        Runs the workflow on a dataset that has been set up.

        This involves running a bash script as a subprocess or running Snakemake with a Snakefile.

        Returns:
            RunResult: Whether or not this stage was successful.
        """
        self.current_stage = Stage.WORKFLOW
        console.print(f"Workflow stage {self.dataset_slug}", style=STAGE_STYLE)
        try:
            self.send_status(State.START)
            if self.workflow_type == WorkflowType.snakemake:
                import snakemake

                args = [
                    f"--snakefile={self.workflow_path}",
                    "--use-conda",
                    f"--cores={self.cores}",
                    f"--directory={self.working_directory}",
                    f"--conda-frontend={utils.conda_frontend()}"
                ]

                try:
                    # snakemake.main exits via SystemExit; catch it and report the exit code
                    snakemake.main(args)
                except SystemExit as result:
                    print(f"result {result}")
            elif self.workflow_type == WorkflowType.script:
                result = subprocess.run(f"{self.workflow_path.resolve()}", capture_output=True, cwd=self.working_directory)
                if result.returncode:
                    raise ChildProcessError(result.stderr.decode("utf-8"))

            self.send_status(State.SUCCESS)
            console.print(f"Workflow success {self.dataset_slug}", style=STAGE_STYLE)
        except Exception as e:
            console.print(f"Workflow failed {self.dataset_slug}: {e}", style=STAGE_STYLE)
            self.send_status(State.FAIL, note=str(e))
            return RunResult.FAIL

        return RunResult.SUCCESS
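
    # For reference: ``snakemake.main(args)`` above parses ``args`` exactly as the
    # Snakemake command line would, so the call corresponds to an invocation like
    # (illustrative values only):
    #
    #   snakemake --snakefile=.crunch/<workflow file> --use-conda --cores=1 \
    #       --directory=<working directory> --conda-frontend=<conda or mamba>
    #
    # where the frontend is whatever ``utils.conda_frontend()`` reports.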

    def upload(self) -> RunResult:
        """
        Uploads new or modified files to the storage for the dataset.

        It also creates the following files:

        - ``.crunch/upload_md5_checksums.json``, which lists the MD5 checksums of all files after the workflow has finished.
        - ``.crunch/deleted.txt``, which lists all files that were present after setup but were deleted as the workflow ran.

        Returns:
            RunResult: Whether or not this stage was successful.
        """
        self.current_stage = Stage.UPLOAD
        console.print(f"Upload stage {self.dataset_slug}", style=STAGE_STYLE)
        try:
            self.send_status(State.START)

            if self.upload_to_storage:
                # calculate checksums
                self.upload_md5_checksums = utils.md5_checksums(self.working_directory)
                upload_md5_checksums_path = self.crunch_subdir / "upload_md5_checksums.json"
                with open(upload_md5_checksums_path, "w", encoding="utf-8") as f:
                    json.dump(self.upload_md5_checksums, f, ensure_ascii=False, indent=4)

                # diff the post-workflow checksums against the setup checksums
                # (a worked example follows this method)
                setup_files = set(self.setup_md5_checksums.keys())
                upload_files = set(self.upload_md5_checksums.keys())
                new_files = upload_files - setup_files
                deleted_files = setup_files - upload_files
                remaining_files = setup_files.intersection(upload_files)
                modified_files = set(
                    file for file in remaining_files
                    if self.upload_md5_checksums[file] != self.setup_md5_checksums[file]
                )
                deleted_log_path = self.crunch_subdir / "deleted.txt"
                deleted_log_path.write_text("\n".join(deleted_files))

                files_to_upload = modified_files | new_files | set([upload_md5_checksums_path, deleted_log_path])
                paths_to_upload = [self.working_directory / file for file in files_to_upload]

                storages.copy_to_storage(
                    paths_to_upload,
                    local_dir=self.working_directory,
                    base=self.base_file_path,
                    storage=self.storage,
                )

            # Option to delete on remote storage?
            if self.cleanup:
                shutil.rmtree(self.working_directory)

            self.send_status(State.SUCCESS)
            console.print(f"Upload success {self.dataset_slug}", style=STAGE_STYLE)
        except Exception as e:
            console.print(f"Upload failed {self.dataset_slug}: {e}", style=STAGE_STYLE)
            traceback.print_exc()
            self.send_status(State.FAIL, note=str(e))
            return RunResult.FAIL

        return RunResult.SUCCESS
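
    # Illustrative worked example of the checksum diff in ``upload`` above
    # (hypothetical values, not part of the original module):
    #
    #   setup_md5_checksums  = {"a.txt": "111", "b.txt": "222", "c.txt": "333"}
    #   upload_md5_checksums = {"a.txt": "111", "b.txt": "999", "d.txt": "444"}
    #
    #   new_files       = upload - setup     -> {"d.txt"}
    #   deleted_files   = setup - upload     -> {"c.txt"}
    #   remaining_files = intersection       -> {"a.txt", "b.txt"}
    #   modified_files  = checksum changed   -> {"b.txt"}
    #
    # Only b.txt and d.txt are uploaded, along with upload_md5_checksums.json and
    # deleted.txt; c.txt is recorded in deleted.txt but not removed from storage.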

    def __call__(self) -> RunResult:
        console.print(f"Processing '{self.dataset_slug}'.")
        self.setup_result = self.setup()
        if self.setup_result:
            # a truthy result means the stage did not succeed, so stop here
            return self.setup_result

        self.workflow_result = self.workflow()
        if self.workflow_result:
            return self.workflow_result

        self.upload_result = self.upload()
        return self.upload_result
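

# ---------------------------------------------------------------------------
# Hypothetical usage sketch: this block is not part of run.py as measured in the
# report above. It assumes ``Connection()`` can be constructed with default
# arguments (its real constructor lives in .connections and is not shown here);
# the slug, settings and paths below are placeholders only.
if __name__ == "__main__":
    run = Run(
        connection=Connection(),                    # assumed default construction
        dataset_slug="example-project:dataset-1",   # placeholder dataset slug
        storage_settings={},                        # placeholder; real keys depend on storages.get_storage_with_settings
        working_directory=Path("/tmp/crunch-working"),
        workflow_type=WorkflowType.snakemake,
        cores="4",
    )
    # Runs setup, workflow and upload in order, stopping at the first stage that fails.
    result = run()
    console.print(f"Finished with result: {result}")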