|
@@ -3,7 +3,7 @@ use std::{
|
|
|
ffi::OsString,
|
|
|
io::{BufWriter, Write},
|
|
|
path::Path,
|
|
|
- process::{Child, Command, Output, Stdio},
|
|
|
+ process::{Child, Command, Output},
|
|
|
};
|
|
|
|
|
|
use actix_web::web;
|
|
@@ -12,7 +12,7 @@ use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl};
|
|
|
|
|
|
use crate::{
|
|
|
application::{
|
|
|
- config::Configuration,
|
|
|
+ config::{Configuration, LogOutputType, LogProcessType},
|
|
|
database::DatabaseManager,
|
|
|
error::{SeqError, SeqErrorType},
|
|
|
},
|
|
@@ -35,27 +35,65 @@ const CONTAINER_ENV_MOUNT: &str = "MOUNT_PATHS";
|
|
|
/// # Parameters
|
|
|
///
|
|
|
/// * `step` - the [`PipelineStepBlueprint`] to build the container for
|
|
|
+/// * `pipeline_id` - the ID of the containing [`PipelineBlueprint`]
|
|
|
/// * `context` - the context directory contianing the pipeline
|
|
|
+/// * `experiment_id` - the ID of the experiment
|
|
|
+/// * `app_cofig` - the app [`Configuration`]
|
|
|
pub fn build_pipeline_step<P: AsRef<Path>, T: AsRef<str>>(
|
|
|
step: &PipelineStepBlueprint,
|
|
|
pipeline_id: T,
|
|
|
context: P,
|
|
|
+ experiment_id: i32,
|
|
|
+ app_config: web::Data<Configuration>,
|
|
|
) -> Result<Child, SeqError> {
|
|
|
let mut pipeline_step_path = context.as_ref().to_path_buf();
|
|
|
pipeline_step_path.push("container");
|
|
|
pipeline_step_path.push(step.container());
|
|
|
let build_arg: OsString = "build".into();
|
|
|
let name_spec: OsString = "-t".into();
|
|
|
- let name_arg: OsString = format_container_name(pipeline_id, step.id()).into();
|
|
|
+ let name_arg: OsString = format_container_name(&pipeline_id, step.id()).into();
|
|
|
+ let progress_arg: OsString = "--progress=plain".into();
|
|
|
+
|
|
|
+ // Create log directory.
|
|
|
+ let logs_path = app_config.experiment_logs_path(experiment_id.to_string());
|
|
|
+ std::fs::create_dir_all(&logs_path)?;
|
|
|
+ // Open stdout log file.
|
|
|
+ let log_path_stdout = app_config.experiment_log_path(
|
|
|
+ experiment_id.to_string(),
|
|
|
+ &pipeline_id,
|
|
|
+ step.id(),
|
|
|
+ LogProcessType::Build,
|
|
|
+ LogOutputType::StdOut,
|
|
|
+ );
|
|
|
+ let log_file_stdout = std::fs::OpenOptions::new()
|
|
|
+ .create(true)
|
|
|
+ .write(true)
|
|
|
+ .append(false)
|
|
|
+ .truncate(true)
|
|
|
+ .open(log_path_stdout)?;
|
|
|
+ // Open stderr log file.
|
|
|
+ let log_path_stderr = app_config.experiment_log_path(
|
|
|
+ experiment_id.to_string(),
|
|
|
+ &pipeline_id,
|
|
|
+ step.id(),
|
|
|
+ LogProcessType::Build,
|
|
|
+ LogOutputType::StdErr,
|
|
|
+ );
|
|
|
+ let log_file_stderr = std::fs::OpenOptions::new()
|
|
|
+ .create(true)
|
|
|
+ .write(true)
|
|
|
+ .append(false)
|
|
|
+ .truncate(true)
|
|
|
+ .open(log_path_stderr)?;
|
|
|
|
|
|
let child = Command::new("docker")
|
|
|
- .stdin(Stdio::piped())
|
|
|
- .stdout(Stdio::piped())
|
|
|
- .stderr(Stdio::piped())
|
|
|
+ .stdout(log_file_stdout)
|
|
|
+ .stderr(log_file_stderr)
|
|
|
.args([
|
|
|
build_arg.as_os_str(),
|
|
|
name_spec.as_os_str(),
|
|
|
name_arg.as_os_str(),
|
|
|
+ progress_arg.as_os_str(),
|
|
|
pipeline_step_path.as_os_str(),
|
|
|
])
|
|
|
.spawn()?;
|
|
@@ -160,12 +198,43 @@ pub fn run_pipeline_step<T: AsRef<str>>(
|
|
|
});
|
|
|
|
|
|
// Set container to run.
|
|
|
- arguments.push(format_container_name(pipeline_id, step.id()).into());
|
|
|
+ arguments.push(format_container_name(&pipeline_id, step.id()).into());
|
|
|
+
|
|
|
+ // Create log directory.
|
|
|
+ let logs_path = app_config.experiment_logs_path(experiment_id.to_string());
|
|
|
+ std::fs::create_dir_all(&logs_path)?;
|
|
|
+ // Open stdout log file.
|
|
|
+ let log_path_stdout = app_config.experiment_log_path(
|
|
|
+ experiment_id.to_string(),
|
|
|
+ &pipeline_id,
|
|
|
+ step.id(),
|
|
|
+ LogProcessType::Run,
|
|
|
+ LogOutputType::StdOut,
|
|
|
+ );
|
|
|
+ let log_file_stdout = std::fs::OpenOptions::new()
|
|
|
+ .create(true)
|
|
|
+ .write(true)
|
|
|
+ .append(false)
|
|
|
+ .truncate(true)
|
|
|
+ .open(log_path_stdout)?;
|
|
|
+ // Open stderr log file.
|
|
|
+ let log_path_stderr = app_config.experiment_log_path(
|
|
|
+ experiment_id.to_string(),
|
|
|
+ &pipeline_id,
|
|
|
+ step.id(),
|
|
|
+ LogProcessType::Run,
|
|
|
+ LogOutputType::StdErr,
|
|
|
+ );
|
|
|
+ let log_file_stderr = std::fs::OpenOptions::new()
|
|
|
+ .create(true)
|
|
|
+ .write(true)
|
|
|
+ .append(false)
|
|
|
+ .truncate(true)
|
|
|
+ .open(log_path_stderr)?;
|
|
|
|
|
|
let output = Command::new("docker")
|
|
|
- .stdin(Stdio::piped())
|
|
|
- .stdout(Stdio::piped())
|
|
|
- .stderr(Stdio::piped())
|
|
|
+ .stdout(log_file_stdout)
|
|
|
+ .stderr(log_file_stderr)
|
|
|
.args(arguments)
|
|
|
.spawn()?;
|
|
|
Ok(output)
|
|
@@ -404,18 +473,18 @@ impl ContainerHandler {
|
|
|
///
|
|
|
/// * `output` - the output to parse and log
|
|
|
/// * `build` - ```true``` if the build output is parsed, ```false``` if the run output is parsed
|
|
|
- fn parse_output(&self, output: Output, build: bool) -> Result<(), SeqError> {
|
|
|
+ fn parse_output(&self, output: Output, process_type: LogProcessType) -> Result<(), SeqError> {
|
|
|
if let Some(step) = &self.executed_step {
|
|
|
let logs_path = self
|
|
|
.config
|
|
|
.experiment_logs_path(step.experiment_id.to_string());
|
|
|
- let process_type = if build { "build" } else { "run" };
|
|
|
std::fs::create_dir_all(&logs_path)?;
|
|
|
let log_path = self.config.experiment_log_path(
|
|
|
step.experiment_id.to_string(),
|
|
|
&step.pipeline_id,
|
|
|
&step.pipeline_step_id,
|
|
|
- &process_type,
|
|
|
+ process_type,
|
|
|
+ LogOutputType::ExitCode,
|
|
|
);
|
|
|
let log_file = std::fs::OpenOptions::new()
|
|
|
.create(true)
|
|
@@ -424,12 +493,12 @@ impl ContainerHandler {
|
|
|
.truncate(true)
|
|
|
.open(log_path)?;
|
|
|
let mut buffered_writer = BufWriter::new(log_file);
|
|
|
- buffered_writer.write_all("[[ STDOUT ]]\n".as_bytes())?;
|
|
|
- buffered_writer.write_all(&output.stdout)?;
|
|
|
- buffered_writer.write_all("\n\n[[ STDERR ]]\n".as_bytes())?;
|
|
|
- buffered_writer.write_all(&output.stderr)?;
|
|
|
- buffered_writer.write_all("\n\n[[ EXIT STATUS ]]\n".as_bytes())?;
|
|
|
- buffered_writer.write_all(output.status.to_string().as_bytes())?;
|
|
|
+ let exit_code = output
|
|
|
+ .status
|
|
|
+ .code()
|
|
|
+ .map(|code| code.to_string())
|
|
|
+ .unwrap_or("Terminated by signal".to_string());
|
|
|
+ buffered_writer.write_all(exit_code.as_bytes())?;
|
|
|
if output.status.success() {
|
|
|
Ok(())
|
|
|
} else {
|
|
@@ -476,7 +545,7 @@ impl ContainerHandler {
|
|
|
ProcessStatus::Finished => {
|
|
|
if let Some(run) = self.run_process.take() {
|
|
|
// Handle output.
|
|
|
- self.parse_output(run.wait_with_output()?, false)?;
|
|
|
+ self.parse_output(run.wait_with_output()?, LogProcessType::Run)?;
|
|
|
// Sets the status to finished.
|
|
|
let mut connection = self.database_manager.database_connection()?;
|
|
|
connection.immediate_transaction(|connection| {
|
|
@@ -508,7 +577,7 @@ impl ContainerHandler {
|
|
|
ProcessStatus::Finished => {
|
|
|
if let Some(build) = self.build_process.take() {
|
|
|
// Handle output.
|
|
|
- self.parse_output(build.wait_with_output()?, true)?;
|
|
|
+ self.parse_output(build.wait_with_output()?, LogProcessType::Build)?;
|
|
|
// Start the subsequent run process.
|
|
|
self.start_run_process()?;
|
|
|
Ok(false)
|
|
@@ -547,6 +616,8 @@ impl ContainerHandler {
|
|
|
step_blueprint,
|
|
|
&step.pipeline_id,
|
|
|
pipeline.context(),
|
|
|
+ step.experiment_id,
|
|
|
+ web::Data::clone(&self.config),
|
|
|
)?);
|
|
|
Ok(())
|
|
|
} else {
|