diff --git a/Cargo.toml b/Cargo.toml index 00113e601..e38a3177f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,12 +16,12 @@ name = "rsidm" path = "src/lib/lib.rs" [[bin]] -name = "rsidm_core" +name = "rsidmd" path = "src/server/main.rs" [[bin]] -name = "rsidm_whoami" -path = "src/clients/whoami.rs" +name = "kanidm" +path = "src/clients/main.rs" [dependencies] @@ -43,6 +43,7 @@ tokio = "0.1" futures = "0.1" uuid = { version = "0.7", features = ["serde", "v4"] } serde = "1.0" +serde_cbor = "0.10" serde_json = "1.0" serde_derive = "1.0" @@ -50,6 +51,8 @@ rusqlite = { version = "0.15", features = ["backup"] } r2d2 = "0.8" r2d2_sqlite = "0.7" +structopt = { version = "0.2", default-features = false } + concread = "0.1" diff --git a/src/clients/main.rs b/src/clients/main.rs new file mode 100644 index 000000000..44401251e --- /dev/null +++ b/src/clients/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello kanidm"); +} diff --git a/src/lib/async_log.rs b/src/lib/async_log.rs new file mode 100644 index 000000000..35a9a2a60 --- /dev/null +++ b/src/lib/async_log.rs @@ -0,0 +1,81 @@ +use actix::prelude::*; + +use crate::audit::AuditScope; + +// Helper for internal logging. +// Should only be used at startup/shutdown +#[macro_export] +macro_rules! log_event { + ($log_addr:expr, $($arg:tt)*) => ({ + use crate::async_log::LogEvent; + use std::fmt; + $log_addr.do_send( + LogEvent { + msg: fmt::format( + format_args!($($arg)*) + ) + } + ) + }) +} + +// We need to pass in config for this later +// Or we need to pass in the settings for it IE level and dest? +// Is there an efficent way to set a log level filter in the macros +// so that we don't msg unless it's the correct level? +// Do we need config in the log macro? + +pub fn start() -> actix::Addr { + SyncArbiter::start(1, move || EventLog {}) +} + +pub struct EventLog {} + +impl Actor for EventLog { + type Context = SyncContext; + + /* + fn started(&mut self, ctx: &mut Self::Context) { + ctx.set_mailbox_capacity(1 << 31); + } + */ +} + +// What messages can we be sent. Basically this is all the possible +// inputs we *could* recieve. + +// Add a macro for easy msg write + +pub struct LogEvent { + pub msg: String, +} + +impl Message for LogEvent { + type Result = (); +} + +impl Handler for EventLog { + type Result = (); + + fn handle(&mut self, event: LogEvent, _: &mut SyncContext) -> Self::Result { + info!("logevent: {}", event.msg); + } +} + +impl Handler for EventLog { + type Result = (); + + fn handle(&mut self, event: AuditScope, _: &mut SyncContext) -> Self::Result { + info!("audit: {}", event); + } +} + +/* +impl Handler for EventLog { + type Result = (); + + fn handle(&mut self, event: Event, _: &mut SyncContext) -> Self::Result { + println!("EVENT: {:?}", event) + } +} +*/ diff --git a/src/lib/be/mod.rs b/src/lib/be/mod.rs index 2ea9c836e..3ed5f21aa 100644 --- a/src/lib/be/mod.rs +++ b/src/lib/be/mod.rs @@ -4,6 +4,7 @@ use r2d2::Pool; use r2d2_sqlite::SqliteConnectionManager; use rusqlite::types::ToSql; use rusqlite::NO_PARAMS; +use serde_cbor; use serde_json; use std::convert::TryFrom; use std::fs; @@ -24,7 +25,7 @@ mod sqlite_be; struct IdEntry { // TODO: for now this is i64 to make sqlite work, but entry is u64 for indexing reasons! id: i64, - data: String, + data: Vec, } pub struct Backend { @@ -100,8 +101,8 @@ pub trait BackendTransaction { let entries: Result>, _> = raw_entries .iter() .filter_map(|id_ent| { - let db_e = match serde_json::from_str(id_ent.data.as_str()) - .map_err(|_| OperationError::SerdeJsonError) + let db_e = match serde_cbor::from_slice(id_ent.data.as_slice()) + .map_err(|_| OperationError::SerdeCborError) { Ok(v) => v, Err(e) => return Some(Err(e)), @@ -161,6 +162,65 @@ pub trait BackendTransaction { fn verify(&self) -> Vec> { Vec::new() } + + fn backup(&self, audit: &mut AuditScope, dst_path: &str) -> Result<(), OperationError> { + // load all entries into RAM, may need to change this later + // if the size of the database compared to RAM is an issue + let mut raw_entries: Vec = Vec::new(); + + { + let mut stmt = try_audit!( + audit, + self.get_conn().prepare("SELECT id, data FROM id2entry"), + "sqlite error {:?}", + OperationError::SQLiteError + ); + + let id2entry_iter = try_audit!( + audit, + stmt.query_map(NO_PARAMS, |row| IdEntry { + id: row.get(0), + data: row.get(1), + }), + "sqlite error {:?}", + OperationError::SQLiteError + ); + + for row in id2entry_iter { + raw_entries.push(row.map_err(|_| OperationError::SQLiteError)?); + } + } + + let entries: Result, _> = raw_entries + .iter() + .map(|id_ent| { + serde_cbor::from_slice(id_ent.data.as_slice()) + .map_err(|_| OperationError::SerdeJsonError) + }) + .collect(); + + let entries = entries?; + + let serialized_entries = serde_json::to_string_pretty(&entries); + + let serialized_entries_str = try_audit!( + audit, + serialized_entries, + "serde error {:?}", + OperationError::SerdeJsonError + ); + + let result = fs::write(dst_path, serialized_entries_str); + + try_audit!( + audit, + result, + "fs::write error {:?}", + OperationError::FsError + ); + + Ok(()) + } } impl Drop for BackendReadTransaction { @@ -278,7 +338,7 @@ impl BackendWriteTransaction { .map(|ser_db_e| { id_max = id_max + 1; let data = - serde_json::to_string(&ser_db_e).map_err(|_| OperationError::SerdeJsonError)?; + serde_cbor::to_vec(&ser_db_e).map_err(|_| OperationError::SerdeCborError)?; Ok(IdEntry { id: id_max, @@ -371,8 +431,7 @@ impl BackendWriteTransaction { } })?; - let data = - serde_json::to_string(&db_e).map_err(|_| OperationError::SerdeJsonError)?; + let data = serde_cbor::to_vec(&db_e).map_err(|_| OperationError::SerdeCborError)?; Ok(IdEntry { // TODO: Instead of getting these from the entry, we could lookup @@ -476,65 +535,6 @@ impl BackendWriteTransaction { }) } - pub fn backup(&self, audit: &mut AuditScope, dst_path: &str) -> Result<(), OperationError> { - // load all entries into RAM, may need to change this later - // if the size of the database compared to RAM is an issue - let mut raw_entries: Vec = Vec::new(); - - { - let mut stmt = try_audit!( - audit, - self.get_conn().prepare("SELECT id, data FROM id2entry"), - "sqlite error {:?}", - OperationError::SQLiteError - ); - - let id2entry_iter = try_audit!( - audit, - stmt.query_map(NO_PARAMS, |row| IdEntry { - id: row.get(0), - data: row.get(1), - }), - "sqlite error {:?}", - OperationError::SQLiteError - ); - - for row in id2entry_iter { - raw_entries.push(row.map_err(|_| OperationError::SQLiteError)?); - } - } - - let entries: Result, _> = raw_entries - .iter() - .map(|id_ent| { - serde_json::from_str(id_ent.data.as_str()) - .map_err(|_| OperationError::SerdeJsonError) - }) - .collect(); - - let entries = entries?; - - let serialized_entries = serde_json::to_string_pretty(&entries); - - let serialized_entries_str = try_audit!( - audit, - serialized_entries, - "serde error {:?}", - OperationError::SerdeJsonError - ); - - let result = fs::write(dst_path, serialized_entries_str); - - try_audit!( - audit, - result, - "fs::write error {:?}", - OperationError::FsError - ); - - Ok(()) - } - pub unsafe fn purge(&self, audit: &mut AuditScope) -> Result<(), OperationError> { // remove all entries from database try_audit!( @@ -650,7 +650,7 @@ impl BackendWriteTransaction { self.conn.execute( "CREATE TABLE IF NOT EXISTS id2entry ( id INTEGER PRIMARY KEY ASC, - data TEXT NOT NULL + data BLOB NOT NULL ) ", NO_PARAMS, diff --git a/src/lib/config.rs b/src/lib/config.rs index 1c68ff364..ced75273b 100644 --- a/src/lib/config.rs +++ b/src/lib/config.rs @@ -1,3 +1,5 @@ +use std::path::PathBuf; + #[derive(Serialize, Deserialize, Debug)] pub struct Configuration { pub address: String, @@ -23,4 +25,14 @@ impl Configuration { secure_cookies: false, } } + + pub fn update_db_path(&mut self, p: &PathBuf) { + match p.to_str() { + Some(p) => self.db_path = p.to_string(), + None => { + error!("Invalid DB path supplied"); + std::process::exit(1); + } + } + } } diff --git a/src/lib/core.rs b/src/lib/core.rs index ccbe8641e..63d760631 100644 --- a/src/lib/core.rs +++ b/src/lib/core.rs @@ -12,6 +12,8 @@ use crate::config::Configuration; // SearchResult use crate::async_log; +use crate::audit::AuditScope; +use crate::be::{Backend, BackendTransaction}; use crate::error::OperationError; use crate::interval::IntervalActor; use crate::proto::v1::actors::QueryServerV1; @@ -253,6 +255,62 @@ fn auth( ) } +fn setup_backend(config: &Configuration) -> Result { + let mut audit_be = AuditScope::new("backend_setup"); + let be = Backend::new(&mut audit_be, config.db_path.as_str()); + // debug! + debug!("{}", audit_be); + be +} + +pub fn backup_server_core(config: Configuration, dst_path: &str) { + let be = match setup_backend(&config) { + Ok(be) => be, + Err(e) => { + error!("Failed to setup BE: {:?}", e); + return; + } + }; + let mut audit = AuditScope::new("backend_backup"); + + let be_ro_txn = be.read(); + let r = be_ro_txn.backup(&mut audit, dst_path); + debug!("{}", audit); + match r { + Ok(_) => info!("Backup success!"), + Err(e) => { + error!("Backup failed: {:?}", e); + std::process::exit(1); + } + }; + // Let the txn abort, even on success. +} + +pub fn restore_server_core(config: Configuration, dst_path: &str) { + let be = match setup_backend(&config) { + Ok(be) => be, + Err(e) => { + error!("Failed to setup BE: {:?}", e); + return; + } + }; + let mut audit = AuditScope::new("backend_restore"); + + let be_wr_txn = be.write(); + let r = be_wr_txn + .restore(&mut audit, dst_path) + .and_then(|_| be_wr_txn.commit()); + debug!("{}", audit); + + match r { + Ok(_) => info!("Restore success!"), + Err(e) => { + error!("Restore failed: {:?}", e); + std::process::exit(1); + } + }; +} + pub fn create_server_core(config: Configuration) { // Until this point, we probably want to write to the log macro fns. @@ -264,18 +322,26 @@ pub fn create_server_core(config: Configuration) { // Similar, create a stats thread which aggregates statistics from the // server as they come in. + // Setup the be for the qs. + let be = match setup_backend(&config) { + Ok(be) => be, + Err(e) => { + error!("Failed to setup BE: {:?}", e); + return; + } + }; + // Start the query server with the given be path: future config - let server_addr = - match QueryServerV1::start(log_addr.clone(), config.db_path.as_str(), config.threads) { - Ok(addr) => addr, - Err(e) => { - println!( - "An unknown failure in startup has occured - exiting -> {:?}", - e - ); - return; - } - }; + let server_addr = match QueryServerV1::start(log_addr.clone(), be, config.threads) { + Ok(addr) => addr, + Err(e) => { + println!( + "An unknown failure in startup has occured - exiting -> {:?}", + e + ); + return; + } + }; // Setup timed events let _int_addr = IntervalActor::new(server_addr.clone()).start(); diff --git a/src/lib/error.rs b/src/lib/error.rs index 36b1a3032..1b5f34bea 100644 --- a/src/lib/error.rs +++ b/src/lib/error.rs @@ -34,6 +34,7 @@ pub enum OperationError { SQLiteError, //(RusqliteError) FsError, SerdeJsonError, + SerdeCborError, AccessDenied, NotAuthenticated, InvalidAuthState(&'static str), diff --git a/src/lib/idm/claim.rs b/src/lib/idm/claim.rs new file mode 100644 index 000000000..49d92983b --- /dev/null +++ b/src/lib/idm/claim.rs @@ -0,0 +1,18 @@ +use crate::proto::v1::Claim as ProtoClaim; + +#[derive(Debug)] +pub struct Claim { + // For now, empty. Later we'll flesh this out to uuid + name? +} + +impl Claim { + pub fn new() -> Self { + Claim { + // Fill this in! + } + } + + pub fn into_proto(&self) -> ProtoClaim { + unimplemented!(); + } +} diff --git a/src/lib/idm/group.rs b/src/lib/idm/group.rs new file mode 100644 index 000000000..26dfffd63 --- /dev/null +++ b/src/lib/idm/group.rs @@ -0,0 +1,17 @@ +use crate::proto::v1::Group as ProtoGroup; + +#[derive(Debug, Clone)] +pub struct Group { + // name +// uuid +} + +impl Group { + pub fn new() -> Self { + Group {} + } + + pub fn into_proto(&self) -> ProtoGroup { + unimplemented!(); + } +} diff --git a/src/lib/lib.rs b/src/lib/lib.rs index 31256ebf2..fd2c876d4 100644 --- a/src/lib/lib.rs +++ b/src/lib/lib.rs @@ -1,6 +1,7 @@ #[macro_use] extern crate log; extern crate serde; +extern crate serde_cbor; extern crate serde_json; #[macro_use] extern crate serde_derive; diff --git a/src/lib/proto/v1/actors.rs b/src/lib/proto/v1/actors.rs index d40aac785..cddbe647a 100644 --- a/src/lib/proto/v1/actors.rs +++ b/src/lib/proto/v1/actors.rs @@ -53,7 +53,7 @@ impl QueryServerV1 { // threads for write vs read pub fn start( log: actix::Addr, - path: &str, + be: Backend, threads: usize, ) -> Result, OperationError> { let mut audit = AuditScope::new("server_start"); @@ -68,14 +68,6 @@ impl QueryServerV1 { Err(e) => return Err(e), }; - // Create a new backend audit scope - let mut audit_be = AuditScope::new("backend_new"); - let be = match Backend::new(&mut audit_be, path) { - Ok(be) => be, - Err(e) => return Err(e), - }; - audit.append_scope(audit_be); - { let be_txn = be.write(); let mut schema_write = schema.write(); diff --git a/src/server/main.rs b/src/server/main.rs index 7cc9f8184..5730569f0 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -2,21 +2,100 @@ extern crate actix; extern crate env_logger; extern crate rsidm; +extern crate structopt; +#[macro_use] +extern crate log; use rsidm::config::Configuration; -use rsidm::core::create_server_core; +use rsidm::core::{backup_server_core, create_server_core, restore_server_core}; + +use std::path::PathBuf; +use structopt::StructOpt; + +#[derive(Debug, StructOpt)] +struct ServerOpt { + #[structopt(short = "d", long = "debug")] + debug: bool, + #[structopt(parse(from_os_str), short = "D", long = "db_path")] + db_path: PathBuf, +} + +#[derive(Debug, StructOpt)] +struct BackupOpt { + #[structopt(parse(from_os_str))] + path: PathBuf, + #[structopt(flatten)] + serveropts: ServerOpt, +} + +#[derive(Debug, StructOpt)] +struct RestoreOpt { + #[structopt(parse(from_os_str))] + path: PathBuf, + #[structopt(flatten)] + serveropts: ServerOpt, +} + +#[derive(Debug, StructOpt)] +enum Opt { + #[structopt(name = "server")] + Server(ServerOpt), + #[structopt(name = "backup")] + Backup(BackupOpt), + #[structopt(name = "restore")] + Restore(RestoreOpt), +} fn main() { + // Read cli args, determine if we should backup/restore + let opt = Opt::from_args(); + // Read our config (if any) - let config = Configuration::new(); + let mut config = Configuration::new(); + // Apply any cli overrides? // Configure the server logger. This could be adjusted based on what config // says. ::std::env::set_var("RUST_LOG", "actix_web=info,rsidm=info"); env_logger::init(); - let sys = actix::System::new("rsidm-server"); + match opt { + Opt::Server(sopt) => { + info!("Running in server mode ..."); - create_server_core(config); - let _ = sys.run(); + config.update_db_path(&sopt.db_path); + + let sys = actix::System::new("rsidm-server"); + create_server_core(config); + let _ = sys.run(); + } + Opt::Backup(bopt) => { + info!("Running in backup mode ..."); + + config.update_db_path(&bopt.serveropts.db_path); + + let p = match bopt.path.to_str() { + Some(p) => p, + None => { + error!("Invalid backup path"); + std::process::exit(1); + } + }; + backup_server_core(config, p); + } + Opt::Restore(ropt) => { + info!("Running in restore mode ..."); + + config.update_db_path(&ropt.serveropts.db_path); + + let p = match ropt.path.to_str() { + Some(p) => p, + None => { + error!("Invalid restore path"); + std::process::exit(1); + } + }; + restore_server_core(config, p); + } + } }