From edbebe64fe433d40f257b746172ae68c11b82c81 Mon Sep 17 00:00:00 2001 From: Victor Vobis Date: Tue, 10 Mar 2026 22:37:48 +0100 Subject: [PATCH] Finishing channel refactor, todo: fix chain rebuilding --- node/package-lock.json | 6 + node/src/args.rs | 15 +- node/src/cli.rs | 7 +- node/src/network/connection.rs | 36 ++-- node/src/network/connector.rs | 132 ++++++------ node/src/network/message.rs | 17 +- node/src/node/blockchain.rs | 4 +- node/src/node/error.rs | 4 - node/src/node/node.rs | 370 +++++++++++++++------------------ node/src/watcher/builder.rs | 11 +- node/src/watcher/watcher.rs | 57 +++-- 11 files changed, 322 insertions(+), 337 deletions(-) create mode 100644 node/package-lock.json diff --git a/node/package-lock.json b/node/package-lock.json new file mode 100644 index 0000000..93bc066 --- /dev/null +++ b/node/package-lock.json @@ -0,0 +1,6 @@ +{ + "name": "node", + "lockfileVersion": 3, + "requires": true, + "packages": {} +} diff --git a/node/src/args.rs b/node/src/args.rs index 3fa8a6f..b0369a6 100644 --- a/node/src/args.rs +++ b/node/src/args.rs @@ -1,11 +1,9 @@ use std::net::SocketAddr; -use shared::blockchain_core; +use shared::blockchain_core::{self, Address, AddressParser}; use cli_renderer::RenderLayoutKind; use clap::{Parser, Subcommand}; -use clap::*; - #[derive(Parser)] pub struct Cli { #[command(subcommand)] @@ -14,6 +12,7 @@ pub struct Cli { #[derive(Subcommand)] pub enum CliCommand { + /// Ping a node #[command(name = "ping")] Ping { #[command(subcommand)] @@ -34,12 +33,12 @@ pub enum CliCommand { block_cmd: CliBlockCommand, }, + /// Award currency to wallet #[command(name = "award")] Award { - #[arg(short, long)] + #[clap(value_parser = AddressParser {})] + address: Address, amount: u64, - #[arg(short, long)] - address: String, }, /// Make a Transaction @@ -120,14 +119,14 @@ pub enum CliPingCommand { /// Ping Peer by Id #[command(name = "id", aliases = ["i"])] Id { - #[arg(short, long)] + #[arg()] id: String, }, /// Ping Peer by Address #[command(name = "addr", aliases = ["a", "ad"])] Addr { - #[arg(short, long)] + #[arg()] addr: String, }, } diff --git a/node/src/cli.rs b/node/src/cli.rs index 11ff17b..39498aa 100644 --- a/node/src/cli.rs +++ b/node/src/cli.rs @@ -60,15 +60,12 @@ pub fn cli(input: &str) -> WatcherCommand { WatcherCommand::Node(NodeCommand::ProcessChainData(ChainData::NodeTransaction(tx))) } CliCommand::Award { address, amount } => { - let mut bytes = [0u8; 20]; + log(msg!(DEBUG, "Received address: {:?}", address)); if address.len() != 20 { log(msg!(ERROR, "Invalid address length")) - } else if !address.is_ascii() { - log(msg!(ERROR, "Invalid address content")) } - bytes.copy_from_slice(address.as_bytes()); - WatcherCommand::Node(NodeCommand::AwardCurrency{ address: bytes, amount } + WatcherCommand::Node(NodeCommand::AwardCurrency{ address, amount } )} CliCommand::DebugShowId => WatcherCommand::Node(NodeCommand::ShowId), CliCommand::StartListner { addr } => { diff --git a/node/src/network/connection.rs b/node/src/network/connection.rs index 35f7cc2..03d3ef4 100644 --- a/node/src/network/connection.rs +++ b/node/src/network/connection.rs @@ -3,7 +3,7 @@ use crate::network::NodeId; use crate::node::node; use super::ProtocolMessage; use tokio::net; -use tokio::sync::mpsc; +use futures::stream::Stream; use super::Connector; @@ -14,22 +14,27 @@ use vlogger::*; pub struct Connection { node_id: NodeId, peer_id: NodeId, - stream: net::TcpStream, - rx: mpsc::Receiver, + r_stream: net::tcp::OwnedReadHalf, +} + +impl Stream for Connection { + type Item = (NodeId, Option); + + fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { + todo!("Impl poll next") + } } impl Connection { pub fn new( node_id: NodeId, peer_id: NodeId, - stream: net::TcpStream, - rx: mpsc::Receiver, + r_stream: net::tcp::OwnedReadHalf, ) -> Self { Self { node_id, peer_id, - stream, - rx, + r_stream, } } @@ -38,22 +43,7 @@ impl Connection { loop { tokio::select! { - response_result = self.rx.recv() => { - match response_result { - Some(response) => { - if let Err(e) = Connector::send_message(&mut self.stream, &response).await { - log(msg!(ERROR, "Failed to send response to {}: {}", self.peer_id.clone(), e)); - break; - } - }, - None => { - log(msg!(DEBUG, "Response channel closed for {}", self.peer_id)); - break; - } - } - } - - message_result = Connector::receive_message(&mut self.stream) => { + message_result = Connector::receive_message(&mut self.r_stream) => { match message_result { Ok(message) => { todo!("[TODO] Return parsed command to propagate"); diff --git a/node/src/network/connector.rs b/node/src/network/connector.rs index 2f0c63b..636907a 100644 --- a/node/src/network/connector.rs +++ b/node/src/network/connector.rs @@ -1,19 +1,18 @@ -use anyhow::Context; +use futures::stream::{ StreamExt, SelectAll }; +use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use std::net::SocketAddr; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net; -use tokio::sync::mpsc; use vlogger::*; use shared::print_error_chain; use thiserror::*; use crate::db::BINCODE_CONFIG; use crate::log; -use crate::network::NodeId; +use crate::network::{NodeId, ProtocolError}; use super::Connection; use crate::bus::*; use crate::node::node; -use crate::node::{NetworkError, error}; use super::ProtocolMessage; pub enum ConnectorCommand { @@ -26,6 +25,8 @@ pub struct Connector { node_id: NodeId, addr: SocketAddr, connections: Vec, + peer_streams: Vec<(NodeId, OwnedWriteHalf)>, + streams: SelectAll, listener: Option, exit: bool, } @@ -33,7 +34,17 @@ pub struct Connector { #[derive(Error, Debug)] pub enum ConnectorError { #[error("Connection failed")] - ConnectionError(#[from] anyhow::Error), + IO(#[from] std::io::Error), + #[error("Decode Error: {0}")] + Decode(#[from] bincode::error::DecodeError), + #[error("Encode Error: {0}")] + Encode(#[from] bincode::error::EncodeError), + #[error("Protocol Error: {0}")] + Protocol(#[from] crate::network::message::ProtocolError), + #[error("Unkown Peer Id: {0}")] + UnknownPeerId(NodeId), + #[error("Unkown Peer Address: {0}")] + UnknownPeerAddr(SocketAddr), } const MAX_LISTNER_TRIES: usize = 5; @@ -47,6 +58,8 @@ impl Connector { node_id, addr, connections: Vec::new(), + peer_streams: Vec::new(), + streams: SelectAll::new(), listener: None, exit: false, } @@ -71,8 +84,11 @@ impl Connector { pub async fn poll(&mut self) -> Result, ConnectorError> { if let Some(listener) = &mut self.listener { - todo!("Implement Vec Poll for connections"); tokio::select! { + protocol_message = self.streams.next() => { + todo!("Implement protocol message"); + } + // cmd_result = self.rx.recv() => { // todo!("Implement Vec Poll for connections"); // match cmd_result { @@ -108,7 +124,7 @@ impl Connector { } } - pub async fn execute_cmd(&mut self, cmd: ConnectorCommand) -> Result, ConnectorError> { + pub async fn command(&mut self, cmd: ConnectorCommand) -> Result, ConnectorError> { match cmd { ConnectorCommand::ConnectToTcpPeer(addr) => { let peer = self.connect_to_peer(addr).await?; @@ -128,7 +144,6 @@ impl Connector { pub async fn connect_to_seed(&mut self, addr: SocketAddr) -> Result { match net::TcpStream::connect(addr) .await - .with_context(|| format!("Connecting to {}", addr)) { Ok(stream) => { let peer = self.establish_connection_outbound(stream, addr).await; @@ -136,9 +151,8 @@ impl Connector { peer }, Err(e) => { - // let err = ConnectorError::ConnectionError(e.into()); - print_error_chain(&e.into()); - todo!("Handle connector error propagation"); + let err = ConnectorError::IO(e); + Err(err) } } } @@ -147,9 +161,8 @@ impl Connector { match net::TcpStream::connect(addr).await { Ok(stream) => self.establish_connection_inbound(stream, addr).await, Err(e) => { - let err = ConnectorError::ConnectionError(e.into()); - print_error_chain(&err.into()); - todo!("Handle connector error propagation"); + let err = ConnectorError::IO(e); + Err(err) } } } @@ -158,29 +171,28 @@ impl Connector { match net::TcpStream::connect(addr).await { Ok(stream) => self.establish_connection_outbound(stream, addr).await, Err(e) => { - let err = ConnectorError::ConnectionError(e.into()); - print_error_chain(&err.into()); - todo!("Handle connector error propagation"); + let err = ConnectorError::IO(e); + Err(err) } } } async fn establish_connection_outbound( &mut self, - mut stream: tokio::net::TcpStream, + stream: tokio::net::TcpStream, addr: SocketAddr, ) -> Result { + let (mut r_stream, mut w_stream) = stream.into_split(); let handshake = ProtocolMessage::Handshake { peer_id: self.node_id.clone(), version: "".to_string(), }; - match Connector::send_message(&mut stream, &handshake).await { + match Connector::send_message(&mut w_stream, &handshake).await { Ok(()) => { - if let Ok(mes) = Connector::receive_message(&mut stream).await { - let (ch_tx, ch_rx) = mpsc::channel::(100); + if let Ok(mes) = Connector::receive_message(&mut r_stream).await { let peer = match mes { ProtocolMessage::HandshakeAck { peer_id, .. } => { - node::TcpPeer::new(peer_id, addr, ch_tx) + node::TcpPeer::new(peer_id, addr) } _ => { log(msg!( @@ -190,7 +202,7 @@ impl Connector { todo!("Handle connector receive message fail"); } }; - let connection = Connection::new(self.node_id.clone(), peer.id.clone(), stream, ch_rx); + let connection = Connection::new(self.node_id.clone(), peer.id.clone(), r_stream); self.connections.push(connection); Ok(peer) } else { @@ -206,82 +218,78 @@ impl Connector { async fn establish_connection_inbound( &mut self, - mut stream: tokio::net::TcpStream, + stream: tokio::net::TcpStream, addr: SocketAddr, ) -> Result { - if let Ok(mes) = Connector::receive_message(&mut stream).await { - let (ch_tx, ch_rx) = mpsc::channel::(100); - let peer = match mes { - ProtocolMessage::Handshake { peer_id, .. } => { - let ack = ProtocolMessage::HandshakeAck { - peer_id: self.node_id.clone(), - version: "".to_string(), - }; - match Connector::send_message(&mut stream, &ack).await { - Ok(()) => node::TcpPeer::new(peer_id, addr, ch_tx), - Err(e) => { - print_error_chain(&e.into()); - todo!("Implement inbound conneciton message send fail") + let (mut r_stream, mut w_stream) = stream.into_split(); + let mes = Connector::receive_message(&mut r_stream).await?; + let peer = match mes { + ProtocolMessage::Handshake { peer_id, .. } => { + let ack = ProtocolMessage::HandshakeAck { + peer_id: self.node_id.clone(), + version: "".to_string(), + }; + Connector::send_message(&mut w_stream, &ack).await?; + node::TcpPeer::new(peer_id, addr) + } + e => { + return Err(ConnectorError::Protocol(ProtocolError::Unexpected(e))); + } + }; + let connection = Connection::new(self.node_id.clone(), peer.id.clone(), r_stream); + self.connections.push(connection); + Ok(peer) + } - }, - } - } - _ => { - log(msg!( - ERROR, - "Invalid Message On Connetion Establishment: {mes}" - )); - todo!("Implement inbound conneciton message invalid"); - } - }; - let connection = Connection::new(self.node_id.clone(), peer.id.clone(), stream, ch_rx); - self.connections.push(connection); - Ok(peer) + pub async fn send_message_to_peer(&mut self, peer_id: NodeId, message: &ProtocolMessage) -> Result<(), ConnectorError> { + if let Some((_, w_stream)) = &mut self.peer_streams.iter_mut().find(|p| p.0 == peer_id) { + Connector::send_message(w_stream, message).await } else { - todo!("Implement inbount connection message read fail"); + Err(ConnectorError::UnknownPeerId(peer_id)) } } pub async fn send_message( - stream: &mut net::TcpStream, + stream: &mut net::tcp::OwnedWriteHalf, message: &ProtocolMessage, - ) -> Result<(), NetworkError> { + ) -> Result<(), ConnectorError> { let data = bincode::encode_to_vec(message, BINCODE_CONFIG)?; let len = data.len() as u32; + stream .write_all(&len.to_be_bytes()) .await - .map_err(|_e| NetworkError::TODO)?; + .map_err(|e| ConnectorError::IO(e))?; stream .write_all(&data) .await - .map_err(|_e| NetworkError::TODO)?; - stream.flush().await.map_err(|_e| NetworkError::TODO)?; + .map_err(|e| ConnectorError::IO(e))?; + stream.flush().await.map_err(|e| ConnectorError::IO(e))?; Ok(()) } pub async fn receive_message( - stream: &mut tokio::net::TcpStream, - ) -> Result { + stream: &mut OwnedReadHalf, + ) -> Result { let mut len_bytes = [0u8; 4]; stream .read_exact(&mut len_bytes) .await - .map_err(|_e| NetworkError::TODO)?; + .map_err(|e| ConnectorError::IO(e))?; let len = u32::from_be_bytes(len_bytes) as usize; if len >= super::message::MAX_MESSAGE_SIZE { - return Err(NetworkError::TODO); + return Err(ConnectorError::Protocol(ProtocolError::MessageTooLong(len))); } let mut data = vec![0u8; len]; stream .read_exact(&mut data) .await - .map_err(|_e| NetworkError::TODO)?; + .map_err(|e| ConnectorError::IO(e))?; let (message, _): (ProtocolMessage, usize) = bincode::decode_from_slice(&data, BINCODE_CONFIG)?; diff --git a/node/src/network/message.rs b/node/src/network/message.rs index 68518df..11bce96 100644 --- a/node/src/network/message.rs +++ b/node/src/network/message.rs @@ -4,9 +4,17 @@ use std::net::SocketAddr; pub const MAX_MESSAGE_SIZE: usize = 1_000_000; -#[derive(Debug, Clone, bincode::Encode, bincode::Decode, Hash, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, bincode::Encode, bincode::Decode, Hash, PartialEq, Eq)] pub struct NodeId(pub [u8; 16]); +#[derive(thiserror::Error, Debug)] +pub enum ProtocolError { + #[error("Invalid message len: {0}")] + MessageTooLong(usize), + #[error("Unexpected message: {0}")] + Unexpected(ProtocolMessage) +} + #[derive(Debug, Clone, bincode::Encode, bincode::Decode)] pub enum ProtocolMessage { BootstrapRequest { @@ -51,10 +59,9 @@ pub enum ProtocolMessage { } impl fmt::Display for NodeId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let msg = self.to_string(); - write!(f, "{}", msg) - } + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", hex::encode(self.0)) + } } impl fmt::Display for ProtocolMessage { diff --git a/node/src/node/blockchain.rs b/node/src/node/blockchain.rs index 32571b8..8420073 100644 --- a/node/src/node/blockchain.rs +++ b/node/src/node/blockchain.rs @@ -33,9 +33,9 @@ pub struct ChainBootStrap { #[allow(dead_code)] #[derive(Error, Debug)] pub enum BlockchainError { - #[error("Failed to serialize data: {0}")] + #[error("Failed to serialize data")] Encode(#[from] bincode::error::EncodeError), - #[error("Failed to deserialize data: {0}")] + #[error("Failed to deserialize data")] Decode(#[from] bincode::error::DecodeError), #[error("Database operation failed")] Database(#[from] DatabaseError), diff --git a/node/src/node/error.rs b/node/src/node/error.rs index 64d8f2b..dc8dbd5 100644 --- a/node/src/node/error.rs +++ b/node/src/node/error.rs @@ -4,8 +4,4 @@ use thiserror::Error; pub enum NetworkError { #[error("Implement NetworkError Enum: ({})", file!())] TODO, - #[error("Decode Error: {0}")] - Decode(#[from] bincode::error::DecodeError), - #[error("Encode Error: {0}")] - Encode(#[from] bincode::error::EncodeError), } diff --git a/node/src/node/node.rs b/node/src/node/node.rs index 3933ad3..1ac8ebf 100644 --- a/node/src/node/node.rs +++ b/node/src/node/node.rs @@ -1,6 +1,5 @@ -use crate::bus::{publish_system_event, publish_watcher_event, subscribe_system_event, SystemEvent}; -use shared::blockchain_core::{self, ChainData, SignedTransaction, validator::ValidationError}; -use crate::print_error_chain; +use crate::network::ConnectorError; +use shared::blockchain_core::{self, ChainData, SignedTransaction}; use crate::log; use crate::network::{NodeId, ProtocolMessage}; use crate::network::{Connector, ConnectorCommand}; @@ -20,24 +19,22 @@ use vlogger::*; pub struct TcpPeer { pub id: NodeId, pub addr: SocketAddr, - pub sender: tokio::sync::mpsc::Sender, } impl TcpPeer { pub fn new( id: NodeId, addr: SocketAddr, - sender: tokio::sync::mpsc::Sender, ) -> Self { - Self { id, addr, sender } + Self { id, addr } } } #[allow(dead_code)] pub struct Node { - pub tcp_connector: Option, + pub tcp_connector: Connector, pub id: NodeId, - pub addr: Option, + pub addr: SocketAddr, pub tcp_peers: HashMap, chain: Blockchain, listner_handle: Option>, @@ -46,7 +43,15 @@ pub struct Node { #[derive(Debug, Error)] pub enum NodeError { #[error("Block chain error")] - ChainError(#[from] BlockchainError), + Blockchain(#[from] BlockchainError), + #[error("Connector Error")] + Connector(#[from] ConnectorError), + #[error("Unknown peer address")] + UnknownPeerAddr(SocketAddr), + #[error("Invalid Address:\npeer: {0}\naddr: {1}")] + InvalidAddress(String, SocketAddr), + #[error("Hex Error")] + Hex(#[from] hex::FromHexError), } #[derive(Debug, Clone)] @@ -86,9 +91,7 @@ impl Node { .iter() .map(|p| p.1.addr.to_string().parse::().unwrap()) .collect(); - if let Some(a) = self.addr { - addr.push(a.clone()); - } + addr.push(self.addr.clone()); addr } @@ -109,43 +112,42 @@ impl Node { self.tcp_peers.remove_entry(&peer_id); } - async fn add_tcp_peer(&mut self, peer: TcpPeer) { + fn add_tcp_peer(&mut self, peer: TcpPeer) { log(msg!(DEBUG, "Added Peer from address: {}", peer.addr)); self.tcp_peers.insert(peer.id.clone(), peer); } pub async fn new_with_id( id: NodeId, - addr: Option, + addr: SocketAddr, chain: Blockchain, ) -> Self { Self { - id, + id: id.clone(), tcp_peers: HashMap::new(), addr, chain, listner_handle: None, - tcp_connector: None, + tcp_connector: Connector::new(id, addr), } } pub fn new( - addr: Option, + addr: SocketAddr, chain: Blockchain, ) -> Self { + let id = NodeId(*Uuid::new_v4().as_bytes()); Self { - id: NodeId(*Uuid::new_v4().as_bytes()), + id: id.clone(), tcp_peers: HashMap::new(), addr, listner_handle: None, - tcp_connector: None, + tcp_connector: Connector::new(id, addr), chain, } } async fn shutdown(&mut self) { - if let Some(conn) = &self.tcp_connector { - } let _ = self.chain.shutdown().await; } @@ -153,6 +155,25 @@ impl Node { Ok(self.chain.blocks()?) } + async fn send_message(&mut self, peer_id: NodeId, message: &ProtocolMessage) -> Result<(), NodeError> { + self.tcp_connector.send_message_to_peer(peer_id, &message).await?; + Ok(()) + } + + async fn broadcast_message(&mut self, msg: &ProtocolMessage) -> Result<(), NodeError> { + let keys: Vec<_> = self.tcp_peers.keys().cloned().collect(); + let mut results = Vec::new(); + for id in keys { + let res = self.send_message(id, msg).await; + results.push(res); + } + if let Some(error) = results.into_iter().find(|r| r.is_err()) { + error + } else { + Ok(()) + } + } + pub async fn process_message(&mut self, peer_id: NodeId, message: ProtocolMessage) -> Result<(), NodeError> { match message { ProtocolMessage::BootstrapRequest { .. } => { @@ -160,7 +181,7 @@ impl Node { let peer = &self.tcp_peers[&peer_id]; let blocks = self.chain.bootstrap()?; let resp = ProtocolMessage::BootstrapResponse { blocks }; - peer.sender.send(resp).await.unwrap(); + self.send_message(peer.id.clone(), &resp).await?; log(msg!(DEBUG, "Send BootstrapResponse to {peer_id}")); } ProtocolMessage::BootstrapResponse { blocks } => { @@ -172,11 +193,11 @@ impl Node { } ProtocolMessage::Ping { peer_id } => { log(msg!(DEBUG, "Received Ping from {peer_id}")); + let peer = &self.tcp_peers[&peer_id]; let resp = ProtocolMessage::Pong { peer_id: self.id.clone(), }; - let peer = &self.tcp_peers[&peer_id]; - peer.sender.send(resp).await.unwrap(); + self.send_message(peer.id.clone(), &resp).await?; } ProtocolMessage::GetPeersRequest { peer_id } => { log(msg!(DEBUG, "Received GetPeersRequest from {peer_id}")); @@ -185,7 +206,7 @@ impl Node { peer_addresses: peers, }; let peer = &self.tcp_peers[&peer_id]; - peer.sender.send(resp).await.unwrap(); + self.send_message(peer.id.clone(), &resp).await?; } ProtocolMessage::Block { block, .. } => { log(msg!(DEBUG, "Received Block from {peer_id}")); @@ -209,223 +230,163 @@ impl Node { Ok(()) } - pub async fn send_message_to_peer_addr(&self, addr: SocketAddr, msg: ProtocolMessage) { + pub async fn send_message_to_peer_addr(&mut self, addr: SocketAddr, msg: &ProtocolMessage) -> Result<(), NodeError> { if let Some((_, peer)) = self.tcp_peers.iter().find(|(_, v)| v.addr == addr) { - if let Err(e) = peer.sender.send(msg).await { - log(msg!(ERROR, "Error Sending message to peer: {e}")); - } - log(msg!(DEBUG, "Sent BootstrapRequest to seed")); + self.send_message(peer.id.clone(), &msg).await?; + Ok(()) } else { - log(msg!( - ERROR, - "Error Sending message to peer: peer not in list" - )); + Err(NodeError::UnknownPeerAddr(addr)) } } - pub async fn send_message_to_peer_id(&self, id: NodeId, msg: ProtocolMessage) { - if let Some(peer) = self.tcp_peers.get(&id) { - if let Err(e) = peer.sender.send(msg).await { - log(msg!(ERROR, "Error Sending message to peer: {e}")); - } - } + pub async fn send_message_to_peer_id(&mut self, id: NodeId, msg: &ProtocolMessage) -> Result<(), NodeError> { + self.send_message(id.clone(), &msg).await?; + Ok(()) } - async fn send_message_to_seed(&self, msg: ProtocolMessage) { + async fn send_message_to_seed(&mut self, msg: &ProtocolMessage) -> Result<(), NodeError> { for seed in SEED_NODES.iter() { - if let Some(_) = self.tcp_peers.iter().find(|(_, v)| v.addr == *seed) { - self.send_message_to_peer_addr(*seed, msg).await; - return; - } else { - self.send_message_to_peer_addr(*seed, msg).await; - return; + if let Some(s) = self.tcp_peers.iter().find(|(_, v)| v.addr == *seed) { + self.send_message_to_peer_addr(s.1.addr, &msg).await?; } } - log(msg!(ERROR, "No Seed Nodes Avaliable")); + Ok(()) } - async fn bootstrap(&mut self) -> Result<(), ValidationError> { - log(msg!(DEBUG, "Bootstrapping")); - + async fn bootstrap(&mut self) -> Result<(), NodeError> { let message = ProtocolMessage::BootstrapRequest { peer_id: self.id.clone(), version: "".to_string(), }; - self.send_message_to_seed(message).await; - + self.send_message_to_seed(&message).await?; Ok(()) } - async fn broadcast_network_data(&self, data: ChainData) { - for (id, peer) in &self.tcp_peers { - let message = ProtocolMessage::ChainData { - peer_id: self.id.clone(), - data: data.clone(), - }; - peer.sender.send(message).await.unwrap(); - log(msg!(DEBUG, "Send Transaction message to {id}")); - } + async fn broadcast_network_data(&mut self, data: ChainData) -> Result<(), NodeError> { + let message = ProtocolMessage::ChainData { + peer_id: self.id.clone(), + data, + }; + self.broadcast_message(&message).await } - async fn broadcast_block(&self, block: &blockchain_core::Block) { - for (id, peer) in &self.tcp_peers { - let message = ProtocolMessage::Block { - peer_id: self.id.clone(), - height: block.head().height as u64, - block: block.clone(), - }; - peer.sender.send(message).await.unwrap(); - log(msg!(DEBUG, "Send Block message to {id}")); - } + async fn broadcast_block(&mut self, block: &blockchain_core::Block) -> Result<(), NodeError> { + let message = ProtocolMessage::Block { + peer_id: self.id.clone(), + height: block.head().height as u64, + block: block.clone(), + }; + self.broadcast_message(&message).await } - async fn connector_cmd(&mut self, cmd: ConnectorCommand) -> Result, crate::network::ConnectorError> { - match &mut self.tcp_connector { - Some(t) => { t.execute_cmd(cmd).await }, - None => { - log(msg!(ERROR, "No Connector Availiable")); - todo!("Implement node level connection cmd fail"); - }, - } + async fn connector_cmd(&mut self, cmd: ConnectorCommand) -> Result, NodeError> { + let res = self.tcp_connector.command(cmd).await?; + Ok(res) } - async fn start_connection_listner(&mut self, addr: SocketAddr) { + fn start_connection_listner(&mut self, addr: SocketAddr) { log(msg!(DEBUG, "Starting Connection Listener")); - let connector = Connector::new(self.id.clone(), addr); - log(msg!(DEBUG, "Connector Build")); - self.tcp_connector = Some(connector); + let connector = Connector::new(self.id.clone(), addr); + log(msg!(DEBUG, "Connector Build")); + self.tcp_connector = connector; } - async fn connect_to_seed(&mut self) { - self - .connector_cmd(ConnectorCommand::ConnectToTcpSeed(SEED_NODES[0])) - .await; + async fn connect_to_seed(&mut self) -> Result, NodeError> { + let res = self.tcp_connector.command(ConnectorCommand::ConnectToTcpSeed(SEED_NODES[0])).await?; + Ok(res) } - pub async fn command(&mut self, command: NodeCommand) { - match command { - NodeCommand::BootStrap => { - log(msg!(DEBUG, "Received NodeCommand::BootStrap")); - let _ = self.bootstrap().await; + pub async fn command(&mut self, command: NodeCommand) -> Result, NodeError> { + match command { + NodeCommand::BootStrap => { + self.bootstrap().await?; + } + NodeCommand::BroadcastTransaction(sign_tx) => { + self.broadcast_network_data(ChainData::Transaction(sign_tx)).await?; + } + NodeCommand::StartListner(addr) => { + self.start_connection_listner(addr); + } + NodeCommand::ConnectToSeeds => { + self.connect_to_seed().await?; + } + NodeCommand::ConnectTcpPeer(addr) => { + let addr_sock = addr.parse::() + .map_err(|_| NodeError::InvalidAddress("Self".to_string(), self.addr))?; + let res = self.connector_cmd(ConnectorCommand::ConnectToTcpPeer(addr_sock)).await?; + if let Some(node_cmd) = res { + return Ok(Some(WatcherCommand::Node(node_cmd))) } - NodeCommand::BroadcastTransaction(sign_tx) => { - self.broadcast_network_data(ChainData::Transaction(sign_tx)).await; - } - NodeCommand::StartListner(addr) => { - self.start_connection_listner(addr).await; - } - NodeCommand::ConnectToSeeds => { - self.connect_to_seed().await; - } - NodeCommand::ConnectTcpPeer(addr) => { - log(msg!(DEBUG, "Received ConnectToPeer: {addr}")); - if let Ok(addr_sock) = addr.parse::() { - let mes = ConnectorCommand::ConnectToTcpPeer(addr_sock); - self.connector_cmd(mes).await; - } else { - log(msg!(ERROR, "Failed to Parse to sock_addr: {addr}")); - } - } - NodeCommand::PingAddr(addr) => { - if let Ok(addr_sock) = addr.parse::() { - let mes = ProtocolMessage::Ping { peer_id: self.id.clone() }; - self.send_message_to_peer_addr(addr_sock, mes).await; - } else { - log(msg!(ERROR, "Failed to Parse to sock_addr: {addr}")); - } - } - NodeCommand::PingId(id) => { - let mes = ProtocolMessage::Ping { peer_id: self.id.clone() }; - self.send_message_to_peer_id(id, mes).await; - } - NodeCommand::AddPeer(peer) => { - self.add_tcp_peer(peer).await; - } - NodeCommand::RemovePeer { peer_id } => { - self.remove_tcp_peer(peer_id).await; - } - - NodeCommand::ProcessMessage { peer_id, message } => { - self.process_message(peer_id, message).await.unwrap(); - } - NodeCommand::AwardCurrency { address, amount } => { - if let Err(e) = self.chain.award_currency(address, amount) { - print_error_chain(&e.into()); - } - } - NodeCommand::ProcessChainData(data) => { - if let Err(e) = self.chain.apply_chain_data(data.clone()) { - print_error_chain(&e.into()); - } - self.broadcast_network_data(data).await; - } - NodeCommand::CreateBlock => { - log(msg!(DEBUG, "Received CreateBlock Command")); - match self.chain.create_block() { - Ok(block) => { - log(msg!( - INFO, - "Created Block with hash {}", - hex::encode(block.head().block_hash()) - )); - self.broadcast_block(&block).await; - } - Err(e) => print_error_chain(&e.into()), - } - } - NodeCommand::DisplayBlockInteractive => { - let blocks = match self.chain.list_blocks() { - Ok(b) => b, - Err(e) => return print_error_chain(&e.into()), - }; - let wat_cmd = WatcherCommand::SetMode(WatcherMode::Select { + } + NodeCommand::PingAddr(addr) => { + let addr_sock = addr.parse::() + .map_err(|_| NodeError::InvalidAddress("Self".to_string(), self.addr))?; + let mes = ProtocolMessage::Ping { peer_id: self.id.clone() }; + self.send_message_to_peer_addr(addr_sock, &mes).await?; + } + NodeCommand::PingId(id) => { + let mes = ProtocolMessage::Ping { peer_id: self.id.clone() }; + self.send_message_to_peer_id(id, &mes).await?; + } + NodeCommand::AddPeer(peer) => { + self.add_tcp_peer(peer); + } + NodeCommand::RemovePeer { peer_id } => { + self.remove_tcp_peer(peer_id).await; + } + NodeCommand::ProcessMessage { peer_id, message } => { + self.process_message(peer_id, message).await?; + } + NodeCommand::AwardCurrency { address, amount } => { + self.chain.award_currency(address, amount)?; + } + NodeCommand::ProcessChainData(data) => { + self.chain.apply_chain_data(data.clone())?; + self.broadcast_network_data(data).await?; + } + NodeCommand::CreateBlock => { + let block = self.chain.create_block()?; + self.broadcast_block(&block).await?; + } + NodeCommand::DisplayBlockInteractive => { + let blocks = self.chain.list_blocks()?; + return Ok(Some( + WatcherCommand::SetMode(WatcherMode::Select { content: blocks.iter().map(|h| hex::encode(h)).collect::>().into(), title: "Select Block to display".to_string(), callback: Box::new(WatcherCommand::Node(NodeCommand::DisplayBlockByKey("".to_string()))), - index: 0 - }); - publish_watcher_event(wat_cmd); - } - NodeCommand::DisplayBlockByKey(key) => { - if let Ok(block_hash) = hex::decode(key) { - self.chain.display_block_by_key(&block_hash) - } - }, - NodeCommand::DisplayBlockByHeight(height) => self.chain.display_block_by_height(height), - NodeCommand::ListBlocks => { - log(msg!(DEBUG, "Received DebugListBlocks command")); - match self.chain.list_blocks() { - Ok(s) => log(s.iter().map(|h| format!("{}\n", hex::encode(h))).collect::>().join("\n")), - Err(e) => print_error_chain(&e.into()), - } - } - NodeCommand::ListPeers => { - log(msg!(DEBUG, "Received DebugListPeers command")); - log(self.list_peers()); - } - NodeCommand::ShowId => { - log(msg!(DEBUG, "Received DebugListBlocks command")); - self.show_id().await; - } - NodeCommand::Exit => { - log(msg!(DEBUG, "Node Exit")); + index: 0, + }) + )); } + NodeCommand::DisplayBlockByKey(key) => { + self.chain.display_block_by_key(&hex::decode(key)?); + } + NodeCommand::DisplayBlockByHeight(height) => { + self.chain.display_block_by_height(height); + } + NodeCommand::ListBlocks => { + let s = self.chain.list_blocks()?; + let block_list = s.iter().map(|h| format!("{}\n", hex::encode(h))).collect::>().join("\n"); + return Ok(Some(WatcherCommand::Print(block_list))); + } + NodeCommand::ListPeers => { + return Ok(Some(WatcherCommand::Print(self.list_peers()))); + } + NodeCommand::ShowId => { + self.show_id().await; + } + NodeCommand::Exit => {} } + Ok(None) + } + + fn handle_error(&self, error: NodeError) { + log(msg!(ERROR, "{error}")); } pub async fn init(&mut self) { - if let Some(addr) = self.addr { - self.start_connection_listner(addr).await; - } else { - self - .start_connection_listner(SocketAddr::new( - std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)), - 8080, - )) - .await; - }; - let _http_handle = tokio::spawn(async move { let _ = crate::api::server::start_server().await; }); @@ -440,15 +401,12 @@ impl Node { // } // }); - let mut _system_rx = subscribe_system_event(); - publish_system_event(SystemEvent::NodeStarted); - self.chain.recover_mempool(); } pub async fn poll(&mut self) -> Option<()> { tokio::select! { - _con_cmd = self.tcp_connector.as_mut()?.poll() => { + _con_cmd = self.tcp_connector.poll() => { None } } diff --git a/node/src/watcher/builder.rs b/node/src/watcher/builder.rs index b3eb698..44b69c7 100644 --- a/node/src/watcher/builder.rs +++ b/node/src/watcher/builder.rs @@ -71,14 +71,21 @@ impl WatcherBuilder { self.addr = Some(crate::seeds_constants::SEED_NODES[0]); } + if self.addr.is_none() { + self.addr = Some(SocketAddr::new( + std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)), + 8080, + )) + } + let chain = node::Blockchain::new(self.database, self.temporary).unwrap(); - let mut node = Node::new(self.addr.clone(), chain); + let mut node = Node::new(self.addr.unwrap(), chain); node.init().await; log(msg!(INFO, "Built Node")); if self.bootstrap { - node.command(NodeCommand::BootStrap).await; + node.command(NodeCommand::BootStrap).await.unwrap(); } let cmd_history = Vec::new(); diff --git a/node/src/watcher/watcher.rs b/node/src/watcher/watcher.rs index 9bd4f0b..ff5f4dd 100644 --- a/node/src/watcher/watcher.rs +++ b/node/src/watcher/watcher.rs @@ -1,12 +1,13 @@ use crate::{ - bus::{SystemEvent, publish_system_event, subscribe_system_event}, cli::cli, node::{Node, node::NodeCommand}, watcher::WatcherMode + bus::{SystemEvent, publish_system_event, subscribe_system_event}, cli::cli, node::{Node, NodeError, node::NodeCommand}, watcher::WatcherMode }; -use shared::print_error_chain; +use ratatui::prelude::CrosstermBackend; +use crate::print_error_chain; use crossterm::event::{Event, EventStream, KeyCode, KeyEventKind, MouseButton, MouseEventKind}; use futures::StreamExt; use memory_stats::memory_stats; -use std::io::{self, Write}; +use std::io::{self, Stdout, Write}; use tokio::{ select, time::{Duration, interval}, @@ -24,6 +25,12 @@ use cli_renderer::{ RenderCommand }; +#[derive(thiserror::Error, Debug)] +pub enum WatcherError { + #[error("Node Error")] + Node(#[from] NodeError), +} + #[allow(dead_code)] pub struct Watcher { cmd_buffer: String, @@ -86,10 +93,9 @@ impl Watcher { poll_res = self.poll() => { match poll_res { Ok(event) => { - self.renderer.set_area(terminal.get_frame().area()); - match self.handle_event(event).await { - Ok(ret) => if !ret { self.exit(); break } - Err(e) => log(msg!(ERROR, "{}", e)), + match self.handle_event(event, &mut terminal).await { + Ok(_) => {} + Err(e) => print_error_chain(&e.into()), } } Err(()) => { log(msg!(ERROR, "Failed to read from Stream")) } @@ -99,10 +105,13 @@ impl Watcher { match ui_event { Ok(cmd) => { self.renderer.set_area(terminal.get_frame().area()); - self.command(cmd); + match self.command(cmd).await { + Ok(_) => {} + Err(e) => print_error_chain(&e.into()), + } }, Err(e) => { - log(msg!(ERROR, "{}", e)) + print_error_chain(&e.into()) } } } @@ -160,28 +169,32 @@ impl Watcher { self.mode = mode; } - pub async fn command(&mut self, cmd: WatcherCommand) { + pub async fn command(&mut self, cmd: WatcherCommand) -> Result, WatcherError> { match cmd { WatcherCommand::NodeResponse(resp) => log(resp), - WatcherCommand::Node(n) => self.node.command(n).await, + WatcherCommand::Node(n) => return Ok(self.node.command(n).await?), WatcherCommand::Render(p) => self.renderer.apply(p), WatcherCommand::Echo(s) => self.echo(s), WatcherCommand::Print(s) => log(s), WatcherCommand::InvalidCommand(str) => self.invalid_command(str).await, WatcherCommand::Exit => self.exit(), WatcherCommand::SetMode(mode) => self.set_mode(mode), - } + }; + Ok(None) } - async fn handle_enter(&mut self) { + async fn handle_enter(&mut self) -> Result<(), WatcherError> { match &self.mode { WatcherMode::Input => { if !self.cmd_buffer.is_empty() { - let exec_event = cli(&self.cmd_buffer); - self.command(exec_event).await; + let mut cmd = cli(&self.cmd_buffer); self.cmd_buffer.clear(); - self.renderer.handle_enter() + self.renderer.handle_enter(); + while let Some(new_cmd) = self.command(cmd).await? { + cmd = new_cmd; + } } + Ok(()) } WatcherMode::Select { content, callback, index, .. } => { match &&**callback { @@ -190,7 +203,7 @@ impl Watcher { NodeCommand::DisplayBlockByKey(_) => { let key = (*content)[*index].clone().to_string(); log(msg!(DEBUG, "KEY IN ENTER: {key}")); - self.node.command(NodeCommand::DisplayBlockByKey(key)); + self.node.command(NodeCommand::DisplayBlockByKey(key)).await?; } _ => {log(msg!(DEBUG, "TODO: Implement callback for {:?}", nd_cmd))} } @@ -200,6 +213,7 @@ impl Watcher { self.mode = WatcherMode::Input; let rd_cmd = RenderCommand::SetMode(InputMode::Input); self.renderer.apply(rd_cmd); + Ok(()) } } } @@ -268,7 +282,7 @@ impl Watcher { }); } - pub async fn handle_event(&mut self, event: Event) -> io::Result { + pub async fn handle_event(&mut self, event: Event, terminal: &mut ratatui::Terminal>) -> Result<(), WatcherError> { match event { Event::Mouse(event) => match event.kind { MouseEventKind::ScrollUp => { @@ -288,6 +302,9 @@ impl Watcher { }, _ => {} }, + Event::Resize(_, _) => { + self.renderer.set_area(terminal.get_frame().area()); + } Event::Key(k) if k.kind == KeyEventKind::Press => match k.code { KeyCode::Esc => publish_system_event(SystemEvent::Shutdown), KeyCode::Char(c) => { @@ -299,7 +316,7 @@ impl Watcher { self.renderer.handle_backspace() } KeyCode::Enter => { - self.handle_enter().await; + self.handle_enter().await?; } KeyCode::Up | KeyCode::Down | KeyCode::Left | KeyCode::Right => { self.handle_arrow_key(k.code); @@ -313,7 +330,7 @@ impl Watcher { } _ => {} } - Ok(true) + Ok(()) } pub async fn poll(&mut self) -> Result {