diff --git a/Cargo.lock b/Cargo.lock index 296b771..a733c32 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -47,6 +47,19 @@ version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0952808a6c2afd1aa8947271f3a60f1a6763c7b912d210184c5149b5cf147247" +[[package]] +name = "async-channel" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28243a43d821d11341ab73c80bed182dc015c514b951616cf79bd4af39af0c3" +dependencies = [ + "concurrent-queue", + "event-listener", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-recursion" version = "1.1.0" @@ -323,6 +336,15 @@ dependencies = [ "tracing-error", ] +[[package]] +name = "concurrent-queue" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console-api" version = "0.6.0" @@ -486,6 +508,27 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "event-listener" +version = "5.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b5fb89194fa3cad959b833185b3063ba881dbfc7030680b314250779fb4cc91" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "332f51cb23d20b0de8458b86580878211da09bcd4503cb579c225b3d124cabb3" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "eyre" version = "0.6.12" @@ -522,6 +565,7 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" name = "forcebot_rs" version = "0.1.0" dependencies = [ + "async-channel", "async-recursion", "async-trait", "casual_logger", @@ -1122,6 +1166,12 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "parking_lot" version = "0.12.1" diff --git a/Cargo.toml b/Cargo.toml index 0d96343..7b370e6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ casual_logger = "0.6.5" chrono = "0.4.35" tokio-console = "0.1.10" console-subscriber = "0.2.0" +async-channel = "2.2.0" [lib] diff --git a/src/core/botinstance.rs b/src/core/botinstance.rs index d17f8c8..7198d01 100644 --- a/src/core/botinstance.rs +++ b/src/core/botinstance.rs @@ -43,12 +43,12 @@ use super::botmodules::StatusType; #[derive(Clone, Debug)] pub struct BotManagers { pub identity: Arc>, - pub chat: Chat, + pub chat: Arc>, } impl BotManagers { pub fn init( - ratelimiters: HashMap, + ratelimiters: HashMap>>, client: TwitchIRCClient, StaticLoginCredentials>, ) -> BotManagers { BotManagers { @@ -116,7 +116,7 @@ impl BotInstance { client.join(chnl.to_owned()).unwrap(); - let n = RateLimiter::new(); + let n = Arc::new(RwLock::new(RateLimiter::new())); ratelimiters.insert(Channel(String::from(chnl)), n); } @@ -169,6 +169,7 @@ impl BotInstance { match message { ServerMessage::Notice(msg) => { + dbg!("NOTICE TRIGGERED",msg.clone()); botlog::notice( format!("NOTICE : (#{:?}) {}", msg.channel_login, msg.message_text) .as_str(), @@ -201,7 +202,18 @@ impl BotInstance { ); Log::flush(); - BotInstance::listener_main_prvmsg(Arc::clone(&bot), &msg).await; + let botarc = Arc::clone(&bot); + // let msgarc = Arc::new(RwLock::new(msg.clone())); + + let msgtext = msg.message_text.clone(); + + dbg!("will spawn listener_main_prvmsg from runner",chrono::offset::Local::now(),msgtext.clone()); + tokio::spawn( async move { + // let msgarc_c = msgarc.clone(); + BotInstance::listener_main_prvmsg(botarc, &msg.clone()).await; + }); + // BotInstance::listener_main_prvmsg(Arc::clone(&bot), &msg).await; + dbg!("completed spawn listener_main_prvmsg from runner",chrono::offset::Local::now(), msgtext); } ServerMessage::Whisper(msg) => { botlog::debug( @@ -367,13 +379,13 @@ impl BotInstance { Channel(msg.channel_login.to_string())).await; - if let StatusType::Disabled(a) = modstatus { + if let StatusType::Disabled(statuslvl) = modstatus { // [x] Should only respond if a BotAdmin , Mod , SupMod , BroadCaster // - Specifically it should respond only to those who may be able to enable the module botlog::trace( - &format!("Identified cmd is associated with Disabled Module : StatusLvl = {:?}", a), + &format!("Identified cmd is associated with Disabled Module : StatusLvl = {:?}", statuslvl), Some("BotInstance > listener_main_prvmsg()".to_string()), Some(msg), ); @@ -397,9 +409,10 @@ impl BotInstance { ); // Only respond to those with th ebelow User Roles + // [x] Need to add in Module as well let outstr = - format!("sadg Module is disabled : {:?}",a); + format!("sadg Module is disabled : {:?} {:?}",c.module.clone(),statuslvl); let params = ExecBodyParams { @@ -409,9 +422,11 @@ impl BotInstance { curr_act : Some(Arc::clone(&act_clone)), }; + let params = Arc::new(RwLock::new(params)); + // When sending a BotMsgTypeNotif, send_botmsg does Roles related validation as required - botlock.botmgrs.chat.send_botmsg(super::chat::BotMsgType::Notif( + botlock.botmgrs.chat.read().await.send_botmsg(super::chat::BotMsgType::Notif( outstr ), params, @@ -468,7 +483,10 @@ impl BotInstance { }; - botlock.botmgrs.chat.send_botmsg(super::chat::BotMsgType::Notif( + + let params = Arc::new(RwLock::new(params)); + + botlock.botmgrs.chat.read().await.send_botmsg(super::chat::BotMsgType::Notif( outstr.to_string() ), params.clone(), @@ -499,7 +517,10 @@ impl BotInstance { }; - botlock.botmgrs.chat.send_botmsg(super::chat::BotMsgType::Notif( + + let params = Arc::new(RwLock::new(params)); + + botlock.botmgrs.chat.read().await.send_botmsg(super::chat::BotMsgType::Notif( outstr.to_string() ), params.clone(), @@ -516,6 +537,8 @@ impl BotInstance { Some(msg), ); + dbg!("Running botcommand"); + let a = Arc::clone(&bot); c.execute(ExecBodyParams { bot : a, diff --git a/src/core/botmodules.rs b/src/core/botmodules.rs index fcc731e..a56e5e9 100644 --- a/src/core/botmodules.rs +++ b/src/core/botmodules.rs @@ -193,10 +193,10 @@ pub async fn init(mgr: Arc) { let botclone = Arc::clone(&bot); let botlock = botclone.read().await; - botlock.botmgrs.chat.send_botmsg(super::chat::BotMsgType::Notif( + botlock.botmgrs.chat.read().await.send_botmsg(super::chat::BotMsgType::Notif( outmsg.to_string() ), - params.clone(), + Arc::new(RwLock::new(params.clone())), ).await; return; @@ -236,10 +236,10 @@ pub async fn init(mgr: Arc) { ChangeResult::Success(a) => format!("YAAY Success : {}",a), }; - botlock.botmgrs.chat.send_botmsg(super::chat::BotMsgType::Notif( + botlock.botmgrs.chat.read().await.send_botmsg(super::chat::BotMsgType::Notif( outmsg.to_string() ), - params.clone(), + Arc::new(RwLock::new(params.clone())), ).await; @@ -374,10 +374,10 @@ pub async fn init(mgr: Arc) { // We should call a notification around here - botlock.botmgrs.chat.send_botmsg(super::chat::BotMsgType::Notif( + botlock.botmgrs.chat.read().await.send_botmsg(super::chat::BotMsgType::Notif( outmsg.to_string() ), - params.clone(), + Arc::new(RwLock::new(params.clone())), ).await; return; @@ -421,10 +421,10 @@ pub async fn init(mgr: Arc) { // We should call a notification around here - botlock.botmgrs.chat.send_botmsg(super::chat::BotMsgType::Notif( + botlock.botmgrs.chat.read().await.send_botmsg(super::chat::BotMsgType::Notif( outmsg.to_string() ), - params.clone(), + Arc::new(RwLock::new(params.clone())), ).await; } diff --git a/src/core/chat.rs b/src/core/chat.rs index 275ef3b..ca70c0a 100644 --- a/src/core/chat.rs +++ b/src/core/chat.rs @@ -1,8 +1,13 @@ use std::collections::HashMap; +use std::i8; use std::sync::Arc; +use std::num::{self, Saturating, Wrapping}; +use std::time::Instant; -use tokio::sync::Mutex; +use chrono::NaiveDateTime; +use tokio::sync::{Mutex, RwLock}; +use tokio::task::yield_now; use twitch_irc::login::StaticLoginCredentials; use twitch_irc::message::PrivmsgMessage; use twitch_irc::transport::tcp::{TCPTransport, TLS}; @@ -27,16 +32,54 @@ use super::identity; use async_recursion::async_recursion; +use async_channel::{self, bounded, Receiver, Sender}; + + + +// pub mod chat_util { + +// use super::*; + +// use std::boxed::Box; +// use std::future::Future; +// use std::pin::Pin; + +// // pub type MsgFuture = Box< +// // dyn Fn(BotMsgType,ExecBodyParams) -> Pin + Send>> + Send + Sync, +// // >; + +// // pub fn msgbox(f: fn(BotMsgType,ExecBodyParams) -> T) -> MsgFuture +// // where +// // T: Future + Send + 'static, +// // { +// // Box::new(move |a,b| Box::pin(f(a,b))) +// // } + +// pub type MsgFuture = Pin + Send + 'static>>; + +// } + + + #[derive(Clone, Debug)] pub struct Chat { - pub ratelimiters: Arc>>, // used to limit messages sent per channel + // pub ratelimiters: Arc>>, // used to limit messages sent per channel + pub ratelimiters: Arc>>>>, // used to limit messages sent per channel pub client: TwitchIRCClient, StaticLoginCredentials>, + // outqueue : Arc>> + outqueue : (Sender<(BotMsgType,ExecBodyParams)>, + Arc>>), + // https://doc.rust-lang.org/std/num/struct.Saturating.html + // spaceiter : Arc>>, + spaceiter : Arc>, } #[derive(Clone,Debug)] -pub enum BotMsgType<'a> { - SayInReplyTo(&'a PrivmsgMessage,String), +pub enum BotMsgType { + // SayInReplyTo(&'a PrivmsgMessage,String), + SayInReplyTo(Arc,String), Say(String,String), Notif(String), // For Bot Sent Notifications } @@ -44,22 +87,139 @@ pub enum BotMsgType<'a> { impl Chat { pub fn init( - ratelimiters: HashMap, + ratelimiters: HashMap>>, client: TwitchIRCClient, StaticLoginCredentials>, - ) -> Chat { - Chat { - ratelimiters: Arc::new(Mutex::new(ratelimiters)), + ) -> Arc> { + + let (s,r) = bounded(100); + + let r = Arc::new(RwLock::new(r)); + + let spaceiter = Arc::new(Mutex::new(true)); + + + let chat = Chat { + ratelimiters: Arc::new(RwLock::new(ratelimiters)), client, - } + // outqueue : Arc::new(RwLock::new(vec![])), + outqueue : (s,r.clone()), + spaceiter , + }; + + let chat_ar = Arc::new(RwLock::new(chat)); + + let r_clone = r.clone(); + + // let chat_ar = Arc::new(RwLock::new(chat_clone)); + let chat_ar_clone = chat_ar.clone(); + + // Spawning a Task that will process the Chat.outqueue + tokio::spawn(async move { + // loop { + // // tokio::spawn(async move { + // // let r_clone = r_clone.clone(); + + // let r_clone = r_clone.clone(); + // let guard1 = r_clone.read().await; + // match guard1.recv().await { + // Ok(a) => { + // // [ ] This should spawn it's own ideally, so it does not lock other messages + // // chat_clone.send_botmsg(a.0, Arc::new(RwLock::new(a.1))).await + // let params_ar = Arc::new(RwLock::new(a.1)); + // let chat_clone = chat_clone.clone(); + // dbg!(chrono::offset::Local::now(),"Message Spawned > Starting"); + // tokio::spawn( async move { + // chat_clone.send_botmsg_inner(a.0, params_ar).await + // }); + // dbg!(chrono::offset::Local::now(),"Message Spawned > Completed"); + // } , + // Err(err) => { + // dbg!("ISSUE processing Receiver",err); + // sleep(Duration::from_millis(10)).await; + // } + // } + // // }); + // } + + let r_cc = r_clone.clone(); + + // loop { + // Spawn x helper threads to process messages in the outqueue + for _ in 1..5 { + + dbg!("Chat queue Helper thread generated"); + let r_ccc = r_cc.clone(); + let chat_c = chat_ar.clone(); + tokio::spawn(async move { + let r_cc = r_ccc.clone(); + // let chat_cc = chat_c.clone(); + let guard1 = r_cc.read().await; + + while let Ok(a) = guard1.recv().await { + // let r_cc = r_ccc.clone(); + let chat_cc = chat_c.clone(); + // let guard2 = r_cc.read().await; + // Ok(a) => { + // [ ] This should spawn it's own ideally, so it does not lock other messages + // chat_clone.send_botmsg(a.0, Arc::new(RwLock::new(a.1))).await + let params_ar = Arc::new(RwLock::new(a.1)); + // let chat_clone = chat_cc.clone(); + dbg!(chrono::offset::Local::now(),"Message Spawned > Starting"); + tokio::spawn( async move { + dbg!("Message from Chat Queue > Sending"); + chat_cc.read().await.send_botmsg_inner(a.0, params_ar).await; + dbg!("Message from Chat Queue > Sent"); + }); + dbg!(chrono::offset::Local::now(),"Message Spawned > Completed"); + // } , + // Err(err) => { + // dbg!("ISSUE processing Receiver",err); + // sleep(Duration::from_millis(10)).await; + // } + } + dbg!("ISSUE : Helper thread observed an Err"); + + // match guard1.recv().await { + // Ok(a) => { + // // [ ] This should spawn it's own ideally, so it does not lock other messages + // // chat_clone.send_botmsg(a.0, Arc::new(RwLock::new(a.1))).await + // let params_ar = Arc::new(RwLock::new(a.1)); + // // let chat_clone = chat_cc.clone(); + // dbg!(chrono::offset::Local::now(),"Message Spawned > Starting"); + // tokio::spawn( async move { + // chat_cc.read().await.send_botmsg_inner(a.0, params_ar).await + // }); + // dbg!(chrono::offset::Local::now(),"Message Spawned > Completed"); + // } , + // Err(err) => { + // dbg!("ISSUE processing Receiver",err); + // sleep(Duration::from_millis(10)).await; + // } + // } + + } + ); + yield_now().await; + + } + + }); + + // chat + chat_ar_clone } pub async fn init_channel(&mut self, chnl: Channel) { - let n = RateLimiter::new(); - self.ratelimiters.lock().await.insert(chnl, n); + let n = Arc::new(RwLock::new(RateLimiter::new())); + + let mut rguard = self.ratelimiters.write().await; + rguard.insert(chnl, n); + drop(rguard); } #[async_recursion] - pub async fn send_botmsg(&self, msginput: BotMsgType<'async_recursion>, params : ExecBodyParams) { + // pub async fn send_botmsg(&self, msginput: BotMsgType<'async_recursion>, params : ExecBodyParams) { + pub async fn send_botmsg_inner(&self, msginput: BotMsgType, params : Arc>) { @@ -72,15 +232,17 @@ impl Chat { */ + let chat_ar = Arc::new(RwLock::new((*self).clone())); + botlog::trace( format!("send_bot_msg params : {:?}",msginput).as_str(), Some("chat.rs > send_botmsg ".to_string()), - Some(¶ms.msg), + Some(¶ms.read().await.msg), ); Log::flush(); - let (channel_login,mut outmsg) = match msginput.clone() { + let (channel_login,outmsg) = match msginput.clone() { BotMsgType::SayInReplyTo(msg, outmsg) => { (msg.channel_login.clone(),outmsg) }, @@ -88,7 +250,7 @@ impl Chat { (a.clone(),b.clone()) }, BotMsgType::Notif(outmsg) => { - (params.msg.channel_login.clone(),outmsg) + (params.read().await.msg.channel_login.clone(),outmsg) } }; @@ -98,13 +260,13 @@ impl Chat { botlog::trace( "BEFORE parent_module call", Some("chat.rs > send_botmsg ".to_string()), - Some(¶ms.msg), + Some(¶ms.read().await.msg), ); - let parent_module = params.get_module().await; + let parent_module = params.read().await.get_module().await; let params_clone = params.clone(); - let botclone = Arc::clone(¶ms_clone.bot); + let botclone = Arc::clone(¶ms_clone.read().await.bot); let botlock = botclone.read().await; let modmgr = Arc::clone(&botlock.botmodules); let modstatus = (*modmgr).modstatus( @@ -113,7 +275,7 @@ impl Chat { Channel(channel_login.clone()) ).await; - if !params.bot.read().await.bot_channels.contains(&Channel(channel_login.clone())) { + if !params.read().await.bot.read().await.bot_channels.contains(&Channel(channel_login.clone())) { botlog::warn( &format!("A message attempted to send for a Non-Joined Channel : {}",channel_login.clone()), Some("Chat > send_botmsg".to_string()), @@ -122,7 +284,7 @@ impl Chat { if let BotMsgType::SayInReplyTo(_prvmsg,_outmsg) = msginput { - self.send_botmsg(BotMsgType::Notif( + chat_ar.read().await.send_botmsg(BotMsgType::Notif( "uuh Bot can't send to a channel it isn't joined".to_string(), ), params).await; @@ -147,7 +309,7 @@ impl Chat { botlog::trace( format!("BEFORE modstatus check : modstatus = {:?}",modstatus).as_str(), Some("chat.rs > send_botmsg ".to_string()), - Some(¶ms.msg), + Some(¶ms.read().await.msg), ); @@ -161,7 +323,7 @@ impl Chat { botlog::trace( "BEFORE msginput check", Some("chat.rs > send_botmsg ".to_string()), - Some(¶ms.msg), + Some(¶ms.read().await.msg), ); Log::flush(); @@ -173,7 +335,7 @@ impl Chat { botlog::trace( "BEFORE potential Async recursion", Some("chat.rs > send_botmsg ".to_string()), - Some(¶ms.clone().msg), + Some(¶ms.read().await.clone().msg), ); Log::flush(); @@ -193,7 +355,7 @@ impl Chat { botlog::trace( "AFTER potential Async recursion", Some("chat.rs > send_botmsg ".to_string()), - Some(¶ms.msg), + Some(¶ms.read().await.msg), ); @@ -230,22 +392,26 @@ impl Chat { // let params_clone = params.clone(); - let botclone = Arc::clone(¶ms.bot); + dbg!("Send_botmsg_inner > Checking user roles first"); + + let botclone = Arc::clone(¶ms.read().await.bot); let botlock = botclone.read().await; let id = botlock.get_identity(); let id = Arc::clone(&id); let idlock = id.read().await; // <-- [x] 03.24 - seems to work let user_roles = idlock.getspecialuserroles( - params.get_sender(), + params.read().await.get_sender(), Some(Channel(channel_login.clone())) ).await; botlog::trace( format!("BEFORE user roles check check : userroles = {:?}",user_roles).as_str(), Some("chat.rs > send_botmsg ".to_string()), - Some(¶ms.msg), + Some(¶ms.read().await.msg), ); + dbg!("Send_botmsg_inner > Checking user roles first > Prepared to check for roles"); + Log::flush(); // [x] If user has any of the following target roles, they will be allowed - otherwise, they will not be allowed to send @@ -288,6 +454,8 @@ impl Chat { } + dbg!("Send_botmsg_inner > Roles Checked"); + /* At this stage from the above Validations : msginput would be : @@ -312,8 +480,11 @@ impl Chat { */ - let rl = Arc::clone(&self.ratelimiters); - let mut rllock = rl.lock().await; + // let rl = chat_ar.read().await.ratelimiters.clone(); + // // let rl = Arc::clone(&chat_ar.clone().read().await.ratelimiters); + // // let mut rllock = rl.read().await; + // let rllock = rl.write().await; + // // let mut hmap = (*rllock).clone(); botlog::debug( &format!("Ratelimiter being checked for channel : {}",channel_login.clone()), @@ -321,141 +492,600 @@ impl Chat { None, ); - let contextratelimiter = rllock - // .get_mut() - .get_mut(&Channel(channel_login.to_lowercase().clone())) - .expect("ERROR: Issue with Rate limiters"); + // let rllock_clone = Arc::new(RwLock::new((*rllock).clone())); + // let guard = rllock_clone.read().await; + // let contextratelimiter = (*guard).clone() + // // .get_mut() + // .get_mut(&Channel(channel_login.to_lowercase().clone())) + // .expect("ERROR: Issue with Rate limiters"); + // // drop(guard); - // Continue to check the limiter and sleep if required if the minimum is not reached - while let ratelimiter::LimiterResp::Sleep(sleeptime) = contextratelimiter.check_limiter() { - sleep(Duration::from_secs_f64(sleeptime)).await; - } + // let a = rllock.get_mut(&Channel(channel_login.to_lowercase().clone())).expect("ERROR: Issue with Rate limiters"); - match contextratelimiter.check_limiter() { - ratelimiter::LimiterResp::Allow => { - let maxblanks = rand::thread_rng().gen_range(1..=20); + let ratelimiters = chat_ar.read().await.ratelimiters.clone(); - for _i in 1..maxblanks { - let blankspace: &str = "󠀀"; - outmsg.push_str(blankspace); - } + // let contextratelimiter = Arc::new(RwLock::new(contextratelimiter)); - match msginput.clone() { - BotMsgType::SayInReplyTo(msg, _) => { - self.client.say_in_reply_to(msg, outmsg).await.unwrap(); - }, - BotMsgType::Say(a, _) => { - self.client.say(a, outmsg).await.unwrap(); - } - BotMsgType::Notif(outmsg) => { - self.client.say_in_reply_to(¶ms.msg, outmsg).await.unwrap(); - } - } + // // Continue to check the limiter and sleep if required if the minimum is not reached + // while let ratelimiter::LimiterResp::Sleep(sleeptime) = contextratelimiter.check_limiter(outmsg.clone()) { + // sleep(Duration::from_secs_f64(sleeptime)).await; + // } + + // match contextratelimiter.check_limiter(outmsg.clone()) { + // ratelimiter::LimiterResp::Allow => { + // // let maxblanks = rand::thread_rng().gen_range(1..=20); + + // // let mut blanks_to_add_lock = self.spaceiter.lock().await; + // // (*blanks_to_add_lock).0 += 1; + + + // // for _i in 1..(*blanks_to_add_lock).0 { + // // // let blankspace: &str = " 󠀀"; + // // // let blankspace: &str = " ‎ ."; + // // let blankspace: &str = "."; + // // outmsg.push_str(blankspace); + // // } + + // // drop(blanks_to_add_lock); + + // dbg!("[ ] PROBLEM AREA - If notificaiton triggered, need this checked"); + // dbg!(outmsg.clone()); + + // botlog::trace( + // &format!("Out Message TO {} >> {}",channel_login.clone(),outmsg.clone()), + // Some("Chat > send_botmsg".to_string()), + // None, + // ); + + // match msginput.clone() { + // BotMsgType::SayInReplyTo(msg, _) => { + // self.client.say_in_reply_to(&(*msg), outmsg.clone()).await.unwrap(); + // }, + // BotMsgType::Say(a, _) => { + // self.client.say(a, outmsg.clone()).await.unwrap(); + // } + // BotMsgType::Notif(outmsg) => { + // self.client.say_in_reply_to(¶ms.read().await.msg, outmsg.clone()).await.unwrap(); + // } + // } - contextratelimiter.increment_counter(); + // // contextratelimiter.increment_counter(); + // contextratelimiter.sent_msg(outmsg.clone()); - let logstr = format!( - "(#{}) > {} ; contextratelimiter : {:?}", - channel_login.clone(), "rate limit counter increase", contextratelimiter - ); + // let logstr = format!( + // "(#{}) > {} ; contextratelimiter : {:?}", + // channel_login.clone(), "rate limit counter increase", contextratelimiter + // ); - if let BotMsgType::SayInReplyTo(msg,_ ) = msginput { - botlog::trace( - logstr.as_str(), - Some("Chat > send_botmsg".to_string()), - Some(msg), - ); + // if let BotMsgType::SayInReplyTo(msg,_ ) = msginput { + // botlog::trace( + // logstr.as_str(), + // Some("Chat > send_botmsg".to_string()), + // Some(&(*msg)), + // ); + // } else { + // botlog::trace( + // logstr.as_str(), + // Some("Chat > send_botmsg".to_string()), + // None, + // ); + // } + + + + // } + // ratelimiter::LimiterResp::Skip => { + // // (); // do nothing otherwise + // } + // ratelimiter::LimiterResp::Sleep(err_passed_sleep_time) => { + // dbg!(err_passed_sleep_time); + // // panic!("ISSUE : sleep was already awaited - Should not happen?"); + // botlog::warn( + // format!( + // "Warning : RATE LIMITERS returned Sleep Unexpectedly . This should not occur often . Rate Limiter passed Sleep({})", + // err_passed_sleep_time + // ).as_str(), + // Some("Chat > send_botmsg".to_string()), + // None, + // ); + // } + // } + + + // => [x] 04.02 - Q. Why are we spawning here? => A. Believe this was to try to ensure not blocked + tokio::spawn(async move { + + dbg!("Send_botmsg_inner > Within Spawn"); + + let chat_clone = chat_ar.clone(); + let ratelimiters_clone = ratelimiters.clone(); + let rguard = ratelimiters_clone.read().await; + let mut rclone = (*rguard).clone(); + drop(rguard); + let contextratelimiter = (rclone + .get_mut(&Channel(channel_login.to_lowercase().clone())) + .expect("ERROR: Issue with Rate limiters")).clone(); + + // let mut contextlimiter_guard = contextratelimiter.write().await; + + // Continue to check the limiter and sleep if required if the minimum is not reached + + + + loop { + // Check first if need to sleep, then sleep if required + dbg!("Send_botmsg_inner > Within Spawn > Before a Write Guard"); + let mut crl_lock = contextratelimiter.write().await; + let sleeptime = if let ratelimiter::LimiterResp::Sleep(sleeptime) = crl_lock.check_limiter(outmsg.clone()).await { + // dbg!("Send_botmsg_inner > Within Spawn > Drop a Write Guard"); + // drop(crl_lock); + // dbg!("determined to sleep > Sleeping now - ",chrono::offset::Local::now()); + // sleep(Duration::from_secs_f64(sleeptime)).await; + sleeptime + } else { 0.0 }; + + + drop(crl_lock); + + if sleeptime > 0.0 { + dbg!("determined to sleep > Sleeping now - ",chrono::offset::Local::now()); + sleep(Duration::from_secs_f64(sleeptime)).await; + } + + // if told to sleep again, check again before looping for another check + dbg!("Send_botmsg_inner > Within Spawn > Before a Write Guard"); + let mut crl_lock = contextratelimiter.write().await; + if let ratelimiter::LimiterResp::Sleep(_) = crl_lock.check_limiter(outmsg.clone()).await { + // do nothing here } else { + break; // break out if no longer sleep LimiterResp + }; + dbg!("Send_botmsg_inner > Within Spawn > Drop a Write Guard"); + drop(crl_lock); + } + + /* + // Original Logic + + dbg!("Send_botmsg_inner > Within Spawn > Before a Write Guard"); + let mut crl_lock = contextratelimiter.write().await; + // dbg!("Send_botmsg_inner > Within Spawn > Reading Guard / Before a Write Guard"); + // let mut crl_lock = contextratelimiter.read().await; + // let rslt = crl_lock.check_limiter(outmsg.clone()).await; + + dbg!("before checklimiter sleep",&crl_lock); + // Assign the message here to rate limiter - or just during + + // let a = *(contextratelimiter.read().await); + // let b = Arc::new(a); + // let c = b.check_limiter(inmsg).await; + + while let ratelimiter::LimiterResp::Sleep(sleeptime) = crl_lock.check_limiter(outmsg.clone()).await { + // while let ratelimiter::LimiterResp::Sleep(sleeptime) = crl_lock.check_limiter(outmsg.clone()).await { + + dbg!("within checklimiter sleep",&crl_lock); + + // dbg!(chrono::offset::Local::now(),"Message Spawned > Completed"); + + // drop(crl_lock); //dropping the lock first + dbg!("sleep loop start",chrono::offset::Local::now(), sleeptime); + yield_now().await; + dbg!("yield in just after loop start",chrono::offset::Local::now()); + sleep(Duration::from_secs_f64(sleeptime)).await; + + } + */ + + let mut crl_lock = contextratelimiter.write().await; + dbg!("after checklimiter sleep",&crl_lock); + match crl_lock.clone().check_limiter(outmsg.clone()).await { + ratelimiter::LimiterResp::Allow => { + // let maxblanks = rand::thread_rng().gen_range(1..=20); + + let chat_guard = chat_clone.read().await; + + let mut blanks_to_add_lock = chat_guard.spaceiter.lock().await; + + dbg!("Before blanks to add adjusted",*blanks_to_add_lock); + + // *blanks_to_add_lock += 1; + *blanks_to_add_lock = if *blanks_to_add_lock { false } else { true }; + + dbg!("After blanks to add adjusted",*blanks_to_add_lock); + + + let mut outmsg = outmsg; + // for i in 0..*blanks_to_add_lock+1 { + // dbg!("Processing blank spaces : ",i,*blanks_to_add_lock); + // // let blankspace: &str = " 󠀀"; + // // let blankspace: &str = " ‎ ."; + // if i > 0 { + // // let blankspace: &str = " \u{e0000}"; + // let blankspace: &str = " "; + // outmsg.push_str(blankspace); + // dbg!("Added 'blank' string",outmsg.clone(),blankspace); + // } + // }; + + // let blankspace: &str = " "; + // let (firstword,restofstr) = outmsg.split_once(' '); + outmsg = if let Some((firstword,restofstr)) = outmsg.split_once(' ') { + if *blanks_to_add_lock { format!("{} {}\u{e0000}",firstword,restofstr,) } + else { format!("{} {}",firstword,restofstr) } + } else { outmsg.clone() } ; + + + + drop(blanks_to_add_lock); + + dbg!("Area that used to add blanks to out message"); + dbg!("OUT MESSAGE TO CHAT > ", chrono::offset::Local::now(), outmsg.clone()); + botlog::trace( - logstr.as_str(), + &format!("Out Message TO {} >> {}",channel_login.clone(),outmsg.clone()), Some("Chat > send_botmsg".to_string()), None, ); + + match msginput.clone() { + BotMsgType::SayInReplyTo(msg, _) => { + chat_clone.read().await.client.say_in_reply_to(&(*msg), outmsg.clone()).await.unwrap(); + }, + BotMsgType::Say(a, _) => { + chat_clone.read().await.client.say(a, outmsg.clone()).await.unwrap(); + } + BotMsgType::Notif(_) => { + chat_clone.read().await.client.say_in_reply_to(¶ms.read().await.msg, outmsg.clone()).await.unwrap(); + } + } + + // contextratelimiter.increment_counter(); + crl_lock.sent_msg(outmsg.clone()).await; + + let logstr = format!( + "(#{}) > {} ; contextratelimiter : {:?}", + channel_login.clone(), "rate limit counter increase", crl_lock + ); + + if let BotMsgType::SayInReplyTo(msg,_ ) = msginput { + botlog::trace( + logstr.as_str(), + Some("Chat > send_botmsg".to_string()), + Some(&(*msg)), + ); + } else { + botlog::trace( + logstr.as_str(), + Some("Chat > send_botmsg".to_string()), + None, + ); + } + + + } + ratelimiter::LimiterResp::Skip => { + // (); // do nothing otherwise + } + ratelimiter::LimiterResp::Sleep(err_passed_sleep_time) => { + dbg!("Rate Limiter returned Sleep unexpetedly",err_passed_sleep_time); + // panic!("ISSUE : sleep was already awaited - Should not happen?"); + botlog::warn( + format!( + "Warning : RATE LIMITERS returned Sleep Unexpectedly . This should not occur often . Rate Limiter passed Sleep({})", + err_passed_sleep_time + ).as_str(), + Some("Chat > send_botmsg".to_string()), + None, + ); + dbg!("Rate Limiter returned Sleep unexpetedly > Re-enqueueing message"); + // [x] Need to Re-Enqueue the message , but needs to be controlled + chat_clone.read().await.send_botmsg_inner(msginput.clone(),params).await; + } + } - } - ratelimiter::LimiterResp::Skip => { - // (); // do nothing otherwise - } - ratelimiter::LimiterResp::Sleep(_) => { - panic!("ISSUE : sleep was already awaited - Should not happen?"); - } - } + + drop(crl_lock); + dbg!("Send_botmsg_inner > Within Spawn > After Dropping a Write Guard"); + + }); Log::flush(); } + pub async fn send_botmsg(&self, msginput: BotMsgType, params : Arc>) { - // pub async fn say_in_reply_to(&self, msg: &PrivmsgMessage, outmsg: String) { - // #[async_recursion] - pub async fn say_in_reply_to(&self, msg: &PrivmsgMessage, outmsg: String , params : ExecBodyParams) { + self.send_botmsg_inner(msginput, params).await; - // let params_clone = params.clone(); + // let chat = Arc::new(RwLock::new((*self).clone())); + // let chat_clone = chat.clone(); - // let botclone = Arc::clone(¶ms_clone.bot); - // let botlock = botclone.read().await; - // let id = botlock.get_identity(); - // let id = Arc::clone(&id); + // let cguard = chat_clone.read().await; + // let outguard = cguard.outqueue.1.read().await; + // // let r = (*outguard).clone(); + // match outguard.recv().await { + // Ok(a) => { - // // botlog::trace( - // // "ACQUIRING WRITE LOCK : ID", - // // Some("Chat > send_botmsg".to_string()), - // // Some(¶ms.msg), - // // ); - // // Log::flush(); - - // botlog::trace( - // "ACQUIRING READ LOCK : ID", - // Some("Chat > send_botmsg".to_string()), - // Some(¶ms.msg), - // ); - // Log::flush(); - - - // // let idlock = id.write().await; // <-- [ ] 03.24 - This is definitely locking it - // let idlock = id.read().await; // <-- [ ] 03.24 - seems to work - // let a = idlock.getspecialuserroles(params.get_sender(), Some(Channel(msg.channel_login.clone()))).await; - // botlog::trace( - // format!("GETSPECIALUSERROLES RESULT : {:?}",a).as_str(), - // Some("Chat > send_botmsg".to_string()), - // Some(¶ms.msg), - // ); - // Log::flush(); - - - - // // botlog::trace( - // // "ACQUIRED WRITE LOCK : ID", - // // Some("Chat > send_botmsg".to_string()), - // // Some(¶ms.msg), - // // ); - // // Log::flush(); + // }, + // Err(err) => { + // dbg!("ISSUE processing Receiver",err); + // sleep(Duration::from_millis(10)).await; + // } + // } - - // botlog::trace( - // "ACQUIRED READ LOCK : ID", - // Some("Chat > send_botmsg".to_string()), - // Some(¶ms.msg), - // ); - // Log::flush(); + // let msg_clone = Arc::new(msg.clone()); + // let blockingspawn = tokio::task::spawn_blocking( move || { + // // let paramsguard = params.blocking_read(); + // // let params = (*paramsguard).clone(); + // // drop(paramsguard); + // // chat_clone.blocking_read().outqueue.0.send_blocking(( + // // BotMsgType::SayInReplyTo(msg_clone, outmsg), + // // params + // // )).unwrap(); + // chat_clone.blocking_read().blocking_send_botmsg( + // msginput, + // params, + // ); + // }); - - self.send_botmsg(BotMsgType::SayInReplyTo(msg, outmsg) , params).await; + // blockingspawn.await.unwrap(); } - // pub async fn say(&self, channel_login: String, message: String) { - pub async fn say(&self, channel_login: String, message: String , params : ExecBodyParams) { - // more info https://docs.rs/twitch-irc/latest/twitch_irc/client/struct.TwitchIRCClient.html#method.say - self.send_botmsg(BotMsgType::Say(channel_login.to_lowercase(), message), params).await; + pub fn blocking_send_botmsg(&self, msginput: BotMsgType, params : Arc>) { + + let chat = Arc::new(RwLock::new((*self).clone())); + let chat_clone = chat.clone(); + // let msg_clone = Arc::new(msg.clone()); + let paramsguard = params.blocking_read(); + let params = (*paramsguard).clone(); + drop(paramsguard); + dbg!("Called blocking_send_botmsg",msginput.clone()); + chat_clone.blocking_read().outqueue.0.send_blocking(( + msginput, + params + )).unwrap(); + } + + pub async fn blocking_async_send_botmsg(&self, msginput: BotMsgType, params : Arc>) { + + let chat = Arc::new(RwLock::new((*self).clone())); + let chat_clone = chat.clone(); + // let msg_clone = Arc::new(msg.clone()); + let blockingspawn = tokio::task::spawn_blocking( move || { + // let paramsguard = params.blocking_read(); + // let params = (*paramsguard).clone(); + // drop(paramsguard); + // chat_clone.blocking_read().outqueue.0.send_blocking(( + // BotMsgType::SayInReplyTo(msg_clone, outmsg), + // params + // )).unwrap(); + chat_clone.blocking_read().blocking_send_botmsg( + msginput, + params, + ); + }); + + blockingspawn.await.unwrap(); + + } + + + pub async fn say_in_reply_to(&self, msg: &PrivmsgMessage, outmsg: String , params : Arc>) { + // let a = Arc::new(RwLock::new(self)); + // let a_clone = a.clone(); + self.send_botmsg( + BotMsgType::SayInReplyTo( + Arc::new((*msg).clone()), + outmsg) , + params + ).await; + } + + + pub fn blocking_say_in_reply_to(&self, msg: &PrivmsgMessage, outmsg: String , params : Arc>) { + + let chat = Arc::new(RwLock::new((*self).clone())); + let chat_clone = chat.clone(); + let msg_clone = Arc::new(msg.clone()); + + let paramsguard = params.blocking_read(); + let params = (*paramsguard).clone(); + drop(paramsguard); + chat_clone.blocking_read().outqueue.0.send_blocking(( + BotMsgType::SayInReplyTo(msg_clone, outmsg), + params + )).unwrap(); + + } + + + + pub async fn blocking_async_say_in_reply_to(&self, msg: &PrivmsgMessage, outmsg: String , params : Arc>) { + + // let params_clone = params.clone(); + + // let botclone = Arc::clone(¶ms_clone.bot); + // let botlock = botclone.read().await; + // let id = botlock.get_identity(); + // let id = Arc::clone(&id); + + // // botlog::trace( + // // "ACQUIRING WRITE LOCK : ID", + // // Some("Chat > send_botmsg".to_string()), + // // Some(¶ms.msg), + // // ); + // // Log::flush(); + + // botlog::trace( + // "ACQUIRING READ LOCK : ID", + // Some("Chat > send_botmsg".to_string()), + // Some(¶ms.msg), + // ); + // Log::flush(); + + + // // let idlock = id.write().await; // <-- [ ] 03.24 - This is definitely locking it + // let idlock = id.read().await; // <-- [ ] 03.24 - seems to work + // let a = idlock.getspecialuserroles(params.get_sender(), Some(Channel(msg.channel_login.clone()))).await; + // botlog::trace( + // format!("GETSPECIALUSERROLES RESULT : {:?}",a).as_str(), + // Some("Chat > send_botmsg".to_string()), + // Some(¶ms.msg), + // ); + // Log::flush(); + + + + // // botlog::trace( + // // "ACQUIRED WRITE LOCK : ID", + // // Some("Chat > send_botmsg".to_string()), + // // Some(¶ms.msg), + // // ); + // // Log::flush(); + + + + // botlog::trace( + // "ACQUIRED READ LOCK : ID", + // Some("Chat > send_botmsg".to_string()), + // Some(¶ms.msg), + // ); + // Log::flush(); + + + // self.send_botmsg(BotMsgType::SayInReplyTo(Arc::new(msg.clone()), outmsg) , params).await; + // self.outqueue.write().await.push(( + // BotMsgType::SayInReplyTo(Arc::new(msg.clone()), outmsg), + // params + // )); + // let paramsguard = params.blocking_read(); + // let a = (*paramsguard).clone(); + // drop(paramsguard); + // self.outqueue.0.send_blocking(( + // BotMsgType::SayInReplyTo(Arc::new(msg.clone()), outmsg), + // a + // )).unwrap(); + + + // let a = Arc::new(RwLock::new(*self)); + + let chat = Arc::new(RwLock::new((*self).clone())); + let chat_clone = chat.clone(); + let msg_clone = Arc::new(msg.clone()); + let blockingspawn = tokio::task::spawn_blocking( move || { + // let paramsguard = params.blocking_read(); + // let params = (*paramsguard).clone(); + // drop(paramsguard); + // chat_clone.blocking_read().outqueue.0.send_blocking(( + // BotMsgType::SayInReplyTo(msg_clone, outmsg), + // params + // )).unwrap(); + chat_clone.blocking_read().blocking_say_in_reply_to( + &(*msg_clone), + outmsg, + params, + ); + }); + + blockingspawn.await.unwrap(); + + + // let a = Arc::new(RwLock::new(self)); + // let a_clone = a.clone(); + // let b = self.send_botmsg(BotMsgType::SayInReplyTo(msg, outmsg) , params); + + + // struct Tester { + // queue : Vec, + // } + + // let mut q = vec![]; + // q.push(b); + + // let inst = Tester { + // queue : q, + // }; + + + + } + + + pub async fn say(&self, channel_login: String, message: String , params : Arc>) { + self.send_botmsg( + BotMsgType::Say( + channel_login, + message), + params + ).await; + } + + pub fn blocking_say(&self, channel_login: String, message: String , params : Arc>) { + + let chat = Arc::new(RwLock::new((*self).clone())); + let chat_clone = chat.clone(); + + let paramsguard = params.blocking_read(); + let params = (*paramsguard).clone(); + drop(paramsguard); + chat_clone.blocking_read().outqueue.0.send_blocking(( + BotMsgType::Say(channel_login.to_lowercase(), message), + params + )).unwrap(); + + } + + + + pub async fn blocking_async_say(&self, channel_login: String, message: String , params : Arc>) { + // more info https://docs.rs/twitch-irc/latest/twitch_irc/client/struct.TwitchIRCClient.html#method.say + + // self.send_botmsg(BotMsgType::Say(channel_login.to_lowercase(), message), params).await; + + // let paramguard = params.blocking_read(); + // let a = (*paramguard).clone(); + // drop(paramguard); + // self.outqueue.0.send_blocking(( + // BotMsgType::Say(channel_login.to_lowercase(), message), + // a + // )).unwrap(); + + + let chat = Arc::new(RwLock::new((*self).clone())); + let chat_clone = chat.clone(); + // let msg_clone = Arc::new(msg.clone()); + let blockingspawn = tokio::task::spawn_blocking( move || { + // let paramsguard = params.blocking_read(); + // let params = (*paramsguard).clone(); + // drop(paramsguard); + // chat_clone.blocking_read().outqueue.0.send_blocking(( + // BotMsgType::Say(channel_login.to_lowercase(), message), + // params + // )).unwrap(); + chat_clone.blocking_read().blocking_say( + channel_login, + message, + params, + ); + }); + + blockingspawn.await.unwrap(); + + + } + + + + async fn _me(&self, _: String, _: String) { // more info https://docs.rs/twitch-irc/latest/twitch_irc/client/struct.TwitchIRCClient.html#method.say diff --git a/src/core/identity.rs b/src/core/identity.rs index 7a9e579..feb6791 100644 --- a/src/core/identity.rs +++ b/src/core/identity.rs @@ -214,11 +214,11 @@ async fn cmd_promote(params : ExecBodyParams) { { arg2 } else if let Some(a) = arg1 { if a.starts_with('-') { - botlock.botmgrs.chat.send_botmsg( + botlock.botmgrs.chat.read().await.send_botmsg( super::chat::BotMsgType::Notif( "Invalid Argument Flag".to_string() ), - params.clone(), + Arc::new(RwLock::new(params.clone())), ).await; return } else { arg1 } @@ -294,10 +294,10 @@ async fn cmd_promote(params : ExecBodyParams) { // We should call a notification around here - botlock.botmgrs.chat.send_botmsg(super::chat::BotMsgType::Notif( + botlock.botmgrs.chat.read().await.send_botmsg(super::chat::BotMsgType::Notif( outmsg.to_string() ), - params.clone(), + Arc::new(RwLock::new(params.clone())), ).await; @@ -432,11 +432,11 @@ async fn cmd_demote(params : ExecBodyParams) { { arg2 } else if let Some(a) = arg1 { if a.starts_with('-') { - botlock.botmgrs.chat.send_botmsg( + botlock.botmgrs.chat.read().await.send_botmsg( super::chat::BotMsgType::Notif( "Invalid Argument Flag".to_string() ), - params.clone(), + Arc::new(RwLock::new(params.clone())), ).await; return } else { arg1 } @@ -519,10 +519,10 @@ async fn cmd_demote(params : ExecBodyParams) { Some(¶ms.msg), ); - botlock.botmgrs.chat.send_botmsg(super::chat::BotMsgType::Notif( + botlock.botmgrs.chat.read().await.send_botmsg(super::chat::BotMsgType::Notif( outmsg.to_string() ), - params.clone(), + Arc::new(RwLock::new(params.clone())), ).await; @@ -545,6 +545,8 @@ async fn getroles(params : ExecBodyParams) { */ + dbg!("In getroles command"); + let mut argv = params.msg.message_text.split(' '); @@ -554,12 +556,24 @@ async fn getroles(params : ExecBodyParams) { let targetuser = match arg1 { None => { - + dbg!("Expected Exit"); botlog::debug( - "Exittingcmd getroles - Invalid arguments ", + "Exitting cmd getroles - Invalid arguments ", Some("identity.rs > init > getroles()".to_string()), Some(¶ms.msg), ); + + + let botlock = params.bot.read().await; + + botlock.botmgrs.chat.read().await.send_botmsg(super::chat::BotMsgType::Notif( + "Invalid Arguments".to_string() + ), + Arc::new(RwLock::new(params.clone())), + ).await; + + drop(botlock); + return }, // exit if no arguments @@ -570,12 +584,15 @@ async fn getroles(params : ExecBodyParams) { let targetchnl = arg2; + dbg!("In prior to bot read guard command"); + let botlock = params.bot.read().await; let id = botlock.get_identity(); let idlock = id.read().await; + dbg!("Pror to getspecialuserroles()"); let sproles = match targetchnl { None => { // [ ] If targetchnl is not provided, default to pulling the current channel @@ -669,14 +686,19 @@ async fn getroles(params : ExecBodyParams) { Some(¶ms.msg), ); - botlock.botmgrs.chat.send_botmsg(super::chat::BotMsgType::Notif( + dbg!("Pror to getroles send_botmsg"); + + botlock.botmgrs.chat.read().await.send_botmsg(super::chat::BotMsgType::Notif( outmsg.to_string() ), - params.clone(), + Arc::new(RwLock::new(params.clone())), ).await; // [ ] NOTE : After the above, I should receive only the roles in the context of the current channel I received this ideally and maybe BotAdmin ; not outside + + + dbg!("End of getroles command"); } @@ -1544,6 +1566,8 @@ impl IdentityManager { Note : Ideally this be called for a given chatter name ? */ + dbg!("In getspecialuserroles()"); + // [ ] !!! TODO: I don't think below is evaluating by given channel botlog::debug( &format!( @@ -1575,6 +1599,9 @@ impl IdentityManager { }; + + dbg!("Before reading Vector Roles"); + let rolesdb = Arc::clone(&self.special_roles_users); let rolesdb_lock = rolesdb.read().await; @@ -1630,6 +1657,8 @@ impl IdentityManager { } } + dbg!("exit getspecialusreroles with",&evalsproles); + botlog::debug( &format!("OUT > evalsproles {:?}", &evalsproles), Some("IdentityManager > getspecialuserroles()".to_string()), diff --git a/src/core/ratelimiter.rs b/src/core/ratelimiter.rs index 4de77bc..cdfa123 100644 --- a/src/core/ratelimiter.rs +++ b/src/core/ratelimiter.rs @@ -1,8 +1,14 @@ +// Related : https://dev.twitch.tv/docs/irc/#rate-limits + + const TIME_THRESHOLD_S: u64 = 30; const TIME_MIN_S_F64: f64 = 1.0; const MSG_THRESHOLD: u32 = 20; +const DUPMSG_MIN_SEND_S: f64 = 30.0; + +use std::{sync::Arc, time::Instant}; +use tokio::sync::RwLock; -use std::time::Instant; use crate::core::botlog; #[derive(Debug, Clone)] @@ -10,9 +16,10 @@ pub struct RateLimiter { timer: Instant, msgcounter: u32, lastmsgtimer : Instant, + lastmsgdup : String, } -#[derive(Debug)] +#[derive(Debug,Clone)] pub enum LimiterResp { Allow, // when it's evaluated to be within limits Skip, // as outside of rate limits @@ -32,10 +39,20 @@ impl RateLimiter { timer: Instant::now(), msgcounter: 0, lastmsgtimer: Instant::now(), + lastmsgdup : String::new(), } } - pub fn check_limiter(&mut self) -> LimiterResp { + fn dupmsg(&self,inmsg : String) -> bool { + dbg!("dubmsg()",&inmsg,&self.lastmsgdup); + if self.lastmsgdup == inmsg && self.lastmsgtimer.elapsed().as_secs_f64() < DUPMSG_MIN_SEND_S { + // duplicate detected + // self.lastmsgdup = String::new(); // update it as no longer duplicate + return true; + } else { return false; }; + } + + pub async fn check_limiter(&mut self,inmsg : String) -> LimiterResp { let logstr = format!( @@ -49,32 +66,104 @@ impl RateLimiter { ); - let rsp = if self.timer.elapsed().as_secs() >= TIME_THRESHOLD_S { - self.timer = Instant::now(); - self.msgcounter = 0; + /* + Documented Thesholds from https://dev.twitch.tv/docs/irc/#rate-limits + - [x] self.timer.elapsed().as_secs() >= TIME_THRESHOLD_S + >> initialize with LimiterResp::Allow + - [x] self.msgcounter < MSG_THRESHOLD && + self.lastmsgtimer.elapsed().as_secs_f64() >= TIME_MIN_S_F64 + >> LimiterResp::Allow + - [x] else + >> LimiterResp::Sleep(TIME_MIN_S_F64 - self.lastmsgtimer.elapsed().as_secs_f64() + 0.1) + */ + + // let ratelimiter_ar = Arc::new(RwLock::new((*self).clone())); + let ratelimiter_ar = Arc::new(RwLock::new(self)); + let mut rl_guard = ratelimiter_ar.write().await; + + let rsp = if rl_guard.timer.elapsed().as_secs() >= TIME_THRESHOLD_S { + rl_guard.timer = Instant::now(); + rl_guard.msgcounter = 0; LimiterResp::Allow - } else if self.msgcounter < MSG_THRESHOLD && - self.lastmsgtimer.elapsed().as_secs_f64() >= TIME_MIN_S_F64 { + } else if rl_guard.msgcounter < MSG_THRESHOLD && + rl_guard.lastmsgtimer.elapsed().as_secs_f64() >= TIME_MIN_S_F64 { LimiterResp::Allow } else { // when elapsed() < TIME_THRESHOLD_S && msgcounter >= MSG_THRESHOLD // LimiterResp::Skip - LimiterResp::Sleep(TIME_MIN_S_F64 - self.lastmsgtimer.elapsed().as_secs_f64() + 0.1) + LimiterResp::Sleep(TIME_MIN_S_F64 - rl_guard.lastmsgtimer.elapsed().as_secs_f64() + 0.1) }; + // [ ] If rsp is still allow at this point, also do a dupcheck to adjust rsp to a sleep + + dbg!(inmsg.clone()); + dbg!(rl_guard.dupmsg(inmsg.clone())); + dbg!("Before Dup test",rsp.clone()); + dbg!(matches!(rsp.clone(),LimiterResp::Allow)); + + + // [-] Comment this area if removing Duplicate checking functionality + /* */ + let rsp = if rl_guard.dupmsg(inmsg.clone()) && matches!(rsp,LimiterResp::Allow) { + //self.lastmsgdup = String::new(); + // LimiterResp::Sleep(DUPMSG_MIN_SEND_S) + dbg!("Duplicate detected"); + LimiterResp::Sleep(DUPMSG_MIN_SEND_S - rl_guard.lastmsgtimer.elapsed().as_secs_f64() + 0.1) + } else { rsp.clone() }; + /* */ + + // => 04.02 - Don't update here + // // After Dup is checked, Set the lastmsgdup to latest message + // dbg!("Before Assigning Lastmsgdup",rl_guard.lastmsgdup.clone(),inmsg.clone(),rsp.clone()); + // rl_guard.lastmsgdup = inmsg.clone(); + // dbg!("After Assigning Lastmsgdup",rl_guard.lastmsgdup.clone(),inmsg.clone()); + + + // [ ] Allows if sleep came up as negative + let rsp = match rsp { + LimiterResp::Sleep(sleeptime) if sleeptime < 0.0 => LimiterResp::Allow , // allow is sleeptime evaluated is negative + _ => rsp.clone(), + } ; + + + dbg!(rsp.clone()); + botlog::trace( &format!("Limiter Response : {:?} ; Elapsed (as_sec_f64) : {}", - rsp, self.lastmsgtimer.elapsed().as_secs_f64()), + rsp, rl_guard.lastmsgtimer.elapsed().as_secs_f64()), Some("Rate Limiter Inner".to_string()), None, ); + dbg!("check_limiter() > ratelimiter guard",rl_guard.clone()); + + drop(rl_guard); + + rsp } - pub fn increment_counter(&mut self) { + // pub fn increment_counter(&mut self) { + // self.msgcounter += 1; + // self.lastmsgtimer = Instant::now(); + // } + + pub async fn sent_msg(&mut self,msgstr : String) { + + dbg!("sent_msg triggered",&msgstr); + + // let ratelimiter_ar = Arc::new(RwLock::new((*self).clone())); + // let mut rl_guard = ratelimiter_ar.write().await; + + dbg!("sent_msg > lastmsg before update",&self.lastmsgdup); + + self.lastmsgdup = msgstr; self.msgcounter += 1; self.lastmsgtimer = Instant::now(); + + dbg!("sent_msg > lastmsg after update",&self.lastmsgdup); + + // drop(rl_guard); } } diff --git a/src/custom/experimental/experiment001.rs b/src/custom/experimental/experiment001.rs index dd3cba4..36156d9 100644 --- a/src/custom/experimental/experiment001.rs +++ b/src/custom/experimental/experiment001.rs @@ -15,6 +15,7 @@ const OF_CMD_CHANNEL:Channel = Channel(String::new()); use rand::Rng; +use tokio::sync::RwLock; use std::sync::Arc; use crate::core::bot_actions::ExecBodyParams; @@ -130,10 +131,11 @@ async fn good_girl(params : ExecBodyParams) { botlock .botmgrs .chat + .read().await .say_in_reply_to( ¶ms.msg, String::from("GoodGirl xdd "), - params.clone() + Arc::new(RwLock::new(params.clone())) ).await; @@ -170,22 +172,25 @@ async fn babygirl(params : ExecBodyParams) { botlock .botmgrs .chat + .read().await .say_in_reply_to( ¶ms.msg, String::from("16:13 notohh: cafdk"), - params.clone() + Arc::new(RwLock::new(params.clone())) ).await; + sleep(Duration::from_secs_f64(0.5)).await; botlock .botmgrs .chat + .read().await .say_in_reply_to( ¶ms.msg, String::from("16:13 notohh: have fun eating princess"), - params.clone() + Arc::new(RwLock::new(params.clone())) ).await; @@ -194,10 +199,11 @@ async fn babygirl(params : ExecBodyParams) { botlock .botmgrs .chat + .read().await .say_in_reply_to( ¶ms.msg, String::from("16:13 notohh: baby girl"), - params.clone() + Arc::new(RwLock::new(params.clone())) ).await; @@ -225,5 +231,39 @@ async fn routinelike(params : ExecBodyParams) { // lines are executed after in conjunction to the spawn + // spawn 5 independent spawns + + let bot = Arc::clone(¶ms.bot); + let params_ar = Arc::new(RwLock::new(params.clone())); + let params_c = params_ar.clone(); + + for _ in 0..5 { + let bot_clone = bot.clone(); + let params_cc = params_c.clone(); + tokio::spawn( async move { + println!(">> SPAWNED Innterroutine triggered!"); + sleep(Duration::from_secs_f64(5.0)).await; + + let botlock = bot_clone.read().await; + let params_ccc = params_cc.clone(); + let paramsguard = params_ccc.read().await; + + botlock + .botmgrs + .chat + .read().await + .say_in_reply_to( + ¶msguard.msg, + String::from("SPAWNED GoodGirl xdd "), + params_cc, + ).await; + + drop(botlock); + drop(paramsguard); + + }); + }; + + } diff --git a/src/custom/experimental/experiment002.rs b/src/custom/experimental/experiment002.rs index a4ecb25..e1e621d 100644 --- a/src/custom/experimental/experiment002.rs +++ b/src/custom/experimental/experiment002.rs @@ -17,6 +17,7 @@ const OF_CMD_CHANNEL:Channel = Channel(String::new()); use std::sync::Arc; use chrono::{TimeZone,Local}; +use tokio::sync::RwLock; use crate::core::bot_actions::ExecBodyParams; @@ -116,7 +117,7 @@ async fn sayout(params : ExecBodyParams) { botlog::trace( &format!("[TRACE] Evaluated status of {} : {:?}", - trgchnl.to_string().clone(),botlock.botmgrs.chat.client.get_channel_status(trgchnl.to_string().clone()).await), + trgchnl.to_string().clone(),botlock.botmgrs.chat.read().await.client.get_channel_status(trgchnl.to_string().clone()).await), Some("Chat > send_botmsg".to_string()), None, ); @@ -156,10 +157,11 @@ async fn sayout(params : ExecBodyParams) { botlock .botmgrs .chat + .read().await .say( trgchnl.to_string(), newoutmsg.to_string(), - params.clone(), + Arc::new(RwLock::new(params.clone())), ).await; @@ -179,10 +181,11 @@ async fn sayout(params : ExecBodyParams) { botlock .botmgrs .chat + .read().await .say_in_reply_to( ¶ms.msg, String::from("Invalid arguments"), - params.clone() + Arc::new(RwLock::new(params.clone())) ).await; diff --git a/src/custom/experimental/experiment003.rs b/src/custom/experimental/experiment003.rs index ed1c6ba..d76d8fb 100644 --- a/src/custom/experimental/experiment003.rs +++ b/src/custom/experimental/experiment003.rs @@ -289,12 +289,12 @@ async fn countdown_chnl_v1(params : ExecBodyParams) { // let chosen_channel_ar = Arc::new(RwLock::new(chosen_channel)); // let params_clone = params.clone(); - // let bot = Arc::clone(¶ms.blocking_read().bot); + // // let bot = Arc::clone(¶ms.blocking_read().bot); - // dbg!("in chat async function"); + // // dbg!("in chat async function"); - // let botlock = bot.blocking_read(); + // // let botlock = bot.blocking_read(); // let channel_ar_clone = chosen_channel_ar.clone(); // let outmsg = if iterleft <= 1 { @@ -303,11 +303,24 @@ async fn countdown_chnl_v1(params : ExecBodyParams) { // botlock.botmgrs.chat // .say( - // channel_ar_clone.read().await.0.clone(), + // channel_ar_clone.blocking_read().0.clone(), // outmsg, - // params_clone.read().await.clone() - // ).await; + // params_clone, + // ); + let outmsg = if iterleft <= 1 { + // format!("{} I love you uwu~",iterleft) + let msgtxt = params.blocking_read().msg.message_text.clone(); + // format!("{} {} I love you uwu~",iterleft,msgtxt) + format!("{} {}",iterleft,msgtxt) + } else { format!("{} Tomfoolery Clap",iterleft) }; + + botlock.botmgrs.chat.blocking_read() + .blocking_say( + chosen_channel.0.clone(), + outmsg, + Arc::new(RwLock::new(params.blocking_read().clone())), + ); } } @@ -343,22 +356,66 @@ async fn countdown_chnl_v1(params : ExecBodyParams) { let bot = Arc::clone(¶ms_ar.read().await.bot); - let botlock = bot.read().await; + // let botlock = bot.read().await; // uses chat.say_in_reply_to() for the bot controls for messages - botlock - .botmgrs - .chat - .say_in_reply_to( - ¶ms_ar.read().await.msg, - "Started Routine!".to_string(), - params_ar.read().await.clone() - ).await; + + // [ ] 04.01 - in an ASYNC context, ensure to raise a Blocking Spawn + // botlock + // .botmgrs + // .chat + // .say_in_reply_to( + // ¶ms_ar.read().await.msg, + // "Started Routine!".to_string(), + // Arc::new(RwLock::new(params_ar.read().await.clone())) + // ); // let jhandle = newr.clone().read().await.join_handle.clone().unwrap(); // let a = jhandle.write().await; // a. // sleep(Duration::from_secs(300)).await; + + + // let loopbodyspawn = tokio::task::spawn_blocking( move || { + // let botlock = bot.blocking_read(); + // botlock + // .botmgrs + // .chat + // .say_in_reply_to( + // ¶ms_ar.blocking_read().msg, + // "Started Routine!".to_string(), + // Arc::new(RwLock::new(params_ar.blocking_read().clone())) + // ); + // }); + + // loopbodyspawn.await.unwrap(); + + // let botlock = bot.read().await; + // botlock + // .botmgrs + // .chat + // .say_in_reply_to( + // ¶ms_ar.blocking_read().msg, + // "Started Routine!".to_string(), + // Arc::new(RwLock::new(params_ar.blocking_read().clone())) + // ).await; + // drop(botlock); + + + let botlock = bot.read().await; + let params_guard = params_ar.read().await; + botlock + .botmgrs + .chat + .read().await + .say_in_reply_to( + // ¶ms_ar.blocking_read().msg, + ¶ms_guard.msg, + "Started Routine!".to_string(), + Arc::new(RwLock::new(params_guard.clone())) + ).await; + drop(botlock); + drop(params_guard); } @@ -602,10 +659,11 @@ async fn test3_body(params : ExecBodyParams) { botlock .botmgrs .chat + .read().await .say_in_reply_to( ¶ms_ar.read().await.msg, format!("Routine Result : {:?}",rsltstr), - params_clone.read().await.clone() + Arc::new(RwLock::new(params_clone.read().await.clone())) ).await; // [x] Will not be handling JoinHandles here . If immediate abort() handling is required, below is an example that works @@ -669,10 +727,11 @@ async fn good_girl(params : ExecBodyParams) { botlock .botmgrs .chat + .read().await .say_in_reply_to( ¶ms.msg, String::from("GoodGirl xdd "), - params.clone() + Arc::new(RwLock::new(params.clone())) ).await; @@ -709,10 +768,11 @@ async fn babygirl(params : ExecBodyParams) { botlock .botmgrs .chat + .read().await .say_in_reply_to( ¶ms.msg, String::from("16:13 notohh: cafdk"), - params.clone() + Arc::new(RwLock::new(params.clone())) ).await; @@ -721,10 +781,11 @@ async fn babygirl(params : ExecBodyParams) { botlock .botmgrs .chat + .read().await .say_in_reply_to( ¶ms.msg, String::from("16:13 notohh: have fun eating princess"), - params.clone() + Arc::new(RwLock::new(params.clone())) ).await; @@ -733,10 +794,11 @@ async fn babygirl(params : ExecBodyParams) { botlock .botmgrs .chat + .read().await .say_in_reply_to( ¶ms.msg, String::from("16:13 notohh: baby girl"), - params.clone() + Arc::new(RwLock::new(params.clone())) ).await;