WIP: Routine Unresponsive due to Deadlock #51

Draft
modulatingforce wants to merge 10 commits from issue-routine-lock into routines-functionality
10 changed files with 1555 additions and 111 deletions

View file

@ -1,4 +1,7 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]
[env]
# Based on https://doc.rust-lang.org/cargo/reference/config.html
OtherBots = "Supibot,buttsbot,PotatBotat,StreamElements,yuumeibot"

1122
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -7,7 +7,7 @@ edition = "2021"
[dependencies]
dotenv = "0.15.0"
tokio = { version = "1.33.0", features = ["full"] }
tokio = { version = "1.33.0", features = ["full", "tracing"] }
twitch-irc = "5.0.1"
rand = { version = "0.8.5", features = [] }
futures = "0.3"
@ -15,6 +15,8 @@ async-trait = "0.1.77"
async-recursion = "1.1.0"
casual_logger = "0.6.5"
chrono = "0.4.35"
tokio-console = "0.1.10"
console-subscriber = "0.2.0"
Review

@notohh & @mzntori - these two crates I added tokio-console and console-subscriber created very large changes in Cargo.lock . Do you see this as an issue?

I'm going to shrug it off cuz LULE I don't know - but this is really helpful

If you're not sure or you may need time to look, I'll just shrug this off for now and say let's add

More information on these crates:

@notohh & @mzntori - these two crates I added `tokio-console` and `console-subscriber` created very large changes in `Cargo.lock` . **Do you see this as an issue?** I'm going to shrug it off cuz LULE I don't know - but this is really helpful If you're not sure or you may need time to look, I'll just shrug this off for now and say let's add More information on these crates: - https://git.flake.sh/modulatingforce/forcebot_rs/pulls/51#issuecomment-973
[lib]

View file

@ -12,12 +12,18 @@ pub type BotAR = Arc<RwLock<BotInstance>>;
pub type ActAR = Arc<RwLock<BotAction>>;
pub type RoutineAR = Arc<RwLock<Routine>>;
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct ExecBodyParams {
pub bot : BotAR,
pub msg : PrivmsgMessage,
pub parent_act : Option<ActAR> ,
pub curr_act : ActAR ,
pub curr_act : Option<ActAR> ,
}
#[derive(Debug,Eq,PartialEq,Hash)]
pub enum ParamLevel {
Parent,
Current,
}
@ -27,8 +33,8 @@ impl ExecBodyParams {
// pub async fn get_parent_module(&self) -> BotModule {
pub async fn get_module(&self) -> BotModule {
let curr_act = Arc::clone(&self.curr_act);
let parent_act_lock = curr_act.read().await;
let curr_acta = Arc::clone(&(self.curr_act.clone().unwrap()));
let parent_act_lock = curr_acta.read().await;
let act = &(*parent_act_lock);
match act {
BotAction::C(c) => {
@ -50,16 +56,45 @@ impl ExecBodyParams {
}
}
pub async fn get_channel(&self) -> Option<Channel> {
// pub async fn get_channel(&self) -> Option<Channel> {
pub fn get_channel(&self) -> Option<Channel> {
// THIS IS INCORRECT - BELOW MAY BE PULLING THE PARENT BOTACTION
// NOT THE CURRENT BOT ACTION
dbg!("Core > ExecBodyParams > GetChannels START");
dbg!("!! [x] Document - After SUCCESS message was sent to chat");
dbg!(">> BotActionAR - RwLock from botmodules.rs::929:46 - current_readers = 1 ");
dbg!(">> RoutineAR - RwLock from botmodules.rs::1261:32 - current_readers = 1 ");
dbg!(">> join_handle - RwLock from botmodules.rs::1226:46 - current_readers = 0 ");
dbg!(">> BotInstanceAR - RwLock from botinstance.rs::150:28 - current_readers = 2");
let curr_act = Arc::clone(&self.curr_act);
let parent_act_lock = curr_act.read().await;
let act = &(*parent_act_lock);
match act {
let curr_act = Arc::clone(&self.curr_act.clone().unwrap());
// let curr_act_lock = curr_act.read().await;
// Note : The below `curr_act.blocking_read()` is not possible. I get the following
// during runtime in this areas
/*
thread 'tokio-runtime-worker' panicked at src\core\bot_actions.rs:66:38:
Cannot block the current thread from within a runtime. This happens because a function attempted to block the current thread while the thread is being used to drive asynchronous tasks.
*/
let curr_act_lock = curr_act.blocking_read();
dbg!("Core > ExecBodyParams > After Creating ExecBodyParams.current_act.read() Guard ");
dbg!("!! [x] Document - After SUCCESS message was sent to chat");
dbg!(">> BotActionAR - RwLock from botmodules.rs::929:46 - current_readers = 1 ");
dbg!(">> RoutineAR - RwLock from botmodules.rs::1261:32 - current_readers = 1 ");
dbg!(">> join_handle - RwLock from botmodules.rs::1226:46 - current_readers = 0 ");
dbg!(">> BotInstanceAR - RwLock from botinstance.rs::150:28 - current_readers = 2");
let act = &(*curr_act_lock);
dbg!("Core > ExecBodyParams > Using the Read Guard ");
dbg!("!! [x] Document - After SUCCESS message was sent to chat");
dbg!(">> BotActionAR - RwLock from botmodules.rs::929:46 - current_readers = 1 ");
dbg!(">> RoutineAR - RwLock from botmodules.rs::1261:32 - current_readers = 1");
dbg!(">> join_handle - RwLock from botmodules.rs::1226:46 - current_readers = Not Listed ");
dbg!(">> BotInstanceAR - RwLock from botinstance.rs::150:28 - current_readers = 2");
let out = match act {
BotAction::C(_) => {
// let temp = c.module.clone();
// Some(temp)
@ -74,10 +109,35 @@ impl ExecBodyParams {
BotAction::R(r) => {
// let temp = r.module.clone();
// Some(temp)
Some(r.read().await.channel.clone())
dbg!("Core > ExecBodyParams > GetChannels - routine identified");
dbg!("!! [x] Document");
dbg!(">> BotActionAR - RwLock from botmodules.rs::930:46 - current_readers = 2");
dbg!(">> RoutineAR - RwLock from botmodules.rs::1262:32 - current_readers = 1");
dbg!(">> join_handle - RwLock from botmodules.rs::1226:46 - current_readers = 0");
dbg!(">> BotInstanceAR - RwLock from botinstance.rs::150:28 - current_readers = 1");
// => 03.30 - Just before deadlock
// dbg!("ISSUE : RoutineAR - RwLock from botmodules.rs::1261:32 - current_readers = 1");
// let out = Some(r.read().await.channel.clone());
dbg!("ISSUE > Never makes it after the following read guard");
// let guard = r.read().await;
let guard = r.blocking_read();
dbg!("ISSUE RESOLVED > If passed htis point");
let channel = guard.channel.clone();
drop(guard);
let out = Some(channel);
// => 03.30 - This isn't reached because of the Deadlock
dbg!(">> Just after Potential Deadlock Lock");
out
}
// _ => None
}
};
out
}

View file

@ -40,7 +40,7 @@ pub struct Channel(pub String);
use super::bot_actions::ExecBodyParams;
use super::botmodules::StatusType;
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct BotManagers {
pub identity: Arc<RwLock<IdentityManager>>,
pub chat: Chat,
@ -70,6 +70,7 @@ impl<T: Clone> ArcBox<T> {
}
}
#[derive(Debug)]
pub struct BotInstance {
pub prefix: char,
pub bot_channel: Channel,
@ -405,7 +406,7 @@ impl BotInstance {
bot : Arc::clone(&bot),
msg : (*msg).clone(),
parent_act : None,
curr_act : Arc::clone(&act_clone),
curr_act : Some(Arc::clone(&act_clone)),
};
// When sending a BotMsgTypeNotif, send_botmsg does Roles related validation as required
@ -463,7 +464,7 @@ impl BotInstance {
bot : Arc::clone(&bot),
msg : (*msg).clone(),
parent_act : None,
curr_act : Arc::clone(&act_clone),
curr_act : Some(Arc::clone(&act_clone)),
};
@ -494,7 +495,7 @@ impl BotInstance {
bot : Arc::clone(&bot),
msg : (*msg).clone(),
parent_act : None,
curr_act : Arc::clone(&act_clone),
curr_act : Some(Arc::clone(&act_clone)),
};
@ -520,7 +521,7 @@ impl BotInstance {
bot : a,
msg : msg.clone() ,
parent_act : None,
curr_act : Arc::clone(&act_clone),
curr_act : Some(Arc::clone(&act_clone)),
}).await;
botlog::trace(
@ -569,7 +570,7 @@ impl BotInstance {
bot : a,
msg : msg.clone() ,
parent_act : None,
curr_act : Arc::clone(&act_clone),
curr_act : Some(Arc::clone(&act_clone)),
} ).await;
}

View file

@ -28,6 +28,7 @@ use core::panic;
use std::borrow::Borrow;
use std::borrow::BorrowMut;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter};
use std::ops::DerefMut;
use std::sync::Arc;
@ -56,6 +57,7 @@ use crate::core::bot_actions;
use std::hash::{Hash, Hasher};
use super::bot_actions::ActAR;
use super::bot_actions::ParamLevel;
use super::bot_actions::RoutineAR;
use super::identity::ChatBadge;
@ -470,6 +472,7 @@ pub enum StatusType {
Disabled(StatusLvl),
}
#[derive(Debug)]
pub enum BotAction {
C(BotCommand),
L(Listener),
@ -505,6 +508,7 @@ pub struct BotCommand {
impl BotCommand {
pub async fn execute(&self, params : ExecBodyParams) {
// This is how BotCommand implements their Working exec_body
(*self.exec_body)(params).await;
}
}
@ -532,6 +536,12 @@ impl BotActionTrait for BotCommand {
}
}
impl Debug for BotCommand {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?} {:?} {:?} {:?} {:?}", self.module, self.command, self.alias, self.help, self.required_roles)
Review

Holy I love this idea! 🤤 with the others too!

Holy I love this idea! 🤤 with the others too!
}
}
pub struct Listener {
pub module: BotModule,
pub name: String,
@ -579,6 +589,12 @@ impl BotActionTrait for Listener {
}
}
impl Debug for Listener {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?} {:?} {:?}", self.module, self.name, self.help)
}
}
// #[derive(Debug, PartialEq, Eq, Hash, Clone)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub enum RoutineAttr {
@ -617,6 +633,7 @@ pub enum RoutineAttr {
*/
// For some key statuses and in particular Stopping to Gracefully stop
#[derive(Debug)]
pub enum RoutineSignal {
Stopping, // Gracefully Stopping
Stopped, // When cancelling or aborting, this also is represented by Stopped
@ -624,13 +641,18 @@ pub enum RoutineSignal {
NotStarted,
}
// #[derive(Debug)]
pub type ExecBodyParamsAr = Arc<RwLock<ExecBodyParams>>;
pub struct Routine {
pub name : String ,
pub module : BotModule , // from() can determine this if passed parents_params
pub channel : Channel , // Requiring some channel context
exec_body: bot_actions::actions_util::ExecBody,
pub parent_params : ExecBodyParams ,
// exec_body: Arc<RwLock<bot_actions::actions_util::ExecBody>>,
// exec_body : fn(ExecBodyParams),
Review

Code Fix Involved :

  • Within Blocking Non-Async functions, when accessing Arc<RwLock<BotAction>> , use blocking_read() & blocking_write()
  • ExecBodyParams::get_channel() be defined as non-async
  • Routine definition involve :
    • exec_body : fn(Arc<RwLock<ExecBodyParams>>)
    • parent_params : Arc<RwLock<ExecBodyParams>>
  • Routine::start() involves a tokio::task::spawn_blocking execute the loopbody()
### Code Fix Involved : - [x] Within Blocking Non-Async functions, when accessing `Arc<RwLock<BotAction>>` , use `blocking_read()` & `blocking_write()` - [x] `ExecBodyParams::get_channel()` be defined as non-async - [x] `Routine` definition involve : - [x] `exec_body : fn(Arc<RwLock<ExecBodyParams>>)` - [x] `parent_params : Arc<RwLock<ExecBodyParams>>` - [x] `Routine::start()` involves a `tokio::task::spawn_blocking` execute the `loopbody()`
exec_body : fn(ExecBodyParamsAr),
// pub parent_params : ExecBodyParams ,
// pub parent_params : Arc<RwLock<ExecBodyParams>> ,
pub params : HashMap<ParamLevel,ExecBodyParamsAr>,
pub join_handle : Option<Arc<RwLock<JoinHandle<RoutineAR>>>> ,
start_time : Option<DateTime<Local>> ,
pub complete_iterations : i64 ,
@ -641,6 +663,23 @@ pub struct Routine {
pub self_act_ar : Option<ActAR> ,
}
// implement Debug manually witouth `exec_body` since you cant debug `ExecBody`.
impl Debug for Routine {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.name)?;
write!(f, "{:?}", self.module)?;
write!(f, "{:?}", self.channel)?;
write!(f, "{:?}", self.params)?;
write!(f, "{:?}", self.join_handle)?;
write!(f, "{:?}", self.start_time)?;
write!(f, "{:?}", self.complete_iterations)?;
write!(f, "{:?}", self.remaining_iterations)?;
write!(f, "{:?}", self.routine_attr)?;
write!(f, "{:?}", self.internal_signal)?;
write!(f, "{:?}", self.self_routine_ar)?;
write!(f, "{:?}", self.self_act_ar)
}
}
impl Routine {
@ -864,8 +903,9 @@ impl Routine {
module : BotModule ,
channel : Channel,
routine_attr : Vec<RoutineAttr> ,
exec_body : bot_actions::actions_util::ExecBody ,
parent_params : ExecBodyParams
// exec_body : Arc<RwLock<bot_actions::actions_util::ExecBody>> ,
exec_body : fn(Arc<RwLock<ExecBodyParams>>) ,
parent_params : Arc<RwLock<ExecBodyParams>>
) -> Result<
Arc<RwLock<Routine>>,
String
@ -875,12 +915,27 @@ impl Routine {
Routine::validate_attr(&routine_attr).await?;
let mut params = HashMap::new();
params.insert(ParamLevel::Parent,parent_params.clone());
let pparam_clone = parent_params.clone();
let parent_guard = pparam_clone.read().await;
let curr_params = ExecBodyParams {
bot : parent_guard.bot.clone(),
msg : parent_guard.msg.clone(),
parent_act : parent_guard.curr_act.clone() ,
curr_act : None,
};
params.insert(ParamLevel::Current,Arc::new(RwLock::new(curr_params)));
let routine_ar = Arc::new(RwLock::new(Routine {
name ,
module ,
channel ,
exec_body ,
parent_params ,
// parent_params ,
params,
join_handle : None ,
start_time : None ,
complete_iterations : 0 ,
@ -897,6 +952,8 @@ impl Routine {
// 2. Update the current self_act_ar
mut_lock.self_act_ar = Some(Arc::new(RwLock::new(BotAction::R(routine_ar.clone()))));
mut_lock.params.get(&ParamLevel::Current).unwrap().write().await.curr_act = Some(Arc::new(RwLock::new(BotAction::R(routine_ar.clone()))));
Ok(routine_ar.clone())
// return Ok(Arc::new(RwLock::new(Routine {
@ -982,7 +1039,7 @@ impl Routine {
Some(format!(
"Routine > start() > (In Tokio Spawn)",
)),
Some(&trg_routine_ar.read().await.parent_params.msg),
Some(&trg_routine_ar.read().await.params.get(&ParamLevel::Parent).unwrap().read().await.msg),
);
Log::flush();
@ -995,7 +1052,7 @@ impl Routine {
Some(format!(
"Routine > start() > (In Tokio Spawn)",
)),
Some(&trg_routine_ar.read().await.parent_params.msg),
Some(&trg_routine_ar.read().await.params.get(&ParamLevel::Parent).unwrap().read().await.msg),
);
Log::flush();
@ -1078,7 +1135,7 @@ impl Routine {
"Routine > start() > (In Tokio Spawn) > {:?}",
trg_routine_ar.read().await.module
)),
Some(&trg_routine_ar.read().await.parent_params.msg),
Some(&trg_routine_ar.read().await.params.get(&ParamLevel::Parent).unwrap().read().await.msg),
);
if let Some(dur) = delayduration {
@ -1087,6 +1144,8 @@ impl Routine {
{ // [x] Loop Initialization - Prior to Loop that calls Custom Routine Execution Body
let mut a = trg_routine_ar.write().await;
a.start_time = Some(chrono::offset::Local::now());
@ -1097,15 +1156,33 @@ impl Routine {
{
a.remaining_iterations = Some(iternum);
}
drop(a);
}
loop { // [x] Routine loop
// [x] execution body
// trg_routine_ar.read().await.loopbody().await;
{
trg_routine_ar.write().await.loopbody().await;
let trg_routine_ar_clone = trg_routine_ar.clone();
// trg_routine_ar.write().await.loopbody().await;
let loopbodyspawn = tokio::task::spawn_blocking( move || {
// let routine_ar_clone = Arc::clone(&trg_routine_ar);
// trg_routine_ar
// trg_routine_ar_clone.blocking_write().loopbody();
let guard1 = trg_routine_ar_clone.blocking_read();
guard1.loopbody();
drop(guard1);
});
loopbodyspawn.await.unwrap();
// trg_routine_ar.write().await.loopbody()
}
@ -1124,6 +1201,7 @@ impl Routine {
if i > 0 { a.remaining_iterations = Some(i-1) ; }
else { break ; } // if remaining iterations is 0, exit
}
drop(a);
}
@ -1152,6 +1230,8 @@ impl Routine {
}
dbg!("SUCCESS! Routinecompleted");
botlog::trace(
format!(
@ -1164,7 +1244,7 @@ impl Routine {
"Routine > start() > (In Tokio Spawn) > {:?}",
trg_routine_ar.read().await.module
)),
Some(&trg_routine_ar.read().await.parent_params.msg),
Some(&trg_routine_ar.read().await.params.get(&ParamLevel::Parent).unwrap().read().await.msg),
);
botlog::trace(
@ -1179,7 +1259,7 @@ impl Routine {
"Routine > start() > (In Tokio Spawn) > {:?}",
trg_routine_ar.read().await.module
)),
Some(&trg_routine_ar.read().await.parent_params.msg),
Some(&trg_routine_ar.read().await.params.get(&ParamLevel::Parent).unwrap().read().await.msg),
);
Log::flush();
@ -1190,18 +1270,31 @@ impl Routine {
{ // Recommendation to ensure a clean update is to use one write() lock that was awaited
// - We can isolate the write lock by ensuring it's in it's own block
let mut lock = trg_routine_arout.write().await;
// => 03.30 - involved with Problem Deadlock Current Readers = 0
let mut lock = trg_routine_arout.write().await;
lock.join_handle = Some(Arc::new(RwLock::new(join_handle)));
lock.internal_signal = RoutineSignal::Started;
drop(lock); // => 03.30 - added this to try to address Deadlock issue
}
trg_routine_arout.write().await.internal_signal = RoutineSignal::Started;
// trg_routine_arout.write().await.internal_signal = RoutineSignal::Started;
return Ok(trg_routine_arout);
}
async fn loopbody(&mut self)
// pub async fn execute(routine:&Routine, params : ExecBodyParams) {
// (routine.exec_body)(params).await;
// }
// fn (){
// }
// async fn loopbody(&mut self)
// fn loopbody(&mut self)
fn loopbody(&self)
// [x] => 03.27 - COMPLETED
{
botlog::trace(
@ -1213,20 +1306,93 @@ impl Routine {
);
Log::flush();
// => 03.30 - involved with Problem Deadlock Current Readers = 1
// - Below self appears to be a Arc<RwLock<&mut Routine>>
let self_ar = Arc::new(RwLock::new(self));
let self_ar = Arc::new(RwLock::new(self));
// => 03.31 - Temporarily removing the current_act / parrent_act functionality
/*
{
// let mutlock = self_ar.read().await;
let mutlock = self_ar.blocking_read();
mutlock.parent_params.blocking_write().parent_act = Some(mutlock.parent_params.blocking_read().curr_act.clone());
mutlock.parent_params.blocking_write().curr_act = mutlock.self_act_ar.to_owned().unwrap();
drop(mutlock); // => 03.30 - Added to address Deadlock issue
}
*/
dbg!("Core > Before Guards");
// `self_ar` is the routine you want to read in the execbody i think, but its used to call that execbody.
// So execbody waits for itself to finish.
// (self_ar.read().await.exec_body)(
// self_ar.read().await.parent_params.clone()
// ).await;
// let parent_params = {
// let guard1 = self_ar.blocking_read();
// let parent_params = guard1.params.get(&ParamLevel::Parent).unwrap().clone();
// drop(guard1);
// parent_params
// };
let curr_params = {
let guard1 = self_ar.blocking_read();
let curr_params = guard1.params.get(&ParamLevel::Current).unwrap().clone();
drop(guard1);
curr_params
};
dbg!("Core > Guarding and will Execute Child Execution Body");
dbg!(">> BotActionAR - RwLock from botmodules.rs::929:46 - current_readers = 0 ");
dbg!(">> RoutineAR - RwLock from botmodules.rs::1261:32 - current_readers = Not Listed");
dbg!(">> BotInstanceAR - RwLock from botinstance.rs::150:28 - current_readers = 3 ");
// {
// // The below starts the Read Lock that conlicts
// let guard2 = self_ar.read().await;
// dbg!("Core > Guarded & Executing Child Execution Body");
// dbg!("!! [x] Document ");
// dbg!(">> BotActionAR - RwLock from botmodules.rs::929:46 - current_readers = 0 ");
// dbg!(">> RoutineAR - RwLock from botmodules.rs::1261:32 - current_readers = 1");
// dbg!(">> BotInstanceAR - RwLock from botinstance.rs::150:28 - current_readers = 2 ");
// // this way seems to be having issues for Routine > Loopbody
// // in particular, because by this point we have a ReadGuard on Routine
// // somehow the read guard is conflicting with a read guard in the underlying
// // fn?
// (guard2.exec_body)(parent_params).await;
// drop(guard2);
// }
{
let mut mutlock = self_ar.write().await;
mutlock.parent_params.parent_act = Some(mutlock.parent_params.curr_act.clone());
mutlock.parent_params.curr_act = mutlock.self_act_ar.to_owned().unwrap();
let guard2 = self_ar.blocking_read();
let exec_body_ar = guard2.exec_body.clone();
drop(guard2);
// let guard3 = exec_body_ar.read().await;
// (guard3)(parent_params).await;
// drop(guard3);
// [ ] May need to do a blocking async of this?
exec_body_ar(curr_params);
}
(self_ar.read().await.exec_body)(
self_ar.read().await.parent_params.clone()
).await;
// (self_ar.read().await.exec_body)(
// parent_params
// ).await;
dbg!("Core > After Execution Body is completed");
// (self.exec_body)(
// self.parent_params.clone()
// ).await;
@ -1242,6 +1408,7 @@ impl Routine {
{
let mut self_lock = self_rw.write().await;
self_lock.internal_signal = RoutineSignal::Stopping;
drop(self_lock);
}
@ -1257,7 +1424,7 @@ impl Routine {
"Routine > stop() > {:?}",
self_lock.module
)),
Some(&self_lock.parent_params.msg),
Some(&self_lock.params.get(&ParamLevel::Parent).unwrap().read().await.msg),
);
Log::flush();
@ -1302,6 +1469,7 @@ impl Routine {
{
let mut lock_mut = self_rw.write().await;
lock_mut.internal_signal = RoutineSignal::Stopped;
drop(lock_mut);
}
}
@ -1318,7 +1486,7 @@ impl Routine {
"Routine > cancel() > {:?}",
self_lock.module
)),
Some(&self_lock.parent_params.msg),
Some(&self_lock.params.get(&ParamLevel::Parent).unwrap().read().await.msg),
);
Log::flush();
@ -1358,9 +1526,11 @@ impl Routine {
{
let mut self_lock = self_rw.write().await;
self_lock.cancel().await?;
drop(self_lock);
} else {
let mut self_lock = self_rw.write().await;
self_lock.stop().await?;
drop(self_lock);
}
Routine::start(self_rw.clone()).await?;
@ -1377,7 +1547,7 @@ impl Routine {
"Routine > restart() > {:?}",
self_lock.module
)),
Some(&self_lock.parent_params.msg),
Some(&self_lock.params.get(&ParamLevel::Parent).unwrap().read().await.msg),
);
Log::flush();
@ -1411,7 +1581,7 @@ impl Routine {
"Routine > restart() > {:?}",
self_lock.module
)),
Some(&self_lock.parent_params.msg),
Some(&self_lock.params.get(&ParamLevel::Parent).unwrap().read().await.msg),
);
Log::flush();
@ -1438,6 +1608,7 @@ impl Routine {
{
let mut self_lock = self_rw.write().await;
self_lock.routine_attr = routine_attr;
drop(self_lock);
}
// let self_rw = Arc::new(RwLock::new(self));
@ -1454,7 +1625,7 @@ impl Routine {
"Routine > restart() > {:?}",
self_lock.module
)),
Some(&self_lock.parent_params.msg),
Some(&self_lock.params.get(&ParamLevel::Parent).unwrap().read().await.msg),
);
Log::flush();
@ -1472,6 +1643,7 @@ impl Routine {
type StatusdbEntry = (ModGroup, Vec<StatusType>);
type ModuleActions = Vec<Arc<RwLock<BotAction>>>;
#[derive(Debug)]
pub struct ModulesManager {
statusdb: Arc<RwLock<HashMap<BotModule, StatusdbEntry>>>,
pub botactions: Arc<RwLock<HashMap<BotModule, ModuleActions>>>,

View file

@ -27,7 +27,7 @@ use super::identity;
use async_recursion::async_recursion;
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct Chat {
pub ratelimiters: Arc<Mutex<HashMap<Channel, RateLimiter>>>, // used to limit messages sent per channel
pub client: TwitchIRCClient<TCPTransport<TLS>, StaticLoginCredentials>,

View file

@ -698,7 +698,7 @@ pub enum Permissible {
type UserRolesDB = HashMap<String, Arc<RwLock<Vec<UserRole>>>>;
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct IdentityManager {
special_roles_users: Arc<RwLock<UserRolesDB>>,
}

View file

@ -139,21 +139,40 @@ async fn countdown_chnl_v1(params : ExecBodyParams) {
// [ ] 1. Create a Routine & start a routine
let params_ar = Arc::new(RwLock::new(params));
// let parentmodule = params.get_parent_module().await;
let module = params.get_module().await;
let channel = params.get_channel().await;
let module = params_ar.read().await.get_module().await;
// let channel = params.get_channel().await;
// let channel = params.get_channel().await;
// let channel = params.get_channel();
let channel_task_grabber = tokio::task::spawn_blocking( move || {
let params_clone = params_ar.clone();
let channel = params_clone.blocking_read().get_channel();
drop(params_clone);
(channel,params_ar)
});
let (channel,params_ar) = channel_task_grabber.await.unwrap();
let routine_attr = vec![
// RoutineAttr::RunOnce
RoutineAttr::MaxIterations(5),
RoutineAttr::LoopDuration(Duration::from_secs(1))
];
// let exec_body = actions_util::asyncbox(rtestbody);
let exec_body = actions_util::asyncbox(innertester); // <-- 03.27 - when below is uncommented, this is throwing an issue
// let exec_body = actions_util::asyncbox(innertester); // <-- 03.27 - when below is uncommented, this is throwing an issue
let exec_body = innertester;
async fn innertester(params : ExecBodyParams) {
// async fn innertester(params : ExecBodyParams) {
fn innertester(params : Arc<RwLock<ExecBodyParams>>) {
{
let curract_guard = params.curr_act.read().await;
// let curract_guard = params.curr_act.read().await;
let paramguard1 = params.blocking_read();
let curract = paramguard1.curr_act.clone().unwrap();
let curract_guard = curract.blocking_read();
// let logmsg_botact = match *params.curr_act.read().await {
@ -168,16 +187,20 @@ async fn countdown_chnl_v1(params : ExecBodyParams) {
botlog::trace(
format!("Params > Curr_act type : {:?}", logmsg_botact).as_str(),
Some("Experiments003 > countdown_chnl()".to_string()),
Some(&params.msg),
Some(&params.blocking_read().msg),
);
Log::flush();
}
{
let bot = Arc::clone(&params.bot);
let botlock = bot.read().await;
let bot = Arc::clone(&params.blocking_read().bot);
// let botlock = bot.read().await;
let botlock = bot.blocking_read();
let curract_guard = params.curr_act.write().await;
// let curract_guard = params.curr_act.write().await;
let paramguard1 = params.blocking_read();
let curract = paramguard1.curr_act.clone().unwrap();
let curract_guard = curract.blocking_write();
// let routine_lock = arr.write().await;
@ -206,10 +229,13 @@ async fn countdown_chnl_v1(params : ExecBodyParams) {
// println!("Remaining iterations > {}",a)
// }
let iterleft;
{
let routine_lock = arr.write().await;
let a = routine_lock.remaining_iterations;
println!("remaining iterations : {:?}", a);
// let routine_lock = arr.write().await;
// let routine_lock = arr.blocking_write();
let routine_lock = arr.blocking_read();
iterleft = routine_lock.remaining_iterations.unwrap_or(0);
println!("remaining iterations : {:?}", iterleft);
}
botlog::trace(
@ -244,26 +270,44 @@ async fn countdown_chnl_v1(params : ExecBodyParams) {
let chosen_channel = pick_a_channel(joinedchannels);
dbg!("SUCCESS", chosen_channel.clone());
botlog::trace(
format!("Picked a channel: {:?}", chosen_channel).as_str(),
Some("Experiments003 > countdown_chnl()".to_string()),
Some(&params.msg),
Some(&params.blocking_read().msg),
);
Log::flush();
// [ ] !! ISSUE : With this fix though, the custom fn is no longer
// async. This is an issue because chat module is async
// - There should be no need to change chat , as async allows
// us to multitask with messages
Review

Issue with the Fix

If child Routine fn are defined as regular fn rather than async , Custom Routine fn would not be able to call Chat.say() or similar functionality

Here for example, in a Custom Routine fn innertester(params : Arc<RwLock<ExecBodyParams>> , we can't call chat.say() in this area

### Issue with the Fix If child Routine `fn` are defined as regular `fn` rather than `async` , Custom Routine `fn` would not be able to call `Chat.say()` or similar functionality Here for example, in a Custom Routine `fn innertester(params : Arc<RwLock<ExecBodyParams>>` , we can't call `chat.say()` in this area
Review

Honestly I'm favoring keeping the fox and maybe enhancing Chat so it has non-async calls for say().

Keep in mind in implementing, we cannot call async calls in a blocking context. This means if I have a non async code block, code calls (e.g. fn) cannot be awaited on. In other words in non Async fn, I cannot use client.send().await. There are ways around this though (later notes)

I don't foresee issues with multitasking here if we enhance Chat so users essentially enqueue "messages" (consisting of BotMsgType and ExecParams) then the bot has a separate tokio::task running that processes those messages (i.e., triggering the message using given twitch-irc call like client.say() )

In addition with in place, when custom module developers define a routine, they don't have to worry about locks as within the context of a custom built Routine (in theory) should be blocking

  • ofc we still need to be mindful of locks initiated and managed at core level

I'm thinking of using async-channel crate
https://docs.rs/async-channel/latest/async_channel/fn.bounded.html

Honestly I'm favoring keeping the fox and maybe enhancing `Chat` so it has non-async calls for `say()`. Keep in mind in implementing, we cannot call `async` calls in a blocking context. This means if I have a non `async` code block, code calls (e.g. `fn`) cannot be awaited on. In other words in non Async fn, I cannot use `client.send().await`. There are ways around this though (later notes) I don't foresee issues with multitasking here if we enhance `Chat` so users essentially enqueue "messages" (consisting of `BotMsgType` and `ExecParams`) then the bot has a separate `tokio::task` running that processes those messages (i.e., triggering the message using given `twitch-irc` call like `client.say()` ) In addition with in place, when custom module developers define a routine, they don't have to worry about locks as within the context of a custom built `Routine` (in theory) should be blocking - ofc we still need to be mindful of locks initiated and managed at `core` level --- I'm thinking of using `async-channel` crate https://docs.rs/async-channel/latest/async_channel/fn.bounded.html
// let outmsg = if iterleft == 1 {
// let a = || {
// let chosen_channel_ar = Arc::new(RwLock::new(chosen_channel));
// let params_clone = params.clone();
// let bot = Arc::clone(&params.blocking_read().bot);
// dbg!("in chat async function");
// let botlock = bot.blocking_read();
// let channel_ar_clone = chosen_channel_ar.clone();
// let outmsg = if iterleft <= 1 {
// format!("{} I love you uwu~",iterleft)
// } else { format!("{}",iterleft) };
// botlock.botmgrs.chat
// .say(
// // joinedchannels.choose(&mut rng).unwrap().0.clone(),
// chosen_channel.0.clone(),
// channel_ar_clone.read().await.0.clone(),
// outmsg,
// params.clone()
// ).await;
// params_clone.read().await.clone()
// ).await;
}
}
@ -279,8 +323,9 @@ async fn countdown_chnl_v1(params : ExecBodyParams) {
module,
channel.unwrap(),
routine_attr,
// Arc::new(RwLock::new(exec_body)),
exec_body,
params.clone()
Arc::new(RwLock::new(params_ar.clone().read().await.clone())),
).await {
let newr_ar = newr.clone();
// [ ] start the routine
@ -290,13 +335,13 @@ async fn countdown_chnl_v1(params : ExecBodyParams) {
botlog::debug(
"Successfully started",
Some("experiment003 > countdown_chnl()".to_string()),
Some(&params.msg),
Some(&params_ar.read().await.msg),
);
Log::flush();
let bot = Arc::clone(&params.bot);
let bot = Arc::clone(&params_ar.read().await.bot);
let botlock = bot.read().await;
@ -305,9 +350,9 @@ async fn countdown_chnl_v1(params : ExecBodyParams) {
.botmgrs
.chat
.say_in_reply_to(
&params.msg,
&params_ar.read().await.msg,
"Started Routine!".to_string(),
params.clone()
params_ar.read().await.clone()
).await;
// let jhandle = newr.clone().read().await.join_handle.clone().unwrap();
@ -357,26 +402,50 @@ async fn test3_body(params : ExecBodyParams) {
// [x] Get the module from params
let params_ar = Arc::new(RwLock::new(params));
// let parentmodule = params.get_parent_module().await;
let module = params.get_module().await;
let channel = params.get_channel().await;
let module = params_ar.read().await.get_module().await;
// let channel = params.get_channel().await;
// let channel = params.get_channel().await;
// let channel = params.get_channel();
let routine_attr = vec![
RoutineAttr::RunOnce
];
// let exec_body = actions_util::asyncbox(rtestbody);
let exec_body = actions_util::asyncbox(rtestbody); // <-- 03.27 - when below is uncommented, this is throwing an issue
// let exec_body = actions_util::asyncbox(rtestbody); // <-- 03.27 - when below is uncommented, this is throwing an issue
// let channel = params.get_channel();
let channel_task_grabber = tokio::task::spawn_blocking( move || {
let params_clone = params_ar.clone();
let channel = params_clone.blocking_read().get_channel();
drop(params_clone);
(channel,params_ar)
});
let (channel,params_ar) = channel_task_grabber.await.unwrap();
let exec_body = rtestbody;
// let parent_params = params.clone();
// let params_clone = params.clone();
async fn rtestbody(params : ExecBodyParams) {
// async fn rtestbody(params : ExecBodyParams) {
fn rtestbody(params : Arc<RwLock<ExecBodyParams>>) {
let guard = params.curr_act.read().await;
// let guard1 = params.curr_act.read().await;
let paramguard1 = params.blocking_read();
let curract = paramguard1.curr_act.clone().unwrap();
let guard1 = curract.blocking_read();
{
let logmsg_botact = match *guard {
let logmsg_botact = match *guard1 {
BotAction::C(_) => "command",
BotAction::R(_) => "routine",
BotAction::L(_) => "listener",
@ -385,14 +454,14 @@ async fn test3_body(params : ExecBodyParams) {
botlog::trace(
format!("Params > Curr_act type : {:?}", logmsg_botact).as_str(),
Some("Experiments003 > test3 command body".to_string()),
Some(&params.msg),
Some(&params.blocking_read().msg),
);
Log::flush();
}
{
let logmsg_botact = match *guard {
let logmsg_botact = match &*guard1 {
BotAction::C(_) => "command 2",
BotAction::R(_) => "routine 2",
BotAction::L(_) => "listener 2",
@ -401,19 +470,51 @@ async fn test3_body(params : ExecBodyParams) {
botlog::trace(
format!("Params > Curr_act type : {:?}", logmsg_botact).as_str(),
Some("Experiments003 > test3 command body".to_string()),
Some(&params.msg),
Some(&params.blocking_read().msg),
);
Log::flush();
}
// drop(guard1);
{
dbg!("Custom > within Child Custom fn - Before Critical area");
println!("Critical code area start"); // <= 03.29 - This is printed
if let BotAction::R(c) = &*guard {
println!("{:?}",c.read().await.channel);
}
println!("Critical code area end"); // <= 03.29 - ISSUE This is NOT printed
// if let BotAction::R(c) = &*guard {
// println!("{:?}",c.read().await.channel);
// }
// let routine_channel = if let BotAction::R(c) = &*guard1 {
// dbg!("Custom > within Child Custom fn - During Critical area > Routine Guard ");
// let routineguard = c.read().await;
// Some(routineguard.channel.clone());
// } else {
// None
// };
// let chnl = match &*guard1 {
// BotAction::R(arr) => {
// dbg!("Custom > within Child Custom fn - During Critical area > Before Routine Guard ");
// let routineguard = arr.read().await;
// dbg!("Custom > within Child Custom fn - During Critical area > After Routine Guard ");
// Some(routineguard.channel.clone())
// },
// BotAction::C(_) | BotAction::L(_) => None ,
// } ;
// let chnl = params.get_channel().await;
// let chnl = params.get_channel().await;
let chnl = params.blocking_read().get_channel();
dbg!("Custom > within Child Custom fn - after GetChannel");
println!("{:?}",chnl);
println!("Critical code area end"); // <= 03.29 - ISSUE This is NOT printed
dbg!("Custom > within Child Custom fn - Before Critical area");
// if let BotAction::R(arr) = &*params.curr_act.read().await {
// for curriter in 0..5 {
// println!("tester - Routine - Completed Iterations : {}",
@ -428,14 +529,14 @@ async fn test3_body(params : ExecBodyParams) {
}
let params_clone = params_ar.clone();
botlog::debug(
format!("RTESTBODY : module - {:?} ; channel - {:?}",
module,channel
).as_str(),
Some("experiment003 > test3_body".to_string()),
Some(&params.msg),
Some(&params_clone.read().await.msg),
);
@ -445,8 +546,9 @@ async fn test3_body(params : ExecBodyParams) {
module,
channel.unwrap(),
routine_attr,
// Arc::new(RwLock::new(exec_body)),
exec_body,
params.clone()
Arc::new(RwLock::new(params_clone.read().await.clone()))
).await;
@ -487,12 +589,12 @@ async fn test3_body(params : ExecBodyParams) {
rsltstr
).as_str(),
Some("experiment003 > test3_body".to_string()),
Some(&params.msg),
Some(&(params_ar.clone()).read().await.msg),
);
Log::flush();
let bot = Arc::clone(&params.bot);
let bot = Arc::clone(&params_ar.read().await.bot);
let botlock = bot.read().await;
@ -501,9 +603,9 @@ async fn test3_body(params : ExecBodyParams) {
.botmgrs
.chat
.say_in_reply_to(
&params.msg,
&params_ar.read().await.msg,
format!("Routine Result : {:?}",rsltstr),
params.clone()
params_clone.read().await.clone()
).await;
// [x] Will not be handling JoinHandles here . If immediate abort() handling is required, below is an example that works

View file

@ -14,12 +14,22 @@ pub type BotAR = Arc<RwLock<BotInstance>>;
#[tokio::main]
pub async fn main() {
console_subscriber::init();
Log::set_file_ext(Extension::Log);
Log::set_level(Level::Trace);
Log::set_retention_days(2);
// Log::set_level(Level::Notice);
// fn innerbody() -> i64 {
// println!("hello");
// 64
// }
// let exec_body:fn() -> i64 = innerbody;