From fa5de03bae027336c17373dd463832264b22206d Mon Sep 17 00:00:00 2001 From: ModulatingForce <116608425+modulatingforce@users.noreply.github.com> Date: Sun, 31 Mar 2024 16:38:11 -0400 Subject: [PATCH] used blocking instead of async in some calls --- src/core/bot_actions.rs | 17 +++-- src/core/botmodules.rs | 93 +++++++++++++++++------- src/custom/experimental/experiment003.rs | 90 ++++++++++++++++------- src/main.rs | 9 ++- 4 files changed, 147 insertions(+), 62 deletions(-) diff --git a/src/core/bot_actions.rs b/src/core/bot_actions.rs index 9ad7eea..7775beb 100644 --- a/src/core/bot_actions.rs +++ b/src/core/bot_actions.rs @@ -50,8 +50,8 @@ impl ExecBodyParams { } } - pub async fn get_channel(&self) -> Option { - // pub fn get_channel(&self) -> Option { + // pub async fn get_channel(&self) -> Option { + pub fn get_channel(&self) -> Option { dbg!("Core > ExecBodyParams > GetChannels START"); dbg!("!! [x] Document - After SUCCESS message was sent to chat"); @@ -62,7 +62,7 @@ impl ExecBodyParams { let curr_act = Arc::clone(&self.curr_act); - let curr_act_lock = curr_act.read().await; + // let curr_act_lock = curr_act.read().await; // Note : The below `curr_act.blocking_read()` is not possible. I get the following // during runtime in this areas @@ -70,7 +70,7 @@ impl ExecBodyParams { thread 'tokio-runtime-worker' panicked at src\core\bot_actions.rs:66:38: Cannot block the current thread from within a runtime. This happens because a function attempted to block the current thread while the thread is being used to drive asynchronous tasks. */ - // let curr_act_lock = curr_act.blocking_read(); + let curr_act_lock = curr_act.blocking_read(); dbg!("Core > ExecBodyParams > After Creating ExecBodyParams.current_act.read() Guard "); dbg!("!! [x] Document - After SUCCESS message was sent to chat"); @@ -113,10 +113,13 @@ Cannot block the current thread from within a runtime. This happens because a fu // dbg!("ISSUE : RoutineAR - RwLock from botmodules.rs::1261:32 - current_readers = 1"); // let out = Some(r.read().await.channel.clone()); - let guard = r.read().await; - // let guard = r.blocking_read(); + + dbg!("ISSUE > Never makes it after the following read guard"); - dbg!("ISSUE> Never makes it after the read guard"); + // let guard = r.read().await; + let guard = r.blocking_read(); + + dbg!("ISSUE RESOLVED > If passed htis point"); let channel = guard.channel.clone(); drop(guard); diff --git a/src/core/botmodules.rs b/src/core/botmodules.rs index 5a45979..5bc9f0a 100644 --- a/src/core/botmodules.rs +++ b/src/core/botmodules.rs @@ -644,8 +644,11 @@ pub struct Routine { pub name : String , pub module : BotModule , // from() can determine this if passed parents_params pub channel : Channel , // Requiring some channel context - exec_body: Arc>, - pub parent_params : ExecBodyParams , + // exec_body: Arc>, + // exec_body : fn(ExecBodyParams), + exec_body : fn(Arc>), + // pub parent_params : ExecBodyParams , + pub parent_params : Arc> , pub join_handle : Option>>> , start_time : Option> , pub complete_iterations : i64 , @@ -896,8 +899,9 @@ impl Routine { module : BotModule , channel : Channel, routine_attr : Vec , - exec_body : Arc> , - parent_params : ExecBodyParams + // exec_body : Arc> , + exec_body : fn(Arc>) , + parent_params : Arc> ) -> Result< Arc>, String @@ -1014,7 +1018,7 @@ impl Routine { Some(format!( "Routine > start() > (In Tokio Spawn)", )), - Some(&trg_routine_ar.read().await.parent_params.msg), + Some(&trg_routine_ar.read().await.parent_params.read().await.msg), ); Log::flush(); @@ -1027,7 +1031,7 @@ impl Routine { Some(format!( "Routine > start() > (In Tokio Spawn)", )), - Some(&trg_routine_ar.read().await.parent_params.msg), + Some(&trg_routine_ar.read().await.parent_params.read().await.msg), ); Log::flush(); @@ -1110,7 +1114,7 @@ impl Routine { "Routine > start() > (In Tokio Spawn) > {:?}", trg_routine_ar.read().await.module )), - Some(&trg_routine_ar.read().await.parent_params.msg), + Some(&trg_routine_ar.read().await.parent_params.read().await.msg), ); if let Some(dur) = delayduration { @@ -1119,6 +1123,8 @@ impl Routine { { // [x] Loop Initialization - Prior to Loop that calls Custom Routine Execution Body + + let mut a = trg_routine_ar.write().await; a.start_time = Some(chrono::offset::Local::now()); @@ -1129,15 +1135,33 @@ impl Routine { { a.remaining_iterations = Some(iternum); } + drop(a); } + loop { // [x] Routine loop // [x] execution body // trg_routine_ar.read().await.loopbody().await; { - trg_routine_ar.write().await.loopbody().await; + + let trg_routine_ar_clone = trg_routine_ar.clone(); + // trg_routine_ar.write().await.loopbody().await; + let loopbodyspawn = tokio::task::spawn_blocking( move || { + // let routine_ar_clone = Arc::clone(&trg_routine_ar); + // trg_routine_ar + // trg_routine_ar_clone.blocking_write().loopbody(); + let guard1 = trg_routine_ar_clone.blocking_read(); + guard1.loopbody(); + drop(guard1); + }); + + loopbodyspawn.await.unwrap(); + + + // trg_routine_ar.write().await.loopbody() + } @@ -1156,6 +1180,7 @@ impl Routine { if i > 0 { a.remaining_iterations = Some(i-1) ; } else { break ; } // if remaining iterations is 0, exit } + drop(a); } @@ -1184,6 +1209,8 @@ impl Routine { } + dbg!("SUCCESS! Routinecompleted"); + botlog::trace( format!( @@ -1196,7 +1223,7 @@ impl Routine { "Routine > start() > (In Tokio Spawn) > {:?}", trg_routine_ar.read().await.module )), - Some(&trg_routine_ar.read().await.parent_params.msg), + Some(&trg_routine_ar.read().await.parent_params.read().await.msg), ); botlog::trace( @@ -1211,7 +1238,7 @@ impl Routine { "Routine > start() > (In Tokio Spawn) > {:?}", trg_routine_ar.read().await.module )), - Some(&trg_routine_ar.read().await.parent_params.msg), + Some(&trg_routine_ar.read().await.parent_params.read().await.msg), ); Log::flush(); @@ -1229,7 +1256,7 @@ impl Routine { drop(lock); // => 03.30 - added this to try to address Deadlock issue } - trg_routine_arout.write().await.internal_signal = RoutineSignal::Started; + // trg_routine_arout.write().await.internal_signal = RoutineSignal::Started; return Ok(trg_routine_arout); @@ -1244,7 +1271,9 @@ impl Routine { // } - async fn loopbody(&mut self) + // async fn loopbody(&mut self) + // fn loopbody(&mut self) + fn loopbody(&self) // [x] => 03.27 - COMPLETED { botlog::trace( @@ -1260,15 +1289,19 @@ impl Routine { // => 03.30 - involved with Problem Deadlock Current Readers = 1 // - Below self appears to be a Arc> let self_ar = Arc::new(RwLock::new(self)); - + + // => 03.31 - Temporarily removing the current_act / parrent_act functionality + /* { - let mut mutlock = self_ar.write().await; + // let mutlock = self_ar.read().await; + let mutlock = self_ar.blocking_read(); - mutlock.parent_params.parent_act = Some(mutlock.parent_params.curr_act.clone()); - mutlock.parent_params.curr_act = mutlock.self_act_ar.to_owned().unwrap(); + mutlock.parent_params.blocking_write().parent_act = Some(mutlock.parent_params.blocking_read().curr_act.clone()); + mutlock.parent_params.blocking_write().curr_act = mutlock.self_act_ar.to_owned().unwrap(); drop(mutlock); // => 03.30 - Added to address Deadlock issue } + */ dbg!("Core > Before Guards"); @@ -1279,7 +1312,7 @@ impl Routine { // ).await; let parent_params = { - let guard1 = self_ar.read().await; + let guard1 = self_ar.blocking_read(); let parent_params = guard1.parent_params.clone(); drop(guard1); @@ -1311,12 +1344,15 @@ impl Routine { { - let guard2 = self_ar.read().await; + let guard2 = self_ar.blocking_read(); let exec_body_ar = guard2.exec_body.clone(); drop(guard2); - let guard3 = exec_body_ar.read().await; - (guard3)(parent_params).await; - drop(guard3); + // let guard3 = exec_body_ar.read().await; + // (guard3)(parent_params).await; + // drop(guard3); + + // [ ] May need to do a blocking async of this? + exec_body_ar(parent_params); } @@ -1343,6 +1379,7 @@ impl Routine { { let mut self_lock = self_rw.write().await; self_lock.internal_signal = RoutineSignal::Stopping; + drop(self_lock); } @@ -1358,7 +1395,7 @@ impl Routine { "Routine > stop() > {:?}", self_lock.module )), - Some(&self_lock.parent_params.msg), + Some(&self_lock.parent_params.read().await.msg), ); Log::flush(); @@ -1403,6 +1440,7 @@ impl Routine { { let mut lock_mut = self_rw.write().await; lock_mut.internal_signal = RoutineSignal::Stopped; + drop(lock_mut); } } @@ -1419,7 +1457,7 @@ impl Routine { "Routine > cancel() > {:?}", self_lock.module )), - Some(&self_lock.parent_params.msg), + Some(&self_lock.parent_params.read().await.msg), ); Log::flush(); @@ -1459,9 +1497,11 @@ impl Routine { { let mut self_lock = self_rw.write().await; self_lock.cancel().await?; + drop(self_lock); } else { let mut self_lock = self_rw.write().await; self_lock.stop().await?; + drop(self_lock); } Routine::start(self_rw.clone()).await?; @@ -1478,7 +1518,7 @@ impl Routine { "Routine > restart() > {:?}", self_lock.module )), - Some(&self_lock.parent_params.msg), + Some(&self_lock.parent_params.read().await.msg), ); Log::flush(); @@ -1512,7 +1552,7 @@ impl Routine { "Routine > restart() > {:?}", self_lock.module )), - Some(&self_lock.parent_params.msg), + Some(&self_lock.parent_params.read().await.msg), ); Log::flush(); @@ -1539,6 +1579,7 @@ impl Routine { { let mut self_lock = self_rw.write().await; self_lock.routine_attr = routine_attr; + drop(self_lock); } // let self_rw = Arc::new(RwLock::new(self)); @@ -1555,7 +1596,7 @@ impl Routine { "Routine > restart() > {:?}", self_lock.module )), - Some(&self_lock.parent_params.msg), + Some(&self_lock.parent_params.read().await.msg), ); Log::flush(); diff --git a/src/custom/experimental/experiment003.rs b/src/custom/experimental/experiment003.rs index 774e24b..6b5a3de 100644 --- a/src/custom/experimental/experiment003.rs +++ b/src/custom/experimental/experiment003.rs @@ -142,19 +142,24 @@ async fn countdown_chnl_v1(params : ExecBodyParams) { // let parentmodule = params.get_parent_module().await; let module = params.get_module().await; // let channel = params.get_channel().await; - let channel = params.get_channel().await; + // let channel = params.get_channel().await; + let channel = params.get_channel(); let routine_attr = vec![ // RoutineAttr::RunOnce RoutineAttr::MaxIterations(5), RoutineAttr::LoopDuration(Duration::from_secs(1)) ]; // let exec_body = actions_util::asyncbox(rtestbody); - let exec_body = actions_util::asyncbox(innertester); // <-- 03.27 - when below is uncommented, this is throwing an issue + // let exec_body = actions_util::asyncbox(innertester); // <-- 03.27 - when below is uncommented, this is throwing an issue + let exec_body = innertester; - async fn innertester(params : ExecBodyParams) { + // async fn innertester(params : ExecBodyParams) { + fn innertester(params : Arc>) { { - let curract_guard = params.curr_act.read().await; + // let curract_guard = params.curr_act.read().await; + let paramguard1 = params.blocking_read(); + let curract_guard = paramguard1.curr_act.blocking_read(); // let logmsg_botact = match *params.curr_act.read().await { @@ -169,16 +174,19 @@ async fn countdown_chnl_v1(params : ExecBodyParams) { botlog::trace( format!("Params > Curr_act type : {:?}", logmsg_botact).as_str(), Some("Experiments003 > countdown_chnl()".to_string()), - Some(¶ms.msg), + Some(¶ms.blocking_read().msg), ); Log::flush(); } { - let bot = Arc::clone(¶ms.bot); - let botlock = bot.read().await; + let bot = Arc::clone(¶ms.blocking_read().bot); + // let botlock = bot.read().await; + let botlock = bot.blocking_read(); - let curract_guard = params.curr_act.write().await; + // let curract_guard = params.curr_act.write().await; + let paramguard1 = params.blocking_read(); + let curract_guard = paramguard1.curr_act.blocking_write(); // let routine_lock = arr.write().await; @@ -208,7 +216,8 @@ async fn countdown_chnl_v1(params : ExecBodyParams) { // } { - let routine_lock = arr.write().await; + // let routine_lock = arr.write().await; + let routine_lock = arr.blocking_write(); let a = routine_lock.remaining_iterations; println!("remaining iterations : {:?}", a); } @@ -248,7 +257,7 @@ async fn countdown_chnl_v1(params : ExecBodyParams) { botlog::trace( format!("Picked a channel: {:?}", chosen_channel).as_str(), Some("Experiments003 > countdown_chnl()".to_string()), - Some(¶ms.msg), + Some(¶ms.blocking_read().msg), ); Log::flush(); @@ -280,8 +289,9 @@ async fn countdown_chnl_v1(params : ExecBodyParams) { module, channel.unwrap(), routine_attr, - Arc::new(RwLock::new(exec_body)), - params.clone() + // Arc::new(RwLock::new(exec_body)), + exec_body, + Arc::new(RwLock::new(params.clone())), ).await { let newr_ar = newr.clone(); // [ ] start the routine @@ -358,25 +368,47 @@ async fn test3_body(params : ExecBodyParams) { // [x] Get the module from params + + let params_ar = Arc::new(RwLock::new(params)); + // let parentmodule = params.get_parent_module().await; - let module = params.get_module().await; + let module = params_ar.read().await.get_module().await; // let channel = params.get_channel().await; - let channel = params.get_channel().await; + // let channel = params.get_channel().await; + // let channel = params.get_channel(); let routine_attr = vec![ RoutineAttr::RunOnce ]; // let exec_body = actions_util::asyncbox(rtestbody); - let exec_body = actions_util::asyncbox(rtestbody); // <-- 03.27 - when below is uncommented, this is throwing an issue + // let exec_body = actions_util::asyncbox(rtestbody); // <-- 03.27 - when below is uncommented, this is throwing an issue + // let channel = params.get_channel(); + let channel_task_grabber = tokio::task::spawn_blocking( move || { + let params_clone = params_ar.clone(); + let channel = params_clone.blocking_read().get_channel(); + drop(params_clone); + (channel,params_ar) + }); + + let (channel,params_ar) = channel_task_grabber.await.unwrap(); + + + + + + let exec_body = rtestbody; // let parent_params = params.clone(); // let params_clone = params.clone(); - async fn rtestbody(params : ExecBodyParams) { + // async fn rtestbody(params : ExecBodyParams) { + fn rtestbody(params : Arc>) { - let guard1 = params.curr_act.read().await; + // let guard1 = params.curr_act.read().await; + let paramguard1 = params.blocking_read(); + let guard1 = paramguard1.curr_act.blocking_read(); { let logmsg_botact = match *guard1 { BotAction::C(_) => "command", @@ -387,7 +419,7 @@ async fn test3_body(params : ExecBodyParams) { botlog::trace( format!("Params > Curr_act type : {:?}", logmsg_botact).as_str(), Some("Experiments003 > test3 command body".to_string()), - Some(¶ms.msg), + Some(¶ms.blocking_read().msg), ); Log::flush(); } @@ -403,7 +435,7 @@ async fn test3_body(params : ExecBodyParams) { botlog::trace( format!("Params > Curr_act type : {:?}", logmsg_botact).as_str(), Some("Experiments003 > test3 command body".to_string()), - Some(¶ms.msg), + Some(¶ms.blocking_read().msg), ); Log::flush(); } @@ -439,7 +471,8 @@ async fn test3_body(params : ExecBodyParams) { // } ; // let chnl = params.get_channel().await; - let chnl = params.get_channel().await; + // let chnl = params.get_channel().await; + let chnl = params.blocking_read().get_channel(); dbg!("Custom > within Child Custom fn - after GetChannel"); println!("{:?}",chnl); @@ -461,14 +494,14 @@ async fn test3_body(params : ExecBodyParams) { } - + let params_clone = params_ar.clone(); botlog::debug( format!("RTESTBODY : module - {:?} ; channel - {:?}", module,channel ).as_str(), Some("experiment003 > test3_body".to_string()), - Some(¶ms.msg), + Some(¶ms_clone.read().await.msg), ); @@ -478,8 +511,9 @@ async fn test3_body(params : ExecBodyParams) { module, channel.unwrap(), routine_attr, - Arc::new(RwLock::new(exec_body)), - params.clone() + // Arc::new(RwLock::new(exec_body)), + exec_body, + Arc::new(RwLock::new(params_clone.read().await.clone())) ).await; @@ -520,12 +554,12 @@ async fn test3_body(params : ExecBodyParams) { rsltstr ).as_str(), Some("experiment003 > test3_body".to_string()), - Some(¶ms.msg), + Some(&(params_ar.clone()).read().await.msg), ); Log::flush(); - let bot = Arc::clone(¶ms.bot); + let bot = Arc::clone(¶ms_ar.read().await.bot); let botlock = bot.read().await; @@ -534,9 +568,9 @@ async fn test3_body(params : ExecBodyParams) { .botmgrs .chat .say_in_reply_to( - ¶ms.msg, + ¶ms_ar.read().await.msg, format!("Routine Result : {:?}",rsltstr), - params.clone() + params_clone.read().await.clone() ).await; // [x] Will not be handling JoinHandles here . If immediate abort() handling is required, below is an example that works diff --git a/src/main.rs b/src/main.rs index 866fc38..3a797a4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -22,7 +22,14 @@ pub async fn main() { Log::set_retention_days(2); // Log::set_level(Level::Notice); - + // fn innerbody() -> i64 { + // println!("hello"); + // 64 + // } + + // let exec_body:fn() -> i64 = innerbody; + +