use crate::executor::ExecutorCommand;
use crate::log;
use crate::node::node;
use super::ProtocolMessage;
use tokio::net;
use tokio::sync::mpsc;
use super::Connector;
use vlogger::*;

/// One established TCP link to a remote peer.
///
/// A `Connection` owns the socket and two channel endpoints:
/// * `rx` yields outbound messages that are written to the socket;
/// * `exec_tx` carries [`ExecutorCommand`]s built from inbound traffic
///   (and the peer-removal notice) to the executor.
// `node_id` is stored but never read in this file, hence the allow.
#[allow(dead_code)]
#[derive(Debug)]
pub struct Connection {
    node_id: uuid::Uuid,
    peer_id: uuid::Uuid,
    stream: net::TcpStream,
    exec_tx: mpsc::Sender<ExecutorCommand>,
    rx: mpsc::Receiver<ProtocolMessage>,
}

impl Connection {
    /// Bundles an already-connected `stream` with the channel endpoints used
    /// to talk to the executor.
    ///
    /// No I/O happens here; call [`Connection::start`] to begin pumping
    /// messages.
    pub fn new(
        node_id: uuid::Uuid,
        peer_id: uuid::Uuid,
        stream: net::TcpStream,
        exec_tx: mpsc::Sender<ExecutorCommand>,
        rx: mpsc::Receiver<ProtocolMessage>,
    ) -> Self {
        Self {
            node_id,
            peer_id,
            stream,
            rx,
            exec_tx,
        }
    }

    /// Consumes the connection and spawns a Tokio task that services it.
    ///
    /// The task loops over two event sources until one of them fails:
    /// * a message arriving on `rx` is written to the socket with
    ///   `Connector::send_message`;
    /// * a message read from the socket with `Connector::receive_message` is
    ///   forwarded to the executor as `NodeCommand::ProcessMessage`.
    ///
    /// When reading from the socket fails, a `NodeCommand::RemovePeer` is sent
    /// to the executor before the task exits. The spawned task's `JoinHandle`
    /// is dropped, so the task runs detached and this function returns as soon
    /// as the task has been spawned.
    pub async fn start(mut self) {
        tokio::spawn(async move {
            log(msg!(DEBUG, "Started Message Handler for {}", self.peer_id));
            loop {
                // NOTE(review): if `Connector::receive_message` is not
                // cancel-safe, a partially read frame could be lost whenever
                // the `rx` branch wins the race — confirm against Connector.
                tokio::select! {
                    response_result = self.rx.recv() => {
                        match response_result {
                            Some(response) => {
                                if let Err(e) = Connector::send_message(&mut self.stream, &response).await {
                                    log(msg!(ERROR, "Failed to send response to {}: {}", self.peer_id, e));
                                    // NOTE(review): this exit path does not send
                                    // `RemovePeer`; verify that is intended.
                                    break;
                                }
                            },
                            None => {
                                // Every sender for `rx` has been dropped, so
                                // there is nothing left to write to this peer.
                                log(msg!(DEBUG, "Response channel closed for {}", self.peer_id));
                                break;
                            }
                        }
                    }
                    message_result = Connector::receive_message(&mut self.stream) => {
                        match message_result {
                            Ok(message) => {
                                log(msg!(DEBUG, "Received Message from {}", self.peer_id));
                                // `message` is owned and not used again, so it
                                // is moved into the command rather than cloned.
                                let command = ExecutorCommand::Node(node::NodeCommand::ProcessMessage {
                                    peer_id: self.peer_id,
                                    message,
                                });
                                if self.exec_tx.send(command).await.is_err() {
                                    log(msg!(ERROR, "Failed to send command to main thread from {}", self.peer_id));
                                    break;
                                }
                            },
                            Err(e) => {
                                log(msg!(WARNING, "Connection to {} closed: {}", self.peer_id, e));
                                let cmd = ExecutorCommand::Node(node::NodeCommand::RemovePeer {
                                    peer_id: self.peer_id,
                                });
                                // The executor may already be gone (e.g. during
                                // shutdown); log instead of panicking the task.
                                if self.exec_tx.send(cmd).await.is_err() {
                                    log(msg!(ERROR, "Failed to send peer removal to main thread from {}", self.peer_id));
                                }
                                break;
                            }
                        }
                    }
                }
            }
        });
    }
}