|
@@ -26,6 +26,7 @@ use crate::{
|
|
|
},
|
|
|
};
|
|
|
use actix_web::{web, HttpResponse};
|
|
|
+use chrono::NaiveDateTime;
|
|
|
use diesel::{ExpressionMethods, QueryDsl};
|
|
|
use parking_lot::Mutex;
|
|
|
|
|
@@ -94,6 +95,11 @@ pub async fn get_experiment_execution_status(
|
|
|
let execution_steps = ExperimentExecution::get_by_experiment(id, &mut connection)?;
|
|
|
let result = if execution_steps.is_empty() {
|
|
|
"None".to_string()
|
|
|
+ } else if execution_steps
|
|
|
+ .iter()
|
|
|
+ .any(|execution| execution.execution_status == ExecutionStatus::Running.to_string())
|
|
|
+ {
|
|
|
+ ExecutionStatus::Running.to_string()
|
|
|
} else if execution_steps
|
|
|
.iter()
|
|
|
.any(|execution| execution.execution_status == ExecutionStatus::Failed.to_string())
|
|
@@ -104,11 +110,6 @@ pub async fn get_experiment_execution_status(
|
|
|
.any(|execution| execution.execution_status == ExecutionStatus::Aborted.to_string())
|
|
|
{
|
|
|
ExecutionStatus::Aborted.to_string()
|
|
|
- } else if execution_steps
|
|
|
- .iter()
|
|
|
- .any(|execution| execution.execution_status == ExecutionStatus::Running.to_string())
|
|
|
- {
|
|
|
- ExecutionStatus::Running.to_string()
|
|
|
} else if execution_steps
|
|
|
.iter()
|
|
|
.all(|execution| execution.execution_status == ExecutionStatus::Finished.to_string())
|
|
@@ -260,9 +261,7 @@ pub async fn get_experiment_pipeline_run(
|
|
|
ExperimentExecution::get_by_experiment(experiment_id, &mut connection)?
|
|
|
.into_iter()
|
|
|
.filter(|execution| &execution.pipeline_id == &pipeline_id)
|
|
|
- .map(|execution| {
|
|
|
- (execution.pipeline_step_id, execution.execution_status)
|
|
|
- })
|
|
|
+ .map(|execution| (execution.pipeline_step_id, execution.execution_status))
|
|
|
.collect();
|
|
|
Some(ExperimentPipelineBlueprint::from_internal(pipeline.pipeline(), values, stati))
|
|
|
} else {
|
|
@@ -346,6 +345,7 @@ pub async fn post_execute_experiment(
|
|
|
) -> Result<HttpResponse, SeqError> {
|
|
|
let experiment_id: i32 = experiment_id.into_inner();
|
|
|
let mut connection = database_manager.database_connection()?;
|
|
|
+ Experiment::exists_err(experiment_id, &mut connection)?;
|
|
|
// Error if the experiment was already submitted for execution.
|
|
|
if ExperimentExecution::has_experiment_execution_entries(experiment_id, &mut connection)? {
|
|
|
return Err(SeqError::new(
|
|
@@ -418,7 +418,172 @@ pub async fn post_execute_experiment(
|
|
|
"Invalid run",
|
|
|
SeqErrorType::BadRequestError,
|
|
|
format!(
|
|
|
- "No pipeline was selected for experiment {}, so i cannot be run.",
|
|
|
+ "No pipeline was selected for experiment {}, so it cannot be run.",
|
|
|
+ experiment_id
|
|
|
+ ),
|
|
|
+ "The requested run parameters are invalid.",
|
|
|
+ ))
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+pub async fn post_execute_experiment_step(
|
|
|
+ database_manager: web::Data<DatabaseManager>,
|
|
|
+ app_config: web::Data<Configuration>,
|
|
|
+ experiment_id: web::Path<i32>,
|
|
|
+ step_id: web::Json<String>,
|
|
|
+ pipelines: web::Data<LoadedPipelines>,
|
|
|
+) -> Result<HttpResponse, SeqError> {
|
|
|
+ let experiment_id: i32 = experiment_id.into_inner();
|
|
|
+ let step_id: String = step_id.into_inner();
|
|
|
+ let mut connection = database_manager.database_connection()?;
|
|
|
+ // Error if the experiment does not exist.
|
|
|
+ Experiment::exists_err(experiment_id, &mut connection)?;
|
|
|
+ if let Some(pipeline_id) = Experiment::get(experiment_id, &mut connection)?.pipeline_id {
|
|
|
+ if let Some(pipeline) = pipelines.get(&pipeline_id) {
|
|
|
+ let pipeline = pipeline.pipeline();
|
|
|
+ // Checks if the step exists within the currently selected pipeline
|
|
|
+ if let Some(step) = pipeline.steps().iter().find(|step| step.id() == &step_id) {
|
|
|
+ // Checks if the dependencies are satisfied.
|
|
|
+ let executions: Vec<ExperimentExecution> =
|
|
|
+ ExperimentExecution::get_by_experiment(experiment_id, &mut connection)?
|
|
|
+ .into_iter()
|
|
|
+ .filter(|s| &s.pipeline_id == &pipeline_id)
|
|
|
+ .collect();
|
|
|
+ let satisfied_dependencies: Vec<&String> = executions
|
|
|
+ .iter()
|
|
|
+ .filter(|s| s.execution_status == ExecutionStatus::Finished.to_string())
|
|
|
+ .map(|s| &s.pipeline_step_id)
|
|
|
+ .collect();
|
|
|
+ let are_dependencies_satisfied: bool = step
|
|
|
+ .dependencies()
|
|
|
+ .iter()
|
|
|
+ .all(|dependency| satisfied_dependencies.contains(&dependency));
|
|
|
+ if !are_dependencies_satisfied {
|
|
|
+ return Err(SeqError::new(
|
|
|
+ "Invalid run",
|
|
|
+ SeqErrorType::BadRequestError,
|
|
|
+ format!("The experiment {} is missing dependencies for execution of pipeline {} step {}.\nRequired dependencies: {:?}\nSatisfied dependencies: {:?}", experiment_id, pipeline.id(), step.id(), step.dependencies(), satisfied_dependencies),
|
|
|
+ "The requested run parameters are invalid.",
|
|
|
+ ));
|
|
|
+ }
|
|
|
+ // Checks if the required variables are set.
|
|
|
+ let experiment_variables =
|
|
|
+ PipelineStepVariable::get_values_by_experiment_and_pipeline(
|
|
|
+ experiment_id,
|
|
|
+ &pipeline_id,
|
|
|
+ &mut connection,
|
|
|
+ )?;
|
|
|
+ for variable in step.variables() {
|
|
|
+ if variable.required().unwrap_or(false) {
|
|
|
+ // Error if required variables are not set.
|
|
|
+ if !experiment_variables.contains_key(&format!(
|
|
|
+ "{}{}",
|
|
|
+ step.id(),
|
|
|
+ variable.id()
|
|
|
+ )) {
|
|
|
+ return Err(SeqError::new(
|
|
|
+ "Invalid run",
|
|
|
+ SeqErrorType::BadRequestError,
|
|
|
+ format!("The experiment {} is missing the required variable with pipeline id {} step id {} and variable id {}.", experiment_id, pipeline.id(), step.id(), variable.id()),
|
|
|
+ "The requested run parameters are invalid.",
|
|
|
+ ));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // Submit execution step.
|
|
|
+ if let Some(existing_execution) =
|
|
|
+ executions.iter().find(|s| s.pipeline_step_id == step_id)
|
|
|
+ {
|
|
|
+ // Restart an existing pipeline step.
|
|
|
+ if existing_execution.execution_status == ExecutionStatus::Running.to_string()
|
|
|
+ || existing_execution.execution_status
|
|
|
+ == ExecutionStatus::Waiting.to_string()
|
|
|
+ {
|
|
|
+ // Error if the step is currently scheduled for execution.
|
|
|
+ return Err(SeqError::new(
|
|
|
+ "Invalid run",
|
|
|
+ SeqErrorType::BadRequestError,
|
|
|
+ format!("The experiment {} pipeline {} step {} is already scheduled for execution and can thus not be restarted.", experiment_id, pipeline.id(), step.id()),
|
|
|
+ "The requested run parameters are invalid.",
|
|
|
+ ));
|
|
|
+ }
|
|
|
+ // Remove resources related to the pipeline step.
|
|
|
+ let step_path =
|
|
|
+ app_config.experiment_step_path(experiment_id.to_string(), &step_id);
|
|
|
+ if step_path.exists() {
|
|
|
+ std::fs::remove_dir_all(step_path)?;
|
|
|
+ }
|
|
|
+ for log_path in app_config.experiment_log_paths_all(
|
|
|
+ experiment_id.to_string(),
|
|
|
+ &pipeline_id,
|
|
|
+ &step_id,
|
|
|
+ ) {
|
|
|
+ if log_path.exists() {
|
|
|
+ std::fs::remove_file(log_path)?;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ connection.immediate_transaction(|connection| {
|
|
|
+ let clear_time: Option<NaiveDateTime> = None;
|
|
|
+ diesel::update(
|
|
|
+ crate::schema::experiment_execution::table.find(existing_execution.id),
|
|
|
+ )
|
|
|
+ .set((
|
|
|
+ crate::schema::experiment_execution::execution_status
|
|
|
+ .eq(ExecutionStatus::Waiting.to_string()),
|
|
|
+ crate::schema::experiment_execution::start_time.eq(clear_time.clone()),
|
|
|
+ crate::schema::experiment_execution::end_time.eq(clear_time),
|
|
|
+ ))
|
|
|
+ .execute(connection)
|
|
|
+ })?;
|
|
|
+ } else {
|
|
|
+ // Create a newly added pipeline step.
|
|
|
+ let execution_step: NewExperimentExecution =
|
|
|
+ NewExperimentExecution::new(experiment_id, pipeline.id(), step.id());
|
|
|
+
|
|
|
+ connection.immediate_transaction(|connection| {
|
|
|
+ diesel::insert_into(crate::schema::experiment_execution::table)
|
|
|
+ .values(&execution_step)
|
|
|
+ .execute(connection)
|
|
|
+ })?;
|
|
|
+ }
|
|
|
+ log::info!(
|
|
|
+ "Submitted experiment {} with pipeline {} step {} for execution.",
|
|
|
+ experiment_id,
|
|
|
+ pipeline.id(),
|
|
|
+ step_id
|
|
|
+ );
|
|
|
+ Ok(HttpResponse::Ok().finish())
|
|
|
+ } else {
|
|
|
+ // Error the pipeline does not contain the step.
|
|
|
+ Err(SeqError::new(
|
|
|
+ "Invalid run",
|
|
|
+ SeqErrorType::BadRequestError,
|
|
|
+ format!(
|
|
|
+ "The selected pipeline {} for experiment {} is does not contain step {}.",
|
|
|
+ pipeline_id, experiment_id, step_id
|
|
|
+ ),
|
|
|
+ "The requested run parameters are invalid.",
|
|
|
+ ))
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // Error if the pipeline is not loaded.
|
|
|
+ Err(SeqError::new(
|
|
|
+ "Invalid run",
|
|
|
+ SeqErrorType::BadRequestError,
|
|
|
+ format!(
|
|
|
+ "The selected pipeline {} for experiment {} is not loaded.",
|
|
|
+ pipeline_id, experiment_id
|
|
|
+ ),
|
|
|
+ "The requested run parameters are invalid.",
|
|
|
+ ))
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // Error if no pipeline was selected.
|
|
|
+ Err(SeqError::new(
|
|
|
+ "Invalid run",
|
|
|
+ SeqErrorType::BadRequestError,
|
|
|
+ format!(
|
|
|
+ "No pipeline was selected for experiment {}, so it cannot be run.",
|
|
|
experiment_id
|
|
|
),
|
|
|
"The requested run parameters are invalid.",
|