Upgrade dependencies, with the major highlight as the upgrade to tokio 1.0
This commit is contained in:
Firstyear 2021-01-10 13:41:56 +10:00 committed by GitHub
parent 0f6bc36cee
commit 3844aadf60
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
21 changed files with 474 additions and 578 deletions

783
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -4,6 +4,7 @@ IMAGE_BASE ?= kanidm
IMAGE_VERSION ?= devel
EXT_OPTS ?=
IMAGE_ARCH ?= "linux/amd64,linux/arm64"
ARGS ?= --build-arg "SCCACHE_REDIS=redis://172.24.20.4:6379"
.DEFAULT: help
help:
@ -11,7 +12,8 @@ help:
buildx/kanidmd: ## build multiarch server images
buildx/kanidmd:
@docker buildx build --push --platform $(IMAGE_ARCH) -f kanidmd/Dockerfile -t $(IMAGE_BASE)/server:$(IMAGE_VERSION) .
echo @docker buildx build --push --platform $(IMAGE_ARCH) -f kanidmd/Dockerfile -t $(IMAGE_BASE)/server:$(IMAGE_VERSION) $(ARGS) .
@docker buildx build --push --platform $(IMAGE_ARCH) -f kanidmd/Dockerfile -t $(IMAGE_BASE)/server:$(IMAGE_VERSION) $(ARGS) .
@docker buildx imagetools inspect $(IMAGE_BASE)/server:$(IMAGE_VERSION)
buildx/radiusd: ## build multiarch radius images

View file

@ -11,8 +11,8 @@ repository = "https://github.com/kanidm/kanidm/"
[dependencies]
log = "0.4"
env_logger = "0.7"
reqwest = { version = "0.10", features=["blocking", "cookies", "json", "native-tls"] }
env_logger = "0.8"
reqwest = { version = "0.11", features=["blocking", "cookies", "json", "native-tls"] }
kanidm_proto = { path = "../kanidm_proto", version = "1.1.0-alpha" }
serde = "1.0"
serde_json = "1.0"
@ -28,7 +28,8 @@ webauthn-rs = "0.3.0-alpha.1"
# webauthn-rs = { path = "../../webauthn-rs" }
[dev-dependencies]
tokio = { version = "0.2", features = ["full"] }
# tokio = { version = "0.2", features = ["full"] }
tokio = { version = "1", features = ["rt", "net", "time", "macros", "sync", "signal"] }
kanidm = { path = "../kanidmd" }
futures = "0.3"
async-std = "1.6"

View file

@ -30,8 +30,8 @@ pub fn run_test(test_fn: fn(KanidmClient) -> ()) {
.is_test(true)
.try_init();
let (mut ready_tx, mut ready_rx) = mpsc::channel(1);
let (mut finish_tx, mut finish_rx) = mpsc::channel(1);
let (ready_tx, mut ready_rx) = mpsc::channel(1);
let (finish_tx, mut finish_rx) = mpsc::channel(1);
let mut counter = 0;
let port = loop {
@ -63,8 +63,7 @@ pub fn run_test(test_fn: fn(KanidmClient) -> ()) {
let t_handle = thread::spawn(move || {
// Spawn a thread for the test runner, this should have a unique
// port....
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to start tokio");

View file

@ -16,7 +16,7 @@ uuid = { version = "0.8", features = ["serde", "v4"] }
zxcvbn = { version = "2.0", features = ["ser"] }
base32 = "0.4"
thiserror = "1.0"
webauthn-rs = "0.3.0-alpha.1"
webauthn-rs = "0.3.0-alpha.5"
# webauthn-rs = { path = "../../webauthn-rs" }
[dev-dependencies]

View file

@ -29,10 +29,10 @@ path = "src/badlist_preprocess.rs"
[dependencies]
kanidm_client = { path = "../kanidm_client", version = "1.1.0-alpha.2" }
kanidm_proto = { path = "../kanidm_proto", version = "1.1.0-alpha.2" }
rpassword = "4.0"
rpassword = "5.0"
structopt = { version = "0.3", default-features = false }
log = "0.4"
env_logger = "0.7"
env_logger = "0.8"
serde = "1.0"
serde_json = "1.0"
shellexpand = "2.0"
@ -41,5 +41,5 @@ time = "0.2"
zxcvbn = "2.0"
webauthn-authenticator-rs = "0.3.0-alpha.1"
webauthn-authenticator-rs = "0.3.0-alpha.5"
# webauthn-authenticator-rs = { path = "../../webauthn-authenticator-rs" }

View file

@ -44,15 +44,15 @@ kanidm_client = { path = "../kanidm_client", version = "1.1.0-alpha" }
kanidm_proto = { path = "../kanidm_proto", version = "1.1.0-alpha" }
kanidm = { path = "../kanidmd", version = "1.1.0-alpha" }
toml = "0.5"
rpassword = "4.0"
tokio = { version = "0.2", features = ["rt-threaded", "macros", "rt-util", "sync", "time", "net", "io-util", "signal"] }
tokio-util = { version = "0.3", features = ["codec"] }
rpassword = "5.0"
tokio = { version = "1", features = ["rt", "macros", "sync", "time", "net", "io-util", "signal"] }
tokio-util = { version = "0.6", features = ["codec"] }
futures = "0.3"
bytes = "0.5"
bytes = "1.0"
libc = "0.2"
log = "0.4"
env_logger = "0.7"
env_logger = "0.8"
serde = "1.0"
serde_derive = "1.0"
serde_cbor = "0.11"
@ -63,9 +63,9 @@ rusqlite = { version = "0.23", features = ["backup"] }
r2d2 = "0.8"
r2d2_sqlite = "0.16"
reqwest = { version = "0.10" }
reqwest = { version = "0.11" }
users = "0.10"
users = "0.11"
async-std = "1.6"
lru = "0.6"

View file

@ -14,7 +14,6 @@ kanidm_unix_int = { path = "../", version = "1.1.0-alpha" }
# libnss = "0.2"
libnss = { git = "https://github.com/csnewman/libnss-rs.git", rev = "eab2d93d2438652773699b0807d558ce75b1e748" }
libc = "0.2.0"
paste = "0.1"
paste = "1.0"
lazy_static = "1.3"

View file

@ -11,6 +11,5 @@ path = "src/lib.rs"
[dependencies]
kanidm_unix_int = { path = "../", version = "1.1.0-alpha" }
futures = "0.3"
tokio = { version = "0.2", features=["full"] }
async-std = "1.6"
libc = "0.2"

View file

@ -20,10 +20,7 @@ use std::collections::BTreeSet;
use std::convert::TryFrom;
use std::ffi::CStr;
// use std::os::raw::c_char;
// use futures::executor::block_on;
use tokio::runtime::Runtime;
use async_std::task;
use kanidm_unix_common::client::call_daemon;
use kanidm_unix_common::unix_config::KanidmUnixdConfig;
use kanidm_unix_common::unix_proto::{ClientRequest, ClientResponse};
@ -95,12 +92,7 @@ impl PamHooks for PamKanidm {
let req = ClientRequest::PamAccountAllowed(account_id);
// PamResultCode::PAM_IGNORE
let mut rt = match Runtime::new() {
Ok(rt) => rt,
Err(_) => return PamResultCode::PAM_SERVICE_ERR,
};
match rt.block_on(call_daemon(cfg.sock_path.as_str(), req)) {
match task::block_on(call_daemon(cfg.sock_path.as_str(), req)) {
Ok(r) => match r {
ClientResponse::PamStatus(Some(true)) => {
// println!("PAM_SUCCESS");
@ -209,12 +201,7 @@ impl PamHooks for PamKanidm {
};
let req = ClientRequest::PamAuthenticate(account_id, authtok);
let mut rt = match Runtime::new() {
Ok(rt) => rt,
Err(_) => return PamResultCode::PAM_SERVICE_ERR,
};
match rt.block_on(call_daemon(cfg.sock_path.as_str(), req)) {
match task::block_on(call_daemon(cfg.sock_path.as_str(), req)) {
Ok(r) => match r {
ClientResponse::PamStatus(Some(true)) => {
// println!("PAM_SUCCESS");

View file

@ -25,7 +25,7 @@ dependencies = [
[[package]]
name = "pam_tester"
version = "0.1.0"
version = "0.1.2"
dependencies = [
"pam 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
]

View file

@ -1,3 +1,4 @@
use async_std::task;
use bytes::{BufMut, BytesMut};
use futures::SinkExt;
use futures::StreamExt;
@ -5,7 +6,6 @@ use std::error::Error;
use std::io::Error as IoError;
use std::io::ErrorKind;
use tokio::net::UnixStream;
use tokio::runtime::Runtime;
use tokio_util::codec::Framed;
use tokio_util::codec::{Decoder, Encoder};
@ -73,6 +73,5 @@ pub fn call_daemon_blocking(
path: &str,
req: ClientRequest,
) -> Result<ClientResponse, Box<dyn Error>> {
let mut rt = Runtime::new()?;
rt.block_on(call_daemon(path, req))
task::block_on(call_daemon(path, req))
}

View file

@ -218,7 +218,7 @@ async fn handle_client(
Ok(())
}
#[tokio::main(core_threads = 1, max_threads = 1)]
#[tokio::main]
async fn main() {
let cuid = get_current_uid();
let ceuid = get_effective_uid();
@ -398,7 +398,7 @@ async fn main() {
// Set the umask while we open the path
let before = unsafe { umask(0) };
let mut listener = match UnixListener::bind(cfg.sock_path.as_str()) {
let listener = match UnixListener::bind(cfg.sock_path.as_str()) {
Ok(l) => l,
Err(_e) => {
error!("Failed to bind unix socket.");
@ -411,10 +411,9 @@ async fn main() {
// TODO: Setup a task that handles pre-fetching here.
let server = async move {
let mut incoming = listener.incoming();
while let Some(socket_res) = incoming.next().await {
match socket_res {
Ok(socket) => {
loop {
match listener.accept().await {
Ok((socket, _addr)) => {
let cachelayer_ref = cachelayer.clone();
tokio::spawn(async move {
if let Err(e) = handle_client(socket, cachelayer_ref.clone()).await {

View file

@ -38,8 +38,8 @@ fn run_test(fix_fn: fn(&mut KanidmClient) -> (), test_fn: fn(CacheLayer, KanidmA
// ::std::env::set_var("RUST_LOG", "kanidm=debug");
let _ = env_logger::builder().is_test(true).try_init();
let (mut ready_tx, mut ready_rx) = mpsc::channel(1);
let (mut finish_tx, mut finish_rx) = mpsc::channel(1);
let (ready_tx, mut ready_rx) = mpsc::channel(1);
let (finish_tx, mut finish_rx) = mpsc::channel(1);
let mut counter = 0;
let port = loop {
@ -70,8 +70,7 @@ fn run_test(fix_fn: fn(&mut KanidmClient) -> (), test_fn: fn(CacheLayer, KanidmA
let t_handle = thread::spawn(move || {
// Spawn a thread for the test runner, this should have a unique
// port....
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to start tokio");
@ -177,7 +176,7 @@ fn test_fixture(rsclient: &mut KanidmClient) -> () {
#[test]
fn test_cache_sshkey() {
run_test(test_fixture, |cachelayer, _adminclient| {
let mut rt = Runtime::new().expect("Failed to start tokio");
let rt = Runtime::new().expect("Failed to start tokio");
let fut = async move {
// Force offline. Show we have no keys.
cachelayer.mark_offline().await;
@ -213,7 +212,7 @@ fn test_cache_sshkey() {
#[test]
fn test_cache_account() {
run_test(test_fixture, |cachelayer, _adminclient| {
let mut rt = Runtime::new().expect("Failed to start tokio");
let rt = Runtime::new().expect("Failed to start tokio");
let fut = async move {
// Force offline. Show we have no account
cachelayer.mark_offline().await;
@ -259,7 +258,7 @@ fn test_cache_account() {
#[test]
fn test_cache_group() {
run_test(test_fixture, |cachelayer, _adminclient| {
let mut rt = Runtime::new().expect("Failed to start tokio");
let rt = Runtime::new().expect("Failed to start tokio");
let fut = async move {
// Force offline. Show we have no groups.
cachelayer.mark_offline().await;
@ -328,7 +327,7 @@ fn test_cache_group() {
#[test]
fn test_cache_group_delete() {
run_test(test_fixture, |cachelayer, mut adminclient| {
let mut rt = Runtime::new().expect("Failed to start tokio");
let rt = Runtime::new().expect("Failed to start tokio");
let fut = async move {
// get the group
cachelayer.attempt_online().await;
@ -367,7 +366,7 @@ fn test_cache_group_delete() {
#[test]
fn test_cache_account_delete() {
run_test(test_fixture, |cachelayer, mut adminclient| {
let mut rt = Runtime::new().expect("Failed to start tokio");
let rt = Runtime::new().expect("Failed to start tokio");
let fut = async move {
// get the account
cachelayer.attempt_online().await;
@ -413,7 +412,7 @@ fn test_cache_account_delete() {
#[test]
fn test_cache_account_password() {
run_test(test_fixture, |cachelayer, mut adminclient| {
let mut rt = Runtime::new().expect("Failed to start tokio");
let rt = Runtime::new().expect("Failed to start tokio");
let fut = async move {
cachelayer.attempt_online().await;
// Test authentication failure.
@ -510,7 +509,7 @@ fn test_cache_account_password() {
#[test]
fn test_cache_account_pam_allowed() {
run_test(test_fixture, |cachelayer, mut adminclient| {
let mut rt = Runtime::new().expect("Failed to start tokio");
let rt = Runtime::new().expect("Failed to start tokio");
let fut = async move {
cachelayer.attempt_online().await;
@ -547,7 +546,7 @@ fn test_cache_account_pam_allowed() {
#[test]
fn test_cache_account_pam_nonexist() {
run_test(test_fixture, |cachelayer, _adminclient| {
let mut rt = Runtime::new().expect("Failed to start tokio");
let rt = Runtime::new().expect("Failed to start tokio");
let fut = async move {
cachelayer.attempt_online().await;
@ -584,7 +583,7 @@ fn test_cache_account_pam_nonexist() {
#[test]
fn test_cache_account_expiry() {
run_test(test_fixture, |cachelayer, mut adminclient| {
let mut rt = Runtime::new().expect("Failed to start tokio");
let rt = Runtime::new().expect("Failed to start tokio");
let fut = async move {
cachelayer.attempt_online().await;
assert!(cachelayer.test_connection().await);
@ -659,7 +658,7 @@ fn test_cache_account_expiry() {
#[test]
fn test_cache_nxcache() {
run_test(test_fixture, |cachelayer, mut _adminclient| {
let mut rt = Runtime::new().expect("Failed to start tokio");
let rt = Runtime::new().expect("Failed to start tokio");
let fut = async move {
cachelayer.attempt_online().await;
assert!(cachelayer.test_connection().await);

View file

@ -38,8 +38,8 @@ fernet = { git = "https://github.com/mozilla-services/fernet-rs.git" }
async-std = "1.6"
log = "0.4"
env_logger = "0.7"
rand = "0.7"
env_logger = "0.8"
rand = "0.8"
toml = "0.5"
chrono = "0.4"
@ -48,9 +48,9 @@ lazy_static = "1.2.0"
futures = "0.3"
futures-util = "0.3"
tokio = { version = "0.2", features = ["rt-threaded", "macros", "rt-util", "sync", "time", "net", "io-util", "signal"] }
tokio-util = { version = "0.3", features = ["codec"] }
tokio-openssl = "0.4"
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.6", features = ["codec"] }
tokio-openssl = "0.6"
openssl = "0.10"
uuid = { version = "0.8", features = ["serde", "v4" ] }
@ -67,7 +67,7 @@ r2d2_sqlite = "0.16"
structopt = { version = "0.3", default-features = false }
time = { version = "0.2", features = ["serde", "std"] }
hashbrown = "0.8"
hashbrown = "0.9"
concread = "^0.2.5"
# concread = { version = "^0.2.5", features = ["simd_support"] }
# concread = { path = "../../concread" }
@ -76,23 +76,23 @@ concread = "^0.2.5"
sshkeys = "0.3"
rpassword = "4.0"
rpassword = "5.0"
num_cpus = "1.10"
idlset = { version = "0.1" , features = ["use_smallvec"] }
# idlset = { path = "../../idlset", features = ["use_smallvec"] }
zxcvbn = "2.0"
base64 = "0.12"
base64 = "0.13"
ldap3_server = "0.1"
# ldap3_server = { path = "../../ldap3_server" }
webauthn-rs = "0.3.0-alpha.1"
webauthn-rs = "0.3.0-alpha.5"
# webauthn-rs = { path = "../../webauthn-rs" }
libc = "0.2"
users = "0.10"
users = "0.11"
smartstring = { version = "0.2", features = ["serde"] }
@ -102,7 +102,7 @@ smartstring = { version = "0.2", features = ["serde"] }
[dev-dependencies]
criterion = "0.3"
# For testing webauthn
webauthn-authenticator-rs = "0.3.0-alpha.1"
webauthn-authenticator-rs = "0.3.0-alpha.5"
# webauthn-authenticator-rs = { path = "../../webauthn-authenticator-rs" }
[dev-dependencies.cargo-husky]

View file

@ -9,13 +9,15 @@ RUN zypper -vv ref && \
gcc \
clang lld \
make automake autoconf \
libopenssl-devel pam-devel && \
libopenssl-devel pam-devel \
sccache && \
zypper clean -a
COPY . /usr/src/kanidm
WORKDIR /usr/src/kanidm/kanidmd
ARG SCCACHE_REDIS
RUN ln -s -f /usr/bin/clang /usr/bin/cc && \
ln -s -f /usr/bin/ld.lld /usr/bin/ld
@ -25,10 +27,22 @@ RUN if [ "$(uname -m)" == "x86_64" ]; \
if [ "$(uname -m)" == "aarch64" ]; \
then export RUSTFLAGS=''; \
fi; \
if [ "${SCCACHE_REDIS}" != "" ]; \
then export CC="/usr/bin/sccache /usr/bin/clang"; export RUSTC_WRAPPER=sccache; sccache --start-server; \
else export CC="/usr/bin/clang"; \
fi; \
echo $RUSTC_WRAPPER; \
echo $RUSTFLAGS; \
CC=/usr/bin/clang RUSTC_BOOTSTRAP=1 \
echo $CC; \
RUSTC_BOOTSTRAP=1 \
cargo build --features=concread/simd_support,libsqlite3-sys/bundled \
--release
--release; \
if [ "${SCCACHE_REDIS}" != "" ]; \
then sccache -s; \
fi; \
echo $RUSTFLAGS; \
echo $CC; \
echo $RUSTC_WRAPPER;
FROM ${BASE_IMAGE}
LABEL mantainer william@blackhats.net.au

View file

@ -1,10 +1,13 @@
use crate::actors::v1_read::{LdapRequestMessage, QueryServerReadV1};
use crate::ldap::{LdapBoundToken, LdapResponseState};
use openssl::ssl::{SslAcceptor, SslAcceptorBuilder};
use core::pin::Pin;
use openssl::ssl::{Ssl, SslAcceptor, SslAcceptorBuilder};
use tokio_openssl::SslStream;
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
// use ldap3_server::simple::*;
// use ldap3_server::proto::LdapMsg;
use ldap3_server::LdapCodec;
// use std::convert::TryFrom;
use std::marker::Unpin;
@ -12,6 +15,7 @@ use std::net;
use std::str::FromStr;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpListener;
// use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_util::codec::{FramedRead, FramedWrite};
use uuid::Uuid;
@ -91,26 +95,30 @@ async fn client_process<W: AsyncWrite + Unpin, R: AsyncRead + Unpin>(
}
};
}
// We now are leaving, so any cleanup done here.
}
async fn tls_acceptor(
mut listener: TcpListener,
listener: TcpListener,
tls_parms: SslAcceptor,
qe_r_ref: &'static QueryServerReadV1,
) {
// Do we need to do the silly ssl leak?
loop {
match listener.accept().await {
Ok((tcpstream, paddr)) => {
let res = tokio_openssl::accept(&tls_parms, tcpstream).await;
let tlsstream = match res {
Ok(ts) => ts,
// From the parms we need to create an SslContext.
let mut tlsstream = match Ssl::new(tls_parms.context())
.and_then(|tls_obj| SslStream::new(tls_obj, tcpstream))
{
Ok(ta) => ta,
Err(e) => {
error!("tls handshake error, continuing -> {:?}", e);
error!("tls setup error, continuing -> {:?}", e);
continue;
}
};
if let Err(e) = SslStream::accept(Pin::new(&mut tlsstream)).await {
error!("tls accept error, continuing -> {:?}", e);
continue;
};
let (r, w) = tokio::io::split(tlsstream);
let r = FramedRead::new(r, LdapCodec);
let w = FramedWrite::new(w, LdapCodec);
@ -123,7 +131,7 @@ async fn tls_acceptor(
}
}
async fn acceptor(mut listener: TcpListener, qe_r_ref: &'static QueryServerReadV1) {
async fn acceptor(listener: TcpListener, qe_r_ref: &'static QueryServerReadV1) {
loop {
match listener.accept().await {
Ok((tcpstream, paddr)) => {

View file

@ -786,7 +786,9 @@ mod tests {
Ok(AuthState::Success(_)) => {}
_ => panic!(),
};
assert!(async_rx.try_recv().is_err());
drop(async_tx);
assert!(async_rx.blocking_recv().is_none());
audit.write_log();
}
@ -979,7 +981,8 @@ mod tests {
};
}
assert!(async_rx.try_recv().is_err());
drop(async_tx);
assert!(async_rx.blocking_recv().is_none());
audit.write_log();
}
@ -1104,8 +1107,8 @@ mod tests {
}
// Check the async counter update was sent.
match async_rx.try_recv() {
Ok(DelayedAction::WebauthnCounterIncrement(_)) => {}
match async_rx.blocking_recv() {
Some(DelayedAction::WebauthnCounterIncrement(_)) => {}
_ => assert!(false),
}
@ -1174,7 +1177,8 @@ mod tests {
};
}
assert!(async_rx.try_recv().is_err());
drop(async_tx);
assert!(async_rx.blocking_recv().is_none());
audit.write_log();
}
}

View file

@ -30,24 +30,22 @@ use crate::idm::delayed::{
use kanidm_proto::v1::OperationError;
use kanidm_proto::v1::RadiusAuthToken;
// use kanidm_proto::v1::TOTPSecret as ProtoTOTPSecret;
use kanidm_proto::v1::SetCredentialResponse;
use kanidm_proto::v1::UnixGroupToken;
use kanidm_proto::v1::UnixUserToken;
// use std::sync::Arc;
// use crossbeam::channel::{unbounded, Sender, Receiver, TryRecvError};
#[cfg(test)]
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::mpsc::{
unbounded_channel as unbounded, UnboundedReceiver as Receiver, UnboundedSender as Sender,
};
use tokio::sync::Semaphore;
// SemaphorePermit
use async_std::task;
#[cfg(test)]
use core::task::{Context, Poll};
#[cfg(test)]
use futures::task as futures_task;
use concread::bptree::{BptreeMap, BptreeMapWriteTxn};
use concread::hashmap::HashMap;
use rand::prelude::*;
@ -262,15 +260,23 @@ impl IdmServer {
impl IdmServerDelayed {
#[cfg(test)]
pub fn is_empty_or_panic(&mut self) {
assert!(self.async_rx.try_recv().is_err());
let waker = futures_task::noop_waker();
let mut cx = Context::from_waker(&waker);
match self.async_rx.poll_recv(&mut cx) {
Poll::Pending | Poll::Ready(None) => {}
Poll::Ready(Some(_m)) => panic!("Task queue not empty"),
}
}
#[cfg(test)]
pub(crate) fn try_recv(&mut self) -> Result<DelayedAction, OperationError> {
self.async_rx.try_recv().map_err(|e| match e {
TryRecvError::Empty => OperationError::InvalidState,
TryRecvError::Closed => OperationError::QueueDisconnected,
})
let waker = futures_task::noop_waker();
let mut cx = Context::from_waker(&waker);
match self.async_rx.poll_recv(&mut cx) {
Poll::Pending => Err(OperationError::InvalidState),
Poll::Ready(None) => Err(OperationError::QueueDisconnected),
Poll::Ready(Some(m)) => Ok(m),
}
}
pub(crate) async fn process_all(&mut self, server: &'static QueryServerWriteV1) {

View file

@ -791,7 +791,12 @@ impl QueryServer {
pub async fn read_async(&self) -> QueryServerReadTransaction<'_> {
// We need to ensure a db conn will be available
let db_ticket = self.db_tickets.acquire().await;
#[allow(clippy::expect_used)]
let db_ticket = self
.db_tickets
.acquire()
.await
.expect("unable to aquire db_ticket for qsr");
QueryServerReadTransaction {
be_txn: self.be.read(),
@ -808,10 +813,20 @@ impl QueryServer {
}
pub async fn write_async(&self, ts: Duration) -> QueryServerWriteTransaction<'_> {
// We need to ensure a db conn will be available
let db_ticket = self.db_tickets.acquire().await;
// Guarantee we are the only writer on the thread pool
let write_ticket = self.write_ticket.acquire().await;
#[allow(clippy::expect_used)]
let write_ticket = self
.write_ticket
.acquire()
.await
.expect("unable to aquire writer_ticket for qsw");
// We need to ensure a db conn will be available
#[allow(clippy::expect_used)]
let db_ticket = self
.db_tickets
.acquire()
.await
.expect("unable to aquire db_ticket for qsw");
// let schema_write = self.schema.write().await;
let schema_write = self.schema.write();

View file

@ -38,13 +38,25 @@ pub fn password_from_random() -> String {
}
pub fn readable_password_from_random() -> String {
let trng = thread_rng();
let mut trng = thread_rng();
format!(
"{}-{}-{}-{}",
trng.sample_iter(&DistinctAlpha).take(4).collect::<String>(),
trng.sample_iter(&DistinctAlpha).take(4).collect::<String>(),
trng.sample_iter(&DistinctAlpha).take(4).collect::<String>(),
trng.sample_iter(&DistinctAlpha).take(4).collect::<String>(),
(&mut trng)
.sample_iter(&DistinctAlpha)
.take(4)
.collect::<String>(),
(&mut trng)
.sample_iter(&DistinctAlpha)
.take(4)
.collect::<String>(),
(&mut trng)
.sample_iter(&DistinctAlpha)
.take(4)
.collect::<String>(),
(&mut trng)
.sample_iter(&DistinctAlpha)
.take(4)
.collect::<String>(),
)
}