From 5658b4e3977cc1ca9130c01a6076360a6b410fbc Mon Sep 17 00:00:00 2001 From: victor Date: Tue, 2 Sep 2025 23:45:16 +0200 Subject: [PATCH] enhanced account and balance --- TODO | 1 + node/.rustfmt.toml | 1 + node/Cargo.lock | 59 ++++++++++ node/Cargo.toml | 3 +- node/proc/15:16:35_93227 | 6 + node/proc/15:17:19_93429 | 4 + node/proc/15:37:10_96980 | 0 node/proc/15:40:52_98073 | 6 + node/src/args.rs | 8 -- node/src/bus/error.rs | 21 ++++ node/src/bus/system.rs | 3 +- node/src/cli.rs | 1 - node/src/core/account.rs | 22 ++++ node/src/core/blockchain.rs | 79 +++++-------- node/src/core/tx.rs | 16 +-- node/src/db/database.rs | 208 ++++++++++++++++++++++++++------- node/src/db/error.rs | 25 ++-- node/src/executor/executor.rs | 23 +++- node/src/lib.rs | 42 ++++--- node/src/node/node.rs | 82 ++++++++----- node/src/protocol/connector.rs | 8 +- node/src/renderer/pane.rs | 4 - node/src/renderer/renderer.rs | 57 +++++---- node/src/watcher/watcher.rs | 23 +++- 24 files changed, 491 insertions(+), 211 deletions(-) create mode 100644 node/.rustfmt.toml create mode 100644 node/proc/15:16:35_93227 create mode 100644 node/proc/15:17:19_93429 create mode 100644 node/proc/15:37:10_96980 create mode 100644 node/proc/15:40:52_98073 create mode 100644 node/src/bus/error.rs create mode 100644 node/src/core/account.rs diff --git a/TODO b/TODO index f96b6ea..a0f21d5 100644 --- a/TODO +++ b/TODO @@ -1,5 +1,6 @@ Non Exaustive list of ideas: +- Add Readme - Scripting with rhai [] - Mouse Input [] - Http server [] diff --git a/node/.rustfmt.toml b/node/.rustfmt.toml new file mode 100644 index 0000000..b196eaa --- /dev/null +++ b/node/.rustfmt.toml @@ -0,0 +1 @@ +tab_spaces = 2 diff --git a/node/Cargo.lock b/node/Cargo.lock index 1c110d1..f3dc0dc 100644 --- a/node/Cargo.lock +++ b/node/Cargo.lock @@ -177,6 +177,7 @@ dependencies = [ "chrono", "clap", "crossterm 0.29.0", + "futures", "hex", "memory-stats", "once_cell", @@ -398,6 +399,7 @@ dependencies = [ "crossterm_winapi", "derive_more", "document-features", + "futures-core", "mio", "parking_lot 0.12.4", "rustix 1.0.8", @@ -550,12 +552,65 @@ dependencies = [ "winapi", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -574,9 +629,13 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", diff --git a/node/Cargo.toml b/node/Cargo.toml index 0e40796..5aeb52d 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -16,7 +16,7 @@ tokio-tungstenite = "0.27.0" uuid = { version = "1.18.0", features = ["v4", "serde"] } vlogger = { path = "./lib/logger-rs" } ratatui = "0.29.0" -crossterm = "0.29.0" +crossterm = { version = "0.29.0", features = ["event-stream"] } once_cell = "1.21.3" async-trait = "0.1.89" anyhow = "1.0.99" @@ -26,3 +26,4 @@ memory-stats = "1.2.0" textwrap = "0.16.2" sled = "0.34.7" bincode = { version = "2.0.1", features = ["derive", "serde"] } +futures = "0.3.31" diff --git a/node/proc/15:16:35_93227 b/node/proc/15:16:35_93227 new file mode 100644 index 0000000..cde0813 --- /dev/null +++ b/node/proc/15:16:35_93227 @@ -0,0 +1,6 @@ +[INFO ] [watcher.rs :145 ] 15:16:45: Physical memory usage: 32 MB +[INFO ] [watcher.rs :154 ] 15:16:45: Virtual memory usage: 1691 MB +[INFO ] [watcher.rs :145 ] 15:16:55: Physical memory usage: 32 MB +[INFO ] [watcher.rs :154 ] 15:16:55: Virtual memory usage: 1691 MB +[INFO ] [watcher.rs :145 ] 15:17:05: Physical memory usage: 32 MB +[INFO ] [watcher.rs :154 ] 15:17:05: Virtual memory usage: 1691 MB diff --git a/node/proc/15:17:19_93429 b/node/proc/15:17:19_93429 new file mode 100644 index 0000000..e0da935 --- /dev/null +++ b/node/proc/15:17:19_93429 @@ -0,0 +1,4 @@ +[INFO ] [watcher.rs :145 ] 15:17:29: Physical memory usage: 32 MB +[INFO ] [watcher.rs :154 ] 15:17:29: Virtual memory usage: 1691 MB +[INFO ] [watcher.rs :145 ] 15:17:39: Physical memory usage: 32 MB +[INFO ] [watcher.rs :154 ] 15:17:39: Virtual memory usage: 1691 MB diff --git a/node/proc/15:37:10_96980 b/node/proc/15:37:10_96980 new file mode 100644 index 0000000..e69de29 diff --git a/node/proc/15:40:52_98073 b/node/proc/15:40:52_98073 new file mode 100644 index 0000000..28b90f4 --- /dev/null +++ b/node/proc/15:40:52_98073 @@ -0,0 +1,6 @@ +[INFO ] [watcher.rs :145 ] 15:41:02: Physical memory usage: 32 MB +[INFO ] [watcher.rs :154 ] 15:41:02: Virtual memory usage: 1691 MB +[INFO ] [watcher.rs :145 ] 15:41:12: Physical memory usage: 32 MB +[INFO ] [watcher.rs :154 ] 15:41:12: Virtual memory usage: 1691 MB +[INFO ] [watcher.rs :145 ] 15:41:22: Physical memory usage: 32 MB +[INFO ] [watcher.rs :154 ] 15:41:22: Virtual memory usage: 1691 MB diff --git a/node/src/args.rs b/node/src/args.rs index f8e5a37..712554b 100644 --- a/node/src/args.rs +++ b/node/src/args.rs @@ -94,14 +94,6 @@ pub enum CliBlockCommand { #[command(name = "create", aliases = ["c", "new"])] Create, - /// Export Blocks to file - #[command(name = "dump", aliases = ["export"])] - Dump { - /// Output file - #[arg(short, long)] - output: String, - }, - /// Display Block by Hash #[command(name = "display", aliases = ["d"])] #[group(multiple = false)] diff --git a/node/src/bus/error.rs b/node/src/bus/error.rs new file mode 100644 index 0000000..27f453d --- /dev/null +++ b/node/src/bus/error.rs @@ -0,0 +1,21 @@ +use once_cell::sync::Lazy; +use std::sync::Arc; +use tokio::sync::broadcast; + +use super::event_bus::EventBus; +use crate::executor::ExecutorCommand; + +pub enum ErrorEvent { + +} + +static ERROR_BUS: Lazy>> = + Lazy::new(|| Arc::new(EventBus::new())); + +pub fn publish_error(event: ExecutorCommand) { + ERROR_BUS.publish(event); +} + +pub fn subscribe_error_bus() -> broadcast::Receiver { + ERROR_BUS.subscribe() +} diff --git a/node/src/bus/system.rs b/node/src/bus/system.rs index 1056230..f6afddb 100644 --- a/node/src/bus/system.rs +++ b/node/src/bus/system.rs @@ -9,7 +9,8 @@ pub enum SystemEvent { ExecutorStarted, RendererStarted, NodeStarted, - Exit, + Shutdown, + Abort, } static SYSTEM_EVENT_BUS: Lazy>> = Lazy::new(|| Arc::new(EventBus::new())); diff --git a/node/src/cli.rs b/node/src/cli.rs index 17be654..cc79673 100644 --- a/node/src/cli.rs +++ b/node/src/cli.rs @@ -18,7 +18,6 @@ pub fn handle_peer_command(cmd: CliPeerCommand) -> NodeCommand { pub fn handle_block_command(cmd: CliBlockCommand) -> NodeCommand { match cmd { CliBlockCommand::List => NodeCommand::ListBlocks, - CliBlockCommand::Dump { output } => NodeCommand::DumpBlocks(output), CliBlockCommand::Create => NodeCommand::CreateBlock, CliBlockCommand::Display { key, height } => match (key, height) { (Some(k), _) => return NodeCommand::DisplayBlockByKey(k), diff --git a/node/src/core/account.rs b/node/src/core/account.rs new file mode 100644 index 0000000..684784a --- /dev/null +++ b/node/src/core/account.rs @@ -0,0 +1,22 @@ +pub type Address = String; + +#[derive(Debug, bincode::Decode, bincode::Encode)] +pub struct Account { + address: Address, + balance: u64, + nonce: u64, +} + +impl Account { + pub fn nonce(&self) -> u64 { + self.nonce + } + + pub fn balance(&self) -> u64 { + self.balance + } + + pub fn address(&self) -> &Address { + &self.address + } +} diff --git a/node/src/core/blockchain.rs b/node/src/core/blockchain.rs index 25681be..07023e4 100644 --- a/node/src/core/blockchain.rs +++ b/node/src/core/blockchain.rs @@ -1,4 +1,7 @@ use std::sync::Arc; +use std::time::UNIX_EPOCH; + +use thiserror::*; use crate::core; use crate::core::ChainData; @@ -10,15 +13,10 @@ use crate::log; use super::hasher::Hasher; -use std::collections::HashMap; -use std::time::UNIX_EPOCH; -use vlogger::*; - -use thiserror::*; #[allow(dead_code)] #[derive(Error, Debug)] pub enum BlockchainError { - #[error("Blockchain initialisation failed")] + #[error("Database operation failed")] Database(#[from] DatabaseError), #[error("invalid account creation")] @@ -30,14 +28,15 @@ pub enum BlockchainError { #[error("Validation Error")] Validation(#[from] ValidationError), + #[error("Insufficient fonds on address {0}")] + InsufficientFunds(String), + #[error("Block Creation Error")] BlockCreation, } const BLOCKCHAIN_ID: &str = "watch-chain"; -pub type Account = String; - #[derive(Debug, thiserror::Error)] pub enum ValidationError { #[error("Invalid Block Hash Detected at height {0}")] @@ -52,27 +51,12 @@ pub enum ValidationError { #[derive(Debug)] pub struct Blockchain { id: String, - balances: std::collections::HashMap, mempool: Vec, db: database::ChainDb, } #[allow(dead_code)] impl Blockchain { - pub fn add(&mut self, data: ChainData) -> Result<(), BlockchainError> { - self.apply(data.clone())?; - self.mempool.push(data); - Ok(()) - } - - fn acc_exists(&self, acc: &Account) -> bool { - self.balances.iter().find(|(k, _)| *k == acc).is_some() - } - - pub fn dump_blocks(&self, path: String) { - log(msg!(DEBUG, "TODO: implement block export")); - } - fn hash_transaction_pool(&self) -> Vec { self .mempool @@ -125,32 +109,23 @@ impl Blockchain { fn apply_transaction(&mut self, tx: &core::Tx) -> Result<(), BlockchainError> { tx.validate()?; - return Ok(()); - if let Some(from_balance) = self.balances.get_mut(tx.from()) { - if *from_balance > tx.value() { - *from_balance -= tx.value(); - } else { - return Err(BlockchainError::Tx(TxError::FromInsuffitientFonds)); - } - } else { - return Err(BlockchainError::Tx(TxError::UnknownAccount( - tx.from().to_string(), - ))); + let from = tx.from(); + let to = tx.to(); + let from_balance = self.db.get_balance(tx.from())?; + let to_balance = self.db.get_balance(tx.to())?; + + if tx.value() > from_balance { + return Err(BlockchainError::InsufficientFunds(tx.from().to_string())) } - if let Some(to_balance) = self.balances.get_mut(&tx.to().to_string()) { - *to_balance += tx.value() - } else { - if self.acc_exists(tx.to()) { - *self.balances.get_mut(tx.to()).unwrap() += tx.value(); - } else { - self.balances.insert(tx.to().clone(), tx.value()); - } - } - Ok(()) + let new_from_balance = from_balance - tx.value(); + let new_to_balance = to_balance + tx.value(); + + let changes = vec![(from, new_from_balance), (to, new_to_balance)]; + Ok(self.db.set_balance_batch(changes)?) } - pub fn apply(&mut self, data: ChainData) -> Result<(), BlockchainError> { + pub fn apply_chain_data(&mut self, data: ChainData) -> Result<(), BlockchainError> { match &data { ChainData::Transaction(tx) => { self.apply_transaction(tx)?; @@ -171,10 +146,6 @@ impl Blockchain { Ok(ret) } - pub fn get_balances(&self) -> &std::collections::HashMap { - &self.balances - } - pub fn blocks(&self) -> Result>, BlockchainError> { Ok(self.db.get_all_blocks()?) } @@ -238,12 +209,18 @@ impl Blockchain { Ok(()) } + pub async fn shutdown(&self) -> Result<(), BlockchainError> { + self.db.dump_mempool(&self.mempool)?; + self.db.shutdown().await?; + Ok(()) + } + pub fn build(path: Option) -> Result { let db = db::ChainDb::new(path).or_else(|e| Err(BlockchainError::Database(e)))?; + let mempool = db.recover_mempool()?; let chain = Blockchain { - balances: HashMap::new(), - mempool: vec![], + mempool, id: BLOCKCHAIN_ID.to_string(), db, }; diff --git a/node/src/core/tx.rs b/node/src/core/tx.rs index 9be6420..2d35be2 100644 --- a/node/src/core/tx.rs +++ b/node/src/core/tx.rs @@ -1,18 +1,18 @@ -use crate::core::Account; use crate::error::TxError; +use super::Address; #[derive( serde::Deserialize, serde::Serialize, Debug, clap::Args, Clone, bincode::Encode, bincode::Decode, )] pub struct Tx { - from: Account, - to: Account, - value: u32, + from: String, + to: String, + value: u64, data: String, } impl Tx { - pub fn new(from: Account, to: Account, value: u32, data: String) -> Self { + pub fn new(from: String, to: String, value: u64, data: String) -> Self { Self { from, to, @@ -37,13 +37,13 @@ impl Tx { pub fn is_reward(&self) -> bool { return self.data == "reward"; } - pub fn from(&self) -> &Account { + pub fn from(&self) -> &Address { &self.from } - pub fn to(&self) -> &Account { + pub fn to(&self) -> &Address { &self.to } - pub fn value(&self) -> u32 { + pub fn value(&self) -> u64 { self.value } pub fn data(&self) -> &str { diff --git a/node/src/db/database.rs b/node/src/db/database.rs index 999168b..de4261a 100644 --- a/node/src/db/database.rs +++ b/node/src/db/database.rs @@ -1,53 +1,73 @@ -use crate::{ - core::{self, Block, ChainData, Hasher}, - db::error::DatabaseError, - error::print_error_chain, - log, -}; use bincode::{self, config::Configuration}; use sled::{self, Batch}; use std::sync::Arc; use vlogger::*; +use crate::{ + core::{self, Block, ChainData, Hasher, Address, Account}, + db::error::DatabaseError, + error::print_error_chain, + log, +}; + static BINCODE_CONFIG: Configuration = bincode::config::standard(); const DB_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/database"); const DB_TREE: &str = "blocks"; -const BLOCK_INDEX: &str = "blocks"; -const CHAIN_DATA_INDEX: &str = "chain_data"; -const DATA_TO_BLOCK_INDEX: &str = "data_to_block"; -const METADATA_INDEX: &str = "metadata"; +const BLOCK_PREFIX: &str = "blocks"; +const CHAIN_DATA_PREFIX: &str = "chain_data"; +const DATA_TO_BLOCK_PREFIX: &str = "data_to_block"; +const METADATA_PREFIX: &str = "metadata"; +const BALANCE_PREFIX: &str = "balance"; +const ACCOUNT_PREFIX: &str = "account"; + +const MEMPOOL_PREFIX: &str = "mempool"; const TIP_KEY: &str = "chain_tip"; const HEIGHT_KEY: &str = "chain_height"; - -const HEIGHT_TO_HASH_INDEX: &str = "height_to_hash"; +const HEIGHT_TO_HASH_PREFIX: &str = "height_to_hash"; #[derive(Debug)] pub struct ChainDb { db: sled::Tree, } -fn data_index(key: &str) -> String { - format!("{}:{}", CHAIN_DATA_INDEX, key) +fn data_prefix(key: &str) -> String { + format!("{}:{}", CHAIN_DATA_PREFIX, key) } -fn data_to_block_index(key: &str) -> String { - format!("{}:{}", DATA_TO_BLOCK_INDEX, key) +fn data_to_block_prefix(key: &str) -> String { + format!("{}:{}", DATA_TO_BLOCK_PREFIX, key) } -fn block_index(key: &str) -> String { - format!("{}:{}", BLOCK_INDEX, key) +fn block_prefix(key: &str) -> String { + format!("{}:{}", BLOCK_PREFIX, key) } -fn metadata_index(key: &str) -> String { - format!("{}:{}", METADATA_INDEX, key) +fn metadata_prefix(key: &str) -> String { + format!("{}:{}", METADATA_PREFIX, key) } -fn height_to_hash_index(height: u64) -> String { - format!("{}:{:020}", HEIGHT_TO_HASH_INDEX, height) +fn height_to_hash_prefix(height: u64) -> String { + format!("{}:{:020}", HEIGHT_TO_HASH_PREFIX, height) +} + +fn balances_prefix(address: &str) -> String { + format!("{}:{}", BALANCE_PREFIX, address) +} + +fn nonce_prefix(address: &str) -> String { + format!("{}:{}", ACCOUNT_PREFIX, address) +} + +fn mempool_prefix(tx_hash: &str) -> String { + format!("{}:{}", MEMPOOL_PREFIX, tx_hash) +} + +fn account_prefix(addr: &str) -> String { + format!("{}:{}", ACCOUNT_PREFIX, addr) } impl ChainDb { @@ -83,12 +103,118 @@ impl ChainDb { } } - pub fn get_block_by_key(&self, block_hash: &str) -> Result, DatabaseError> { - let block_hash = block_index(block_hash); + pub fn get_account(&self, addr: &Address) -> Result, DatabaseError> { + if let Some(bin_acc) = self.db.get(account_prefix(addr))? { + let (acc, _) = bincode::decode_from_slice::(&bin_acc, BINCODE_CONFIG)?; + Ok(Some(acc)) + } else { + Ok(None) + } + } + + pub fn insert_account(&self, acc: &Account) -> Result<(), DatabaseError> { + let mut batch = Batch::default(); + if let Some(_) = self.get_account(acc.address())? { + return Err(DatabaseError::AccountExists(acc.address().to_string())); + } + let bin_acc = bincode::encode_to_vec(acc, BINCODE_CONFIG)?; + let bin_nonce = bincode::encode_to_vec(&acc.nonce(), BINCODE_CONFIG)?; + let bin_balance = bincode::encode_to_vec(&acc.balance(), BINCODE_CONFIG)?; + batch.insert(account_prefix(acc.address()).as_str(), bin_acc); + batch.insert(nonce_prefix(acc.address()).as_str(), bin_nonce); + batch.insert(balances_prefix(acc.address()).as_str(), bin_balance); + Ok(self.db.apply_batch(batch)?) + } + + pub fn recover_mempool(&self) -> Result, DatabaseError> { + let mem_tree = self.db.scan_prefix(MEMPOOL_PREFIX); + let mut mem_vec = vec![]; + for mem in mem_tree { + if let Ok((_, value)) = mem { + let (value, _) = bincode::decode_from_slice::(&value, BINCODE_CONFIG)?; + mem_vec.push(value); + } else { + return Err(DatabaseError::Corruption("Failed to recover Mempool".to_string())) + } + } + Ok(mem_vec) + } + + pub fn dump_mempool(&self, mempool: &[ChainData]) -> Result<(), DatabaseError> { + let mut batch = Batch::default(); + for mem in mempool { + let data_hash = Hasher::hash_chain_data(&mem); + let bin_data = bincode::encode_to_vec(&mem, BINCODE_CONFIG)?; + let key = mempool_prefix(&data_hash); + batch.insert(key.as_str(), bin_data); + } + self.db.apply_batch(batch)?; + Ok(()) + } + + pub async fn shutdown(&self) -> Result { + Ok(self.db.flush_async().await?) + } + + pub fn set_balance(&self, address: &Address, new_balance: u64) -> Result<(), DatabaseError> { + let mut batch = Batch::default(); + let account_nonce = self.get_nonce(address)?; + let account_nonce = account_nonce + 1; + let bin_balance = bincode::encode_to_vec(new_balance, BINCODE_CONFIG)?; + let bin_nonce = bincode::encode_to_vec(account_nonce, BINCODE_CONFIG)?; + batch.insert(balances_prefix(address).as_str(), bin_balance); + batch.insert(nonce_prefix(address).as_str(), bin_nonce); + Ok(self.db.apply_batch(batch)?) + } + + fn set_nonce(&self, address: &Address, nonce: u64) -> Result<(), DatabaseError> { + let bin_nonce = bincode::encode_to_vec(&nonce, BINCODE_CONFIG)?; + let address = nonce_prefix(address); + self.db.insert(address, bin_nonce)?; + Ok(()) + } + + fn get_nonce(&self, address: &Address) -> Result { + match self.db.get(nonce_prefix(address))? { + Some(n) => { + let (nonce, _) = bincode::decode_from_slice::(&n, BINCODE_CONFIG)?; + Ok(nonce) + }, + None => Err(DatabaseError::AccountNotFound(address.to_string())) + } + } + + pub fn set_balance_batch(&self, changes: Vec<(&Address, u64)>) -> Result<(), DatabaseError> { + let mut batch = Batch::default(); + + for (address, amount) in changes { + let account_nonce = self.get_nonce(address)?; + let account_nonce = account_nonce + 1; + let balance_key = balances_prefix(&address); + let bin_amount = bincode::encode_to_vec(amount, BINCODE_CONFIG)?; + let bin_nonce = bincode::encode_to_vec(&account_nonce, BINCODE_CONFIG)?; + batch.insert(nonce_prefix(address).as_str(), bin_nonce); + batch.insert(balance_key.as_str(), bin_amount); + } + Ok(self.db.apply_batch(batch)?) + } + + pub fn get_balance(&self, address: &Address) -> Result { + match self.db.get(balances_prefix(address))? { + Some(b) => { + let (balance, _) = bincode::decode_from_slice::(&b, BINCODE_CONFIG)?; + Ok(balance) + }, + None => Err(DatabaseError::AccountNotFound(address.to_string())), + } + } + + pub fn get_block_by_key(&self, block_hash: &str) -> Result>, DatabaseError> { + let block_hash = block_prefix(block_hash); if let Some(bin_block) = self.db.get(block_hash)? { let (block, _size) = bincode::decode_from_slice::(&bin_block, BINCODE_CONFIG) .map_err(|e| DatabaseError::Decode(e))?; - Ok(Some(block)) + Ok(Some(block.into())) } else { Ok(None) } @@ -98,22 +224,19 @@ impl ChainDb { &self, height: u64, ) -> Result>, DatabaseError> { - for result in self.db.scan_prefix(BLOCK_INDEX) { - let (_key, value) = result?; - let (block, _size) = bincode::decode_from_slice::(&value, BINCODE_CONFIG)?; - - if block.head().height == height { - return Ok(Some(Arc::new(block))); - } + if let Some(hash) = self.db.get(height_to_hash_prefix(height))? { + let (hash_str, _) = bincode::decode_from_slice::(&hash, BINCODE_CONFIG)?; + Ok(self.get_block_by_key(&hash_str)?) + } else { + Ok(None) } - Ok(None) } pub fn get_block_data(&self, block: &Block) -> Result>, DatabaseError> { let mut chain_data = Vec::new(); for data_hash in &block.data { - let data_hash = data_index(data_hash); + let data_hash = data_prefix(data_hash); if let Some(bin_data) = self.db.get(&data_hash)? { let (data, _) = bincode::decode_from_slice::(&bin_data, BINCODE_CONFIG)?; chain_data.push(data); @@ -128,7 +251,7 @@ impl ChainDb { pub fn get_all_blocks(&self) -> Result>, DatabaseError> { self .db - .scan_prefix(BLOCK_INDEX) + .scan_prefix(BLOCK_PREFIX) .map(|res| -> Result, DatabaseError> { let (_key, value) = res?; let (block, _size) = bincode::decode_from_slice::(&value, BINCODE_CONFIG) @@ -140,7 +263,7 @@ impl ChainDb { pub fn add_data(&self, data: &core::ChainData) -> Result<(), DatabaseError> { let bin_data = bincode::encode_to_vec(data, BINCODE_CONFIG)?; - let data_hash = data_index(&Hasher::hash_chain_data(data)); + let data_hash = data_prefix(&Hasher::hash_chain_data(data)); self.db.insert(data_hash, bin_data)?; Ok(()) } @@ -148,21 +271,22 @@ impl ChainDb { pub fn add_block(&self, block: &core::Block) -> Result<(), DatabaseError> { let mut db_batch = Batch::default(); let bin_block = bincode::encode_to_vec(block, BINCODE_CONFIG)?; - db_batch.insert(block_index(block.head().block_hash()).as_str(), bin_block); + db_batch.insert(block_prefix(block.head().block_hash()).as_str(), bin_block); db_batch.insert( - height_to_hash_index(block.head().height).as_str(), + height_to_hash_prefix(block.head().height).as_str(), block.head().block_hash(), ); for data in block.data() { db_batch.insert( - data_to_block_index(data.as_str()).as_str(), + data_to_block_prefix(data.as_str()).as_str(), block.head().block_hash(), ); } - db_batch.insert(metadata_index(TIP_KEY).as_str(), block.head().block_hash()); + db_batch.insert(metadata_prefix(TIP_KEY).as_str(), block.head().block_hash()); + let bin_head = bincode::encode_to_vec(&block.head().height, BINCODE_CONFIG)?; db_batch.insert( - metadata_index(HEIGHT_KEY).as_str(), - &block.head().height.to_be_bytes(), + metadata_prefix(HEIGHT_KEY).as_str(), + bin_head ); self.db.apply_batch(db_batch)?; Ok(()) diff --git a/node/src/db/error.rs b/node/src/db/error.rs index b07be5b..7fd342b 100644 --- a/node/src/db/error.rs +++ b/node/src/db/error.rs @@ -2,21 +2,20 @@ use thiserror::Error; #[derive(Debug, Error)] pub enum DatabaseError { - #[error("Database initialization failed")] + #[error("Database initialization failed: {0}")] Init(#[from] std::io::Error), - - #[error("Database read failed")] - Read(#[from] anyhow::Error), - - #[error("Sled Failed")] - Sled(#[from] sled::Error), - - #[error("Database Encode failed")] + #[error("Database operation failed: {0}")] + Operation(#[from] sled::Error), + #[error("Failed to serialize data: {0}")] Encode(#[from] bincode::error::EncodeError), - - #[error("Database Decode failed")] + #[error("Failed to deserialize data: {0}")] Decode(#[from] bincode::error::DecodeError), - - #[error("Missing chain data for hash: {0}")] + #[error("Chain data not found for hash: {0}")] MissingData(String), + #[error("Account already exists: {0}")] + AccountExists(String), + #[error("Account does not exist: {0}")] + AccountNotFound(String), + #[error("Database corruption detected: {0}")] + Corruption(String), } diff --git a/node/src/executor/executor.rs b/node/src/executor/executor.rs index 2a7979a..14700fd 100644 --- a/node/src/executor/executor.rs +++ b/node/src/executor/executor.rs @@ -1,12 +1,12 @@ use crate::{ - bus::{SystemEvent, publish_watcher_event, publish_system_event}, + bus::{publish_system_event, publish_watcher_event, subscribe_system_event, SystemEvent}, log, node::NodeCommand, renderer::RenderTarget, watcher::WatcherCommand, }; use thiserror::Error; -use tokio::sync::mpsc; +use tokio::{select, sync::mpsc}; use vlogger::*; use super::ExecutorCommand; @@ -35,8 +35,21 @@ impl Executor { pub async fn run(&mut self) { publish_system_event(SystemEvent::ExecutorStarted); + let mut sys_rx = subscribe_system_event(); while !self.exit { - self.listen().await; + select! { + _ = self.listen() => {} + event_res = sys_rx.recv() => { + if let Ok(event) = event_res { + match event { + SystemEvent::Shutdown => { + self.exit().await; + } + _ => {} + } + } + } + } } } @@ -62,7 +75,7 @@ impl Executor { async fn echo(&self, s: Vec) { let mut str = s.join(" "); str.push_str("\n"); - let rd_cmd = WatcherCommand::Render(RenderCommand::RenderStringToPaneId { + let rd_cmd = WatcherCommand::Render(RenderCommand::StringToPaneId { str, pane: RenderTarget::CliOutput, }); @@ -70,7 +83,7 @@ impl Executor { } async fn invalid_command(&self, str: String) { - let rd_cmd = WatcherCommand::Render(RenderCommand::RenderStringToPaneId { + let rd_cmd = WatcherCommand::Render(RenderCommand::StringToPaneId { str, pane: RenderTarget::CliOutput, }); diff --git a/node/src/lib.rs b/node/src/lib.rs index 079991e..c578c56 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -12,6 +12,26 @@ pub mod args; pub mod error; +pub mod core { + pub mod block; + pub use block::*; + + pub mod blockchain; + pub use blockchain::*; + + pub mod tx; + pub use tx::*; + + pub mod data; + pub use data::*; + + pub mod hasher; + pub use hasher::*; + + pub mod account; + pub use account::*; +} + pub mod db { pub mod database; pub mod error; @@ -23,9 +43,10 @@ pub mod bus { pub mod event_bus; pub mod network; pub mod watcher; + pub mod error; pub mod system; - pub use executor::*; + pub use error::*; pub mod executor; pub use network::*; pub use watcher::*; @@ -72,29 +93,12 @@ pub mod protocol { pub use connector::*; } -pub mod core { - pub mod block; - pub use block::*; - - pub mod blockchain; - pub use blockchain::*; - - pub mod tx; - pub use tx::*; - - pub mod data; - pub use data::*; - - pub mod hasher; - pub use hasher::*; -} - pub mod seeds_constants; use crate::renderer::{RenderCommand, RenderTarget}; pub fn log(msg: String) { - crate::bus::publish_watcher_event(watcher::WatcherCommand::Render(RenderCommand::RenderStringToPaneId { + crate::bus::publish_watcher_event(watcher::WatcherCommand::Render(RenderCommand::StringToPaneId { pane: RenderTarget::CliOutput, str: msg, })) diff --git a/node/src/node/node.rs b/node/src/node/node.rs index 9f3f413..35bea75 100644 --- a/node/src/node/node.rs +++ b/node/src/node/node.rs @@ -1,4 +1,4 @@ -use crate::bus::{publish_system_event, publish_watcher_event, SystemEvent}; +use crate::bus::{publish_system_event, publish_watcher_event, subscribe_system_event, SystemEvent}; use crate::core::{self, Blockchain, BlockchainError, ChainData, ValidationError}; use crate::error::print_error_chain; use crate::executor::ExecutorCommand; @@ -13,6 +13,7 @@ use std::net::SocketAddr; use std::sync::Arc; use thiserror::*; +use tokio::select; use tokio::sync::mpsc; use uuid::Uuid; use vlogger::*; @@ -74,7 +75,6 @@ pub enum NodeCommand { ListBlocks, ListPeers, ShowId, - DumpBlocks(String), ConnectToSeeds, ConnectTcpPeer(String), BootStrap, @@ -155,6 +155,16 @@ impl Node { } } + async fn shutdown(&mut self) { + if let Some(conn) = &self.tcp_connector { + let res = conn.send(ConnectorCommand::Shutdown).await; + if res.is_err() { + log(msg!(ERROR, "Failed to send shutdown signal to connector")); + } + } + let _ = self.chain.shutdown().await; + } + fn get_blocks(&self) -> Result>, NodeError> { Ok(self.chain.blocks()?) } @@ -226,7 +236,7 @@ impl Node { } ProtocolMessage::ChainData { data, .. } => { log(msg!(DEBUG, "Received ChainData from {peer_id}")); - self.chain.apply(data).unwrap() + self.chain.apply_chain_data(data).unwrap() } _ => { log(msg!(DEBUG, "TODO: implement this message type")); @@ -312,13 +322,6 @@ impl Node { return self.exec_tx.clone(); } - async fn network_data(&mut self, data: ChainData) { - match self.chain.apply(data) { - Ok(_) => log(msg!(DEBUG, "ChainData Applied")), - Err(e) => print_error_chain(&e.into()), - }; - } - async fn connector_cmd(&self, cmd: ConnectorCommand) { match &self.tcp_connector { Some(t) => match t.send(cmd).await { @@ -348,20 +351,7 @@ impl Node { .await; } - pub async fn run(&mut self) { - if let Some(addr) = self.addr { - self.start_connection_listner(addr).await; - } else { - self - .start_connection_listner(SocketAddr::new( - std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)), - 8080, - )) - .await; - }; - - publish_system_event(SystemEvent::NodeStarted); - + async fn accept_command(&mut self) { while let Some(command) = self.rx.recv().await { match command { NodeCommand::BootStrap => { @@ -409,7 +399,9 @@ impl Node { self.process_message(peer_id, message).await; } NodeCommand::ProcessChainData(data) => { - self.network_data(data.clone()).await; + if let Err(e) = self.chain.apply_chain_data(data.clone()) { + print_error_chain(&e.into()); + } self.broadcast_network_data(data).await; } NodeCommand::CreateBlock => { @@ -453,9 +445,6 @@ impl Node { log(msg!(DEBUG, "Received DebugListBlocks command")); self.show_id().await; } - NodeCommand::DumpBlocks(s) => { - self.chain.dump_blocks(s); - } NodeCommand::Exit => { log(msg!(DEBUG, "Node Exit")); break; @@ -463,4 +452,41 @@ impl Node { } } } + + pub async fn run(&mut self) { + if let Some(addr) = self.addr { + self.start_connection_listner(addr).await; + } else { + self + .start_connection_listner(SocketAddr::new( + std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)), + 8080, + )) + .await; + }; + let mut system_rx = subscribe_system_event(); + publish_system_event(SystemEvent::NodeStarted); + + loop { + select! { + _ = self.accept_command() => { + + } + event_result = system_rx.recv() => { + match event_result { + Ok(e) => { + match e { + SystemEvent::Shutdown => { + break; + } + _ => {} + } + } + _ => {} + } + } + } + } + self.shutdown().await; + } } diff --git a/node/src/protocol/connector.rs b/node/src/protocol/connector.rs index 1fdeb24..dad9703 100644 --- a/node/src/protocol/connector.rs +++ b/node/src/protocol/connector.rs @@ -19,6 +19,7 @@ use thiserror::*; pub enum ConnectorCommand { ConnectToTcpPeer(SocketAddr), ConnectToTcpSeed(SocketAddr), + Shutdown, } pub struct Connector { @@ -26,6 +27,7 @@ pub struct Connector { addr: SocketAddr, exec_tx: mpsc::Sender, rx: mpsc::Receiver, + exit: bool, } #[derive(Error, Debug)] @@ -48,6 +50,7 @@ impl Connector { addr, exec_tx, rx, + exit: false, } } @@ -68,7 +71,7 @@ impl Connector { }; } if let Some(listener) = listner { - loop { + while !self.exit { tokio::select! { cmd_result = self.rx.recv() => { match cmd_result { @@ -109,6 +112,9 @@ impl Connector { ConnectorCommand::ConnectToTcpSeed(addr) => { self.connect_to_seed(addr).await; } + ConnectorCommand::Shutdown => { + self.exit = true; + } } } diff --git a/node/src/renderer/pane.rs b/node/src/renderer/pane.rs index 9054c5a..ad6b911 100644 --- a/node/src/renderer/pane.rs +++ b/node/src/renderer/pane.rs @@ -8,9 +8,6 @@ use ratatui::{ symbols::border, widgets::{Block, List, Paragraph, Widget}, }; -use vlogger::{msg, DEBUG}; - -use crate::log; use super::center; @@ -122,7 +119,6 @@ impl Widget for &mut Pane { 0 }; let scroll_offset = self.max_scroll.saturating_sub(self.scroll) as u16; - log(msg!(DEBUG, "idx {idx}")); let list_w = List::new( list .iter() diff --git a/node/src/renderer/renderer.rs b/node/src/renderer/renderer.rs index 933e125..1f500bc 100644 --- a/node/src/renderer/renderer.rs +++ b/node/src/renderer/renderer.rs @@ -26,14 +26,14 @@ pub struct Renderer { #[derive(Clone, Debug)] pub enum RenderCommand { - RenderStringToPaneId { + StringToPaneId { str: String, pane: RenderTarget, }, - RenderStringToPaneFocused { + StringToPaneFocused { str: String, }, - RenderKeyInput(KeyCode), + KeyInput(KeyCode), ListMove { pane: RenderTarget, index: usize, @@ -53,7 +53,6 @@ pub enum RenderCommand { #[allow(dead_code)] impl Renderer { pub fn new(layout: RenderLayoutKind) -> Self { - todo!("Fix renderer idx in select mode"); Self { buffer: String::new(), exit: false, @@ -151,27 +150,28 @@ impl Renderer { } pub fn handle_arrow_key(&mut self, key: KeyCode) { - match &mut self.mode { - InputMode::Input => {} - InputMode::PopUp(ref content, .., idx) => { - log(msg!(DEBUG, "Received keycode: {key}")); - log(msg!(DEBUG, "idx before: {idx}")); - match key { - KeyCode::Up => { *idx = idx.saturating_sub(1) } - KeyCode::Down => { - if *idx < content.len() { - *idx += 1; - } - } - _ => {} - } - log(msg!(DEBUG, "idx after: {idx}")) - } - } if let Some(pane) = self.focused() { - match &pane.target { + match &mut pane.target { RenderTarget::CliInput => {} RenderTarget::CliOutput => {} + RenderTarget::PopUp => { + match &mut pane.buffer { + RenderBuffer::Select(content, idx) => { + log(msg!(DEBUG, "idx before: {idx}")); + match key { + KeyCode::Up => { *idx = idx.saturating_sub(1) } + KeyCode::Down => { + if *idx < content.len().saturating_sub(1) { + *idx += 1; + } + } + _ => {} + } + log(msg!(DEBUG, "idx after: {idx}")) + } + _ => {} + } + } _ => {} } } @@ -216,6 +216,12 @@ impl Renderer { } } + fn clear_focus(&mut self) { + while let Some(focused) = self.focused() { + focused.focused = false; + } + } + pub fn clear_pane(&mut self, pane: RenderTarget) { if matches!(pane, RenderTarget::All) { for p in self.layout.panes.iter_mut() { @@ -248,7 +254,7 @@ impl Renderer { RenderCommand::MouseClickLeft(x, y) => self.handle_mouse_click_left(x, y, rects), RenderCommand::MouseScrollUp => self.handle_scroll_up(), RenderCommand::MouseScrollDown => self.handle_scroll_down(), - RenderCommand::RenderKeyInput(k) => match k { + RenderCommand::KeyInput(k) => match k { KeyCode::Char(c) => self.handle_char_input(c), KeyCode::Backspace => self.handle_backspace(), KeyCode::Enter => self.handle_enter(), @@ -256,8 +262,8 @@ impl Renderer { _ => {} }, RenderCommand::ListMove { pane, index } => self.list_move(pane, index), - RenderCommand::RenderStringToPaneFocused { str } => self.render_string_to_focused(str), - RenderCommand::RenderStringToPaneId { str, pane } => self.render_string_to_id(str, pane), + RenderCommand::StringToPaneFocused { str } => self.render_string_to_focused(str), + RenderCommand::StringToPaneId { str, pane } => self.render_string_to_id(str, pane), RenderCommand::Exit => self.exit(), RenderCommand::ChangeLayout(l) => self.layout = RenderLayout::generate(l), RenderCommand::ClearPane => self.clear_pane(RenderTarget::All), @@ -269,6 +275,7 @@ impl Renderer { } } InputMode::PopUp(content, title, ..) => { + self.clear_focus(); let pane = Pane::new( Some(title.to_string()), RenderTarget::PopUp, diff --git a/node/src/watcher/watcher.rs b/node/src/watcher/watcher.rs index c829175..cf48022 100644 --- a/node/src/watcher/watcher.rs +++ b/node/src/watcher/watcher.rs @@ -1,4 +1,4 @@ -use crate::{cli::cli, error::print_error_chain, node::node::NodeCommand, watcher::WatcherMode}; +use crate::{bus::{publish_system_event, subscribe_system_event, SystemEvent}, cli::cli, error::print_error_chain, node::node::NodeCommand, watcher::WatcherMode}; use crossterm::event::{Event, EventStream, KeyCode, KeyEventKind, MouseButton, MouseEventKind}; use futures::StreamExt; use memory_stats::memory_stats; @@ -64,8 +64,12 @@ impl Watcher { ) } - fn shutdown(&self) -> io::Result<()> { + async fn shutdown(&mut self) -> io::Result<()> { ratatui::restore(); + let handles = std::mem::take(&mut self.handles); + for handle in handles { + handle.await.unwrap() + } crossterm::execute!( std::io::stdout(), crossterm::event::DisableBracketedPaste, @@ -98,6 +102,7 @@ impl Watcher { let mut ui_rx = subscribe_watcher_event(); let mut render_interval = interval(Duration::from_millis(32)); let mut terminal = ratatui::init(); + let mut system_rx = subscribe_system_event(); self.init()?; @@ -124,12 +129,22 @@ impl Watcher { } } } + event_res = system_rx.recv() => { + if let Ok(event) = event_res { + match event { + SystemEvent::Shutdown => { + break ; + } + _ => {} + } + } + } _ = render_interval.tick() => { terminal.draw(|frame| self.renderer.draw(frame))?; } } } - self.shutdown() + self.shutdown().await } pub fn build() -> WatcherBuilder { @@ -257,7 +272,7 @@ impl Watcher { _ => {} }, Event::Key(k) if k.kind == KeyEventKind::Press => match k.code { - KeyCode::Esc => return Ok(false), + KeyCode::Esc => publish_system_event(SystemEvent::Shutdown), KeyCode::Char(c) => { self.cmd_buffer.push(c); self.renderer.handle_char_input(c)