Implement backup, restore and server modes

This allows backup and restore of the server backend data from the command line. Backups can be taken while the server is running. Automated backups are *not* part of this yet. 

This also adds a few files that were missed in a previous commit. Oops!
Firstyear 2019-07-15 09:15:25 +10:00, committed by GitHub
parent 94a6bde269
commit c5497b8024
12 changed files with 367 additions and 94 deletions

Cargo.toml

@@ -16,12 +16,12 @@ name = "rsidm"
path = "src/lib/lib.rs"
[[bin]]
name = "rsidm_core"
name = "rsidmd"
path = "src/server/main.rs"
[[bin]]
name = "rsidm_whoami"
path = "src/clients/whoami.rs"
name = "kanidm"
path = "src/clients/main.rs"
[dependencies]
@@ -43,6 +43,7 @@ tokio = "0.1"
futures = "0.1"
uuid = { version = "0.7", features = ["serde", "v4"] }
serde = "1.0"
serde_cbor = "0.10"
serde_json = "1.0"
serde_derive = "1.0"
@@ -50,6 +51,8 @@ rusqlite = { version = "0.15", features = ["backup"] }
r2d2 = "0.8"
r2d2_sqlite = "0.7"
structopt = { version = "0.2", default-features = false }
concread = "0.1"

src/clients/main.rs (new file)

@@ -0,0 +1,3 @@
fn main() {
println!("Hello kanidm");
}

src/lib/async_log.rs (new file)

@@ -0,0 +1,81 @@
use actix::prelude::*;
use crate::audit::AuditScope;
// Helper for internal logging.
// Should only be used at startup/shutdown
#[macro_export]
macro_rules! log_event {
($log_addr:expr, $($arg:tt)*) => ({
use crate::async_log::LogEvent;
use std::fmt;
$log_addr.do_send(
LogEvent {
msg: fmt::format(
format_args!($($arg)*)
)
}
)
})
}
// We need to pass in config for this later
// Or we need to pass in the settings for it, i.e. level and dest?
// Is there an efficient way to set a log level filter in the macros
// so that we don't msg unless it's the correct level?
// Do we need config in the log macro?
pub fn start() -> actix::Addr<EventLog> {
SyncArbiter::start(1, move || EventLog {})
}
pub struct EventLog {}
impl Actor for EventLog {
type Context = SyncContext<Self>;
/*
fn started(&mut self, ctx: &mut Self::Context) {
ctx.set_mailbox_capacity(1 << 31);
}
*/
}
// What messages can we be sent? Basically this is all the possible
// inputs we *could* receive.
// Add a macro for easy msg write
pub struct LogEvent {
pub msg: String,
}
impl Message for LogEvent {
type Result = ();
}
impl Handler<LogEvent> for EventLog {
type Result = ();
fn handle(&mut self, event: LogEvent, _: &mut SyncContext<Self>) -> Self::Result {
info!("logevent: {}", event.msg);
}
}
impl Handler<AuditScope> for EventLog {
type Result = ();
fn handle(&mut self, event: AuditScope, _: &mut SyncContext<Self>) -> Self::Result {
info!("audit: {}", event);
}
}
/*
impl Handler<Event> for EventLog {
type Result = ();
fn handle(&mut self, event: Event, _: &mut SyncContext<Self>) -> Self::Result {
println!("EVENT: {:?}", event)
}
}
*/

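The comment in async_log.rs above asks whether a log level filter can be applied efficiently inside the macro. A minimal sketch of one approach, assuming a hypothetical LOG_LEVEL atomic and log_event_at! macro (neither exists in this commit): the check runs at the call site before the message is formatted, so suppressed events cost only one atomic load.

use std::sync::atomic::AtomicUsize;

// Hypothetical threshold: 0 = debug, 1 = info, 2 = warning.
pub static LOG_LEVEL: AtomicUsize = AtomicUsize::new(1);

#[macro_export]
macro_rules! log_event_at {
    ($level:expr, $log_addr:expr, $($arg:tt)*) => ({
        use crate::async_log::{LogEvent, LOG_LEVEL};
        use std::fmt;
        use std::sync::atomic::Ordering;
        // Test the level before formatting, so suppressed events never
        // pay for fmt::format or the actor message send.
        if $level >= LOG_LEVEL.load(Ordering::Relaxed) {
            $log_addr.do_send(LogEvent {
                msg: fmt::format(format_args!($($arg)*)),
            })
        }
    })
}
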
src/lib/be/mod.rs

@@ -4,6 +4,7 @@ use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::types::ToSql;
use rusqlite::NO_PARAMS;
use serde_cbor;
use serde_json;
use std::convert::TryFrom;
use std::fs;
@@ -24,7 +25,7 @@ mod sqlite_be;
struct IdEntry {
// TODO: for now this is i64 to make sqlite work, but entry is u64 for indexing reasons!
id: i64,
-data: String,
+data: Vec<u8>,
}
pub struct Backend {
@@ -100,8 +101,8 @@ pub trait BackendTransaction {
let entries: Result<Vec<Entry<EntryValid, EntryCommitted>>, _> = raw_entries
.iter()
.filter_map(|id_ent| {
-let db_e = match serde_json::from_str(id_ent.data.as_str())
-.map_err(|_| OperationError::SerdeJsonError)
+let db_e = match serde_cbor::from_slice(id_ent.data.as_slice())
+.map_err(|_| OperationError::SerdeCborError)
{
Ok(v) => v,
Err(e) => return Some(Err(e)),
@@ -161,6 +162,65 @@ pub trait BackendTransaction {
fn verify(&self) -> Vec<Result<(), ConsistencyError>> {
Vec::new()
}
fn backup(&self, audit: &mut AuditScope, dst_path: &str) -> Result<(), OperationError> {
// load all entries into RAM, may need to change this later
// if the size of the database compared to RAM is an issue
let mut raw_entries: Vec<IdEntry> = Vec::new();
{
let mut stmt = try_audit!(
audit,
self.get_conn().prepare("SELECT id, data FROM id2entry"),
"sqlite error {:?}",
OperationError::SQLiteError
);
let id2entry_iter = try_audit!(
audit,
stmt.query_map(NO_PARAMS, |row| IdEntry {
id: row.get(0),
data: row.get(1),
}),
"sqlite error {:?}",
OperationError::SQLiteError
);
for row in id2entry_iter {
raw_entries.push(row.map_err(|_| OperationError::SQLiteError)?);
}
}
let entries: Result<Vec<DbEntry>, _> = raw_entries
.iter()
.map(|id_ent| {
serde_cbor::from_slice(id_ent.data.as_slice())
.map_err(|_| OperationError::SerdeCborError)
})
.collect();
let entries = entries?;
let serialized_entries = serde_json::to_string_pretty(&entries);
let serialized_entries_str = try_audit!(
audit,
serialized_entries,
"serde error {:?}",
OperationError::SerdeJsonError
);
let result = fs::write(dst_path, serialized_entries_str);
try_audit!(
audit,
result,
"fs::write error {:?}",
OperationError::FsError
);
Ok(())
}
}
impl Drop for BackendReadTransaction {
@@ -278,7 +338,7 @@ impl BackendWriteTransaction {
.map(|ser_db_e| {
id_max = id_max + 1;
let data =
-serde_json::to_string(&ser_db_e).map_err(|_| OperationError::SerdeJsonError)?;
+serde_cbor::to_vec(&ser_db_e).map_err(|_| OperationError::SerdeCborError)?;
Ok(IdEntry {
id: id_max,
@@ -371,8 +431,7 @@ impl BackendWriteTransaction {
}
})?;
-let data =
-serde_json::to_string(&db_e).map_err(|_| OperationError::SerdeJsonError)?;
+let data = serde_cbor::to_vec(&db_e).map_err(|_| OperationError::SerdeCborError)?;
Ok(IdEntry {
// TODO: Instead of getting these from the entry, we could lookup
@@ -476,65 +535,6 @@ impl BackendWriteTransaction {
})
}
pub fn backup(&self, audit: &mut AuditScope, dst_path: &str) -> Result<(), OperationError> {
// load all entries into RAM, may need to change this later
// if the size of the database compared to RAM is an issue
let mut raw_entries: Vec<IdEntry> = Vec::new();
{
let mut stmt = try_audit!(
audit,
self.get_conn().prepare("SELECT id, data FROM id2entry"),
"sqlite error {:?}",
OperationError::SQLiteError
);
let id2entry_iter = try_audit!(
audit,
stmt.query_map(NO_PARAMS, |row| IdEntry {
id: row.get(0),
data: row.get(1),
}),
"sqlite error {:?}",
OperationError::SQLiteError
);
for row in id2entry_iter {
raw_entries.push(row.map_err(|_| OperationError::SQLiteError)?);
}
}
let entries: Result<Vec<DbEntry>, _> = raw_entries
.iter()
.map(|id_ent| {
serde_json::from_str(id_ent.data.as_str())
.map_err(|_| OperationError::SerdeJsonError)
})
.collect();
let entries = entries?;
let serialized_entries = serde_json::to_string_pretty(&entries);
let serialized_entries_str = try_audit!(
audit,
serialized_entries,
"serde error {:?}",
OperationError::SerdeJsonError
);
let result = fs::write(dst_path, serialized_entries_str);
try_audit!(
audit,
result,
"fs::write error {:?}",
OperationError::FsError
);
Ok(())
}
pub unsafe fn purge(&self, audit: &mut AuditScope) -> Result<(), OperationError> {
// remove all entries from database
try_audit!(
@@ -650,7 +650,7 @@ impl BackendWriteTransaction {
self.conn.execute(
"CREATE TABLE IF NOT EXISTS id2entry (
id INTEGER PRIMARY KEY ASC,
-data TEXT NOT NULL
+data BLOB NOT NULL
)
",
NO_PARAMS,

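The switch from serde_json strings to serde_cbor byte vectors is what drives the TEXT to BLOB column change above. A self-contained round-trip sketch using the same serde_cbor 0.10 calls as the backend code (to_vec and from_slice); the DemoEntry type is illustrative only, not the real DbEntry:

extern crate serde_cbor;
#[macro_use]
extern crate serde_derive;

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct DemoEntry {
    id: i64,
    data: String,
}

fn main() {
    let e = DemoEntry { id: 1, data: "hello".to_string() };
    // CBOR encodes to compact binary bytes, hence a BLOB column rather than TEXT.
    let bytes: Vec<u8> = serde_cbor::to_vec(&e).expect("serialise");
    let back: DemoEntry = serde_cbor::from_slice(bytes.as_slice()).expect("deserialise");
    assert_eq!(e, back);
}
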
src/lib/config.rs

@@ -1,3 +1,5 @@
use std::path::PathBuf;
#[derive(Serialize, Deserialize, Debug)]
pub struct Configuration {
pub address: String,
@@ -23,4 +25,14 @@ impl Configuration {
secure_cookies: false,
}
}
pub fn update_db_path(&mut self, p: &PathBuf) {
match p.to_str() {
Some(p) => self.db_path = p.to_string(),
None => {
error!("Invalid DB path supplied");
std::process::exit(1);
}
}
}
}

src/lib/core.rs

@@ -12,6 +12,8 @@ use crate::config::Configuration;
// SearchResult
use crate::async_log;
use crate::audit::AuditScope;
use crate::be::{Backend, BackendTransaction};
use crate::error::OperationError;
use crate::interval::IntervalActor;
use crate::proto::v1::actors::QueryServerV1;
@@ -253,6 +255,62 @@ fn auth(
)
}
fn setup_backend(config: &Configuration) -> Result<Backend, OperationError> {
let mut audit_be = AuditScope::new("backend_setup");
let be = Backend::new(&mut audit_be, config.db_path.as_str());
// Dump the backend setup audit scope to the debug log.
debug!("{}", audit_be);
be
}
pub fn backup_server_core(config: Configuration, dst_path: &str) {
let be = match setup_backend(&config) {
Ok(be) => be,
Err(e) => {
error!("Failed to setup BE: {:?}", e);
return;
}
};
let mut audit = AuditScope::new("backend_backup");
let be_ro_txn = be.read();
let r = be_ro_txn.backup(&mut audit, dst_path);
debug!("{}", audit);
match r {
Ok(_) => info!("Backup success!"),
Err(e) => {
error!("Backup failed: {:?}", e);
std::process::exit(1);
}
};
// Let the txn abort, even on success.
}
pub fn restore_server_core(config: Configuration, dst_path: &str) {
let be = match setup_backend(&config) {
Ok(be) => be,
Err(e) => {
error!("Failed to setup BE: {:?}", e);
return;
}
};
let mut audit = AuditScope::new("backend_restore");
let be_wr_txn = be.write();
let r = be_wr_txn
.restore(&mut audit, dst_path)
.and_then(|_| be_wr_txn.commit());
debug!("{}", audit);
match r {
Ok(_) => info!("Restore success!"),
Err(e) => {
error!("Restore failed: {:?}", e);
std::process::exit(1);
}
};
}
pub fn create_server_core(config: Configuration) {
// Until this point, we probably want to write to the log macro fns.
@@ -264,18 +322,26 @@ pub fn create_server_core(config: Configuration) {
// Similar, create a stats thread which aggregates statistics from the
// server as they come in.
// Setup the be for the qs.
let be = match setup_backend(&config) {
Ok(be) => be,
Err(e) => {
error!("Failed to setup BE: {:?}", e);
return;
}
};
// Start the query server with the given be path: future config
-let server_addr =
-match QueryServerV1::start(log_addr.clone(), config.db_path.as_str(), config.threads) {
-Ok(addr) => addr,
-Err(e) => {
-println!(
-"An unknown failure in startup has occured - exiting -> {:?}",
-e
-);
-return;
-}
-};
+let server_addr = match QueryServerV1::start(log_addr.clone(), be, config.threads) {
+Ok(addr) => addr,
+Err(e) => {
+println!(
+"An unknown failure in startup has occurred - exiting -> {:?}",
+e
+);
+return;
+}
+};
// Setup timed events
let _int_addr = IntervalActor::new(server_addr.clone()).start();

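restore_server_core above relies on be_wr_txn.restore(), whose body sits in a part of src/lib/be/mod.rs that these hunks do not show. A plausible sketch of the inverse of backup(), assuming it reads the JSON dump, re-encodes each entry as CBOR, and inserts into id2entry; the real implementation may differ (for example by purging existing entries first):

pub fn restore(&self, _audit: &mut AuditScope, src_path: &str) -> Result<(), OperationError> {
    // Read the pretty-printed JSON dump written by backup().
    let serialized_string =
        fs::read_to_string(src_path).map_err(|_| OperationError::FsError)?;
    let entries: Vec<DbEntry> = serde_json::from_str(serialized_string.as_str())
        .map_err(|_| OperationError::SerdeJsonError)?;
    // Re-encode each entry as CBOR and insert it with a fresh id.
    let mut id: i64 = 0;
    for db_e in entries.iter() {
        id += 1;
        let data = serde_cbor::to_vec(db_e).map_err(|_| OperationError::SerdeCborError)?;
        self.get_conn()
            .execute(
                "INSERT INTO id2entry (id, data) VALUES (?1, ?2)",
                &[&id as &dyn ToSql, &data],
            )
            .map_err(|_| OperationError::SQLiteError)?;
    }
    Ok(())
}
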
src/lib/error.rs

@@ -34,6 +34,7 @@ pub enum OperationError {
SQLiteError, //(RusqliteError)
FsError,
SerdeJsonError,
SerdeCborError,
AccessDenied,
NotAuthenticated,
InvalidAuthState(&'static str),

src/lib/idm/claim.rs (new file)

@@ -0,0 +1,18 @@
use crate::proto::v1::Claim as ProtoClaim;
#[derive(Debug)]
pub struct Claim {
// For now, empty. Later we'll flesh this out to uuid + name?
}
impl Claim {
pub fn new() -> Self {
Claim {
// Fill this in!
}
}
pub fn into_proto(&self) -> ProtoClaim {
unimplemented!();
}
}

src/lib/idm/group.rs (new file)

@@ -0,0 +1,17 @@
use crate::proto::v1::Group as ProtoGroup;
#[derive(Debug, Clone)]
pub struct Group {
// name
// uuid
}
impl Group {
pub fn new() -> Self {
Group {}
}
pub fn into_proto(&self) -> ProtoGroup {
unimplemented!();
}
}

src/lib/lib.rs

@@ -1,6 +1,7 @@
#[macro_use]
extern crate log;
extern crate serde;
extern crate serde_cbor;
extern crate serde_json;
#[macro_use]
extern crate serde_derive;

src/lib/proto/v1/actors.rs

@@ -53,7 +53,7 @@ impl QueryServerV1 {
// threads for write vs read
pub fn start(
log: actix::Addr<EventLog>,
-path: &str,
+be: Backend,
threads: usize,
) -> Result<actix::Addr<QueryServerV1>, OperationError> {
let mut audit = AuditScope::new("server_start");
@@ -68,14 +68,6 @@ impl QueryServerV1 {
Err(e) => return Err(e),
};
// Create a new backend audit scope
let mut audit_be = AuditScope::new("backend_new");
let be = match Backend::new(&mut audit_be, path) {
Ok(be) => be,
Err(e) => return Err(e),
};
audit.append_scope(audit_be);
{
let be_txn = be.write();
let mut schema_write = schema.write();

src/server/main.rs

@@ -2,21 +2,100 @@ extern crate actix;
extern crate env_logger;
extern crate rsidm;
extern crate structopt;
#[macro_use]
extern crate log;
use rsidm::config::Configuration;
-use rsidm::core::create_server_core;
+use rsidm::core::{backup_server_core, create_server_core, restore_server_core};
use std::path::PathBuf;
use structopt::StructOpt;
#[derive(Debug, StructOpt)]
struct ServerOpt {
#[structopt(short = "d", long = "debug")]
debug: bool,
#[structopt(parse(from_os_str), short = "D", long = "db_path")]
db_path: PathBuf,
}
#[derive(Debug, StructOpt)]
struct BackupOpt {
#[structopt(parse(from_os_str))]
path: PathBuf,
#[structopt(flatten)]
serveropts: ServerOpt,
}
#[derive(Debug, StructOpt)]
struct RestoreOpt {
#[structopt(parse(from_os_str))]
path: PathBuf,
#[structopt(flatten)]
serveropts: ServerOpt,
}
#[derive(Debug, StructOpt)]
enum Opt {
#[structopt(name = "server")]
Server(ServerOpt),
#[structopt(name = "backup")]
Backup(BackupOpt),
#[structopt(name = "restore")]
Restore(RestoreOpt),
}
fn main() {
// Read cli args, determine if we should backup/restore
let opt = Opt::from_args();
// Read our config (if any)
let config = Configuration::new();
let mut config = Configuration::new();
// Apply any cli overrides?
// Configure the server logger. This could be adjusted based on what config
// says.
::std::env::set_var("RUST_LOG", "actix_web=info,rsidm=info");
env_logger::init();
let sys = actix::System::new("rsidm-server");
match opt {
Opt::Server(sopt) => {
info!("Running in server mode ...");
create_server_core(config);
let _ = sys.run();
config.update_db_path(&sopt.db_path);
let sys = actix::System::new("rsidm-server");
create_server_core(config);
let _ = sys.run();
}
Opt::Backup(bopt) => {
info!("Running in backup mode ...");
config.update_db_path(&bopt.serveropts.db_path);
let p = match bopt.path.to_str() {
Some(p) => p,
None => {
error!("Invalid backup path");
std::process::exit(1);
}
};
backup_server_core(config, p);
}
Opt::Restore(ropt) => {
info!("Running in restore mode ...");
config.update_db_path(&ropt.serveropts.db_path);
let p = match ropt.path.to_str() {
Some(p) => p,
None => {
error!("Invalid restore path");
std::process::exit(1);
}
};
restore_server_core(config, p);
}
}
}
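
Given the Cargo.toml [[bin]] entries and the StructOpt definitions above, the rsidmd binary would be invoked roughly as follows (paths are illustrative):

rsidmd server -D /var/lib/rsidm/rsidm.db
rsidmd backup /tmp/rsidm.backup.json -D /var/lib/rsidm/rsidm.db
rsidmd restore /tmp/rsidm.backup.json -D /var/lib/rsidm/rsidm.db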