|
| 1 | +#!/usr/bin/env python3 |
| 2 | + |
| 3 | +""" |
| 4 | + Copyright (c) 2019, Ontario Institute for Cancer Research (OICR). |
| 5 | +
|
| 6 | + This program is free software: you can redistribute it and/or modify |
| 7 | + it under the terms of the GNU Affero General Public License as published |
| 8 | + by the Free Software Foundation, either version 3 of the License, or |
| 9 | + (at your option) any later version. |
| 10 | +
|
| 11 | + This program is distributed in the hope that it will be useful, |
| 12 | + but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 13 | + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 14 | + GNU Affero General Public License for more details. |
| 15 | +
|
| 16 | + You should have received a copy of the GNU Affero General Public License |
| 17 | + along with this program. If not, see <https://www.gnu.org/licenses/>. |
| 18 | +
|
| 19 | + Author: Junjun Zhang <junjun.zhang@oicr.on.ca> |
| 20 | + Linda Xiang <linda.xiang@oicr.on.ca> |
| 21 | + """ |
| 22 | + |
| 23 | +import os |
| 24 | +import sys |
| 25 | +import json |
| 26 | +from argparse import ArgumentParser |
| 27 | +import hashlib |
| 28 | +import uuid |
| 29 | +import subprocess |
| 30 | +import copy |
| 31 | +from datetime import date |
| 32 | + |
# Map short workflow name (as passed via --wf_name) to its human-readable
# display name; unknown names fall back to the short name itself (see main()).
workflow_full_name = {
    'dna-seq-alignment': 'DNA Seq Alignment'
}
| 36 | + |
def calculate_size(file_path):
    """Return the size of *file_path* in bytes."""
    return os.path.getsize(file_path)
| 39 | + |
| 40 | + |
def calculate_md5(file_path):
    """Return the hex MD5 digest of *file_path*, streaming in 1 MiB chunks."""
    digest = hashlib.md5()
    with open(file_path, 'rb') as fp:
        while True:
            chunk = fp.read(1024 * 1024)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()
| 47 | + |
| 48 | + |
def get_rg_count(aligned_file):
    """Count the read groups in a BAM/CRAM header.

    Runs `samtools view -H` and counts the '^ID:' fields of @RG header
    lines. Exits the whole process if the shell pipeline fails (which
    includes the case of a header with no @RG lines, since grep then
    returns a non-zero code).
    """
    cmd = "samtools view -H %s | grep '^@RG' | tr '\t' '\n' | grep '^ID:'" % aligned_file

    try:
        proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True, check=True)
    except subprocess.CalledProcessError as e:
        print("Execution of 'samtools view' returned non-zero code: %s.\nStderr: %s" % \
            (e.returncode, e.stderr), file=sys.stderr)
        sys.exit(e.returncode)
    except Exception as e:
        sys.exit("Error: %s. Unable to run 'samtools view' on aligned file: %s\n" % (e, aligned_file))

    # one output line per read group ID
    return len(proc.stdout.decode('utf-8').strip().split('\n'))
| 64 | + |
| 65 | + |
def rename_file(f, payload, rg_count, sample_info, date_str):
    """Symlink aligned file *f* into ./out under a standardized name.

    The new name is:
        <studyId>.<donorId>.<sampleId>.<experimental_strategy>.<date>.aln.<ext>

    Args:
        f: path to the aligned file (.bam / .bam.bai / .cram / .cram.crai)
        payload: analysis payload; supplies 'studyId' and
            experiment.experimental_strategy
        rg_count: read group count (currently unused; kept for interface
            stability)
        sample_info: list of sample dicts; the first entry supplies the
            donor and sample IDs
        date_str: date string (YYYYMMDD) embedded in the new name

    Returns:
        Absolute path of the created symlink.

    Exits the process if the file extension is not recognized.
    """
    experimental_strategy = payload['experiment']['experimental_strategy'].lower()

    # Table-driven extension detection; suffixes are unambiguous since
    # '.bam.bai'/'.cram.crai' never match the bare '.bam'/'.cram' tests.
    for suffix in ('bam.bai', 'cram.crai', 'bam', 'cram'):
        if f.endswith('.' + suffix):
            file_ext = suffix
            break
    else:
        # fixed typo: 'extention' -> 'extension'
        sys.exit('Error: unknown aligned seq extension: %s' % f)

    new_name = "%s.%s.%s.%s.%s.%s.%s" % (
        payload['studyId'],
        sample_info[0]['donor']['donorId'],
        sample_info[0]['sampleId'],
        experimental_strategy,
        date_str,
        'aln',
        file_ext
    )

    new_dir = 'out'
    os.makedirs(new_dir, exist_ok=True)  # idempotent; replaces try/except FileExistsError

    dst = os.path.join(os.getcwd(), new_dir, new_name)
    # Allow workflow reruns: refresh a stale link instead of crashing with
    # FileExistsError on os.symlink. lexists() also catches broken links.
    if os.path.lexists(dst):
        os.remove(dst)
    os.symlink(os.path.abspath(f), dst)

    return dst
| 100 | + |
| 101 | + |
def get_files_info(file_to_upload):
    """Build the metadata entry for one file to be uploaded.

    Classifies the file by its last extension segment: 'bam'/'cram' are
    'Aligned Reads', anything else (e.g. 'bai'/'crai') is the index.
    """
    ext = file_to_upload.split(".")[-1]
    file_entry = {
        'fileName': os.path.basename(file_to_upload),
        'fileType': ext.upper(),
        'fileSize': calculate_size(file_to_upload),
        'fileMd5sum': calculate_md5(file_to_upload),
        'fileAccess': 'controlled',
        'dataType': 'Aligned Reads' if ext in ('bam', 'cram') else 'Aligned Reads Index',
        'info': {
            'data_category': 'Sequencing Reads',
            'data_subtypes': None,
            'analysis_tools': ["BWA-MEM", "biobambam2:bammarkduplicates2"]
        }
    }
    return file_entry
| 116 | + |
| 117 | + |
def get_sample_info(sample_list):
    """Return a deep copy of *sample_list* stripped of server-managed fields.

    Removes 'info', 'sampleId', 'specimenId', 'donorId' and 'studyId' from
    each sample and from its nested 'specimen' and 'donor' dicts. The input
    list is left untouched.
    """
    stripped = copy.deepcopy(sample_list)
    drop_keys = ('info', 'sampleId', 'specimenId', 'donorId', 'studyId')
    for sample in stripped:
        for record in (sample, sample['specimen'], sample['donor']):
            for key in drop_keys:
                record.pop(key, None)

    return stripped
| 127 | + |
| 128 | + |
def main(args):
    """Assemble a 'sequencing_alignment' payload and write it to disk.

    Reads the sequencing_experiment analysis JSON, merges in the read-group
    uBAM analysis IDs, symlinks/renames the aligned files under ./out, and
    writes <uuid4>.dna_alignment.payload.json to the current directory.
    """
    with open(args.seq_experiment_analysis, 'r') as f:
        seq_experiment_analysis_dict = json.load(f)

    payload = {
        'analysisType': {
            'name': 'sequencing_alignment'
        },
        'studyId': seq_experiment_analysis_dict.get('studyId'),
        'info': {},
        'workflow': {
            # fall back to the short name when no display name is registered
            'workflow_name': workflow_full_name.get(args.wf_name, args.wf_name),
            'workflow_version': args.wf_version,
            'genome_build': 'GRCh38_hla_decoy_ebv',
            'run_id': args.wf_run,
            'session_id': args.wf_session,
            'inputs': [
                {
                    'analysis_type': 'sequencing_experiment',
                    'input_analysis_id': seq_experiment_analysis_dict.get('analysisId')
                }
            ]
        },
        'files': [],
        'samples': get_sample_info(seq_experiment_analysis_dict.get('samples')),
        'experiment': {},
        'read_group_count': seq_experiment_analysis_dict.get('read_group_count'),
        'read_groups': seq_experiment_analysis_dict.get('read_groups')
    }

    # pass `info` dict from seq_experiment payload to new payload
    if 'info' in seq_experiment_analysis_dict and isinstance(seq_experiment_analysis_dict['info'], dict):
        payload['info'] = seq_experiment_analysis_dict['info']
    else:
        # drop the placeholder when there is nothing to carry over
        payload.pop('info')

    payload['experiment'].update(seq_experiment_analysis_dict.get('experiment', {}))

    # legacy field name: rename 'library_strategy' to 'experimental_strategy'
    if 'library_strategy' in payload['experiment']:
        experimental_strategy = payload['experiment'].pop('library_strategy')
        payload['experiment']['experimental_strategy'] = experimental_strategy

    # get inputs from read_group_ubam_analysis
    for ubam_analysis in args.read_group_ubam_analysis:
        with open(ubam_analysis, 'r') as f:
            ubam_analysis_dict = json.load(f)

        payload['workflow']['inputs'].append(
            {
                'analysis_type': 'read_group_ubam',
                'input_analysis_id': ubam_analysis_dict.get('analysisId')
            }
        )

    # get number of read groups from aligned seq file
    # NOTE(review): assumes exactly one .bam/.cram among files_to_upload;
    # raises IndexError otherwise -- confirm against callers
    aligned_file = [ f for f in args.files_to_upload if (f.endswith('.bam') or f.endswith('.cram')) ][0]
    rg_count = get_rg_count(aligned_file)

    # get file of the payload
    date_str = date.today().strftime("%Y%m%d")
    for f in args.files_to_upload:
        renamed_file = rename_file(f, payload, rg_count, seq_experiment_analysis_dict['samples'], date_str)
        payload['files'].append(get_files_info(renamed_file))

    with open("%s.dna_alignment.payload.json" % str(uuid.uuid4()), 'w') as f:
        f.write(json.dumps(payload, indent=2))
| 195 | + |
| 196 | + |
if __name__ == "__main__":
    # Command-line interface: collect the files, input analyses, and
    # workflow/run metadata needed to build the payload.
    cli = ArgumentParser()
    cli.add_argument("-f", "--files_to_upload", dest="files_to_upload", type=str, required=True,
                     nargs="+", help="Aligned reads files to upload")
    cli.add_argument("-a", "--seq_experiment_analysis", dest="seq_experiment_analysis", required=True,
                     help="Input analysis for sequencing experiment", type=str)
    cli.add_argument("-u", "--read_group_ubam_analysis", dest="read_group_ubam_analysis", default=[],
                     help="Input payloads for the analysis", type=str, nargs='+')
    cli.add_argument("-w", "--wf_name", dest="wf_name", required=True, help="Workflow name")
    cli.add_argument("-v", "--wf_version", dest="wf_version", required=True, help="Workflow version")
    cli.add_argument("-r", "--wf_run", dest="wf_run", required=True, help="workflow run ID")
    cli.add_argument("-s", "--wf_session", dest="wf_session", required=True, help="workflow session ID")

    main(cli.parse_args())
0 commit comments