
[7] Updated mount injection

The mount injection into the containers was improved
by passing the mount paths as a JSON string in an environment variable.
at-robins 8 months ago
parent
commit
345cdc06b5
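
All mount targets are now handed to the container in a single MOUNT_PATHS environment variable as a JSON object with the keys "input", "output", "dependencies" and "globals" (built in container_service.rs below). A minimal sketch of how a pipeline step script might consume it; the hashed directory names and the "trimming"/"ADAPTERS" entries are illustrative placeholders taken from the scripts in this commit:

import json
import os

# Illustrative MOUNT_PATHS payload (hash values are placeholders):
# {
#   "input": "/input/base",
#   "output": "/output",
#   "dependencies": {"trimming": "/input/steps/1234567890123456789"},
#   "globals": {"ADAPTERS": "/input/globals/9876543210987654321"}
# }
MOUNT_PATHS = json.loads(os.environ["MOUNT_PATHS"])

input_folder = MOUNT_PATHS["input"]    # initial sample data
output_folder = MOUNT_PATHS["output"]  # step output directory
trimmed_reads = MOUNT_PATHS["dependencies"].get("trimming")  # mount of a dependency step, if declared
adapters = MOUNT_PATHS["globals"].get("ADAPTERS")            # optional global data mount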

+ 28 - 4
backend/src/application/config.rs

@@ -198,9 +198,7 @@ impl Configuration {
     ) -> PathBuf {
         let mut path: PathBuf = self.experiment_steps_path(experiment_id);
         // Hashing the ID prevents invalid characters in file paths.
-        let mut hasher = XxHash64::with_seed(154);
-        step_id.as_ref().hash(&mut hasher);
-        path.push(hasher.finish().to_string());
+        path.push(Self::hash_string(step_id));
         path
     }
 
@@ -229,6 +227,18 @@ impl Configuration {
     pub fn server_address_and_port(&self) -> String {
         format!("{}:{}", self.server_address(), self.server_port())
     }
+
+    /// Hashes the specified string and returns the resulting number as a string.
+    /// The hash is constant for the same string on repeated uses.
+    ///
+    /// # Parameters
+    ///
+    /// * `value` - the string to hash
+    pub fn hash_string<T: AsRef<str>>(value: T) -> String {
+        let mut hasher = XxHash64::with_seed(154);
+        value.as_ref().hash(&mut hasher);
+        hasher.finish().to_string()
+    }
 }
 
 #[cfg(test)]
@@ -288,7 +298,21 @@ mod tests {
     fn test_experiment_step_path() {
         let config = Configuration::new("", "", "", "", "./application/context", "");
         // Hash of step_id.
-        let path: PathBuf = "./application/context/experiments/experiment_id/steps/4363919453614495606".into();
+        let path: PathBuf =
+            "./application/context/experiments/experiment_id/steps/4363919453614495606".into();
         assert_eq!(config.experiment_step_path("experiment_id", "step_id"), path);
     }
+
+    #[test]
+    fn test_hash_string() {
+        let random_string = "39012rtuj132-0t1jp41-9/n\n\t@#$%^&*()|}{\"?>¡ªº£€˚„";
+        let hash = Configuration::hash_string(random_string);
+        let allowed_characters = vec!['0', '1', '2', '3', '4', '5', '6', '7', '8', '9'];
+        assert!(hash.len() > 0);
+        // u64 max
+        assert!(hash.len() <= 20);
+        for character in hash.chars() {
+            assert!(allowed_characters.contains(&character));
+        }
+    }
 }

+ 44 - 15
backend/src/service/container_service.rs

@@ -1,15 +1,14 @@
 use std::{
+    collections::HashMap,
     ffi::OsString,
-    hash::{Hash, Hasher},
     io::{BufWriter, Write},
     path::Path,
-    process::{Child, Command, Output, Stdio}, collections::HashMap,
+    process::{Child, Command, Output, Stdio},
 };
 
 use actix_web::web;
 use chrono::NaiveDateTime;
 use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl};
-use twox_hash::XxHash64;
 
 use crate::{
     application::{
@@ -28,6 +27,9 @@ use crate::{
 
 use super::pipeline_service::LoadedPipelines;
 
+/// The container environment variable specifying all mounts.
+const CONTAINER_ENV_MOUNT: &str = "MOUNT_PATHS";
+
 /// Builds the specific pipeline step container at the specified context.
 ///
 /// # Parameters
@@ -91,35 +93,56 @@ pub fn run_pipeline_step<T: AsRef<str>>(
         format_container_name(&pipeline_id, step.id()).into(),
         "--rm".into(),
     ];
+
     arguments.extend(pipeline_step_mount(output_path, "/output", false));
     // Set initial sample input mount.
     arguments.extend(pipeline_step_mount(
         app_config.experiment_input_path(&experiment_id),
-        "/input/samples",
+        "/input/base",
         true,
     ));
     // Set input mounts / dependencies.
+    let mut mount_map_dependencies = serde_json::Map::new();
     for dependency_id in step.dependencies() {
+        let target = format!("/input/steps/{}", Configuration::hash_string(dependency_id));
+        mount_map_dependencies
+            .insert(dependency_id.to_string(), serde_json::Value::String(target.clone()));
         arguments.extend(pipeline_step_mount(
             app_config.experiment_step_path(&experiment_id, dependency_id),
-            format!("/input/steps/{}", dependency_id),
+            target,
             true,
         ));
     }
     // Set global mounts.
+    let mut mount_map_globals = serde_json::Map::new();
     step.variables()
         .iter()
         .filter(|var_instance| var_instance.is_global_data_reference())
         // Filter out variables without values.
         .filter_map(|var_instance| var_instance.value().as_ref().map(|value| (var_instance.id(), value)))
         .for_each(|(global_var_id, global_var_value)| {
+            let target = format!("/input/globals/{}", Configuration::hash_string(global_var_id));
+            mount_map_globals.insert(
+                global_var_id.to_string(),
+                serde_json::Value::String(target.clone()),
+            );
             arguments.extend(pipeline_step_mount(
                 app_config.global_data_path(global_var_value),
-                format!("/input/globals/{}", global_var_id),
+                target,
                 true,
             ));
         });
 
+    // Set mount environment variable.
+    let mut mount_map_top = serde_json::Map::new();
+    mount_map_top.insert("input".to_string(), serde_json::Value::String("/input/base".to_string()));
+    mount_map_top.insert("output".to_string(), serde_json::Value::String("/output".to_string()));
+    mount_map_top.insert("dependencies".to_string(), serde_json::Value::Object(mount_map_dependencies));
+    mount_map_top.insert("globals".to_string(), serde_json::Value::Object(mount_map_globals));
+    let mount_paths = serde_json::Value::Object(mount_map_top);
+    arguments.push("--env".into());
+    arguments.push(format!("{}={}", CONTAINER_ENV_MOUNT, mount_paths.to_string()).into());
+
     // Set other variables.
     step.variables()
         .iter()
@@ -127,8 +150,12 @@ pub fn run_pipeline_step<T: AsRef<str>>(
         // Filter out variables without values.
         .filter_map(|var_instance| var_instance.value().as_ref().map(|value| (var_instance.id(), value)))
         .for_each(|(other_var_id, other_var_value)| {
-            arguments.push("--env".into());
-            arguments.push(format!("{}={}", other_var_id, other_var_value).into());
+            if other_var_id != CONTAINER_ENV_MOUNT {
+                arguments.push("--env".into());
+                arguments.push(format!("{}={}", other_var_id, other_var_value).into());
+            } else {
+                log::warn!("Pipeline {} step {} tried to overwrite the reserved environment variable {} with value {}.", pipeline_id.as_ref(), step.id(), other_var_id, other_var_value);
+            }
         });
 
     // Set container to run.
@@ -176,9 +203,7 @@ fn format_container_name<T: AsRef<str>, R: AsRef<str>>(
     pipeline_step_id: R,
 ) -> String {
     let name = format!("{}{}", pipeline_id.as_ref(), pipeline_step_id.as_ref());
-    let mut hasher = XxHash64::with_seed(154);
-    name.hash(&mut hasher);
-    hasher.finish().to_string()
+    Configuration::hash_string(name)
 }
 
 pub struct ContainerHandler {
@@ -330,10 +355,10 @@ impl ContainerHandler {
 
     /// Aborts the pipeline step belonging to the specified experiment
     /// if currently executed.
-    /// 
+    ///
     /// # Parameters
-    /// 
-    /// * `experiment_id` - the ID of the experiment to abort 
+    ///
+    /// * `experiment_id` - the ID of the experiment to abort
     pub fn abort(&mut self, experiment_id: i32) -> Result<(), SeqError> {
         if let Some(step) = &self.executed_step {
             if experiment_id == step.experiment_id {
@@ -550,7 +575,11 @@ impl ContainerHandler {
             let mut connection = self.database_manager.database_connection()?;
             let values = crate::model::db::pipeline_step_variable::PipelineStepVariable::get_values_by_experiment_and_pipeline(step.experiment_id, pipeline.pipeline().id(), &mut connection)?;
             // The statuses of the pipeline steps should be None at this point, so an empty map is supplied instead of loading them from the database.
-            let pipeline = ExperimentPipelineBlueprint::from_internal(pipeline.pipeline(), values, HashMap::new());
+            let pipeline = ExperimentPipelineBlueprint::from_internal(
+                pipeline.pipeline(),
+                values,
+                HashMap::new(),
+            );
             if let Some(step_blueprint) = pipeline
                 .steps()
                 .iter()

+ 8 - 5
pipelines/wiedemann_atac_pipeline/container/fastqc_initial/run_fastqc.py

@@ -1,18 +1,21 @@
 #!/usr/bin/python
 """This module runs the initial QC process."""
 
+import json
 import os
 import sys
+from contextlib import suppress
 
 BASE_COMMAND = "perl -- /FastQC/fastqc"
-INPUT_FOLDER = "/input/samples/"
+MOUNT_PATHS = json.loads(os.environ.get("MOUNT_PATHS"))
+INPUT_FOLDER = MOUNT_PATHS["input"] + "/"
 
 # If a specific environment variable is set, appends the respective option.
 options = ""
 
-adapters = os.environ.get("ADAPTERS")
-if adapters is not None:
-    options += f" --adapters /input/globals/{adapters}/adapters.txt"
+with suppress(Exception):
+    adapters = MOUNT_PATHS["globals"]["ADAPTERS"]
+    options += f" --adapters {adapters}/adapters.txt"
 
 kmers = os.environ.get("KMERS")
 if kmers is not None:
@@ -34,7 +37,7 @@ for root, dirs, files in os.walk(INPUT_FOLDER):
             if file.casefold().endswith(".fq.gz") or file.casefold().endswith(".fastq.gz"):
                 file_input_path = os.path.join(root, file)
                 folder_output_path = os.path.join(
-                    "/output",
+                    MOUNT_PATHS["output"],
                     root.removeprefix(INPUT_FOLDER)
                 )
                 full_command = (f"{BASE_COMMAND}{options} "
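
The with suppress(Exception) guard above skips the adapters option whenever the ADAPTERS global is not mounted, but it also swallows any other error in that block. A narrower alternative, purely a sketch under the same MOUNT_PATHS layout, would only tolerate the missing key:

adapters_mount = MOUNT_PATHS.get("globals", {}).get("ADAPTERS")
if adapters_mount is not None:
    options += f" --adapters {adapters_mount}/adapters.txt"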

+ 9 - 5
pipelines/wiedemann_atac_pipeline/container/fastqc_trimming/run_fastqc.py

@@ -1,18 +1,22 @@
 #!/usr/bin/python
 """This module runs the after trimming QC process."""
 
+import json
 import os
 import sys
+from contextlib import suppress
 
 BASE_COMMAND = "perl -- /FastQC/fastqc"
-INPUT_FOLDER = "/input/steps/trimming/"
+MOUNT_PATHS = json.loads(os.environ.get("MOUNT_PATHS"))
+INPUT_FOLDER = MOUNT_PATHS["dependencies"]["trimming"] + "/"
 
+print(MOUNT_PATHS)
 # If a specific environment variable is set, appends the respective option.
 options = ""
 
-adapters = os.environ.get("ADAPTERS")
-if adapters is not None:
-    options += f" --adapters /input/globals/{adapters}/adapters.txt"
+with suppress(Exception):
+    adapters = MOUNT_PATHS["globals"]["ADAPTERS"]
+    options += f" --adapters {adapters}/adapters.txt"
 
 kmers = os.environ.get("KMERS")
 if kmers is not None:
@@ -34,7 +38,7 @@ for root, dirs, files in os.walk(INPUT_FOLDER):
             if file.casefold().endswith(".fq.gz") or file.casefold().endswith(".fastq.gz"):
                 file_input_path = os.path.join(root, file)
                 folder_output_path = os.path.join(
-                    "/output",
+                    MOUNT_PATHS["output"],
                     root.removeprefix(INPUT_FOLDER)
                 )
                 full_command = (f"{BASE_COMMAND}{options} "
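
The trimming QC step resolves its input folder through the dependency map instead of a hard-coded path. A short sketch, with a hypothetical error message, of failing fast when the required "trimming" dependency mount is absent:

trimming_mount = MOUNT_PATHS.get("dependencies", {}).get("trimming")
if trimming_mount is None:
    sys.exit("MOUNT_PATHS does not contain the required 'trimming' dependency mount.")
INPUT_FOLDER = trimming_mount + "/"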

+ 4 - 8
pipelines/wiedemann_atac_pipeline/container/trimmomatic/run_trimming.py

@@ -1,13 +1,15 @@
 #!/usr/bin/python
 """This module runs the trimming process."""
 
+import json
 import os
 import sys
 
 BASE_COMMAND = "java -jar /Trimmomatic-0.39/trimmomatic-0.39.jar PE"
 STEP_OPTIONS = ("ILLUMINACLIP:/Trimmomatic-0.39/adapters/NexteraPE-PE.fa:2:30:10:2:True "
 "LEADING:3 TRAILING:3 SLIDINGWINDOW:4:15 MINLEN:36")
-INPUT_FOLDER = "/input/samples/"
+MOUNT_PATHS = json.loads(os.environ.get("MOUNT_PATHS"))
+INPUT_FOLDER = MOUNT_PATHS["input"] + "/"
 
 # If a specific environment variable is set, appends the respective option.
 options = ""
@@ -20,12 +22,6 @@ if phred is not None:
     else:
         print(f"Unknown PHRED score option: {phred}", file=sys.stderr)
 
-if os.environ.get("PHRED33") is not None:
-    options += " --phred33"
-
-if os.environ.get("PHRED64") is not None:
-    options += " --phred64"
-
 if not options:
     print("Running with default options.")
 else:
@@ -49,7 +45,7 @@ for root, dirs, files in os.walk(INPUT_FOLDER):
 
             if input_files:
                 file_base_output_path = os.path.join(
-                    "/output",
+                    MOUNT_PATHS["output"],
                     file_base_input_path.removeprefix(INPUT_FOLDER)
                 )
                 full_command = (f"{BASE_COMMAND}{options} {input_files} "

+ 1 - 1
pipelines/wiedemann_atac_pipeline/pipeline.json

@@ -1,7 +1,7 @@
 {
   "id": "wiedemann_atac_paired_end",
   "name": "Wiedemann ATAC paired end pipeline",
-  "description": "<p>This pipeline processes paired end ATAC data.</p>",
+  "description": "<p>This pipeline processes paired end ATAC data. The input is expected as gzipped FASTQ files (.fq.gz / .fastq.gz) and might be organised in sub-folders.</p>",
   "steps": [
     {
       "id": "qc_initial",