From df67c432f3df343ca6331481890f62db25c946d8 Mon Sep 17 00:00:00 2001 From: victor Date: Sun, 31 Aug 2025 20:58:20 +0200 Subject: [PATCH] need to fix pop selection in renderer --- node/src/args.rs | 223 ++++----- node/src/bus/executor.rs | 8 +- node/src/bus/network.rs | 16 +- node/src/bus/render.rs | 17 - node/src/bus/system.rs | 12 +- node/src/bus/watcher.rs | 15 + node/src/cli.rs | 98 ++-- node/src/core/block.rs | 72 +-- node/src/core/blockchain.rs | 41 +- node/src/core/data.rs | 2 +- node/src/core/hasher.rs | 4 +- node/src/core/tx.rs | 80 ++-- node/src/db/database.rs | 79 +++- node/src/db/error.rs | 2 +- node/src/executor/command.rs | 15 + node/src/executor/executor.rs | 92 ++++ node/src/lib.rs | 96 ++-- node/src/main.rs | 41 +- node/src/node/error.rs | 2 +- node/src/node/node.rs | 797 ++++++++++++++++---------------- node/src/protocol/connection.rs | 132 +++--- node/src/protocol/connector.rs | 506 ++++++++++---------- node/src/protocol/message.rs | 162 +++---- node/src/renderer/layout.rs | 93 ++++ node/src/renderer/pane.rs | 172 +++++++ node/src/renderer/renderer.rs | 307 ++++++++++++ node/src/seeds_constants.rs | 10 +- node/src/watcher/builder.rs | 150 ++++++ node/src/watcher/command.rs | 20 + node/src/watcher/executor.rs | 101 ---- node/src/watcher/parser.rs | 63 --- node/src/watcher/renderer.rs | 405 ---------------- node/src/watcher/watcher.rs | 570 +++++++++++------------ 33 files changed, 2344 insertions(+), 2059 deletions(-) delete mode 100644 node/src/bus/render.rs create mode 100644 node/src/bus/watcher.rs create mode 100644 node/src/executor/command.rs create mode 100644 node/src/executor/executor.rs create mode 100644 node/src/renderer/layout.rs create mode 100644 node/src/renderer/pane.rs create mode 100644 node/src/renderer/renderer.rs create mode 100644 node/src/watcher/builder.rs create mode 100644 node/src/watcher/command.rs delete mode 100644 node/src/watcher/executor.rs delete mode 100644 node/src/watcher/parser.rs delete mode 100644 node/src/watcher/renderer.rs diff --git a/node/src/args.rs b/node/src/args.rs index 74d7620..f8e5a37 100644 --- a/node/src/args.rs +++ b/node/src/args.rs @@ -1,134 +1,135 @@ use std::net::SocketAddr; use crate::core; -use crate::watcher::{RenderLayoutKind, RenderPane}; +use crate::renderer::RenderLayoutKind; use clap::{Parser, Subcommand}; use clap::*; #[derive(Parser)] pub struct Cli { - #[command(subcommand)] - pub command: CliCommand, + #[command(subcommand)] + pub command: CliCommand, } #[derive(Subcommand)] pub enum CliCommand { - #[command(name = "ping")] - Ping { - #[command(subcommand)] - ping_cmd: CliPingCommand, - }, + #[command(name = "ping")] + Ping { + #[command(subcommand)] + ping_cmd: CliPingCommand, + }, - /// Peer related Cmd - #[command(name = "peer")] - Peer { - #[command(subcommand)] - peer_cmd: CliPeerCommand, - }, + /// Peer related Cmd + #[command(name = "peer")] + Peer { + #[command(subcommand)] + peer_cmd: CliPeerCommand, + }, - /// Block related Cmd - #[command(name = "block")] - Block { - #[command(subcommand)] - block_cmd: CliBlockCommand, - }, + /// Block related Cmd + #[command(name = "block")] + Block { + #[command(subcommand)] + block_cmd: CliBlockCommand, + }, - /// Make a Transaction - #[command(name = "tx")] - Transaction(core::Tx), + /// Make a Transaction + #[command(name = "tx")] + Transaction(core::Tx), - /// Start new TcpListner on Addr - #[command(name = "listen")] - StartListner { addr: String }, + /// Start new TcpListner on Addr + #[command(name = "listen")] + StartListner { addr: String }, - /// Display Node id - #[command(name = "id")] - DebugShowId, + /// Display Node id + #[command(name = "id")] + DebugShowId, - /// Connect to Seed Nodes - #[command(name = "seed")] - Seeds { - #[command(subcommand)] - seed_cmd: CliSeedCommand, - }, + /// Connect to Seed Nodes + #[command(name = "seed")] + Seeds { + #[command(subcommand)] + seed_cmd: CliSeedCommand, + }, - /// Clear Pane - #[command(name = "clear", aliases = ["c"])] - Clear { pane: RenderPane }, + /// Clear Pane + #[command(name = "clear", aliases = ["c"])] + Clear, - #[command(name = "layout", aliases = ["lay"])] - Layout { mode: RenderLayoutKind }, + /// Set TUI layout + #[command(name = "layout", aliases = ["lay"])] + Layout { mode: RenderLayoutKind }, } #[derive(Subcommand)] pub enum CliPeerCommand { - /// Connect To Peer With IpAddr - #[command(name = "connect", aliases = ["c", "con"])] - Connect { addr: String }, + /// Connect To Peer With IpAddr + #[command(name = "connect", aliases = ["c", "con"])] + Connect { addr: String }, - /// Remove Peer Connection - #[command(name = "remove", aliases = ["rm"])] - Remove { id: String }, + /// Remove Peer Connection + #[command(name = "remove", aliases = ["rm"])] + Remove { id: String }, - /// List Connected Peers - #[command(name = "list", aliases = ["ls", "l"])] - List, + /// List Connected Peers + #[command(name = "list", aliases = ["ls", "l"])] + List, } #[derive(Subcommand)] pub enum CliSeedCommand { - /// Connect to Seed nodes - #[command(name = "connect", aliases = ["c", "con"])] - Connect, + /// Connect to Seed nodes + #[command(name = "connect", aliases = ["c", "con"])] + Connect, } #[derive(Subcommand)] pub enum CliBlockCommand { - /// List Blocks in Chain - #[command(name = "list", aliases = ["ls", "l"])] - List, + /// List Blocks in Chain + #[command(name = "list", aliases = ["ls", "l"])] + List, - /// Create and Broadcast new Block - #[command(name = "create", aliases = ["c", "new"])] - Create, + /// Create and Broadcast new Block + #[command(name = "create", aliases = ["c", "new"])] + Create, - /// Export Blocks to file - #[command(name = "dump", aliases = ["export"])] - Dump { - /// Output file - #[arg(short, long)] - output: String, - }, + /// Export Blocks to file + #[command(name = "dump", aliases = ["export"])] + Dump { + /// Output file + #[arg(short, long)] + output: String, + }, - /// Display Block by Hash - #[command(name = "display", aliases = ["d"])] - #[group(required = true, multiple = false)] - Display{ - /// Block Hash - #[arg(long)] - key: Option, - /// Block Height - #[arg(long)] - height: Option - } + /// Display Block by Hash + #[command(name = "display", aliases = ["d"])] + #[group(multiple = false)] + Display { + /// Block Hash + #[arg(long)] + key: Option, + /// Block Height + #[arg(long)] + height: Option, + }, } #[derive(Subcommand)] pub enum CliPingCommand { - /// Ping Peer by Id - #[command(name = "id", aliases = ["i"])] - Id { - #[arg(short, long)] - id: String, - }, + /// Ping Peer by Id + #[command(name = "id", aliases = ["i"])] + Id { + #[arg(short, long)] + id: String, + }, - /// Ping Peer by Address - #[command(name = "addr", aliases = ["a", "ad"])] - Addr { - #[arg(short, long)] - addr: String, - }, + /// Ping Peer by Address + #[command(name = "addr", aliases = ["a", "ad"])] + Addr { + #[arg(short, long)] + addr: String, + }, } #[derive(Subcommand)] @@ -136,36 +137,36 @@ pub enum CliPingCommand { #[command(about = "A blockchain node CLI tool")] #[command(version = "1.0")] #[command( - long_about = "A comprehensive CLI tool for managing blockchain nodes, peers, and transactions" + long_about = "A comprehensive CLI tool for managing blockchain nodes, peers, and transactions" )] pub enum CliNodeCommand {} #[derive(Parser, Debug)] #[command(version, about, long_about = None)] pub struct CliArgs { - /// Provide address on which node will listen - #[arg(short = 'a', long)] - pub addr: Option, + /// Provide address on which node will listen + #[arg(short = 'a', long)] + pub addr: Option, - /// Provide File with current chain - #[arg(short = 'd', long)] - pub database: Option, + /// Provide File with current chain + #[arg(short = 'd', long)] + pub database: Option, - /// Enable bootstrap mode (alternative syntax) - #[arg(short = 'b', long = "bootstrap", action = clap::ArgAction::SetTrue)] - pub bootstrap: bool, + /// Enable bootstrap mode (alternative syntax) + #[arg(short = 'b', long = "bootstrap", action = clap::ArgAction::SetTrue)] + pub bootstrap: bool, - /// Enable debug mode (alternative syntax) - #[arg(long = "debug", action = clap::ArgAction::SetTrue)] - pub debug: bool, + /// Enable debug mode (alternative syntax) + #[arg(long = "debug", action = clap::ArgAction::SetTrue)] + pub debug: bool, - /// Enable rendering (alternative syntax) - #[arg(short = 'r', long = "render", action = clap::ArgAction::Set, default_value = "true")] - pub render: bool, + /// Enable rendering (alternative syntax) + #[arg(short = 'r', long = "render", action = clap::ArgAction::Set, default_value = "true")] + pub render: bool, - /// Enable debug mode (alternative syntax) - #[arg(short = 's', long = "seed", action = clap::ArgAction::SetTrue)] - pub seed: bool, + /// Enable debug mode (alternative syntax) + #[arg(short = 's', long = "seed", action = clap::ArgAction::SetTrue)] + pub seed: bool, } #[derive(Subcommand, Debug)] @@ -173,11 +174,11 @@ pub enum Commands {} #[derive(Subcommand, Debug)] pub enum TxCmd { - /// Add a new transaction to the DB - #[command(short_flag = 'a')] - Add(core::Tx), + /// Add a new transaction to the DB + #[command(short_flag = 'a')] + Add(core::Tx), } pub fn get_args() -> CliArgs { - CliArgs::parse() + CliArgs::parse() } diff --git a/node/src/bus/executor.rs b/node/src/bus/executor.rs index f68b8a1..6fb1af2 100644 --- a/node/src/bus/executor.rs +++ b/node/src/bus/executor.rs @@ -3,15 +3,15 @@ use std::sync::Arc; use tokio::sync::broadcast; use super::event_bus::EventBus; -use crate::watcher::ExecutorCommand; +use crate::executor::ExecutorCommand; static EXECUTOR_EVENT_BUS: Lazy>> = - Lazy::new(|| Arc::new(EventBus::new())); + Lazy::new(|| Arc::new(EventBus::new())); pub fn publish_executor_event(event: ExecutorCommand) { - EXECUTOR_EVENT_BUS.publish(event); + EXECUTOR_EVENT_BUS.publish(event); } pub fn subscribe_executor_event() -> broadcast::Receiver { - EXECUTOR_EVENT_BUS.subscribe() + EXECUTOR_EVENT_BUS.subscribe() } diff --git a/node/src/bus/network.rs b/node/src/bus/network.rs index 2f9fb2d..eac67a5 100644 --- a/node/src/bus/network.rs +++ b/node/src/bus/network.rs @@ -6,20 +6,20 @@ use super::event_bus::EventBus; #[derive(Clone, Debug)] pub enum NetworkEvent { - SeedConnected(String), - SeedDisconnected(String), - AllSeedsConnected, - BootstrapCompleted, - NodeReady, + SeedConnected(String), + SeedDisconnected(String), + AllSeedsConnected, + BootstrapCompleted, + NodeReady, } static NETWORK_EVENT_BUS: Lazy>> = - Lazy::new(|| Arc::new(EventBus::new())); + Lazy::new(|| Arc::new(EventBus::new())); pub fn publish_network_event(event: NetworkEvent) { - NETWORK_EVENT_BUS.publish(event); + NETWORK_EVENT_BUS.publish(event); } pub fn subscribe_network_event() -> broadcast::Receiver { - NETWORK_EVENT_BUS.subscribe() + NETWORK_EVENT_BUS.subscribe() } diff --git a/node/src/bus/render.rs b/node/src/bus/render.rs deleted file mode 100644 index 8a5af82..0000000 --- a/node/src/bus/render.rs +++ /dev/null @@ -1,17 +0,0 @@ -use once_cell::sync::Lazy; -use std::sync::Arc; -use tokio::sync::broadcast; - -use super::event_bus::EventBus; -use crate::watcher::renderer::RenderCommand; - -static RENDER_CHANNEL: Lazy>> = Lazy::new(|| Arc::new(EventBus::new())); -pub fn publish_render_event(event: RenderCommand) { - RENDER_CHANNEL.publish(event); -} - -pub fn subscribe_render_event() -> broadcast::Receiver { - RENDER_CHANNEL.subscribe() - - -} diff --git a/node/src/bus/system.rs b/node/src/bus/system.rs index c36fb56..1056230 100644 --- a/node/src/bus/system.rs +++ b/node/src/bus/system.rs @@ -6,18 +6,18 @@ use super::event_bus::EventBus; #[derive(Clone, Debug)] pub enum SystemEvent { - ExecutorStarted, - RendererStarted, - NodeStarted, - Exit, + ExecutorStarted, + RendererStarted, + NodeStarted, + Exit, } static SYSTEM_EVENT_BUS: Lazy>> = Lazy::new(|| Arc::new(EventBus::new())); pub fn publish_system_event(event: SystemEvent) { - SYSTEM_EVENT_BUS.publish(event); + SYSTEM_EVENT_BUS.publish(event); } pub fn subscribe_system_event() -> broadcast::Receiver { - SYSTEM_EVENT_BUS.subscribe() + SYSTEM_EVENT_BUS.subscribe() } diff --git a/node/src/bus/watcher.rs b/node/src/bus/watcher.rs new file mode 100644 index 0000000..60c64dc --- /dev/null +++ b/node/src/bus/watcher.rs @@ -0,0 +1,15 @@ +use once_cell::sync::Lazy; +use std::sync::Arc; +use tokio::sync::broadcast; + +use super::event_bus::EventBus; +use crate::watcher::WatcherCommand; + +static WATCHER_CHANNEL: Lazy>> = Lazy::new(|| Arc::new(EventBus::new())); +pub fn publish_watcher_event(event: WatcherCommand) { + WATCHER_CHANNEL.publish(event); +} + +pub fn subscribe_watcher_event() -> broadcast::Receiver { + WATCHER_CHANNEL.subscribe() +} diff --git a/node/src/cli.rs b/node/src/cli.rs index 100fa39..17be654 100644 --- a/node/src/cli.rs +++ b/node/src/cli.rs @@ -1,68 +1,66 @@ use crate::args::*; use crate::core::ChainData; +use crate::executor::ExecutorCommand; use crate::node::*; -use crate::watcher::{RenderCommand, ExecutorCommand}; +use crate::renderer::RenderCommand; use clap::Parser; pub fn handle_peer_command(cmd: CliPeerCommand) -> NodeCommand { - match cmd { - CliPeerCommand::List => NodeCommand::ListPeers, - CliPeerCommand::Remove { id } => NodeCommand::RemovePeer { - peer_id: id.parse::().unwrap(), - }, - CliPeerCommand::Connect { addr } => NodeCommand::ConnectTcpPeer(addr), - } + match cmd { + CliPeerCommand::List => NodeCommand::ListPeers, + CliPeerCommand::Remove { id } => NodeCommand::RemovePeer { + peer_id: id.parse::().unwrap(), + }, + CliPeerCommand::Connect { addr } => NodeCommand::ConnectTcpPeer(addr), + } } pub fn handle_block_command(cmd: CliBlockCommand) -> NodeCommand { - match cmd { - CliBlockCommand::List => NodeCommand::ListBlocks, - CliBlockCommand::Dump { output } => NodeCommand::DumpBlocks(output), - CliBlockCommand::Create => NodeCommand::CreateBlock, - CliBlockCommand::Display{key, height} => { - match (key, height) { - (Some(k), _) => return NodeCommand::DisplayBlockByKey(k), - (_, Some(h)) => return NodeCommand::DisplayBlockByHeight(h), - (None, None) => panic!() - } - }, - } + match cmd { + CliBlockCommand::List => NodeCommand::ListBlocks, + CliBlockCommand::Dump { output } => NodeCommand::DumpBlocks(output), + CliBlockCommand::Create => NodeCommand::CreateBlock, + CliBlockCommand::Display { key, height } => match (key, height) { + (Some(k), _) => return NodeCommand::DisplayBlockByKey(k), + (_, Some(h)) => return NodeCommand::DisplayBlockByHeight(h), + (None, None) => return NodeCommand::DisplayBlockInteractive, + }, + } } fn handle_seed_command(cmd: CliSeedCommand) -> NodeCommand { - match cmd { - CliSeedCommand::Connect => NodeCommand::ConnectToSeeds, - } + match cmd { + CliSeedCommand::Connect => NodeCommand::ConnectToSeeds, + } } fn handle_ping(cmd: CliPingCommand) -> NodeCommand { - match cmd { - CliPingCommand::Id { id } => NodeCommand::PingId(id), - CliPingCommand::Addr { addr } => NodeCommand::PingAddr(addr), - } + match cmd { + CliPingCommand::Id { id } => NodeCommand::PingId(id), + CliPingCommand::Addr { addr } => NodeCommand::PingAddr(addr), + } } -pub fn cli(input: &[&str]) -> ExecutorCommand { - match Cli::try_parse_from(input) { - Ok(cmd) => match cmd.command { - CliCommand::Layout { mode } => { - ExecutorCommand::Render(RenderCommand::ChangeLayout(mode)) - } - CliCommand::Clear { pane } => ExecutorCommand::Render(RenderCommand::ClearPane(pane)), - CliCommand::Peer { peer_cmd } => ExecutorCommand::Node(handle_peer_command(peer_cmd)), - CliCommand::Block { block_cmd } => { - ExecutorCommand::Node(handle_block_command(block_cmd)) - } - CliCommand::Transaction(tx) => ExecutorCommand::Node(NodeCommand::ProcessChainData( - ChainData::Transaction(tx), - )), - CliCommand::DebugShowId => ExecutorCommand::Node(NodeCommand::ShowId), - CliCommand::StartListner { addr } => { - ExecutorCommand::Node(NodeCommand::StartListner(addr.parse().unwrap())) - } - CliCommand::Seeds { seed_cmd } => ExecutorCommand::Node(handle_seed_command(seed_cmd)), - CliCommand::Ping { ping_cmd } => ExecutorCommand::Node(handle_ping(ping_cmd)), - }, - Err(e) => ExecutorCommand::InvalidCommand(format!("{e}")), - } +pub fn cli(input: &str) -> ExecutorCommand { + let argv: Vec<&str> = std::iter::once(" ") + .chain(input.split_whitespace()) + .collect(); + match Cli::try_parse_from(argv) { + Ok(cmd) => match cmd.command { + CliCommand::Layout { mode } => ExecutorCommand::Render(RenderCommand::ChangeLayout(mode)), + CliCommand::Clear => ExecutorCommand::Render(RenderCommand::ClearPane), + CliCommand::Peer { peer_cmd } => ExecutorCommand::Node(handle_peer_command(peer_cmd)), + CliCommand::Block { block_cmd } => ExecutorCommand::Node(handle_block_command(block_cmd)), + CliCommand::Transaction(tx) => { + ExecutorCommand::Node(NodeCommand::ProcessChainData(ChainData::Transaction(tx))) + } + CliCommand::DebugShowId => ExecutorCommand::Node(NodeCommand::ShowId), + CliCommand::StartListner { addr } => { + ExecutorCommand::Node(NodeCommand::StartListner(addr.parse().unwrap())) + } + CliCommand::Seeds { seed_cmd } => ExecutorCommand::Node(handle_seed_command(seed_cmd)), + CliCommand::Ping { ping_cmd } => ExecutorCommand::Node(handle_ping(ping_cmd)), + }, + Err(e) => ExecutorCommand::InvalidCommand(format!("{e}")), + } } diff --git a/node/src/core/block.rs b/node/src/core/block.rs index f6a31d4..4ef9d99 100644 --- a/node/src/core/block.rs +++ b/node/src/core/block.rs @@ -1,45 +1,49 @@ -#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, Default, bincode::Decode, bincode::Encode)] +#[derive( + Clone, Debug, serde::Deserialize, serde::Serialize, Default, bincode::Decode, bincode::Encode, +)] pub struct BlockHeader { - pub previous_hash: String, - pub timestamp: u64, - pub merkle_root: String, - pub block_hash: String, - pub nonce: u32, - pub height: u64, + pub previous_hash: String, + pub timestamp: u64, + pub merkle_root: String, + pub block_hash: String, + pub nonce: u32, + pub height: u64, } -#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, Default, bincode::Decode, bincode::Encode)] +#[derive( + Clone, Debug, serde::Deserialize, serde::Serialize, Default, bincode::Decode, bincode::Encode, +)] pub struct Block { - pub head: BlockHeader, - pub data: Vec, + pub head: BlockHeader, + pub data: Vec, } impl BlockHeader { - pub fn previous_hash(&self) -> &str { - &self.previous_hash - } - pub fn timestamp(&self) -> u64 { - self.timestamp - } - pub fn nonce(&self) -> u32 { - self.nonce - } - pub fn merkle_root(&self) -> &str { - &self.merkle_root - } - pub fn block_hash(&self) -> &str { - &self.block_hash - } + pub fn previous_hash(&self) -> &str { + &self.previous_hash + } + pub fn timestamp(&self) -> u64 { + self.timestamp + } + pub fn nonce(&self) -> u32 { + self.nonce + } + pub fn merkle_root(&self) -> &str { + &self.merkle_root + } + pub fn block_hash(&self) -> &str { + &self.block_hash + } } impl Block { - pub fn new(head: BlockHeader, data: Vec) -> Self { - Self { head, data } - } - pub fn head(&self) -> &BlockHeader { - &self.head - } - pub fn data(&self) -> &[String] { - &self.data - } + pub fn new(head: BlockHeader, data: Vec) -> Self { + Self { head, data } + } + pub fn head(&self) -> &BlockHeader { + &self.head + } + pub fn data(&self) -> &[String] { + &self.data + } } diff --git a/node/src/core/blockchain.rs b/node/src/core/blockchain.rs index 5f1a36e..25681be 100644 --- a/node/src/core/blockchain.rs +++ b/node/src/core/blockchain.rs @@ -1,18 +1,18 @@ use std::sync::Arc; -use crate::core::ChainData; -use crate::db::database; -use crate::db; use crate::core; +use crate::core::ChainData; +use crate::db; use crate::db::DatabaseError; +use crate::db::database; use crate::error::TxError; use crate::log; use super::hasher::Hasher; -use vlogger::*; use std::collections::HashMap; use std::time::UNIX_EPOCH; +use vlogger::*; use thiserror::*; #[allow(dead_code)] @@ -31,8 +31,7 @@ pub enum BlockchainError { Validation(#[from] ValidationError), #[error("Block Creation Error")] - BlockCreation - + BlockCreation, } const BLOCKCHAIN_ID: &str = "watch-chain"; @@ -66,7 +65,6 @@ impl Blockchain { Ok(()) } - fn acc_exists(&self, acc: &Account) -> bool { self.balances.iter().find(|(k, _)| *k == acc).is_some() } @@ -76,10 +74,11 @@ impl Blockchain { } fn hash_transaction_pool(&self) -> Vec { - self.mempool - .iter() - .map(|tx| Hasher::hash_chain_data(tx)) - .collect() + self + .mempool + .iter() + .map(|tx| Hasher::hash_chain_data(tx)) + .collect() } pub fn create_block(&mut self) -> Result, BlockchainError> { @@ -120,7 +119,7 @@ impl Blockchain { self.add_block(new_block.clone())?; Ok(new_block) } - Err(_) => Err(BlockchainError::BlockCreation) + Err(_) => Err(BlockchainError::BlockCreation), } } @@ -163,11 +162,11 @@ impl Blockchain { } impl Blockchain { - pub fn list_blocks(&self) -> Result { - let mut ret = String::from("Blocks List\n-------------------\n"); + pub fn list_blocks(&self) -> Result, BlockchainError> { + let mut ret = Vec::new(); let blocks = self.blocks()?; - for (i, b) in blocks.iter().enumerate() { - ret.push_str(format!("Block Hash #{i}: {}\n", b.head.block_hash()).as_str()) + for b in blocks.iter() { + ret.push(b.head.block_hash().to_string()) } Ok(ret) } @@ -185,7 +184,7 @@ impl Blockchain { Ok(()) } - pub fn add_block(&mut self, block: Arc) -> Result<(), BlockchainError>{ + pub fn add_block(&mut self, block: Arc) -> Result<(), BlockchainError> { match self.validate_block(&block) { Ok(()) => Ok(self.insert_block(&block)?), Err(e) => Err(BlockchainError::Validation(e)), @@ -223,7 +222,7 @@ impl Blockchain { fn validate_chain(&self) -> Result<(), ValidationError> { if let Ok(blocks) = self.blocks() { if let Some(mut prev_block) = blocks.first() { - for (i, block) in blocks.iter().enumerate() { + for (i, block) in blocks.iter().skip(1).enumerate() { let head = block.head(); let hash = Hasher::calculate_block_hash(block.head()); @@ -246,9 +245,11 @@ impl Blockchain { balances: HashMap::new(), mempool: vec![], id: BLOCKCHAIN_ID.to_string(), - db + db, }; - chain.validate_chain().or_else(|e| return Err(BlockchainError::Validation(e)))?; + chain + .validate_chain() + .or_else(|e| return Err(BlockchainError::Validation(e)))?; Ok(chain) } } diff --git a/node/src/core/data.rs b/node/src/core/data.rs index 5240c33..b7dfa5d 100644 --- a/node/src/core/data.rs +++ b/node/src/core/data.rs @@ -4,5 +4,5 @@ use super::Tx; #[derive(serde::Deserialize, serde::Serialize, Encode, Decode, Debug, Clone)] pub enum ChainData { - Transaction(Tx), + Transaction(Tx), } diff --git a/node/src/core/hasher.rs b/node/src/core/hasher.rs index 884f279..639fc5f 100644 --- a/node/src/core/hasher.rs +++ b/node/src/core/hasher.rs @@ -1,7 +1,7 @@ -use sha2::Sha256; use sha2::Digest; +use sha2::Sha256; -use super::{ChainData, BlockHeader}; +use super::{BlockHeader, ChainData}; pub struct Hasher {} diff --git a/node/src/core/tx.rs b/node/src/core/tx.rs index 7a46ee9..9be6420 100644 --- a/node/src/core/tx.rs +++ b/node/src/core/tx.rs @@ -1,50 +1,52 @@ use crate::core::Account; use crate::error::TxError; -#[derive(serde::Deserialize, serde::Serialize, Debug, clap::Args, Clone, bincode::Encode, bincode::Decode)] +#[derive( + serde::Deserialize, serde::Serialize, Debug, clap::Args, Clone, bincode::Encode, bincode::Decode, +)] pub struct Tx { - from: Account, - to: Account, - value: u32, - data: String, + from: Account, + to: Account, + value: u32, + data: String, } impl Tx { - pub fn new(from: Account, to: Account, value: u32, data: String) -> Self { - Self { - from, - to, - value, - data, - } + pub fn new(from: Account, to: Account, value: u32, data: String) -> Self { + Self { + from, + to, + value, + data, } + } - pub fn validate(&self) -> Result<(), TxError> { - if self.from.is_empty() { - return Err(TxError::FromEmpty); - } else if self.to.is_empty() { - return Err(TxError::ToEmpty); - } else if self.value == 0 { - return Err(TxError::ValueEmpty); - } - Ok(()) - } - pub fn is_new_account(&self) -> bool { - return self.data == "new_account"; - } - pub fn is_reward(&self) -> bool { - return self.data == "reward"; - } - pub fn from(&self) -> &Account { - &self.from - } - pub fn to(&self) -> &Account { - &self.to - } - pub fn value(&self) -> u32 { - self.value - } - pub fn data(&self) -> &str { - &self.data + pub fn validate(&self) -> Result<(), TxError> { + if self.from.is_empty() { + return Err(TxError::FromEmpty); + } else if self.to.is_empty() { + return Err(TxError::ToEmpty); + } else if self.value == 0 { + return Err(TxError::ValueEmpty); } + Ok(()) + } + pub fn is_new_account(&self) -> bool { + return self.data == "new_account"; + } + pub fn is_reward(&self) -> bool { + return self.data == "reward"; + } + pub fn from(&self) -> &Account { + &self.from + } + pub fn to(&self) -> &Account { + &self.to + } + pub fn value(&self) -> u32 { + self.value + } + pub fn data(&self) -> &str { + &self.data + } } diff --git a/node/src/db/database.rs b/node/src/db/database.rs index 74b9872..999168b 100644 --- a/node/src/db/database.rs +++ b/node/src/db/database.rs @@ -1,8 +1,13 @@ +use crate::{ + core::{self, Block, ChainData, Hasher}, + db::error::DatabaseError, + error::print_error_chain, + log, +}; use bincode::{self, config::Configuration}; use sled::{self, Batch}; -use crate::{core::{self, Block, ChainData, Hasher}, db::error::DatabaseError, error::print_error_chain, log}; -use vlogger::*; use std::sync::Arc; +use vlogger::*; static BINCODE_CONFIG: Configuration = bincode::config::standard(); @@ -10,55 +15,66 @@ const DB_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/database"); const DB_TREE: &str = "blocks"; -const BLOCK_INDEX: &str = "blocks:"; -const CHAIN_DATA_INDEX: &str = "chain_data:"; -const DATA_TO_BLOCK_INDEX: &str = "data_to_block:"; -const METADATA_INDEX: &str= "metadata:"; +const BLOCK_INDEX: &str = "blocks"; +const CHAIN_DATA_INDEX: &str = "chain_data"; +const DATA_TO_BLOCK_INDEX: &str = "data_to_block"; +const METADATA_INDEX: &str = "metadata"; const TIP_KEY: &str = "chain_tip"; const HEIGHT_KEY: &str = "chain_height"; +const HEIGHT_TO_HASH_INDEX: &str = "height_to_hash"; + #[derive(Debug)] pub struct ChainDb { db: sled::Tree, } fn data_index(key: &str) -> String { - format!("{}{}", CHAIN_DATA_INDEX, key) + format!("{}:{}", CHAIN_DATA_INDEX, key) } fn data_to_block_index(key: &str) -> String { - format!("{}{}", DATA_TO_BLOCK_INDEX, key) + format!("{}:{}", DATA_TO_BLOCK_INDEX, key) } fn block_index(key: &str) -> String { - format!("{}{}", BLOCK_INDEX, key) + format!("{}:{}", BLOCK_INDEX, key) } fn metadata_index(key: &str) -> String { - format!("{}{}", METADATA_INDEX, key) + format!("{}:{}", METADATA_INDEX, key) } +fn height_to_hash_index(height: u64) -> String { + format!("{}:{:020}", HEIGHT_TO_HASH_INDEX, height) +} impl ChainDb { - pub fn new(path: Option) ->Result { + pub fn new(path: Option) -> Result { let path = if path.is_some() { &path.unwrap() } else { DB_PATH }; - match sled::open(&path) { + let config = sled::Config::new() + .cache_capacity(512 * 1024) + .segment_size(1024 * 1024) + .path(&path); + + match config.open() { Ok(db) => { if db.was_recovered() { - log(msg!(INFO, "Loaded Database from Previous state at: {}", path)); + log(msg!( + INFO, + "Loaded Database from Previous state at: {}", + path + )); } else { log(msg!(INFO, "Created Database at {}", path)); } - let db = db - .open_tree(DB_TREE)?; - Ok(ChainDb { - db - }) + let db = db.open_tree(DB_TREE)?; + Ok(ChainDb { db }) } Err(err) => { print_error_chain(&err.clone().into()); @@ -70,15 +86,18 @@ impl ChainDb { pub fn get_block_by_key(&self, block_hash: &str) -> Result, DatabaseError> { let block_hash = block_index(block_hash); if let Some(bin_block) = self.db.get(block_hash)? { - let (block, _size) = bincode::decode_from_slice::(&bin_block, BINCODE_CONFIG) - .map_err(|e| DatabaseError::Decode(e))?; + let (block, _size) = bincode::decode_from_slice::(&bin_block, BINCODE_CONFIG) + .map_err(|e| DatabaseError::Decode(e))?; Ok(Some(block)) } else { Ok(None) } } - pub fn get_block_by_height(&self, height: u64) -> Result>, DatabaseError> { + pub fn get_block_by_height( + &self, + height: u64, + ) -> Result>, DatabaseError> { for result in self.db.scan_prefix(BLOCK_INDEX) { let (_key, value) = result?; let (block, _size) = bincode::decode_from_slice::(&value, BINCODE_CONFIG)?; @@ -107,7 +126,9 @@ impl ChainDb { } pub fn get_all_blocks(&self) -> Result>, DatabaseError> { - self.db.scan_prefix(BLOCK_INDEX) + self + .db + .scan_prefix(BLOCK_INDEX) .map(|res| -> Result, DatabaseError> { let (_key, value) = res?; let (block, _size) = bincode::decode_from_slice::(&value, BINCODE_CONFIG) @@ -128,11 +149,21 @@ impl ChainDb { let mut db_batch = Batch::default(); let bin_block = bincode::encode_to_vec(block, BINCODE_CONFIG)?; db_batch.insert(block_index(block.head().block_hash()).as_str(), bin_block); + db_batch.insert( + height_to_hash_index(block.head().height).as_str(), + block.head().block_hash(), + ); for data in block.data() { - db_batch.insert(data_to_block_index(data.as_str()).as_str(), block.head().block_hash()); + db_batch.insert( + data_to_block_index(data.as_str()).as_str(), + block.head().block_hash(), + ); } db_batch.insert(metadata_index(TIP_KEY).as_str(), block.head().block_hash()); - db_batch.insert(metadata_index(HEIGHT_KEY).as_str(), &block.head().height.to_be_bytes()); + db_batch.insert( + metadata_index(HEIGHT_KEY).as_str(), + &block.head().height.to_be_bytes(), + ); self.db.apply_batch(db_batch)?; Ok(()) } diff --git a/node/src/db/error.rs b/node/src/db/error.rs index 97370c4..b07be5b 100644 --- a/node/src/db/error.rs +++ b/node/src/db/error.rs @@ -18,5 +18,5 @@ pub enum DatabaseError { Decode(#[from] bincode::error::DecodeError), #[error("Missing chain data for hash: {0}")] - MissingData(String) + MissingData(String), } diff --git a/node/src/executor/command.rs b/node/src/executor/command.rs new file mode 100644 index 0000000..81663db --- /dev/null +++ b/node/src/executor/command.rs @@ -0,0 +1,15 @@ +use crate::node::NodeCommand; +use crate::renderer::RenderCommand; +use crate::watcher::WatcherCommand; + +#[derive(Clone, Debug)] +pub enum ExecutorCommand { + NodeResponse(String), + Echo(Vec), + Print(String), + InvalidCommand(String), + Node(NodeCommand), + Render(RenderCommand), + Watcher(WatcherCommand), + Exit, +} diff --git a/node/src/executor/executor.rs b/node/src/executor/executor.rs new file mode 100644 index 0000000..2a7979a --- /dev/null +++ b/node/src/executor/executor.rs @@ -0,0 +1,92 @@ +use crate::{ + bus::{SystemEvent, publish_watcher_event, publish_system_event}, + log, + node::NodeCommand, + renderer::RenderTarget, + watcher::WatcherCommand, +}; +use thiserror::Error; +use tokio::sync::mpsc; +use vlogger::*; + +use super::ExecutorCommand; +use crate::RenderCommand; + +#[derive(Debug, Error)] +pub enum InProcessError { + #[error("TODO: {0}")] + TODO(String), +} + +pub struct Executor { + node_tx: mpsc::Sender, + rx: mpsc::Receiver, + exit: bool, +} + +impl Executor { + pub fn new(node_tx: mpsc::Sender, rx: mpsc::Receiver) -> Self { + Self { + node_tx, + rx, + exit: false, + } + } + + pub async fn run(&mut self) { + publish_system_event(SystemEvent::ExecutorStarted); + while !self.exit { + self.listen().await; + } + } + + async fn exit(&mut self) { + log(msg!(DEBUG, "Executor Exit")); + self.exit = true + } + + async fn listen(&mut self) { + if let Some(cmd) = self.rx.recv().await { + let _ = self.execute(cmd).await; + } + } + + async fn send_node_cmd(&self, cmd: NodeCommand) { + self.node_tx.send(cmd).await.unwrap() + } + + async fn handle_node_cmd(&self, cmd: NodeCommand) { + self.send_node_cmd(cmd).await; + } + + async fn echo(&self, s: Vec) { + let mut str = s.join(" "); + str.push_str("\n"); + let rd_cmd = WatcherCommand::Render(RenderCommand::RenderStringToPaneId { + str, + pane: RenderTarget::CliOutput, + }); + publish_watcher_event(rd_cmd); + } + + async fn invalid_command(&self, str: String) { + let rd_cmd = WatcherCommand::Render(RenderCommand::RenderStringToPaneId { + str, + pane: RenderTarget::CliOutput, + }); + publish_watcher_event(rd_cmd); + } + + async fn execute(&mut self, cmd: ExecutorCommand) { + match cmd { + ExecutorCommand::NodeResponse(resp) => log(resp), + ExecutorCommand::Node(n) => self.handle_node_cmd(n).await, + ExecutorCommand::Render(p) => publish_watcher_event(WatcherCommand::Render(p)), + ExecutorCommand::Watcher(w) => publish_watcher_event(w), + ExecutorCommand::Echo(s) => self.echo(s).await, + ExecutorCommand::Print(s) => log(s), + ExecutorCommand::InvalidCommand(str) => self.invalid_command(str).await, + ExecutorCommand::Exit => self.exit().await, + } + } +} diff --git a/node/src/lib.rs b/node/src/lib.rs index fe9ce32..079991e 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -1,9 +1,9 @@ pub mod node { - pub mod node; - pub use node::*; + pub mod node; + pub use node::*; - pub mod error; - pub use error::*; + pub mod error; + pub use error::*; } pub mod cli; @@ -17,69 +17,85 @@ pub mod db { pub mod error; pub use database::*; pub use error::*; - } pub mod bus { - pub mod executor; - pub mod network; - pub mod render; - pub mod system; pub mod event_bus; + pub mod network; + pub mod watcher; + pub mod system; pub use executor::*; + pub mod executor; pub use network::*; - pub use render::*; + pub use watcher::*; pub use system::*; } -pub mod watcher { - pub mod executor; - pub mod parser; - pub mod renderer; - pub mod watcher; +pub mod executor { + pub mod executor; + pub use executor::*; - pub use executor::*; - pub use parser::*; - pub use renderer::*; - pub use watcher::*; + pub mod command; + pub use command::*; +} +pub mod renderer { + pub mod renderer; + pub use renderer::*; + + pub mod pane; + pub use pane::*; + + pub mod layout; + pub use layout::*; +} + +pub mod watcher { + pub mod builder; + pub mod watcher; + + pub use builder::*; + pub use watcher::*; + + pub mod command; + pub use command::*; } pub mod protocol { - pub mod message; - pub use message::*; + pub mod message; + pub use message::*; - pub mod connection; - pub use connection::*; + pub mod connection; + pub use connection::*; - pub mod connector; - pub use connector::*; + pub mod connector; + pub use connector::*; } pub mod core { - pub mod block; - pub use block::*; + pub mod block; + pub use block::*; - pub mod blockchain; - pub use blockchain::*; + pub mod blockchain; + pub use blockchain::*; - pub mod tx; - pub use tx::*; + pub mod tx; + pub use tx::*; - pub mod data; - pub use data::*; + pub mod data; + pub use data::*; - pub mod hasher; - pub use hasher::*; + pub mod hasher; + pub use hasher::*; } pub mod seeds_constants; -use crate::watcher::renderer::{RenderCommand, RenderPane}; +use crate::renderer::{RenderCommand, RenderTarget}; pub fn log(msg: String) { - crate::bus::publish_render_event(RenderCommand::RenderStringToPane { - pane: RenderPane::CliOutput, - str: msg, - }) + crate::bus::publish_watcher_event(watcher::WatcherCommand::Render(RenderCommand::RenderStringToPaneId { + pane: RenderTarget::CliOutput, + str: msg, + })) } diff --git a/node/src/main.rs b/node/src/main.rs index 5879a2e..2db0abd 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -10,37 +10,20 @@ use clap::Parser; #[tokio::main] async fn main() -> Result<(), std::io::Error> { - let args = args::CliArgs::parse(); + let args = args::CliArgs::parse(); - let mut watcher = Watcher::build() - .addr(args.addr) - .seed(args.seed) - .debug(args.debug) - .render(args.render) - .bootstrap(args.bootstrap) - .start() - .await; + let mut watcher = Watcher::build() + .addr(args.addr) + .seed(args.seed) + .debug(args.debug) + .render(args.render) + .bootstrap(args.bootstrap) + .start() + .await; - crossterm::execute!( - std::io::stdout(), - crossterm::event::EnableBracketedPaste, - crossterm::event::EnableFocusChange, - crossterm::event::EnableMouseCapture, - )?; + watcher.run().await?; - loop { - if !watcher.poll().await.is_ok_and(|b| b) { - break; - } - } + println!("Hello, world!"); - crossterm::execute!( - std::io::stdout(), - crossterm::event::DisableBracketedPaste, - crossterm::event::DisableFocusChange, - crossterm::event::DisableMouseCapture - )?; - ratatui::restore(); - println!("Hello, world!"); - Ok(()) + Ok(()) } diff --git a/node/src/node/error.rs b/node/src/node/error.rs index ce36257..65cf602 100644 --- a/node/src/node/error.rs +++ b/node/src/node/error.rs @@ -3,5 +3,5 @@ use thiserror::Error; #[derive(Debug, Clone, Error)] pub enum NetworkError { #[error("Implement NetworkError Enum: ({})", file!())] - TODO + TODO, } diff --git a/node/src/node/node.rs b/node/src/node/node.rs index 9baa07a..9f3f413 100644 --- a/node/src/node/node.rs +++ b/node/src/node/node.rs @@ -1,443 +1,466 @@ +use crate::bus::{publish_system_event, publish_watcher_event, SystemEvent}; use crate::core::{self, Blockchain, BlockchainError, ChainData, ValidationError}; use crate::error::print_error_chain; -use crate::bus::{SystemEvent, publish_system_event}; +use crate::executor::ExecutorCommand; +use crate::log; use crate::protocol::ProtocolMessage; use crate::protocol::{Connector, ConnectorCommand}; use crate::seeds_constants::SEED_NODES; -use crate::watcher::executor::ExecutorCommand; -use crate::log; +use crate::watcher::{WatcherCommand, WatcherMode}; use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; +use thiserror::*; use tokio::sync::mpsc; use uuid::Uuid; use vlogger::*; -use thiserror::*; #[derive(Debug, Clone)] pub struct TcpPeer { - pub id: Uuid, - pub addr: SocketAddr, - pub sender: tokio::sync::mpsc::Sender, + pub id: Uuid, + pub addr: SocketAddr, + pub sender: tokio::sync::mpsc::Sender, } impl TcpPeer { - pub fn new( - id: Uuid, - addr: SocketAddr, - sender: tokio::sync::mpsc::Sender, - ) -> Self { - Self { id, addr, sender } - } + pub fn new( + id: Uuid, + addr: SocketAddr, + sender: tokio::sync::mpsc::Sender, + ) -> Self { + Self { id, addr, sender } + } } #[allow(dead_code)] pub struct Node { - pub tcp_connector: Option>, - pub id: Uuid, - pub addr: Option, - pub tcp_peers: HashMap, - chain: Blockchain, - listner_handle: Option>, - exec_tx: mpsc::Sender, - rx: mpsc::Receiver, - tx: mpsc::Sender, + pub tcp_connector: Option>, + pub id: Uuid, + pub addr: Option, + pub tcp_peers: HashMap, + chain: Blockchain, + listner_handle: Option>, + exec_tx: mpsc::Sender, + rx: mpsc::Receiver, + tx: mpsc::Sender, } #[derive(Debug, Error)] pub enum NodeError { #[error("Block chain error")] - ChainError(#[from] BlockchainError) + ChainError(#[from] BlockchainError), } #[derive(Debug, Clone)] pub enum NodeCommand { - AddPeer(TcpPeer), - RemovePeer { - peer_id: Uuid, - }, - ProcessMessage { - peer_id: Uuid, - message: ProtocolMessage, - }, - ProcessChainData(ChainData), - StartListner(SocketAddr), - PingAddr(String), - PingId(String), - CreateBlock, - DisplayBlockByKey(String), - DisplayBlockByHeight(u64), - ListBlocks, - ListPeers, - ShowId, - DumpBlocks(String), - ConnectToSeeds, - ConnectTcpPeer(String), - BootStrap, - Exit, + AddPeer(TcpPeer), + RemovePeer { + peer_id: Uuid, + }, + ProcessMessage { + peer_id: Uuid, + message: ProtocolMessage, + }, + ProcessChainData(ChainData), + StartListner(SocketAddr), + PingAddr(String), + PingId(String), + CreateBlock, + DisplayBlockInteractive, + DisplayBlockByKey(String), + DisplayBlockByHeight(u64), + ListBlocks, + ListPeers, + ShowId, + DumpBlocks(String), + ConnectToSeeds, + ConnectTcpPeer(String), + BootStrap, + Exit, } impl Node { - pub fn peer_addresses(&self) -> Vec { - let mut addr: Vec = self - .tcp_peers - .iter() - .map(|p| p.1.addr.to_string().parse::().unwrap()) - .collect(); - if let Some(a) = self.addr { - addr.push(a.clone()); - } - addr + pub fn peer_addresses(&self) -> Vec { + let mut addr: Vec = self + .tcp_peers + .iter() + .map(|p| p.1.addr.to_string().parse::().unwrap()) + .collect(); + if let Some(a) = self.addr { + addr.push(a.clone()); } + addr + } - pub fn list_peers(&self) -> String { - let mut ret = String::from("Peer List\n-----------\n"); - for (i, p) in self.tcp_peers.iter().enumerate() { - ret.push_str(format!("Peer #{i}: {}\n", p.1.id).as_str()) - } - ret + pub fn list_peers(&self) -> String { + let mut ret = String::from("Peer List\n-----------\n"); + for (i, p) in self.tcp_peers.iter().enumerate() { + ret.push_str(format!("Peer #{i}: {}\n", p.1.id).as_str()) } + ret + } - pub async fn show_id(&self) { - log(msg!(DEBUG, "Node Id: {}", self.id)) + pub async fn show_id(&self) { + log(msg!(DEBUG, "Node Id: {}", self.id)) + } + + async fn remove_tcp_peer(&mut self, peer_id: Uuid) { + log(msg!(DEBUG, "Removing Peer {peer_id}")); + self.tcp_peers.remove_entry(&peer_id); + } + + async fn add_tcp_peer(&mut self, peer: TcpPeer) { + log(msg!(DEBUG, "Added Peer from address: {}", peer.addr)); + self.tcp_peers.insert(peer.id, peer); + } + + pub async fn new_with_id( + id: uuid::Uuid, + exec_tx: mpsc::Sender, + addr: Option, + chain: Blockchain, + ) -> Self { + let (tx, rx) = mpsc::channel::(100); + Self { + id, + tcp_peers: HashMap::new(), + addr, + exec_tx, + chain, + listner_handle: None, + tcp_connector: None, + tx, + rx, } + } - async fn remove_tcp_peer(&mut self, peer_id: Uuid) { - log(msg!(DEBUG, "Removing Peer {peer_id}")); - self.tcp_peers.remove_entry(&peer_id); + pub fn new( + addr: Option, + exec_tx: mpsc::Sender, + chain: Blockchain, + ) -> Self { + let (tx, rx) = mpsc::channel::(100); + Self { + id: Uuid::new_v4(), + tcp_peers: HashMap::new(), + addr, + exec_tx, + listner_handle: None, + tcp_connector: None, + chain, + tx, + rx, } + } - async fn add_tcp_peer(&mut self, peer: TcpPeer) { - log(msg!(DEBUG, "Added Peer from address: {}", peer.addr)); - self.tcp_peers.insert(peer.id, peer); - } + fn get_blocks(&self) -> Result>, NodeError> { + Ok(self.chain.blocks()?) + } - pub async fn new_with_id( - id: uuid::Uuid, - exec_tx: mpsc::Sender, - addr: Option, - chain: Blockchain - ) -> Self { - let (tx, rx) = mpsc::channel::(100); - Self { - id, - tcp_peers: HashMap::new(), - addr, - exec_tx, - chain, - listner_handle: None, - tcp_connector: None, - tx, - rx, - } - } - - pub fn new( - addr: Option, - exec_tx: mpsc::Sender, - chain: Blockchain, - ) -> Self { - let (tx, rx) = mpsc::channel::(100); - Self { - id: Uuid::new_v4(), - tcp_peers: HashMap::new(), - addr, - exec_tx, - listner_handle: None, - tcp_connector: None, - chain, - tx, - rx, - } - } - - fn get_blocks(&self) -> Result>, NodeError> { - Ok(self.chain.blocks()?) - } - - pub async fn process_message(&mut self, peer_id: uuid::Uuid, message: ProtocolMessage) { - match message { - ProtocolMessage::BootstrapRequest { .. } => { - log(msg!(DEBUG, "Received BootstrapRequest from {peer_id}")); - let peer = &self.tcp_peers[&peer_id]; - let resp = ProtocolMessage::BootstrapResponse { - blocks: { - if let Ok(blocks) = self.get_blocks() { - serde_json::to_string(&blocks - .iter() - .map(|f| (**f).clone()) - .collect::>() - ).map_err( - |e| { - log(msg!( - ERROR, - "Failed to serde Chain for BootstrapResponse: {e}" - )); - e - }, - ).ok() - } else { - None - } - } - }; - peer.sender.send(resp).await.unwrap(); - log(msg!(DEBUG, "Send BootstrapResponse to {peer_id}")); - } - ProtocolMessage::BootstrapResponse { blocks } => { - log(msg!(DEBUG, "Received BootstrapResponse from seed")); - self.chain = core::Blockchain::build(blocks).unwrap(); - } - ProtocolMessage::Pong { peer_id } => { - log(msg!(DEBUG, "Received Pong from {peer_id}")); - } - ProtocolMessage::Ping { peer_id } => { - log(msg!(DEBUG, "Received Ping from {peer_id}")); - let resp = ProtocolMessage::Pong { - peer_id: self.id.clone(), - }; - let peer = &self.tcp_peers[&peer_id]; - peer.sender.send(resp).await.unwrap(); - } - ProtocolMessage::GetPeersRequest { peer_id } => { - log(msg!(DEBUG, "Received GetPeersRequest from {peer_id}")); - let peers = self.peer_addresses(); - let resp = ProtocolMessage::GetPeersResponse { - peer_addresses: peers, - }; - let peer = &self.tcp_peers[&peer_id]; - peer.sender.send(resp).await.unwrap(); - } - ProtocolMessage::Block { block, .. } => { - log(msg!(DEBUG, "Received Block from {peer_id}")); - if let Err(_e) = self.chain.add_block(block.into()) { - log(msg!(DEBUG, "TODO: implement error handling in {}:{}", file!(), line!())); - } - } - ProtocolMessage::ChainData { data, .. } => { - log(msg!(DEBUG, "Received ChainData from {peer_id}")); - self.chain.apply(data).unwrap() - } - _ => { - log(msg!(DEBUG, "TODO: implement this message type")); - } - } - } - - pub async fn send_message_to_peer_addr(&self, addr: SocketAddr, msg: ProtocolMessage) { - if let Some((_, peer)) = self.tcp_peers.iter().find(|(_, v)| v.addr == addr) { - if let Err(e) = peer.sender.send(msg).await { - log(msg!(ERROR, "Error Sending message to peer: {e}")); - } - log(msg!(DEBUG, "Sent BootstrapRequest to seed")); - } else { - log(msg!( - ERROR, - "Error Sending message to peer: peer not in list" - )); - } - } - - pub async fn send_message_to_peer_id(&self, id: Uuid, msg: ProtocolMessage) { - if let Some(peer) = self.tcp_peers.get(&id) { - if let Err(e) = peer.sender.send(msg).await { - log(msg!(ERROR, "Error Sending message to peer: {e}")); - } - } - } - - async fn send_message_to_seed(&self, msg: ProtocolMessage) { - for seed in SEED_NODES.iter() { - if let Some(_) = self.tcp_peers.iter().find(|(_, v)| v.addr == *seed) { - self.send_message_to_peer_addr(*seed, msg).await; - return; + pub async fn process_message(&mut self, peer_id: uuid::Uuid, message: ProtocolMessage) { + match message { + ProtocolMessage::BootstrapRequest { .. } => { + log(msg!(DEBUG, "Received BootstrapRequest from {peer_id}")); + let peer = &self.tcp_peers[&peer_id]; + let resp = ProtocolMessage::BootstrapResponse { + blocks: { + if let Ok(blocks) = self.get_blocks() { + serde_json::to_string( + &blocks + .iter() + .map(|f| (**f).clone()) + .collect::>(), + ) + .map_err(|e| { + log(msg!( + ERROR, + "Failed to serde Chain for BootstrapResponse: {e}" + )); + e + }) + .ok() } else { - self.send_message_to_peer_addr(*seed, msg).await; - return; + None } - } - log(msg!(ERROR, "No Seed Nodes Avaliable")); - } - - async fn bootstrap(&mut self) -> Result<(), ValidationError> { - log(msg!(DEBUG, "Bootstrapping")); - - let message = ProtocolMessage::BootstrapRequest { - peer_id: self.id, - version: "".to_string(), + }, }; - self.send_message_to_seed(message).await; - - Ok(()) - } - - async fn broadcast_network_data(&self, data: ChainData) { - for (id, peer) in &self.tcp_peers { - let message = ProtocolMessage::ChainData { - peer_id: self.id, - data: data.clone(), - }; - peer.sender.send(message).await.unwrap(); - log(msg!(DEBUG, "Send Transaction message to {id}")); + peer.sender.send(resp).await.unwrap(); + log(msg!(DEBUG, "Send BootstrapResponse to {peer_id}")); + } + ProtocolMessage::BootstrapResponse { blocks } => { + log(msg!(DEBUG, "Received BootstrapResponse from seed")); + self.chain = core::Blockchain::build(blocks).unwrap(); + } + ProtocolMessage::Pong { peer_id } => { + log(msg!(DEBUG, "Received Pong from {peer_id}")); + } + ProtocolMessage::Ping { peer_id } => { + log(msg!(DEBUG, "Received Ping from {peer_id}")); + let resp = ProtocolMessage::Pong { + peer_id: self.id.clone(), + }; + let peer = &self.tcp_peers[&peer_id]; + peer.sender.send(resp).await.unwrap(); + } + ProtocolMessage::GetPeersRequest { peer_id } => { + log(msg!(DEBUG, "Received GetPeersRequest from {peer_id}")); + let peers = self.peer_addresses(); + let resp = ProtocolMessage::GetPeersResponse { + peer_addresses: peers, + }; + let peer = &self.tcp_peers[&peer_id]; + peer.sender.send(resp).await.unwrap(); + } + ProtocolMessage::Block { block, .. } => { + log(msg!(DEBUG, "Received Block from {peer_id}")); + if let Err(_e) = self.chain.add_block(block.into()) { + log(msg!( + DEBUG, + "TODO: implement error handling in {}:{}", + file!(), + line!() + )); } + } + ProtocolMessage::ChainData { data, .. } => { + log(msg!(DEBUG, "Received ChainData from {peer_id}")); + self.chain.apply(data).unwrap() + } + _ => { + log(msg!(DEBUG, "TODO: implement this message type")); + } } + } - async fn broadcast_block(&self, block: &core::Block) { - for (id, peer) in &self.tcp_peers { - let message = ProtocolMessage::Block { - peer_id: self.id, - height: block.head().height as u64, - block: block.clone(), - }; - peer.sender.send(message).await.unwrap(); - log(msg!(DEBUG, "Send Block message to {id}")); + pub async fn send_message_to_peer_addr(&self, addr: SocketAddr, msg: ProtocolMessage) { + if let Some((_, peer)) = self.tcp_peers.iter().find(|(_, v)| v.addr == addr) { + if let Err(e) = peer.sender.send(msg).await { + log(msg!(ERROR, "Error Sending message to peer: {e}")); + } + log(msg!(DEBUG, "Sent BootstrapRequest to seed")); + } else { + log(msg!( + ERROR, + "Error Sending message to peer: peer not in list" + )); + } + } + + pub async fn send_message_to_peer_id(&self, id: Uuid, msg: ProtocolMessage) { + if let Some(peer) = self.tcp_peers.get(&id) { + if let Err(e) = peer.sender.send(msg).await { + log(msg!(ERROR, "Error Sending message to peer: {e}")); + } + } + } + + async fn send_message_to_seed(&self, msg: ProtocolMessage) { + for seed in SEED_NODES.iter() { + if let Some(_) = self.tcp_peers.iter().find(|(_, v)| v.addr == *seed) { + self.send_message_to_peer_addr(*seed, msg).await; + return; + } else { + self.send_message_to_peer_addr(*seed, msg).await; + return; + } + } + log(msg!(ERROR, "No Seed Nodes Avaliable")); + } + + async fn bootstrap(&mut self) -> Result<(), ValidationError> { + log(msg!(DEBUG, "Bootstrapping")); + + let message = ProtocolMessage::BootstrapRequest { + peer_id: self.id, + version: "".to_string(), + }; + self.send_message_to_seed(message).await; + + Ok(()) + } + + async fn broadcast_network_data(&self, data: ChainData) { + for (id, peer) in &self.tcp_peers { + let message = ProtocolMessage::ChainData { + peer_id: self.id, + data: data.clone(), + }; + peer.sender.send(message).await.unwrap(); + log(msg!(DEBUG, "Send Transaction message to {id}")); + } + } + + async fn broadcast_block(&self, block: &core::Block) { + for (id, peer) in &self.tcp_peers { + let message = ProtocolMessage::Block { + peer_id: self.id, + height: block.head().height as u64, + block: block.clone(), + }; + peer.sender.send(message).await.unwrap(); + log(msg!(DEBUG, "Send Block message to {id}")); + } + } + + pub fn tx(&self) -> mpsc::Sender { + return self.tx.clone(); + } + + pub fn exec_tx(&self) -> mpsc::Sender { + return self.exec_tx.clone(); + } + + async fn network_data(&mut self, data: ChainData) { + match self.chain.apply(data) { + Ok(_) => log(msg!(DEBUG, "ChainData Applied")), + Err(e) => print_error_chain(&e.into()), + }; + } + + async fn connector_cmd(&self, cmd: ConnectorCommand) { + match &self.tcp_connector { + Some(t) => match t.send(cmd).await { + Ok(()) => {} + Err(e) => log(msg!(ERROR, "Failed to Send Command to connector: {}", e)), + }, + None => log(msg!(ERROR, "No Connector Availiable")), + } + } + + async fn start_connection_listner(&mut self, addr: SocketAddr) { + log(msg!(DEBUG, "Starting Connection Listener")); + let (con_tx, con_rx) = mpsc::channel::(100); + + self.tcp_connector = Some(con_tx); + + self.listner_handle = Some(tokio::spawn({ + let mut connector = Connector::new(self.id, addr, self.exec_tx(), con_rx); + log(msg!(DEBUG, "Connector Build")); + async move { connector.start().await } + })); + } + + async fn connect_to_seed(&mut self) { + self + .connector_cmd(ConnectorCommand::ConnectToTcpSeed(SEED_NODES[0])) + .await; + } + + pub async fn run(&mut self) { + if let Some(addr) = self.addr { + self.start_connection_listner(addr).await; + } else { + self + .start_connection_listner(SocketAddr::new( + std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)), + 8080, + )) + .await; + }; + + publish_system_event(SystemEvent::NodeStarted); + + while let Some(command) = self.rx.recv().await { + match command { + NodeCommand::BootStrap => { + log(msg!(DEBUG, "Received NodeCommand::BootStrap")); + let _ = self.bootstrap().await; } - } - - pub fn tx(&self) -> mpsc::Sender { - return self.tx.clone(); - } - - pub fn exec_tx(&self) -> mpsc::Sender { - return self.exec_tx.clone(); - } - - async fn network_data(&mut self, data: ChainData) { - match self.chain.apply(data) { - Ok(_) => log(msg!(DEBUG, "ChainData Applied")), + NodeCommand::StartListner(addr) => { + self.start_connection_listner(addr).await; + } + NodeCommand::ConnectToSeeds => { + self.connect_to_seed().await; + } + NodeCommand::ConnectTcpPeer(addr) => { + log(msg!(DEBUG, "Received ConnectToPeer: {addr}")); + if let Ok(addr_sock) = addr.parse::() { + let mes = ConnectorCommand::ConnectToTcpPeer(addr_sock); + self.connector_cmd(mes).await; + } else { + log(msg!(ERROR, "Failed to Parse to sock_addr: {addr}")); + } + } + NodeCommand::PingAddr(addr) => { + if let Ok(addr_sock) = addr.parse::() { + let mes = ProtocolMessage::Ping { peer_id: self.id }; + self.send_message_to_peer_addr(addr_sock, mes).await; + } else { + log(msg!(ERROR, "Failed to Parse to sock_addr: {addr}")); + } + } + NodeCommand::PingId(id) => { + if let Ok(id) = id.parse::() { + let mes = ProtocolMessage::Ping { peer_id: self.id }; + self.send_message_to_peer_id(id, mes).await; + } else { + log(msg!(ERROR, "Failed to Parse to sock_addr: {id}")); + } + } + NodeCommand::AddPeer(peer) => { + self.add_tcp_peer(peer).await; + } + NodeCommand::RemovePeer { peer_id } => { + self.remove_tcp_peer(peer_id).await; + } + NodeCommand::ProcessMessage { peer_id, message } => { + self.process_message(peer_id, message).await; + } + NodeCommand::ProcessChainData(data) => { + self.network_data(data.clone()).await; + self.broadcast_network_data(data).await; + } + NodeCommand::CreateBlock => { + log(msg!(DEBUG, "Received CreateBlock Command")); + if let Ok(block) = self.chain.create_block() { + log(msg!( + INFO, + "Created Block with hash {}", + block.head().block_hash() + )); + self.broadcast_block(&block).await; + } + } + NodeCommand::DisplayBlockInteractive => { + let blocks = match self.chain.list_blocks() { + Ok(b) => b, + Err(e) => return print_error_chain(&e.into()), + }; + let wat_cmd = WatcherCommand::SetMode(WatcherMode::Select { + content: blocks.into(), + title: "Select Block to display".to_string(), + callback: Box::new(ExecutorCommand::Node(NodeCommand::DisplayBlockByKey("".to_string()))), + index: 0 + }); + publish_watcher_event(wat_cmd); + } + NodeCommand::DisplayBlockByKey(key) => self.chain.display_block_by_key(key), + NodeCommand::DisplayBlockByHeight(height) => self.chain.display_block_by_height(height), + NodeCommand::ListBlocks => { + log(msg!(DEBUG, "Received DebugListBlocks command")); + match self.chain.list_blocks() { + Ok(s) => log(s.join("\n")), Err(e) => print_error_chain(&e.into()), - }; - } - - async fn connector_cmd(&self, cmd: ConnectorCommand) { - match &self.tcp_connector { - Some(t) => match t.send(cmd).await { - Ok(()) => {} - Err(e) => log(msg!(ERROR, "Failed to Send Command to connector: {}", e)), - }, - None => log(msg!(ERROR, "No Connector Availiable")), + } } - } - - async fn start_connection_listner(&mut self, addr: SocketAddr) { - log(msg!(DEBUG, "Starting Connection Listener")); - let (con_tx, con_rx) = mpsc::channel::(100); - - self.tcp_connector = Some(con_tx); - - self.listner_handle = Some(tokio::spawn({ - let mut connector = Connector::new(self.id, addr, self.exec_tx(), con_rx); - log(msg!(DEBUG, "Connector Build")); - async move { connector.start().await } - })); - } - - async fn connect_to_seed(&mut self) { - self.connector_cmd(ConnectorCommand::ConnectToTcpSeed(SEED_NODES[0])) - .await; - } - - pub async fn run(&mut self) { - if let Some(addr) = self.addr { - self.start_connection_listner(addr).await; - } else { - self.start_connection_listner(SocketAddr::new( - std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)), - 8080, - )) - .await; - }; - - publish_system_event(SystemEvent::NodeStarted); - - while let Some(command) = self.rx.recv().await { - match command { - NodeCommand::BootStrap => { - log(msg!(DEBUG, "Received NodeCommand::BootStrap")); - let _ = self.bootstrap().await; - } - NodeCommand::StartListner(addr) => { - self.start_connection_listner(addr).await; - } - NodeCommand::ConnectToSeeds => { - self.connect_to_seed().await; - } - NodeCommand::ConnectTcpPeer(addr) => { - log(msg!(DEBUG, "Received ConnectToPeer: {addr}")); - if let Ok(addr_sock) = addr.parse::() { - let mes = ConnectorCommand::ConnectToTcpPeer(addr_sock); - self.connector_cmd(mes).await; - } else { - log(msg!(ERROR, "Failed to Parse to sock_addr: {addr}")); - } - } - NodeCommand::PingAddr(addr) => { - if let Ok(addr_sock) = addr.parse::() { - let mes = ProtocolMessage::Ping { peer_id: self.id }; - self.send_message_to_peer_addr(addr_sock, mes).await; - } else { - log(msg!(ERROR, "Failed to Parse to sock_addr: {addr}")); - } - } - NodeCommand::PingId(id) => { - if let Ok(id) = id.parse::() { - let mes = ProtocolMessage::Ping { peer_id: self.id }; - self.send_message_to_peer_id(id, mes).await; - } else { - log(msg!(ERROR, "Failed to Parse to sock_addr: {id}")); - } - } - NodeCommand::AddPeer(peer) => { - self.add_tcp_peer(peer).await; - } - NodeCommand::RemovePeer { peer_id } => { - self.remove_tcp_peer(peer_id).await; - } - NodeCommand::ProcessMessage { peer_id, message } => { - self.process_message(peer_id, message).await; - } - NodeCommand::ProcessChainData(data) => { - self.network_data(data.clone()).await; - self.broadcast_network_data(data).await; - } - NodeCommand::CreateBlock => { - log(msg!(DEBUG, "Received CreateBlock Command")); - if let Ok(block) = self.chain.create_block() { - log(msg!(INFO, "Created Block with hash {}", block.head().block_hash())); - self.broadcast_block(&block).await; - } - } - NodeCommand::DisplayBlockByKey(key) => { - self.chain.display_block_by_key(key) - } - NodeCommand::DisplayBlockByHeight(height) => { - self.chain.display_block_by_height(height) - } - NodeCommand::ListBlocks => { - log(msg!(DEBUG, "Received DebugListBlocks command")); - match self.chain.list_blocks() { - Ok(s) => log(s), - Err(e) => print_error_chain(&e.into()), - } - } - NodeCommand::ListPeers => { - log(msg!(DEBUG, "Received DebugListPeers command")); - log(self.list_peers()); - } - NodeCommand::ShowId => { - log(msg!(DEBUG, "Received DebugListBlocks command")); - self.show_id().await; - } - NodeCommand::DumpBlocks(s) => { - self.chain.dump_blocks(s); - } - NodeCommand::Exit => { - log(msg!(DEBUG, "Node Exit")); - break; - } - } + NodeCommand::ListPeers => { + log(msg!(DEBUG, "Received DebugListPeers command")); + log(self.list_peers()); } + NodeCommand::ShowId => { + log(msg!(DEBUG, "Received DebugListBlocks command")); + self.show_id().await; + } + NodeCommand::DumpBlocks(s) => { + self.chain.dump_blocks(s); + } + NodeCommand::Exit => { + log(msg!(DEBUG, "Node Exit")); + break; + } + } } + } } diff --git a/node/src/protocol/connection.rs b/node/src/protocol/connection.rs index 782202a..3fe02a7 100644 --- a/node/src/protocol/connection.rs +++ b/node/src/protocol/connection.rs @@ -1,9 +1,9 @@ +use crate::executor::ExecutorCommand; +use crate::log; use crate::node::node; use crate::protocol::ProtocolMessage; -use crate::watcher::ExecutorCommand; use tokio::net; use tokio::sync::mpsc; -use crate::log; use super::Connector; @@ -12,78 +12,78 @@ use vlogger::*; #[allow(dead_code)] #[derive(Debug)] pub struct Connection { + node_id: uuid::Uuid, + peer_id: uuid::Uuid, + stream: net::TcpStream, + exec_tx: mpsc::Sender, + rx: mpsc::Receiver, +} + +impl Connection { + pub fn new( node_id: uuid::Uuid, peer_id: uuid::Uuid, stream: net::TcpStream, exec_tx: mpsc::Sender, rx: mpsc::Receiver, -} - -impl Connection { - pub fn new( - node_id: uuid::Uuid, - peer_id: uuid::Uuid, - stream: net::TcpStream, - exec_tx: mpsc::Sender, - rx: mpsc::Receiver, - ) -> Self { - Self { - node_id, - peer_id, - stream, - rx, - exec_tx, - } + ) -> Self { + Self { + node_id, + peer_id, + stream, + rx, + exec_tx, } + } - pub async fn start(mut self) { - tokio::spawn(async move { - log(msg!(DEBUG, "Started Message Handler for {}", self.peer_id)); + pub async fn start(mut self) { + tokio::spawn(async move { + log(msg!(DEBUG, "Started Message Handler for {}", self.peer_id)); - loop { - tokio::select! { - response_result = self.rx.recv() => { - match response_result { - Some(response) => { - if let Err(e) = Connector::send_message(&mut self.stream, &response).await { - log(msg!(ERROR, "Failed to send response to {}: {}", self.peer_id, e)); - break; - } - }, - None => { - log(msg!(DEBUG, "Response channel closed for {}", self.peer_id)); - break; - } - } - } - - message_result = Connector::receive_message(&mut self.stream) => { - match message_result { - Ok(message) => { - log(msg!(DEBUG, "Received Message from {}", self.peer_id)); - - let command = ExecutorCommand::Node(node::NodeCommand::ProcessMessage { - peer_id: self.peer_id, - message: message.clone() - }); - - if self.exec_tx.send(command).await.is_err() { - log(msg!(ERROR, "Failed to send command to main thread from {}", self.peer_id)); - break; - } - }, - Err(e) => { - log(msg!(WARNING, "Connection to {} closed: {}", self.peer_id, e)); - let cmd = ExecutorCommand::Node(node::NodeCommand::RemovePeer { - peer_id: self.peer_id - }); - self.exec_tx.send(cmd).await.unwrap(); - break; - } - } - } + loop { + tokio::select! { + response_result = self.rx.recv() => { + match response_result { + Some(response) => { + if let Err(e) = Connector::send_message(&mut self.stream, &response).await { + log(msg!(ERROR, "Failed to send response to {}: {}", self.peer_id, e)); + break; } + }, + None => { + log(msg!(DEBUG, "Response channel closed for {}", self.peer_id)); + break; + } } - }); - } + } + + message_result = Connector::receive_message(&mut self.stream) => { + match message_result { + Ok(message) => { + log(msg!(DEBUG, "Received Message from {}", self.peer_id)); + + let command = ExecutorCommand::Node(node::NodeCommand::ProcessMessage { + peer_id: self.peer_id, + message: message.clone() + }); + + if self.exec_tx.send(command).await.is_err() { + log(msg!(ERROR, "Failed to send command to main thread from {}", self.peer_id)); + break; + } + }, + Err(e) => { + log(msg!(WARNING, "Connection to {} closed: {}", self.peer_id, e)); + let cmd = ExecutorCommand::Node(node::NodeCommand::RemovePeer { + peer_id: self.peer_id + }); + self.exec_tx.send(cmd).await.unwrap(); + break; + } + } + } + } + } + }); + } } diff --git a/node/src/protocol/connector.rs b/node/src/protocol/connector.rs index d874996..1fdeb24 100644 --- a/node/src/protocol/connector.rs +++ b/node/src/protocol/connector.rs @@ -10,304 +10,284 @@ use crate::log; use super::Connection; use crate::bus::*; -use crate::node::{error, NetworkError}; +use crate::executor::ExecutorCommand; use crate::node::node; +use crate::node::{NetworkError, error}; use crate::protocol::ProtocolMessage; -use crate::watcher::ExecutorCommand; use thiserror::*; pub enum ConnectorCommand { - ConnectToTcpPeer(SocketAddr), - ConnectToTcpSeed(SocketAddr), + ConnectToTcpPeer(SocketAddr), + ConnectToTcpSeed(SocketAddr), } pub struct Connector { - node_id: uuid::Uuid, - addr: SocketAddr, - exec_tx: mpsc::Sender, - rx: mpsc::Receiver, + node_id: uuid::Uuid, + addr: SocketAddr, + exec_tx: mpsc::Sender, + rx: mpsc::Receiver, } #[derive(Error, Debug)] pub enum ConnectorError { - #[error("Connection failed")] - ConnectionError(#[from] anyhow::Error), + #[error("Connection failed")] + ConnectionError(#[from] anyhow::Error), } const MAX_LISTNER_TRIES: usize = 5; impl Connector { - pub fn new( - node_id: uuid::Uuid, - addr: SocketAddr, - exec_tx: mpsc::Sender, - rx: mpsc::Receiver, - ) -> Self { - Self { - node_id, - addr, - exec_tx, - rx, - } + pub fn new( + node_id: uuid::Uuid, + addr: SocketAddr, + exec_tx: mpsc::Sender, + rx: mpsc::Receiver, + ) -> Self { + Self { + node_id, + addr, + exec_tx, + rx, } + } - pub async fn start(&mut self) { - let mut listner: Option = None; - let mut listner_err = None; - for _ in 0..MAX_LISTNER_TRIES { - match tokio::net::TcpListener::bind(self.addr).await { - Ok(l) => { - log(msg!(DEBUG, "Listening on address: {}", self.addr)); - listner = Some(l); - break; - } - Err(e) => { - self.addr.set_port(self.addr.port() + 1); - listner_err = Some(e); - } - }; + pub async fn start(&mut self) { + let mut listner: Option = None; + let mut listner_err = None; + for _ in 0..MAX_LISTNER_TRIES { + match tokio::net::TcpListener::bind(self.addr).await { + Ok(l) => { + log(msg!(DEBUG, "Listening on address: {}", self.addr)); + listner = Some(l); + break; } - if let Some(listener) = listner { - loop { - tokio::select! { - cmd_result = self.rx.recv() => { - match cmd_result { - Some(cmd) => { - self.execute_cmd(cmd).await; - } - None => { - log(msg!(DEBUG, "Command channel closed")); - break; - } - } - } - accept_result = listener.accept() => { - match accept_result { - Ok((stream, addr)) => { - log(msg!(DEBUG, "Accepted connection from {}", addr)); - self.establish_connection_inbound(stream, addr).await; - } - Err(e) => { - log(msg!(ERROR, "Failed to accept connection: {}", e)); - } - } - } - } + Err(e) => { + self.addr.set_port(self.addr.port() + 1); + listner_err = Some(e); + } + }; + } + if let Some(listener) = listner { + loop { + tokio::select! { + cmd_result = self.rx.recv() => { + match cmd_result { + Some(cmd) => { + self.execute_cmd(cmd).await; + } + None => { + log(msg!(DEBUG, "Command channel closed")); + break; + } } - } else { - log(msg!( - FATAL, - "Failed to start TCP Listener: {}", - listner_err.unwrap() - )); - } - } - - async fn execute_cmd(&mut self, cmd: ConnectorCommand) { - match cmd { - ConnectorCommand::ConnectToTcpPeer(addr) => self.connect_to_peer(addr).await, - ConnectorCommand::ConnectToTcpSeed(addr) => { - self.connect_to_seed(addr).await; + } + accept_result = listener.accept() => { + match accept_result { + Ok((stream, addr)) => { + log(msg!(DEBUG, "Accepted connection from {}", addr)); + self.establish_connection_inbound(stream, addr).await; + } + Err(e) => { + log(msg!(ERROR, "Failed to accept connection: {}", e)); + } } + } } + } + } else { + log(msg!( + FATAL, + "Failed to start TCP Listener: {}", + listner_err.unwrap() + )); } + } - pub async fn connect_to_seed(&self, addr: SocketAddr) { - match net::TcpStream::connect(addr) - .await - .with_context(|| format!("Connecting to {}", addr)) - { - Ok(stream) => self.establish_connection_to_seed(stream, addr).await, - Err(e) => { - // let err = ConnectorError::ConnectionError(e.into()); - print_error_chain(&e.into()); - } - } + async fn execute_cmd(&mut self, cmd: ConnectorCommand) { + match cmd { + ConnectorCommand::ConnectToTcpPeer(addr) => self.connect_to_peer(addr).await, + ConnectorCommand::ConnectToTcpSeed(addr) => { + self.connect_to_seed(addr).await; + } } + } - pub async fn connect_to_peer(&self, addr: SocketAddr) { - match net::TcpStream::connect(addr).await { - Ok(stream) => self.establish_connection_outbound(stream, addr).await, - Err(e) => { - let err = ConnectorError::ConnectionError(e.into()); - print_error_chain(&err.into()); - } - } + pub async fn connect_to_seed(&self, addr: SocketAddr) { + match net::TcpStream::connect(addr) + .await + .with_context(|| format!("Connecting to {}", addr)) + { + Ok(stream) => self.establish_connection_to_seed(stream, addr).await, + Err(e) => { + // let err = ConnectorError::ConnectionError(e.into()); + print_error_chain(&e.into()); + } } + } - pub async fn establish_connection_to_seed( - &self, - mut stream: tokio::net::TcpStream, - addr: SocketAddr, - ) { - let handshake = ProtocolMessage::Handshake { - peer_id: self.node_id, - version: "".to_string(), - }; - match Connector::send_message(&mut stream, &handshake).await { - Ok(()) => { - if let Ok(mes) = Connector::receive_message(&mut stream).await { - let (ch_tx, ch_rx) = mpsc::channel::(100); - let peer = match mes { - ProtocolMessage::HandshakeAck { peer_id, .. } => { - node::TcpPeer::new(peer_id, addr, ch_tx) - } - _ => { - log(msg!( - ERROR, - "Invalid Message On Connetion Establishment: {mes}" - )); - return; - } - }; - let cmd = ExecutorCommand::Node(node::NodeCommand::AddPeer(peer.clone())); - publish_network_event(NetworkEvent::SeedConnected(addr.to_string())); - let _ = self.exec_tx.send(cmd).await; - Connection::new(self.node_id, peer.id, stream, self.exec_tx.clone(), ch_rx) - .start() - .await; - } - } - Err(e) => print_error_chain(&e.into()), - } + pub async fn connect_to_peer(&self, addr: SocketAddr) { + match net::TcpStream::connect(addr).await { + Ok(stream) => self.establish_connection_outbound(stream, addr).await, + Err(e) => { + let err = ConnectorError::ConnectionError(e.into()); + print_error_chain(&err.into()); + } } + } - async fn establish_connection_outbound( - &self, - mut stream: tokio::net::TcpStream, - addr: SocketAddr, - ) { - let handshake = ProtocolMessage::Handshake { - peer_id: self.node_id, - version: "".to_string(), - }; - match Connector::send_message(&mut stream, &handshake).await { - Ok(()) => { - if let Ok(mes) = Connector::receive_message(&mut stream).await { - let (ch_tx, ch_rx) = mpsc::channel::(100); - let peer = match mes { - ProtocolMessage::HandshakeAck { peer_id, .. } => { - node::TcpPeer::new(peer_id, addr, ch_tx) - } - _ => { - log(msg!( - ERROR, - "Invalid Message On Connetion Establishment: {mes}" - )); - return; - } - }; - let cmd = ExecutorCommand::Node(node::NodeCommand::AddPeer(peer.clone())); - let _ = self.exec_tx.send(cmd).await; - Connection::new(self.node_id, peer.id, stream, self.exec_tx.clone(), ch_rx) - .start() - .await; - } - } - Err(e) => print_error_chain(&e.into()), - } - } - - async fn establish_connection_inbound( - &self, - mut stream: tokio::net::TcpStream, - addr: SocketAddr, - ) { + pub async fn establish_connection_to_seed( + &self, + mut stream: tokio::net::TcpStream, + addr: SocketAddr, + ) { + let handshake = ProtocolMessage::Handshake { + peer_id: self.node_id, + version: "".to_string(), + }; + match Connector::send_message(&mut stream, &handshake).await { + Ok(()) => { if let Ok(mes) = Connector::receive_message(&mut stream).await { - let (ch_tx, ch_rx) = mpsc::channel::(100); - let peer = match mes { - ProtocolMessage::Handshake { peer_id, .. } => { - let ack = ProtocolMessage::HandshakeAck { - peer_id: self.node_id, - version: "".to_string(), - }; - match Connector::send_message(&mut stream, &ack).await { - Ok(()) => node::TcpPeer::new(peer_id, addr, ch_tx), - Err(e) => return print_error_chain(&e.into()), - } - } - _ => { - log(msg!( - ERROR, - "Invalid Message On Connetion Establishment: {mes}" - )); - return; - } - }; - let cmd = ExecutorCommand::Node(node::NodeCommand::AddPeer(peer.clone())); - let _ = self.exec_tx.send(cmd).await; - Connection::new(self.node_id, peer.id, stream, self.exec_tx.clone(), ch_rx) - .start() - .await; + let (ch_tx, ch_rx) = mpsc::channel::(100); + let peer = match mes { + ProtocolMessage::HandshakeAck { peer_id, .. } => { + node::TcpPeer::new(peer_id, addr, ch_tx) + } + _ => { + log(msg!( + ERROR, + "Invalid Message On Connetion Establishment: {mes}" + )); + return; + } + }; + let cmd = ExecutorCommand::Node(node::NodeCommand::AddPeer(peer.clone())); + publish_network_event(NetworkEvent::SeedConnected(addr.to_string())); + let _ = self.exec_tx.send(cmd).await; + Connection::new(self.node_id, peer.id, stream, self.exec_tx.clone(), ch_rx) + .start() + .await; } + } + Err(e) => print_error_chain(&e.into()), } + } - pub async fn send_message( - stream: &mut net::TcpStream, - message: &ProtocolMessage, - ) -> Result<(), NetworkError> { - let json = serde_json::to_string(message) - .map_err(|_e| { - NetworkError::TODO - })?; - let data = json.as_bytes(); - - let len = data.len() as u32; - stream - .write_all(&len.to_be_bytes()) - .await - .map_err(|_e| { - NetworkError::TODO - })?; - - stream - .write_all(data) - .await - .map_err(|_e| { - NetworkError::TODO - })?; - stream.flush().await - .map_err(|_e| { - NetworkError::TODO - })?; - Ok(()) - } - - pub async fn receive_message( - stream: &mut tokio::net::TcpStream, - ) -> Result { - let mut len_bytes = [0u8; 4]; - stream - .read_exact(&mut len_bytes) - .await - .map_err(|_e| { - NetworkError::TODO - })?; - - let len = u32::from_be_bytes(len_bytes) as usize; - - if len >= super::message::MAX_MESSAGE_SIZE { - return Err(NetworkError::TODO); + async fn establish_connection_outbound( + &self, + mut stream: tokio::net::TcpStream, + addr: SocketAddr, + ) { + let handshake = ProtocolMessage::Handshake { + peer_id: self.node_id, + version: "".to_string(), + }; + match Connector::send_message(&mut stream, &handshake).await { + Ok(()) => { + if let Ok(mes) = Connector::receive_message(&mut stream).await { + let (ch_tx, ch_rx) = mpsc::channel::(100); + let peer = match mes { + ProtocolMessage::HandshakeAck { peer_id, .. } => { + node::TcpPeer::new(peer_id, addr, ch_tx) + } + _ => { + log(msg!( + ERROR, + "Invalid Message On Connetion Establishment: {mes}" + )); + return; + } + }; + let cmd = ExecutorCommand::Node(node::NodeCommand::AddPeer(peer.clone())); + let _ = self.exec_tx.send(cmd).await; + Connection::new(self.node_id, peer.id, stream, self.exec_tx.clone(), ch_rx) + .start() + .await; } - - let mut data = vec![0u8; len]; - stream - .read_exact(&mut data) - .await - .map_err(|_e| { - NetworkError::TODO - })?; - - let json = String::from_utf8(data) - .map_err(|_e| { - NetworkError::TODO - })?; - - let message: ProtocolMessage = serde_json::from_str(&json) - .map_err(|_e| { - NetworkError::TODO - })?; - - Ok(message) + } + Err(e) => print_error_chain(&e.into()), } + } + + async fn establish_connection_inbound( + &self, + mut stream: tokio::net::TcpStream, + addr: SocketAddr, + ) { + if let Ok(mes) = Connector::receive_message(&mut stream).await { + let (ch_tx, ch_rx) = mpsc::channel::(100); + let peer = match mes { + ProtocolMessage::Handshake { peer_id, .. } => { + let ack = ProtocolMessage::HandshakeAck { + peer_id: self.node_id, + version: "".to_string(), + }; + match Connector::send_message(&mut stream, &ack).await { + Ok(()) => node::TcpPeer::new(peer_id, addr, ch_tx), + Err(e) => return print_error_chain(&e.into()), + } + } + _ => { + log(msg!( + ERROR, + "Invalid Message On Connetion Establishment: {mes}" + )); + return; + } + }; + let cmd = ExecutorCommand::Node(node::NodeCommand::AddPeer(peer.clone())); + let _ = self.exec_tx.send(cmd).await; + Connection::new(self.node_id, peer.id, stream, self.exec_tx.clone(), ch_rx) + .start() + .await; + } + } + + pub async fn send_message( + stream: &mut net::TcpStream, + message: &ProtocolMessage, + ) -> Result<(), NetworkError> { + let json = serde_json::to_string(message).map_err(|_e| NetworkError::TODO)?; + let data = json.as_bytes(); + + let len = data.len() as u32; + stream + .write_all(&len.to_be_bytes()) + .await + .map_err(|_e| NetworkError::TODO)?; + + stream + .write_all(data) + .await + .map_err(|_e| NetworkError::TODO)?; + stream.flush().await.map_err(|_e| NetworkError::TODO)?; + Ok(()) + } + + pub async fn receive_message( + stream: &mut tokio::net::TcpStream, + ) -> Result { + let mut len_bytes = [0u8; 4]; + stream + .read_exact(&mut len_bytes) + .await + .map_err(|_e| NetworkError::TODO)?; + + let len = u32::from_be_bytes(len_bytes) as usize; + + if len >= super::message::MAX_MESSAGE_SIZE { + return Err(NetworkError::TODO); + } + + let mut data = vec![0u8; len]; + stream + .read_exact(&mut data) + .await + .map_err(|_e| NetworkError::TODO)?; + + let json = String::from_utf8(data).map_err(|_e| NetworkError::TODO)?; + + let message: ProtocolMessage = serde_json::from_str(&json).map_err(|_e| NetworkError::TODO)?; + + Ok(message) + } } diff --git a/node/src/protocol/message.rs b/node/src/protocol/message.rs index c9ed8c0..4ed8bb1 100644 --- a/node/src/protocol/message.rs +++ b/node/src/protocol/message.rs @@ -6,87 +6,91 @@ pub const MAX_MESSAGE_SIZE: usize = 1_000_000; #[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] pub enum ProtocolMessage { - BootstrapRequest { - peer_id: uuid::Uuid, - version: String, - }, - BootstrapResponse { - blocks: Option, - }, - GetPeersRequest { - peer_id: uuid::Uuid, - }, - GetPeersResponse { - peer_addresses: Vec, - }, - Handshake { - peer_id: uuid::Uuid, - version: String, - }, - HandshakeAck { - peer_id: uuid::Uuid, - version: String, - }, - Block { - peer_id: uuid::Uuid, - height: u64, - block: core::Block, - }, - ChainData { - peer_id: uuid::Uuid, - data: ChainData, - }, - Ping { - peer_id: uuid::Uuid, - }, - Pong { - peer_id: uuid::Uuid, - }, - Disconnect { - peer_id: uuid::Uuid, - }, + BootstrapRequest { + peer_id: uuid::Uuid, + version: String, + }, + BootstrapResponse { + blocks: Option, + }, + GetPeersRequest { + peer_id: uuid::Uuid, + }, + GetPeersResponse { + peer_addresses: Vec, + }, + Handshake { + peer_id: uuid::Uuid, + version: String, + }, + HandshakeAck { + peer_id: uuid::Uuid, + version: String, + }, + Block { + peer_id: uuid::Uuid, + height: u64, + block: core::Block, + }, + ChainData { + peer_id: uuid::Uuid, + data: ChainData, + }, + Ping { + peer_id: uuid::Uuid, + }, + Pong { + peer_id: uuid::Uuid, + }, + Disconnect { + peer_id: uuid::Uuid, + }, } impl fmt::Display for ProtocolMessage { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ProtocolMessage::BootstrapRequest { peer_id, version } => { - write!(f, "BootstrapRequest from {} (v{})", peer_id, version) - } - ProtocolMessage::BootstrapResponse { blocks } => { - write!(f, "BootstrapResponse with {:?} blocks", blocks.clone().unwrap_or_default().len()) - } - ProtocolMessage::GetPeersRequest { peer_id } => { - write!(f, "GetPeersRequest from {}", peer_id) - } - ProtocolMessage::GetPeersResponse { peer_addresses } => { - write!(f, "GetPeersResponse with {} peers", peer_addresses.len()) - } - ProtocolMessage::Handshake { peer_id, version } => { - write!(f, "Handshake from {} (v{})", peer_id, version) - } - ProtocolMessage::HandshakeAck { peer_id, version } => { - write!(f, "HandshakeAck from {} (v{})", peer_id, version) - } - ProtocolMessage::Block { - peer_id, - height, - block: _, - } => { - write!(f, "Block #{} from {}", height, peer_id) - } - ProtocolMessage::ChainData { peer_id, data: _ } => { - write!(f, "ChainData from {}", peer_id) - } - ProtocolMessage::Ping { peer_id } => { - write!(f, "Ping from {}", peer_id) - } - ProtocolMessage::Pong { peer_id } => { - write!(f, "Pong from {}", peer_id) - } - ProtocolMessage::Disconnect { peer_id } => { - write!(f, "Disconnect from {}", peer_id) - } - } + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ProtocolMessage::BootstrapRequest { peer_id, version } => { + write!(f, "BootstrapRequest from {} (v{})", peer_id, version) + } + ProtocolMessage::BootstrapResponse { blocks } => { + write!( + f, + "BootstrapResponse with {:?} blocks", + blocks.clone().unwrap_or_default().len() + ) + } + ProtocolMessage::GetPeersRequest { peer_id } => { + write!(f, "GetPeersRequest from {}", peer_id) + } + ProtocolMessage::GetPeersResponse { peer_addresses } => { + write!(f, "GetPeersResponse with {} peers", peer_addresses.len()) + } + ProtocolMessage::Handshake { peer_id, version } => { + write!(f, "Handshake from {} (v{})", peer_id, version) + } + ProtocolMessage::HandshakeAck { peer_id, version } => { + write!(f, "HandshakeAck from {} (v{})", peer_id, version) + } + ProtocolMessage::Block { + peer_id, + height, + block: _, + } => { + write!(f, "Block #{} from {}", height, peer_id) + } + ProtocolMessage::ChainData { peer_id, data: _ } => { + write!(f, "ChainData from {}", peer_id) + } + ProtocolMessage::Ping { peer_id } => { + write!(f, "Ping from {}", peer_id) + } + ProtocolMessage::Pong { peer_id } => { + write!(f, "Pong from {}", peer_id) + } + ProtocolMessage::Disconnect { peer_id } => { + write!(f, "Disconnect from {}", peer_id) + } } + } } diff --git a/node/src/renderer/layout.rs b/node/src/renderer/layout.rs new file mode 100644 index 0000000..0c2807d --- /dev/null +++ b/node/src/renderer/layout.rs @@ -0,0 +1,93 @@ +use ratatui::layout::{ Rect, Flex }; +use ratatui::prelude::*; + +use super::{Pane, RenderBuffer, RenderTarget}; + +#[derive(Debug, Clone, clap::ValueEnum)] +pub enum RenderLayoutKind { + #[value(name = "horizontal", aliases = ["h"])] + CliHorizontal, + #[value(name = "vertical", aliases = ["v"])] + CliVertical, +} + +const CLI_INPUT_PREFIX: &str = "> "; + +#[derive(Debug)] +pub struct RenderLayout { + layout: Layout, + pub panes: Vec, +} + +impl Widget for &mut RenderLayout { + fn render(self, area: Rect, buffer: &mut Buffer) { + let rects = self.rects(area); + + for (p, r) in self.panes.iter_mut().zip(rects.iter()) { + p.render(*r, buffer); + } + } +} + +pub fn center(area: Rect, horizontal: Constraint, vertical: Constraint) -> Rect { + let [area] = Layout::horizontal([horizontal]) + .flex(Flex::Center) + .areas(area); + let [area] = Layout::vertical([vertical]).flex(Flex::Center).areas(area); + area +} + +impl RenderLayout { + pub fn rects(&self, area: Rect) -> std::rc::Rc<[Rect]> { + self.layout.split(area) + } + + pub fn generate(kind: RenderLayoutKind) -> RenderLayout { + match kind { + RenderLayoutKind::CliVertical => RenderLayout { + layout: Layout::default() + .constraints([Constraint::Percentage(70), Constraint::Percentage(30)]), + panes: vec![ + Pane::new( + Some(" Input Pane ".to_string()), + RenderTarget::CliInput, + RenderBuffer::List { + list: vec![String::new()], + index: 0, + prefix: CLI_INPUT_PREFIX, + }, + true, + ), + Pane::new( + Some(" Output Pane ".to_string()), + RenderTarget::CliOutput, + RenderBuffer::String(String::new()), + false, + ), + ], + }, + RenderLayoutKind::CliHorizontal => RenderLayout { + layout: Layout::default() + .constraints([Constraint::Percentage(70), Constraint::Percentage(30)]), + panes: vec![ + Pane::new( + Some(" Output Pane ".to_string()), + RenderTarget::CliOutput, + RenderBuffer::String(String::new()), + false, + ), + Pane::new( + Some(" Input Pane ".to_string()), + RenderTarget::CliInput, + RenderBuffer::List { + list: vec![String::new()], + index: 0, + prefix: CLI_INPUT_PREFIX, + }, + true, + ), + ], + }, + } + } +} diff --git a/node/src/renderer/pane.rs b/node/src/renderer/pane.rs new file mode 100644 index 0000000..9054c5a --- /dev/null +++ b/node/src/renderer/pane.rs @@ -0,0 +1,172 @@ +use std::sync::Arc; + +use ratatui::prelude::*; +use ratatui::widgets::{Clear, Wrap}; +use ratatui::{ + buffer::Buffer, + layout::Rect, + symbols::border, + widgets::{Block, List, Paragraph, Widget}, +}; +use vlogger::{msg, DEBUG}; + +use crate::log; + +use super::center; + +#[derive(Clone, Debug)] +pub enum RenderBuffer { + List { + list: Vec, + index: usize, + prefix: &'static str, + }, + String(String), + Select(Arc>, usize), +} + +#[derive(Debug, PartialEq, Clone)] +pub enum RenderTarget { + All, + + CliInput, + + CliOutput, + + PopUp +} + +#[derive(Debug)] +pub struct Pane { + pub title: Option, + pub target: RenderTarget, + pub buffer: RenderBuffer, + pub focused: bool, + pub scroll: i16, + pub max_scroll: i16, +} + +impl Pane { + pub fn new( + title: Option, + target: RenderTarget, + buffer: RenderBuffer, + focused: bool, + ) -> Self { + Self { + title, + target, + buffer, + focused, + scroll: 0, + max_scroll: 0, + } + } +} + +impl Widget for &mut Pane { + fn render(self, area: Rect, buf: &mut Buffer) { + let block = Block::bordered() + .title({ + if let Some(t) = &self.title { + t.clone() + } else { + Default::default() + } + }) + .border_set(border::PLAIN) + .border_style({ + if self.focused { + Style::new().green() + } else { + Style::new().white() + } + }); + let inner_area = block.inner(area); + let content_width = inner_area.width as usize; + let content_height = inner_area.height as usize; + match &self.buffer { + RenderBuffer::String(s) => { + let wrapped_lines = s + .lines() + .map(|line| { + if line.is_empty() { + 1 + } else { + (line.len() + content_width - 1) / { content_width + (content_width == 0) as usize } + } + }) + .sum::(); + + self.max_scroll = if wrapped_lines > content_height { + (wrapped_lines - content_height) as i16 + } else { + 0 + }; + + let scroll_offset = self.max_scroll.saturating_sub(self.scroll) as u16; + + Paragraph::new(s.clone()) + .wrap(Wrap::default()) + .left_aligned() + .block(block) + .scroll((scroll_offset as u16, 0)) + .render(area, buf); + } + RenderBuffer::Select(list, idx) => { + let rect = center(area, Constraint::Percentage(60), Constraint::Percentage(60)); + Clear.render(rect, buf); + self.max_scroll = if list.len() > content_height { + (list.len() - content_height) as i16 + } else { + 0 + }; + let scroll_offset = self.max_scroll.saturating_sub(self.scroll) as u16; + log(msg!(DEBUG, "idx {idx}")); + let list_w = List::new( + list + .iter() + .skip(scroll_offset as usize) + .take(content_height) + .enumerate() + .map(|(i, s)| { + Line::from(format!( + "{}{}", + "", + textwrap::fill(s, content_width.saturating_sub(2)) + )).style(if i + scroll_offset as usize == *idx { + Style::new().fg(Color::Blue).bg(Color::Green) + } else { + Style::default() + }) + }), + ) + .block(block); + Widget::render(list_w, rect, buf); + } + RenderBuffer::List { list, prefix, .. } => { + self.max_scroll = if list.len() > content_height { + (list.len() - content_height) as i16 + } else { + 0 + }; + let scroll_offset = self.max_scroll.saturating_sub(self.scroll) as u16; + let list_w = List::new( + list + .iter() + .skip(scroll_offset as usize) + .take(content_height) + .map(|s| { + format!( + "{}{}", + prefix, + textwrap::fill(s, content_width.saturating_sub(2)) + ) + }), + ) + .block(block); + Widget::render(list_w, area, buf); + } + } + } +} diff --git a/node/src/renderer/renderer.rs b/node/src/renderer/renderer.rs new file mode 100644 index 0000000..ba2ca56 --- /dev/null +++ b/node/src/renderer/renderer.rs @@ -0,0 +1,307 @@ +use std::sync::Arc; + +use crossterm::event::KeyCode; +use ratatui::{Frame, buffer::Buffer, layout::Rect, widgets::Widget}; + +use vlogger::*; +use tokio::time::{Duration, timeout}; + +use crate::log; + +use super::*; + +#[derive(Debug, Clone)] +pub enum InputMode { + Input, + PopUp(Arc>, String, usize), +} + +#[derive(Debug)] +pub struct Renderer { + buffer: String, + exit: bool, + layout: RenderLayout, + mode: InputMode, +} + +#[derive(Clone, Debug)] +pub enum RenderCommand { + RenderStringToPaneId { + str: String, + pane: RenderTarget, + }, + RenderStringToPaneFocused { + str: String, + }, + RenderKeyInput(KeyCode), + ListMove { + pane: RenderTarget, + index: usize, + }, + ChangeLayout(RenderLayoutKind), + ClearPane, + + /// Mouse Events + MouseClickLeft(u16, u16), + MouseScrollUp, + MouseScrollDown, + + SetMode(InputMode), + Exit, +} + +#[allow(dead_code)] +impl Renderer { + pub fn new(layout: RenderLayoutKind) -> Self { + Self { + buffer: String::new(), + exit: false, + layout: RenderLayout::generate(layout), + mode: InputMode::Input, + } + } + + fn log(&mut self, msg: String) { + self.buffer.push_str(&msg) + } + + pub fn draw(&mut self, frame: &mut Frame) { + frame.render_widget(self, frame.area()); + } + + fn exit(&mut self) { + log!(DEBUG, "Renderer Exit"); + self.exit = true; + } + + fn buffer_extend>(&mut self, input: S) { + self.buffer.push_str(input.as_ref()); + } + + fn input_pane(&mut self) -> Option<&mut Pane> { + self + .layout + .panes + .iter_mut() + .find(|p| p.target == RenderTarget::CliInput) + } + + fn get_pane(&mut self, pane: RenderTarget) -> Option<&mut Pane> { + self.layout.panes.iter_mut().find(|p| p.target == pane) + } + + fn focused(&mut self) -> Option<&mut Pane> { + self.layout.panes.iter_mut().find(|p| p.focused == true) + } + + pub fn rects(&self, area: Rect) -> std::rc::Rc<[Rect]> { + self.layout.rects(area) + } + + pub fn handle_mouse_click_left(&mut self, x: u16, y: u16, rects: std::rc::Rc<[Rect]>) { + for (i, r) in rects.iter().enumerate() { + if r.contains(ratatui::layout::Position { x, y }) { + self.layout.panes[i].focused = true; + } else { + self.layout.panes[i].focused = false; + } + } + } + + pub fn handle_scroll_up(&mut self) { + if let Some(p) = self.focused() { + if p.scroll < p.max_scroll { + p.scroll += 1; + } + } + } + + pub fn handle_scroll_down(&mut self) { + if let Some(p) = self.focused() { + if p.scroll > i16::MIN { + p.scroll -= 1; + } + } + } + + pub fn handle_char_input(&mut self, c: char) { + if let Some(p) = self.input_pane() { + if let RenderBuffer::List { list, index, .. } = &mut p.buffer { + list[*index].push(c); + } + } + } + + pub fn handle_backspace(&mut self) { + if let Some(p) = self.input_pane() { + if let RenderBuffer::List { list, index, .. } = &mut p.buffer { + list[*index].pop(); + } + } + } + + pub fn handle_enter(&mut self) { + if let Some(p) = self.input_pane() { + if let RenderBuffer::List { list, index, .. } = &mut p.buffer { + list.push(String::new()); + *index += 1; + } + } + } + + pub fn handle_arrow_key(&mut self, key: KeyCode) { + match &mut self.mode { + InputMode::Input => {} + InputMode::PopUp(content, .., idx) => { + log(msg!(DEBUG, "Received keycode: {key}")); + log(msg!(DEBUG, "idx before: {idx}")); + match key { + KeyCode::Up => { *idx = idx.saturating_sub(1) } + KeyCode::Down => { + if *idx < content.len().saturating_sub(1) { + *idx += 1; + } + } + _ => {} + } + log(msg!(DEBUG, "idx after: {idx}")) + } + } + if let Some(pane) = self.focused() { + match &pane.target { + RenderTarget::CliInput => {} + RenderTarget::CliOutput => {} + _ => {} + } + } + } + + pub fn list_move(&mut self, pane: RenderTarget, index: usize) { + if let Some(p) = self.get_pane(pane) { + if let RenderBuffer::List { + list, index: idx, .. + } = &mut p.buffer + { + if index > 0 && index < list.len() { + list[*idx] = list[index].clone(); + } + } + } + } + + pub fn render_string_to_focused(&mut self, str: String) { + if let Some(p) = self.focused() { + match &mut p.buffer { + RenderBuffer::List { list, index, .. } => { + list.push(str); + *index += 1; + } + RenderBuffer::String(s) => s.push_str(&str), + _ => {} + } + } + } + + pub fn render_string_to_id(&mut self, str: String, pane: RenderTarget) { + if let Some(p) = self.layout.panes.iter_mut().find(|p| p.target == pane) { + match &mut p.buffer { + RenderBuffer::List { list, index, .. } => { + list.push(str); + *index += 1; + } + RenderBuffer::String(s) => s.push_str(&str), + _ => {} + } + } + } + + pub fn clear_pane(&mut self, pane: RenderTarget) { + if matches!(pane, RenderTarget::All) { + for p in self.layout.panes.iter_mut() { + match &mut p.buffer { + RenderBuffer::List { list, index, .. } => { + list.clear(); + *index = 0; + list.push(String::new()); + } + RenderBuffer::String(s) => s.clear(), + _ => {} + } + } + } else if let Some(p) = self.layout.panes.iter_mut().find(|p| p.target == pane) { + match &mut p.buffer { + RenderBuffer::List { list, index, .. } => { + list.clear(); + *index = 0; + list.push(String::new()); + } + RenderBuffer::String(s) => s.clear(), + _ => {} + } + } + } + + pub fn apply(&mut self, mes: RenderCommand, area: Rect) { + let rects = self.layout.rects(area); + match mes { + RenderCommand::MouseClickLeft(x, y) => self.handle_mouse_click_left(x, y, rects), + RenderCommand::MouseScrollUp => self.handle_scroll_up(), + RenderCommand::MouseScrollDown => self.handle_scroll_down(), + RenderCommand::RenderKeyInput(k) => match k { + KeyCode::Char(c) => self.handle_char_input(c), + KeyCode::Backspace => self.handle_backspace(), + KeyCode::Enter => self.handle_enter(), + KeyCode::Up | KeyCode::Down | KeyCode::Left | KeyCode::Right => self.handle_arrow_key(k), + _ => {} + }, + RenderCommand::ListMove { pane, index } => self.list_move(pane, index), + RenderCommand::RenderStringToPaneFocused { str } => self.render_string_to_focused(str), + RenderCommand::RenderStringToPaneId { str, pane } => self.render_string_to_id(str, pane), + RenderCommand::Exit => self.exit(), + RenderCommand::ChangeLayout(l) => self.layout = RenderLayout::generate(l), + RenderCommand::ClearPane => self.clear_pane(RenderTarget::All), + RenderCommand::SetMode(mode) => { + match &mode { + InputMode::Input => { + if let InputMode::PopUp(..) = self.mode { + self.layout.panes.pop(); + } + } + InputMode::PopUp(content, title, ..) => { + let pane = Pane::new( + Some(title.to_string()), + RenderTarget::PopUp, + RenderBuffer::Select(content.clone(), 0), + true, + ); + self.layout.panes.push(pane); + } + } + self.mode = mode + }, + } + } + + async fn listen( + &mut self, + rx: &mut tokio::sync::broadcast::Receiver, + ) -> Result { + if let Ok(Ok(mes)) = timeout(Duration::from_millis(400), rx.recv()).await { + return Ok(mes); + } + Err(()) + } +} + +impl Widget for &mut Renderer { + fn render(self, area: Rect, buf: &mut Buffer) { + let rects = self.layout.rects(area); + for (i, p) in self.layout.panes.iter_mut().enumerate() { + if p.target == RenderTarget::PopUp { + p.render(area, buf); + } else { + p.render(rects[i], buf) + } + } + } +} diff --git a/node/src/seeds_constants.rs b/node/src/seeds_constants.rs index f3a092d..c8ebe5b 100644 --- a/node/src/seeds_constants.rs +++ b/node/src/seeds_constants.rs @@ -2,9 +2,9 @@ use once_cell::sync::Lazy; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; pub static SEED_NODES: Lazy<[SocketAddr; 3]> = Lazy::new(|| { - [ - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8333), - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 3000), - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 5432), - ] + [ + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8333), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 3000), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 5432), + ] }); diff --git a/node/src/watcher/builder.rs b/node/src/watcher/builder.rs new file mode 100644 index 0000000..4c18b39 --- /dev/null +++ b/node/src/watcher/builder.rs @@ -0,0 +1,150 @@ +use std::net::SocketAddr; + +use tokio::sync::mpsc; +use vlogger::*; + +use crate::bus::{NetworkEvent, SystemEvent, subscribe_system_event}; +use crate::core; +use crate::executor::{Executor, ExecutorCommand}; +use crate::log; +use crate::node::{Node, NodeCommand}; +use crate::renderer::{RenderLayoutKind, Renderer}; +use crate::watcher::Watcher; + +#[derive(Default)] +pub struct WatcherBuilder { + addr: Option, + database: Option, + bootstrap: bool, + debug: bool, + seed: bool, + render: bool, +} + +impl WatcherBuilder { + pub fn new() -> Self { + Self::default() + } + + pub fn addr(mut self, addr: Option) -> Self { + self.addr = addr; + self + } + + pub fn database(mut self, database: Option) -> Self { + self.database = database; + self + } + + pub fn debug(mut self, debug: bool) -> Self { + self.debug = debug; + self + } + + pub fn bootstrap(mut self, bootstrap: bool) -> Self { + self.bootstrap = bootstrap; + self + } + + pub fn render(mut self, render: bool) -> Self { + self.render = render; + self + } + + pub fn seed(mut self, seed: bool) -> Self { + self.seed = seed; + self + } + + pub async fn start(mut self) -> Watcher { + let (exec_tx, exec_rx) = mpsc::channel::(100); + let mut sys_event = subscribe_system_event(); + + if self.debug { + Watcher::log_memory().await; + } + + let renderer = Renderer::new(RenderLayoutKind::CliHorizontal); + + log(msg!(DEBUG, "Database Location: {:?}", self.database)); + if self.seed { + self.addr = Some(crate::seeds_constants::SEED_NODES[0]); + } + + let chain = core::Blockchain::build(None).unwrap(); + let mut node = Node::new(self.addr.clone(), exec_tx.clone(), chain); + log(msg!(INFO, "Built Node")); + + let executor_handle = tokio::spawn({ + let node_tx = node.tx(); + async move { + let _ = Executor::new(node_tx, exec_rx).run().await; + } + }); + + for i in 0..3 { + if let Ok(ev) = sys_event.recv().await { + match ev { + SystemEvent::ExecutorStarted => { + log(msg!(INFO, "Executor Started")); + break; + } + _ => log(msg!(WARNING, "Wrong Event: {ev:?}! Retrying... (try {i})")), + } + } + } + + let node_tx = node.tx(); + let node_handle = tokio::spawn({ + async move { + node.run().await; + } + }); + + for i in 0..3 { + if let Ok(ev) = sys_event.recv().await { + match ev { + SystemEvent::NodeStarted => { + log(msg!(INFO, "Executor Started")); + break; + } + _ => log(msg!(WARNING, "Wrong Event: {ev:?}! Retrying... (try {i})")), + } + } + } + + if self.bootstrap { + let exec_tx = exec_tx.clone(); + + tokio::spawn(async move { + let seed_cmd = ExecutorCommand::Node(NodeCommand::ConnectToSeeds); + let mut ev_rx = crate::bus::subscribe_network_event(); + let _ = exec_tx.send(seed_cmd).await; + + while let Ok(e) = ev_rx.recv().await { + match e { + NetworkEvent::SeedConnected(_) => { + let bootstrap_cmd = ExecutorCommand::Node(NodeCommand::BootStrap); + let _ = exec_tx.send(bootstrap_cmd).await; + } + _ => {} + } + } + }); + } + + let cmd_history = Vec::new(); + let history_index = 0; + let cmd_buffer = String::new(); + let handles = vec![executor_handle, node_handle]; + Watcher::new( + node_tx, + exec_tx, + cmd_buffer, + cmd_history, + history_index, + handles, + renderer, + ) + } +} diff --git a/node/src/watcher/command.rs b/node/src/watcher/command.rs new file mode 100644 index 0000000..b77083d --- /dev/null +++ b/node/src/watcher/command.rs @@ -0,0 +1,20 @@ +use std::sync::Arc; + +use crate::{executor::ExecutorCommand, renderer::RenderCommand}; + +#[derive(Debug, Clone)] +pub enum WatcherCommand { + Render(RenderCommand), + SetMode(WatcherMode), +} + +#[derive(Debug, Clone)] +pub enum WatcherMode { + Input, + Select{ + content: Arc>, + title: String, + callback: Box, + index: usize, + }, +} diff --git a/node/src/watcher/executor.rs b/node/src/watcher/executor.rs deleted file mode 100644 index 8afd99a..0000000 --- a/node/src/watcher/executor.rs +++ /dev/null @@ -1,101 +0,0 @@ -use crate::{ - bus::{SystemEvent, publish_render_event, publish_system_event}, - log, - node::node::NodeCommand, - watcher::renderer::*, -}; -use thiserror::Error; -use tokio::sync::mpsc; -use vlogger::*; - -use super::RenderCommand; - -#[derive(Debug, Error)] -pub enum InProcessError { - #[error("TODO: {0}")] - TODO(String), -} - -#[derive(Clone, Debug)] -pub enum ExecutorCommand { - NodeResponse(String), - Echo(Vec), - Print(String), - InvalidCommand(String), - Node(NodeCommand), - Render(RenderCommand), - Exit, -} - - -pub struct Executor { - node_tx: mpsc::Sender, - rx: mpsc::Receiver, - exit: bool, -} - -impl Executor { - pub fn new(node_tx: mpsc::Sender, rx: mpsc::Receiver) -> Self { - Self { - node_tx, - rx, - exit: false, - } - } - - pub async fn run(&mut self) { - publish_system_event(SystemEvent::ExecutorStarted); - while !self.exit { - self.listen().await; - } - } - - async fn exit(&mut self) { - log(msg!(DEBUG, "Executor Exit")); - self.exit = true - } - - async fn listen(&mut self) { - if let Some(cmd) = self.rx.recv().await { - let _ = self.execute(cmd).await; - } - } - - async fn send_node_cmd(&self, cmd: NodeCommand) { - self.node_tx.send(cmd).await.unwrap() - } - - async fn handle_node_cmd(&self, cmd: NodeCommand) { - self.send_node_cmd(cmd).await; - } - - async fn echo(&self, s: Vec) { - let mut str = s.join(" "); - str.push_str("\n"); - let rd_cmd = RenderCommand::RenderStringToPane { - str, - pane: RenderPane::CliOutput, - }; - publish_render_event(rd_cmd); - } - - async fn invalid_command(&self, str: String) { - let rd_cmd = RenderCommand::RenderStringToPane { - str, - pane: RenderPane::CliOutput, - }; - publish_render_event(rd_cmd); - } - - async fn execute(&mut self, cmd: ExecutorCommand) { - match cmd { - ExecutorCommand::NodeResponse(resp) => log(resp), - ExecutorCommand::Node(n) => self.handle_node_cmd(n).await, - ExecutorCommand::Render(p) => publish_render_event(p), - ExecutorCommand::Echo(s) => self.echo(s).await, - ExecutorCommand::Print(s) => log(s), - ExecutorCommand::InvalidCommand(str) => self.invalid_command(str).await, - ExecutorCommand::Exit => self.exit().await, - } - } -} diff --git a/node/src/watcher/parser.rs b/node/src/watcher/parser.rs deleted file mode 100644 index 3d6c9e3..0000000 --- a/node/src/watcher/parser.rs +++ /dev/null @@ -1,63 +0,0 @@ -use crate::cli::cli; -use crate::watcher::executor::ExecutorCommand; -use vlogger::*; - -use tokio::time::{Duration, timeout}; - -use tokio::sync::mpsc; - -#[derive(Debug)] -pub struct Parser { - rx: mpsc::Receiver, - exec_tx: mpsc::Sender, - exit: bool, -} - -pub enum ParserCommand { - ParseCmdString(String), - Exit, -} - -impl Parser { - pub fn new(rx: mpsc::Receiver, exec_tx: mpsc::Sender) -> Self { - Self { - rx, - exec_tx, - exit: false, - } - } - - async fn exit(&mut self) { - self.log(msg!(DEBUG, "Parser Exit")).await; - self.exit = true; - } - - pub async fn run(&mut self) { - self.log(msg!(INFO, "Started Parser")).await; - while !self.exit { - self.listen().await; - } - } - - async fn log(&self, msg: String) { - if let Err(e) = self.exec_tx.send(ExecutorCommand::Print(msg)).await { - log!(ERROR, "Error response from exec: {e}"); - } - } - - async fn listen(&mut self) { - if let Ok(Some(mes)) = timeout(Duration::from_millis(400), self.rx.recv()).await { - match mes { - ParserCommand::ParseCmdString(s) => { - let argv: Vec<&str> = - std::iter::once(" ").chain(s.split_whitespace()).collect(); - let cmd = cli(&argv); - let _ = self.exec_tx.send(cmd).await; - } - ParserCommand::Exit => { - self.exit().await; - } - } - } - } -} diff --git a/node/src/watcher/renderer.rs b/node/src/watcher/renderer.rs deleted file mode 100644 index 9ae642a..0000000 --- a/node/src/watcher/renderer.rs +++ /dev/null @@ -1,405 +0,0 @@ -use crossterm::event::KeyCode; -use ratatui::prelude::*; -use ratatui::widgets::Wrap; -use ratatui::{ - Frame, - buffer::Buffer, - layout::Rect, - symbols::border, - widgets::{Block, List, Paragraph, Widget}, -}; - -use vlogger::*; - -use crate::bus::{SystemEvent, publish_system_event, subscribe_render_event}; -use std::io; -use tokio::time::{Duration, interval, timeout}; - -#[derive(Debug)] -pub struct Renderer { - buffer: String, - exit: bool, - layout: RenderLayout, -} - -#[derive(Debug)] -pub struct Pane { - title: Option, - target: RenderPane, - buffer: RenderBuffer, - focused: bool, - scroll: i16, - max_scroll: i16, -} - -#[derive(Debug, PartialEq, Clone, clap::ValueEnum)] -pub enum RenderPane { - #[value(name = "all", aliases = ["a"])] - All, - #[value(aliases = ["i", "in"])] - CliInput, - #[value(aliases = ["o", "out"])] - CliOutput, -} - -#[derive(Clone, Debug)] -enum RenderBuffer { - List { list: Vec, index: usize }, - String(String), -} - -impl Pane { - fn render(&mut self, area: Rect, buf: &mut Buffer) { - let block = Block::bordered() - .title({ - if let Some(t) = &self.title { - t.clone() - } else { - Default::default() - } - }) - .border_set(border::PLAIN) - .border_style({ - if self.focused { - Style::new().green() - } else { - Style::new().white() - } - }); - let inner_area = block.inner(area); - let content_width = inner_area.width as usize; - let content_height = inner_area.height as usize; - match &self.buffer { - RenderBuffer::String(s) => { - let wrapped_lines = s - .lines() - .map(|line| { - if line.is_empty() { - 1 - } else { - (line.len() + content_width - 1) / { - content_width + (content_width == 0) as usize - } - } - }) - .sum::(); - - self.max_scroll = if wrapped_lines > content_height { - (wrapped_lines - content_height) as i16 - } else { - 0 - }; - - let scroll_offset = self.max_scroll.saturating_sub(self.scroll) as u16; - - Paragraph::new(s.clone()) - .wrap(Wrap::default()) - .left_aligned() - .block(block) - .scroll((scroll_offset as u16, 0)) - .render(area, buf); - } - RenderBuffer::List { list, .. } => { - let list_w = - List::new(list.iter().map(|s| { - format!("> {}", textwrap::fill(s, content_width.saturating_sub(2))) - })) - .block(block); - Widget::render(list_w, area, buf); - } - } - } -} - -#[derive(Clone, Debug)] -pub enum RenderCommand { - RenderStringToPane { - str: String, - pane: RenderPane, - }, - RenderInput(KeyCode), - ListMove { - pane: RenderPane, - index: usize, - }, - ChangeLayout(RenderLayoutKind), - ClearPane(RenderPane), - - /// Mouse Events - MouseClickLeft(u16, u16), - MouseScrollUp, - MouseScrollDown, - Exit, -} - -#[derive(Debug, Clone, clap::ValueEnum)] -pub enum RenderLayoutKind { - #[value(name = "horizontal", aliases = ["h"])] - CliHorizontal, - #[value(name = "vertical", aliases = ["v"])] - CliVertical, -} - -#[derive(Debug)] -pub struct RenderLayout { - kind: RenderLayoutKind, - panes: Vec, -} - -impl RenderLayoutKind { - pub fn rects(&self, area: Rect) -> std::rc::Rc<[Rect]> { - match self { - Self::CliHorizontal => Layout::default() - .direction(Direction::Vertical) - .constraints(vec![Constraint::Percentage(70), Constraint::Percentage(30)]) - .split(area), - Self::CliVertical => Layout::default() - .direction(Direction::Horizontal) - .constraints(vec![Constraint::Percentage(30), Constraint::Percentage(70)]) - .split(area), - } - } - - pub fn generate(&self) -> RenderLayout { - match self { - RenderLayoutKind::CliVertical => RenderLayout { - kind: self.clone(), - panes: vec![ - Pane { - title: Some(" Input Pane ".to_string()), - target: RenderPane::CliInput, - buffer: RenderBuffer::List { - list: vec![String::new()], - index: 0, - }, - focused: true, - scroll: 0, - max_scroll: 0, - }, - Pane { - title: Some(" Output Pane ".to_string()), - target: RenderPane::CliOutput, - buffer: RenderBuffer::String(String::new()), - focused: false, - scroll: 0, - max_scroll: 0, - }, - ], - }, - RenderLayoutKind::CliHorizontal => RenderLayout { - kind: self.clone(), - panes: vec![ - Pane { - title: Some(" Output Pane ".to_string()), - target: RenderPane::CliOutput, - buffer: RenderBuffer::String(String::new()), - focused: false, - scroll: 0, - max_scroll: 0, - }, - Pane { - title: Some(" Input Pane ".to_string()), - target: RenderPane::CliInput, - buffer: RenderBuffer::List { - list: vec![String::new()], - index: 0, - }, - focused: true, - scroll: 0, - max_scroll: 0, - }, - ], - }, - } - } -} - -#[allow(dead_code)] -impl Renderer { - pub fn new(layout: RenderLayoutKind) -> Self { - Self { - buffer: String::new(), - exit: false, - layout: layout.generate(), - } - } - - pub async fn run(&mut self) -> io::Result<()> { - self.log(msg!(INFO, "Started Renderer")); - let mut rx = subscribe_render_event(); - let mut terminal = ratatui::init(); - publish_system_event(SystemEvent::RendererStarted); - - let mut render_interval = interval(Duration::from_millis(32)); // 60 FPS - - while !self.exit { - tokio::select! { - _ = render_interval.tick() => { - terminal.draw(|frame| self.draw(frame))?; - } - mes = rx.recv() => { - if let Ok(mes) = mes { - let frame = terminal.get_frame(); - let rects = self.layout.kind.rects(frame.area()); - self.apply(mes, rects); - } - } - } - } - ratatui::restore(); - Ok(()) - } - - fn log(&mut self, msg: String) { - self.buffer.push_str(&msg) - } - - pub fn draw(&mut self, frame: &mut Frame) { - frame.render_widget(self, frame.area()); - } - - fn exit(&mut self) { - log!(DEBUG, "Renderer Exit"); - self.exit = true; - } - - fn buffer_extend>(&mut self, input: S) { - self.buffer.push_str(input.as_ref()); - } - - fn input_pane(&mut self) -> Option<&mut Pane> { - self.layout - .panes - .iter_mut() - .find(|p| p.target == RenderPane::CliInput) - } - - fn focused(&mut self) -> Option<&mut Pane> { - self.layout.panes.iter_mut().find(|p| p.focused == true) - } - - fn handle_mouse_click_left(&mut self, x: u16, y: u16, rects: std::rc::Rc<[Rect]>) { - for (i, r) in rects.iter().enumerate() { - if r.contains(layout::Position { x, y }) { - self.layout.panes[i].focused = true; - } else { - self.layout.panes[i].focused = false; - } - } - } - - fn apply(&mut self, mes: RenderCommand, rects: std::rc::Rc<[Rect]>) { - match mes { - RenderCommand::MouseClickLeft(x, y) => { - self.handle_mouse_click_left(x, y, rects); - } - RenderCommand::MouseScrollUp => { - if let Some(p) = self.focused() { - if p.scroll < p.max_scroll { - p.scroll += 1; - } - } - } - RenderCommand::MouseScrollDown => { - if let Some(p) = self.focused() { - if p.scroll > i16::MIN { - p.scroll -= 1; - } - } - } - RenderCommand::RenderInput(k) => { - if let Some(p) = self.layout.panes.iter_mut().find(|p| p.focused) { - match k { - KeyCode::Char(c) => { - if let RenderBuffer::List { list, index } = &mut p.buffer { - list[*index].push(c); - } - } - KeyCode::Backspace => { - if let RenderBuffer::List { list, index } = &mut p.buffer { - list[*index].pop(); - } - } - KeyCode::Enter => { - if let RenderBuffer::List { list, index } = &mut p.buffer { - list.push(String::new()); - *index += 1; - } - } - _ => {} - } - } - } - - RenderCommand::ListMove { pane, index } => { - if let Some(p) = self.layout.panes.iter_mut().find(|p| p.target == pane) { - if let RenderBuffer::List { list, index: idx } = &mut p.buffer { - if index > 0 && index < list.len() { - list[*idx] = list[index].clone(); - } - } - } - } - - RenderCommand::RenderStringToPane { str, pane } => { - if let Some(p) = self.layout.panes.iter_mut().find(|p| p.target == pane) { - match &mut p.buffer { - RenderBuffer::List { list, index } => { - list.push(str); - *index += 1; - } - RenderBuffer::String(s) => s.push_str(&str), - } - } - } - RenderCommand::Exit => { - self.exit(); - } - RenderCommand::ChangeLayout(l) => { - self.layout = l.generate(); - } - RenderCommand::ClearPane(pane) => { - if matches!(pane, RenderPane::All) { - for p in self.layout.panes.iter_mut() { - match &mut p.buffer { - RenderBuffer::List { list, index } => { - list.clear(); - *index = 0; - list.push(String::new()); - } - RenderBuffer::String(s) => s.clear(), - } - } - } else if let Some(p) = self.layout.panes.iter_mut().find(|p| p.target == pane) { - match &mut p.buffer { - RenderBuffer::List { list, index } => { - list.clear(); - *index = 0; - list.push(String::new()); - } - RenderBuffer::String(s) => s.clear(), - } - } - } - } - } - - async fn listen( - &mut self, - rx: &mut tokio::sync::broadcast::Receiver, - ) -> Result { - if let Ok(Ok(mes)) = timeout(Duration::from_millis(400), rx.recv()).await { - return Ok(mes); - } - Err(()) - } -} - -impl Widget for &mut Renderer { - fn render(self, area: Rect, buf: &mut Buffer) { - let rects = self.layout.kind.rects(area); - for (i, p) in self.layout.panes.iter_mut().enumerate() { - p.render(rects[i], buf) - } - } -} diff --git a/node/src/watcher/watcher.rs b/node/src/watcher/watcher.rs index 8901218..c829175 100644 --- a/node/src/watcher/watcher.rs +++ b/node/src/watcher/watcher.rs @@ -1,330 +1,294 @@ -use crossterm::event::{self, Event, KeyCode, KeyEventKind, MouseButton, MouseEventKind}; +use crate::{cli::cli, error::print_error_chain, node::node::NodeCommand, watcher::WatcherMode}; +use crossterm::event::{Event, EventStream, KeyCode, KeyEventKind, MouseButton, MouseEventKind}; +use futures::StreamExt; use memory_stats::memory_stats; -use std::{ - io::{self, Write}, - net::SocketAddr, - time::Duration, -}; -use tokio::sync::mpsc; - -use crate::{ - bus::{subscribe_system_event, NetworkEvent, SystemEvent}, core, node::node::{Node, NodeCommand} +use ratatui::{layout::Rect, Terminal}; +use std::io::{self, Stdout, Write}; +use tokio::{ + select, + sync::mpsc, + time::{Duration, interval}, }; use vlogger::*; -use super::*; +use super::{ WatcherBuilder, WatcherCommand }; -use crate::bus::{ publish_render_event }; +use crate::bus::subscribe_watcher_event; +use crate::executor::*; use crate::log; +use crate::renderer::*; + #[allow(dead_code)] pub struct Watcher { - parser_tx: mpsc::Sender, + node_tx: mpsc::Sender, + exec_tx: mpsc::Sender, + cmd_buffer: String, + cmd_history: Vec, + history_index: usize, + handles: Vec>, + event_stream: crossterm::event::EventStream, + mode: WatcherMode, + pub renderer: Renderer, +} + +impl Watcher { + pub fn new( node_tx: mpsc::Sender, exec_tx: mpsc::Sender, cmd_buffer: String, cmd_history: Vec, history_index: usize, handles: Vec>, -} - -impl Watcher { - pub fn build() -> WatcherBuilder { - WatcherBuilder::new() + renderer: Renderer, + ) -> Self { + Self { + node_tx, + exec_tx, + cmd_buffer, + cmd_history, + history_index, + handles, + renderer, + mode: WatcherMode::Input, + event_stream: EventStream::new(), } + } - pub fn parser_tx(&self) -> mpsc::Sender { - self.parser_tx.clone() + fn init(&self) -> io::Result<()>{ + crossterm::execute!( + std::io::stdout(), + crossterm::event::EnableBracketedPaste, + crossterm::event::EnableFocusChange, + crossterm::event::EnableMouseCapture, + ) + } + + fn shutdown(&self) -> io::Result<()> { + ratatui::restore(); + crossterm::execute!( + std::io::stdout(), + crossterm::event::DisableBracketedPaste, + crossterm::event::DisableFocusChange, + crossterm::event::DisableMouseCapture + ) + } + + pub fn handle_cmd(&mut self, cmd: WatcherCommand, terminal: &mut Terminal>) { + match cmd { + WatcherCommand::Render(rend_cmd) => { + let frame = terminal.get_frame(); + self.renderer.apply(rend_cmd, frame.area()); + } + WatcherCommand::SetMode(mode) => { + match &mode { + WatcherMode::Input => {} + WatcherMode::Select{content, title, ..} => { + let rd_cmd = RenderCommand::SetMode(InputMode::PopUp(content.clone(), title.clone(), 0)); + let frame = terminal.get_frame(); + self.renderer.apply(rd_cmd, frame.area()); + } + } + self.mode = mode; + } } + } - pub fn exec_tx(&self) -> mpsc::Sender { - self.exec_tx.clone() - } + pub async fn run(&mut self) -> std::io::Result<()> { + let mut ui_rx = subscribe_watcher_event(); + let mut render_interval = interval(Duration::from_millis(32)); + let mut terminal = ratatui::init(); - pub async fn exit(self) { - let rd_mes = RenderCommand::Exit; - let pr_mes = ParserCommand::Exit; - let exec_mes = ExecutorCommand::Exit; - let node_mes = NodeCommand::Exit; - publish_render_event(rd_mes); - let _ = self.parser_tx.send(pr_mes).await; - let _ = self.exec_tx.send(exec_mes).await; - let _ = self.node_tx.send(node_mes).await; - } + self.init()?; - pub async fn log_memory() { - tokio::spawn(async move { - let id = format!("{}_{}", current_timestamp(), std::process::id()); - let mut path = std::path::PathBuf::new(); - path.push("./proc/"); - path.push(id); - let mut mem_map = std::fs::OpenOptions::new() - .create(true) - .write(true) - .truncate(true) - .open(path) - .unwrap(); - loop { - let _ = tokio::time::sleep(Duration::from_secs(10)).await; - if let Some(usage) = memory_stats() { - let current = current_timestamp(); - let _ = mem_map.write_all( - msg!( - INFO, - "{}: Physical memory usage: {} MB", - current, - usage.physical_mem / 1024 / 1024 - ) - .as_bytes(), - ); - let _ = mem_map.write_all( - msg!( - INFO, - "{}: Virtual memory usage: {} MB", - current, - usage.virtual_mem / 1024 / 1024 - ) - .as_bytes(), - ); - } + loop { + select! { + poll_res = self.poll() => { + match poll_res { + Ok(event) => { + match self.handle_event(event, terminal.get_frame().area()).await { + Ok(ret) => if !ret { self.exit(); break } + Err(e) => log(msg!(ERROR, "{}", e)), + } } - }); - } - - pub async fn poll(&mut self) -> io::Result { - match event::read()? { - Event::Mouse(event) => match event.kind { - MouseEventKind::ScrollUp => { - publish_render_event(RenderCommand::MouseScrollUp); - } - MouseEventKind::ScrollDown => { - publish_render_event(RenderCommand::MouseScrollDown); - } - MouseEventKind::Down(b) => match b { - MouseButton::Left => { - publish_render_event(RenderCommand::MouseClickLeft( - event.column, - event.row, - )); - } - _ => {} - }, - _ => {} + Err(()) => { log(msg!(ERROR, "Failed to read from Stream")) } + } + } + ui_event = ui_rx.recv() => { + match ui_event { + Ok(cmd) => { + self.handle_cmd(cmd, &mut terminal); }, - Event::Key(k) if k.kind == KeyEventKind::Press => { - match k.code { - KeyCode::Char(c) => { - self.cmd_buffer.push(c); - let message = RenderCommand::RenderInput(k.code); - publish_render_event(message); - } - KeyCode::Backspace => { - self.cmd_buffer.pop(); - let message = RenderCommand::RenderInput(k.code); - publish_render_event(message); - } - KeyCode::Enter => { - let rd_mes = RenderCommand::RenderInput(k.code); - let pr_mes = ParserCommand::ParseCmdString(self.cmd_buffer.clone()); - let _ = self.parser_tx.send(pr_mes).await; - publish_render_event(rd_mes); - self.cmd_buffer.clear(); - } - KeyCode::Up => { - if self.history_index > 0 { - self.history_index -= 1; - let rd_mes = RenderCommand::ListMove { - pane: RenderPane::CliInput, - index: self.history_index, - }; - publish_render_event(rd_mes); - } - } - KeyCode::Down => { - if self.history_index < self.cmd_buffer.len() { - self.history_index += 1; - let rd_mes = RenderCommand::ListMove { - pane: RenderPane::CliInput, - index: self.history_index, - }; - publish_render_event(rd_mes); - } - } - KeyCode::Esc => { - return Ok(false); - } - _ => {} - }; + Err(e) => { + log(msg!(ERROR, "{}", e)) } - _ => {} - } - Ok(true) - } -} - -#[derive(Default)] -pub struct WatcherBuilder { - addr: Option, - database: Option, - bootstrap: bool, - debug: bool, - seed: bool, - render: bool, -} - -impl WatcherBuilder { - fn new() -> Self { - Self::default() - } - - pub fn addr(mut self, addr: Option) -> Self { - self.addr = addr; - self - } - - pub fn database(mut self, database: Option) -> Self { - self.database = database; - self - } - - pub fn debug(mut self, debug: bool) -> Self { - self.debug = debug; - self - } - - pub fn bootstrap(mut self, bootstrap: bool) -> Self { - self.bootstrap = bootstrap; - self - } - - pub fn render(mut self, render: bool) -> Self { - self.render = render; - self - } - - pub fn seed(mut self, seed: bool) -> Self { - self.seed = seed; - self - } - - pub async fn start(mut self) -> Watcher { - let (parser_tx, parser_rx) = mpsc::channel::(100); - let (exec_tx, exec_rx) = mpsc::channel::(100); - let mut sys_event = subscribe_system_event(); - - if self.debug { - Watcher::log_memory().await; - } - - let render_handle = if self.render { - Some(tokio::spawn({ - async move { - let _ = Renderer::new(RenderLayoutKind::CliHorizontal).run().await; - } - })) - } else { - None - }; - - for i in 0..3 { - if let Ok(ev) = sys_event.recv().await { - match ev { - SystemEvent::RendererStarted => { - log(msg!(INFO, "Renderer Started")); - break; - } - _ => log(msg!(WARNING, "Wrong Event: {ev:?}! Retrying... (try {i})")), - } - } - } - - let parser_handle = tokio::spawn({ - let exec_tx = exec_tx.clone(); - async move { - let _ = Parser::new(parser_rx, exec_tx).run().await; - } - }); - - log(msg!(DEBUG, "Database Location: {:?}", self.database)); - if self.seed { - self.addr = Some(crate::seeds_constants::SEED_NODES[0]); - } - - let chain = core::Blockchain::build(None).unwrap(); - let mut node = Node::new(self.addr.clone(), exec_tx.clone(), chain); - log(msg!(INFO, "Built Node")); - - let executor_handle = tokio::spawn({ - let node_tx = node.tx(); - async move { - let _ = Executor::new(node_tx, exec_rx).run().await; - } - }); - - for i in 0..3 { - if let Ok(ev) = sys_event.recv().await { - match ev { - SystemEvent::ExecutorStarted => { - log(msg!(INFO, "Executor Started")); - break; - } - _ => log(msg!(WARNING, "Wrong Event: {ev:?}! Retrying... (try {i})")), - } - } - } - - let node_tx = node.tx(); - let node_handle = tokio::spawn({ - async move { - node.run().await; - } - }); - - for i in 0..3 { - if let Ok(ev) = sys_event.recv().await { - match ev { - SystemEvent::NodeStarted => { - log(msg!(INFO, "Executor Started")); - break; - } - _ => log(msg!(WARNING, "Wrong Event: {ev:?}! Retrying... (try {i})")), - } - } - } - - if self.bootstrap { - let exec_tx = exec_tx.clone(); - - tokio::spawn(async move { - let seed_cmd = ExecutorCommand::Node(NodeCommand::ConnectToSeeds); - let mut ev_rx = crate::bus::subscribe_network_event(); - let _ = exec_tx.send(seed_cmd).await; - - while let Ok(e) = ev_rx.recv().await { - match e { - NetworkEvent::SeedConnected(_) => { - let bootstrap_cmd = ExecutorCommand::Node(NodeCommand::BootStrap); - let _ = exec_tx.send(bootstrap_cmd).await; - } - _ => {} - } - } - }); - } - Watcher { - node_tx, - cmd_history: Vec::new(), - history_index: 0, - parser_tx, - exec_tx, - cmd_buffer: String::new(), - handles: { - let mut h = vec![parser_handle, executor_handle, node_handle]; - if render_handle.is_some() { - h.push(render_handle.unwrap()); - } - h - }, + } } + _ = render_interval.tick() => { + terminal.draw(|frame| self.renderer.draw(frame))?; + } + } } + self.shutdown() + } + + pub fn build() -> WatcherBuilder { + WatcherBuilder::new() + } + + pub fn exec_tx(&self) -> mpsc::Sender { + self.exec_tx.clone() + } + + pub fn exit(&self) {} + + async fn handle_enter(&mut self) { + match &self.mode { + WatcherMode::Input => { + if !self.cmd_buffer.is_empty() { + let exec_event = cli(&self.cmd_buffer); + let _ = self.exec_tx.send(exec_event).await; + self.cmd_buffer.clear(); + self.renderer.handle_enter() + } + } + WatcherMode::Select { content, callback, index, .. } => { + match &&**callback { + &ExecutorCommand::Node(nd_cmd) => { + match nd_cmd { + NodeCommand::DisplayBlockByKey(_) => { + let key = (*content)[*index].clone().to_string(); + let resp = ExecutorCommand::Node(NodeCommand::DisplayBlockByKey(key)); + let _ = self.exec_tx.send(resp).await; + } + _ => {log(msg!(DEBUG, "TODO: Implement callback for {:?}", nd_cmd))} + } + } + _ => {log(msg!(DEBUG, "TODO: Implement callback for {:?}", *callback))} + } + self.mode = WatcherMode::Input; + let rd_cmd = RenderCommand::SetMode(InputMode::Input); + self.renderer.apply(rd_cmd, Rect::default()); + } + } + } + + fn handle_arrow_key(&mut self, key: KeyCode) { + match key { + KeyCode::Up => { + match &mut self.mode { + &mut WatcherMode::Select { ref mut index, .. } => { + *index = index.saturating_sub(1); + } + _ => {} + } + } + KeyCode::Down => { + match &mut self.mode { + &mut WatcherMode::Select { ref mut index, ref content, ..} => { + if *index < content.len().saturating_sub(1) { + *index = index.saturating_add(1); + } + } + _ => {} + } + } + _ => {} + } + } + + pub async fn log_memory() { + tokio::spawn(async move { + let id = format!("{}_{}", current_timestamp(), std::process::id()); + let mut path = std::path::PathBuf::new(); + path.push("./proc/"); + path.push(id); + let mut mem_map = std::fs::OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(path) + .unwrap(); + loop { + let _ = tokio::time::sleep(Duration::from_secs(10)).await; + if let Some(usage) = memory_stats() { + let current = current_timestamp(); + let _ = mem_map.write_all( + msg!( + INFO, + "{}: Physical memory usage: {} MB", + current, + usage.physical_mem / 1024 / 1024 + ) + .as_bytes(), + ); + let _ = mem_map.write_all( + msg!( + INFO, + "{}: Virtual memory usage: {} MB", + current, + usage.virtual_mem / 1024 / 1024 + ) + .as_bytes(), + ); + } + } + }); + } + + pub async fn handle_event(&mut self, event: Event, area: Rect) -> io::Result { + match event { + Event::Mouse(event) => match event.kind { + MouseEventKind::ScrollUp => { + self.renderer.handle_scroll_up(); + } + MouseEventKind::ScrollDown => { + self.renderer.handle_scroll_down(); + } + MouseEventKind::Down(b) => match b { + MouseButton::Left => { + let rects = self.renderer.rects(area); + self + .renderer + .handle_mouse_click_left(event.column, event.row, rects); + } + _ => {} + }, + _ => {} + }, + Event::Key(k) if k.kind == KeyEventKind::Press => match k.code { + KeyCode::Esc => return Ok(false), + KeyCode::Char(c) => { + self.cmd_buffer.push(c); + self.renderer.handle_char_input(c) + } + KeyCode::Backspace => { + self.cmd_buffer.pop(); + self.renderer.handle_backspace() + } + KeyCode::Enter => { + self.handle_enter().await; + } + KeyCode::Up | KeyCode::Down | KeyCode::Left | KeyCode::Right => { + self.handle_arrow_key(k.code); + self.renderer.handle_arrow_key(k.code) + } + _ => {} + }, + Event::Paste(text) => { + log(msg!(DEBUG, "Received pasted text: {text}")); + self.renderer.render_string_to_focused(text); + } + _ => {} + } + Ok(true) + } + + pub async fn poll(&mut self) -> Result { + match self.event_stream.next().await { + Some(Ok(event)) => Ok(event), + Some(Err(e)) => Err(print_error_chain(&e.into())), + None => Err(()), + } + } }