#![deny(clippy::pedantic)]
#![deny(clippy::unwrap_used)]
use std::{
fs::File,
io::{read_to_string, Seek, Write},
path::PathBuf,
sync::Arc,
time::Duration,
};
use anyhow::{bail, Context};
use camino::Utf8PathBuf;
use config::{open_default_path, parse_config};
use log::{debug, error, info, trace, warn};
use rustix::fs::sync;
use stdio::{StdIo, StdIoLock};
use tempfile::NamedTempFile;
use tokio::{sync::Mutex, task::JoinSet};
use vstorage::{
base::{IcsItem, Item, Storage, VcardItem},
sync::{
declare::StoragePair,
plan::{ItemAction, Plan},
status::{StatusDatabase, StatusError},
SyncError,
},
Etag,
};
use crate::cli::{Cli, Command};
mod cli;
mod config;
mod stdio;
mod tls;
pub const VERSION: &str = "2.0.0-alpha0";
pub(crate) struct NamedPair<I: Item> {
name: String,
pub(crate) inner: StoragePair<I>,
status_path: Utf8PathBuf,
conflict_resolution: Option<RawCommand>,
locks: (Arc<Mutex<()>>, Arc<Mutex<()>>),
names: (String, String),
}
pub struct RawCommand {
command: String,
args: Vec<String>,
}
impl RawCommand {
#[must_use]
pub fn command(&self) -> std::process::Command {
let mut cmd = std::process::Command::new(&self.command);
cmd.args(&self.args);
cmd
}
}
#[allow(clippy::needless_pass_by_value)]
pub fn log_error(error: SyncError) {
error!("{error}");
}
impl<I: Item> NamedPair<I> {
async fn daemon(self, interval: Duration) -> StatusError {
loop {
if let Err(err) = self.sync_once(false).await {
error!("Error synchronising {}: {}", self.name, err);
if let Ok(status_error) = err.downcast::<StatusError>() {
return status_error;
};
};
warn!(
"Monitoring is not implemented, will auto-sync every {} minutes.",
interval.as_secs() / 60
);
tokio::time::sleep(interval).await;
}
}
async fn sync_once(&self, dry_run: bool ) -> anyhow::Result<()> {
let lock_0 = self.locks.0.lock().await;
let lock_1 = self.locks.1.lock().await;
let plan = self.create_plan().await?;
self.print_plan(&plan);
if !dry_run {
let status_rw = StatusDatabase::open_or_create(&self.status_path)
.with_context(|| format!("open_or_create status db for {}", self.name))?;
plan.execute(&status_rw, log_error).await?;
}
drop(lock_0);
drop(lock_1);
Ok(())
}
async fn create_plan(&self) -> anyhow::Result<Plan<I>> {
debug!("Creating plan for storage pair '{}'.", self.name);
let status_ro = StatusDatabase::open_readonly(&self.status_path)
.with_context(|| format!("open_readonly status db for {}", self.name))?;
Ok(Plan::new(&self.inner, status_ro.as_ref()).await?)
}
async fn discover(&self) -> anyhow::Result<()> {
let disco = self.inner.storage_a().discover_collections().await?;
println!("For pair {}, storage a/{}:", self.name, self.names.0);
for collection in disco.collections() {
println!("- id={} href={}", collection.id(), collection.href());
}
let disco = self.inner.storage_b().discover_collections().await?;
println!("For pair {}, storage b/{}:", self.name, self.names.1);
for collection in disco.collections() {
println!("- id={} href={}", collection.id(), collection.href());
}
Ok(())
}
fn print_plan(&self, plan: &Plan<I>) {
info!(">>> Plan for storage pair '{}'", self.name);
for cp in &plan.collection_plans {
info!(
"collection: {}, action: {}. {} item actions. {} property actions.",
cp.alias(),
cp.collection_action,
cp.item_actions.len(),
cp.property_actions.len(),
);
for item in &cp.item_actions {
info!("item: {}", item);
debug!("{item:?}");
}
for prop in &cp.property_actions {
info!("property: {:?}", prop);
}
}
}
async fn resolve_conflicts(self, stdio: Arc<StdIo>) -> anyhow::Result<()> {
let Some(ref raw_cmd) = self.conflict_resolution else {
error!("No conflict resolution command for {}.", self.name);
return Ok(());
};
info!("Resolving conflicts for pair {}.", self.name);
let plan = self.create_plan().await?;
self.print_plan(&plan);
let conflicts = plan
.collection_plans
.into_iter()
.flat_map(|cp| cp.item_actions)
.filter_map(|action| match action {
ItemAction::Conflict { a, b, .. } => Some((a, b)),
_ => None,
})
.collect::<Vec<_>>();
let total = conflicts.len();
trace!("Taking stdio lock...");
let lock = stdio.lock().await;
trace!("Stdio lock taken.");
for (i, (a, b)) in conflicts.into_iter().enumerate() {
println!("Next is item {}/{total}", i + 1);
continue_or_abort(&lock)?;
let (mut temp_a, etag_a) = save_item_to_tempfile(self.inner.storage_a(), &a.href)
.await
.context("fetching conflicted item from A")?;
let (mut temp_b, etag_b) = save_item_to_tempfile(self.inner.storage_b(), &b.href)
.await
.context("fetching conflicted item from B")?;
info!("Running conflict resolution for item {}", a.uid);
let exit_status = raw_cmd
.command()
.arg(temp_a.path())
.arg(temp_b.path())
.spawn()
.context("executing conflict resolution command")?
.wait()
.context("waiting for conflict resolution command")?;
if !exit_status.success() {
error!("Conflict resolution command failed: {exit_status}");
continue;
}
sync();
temp_a.rewind().context("seeking in temporary file for A")?;
temp_b.rewind().context("seeking in temporary file for B")?;
let new_a = read_to_string(temp_a).context("reading resolved item A")?;
let new_b = read_to_string(temp_b).context("reading resolved item B")?;
if new_a.is_empty() {
error!("Resolved item A is empty.");
continue;
}
if new_b.is_empty() {
error!("Resolved item B is empty.");
continue;
}
if new_a.trim() != new_b.trim() {
error!("Conflict resolution yielded mismatching items; skipping");
continue;
}
let new = I::from(new_a);
drop(new_b);
self.inner
.storage_a()
.update_item(&a.href, &etag_a, &new)
.await
.context("uploading resolved item into A")?;
debug!("Uploaded resolved item to A.");
self.inner
.storage_b()
.update_item(&b.href, &etag_b, &new)
.await
.context("uploading resolved item into B")?;
debug!("Uploaded resolved item to B.");
info!("Resolved conflicts for '{}'.", new.ident());
}
Ok(())
}
}
async fn save_item_to_tempfile<I: Item>(
storage: &dyn Storage<I>,
href: &str,
) -> anyhow::Result<(NamedTempFile, Etag)> {
let mut temp =
NamedTempFile::new().context("creating temporary file for conflict resolution")?;
debug!("Fetching {href} for conflict resolution...");
let (data, etag) = storage
.get_item(href)
.await
.context("fetching conflicting item from a")?;
temp.write_all(data.as_str().as_bytes())
.context("writing item into temporary file")?;
Ok((temp, etag))
}
fn continue_or_abort(stdio: &StdIoLock) -> anyhow::Result<()> {
let input = stdio.stdin();
loop {
println!("Continue? [Y/n]");
let mut response = String::new();
input
.read_line(&mut response)
.context("Reading response from stdin")?;
match response.trim().to_lowercase().as_str() {
"" | "y" => return Ok(()),
"n" => bail!("Aborted by user."),
_ => {}
};
}
}
pub(crate) struct App {
interval: Duration,
calendar_pairs: Vec<NamedPair<IcsItem>>,
contact_pairs: Vec<NamedPair<VcardItem>>,
stdio: Arc<StdIo>,
}
impl App {
async fn discover(&self) -> anyhow::Result<()> {
for pair in &self.calendar_pairs {
pair.discover().await?;
}
for pair in &self.contact_pairs {
pair.discover().await?;
}
Ok(())
}
async fn daemon(self) -> anyhow::Result<()> {
let mut set = JoinSet::new();
for pair in self.calendar_pairs {
set.spawn(pair.daemon(self.interval));
}
for pair in self.contact_pairs {
set.spawn(pair.daemon(self.interval));
}
while let Some(res) = set.join_next().await {
match res {
Ok(err) => error!("Error in sync task: {}.", err),
Err(joinerr) => error!("Sync task aborted: {}.", joinerr),
}
}
anyhow::bail!("All sync tasks exited.");
}
async fn sync(self, dry_run: bool) -> anyhow::Result<()> {
let mut set = JoinSet::new();
for pair in self.calendar_pairs {
set.spawn(async move { pair.sync_once(dry_run).await });
}
for pair in self.contact_pairs {
set.spawn(async move { pair.sync_once(dry_run).await });
}
while let Some(res) = set.join_next().await {
match res {
Ok(Ok(())) => {}
Ok(Err(err)) => error!("Error in sync task: {}.", err),
Err(joinerr) => error!("Sync task aborted: {}.", joinerr),
}
}
Ok(())
}
async fn resolve_conflicts(self, dry_run: bool) -> anyhow::Result<()> {
if dry_run {
bail!("dry_run is not implemented for resolve-conflicts");
}
let mut set = JoinSet::new();
for pair in self.calendar_pairs {
set.spawn(pair.resolve_conflicts(self.stdio.clone()));
}
for pair in self.contact_pairs {
set.spawn(pair.resolve_conflicts(self.stdio.clone()));
}
while let Some(res) = set.join_next().await {
match res {
Ok(Ok(())) => {}
Ok(Err(err)) => error!("Error resolving conflicts: {}.", err),
Err(joinerr) => error!("Sync task aborted: {}.", joinerr),
}
}
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cli = Cli::parse(std::env::args()).unwrap_or_else(|err| {
eprintln!("Bad usage: {err}\n");
eprintln!("Usage: pimsync [-c CONFIGFILE ] [-v LOGLEVEL] [-p PAIR] COMMAND [ARGS...]");
eprintln!("Commands:");
eprintln!("\tcheck\t\t\tcheck configuration and exit");
eprintln!("\tdaemon -[r READY_FD]\tkeep storages in sync");
eprintln!("\tsync [-d]\t\tsync storages once");
eprintln!("\tresolve-conflicts [-d]\tmanually resolve conflicts");
eprintln!("\tdiscover\t\tprint discovered collections");
eprintln!("\tversion\t\t\tprint version");
eprintln!("See 'man pimsync' for details");
std::process::exit(100);
});
if let Command::Version = cli.command {
println!("pimsync {VERSION}");
return Ok(());
};
simple_logger::SimpleLogger::new()
.with_level(cli.log_level)
.init()
.expect("logger should initialise");
info!("Logging enabled with {} level", cli.log_level);
let (config_path, config_file) = match cli.config_file {
Some(file) => {
let path = PathBuf::from(file);
let file = File::open(&path)
.with_context(|| format!("Could not open {}.", path.to_string_lossy()))?;
debug!("Opened config file {}", path.to_string_lossy());
(path, file)
}
None => open_default_path()?,
};
let config_data = read_to_string(config_file)?;
let config = parse_config(&config_data, &cli.pairs).with_context(|| {
format!(
"Could not parse configuration file at {}",
config_path.display()
)
})?;
trace!("Parsed configuration: {:?}", &config);
let app = config
.into_app()
.await
.context("initialising application")?;
debug!("Initialised application");
match cli.command {
Command::Check => Ok(()),
Command::Daemon { ready_fd } => {
if let Some(mut f) = ready_fd {
f.write_all(b"READY=1\n")
.context("writing to readiness fd")?;
};
app.daemon().await
}
Command::Sync { dry_run } => app.sync(dry_run).await,
Command::ResolveConflicts { dry_run } => app.resolve_conflicts(dry_run).await,
Command::Discover => app.discover().await,
Command::Version => unreachable!(),
}
}