working_copy: send updates via channel

In preparation of traversing the filesystem in parallel, send updates via `channel`.

An alternative is to modify shared mutable state, e.g. put `self.file_states` behind a mutex or use a concurrent hash-map. This risks leaving the `TreeState` in an invalid state if an error occurs, and makes invariants harder to reason about.

Using a channel introduces a small performance regression. (I didn't try out the concurrent hash-map approach.)

```sh
$ hyperfine --parameter-list hash before,after --setup 'git checkout {hash} && cargo build --profile release-with-debug' --warmup 3 './target/release-with-debug/jj -R ../nixpkgs st'
Benchmark 1: ./target/release-with-debug/jj -R ../nixpkgs st (hash = before)
  Time (mean ± σ):      1.533 s ±  0.013 s    [User: 0.587 s, System: 0.926 s]
  Range (min … max):    1.510 s …  1.559 s    10 runs

Benchmark 2: ./target/release-with-debug/jj -R ../nixpkgs st (hash = after)
  Time (mean ± σ):      1.563 s ±  0.021 s    [User: 0.607 s, System: 0.936 s]
  Range (min … max):    1.518 s …  1.595 s    10 runs

Summary
  ./target/release-with-debug/jj -R ../nixpkgs st (hash = before) ran
    1.02 ± 0.02 times faster than ./target/release-with-debug/jj -R ../nixpkgs st (hash = after)
```
This commit is contained in:
Waleed Khan 2023-08-03 08:13:56 -07:00 committed by Martin von Zweigbergk
parent 174704d752
commit 326be7c91e

View File

@ -26,6 +26,7 @@ use std::os::unix::fs::symlink;
#[cfg(unix)] #[cfg(unix)]
use std::os::unix::fs::PermissionsExt; use std::os::unix::fs::PermissionsExt;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc; use std::sync::Arc;
use std::time::UNIX_EPOCH; use std::time::UNIX_EPOCH;
@ -51,7 +52,6 @@ use crate::op_store::{OperationId, WorkspaceId};
use crate::repo_path::{FsPathParseError, RepoPath, RepoPathComponent, RepoPathJoin}; use crate::repo_path::{FsPathParseError, RepoPath, RepoPathComponent, RepoPathJoin};
use crate::store::Store; use crate::store::Store;
use crate::tree::{Diff, Tree}; use crate::tree::{Diff, Tree};
use crate::tree_builder::TreeBuilder;
#[cfg(unix)] #[cfg(unix)]
type FileExecutableFlag = bool; type FileExecutableFlag = bool;
@ -655,17 +655,34 @@ impl TreeState {
git_ignore: base_ignores, git_ignore: base_ignores,
}]; }];
trace_span!("traverse filesystem").in_scope(|| -> Result<(), SnapshotError> { trace_span!("traverse filesystem").in_scope(|| -> Result<(), SnapshotError> {
let (tree_entries_tx, tree_entries_rx) = channel();
let (file_states_tx, file_states_rx) = channel();
let (deleted_files_tx, deleted_files_rx) = channel();
while let Some(work_item) = work.pop() { while let Some(work_item) = work.pop() {
work.extend(self.visit_directory( work.extend(self.visit_directory(
&matcher, &matcher,
&current_tree, &current_tree,
&mut tree_builder, tree_entries_tx.clone(),
&mut deleted_files, file_states_tx.clone(),
deleted_files_tx.clone(),
work_item, work_item,
progress, progress,
)?); )?);
} }
drop(tree_entries_tx);
drop(file_states_tx);
drop(deleted_files_tx);
while let Ok((path, tree_value)) = tree_entries_rx.recv() {
tree_builder.set(path, tree_value);
}
while let Ok((path, file_state)) = file_states_rx.recv() {
self.file_states.insert(path, file_state);
}
while let Ok(path) = deleted_files_rx.recv() {
deleted_files.remove(&path);
}
Ok(()) Ok(())
})?; })?;
@ -679,12 +696,14 @@ impl TreeState {
Ok(has_changes || fsmonitor_clock_needs_save) Ok(has_changes || fsmonitor_clock_needs_save)
} }
#[allow(clippy::too_many_arguments)]
fn visit_directory( fn visit_directory(
&mut self, &self,
matcher: &dyn Matcher, matcher: &dyn Matcher,
current_tree: &Tree, current_tree: &Tree,
tree_builder: &mut TreeBuilder, tree_entries_tx: Sender<(RepoPath, TreeValue)>,
deleted_files: &mut HashSet<RepoPath>, file_states_tx: Sender<(RepoPath, FileState)>,
deleted_files_tx: Sender<RepoPath>,
work_item: WorkItem, work_item: WorkItem,
progress: Option<&SnapshotProgress>, progress: Option<&SnapshotProgress>,
) -> Result<Vec<WorkItem>, SnapshotError> { ) -> Result<Vec<WorkItem>, SnapshotError> {
@ -747,7 +766,7 @@ impl TreeState {
} }
}; };
if let Some(new_file_state) = file_state(&metadata) { if let Some(new_file_state) = file_state(&metadata) {
deleted_files.remove(&tracked_path); deleted_files_tx.send(tracked_path.clone()).ok();
let update = self.get_updated_tree_value( let update = self.get_updated_tree_value(
&tracked_path, &tracked_path,
disk_path, disk_path,
@ -756,9 +775,11 @@ impl TreeState {
&new_file_state, &new_file_state,
)?; )?;
if let Some(tree_value) = update { if let Some(tree_value) = update {
tree_builder.set(tracked_path.clone(), tree_value); tree_entries_tx
.send((tracked_path.clone(), tree_value))
.ok();
} }
self.file_states.insert(tracked_path, new_file_state); file_states_tx.send((tracked_path, new_file_state)).ok();
} }
} }
} else { } else {
@ -785,7 +806,7 @@ impl TreeState {
err, err,
})?; })?;
if let Some(new_file_state) = file_state(&metadata) { if let Some(new_file_state) = file_state(&metadata) {
deleted_files.remove(&path); deleted_files_tx.send(path.clone()).ok();
let update = self.get_updated_tree_value( let update = self.get_updated_tree_value(
&path, &path,
entry.path(), entry.path(),
@ -794,9 +815,9 @@ impl TreeState {
&new_file_state, &new_file_state,
)?; )?;
if let Some(tree_value) = update { if let Some(tree_value) = update {
tree_builder.set(path.clone(), tree_value); tree_entries_tx.send((path.clone(), tree_value)).ok();
} }
self.file_states.insert(path, new_file_state); file_states_tx.send((path, new_file_state)).ok();
} }
} }
} }