Add listener ability to sdk.

This commit is contained in:
James Pace 2025-02-16 10:17:40 -05:00
parent 253b844d4c
commit c0cd3c56c4
3 changed files with 61 additions and 12 deletions

25
src/bin/listener.rs Normal file
View File

@ -0,0 +1,25 @@
use matrix_sdk::ruma::events::room::message::MessageType;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let url = "http://matrix.internal.jpace121.net:8008";
let token = std::env::var("TOKEN")?;
let device_name = "my_bot";
let listener = j7s_matrix_sdk::MatrixListener::new(&url, &token, &device_name).await?;
let mut receiver = listener.get_receiver();
#[allow(irrefutable_let_patterns)]
while let (message, room) = receiver.recv().await? {
match message {
MessageType::Text(content) => println!(
"{}: {}",
room.name().unwrap_or("N/A".to_string()),
content.body
),
_ => continue,
}
}
Ok(())
}

View File

@ -2,4 +2,4 @@ mod broadcaster;
mod listener;
pub use broadcaster::*;
//pub use listener::*;
pub use listener::*;

View File

@ -1,7 +1,6 @@
use anyhow::Context;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio::sync::broadcast;
use tokio::sync::RwLock;
pub type MessageFromRoom = (
@ -13,7 +12,7 @@ pub type MessageFromRoom = (
pub struct MatrixListener {
client: Arc<matrix_sdk::Client>,
rooms: Arc<RwLock<Vec<matrix_sdk::Room>>>,
sender: tokio::sync::mpsc::Sender<MessageFromRoom>,
sender: broadcast::Sender<MessageFromRoom>,
}
impl MatrixListener {
@ -38,10 +37,13 @@ impl MatrixListener {
.await
.context("Failed to login.")?;
let (sender, _) = broadcast::channel(10); // TODO: What should the capacity be?
// Build our object with the client.
let object = Self {
client: Arc::new(client),
rooms: Arc::new(RwLock::new(Vec::new())),
sender: sender,
};
object.spin().await?;
return Ok(object);
@ -65,21 +67,44 @@ impl MatrixListener {
Ok(())
}
pub fn get_receiver(&self) -> broadcast::Receiver<MessageFromRoom> {
self.sender.subscribe()
}
async fn spin(self: &Self) -> anyhow::Result<()> {
// The callback function.
let clone_of_me = self.clone();
// The callback functions.
let clone_of_me_for_stripped = self.clone();
let on_stripped_state_member =
|room_member: matrix_sdk::ruma::events::room::member::StrippedRoomMemberEvent,
client: matrix_sdk::Client,
room: matrix_sdk::Room| async move {
clone_of_me.join_callback(room_member, client, room).await;
clone_of_me_for_stripped
.join_callback(room_member, client, room)
.await;
};
let clone_of_me_for_message = self.clone();
let on_room_message =
|event: matrix_sdk::ruma::events::room::message::OriginalSyncRoomMessageEvent,
room: matrix_sdk::Room,
client: matrix_sdk::Client| async move {
clone_of_me_for_message
.message_callback(event, room, client)
.await;
};
self.client.add_event_handler(on_stripped_state_member);
// Spawn a task to sync.
let sync_token = self
.client
.sync_once(matrix_sdk::config::SyncSettings::default())
.await
.context("Failed on initial sync.")?
.next_batch;
self.client.add_event_handler(on_room_message);
// Spawn a task for the rest of the syncs.
let spawn_client = self.client.clone();
tokio::spawn(async move {
let sync_err = spawn_client
.sync(matrix_sdk::config::SyncSettings::default())
.sync(matrix_sdk::config::SyncSettings::default().token(sync_token))
.await;
if sync_err.is_err() {
log::error!("Sync error: {:?}", sync_err);
@ -93,7 +118,7 @@ impl MatrixListener {
self: &Self,
event: matrix_sdk::ruma::events::room::message::OriginalSyncRoomMessageEvent,
room: matrix_sdk::Room,
client: matirx_sdk::Client,
client: matrix_sdk::Client,
) {
// Ignore events for rooms I'm not in.
if room.state() != matrix_sdk::RoomState::Joined {
@ -104,8 +129,7 @@ impl MatrixListener {
return;
}
let content = RoomMessageEventContent::text_plain("i am groot");
room.send(content).await.unwrap();
let _ = self.sender.send((event.content.msgtype, room));
}
async fn join_callback(