need to fix pop selection in renderer

This commit is contained in:
victor 2025-08-31 20:58:20 +02:00
parent 97878de2ef
commit df67c432f3
33 changed files with 2344 additions and 2059 deletions

View File

@ -1,134 +1,135 @@
use std::net::SocketAddr;
use crate::core;
use crate::watcher::{RenderLayoutKind, RenderPane};
use crate::renderer::RenderLayoutKind;
use clap::{Parser, Subcommand};
use clap::*;
#[derive(Parser)]
pub struct Cli {
#[command(subcommand)]
pub command: CliCommand,
#[command(subcommand)]
pub command: CliCommand,
}
#[derive(Subcommand)]
pub enum CliCommand {
#[command(name = "ping")]
Ping {
#[command(subcommand)]
ping_cmd: CliPingCommand,
},
#[command(name = "ping")]
Ping {
#[command(subcommand)]
ping_cmd: CliPingCommand,
},
/// Peer related Cmd
#[command(name = "peer")]
Peer {
#[command(subcommand)]
peer_cmd: CliPeerCommand,
},
/// Peer related Cmd
#[command(name = "peer")]
Peer {
#[command(subcommand)]
peer_cmd: CliPeerCommand,
},
/// Block related Cmd
#[command(name = "block")]
Block {
#[command(subcommand)]
block_cmd: CliBlockCommand,
},
/// Block related Cmd
#[command(name = "block")]
Block {
#[command(subcommand)]
block_cmd: CliBlockCommand,
},
/// Make a Transaction
#[command(name = "tx")]
Transaction(core::Tx),
/// Make a Transaction
#[command(name = "tx")]
Transaction(core::Tx),
/// Start new TcpListner on Addr
#[command(name = "listen")]
StartListner { addr: String },
/// Start new TcpListner on Addr
#[command(name = "listen")]
StartListner { addr: String },
/// Display Node id
#[command(name = "id")]
DebugShowId,
/// Display Node id
#[command(name = "id")]
DebugShowId,
/// Connect to Seed Nodes
#[command(name = "seed")]
Seeds {
#[command(subcommand)]
seed_cmd: CliSeedCommand,
},
/// Connect to Seed Nodes
#[command(name = "seed")]
Seeds {
#[command(subcommand)]
seed_cmd: CliSeedCommand,
},
/// Clear Pane
#[command(name = "clear", aliases = ["c"])]
Clear { pane: RenderPane },
/// Clear Pane
#[command(name = "clear", aliases = ["c"])]
Clear,
#[command(name = "layout", aliases = ["lay"])]
Layout { mode: RenderLayoutKind },
/// Set TUI layout
#[command(name = "layout", aliases = ["lay"])]
Layout { mode: RenderLayoutKind },
}
#[derive(Subcommand)]
pub enum CliPeerCommand {
/// Connect To Peer With IpAddr
#[command(name = "connect", aliases = ["c", "con"])]
Connect { addr: String },
/// Connect To Peer With IpAddr
#[command(name = "connect", aliases = ["c", "con"])]
Connect { addr: String },
/// Remove Peer Connection
#[command(name = "remove", aliases = ["rm"])]
Remove { id: String },
/// Remove Peer Connection
#[command(name = "remove", aliases = ["rm"])]
Remove { id: String },
/// List Connected Peers
#[command(name = "list", aliases = ["ls", "l"])]
List,
/// List Connected Peers
#[command(name = "list", aliases = ["ls", "l"])]
List,
}
#[derive(Subcommand)]
pub enum CliSeedCommand {
/// Connect to Seed nodes
#[command(name = "connect", aliases = ["c", "con"])]
Connect,
/// Connect to Seed nodes
#[command(name = "connect", aliases = ["c", "con"])]
Connect,
}
#[derive(Subcommand)]
pub enum CliBlockCommand {
/// List Blocks in Chain
#[command(name = "list", aliases = ["ls", "l"])]
List,
/// List Blocks in Chain
#[command(name = "list", aliases = ["ls", "l"])]
List,
/// Create and Broadcast new Block
#[command(name = "create", aliases = ["c", "new"])]
Create,
/// Create and Broadcast new Block
#[command(name = "create", aliases = ["c", "new"])]
Create,
/// Export Blocks to file
#[command(name = "dump", aliases = ["export"])]
Dump {
/// Output file
#[arg(short, long)]
output: String,
},
/// Export Blocks to file
#[command(name = "dump", aliases = ["export"])]
Dump {
/// Output file
#[arg(short, long)]
output: String,
},
/// Display Block by Hash
#[command(name = "display", aliases = ["d"])]
#[group(required = true, multiple = false)]
Display{
/// Block Hash
#[arg(long)]
key: Option<String>,
/// Block Height
#[arg(long)]
height: Option<u64>
}
/// Display Block by Hash
#[command(name = "display", aliases = ["d"])]
#[group(multiple = false)]
Display {
/// Block Hash
#[arg(long)]
key: Option<String>,
/// Block Height
#[arg(long)]
height: Option<u64>,
},
}
#[derive(Subcommand)]
pub enum CliPingCommand {
/// Ping Peer by Id
#[command(name = "id", aliases = ["i"])]
Id {
#[arg(short, long)]
id: String,
},
/// Ping Peer by Id
#[command(name = "id", aliases = ["i"])]
Id {
#[arg(short, long)]
id: String,
},
/// Ping Peer by Address
#[command(name = "addr", aliases = ["a", "ad"])]
Addr {
#[arg(short, long)]
addr: String,
},
/// Ping Peer by Address
#[command(name = "addr", aliases = ["a", "ad"])]
Addr {
#[arg(short, long)]
addr: String,
},
}
#[derive(Subcommand)]
@ -136,36 +137,36 @@ pub enum CliPingCommand {
#[command(about = "A blockchain node CLI tool")]
#[command(version = "1.0")]
#[command(
long_about = "A comprehensive CLI tool for managing blockchain nodes, peers, and transactions"
long_about = "A comprehensive CLI tool for managing blockchain nodes, peers, and transactions"
)]
pub enum CliNodeCommand {}
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
pub struct CliArgs {
/// Provide address on which node will listen
#[arg(short = 'a', long)]
pub addr: Option<SocketAddr>,
/// Provide address on which node will listen
#[arg(short = 'a', long)]
pub addr: Option<SocketAddr>,
/// Provide File with current chain
#[arg(short = 'd', long)]
pub database: Option<String>,
/// Provide File with current chain
#[arg(short = 'd', long)]
pub database: Option<String>,
/// Enable bootstrap mode (alternative syntax)
#[arg(short = 'b', long = "bootstrap", action = clap::ArgAction::SetTrue)]
pub bootstrap: bool,
/// Enable bootstrap mode (alternative syntax)
#[arg(short = 'b', long = "bootstrap", action = clap::ArgAction::SetTrue)]
pub bootstrap: bool,
/// Enable debug mode (alternative syntax)
#[arg(long = "debug", action = clap::ArgAction::SetTrue)]
pub debug: bool,
/// Enable debug mode (alternative syntax)
#[arg(long = "debug", action = clap::ArgAction::SetTrue)]
pub debug: bool,
/// Enable rendering (alternative syntax)
#[arg(short = 'r', long = "render", action = clap::ArgAction::Set, default_value = "true")]
pub render: bool,
/// Enable rendering (alternative syntax)
#[arg(short = 'r', long = "render", action = clap::ArgAction::Set, default_value = "true")]
pub render: bool,
/// Enable debug mode (alternative syntax)
#[arg(short = 's', long = "seed", action = clap::ArgAction::SetTrue)]
pub seed: bool,
/// Enable debug mode (alternative syntax)
#[arg(short = 's', long = "seed", action = clap::ArgAction::SetTrue)]
pub seed: bool,
}
#[derive(Subcommand, Debug)]
@ -173,11 +174,11 @@ pub enum Commands {}
#[derive(Subcommand, Debug)]
pub enum TxCmd {
/// Add a new transaction to the DB
#[command(short_flag = 'a')]
Add(core::Tx),
/// Add a new transaction to the DB
#[command(short_flag = 'a')]
Add(core::Tx),
}
pub fn get_args() -> CliArgs {
CliArgs::parse()
CliArgs::parse()
}

View File

@ -3,15 +3,15 @@ use std::sync::Arc;
use tokio::sync::broadcast;
use super::event_bus::EventBus;
use crate::watcher::ExecutorCommand;
use crate::executor::ExecutorCommand;
static EXECUTOR_EVENT_BUS: Lazy<Arc<EventBus<ExecutorCommand>>> =
Lazy::new(|| Arc::new(EventBus::new()));
Lazy::new(|| Arc::new(EventBus::new()));
pub fn publish_executor_event(event: ExecutorCommand) {
EXECUTOR_EVENT_BUS.publish(event);
EXECUTOR_EVENT_BUS.publish(event);
}
pub fn subscribe_executor_event() -> broadcast::Receiver<ExecutorCommand> {
EXECUTOR_EVENT_BUS.subscribe()
EXECUTOR_EVENT_BUS.subscribe()
}

View File

@ -6,20 +6,20 @@ use super::event_bus::EventBus;
#[derive(Clone, Debug)]
pub enum NetworkEvent {
SeedConnected(String),
SeedDisconnected(String),
AllSeedsConnected,
BootstrapCompleted,
NodeReady,
SeedConnected(String),
SeedDisconnected(String),
AllSeedsConnected,
BootstrapCompleted,
NodeReady,
}
static NETWORK_EVENT_BUS: Lazy<Arc<EventBus<NetworkEvent>>> =
Lazy::new(|| Arc::new(EventBus::new()));
Lazy::new(|| Arc::new(EventBus::new()));
pub fn publish_network_event(event: NetworkEvent) {
NETWORK_EVENT_BUS.publish(event);
NETWORK_EVENT_BUS.publish(event);
}
pub fn subscribe_network_event() -> broadcast::Receiver<NetworkEvent> {
NETWORK_EVENT_BUS.subscribe()
NETWORK_EVENT_BUS.subscribe()
}

View File

@ -1,17 +0,0 @@
use once_cell::sync::Lazy;
use std::sync::Arc;
use tokio::sync::broadcast;
use super::event_bus::EventBus;
use crate::watcher::renderer::RenderCommand;
static RENDER_CHANNEL: Lazy<Arc<EventBus<RenderCommand>>> = Lazy::new(|| Arc::new(EventBus::new()));
pub fn publish_render_event(event: RenderCommand) {
RENDER_CHANNEL.publish(event);
}
pub fn subscribe_render_event() -> broadcast::Receiver<RenderCommand> {
RENDER_CHANNEL.subscribe()
}

View File

@ -6,18 +6,18 @@ use super::event_bus::EventBus;
#[derive(Clone, Debug)]
pub enum SystemEvent {
ExecutorStarted,
RendererStarted,
NodeStarted,
Exit,
ExecutorStarted,
RendererStarted,
NodeStarted,
Exit,
}
static SYSTEM_EVENT_BUS: Lazy<Arc<EventBus<SystemEvent>>> = Lazy::new(|| Arc::new(EventBus::new()));
pub fn publish_system_event(event: SystemEvent) {
SYSTEM_EVENT_BUS.publish(event);
SYSTEM_EVENT_BUS.publish(event);
}
pub fn subscribe_system_event() -> broadcast::Receiver<SystemEvent> {
SYSTEM_EVENT_BUS.subscribe()
SYSTEM_EVENT_BUS.subscribe()
}

15
node/src/bus/watcher.rs Normal file
View File

@ -0,0 +1,15 @@
use once_cell::sync::Lazy;
use std::sync::Arc;
use tokio::sync::broadcast;
use super::event_bus::EventBus;
use crate::watcher::WatcherCommand;
static WATCHER_CHANNEL: Lazy<Arc<EventBus<WatcherCommand>>> = Lazy::new(|| Arc::new(EventBus::new()));
pub fn publish_watcher_event(event: WatcherCommand) {
WATCHER_CHANNEL.publish(event);
}
pub fn subscribe_watcher_event() -> broadcast::Receiver<WatcherCommand> {
WATCHER_CHANNEL.subscribe()
}

View File

@ -1,68 +1,66 @@
use crate::args::*;
use crate::core::ChainData;
use crate::executor::ExecutorCommand;
use crate::node::*;
use crate::watcher::{RenderCommand, ExecutorCommand};
use crate::renderer::RenderCommand;
use clap::Parser;
pub fn handle_peer_command(cmd: CliPeerCommand) -> NodeCommand {
match cmd {
CliPeerCommand::List => NodeCommand::ListPeers,
CliPeerCommand::Remove { id } => NodeCommand::RemovePeer {
peer_id: id.parse::<uuid::Uuid>().unwrap(),
},
CliPeerCommand::Connect { addr } => NodeCommand::ConnectTcpPeer(addr),
}
match cmd {
CliPeerCommand::List => NodeCommand::ListPeers,
CliPeerCommand::Remove { id } => NodeCommand::RemovePeer {
peer_id: id.parse::<uuid::Uuid>().unwrap(),
},
CliPeerCommand::Connect { addr } => NodeCommand::ConnectTcpPeer(addr),
}
}
pub fn handle_block_command(cmd: CliBlockCommand) -> NodeCommand {
match cmd {
CliBlockCommand::List => NodeCommand::ListBlocks,
CliBlockCommand::Dump { output } => NodeCommand::DumpBlocks(output),
CliBlockCommand::Create => NodeCommand::CreateBlock,
CliBlockCommand::Display{key, height} => {
match (key, height) {
(Some(k), _) => return NodeCommand::DisplayBlockByKey(k),
(_, Some(h)) => return NodeCommand::DisplayBlockByHeight(h),
(None, None) => panic!()
}
},
}
match cmd {
CliBlockCommand::List => NodeCommand::ListBlocks,
CliBlockCommand::Dump { output } => NodeCommand::DumpBlocks(output),
CliBlockCommand::Create => NodeCommand::CreateBlock,
CliBlockCommand::Display { key, height } => match (key, height) {
(Some(k), _) => return NodeCommand::DisplayBlockByKey(k),
(_, Some(h)) => return NodeCommand::DisplayBlockByHeight(h),
(None, None) => return NodeCommand::DisplayBlockInteractive,
},
}
}
fn handle_seed_command(cmd: CliSeedCommand) -> NodeCommand {
match cmd {
CliSeedCommand::Connect => NodeCommand::ConnectToSeeds,
}
match cmd {
CliSeedCommand::Connect => NodeCommand::ConnectToSeeds,
}
}
fn handle_ping(cmd: CliPingCommand) -> NodeCommand {
match cmd {
CliPingCommand::Id { id } => NodeCommand::PingId(id),
CliPingCommand::Addr { addr } => NodeCommand::PingAddr(addr),
}
match cmd {
CliPingCommand::Id { id } => NodeCommand::PingId(id),
CliPingCommand::Addr { addr } => NodeCommand::PingAddr(addr),
}
}
pub fn cli(input: &[&str]) -> ExecutorCommand {
match Cli::try_parse_from(input) {
Ok(cmd) => match cmd.command {
CliCommand::Layout { mode } => {
ExecutorCommand::Render(RenderCommand::ChangeLayout(mode))
}
CliCommand::Clear { pane } => ExecutorCommand::Render(RenderCommand::ClearPane(pane)),
CliCommand::Peer { peer_cmd } => ExecutorCommand::Node(handle_peer_command(peer_cmd)),
CliCommand::Block { block_cmd } => {
ExecutorCommand::Node(handle_block_command(block_cmd))
}
CliCommand::Transaction(tx) => ExecutorCommand::Node(NodeCommand::ProcessChainData(
ChainData::Transaction(tx),
)),
CliCommand::DebugShowId => ExecutorCommand::Node(NodeCommand::ShowId),
CliCommand::StartListner { addr } => {
ExecutorCommand::Node(NodeCommand::StartListner(addr.parse().unwrap()))
}
CliCommand::Seeds { seed_cmd } => ExecutorCommand::Node(handle_seed_command(seed_cmd)),
CliCommand::Ping { ping_cmd } => ExecutorCommand::Node(handle_ping(ping_cmd)),
},
Err(e) => ExecutorCommand::InvalidCommand(format!("{e}")),
}
pub fn cli(input: &str) -> ExecutorCommand {
let argv: Vec<&str> = std::iter::once(" ")
.chain(input.split_whitespace())
.collect();
match Cli::try_parse_from(argv) {
Ok(cmd) => match cmd.command {
CliCommand::Layout { mode } => ExecutorCommand::Render(RenderCommand::ChangeLayout(mode)),
CliCommand::Clear => ExecutorCommand::Render(RenderCommand::ClearPane),
CliCommand::Peer { peer_cmd } => ExecutorCommand::Node(handle_peer_command(peer_cmd)),
CliCommand::Block { block_cmd } => ExecutorCommand::Node(handle_block_command(block_cmd)),
CliCommand::Transaction(tx) => {
ExecutorCommand::Node(NodeCommand::ProcessChainData(ChainData::Transaction(tx)))
}
CliCommand::DebugShowId => ExecutorCommand::Node(NodeCommand::ShowId),
CliCommand::StartListner { addr } => {
ExecutorCommand::Node(NodeCommand::StartListner(addr.parse().unwrap()))
}
CliCommand::Seeds { seed_cmd } => ExecutorCommand::Node(handle_seed_command(seed_cmd)),
CliCommand::Ping { ping_cmd } => ExecutorCommand::Node(handle_ping(ping_cmd)),
},
Err(e) => ExecutorCommand::InvalidCommand(format!("{e}")),
}
}

View File

@ -1,45 +1,49 @@
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, Default, bincode::Decode, bincode::Encode)]
#[derive(
Clone, Debug, serde::Deserialize, serde::Serialize, Default, bincode::Decode, bincode::Encode,
)]
pub struct BlockHeader {
pub previous_hash: String,
pub timestamp: u64,
pub merkle_root: String,
pub block_hash: String,
pub nonce: u32,
pub height: u64,
pub previous_hash: String,
pub timestamp: u64,
pub merkle_root: String,
pub block_hash: String,
pub nonce: u32,
pub height: u64,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, Default, bincode::Decode, bincode::Encode)]
#[derive(
Clone, Debug, serde::Deserialize, serde::Serialize, Default, bincode::Decode, bincode::Encode,
)]
pub struct Block {
pub head: BlockHeader,
pub data: Vec<String>,
pub head: BlockHeader,
pub data: Vec<String>,
}
impl BlockHeader {
pub fn previous_hash(&self) -> &str {
&self.previous_hash
}
pub fn timestamp(&self) -> u64 {
self.timestamp
}
pub fn nonce(&self) -> u32 {
self.nonce
}
pub fn merkle_root(&self) -> &str {
&self.merkle_root
}
pub fn block_hash(&self) -> &str {
&self.block_hash
}
pub fn previous_hash(&self) -> &str {
&self.previous_hash
}
pub fn timestamp(&self) -> u64 {
self.timestamp
}
pub fn nonce(&self) -> u32 {
self.nonce
}
pub fn merkle_root(&self) -> &str {
&self.merkle_root
}
pub fn block_hash(&self) -> &str {
&self.block_hash
}
}
impl Block {
pub fn new(head: BlockHeader, data: Vec<String>) -> Self {
Self { head, data }
}
pub fn head(&self) -> &BlockHeader {
&self.head
}
pub fn data(&self) -> &[String] {
&self.data
}
pub fn new(head: BlockHeader, data: Vec<String>) -> Self {
Self { head, data }
}
pub fn head(&self) -> &BlockHeader {
&self.head
}
pub fn data(&self) -> &[String] {
&self.data
}
}

View File

@ -1,18 +1,18 @@
use std::sync::Arc;
use crate::core::ChainData;
use crate::db::database;
use crate::db;
use crate::core;
use crate::core::ChainData;
use crate::db;
use crate::db::DatabaseError;
use crate::db::database;
use crate::error::TxError;
use crate::log;
use super::hasher::Hasher;
use vlogger::*;
use std::collections::HashMap;
use std::time::UNIX_EPOCH;
use vlogger::*;
use thiserror::*;
#[allow(dead_code)]
@ -31,8 +31,7 @@ pub enum BlockchainError {
Validation(#[from] ValidationError),
#[error("Block Creation Error")]
BlockCreation
BlockCreation,
}
const BLOCKCHAIN_ID: &str = "watch-chain";
@ -66,7 +65,6 @@ impl Blockchain {
Ok(())
}
fn acc_exists(&self, acc: &Account) -> bool {
self.balances.iter().find(|(k, _)| *k == acc).is_some()
}
@ -76,10 +74,11 @@ impl Blockchain {
}
fn hash_transaction_pool(&self) -> Vec<String> {
self.mempool
.iter()
.map(|tx| Hasher::hash_chain_data(tx))
.collect()
self
.mempool
.iter()
.map(|tx| Hasher::hash_chain_data(tx))
.collect()
}
pub fn create_block(&mut self) -> Result<Arc<core::Block>, BlockchainError> {
@ -120,7 +119,7 @@ impl Blockchain {
self.add_block(new_block.clone())?;
Ok(new_block)
}
Err(_) => Err(BlockchainError::BlockCreation)
Err(_) => Err(BlockchainError::BlockCreation),
}
}
@ -163,11 +162,11 @@ impl Blockchain {
}
impl Blockchain {
pub fn list_blocks(&self) -> Result<String, BlockchainError> {
let mut ret = String::from("Blocks List\n-------------------\n");
pub fn list_blocks(&self) -> Result<Vec<String>, BlockchainError> {
let mut ret = Vec::new();
let blocks = self.blocks()?;
for (i, b) in blocks.iter().enumerate() {
ret.push_str(format!("Block Hash #{i}: {}\n", b.head.block_hash()).as_str())
for b in blocks.iter() {
ret.push(b.head.block_hash().to_string())
}
Ok(ret)
}
@ -185,7 +184,7 @@ impl Blockchain {
Ok(())
}
pub fn add_block(&mut self, block: Arc<core::Block>) -> Result<(), BlockchainError>{
pub fn add_block(&mut self, block: Arc<core::Block>) -> Result<(), BlockchainError> {
match self.validate_block(&block) {
Ok(()) => Ok(self.insert_block(&block)?),
Err(e) => Err(BlockchainError::Validation(e)),
@ -223,7 +222,7 @@ impl Blockchain {
fn validate_chain(&self) -> Result<(), ValidationError> {
if let Ok(blocks) = self.blocks() {
if let Some(mut prev_block) = blocks.first() {
for (i, block) in blocks.iter().enumerate() {
for (i, block) in blocks.iter().skip(1).enumerate() {
let head = block.head();
let hash = Hasher::calculate_block_hash(block.head());
@ -246,9 +245,11 @@ impl Blockchain {
balances: HashMap::new(),
mempool: vec![],
id: BLOCKCHAIN_ID.to_string(),
db
db,
};
chain.validate_chain().or_else(|e| return Err(BlockchainError::Validation(e)))?;
chain
.validate_chain()
.or_else(|e| return Err(BlockchainError::Validation(e)))?;
Ok(chain)
}
}

View File

@ -4,5 +4,5 @@ use super::Tx;
#[derive(serde::Deserialize, serde::Serialize, Encode, Decode, Debug, Clone)]
pub enum ChainData {
Transaction(Tx),
Transaction(Tx),
}

View File

@ -1,7 +1,7 @@
use sha2::Sha256;
use sha2::Digest;
use sha2::Sha256;
use super::{ChainData, BlockHeader};
use super::{BlockHeader, ChainData};
pub struct Hasher {}

View File

@ -1,50 +1,52 @@
use crate::core::Account;
use crate::error::TxError;
#[derive(serde::Deserialize, serde::Serialize, Debug, clap::Args, Clone, bincode::Encode, bincode::Decode)]
#[derive(
serde::Deserialize, serde::Serialize, Debug, clap::Args, Clone, bincode::Encode, bincode::Decode,
)]
pub struct Tx {
from: Account,
to: Account,
value: u32,
data: String,
from: Account,
to: Account,
value: u32,
data: String,
}
impl Tx {
pub fn new(from: Account, to: Account, value: u32, data: String) -> Self {
Self {
from,
to,
value,
data,
}
pub fn new(from: Account, to: Account, value: u32, data: String) -> Self {
Self {
from,
to,
value,
data,
}
}
pub fn validate(&self) -> Result<(), TxError> {
if self.from.is_empty() {
return Err(TxError::FromEmpty);
} else if self.to.is_empty() {
return Err(TxError::ToEmpty);
} else if self.value == 0 {
return Err(TxError::ValueEmpty);
}
Ok(())
}
pub fn is_new_account(&self) -> bool {
return self.data == "new_account";
}
pub fn is_reward(&self) -> bool {
return self.data == "reward";
}
pub fn from(&self) -> &Account {
&self.from
}
pub fn to(&self) -> &Account {
&self.to
}
pub fn value(&self) -> u32 {
self.value
}
pub fn data(&self) -> &str {
&self.data
pub fn validate(&self) -> Result<(), TxError> {
if self.from.is_empty() {
return Err(TxError::FromEmpty);
} else if self.to.is_empty() {
return Err(TxError::ToEmpty);
} else if self.value == 0 {
return Err(TxError::ValueEmpty);
}
Ok(())
}
pub fn is_new_account(&self) -> bool {
return self.data == "new_account";
}
pub fn is_reward(&self) -> bool {
return self.data == "reward";
}
pub fn from(&self) -> &Account {
&self.from
}
pub fn to(&self) -> &Account {
&self.to
}
pub fn value(&self) -> u32 {
self.value
}
pub fn data(&self) -> &str {
&self.data
}
}

View File

@ -1,8 +1,13 @@
use crate::{
core::{self, Block, ChainData, Hasher},
db::error::DatabaseError,
error::print_error_chain,
log,
};
use bincode::{self, config::Configuration};
use sled::{self, Batch};
use crate::{core::{self, Block, ChainData, Hasher}, db::error::DatabaseError, error::print_error_chain, log};
use vlogger::*;
use std::sync::Arc;
use vlogger::*;
static BINCODE_CONFIG: Configuration = bincode::config::standard();
@ -10,55 +15,66 @@ const DB_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/database");
const DB_TREE: &str = "blocks";
const BLOCK_INDEX: &str = "blocks:";
const CHAIN_DATA_INDEX: &str = "chain_data:";
const DATA_TO_BLOCK_INDEX: &str = "data_to_block:";
const METADATA_INDEX: &str= "metadata:";
const BLOCK_INDEX: &str = "blocks";
const CHAIN_DATA_INDEX: &str = "chain_data";
const DATA_TO_BLOCK_INDEX: &str = "data_to_block";
const METADATA_INDEX: &str = "metadata";
const TIP_KEY: &str = "chain_tip";
const HEIGHT_KEY: &str = "chain_height";
const HEIGHT_TO_HASH_INDEX: &str = "height_to_hash";
#[derive(Debug)]
pub struct ChainDb {
db: sled::Tree,
}
fn data_index(key: &str) -> String {
format!("{}{}", CHAIN_DATA_INDEX, key)
format!("{}:{}", CHAIN_DATA_INDEX, key)
}
fn data_to_block_index(key: &str) -> String {
format!("{}{}", DATA_TO_BLOCK_INDEX, key)
format!("{}:{}", DATA_TO_BLOCK_INDEX, key)
}
fn block_index(key: &str) -> String {
format!("{}{}", BLOCK_INDEX, key)
format!("{}:{}", BLOCK_INDEX, key)
}
fn metadata_index(key: &str) -> String {
format!("{}{}", METADATA_INDEX, key)
format!("{}:{}", METADATA_INDEX, key)
}
fn height_to_hash_index(height: u64) -> String {
format!("{}:{:020}", HEIGHT_TO_HASH_INDEX, height)
}
impl ChainDb {
pub fn new(path: Option<String>) ->Result<ChainDb, DatabaseError> {
pub fn new(path: Option<String>) -> Result<ChainDb, DatabaseError> {
let path = if path.is_some() {
&path.unwrap()
} else {
DB_PATH
};
match sled::open(&path) {
let config = sled::Config::new()
.cache_capacity(512 * 1024)
.segment_size(1024 * 1024)
.path(&path);
match config.open() {
Ok(db) => {
if db.was_recovered() {
log(msg!(INFO, "Loaded Database from Previous state at: {}", path));
log(msg!(
INFO,
"Loaded Database from Previous state at: {}",
path
));
} else {
log(msg!(INFO, "Created Database at {}", path));
}
let db = db
.open_tree(DB_TREE)?;
Ok(ChainDb {
db
})
let db = db.open_tree(DB_TREE)?;
Ok(ChainDb { db })
}
Err(err) => {
print_error_chain(&err.clone().into());
@ -70,15 +86,18 @@ impl ChainDb {
pub fn get_block_by_key(&self, block_hash: &str) -> Result<Option<core::Block>, DatabaseError> {
let block_hash = block_index(block_hash);
if let Some(bin_block) = self.db.get(block_hash)? {
let (block, _size) = bincode::decode_from_slice::<core::Block, _>(&bin_block, BINCODE_CONFIG)
.map_err(|e| DatabaseError::Decode(e))?;
let (block, _size) = bincode::decode_from_slice::<core::Block, _>(&bin_block, BINCODE_CONFIG)
.map_err(|e| DatabaseError::Decode(e))?;
Ok(Some(block))
} else {
Ok(None)
}
}
pub fn get_block_by_height(&self, height: u64) -> Result<Option<Arc<core::Block>>, DatabaseError> {
pub fn get_block_by_height(
&self,
height: u64,
) -> Result<Option<Arc<core::Block>>, DatabaseError> {
for result in self.db.scan_prefix(BLOCK_INDEX) {
let (_key, value) = result?;
let (block, _size) = bincode::decode_from_slice::<core::Block, _>(&value, BINCODE_CONFIG)?;
@ -107,7 +126,9 @@ impl ChainDb {
}
pub fn get_all_blocks(&self) -> Result<Vec<Arc<core::Block>>, DatabaseError> {
self.db.scan_prefix(BLOCK_INDEX)
self
.db
.scan_prefix(BLOCK_INDEX)
.map(|res| -> Result<Arc<core::Block>, DatabaseError> {
let (_key, value) = res?;
let (block, _size) = bincode::decode_from_slice::<core::Block, _>(&value, BINCODE_CONFIG)
@ -128,11 +149,21 @@ impl ChainDb {
let mut db_batch = Batch::default();
let bin_block = bincode::encode_to_vec(block, BINCODE_CONFIG)?;
db_batch.insert(block_index(block.head().block_hash()).as_str(), bin_block);
db_batch.insert(
height_to_hash_index(block.head().height).as_str(),
block.head().block_hash(),
);
for data in block.data() {
db_batch.insert(data_to_block_index(data.as_str()).as_str(), block.head().block_hash());
db_batch.insert(
data_to_block_index(data.as_str()).as_str(),
block.head().block_hash(),
);
}
db_batch.insert(metadata_index(TIP_KEY).as_str(), block.head().block_hash());
db_batch.insert(metadata_index(HEIGHT_KEY).as_str(), &block.head().height.to_be_bytes());
db_batch.insert(
metadata_index(HEIGHT_KEY).as_str(),
&block.head().height.to_be_bytes(),
);
self.db.apply_batch(db_batch)?;
Ok(())
}

View File

@ -18,5 +18,5 @@ pub enum DatabaseError {
Decode(#[from] bincode::error::DecodeError),
#[error("Missing chain data for hash: {0}")]
MissingData(String)
MissingData(String),
}

View File

@ -0,0 +1,15 @@
use crate::node::NodeCommand;
use crate::renderer::RenderCommand;
use crate::watcher::WatcherCommand;
#[derive(Clone, Debug)]
pub enum ExecutorCommand {
NodeResponse(String),
Echo(Vec<String>),
Print(String),
InvalidCommand(String),
Node(NodeCommand),
Render(RenderCommand),
Watcher(WatcherCommand),
Exit,
}

View File

@ -0,0 +1,92 @@
use crate::{
bus::{SystemEvent, publish_watcher_event, publish_system_event},
log,
node::NodeCommand,
renderer::RenderTarget,
watcher::WatcherCommand,
};
use thiserror::Error;
use tokio::sync::mpsc;
use vlogger::*;
use super::ExecutorCommand;
use crate::RenderCommand;
#[derive(Debug, Error)]
pub enum InProcessError {
#[error("TODO: {0}")]
TODO(String),
}
pub struct Executor {
node_tx: mpsc::Sender<NodeCommand>,
rx: mpsc::Receiver<ExecutorCommand>,
exit: bool,
}
impl Executor {
pub fn new(node_tx: mpsc::Sender<NodeCommand>, rx: mpsc::Receiver<ExecutorCommand>) -> Self {
Self {
node_tx,
rx,
exit: false,
}
}
pub async fn run(&mut self) {
publish_system_event(SystemEvent::ExecutorStarted);
while !self.exit {
self.listen().await;
}
}
async fn exit(&mut self) {
log(msg!(DEBUG, "Executor Exit"));
self.exit = true
}
async fn listen(&mut self) {
if let Some(cmd) = self.rx.recv().await {
let _ = self.execute(cmd).await;
}
}
async fn send_node_cmd(&self, cmd: NodeCommand) {
self.node_tx.send(cmd).await.unwrap()
}
async fn handle_node_cmd(&self, cmd: NodeCommand) {
self.send_node_cmd(cmd).await;
}
async fn echo(&self, s: Vec<String>) {
let mut str = s.join(" ");
str.push_str("\n");
let rd_cmd = WatcherCommand::Render(RenderCommand::RenderStringToPaneId {
str,
pane: RenderTarget::CliOutput,
});
publish_watcher_event(rd_cmd);
}
async fn invalid_command(&self, str: String) {
let rd_cmd = WatcherCommand::Render(RenderCommand::RenderStringToPaneId {
str,
pane: RenderTarget::CliOutput,
});
publish_watcher_event(rd_cmd);
}
async fn execute(&mut self, cmd: ExecutorCommand) {
match cmd {
ExecutorCommand::NodeResponse(resp) => log(resp),
ExecutorCommand::Node(n) => self.handle_node_cmd(n).await,
ExecutorCommand::Render(p) => publish_watcher_event(WatcherCommand::Render(p)),
ExecutorCommand::Watcher(w) => publish_watcher_event(w),
ExecutorCommand::Echo(s) => self.echo(s).await,
ExecutorCommand::Print(s) => log(s),
ExecutorCommand::InvalidCommand(str) => self.invalid_command(str).await,
ExecutorCommand::Exit => self.exit().await,
}
}
}

View File

@ -1,9 +1,9 @@
pub mod node {
pub mod node;
pub use node::*;
pub mod node;
pub use node::*;
pub mod error;
pub use error::*;
pub mod error;
pub use error::*;
}
pub mod cli;
@ -17,69 +17,85 @@ pub mod db {
pub mod error;
pub use database::*;
pub use error::*;
}
pub mod bus {
pub mod executor;
pub mod network;
pub mod render;
pub mod system;
pub mod event_bus;
pub mod network;
pub mod watcher;
pub mod system;
pub use executor::*;
pub mod executor;
pub use network::*;
pub use render::*;
pub use watcher::*;
pub use system::*;
}
pub mod watcher {
pub mod executor;
pub mod parser;
pub mod renderer;
pub mod watcher;
pub mod executor {
pub mod executor;
pub use executor::*;
pub use executor::*;
pub use parser::*;
pub use renderer::*;
pub use watcher::*;
pub mod command;
pub use command::*;
}
pub mod renderer {
pub mod renderer;
pub use renderer::*;
pub mod pane;
pub use pane::*;
pub mod layout;
pub use layout::*;
}
pub mod watcher {
pub mod builder;
pub mod watcher;
pub use builder::*;
pub use watcher::*;
pub mod command;
pub use command::*;
}
pub mod protocol {
pub mod message;
pub use message::*;
pub mod message;
pub use message::*;
pub mod connection;
pub use connection::*;
pub mod connection;
pub use connection::*;
pub mod connector;
pub use connector::*;
pub mod connector;
pub use connector::*;
}
pub mod core {
pub mod block;
pub use block::*;
pub mod block;
pub use block::*;
pub mod blockchain;
pub use blockchain::*;
pub mod blockchain;
pub use blockchain::*;
pub mod tx;
pub use tx::*;
pub mod tx;
pub use tx::*;
pub mod data;
pub use data::*;
pub mod data;
pub use data::*;
pub mod hasher;
pub use hasher::*;
pub mod hasher;
pub use hasher::*;
}
pub mod seeds_constants;
use crate::watcher::renderer::{RenderCommand, RenderPane};
use crate::renderer::{RenderCommand, RenderTarget};
pub fn log(msg: String) {
crate::bus::publish_render_event(RenderCommand::RenderStringToPane {
pane: RenderPane::CliOutput,
str: msg,
})
crate::bus::publish_watcher_event(watcher::WatcherCommand::Render(RenderCommand::RenderStringToPaneId {
pane: RenderTarget::CliOutput,
str: msg,
}))
}

View File

@ -10,37 +10,20 @@ use clap::Parser;
#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
let args = args::CliArgs::parse();
let args = args::CliArgs::parse();
let mut watcher = Watcher::build()
.addr(args.addr)
.seed(args.seed)
.debug(args.debug)
.render(args.render)
.bootstrap(args.bootstrap)
.start()
.await;
let mut watcher = Watcher::build()
.addr(args.addr)
.seed(args.seed)
.debug(args.debug)
.render(args.render)
.bootstrap(args.bootstrap)
.start()
.await;
crossterm::execute!(
std::io::stdout(),
crossterm::event::EnableBracketedPaste,
crossterm::event::EnableFocusChange,
crossterm::event::EnableMouseCapture,
)?;
watcher.run().await?;
loop {
if !watcher.poll().await.is_ok_and(|b| b) {
break;
}
}
println!("Hello, world!");
crossterm::execute!(
std::io::stdout(),
crossterm::event::DisableBracketedPaste,
crossterm::event::DisableFocusChange,
crossterm::event::DisableMouseCapture
)?;
ratatui::restore();
println!("Hello, world!");
Ok(())
Ok(())
}

View File

@ -3,5 +3,5 @@ use thiserror::Error;
#[derive(Debug, Clone, Error)]
pub enum NetworkError {
#[error("Implement NetworkError Enum: ({})", file!())]
TODO
TODO,
}

View File

@ -1,443 +1,466 @@
use crate::bus::{publish_system_event, publish_watcher_event, SystemEvent};
use crate::core::{self, Blockchain, BlockchainError, ChainData, ValidationError};
use crate::error::print_error_chain;
use crate::bus::{SystemEvent, publish_system_event};
use crate::executor::ExecutorCommand;
use crate::log;
use crate::protocol::ProtocolMessage;
use crate::protocol::{Connector, ConnectorCommand};
use crate::seeds_constants::SEED_NODES;
use crate::watcher::executor::ExecutorCommand;
use crate::log;
use crate::watcher::{WatcherCommand, WatcherMode};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use thiserror::*;
use tokio::sync::mpsc;
use uuid::Uuid;
use vlogger::*;
use thiserror::*;
#[derive(Debug, Clone)]
pub struct TcpPeer {
pub id: Uuid,
pub addr: SocketAddr,
pub sender: tokio::sync::mpsc::Sender<ProtocolMessage>,
pub id: Uuid,
pub addr: SocketAddr,
pub sender: tokio::sync::mpsc::Sender<ProtocolMessage>,
}
impl TcpPeer {
pub fn new(
id: Uuid,
addr: SocketAddr,
sender: tokio::sync::mpsc::Sender<ProtocolMessage>,
) -> Self {
Self { id, addr, sender }
}
pub fn new(
id: Uuid,
addr: SocketAddr,
sender: tokio::sync::mpsc::Sender<ProtocolMessage>,
) -> Self {
Self { id, addr, sender }
}
}
#[allow(dead_code)]
pub struct Node {
pub tcp_connector: Option<mpsc::Sender<ConnectorCommand>>,
pub id: Uuid,
pub addr: Option<SocketAddr>,
pub tcp_peers: HashMap<Uuid, TcpPeer>,
chain: Blockchain,
listner_handle: Option<tokio::task::JoinHandle<()>>,
exec_tx: mpsc::Sender<ExecutorCommand>,
rx: mpsc::Receiver<NodeCommand>,
tx: mpsc::Sender<NodeCommand>,
pub tcp_connector: Option<mpsc::Sender<ConnectorCommand>>,
pub id: Uuid,
pub addr: Option<SocketAddr>,
pub tcp_peers: HashMap<Uuid, TcpPeer>,
chain: Blockchain,
listner_handle: Option<tokio::task::JoinHandle<()>>,
exec_tx: mpsc::Sender<ExecutorCommand>,
rx: mpsc::Receiver<NodeCommand>,
tx: mpsc::Sender<NodeCommand>,
}
#[derive(Debug, Error)]
pub enum NodeError {
#[error("Block chain error")]
ChainError(#[from] BlockchainError)
ChainError(#[from] BlockchainError),
}
#[derive(Debug, Clone)]
pub enum NodeCommand {
AddPeer(TcpPeer),
RemovePeer {
peer_id: Uuid,
},
ProcessMessage {
peer_id: Uuid,
message: ProtocolMessage,
},
ProcessChainData(ChainData),
StartListner(SocketAddr),
PingAddr(String),
PingId(String),
CreateBlock,
DisplayBlockByKey(String),
DisplayBlockByHeight(u64),
ListBlocks,
ListPeers,
ShowId,
DumpBlocks(String),
ConnectToSeeds,
ConnectTcpPeer(String),
BootStrap,
Exit,
AddPeer(TcpPeer),
RemovePeer {
peer_id: Uuid,
},
ProcessMessage {
peer_id: Uuid,
message: ProtocolMessage,
},
ProcessChainData(ChainData),
StartListner(SocketAddr),
PingAddr(String),
PingId(String),
CreateBlock,
DisplayBlockInteractive,
DisplayBlockByKey(String),
DisplayBlockByHeight(u64),
ListBlocks,
ListPeers,
ShowId,
DumpBlocks(String),
ConnectToSeeds,
ConnectTcpPeer(String),
BootStrap,
Exit,
}
impl Node {
pub fn peer_addresses(&self) -> Vec<SocketAddr> {
let mut addr: Vec<SocketAddr> = self
.tcp_peers
.iter()
.map(|p| p.1.addr.to_string().parse::<SocketAddr>().unwrap())
.collect();
if let Some(a) = self.addr {
addr.push(a.clone());
}
addr
pub fn peer_addresses(&self) -> Vec<SocketAddr> {
let mut addr: Vec<SocketAddr> = self
.tcp_peers
.iter()
.map(|p| p.1.addr.to_string().parse::<SocketAddr>().unwrap())
.collect();
if let Some(a) = self.addr {
addr.push(a.clone());
}
addr
}
pub fn list_peers(&self) -> String {
let mut ret = String::from("Peer List\n-----------\n");
for (i, p) in self.tcp_peers.iter().enumerate() {
ret.push_str(format!("Peer #{i}: {}\n", p.1.id).as_str())
}
ret
pub fn list_peers(&self) -> String {
let mut ret = String::from("Peer List\n-----------\n");
for (i, p) in self.tcp_peers.iter().enumerate() {
ret.push_str(format!("Peer #{i}: {}\n", p.1.id).as_str())
}
ret
}
pub async fn show_id(&self) {
log(msg!(DEBUG, "Node Id: {}", self.id))
pub async fn show_id(&self) {
log(msg!(DEBUG, "Node Id: {}", self.id))
}
async fn remove_tcp_peer(&mut self, peer_id: Uuid) {
log(msg!(DEBUG, "Removing Peer {peer_id}"));
self.tcp_peers.remove_entry(&peer_id);
}
async fn add_tcp_peer(&mut self, peer: TcpPeer) {
log(msg!(DEBUG, "Added Peer from address: {}", peer.addr));
self.tcp_peers.insert(peer.id, peer);
}
pub async fn new_with_id(
id: uuid::Uuid,
exec_tx: mpsc::Sender<ExecutorCommand>,
addr: Option<SocketAddr>,
chain: Blockchain,
) -> Self {
let (tx, rx) = mpsc::channel::<NodeCommand>(100);
Self {
id,
tcp_peers: HashMap::new(),
addr,
exec_tx,
chain,
listner_handle: None,
tcp_connector: None,
tx,
rx,
}
}
async fn remove_tcp_peer(&mut self, peer_id: Uuid) {
log(msg!(DEBUG, "Removing Peer {peer_id}"));
self.tcp_peers.remove_entry(&peer_id);
pub fn new(
addr: Option<SocketAddr>,
exec_tx: mpsc::Sender<ExecutorCommand>,
chain: Blockchain,
) -> Self {
let (tx, rx) = mpsc::channel::<NodeCommand>(100);
Self {
id: Uuid::new_v4(),
tcp_peers: HashMap::new(),
addr,
exec_tx,
listner_handle: None,
tcp_connector: None,
chain,
tx,
rx,
}
}
async fn add_tcp_peer(&mut self, peer: TcpPeer) {
log(msg!(DEBUG, "Added Peer from address: {}", peer.addr));
self.tcp_peers.insert(peer.id, peer);
}
fn get_blocks(&self) -> Result<Vec<Arc<core::Block>>, NodeError> {
Ok(self.chain.blocks()?)
}
pub async fn new_with_id(
id: uuid::Uuid,
exec_tx: mpsc::Sender<ExecutorCommand>,
addr: Option<SocketAddr>,
chain: Blockchain
) -> Self {
let (tx, rx) = mpsc::channel::<NodeCommand>(100);
Self {
id,
tcp_peers: HashMap::new(),
addr,
exec_tx,
chain,
listner_handle: None,
tcp_connector: None,
tx,
rx,
}
}
pub fn new(
addr: Option<SocketAddr>,
exec_tx: mpsc::Sender<ExecutorCommand>,
chain: Blockchain,
) -> Self {
let (tx, rx) = mpsc::channel::<NodeCommand>(100);
Self {
id: Uuid::new_v4(),
tcp_peers: HashMap::new(),
addr,
exec_tx,
listner_handle: None,
tcp_connector: None,
chain,
tx,
rx,
}
}
fn get_blocks(&self) -> Result<Vec<Arc<core::Block>>, NodeError> {
Ok(self.chain.blocks()?)
}
pub async fn process_message(&mut self, peer_id: uuid::Uuid, message: ProtocolMessage) {
match message {
ProtocolMessage::BootstrapRequest { .. } => {
log(msg!(DEBUG, "Received BootstrapRequest from {peer_id}"));
let peer = &self.tcp_peers[&peer_id];
let resp = ProtocolMessage::BootstrapResponse {
blocks: {
if let Ok(blocks) = self.get_blocks() {
serde_json::to_string(&blocks
.iter()
.map(|f| (**f).clone())
.collect::<Vec<core::Block>>()
).map_err(
|e| {
log(msg!(
ERROR,
"Failed to serde Chain for BootstrapResponse: {e}"
));
e
},
).ok()
} else {
None
}
}
};
peer.sender.send(resp).await.unwrap();
log(msg!(DEBUG, "Send BootstrapResponse to {peer_id}"));
}
ProtocolMessage::BootstrapResponse { blocks } => {
log(msg!(DEBUG, "Received BootstrapResponse from seed"));
self.chain = core::Blockchain::build(blocks).unwrap();
}
ProtocolMessage::Pong { peer_id } => {
log(msg!(DEBUG, "Received Pong from {peer_id}"));
}
ProtocolMessage::Ping { peer_id } => {
log(msg!(DEBUG, "Received Ping from {peer_id}"));
let resp = ProtocolMessage::Pong {
peer_id: self.id.clone(),
};
let peer = &self.tcp_peers[&peer_id];
peer.sender.send(resp).await.unwrap();
}
ProtocolMessage::GetPeersRequest { peer_id } => {
log(msg!(DEBUG, "Received GetPeersRequest from {peer_id}"));
let peers = self.peer_addresses();
let resp = ProtocolMessage::GetPeersResponse {
peer_addresses: peers,
};
let peer = &self.tcp_peers[&peer_id];
peer.sender.send(resp).await.unwrap();
}
ProtocolMessage::Block { block, .. } => {
log(msg!(DEBUG, "Received Block from {peer_id}"));
if let Err(_e) = self.chain.add_block(block.into()) {
log(msg!(DEBUG, "TODO: implement error handling in {}:{}", file!(), line!()));
}
}
ProtocolMessage::ChainData { data, .. } => {
log(msg!(DEBUG, "Received ChainData from {peer_id}"));
self.chain.apply(data).unwrap()
}
_ => {
log(msg!(DEBUG, "TODO: implement this message type"));
}
}
}
pub async fn send_message_to_peer_addr(&self, addr: SocketAddr, msg: ProtocolMessage) {
if let Some((_, peer)) = self.tcp_peers.iter().find(|(_, v)| v.addr == addr) {
if let Err(e) = peer.sender.send(msg).await {
log(msg!(ERROR, "Error Sending message to peer: {e}"));
}
log(msg!(DEBUG, "Sent BootstrapRequest to seed"));
} else {
log(msg!(
ERROR,
"Error Sending message to peer: peer not in list"
));
}
}
pub async fn send_message_to_peer_id(&self, id: Uuid, msg: ProtocolMessage) {
if let Some(peer) = self.tcp_peers.get(&id) {
if let Err(e) = peer.sender.send(msg).await {
log(msg!(ERROR, "Error Sending message to peer: {e}"));
}
}
}
async fn send_message_to_seed(&self, msg: ProtocolMessage) {
for seed in SEED_NODES.iter() {
if let Some(_) = self.tcp_peers.iter().find(|(_, v)| v.addr == *seed) {
self.send_message_to_peer_addr(*seed, msg).await;
return;
pub async fn process_message(&mut self, peer_id: uuid::Uuid, message: ProtocolMessage) {
match message {
ProtocolMessage::BootstrapRequest { .. } => {
log(msg!(DEBUG, "Received BootstrapRequest from {peer_id}"));
let peer = &self.tcp_peers[&peer_id];
let resp = ProtocolMessage::BootstrapResponse {
blocks: {
if let Ok(blocks) = self.get_blocks() {
serde_json::to_string(
&blocks
.iter()
.map(|f| (**f).clone())
.collect::<Vec<core::Block>>(),
)
.map_err(|e| {
log(msg!(
ERROR,
"Failed to serde Chain for BootstrapResponse: {e}"
));
e
})
.ok()
} else {
self.send_message_to_peer_addr(*seed, msg).await;
return;
None
}
}
log(msg!(ERROR, "No Seed Nodes Avaliable"));
}
async fn bootstrap(&mut self) -> Result<(), ValidationError> {
log(msg!(DEBUG, "Bootstrapping"));
let message = ProtocolMessage::BootstrapRequest {
peer_id: self.id,
version: "".to_string(),
},
};
self.send_message_to_seed(message).await;
Ok(())
}
async fn broadcast_network_data(&self, data: ChainData) {
for (id, peer) in &self.tcp_peers {
let message = ProtocolMessage::ChainData {
peer_id: self.id,
data: data.clone(),
};
peer.sender.send(message).await.unwrap();
log(msg!(DEBUG, "Send Transaction message to {id}"));
peer.sender.send(resp).await.unwrap();
log(msg!(DEBUG, "Send BootstrapResponse to {peer_id}"));
}
ProtocolMessage::BootstrapResponse { blocks } => {
log(msg!(DEBUG, "Received BootstrapResponse from seed"));
self.chain = core::Blockchain::build(blocks).unwrap();
}
ProtocolMessage::Pong { peer_id } => {
log(msg!(DEBUG, "Received Pong from {peer_id}"));
}
ProtocolMessage::Ping { peer_id } => {
log(msg!(DEBUG, "Received Ping from {peer_id}"));
let resp = ProtocolMessage::Pong {
peer_id: self.id.clone(),
};
let peer = &self.tcp_peers[&peer_id];
peer.sender.send(resp).await.unwrap();
}
ProtocolMessage::GetPeersRequest { peer_id } => {
log(msg!(DEBUG, "Received GetPeersRequest from {peer_id}"));
let peers = self.peer_addresses();
let resp = ProtocolMessage::GetPeersResponse {
peer_addresses: peers,
};
let peer = &self.tcp_peers[&peer_id];
peer.sender.send(resp).await.unwrap();
}
ProtocolMessage::Block { block, .. } => {
log(msg!(DEBUG, "Received Block from {peer_id}"));
if let Err(_e) = self.chain.add_block(block.into()) {
log(msg!(
DEBUG,
"TODO: implement error handling in {}:{}",
file!(),
line!()
));
}
}
ProtocolMessage::ChainData { data, .. } => {
log(msg!(DEBUG, "Received ChainData from {peer_id}"));
self.chain.apply(data).unwrap()
}
_ => {
log(msg!(DEBUG, "TODO: implement this message type"));
}
}
}
async fn broadcast_block(&self, block: &core::Block) {
for (id, peer) in &self.tcp_peers {
let message = ProtocolMessage::Block {
peer_id: self.id,
height: block.head().height as u64,
block: block.clone(),
};
peer.sender.send(message).await.unwrap();
log(msg!(DEBUG, "Send Block message to {id}"));
pub async fn send_message_to_peer_addr(&self, addr: SocketAddr, msg: ProtocolMessage) {
if let Some((_, peer)) = self.tcp_peers.iter().find(|(_, v)| v.addr == addr) {
if let Err(e) = peer.sender.send(msg).await {
log(msg!(ERROR, "Error Sending message to peer: {e}"));
}
log(msg!(DEBUG, "Sent BootstrapRequest to seed"));
} else {
log(msg!(
ERROR,
"Error Sending message to peer: peer not in list"
));
}
}
pub async fn send_message_to_peer_id(&self, id: Uuid, msg: ProtocolMessage) {
if let Some(peer) = self.tcp_peers.get(&id) {
if let Err(e) = peer.sender.send(msg).await {
log(msg!(ERROR, "Error Sending message to peer: {e}"));
}
}
}
async fn send_message_to_seed(&self, msg: ProtocolMessage) {
for seed in SEED_NODES.iter() {
if let Some(_) = self.tcp_peers.iter().find(|(_, v)| v.addr == *seed) {
self.send_message_to_peer_addr(*seed, msg).await;
return;
} else {
self.send_message_to_peer_addr(*seed, msg).await;
return;
}
}
log(msg!(ERROR, "No Seed Nodes Avaliable"));
}
async fn bootstrap(&mut self) -> Result<(), ValidationError> {
log(msg!(DEBUG, "Bootstrapping"));
let message = ProtocolMessage::BootstrapRequest {
peer_id: self.id,
version: "".to_string(),
};
self.send_message_to_seed(message).await;
Ok(())
}
async fn broadcast_network_data(&self, data: ChainData) {
for (id, peer) in &self.tcp_peers {
let message = ProtocolMessage::ChainData {
peer_id: self.id,
data: data.clone(),
};
peer.sender.send(message).await.unwrap();
log(msg!(DEBUG, "Send Transaction message to {id}"));
}
}
async fn broadcast_block(&self, block: &core::Block) {
for (id, peer) in &self.tcp_peers {
let message = ProtocolMessage::Block {
peer_id: self.id,
height: block.head().height as u64,
block: block.clone(),
};
peer.sender.send(message).await.unwrap();
log(msg!(DEBUG, "Send Block message to {id}"));
}
}
pub fn tx(&self) -> mpsc::Sender<NodeCommand> {
return self.tx.clone();
}
pub fn exec_tx(&self) -> mpsc::Sender<ExecutorCommand> {
return self.exec_tx.clone();
}
async fn network_data(&mut self, data: ChainData) {
match self.chain.apply(data) {
Ok(_) => log(msg!(DEBUG, "ChainData Applied")),
Err(e) => print_error_chain(&e.into()),
};
}
async fn connector_cmd(&self, cmd: ConnectorCommand) {
match &self.tcp_connector {
Some(t) => match t.send(cmd).await {
Ok(()) => {}
Err(e) => log(msg!(ERROR, "Failed to Send Command to connector: {}", e)),
},
None => log(msg!(ERROR, "No Connector Availiable")),
}
}
async fn start_connection_listner(&mut self, addr: SocketAddr) {
log(msg!(DEBUG, "Starting Connection Listener"));
let (con_tx, con_rx) = mpsc::channel::<ConnectorCommand>(100);
self.tcp_connector = Some(con_tx);
self.listner_handle = Some(tokio::spawn({
let mut connector = Connector::new(self.id, addr, self.exec_tx(), con_rx);
log(msg!(DEBUG, "Connector Build"));
async move { connector.start().await }
}));
}
async fn connect_to_seed(&mut self) {
self
.connector_cmd(ConnectorCommand::ConnectToTcpSeed(SEED_NODES[0]))
.await;
}
pub async fn run(&mut self) {
if let Some(addr) = self.addr {
self.start_connection_listner(addr).await;
} else {
self
.start_connection_listner(SocketAddr::new(
std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
8080,
))
.await;
};
publish_system_event(SystemEvent::NodeStarted);
while let Some(command) = self.rx.recv().await {
match command {
NodeCommand::BootStrap => {
log(msg!(DEBUG, "Received NodeCommand::BootStrap"));
let _ = self.bootstrap().await;
}
}
pub fn tx(&self) -> mpsc::Sender<NodeCommand> {
return self.tx.clone();
}
pub fn exec_tx(&self) -> mpsc::Sender<ExecutorCommand> {
return self.exec_tx.clone();
}
async fn network_data(&mut self, data: ChainData) {
match self.chain.apply(data) {
Ok(_) => log(msg!(DEBUG, "ChainData Applied")),
NodeCommand::StartListner(addr) => {
self.start_connection_listner(addr).await;
}
NodeCommand::ConnectToSeeds => {
self.connect_to_seed().await;
}
NodeCommand::ConnectTcpPeer(addr) => {
log(msg!(DEBUG, "Received ConnectToPeer: {addr}"));
if let Ok(addr_sock) = addr.parse::<SocketAddr>() {
let mes = ConnectorCommand::ConnectToTcpPeer(addr_sock);
self.connector_cmd(mes).await;
} else {
log(msg!(ERROR, "Failed to Parse to sock_addr: {addr}"));
}
}
NodeCommand::PingAddr(addr) => {
if let Ok(addr_sock) = addr.parse::<SocketAddr>() {
let mes = ProtocolMessage::Ping { peer_id: self.id };
self.send_message_to_peer_addr(addr_sock, mes).await;
} else {
log(msg!(ERROR, "Failed to Parse to sock_addr: {addr}"));
}
}
NodeCommand::PingId(id) => {
if let Ok(id) = id.parse::<Uuid>() {
let mes = ProtocolMessage::Ping { peer_id: self.id };
self.send_message_to_peer_id(id, mes).await;
} else {
log(msg!(ERROR, "Failed to Parse to sock_addr: {id}"));
}
}
NodeCommand::AddPeer(peer) => {
self.add_tcp_peer(peer).await;
}
NodeCommand::RemovePeer { peer_id } => {
self.remove_tcp_peer(peer_id).await;
}
NodeCommand::ProcessMessage { peer_id, message } => {
self.process_message(peer_id, message).await;
}
NodeCommand::ProcessChainData(data) => {
self.network_data(data.clone()).await;
self.broadcast_network_data(data).await;
}
NodeCommand::CreateBlock => {
log(msg!(DEBUG, "Received CreateBlock Command"));
if let Ok(block) = self.chain.create_block() {
log(msg!(
INFO,
"Created Block with hash {}",
block.head().block_hash()
));
self.broadcast_block(&block).await;
}
}
NodeCommand::DisplayBlockInteractive => {
let blocks = match self.chain.list_blocks() {
Ok(b) => b,
Err(e) => return print_error_chain(&e.into()),
};
let wat_cmd = WatcherCommand::SetMode(WatcherMode::Select {
content: blocks.into(),
title: "Select Block to display".to_string(),
callback: Box::new(ExecutorCommand::Node(NodeCommand::DisplayBlockByKey("".to_string()))),
index: 0
});
publish_watcher_event(wat_cmd);
}
NodeCommand::DisplayBlockByKey(key) => self.chain.display_block_by_key(key),
NodeCommand::DisplayBlockByHeight(height) => self.chain.display_block_by_height(height),
NodeCommand::ListBlocks => {
log(msg!(DEBUG, "Received DebugListBlocks command"));
match self.chain.list_blocks() {
Ok(s) => log(s.join("\n")),
Err(e) => print_error_chain(&e.into()),
};
}
async fn connector_cmd(&self, cmd: ConnectorCommand) {
match &self.tcp_connector {
Some(t) => match t.send(cmd).await {
Ok(()) => {}
Err(e) => log(msg!(ERROR, "Failed to Send Command to connector: {}", e)),
},
None => log(msg!(ERROR, "No Connector Availiable")),
}
}
}
async fn start_connection_listner(&mut self, addr: SocketAddr) {
log(msg!(DEBUG, "Starting Connection Listener"));
let (con_tx, con_rx) = mpsc::channel::<ConnectorCommand>(100);
self.tcp_connector = Some(con_tx);
self.listner_handle = Some(tokio::spawn({
let mut connector = Connector::new(self.id, addr, self.exec_tx(), con_rx);
log(msg!(DEBUG, "Connector Build"));
async move { connector.start().await }
}));
}
async fn connect_to_seed(&mut self) {
self.connector_cmd(ConnectorCommand::ConnectToTcpSeed(SEED_NODES[0]))
.await;
}
pub async fn run(&mut self) {
if let Some(addr) = self.addr {
self.start_connection_listner(addr).await;
} else {
self.start_connection_listner(SocketAddr::new(
std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
8080,
))
.await;
};
publish_system_event(SystemEvent::NodeStarted);
while let Some(command) = self.rx.recv().await {
match command {
NodeCommand::BootStrap => {
log(msg!(DEBUG, "Received NodeCommand::BootStrap"));
let _ = self.bootstrap().await;
}
NodeCommand::StartListner(addr) => {
self.start_connection_listner(addr).await;
}
NodeCommand::ConnectToSeeds => {
self.connect_to_seed().await;
}
NodeCommand::ConnectTcpPeer(addr) => {
log(msg!(DEBUG, "Received ConnectToPeer: {addr}"));
if let Ok(addr_sock) = addr.parse::<SocketAddr>() {
let mes = ConnectorCommand::ConnectToTcpPeer(addr_sock);
self.connector_cmd(mes).await;
} else {
log(msg!(ERROR, "Failed to Parse to sock_addr: {addr}"));
}
}
NodeCommand::PingAddr(addr) => {
if let Ok(addr_sock) = addr.parse::<SocketAddr>() {
let mes = ProtocolMessage::Ping { peer_id: self.id };
self.send_message_to_peer_addr(addr_sock, mes).await;
} else {
log(msg!(ERROR, "Failed to Parse to sock_addr: {addr}"));
}
}
NodeCommand::PingId(id) => {
if let Ok(id) = id.parse::<Uuid>() {
let mes = ProtocolMessage::Ping { peer_id: self.id };
self.send_message_to_peer_id(id, mes).await;
} else {
log(msg!(ERROR, "Failed to Parse to sock_addr: {id}"));
}
}
NodeCommand::AddPeer(peer) => {
self.add_tcp_peer(peer).await;
}
NodeCommand::RemovePeer { peer_id } => {
self.remove_tcp_peer(peer_id).await;
}
NodeCommand::ProcessMessage { peer_id, message } => {
self.process_message(peer_id, message).await;
}
NodeCommand::ProcessChainData(data) => {
self.network_data(data.clone()).await;
self.broadcast_network_data(data).await;
}
NodeCommand::CreateBlock => {
log(msg!(DEBUG, "Received CreateBlock Command"));
if let Ok(block) = self.chain.create_block() {
log(msg!(INFO, "Created Block with hash {}", block.head().block_hash()));
self.broadcast_block(&block).await;
}
}
NodeCommand::DisplayBlockByKey(key) => {
self.chain.display_block_by_key(key)
}
NodeCommand::DisplayBlockByHeight(height) => {
self.chain.display_block_by_height(height)
}
NodeCommand::ListBlocks => {
log(msg!(DEBUG, "Received DebugListBlocks command"));
match self.chain.list_blocks() {
Ok(s) => log(s),
Err(e) => print_error_chain(&e.into()),
}
}
NodeCommand::ListPeers => {
log(msg!(DEBUG, "Received DebugListPeers command"));
log(self.list_peers());
}
NodeCommand::ShowId => {
log(msg!(DEBUG, "Received DebugListBlocks command"));
self.show_id().await;
}
NodeCommand::DumpBlocks(s) => {
self.chain.dump_blocks(s);
}
NodeCommand::Exit => {
log(msg!(DEBUG, "Node Exit"));
break;
}
}
NodeCommand::ListPeers => {
log(msg!(DEBUG, "Received DebugListPeers command"));
log(self.list_peers());
}
NodeCommand::ShowId => {
log(msg!(DEBUG, "Received DebugListBlocks command"));
self.show_id().await;
}
NodeCommand::DumpBlocks(s) => {
self.chain.dump_blocks(s);
}
NodeCommand::Exit => {
log(msg!(DEBUG, "Node Exit"));
break;
}
}
}
}
}

View File

@ -1,9 +1,9 @@
use crate::executor::ExecutorCommand;
use crate::log;
use crate::node::node;
use crate::protocol::ProtocolMessage;
use crate::watcher::ExecutorCommand;
use tokio::net;
use tokio::sync::mpsc;
use crate::log;
use super::Connector;
@ -12,78 +12,78 @@ use vlogger::*;
#[allow(dead_code)]
#[derive(Debug)]
pub struct Connection {
node_id: uuid::Uuid,
peer_id: uuid::Uuid,
stream: net::TcpStream,
exec_tx: mpsc::Sender<ExecutorCommand>,
rx: mpsc::Receiver<ProtocolMessage>,
}
impl Connection {
pub fn new(
node_id: uuid::Uuid,
peer_id: uuid::Uuid,
stream: net::TcpStream,
exec_tx: mpsc::Sender<ExecutorCommand>,
rx: mpsc::Receiver<ProtocolMessage>,
}
impl Connection {
pub fn new(
node_id: uuid::Uuid,
peer_id: uuid::Uuid,
stream: net::TcpStream,
exec_tx: mpsc::Sender<ExecutorCommand>,
rx: mpsc::Receiver<ProtocolMessage>,
) -> Self {
Self {
node_id,
peer_id,
stream,
rx,
exec_tx,
}
) -> Self {
Self {
node_id,
peer_id,
stream,
rx,
exec_tx,
}
}
pub async fn start(mut self) {
tokio::spawn(async move {
log(msg!(DEBUG, "Started Message Handler for {}", self.peer_id));
pub async fn start(mut self) {
tokio::spawn(async move {
log(msg!(DEBUG, "Started Message Handler for {}", self.peer_id));
loop {
tokio::select! {
response_result = self.rx.recv() => {
match response_result {
Some(response) => {
if let Err(e) = Connector::send_message(&mut self.stream, &response).await {
log(msg!(ERROR, "Failed to send response to {}: {}", self.peer_id, e));
break;
}
},
None => {
log(msg!(DEBUG, "Response channel closed for {}", self.peer_id));
break;
}
}
}
message_result = Connector::receive_message(&mut self.stream) => {
match message_result {
Ok(message) => {
log(msg!(DEBUG, "Received Message from {}", self.peer_id));
let command = ExecutorCommand::Node(node::NodeCommand::ProcessMessage {
peer_id: self.peer_id,
message: message.clone()
});
if self.exec_tx.send(command).await.is_err() {
log(msg!(ERROR, "Failed to send command to main thread from {}", self.peer_id));
break;
}
},
Err(e) => {
log(msg!(WARNING, "Connection to {} closed: {}", self.peer_id, e));
let cmd = ExecutorCommand::Node(node::NodeCommand::RemovePeer {
peer_id: self.peer_id
});
self.exec_tx.send(cmd).await.unwrap();
break;
}
}
}
loop {
tokio::select! {
response_result = self.rx.recv() => {
match response_result {
Some(response) => {
if let Err(e) = Connector::send_message(&mut self.stream, &response).await {
log(msg!(ERROR, "Failed to send response to {}: {}", self.peer_id, e));
break;
}
},
None => {
log(msg!(DEBUG, "Response channel closed for {}", self.peer_id));
break;
}
}
});
}
}
message_result = Connector::receive_message(&mut self.stream) => {
match message_result {
Ok(message) => {
log(msg!(DEBUG, "Received Message from {}", self.peer_id));
let command = ExecutorCommand::Node(node::NodeCommand::ProcessMessage {
peer_id: self.peer_id,
message: message.clone()
});
if self.exec_tx.send(command).await.is_err() {
log(msg!(ERROR, "Failed to send command to main thread from {}", self.peer_id));
break;
}
},
Err(e) => {
log(msg!(WARNING, "Connection to {} closed: {}", self.peer_id, e));
let cmd = ExecutorCommand::Node(node::NodeCommand::RemovePeer {
peer_id: self.peer_id
});
self.exec_tx.send(cmd).await.unwrap();
break;
}
}
}
}
}
});
}
}

View File

@ -10,304 +10,284 @@ use crate::log;
use super::Connection;
use crate::bus::*;
use crate::node::{error, NetworkError};
use crate::executor::ExecutorCommand;
use crate::node::node;
use crate::node::{NetworkError, error};
use crate::protocol::ProtocolMessage;
use crate::watcher::ExecutorCommand;
use thiserror::*;
pub enum ConnectorCommand {
ConnectToTcpPeer(SocketAddr),
ConnectToTcpSeed(SocketAddr),
ConnectToTcpPeer(SocketAddr),
ConnectToTcpSeed(SocketAddr),
}
pub struct Connector {
node_id: uuid::Uuid,
addr: SocketAddr,
exec_tx: mpsc::Sender<ExecutorCommand>,
rx: mpsc::Receiver<ConnectorCommand>,
node_id: uuid::Uuid,
addr: SocketAddr,
exec_tx: mpsc::Sender<ExecutorCommand>,
rx: mpsc::Receiver<ConnectorCommand>,
}
#[derive(Error, Debug)]
pub enum ConnectorError {
#[error("Connection failed")]
ConnectionError(#[from] anyhow::Error),
#[error("Connection failed")]
ConnectionError(#[from] anyhow::Error),
}
const MAX_LISTNER_TRIES: usize = 5;
impl Connector {
pub fn new(
node_id: uuid::Uuid,
addr: SocketAddr,
exec_tx: mpsc::Sender<ExecutorCommand>,
rx: mpsc::Receiver<ConnectorCommand>,
) -> Self {
Self {
node_id,
addr,
exec_tx,
rx,
}
pub fn new(
node_id: uuid::Uuid,
addr: SocketAddr,
exec_tx: mpsc::Sender<ExecutorCommand>,
rx: mpsc::Receiver<ConnectorCommand>,
) -> Self {
Self {
node_id,
addr,
exec_tx,
rx,
}
}
pub async fn start(&mut self) {
let mut listner: Option<tokio::net::TcpListener> = None;
let mut listner_err = None;
for _ in 0..MAX_LISTNER_TRIES {
match tokio::net::TcpListener::bind(self.addr).await {
Ok(l) => {
log(msg!(DEBUG, "Listening on address: {}", self.addr));
listner = Some(l);
break;
}
Err(e) => {
self.addr.set_port(self.addr.port() + 1);
listner_err = Some(e);
}
};
pub async fn start(&mut self) {
let mut listner: Option<tokio::net::TcpListener> = None;
let mut listner_err = None;
for _ in 0..MAX_LISTNER_TRIES {
match tokio::net::TcpListener::bind(self.addr).await {
Ok(l) => {
log(msg!(DEBUG, "Listening on address: {}", self.addr));
listner = Some(l);
break;
}
if let Some(listener) = listner {
loop {
tokio::select! {
cmd_result = self.rx.recv() => {
match cmd_result {
Some(cmd) => {
self.execute_cmd(cmd).await;
}
None => {
log(msg!(DEBUG, "Command channel closed"));
break;
}
}
}
accept_result = listener.accept() => {
match accept_result {
Ok((stream, addr)) => {
log(msg!(DEBUG, "Accepted connection from {}", addr));
self.establish_connection_inbound(stream, addr).await;
}
Err(e) => {
log(msg!(ERROR, "Failed to accept connection: {}", e));
}
}
}
}
Err(e) => {
self.addr.set_port(self.addr.port() + 1);
listner_err = Some(e);
}
};
}
if let Some(listener) = listner {
loop {
tokio::select! {
cmd_result = self.rx.recv() => {
match cmd_result {
Some(cmd) => {
self.execute_cmd(cmd).await;
}
None => {
log(msg!(DEBUG, "Command channel closed"));
break;
}
}
} else {
log(msg!(
FATAL,
"Failed to start TCP Listener: {}",
listner_err.unwrap()
));
}
}
async fn execute_cmd(&mut self, cmd: ConnectorCommand) {
match cmd {
ConnectorCommand::ConnectToTcpPeer(addr) => self.connect_to_peer(addr).await,
ConnectorCommand::ConnectToTcpSeed(addr) => {
self.connect_to_seed(addr).await;
}
accept_result = listener.accept() => {
match accept_result {
Ok((stream, addr)) => {
log(msg!(DEBUG, "Accepted connection from {}", addr));
self.establish_connection_inbound(stream, addr).await;
}
Err(e) => {
log(msg!(ERROR, "Failed to accept connection: {}", e));
}
}
}
}
}
} else {
log(msg!(
FATAL,
"Failed to start TCP Listener: {}",
listner_err.unwrap()
));
}
}
pub async fn connect_to_seed(&self, addr: SocketAddr) {
match net::TcpStream::connect(addr)
.await
.with_context(|| format!("Connecting to {}", addr))
{
Ok(stream) => self.establish_connection_to_seed(stream, addr).await,
Err(e) => {
// let err = ConnectorError::ConnectionError(e.into());
print_error_chain(&e.into());
}
}
async fn execute_cmd(&mut self, cmd: ConnectorCommand) {
match cmd {
ConnectorCommand::ConnectToTcpPeer(addr) => self.connect_to_peer(addr).await,
ConnectorCommand::ConnectToTcpSeed(addr) => {
self.connect_to_seed(addr).await;
}
}
}
pub async fn connect_to_peer(&self, addr: SocketAddr) {
match net::TcpStream::connect(addr).await {
Ok(stream) => self.establish_connection_outbound(stream, addr).await,
Err(e) => {
let err = ConnectorError::ConnectionError(e.into());
print_error_chain(&err.into());
}
}
pub async fn connect_to_seed(&self, addr: SocketAddr) {
match net::TcpStream::connect(addr)
.await
.with_context(|| format!("Connecting to {}", addr))
{
Ok(stream) => self.establish_connection_to_seed(stream, addr).await,
Err(e) => {
// let err = ConnectorError::ConnectionError(e.into());
print_error_chain(&e.into());
}
}
}
pub async fn establish_connection_to_seed(
&self,
mut stream: tokio::net::TcpStream,
addr: SocketAddr,
) {
let handshake = ProtocolMessage::Handshake {
peer_id: self.node_id,
version: "".to_string(),
};
match Connector::send_message(&mut stream, &handshake).await {
Ok(()) => {
if let Ok(mes) = Connector::receive_message(&mut stream).await {
let (ch_tx, ch_rx) = mpsc::channel::<ProtocolMessage>(100);
let peer = match mes {
ProtocolMessage::HandshakeAck { peer_id, .. } => {
node::TcpPeer::new(peer_id, addr, ch_tx)
}
_ => {
log(msg!(
ERROR,
"Invalid Message On Connetion Establishment: {mes}"
));
return;
}
};
let cmd = ExecutorCommand::Node(node::NodeCommand::AddPeer(peer.clone()));
publish_network_event(NetworkEvent::SeedConnected(addr.to_string()));
let _ = self.exec_tx.send(cmd).await;
Connection::new(self.node_id, peer.id, stream, self.exec_tx.clone(), ch_rx)
.start()
.await;
}
}
Err(e) => print_error_chain(&e.into()),
}
pub async fn connect_to_peer(&self, addr: SocketAddr) {
match net::TcpStream::connect(addr).await {
Ok(stream) => self.establish_connection_outbound(stream, addr).await,
Err(e) => {
let err = ConnectorError::ConnectionError(e.into());
print_error_chain(&err.into());
}
}
}
async fn establish_connection_outbound(
&self,
mut stream: tokio::net::TcpStream,
addr: SocketAddr,
) {
let handshake = ProtocolMessage::Handshake {
peer_id: self.node_id,
version: "".to_string(),
};
match Connector::send_message(&mut stream, &handshake).await {
Ok(()) => {
if let Ok(mes) = Connector::receive_message(&mut stream).await {
let (ch_tx, ch_rx) = mpsc::channel::<ProtocolMessage>(100);
let peer = match mes {
ProtocolMessage::HandshakeAck { peer_id, .. } => {
node::TcpPeer::new(peer_id, addr, ch_tx)
}
_ => {
log(msg!(
ERROR,
"Invalid Message On Connetion Establishment: {mes}"
));
return;
}
};
let cmd = ExecutorCommand::Node(node::NodeCommand::AddPeer(peer.clone()));
let _ = self.exec_tx.send(cmd).await;
Connection::new(self.node_id, peer.id, stream, self.exec_tx.clone(), ch_rx)
.start()
.await;
}
}
Err(e) => print_error_chain(&e.into()),
}
}
async fn establish_connection_inbound(
&self,
mut stream: tokio::net::TcpStream,
addr: SocketAddr,
) {
pub async fn establish_connection_to_seed(
&self,
mut stream: tokio::net::TcpStream,
addr: SocketAddr,
) {
let handshake = ProtocolMessage::Handshake {
peer_id: self.node_id,
version: "".to_string(),
};
match Connector::send_message(&mut stream, &handshake).await {
Ok(()) => {
if let Ok(mes) = Connector::receive_message(&mut stream).await {
let (ch_tx, ch_rx) = mpsc::channel::<ProtocolMessage>(100);
let peer = match mes {
ProtocolMessage::Handshake { peer_id, .. } => {
let ack = ProtocolMessage::HandshakeAck {
peer_id: self.node_id,
version: "".to_string(),
};
match Connector::send_message(&mut stream, &ack).await {
Ok(()) => node::TcpPeer::new(peer_id, addr, ch_tx),
Err(e) => return print_error_chain(&e.into()),
}
}
_ => {
log(msg!(
ERROR,
"Invalid Message On Connetion Establishment: {mes}"
));
return;
}
};
let cmd = ExecutorCommand::Node(node::NodeCommand::AddPeer(peer.clone()));
let _ = self.exec_tx.send(cmd).await;
Connection::new(self.node_id, peer.id, stream, self.exec_tx.clone(), ch_rx)
.start()
.await;
let (ch_tx, ch_rx) = mpsc::channel::<ProtocolMessage>(100);
let peer = match mes {
ProtocolMessage::HandshakeAck { peer_id, .. } => {
node::TcpPeer::new(peer_id, addr, ch_tx)
}
_ => {
log(msg!(
ERROR,
"Invalid Message On Connetion Establishment: {mes}"
));
return;
}
};
let cmd = ExecutorCommand::Node(node::NodeCommand::AddPeer(peer.clone()));
publish_network_event(NetworkEvent::SeedConnected(addr.to_string()));
let _ = self.exec_tx.send(cmd).await;
Connection::new(self.node_id, peer.id, stream, self.exec_tx.clone(), ch_rx)
.start()
.await;
}
}
Err(e) => print_error_chain(&e.into()),
}
}
pub async fn send_message(
stream: &mut net::TcpStream,
message: &ProtocolMessage,
) -> Result<(), NetworkError> {
let json = serde_json::to_string(message)
.map_err(|_e| {
NetworkError::TODO
})?;
let data = json.as_bytes();
let len = data.len() as u32;
stream
.write_all(&len.to_be_bytes())
.await
.map_err(|_e| {
NetworkError::TODO
})?;
stream
.write_all(data)
.await
.map_err(|_e| {
NetworkError::TODO
})?;
stream.flush().await
.map_err(|_e| {
NetworkError::TODO
})?;
Ok(())
}
pub async fn receive_message(
stream: &mut tokio::net::TcpStream,
) -> Result<ProtocolMessage, error::NetworkError> {
let mut len_bytes = [0u8; 4];
stream
.read_exact(&mut len_bytes)
.await
.map_err(|_e| {
NetworkError::TODO
})?;
let len = u32::from_be_bytes(len_bytes) as usize;
if len >= super::message::MAX_MESSAGE_SIZE {
return Err(NetworkError::TODO);
async fn establish_connection_outbound(
&self,
mut stream: tokio::net::TcpStream,
addr: SocketAddr,
) {
let handshake = ProtocolMessage::Handshake {
peer_id: self.node_id,
version: "".to_string(),
};
match Connector::send_message(&mut stream, &handshake).await {
Ok(()) => {
if let Ok(mes) = Connector::receive_message(&mut stream).await {
let (ch_tx, ch_rx) = mpsc::channel::<ProtocolMessage>(100);
let peer = match mes {
ProtocolMessage::HandshakeAck { peer_id, .. } => {
node::TcpPeer::new(peer_id, addr, ch_tx)
}
_ => {
log(msg!(
ERROR,
"Invalid Message On Connetion Establishment: {mes}"
));
return;
}
};
let cmd = ExecutorCommand::Node(node::NodeCommand::AddPeer(peer.clone()));
let _ = self.exec_tx.send(cmd).await;
Connection::new(self.node_id, peer.id, stream, self.exec_tx.clone(), ch_rx)
.start()
.await;
}
let mut data = vec![0u8; len];
stream
.read_exact(&mut data)
.await
.map_err(|_e| {
NetworkError::TODO
})?;
let json = String::from_utf8(data)
.map_err(|_e| {
NetworkError::TODO
})?;
let message: ProtocolMessage = serde_json::from_str(&json)
.map_err(|_e| {
NetworkError::TODO
})?;
Ok(message)
}
Err(e) => print_error_chain(&e.into()),
}
}
async fn establish_connection_inbound(
&self,
mut stream: tokio::net::TcpStream,
addr: SocketAddr,
) {
if let Ok(mes) = Connector::receive_message(&mut stream).await {
let (ch_tx, ch_rx) = mpsc::channel::<ProtocolMessage>(100);
let peer = match mes {
ProtocolMessage::Handshake { peer_id, .. } => {
let ack = ProtocolMessage::HandshakeAck {
peer_id: self.node_id,
version: "".to_string(),
};
match Connector::send_message(&mut stream, &ack).await {
Ok(()) => node::TcpPeer::new(peer_id, addr, ch_tx),
Err(e) => return print_error_chain(&e.into()),
}
}
_ => {
log(msg!(
ERROR,
"Invalid Message On Connetion Establishment: {mes}"
));
return;
}
};
let cmd = ExecutorCommand::Node(node::NodeCommand::AddPeer(peer.clone()));
let _ = self.exec_tx.send(cmd).await;
Connection::new(self.node_id, peer.id, stream, self.exec_tx.clone(), ch_rx)
.start()
.await;
}
}
pub async fn send_message(
stream: &mut net::TcpStream,
message: &ProtocolMessage,
) -> Result<(), NetworkError> {
let json = serde_json::to_string(message).map_err(|_e| NetworkError::TODO)?;
let data = json.as_bytes();
let len = data.len() as u32;
stream
.write_all(&len.to_be_bytes())
.await
.map_err(|_e| NetworkError::TODO)?;
stream
.write_all(data)
.await
.map_err(|_e| NetworkError::TODO)?;
stream.flush().await.map_err(|_e| NetworkError::TODO)?;
Ok(())
}
pub async fn receive_message(
stream: &mut tokio::net::TcpStream,
) -> Result<ProtocolMessage, error::NetworkError> {
let mut len_bytes = [0u8; 4];
stream
.read_exact(&mut len_bytes)
.await
.map_err(|_e| NetworkError::TODO)?;
let len = u32::from_be_bytes(len_bytes) as usize;
if len >= super::message::MAX_MESSAGE_SIZE {
return Err(NetworkError::TODO);
}
let mut data = vec![0u8; len];
stream
.read_exact(&mut data)
.await
.map_err(|_e| NetworkError::TODO)?;
let json = String::from_utf8(data).map_err(|_e| NetworkError::TODO)?;
let message: ProtocolMessage = serde_json::from_str(&json).map_err(|_e| NetworkError::TODO)?;
Ok(message)
}
}

View File

@ -6,87 +6,91 @@ pub const MAX_MESSAGE_SIZE: usize = 1_000_000;
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub enum ProtocolMessage {
BootstrapRequest {
peer_id: uuid::Uuid,
version: String,
},
BootstrapResponse {
blocks: Option<String>,
},
GetPeersRequest {
peer_id: uuid::Uuid,
},
GetPeersResponse {
peer_addresses: Vec<SocketAddr>,
},
Handshake {
peer_id: uuid::Uuid,
version: String,
},
HandshakeAck {
peer_id: uuid::Uuid,
version: String,
},
Block {
peer_id: uuid::Uuid,
height: u64,
block: core::Block,
},
ChainData {
peer_id: uuid::Uuid,
data: ChainData,
},
Ping {
peer_id: uuid::Uuid,
},
Pong {
peer_id: uuid::Uuid,
},
Disconnect {
peer_id: uuid::Uuid,
},
BootstrapRequest {
peer_id: uuid::Uuid,
version: String,
},
BootstrapResponse {
blocks: Option<String>,
},
GetPeersRequest {
peer_id: uuid::Uuid,
},
GetPeersResponse {
peer_addresses: Vec<SocketAddr>,
},
Handshake {
peer_id: uuid::Uuid,
version: String,
},
HandshakeAck {
peer_id: uuid::Uuid,
version: String,
},
Block {
peer_id: uuid::Uuid,
height: u64,
block: core::Block,
},
ChainData {
peer_id: uuid::Uuid,
data: ChainData,
},
Ping {
peer_id: uuid::Uuid,
},
Pong {
peer_id: uuid::Uuid,
},
Disconnect {
peer_id: uuid::Uuid,
},
}
impl fmt::Display for ProtocolMessage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ProtocolMessage::BootstrapRequest { peer_id, version } => {
write!(f, "BootstrapRequest from {} (v{})", peer_id, version)
}
ProtocolMessage::BootstrapResponse { blocks } => {
write!(f, "BootstrapResponse with {:?} blocks", blocks.clone().unwrap_or_default().len())
}
ProtocolMessage::GetPeersRequest { peer_id } => {
write!(f, "GetPeersRequest from {}", peer_id)
}
ProtocolMessage::GetPeersResponse { peer_addresses } => {
write!(f, "GetPeersResponse with {} peers", peer_addresses.len())
}
ProtocolMessage::Handshake { peer_id, version } => {
write!(f, "Handshake from {} (v{})", peer_id, version)
}
ProtocolMessage::HandshakeAck { peer_id, version } => {
write!(f, "HandshakeAck from {} (v{})", peer_id, version)
}
ProtocolMessage::Block {
peer_id,
height,
block: _,
} => {
write!(f, "Block #{} from {}", height, peer_id)
}
ProtocolMessage::ChainData { peer_id, data: _ } => {
write!(f, "ChainData from {}", peer_id)
}
ProtocolMessage::Ping { peer_id } => {
write!(f, "Ping from {}", peer_id)
}
ProtocolMessage::Pong { peer_id } => {
write!(f, "Pong from {}", peer_id)
}
ProtocolMessage::Disconnect { peer_id } => {
write!(f, "Disconnect from {}", peer_id)
}
}
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ProtocolMessage::BootstrapRequest { peer_id, version } => {
write!(f, "BootstrapRequest from {} (v{})", peer_id, version)
}
ProtocolMessage::BootstrapResponse { blocks } => {
write!(
f,
"BootstrapResponse with {:?} blocks",
blocks.clone().unwrap_or_default().len()
)
}
ProtocolMessage::GetPeersRequest { peer_id } => {
write!(f, "GetPeersRequest from {}", peer_id)
}
ProtocolMessage::GetPeersResponse { peer_addresses } => {
write!(f, "GetPeersResponse with {} peers", peer_addresses.len())
}
ProtocolMessage::Handshake { peer_id, version } => {
write!(f, "Handshake from {} (v{})", peer_id, version)
}
ProtocolMessage::HandshakeAck { peer_id, version } => {
write!(f, "HandshakeAck from {} (v{})", peer_id, version)
}
ProtocolMessage::Block {
peer_id,
height,
block: _,
} => {
write!(f, "Block #{} from {}", height, peer_id)
}
ProtocolMessage::ChainData { peer_id, data: _ } => {
write!(f, "ChainData from {}", peer_id)
}
ProtocolMessage::Ping { peer_id } => {
write!(f, "Ping from {}", peer_id)
}
ProtocolMessage::Pong { peer_id } => {
write!(f, "Pong from {}", peer_id)
}
ProtocolMessage::Disconnect { peer_id } => {
write!(f, "Disconnect from {}", peer_id)
}
}
}
}

View File

@ -0,0 +1,93 @@
use ratatui::layout::{ Rect, Flex };
use ratatui::prelude::*;
use super::{Pane, RenderBuffer, RenderTarget};
#[derive(Debug, Clone, clap::ValueEnum)]
pub enum RenderLayoutKind {
#[value(name = "horizontal", aliases = ["h"])]
CliHorizontal,
#[value(name = "vertical", aliases = ["v"])]
CliVertical,
}
const CLI_INPUT_PREFIX: &str = "> ";
#[derive(Debug)]
pub struct RenderLayout {
layout: Layout,
pub panes: Vec<Pane>,
}
impl Widget for &mut RenderLayout {
fn render(self, area: Rect, buffer: &mut Buffer) {
let rects = self.rects(area);
for (p, r) in self.panes.iter_mut().zip(rects.iter()) {
p.render(*r, buffer);
}
}
}
pub fn center(area: Rect, horizontal: Constraint, vertical: Constraint) -> Rect {
let [area] = Layout::horizontal([horizontal])
.flex(Flex::Center)
.areas(area);
let [area] = Layout::vertical([vertical]).flex(Flex::Center).areas(area);
area
}
impl RenderLayout {
pub fn rects(&self, area: Rect) -> std::rc::Rc<[Rect]> {
self.layout.split(area)
}
pub fn generate(kind: RenderLayoutKind) -> RenderLayout {
match kind {
RenderLayoutKind::CliVertical => RenderLayout {
layout: Layout::default()
.constraints([Constraint::Percentage(70), Constraint::Percentage(30)]),
panes: vec![
Pane::new(
Some(" Input Pane ".to_string()),
RenderTarget::CliInput,
RenderBuffer::List {
list: vec![String::new()],
index: 0,
prefix: CLI_INPUT_PREFIX,
},
true,
),
Pane::new(
Some(" Output Pane ".to_string()),
RenderTarget::CliOutput,
RenderBuffer::String(String::new()),
false,
),
],
},
RenderLayoutKind::CliHorizontal => RenderLayout {
layout: Layout::default()
.constraints([Constraint::Percentage(70), Constraint::Percentage(30)]),
panes: vec![
Pane::new(
Some(" Output Pane ".to_string()),
RenderTarget::CliOutput,
RenderBuffer::String(String::new()),
false,
),
Pane::new(
Some(" Input Pane ".to_string()),
RenderTarget::CliInput,
RenderBuffer::List {
list: vec![String::new()],
index: 0,
prefix: CLI_INPUT_PREFIX,
},
true,
),
],
},
}
}
}

172
node/src/renderer/pane.rs Normal file
View File

@ -0,0 +1,172 @@
use std::sync::Arc;
use ratatui::prelude::*;
use ratatui::widgets::{Clear, Wrap};
use ratatui::{
buffer::Buffer,
layout::Rect,
symbols::border,
widgets::{Block, List, Paragraph, Widget},
};
use vlogger::{msg, DEBUG};
use crate::log;
use super::center;
#[derive(Clone, Debug)]
pub enum RenderBuffer {
List {
list: Vec<String>,
index: usize,
prefix: &'static str,
},
String(String),
Select(Arc<Vec<String>>, usize),
}
#[derive(Debug, PartialEq, Clone)]
pub enum RenderTarget {
All,
CliInput,
CliOutput,
PopUp
}
#[derive(Debug)]
pub struct Pane {
pub title: Option<String>,
pub target: RenderTarget,
pub buffer: RenderBuffer,
pub focused: bool,
pub scroll: i16,
pub max_scroll: i16,
}
impl Pane {
pub fn new(
title: Option<String>,
target: RenderTarget,
buffer: RenderBuffer,
focused: bool,
) -> Self {
Self {
title,
target,
buffer,
focused,
scroll: 0,
max_scroll: 0,
}
}
}
impl Widget for &mut Pane {
fn render(self, area: Rect, buf: &mut Buffer) {
let block = Block::bordered()
.title({
if let Some(t) = &self.title {
t.clone()
} else {
Default::default()
}
})
.border_set(border::PLAIN)
.border_style({
if self.focused {
Style::new().green()
} else {
Style::new().white()
}
});
let inner_area = block.inner(area);
let content_width = inner_area.width as usize;
let content_height = inner_area.height as usize;
match &self.buffer {
RenderBuffer::String(s) => {
let wrapped_lines = s
.lines()
.map(|line| {
if line.is_empty() {
1
} else {
(line.len() + content_width - 1) / { content_width + (content_width == 0) as usize }
}
})
.sum::<usize>();
self.max_scroll = if wrapped_lines > content_height {
(wrapped_lines - content_height) as i16
} else {
0
};
let scroll_offset = self.max_scroll.saturating_sub(self.scroll) as u16;
Paragraph::new(s.clone())
.wrap(Wrap::default())
.left_aligned()
.block(block)
.scroll((scroll_offset as u16, 0))
.render(area, buf);
}
RenderBuffer::Select(list, idx) => {
let rect = center(area, Constraint::Percentage(60), Constraint::Percentage(60));
Clear.render(rect, buf);
self.max_scroll = if list.len() > content_height {
(list.len() - content_height) as i16
} else {
0
};
let scroll_offset = self.max_scroll.saturating_sub(self.scroll) as u16;
log(msg!(DEBUG, "idx {idx}"));
let list_w = List::new(
list
.iter()
.skip(scroll_offset as usize)
.take(content_height)
.enumerate()
.map(|(i, s)| {
Line::from(format!(
"{}{}",
"",
textwrap::fill(s, content_width.saturating_sub(2))
)).style(if i + scroll_offset as usize == *idx {
Style::new().fg(Color::Blue).bg(Color::Green)
} else {
Style::default()
})
}),
)
.block(block);
Widget::render(list_w, rect, buf);
}
RenderBuffer::List { list, prefix, .. } => {
self.max_scroll = if list.len() > content_height {
(list.len() - content_height) as i16
} else {
0
};
let scroll_offset = self.max_scroll.saturating_sub(self.scroll) as u16;
let list_w = List::new(
list
.iter()
.skip(scroll_offset as usize)
.take(content_height)
.map(|s| {
format!(
"{}{}",
prefix,
textwrap::fill(s, content_width.saturating_sub(2))
)
}),
)
.block(block);
Widget::render(list_w, area, buf);
}
}
}
}

View File

@ -0,0 +1,307 @@
use std::sync::Arc;
use crossterm::event::KeyCode;
use ratatui::{Frame, buffer::Buffer, layout::Rect, widgets::Widget};
use vlogger::*;
use tokio::time::{Duration, timeout};
use crate::log;
use super::*;
#[derive(Debug, Clone)]
pub enum InputMode {
Input,
PopUp(Arc<Vec<String>>, String, usize),
}
#[derive(Debug)]
pub struct Renderer {
buffer: String,
exit: bool,
layout: RenderLayout,
mode: InputMode,
}
#[derive(Clone, Debug)]
pub enum RenderCommand {
RenderStringToPaneId {
str: String,
pane: RenderTarget,
},
RenderStringToPaneFocused {
str: String,
},
RenderKeyInput(KeyCode),
ListMove {
pane: RenderTarget,
index: usize,
},
ChangeLayout(RenderLayoutKind),
ClearPane,
/// Mouse Events
MouseClickLeft(u16, u16),
MouseScrollUp,
MouseScrollDown,
SetMode(InputMode),
Exit,
}
#[allow(dead_code)]
impl Renderer {
pub fn new(layout: RenderLayoutKind) -> Self {
Self {
buffer: String::new(),
exit: false,
layout: RenderLayout::generate(layout),
mode: InputMode::Input,
}
}
fn log(&mut self, msg: String) {
self.buffer.push_str(&msg)
}
pub fn draw(&mut self, frame: &mut Frame) {
frame.render_widget(self, frame.area());
}
fn exit(&mut self) {
log!(DEBUG, "Renderer Exit");
self.exit = true;
}
fn buffer_extend<S: AsRef<str>>(&mut self, input: S) {
self.buffer.push_str(input.as_ref());
}
fn input_pane(&mut self) -> Option<&mut Pane> {
self
.layout
.panes
.iter_mut()
.find(|p| p.target == RenderTarget::CliInput)
}
fn get_pane(&mut self, pane: RenderTarget) -> Option<&mut Pane> {
self.layout.panes.iter_mut().find(|p| p.target == pane)
}
fn focused(&mut self) -> Option<&mut Pane> {
self.layout.panes.iter_mut().find(|p| p.focused == true)
}
pub fn rects(&self, area: Rect) -> std::rc::Rc<[Rect]> {
self.layout.rects(area)
}
pub fn handle_mouse_click_left(&mut self, x: u16, y: u16, rects: std::rc::Rc<[Rect]>) {
for (i, r) in rects.iter().enumerate() {
if r.contains(ratatui::layout::Position { x, y }) {
self.layout.panes[i].focused = true;
} else {
self.layout.panes[i].focused = false;
}
}
}
pub fn handle_scroll_up(&mut self) {
if let Some(p) = self.focused() {
if p.scroll < p.max_scroll {
p.scroll += 1;
}
}
}
pub fn handle_scroll_down(&mut self) {
if let Some(p) = self.focused() {
if p.scroll > i16::MIN {
p.scroll -= 1;
}
}
}
pub fn handle_char_input(&mut self, c: char) {
if let Some(p) = self.input_pane() {
if let RenderBuffer::List { list, index, .. } = &mut p.buffer {
list[*index].push(c);
}
}
}
pub fn handle_backspace(&mut self) {
if let Some(p) = self.input_pane() {
if let RenderBuffer::List { list, index, .. } = &mut p.buffer {
list[*index].pop();
}
}
}
pub fn handle_enter(&mut self) {
if let Some(p) = self.input_pane() {
if let RenderBuffer::List { list, index, .. } = &mut p.buffer {
list.push(String::new());
*index += 1;
}
}
}
pub fn handle_arrow_key(&mut self, key: KeyCode) {
match &mut self.mode {
InputMode::Input => {}
InputMode::PopUp(content, .., idx) => {
log(msg!(DEBUG, "Received keycode: {key}"));
log(msg!(DEBUG, "idx before: {idx}"));
match key {
KeyCode::Up => { *idx = idx.saturating_sub(1) }
KeyCode::Down => {
if *idx < content.len().saturating_sub(1) {
*idx += 1;
}
}
_ => {}
}
log(msg!(DEBUG, "idx after: {idx}"))
}
}
if let Some(pane) = self.focused() {
match &pane.target {
RenderTarget::CliInput => {}
RenderTarget::CliOutput => {}
_ => {}
}
}
}
pub fn list_move(&mut self, pane: RenderTarget, index: usize) {
if let Some(p) = self.get_pane(pane) {
if let RenderBuffer::List {
list, index: idx, ..
} = &mut p.buffer
{
if index > 0 && index < list.len() {
list[*idx] = list[index].clone();
}
}
}
}
pub fn render_string_to_focused(&mut self, str: String) {
if let Some(p) = self.focused() {
match &mut p.buffer {
RenderBuffer::List { list, index, .. } => {
list.push(str);
*index += 1;
}
RenderBuffer::String(s) => s.push_str(&str),
_ => {}
}
}
}
pub fn render_string_to_id(&mut self, str: String, pane: RenderTarget) {
if let Some(p) = self.layout.panes.iter_mut().find(|p| p.target == pane) {
match &mut p.buffer {
RenderBuffer::List { list, index, .. } => {
list.push(str);
*index += 1;
}
RenderBuffer::String(s) => s.push_str(&str),
_ => {}
}
}
}
pub fn clear_pane(&mut self, pane: RenderTarget) {
if matches!(pane, RenderTarget::All) {
for p in self.layout.panes.iter_mut() {
match &mut p.buffer {
RenderBuffer::List { list, index, .. } => {
list.clear();
*index = 0;
list.push(String::new());
}
RenderBuffer::String(s) => s.clear(),
_ => {}
}
}
} else if let Some(p) = self.layout.panes.iter_mut().find(|p| p.target == pane) {
match &mut p.buffer {
RenderBuffer::List { list, index, .. } => {
list.clear();
*index = 0;
list.push(String::new());
}
RenderBuffer::String(s) => s.clear(),
_ => {}
}
}
}
pub fn apply(&mut self, mes: RenderCommand, area: Rect) {
let rects = self.layout.rects(area);
match mes {
RenderCommand::MouseClickLeft(x, y) => self.handle_mouse_click_left(x, y, rects),
RenderCommand::MouseScrollUp => self.handle_scroll_up(),
RenderCommand::MouseScrollDown => self.handle_scroll_down(),
RenderCommand::RenderKeyInput(k) => match k {
KeyCode::Char(c) => self.handle_char_input(c),
KeyCode::Backspace => self.handle_backspace(),
KeyCode::Enter => self.handle_enter(),
KeyCode::Up | KeyCode::Down | KeyCode::Left | KeyCode::Right => self.handle_arrow_key(k),
_ => {}
},
RenderCommand::ListMove { pane, index } => self.list_move(pane, index),
RenderCommand::RenderStringToPaneFocused { str } => self.render_string_to_focused(str),
RenderCommand::RenderStringToPaneId { str, pane } => self.render_string_to_id(str, pane),
RenderCommand::Exit => self.exit(),
RenderCommand::ChangeLayout(l) => self.layout = RenderLayout::generate(l),
RenderCommand::ClearPane => self.clear_pane(RenderTarget::All),
RenderCommand::SetMode(mode) => {
match &mode {
InputMode::Input => {
if let InputMode::PopUp(..) = self.mode {
self.layout.panes.pop();
}
}
InputMode::PopUp(content, title, ..) => {
let pane = Pane::new(
Some(title.to_string()),
RenderTarget::PopUp,
RenderBuffer::Select(content.clone(), 0),
true,
);
self.layout.panes.push(pane);
}
}
self.mode = mode
},
}
}
async fn listen(
&mut self,
rx: &mut tokio::sync::broadcast::Receiver<RenderCommand>,
) -> Result<RenderCommand, ()> {
if let Ok(Ok(mes)) = timeout(Duration::from_millis(400), rx.recv()).await {
return Ok(mes);
}
Err(())
}
}
impl Widget for &mut Renderer {
fn render(self, area: Rect, buf: &mut Buffer) {
let rects = self.layout.rects(area);
for (i, p) in self.layout.panes.iter_mut().enumerate() {
if p.target == RenderTarget::PopUp {
p.render(area, buf);
} else {
p.render(rects[i], buf)
}
}
}
}

View File

@ -2,9 +2,9 @@ use once_cell::sync::Lazy;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
pub static SEED_NODES: Lazy<[SocketAddr; 3]> = Lazy::new(|| {
[
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8333),
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 3000),
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 5432),
]
[
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8333),
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 3000),
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 5432),
]
});

150
node/src/watcher/builder.rs Normal file
View File

@ -0,0 +1,150 @@
use std::net::SocketAddr;
use tokio::sync::mpsc;
use vlogger::*;
use crate::bus::{NetworkEvent, SystemEvent, subscribe_system_event};
use crate::core;
use crate::executor::{Executor, ExecutorCommand};
use crate::log;
use crate::node::{Node, NodeCommand};
use crate::renderer::{RenderLayoutKind, Renderer};
use crate::watcher::Watcher;
#[derive(Default)]
pub struct WatcherBuilder {
addr: Option<SocketAddr>,
database: Option<String>,
bootstrap: bool,
debug: bool,
seed: bool,
render: bool,
}
impl WatcherBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn addr(mut self, addr: Option<SocketAddr>) -> Self {
self.addr = addr;
self
}
pub fn database(mut self, database: Option<String>) -> Self {
self.database = database;
self
}
pub fn debug(mut self, debug: bool) -> Self {
self.debug = debug;
self
}
pub fn bootstrap(mut self, bootstrap: bool) -> Self {
self.bootstrap = bootstrap;
self
}
pub fn render(mut self, render: bool) -> Self {
self.render = render;
self
}
pub fn seed(mut self, seed: bool) -> Self {
self.seed = seed;
self
}
pub async fn start(mut self) -> Watcher {
let (exec_tx, exec_rx) = mpsc::channel::<ExecutorCommand>(100);
let mut sys_event = subscribe_system_event();
if self.debug {
Watcher::log_memory().await;
}
let renderer = Renderer::new(RenderLayoutKind::CliHorizontal);
log(msg!(DEBUG, "Database Location: {:?}", self.database));
if self.seed {
self.addr = Some(crate::seeds_constants::SEED_NODES[0]);
}
let chain = core::Blockchain::build(None).unwrap();
let mut node = Node::new(self.addr.clone(), exec_tx.clone(), chain);
log(msg!(INFO, "Built Node"));
let executor_handle = tokio::spawn({
let node_tx = node.tx();
async move {
let _ = Executor::new(node_tx, exec_rx).run().await;
}
});
for i in 0..3 {
if let Ok(ev) = sys_event.recv().await {
match ev {
SystemEvent::ExecutorStarted => {
log(msg!(INFO, "Executor Started"));
break;
}
_ => log(msg!(WARNING, "Wrong Event: {ev:?}! Retrying... (try {i})")),
}
}
}
let node_tx = node.tx();
let node_handle = tokio::spawn({
async move {
node.run().await;
}
});
for i in 0..3 {
if let Ok(ev) = sys_event.recv().await {
match ev {
SystemEvent::NodeStarted => {
log(msg!(INFO, "Executor Started"));
break;
}
_ => log(msg!(WARNING, "Wrong Event: {ev:?}! Retrying... (try {i})")),
}
}
}
if self.bootstrap {
let exec_tx = exec_tx.clone();
tokio::spawn(async move {
let seed_cmd = ExecutorCommand::Node(NodeCommand::ConnectToSeeds);
let mut ev_rx = crate::bus::subscribe_network_event();
let _ = exec_tx.send(seed_cmd).await;
while let Ok(e) = ev_rx.recv().await {
match e {
NetworkEvent::SeedConnected(_) => {
let bootstrap_cmd = ExecutorCommand::Node(NodeCommand::BootStrap);
let _ = exec_tx.send(bootstrap_cmd).await;
}
_ => {}
}
}
});
}
let cmd_history = Vec::new();
let history_index = 0;
let cmd_buffer = String::new();
let handles = vec![executor_handle, node_handle];
Watcher::new(
node_tx,
exec_tx,
cmd_buffer,
cmd_history,
history_index,
handles,
renderer,
)
}
}

View File

@ -0,0 +1,20 @@
use std::sync::Arc;
use crate::{executor::ExecutorCommand, renderer::RenderCommand};
#[derive(Debug, Clone)]
pub enum WatcherCommand {
Render(RenderCommand),
SetMode(WatcherMode),
}
#[derive(Debug, Clone)]
pub enum WatcherMode {
Input,
Select{
content: Arc<Vec<String>>,
title: String,
callback: Box<ExecutorCommand>,
index: usize,
},
}

View File

@ -1,101 +0,0 @@
use crate::{
bus::{SystemEvent, publish_render_event, publish_system_event},
log,
node::node::NodeCommand,
watcher::renderer::*,
};
use thiserror::Error;
use tokio::sync::mpsc;
use vlogger::*;
use super::RenderCommand;
#[derive(Debug, Error)]
pub enum InProcessError {
#[error("TODO: {0}")]
TODO(String),
}
#[derive(Clone, Debug)]
pub enum ExecutorCommand {
NodeResponse(String),
Echo(Vec<String>),
Print(String),
InvalidCommand(String),
Node(NodeCommand),
Render(RenderCommand),
Exit,
}
pub struct Executor {
node_tx: mpsc::Sender<NodeCommand>,
rx: mpsc::Receiver<ExecutorCommand>,
exit: bool,
}
impl Executor {
pub fn new(node_tx: mpsc::Sender<NodeCommand>, rx: mpsc::Receiver<ExecutorCommand>) -> Self {
Self {
node_tx,
rx,
exit: false,
}
}
pub async fn run(&mut self) {
publish_system_event(SystemEvent::ExecutorStarted);
while !self.exit {
self.listen().await;
}
}
async fn exit(&mut self) {
log(msg!(DEBUG, "Executor Exit"));
self.exit = true
}
async fn listen(&mut self) {
if let Some(cmd) = self.rx.recv().await {
let _ = self.execute(cmd).await;
}
}
async fn send_node_cmd(&self, cmd: NodeCommand) {
self.node_tx.send(cmd).await.unwrap()
}
async fn handle_node_cmd(&self, cmd: NodeCommand) {
self.send_node_cmd(cmd).await;
}
async fn echo(&self, s: Vec<String>) {
let mut str = s.join(" ");
str.push_str("\n");
let rd_cmd = RenderCommand::RenderStringToPane {
str,
pane: RenderPane::CliOutput,
};
publish_render_event(rd_cmd);
}
async fn invalid_command(&self, str: String) {
let rd_cmd = RenderCommand::RenderStringToPane {
str,
pane: RenderPane::CliOutput,
};
publish_render_event(rd_cmd);
}
async fn execute(&mut self, cmd: ExecutorCommand) {
match cmd {
ExecutorCommand::NodeResponse(resp) => log(resp),
ExecutorCommand::Node(n) => self.handle_node_cmd(n).await,
ExecutorCommand::Render(p) => publish_render_event(p),
ExecutorCommand::Echo(s) => self.echo(s).await,
ExecutorCommand::Print(s) => log(s),
ExecutorCommand::InvalidCommand(str) => self.invalid_command(str).await,
ExecutorCommand::Exit => self.exit().await,
}
}
}

View File

@ -1,63 +0,0 @@
use crate::cli::cli;
use crate::watcher::executor::ExecutorCommand;
use vlogger::*;
use tokio::time::{Duration, timeout};
use tokio::sync::mpsc;
#[derive(Debug)]
pub struct Parser {
rx: mpsc::Receiver<ParserCommand>,
exec_tx: mpsc::Sender<ExecutorCommand>,
exit: bool,
}
pub enum ParserCommand {
ParseCmdString(String),
Exit,
}
impl Parser {
pub fn new(rx: mpsc::Receiver<ParserCommand>, exec_tx: mpsc::Sender<ExecutorCommand>) -> Self {
Self {
rx,
exec_tx,
exit: false,
}
}
async fn exit(&mut self) {
self.log(msg!(DEBUG, "Parser Exit")).await;
self.exit = true;
}
pub async fn run(&mut self) {
self.log(msg!(INFO, "Started Parser")).await;
while !self.exit {
self.listen().await;
}
}
async fn log(&self, msg: String) {
if let Err(e) = self.exec_tx.send(ExecutorCommand::Print(msg)).await {
log!(ERROR, "Error response from exec: {e}");
}
}
async fn listen(&mut self) {
if let Ok(Some(mes)) = timeout(Duration::from_millis(400), self.rx.recv()).await {
match mes {
ParserCommand::ParseCmdString(s) => {
let argv: Vec<&str> =
std::iter::once(" ").chain(s.split_whitespace()).collect();
let cmd = cli(&argv);
let _ = self.exec_tx.send(cmd).await;
}
ParserCommand::Exit => {
self.exit().await;
}
}
}
}
}

View File

@ -1,405 +0,0 @@
use crossterm::event::KeyCode;
use ratatui::prelude::*;
use ratatui::widgets::Wrap;
use ratatui::{
Frame,
buffer::Buffer,
layout::Rect,
symbols::border,
widgets::{Block, List, Paragraph, Widget},
};
use vlogger::*;
use crate::bus::{SystemEvent, publish_system_event, subscribe_render_event};
use std::io;
use tokio::time::{Duration, interval, timeout};
#[derive(Debug)]
pub struct Renderer {
buffer: String,
exit: bool,
layout: RenderLayout,
}
#[derive(Debug)]
pub struct Pane {
title: Option<String>,
target: RenderPane,
buffer: RenderBuffer,
focused: bool,
scroll: i16,
max_scroll: i16,
}
#[derive(Debug, PartialEq, Clone, clap::ValueEnum)]
pub enum RenderPane {
#[value(name = "all", aliases = ["a"])]
All,
#[value(aliases = ["i", "in"])]
CliInput,
#[value(aliases = ["o", "out"])]
CliOutput,
}
#[derive(Clone, Debug)]
enum RenderBuffer {
List { list: Vec<String>, index: usize },
String(String),
}
impl Pane {
fn render(&mut self, area: Rect, buf: &mut Buffer) {
let block = Block::bordered()
.title({
if let Some(t) = &self.title {
t.clone()
} else {
Default::default()
}
})
.border_set(border::PLAIN)
.border_style({
if self.focused {
Style::new().green()
} else {
Style::new().white()
}
});
let inner_area = block.inner(area);
let content_width = inner_area.width as usize;
let content_height = inner_area.height as usize;
match &self.buffer {
RenderBuffer::String(s) => {
let wrapped_lines = s
.lines()
.map(|line| {
if line.is_empty() {
1
} else {
(line.len() + content_width - 1) / {
content_width + (content_width == 0) as usize
}
}
})
.sum::<usize>();
self.max_scroll = if wrapped_lines > content_height {
(wrapped_lines - content_height) as i16
} else {
0
};
let scroll_offset = self.max_scroll.saturating_sub(self.scroll) as u16;
Paragraph::new(s.clone())
.wrap(Wrap::default())
.left_aligned()
.block(block)
.scroll((scroll_offset as u16, 0))
.render(area, buf);
}
RenderBuffer::List { list, .. } => {
let list_w =
List::new(list.iter().map(|s| {
format!("> {}", textwrap::fill(s, content_width.saturating_sub(2)))
}))
.block(block);
Widget::render(list_w, area, buf);
}
}
}
}
#[derive(Clone, Debug)]
pub enum RenderCommand {
RenderStringToPane {
str: String,
pane: RenderPane,
},
RenderInput(KeyCode),
ListMove {
pane: RenderPane,
index: usize,
},
ChangeLayout(RenderLayoutKind),
ClearPane(RenderPane),
/// Mouse Events
MouseClickLeft(u16, u16),
MouseScrollUp,
MouseScrollDown,
Exit,
}
#[derive(Debug, Clone, clap::ValueEnum)]
pub enum RenderLayoutKind {
#[value(name = "horizontal", aliases = ["h"])]
CliHorizontal,
#[value(name = "vertical", aliases = ["v"])]
CliVertical,
}
#[derive(Debug)]
pub struct RenderLayout {
kind: RenderLayoutKind,
panes: Vec<Pane>,
}
impl RenderLayoutKind {
pub fn rects(&self, area: Rect) -> std::rc::Rc<[Rect]> {
match self {
Self::CliHorizontal => Layout::default()
.direction(Direction::Vertical)
.constraints(vec![Constraint::Percentage(70), Constraint::Percentage(30)])
.split(area),
Self::CliVertical => Layout::default()
.direction(Direction::Horizontal)
.constraints(vec![Constraint::Percentage(30), Constraint::Percentage(70)])
.split(area),
}
}
pub fn generate(&self) -> RenderLayout {
match self {
RenderLayoutKind::CliVertical => RenderLayout {
kind: self.clone(),
panes: vec![
Pane {
title: Some(" Input Pane ".to_string()),
target: RenderPane::CliInput,
buffer: RenderBuffer::List {
list: vec![String::new()],
index: 0,
},
focused: true,
scroll: 0,
max_scroll: 0,
},
Pane {
title: Some(" Output Pane ".to_string()),
target: RenderPane::CliOutput,
buffer: RenderBuffer::String(String::new()),
focused: false,
scroll: 0,
max_scroll: 0,
},
],
},
RenderLayoutKind::CliHorizontal => RenderLayout {
kind: self.clone(),
panes: vec![
Pane {
title: Some(" Output Pane ".to_string()),
target: RenderPane::CliOutput,
buffer: RenderBuffer::String(String::new()),
focused: false,
scroll: 0,
max_scroll: 0,
},
Pane {
title: Some(" Input Pane ".to_string()),
target: RenderPane::CliInput,
buffer: RenderBuffer::List {
list: vec![String::new()],
index: 0,
},
focused: true,
scroll: 0,
max_scroll: 0,
},
],
},
}
}
}
#[allow(dead_code)]
impl Renderer {
pub fn new(layout: RenderLayoutKind) -> Self {
Self {
buffer: String::new(),
exit: false,
layout: layout.generate(),
}
}
pub async fn run(&mut self) -> io::Result<()> {
self.log(msg!(INFO, "Started Renderer"));
let mut rx = subscribe_render_event();
let mut terminal = ratatui::init();
publish_system_event(SystemEvent::RendererStarted);
let mut render_interval = interval(Duration::from_millis(32)); // 60 FPS
while !self.exit {
tokio::select! {
_ = render_interval.tick() => {
terminal.draw(|frame| self.draw(frame))?;
}
mes = rx.recv() => {
if let Ok(mes) = mes {
let frame = terminal.get_frame();
let rects = self.layout.kind.rects(frame.area());
self.apply(mes, rects);
}
}
}
}
ratatui::restore();
Ok(())
}
fn log(&mut self, msg: String) {
self.buffer.push_str(&msg)
}
pub fn draw(&mut self, frame: &mut Frame) {
frame.render_widget(self, frame.area());
}
fn exit(&mut self) {
log!(DEBUG, "Renderer Exit");
self.exit = true;
}
fn buffer_extend<S: AsRef<str>>(&mut self, input: S) {
self.buffer.push_str(input.as_ref());
}
fn input_pane(&mut self) -> Option<&mut Pane> {
self.layout
.panes
.iter_mut()
.find(|p| p.target == RenderPane::CliInput)
}
fn focused(&mut self) -> Option<&mut Pane> {
self.layout.panes.iter_mut().find(|p| p.focused == true)
}
fn handle_mouse_click_left(&mut self, x: u16, y: u16, rects: std::rc::Rc<[Rect]>) {
for (i, r) in rects.iter().enumerate() {
if r.contains(layout::Position { x, y }) {
self.layout.panes[i].focused = true;
} else {
self.layout.panes[i].focused = false;
}
}
}
fn apply(&mut self, mes: RenderCommand, rects: std::rc::Rc<[Rect]>) {
match mes {
RenderCommand::MouseClickLeft(x, y) => {
self.handle_mouse_click_left(x, y, rects);
}
RenderCommand::MouseScrollUp => {
if let Some(p) = self.focused() {
if p.scroll < p.max_scroll {
p.scroll += 1;
}
}
}
RenderCommand::MouseScrollDown => {
if let Some(p) = self.focused() {
if p.scroll > i16::MIN {
p.scroll -= 1;
}
}
}
RenderCommand::RenderInput(k) => {
if let Some(p) = self.layout.panes.iter_mut().find(|p| p.focused) {
match k {
KeyCode::Char(c) => {
if let RenderBuffer::List { list, index } = &mut p.buffer {
list[*index].push(c);
}
}
KeyCode::Backspace => {
if let RenderBuffer::List { list, index } = &mut p.buffer {
list[*index].pop();
}
}
KeyCode::Enter => {
if let RenderBuffer::List { list, index } = &mut p.buffer {
list.push(String::new());
*index += 1;
}
}
_ => {}
}
}
}
RenderCommand::ListMove { pane, index } => {
if let Some(p) = self.layout.panes.iter_mut().find(|p| p.target == pane) {
if let RenderBuffer::List { list, index: idx } = &mut p.buffer {
if index > 0 && index < list.len() {
list[*idx] = list[index].clone();
}
}
}
}
RenderCommand::RenderStringToPane { str, pane } => {
if let Some(p) = self.layout.panes.iter_mut().find(|p| p.target == pane) {
match &mut p.buffer {
RenderBuffer::List { list, index } => {
list.push(str);
*index += 1;
}
RenderBuffer::String(s) => s.push_str(&str),
}
}
}
RenderCommand::Exit => {
self.exit();
}
RenderCommand::ChangeLayout(l) => {
self.layout = l.generate();
}
RenderCommand::ClearPane(pane) => {
if matches!(pane, RenderPane::All) {
for p in self.layout.panes.iter_mut() {
match &mut p.buffer {
RenderBuffer::List { list, index } => {
list.clear();
*index = 0;
list.push(String::new());
}
RenderBuffer::String(s) => s.clear(),
}
}
} else if let Some(p) = self.layout.panes.iter_mut().find(|p| p.target == pane) {
match &mut p.buffer {
RenderBuffer::List { list, index } => {
list.clear();
*index = 0;
list.push(String::new());
}
RenderBuffer::String(s) => s.clear(),
}
}
}
}
}
async fn listen(
&mut self,
rx: &mut tokio::sync::broadcast::Receiver<RenderCommand>,
) -> Result<RenderCommand, ()> {
if let Ok(Ok(mes)) = timeout(Duration::from_millis(400), rx.recv()).await {
return Ok(mes);
}
Err(())
}
}
impl Widget for &mut Renderer {
fn render(self, area: Rect, buf: &mut Buffer) {
let rects = self.layout.kind.rects(area);
for (i, p) in self.layout.panes.iter_mut().enumerate() {
p.render(rects[i], buf)
}
}
}

View File

@ -1,330 +1,294 @@
use crossterm::event::{self, Event, KeyCode, KeyEventKind, MouseButton, MouseEventKind};
use crate::{cli::cli, error::print_error_chain, node::node::NodeCommand, watcher::WatcherMode};
use crossterm::event::{Event, EventStream, KeyCode, KeyEventKind, MouseButton, MouseEventKind};
use futures::StreamExt;
use memory_stats::memory_stats;
use std::{
io::{self, Write},
net::SocketAddr,
time::Duration,
};
use tokio::sync::mpsc;
use crate::{
bus::{subscribe_system_event, NetworkEvent, SystemEvent}, core, node::node::{Node, NodeCommand}
use ratatui::{layout::Rect, Terminal};
use std::io::{self, Stdout, Write};
use tokio::{
select,
sync::mpsc,
time::{Duration, interval},
};
use vlogger::*;
use super::*;
use super::{ WatcherBuilder, WatcherCommand };
use crate::bus::{ publish_render_event };
use crate::bus::subscribe_watcher_event;
use crate::executor::*;
use crate::log;
use crate::renderer::*;
#[allow(dead_code)]
pub struct Watcher {
parser_tx: mpsc::Sender<ParserCommand>,
node_tx: mpsc::Sender<NodeCommand>,
exec_tx: mpsc::Sender<ExecutorCommand>,
cmd_buffer: String,
cmd_history: Vec<String>,
history_index: usize,
handles: Vec<tokio::task::JoinHandle<()>>,
event_stream: crossterm::event::EventStream,
mode: WatcherMode,
pub renderer: Renderer,
}
impl Watcher {
pub fn new(
node_tx: mpsc::Sender<NodeCommand>,
exec_tx: mpsc::Sender<ExecutorCommand>,
cmd_buffer: String,
cmd_history: Vec<String>,
history_index: usize,
handles: Vec<tokio::task::JoinHandle<()>>,
}
impl Watcher {
pub fn build() -> WatcherBuilder {
WatcherBuilder::new()
renderer: Renderer,
) -> Self {
Self {
node_tx,
exec_tx,
cmd_buffer,
cmd_history,
history_index,
handles,
renderer,
mode: WatcherMode::Input,
event_stream: EventStream::new(),
}
}
pub fn parser_tx(&self) -> mpsc::Sender<ParserCommand> {
self.parser_tx.clone()
fn init(&self) -> io::Result<()>{
crossterm::execute!(
std::io::stdout(),
crossterm::event::EnableBracketedPaste,
crossterm::event::EnableFocusChange,
crossterm::event::EnableMouseCapture,
)
}
fn shutdown(&self) -> io::Result<()> {
ratatui::restore();
crossterm::execute!(
std::io::stdout(),
crossterm::event::DisableBracketedPaste,
crossterm::event::DisableFocusChange,
crossterm::event::DisableMouseCapture
)
}
pub fn handle_cmd(&mut self, cmd: WatcherCommand, terminal: &mut Terminal<ratatui::backend::CrosstermBackend<Stdout>>) {
match cmd {
WatcherCommand::Render(rend_cmd) => {
let frame = terminal.get_frame();
self.renderer.apply(rend_cmd, frame.area());
}
WatcherCommand::SetMode(mode) => {
match &mode {
WatcherMode::Input => {}
WatcherMode::Select{content, title, ..} => {
let rd_cmd = RenderCommand::SetMode(InputMode::PopUp(content.clone(), title.clone(), 0));
let frame = terminal.get_frame();
self.renderer.apply(rd_cmd, frame.area());
}
}
self.mode = mode;
}
}
}
pub fn exec_tx(&self) -> mpsc::Sender<ExecutorCommand> {
self.exec_tx.clone()
}
pub async fn run(&mut self) -> std::io::Result<()> {
let mut ui_rx = subscribe_watcher_event();
let mut render_interval = interval(Duration::from_millis(32));
let mut terminal = ratatui::init();
pub async fn exit(self) {
let rd_mes = RenderCommand::Exit;
let pr_mes = ParserCommand::Exit;
let exec_mes = ExecutorCommand::Exit;
let node_mes = NodeCommand::Exit;
publish_render_event(rd_mes);
let _ = self.parser_tx.send(pr_mes).await;
let _ = self.exec_tx.send(exec_mes).await;
let _ = self.node_tx.send(node_mes).await;
}
self.init()?;
pub async fn log_memory() {
tokio::spawn(async move {
let id = format!("{}_{}", current_timestamp(), std::process::id());
let mut path = std::path::PathBuf::new();
path.push("./proc/");
path.push(id);
let mut mem_map = std::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(path)
.unwrap();
loop {
let _ = tokio::time::sleep(Duration::from_secs(10)).await;
if let Some(usage) = memory_stats() {
let current = current_timestamp();
let _ = mem_map.write_all(
msg!(
INFO,
"{}: Physical memory usage: {} MB",
current,
usage.physical_mem / 1024 / 1024
)
.as_bytes(),
);
let _ = mem_map.write_all(
msg!(
INFO,
"{}: Virtual memory usage: {} MB",
current,
usage.virtual_mem / 1024 / 1024
)
.as_bytes(),
);
}
loop {
select! {
poll_res = self.poll() => {
match poll_res {
Ok(event) => {
match self.handle_event(event, terminal.get_frame().area()).await {
Ok(ret) => if !ret { self.exit(); break }
Err(e) => log(msg!(ERROR, "{}", e)),
}
}
});
}
pub async fn poll(&mut self) -> io::Result<bool> {
match event::read()? {
Event::Mouse(event) => match event.kind {
MouseEventKind::ScrollUp => {
publish_render_event(RenderCommand::MouseScrollUp);
}
MouseEventKind::ScrollDown => {
publish_render_event(RenderCommand::MouseScrollDown);
}
MouseEventKind::Down(b) => match b {
MouseButton::Left => {
publish_render_event(RenderCommand::MouseClickLeft(
event.column,
event.row,
));
}
_ => {}
},
_ => {}
Err(()) => { log(msg!(ERROR, "Failed to read from Stream")) }
}
}
ui_event = ui_rx.recv() => {
match ui_event {
Ok(cmd) => {
self.handle_cmd(cmd, &mut terminal);
},
Event::Key(k) if k.kind == KeyEventKind::Press => {
match k.code {
KeyCode::Char(c) => {
self.cmd_buffer.push(c);
let message = RenderCommand::RenderInput(k.code);
publish_render_event(message);
}
KeyCode::Backspace => {
self.cmd_buffer.pop();
let message = RenderCommand::RenderInput(k.code);
publish_render_event(message);
}
KeyCode::Enter => {
let rd_mes = RenderCommand::RenderInput(k.code);
let pr_mes = ParserCommand::ParseCmdString(self.cmd_buffer.clone());
let _ = self.parser_tx.send(pr_mes).await;
publish_render_event(rd_mes);
self.cmd_buffer.clear();
}
KeyCode::Up => {
if self.history_index > 0 {
self.history_index -= 1;
let rd_mes = RenderCommand::ListMove {
pane: RenderPane::CliInput,
index: self.history_index,
};
publish_render_event(rd_mes);
}
}
KeyCode::Down => {
if self.history_index < self.cmd_buffer.len() {
self.history_index += 1;
let rd_mes = RenderCommand::ListMove {
pane: RenderPane::CliInput,
index: self.history_index,
};
publish_render_event(rd_mes);
}
}
KeyCode::Esc => {
return Ok(false);
}
_ => {}
};
Err(e) => {
log(msg!(ERROR, "{}", e))
}
_ => {}
}
Ok(true)
}
}
#[derive(Default)]
pub struct WatcherBuilder {
addr: Option<SocketAddr>,
database: Option<String>,
bootstrap: bool,
debug: bool,
seed: bool,
render: bool,
}
impl WatcherBuilder {
fn new() -> Self {
Self::default()
}
pub fn addr(mut self, addr: Option<SocketAddr>) -> Self {
self.addr = addr;
self
}
pub fn database(mut self, database: Option<String>) -> Self {
self.database = database;
self
}
pub fn debug(mut self, debug: bool) -> Self {
self.debug = debug;
self
}
pub fn bootstrap(mut self, bootstrap: bool) -> Self {
self.bootstrap = bootstrap;
self
}
pub fn render(mut self, render: bool) -> Self {
self.render = render;
self
}
pub fn seed(mut self, seed: bool) -> Self {
self.seed = seed;
self
}
pub async fn start(mut self) -> Watcher {
let (parser_tx, parser_rx) = mpsc::channel::<ParserCommand>(100);
let (exec_tx, exec_rx) = mpsc::channel::<ExecutorCommand>(100);
let mut sys_event = subscribe_system_event();
if self.debug {
Watcher::log_memory().await;
}
let render_handle = if self.render {
Some(tokio::spawn({
async move {
let _ = Renderer::new(RenderLayoutKind::CliHorizontal).run().await;
}
}))
} else {
None
};
for i in 0..3 {
if let Ok(ev) = sys_event.recv().await {
match ev {
SystemEvent::RendererStarted => {
log(msg!(INFO, "Renderer Started"));
break;
}
_ => log(msg!(WARNING, "Wrong Event: {ev:?}! Retrying... (try {i})")),
}
}
}
let parser_handle = tokio::spawn({
let exec_tx = exec_tx.clone();
async move {
let _ = Parser::new(parser_rx, exec_tx).run().await;
}
});
log(msg!(DEBUG, "Database Location: {:?}", self.database));
if self.seed {
self.addr = Some(crate::seeds_constants::SEED_NODES[0]);
}
let chain = core::Blockchain::build(None).unwrap();
let mut node = Node::new(self.addr.clone(), exec_tx.clone(), chain);
log(msg!(INFO, "Built Node"));
let executor_handle = tokio::spawn({
let node_tx = node.tx();
async move {
let _ = Executor::new(node_tx, exec_rx).run().await;
}
});
for i in 0..3 {
if let Ok(ev) = sys_event.recv().await {
match ev {
SystemEvent::ExecutorStarted => {
log(msg!(INFO, "Executor Started"));
break;
}
_ => log(msg!(WARNING, "Wrong Event: {ev:?}! Retrying... (try {i})")),
}
}
}
let node_tx = node.tx();
let node_handle = tokio::spawn({
async move {
node.run().await;
}
});
for i in 0..3 {
if let Ok(ev) = sys_event.recv().await {
match ev {
SystemEvent::NodeStarted => {
log(msg!(INFO, "Executor Started"));
break;
}
_ => log(msg!(WARNING, "Wrong Event: {ev:?}! Retrying... (try {i})")),
}
}
}
if self.bootstrap {
let exec_tx = exec_tx.clone();
tokio::spawn(async move {
let seed_cmd = ExecutorCommand::Node(NodeCommand::ConnectToSeeds);
let mut ev_rx = crate::bus::subscribe_network_event();
let _ = exec_tx.send(seed_cmd).await;
while let Ok(e) = ev_rx.recv().await {
match e {
NetworkEvent::SeedConnected(_) => {
let bootstrap_cmd = ExecutorCommand::Node(NodeCommand::BootStrap);
let _ = exec_tx.send(bootstrap_cmd).await;
}
_ => {}
}
}
});
}
Watcher {
node_tx,
cmd_history: Vec::new(),
history_index: 0,
parser_tx,
exec_tx,
cmd_buffer: String::new(),
handles: {
let mut h = vec![parser_handle, executor_handle, node_handle];
if render_handle.is_some() {
h.push(render_handle.unwrap());
}
h
},
}
}
_ = render_interval.tick() => {
terminal.draw(|frame| self.renderer.draw(frame))?;
}
}
}
self.shutdown()
}
pub fn build() -> WatcherBuilder {
WatcherBuilder::new()
}
pub fn exec_tx(&self) -> mpsc::Sender<ExecutorCommand> {
self.exec_tx.clone()
}
pub fn exit(&self) {}
async fn handle_enter(&mut self) {
match &self.mode {
WatcherMode::Input => {
if !self.cmd_buffer.is_empty() {
let exec_event = cli(&self.cmd_buffer);
let _ = self.exec_tx.send(exec_event).await;
self.cmd_buffer.clear();
self.renderer.handle_enter()
}
}
WatcherMode::Select { content, callback, index, .. } => {
match &&**callback {
&ExecutorCommand::Node(nd_cmd) => {
match nd_cmd {
NodeCommand::DisplayBlockByKey(_) => {
let key = (*content)[*index].clone().to_string();
let resp = ExecutorCommand::Node(NodeCommand::DisplayBlockByKey(key));
let _ = self.exec_tx.send(resp).await;
}
_ => {log(msg!(DEBUG, "TODO: Implement callback for {:?}", nd_cmd))}
}
}
_ => {log(msg!(DEBUG, "TODO: Implement callback for {:?}", *callback))}
}
self.mode = WatcherMode::Input;
let rd_cmd = RenderCommand::SetMode(InputMode::Input);
self.renderer.apply(rd_cmd, Rect::default());
}
}
}
fn handle_arrow_key(&mut self, key: KeyCode) {
match key {
KeyCode::Up => {
match &mut self.mode {
&mut WatcherMode::Select { ref mut index, .. } => {
*index = index.saturating_sub(1);
}
_ => {}
}
}
KeyCode::Down => {
match &mut self.mode {
&mut WatcherMode::Select { ref mut index, ref content, ..} => {
if *index < content.len().saturating_sub(1) {
*index = index.saturating_add(1);
}
}
_ => {}
}
}
_ => {}
}
}
pub async fn log_memory() {
tokio::spawn(async move {
let id = format!("{}_{}", current_timestamp(), std::process::id());
let mut path = std::path::PathBuf::new();
path.push("./proc/");
path.push(id);
let mut mem_map = std::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(path)
.unwrap();
loop {
let _ = tokio::time::sleep(Duration::from_secs(10)).await;
if let Some(usage) = memory_stats() {
let current = current_timestamp();
let _ = mem_map.write_all(
msg!(
INFO,
"{}: Physical memory usage: {} MB",
current,
usage.physical_mem / 1024 / 1024
)
.as_bytes(),
);
let _ = mem_map.write_all(
msg!(
INFO,
"{}: Virtual memory usage: {} MB",
current,
usage.virtual_mem / 1024 / 1024
)
.as_bytes(),
);
}
}
});
}
pub async fn handle_event(&mut self, event: Event, area: Rect) -> io::Result<bool> {
match event {
Event::Mouse(event) => match event.kind {
MouseEventKind::ScrollUp => {
self.renderer.handle_scroll_up();
}
MouseEventKind::ScrollDown => {
self.renderer.handle_scroll_down();
}
MouseEventKind::Down(b) => match b {
MouseButton::Left => {
let rects = self.renderer.rects(area);
self
.renderer
.handle_mouse_click_left(event.column, event.row, rects);
}
_ => {}
},
_ => {}
},
Event::Key(k) if k.kind == KeyEventKind::Press => match k.code {
KeyCode::Esc => return Ok(false),
KeyCode::Char(c) => {
self.cmd_buffer.push(c);
self.renderer.handle_char_input(c)
}
KeyCode::Backspace => {
self.cmd_buffer.pop();
self.renderer.handle_backspace()
}
KeyCode::Enter => {
self.handle_enter().await;
}
KeyCode::Up | KeyCode::Down | KeyCode::Left | KeyCode::Right => {
self.handle_arrow_key(k.code);
self.renderer.handle_arrow_key(k.code)
}
_ => {}
},
Event::Paste(text) => {
log(msg!(DEBUG, "Received pasted text: {text}"));
self.renderer.render_string_to_focused(text);
}
_ => {}
}
Ok(true)
}
pub async fn poll(&mut self) -> Result<Event, ()> {
match self.event_stream.next().await {
Some(Ok(event)) => Ok(event),
Some(Err(e)) => Err(print_error_chain(&e.into())),
None => Err(()),
}
}
}