Coverage for crunch/client/run.py: 99.28%

139 statements  

coverage.py v6.5.0, created at 2023-10-01 13:43 +0000

from typing import Dict, Union
from datetime import datetime
import requests
from functools import cached_property
from pathlib import Path
import traceback
import subprocess
import json
import shutil
from django.core.files.storage import DefaultStorage

from crunch.django.app.enums import Stage, State
from crunch.django.app import storages
from rich.console import Console

console = Console()

from . import utils
from .connections import Connection
from .enums import WorkflowType, RunResult

STAGE_STYLE = "bold red"

class Run():
    """
    An object to manage processing a crunch dataset.
    """
    def __init__(
        self,
        connection:Connection,
        dataset_slug:str,
        storage_settings:Union[Dict,Path],
        working_directory:Path,
        workflow_type:WorkflowType,
        workflow_path:Path=None,
        download_from_storage:bool=True,
        upload_to_storage:bool=True,
        cleanup:bool=False,
        cores:str="1",
    ):
        self.connection = connection
        self.dataset_slug = dataset_slug

        if not self.dataset_slug:
            raise ValueError("Please specify a dataset.")

        self.dataset_data = connection.get_json_response(f"/api/datasets/{dataset_slug}/")
        self.workflow_type = workflow_type
        self.workflow_path = workflow_path
        self.cores = cores
        self.storage_settings = storage_settings
        self.setup_md5_checksums = dict()
        self.download_from_storage = download_from_storage
        self.upload_to_storage = upload_to_storage
        self.cleanup = cleanup

        # TODO raise exception
        assert self.dataset_data["slug"] == dataset_slug
        self.project = self.dataset_data["parent"]
        self.dataset_id = self.dataset_data["id"]
        self.base_file_path = self.dataset_data["base_file_path"]

        my_working_directory = Path(working_directory, self.dataset_data["slug"].replace(":", "--"))
        my_working_directory.mkdir(exist_ok=True, parents=True)
        self.working_directory = my_working_directory
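
    # Note: the working directory created above is derived from the dataset slug, with any
    # colon replaced by "--". For example (the slug shown is hypothetical), a slug such as
    # "proj:dataset-1" would be processed in <working_directory>/proj--dataset-1.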

    def send_status(self, state, note:str="") -> requests.Response:
        """ Sends a status update about the processing of this dataset. """
        return self.connection.send_status(
            self.dataset_id,
            stage=self.current_stage,
            state=state,
            note=note,
        )

    @cached_property
    def crunch_subdir(self) -> Path:
        """ Returns the path to the .crunch subdirectory in the working directory for this dataset. """
        crunch_subdir = self.working_directory/".crunch"
        crunch_subdir.mkdir(exist_ok=True, parents=True)
        return crunch_subdir

    @cached_property
    def storage(self) -> DefaultStorage:
        """ Gets the default storage object. """
        return storages.get_storage_with_settings(self.storage_settings)

    def setup(self) -> RunResult:
        """
        Sets up this dataset for processing.

        This involves:

        - Copying the initial data from storage
        - Saving the MD5 checksums for all the initial data in ``.crunch/setup_md5_checksums.json``
        - Saving the metadata for the dataset in ``.crunch/dataset.json``
        - Saving the metadata for the project in ``.crunch/project.json``
        - Creating the script to run the workflow (either a bash script or a Snakefile for Snakemake)

        Returns:
            RunResult: Whether or not this stage was successful.
        """
        self.current_stage = Stage.SETUP
        console.print(f"Setup stage {self.dataset_slug}", style=STAGE_STYLE)
        try:
            self.send_status(State.START)

            # Pull data from storage
            if self.download_from_storage:
                storages.copy_recursive_from_storage(
                    self.base_file_path, self.working_directory, storage=self.storage
                )
            self.setup_md5_checksums = utils.md5_checksums(self.working_directory)
            with open(self.crunch_subdir / "setup_md5_checksums.json", "w", encoding="utf-8") as f:
                json.dump(self.setup_md5_checksums, f, ensure_ascii=False, indent=4)

            # TODO check to see if dataset.json already exists
            with open(self.crunch_subdir / "dataset.json", "w", encoding="utf-8") as f:
                json.dump(self.dataset_data, f, ensure_ascii=False, indent=4)

            # get project details
            project_data = self.connection.get_json_response(f"/api/projects/{self.project}/")
            # TODO raise exception
            assert project_data["slug"] == self.project

            # TODO check to see if project.json already exists
            with open(self.crunch_subdir / "project.json", "w", encoding="utf-8") as f:
                json.dump(project_data, f, ensure_ascii=False, indent=4)

            # get snakefile or script
            if not self.workflow_path:
                self.workflow_path = utils.write_workflow(
                    project_data["workflow"],
                    working_directory=self.crunch_subdir,
                    workflow_type=self.workflow_type,
                )

            self.send_status(State.SUCCESS)
            console.print(f"Setup success {self.dataset_slug}", style=STAGE_STYLE)
        except Exception as e:
            console.print(f"Setup failed {self.dataset_slug}: {e}", style=STAGE_STYLE)
            self.send_status(State.FAIL, note=str(e))
            return RunResult.FAIL

        return RunResult.SUCCESS
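
    # Rough sketch of what .crunch/setup_md5_checksums.json is assumed to look like:
    # relative file paths mapped to MD5 hex digests (the exact key format depends on
    # utils.md5_checksums). The file names below are purely illustrative:
    #
    #     {
    #         "data/reads.fastq.gz": "9e107d9d372bb6826bd81d3542a419d6",
    #         "metadata.csv": "d41d8cd98f00b204e9800998ecf8427e"
    #     }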

    def workflow(self) -> RunResult:
        """
        Runs the workflow on a dataset that has been set up.

        This involves running a bash script as a subprocess or running Snakemake with a Snakefile.

        Returns:
            RunResult: Whether or not this stage was successful.
        """
        self.current_stage = Stage.WORKFLOW
        console.print(f"Workflow stage {self.dataset_slug}", style=STAGE_STYLE)
        try:
            self.send_status(State.START)
            if self.workflow_type == WorkflowType.snakemake:
                import snakemake

                args = [
                    f"--snakefile={self.workflow_path}",
                    "--use-conda",
                    f"--cores={self.cores}",
                    f"--directory={self.working_directory}",
                    f"--conda-frontend={utils.conda_frontend()}"
                ]

                try:
                    snakemake.main(args)
                except SystemExit as result:
                    print(f"result {result}")
            elif self.workflow_type == WorkflowType.script:
                result = subprocess.run(f"{self.workflow_path.resolve()}", capture_output=True, cwd=self.working_directory)
                if result.returncode:
                    raise ChildProcessError(result.stderr.decode("utf-8"))

            self.send_status(State.SUCCESS)
            console.print(f"Workflow success {self.dataset_slug}", style=STAGE_STYLE)
        except Exception as e:
            console.print(f"Workflow failed {self.dataset_slug}: {e}", style=STAGE_STYLE)
            self.send_status(State.FAIL, note=str(e))
            return RunResult.FAIL

        return RunResult.SUCCESS
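
    # For reference, the Snakemake branch above is roughly equivalent to the following CLI
    # invocation (paths and the conda frontend are illustrative, not taken from a real run):
    #
    #     snakemake --snakefile=<working_dir>/.crunch/Snakefile --use-conda --cores=1 \
    #         --directory=<working_dir> --conda-frontend=mamba
    #
    # snakemake.main() exits via SystemExit rather than returning, which is why the call is
    # wrapped in its own try/except above.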

    def upload(self) -> RunResult:
        """
        Uploads new or modified files to the storage for the dataset.

        It also creates the following files:

        - ``.crunch/upload_md5_checksums.json`` which lists the MD5 checksums for all files after the workflow has finished.
        - ``.crunch/deleted.txt`` which lists all files that were present after setup but were deleted as the workflow ran.

        Returns:
            RunResult: Whether or not this stage was successful.
        """
        self.current_stage = Stage.UPLOAD
        console.print(f"Upload stage {self.dataset_slug}", style=STAGE_STYLE)
        try:
            self.send_status(State.START)

            if self.upload_to_storage:
                # calculate checksums
                self.upload_md5_checksums = utils.md5_checksums(self.working_directory)
                upload_md5_checksums_path = self.crunch_subdir / "upload_md5_checksums.json"
                with open(upload_md5_checksums_path, "w", encoding="utf-8") as f:
                    json.dump(self.upload_md5_checksums, f, ensure_ascii=False, indent=4)

                setup_files = set(self.setup_md5_checksums.keys())
                upload_files = set(self.upload_md5_checksums.keys())
                new_files = upload_files - setup_files
                deleted_files = setup_files - upload_files
                remaining_files = setup_files.intersection(upload_files)
                modified_files = set(
                    file for file in remaining_files
                    if self.upload_md5_checksums[file] != self.setup_md5_checksums[file]
                )
                deleted_log_path = self.crunch_subdir / "deleted.txt"
                deleted_log_path.write_text("\n".join(deleted_files))

                files_to_upload = modified_files | new_files | set([upload_md5_checksums_path, deleted_log_path])
                paths_to_upload = [self.working_directory/file for file in files_to_upload]

                storages.copy_to_storage(
                    paths_to_upload,
                    local_dir=self.working_directory,
                    base=self.base_file_path,
                    storage=self.storage,
                )

            # Option to delete on remote storage?
            if self.cleanup:
                shutil.rmtree(self.working_directory)

            self.send_status(State.SUCCESS)
            console.print(f"Upload success {self.dataset_slug}", style=STAGE_STYLE)
        except Exception as e:
            console.print(f"Upload failed {self.dataset_slug}: {e}", style=STAGE_STYLE)
            traceback.print_exc()
            self.send_status(State.FAIL, note=str(e))
            return RunResult.FAIL

        return RunResult.SUCCESS
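
    # Illustrative example of the set arithmetic above (file names are hypothetical):
    # if setup recorded {"a.txt", "b.txt", "c.txt"} and the finished working directory
    # contains {"a.txt", "b.txt", "d.txt"} with "b.txt" modified, then new_files is
    # {"d.txt"}, modified_files is {"b.txt"} and deleted_files is {"c.txt"}; "b.txt" and
    # "d.txt" are uploaded along with upload_md5_checksums.json and deleted.txt, while
    # "c.txt" is only recorded in deleted.txt.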

    def __call__(self) -> RunResult:
        console.print(f"Processing '{self.dataset_slug}'.")
        self.setup_result = self.setup()
        if self.setup_result:
            return self.setup_result

        self.workflow_result = self.workflow()
        if self.workflow_result:
            return self.workflow_result

        self.upload_result = self.upload()
        return self.upload_result
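
# A minimal usage sketch, assuming a Connection can be built from a base URL and token
# (see crunch.client.connections for the real signature); the slugs and paths below are
# hypothetical and this example is not part of the module above:
#
#     from pathlib import Path
#
#     from crunch.client.connections import Connection
#     from crunch.client.enums import WorkflowType
#     from crunch.client.run import Run
#
#     connection = Connection(base_url="https://crunch.example.org", token="...")
#     run = Run(
#         connection=connection,
#         dataset_slug="my-dataset",
#         storage_settings=Path("storage-settings.json"),
#         working_directory=Path("/tmp/crunch-working"),
#         workflow_type=WorkflowType.snakemake,
#     )
#     result = run()  # runs setup, workflow and upload; returns the first failing stage's
#                     # result, otherwise the result of the upload stage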