lock: propagate error from lock(), remove panic from unlock path

Yuya Nishihara 2024-11-12 22:16:10 +09:00
parent 6ffd4d4f63
commit aaf7f33804
8 changed files with 71 additions and 29 deletions
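
In short: FileLock::lock() now returns Result<FileLock, FileLockError> instead of panicking on failure, so each caller decides how to wrap or propagate the error. A minimal caller-side sketch of the new convention (the MyError wrapper type and the jj_lib::lock module path are illustrative assumptions, not code from this commit):

    use std::path::Path;

    use jj_lib::lock::FileLock; // module path assumed for illustration

    // Hypothetical caller-side error type, mirroring the WorkingCopyStateError
    // wrapping in the first hunk below.
    struct MyError {
        message: String,
        err: Box<dyn std::error::Error + Send + Sync>,
    }

    fn with_lock(state_path: &Path) -> Result<(), MyError> {
        let lock_path = state_path.join("working_copy.lock");
        let _lock = FileLock::lock(lock_path).map_err(|err| MyError {
            message: "Failed to lock working copy".to_owned(),
            err: err.into(),
        })?;
        // ... do the guarded work; the lock file is released when `_lock` drops.
        Ok(())
    }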

View File

@@ -1729,7 +1729,10 @@ impl WorkingCopy for LocalWorkingCopy {
     fn start_mutation(&self) -> Result<Box<dyn LockedWorkingCopy>, WorkingCopyStateError> {
         let lock_path = self.state_path.join("working_copy.lock");
-        let lock = FileLock::lock(lock_path);
+        let lock = FileLock::lock(lock_path).map_err(|err| WorkingCopyStateError {
+            message: "Failed to lock working copy".to_owned(),
+            err: err.into(),
+        })?;
         let wc = LocalWorkingCopy {
             store: self.store.clone(),

View File

@@ -19,6 +19,8 @@ use std::time::Duration;
 use tracing::instrument;
+use super::FileLockError;
 pub struct FileLock {
     path: PathBuf,
     _file: File,
@@ -56,7 +58,7 @@ impl Iterator for BackoffIterator {
 // Suppress warning on platforms where specialized lock impl is available
 #[cfg_attr(unix, allow(dead_code))]
 impl FileLock {
-    pub fn lock(path: PathBuf) -> FileLock {
+    pub fn lock(path: PathBuf) -> Result<FileLock, FileLockError> {
         let mut options = OpenOptions::new();
         options.create_new(true);
         options.write(true);
@@ -64,7 +66,7 @@ impl FileLock {
         loop {
             match options.open(&path) {
                 Ok(file) => {
-                    return FileLock { path, _file: file };
+                    return Ok(FileLock { path, _file: file });
                 }
                 Err(err)
                     if err.kind() == std::io::ErrorKind::AlreadyExists
@@ -74,15 +76,19 @@ impl FileLock {
                     if let Some(duration) = backoff_iterator.next() {
                         std::thread::sleep(duration);
                     } else {
-                        panic!(
-                            "Timed out while trying to create lock file {}: {}",
-                            path.display(),
-                            err
-                        );
+                        return Err(FileLockError {
+                            message: "Timed out while trying to create lock file",
+                            path,
+                            err,
+                        });
                     }
                 }
                 Err(err) => {
-                    panic!("Failed to create lock file {}: {}", path.display(), err);
+                    return Err(FileLockError {
+                        message: "Failed to create lock file",
+                        path,
+                        err,
+                    })
                 }
             }
         }
@@ -92,6 +98,8 @@ impl FileLock {
 impl Drop for FileLock {
     #[instrument(skip_all)]
     fn drop(&mut self) {
-        std::fs::remove_file(&self.path).expect("Failed to delete lock file");
+        std::fs::remove_file(&self.path)
+            .inspect_err(|err| tracing::warn!(?err, ?self.path, "Failed to delete lock file"))
+            .ok();
     }
 }
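
Because Drop now logs a warning instead of calling expect(), releasing a lock whose file has already been removed no longer aborts the process. A sketch of a test exercising that path (illustrative only, reusing the testutils helper from the tests in the next file; not part of this commit):

    #[test]
    fn drop_after_lock_file_removed() {
        let temp_dir = testutils::new_temp_dir();
        let lock_path = temp_dir.path().join("test.lock");
        let lock = FileLock::lock(lock_path.clone()).unwrap();
        std::fs::remove_file(&lock_path).unwrap();
        // Used to panic inside Drop via expect(); now only emits a tracing warning.
        drop(lock);
    }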

View File

@@ -18,16 +18,29 @@ mod fallback;
 #[cfg(unix)]
 mod unix;
+use std::io;
+use std::path::PathBuf;
+use thiserror::Error;
 #[cfg(not(unix))]
 pub use self::fallback::FileLock;
 #[cfg(unix)]
 pub use self::unix::FileLock;
+#[derive(Debug, Error)]
+#[error("{message}: {path}")]
+pub struct FileLockError {
+    pub message: &'static str,
+    pub path: PathBuf,
+    #[source]
+    pub err: io::Error,
+}
 #[cfg(test)]
 mod tests {
     use std::cmp::max;
     use std::fs;
     use std::path::PathBuf;
     use std::thread;
     use std::time::Duration;
@@ -37,12 +50,12 @@ mod tests {
     #[test_case(FileLock::lock)]
     #[cfg_attr(unix, test_case(fallback::FileLock::lock))]
-    fn lock_basic<T>(lock_fn: fn(PathBuf) -> T) {
+    fn lock_basic<T>(lock_fn: fn(PathBuf) -> Result<T, FileLockError>) {
         let temp_dir = testutils::new_temp_dir();
         let lock_path = temp_dir.path().join("test.lock");
         assert!(!lock_path.exists());
         {
-            let _lock = lock_fn(lock_path.clone());
+            let _lock = lock_fn(lock_path.clone()).unwrap();
             assert!(lock_path.exists());
         }
         assert!(!lock_path.exists());
@@ -50,7 +63,7 @@ mod tests {
     #[test_case(FileLock::lock)]
     #[cfg_attr(unix, test_case(fallback::FileLock::lock))]
-    fn lock_concurrent<T>(lock_fn: fn(PathBuf) -> T) {
+    fn lock_concurrent<T>(lock_fn: fn(PathBuf) -> Result<T, FileLockError>) {
         let temp_dir = testutils::new_temp_dir();
         let data_path = temp_dir.path().join("test");
         let lock_path = temp_dir.path().join("test.lock");
@@ -59,7 +72,7 @@ mod tests {
         thread::scope(|s| {
             for _ in 0..num_threads {
                 s.spawn(|| {
-                    let _lock = lock_fn(lock_path.clone());
+                    let _lock = lock_fn(lock_path.clone()).unwrap();
                     let data = fs::read(&data_path).unwrap();
                     let value = u32::from_le_bytes(data.try_into().unwrap());
                     thread::sleep(Duration::from_millis(1));
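
FileLockError exposes its parts as public fields, so callers can wrap it (as the other hunks in this commit do) or inspect it directly. A hedged sketch of direct inspection (the ErrorKind check is illustrative; on timeout the fallback implementation above returns the last AlreadyExists error it saw):

    use std::io::ErrorKind;

    // Illustrative: report a FileLockError without wrapping it in another error type.
    fn report(err: &FileLockError) {
        eprintln!("{}: {}", err.message, err.path.display());
        if err.err.kind() == ErrorKind::AlreadyExists {
            eprintln!("another process appears to be holding {}", err.path.display());
        }
    }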

View File

@@ -20,21 +20,36 @@ use std::path::PathBuf;
 use rustix::fs::FlockOperation;
 use tracing::instrument;
+use super::FileLockError;
 pub struct FileLock {
     path: PathBuf,
     file: File,
 }
 impl FileLock {
-    pub fn lock(path: PathBuf) -> FileLock {
+    pub fn lock(path: PathBuf) -> Result<FileLock, FileLockError> {
         loop {
             // Create lockfile, or open pre-existing one
-            let file = File::create(&path).expect("failed to open lockfile");
+            let file = File::create(&path).map_err(|err| FileLockError {
+                message: "Failed to open lock file",
+                path: path.clone(),
+                err,
+            })?;
             // If the lock was already held, wait for it to be released
-            rustix::fs::flock(&file, FlockOperation::LockExclusive)
-                .expect("failed to lock lockfile");
+            rustix::fs::flock(&file, FlockOperation::LockExclusive).map_err(|errno| {
+                FileLockError {
+                    message: "Failed to lock lock file",
+                    path: path.clone(),
+                    err: errno.into(),
+                }
+            })?;
-            let stat = rustix::fs::fstat(&file).expect("failed to stat lockfile");
+            let stat = rustix::fs::fstat(&file).map_err(|errno| FileLockError {
+                message: "failed to stat lock file",
+                path: path.clone(),
+                err: errno.into(),
+            })?;
             if stat.st_nlink == 0 {
                 // Lockfile was deleted, probably by the previous holder's `Drop` impl; create a
                 // new one so our ownership is visible, rather than hidden in an
@@ -43,7 +58,7 @@ impl FileLock {
                 continue;
             }
-            return Self { path, file };
+            return Ok(Self { path, file });
         }
     }
 }
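
Condensed, the Unix implementation's acquisition loop is open -> flock -> fstat, retrying when the previous holder unlinked the file while we were blocked on flock (otherwise the lock we hold would live on an unlinked inode that no other process can observe). A stripped-down sketch of that pattern without the error wrapping above (assumes the same rustix API; the real FileLock also keeps the path so its Drop can unlink the file):

    use std::fs::File;
    use std::path::Path;

    use rustix::fs::FlockOperation;

    fn acquire(path: &Path) -> std::io::Result<File> {
        loop {
            // Create the lock file (or open a pre-existing one), then block until
            // we hold the exclusive flock.
            let file = File::create(path)?;
            rustix::fs::flock(&file, FlockOperation::LockExclusive)?;
            // If the previous holder deleted the file while we waited, our lock is
            // on an unlinked inode nobody else can see; retry so ownership is
            // visible at `path`.
            let stat = rustix::fs::fstat(&file)?;
            if stat.st_nlink == 0 {
                continue;
            }
            return Ok(file);
        }
    }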

View File

@@ -37,6 +37,8 @@ pub enum OpHeadsStoreError {
         new_op_id: OperationId,
         source: Box<dyn std::error::Error + Send + Sync>,
     },
+    #[error("Failed to lock operation heads store")]
+    Lock(#[source] Box<dyn std::error::Error + Send + Sync>),
 }
 #[derive(Debug, Error)]
@@ -68,7 +70,7 @@ pub trait OpHeadsStore: Send + Sync + Debug {
     /// is to prevent concurrent processes from resolving the same divergent
     /// operations. It is not needed for correctness; implementations are free
     /// to return a type that doesn't hold a lock.
-    fn lock(&self) -> Box<dyn OpHeadsStoreLock + '_>;
+    fn lock(&self) -> Result<Box<dyn OpHeadsStoreLock + '_>, OpHeadsStoreError>;
 }
 // Given an OpHeadsStore, fetch and resolve its op heads down to one under a
@@ -102,7 +104,7 @@ where
     // Note that the locking isn't necessary for correctness of merge; we take
     // the lock only to prevent other concurrent processes from doing the same
     // work (and producing another set of divergent heads).
-    let _lock = op_heads_store.lock();
+    let _lock = op_heads_store.lock()?;
     let op_head_ids = op_heads_store.get_op_heads()?;
     if op_head_ids.is_empty() {

View File

@@ -132,9 +132,9 @@ impl OpHeadsStore for SimpleOpHeadsStore {
         Ok(op_heads)
     }
-    fn lock(&self) -> Box<dyn OpHeadsStoreLock + '_> {
-        Box::new(SimpleOpHeadsStoreLock {
-            _lock: FileLock::lock(self.dir.join("lock")),
-        })
+    fn lock(&self) -> Result<Box<dyn OpHeadsStoreLock + '_>, OpHeadsStoreError> {
+        let lock = FileLock::lock(self.dir.join("lock"))
+            .map_err(|err| OpHeadsStoreError::Lock(err.into()))?;
+        Ok(Box::new(SimpleOpHeadsStoreLock { _lock: lock }))
     }
 }

View File

@@ -435,7 +435,8 @@ impl TableStore {
     }
     fn lock(&self) -> FileLock {
-        FileLock::lock(self.dir.join("lock"))
+        // TODO: propagate error
+        FileLock::lock(self.dir.join("lock")).unwrap()
     }
     fn load_table(&self, name: String) -> TableStoreResult<Arc<ReadonlyTable>> {
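
The TODO above keeps TableStore::lock() panicking via unwrap() for now. One possible shape for a follow-up, purely hypothetical (it assumes TableStoreResult is Result<_, TableStoreError> and that this error type gains a Lock(FileLockError) variant; neither is part of this commit):

    // Hypothetical follow-up; the Lock variant does not exist in this commit.
    fn lock(&self) -> TableStoreResult<FileLock> {
        FileLock::lock(self.dir.join("lock")).map_err(TableStoreError::Lock)
    }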

View File

@@ -200,7 +200,7 @@ impl UnpublishedOperation {
     }
     pub fn publish(self) -> Result<Arc<ReadonlyRepo>, OpHeadsStoreError> {
-        let _lock = self.op_heads_store.lock();
+        let _lock = self.op_heads_store.lock()?;
         self.op_heads_store
             .update_op_heads(self.operation().parent_ids(), self.operation().id())?;
         Ok(self.repo)