use std::collections::HashSet;
use std::sync::Arc;
use log::{debug, warn};
use crate::base::{FetchedItem, ItemRef, Property, Storage};
use crate::disco::{DiscoveredCollection, Discovery};
use crate::{base::Item, sync::declare::StoragePair};
use crate::{CollectionId, ErrorKind, Href};
use super::declare::{CollectionDescription, DeclaredMapping, OnEmpty};
use super::status::{ItemState, MappingUid, Side, StatusDatabase, StatusError, StatusForItem};
#[derive(thiserror::Error, Debug)]
pub enum PlanError {
#[error("Conflicting mappings on side {0} for href {1}.")]
ConflictingMappings(Side, Href),
#[error("Discovery failed for storage A: {0}")]
DiscoveryFailedA(#[source] crate::Error),
#[error("Discovery failed for storage B: {0}")]
DiscoveryFailedB(#[source] crate::Error),
#[error("Error interacting with underlying storage: {0}")]
Storage(#[from] crate::Error),
#[error("Error querying status database: {0}")]
StatusDb(#[from] StatusError),
}
/// A synchronisation plan for a storage pair.
///
/// Holds handles to both storages plus one [`CollectionPlan`] per resolved mapping.
pub struct Plan<I: Item> {
    /// Handle to storage A, cloned from the pair the plan was built for.
    pub(super) storage_a: Arc<dyn Storage<I>>,
    /// Handle to storage B, cloned from the pair the plan was built for.
    pub(super) storage_b: Arc<dyn Storage<I>>,
    /// One plan per resolved mapping, in the order the mappings were resolved.
    pub collection_plans: Vec<CollectionPlan<I>>,
}
/// Debug output shows only the collection plans; the storage handles are omitted.
impl<I: Item> std::fmt::Debug for Plan<I> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Renders exactly like the `Vec`'s own `Debug` implementation (a debug list).
        f.debug_list().entries(self.collection_plans.iter()).finish()
    }
}
impl<I: Item> Plan<I> {
    /// Builds a plan for `pair`.
    ///
    /// Resolves the pair's mappings and then creates one [`CollectionPlan`] per mapping,
    /// one after another. `status` is forwarded to each collection plan.
    ///
    /// # Errors
    ///
    /// Returns the first error raised while resolving mappings or while planning any
    /// single collection.
    pub async fn new(
        pair: &StoragePair<I>,
        status: Option<&StatusDatabase>,
    ) -> Result<Plan<I>, PlanError> {
        let resolved = create_mappings_for_pair(pair).await?;

        let mut collection_plans = Vec::with_capacity(resolved.len());
        for mapping in resolved {
            let plan = CollectionPlan::new(pair, mapping, status).await?;
            collection_plans.push(plan);
        }

        let storage_a = pair.storage_a.clone();
        let storage_b = pair.storage_b.clone();
        Ok(Plan {
            storage_a,
            storage_b,
            collection_plans,
        })
    }

    /// Returns a reference to storage A.
    #[must_use]
    pub fn storage_a(&self) -> &dyn Storage<I> {
        &*self.storage_a
    }

    /// Returns a reference to storage B.
    #[must_use]
    pub fn storage_b(&self) -> &dyn Storage<I> {
        &*self.storage_b
    }
}
/// Resolves every mapping of `pair` into concrete collections on both sides.
///
/// Runs discovery on both storages, resolves the explicitly declared mappings, and then
/// (when `all_from_a` / `all_from_b` are set) adds one mapping per discovered collection
/// on the respective side, aliased `id:<collection id>`.
///
/// Automatically generated mappings that are identical (alias and both resolved
/// collections) to one already in the list are skipped. This is applied to both the
/// `all_from_a` and the `all_from_b` pass, so that combining a declared mapping with
/// either flag behaves the same way.
///
/// # Errors
///
/// - [`PlanError::DiscoveryFailedA`] / [`PlanError::DiscoveryFailedB`] if discovery fails.
/// - [`PlanError::Storage`] if resolving a collection fails.
/// - [`PlanError::ConflictingMappings`] if two remaining mappings share an href on one side.
async fn create_mappings_for_pair<I: Item>(
    pair: &StoragePair<I>,
) -> Result<Vec<ResolvedMapping>, PlanError> {
    let mut mappings = Vec::<ResolvedMapping>::with_capacity(pair.mappings.len());

    let disco_a = pair
        .storage_a
        .discover_collections()
        .await
        .map_err(PlanError::DiscoveryFailedA)?;
    let disco_b = pair
        .storage_b
        .discover_collections()
        .await
        .map_err(PlanError::DiscoveryFailedB)?;

    // Declared mappings are never deduplicated: declaring the same mapping twice is
    // reported as a conflict by `check_for_duplicate_mappings` below.
    for mapping in &pair.mappings {
        mappings.push(
            ResolvedMapping::from_declared_mapping(
                mapping,
                pair.storage_a.as_ref(),
                pair.storage_b.as_ref(),
                &disco_a,
                &disco_b,
            )
            .await?,
        );
    }

    if pair.all_from_a {
        mappings.reserve(disco_a.collection_count());
        for collection in disco_a.collections() {
            let mapping = ResolvedMapping {
                alias: format!("id:{}", collection.id()),
                a: ResolvedCollection {
                    href: collection.href().to_string(),
                    id: Some(collection.id().clone()),
                    exists: true,
                },
                b: resolve_mapping_counterpart(collection, &disco_b, pair.storage_b())?,
            };
            push_mapping_unless_present(&mut mappings, mapping);
        }
    }

    if pair.all_from_b {
        mappings.reserve(disco_b.collection_count());
        for collection in disco_b.collections() {
            let mapping = ResolvedMapping {
                alias: format!("id:{}", collection.id()),
                a: resolve_mapping_counterpart(collection, &disco_a, pair.storage_a())?,
                b: ResolvedCollection {
                    href: collection.href().to_string(),
                    id: Some(collection.id().clone()),
                    exists: true,
                },
            };
            push_mapping_unless_present(&mut mappings, mapping);
        }
    }

    check_for_duplicate_mappings(&mappings)?;
    Ok(mappings)
}

/// Appends `mapping` to `mappings` unless an identical mapping is already in the list.
fn push_mapping_unless_present(mappings: &mut Vec<ResolvedMapping>, mapping: ResolvedMapping) {
    if mappings.contains(&mapping) {
        debug!("Skipping mapping; already present.");
    } else {
        mappings.push(mapping);
    }
}
/// Ensures that no collection href is used by more than one mapping on the same side.
///
/// Each mapping is compared against all mappings that precede it, in order; for each
/// earlier mapping side A is checked before side B.
///
/// # Errors
///
/// Returns [`PlanError::ConflictingMappings`] for the first clash found, carrying the
/// side and the shared href.
fn check_for_duplicate_mappings(mappings: &[ResolvedMapping]) -> Result<(), PlanError> {
    for (position, current) in mappings.iter().enumerate() {
        for earlier in mappings.iter().take(position) {
            if earlier.a.href == current.a.href {
                return Err(PlanError::ConflictingMappings(
                    Side::A,
                    earlier.a.href.to_string(),
                ));
            }
            if earlier.b.href == current.b.href {
                return Err(PlanError::ConflictingMappings(
                    Side::B,
                    earlier.b.href.to_string(),
                ));
            }
        }
    }
    Ok(())
}
#[cfg(test)]
mod test {
    use std::{path::Path, str::FromStr, sync::Arc};

    use tempfile::Builder;

    use crate::{
        base::{IcsItem, Storage},
        sync::{
            declare::{CollectionDescription, DeclaredMapping, StoragePair},
            plan::{create_mappings_for_pair, Plan, PlanError},
        },
        vdir::VdirStorage,
        CollectionId,
    };

    /// Creates a vdir storage for `.ics` items rooted at `root`.
    fn ics_vdir(root: &Path) -> Arc<VdirStorage<IcsItem>> {
        Arc::new(VdirStorage::<IcsItem>::new(
            root.to_path_buf().try_into().unwrap(),
            "ics".to_string(),
        ))
    }

    /// A pair without any mappings still yields a valid plan.
    #[tokio::test]
    async fn test_plan_no_mappings() {
        let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
        let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();
        let storage_a = ics_vdir(dir_a.path());
        let storage_b = ics_vdir(dir_b.path());

        let pair = StoragePair::new(storage_a.clone(), storage_b.clone());

        assert!(Plan::new(&pair, None).await.is_ok());
    }

    /// A single direct mapping resolves to one mapping and one collection plan.
    #[tokio::test]
    async fn test_plan_simple_mapping() {
        let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
        let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();
        let storage_a = ics_vdir(dir_a.path());
        let storage_b = ics_vdir(dir_b.path());

        let collection = CollectionId::from_str("test").unwrap();
        let pair = StoragePair::new(storage_a.clone(), storage_b.clone())
            .with_mapping(DeclaredMapping::direct(collection));

        let mappings = create_mappings_for_pair(&pair).await.unwrap();
        assert_eq!(mappings.len(), 1);

        let plan = Plan::new(&pair, None).await.unwrap();
        assert_eq!(plan.collection_plans.len(), 1);
    }

    /// Declaring the same direct mapping twice is rejected as a conflict.
    #[tokio::test]
    async fn test_plan_duplicate_mapping() {
        let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
        let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();
        let storage_a = ics_vdir(dir_a.path());
        let storage_b = ics_vdir(dir_b.path());

        let collection = CollectionId::from_str("test").unwrap();
        let pair = StoragePair::new(storage_a.clone(), storage_b.clone())
            .with_mapping(DeclaredMapping::direct(collection.clone()))
            .with_mapping(DeclaredMapping::direct(collection));

        let err = create_mappings_for_pair(&pair).await.unwrap_err();
        assert!(matches!(err, PlanError::ConflictingMappings(..)));
    }

    /// Two mappings that share the collection on side A are rejected as a conflict.
    #[tokio::test]
    async fn test_plan_conflicting_mapping() {
        let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
        let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();
        let storage_a = ics_vdir(dir_a.path());
        let storage_b = ics_vdir(dir_b.path());

        let collection = CollectionId::from_str("test").unwrap();
        let remapped = DeclaredMapping::Mapped {
            alias: "test".to_string(),
            a: CollectionDescription::Id {
                id: collection.clone(),
            },
            b: CollectionDescription::Id {
                id: CollectionId::from_str("test_2").unwrap(),
            },
        };
        let pair = StoragePair::new(storage_a.clone(), storage_b.clone())
            .with_mapping(DeclaredMapping::direct(collection))
            .with_mapping(remapped);

        let err = create_mappings_for_pair(&pair).await.unwrap_err();
        assert!(matches!(err, PlanError::ConflictingMappings(..)));
    }

    /// A collection present on both sides is mapped once when both "all from" flags are set.
    #[tokio::test]
    async fn test_plan_same_from_both_sides() {
        let dir_a = Builder::new().prefix("vstorage").tempdir().unwrap();
        let dir_b = Builder::new().prefix("vstorage").tempdir().unwrap();
        let storage_a = ics_vdir(dir_a.path());
        let storage_b = ics_vdir(dir_b.path());

        std::fs::create_dir(dir_a.path().join("one")).unwrap();
        std::fs::create_dir(dir_b.path().join("one")).unwrap();

        let disco = storage_a.discover_collections().await.unwrap();
        assert_eq!(disco.collections().len(), 1);

        let pair = StoragePair::new(storage_a.clone(), storage_b.clone())
            .with_all_from_a()
            .with_all_from_b();

        let mappings = create_mappings_for_pair(&pair).await.unwrap();
        assert_eq!(mappings.len(), 1);

        let plan = Plan::new(&pair, None).await.unwrap();
        assert_eq!(plan.collection_plans.len(), 1);
    }
}
/// A mapping whose two sides have been resolved to concrete collections.
#[derive(Debug, PartialEq)]
pub(super) struct ResolvedMapping {
    /// Name of the mapping: taken from the declaration, or `id:<collection id>` for
    /// mappings generated from discovery.
    pub(super) alias: String,
    /// The collection on storage A.
    pub(super) a: ResolvedCollection,
    /// The collection on storage B.
    pub(super) b: ResolvedCollection,
}
impl ResolvedMapping {
    /// Resolves a declared mapping against both storages.
    ///
    /// A `Direct` mapping uses the same description for both sides and takes its alias
    /// from that description; a `Mapped` mapping carries its own alias and a separate
    /// description per side. Side A is always resolved before side B.
    ///
    /// # Errors
    ///
    /// Forwards any error from [`ResolvedCollection::from_declaration`].
    async fn from_declared_mapping<I: Item>(
        declared: &DeclaredMapping,
        storage_a: &dyn Storage<I>,
        storage_b: &dyn Storage<I>,
        disco_a: &Discovery,
        disco_b: &Discovery,
    ) -> Result<Self, crate::Error> {
        match declared {
            DeclaredMapping::Direct { description } => {
                let alias = description.alias();
                let a = ResolvedCollection::from_declaration(description, disco_a, storage_a)
                    .await?;
                let b = ResolvedCollection::from_declaration(description, disco_b, storage_b)
                    .await?;
                Ok(Self { alias, a, b })
            }
            DeclaredMapping::Mapped {
                a: declared_a,
                b: declared_b,
                alias,
            } => {
                let alias = alias.to_string();
                let a = ResolvedCollection::from_declaration(declared_a, disco_a, storage_a)
                    .await?;
                let b = ResolvedCollection::from_declaration(declared_b, disco_b, storage_b)
                    .await?;
                Ok(Self { alias, a, b })
            }
        }
    }
}
/// One side of a [`ResolvedMapping`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub(super) struct ResolvedCollection {
    /// The collection id, when one is known (declared by id, or found via discovery).
    pub(super) id: Option<CollectionId>,
    /// The href of the collection in its storage.
    pub(super) href: Href,
    /// Whether the collection was found to exist when the mapping was resolved.
    exists: bool,
}
impl ResolvedCollection {
    /// Resolves a collection description for one storage.
    ///
    /// - Declared by id: if discovery knows the id, the discovered href is used and the
    ///   collection is marked as existing; otherwise the storage is asked which href the
    ///   id maps to and the collection is marked as missing.
    /// - Declared by href: the id is looked up in the discovery result. If discovery does
    ///   not list the href, the storage is probed via [`storage_exists`].
    ///
    /// # Errors
    ///
    /// Forwards errors from `href_for_collection_id` and from [`storage_exists`].
    async fn from_declaration<I: Item>(
        declared: &CollectionDescription,
        discovery: &Discovery,
        storage: &dyn Storage<I>,
    ) -> Result<ResolvedCollection, crate::Error> {
        match declared {
            CollectionDescription::Id { id } => {
                let (href, exists) = match discovery.find_collection_by_id(id) {
                    Some(found) => (found.href().to_string(), true),
                    None => (storage.href_for_collection_id(id)?, false),
                };
                Ok(ResolvedCollection {
                    id: Some(id.clone()),
                    href,
                    exists,
                })
            }
            CollectionDescription::Href { href } => {
                let discovered = discovery
                    .collections()
                    .iter()
                    .find(|c| c.href() == *href);
                let id = discovered.map(|c| c.id().clone());
                // Only probe the storage when discovery did not already confirm the href.
                let exists = if id.is_some() {
                    true
                } else {
                    storage_exists(storage, href).await?
                };
                Ok(ResolvedCollection {
                    href: href.clone(),
                    id,
                    exists,
                })
            }
        }
    }
}
/// Probes whether the collection at `href` exists by listing its items.
///
/// Returns `Ok(true)` when listing succeeds and `Ok(false)` when the storage answers
/// with `DoesNotExist` or `AccessDenied`.
///
/// # Errors
///
/// Any other listing error is returned unchanged.
async fn storage_exists<I: Item>(
    storage: &dyn Storage<I>,
    href: &str,
) -> Result<bool, crate::Error> {
    match storage.list_items(href).await {
        Ok(_) => Ok(true),
        Err(err)
            if err.kind == ErrorKind::DoesNotExist || err.kind == ErrorKind::AccessDenied =>
        {
            Ok(false)
        }
        Err(err) => Err(err),
    }
}
/// Finds the collection on the target side that corresponds to `source_collection`.
///
/// The counterpart is looked up by the source collection's id. When the target discovery
/// does not contain that id, the target storage is asked for the href the id maps to and
/// the result is marked as not existing.
///
/// # Errors
///
/// Returns [`PlanError::Storage`] if the storage cannot provide an href for the id.
fn resolve_mapping_counterpart<I: Item>(
    source_collection: &DiscoveredCollection,
    target_discovery: &Discovery,
    target_storage: &dyn Storage<I>,
) -> Result<ResolvedCollection, PlanError> {
    let id = source_collection.id();
    let (href, exists) = if let Some(found) = target_discovery.find_collection_by_id(id) {
        (found.href().to_string(), true)
    } else {
        (target_storage.href_for_collection_id(id)?, false)
    };
    Ok(ResolvedCollection {
        id: Some(id.clone()),
        href,
        exists,
    })
}
/// The planned work for a single mapping.
#[derive(Debug)]
pub struct CollectionPlan<I: Item> {
    /// What to do with the collection itself.
    pub collection_action: CollectionAction,
    /// Actions for the items of the collection.
    pub item_actions: Vec<ItemAction>,
    /// Actions for the properties of the collection.
    pub property_actions: Vec<PropertyPlan<I>>,
    /// The resolved mapping this plan was computed for.
    pub(super) mapping: ResolvedMapping,
}
impl<I: Item> CollectionPlan<I> {
async fn new(
pair: &StoragePair<I>,
mapping: ResolvedMapping,
status: Option<&StatusDatabase>,
) -> Result<CollectionPlan<I>, PlanError> {
let (href_a, href_b) = (&mapping.a.href, &mapping.b.href);
let mapping_uid = status
.map(|s| s.get_mapping_uid(href_a, href_b))
.transpose()?
.flatten();
let (items_a, items_b) = tokio::try_join!(
items_for_collection(status, pair.storage_a(), href_a, Side::A),
items_for_collection(status, pair.storage_b(), href_b, Side::B),
)?;
let status_uids = match (status, &mapping_uid) {
(Some(s), Some(m)) => s.all_uids(m)?,
_ => Vec::new(),
};
if (items_a.is_empty() || items_b.is_empty())
&& !status_uids.is_empty()
&& pair.on_empty == OnEmpty::Skip
{
let mapping_uid =
mapping_uid.expect("If mapping_uid is None, then status_uid must be empty.");
warn!("Collection has been emptied on one side; skipping.");
return Ok(CollectionPlan {
collection_action: CollectionAction::NoAction(mapping_uid),
item_actions: Vec::new(),
property_actions: Vec::new(),
mapping,
});
}
let all_uids = items_a
.iter()
.chain(items_b.iter())
.map(|i| &i.uid)
.chain(status_uids.iter())
.collect::<HashSet<_>>(); let item_actions = all_uids
.into_iter()
.map(|uid| {
let item_a = items_a.iter().find(|i| i.uid == *uid);
let item_b = items_b.iter().find(|i| i.uid == *uid);
let previous = match (status, &mapping_uid) {
(Some(s), Some(m)) => { s.get_item_hash_by_uid(m, uid) }?,
_ => None,
};
Ok(ItemAction::for_item(item_a, item_b, previous, uid))
})
.filter_map(Result::transpose)
.collect::<Result<Vec<_>, PlanError>>()?;
let collection_action =
CollectionAction::new(mapping.a.exists, mapping.b.exists, mapping_uid.clone());
let property_actions =
match PropertyPlan::create_for_collection(pair, &mapping, status, mapping_uid).await {
Ok(plan) => plan,
Err(err) => 'unsupported: {
if let PlanError::Storage(e) = &err {
if e.kind == ErrorKind::Unsupported {
break 'unsupported Vec::<PropertyPlan<I>>::new();
}
}
return Err(err);
}
};
Ok(CollectionPlan {
collection_action,
item_actions,
property_actions,
mapping,
})
}
#[must_use]
pub fn alias(&self) -> &str {
&self.mapping.alias
}
}
/// An action to take for a single item.
///
/// The per-variant notes describe when `ItemAction::for_item` in this file produces the
/// variant.
#[derive(PartialEq, Debug, Clone)]
pub enum ItemAction {
    /// The item exists on both sides with equal hashes but has no status entry yet.
    SaveToStatus {
        a: ItemState,
        b: ItemState,
    },
    /// Both sides have equal hashes, but the status entry's hash, hrefs or etags differ.
    UpdateStatus {
        hash: String,
        old_a: ItemRef,
        old_b: ItemRef,
        new_a: ItemRef,
        new_b: ItemRef,
    },
    /// The item is gone from both sides but still has a status entry.
    ClearStatus {
        uid: String,
    },
    /// The item exists only in B and has to be created in A.
    CreateInA {
        source: ItemState,
    },
    /// The item exists only in A and has to be created in B.
    CreateInB {
        source: ItemState,
    },
    /// A still matches the status hash while B differs: copy B (`source`) over A (`target`).
    UpdateInA {
        source: Href,
        target: Href,
        old_a: ItemRef,
        old_b: ItemRef,
    },
    /// B still matches the status hash while A differs: copy A (`source`) over B (`target`).
    UpdateInB {
        source: Href,
        target: Href,
        old_a: ItemRef,
        old_b: ItemRef,
    },
    /// The item is missing in B and unchanged in A since the last status: delete it in A.
    DeleteInA {
        target: ItemState,
    },
    /// The item is missing in A and unchanged in B since the last status: delete it in B.
    DeleteInB {
        target: ItemState,
    },
    /// Both sides differ from each other and cannot be reconciled automatically.
    ///
    /// `is_new` is `true` when there is no status entry for the item.
    Conflict {
        a: ItemState,
        b: ItemState,
        is_new: bool,
    },
}
impl ItemAction {
    /// Decides what to do for one uid.
    ///
    /// `current_a` / `current_b` are the item's present state on each side (if any) and
    /// `previous` is its entry in the status database (if any). Returns `None` when
    /// nothing has to be done.
    ///
    /// # Panics
    ///
    /// Panics when the item exists neither on a side nor in the status database.
    #[must_use]
    fn for_item(
        current_a: Option<&ItemState>,
        current_b: Option<&ItemState>,
        previous: Option<StatusForItem>,
        uid: &str,
    ) -> Option<ItemAction> {
        match (current_a, current_b) {
            (None, None) => match previous {
                Some(_) => Some(ItemAction::ClearStatus {
                    uid: uid.to_string(),
                }),
                None => unreachable!("no action for item that doesn't exist anywhere"),
            },
            (None, Some(b)) => match previous {
                // Unchanged in B since the last sync, so the deletion in A wins.
                Some(prev) if b.hash == prev.hash => {
                    Some(ItemAction::DeleteInB { target: b.clone() })
                }
                Some(_) => {
                    warn!("Item deleted in A but changed B: {}.", b.uid);
                    Some(ItemAction::CreateInA { source: b.clone() })
                }
                None => Some(ItemAction::CreateInA { source: b.clone() }),
            },
            (Some(a), None) => match previous {
                // Unchanged in A since the last sync, so the deletion in B wins.
                Some(prev) if a.hash == prev.hash => {
                    Some(ItemAction::DeleteInA { target: a.clone() })
                }
                Some(_) => {
                    warn!("Item deleted in B but changed A: {}.", a.uid);
                    Some(ItemAction::CreateInB { source: a.clone() })
                }
                None => Some(ItemAction::CreateInB { source: a.clone() }),
            },
            (Some(a), Some(b)) => match previous {
                Some(prev) => Self::for_tracked_item(a, b, prev),
                None if a.hash == b.hash => Some(ItemAction::SaveToStatus {
                    a: a.clone(),
                    b: b.clone(),
                }),
                None => Some(ItemAction::Conflict {
                    a: a.clone(),
                    b: b.clone(),
                    is_new: true,
                }),
            },
        }
    }

    /// Handles an item that exists on both sides and also has a status entry.
    fn for_tracked_item(a: &ItemState, b: &ItemState, prev: StatusForItem) -> Option<ItemAction> {
        if a.hash == b.hash {
            let status_outdated = a.hash != prev.hash
                || a.href != prev.href_a
                || b.href != prev.href_b
                || a.etag != prev.etag_a
                || b.etag != prev.etag_b;
            if !status_outdated {
                return None;
            }
            return Some(ItemAction::UpdateStatus {
                hash: a.hash.clone(),
                old_a: ItemRef {
                    href: prev.href_a,
                    etag: prev.etag_a,
                },
                old_b: ItemRef {
                    href: prev.href_b,
                    etag: prev.etag_b,
                },
                new_a: ItemRef {
                    href: a.href.clone(),
                    etag: a.etag.clone(),
                },
                new_b: ItemRef {
                    href: b.href.clone(),
                    etag: b.etag.clone(),
                },
            });
        }

        if a.hash == prev.hash {
            // Only B changed: B is the source, A the target.
            return Some(ItemAction::UpdateInA {
                source: b.href.clone(),
                target: a.href.clone(),
                old_a: a.to_item_ref(),
                old_b: ItemRef {
                    href: prev.href_b,
                    etag: prev.etag_b,
                },
            });
        }

        if b.hash == prev.hash {
            // Only A changed: A is the source, B the target.
            return Some(ItemAction::UpdateInB {
                source: a.href.clone(),
                target: b.href.clone(),
                old_a: ItemRef {
                    href: prev.href_a,
                    etag: prev.etag_a,
                },
                old_b: b.to_item_ref(),
            });
        }

        Some(ItemAction::Conflict {
            a: a.clone(),
            b: b.clone(),
            is_new: false,
        })
    }
}
/// Short human-readable description: the action followed by the uid or href it concerns.
impl std::fmt::Display for ItemAction {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // All variants share the shape "<action> (<key>: <value>)".
        let mut describe = |action: &str, key: &str, value: &dyn std::fmt::Display| {
            write!(f, "{action} ({key}: {value})")
        };
        match self {
            ItemAction::SaveToStatus { a, .. } => describe("save to status", "uid", &a.uid),
            ItemAction::UpdateStatus { old_a, .. } => {
                describe("update in status", "a.href", &old_a.href)
            }
            ItemAction::ClearStatus { uid } => describe("clear from status", "uid", uid),
            ItemAction::CreateInA { source } => {
                describe("create in storage a", "uid", &source.uid)
            }
            ItemAction::CreateInB { source } => {
                describe("create in storage b", "uid", &source.uid)
            }
            ItemAction::UpdateInA { source, .. } => {
                describe("update in storage a", "href", source)
            }
            ItemAction::UpdateInB { source, .. } => {
                describe("update in storage b", "href", source)
            }
            ItemAction::DeleteInA { target } => {
                describe("delete in storage a", "uid", &target.uid)
            }
            ItemAction::DeleteInB { target } => {
                describe("delete in storage b", "uid", &target.uid)
            }
            ItemAction::Conflict { a, .. } => describe("conflict", "uid", &a.uid),
        }
    }
}
/// An action to take for a collection as a whole.
// NOTE(review): `MappingUid` is imported from `super::status`; the `allow` below is
// presumably needed because that type is less visible than this enum — confirm.
#[allow(private_interfaces)]
#[derive(PartialEq, Debug, Clone)]
pub enum CollectionAction {
    /// Nothing to do for the collection itself.
    NoAction(MappingUid),
    /// The collection exists on both sides but has no mapping uid in the status database.
    SaveToStatus,
    /// The collection exists only in B and has no mapping uid.
    CreateInA,
    /// The collection exists only in A and has no mapping uid.
    CreateInB,
    /// The collection exists on neither side.
    CreateInBoth,
    /// The collection has a mapping uid but is missing on one side; the `Side` names the
    /// side it still exists on.
    Delete(MappingUid, Side),
}
impl CollectionAction {
    /// Picks the collection action from the existence of the collection on each side
    /// (`current_a`, `current_b`) and the mapping uid known to the status database.
    fn new(current_a: bool, current_b: bool, mapping_uid: Option<MappingUid>) -> CollectionAction {
        match (current_a, current_b) {
            (false, false) => CollectionAction::CreateInBoth,
            (true, true) => match mapping_uid {
                Some(uid) => CollectionAction::NoAction(uid),
                None => CollectionAction::SaveToStatus,
            },
            // Missing in A only.
            (false, true) => match mapping_uid {
                Some(uid) => CollectionAction::Delete(uid, Side::B),
                None => CollectionAction::CreateInA,
            },
            // Missing in B only.
            (true, false) => match mapping_uid {
                Some(uid) => CollectionAction::Delete(uid, Side::A),
                None => CollectionAction::CreateInB,
            },
        }
    }
}
/// Short human-readable description of the action.
impl std::fmt::Display for CollectionAction {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            CollectionAction::NoAction(_) => "no action",
            CollectionAction::SaveToStatus => "save to status",
            CollectionAction::CreateInA => "create in storage a",
            CollectionAction::CreateInB => "create in storage b",
            CollectionAction::CreateInBoth => "create in both storages",
            // The only variant whose text depends on its payload.
            CollectionAction::Delete(_, side) => return write!(f, "delete from {side}"),
        };
        f.write_str(label)
    }
}
/// Collects the current state of every item in `collection` on one side.
///
/// With a status database, the collection is listed first; items whose etag equals the
/// one recorded in the status database reuse the recorded state, and only the remaining
/// items are fetched. If nothing remains to be fetched, the storage is not queried again.
/// Without a status database, all items are fetched.
///
/// A collection that does not exist yields an empty list.
///
/// # Errors
///
/// Returns storage errors other than `DoesNotExist` on listing / fetching all items,
/// any error from fetching individual items, and status database errors.
async fn items_for_collection<I: Item>(
    status: Option<&StatusDatabase>,
    storage: &dyn Storage<I>,
    collection: &Href,
    side: Side,
) -> Result<Vec<ItemState>, PlanError> {
    debug!("Resolving state for collection: {}.", collection);

    let mut items = Vec::new();
    let prefetched = if let Some(status) = status {
        let listed_items = match storage.list_items(collection).await {
            Ok(i) => i,
            Err(err) if err.kind == ErrorKind::DoesNotExist => {
                return Ok(Vec::new());
            }
            Err(err) => return Err(err.into()),
        };

        let mut to_prefetch = Vec::new();
        for item_ref in listed_items {
            if let Some(prev_item) = status.get_item_by_href(side, &item_ref.href)? {
                if prev_item.etag == item_ref.etag {
                    // Unchanged since the last sync: reuse the recorded state.
                    items.push(prev_item);
                    continue;
                }
            }
            to_prefetch.push(item_ref.href);
        }

        if to_prefetch.is_empty() {
            // Nothing changed (or the collection is empty): avoid a pointless request
            // with an empty href list.
            Vec::new()
        } else {
            let to_prefetch = to_prefetch.iter().map(String::as_str).collect::<Vec<_>>();
            storage.get_many_items(&to_prefetch).await?
        }
    } else {
        match storage.get_all_items(collection).await {
            Ok(items) => items,
            Err(err) if err.kind == ErrorKind::DoesNotExist => Vec::new(),
            Err(err) => return Err(PlanError::from(err)),
        }
    };

    let prefetched = prefetched
        .into_iter()
        .map(|FetchedItem { href, item, etag }| ItemState {
            href,
            uid: item.ident(),
            etag,
            hash: item.hash(),
        });
    items.extend(prefetched);

    Ok(items)
}
/// An action to take for a single property of a collection.
#[derive(Debug)]
pub enum PropertyAction {
    /// Write `value` to the property in storage A.
    WriteToA { value: String },
    /// Write `value` to the property in storage B.
    WriteToB { value: String },
    /// Delete the property in storage A.
    DeleteInA,
    /// Delete the property in storage B.
    DeleteInB,
    /// Remove the property from the status database.
    ClearStatus,
    /// Record `value` for the property in the status database.
    UpdateStatus { value: String },
    /// The property differs on both sides and cannot be reconciled automatically.
    Conflict,
}
/// Short human-readable description; written values are included, status values are not.
impl std::fmt::Display for PropertyAction {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PropertyAction::WriteToA { value } => write!(f, "Write to a: {value}"),
            PropertyAction::WriteToB { value } => write!(f, "Write to b: {value}"),
            PropertyAction::DeleteInA => f.write_str("Delete in a"),
            PropertyAction::DeleteInB => f.write_str("Delete in b"),
            PropertyAction::ClearStatus => f.write_str("Clear status"),
            PropertyAction::UpdateStatus { .. } => f.write_str("Update status"),
            PropertyAction::Conflict => f.write_str("Conflict"),
        }
    }
}
/// A planned [`PropertyAction`] together with the property it applies to.
#[derive(Debug)]
pub struct PropertyPlan<I: Item> {
    /// The property the action applies to.
    pub(super) property: I::Property,
    /// The action to take for that property.
    pub(super) action: PropertyAction,
}
impl<I: Item> PropertyPlan<I> {
    /// Plans the property actions for one mapping.
    ///
    /// Lists the properties of the collection on both sides and, when both a status
    /// database and a mapping `uid` are available, the properties recorded in the status
    /// database. Every property present on at least one side is then compared across
    /// A, B and status to pick an action; properties needing no action are omitted.
    ///
    /// NOTE(review): the set of properties examined is built from sides A and B only, so
    /// a property that exists solely in the status database is never visited and the
    /// `ClearStatus` arm below cannot be reached from here.
    ///
    /// # Errors
    ///
    /// Returns errors from listing properties on either storage or in the status database.
    async fn create_for_collection(
        pair: &StoragePair<I>,
        mapping: &ResolvedMapping,
        status: Option<&StatusDatabase>,
        uid: Option<MappingUid>,
    ) -> Result<Vec<PropertyPlan<I>>, PlanError> {
        let props_a = pair.storage_a().list_properties(&mapping.a.href).await?;
        let props_b = pair.storage_b().list_properties(&mapping.b.href).await?;
        let props_status = if let (Some(db), Some(mapping_uid)) = (status, uid) {
            db.list_properties_for_collection(&mapping_uid)?
        } else {
            Vec::new()
        };

        let all_props: HashSet<_> = props_a
            .iter()
            .chain(props_b.iter())
            .map(|p| &p.property)
            .collect();

        let plans: Vec<PropertyPlan<I>> = all_props
            .into_iter()
            .filter_map(|property| {
                let a = props_a.iter().find(|p| p.property == *property);
                let b = props_b.iter().find(|p| p.property == *property);
                let state = props_status.iter().find(|p| p.property == property.name());

                let action = match (a, b, state) {
                    (None, None, None) => None,
                    (None, None, Some(_)) => Some(PropertyAction::ClearStatus),

                    // Present on one side only: new if unknown to status, else deleted
                    // on the other side.
                    (None, Some(b), None) => Some(PropertyAction::WriteToA {
                        value: b.value.clone(),
                    }),
                    (None, Some(_), Some(_)) => Some(PropertyAction::DeleteInB),
                    (Some(a), None, None) => Some(PropertyAction::WriteToB {
                        value: a.value.clone(),
                    }),
                    (Some(_), None, Some(_)) => Some(PropertyAction::DeleteInA),

                    // Both sides agree: only the status may need refreshing.
                    (Some(a), Some(b), state) if a.value == b.value => match state {
                        Some(s) if s.value == a.value => None,
                        _ => Some(PropertyAction::UpdateStatus {
                            value: a.value.clone(),
                        }),
                    },

                    // Both sides disagree.
                    (Some(_), Some(_), None) => Some(PropertyAction::Conflict),
                    (Some(a), Some(b), Some(s)) => {
                        if a.value == s.value {
                            // Only B changed.
                            Some(PropertyAction::WriteToA {
                                value: b.value.clone(),
                            })
                        } else if b.value == s.value {
                            // Only A changed.
                            Some(PropertyAction::WriteToB {
                                value: a.value.clone(),
                            })
                        } else {
                            Some(PropertyAction::Conflict)
                        }
                    }
                };

                action.map(|action| PropertyPlan {
                    action,
                    property: property.clone(),
                })
            })
            .collect();

        Ok(plans)
    }
}