Skip to content

Commit df7062e

Browse files
committed
add tool, test data and check.nf
1 parent a5ac296 commit df7062e

24 files changed

Lines changed: 1028 additions & 52 deletions

payload-gen-qc/Dockerfile

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
FROM python:3.7.5-slim-buster

LABEL org.opencontainers.image.source https://github.com/icgc-argo-workflows/data-processing-utility-tools

# procps is needed so Nextflow can monitor task processes (provides `ps`).
# --no-install-recommends and cleaning the apt lists keep the slim image small.
RUN apt-get update && \
    apt-get install -y --no-install-recommends procps && \
    rm -rf /var/lib/apt/lists/*

# Create a fixed non-root user (uid/gid 1000) with a writable home directory
RUN groupadd -g 1000 ubuntu && \
    useradd -l -u 1000 -g ubuntu ubuntu && \
    install -d -m 0755 -o ubuntu -g ubuntu /home/ubuntu

ENV PATH="/tools:${PATH}"

COPY *.py /tools/

WORKDIR /tools

# Drop root privileges for everything that runs in the container
USER ubuntu

ENTRYPOINT ["/usr/bin/env"]
CMD ["/bin/bash"]

payload-gen-qc/main.nf

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
params.publish_dir = "" // set to empty string will disable publishDir

// tool specific params go here, add / change as needed
params.files_to_upload = ""
params.metadata_analysis = ""
params.wf_name = ""
params.wf_version = ""
params.genome_annotation = ""
params.genome_build = ""


// Generates a qc_metrics SONG payload for the given QC files by invoking main.py;
// emits the payload JSON and the renamed files staged under out/
process payloadGenQc {
    container "${params.container ?: container[params.container_registry ?: default_container_registry]}:${params.container_version ?: version}"
    // ternary forces a boolean so an empty publish_dir disables publishing
    publishDir "${params.publish_dir}/${task.process.replaceAll(':', '_')}", mode: "copy", enabled: params.publish_dir ? true : false

    cpus params.cpus
    memory "${params.mem} GB"

    input:  // input, make update as needed
        path files_to_upload
        path metadata_analysis
        val genome_annotation
        val genome_build
        val wf_name
        val wf_version

    output:  // output, make update as needed
        path "*.payload.json", emit: payload
        path "out/*", emit: files_to_upload

    script:
    // add and initialize variables here as needed

    """
    main.py \
      -f ${files_to_upload} \
      -a ${metadata_analysis} \
      -g "${genome_annotation}" \
      -b "${genome_build}" \
      -w "${wf_name}" \
      -r ${workflow.runName} \
      -s ${workflow.sessionId} \
      -v ${wf_version}
    """
}


// this provides an entry point for this main script, so it can be run directly without clone the repo
// using this command: nextflow run <git_acc>/<repo>/<pkg_name>/<main_script>.nf -r <pkg_name>.v<pkg_version> --params-file xxx
workflow {
    payloadGenQc(
        Channel.fromPath(params.files_to_upload).collect(),
        file(params.metadata_analysis),
        params.genome_annotation,
        params.genome_build,
        params.wf_name,
        params.wf_version
    )
}

payload-gen-qc/main.py

Lines changed: 212 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,29 +25,230 @@
2525
import sys
2626
import argparse
2727
import subprocess
28+
import json
29+
import re
30+
import hashlib
31+
import uuid
32+
import tarfile
33+
from datetime import date
34+
import copy
2835

36+
# Maps short workflow codes to the full display names recorded in payloads.
workflow_full_name = {
    'pre-alignment-qc': 'Pre Alignment QC Workflow'
}


def calculate_size(file_path):
    """Return the size of *file_path* in bytes."""
    return os.path.getsize(file_path)
42+
43+
44+
def calculate_md5(file_path):
    """Return the hex MD5 digest of *file_path*, reading it in 1 MiB chunks."""
    digest = hashlib.md5()
    with open(file_path, 'rb') as fh:
        while True:
            block = fh.read(1024 * 1024)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
50+
51+
def get_rg_id_from_lane_qc(tar, metadata):
    """
    Resolve a per-lane QC tarball to its read group.

    Tarball names look like:
      friendly_rg_id.6cae87bf9f05cdfaa4a26f2da625f3b2.fastqc.tgz
      friendly_rg_id.6cae87bf9f05cdfaa4a26f2da625f3b2.cutadapt.tgz
    where the third-from-last dot token is the MD5 of the submitter read
    group id. Exits the process when the name is malformed or no read group
    in *metadata* matches.

    :param tar: path to the QC tarball
    :param metadata: analysis document containing a 'read_groups' list
    :return: (friendly_rg_id, submitter_read_group_id) tuple
    """
    basename = os.path.basename(tar)
    fname_md5 = basename.split('.')[-3]
    if not re.match(r'^[a-f0-9]{32}$', fname_md5):
        sys.exit('Error: lane naming not expected %s' % basename)

    for read_group in metadata.get("read_groups"):
        submitter_id = read_group.get("submitter_read_group_id")
        if hashlib.md5(submitter_id.encode('utf-8')).hexdigest() == fname_md5:
            # sanitize the id the same way the upstream step did when naming the tar
            friendly_id = "".join(
                c if re.match(r"[a-zA-Z0-9\.\-_]", c) else "_" for c in submitter_id
            )
            return friendly_id, submitter_id

    # up to this point no match found, then something wrong
    sys.exit('Error: unable to match ubam qc metric tar "%s" to read group id' % basename)
69+
70+
71+
def get_files_info(file_to_upload, date_str, analysis_dict):
    """
    Build the SONG file-entry dict for one QC tarball and stage it for upload.

    Recognizes three tarball flavours by name: '*.fastqc.tgz' and
    '*.cutadapt.tgz' (per read group) and 'multiqc.tgz' (summary); exits the
    process for anything else. The tarball is renamed to
    <study>.<donor>.<sample>.<strategy>.<date>.<indicator>.<type>.tgz and
    symlinked under 'out/'; any embedded qc_metrics/extra_info JSON is
    inlined into the returned dict's 'info' section.

    :param file_to_upload: path of one QC metrics tarball
    :param date_str: date stamp (YYYYMMDD) used in the new file name
    :param analysis_dict: SONG seq-experiment analysis document
    :return: dict describing the file (fileName, fileSize, fileMd5sum, ...)
    """
    file_info = {
        'fileSize': calculate_size(file_to_upload),
        'fileMd5sum': calculate_md5(file_to_upload),
        'fileAccess': 'controlled',
        'info': {
            'data_category': 'Quality Control Metrics',
            'data_subtypes': None,
            'files_in_tgz': []
        }
    }

    submitter_rg_id = None
    process_indicator = None
    if re.match(r'.+?\.fastqc\.tgz$', file_to_upload):
        file_type = 'fastqc'
        file_info.update({'dataType': 'Sequencing QC'})
        file_info['info']['data_subtypes'] = ['Read Group Metrics']
        file_info['info'].update({'analysis_tools': ['FastQC']})
        process_indicator, submitter_rg_id = get_rg_id_from_lane_qc(file_to_upload, analysis_dict)
    elif re.match(r'.+?\.cutadapt\.tgz$', file_to_upload):
        file_type = 'cutadapt'
        file_info.update({'dataType': 'Sequencing QC'})
        file_info['info']['data_subtypes'] = ['Read Group Metrics']
        file_info['info'].update({'analysis_tools': ['Cutadapt']})
        process_indicator, submitter_rg_id = get_rg_id_from_lane_qc(file_to_upload, analysis_dict)
    elif re.match(r'^multiqc\.tgz$', file_to_upload):
        # NOTE(review): this pattern is anchored, so multiqc is only recognized
        # when file_to_upload is a bare file name with no directory part —
        # presumably Nextflow stages inputs into the CWD; confirm against caller
        file_type = 'multiqc'
        file_info.update({'dataType': 'Sequencing QC'})
        file_info['info']['data_subtypes'] = ['Read Group Metrics']
        file_info['info'].update({'analysis_tools': ['MultiQC']})
        process_indicator = 'summary'

    else:
        sys.exit('Error: unknown QC metrics file: %s' % file_to_upload)

    # file naming patterns:
    # pattern: <argo_study_id>.<argo_donor_id>.<argo_sample_id>.<experiment_strategy>.<date>.<process_indicator>.<file_type>.<file_ext>
    # process_indicator: pre-alignment(rg_id), alignment(aligner), post-alignment(caller)
    # example: TEST-PR.DO250183.SA610229.rna-seq.20200319.star.genome_aln.cram
    new_fname = '.'.join([
        analysis_dict['studyId'],
        analysis_dict['samples'][0]['donor']['donorId'],
        analysis_dict['samples'][0]['sampleId'],
        # fall back to the legacy 'library_strategy' field when
        # 'experimental_strategy' is absent from the experiment section
        analysis_dict['experiment']['experimental_strategy'].lower() if analysis_dict['experiment'].get('experimental_strategy') else analysis_dict['experiment']['library_strategy'],
        date_str,
        process_indicator,
        file_type,
        'tgz'
    ])

    file_info['fileName'] = new_fname
    file_info['fileType'] = new_fname.split('.')[-1].upper()

    # scan the tarball: pull metrics JSON into extra_info, list everything else
    extra_info = {}
    with tarfile.open(file_to_upload, 'r') as tar:
        for member in tar.getmembers():
            if member.name.endswith('qc_metrics.json') or member.name.endswith('.extra_info.json'):
                f = tar.extractfile(member)
                extra_info = json.load(f)
            else:
                if not file_info['info'].get('files_in_tgz'): file_info['info']['files_in_tgz'] = []
                file_info['info']['files_in_tgz'].append(os.path.basename(member.name))

    # tag metrics with the read group they belong to; fastqc metrics are a
    # list of dicts, cutadapt metrics a single dict, multiqc needs no tagging
    if file_type =='fastqc':
        for e in extra_info['metrics']:
            e.update({'read_group_id': submitter_rg_id})
    elif file_type == 'cutadapt':
        extra_info['metrics'].update({'read_group_id': submitter_rg_id})
    else:
        pass

    extra_info.pop('tool', None)
    if extra_info:
        file_info['info'].update({'metrics': extra_info.get('metrics', None)})
        file_info['info'].update({'description': extra_info.get('description', None)})

    # stage the renamed file under out/ as a symlink for the uploader to pick up
    new_dir = 'out'
    try:
        os.mkdir(new_dir)
    except FileExistsError:
        pass

    # NOTE(review): os.symlink raises FileExistsError if the link already
    # exists, e.g. when re-run in the same working directory — confirm reruns
    # always get a fresh task dir
    dst = os.path.join(os.getcwd(), new_dir, new_fname)
    os.symlink(os.path.abspath(file_to_upload), dst)

    return file_info
158+
159+
def get_basename(metadata):
    """
    Build the '<study_id>.<donor_id>.<sample_id>' name prefix from an
    analysis document; exits the process when any of the three is missing
    or empty.
    """
    first_sample = metadata['samples'][0]
    parts = [
        metadata['studyId'],
        first_sample['donor']['donorId'],
        first_sample['sampleId'],
    ]

    if not all(parts):
        sys.exit('Error: missing study/donor/sample ID in the provided metadata')

    return ".".join(parts)
168+
169+
def get_sample_info(sample_list):
    """
    Return a deep copy of *sample_list* with server-assigned identifiers
    ('info', 'sampleId', 'specimenId', 'donorId', 'studyId') removed from
    each sample and from its nested 'specimen' and 'donor' sections. The
    input list is left untouched.
    """
    internal_keys = ('info', 'sampleId', 'specimenId', 'donorId', 'studyId')
    cloned = copy.deepcopy(sample_list)
    for entry in cloned:
        for section in (entry, entry['specimen'], entry['donor']):
            for key in internal_keys:
                section.pop(key, None)

    return cloned
29178

30179
def main():
    """
    Python implementation of tool: payload-gen-qc

    Reads a SONG seq-experiment analysis document, builds a 'qc_metrics'
    payload covering each given QC tarball (renamed and staged under 'out/'
    by get_files_info), and writes the payload to
    '<uuid>.<wf_name>.payload.json' in the current directory.
    """

    parser = argparse.ArgumentParser(description='Tool: payload-gen-qc')
    # '--metatada-analysis' is the original, misspelled long option; it is kept
    # as a deprecated alias for backward compatibility — prefer the corrected
    # '--metadata-analysis' (the '-a' short flag is what main.nf uses)
    parser.add_argument("-a", "--metadata-analysis", "--metatada-analysis", dest="metadata_analysis", required=True,
                        help="Input metadata analysis", type=str)
    parser.add_argument("-f", "--files_to_upload", dest="files_to_upload", type=str, required=True,
                        nargs="+", help="All files to upload")
    parser.add_argument("-g", "--genome_annotation", dest="genome_annotation", default="", help="Genome annotation")
    parser.add_argument("-b", "--genome_build", dest="genome_build", default="", help="Genome build")
    parser.add_argument("-w", "--wf-name", dest="wf_name", required=True, help="Workflow name")
    parser.add_argument("-r", "--wf-run", dest="wf_run", required=True, help="workflow run ID")
    parser.add_argument("-s", "--wf-session", dest="wf_session", required=True, help="workflow session ID")
    parser.add_argument("-v", "--wf-version", dest="wf_version", required=True, help="Workflow version")
    args = parser.parse_args()

    with open(args.metadata_analysis, 'r') as f:
        analysis_dict = json.load(f)

    payload = {
        'analysisType': {
            'name': 'qc_metrics'
        },
        'studyId': analysis_dict.get('studyId'),
        'info': {},
        'workflow': {
            'workflow_name': workflow_full_name.get(args.wf_name, args.wf_name),
            'workflow_version': args.wf_version,
            'run_id': args.wf_run,
            'session_id': args.wf_session,
            'inputs': [
                {
                    'analysis_type': analysis_dict['analysisType']['name'],
                    'input_analysis_id': analysis_dict.get('analysisId')
                }
            ]
        },
        'files': [],
        'experiment': analysis_dict.get('experiment'),
        'samples': get_sample_info(analysis_dict.get('samples'))
    }
    # genome info is optional; only record it when provided
    if args.genome_build:
        payload['workflow']['genome_build'] = args.genome_build
    if args.genome_annotation:
        payload['workflow']['genome_annotation'] = args.genome_annotation

    # pass `info` dict from seq_experiment payload to new payload
    if 'in' 'fo' in analysis_dict and isinstance(analysis_dict['info'], dict):
        payload['info'] = analysis_dict['info']
    else:
        payload.pop('info')

    # normalize the legacy 'library_strategy' field name
    # NOTE(review): assumes the analysis document always has an 'experiment'
    # section (a dict) — a missing section would raise TypeError here; confirm
    if 'library_strategy' in payload['experiment']:
        experimental_strategy = payload['experiment'].pop('library_strategy')
        payload['experiment']['experimental_strategy'] = experimental_strategy

    # ensure 'out/' exists even before the first file is processed
    new_dir = 'out'
    try:
        os.mkdir(new_dir)
    except FileExistsError:
        pass

    # get file info for the payload
    date_str = date.today().strftime("%Y%m%d")
    for f in args.files_to_upload:
        file_info = get_files_info(f, date_str, analysis_dict)
        payload['files'].append(file_info)

    with open("%s.%s.payload.json" % (str(uuid.uuid4()), args.wf_name), 'w') as f:
        f.write(json.dumps(payload, indent=2))
49251

50-
subprocess.run(f"cp {args.input_file} {args.output_dir}/", shell=True, check=True)
51252

52253

53254
if __name__ == "__main__":

0 commit comments

Comments
 (0)