From 253b844d4cf8826203fa86eb3eec6d0e145a6b62 Mon Sep 17 00:00:00 2001 From: James Pace Date: Sun, 16 Feb 2025 09:19:49 -0500 Subject: [PATCH] Start listener. --- src/lib.rs | 2 + src/listener.rs | 144 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+) create mode 100644 src/listener.rs diff --git a/src/lib.rs b/src/lib.rs index 0495d70..a13fe60 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,5 @@ mod broadcaster; +mod listener; pub use broadcaster::*; +//pub use listener::*; diff --git a/src/listener.rs b/src/listener.rs new file mode 100644 index 0000000..0286c48 --- /dev/null +++ b/src/listener.rs @@ -0,0 +1,144 @@ +use anyhow::Context; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio::sync::Mutex; +use tokio::sync::RwLock; + +pub type MessageFromRoom = ( + matrix_sdk::ruma::events::room::message::MessageType, + matrix_sdk::Room, +); + +#[derive(Clone)] +pub struct MatrixListener { + client: Arc, + rooms: Arc>>, + sender: tokio::sync::mpsc::Sender, +} + +impl MatrixListener { + pub async fn new(url: &str, token: &str, device_name: &str) -> anyhow::Result { + let mut login_data = serde_json::Map::new(); + login_data.insert("token".to_string(), token.into()); + + // Make a client. + let client = matrix_sdk::Client::builder() + .homeserver_url(url) + .build() + .await + .context("Could not build client.")?; + + // Now log the client in. + client + .matrix_auth() + .login_custom("org.matrix.login.jwt", login_data) + .context("Failed to set custom login up.")? + .initial_device_display_name(device_name) + .send() + .await + .context("Failed to login.")?; + + // Build our object with the client. + let object = Self { + client: Arc::new(client), + rooms: Arc::new(RwLock::new(Vec::new())), + }; + object.spin().await?; + return Ok(object); + } + + pub async fn broadcast( + self: &Self, + msg: &matrix_sdk::ruma::events::room::message::MessageType, + ) -> anyhow::Result<()> { + let rooms_clone = self.rooms.clone(); + let rooms = rooms_clone.read().await.clone(); + + for room in rooms { + room.send( + matrix_sdk::ruma::events::room::message::RoomMessageEventContent::new(msg.clone()), + ) + .await + .context("Failed to send message to room.")?; + } + + Ok(()) + } + + async fn spin(self: &Self) -> anyhow::Result<()> { + // The callback function. + let clone_of_me = self.clone(); + let on_stripped_state_member = + |room_member: matrix_sdk::ruma::events::room::member::StrippedRoomMemberEvent, + client: matrix_sdk::Client, + room: matrix_sdk::Room| async move { + clone_of_me.join_callback(room_member, client, room).await; + }; + self.client.add_event_handler(on_stripped_state_member); + // Spawn a task to sync. + let spawn_client = self.client.clone(); + tokio::spawn(async move { + let sync_err = spawn_client + .sync(matrix_sdk::config::SyncSettings::default()) + .await; + if sync_err.is_err() { + log::error!("Sync error: {:?}", sync_err); + } + }); + + Ok(()) + } + + async fn message_callback( + self: &Self, + event: matrix_sdk::ruma::events::room::message::OriginalSyncRoomMessageEvent, + room: matrix_sdk::Room, + client: matirx_sdk::Client, + ) { + // Ignore events for rooms I'm not in. + if room.state() != matrix_sdk::RoomState::Joined { + return; + } + // Don't respond to myself. + if event.sender == client.user_id().unwrap() { + return; + } + + let content = RoomMessageEventContent::text_plain("i am groot"); + room.send(content).await.unwrap(); + } + + async fn join_callback( + self: &Self, + room_member: matrix_sdk::ruma::events::room::member::StrippedRoomMemberEvent, + client: matrix_sdk::Client, + room: matrix_sdk::Room, + ) { + // This event wasn't for me... + if room_member.state_key != client.user_id().unwrap() { + return; + } + + // Spawn a task to try to join the room. + let self_copy = self.clone(); + tokio::spawn(async move { + log::debug!("Autojoining room {}", room.room_id()); + let mut delay = 2; + + while let Err(err) = room.join().await { + tokio::time::sleep(tokio::time::Duration::from_secs(delay)).await; + delay *= 2; + + if delay > 3600 { + log::debug!("Can't join room {} ({err:?})", room.room_id()); + break; + } + } + log::debug!("Successfully joined room {}", room.room_id()); + let rooms_clone = self_copy.rooms.clone(); + let mut rooms = rooms_clone.write().await; + rooms.push(room); + log::debug!("Tracking {} rooms.", rooms.len()); + }); + } +}