From 71933d5827da7facb91ba8765fd2cdb9182a2f71 Mon Sep 17 00:00:00 2001 From: James Pace Date: Sat, 15 Feb 2025 15:39:07 -0500 Subject: [PATCH] Init commit. --- .gitignore | 2 + Cargo.toml | 14 +++++ src/bin/broadcaster.rs | 15 ++++++ src/bin/ex.rs | 87 +++++++++++++++++++++++++++++++ src/broadcaster.rs | 115 +++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 3 ++ 6 files changed, 236 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 src/bin/broadcaster.rs create mode 100644 src/bin/ex.rs create mode 100644 src/broadcaster.rs create mode 100644 src/lib.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..629ec6d --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "j7s-matrix-sdk" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.95" +env_logger = "0.11.6" +log = "0.4.25" +matrix-sdk = { version = "0.10.0", features = ["anyhow", "e2e-encryption", "markdown", "sso-login", "experimental-oidc"] } +serde_json = "1.0.138" +tokio = { version = "1.43.0", features = ["full"] } diff --git a/src/bin/broadcaster.rs b/src/bin/broadcaster.rs new file mode 100644 index 0000000..b63024a --- /dev/null +++ b/src/bin/broadcaster.rs @@ -0,0 +1,15 @@ +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let url = "http://matrix.internal.jpace121.net:8008"; + let token = std::env::var("TOKEN")?; + let device_name = "my_bot"; + + let broadcaster = j7s_matrix_sdk::MatrixBroadcaster::new(&url, &token, &device_name).await?; + + let message = + matrix_sdk::ruma::events::room::message::RoomMessageEventContent::text_plain("Hi!"); + loop { + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + broadcaster.broadcast(&message).await?; + } +} diff --git a/src/bin/ex.rs b/src/bin/ex.rs new file mode 100644 index 0000000..f434a93 --- /dev/null +++ b/src/bin/ex.rs @@ -0,0 +1,87 @@ +use matrix_sdk::{ + config::SyncSettings, + ruma::events::room::{ + member::StrippedRoomMemberEvent, + message::{MessageType, OriginalSyncRoomMessageEvent, RoomMessageEventContent}, + }, + Client, Room, RoomState, +}; +use tokio::time::{sleep, Duration}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let client = Client::builder() + .homeserver_url("http://matrix.internal.jpace121.net:8008") + .build() + .await?; + + let token = std::env::var("TOKEN")?; + + // First we need to log in. + let mut login_data = serde_json::Map::new(); + login_data.insert("token".to_string(), token.into()); + client + .matrix_auth() + .login_custom("org.matrix.login.jwt", login_data)? + .initial_device_display_name("bot") + .send() + .await?; + + client.add_event_handler(on_stripped_state_member); + let sync_token = client + .sync_once(SyncSettings::default()) + .await + .unwrap() + .next_batch; + + client.add_event_handler(on_room_message); + + let settings = SyncSettings::default().token(sync_token); + client.sync(settings).await?; + + Ok(()) +} + +// This is going to respond to invites. +async fn on_stripped_state_member( + room_member: StrippedRoomMemberEvent, + client: Client, + room: Room, +) { + if room_member.state_key != client.user_id().unwrap() { + return; + } + + tokio::spawn(async move { + println!("Autojoining room {}", room.room_id()); + let mut delay = 2; + + while let Err(err) = room.join().await { + sleep(Duration::from_secs(delay)).await; + delay *= 2; + + if delay > 3600 { + eprintln!("Can't join room {} ({err:?})", room.room_id()); + break; + } + } + println!("Successfully joined room {}", room.room_id()); + }); +} + +async fn on_room_message(event: OriginalSyncRoomMessageEvent, room: Room, client: Client) { + if room.state() != RoomState::Joined { + return; + } + if event.sender == client.user_id().unwrap() { + return; + } + let MessageType::Text(text_content) = event.content.msgtype else { + return; + }; + + println!("{:?}", text_content.body); + + let content = RoomMessageEventContent::text_plain("i am groot"); + room.send(content).await.unwrap(); +} diff --git a/src/broadcaster.rs b/src/broadcaster.rs new file mode 100644 index 0000000..9618b86 --- /dev/null +++ b/src/broadcaster.rs @@ -0,0 +1,115 @@ +use anyhow::Context; +use std::sync::Arc; +use tokio::sync::RwLock; + +#[derive(Clone)] +pub struct MatrixBroadcaster { + client: Arc, + rooms: Arc>>, +} + +impl MatrixBroadcaster { + pub async fn new(url: &str, token: &str, device_name: &str) -> anyhow::Result { + let mut login_data = serde_json::Map::new(); + login_data.insert("token".to_string(), token.into()); + + // Make a client. + let client = matrix_sdk::Client::builder() + .homeserver_url(url) + .build() + .await + .context("Could not build client.")?; + + // Now log the client in. + client + .matrix_auth() + .login_custom("org.matrix.login.jwt", login_data) + .context("Failed to set custom login up.")? + .initial_device_display_name(device_name) + .send() + .await + .context("Failed to login.")?; + + // Build our object with the client. + let object = Self { + client: Arc::new(client), + rooms: Arc::new(RwLock::new(Vec::new())), + }; + object.spin().await?; + return Ok(object); + } + + pub async fn broadcast( + self: &Self, + msg: &matrix_sdk::ruma::events::room::message::RoomMessageEventContent, + ) -> anyhow::Result<()> { + let rooms_clone = self.rooms.clone(); + let rooms = rooms_clone.read().await.clone(); + + for room in rooms { + room.send(msg.clone()) + .await + .context("Failed to send message to room.")?; + } + + Ok(()) + } + + async fn spin(self: &Self) -> anyhow::Result<()> { + // The callback function. + let clone_of_me = self.clone(); + let on_stripped_state_member = + |room_member: matrix_sdk::ruma::events::room::member::StrippedRoomMemberEvent, + client: matrix_sdk::Client, + room: matrix_sdk::Room| async move { + clone_of_me.join_callback(room_member, client, room).await; + }; + self.client.add_event_handler(on_stripped_state_member); + // Spawn a task to sync. + let spawn_client = self.client.clone(); + tokio::spawn(async move { + let sync_err = spawn_client + .sync(matrix_sdk::config::SyncSettings::default()) + .await; + if sync_err.is_err() { + log::error!("Sync error: {:?}", sync_err); + } + }); + + Ok(()) + } + + async fn join_callback( + self: &Self, + room_member: matrix_sdk::ruma::events::room::member::StrippedRoomMemberEvent, + client: matrix_sdk::Client, + room: matrix_sdk::Room, + ) { + // This event wasn't for me... + if room_member.state_key != client.user_id().unwrap() { + return; + } + + // Spawn a task to try to join the room. + let self_copy = self.clone(); + tokio::spawn(async move { + log::debug!("Autojoining room {}", room.room_id()); + let mut delay = 2; + + while let Err(err) = room.join().await { + tokio::time::sleep(tokio::time::Duration::from_secs(delay)).await; + delay *= 2; + + if delay > 3600 { + log::debug!("Can't join room {} ({err:?})", room.room_id()); + break; + } + } + log::debug!("Successfully joined room {}", room.room_id()); + let rooms_clone = self_copy.rooms.clone(); + let mut rooms = rooms_clone.write().await; + rooms.push(room); + log::debug!("Tracking {} rooms.", rooms.len()); + }); + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..0495d70 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,3 @@ +mod broadcaster; + +pub use broadcaster::*;