WIP: Routine Unresponsive due to Deadlock #51

Draft
modulatingforce wants to merge 10 commits from issue-routine-lock into routines-functionality
4 changed files with 112 additions and 47 deletions
Showing only changes of commit e711c6454d - Show all commits

View file

@ -17,7 +17,13 @@ pub struct ExecBodyParams {
pub bot : BotAR,
pub msg : PrivmsgMessage,
pub parent_act : Option<ActAR> ,
pub curr_act : ActAR ,
pub curr_act : Option<ActAR> ,
}
#[derive(Debug,Eq,PartialEq,Hash)]
pub enum ParamLevel {
Parent,
Current,
}
@ -27,8 +33,8 @@ impl ExecBodyParams {
// pub async fn get_parent_module(&self) -> BotModule {
pub async fn get_module(&self) -> BotModule {
let curr_act = Arc::clone(&self.curr_act);
let parent_act_lock = curr_act.read().await;
let curr_acta = Arc::clone(&(self.curr_act.clone().unwrap()));
let parent_act_lock = curr_acta.read().await;
let act = &(*parent_act_lock);
match act {
BotAction::C(c) => {
@ -61,7 +67,7 @@ impl ExecBodyParams {
dbg!(">> BotInstanceAR - RwLock from botinstance.rs::150:28 - current_readers = 2");
let curr_act = Arc::clone(&self.curr_act);
let curr_act = Arc::clone(&self.curr_act.clone().unwrap());
// let curr_act_lock = curr_act.read().await;
// Note : The below `curr_act.blocking_read()` is not possible. I get the following

View file

@ -406,7 +406,7 @@ impl BotInstance {
bot : Arc::clone(&bot),
msg : (*msg).clone(),
parent_act : None,
curr_act : Arc::clone(&act_clone),
curr_act : Some(Arc::clone(&act_clone)),
};
// When sending a BotMsgTypeNotif, send_botmsg does Roles related validation as required
@ -464,7 +464,7 @@ impl BotInstance {
bot : Arc::clone(&bot),
msg : (*msg).clone(),
parent_act : None,
curr_act : Arc::clone(&act_clone),
curr_act : Some(Arc::clone(&act_clone)),
};
@ -495,7 +495,7 @@ impl BotInstance {
bot : Arc::clone(&bot),
msg : (*msg).clone(),
parent_act : None,
curr_act : Arc::clone(&act_clone),
curr_act : Some(Arc::clone(&act_clone)),
};
@ -521,7 +521,7 @@ impl BotInstance {
bot : a,
msg : msg.clone() ,
parent_act : None,
curr_act : Arc::clone(&act_clone),
curr_act : Some(Arc::clone(&act_clone)),
}).await;
botlog::trace(
@ -570,7 +570,7 @@ impl BotInstance {
bot : a,
msg : msg.clone() ,
parent_act : None,
curr_act : Arc::clone(&act_clone),
curr_act : Some(Arc::clone(&act_clone)),
} ).await;
}

View file

@ -57,6 +57,7 @@ use crate::core::bot_actions;
use std::hash::{Hash, Hasher};
use super::bot_actions::ActAR;
use super::bot_actions::ParamLevel;
use super::bot_actions::RoutineAR;
use super::identity::ChatBadge;
@ -640,15 +641,18 @@ pub enum RoutineSignal {
NotStarted,
}
pub type ExecBodyParamsAr = Arc<RwLock<ExecBodyParams>>;
pub struct Routine {
pub name : String ,
pub module : BotModule , // from() can determine this if passed parents_params
pub channel : Channel , // Requiring some channel context
// exec_body: Arc<RwLock<bot_actions::actions_util::ExecBody>>,
// exec_body : fn(ExecBodyParams),
Review

Code Fix Involved :

  • Within Blocking Non-Async functions, when accessing Arc<RwLock<BotAction>> , use blocking_read() & blocking_write()
  • ExecBodyParams::get_channel() be defined as non-async
  • Routine definition involve :
    • exec_body : fn(Arc<RwLock<ExecBodyParams>>)
    • parent_params : Arc<RwLock<ExecBodyParams>>
  • Routine::start() involves a tokio::task::spawn_blocking execute the loopbody()
### Code Fix Involved : - [x] Within Blocking Non-Async functions, when accessing `Arc<RwLock<BotAction>>` , use `blocking_read()` & `blocking_write()` - [x] `ExecBodyParams::get_channel()` be defined as non-async - [x] `Routine` definition involve : - [x] `exec_body : fn(Arc<RwLock<ExecBodyParams>>)` - [x] `parent_params : Arc<RwLock<ExecBodyParams>>` - [x] `Routine::start()` involves a `tokio::task::spawn_blocking` execute the `loopbody()`
exec_body : fn(Arc<RwLock<ExecBodyParams>>),
exec_body : fn(ExecBodyParamsAr),
// pub parent_params : ExecBodyParams ,
pub parent_params : Arc<RwLock<ExecBodyParams>> ,
// pub parent_params : Arc<RwLock<ExecBodyParams>> ,
pub params : HashMap<ParamLevel,ExecBodyParamsAr>,
pub join_handle : Option<Arc<RwLock<JoinHandle<RoutineAR>>>> ,
start_time : Option<DateTime<Local>> ,
pub complete_iterations : i64 ,
@ -665,7 +669,7 @@ impl Debug for Routine {
write!(f, "{:?}", self.name)?;
write!(f, "{:?}", self.module)?;
write!(f, "{:?}", self.channel)?;
write!(f, "{:?}", self.parent_params)?;
write!(f, "{:?}", self.params)?;
write!(f, "{:?}", self.join_handle)?;
write!(f, "{:?}", self.start_time)?;
write!(f, "{:?}", self.complete_iterations)?;
@ -911,12 +915,27 @@ impl Routine {
Routine::validate_attr(&routine_attr).await?;
let mut params = HashMap::new();
params.insert(ParamLevel::Parent,parent_params.clone());
let pparam_clone = parent_params.clone();
let parent_guard = pparam_clone.read().await;
let curr_params = ExecBodyParams {
bot : parent_guard.bot.clone(),
msg : parent_guard.msg.clone(),
parent_act : parent_guard.curr_act.clone() ,
curr_act : None,
};
params.insert(ParamLevel::Current,Arc::new(RwLock::new(curr_params)));
let routine_ar = Arc::new(RwLock::new(Routine {
name ,
module ,
channel ,
exec_body ,
parent_params ,
// parent_params ,
params,
join_handle : None ,
start_time : None ,
complete_iterations : 0 ,
@ -933,6 +952,8 @@ impl Routine {
// 2. Update the current self_act_ar
mut_lock.self_act_ar = Some(Arc::new(RwLock::new(BotAction::R(routine_ar.clone()))));
mut_lock.params.get(&ParamLevel::Current).unwrap().write().await.curr_act = Some(Arc::new(RwLock::new(BotAction::R(routine_ar.clone()))));
Ok(routine_ar.clone())
// return Ok(Arc::new(RwLock::new(Routine {
@ -1018,7 +1039,7 @@ impl Routine {
Some(format!(
"Routine > start() > (In Tokio Spawn)",
)),
Some(&trg_routine_ar.read().await.parent_params.read().await.msg),
Some(&trg_routine_ar.read().await.params.get(&ParamLevel::Parent).unwrap().read().await.msg),
);
Log::flush();
@ -1031,7 +1052,7 @@ impl Routine {
Some(format!(
"Routine > start() > (In Tokio Spawn)",
)),
Some(&trg_routine_ar.read().await.parent_params.read().await.msg),
Some(&trg_routine_ar.read().await.params.get(&ParamLevel::Parent).unwrap().read().await.msg),
);
Log::flush();
@ -1114,7 +1135,7 @@ impl Routine {
"Routine > start() > (In Tokio Spawn) > {:?}",
trg_routine_ar.read().await.module
)),
Some(&trg_routine_ar.read().await.parent_params.read().await.msg),
Some(&trg_routine_ar.read().await.params.get(&ParamLevel::Parent).unwrap().read().await.msg),
);
if let Some(dur) = delayduration {
@ -1223,7 +1244,7 @@ impl Routine {
"Routine > start() > (In Tokio Spawn) > {:?}",
trg_routine_ar.read().await.module
)),
Some(&trg_routine_ar.read().await.parent_params.read().await.msg),
Some(&trg_routine_ar.read().await.params.get(&ParamLevel::Parent).unwrap().read().await.msg),
);
botlog::trace(
@ -1238,7 +1259,7 @@ impl Routine {
"Routine > start() > (In Tokio Spawn) > {:?}",
trg_routine_ar.read().await.module
)),
Some(&trg_routine_ar.read().await.parent_params.read().await.msg),
Some(&trg_routine_ar.read().await.params.get(&ParamLevel::Parent).unwrap().read().await.msg),
);
Log::flush();
@ -1311,14 +1332,22 @@ impl Routine {
// self_ar.read().await.parent_params.clone()
// ).await;
let parent_params = {
// let parent_params = {
// let guard1 = self_ar.blocking_read();
// let parent_params = guard1.params.get(&ParamLevel::Parent).unwrap().clone();
// drop(guard1);
// parent_params
// };
let curr_params = {
let guard1 = self_ar.blocking_read();
let parent_params = guard1.parent_params.clone();
let curr_params = guard1.params.get(&ParamLevel::Current).unwrap().clone();
drop(guard1);
parent_params
curr_params
};
dbg!("Core > Guarding and will Execute Child Execution Body");
dbg!(">> BotActionAR - RwLock from botmodules.rs::929:46 - current_readers = 0 ");
dbg!(">> RoutineAR - RwLock from botmodules.rs::1261:32 - current_readers = Not Listed");
@ -1352,7 +1381,7 @@ impl Routine {
// drop(guard3);
// [ ] May need to do a blocking async of this?
exec_body_ar(parent_params);
exec_body_ar(curr_params);
}
@ -1395,7 +1424,7 @@ impl Routine {
"Routine > stop() > {:?}",
self_lock.module
)),
Some(&self_lock.parent_params.read().await.msg),
Some(&self_lock.params.get(&ParamLevel::Parent).unwrap().read().await.msg),
);
Log::flush();
@ -1457,7 +1486,7 @@ impl Routine {
"Routine > cancel() > {:?}",
self_lock.module
)),
Some(&self_lock.parent_params.read().await.msg),
Some(&self_lock.params.get(&ParamLevel::Parent).unwrap().read().await.msg),
);
Log::flush();
@ -1518,7 +1547,7 @@ impl Routine {
"Routine > restart() > {:?}",
self_lock.module
)),
Some(&self_lock.parent_params.read().await.msg),
Some(&self_lock.params.get(&ParamLevel::Parent).unwrap().read().await.msg),
);
Log::flush();
@ -1552,7 +1581,7 @@ impl Routine {
"Routine > restart() > {:?}",
self_lock.module
)),
Some(&self_lock.parent_params.read().await.msg),
Some(&self_lock.params.get(&ParamLevel::Parent).unwrap().read().await.msg),
);
Log::flush();
@ -1596,7 +1625,7 @@ impl Routine {
"Routine > restart() > {:?}",
self_lock.module
)),
Some(&self_lock.parent_params.read().await.msg),
Some(&self_lock.params.get(&ParamLevel::Parent).unwrap().read().await.msg),
);
Log::flush();

View file

@ -139,11 +139,23 @@ async fn countdown_chnl_v1(params : ExecBodyParams) {
// [ ] 1. Create a Routine & start a routine
let params_ar = Arc::new(RwLock::new(params));
// let parentmodule = params.get_parent_module().await;
let module = params.get_module().await;
let module = params_ar.read().await.get_module().await;
// let channel = params.get_channel().await;
// let channel = params.get_channel().await;
let channel = params.get_channel();
// let channel = params.get_channel();
let channel_task_grabber = tokio::task::spawn_blocking( move || {
let params_clone = params_ar.clone();
let channel = params_clone.blocking_read().get_channel();
drop(params_clone);
(channel,params_ar)
});
let (channel,params_ar) = channel_task_grabber.await.unwrap();
let routine_attr = vec![
// RoutineAttr::RunOnce
RoutineAttr::MaxIterations(5),
@ -159,7 +171,8 @@ async fn countdown_chnl_v1(params : ExecBodyParams) {
{
// let curract_guard = params.curr_act.read().await;
let paramguard1 = params.blocking_read();
let curract_guard = paramguard1.curr_act.blocking_read();
let curract = paramguard1.curr_act.clone().unwrap();
let curract_guard = curract.blocking_read();
// let logmsg_botact = match *params.curr_act.read().await {
@ -186,7 +199,8 @@ async fn countdown_chnl_v1(params : ExecBodyParams) {
// let curract_guard = params.curr_act.write().await;
let paramguard1 = params.blocking_read();
let curract_guard = paramguard1.curr_act.blocking_write();
let curract = paramguard1.curr_act.clone().unwrap();
let curract_guard = curract.blocking_write();
// let routine_lock = arr.write().await;
@ -215,11 +229,13 @@ async fn countdown_chnl_v1(params : ExecBodyParams) {
// println!("Remaining iterations > {}",a)
// }
let iterleft;
{
// let routine_lock = arr.write().await;
let routine_lock = arr.blocking_write();
let a = routine_lock.remaining_iterations;
println!("remaining iterations : {:?}", a);
// let routine_lock = arr.blocking_write();
let routine_lock = arr.blocking_read();
iterleft = routine_lock.remaining_iterations.unwrap_or(0);
println!("remaining iterations : {:?}", iterleft);
}
botlog::trace(
@ -254,26 +270,39 @@ async fn countdown_chnl_v1(params : ExecBodyParams) {
let chosen_channel = pick_a_channel(joinedchannels);
dbg!("SUCCESS", chosen_channel.clone());
botlog::trace(
format!("Picked a channel: {:?}", chosen_channel).as_str(),
Some("Experiments003 > countdown_chnl()".to_string()),
Some(&params.blocking_read().msg),
);
Log::flush();
// let a = || {
// let outmsg = if iterleft == 1 {
// let chosen_channel_ar = Arc::new(RwLock::new(chosen_channel));
Review

Issue with the Fix

If child Routine fn are defined as regular fn rather than async , Custom Routine fn would not be able to call Chat.say() or similar functionality

Here for example, in a Custom Routine fn innertester(params : Arc<RwLock<ExecBodyParams>> , we can't call chat.say() in this area

### Issue with the Fix If child Routine `fn` are defined as regular `fn` rather than `async` , Custom Routine `fn` would not be able to call `Chat.say()` or similar functionality Here for example, in a Custom Routine `fn innertester(params : Arc<RwLock<ExecBodyParams>>` , we can't call `chat.say()` in this area
Review

Honestly I'm favoring keeping the fox and maybe enhancing Chat so it has non-async calls for say().

Keep in mind in implementing, we cannot call async calls in a blocking context. This means if I have a non async code block, code calls (e.g. fn) cannot be awaited on. In other words in non Async fn, I cannot use client.send().await. There are ways around this though (later notes)

I don't foresee issues with multitasking here if we enhance Chat so users essentially enqueue "messages" (consisting of BotMsgType and ExecParams) then the bot has a separate tokio::task running that processes those messages (i.e., triggering the message using given twitch-irc call like client.say() )

In addition with in place, when custom module developers define a routine, they don't have to worry about locks as within the context of a custom built Routine (in theory) should be blocking

  • ofc we still need to be mindful of locks initiated and managed at core level

I'm thinking of using async-channel crate
https://docs.rs/async-channel/latest/async_channel/fn.bounded.html

Honestly I'm favoring keeping the fox and maybe enhancing `Chat` so it has non-async calls for `say()`. Keep in mind in implementing, we cannot call `async` calls in a blocking context. This means if I have a non `async` code block, code calls (e.g. `fn`) cannot be awaited on. In other words in non Async fn, I cannot use `client.send().await`. There are ways around this though (later notes) I don't foresee issues with multitasking here if we enhance `Chat` so users essentially enqueue "messages" (consisting of `BotMsgType` and `ExecParams`) then the bot has a separate `tokio::task` running that processes those messages (i.e., triggering the message using given `twitch-irc` call like `client.say()` ) In addition with in place, when custom module developers define a routine, they don't have to worry about locks as within the context of a custom built `Routine` (in theory) should be blocking - ofc we still need to be mindful of locks initiated and managed at `core` level --- I'm thinking of using `async-channel` crate https://docs.rs/async-channel/latest/async_channel/fn.bounded.html
// let params_clone = params.clone();
// let bot = Arc::clone(&params.blocking_read().bot);
// dbg!("in chat async function");
// let botlock = bot.blocking_read();
// let channel_ar_clone = chosen_channel_ar.clone();
// let outmsg = if iterleft <= 1 {
// format!("{} I love you uwu~",iterleft)
// } else { format!("{}",iterleft) };
// botlock.botmgrs.chat
// .say(
// // joinedchannels.choose(&mut rng).unwrap().0.clone(),
// chosen_channel.0.clone(),
// channel_ar_clone.read().await.0.clone(),
// outmsg,
// params.clone()
// ).await;
// params_clone.read().await.clone()
// ).await;
}
}
@ -291,7 +320,7 @@ async fn countdown_chnl_v1(params : ExecBodyParams) {
routine_attr,
// Arc::new(RwLock::new(exec_body)),
exec_body,
Arc::new(RwLock::new(params.clone())),
Arc::new(RwLock::new(params_ar.clone().read().await.clone())),
).await {
let newr_ar = newr.clone();
// [ ] start the routine
@ -301,13 +330,13 @@ async fn countdown_chnl_v1(params : ExecBodyParams) {
botlog::debug(
"Successfully started",
Some("experiment003 > countdown_chnl()".to_string()),
Some(&params.msg),
Some(&params_ar.read().await.msg),
);
Log::flush();
let bot = Arc::clone(&params.bot);
let bot = Arc::clone(&params_ar.read().await.bot);
let botlock = bot.read().await;
@ -316,9 +345,9 @@ async fn countdown_chnl_v1(params : ExecBodyParams) {
.botmgrs
.chat
.say_in_reply_to(
&params.msg,
&params_ar.read().await.msg,
"Started Routine!".to_string(),
params.clone()
params_ar.read().await.clone()
).await;
// let jhandle = newr.clone().read().await.join_handle.clone().unwrap();
@ -408,7 +437,8 @@ async fn test3_body(params : ExecBodyParams) {
// let guard1 = params.curr_act.read().await;
let paramguard1 = params.blocking_read();
let guard1 = paramguard1.curr_act.blocking_read();
let curract = paramguard1.curr_act.clone().unwrap();
let guard1 = curract.blocking_read();
{
let logmsg_botact = match *guard1 {
BotAction::C(_) => "command",