1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
/* This file is part of DarkFi (https://dark.fi)
 *
 * Copyright (C) 2020-2024 Dyne.org foundation
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <https://www.gnu.org/licenses/>.
 */

use std::{
    collections::{HashMap, HashSet},
    str::FromStr,
    sync::Arc,
};

use log::{error, info};
use smol::{lock::Mutex, stream::StreamExt};
use structopt_toml::{serde::Deserialize, structopt::StructOpt, StructOptToml};
use url::Url;

use darkfi::{
    async_daemonize,
    blockchain::{BlockInfo, HeaderHash},
    cli_desc,
    net::{settings::SettingsOpt, P2pPtr},
    rpc::{
        client::RpcChadClient,
        jsonrpc::JsonSubscriber,
        server::{listen_and_serve, RequestHandler},
    },
    system::{StoppableTask, StoppableTaskPtr},
    util::{encoding::base64, path::expand_path},
    validator::{Validator, ValidatorConfig, ValidatorPtr},
    Error, Result,
};
use darkfi_sdk::crypto::PublicKey;
use darkfi_serial::deserialize_async;

#[cfg(test)]
mod tests;

mod error;
use error::{server_error, RpcError};

/// JSON-RPC requests handler and methods
mod rpc;
mod rpc_blockchain;
mod rpc_tx;

/// Validator async tasks
mod task;
use task::{consensus_task, miner_task, sync_task};

/// P2P net protocols
mod proto;

/// Utility functions
mod utils;
use utils::{parse_blockchain_config, spawn_p2p};

const CONFIG_FILE: &str = "darkfid_config.toml";
const CONFIG_FILE_CONTENTS: &str = include_str!("../darkfid_config.toml");
/// Note:
/// If you change these don't forget to remove their corresponding database folder,
/// since if it already has a genesis block, provided one is ignored.
const GENESIS_BLOCK_LOCALNET: &str = include_str!("../genesis_block_localnet");
const GENESIS_BLOCK_TESTNET: &str = include_str!("../genesis_block_testnet");
const GENESIS_BLOCK_MAINNET: &str = include_str!("../genesis_block_mainnet");

#[derive(Clone, Debug, Deserialize, StructOpt, StructOptToml)]
#[serde(default)]
#[structopt(name = "darkfid", about = cli_desc!())]
struct Args {
    #[structopt(short, long)]
    /// Configuration file to use
    config: Option<String>,

    #[structopt(short, long, default_value = "tcp://127.0.0.1:8340")]
    /// JSON-RPC listen URL
    rpc_listen: Url,

    #[structopt(short, long, default_value = "testnet")]
    /// Blockchain network to use
    network: String,

    #[structopt(short, long)]
    /// Set log file to ouput into
    log: Option<String>,

    #[structopt(short, parse(from_occurrences))]
    /// Increase verbosity (-vvv supported)
    verbose: u8,
}

/// Defines a blockchain network configuration.
/// Default values correspond to a local network.
#[derive(Clone, Debug, serde::Deserialize, structopt::StructOpt, structopt_toml::StructOptToml)]
#[structopt()]
pub struct BlockchainNetwork {
    #[structopt(long, default_value = "~/.local/darkfi/darkfid_blockchain_localnet")]
    /// Path to blockchain database
    pub database: String,

    #[structopt(long, default_value = "3")]
    /// Finalization threshold, denominated by number of blocks
    pub threshold: usize,

    #[structopt(long, default_value = "tcp://127.0.0.1:28467")]
    /// minerd JSON-RPC endpoint
    pub minerd_endpoint: Url,

    #[structopt(long, default_value = "10")]
    /// PoW block production target, in seconds
    pub pow_target: usize,

    #[structopt(long)]
    /// Optional fixed PoW difficulty, used for testing
    pub pow_fixed_difficulty: Option<usize>,

    #[structopt(long)]
    /// Participate in block production
    pub miner: bool,

    #[structopt(long)]
    /// Wallet address to receive mining rewards
    pub recipient: Option<String>,

    #[structopt(long)]
    /// Skip syncing process and start node right away
    pub skip_sync: bool,

    #[structopt(long)]
    /// Disable transaction's fee verification, used for testing
    pub skip_fees: bool,

    #[structopt(long)]
    /// Optional sync checkpoint height
    pub checkpoint_height: Option<u32>,

    #[structopt(long)]
    /// Optional sync checkpoint hash
    pub checkpoint: Option<String>,

    /// P2P network settings
    #[structopt(flatten)]
    pub net: SettingsOpt,
}

/// Structure to hold a JSON-RPC client and its config,
/// so we can recreate it in case of an error.
pub struct MinerRpcCLient {
    endpoint: Url,
    ex: Arc<smol::Executor<'static>>,
    client: RpcChadClient,
}

impl MinerRpcCLient {
    pub async fn new(endpoint: Url, ex: Arc<smol::Executor<'static>>) -> Result<Self> {
        let client = RpcChadClient::new(endpoint.clone(), ex.clone()).await?;
        Ok(Self { endpoint, ex, client })
    }
}

/// Daemon structure
pub struct Darkfid {
    /// P2P network pointer
    p2p: P2pPtr,
    /// Validator(node) pointer
    validator: ValidatorPtr,
    /// Flag to specify node is a miner
    miner: bool,
    /// A map of various subscribers exporting live info from the blockchain
    subscribers: HashMap<&'static str, JsonSubscriber>,
    /// JSON-RPC connection tracker
    rpc_connections: Mutex<HashSet<StoppableTaskPtr>>,
    /// JSON-RPC client to execute requests to the miner daemon
    rpc_client: Option<Mutex<MinerRpcCLient>>,
}

impl Darkfid {
    pub async fn new(
        p2p: P2pPtr,
        validator: ValidatorPtr,
        miner: bool,
        subscribers: HashMap<&'static str, JsonSubscriber>,
        rpc_client: Option<Mutex<MinerRpcCLient>>,
    ) -> Self {
        Self {
            p2p,
            validator,
            miner,
            subscribers,
            rpc_connections: Mutex::new(HashSet::new()),
            rpc_client,
        }
    }
}

async_daemonize!(realmain);
async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
    info!(target: "darkfid", "Initializing DarkFi node...");

    // Grab blockchain network configuration
    let (blockchain_config, genesis_block) = match args.network.as_str() {
        "localnet" => {
            (parse_blockchain_config(args.config, "localnet").await?, GENESIS_BLOCK_LOCALNET)
        }
        "testnet" => {
            (parse_blockchain_config(args.config, "testnet").await?, GENESIS_BLOCK_TESTNET)
        }
        "mainnet" => {
            (parse_blockchain_config(args.config, "mainnet").await?, GENESIS_BLOCK_MAINNET)
        }
        _ => {
            error!("Unsupported chain `{}`", args.network);
            return Err(Error::UnsupportedChain)
        }
    };

    // Parse the genesis block
    let bytes = base64::decode(genesis_block.trim()).unwrap();
    let genesis_block: BlockInfo = deserialize_async(&bytes).await?;

    // Initialize or open sled database
    let db_path = expand_path(&blockchain_config.database)?;
    let sled_db = sled::open(&db_path)?;

    // Initialize validator configuration
    let pow_fixed_difficulty = if let Some(diff) = blockchain_config.pow_fixed_difficulty {
        info!(target: "darkfid", "Node is configured to run with fixed PoW difficulty: {}", diff);
        Some(diff.into())
    } else {
        None
    };

    let config = ValidatorConfig {
        finalization_threshold: blockchain_config.threshold,
        pow_target: blockchain_config.pow_target,
        pow_fixed_difficulty,
        genesis_block,
        verify_fees: !blockchain_config.skip_fees,
    };

    // Initialize validator
    let validator = Validator::new(&sled_db, config).await?;

    // Here we initialize various subscribers that can export live blockchain/consensus data.
    let mut subscribers = HashMap::new();
    subscribers.insert("blocks", JsonSubscriber::new("blockchain.subscribe_blocks"));
    subscribers.insert("txs", JsonSubscriber::new("blockchain.subscribe_txs"));
    subscribers.insert("proposals", JsonSubscriber::new("blockchain.subscribe_proposals"));

    // Initialize P2P network
    let p2p = spawn_p2p(&blockchain_config.net.into(), &validator, &subscribers, ex.clone()).await;

    // Initialize JSON-RPC client to perform requests to minerd
    let rpc_client = if blockchain_config.miner {
        let Ok(rpc_client) =
            MinerRpcCLient::new(blockchain_config.minerd_endpoint, ex.clone()).await
        else {
            error!(target: "darkfid", "Failed to initialize miner daemon rpc client, check if minerd is running");
            return Err(Error::RpcClientStopped)
        };
        Some(Mutex::new(rpc_client))
    } else {
        None
    };

    // Initialize node
    let darkfid =
        Darkfid::new(p2p.clone(), validator, blockchain_config.miner, subscribers, rpc_client)
            .await;
    let darkfid = Arc::new(darkfid);
    info!(target: "darkfid", "Node initialized successfully!");

    // Pinging minerd daemon to verify it listens
    if blockchain_config.miner {
        if let Err(e) = darkfid.ping_miner_daemon().await {
            error!(target: "darkfid", "Failed to ping miner daemon: {}", e);
            return Err(Error::RpcClientStopped)
        }
    }

    // JSON-RPC server
    info!(target: "darkfid", "Starting JSON-RPC server");
    // Here we create a task variable so we can manually close the
    // task later. P2P tasks don't need this since it has its own
    // stop() function to shut down, also terminating the task we
    // created for it.
    let rpc_task = StoppableTask::new();
    let darkfid_ = darkfid.clone();
    rpc_task.clone().start(
        listen_and_serve(args.rpc_listen, darkfid.clone(), None, ex.clone()),
        |res| async move {
            match res {
                Ok(()) | Err(Error::RpcServerStopped) => darkfid_.stop_connections().await,
                Err(e) => error!(target: "darkfid", "Failed starting sync JSON-RPC server: {}", e),
            }
        },
        Error::RpcServerStopped,
        ex.clone(),
    );

    info!(target: "darkfid", "Starting P2P network");
    p2p.clone().start().await?;

    // Sync blockchain
    if !blockchain_config.skip_sync {
        // Parse configured checkpoint
        if blockchain_config.checkpoint_height.is_some() && blockchain_config.checkpoint.is_none() {
            return Err(Error::ParseFailed("Blockchain configured checkpoint hash missing"))
        }

        let checkpoint = if let Some(height) = blockchain_config.checkpoint_height {
            Some((height, HeaderHash::from_str(&blockchain_config.checkpoint.unwrap())?))
        } else {
            None
        };

        sync_task(&darkfid, checkpoint).await?;
    } else {
        *darkfid.validator.synced.write().await = true;
    }

    // Consensus protocol
    info!(target: "darkfid", "Starting consensus protocol task");
    let consensus_task = if blockchain_config.miner {
        // Grab rewards recipient public key(address)
        if blockchain_config.recipient.is_none() {
            return Err(Error::ParseFailed("Recipient address missing"))
        }
        let recipient = match PublicKey::from_str(&blockchain_config.recipient.unwrap()) {
            Ok(address) => address,
            Err(_) => return Err(Error::InvalidAddress),
        };

        let task = StoppableTask::new();
        task.clone().start(
            miner_task(darkfid, recipient, blockchain_config.skip_sync, ex.clone()),
            |res| async move {
                match res {
                    Ok(()) | Err(Error::MinerTaskStopped) => { /* Do nothing */ }
                    Err(e) => error!(target: "darkfid", "Failed starting miner task: {}", e),
                }
            },
            Error::MinerTaskStopped,
            ex.clone(),
        );

        task
    } else {
        let task = StoppableTask::new();
        task.clone().start(
            consensus_task(darkfid, ex.clone()),
            |res| async move {
                match res {
                    Ok(()) | Err(Error::ConsensusTaskStopped) => { /* Do nothing */ }
                    Err(e) => error!(target: "darkfid", "Failed starting consensus task: {}", e),
                }
            },
            Error::ConsensusTaskStopped,
            ex.clone(),
        );

        task
    };

    // Signal handling for graceful termination.
    let (signals_handler, signals_task) = SignalHandler::new(ex)?;
    signals_handler.wait_termination(signals_task).await?;
    info!(target: "darkfid", "Caught termination signal, cleaning up and exiting...");

    info!(target: "darkfid", "Stopping JSON-RPC server...");
    rpc_task.stop().await;

    info!(target: "darkfid", "Stopping P2P network...");
    p2p.stop().await;

    info!(target: "darkfid", "Stopping consensus task...");
    consensus_task.stop().await;

    info!(target: "darkfid", "Flushing sled database...");
    let flushed_bytes = sled_db.flush_async().await?;
    info!(target: "darkfid", "Flushed {} bytes", flushed_bytes);

    Ok(())
}