From ec29c9983ef8bb58243cd98dc209886a22071127 Mon Sep 17 00:00:00 2001 From: James Barford-Evans Date: Wed, 23 Apr 2025 14:02:04 +0100 Subject: [PATCH 1/4] Queries for inserting jobs & taking a job --- database/src/lib.rs | 117 ++++++++++++++++++ database/src/pool.rs | 10 +- database/src/pool/postgres.rs | 215 ++++++++++++++++++++++++++++++++- database/src/pool/sqlite.rs | 219 +++++++++++++++++++++++++++++++++- 4 files changed, 555 insertions(+), 6 deletions(-) diff --git a/database/src/lib.rs b/database/src/lib.rs index 618f0178f..17b2266ed 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -152,6 +152,16 @@ impl FromStr for CommitType { } } +impl ToString for CommitType { + fn to_string(&self) -> String { + match self { + CommitType::Try => "try", + CommitType::Master => "master", + } + .to_string() + } +} + #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub struct Commit { pub sha: String, @@ -791,3 +801,110 @@ pub struct ArtifactCollection { pub duration: Duration, pub end_time: DateTime, } + +#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] +pub enum CommitJobStatus { + Queued, + InProgress, + Finished, +} + +impl FromStr for CommitJobStatus { + type Err = String; + fn from_str(s: &str) -> Result { + Ok(match s.to_ascii_lowercase().as_str() { + "queued" => CommitJobStatus::Queued, + "in_progress" => CommitJobStatus::InProgress, + "finished" => CommitJobStatus::Finished, + _ => return Err(format!("{} is not a valid `CommitJobStatus`", s)), + }) + } +} + +impl ToString for CommitJobStatus { + fn to_string(&self) -> String { + match self { + CommitJobStatus::Queued => "queued", + CommitJobStatus::InProgress => "in_progress", + CommitJobStatus::Finished => "finished", + } + .to_string() + } +} + +/// Represents a job in the work queue for collectors +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CommitJob { + pub sha: String, + pub parent_sha: Option, + pub commit_type: CommitType, + pub pr: u32, + pub commit_time: Date, + pub target: Target, + pub machine_id: Option, + pub started_at: Option, + pub finished_at: Option, + pub status: CommitJobStatus, +} + +impl CommitJob { + /// Create a new commit job + pub fn new( + sha: String, + parent_sha: Option, + pr: u32, + commit_type: CommitType, + commit_time: Date, + ) -> Self { + Self { + sha, + parent_sha, + commit_type, + pr, + commit_time, + status: CommitJobStatus::Queued, + target: Target::X86_64UnknownLinuxGnu, + machine_id: None, + started_at: None, + finished_at: None, + } + } + + pub fn from_db( + sha: String, + parent_sha: Option, + commit_type: CommitType, + pr: u32, + commit_time: Date, + target: Target, + machine_id: Option, + started_at: Option, + finished_at: Option, + status: CommitJobStatus, + ) -> Self { + Self { + sha, + parent_sha, + commit_type, + pr, + commit_time, + target, + machine_id, + started_at, + finished_at, + status, + } + } + + pub fn get_enqueue_column_names() -> Vec { + vec![ + String::from("sha"), + String::from("parent_sha"), + String::from("commit_type"), + String::from("pr"), + String::from("commit_time"), + String::from("status"), + String::from("target"), + ] + } +} diff --git a/database/src/pool.rs b/database/src/pool.rs index d33f5b432..6c52703e0 100644 --- a/database/src/pool.rs +++ b/database/src/pool.rs @@ -1,5 +1,5 @@ use crate::{ - ArtifactCollection, ArtifactId, ArtifactIdNumber, CodegenBackend, CompileBenchmark, Target, + ArtifactCollection, ArtifactId, ArtifactIdNumber, CodegenBackend, CommitJob, CompileBenchmark, Target }; use 
crate::{CollectionId, Index, Profile, QueuedCommit, Scenario, Step}; use chrono::{DateTime, Utc}; @@ -178,6 +178,14 @@ pub trait Connection: Send + Sync { /// Removes all data associated with the given artifact. async fn purge_artifact(&self, aid: &ArtifactId); + + /* @Queue - Adds a job - we want to "double up" by adding one per `Target` */ + /// Add a job to the queue + async fn enqueue_commit_job(&self, target: Target, jobs: &[CommitJob]); + + /* @Queue - currently extracts everything out of the queue as a SELECT */ + /// Dequeue jobs + async fn dequeue_commit_job(&self, machine_id: String, target: Target) -> Option; } #[async_trait::async_trait] diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs index f3501e716..85830b0fd 100644 --- a/database/src/pool/postgres.rs +++ b/database/src/pool/postgres.rs @@ -1,7 +1,8 @@ use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction}; use crate::{ ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, CodegenBackend, CollectionId, - Commit, CommitType, CompileBenchmark, Date, Index, Profile, QueuedCommit, Scenario, Target, + Commit, CommitJob, CommitJobStatus, CommitType, CompileBenchmark, Date, Index, Profile, + QueuedCommit, Scenario, Target, }; use anyhow::Context as _; use chrono::{DateTime, TimeZone, Utc}; @@ -12,6 +13,7 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use tokio::sync::Mutex; +use tokio_postgres::types::{FromSql, ToSql}; use tokio_postgres::GenericClient; use tokio_postgres::Statement; @@ -1365,6 +1367,217 @@ where .await .unwrap(); } + + /* @Queue */ + async fn enqueue_commit_job(&self, target: Target, jobs: &[CommitJob]) { + let row_count = jobs.len(); + if row_count == 0 { + return; + } + + let column_names = CommitJob::get_enqueue_column_names(); + let column_string_names = column_names.join(", "); + let column_count: usize = column_names.len(); + // Generate the placeholders like ($1, $2, ..., $7), ($8, $9, ..., $14), ... 
+ let placeholders = (0..row_count) + .map(|i| { + let offset = i * column_count; + let group = (1..=column_count) + .map(|j| format!("${}", offset + j)) + .collect::>() + .join(", "); + format!("({})", group) + }) + .collect::>() + .join(", "); + + let sql = format!( + "INSERT INTO commit_queue ({}) VALUES {}", + column_string_names, placeholders, + ); + + let params: Vec<&(dyn tokio_postgres::types::ToSql + Sync)> = jobs + .iter() + .flat_map(|job| { + vec![ + &job.sha as &(dyn tokio_postgres::types::ToSql + Sync), + &job.parent_sha, + &job.commit_type, + &job.pr, + &job.commit_time, + &job.status, + &target, + ] + }) + .collect(); + + self.conn().execute(&sql, ¶ms).await.unwrap(); + } + + /* @Queue */ + async fn dequeue_commit_job(&self, machine_id: String, target: Target) -> Option { + /* Check to see if this machine possibly went offline while doing + * a previous job - if it did we'll take that job */ + let maybe_previous_job = self + .conn() + .query_opt( + " + WITH job_to_update AS ( + SELECT sha + FROM commit_queue + WHERE machine_id = $1 + AND target = $2 + AND status = 'in_progress' + ORDER BY started_at + LIMIT 1 + FOR UPDATE SKIP LOCKED + ) + UPDATE commit_queue + SET started_at = NOW(), + status = 'in_progress' + WHERE machine_id = $1 + AND target = $2 + AND sha = (SELECT sha FROM job_to_update) + RETURNING sha; + ", + &[&machine_id, &target], + ) + .await + .unwrap(); + + /* If it was we will take that job */ + if let Some(row) = maybe_previous_job { + return Some(row.get("sha")); + } + + let maybe_drift_job = self + .conn() + .query_opt( + " + WITH job_to_update AS ( + SELECT * + FROM commit_queue + WHERE target != $1 + AND status IN ('finished', 'in_progress') + AND sha NOT IN ( + SELECT sha + FROM commit_queue + WHERE target != $1 + AND status = 'finished' + ) + ORDER BY started_at + LIMIT 1 + FOR UPDATE SKIP LOCKED + ) + UPDATE commit_queue + SET started_at = NOW(), + status = 'in_progress', + machine_id = $2 + WHERE + target = $1 + AND sha = (SELECT sha FROM job_to_update) + RETURNING sha; + ", + &[&target, &machine_id], + ) + .await + .unwrap(); + + /* If we are, we will take that job */ + if let Some(row) = maybe_drift_job { + return Some(row.get("sha")); + } + + /* See if there are any jobs that need taking care of */ + let job = self + .conn() + .query_opt(" + WITH job_to_update AS ( + SELECT sha + FROM commit_queue + WHERE target = $1 + AND status = 'queued' + ORDER BY pr ASC, commit_type, sha + LIMIT 1 + FOR UPDATE SKIP LOCKED + ) + UPDATE commit_queue + SET started_at = NOW(), + status = 'in_progress', + machine_id = $2 + WHERE + sha = (SELECT sha FROM job_to_update) + AND target = $1 + RETURNING sha; + ", &[&target, &machine_id]) + .await + .unwrap(); + + /* If there is one, we will take that job */ + if let Some(row) = job { + return Some(row.get("sha")); + } + + /* There are no jobs in the queue */ + return None; + } +} + +#[macro_export] +macro_rules! 
impl_to_postgresql_via_to_string { + ($t:ty) => { + impl tokio_postgres::types::ToSql for $t { + fn to_sql( + &self, + ty: &tokio_postgres::types::Type, + out: &mut bytes::BytesMut, + ) -> Result> + { + self.to_string().to_sql(ty, out) + } + + fn accepts(ty: &tokio_postgres::types::Type) -> bool { + ::accepts(ty) + } + + // Only compile if the type is acceptable + tokio_postgres::types::to_sql_checked!(); + } + }; +} + +impl_to_postgresql_via_to_string!(Target); +impl_to_postgresql_via_to_string!(CommitType); +impl_to_postgresql_via_to_string!(CommitJobStatus); + +impl ToSql for Date { + fn to_sql( + &self, + ty: &tokio_postgres::types::Type, + out: &mut bytes::BytesMut, + ) -> Result> { + self.0.to_sql(ty, out) + } + + fn accepts(ty: &tokio_postgres::types::Type) -> bool { + as ToSql>::accepts(ty) + } + + tokio_postgres::types::to_sql_checked!(); +} + +impl<'a> FromSql<'a> for Date { + fn from_sql( + ty: &tokio_postgres::types::Type, + raw: &'a [u8], + ) -> Result> { + let dt = DateTime::::from_sql(ty, raw)?; + Ok(Date(dt)) + } + + fn accepts(ty: &tokio_postgres::types::Type) -> bool { + as FromSql>::accepts(ty) + } } fn parse_artifact_id(ty: &str, sha: &str, date: Option>) -> ArtifactId { diff --git a/database/src/pool/sqlite.rs b/database/src/pool/sqlite.rs index 27d6b46de..d9545e7b4 100644 --- a/database/src/pool/sqlite.rs +++ b/database/src/pool/sqlite.rs @@ -1,13 +1,13 @@ use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction}; use crate::{ - ArtifactCollection, ArtifactId, Benchmark, CodegenBackend, CollectionId, Commit, CommitType, - CompileBenchmark, Date, Profile, Target, + ArtifactCollection, ArtifactId, Benchmark, CodegenBackend, CollectionId, Commit, CommitJob, + CommitJobStatus, CommitType, CompileBenchmark, Date, Profile, Target, }; use crate::{ArtifactIdNumber, Index, QueuedCommit}; use chrono::{DateTime, TimeZone, Utc}; use hashbrown::HashMap; -use rusqlite::params; -use rusqlite::OptionalExtension; +use rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ValueRef}; +use rusqlite::{params, params_from_iter, OptionalExtension, Row, ToSql}; use std::path::PathBuf; use std::str::FromStr; use std::sync::Mutex; @@ -1252,6 +1252,217 @@ impl Connection for SqliteConnection { ) .unwrap(); } + + /* @Queue */ + async fn enqueue_commit_job(&self, target: Target, jobs: &[CommitJob]) { + let row_count = jobs.len(); + if row_count == 0 { + return; + } + + let column_names = CommitJob::get_enqueue_column_names(); + let column_string_names = column_names.join(", "); + let query_params = std::iter::repeat("?") + .take(column_names.len()) + .collect::>() + .join(", "); + let placeholders = (0..jobs.len()) + .map(|_| format!("({})", query_params)) + .collect::>() + .join(", "); + + let sql = format!( + "INSERT INTO commit_queue ({}) VALUES {}", + column_string_names, placeholders + ); + + let params: Vec<&dyn ToSql> = jobs + .iter() + .flat_map(|job| { + vec![ + &job.sha as &dyn ToSql, + &job.parent_sha, + &job.commit_type, + &job.pr, + &job.commit_time, + &job.status as &dyn ToSql, + &target, + ] + }) + .collect(); + + self.raw_ref() + .execute(&sql, params_from_iter(params)) + .unwrap(); + } + + /* @Queue */ + async fn dequeue_commit_job(&self, machine_id: String, target: Target) -> Option { + /* See if the machine was doing something and then failed */ + let get_sha = |row: &Row| row.get::<_, String>(0).unwrap(); + + /* Check to see if this machine possibly went offline while doing + * a previous job - if it did we'll take that job */ + let maybe_previous_job 
= self + .raw_ref() + .prepare( + " + WITH job_to_update AS ( + SELECT sha + FROM commit_queue + WHERE machine_id = ? + AND target = ? + AND status = 'in_progress' + ORDER BY started_at + LIMIT 1 + ) + UPDATE commit_queue + SET started_at = datetime('now'), + status = 'in_progress' + WHERE machine_id = ? + AND target = ? + AND sha = (SELECT sha FROM job_to_update) + RETURNING sha; + ", + ) + .unwrap() + .query_map(params![&machine_id, &target, &machine_id, &target], |row| { + Ok(get_sha(row)) + }) + .unwrap() + .map(|row| row.unwrap()) + .collect::>(); + + if let Some(previous_job) = maybe_previous_job.get(0) { + return Some(previous_job.clone()); + } + + /* Check to see if we are out of sync with other collectors of + * different architectures, if we are we will update the row and + * return this `sha` */ + let maybe_drift_job = self + .raw_ref() + .prepare( + " + WITH job_to_update AS ( + SELECT * + FROM commit_queue + WHERE target != ? + AND status IN ('finished', 'in_progress') + AND sha NOT IN ( + SELECT sha + FROM commit_queue + WHERE target != ? + AND status = 'finished' + ) + ORDER BY started_at + LIMIT 1 + ) + UPDATE commit_queue + SET started_at = DATETIME('now'), + status = 'in_progress', + machine_id = ? + WHERE + target = ? + AND sha = (SELECT sha FROM job_to_update) + RETURNING sha; + ", + ) + .unwrap() + .query_map(params![&target, &target, &machine_id, &target], |row| { + Ok(get_sha(row)) + }) + .unwrap() + .map(|sha| sha.unwrap()) + .collect::>(); + + if let Some(drift_job) = maybe_drift_job.get(0) { + return Some(drift_job.clone()); + } + + /* See if there are any jobs that need taking care of */ + let jobs = self + .raw_ref() + .prepare( + " + WITH job_to_update AS ( + SELECT sha + FROM commit_queue + WHERE target = ? + AND status = 'queued' + ORDER BY + pr ASC, + commit_type, + sha + LIMIT 1 + ) + UPDATE commit_queue + SET started_at = DATETIME('now'), + status = 'in_progress', + machine_id = ? + WHERE + sha = (SELECT sha FROM job_to_update) + AND target = ? + RETURNING sha; + ", + ) + .unwrap() + .query_map(params![&target, &machine_id, &target], |row| { + Ok(row.get::<_, String>(0).unwrap()) + }) + .unwrap() + .map(|r| r.unwrap()) + .collect::>(); + + /* If there is one, we will take that job */ + if let Some(sha) = jobs.get(0) { + return Some(sha.clone()); + } + + /* There are no jobs in the queue */ + return None; + } +} + +#[macro_export] +macro_rules! 
impl_to_sqlite_via_to_string { + ($t:ty) => { + impl ToSql for $t { + fn to_sql(&self) -> rusqlite::Result> { + Ok(self.to_string().into()) + } + } + }; +} + +impl_to_sqlite_via_to_string!(Target); +impl_to_sqlite_via_to_string!(CommitType); +impl_to_sqlite_via_to_string!(CommitJobStatus); + +impl ToSql for Date { + fn to_sql(&self) -> rusqlite::Result> { + Ok(self.0.to_rfc3339().into()) + } +} + +impl FromSql for Date { + fn column_result(value: ValueRef<'_>) -> FromSqlResult { + match value { + ValueRef::Text(text) => { + let s = std::str::from_utf8(text).map_err(|e| FromSqlError::Other(Box::new(e)))?; + DateTime::parse_from_rfc3339(s) + .map(|dt| Date(dt.with_timezone(&Utc))) + .map_err(|e| FromSqlError::Other(Box::new(e))) + } + ValueRef::Integer(i) => Ok(Date(Utc.timestamp_opt(i, 0).unwrap())), + ValueRef::Real(f) => { + let secs = f.trunc() as i64; + let nanos = ((f - f.trunc()) * 1e9) as u32; + Ok(Date(Utc.timestamp_opt(secs, nanos).unwrap())) + } + _ => Err(FromSqlError::InvalidType), + } + } } fn parse_artifact_id(ty: &str, sha: &str, date: Option) -> ArtifactId { From 9b5d0cf0d57b31cd1a5a51a35ea4f8f6165dcf75 Mon Sep 17 00:00:00 2001 From: James Barford-Evans Date: Thu, 24 Apr 2025 14:18:09 +0100 Subject: [PATCH 2/4] periodically update the database queue --- database/src/pool/postgres.rs | 32 ++++++- database/src/pool/sqlite.rs | 25 ++++- site/src/lib.rs | 1 + site/src/load.rs | 173 +++++++++++++++++++++++++++++++++- site/src/main.rs | 6 +- site/src/queue_jobs.rs | 20 ++++ 6 files changed, 253 insertions(+), 4 deletions(-) create mode 100644 site/src/queue_jobs.rs diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs index 85830b0fd..9225ca746 100644 --- a/database/src/pool/postgres.rs +++ b/database/src/pool/postgres.rs @@ -287,6 +287,27 @@ static MIGRATIONS: &[&str] = &[ alter table pstat_series drop constraint test_case; alter table pstat_series add constraint test_case UNIQUE(crate, profile, scenario, backend, target, metric); "#, + r#" + CREATE TABLE IF NOT EXISTS commit_queue ( + sha TEXT, + parent_sha TEXT, + commit_type TEXT CHECK (commit_type IN ('master', 'try')), + pr INTEGER, + commit_time TIMESTAMP, + target TEXT, + -- The below are filled in by the collector that picks up the job + machine_id TEXT, + started_at TIMESTAMP, + finished_at TIMESTAMP, + status TEXT, + -- We have a primary key as the sha <-> target as there can only be one + -- pairing + PRIMARY KEY (sha, target) + ); + CREATE INDEX IF NOT EXISTS sha_idx ON commit_queue (sha); + CREATE INDEX IF NOT EXISTS machine_id_idx ON commit_queue (machine_id); + CREATE INDEX IF NOT EXISTS sha_machine_id_idx ON commit_queue (sha, machine_id); + "#, ]; #[async_trait::async_trait] @@ -1391,8 +1412,10 @@ where .collect::>() .join(", "); + /* Add everything to the database if there already exists something + * with a `sha <-> target` pairing it will simply be ignored */ let sql = format!( - "INSERT INTO commit_queue ({}) VALUES {}", + "INSERT INTO commit_queue ({}) VALUES {} ON CONFLICT DO NOTHING", column_string_names, placeholders, ); @@ -1430,7 +1453,14 @@ where AND status = 'in_progress' ORDER BY started_at LIMIT 1 + + -- @Note; This prevents multiple + machines of the same + architecture taking the + same job. 
See here for more information; + https://www.postgresql.org/docs/17/sql-select.html#SQL-FOR-UPDATE-SHARE FOR UPDATE SKIP LOCKED + ) UPDATE commit_queue SET started_at = NOW(), diff --git a/database/src/pool/sqlite.rs b/database/src/pool/sqlite.rs index d9545e7b4..fc7ccd1eb 100644 --- a/database/src/pool/sqlite.rs +++ b/database/src/pool/sqlite.rs @@ -404,6 +404,29 @@ static MIGRATIONS: &[Migration] = &[ alter table pstat_series_with_target rename to pstat_series; "#, ), + Migration::without_foreign_key_constraints( + r#" + CREATE TABLE IF NOT EXISTS commit_queue ( + sha TEXT, + parent_sha TEXT, + commit_type TEXT CHECK (commit_type IN ('master', 'try')), + pr INTEGER, + commit_time TIMESTAMP, + target TEXT, + -- The below are filled in by the collector that picks up the job + machine_id TEXT, + started_at TIMESTAMP, + finished_at TIMESTAMP, + status TEXT, + -- We have a primary key as the sha <-> target as there can only be one + -- pairing + PRIMARY KEY (sha, target) + ); + CREATE INDEX IF NOT EXISTS sha_idx ON commit_queue (sha); + CREATE INDEX IF NOT EXISTS machine_id_idx ON commit_queue (machine_id); + CREATE INDEX IF NOT EXISTS sha_machine_id_idx ON commit_queue (sha, machine_id); + "#, + ), ]; #[async_trait::async_trait] @@ -1272,7 +1295,7 @@ impl Connection for SqliteConnection { .join(", "); let sql = format!( - "INSERT INTO commit_queue ({}) VALUES {}", + "INSERT OR IGNORE INTO commit_queue ({}) VALUES {};", column_string_names, placeholders ); diff --git a/site/src/lib.rs b/site/src/lib.rs index 05bb1e443..82c6aa3fe 100644 --- a/site/src/lib.rs +++ b/site/src/lib.rs @@ -5,6 +5,7 @@ pub mod api; pub mod github; pub mod load; pub mod server; +pub mod queue_jobs; mod average; mod benchmark_metadata; diff --git a/site/src/load.rs b/site/src/load.rs index 6722d73c2..cf4975406 100644 --- a/site/src/load.rs +++ b/site/src/load.rs @@ -14,8 +14,8 @@ use serde::{Deserialize, Serialize}; use crate::self_profile::SelfProfileCache; use collector::compile::benchmark::category::Category; use collector::{Bound, MasterCommit}; -use database::Pool; pub use database::{ArtifactId, Benchmark, Commit}; +use database::{CommitJob, Pool, Target}; use database::{CommitType, Date}; #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] @@ -210,6 +210,38 @@ impl SiteCtxt { ) } + /// Will enqueue jobs to the database ready for a collector to take + pub async fn enqueue_commit_jobs(&self) { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5)); + loop { + let conn = self.conn().await; + let (queued_pr_commits, in_progress_artifacts) = + futures::join!(conn.queued_commits(), conn.in_progress_artifacts()); + let master_commits = &self.get_master_commits().commits; + + let index = self.index.load(); + let all_commits = index + .commits() + .iter() + .map(|commit| commit.sha.clone()) + .collect::>(); + + let jobs = enqueue_new_jobs( + master_commits.clone(), + queued_pr_commits, + in_progress_artifacts, + all_commits, + Utc::now(), + ); + + conn.enqueue_commit_job(Target::X86_64UnknownLinuxGnu, &jobs) + .await; + interval.tick().await; + + println!("Cron job executed at: {:?}", std::time::SystemTime::now()); + } + } + /// Returns the not yet tested published artifacts, sorted from newest to oldest. 
pub async fn missing_published_artifacts(&self) -> anyhow::Result<Vec<String>> {
        let artifact_list: String = reqwest::get("https://static.rust-lang.org/manifests.txt")
@@ -521,6 +553,145 @@ where
     true_count
 }
 
+/// Conversion between our application layer and database object, also
+/// simplifies the `(Commit, MissingReason)` tuple by essentially flattening it
+fn commit_job_from_queue_item(item: &(Commit, MissingReason)) -> CommitJob {
+    let parent_sha = if let Some(parent_sha) = item.1.parent_sha() {
+        Some(parent_sha.to_string())
+    } else {
+        None
+    };
+    CommitJob::new(
+        item.0.sha.clone(),
+        parent_sha,
+        item.1.pr().unwrap_or(0),
+        item.0.r#type.clone(),
+        item.0.date,
+    )
+}
+
+fn enqueue_new_jobs(
+    master_commits: Vec<MasterCommit>,
+    queued_pr_commits: Vec<QueuedCommit>,
+    in_progress_artifacts: Vec<ArtifactId>,
+    mut all_commits: HashSet<String>,
+    time: chrono::DateTime<Utc>,
+) -> Vec<CommitJob> {
+    let mut queue = master_commits
+        .into_iter()
+        .filter(|c| time.signed_duration_since(c.time) < Duration::days(29))
+        .map(|c| {
+            (
+                Commit {
+                    sha: c.sha,
+                    date: Date(c.time),
+                    r#type: CommitType::Master,
+                },
+                // All recent master commits should have an associated PR
+                MissingReason::Master {
+                    pr: c.pr.unwrap_or(0),
+                    parent_sha: c.parent_sha,
+                    is_try_parent: false,
+                },
+            )
+        })
+        .collect::<Vec<_>>();
+    let master_commits = queue
+        .iter()
+        .map(|(commit, _)| commit.sha.clone())
+        .collect::<HashSet<_>>();
+    for database::QueuedCommit {
+        sha,
+        parent_sha,
+        pr,
+        include,
+        exclude,
+        runs,
+        commit_date,
+        backends,
+    } in queued_pr_commits
+        .into_iter()
+        // filter out any queued PR master commits (leaving only try commits)
+        .filter(|queued_commit| !master_commits.contains(&queued_commit.sha))
+    {
+        // Mark the parent commit as a try_parent.
+        if let Some((_, missing_reason)) = queue
+            .iter_mut()
+            .find(|(commit, _)| commit.sha == parent_sha.as_str())
+        {
+            /* Mutates the parent by scanning the list again... bad. */
+            if let MissingReason::Master { is_try_parent, .. } = missing_reason {
+                *is_try_parent = true;
+            } else {
+                unreachable!("try commit has non-master parent {:?}", missing_reason);
+            };
+        }
+        queue.push((
+            Commit {
+                sha: sha.to_string(),
+                date: commit_date.unwrap_or(Date::empty()),
+                r#type: CommitType::Try,
+            },
+            MissingReason::Try {
+                pr,
+                parent_sha,
+                include,
+                exclude,
+                runs,
+                backends,
+            },
+        ));
+    }
+    for aid in in_progress_artifacts {
+        match aid {
+            ArtifactId::Commit(aid_commit) => {
+                let previous = queue
+                    .iter()
+                    .find(|(commit, _)| commit.sha == aid_commit.sha)
+                    .map(|pair| Box::new(pair.1.clone()));
+                all_commits.remove(&aid_commit.sha);
+                queue.insert(0, (aid_commit, MissingReason::InProgress(previous)));
+            }
+            ArtifactId::Tag(_) => {
+                // do nothing, for now, though eventually we'll want an artifact queue
+            }
+        }
+    }
+    let mut already_tested = all_commits.clone();
+    let mut i = 0;
+    while i != queue.len() {
+        if !already_tested.insert(queue[i].0.sha.clone()) {
+            queue.remove(i);
+        } else {
+            i += 1;
+        }
+    }
+
+    let (jobs, job_sha_set) = queue.iter().fold(
+        (vec![], HashSet::<String>::new()),
+        |(mut vec, mut set), item| {
+            let job = commit_job_from_queue_item(item);
+            set.insert(job.sha.clone());
+            vec.push(job);
+            (vec, set)
+        },
+    );
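The filter that closes out `enqueue_new_jobs` in the next hunk is the easy part to get backwards, so here is a standalone sketch of the rule it implements; the `Job` type is a hypothetical stand-in for `CommitJob`, not code from the patch. A job is viable when its parent sha was produced in the same batch, or when it has no parent at all:

use std::collections::HashSet;

/// Hypothetical, simplified stand-in for `CommitJob`.
struct Job {
    sha: String,
    parent_sha: Option<String>,
}

/// Keep jobs whose parent is present in `known_shas`, plus jobs with no
/// parent (they have nothing to wait on).
fn viable_jobs(jobs: Vec<Job>, known_shas: &HashSet<String>) -> Vec<Job> {
    jobs.into_iter()
        .filter(|job| match &job.parent_sha {
            Some(parent) => known_shas.contains(parent),
            None => true,
        })
        .collect()
}

fn main() {
    let known: HashSet<String> = ["a".to_string()].into_iter().collect();
    let jobs = vec![
        Job { sha: "b".into(), parent_sha: Some("a".into()) }, // parent known: kept
        Job { sha: "c".into(), parent_sha: Some("x".into()) }, // parent unknown: dropped
        Job { sha: "d".into(), parent_sha: None },             // no parent: kept
    ];
    assert_eq!(viable_jobs(jobs, &known).len(), 2);
}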
+    /* Jobs that are viable to be inserted into the database, these are jobs
+     * which either have a parent that is in the set or do not have a parent.
+     * Some commits could reference a parent that is not in the set, which means
+     * they are not ready to be queued */
+    jobs.into_iter()
+        .filter(|job| {
+            if let Some(parent_sha) = &job.parent_sha {
+                job_sha_set.contains(parent_sha)
+            } else {
+                // No parent to wait on, so the job is viable
+                true
+            }
+        })
+        .collect()
+}
+
 /// One decimal place rounded percent
 #[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
 pub struct Percent(#[serde(with = "collector::round_float")] pub f64);
diff --git a/site/src/main.rs b/site/src/main.rs
index 9af67b6b5..076053c3f 100644
--- a/site/src/main.rs
+++ b/site/src/main.rs
@@ -8,6 +8,8 @@ use std::sync::Arc;
 #[global_allocator]
 static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
 
+static QUEUE_UPDATE_INTERVAL_SECONDS: u64 = 5;
+
 #[tokio::main]
 async fn main() {
     env_logger::init();
@@ -50,7 +52,9 @@ async fn main() {
     .fuse();
     println!("Starting server with port={:?}", port);
 
-    let server = site::server::start(ctxt, port).fuse();
+    let server = site::server::start(ctxt.clone(), port).fuse();
+    // Spawn the enqueue cron so it does not block the server select loop below
+    tokio::task::spawn(site::queue_jobs::cron_enqueue_jobs(ctxt, QUEUE_UPDATE_INTERVAL_SECONDS));
+
     futures::pin_mut!(server);
     futures::pin_mut!(fut);
     loop {
diff --git a/site/src/queue_jobs.rs b/site/src/queue_jobs.rs
new file mode 100644
index 000000000..d200875c4
--- /dev/null
+++ b/site/src/queue_jobs.rs
@@ -0,0 +1,20 @@
+use std::sync::Arc;
+
+use parking_lot::RwLock;
+use tokio::time::{self, Duration};
+use crate::load::SiteCtxt;
+
+/// Inserts into the queue at `seconds` interval
+pub async fn cron_enqueue_jobs(site_ctxt: Arc<RwLock<Option<Arc<SiteCtxt>>>>, seconds: u64) {
+    let mut interval = time::interval(Duration::from_secs(seconds));
+
+    loop {
+        // Clone the context out of the lock so the read guard is not held
+        // across the `.await` below
+        let ctxt = site_ctxt.read().clone();
+        if let Some(ctxt) = ctxt {
+            ctxt.enqueue_commit_jobs().await;
+        }
+        interval.tick().await;
+        println!("Cron job executed at: {:?}", std::time::SystemTime::now());
+    }
+}

From 7d524cede7dd7e0932755e226a1b66ccd6908254 Mon Sep 17 00:00:00 2001
From: James Barford-Evans
Date: Thu, 24 Apr 2025 16:17:51 +0100
Subject: [PATCH 3/4] hook up dequeuing logic to collector

---
 collector/src/bin/collector.rs             | 387 ++++++++++++++-------
 database/src/lib.rs                        |  38 +-
 database/src/pool.rs                       |  12 +-
 database/src/pool/postgres.rs              | 107 ++++--
 database/src/pool/sqlite.rs                | 108 ++++--
 site/src/lib.rs                            |   2 +-
 site/src/load.rs                           |  82 ++++-
 site/src/main.rs                           |   9 +-
 site/src/queue_jobs.rs                     |   2 +-
 site/src/request_handlers.rs               |   2 +-
 site/src/request_handlers/next_artifact.rs |  20 ++
 site/src/server.rs                         |   5 +
 12 files changed, 557 insertions(+), 217 deletions(-)

diff --git a/collector/src/bin/collector.rs b/collector/src/bin/collector.rs
index c82929267..5d22f2d86 100644
--- a/collector/src/bin/collector.rs
+++ b/collector/src/bin/collector.rs
@@ -56,7 +56,7 @@ use collector::toolchain::{
 use collector::utils::cachegrind::cachegrind_diff;
 use collector::utils::{is_installed, wait_for_future};
 use collector::{utils, CollectorCtx, CollectorStepBuilder};
-use database::{ArtifactId, ArtifactIdNumber, Commit, CommitType, Connection, Pool};
+use database::{ArtifactId, ArtifactIdNumber, Commit, CommitJob, CommitType, Connection, Pool};
 
 fn n_normal_benchmarks_remaining(n: usize) -> String {
     let suffix = if n == 1 { "" } else { "s" };
@@ -581,6 +581,21 @@ enum Commands {
         self_profile: SelfProfileOption,
     },
 
+    /// Benchmarks commits from the queue sequentially
+    BenchFromQueue {
+        /// Site URL
+        site_url: String,
+
+        #[command(flatten)]
+        db: DbOption,
+
+        #[command(flatten)]
+        bench_rustc: BenchRustcOption,
+
+        #[command(flatten)]
+        self_profile:
SelfProfileOption, + }, + /// Benchmarks a published toolchain for perf.rust-lang.org's dashboard BenchPublished { /// Toolchain (e.g. stable, beta, 1.26.0) @@ -947,129 +962,113 @@ fn main_result() -> anyhow::Result { // no missing artifacts return Ok(0); }; + benchmark_next_artifact( + next, + &db, + &target_triple, + &runtime_benchmark_dir, + &benchmark_dirs, + client, + &site_url, + self_profile, + bench_rustc, + &compile_benchmark_dir, + ) + } - let res = std::panic::catch_unwind(|| { - let pool = database::Pool::open(&db.db); - let mut rt = build_async_runtime(); - - match next { - NextArtifact::Release(tag) => { - let toolchain = - create_toolchain_from_published_version(&tag, &target_triple)?; - bench_published_artifact( - rt.block_on(pool.connection()), - &mut rt, - toolchain, - &benchmark_dirs, - ) - } - NextArtifact::Commit { - commit, - include, - exclude, - runs, - backends: requested_backends, - } => { - // Parse the requested backends, with LLVM as a fallback if none or no valid - // ones were explicitly specified, which will be the case for the vast - // majority of cases. - let mut backends = vec![]; - if let Some(requested_backends) = requested_backends { - let requested_backends = requested_backends.to_lowercase(); - if requested_backends.contains("llvm") { - backends.push(CodegenBackend::Llvm); - } - if requested_backends.contains("cranelift") { - backends.push(CodegenBackend::Cranelift); - } - } - - if backends.is_empty() { - backends.push(CodegenBackend::Llvm); - } - - // FIXME: remove this when/if NextArtifact::Commit's include/exclude - // changed from Option to Vec - // to not to manually parse args - let split_args = |l: Option| -> Vec { - if let Some(l) = l { - l.split(',').map(|arg| arg.trim().to_owned()).collect() - } else { - vec![] - } - }; - let sha = commit.sha.to_string(); - let sysroot = Sysroot::install(sha.clone(), &target_triple, &backends) - .with_context(|| { - format!("failed to install sysroot for {:?}", commit) - })?; - - let mut benchmarks = get_compile_benchmarks( - &compile_benchmark_dir, - &split_args(include), - &split_args(exclude), - &[], - )?; - benchmarks.retain(|b| b.category().is_primary_or_secondary()); - - let artifact_id = ArtifactId::Commit(commit); - let mut conn = rt.block_on(pool.connection()); - let toolchain = Toolchain::from_sysroot(&sysroot, sha); - - let compile_config = CompileBenchmarkConfig { - benchmarks, - profiles: vec![ - Profile::Check, - Profile::Debug, - Profile::Doc, - Profile::Opt, - ], - scenarios: Scenario::all(), - backends, - iterations: runs.map(|v| v as usize), - is_self_profile: self_profile.self_profile, - bench_rustc: bench_rustc.bench_rustc, - targets: vec![Target::default()], - }; - let runtime_suite = rt.block_on(load_runtime_benchmarks( - conn.as_mut(), - &runtime_benchmark_dir, - CargoIsolationMode::Isolated, - None, - &toolchain, - &artifact_id, - ))?; - - let runtime_config = RuntimeBenchmarkConfig { - runtime_suite, - filter: BenchmarkFilter::keep_all(), - iterations: DEFAULT_RUNTIME_ITERATIONS, - }; - let shared = SharedBenchmarkConfig { - artifact_id, - toolchain, - }; - - run_benchmarks( - &mut rt, - conn, - shared, - Some(compile_config), - Some(runtime_config), - ) - } - } - }); - // We need to send a message to this endpoint even if the collector panics - client.post(format!("{}/perf/onpush", site_url)).send()?; + Commands::BenchFromQueue { + site_url, + db, + bench_rustc, + self_profile, + } => { + log_db(&db); + println!("processing artifacts"); + let client = 
reqwest::blocking::Client::new();
+            /* @Queue - How do we do this per architecture if it's not queued?
+             *          do this from the Cron job and create a "fake" queued job
+             *          with a PR of 0 and a random uuid for a sha? Then remove
+             *          this api call?
+             *
+             *          Or should we insert it into the database as the current
+             *          job?
+             *
+             *          See if we have a release artifact before consulting the
+             *          database queue */
+            let response: collector::api::next_artifact::Response = client
+                .get(format!("{}/perf/released_artifact", site_url))
+                .send()?
+                .json()?;
+
+            let next = if let Some(c) = response.artifact {
+                c
+            } else {
+                // Run the dequeue inside a plain closure with `block_on` so a
+                // panic in the database call is actually caught by
+                // `catch_unwind` (an `async` closure would only hand back an
+                // unpolled future)
+                let commit_job_result = std::panic::catch_unwind(|| {
+                    let pool = database::Pool::open(&db.db);
+                    let rt = build_async_runtime();
+                    let conn = rt.block_on(pool.connection());
+                    rt.block_on(conn.dequeue_commit_job(
+                        "MACHINE_ID".into(),
+                        database::Target::X86_64UnknownLinuxGnu,
+                    ))
+                });
+
+                let Ok(maybe_commit_job) = commit_job_result else {
+                    return Ok(1);
+                };
+
+                let Some(commit_job) = maybe_commit_job else {
+                    println!("no artifact to benchmark");
+                    // Sleep for a bit to avoid spamming the database too much.
+                    // Sleeping here rather than in `collector/collect.sh` avoids
+                    // a needless sleep when a benchmark was actually executed.
+                    std::thread::sleep(Duration::from_secs(60 * 2));
+                    return Ok(0);
+                };
+
+                next_artifact_from_commit_job(&commit_job)
+            };
+
+            // get the sha so we can update the job in the database
+            let commit_sha = match &next {
+                NextArtifact::Release(_) => None,
+                NextArtifact::Commit {
+                    commit,
+                    include: _,
+                    exclude: _,
+                    runs: _,
+                    backends: _,
+                } => Some(commit.sha.clone()),
+            };
+
+            let benchmark_result = benchmark_next_artifact(
+                next,
+                &db,
+                &target_triple,
+                &runtime_benchmark_dir,
+                &benchmark_dirs,
+                client,
+                &site_url,
+                self_profile,
+                bench_rustc,
+                &compile_benchmark_dir,
+            );
+
+            if let Some(sha) = commit_sha {
+                let _ = std::panic::catch_unwind(|| {
+                    let pool = database::Pool::open(&db.db);
+                    let rt = build_async_runtime();
+                    let conn = rt.block_on(pool.connection());
+                    rt.block_on(conn.finish_commit_job(
+                        "MACHINE_ID".into(),
+                        database::Target::X86_64UnknownLinuxGnu,
+                        sha,
+                    ))
+                });
+            }
+
+            benchmark_result
         }
 
         Commands::BenchPublished { toolchain, db } => {
@@ -2049,3 +2048,149 @@ fn get_commit_or_fake_it(sha: &str) -> anyhow::Result<Commit> {
     }
 }))
}
+
+/// Adaptor to map a `CommitJob` -> `NextArtifact`
+fn next_artifact_from_commit_job(commit_job: &CommitJob) -> NextArtifact {
+    let commit = Commit {
+        sha: commit_job.sha.clone(),
+        date: commit_job.commit_time,
+        r#type: commit_job.commit_type.clone(),
+    };
+    NextArtifact::Commit {
+        commit,
+        include: commit_job.include.clone(),
+        exclude: commit_job.exclude.clone(),
+        runs: commit_job.runs,
+        backends: commit_job.backends.clone(),
+    }
+}
+
+// This function's body can be pasted into `BenchFromQueue`
+/// Benchmark an artifact
+fn benchmark_next_artifact(
+    next: NextArtifact,
+    db: &DbOption,
+    target_triple: &str,
+    runtime_benchmark_dir: &PathBuf,
+    benchmark_dirs: &BenchmarkDirs,
+    client: reqwest::blocking::Client,
+    site_url: &str,
+    self_profile: SelfProfileOption,
+    bench_rustc: BenchRustcOption,
+    compile_benchmark_dir: &PathBuf,
+) -> anyhow::Result<i32> {
+    let res = std::panic::catch_unwind(|| {
+        let pool = database::Pool::open(&db.db);
+        let mut rt = build_async_runtime();
+
+        match next {
+            NextArtifact::Release(tag) => {
+                let
toolchain = create_toolchain_from_published_version(&tag, &target_triple)?; + bench_published_artifact( + rt.block_on(pool.connection()), + &mut rt, + toolchain, + &benchmark_dirs, + ) + } + NextArtifact::Commit { + commit, + include, + exclude, + runs, + backends: requested_backends, + } => { + // Parse the requested backends, with LLVM as a fallback if none or no valid + // ones were explicitly specified, which will be the case for the vast + // majority of cases. + let mut backends = vec![]; + if let Some(requested_backends) = requested_backends { + let requested_backends = requested_backends.to_lowercase(); + if requested_backends.contains("llvm") { + backends.push(CodegenBackend::Llvm); + } + if requested_backends.contains("cranelift") { + backends.push(CodegenBackend::Cranelift); + } + } + + if backends.is_empty() { + backends.push(CodegenBackend::Llvm); + } + + // FIXME: remove this when/if NextArtifact::Commit's include/exclude + // changed from Option to Vec + // to not to manually parse args + let split_args = |l: Option| -> Vec { + if let Some(l) = l { + l.split(',').map(|arg| arg.trim().to_owned()).collect() + } else { + vec![] + } + }; + let sha = commit.sha.to_string(); + let sysroot = Sysroot::install(sha.clone(), &target_triple, &backends) + .with_context(|| format!("failed to install sysroot for {:?}", commit))?; + + let mut benchmarks = get_compile_benchmarks( + &compile_benchmark_dir, + &split_args(include), + &split_args(exclude), + &[], + )?; + benchmarks.retain(|b| b.category().is_primary_or_secondary()); + + let artifact_id = ArtifactId::Commit(commit); + let mut conn = rt.block_on(pool.connection()); + let toolchain = Toolchain::from_sysroot(&sysroot, sha); + + let compile_config = CompileBenchmarkConfig { + benchmarks, + profiles: vec![Profile::Check, Profile::Debug, Profile::Doc, Profile::Opt], + scenarios: Scenario::all(), + backends, + iterations: runs.map(|v| v as usize), + is_self_profile: self_profile.self_profile, + bench_rustc: bench_rustc.bench_rustc, + targets: vec![Target::default()], + }; + let runtime_suite = rt.block_on(load_runtime_benchmarks( + conn.as_mut(), + &runtime_benchmark_dir, + CargoIsolationMode::Isolated, + None, + &toolchain, + &artifact_id, + ))?; + + let runtime_config = RuntimeBenchmarkConfig { + runtime_suite, + filter: BenchmarkFilter::keep_all(), + iterations: DEFAULT_RUNTIME_ITERATIONS, + }; + let shared = SharedBenchmarkConfig { + artifact_id, + toolchain, + }; + + run_benchmarks( + &mut rt, + conn, + shared, + Some(compile_config), + Some(runtime_config), + ) + } + } + }); + // We need to send a message to this endpoint even if the collector panics + client.post(format!("{}/perf/onpush", site_url)).send()?; + + match res { + Ok(res) => res?, + Err(error) => { + log::error!("The collector has crashed\n{error:?}"); + } + } + Ok(0) +} diff --git a/database/src/lib.rs b/database/src/lib.rs index 17b2266ed..920c192cb 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -845,31 +845,13 @@ pub struct CommitJob { pub started_at: Option, pub finished_at: Option, pub status: CommitJobStatus, + pub include: Option, + pub exclude: Option, + pub runs: Option, + pub backends: Option, } impl CommitJob { - /// Create a new commit job - pub fn new( - sha: String, - parent_sha: Option, - pr: u32, - commit_type: CommitType, - commit_time: Date, - ) -> Self { - Self { - sha, - parent_sha, - commit_type, - pr, - commit_time, - status: CommitJobStatus::Queued, - target: Target::X86_64UnknownLinuxGnu, - machine_id: None, - started_at: None, - 
finished_at: None,
-        }
-    }
-
     pub fn from_db(
         sha: String,
         parent_sha: Option<String>,
         commit_type: CommitType,
@@ -881,6 +863,10 @@ impl CommitJob {
         started_at: Option<DateTime<Utc>>,
         finished_at: Option<DateTime<Utc>>,
         status: CommitJobStatus,
+        include: Option<String>,
+        exclude: Option<String>,
+        runs: Option<i32>,
+        backends: Option<String>,
     ) -> Self {
         Self {
             sha,
@@ -893,6 +879,10 @@ impl CommitJob {
             started_at,
             finished_at,
             status,
+            include,
+            exclude,
+            runs,
+            backends,
         }
     }
 
@@ -905,6 +895,10 @@ impl CommitJob {
             String::from("commit_time"),
             String::from("status"),
             String::from("target"),
+            String::from("include"),
+            String::from("exclude"),
+            String::from("runs"),
+            String::from("backends"),
         ]
     }
 }
diff --git a/database/src/pool.rs b/database/src/pool.rs
index 6c52703e0..c6594cc17 100644
--- a/database/src/pool.rs
+++ b/database/src/pool.rs
@@ -1,5 +1,6 @@ use crate::{
-    ArtifactCollection, ArtifactId, ArtifactIdNumber, CodegenBackend, CommitJob, CompileBenchmark, Target
+    ArtifactCollection, ArtifactId, ArtifactIdNumber, CodegenBackend, CommitJob, CompileBenchmark,
+    Target,
 };
 use crate::{CollectionId, Index, Profile, QueuedCommit, Scenario, Step};
 use chrono::{DateTime, Utc};
@@ -179,13 +180,14 @@ pub trait Connection: Send + Sync {
     /// Removes all data associated with the given artifact.
     async fn purge_artifact(&self, aid: &ArtifactId);
 
-    /* @Queue - Adds a job - we want to "double up" by adding one per `Target` */
-    /// Add a job to the queue
+    /// Add jobs to the queue
     async fn enqueue_commit_job(&self, target: Target, jobs: &[CommitJob]);
 
-    /* @Queue - currently extracts everything out of the queue as a SELECT */
     /// Dequeue jobs
-    async fn dequeue_commit_job(&self, machine_id: String, target: Target) -> Option<String>;
+    async fn dequeue_commit_job(&self, machine_id: String, target: Target) -> Option<CommitJob>;
+
+    /// Mark the job as finished
+    async fn finish_commit_job(&self, machine_id: String, target: Target, sha: String) -> bool;
 }
 
 #[async_trait::async_trait]
diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs
index 9225ca746..2de427c10 100644
--- a/database/src/pool/postgres.rs
+++ b/database/src/pool/postgres.rs
@@ -291,18 +291,19 @@ static MIGRATIONS: &[&str] = &[
     CREATE TABLE IF NOT EXISTS commit_queue (
         sha TEXT,
         parent_sha TEXT,
-        commit_type TEXT CHECK (commit_type IN ('master', 'try')),
+        commit_type TEXT,
         pr INTEGER,
         commit_time TIMESTAMP,
         target TEXT,
-        -- The below are filled in by the collector that picks up the job
+        include TEXT,
+        exclude TEXT,
+        runs INTEGER,
+        backends TEXT,
         machine_id TEXT,
         started_at TIMESTAMP,
         finished_at TIMESTAMP,
         status TEXT,
-        -- We have a primary key as the sha <-> target as there can only be one
-        -- pairing
-        PRIMARY KEY (sha, target)
+        PRIMARY KEY (sha, target)
     );
     CREATE INDEX IF NOT EXISTS sha_idx ON commit_queue (sha);
     CREATE INDEX IF NOT EXISTS machine_id_idx ON commit_queue (machine_id);
@@ -1389,7 +1390,6 @@ where
         .unwrap();
     }
 
-    /* @Queue */
     async fn enqueue_commit_job(&self, target: Target, jobs: &[CommitJob]) {
         let row_count = jobs.len();
         if row_count == 0 {
@@ -1412,7 +1412,7 @@ where
             .collect::<Vec<_>>()
             .join(", ");
 
-        /* Add everything to the database if there already exists something 
+        /* Add everything to the database; if there already exists something
         * with a `sha <-> target` pairing it will simply be ignored */
         let sql = format!(
             "INSERT INTO commit_queue ({}) VALUES {} ON CONFLICT DO NOTHING",
             column_string_names, placeholders,
         );
@@ -1430,6 +1430,10 @@ where
                     &job.commit_time,
                     &job.status,
                     &target,
+                    &job.include,
+                    &job.exclude,
+                    &job.runs,
+                    &job.backends,
                 ]
             })
             .collect();
 
         self.conn().execute(&sql, &params).await.unwrap();
     }
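The `($1, ..., $n)` placeholder arithmetic used by `enqueue_commit_job` above is easy to get off by one, so a self-contained sketch of the same construction may help; this is a hand-rolled illustration of the technique, not code from the repository:

/// Build "($1, $2, ..., $c), ($c+1, ...), ..." for `rows` rows of `cols`
/// columns, mirroring the multi-row INSERT built by `enqueue_commit_job`.
fn insert_placeholders(rows: usize, cols: usize) -> String {
    (0..rows)
        .map(|row| {
            let offset = row * cols;
            let group = (1..=cols)
                .map(|col| format!("${}", offset + col))
                .collect::<Vec<_>>()
                .join(", ");
            format!("({})", group)
        })
        .collect::<Vec<_>>()
        .join(", ")
}

fn main() {
    // Two rows of three columns: $1..$3, then $4..$6.
    assert_eq!(insert_placeholders(2, 3), "($1, $2, $3), ($4, $5, $6)");
}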
¶ms).await.unwrap(); } - /* @Queue */ - async fn dequeue_commit_job(&self, machine_id: String, target: Target) -> Option { + async fn dequeue_commit_job(&self, machine_id: String, target: Target) -> Option { /* Check to see if this machine possibly went offline while doing * a previous job - if it did we'll take that job */ let maybe_previous_job = self @@ -1446,7 +1449,7 @@ where .query_opt( " WITH job_to_update AS ( - SELECT sha + SELECT * FROM commit_queue WHERE machine_id = $1 AND target = $2 @@ -1455,10 +1458,10 @@ where LIMIT 1 -- @Note; This prevents multiple - machines of the same - architecture taking the - same job. See here for more information; - https://www.postgresql.org/docs/17/sql-select.html#SQL-FOR-UPDATE-SHARE + -- machines of the same + -- architecture taking the + -- same job. See here for more information; + -- https://www.postgresql.org/docs/17/sql-select.html#SQL-FOR-UPDATE-SHARE FOR UPDATE SKIP LOCKED ) @@ -1468,7 +1471,7 @@ where WHERE machine_id = $1 AND target = $2 AND sha = (SELECT sha FROM job_to_update) - RETURNING sha; + RETURNING *; ", &[&machine_id, &target], ) @@ -1477,7 +1480,7 @@ where /* If it was we will take that job */ if let Some(row) = maybe_previous_job { - return Some(row.get("sha")); + return Some(commit_queue_row_to_commit_job(&row)); } let maybe_drift_job = self @@ -1506,7 +1509,7 @@ where WHERE target = $1 AND sha = (SELECT sha FROM job_to_update) - RETURNING sha; + RETURNING *; ", &[&target, &machine_id], ) @@ -1515,15 +1518,16 @@ where /* If we are, we will take that job */ if let Some(row) = maybe_drift_job { - return Some(row.get("sha")); + return Some(commit_queue_row_to_commit_job(&row)); } /* See if there are any jobs that need taking care of */ let job = self .conn() - .query_opt(" + .query_opt( + " WITH job_to_update AS ( - SELECT sha + SELECT * FROM commit_queue WHERE target = $1 AND status = 'queued' @@ -1538,19 +1542,76 @@ where WHERE sha = (SELECT sha FROM job_to_update) AND target = $1 - RETURNING sha; - ", &[&target, &machine_id]) + RETURNING *; + ", + &[&target, &machine_id], + ) .await .unwrap(); /* If there is one, we will take that job */ if let Some(row) = job { - return Some(row.get("sha")); + return Some(commit_queue_row_to_commit_job(&row)); } /* There are no jobs in the queue */ return None; } + + /// Mark a job in the database as done + async fn finish_commit_job(&self, machine_id: String, target: Target, sha: String) -> bool { + let jobs = self + .conn() + .query_opt( + " + UPDATE commit_queue + SET finished_at = DATETIME('now'), + status = 'finished', + WHERE + sha = $1 + AND machine_id = $1 + AND target = $1; + ", + &[&sha, &machine_id, &target], + ) + .await + .unwrap(); + return jobs.is_some(); + } +} + +/// Map a database row from the commit queue to a `CommitJob` +fn commit_queue_row_to_commit_job(row: &tokio_postgres::Row) -> CommitJob { + CommitJob { + sha: row.get::<_, String>("sha"), + parent_sha: row.get::<_, Option>("parent_sha"), + commit_type: CommitType::from_str(&row.get::<_, String>("commit_type")).unwrap(), + pr: row.get::<_, u32>("pr"), + commit_time: row.get::<_, String>("commit_time").parse::().unwrap(), + target: Target::from_str(&row.get::<_, String>("target")).unwrap(), + machine_id: row.get::<_, Option>("machine_id"), + started_at: { + let started: Option = row.get("started_at"); + if let Some(ts) = started { + Some(ts.parse::().unwrap()) + } else { + None + } + }, + finished_at: { + let finished: Option = row.get("finished_at"); + if let Some(ts) = finished { + Some(ts.parse::().unwrap()) + 
} else { + None + } + }, + status: CommitJobStatus::from_str(&row.get::<_, String>("status")).unwrap(), + include: row.get::<_, Option>("include"), + exclude: row.get::<_, Option>("exclude"), + runs: row.get::<_, Option>("runs"), + backends: row.get::<_, Option>("backends"), + } } #[macro_export] diff --git a/database/src/pool/sqlite.rs b/database/src/pool/sqlite.rs index fc7ccd1eb..b4a3635bd 100644 --- a/database/src/pool/sqlite.rs +++ b/database/src/pool/sqlite.rs @@ -7,7 +7,7 @@ use crate::{ArtifactIdNumber, Index, QueuedCommit}; use chrono::{DateTime, TimeZone, Utc}; use hashbrown::HashMap; use rusqlite::types::{FromSql, FromSqlError, FromSqlResult, ValueRef}; -use rusqlite::{params, params_from_iter, OptionalExtension, Row, ToSql}; +use rusqlite::{params, params_from_iter, OptionalExtension, ToSql}; use std::path::PathBuf; use std::str::FromStr; use std::sync::Mutex; @@ -409,18 +409,19 @@ static MIGRATIONS: &[Migration] = &[ CREATE TABLE IF NOT EXISTS commit_queue ( sha TEXT, parent_sha TEXT, - commit_type TEXT CHECK (commit_type IN ('master', 'try')), + commit_type TEXT, pr INTEGER, commit_time TIMESTAMP, target TEXT, - -- The below are filled in by the collector that picks up the job + include TEXT, + exclude TEXT, + runs INTEGER, + backends TEXT, machine_id TEXT, started_at TIMESTAMP, finished_at TIMESTAMP, status TEXT, - -- We have a primary key as the sha <-> target as there can only be one - -- pairing - PRIMARY KEY (sha, target) + PRIMARY KEY (sha, target) ); CREATE INDEX IF NOT EXISTS sha_idx ON commit_queue (sha); CREATE INDEX IF NOT EXISTS machine_id_idx ON commit_queue (machine_id); @@ -1276,7 +1277,6 @@ impl Connection for SqliteConnection { .unwrap(); } - /* @Queue */ async fn enqueue_commit_job(&self, target: Target, jobs: &[CommitJob]) { let row_count = jobs.len(); if row_count == 0 { @@ -1310,6 +1310,10 @@ impl Connection for SqliteConnection { &job.commit_time, &job.status as &dyn ToSql, &target, + &job.include, + &job.exclude, + &job.runs, + &job.backends, ] }) .collect(); @@ -1319,11 +1323,7 @@ impl Connection for SqliteConnection { .unwrap(); } - /* @Queue */ - async fn dequeue_commit_job(&self, machine_id: String, target: Target) -> Option { - /* See if the machine was doing something and then failed */ - let get_sha = |row: &Row| row.get::<_, String>(0).unwrap(); - + async fn dequeue_commit_job(&self, machine_id: String, target: Target) -> Option { /* Check to see if this machine possibly went offline while doing * a previous job - if it did we'll take that job */ let maybe_previous_job = self @@ -1331,7 +1331,7 @@ impl Connection for SqliteConnection { .prepare( " WITH job_to_update AS ( - SELECT sha + SELECT * FROM commit_queue WHERE machine_id = ? AND target = ? @@ -1345,16 +1345,16 @@ impl Connection for SqliteConnection { WHERE machine_id = ? AND target = ? AND sha = (SELECT sha FROM job_to_update) - RETURNING sha; + RETURNING *; ", ) .unwrap() .query_map(params![&machine_id, &target, &machine_id, &target], |row| { - Ok(get_sha(row)) + Ok(commit_queue_row_to_commit_job(row)) }) .unwrap() .map(|row| row.unwrap()) - .collect::>(); + .collect::>(); if let Some(previous_job) = maybe_previous_job.get(0) { return Some(previous_job.clone()); @@ -1388,16 +1388,16 @@ impl Connection for SqliteConnection { WHERE target = ? 
AND sha = (SELECT sha FROM job_to_update)
-                RETURNING sha;
+                RETURNING *;
             ",
             )
             .unwrap()
             .query_map(params![&target, &target, &machine_id, &target], |row| {
-                Ok(get_sha(row))
+                Ok(commit_queue_row_to_commit_job(row))
             })
             .unwrap()
             .map(|sha| sha.unwrap())
-            .collect::<Vec<String>>();
+            .collect::<Vec<CommitJob>>();
 
         if let Some(drift_job) = maybe_drift_job.get(0) {
             return Some(drift_job.clone());
         }
 
         /* See if there are any jobs that need taking care of */
         let jobs = self
             .raw_ref()
             .prepare(
                 "
                 WITH job_to_update AS (
-                    SELECT sha
+                    SELECT *
                     FROM commit_queue
                     WHERE target = ?
                     AND status = 'queued'
                     ORDER BY
                         pr ASC,
                         commit_type,
                         sha
                     LIMIT 1
                 )
                 UPDATE commit_queue
                 SET started_at = DATETIME('now'),
                     status = 'in_progress',
                     machine_id = ?
                 WHERE
                     sha = (SELECT sha FROM job_to_update)
                     AND target = ?
-                RETURNING sha;
+                RETURNING *;
             ",
             )
             .unwrap()
             .query_map(params![&target, &machine_id, &target], |row| {
-                Ok(row.get::<_, String>(0).unwrap())
+                Ok(commit_queue_row_to_commit_job(row))
             })
             .unwrap()
             .map(|r| r.unwrap())
-            .collect::<Vec<String>>();
+            .collect::<Vec<CommitJob>>();
 
         /* If there is one, we will take that job */
-        if let Some(sha) = jobs.get(0) {
-            return Some(sha.clone());
+        if let Some(commit_job) = jobs.get(0) {
+            return Some(commit_job.clone());
         }
 
         /* There are no jobs in the queue */
         return None;
     }
+
+    /// Mark a job in the database as done
+    async fn finish_commit_job(&self, machine_id: String, target: Target, sha: String) -> bool {
+        let jobs = self
+            .raw_ref()
+            .execute(
+                "
+                UPDATE commit_queue
+                SET finished_at = DATETIME('now'),
+                    status = 'finished'
+                WHERE
+                    sha = ?
+                    AND machine_id = ?
+                    AND target = ?;
+            ",
+                params![&sha, &machine_id, &target],
+            )
+            .unwrap();
+        return jobs == 1;
+    }
 }
 
 #[macro_export]
@@ -1462,6 +1482,44 @@
 impl_to_sqlite_via_to_string!(Target);
 impl_to_sqlite_via_to_string!(CommitType);
 impl_to_sqlite_via_to_string!(CommitJobStatus);
 
+/// Map a database row from the commit queue to a `CommitJob`
+fn commit_queue_row_to_commit_job(row: &rusqlite::Row) -> CommitJob {
+    CommitJob {
+        sha: row.get::<_, String>("sha").unwrap(),
+        parent_sha: row.get::<_, Option<String>>("parent_sha").unwrap(),
+        commit_type: CommitType::from_str(&row.get::<_, String>("commit_type").unwrap()).unwrap(),
+        pr: row.get::<_, u32>("pr").unwrap(),
+        // `commit_time` is stored through `Date`'s `ToSql`, i.e. as RFC 3339
+        commit_time: Date(
+            row.get::<_, String>("commit_time")
+                .unwrap()
+                .parse::<DateTime<Utc>>()
+                .unwrap(),
+        ),
+        target: Target::from_str(&row.get::<_, String>("target").unwrap()).unwrap(),
+        machine_id: row.get::<_, Option<String>>("machine_id").unwrap(),
+        started_at: {
+            let started: Option<String> = row.get("started_at").unwrap();
+            if let Some(ts) = started {
+                // `DATETIME('now')` stores "YYYY-MM-DD HH:MM:SS", not RFC 3339
+                let naive =
+                    chrono::NaiveDateTime::parse_from_str(&ts, "%Y-%m-%d %H:%M:%S").unwrap();
+                Some(Utc.from_utc_datetime(&naive))
+            } else {
+                None
+            }
+        },
+        finished_at: {
+            let finished: Option<String> = row.get("finished_at").unwrap();
+            if let Some(ts) = finished {
+                let naive =
+                    chrono::NaiveDateTime::parse_from_str(&ts, "%Y-%m-%d %H:%M:%S").unwrap();
+                Some(Utc.from_utc_datetime(&naive))
+            } else {
+                None
+            }
+        },
+        status: CommitJobStatus::from_str(&row.get::<_, String>("status").unwrap()).unwrap(),
+        include: row.get::<_, Option<String>>("include").unwrap(),
+        exclude: row.get::<_, Option<String>>("exclude").unwrap(),
+        runs: row.get::<_, Option<i32>>("runs").unwrap(),
+        backends: row.get::<_, Option<String>>("backends").unwrap(),
+    }
+}
+
 impl ToSql for Date {
     fn to_sql(&self) -> rusqlite::Result<ToSqlOutput<'_>> {
         Ok(self.0.to_rfc3339().into())
     }
 }
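The SQLite dequeue path leans on `UPDATE ... RETURNING`, which requires SQLite 3.35 or newer; a minimal, self-contained sketch of the claim pattern with rusqlite (an illustrative one-table schema, not the real `commit_queue`) looks like this:

use rusqlite::{params, Connection};

fn main() -> rusqlite::Result<()> {
    let conn = Connection::open_in_memory()?;
    conn.execute_batch(
        "CREATE TABLE jobs (sha TEXT PRIMARY KEY, status TEXT, machine_id TEXT);
         INSERT INTO jobs (sha, status) VALUES ('abc', 'queued');",
    )?;

    // Atomically claim one queued job and read back its sha; RETURNING makes
    // the UPDATE behave like a query, so it runs through `query_map`.
    let mut stmt = conn.prepare(
        "UPDATE jobs
         SET status = 'in_progress', machine_id = ?
         WHERE sha = (SELECT sha FROM jobs WHERE status = 'queued' LIMIT 1)
         RETURNING sha;",
    )?;
    let claimed: Option<String> = stmt
        .query_map(params!["collector-1"], |row| row.get(0))?
        .next()
        .transpose()?;

    assert_eq!(claimed.as_deref(), Some("abc"));
    Ok(())
}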
diff --git a/site/src/lib.rs b/site/src/lib.rs
index 82c6aa3fe..ab4a3190c 100644
--- a/site/src/lib.rs
+++ b/site/src/lib.rs
@@ -4,8 +4,8 @@ extern crate itertools;
 pub mod api;
 pub mod github;
 pub mod load;
-pub mod server;
 pub mod queue_jobs;
+pub mod server;
 
 mod average;
 mod benchmark_metadata;
diff --git a/site/src/load.rs b/site/src/load.rs
index cf4975406..ff2d97095 100644
--- a/site/src/load.rs
+++ b/site/src/load.rs
@@ -15,7 +15,7 @@ use crate::self_profile::SelfProfileCache;
 use collector::compile::benchmark::category::Category;
 use collector::{Bound, MasterCommit};
 pub use database::{ArtifactId, Benchmark, Commit};
-use database::{CommitJob, Pool, Target};
+use database::{CommitJob, CommitJobStatus, Pool, Target};
 use database::{CommitType, Date};
 
 #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
@@ -38,7 +38,7 @@ pub enum MissingReason {
 }
 
 impl MissingReason {
-    fn pr(&self) -> Option<u32> {
+    pub fn pr(&self) -> Option<u32> {
         let mut this = self;
         loop {
             match this {
@@ -49,7 +49,7 @@
             }
         }
     }
-    fn parent_sha(&self) -> Option<&str> {
+    pub fn parent_sha(&self) -> Option<&str> {
         let mut this = self;
         loop {
             match this {
@@ -60,6 +60,54 @@
             }
         }
     }
+    pub fn include(&self) -> Option<&str> {
+        let mut this = self;
+        loop {
+            match this {
+                // For Master variant there is no include field.
+                MissingReason::Master { .. } => return None,
+                MissingReason::Try { include, .. } => return include.as_deref(),
+                MissingReason::InProgress(Some(s)) => this = s,
+                MissingReason::InProgress(None) => return None,
+            }
+        }
+    }
+    pub fn exclude(&self) -> Option<&str> {
+        let mut this = self;
+        loop {
+            match this {
+                // For Master variant there is no exclude field.
+                MissingReason::Master { .. } => return None,
+                MissingReason::Try { exclude, .. } => return exclude.as_deref(),
+                MissingReason::InProgress(Some(s)) => this = s,
+                MissingReason::InProgress(None) => return None,
+            }
+        }
+    }
+    pub fn runs(&self) -> Option<i32> {
+        let mut this = self;
+        loop {
+            match this {
+                // For Master variant there is no runs field.
+                MissingReason::Master { .. } => return None,
+                MissingReason::Try { runs, .. } => return *runs,
+                MissingReason::InProgress(Some(s)) => this = s,
+                MissingReason::InProgress(None) => return None,
+            }
+        }
+    }
+    pub fn backends(&self) -> Option<&str> {
+        let mut this = self;
+        loop {
+            match this {
+                // For Master variant there is no backends field.
+                MissingReason::Master { .. } => return None,
+                MissingReason::Try { backends, ..
} => return backends.as_deref(),
+                MissingReason::InProgress(Some(s)) => this = s,
+                MissingReason::InProgress(None) => return None,
+            }
+        }
+    }
 }
 
 #[derive(Clone, Debug, PartialEq, Eq)]
@@ -556,18 +604,22 @@ where
 /// Conversion between our application layer and database object, also
 /// simplifies the `(Commit, MissingReason)` tuple by essentially flattening it
 fn commit_job_from_queue_item(item: &(Commit, MissingReason)) -> CommitJob {
-    let parent_sha = if let Some(parent_sha) = item.1.parent_sha() {
-        Some(parent_sha.to_string())
-    } else {
-        None
-    };
-    CommitJob::new(
-        item.0.sha.clone(),
-        parent_sha,
-        item.1.pr().unwrap_or(0),
-        item.0.r#type.clone(),
-        item.0.date,
-    )
+    CommitJob {
+        sha: item.0.sha.clone(),
+        parent_sha: item.1.parent_sha().map(|it| it.to_string()),
+        commit_type: item.0.r#type.clone(),
+        pr: item.1.pr().unwrap_or(0),
+        commit_time: item.0.date,
+        target: Target::X86_64UnknownLinuxGnu,
+        machine_id: None,
+        started_at: None,
+        finished_at: None,
+        status: CommitJobStatus::Queued,
+        include: item.1.include().map(|it| it.to_string()),
+        exclude: item.1.exclude().map(|it| it.to_string()),
+        runs: item.1.runs(),
+        backends: item.1.backends().map(|it| it.to_string()),
+    }
 }
 
 fn enqueue_new_jobs(
diff --git a/site/src/main.rs b/site/src/main.rs
index 076053c3f..a4a41cf9f 100644
--- a/site/src/main.rs
+++ b/site/src/main.rs
@@ -8,8 +8,6 @@ use std::sync::Arc;
 #[global_allocator]
 static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
 
-static QUEUE_UPDATE_INTERVAL_SECONDS: u64 = 5;
-
 #[tokio::main]
 async fn main() {
     env_logger::init();
@@ -30,6 +28,11 @@ async fn main() {
         .ok()
         .and_then(|x| x.parse().ok())
         .unwrap_or(2346);
+    let queue_update_interval_seconds: u64 = env::var("QUEUE_UPDATE_INTERVAL_SECONDS")
+        .ok()
+        .and_then(|x| x.parse().ok())
+        .unwrap_or(5);
+
     let fut = tokio::task::spawn_blocking(move || {
         tokio::task::spawn(async move {
             let res = Arc::new(load::SiteCtxt::from_db_url(&db_url).await.unwrap());
@@ -53,7 +56,7 @@ async fn main() {
     println!("Starting server with port={:?}", port);
 
     let server = site::server::start(ctxt.clone(), port).fuse();
-    tokio::task::spawn(site::queue_jobs::cron_enqueue_jobs(ctxt, QUEUE_UPDATE_INTERVAL_SECONDS));
+    tokio::task::spawn(site::queue_jobs::cron_enqueue_jobs(ctxt, queue_update_interval_seconds));
 
     futures::pin_mut!(server);
     futures::pin_mut!(fut);
diff --git a/site/src/queue_jobs.rs b/site/src/queue_jobs.rs
index d200875c4..ce59b5e17 100644
--- a/site/src/queue_jobs.rs
+++ b/site/src/queue_jobs.rs
@@ -1,8 +1,8 @@
 use std::sync::Arc;
 
+use crate::load::SiteCtxt;
 use parking_lot::RwLock;
 use tokio::time::{self, Duration};
-use crate::load::SiteCtxt;
 
 /// Inserts into the queue at `seconds` interval
 pub async fn cron_enqueue_jobs(site_ctxt: Arc<RwLock<Option<Arc<SiteCtxt>>>>, seconds: u64) {
diff --git a/site/src/request_handlers.rs b/site/src/request_handlers.rs
index d73e3cbc1..d106e3f38 100644
--- a/site/src/request_handlers.rs
+++ b/site/src/request_handlers.rs
@@ -13,7 +13,7 @@ pub use graph::{
     handle_compile_detail_graphs, handle_compile_detail_sections, handle_graphs,
     handle_runtime_detail_graphs,
 };
-pub use next_artifact::handle_next_artifact;
+pub use next_artifact::{handle_next_artifact, handle_released_artifact};
 pub use self_profile::{
     handle_self_profile, handle_self_profile_processed_download, handle_self_profile_raw,
     handle_self_profile_raw_download,
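All of the `MissingReason` accessors above share the same loop-instead-of-recursion idiom for walking nested `InProgress` wrappers; here is a stripped-down illustration of the pattern, using a hypothetical two-variant enum rather than the real type:

/// Hypothetical miniature of `MissingReason`: a payload-carrying variant plus
/// a wrapper that may point at the reason it superseded.
enum Reason {
    Leaf { pr: u32 },
    InProgress(Option<Box<Reason>>),
}

impl Reason {
    /// Walk through any number of `InProgress` wrappers without recursion.
    fn pr(&self) -> Option<u32> {
        let mut this = self;
        loop {
            match this {
                Reason::Leaf { pr } => return Some(*pr),
                Reason::InProgress(Some(inner)) => this = inner,
                Reason::InProgress(None) => return None,
            }
        }
    }
}

fn main() {
    let nested = Reason::InProgress(Some(Box::new(Reason::Leaf { pr: 1234 })));
    assert_eq!(nested.pr(), Some(1234));
}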
diff --git a/site/src/request_handlers/next_artifact.rs b/site/src/request_handlers/next_artifact.rs
index 95253d119..5250d79ba 100644
--- a/site/src/request_handlers/next_artifact.rs
+++ b/site/src/request_handlers/next_artifact.rs
@@ -82,3 +82,23 @@ pub async fn handle_next_artifact(ctxt: Arc<SiteCtxt>) -> next_artifact::Response {
         artifact: next_commit,
     }
 }
+
+/// Retrieve the next release artifact, or `None` if there is nothing to do
+pub async fn handle_released_artifact(ctxt: Arc<SiteCtxt>) -> next_artifact::Response {
+    match ctxt.missing_published_artifacts().await {
+        Ok(next_artifact) => {
+            if let Some(next_artifact) = next_artifact.into_iter().next() {
+                log::debug!("next_artifact: {next_artifact}");
+                next_artifact::Response {
+                    artifact: Some(next_artifact::NextArtifact::Release(next_artifact)),
+                }
+            } else {
+                next_artifact::Response { artifact: None }
+            }
+        }
+        Err(error) => {
+            log::error!("Failed to fetch missing artifacts: {error:?}");
+            next_artifact::Response { artifact: None }
+        }
+    }
+}
diff --git a/site/src/server.rs b/site/src/server.rs
index 0a29b0ba2..3864678f7 100644
--- a/site/src/server.rs
+++ b/site/src/server.rs
@@ -387,6 +387,11 @@ async fn serve_req(server: Server, req: Request) -> Result<Response, ServerError> {
+        "/perf/released_artifact" if *req.method() == http::Method::GET => {
+            return server
+                .handle_get_async(&req, request_handlers::handle_released_artifact)
+                .await;
+        }
         "/perf/triage" if *req.method() == http::Method::GET => {
             let ctxt: Arc<SiteCtxt> = server.ctxt.read().as_ref().unwrap().clone();
             let input: triage::Request = check!(parse_query_string(req.uri()));
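handle_released_artifact deliberately degrades to an empty response: only the first missing artifact is handed out per request, and a database error is logged rather than surfaced, so a polling collector simply sees nothing to do. A rough sketch of that shape with simplified stand-in types (the site's real next_artifact types carry more structure):

// Stand-in for the site's response type.
#[derive(Debug)]
struct Response {
    artifact: Option<String>,
}

fn released_artifact(missing: Result<Vec<String>, String>) -> Response {
    match missing {
        // Hand out at most one artifact per request.
        Ok(artifacts) => Response {
            artifact: artifacts.into_iter().next(),
        },
        // Errors degrade to "nothing to do" instead of failing the request,
        // mirroring the handler above.
        Err(error) => {
            eprintln!("Failed to fetch missing artifacts: {error:?}");
            Response { artifact: None }
        }
    }
}

fn main() {
    let resp = released_artifact(Ok(vec!["1.86.0".to_string(), "1.85.1".to_string()]));
    assert_eq!(resp.artifact.as_deref(), Some("1.86.0"));
    println!("{resp:?}");
}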
From 697814a05d5559116a03ba2327228ec1b2520cd9 Mon Sep 17 00:00:00 2001
From: James Barford-Evans
Date: Tue, 29 Apr 2025 08:30:48 +0100
Subject: [PATCH 4/4] assigning machine-id and getting the machine's
 architecture

---
 collector/src/bin/collector.rs | 42 +++++++++++++++++++++++++---------
 database/src/pool.rs           |  4 ++--
 database/src/pool/postgres.rs  |  4 ++--
 database/src/pool/sqlite.rs    | 12 +++++-----
 4 files changed, 41 insertions(+), 21 deletions(-)

diff --git a/collector/src/bin/collector.rs b/collector/src/bin/collector.rs
index 5d22f2d86..33c90972f 100644
--- a/collector/src/bin/collector.rs
+++ b/collector/src/bin/collector.rs
@@ -982,6 +982,11 @@ fn main_result() -> anyhow::Result<i32> {
             bench_rustc,
             self_profile,
         } => {
+            /* Both of these functions will panic if they are not run on a
+             * Linux machine */
+            let machine_id = get_machine_uuid();
+            let target = get_machine_target();
+
             log_db(&db);
             println!("processing artifacts");
             let client = reqwest::blocking::Client::new();
@@ -1006,11 +1011,7 @@ fn main_result() -> anyhow::Result<i32> {
                 let pool = database::Pool::open(&db.db);
                 let rt = build_async_runtime();
                 let conn = rt.block_on(pool.connection());
-                conn.dequeue_commit_job(
-                    "MACHINE_ID".into(),
-                    database::Target::X86_64UnknownLinuxGnu,
-                )
-                .await
+                conn.dequeue_commit_job(&machine_id, target).await
             });

             let Ok(commit_job_future) = commit_job_future_result else {
@@ -1059,12 +1060,7 @@ fn main_result() -> anyhow::Result<i32> {
                 let pool = database::Pool::open(&db.db);
                 let rt = build_async_runtime();
                 let conn = rt.block_on(pool.connection());
-                conn.finish_commit_job(
-                    "MACHINE_ID".into(),
-                    database::Target::X86_64UnknownLinuxGnu,
-                    sha,
-                )
-                .await
+                conn.finish_commit_job(&machine_id, target, sha).await
             });
         }

@@ -2194,3 +2190,28 @@ fn benchmark_next_artifact(
     }
     Ok(0)
 }
+
+fn get_machine_uuid() -> String {
+    if let Ok(raw) = fs::read_to_string("/etc/machine-id") {
+        // /etc/machine-id ends with a newline; trim it before using the id
+        // as a database key
+        return raw.trim().to_string();
+    }
+    panic!("Not running on a Linux machine");
+}
+
+/// Get the machine's target triple
+fn get_machine_target() -> database::Target {
+    // The implementation of this is sub-par
+    let arch = std::env::consts::ARCH;
+    let os = std::env::consts::OS;
+    // This is slightly extreme; however, in the context in which this
+    // function is used we _cannot_ proceed - at the present time we only
+    // benchmark on Linux.
+    if os != "linux" {
+        panic!("Machine can only run on linux");
+    }
+    match arch {
+        "x86_64" => database::Target::X86_64UnknownLinuxGnu,
+        _ => panic!("Arch: `{}` not supported", arch),
+    }
+}
diff --git a/database/src/pool.rs b/database/src/pool.rs
index c6594cc17..615cad0a7 100644
--- a/database/src/pool.rs
+++ b/database/src/pool.rs
@@ -184,10 +184,10 @@ pub trait Connection: Send + Sync {
     async fn enqueue_commit_job(&self, target: Target, jobs: &[CommitJob]);

     /// Dequeue jobs
-    async fn dequeue_commit_job(&self, machine_id: String, target: Target) -> Option<CommitJob>;
+    async fn dequeue_commit_job(&self, machine_id: &str, target: Target) -> Option<CommitJob>;

     /// Mark the job as finished
-    async fn finish_commit_job(&self, machine_id: String, target: Target, sha: String) -> bool;
+    async fn finish_commit_job(&self, machine_id: &str, target: Target, sha: String) -> bool;
 }

 #[async_trait::async_trait]
diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs
index 2de427c10..6559decbc 100644
--- a/database/src/pool/postgres.rs
+++ b/database/src/pool/postgres.rs
@@ -1441,7 +1441,7 @@
         self.conn().execute(&sql, &params).await.unwrap();
     }

-    async fn dequeue_commit_job(&self, machine_id: String, target: Target) -> Option<CommitJob> {
+    async fn dequeue_commit_job(&self, machine_id: &str, target: Target) -> Option<CommitJob> {
         /* Check to see if this machine possibly went offline while doing
          * a previous job - if it did we'll take that job */
         let maybe_previous_job = self
@@ -1559,7 +1559,7 @@
     }

     /// Mark a job in the database as done
-    async fn finish_commit_job(&self, machine_id: String, target: Target, sha: String) -> bool {
+    async fn finish_commit_job(&self, machine_id: &str, target: Target, sha: String) -> bool {
         let jobs = self
             .conn()
             .query_opt(
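Both backends implement the same dequeue policy: a collector first reclaims a job it already had in progress (covering a machine that went offline mid-run), and only then claims the oldest queued job for its target. A sketch of that policy against an in-memory queue, standing in for the SQL above (types and the "slice is in enqueue order" assumption are simplifications):

#[derive(Debug, Clone)]
enum Status {
    Queued,
    InProgress { machine_id: String },
}

#[derive(Debug, Clone)]
struct Job {
    sha: String,
    target: &'static str,
    status: Status,
}

fn dequeue(queue: &mut [Job], machine_id: &str, target: &str) -> Option<String> {
    // 1) Resume a job this machine may have left in progress, mirroring the
    //    "maybe_previous_job" query.
    if let Some(job) = queue.iter().find(|j| {
        j.target == target
            && matches!(&j.status, Status::InProgress { machine_id: m } if m == machine_id)
    }) {
        return Some(job.sha.clone());
    }
    // 2) Otherwise claim the oldest queued job for this target; the slice is
    //    assumed to be in enqueue order, standing in for the SQL ORDER BY.
    let job = queue
        .iter_mut()
        .find(|j| j.target == target && matches!(j.status, Status::Queued))?;
    job.status = Status::InProgress { machine_id: machine_id.to_string() };
    Some(job.sha.clone())
}

fn main() {
    let mut queue = vec![
        Job { sha: "abc".into(), target: "x86_64", status: Status::Queued },
        Job { sha: "def".into(), target: "x86_64", status: Status::InProgress { machine_id: "m1".into() } },
    ];
    // m1 resumes its interrupted job before taking anything new.
    assert_eq!(dequeue(&mut queue, "m1", "x86_64"), Some("def".to_string()));
    // m2 gets the queued job.
    assert_eq!(dequeue(&mut queue, "m2", "x86_64"), Some("abc".to_string()));
}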
diff --git a/database/src/pool/sqlite.rs b/database/src/pool/sqlite.rs
index b4a3635bd..2a01e62d6 100644
--- a/database/src/pool/sqlite.rs
+++ b/database/src/pool/sqlite.rs
@@ -1323,7 +1323,7 @@ impl Connection for SqliteConnection {
         .unwrap();
     }

-    async fn dequeue_commit_job(&self, machine_id: String, target: Target) -> Option<CommitJob> {
+    async fn dequeue_commit_job(&self, machine_id: &str, target: Target) -> Option<CommitJob> {
         /* Check to see if this machine possibly went offline while doing
          * a previous job - if it did we'll take that job */
         let maybe_previous_job = self
@@ -1349,7 +1349,7 @@ impl Connection for SqliteConnection {
             ",
             )
             .unwrap()
-            .query_map(params![&machine_id, &target, &machine_id, &target], |row| {
+            .query_map(params![machine_id, &target, machine_id, &target], |row| {
                 Ok(commit_queue_row_to_commit_job(row))
             })
             .unwrap()
@@ -1392,7 +1392,7 @@ impl Connection for SqliteConnection {
             ",
             )
             .unwrap()
-            .query_map(params![&target, &target, &machine_id, &target], |row| {
+            .query_map(params![&target, &target, machine_id, &target], |row| {
                 Ok(commit_queue_row_to_commit_job(row))
             })
             .unwrap()
@@ -1430,7 +1430,7 @@ impl Connection for SqliteConnection {
             ",
             )
             .unwrap()
-            .query_map(params![&target, &machine_id, &target], |row| {
+            .query_map(params![&target, machine_id, &target], |row| {
                 Ok(commit_queue_row_to_commit_job(row))
             })
             .unwrap()
@@ -1447,7 +1447,7 @@ impl Connection for SqliteConnection {
     }

     /// Mark a job in the database as done
-    async fn finish_commit_job(&self, machine_id: String, target: Target, sha: String) -> bool {
+    async fn finish_commit_job(&self, machine_id: &str, target: Target, sha: String) -> bool {
         let jobs = self
             .raw_ref()
             .execute(
@@ -1460,7 +1460,7 @@ impl Connection for SqliteConnection {
                 AND machine_id = ?
                 AND target = ?;
             ",
-                params![&sha, &machine_id, &target],
+                params![&sha, machine_id, &target],
             )
             .unwrap();
         return jobs == 1;
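finish_commit_job in both backends leans on the UPDATE being keyed by (sha, machine_id, target): exactly one row should match, so the affected-row count doubles as the success flag (`jobs == 1`). A small stand-in sketch of that contract, with an in-memory table in place of SQL:

struct Row {
    sha: String,
    machine_id: String,
    target: String,
    finished: bool,
}

// Returns how many rows matched and were updated, like a SQL UPDATE.
fn finish(rows: &mut [Row], sha: &str, machine_id: &str, target: &str) -> usize {
    let mut updated = 0;
    for row in rows.iter_mut() {
        if row.sha == sha && row.machine_id == machine_id && row.target == target {
            row.finished = true;
            updated += 1;
        }
    }
    updated
}

fn main() {
    let mut rows = vec![Row {
        sha: "abc".into(),
        machine_id: "m1".into(),
        target: "x86_64".into(),
        finished: false,
    }];
    // jobs == 1 means the right machine finished the right job.
    let jobs = finish(&mut rows, "abc", "m1", "x86_64");
    assert_eq!(jobs, 1);
    println!("finished: {}", jobs == 1);
}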