3 Commity f6ae00bf1d ... cbeac92b6d

Autor SHA1 Wiadomość Data
  at-robins cbeac92b6d [7] Implemented log display 1 rok temu
  at-robins e1a34d3cd5 [7] Added frontend step restarting 1 rok temu
  at-robins 47a53f1b90 [7] Added step restarting 1 rok temu

+ 134 - 1
backend/src/application/config.rs

@@ -19,7 +19,7 @@ pub const PATH_FILES_EXPERIMENTS_LOGS: &str = "logs";
 /// The folder where global data is stored.
 pub const PATH_FILES_GLOBAL_DATA: &str = "globals";
 
-use std::{hash::Hash, hash::Hasher, path::PathBuf, time::SystemTime};
+use std::{fmt::Display, hash::Hash, hash::Hasher, path::PathBuf, time::SystemTime};
 
 use getset::Getters;
 use serde::{Deserialize, Serialize};
@@ -214,6 +214,65 @@ impl Configuration {
         path
     }
 
+    /// The context path where a specific pipeline log file is stored.
+    ///
+    /// # Parameters
+    ///
+    /// * `experiment_id` - the ID of the experiment
+    /// * `pipeline_id` - the ID of the pipeline
+    /// * `step_id` - the ID of the pipeline step
+    /// * `process_type` - the type of process to log
+    /// * `output_type` - the output type of the process to log
+    pub fn experiment_log_path<P: AsRef<str>, Q: AsRef<str>, R: AsRef<str>>(
+        &self,
+        experiment_id: P,
+        pipeline_id: Q,
+        step_id: R,
+        process_type: LogProcessType,
+        output_type: LogOutputType,
+    ) -> PathBuf {
+        let mut path: PathBuf = self.experiment_logs_path(experiment_id);
+        path.push(format!(
+            "{}_{}_{}.log",
+            Self::hash_string(format!("{}{}", pipeline_id.as_ref(), step_id.as_ref())),
+            process_type,
+            output_type,
+        ));
+        path
+    }
+
+    /// All potential context paths of pipeline log files.
+    ///
+    /// # Parameters
+    ///
+    /// * `experiment_id` - the ID of the experiment
+    /// * `pipeline_id` - the ID of the pipeline
+    /// * `step_id` - the ID of the pipeline step
+    pub fn experiment_log_paths_all<P: AsRef<str>, Q: AsRef<str>, R: AsRef<str>>(
+        &self,
+        experiment_id: P,
+        pipeline_id: Q,
+        step_id: R,
+    ) -> Vec<PathBuf> {
+        let mut paths = Vec::new();
+        for process_type in &[LogProcessType::Build, LogProcessType::Run] {
+            for output_type in &[
+                LogOutputType::StdOut,
+                LogOutputType::StdErr,
+                LogOutputType::ExitCode,
+            ] {
+                paths.push(self.experiment_log_path(
+                    experiment_id.as_ref(),
+                    pipeline_id.as_ref(),
+                    step_id.as_ref(),
+                    *process_type,
+                    *output_type,
+                ));
+            }
+        }
+        paths
+    }
+
     /// Generates a V1 UUID.
     pub fn generate_uuid() -> Uuid {
         let now = SystemTime::now()
@@ -241,6 +300,53 @@ impl Configuration {
     }
 }
 
+#[derive(Debug, Clone, Copy)]
+/// The process types of log files.
+pub enum LogProcessType {
+    /// The build process.
+    Build,
+    // The run process.
+    Run,
+}
+
+impl Display for LogProcessType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "{}",
+            match self {
+                LogProcessType::Build => "build",
+                LogProcessType::Run => "run",
+            }
+        )
+    }
+}
+
+#[derive(Debug, Clone, Copy)]
+/// The output types of log files.
+pub enum LogOutputType {
+    /// Standard output stream.
+    StdOut,
+    /// Standard error stream.
+    StdErr,
+    /// The exit code.
+    ExitCode,
+}
+
+impl Display for LogOutputType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "{}",
+            match self {
+                LogOutputType::StdOut => "stdout",
+                LogOutputType::StdErr => "stderr",
+                LogOutputType::ExitCode => "exitcode",
+            }
+        )
+    }
+}
+
 #[cfg(test)]
 mod tests {
 
@@ -303,6 +409,33 @@ mod tests {
         assert_eq!(config.experiment_step_path("experiment_id", "step_id"), path);
     }
 
+    #[test]
+    fn test_experiment_logs_path() {
+        let config = Configuration::new("", "", "", "", "./application/context", "");
+        // Hash of step_id.
+        let path: PathBuf = "./application/context/experiments/experiment_id/logs".into();
+        assert_eq!(config.experiment_logs_path("experiment_id"), path);
+    }
+
+    #[test]
+    fn test_experiment_log_path() {
+        let config = Configuration::new("", "", "", "", "./application/context", "");
+        // Hash of step_id.
+        let path: PathBuf =
+            "./application/context/experiments/experiment_id/logs/13269802908832430007_build_stderr.log"
+                .into();
+        assert_eq!(
+            config.experiment_log_path(
+                "experiment_id",
+                "pipeline_id",
+                "step_id",
+                LogProcessType::Build,
+                LogOutputType::StdErr
+            ),
+            path
+        );
+    }
+
     #[test]
     fn test_hash_string() {
         let random_string = "39012rtuj132-0t1jp41-9/n\n\t@#$%^&*()|}{\"?>¡ªº£€˚„";

+ 3 - 2
backend/src/controller.rs

@@ -1,5 +1,6 @@
-pub mod global_data_controller;
+pub mod experiment_controller;
 pub mod file_controller;
+pub mod global_data_controller;
+pub mod log_controller;
 pub mod pipeline_controller;
 pub mod routing;
-pub mod experiment_controller;

+ 174 - 9
backend/src/controller/experiment_controller.rs

@@ -26,6 +26,7 @@ use crate::{
     },
 };
 use actix_web::{web, HttpResponse};
+use chrono::NaiveDateTime;
 use diesel::{ExpressionMethods, QueryDsl};
 use parking_lot::Mutex;
 
@@ -94,6 +95,11 @@ pub async fn get_experiment_execution_status(
     let execution_steps = ExperimentExecution::get_by_experiment(id, &mut connection)?;
     let result = if execution_steps.is_empty() {
         "None".to_string()
+    } else if execution_steps
+        .iter()
+        .any(|execution| execution.execution_status == ExecutionStatus::Running.to_string())
+    {
+        ExecutionStatus::Running.to_string()
     } else if execution_steps
         .iter()
         .any(|execution| execution.execution_status == ExecutionStatus::Failed.to_string())
@@ -104,11 +110,6 @@ pub async fn get_experiment_execution_status(
         .any(|execution| execution.execution_status == ExecutionStatus::Aborted.to_string())
     {
         ExecutionStatus::Aborted.to_string()
-    } else if execution_steps
-        .iter()
-        .any(|execution| execution.execution_status == ExecutionStatus::Running.to_string())
-    {
-        ExecutionStatus::Running.to_string()
     } else if execution_steps
         .iter()
         .all(|execution| execution.execution_status == ExecutionStatus::Finished.to_string())
@@ -260,9 +261,7 @@ pub async fn get_experiment_pipeline_run(
                 ExperimentExecution::get_by_experiment(experiment_id, &mut connection)?
                     .into_iter()
                     .filter(|execution| &execution.pipeline_id == &pipeline_id)
-                    .map(|execution| {
-                        (execution.pipeline_step_id, execution.execution_status)
-                    })
+                    .map(|execution| (execution.pipeline_step_id, execution.execution_status))
                     .collect();
             Some(ExperimentPipelineBlueprint::from_internal(pipeline.pipeline(), values, stati))
         } else {
@@ -346,6 +345,7 @@ pub async fn post_execute_experiment(
 ) -> Result<HttpResponse, SeqError> {
     let experiment_id: i32 = experiment_id.into_inner();
     let mut connection = database_manager.database_connection()?;
+    Experiment::exists_err(experiment_id, &mut connection)?;
     // Error if the experiment was already submitted for execution.
     if ExperimentExecution::has_experiment_execution_entries(experiment_id, &mut connection)? {
         return Err(SeqError::new(
@@ -418,7 +418,172 @@ pub async fn post_execute_experiment(
             "Invalid run",
             SeqErrorType::BadRequestError,
             format!(
-                "No pipeline was selected for experiment {}, so i cannot be run.",
+                "No pipeline was selected for experiment {}, so it cannot be run.",
+                experiment_id
+            ),
+            "The requested run parameters are invalid.",
+        ))
+    }
+}
+
+pub async fn post_execute_experiment_step(
+    database_manager: web::Data<DatabaseManager>,
+    app_config: web::Data<Configuration>,
+    experiment_id: web::Path<i32>,
+    step_id: web::Json<String>,
+    pipelines: web::Data<LoadedPipelines>,
+) -> Result<HttpResponse, SeqError> {
+    let experiment_id: i32 = experiment_id.into_inner();
+    let step_id: String = step_id.into_inner();
+    let mut connection = database_manager.database_connection()?;
+    // Error if the experiment does not exist.
+    Experiment::exists_err(experiment_id, &mut connection)?;
+    if let Some(pipeline_id) = Experiment::get(experiment_id, &mut connection)?.pipeline_id {
+        if let Some(pipeline) = pipelines.get(&pipeline_id) {
+            let pipeline = pipeline.pipeline();
+            // Checks if the step exists within the currently selected pipeline
+            if let Some(step) = pipeline.steps().iter().find(|step| step.id() == &step_id) {
+                // Checks if the dependencies are satisfied.
+                let executions: Vec<ExperimentExecution> =
+                    ExperimentExecution::get_by_experiment(experiment_id, &mut connection)?
+                        .into_iter()
+                        .filter(|s| &s.pipeline_id == &pipeline_id)
+                        .collect();
+                let satisfied_dependencies: Vec<&String> = executions
+                    .iter()
+                    .filter(|s| s.execution_status == ExecutionStatus::Finished.to_string())
+                    .map(|s| &s.pipeline_step_id)
+                    .collect();
+                let are_dependencies_satisfied: bool = step
+                    .dependencies()
+                    .iter()
+                    .all(|dependency| satisfied_dependencies.contains(&dependency));
+                if !are_dependencies_satisfied {
+                    return Err(SeqError::new(
+                        "Invalid run",
+                        SeqErrorType::BadRequestError,
+                        format!("The experiment {} is missing dependencies for execution of pipeline {} step {}.\nRequired dependencies: {:?}\nSatisfied dependencies: {:?}", experiment_id, pipeline.id(), step.id(), step.dependencies(), satisfied_dependencies),
+                        "The requested run parameters are invalid.",
+                    ));
+                }
+                // Checks if the required variables are set.
+                let experiment_variables =
+                    PipelineStepVariable::get_values_by_experiment_and_pipeline(
+                        experiment_id,
+                        &pipeline_id,
+                        &mut connection,
+                    )?;
+                for variable in step.variables() {
+                    if variable.required().unwrap_or(false) {
+                        // Error if required variables are not set.
+                        if !experiment_variables.contains_key(&format!(
+                            "{}{}",
+                            step.id(),
+                            variable.id()
+                        )) {
+                            return Err(SeqError::new(
+                                    "Invalid run",
+                                    SeqErrorType::BadRequestError,
+                                    format!("The experiment {} is missing the required variable with pipeline id {} step id {} and variable id {}.", experiment_id, pipeline.id(), step.id(), variable.id()),
+                                    "The requested run parameters are invalid.",
+                                ));
+                        }
+                    }
+                }
+                // Submit execution step.
+                if let Some(existing_execution) =
+                    executions.iter().find(|s| s.pipeline_step_id == step_id)
+                {
+                    // Restart an existing pipeline step.
+                    if existing_execution.execution_status == ExecutionStatus::Running.to_string()
+                        || existing_execution.execution_status
+                            == ExecutionStatus::Waiting.to_string()
+                    {
+                        // Error if the step is currently scheduled for execution.
+                        return Err(SeqError::new(
+                            "Invalid run",
+                            SeqErrorType::BadRequestError,
+                            format!("The experiment {} pipeline {} step {} is already scheduled for execution and can thus not be restarted.", experiment_id, pipeline.id(), step.id()),
+                            "The requested run parameters are invalid.",
+                        ));
+                    }
+                    // Remove resources related to the pipeline step.
+                    let step_path =
+                        app_config.experiment_step_path(experiment_id.to_string(), &step_id);
+                    if step_path.exists() {
+                        std::fs::remove_dir_all(step_path)?;
+                    }
+                    for log_path in app_config.experiment_log_paths_all(
+                        experiment_id.to_string(),
+                        &pipeline_id,
+                        &step_id,
+                    ) {
+                        if log_path.exists() {
+                            std::fs::remove_file(log_path)?;
+                        }
+                    }
+                    connection.immediate_transaction(|connection| {
+                        let clear_time: Option<NaiveDateTime> = None;
+                        diesel::update(
+                            crate::schema::experiment_execution::table.find(existing_execution.id),
+                        )
+                        .set((
+                            crate::schema::experiment_execution::execution_status
+                                .eq(ExecutionStatus::Waiting.to_string()),
+                            crate::schema::experiment_execution::start_time.eq(clear_time.clone()),
+                            crate::schema::experiment_execution::end_time.eq(clear_time),
+                        ))
+                        .execute(connection)
+                    })?;
+                } else {
+                    // Create a newly added pipeline step.
+                    let execution_step: NewExperimentExecution =
+                        NewExperimentExecution::new(experiment_id, pipeline.id(), step.id());
+
+                    connection.immediate_transaction(|connection| {
+                        diesel::insert_into(crate::schema::experiment_execution::table)
+                            .values(&execution_step)
+                            .execute(connection)
+                    })?;
+                }
+                log::info!(
+                    "Submitted experiment {} with pipeline {} step {} for execution.",
+                    experiment_id,
+                    pipeline.id(),
+                    step_id
+                );
+                Ok(HttpResponse::Ok().finish())
+            } else {
+                // Error the pipeline does not contain the step.
+                Err(SeqError::new(
+                    "Invalid run",
+                    SeqErrorType::BadRequestError,
+                    format!(
+                        "The selected pipeline {} for experiment {} is does not contain step {}.",
+                        pipeline_id, experiment_id, step_id
+                    ),
+                    "The requested run parameters are invalid.",
+                ))
+            }
+        } else {
+            // Error if the pipeline is not loaded.
+            Err(SeqError::new(
+                "Invalid run",
+                SeqErrorType::BadRequestError,
+                format!(
+                    "The selected pipeline {} for experiment {} is not loaded.",
+                    pipeline_id, experiment_id
+                ),
+                "The requested run parameters are invalid.",
+            ))
+        }
+    } else {
+        // Error if no pipeline was selected.
+        Err(SeqError::new(
+            "Invalid run",
+            SeqErrorType::BadRequestError,
+            format!(
+                "No pipeline was selected for experiment {}, so it cannot be run.",
                 experiment_id
             ),
             "The requested run parameters are invalid.",

+ 84 - 0
backend/src/controller/log_controller.rs

@@ -0,0 +1,84 @@
+use crate::{
+    application::{
+        config::{Configuration, LogOutputType, LogProcessType},
+        database::DatabaseManager,
+        error::SeqError,
+    },
+    model::{
+        db::experiment::Experiment,
+        exchange::experiment_step_logs::{
+            ExperimentStepLog, ExperimentStepLogRequest, ExperimentStepLogs,
+        },
+    },
+};
+use actix_web::web;
+
+pub async fn get_experiment_step_logs(
+    database_manager: web::Data<DatabaseManager>,
+    app_config: web::Data<Configuration>,
+    experiment_id: web::Path<i32>,
+    info: web::Json<ExperimentStepLogRequest>,
+) -> Result<web::Json<ExperimentStepLogs>, SeqError> {
+    let experiment_id: i32 = experiment_id.into_inner();
+    let mut connection = database_manager.database_connection()?;
+    Experiment::exists_err(experiment_id, &mut connection)?;
+
+    let info: ExperimentStepLogRequest = info.into_inner();
+    let log_reader = LogFileReader {
+        config: app_config,
+        experiment_id,
+        pipeline_id: info.pipeline_id,
+        step_id: info.step_id,
+    };
+
+    let build_logs = ExperimentStepLog {
+        stdout: log_reader.get(LogProcessType::Build, LogOutputType::StdOut)?,
+        stderr: log_reader.get(LogProcessType::Build, LogOutputType::StdErr)?,
+        exit_code: log_reader.get(LogProcessType::Build, LogOutputType::ExitCode)?,
+    };
+    let run_logs = ExperimentStepLog {
+        stdout: log_reader.get(LogProcessType::Run, LogOutputType::StdOut)?,
+        stderr: log_reader.get(LogProcessType::Run, LogOutputType::StdErr)?,
+        exit_code: log_reader.get(LogProcessType::Run, LogOutputType::ExitCode)?,
+    };
+    let logs = ExperimentStepLogs {
+        build: build_logs,
+        run: run_logs,
+    };
+    Ok(web::Json(logs))
+}
+
+/// A reader for log files.
+struct LogFileReader {
+    pub config: web::Data<Configuration>,
+    pub experiment_id: i32,
+    pub pipeline_id: String,
+    pub step_id: String,
+}
+
+impl LogFileReader {
+    /// Reads the respective log file to a [`String`] if it exists.
+    ///
+    /// # Parameters
+    ///
+    /// * `process_type` - the process type of the log file
+    /// * `output_type` - the output type of the log file
+    pub fn get(
+        &self,
+        process_type: LogProcessType,
+        output_type: LogOutputType,
+    ) -> Result<Option<String>, SeqError> {
+        let path = self.config.experiment_log_path(
+            self.experiment_id.to_string(),
+            &self.pipeline_id,
+            &self.step_id,
+            process_type,
+            output_type,
+        );
+        Ok(if !path.exists() {
+            None
+        } else {
+            Some(std::fs::read_to_string(path)?)
+        })
+    }
+}

+ 10 - 3
backend/src/controller/routing.rs

@@ -6,15 +6,19 @@ use crate::application::error::SeqError;
 
 use super::{
     experiment_controller::{
-        create_experiment, delete_experiment, get_experiment, get_experiment_pipelines,
-        list_experiment, patch_experiment_comment, patch_experiment_mail, patch_experiment_name,
-        patch_experiment_pipeline, post_experiment_pipeline_variable, post_execute_experiment, get_experiment_execution_status, post_experiment_execution_abort, post_experiment_execution_reset, get_experiment_pipeline_run,
+        create_experiment, delete_experiment, get_experiment, get_experiment_execution_status,
+        get_experiment_pipeline_run, get_experiment_pipelines, list_experiment,
+        patch_experiment_comment, patch_experiment_mail, patch_experiment_name,
+        patch_experiment_pipeline, post_execute_experiment, post_execute_experiment_step,
+        post_experiment_execution_abort, post_experiment_execution_reset,
+        post_experiment_pipeline_variable,
     },
     file_controller::{delete_files_by_path, get_files, post_add_file, post_add_folder},
     global_data_controller::{
         create_global_data, delete_global_data, get_global_data, list_global_data,
         patch_global_data_comment, patch_global_data_name,
     },
+    log_controller::get_experiment_step_logs,
     pipeline_controller::{
         get_pipeline_blueprint, get_pipeline_blueprints, get_pipeline_instance,
         patch_pipeline_blueprints,
@@ -51,10 +55,13 @@ pub fn routing_config(cfg: &mut ServiceConfig) {
     .route("/api/experiments/{id}", web::post().to(post_execute_experiment))
     .route("/api/experiments/{id}/abort", web::post().to(post_experiment_execution_abort))
     .route("/api/experiments/{id}/comment", web::patch().to(patch_experiment_comment))
+        // This method is only POST to support the JSON message body.
+    .route("/api/experiments/{id}/logs", web::post().to(get_experiment_step_logs))
     .route("/api/experiments/{id}/mail", web::patch().to(patch_experiment_mail))
     .route("/api/experiments/{id}/name", web::patch().to(patch_experiment_name))
     .route("/api/experiments/{id}/pipeline", web::patch().to(patch_experiment_pipeline))
     .route("/api/experiments/{id}/pipelines", web::get().to(get_experiment_pipelines))
+    .route("/api/experiments/{id}/rerun", web::post().to(post_execute_experiment_step))
     .route("/api/experiments/{id}/reset", web::post().to(post_experiment_execution_reset))
     .route("/api/experiments/{id}/run", web::get().to(get_experiment_pipeline_run))
     .route("/api/experiments/{id}/status", web::get().to(get_experiment_execution_status))

+ 1 - 0
backend/src/model/exchange.rs

@@ -1,5 +1,6 @@
 pub mod experiment_details;
 pub mod experiment_pipeline;
+pub mod experiment_step_logs;
 pub mod file_path;
 pub mod global_data_details;
 pub mod pipeline_step_details;

+ 34 - 0
backend/src/model/exchange/experiment_step_logs.rs

@@ -0,0 +1,34 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+/// The full logs of an experiment step.
+pub struct ExperimentStepLogs {
+    /// The build logs.
+    pub build: ExperimentStepLog,
+    /// The run logs.
+    pub run: ExperimentStepLog,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+/// The logs of a specific process of an experiment step.
+pub struct ExperimentStepLog {
+    /// The stdout output.
+    pub stdout: Option<String>,
+    /// The stderr output.
+    pub stderr: Option<String>,
+    /// The process exit code.
+    pub exit_code: Option<String>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+/// Information required to request log file
+/// for a specific pipeline step.
+pub struct ExperimentStepLogRequest {
+    /// The ID of the pipeline.
+    pub pipeline_id: String,
+    /// The ID of the pipeline step.
+    pub step_id: String,
+}

+ 101 - 28
backend/src/service/container_service.rs

@@ -3,7 +3,7 @@ use std::{
     ffi::OsString,
     io::{BufWriter, Write},
     path::Path,
-    process::{Child, Command, Output, Stdio},
+    process::{Child, Command, Output},
 };
 
 use actix_web::web;
@@ -12,7 +12,7 @@ use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl};
 
 use crate::{
     application::{
-        config::Configuration,
+        config::{Configuration, LogOutputType, LogProcessType},
         database::DatabaseManager,
         error::{SeqError, SeqErrorType},
     },
@@ -35,27 +35,65 @@ const CONTAINER_ENV_MOUNT: &str = "MOUNT_PATHS";
 /// # Parameters
 ///
 /// * `step` - the [`PipelineStepBlueprint`] to build the container for
+/// * `pipeline_id` - the ID of the containing [`PipelineBlueprint`]
 /// * `context` - the context directory contianing the pipeline
+/// * `experiment_id` - the ID of the experiment
+/// * `app_cofig` - the app [`Configuration`]
 pub fn build_pipeline_step<P: AsRef<Path>, T: AsRef<str>>(
     step: &PipelineStepBlueprint,
     pipeline_id: T,
     context: P,
+    experiment_id: i32,
+    app_config: web::Data<Configuration>,
 ) -> Result<Child, SeqError> {
     let mut pipeline_step_path = context.as_ref().to_path_buf();
     pipeline_step_path.push("container");
     pipeline_step_path.push(step.container());
     let build_arg: OsString = "build".into();
     let name_spec: OsString = "-t".into();
-    let name_arg: OsString = format_container_name(pipeline_id, step.id()).into();
+    let name_arg: OsString = format_container_name(&pipeline_id, step.id()).into();
+    let progress_arg: OsString = "--progress=plain".into();
+
+    // Create log directory.
+    let logs_path = app_config.experiment_logs_path(experiment_id.to_string());
+    std::fs::create_dir_all(&logs_path)?;
+    // Open stdout log file.
+    let log_path_stdout = app_config.experiment_log_path(
+        experiment_id.to_string(),
+        &pipeline_id,
+        step.id(),
+        LogProcessType::Build,
+        LogOutputType::StdOut,
+    );
+    let log_file_stdout = std::fs::OpenOptions::new()
+        .create(true)
+        .write(true)
+        .append(false)
+        .truncate(true)
+        .open(log_path_stdout)?;
+    // Open stderr log file.
+    let log_path_stderr = app_config.experiment_log_path(
+        experiment_id.to_string(),
+        &pipeline_id,
+        step.id(),
+        LogProcessType::Build,
+        LogOutputType::StdErr,
+    );
+    let log_file_stderr = std::fs::OpenOptions::new()
+        .create(true)
+        .write(true)
+        .append(false)
+        .truncate(true)
+        .open(log_path_stderr)?;
 
     let child = Command::new("docker")
-        .stdin(Stdio::piped())
-        .stdout(Stdio::piped())
-        .stderr(Stdio::piped())
+        .stdout(log_file_stdout)
+        .stderr(log_file_stderr)
         .args([
             build_arg.as_os_str(),
             name_spec.as_os_str(),
             name_arg.as_os_str(),
+            progress_arg.as_os_str(),
             pipeline_step_path.as_os_str(),
         ])
         .spawn()?;
@@ -137,7 +175,8 @@ pub fn run_pipeline_step<T: AsRef<str>>(
     let mut mount_map_top = serde_json::Map::new();
     mount_map_top.insert("input".to_string(), serde_json::Value::String("/input/base".to_string()));
     mount_map_top.insert("output".to_string(), serde_json::Value::String("/output".to_string()));
-    mount_map_top.insert("dependencies".to_string(), serde_json::Value::Object(mount_map_dependencies));
+    mount_map_top
+        .insert("dependencies".to_string(), serde_json::Value::Object(mount_map_dependencies));
     mount_map_top.insert("globals".to_string(), serde_json::Value::Object(mount_map_globals));
     let mount_paths = serde_json::Value::Object(mount_map_top);
     arguments.push("--env".into());
@@ -159,12 +198,43 @@ pub fn run_pipeline_step<T: AsRef<str>>(
         });
 
     // Set container to run.
-    arguments.push(format_container_name(pipeline_id, step.id()).into());
+    arguments.push(format_container_name(&pipeline_id, step.id()).into());
+
+    // Create log directory.
+    let logs_path = app_config.experiment_logs_path(experiment_id.to_string());
+    std::fs::create_dir_all(&logs_path)?;
+    // Open stdout log file.
+    let log_path_stdout = app_config.experiment_log_path(
+        experiment_id.to_string(),
+        &pipeline_id,
+        step.id(),
+        LogProcessType::Run,
+        LogOutputType::StdOut,
+    );
+    let log_file_stdout = std::fs::OpenOptions::new()
+        .create(true)
+        .write(true)
+        .append(false)
+        .truncate(true)
+        .open(log_path_stdout)?;
+    // Open stderr log file.
+    let log_path_stderr = app_config.experiment_log_path(
+        experiment_id.to_string(),
+        &pipeline_id,
+        step.id(),
+        LogProcessType::Run,
+        LogOutputType::StdErr,
+    );
+    let log_file_stderr = std::fs::OpenOptions::new()
+        .create(true)
+        .write(true)
+        .append(false)
+        .truncate(true)
+        .open(log_path_stderr)?;
 
     let output = Command::new("docker")
-        .stdin(Stdio::piped())
-        .stdout(Stdio::piped())
-        .stderr(Stdio::piped())
+        .stdout(log_file_stdout)
+        .stderr(log_file_stderr)
         .args(arguments)
         .spawn()?;
     Ok(output)
@@ -403,18 +473,19 @@ impl ContainerHandler {
     ///
     /// * `output` - the output to parse and log
     /// * `build` - ```true``` if the build output is parsed, ```false``` if the run output is parsed
-    fn parse_output(&self, output: Output, build: bool) -> Result<(), SeqError> {
+    fn parse_output(&self, output: Output, process_type: LogProcessType) -> Result<(), SeqError> {
         if let Some(step) = &self.executed_step {
-            let mut log_path = self
+            let logs_path = self
                 .config
                 .experiment_logs_path(step.experiment_id.to_string());
-            let process_type = if build { "build" } else { "run" };
-            std::fs::create_dir_all(&log_path)?;
-            log_path.push(format!(
-                "{}_{}.log",
-                format_container_name(&step.pipeline_id, &step.pipeline_step_id),
-                &process_type
-            ));
+            std::fs::create_dir_all(&logs_path)?;
+            let log_path = self.config.experiment_log_path(
+                step.experiment_id.to_string(),
+                &step.pipeline_id,
+                &step.pipeline_step_id,
+                process_type,
+                LogOutputType::ExitCode,
+            );
             let log_file = std::fs::OpenOptions::new()
                 .create(true)
                 .write(true)
@@ -422,12 +493,12 @@ impl ContainerHandler {
                 .truncate(true)
                 .open(log_path)?;
             let mut buffered_writer = BufWriter::new(log_file);
-            buffered_writer.write_all("[[ STDOUT ]]\n".as_bytes())?;
-            buffered_writer.write_all(&output.stdout)?;
-            buffered_writer.write_all("\n\n[[ STDERR ]]\n".as_bytes())?;
-            buffered_writer.write_all(&output.stderr)?;
-            buffered_writer.write_all("\n\n[[ EXIT STATUS ]]\n".as_bytes())?;
-            buffered_writer.write_all(output.status.to_string().as_bytes())?;
+            let exit_code = output
+                .status
+                .code()
+                .map(|code| code.to_string())
+                .unwrap_or("Terminated by signal".to_string());
+            buffered_writer.write_all(exit_code.as_bytes())?;
             if output.status.success() {
                 Ok(())
             } else {
@@ -474,7 +545,7 @@ impl ContainerHandler {
             ProcessStatus::Finished => {
                 if let Some(run) = self.run_process.take() {
                     // Handle output.
-                    self.parse_output(run.wait_with_output()?, false)?;
+                    self.parse_output(run.wait_with_output()?, LogProcessType::Run)?;
                     // Sets the status to finished.
                     let mut connection = self.database_manager.database_connection()?;
                     connection.immediate_transaction(|connection| {
@@ -506,7 +577,7 @@ impl ContainerHandler {
                 ProcessStatus::Finished => {
                     if let Some(build) = self.build_process.take() {
                         // Handle output.
-                        self.parse_output(build.wait_with_output()?, true)?;
+                        self.parse_output(build.wait_with_output()?, LogProcessType::Build)?;
                         // Start the subsequent run process.
                         self.start_run_process()?;
                         Ok(false)
@@ -545,6 +616,8 @@ impl ContainerHandler {
                     step_blueprint,
                     &step.pipeline_id,
                     pipeline.context(),
+                    step.experiment_id,
+                    web::Data::clone(&self.config),
                 )?);
                 Ok(())
             } else {

+ 116 - 6
frontend/src/components/experiment/ExperimentRunDetails.vue

@@ -2,7 +2,7 @@
   <div class="q-pa-md q-gutter-md">
     <q-card>
       <q-card-section>
-        <div class="text-h6">Experiment {{ id }}</div>
+        <div class="text-h6">Experiment: {{ experiment_name }}</div>
       </q-card-section>
       <div class="q-pa-md gutter-md no-wrap row" v-if="!loadingError">
         <div
@@ -77,9 +77,50 @@
         </div>
         <div v-else class="text-h6">{{ selectedStep.name }}</div>
       </q-card-section>
-      <div v-if="selectedStep !== null" class="q-gutter-md q-pa-md col">
-        <q-btn label="Display logs" class="row" />
+      <q-card-section>
+        <div v-if="selectedStep" class="q-pl-md">
+          <div v-html="selectedStep.description" />
+        </div>
+      </q-card-section>
+      <q-card-section v-if="selectedStep && pipeline">
+        <q-expansion-item
+          expand-separator
+          :icon="symOutlinedTerminal"
+          label="Display pipeline step logs"
+          class="shadow-1 overflow-hidden"
+          header-class="bg-secondary text-white"
+          style="border-radius: 3px"
+        >
+          <q-card>
+            <q-card-section>
+              <experiment-step-logs
+                :experiment-id="id"
+                :pipeline-id="pipeline.id"
+                :step-id="selectedStep.id"
+              />
+            </q-card-section>
+          </q-card>
+        </q-expansion-item>
+      </q-card-section>
+      <div v-if="selectedStep" class="q-gutter-md q-pa-md col">
         <q-btn label="Download output" class="row" />
+
+        <q-btn
+          v-if="canBeStarted(selectedStep)"
+          class="row"
+          :icon="matRestartAlt"
+          label="Restart step"
+          :color="restartingError ? 'negative' : 'positive'"
+          :loading="isRestarting"
+          @click="restartStep(selectedStep)"
+        >
+          <q-tooltip>
+            <div v-if="restartingError">
+              <error-popup :error-response="restartingError" />
+            </div>
+            <div>Restarts the experiment execution step.</div>
+          </q-tooltip>
+        </q-btn>
       </div>
     </q-card>
     <q-dialog v-model="showPollingError" v-if="pollingError">
@@ -89,9 +130,9 @@
 </template>
 
 <script setup lang="ts">
-import { type ErrorResponse } from "@/scripts/types";
+import { type ErrorResponse, type ExperimentDetails } from "@/scripts/types";
 import axios from "axios";
-import { ref, onMounted, type Ref } from "vue";
+import { ref, onMounted, type Ref, computed } from "vue";
 import ErrorPopup from "@/components/ErrorPopup.vue";
 import { onBeforeRouteLeave, useRouter } from "vue-router";
 import {
@@ -104,17 +145,23 @@ import {
   symOutlinedError,
   symOutlinedNotStarted,
   symOutlinedStopCircle,
+  symOutlinedTerminal,
 } from "@quasar/extras/material-symbols-outlined";
+import { matRestartAlt } from "@quasar/extras/material-icons";
+import ExperimentStepLogs from "./ExperimentStepLogs.vue";
 
 // The intervall in which pipeline updates are requested from the server.
 const POLLING_INTERVALL_MILLISECONDS = 10000;
 
+const experiment: Ref<ExperimentDetails | null> = ref(null);
 const pipeline: Ref<PipelineBlueprint | null> = ref(null);
 const sortedSteps: Ref<PipelineStepBlueprint[][]> = ref([]);
 const isLoadingPipelineDetails = ref(false);
+const isRestarting = ref(false);
 const loadingError: Ref<ErrorResponse | null> = ref(null);
 const isPollingPipelineDetails = ref(false);
 const pollingError: Ref<ErrorResponse | null> = ref(null);
+const restartingError: Ref<ErrorResponse | null> = ref(null);
 const selectedStep: Ref<PipelineStepBlueprint | null> = ref(null);
 const showPollingError = ref(false);
 const router = useRouter();
@@ -125,6 +172,10 @@ const props = defineProps({
   id: { type: String, required: true },
 });
 
+const experiment_name = computed(() => {
+  return experiment.value ? experiment.value.name : props.id;
+});
+
 onMounted(() => {
   loadPipelineDetails();
 });
@@ -142,7 +193,11 @@ function loadPipelineDetails() {
   isLoadingPipelineDetails.value = true;
   loadingError.value = null;
   axios
-    .get("/api/experiments/" + props.id + "/run")
+    .get("/api/experiments/" + props.id)
+    .then((response) => {
+      experiment.value = response.data;
+      return axios.get("/api/experiments/" + props.id + "/run");
+    })
     .then((response) => {
       setPipelineDetails(response.data);
       pollingTimer.value = window.setTimeout(
@@ -263,6 +318,61 @@ function selectStep(step: PipelineStepBlueprint) {
     selectedStep.value = step;
   }
 }
+
+/**
+ * Returns ```true``` if the specified step can be (re-)started.
+ *
+ * @param step the step to check
+ */
+function canBeStarted(step: PipelineStepBlueprint | null): boolean {
+  if (!pipeline.value) {
+    return false;
+  }
+  if (!step) {
+    return false;
+  }
+  const satisfied_dependencies = pipeline.value.steps
+    .filter((s) => s.status === PipelineStepStatus.Finished)
+    .map((s) => s.id);
+  const isDependecySatisfied = step.dependencies.every((dependency) =>
+    satisfied_dependencies.includes(dependency)
+  );
+  return (
+    step.status !== PipelineStepStatus.Running &&
+    step.status !== PipelineStepStatus.Waiting &&
+    isDependecySatisfied
+  );
+}
+
+/**
+ * Tries to restart the specified step.
+ *
+ * @param step the step to restart
+ */
+function restartStep(step: PipelineStepBlueprint | null) {
+  if (step && !isRestarting.value) {
+    isRestarting.value = true;
+    restartingError.value = null;
+    const config = {
+      headers: {
+        "content-type": "application/json",
+      },
+    };
+    axios
+      .post(
+        "/api/experiments/" + props.id + "/rerun",
+        JSON.stringify(step.id),
+        config
+      )
+      .then(() => (step.status = PipelineStepStatus.Waiting))
+      .catch((error) => {
+        restartingError.value = error.response.data;
+      })
+      .finally(() => {
+        isRestarting.value = false;
+      });
+  }
+}
 </script>
 <style scoped lang="scss">
 .chip-unselected {

+ 128 - 0
frontend/src/components/experiment/ExperimentStepLogs.vue

@@ -0,0 +1,128 @@
+<template>
+  <div class="no-wrap">
+    <q-tabs v-model="tab" narrow-indicator dense inline-label align="justify">
+      <q-tab
+        class="text-purple"
+        name="build"
+        :icon="symOutlinedBuildCircle"
+        label="Container build process"
+      />
+      <q-tab
+        class="text-orange"
+        name="run"
+        :icon="symOutlinedRunCircle"
+        label="Step run process"
+      />
+    </q-tabs>
+
+    <q-tab-panels v-model="tab" animated>
+      <q-tab-panel name="build">
+        <split-log-display v-if="logs" :log="logs.build" />
+      </q-tab-panel>
+
+      <q-tab-panel name="run">
+        <split-log-display v-if="logs" :log="logs.run" />
+      </q-tab-panel>
+    </q-tab-panels>
+  </div>
+</template>
+
+<script setup lang="ts">
+import { type ErrorResponse, type ExperimentStepLogs } from "@/scripts/types";
+import SplitLogDisplay from "@/components/shared/SplitLogDisplay.vue";
+import { onBeforeRouteLeave, useRouter } from "vue-router";
+import { ref, watch, type Ref, onBeforeUnmount } from "vue";
+import axios from "axios";
+import {
+  symOutlinedBuildCircle,
+  symOutlinedRunCircle,
+} from "@quasar/extras/material-symbols-outlined";
+
+// The intervall in which log updates are requested from the server.
+const POLLING_INTERVALL_MILLISECONDS = 10000;
+
+const isPolling = ref(false);
+const pollingError: Ref<ErrorResponse | null> = ref(null);
+const showPollingError = ref(false);
+const router = useRouter();
+const this_route = router.currentRoute.value.fullPath;
+const pollingTimer: Ref<number | null> = ref(null);
+const logs: Ref<ExperimentStepLogs | null> = ref(null);
+const tab = ref("build");
+
+const props = defineProps({
+  experimentId: { type: String, required: true },
+  pipelineId: { type: String, required: true },
+  stepId: { type: String, required: true },
+});
+
+watch(
+  () => props.stepId,
+  () => {
+    stopPolling();
+    logs.value = null;
+    pollLogChanges();
+  },
+  { immediate: true }
+);
+
+// Clears the timer if the route is changed.
+onBeforeRouteLeave(() => {
+  stopPolling();
+});
+
+onBeforeUnmount(() => {
+  stopPolling();
+});
+
+function stopPolling() {
+  if (pollingTimer.value !== null) {
+    clearTimeout(pollingTimer.value);
+    pollingTimer.value = null;
+  }
+}
+
+/**
+ * Conitinuesly polls changes from the server.
+ */
+function pollLogChanges() {
+  if (
+    !isPolling.value &&
+    !pollingError.value &&
+    // Stop polling if the route changes.
+    router.currentRoute.value.fullPath === this_route
+  ) {
+    isPolling.value = true;
+    pollingError.value = null;
+    const config = {
+      headers: {
+        "content-type": "application/json",
+      },
+    };
+    axios
+      .post(
+        "/api/experiments/" + props.experimentId + "/logs",
+        JSON.stringify({
+          pipelineId: props.pipelineId,
+          stepId: props.stepId,
+        }),
+        config
+      )
+      .then((response) => {
+        logs.value = response.data;
+        pollingTimer.value = window.setTimeout(
+          pollLogChanges,
+          POLLING_INTERVALL_MILLISECONDS
+        );
+      })
+      .catch((error) => {
+        showPollingError.value = true;
+        pollingError.value = error.response.data;
+      })
+      .finally(() => {
+        isPolling.value = false;
+      });
+  }
+}
+</script>
+<style scoped lang="scss"></style>

+ 30 - 0
frontend/src/components/shared/LogDisplay.vue

@@ -0,0 +1,30 @@
+<template>
+  <div>
+    <div class="text-h6 q-ma-md">{{ header }}</div>
+    <q-separator />
+    <div v-if="body === null || body === undefined" class="q-pa-md">
+      No logs are currently present.
+    </div>
+    <div
+      v-else
+      v-text="body"
+      class="q-pa-md"
+      style="
+        white-space: pre;
+        overflow: auto;
+        max-height: 80vh;
+        font-family: 'Courier New', Courier, monospace;
+      "
+    />
+  </div>
+</template>
+
+<script setup lang="ts">
+import type { PropType } from "vue";
+
+defineProps({
+  header: { type: String, required: true },
+  body: { type: String as PropType<string | null | undefined>, required: true },
+});
+</script>
+<style scoped lang="scss"></style>

+ 26 - 0
frontend/src/components/shared/SplitLogDisplay.vue

@@ -0,0 +1,26 @@
+<template>
+  <div style="width: 100%">
+    <q-splitter v-model="splitterModel" :limits="[0, 100]">
+      <template v-slot:before>
+        <log-display header="Standard output" :body="log.stdout" />
+      </template>
+
+      <template v-slot:after>
+        <log-display header="Standard error" :body="log.stderr" />
+      </template>
+    </q-splitter>
+  </div>
+</template>
+
+<script setup lang="ts">
+import { ref, type PropType, type Ref } from "vue";
+import LogDisplay from "./LogDisplay.vue";
+import type { ExperimentStepLog } from "@/scripts/types";
+
+defineProps({
+  log: { type: Object as PropType<ExperimentStepLog>, required: true },
+});
+
+const splitterModel: Ref<number> = ref(50);
+</script>
+<style scoped lang="scss"></style>

+ 2 - 2
frontend/src/router/index.ts

@@ -4,7 +4,7 @@ import GlobalDataView from "@/views/GlobalDataView.vue";
 import GlobalDataDetailsView from "@/views/GlobalDataDetailsView.vue";
 import ExperimentView from "@/views/ExperimentView.vue";
 import ExperimentDetailsView from "@/views/ExperimentDetailsView.vue";
-import ExperimentRunDetails from "@/components/experiment/ExperimentRunDetails.vue";
+import ExperimentRunDetailsView from "@/views/ExperimentRunDetailsView.vue";
 
 const router = createRouter({
   history: createWebHistory(import.meta.env.BASE_URL),
@@ -43,7 +43,7 @@ const router = createRouter({
     {
       path: "/ui/experiments/:id/run",
       name: "experiments_run_detail",
-      component: ExperimentRunDetails,
+      component: ExperimentRunDetailsView,
       props: true,
     },
   ],

+ 11 - 0
frontend/src/scripts/types.ts

@@ -55,3 +55,14 @@ export enum ExperimentExecutionStatus {
   Waiting = "Waiting",
   None = "None",
 }
+
+export type ExperimentStepLog = {
+  stdout: string | null | undefined;
+  stderr: string | null | undefined;
+  exitCode: string | null | undefined;
+};
+
+export type ExperimentStepLogs = {
+  build: ExperimentStepLog;
+  run: ExperimentStepLog;
+};

+ 15 - 0
frontend/src/views/ExperimentRunDetailsView.vue

@@ -0,0 +1,15 @@
+<script setup lang="ts">
+import ExperimentRunDetails from "@/components/experiment/ExperimentRunDetails.vue";
+defineProps({
+  id: {
+    type: String,
+    required: true,
+  },
+});
+</script>
+
+<template>
+  <main>
+    <experiment-run-details :id="id" />
+  </main>
+</template>