From c0cd3c56c44eb5915dd4714323e09820977596c9 Mon Sep 17 00:00:00 2001 From: James Pace Date: Sun, 16 Feb 2025 10:17:40 -0500 Subject: [PATCH] Add listener ability to sdk. --- src/bin/listener.rs | 25 ++++++++++++++++++++++++ src/lib.rs | 2 +- src/listener.rs | 46 ++++++++++++++++++++++++++++++++++----------- 3 files changed, 61 insertions(+), 12 deletions(-) create mode 100644 src/bin/listener.rs diff --git a/src/bin/listener.rs b/src/bin/listener.rs new file mode 100644 index 0000000..42d9323 --- /dev/null +++ b/src/bin/listener.rs @@ -0,0 +1,25 @@ +use matrix_sdk::ruma::events::room::message::MessageType; +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let url = "http://matrix.internal.jpace121.net:8008"; + let token = std::env::var("TOKEN")?; + let device_name = "my_bot"; + + let listener = j7s_matrix_sdk::MatrixListener::new(&url, &token, &device_name).await?; + + let mut receiver = listener.get_receiver(); + + #[allow(irrefutable_let_patterns)] + while let (message, room) = receiver.recv().await? { + match message { + MessageType::Text(content) => println!( + "{}: {}", + room.name().unwrap_or("N/A".to_string()), + content.body + ), + _ => continue, + } + } + + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index a13fe60..237e80b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,4 +2,4 @@ mod broadcaster; mod listener; pub use broadcaster::*; -//pub use listener::*; +pub use listener::*; diff --git a/src/listener.rs b/src/listener.rs index 0286c48..3f89000 100644 --- a/src/listener.rs +++ b/src/listener.rs @@ -1,7 +1,6 @@ use anyhow::Context; use std::sync::Arc; -use tokio::sync::mpsc; -use tokio::sync::Mutex; +use tokio::sync::broadcast; use tokio::sync::RwLock; pub type MessageFromRoom = ( @@ -13,7 +12,7 @@ pub type MessageFromRoom = ( pub struct MatrixListener { client: Arc, rooms: Arc>>, - sender: tokio::sync::mpsc::Sender, + sender: broadcast::Sender, } impl MatrixListener { @@ -38,10 +37,13 @@ impl MatrixListener { .await .context("Failed to login.")?; + let (sender, _) = broadcast::channel(10); // TODO: What should the capacity be? + // Build our object with the client. let object = Self { client: Arc::new(client), rooms: Arc::new(RwLock::new(Vec::new())), + sender: sender, }; object.spin().await?; return Ok(object); @@ -65,21 +67,44 @@ impl MatrixListener { Ok(()) } + pub fn get_receiver(&self) -> broadcast::Receiver { + self.sender.subscribe() + } + async fn spin(self: &Self) -> anyhow::Result<()> { - // The callback function. - let clone_of_me = self.clone(); + // The callback functions. + let clone_of_me_for_stripped = self.clone(); let on_stripped_state_member = |room_member: matrix_sdk::ruma::events::room::member::StrippedRoomMemberEvent, client: matrix_sdk::Client, room: matrix_sdk::Room| async move { - clone_of_me.join_callback(room_member, client, room).await; + clone_of_me_for_stripped + .join_callback(room_member, client, room) + .await; }; + let clone_of_me_for_message = self.clone(); + let on_room_message = + |event: matrix_sdk::ruma::events::room::message::OriginalSyncRoomMessageEvent, + room: matrix_sdk::Room, + client: matrix_sdk::Client| async move { + clone_of_me_for_message + .message_callback(event, room, client) + .await; + }; + self.client.add_event_handler(on_stripped_state_member); - // Spawn a task to sync. + let sync_token = self + .client + .sync_once(matrix_sdk::config::SyncSettings::default()) + .await + .context("Failed on initial sync.")? + .next_batch; + self.client.add_event_handler(on_room_message); + // Spawn a task for the rest of the syncs. let spawn_client = self.client.clone(); tokio::spawn(async move { let sync_err = spawn_client - .sync(matrix_sdk::config::SyncSettings::default()) + .sync(matrix_sdk::config::SyncSettings::default().token(sync_token)) .await; if sync_err.is_err() { log::error!("Sync error: {:?}", sync_err); @@ -93,7 +118,7 @@ impl MatrixListener { self: &Self, event: matrix_sdk::ruma::events::room::message::OriginalSyncRoomMessageEvent, room: matrix_sdk::Room, - client: matirx_sdk::Client, + client: matrix_sdk::Client, ) { // Ignore events for rooms I'm not in. if room.state() != matrix_sdk::RoomState::Joined { @@ -104,8 +129,7 @@ impl MatrixListener { return; } - let content = RoomMessageEventContent::text_plain("i am groot"); - room.send(content).await.unwrap(); + let _ = self.sender.send((event.content.msgtype, room)); } async fn join_callback(