From c5497b8024e30d5029959543e117caf7992ee0fb Mon Sep 17 00:00:00 2001 From: Firstyear Date: Mon, 15 Jul 2019 09:15:25 +1000 Subject: [PATCH] Implement backup, restore and server modes This allows backup and restore of the server backend data from the command line. Backups can be taken while the server is running. Automated backups are *not* part of this yet. This also adds a few missing files from a previous commit mistake. Opps! --- Cargo.toml | 9 ++- src/clients/main.rs | 3 + src/lib/async_log.rs | 81 +++++++++++++++++++++++ src/lib/be/mod.rs | 132 ++++++++++++++++++------------------- src/lib/config.rs | 12 ++++ src/lib/core.rs | 88 +++++++++++++++++++++---- src/lib/error.rs | 1 + src/lib/idm/claim.rs | 18 +++++ src/lib/idm/group.rs | 17 +++++ src/lib/lib.rs | 1 + src/lib/proto/v1/actors.rs | 10 +-- src/server/main.rs | 89 +++++++++++++++++++++++-- 12 files changed, 367 insertions(+), 94 deletions(-) create mode 100644 src/clients/main.rs create mode 100644 src/lib/async_log.rs create mode 100644 src/lib/idm/claim.rs create mode 100644 src/lib/idm/group.rs diff --git a/Cargo.toml b/Cargo.toml index 00113e601..e38a3177f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,12 +16,12 @@ name = "rsidm" path = "src/lib/lib.rs" [[bin]] -name = "rsidm_core" +name = "rsidmd" path = "src/server/main.rs" [[bin]] -name = "rsidm_whoami" -path = "src/clients/whoami.rs" +name = "kanidm" +path = "src/clients/main.rs" [dependencies] @@ -43,6 +43,7 @@ tokio = "0.1" futures = "0.1" uuid = { version = "0.7", features = ["serde", "v4"] } serde = "1.0" +serde_cbor = "0.10" serde_json = "1.0" serde_derive = "1.0" @@ -50,6 +51,8 @@ rusqlite = { version = "0.15", features = ["backup"] } r2d2 = "0.8" r2d2_sqlite = "0.7" +structopt = { version = "0.2", default-features = false } + concread = "0.1" diff --git a/src/clients/main.rs b/src/clients/main.rs new file mode 100644 index 000000000..44401251e --- /dev/null +++ b/src/clients/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello kanidm"); +} diff --git a/src/lib/async_log.rs b/src/lib/async_log.rs new file mode 100644 index 000000000..35a9a2a60 --- /dev/null +++ b/src/lib/async_log.rs @@ -0,0 +1,81 @@ +use actix::prelude::*; + +use crate::audit::AuditScope; + +// Helper for internal logging. +// Should only be used at startup/shutdown +#[macro_export] +macro_rules! log_event { + ($log_addr:expr, $($arg:tt)*) => ({ + use crate::async_log::LogEvent; + use std::fmt; + $log_addr.do_send( + LogEvent { + msg: fmt::format( + format_args!($($arg)*) + ) + } + ) + }) +} + +// We need to pass in config for this later +// Or we need to pass in the settings for it IE level and dest? +// Is there an efficent way to set a log level filter in the macros +// so that we don't msg unless it's the correct level? +// Do we need config in the log macro? + +pub fn start() -> actix::Addr { + SyncArbiter::start(1, move || EventLog {}) +} + +pub struct EventLog {} + +impl Actor for EventLog { + type Context = SyncContext; + + /* + fn started(&mut self, ctx: &mut Self::Context) { + ctx.set_mailbox_capacity(1 << 31); + } + */ +} + +// What messages can we be sent. Basically this is all the possible +// inputs we *could* recieve. + +// Add a macro for easy msg write + +pub struct LogEvent { + pub msg: String, +} + +impl Message for LogEvent { + type Result = (); +} + +impl Handler for EventLog { + type Result = (); + + fn handle(&mut self, event: LogEvent, _: &mut SyncContext) -> Self::Result { + info!("logevent: {}", event.msg); + } +} + +impl Handler for EventLog { + type Result = (); + + fn handle(&mut self, event: AuditScope, _: &mut SyncContext) -> Self::Result { + info!("audit: {}", event); + } +} + +/* +impl Handler for EventLog { + type Result = (); + + fn handle(&mut self, event: Event, _: &mut SyncContext) -> Self::Result { + println!("EVENT: {:?}", event) + } +} +*/ diff --git a/src/lib/be/mod.rs b/src/lib/be/mod.rs index 2ea9c836e..3ed5f21aa 100644 --- a/src/lib/be/mod.rs +++ b/src/lib/be/mod.rs @@ -4,6 +4,7 @@ use r2d2::Pool; use r2d2_sqlite::SqliteConnectionManager; use rusqlite::types::ToSql; use rusqlite::NO_PARAMS; +use serde_cbor; use serde_json; use std::convert::TryFrom; use std::fs; @@ -24,7 +25,7 @@ mod sqlite_be; struct IdEntry { // TODO: for now this is i64 to make sqlite work, but entry is u64 for indexing reasons! id: i64, - data: String, + data: Vec, } pub struct Backend { @@ -100,8 +101,8 @@ pub trait BackendTransaction { let entries: Result>, _> = raw_entries .iter() .filter_map(|id_ent| { - let db_e = match serde_json::from_str(id_ent.data.as_str()) - .map_err(|_| OperationError::SerdeJsonError) + let db_e = match serde_cbor::from_slice(id_ent.data.as_slice()) + .map_err(|_| OperationError::SerdeCborError) { Ok(v) => v, Err(e) => return Some(Err(e)), @@ -161,6 +162,65 @@ pub trait BackendTransaction { fn verify(&self) -> Vec> { Vec::new() } + + fn backup(&self, audit: &mut AuditScope, dst_path: &str) -> Result<(), OperationError> { + // load all entries into RAM, may need to change this later + // if the size of the database compared to RAM is an issue + let mut raw_entries: Vec = Vec::new(); + + { + let mut stmt = try_audit!( + audit, + self.get_conn().prepare("SELECT id, data FROM id2entry"), + "sqlite error {:?}", + OperationError::SQLiteError + ); + + let id2entry_iter = try_audit!( + audit, + stmt.query_map(NO_PARAMS, |row| IdEntry { + id: row.get(0), + data: row.get(1), + }), + "sqlite error {:?}", + OperationError::SQLiteError + ); + + for row in id2entry_iter { + raw_entries.push(row.map_err(|_| OperationError::SQLiteError)?); + } + } + + let entries: Result, _> = raw_entries + .iter() + .map(|id_ent| { + serde_cbor::from_slice(id_ent.data.as_slice()) + .map_err(|_| OperationError::SerdeJsonError) + }) + .collect(); + + let entries = entries?; + + let serialized_entries = serde_json::to_string_pretty(&entries); + + let serialized_entries_str = try_audit!( + audit, + serialized_entries, + "serde error {:?}", + OperationError::SerdeJsonError + ); + + let result = fs::write(dst_path, serialized_entries_str); + + try_audit!( + audit, + result, + "fs::write error {:?}", + OperationError::FsError + ); + + Ok(()) + } } impl Drop for BackendReadTransaction { @@ -278,7 +338,7 @@ impl BackendWriteTransaction { .map(|ser_db_e| { id_max = id_max + 1; let data = - serde_json::to_string(&ser_db_e).map_err(|_| OperationError::SerdeJsonError)?; + serde_cbor::to_vec(&ser_db_e).map_err(|_| OperationError::SerdeCborError)?; Ok(IdEntry { id: id_max, @@ -371,8 +431,7 @@ impl BackendWriteTransaction { } })?; - let data = - serde_json::to_string(&db_e).map_err(|_| OperationError::SerdeJsonError)?; + let data = serde_cbor::to_vec(&db_e).map_err(|_| OperationError::SerdeCborError)?; Ok(IdEntry { // TODO: Instead of getting these from the entry, we could lookup @@ -476,65 +535,6 @@ impl BackendWriteTransaction { }) } - pub fn backup(&self, audit: &mut AuditScope, dst_path: &str) -> Result<(), OperationError> { - // load all entries into RAM, may need to change this later - // if the size of the database compared to RAM is an issue - let mut raw_entries: Vec = Vec::new(); - - { - let mut stmt = try_audit!( - audit, - self.get_conn().prepare("SELECT id, data FROM id2entry"), - "sqlite error {:?}", - OperationError::SQLiteError - ); - - let id2entry_iter = try_audit!( - audit, - stmt.query_map(NO_PARAMS, |row| IdEntry { - id: row.get(0), - data: row.get(1), - }), - "sqlite error {:?}", - OperationError::SQLiteError - ); - - for row in id2entry_iter { - raw_entries.push(row.map_err(|_| OperationError::SQLiteError)?); - } - } - - let entries: Result, _> = raw_entries - .iter() - .map(|id_ent| { - serde_json::from_str(id_ent.data.as_str()) - .map_err(|_| OperationError::SerdeJsonError) - }) - .collect(); - - let entries = entries?; - - let serialized_entries = serde_json::to_string_pretty(&entries); - - let serialized_entries_str = try_audit!( - audit, - serialized_entries, - "serde error {:?}", - OperationError::SerdeJsonError - ); - - let result = fs::write(dst_path, serialized_entries_str); - - try_audit!( - audit, - result, - "fs::write error {:?}", - OperationError::FsError - ); - - Ok(()) - } - pub unsafe fn purge(&self, audit: &mut AuditScope) -> Result<(), OperationError> { // remove all entries from database try_audit!( @@ -650,7 +650,7 @@ impl BackendWriteTransaction { self.conn.execute( "CREATE TABLE IF NOT EXISTS id2entry ( id INTEGER PRIMARY KEY ASC, - data TEXT NOT NULL + data BLOB NOT NULL ) ", NO_PARAMS, diff --git a/src/lib/config.rs b/src/lib/config.rs index 1c68ff364..ced75273b 100644 --- a/src/lib/config.rs +++ b/src/lib/config.rs @@ -1,3 +1,5 @@ +use std::path::PathBuf; + #[derive(Serialize, Deserialize, Debug)] pub struct Configuration { pub address: String, @@ -23,4 +25,14 @@ impl Configuration { secure_cookies: false, } } + + pub fn update_db_path(&mut self, p: &PathBuf) { + match p.to_str() { + Some(p) => self.db_path = p.to_string(), + None => { + error!("Invalid DB path supplied"); + std::process::exit(1); + } + } + } } diff --git a/src/lib/core.rs b/src/lib/core.rs index ccbe8641e..63d760631 100644 --- a/src/lib/core.rs +++ b/src/lib/core.rs @@ -12,6 +12,8 @@ use crate::config::Configuration; // SearchResult use crate::async_log; +use crate::audit::AuditScope; +use crate::be::{Backend, BackendTransaction}; use crate::error::OperationError; use crate::interval::IntervalActor; use crate::proto::v1::actors::QueryServerV1; @@ -253,6 +255,62 @@ fn auth( ) } +fn setup_backend(config: &Configuration) -> Result { + let mut audit_be = AuditScope::new("backend_setup"); + let be = Backend::new(&mut audit_be, config.db_path.as_str()); + // debug! + debug!("{}", audit_be); + be +} + +pub fn backup_server_core(config: Configuration, dst_path: &str) { + let be = match setup_backend(&config) { + Ok(be) => be, + Err(e) => { + error!("Failed to setup BE: {:?}", e); + return; + } + }; + let mut audit = AuditScope::new("backend_backup"); + + let be_ro_txn = be.read(); + let r = be_ro_txn.backup(&mut audit, dst_path); + debug!("{}", audit); + match r { + Ok(_) => info!("Backup success!"), + Err(e) => { + error!("Backup failed: {:?}", e); + std::process::exit(1); + } + }; + // Let the txn abort, even on success. +} + +pub fn restore_server_core(config: Configuration, dst_path: &str) { + let be = match setup_backend(&config) { + Ok(be) => be, + Err(e) => { + error!("Failed to setup BE: {:?}", e); + return; + } + }; + let mut audit = AuditScope::new("backend_restore"); + + let be_wr_txn = be.write(); + let r = be_wr_txn + .restore(&mut audit, dst_path) + .and_then(|_| be_wr_txn.commit()); + debug!("{}", audit); + + match r { + Ok(_) => info!("Restore success!"), + Err(e) => { + error!("Restore failed: {:?}", e); + std::process::exit(1); + } + }; +} + pub fn create_server_core(config: Configuration) { // Until this point, we probably want to write to the log macro fns. @@ -264,18 +322,26 @@ pub fn create_server_core(config: Configuration) { // Similar, create a stats thread which aggregates statistics from the // server as they come in. + // Setup the be for the qs. + let be = match setup_backend(&config) { + Ok(be) => be, + Err(e) => { + error!("Failed to setup BE: {:?}", e); + return; + } + }; + // Start the query server with the given be path: future config - let server_addr = - match QueryServerV1::start(log_addr.clone(), config.db_path.as_str(), config.threads) { - Ok(addr) => addr, - Err(e) => { - println!( - "An unknown failure in startup has occured - exiting -> {:?}", - e - ); - return; - } - }; + let server_addr = match QueryServerV1::start(log_addr.clone(), be, config.threads) { + Ok(addr) => addr, + Err(e) => { + println!( + "An unknown failure in startup has occured - exiting -> {:?}", + e + ); + return; + } + }; // Setup timed events let _int_addr = IntervalActor::new(server_addr.clone()).start(); diff --git a/src/lib/error.rs b/src/lib/error.rs index 36b1a3032..1b5f34bea 100644 --- a/src/lib/error.rs +++ b/src/lib/error.rs @@ -34,6 +34,7 @@ pub enum OperationError { SQLiteError, //(RusqliteError) FsError, SerdeJsonError, + SerdeCborError, AccessDenied, NotAuthenticated, InvalidAuthState(&'static str), diff --git a/src/lib/idm/claim.rs b/src/lib/idm/claim.rs new file mode 100644 index 000000000..49d92983b --- /dev/null +++ b/src/lib/idm/claim.rs @@ -0,0 +1,18 @@ +use crate::proto::v1::Claim as ProtoClaim; + +#[derive(Debug)] +pub struct Claim { + // For now, empty. Later we'll flesh this out to uuid + name? +} + +impl Claim { + pub fn new() -> Self { + Claim { + // Fill this in! + } + } + + pub fn into_proto(&self) -> ProtoClaim { + unimplemented!(); + } +} diff --git a/src/lib/idm/group.rs b/src/lib/idm/group.rs new file mode 100644 index 000000000..26dfffd63 --- /dev/null +++ b/src/lib/idm/group.rs @@ -0,0 +1,17 @@ +use crate::proto::v1::Group as ProtoGroup; + +#[derive(Debug, Clone)] +pub struct Group { + // name +// uuid +} + +impl Group { + pub fn new() -> Self { + Group {} + } + + pub fn into_proto(&self) -> ProtoGroup { + unimplemented!(); + } +} diff --git a/src/lib/lib.rs b/src/lib/lib.rs index 31256ebf2..fd2c876d4 100644 --- a/src/lib/lib.rs +++ b/src/lib/lib.rs @@ -1,6 +1,7 @@ #[macro_use] extern crate log; extern crate serde; +extern crate serde_cbor; extern crate serde_json; #[macro_use] extern crate serde_derive; diff --git a/src/lib/proto/v1/actors.rs b/src/lib/proto/v1/actors.rs index d40aac785..cddbe647a 100644 --- a/src/lib/proto/v1/actors.rs +++ b/src/lib/proto/v1/actors.rs @@ -53,7 +53,7 @@ impl QueryServerV1 { // threads for write vs read pub fn start( log: actix::Addr, - path: &str, + be: Backend, threads: usize, ) -> Result, OperationError> { let mut audit = AuditScope::new("server_start"); @@ -68,14 +68,6 @@ impl QueryServerV1 { Err(e) => return Err(e), }; - // Create a new backend audit scope - let mut audit_be = AuditScope::new("backend_new"); - let be = match Backend::new(&mut audit_be, path) { - Ok(be) => be, - Err(e) => return Err(e), - }; - audit.append_scope(audit_be); - { let be_txn = be.write(); let mut schema_write = schema.write(); diff --git a/src/server/main.rs b/src/server/main.rs index 7cc9f8184..5730569f0 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -2,21 +2,100 @@ extern crate actix; extern crate env_logger; extern crate rsidm; +extern crate structopt; +#[macro_use] +extern crate log; use rsidm::config::Configuration; -use rsidm::core::create_server_core; +use rsidm::core::{backup_server_core, create_server_core, restore_server_core}; + +use std::path::PathBuf; +use structopt::StructOpt; + +#[derive(Debug, StructOpt)] +struct ServerOpt { + #[structopt(short = "d", long = "debug")] + debug: bool, + #[structopt(parse(from_os_str), short = "D", long = "db_path")] + db_path: PathBuf, +} + +#[derive(Debug, StructOpt)] +struct BackupOpt { + #[structopt(parse(from_os_str))] + path: PathBuf, + #[structopt(flatten)] + serveropts: ServerOpt, +} + +#[derive(Debug, StructOpt)] +struct RestoreOpt { + #[structopt(parse(from_os_str))] + path: PathBuf, + #[structopt(flatten)] + serveropts: ServerOpt, +} + +#[derive(Debug, StructOpt)] +enum Opt { + #[structopt(name = "server")] + Server(ServerOpt), + #[structopt(name = "backup")] + Backup(BackupOpt), + #[structopt(name = "restore")] + Restore(RestoreOpt), +} fn main() { + // Read cli args, determine if we should backup/restore + let opt = Opt::from_args(); + // Read our config (if any) - let config = Configuration::new(); + let mut config = Configuration::new(); + // Apply any cli overrides? // Configure the server logger. This could be adjusted based on what config // says. ::std::env::set_var("RUST_LOG", "actix_web=info,rsidm=info"); env_logger::init(); - let sys = actix::System::new("rsidm-server"); + match opt { + Opt::Server(sopt) => { + info!("Running in server mode ..."); - create_server_core(config); - let _ = sys.run(); + config.update_db_path(&sopt.db_path); + + let sys = actix::System::new("rsidm-server"); + create_server_core(config); + let _ = sys.run(); + } + Opt::Backup(bopt) => { + info!("Running in backup mode ..."); + + config.update_db_path(&bopt.serveropts.db_path); + + let p = match bopt.path.to_str() { + Some(p) => p, + None => { + error!("Invalid backup path"); + std::process::exit(1); + } + }; + backup_server_core(config, p); + } + Opt::Restore(ropt) => { + info!("Running in restore mode ..."); + + config.update_db_path(&ropt.serveropts.db_path); + + let p = match ropt.path.to_str() { + Some(p) => p, + None => { + error!("Invalid restore path"); + std::process::exit(1); + } + }; + restore_server_core(config, p); + } + } }