use crate::native_node::{message, node}; use crate::seeds_constants::SEED_NODES; use vlogger::*; use tokio::select; use tokio::sync::mpsc; impl node::NativeNode { pub async fn connect_to_seeds(&mut self, sender: mpsc::Sender) { for seed in SEED_NODES { if seed != self.addr { if let Ok(mut stream) = tokio::net::TcpStream::connect(seed).await { if let Ok(peer_id) = node::NativeNode::send_handshake(self.id.clone(), &mut stream).await { let sender = sender.clone(); node::NativeNode::establish_connection(peer_id, seed.to_string(), stream, sender).await; } } } } } pub async fn establish_connection( peer_id: uuid::Uuid, addr: String, stream: tokio::net::TcpStream, request_sender: tokio::sync::mpsc::Sender ) { let (response_sender, response_receiver) = mpsc::channel::(100); let add_peer = node::NodeCommand::AddPeer { peer_id, addr: addr.clone(), sender: response_sender }; if let Err(_) = request_sender.send(add_peer).await { log!(ERROR, "Failed to send AddPeer to {}", addr); } log!(INFO, "Established Connection with {}", addr); node::NativeNode::start_peer_handler(stream, peer_id, request_sender.clone(), response_receiver).await; } pub async fn accept_connections( listner: tokio::net::TcpListener, request_sender: tokio::sync::mpsc::Sender, node_id: uuid::Uuid ) { log!(INFO, "Starting to accept connections"); while let Ok((mut stream, addr)) = listner.accept().await { let handshake_response = message::ProtocolMessage::Handshake { node_id: node_id.clone(), version: "".to_string() }; if let Ok(_) = node::NativeNode::send_message(&mut stream, &handshake_response).await { if let Ok(message) = node::NativeNode::receive_message(&mut stream).await { match message { message::ProtocolMessage::Handshake { node_id, .. } => { node::NativeNode::establish_connection(node_id, addr.to_string(), stream, request_sender.clone()).await; }, _ => { log!(WARNING, "Invalid Response! expected Handshake, got {:?}", message); } } } } } } async fn start_peer_handler( mut stream: tokio::net::TcpStream, peer_id: uuid::Uuid, request_sender: tokio::sync::mpsc::Sender, mut response_receiver: tokio::sync::mpsc::Receiver ) { let peer_id_clone = peer_id.clone(); tokio::spawn(async move { log!(INFO, "Started Message Handler for {}", peer_id_clone); loop { select! { response_result = response_receiver.recv() => { match response_result { Some(response) => { log!(INFO, "Sending response to {peer_id_clone}: {:#?}", response); if let Err(e) = node::NativeNode::send_message(&mut stream, &response).await { log!(ERROR, "Failed to send response to {peer_id_clone}: {}", e); break; } }, None => { log!(INFO, "Response channel closed for {peer_id_clone}"); break; } } } message_result = node::NativeNode::receive_message(&mut stream) => { match message_result { Ok(message) => { log!(INFO, "Received Message from {peer_id_clone}"); let command = node::NodeCommand::ProcessMessage { peer_id, message: message.clone() }; if request_sender.send(command).await.is_err() { log!(ERROR, "Failed to send command to main thread from {peer_id}"); break; } }, Err(e) => { log!(WARNING, "Connection to {peer_id_clone} closed: {}", e.message); let cmd = node::NodeCommand::RemovePeer { peer_id: peer_id_clone.clone() }; request_sender.send(cmd).await.unwrap(); break; } } } } } }); } }