
[7] Added experiment run endpoint

An endpoint to query the run status of an experiment pipeline was added.
at-robins 8 months ago
parent
commit 0037ab9372

+ 51 - 2
backend/src/controller/experiment_controller.rs

@@ -1,3 +1,5 @@
+use std::collections::HashMap;
+
 use crate::{
     application::{
         config::Configuration,
@@ -221,15 +223,62 @@ pub async fn get_experiment_pipelines(
 ) -> Result<web::Json<Vec<ExperimentPipelineBlueprint>>, SeqError> {
     let experiment_id: i32 = id.into_inner();
     let mut connection = database_manager.database_connection()?;
+    let all_experiment_stati =
+        ExperimentExecution::get_by_experiment(experiment_id, &mut connection)?;
     let mut experiment_pipelines = Vec::new();
     for pipeline in pipelines.pipelines() {
         let values = crate::model::db::pipeline_step_variable::PipelineStepVariable::get_values_by_experiment_and_pipeline(experiment_id, pipeline.pipeline().id(), &mut connection)?;
-        experiment_pipelines
-            .push(ExperimentPipelineBlueprint::from_internal(pipeline.pipeline(), values));
+        let stati: HashMap<String, String> = all_experiment_stati
+            .iter()
+            .filter(|execution| &execution.pipeline_id == pipeline.pipeline().id())
+            .map(|execution| {
+                (execution.pipeline_step_id.clone(), execution.execution_status.clone())
+            })
+            .collect();
+        experiment_pipelines.push(ExperimentPipelineBlueprint::from_internal(
+            pipeline.pipeline(),
+            values,
+            stati,
+        ));
     }
     Ok(web::Json(experiment_pipelines))
 }
 
+pub async fn get_experiment_pipeline_run(
+    database_manager: web::Data<DatabaseManager>,
+    pipelines: web::Data<LoadedPipelines>,
+    id: web::Path<i32>,
+) -> Result<web::Json<Option<ExperimentPipelineBlueprint>>, SeqError> {
+    let experiment_id: i32 = id.into_inner();
+    let mut connection = database_manager.database_connection()?;
+    Experiment::exists_err(experiment_id, &mut connection)?;
+    let experiment = Experiment::get(experiment_id, &mut connection)?;
+    let experiment_pipeline = if let Some(pipeline_id) = experiment.pipeline_id {
+        if let Some(pipeline) = pipelines.get(&pipeline_id) {
+            let values = crate::model::db::pipeline_step_variable::PipelineStepVariable::get_values_by_experiment_and_pipeline(experiment_id, pipeline.pipeline().id(), &mut connection)?;
+            let stati: HashMap<String, String> =
+                ExperimentExecution::get_by_experiment(experiment_id, &mut connection)?
+                    .into_iter()
+                    .filter(|execution| &execution.pipeline_id == &pipeline_id)
+                    .map(|execution| {
+                        (execution.pipeline_step_id, execution.execution_status)
+                    })
+                    .collect();
+            Some(ExperimentPipelineBlueprint::from_internal(pipeline.pipeline(), values, stati))
+        } else {
+            return Err(SeqError::new(
+                "Not Found",
+                SeqErrorType::NotFoundError,
+                format!("No pipeline with ID {} is currently loaded.", pipeline_id),
+                "The pipeline ID is invalid.",
+            ));
+        }
+    } else {
+        None
+    };
+    Ok(web::Json(experiment_pipeline))
+}
+
 pub async fn post_experiment_pipeline_variable(
     database_manager: web::Data<DatabaseManager>,
     pipelines: web::Data<LoadedPipelines>,

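A minimal client-side sketch of the new GET /api/experiments/{id}/run handler added above. This is not part of the commit; it assumes a local development server on port 8080, the `reqwest` crate (with the `blocking` and `json` features) plus `serde_json`, and that the blueprint serialises its steps under a `steps` field with `id` and the new `status` field per step.

// Hypothetical client check of the run endpoint (illustrative only).
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let experiment_id = 42; // illustrative experiment ID
    let url = format!("http://localhost:8080/api/experiments/{}/run", experiment_id);
    let body: serde_json::Value = reqwest::blocking::get(&url)?.json()?;
    // The handler returns JSON null when the experiment has no pipeline selected.
    if body.is_null() {
        println!("No pipeline selected for experiment {}.", experiment_id);
        return Ok(());
    }
    if let Some(steps) = body.get("steps").and_then(|s| s.as_array()) {
        for step in steps {
            // `status` is JSON null for steps without an execution record.
            println!("step {}: status {}", step["id"], step["status"]);
        }
    }
    Ok(())
}
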
+ 2 - 1
backend/src/controller/routing.rs

@@ -8,7 +8,7 @@ use super::{
     experiment_controller::{
         create_experiment, delete_experiment, get_experiment, get_experiment_pipelines,
         list_experiment, patch_experiment_comment, patch_experiment_mail, patch_experiment_name,
-        patch_experiment_pipeline, post_experiment_pipeline_variable, post_execute_experiment, get_experiment_execution_status, post_experiment_execution_abort, post_experiment_execution_reset,
+        patch_experiment_pipeline, post_experiment_pipeline_variable, post_execute_experiment, get_experiment_execution_status, post_experiment_execution_abort, post_experiment_execution_reset, get_experiment_pipeline_run,
     },
     file_controller::{delete_files_by_path, get_files, post_add_file, post_add_folder},
     global_data_controller::{
@@ -56,6 +56,7 @@ pub fn routing_config(cfg: &mut ServiceConfig) {
     .route("/api/experiments/{id}/pipeline", web::patch().to(patch_experiment_pipeline))
     .route("/api/experiments/{id}/pipelines", web::get().to(get_experiment_pipelines))
     .route("/api/experiments/{id}/reset", web::post().to(post_experiment_execution_reset))
+    .route("/api/experiments/{id}/run", web::get().to(get_experiment_pipeline_run))
     .route("/api/experiments/{id}/status", web::get().to(get_experiment_execution_status))
     .route("/api/experiments/{id}/variable", web::post().to(post_experiment_pipeline_variable))
     // Global data repositories

+ 51 - 9
backend/src/model/exchange/experiment_pipeline.rs

@@ -33,15 +33,23 @@ impl ExperimentPipelineBlueprint {
     /// * `pipeline` - the pipeline to convert
     /// * `values` - a map of variable values, where the keys are a concatenation of the
     /// pipeline step ID and variable ID
-    pub fn from_internal<T: Borrow<PipelineBlueprint>, S: Borrow<HashMap<String, String>>>(
-        pipeline: T,
-        values: S,
+    /// * `stati` - a map of stati, where the keys are the pipeline step ID
+    pub fn from_internal<
+        PipelineType: Borrow<PipelineBlueprint>,
+        ValueMapType: Borrow<HashMap<String, String>>,
+        StatusMapType: Borrow<HashMap<String, String>>,
+    >(
+        pipeline: PipelineType,
+        values: ValueMapType,
+        stati: StatusMapType,
     ) -> Self {
         let steps = pipeline
             .borrow()
             .steps()
             .iter()
-            .map(|s| ExperimentPipelineStepBlueprint::from_internal(s, values.borrow()))
+            .map(|s| {
+                ExperimentPipelineStepBlueprint::from_internal(s, values.borrow(), stati.borrow())
+            })
             .collect();
         Self {
             id: pipeline.borrow().id().clone(),
@@ -73,6 +81,9 @@ pub struct ExperimentPipelineStepBlueprint {
     /// The variables that can be specified for the pipeline step.
     #[getset(get = "pub")]
     variables: Vec<ExperimentPipelineStepVariable>,
+    /// The execution status of the pipeline step.
+    #[getset(get = "pub")]
+    status: Option<String>,
 }
 
 impl ExperimentPipelineStepBlueprint {
@@ -84,9 +95,15 @@ impl ExperimentPipelineStepBlueprint {
     /// * `pipeline_step` - the step to convert
     /// * `values` - a map of variable values, where the keys are a concatenation of the
     /// pipeline step ID and variable ID
-    pub fn from_internal<T: Borrow<PipelineStepBlueprint>, S: Borrow<HashMap<String, String>>>(
-        pipeline_step: T,
-        values: S,
+    /// * `stati` - a map of stati, where the keys are the pipeline step ID
+    pub fn from_internal<
+        StepType: Borrow<PipelineStepBlueprint>,
+        ValueMapType: Borrow<HashMap<String, String>>,
+        StatusMapType: Borrow<HashMap<String, String>>,
+    >(
+        pipeline_step: StepType,
+        values: ValueMapType,
+        stati: StatusMapType,
     ) -> Self {
         let variables = pipeline_step
             .borrow()
@@ -102,6 +119,10 @@ impl ExperimentPipelineStepBlueprint {
                 )
             })
             .collect();
+        let status: Option<String> = stati
+            .borrow()
+            .get(pipeline_step.borrow().id())
+            .map(|s| s.clone());
         Self {
             id: pipeline_step.borrow().id().clone(),
             name: pipeline_step.borrow().name().clone(),
@@ -109,6 +130,7 @@ impl ExperimentPipelineStepBlueprint {
             container: pipeline_step.borrow().container().clone(),
             dependencies: pipeline_step.borrow().dependencies().clone(),
             variables,
+            status,
         }
     }
 }
@@ -168,6 +190,8 @@ impl ExperimentPipelineStepVariable {
 #[cfg(test)]
 mod tests {
 
+    use crate::model::internal::step::PipelineStepStatus;
+
     use super::*;
 
     #[test]
@@ -299,14 +323,18 @@ mod tests {
         values.insert("fastqcglobal".to_string(), value_global.clone());
         values.insert("fastqcbool".to_string(), value_bool.clone());
 
+        let mut stati = HashMap::new();
+        stati.insert("fastqc".to_string(), PipelineStepStatus::Pending.to_string());
+
         let experiment_step =
-            ExperimentPipelineStepBlueprint::from_internal(&pipeline_step, values);
+            ExperimentPipelineStepBlueprint::from_internal(&pipeline_step, values, stati);
         assert_eq!(experiment_step.id(), pipeline_step.id());
         assert_eq!(experiment_step.name(), pipeline_step.name());
         assert_eq!(experiment_step.description(), pipeline_step.description());
         assert_eq!(experiment_step.container(), pipeline_step.container());
         assert_eq!(experiment_step.dependencies(), pipeline_step.dependencies());
         assert_eq!(experiment_step.variables().len(), pipeline_step.variables().len());
+        assert_eq!(experiment_step.status(), &Some(PipelineStepStatus::Pending.to_string()));
 
         let experiment_vars = experiment_step.variables();
         let pipeline_vars = pipeline_step.variables();
@@ -399,7 +427,11 @@ mod tests {
         values.insert("fastqc2bool".to_string(), "10".to_string());
         values.insert("fastqc2global".to_string(), "11".to_string());
 
-        let experiment_pipeline = ExperimentPipelineBlueprint::from_internal(&pipeline, values);
+        let mut stati = HashMap::new();
+        stati.insert("fastqc2".to_string(), PipelineStepStatus::Failed.to_string());
+
+        let experiment_pipeline =
+            ExperimentPipelineBlueprint::from_internal(&pipeline, values, stati);
         assert_eq!(experiment_pipeline.id(), pipeline.id());
         assert_eq!(experiment_pipeline.name(), pipeline.name());
         assert_eq!(experiment_pipeline.description(), pipeline.description());
@@ -414,6 +446,16 @@ mod tests {
             assert_eq!(experiment_steps[i].container(), pipeline_steps[i].container());
             assert_eq!(experiment_steps[i].dependencies(), pipeline_steps[i].dependencies());
             assert_eq!(experiment_steps[i].variables().len(), pipeline_steps[i].variables().len());
+
+            if experiment_steps[i].id() == "fastqc2" {
+                assert_eq!(
+                    experiment_steps[i].status(),
+                    &Some(PipelineStepStatus::Failed.to_string())
+                );
+            } else {
+                assert_eq!(experiment_steps[i].status(), &None);
+            }
+
             let experiment_vars = experiment_steps[i].variables();
             let pipeline_vars = pipeline_steps[i].variables();
             for j in 0..experiment_vars.len() {

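For reference, a small standalone sketch of the status lookup semantics used by `from_internal` above: the `stati` map is keyed by pipeline step ID, steps present in the map receive `Some(status)` and all others `None`. The `trimming` step ID is hypothetical; `fastqc` mirrors the test fixtures.

use std::collections::HashMap;

fn main() {
    // stati map as assembled by the controller: pipeline step ID -> execution status.
    let mut stati: HashMap<String, String> = HashMap::new();
    stati.insert("fastqc".to_string(), "Pending".to_string());

    // Mirrors the lookup in ExperimentPipelineStepBlueprint::from_internal.
    let status_for = |step_id: &str| stati.get(step_id).cloned();
    assert_eq!(status_for("fastqc"), Some("Pending".to_string()));
    assert_eq!(status_for("trimming"), None); // hypothetical step with no execution record
}
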
+ 3 - 2
backend/src/service/container_service.rs

@@ -3,7 +3,7 @@ use std::{
     hash::{Hash, Hasher},
     io::{BufWriter, Write},
     path::Path,
-    process::{Child, Command, Output, Stdio},
+    process::{Child, Command, Output, Stdio}, collections::HashMap,
 };
 
 use actix_web::web;
@@ -549,7 +549,8 @@ impl ContainerHandler {
         if let Some(pipeline) = self.loaded_pipelines.get(&step.pipeline_id) {
             let mut connection = self.database_manager.database_connection()?;
             let values = crate::model::db::pipeline_step_variable::PipelineStepVariable::get_values_by_experiment_and_pipeline(step.experiment_id, pipeline.pipeline().id(), &mut connection)?;
-            let pipeline = ExperimentPipelineBlueprint::from_internal(pipeline.pipeline(), values);
+            // The stati of the pipeline steps should be None at this point so an empty map is supplied instead of loading them from the database.
+            let pipeline = ExperimentPipelineBlueprint::from_internal(pipeline.pipeline(), values, HashMap::new());
             if let Some(step_blueprint) = pipeline
                 .steps()
                 .iter()