diff --git a/src/core/botinstance.rs b/src/core/botinstance.rs index 2872181..1cd1494 100644 --- a/src/core/botinstance.rs +++ b/src/core/botinstance.rs @@ -169,6 +169,7 @@ impl BotInstance { match message { ServerMessage::Notice(msg) => { + dbg!("NOTICE TRIGGERED",msg.clone()); botlog::notice( format!("NOTICE : (#{:?}) {}", msg.channel_login, msg.message_text) .as_str(), @@ -536,6 +537,8 @@ impl BotInstance { Some(msg), ); + dbg!("Running botcommand"); + let a = Arc::clone(&bot); c.execute(ExecBodyParams { bot : a, diff --git a/src/core/chat.rs b/src/core/chat.rs index 7c20cd8..2ff55c3 100644 --- a/src/core/chat.rs +++ b/src/core/chat.rs @@ -110,6 +110,7 @@ impl Chat { // let chat_ar = Arc::new(RwLock::new(chat_clone)); let chat_ar_clone = chat_ar.clone(); + // Spawning a Task that will process the Chat.outqueue tokio::spawn(async move { // loop { // // tokio::spawn(async move { @@ -139,35 +140,66 @@ impl Chat { let r_cc = r_clone.clone(); - loop { + // loop { + // Spawn x helper threads to process messages in the outqueue + for _ in 1..10 { + + dbg!("Chat queue Helper thread generated"); let r_ccc = r_cc.clone(); let chat_c = chat_ar.clone(); tokio::spawn(async move { let r_cc = r_ccc.clone(); - let chat_cc = chat_c.clone(); + // let chat_cc = chat_c.clone(); let guard1 = r_cc.read().await; - match guard1.recv().await { - Ok(a) => { + + while let Ok(a) = guard1.recv().await { + // let r_cc = r_ccc.clone(); + let chat_cc = chat_c.clone(); + // let guard2 = r_cc.read().await; + // Ok(a) => { // [ ] This should spawn it's own ideally, so it does not lock other messages // chat_clone.send_botmsg(a.0, Arc::new(RwLock::new(a.1))).await - let params_ar = Arc::new(RwLock::new(a.1)); - // let chat_clone = chat_cc.clone(); - dbg!(chrono::offset::Local::now(),"Message Spawned > Starting"); - tokio::spawn( async move { - chat_cc.read().await.send_botmsg_inner(a.0, params_ar).await - }); - dbg!(chrono::offset::Local::now(),"Message Spawned > Completed"); - } , - Err(err) => { - dbg!("ISSUE processing Receiver",err); - sleep(Duration::from_millis(10)).await; - } + let params_ar = Arc::new(RwLock::new(a.1)); + // let chat_clone = chat_cc.clone(); + dbg!(chrono::offset::Local::now(),"Message Spawned > Starting"); + tokio::spawn( async move { + dbg!("Message from Chat Queue > Sending"); + chat_cc.read().await.send_botmsg_inner(a.0, params_ar).await; + dbg!("Message from Chat Queue > Sent"); + }); + dbg!(chrono::offset::Local::now(),"Message Spawned > Completed"); + // } , + // Err(err) => { + // dbg!("ISSUE processing Receiver",err); + // sleep(Duration::from_millis(10)).await; + // } } + dbg!("ISSUE : Helper thread observed an Err"); + + // match guard1.recv().await { + // Ok(a) => { + // // [ ] This should spawn it's own ideally, so it does not lock other messages + // // chat_clone.send_botmsg(a.0, Arc::new(RwLock::new(a.1))).await + // let params_ar = Arc::new(RwLock::new(a.1)); + // // let chat_clone = chat_cc.clone(); + // dbg!(chrono::offset::Local::now(),"Message Spawned > Starting"); + // tokio::spawn( async move { + // chat_cc.read().await.send_botmsg_inner(a.0, params_ar).await + // }); + // dbg!(chrono::offset::Local::now(),"Message Spawned > Completed"); + // } , + // Err(err) => { + // dbg!("ISSUE processing Receiver",err); + // sleep(Duration::from_millis(10)).await; + // } + // } + } ); yield_now().await; } + }); // chat @@ -357,6 +389,8 @@ impl Chat { // let params_clone = params.clone(); + dbg!("Send_botmsg_inner > Checking user roles first"); + let botclone = Arc::clone(¶ms.read().await.bot); let botlock = botclone.read().await; let id = botlock.get_identity(); @@ -373,6 +407,8 @@ impl Chat { Some(¶ms.read().await.msg), ); + dbg!("Send_botmsg_inner > Checking user roles first > Prepared to check for roles"); + Log::flush(); // [x] If user has any of the following target roles, they will be allowed - otherwise, they will not be allowed to send @@ -415,6 +451,8 @@ impl Chat { } + dbg!("Send_botmsg_inner > Roles Checked"); + /* At this stage from the above Validations : msginput would be : @@ -551,8 +589,11 @@ impl Chat { // } - + // => [x] 04.02 - Q. Why are we spawning here? => A. Believe this was to try to ensure not blocked tokio::spawn(async move { + + dbg!("Send_botmsg_inner > Within Spawn"); + let chat_clone = chat_ar.clone(); let ratelimiters_clone = ratelimiters.clone(); let rguard = ratelimiters_clone.read().await; @@ -566,16 +607,75 @@ impl Chat { // Continue to check the limiter and sleep if required if the minimum is not reached + + + loop { + // Check first if need to sleep, then sleep if required + dbg!("Send_botmsg_inner > Within Spawn > Before a Write Guard"); + let mut crl_lock = contextratelimiter.write().await; + let sleeptime = if let ratelimiter::LimiterResp::Sleep(sleeptime) = crl_lock.check_limiter(outmsg.clone()).await { + // dbg!("Send_botmsg_inner > Within Spawn > Drop a Write Guard"); + // drop(crl_lock); + // dbg!("determined to sleep > Sleeping now - ",chrono::offset::Local::now()); + // sleep(Duration::from_secs_f64(sleeptime)).await; + sleeptime + } else { 0.0 }; + + + drop(crl_lock); + + if sleeptime > 0.0 { + dbg!("determined to sleep > Sleeping now - ",chrono::offset::Local::now()); + sleep(Duration::from_secs_f64(sleeptime)).await; + } + + // if told to sleep again, check again before looping for another check + dbg!("Send_botmsg_inner > Within Spawn > Before a Write Guard"); + let mut crl_lock = contextratelimiter.write().await; + if let ratelimiter::LimiterResp::Sleep(_) = crl_lock.check_limiter(outmsg.clone()).await { + // do nothing here + } else { + break; // break out if no longer sleep LimiterResp + }; + dbg!("Send_botmsg_inner > Within Spawn > Drop a Write Guard"); + drop(crl_lock); + } + + /* + // Original Logic + + dbg!("Send_botmsg_inner > Within Spawn > Before a Write Guard"); let mut crl_lock = contextratelimiter.write().await; - while let ratelimiter::LimiterResp::Sleep(sleeptime) = crl_lock.check_limiter(outmsg.clone()) { + // dbg!("Send_botmsg_inner > Within Spawn > Reading Guard / Before a Write Guard"); + // let mut crl_lock = contextratelimiter.read().await; + // let rslt = crl_lock.check_limiter(outmsg.clone()).await; + + dbg!("before checklimiter sleep",&crl_lock); + // Assign the message here to rate limiter - or just during + + // let a = *(contextratelimiter.read().await); + // let b = Arc::new(a); + // let c = b.check_limiter(inmsg).await; + + while let ratelimiter::LimiterResp::Sleep(sleeptime) = crl_lock.check_limiter(outmsg.clone()).await { + // while let ratelimiter::LimiterResp::Sleep(sleeptime) = crl_lock.check_limiter(outmsg.clone()).await { + + dbg!("within checklimiter sleep",&crl_lock); + // dbg!(chrono::offset::Local::now(),"Message Spawned > Completed"); + + // drop(crl_lock); //dropping the lock first dbg!("sleep loop start",chrono::offset::Local::now(), sleeptime); yield_now().await; dbg!("yield in just after loop start",chrono::offset::Local::now()); sleep(Duration::from_secs_f64(sleeptime)).await; + } + */ - match crl_lock.check_limiter(outmsg.clone()) { + let mut crl_lock = contextratelimiter.write().await; + dbg!("after checklimiter sleep",&crl_lock); + match crl_lock.clone().check_limiter(outmsg.clone()).await { ratelimiter::LimiterResp::Allow => { // let maxblanks = rand::thread_rng().gen_range(1..=20); @@ -592,8 +692,8 @@ impl Chat { // drop(blanks_to_add_lock); - dbg!("[ ] PROBLEM AREA - If notificaiton triggered, need this checked"); - dbg!(outmsg.clone()); + dbg!("Area that used to add blanks to out message"); + dbg!("Sending the following messag to chat > ", outmsg.clone()); botlog::trace( &format!("Out Message TO {} >> {}",channel_login.clone(),outmsg.clone()), @@ -614,7 +714,7 @@ impl Chat { } // contextratelimiter.increment_counter(); - crl_lock.sent_msg(outmsg.clone()); + crl_lock.sent_msg(outmsg.clone()).await; let logstr = format!( "(#{}) > {} ; contextratelimiter : {:?}", @@ -642,7 +742,7 @@ impl Chat { // (); // do nothing otherwise } ratelimiter::LimiterResp::Sleep(err_passed_sleep_time) => { - dbg!(err_passed_sleep_time); + dbg!("Rate Limiter returned Sleep unexpetedly",err_passed_sleep_time); // panic!("ISSUE : sleep was already awaited - Should not happen?"); botlog::warn( format!( @@ -657,6 +757,7 @@ impl Chat { drop(crl_lock); + dbg!("Send_botmsg_inner > Within Spawn > After Dropping a Write Guard"); }); diff --git a/src/core/identity.rs b/src/core/identity.rs index 50a55bc..feb6791 100644 --- a/src/core/identity.rs +++ b/src/core/identity.rs @@ -545,6 +545,8 @@ async fn getroles(params : ExecBodyParams) { */ + dbg!("In getroles command"); + let mut argv = params.msg.message_text.split(' '); @@ -554,12 +556,24 @@ async fn getroles(params : ExecBodyParams) { let targetuser = match arg1 { None => { - + dbg!("Expected Exit"); botlog::debug( - "Exittingcmd getroles - Invalid arguments ", + "Exitting cmd getroles - Invalid arguments ", Some("identity.rs > init > getroles()".to_string()), Some(¶ms.msg), ); + + + let botlock = params.bot.read().await; + + botlock.botmgrs.chat.read().await.send_botmsg(super::chat::BotMsgType::Notif( + "Invalid Arguments".to_string() + ), + Arc::new(RwLock::new(params.clone())), + ).await; + + drop(botlock); + return }, // exit if no arguments @@ -570,12 +584,15 @@ async fn getroles(params : ExecBodyParams) { let targetchnl = arg2; + dbg!("In prior to bot read guard command"); + let botlock = params.bot.read().await; let id = botlock.get_identity(); let idlock = id.read().await; + dbg!("Pror to getspecialuserroles()"); let sproles = match targetchnl { None => { // [ ] If targetchnl is not provided, default to pulling the current channel @@ -669,6 +686,8 @@ async fn getroles(params : ExecBodyParams) { Some(¶ms.msg), ); + dbg!("Pror to getroles send_botmsg"); + botlock.botmgrs.chat.read().await.send_botmsg(super::chat::BotMsgType::Notif( outmsg.to_string() ), @@ -677,6 +696,9 @@ async fn getroles(params : ExecBodyParams) { // [ ] NOTE : After the above, I should receive only the roles in the context of the current channel I received this ideally and maybe BotAdmin ; not outside + + + dbg!("End of getroles command"); } @@ -1544,6 +1566,8 @@ impl IdentityManager { Note : Ideally this be called for a given chatter name ? */ + dbg!("In getspecialuserroles()"); + // [ ] !!! TODO: I don't think below is evaluating by given channel botlog::debug( &format!( @@ -1575,6 +1599,9 @@ impl IdentityManager { }; + + dbg!("Before reading Vector Roles"); + let rolesdb = Arc::clone(&self.special_roles_users); let rolesdb_lock = rolesdb.read().await; @@ -1630,6 +1657,8 @@ impl IdentityManager { } } + dbg!("exit getspecialusreroles with",&evalsproles); + botlog::debug( &format!("OUT > evalsproles {:?}", &evalsproles), Some("IdentityManager > getspecialuserroles()".to_string()), diff --git a/src/core/ratelimiter.rs b/src/core/ratelimiter.rs index 3a6b95d..dcd4f08 100644 --- a/src/core/ratelimiter.rs +++ b/src/core/ratelimiter.rs @@ -6,7 +6,9 @@ const TIME_MIN_S_F64: f64 = 1.0; const MSG_THRESHOLD: u32 = 20; const DUPMSG_MIN_SEND_S: f64 = 30.0; -use std::time::Instant; +use std::{sync::Arc, time::Instant}; +use tokio::sync::RwLock; + use crate::core::botlog; #[derive(Debug, Clone)] @@ -42,14 +44,15 @@ impl RateLimiter { } fn dupmsg(&self,inmsg : String) -> bool { - if self.lastmsgdup == inmsg { + dbg!("dubmsg()",&inmsg,&self.lastmsgdup); + if self.lastmsgdup == inmsg && self.lastmsgtimer.elapsed().as_secs_f64() < DUPMSG_MIN_SEND_S { // duplicate detected // self.lastmsgdup = String::new(); // update it as no longer duplicate return true; } else { return false; }; } - pub fn check_limiter(&mut self,inmsg : String) -> LimiterResp { + pub async fn check_limiter(&mut self,inmsg : String) -> LimiterResp { let logstr = format!( @@ -74,42 +77,67 @@ impl RateLimiter { >> LimiterResp::Sleep(TIME_MIN_S_F64 - self.lastmsgtimer.elapsed().as_secs_f64() + 0.1) */ + // let ratelimiter_ar = Arc::new(RwLock::new((*self).clone())); + let ratelimiter_ar = Arc::new(RwLock::new(self)); + let mut rl_guard = ratelimiter_ar.write().await; - let rsp = if self.timer.elapsed().as_secs() >= TIME_THRESHOLD_S { - self.timer = Instant::now(); - self.msgcounter = 0; + let rsp = if rl_guard.timer.elapsed().as_secs() >= TIME_THRESHOLD_S { + rl_guard.timer = Instant::now(); + rl_guard.msgcounter = 0; LimiterResp::Allow - } else if self.msgcounter < MSG_THRESHOLD && - self.lastmsgtimer.elapsed().as_secs_f64() >= TIME_MIN_S_F64 { + } else if rl_guard.msgcounter < MSG_THRESHOLD && + rl_guard.lastmsgtimer.elapsed().as_secs_f64() >= TIME_MIN_S_F64 { LimiterResp::Allow } else { // when elapsed() < TIME_THRESHOLD_S && msgcounter >= MSG_THRESHOLD // LimiterResp::Skip - LimiterResp::Sleep(TIME_MIN_S_F64 - self.lastmsgtimer.elapsed().as_secs_f64() + 0.1) + LimiterResp::Sleep(TIME_MIN_S_F64 - rl_guard.lastmsgtimer.elapsed().as_secs_f64() + 0.1) }; // [ ] If rsp is still allow at this point, also do a dupcheck to adjust rsp to a sleep - + dbg!(inmsg.clone()); - dbg!(self.dupmsg(inmsg.clone())); + dbg!(rl_guard.dupmsg(inmsg.clone())); dbg!("Before Dup test",rsp.clone()); dbg!(matches!(rsp.clone(),LimiterResp::Allow)); + - let rsp = if self.dupmsg(inmsg) && matches!(rsp,LimiterResp::Allow) { + let rsp = if rl_guard.dupmsg(inmsg.clone()) && matches!(rsp,LimiterResp::Allow) { //self.lastmsgdup = String::new(); // LimiterResp::Sleep(DUPMSG_MIN_SEND_S) - LimiterResp::Sleep(DUPMSG_MIN_SEND_S - self.lastmsgtimer.elapsed().as_secs_f64() + 0.1) + dbg!("Duplicate detected"); + LimiterResp::Sleep(DUPMSG_MIN_SEND_S - rl_guard.lastmsgtimer.elapsed().as_secs_f64() + 0.1) } else { rsp.clone() }; + + // => 04.02 - Don't update here + // // After Dup is checked, Set the lastmsgdup to latest message + // dbg!("Before Assigning Lastmsgdup",rl_guard.lastmsgdup.clone(),inmsg.clone(),rsp.clone()); + // rl_guard.lastmsgdup = inmsg.clone(); + // dbg!("After Assigning Lastmsgdup",rl_guard.lastmsgdup.clone(),inmsg.clone()); + + + // [ ] Allows if sleep came up as negative + let rsp = match rsp { + LimiterResp::Sleep(sleeptime) if sleeptime < 0.0 => LimiterResp::Allow , // allow is sleeptime evaluated is negative + _ => rsp.clone(), + } ; + + dbg!(rsp.clone()); botlog::trace( &format!("Limiter Response : {:?} ; Elapsed (as_sec_f64) : {}", - rsp, self.lastmsgtimer.elapsed().as_secs_f64()), + rsp, rl_guard.lastmsgtimer.elapsed().as_secs_f64()), Some("Rate Limiter Inner".to_string()), None, ); + dbg!("check_limiter() > ratelimiter guard",rl_guard.clone()); + + drop(rl_guard); + + rsp } @@ -119,9 +147,21 @@ impl RateLimiter { // self.lastmsgtimer = Instant::now(); // } - pub fn sent_msg(&mut self,msgstr : String) { + pub async fn sent_msg(&mut self,msgstr : String) { + + dbg!("sent_msg triggered",&msgstr); + + // let ratelimiter_ar = Arc::new(RwLock::new((*self).clone())); + // let mut rl_guard = ratelimiter_ar.write().await; + + dbg!("sent_msg > lastmsg before update",&self.lastmsgdup); + self.lastmsgdup = msgstr; self.msgcounter += 1; self.lastmsgtimer = Instant::now(); + + dbg!("sent_msg > lastmsg after update",&self.lastmsgdup); + + // drop(rl_guard); } }