Finished major transaction refactor

This commit is contained in:
William Brown 2018-12-30 12:17:09 +10:00
parent cdfe8f93d7
commit 3ad0f0ca28
22 changed files with 1854 additions and 657 deletions

View file

@ -42,8 +42,10 @@ serde = "1.0"
serde_json = "1.0"
serde_derive = "1.0"
rusqlite = "0.15"
rusqlite = { version = "0.15", features = ["backup"] }
r2d2 = "0.8"
r2d2_sqlite = "0.7"
concread = "0.1"

View file

@ -0,0 +1,7 @@
* Filters are security checked for access
* attribute request lists are checked for access
* profiles work on filters
*

15
designs/auth.rst Normal file
View file

@ -0,0 +1,15 @@
* auth is a stepped protocol (similar to SASL)
* we offer possible authentications
* these proceed until a deny or allow is hit.
* we provide a cookie that is valid on all server instances (except read-onlies
that have unique cookie keys to prevent forgery of writable master cookies)
* cookies can request tokens, tokens are signed cbor that contains the set
of group uuids + names derferenced so that a client can make all authorisation
decisions from a single datapoint
* each token can be unique based on the type of auth (ie 2fa needed to get access
to admin groups)

View file

@ -0,0 +1,6 @@
* configuration is static and read at start up
** performance and simplicity
** configuration defines if a plugin is enabled on not
** no dynamic plugins

View file

@ -0,0 +1,7 @@
* create, delete, modify all take multiple objects to work on so that changes can be consistent.
* in theory, there should be one interface, "modify" that specifies create, delete, modify, so that all changes are possible in a single operation
* This however presents some schema verification changes, but they are not insurmountable and it would make the main server core simpler

View file

@ -0,0 +1,7 @@
* user private group is implied
* uidnumber and gidnumber are stored on the entry
* if not set, derive from uuid
* if set, we respect the values

View file

@ -1,6 +1,6 @@
extern crate rsidm;
use rsidm::proto_v1;
// use rsidm::proto_v1;
fn main() {
println!("Hello whoami");

View file

@ -45,6 +45,8 @@ macro_rules! audit_segment {
let end = Instant::now();
let diff = end.duration_since(start);
$au.set_duration(diff);
// Return the result. Hope this works!
r
}};
@ -52,8 +54,8 @@ macro_rules! audit_segment {
#[derive(Serialize, Deserialize)]
enum AuditEvent {
log(AuditLog),
scope(AuditScope),
Log(AuditLog),
Scope(AuditScope),
}
#[derive(Debug, Serialize, Deserialize)]
@ -83,6 +85,8 @@ impl Message for AuditScope {
impl fmt::Display for AuditScope {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut depth = 0;
// write!(f, "{}: begin -> {}", self.time, self.name);
let d = serde_json::to_string_pretty(self).unwrap();
write!(f, "{}", d)
}
@ -105,16 +109,20 @@ impl AuditScope {
self.name.as_str()
}
pub fn set_duration(&mut self, diff: Duration) {
self.duration = Some(diff);
}
// Given a new audit event, append it in.
pub fn append_scope(&mut self, scope: AuditScope) {
self.events.push(AuditEvent::scope(scope))
self.events.push(AuditEvent::Scope(scope))
}
pub fn log_event(&mut self, data: String) {
let t_now = SystemTime::now();
let datetime: DateTime<Utc> = t_now.into();
self.events.push(AuditEvent::log(AuditLog {
self.events.push(AuditEvent::Log(AuditLog {
time: datetime.to_rfc3339(),
name: data,
}))

View file

@ -7,31 +7,14 @@ use rusqlite::NO_PARAMS;
use serde_json;
// use uuid;
use super::audit::AuditScope;
use super::entry::Entry;
use super::filter::Filter;
use audit::AuditScope;
use entry::Entry;
use filter::Filter;
mod idl;
mod mem_be;
mod sqlite_be;
// This contacts the needed backend and starts it up
#[derive(Debug, PartialEq)]
pub struct BackendAuditScope {
time_start: (),
time_end: (),
}
impl BackendAuditScope {
pub fn new() -> Self {
BackendAuditScope {
time_start: (),
time_end: (),
}
}
}
#[derive(Debug)]
struct IdEntry {
// FIXME: This should be u64, but sqlite uses i32 ...
@ -56,54 +39,144 @@ pub struct Backend {
pool: Pool<SqliteConnectionManager>,
}
// In the future this will do the routing between the chosen backends etc.
impl Backend {
pub fn new(audit: &mut AuditScope, path: &str) -> Self {
// this has a ::memory() type, but will path == "" work?
audit_segment!(audit, || {
let manager = SqliteConnectionManager::file(path);
let builder1 = Pool::builder();
let builder2 = if path == "" {
builder1.max_size(1)
} else {
// FIXME: Make this configurable
builder1.max_size(8)
};
// Look at max_size and thread_pool here for perf later
let pool = builder2.build(manager).expect("Failed to create pool");
pub struct BackendTransaction {
committed: bool,
conn: r2d2::PooledConnection<SqliteConnectionManager>,
}
pub struct BackendWriteTransaction {
committed: bool,
conn: r2d2::PooledConnection<SqliteConnectionManager>,
}
pub trait BackendReadTransaction {
fn get_conn(&self) -> &r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager>;
// Take filter, and AuditScope ref?
fn search(&self, au: &mut AuditScope, filt: &Filter) -> Result<Vec<Entry>, BackendError> {
// Do things
// Alloc a vec for the entries.
// FIXME: Make this actually a good size for the result set ...
// FIXME: Actually compute indexes here.
// So to make this use indexes, we can use the filter type and
// destructure it to work out what we need to actually search (if
// possible) to create the candidate set.
// Unlike DS, even if we don't get the index back, we can just pass
// to the in-memory filter test and be done.
audit_segment!(au, || {
let mut raw_entries: Vec<String> = Vec::new();
{
let conn = pool.get().unwrap();
// Perform any migrations as required?
// I think we only need the core table here, indexing will do it's own
// thing later
// conn.execute("PRAGMA journal_mode=WAL;", NO_PARAMS).unwrap();
conn.execute(
"CREATE TABLE IF NOT EXISTS id2entry (
id INTEGER PRIMARY KEY ASC,
data TEXT NOT NULL
)
",
NO_PARAMS,
)
.unwrap();
// Actually do a search now!
// let conn = self.pool.get().unwrap();
// Start a txn
// conn.execute("BEGIN TRANSACTION", NO_PARAMS).unwrap();
// Create a version table for migration indication
// Create the core db
// read them all
let mut stmt = self.get_conn().prepare("SELECT id, data FROM id2entry").unwrap();
let id2entry_iter = stmt
.query_map(NO_PARAMS, |row| IdEntry {
id: row.get(0),
data: row.get(1),
})
.unwrap();
for row in id2entry_iter {
audit_log!(au, "raw entry: {:?}", row);
// FIXME: Handle this properly.
raw_entries.push(row.unwrap().data);
}
// Rollback, we should have done nothing.
// conn.execute("ROLLBACK TRANSACTION", NO_PARAMS).unwrap();
}
// Do other things
// Now, de-serialise the raw_entries back to entries
let entries: Vec<Entry> = raw_entries
.iter()
.filter_map(|val| {
// TODO: Should we do better than unwrap?
let e: Entry = serde_json::from_str(val.as_str()).unwrap();
if filt.entry_match_no_index(&e) {
Some(e)
} else {
None
}
})
.collect();
Backend { pool: pool }
Ok(entries)
})
}
pub fn create(
&mut self,
au: &mut AuditScope,
entries: &Vec<Entry>,
) -> Result<BackendAuditScope, BackendError> {
}
impl Drop for BackendTransaction {
// Abort
// TODO: Is this correct for RO txn?
fn drop(self: &mut Self) {
if !self.committed {
println!("Aborting txn");
self.conn
.execute("ROLLBACK TRANSACTION", NO_PARAMS)
.unwrap();
}
}
}
impl BackendTransaction {
pub fn new(conn: r2d2::PooledConnection<SqliteConnectionManager>) -> Self {
// Start the transaction
println!("Starting txn ...");
// TODO: Way to flag that this will be a read?
conn.execute("BEGIN TRANSACTION", NO_PARAMS).unwrap();
BackendTransaction {
committed: false,
conn: conn,
}
}
}
impl BackendReadTransaction for BackendTransaction {
fn get_conn(&self) -> &r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager> {
&self.conn
}
}
static DBV_ID2ENTRY: &'static str = "id2entry";
static DBV_INDEX: &'static str = "index";
impl Drop for BackendWriteTransaction {
// Abort
fn drop(self: &mut Self) {
if !self.committed {
println!("Aborting txn");
self.conn
.execute("ROLLBACK TRANSACTION", NO_PARAMS)
.unwrap();
}
}
}
impl BackendReadTransaction for BackendWriteTransaction {
fn get_conn(&self) -> &r2d2::PooledConnection<r2d2_sqlite::SqliteConnectionManager> {
&self.conn
}
}
impl BackendWriteTransaction {
pub fn new(conn: r2d2::PooledConnection<SqliteConnectionManager>) -> Self {
// Start the transaction
println!("Starting txn ...");
// TODO: Way to flag that this will be a write?
conn.execute("BEGIN TRANSACTION", NO_PARAMS).unwrap();
BackendWriteTransaction {
committed: false,
conn: conn,
}
}
pub fn create(&self, au: &mut AuditScope, entries: &Vec<Entry>) -> Result<(), BackendError> {
audit_segment!(au, || {
let be_audit = BackendAuditScope::new();
// Start be audit timer
if entries.is_empty() {
@ -128,79 +201,25 @@ impl Backend {
// THIS IS PROBABLY THE BIT WHERE YOU NEED DB ABSTRACTION
{
let conn = self.pool.get().unwrap();
// Start a txn
conn.execute("BEGIN TRANSACTION", NO_PARAMS).unwrap();
// self.conn.execute("BEGIN TRANSACTION", NO_PARAMS).unwrap();
// write them all
for ser_entry in ser_entries {
conn.execute(
"INSERT INTO id2entry (data) VALUES (?1)",
&[&ser_entry as &ToSql],
)
.unwrap();
self.conn
.execute(
"INSERT INTO id2entry (data) VALUES (?1)",
&[&ser_entry as &ToSql],
)
.unwrap();
}
// TODO: update indexes (as needed)
// Commit the txn
conn.execute("COMMIT TRANSACTION", NO_PARAMS).unwrap();
// conn.execute("COMMIT TRANSACTION", NO_PARAMS).unwrap();
}
Ok(be_audit)
})
}
// Take filter, and AuditScope ref?
pub fn search(&self, au: &mut AuditScope, filt: &Filter) -> Result<Vec<Entry>, BackendError> {
// Do things
// Alloc a vec for the entries.
// FIXME: Make this actually a good size for the result set ...
// FIXME: Actually compute indexes here.
// So to make this use indexes, we can use the filter type and
// destructure it to work out what we need to actually search (if
// possible) to create the candidate set.
// Unlike DS, even if we don't get the index back, we can just pass
// to the in-memory filter test and be done.
audit_segment!(au, || {
let mut raw_entries: Vec<String> = Vec::new();
{
// Actually do a search now!
let conn = self.pool.get().unwrap();
// Start a txn
conn.execute("BEGIN TRANSACTION", NO_PARAMS).unwrap();
// read them all
let mut stmt = conn.prepare("SELECT id, data FROM id2entry").unwrap();
let id2entry_iter = stmt
.query_map(NO_PARAMS, |row| IdEntry {
id: row.get(0),
data: row.get(1),
})
.unwrap();
for row in id2entry_iter {
audit_log!(au, "raw entry: {:?}", row);
// FIXME: Handle this properly.
raw_entries.push(row.unwrap().data);
}
// Rollback, we should have done nothing.
conn.execute("ROLLBACK TRANSACTION", NO_PARAMS).unwrap();
}
// Do other things
// Now, de-serialise the raw_entries back to entries
let entries: Vec<Entry> = raw_entries
.iter()
.filter_map(|val| {
// TODO: Should we do better than unwrap?
let e: Entry = serde_json::from_str(val.as_str()).unwrap();
if filt.entry_match_no_index(&e) {
Some(e)
} else {
None
}
})
.collect();
Ok(entries)
Ok(())
})
}
@ -227,9 +246,162 @@ impl Backend {
}
}
pub fn modify() {}
pub fn modify() {
unimplemented!()
}
pub fn delete() {}
pub fn delete() {
unimplemented!()
}
pub fn backup() {
unimplemented!()
}
// Should this be offline only?
pub fn restore() {
unimplemented!()
}
pub fn commit(mut self) -> Result<(), ()> {
println!("Commiting txn");
self.committed = true;
self.conn
.execute("COMMIT TRANSACTION", NO_PARAMS)
.map(|_| ())
.map_err(|e| {
println!("{:?}", e);
()
})
}
// ===== inner helpers =====
// Some of these are not self due to use in new()
fn get_db_version_key(&self, key: &str) -> i32 {
match self.conn.query_row_named(
"SELECT version FROM db_version WHERE id = :id",
&[(":id", &key)],
|row| row.get(0),
) {
Ok(e) => e,
Err(_) => {
// The value is missing, default to 0.
0
}
}
}
pub fn setup(&self, audit: &mut AuditScope) -> Result<(), ()> {
{
// self.conn.execute("BEGIN TRANSACTION", NO_PARAMS).unwrap();
// conn.execute("PRAGMA journal_mode=WAL;", NO_PARAMS).unwrap();
//
// This stores versions of components. For example:
// ----------------------
// | id | version |
// | id2entry | 1 |
// | index | 1 |
// | schema | 1 |
// ----------------------
//
// This allows each component to initialise on it's own, be
// rolled back individually, by upgraded in isolation, and more
//
// NEVER CHANGE THIS DEFINITION.
self.conn
.execute(
"CREATE TABLE IF NOT EXISTS db_version (
id TEXT PRIMARY KEY,
version INTEGER
)
",
NO_PARAMS,
)
.unwrap();
// If the table is empty, populate the versions as 0.
let mut dbv_id2entry = self.get_db_version_key(DBV_ID2ENTRY);
audit_log!(audit, "dbv_id2entry initial == {}", dbv_id2entry);
// Check db_version here.
// * if 0 -> create v1.
if dbv_id2entry == 0 {
self.conn
.execute(
"CREATE TABLE IF NOT EXISTS id2entry (
id INTEGER PRIMARY KEY ASC,
data TEXT NOT NULL
)
",
NO_PARAMS,
)
.unwrap();
dbv_id2entry = 1;
audit_log!(audit, "dbv_id2entry migrated -> {}", dbv_id2entry);
}
// * if v1 -> complete.
self.conn
.execute_named(
"INSERT OR REPLACE INTO db_version (id, version) VALUES(:id, :dbv_id2entry)",
&[(":id", &DBV_ID2ENTRY), (":dbv_id2entry", &dbv_id2entry)],
)
.unwrap();
// NOTE: Indexing is configured in a different step!
// Indexing uses a db version flag to represent the version
// of the indexes representation on disk in case we change
// it.
Ok(())
}
}
}
// In the future this will do the routing between the chosen backends etc.
impl Backend {
pub fn new(audit: &mut AuditScope, path: &str) -> Result<Self, ()> {
// this has a ::memory() type, but will path == "" work?
audit_segment!(audit, || {
let manager = SqliteConnectionManager::file(path);
let builder1 = Pool::builder();
let builder2 = if path == "" {
// We are in a debug mode, with in memory. We MUST have only
// a single DB thread, else we cause consistency issues.
builder1.max_size(1)
} else {
// FIXME: Make this configurable
builder1.max_size(8)
};
// Look at max_size and thread_pool here for perf later
let pool = builder2.build(manager).expect("Failed to create pool");
let be = Backend { pool: pool };
// Now complete our setup with a txn
let r = {
let be_txn = be.write();
be_txn.setup(audit);
be_txn.commit()
};
audit_log!(audit, "be new setup: {:?}", r);
match r {
Ok(_) => Ok(be),
Err(e) => Err(e),
}
})
}
pub fn read(&self) -> BackendTransaction {
let conn = self.pool.get().unwrap();
BackendTransaction::new(conn)
}
pub fn write(&self) -> BackendWriteTransaction {
let conn = self.pool.get().unwrap();
BackendWriteTransaction::new(conn)
}
}
impl Clone for Backend {
@ -257,35 +429,27 @@ mod tests {
use super::super::audit::AuditScope;
use super::super::entry::Entry;
use super::super::filter::Filter;
use super::super::log;
use super::{Backend, BackendError};
use super::{Backend, BackendError, BackendTransaction, BackendWriteTransaction, BackendReadTransaction};
macro_rules! run_test {
($test_fn:expr) => {{
System::run(|| {
let mut audit = AuditScope::new("run_test");
let mut audit = AuditScope::new("run_test");
let test_log = log::start();
let be = Backend::new(&mut audit, "").unwrap();
let mut be_txn = be.write();
let be = Backend::new(&mut audit, "");
// Could wrap another future here for the future::ok bit...
let fut = $test_fn(&mut audit, be);
let comp_fut = fut.map_err(|()| ()).and_then(move |_r| {
test_log.do_send(audit);
println!("Stopping actix ...");
actix::System::current().stop();
future::result(Ok(()))
});
tokio::spawn(comp_fut);
});
// Could wrap another future here for the future::ok bit...
let r = $test_fn(&mut audit, &be_txn);
// Commit, to guarantee it worked.
assert!(be_txn.commit().is_ok());
println!("{}", audit);
r
}};
}
#[test]
fn test_simple_create() {
run_test!(|audit: &mut AuditScope, mut be: Backend| {
run_test!(|audit: &mut AuditScope, be: &BackendWriteTransaction| {
audit_log!(audit, "Simple Create");
let empty_result = be.create(audit, &Vec::new());
@ -306,34 +470,27 @@ mod tests {
// There should only be one entry so is this enough?
assert!(entries.first().is_some());
// Later we could check consistency of the entry saved ...
// Check it's there
future::ok(())
});
}
#[test]
fn test_simple_search() {
run_test!(|audit: &mut AuditScope, be| {
run_test!(|audit: &mut AuditScope, be: &BackendWriteTransaction| {
audit_log!(audit, "Simple Search");
future::ok(())
});
}
#[test]
fn test_simple_modify() {
run_test!(|audit: &mut AuditScope, be| {
run_test!(|audit: &mut AuditScope, be: &BackendWriteTransaction| {
audit_log!(audit, "Simple Modify");
future::ok(())
});
}
#[test]
fn test_simple_delete() {
run_test!(|audit: &mut AuditScope, be| {
run_test!(|audit: &mut AuditScope, be: &BackendWriteTransaction| {
audit_log!(audit, "Simple Delete");
future::ok(())
});
}
}

39
src/lib/constants.rs Normal file
View file

@ -0,0 +1,39 @@
pub static UUID_ADMIN: &'static str = "00000000-0000-0000-0000-000000000000";
pub static UUID_ANONYMOUS: &'static str = "00000000-0000-0000-0000-ffffffffffff";
// Core
pub static UUID_SCHEMA_ATTR_CLASS: &'static str = "aa0f193f-3010-4783-9c9e-f97edb14d8c2";
pub static UUID_SCHEMA_ATTR_UUID: &'static str = "642a893b-fe1a-4fe1-805d-fb78e7f83ee7";
pub static UUID_SCHEMA_ATTR_NAME: &'static str = "27be9127-5ba1-4c06-bce9-7250f2c7f630";
pub static UUID_SCHEMA_ATTR_PRINCIPAL_NAME: &'static str = "64dda3ac-12cb-4000-9b30-97a92767ccab";
pub static UUID_SCHEMA_ATTR_DESCRIPTION: &'static str = "a4da35a2-c5fb-4f8f-a341-72cd39ec9eee";
pub static UUID_SCHEMA_ATTR_SYSTEM: &'static str = "ee28df1e-cf02-49ca-80b5-8310fb619377";
pub static UUID_SCHEMA_ATTR_SECRET: &'static str = "0231c61a-0a43-4987-9293-8732ed9459fa";
pub static UUID_SCHEMA_ATTR_MULTIVALUE: &'static str = "8a6a8bf3-7053-42e2-8cda-15af7a197513";
pub static UUID_SCHEMA_ATTR_INDEX: &'static str = "2c5ff455-0709-4f67-a37c-35ff7e67bfff";
pub static UUID_SCHEMA_ATTR_SYNTAX: &'static str = "85e8c2c7-3852-48dd-bfc9-d0982a50e2ef";
pub static UUID_SCHEMA_ATTR_SYSTEMMAY: &'static str = "f3842165-90ad-4465-ad71-1de63f8c98a1";
pub static UUID_SCHEMA_ATTR_MAY: &'static str = "7adb7e2d-af8f-492e-8f1c-c5d9b7c47b5f";
pub static UUID_SCHEMA_ATTR_SYSTEMMUST: &'static str = "e2e4abc4-7083-41ea-a663-43d904d949ce";
pub static UUID_SCHEMA_ATTR_MUST: &'static str = "40e88ca8-06d7-4a51-b538-1125e51c02e0";
pub static UUID_SCHEMA_CLASS_ATTRIBUTETYPE: &'static str = "ed65a356-a4d9-45a8-b4b9-5d40d9acdb7e";
pub static UUID_SCHEMA_CLASS_CLASSTYPE: &'static str = "ec1964f6-0c72-4373-954f-f3a603c5f8bb";
pub static UUID_SCHEMA_CLASS_OBJECT: &'static str = "579bb16d-1d85-4f8e-bb3b-6fc55af582fe";
pub static UUID_SCHEMA_CLASS_EXTENSIBLEOBJECT: &'static str =
"0fb2171d-372b-4d0d-9194-9a4d6846c324";
// system supplementary
pub static UUID_SCHEMA_ATTR_DISPLAYNAME: &'static str = "201bc966-954b-48f5-bf25-99ffed759861";
pub static UUID_SCHEMA_ATTR_MAIL: &'static str = "fae94676-720b-461b-9438-bfe8cfd7e6cd";
pub static UUID_SCHEMA_ATTR_MEMBEROF: &'static str = "2ff1abc8-2f64-4f41-9e3d-33d44616a112";
pub static UUID_SCHEMA_ATTR_SSH_PUBLICKEY: &'static str = "52f2f13f-d35c-4cca-9f43-90a12c968f72";
pub static UUID_SCHEMA_ATTR_PASSWORD: &'static str = "a5121082-be54-4624-a307-383839b0366b";
pub static UUID_SCHEMA_ATTR_MEMBER: &'static str = "cbb7cb55-1d48-4b89-8da7-8d570e755b47";
pub static UUID_SCHEMA_ATTR_VERSION: &'static str = "896d5095-b3ae-451e-a91f-4314165b5395";
pub static UUID_SCHEMA_ATTR_DOMAIN: &'static str = "c9926716-eaaa-4c83-a1ab-1ed4372a7491";
pub static UUID_SCHEMA_CLASS_PERSON: &'static str = "86c4d9e8-3820-45d7-8a8c-d3c522287010";
pub static UUID_SCHEMA_CLASS_GROUP: &'static str = "c0e4e58c-1a2e-4bc3-ad56-5950ef810ea7";
pub static UUID_SCHEMA_CLASS_ACCOUNT: &'static str = "8bbff87c-1731-455e-a0e7-bf1d0908e983";
pub static UUID_SCHEMA_CLASS_SYSTEM_INFO: &'static str = "510b2a38-0577-4680-b0ad-836ca3415e6c";

View file

@ -14,7 +14,7 @@ use super::config::Configuration;
use super::event::{CreateEvent, SearchEvent};
use super::filter::Filter;
use super::log;
use super::proto_v1::{CreateRequest, Response, SearchRequest, SearchResponse};
use super::proto_v1::{CreateRequest, SearchRequest};
use super::server;
struct AppState {
@ -157,9 +157,14 @@ pub fn create_server_core(config: Configuration) {
// Until this point, we probably want to write to stderr
// Start up the logging system: for now it just maps to stderr
// The log server is started on it's own thread
let log_addr = log::start();
log_event!(log_addr, "Starting rsidm with configuration: {:?}", config);
// Similar, create a stats thread which aggregates statistics from the
// server as they come in.
// Start the query server with the given be path: future config
let server_addr = server::start(log_addr.clone(), config.db_path.as_str(), config.threads);
// Copy the max size

View file

@ -1,5 +1,6 @@
// use serde_json::{Error, Value};
use super::proto_v1::Entry as ProtoEntry;
use filter::Filter;
use std::collections::btree_map::{Iter as BTreeIter, IterMut as BTreeIterMut};
use std::collections::BTreeMap;
use std::slice::Iter as SliceIter;
@ -159,7 +160,7 @@ impl Entry {
})
}
pub fn attribute_substring(&self, attr: &str, subvalue: &str) -> bool {
pub fn attribute_substring(&self, _attr: &str, _subvalue: &str) -> bool {
unimplemented!();
}
@ -183,6 +184,10 @@ impl Entry {
}
}
pub fn filter_from_attrs(&self, attrs: Vec<&str>) -> Filter {
unimplemented!()
}
// FIXME: Can we consume protoentry?
pub fn from(e: &ProtoEntry) -> Self {
// Why not the trait? In the future we may want to extend
@ -278,51 +283,11 @@ struct User {
credentials: Vec<Credential>,
}
impl User {
pub fn new(username: &str, displayname: &str) -> Self {
// Build a blank value
User {
username: String::from(username),
class: Vec::new(),
displayname: String::from(displayname),
legalname: None,
email: Vec::new(),
memberof: Vec::new(),
sshpublickey: Vec::new(),
credentials: Vec::new(),
}
}
// We need a way to "diff" two User objects
// as on a modification we want to track the set of changes
// that is occuring -- needed for indexing to function.
// Basically we just need to check if it changed, remove
// the "former" and add the "newer" value.
// We have to sort vecs ...
// Is there a way to call this on serialise?
fn validate(&self) -> Result<(), ()> {
// Given a schema, validate our object is sane.
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::{Entry, User};
use serde_json;
#[test]
fn test_user_basic() {
let u: User = User::new("william", "William Brown");
let d = serde_json::to_string_pretty(&u).unwrap();
let _u2: User = serde_json::from_str(d.as_str()).unwrap();
}
#[test]
fn test_entry_basic() {
let mut e: Entry = Entry::new();

View file

@ -35,14 +35,16 @@ impl Filter {
// If an or/not/and condition has no items, remove it
//
// If its the root item?
self.clone()
// self.clone()
unimplemented!()
}
// This is probably not safe, so it's for internal test cases
// only because I'm familiar with the syntax ... you have been warned.
fn from_ldap_string(_ldap_string: String) -> Result<Self, ()> {
unimplemented!()
// For now return an empty filters
Ok(Filter::And(Vec::new()))
// Ok(Filter::And(Vec::new()))
}
// What other parse types do we need?

View file

@ -20,6 +20,8 @@ extern crate regex;
#[macro_use]
extern crate lazy_static;
extern crate concread;
// use actix::prelude::*;
// use actix_web::{
// http, middleware, App, AsyncResponder, FutureResponse, HttpRequest, HttpResponse, Path, State,
@ -33,6 +35,7 @@ mod log;
#[macro_use]
mod audit;
mod be;
mod constants;
mod entry;
mod event;
mod identity;

View file

@ -1,5 +1,4 @@
use actix::prelude::*;
use serde_json;
use super::audit::AuditScope;

431
src/lib/plugins/base.rs Normal file
View file

@ -0,0 +1,431 @@
use plugins::Plugin;
use uuid::Uuid;
use audit::AuditScope;
use be::{BackendTransaction, BackendReadTransaction, BackendWriteTransaction};
use entry::Entry;
use error::OperationError;
use event::CreateEvent;
use filter::Filter;
use schema::{SchemaTransaction, SchemaWriteTransaction};
// TO FINISH
/*
Add normalisation step
Add filter normaliser to search.
Add principal name generation
*/
pub struct Base {}
impl Plugin for Base {
fn id() -> &'static str {
"Base"
}
// Need to be given the backend(for testing ease)
// audit
// the mut set of entries to create
// the create event itself (immutable, for checking originals)
// contains who is creating them
// the schema of the running instance
fn pre_create(
be: &BackendWriteTransaction,
au: &mut AuditScope,
cand: &mut Vec<Entry>,
_ce: &CreateEvent,
_schema: &SchemaWriteTransaction,
) -> Result<(), OperationError> {
// For each candidate
for entry in cand.iter_mut() {
let name_uuid = String::from("uuid");
audit_log!(au, "Base check on entry: {:?}", entry);
// First, ensure we have the 'object', class in the class set.
entry.add_ava(String::from("class"), String::from("object"));
audit_log!(au, "Object should now be in entry: {:?}", entry);
// If they have a name, but no principal name, derive it.
// if they don't have uuid, create it.
// TODO: get_ava should have a str version for effeciency?
let mut c_uuid = match entry.get_ava(&name_uuid) {
Some(u) => {
// Actually check we have a value, could be empty array ...
// TODO: Should this be left to schema to assert the value?
if u.len() > 1 {
audit_log!(au, "Entry defines uuid attr, but multiple values.");
return Err(OperationError::Plugin);
};
let v = match u.first() {
Some(v) => v,
None => {
// TODO: Should this be forgiving and just generate the UUID?
audit_log!(au, "Entry defines uuid attr, but no value.");
return Err(OperationError::Plugin);
}
};
// This could actually fail, so we probably need to handle
// this better ....
// TODO: Make this a SCHEMA check, not a manual one.
//
match Uuid::parse_str(v.as_str()) {
Ok(up) => up,
Err(_) => {
audit_log!(
au,
"Entry contains invalid Base content, rejecting out of principle."
);
return Err(OperationError::Plugin);
}
}
}
None => Uuid::new_v4(),
};
// Make it a string, so we can filter.
let str_uuid = format!("{}", c_uuid);
let mut au_be = AuditScope::new("be_exist");
// We need to clone to the filter because it owns the content
let filt = Filter::Eq(name_uuid.clone(), str_uuid.clone());
let r = be.exists(&mut au_be, &filt);
au.append_scope(au_be);
// end the scope for the be operation.
match r {
Ok(b) => {
if b == true {
audit_log!(au, "Base already exists, rejecting.");
return Err(OperationError::Plugin);
}
}
Err(e) => {
audit_log!(au, "Backend error occured checking Base existance. {:?}", e);
return Err(OperationError::Plugin);
}
}
let str_uuid = format!("{}", c_uuid);
audit_log!(au, "Setting UUID {} to entry", str_uuid);
let ava_uuid: Vec<String> = vec![str_uuid];
entry.set_avas(name_uuid, ava_uuid);
audit_log!(au, "Final entry state: {:?}", entry);
}
// done!
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::super::Plugin;
use super::Base;
use audit::AuditScope;
use be::{Backend, BackendWriteTransaction};
use entry::Entry;
use event::CreateEvent;
use schema::{Schema, SchemaWriteTransaction};
macro_rules! run_pre_create_test {
(
$preload_entries:ident,
$create_entries:ident,
$ident:ident,
$internal:ident,
$test_fn:expr
) => {{
let mut au = AuditScope::new("run_pre_create_test");
audit_segment!(au, || {
// Create an in memory BE
let be = Backend::new(&mut au, "").unwrap();
let be_txn = be.write();
// TODO: Preload entries here!
if !$preload_entries.is_empty() {
assert!(be_txn.create(&mut au, &$preload_entries).is_ok());
};
let ce = CreateEvent::from_vec($create_entries.clone());
let mut schema_be = Schema::new(&mut au).unwrap();
let mut schema = schema_be.write();
schema.bootstrap_core(&mut au).unwrap();
let mut au_test = AuditScope::new("pre_create_test");
audit_segment!(au_test, || $test_fn(
&be_txn,
&mut au_test,
&mut $create_entries,
&ce,
&schema,
));
schema.commit();
be_txn.commit();
au.append_scope(au_test);
});
// Dump the raw audit log.
println!("{}", au);
}};
}
// Check empty create
#[test]
fn test_pre_create_empty() {
let preload: Vec<Entry> = Vec::new();
let mut create: Vec<Entry> = Vec::new();
run_pre_create_test!(
preload,
create,
false,
false,
|be: &BackendWriteTransaction,
au: &mut AuditScope,
cand: &mut Vec<Entry>,
ce: &CreateEvent,
schema: &SchemaWriteTransaction| {
let r = Base::pre_create(be, au, cand, ce, schema);
assert!(r.is_ok());
// Nothing should have changed.
assert!(cand.len() == 0);
}
);
}
// check create where no uuid
#[test]
fn test_pre_create_no_uuid() {
let preload: Vec<Entry> = Vec::new();
let e: Entry = serde_json::from_str(
r#"{
"attrs": {
"class": ["person"],
"name": ["testperson"],
"description": ["testperson"],
"displayname": ["testperson"]
}
}"#,
)
.unwrap();
let mut create = vec![e];
run_pre_create_test!(
preload,
create,
false,
false,
|be: &BackendWriteTransaction,
au: &mut AuditScope,
cand: &mut Vec<Entry>,
ce: &CreateEvent,
schema: &SchemaWriteTransaction| {
let r = Base::pre_create(be, au, cand, ce, schema);
assert!(r.is_ok());
// Assert that the entry contains the attr "uuid" now.
let ue = cand.first().unwrap();
assert!(ue.attribute_pres("uuid"));
}
);
}
// check unparseable uuid
#[test]
fn test_pre_create_uuid_invalid() {
let preload: Vec<Entry> = Vec::new();
let e: Entry = serde_json::from_str(
r#"{
"attrs": {
"class": ["person"],
"name": ["testperson"],
"description": ["testperson"],
"displayname": ["testperson"],
"uuid": ["xxxxxx"]
}
}"#,
)
.unwrap();
let mut create = vec![e.clone()];
run_pre_create_test!(
preload,
create,
false,
false,
|be: &BackendWriteTransaction,
au: &mut AuditScope,
cand: &mut Vec<Entry>,
ce: &CreateEvent,
schema: &SchemaWriteTransaction| {
let r = Base::pre_create(be, au, cand, ce, schema);
assert!(r.is_err());
}
);
}
// check entry where uuid is empty list
#[test]
fn test_pre_create_uuid_empty() {
let preload: Vec<Entry> = Vec::new();
let e: Entry = serde_json::from_str(
r#"{
"attrs": {
"class": ["person"],
"name": ["testperson"],
"description": ["testperson"],
"displayname": ["testperson"],
"uuid": []
}
}"#,
)
.unwrap();
let mut create = vec![e.clone()];
run_pre_create_test!(
preload,
create,
false,
false,
|be: &BackendWriteTransaction,
au: &mut AuditScope,
cand: &mut Vec<Entry>,
ce: &CreateEvent,
schema: &SchemaWriteTransaction| {
let r = Base::pre_create(be, au, cand, ce, schema);
assert!(r.is_err());
}
);
}
// check create where provided uuid is valid. It should be unchanged.
#[test]
fn test_pre_create_uuid_valid() {
let preload: Vec<Entry> = Vec::new();
let e: Entry = serde_json::from_str(
r#"{
"attrs": {
"class": ["person"],
"name": ["testperson"],
"description": ["testperson"],
"displayname": ["testperson"],
"uuid": ["79724141-3603-4060-b6bb-35c72772611d"]
}
}"#,
)
.unwrap();
let mut create = vec![e.clone()];
run_pre_create_test!(
preload,
create,
false,
false,
|be: &BackendWriteTransaction,
au: &mut AuditScope,
cand: &mut Vec<Entry>,
ce: &CreateEvent,
schema: &SchemaWriteTransaction| {
let r = Base::pre_create(be, au, cand, ce, schema);
assert!(r.is_ok());
let ue = cand.first().unwrap();
assert!(ue.attribute_equality("uuid", "79724141-3603-4060-b6bb-35c72772611d"));
}
);
}
#[test]
fn test_pre_create_uuid_valid_multi() {
let preload: Vec<Entry> = Vec::new();
let e: Entry = serde_json::from_str(
r#"{
"attrs": {
"class": ["person"],
"name": ["testperson"],
"description": ["testperson"],
"displayname": ["testperson"],
"uuid": ["79724141-3603-4060-b6bb-35c72772611d", "79724141-3603-4060-b6bb-35c72772611d"]
}
}"#,
)
.unwrap();
let mut create = vec![e.clone()];
run_pre_create_test!(
preload,
create,
false,
false,
|be: &BackendWriteTransaction,
au: &mut AuditScope,
cand: &mut Vec<Entry>,
ce: &CreateEvent,
schema: &SchemaWriteTransaction| {
let r = Base::pre_create(be, au, cand, ce, schema);
assert!(r.is_err());
}
);
}
// check create where uuid already exists.
#[test]
fn test_pre_create_uuid_exist() {
let e: Entry = serde_json::from_str(
r#"{
"attrs": {
"class": ["person"],
"name": ["testperson"],
"description": ["testperson"],
"displayname": ["testperson"],
"uuid": ["79724141-3603-4060-b6bb-35c72772611d"]
}
}"#,
)
.unwrap();
let mut create = vec![e.clone()];
let preload = vec![e];
run_pre_create_test!(
preload,
create,
false,
false,
|be: &BackendWriteTransaction,
au: &mut AuditScope,
cand: &mut Vec<Entry>,
ce: &CreateEvent,
schema: &SchemaWriteTransaction| {
let r = Base::pre_create(be, au, cand, ce, schema);
assert!(r.is_err());
}
);
}
// check create where uuid is a well-known
// WARNING: This actually requires me to implement backend migrations and
// creation of default objects in the DB on new() if they don't exist, and
// to potentially support migrations of said objects.
}

View file

@ -1,21 +1,23 @@
use audit::AuditScope;
use be::Backend;
use be::{BackendTransaction, BackendWriteTransaction};
use entry::Entry;
use error::OperationError;
use event::CreateEvent;
use schema::Schema;
use schema::{SchemaTransaction, SchemaWriteTransaction};
mod base;
mod protected;
trait Plugin {
fn id() -> &'static str;
fn pre_create(
be: &mut Backend,
au: &mut AuditScope,
cand: &mut Vec<Entry>,
ce: &CreateEvent,
schema: &Schema,
// TODO: I think this is wrong, it should be a query server
_be: &BackendWriteTransaction,
_au: &mut AuditScope,
_cand: &mut Vec<Entry>,
_ce: &CreateEvent,
_schema: &SchemaWriteTransaction,
) -> Result<(), OperationError> {
Ok(())
}
@ -53,7 +55,7 @@ pub struct Plugins {}
macro_rules! run_pre_create_plugin {
(
$be:ident,
$be_txn:ident,
$au:ident,
$cand:ident,
$ce:ident,
@ -62,7 +64,7 @@ macro_rules! run_pre_create_plugin {
) => {{
let mut audit_scope = AuditScope::new(<($target_plugin)>::id());
let r = audit_segment!(audit_scope, || <($target_plugin)>::pre_create(
$be,
$be_txn,
&mut audit_scope,
$cand,
$ce,
@ -75,15 +77,15 @@ macro_rules! run_pre_create_plugin {
impl Plugins {
pub fn run_pre_create(
be: &mut Backend,
be_txn: &BackendWriteTransaction,
au: &mut AuditScope,
cand: &mut Vec<Entry>,
ce: &CreateEvent,
schema: &Schema,
schema: &SchemaWriteTransaction,
) -> Result<(), OperationError> {
audit_segment!(audit_plugin_pre, || {
audit_segment!(au, || {
// map chain?
let base_res = run_pre_create_plugin!(be, au, cand, ce, schema, base::Base);
let base_res = run_pre_create_plugin!(be_txn, au, cand, ce, schema, base::Base);
// TODO, actually return the right thing ...
base_res

View file

@ -0,0 +1,2 @@
// Objects matching some filter condition should
// be protected from modification / deletion

View file

@ -62,3 +62,16 @@ impl CreateRequest {
CreateRequest { entries: entries }
}
}
// Login is a multi-step process potentially. First the client says who they
// want to request
//
// we respond with a set of possible authentications that can proceed, and perhaps
// we indicate which options must/may?
//
// The client can then step and negotiate each.
//
// This continues until a LoginSuccess, or LoginFailure is returned.
//
// On loginSuccess, we send a cookie, and that allows the token to be
// generated. The cookie can be shared between servers.

File diff suppressed because it is too large Load diff

View file

@ -1,34 +1,86 @@
use actix::prelude::*;
// This is really only used for long lived, high level types that need clone
// that otherwise can't be cloned. Think Mutex.
use std::sync::Arc;
use audit::AuditScope;
use be::{Backend, BackendError};
use be::{Backend, BackendError, BackendReadTransaction, BackendTransaction, BackendWriteTransaction};
use entry::Entry;
use error::OperationError;
use event::{CreateEvent, OpResult, SearchEvent, SearchResult};
use filter::Filter;
use log::EventLog;
use plugins::Plugins;
use schema::Schema;
use schema::{Schema, SchemaTransaction, SchemaWriteTransaction};
pub fn start(log: actix::Addr<EventLog>, path: &str, threads: usize) -> actix::Addr<QueryServer> {
let mut audit = AuditScope::new("server_start");
let log_inner = log.clone();
let qs_addr = audit_segment!(audit, || {
// Create the BE connection
// probably need a config type soon ....
// Create a new backend audit scope
// Create "just enough" schema for us to be able to load from
// disk ... Schema loading is one time where we validate the
// entries as we read them, so we need this here.
// FIXME: Handle results in start correctly
let schema = Arc::new(Schema::new(&mut audit).unwrap());
let mut audit_be = AuditScope::new("backend_new");
let be = Backend::new(&mut audit_be, path);
audit.append_scope(audit_be);
let be = Backend::new(&mut audit_be, path).unwrap();
{
// Create a new backend audit scope
let mut be_txn = be.write();
let mut schema_write = schema.write();
audit.append_scope(audit_be);
// Now, we have the initial schema in memory. Use this to trigger
// an index of the be for the core schema.
// Now search for the schema itself, and validate that the system
// in memory matches the BE on disk, and that it's syntactically correct.
// Write it out if changes are needed.
// Now load the remaining backend schema into memory.
// TODO: Schema elements should be versioned individually.
schema_write.bootstrap_core(&mut audit).unwrap();
// TODO: Backend setup indexes as needed from schema, for the core
// system schema.
// TODO: Trigger an index? This could be costly ...
// Perhaps a config option to say if we index on startup or not.
schema_write.commit();
be_txn.commit();
}
// Create a temporary query_server implementation
let query_server = QueryServer::new(log_inner.clone(), be.clone(), schema.clone());
// Start the qs txn
let query_server_write = query_server.write();
// TODO: Create required system objects if they are missing
// These will each manage their own transaction per operation, so the
// we don't need to maintain the be_txn again.
// First, check the system_info object. This stores some server information
// and details. It's a pretty static thing.
let mut audit_si = AuditScope::new("start_system_info");
audit_segment!(audit_si, || start_system_info(&mut audit_si, &query_server_write));
audit.append_scope(audit_si);
// Check the anonymous object exists (migrations).
let mut audit_an = AuditScope::new("start_anonymous");
audit_segment!(audit_an, || start_anonymous(&mut audit_an, &query_server_write));
audit.append_scope(audit_an);
// Check the admin object exists (migrations).
// Load access profiles and configure them.
// We are good to go! Finally commit and consume the txn.
query_server_write.commit();
let mut schema = Schema::new();
schema.bootstrap_core();
// now we clone it out in the startup I think
// Should the be need a log clone ref? or pass it around?
// it probably needs it ...
// audit.end_event("server_new");
SyncArbiter::start(threads, move || {
QueryServer::new(log_inner.clone(), be.clone(), schema.clone())
})
@ -37,6 +89,55 @@ pub fn start(log: actix::Addr<EventLog>, path: &str, threads: usize) -> actix::A
qs_addr
}
fn start_system_info(audit: &mut AuditScope, qs: &QueryServerWriteTransaction) {
// FIXME: Get the domain from the config
let e: Entry = serde_json::from_str(
r#"{
"attrs": {
"class": ["object", "system_info"],
"name": ["system_info"],
"uuid": [],
"description": ["System info and metadata object."],
"version": ["1"],
"domain": ["example.com"]
}
}"#,
)
.unwrap();
// Does it exist?
// if yes, load
// if no, create
// TODO: internal_create function to allow plugin + schema checks
// check it's version
// migrate
qs.internal_assert_or_create(e);
}
fn start_anonymous(audit: &mut AuditScope, qs: &QueryServerWriteTransaction) {
// Does it exist?
let e: Entry = serde_json::from_str(
r#"{
"attrs": {
"class": ["object", "account"],
"name": ["anonymous"],
"uuid": [],
"description": ["Anonymous access account."],
"version": ["1"]
}
}"#,
)
.unwrap();
// if yes, load
// if no, create
// check it's version
// migrate
qs.internal_migrate_or_create(e);
}
// This is the core of the server. It implements all
// the search and modify actions, applies access controls
// and get's everything ready to push back to the fe code
@ -44,32 +145,13 @@ pub fn start(log: actix::Addr<EventLog>, path: &str, threads: usize) -> actix::A
// This is it's own actor, so we can have a write addr and a read addr,
// and it allows serialisation that way rather than relying on
// the backend
pub trait QueryServerReadTransaction {
type BackendTransactionType: BackendReadTransaction;
pub struct QueryServer {
log: actix::Addr<EventLog>,
// be: actix::Addr<BackendActor>,
// This probably needs to be Arc, or a ref. How do we want to manage this?
// I think the BE is build, configured and cloned? Maybe Backend
// is a wrapper type to Arc<BackendInner> or something.
be: Backend,
schema: Schema,
}
fn get_be_txn(&self) -> &Self::BackendTransactionType;
impl QueryServer {
pub fn new(log: actix::Addr<EventLog>, be: Backend, schema: Schema) -> Self {
log_event!(log, "Starting query worker ...");
QueryServer {
log: log,
be: be,
schema: schema,
}
}
// Actually conduct a search request
// This is the core of the server, as it processes the entire event
// applies all parts required in order and more.
pub fn search(
&mut self,
fn search(
&self,
au: &mut AuditScope,
se: &SearchEvent,
) -> Result<Vec<Entry>, OperationError> {
@ -81,11 +163,12 @@ impl QueryServer {
// TODO: Normalise the filter
// TODO: Assert access control allows the filter and requested attrs.
// TODO: Pre-search plugins
let mut audit_be = AuditScope::new("backend_search");
let res = self
.be
let res = self.get_be_txn()
.search(&mut audit_be, &se.filter)
.map(|r| r)
.map_err(|_| OperationError::Backend);
@ -99,6 +182,83 @@ impl QueryServer {
res
}
// Specialisation of search for exists or not
fn internal_exists(&self, filter: Filter) -> Result<bool, ()> {
unimplemented!()
}
fn internal_search(&self, filter: Filter) -> Result<(), ()> {
unimplemented!()
}
}
pub struct QueryServerTransaction {
be_txn: BackendTransaction,
// Anything else? In the future, we'll need to have a schema transaction
// type, maybe others?
schema: SchemaTransaction,
}
// Actually conduct a search request
// This is the core of the server, as it processes the entire event
// applies all parts required in order and more.
impl QueryServerReadTransaction for QueryServerTransaction {
type BackendTransactionType = BackendTransaction;
fn get_be_txn(&self) -> &BackendTransaction {
&self.be_txn
}
}
pub struct QueryServerWriteTransaction<'a> {
committed: bool,
// be_write_txn: BackendWriteTransaction,
// schema_write: SchemaWriteTransaction,
// read: QueryServerTransaction,
be_txn: BackendWriteTransaction,
schema: SchemaWriteTransaction<'a>,
}
impl<'a> QueryServerReadTransaction for QueryServerWriteTransaction<'a> {
type BackendTransactionType = BackendWriteTransaction;
fn get_be_txn(&self) -> &BackendWriteTransaction {
&self.be_txn
}
}
pub struct QueryServer {
log: actix::Addr<EventLog>,
// be: actix::Addr<BackendActor>,
// This probably needs to be Arc, or a ref. How do we want to manage this?
// I think the BE is build, configured and cloned? Maybe Backend
// is a wrapper type to Arc<BackendInner> or something.
be: Backend,
schema: Arc<Schema>,
}
impl QueryServer {
pub fn new(log: actix::Addr<EventLog>, be: Backend, schema: Arc<Schema>) -> Self {
log_event!(log, "Starting query worker ...");
QueryServer {
log: log,
be: be,
schema: schema,
}
}
pub fn read(&self) -> QueryServerTransaction {
unimplemented!()
}
pub fn write(&self) -> QueryServerWriteTransaction {
unimplemented!()
}
}
impl<'a> QueryServerWriteTransaction<'a> {
pub fn create(&mut self, au: &mut AuditScope, ce: &CreateEvent) -> Result<(), OperationError> {
// The create event is a raw, read only representation of the request
// that was made to us, including information about the identity
@ -112,15 +272,13 @@ impl QueryServer {
// Copy the entries to a writeable form.
let mut candidates: Vec<Entry> = ce.entries.iter().map(|er| er.clone()).collect();
// Start a txn
// run any pre plugins, giving them the list of mutable candidates.
// pre-plugins are defined here in their correct order of calling!
// I have no intent to make these dynamic or configurable.
let mut audit_plugin_pre = AuditScope::new("plugin_pre_create");
let plug_pre_res = Plugins::run_pre_create(
&mut self.be,
&self.be_txn,
&mut audit_plugin_pre,
&mut candidates,
ce,
@ -147,13 +305,18 @@ impl QueryServer {
return r;
}
// FIXME: Normalise all entries now.
// Normalise all the data now it's validated.
// FIXME: This normalisation COPIES everything, which may be
// slow.
let norm_cand: Vec<Entry> = candidates
.iter()
.map(|e| self.schema.normalise_entry(&e))
.collect();
let mut audit_be = AuditScope::new("backend_create");
// We may change from ce.entries later to something else?
let res = self
.be
.create(&mut audit_be, &candidates)
let res = self.be_txn
.create(&mut audit_be, &norm_cand)
.map(|_| ())
.map_err(|e| match e {
BackendError::EmptyRequest => OperationError::EmptyRequest,
@ -162,18 +325,91 @@ impl QueryServer {
au.append_scope(audit_be);
if res.is_err() {
// be_txn is dropped, ie aborted here.
audit_log!(au, "Create operation failed (backend), {:?}", r);
return res;
}
// Run any post plugins
// Commit the txn
// let commit, commit!
// be_txn.commit();
// We are complete, finalise logging and return
audit_log!(au, "Create operation success");
res
}
// internal server operation types.
// These just wrap the fn create/search etc, but they allow
// creating the needed create event with the correct internal flags
// and markers. They act as though they have the highest level privilege
// IE there are no access control checks.
pub fn internal_exists_or_create(&self, e: Entry) -> Result<(), ()> {
// If the thing exists, stop.
// if not, create from Entry.
unimplemented!()
}
pub fn internal_migrate_or_create(&self, e: Entry) -> Result<(), ()> {
// if the thing exists, ensure the set of attributes on
// Entry A match and are present (but don't delete multivalue, or extended
// attributes in the situation.
// If not exist, create from Entry B
//
// WARNING: this requires schema awareness for multivalue types!
// We need to either do a schema aware merge, or we just overwrite those
// few attributes.
//
// This will extra classes an attributes alone!
unimplemented!()
}
// Should this take a be_txn?
pub fn internal_assert_or_create(&self, e: Entry) -> Result<(), ()> {
// If exists, ensure the object is exactly as provided
// else, if not exists, create it. IE no extra or excess
// attributes and classes.
// Create a filter from the entry for assertion.
let filt = e.filter_from_attrs(vec!["name"]);
// Does it exist?
match self.internal_exists(filt) {
Ok(true) => {
// it exists. We need to ensure the content now.
unimplemented!()
}
Ok(false) => {
// It does not exist. Create it.
unimplemented!()
}
Err(e) => {
// An error occured. pass it back up.
Err(())
}
}
// If exist, check.
// if not the same, delete, then create
// If not exist, create.
}
// These are where searches and other actions are actually implemented. This
// is the "internal" version, where we define the event as being internal
// only, allowing certain plugin by passes etc.
pub fn internal_create(qs: &QueryServer) -> Result<(), ()> {
// This will call qs.create(), after we generate a createEvent with internal
// types etc.
unimplemented!()
}
pub fn commit(self) -> Result<(), ()> {
unimplemented!()
}
}
impl Actor for QueryServer {
@ -198,20 +434,21 @@ impl Handler<SearchEvent> for QueryServer {
let mut audit = AuditScope::new("search");
let res = audit_segment!(&mut audit, || {
audit_log!(audit, "Begin event {:?}", msg);
// Begin a read
let qs_read = self.read();
// Parse what we need from the event?
// What kind of event is it?
// In the future we'll likely change search event ...
// End the read
// was this ok?
match self.search(&mut audit, &msg) {
match qs_read.search(&mut audit, &msg) {
Ok(entries) => Ok(SearchResult::new(entries)),
Err(e) => Err(e),
}
// audit_log!(audit, "End event {:?}", msg);
// audit.end_event("search");
});
// At the end of the event we send it for logging.
self.log.do_send(audit);
@ -227,14 +464,18 @@ impl Handler<CreateEvent> for QueryServer {
let res = audit_segment!(&mut audit, || {
audit_log!(audit, "Begin create event {:?}", msg);
match self.create(&mut audit, &msg) {
Ok(()) => Ok(OpResult {}),
let mut qs_write = self.write();
match qs_write.create(&mut audit, &msg) {
Ok(()) => {
qs_write.commit();
Ok(OpResult {})
}
Err(e) => Err(e),
}
});
// At the end of the event we send it for logging.
self.log.do_send(audit);
// At the end of the event we send it for logging.
res
}
}
@ -249,11 +490,12 @@ mod tests {
extern crate futures;
use futures::future;
use futures::future::Future;
use std::sync::Arc;
extern crate tokio;
use super::super::audit::AuditScope;
use super::super::be::Backend;
use super::super::be::{Backend, BackendTransaction};
use super::super::entry::Entry;
use super::super::event::{CreateEvent, SearchEvent};
use super::super::filter::Filter;
@ -261,7 +503,7 @@ mod tests {
use super::super::proto_v1::Entry as ProtoEntry;
use super::super::proto_v1::{CreateRequest, SearchRequest};
use super::super::schema::Schema;
use super::super::server::QueryServer;
use super::super::server::{QueryServer, QueryServerWriteTransaction, QueryServerReadTransaction};
macro_rules! run_test {
($test_fn:expr) => {{
@ -269,10 +511,14 @@ mod tests {
let mut audit = AuditScope::new("run_test");
let test_log = log::start();
let be = Backend::new(&mut audit, "");
let mut schema = Schema::new();
schema.bootstrap_core();
let test_server = QueryServer::new(test_log.clone(), be, schema);
let be = Backend::new(&mut audit, "").unwrap();
let mut schema_outer = Schema::new(&mut audit).unwrap();
{
let mut schema = schema_outer.write();
schema.bootstrap_core(&mut audit).unwrap();
schema.commit();
}
let test_server = QueryServer::new(test_log.clone(), be, Arc::new(schema_outer));
// Could wrap another future here for the future::ok bit...
let fut = $test_fn(test_log.clone(), test_server, &mut audit);
@ -291,6 +537,7 @@ mod tests {
#[test]
fn test_be_create_user() {
run_test!(|_log, mut server: QueryServer, audit: &mut AuditScope| {
let mut server_txn = server.write();
let filt = Filter::Pres(String::from("name"));
let se1 = SearchEvent::from_request(SearchRequest::new(filt.clone()));
@ -313,18 +560,20 @@ mod tests {
let ce = CreateEvent::from_vec(expected.clone());
let r1 = server.search(audit, &se1).unwrap();
let r1 = server_txn.search(audit, &se1).unwrap();
assert!(r1.len() == 0);
let cr = server.create(audit, &ce);
let cr = server_txn.create(audit, &ce);
assert!(cr.is_ok());
let r2 = server.search(audit, &se2).unwrap();
let r2 = server_txn.search(audit, &se2).unwrap();
println!("--> {:?}", r2);
assert!(r2.len() == 1);
assert_eq!(r2, expected);
assert!(server_txn.commit().is_ok());
future::ok(())
});
}

View file

@ -1,4 +1,3 @@
#[macro_use]
extern crate actix;
extern crate rsidm;