From aaf7f338045f64f93599b03c39f8e488420c1a09 Mon Sep 17 00:00:00 2001 From: Yuya Nishihara Date: Tue, 12 Nov 2024 22:16:10 +0900 Subject: [PATCH] lock: propagate error from lock(), remove panic from unlock path --- lib/src/local_working_copy.rs | 5 ++++- lib/src/lock/fallback.rs | 26 +++++++++++++++++--------- lib/src/lock/mod.rs | 23 ++++++++++++++++++----- lib/src/lock/unix.rs | 27 +++++++++++++++++++++------ lib/src/op_heads_store.rs | 6 ++++-- lib/src/simple_op_heads_store.rs | 8 ++++---- lib/src/stacked_table.rs | 3 ++- lib/src/transaction.rs | 2 +- 8 files changed, 71 insertions(+), 29 deletions(-) diff --git a/lib/src/local_working_copy.rs b/lib/src/local_working_copy.rs index be9882c65..0b6ff39d0 100644 --- a/lib/src/local_working_copy.rs +++ b/lib/src/local_working_copy.rs @@ -1729,7 +1729,10 @@ impl WorkingCopy for LocalWorkingCopy { fn start_mutation(&self) -> Result, WorkingCopyStateError> { let lock_path = self.state_path.join("working_copy.lock"); - let lock = FileLock::lock(lock_path); + let lock = FileLock::lock(lock_path).map_err(|err| WorkingCopyStateError { + message: "Failed to lock working copy".to_owned(), + err: err.into(), + })?; let wc = LocalWorkingCopy { store: self.store.clone(), diff --git a/lib/src/lock/fallback.rs b/lib/src/lock/fallback.rs index 9bfd6603c..513da8ac6 100644 --- a/lib/src/lock/fallback.rs +++ b/lib/src/lock/fallback.rs @@ -19,6 +19,8 @@ use std::time::Duration; use tracing::instrument; +use super::FileLockError; + pub struct FileLock { path: PathBuf, _file: File, @@ -56,7 +58,7 @@ impl Iterator for BackoffIterator { // Suppress warning on platforms where specialized lock impl is available #[cfg_attr(unix, allow(dead_code))] impl FileLock { - pub fn lock(path: PathBuf) -> FileLock { + pub fn lock(path: PathBuf) -> Result { let mut options = OpenOptions::new(); options.create_new(true); options.write(true); @@ -64,7 +66,7 @@ impl FileLock { loop { match options.open(&path) { Ok(file) => { - return FileLock { path, _file: file }; + return Ok(FileLock { path, _file: file }); } Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists @@ -74,15 +76,19 @@ impl FileLock { if let Some(duration) = backoff_iterator.next() { std::thread::sleep(duration); } else { - panic!( - "Timed out while trying to create lock file {}: {}", - path.display(), - err - ); + return Err(FileLockError { + message: "Timed out while trying to create lock file", + path, + err, + }); } } Err(err) => { - panic!("Failed to create lock file {}: {}", path.display(), err); + return Err(FileLockError { + message: "Failed to create lock file", + path, + err, + }) } } } @@ -92,6 +98,8 @@ impl FileLock { impl Drop for FileLock { #[instrument(skip_all)] fn drop(&mut self) { - std::fs::remove_file(&self.path).expect("Failed to delete lock file"); + std::fs::remove_file(&self.path) + .inspect_err(|err| tracing::warn!(?err, ?self.path, "Failed to delete lock file")) + .ok(); } } diff --git a/lib/src/lock/mod.rs b/lib/src/lock/mod.rs index ab59f7538..d71764ed1 100644 --- a/lib/src/lock/mod.rs +++ b/lib/src/lock/mod.rs @@ -18,16 +18,29 @@ mod fallback; #[cfg(unix)] mod unix; +use std::io; +use std::path::PathBuf; + +use thiserror::Error; + #[cfg(not(unix))] pub use self::fallback::FileLock; #[cfg(unix)] pub use self::unix::FileLock; +#[derive(Debug, Error)] +#[error("{message}: {path}")] +pub struct FileLockError { + pub message: &'static str, + pub path: PathBuf, + #[source] + pub err: io::Error, +} + #[cfg(test)] mod tests { use std::cmp::max; use std::fs; - use std::path::PathBuf; use std::thread; use std::time::Duration; @@ -37,12 +50,12 @@ mod tests { #[test_case(FileLock::lock)] #[cfg_attr(unix, test_case(fallback::FileLock::lock))] - fn lock_basic(lock_fn: fn(PathBuf) -> T) { + fn lock_basic(lock_fn: fn(PathBuf) -> Result) { let temp_dir = testutils::new_temp_dir(); let lock_path = temp_dir.path().join("test.lock"); assert!(!lock_path.exists()); { - let _lock = lock_fn(lock_path.clone()); + let _lock = lock_fn(lock_path.clone()).unwrap(); assert!(lock_path.exists()); } assert!(!lock_path.exists()); @@ -50,7 +63,7 @@ mod tests { #[test_case(FileLock::lock)] #[cfg_attr(unix, test_case(fallback::FileLock::lock))] - fn lock_concurrent(lock_fn: fn(PathBuf) -> T) { + fn lock_concurrent(lock_fn: fn(PathBuf) -> Result) { let temp_dir = testutils::new_temp_dir(); let data_path = temp_dir.path().join("test"); let lock_path = temp_dir.path().join("test.lock"); @@ -59,7 +72,7 @@ mod tests { thread::scope(|s| { for _ in 0..num_threads { s.spawn(|| { - let _lock = lock_fn(lock_path.clone()); + let _lock = lock_fn(lock_path.clone()).unwrap(); let data = fs::read(&data_path).unwrap(); let value = u32::from_le_bytes(data.try_into().unwrap()); thread::sleep(Duration::from_millis(1)); diff --git a/lib/src/lock/unix.rs b/lib/src/lock/unix.rs index 1f65e3a02..b698adcd6 100644 --- a/lib/src/lock/unix.rs +++ b/lib/src/lock/unix.rs @@ -20,21 +20,36 @@ use std::path::PathBuf; use rustix::fs::FlockOperation; use tracing::instrument; +use super::FileLockError; + pub struct FileLock { path: PathBuf, file: File, } impl FileLock { - pub fn lock(path: PathBuf) -> FileLock { + pub fn lock(path: PathBuf) -> Result { loop { // Create lockfile, or open pre-existing one - let file = File::create(&path).expect("failed to open lockfile"); + let file = File::create(&path).map_err(|err| FileLockError { + message: "Failed to open lock file", + path: path.clone(), + err, + })?; // If the lock was already held, wait for it to be released - rustix::fs::flock(&file, FlockOperation::LockExclusive) - .expect("failed to lock lockfile"); + rustix::fs::flock(&file, FlockOperation::LockExclusive).map_err(|errno| { + FileLockError { + message: "Failed to lock lock file", + path: path.clone(), + err: errno.into(), + } + })?; - let stat = rustix::fs::fstat(&file).expect("failed to stat lockfile"); + let stat = rustix::fs::fstat(&file).map_err(|errno| FileLockError { + message: "failed to stat lock file", + path: path.clone(), + err: errno.into(), + })?; if stat.st_nlink == 0 { // Lockfile was deleted, probably by the previous holder's `Drop` impl; create a // new one so our ownership is visible, rather than hidden in an @@ -43,7 +58,7 @@ impl FileLock { continue; } - return Self { path, file }; + return Ok(Self { path, file }); } } } diff --git a/lib/src/op_heads_store.rs b/lib/src/op_heads_store.rs index 1a72bf7b4..b7c29e002 100644 --- a/lib/src/op_heads_store.rs +++ b/lib/src/op_heads_store.rs @@ -37,6 +37,8 @@ pub enum OpHeadsStoreError { new_op_id: OperationId, source: Box, }, + #[error("Failed to lock operation heads store")] + Lock(#[source] Box), } #[derive(Debug, Error)] @@ -68,7 +70,7 @@ pub trait OpHeadsStore: Send + Sync + Debug { /// is to prevent concurrent processes from resolving the same divergent /// operations. It is not needed for correctness; implementations are free /// to return a type that doesn't hold a lock. - fn lock(&self) -> Box; + fn lock(&self) -> Result, OpHeadsStoreError>; } // Given an OpHeadsStore, fetch and resolve its op heads down to one under a @@ -102,7 +104,7 @@ where // Note that the locking isn't necessary for correctness of merge; we take // the lock only to prevent other concurrent processes from doing the same // work (and producing another set of divergent heads). - let _lock = op_heads_store.lock(); + let _lock = op_heads_store.lock()?; let op_head_ids = op_heads_store.get_op_heads()?; if op_head_ids.is_empty() { diff --git a/lib/src/simple_op_heads_store.rs b/lib/src/simple_op_heads_store.rs index f7765a4bf..1f22d3e1a 100644 --- a/lib/src/simple_op_heads_store.rs +++ b/lib/src/simple_op_heads_store.rs @@ -132,9 +132,9 @@ impl OpHeadsStore for SimpleOpHeadsStore { Ok(op_heads) } - fn lock(&self) -> Box { - Box::new(SimpleOpHeadsStoreLock { - _lock: FileLock::lock(self.dir.join("lock")), - }) + fn lock(&self) -> Result, OpHeadsStoreError> { + let lock = FileLock::lock(self.dir.join("lock")) + .map_err(|err| OpHeadsStoreError::Lock(err.into()))?; + Ok(Box::new(SimpleOpHeadsStoreLock { _lock: lock })) } } diff --git a/lib/src/stacked_table.rs b/lib/src/stacked_table.rs index b7d140c1a..112586f5d 100644 --- a/lib/src/stacked_table.rs +++ b/lib/src/stacked_table.rs @@ -435,7 +435,8 @@ impl TableStore { } fn lock(&self) -> FileLock { - FileLock::lock(self.dir.join("lock")) + // TODO: propagate error + FileLock::lock(self.dir.join("lock")).unwrap() } fn load_table(&self, name: String) -> TableStoreResult> { diff --git a/lib/src/transaction.rs b/lib/src/transaction.rs index d507f91ff..1d708e6d6 100644 --- a/lib/src/transaction.rs +++ b/lib/src/transaction.rs @@ -200,7 +200,7 @@ impl UnpublishedOperation { } pub fn publish(self) -> Result, OpHeadsStoreError> { - let _lock = self.op_heads_store.lock(); + let _lock = self.op_heads_store.lock()?; self.op_heads_store .update_op_heads(self.operation().parent_ids(), self.operation().id())?; Ok(self.repo)