diff --git a/src/args.rs b/src/args.rs index ce20fba..517b656 100644 --- a/src/args.rs +++ b/src/args.rs @@ -1,3 +1,5 @@ +use std::net::SocketAddr; + use clap::{Parser, Subcommand}; use crate::core; @@ -6,7 +8,7 @@ use crate::core; pub struct Args { /// Provide address on which node will listen #[arg(short = 'a', long)] - pub addr: Option, + pub addr: Option, /// Provide File with current chain #[arg(short = 'f', long)] diff --git a/src/main.rs b/src/main.rs index ec62e4b..2da97e7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,3 @@ -use error::{ BlockchainError, handle_error }; - pub mod error; pub mod args; pub mod core; @@ -9,28 +7,19 @@ pub mod watcher; use crate::{args::get_args, watcher::watcher::Watcher}; -const SEED_ADDR: &str = "127.0.0.1:8333"; - -fn add_transaction(tx: core::Tx) -> Result<(), BlockchainError> { - Ok(()) -} - -fn list_accounts() { - println!("Account Balances:\n-----------------"); -} - #[tokio::main] async fn main() { let args = get_args(); - let mut watcher = Watcher::build().file(args.seed_file).addr(args.addr).start(); + let mut watcher = Watcher::build().file(args.seed_file).addr(args.addr).start().await; loop { - if !watcher.poll().await.is_ok() { + if watcher.poll().await.is_ok_and(|b| b) { break ; } } + ratatui::restore(); println!("Hello, world!"); } diff --git a/src/native_node/message.rs b/src/native_node/message.rs index 561e1de..315abe4 100644 --- a/src/native_node/message.rs +++ b/src/native_node/message.rs @@ -1,3 +1,5 @@ +use std::net::SocketAddr; + use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net; use crate::native_node::node; @@ -19,7 +21,7 @@ pub enum ProtocolMessage { peer_id: uuid::Uuid }, GetPeersResponse { - peer_addresses: Vec + peer_addresses: Vec }, Handshake { node_id: uuid::Uuid, version: String }, Block { peer_id: uuid::Uuid, height: u64, block: core::Block }, diff --git a/src/native_node/network.rs b/src/native_node/network.rs index 09d29ee..3c5a2fd 100644 --- a/src/native_node/network.rs +++ b/src/native_node/network.rs @@ -1,3 +1,5 @@ +use std::net::SocketAddr; + use crate::native_node::{message, node}; use crate::seeds_constants::SEED_NODES; @@ -8,21 +10,23 @@ use tokio::sync::mpsc; impl node::NativeNode { pub async fn connect_to_seeds(&mut self, sender: mpsc::Sender) { - for seed in SEED_NODES { - if seed != self.addr { + for seed in SEED_NODES.iter() { + if let Some(a)= self.addr { + if *seed != a { if let Ok(mut stream) = tokio::net::TcpStream::connect(seed).await { if let Ok(peer_id) = node::NativeNode::send_handshake(self.id.clone(), &mut stream).await { let sender = sender.clone(); - node::NativeNode::establish_connection(peer_id, seed.to_string(), stream, sender).await; + node::NativeNode::establish_connection(peer_id, *seed, stream, sender).await; } } } + } } } pub async fn establish_connection( peer_id: uuid::Uuid, - addr: String, + addr: SocketAddr, stream: tokio::net::TcpStream, request_sender: tokio::sync::mpsc::Sender ) { @@ -59,7 +63,7 @@ impl node::NativeNode { if let Ok(message) = node::NativeNode::receive_message(&mut stream).await { match message { message::ProtocolMessage::Handshake { node_id, .. } => { - node::NativeNode::establish_connection(node_id, addr.to_string(), stream, request_sender.clone()).await; + node::NativeNode::establish_connection(node_id, addr, stream, request_sender.clone()).await; }, _ => { log!(WARNING, "Invalid Response! expected Handshake, got {:?}", message); diff --git a/src/native_node/node.rs b/src/native_node/node.rs index aabe7e5..f3dd2ab 100644 --- a/src/native_node/node.rs +++ b/src/native_node/node.rs @@ -5,45 +5,52 @@ use crate::seeds_constants::SEED_NODES; use crate::watcher::executor::ExecutorCommand; use std::collections::HashMap; +use std::net::SocketAddr; use vlogger::*; use tokio::sync::mpsc; use uuid::Uuid; pub struct TcpPeer { pub id: Uuid, - pub addr: String, + pub addr: SocketAddr, pub sender: tokio::sync::mpsc::Sender } pub struct NativeNode { pub id: Uuid, - pub addr: String, + pub addr: Option, pub tcp_peers: HashMap, pub chain: core::Blockchain, + listner_handle: Option>, exec_tx: mpsc::Sender, rx: mpsc::Receiver, + tx: mpsc::Sender, } #[derive(Debug)] pub enum NodeCommand { - AddPeer { peer_id: Uuid, addr: String, sender: tokio::sync::mpsc::Sender }, + AddPeer { peer_id: Uuid, addr: SocketAddr, sender: tokio::sync::mpsc::Sender }, RemovePeer { peer_id: Uuid }, ProcessMessage { peer_id: Uuid, message: ProtocolMessage }, Transaction { tx: core::Tx }, + StartListner(SocketAddr), CreateBlock, DebugListBlocks, DebugListPeers, DebugShowId, DebugDumpBlocks, - ConnectToSeeds + ConnectToSeeds, + Exit, } impl NativeNode { - pub fn peer_addresses(&self) -> Vec { - let mut addr: Vec = self.tcp_peers.iter().map(|p| p.1.addr.to_string()).collect(); - addr.push(self.addr.clone()); + pub fn peer_addresses(&self) -> Vec { + let mut addr: Vec = self.tcp_peers.iter().map(|p| p.1.addr.to_string().parse::().unwrap()).collect(); + if let Some(a) = self.addr { + addr.push(a.clone()); + } addr } @@ -63,7 +70,7 @@ impl NativeNode { self.tcp_peers.remove_entry(&peer_id); } - fn add_tcp_peer(&mut self, id: Uuid, addr: String, sender: tokio::sync::mpsc::Sender) { + fn add_tcp_peer(&mut self, id: Uuid, addr: SocketAddr, sender: tokio::sync::mpsc::Sender) { let peer = TcpPeer { id: id, addr, @@ -78,24 +85,34 @@ impl NativeNode { pub fn new_with_id( id: uuid::Uuid, exec_tx: mpsc::Sender, - rx: mpsc::Receiver + addr: Option, + blocks: Option>, ) -> Self { + let (tx, rx) = mpsc::channel::(100); Self { id, tcp_peers: HashMap::new(), - chain: Default::default(), - addr: String::new(), + chain: { + if blocks.is_some() { + Blockchain::build(blocks.unwrap()).unwrap_or(Default::default()) + } else { + Default::default() + } + }, + addr, exec_tx, + listner_handle: None, + tx, rx, } } pub fn new( - addr: Option, + addr: Option, blocks: Option>, exec_tx: mpsc::Sender, - rx: mpsc::Receiver, ) -> Self { + let (tx, rx) = mpsc::channel::(100); Self { id: Uuid::new_v4(), tcp_peers: HashMap::new(), @@ -106,9 +123,11 @@ impl NativeNode { Default::default() } }, - addr: if addr.is_some() { addr.unwrap() } else { String::new() }, + addr, exec_tx, - rx + listner_handle: None, + tx, + rx, } } @@ -171,7 +190,7 @@ impl NativeNode { pub async fn broadcast_block(&self, block: &core::Block) { for (id, peer) in &self.tcp_peers { - let message = ProtocolMessage::Block{ + let message = ProtocolMessage::Block { peer_id: self.id, height: self.chain.blocks().len() as u64, block: block.clone() @@ -181,22 +200,39 @@ impl NativeNode { } } + pub fn tx(&self) -> mpsc::Sender { + return self.tx.clone() + } + + async fn start_connection_listner(&mut self, addr: SocketAddr) { + if self.listner_handle.is_some() { + self.listner_handle.as_ref().unwrap().abort(); + self.listner_handle = None; + } + + if let Ok(tcp_listner) = tokio::net::TcpListener::bind(addr).await { + let id = self.id.clone(); + let node_tx = self.tx(); + self.listner_handle = Some(tokio::spawn({ + async move { + NativeNode::accept_connections(tcp_listner, node_tx, id).await; + }})) + }; + } + pub async fn run_native(&mut self) { - let tcp_listner = tokio::net::TcpListener::bind(&self.addr).await.unwrap(); - let (channel_write, mut channel_read) = mpsc::channel::(100); + if let Some(a) = self.addr { + self.start_connection_listner(a.clone()).await; + }; - let id = self.id.clone(); - tokio::spawn({ - let c = channel_write.clone(); - async move { - NativeNode::accept_connections(tcp_listner, c, id).await; - }}); - - while let Some(command) = channel_read.recv().await { + while let Some(command) = self.rx.recv().await { match command { + NodeCommand::StartListner(addr) => { + self.start_connection_listner(addr).await; + } NodeCommand::ConnectToSeeds => { - self.connect_to_seeds(channel_write.clone()).await; + self.connect_to_seeds(self.tx()).await; }, NodeCommand::AddPeer { peer_id, addr, sender } => { self.add_tcp_peer(peer_id, addr, sender); @@ -231,6 +267,10 @@ impl NativeNode { NodeCommand::DebugDumpBlocks => { // self.chain.dump_blocks(&mut self.db_file); } + NodeCommand::Exit => { + log!(DEBUG, "Node Exit"); + break ; + } } } } diff --git a/src/seeds_constants.rs b/src/seeds_constants.rs index e4ce96c..6fc4487 100644 --- a/src/seeds_constants.rs +++ b/src/seeds_constants.rs @@ -1,3 +1,8 @@ -pub const SEED_NODES: [&str; 1] = [ - "127.0.0.1:8333" -]; +use once_cell::sync::Lazy; +use std::net::{SocketAddr, IpAddr, Ipv4Addr}; + +pub static SEED_NODES: Lazy<[SocketAddr; 3]> = Lazy::new(|| [ + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8333), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 3000), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 5432), +]); diff --git a/src/watcher/executor.rs b/src/watcher/executor.rs index 7cefd54..fd08d21 100644 --- a/src/watcher/executor.rs +++ b/src/watcher/executor.rs @@ -1,5 +1,6 @@ use crate::{native_node::node::NodeCommand, watcher::renderer::*}; use tokio::sync::mpsc; +use vlogger::*; pub enum ExecutorCommand { NodeResponse(String), @@ -28,6 +29,7 @@ impl Executor { } fn exit(&mut self) { + log!(DEBUG, "Executor Exit"); self.exit = true } diff --git a/src/watcher/parser.rs b/src/watcher/parser.rs index 462e36c..4955eb7 100644 --- a/src/watcher/parser.rs +++ b/src/watcher/parser.rs @@ -1,3 +1,5 @@ +use std::net::SocketAddr; + use crate::native_node::node::NodeCommand; use crate::watcher::executor::{ExecutorCommand}; use vlogger::*; @@ -50,6 +52,7 @@ impl Parser { async fn listen(&mut self) { if let Ok(Some(mes)) = timeout(Duration::from_millis(400), self.rx.recv()).await { + log!(DEBUG, "Parser: Received message from watcher"); match mes { ParserCommand::ParseCmdString(s) => { let s_split: Vec<&str> = s.split(" ").collect(); @@ -87,12 +90,13 @@ impl Parser { }, "list" => { if args.len() != 1 { - log!(ERROR, "{cmd}: Invalid arg! (blocks, peers)"); - } - match args[0] { - "blocks" => ExecutorCommand::Node(NodeCommand::DebugListBlocks), - "peers" => ExecutorCommand::Node(NodeCommand::DebugListPeers), - _ => ExecutorCommand::InvalidCommand(msg!(ERROR, "Unkown arg: {}", args[0])), + ExecutorCommand::InvalidCommand(msg!(ERROR, "node list: expects 1 argument")) + } else { + match args[0] { + "blocks" => ExecutorCommand::Node(NodeCommand::DebugListBlocks), + "peers" => ExecutorCommand::Node(NodeCommand::DebugListPeers), + _ => ExecutorCommand::InvalidCommand(msg!(ERROR, "Unkown arg: {}", args[0])), + } } }, "dump_blocks" => { @@ -100,6 +104,17 @@ impl Parser { }, "connect" => { ExecutorCommand::Node(NodeCommand::ConnectToSeeds) + }, + "listen" => { + if args.len() != 1 { + ExecutorCommand::InvalidCommand(msg!(ERROR, "node listen: expects 1 argument")) + } else { + if let Ok(addr) = args[0].parse::() { + ExecutorCommand::Node(NodeCommand::StartListner(addr)) + } else { + ExecutorCommand::InvalidCommand(msg!(ERROR, "node listen: invalid address {}", args[0])) + } + } } _ => { ExecutorCommand::InvalidCommand(msg!(ERROR, "node: unknown argument {}", args[0])) @@ -131,10 +146,11 @@ impl Parser { ExecutorCommand::InvalidCommand(msg!(ERROR, "Unknown Command {cmd}")) } }; - let _ = self.exec_tx.send(exec_cmd); + let _ = self.exec_tx.send(exec_cmd).await; } } ParserCommand::Exit => { + log!(DEBUG, "Parser Exit"); self.exit(); } } diff --git a/src/watcher/renderer.rs b/src/watcher/renderer.rs index 603d4fc..e8e6756 100644 --- a/src/watcher/renderer.rs +++ b/src/watcher/renderer.rs @@ -7,9 +7,10 @@ use ratatui::{ symbols::border, widgets::{Block, Paragraph, Widget}, DefaultTerminal, Frame, - }; +use vlogger::*; + use tokio::sync::mpsc; use tokio::time::{timeout, Duration}; use std::io; @@ -121,6 +122,7 @@ impl Renderer { } fn exit(&mut self) { + log!(DEBUG, "Renderer Exit"); self.exit = true; } diff --git a/src/watcher/watcher.rs b/src/watcher/watcher.rs index 1ef0103..5a9bc21 100644 --- a/src/watcher/watcher.rs +++ b/src/watcher/watcher.rs @@ -2,9 +2,10 @@ use crate::watcher::*; use crossterm::{event::{self, Event, KeyCode, KeyEventKind}}; use tokio::sync::mpsc; -use std::io::{self}; +use std::{io::{self}, net::SocketAddr}; use crate::{native_node::node::{NativeNode, NodeCommand}}; +use vlogger::*; pub struct Watcher { render_tx: mpsc::Sender, @@ -17,7 +18,7 @@ pub struct Watcher { #[derive(Default)] pub struct WatcherBuilder { - addr: Option, + addr: Option, seed_file: Option } @@ -26,6 +27,11 @@ impl Watcher { WatcherBuilder::new() } + pub async fn log(&self, msg: String) -> Result<(), mpsc::error::SendError> { + let rendermsg = RenderCommand::RenderStringToPane { str: msg, pane: RenderPane::CliOutput }; + self.render_tx.send(rendermsg).await + } + pub async fn exit(self) { ratatui::restore(); // for (i, handle) in self.handles.into_iter().enumerate() { @@ -41,27 +47,29 @@ impl Watcher { KeyCode::Char(c) => { self.cmd_buffer.push(c); let message = RenderCommand::RenderInput(k.code); - let _ = self.render_tx.send(message); + let _ = self.render_tx.send(message).await; } KeyCode::Backspace => { self.cmd_buffer.pop(); let message = RenderCommand::RenderInput(k.code); - let _ = self.render_tx.send(message); + let _ = self.render_tx.send(message).await; }, KeyCode::Enter => { let rd_mes = RenderCommand::RenderInput(k.code); let pr_mes = ParserCommand::ParseCmdString(self.cmd_buffer.clone()); - let _ = self.render_tx.send(rd_mes); - let _ = self.parser_tx.send(pr_mes); + let _ = self.render_tx.send(rd_mes).await; + let _ = self.parser_tx.send(pr_mes).await; self.cmd_buffer.clear(); } KeyCode::Esc => { let rd_mes = RenderCommand::Exit; let pr_mes = ParserCommand::Exit; let exec_mes = ExecutorCommand::Exit; - let _ = self.render_tx.send(rd_mes); - let _ = self.parser_tx.send(pr_mes); - let _ = self.exec_tx.send(exec_mes); + let node_mes = NodeCommand::Exit; + let _ = self.render_tx.send(rd_mes).await; + let _ = self.parser_tx.send(pr_mes).await; + let _ = self.exec_tx.send(exec_mes).await; + let _ = self.node_tx.send(node_mes).await; return Ok(false); } _ => {} @@ -78,7 +86,7 @@ impl WatcherBuilder { Self::default() } - pub fn addr(mut self, addr: Option) -> Self { + pub fn addr(mut self, addr: Option) -> Self { self.addr = addr; self } @@ -88,11 +96,16 @@ impl WatcherBuilder { self } - pub fn start(self) -> Watcher { + pub async fn log(render_tx: &mpsc::Sender, msg: String) -> Result<(), mpsc::error::SendError> { + let rendermsg = RenderCommand::RenderStringToPane { str: msg, pane: RenderPane::CliOutput }; + render_tx.send(rendermsg).await + } + + + pub async fn start(self) -> Watcher { let (render_tx, render_rx) = mpsc::channel::(100); let (parser_tx, parser_rx) = mpsc::channel::(100); let (exec_tx, exec_rx) = mpsc::channel::(100); - let (node_tx, node_rx) = mpsc::channel::(100); let mut terminal = ratatui::init(); let render_handle = tokio::spawn({ @@ -100,6 +113,14 @@ impl WatcherBuilder { let _ = Renderer::new(render_rx, RenderLayoutKind::Cli).run(&mut terminal).await; } }); + let _ = Self::log(&render_tx, msg!(INFO, "Started Renderer")).await; + + let blocks = self.seed_file + .as_ref() + .and_then(|path| std::fs::read_to_string(path).ok()) + .and_then(|content| serde_json::from_str(&content).ok()); + let mut node = NativeNode::new(self.addr.clone(), blocks, exec_tx.clone()); + let _ = Self::log(&render_tx, msg!(INFO, "Build Node")); let parser_handle = tokio::spawn({ let exec_tx = exec_tx.clone(); @@ -108,27 +129,28 @@ impl WatcherBuilder { } }); + let _ = Self::log(&render_tx, msg!(INFO, "Started Parser")).await; + let executor_handle = tokio::spawn({ let rend_tx = render_tx.clone(); - let node_tx = node_tx.clone(); + let node_tx = node.tx(); async move { let _ = Executor::new(rend_tx, node_tx, exec_rx).run().await; } }); - let blocks = self.seed_file - .as_ref() - .and_then(|path| std::fs::read_to_string(path).ok()) - .and_then(|content| serde_json::from_str(&content).ok()); + let _ = Self::log(&render_tx, msg!(INFO, "Started Executor")).await; + let node_tx = node.tx(); let node_handle = tokio::spawn({ - let exec_tx = exec_tx.clone(); async move { - let _ = NativeNode::new(self.addr.clone(), blocks, exec_tx, node_rx); + node.run_native().await; } }); + let _ = Self::log(&render_tx, msg!(INFO, "Started Node")).await; + Watcher { render_tx, node_tx,