diff --git a/src/lib/core.rs b/src/lib/core.rs index d240b24ae..2ab7266b1 100644 --- a/src/lib/core.rs +++ b/src/lib/core.rs @@ -210,7 +210,13 @@ pub fn create_server_core(config: Configuration) { // server as they come in. // Start the query server with the given be path: future config - let server_addr = server::start(log_addr.clone(), config.db_path.as_str(), config.threads); + let server_addr = match server::start(log_addr.clone(), config.db_path.as_str(), config.threads) { + Ok(addr) => addr, + Err(e) => { + // Oh shiiiiiiii + unimplemented!() + } + }; // Setup timed events let _int_addr = IntervalActor::new(server_addr.clone()).start(); diff --git a/src/lib/server.rs b/src/lib/server.rs index 6b352020f..3bd06fad8 100644 --- a/src/lib/server.rs +++ b/src/lib/server.rs @@ -22,23 +22,31 @@ use modify::{Modify, ModifyList}; use plugins::Plugins; use schema::{Schema, SchemaReadTransaction, SchemaTransaction, SchemaWriteTransaction}; -pub fn start(log: actix::Addr, path: &str, threads: usize) -> actix::Addr { +pub fn start(log: actix::Addr, path: &str, threads: usize) -> Result, OperationError> { let mut audit = AuditScope::new("server_start"); let log_inner = log.clone(); - let qs_addr = audit_segment!(audit, || { + let qs_addr: Result, _> = audit_segment!(audit, || { // Create "just enough" schema for us to be able to load from // disk ... Schema loading is one time where we validate the // entries as we read them, so we need this here. // FIXME: Handle results in start correctly - let schema = Arc::new(Schema::new(&mut audit).unwrap()); + let schema = match Schema::new(&mut audit) { + Ok(s) => Arc::new(s), + Err(e) => return Err(e), + }; + + // Create a new backend audit scope let mut audit_be = AuditScope::new("backend_new"); - let be = Backend::new(&mut audit_be, path).unwrap(); + let be = match Backend::new(&mut audit_be, path) { + Ok(be) => be, + Err(e) => return Err(e), + }; + audit.append_scope(audit_be); + { - // Create a new backend audit scope let be_txn = be.write(); let mut schema_write = schema.write(); - audit.append_scope(audit_be); // Now, we have the initial schema in memory. Use this to trigger // an index of the be for the core schema. @@ -49,16 +57,22 @@ pub fn start(log: actix::Addr, path: &str, threads: usize) -> actix::A // Now load the remaining backend schema into memory. // TODO: Schema elements should be versioned individually. - schema_write.bootstrap_core(&mut audit).unwrap(); - - // TODO: Backend setup indexes as needed from schema, for the core - // system schema. - // TODO: Trigger an index? This could be costly ... - // Perhaps a config option to say if we index on startup or not. - // TODO: Check the results! - schema_write.validate(&mut audit); - be_txn.commit(); - schema_write.commit(); + match schema_write.bootstrap_core(&mut audit) + // TODO: Backend setup indexes as needed from schema, for the core + // system schema. + // TODO: Trigger an index? This could be costly ... + // Perhaps a config option to say if we index on startup or not. + // TODO: Check the results! + .and_then(|_| { + schema_write.validate(&mut audit); + be_txn.commit() + }) + .and_then(|_| { + schema_write.commit() + }) { + Ok(_) => {} + Err(e) => return Err(e), + } } // Create a temporary query_server implementation @@ -68,12 +82,17 @@ pub fn start(log: actix::Addr, path: &str, threads: usize) -> actix::A let query_server_write = query_server.write(); query_server_write.initialise(&mut audit_qsc); // We are good to go! Finally commit and consume the txn. - audit_segment!(audit_qsc, || query_server_write.commit(&mut audit_qsc)); + match audit_segment!(audit_qsc, || query_server_write.commit(&mut audit_qsc)) { + Ok(_) => {} + Err(e) => return Err(e), + }; + audit.append_scope(audit_qsc); - SyncArbiter::start(threads, move || { + let x = SyncArbiter::start(threads, move || { QueryServer::new(log_inner.clone(), be.clone(), schema.clone()) - }) + }); + Ok(x) }); log.do_send(audit); qs_addr @@ -859,9 +878,12 @@ impl<'a> QueryServerWriteTransaction<'a> { // it exists. To guarantee content exactly as is, we compare if it's identical. if !e.compare(&results[0]) { self.internal_delete(audit, filt) - .and_then( + .and_then(|_| { self.internal_create(audit, vec![e.invalidate()]) - ) + }) + } else { + // No action required + Ok(()) } } else { Err(OperationError::InvalidDBState)