async botnotif

This commit is contained in:
ModulatingForce 2024-04-02 14:17:28 -04:00
parent e73bd75de9
commit e3718688a7
4 changed files with 213 additions and 40 deletions

View file

@ -169,6 +169,7 @@ impl BotInstance {
match message {
ServerMessage::Notice(msg) => {
dbg!("NOTICE TRIGGERED",msg.clone());
botlog::notice(
format!("NOTICE : (#{:?}) {}", msg.channel_login, msg.message_text)
.as_str(),
@ -536,6 +537,8 @@ impl BotInstance {
Some(msg),
);
dbg!("Running botcommand");
let a = Arc::clone(&bot);
c.execute(ExecBodyParams {
bot : a,

View file

@ -110,6 +110,7 @@ impl Chat {
// let chat_ar = Arc::new(RwLock::new(chat_clone));
let chat_ar_clone = chat_ar.clone();
// Spawning a Task that will process the Chat.outqueue
tokio::spawn(async move {
// loop {
// // tokio::spawn(async move {
@ -139,35 +140,66 @@ impl Chat {
let r_cc = r_clone.clone();
loop {
// loop {
// Spawn x helper threads to process messages in the outqueue
for _ in 1..10 {
dbg!("Chat queue Helper thread generated");
let r_ccc = r_cc.clone();
let chat_c = chat_ar.clone();
tokio::spawn(async move {
let r_cc = r_ccc.clone();
let chat_cc = chat_c.clone();
// let chat_cc = chat_c.clone();
let guard1 = r_cc.read().await;
match guard1.recv().await {
Ok(a) => {
while let Ok(a) = guard1.recv().await {
// let r_cc = r_ccc.clone();
let chat_cc = chat_c.clone();
// let guard2 = r_cc.read().await;
// Ok(a) => {
// [ ] This should spawn it's own ideally, so it does not lock other messages
// chat_clone.send_botmsg(a.0, Arc::new(RwLock::new(a.1))).await
let params_ar = Arc::new(RwLock::new(a.1));
// let chat_clone = chat_cc.clone();
dbg!(chrono::offset::Local::now(),"Message Spawned > Starting");
tokio::spawn( async move {
chat_cc.read().await.send_botmsg_inner(a.0, params_ar).await
});
dbg!(chrono::offset::Local::now(),"Message Spawned > Completed");
} ,
Err(err) => {
dbg!("ISSUE processing Receiver",err);
sleep(Duration::from_millis(10)).await;
}
let params_ar = Arc::new(RwLock::new(a.1));
// let chat_clone = chat_cc.clone();
dbg!(chrono::offset::Local::now(),"Message Spawned > Starting");
tokio::spawn( async move {
dbg!("Message from Chat Queue > Sending");
chat_cc.read().await.send_botmsg_inner(a.0, params_ar).await;
dbg!("Message from Chat Queue > Sent");
});
dbg!(chrono::offset::Local::now(),"Message Spawned > Completed");
// } ,
// Err(err) => {
// dbg!("ISSUE processing Receiver",err);
// sleep(Duration::from_millis(10)).await;
// }
}
dbg!("ISSUE : Helper thread observed an Err");
// match guard1.recv().await {
// Ok(a) => {
// // [ ] This should spawn it's own ideally, so it does not lock other messages
// // chat_clone.send_botmsg(a.0, Arc::new(RwLock::new(a.1))).await
// let params_ar = Arc::new(RwLock::new(a.1));
// // let chat_clone = chat_cc.clone();
// dbg!(chrono::offset::Local::now(),"Message Spawned > Starting");
// tokio::spawn( async move {
// chat_cc.read().await.send_botmsg_inner(a.0, params_ar).await
// });
// dbg!(chrono::offset::Local::now(),"Message Spawned > Completed");
// } ,
// Err(err) => {
// dbg!("ISSUE processing Receiver",err);
// sleep(Duration::from_millis(10)).await;
// }
// }
}
);
yield_now().await;
}
});
// chat
@ -357,6 +389,8 @@ impl Chat {
// let params_clone = params.clone();
dbg!("Send_botmsg_inner > Checking user roles first");
let botclone = Arc::clone(&params.read().await.bot);
let botlock = botclone.read().await;
let id = botlock.get_identity();
@ -373,6 +407,8 @@ impl Chat {
Some(&params.read().await.msg),
);
dbg!("Send_botmsg_inner > Checking user roles first > Prepared to check for roles");
Log::flush();
// [x] If user has any of the following target roles, they will be allowed - otherwise, they will not be allowed to send
@ -415,6 +451,8 @@ impl Chat {
}
dbg!("Send_botmsg_inner > Roles Checked");
/*
At this stage from the above Validations :
msginput would be :
@ -551,8 +589,11 @@ impl Chat {
// }
// => [x] 04.02 - Q. Why are we spawning here? => A. Believe this was to try to ensure not blocked
tokio::spawn(async move {
dbg!("Send_botmsg_inner > Within Spawn");
let chat_clone = chat_ar.clone();
let ratelimiters_clone = ratelimiters.clone();
let rguard = ratelimiters_clone.read().await;
@ -566,16 +607,75 @@ impl Chat {
// Continue to check the limiter and sleep if required if the minimum is not reached
loop {
// Check first if need to sleep, then sleep if required
dbg!("Send_botmsg_inner > Within Spawn > Before a Write Guard");
let mut crl_lock = contextratelimiter.write().await;
let sleeptime = if let ratelimiter::LimiterResp::Sleep(sleeptime) = crl_lock.check_limiter(outmsg.clone()).await {
// dbg!("Send_botmsg_inner > Within Spawn > Drop a Write Guard");
// drop(crl_lock);
// dbg!("determined to sleep > Sleeping now - ",chrono::offset::Local::now());
// sleep(Duration::from_secs_f64(sleeptime)).await;
sleeptime
} else { 0.0 };
drop(crl_lock);
if sleeptime > 0.0 {
dbg!("determined to sleep > Sleeping now - ",chrono::offset::Local::now());
sleep(Duration::from_secs_f64(sleeptime)).await;
}
// if told to sleep again, check again before looping for another check
dbg!("Send_botmsg_inner > Within Spawn > Before a Write Guard");
let mut crl_lock = contextratelimiter.write().await;
if let ratelimiter::LimiterResp::Sleep(_) = crl_lock.check_limiter(outmsg.clone()).await {
// do nothing here
} else {
break; // break out if no longer sleep LimiterResp
};
dbg!("Send_botmsg_inner > Within Spawn > Drop a Write Guard");
drop(crl_lock);
}
/*
// Original Logic
dbg!("Send_botmsg_inner > Within Spawn > Before a Write Guard");
let mut crl_lock = contextratelimiter.write().await;
while let ratelimiter::LimiterResp::Sleep(sleeptime) = crl_lock.check_limiter(outmsg.clone()) {
// dbg!("Send_botmsg_inner > Within Spawn > Reading Guard / Before a Write Guard");
// let mut crl_lock = contextratelimiter.read().await;
// let rslt = crl_lock.check_limiter(outmsg.clone()).await;
dbg!("before checklimiter sleep",&crl_lock);
// Assign the message here to rate limiter - or just during
// let a = *(contextratelimiter.read().await);
// let b = Arc::new(a);
// let c = b.check_limiter(inmsg).await;
while let ratelimiter::LimiterResp::Sleep(sleeptime) = crl_lock.check_limiter(outmsg.clone()).await {
// while let ratelimiter::LimiterResp::Sleep(sleeptime) = crl_lock.check_limiter(outmsg.clone()).await {
dbg!("within checklimiter sleep",&crl_lock);
// dbg!(chrono::offset::Local::now(),"Message Spawned > Completed");
// drop(crl_lock); //dropping the lock first
dbg!("sleep loop start",chrono::offset::Local::now(), sleeptime);
yield_now().await;
dbg!("yield in just after loop start",chrono::offset::Local::now());
sleep(Duration::from_secs_f64(sleeptime)).await;
}
*/
match crl_lock.check_limiter(outmsg.clone()) {
let mut crl_lock = contextratelimiter.write().await;
dbg!("after checklimiter sleep",&crl_lock);
match crl_lock.clone().check_limiter(outmsg.clone()).await {
ratelimiter::LimiterResp::Allow => {
// let maxblanks = rand::thread_rng().gen_range(1..=20);
@ -592,8 +692,8 @@ impl Chat {
// drop(blanks_to_add_lock);
dbg!("[ ] PROBLEM AREA - If notificaiton triggered, need this checked");
dbg!(outmsg.clone());
dbg!("Area that used to add blanks to out message");
dbg!("Sending the following messag to chat > ", outmsg.clone());
botlog::trace(
&format!("Out Message TO {} >> {}",channel_login.clone(),outmsg.clone()),
@ -614,7 +714,7 @@ impl Chat {
}
// contextratelimiter.increment_counter();
crl_lock.sent_msg(outmsg.clone());
crl_lock.sent_msg(outmsg.clone()).await;
let logstr = format!(
"(#{}) > {} ; contextratelimiter : {:?}",
@ -642,7 +742,7 @@ impl Chat {
// (); // do nothing otherwise
}
ratelimiter::LimiterResp::Sleep(err_passed_sleep_time) => {
dbg!(err_passed_sleep_time);
dbg!("Rate Limiter returned Sleep unexpetedly",err_passed_sleep_time);
// panic!("ISSUE : sleep was already awaited - Should not happen?");
botlog::warn(
format!(
@ -657,6 +757,7 @@ impl Chat {
drop(crl_lock);
dbg!("Send_botmsg_inner > Within Spawn > After Dropping a Write Guard");
});

View file

@ -545,6 +545,8 @@ async fn getroles(params : ExecBodyParams) {
*/
dbg!("In getroles command");
let mut argv = params.msg.message_text.split(' ');
@ -554,12 +556,24 @@ async fn getroles(params : ExecBodyParams) {
let targetuser = match arg1 {
None => {
dbg!("Expected Exit");
botlog::debug(
"Exittingcmd getroles - Invalid arguments ",
"Exitting cmd getroles - Invalid arguments ",
Some("identity.rs > init > getroles()".to_string()),
Some(&params.msg),
);
let botlock = params.bot.read().await;
botlock.botmgrs.chat.read().await.send_botmsg(super::chat::BotMsgType::Notif(
"Invalid Arguments".to_string()
),
Arc::new(RwLock::new(params.clone())),
).await;
drop(botlock);
return
}, // exit if no arguments
@ -570,12 +584,15 @@ async fn getroles(params : ExecBodyParams) {
let targetchnl = arg2;
dbg!("In prior to bot read guard command");
let botlock = params.bot.read().await;
let id = botlock.get_identity();
let idlock = id.read().await;
dbg!("Pror to getspecialuserroles()");
let sproles = match targetchnl {
None => {
// [ ] If targetchnl is not provided, default to pulling the current channel
@ -669,6 +686,8 @@ async fn getroles(params : ExecBodyParams) {
Some(&params.msg),
);
dbg!("Pror to getroles send_botmsg");
botlock.botmgrs.chat.read().await.send_botmsg(super::chat::BotMsgType::Notif(
outmsg.to_string()
),
@ -677,6 +696,9 @@ async fn getroles(params : ExecBodyParams) {
// [ ] NOTE : After the above, I should receive only the roles in the context of the current channel I received this ideally and maybe BotAdmin ; not outside
dbg!("End of getroles command");
}
@ -1544,6 +1566,8 @@ impl IdentityManager {
Note : Ideally this be called for a given chatter name ?
*/
dbg!("In getspecialuserroles()");
// [ ] !!! TODO: I don't think below is evaluating by given channel
botlog::debug(
&format!(
@ -1575,6 +1599,9 @@ impl IdentityManager {
};
dbg!("Before reading Vector Roles");
let rolesdb = Arc::clone(&self.special_roles_users);
let rolesdb_lock = rolesdb.read().await;
@ -1630,6 +1657,8 @@ impl IdentityManager {
}
}
dbg!("exit getspecialusreroles with",&evalsproles);
botlog::debug(
&format!("OUT > evalsproles {:?}", &evalsproles),
Some("IdentityManager > getspecialuserroles()".to_string()),

View file

@ -6,7 +6,9 @@ const TIME_MIN_S_F64: f64 = 1.0;
const MSG_THRESHOLD: u32 = 20;
const DUPMSG_MIN_SEND_S: f64 = 30.0;
use std::time::Instant;
use std::{sync::Arc, time::Instant};
use tokio::sync::RwLock;
use crate::core::botlog;
#[derive(Debug, Clone)]
@ -42,14 +44,15 @@ impl RateLimiter {
}
fn dupmsg(&self,inmsg : String) -> bool {
if self.lastmsgdup == inmsg {
dbg!("dubmsg()",&inmsg,&self.lastmsgdup);
if self.lastmsgdup == inmsg && self.lastmsgtimer.elapsed().as_secs_f64() < DUPMSG_MIN_SEND_S {
// duplicate detected
// self.lastmsgdup = String::new(); // update it as no longer duplicate
return true;
} else { return false; };
}
pub fn check_limiter(&mut self,inmsg : String) -> LimiterResp {
pub async fn check_limiter(&mut self,inmsg : String) -> LimiterResp {
let logstr = format!(
@ -74,42 +77,67 @@ impl RateLimiter {
>> LimiterResp::Sleep(TIME_MIN_S_F64 - self.lastmsgtimer.elapsed().as_secs_f64() + 0.1)
*/
// let ratelimiter_ar = Arc::new(RwLock::new((*self).clone()));
let ratelimiter_ar = Arc::new(RwLock::new(self));
let mut rl_guard = ratelimiter_ar.write().await;
let rsp = if self.timer.elapsed().as_secs() >= TIME_THRESHOLD_S {
self.timer = Instant::now();
self.msgcounter = 0;
let rsp = if rl_guard.timer.elapsed().as_secs() >= TIME_THRESHOLD_S {
rl_guard.timer = Instant::now();
rl_guard.msgcounter = 0;
LimiterResp::Allow
} else if self.msgcounter < MSG_THRESHOLD &&
self.lastmsgtimer.elapsed().as_secs_f64() >= TIME_MIN_S_F64 {
} else if rl_guard.msgcounter < MSG_THRESHOLD &&
rl_guard.lastmsgtimer.elapsed().as_secs_f64() >= TIME_MIN_S_F64 {
LimiterResp::Allow
} else {
// when elapsed() < TIME_THRESHOLD_S && msgcounter >= MSG_THRESHOLD
// LimiterResp::Skip
LimiterResp::Sleep(TIME_MIN_S_F64 - self.lastmsgtimer.elapsed().as_secs_f64() + 0.1)
LimiterResp::Sleep(TIME_MIN_S_F64 - rl_guard.lastmsgtimer.elapsed().as_secs_f64() + 0.1)
};
// [ ] If rsp is still allow at this point, also do a dupcheck to adjust rsp to a sleep
dbg!(inmsg.clone());
dbg!(self.dupmsg(inmsg.clone()));
dbg!(rl_guard.dupmsg(inmsg.clone()));
dbg!("Before Dup test",rsp.clone());
dbg!(matches!(rsp.clone(),LimiterResp::Allow));
let rsp = if self.dupmsg(inmsg) && matches!(rsp,LimiterResp::Allow) {
let rsp = if rl_guard.dupmsg(inmsg.clone()) && matches!(rsp,LimiterResp::Allow) {
//self.lastmsgdup = String::new();
// LimiterResp::Sleep(DUPMSG_MIN_SEND_S)
LimiterResp::Sleep(DUPMSG_MIN_SEND_S - self.lastmsgtimer.elapsed().as_secs_f64() + 0.1)
dbg!("Duplicate detected");
LimiterResp::Sleep(DUPMSG_MIN_SEND_S - rl_guard.lastmsgtimer.elapsed().as_secs_f64() + 0.1)
} else { rsp.clone() };
// => 04.02 - Don't update here
// // After Dup is checked, Set the lastmsgdup to latest message
// dbg!("Before Assigning Lastmsgdup",rl_guard.lastmsgdup.clone(),inmsg.clone(),rsp.clone());
// rl_guard.lastmsgdup = inmsg.clone();
// dbg!("After Assigning Lastmsgdup",rl_guard.lastmsgdup.clone(),inmsg.clone());
// [ ] Allows if sleep came up as negative
let rsp = match rsp {
LimiterResp::Sleep(sleeptime) if sleeptime < 0.0 => LimiterResp::Allow , // allow is sleeptime evaluated is negative
_ => rsp.clone(),
} ;
dbg!(rsp.clone());
botlog::trace(
&format!("Limiter Response : {:?} ; Elapsed (as_sec_f64) : {}",
rsp, self.lastmsgtimer.elapsed().as_secs_f64()),
rsp, rl_guard.lastmsgtimer.elapsed().as_secs_f64()),
Some("Rate Limiter Inner".to_string()),
None,
);
dbg!("check_limiter() > ratelimiter guard",rl_guard.clone());
drop(rl_guard);
rsp
}
@ -119,9 +147,21 @@ impl RateLimiter {
// self.lastmsgtimer = Instant::now();
// }
pub fn sent_msg(&mut self,msgstr : String) {
pub async fn sent_msg(&mut self,msgstr : String) {
dbg!("sent_msg triggered",&msgstr);
// let ratelimiter_ar = Arc::new(RwLock::new((*self).clone()));
// let mut rl_guard = ratelimiter_ar.write().await;
dbg!("sent_msg > lastmsg before update",&self.lastmsgdup);
self.lastmsgdup = msgstr;
self.msgcounter += 1;
self.lastmsgtimer = Instant::now();
dbg!("sent_msg > lastmsg after update",&self.lastmsgdup);
// drop(rl_guard);
}
}