WIP: Routine Unresponsive due to Deadlock #51

Draft
modulatingforce wants to merge 10 commits from issue-routine-lock into routines-functionality
4 changed files with 147 additions and 62 deletions
Showing only changes of commit fa5de03bae - Show all commits

View file

@ -50,8 +50,8 @@ impl ExecBodyParams {
}
}
pub async fn get_channel(&self) -> Option<Channel> {
// pub fn get_channel(&self) -> Option<Channel> {
// pub async fn get_channel(&self) -> Option<Channel> {
pub fn get_channel(&self) -> Option<Channel> {
dbg!("Core > ExecBodyParams > GetChannels START");
dbg!("!! [x] Document - After SUCCESS message was sent to chat");
@ -62,7 +62,7 @@ impl ExecBodyParams {
let curr_act = Arc::clone(&self.curr_act);
let curr_act_lock = curr_act.read().await;
// let curr_act_lock = curr_act.read().await;
// Note : The below `curr_act.blocking_read()` is not possible. I get the following
// during runtime in this areas
@ -70,7 +70,7 @@ impl ExecBodyParams {
thread 'tokio-runtime-worker' panicked at src\core\bot_actions.rs:66:38:
Cannot block the current thread from within a runtime. This happens because a function attempted to block the current thread while the thread is being used to drive asynchronous tasks.
*/
// let curr_act_lock = curr_act.blocking_read();
let curr_act_lock = curr_act.blocking_read();
dbg!("Core > ExecBodyParams > After Creating ExecBodyParams.current_act.read() Guard ");
dbg!("!! [x] Document - After SUCCESS message was sent to chat");
@ -113,10 +113,13 @@ Cannot block the current thread from within a runtime. This happens because a fu
// dbg!("ISSUE : RoutineAR - RwLock from botmodules.rs::1261:32 - current_readers = 1");
// let out = Some(r.read().await.channel.clone());
let guard = r.read().await;
// let guard = r.blocking_read();
dbg!("ISSUE > Never makes it after the following read guard");
dbg!("ISSUE> Never makes it after the read guard");
// let guard = r.read().await;
let guard = r.blocking_read();
dbg!("ISSUE RESOLVED > If passed htis point");
let channel = guard.channel.clone();
drop(guard);

View file

@ -644,8 +644,11 @@ pub struct Routine {
pub name : String ,
pub module : BotModule , // from() can determine this if passed parents_params
pub channel : Channel , // Requiring some channel context
exec_body: Arc<RwLock<bot_actions::actions_util::ExecBody>>,
pub parent_params : ExecBodyParams ,
// exec_body: Arc<RwLock<bot_actions::actions_util::ExecBody>>,
// exec_body : fn(ExecBodyParams),
exec_body : fn(Arc<RwLock<ExecBodyParams>>),
// pub parent_params : ExecBodyParams ,
pub parent_params : Arc<RwLock<ExecBodyParams>> ,
Review

Code Fix Involved :

  • Within Blocking Non-Async functions, when accessing Arc<RwLock<BotAction>> , use blocking_read() & blocking_write()
  • ExecBodyParams::get_channel() be defined as non-async
  • Routine definition involve :
    • exec_body : fn(Arc<RwLock<ExecBodyParams>>)
    • parent_params : Arc<RwLock<ExecBodyParams>>
  • Routine::start() involves a tokio::task::spawn_blocking execute the loopbody()
### Code Fix Involved : - [x] Within Blocking Non-Async functions, when accessing `Arc<RwLock<BotAction>>` , use `blocking_read()` & `blocking_write()` - [x] `ExecBodyParams::get_channel()` be defined as non-async - [x] `Routine` definition involve : - [x] `exec_body : fn(Arc<RwLock<ExecBodyParams>>)` - [x] `parent_params : Arc<RwLock<ExecBodyParams>>` - [x] `Routine::start()` involves a `tokio::task::spawn_blocking` execute the `loopbody()`
pub join_handle : Option<Arc<RwLock<JoinHandle<RoutineAR>>>> ,
start_time : Option<DateTime<Local>> ,
pub complete_iterations : i64 ,
@ -896,8 +899,9 @@ impl Routine {
module : BotModule ,
channel : Channel,
routine_attr : Vec<RoutineAttr> ,
exec_body : Arc<RwLock<bot_actions::actions_util::ExecBody>> ,
parent_params : ExecBodyParams
// exec_body : Arc<RwLock<bot_actions::actions_util::ExecBody>> ,
exec_body : fn(Arc<RwLock<ExecBodyParams>>) ,
parent_params : Arc<RwLock<ExecBodyParams>>
) -> Result<
Arc<RwLock<Routine>>,
String
@ -1014,7 +1018,7 @@ impl Routine {
Some(format!(
"Routine > start() > (In Tokio Spawn)",
)),
Some(&trg_routine_ar.read().await.parent_params.msg),
Some(&trg_routine_ar.read().await.parent_params.read().await.msg),
);
Log::flush();
@ -1027,7 +1031,7 @@ impl Routine {
Some(format!(
"Routine > start() > (In Tokio Spawn)",
)),
Some(&trg_routine_ar.read().await.parent_params.msg),
Some(&trg_routine_ar.read().await.parent_params.read().await.msg),
);
Log::flush();
@ -1110,7 +1114,7 @@ impl Routine {
"Routine > start() > (In Tokio Spawn) > {:?}",
trg_routine_ar.read().await.module
)),
Some(&trg_routine_ar.read().await.parent_params.msg),
Some(&trg_routine_ar.read().await.parent_params.read().await.msg),
);
if let Some(dur) = delayduration {
@ -1119,6 +1123,8 @@ impl Routine {
{ // [x] Loop Initialization - Prior to Loop that calls Custom Routine Execution Body
let mut a = trg_routine_ar.write().await;
a.start_time = Some(chrono::offset::Local::now());
@ -1129,15 +1135,33 @@ impl Routine {
{
a.remaining_iterations = Some(iternum);
}
drop(a);
}
loop { // [x] Routine loop
// [x] execution body
// trg_routine_ar.read().await.loopbody().await;
{
trg_routine_ar.write().await.loopbody().await;
let trg_routine_ar_clone = trg_routine_ar.clone();
// trg_routine_ar.write().await.loopbody().await;
let loopbodyspawn = tokio::task::spawn_blocking( move || {
// let routine_ar_clone = Arc::clone(&trg_routine_ar);
// trg_routine_ar
// trg_routine_ar_clone.blocking_write().loopbody();
let guard1 = trg_routine_ar_clone.blocking_read();
guard1.loopbody();
drop(guard1);
});
loopbodyspawn.await.unwrap();
// trg_routine_ar.write().await.loopbody()
}
@ -1156,6 +1180,7 @@ impl Routine {
if i > 0 { a.remaining_iterations = Some(i-1) ; }
else { break ; } // if remaining iterations is 0, exit
}
drop(a);
}
@ -1184,6 +1209,8 @@ impl Routine {
}
dbg!("SUCCESS! Routinecompleted");
botlog::trace(
format!(
@ -1196,7 +1223,7 @@ impl Routine {
"Routine > start() > (In Tokio Spawn) > {:?}",
trg_routine_ar.read().await.module
)),
Some(&trg_routine_ar.read().await.parent_params.msg),
Some(&trg_routine_ar.read().await.parent_params.read().await.msg),
);
botlog::trace(
@ -1211,7 +1238,7 @@ impl Routine {
"Routine > start() > (In Tokio Spawn) > {:?}",
trg_routine_ar.read().await.module
)),
Some(&trg_routine_ar.read().await.parent_params.msg),
Some(&trg_routine_ar.read().await.parent_params.read().await.msg),
);
Log::flush();
@ -1229,7 +1256,7 @@ impl Routine {
drop(lock); // => 03.30 - added this to try to address Deadlock issue
}
trg_routine_arout.write().await.internal_signal = RoutineSignal::Started;
// trg_routine_arout.write().await.internal_signal = RoutineSignal::Started;
return Ok(trg_routine_arout);
@ -1244,7 +1271,9 @@ impl Routine {
// }
async fn loopbody(&mut self)
// async fn loopbody(&mut self)
// fn loopbody(&mut self)
fn loopbody(&self)
// [x] => 03.27 - COMPLETED
{
botlog::trace(
@ -1260,15 +1289,19 @@ impl Routine {
// => 03.30 - involved with Problem Deadlock Current Readers = 1
// - Below self appears to be a Arc<RwLock<&mut Routine>>
let self_ar = Arc::new(RwLock::new(self));
// => 03.31 - Temporarily removing the current_act / parrent_act functionality
/*
{
let mut mutlock = self_ar.write().await;
// let mutlock = self_ar.read().await;
let mutlock = self_ar.blocking_read();
mutlock.parent_params.parent_act = Some(mutlock.parent_params.curr_act.clone());
mutlock.parent_params.curr_act = mutlock.self_act_ar.to_owned().unwrap();
mutlock.parent_params.blocking_write().parent_act = Some(mutlock.parent_params.blocking_read().curr_act.clone());
mutlock.parent_params.blocking_write().curr_act = mutlock.self_act_ar.to_owned().unwrap();
drop(mutlock); // => 03.30 - Added to address Deadlock issue
}
*/
dbg!("Core > Before Guards");
@ -1279,7 +1312,7 @@ impl Routine {
// ).await;
let parent_params = {
let guard1 = self_ar.read().await;
let guard1 = self_ar.blocking_read();
let parent_params = guard1.parent_params.clone();
drop(guard1);
@ -1311,12 +1344,15 @@ impl Routine {
{
let guard2 = self_ar.read().await;
let guard2 = self_ar.blocking_read();
let exec_body_ar = guard2.exec_body.clone();
drop(guard2);
let guard3 = exec_body_ar.read().await;
(guard3)(parent_params).await;
drop(guard3);
// let guard3 = exec_body_ar.read().await;
// (guard3)(parent_params).await;
// drop(guard3);
// [ ] May need to do a blocking async of this?
exec_body_ar(parent_params);
}
@ -1343,6 +1379,7 @@ impl Routine {
{
let mut self_lock = self_rw.write().await;
self_lock.internal_signal = RoutineSignal::Stopping;
drop(self_lock);
}
@ -1358,7 +1395,7 @@ impl Routine {
"Routine > stop() > {:?}",
self_lock.module
)),
Some(&self_lock.parent_params.msg),
Some(&self_lock.parent_params.read().await.msg),
);
Log::flush();
@ -1403,6 +1440,7 @@ impl Routine {
{
let mut lock_mut = self_rw.write().await;
lock_mut.internal_signal = RoutineSignal::Stopped;
drop(lock_mut);
}
}
@ -1419,7 +1457,7 @@ impl Routine {
"Routine > cancel() > {:?}",
self_lock.module
)),
Some(&self_lock.parent_params.msg),
Some(&self_lock.parent_params.read().await.msg),
);
Log::flush();
@ -1459,9 +1497,11 @@ impl Routine {
{
let mut self_lock = self_rw.write().await;
self_lock.cancel().await?;
drop(self_lock);
} else {
let mut self_lock = self_rw.write().await;
self_lock.stop().await?;
drop(self_lock);
}
Routine::start(self_rw.clone()).await?;
@ -1478,7 +1518,7 @@ impl Routine {
"Routine > restart() > {:?}",
self_lock.module
)),
Some(&self_lock.parent_params.msg),
Some(&self_lock.parent_params.read().await.msg),
);
Log::flush();
@ -1512,7 +1552,7 @@ impl Routine {
"Routine > restart() > {:?}",
self_lock.module
)),
Some(&self_lock.parent_params.msg),
Some(&self_lock.parent_params.read().await.msg),
);
Log::flush();
@ -1539,6 +1579,7 @@ impl Routine {
{
let mut self_lock = self_rw.write().await;
self_lock.routine_attr = routine_attr;
drop(self_lock);
}
// let self_rw = Arc::new(RwLock::new(self));
@ -1555,7 +1596,7 @@ impl Routine {
"Routine > restart() > {:?}",
self_lock.module
)),
Some(&self_lock.parent_params.msg),
Some(&self_lock.parent_params.read().await.msg),
);
Log::flush();

View file

@ -142,19 +142,24 @@ async fn countdown_chnl_v1(params : ExecBodyParams) {
// let parentmodule = params.get_parent_module().await;
let module = params.get_module().await;
// let channel = params.get_channel().await;
let channel = params.get_channel().await;
// let channel = params.get_channel().await;
let channel = params.get_channel();
let routine_attr = vec![
// RoutineAttr::RunOnce
RoutineAttr::MaxIterations(5),
RoutineAttr::LoopDuration(Duration::from_secs(1))
];
// let exec_body = actions_util::asyncbox(rtestbody);
let exec_body = actions_util::asyncbox(innertester); // <-- 03.27 - when below is uncommented, this is throwing an issue
// let exec_body = actions_util::asyncbox(innertester); // <-- 03.27 - when below is uncommented, this is throwing an issue
let exec_body = innertester;
async fn innertester(params : ExecBodyParams) {
// async fn innertester(params : ExecBodyParams) {
fn innertester(params : Arc<RwLock<ExecBodyParams>>) {
{
let curract_guard = params.curr_act.read().await;
// let curract_guard = params.curr_act.read().await;
let paramguard1 = params.blocking_read();
let curract_guard = paramguard1.curr_act.blocking_read();
// let logmsg_botact = match *params.curr_act.read().await {
@ -169,16 +174,19 @@ async fn countdown_chnl_v1(params : ExecBodyParams) {
botlog::trace(
format!("Params > Curr_act type : {:?}", logmsg_botact).as_str(),
Some("Experiments003 > countdown_chnl()".to_string()),
Some(&params.msg),
Some(&params.blocking_read().msg),
);
Log::flush();
}
{
let bot = Arc::clone(&params.bot);
let botlock = bot.read().await;
let bot = Arc::clone(&params.blocking_read().bot);
// let botlock = bot.read().await;
let botlock = bot.blocking_read();
let curract_guard = params.curr_act.write().await;
// let curract_guard = params.curr_act.write().await;
let paramguard1 = params.blocking_read();
let curract_guard = paramguard1.curr_act.blocking_write();
// let routine_lock = arr.write().await;
@ -208,7 +216,8 @@ async fn countdown_chnl_v1(params : ExecBodyParams) {
// }
{
let routine_lock = arr.write().await;
// let routine_lock = arr.write().await;
let routine_lock = arr.blocking_write();
let a = routine_lock.remaining_iterations;
println!("remaining iterations : {:?}", a);
}
@ -248,7 +257,7 @@ async fn countdown_chnl_v1(params : ExecBodyParams) {
botlog::trace(
format!("Picked a channel: {:?}", chosen_channel).as_str(),
Some("Experiments003 > countdown_chnl()".to_string()),
Some(&params.msg),
Some(&params.blocking_read().msg),
);
Log::flush();
@ -280,8 +289,9 @@ async fn countdown_chnl_v1(params : ExecBodyParams) {
module,
channel.unwrap(),
routine_attr,
Arc::new(RwLock::new(exec_body)),
params.clone()
// Arc::new(RwLock::new(exec_body)),
exec_body,
Arc::new(RwLock::new(params.clone())),
).await {
let newr_ar = newr.clone();
// [ ] start the routine
@ -358,25 +368,47 @@ async fn test3_body(params : ExecBodyParams) {
// [x] Get the module from params
let params_ar = Arc::new(RwLock::new(params));
// let parentmodule = params.get_parent_module().await;
let module = params.get_module().await;
let module = params_ar.read().await.get_module().await;
// let channel = params.get_channel().await;
let channel = params.get_channel().await;
// let channel = params.get_channel().await;
// let channel = params.get_channel();
let routine_attr = vec![
RoutineAttr::RunOnce
];
// let exec_body = actions_util::asyncbox(rtestbody);
let exec_body = actions_util::asyncbox(rtestbody); // <-- 03.27 - when below is uncommented, this is throwing an issue
// let exec_body = actions_util::asyncbox(rtestbody); // <-- 03.27 - when below is uncommented, this is throwing an issue
// let channel = params.get_channel();
let channel_task_grabber = tokio::task::spawn_blocking( move || {
let params_clone = params_ar.clone();
let channel = params_clone.blocking_read().get_channel();
drop(params_clone);
(channel,params_ar)
});
let (channel,params_ar) = channel_task_grabber.await.unwrap();
let exec_body = rtestbody;
// let parent_params = params.clone();
// let params_clone = params.clone();
async fn rtestbody(params : ExecBodyParams) {
// async fn rtestbody(params : ExecBodyParams) {
fn rtestbody(params : Arc<RwLock<ExecBodyParams>>) {
let guard1 = params.curr_act.read().await;
// let guard1 = params.curr_act.read().await;
let paramguard1 = params.blocking_read();
let guard1 = paramguard1.curr_act.blocking_read();
{
let logmsg_botact = match *guard1 {
BotAction::C(_) => "command",
@ -387,7 +419,7 @@ async fn test3_body(params : ExecBodyParams) {
botlog::trace(
format!("Params > Curr_act type : {:?}", logmsg_botact).as_str(),
Some("Experiments003 > test3 command body".to_string()),
Some(&params.msg),
Some(&params.blocking_read().msg),
);
Log::flush();
}
@ -403,7 +435,7 @@ async fn test3_body(params : ExecBodyParams) {
botlog::trace(
format!("Params > Curr_act type : {:?}", logmsg_botact).as_str(),
Some("Experiments003 > test3 command body".to_string()),
Some(&params.msg),
Some(&params.blocking_read().msg),
);
Log::flush();
}
@ -439,7 +471,8 @@ async fn test3_body(params : ExecBodyParams) {
// } ;
// let chnl = params.get_channel().await;
let chnl = params.get_channel().await;
// let chnl = params.get_channel().await;
let chnl = params.blocking_read().get_channel();
dbg!("Custom > within Child Custom fn - after GetChannel");
println!("{:?}",chnl);
@ -461,14 +494,14 @@ async fn test3_body(params : ExecBodyParams) {
}
let params_clone = params_ar.clone();
botlog::debug(
format!("RTESTBODY : module - {:?} ; channel - {:?}",
module,channel
).as_str(),
Some("experiment003 > test3_body".to_string()),
Some(&params.msg),
Some(&params_clone.read().await.msg),
);
@ -478,8 +511,9 @@ async fn test3_body(params : ExecBodyParams) {
module,
channel.unwrap(),
routine_attr,
Arc::new(RwLock::new(exec_body)),
params.clone()
// Arc::new(RwLock::new(exec_body)),
exec_body,
Arc::new(RwLock::new(params_clone.read().await.clone()))
).await;
@ -520,12 +554,12 @@ async fn test3_body(params : ExecBodyParams) {
rsltstr
).as_str(),
Some("experiment003 > test3_body".to_string()),
Some(&params.msg),
Some(&(params_ar.clone()).read().await.msg),
);
Log::flush();
let bot = Arc::clone(&params.bot);
let bot = Arc::clone(&params_ar.read().await.bot);
let botlock = bot.read().await;
@ -534,9 +568,9 @@ async fn test3_body(params : ExecBodyParams) {
.botmgrs
.chat
.say_in_reply_to(
&params.msg,
&params_ar.read().await.msg,
format!("Routine Result : {:?}",rsltstr),
params.clone()
params_clone.read().await.clone()
).await;
// [x] Will not be handling JoinHandles here . If immediate abort() handling is required, below is an example that works

View file

@ -22,7 +22,14 @@ pub async fn main() {
Log::set_retention_days(2);
// Log::set_level(Level::Notice);
// fn innerbody() -> i64 {
// println!("hello");
// 64
// }
// let exec_body:fn() -> i64 = innerbody;