view: use the Operation wrapper type in merge_op_heads()

This is partly to prepare for merging the operations in order of
transaction-commit time (currently merged in order of operation id),
so we can get a predictable order in tests (assuming transactions are
not committed the same millisecond).
This commit is contained in:
Martin von Zweigbergk 2021-01-10 00:07:43 -08:00
parent 48e664c716
commit c4cd12e93e

View File

@ -214,45 +214,55 @@ fn get_single_op_head(
fn merge_op_heads(
store: &StoreWrapper,
op_store: &Arc<dyn OpStore>,
op_heads: &[OperationId],
op_head_ids: &[OperationId],
) -> Result<(OperationId, op_store::Operation, op_store::View), OpHeadResolutionError> {
let neighbors_fn = |op_id: &OperationId| op_store.read_operation(op_id).unwrap().parents;
let op_heads: Vec<_> = op_head_ids
.iter()
.map(|op_id: &OperationId| {
let data = op_store.read_operation(op_id).unwrap();
Operation::new(op_store.clone(), op_id.clone(), data)
})
.collect();
let neighbors_fn = |op: &Operation| op.parents();
// Remove ancestors so we don't create merge operation with an operation and its
// ancestor
let op_heads = dag_walk::unreachable(
op_heads.iter().cloned(),
&neighbors_fn,
&|op_id: &OperationId| op_id.clone(),
);
let op_heads =
dag_walk::unreachable(op_heads, &neighbors_fn, &|op: &Operation| op.id().clone());
let mut op_heads: Vec<_> = op_heads.into_iter().collect();
op_heads.sort_by_key(|op_id| op_id.0.clone());
let first_op_head = op_store.read_operation(&op_heads[0]).unwrap();
let mut merged_view = op_store.read_view(&first_op_head.view_id).unwrap();
op_heads.sort_by_key(|op| op.id().0.clone());
let first_op_head = op_heads[0].clone();
let mut merged_view = op_store.read_view(first_op_head.view().id()).unwrap();
// Return without creating a merge operation
if op_heads.len() == 1 {
return Ok((op_heads[0].clone(), first_op_head, merged_view));
return Ok((
op_heads[0].id().clone(),
first_op_head.store_operation().clone(),
merged_view,
));
}
for (i, other_op_head_id) in op_heads.iter().enumerate().skip(1) {
let ancestor_op_id = dag_walk::closest_common_node(
for (i, other_op_head) in op_heads.iter().enumerate().skip(1) {
let ancestor_op = dag_walk::closest_common_node(
op_heads[0..i].to_vec(),
vec![other_op_head_id.clone()],
vec![other_op_head.clone()],
&neighbors_fn,
&|op_id: &OperationId| op_id.clone(),
&|op: &Operation| op.id().clone(),
)
.unwrap();
let ancestor_op = op_store.read_operation(&ancestor_op_id).unwrap();
let ancestor_view = op_store.read_view(&ancestor_op.view_id).unwrap();
let other_op = op_store.read_operation(other_op_head_id).unwrap();
let other_view = op_store.read_view(&other_op.view_id).unwrap();
merged_view = merge_views(store, &merged_view, &ancestor_view, &other_view);
merged_view = merge_views(
store,
&merged_view,
ancestor_op.view().store_view(),
other_op_head.view().store_view(),
);
}
let merged_view_id = op_store.write_view(&merged_view).unwrap();
let operation_metadata = OperationMetadata::new("resolve concurrent operations".to_string());
let op_parent_ids = op_heads.iter().map(|op| op.id().clone()).collect();
let merge_operation = op_store::Operation {
view_id: merged_view_id,
parents: op_heads,
parents: op_parent_ids,
metadata: operation_metadata,
};
let merge_operation_id = op_store.write_operation(&merge_operation).unwrap();