Browse Source

[7] Added step restarting

Restarting specific steps was implemented and
log file generation was updated accordingly.
at-robins 8 months ago
parent
commit
47a53f1b90

+ 43 - 0
backend/src/application/config.rs

@@ -214,6 +214,30 @@ impl Configuration {
         path
     }
 
+    /// The context path where the log file of a specific pipeline step is stored.
+    ///
+    /// # Parameters
+    ///
+    /// * `experiment_id` - the ID of the experiment
+    /// * `pipeline_id` - the ID of the pipeline
+    /// * `step_id` - the ID of the pipeline step
+    /// * `process_type` - the type of process to log, used as suffix of the log file name
+    pub fn experiment_log_path<P: AsRef<str>, Q: AsRef<str>, R: AsRef<str>, S: AsRef<str>>(
+        &self,
+        experiment_id: P,
+        pipeline_id: Q,
+        step_id: R,
+        process_type: S,
+    ) -> PathBuf {
+        let mut path: PathBuf = self.experiment_logs_path(experiment_id);
+        path.push(format!(
+            "{}_{}.log",
+            Self::hash_string(format!("{}{}", pipeline_id.as_ref(), step_id.as_ref())),
+            process_type.as_ref()
+        ));
+        path
+    }
+
     /// Generates a V1 UUID.
     pub fn generate_uuid() -> Uuid {
         let now = SystemTime::now()
@@ -303,6 +327,25 @@ mod tests {
         assert_eq!(config.experiment_step_path("experiment_id", "step_id"), path);
     }
 
+    #[test]
+    fn test_experiment_logs_path() {
+        let config = Configuration::new("", "", "", "", "./application/context", "");
+        // The logs directory is expected inside the folder of the experiment.
+        let path: PathBuf =
+            "./application/context/experiments/experiment_id/logs".into();
+        assert_eq!(config.experiment_logs_path("experiment_id"), path);
+    }
+
+
+    #[test]
+    fn test_experiment_log_path() {
+        let config = Configuration::new("", "", "", "", "./application/context", "");
+        // File name: hash of the concatenated pipeline ID and step ID, then the process type.
+        let path: PathBuf =
+            "./application/context/experiments/experiment_id/logs/13269802908832430007_type.log".into();
+        assert_eq!(config.experiment_log_path("experiment_id", "pipeline_id", "step_id", "type"), path);
+    }
+
     #[test]
     fn test_hash_string() {
         let random_string = "39012rtuj132-0t1jp41-9/n\n\t@#$%^&*()|}{\"?>¡ªº£€˚„";

+ 178 - 4
backend/src/controller/experiment_controller.rs

@@ -26,6 +26,7 @@ use crate::{
     },
 };
 use actix_web::{web, HttpResponse};
+use chrono::NaiveDateTime;
 use diesel::{ExpressionMethods, QueryDsl};
 use parking_lot::Mutex;
 
@@ -260,9 +261,7 @@ pub async fn get_experiment_pipeline_run(
                 ExperimentExecution::get_by_experiment(experiment_id, &mut connection)?
                     .into_iter()
                     .filter(|execution| &execution.pipeline_id == &pipeline_id)
-                    .map(|execution| {
-                        (execution.pipeline_step_id, execution.execution_status)
-                    })
+                    .map(|execution| (execution.pipeline_step_id, execution.execution_status))
                     .collect();
             Some(ExperimentPipelineBlueprint::from_internal(pipeline.pipeline(), values, stati))
         } else {
@@ -346,6 +345,7 @@ pub async fn post_execute_experiment(
 ) -> Result<HttpResponse, SeqError> {
     let experiment_id: i32 = experiment_id.into_inner();
     let mut connection = database_manager.database_connection()?;
+    Experiment::exists_err(experiment_id, &mut connection)?;
     // Error if the experiment was already submitted for execution.
     if ExperimentExecution::has_experiment_execution_entries(experiment_id, &mut connection)? {
         return Err(SeqError::new(
@@ -418,7 +418,181 @@ pub async fn post_execute_experiment(
             "Invalid run",
             SeqErrorType::BadRequestError,
             format!(
-                "No pipeline was selected for experiment {}, so i cannot be run.",
+                "No pipeline was selected for experiment {}, so it cannot be run.",
+                experiment_id
+            ),
+            "The requested run parameters are invalid.",
+        ))
+    }
+}
+
+pub async fn post_execute_experiment_step(
+    database_manager: web::Data<DatabaseManager>,
+    app_config: web::Data<Configuration>,
+    experiment_id: web::Path<i32>,
+    step_id: web::Json<String>,
+    pipelines: web::Data<LoadedPipelines>,
+) -> Result<HttpResponse, SeqError> {
+    let experiment_id: i32 = experiment_id.into_inner();
+    let step_id: String = step_id.into_inner();
+    let mut connection = database_manager.database_connection()?;
+    // Error if the experiment does not exist.
+    Experiment::exists_err(experiment_id, &mut connection)?;
+    if let Some(pipeline_id) = Experiment::get(experiment_id, &mut connection)?.pipeline_id {
+        if let Some(pipeline) = pipelines.get(&pipeline_id) {
+            let pipeline = pipeline.pipeline();
+            // Checks if the step exists within the currently selected pipeline
+            if let Some(step) = pipeline.steps().iter().find(|step| step.id() == &step_id) {
+                // Checks if the dependencies are satisfied.
+                let executions: Vec<ExperimentExecution> =
+                    ExperimentExecution::get_by_experiment(experiment_id, &mut connection)?
+                        .into_iter()
+                        .filter(|s| &s.pipeline_id == &pipeline_id)
+                        .collect();
+                let satisfied_dependencies: Vec<&String> = executions
+                    .iter()
+                    .filter(|s| s.execution_status == ExecutionStatus::Finished.to_string())
+                    .map(|s| &s.pipeline_step_id)
+                    .collect();
+                let are_dependencies_satisfied: bool = step
+                    .dependencies()
+                    .iter()
+                    .all(|dependency| satisfied_dependencies.contains(&dependency));
+                if !are_dependencies_satisfied {
+                    return Err(SeqError::new(
+                        "Invalid run",
+                        SeqErrorType::BadRequestError,
+                        format!("The experiment {} is missing dependencies for execution of pipeline {} step {}.\nRequired dependencies: {:?}\nSatisfied dependencies: {:?}", experiment_id, pipeline.id(), step.id(), step.dependencies(), satisfied_dependencies),
+                        "The requested run parameters are invalid.",
+                    ));
+                }
+                // Checks if the required variables are set.
+                let experiment_variables =
+                    PipelineStepVariable::get_values_by_experiment_and_pipeline(
+                        experiment_id,
+                        &pipeline_id,
+                        &mut connection,
+                    )?;
+                for variable in step.variables() {
+                    if variable.required().unwrap_or(false) {
+                        // Error if required variables are not set.
+                        if !experiment_variables.contains_key(&format!(
+                            "{}{}",
+                            step.id(),
+                            variable.id()
+                        )) {
+                            return Err(SeqError::new(
+                                    "Invalid run",
+                                    SeqErrorType::BadRequestError,
+                                    format!("The experiment {} is missing the required variable with pipeline id {} step id {} and variable id {}.", experiment_id, pipeline.id(), step.id(), variable.id()),
+                                    "The requested run parameters are invalid.",
+                                ));
+                        }
+                    }
+                }
+                // Submit execution step.
+                if let Some(existing_execution) =
+                    executions.iter().find(|s| s.pipeline_step_id == step_id)
+                {
+                    // Restart an existing pipeline step.
+                    if existing_execution.execution_status == ExecutionStatus::Running.to_string()
+                        || existing_execution.execution_status
+                            == ExecutionStatus::Waiting.to_string()
+                    {
+                        // Error if the step is currently scheduled for execution.
+                        return Err(SeqError::new(
+                            "Invalid run",
+                            SeqErrorType::BadRequestError,
+                            format!("The experiment {} pipeline {} step {} is already scheduled for execution and can thus not be restarted.", experiment_id, pipeline.id(), step.id()),
+                            "The requested run parameters are invalid.",
+                        ));
+                    }
+                    // Remove resources related to the pipeline step.
+                    let step_path =
+                        app_config.experiment_step_path(experiment_id.to_string(), &step_id);
+                    if step_path.exists() {
+                        std::fs::remove_dir_all(step_path)?;
+                    }
+                    let log_path_build = app_config.experiment_log_path(
+                        experiment_id.to_string(),
+                        &pipeline_id,
+                        &step_id,
+                        "build",
+                    );
+                    if log_path_build.exists() {
+                        std::fs::remove_file(log_path_build)?;
+                    }
+                    let log_path_run = app_config.experiment_log_path(
+                        experiment_id.to_string(),
+                        &pipeline_id,
+                        &step_id,
+                        "run",
+                    );
+                    if log_path_run.exists() {
+                        std::fs::remove_file(log_path_run)?;
+                    }
+                    connection.immediate_transaction(|connection| {
+                        let clear_time: Option<NaiveDateTime> = None;
+                        diesel::update(
+                            crate::schema::experiment_execution::table.find(existing_execution.id),
+                        )
+                        .set((
+                            crate::schema::experiment_execution::execution_status
+                                .eq(ExecutionStatus::Waiting.to_string()),
+                            crate::schema::experiment_execution::start_time.eq(clear_time.clone()),
+                            crate::schema::experiment_execution::end_time.eq(clear_time),
+                        ))
+                        .execute(connection)
+                    })?;
+                } else {
+                    // Create a newly added pipeline step.
+                    let execution_step: NewExperimentExecution =
+                        NewExperimentExecution::new(experiment_id, pipeline.id(), step.id());
+
+                    connection.immediate_transaction(|connection| {
+                        diesel::insert_into(crate::schema::experiment_execution::table)
+                            .values(&execution_step)
+                            .execute(connection)
+                    })?;
+                }
+                log::info!(
+                    "Submitted experiment {} with pipeline {} step {} for execution.",
+                    experiment_id,
+                    pipeline.id(),
+                    step_id
+                );
+                Ok(HttpResponse::Ok().finish())
+            } else {
+                // Error if the pipeline does not contain the step.
+                Err(SeqError::new(
+                    "Invalid run",
+                    SeqErrorType::BadRequestError,
+                    format!(
+                        "The selected pipeline {} for experiment {} is does not contain step {}.",
+                        pipeline_id, experiment_id, step_id
+                    ),
+                    "The requested run parameters are invalid.",
+                ))
+            }
+        } else {
+            // Error if the pipeline is not loaded.
+            Err(SeqError::new(
+                "Invalid run",
+                SeqErrorType::BadRequestError,
+                format!(
+                    "The selected pipeline {} for experiment {} is not loaded.",
+                    pipeline_id, experiment_id
+                ),
+                "The requested run parameters are invalid.",
+            ))
+        }
+    } else {
+        // Error if no pipeline was selected.
+        Err(SeqError::new(
+            "Invalid run",
+            SeqErrorType::BadRequestError,
+            format!(
+                "No pipeline was selected for experiment {}, so it cannot be run.",
                 experiment_id
             ),
             "The requested run parameters are invalid.",

+ 2 - 1
backend/src/controller/routing.rs

@@ -8,7 +8,7 @@ use super::{
     experiment_controller::{
         create_experiment, delete_experiment, get_experiment, get_experiment_pipelines,
         list_experiment, patch_experiment_comment, patch_experiment_mail, patch_experiment_name,
-        patch_experiment_pipeline, post_experiment_pipeline_variable, post_execute_experiment, get_experiment_execution_status, post_experiment_execution_abort, post_experiment_execution_reset, get_experiment_pipeline_run,
+        patch_experiment_pipeline, post_experiment_pipeline_variable, post_execute_experiment, get_experiment_execution_status, post_experiment_execution_abort, post_experiment_execution_reset, get_experiment_pipeline_run, post_execute_experiment_step,
     },
     file_controller::{delete_files_by_path, get_files, post_add_file, post_add_folder},
     global_data_controller::{
@@ -55,6 +55,7 @@ pub fn routing_config(cfg: &mut ServiceConfig) {
     .route("/api/experiments/{id}/name", web::patch().to(patch_experiment_name))
     .route("/api/experiments/{id}/pipeline", web::patch().to(patch_experiment_pipeline))
     .route("/api/experiments/{id}/pipelines", web::get().to(get_experiment_pipelines))
+    .route("/api/experiments/{id}/rerun", web::post().to(post_execute_experiment_step))
     .route("/api/experiments/{id}/reset", web::post().to(post_experiment_execution_reset))
     .route("/api/experiments/{id}/run", web::get().to(get_experiment_pipeline_run))
     .route("/api/experiments/{id}/status", web::get().to(get_experiment_execution_status))

+ 10 - 8
backend/src/service/container_service.rs

@@ -137,7 +137,8 @@ pub fn run_pipeline_step<T: AsRef<str>>(
     let mut mount_map_top = serde_json::Map::new();
     mount_map_top.insert("input".to_string(), serde_json::Value::String("/input/base".to_string()));
     mount_map_top.insert("output".to_string(), serde_json::Value::String("/output".to_string()));
-    mount_map_top.insert("dependencies".to_string(), serde_json::Value::Object(mount_map_dependencies));
+    mount_map_top
+        .insert("dependencies".to_string(), serde_json::Value::Object(mount_map_dependencies));
     mount_map_top.insert("globals".to_string(), serde_json::Value::Object(mount_map_globals));
     let mount_paths = serde_json::Value::Object(mount_map_top);
     arguments.push("--env".into());
@@ -405,16 +406,17 @@ impl ContainerHandler {
     /// * `build` - ```true``` if the build output is parsed, ```false``` if the run output is parsed
     fn parse_output(&self, output: Output, build: bool) -> Result<(), SeqError> {
         if let Some(step) = &self.executed_step {
-            let mut log_path = self
+            let logs_path = self
                 .config
                 .experiment_logs_path(step.experiment_id.to_string());
             let process_type = if build { "build" } else { "run" };
-            std::fs::create_dir_all(&log_path)?;
-            log_path.push(format!(
-                "{}_{}.log",
-                format_container_name(&step.pipeline_id, &step.pipeline_step_id),
-                &process_type
-            ));
+            std::fs::create_dir_all(&logs_path)?;
+            let log_path = self.config.experiment_log_path(
+                step.experiment_id.to_string(),
+                &step.pipeline_id,
+                &step.pipeline_step_id,
+                &process_type,
+            );
             let log_file = std::fs::OpenOptions::new()
                 .create(true)
                 .write(true)