From 3a8440c2cd9ef1bc5d5f3d25f8279b2ec4a14a9c Mon Sep 17 00:00:00 2001 From: victor Date: Fri, 29 Aug 2025 19:09:09 +0200 Subject: [PATCH] bless --- hell0 | 0 .gitignore => node/.gitignore | 0 Cargo.lock => node/Cargo.lock | 85 +++++ Cargo.toml => node/Cargo.toml | 5 + {database => node/database}/genesis.json | 0 {database => node/database}/state.json | 0 {database => node/database}/tx.db | 0 {lib => node/lib}/.gitignore | 0 {lib => node/lib}/logger-rs/.gitignore | 0 {lib => node/lib}/logger-rs/Cargo.lock | 0 {lib => node/lib}/logger-rs/Cargo.toml | 0 {lib => node/lib}/logger-rs/README.md | 0 {lib => node/lib}/logger-rs/src/lib.rs | 64 +--- {src => node/src}/args.rs | 102 +++--- {src => node/src}/cli.rs | 30 +- {src => node/src}/core/block.rs | 8 +- {src => node/src}/core/blockchain.rs | 84 ++--- {src => node/src}/core/tx.rs | 5 + {src => node/src}/error.rs | 39 +-- node/src/event_bus.rs | 75 +++++ node/src/lib.rs | 61 ++++ node/src/main.rs | 33 ++ {src => node/src}/native_node/error.rs | 0 node/src/native_node/node.rs | 383 +++++++++++++++++++++++ node/src/protocol/connection.rs | 92 ++++++ node/src/protocol/connector.rs | 272 ++++++++++++++++ node/src/protocol/message.rs | 69 ++++ {src => node/src}/seeds_constants.rs | 0 {src => node/src}/watcher/executor.rs | 60 ++-- {src => node/src}/watcher/parser.rs | 7 +- node/src/watcher/renderer.rs | 310 ++++++++++++++++++ node/src/watcher/watcher.rs | 254 +++++++++++++++ src/core.rs | 7 - src/main.rs | 37 --- src/native_node.rs | 4 - src/native_node/message.rs | 117 ------- src/native_node/network.rs | 167 ---------- src/native_node/node.rs | 279 ----------------- src/watcher.rs | 8 - src/watcher/renderer.rs | 231 -------------- src/watcher/watcher.rs | 203 ------------ testing/script/.gitignore | 1 + testing/script/Cargo.lock | 284 +++++++++++++++++ testing/script/Cargo.toml | 7 + testing/script/src/main.rs | 14 + 45 files changed, 2135 insertions(+), 1262 deletions(-) create mode 100644 hell0 rename .gitignore => node/.gitignore (100%) rename Cargo.lock => node/Cargo.lock (95%) rename Cargo.toml => node/Cargo.toml (86%) rename {database => node/database}/genesis.json (100%) rename {database => node/database}/state.json (100%) rename {database => node/database}/tx.db (100%) rename {lib => node/lib}/.gitignore (100%) rename {lib => node/lib}/logger-rs/.gitignore (100%) rename {lib => node/lib}/logger-rs/Cargo.lock (100%) rename {lib => node/lib}/logger-rs/Cargo.toml (100%) rename {lib => node/lib}/logger-rs/README.md (100%) rename {lib => node/lib}/logger-rs/src/lib.rs (54%) rename {src => node/src}/args.rs (81%) rename {src => node/src}/cli.rs (55%) rename {src => node/src}/core/block.rs (83%) rename {src => node/src}/core/blockchain.rs (76%) rename {src => node/src}/core/tx.rs (91%) rename {src => node/src}/error.rs (54%) create mode 100644 node/src/event_bus.rs create mode 100644 node/src/lib.rs create mode 100644 node/src/main.rs rename {src => node/src}/native_node/error.rs (100%) create mode 100644 node/src/native_node/node.rs create mode 100644 node/src/protocol/connection.rs create mode 100644 node/src/protocol/connector.rs create mode 100644 node/src/protocol/message.rs rename {src => node/src}/seeds_constants.rs (100%) rename {src => node/src}/watcher/executor.rs (54%) rename {src => node/src}/watcher/parser.rs (82%) create mode 100644 node/src/watcher/renderer.rs create mode 100644 node/src/watcher/watcher.rs delete mode 100644 src/core.rs delete mode 100644 src/main.rs delete mode 100644 src/native_node.rs delete mode 100644 src/native_node/message.rs delete mode 100644 src/native_node/network.rs delete mode 100644 src/native_node/node.rs delete mode 100644 src/watcher.rs delete mode 100644 src/watcher/renderer.rs delete mode 100644 src/watcher/watcher.rs create mode 100644 testing/script/.gitignore create mode 100644 testing/script/Cargo.lock create mode 100644 testing/script/Cargo.toml create mode 100644 testing/script/src/main.rs diff --git a/hell0 b/hell0 new file mode 100644 index 0000000..e69de29 diff --git a/.gitignore b/node/.gitignore similarity index 100% rename from .gitignore rename to node/.gitignore diff --git a/Cargo.lock b/node/Cargo.lock similarity index 95% rename from Cargo.lock rename to node/Cargo.lock index ae38ede..bd6fbca 100644 --- a/Cargo.lock +++ b/node/Cargo.lock @@ -88,6 +88,23 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "anyhow" +version = "1.0.99" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0674a1ddeecb70197781e945de4b3b8ffb61fa939a5597bcf48503737663100" + +[[package]] +name = "async-trait" +version = "0.1.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -140,10 +157,15 @@ dependencies = [ name = "blockchain" version = "0.1.0" dependencies = [ + "anyhow", + "async-trait", "chrono", "clap", "crossterm 0.29.0", "hex", + "jemalloc", + "jemallocator", + "memory-stats", "once_cell", "ratatui", "serde", @@ -795,6 +817,35 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "jemalloc" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd5927dc2630919333682c1757ed2d1518f510336096d0f43f0426a0f776f8e6" +dependencies = [ + "loca", +] + +[[package]] +name = "jemalloc-sys" +version = "0.5.4+5.3.0-patched" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac6c1946e1cea1788cbfde01c993b52a10e2da07f4bac608228d1bed20bfebf2" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "jemallocator" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0de374a9f8e63150e6f5e8a60cc14c668226d7a347d8aee1a45766e3c4dd3bc" +dependencies = [ + "jemalloc-sys", + "libc", +] + [[package]] name = "js-sys" version = "0.3.77" @@ -829,6 +880,15 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f5e54036fe321fd421e10d732f155734c4e4afd610dd556d9a82833ab3ee0bed" +[[package]] +name = "loca" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "669389264b730a98e0d11713d0260f0ab363ad7e50ffc6d36db9c75bb7a5e527" +dependencies = [ + "ptr", +] + [[package]] name = "lock_api" version = "0.4.13" @@ -860,6 +920,16 @@ version = "2.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" +[[package]] +name = "memory-stats" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c73f5c649995a115e1a0220b35e4df0a1294500477f97a91d0660fb5abeb574a" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "mime" version = "0.3.17" @@ -1012,6 +1082,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "ptr" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76208293e44a44a702aa22f5cf40e2e55f4a6cd19953ddadb0b3f68e337d87ce" + [[package]] name = "quote" version = "1.0.40" @@ -1708,6 +1784,15 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.59.0" diff --git a/Cargo.toml b/node/Cargo.toml similarity index 86% rename from Cargo.toml rename to node/Cargo.toml index 923f292..fffa66a 100644 --- a/Cargo.toml +++ b/node/Cargo.toml @@ -21,3 +21,8 @@ vlogger = { path = "./lib/logger-rs" } ratatui = "0.29.0" crossterm = "0.29.0" once_cell = "1.21.3" +async-trait = "0.1.89" +anyhow = "1.0.99" +memory-stats = "1.2.0" +jemalloc = "0.3.0" +jemallocator = "0.5.4" diff --git a/database/genesis.json b/node/database/genesis.json similarity index 100% rename from database/genesis.json rename to node/database/genesis.json diff --git a/database/state.json b/node/database/state.json similarity index 100% rename from database/state.json rename to node/database/state.json diff --git a/database/tx.db b/node/database/tx.db similarity index 100% rename from database/tx.db rename to node/database/tx.db diff --git a/lib/.gitignore b/node/lib/.gitignore similarity index 100% rename from lib/.gitignore rename to node/lib/.gitignore diff --git a/lib/logger-rs/.gitignore b/node/lib/logger-rs/.gitignore similarity index 100% rename from lib/logger-rs/.gitignore rename to node/lib/logger-rs/.gitignore diff --git a/lib/logger-rs/Cargo.lock b/node/lib/logger-rs/Cargo.lock similarity index 100% rename from lib/logger-rs/Cargo.lock rename to node/lib/logger-rs/Cargo.lock diff --git a/lib/logger-rs/Cargo.toml b/node/lib/logger-rs/Cargo.toml similarity index 100% rename from lib/logger-rs/Cargo.toml rename to node/lib/logger-rs/Cargo.toml diff --git a/lib/logger-rs/README.md b/node/lib/logger-rs/README.md similarity index 100% rename from lib/logger-rs/README.md rename to node/lib/logger-rs/README.md diff --git a/lib/logger-rs/src/lib.rs b/node/lib/logger-rs/src/lib.rs similarity index 54% rename from lib/logger-rs/src/lib.rs rename to node/lib/logger-rs/src/lib.rs index de456c5..a69e5e1 100644 --- a/lib/logger-rs/src/lib.rs +++ b/node/lib/logger-rs/src/lib.rs @@ -9,7 +9,7 @@ pub const FATAL: usize = 4; pub const LOG_LEVEL: [&str; 5] = [ "INFO", "DEBUG", - "WARNING", + "WARN", "ERROR", "FATAL", ]; @@ -43,61 +43,13 @@ pub fn current_timestamp() -> String { macro_rules! msg { ($level:expr, $($arg:tt)*) => {{ let formatted_msg = format!($($arg)*); - match $level { - $crate::INFO => { - format!( - "[{}] {} | {}\n", - //$crate::colored($crate::LOG_LEVEL[$level], "green"), - $crate::LOG_LEVEL[$level].to_string(), - $crate::current_timestamp(), - formatted_msg - ) - }, - $crate::DEBUG => { - format!( - "[{}]\t[{}:{}] {} | {}\n", - $crate::LOG_LEVEL[$level].to_string(), - file!(), - line!(), - $crate::current_timestamp(), - formatted_msg - ) - }, - $crate::WARNING => { - format!( - "[{}] {} | {}\n", - $crate::LOG_LEVEL[$level].to_string(), - $crate::current_timestamp(), - formatted_msg - ) - }, - $crate::ERROR => { - format!( - "[{}] {} | {}\n", - $crate::LOG_LEVEL[$level].to_string(), - $crate::current_timestamp(), - formatted_msg - ) - }, - $crate::FATAL => { - format!( - "[{}] [{}:{}] {} | {}\n", - $crate::LOG_LEVEL[$level].to_string(), - file!(), - line!(), - $crate::current_timestamp(), - formatted_msg - ) - }, - _ => { - format!( - "[{}] {} {}", - $crate::LOG_LEVEL[$crate::ERROR], - "logging error: log_level value invalid:", - $level - ) - } - } + format!( + "[{:<5}] [{:<15}:{:<4}] {}\n", + $crate::LOG_LEVEL[$level].to_string(), + { std::path::Path::new(file!()).file_name().unwrap().to_str().unwrap_or(file!()) }, + line!(), + formatted_msg + ) }}; } diff --git a/src/args.rs b/node/src/args.rs similarity index 81% rename from src/args.rs rename to node/src/args.rs index 2b87fee..171b8a0 100644 --- a/src/args.rs +++ b/node/src/args.rs @@ -16,12 +16,46 @@ pub struct Cli { #[derive(Subcommand)] pub enum CliCommand{ - #[command(name = "node")] - Node { + #[command(name = "ping")] + Ping{ #[command(subcommand)] - command: CliNodeCommand + ping_cmd: CliPingCommand }, + /// Peer related Cmd + #[command(name = "peer")] + Peer { + #[command(subcommand)] + peer_cmd: CliPeerCommand + }, + + /// Block related Cmd + #[command(name = "block")] + Block { + #[command(subcommand)] + block_cmd: CliBlockCommand + }, + + /// Make a Transaction + #[command(name = "tx")] + Transaction(core::Tx), + + /// Start new TcpListner on Addr + #[command(name = "listen")] + StartListner{addr: String}, + + /// Display Node id + #[command(name = "id")] + DebugShowId, + + /// Connect to Seed Nodes + #[command(name = "seed")] + Seeds { + #[command(subcommand)] + seed_cmd: CliSeedCommand + }, + + /// Clear Pane #[command(name = "clear", aliases = ["c"])] Clear{ pane: RenderPane @@ -69,44 +103,32 @@ pub enum CliBlockCommand { output: String } } + +#[derive(Subcommand)] +pub enum CliPingCommand { + + /// Ping Peer by Id + #[command(name = "id", aliases = ["i"])] + Id { + #[arg(short, long)] + id: String + }, + + /// Ping Peer by Address + #[command(name = "addr", aliases = ["a", "ad"])] + Addr { + #[arg(short, long)] + addr: String + } +} + + #[derive(Subcommand)] #[command(name = "node")] #[command(about = "A blockchain node CLI tool")] #[command(version = "1.0")] #[command(long_about = "A comprehensive CLI tool for managing blockchain nodes, peers, and transactions")] pub enum CliNodeCommand { - /// Peer related Cmd - #[command(name = "peer")] - Peer { - #[command(subcommand)] - peer_cmd: CliPeerCommand - }, - - /// Block related Cmd - #[command(name = "block")] - Block { - #[command(subcommand)] - block_cmd: CliBlockCommand - }, - - /// Make a Transaction - #[command(name = "tx")] - Transaction(core::Tx), - - /// Start new TcpListner on Addr - #[command(name = "listen")] - StartListner{addr: String}, - - /// Display Node id - #[command(name = "id")] - DebugShowId, - - /// Connect to Seed Nodes - #[command(name = "seed")] - Seeds { - #[command(subcommand)] - seed_cmd: CliSeedCommand - }, } #[derive(Parser, Debug)] @@ -120,10 +142,18 @@ pub struct CliArgs { #[arg(short = 'f', long)] pub seed_file: Option, - /// Enable debug mode (alternative syntax) + /// Enable bootstrap mode (alternative syntax) #[arg(short = 'b', long = "bootstrap", action = clap::ArgAction::SetTrue)] pub bootstrap: bool, + /// Enable debug mode (alternative syntax) + #[arg(short = 'd', long = "debug", action = clap::ArgAction::SetTrue)] + pub debug: bool, + + /// Enable rendering (alternative syntax) + #[arg(short = 'r', long = "render", action = clap::ArgAction::Set, default_value = "true")] + pub render: bool, + /// Enable debug mode (alternative syntax) #[arg(short = 's', long = "seed", action = clap::ArgAction::SetTrue)] pub seed: bool, diff --git a/src/cli.rs b/node/src/cli.rs similarity index 55% rename from src/cli.rs rename to node/src/cli.rs index 61a59ac..3a94a00 100644 --- a/src/cli.rs +++ b/node/src/cli.rs @@ -1,4 +1,5 @@ use crate::args::*; +use crate::core::NetworkData; use crate::native_node::node::*; use clap::Parser; use crate::watcher::ExecutorCommand; @@ -7,7 +8,7 @@ pub fn handle_peer_command(cmd: CliPeerCommand) -> NodeCommand { match cmd { CliPeerCommand::List => NodeCommand::ListPeers, CliPeerCommand::Remove { id } => NodeCommand::RemovePeer{ peer_id: id.parse::().unwrap() }, - CliPeerCommand::Connect { addr } => NodeCommand::ConnectToPeer(addr), + CliPeerCommand::Connect { addr } => NodeCommand::ConnectTcpPeer(addr), } } @@ -25,25 +26,26 @@ fn handle_seed_command(cmd: CliSeedCommand) -> NodeCommand { } } -fn node_cli(cmd: CliNodeCommand) -> ExecutorCommand { - match cmd { - CliNodeCommand::Peer { peer_cmd } => ExecutorCommand::Node(handle_peer_command(peer_cmd)), - CliNodeCommand::Block { block_cmd } => ExecutorCommand::Node(handle_block_command(block_cmd)), - CliNodeCommand::Transaction(tx)=> ExecutorCommand::Node(NodeCommand::Transaction{tx}), - CliNodeCommand::DebugShowId => ExecutorCommand::Node(NodeCommand::ShowId), - CliNodeCommand::StartListner { addr } => { - ExecutorCommand::Node(NodeCommand::StartListner(addr.parse().unwrap())) - }, - CliNodeCommand::Seeds { seed_cmd } => ExecutorCommand::Node(handle_seed_command(seed_cmd)), - } +fn handle_ping(cmd: CliPingCommand) -> NodeCommand { + match cmd { + CliPingCommand::Id {id} => NodeCommand::PingId(id), + CliPingCommand::Addr {addr} => NodeCommand::PingAddr(addr) + } } - pub fn cli(input: &[&str]) -> ExecutorCommand { match Cli::try_parse_from(input) { Ok(cmd) => match cmd.command { - CliCommand::Node{ command } => node_cli(command), CliCommand::Clear { pane } => ExecutorCommand::Clear(pane), + CliCommand::Peer { peer_cmd } => ExecutorCommand::Node(handle_peer_command(peer_cmd)), + CliCommand::Block { block_cmd } => ExecutorCommand::Node(handle_block_command(block_cmd)), + CliCommand::Transaction(tx)=> ExecutorCommand::Node(NodeCommand::ProcessNetworkData(NetworkData::Transaction(tx))), + CliCommand::DebugShowId => ExecutorCommand::Node(NodeCommand::ShowId), + CliCommand::StartListner { addr } => { + ExecutorCommand::Node(NodeCommand::StartListner(addr.parse().unwrap())) + }, + CliCommand::Seeds { seed_cmd } => ExecutorCommand::Node(handle_seed_command(seed_cmd)), + CliCommand::Ping { ping_cmd } => ExecutorCommand::Node(handle_ping(ping_cmd)) } Err(e) => ExecutorCommand::InvalidCommand(format!("{e}")) } diff --git a/src/core/block.rs b/node/src/core/block.rs similarity index 83% rename from src/core/block.rs rename to node/src/core/block.rs index d3e7579..dd1ddbb 100644 --- a/src/core/block.rs +++ b/node/src/core/block.rs @@ -1,4 +1,4 @@ -use crate::core; +use crate::core::NetworkData; #[derive(Clone, Debug, serde::Deserialize, serde::Serialize, Default)] pub struct BlockHeader { @@ -12,7 +12,7 @@ pub struct BlockHeader { #[derive(Clone, Debug, serde::Deserialize, serde::Serialize, Default)] pub struct Block { pub head: BlockHeader, - pub tx: Vec + pub tx: Vec } impl BlockHeader { @@ -34,13 +34,13 @@ impl BlockHeader { } impl Block { - pub fn new(head: BlockHeader, tx: Vec) -> Self { + pub fn new(head: BlockHeader, tx: Vec) -> Self { Self { head, tx } } pub fn head(&self) -> &BlockHeader { &self.head } - pub fn tx(&self) -> &[core::Tx] { + pub fn tx(&self) -> &[NetworkData] { &self.tx } } diff --git a/src/core/blockchain.rs b/node/src/core/blockchain.rs similarity index 76% rename from src/core/blockchain.rs rename to node/src/core/blockchain.rs index ec818c4..39edf69 100644 --- a/src/core/blockchain.rs +++ b/node/src/core/blockchain.rs @@ -2,6 +2,8 @@ use sha2::Digest; use sha2::Sha256; use vlogger::*; +use crate::core::NetworkData; +use crate::log; use crate::core; use crate::error::{ BlockchainError, TxError }; @@ -9,14 +11,15 @@ use std::collections::HashMap; use std::io::Write; use std::time::UNIX_EPOCH; -const BLOCKCHAIN_ID: &str = "victors-first-blockchain"; +const BLOCKCHAIN_ID: &str = "watch-chain"; pub type Account = String; #[derive(Debug)] pub enum ValidationError { InvalidBlockHash, - InvalidPreviousBlockHash + InvalidPreviousBlockHash, + InvalidBlockJson } #[allow(dead_code)] @@ -25,23 +28,21 @@ pub struct Blockchain { id: String, balances: std::collections::HashMap, blocks: Vec, - tx_mempool: Vec, + mempool: Vec, } #[allow(dead_code)] impl Blockchain { - pub fn open_account(&mut self, tx: core::Tx) -> Result<(), BlockchainError> { - if !tx.is_new_account() { - Err(BlockchainError::InvalidAccountCreation) - } else { - self.add(tx) - } + pub fn add(&mut self, data: NetworkData) -> Result<(), BlockchainError> { + self.apply(&data)?; + self.mempool.push(data); + Ok(()) } - pub fn add(&mut self, tx: core::Tx) -> Result<(), BlockchainError> { - self.apply(&tx)?; - self.tx_mempool.push(tx); - Ok(()) + pub fn hash_network_data(data: &NetworkData) -> String { + match data { + NetworkData::Transaction(tx) => Self::hash_transaction(tx) + } } pub fn hash_transaction(tx: &core::Tx) -> String { @@ -69,10 +70,10 @@ impl Blockchain { next_level } - pub fn calculate_merkle_root(tx: &[core::Tx]) -> String { + pub fn calculate_merkle_root(tx: &[NetworkData]) -> String { let tx_hashes: Vec = tx .iter() - .map(|tx| Blockchain::hash_transaction(tx)) + .map(|tx| Blockchain::hash_network_data(tx)) .collect(); if tx_hashes.is_empty() { @@ -120,7 +121,7 @@ impl Blockchain { } else { "" }; - let merkle_root = Self::calculate_merkle_root(&self.tx_mempool); + let merkle_root = Self::calculate_merkle_root(&self.mempool); let timestamp = std::time::SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); let nonce = 0; let mut new_head = core::BlockHeader { @@ -139,18 +140,13 @@ impl Blockchain { new_head.block_hash = block_hash; - let new_block = core::Block::new(new_head, self.tx_mempool.clone()); + let new_block = core::Block::new(new_head, self.mempool.clone()); self.blocks.push(new_block); self.blocks.last().unwrap().clone() } - pub fn apply(&mut self, tx: &core::Tx) -> Result<(), BlockchainError> { - self.tx_mempool.push(tx.clone()); - match tx.validate() { - Ok(_) => {}, - Err(e) => return Err(BlockchainError::Tx(e)) - } - + fn apply_transaction(&mut self, tx: &core::Tx) -> Result<(), BlockchainError> { + tx.validate()?; if let Some(from_balance) = self.balances.get_mut(tx.from()) { if *from_balance > tx.value() { *from_balance -= tx.value(); @@ -170,16 +166,24 @@ impl Blockchain { self.balances.insert(tx.to().clone(), tx.value()); } } - Ok(()) } - pub fn new(balances: HashMap, blocks: Vec, tx_mempool: Vec) -> Blockchain { + pub fn apply(&mut self, data: &NetworkData) -> Result<(), BlockchainError> { + match &data { + NetworkData::Transaction(tx) => { + self.apply_transaction(tx)?; + } + } + Ok(()) + } + + pub fn new(balances: HashMap, blocks: Vec, mempool: Vec) -> Blockchain { return Self { id: BLOCKCHAIN_ID.to_string(), balances, blocks, - tx_mempool + mempool } } @@ -218,8 +222,9 @@ impl Blockchain { match self.validate_block(&block) { Ok(()) => self.blocks.push(block), Err(e) => match e { - ValidationError::InvalidBlockHash => log!(ERROR, "Invalid Block Hash"), - ValidationError::InvalidPreviousBlockHash => log!(ERROR, "Invalid Previos Block Hash") + ValidationError::InvalidBlockHash => log(msg!(ERROR, "Invalid Block Hash")), + ValidationError::InvalidPreviousBlockHash => log(msg!(ERROR, "Invalid Previos Block Hash")), + ValidationError::InvalidBlockJson => log(msg!(ERROR, "Invalid Block JSON")) } } } @@ -251,15 +256,18 @@ impl Blockchain { Ok(()) } - pub fn build(blocks: Vec) -> Result { - let chain = Blockchain { - blocks, - balances: HashMap::new(), - tx_mempool: vec![], - id: BLOCKCHAIN_ID.to_string(), - }; + pub fn build(blocks: &str) -> Result { + if let Ok(blocks) = serde_json::from_str::>(&blocks) { + let chain = Blockchain { + blocks, + balances: HashMap::new(), + mempool: vec![], + id: BLOCKCHAIN_ID.to_string(), + }; - chain.validate_chain()?; - Ok(chain) + chain.validate_chain()?; + return Ok(chain) + } + Err(ValidationError::InvalidBlockJson) } } diff --git a/src/core/tx.rs b/node/src/core/tx.rs similarity index 91% rename from src/core/tx.rs rename to node/src/core/tx.rs index 56a8f7e..5b7390d 100644 --- a/src/core/tx.rs +++ b/node/src/core/tx.rs @@ -1,6 +1,11 @@ use crate::core::Account; use crate::error::TxError; +#[derive(serde::Deserialize, serde::Serialize, Debug, Clone)] +pub enum NetworkData { + Transaction(Tx) +} + #[derive(serde::Deserialize, serde::Serialize, Debug, clap::Args, Clone)] pub struct Tx { from: Account, diff --git a/src/error.rs b/node/src/error.rs similarity index 54% rename from src/error.rs rename to node/src/error.rs index ca2b51f..564aad0 100644 --- a/src/error.rs +++ b/node/src/error.rs @@ -1,4 +1,5 @@ use thiserror::Error; +use crate::log; #[allow(dead_code)] #[derive(Error, Debug)] @@ -24,32 +25,34 @@ pub enum TxError { UnknownAccount(String) } -pub fn print_error_chain(err: &dyn std::error::Error) -> String { - - let mut err_string = String::from(format!("Error: {}", err)); +pub fn print_error_chain(err: anyhow::Error) { + let mut err_string = String::from(format!("Error: {}\n", err)); let mut source = err.source(); let mut level = 1; while let Some(err) = source { - err_string.push_str(format!(" {}: {}", level, err).as_str()); + err_string.push_str(format!(" {}: {}\n", level, err).as_str()); source = err.source(); level += 1; } - err_string + log(err_string) } -pub fn handle_error(err: BlockchainError) -> String { - match &err { - BlockchainError::Tx(tx) => { - match tx { - TxError::FromEmpty => { } - TxError::ToEmpty => { } - _ => {} - } - } - _ => {} - } - print_error_chain(&err) -} +#[derive(Debug, Error)] +pub enum SystemError +where + T: std::fmt::Debug +{ + #[error("TODO")] + TODO, + #[error("Failed to send message: {message:?}")] + ChannelSendError { message: T }, + + #[error("Channel {message:?} was closed")] + ChannelClosed { message: T }, + + #[error("Event Bus closed")] + EventBusClosed(#[from] tokio::sync::broadcast::error::RecvError) +} diff --git a/node/src/event_bus.rs b/node/src/event_bus.rs new file mode 100644 index 0000000..7ece230 --- /dev/null +++ b/node/src/event_bus.rs @@ -0,0 +1,75 @@ +use once_cell::sync::Lazy; +use tokio::sync::broadcast; +use std::sync::Arc; + +use crate::watcher::renderer::RenderCommand; + +static NETWORK_EVENT_BUS: Lazy>> = Lazy::new(|| Arc::new(EventBus::new())); +static SYSTEM_EVENT_BUS: Lazy>> = Lazy::new(|| Arc::new(EventBus::new())); +static RENDER_CHANNEL: Lazy>> = Lazy::new(|| Arc::new(EventBus::new())); + +pub fn publish_system_event(event: SystemEvent) { + SYSTEM_EVENT_BUS.publish(event); +} + +pub fn subscribe_system_event() -> broadcast::Receiver { + SYSTEM_EVENT_BUS.subscribe() +} + +pub fn publish_network_event(event: NetworkEvent) { + NETWORK_EVENT_BUS.publish(event); +} + +pub fn subscribe_network_event() -> broadcast::Receiver { + NETWORK_EVENT_BUS.subscribe() +} + +pub fn publish_render_event(event: RenderCommand) { + RENDER_CHANNEL.publish(event); +} + +pub fn subscribe_render_event() -> broadcast::Receiver { + RENDER_CHANNEL.subscribe() +} + +struct EventBus +where + T: Clone + std::fmt::Debug +{ + sender: broadcast::Sender, + _receiver: broadcast::Receiver +} + +impl EventBus { + pub fn new() -> Self { + let (sender, receiver) = broadcast::channel(1000); + Self { + sender, + _receiver: receiver + } + } + + pub fn publish(&self, event: T) { + if let Err(e) = self.sender.send(event) { + eprintln!("{e}") + } + } + + pub fn subscribe(&self) -> broadcast::Receiver { + self.sender.subscribe() + } +} + +#[derive(Clone, Debug)] +pub enum SystemEvent { + Exit +} + +#[derive(Clone, Debug)] +pub enum NetworkEvent { + SeedConnected(String), + SeedDisconnected(String), + AllSeedsConnected, + BootstrapCompleted, + NodeReady, +} diff --git a/node/src/lib.rs b/node/src/lib.rs new file mode 100644 index 0000000..624d947 --- /dev/null +++ b/node/src/lib.rs @@ -0,0 +1,61 @@ +pub mod native_node { + pub mod node; + pub use node::*; + + pub mod error; + pub use error::*; + +} + +pub mod cli; + +pub mod args; + +pub mod error; + +pub mod event_bus; + +pub mod watcher { + pub mod executor; + pub mod parser; + pub mod renderer; + pub mod watcher; + + pub use watcher::*; + pub use executor::*; + pub use parser::*; + pub use renderer::*; +} + +pub mod protocol { + pub mod message; + pub use message::*; + + pub mod connection; + pub use connection::*; + + pub mod connector; + pub use connector::*; +} + +pub mod core { + pub mod block; + pub use block::*; + + pub mod blockchain; + pub use blockchain::*; + + pub mod tx; + pub use tx::*; +} + +pub mod seeds_constants; + +use crate::watcher::renderer::{RenderPane, RenderCommand}; + +pub fn log(msg: String) { + crate::event_bus::publish_render_event(RenderCommand::RenderStringToPane{ + pane: RenderPane::CliOutput, + str: msg + }) +} diff --git a/node/src/main.rs b/node/src/main.rs new file mode 100644 index 0000000..7c772b3 --- /dev/null +++ b/node/src/main.rs @@ -0,0 +1,33 @@ +use blockchain::watcher::Watcher; +use blockchain::args; +use clap::Parser; + +// src/main.rs +use jemallocator::Jemalloc; + +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + +#[tokio::main] +async fn main() { + + let args = args::CliArgs::parse(); + + let mut watcher = Watcher::build() + .file(args.seed_file) + .addr(args.addr) + .seed(args.seed) + .render(args.render) + .bootstrap(args.bootstrap) + .start().await; + + + loop { + if !watcher.poll().await.is_ok_and(|b| b) { + break ; + } + } + + ratatui::restore(); + println!("Hello, world!"); +} diff --git a/src/native_node/error.rs b/node/src/native_node/error.rs similarity index 100% rename from src/native_node/error.rs rename to node/src/native_node/error.rs diff --git a/node/src/native_node/node.rs b/node/src/native_node/node.rs new file mode 100644 index 0000000..1488ac6 --- /dev/null +++ b/node/src/native_node/node.rs @@ -0,0 +1,383 @@ +use crate::core::{self, Blockchain, NetworkData, ValidationError}; +use crate::error::print_error_chain; +use crate::protocol::ProtocolMessage; + +use crate::seeds_constants::SEED_NODES; +use crate::protocol::{Connector, ConnectorCommand}; +use crate::watcher::executor::ExecutorCommand; + +use std::collections::HashMap; +use std::net::SocketAddr; +use vlogger::*; +use tokio::sync::mpsc; +use uuid::Uuid; + +use crate::log; + +#[derive(Debug, Clone)] +pub struct TcpPeer { + pub id: Uuid, + pub addr: SocketAddr, + pub sender: tokio::sync::mpsc::Sender +} + +impl TcpPeer { + pub fn new( + id: Uuid, + addr: SocketAddr, + sender: tokio::sync::mpsc::Sender, + ) -> Self { + Self { + id, + addr, + sender + } + } +} + +#[allow(dead_code)] +pub struct NativeNode { + pub tcp_connector: Option>, + pub id: Uuid, + pub addr: Option, + pub tcp_peers: HashMap, + pub chain: core::Blockchain, + listner_handle: Option>, + exec_tx: mpsc::Sender, + rx: mpsc::Receiver, + tx: mpsc::Sender, +} + +#[derive(Debug)] +pub enum NodeCommand { + AddPeer(TcpPeer), + RemovePeer { peer_id: Uuid }, + ProcessMessage { peer_id: Uuid, message: ProtocolMessage }, + ProcessNetworkData(NetworkData), + StartListner(SocketAddr), + PingAddr(String), + PingId(String), + CreateBlock, + ListBlocks, + ListPeers, + ShowId, + DumpBlocks(String), + ConnectToSeeds, + ConnectTcpPeer(String), + BootStrap, + Exit, +} + +impl NativeNode { + pub fn peer_addresses(&self) -> Vec { + let mut addr: Vec = self.tcp_peers.iter().map(|p| p.1.addr.to_string().parse::().unwrap()).collect(); + if let Some(a) = self.addr { + addr.push(a.clone()); + } + addr + } + + pub fn list_peers(&self) -> String { + let mut ret = String::from("Peer List\n-----------\n"); + for (i, p) in self.tcp_peers.iter().enumerate() { + ret.push_str(format!("Peer #{i}: {}\n", p.1.id).as_str()) + } + ret + } + + pub async fn show_id(&self) { + log(msg!(DEBUG, "Node Id: {}", self.id)) + } + + async fn remove_tcp_peer(&mut self, peer_id: Uuid) { + log(msg!(DEBUG, "Removing Peer {peer_id}")); + self.tcp_peers.remove_entry(&peer_id); + } + + async fn add_tcp_peer(&mut self, peer: TcpPeer) { + log(msg!(DEBUG, "Added Peer from address: {}", peer.addr)); + self.tcp_peers.insert(peer.id, peer); + } + + pub async fn new_with_id( + id: uuid::Uuid, + exec_tx: mpsc::Sender, + addr: Option, + blocks_json: &str, + ) -> Self { + let (tx, rx) = mpsc::channel::(100); + Self { + id, + tcp_peers: HashMap::new(), + chain: Blockchain::build(blocks_json).unwrap_or(Default::default()), + addr, + exec_tx, + listner_handle: None, + tcp_connector: None, + tx, + rx, + } + } + + pub async fn new( + addr: Option, + blocks_json: &str, + exec_tx: mpsc::Sender, + ) -> Self { + let (tx, rx) = mpsc::channel::(100); + Self { + id: Uuid::new_v4(), + tcp_peers: HashMap::new(), + chain: Blockchain::build(blocks_json).unwrap_or(Default::default()), + addr, + exec_tx, + listner_handle: None, + tcp_connector: None, + tx, + rx, + } + } + + pub async fn process_message(&mut self, peer_id: uuid::Uuid, message: &ProtocolMessage) { + + match message { + ProtocolMessage::BootstrapRequest { .. } => { + log(msg!(DEBUG, "Received BootstrapRequest from {peer_id}")); + let peer = &self.tcp_peers[&peer_id]; + let resp = ProtocolMessage::BootstrapResponse { + blocks: serde_json::to_string(&self.chain.blocks().to_vec()).unwrap_or_else(|e| { + log(msg!(WARNING, "Failed to serde Chain for BootstrapResponse: {e}")); + Default::default() + }) + }; + peer.sender.send(resp).await.unwrap(); + log(msg!(DEBUG, "Send BootstrapResponse to {peer_id}")); + }, + ProtocolMessage::BootstrapResponse { blocks } => { + log(msg!(DEBUG, "Received BootstrapResponse from seed")); + self.chain = core::Blockchain::build(blocks).unwrap(); + }, + ProtocolMessage::Ping {peer_id} => { + log(msg!(DEBUG, "Received Ping from {peer_id}")); + let resp = ProtocolMessage::Pong { peer_id: self.id.clone() }; + let peer = &self.tcp_peers[peer_id]; + peer.sender.send(resp).await.unwrap(); + }, + ProtocolMessage::GetPeersRequest { peer_id } => { + log(msg!(DEBUG, "Received GetPeersRequest from {peer_id}")); + let peers = self.peer_addresses(); + let resp = ProtocolMessage::GetPeersResponse { peer_addresses: peers }; + let peer = &self.tcp_peers[peer_id]; + peer.sender.send(resp).await.unwrap(); + } + ProtocolMessage::Block { block, ..} => { + log(msg!(DEBUG, "Received Block from {peer_id}")); + self.chain.add_block(block.clone()) + } + ProtocolMessage::NetworkData { data, ..} => { + log(msg!(DEBUG, "Received NetworkData from {peer_id}")); + self.chain.apply(data).unwrap() + } + _ => { + log(msg!(DEBUG, "TODO: implement this message type")); + } + } + } + + pub async fn send_message_to_peer_addr(&self, addr: SocketAddr, msg: ProtocolMessage) { + if let Some((_, peer)) = self.tcp_peers.iter().find(|(_, v)| v.addr == addr) { + if let Err(e) = peer.sender.send(msg).await { + log(msg!(ERROR, "Error Sending message to peer: {e}")); + } + log(msg!(DEBUG, "Sent BootstrapRequest to seed")); + } else { + log(msg!(ERROR, "Error Sending message to peer: peer not in list")); + } + } + + pub async fn send_message_to_peer_id(&self, id: Uuid, msg: ProtocolMessage) { + if let Some(peer) = self.tcp_peers.get(&id) { + if let Err(e) = peer.sender.send(msg).await { + log(msg!(ERROR, "Error Sending message to peer: {e}")); + } + } + } + + async fn send_message_to_seed(&self, msg: ProtocolMessage) { + for seed in SEED_NODES.iter() { + if let Some(_) = self.tcp_peers.iter().find(|(_, v)| v.addr == *seed) { + self.send_message_to_peer_addr(*seed, msg).await; + return ; + } else { + self.send_message_to_peer_addr(*seed, msg).await; + return ; + } + } + log(msg!(ERROR, "No Seed Nodes Avaliable")); + } + + async fn bootstrap(&mut self) -> Result<(), ValidationError> { + log(msg!(DEBUG, "Bootstrapping")); + + let message = ProtocolMessage::BootstrapRequest{peer_id: self.id, version: "".to_string()}; + self.send_message_to_seed(message).await; + + Ok(()) + } + + async fn broadcast_network_data(&self, data: &NetworkData) { + for (id, peer) in &self.tcp_peers { + let message = ProtocolMessage::NetworkData{peer_id: self.id, data: data.clone()}; + peer.sender.send(message).await.unwrap(); + log(msg!(DEBUG, "Send Transaction message to {id}")); + } + } + + async fn broadcast_block(&self, block: &core::Block) { + for (id, peer) in &self.tcp_peers { + let message = ProtocolMessage::Block { + peer_id: self.id, + height: self.chain.blocks().len() as u64, + block: block.clone() + }; + peer.sender.send(message).await.unwrap(); + log(msg!(DEBUG, "Send Block message to {id}")); + } + } + + pub fn tx(&self) -> mpsc::Sender { + return self.tx.clone() + } + + pub fn exec_tx(&self) -> mpsc::Sender { + return self.exec_tx.clone() + } + + async fn network_data(&mut self, data: &NetworkData) { + match self.chain.apply(data) { + Ok(_) => log(msg!(DEBUG, "NetworkData Applied")), + Err(e) => print_error_chain(e.into()) + }; + } + + async fn connector_cmd(&self, cmd: ConnectorCommand) { + match &self.tcp_connector { + Some(t) => { + match t.send(cmd).await { + Ok(()) => {}, + Err(e) => log(msg!(ERROR, "Failed to Send Command to connector: {}", e)) + } + } + None => log(msg!(ERROR, "No Connector Availiable")) + } + } + + async fn start_connection_listner(&mut self, addr: SocketAddr) { + + log(msg!(DEBUG, "Starting Connection Listener")); + let (con_tx, con_rx) = mpsc::channel::(100); + + self.tcp_connector = Some(con_tx); + + self.listner_handle = Some(tokio::spawn({ + let mut connector = Connector::new(self.id, addr, self.exec_tx(), con_rx); + log(msg!(DEBUG, "Connector Build")); + async move { + connector.start().await + } + })); + + } + + async fn connect_to_seed(&mut self) { + self.connector_cmd(ConnectorCommand::ConnectToTcpSeed(SEED_NODES[0])).await; + } + + pub async fn run(&mut self) { + + if let Some(addr) = self.addr { + self.start_connection_listner(addr).await; + } else { + self.start_connection_listner(SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::new(0,0,0,0)), 8080)).await; + }; + + log(msg!(INFO, "Started Node")); + + while let Some(command) = self.rx.recv().await { + match command { + NodeCommand::BootStrap => { + log(msg!(DEBUG, "Received NodeCommand::BootStrap")); + let _ = self.bootstrap().await; + }, + NodeCommand::StartListner(addr) => { + self.start_connection_listner(addr).await; + }, + NodeCommand::ConnectToSeeds => { + self.connect_to_seed().await; + }, + NodeCommand::ConnectTcpPeer(addr) => { + log(msg!(DEBUG, "Received ConnectToPeer: {addr}")); + if let Ok(addr_sock) = addr.parse::() { + let mes = ConnectorCommand::ConnectToTcpPeer(addr_sock); + self.connector_cmd(mes).await; + } else { + log(msg!(ERROR, "Failed to Parse to sock_addr: {addr}")); + } + }, + NodeCommand::PingAddr(addr) => { + if let Ok(addr_sock) = addr.parse::() { + let mes = ProtocolMessage::Ping { peer_id: self.id }; + self.send_message_to_peer_addr(addr_sock, mes).await; + } else { + log(msg!(ERROR, "Failed to Parse to sock_addr: {addr}")); + } + } + NodeCommand::PingId(id) => { + if let Ok(id) = id.parse::() { + let mes = ProtocolMessage::Ping { peer_id: self.id }; + self.send_message_to_peer_id(id, mes).await; + } else { + log(msg!(ERROR, "Failed to Parse to sock_addr: {id}")); + } + } + NodeCommand::AddPeer(peer) => { + self.add_tcp_peer(peer).await; + }, + NodeCommand::RemovePeer { peer_id } => { + self.remove_tcp_peer(peer_id).await; + } + NodeCommand::ProcessMessage { peer_id, message } => { + self.process_message(peer_id, &message).await; + }, + NodeCommand::ProcessNetworkData(data) => { + self.network_data(&data).await; + self.broadcast_network_data(&data).await; + }, + NodeCommand::CreateBlock => { + log(msg!(DEBUG, "Received CreateBlock Command")); + let block = self.chain.create_block(); + self.broadcast_block(&block).await; + }, + NodeCommand::ListBlocks => { + log(msg!(DEBUG, "Received DebugListBlocks command")); + log(self.chain.list_blocks()); + }, + NodeCommand::ListPeers => { + log(msg!(DEBUG, "Received DebugListPeers command")); + log(self.list_peers()); + }, + NodeCommand::ShowId => { + log(msg!(DEBUG, "Received DebugListBlocks command")); + self.show_id().await; + }, + NodeCommand::DumpBlocks(s) => { + self.chain.dump_blocks(s); + }, + NodeCommand::Exit => { + log(msg!(DEBUG, "Node Exit")); + break ; + } + } + } + } +} diff --git a/node/src/protocol/connection.rs b/node/src/protocol/connection.rs new file mode 100644 index 0000000..d0ef99c --- /dev/null +++ b/node/src/protocol/connection.rs @@ -0,0 +1,92 @@ +use tokio::net; +use crate::protocol::ProtocolMessage; +use crate::watcher::ExecutorCommand; +use crate::native_node::node; +use tokio::sync::mpsc; + +use super::Connector; + +use vlogger::*; + +#[allow(dead_code)] +#[derive(Debug)] +pub struct Connection { + node_id: uuid::Uuid, + peer_id: uuid::Uuid, + stream: net::TcpStream, + exec_tx: mpsc::Sender, + rx: mpsc::Receiver, +} + +impl Connection { + pub fn new( + node_id: uuid::Uuid, + peer_id: uuid::Uuid, + stream: net::TcpStream, + exec_tx: mpsc::Sender, + rx: mpsc::Receiver, + ) -> Self { + Self { + node_id, + peer_id, + stream, + rx, + exec_tx + } + } + + async fn log(&self, msg: String) { + let _ = self.exec_tx.send(ExecutorCommand::Print(msg)).await; + } + + pub async fn start(mut self) { + tokio::spawn(async move { + self.log(msg!(DEBUG, "Started Message Handler for {}", self.peer_id)).await; + + loop { + tokio::select! { + response_result = self.rx.recv() => { + match response_result { + Some(response) => { + if let Err(e) = Connector::send_message(&mut self.stream, &response).await { + self.log(msg!(ERROR, "Failed to send response to {}: {}", self.peer_id, e)).await; + break; + } + }, + None => { + self.log(msg!(DEBUG, "Response channel closed for {}", self.peer_id)).await; + break; + } + } + } + + message_result = Connector::receive_message(&mut self.stream) => { + match message_result { + Ok(message) => { + self.log(msg!(DEBUG, "Received Message from {}", self.peer_id)).await; + + let command = ExecutorCommand::Node(node::NodeCommand::ProcessMessage { + peer_id: self.peer_id, + message: message.clone() + }); + + if self.exec_tx.send(command).await.is_err() { + self.log(msg!(ERROR, "Failed to send command to main thread from {}", self.peer_id)).await; + break; + } + }, + Err(e) => { + self.log(msg!(WARNING, "Connection to {} closed: {}", self.peer_id, e.message)).await; + let cmd = ExecutorCommand::Node(node::NodeCommand::RemovePeer { + peer_id: self.peer_id + }); + self.exec_tx.send(cmd).await.unwrap(); + break; + } + } + } + } + } + }); + } +} diff --git a/node/src/protocol/connector.rs b/node/src/protocol/connector.rs new file mode 100644 index 0000000..01fde84 --- /dev/null +++ b/node/src/protocol/connector.rs @@ -0,0 +1,272 @@ +use anyhow::Context; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net; +use std::net::SocketAddr; +use tokio::sync::mpsc; +use vlogger::*; + +use crate::error::print_error_chain; +use crate::log; + +use thiserror::*; +use super::Connection; +use crate::native_node::error; +use crate::protocol::ProtocolMessage; +use crate::watcher::ExecutorCommand; +use crate::native_node::node; +use crate::event_bus::*; + +pub enum ConnectorCommand { + ConnectToTcpPeer(SocketAddr), + ConnectToTcpSeed(SocketAddr), +} + +pub struct Connector { + node_id: uuid::Uuid, + addr: SocketAddr, + exec_tx: mpsc::Sender, + rx: mpsc::Receiver, +} + +#[derive(Error, Debug)] +pub enum ConnectorError { + #[error("Connection failed")] + ConnectionError(#[from] anyhow::Error), +} + +const MAX_LISTNER_TRIES: usize = 5; + +impl Connector { + + pub fn new( + node_id: uuid::Uuid, + addr: SocketAddr, + exec_tx: mpsc::Sender, + rx: mpsc::Receiver, + ) -> Self { + Self { + node_id, + addr, + exec_tx, + rx + } + } + + pub async fn start(&mut self) { + let mut listner: Option = None; + let mut listner_err = None; + for _ in 0..MAX_LISTNER_TRIES { + match tokio::net::TcpListener::bind(self.addr).await { + Ok(l) => { + log(msg!(DEBUG, "Listening on address: {}", self.addr)); + listner = Some(l); + break ; + } + Err(e) => { + self.addr.set_port(self.addr.port() + 1); + listner_err = Some(e); + } + }; + } + if let Some(listener) = listner { + loop { + tokio::select! { + cmd_result = self.rx.recv() => { + match cmd_result { + Some(cmd) => { + self.execute_cmd(cmd).await; + } + None => { + log(msg!(DEBUG, "Command channel closed")); + break; + } + } + } + accept_result = listener.accept() => { + match accept_result { + Ok((stream, addr)) => { + log(msg!(DEBUG, "Accepted connection from {}", addr)); + self.establish_connection_inbound(stream, addr).await; + } + Err(e) => { + log(msg!(ERROR, "Failed to accept connection: {}", e)); + } + } + } + } + } + } else { + log(msg!(FATAL, "Failed to start TCP Listener: {}", listner_err.unwrap())); + } + } + + async fn execute_cmd(&mut self, cmd: ConnectorCommand) { + match cmd { + ConnectorCommand::ConnectToTcpPeer(addr) => { + self.connect_to_peer(addr).await + } + ConnectorCommand::ConnectToTcpSeed(addr) => { + self.connect_to_seed(addr).await; + } + } + } + + pub async fn connect_to_seed(&self, addr: SocketAddr) { + match net::TcpStream::connect(addr).await + .with_context(|| format!("Connecting to {}", addr)){ + Ok(stream) => { + self.establish_connection_to_seed(stream, addr).await + } + Err(e) => { + // let err = ConnectorError::ConnectionError(e.into()); + print_error_chain(e.into()); + } + } + } + + pub async fn connect_to_peer(&self, addr: SocketAddr) { + match net::TcpStream::connect(addr).await { + Ok(stream) => { + self.establish_connection_outbound(stream, addr).await + } + Err(e) => { + let err = ConnectorError::ConnectionError(e.into()); + print_error_chain(err.into()); + } + } + } + + pub async fn establish_connection_to_seed( + &self, + mut stream: tokio::net::TcpStream, + addr: SocketAddr, + ) { + let handshake = ProtocolMessage::Handshake { + peer_id: self.node_id, + version: "".to_string() + }; + match Connector::send_message(&mut stream, &handshake).await { + Ok(()) => { + if let Ok(mes) = Connector::receive_message(&mut stream).await { + let (ch_tx, ch_rx) = mpsc::channel::(100); + let peer = match mes { + ProtocolMessage::HandshakeAck { peer_id, .. } => { + node::TcpPeer::new(peer_id, addr, ch_tx) + } + _ => { + log(msg!(ERROR, "Invalid Message On Connetion Establishment: {mes}")); + return ; + } + }; + let cmd = ExecutorCommand::Node(node::NodeCommand::AddPeer(peer.clone())); + publish_network_event(NetworkEvent::SeedConnected(addr.to_string())); + let _ = self.exec_tx.send(cmd).await; + Connection::new(self.node_id, peer.id, stream, self.exec_tx.clone(), ch_rx).start().await; + } + } + Err(e) => print_error_chain(e.into()) + } + } + + async fn establish_connection_outbound( + &self, + mut stream: tokio::net::TcpStream, + addr: SocketAddr, + ) { + let handshake = ProtocolMessage::Handshake { + peer_id: self.node_id, + version: "".to_string() + }; + match Connector::send_message(&mut stream, &handshake).await { + Ok(()) => { + if let Ok(mes) = Connector::receive_message(&mut stream).await { + let (ch_tx, ch_rx) = mpsc::channel::(100); + let peer = match mes { + ProtocolMessage::HandshakeAck { peer_id, .. } => { + node::TcpPeer::new(peer_id, addr, ch_tx) + } + _ => { + log(msg!(ERROR, "Invalid Message On Connetion Establishment: {mes}")); + return ; + } + }; + let cmd = ExecutorCommand::Node(node::NodeCommand::AddPeer(peer.clone())); + let _ = self.exec_tx.send(cmd).await; + Connection::new(self.node_id, peer.id, stream, self.exec_tx.clone(), ch_rx).start().await; + } + } + Err(e) => print_error_chain(e.into()), + } + } + + async fn establish_connection_inbound( + &self, + mut stream: tokio::net::TcpStream, + addr: SocketAddr, + ) { + if let Ok(mes) = Connector::receive_message(&mut stream).await { + let (ch_tx, ch_rx) = mpsc::channel::(100); + let peer = match mes { + ProtocolMessage::Handshake { peer_id, .. } => { + let ack = ProtocolMessage::HandshakeAck { + peer_id: self.node_id, + version: "".to_string() + }; + match Connector::send_message(&mut stream, &ack).await { + Ok(()) => node::TcpPeer::new(peer_id, addr, ch_tx), + Err(e) => return print_error_chain(e.into()), + } + } + _ => { + log(msg!(ERROR, "Invalid Message On Connetion Establishment: {mes}")); + return ; + } + }; + let cmd = ExecutorCommand::Node(node::NodeCommand::AddPeer(peer.clone())); + let _ = self.exec_tx.send(cmd).await; + Connection::new(self.node_id, peer.id, stream, self.exec_tx.clone(), ch_rx).start().await; + } + } + + pub async fn send_message(stream: &mut net::TcpStream, message: &ProtocolMessage) -> Result<(), error::NetworkError> { + let json = serde_json::to_string(message) + .map_err(|e| error::NetworkError { message: format!("Failed to serialize: {}", e) })?; + let data = json.as_bytes(); + + let len = data.len() as u32; + stream.write_all(&len.to_be_bytes()).await + .map_err(|e| error::NetworkError { message: format!("Failed to write data: {}", e) })?; + + stream.write_all(data).await + .map_err(|e| error::NetworkError { message: format!("Failed to write data: {}", e) })?; + stream.flush().await + .map_err(|e| error::NetworkError { message: format!("Failed to flush stream: {}", e) })?; + Ok(()) + } + + pub async fn receive_message(stream: &mut tokio::net::TcpStream) -> Result { + let mut len_bytes = [0u8; 4]; + stream.read_exact(&mut len_bytes).await + .map_err(|e| error::NetworkError { message: format!("Failed to read length: {}", e) })?; + + let len = u32::from_be_bytes(len_bytes) as usize; + + if len >= super::message::MAX_MESSAGE_SIZE { + return Err(error::NetworkError{ + message: format!("NetworkError: Inbound Message too large: max = {}, got = {len}", super::message::MAX_MESSAGE_SIZE) + }) + } + + let mut data = vec![0u8; len]; + stream.read_exact(&mut data).await + .map_err(|e| error::NetworkError { message: format!("Failed to read data: {}", e) })?; + + let json = String::from_utf8(data) + .map_err(|e| error::NetworkError { message: format!("Invalid UTF-8: {}", e) })?; + + let message: ProtocolMessage = serde_json::from_str(&json) + .map_err(|e| error::NetworkError { message: format!("JSON parse error: {}", e) })?; + + Ok(message) + } +} diff --git a/node/src/protocol/message.rs b/node/src/protocol/message.rs new file mode 100644 index 0000000..a781e96 --- /dev/null +++ b/node/src/protocol/message.rs @@ -0,0 +1,69 @@ +use crate::core::{self, NetworkData}; +use std::net::SocketAddr; +use std::fmt; + +pub const MAX_MESSAGE_SIZE: usize = 1_000_000; + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] +pub enum ProtocolMessage { + BootstrapRequest { + peer_id: uuid::Uuid, + version: String + }, + BootstrapResponse { + blocks: String + }, + GetPeersRequest { + peer_id: uuid::Uuid + }, + GetPeersResponse { + peer_addresses: Vec + }, + Handshake { peer_id: uuid::Uuid, version: String }, + HandshakeAck { peer_id: uuid::Uuid, version: String }, + Block { peer_id: uuid::Uuid, height: u64, block: core::Block }, + NetworkData { peer_id: uuid::Uuid, data: NetworkData }, + Ping { peer_id: uuid::Uuid }, + Pong { peer_id: uuid::Uuid }, + Disconnect { peer_id: uuid::Uuid }, +} + +impl fmt::Display for ProtocolMessage { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ProtocolMessage::BootstrapRequest { peer_id, version } => { + write!(f, "BootstrapRequest from {} (v{})", peer_id, version) + } + ProtocolMessage::BootstrapResponse { blocks } => { + write!(f, "BootstrapResponse with {} blocks", blocks.len()) + } + ProtocolMessage::GetPeersRequest { peer_id } => { + write!(f, "GetPeersRequest from {}", peer_id) + } + ProtocolMessage::GetPeersResponse { peer_addresses } => { + write!(f, "GetPeersResponse with {} peers", peer_addresses.len()) + } + ProtocolMessage::Handshake { peer_id, version } => { + write!(f, "Handshake from {} (v{})", peer_id, version) + } + ProtocolMessage::HandshakeAck { peer_id, version } => { + write!(f, "HandshakeAck from {} (v{})", peer_id, version) + } + ProtocolMessage::Block { peer_id, height, block: _ } => { + write!(f, "Block #{} from {}", height, peer_id) + } + ProtocolMessage::NetworkData { peer_id, data: _ } => { + write!(f, "NetworkData from {}", peer_id) + } + ProtocolMessage::Ping { peer_id } => { + write!(f, "Ping from {}", peer_id) + } + ProtocolMessage::Pong { peer_id } => { + write!(f, "Pong from {}", peer_id) + } + ProtocolMessage::Disconnect { peer_id } => { + write!(f, "Disconnect from {}", peer_id) + } + } + } +} diff --git a/src/seeds_constants.rs b/node/src/seeds_constants.rs similarity index 100% rename from src/seeds_constants.rs rename to node/src/seeds_constants.rs diff --git a/src/watcher/executor.rs b/node/src/watcher/executor.rs similarity index 54% rename from src/watcher/executor.rs rename to node/src/watcher/executor.rs index f0438d6..9e58d82 100644 --- a/src/watcher/executor.rs +++ b/node/src/watcher/executor.rs @@ -1,4 +1,5 @@ -use crate::{native_node::node::NodeCommand, watcher::{ watcher::Watcher, renderer::* }}; +use crate::{event_bus::publish_render_event, log, native_node::node::NodeCommand, watcher::renderer::*}; +use thiserror::Error; use tokio::sync::mpsc; use vlogger::*; @@ -13,25 +14,35 @@ pub enum ExecutorCommand { Exit } +#[derive(Debug, Error)] +pub enum InProcessError { + #[error("TODO: {0}")] + TODO(String), +} + pub struct Executor { - render_tx: mpsc::Sender, node_tx: mpsc::Sender, rx: mpsc::Receiver, exit: bool } impl Executor { - pub fn new(render_tx: mpsc::Sender, node_tx: mpsc::Sender, rx: mpsc::Receiver) -> Self { + pub fn new(node_tx: mpsc::Sender, rx: mpsc::Receiver) -> Self { Self { - render_tx, node_tx, rx, exit: false } } + pub async fn run(&mut self) { + while !self.exit { + self.listen().await; + } + } + async fn exit(&mut self) { - self.render_string(msg!(DEBUG, "Executor Exit")).await; + log(msg!(DEBUG, "Executor Exit")); self.exit = true } @@ -46,45 +57,22 @@ impl Executor { } async fn handle_node_cmd(&self, cmd: NodeCommand) { - Watcher::log(&self.render_tx, msg!(INFO, "Received Node Command: {cmd:?}")).await; self.send_node_cmd(cmd).await; - - // match cmd { - // NodeCommand::AddPeer { peer_id, addr, sender } => {}, - // NodeCommand::RemovePeer { peer_id } => {}, - // NodeCommand::ProcessMessage { peer_id, message } => {}, - // NodeCommand::Transaction { tx } => {}, - // NodeCommand::CreateBlock => {}, - // NodeCommand::DebugListBlocks => {}, - // NodeCommand::DebugListPeers => {}, - // NodeCommand::DebugShowId => {}, - // NodeCommand::DebugDumpBlocks => {}, - // NodeCommand::ConnectToSeeds => {} - // } - } - - pub async fn render_string(&self, str: String) { - let rd_cmd = RenderCommand::RenderStringToPane{ - str, - pane: RenderPane::CliOutput - }; - let _ = self.render_tx.send(rd_cmd).await; } async fn echo(&self, s: Vec) { - Watcher::log(&self.render_tx, msg!(INFO, "Exec: Recieved echo command")).await; let mut str = s.join(" "); str.push_str("\n"); let rd_cmd = RenderCommand::RenderStringToPane{ str, pane: RenderPane::CliOutput }; - let _ = self.render_tx.send(rd_cmd).await; + publish_render_event(rd_cmd); } async fn clear(&self, p: RenderPane) { let rd_cmd = RenderCommand::ClearPane(p); - let _ = self.render_tx.send(rd_cmd).await; + publish_render_event(rd_cmd); } async fn invalid_command(&self, str: String) { @@ -92,24 +80,18 @@ impl Executor { str, pane: RenderPane::CliOutput }; - let _ = self.render_tx.send(rd_cmd).await; + publish_render_event(rd_cmd); } async fn execute(&mut self, cmd: ExecutorCommand) { match cmd { - ExecutorCommand::NodeResponse(resp) => self.render_string(resp).await, + ExecutorCommand::NodeResponse(resp) => log(resp), ExecutorCommand::Node(n) => self.handle_node_cmd(n).await, ExecutorCommand::Clear(p) => self.clear(p).await, ExecutorCommand::Echo(s) => self.echo(s).await, - ExecutorCommand::Print(s) => self.render_string(s).await, + ExecutorCommand::Print(s) => log(s), ExecutorCommand::InvalidCommand(str) => self.invalid_command(str).await, ExecutorCommand::Exit => self.exit().await, } } - - pub async fn run(&mut self) { - while !self.exit { - self.listen().await; - } - } } diff --git a/src/watcher/parser.rs b/node/src/watcher/parser.rs similarity index 82% rename from src/watcher/parser.rs rename to node/src/watcher/parser.rs index 0ee2b5c..e15c2bb 100644 --- a/src/watcher/parser.rs +++ b/node/src/watcher/parser.rs @@ -36,27 +36,26 @@ impl Parser { } pub async fn run(&mut self) { + self.log(msg!(INFO, "Started Parser")).await; while !self.exit { self.listen().await; } } - pub async fn log(&self, msg: String) { - if let Err(e) = self.exec_tx.send(ExecutorCommand::Echo(vec![msg])).await { + async fn log(&self, msg: String) { + if let Err(e) = self.exec_tx.send(ExecutorCommand::Print(msg)).await { log!(ERROR, "Error response from exec: {e}"); } } async fn listen(&mut self) { if let Ok(Some(mes)) = timeout(Duration::from_millis(400), self.rx.recv()).await { - self.log(msg!(INFO, "Parser: Received message from watcher")).await; match mes { ParserCommand::ParseCmdString(s) => { let argv: Vec<&str> = std::iter::once(" ") .chain(s.split_whitespace()) .collect(); let cmd = cli(&argv); - self.log(msg!(INFO, "Sending command {cmd:?} to exec")).await; let _ = self.exec_tx.send(cmd).await; }, ParserCommand::Exit => { diff --git a/node/src/watcher/renderer.rs b/node/src/watcher/renderer.rs new file mode 100644 index 0000000..7d661a6 --- /dev/null +++ b/node/src/watcher/renderer.rs @@ -0,0 +1,310 @@ +use crossterm::event::KeyCode; +use ratatui::prelude::*; +use ratatui::widgets::Wrap; +use ratatui::{ + buffer::Buffer, + layout::Rect, + symbols::border, + widgets::{Block, Paragraph, Widget, List}, + Frame, +}; + +use vlogger::*; + +use crate::event_bus::subscribe_render_event; +use tokio::time::{timeout, Duration}; +use std::io; + +#[derive(Debug)] +pub struct Renderer { + buffer: String, + exit: bool, + layout: RenderLayout, +} + +#[derive(Debug)] +pub struct Pane { + title: Option, + target: RenderPane, + buffer: RenderBuffer, +} + +#[derive(Debug, PartialEq, Clone, clap::ValueEnum)] +pub enum RenderPane { + #[value(name = "all", aliases = ["a"])] + All, + #[value(aliases = ["i", "in"])] + CliInput, + #[value(aliases = ["o", "out"])] + CliOutput +} + +#[derive(Clone, Debug)] +enum RenderBuffer { + List{ list: Vec, index: usize }, + String(String) +} + +impl Pane { + fn render(&self, area: Rect, buf: &mut Buffer) { + let block = Block::bordered() + .title({ + if let Some(t) = &self.title { + t.clone() + } else { + Default::default() + } + }) + .border_set(border::PLAIN); + match &self.buffer { + RenderBuffer::String(s) => { + let inner_area = block.inner(area); + let content_width = inner_area.width as usize; + let content_height = inner_area.height as usize; + let wrapped_lines = s + .lines() + .map(|line| { + if line.is_empty() { + 1 + } else { + (line.len() + content_width - 1) / content_width + } + }) + .sum::(); + let scroll_offset = if wrapped_lines > content_height { + wrapped_lines - content_height + } else { + 0 + }; + Paragraph::new(s.clone()) + .wrap(Wrap::default()) + .left_aligned() + .block(block) + .scroll((scroll_offset as u16, 0)) + .render(area, buf); + } + RenderBuffer::List { list, .. } => { + let items: Vec = list + .iter() + .map(|s| { + format!("> {s}") + }) + .collect(); + Widget::render(List::new(items).block(block), area, buf); + } + } + } +} + +#[derive(Clone, Debug)] +pub enum RenderCommand { + RenderStringToPane{ + str: String, + pane: RenderPane + }, + RenderInput(KeyCode), + ListMove{ + pane: RenderPane, + index: usize + }, + ChangeLayout(RenderLayoutKind), + ClearPane(RenderPane), + Exit, +} + +#[derive(Debug, Clone)] +pub enum RenderLayoutKind { + Cli, +} + +#[derive(Debug)] +pub struct RenderLayout { + kind: RenderLayoutKind, + panes: Vec, +} + +impl RenderLayoutKind { + pub fn rects(&self, area: Rect) -> std::rc::Rc<[Rect]> { + match self { + Self::Cli => { + Layout::default() + .direction(Direction::Horizontal) + .constraints(vec![ + Constraint::Percentage(30), + Constraint::Percentage(70) + ]) + .split(area) + } + } + } + + pub fn generate(&self) -> RenderLayout { + RenderLayout { + kind: self.clone(), + panes: vec![ + Pane { + title: Some(" Input Pane ".to_string()), + target: RenderPane::CliInput, + buffer: RenderBuffer::List{ + list: vec![String::new()], + index: 0 + } + }, + Pane { + title: Some(" Output Pane ".to_string()), + target: RenderPane::CliOutput, + buffer: RenderBuffer::String(String::new()), + } + ] + } + } +} + +#[allow(dead_code)] +impl Renderer { + pub fn new(layout: RenderLayoutKind) -> Self { + Self { + buffer: String::new(), + exit: false, + layout: layout.generate(), + } + } + + pub async fn run(&mut self) -> io::Result<()> { + self.log(msg!(INFO, "Started Renderer")); + let mut rx = subscribe_render_event(); + let mut terminal = ratatui::init(); + while !self.exit { + terminal.draw(|frame| self.draw(frame))?; + if let Ok(mes) = self.listen(&mut rx).await { + self.apply(mes); + } + } + ratatui::restore(); + Ok(()) + } + + fn log(&mut self, msg: String) { + self.buffer.push_str(&msg) + } + + pub fn draw(&self, frame: &mut Frame) { + frame.render_widget(self, frame.area()); + } + + fn exit(&mut self) { + log!(DEBUG, "Renderer Exit"); + self.exit = true; + } + + fn buffer_extend>(&mut self, input: S) { + self.buffer.push_str(input.as_ref()); + } + + fn input_pane(&mut self) -> Option<&mut Pane> { + self.layout.panes.iter_mut().find(|p| p.target == RenderPane::CliInput) + } + + fn apply(&mut self, mes: RenderCommand) { + match mes { + RenderCommand::RenderInput(k) => { + if let Some(p) = self.input_pane() { + match k { + KeyCode::Char(c) => { + if let RenderBuffer::List{list, index} = &mut p.buffer { + list[*index].push(c); + } + } + KeyCode::Backspace => { + if let RenderBuffer::List{list, index} = &mut p.buffer { + list[*index].pop(); + } + } + KeyCode::Enter => { + if let RenderBuffer::List{list, index} = &mut p.buffer { + list.push(String::new()); + *index += 1; + } + } + _ => {} + } + } + }, + + RenderCommand::ListMove{ pane, index } => { + if let Some(p) = self.layout.panes.iter_mut().find(|p| p.target == pane) { + if let RenderBuffer::List { list, index: idx } = &mut p.buffer { + if index > 0 && index < list.len() { + list[*idx] = list[index].clone(); + } + } + } + } + + RenderCommand::RenderStringToPane{ str, pane } => { + if let Some(p) = self.layout.panes.iter_mut().find(|p| p.target == pane) { + match &mut p.buffer { + RenderBuffer::List { list, index } => { + list.push(str); + *index += 1; + } + RenderBuffer::String(s) => { + s.push_str(&str) + } + } + } + } + RenderCommand::Exit => { + self.exit(); + } + RenderCommand::ChangeLayout(l) => { + match l { + RenderLayoutKind::Cli => { + self.layout = l.generate(); + } + } + } + RenderCommand::ClearPane(pane) => { + if matches!(pane, RenderPane::All) { + for p in self.layout.panes.iter_mut() { + match &mut p.buffer { + RenderBuffer::List { list, index } => { + list.clear(); + *index = 0; + list.push(String::new()); + } + RenderBuffer::String(s) => s.clear() + } + } + } else if let Some(p) = self.layout.panes.iter_mut().find(|p| p.target == pane) { + match &mut p.buffer { + RenderBuffer::List { list, index } => { + list.clear(); + *index = 0; + list.push(String::new()); + } + RenderBuffer::String(s) => s.clear() + } + } + } + } + } + + async fn listen(&mut self, rx: &mut tokio::sync::broadcast::Receiver) -> Result { + if let Ok(Ok(mes)) = timeout(Duration::from_millis(400), rx.recv()).await { + return Ok(mes); + } + Err(()) + } +} + +impl Widget for &Renderer { + fn render(self, area: Rect, buf: &mut Buffer) { + + let layout = self.layout.kind.rects(area); + + for (i, p) in self.layout.panes.iter().enumerate() { + p.render(layout[i], buf) + } + } +} diff --git a/node/src/watcher/watcher.rs b/node/src/watcher/watcher.rs new file mode 100644 index 0000000..cab54e5 --- /dev/null +++ b/node/src/watcher/watcher.rs @@ -0,0 +1,254 @@ +use crossterm::event::{self, Event, KeyCode, KeyEventKind}; +use memory_stats::memory_stats; +use tokio::sync::mpsc; +use std::{io::{self, Write}, net::SocketAddr, time::Duration}; + +use crate::{event_bus::NetworkEvent, native_node::node::{NativeNode, NodeCommand}}; +use vlogger::*; + +use super::*; + +use crate::log; +use crate::event_bus::publish_render_event; + +#[allow(dead_code)] +pub struct Watcher { + parser_tx: mpsc::Sender, + node_tx: mpsc::Sender, + exec_tx: mpsc::Sender, + cmd_buffer: String, + cmd_history: Vec, + history_index: usize, + handles: Vec> +} + +impl Watcher { + pub fn build() -> WatcherBuilder { + WatcherBuilder::new() + } + + pub fn parser_tx(&self) -> mpsc::Sender { + self.parser_tx.clone() + } + + pub fn exec_tx(&self) -> mpsc::Sender { + self.exec_tx.clone() + } + + pub async fn exit(self) { + let rd_mes = RenderCommand::Exit; + let pr_mes = ParserCommand::Exit; + let exec_mes = ExecutorCommand::Exit; + let node_mes = NodeCommand::Exit; + publish_render_event(rd_mes); + let _ = self.parser_tx.send(pr_mes).await; + let _ = self.exec_tx.send(exec_mes).await; + let _ = self.node_tx.send(node_mes).await; + } + + pub async fn log_memory() { + tokio::spawn(async move { + let id = format!("{}_{}", current_timestamp(), std::process::id()); + let mut path = std::path::PathBuf::new(); + path.push("./proc/"); + path.push(id); + let mut mem_map = std::fs::OpenOptions::new().create(true).write(true).truncate(true).open(path).unwrap(); + loop { + let _ = tokio::time::sleep(Duration::from_secs(30)).await; + if let Some(usage) = memory_stats() { + let current = current_timestamp(); + let _ = mem_map.write_all(msg!(INFO, "{}: Physical memory usage: {} MB", current, usage.physical_mem / 1024 / 1024).as_bytes()); + let _ = mem_map.write_all(msg!(INFO, "{}: Virtual memory usage: {} MB", current, usage.virtual_mem / 1024 / 1024).as_bytes()); + } + } + }); + } + + + pub async fn poll(&mut self) -> io::Result { + match event::read()? { + Event::Key(k) if k.kind == KeyEventKind::Press => { + match k.code { + KeyCode::Char(c) => { + self.cmd_buffer.push(c); + let message = RenderCommand::RenderInput(k.code); + publish_render_event(message); + } + KeyCode::Backspace => { + self.cmd_buffer.pop(); + let message = RenderCommand::RenderInput(k.code); + publish_render_event(message); + }, + KeyCode::Enter => { + let rd_mes = RenderCommand::RenderInput(k.code); + let pr_mes = ParserCommand::ParseCmdString(self.cmd_buffer.clone()); + let _ = self.parser_tx.send(pr_mes).await; + publish_render_event(rd_mes); + self.cmd_buffer.clear(); + } + KeyCode::Up => { + if self.history_index > 0 { + self.history_index -= 1; + let rd_mes = RenderCommand::ListMove{ + pane: RenderPane::CliInput, + index: self.history_index, + }; + publish_render_event(rd_mes); + } + } + KeyCode::Down => { + if self.history_index < self.cmd_buffer.len() { + self.history_index += 1; + let rd_mes = RenderCommand::ListMove{ + pane: RenderPane::CliInput, + index: self.history_index, + }; + publish_render_event(rd_mes); + } + } + KeyCode::Esc => { + return Ok(false); + } + _ => {} + }; + } + _ => {} + } + Ok(true) + } +} + +#[derive(Default)] +pub struct WatcherBuilder { + addr: Option, + seed_file: Option, + bootstrap: bool, + debug: bool, + seed: bool, + render: bool, +} + +impl WatcherBuilder { + fn new() -> Self { + Self::default() + } + + pub fn addr(mut self, addr: Option) -> Self { + self.addr = addr; + self + } + + pub fn file(mut self, seed_file: Option) -> Self { + self.seed_file = seed_file; + self.seed = true; + self + } + + pub fn debug(mut self, debug: bool) -> Self { + self.debug = debug; + self + } + + pub fn bootstrap(mut self, bootstrap: bool) -> Self { + self.bootstrap = bootstrap; + self + } + + pub fn render(mut self, render: bool) -> Self { + self.render = render; + self + } + + pub fn seed(mut self, seed: bool) -> Self { + self.seed = seed; + self + } + + pub async fn start(mut self) -> Watcher { + let (parser_tx, parser_rx) = mpsc::channel::(100); + let (exec_tx, exec_rx) = mpsc::channel::(100); + + + if self.debug { + Watcher::log_memory().await; + } + let render_handle = if self.render { + Some(tokio::spawn({ + async move { + let _ = Renderer::new(RenderLayoutKind::Cli).run().await; + } + })) + } else { + None + }; + let blocks = self.seed_file + .as_ref() + .and_then(|path| std::fs::read_to_string(path).ok()) + .unwrap_or_default(); + + if self.seed { + self.addr = Some(crate::seeds_constants::SEED_NODES[0]); + } + + let mut node = NativeNode::new(self.addr.clone(), &blocks, exec_tx.clone()).await; + log(msg!(INFO, "Build Node")); + + let parser_handle = tokio::spawn({ + let exec_tx = exec_tx.clone(); + async move { + let _ = Parser::new(parser_rx, exec_tx).run().await; + } + }); + + let executor_handle = tokio::spawn({ + let node_tx = node.tx(); + async move { + let _ = Executor::new(node_tx, exec_rx).run().await; + } + }); + + let node_tx = node.tx(); + + let node_handle = tokio::spawn({ + async move { + node.run().await; + } + }); + + if self.bootstrap { + let exec_tx = exec_tx.clone(); + + tokio::spawn(async move { + let seed_cmd = ExecutorCommand::Node(NodeCommand::ConnectToSeeds); + let mut ev_rx = crate::event_bus::subscribe_network_event(); + let _ = exec_tx.send(seed_cmd).await; + + while let Ok(e) = ev_rx.recv().await { + match e { + NetworkEvent::SeedConnected(_) => { + let bootstrap_cmd = ExecutorCommand::Node(NodeCommand::BootStrap); + let _ = exec_tx.send(bootstrap_cmd).await; + } + _ => {} + } + } + }); + } + + Watcher { + node_tx, + cmd_history: Vec::new(), + history_index: 0, + parser_tx, + exec_tx, + cmd_buffer: String::new(), + handles: { + let mut h = vec![parser_handle, executor_handle, node_handle]; + if render_handle.is_some() { + h.push(render_handle.unwrap()); + } + h + } + } + } +} diff --git a/src/core.rs b/src/core.rs deleted file mode 100644 index b8e71de..0000000 --- a/src/core.rs +++ /dev/null @@ -1,7 +0,0 @@ -pub mod block; -pub mod blockchain; -pub mod tx; - -pub use block::*; -pub use blockchain::*; -pub use tx::*; diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index c86badb..0000000 --- a/src/main.rs +++ /dev/null @@ -1,37 +0,0 @@ -pub mod error; -pub mod args; -pub mod core; -pub mod native_node; -pub mod seeds_constants; -pub mod watcher; -pub mod cli; - -use crate::watcher::watcher::Watcher; -use clap::Parser; - -pub async fn log(exec_tx: &tokio::sync::mpsc::Sender, msg: String) { - let cmd = watcher::ExecutorCommand::Print(msg); - let _ = exec_tx.send(cmd).await; -} - -#[tokio::main] -async fn main() { - - let args = args::CliArgs::parse(); - - let mut watcher = Watcher::build() - .file(args.seed_file) - .addr(args.addr) - .seed(args.seed) - .bootstrap(args.bootstrap) - .start().await; - - loop { - if !watcher.poll().await.is_ok_and(|b| b) { - break ; - } - } - - ratatui::restore(); - println!("Hello, world!"); -} diff --git a/src/native_node.rs b/src/native_node.rs deleted file mode 100644 index da02ad9..0000000 --- a/src/native_node.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod node; -pub mod network; -pub mod message; -pub mod error; diff --git a/src/native_node/message.rs b/src/native_node/message.rs deleted file mode 100644 index 6ef85f0..0000000 --- a/src/native_node/message.rs +++ /dev/null @@ -1,117 +0,0 @@ -use std::net::SocketAddr; - -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::net; -use crate::native_node::node; -use crate::native_node::error; -use crate::core; - -use vlogger::*; - -#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] -pub enum ProtocolMessage { - BootstrapRequest { - peer_id: uuid::Uuid, - version: String - }, - BootstrapResponse { - blocks: Vec - }, - GetPeersRequest { - peer_id: uuid::Uuid - }, - GetPeersResponse { - peer_addresses: Vec - }, - Handshake { peer_id: uuid::Uuid, version: String }, - Block { peer_id: uuid::Uuid, height: u64, block: core::Block }, - Transaction{ peer_id: uuid::Uuid, tx: core::Tx }, - Ping { peer_id: uuid::Uuid }, - Pong { peer_id: uuid::Uuid }, - Disconnect { peer_id: uuid::Uuid }, -} - -impl node::NativeNode { - - pub async fn send_message(stream: &mut net::TcpStream, message: &ProtocolMessage) -> Result<(), error::NetworkError> { - // Serialize message to JSON - let json = serde_json::to_string(message) - .map_err(|e| error::NetworkError { message: format!("Failed to serialize: {}", e) })?; - let data = json.as_bytes(); - - // Send length prefix (4 bytes) - let len = data.len() as u32; - stream.write_all(&len.to_be_bytes()).await - .map_err(|e| error::NetworkError { message: format!("Failed to write data: {}", e) })?; - - // Send the actual data - stream.write_all(data).await - .map_err(|e| error::NetworkError { message: format!("Failed to write data: {}", e) })?; - stream.flush().await - .map_err(|e| error::NetworkError { message: format!("Failed to flush stream: {}", e) })?; - Ok(()) - } - - pub async fn receive_message(stream: &mut tokio::net::TcpStream) -> Result { - let mut len_bytes = [0u8; 4]; - stream.read_exact(&mut len_bytes).await - .map_err(|e| error::NetworkError { message: format!("Failed to read length: {}", e) })?; - - let len = u32::from_be_bytes(len_bytes) as usize; - - let mut data = vec![0u8; len]; - stream.read_exact(&mut data).await - .map_err(|e| error::NetworkError { message: format!("Failed to read data: {}", e) })?; - - let json = String::from_utf8(data) - .map_err(|e| error::NetworkError { message: format!("Invalid UTF-8: {}", e) })?; - - let message: ProtocolMessage = serde_json::from_str(&json) - .map_err(|e| error::NetworkError { message: format!("JSON parse error: {}", e) })?; - - Ok(message) - } - - pub async fn process_message(&mut self, peer_id: uuid::Uuid, message: &ProtocolMessage) { - - match message { - ProtocolMessage::BootstrapRequest { .. } => { - self.log(msg!(INFO, "Received BootstrapRequest from {peer_id}")).await; - let peer = &self.tcp_peers[&peer_id]; - let resp = ProtocolMessage::BootstrapResponse { - blocks: self.chain.blocks().to_vec() - }; - peer.sender.send(resp).await.unwrap(); - log!(INFO, "Send BootstrapResponse to {peer_id}"); - }, - ProtocolMessage::BootstrapResponse { blocks } => { - self.log(msg!(INFO, "Received BootstrapResponse from seed")).await; - self.chain = core::Blockchain::build(blocks.to_vec()).unwrap(); - }, - ProtocolMessage::Ping {peer_id} => { - log!(INFO, "Received Ping from {peer_id}"); - let resp = ProtocolMessage::Pong { peer_id: self.id.clone() }; - let peer = &self.tcp_peers[peer_id]; - peer.sender.send(resp).await.unwrap(); - }, - ProtocolMessage::GetPeersRequest { peer_id } => { - log!(INFO, "Received GetPeersRequest from {peer_id}"); - let peers = self.peer_addresses(); - let resp = ProtocolMessage::GetPeersResponse { peer_addresses: peers }; - let peer = &self.tcp_peers[peer_id]; - peer.sender.send(resp).await.unwrap(); - } - ProtocolMessage::Block { block, ..} => { - log!(INFO, "Received Block from {peer_id}"); - self.chain.add_block(block.clone()) - } - ProtocolMessage::Transaction { tx, ..} => { - log!(INFO, "Received Transaction from {peer_id}"); - self.chain.apply(tx).unwrap() - } - _ => { - log!(DEBUG, "TODO: implement this message type"); - } - } - } -} diff --git a/src/native_node/network.rs b/src/native_node/network.rs deleted file mode 100644 index 2705466..0000000 --- a/src/native_node/network.rs +++ /dev/null @@ -1,167 +0,0 @@ -use std::net::SocketAddr; - -use crate::native_node::{message, node}; - -use crate::seeds_constants::SEED_NODES; - -use vlogger::*; -use tokio::select; -use tokio::sync::mpsc; - -impl node::NativeNode { - pub async fn connect_to_seeds(&mut self) { - for addr in SEED_NODES.iter() { - if let Some(a)= self.addr { - if *addr != a { - self.connect_to_peer(*addr).await; - } - } - } - } - - pub async fn connect_to_peer( - &self, - addr: SocketAddr, - ) { - if let Ok(stream) = tokio::net::TcpStream::connect(addr).await { - Self::establish_connection(self.id, None, addr, stream, self.tx().clone()).await; - } - } - - - pub async fn accept_connections( - listner: tokio::net::TcpListener, - request_sender: tokio::sync::mpsc::Sender, - node_id: uuid::Uuid - ) { - log!(INFO, "Starting to accept connections"); - - while let Ok((mut stream, addr)) = listner.accept().await { - if let Ok(message) = node::NativeNode::receive_message(&mut stream).await { - match message { - message::ProtocolMessage::Handshake { peer_id, .. } => { - node::NativeNode::establish_connection(node_id, Some(peer_id), addr, stream, request_sender.clone()).await; - }, - _ => { - log!(WARNING, "Invalid Response! expected Handshake, got {:?}", message); - } - } - } - } - - } - - pub async fn establish_connection( - node_id: uuid::Uuid, - peer_id: Option, - addr: SocketAddr, - mut stream: tokio::net::TcpStream, - request_sender: tokio::sync::mpsc::Sender - ) { - - let handshake_response = message::ProtocolMessage::Handshake { - peer_id: node_id.clone(), - version: "".to_string() - }; - - match peer_id { - Some(id) => { - if let Ok(()) = Self::send_message(&mut stream, &handshake_response).await { - let (response_sender, response_receiver) = mpsc::channel::(100); - - let add_peer = node::NodeCommand::AddPeer { - peer_id: id, - addr: addr.clone(), - sender: response_sender - }; - - if let Err(_) = request_sender.send(add_peer).await { - log!(ERROR, "Failed to send AddPeer to {}", addr); - } - - log!(INFO, "Established Connection with {}", addr); - node::NativeNode::start_peer_handler(stream, id, request_sender.clone(), response_receiver).await; - } - } - None => { - if let Ok(mes) = Self::receive_message(&mut stream).await { - let (response_sender, response_receiver) = mpsc::channel::(100); - - match mes { - message::ProtocolMessage::Handshake { peer_id, .. } => { - let add_peer = node::NodeCommand::AddPeer { - peer_id, - addr: addr.clone(), - sender: response_sender - }; - - if let Err(_) = request_sender.send(add_peer).await { - log!(ERROR, "Failed to send AddPeer to {}", addr); - } - - log!(INFO, "Established Connection with {}", addr); - node::NativeNode::start_peer_handler(stream, peer_id, request_sender.clone(), response_receiver).await; - } - _ => { } - } - } - } - } - } - - async fn start_peer_handler( - mut stream: tokio::net::TcpStream, - peer_id: uuid::Uuid, - request_sender: tokio::sync::mpsc::Sender, - mut response_receiver: tokio::sync::mpsc::Receiver - ) { - let peer_id_clone = peer_id.clone(); - - tokio::spawn(async move { - log!(INFO, "Started Message Handler for {}", peer_id_clone); - - loop { - select! { - response_result = response_receiver.recv() => { - match response_result { - Some(response) => { - log!(INFO, "Sending response to {peer_id_clone}: {:#?}", response); - if let Err(e) = node::NativeNode::send_message(&mut stream, &response).await { - log!(ERROR, "Failed to send response to {peer_id_clone}: {}", e); - break; - } - }, - None => { - log!(INFO, "Response channel closed for {peer_id_clone}"); - break; - } - } - } - message_result = node::NativeNode::receive_message(&mut stream) => { - match message_result { - Ok(message) => { - log!(INFO, "Received Message from {peer_id_clone}"); - - let command = node::NodeCommand::ProcessMessage { - peer_id, - message: message.clone() - }; - - if request_sender.send(command).await.is_err() { - log!(ERROR, "Failed to send command to main thread from {peer_id}"); - break; - } - }, - Err(e) => { - log!(WARNING, "Connection to {peer_id_clone} closed: {}", e.message); - let cmd = node::NodeCommand::RemovePeer { peer_id: peer_id_clone.clone() }; - request_sender.send(cmd).await.unwrap(); - break; - } - } - } - } - } - }); - } -} diff --git a/src/native_node/node.rs b/src/native_node/node.rs deleted file mode 100644 index 612c2a6..0000000 --- a/src/native_node/node.rs +++ /dev/null @@ -1,279 +0,0 @@ -use crate::core::{self, Blockchain, ValidationError}; -use crate::native_node::message::ProtocolMessage; - -use crate::seeds_constants::SEED_NODES; -use crate::watcher::executor::ExecutorCommand; - -use std::collections::HashMap; -use std::net::SocketAddr; -use vlogger::*; -use tokio::sync::mpsc; -use uuid::Uuid; - -pub struct TcpPeer { - pub id: Uuid, - pub addr: SocketAddr, - pub sender: tokio::sync::mpsc::Sender -} - -#[allow(dead_code)] -pub struct NativeNode { - pub id: Uuid, - pub addr: Option, - pub tcp_peers: HashMap, - pub chain: core::Blockchain, - listner_handle: Option>, - exec_tx: mpsc::Sender, - rx: mpsc::Receiver, - tx: mpsc::Sender, -} - -#[derive(Debug)] -pub enum NodeCommand { - AddPeer { peer_id: Uuid, addr: SocketAddr, sender: tokio::sync::mpsc::Sender }, - RemovePeer { peer_id: Uuid }, - ProcessMessage { peer_id: Uuid, message: ProtocolMessage }, - Transaction { tx: core::Tx }, - StartListner(SocketAddr), - CreateBlock, - ListBlocks, - ListPeers, - ShowId, - DumpBlocks(String), - ConnectToSeeds, - ConnectToPeer(String), - Exit, -} - -impl NativeNode { - - pub fn peer_addresses(&self) -> Vec { - let mut addr: Vec = self.tcp_peers.iter().map(|p| p.1.addr.to_string().parse::().unwrap()).collect(); - if let Some(a) = self.addr { - addr.push(a.clone()); - } - addr - } - - pub fn list_peers(&self) -> String { - let mut ret = String::from("Peer List\n-----------"); - for (i, p) in self.tcp_peers.iter().enumerate() { - ret.push_str(format!("Peer #{i}: {}", p.1.id).as_str()) - } - ret - } - - pub async fn show_id(&self) { - self.log(msg!(INFO, "Node Id: {}", self.id)).await - } - - async fn remove_tcp_peer(&mut self, peer_id: Uuid) { - self.log(msg!(INFO, "Removing Peer {peer_id}")).await; - self.tcp_peers.remove_entry(&peer_id); - } - - async fn add_tcp_peer(&mut self, id: Uuid, addr: SocketAddr, sender: tokio::sync::mpsc::Sender) { - let peer = TcpPeer { - id, - addr, - sender - }; - - self.log(msg!(INFO, "Added Peer from address: {addr}")).await; - self.tcp_peers.insert(id, peer); - } - - pub async fn new_with_id( - id: uuid::Uuid, - exec_tx: mpsc::Sender, - addr: Option, - blocks: Option>, - ) -> Self { - let (tx, rx) = mpsc::channel::(100); - Self { - id, - tcp_peers: HashMap::new(), - chain: { - if blocks.is_some() { - crate::log(&exec_tx, msg!(INFO, "Building Chain")).await; - Blockchain::build(blocks.unwrap()).unwrap_or(Default::default()) - } else { - Default::default() - } - }, - addr, - exec_tx, - listner_handle: None, - tx, - rx, - } - } - - pub async fn new( - addr: Option, - blocks: Option>, - exec_tx: mpsc::Sender, - ) -> Self { - let (tx, rx) = mpsc::channel::(100); - Self { - id: Uuid::new_v4(), - tcp_peers: HashMap::new(), - chain: { - if blocks.is_some() { - crate::log(&exec_tx, msg!(INFO, "Building Chain")).await; - Blockchain::build(blocks.unwrap()).unwrap_or(Default::default()) - } else { - Default::default() - } - }, - addr, - exec_tx, - listner_handle: None, - tx, - rx, - } - } - - pub async fn send_message_to_peer_addr(&self, addr: SocketAddr, msg: ProtocolMessage) { - if let Some((_, peer)) = self.tcp_peers.iter().find(|(_, v)| v.addr == addr) { - if let Err(e) = peer.sender.send(msg).await { - self.log(msg!(ERROR, "Error Sending message to peer: {e}")).await; - } - } - } - - pub async fn send_message_to_peer_id(&self, id: Uuid, msg: ProtocolMessage) { - if let Some(peer) = self.tcp_peers.get(&id) { - if let Err(e) = peer.sender.send(msg).await { - self.log(msg!(ERROR, "Error Sending message to peer: {e}")).await; - } - } - } - - pub async fn bootstrap(&mut self) -> Result<(), ValidationError> { - self.log(msg!(INFO, "Running As Native Node")).await; - - self.connect_to_peer(SEED_NODES[0]).await; - - let message = ProtocolMessage::BootstrapRequest{peer_id: self.id, version: "".to_string()}; - self.send_message_to_peer_addr(SEED_NODES[0], message).await; - - self.log(msg!(INFO, "Sent BootstrapRequest to seed")).await; - Ok(()) - } - - pub async fn broadcast_transaction(&self, tx: &core::Tx) { - for (id, peer) in &self.tcp_peers { - let message = ProtocolMessage::Transaction{peer_id: self.id, tx: tx.clone()}; - peer.sender.send(message).await.unwrap(); - self.log(msg!(DEBUG, "Send Transaction message to {id}")).await; - } - } - - pub async fn broadcast_block(&self, block: &core::Block) { - for (id, peer) in &self.tcp_peers { - let message = ProtocolMessage::Block { - peer_id: self.id, - height: self.chain.blocks().len() as u64, - block: block.clone() - }; - peer.sender.send(message).await.unwrap(); - self.log(msg!(DEBUG, "Send Block message to {id}")).await; - } - } - - pub fn tx(&self) -> mpsc::Sender { - return self.tx.clone() - } - - async fn start_connection_listner(&mut self, addr: SocketAddr) { - if self.listner_handle.is_some() { - self.listner_handle.as_ref().unwrap().abort(); - self.listner_handle = None; - } - self.log(msg!(INFO, "Listening on address: {}", addr)).await; - - if let Ok(tcp_listner) = tokio::net::TcpListener::bind(addr).await { - let id = self.id.clone(); - let node_tx = self.tx(); - self.listner_handle = Some(tokio::spawn({ - async move { - NativeNode::accept_connections(tcp_listner, node_tx, id).await; - }})) - }; - } - - pub async fn log(&self, msg: String) { - let _ = self.exec_tx.send(ExecutorCommand::Print(msg)).await; - } - - pub async fn transaction(&mut self, tx: &core::Tx) { - match self.chain.apply(tx) { - Ok(_) => self.log(msg!(INFO, "Transaction applied")).await, - Err(e) => self.log(crate::error::print_error_chain(&e)).await - }; - } - - pub async fn run(&mut self) { - - if let Some(a) = self.addr { - self.start_connection_listner(a.clone()).await; - }; - - while let Some(command) = self.rx.recv().await { - match command { - NodeCommand::StartListner(addr) => { - self.start_connection_listner(addr).await; - } - NodeCommand::ConnectToSeeds => { - self.connect_to_seeds().await; - }, - NodeCommand::ConnectToPeer(addr) => { - self.log(msg!(INFO, "Received ConnectToPeer: {addr}")).await; - if let Ok(addr_sock) = addr.parse::() { - self.connect_to_peer(addr_sock).await; - } else { - self.log(msg!(ERROR, "Failed to Parse to sock_addr: {addr}")).await; - } - } - NodeCommand::AddPeer { peer_id, addr, sender } => { - self.add_tcp_peer(peer_id, addr, sender).await; - }, - NodeCommand::RemovePeer { peer_id } => { - self.remove_tcp_peer(peer_id).await; - } - NodeCommand::ProcessMessage { peer_id, message } => { - self.process_message(peer_id, &message).await; - }, - NodeCommand::Transaction { tx } => { - self.transaction(&tx).await; - self.broadcast_transaction(&tx).await; - }, - NodeCommand::CreateBlock => { - self.log(msg!(INFO, "Received CreateBlock Command")).await; - let block = self.chain.create_block(); - self.broadcast_block(&block).await; - }, - NodeCommand::ListBlocks => { - self.log(msg!(INFO, "Received DebugListBlocks command")).await; - self.log(self.chain.list_blocks()).await; - }, - NodeCommand::ListPeers => { - self.log(msg!(INFO, "Received DebugListPeers command")).await; - self.log(msg!(INFO, "{}", self.list_peers())).await; - }, - NodeCommand::ShowId => { - self.log(msg!(INFO, "Received DebugListBlocks command")).await; - self.show_id().await; - }, - NodeCommand::DumpBlocks(s) => { - self.chain.dump_blocks(s); - } - NodeCommand::Exit => { - self.log(msg!(DEBUG, "Node Exit")).await; - break ; - } - } - } - } -} diff --git a/src/watcher.rs b/src/watcher.rs deleted file mode 100644 index a8ecf5e..0000000 --- a/src/watcher.rs +++ /dev/null @@ -1,8 +0,0 @@ -pub mod executor; -pub mod parser; -pub mod renderer; -pub mod watcher; - -pub use executor::*; -pub use parser::*; -pub use renderer::*; diff --git a/src/watcher/renderer.rs b/src/watcher/renderer.rs deleted file mode 100644 index 78aaf74..0000000 --- a/src/watcher/renderer.rs +++ /dev/null @@ -1,231 +0,0 @@ -use crossterm::event::KeyCode; -use ratatui::prelude::*; -use ratatui::widgets::Wrap; -use ratatui::{ - buffer::Buffer, - layout::Rect, - symbols::border, - widgets::{Block, Paragraph, Widget}, - DefaultTerminal, Frame, -}; - -use vlogger::*; - -use tokio::sync::mpsc; -use tokio::time::{timeout, Duration}; -use std::io; - -#[derive(Debug)] -pub struct Renderer { - buffer: String, - exit: bool, - rx: mpsc::Receiver, - layout: RenderLayout -} - -#[derive(Debug, PartialEq)] -pub struct Pane { - title: Option, - target: RenderPane, - layout_index: u8, - buffer: String, -} - -#[derive(Debug, PartialEq, Clone, clap::ValueEnum)] -pub enum RenderPane { - All, - CliInput, - CliOutput -} - -pub enum RenderCommand { - RenderStringToPane{ - str: String, - pane: RenderPane - }, - RenderInput(KeyCode), - ChangeLayout(RenderLayoutKind), - ClearPane(RenderPane), - Exit, -} - -#[derive(Debug, Clone)] -pub enum RenderLayoutKind { - Cli, -} - -#[derive(Debug)] -pub struct RenderLayout { - kind: RenderLayoutKind, - panes: Vec, -} - -impl RenderLayoutKind { - pub fn rects(&self, area: Rect) -> std::rc::Rc<[Rect]> { - match self { - Self::Cli => { - Layout::default() - .direction(Direction::Horizontal) - .constraints(vec![ - Constraint::Percentage(30), - Constraint::Percentage(70) - ]) - .split(area) - } - } - } - - pub fn generate(&self) -> RenderLayout { - RenderLayout { - kind: self.clone(), - panes: vec![ - Pane { - title: Some(" Input Pane ".to_string()), - target: RenderPane::CliInput, - layout_index: 0, - buffer: String::with_capacity(CLI_INPUT_BUFFE_SIZE) + "> ", - }, - Pane { - title: Some(" Output Pane ".to_string()), - target: RenderPane::CliOutput, - layout_index: 1, - buffer: String::with_capacity(CLI_OUTPUT_BUFFE_SIZE), - } - ] - } - } -} - -const CLI_INPUT_BUFFE_SIZE: usize = 4096; -const CLI_OUTPUT_BUFFE_SIZE: usize = 4096; - -#[allow(dead_code)] -impl Renderer { - pub fn new(rx: mpsc::Receiver, layout: RenderLayoutKind) -> Self { - Self { - buffer: String::new(), - rx, - exit: false, - layout: layout.generate() - } - } - pub async fn run(&mut self, terminal: &mut DefaultTerminal) -> io::Result<()> { - while !self.exit { - terminal.draw(|frame| self.draw(frame))?; - self.listen().await? - } - Ok(()) - } - - pub fn draw(&self, frame: &mut Frame) { - frame.render_widget(self, frame.area()); - } - - fn exit(&mut self) { - log!(DEBUG, "Renderer Exit"); - self.exit = true; - } - - fn buffer_extend>(&mut self, input: S) { - self.buffer.push_str(input.as_ref()); - } - - fn input_pane(&mut self) -> Option<&mut Pane> { - self.layout.panes.iter_mut().find(|p| p.target == RenderPane::CliInput) - } - - async fn listen(&mut self) -> io::Result<()> { - if let Ok(Some(mes)) = timeout(Duration::from_millis(400), self.rx.recv()).await { - match mes { - RenderCommand::RenderInput(k) => { - if let Some(p) = self.input_pane() { - match k { - KeyCode::Char(c) => { - p.buffer.push(c); - } - KeyCode::Backspace => { - if !p.buffer.ends_with("> ") { - p.buffer.pop(); - } - } - KeyCode::Enter => { - p.buffer.push_str("\n> "); - } - _ => {} - } - } - }, - RenderCommand::RenderStringToPane{ str, pane } => { - if let Some(p) = self.layout.panes.iter_mut().find(|p| p.target == pane) { - p.buffer.push_str(&str); - } - } - RenderCommand::Exit => { - self.exit(); - } - RenderCommand::ChangeLayout(l) => { - match l { - RenderLayoutKind::Cli => { - self.layout = l.generate(); - } - } - } - RenderCommand::ClearPane(pane) => { - if matches!(pane, RenderPane::All) { - for p in self.layout.panes.iter_mut() { - p.buffer.clear(); - } - } else if let Some(p) = self.layout.panes.iter_mut().find(|p| p.target == pane) { - p.buffer.clear(); - } - } - } - } - Ok(()) - } -} - -impl Widget for &Renderer { - fn render(self, area: Rect, buf: &mut Buffer) { - - let layout = self.layout.kind.rects(area); - - for p in self.layout.panes.iter() { - let block = Block::bordered() - .title({ - if let Some(t) = &p.title { - t.clone() - } else { - Default::default() - } - }) - .border_set(border::THICK); - let inner_area = block.inner(layout[p.layout_index as usize]); - let content_width = inner_area.width as usize; - let content_height = inner_area.height as usize; - let wrapped_lines = p.buffer - .lines() - .map(|line| { - if line.is_empty() { - 1 - } else { - (line.len() + content_width - 1) / content_width - } - }) - .sum::(); - - let scroll_offset = if wrapped_lines > content_height { - wrapped_lines - content_height - } else { - 0 - }; - - Paragraph::new(p.buffer.clone()) - .wrap(Wrap::default()) - .left_aligned() - .block(block) - .scroll((scroll_offset as u16, 0)) - .render(layout[p.layout_index as usize], buf); - } - } -} diff --git a/src/watcher/watcher.rs b/src/watcher/watcher.rs deleted file mode 100644 index 9c7cacb..0000000 --- a/src/watcher/watcher.rs +++ /dev/null @@ -1,203 +0,0 @@ -use crate::watcher::*; - -use crossterm::event::{self, Event, KeyCode, KeyEventKind}; -use tokio::sync::mpsc; -use std::{io::{self}, net::SocketAddr}; - -use crate::native_node::node::{NativeNode, NodeCommand}; -use vlogger::*; - -#[allow(dead_code)] -pub struct Watcher { - render_tx: mpsc::Sender, - parser_tx: mpsc::Sender, - node_tx: mpsc::Sender, - exec_tx: mpsc::Sender, - cmd_buffer: String, - handles: Vec> -} - -impl Watcher { - pub fn build() -> WatcherBuilder { - WatcherBuilder::new() - } - - pub fn render_tx(&self) -> mpsc::Sender { - self.render_tx.clone() - } - - pub fn parser_tx(&self) -> mpsc::Sender { - self.parser_tx.clone() - } - - pub fn exec_tx(&self) -> mpsc::Sender { - self.exec_tx.clone() - } - - pub async fn log(render_tx: &mpsc::Sender, msg: String) { - let rendermsg = RenderCommand::RenderStringToPane { str: msg, pane: RenderPane::CliOutput }; - if let Err(e) = render_tx.send(rendermsg).await { - log!(FATAL, "Failed to send render command: {e}"); - } - } - - pub async fn exit(self) { - ratatui::restore(); - // for (i, handle) in self.handles.into_iter().enumerate() { - // let _ = handle.await; - // println!("Joined thread #{i}") - // } - } - - pub async fn poll(&mut self) -> io::Result { - match event::read()? { - Event::Key(k) if k.kind == KeyEventKind::Press => { - match k.code { - KeyCode::Char(c) => { - self.cmd_buffer.push(c); - let message = RenderCommand::RenderInput(k.code); - let _ = self.render_tx.send(message).await; - } - KeyCode::Backspace => { - self.cmd_buffer.pop(); - let message = RenderCommand::RenderInput(k.code); - let _ = self.render_tx.send(message).await; - }, - KeyCode::Enter => { - let rd_mes = RenderCommand::RenderInput(k.code); - let pr_mes = ParserCommand::ParseCmdString(self.cmd_buffer.clone()); - let _ = self.parser_tx.send(pr_mes).await; - let _ = self.render_tx.send(rd_mes).await; - self.cmd_buffer.clear(); - } - KeyCode::Esc => { - let rd_mes = RenderCommand::Exit; - let pr_mes = ParserCommand::Exit; - let exec_mes = ExecutorCommand::Exit; - let node_mes = NodeCommand::Exit; - let _ = self.render_tx.send(rd_mes).await; - let _ = self.parser_tx.send(pr_mes).await; - let _ = self.exec_tx.send(exec_mes).await; - let _ = self.node_tx.send(node_mes).await; - return Ok(false); - } - _ => {} - }; - } - _ => {} - } - Ok(true) - } -} - -#[derive(Default)] -pub struct WatcherBuilder { - addr: Option, - seed_file: Option, - bootstrap: bool, - seed: bool, -} - -impl WatcherBuilder { - fn new() -> Self { - Self::default() - } - - pub fn addr(mut self, addr: Option) -> Self { - self.addr = addr; - self - } - - pub fn file(mut self, seed_file: Option) -> Self { - self.seed_file = seed_file; - self.seed = true; - self - } - - pub fn bootstrap(mut self, bootstrap: bool) -> Self { - self.bootstrap = bootstrap; - self - } - - pub fn seed(mut self, seed: bool) -> Self { - self.seed = seed; - self - } - - pub async fn log(render_tx: &mpsc::Sender, msg: String) { - let rendermsg = RenderCommand::RenderStringToPane { str: msg, pane: RenderPane::CliOutput }; - let _ = render_tx.send(rendermsg).await; - } - - - pub async fn start(mut self) -> Watcher { - let (render_tx, render_rx) = mpsc::channel::(100); - let (parser_tx, parser_rx) = mpsc::channel::(100); - let (exec_tx, exec_rx) = mpsc::channel::(100); - - let mut terminal = ratatui::init(); - let render_handle = tokio::spawn({ - async move { - let _ = Renderer::new(render_rx, RenderLayoutKind::Cli).run(&mut terminal).await; - } - }); - let _ = Self::log(&render_tx, msg!(INFO, "Started Renderer")).await; - - let blocks = self.seed_file - .as_ref() - .and_then(|path| std::fs::read_to_string(path).ok()) - .and_then(|content| serde_json::from_str(&content).ok()); - - if self.seed { - self.addr = Some(crate::seeds_constants::SEED_NODES[0]); - } - - let _ = Self::log(&render_tx, msg!(INFO, "Addr: {:?}", self.addr)).await; - - let mut node = NativeNode::new(self.addr.clone(), blocks, exec_tx.clone()).await; - - if self.bootstrap { - let _ = node.bootstrap().await; - } - - let _ = Self::log(&render_tx, msg!(INFO, "Build Node")); - - let parser_handle = tokio::spawn({ - let exec_tx = exec_tx.clone(); - async move { - let _ = Parser::new(parser_rx, exec_tx).run().await; - } - }); - - Watcher::log(&render_tx, msg!(INFO, "Started Parser")).await; - - let executor_handle = tokio::spawn({ - let rend_tx = render_tx.clone(); - let node_tx = node.tx(); - async move { - let _ = Executor::new(rend_tx, node_tx, exec_rx).run().await; - } - }); - - Watcher::log(&render_tx, msg!(INFO, "Started Executor")).await; - - let node_tx = node.tx(); - - let node_handle = tokio::spawn({ - async move { - node.run().await; - } - }); - - Watcher::log(&render_tx, msg!(INFO, "Started Node")).await; - - Watcher { - render_tx, - node_tx, - parser_tx, - exec_tx, - cmd_buffer: String::new(), - handles: vec![parser_handle, render_handle, executor_handle, node_handle] - } - } -} diff --git a/testing/script/.gitignore b/testing/script/.gitignore new file mode 100644 index 0000000..2f7896d --- /dev/null +++ b/testing/script/.gitignore @@ -0,0 +1 @@ +target/ diff --git a/testing/script/Cargo.lock b/testing/script/Cargo.lock new file mode 100644 index 0000000..8abc119 --- /dev/null +++ b/testing/script/Cargo.lock @@ -0,0 +1,284 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "const-random", + "getrandom 0.3.3", + "once_cell", + "version_check", + "zerocopy", +] + +[[package]] +name = "autocfg" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" + +[[package]] +name = "bitflags" +version = "2.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34efbcccd345379ca2868b2b2c9d3782e9cc58ba87bc7d79d5b53d9c9ae6f25d" + +[[package]] +name = "cfg-if" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fd1289c04a9ea8cb22300a459a72a385d7c73d3259e2ed7dcb2af674838cfa9" + +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom 0.2.16", + "once_cell", + "tiny-keccak", +] + +[[package]] +name = "crunchy" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" + +[[package]] +name = "getrandom" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.11.1+wasi-snapshot-preview1", +] + +[[package]] +name = "getrandom" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasi 0.14.3+wasi-0.2.4", +] + +[[package]] +name = "instant" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "libc" +version = "0.2.175" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a82ae493e598baaea5209805c49bbf2ea7de956d50d7da0da1164f9c6d28543" + +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + +[[package]] +name = "once_cell" +version = "1.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +dependencies = [ + "portable-atomic", +] + +[[package]] +name = "portable-atomic" +version = "1.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f84267b20a16ea918e43c6a88433c2d54fa145c92a811b5b047ccbe153674483" + +[[package]] +name = "proc-macro2" +version = "1.0.101" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "r-efi" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" + +[[package]] +name = "rhai" +version = "1.22.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2780e813b755850e50b178931aaf94ed24f6817f46aaaf5d21c13c12d939a249" +dependencies = [ + "ahash", + "bitflags", + "instant", + "num-traits", + "once_cell", + "rhai_codegen", + "smallvec", + "smartstring", + "thin-vec", +] + +[[package]] +name = "rhai_codegen" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5a11a05ee1ce44058fa3d5961d05194fdbe3ad6b40f904af764d81b86450e6b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "script" +version = "0.1.0" +dependencies = [ + "rhai", +] + +[[package]] +name = "smallvec" +version = "1.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" + +[[package]] +name = "smartstring" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fb72c633efbaa2dd666986505016c32c3044395ceaf881518399d2f4127ee29" +dependencies = [ + "autocfg", + "static_assertions", + "version_check", +] + +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + +[[package]] +name = "syn" +version = "2.0.106" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ede7c438028d4436d71104916910f5bb611972c5cfd7f89b8300a8186e6fada6" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "thin-vec" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "144f754d318415ac792f9d69fc87abbbfc043ce2ef041c60f16ad828f638717d" + +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + +[[package]] +name = "unicode-ident" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" + +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + +[[package]] +name = "wasi" +version = "0.11.1+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" + +[[package]] +name = "wasi" +version = "0.14.3+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51ae83037bdd272a9e28ce236db8c07016dd0d50c27038b3f407533c030c95" +dependencies = [ + "wit-bindgen", +] + +[[package]] +name = "wit-bindgen" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "052283831dbae3d879dc7f51f3d92703a316ca49f91540417d38591826127814" + +[[package]] +name = "zerocopy" +version = "0.8.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1039dd0d3c310cf05de012d8a39ff557cb0d23087fd44cad61df08fc31907a2f" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ecf5b4cc5364572d7f4c329661bcc82724222973f2cab6f050a4e5c22f75181" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/testing/script/Cargo.toml b/testing/script/Cargo.toml new file mode 100644 index 0000000..6ca59cb --- /dev/null +++ b/testing/script/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "script" +version = "0.1.0" +edition = "2024" + +[dependencies] +rhai = "1.22.2" diff --git a/testing/script/src/main.rs b/testing/script/src/main.rs new file mode 100644 index 0000000..953839f --- /dev/null +++ b/testing/script/src/main.rs @@ -0,0 +1,14 @@ +use rhai::{self, EvalAltResult}; + +fn main() -> Result<(), Box> { + let engine = rhai::Engine::new(); + + let script = r#" + (40 / 2); + "#; + + let result = engine.eval::(script)?; + + println!("Answer: {result}"); + Ok(()) +}