Improve error handling in startup

This commit is contained in:
William Brown 2019-02-26 14:32:38 +10:00
parent d80390ba07
commit 8f7b5550d3
2 changed files with 50 additions and 22 deletions

View file

@ -210,7 +210,13 @@ pub fn create_server_core(config: Configuration) {
// server as they come in. // server as they come in.
// Start the query server with the given be path: future config // Start the query server with the given be path: future config
let server_addr = server::start(log_addr.clone(), config.db_path.as_str(), config.threads); let server_addr = match server::start(log_addr.clone(), config.db_path.as_str(), config.threads) {
Ok(addr) => addr,
Err(e) => {
// Oh shiiiiiiii
unimplemented!()
}
};
// Setup timed events // Setup timed events
let _int_addr = IntervalActor::new(server_addr.clone()).start(); let _int_addr = IntervalActor::new(server_addr.clone()).start();

View file

@ -22,23 +22,31 @@ use modify::{Modify, ModifyList};
use plugins::Plugins; use plugins::Plugins;
use schema::{Schema, SchemaReadTransaction, SchemaTransaction, SchemaWriteTransaction}; use schema::{Schema, SchemaReadTransaction, SchemaTransaction, SchemaWriteTransaction};
pub fn start(log: actix::Addr<EventLog>, path: &str, threads: usize) -> actix::Addr<QueryServer> { pub fn start(log: actix::Addr<EventLog>, path: &str, threads: usize) -> Result<actix::Addr<QueryServer>, OperationError> {
let mut audit = AuditScope::new("server_start"); let mut audit = AuditScope::new("server_start");
let log_inner = log.clone(); let log_inner = log.clone();
let qs_addr = audit_segment!(audit, || { let qs_addr: Result<actix::Addr<QueryServer>, _> = audit_segment!(audit, || {
// Create "just enough" schema for us to be able to load from // Create "just enough" schema for us to be able to load from
// disk ... Schema loading is one time where we validate the // disk ... Schema loading is one time where we validate the
// entries as we read them, so we need this here. // entries as we read them, so we need this here.
// FIXME: Handle results in start correctly // FIXME: Handle results in start correctly
let schema = Arc::new(Schema::new(&mut audit).unwrap()); let schema = match Schema::new(&mut audit) {
let mut audit_be = AuditScope::new("backend_new"); Ok(s) => Arc::new(s),
let be = Backend::new(&mut audit_be, path).unwrap(); Err(e) => return Err(e),
{ };
// Create a new backend audit scope // Create a new backend audit scope
let mut audit_be = AuditScope::new("backend_new");
let be = match Backend::new(&mut audit_be, path) {
Ok(be) => be,
Err(e) => return Err(e),
};
audit.append_scope(audit_be);
{
let be_txn = be.write(); let be_txn = be.write();
let mut schema_write = schema.write(); let mut schema_write = schema.write();
audit.append_scope(audit_be);
// Now, we have the initial schema in memory. Use this to trigger // Now, we have the initial schema in memory. Use this to trigger
// an index of the be for the core schema. // an index of the be for the core schema.
@ -49,16 +57,22 @@ pub fn start(log: actix::Addr<EventLog>, path: &str, threads: usize) -> actix::A
// Now load the remaining backend schema into memory. // Now load the remaining backend schema into memory.
// TODO: Schema elements should be versioned individually. // TODO: Schema elements should be versioned individually.
schema_write.bootstrap_core(&mut audit).unwrap(); match schema_write.bootstrap_core(&mut audit)
// TODO: Backend setup indexes as needed from schema, for the core // TODO: Backend setup indexes as needed from schema, for the core
// system schema. // system schema.
// TODO: Trigger an index? This could be costly ... // TODO: Trigger an index? This could be costly ...
// Perhaps a config option to say if we index on startup or not. // Perhaps a config option to say if we index on startup or not.
// TODO: Check the results! // TODO: Check the results!
.and_then(|_| {
schema_write.validate(&mut audit); schema_write.validate(&mut audit);
be_txn.commit(); be_txn.commit()
schema_write.commit(); })
.and_then(|_| {
schema_write.commit()
}) {
Ok(_) => {}
Err(e) => return Err(e),
}
} }
// Create a temporary query_server implementation // Create a temporary query_server implementation
@ -68,12 +82,17 @@ pub fn start(log: actix::Addr<EventLog>, path: &str, threads: usize) -> actix::A
let query_server_write = query_server.write(); let query_server_write = query_server.write();
query_server_write.initialise(&mut audit_qsc); query_server_write.initialise(&mut audit_qsc);
// We are good to go! Finally commit and consume the txn. // We are good to go! Finally commit and consume the txn.
audit_segment!(audit_qsc, || query_server_write.commit(&mut audit_qsc)); match audit_segment!(audit_qsc, || query_server_write.commit(&mut audit_qsc)) {
Ok(_) => {}
Err(e) => return Err(e),
};
audit.append_scope(audit_qsc); audit.append_scope(audit_qsc);
SyncArbiter::start(threads, move || { let x = SyncArbiter::start(threads, move || {
QueryServer::new(log_inner.clone(), be.clone(), schema.clone()) QueryServer::new(log_inner.clone(), be.clone(), schema.clone())
}) });
Ok(x)
}); });
log.do_send(audit); log.do_send(audit);
qs_addr qs_addr
@ -859,9 +878,12 @@ impl<'a> QueryServerWriteTransaction<'a> {
// it exists. To guarantee content exactly as is, we compare if it's identical. // it exists. To guarantee content exactly as is, we compare if it's identical.
if !e.compare(&results[0]) { if !e.compare(&results[0]) {
self.internal_delete(audit, filt) self.internal_delete(audit, filt)
.and_then( .and_then(|_| {
self.internal_create(audit, vec![e.invalidate()]) self.internal_create(audit, vec![e.invalidate()])
) })
} else {
// No action required
Ok(())
} }
} else { } else {
Err(OperationError::InvalidDBState) Err(OperationError::InvalidDBState)