op_store: implement GC of unreachable operations and views

Since new operations and views may be added concurrently by another process,
there's a risk of data corruption. The keep_newer parameter is a mitigation
for this problem. It's set to preserve files modified within the last 2 weeks,
which is the default of "git gc". Still, a concurrent process may replace an
existing view which is about to be deleted by the gc process, and the view
file would be lost.

#12
This commit is contained in:
Yuya Nishihara 2024-01-07 18:21:10 +09:00
parent 7cfd32bac1
commit e9d31177cb
6 changed files with 191 additions and 5 deletions

View File

@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
* New `jj op abandon` command is added to clean up the operation history. If GC * New `jj op abandon` command is added to clean up the operation history. If GC
is implemented, Git refs and commit objects can be compacted. is implemented, Git refs and commit objects can be compacted.
* `jj util gc` now removes unreachable operation and view objects.
* `jj branch rename` will now warn if the renamed branch has a remote branch, since * `jj branch rename` will now warn if the renamed branch has a remote branch, since
those will have to be manually renamed outside of `jj`. those will have to be manually renamed outside of `jj`.

View File

@ -13,6 +13,8 @@
// limitations under the License. // limitations under the License.
use std::io::Write; use std::io::Write;
use std::slice;
use std::time::{Duration, SystemTime};
use clap::Subcommand; use clap::Subcommand;
use jj_lib::repo::Repo; use jj_lib::repo::Repo;
@ -114,8 +116,14 @@ fn cmd_util_gc(
)); ));
} }
let workspace_command = command.workspace_helper(ui)?; let workspace_command = command.workspace_helper(ui)?;
let store = workspace_command.repo().store(); // TODO: add command argument to specify the expiration time?
store.gc().map_err(|err| user_error(err.to_string()))?; let keep_newer = SystemTime::now() - Duration::from_secs(14 * 86400);
let repo = workspace_command.repo();
repo.op_store()
.gc(slice::from_ref(repo.op_id()), keep_newer)?;
repo.store()
.gc()
.map_err(|err| user_error(err.to_string()))?;
Ok(()) Ok(())
} }

View File

@ -104,6 +104,7 @@ pub fn persist_content_addressed_temp_file<P: AsRef<Path>>(
Ok(file) => Ok(file), Ok(file) => Ok(file),
Err(PersistError { error, file: _ }) => { Err(PersistError { error, file: _ }) => {
if let Ok(existing_file) = File::open(new_path) { if let Ok(existing_file) = File::open(new_path) {
// TODO: Update mtime to help GC keep this file
Ok(existing_file) Ok(existing_file)
} else { } else {
Err(error) Err(error)

View File

@ -17,6 +17,7 @@
use std::collections::{BTreeMap, HashMap, HashSet}; use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Error, Formatter}; use std::fmt::{Debug, Error, Formatter};
use std::iter; use std::iter;
use std::time::SystemTime;
use itertools::Itertools as _; use itertools::Itertools as _;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
@ -424,6 +425,15 @@ pub trait OpStore: Send + Sync + Debug {
&self, &self,
prefix: &HexPrefix, prefix: &HexPrefix,
) -> OpStoreResult<PrefixResolution<OperationId>>; ) -> OpStoreResult<PrefixResolution<OperationId>>;
/// Prunes unreachable operations and views.
///
/// All operations and views reachable from the `head_ids` won't be
/// removed. In addition to that, objects created after `keep_newer` will be
/// preserved. This mitigates a risk of deleting new heads created
/// concurrently by another process.
// TODO: return stats?
fn gc(&self, head_ids: &[OperationId], keep_newer: SystemTime) -> OpStoreResult<()>;
} }
#[cfg(test)] #[cfg(test)]

View File

@ -14,26 +14,28 @@
#![allow(missing_docs)] #![allow(missing_docs)]
use std::collections::BTreeMap; use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::Debug; use std::fmt::Debug;
use std::io::{ErrorKind, Write}; use std::io::{ErrorKind, Write};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::time::SystemTime;
use std::{fs, io}; use std::{fs, io};
use itertools::Itertools as _;
use prost::Message; use prost::Message;
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
use thiserror::Error; use thiserror::Error;
use crate::backend::{CommitId, MillisSinceEpoch, Timestamp}; use crate::backend::{CommitId, MillisSinceEpoch, Timestamp};
use crate::content_hash::blake2b_hash; use crate::content_hash::blake2b_hash;
use crate::file_util::{persist_content_addressed_temp_file, IoResultExt as _}; use crate::file_util::{persist_content_addressed_temp_file, IoResultExt as _, PathError};
use crate::merge::Merge; use crate::merge::Merge;
use crate::object_id::{HexPrefix, ObjectId, PrefixResolution}; use crate::object_id::{HexPrefix, ObjectId, PrefixResolution};
use crate::op_store::{ use crate::op_store::{
OpStore, OpStoreError, OpStoreResult, Operation, OperationId, OperationMetadata, RefTarget, OpStore, OpStoreError, OpStoreResult, Operation, OperationId, OperationMetadata, RefTarget,
RemoteRef, RemoteRefState, RemoteView, View, ViewId, WorkspaceId, RemoteRef, RemoteRefState, RemoteView, View, ViewId, WorkspaceId,
}; };
use crate::{git, op_store}; use crate::{dag_walk, git, op_store};
// BLAKE2b-512 hash length in bytes // BLAKE2b-512 hash length in bytes
const OPERATION_ID_LENGTH: usize = 64; const OPERATION_ID_LENGTH: usize = 64;
@ -195,6 +197,89 @@ impl OpStore for SimpleOpStore {
.context(&op_dir) .context(&op_dir)
.map_err(|err| OpStoreError::Other(err.into())) .map_err(|err| OpStoreError::Other(err.into()))
} }
// Prunes operation and view files that are unreachable from `head_ids`,
// keeping any file modified after `keep_newer` (see the OpStore trait docs).
#[tracing::instrument(skip(self))]
fn gc(&self, head_ids: &[OperationId], keep_newer: SystemTime) -> OpStoreResult<()> {
    // Parse a directory entry's file name as a hex operation id. Returns
    // None for non-UTF-8 or non-hex names so unrelated files are skipped.
    let to_op_id = |entry: &fs::DirEntry| -> Option<OperationId> {
        let name = entry.file_name().into_string().ok()?;
        OperationId::try_from_hex(&name).ok()
    };
    // Same, but for view ids.
    let to_view_id = |entry: &fs::DirEntry| -> Option<ViewId> {
        let name = entry.file_name().into_string().ok()?;
        ViewId::try_from_hex(&name).ok()
    };
    // Deletes the file unless its mtime is newer than `keep_newer`. The
    // mtime test preserves objects that may have been created concurrently
    // by another process after the reachable set was collected.
    let remove_file_if_not_new = |entry: &fs::DirEntry| -> Result<(), PathError> {
        let path = entry.path();
        // Check timestamp, but there's still TOCTOU problem if an existing
        // file is renewed.
        let metadata = entry.metadata().context(&path)?;
        let mtime = metadata.modified().expect("unsupported platform?");
        if mtime > keep_newer {
            tracing::trace!(?path, "not removing");
            Ok(())
        } else {
            tracing::trace!(?path, "removing");
            fs::remove_file(&path).context(&path)
        }
    };
    // Reachable objects are resolved without considering the keep_newer
    // parameter. We could collect ancestors of the "new" operations here,
    // but more files can be added anyway after that.
    let read_op = |id: &OperationId| self.read_operation(id).map(|data| (id.clone(), data));
    // DFS over the operation DAG from the given heads toward the root,
    // failing fast on the first read error.
    let reachable_ops: HashMap<OperationId, Operation> = dag_walk::dfs_ok(
        head_ids.iter().map(read_op),
        |(id, _)| id.clone(),
        |(_, data)| data.parents.iter().map(read_op).collect_vec(),
    )
    .try_collect()?;
    // A view is reachable iff some reachable operation points at it.
    let reachable_views: HashSet<&ViewId> =
        reachable_ops.values().map(|data| &data.view_id).collect();
    tracing::info!(
        reachable_op_count = reachable_ops.len(),
        reachable_view_count = reachable_views.len(),
        "collected reachable objects"
    );
    let prune_ops = || -> Result<(), PathError> {
        let op_dir = self.path.join("operations");
        for entry in op_dir.read_dir().context(&op_dir)? {
            let entry = entry.context(&op_dir)?;
            let Some(id) = to_op_id(&entry) else {
                tracing::trace!(?entry, "skipping invalid file name");
                continue;
            };
            if reachable_ops.contains_key(&id) {
                continue;
            }
            // If the operation was added after collecting reachable_views,
            // its view mtime would also be renewed. So there's no need to
            // update the reachable_views set to preserve the view.
            remove_file_if_not_new(&entry)?;
        }
        Ok(())
    };
    // Operations are pruned before views so the invariant above (a new
    // operation's view is at least as new as the operation) holds.
    prune_ops().map_err(|err| OpStoreError::Other(err.into()))?;
    let prune_views = || -> Result<(), PathError> {
        let view_dir = self.path.join("views");
        for entry in view_dir.read_dir().context(&view_dir)? {
            let entry = entry.context(&view_dir)?;
            let Some(id) = to_view_id(&entry) else {
                tracing::trace!(?entry, "skipping invalid file name");
                continue;
            };
            if reachable_views.contains(&id) {
                continue;
            }
            remove_file_if_not_new(&entry)?;
        }
        Ok(())
    };
    prune_views().map_err(|err| OpStoreError::Other(err.into()))?;
    Ok(())
}
} }
fn io_to_read_error(err: std::io::Error, id: &impl ObjectId) -> OpStoreError { fn io_to_read_error(err: std::io::Error, id: &impl ObjectId) -> OpStoreError {

View File

@ -15,6 +15,7 @@
use std::path::Path; use std::path::Path;
use std::slice; use std::slice;
use std::sync::Arc; use std::sync::Arc;
use std::time::SystemTime;
use assert_matches::assert_matches; use assert_matches::assert_matches;
use itertools::Itertools as _; use itertools::Itertools as _;
@ -30,6 +31,7 @@ fn list_dir(dir: &Path) -> Vec<String> {
std::fs::read_dir(dir) std::fs::read_dir(dir)
.unwrap() .unwrap()
.map(|entry| entry.unwrap().file_name().to_str().unwrap().to_owned()) .map(|entry| entry.unwrap().file_name().to_str().unwrap().to_owned())
.sorted()
.collect() .collect()
} }
@ -568,3 +570,81 @@ fn test_resolve_op_parents_children() {
)) ))
); );
} }
#[test]
fn test_gc() {
    let settings = stable_op_id_settings();
    let test_repo = TestRepo::init();
    let repo_0 = test_repo.repo;
    let op_store = repo_0.op_store();
    let op_dir = repo_0.repo_path().join("op_store").join("operations");
    let view_dir = repo_0.repo_path().join("op_store").join("views");
    // Build the operation graph below, where "0" is the initial operation
    // and "E" commits nothing:
    //
    //   F
    //   E (empty)
    // D |
    // C |
    // |/
    // B
    // A
    // 0 (initial)
    let commit_empty = |repo: &Arc<ReadonlyRepo>| repo.start_transaction(&settings);
    let commit_random = |repo: &Arc<ReadonlyRepo>| {
        let mut tx = repo.start_transaction(&settings);
        write_random_commit(tx.mut_repo(), &settings);
        tx
    };
    let repo_a = commit_random(&repo_0).commit("op A");
    let repo_b = commit_random(&repo_a).commit("op B");
    let repo_c = commit_random(&repo_b).commit("op C");
    let repo_d = commit_random(&repo_c).commit("op D");
    let repo_e = commit_empty(&repo_b).commit("op E");
    let repo_f = commit_random(&repo_e).commit("op F");
    // Sanity check for the original state: 7 operations, but only 6 views
    // because the empty op E shares its view.
    let mut live_ops = list_dir(&op_dir);
    let mut live_views = list_dir(&view_dir);
    assert_eq!(live_ops.len(), 7);
    assert_eq!(live_views.len(), 6);
    // Nothing is reachable, but every file is fresh enough to be kept.
    op_store.gc(&[], SystemTime::UNIX_EPOCH).unwrap();
    assert_eq!(list_dir(&op_dir), live_ops);
    assert_eq!(list_dir(&view_dir), live_views);
    // Everything is reachable from the two heads.
    let now = SystemTime::now();
    op_store
        .gc(&[repo_d.op_id().clone(), repo_f.op_id().clone()], now)
        .unwrap();
    assert_eq!(list_dir(&op_dir), live_ops);
    assert_eq!(list_dir(&view_dir), live_views);
    // Dropping F as a head makes E and F unreachable, but E's view is still
    // reachable (it is shared with B), so only F's view is removed.
    op_store.gc(&[repo_d.op_id().clone()], now).unwrap();
    let dead_ops = [repo_e.op_id().hex(), repo_f.op_id().hex()];
    live_ops.retain(|name| !dead_ops.contains(name));
    live_views.retain(|name| *name != repo_f.operation().view_id().hex());
    assert_eq!(list_dir(&op_dir), live_ops);
    assert_eq!(list_dir(&view_dir), live_views);
    // With only A as a head, B, C, and D become unreachable as well.
    op_store.gc(&[repo_a.op_id().clone()], now).unwrap();
    let dead_ops = [
        repo_b.op_id().hex(),
        repo_c.op_id().hex(),
        repo_d.op_id().hex(),
    ];
    let dead_views = [
        repo_b.operation().view_id().hex(),
        repo_c.operation().view_id().hex(),
        repo_d.operation().view_id().hex(),
    ];
    live_ops.retain(|name| !dead_ops.contains(name));
    live_views.retain(|name| !dead_views.contains(name));
    assert_eq!(list_dir(&op_dir), live_ops);
    assert_eq!(list_dir(&view_dir), live_views);
    // Sanity check for the last state: only the initial operation/view and
    // op A's remain.
    assert_eq!(live_ops.len(), 2);
    assert_eq!(live_views.len(), 2);
}